欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Spring+Redis+RabbitMQ限流和秒杀项目的开发

时间:2023-08-22

本文将围绕高并发场景中的限流和秒杀需求综合演示Spring Boot整合JPA、Redis缓存和RabbitMQ消息队列的做法。

本项目将通过整合Springboot和Redis以及Lua脚本来实现限流和秒杀的效果,将通过RabbitMQ消息队列来实现异步保存秒杀结果的效果。

一、项目概述

本项目将要实现的秒杀是指商家在某个时间段以非常低的价格销售商品的一种营销活动。

由于商品价格非常低,因此单位时间内发起购买商品的请求会非常多,从而会对系统造巨大的压力。对此,在一些秒杀系统中往往会整合限流的功能,同时会通过消息队列异步地保存秒杀结果。

本章将要实现的限流和秒杀功能归纳如下:

(1)通过Spring Boot的控制器类对外接收秒杀请求。

(2)针对请求进行限流操作,比如秒杀商品的数量是10个,就限定在秒杀开始后的20秒内只有100个请求能参加秒杀,该操作是通过Redis来实现的。

(3)通过限流检验的这些请求将会同时竞争若干个秒杀商品。该操作将通过基于Redis的Lua脚本来实现。

(4)为了降低数据库的压力,秒杀成功的记录将通过RabbitMQ队列以异步的方式记录到数据库中。

(5)同时,将通过RestTemple对象以多线程的方式模拟发送秒杀请求,以此来观察本秒杀系统的运行效果。

也就是说,本系统会综合用到Spring Boot、JPA、Redis和RabbitMQ,相关组件之间的关系如图所示。

二、基于Redis的Lua脚本分析

Lua使用标准C语言开发而成的,它是一种轻量级的脚本语言,可嵌入基于Redis等的应用程序中。Lua脚本可以驻留在内存中,所以具有较高的性能,适用于处理高并发的场景。

Lua脚本的特性

Lua脚本语言是由巴西一所大学的Roberto lerusalimschy 、 Waldemar Celes和 LnHenrique de Figuciredo设计而成的,它具有如下两大特性

(1)轻量性:Lua只具有一些核心和最基本的库,所以非常轻便,非常适合嵌入由其他语言编写的代码中。

(2)扩展性:Lua语言中预留了扩展接口和相关扩展机制,这样在Lua语言中就能很方便地引入其他开发语言的功能,

本章给出的秒杀场景中会向Redis服务器发送多条指令,为了降低网络调用的开销,会把相关Redis命令放在Lua脚本里。通过调用Lua脚本只需要耗费少量的网络调用代价就能执行多条Redis命令。

此外,秒杀相关的Redis语句还需要具备原子性,即这些语句要么全都执行,要么全都不执行。而Lua脚本是作为一个整体来执行的,所以可以充分地确保相关秒杀语句的原子性。

在Redis中引入Lua脚本

在启动Redis服务器以后,可以通过redis-cli命令运行lua脚本,具体步骤如下:

可以在C:workredisConflua目录中创建redisCallLua.lua文件,在其中编写Lua脚本,注意,Lua脚本文件的扩展名一般都是.lua。

在第一步创建的redisCallLua.lua文件中加入一行代码,在其中通过redis.call命令执行set name Peter的命令,

redis.call('set', 'name', 'Peter')

通过rdis.call方法在Redis中调用Lua脚本时,第一个参数是Redis命令,比如这里是set,第二个参数以及之后的参数是执行该条Redis命令的参数。

通过如下的--eval命令执行第二步定义的Lua脚本,其中C:workredisConflua是这条Lua脚本所在的路径,而redisCallLua.lua是脚本名。

redis-cli --eval C:workredisConfluaredisCallLua.lua

上述命令运行后,得到的返回结果是空(nil),原因是该Lua脚本只是通过set命令设置了值,并没有返回结果。不过通过get name命令就能看到通过这条Lua脚本缓存的name值,具体是Peter。

