欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

rabbitmq上云实践

时间:2023-05-03

    由于线上rabbitmq集群出现两次问题,一次由于阿里云机器本身宕机,阿里云自动在另一台机器上启动新的rabbitmq服务,导致新的服务中没有对应的队列等等,从而无法进行正常运转;另一次是因为小伙伴的不正确的编码,导致生产者产生的消息瞬间过多压垮了rabbitmq服务,基于两次问题,决定对rabbitmq进行上云尝试,希望依托阿里云来让rabbitmq的服务更加强劲。

上云方案

    为了实现无感知上云,设计了一套上云流程:

让一台机器的生产者和消费者都连接云上rabbitmq集群,确认云上集群能正常运转。

修改一台机器生产者连接云上,消费者连接云下,让这台机器保持到最后,作为殿后机器,保证云下有消费者消费最后剩余的消息。具体修改如下,增加一个新的mq连接配置,让消费者使用老得连接配置,生产者使用新的连接配置

@Configurationclass NewRabbitMqConfig { @Bean("newConnectionFactory") fun newConnectionFactory(@Value("%{spring.new-rabbitmq.addresses}") host: String, @Value("%{spring.new-rabbitmq.username}") username: String, @Value("%{spring.new-rabbitmq.password}") password: String): CachingConnectionFactory { val factory = CachingConnectionFactory() factory.setAddresses(host) factory.username = username factory.setPassword(password) return factory } @Bean fun messageConvert(): MessageConverter { val objectMapper = ObjectMapper().registerModules(JavaTimeModule(), KotlinModule(), JodaModule(), Jdk8Module()) val messageConvert = ContentTypeDelegatingMessageConverter(Jackson2JsonMessageConverter(objectMapper)) messageConvert.addDelegate("application/x-java-serialized-object", SimpleMessageConverter()) return messageConvert } @Bean("newRabbitTemplate") fun newRabbitTemplate(@Qualifier("newConnectionFactory")newConnectionFactory: CachingConnectionFactory,messageConvert: MessageConverter): RabbitTemplate { val rabbitTemplate = RabbitTemplate(newConnectionFactory) rabbitTemplate.messageConverter = messageConvert return rabbitTemplate } @Bean("connectionFactory") fun connectionFactory(@Value("%{spring.rabbitmq.addresses}") host: String, @Value("%{spring.rabbitmq.username}") username: String, @Value("%{spring.rabbitmq.password}") password: String): CachingConnectionFactory { val factory = CachingConnectionFactory() factory.setAddresses(host) factory.username = username factory.setPassword(password) return factory } @Bean("rabbitTemplate") fun rabbitTemplate(@Qualifier("connectionFactory")connectionFactory: CachingConnectionFactory,messageConvert: MessageConverter): RabbitTemplate { val rabbitTemplate = RabbitTemplate(connectionFactory) rabbitTemplate.messageConverter = messageConvert return rabbitTemplate } @Bean("simpleRabbitListenerContainerFactory") fun simpleRabbitListenerContainerFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer, @Qualifier("connectionFactory")connectionFactory: CachingConnectionFactory ): SimpleRabbitListenerContainerFactory { val listenerFactory = SimpleRabbitListenerContainerFactory() configurer.configure(listenerFactory, connectionFactory) return listenerFactory } @Bean("rabbitAdmin") fun newRabbitAdmin(@Qualifier("connectionFactory")connectionFactory: CachingConnectionFactory): RabbitAdmin { return RabbitAdmin(connectionFactory) }}

使用这个新的rabbit连接配置来发送rabbitmq消息:

import io.zhudy.roar.admin.props.ServerPropsimport io.zhudy.roar.admin.rabbitmq.message.*import org.slf4j.LoggerFactoryimport org.springframework.amqp.rabbit.core.RabbitTemplateimport org.springframework.beans.factory.annotation.Qualifierimport org.springframework.stereotype.Component@Componentclass RabbitMQHelper( @Qualifier("rabbitTemplate")private val rabbitTemplate: RabbitTemplate, @Qualifier("newRabbitTemplate")private val newRabbitTemplate: RabbitTemplate, private val serverProps: ServerProps) { private val logger = LoggerFactory.getLogger(RabbitMQHelper::class.java) fun welcomeUserMessage(message: WelcomeUserMessage) { try { newRabbitTemplate.convertAndSend( "user", "user.welcome.${serverProps.env.name}", message ) } catch (e: Exception) { logger.error("发布打招呼消息失败!$message", e) } }}

陆续迁移其他机器,将他们的生产者和消费者都连接到云上

等云下集群所有的消息都被殿后机器消费后,将殿后机器的消费者连接到云上去(需要注意延时消息是否消费完,可以通过rabbitmq控台的channel中去查看剩余多少没有被消费,另外根据业务场景判断殿后机器到底需要等待多久)

出现的问题

整个上云过程还是蛮顺利的,可是上完之后发现问题了,主要两点:

阿里云的rabbitmq的延时消息,最大延时时常只有1天,而我们的业务场景中存在超过一天延时消息使用场景,如果超过一天,阿里云的rabbitmq消息就会被立马消费掉,当然我们可以基于一些方式解决这个问题,但是需要编码测试发版,也需要较长的时间。阿里云的错误消息重试机制,是存在最大重试次数,超过N次重试,mq就会抛弃消息,这种丢消息的情况对我们现有系统是不能接受的,当然网上也存在合理的解决方案,就是给每个消息创建死信队列,超过最大重试次数后,放入死信队列,等待认为介入,但是对于我们来说给改造代价较大。 结果

    最后基于评估,我们决定还是对自己原本的rabbitmq集群进行软硬件升级后,再重新放回到云下运行,整个下云流程和上云流程是一样的,将云下集群和云上集群反过来就好了。

额外总结

    涉及到延时队列,我们总有场景需要到长延时的使用情况,比如一个优惠券需要几天过期,一个会员卡可能几个月过期,而我们的延时队列就算不像阿里云只有1天的最大值,其实也是有本身的最大值的,即integer有符号整数的最大值代表的毫秒数,大约二十多天。那对于长延时场景如何处理呢,总结下来一般两种方式:

延时队列接力,即当队列进行消费时查看时间是否到了实际需要执行的时间,如果没有的话,重复发一个当前消息的延时消息,这样循环知道最后到了指定消费时间再消费。这种方式的优点是使用的东西相对单一,完全基于mq,编码较容易。缺点是如果消息数很多,就会导致mq集群上堆积过多没有消费的消息(这可能也是为啥阿里云的mq直允许最长1天的延时的原因之一),还有一个缺点就是,如果你需要迁移mq,比如我们这次的上云,对于已经堆积在mq上的长延时消息,你无法处理,只能等他自己到时间被消费。配合不频繁的轮训脚本。将数据落库,等到了接近执行的时间,再由轮训脚本,基于目标时间和当前脚本时间塞到延时队列中,等到了指定时间,由mq消费者进行消费。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。