欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SpringBoot项目中连接两个RabbitMq

时间:2023-07-16

今天在写项目的时候遇到新需求,一个mq的功能要使用我们公司的服务器的mq,一个mq的功能要使用部署的那边的服务器的mq,话不多说直接上代码。

配置文件application.yml:

spring: rabbitmq: yjdpeservice: host: xxx.xxx.xxx.xxx port: 5672 username: admin password: admin yjservice: host: xxx.xxx.xxx.xxx port: 5672 username: admin password: admin

配置类:

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;@Configurationpublic class RabbitPlusConfig { @Bean(name="mergeConnectionFactory") @Primary public ConnectionFactory MergeConnectionFactory( @Value("${spring.rabbitmq.yjdpeservice.host}") String host, @Value("${spring.rabbitmq.yjdpeservice.port}") int port, @Value("${spring.rabbitmq.yjdpeservice.username}") String username, @Value("${spring.rabbitmq.yjdpeservice.password}") String password ){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } @Bean(name="LocalConnectionFactory") public ConnectionFactory LocalConnectionFactory( @Value("${spring.rabbitmq.yjservice.host}") String host, @Value("${spring.rabbitmq.yjservice.port}") int port, @Value("${spring.rabbitmq.yjservice.username}") String username, @Value("${spring.rabbitmq.yjservice.password}") String password ){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } @Bean(name="mergeRabbitTemplate") @Primary public RabbitTemplate mergeRabbitTemplate( @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory ){ RabbitTemplate yjdpRabbitTemplate = new RabbitTemplate(connectionFactory); return yjdpRabbitTemplate; } @Bean(name="LocalRabbitTemplate") public RabbitTemplate LocalRabbitTemplate( @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory ){ RabbitTemplate yjRabbitTemplate = new RabbitTemplate(connectionFactory); return yjRabbitTemplate; } @Bean(name="mergeFactory") public SimpleRabbitListenerContainerFactory mergeFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(name="LocalFactory") public SimpleRabbitListenerContainerFactory LocalFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; }}

发送消息类:

import io.renren.common.utils.R;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.web.bind.annotation.*;@Api(tags = "测试双mq发送")@RestController@RequestMapping("rbt/mq")public class RbtMqController { @Autowired @Qualifier(value = "mergeRabbitTemplate") private RabbitTemplate mergerabbitTemplate; @Autowired @Qualifier(value = "LocalRabbitTemplate") private RabbitTemplate LocalrabbitTemplate; @ApiOperation("测试发送mq") @PostMapping("/PostMq/{mqone}/{mqtwo}") public Object PostMq(@RequestParam("token") String token, @PathVariable String mqone, @PathVariable String mqtwo){ mergerabbitTemplate.convertAndSend("CeshiQueue", (Object) mqone, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { long l = 40000; //设置定时发布的时间发送到延时队列 到时间后转交给死信队列 message.getMessageProperties().setExpiration(String.valueOf(l)); return message; } }); String msgTwo = "success"; LocalrabbitTemplate.convertAndSend("CeshiQueue", (Object) mqtwo, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { long l = 40000; //设置定时发布的时间发送到延时队列 到时间后转交给死信队列 message.getMessageProperties().setExpiration(String.valueOf(l)); return message; } }); return R.ok(); }}

消费者端:

消费LocalFactory对应的mq中的my-dlx-queue-Ceshi

@Component@RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "LocalFactory")@Log4j2public class locallistener { @RabbitHandler public void RegularlyAddAsCheckIn(String msg) throws Exception { log.info(new Date() + "::LocalFactory收到信息::" + msg); }}

消费mergeFactoryFactory对应的mq中的my-dlx-queue-Ceshi

@Component@RabbitListener(queues = "my-dlx-queue-Ceshi",containerFactory = "mergeFactory")@Log4j2public class mergerlistener { @RabbitHandler public void RegularlyAddAsCheckIn(String msg) throws Exception { log.info(new Date() + "::mergeFactory收到信息::" + msg); }}

完事,实测有效。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。