欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Semaphore,CountDownLatch使用和浅谈源码

时间:2023-08-12
1.Semaphore 1.如何使用

Smaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。

创建Smaphore时,给定一个数量参数,表示同一时刻许可线程的数量

//1、构造方法public Semaphore(int permits)public Semaphore(int permits, boolean fair)    //2、主要方法public void acquire() throws InterruptedExceptionpublic void release()tryAcquire(int args,long timeout, TimeUnit unit)    //acquire() 表示阻塞并获取许可   可以携带参数,表示一个线程占用几个资源//release() 表示释放许可//tryAcquire尝试获取,如果获取不到,最长等待timeout时间,返回值是一个boolean,可以根据这个boolean的值来进行两类业务逻辑    //如果有6份资源,每个线程执行时需要2份资源,那么同一时刻只有3个线程可以执行    Semaphore semaphore = new Semaphore(6); semaphore.acquire(2);

2.使用场景

资源访问,服务限流(Hystrix里限流就有基于信号量方式)。主要用于限制当前业务有多少线程能够执行

public class SemaphoreRunner {    public static void main(String[] args) {        Semaphore semaphore = new Semaphore(2);        for (int i=0;i<5;i++){            new Thread(new Task(semaphore,"yangguo+"+i)).start();       }   }​    static class Task extends Thread{        Semaphore semaphore;​        public Task(Semaphore semaphore,String tname){            this.semaphore = semaphore;            this.setName(tname);       }​        public void run() {            try {                semaphore.acquire();                              System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());                Thread.sleep(1000);                semaphore.release();                              System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());           } catch (InterruptedException e) {                e.printStackTrace();           }​       }   }}

2.Semaphore源码分析

这里我采用和ReentrantLock独占对比的方式表述,Semaphore使用共享的方式和独占有很多相似点都是使用AQS框架中的CLH队列+Node结点+state资源数量作为可行,我之前有在ReentrantLock写过独占的

//设置资源 Sync(int permits) { setState(permits); }protected final void setState(int newState) { state = newState; }//通过ASQ框架中的state变量值来设置

1.尝试获取资源

尝试获取资源,如果剩余资源 - 当前线程想要获取的资源>0,表示还有资源,可以尝试获取    尝试CAS原子性获取,如果获取不到就死循环获取,只要资源够,就说明可以获取,没获取到说明被别的线程抢了,但是还够,就接着获取    如果资源不够,或者直接获取到了就直接返回,返回值是资源的剩余数量 //执行tryAcquireShared

