broker :server
topic:queue组,partition:queue,默认1:1,可以1:多,每个partition对应一个磁盘中的文件
为什么要设计topic和partition,1:多的关系?
kafka的设计上broker会存在消息积压,最终msg会落地到磁盘文件中持久化,大数据量的情况下要考虑分布式存储,所以要把topic拆成几个partition,存在放在不同的物理机上实现分布式存储指定不同的consumer消费不同的partition,提升消费并行度,不需要考虑单个queue并发情况下的同步(当然一个consumer也可以消费多个partition)
创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:
bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.65.60:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test
这个topic的元信息放在/broker/topic里面,真正queue里的信息放在kafka的broker里面
msg存在kafka的磁盘文件中,默认保存一周
创建主题:1 bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.65.60:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test
元信息放zookeeper, 真正消息队列中的数据放broker
kafka默认只消费消费端启动后所生产的消息,也可以通过修改启动参数【–from-beginning】来使消费端消费存量的消息(对没有指定group的consumer而言)
Topic(Rule):逻辑概念(可以理解成一种消息转发规则),Partition:物理概念
–replication-factor设置了每个partition的总副本数(包含leader),随机选一个Replicas作为这个Partition的leader
Topic: my-replicated-topicPartition: 0Leader: 2Replicas: 2,0,1Isr: 2,0,1Topic: my-replicated-topicPartition: 1Leader: 0Replicas: 0,1,2Isr: 0,1,2
副本数量(Replicas)不能大于节点数量(broker)
[2022-02-11 11:10:58,505] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 5 larger than available brokers: 3. (kafka.admin.TopicCommand$)
例如Partition=2,Replicas=3:每个broker节点上的有2个存放某个topic消息的文件,例如topic=my-replicated-topic,文件名分别是my-replicated-topic-0,my-replicated-topic-1
所以我们可以总结:Partition决定了每个broker节点里的某个topic消息文件的数量,比如Partition=10,topic=my-replicated-topic,文件名就是my-replicated-topic-0 => my-replicated-topic-10。Replicas决定了哪些broker节点上可以有这些Partition分片。
所以只要我们把Replicas的num配成和broker的num一致,就可以实现每个broker上分布所有partition分片
Kafka只有被标记为Leader的broker节点中的partition可以写入数据,其他follow节点不行Isr:已数据同步的副本细节:不同Partition的Leader副本会尽可能选择不一样的broker节点,比如分区0和分区1,保证了分区1服务挂了以后,相同topic的数据还可以往分区0中写数据,也起了容灾作用细节:kafka的Leader是到Partiton这个级别的,没有到broker这种节点的级别,要和其他的分布式组件(如zk、nacos等)区别开。也就是kafka的broker没有Leader、Follow这些关系 Producer
什么时候用同步发送:发送已是业务逻辑的最后一步
什么时候用同步发送:发送已是业务逻辑的最后一步
什么时候用异步发送:发送完后还有其他业务逻辑
生产者主动向服务端推送batch块
Consumer消费者通过poll长轮询主动从服务端轮询msg(cur_offset_idx => end_offset_idx) (activemq,rabbitmq是服务端主动push到consumer)
问题1:如果consumer配置的auto_commit = true,interval = 1000ms,但是consumer具体的业务消费过程比较慢,5000ms才能消费完,如何保证消息的不丢失?
问题2:如果consumer配置的auto_commit = true,interval = 1000ms,但是consumer具体的业务消费过程比较快,100ms就消费完,如何保证消息的不重复消费?
解决问题1和问题2:一般业务中用手动提交(auto_commit = false)
// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了//consumer.commitSync();
当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费latest(默认) :只消费自己启动之后发送到主题的消息(根据时间戳判断,从自己能感知到的offset的下一条开始)earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
问题3:如果consumer设置auto_commit = false,比如一次poll了5条消息,处理完后再去commit。如果处理到第2条,服务挂了,下次服务启动又会重复消费这2条消息,如何避免?
解决问题3:通过在poll和业务处理逻辑之间加一层幂等性处理机制,例如redis,来保证poll到的已经消费过的数据不再进入业务处理逻辑
解决问题3:也可以处理一条commit一条来解决,但这种方式性能低,而且要保证业务处理的原子性
问题4:
ConsumerConfig.MAX_POLL_INTERVAL_MS_ConFIG如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
解决问题4:
1.扩大ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG(不推荐)
2.检查消费逻辑的性能。
3.减少一次poll的msg数量。
一个新的Group会从Broker-Consumer-Offset的log中的最后一个offset的下一位,开始消费,可以通过修改AUTO_OFFSET_RESET_ConFIG = "earliest"来重定位offset
业务场景用:AUTO_OFFSET_RESET_ConFIG = “earliest”
大数据场景用:AUTO_OFFSET_RESET_ConFIG = “latest”
问题5:如何控制消息的幂等性?
解决问题5:通过幂等性来控制重复消费,而非通过手动提交/自动提交来控制。
1、如果consumer端数据落地时有唯一标识,例如订单(order)有一个唯一索引订单号:order_code,那么就可以由数据库来控制。2、用redis:setnx类似于分布式锁的实现方式来实现幂等性,也就是consumer消费到msg先丢redis,再从redis中取。 Broker
消费的重分配:(consumer的rebalance机制)
服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,对应的Partition也会被重新分配给其他consumer,默认是10秒props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);consumer给broker发送心跳的间隔时间,broker接收到心跳。如果此时有rebalance发生会通过心跳响应将rebalance方案下发给相关的consumer,这个时间可以稍微短一点props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
创建总控制器Controller的过程:
1.所有Broker向zk发起create /controller的命令,先创建成功的就是总控制器Controller
2.所有Broker会watch监听这个节点,如果发生变化(EPHEMERAL_NODE)消失,也就是Broker崩溃,则重新发起选举