等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。
假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多 线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完 成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用 join()方法,如代码清单8-1所示。
代码清单8-1 JoinCountDownLatchTest.java
 


public class JoinCountDownLatchTest {
	public static void main(String[] args) throws InterruptedException {
		Thread parser1 = new Thread(new Runnable() {
			@Override
			public void run() {
			}
		});
		Thread parser2 = new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("parser2 finish");
			}
		});
		parser1.start();
		parser2.start();

		parser1.join();
		parser2.join();
		System.out.println("all parser finish");
	}
}

join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存 活,如果join线程存活则让当前线程永远等待。其中,wait(0)表示永远等待下去,代码片段如
下。

while (isAlive()) { 
wait(0);
 }

直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里 实现的,所以在JDK里看不到,大家可以查看JVM源码。
在JDK 1.5之后的并发包中提供的CountDownLatch也可以实现join的功能,并且比join的功 能更多,如代码清单8-2所示。
代码清单8-2 CountDownLatchTest.java
 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {
	static CountDownLatch c = new CountDownLatch(2);

	public static void main(String[] args) throws InterruptedException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(1);
				c.countDown();
				System.out.println(2);
				c.countDown();
			}
		}).start();
		
		c.await( );
		System.out.println("3");
	}
}

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完 成,这里就传入N。
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法 会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可。
如果有某个解析sheet的线程处理得比较慢,我们不可能让主线程一直等待,所以可以使 用另外一个带指定时间的await方法——await(long time,TimeUnit unit),这个方法等待特定时 间后,就会不再阻塞当前线程。join也有类似的方法。

计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会 阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数 器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

CountDownLatch是java.util.concurrent包中一个类,CountDownLatch主要提供的机制是多个(具体数量等于初始化CountDownLatch时count的值)线程都达到了预期状态或者完成了预期工作时触发事件,其他线程可以等待这个事件来触发自己后续的工作。等待的线程可以是多个,即CountDownLatch可以唤醒多个等待的线程。到达自己预期状态的线程会调用CountDownLatch的countDown方法,而等待的线程会调用CountDownLatch的await方法。 
等待多线程完成的CountDownLatch

 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;

public class CountDownLatchTest {
	public static void main(String[] args) throws InterruptedException {
	        CountDownLatch countDown = new CountDownLatch(1);
	        CountDownLatch await = new CountDownLatch(5);
	 
	        // 依次创建并启动处于等待状态的5个MyRunnable线程
	        for (int i = 0; i < 5; ++i) {
	            new Thread(new MyRunnable(countDown, await)).start();
	        }
	 
	        System.out.println("用于触发处于等待状态的线程开始工作......");
	        System.out.println("用于触发处于等待状态的线程工作完成,等待状态线程开始工作......");
	        Thread.sleep(1000); 
	        countDown.countDown();
	        await.await();
	        System.out.println("Bingo!");
	}
	 
	
}
  class MyRunnable implements Runnable {
	 
    private final CountDownLatch countDown;
    private final CountDownLatch await;
 
    public MyRunnable(CountDownLatch countDown, CountDownLatch await) {
        this.countDown = countDown;
        this.await = await;
    }
 
    public void run() {
        try {
            countDown.await();//等待主线程执行完毕,获得开始执行信号...
            System.out.println("处于等待的线程开始自己预期工作......");
            await.countDown();//完成预期工作,发出完成信号...
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
执行结果

用于触发处于等待状态的线程开始工作......
用于触发处于等待状态的线程工作完成,等待状态线程开始工作......
处于等待的线程开始自己预期工作......
处于等待的线程开始自己预期工作......
处于等待的线程开始自己预期工作......
处于等待的线程开始自己预期工作......
处于等待的线程开始自己预期工作......
Bingo!
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {
		 
	    public static void main(String[] args) throws InterruptedException {
	        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5));
	         
	        int count = 10;
	        final CountDownLatch latch = new CountDownLatch(count);
	 
	        for (int i = 0; i < count; i++) {
	            threadPool.execute(new MyRunnable1(latch, i));
	        }
	 
	        latch.await();
	        System.err.println("等待线程被唤醒!");
	        threadPool.shutdown();
	    }
	}
	 

class MyRunnable1 implements Runnable {
	 
    CountDownLatch latch = null;
    int i;
 
    public MyRunnable1(CountDownLatch latch, int i) {
        this.latch = latch;
        this.i = i;
    }
 
    @Override
    public void run() {
        System.err.println("线程" + i +"完成了操作...");
        try {
            Thread.currentThread();
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        latch.countDown();
    }
 
}
   
输出结果


线程0完成了操作...
线程2完成了操作...
线程1完成了操作...
线程3完成了操作...
线程4完成了操作...
线程5完成了操作...
线程7完成了操作...
线程9完成了操作...
线程8完成了操作...
线程6完成了操作...
等待线程被唤醒!

 

上一篇:pwnable.kr-flag


下一篇:Installing Ruby 2.2 on Centos7