欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SpringCloudDay08--消息驱动

时间:2023-07-28
文章目录

11、SpringCloud Stream 消息驱动

11.1 消息驱动概述

11.1.1 是什么?11.1.2 标准MQ11.1.3 为什么用Cloud Stream?11.1.4 Stream中的消息通信方式遵循了发布-订阅模式11.1.5 Spring Cloud Stream标准流程套路11.1.6 编码API和常用注解 11.2 案例说明11.3 消息驱动之生产者11.4 消息驱动之消费者11.5 分组消费与持久化

11.5.1 提出问题11.5.2 分析问题11.5.3 分组案例11.5.4 持久化案例 11、SpringCloud Stream 消息驱动 11.1 消息驱动概述 11.1.1 是什么?

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka。

一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

官网参考文档:

https://spring.io/projects/spring-cloud-stream#overview

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/Spring Cloud Stream中文指导手册: https://m.wang1314.com/doc/webapp/topic/20971999.html 11.1.2 标准MQ

生产者/消费者之间靠消息媒介传递信息内容==>Message消息必须走特定的通道==>消息通道MessageChannel消息通道里的消息如何被消费呢,谁负责收发处理==>消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅 11.1.3 为什么用Cloud Stream?

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,
像RabbitMQ有exchange,kafka有Topic和Partitions分区

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

1.stream凭什么可以统一底层差异?

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,
由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

2.Binder

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

通过定义绑定器Binder作为中间

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

11.1.4 Stream中的消息通信方式遵循了发布-订阅模式

Topic主题进行广播

在RabbitMQ就是Exchange在Kakfa中就是Topic 11.1.5 Spring Cloud Stream标准流程套路

Binder:很方便的连接中间件,屏蔽差异Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。 11.1.6 编码API和常用注解 11.2 案例说明

RabbitMQ环境已经OK

工程中新建三个子模块

cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块cloud-stream-rabbitmq-consumer8802,作为消息接收模块cloud-stream-rabbitmq-consumer8803 作为消息接收模块 11.3 消息驱动之生产者

建Module—cloud-stream-rabbitmq-provider8801POM

org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test

YML

server: port: 8801spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.168.174.128 port: 5672 username: admin password: admin bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit #设置要绑定的消息服务的具体设置eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7004.com:7004/eureka # 集群版 instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

主启动类

@SpringBootApplicationpublic class StreamMQMain8801{ public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class,args); }}

业务类

发送消息接口:

public interface IMessageProvider{ public String send() ;}

发送消息接口实现类:

@EnableBinding(Source.class)//可以理解为是一个消息的发送管道的定义@Slf4jpublic class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output;//消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); this.output.send(MessageBuilder.withPayload(serial).build());//创建并发送消息 log.info("*****serial:{}",serial); return serial; }}

controller

@RestControllerpublic class SendMessageController{ @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); }}

测试

1.启动Eureka7001,7002,7004;

2.启动RabbitMQ

3.启动8801

可以看到exchange中定义的交换机.

4.访问:http://localhost:8801/sendMessage

11.4 消息驱动之消费者

建Module—cloud-stream-rabbitmq-consumer8802POM

org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-netflix-eureka-client org.springframework.cloud spring-cloud-starter-stream-rabbit org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test

YML

server: port: 8802spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: 192.168.174.128 port: 5672 username: admin password: admin bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit #设置要绑定的消息服务的具体设置eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7004.com:7004/eureka # 集群版 instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

主启动类:

@SpringBootApplicationpublic class StreamMQMain8802{ public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class,args); }}

业务类

@Component@EnableBinding(Sink.class)public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message message){ System.out.println("消费者1号,------------>接收到的消息: "+message.getPayload()+"t port: "+serverPort); }}

测试8801发送8802接收消息

访问:http://localhost:8801/sendMessage

11.5 分组消费与持久化 11.5.1 提出问题

依照8802,clone出来一份运行8803

启动Eureka7001,7002,7004、provider8001、consumer8002,8003

运行后存在问题

1.有重复消费问题

2.消息持久化问题

11.5.2 分析问题

目前是8802/8803同时都收到了,存在重复消费问题

如何解决?—分组和持久化属性group

生产实际案例

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

11.5.3 分组案例

1.原理

微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

2.代码演示

8802/8803都变成不同组,group两个不同

8802修改YML

8803修改YML

我们自己配置:

分布式微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例,本例启动了两个消费微服务(8802/8803)

多数情况,生产者发送消息给某个具体微服务时只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。

结论:还是重复消费

8802/8803都变成相同组,group两个相同

8802/8803实现了轮询分组,每次只有一个消费者.8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费

8802修改YML,修改组属性为:group: atrjxyA

8803修改YML,修改组属性为:group: atrjxyA

结论:同一个组的多个微服务实例,每次只会有一个拿到

11.5.4 持久化案例

停止8802/8803并去除掉8802的分组group: atrjxyA,8803的分组group: atguiguA没有去掉8801先发送4条消息到rabbitmq先启动8802,无分组属性配置,后台没有打出来消息

再启动8803,有分组属性配置,后台打出来了MQ上的消息

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。