欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

day05kafka

时间:2023-04-17
一、回顾 对于topic的管理:kafka-topic.sh生产数据负载均衡:先判断是否指定分区,判断自定义分区器,在判断是否指定了key (指定类似hash,没有的话黏性分区)kafka生产数据时不丢失:应答机制和重试机制:acks应答机制(0生产者发送数据到对应的分区,不用返回ack,直接发送下一条;1.生产者写入数据到对应的分区leader副本中,kafka返回ack,生产者收到ack再发送下一条;all:生产者写入数据到对应的分区的leader副本中,等待所有可用副本同步成功,再返回ack)retries重试机制:如果生产者没有收到ack,就会重新发送这条数据。kafka消费者消费数据的规则:以消费者组第一次消费和第一次以后开始,(消费者组之前在kafka中不存在:没有任何一个消费者之前运行过:第一次费;auto.offset.reset=latest |earliest;消费者已经存在于kafka中根据offset进行消费)kafka保障消费数据不丢失及不重复:只要让消费者组中的消费者严格的按照offset消费每个分区;kafka让所有的消费者自动或手动将commit offset提交到一个topic中(__ consumer_offsets);或自己管理offset,存储在外部系统消费者负载均衡策略:一个分区只能被一个消费者消费,一个消费者可以消费多个分区;分配策略:范围分配:每个消费者消费一定范围的分区,尽量均分,如果不能均分,优先分配给编号小的消费者;轮询分配:将所有topic和分区进行排序,轮询分配给每个消费者(不适用于每个消费者订阅的topic不一样的场景);黏性分配:分配更加均衡,在消费者故障负载均衡,尽量保持原先得分配不动。 二、kafka的存储过程(读写速度很快的原因) 生产者每一条数据,将数据放入到一个本地缓存中,等待批次构建,如果batch满了或者达到一定的时间,提交写入请求(批量提交,不是每生产一条就写入一条)生产者根据分区规则构建数据分区,获取对应的元数据,将请求提交给leader副本所在的broker(生产者怎么知道leader在哪台机器:生产者请求kafka服务端9092,kafka的服务端返回元数据存储的地址(zk地址)。生产者根据zk地址到zk中获取这个分区的元数据,找到这个分区的leader副本所在的broker);zk中的元数据:
分区的元数据:
先写入这台broker的pageCache(操作系统)中,kafka也用了内存机制来实现数据的快速读写 (kafka使用日志方式记录数据,并不是直接写入磁盘;所有磁盘文件在生成时操作系统的内存中都必须建立对应的映射;操作系统级别的内存:pagecache页缓存)操作系统的后台的自动将页缓存中的数据sync同步到磁盘文件中,最新的segment.log中(空间:dirty page所占内存达到10%;时间:数据在page cachezhong 达到配置时间默认30s)其他的follower到leader中同步数据segment:加快查询效率(通过将分区的数据根据offset划分成多个比较小的segment文件,在检索数据时可以根据offset快速定位数据所在的segment;加载单个segment文件查询数据,可以提高查询效率);减少删除数据IO(删除数据时kafka可以以segment为单位删除某个segment的数据,避免一条一条删除,增加IO负载,性能较差);.log:存储真实数据。.index/.timeindex:存储对应的文件索引。segment划分规则:按时间周期生成log.roll.hours=168 七天重新生成一个新的segment,按照文件大小生成log.segment.bytes=1073741824。 segment命名规则:以当前文件存储的最小的offset来命名kafka读取过程:消费者根据topic、partition、offset提交给kafka请求读取数据;kafka根据元数据信息,找到对应的这个分区对应的leader副本节点;请求leader副本所在的broker,先读pagecache,通过零拷贝机制读取pagecache(实现0磁盘读写,直接将内存数据发送到网络端口,实现传输);如果pagecache中没有,读取segement文件端,先根据offset找到要读取的segment;将.log文件对应的.index文件加载带内存中,根据.index中索引的信息找到offset在.log文件中的最近的位置(index中记录的是稀疏索引:不是每一条数据都有索引);读取.log根据索引读取对应的offset数据。index索引设计:全量索引(全部有索引)和稀疏索引(部分数据有索引)减少了索引存储的数据量加快索引的索引检索效率。log.index.interval.bytes=4096
两列:第一列:这个文件中的第几条;第二列:这个文件中的第几个字节开始
如果直接使用offset作为索引第一列导致索引过大数据清理及压缩:log.cleaner.enable=true(开启清理) log.cleanup.policy=delete | compact
10.1delete:基于存活时间规则:最常用的方式:log.retention.ms/minutes/hours=168/7day
10.2基于文件大小规则:删除文件阈值如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,-1表示不适用这种规则:log.retention.bytes=-1
10.3基于offset消费规则:将明确已经消费的offset的数据删除。
判断已经消费到什么位置:编写一个文件offset.json
10.4压缩规则(合并)log.cleanup.policy=compact
将重复的更新数据的老版本删除,保留新版本,要求每条数据必须要有key,根据key来判断是否重复
三.副本机制(AR,ISR,OSR) 分区数据的安全性:每个kafka中分区都可以构建多个副本,相同分区的副本存储在不同节点上(leader对外提供读写数据,follower与leader同步数据,如果leader故障,选举一个新的leader,选举通过kafka的主节点controller实现)AR:all Replicas所有副本,指的是一个分区在所有节点上的副本ISR:IN-Sync-Replicas可用副本所有正在与leader同步的follower副本(列表中按照优先级排列controller根据副本同步状态及broker健康状态)越靠前越会成为leader,如果leader故障会从ISR列表中选取新的leader;如果生产者写入数据 :ack=all,写入所有ISR列表中的副本就返回ackOSR:Out-Sync-Replicas不可用副本与leader副本的同步差距大;网络故障等外部环境因素;判断OSR副本:
LW:low_watermark:最低消费的offsetHW:high_watermark:最高消费的offset,当前分区的最小的leoLEO:log end offset:当前分区最新的offset位置+1,即将写入的下一个offset的位置LSO:log start offset:当前分区最老的offset位置,正常为0
kafka分区副本leader选举
四.kafka的数据安全 消息队列的一次性语义:
1.1 at-most-once:至多一次,数据丢失
1.2 at-least-once: 至少一次,数据重复
1.3 exactly-once:有且仅有一次,只消费处理成功一次所有消息队列的目标保障生产数据不丢失:ack+重试如果ISR只有一个副本,写入成功
生产部重复:ack没有返回,导致重复写入
幂等性机制:f(x)=f(f(x))在每条数据中增加一个数据id,下一条数据会比上一条数据id多1,kafka会根据id进行判断
消费数据不重复丢失kafka可视化工具安装:
修改配置文件:


修改配置文件:

export KAFKA_HOME=/export/server/kafka_2.12-2.4.1export PATH=:$KAFKA_HOME/bin:$PATH

添加权限:chmod u+x ke.sh
启动:ke.sh start
界面:node3:8048/ke
用户名密码:admin/123456

数据积压:消费跟不上生产速度,导致处理延迟(消费者组的并发能力不够,消费者处理失败,网络故障;解决:提高消费者组中消费者并行度,找到网络故障,分析处理失败的原因)

限流:
五.常见面试题 为什么kafka读写速度很快:写:先写内存,同步磁盘:顺序写机制(速度可以和内存媲美);
读:先读内存,读内存使用零拷贝机制;基于索引的顺序检索。什么是cap理论,kafka满足那两个
答:c:一致性,任何一台机器写入数据,其他节点也可以读取到
a:可用性,如果一个节点故障,其他节点可以正常提供数据服务
p:分区容错性,如果某个分区故障,这个分区的数据照样可用
kafka满足ca

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。