# kafka
问题整理如下:1、因代码问题导致消费者在使用过程中逻辑不是幂等的,造成数据库插入数据,单kafka不提交offset,当再次启动kafka时,存在重复消费的情况,造成数据库流入脏数据2、在修复数据的过程中,因kafka的offset错误重置,导致部分历史消息进行错误消费。3、存在消费者因数据库链接等问题,造成在kafka允许时间内不进行提交,kafka进行重新分配的过程。 引出有关于kafka的相关问题 一、kafka的消息是如何生产的 二、kafka的消息是如何存储的 三、kafka的消息是如何消费的 四、重置及查询过程中可能涉及到的一些命令
一、kafka的消息是如何生产的
首先生产者在生产消息的时候会存在topic分区的概念,一个topic存在多个partion,那么生产的消息进入具体哪个partion遵循四种策略方式,目前生产上基本基于以下按照key的分配方式
按照key来定义分区,存在一个问题,就是数据倾斜的问题,比如我们使用公司id来作为key进行消息发送,因为key值相同,那么会出现一个公司数据量很大,一个公司数据量很小,量大的公司会分配进入一个分区,该分区的消息数量要远超过其他分区,从而给该节点服务器会造成巨大的压力和数据的阻塞。 二、kafka的消息是如何存储的已知消息已经通过生产者生产到对应的partition中。 参考的链接:[1]有关kafka存储的参考文献一个partion中,存在多个segment 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
写message消息从java堆转入page cache(即物理内存)。由异步线程刷盘,消息从page cache刷入磁盘。
segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
topic partition offset 这三个唯一确定一条消息。
消息的消费模型有两种,推送模型(push)和拉取模型(pull)。
基于推送模型(push)的消息系统,消息代理在将消息推送到消费者后,标记这条消息已经消费,但这种方式无法很好地保证消费被处理。缺点:标记为消费后,其他消费者则不可以再消费了,不可取。用pull拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息。由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
读message消息直接从page cache转入socket发送出去。当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁 盘Load消息到page cache,然后直接从socket发出去
生产上可能存在几种情况
1、消费者消费的速度跟不上生产者生产消息的速度
2、生产者生产的速度跟不上消费的速度
3、topic的某一个partion挂掉了
以上几种情况可能会导致当前的消费者无法去消费信息了,必然会影响业务的使用
kafka构建了rebalance机制,也就是在均衡的机制
发生rebalance的情况有以下几种:
1、消费者组的consumer个数发生了变化
2、订阅的topic个数发生了变化
3、分区发生了变化
等等
**rebalance过程中,所有的消费者将不再进行消费,直到rebalance过程完成
消费者分区分配策略默认使用Range范围分配策略
计算公式为:
n = 分区数量/消费者数量
m = 分区数量%消费者数量
前m个消费者消费n+1个,剩余消费者消费n个
按照这个公式来计算,3个消费者,7个分区
本身还存在RoundRobin策略和Stricky粘性分配策略,本次先不赘述
可参考:[2]
本次记录offset重置命令
./kafka-consumer-groups.sh --execute --bootstrap-server 172.17.195.178:9092 --group testaaa --reset-offsets --topic thirdparty_clue_callback:0 --to-offset 470//将testaaa消费者组中 topic为thirdparty_clue_callback,分区为0的offset重置为470./kafka-consumer-groups.sh --describe --bootstrap-server 172.17.195.178:9092 --group testaaa//查询消费者组数据./kafka-topics.sh --zookeeper localhost:2181 --describe --topic tanma_shop_hupan_topic_trade//查询topic数据//创建外部订单写入topic./kafka-topics.sh --zookeeper localhost:2181 --create --topic tanma_mall_order_topic --partitions 12 --replication-factor 2
还有一些借鉴的其他命令[3]