欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ应用Demo:使用exchange广播消息

时间:2023-07-27
环境准备

安装RabbitMq: 个人是先在Win10便携机上安装VMWare Workstation, 再安装Cent OS 操作系统,在此基础上安装RabbitMQ。

安装过程可以参考这篇博客: https://blog.csdn.net/hsxy123123/article/details/104006744

需要注意RabbitMQ官网提供的erlang与RabbitMQ的配套版本,按配套版本安装。

安装后的界面截图:

消息生产者Demo代码

private final static String EXCHANGE_NAME = "elon_exchange"; public void produceMessage2Exchange(String messageBody) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.*.*.*"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("*******"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.basicPublish(EXCHANGE_NAME, "", null, messageBody.getBytes(Charsets.UTF_8)); LOGGER.info("Sent {}", messageBody); } catch (Exception e) { LOGGER.info("Produce message fail.", e); } }

消息消费者Demo代码

private final static String EXCHANGE_NAME = "elon_exchange"; public void consumeMessageFromExchange(){ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.*.*.*"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("*******"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); LOGGER.info("Get queue name:{}", queueName); channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOGGER.info("Receive message from binding exchange:{}", message); }; channel.basicConsume(queueName, true, deliverCallback, tag->{}); } catch (Exception e) { LOGGER.error("Consume message exception.", e); } }

核心逻辑是获取一个分配的队列名,绑定到交换器,再通过队列接收交换器转发的消息。多个队列可绑定到同一个交换器,从而实现广播消息。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。