|
【Java 并发笔记】CountDownLatch 相关整理
文前说明
作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。
本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。
1. 简介
CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。
CountDownLatch 在 JDK 1.5 被引入。
存在于 java.util.concurrent 包下。
例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。
2. CountDownLatch 的原理
CountDownLatch 是通过一个计数器来实现的,计数器的初始值为线程的数量。
每当一个线程完成了自己的任务后,计数器的值就会减 1。
当计数器值到达 0 时,表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
CountDownLatch 原理
CountDownLatch 的伪代码//Main thread start
//Create CountDownLatch for N threads
//Create and start N threads
//Main thread wait on latch
//N threads completes there tasks are returns
//Main thread resume execution |
构造函数
//Constructs a CountDownLatch initialized with the given count.
public CountDownLatch(int count) {...} |
这个计数(count)本质上是闭锁需要等待的线程数。
此值只能设置一次,并且 CountDownLatch 不提供任何其他机制来重置此计数。
第一次与 CountDownLatch 的交互是与等待其他线程的主线程进行的。
此主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务为止。
其他 N 个线程必须引用闭锁对象,因为它们如果完成了任务需要通知 CountDownLatch 对象。
此通知是通过 CountDownLatch.countDown() 方法完成的,每次调用计数减少 1。
当所有 N 个线程都调用了这个方法时,计数将达到 0,主线程可以在 await() 方法之后继续执行。
CountDownLatch 和 ReentrantLock 一样,内部使用 Sync 继承 AQS。
构造函数将计数值(count)传递给 Sync,并且设置了 state。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
......
Sync(int count) {
setState(count);
} |
阻塞线程
await 方法
直接调用了 AQS 的 acquireSharedInterruptibly()。
首先判断是否被中断,中断就抛出异常。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
......
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
} |
tryAcquireShared 方法
首先尝试获取共享锁,实现方式和独占锁类似,由 CountDownLatch 实现判断逻辑。
state 状态变量,state 的值代表着待达到条件的线程数,比如初始化为 5,表示待达到条件的线程数为 5,每次调用 countDown() 函数都会减 1。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
} |
返回 1 代表获取成功,返回 -1 代表获取失败。如果获取失败,需要调用 doAcquireSharedInterruptibly()。
释放操作
countDown 操作实际就是释放锁的操作,每调用一次,计数值减少 1。
public void countDown() {
sync.releaseShared(1);
} |
同样是首先尝试释放锁,具体实现在 CountDownLatch 中。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
......
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
} |
死循环加上 CAS 的方式保证 state 的减 1 操作,当计数值等于 0,代表所有子线程都执行完毕,被 await() 阻塞的线程接着调用 doReleaseShared() 唤醒。
限定时间的 await
CountDownLatch 的 await 方法还有个限定阻塞时间的版本。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
} |
最后调用 doAcquireSharedNanos() 方法,不同之处只是加了时间的处理。
3. CountDownLatch 的使用场景
实现最大的并行性
想同时启动多个线程,实现最大程度的并行性。
例如,想测试一个单例类。如果创建一个初始计数器为 1 的 CountDownLatch,并让其他所有线程都在这个锁上等待,只需要调用一次 countDown() 方法就可以让其他所有等待的线程同时恢复执行。
开始执行前等待 N 个线程完成各自任务
例如应用程序启动类要确保在处理用户请求前,所有 N 个外部系统都已经启动和运行了。
死锁检测
用 N 个线程去访问共享资源,在每个测试阶段线程数量不同,并尝试产生死锁。
4. CountDownLatch 的使用案例
模拟一个应用程序启动类,开始就启动 N 个线程,去检查 N 个外部服务是否正常并通知闭锁。
启动类一直在闭锁上等待,一旦验证和检查了所有外部服务,就恢复启动类执行。
BaseHealthChecker
实现 Runnable 接口,负责所有特定的外部服务健康检查的基类。
import java.util.concurrent.CountDownLatch;
public abstract class BaseHealthChecker implements Runnable {
private CountDownLatch _latch;
private String _serviceName;
private boolean _serviceUp;
public BaseHealthChecker(String serviceName, CountDownLatch latch)
{
super();
this._latch = latch;
this._serviceName = serviceName;
this._serviceUp = false;
}
@Override
public void run() {
try {
verifyService();
_serviceUp = true;
} catch (Throwable t) {
t.printStackTrace(System.err);
_serviceUp = false;
} finally {
if(_latch != null) {
_latch.countDown();
}
}
}
public String getServiceName() {
return _serviceName;
}
public boolean isServiceUp() {
return _serviceUp;
}
public abstract void verifyService();
} |
以下三个类都继承自 BaseHealthChecker,引用 CountDownLatch 实例,除了服务名和休眠时间不同外,都实现各自的 verifyService() 方法。
NetworkHealthChecker
import java.util.concurrent.CountDownLatch;
public class NetworkHealthChecker extends BaseHealthChecker
{
public NetworkHealthChecker (CountDownLatch latch)
{
super("Network Service", latch);
}
@Override
public void verifyService()
{
System.out.println("Checking " + this.getServiceName());
try
{
Thread.sleep(7000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
} |
DatabaseHealthChecker
import java.util.concurrent.CountDownLatch;
public class DatabaseHealthChecker extends BaseHealthChecker
{
public DatabaseHealthChecker (CountDownLatch latch)
{
super("Database Service", latch);
}
@Override
public void verifyService()
{
System.out.println("Checking " + this.getServiceName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
} |
CacheHealthChecker
import java.util.concurrent.CountDownLatch;
public class CacheHealthChecker extends BaseHealthChecker
{
public CacheHealthChecker (CountDownLatch latch)
{
super("Cache Service", latch);
}
@Override
public void verifyService()
{
System.out.println("Checking " + this.getServiceName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
} |
ApplicationStartupUtil
一个主启动类,负责初始化闭锁,然后等待所有服务都被检查完成,再恢复执行。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class ApplicationStartupUtil
{
private static List<BaseHealthChecker> _services;
private static CountDownLatch _latch;
private ApplicationStartupUtil()
{
}
private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
public static ApplicationStartupUtil getInstance()
{
return INSTANCE;
}
public static boolean checkExternalServices() throws Exception
{
_latch = new CountDownLatch(3);
_services = new ArrayList<BaseHealthChecker>();
_services.add(new NetworkHealthChecker(_latch));
_services.add(new CacheHealthChecker(_latch));
_services.add(new DatabaseHealthChecker(_latch));
Executor executor = Executors.newFixedThreadPool(_services.size());
for(final BaseHealthChecker v : _services)
{
executor.execute(v);
}
_latch.await();
for(final BaseHealthChecker v : _services)
{
if( ! v.isServiceUp())
{
return false;
}
}
return true;
}
} |
测试代码
public class Main {
public static void main(String[] args)
{
boolean result = false;
try {
result = ApplicationStartupUtil.checkExternalServices();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("External services validation completed !! Result was :: "+ result);
}
}
/** --- print ---
Checking Network Service
Checking Cache Service
Checking Database Service
Database Service is UP
Cache Service is UP
Network Service is UP
External services validation completed !! Result was :: true
**/ |
5. 总结
CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,不能再次被使用。
Thread 的 join() 方法可以实现相同的功能,但是当使用了线程池时,则 join() 方法便无法实现,CountDownLatch 依然可以实现功能。
package concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.*;
public class CountDownLatchDemo {
private final static CountDownLatch cdl=new CountDownLatch(3);
private final static Vector v=new Vector();
private final static ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());//使用线程池
private static class WriteThread extends Thread{
private final String writeThreadName;
private final int stopTime;
private final String str;
public WriteThread(String name,int time,String str)
{
this.writeThreadName=name;
this.stopTime=time;
this.str=str;
}
public void run()
{
System.out.println(writeThreadName+"开始写入工作");
try
{
Thread.sleep(stopTime);
}
catch(InterruptedException e)
{
e.printStackTrace();
}
cdl.countDown();
v.add(str);
System.out.println(writeThreadName+"写入内容为:"+str+"。写入工作结束!");
}
}
private static class ReadThread extends Thread{
public void run()
{
System.out.println("读操作之前必须先进行写操作");
try
{
cdl.await();//该线程进行等待,直到countDown减到0,然后逐个苏醒过来。
//Thread.sleep(3000);
}
catch(InterruptedException e)
{
e.printStackTrace();
}
for(int i=0;i<v.size();i++)
{
System.out.println("读取第"+(i+1)+"条记录内容为:"+v.get(i));
}
System.out.println("读操作结束!");
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
Thread read=new ReadThread();
threadPool.execute(read);
String[] str= {"多线程知识点","多线程CountDownLatch的知识点","多线程中控制顺序可以使用CountDownLatch"};
for(int i=0;i<3;i++)
{
Thread t1= new WriteThread("writeThread"+(i+1),1000*(i+1),str[i]);
threadPool.execute(t1);
}
//new WriteThread("writeThread1",1000,"多线程知识点").start();
//new WriteThread("writeThread2",2000,"多线程CountDownLatch的知识点").start();
//new WriteThread("writeThread3",3000,"多线程中控制顺序可以使用CountDownLatch").start();
}
} |
CountDownLatch 类主要使用的场景有明显的顺序要求。
比如只有等跑完步才能计算排名,只有等所有记录都写入才能进行统计工作等等,因此 CountDownLatch 完善的是某种逻辑上的功能,使得线程按照正确的逻辑进行。
程序猿的技术大观园:www.javathinker.net
|
|