欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RabbitMQ学习笔记05--分布式事务

时间:2023-07-27
RabbitMQ学习笔记05

分布式事务

实现分布式事务的解决方案分布式事务案例分布式事务-可靠生产和推送确认分布式事务导致数据不一致案例

订单服务派单服务 RabbitMQ实现分布式事务可靠生产RabbitMQ实现可靠消费

try catch + 手动ack + 死信队列方式 可靠消费 分布式事务

分布式事务指事务的操作位于不同的节点上,需要保证事务的 AICD 特性。
例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。

实现分布式事务的解决方案

1.两阶段提交(2PC)
两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。
1.1准备阶段

协调者询问参与者事务是否执行成功,参与者发回事务执行结果。
1.2 提交阶段
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。

存在的问题:

同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

2.补偿事务(TCC)严选
TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

Try 阶段主要是对业务系统做检测及资源预留Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 /confirm/i阶段时,默认 - - - /confirm/i阶段是不会出错的。即:只要Try成功,/confirm/i一定成功。Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假入如 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用
1:首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
2:在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。
3:如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。
优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些
缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

3.本地消息表(异步确保)
本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。

在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。

之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。

在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。

优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。
缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

MQ 事务消息 异步场景,通用性较强,拓展性较高。
支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 Kafka 不支持。
以阿里的 RabbitMQ 中间件为例,其思路大致为:

第一阶段Prepared消息,会拿到消息的地址。 第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。

在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
-
优点: 实现了最终一致性,不需要依赖本地数据库事务。
缺点: 实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也未开源。

分布式事务案例


订单系统和配送平台可能就是两个独立的服务,消息的传递用消息中间件来完成。

分布式事务-可靠生产和推送确认


分布式事务导致数据不一致案例

新建两个springboot模块实现订单和运单两个独立服务
引入依赖:

org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-data-jdbc org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-jdbc org.springframework.boot spring-boot-starter-test test org.springframework.amqp spring-rabbit-test test mysql mysql-connector-java 5.1.10 org.projectlombok lombok true com.fasterxml.jackson.dataformat jackson-dataformat-avro org.apache.commons commons-lang3 3.6

在数据库新建表

