Kafka简单介绍Kafka 3.0 documentation 2、KAFKA群起脚本
zookeeper群起脚本
[atguigu@hadoop102 bin]$ cat myzookeeper #!/bin/bashif [ $# -lt 1 ]then echo "Input Args Error....." exitfifor i in hadoop102 hadoop103 hadoop104docase $1 instart) echo "==================START $i ZOOKEEPER===================" ssh $i /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start;;stop) echo "==================STOP $i ZOOKEEPER===================" ssh $i /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh stop;;*) echo "Input Args Error....." exit;; esacdone
kafka群起脚本
[atguigu@hadoop102 bin]$ cat mykafka #!/bin/bashif [ $# -lt 1 ]then echo "Input Args Error....." exitfifor i in hadoop102 hadoop103 hadoop104docase $1 instart) echo "==================START $i KAFKA===================" ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties;;stop) echo "==================STOP $i KAFKA===================" ssh $i /opt/module/kafka/bin/kafka-server-stop.sh stop;;*) echo "Input Args Error....." exit;; esacdone
topic增删改查
# 创建topic[atguigu@hadoop102 ~]$ kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first# 查看topic[atguigu@hadoop102 ~]$ kafka-topics.sh --zookeeper hadoop102:2181 --list# 删除topic[atguigu@hadoop102 ~]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic seconder# 查看topic属性[atguigu@hadoop102 ~]$ kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first# 修改topic属性[atguigu@hadoop102 ~]$ kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
topic生产消费
# 产生实时数据[atguigu@hadoop102 ~]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic first# 消费实时数据[atguigu@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first# 会把主题中现有的所有的数据都读取出来[atguigu@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning
3、kafka架构 3.1、Kafka文件存储机制 3.2、kafka分区策略 Kafka 分区策略
Kafka分区分配策略-RangeAssignor、RoundRobinAssignor、StickyAssignor
kafka数据可靠性深度解读
1、Producer收到Topic partition的ACK才会进行下一轮的message发送,否则重新发送数据;
2、Topic partition全部follower(ISR中的follower)完成同步,leader才发送ACK;
3、in-sync replica set (ISR),意为和leader保持同步的follower集合;
4、follower长时间未向leader同步数据,其将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定;
5、不是所有情况都需要ISR中的follower响应后,leader才返回ACK(ACK={0(At Most Once),1(会丢失数据),-1(At Least Once)});
6、LEO指的是每个follower最大的offset,ISR队列中最小的LEO称为HW(消费者能见到的最大的offset);
7、follower发生故障后会被临时踢出ISR,待该follower恢复后,follower读取本地磁盘记录的上次HW,将log文件中高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了;
8、leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据;
总结:ACK=-1时保证数据不丢失但可能会重复,而follower和leader故障恢复机制只能保证数据一致性。
3.4、Exactly Once语义0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least once + 幂等性 = Exactly Once
要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对
但是Producer重启PID就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区(即同一个Producer向不同Partition发送同一个message)或跨会话(即在不同会话下(重启前后),同一个Producer向同一个Partition发送同一个message)的Exactly Once。
3.5、kafka消费者consumer采用pull(拉)模式从broker中读取数据,pull模式则可以根据consumer的消费能力以适当的速率消费消息。pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。
# 查看__consumer_offsets内容[atguigu@hadoop102 __consumer_offsets-49]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/module/kafka/logs/__consumer_offsets-49/00000000000000000000.index --print-data-log[atguigu@hadoop102 __consumer_offsets-49]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/module/kafka/logs/__consumer_offsets-49/00000000000000000000.log --print-data-log
Kafka为什么能那么快?高效读写数据,原来是这样做到的
使用内核空间、Page Cache 进行Kafka数据的交互时,在服务器宕机机时可能会造成数据丢失。
Zookeeper 在 Kafka 中的作用
Broker注册
# 打开zookeeper客户端[atguigu@hadoop102 bin]$ zkCli.sh # 查看所有broker的ID[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids[0, 1, 2]# 获取Broker注册信息[zk: localhost:2181(CONNECTED) 5] get /brokers/ids/0{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop102:9092"],"jmx_port":-1,"host":"hadoop102","timestamp":"1643511732953","port":9092,"version":4}# Broker创建的节点类型是临时节点[zk: localhost:2181(CONNECTED) 6] stat /brokers/ids/0cZxid = 0xe0000007ectime = Sun Jan 30 11:02:12 CST 2022mZxid = 0xe0000007emtime = Sun Jan 30 11:02:12 CST 2022pZxid = 0xe0000007ecversion = 0dataVersion = 1aclVersion = 0ephemeralOwner = 0x40000036a500001dataLength = 188numChildren = 0
Topic注册
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics[__consumer_offsets, first, test-topic][zk: localhost:2181(CONNECTED) 11] get /brokers/topics/first{"version":2,"partitions":{"2":[0,2,1],"1":[2,1,0],"0":[1,0,2]},"adding_replicas":{},"removing_replicas":{}}# 查看topics[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics[__consumer_offsets, first, test-topic][zk: localhost:2181(CONNECTED) 2] ls /brokers/topics/first[partitions]# 查看first的partitions信息[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/first/partitions[0, 1, 2][zk: localhost:2181(CONNECTED) 4] get /brokers/topics/first/partitions/2/state{"controller_epoch":11,"leader":1,"version":1,"leader_epoch":27,"isr":[1,2,0]}[zk: localhost:2181(CONNECTED) 5] get /brokers/topics/first/partitions/1/state{"controller_epoch":11,"leader":1,"version":1,"leader_epoch":27,"isr":[1,2,0]}[zk: localhost:2181(CONNECTED) 6] get /brokers/topics/first/partitions/0/state{"controller_epoch":11,"leader":1,"version":1,"leader_epoch":31,"isr":[1,2,0]}# 查看kafka的topic信息[atguigu@hadoop102 ~]$ kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic firstTopic: firstPartitionCount: 3ReplicationFactor: 3Configs: Topic: firstPartition: 0Leader: 1Replicas: 1,0,2Isr: 1,2,0Topic: firstPartition: 1Leader: 1Replicas: 2,1,0Isr: 1,2,0Topic: firstPartition: 2Leader: 1Replicas: 0,2,1Isr: 1,2,0
3.6、事务 Kafka Exactly Once和事务
kafka事务机制
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。
为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
3.6.2 Consumer事务(精准一次性消费)上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
如果想完成Consumer端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如mysql)。
4、Kafka API 4.1、Producer APIProperties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer
Since the send call is asynchronous it returns a Future for the Recordmetadata that will be assigned to this record、Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
kafka——Producer API 4.2、Consumer API
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
Kafka核心API——Consumer消费者java并发原子类AtomicBoolean解析kafka客户端–wakeup方法 5、Kafka Eagle
Kafka-Eagle 安装到使用全教程
Kafka Eagle安装详情及问题解答
登录页面查看监控数据:http://192.168.202.102:8048/ke
Linux mmap内存映射KafkaProducer Sender 线程详解(含详细的执行流程图)