控制并发流程

aqs6

CountDownLatch

在CountDownLatch出现之前一般都使用线程的join()方法来实现这一点,但是join方法不够灵活,不能够满足不同场景的需要,所以JDK提供了CountDownLatch这个类.
CountDownLatch是一种同步辅助工具,允许一个或多个线程等待,直到其他线程执行的一组操作完成
CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。其它线程调用countDown方法会将计数器减1,当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
注意:这是一种一次性工具,即无法重置计数

主要方法

下面来看些 CountDownLatch 的一些重要方法。
先从 CountDownLatch 的构造方法看起:

public CountDownLatch(int count)

构造方法会传入一个整型数 N,之后调用 CountDownLatch 的countDown方法会对 N 减一,知道 N 减到 0 的时候,当前调用await方法的线程继续执行。
CountDownLatch 的方法不是很多,将它们一个个列举出来:

  1. **await() **throws InterruptedException:调用该方法的线程等到构造方法传入的 N 减到 0 的时候,才能继续往下执行;
  2. await(long timeout, TimeUnit unit):与上面的 await 方法功能一致,只不过这里有了时间限制,调用该方法的线程等到指定的 timeout 时间后,不管 N 是否减至为 0,都会继续往下执行;
  3. countDown():使 CountDownLatch 初始值 N 减 1;
  4. long getCount():获取当前 CountDownLatch 维护的值;

用法一

一个线程等待多个线程都执行完毕,再继续自己的工作。(等待所有子线程执行完毕后,主线程再往下执行)
AQS7

用法示例

/**
 * 描述:     工厂中,质检,5个工人检查,所有人都认为通过,才通过
 */
public class CountDownLatchDemo1 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + "完成了检查。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        System.out.println("等待5个人检查完.....");
        latch.await();
        System.out.println("所有人都完成了工作,进入下一个环节。");
    }
}

用法二

多个线程等待一个线程的信号,同时开始执行。(让所有线程等待,直到latch计数为0后,再同时执行)

用法示例

我们在这里设置了两个门,一个是开始门,一个是结束门。

  • 开始门: 所有运动员处于准备状态,等待教练的枪声。这时候运动员为n个,枪响只需要一声,等待的这一声枪响到了,开始门也就打开了,所有运动员开始跑。
  • 结束门: 教练等待所有运动员,当最后一个运动员也冲破底线,教练才能宣布所有人到达终点,这时候是教练等待n个运动员,直到n为0。
/**
 * 描述:模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。
 */
public class CountDownLatchDemo1And2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);

        CountDownLatch end = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "准备完毕,等待发令枪");
                    try {
                        begin.await();
                        System.out.println("No." + no + "开始跑步了");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + "跑到终点了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }
            };
            service.submit(runnable);
        }
        //裁判员检查发令枪...
        Thread.sleep(5000);
        System.out.println("发令枪响,比赛开始!");
        begin.countDown();

        end.await();
        System.out.println("所有人到达终点,比赛结束");
    }
}

Semaphore信号量

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

Semaphore

Semaphore 有两个构造函数,参数为许可的个数 permits 和是否公平竞争 fair。通过 acquire 方法能够获得的许可个数为 permits,如果超过了这个个数,就需要等待。当一个线程 release 释放了一个许可后,fair 决定了正在等待的线程该由谁获取许可,如果是公平竞争则等待时间最长的线程获取,如果是非公平竞争则随机选择一个线程获取许可。不传 fair 的构造函数默认采用非公开竞争。

Semaphore(int permits)
Semaphore(int permits, boolean fair)

一个线程可以一次获取一个许可,也可以一次获取多个。 在 acquire 等待的过程中,如果线程被中断,acquire 会抛出中断异常,如果希望忽略中断继续等待可以调用 acquireUninterruptibly 方法。同时提供了 tryAcquire 方法尝试获取,获取失败返回 false,获取成功返回 true。tryAcquire 方法可以在获取不到时立即返回,也可以等待一段时间。需要注意的是,没有参数的 tryAcquire 方法在有许可可以获取的情况下,无论有没有线程在等待都能立即获取许可,即便是公平竞争也能立即获取。

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
public void acquire(int permits)
public void acquireUninterruptibly(int permits)
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
public void release(int permits)

除了 acquire方法之外,另一个比较常用的与之对应的方法是tryAcquire方法,该方法如果获取不到许可就立即返回false。

Semaphore 有两种模式,公平模式和非公平模式。

  • 公平模式: 调用acquire的顺序就是获取许可证的顺序,遵循FIFO;
  • 非公平模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

   public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。

使用示例

