3500字彻底搞懂CompletableFuture内部原理!

360影视 欧美动漫 2025-05-20 22:17 2

摘要:CompletableFuture的构造:ForkJoinPoolCompletableFuture中任务的执行同样依靠ForkJoinPool,代码如下所示。

CompletableFuture的构造:ForkJoinPoolCompletableFuture中任务的执行同样依靠ForkJoinPool,代码如下所示。

通过上面的代码可以看到,asyncPool是一个static类型,supplierAsync、asyncSupplyStage也都是static函数。Static函数会返回一个CompletableFuture类型对象,之后就可以链式调用,CompletionStage里面的各个方法。

ForkJoinPool接受的任务是ForkJoinTask 类型,而我们向CompletableFuture提交的任务是Runnable/Supplier/Consumer/Function。

因此,肯定需要一个适配机制,把这四种类型的任务转换成ForkJoinTask,然后提交给ForkJoinPool,如图8-1所示。

为了完成这种转换,在CompletableFuture内部定义了一系列的内部类,图8-2所示为CompletableFuture的各种内部类的继承体系。

在 supplierAsync(..)函数内部,会把一个 Supplier 转换成一个 AsyncSupply,然后提交给ForkJoinPool执行;

在runAsync(..)函数内部,会把一个Runnable转换成一个AsyncRun,然后提交给ForkJoinPool执行;

在 thenRun/thenAccept/thenApply 内部,会分别把 Runnable/Consumer/Function 转换成UniRun/UniAccept/UniApply对象,然后提交给ForkJoinPool执行;

除此之外,还有两种 CompletableFuture 组合的情况,分为“与”和“或”,所以有对应的Bi和Or类型的Completion类型。

下面的代码分别为 UniRun、UniApply、UniAccept 的定义,可以看到,其内部分别封装了Runnable、Function、Consumer。

图8-3所示为CompletableFuture的接口层面和内部实现层面对比。

下 面 以 CompletableFuture.supplyAsync ( … ) .thenApply(…).thenRun(…) 链式代码为例,分析整个执行过程。

第1步:CompletableFuture future1=CompletableFuture.supplyAsync(…)

在上面的代码中,关键是构造了一个AsyncSupply对象,该对象有三个关键点:

(1)它继承自ForkJoinTask,所以能够提交ForkJoinPool来执行。

(2)它封装了Supplier f,即它所执行任务的具体内容。

(3)该任务的返回值,即CompletableFuture d,也被封装在里面。

图8-4所示为这几个概念之间的关系。ForkJoinPool执行一个ForkJoinTask类型的任务,即AsyncSupply。该任务的输入就是Supply,输出结果存放在CompletableFuture中。

图8-4 几个概念之间的关系

第2步:CompletableFuture future2=future1.thenApply(…)第1步的返回值,也就是上面代码中的 CompletableFuture d,紧接着调用其成员函数thenApply。

我们知道,必须等第1步的任务执行完毕,第2步的任务才可以执行。因此,这里提交的任务不可能立即执行,在此处构建了一个UniApply对象,也就是一个ForkJoinTask类型的任务,这个任务放入了第1个任务的栈当中。

每一个CompletableFuture对象内部都有一个栈,存储着是后续依赖它的任务,如下面代码所示。这个栈也就是Treiber Stack,这里的stack存储的就是栈顶指针。

上面的UniApply对象类似于第1步里面的AsyncSupply,它的构造函数传入了4个参数:

第1个参数是执行它的ForkJoinPool;

第2个参数是输出一个CompletableFuture对象。这个参数,也是thenApply函数的返回值,用来链式执行下一个任务;

第3个参数是其依赖的前置任务,也就是第1步里面提交的任务;

第4个参数是输入(也就是一个Function对象)。

UniApply对象被放入了第1步的CompletableFuture的栈中,在第1步的任务执行完成之后,就会从栈中弹出并执行。下面看一下代码:

ForkJoinPool执行上面的AsyncSupply对象的run方法,实质就是执行Supplier的get方法。执行结果被塞入了 CompletableFuture d 当中,也就是赋值给了 CompletableFuture 内部的Object result变量。

调用d.postComplete,也正是在这个函数里面,把第2步压入的UniApply对象弹出来执行,代码如下所示。

第3步:CompletableFuture future3=future2.thenRun(…)第3步和第2步的过程类似,构建了一个 UniRun 对象,这个对象被压入第2步的CompletableFuture所在的栈中。第2步的任务,当执行完成时,从自己的栈中弹出UniRun对象并执行。

总结一下上述过程,如图8-5所示。

通过supplyAsync/thenApply/thenRun,分别提交了3个任务,每1个任务都有1个返回值对象,也就是1个CompletableFuture。这3个任务通过2个CompletableFuture完成串联。后1个任务,被放入了前1个任务的CompletableFuture里面,前1个任务在执行完成时,会从自己的栈中,弹出下1个任务执行。如此向后传递,完成任务的链式执行。

