欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

【并发编程】基于数组结构实现的一个有界阻塞队列ArrayBlockingQueue

时间:2023-08-10
ArrayBlockingQueue是什么

ArrayBlockingQueue是最典型的有界阻塞队列。内部使用数组存储元素!初始化时需要指定容量大小。利用 ReentrantLock 实现线程安全! ArrayBlockingQueue的适用场景

在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,可以使用ArrayBlockingQueue。当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。 ArrayBlockingQueue的实现原理

使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。 ArrayBlockingQueue的特点

有界队列!先进先出!存取互相排斥!使用的数据结构是静态数组:容量固定,没有扩容机制;没有元素的位置也占用空间,被 null 占位;使用ReentrantLock锁:存取是同一把锁,操作的是同一个数组对象,存取互相排斥。 ArrayBlockingQueue的入队出队操作

两个指针都是从队首向队尾移动,保证队列的先进先出原则!入队阻塞对象notFull:队列count=length,放不进去元素时,阻塞在该对象上。出队阻塞对象notEmpty:队列count=0,无元素可取时,阻塞在该对象上。入队操作:从队首开始添加元素,记录putIndex(到队尾时设置为0),唤醒notEmpty。出队操作:从队首开始取出元素,记录takeIndex(到队尾时设置为0),唤醒notFull。 ArrayBlockingQueue的使用方式

// 定义同步队列BlockingQueue blockingQueue = new ArrayBlockingQueue(1000);// 放入元素System.out.println(blockingQueue.add(9));blockingQueue.put(10);// 取出元素System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());

ArrayBlockingQueue的数据结构源码分析

// 数据元素数组final Object[] items;// 下一个待取出元素索引int takeIndex;// 下一个待添加元素索引int putIndex;// 元素个数int count;// 内部使用的锁final ReentrantLock lock;// 消费者条件队列private final Condition notEmpty;// 生产者条件队列private final Condition notFull;

ArrayBlockingQueue的构造方法源码分析

