安装RabbitMq: 个人是先在Win10便携机上安装VMWare Workstation, 再安装Cent OS 操作系统,在此基础上安装RabbitMQ。
安装过程可以参考这篇博客: https://blog.csdn.net/hsxy123123/article/details/104006744
需要注意RabbitMQ官网提供的erlang与RabbitMQ的配套版本,按配套版本安装。
安装后的界面截图:
消息生产者Demo代码private final static String EXCHANGE_NAME = "elon_exchange"; public void produceMessage2Exchange(String messageBody) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.*.*.*"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("*******"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.basicPublish(EXCHANGE_NAME, "", null, messageBody.getBytes(Charsets.UTF_8)); LOGGER.info("Sent {}", messageBody); } catch (Exception e) { LOGGER.info("Produce message fail.", e); } }
消息消费者Demo代码private final static String EXCHANGE_NAME = "elon_exchange"; public void consumeMessageFromExchange(){ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.*.*.*"); factory.setPort(5672); factory.setUsername("***"); factory.setPassword("*******"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); LOGGER.info("Get queue name:{}", queueName); channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); LOGGER.info("Receive message from binding exchange:{}", message); }; channel.basicConsume(queueName, true, deliverCallback, tag->{}); } catch (Exception e) { LOGGER.error("Consume message exception.", e); } }
核心逻辑是获取一个分配的队列名,绑定到交换器,再通过队列接收交换器转发的消息。多个队列可绑定到同一个交换器,从而实现广播消息。