以下介绍消息发布确认的三种模式:
1.单个确认模式2.批量确认模式3.异步批量确认模式
先创建一个连接工具类
package com.zevin.rabbitmq.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.*;public class RabbitMqUtils { public static Channel channel()throws Exception{ //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置工厂IP 目的:连接rabbitMQ队列 factory.setHost("123.123.123");//此处为服务器ip //用户名 factory.setUsername("admin"); //密码 factory.setPassword("1236548974"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); return channel; }}
接着开始测试三种发布确认模式
package com.zevin.rabbitmq.four;import com.rabbitmq.client.Channel;import com.rabbitmq.client./confirm/iCallback;import com.zevin.rabbitmq.utils.RabbitMqUtils;import java.util.UUID;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;public class ConfirmMessage { //批量发消息的个数 public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception{ / // * 2.批量确认模式 // * 3.异步批量确认模式 publishMessageAsync(); } //单个确认 public static void publishMessageIndividually()throws Exception{ Channel channel = RabbitMqUtils.channel(); //队列的声明 String ququeName = UUID.randomUUID().toString(); channel.queueDeclare(ququeName,true,false,false,null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量发消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String messag = i + ""; channel.basicPublish("",ququeName,null,messag.getBytes()); //单个消息就马上进行发布确认 boolean flag = channel.waitForConfirms(); } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin)); } //批量发布确认 public static void publishMessageBatch()throws Exception{ Channel channel = RabbitMqUtils.channel(); //队列的声明 String ququeName = UUID.randomUUID().toString(); channel.queueDeclare(ququeName,true,false,false,null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量确认消息大小 int batchSize = 100; //批量发消息 (批量发布确认)!!!!!! for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("",ququeName,true,null,message.getBytes()); if (i+1%batchSize == 0){ channel.waitForConfirms(); } } //结束时间 long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时"+(end-begin)); } //异步发布 确认 public static void publishMessageAsync() throws Exception{ Channel channel = RabbitMqUtils.channel(); //队列的声明 String ququeName = UUID.randomUUID().toString(); channel.queueDeclare(ququeName,true,false,false,null); //开启发布确认 channel.confirmSelect(); ConcurrentSkipListMap