public ArrayBlockingQueue(int capacity) { this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) { // 传入数组的长度小于0,直接抛出异常 if (capacity <= 0) throw new IllegalArgumentException(); // 初始化数组 this.items = new Object[capacity]; // 初始化锁 lock = new ReentrantLock(fair); // 初始化消费者的条件队列 notEmpty = lock.newCondition(); // 初始化生产者的条件队列 notFull = lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { // 调用俩个参数的构造方法 this(capacity, fair); // 得到当前队列的lock锁 final ReentrantLock lock = this.lock; // 加锁操作:这里加锁是防止由于指令重排序导致的可见性问题。 lock.lock(); // Lock only for visibility, not mutual exclusion try { // 定义一个数组元素的临时角标 int i = 0; try { // 循环每个元素,放入到ArrayBlockingQueue的数组中 for (E e : c) { // 元素为NULL抛出空指针异常 checkNotNull(e); // 将元素放入ArrayBlockingQueue的数组中 items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { // 传入数组长度大于给定的capacity的长度,会抛出异常 throw new IllegalArgumentException(); } // 赋值元素的数量到count上 count = i; // 数组中元素满了,插入的计数器从0开始 putIndex = (i == capacity) ? 0 : i; } finally { // 释放锁 lock.unlock(); }}

ArrayBlockingQueue的入队方法:put(E e) 源码分析

public void put(E e) throws InterruptedException { // 如果元素是NULL,抛出异常 checkNotNull(e); // 得到当前队列的lock锁 final ReentrantLock lock = this.lock; // 尝试去获取锁 lock.lockInterruptibly(); try { // 数量满了的时候,生产者队列等待 while (count == items.length) notFull.await(); // 入队 enqueue(e); } finally { // 唤醒消费者线程 lock.unlock(); }}private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; // 获取到当前的元素数组 final Object[] items = this.items; // 在该添加的位置放入当前的元素 items[putIndex] = x; // 数组中元素满了,插入的计数器从0开始。 // 这里进行了一次加一操作,将putIndex指向下个要插入的位置 if (++putIndex == items.length) putIndex = 0; // 元素的数量加一 count++; // 准备唤醒消费者条件队列 notEmpty.signal();}public void lockInterruptibly() throws InterruptedException { // 直接调用AQS的acquireInterruptibly方法 sync.acquireInterruptibly(1);}public final void acquireInterruptibly(int arg) throws InterruptedException { // 如果线程被中断了,抛异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取锁。tryAcquire在AQS中,与ReentrantLock的实现方式一致 if (!tryAcquire(arg)) // 循环的获取锁,优先考虑中断 doAcquireInterruptibly(arg);}private void doAcquireInterruptibly(int arg) throws InterruptedException { // 得到当前的节点 final Node node = addWaiter(Node.EXCLUSIVE); // 定义失败标志位true boolean failed = true; try { for (;;) { // 得到当前节点的上一个(前置)节点,前置节点为null,会抛出空指针异常 final Node p = node.predecessor(); // 如果前置节点是头结点,并且尝试获取锁成功 if (p == head && tryAcquire(arg)) { // 把当前的节点设置为头结点 setHead(node); // 去掉前驱节点的指向,方便GC去回收线程 p.next = null; // help GC // 变更失败标志位false failed = false; // 跳出循环 return; } // 代码执行到这里,说明尝试获取锁,但是获取锁失败了。 // 阻塞前的准备工作操作成功(状态是-1的时候成功) // 将线程阻塞,等待他去唤醒。唤醒后返回线程的中断状态! // 这里的代码在AQS中,与ReentrantLock的实现方式一致 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { // 上面代码抛出异常的时候,会执行这里的逻辑 if (failed) // 取消获取锁的逻辑。cancelAcquire在AQS中,与ReentrantLock的实现方式一致 cancelAcquire(node); }}public final void signal() { // 不是当前线程,直接抛出异常! if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取条件队列的头结点 Node first = firstWaiter; // 条件队列的头结点不是null,尝试去唤醒头结点 if (first != null) doSignal(first);}private void doSignal(Node first) { do { // 如果当前节点的下一个节点是null。说明唤醒这个就没有其他节点了。 if ( (firstWaiter = first.nextWaiter) == null) // 无节点的时候设置尾结点为null lastWaiter = null; // 将当前节点的指向情况,方便GC去回收 first.nextWaiter = null; // 这里的循环条件为条件队列转同步队列,transferForSignal在AQS中,与CyclicBarrier的实现方式一致 // 转同步队列失败并且节点存在会一直循环 // 转同步队列成功或者条件队列中没有节点,跳出循环! } while (!transferForSignal(first) && (first = firstWaiter) != null);}

ArrayBlockingQueue的出队方法:take() 源码分析

public E take() throws InterruptedException { // 得到当前队列的lock锁 final ReentrantLock lock = this.lock; // 尝试去获取锁 lock.lockInterruptibly(); try { // 队列中无数据的时候,消费者队列等待 while (count == 0) // 消费者队列等待 notEmpty.await(); // 返回出队的结果 return dequeue(); } finally { // 唤醒生产者线程 lock.unlock(); }}private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取到当前要出队的元素 E x = (E) items[takeIndex]; // 将要出队位置的元素变为null方便GC去回收 items[takeIndex] = null; // 出队到最后一个,下一个出队的计数器从0开始。 // 这里进行了一次加一操作,将takeIndex指向下个要出队的位置 if (++takeIndex == items.length) takeIndex = 0; // 元素总数量减一 count--; // 迭代器不为空的时候 if (itrs != null) // 这里的逻辑主要是头结点为空的时候,清空所有迭代器! // 迭代器需要去重写:iterator()定义 itrs.elementDequeued(); notFull.signal(); // 返回当前要出队的元素 return x;}

结束语

获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!关注公众号,可以让你对MySQL有非常深入的了解关注公众号,每天持续高效的了解并发编程!关注公众号,后续持续高效的了解spring源码!这个公众号,无广告!!!每日更新!!!

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。