JUC工具类Phaser

简介

java7中引入了一种新的可重复使用的同步屏障,Phaser阶段器,与CyclicBarrier和CountDownLatch类似,但更强大。

CyclicBarrier解决了CountDownLatch不能重用的问题,但是仍有以下不足:

1)不能动态调整计数器值,假如线程数不足以打破barrier,就只能reset或者多加些线程,在实际运用中显然不现实

2)每次await仅消耗1个计数器值,不够灵活

Phaser就是用来解决这些问题的。Phaser将多个线程协作执行的任务划分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与到某个阶段。

Phaser的常用方法

1、register方法
动态添加一个parties

int register()

2、bulkRegister方法
动态添加多个parties

parties:需要添加的个数
int bulkRegister(int parties)

3、getRegisteredParties方法
获取当前的parties数

int getRegisteredParties()

4、arriveAndAwaitAdvance方法
到达并等待其他线程到达

int arriveAndAwaitAdvance()

5、arriveAndDeregister方法
到达并注销该parties,这个方法不会使线程阻塞

int arriveAndDeregister()

6、arrive方法
到达,但不会使线程阻塞

int arrive()

7、awaitAdvance方法
等待前行,可阻塞也可不阻塞,判断条件为传入的phase是否为当前phaser的phase。如果相等则阻塞,反之不进行阻塞

phase:阶段数值
int awaitAdvance(int phase)

8、awaitAdvanceInterruptibly方法
该方法与awaitAdvance类似,唯一不一样的就是它可以进行打断。

phase:阶段数值
timeout:超时时间
unit:时间单位
int awaitAdvanceInterruptibly(int phase)
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)

9、getArrivedParties方法
获取当前到达的parties数

int getArrivedParties()

10、getUnarrivedParties方法
获取当前未到达的parties数

int getUnarrivedParties()

11、getPhase方法
获取当前属于第几阶段,默认从0开始,最大为integer的最大值

int getPhase()

12、isTerminated方法
判断当前phaser是否关闭

boolean isTerminated()

13、forceTermination方法
强制关闭当前phaser

void forceTermination()

常用方法演示

package com.dongguo.juc;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/9/19 0019-13:26
 * @description:
 */
public class PhaserTest2 {
    public static void main(String[] args) throws InterruptedException {
        //初始化5个parties
        Phaser phaser = new Phaser(5);

        //只有当全部线程通过时才会进入下一阶段,从0开始
        System.out.println("当前阶段数:"+phaser.getPhase());

        //添加一个parties
        phaser.register();
        System.out.println("当前Parties数:"+phaser.getRegisteredParties());
        //添加多个parties
        phaser.bulkRegister(4);
        System.out.println("当前Parties数:"+phaser.getRegisteredParties());

        new Thread(()->{
            //到达并等待其他线程到达
            phaser.arriveAndAwaitAdvance();
        },"t1").start();
        
        new Thread(()->{
            //到达后注销该parties,不等待其他线程
            phaser.arriveAndDeregister();
            System.out.println("go on");
        },"t2").start();

        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("当前Parties数:"+phaser.getRegisteredParties());
        System.out.println("当前到达数:"+phaser.getArrivedParties());
        System.out.println("当前未达数:"+phaser.getUnarrivedParties());

        //何时会停止,只有当parties中的数量为0时或者调用forceTermination方法就会停止了,我们也可以重写phaser中的onAdvance,给他返回true就会使这个phaser停止了
        System.out.println("phaser是否结束:"+phaser.isTerminated());
        phaser.forceTermination();
        System.out.println("phaser是否结束:"+phaser.isTerminated());
    }
}
运行结果
当前阶段数:0
当前Parties数:6
当前Parties数:10
go on
当前Parties数:9
当前到达数:1
当前未达数:8
phaser是否结束:false
phaser是否结束:true

案例

使用Phaser动态注册parties

package com.dongguo.juc;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/9/19 0019-11:40
 * @description:5个工人全都完成工作,同一时刻完成
 */
public class PhaserTest {

    private static Phaser phaser = new Phaser();

    public static void main(String args[]) {
        for (int i = 1; i <= 5; i++) {
          new Thread(()->{
              phaser.register();

              System.out.println(Thread.currentThread().getName() +" is working");
              try {
                  TimeUnit.SECONDS.sleep(2);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              phaser.arriveAndAwaitAdvance();//等待
              System.out.println(Thread.currentThread().getName() +" work finished "+System.currentTimeMillis());
          },"t"+i).start();
        }
    }
}
运行结果
t1 is working
t3 is working
t2 is working
t4 is working
t5 is working
t4 work finished 1632029871265
t5 work finished 1632029871265
t2 work finished 1632029871265
t1 work finished 1632029871265
t3 work finished 1632029871265

使用Phaser设置多个阶段

package com.dongguo.juc;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/9/19 0019-12:42
 * @description: 模拟3个运动员一同参加3项运动
 */
public class PhaserTest1 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        for (int i = 1; i <= 3; i++) {
            int no =i;
            new Thread(() -> {
                try {
                    System.out.println(no+": 当前处于第:"+phaser.getPhase()+"阶段");
                    System.out.println(no+": start running");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(no+": end running");
                    //等待其他运动员完成跑步
                    phaser.arriveAndAwaitAdvance();

                    System.out.println(no+": 当前处于第:"+phaser.getPhase()+"阶段");
                    System.out.println(no+": start bicycle");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(no+": end bicycle");
                    //等待其他运动员完成骑行
                    phaser.arriveAndAwaitAdvance();

                    System.out.println(no+": 当前处于第:"+phaser.getPhase()+"阶段");
                    System.out.println(no+": start long jump");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(no+": end long jump");
                    //等待其他运动员完成跳远
                    phaser.arriveAndAwaitAdvance();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "t"+i).start();
        }
    }
}
运行结果
1: 当前处于第:0阶段
2: 当前处于第:0阶段
2: start running
1: start running
3: 当前处于第:0阶段
3: start running
2: end running
1: end running
3: end running
3: 当前处于第:1阶段
3: start bicycle
1: 当前处于第:1阶段
2: 当前处于第:1阶段
1: start bicycle
2: start bicycle
1: end bicycle
2: end bicycle
3: end bicycle
1: 当前处于第:2阶段
1: start long jump
3: 当前处于第:2阶段
3: start long jump
2: 当前处于第:2阶段
2: start long jump
1: end long jump
3: end long jump
2: end long jump

总结

Phaser 可以通过register() 方法和arriveAndDeregister() 方法,动态的增加或者减少注册量
使用arriveAndAwaitAdvance,相当于CyclicBarrier机制
使用arrive,相当于countdown机制
可以利用awaitAdvance,让主线程等待子线程全部完成任务

上一篇:并发编程JUC(下)


下一篇:JUC基础(21):ReentrantReadWriteLock读写锁