欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Kafka数据存储结构

时间:2023-06-14

Kafka 的数据分为两部分:元数据、消息数据。

元数据:元数据包括集群信息、节点信息、队列信息、主从信息、分区信息、分区分布信息等,这类信息都存储在 Zookeeper 上,Kafka 的任何一个节点都可以读取元数据信息。消息数据:生产者向集群发送的消息。生产者直接向分区的 Leader 发送消息,Leader 会同步至 Flowers 节点,分区的 Leader 及 Flowers 节点都存储了一份消息数据。

元数据结构

clustercontrollercontroller_epochbrokersconsumers 消息数据结构

Segment 元数据结构

当安装好 kafka 并启动运行以后,就会在 zookeeper 上初始化 kafka 相关数据。

主要包括以下几个目录

clustercontrollercontroller_epochbrokersconsumers

在配置 Zookeeper 连接时,可以配置目录,比如:node01:2181,node02:2181,node03:2181/kafka,这样 Kafka 的相关文件就会在 /kafka 目录下,不然就散落在根目录下。

cluster

存储 Kafka 集群信息,cluster/id 目录存储了集群的ID

{ "version":"1", "id":"SZ1VE5STTrKpIDG9zRqsfQ"}

controller

存储 Kafka 集群的 controller 中央控制器信息。

{ "version":1, // 版本号,默认为 1 "brokerid":1, // Kafka 中央控制器的实例ID "timestamp":"1642666808318" // Kafka 中央控制器变更时间}

controller_epoch

存储 Kafka 集群的中央控制器选举次数。集群第一次启动时,该数值为 1,此后,只要发生中央控制器重新选举(比如:中央控制器实例宕机,就会导致重新选举),该数值就会 +1。

brokers

broker 信息

brokers/ids 目录存储所有的实例ID(server.properties 中配置的 broker.id),获取其中一个实例的数据如下

{ "listener_security_protocol_map":{ "PLAINTEXT":"PLAINTEXT" }, "endpoints":["PLAINTEXT://node04:9092"], "jmx_port":-1, // JMX 端口号 "features":{}, "host":"node04", // 主机名或者IP地址 "timestamp":"1642666808138", // 当前 broker 初始启动时的时间戳 "port":9092, // broker 服务端的端口号 "version":5 // 版本号,默认为 1}

topic 信息

brokers/topics 目录存储所有的 Topic,获取其中一个 Topic 的数据,partitions 记录了分区分布信息。

该 Topic 拥有两个分区,分区 0 分布在实例 2、1 上,分区 1 分布的实例 3、2 上

{ "removing_replicas":{}, "partitions":{ // 分区信息 "1":[3,2], // 分区 1 "0":[2,1] // 分区 0 }, "topic_id":"6Iig0yr-Shm5buNoQP95jQ", "adding_replicas":{}, "version":3}

partition 信息

其子目录 brokers/topics/${topic}/partitions/${partition}/state 存储了 Topic 的 partition 的状态

该分区的 Leader 是实例 3,ISR 列表包括了实例 3、2

{ "controller_epoch":4, // 中央控制器选举次数 "leader":3, // 当前 partition 的 Leader "version":1, // 版本,默认为 1 "leader_epoch":0, // 当前 partition 的 Leader 选举次数 "isr":[3,2] // ISR 列表}

consumers

该节点原本用于存储消费者的 offset,已经废弃。新版本使用一个内部的 Topic 存储消费者的 offset。

当 kafka 集群第一次有消费者消费时,会自动初始化一个名为 __consumer_offsets 的 Topic,用于存储所有消费者的 offset,为了提高吞吐量,该 Topic 默认有 50 个分区。

可以通过配置文件进行设置该 Topic 的参数:

offsets.topic.num.partitions 设置分区数offsets.topic.replication.factor 设置副本数offsets.retention.minutes 消息保留时长 消息数据结构

为了保障消息可靠性,Kafka 会将接收到的消息序列化并存储到磁盘(log.dirs 配置项配置存储位置)。

Kafka 的每个分区对应一个文件夹,比如:名称为 ooxx 的 Topic,有两个分区:0、1,那么在 $log.dirs 目录下就会存在 ooxx-0 及 ooxx-1 两个文件夹,分别存储两个分区的所有数据文件。

Kafka 的数据文件由 .log 数据文件及 .index、.timeindex 索引文件共同组成。

.log 是数据文件,存储所有的消息数据。消息以批量的形式进行存储,BatchRecords 是最基本的数据存储单元,一个 BatchRecords 中包含多个 Record(即我们通常说的一条消息)。

BatchRecords 结构,baseOffset 记录了当前批次起始消息的 offset

数据文件整体结构

.index 是索引文件,存储 offset 与 position 的对应关系(offset 对应的消息在 .log 文件中的偏移量),通过 offset 可以快速读取指定消息。

索引文件结构

注意:Kafka 并不会为每条消息都保存一个索引,而是根据 log.index.interval.bytes 等配置构建稀疏的索引信息。

.timeindex 也是索引文件,存储 timestamp 与 offset 的对应关系,用于应对一些需要根据 timestamp 来定位消息的场景,.timeindex 和 .index 中的记录是一一对应的。

索引文件结构

Segment

因为一个分区的消息可能会非常多,如果都存储在一个 .log 文件中,读写效率会受到影响,无法保障高吞吐,Kafka 引入了 Segment 的概念,其实就是将原来一个文件需要存储的内容存储到多个段文件,每个段文件达到一定条件,就创建一个新的段文件,这样文件存储就变成了一个横向可扩展的存储方案。

每个段依然由 .log、.index、.timeindex 三个文件组成,文件名为当前段的起始消息 offset,这样通过文件名就可以简单定位到指定 offset 所在的文件。log.segment.bytes 配置项配置段的 .log 文件大小,当 .log 文件大于该配置值,就会创建一个新的段。

生产过程

生产者向分区 Leader 发送消息,Leader 需要将消息保存到磁盘,假如已经存在的消息文件是 00000000000000000000,那么此时,消息内容写入 00000000000000000000.log 文件,而索引写入 00000000000000000000.index 及 00000000000000000000.timeindex 文件。

生产者持续向 Leader 发送消息,00000000000000000000.log 文件达到 log.segment.bytes 配置项配置的大小,则会创建新的段文件(.log、.index、.timeindex 文件)。假设发送第 10 条消息时达到设定值,则会产生 00000000000000000010 的新的段文件

消费过程

消费者对每个分区都记录了自己已经处理过的消息的 offset,通过该 offset 向后获取消息。

消费者向分区 Leader 拉取消息并带上了自己的 offset,Leader 需要根据 offset 定位到相应的段及段文件中的位置,这里分为两步:

第一步:根据 offset 定位段文件,因为段文件的文件名就是该段中起始消息的 offset,所以通过文件名就可以定义到消费者指定 offset 的段文件。

第二步:通过第一步定位到了文件,但也不能每次都读取整个文件,所以还需要进一步定位到文件中的位置,这就需要 .index 索引文件来进行辅助,索引文件中存储的是 offset 与 position 的对应关系,通过 position 就可以快速定位到 .log 文件中的位置。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。