在上面的代码中,我们分析了thenApply,还有一个与之对应的函数是thenApplyAsync。这两个函数调用的是同一个函数,只不过传入的参数不同。

最关键的是上面几行加粗的代码。

如果是thenApplyAsync,则e!=null,构建UniApply对象,入栈;

如果是thenApply,则会调用d.uniApply(this,f,null),该函数代码如下:

通过上面的代码可以看到:

(1)如果前置任务没有完成,即a.result=null,则上面的uniApply会返回false,此时thenApply也会走到thenApplyAsync的逻辑里面,生成UniApply对象入栈;

(2)只有在前置任务已经完成的情况下,thenApply才会立即执行,不会入栈,再出栈,此时thenApply和thenApplyAsync才有区别。同理,thenRun与thenRunAsync、thenAccept与thenAcceptAsync的区别与此类似。

如果任务只是链式执行,便不需要在每个CompletableFuture里面设1个栈了,用1个指针使所有任务组成链表即可。

但实际上,任务不只是链式执行,而是网状执行,组成 1 张图。

如图8-6所示,所有任务组成一个有向无环图:

任务1执行完成之后,任务2、任务3可以并行,在代码层面可以写为:future1.thenApply(任务2),future1.thenApply(任务3);

任务4在任务2执行完成时可开始执行;

任务5要等待任务2、任务3都执行完成,才能开始,这里是And关系;

任务6在任务3执行完成时可以开始执行;

对于任务7,只要任务4、任务5、任务6中任意一个任务结束,就可以开始执行。

总而言之,任务之间是多对多的关系:1个任务有n个依赖它的后继任务;1个任务也有n个它依赖的前驱任务。

这样一个有向无环图,用什么样的数据结构表达呢?And和Or的关系又如何表达呢?

有几个关键点:(1)在每个任务的返回值里面,存储了依赖它的接下来要执行的任务。所以在图8-6中,任务1的CompletableFuture的栈中存储了任务2、任务3;任务2的CompletableFuutre中存储了任务4、任务5;任务3的CompletableFuture中存储了任务5、任务6。也就是说,每个任务的CompletableFuture对象的栈里面,其实存储了该节点的出边对应的任务集合。

(2)任务2、任务3的CompletableFuture里面,都存储了任务5,那么任务5是不是会被触发两次,执行两次呢?

任务5的确会被触发2次,但它会判断任务2、任务3的结果是不是都完成,如果只完成其中一个,它就不会执行。

(3)任务7存在于任务4、任务5、任务6的CompletableFuture的栈里面,因此会被触发三次。但它只会执行一次,只要其中1个任务执行完成,就可以执行任务7了。

(4)正因为有And和Or 两种不同的关系,因此对应BiApply和OrApply两个对象,这两个对象的构造函数几乎一样,只是在内部执行的时候,一个是And的逻辑,一个是Or的逻辑。

(5)BiApply和OrApply都是二元操作符,也就是说,只能传入二个被依赖的任务。但上面的任务7同时依赖于任务4、任务5、任务6,这怎么处理呢?

任何一个多元操作,都能被转换为多个二元操作的叠加。如图8-7所示,假如任务1And任务2And任务3=任务4,那么它可以被转换为右边的形式。新建了一个And任务,这个And任务和任务3再作为参数,构造任务4。Or的关系,与此类似。

明白了任务的有向无环图的存储与计算过程,也就明白了8.1.4节thenCombine的内部实现原理。thenCombine用于任务1、任务2执行完成,再执行任务3,实际场景更为简单,此处不再进一步展开源码讨论。

上面的函数是一个递归函数,输入是一个CompletableFuture对象的列表,输出是一个具有And关系的复合CompletableFuture对象。最关键的代码如上面的加粗代码所示,因为c要等a,b都执行完成之后才能执行,因此c会被分别压入a,b所在的栈中。

图8-8所示为allOf内部的运作过程:方块表示任务,椭圆表示任务的执行结果。假设allof的参数传入了future1、future2、future3、future4,则对应四个原始任务。

生成BiRelay1、BiRelay2任务,分别压入future1/future2、future3/future4的栈中。无论future1或future2完成,都会触发BiRelay1;无论future3或future4完成,都会触发BiRelay2;

生成BiRelay3任务,压入future5/future6的栈中,无论future5或future6完成,都会触发BiRelay3任务。

图8-8 allOf内部的运作过程

BiRelay只是一个中转任务,它本身没有任务代码,只是参照输入的两个future是否完成。如果完成,就从自己的栈中弹出依赖它的BiRelay任务,然后执行。

来源:程序员高级码农II

相关推荐