业务超时这种场景我们经常碰到。举例来说:支付在请求到支付网关后但支付网关那或者是因为第三方支付渠道问题、亦或是网络等问题导致这笔支付回调没有“成功”或者根本就没有回调请求来通知企业方相关的支付状态。对于此情况我们亲爱的程序员们我看了最多的设计就是喜欢用:每隔X分钟跑一个JOB,然后这个JOB去把所有的status=未支付成功的订单状态在数据库里改一下状态(以便于后续业务操作)。
哎。。。问题出就出在这个跑JOB上面。这也是程序员或者相关研发团队缺少训练的典型事故。数据库死锁往往就是在这种情况下发生的。
试想一下如果一个中大型工程,有几百个这样的JOB。。。嘿嘿嘿,你公司的DBA得有多崩溃。不信,来看一个实例吧。
模拟死锁 准备工作 我们写书一段mybatis dao,它会从数据库里捞出status为1(我们假设status=1为一直未响应的支付超时状态)并对它的状态进行变更成“2”的“业务补偿模拟”;相应的我们书一个service方法来调用这个dao方法;使用junit test case启动5个线程并且设成同等优先级每隔X秒跑一次(为了快速把死锁问题重现我们用1秒超时来模拟支付超时),因此每隔1秒会有5个线程(模拟5个payment的集群幅本)同时去update数据库中的状态;我们在数据库的payment表里造了884万条数据;PaymentDao.xml相对应的PaymentDao.java的内容
public int updatePaymentStatusByLimit();
PaymentService.java@Resource private PaymentDao paymentDao; @Transactional(rollbackFor = Exception.class) public void updatePaymentStatusByLimit() throws Exception { try { int records = paymentDao.updatePaymentStatusByLimit(); logger.info(">>>>>>updated {}", records); } catch (Exception e) { logger.error(">>>>>>updatePamentStatusByLimit error: " + e.getMessage(), e); throw new Exception(">>>>>>updatePamentStatusByLimit error: " + e.getMessage(), e); } }
很简单,书写代码没有压力、增删改查谁不会!
相关的单元测试中启动5个线程的代码package org.mk.demo.skypayment.service;import java.util.List;import java.util.concurrent.CountDownLatch;import javax.annotation.Resource;import org.junit.jupiter.api.Test;import org.mk.demo.skypayment.SkyPaymentApp;import org.mk.demo.skypayment.vo.PaymentBean;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.test.context.SpringBootTest;import net.minidev.json.JSONArray;@SpringBootTest(classes = {SkyPaymentApp.class})public class PaymentServiceTest { private Logger logger = LoggerFactory.getLogger(this.getClass()); private int threadCnt = 5; private CountDownLatch latch = new CountDownLatch(threadCnt); @Resource private PaymentService updatePaymentStatusService; @Test public void updatePaymentStatusByLimit() throws Exception { for (int i = 0; i < threadCnt; i++) { (new Thread(new UpdatorRunner(), "JUNIT多线程测试")).start(); } latch.await(); } class UpdatorRunner implements Runnable { @Override public void run() { logger.info(">>>>>>[当前线程ID]:" + Thread.currentThread().getId()); try { while (true) { updatePaymentStatusService.updatePaymentStatusByLimit(); Thread.sleep(1000); } } catch (Exception e) { logger.error(e.getMessage(), e); } latch.countDown(); // 执行完毕,计数器减1 } }}
造了847万条数据在数据库里CREATE TABLE `sky_payment` ( `pay_id` int(11) NOT NULL AUTO_INCREMENT, `status` tinyint(3) DEFAULT NULL, `transf_amount` varchar(45) DEFAULT NULL, `created_date` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `updated_date` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`pay_id`)) ENGINE=InnoDB AUTO_INCREMENT=9098509 DEFAULT CHARSET=utf8mb4;
然后运行一下我们看到了什么?哇。。。我可是用了两台512GB内存的数据库,CPU是128核的服务器安装成了mysql master slaver读写分理的模式啊,只用了1分钟就出现了以下介个玩意儿。。。
大家知道吧,一般在生产上我们的单个微服务的集群幅本一般为10-20个左右幅本的量去支撑日常的9,000~1万左右的并发。因此不需要5分钟的,往往1-2分钟内如果两个幅本间有重复的update语句,整个生产db就会直接卡死。
此时如果你的数据库里数据量单表在>百万行数时,100%就会产生“DB主从延迟”,然后“一路向上卡”,进而你的整个商城也就“拜拜”了您哎。
这一切源于对数据库中的批量update(含delete)原理的不熟悉不要只知道增、删、改、查。
批量的更新(含delete)动作,尤其是update...where或者是delete...where是锁数据的。一个tomcat/netty(反正就是一个spring boot的应用)就是一个线程,当>1个线程同时运行了更新...where且同时不同update/delete的where条件中有数据重复,就会产生死锁。
这个问题在单机上永不会发生,只有在集群环境中发生。
因为在生产环境,我们不可能单机运行一个业务模块的,我们要应对外部的流量就势必多机多集群运行来接受外部的流量。
这下好,你单机运行倒是保证了业务的正确性,当流量一大你要弹出几台幅本时瞬间卡爆了整个商城应用。
这是典型的“不符合云原生设计”的案例。
说了难听点这叫“脖子细,胃小”,要么吃不下,要吃得下的话却把了个胃撑爆。。。你这样的设计、代码还让不让企业玩了哈?你对企业对团队有不满你说一声呢,何必用这种“手段”来玩死企业呢?
嘿嘿,以上开个玩笑而己哈。
下面来让我们看正确的设计
使用“延时”队列来解决业务补偿类跑批的设计多说一句:
其实上述设计也可以改成把update...where改成限定死在每一条update语句是by主键,然后把几百个、上千个(依赖于数据库的性能)update by id串成mybatis的batch update去处理,当by主键去处理时数据库是永远不会发生死锁的。但是这也不符合云原生。因为你跑批始终是跑在单机上的,它并不会随着你的应用幅本数增加而同时增加并行计算、处理能力。
还有一个点,你的job是频率性跑补偿,在没有“业务需要补偿”时你的job实际上是在空转,极其消耗系统资源,这也是很“恐龙”级的设计、不合理的设计。
云原生的宗旨之一就是横向扩展集群幅本时你的系统的计算、处理能力也会随着扩展。
现在的RabbitMQ最新的版本如3.9已经拥有了“延时队列”了,Redis里也有延时队列的成熟特性。
不过我们这边还是利用RabbitMQ3.8.x,因为大多公司用的RabbitMQ还是是3.7-3.8版。如果为了用新特性对企业的底层架构这种改动显然风险过高。
在RabbitMQ3.8.x中有一种队列叫死信队列,这个死信队列就是可以用来作“延时操作的”。
我们使用spring boot2.x结合RabbitTemplate对于RabbitMQ的死信队列的使用如下配置
application-local.ymlmysql: datasource: db: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver minIdle: 50 initialSize: 50 maxActive: 300 maxWait: 1000 testOnBorrow: false testOnReturn: true testWhileIdle: true validationQuery: select 1 validationQueryTimeout: 1 timeBetweenEvictionRunsMillis: 5000 ConnectionErrorRetryAttempts: 3 NotFullTimeoutRetryCount: 3 numTestsPerEvictionRun: 10 minEvictableIdleTimeMillis: 480000 maxEvictableIdleTimeMillis: 480000 keepAliveBetweenTimeMillis: 480000 keepalive: true poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 512 maxOpenPreparedStatements: 512 master: #master db type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/ecom?useUnicode=true&characterEncoding=utf-8&useSSL=false&useAffectedRows=true&autoReconnect=true username: root password: 111111 slaver: #slaver db type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3307/ecom?useUnicode=true&characterEncoding=utf-8&useSSL=false&useAffectedRows=true&autoReconnect=true username: root password: 111111 server: port: 9080 tomcat: max-http-post-size: -1 max-http-header-size: 10240000spring: application: name: skypayment servlet: multipart: max-file-size: 10MB max-request-size: 10MB context-path: /skypayment #配置rabbitMq 服务器 rabbitmq: addresses: localhost:5672 username: admin password: admin #虚拟host 可以不设置,使用server默认host virtual-host: / publisher-/confirm/i-type: CORRELATED listener: ## simple类型 simple: #最小消费者数量 concurrency: 32 #最大的消费者数量 maxConcurrency: 64 #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量 prefetch: 32 retry: enabled: false#rabbitmq的超时用于队列的超时使用queue: expire: 1000
RabbitMqConfig.javapackage org.mk.demo.skypayment.config.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.ExchangeBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;@Componentpublic class RabbitMqConfig { public static final String PAYMENT_EXCHANGE = "payment.exchange"; public static final String PAYMENT_DL_EXCHANGE = "payment.dl.exchange"; public static final String PAYMENT_QUEUE = "payment.queue"; public static final String PAYMENT_DEAD_QUEUE = "payment.queue.dead"; public static final String PAYMENT_FANOUT_EXCHANGE = "paymentFanoutExchange"; @Value("${queue.expire:5000}") private long queueExpire; @Bean public TopicExchange paymentExchange() { return (TopicExchange)ExchangeBuilder.topicExchange(PAYMENT_EXCHANGE).durable(true).build(); } @Bean public TopicExchange paymentExchangeDl() { return (TopicExchange)ExchangeBuilder.topicExchange(PAYMENT_DL_EXCHANGE).durable(true).build(); } @Bean public Queue paymentQueue() { return QueueBuilder.durable(PAYMENT_QUEUE).withArgument("x-dead-letter-exchange", PAYMENT_DL_EXCHANGE)// 设置死信交换机 .withArgument("x-message-ttl", queueExpire).withArgument("x-dead-letter-routing-key", PAYMENT_DEAD_QUEUE)// 设置死信routingKey .build(); } @Bean public Queue paymentDelayQueue() { return QueueBuilder.durable(PAYMENT_DEAD_QUEUE).build(); } @Bean public Binding bindDeadBuilders() { return BindingBuilder.bind(paymentDelayQueue()).to(paymentExchangeDl()).with(PAYMENT_DEAD_QUEUE); } @Bean public Binding bindBuilders() { return BindingBuilder.bind(paymentQueue()).to(paymentExchange()).with(PAYMENT_QUEUE); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(PAYMENT_FANOUT_EXCHANGE); } @Bean public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(producerJackson2MessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); }}
RabbitMqListenerConfig.javapackage org.mk.demo.skypayment.config.mq;import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.converter.MappingJackson2MessageConverter;import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;@Configurationpublic class RabbitMqListenerConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registor) { registor.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); }}
当spring boot工程启动后它会在RabbitMQ里建立如下主要的内容
两个queue,一个正常的队列,一个以.dead结束的队列;
两个exchange,一个正常的exchange对应着正常的队列,一个.dl.exchange对应着.dead结束的队列。
注:
在运行我的项目例子时要记得给自己本机的rabbitmq的连接用户分配到可以建立queue和exchange的权限,如何给rabbitmq自带用户分配权限网上太多不在此作详细说明了。
上述中的payment.queue有一个过期时间,此处我们设置成了1秒,用来模拟正常支付回调超时。一旦这个queue到达了1秒的超时,它就会被转发到payment.dl.exchange里,然后此时如果你的应用通过"监听“对应着payment.dl.exchange中的payment.queue.dead时,你就可以得到这个消息。得到了这个消息后就是你正常的“业务超时补偿”的那些“业务代码处理了”,此处我们为update by id方法。如下代码块。
PaymentDao.xml<?xml version="1.0" encoding="UTF-8"?>
相对应的PaymentDao.java
package org.mk.demo.skypayment.dao;import org.springframework.stereotype.Repository;import java.util.List;import org.mk.demo.skypayment.vo.PaymentBean;@Repositorypublic interface PaymentDao { public int updatePaymentStatusByLimit(); public int updatePaymentStatusById(PaymentBean payment);}
Publisher.java用于产生正常支付请求到第三方支付渠道或者是支付网关的的请求,这个类就是正常提交支付请求的同时对这个队列设一个超时。
如果在超时内有响应那么这个队列就不会产生“相应的死信队列”;反过来就是“如果你在死信队列中接到了相应的订单流水,那么一定代表了这一条支付请求没有在业务允许的超时时间内被响应,因此就需要补偿了”;
为了模拟这个场景,我们不会去处理这条请求,那么这条请求永远必进死信队列
package org.mk.demo.skypayment.service;import org.mk.demo.skypayment.config.mq.RabbitMqConfig;import org.mk.demo.skypayment.vo.PaymentBean;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class Publisher { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RabbitTemplate rabbitTemplate; public void publishPaymentStatusChange(PaymentBean payment) { try { rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.convertAndSend(RabbitMqConfig.PAYMENT_EXCHANGE, RabbitMqConfig.PAYMENT_QUEUE, payment); } catch (Exception ex) { logger.error(">>>>>>publish exception: " + ex.getMessage(), ex); } }}
Subscriber.java用于实现监听死信队列得到消息后的业务补偿操作用。
package org.mk.demo.skypayment.service;import javax.annotation.Resource;import org.mk.demo.skypayment.config.mq.RabbitMqConfig;import org.mk.demo.skypayment.vo.PaymentBean;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class Subscriber { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private PaymentService paymentService; @RabbitListener(queues = RabbitMqConfig.PAYMENT_DEAD_QUEUE) public void receiveDL(PaymentBean payment) { try { if (payment != null) { logger.info(">>>>>>从死信队列拿到数据并开发更改:payId->{} status->{} transfAmount->{}", payment.getPayId(), payment.getStatus(), payment.getTransfAmount()); int records = paymentService.updatePaymentStatusById(payment); logger.info(">>>>>>修改: {}", records); } } catch (Exception ex) { logger.error(">>>>>>Subscriber from dead queue exception: " + ex.getMessage(), ex); } }}
运行使用死信队列来实现业务补偿的例子接下去我们先以两个幅本来运行,前端压入这么多请求每一个请求都会产生一条死信。
为此我们先把两个spring boot的幅本做成集群模式如下nginix设置:
nginix中对spring boot集群做代理的核心配置
upstream skypayment-lb { server localhost:9080 weight=1 fail_timeout=1s; server localhost:9081 weight=1 fail_timeout=1s; } server { location /skypayment/ { port_in_redirect on; # 负载配置 proxy_pass http://skypayment-lb/; proxy_redirect off; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; add_header backendIP $upstream_addr; add_header backendCode $upstream_status; }
运行了近3分钟了,没有任何的死锁。因为不可能产生死锁,就是因为每次的数据库里的update都是by id。
我们继续观察rabbitmq的控制台http://localhost:15672中队列的状态。
发觉有700-1,000个unack的队列有积压。这不是问题,因为前端的请求量太大了,你可以认为是并发的2万3千多个支付请求需要处理呢!
如果你的企业一天有这么多笔支付,恭喜你,你一年肯定是>3个亿的收入的。
为了解决这个不断的有unack消息的积压,我。。。多启动一个幅本,让应用从原来的两个幅本变成了三个幅本同时在运行。
过了10秒不到,整个RabbitMQ城的unack队列瞬间减少到个位数。
这就是云原生中提到过的,你的应用不能受限于相应的资源限制并且随着应用幅本的横向弹性扩容系统的计算能力也会随之扩大。
最终,当前端的2万3千多个请求结束后,RabbitMQ继续用了10秒左右把一些未处理完的队列中的支付状态变更完后整个系统归于平静。
0 error、0死锁、系统响应高、吐吞量大的不得 、微服务、云原生。
附件 parent-pom.xmlpackage org.mk.demo.skypayment;import org.mybatis.spring.annotation.MapperScan;import org.redisson.spring.starter.RedissonAutoConfiguration;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.EnableAutoConfiguration;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;import org.springframework.context.annotation.ComponentScan;@SpringBootApplication@ComponentScan(basePackages = {"org.mk"})@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, RedisAutoConfiguration.class, RedissonAutoConfiguration.class, RedisRepositoriesAutoConfiguration.class})@MapperScan("org.mk.demo.skypayment.dao")public class SkyPaymentApp { public static void main(String[] args) { SpringApplication.run(SkyPaymentApp.class); }}
PaymentController.javapackage org.mk.demo.skypayment.controller;import javax.annotation.Resource;import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import org.mk.demo.util.response.ResponseBean;import org.mk.demo.util.response.ResponseCodeEnum;import org.mk.demo.skypayment.service.Publisher;import org.mk.demo.skypayment.vo.PaymentBean;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.ResponseBody;import org.springframework.web.bind.annotation.RestController;import reactor.core.publisher.Mono;@RestControllerpublic class PaymentController { private Logger logger = LogManager.getLogger(this.getClass()); @Resource private Publisher publisher; @PostMapping(value = "/updateStatus", produces = "application/json") @ResponseBody public Mono