概念的东西都略过,感觉和rabit MQ差不多
文章目录
搭建命令
topicsproducerconsumer 搭建
搭建zookeeper
docker pull zookeeper:3.4.9docker run --privileged=true -d --name zookeeper --publish 2181:2181 -d zookeeper:3.4.9
创建kafka环境
docker pull wurstmeister/kafkadocker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
参数解释:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 kafka 配置zookeeper管理kafka的路径
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9092 把kafka的地址端口注册给zookeeper
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
可能会有OOM的情况导致无法启动kafka容器
修改Kafka的堆内存分配
添加参数-e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" -t wurstmeister/kafka
默认情况下云服务器没有设置交换区,我们需要手动设置一下
添加2G的交换分区
dd if=/dev/zero of=/swapfile bs=1k count=2048000
创建SWAP文件
mkswap /swapfile
3、激活SWAP文件
swapon /swapfile
查看SWAP信息是否正确
swapon -s
添加到fstab文件中让系统引导时自动启动
echo "/swapfile swap swap defaults 0 0" >> /etc/fstab
用命令free检查2G交换分区生效
free -m
或者
grep SwapTotal /proc/meminfo
进入kafka容器
docker exec -it kafka /bin/bash
进入路径
cd /opt/kafka/bin
退出容器
ctrl + p + q
kafka可视化工具
http://www.kafkatool.com/download.html
方便查看消息情况
集群设置
使用9093端口, broker_id设为1,修改监听端口,修改内存分配
docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" -t wurstmeister/kafka
使用9094端口, broker_id设为2,修改监听端口,修改内存分配
docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_ConNECT=146.56.246.156:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://146.56.246.156:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" -t wurstmeister/kafka
可视化效果如下图:
测试集群效果
进入一个kafka容器
docker exec -i -t kafka2 /bin/bash
cd /opt/kafka/bin
创建一个test topic,副本大小1,分片大小3
./kafka-topics.sh --zookeeper 146.56.246.156:2181 --create --topic test --replication-factor 1 --partitions 3
效果如下:
查看topic
./kafka-topics.sh --zookeeper 146.56.246.156:2181 --describe --topic test
可以看到,已经是集群环境,可以看到leader机器、副本在分区上的保存情况,和ISR列表成员
命令
进入/opt/kafka/bin
topics
查看当前服务器中的所有 topic
./kafka-topics.sh --bootstrap-server 146.56.246.156:9092 --list
创建一个test topic,副本大小1,分片大小3
./kafka-topics.sh --zookeeper 146.56.246.156:2181 --create --topic test --replication-factor 1 --partitions 3
修改topic
/kafka-topics.sh --bootstrap-server 146.56.246.156:9092 --alter --topic test --partitions 3
–bootstrap-server
–topic
–create 创建主题。
–delete 删除主题。
–alter 修改主题。注意:分区数只能增加,不能减少
–list 查看所有主题。
–describe 查看主题详细描述。
–partitions
–replication-factor
–config
生产消息
./kafka-console-producer.sh -- bootstrap-server 146.56.246.156:9092 --topic test
–bootstrap-server
–topic
–bootstrap-server
–topic
–from-beginning 从头开始消费。把主题中所有的数据都读取出来(包括历史数据)。
–group