欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

为啥要造轮子?redis消息队列

时间:2023-05-05

我在设计游戏后端框架时,搜索各种有关消息队列的文章,90%都有提到使用redis构建消息队列,却都一致写的不推荐,推荐做法都是使用市面上成熟的消息队列产品。我就纳闷了,为啥不推荐用redis构建消息队列,却都要提一嘴呢?为啥关于redis构建消息队列都没提什么细节?redis构建消息队列究竟适不适合?这里根据自己造轮子的经验,分享足量细节,尝试解答这些问题。

为啥要造轮子?各种MQ产品它们不香吗?

不是不香,而是太沉重了,任何一款成熟的MQ产品,比如RocketMQ, Kafka等,都是重量级组件,需要部署多台服务器多个服务。对于游戏后端,如果本身只是一个轻量级框架,附带这么一个组件,依赖沉重,徒增服务器成本,维护成本,踩坑成本。思量再三,决定自己使用redis造一个轮子,实现一个框架专用的轻量级消息队列,不仅依赖少,扩展方便,还易于维护。当然,自己造轮子劣势在于,相比成熟产品需要牺牲一些东西,而且实现难度较大,质量不好的话很容易造成数据丢失或者数据错乱等大问题。这些劣势同时也是我的挑战,我选择硬刚!

设计方案

游戏业务中,使用消息队列最重要的目的就是,异步处理数据落地,提高性能,降低处理延迟,提供玩家流畅的游戏体验。以下将在消息队列的实现层面对各个方面的选择与考量进行说明。

数据结构

我把一条消息定义为:玩家行为引发的对应数据库中一行数据更改。消息的value为这一行数据,消息的key为这行数据的唯一标识。

举例说明

玩家某次请求导致几个道具数量更新,游戏中的业务为生产者,生产的消息如下

数据key数据value(json格式)D:items:1001:12{“pid”:1001, “cid”:12, “quantity”:6}D:items:1001:13{“pid”:1001, “cid”:13, “quantity”:8}D:items:1001:14{“pid”:1001, “cid”:14, “quantity”:7}

pid为玩家id,cid为配置表id,quantity为数量。

在redis中存储消息的方式

消息队列数据结构选用Redis中的List,底层一般情况下为双向链表结构,在数据量小时为ziplist结构。
我将消息队列命名为Q:G,在Q:G这个List中存储的只是消息的key。

消息的value通过redis的SET命令存储为string类型。

一条消息放入redis的命令步骤为:

SET 消息key 消息valueLPUSH D:G 消息key

当有新的消息投递时,如果消息key和之前的重复,那么消息value会被覆盖,并且消息队列中有多条相同的消息。这样做的目的是将消息value保持最新。
只将消息key放入LIST数据结构,并将消息value保持最新,这样设计有一个非常大的好处,在消息消费时,不需要按顺序消费,不需要保证顺序,就可以很愉快的并发消费了。

数据结构选型的问题 为什么不使用redis的Streams数据结构?
主要是业务消息并不需要多方消费,只需要一个消费者组,Streams这样复杂的数据结构并不轻量。List在数据量小时, 底层为 ziplist结构,是否会因为数据结构频繁在ziplist和双向链表之间切换影响性能?
底层数据结构切换时,肯定会影响性能,但是这个影响我认为是微乎其微的,数据量小的情况下,切换很快,并且消息队列压力也不大。还可以在redis的配置中指定 ziplist的相关配置。消息value被覆盖之后,是否会出现,在消息消费完成后的ack环节,同步了旧的value而将最新的删掉?
并不会误删数据,具体做法在消费环节讨论。 消息队列 生产和消费的原子性

一次请求会产生多条消息,原子性的保证需要多条消息要么一起成功,要么一起失败。我这里使用redis的事务机制来一定程度保证原子性。redis事务使用MULTI与EXEC命令,中间包含多组更新命令。将一次事务的所有命令使用pipeline机制打包发送,不仅能提高效率,还能防止一次事务中被插入其他命令。有的同学可能会问,生产消息为什么不用lua脚本?lua脚本也能保证原子性,但是对于更新命令比较复杂的情况,使用MULTI与EXEC命令更加的灵活。在消费阶段,因为逻辑比较固定,使用的lua脚本方式保证原子性。

业务节点内存数据 与 Redis中数据 一致性

