欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

kafka学习(一)搭建与基本命令

时间:2023-04-17

概念的东西都略过,感觉和rabit MQ差不多


文章目录

搭建命令

topicsproducerconsumer 搭建

搭建zookeeper

docker pull zookeeper:3.4.9docker run --privileged=true -d --name zookeeper --publish 2181:2181 -d zookeeper:3.4.9


创建kafka环境

docker pull wurstmeister/kafkadocker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

参数解释:

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 kafka 配置zookeeper管理kafka的路径

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9092 把kafka的地址端口注册给zookeeper

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间


可能会有OOM的情况导致无法启动kafka容器

修改Kafka的堆内存分配

添加参数-e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" -t wurstmeister/kafka


默认情况下云服务器没有设置交换区,我们需要手动设置一下

添加2G的交换分区
dd if=/dev/zero of=/swapfile bs=1k count=2048000
创建SWAP文件
mkswap /swapfile
3、激活SWAP文件
swapon /swapfile
查看SWAP信息是否正确
swapon -s
添加到fstab文件中让系统引导时自动启动
echo "/swapfile swap swap defaults 0 0" >> /etc/fstab
用命令free检查2G交换分区生效
free -m
或者
grep SwapTotal /proc/meminfo


进入kafka容器

docker exec -it kafka /bin/bash

进入路径

cd /opt/kafka/bin

退出容器
ctrl + p + q


kafka可视化工具

http://www.kafkatool.com/download.html

方便查看消息情况


集群设置

使用9093端口, broker_id设为1,修改监听端口,修改内存分配

docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" -t wurstmeister/kafka

使用9094端口, broker_id设为2,修改监听端口,修改内存分配

docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" -t wurstmeister/kafka

可视化效果如下图:


测试集群效果

进入一个kafka容器
docker exec -i -t kafka2 /bin/bash
cd /opt/kafka/bin

创建一个test topic,副本大小1,分片大小3
./kafka-topics.sh --zookeeper 146.56.246.156:2181 --create --topic test --replication-factor 1 --partitions 3

效果如下:

查看topic
./kafka-topics.sh --zookeeper 146.56.246.156:2181 --describe --topic test

可以看到,已经是集群环境,可以看到leader机器、副本在分区上的保存情况,和ISR列表成员


命令

进入/opt/kafka/bin


topics

查看当前服务器中的所有 topic
./kafka-topics.sh --bootstrap-server 146.56.246.156:9092 --list

创建一个test topic,副本大小1,分片大小3
./kafka-topics.sh --zookeeper 146.56.246.156:2181 --create --topic test --replication-factor 1 --partitions 3

修改topic
/kafka-topics.sh --bootstrap-server 146.56.246.156:9092 --alter --topic test --partitions 3

–bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
–topic 操作的 topic 名称。
–create 创建主题。
–delete 删除主题。
–alter 修改主题。注意:分区数只能增加,不能减少
–list 查看所有主题。
–describe 查看主题详细描述。
–partitions 设置分区数。
–replication-factor 设置分区副本。
–config 更新系统默认的配置

producer

生产消息
./kafka-console-producer.sh -- bootstrap-server 146.56.246.156:9092 --topic test

–bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
–topic 操作的 topic 名称。

consumer

–bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
–topic 操作的 topic 名称。
–from-beginning 从头开始消费。把主题中所有的数据都读取出来(包括历史数据)。
–group 指定消费者组名称。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。