Smaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。
创建Smaphore时,给定一个数量参数,表示同一时刻许可线程的数量
//1、构造方法public Semaphore(int permits)public Semaphore(int permits, boolean fair) //2、主要方法public void acquire() throws InterruptedExceptionpublic void release()tryAcquire(int args,long timeout, TimeUnit unit) //acquire() 表示阻塞并获取许可 可以携带参数,表示一个线程占用几个资源//release() 表示释放许可//tryAcquire尝试获取,如果获取不到,最长等待timeout时间,返回值是一个boolean,可以根据这个boolean的值来进行两类业务逻辑 //如果有6份资源,每个线程执行时需要2份资源,那么同一时刻只有3个线程可以执行 Semaphore semaphore = new Semaphore(6); semaphore.acquire(2);
2.使用场景资源访问,服务限流(Hystrix里限流就有基于信号量方式)。主要用于限制当前业务有多少线程能够执行
public class SemaphoreRunner { public static void main(String[] args) { Semaphore semaphore = new Semaphore(2); for (int i=0;i<5;i++){ new Thread(new Task(semaphore,"yangguo+"+i)).start(); } } static class Task extends Thread{ Semaphore semaphore; public Task(Semaphore semaphore,String tname){ this.semaphore = semaphore; this.setName(tname); } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis()); Thread.sleep(1000); semaphore.release(); System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } }}
2.Semaphore源码分析这里我采用和ReentrantLock独占对比的方式表述,Semaphore使用共享的方式和独占有很多相似点都是使用AQS框架中的CLH队列+Node结点+state资源数量作为可行,我之前有在ReentrantLock写过独占的
//设置资源 Sync(int permits) { setState(permits); }protected final void setState(int newState) { state = newState; }//通过ASQ框架中的state变量值来设置
1.尝试获取资源尝试获取资源,如果剩余资源 - 当前线程想要获取的资源>0,表示还有资源,可以尝试获取 尝试CAS原子性获取,如果获取不到就死循环获取,只要资源够,就说明可以获取,没获取到说明被别的线程抢了,但是还够,就接着获取 如果资源不够,或者直接获取到了就直接返回,返回值是资源的剩余数量 //执行tryAcquireShared
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); //剩余资源-想要的资源 int remaining = available - acquires; //如果相减完>0就说明当前线程还能有执行权 if (remaining < 0 || //CAS操作资源数,也就是信号量 compareAndSetState(available, remaining)) return remaining; } }//这里尝试获取锁和独占模式不一样在于.独占资源只有一份,被别人抢到就没了,应该进队列,而共享资源如果没抢到可能是因为抢一块去了,但是资源够两个人的.所以才是死循环抢资源,直到资源真的不够
2.对尝试获取资源的结果处理加锁1、如果资源不够 执行doAcquireSharedInterruptibly 执行入队,addWaiter进入CLH队列,并且头部再尝试获取 //doAcquireShared和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)功能大致是相似的
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //并不是尝试获取资源失败就直接阻塞,还要再次进行尝试获取资源 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //真的获取不到就在这里阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }//这里和独占模式就很类似了,都是先入队列,队列头不直接阻塞,会尝试获取资源//区别在于如果获取到资源会进行setHeadAndPropagate(node, r),传播的一个操作
3.解锁和传播手动解锁以后,激活CLH队列中的继续抢锁,因为是共享的,一定会涉及到传播问题
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //在这里进行解锁 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //这里还要判断h还是不是头 if (h == head) // loop if head changed break; } }//这里比较复杂,因为同一时间有多个线程执行,T1可能去CLH中唤醒,T2也有可能同时去唤醒,一旦同时别唤醒,那么指针可能就会后移两位,所以加了大量if (h == head) break;这种判断
不管是头结点不直接阻塞尝试获取,获取到了锁,还是被手动释放资源,同时后续结点去获取锁,只要是抢锁的过程都是下面这个逻辑记住独占模式和共享模式的区别就是,独占模式是当前节点获取到锁后,不会释放队列中的所有的节点一块争夺锁,而是按照队列中排好的顺序一个个的释放。而共享模式会在设置为头节点后,把队列中的所有节点释放出来只要唤醒就会传播大家一起唤醒抢,但是又是个死循环,抢不到又阻塞,通过0,-1,3这三个状态来处理传递抢
//setHeadAndPropagate这个函数只要执行就会传递抢,并且这个函数只有在有资源时才会执行,这是一个设置状态的方法 //release()方法其实只唤醒头结点,但是头结点一被唤醒就会触发传递抢private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //并不是尝试获取资源失败就直接阻塞,还要再次进行尝试获取资源 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //真的获取不到就在这里阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }
3..CountDownLatch 1.如何使用CountDownLatch作为门闩锁,初始时设置门闩的数量,门闩的数量可以往下减,减到0就表示可以继续执行
每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
创建CountDownLatch时,给定一个数量参数,表示门闩数
//1、构造方法public CountDownLatch(int count)//2、主要方法public void await()public boolean await(long timeout, TimeUnit unit)public void countDown()//await表示进行加门闩,我们线程执行时,一定要确定在哪个线程上加门闩才行,只要门闩加在线程上才有意义//await(long timeout, TimeUnit unit)表示有时间约束的门闩,如果在这个之间只能门闩没有被减到0,那么门将自动打开,不再阻塞//countDown表示门闩减一个 //使用CountDownLatch latch = new CountDownLatch(2); latch.await(); latch.countDown();
2.使用场景Zookeeper分布式锁,Jmeter模拟高并发等
CountDownLatch应用场景例子
比如陪媳妇去看病。医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。现在我们是双核,可以同时做这两个事(多线程)。假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)。
public class SeeDoctorTask implements Runnable { private CountDownLatch countDownLatch; public SeeDoctorTask(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("开始看医生"); Thread.sleep(3000); System.out.println("看医生结束,准备离开病房"); } catch (InterruptedException e) { e.printStackTrace(); }finally { if (countDownLatch != null) countDownLatch.countDown(); } }}public class QueueTask implements Runnable { private CountDownLatch countDownLatch; public QueueTask(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("开始在医院药房排队买药...."); Thread.sleep(5000); System.out.println("排队成功,可以开始缴费买药"); } catch (InterruptedException e) { e.printStackTrace(); }finally { if (countDownLatch != null) countDownLatch.countDown(); } }}public class CountDownLaunchRunner { public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new SeeDoctorTask(countDownLatch)).start(); new Thread(new QueueTask(countDownLatch)).start(); //等待线程池中的2个任务执行完毕,否则一直 countDownLatch.await(); System.out.println("over,回家 cost:"+(System.currentTimeMillis()-now)); }}
4.CountDownLatch源码分析 1.尝试获取资源如果当前的门闩剩余数为0,就表示获取资源成功
if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }//这里相比于Semaphore就变得非常简单,不用什么资源相减的判断了,就是一个门闩数是不是为0
2.对尝试获取资源的结果处理加锁3.解锁和传播如果资源不够 执行doAcquireSharedInterruptibly 执行入队,addWaiter进入CLH队列,并且头部再尝试获取 //这里和Semaphore共用一个方法, //在1中尝试获取资源的返回值-1和1对于这个方法是完全契合可以复用的
解锁和传播和Semaphore的逻辑也是一样的
if (tryReleaseShared(arg)) { doReleaseShared(); return true; }//无非对资源的判断逻辑不同,Semaphore是资源为>=0才唤醒CLH等待的//CountDownLatch是资源为0,才唤醒CLH等待的protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }