kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该 具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确 并 没有完全遵循JMS规范。
我们先来看一下Kafka中相关术语:还有个副本的概念稍后讲
这个图代表这三个broker,一个topic,两个分区,三个副本,两个消费者组,四个消费者。
服务端与客户端的通信是靠tcp协议来完成。
发送消息 kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic。
消费消息
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic。
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输 出, 默认是消费最新的消息 。使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息
所谓的偏移量的意思就是现在消费者消费到哪里了,实际上是个队列,我这个图只是个大概意思,就是假如现在有4条消息,是在这个消费者2上线之前发的,已经被消费者1消费了,这时候偏移量在最后一条消息,这个消费者上线之后可以选择是从头消费还是从最后一条加一的偏移量消费(也就是这个例子的第五条消费)
方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-begining--topic test
方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-begining--topic test
几个注意点:
消息会被存储消息是顺序存储(但不一定是顺序消费的)消息是有偏移量的消费时可以指明偏移量进行消费 单播消息的实现:
单播消息:一个消费组里 只会有一个消费者能消费到某一个topic中的消息。于是可以创建多个消费者,这些消费者在同一个消费组中。(但是如果这个topic分区的话,就可以被同一个消费组的不同消费者消费,而且一个消费者是可以消费多个分区的。但是一个分区只能被一个消费者消费)
多播消息的实现:在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。
kafka实现多播,只需要让不同的消费者处于不同的消费组即可。
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup2 --topic test
# 查看当前主题下有哪些消费组 ./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list # 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量 ./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup
Currennt-offset: 当前消费组的已消费偏移量Log-end-offset: 主题对应分区消息的结束偏移量(HW)Lag: 当前消费组未消费的消息数分区的作用:
可以分布式存储可以并行写
实际上是存在data/kafka-logs/test-0 和 test-1中的0000000.log文件中
小细节:
定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的 时候,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定 期清理topic里的消息,最后就保留最新的那条数据 因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配 50 个分区(可以 通过offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。 通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区 公式:hash(consumerGroupId) % __consumer_offsets主题的分区数
图中Kafka集群有两个broker,每个broker中有多个partition。一个partition只能被一个消费组里的某一个消费者消费,从而保证消费顺序。Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。一个消费者可以消费多个partition。
消费组中消费者的数量不能比一个topic中的partition数量多,否则多出来的消费者消费不到消息。
假如其中有一个消费者挂了,其他的消费者就会消费它消费的分区,这个选举方法也是有几种策略的。这个就是Kafka的rebalance机制。
rebalance机制
前提是:消费者没有指明分区消费。当消费组里消费者和分区的关系发生变化,那么就会触发rebalance机制。
这个机制会重新调整消费者消费哪个分区。
在触发rebalance机制之前,消费者消费哪个分区有三种策略:
range:通过公示来计算某个消费者消费哪个分区轮询:大家轮着消费sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整。(其他两个会打乱原来的顺序直接按照range方式或者轮询方式重新计算)