如果Lua脚本包含的语句很少,那么还可以直接用eval命令来执行该脚本,具体做法是,
先通过redis-cli语句连接到Redis服务器,随后再执行如下eval命令:

eval "redis.call('set','BookName','Spring Boot')" 0

从上述语句中能看到,在该条eval命令之后通过双引号引入了待执行的Lua脚本,在该
脚本中依然是通过redis.call语句执行Redis的set命令,进行设置缓存的操作。

在该eval命令之后还指定了Lua脚本中KEYS类型参数的个数,这里是0,表示该Lua脚本没有KEYS类型的参数。注意,这里设置的是KEYS类型的参数,而不是ARGV类型的参数,下文将详细说明这两种参数的差别。

Lua脚本的返回值和参数

在Lua脚本中,可以通过retum语句返回执行的结果,这部分对应的语法比较简单。

同时,Redis在通过eval命令执行Lua脚本时,可以传入KEYS和ARGV这两种不同类型的参数,它们的区别是,可以用KEYS参数来传入Redis命令所需要的参数,可以用ARGV参数来传入自定义的参数,通过如下两个eval执行Lua脚本的命令,可以看到这两种参数的差别。

127.0.0.1:6379> eval "return {KEYS[1],ARGV[1],ARGV[2]" 1 keyono argvone argvtwo1) "keyone"2) "argvone"3) "argvtwo"127.0.0.1:6379> eval "return {KEYS[1].ARGV[1],ARGV[2]}" 2 keyone argvone argvtwo1) "key1"2) "argvtwo"

在第1行eval语句中,KEYS[1]表示KEYS类型的第一个参数,而ARGV[1]和ARGV[2]对应地表示第一个和第二个ARGV类型的参数。

在第1行eval语句中,双引号之后的1表示KEYS类型的参数个数是1,所以统计参数个数时并不把ARGV自定义类型的参数统计在内,随后的keyone, argvone和argvtwo分别对应KEYS[1]、ARGV[1]和ARGV[2].

执行第一行对应的Lua脚本时,会看到如第2~4行所示的输出结果,这里输出了KEYS[1]、
ARGV[1]和ARGV[2]这3个参数对应的值。

第5行脚本和第1行的差别是,表示KEYS参数个数的值从1变成了2。但这里第2个参数是ARGV类型的,而不是KEYS类型的,所以这条Lua脚本语句会抛弃第2个参数,即ARGV[1],通过第6行和第7行的输出结果能验证这点。

所以,在通过eval命令执行Lua脚本时,一定要确保参数个数和类型的正确性。同时,这里再次提醒,eval命令之后传入的参数个数是KEYS类型参数的个数,而不是ARGV类型的。

分支语句

在Lua脚本中,可以通过if…else语句来控制代码的执行流程,具体语法如下:

if(布尔表达式) then布尔表达式是true时执行的语句else布尔表达式是false时执行的语句end

通过如下的ifDemo.lua范例,读者可以看到在Lua脚本中使用分支语句的做法。

if redis.call('exists','studentID')==1 thenreturn 'Existed'elseredis.call('set','StudentID','001');return 'Not Existed'end

在第1行中,通过if语句判断redis.call命令执行的exists语句是否返回1,如果是,则表示StudentID键存在,就会执行第2行的returm 'Existed’语句返回Existed,否则走第3行的else流程,执行第4行和第5行的语句,设置StudentID的值,并通过retum语句返回Not Existed。

由此可以看到在Lua脚本中使用if分支语句的做法。该脚本的运行结果是:第一次运行时,由于StudentID键不存在,因此会走else流程,从而看到Not Existed的输出,而在第二次运行时,由于此时该键已经存在,因此会直接输出’Existed’的结果。

三、实现限流和秒杀功能

本节将要创建的QuickBuyDemo项目中,一方面会用到上文提到的Lua脚本实现限流和秒杀的功能,另一方面将通过RabbitMQ消息队列实现异步保存秒杀结果的功能。