protected int tryAcquireShared(int acquires) {            for (;;) {                if (hasQueuedPredecessors())                    return -1;                int available = getState();                //剩余资源-想要的资源                int remaining = available - acquires;                //如果相减完>0就说明当前线程还能有执行权                if (remaining < 0 ||                    //CAS操作资源数,也就是信号量                    compareAndSetState(available, remaining))                    return remaining;           }       }//这里尝试获取锁和独占模式不一样在于.独占资源只有一份,被别人抢到就没了,应该进队列,而共享资源如果没抢到可能是因为抢一块去了,但是资源够两个人的.所以才是死循环抢资源,直到资源真的不够

2.对尝试获取资源的结果处理加锁

1、如果资源不够 执行doAcquireSharedInterruptibly    执行入队,addWaiter进入CLH队列,并且头部再尝试获取        //doAcquireShared和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)功能大致是相似的

private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {   //入队        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            //并不是尝试获取资源失败就直接阻塞,还要再次进行尝试获取资源            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                   }               }                //真的获取不到就在这里阻塞                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();           }       } finally {            if (failed)                cancelAcquire(node);       }//这里和独占模式就很类似了,都是先入队列,队列头不直接阻塞,会尝试获取资源//区别在于如果获取到资源会进行setHeadAndPropagate(node, r),传播的一个操作

3.解锁和传播

手动解锁以后,激活CLH队列中的继续抢锁,因为是共享的,一定会涉及到传播问题

   private void doReleaseShared() {        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    //在这里进行解锁                    unparkSuccessor(h);               }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS           }            //这里还要判断h还是不是头            if (h == head)                   // loop if head changed                break;       }   }//这里比较复杂,因为同一时间有多个线程执行,T1可能去CLH中唤醒,T2也有可能同时去唤醒,一旦同时别唤醒,那么指针可能就会后移两位,所以加了大量if (h == head) break;这种判断

不管是头结点不直接阻塞尝试获取,获取到了锁,还是被手动释放资源,同时后续结点去获取锁,只要是抢锁的过程都是下面这个逻辑​记住独占模式和共享模式的区别就是,独占模式是当前节点获取到锁后,不会释放队列中的所有的节点一块争夺锁,而是按照队列中排好的顺序一个个的释放。而共享模式会在设置为头节点后,把队列中的所有节点释放出来​只要唤醒就会传播大家一起唤醒抢,但是又是个死循环,抢不到又阻塞,通过0,-1,3这三个状态来处理传递抢

//setHeadAndPropagate这个函数只要执行就会传递抢,并且这个函数只有在有资源时才会执行,这是一个设置状态的方法 //release()方法其实只唤醒头结点,但是头结点一被唤醒就会触发传递抢​​​​​​​private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {   //入队        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            //并不是尝试获取资源失败就直接阻塞,还要再次进行尝试获取资源            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                   }               }                //真的获取不到就在这里阻塞                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();           }       } finally {            if (failed)                cancelAcquire(node);       }

 3..CountDownLatch 1.如何使用

CountDownLatch作为门闩锁,初始时设置门闩的数量,门闩的数量可以往下减,减到0就表示可以继续执行

每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

创建CountDownLatch时,给定一个数量参数,表示门闩数

//1、构造方法public CountDownLatch(int count)​//2、主要方法public void await()public boolean await(long timeout, TimeUnit unit)public void countDown()//await表示进行加门闩,我们线程执行时,一定要确定在哪个线程上加门闩才行,只要门闩加在线程上才有意义//await(long timeout, TimeUnit unit)表示有时间约束的门闩,如果在这个之间只能门闩没有被减到0,那么门将自动打开,不再阻塞//countDown表示门闩减一个    //使用CountDownLatch latch = new CountDownLatch(2);        latch.await();        latch.countDown();

2.使用场景

Zookeeper分布式锁,Jmeter模拟高并发等

CountDownLatch应用场景例子

比如陪媳妇去看病。医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。现在我们是双核,可以同时做这两个事(多线程)。假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)。

public class SeeDoctorTask implements Runnable {    private CountDownLatch countDownLatch;​    public SeeDoctorTask(CountDownLatch countDownLatch){        this.countDownLatch = countDownLatch;   }​    public void run() {        try {            System.out.println("开始看医生");            Thread.sleep(3000);            System.out.println("看医生结束,准备离开病房");       } catch (InterruptedException e) {            e.printStackTrace();       }finally {            if (countDownLatch != null)                countDownLatch.countDown();       }   }​}​public class QueueTask implements Runnable {​    private CountDownLatch countDownLatch;​    public QueueTask(CountDownLatch countDownLatch){        this.countDownLatch = countDownLatch;   }    public void run() {        try {            System.out.println("开始在医院药房排队买药....");            Thread.sleep(5000);            System.out.println("排队成功,可以开始缴费买药");       } catch (InterruptedException e) {            e.printStackTrace();       }finally {            if (countDownLatch != null)                countDownLatch.countDown();       }   }}​public class CountDownLaunchRunner {​    public static void main(String[] args) throws InterruptedException {        long now = System.currentTimeMillis();        CountDownLatch countDownLatch = new CountDownLatch(2);​        new Thread(new SeeDoctorTask(countDownLatch)).start();        new Thread(new QueueTask(countDownLatch)).start();        //等待线程池中的2个任务执行完毕,否则一直        countDownLatch.await();        System.out.println("over,回家 cost:"+(System.currentTimeMillis()-now));   }}

4.CountDownLatch源码分析 1.尝试获取资源

如果当前的门闩剩余数为0,就表示获取资源成功

if (tryAcquireShared(arg) < 0)            doAcquireSharedInterruptibly(arg);​        protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;       }//这里相比于Semaphore就变得非常简单,不用什么资源相减的判断了,就是一个门闩数是不是为0

2.对尝试获取资源的结果处理加锁

如果资源不够 执行doAcquireSharedInterruptibly    执行入队,addWaiter进入CLH队列,并且头部再尝试获取        //这里和Semaphore共用一个方法,    //在1中尝试获取资源的返回值-1和1对于这个方法是完全契合可以复用的

3.解锁和传播

解锁和传播和Semaphore的逻辑也是一样的

if (tryReleaseShared(arg)) {            doReleaseShared();            return true;       }​//无非对资源的判断逻辑不同,Semaphore是资源为>=0才唤醒CLH等待的//CountDownLatch是资源为0,才唤醒CLH等待的protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            for (;;) {                int c = getState();                if (c == 0)                    return false;                int nextc = c-1;                if (compareAndSetState(c, nextc))                    return nextc == 0;           }       }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。