如下的示例中,测试方法 test 创建了多个线程,每个线程启动后都调用 acquire 方法,然后延时 5s 模仿业务耗时,最后调用 release 方法释放许可。

public class SemaphoreTest {
    private int threadNum;
    private Semaphore semaphore;
    public SemaphoreTest(int permits,int threadNum, boolean fair) {
        this.threadNum = threadNum;
        semaphore = new Semaphore(permits,fair);
    }
    
    private void println(String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("[YYYY-MM-dd HH:mm:ss.SSS] ");
        System.out.println(sdf.format(new Date()) + msg);
    }
    
    public void test(){
        for(int i =  0; i < threadNum; i ++){
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    println(Thread.currentThread().getName() + " acquire");
                    Thread.sleep(5000);//模拟业务耗时
                    println(Thread.currentThread().getName() + " release");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在上述的示例中,如果 fair 传的是 true,则各个线程公平竞争,即按照等待时间的长短决定谁先获取许可。以 9 个线程竞争 3 个许可为例,执行结果如下,首选是线程 0、1、2 获取了许可,5s 后线程 3、4、5 获取了许可,最后是线程 6、7、8 获取许可,顺序基本上与创建线程并启动的先后顺序一致,也与各个线程等待的时间基本相符。

condition 介绍

 Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。

  • Condition是个接口,基本的方法就是await()和signal()方法;
  • Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
  • 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用

  Conditon中的await()对应Object的wait();
  Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
下面是demo:

import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
public class ConTest {  
      
     final Lock lock = new ReentrantLock();  
     final Condition condition = lock.newCondition();  
  
    public static void main(String[] args) {  
        // TODO Auto-generated method stub  
        ConTest test = new ConTest();  
        Producer producer = test.new Producer();  
        Consumer consumer = test.new Consumer();  
                
          
        consumer.start();   
        producer.start();  
    }  
      
     class Consumer extends Thread{  
           
            @Override  
            public void run() {  
                consume();  
            }  
                
            private void consume() {  
                               
                    try {  
                        lock.lock();  
                        System.out.println("我在等一个新信号"+this.currentThread().getName());  
                        condition.await();  
                          
                    } catch (InterruptedException e) {  
                        // TODO Auto-generated catch block  
                        e.printStackTrace();  
                    } finally{  
                        System.out.println("拿到一个信号"+this.currentThread().getName());  
                        lock.unlock();  
                    }  
                  
            }  
        }  
       
     class Producer extends Thread{  
           
            @Override  
            public void run() {  
                produce();  
            }  
                
            private void produce() {                   
                    try {  
                           lock.lock();  
                           System.out.println("我拿到锁"+this.currentThread().getName());  
                            condition.signalAll();                             
                        System.out.println("我发出了一个信号:"+this.currentThread().getName());  
                    } finally{  
                        lock.unlock();  
                    }  
                }  
     }  
          
}

运行结果:
AQS8
Condition的执行方式,是当在线程Consumer中调用await方法后,线程Consumer将释放锁,并且将自己沉睡,等待唤醒,线程Producer获取到锁后,开始做事,完毕后,调用Condition的signalall方法,唤醒线程Consumer,线程Consumer恢复执行。
以上说明Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。

Condition实现生产者、消费者模式:

import java.util.PriorityQueue;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
  
public class ConTest2 {  
        private int queueSize = 10;  
        private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);  
        private Lock lock = new ReentrantLock();  
        private Condition notFull = lock.newCondition();  
        private Condition notEmpty = lock.newCondition();  
           
        public static void main(String[] args) throws InterruptedException  {  
            ConTest2 test = new ConTest2();  
            Producer producer = test.new Producer();  
            Consumer consumer = test.new Consumer();                
            producer.start();  
            consumer.start();  
            Thread.sleep(0);  
            producer.interrupt();  
            consumer.interrupt();  
        }  
            
        class Consumer extends Thread{              
            @Override  
            public void run() {  
                consume();  
            }  
            volatile boolean flag=true;    
            private void consume() {  
                while(flag){  
                    lock.lock();  
                    try {  
                        while(queue.size() == 0){  
                            try {  
                                System.out.println("队列空,等待数据");  
                                notEmpty.await();  
                            } catch (InterruptedException e) {                              
                                flag =false;  
                            }  
                        }  
                        queue.poll();                //每次移走队首元素  
                        notFull.signal();  
                        System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");  
                    } finally{  
                        lock.unlock();  
                    }  
                }  
            }  
        }  
            
        class Producer extends Thread{              
            @Override  
            public void run() {  
                produce();  
            }  
            volatile boolean flag=true;    
            private void produce() {  
                while(flag){  
                    lock.lock();  
                    try {  
                        while(queue.size() == queueSize){  
                            try {  
                                System.out.println("队列满,等待有空余空间");  
                                notFull.await();  
                            } catch (InterruptedException e) {  
                                  
                                flag =false;  
                            }  
                        }  
                        queue.offer(1);        //每次插入一个元素  
                        notEmpty.signal();  
                        System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));  
                    } finally{  
                        lock.unlock();  
                    }  
                }  
            }  
        }  
    }

