欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

消息驱动--SpringCloudStream

时间:2023-04-18

1 产生的原因

现在一个很项目可能分为三部分:
前端—>后端---->大数据
而后端开发使用消息中间件,可能会使用RabbitMq
而大数据开发,一般都是使用Kafka,
那么一个项目中有多个消息中间件,并且这两个消息中间件的架构上也有所不同,像RabbitMq有exchange ,Kafka有Topic和Partitions分区,对程序员很不友好,所以产生Spring Cloud Stream

而Spring Cloud Stream就类似jpa,屏蔽底层消息中间件的差异,程序员主要操作Spring Cloud Stream即可,降低切换成本,统一消息编程模型

2 什么是Spring Cloud Stream
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。
它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

消息中间件有:ActiveMQ、RabbitMQ、RocketMQ、Kafka
但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

3 Spring Cloud Stream怎么屏蔽底层差异


应用程序通过inputs(生产者)和outputs(消费者)来与Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
就是利用定义绑定器Binder作为中间层,实现了隔离

4 Spring Cloud Stream的 通信模式

Spring Cloud Stream 中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。

5 Spring Cloud Stream的业务流程

Source和Sink:
简单理解操作对象就是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
source用于获取数据(要发送到mq的数据)

Channel :
通道,是队列的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。
channel类似SpringCloudStream中的中间件,用于存放source接收到的数据,或者是存放binder拉取的数据

6 Spring Cloud Stream 常用注解和api

7 使用Spring Cloud Stream
需要创建三个项目,一个生产者,两个消费者

7.1)创建生产者
7.1.1)pom

org.springframework.cloud spring-cloud-starter-stream-rabbit

7.1.2) 配置文件

server: port: 8801spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

7.1.3) 主启动

@SpringBootApplication@EnableEurekaClientpublic class CloudStreamRabbitmqProvider8801Application { public static void main(String[] args) { SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args); System.out.println("启动成功"); }}

7.1.4) 业务类
serice接口和实现类

public interface IMessageProviderService { String send();}//@EnableBinding(Source.class) 定义消息的推送管道(生产消息) 将Channel和Exchanges绑定在一起@EnableBinding(Source.class)public class MessageProviderServiceImpl implements IMessageProviderService { @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); Message stringMessage = MessageBuilder.withPayload(serial).build();//build会构建一个message类 output.send(stringMessage); System.out.println("*****serial: " + serial); return serial; }}

controller类

@RestControllerpublic class SendMessageController { @Resource private IMessageProviderService messageProviderService; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProviderService.send(); }}

这样调用send方法,将消息发送给channel,然后channel将消费发送给binder,然后发送到rabbitmq中,会在rabbitmq中创建一个Exchange,就是我们配置文件中配置的exchange

7.2)创建消费者1号
7.2.1) pom

org.springframework.cloud spring-cloud-starter-stream-rabbit

7.2.2) 配置文件

server: port: 8802spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

7.2.3) 主启动

@SpringBootApplication@EnableEurekaClientpublic class CloudStreamRabbitmqConsumer8802Application { public static void main(String[] args) { SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args); System.out.println("启动成功"); }}

7.2.4) 业务类

//@EnableBinding(Sink.class) 负责接收channel发送过来的数据进行消费@Component@EnableBinding(Sink.class)public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) //监听sink的input,而input在配置文件中配置并且绑定了exchange,获取数据 public void input(Message message) { System.out.println("port:" + serverPort + "t接受:" + message.getPayload()); }}

7.3)创建消费者2号

当生产者发送消息的时候,可以看到消费者1号和1号都同时消费了同一条消息,产生了重复消费

8 解决重复消费问题

利用Stream中的消息分组解决:
在Stream中同一个group中是竞争关系,保证消息只被一个应用消费一次
不同组可以全面消费(重复消费),Stream默认是不同组的

更改消费者1号和2号的配置文件

bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: zuB

9 持久化消费

就是当服务挂了,怎么消费没有消费的数据

这里,先将消费者1号移除zuB组,
然后将消费者1号和2号服务关闭
此时生产者开启,发送3条消息
此时重启消费者1号和2号
可以看到,当消费者1号退出zuB组后,它就获取不到在它宕机的时间段内的数据
但是消费者2号重启后,直接获取到了宕机期间它没有消费的数据,并且消费了
总结:
也就是,当我们没有配置分组时,会出现消息漏消费的问题
而配置分组后,我们可以自动获取未消费的数据

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。