创建项目并编写配置文件

可以在IDEA集成开发环境中创建名为QuickBuyDemo的Maven项目,在该项目的pom.xml文件中通过如下关键代码引入所需要的依赖包:

org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-data-redis org.apache.httpcomponents httpclient 4.5.5 org.apache.httpcomponents httpcore 4.4.10

这里通过第2-5行代码引入了SpringBoot的依赖包,通过第6-9行代码引入了RabbitMQ消息队列相关的依赖包,通过第10-13行代码引入了Redis相关的依赖包,通过第14-23行代码引入了HTTP客户端相关的依赖包,在本项目中将通过HTTP客户端模拟客户请求,从而验证秒杀效果。

在本项目resources目录的application.properties配置文件中,将通过如下代码配置消息队列和Redis缓存:

rabbitmq.host=127.0.0.1rabbitmq.port=5672rabbitmq.username=guestrabbitmq.password=guestredis.host=localhostredis.port=6379

在该配置文件中,通过第1~4行代码配置了RabbitMQ的连接参数,通过第5行和第6行代码配置了Redis的连接参数。

编写启动类和控制器类

本项目的启动类如下,由于和大多数的Spring Boot项目启动类完全一致,因此不再重复讲述。

package prj;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class SpringBootApp { public static void main(String[] args) { SpringApplication.run(SpringBootApp.class, args); }}

本项目的控制器类代码如下,在该Controller控制器类的第11-25行代码中封装了实现秒杀服务的quickBuy方法,该方法是以quickBuy/{item}/{person}格式的URL请求对外提供服务的,其中item参数表示商品,而person参数则表示商品的购买人。

