二、Kafka 快速入门 1、集群规划 2、集群部署
下载地址
1 )解压安装包:
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
2 )修改解压后的文件名称:
mv kafka_2.12-3.0.0/ kafka
3 )进入到/opt/module/kafka 目录,修改配置文件
cd config/vim server.properties
输入以下内容:
#broker 的全局唯一编号,不能重复,只能是数字。broker.id=0#处理网络请求的线程数量num.network.threads=3#用来处理磁盘 IO 的线程数量num.io.threads=8#发送套接字的缓冲区大小socket.send.buffer.bytes=102400#接收套接字的缓冲区大小socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小socket.request.max.bytes=104857600#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔log.dirs=/opt/module/kafka/datas#topic 在当前 broker 上的分区个数num.partitions=1#用来恢复和清理 data 下数据的线程数量num.recovery.threads.per.data.dir=1# 每个 topic 创建时的副本数,默认时 1 个副本offsets.topic.replication.factor=1#segment 文件保留的最长时间,超时将被删除log.retention.hours=168#每个 segment 文件的大小,默认最大 1Glog.segment.bytes=1073741824# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期log.retention.check.interval.ms=300000#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
3、集群 启停脚本在/home/atguigu/bin 目录下创建文件 kf.sh 脚本文件
vim kf.sh
脚本如下:
#! /bin/bashcase $1 in"start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done};;"stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done};;esac
添加执行权限
chmod +x kf.sh
启动集群命令
kf.sh start
停止集群命令
kf.sh stop
3、Kafka 命令行操作
三、Kafka 生产者 1、生产者 消息发送流程 ①发送原理
在消息发送的过程中,涉及到了 两个线程 ——main 线程和Sender 线程。
在 main 线程中创建了 一个 双端列队列 RecordAccumulator。
main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
②生产者重要参数列表
2、异步送 发送 API ①普通异步发送
需求:创建 Kafka生产者,采用异步的方式发送到 Kafka Broker
导入依赖
编写不带回调函数的 API代码:
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) throws InterruptedException {// 1、创建 kafka 生产者的配置对象Properties properties = new Properties();// 2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 3、创建 kafka 生产者对象KafkaProducer
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元
数据信息(Recordmetadata)和异常信息(Exception)
如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1、创建 kafka 生产者的配置对象Properties properties = new Properties();// 2、给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3、创建 kafka 生产者对象KafkaProducer
只需在异步发送的基础上,再调用一下 get()方法即可。
四、生产者分区 1、分区好处 2、生产者发送消息的分区策略 ①默认的分区器 DefaultPartitioner ②自定义分区器
实现步骤:
(1)定义类实现 Partitioner 接口。(2)重写 partition()方法。
import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[]keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取消息String msgValue = value.toString();// 创建 partitionint partition;// 判断消息是否包含 atguiguif (msgValue.contains("atguigu")){partition = 0;}else {partition = 1;}// 返回分区号return partition;}// 关闭资源@Overridepublic void close() {}// 配置方法@Overridepublic void configure(Map
使用分区器的方法,在生产者的配置中添加分区器参数。
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");KafkaProducer
五、生产者 如何提高吞吐量
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) throwsInterruptedException {// 1、创建 kafka 生产者的配置对象Properties properties = new Properties();// 2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");// 3、创建 kafka 生产者对象KafkaProducer
六、数据可靠性
回顾发送流程:
ack 应答原理:
ACK应答级别:
在配置properties中指定使用对应的ack级别
七、数据去重 1、数据传递语义 2、幂等性 ①幂等性原理 ②如何使用幂等性
开启参数 enable.idempotence 默认为 true,false关闭。
3、生产者事务 ①Kafka事务原理 ②Kafka的事务一共有如下 5个 API// 1 初始化事务void initTransactions();// 2 开启事务void beginTransaction() throws ProducerFencedException;// 3 在事务内提交已经消费的偏移量(主要用于消费者)void sendOffsetsToTransaction(Map
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerTransactions {public static void main(String[] args) throws InterruptedException {// 1、创建 kafka 生产者的配置对象Properties properties = new Properties();// 2、给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 设置事务 id(必须),事务 id 任意起名properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");// 3、创建 kafka 生产者对象KafkaProducer
八、数据有序 九、数据乱序
十、Broker 工作流程 1、Zookeeper 存储的 Kafka 信息
启动 Zookeeper 客户端:
bin/zkCli.sh
通过 ls命令可以查看 kafka 相关信息:
[zk: localhost:2181(CONNECTED) 2] ls /kafka
2、Kafka Broker总体工作流程 3、Broker 重要参数
4、生产经验 —— 节点服役和退役
服役新节点
修改 haodoop105中 kafka的 broker.id为 3。保证唯一即可
执行 负载均衡 操作
创建一个要均衡的主题:
vim topics-to-move.json
{"topics": [{"topic": "first"}],"version": 1}
生成一个负载均衡的计划:
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}]}Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3中):
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
执行副本存储计划:
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:Reassignment of partition first-0 is complete.Reassignment of partition first-1 is complete.Reassignment of partition first-2 is complete.Clearing broker-level throttles on brokers 0,1,2,3Clearing topic-level throttles on topic first
退役旧节点
执行负载均衡操作:
先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
创建一个要均衡的主题:
vim topics-to-move.json
{"topics": [{"topic": "first"}],"version": 1}
创建执行计划:
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
Current partition replica assignment{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]}Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。
vim increase-replication-factor.json{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
执行副本存储计划:
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本存储计划:
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verifyStatus of partition reassignment:Reassignment of partition first-0 is complete.Reassignment of partition first-1 is complete.Reassignment of partition first-2 is complete.Clearing broker-level throttles on brokers 0,1,2,3Clearing topic-level throttles on topic first
执行停止命令
bin/kafka-server-stop.sh
5、Kafka副本 ①副本基本信息
ISR,和Leader通讯正常的Follower集合
OSR,和Leader通讯不正常的Follower集合
②Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper的。
③Leader 和 Follower 故障处理
如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka
底层如何分配存储副本呢?
查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
⑥生产经验 ——Leader Partition 负载 平衡 推荐关闭,或设置percentage>20%
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
6、文件存储 ①文件存储机制 ②思考:Topic 数据到底存储 在什么位置?
十一、Kafka 消费者 1、Kafka 消费方式 2、消费者工作流程 ①消费者总体工作流程 ②消费者组原理
消费者组
消费者组初始化流程
消费者组详细消费流程
③消费者重要参数
需求
创建一个独立消费者,消费 first主题中数据。
注意:
在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.ArrayList;import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建消费者对象KafkaConsumer
明天继续!!!