欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RocketMQ(二)主题统一配置

时间:2023-04-16

目录

前言

代码实现

1、引入依赖

2、异步消息与异步延迟消息生产者工具类-RocketMqUtil

3、MQ常量配置:消息主题、消息标签、消息分组

4、调用MQ

5、MQ监听类


前言

Topic表示一类消息的集合,每个主题包含若干个消息,每个消息只属于一个主题,是RocketMQ进行消息订阅的基本单位。topic:message=1:n;  message:topic=1:1

一个生产者可以同时发送多种topic消息,而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。producer:topic=1:n;  consumer:topic=1:1

(一) RocketMQ异步消息_不如自成宇宙-CSDN博客

RocketMQ官网:Quick Start - Apache RocketMQ

下载安装:windows下RocketMQ下载安装教程 - darendu - 博客园

分布式事务消息:SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务_飘渺Jam的博客-CSDN博客

MQ选型:关于ActiveMQ、RocketMQ、RabbitMQ、Kafka一些总结和区别 - Alano的自嘲 - 博客园

代码实现

1、引入依赖

org.apache.rocketmq rocketmq-spring-boot-starter

2、异步消息与异步延迟消息生产者工具类-RocketMqUtil

package com.xx.xx.mq.util;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import java.io.Serializable;@Component@RequiredArgsConstructor@Slf4jpublic class RocketMqUtil { private final RocketMQTemplate rocketMQTemplate; public void asyncSendDelay(String topic, String tag, T msg, int delayLevel) { Message message = MessageBuilder.withPayload(msg).build(); log.info("发送的Topic={},tag={},内容={}", topic, tag, msg); rocketMQTemplate.asyncSend(topic + ":" + tag, message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("发送延时消息成功的Topic={},tag={},内容={}", topic, tag, msg); } @Override public void onException(Throwable throwable) { log.error("发送延时消息失败的Topic={},tag={},内容={}", topic, tag, msg); } }, 3000L, delayLevel); } public void asyncSend(String topic, String tag, T msg) { log.info("发送的Topic={},tag={},内容={}", topic, tag, msg); rocketMQTemplate.asyncSend(topic + ":" + tag, msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("发送成功的Topic={},tag={},内容={}", topic, tag, msg); } @Override public void onException(Throwable throwable) { log.error("发送失败的Topic={},tag={},内容={}", topic, tag, msg); } }); } public void convertAndSend(String topic, String tag, Object msg) { log.info("发送的Topic={},tag={},内容={}", topic, tag, msg); rocketMQTemplate.convertAndSend(topic + ":" + tag, msg); }}

3、MQ常量配置:消息主题、消息标签、消息分组

package com.xx.xx.common.constant;public final class MqConstants { public static final String TEST_TOPIC = "test_topic"; public static final String TEST_TAG = "test_tag"; public static final String TEST_GROUP = "test_group"; public static final String ORDER_TOPIC = "order_topic"; public static final String ORDER_CANCEL_TAG = "order_cancel_tag"; public static final String ORDER_CANCEL_GROUP = "order_cancel_group";}

4、调用MQ

//发送延迟队列,1小时未支付自动取消 rocketMqUtil.asyncSendDelay(MqConstants.ORDER_TOPIC, MqConstants.ORDER_CANCEL_TAG, order.getId(), 9);

5、MQ监听类

package com.xx.xx.listener;import com.xx.xx.common.constant.MqConstants;import com.xx.order.service.OrderService;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = MqConstants.ORDER_TOPIC, selectorexpression = MqConstants.ORDER_CANCEL_TAG, consumerGroup = MqConstants.ORDER_CANCEL_GROUP)@Slf4j@RequiredArgsConstructorpublic class OrderCancelMqListener implements RocketMQListener { private final OrderService orderService; @Override public void onMessage(Long id) { log.info("下单后延迟1小时未支付订单将自动取消id:{}", id); try { orderService.autoCancel(id); } catch (Exception e) { log.error("取消订单异常", e); } }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。