运行结果如下:
AQS10

循环栅栏:CyclicBarrier

CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。
CyclicBarrier好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。
构造方法

  1. 默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达屏障位置,线程被阻塞。
  2. 另外一个构造方法CyclicBarrier(int parties, Runnable barrierAction),其中barrierAction任务会在所有线程到达屏障后执行。

CyclicBarrier 的主要方法:

//等到所有的线程都到达指定的临界点
await() throws InterruptedException, BrokenBarrierException
//与上面的await方法功能基本一致,只不过这里有超时限制,阻塞等待直至到达超时时间为止
await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException
//获取当前有多少个线程阻塞等待在临界点上
int getNumberWaiting()
//用于查询阻塞等待的线程是否被中断
boolean isBroken()
//将屏障重置为初始状态。如果当前有线程正在临界点等待的话,将抛出BrokenBarrierException。
void reset()

CyclicBarrier 的执行示意图如下:

代码展示

package sychronized;

import java.util.Random;
import java.util.concurrent.*;
import static net.mindview.util.Print.*;

class WriteTask implements Runnable{
    private static int count = 0;
    private final int id = count++;
    private CyclicBarrier barrier ;
    private static Random random = new Random(47);
    public WriteTask(CyclicBarrier cyclicBarrier) {
        this.barrier = cyclicBarrier;
    }
    
    @Override
    public void run() {
        print(this+"开始写入数据...");
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(5000));      //以睡眠来模拟写入数据操作
            print(this+"写入数据完毕,等待其他线程写入完毕"+" "+System.currentTimeMillis());
            barrier.await();
        } catch (InterruptedException e) {
            print(this + "is interrupted!");
        }catch(BrokenBarrierException e){
           throw new RuntimeException(e);
        }
        print("所有任务写入完毕,继续处理其他任务... "+System.currentTimeMillis());
    }

    @Override
    public String toString() {
        return getClass().getSimpleName()+"-"+id;
    }
}

public class CyclicBarrierTest {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < N; ++i){
            exec.execute(new WriteTask(barrier));
        }
        exec.shutdown();
    }
}

#输出结果:
WriteTask-3 开始写入数据...
WriteTask-2 开始写入数据...
WriteTask-1 开始写入数据...
WriteTask-0 开始写入数据...
WriteTask-2 写入数据完毕,等待其他线程写入完毕 1512048648904
WriteTask-1 写入数据完毕,等待其他线程写入完毕 1512048650042
WriteTask-0 写入数据完毕,等待其他线程写入完毕 1512048650209
WriteTask-3 写入数据完毕,等待其他线程写入完毕 1512048652606
所有任务写入完毕,继续处理其他任务... 1512048652607
所有任务写入完毕,继续处理其他任务... 1512048652607
所有任务写入完毕,继续处理其他任务... 1512048652607
所有任务写入完毕,继续处理其他任务... 1512048652607

实现原理

本文代码源于JDK1.8
CyclicBarrier实现主要基于ReentrantLock。

public class CyclicBarrier {
    private static class Generation {
        boolean broken = false;
    }
    
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    
    /** The number of parties */
    private final int parties;
    
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    
    /** The current generation */
    private Generation generation = new Generation();
    ...省略后面代码
}

其中Generation用来控制屏障的循环使用,如果generation.broken为true的话,说明这个屏障已经损坏,当某个线程await的时候,直接抛出异常
await实现

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
  1. 每当线程执行await,内部变量count减1,如果count!= 0,说明有线程还未到屏障处,则在锁条件变量trip上等待。
  2. 当count == 0时,说明所有线程都已经到屏障处,执行条件变量的signalAll方法唤醒等待的线程。
    其中 nextGeneration方法可以实现屏障的循环使用:
  • 重新生成Generation对象
  • 恢复count值

总结

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

  • CountDownLatch 一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
  • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
  • CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的。

Semaphore 其实和锁有点类似,它一般用于控制对 某组 资源的访问权限,而锁是控制对 某个 资源的访问权限。

好文推荐:[CountDownLatch、CyclicBarrier、Semaphore 用法总结

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议