欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Rabbit发布确认模式

时间:2023-07-28

以下介绍消息发布确认的三种模式:

1.单个确认模式2.批量确认模式3.异步批量确认模式

先创建一个连接工具类

package com.zevin.rabbitmq.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.*;public class RabbitMqUtils { public static Channel channel()throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置工厂IP 目的:连接rabbitMQ队列 factory.setHost("123.123.123");//此处为服务器ip //用户名 factory.setUsername("admin"); //密码 factory.setPassword("1236548974"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); return channel; }}

接着开始测试三种发布确认模式

package com.zevin.rabbitmq.four;import com.rabbitmq.client.Channel;import com.rabbitmq.client./confirm/iCallback;import com.zevin.rabbitmq.utils.RabbitMqUtils;import java.util.UUID;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;public class ConfirmMessage { //批量发消息的个数 public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception{ / // * 2.批量确认模式 // * 3.异步批量确认模式 publishMessageAsync(); } //单个确认 public static void publishMessageIndividually()throws Exception{ Channel channel = RabbitMqUtils.channel(); //队列的声明 String ququeName = UUID.randomUUID().toString(); channel.queueDeclare(ququeName,true,false,false,null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量发消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String messag = i + ""; channel.basicPublish("",ququeName,null,messag.getBytes()); //单个消息就马上进行发布确认 boolean flag = channel.waitForConfirms(); } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin)); } //批量发布确认 public static void publishMessageBatch()throws Exception{ Channel channel = RabbitMqUtils.channel(); //队列的声明 String ququeName = UUID.randomUUID().toString(); channel.queueDeclare(ququeName,true,false,false,null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量确认消息大小 int batchSize = 100; //批量发消息 (批量发布确认)!!!!!! for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",ququeName,true,null,message.getBytes()); if (i+1%batchSize == 0){ channel.waitForConfirms(); } } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end-begin)); } //异步发布 确认 public static void publishMessageAsync() throws Exception{ Channel channel = RabbitMqUtils.channel(); //队列的声明 String ququeName = UUID.randomUUID().toString(); channel.queueDeclare(ququeName,true,false,false,null); //开启发布确认 channel.confirmSelect(); ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>(); //消息确认成功回调函数 ConfirmCallback ackcallback = (var1,var3)->{ if (var3){ //(2)删除掉已经确认的消息 剩下的就是未确认的消息 ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(var1); /confirm/ied.clear(); }else { outstanding/confirm/is.remove(var1); } System.out.println("确认的消息"+var1); }; //消息确认失败回调函数 ConfirmCallback nackcallback = (var1,var3)->{ //(3) 打印一下未确认的消息有哪些 String message = outstanding/confirm/is.get(var1); System.out.println("未确认的消息是:"+message+":::::未确认的消息tag:"+var1); }; //准备消息的监听器 监听哪些成功了 哪些消息失败了 channel.addConfirmListener(ackcallback,nackcallback); //开始时间 long begin = System.currentTimeMillis(); //批量发送 for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "消息" + i ; channel.basicPublish("",ququeName,null,message.getBytes()); //(1)记录下所有要发送的消息 消息的总和 outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message); } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时"+(end-begin)); }}

 

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。