package prj.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import prj.receiver.BuyService;@RestControllerpublic class Controller { @Autowired private BuyService buyService; @RequestMapping("/quickBuy/{item}/{person}") public String quickBuy(@PathVariable String item, @PathVariable String person){ //20秒里限流100个请求 if(buyService.canVisit(item, 20,100)) { String result = buyService.buy(item, person); if (!result.equals("0")) { return person + " success"; } else { return person + " fail"; } } else{ return person + " fail"; } }}

在quickBuy方法中,首先通过第14行的buyService.canVisit方法对请求进行了限流操作,这里在20秒中只允许有100个请求访问,如果通过限流验证,那么会继续通过第15行的buyService.buy方法进行秒杀操作。注意,这里的实现限流和秒杀功能的代码都封装在第10行定义的BuyService类中。

消息队列的相关配置

在本项目的RabbitMQConfig类中将配置RabbitMQ的消息队列和消息交换机,具体代码如下:

package prj;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig{ //定义含主题的消息队列 @Bean public Queue objectQueue() { return new Queue("buyRecordQueue"); } //定义交换机 @Bean TopicExchange myExchange() { return new TopicExchange("myExchange"); } @Bean Binding bindingObjectQueue(Queue objectQueue,TopicExchange exchange) { return BindingBuilder.bind(objectQueue).to(exchange).with("buyRecordQueue"); }}

其中通过第9行的objectQueue方法创建了名为buyRecordQucue的消息队列,该消息队同将向用户传输秒杀的结果,通过第14行的myExchange方法创建了名为myExhnge的清息交换机,并通过第18行的bindingObjectQueue方法根据buyRecordQucue主题绑定了上述消息以列和消息交换机。

实现秒杀功能的Lua脚本

在本项目中,实现秒杀效果的Lua脚本代码如下:

local item = KEYS[1]local person = ARGV[1]local left = tonumber(redis.call('get',item))if (left>=1) thenredis.call ('decrby',item,1)redis.call ('rpush", 'personList',person)return 1else

在该脚本中,首先通过KEYS[1]参数传入待秒杀的商品,并赋予item对象,再通过ARGV[1]参数传入发起秒杀请求的用户,并赋子person对象。

随后在第3行中,通过get item命令从Redis缓存中获取该商品还有多少库存,再通过第4行的if语句进行判断。

如果发现该商品剩余的库存数量大于等于1,就会执行第5~7行的Lua脚本,先通过decrby命令把库存数减1,再调用rpush命令记录当前秒杀成功的用户,并通过第7行的return语句返回1,表示秒杀成功。如果发现库存数已经小于1,那么会直接通过第9行的语句返且0,表示秒杀失败。

在业务实现类中实现限流和秒杀

在BuyService.java中,将调用Redis和Lua脚本实现限流和秒杀的功能,具体代码如下:

package prj.receiver;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.connection.ReturnType;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisscript;import org.springframework.stereotype.Service;import prj.model.buyrecord;import javax.annotation.Resource;import java.util.concurrent.TimeUnit;@Servicepublic class BuyService { @Resource private RedisTemplate redisTemplate; @Autowired private AmqpTemplate amqpTemplate; public boolean canVisit(String item, int limitTime, int limitNum) { long curTime = System.currentTimeMillis(); // 在zset里存入请求 redisTemplate.opsForZSet().add(item, curTime, curTime); // 移除时间范围外的请求 redisTemplate.opsForZSet().removeRangeByScore(item,0,curTime - limitTime * 1000); // 统计时间范围内的请求个数 Long count = redisTemplate.opsForZSet().zCard(item); // 统一设置所有请求的超时时间 redisTemplate.expire(item, limitTime, TimeUnit.SECONDS); return limitNum >= count; } public String buy(String item, String person){ String luascript = "local person = ARGV[1]n" + "local item = KEYS[1] n" + "local left = tonumber(redis.call('get',item)) n" + "if (left >= 1) n" + "then redis.call('decrby',item,1) n" + " redis.call('rpush','personList',person) n" + "return 1 n" + "else n" + "return 0n" + "endn" + "n" ; String key=item; String args=person; DefaultRedisscript redisscript = new DefaultRedisscript(); redisscript.setscriptText(luascript); //调用lua脚本,请注意传入的参数 Object luaResult = redisTemplate.execute((RedisConnection connection) -> connection.eval( redisscript.getscriptAsString().getBytes(), ReturnType.INTEGER, 1, key.getBytes(), args.getBytes())); //如果秒杀成功,向消息队列发消息,异步插入到数据库 if(!luaResult.equals("0") ){ buyrecord record = new buyrecord(); record.setItem(item); record.setPerson(person); amqpTemplate.convertAndSend("myExchange","buyRecordQueue",record); } //根据lua脚本的执行情况返回结果 return luaResult.toString(); }}

在上述代码中,首先通过第2-11行的import语句引入了本类所要用到的依赖包,随后在第15行中定义了调用Redis会用到的redisTemplate对象,在第17行中定义了向RabbitMQ消息队列发送消息所要用到的amqpTemplate对象。

第18行的canVisit方法实现了限流效果,该方法的item参数表示待限流的商品,limitTime和LimitNum参数分别表示在指定时间内需要限流的请求个数。

在该方法中使用Redis的有序集合实现了限流效果,具体的做法是,在第21行的代码中,通过zadd方法把表示操作类型的item作为键插入有序集合,插入时用表示当前时间的curTime作为值,以保证值的唯一性,同样再用curTime值作为有序集合中元素的score值。

随后在第23行中,通过removeRangeByScore命令移除从0到距当前时间limitTime范围内的数据,比如限流的时间范围是20秒,那么通过这条命令就能在有序集合中移除score范围从0到距离当前时间20秒的数据,从而确保有序集合只保存最近20秒内的请求。

在此基础上,通过第25行代码用zcard命令统计有序集合内键为item的个数,如果通过第28行的布尔语句发现当前个数还没达到限流的上限,该方法就会返回true,表示该请求能继续,否则返回false,表示该请求将会被限流。

同时,需要通过第27行的expire语句设置有序集合中数据的超时时间,这样就能确保在限流以及秒杀动作完成后这些键能自动删除。

第30行定义的buy方法将会实现秒杀的功能,其中先通过第31~41行代码定义实现秒杀功能的Lua脚本,该脚本之前分析过,随后再通过第47一52行代码使用redisTemplate.execute方法执行这段Lua脚本。

在执行时,会通过第50行代码指定KEYS类型参数的个数,通过第51行和第52行代码传入该脚本执行时所需要用到的KEYS和ARGVS参数。

随后会通过第54行的f语句判断秒杀脚本的执行结果,如果秒杀成功,那么会通过第55~58行代码用amqpTemplate对象向buyRecordQueue队列发送包含秒杀结果的record对象。最后,再通过第61行的语句返回秒杀的结果。

观察秒杀效果

至此,可以通过如下步骤启动Redis、RabbitMQ和QuickBuyDemo项目,并观察秒杀效果。

在命令行中通过rabbitmq-server.bat start命令启动RabbitMQ。通过运行redis-server.exe启动Redis服务器,并通过运行redis-cli.exe启动Redis客户端,随后在Redis客户端通过set Computer 10命令向Redis中缓存一条库存数据,表示有10个Computer可供秒杀。在QuickBuyDemo项目中,通过运行SpringBootApp.java启动类启动该项目。成功启动后,在浏览器中输入http:localhost:8080/quickBuy/Computer/Tom发起秒杀请求,其中Computer参数表示秒杀的商品,而Tom则表示发起秒杀请求的人。

输入后,能在浏览器中看到Tom success的结果,随后到Redis客户端窗口运行get Computer命令,能看到Computer的库存数量会降到9,由此可以确认秒杀成功。同时,可以通过lindex personList 0命令观察到成功发起秒杀请求的人是Tom。

四、以异步方式保存秒杀结果

如果在上述QuickBuyDemo项目中直接把秒杀结果插入MySQL数据库,那么当秒杀请求并发量很高时会对数据库造成很大的压力,所以在该项目中会通过消息队列把秒杀结果传输到DBHandlerPrj项目中,用异步的方式保存数据,从而降低数据库的负载压力。

创建项目并设计数据库

首先需要创建名为DBHandlerPrj的Maven项目,在其中实现异步保存秒杀数据的功能,该项目的pom.xml文件如下,其中通过第2-5行代码引入了Spring Boot依赖包,通过第6-9行代码引入了RabbitMO消息队列的依赖包,通过第10~18行代码引入了JPA和MySQL的依赖包。

org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.springframework.boot spring-boot-starter-data-jpa

本项目将会用到如表所示的buyrecord表,该表是创建在本地MySQL的QuickBuy数据表(schema)中的,在其中将会保存秒杀结果。

字段名类型说明item字符串秒杀成功的商品名person字符串秒杀成功的用户

而本项目的启动类SpringBootAppjava和QuickBuyDemo项目中的完全一致,所以不再重复说明。

配置消息队列和数据库参数

在本项目resources目录的application.yml文件中,将通过如下代码配置消息队列和数据库连接参数。

server: port: 8090rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guestspring: jpa: show-sql: true hibernate: dll-auto: validate datasource: url: jdbc:mysql://localhost:3306/QuickBuy?serverTimezone=GMT username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver

由于之前的QuickBuyDemo项目已经占用了8080端口,因此本配置文件将通过第1行和第2行代码设置工作端口为8090。随后,本配置文件将通过第3~7行代码设置RabbiMQ消息队列的连接参数,具体是连接到本地5672端口,且连接所用的用户名和密码都是guest。

由于本项目是通过JPA的方式连接MySQL库的,因此本配置文件通过第8-12行代码配置了JPA的参数,通过第13-17行代码配置了MySQL的连接参数。

此外,和QuickBuyDemo项目一样,本项目依然是在RabbitMQConfg.java配置文件中设置RabbitMQ消息队列和交换机,具体代码如下,其中配置的消息队列名字buyRecordQueue与交换机的名字myExchange需要和QuickBuyDemo项目中的定义保持一致。

package prj;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig{ //定义含主题的消息队列 @Bean public Queue objectQueue() { return new Queue("buyRecordQueue"); } //定义交换机 @Bean TopicExchange myExchange() { return new TopicExchange("myExchange"); } @Bean Binding bindingObjectQueue(Queue objectQueue,TopicExchange exchange) { return BindingBuilder.bind(objectQueue).to(exchange).with("buyRecordQueue"); }}

监听消息队列并保存秒杀结果

在本项目的QuickBuySevivce.java文件中将会监听buyRecordQueue消息队列,并把秒杀结果存入MySOL数据表,具体代码如下:

package prj.service;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import prj.model.buyrecord;import prj.repo.BuyRecordRepo;@Component@RabbitListener(queues = "buyRecordQueue")public class QuickBuyService { @Autowired private AmqpTemplate amqpTemplate; @Autowired private BuyRecordRepo buyRecordRepo; @RabbitHandler public void saveBuyRecord(buyrecord record){ buyRecordRepo.save(record); }}

在本类的第10行通过@RabbitListener注解说明将要监听buyRecordQueue消息队列,当该消息队列有消息时,会触发本类第17行的saveBuyRecord方法,该方法被第16行的@RabbitHandler注解所修饰。在该方法中会调用JPA类buyRecordRepo的save方法向数据表中保存秒杀结果。

QuickBuyServce类中用到的模型类buyrecord和QuickBuyDemo项目中的很相似,由于该类需要通过消息队列在网络中传输,因此需要像第9行那样实现Serializable接口。

package prj.model;import java.io.Serializable;import javax.persistence.Column;import javax.persistence.Entity;import javax.persistence.Id;import javax.persistence.Table;@Entity@Table(name="buyrecord")public class buyrecord implements Serializable { @Id @Column(name = "person") private String person; @Column(name = "item") private String item; public void setItem(String item) { this.item = item; } public void setPerson(String person) { this.person = person; } public String getItem() { return item; } public String getPerson() { return person; }}

全链路效果演示

开发好上述两个项目以后,可以用对如下步骤观察全链路的秒杀效果:

启动RabbitMQ、Redis服务器和客户端,通过set Computer 10命令缓存秒杀商品的数量,同时通过运行启动类启动QuickBuyDemo项目。

启动DBHandlerPrj项目

在QuickBuyDemo项日中开发如下的QuickBuyThread.java文件,在其中用多线程的方式模拟多个秒杀情求,代码如下:

package prj.client;import org.springframework.http.ResponseEntity;import org.springframework.web.client.RestTemplate;class QuickBuyThread extends Thread{ public void run() { RestTemplate restTemplate = new RestTemplate(); String user = Thread.currentThread().getName(); ResponseEntity entity = restTemplate. getForEntity("http://localhost:8080/quickBuy/Computer/"+user , String.class); System.out.println(entity.getBody()); }}public class MockQuickBuy { public static void main(String[] args){ for (int i = 0; i < 15; i++) { new QuickBuyThread().start(); } }}

第4行定义的QuickBuyThread类以继承Thread类的方式实现了线程的效果,在第5行线程的run方法中用restTemplate.getForEntity方法模拟发送了秒杀的请求,其中用当前线程的名字作为发起秒杀的用户。

public class MockQuickBuy { public static void main(String[] args){ for (int i = 0; i < 15; i++) { new QuickBuyThread().start(); } }}

在第12行MockQuickBuy类的main方法中,通过第14行的for循环启动了15个线程发起秒杀请求。由于之前在Redis缓存中设置的Computer商品数量是10个,因此会有10个请求秒杀成功。5个请求不成功。如下输出语句能确认这一结果。

此外,如果再到 MySQL数据库用select from QuickBuy.buyrecord语句观察秒杀结果,能看到成功秒杀的用户,这些用户名和上述输出结果中的用户名完全一致。


本文来自于《Spring Boot+Vue.js+分布式组件全栈开发训练营(视频教学版)》第17章

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。