一、kafka简介二、kafka基本概念三、kafka消息传输流程四、kafka的存储机制五、kafka与传统MQ消息系统的区别六、Kafka如何实现高性能IO七、Kafka丢数据八、Zookeeper对于kafka的应用九、ISR、AR、OSR十、为什么Kafka不支持读写分离十一、kafka事务
kafka入门实操教程参考:https://blog.csdn.net/tttalk/article/details/121951552?spm=1001.2014.3001.5502 一、kafka简介
1.概念:分布式的发布订阅式的数据流消息系统。
2.优点
(1)高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
(2)可扩展性:kafka集群支持热扩展;
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
(4)容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障);
(5)高并发:支持数千个客户端同时读写。
3.应用场景:
(1) 日志聚合:指将不同服务器上的日志收集起来并放入一个日志中心,通过kafka以统一接口服务的方式开放给各种consumer;
(2) 消息队列(MQ):解耦生产者和消费者、缓存消息等。MQ的常见使用场景如流量削峰、数据驱动的任务依赖等等。在MQ领域,除了Kafka外还有传统的消息队列如ActiveMQ和RabbitMQ等;
(3) 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;
(4)运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
(5) 流式处理:比如spark streaming和storm。
1、Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
2、Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
3、Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
4、Segment:partition物理上由多个segment组成,后面有详细说明。
5、offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.
1.topic中partition存储分布
(1)基本原理
在Kafka文件存储中,同一个topic下有多个不同partition,每一个partition为一个文件夹,partiton命名规则为topic名称+有序序号,第一个partiton序号从0開始。如有两个topic:qq,wechat。partion一个为2,一个为3,则会产生|–qq-0|–qq-1|–wechat-0|–wechat-1|–wechat-2这5个文件夹。
(2)多broker、多副本情况
若有多个broker,可以理解平均每个broker分区数=partions*replication-factor/broker数。
这里解释下replication-factor:副本因子,用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
若partions 设置为10,replicationFactor设置为1、Broker为2.分区会均匀在broker。broker1分区为13579,broker2为246810;
若partions 设置为10,replicationFactor设置为2、Broker为2.每个broker都有副本存在。broker1和broker2副本均为1到10;
若partions 设置为3,replicationFactor设置为1、Broker为3.每个broker都有副本存在。broker1分区为1,broker2为2,broker2为3,当一个broker宕机了,该topic就无法使用了;
若partions 设置为3,replicationFactor设置为2、Broker为3.每个broker都有副本存在。broker1分区为12,broker2为23,broker2为13,当一个broker宕机了,该topic还能使用了。
2.partiton中文件存储方式
(1)每一个partion(文件夹)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件里。
(2)但每一个段segment file消息数量不一定相等,这样的特性方便old segment file高速被删除。(默认情况下每一个文件大小为1G)
(3)每一个partiton仅仅须要支持顺序读写即可了。segment文件生命周期由服务端配置参数决定。
这样做的优点就是能高速删除无用文件。有效提高磁盘利用率。
3.partiton中segment文件存储结构
(1)segment file组成:由2大部分组成。分别为index file和data file,此2个文件一一相应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。根据索引文件可以精确定位数据文件。
(2)segment文件命名规则:partion全局的第一个segment从0開始,兴许每一个segment文件名称为上一个segment文件最后一条消息的offset值。数值最大为64位long大小。19位数字字符长度,没有数字用0填充。
offset:在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它能够唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4.在partition中怎样通过offset查找message
比如读取offset=888的message,须要通过以下2个步骤查找。
(1)第一步查找segment file
从刚刚我们了解的segment文件命名规则可以知道每个segment最后一条消息的offset值,知道了这个,使用二分法很快就能找到segment file。
(2)第二步通过segment file查找message
通过第一步定位到segment file,然后通过索引index文件就能定位到数据文件。
5.优点
(1)Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
(2)通过索引信息可以快速定位message和确定response的最大大小。
(3)通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
(4)通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
稀疏索引:这类文件是将所有数据记录关键字值分成许多组,每组一个索引项,这种索引称为稀疏索引。这类文件的数据记录要求按关键字顺序排列。因此,其特点是索引项少,管理方便,但插入、删除记录代价较大。
1.Kafka持久化日志,这些日志可以重复读取和无限期保存
2.kafka是一个分布式系统,以集群的方式运行,在内部通过复制数据提升容错能力和高可用性
3.kafka支持实时的流式处理
1.批量处理消息。使用send()发送消息时会先缓存起来批量发送给broker,消费也是以批为单位。
2.使用顺序读写。只需要寻址一次就可以连续的读写下去,相比随机读写省去了很多寻址时间
3.利用缓存页PageCache加速消息读写。通过缓存加快读写速度,缓存清理策略一般是LRU(Least recently used,最近最少使用),优先保留最近常使用的那些缓存,缓存命中的效率时比较高的。
LRU实现:
最常见的实现是使用一个链表保存缓存数据,详细算法实现如下:
(1)新数据插入到链表头部;
(2) 每当缓存命中(即缓存数据被访问),则将数据移到链表头部;
(4) 当链表满的时候,将链表尾部的数据丢弃。
在Java中可以使用linkHashMap去实现LRU。
4.零拷贝技术。在消费端处理消费的过程中,需要把数据复制到用户内存空间,然后再复制到socket缓冲区,然后这个技术可以直接把数据复制到socket缓冲区。
1.消息发送
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
0—表示不进行消息接收是否成功的确认;
1—表示当Leader接收成功时确认;
-1—表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,配置-1时,不会丢失消息,下面分情况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)acks=1,同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
2.消息消费
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;
3.消息重复解决
将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。
(1)用于kafka的分布式应用。主要用于集群中不同节点进行通信。用于提交偏移量,节点失败时可以从之前的偏移量中获取,还用于leader检测、分布式同步、管理配置、集群、节点实时状态等
(2)zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
九、ISR、AR、OSR (1)概念介绍
ISR:In-Sync Replicas 副本同步队列。
AR:Assigned Replicas 所有副本。AR=ISR+OSR。
(2)运行原理
ISR是由leader维护,follower从leader同步数据有一些延迟,超过设定的阈值时都会把Broker(follower)剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
1.Kafka的幂等性
(1)使用
开启幂等性功能的方式很简单,将生产者客户端参数enable.idempotence设置为true(默认false)
(2)实现原理
①Kafka引入了producer id(以下简称PID)和序列号(sequence number)这两个概念。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。
②对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将对应的序列号的值加1。
③broker端会在内存中为每一对维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new = SN_old + 1)时,broker才会接收它。
④如果SN_new< SN_old + 1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new> SN_old + 1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,这个异常是一个严重的异常。
(2)kafka跨分区事务
引入序列号来实现幂等也只是针对每一对而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。幂等性不能跨多个分区运作,而事务可以弥补这个缺陷。从生产者的角度分析,通过事务,Kafka可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。
(1)使用
应用程序必须提供唯一的transactionalId,这个transactionalId通过客户端参数transactional.id来显式设置。事务要求生产者开启幂等特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将enable.idempotence设置为true(如果未显式设置,则KafkaProducer默认会将它的值设置为true)。
(2)实现原理
①transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。
②为了保证新的生产者启动后具有相同transactionalId的旧生产者能够立即失效,每个生产者通过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch。如果使用同一个transactionalId开启两个生产者,那么前一个开启的生产者会报错。
③具有相同transactionalId的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalId的生产者实例将不再工作。当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。
④KafkaProducer提供了5个与事务相关的方法,详细如下:
//初始化事务void initTransactions();//开启事务void beginTransaction() throws ProducerFencedException;//消费者提供在事务内的位移提交的操作void sendOffsetsToTransaction(Map
⑤在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。
举个例子,如果生产者开启事务并向某个分区值发送3条消息msg1、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了abortTransaction()方法,那么KafkaConsumer会将这些缓存的消息丢弃而不推送给消费端应用。