跟着大神学Java并发原理JDK源码剖析之同步工具类Phaser

360影视 2024-12-29 20:53 4

摘要:用Phaser替代CyclicBarrier和CountDownLatch从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。

用Phaser替代CyclicBarrier和CountDownLatch从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。

1.用Phaser替代CountDownLatch

考虑讲CountDownLatch时的例子,1个主线程要等10个worker线程完成之后,才能做接下来的事情,也可以用Phaser来实现此功能。在CountDownLatch中,主要是2个函数:await和countDown,在Phaser中,与之相对应的函数是awaitAdance(int n)和arrive。

2.用Phaser替代CyclicBarrier

考虑前面讲CyclicBarrier时,10个工程师去公司应聘的例子,也可以用Phaser实现,代码基本类似。

arriveAndAwaitAdance就是 arrive与 awaitAdvance(int)的组合,表示“我自己已到达这个同步点,同时要等待所有人都到达这个同步点,然后再一起前行”。

4.5.2 Phaser新特性

特性1:动态调整线程个数

CyclicBarrier 所要同步的线程个数是在构造函数中指定的,之后不能更改,而 Phaser 可以在运行期间动态地调整要同步的线程个数。Phaser 提供了下面这些函数来增加、减少所要同步的线程个数。

特性2:层次Phaser

多个Phaser可以组成如图4-7所示的树状结构,可以通过在构造函数中传入父Phaser来实现。

图4-7 树状的Phaser

先简单看一下Phaser内部关于树状结构的存储,如下面代码所示。

可以发现,在Phaser的内部结构中,每个Phaser记录了自己的父节点,但并没有记录自己的子节点列表。所以,每个 Phaser 知道自己的父节点是谁,但父节点并不知道自己有多少个子节点,对父节点的操作,是通过子节点来实现的。

树状的Phaser怎么使用呢?考虑如下代码,会组成如图4-8所示的树状Phaser。

图4-8 代码组成的树状Phaser

本来root有两个参与者,然后为其加入了两个子Phaser(c1,c2),每个子Phaser会算作1个参与者,root的参与者就变成2+2=4个。

c1本来有3个参与者,为其加入了一个子Phaser c3,参与者数量变成3+1=4个。c3的参与者初始为0,后续可以通过调用register函数加入。对于树状Phaser上的每个节点来说,可以当作一个独立的Phaser来看待,其运作机制和一个单独的Phaser是一样的。具体来讲:父Phaser并不用感知子Phaser的存在,当子Phaser中注册的参与者数量大于0时,会把自己向父节点注册;当子Phaser中注册的参与者数量等于0时,会自动向父节点解注册。父Phaser把子Phaser当作一个正常参与的线程就可以了。

4.5.3 state变量解析

大致了解了Phaser的用法和新特性之后,下面仔细剖析其实现原理。Phaser没有基于AQS来实现,但具备AQS的核心特性:state变量、CAS操作、阻塞队列。先从state变量说起。

这个64位的state变量被拆成4部分,如图4-9所示为state变量各部分表示的意思。

图4-9 state变量各部分表示的意思

最高位0表示未同步完成,1表示同步完成,初始最高位为0。

Phaser提供了一系列的成员函数来从state中获取图4-9中的几个数字,如下所示。

下面再看一下state变量在构造函数中是如何被赋值的:

当parties=0时,state被赋予一个EMPTY常量,常量为1;

当parties!=0时,把phase值左移32位;把parties左移16位;然后parties也作为最低的16位,3个值做或操作,赋值给state。这个赋值操作也反映了图4-9的意思。

4.5.4 阻塞与唤醒(Treiber Stack)

基于上述的state变量,对其执行CAS操作,并进行相应的阻塞与唤醒。如图4-10所示,右边的主线程会调用awaitAdvance进行阻塞;左边的arrive会对state进行CAS的累减操作,当未到达的线程数减到0时,唤醒右边阻塞的主线程。

图4-10 基于state的CAS的阻塞与唤醒示意图

在这里,阻塞使用的是一个称为Treiber Stack的数据结构,而不是AQS的双向链表。Treiber Stack是一个无锁的栈,由R.Kent Treiber在其于1986年发表的论文Systems Programming:Coping with Parallelism 中首次提出。它是一个单向链表,出栈、入栈都在链表头部,所以只需要一个head指针,而不需要tail指针。实现代码如下所示。

为了减少并发冲突,这里定义了2个链表,也就是2个Treiber Stack。当phase为奇数轮的时候,阻塞线程放在oddQ里面;当phase为偶数轮的时候,阻塞线程放在evenQ里面。代码如下所示。

4.5.5 arrive函数分析

下面看arrive函数是如何对state变量进行操作,又是如何唤醒线程的。

arrive和 arriveAndDeregister内部调用的都是 doArrive(boolean)函数。区别在于前者只是把“未达到线程数”减1;后者则把“未到达线程数”和“下一轮的总线程数”都减1。下面看一下doArrive(boolean)函数的实现。

关于上面的函数,有以下几点说明:

(1)定义了2个常量如下。当 deregister=false 时,只最低的16位需要减 1,adj=ONE_ARRIVAL;当deregister=true时,低32位中的2个16位都需要减1,adj=ONE_ARRIVAL|ONE_PARTY。

(2)把未到达线程数减1。减了之后,如果还未到0,什么都不做,直接返回。如果到0,会做2件事情:第1,重置state,把state的未到达线程个数重置到总的注册的线程数中,同时phase加1;第2,唤醒队列中的线程。

下面看一下唤醒函数:

遍历整个栈,只要栈当中节点的phase不等于当前Phaser的phase,说明该节点不是当前轮的,而是前一轮的,应该被释放并唤醒。

接下来看一下线程是如何被阻塞的。

4.5.6 awaitAdvance函数分析

这里调用了ForkJoinPool.managedBlock(ManagedBlocker blocker)函数,目的是把node对应的线程阻塞。ManagerdBlocker是ForkJoinPool里面的一个接口,定义如下:

QNode实现了该接口,实现原理还是park,如下所示。之所以没有直接使用park/unpark来实现阻塞、唤醒,而是封装了ManagedBlocker这一层,主要是出于使用上的方便考虑。一方面是park可能被中断唤醒,另一方面是带超时时间的park,把这二者都封装在一起。

理解了arrive和awaitAdvance,arriveAndAwaitAdvance就是二者的一个组合版本,此处不再展开分析。

来源:小羊论科技

相关推荐