1.安装zookeeper
2.修改配置文件
#server.properties#指定broker的id id必须为唯一的integer类型broker.id=1#数据存储的目录log.dirs=/data/kafka#指定zk地址zookeeper.connect=linux01:2181,linux02:2181,linux03:2181#可以删除topic的数据(生成环境不用配置)#delete.topic.enable=true
将配置好的kafka分发给其他机器
for i in {2..3};do scp -r kafka_2.12-2.7.2/ linux0$i:$PWD;done
修改其他机器上的broker.id
3.启动zookeeper
bin/zkServer.sh start
4.启动kafka
bin/kafka-server-start.sh -daemon config/server.properties-daemon 为后台启动服务
kafka基本命令查看kafka的topic
bin/kafka-topics.sh --list --bootstrap-server linux01:9092,linux02:9092,linux03:9092--新版,直接指定broker机器 多个broker机器用,隔开bin/kafka-topics.sh --list --zookeeper linux01:2181,linux02:2181,linux03:2181--老版本,指定zookeeper的地址
创建topic
bin/kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --create --topic test --partitions 3 --replication-factor 3//创建一个3个分区3个分区的topic
查看topic的描述信息
bin/kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --describe --topic test
删除topic
bin/kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --delete --topic test
增加topic的分区数 (只能增加,减少分区报错)
bin/kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --alter --topic test --partitions 5
命令行创建一个生产者
bin/kafka-console-producer.sh --broker-list linux01:9092,linux02:9092,linux03:9092 --topic hellokafka
命令行创建一个消费者
bin/kafka-console-consumer.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --topic hellokafka --from-beginning
查看__consumer_offsets topic的记录的偏移量
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --formatter "kafka.coordinator.group.GroupmetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning