
什么是CompletableFutureCompletableFuture是jdk8的新特性,在Java中作为一种多步骤多任务的任务编排工具出现completableFuture与stream流默认都是使用的forkjoin类型的线程池这是一种全局线程池ForkJoinPool.commonPool()他的核心数等于cpu数量工作原理是任务拆分并行执行结果合并基于线程窃取的思想在项目中使用会有难以预估的问题出现CompletableFuture有哪些方法创建异步任务supplyAsync是创建带有返回值的异步任务runAsync是创建没有返回值的异步任务。获取结果的方法get() 获取任务执行结果无返回值的任务返回null也可以设置最大等待时间join() 完成时返回结果值否则抛出unchecked异常。getNow(T valueIfAbsent) 如果完成则返回结果值或抛出任何遇到的异常否则返回给定的 valueIfAbsent。complete(T value ) 如果任务没有完成返回的值设置为给定值completeExceptionally(Throwable ex)如果任务没有完成就抛出给定异常异步任务回调thenApply 表示某个任务执行完成后执行的动作即回调方法会将该任务的执行结果即方法返回值作为入参传递到回调方法中带有返回值。thenAccep表示某个任务执行完成后执行的动作即回调方法会将该任务的执行结果即方法返回值作为入参传递到回调方法中无返回值。thenRun表示某个任务执行完成后执行的动作即回调方法无入参无返回值。whenComplete是当某个任务执行完成后执行的回调方法会将执行结果或者执行期间抛出的异常传递给回调方法如果是正常执行则异常为null回调方法对应的CompletableFuture的result和该任务一致如果该任务正常执行则get方法返回执行结果如果是执行异常则get方法抛出异常handle和handleAsync跟whenComplete基本一致区别在于handle的回调方法有返回值。多任务组合处理thenCombine、thenAcceptBoth 和runAfterBoth这三个方法都是将两个CompletableFuture组合起来处理只有两个任务都正常完成时才进行下阶段任务。区别thenCombine会将两个任务的执行结果作为所提供函数的参数且该方法有返回值thenAcceptBoth同样将两个任务的执行结果作为方法入参但是无返回值runAfterBoth没有入参也没有返回值。注意两个任务中只要有一个执行异常则将该异常信息作为指定任务的执行结果。applyToEither、acceptEither和runAfterEither这三个方法和上面一样也是将两个CompletableFuture组合起来处理当有一个任务正常完成时就会进行下阶段任务。区别applyToEither会将已经完成任务的执行结果作为所提供函数的参数且该方法有返回值acceptEither同样将已经完成任务的执行结果作为方法入参但是无返回值runAfterEither没有入参也没有返回值。allOf多个任务都执行完成后才返回只要有一个任务执行异常则返回的CompletableFuture执行get方法时会抛出异常如果都是正常执行则get返回null。anyOf 是多个任务只要有一个任务有异常则CompletableFuture执行get方法时会抛出异常如果都是正常执行则get返回最快完成任务的结果。使用completableFuture的难点异常被吞噬如果链中某环节异常且未处理异常可能不会传播。务必在链的末端添加异常处理。· 2. 错误使用 get() / join()在回调中直接调用会阻塞线程抵消异步优势。应使用 thenAccept 等回调。· 3. 线程安全问题多个异步任务并发修改共享集合如 ArrayList会导致问题。需使用 synchronizedList 等并发安全容器。· 4. 默认线程池瓶颈ForkJoinPool.commonPool() 是全局共享的可能被拖慢。生产环境务必使用自定义线程池。· 5. 忽略超时控制任务可能因外部服务故障而永久阻塞。应配合 orTimeout() 等方法设置超时。通过简单例子说明其运行逻辑我们用一个最经典的 supplyAsync thenApply thenAccept 例子把时间线拆成毫秒级看线程和内存如何流转。ExecutorServiceexecutorExecutors.newFixedThreadPool(2);CompletableFuture.supplyAsync(()-{returnHello;// 任务1},executor).thenApply(s-{returns World;// 任务2}).thenAccept(s-{System.out.println(s);// 任务3});阶段一注册主线程耗时 1ms主线程调用 supplyAsync生成 CF1未完成resultnullstacknull将业务逻辑 () - “Hello” 提交给线程池。主线程调用 thenApply生成 CF2未完成并将 UniApply 回调节点压入 CF1 的 stack 栈顶此时 CF1.stack 指向该节点。主线程调用 thenAccept生成 CF3未完成并将 UniAccept 节点压入 CF2 的 stack 栈顶。此时依赖链CF1.stack → UniApply(指向CF2) CF2.stack → UniAccept(指向CF3)。注意thenAccept 并不是挂在 CF1 上的而是挂在 thenApply 返回的 CF2 上。阶段二任务1执行线程池线程 T1 执行· T1 执行 () - “Hello”通过 CAS 将 CF1.result 从 null 改为 “Hello”。· CAS 成功T1 立即调用 CF1.postComplete()。阶段三触发风暴依旧是 T1 线程执行postComplete 内部开始弹出 CF1.stack弹出栈顶拿到 UniApply 节点。T1 调用该节点的 tryFire(SYNC)。因为这里用的是 thenApply没有 Async且未指定自定义线程池SYNC 参数指示 T1 直接在当前线程同步执行 s - s World。执行完毕得到 “Hello World”通过 CAS 写入 CF2.result。tryFire 返回 0代表内部生成了新的子 CFpostComplete 检测到后将当前处理对象 f 指向 CF2。继续 while 循环弹出 CF2.stack拿到 UniAccept 节点。依旧是 T1 同步执行 System.out.println(s)打印 “Hello World”。执行完毕将 CF3.result 设为 nullVoid 类型。流程结束全程只有 T1 这一个线程完成了全部工作主线程和线程池 T2 完全空闲。completableFuture的底层原理其本质是一个基于cas的treiber(并发链栈)的事件驱动状态机没有复杂的state状态位只有两个volatile修饰的字段来驱动volatile object reslutvolatile completion stackresult为null表示未完成非null表示已完成stack是一个先进先出的并发栈所有的thenapplythenaccept等回调都会被封装成一个completion节点加入栈中当你调用 thenApply(fn) 时底层会生成一个 UniApply 对象继承自 Completion。这个节点内持有· executor执行该回调的线程池若为 null 则由当前完成线程直接跑。· dep指向依赖的前置任务即父任务的引用。· fn你传入的具体业务函数流程分为两步· 步骤一入栈当父任务未完成时调用 thenApply 会将新生成的 UniApply 节点通过 CAS 压入父任务的 stack 头部并不会立即执行。· 步骤二触发 postComplete当父任务通过 CAS 将 result 从 null 改为具体值即任务完成时它会立刻调用 postComplete() 方法。该方法会逆序后进先出弹出 stack 中的节点并尝试执行这些回调。相关细节执行回调时若该 Completion 节点没有绑定自定义线程池则由当前完成任务的线程即调用 complete() 的线程直接同步运行。若绑定了线程池则提交给线程池。这种策略叫 “Run-or-Post”目的是减少线程切换开销但若回调链路极长会导致当前线程长时间占用 CPU。当 postComplete 执行完第一个回调如 thenApply后该回调内部会生成一个新的 CompletableFuture 结果。此时该新任务会再次调用自己的 postComplete()去触发它自己的 stack 里的下一层回调。注意postComplete 会用一个 for 循环疯狂弹栈直到栈空。这存在一个隐患如果任务链极深几千层可能引发 StackOverflowError因为它在处理依赖时可能会递归调用。join() 并非无限自旋。如果读取 result null未完成它会调用 waitingGet()内部会创建一个 WaitNode 节点同样压入 stack 栈然后通过 LockSupport.park() 挂起当前线程。直到 complete() 触发 postComplete() 时出栈发现有 WaitNode才会调用 LockSupport.unpark() 唤醒该线程。当你调用 future.complete(value) 时底层只做一件事通过 CAS 将 result 字段从 null 改为 value。· 如果 CAS 成功代表此任务只能被完成一次其他线程并发调用会失败随后立即触发 postComplete() 释放所有等待的阻塞线程如 join()。· 如果 CAS 失败说明结果已被设置直接忽略。