玩家请求产生的数据更改,需要在先在游戏业务节点的内存中更新,并且推送到消息队列。如何保证两份数据的一致性?

先来看一下生产消息的步骤:

推送成功的情况

推送失败的情况

这里与RocketMQ事务消息的两阶段提交不同,也没有半消息。先在内存中做数据更改,推送消息到redis成功后直接返回客户端成功状态,这种情况下直接满足数据一致。重点是推送消息错误或者超时的情况,如果发生错误或者超时,会返回客户端相应的错误码,并最终清理内存数据。遇到错误的情况,基本上是程序代码问题,开发阶段可以检查并避免;对于超时的情况,有可能更新成功,有可能更新失败,清理内存数据后,数据以Redis中的为准。下一次玩家登录后有可能之前的操作成功,也有可能失败,这对于玩家来说是可以接受的,即使失败也没有损失。
所以,这里并不保证一致性,而是以Redis中的数据为准。这种方式省去了消息重投,半消息确认等复杂机制。

如何保证不丢消息?

如何保证消息在redis中不丢失?

生产阶段
在推送消息的步骤,采用的是快速失败的原则,不进行重试,并接受超时的随机结果,这对应的是At most once模式,最多成功投递一次。没有投递成功对应的结果是玩家此次操作失败,可以算作没丢失消息。存储阶段
因redis的主从同步机制为异步复制,所以不可避免的在redis宕机时,主从切换后会丢失一部分数据。这部分数据的丢失也会破坏生产阶段的原子性。这时候如何处理?
我的方案是:不马上处理,让数据丢失!有的同学会说,你这不扯淡呢,数据都丢了,你不处理玩家不炸锅了。。。
首先,Redis宕机概率不大,并不会经常发生,其次,当消息消费速度大于生产速度时,消息存量很少,丢失的数量也比较少,最后,还有相应的消息记录,用于后续的数据丢失处理。消息记录
在生产阶段推送消息前,先将消息暂存到业务节点内存,类似于WAL(Write Ahead Log)机制,业务节点存储一定时间内(主从同步延迟,我设置的30秒)的最新消息记录,当Redis宕机后,主从切换过程中,业务节点检测到错误或者发送超时,此时将目前的所有消息记录存储到本地磁盘,用于后续查询和数据恢复。数据恢复
找一个干净的redis,把消息记录中的所有消息重新投递一遍,进行发送方的消息回溯,得到主从延迟可能丢失的数据的正确版本。当然,这时候服务器正常运转又产生了新的数据,很可能这些数据不是最新的了。所以拿到这些数据,需要去和道具相关的日志,数据库中的数据进行比对,寻找错漏,来确定丢失数据从哪一条消息记录开始,逐个排查之后的所有消息。排查完毕后对所有受影响的玩家进行补偿和数据恢复。这是一个非常麻烦的过程。也可以配合玩家反馈进行特定玩家的数据恢复。
以上数据恢复很繁琐,暂时没有想到更好的处理方案,欢迎提出改进意见。升级版本的数据恢复
对每一条消息指定一个自增长的唯一id,redis故障后,可以通过在redis主从切换成功后,在新的主机生效前,将消息记录中丢失的记录在新的主机重放一遍,这样切换成功后就丢失消息。但是需要修改redis代码,内置从机升级主机后的消息重放接口,并且自己部署redis集群。该方案目前未实现!!!消费阶段
消息消费时,并不把消息直接pop出来,而是使用rpoplpush将消息key放入另一个正在执行的队列中,防止因消费节点挂掉导致丢消息。 并且在消费时实现了一个CAS乐观锁,防止同步的是旧数据,却删了新数据。 消息消费流程

消息消费过程中的每一步操作都需要保证原子性,因此每一步都是一个lua脚本。没有消费者组的概念,所有消费者都在一个消费者组中,消费者为多个,共同消费全局消息队列D:G中的消息。
大致的消费流程如下

和所有的消息队列一样,都是取消息-> 消费 -> ack的整个流程。流程图中的全局消息队列指的就是所有待处理的消息的消息队列 D:G

取消息的流程
取消息的同时,将消息的key重命名为_key,用于区分是否为正在处理的消息,当重命名失败时,表示有其他消费的worker正在处理此条消息,此时发生乐观锁冲突,直接将消息投递回原处。此乐观锁设计基于业务上重复消息的概率很低,冲突概率很低。

