Kafka 的数据分为两部分:元数据、消息数据。
元数据:元数据包括集群信息、节点信息、队列信息、主从信息、分区信息、分区分布信息等,这类信息都存储在 Zookeeper 上,Kafka 的任何一个节点都可以读取元数据信息。消息数据:生产者向集群发送的消息。生产者直接向分区的 Leader 发送消息,Leader 会同步至 Flowers 节点,分区的 Leader 及 Flowers 节点都存储了一份消息数据。
元数据结构
clustercontrollercontroller_epochbrokersconsumers 消息数据结构
Segment 元数据结构
当安装好 kafka 并启动运行以后,就会在 zookeeper 上初始化 kafka 相关数据。
主要包括以下几个目录
clustercontrollercontroller_epochbrokersconsumers
cluster在配置 Zookeeper 连接时,可以配置目录,比如:node01:2181,node02:2181,node03:2181/kafka,这样 Kafka 的相关文件就会在 /kafka 目录下,不然就散落在根目录下。
存储 Kafka 集群信息,cluster/id 目录存储了集群的ID
{ "version":"1", "id":"SZ1VE5STTrKpIDG9zRqsfQ"}
controller存储 Kafka 集群的 controller 中央控制器信息。
{ "version":1, // 版本号,默认为 1 "brokerid":1, // Kafka 中央控制器的实例ID "timestamp":"1642666808318" // Kafka 中央控制器变更时间}
controller_epoch存储 Kafka 集群的中央控制器选举次数。集群第一次启动时,该数值为 1,此后,只要发生中央控制器重新选举(比如:中央控制器实例宕机,就会导致重新选举),该数值就会 +1。
brokersbroker 信息
brokers/ids 目录存储所有的实例ID(server.properties 中配置的 broker.id),获取其中一个实例的数据如下
{ "listener_security_protocol_map":{ "PLAINTEXT":"PLAINTEXT" }, "endpoints":["PLAINTEXT://node04:9092"], "jmx_port":-1, // JMX 端口号 "features":{}, "host":"node04", // 主机名或者IP地址 "timestamp":"1642666808138", // 当前 broker 初始启动时的时间戳 "port":9092, // broker 服务端的端口号 "version":5 // 版本号,默认为 1}
topic 信息
brokers/topics 目录存储所有的 Topic,获取其中一个 Topic 的数据,partitions 记录了分区分布信息。
该 Topic 拥有两个分区,分区 0 分布在实例 2、1 上,分区 1 分布的实例 3、2 上
{ "removing_replicas":{}, "partitions":{ // 分区信息 "1":[3,2], // 分区 1 "0":[2,1] // 分区 0 }, "topic_id":"6Iig0yr-Shm5buNoQP95jQ", "adding_replicas":{}, "version":3}
partition 信息
其子目录 brokers/topics/${topic}/partitions/${partition}/state 存储了 Topic 的 partition 的状态
该分区的 Leader 是实例 3,ISR 列表包括了实例 3、2
{ "controller_epoch":4, // 中央控制器选举次数 "leader":3, // 当前 partition 的 Leader "version":1, // 版本,默认为 1 "leader_epoch":0, // 当前 partition 的 Leader 选举次数 "isr":[3,2] // ISR 列表}
consumers该节点原本用于存储消费者的 offset,已经废弃。新版本使用一个内部的 Topic 存储消费者的 offset。
当 kafka 集群第一次有消费者消费时,会自动初始化一个名为 __consumer_offsets 的 Topic,用于存储所有消费者的 offset,为了提高吞吐量,该 Topic 默认有 50 个分区。
可以通过配置文件进行设置该 Topic 的参数:
offsets.topic.num.partitions 设置分区数offsets.topic.replication.factor 设置副本数offsets.retention.minutes 消息保留时长 消息数据结构
为了保障消息可靠性,Kafka 会将接收到的消息序列化并存储到磁盘(log.dirs 配置项配置存储位置)。
Kafka 的每个分区对应一个文件夹,比如:名称为 ooxx 的 Topic,有两个分区:0、1,那么在 $log.dirs 目录下就会存在 ooxx-0 及 ooxx-1 两个文件夹,分别存储两个分区的所有数据文件。
Kafka 的数据文件由 .log 数据文件及 .index、.timeindex 索引文件共同组成。
.log 是数据文件,存储所有的消息数据。消息以批量的形式进行存储,BatchRecords 是最基本的数据存储单元,一个 BatchRecords 中包含多个 Record(即我们通常说的一条消息)。
BatchRecords 结构,baseOffset 记录了当前批次起始消息的 offset
数据文件整体结构
.index 是索引文件,存储 offset 与 position 的对应关系(offset 对应的消息在 .log 文件中的偏移量),通过 offset 可以快速读取指定消息。
索引文件结构
注意:Kafka 并不会为每条消息都保存一个索引,而是根据 log.index.interval.bytes 等配置构建稀疏的索引信息。
.timeindex 也是索引文件,存储 timestamp 与 offset 的对应关系,用于应对一些需要根据 timestamp 来定位消息的场景,.timeindex 和 .index 中的记录是一一对应的。
索引文件结构
Segment因为一个分区的消息可能会非常多,如果都存储在一个 .log 文件中,读写效率会受到影响,无法保障高吞吐,Kafka 引入了 Segment 的概念,其实就是将原来一个文件需要存储的内容存储到多个段文件,每个段文件达到一定条件,就创建一个新的段文件,这样文件存储就变成了一个横向可扩展的存储方案。
每个段依然由 .log、.index、.timeindex 三个文件组成,文件名为当前段的起始消息 offset,这样通过文件名就可以简单定位到指定 offset 所在的文件。log.segment.bytes 配置项配置段的 .log 文件大小,当 .log 文件大于该配置值,就会创建一个新的段。
生产过程
生产者向分区 Leader 发送消息,Leader 需要将消息保存到磁盘,假如已经存在的消息文件是 00000000000000000000,那么此时,消息内容写入 00000000000000000000.log 文件,而索引写入 00000000000000000000.index 及 00000000000000000000.timeindex 文件。
生产者持续向 Leader 发送消息,00000000000000000000.log 文件达到 log.segment.bytes 配置项配置的大小,则会创建新的段文件(.log、.index、.timeindex 文件)。假设发送第 10 条消息时达到设定值,则会产生 00000000000000000010 的新的段文件
消费过程
消费者对每个分区都记录了自己已经处理过的消息的 offset,通过该 offset 向后获取消息。
消费者向分区 Leader 拉取消息并带上了自己的 offset,Leader 需要根据 offset 定位到相应的段及段文件中的位置,这里分为两步:
第一步:根据 offset 定位段文件,因为段文件的文件名就是该段中起始消息的 offset,所以通过文件名就可以定义到消费者指定 offset 的段文件。
第二步:通过第一步定位到了文件,但也不能每次都读取整个文件,所以还需要进一步定位到文件中的位置,这就需要 .index 索引文件来进行辅助,索引文件中存储的是 offset 与 position 的对应关系,通过 position 就可以快速定位到 .log 文件中的位置。