欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

kafka顺序消息

时间:2023-06-07
场景概述

假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。
已下单 → 已支付 → 已确认
不允许错乱!

顺序级别

1)全局有序:
串行化。每条经过kafka的消息必须严格保障有序性。这就要求kafka单通道,每个groupid下单消费者极大的影响性能,现实业务下几乎没必要。

2)局部有序:
业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓。

充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。(和RocketMQ的解决思路很相似)

实现方案

1)发送端:
指定key发送,key=order.id即可。

2)发送中:
给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理
吞吐量显然上不去,kafka开多个分区还有何意义?
所以开多个消费者指定分区消费,理想状况下,每个分区配一个。
但是,这个吞吐量依然有限,那如何处理呢?

方案:多线程
在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!
参考下图:thread处理后,会将data变成 2-1-3。因为即使线程A先拿到1.线程B再拿到2。也不能确保A能先消费1。

改进:接收后分发二级内存队列

消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。
再开启多个线程,每个队列分配一个线程处理(1对1的关系)。提升吞吐量

代码验证

1)新建一个sort队列,2个分区

2)编写代码
SortedProducer(顺序性发送端)

@RestControllerpublic class SortedProducer { @Resource private KafkaTemplate kafkaTemplate; @GetMapping("/kafka/test/{id}/{status}") public void sendSortedMsg(@PathVariable("id") int id ,@PathVariable("status") int status ) { Order order = new Order(); order.setId(id); order.setStatus(status);// 以订单号做key发送 kafkaTemplate.send("sorted",String.valueOf(id), JSON.toJSONString(order)); }}

SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路):

@Componentpublic class SortedConsumer { private final Logger logger = LoggerFactory.getLogger(SortedConsumer.class); //队列数量,根据业务情况决定 //本案例,队列=4,线程=4,一一对应 linkedBlockingQueue[] queues = new linkedBlockingQueue[4]; @PostConstruct void execute(){ //遍历队列数组,初始化每一个元素,同时让线程启动 for (int i = 0; i < 4; i++) { logger.info("thread started , index = {}", i); final int current = i; queues[current] = new linkedBlockingQueue(); new Thread( () -> { try { //循环4次,起4个线程一直监听自己的队列,不停获取数据 //取到就执行打印log,取不到阻塞等待 while (true) { Order order = (Order) queues[current].take(); logger.info("get from queue, queue:{},order:[id={},status={}]", current, order.getId(), order.getStatus()); } } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } @KafkaListener(topics = {"sorted"},topicPattern = "0",groupId = "sorted-group-1") public void onMessage(ConsumerRecord<?, ?> consumerRecord) throws InterruptedException { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get();// 从kafka里获取到消息// 注意分发方式,kafka有两个分区0和1,对应两个消费者// 当前分区是0,也就是偶数的id,0、2、4、6会在这里被消费 Order order = JSON.parseObject(String.valueOf(msg),Order.class);// 而队列是4个。也就是每个消费者再分到两个队列里去// 队列另一端分别对应4个线程在等待// 所以,按4取余数 int index = order.getId() % 4;// logger.info("put to queue,queue={},order:[id={},status={}]",index,order.getId(),order.getStatus()); queues[index].put(order); } } @KafkaListener(topics = {"sorted"},topicPattern = "1",groupId = "sorted-group-1") public void onMessage1(ConsumerRecord<?, ?> consumerRecord) throws InterruptedException { //相同的实现,现实中为另一台机器,这里用两个listener模拟 //奇数的id会被分到这里,也就是1、3、5、7 onMessage(consumerRecord); }}

SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!):

//@Componentpublic class SortedConsumer2 { private final Logger logger = LoggerFactory.getLogger(SortedConsumer2.class); //队列数量,根据业务情况决定 //本案例,队列=4,线程=4,一一对应 ExecutorService[] queues = new ExecutorService[4]; @PostConstruct void execute(){ //遍历队列数组,初始化每一个元素,同时让线程启动 for (int i = 0; i < 4; i++) { logger.info("thread started , index = {}", i); final int current = i; queues[current] = Executors.newSingleThreadExecutor(); } } @KafkaListener(topics = {"sorted"},topicPattern = "0",groupId = "sorted-group-2") public void onMessage(ConsumerRecord<?, ?> consumerRecord) throws InterruptedException { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get();// 从kafka里获取到消息// 注意分发方式,kafka有两个分区0和1,对应两个消费者// 当前分区是0,也就是偶数的id,0、2、4、6会在这里被消费 Order order = JSON.parseObject(String.valueOf(msg),Order.class);// 而队列是4个。也就是每个消费者再分到两个队列里去// 队列另一端分别对应4个线程在等待// 所以,按4取余数 int index = order.getId() % 4;// logger.info("put to queue,queue={},order:[id={},status={}]",index,order.getId(),order.getStatus()); queues[index].execute(()->{ logger.info("get from queue, queue:{},order:[id={},status={}]", index, order.getId(), order.getStatus()); }); } } @KafkaListener(topics = {"sorted"},topicPattern = "1",groupId = "sorted-group-2") public void onMessage1(ConsumerRecord<?, ?> consumerRecord) throws InterruptedException { //相同的实现,现实中为另一台机器,这里用两个listener模拟 //奇数的id会被分到这里,也就是1、3、5、7 onMessage(consumerRecord); }}

3)通过swagger请求

先按不同的id发送,查看控制台日志,id被正确分发到对应的队列

同一个key分配到同一个queue,顺序性得到保障

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。