DROP TABLE IF EXISTS `ksd_dispather_order`;CREATE TABLE `ksd_dispather_order` ( `dispatch_id` varchar(100) DEFAULT NULL, `order_id` varchar(100) NOT NULL DEFAULT '', `status` varchar(255) DEFAULT NULL, `order_content` varchar(255) DEFAULT NULL, `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `user_id` varchar(32) DEFAULT NULL, PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;DROP TABLE IF EXISTS `ksd_order`;CREATE TABLE `ksd_order` ( `order_id` VARCHAR(32) DEFAULT NULL, `user_id` VARCHAR(32) DEFAULT NULL, `order_content` VARCHAR(255) DEFAULT NULL, `create_time` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP) ENGINE=INNODB DEFAULT CHARSET=utf8;DROP TABLE IF EXISTS `ksd_order_message`;CREATE TABLE `ksd_order_message` ( `order_id` VARCHAR(100) NOT NULL, `status` INT(1) DEFAULT NULL, `order_content` VARCHAR(255) DEFAULT NULL, `unique_id` VARCHAR(100) DEFAULT NULL) ENGINE=INNODB DEFAULT CHARSET=utf8;

订单服务

application.yml配置连接jdbc及rabbitmq

server: port: 8089spring: datasource: url: jdbc:mysql://localhost:3306/rabbit?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false username: root password: xxxx driver-class-name: com.mysql.jdbc.Driver rabbitmq: username: guest password: guest virtual-host: / addresses: xxxx:5672 publisher-/confirm/i-type: correlatedlogging: level: root: debug

json字符串处理工具类

public class JsonUtil { private static ObjectMapper objectMapper = new ObjectMapper(); private static Logger log = LoggerFactory.getLogger(JsonUtil.class); static { // 对象的所有字段全部列入 objectMapper.setSerializationInclusion(Inclusion.ALWAYS); // 取消默认转换timestamps形式 objectMapper.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, false); // 忽略空Bean转json的错误 objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false); // 所有的日期格式都统一为以下的样式,即yyyy-MM-dd HH:mm:ss objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); // 忽略 在json字符串中存在,但是在java对象中不存在对应属性的情况。防止错误 objectMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); // 精度的转换问题 objectMapper.configure(DeserializationConfig.Feature.USE_BIG_DECIMAL_FOR_FLOATS, true); objectMapper.configure(DeserializationConfig.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true); } public static String obj2String(T obj) { if (obj == null) { return null; } try { return obj instanceof String ? (String) obj : objectMapper.writevalueAsString(obj); } catch (Exception e) { log.warn("Parse Object to String error", e); return null; } } public static String obj2StringPretty(T obj) { if (obj == null) { return null; } try { return obj instanceof String ? (String) obj : objectMapper.writerWithDefaultPrettyPrinter().writevalueAsString(obj); } catch (Exception e) { log.warn("Parse Object to String error", e); return null; } } public static T string2Obj(String str, Class clazz) { if (StringUtils.isEmpty(str) || clazz == null) { return null; } try { return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz); } catch (Exception e) { log.warn("Parse String to Object error", e); return null; } } public static T string2Obj(String str, TypeReference typeReference) { if (StringUtils.isEmpty(str) || typeReference == null) { return null; } try { return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference)); } catch (Exception e) { log.warn("Parse String to Object error", e); return null; } } public static T string2Obj(String str, Class<?> collectionClass, Class<?>..、elementClasses) { JavaType javaType = objectMapper.getTypeFactory().constructParametricType(collectionClass, elementClasses); try { return objectMapper.readValue(str, javaType); } catch (Exception e) { log.warn("Parse String to Object error", e); return null; } }}

pojo订单实体类

@Datapublic class Order implements java.io.Serializable{ public String orderId; public Integer userId; public String orderContent; public Date createTime;}

spring事务实现订单记录的保存,出现异常则回滚,订单不会保存到数据库

@Service@Transactional(rollbackFor = Exception.class)public class OrderDatabaseService { @Autowired private JdbcTemplate jdbcTemplate; public void saveOrder(Order order) throws Exception{ // 定义保存sql String sqlString = "insert into ksd_order(order_id,user_id,order_content)values(?,?,?)"; // 1:添加订单记录 int count = jdbcTemplate.update(sqlString,order.getOrderId(),order.getUserId(),order.getOrderContent()); if(count!=1) { throw new Exception("订单创建失败,原因[数据库操作失败]"); } //因为在下单可能会会rabbit会出现宕机,就引发消息是没有放入MQ.为来消息可靠生产,对消息做一次冗余 saveLocalMessage(order); } public void saveLocalMessage(Order order) throws Exception{ // 定义保存sql String sqlString = "insert into ksd_order_message(order_id,order_content,status,unique_id)values(?,?,?,?)"; // 添加运动记录 int count = jdbcTemplate.update(sqlString,order.getOrderId(),order.getOrderContent(),0,1); if(count!=1) { throw new Exception("出现异常,原因[数据库操作失败]"); } }}

OrderService类调用保存订单的方法,并远程访问派单服务,实现派单

@Servicepublic class OrderService { @Autowired private OrderDatabaseService orderDatabaseService; // 创建订单 @Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务 acid public void createOrder(Order orderInfo) throws Exception { // 1: 订单信息--插入订单系统,订单数据库事务 orderDatabaseService.saveOrder(orderInfo); // 2:通过Http接口发送订单信息到运单系统 String result = dispatchHttpApi(orderInfo.getOrderId()); if(!"success".equals(result)) { throw new Exception("订单创建失败,原因是运单接口调用失败!"); } } private String dispatchHttpApi(String orderId) { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); String url = "http://localhost:9000/dispatch/order?orderId="+orderId; RestTemplate restTemplate = new RestTemplate(factory);//异常 String result = restTemplate.getForObject(url, String.class); return result; }}这里的dispatchHttpApi方法通过RestTemplate通过http的get方式访问派单服务,进入到派单服务的Controller执行派单逻辑

派单服务

application.yml连接数据库及rabbitmq

server: port: 9000spring: datasource: url: jdbc:mysql://localhost:3306/rabbit?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false username: root password: xxxx driver-class-name: com.mysql.jdbc.Driver rabbitmq: username: guest password: guest listener: simple: acknowledge-mode: manual # 开启手动ack retry: enabled: true # 开启重试 max-attempts: 3 #最大重试次数 initial-interval: 2000ms #重试间隔时间 addresses: xxxx:5672logging: level: root: debug

DispatchService派单业务

@Service@Transactional(rollbackFor = Exception.class)public class DispatchService { @Autowired private JdbcTemplate jdbcTemplate; public void dispatch(String orderId) throws Exception { // 定义保存sql String sqlString = "insert into ksd_dispather_order(order_id,dispatch_id,status,order_content,user_id)values(?,?,?,?,?)"; // 添加运动记录 int count = jdbcTemplate.update(sqlString, orderId, UUID.randomUUID().toString(), 0, "xxx买了一个泡面", "1"); if (count != 1) { throw new Exception("订单创建失败,原因[数据库操作失败]"); } }}

DispatchController派单,由订单服务访问,在这里调用派单业务

@RestController@RequestMapping("/dispatch")public class DispatchController { @Autowired public DispatchService dispatchService; // 添加订单后,添加调度信息 @GetMapping("/order") public String lock(String orderId) throws Exception { dispatchService.dispatch(orderId); // 将外卖订单分配 return "success"; }}

运行派单服务,订单服务执行测试方法:

@Autowired public OrderService orderService; @Test public void orderCreated() throws Exception { //订单生成 String orderId = "1000001"; Order orderInfo = new Order(); orderInfo.setOrderId(orderId); orderInfo.setUserId(1); orderInfo.setOrderContent("买了一个方便面"); orderService.createOrder(orderInfo); System.out.println("订单创建成功......."); }

结果:
派单服务的表

订单服务的表

本地的表

在dispatchHttpApi访问派单服务的方法中,SimpleClientHttpRequestFactory设置超时:

private String dispatchHttpApi(String orderId) { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); // 链接超时 > 3秒 factory.setConnectTimeout(3000); // 处理超时 > 2秒 factory.setReadTimeout(2000); // 发送http请求 String url = "http://localhost:9000/dispatch/order?orderId="+orderId; RestTemplate restTemplate = new RestTemplate(factory);//异常 String result = restTemplate.getForObject(url, String.class); return result; }

在派单的controller中,执行派单前让线程休眠三秒:

@GetMapping("/order") public String lock(String orderId) throws Exception { if(orderId.equals("1000001")) { Thread.sleep(1000L); // 模拟业务耗时,接口调用者会认为超时 } dispatchService.dispatch(orderId); // 将外卖订单分配给小哥 return "success"; }

再启动测试,结果是只有派单服务的表中有订单信息,而订单表和本地表则没有数据,原因在于,订单服务出现异常而事务回滚,但这影响不到派单服务,派单服务只是sleep了3秒,然后继续执行派单业务。这就是分布式事务导致数据不一致的情况。

RabbitMQ实现分布式事务可靠生产


存在的问题只有消息队列会出现问题,使用消息冗余的方式来处理,这样使消息不会丢失,然后用定时器去重新发送消息,等待消息队列中间件恢复。
消息冗余:

rabbitmq的确认机制

@PostConstruct public void regCallback() { // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性 rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() { @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { System.out.println("cause:"+cause); String orderId = correlationData.getId(); // 如果ack为true代表消息已经收到 if (!ack) { // 这里可能要进行其他的方式进行存储 System.out.println("MQ队列应答失败,orderId是:" + orderId); return; } try { String updatesql = "update ksd_order_message set status = 1 where order_id = ?"; int count = jdbcTemplate.update(updatesql, orderId); if (count == 1) { System.out.println("本地消息状态修改成功,消息成功投递到消息队列中..."); } } catch (Exception ex) { System.out.println("本地消息状态修改失败,出现异常:" + ex.getMessage()); } } }); }

ack即为给予生产者的消息回执

RabbitMQ将订单服务的消息投递给派单服务,取代http调用的方式

RabbitMQ消息业务

@Servicepublic class MQOrderService { @Autowired private OrderDatabaseService orderDatabaseService; @Autowired private OrderMQService orderMQService; // 创建订单 public void createOrder(Order orderInfo) throws Exception { // 1: 订单信息--插入丁订单系统,订单数据库事务 orderDatabaseService.saveOrder(orderInfo); // 2:通過Http接口发送订单信息到运单系统 orderMQService.sendMessage(orderInfo); }}

投递消息进队列以及确认机制:

@Servicepublic class OrderMQService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private JdbcTemplate jdbcTemplate; @PostConstruct public void regCallback() { // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性 rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() { @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { System.out.println("cause:"+cause); String orderId = correlationData.getId(); // 如果ack为true代表消息已经收到 if (!ack) { // 这里可能要进行其他的方式进行存储 System.out.println("MQ队列应答失败,orderId是:" + orderId); return; } try { String updatesql = "update ksd_order_message set status = 1 where order_id = ?"; int count = jdbcTemplate.update(updatesql, orderId); if (count == 1) { System.out.println("本地消息状态修改成功,消息成功投递到消息队列中..."); } } catch (Exception ex) { System.out.println("本地消息状态修改失败,出现异常:" + ex.getMessage()); } } }); } public void sendMessage(Order order) { // 通过MQ发送消息 rabbitTemplate.convertAndSend("order_fanout_exchange", "", JsonUtil.obj2String(order), new CorrelationData(order.getOrderId())); }}

RabbitMQ配置

@Configurationpublic class RabbitMQConfiguration { @Bean public FanoutExchange deadExchange() { return new FanoutExchange("dead_order_fanout_exchange", true, false); } @Bean public Queue deadOrderQueue() { return new Queue("dead.order.queue", true); } @Bean public Binding bindDeadOrder() { return BindingBuilder.bind(deadOrderQueue()).to(deadExchange()); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("order_fanout_exchange", true, false); } @Bean public Queue orderQueue() { Map args = new HashMap<>(); args.put("x-dead-letter-exchange", "dead_order_fanout_exchange"); return new Queue("order.queue", true, false, false, args); } @Bean public Binding bindorder() { return BindingBuilder.bind(orderQueue()).to(fanoutExchange()); }}

测试方法:

@Test public void orderCreatedMQ() throws Exception { //订单生成 String orderId = "1000001"; Order orderInfo = new Order(); orderInfo.setOrderId(orderId); orderInfo.setUserId(1); orderInfo.setOrderContent("买了一个方便面"); mqOrderService.createOrder(orderInfo); System.out.println("订单创建成功......."); }


可以看到消息回执ack为true,这代表着消息可靠地投放到了消息队列,生产者收到了确认的回执,以此实现可靠消费,表中的订单数据的status也会在收到消息回执后置为1

当发现订单消息的status为0,说明没有可靠投递到消息队列中,这时可以依靠之前的消息冗余,加上定时器,重新去投放消息。

RabbitMQ实现可靠消费


消息消费时如果出现异常,消息重试机制会导致该服务出现死循环,解决消息重试的方案:
1.控制重发的次数
2.try catch + 手动ack
3.try catch + 手动ack + 死信队列
配置重试次数

rabbitmq: username: guest password: guest listener: simple: acknowledge-mode: manual # 这里是开启手动ack,让程序去控制MQ的消息的重发和删除和转移 retry: enabled: true # 开启重试 max-attempts: 3 #最大重试次数 initial-interval: 2000ms #重试间隔时间 addresses: xxxx:5672

try catch + 手动ack + 死信队列方式 可靠消费

@Servicepublic class OrderMqConsumer { @Autowired private DispatchService dispatchService; private int count = 1; // 解决消息重试的集中方案: // 1: 控制重发的次数 + 死信队列 // 2: try+catch+手动ack // 3: try+catch+手动ack + 死信队列处理 + 人工干预 @RabbitListener(queues = {"order.queue"}) public void messageconsumer(String ordermsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { try { // 1:获取消息队列的消息 System.out.println("收到MQ的消息是: " + ordermsg + ",count = " + count++); // 2: 获取订单服务的信息 Order order = JsonUtil.string2Obj(ordermsg, Order.class); // 3: 获取订单id String orderId = order.getOrderId(); // 4:保存运单 dispatchService.dispatch(orderId); System.out.println(1 / 0); //出现异常 //手动ack告诉mq消息已经正常消费 channel.basicAck(tag, false); } catch (Exception ex) { //如果出现异常的情况下,根据实际的情况去进行重发 //重发一次后,丢失,还是日记,存库根据自己的业务场景去决定 //参数1:消息的tag 参数2:false 多条处理 参数3:requeue 重发 // false 不会重发,会把消息打入到死信队列 // true 的会会死循环的重发,建议如果使用true的话,不加try/catch否则就会造成死循环 channel.basicNack(tag, false, false);// 死信队列 } }}

出现异常后,catch中将消息拒绝,不重发,转移到死信队列中。

死信队列消费:

@Servicepublic class DeadMqConsumer { @Autowired private DispatchService dispatchService; // 解决消息重试的集中方案: // 1: 控制重发的次数 + 死信队列 // 2: try+catch+手动ack // 3: try+catch+手动ack + 死信队列处理 + 人工干预 @RabbitListener(queues = {"dead.order.queue"}) public void messageconsumer(String ordermsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { try { // 1:获取消息队列的消息 System.out.println("收到MQ的消息是: " + ordermsg ); // 2: 获取订单服务的信息 Order order = JsonUtil.string2Obj(ordermsg, Order.class); // 3: 获取订单id String orderId = order.getOrderId(); // 幂等性问题 //int count = countOrderById(orderId); // 4:保存运单 //if(count==0)dispatchService.dispatch(orderId); //if(count>0)dispatchService.updateDispatch(orderId); dispatchService.dispatch(orderId); // 3:手动ack告诉mq消息已经正常消费 channel.basicAck(tag, false); } catch (Exception ex) { System.out.println("人工干预"); System.out.println("发短信预警"); System.out.println("同时把消息转移别的存储DB"); channel.basicNack(tag, false,false); } }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。