
The often-misunderstood Kafka consumer property enable.auto.commit

Date: 2023-04-17
Preface

Let's get a proper understanding of Kafka's automatic offset commit on the consumer side.

I found an article dedicated to this feature; the main parts are translated below, with my own notes added.

The auto-commit parameter enable.auto.commit

Understanding the ‘enable.auto.commit’ Kafka Consumer property

Kafka Consumers read messages from a Kafka topic; it's not a hard concept to get your head around. But behind the scenes there's a lot more going on than meets the eye.

Say we're consuming messages from a Topic and our Consumer crashes. Once we realise that the world isn't ending, we recover from the crash and we start consuming again. We start receiving messages exactly where we left off from; it's kinda neat.

Note: after recovering from the crash, consumption simply resumes exactly where it left off. Very neat.

There are two reasons why this happens. One is something referred to as the “Offset” and the other is a couple of default Consumer values.

Note: two things make this work, the “Offset” and a couple of the Consumer's default settings.

So what's an Offset?

The Offset is a piece of metadata, an integer value that continually increases for each message that is received in a partition. Each message will have a unique Offset value in a partition.

Note: within a partition, every message gets its own ever-increasing, unique Offset.

I use Keys in some of my projects, some of them I don’t ;)

So as you can see here, each message has a unique Offset, and that Offset represents the position of that message in that particular partition.

The above explains what a Kafka offset is: it records the position of each message within its partition.

When a Consumer reads the messages from the Partition it lets Kafka know the Offset of the last consumed message. This Offset is stored in a Topic named __consumer_offsets; in doing this a consumer can stop and restart without forgetting which messages it has consumed.

The point here: the offset is stored in a topic called __consumer_offsets, which is how a consumer keeps track of how far it has got.
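Not part of the original article, but as a small illustration of what this means in practice: with the Java client you can read back what has been committed for a consumer group and compare it with the consumer's current position. The broker address, group id, topic and partition below are placeholders, and committed(Set) assumes a reasonably recent client (2.4 or later); treat this as a sketch rather than production code.

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InspectOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "demo-group");                // placeholder group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic/partition
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.poll(Duration.ofMillis(500)); // fetch once so a position exists

            // Where this consumer instance currently is in the partition.
            long position = consumer.position(tp);
            // What was last committed for this group, read back from __consumer_offsets.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp));
            System.out.println("position = " + position + ", committed = " + committed.get(tp));
        }
    }
}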

When we create our Consumers, they have a set of default properties which we can override or we can just leave the default values in effect.

There are two properties that are driving this behaviour.

There are two properties to pay attention to.

enable.auto.commit

auto.commit.interval.ms

The first property enable.auto.commit has a default value of true and the second property auto.commit.interval.ms has a default value of 5000. These values are correct for Blizzard's node-rdkafka client and the Java KafkaConsumer client, but other libraries may differ.

enable.auto.commit defaults to true, i.e. the auto-commit mechanism is on by default.

auto.commit.interval.ms defaults to 5000, in milliseconds.

So by default every 5 seconds a Consumer is going to commit its Offset to Kafka or every time data is fetched from the specified Topic it will commit the latest Offset.

Note: so by default a Consumer commits its Offset to Kafka every 5 seconds, or commits the latest Offset whenever it fetches data from the subscribed Topic.
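To make those defaults concrete, here is a minimal sketch of a Java consumer with the two values spelled out explicitly (they are simply the defaults described above); the broker address, group id and topic name are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitDefaults {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "demo-group");                 // placeholder group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // These two lines simply restate the defaults described above.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                // poll() both fetches records and, once the interval has elapsed,
                // triggers the automatic offset commit.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}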

Now in some scenarios this is the ideal behaviour, but in other scenarios it's not.

Note: in some scenarios this is exactly what you want; in others it isn't.

Say our Consumer is processing a message with an Offset of 100 and whilst processing it the Consumer fetches some more data, the Offset is committed and then the Consumer crashes. Upon coming back up it will start consuming messages from the most recent committed Offset, but how can we safely say that we haven't lost messages and that the Offset of the new message isn't later than that of the message being processed?

Note: the committed Offset can already be past the message we were still processing when the crash happens, so on restart the consumer resumes beyond it and that message is effectively lost.

What we can do is commit the Offset of messages manually after processing them. This gives us full control over when we consider a message dealt with, processed and ready to let Kafka know about it.

Note: the fix is to commit the Offset manually, only after the messages have been processed; that gives us full control over when Kafka is told a message is done.

Firstly we have to change the value of the enable.auto.commit property.

enable.auto.commit: false

When we change this property the auto.commit.interval.ms value isn't taken into consideration.

So now we can commit our Offset manually after the processing has taken place and if the Consumer crashes whilst processing a message it will start consuming from that same Offset, no messages lost.

Note: with enable.auto.commit set to false, committing becomes our own responsibility, and auto.commit.interval.ms is no longer taken into account at all.
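A minimal sketch of what this could look like with the Java client, with auto-commit switched off and commitSync() called only after the batch has actually been processed. Broker, group and topic names are placeholders, and process() stands in for whatever "processing" means in your application.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "demo-group");                // placeholder group
        props.put("enable.auto.commit", "false");           // we commit ourselves
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                }
                // Only after the whole batch has been handled do we tell Kafka about it.
                // If we crash before this line, the batch is simply re-delivered.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("handled offset=%d value=%s%n", record.offset(), record.value());
    }
}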

Both the clients mentioned earlier in this article have methods exposed to commit the Offset.

For further reading on the clients check out the links below.


JSDoc: Class: KafkaConsumer — the node-rdkafka KafkaConsumer reference (blizzard.github.io)

KafkaConsumer (kafka 0.10.2.1 API) — the Java client Javadoc (kafka.apache.org)

If anyone wants any more information on Kafka or Consumers get in touch on Twitter.

Cheers,

Danny

https://twitter.com/danieljameskay

An example

There are two configuration settings related to Kafka auto-commit:

enable.auto.commit: whether auto-commit is enabled
auto.commit.interval.ms: the interval between automatic commits

Suppose enable.auto.commit is set to true and auto.commit.interval.ms is set to 3000. Think for a moment: could the following problem occur?

poll returns 500 records, which take 5 seconds to process. Suppose the application dies at second 4: has the offset already been auto-committed at second 3, causing the data from second 4 onwards to be "lost"?

The correct answer is: no! Although auto.commit.interval.ms is set to 3000, the check for whether 3 seconds have elapsed is triggered by the poll method itself, so as long as we do not call poll again before the records are fully processed, no auto-commit happens even once the interval has passed.
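To picture the scenario, here is a hedged sketch of the usual poll loop with the settings from the question (broker, group and topic are placeholders, and the sleep merely stands in for real work): the 3-second check can only run inside the next poll() call, and that call never happens until the current batch of records has been worked through.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoopTiming {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "demo-group");                 // placeholder group
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "3000");         // the 3 s from the question
        props.put("max.poll.records", "500");                  // a batch of 500 records
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                // The auto-commit interval is only checked here, inside poll().
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Processing the 500 records takes ~5 s in the scenario. If the app
                    // dies in here, poll() was never reached again, so nothing was
                    // auto-committed even though 3 s have long passed.
                    Thread.sleep(10); // stand-in for real work
                }
            }
        }
    }
}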

Where auto-commit is executed

Kafka consumer offset commits are performed by org.apache.kafka.clients.consumer.internals.ConsumerCoordinator, and two methods actually carry out the commit:

Synchronous commit: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsSync
Asynchronous commit: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync

Synchronous commit


private void maybeAutoCommitOffsetsSync(Timer timer) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
            if (!commitOffsetsSync(allConsumedOffsets, timer))
                log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
        }
    }
}

When auto-commit is enabled, calling this method triggers a synchronous commit. So where does it get called?

Before joining a consumer group: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#onJoinPrepare
Before closing a consumer: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#close

Neither of these trigger points has anything to do with the auto.commit.interval.ms question we are discussing, so they are not expanded on here.

Asynchronous commit


public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            doAutoCommitOffsetsAsync();
        }
    }
}

When nextAutoCommitTimer expires, doAutoCommitOffsetsAsync() is executed to perform the asynchronous commit, and the expiry interval is exactly the one configured by auto.commit.interval.ms. So we only need to track the callers of maybeAutoCommitOffsetsAsync to know when the expiry check happens and, therefore, when an automatic asynchronous commit can take place.

Looking up the usages with an IDEA shortcut, there are also two call sites:

When partitions are assigned manually: org.apache.kafka.clients.consumer.KafkaConsumer#assign
When data is fetched: org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)

The call made during manual partition assignment only ensures that the offsets of the consumer's previously assigned partitions get committed, so it too has nothing to do with auto.commit.interval.ms. Therefore, for both the synchronous and the asynchronous path, the only caller relevant to auto.commit.interval.ms is org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration), the one method that is invoked repeatedly in normal operation.

This confirms the question at the start of the article: as long as we do not call poll, no auto-commit is triggered even after the interval has elapsed.

Caution

If auto.commit.interval.ms is set too large and the consumer exits abnormally before the automatic commit runs, the offset never gets committed to Kafka, and the same messages will be consumed again after a restart (duplicate consumption). So the suggestion here is that the smaller auto.commit.interval.ms is, the better.

enable.auto.commit
If true, the consumer's offset will be committed periodically in the background.
Default: true.

Summary
With automatic offset commits, watch out for the possibility of duplicate consumption.

References

https://medium.com/@danieljameskay/understanding-the-enable-auto-commit-kafka-consumer-property-12fa0ade7b65

KafkaConsumer (kafka 0.10.2.1 API) — the official Javadoc describing how to use the consumer