local key_inbox = KEYS[1]local key_current = KEYS[2]local key = redis.call('rpoplpush',key_inbox,key_current)if not key then returnendlocal result = redis.call('get',key)if result then local suc = redis.call('renamenx',key,string.format('_%s',key)) if suc == 0 then redis.call('rpoplpush',key_current,key_inbox) else return {key,result} endelse redis.call('ltrim',key_current,1,-1)endreturn {key}

消费失败的处理流程
消费失败后,需要将key放入死信队列,并将_key重命名回key, 如果此时重命名失败,表示有新的数据生产出来,发生了冲突,直接可以删除旧的key。

local _key = string.format('_%s',KEYS[1])local result = redis.call('get',_key)if result then local suc = redis.call('renamenx',_key,KEYS[1]) if suc == 0 then redis.call("del",_key) endendif KEYS[2] and KEYS[3] then redis.call('rpoplpush',KEYS[2],KEYS[3])end

ack消息的流程
ack消息主要是删除当前消息队列中的元素,并删除旧的重命名后的_key

redis.call('del',string.format('_%s',KEYS[2]))local result = redis.call('get',KEYS[2])if not result then redis.call('srem',KEYS[1],KEYS[2])endredis.call('ltrim',KEYS[3],1,-1)

消费环节的每个步骤都使用lua脚本方式保证操作原子性。

如何保证消息消费的幂等性?

重复消息有两种情况

数据一份最新,第一条消息消费成功后删除,之后的重复消息,都取不到数据,直接删除。这里重复消息只消费了一次数据在同步的过程中发生更新,有多份新老数据,重复消息每次都对应不同版本数据,最后一个消息消费完成后,数据库中也是最新的。
以上两种情况下,重复的消息都能妥善的得到处理,满足幂等性的要求。 如何确保消息ack有效?

消息无论ack失败的情况,并没有做特殊处理,所以并不能保证ack一定有效。ack步骤未执行的结果是,Redis中依然保留了最新数据,没有删除,对玩家的数据一致性并没有影响。消费worker当前的队列中也没有删除对应的消息。目前采用的方案是:只在消费worker重启时检查一遍当前队列,并将消息重新投递。

玩家从redis中获取最新数据

玩家获取最新数据时,redis中有一部分最新的未同步的数据,mysql中也有数据。需要先从redis中获取数据,然后再从mysql中拉取数据,并将其合并,合并原则为,如果redis有,则使用redis中的数据。

流量控制

目前没有做流量控制功能,全力进行消息同步。后续可以考虑在取消息步骤监测时长,如果取消息耗时过长,可能redis负荷过大,需要延缓同步速度。

消息累积导致redis内存满了如何处理?

当消息队列的消费速度小于生产速度,就会发生消息累积,消息累积到一定程度,redis内存满了就会发生swap,造成卡死。目前的方案是: 监控消息队列的存量,及时报警,并在游戏业务端限流。

部署redis时的注意点

一定不能使用redis分片集群,因为lua脚本中会先从消息队列中取key,再去取value,如果key对应的value在另一个分片上,就取不到了,消息队列也就没有办法工作。

性能

生产消费环节都介绍完毕,这里提一下性能,初步测试了一下,单机redis,同时生产和消费(消费环节无逻辑)情况下,在3万左右。一个redis相当于RocketMQ中的一个broker。

如何扩展?

扩展消息队列需要游戏业务节点配合,将玩家按id分配到特定的消息队列。

难点主要在哪里?

多个消费worker并发操作,多个业务节点并发生产消息,在消息使用重命名key这样的中间状态时,容易发生错误导致丢失最新数据。

有哪些缺陷?

最大的缺陷是在redis宕机时有可能丢数据,数据的可靠性比成熟的MQ产品低一些。

开头提到的问题 为啥都不推荐使用redis构建消息队列? 可靠性不能得到严格保证,需要配合实际业务,并不通用。为何都没提什么细节?专用的消息队列有专用的场景,提细节可能涉及大量业务描述。有些实现可能也没有得到全方位验证,所以不提细节。redis构建消息队列是否适合?要看业务,框架,还有项目的依赖,程序员的喜好等多方面,并不一定适合,技术没有好坏,只有根据特定的需求灵活选择才是正途。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。