欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

【并发编程】阻塞队列BlockingQueue入门

时间:2023-08-10
BlockingQueue是什么

BlockingQueue 继承了 Queue 接口,是队列的一种。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满。支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空。 阻塞队列继承了队列Queue接口

add(E e):添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常。offer(E e):添加一个元素,添加成功返回true, 如果队列满了,返回false。remove():返回并删除队首元素,队列为空则抛出异常。poll():返回并删除队首元素,队列为空则返回null。element():返回队首元素,但不移除,队列为空则抛出异常peek():获取队首元素,但不移除,队列为空则返回null BlockingQueue常用方法 方法抛出异常返回特定值阻塞阻塞特定时间入队add(e)offer(e)put(e)offer(e, time, unit)出队remove()poll()take()poll(time, unit)获取队首元素element()peek()不支持不支持阻塞队列的特性

阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字!阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。 阻塞队列的take方法

take方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。 阻塞队列的put方法

put方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入。如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。 阻塞队列的边界

容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如 linkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。 阻塞队列的应用场景

BlockingQueue 是线程安全的,可以解决线程的安全问题。因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开发的难度和工作量。队列它还能起到一个隔离的作用。生产者不需要关注消费者的逻辑。实现了具体任务与执行任务类之间的解耦,提高了安全性。 常见阻塞队列

ArrayBlockingQueue:基于数组结构实现的一个有界阻塞队列。linkedBlockingQueue:基于链表结构实现的一个无界阻塞队列。PriorityBlockingQueue:支持按优先级排序的无界阻塞队列。DelayQueue:基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列。SynchronousQueue:不存储元素的阻塞队列。linkedTransferQueue:基于链表结构实现的一个无界阻塞队列。linkedBlockingDeque:基于链表结构实现的一个双端阻塞队列。 阻塞队列的验证方法

import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingQueueTest {public static void main(String[] args) {addTest();// removeTest();// elementTest()// offerTest()// pollTest()// peekTest()// putTest()// putTest()}private static void addTest() {BlockingQueue blockingQueue = new ArrayBlockingQueue(2);System.out.println(blockingQueue.add(1));System.out.println(blockingQueue.add(2));System.out.println(blockingQueue.add(3));}private static void removeTest() {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);blockingQueue.add(1);blockingQueue.add(2);System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());}private static void elementTest() {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);blockingQueue.element();}private static void offerTest() {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);System.out.println(blockingQueue.offer(1));System.out.println(blockingQueue.offer(2));System.out.println(blockingQueue.offer(3));}private static void pollTest() {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);blockingQueue.offer(1);blockingQueue.offer(2);blockingQueue.offer(3);System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());}private static void peekTest() {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);System.out.println(blockingQueue.peek());}private static void putTest() {BlockingQueue blockingQueue = new ArrayBlockingQueue(2);try {blockingQueue.put(1);blockingQueue.put(2);blockingQueue.put(3);} catch (InterruptedException e) {e.printStackTrace();}}private static void takeTest() {BlockingQueue blockingQueue = new ArrayBlockingQueue(2);try {blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}}}

结束语

获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!关注公众号,可以让你对MySQL有非常深入的了解关注公众号,每天持续高效的了解并发编程!关注公众号,后续持续高效的了解spring源码!这个公众号,无广告!!!每日更新!!!

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。