欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Kafkaproducer的事务和幂等性

时间:2023-07-23

背景:kafka 客户端之producer API发送消息以及简单源码分析

从Kafka 0.11开始,KafkaProducer又支持两种模式:幂等生产者和事务生产者。幂等生产者加强了Kafka的交付语义,从至少一次交付到精确一次交付。特别是生产者的重试将不再引入重复。事务性生产者允许应用程序原子地将消息发送到多个分区(和主题)。

幂等性

Kafka在 0.11 版本引入了一项重大特性,幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。
拿 http 举例来说,一次或多次请求,得到的响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。

如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。

幂等性基本原理


对于Kafka来说,要解决的是生产者发送消息的幂等问题。在生产者生产消息时,如果出现 retry 时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。

为了实现生产者的幂等性,Kafka 引入了 Producer ID(PID)和 Sequence Number 的概念。

PID:每个 Producer 在初始化时,都会分配一个唯一的 PID,这个 PID 对用户来说,是透明的Sequence Number:针对每个生产者(对应 PID )发送到指定主题分区的消息都对应一个从 0 开始递增的 Sequence Number,Server 端就是根据这个值来判断数据是否重复


producer初始化会由server端生成一个PID,然后发送每条信息都包含该PID和sequence number,在server端,是按照partition同样存放一个sequence numbers 信息,通过判断客户端发送过来的sequence number与server端number+1差值来决定数据是否重复或者漏掉。

当 Producer 发送消息给 Broker 时,Broker 接收到消息并将其追加到消息流中。此时,Broker 返回 Ack 信号给 Producer 时,发生异常导致 Producer 接收 Ack 信号失败。对于 Producer 来说,会触发重试机制,将消息再次发送,但是,由于引入了幂等性,在每条消息中附带了 PID(Producer ID)和Sequence Number。相同的 PID 和 Sequence Number 发送给 Broker,而之前 Broker 缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条,不会出现重复发送的情况。

生成PID的流程

//在执行创建事务时Producer producer = new KafkaProducer(props);//会创建一个Sender,并启动线程,执行如下run方法Sender{ void run(long now) { if (transactionManager != null) { try { ........ if (!transactionManager.isTransactional()) { // 为idempotent producer生成一个producer id maybeWaitForProducerId(); } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {

为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_ConNECTION 小于等于5

通常情况下为了保证数据顺序性,我们可以通过MAX_IN_FLIGHT_REQUESTS_PER_ConNECTION=1来保证,这个也只是针对单实例。在kafka2.0+版本上,只要开启幂等性,不用设置这个参数也能保证发送数据的顺序性。

其实这里,要求 MAX_IN_FLIGHT_REQUESTS_PER_ConNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。

假设应用将 MAX_IN_FLIGHT_REQUESTS_PER_ConNECTION 设置为 6,假设发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求对应的 batch 数据,这时候假设请求 1 发送失败,需要重试,当重试的请求发送过来后,首先先检查是否为重复的 batch,这时候检查的结果是否,之后会开始 check 其 sequence number 值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力(相当于client 狂发错误请求)。

幂等性的注意事项

幂等性 Producer 只能保证单分区上的幂等性:即只能保证某个主题上的一个分区上不出现重复消息,无法实现多个分区的幂等性幂等性 Producer 只能实现单会话上的幂等性,不能实现跨会话的幂等性

会话:Producer 进程的一次运行,如果重启 Producer 进程,将丢失幂等性保证

配置幂等性

properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

要启用幂等(idempotence),必须将enable.idempotence配置设置为true。 如果设置,则retries(重试)配置将默认为Integer.MAX_VALUE,acks配置将默认为all,如果显性的将acks设置为0,-1,那么将会报错。

此外,如果send(ProducerRecord)即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。最后,生产者只能保证单个会话内发送的消息的幂等性。

Exactly Once语义

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once(至少一次) 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即At Most Once(最多一次) 语义。
At Least once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most once 可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once(刚好一次) 语义。在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

0.11 版本的 Kafka,引入了一项重大特性:幂等性。幂等性结合 At Least once 语义,就构成了 Kafka 的Exactly once 语义。即:
At Least once + 幂等性 = Exactly Once
Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。

事务

幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。Kafka 事务是 2017 年 Kafka 0.11 引入的新特性。类似于数据库的事务。Kafka 事务指的是在 Exactly once 语义的基础上,生产和消费可以跨分区和会话,生产者生产消息以及消费者提交 offset 的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交偏移量o2之前挂掉了(假设它最近提交的偏移量是o1),此时执行再均衡时,其它消费者会重复消费消息(o1到o2之间的消息)。

事务的应用情况

在一个原子操作中,根据包含的操作类型,可以分为三种情况:

只有Producer生产消息;消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的“consume-transform-produce ”模式只有consumer消费消息

前两种情况是事务引入的场景,最后一种情况没有使用价值(跟使用手动提交效果一样)。

相关属性配置

使用kafka的事务API 时的一些注意事项:

需要消费者的自动模式设置为false,并且不能子再手动的进行执行consumer#commitSync或者consumer#commitAsyc生产者配置transaction.id属性生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true消费者需要配置Isolation.level。在consume-trnasform-produce模式下使用事务时,必须设置为READ_COMMITTED。 Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的Transaction ID 获得原来的PID。
为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

要使用事务生产者和attendant API,必须设置transactional.id。如果设置了transactional.id,幂等性会和幂等所依赖的生产者配置一起自动启用。此外,应该对包含在事务中的topic进行耐久性配置。特别是,replication.factor应该至少是3,而且这些topic的min.insync.replicas应该设置为2。最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交的消息。

transactional.id的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。

所有新的事务性API都是阻塞的,并且会在失败时抛出异常。

Producer接口中有关事务的方法定义

//producer提供的事务方法 public void initTransactions();public void beginTransaction() throws ProducerFencedException ;public void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException ;public void commitTransaction() throws ProducerFencedException;public void abortTransaction() throws ProducerFencedException ;

创建生产者

配置transactional.id属性

public static Producer createProducer() { Properties properties = new Properties(); //配置文件里面的变量都是静态final类型的,并且都有默认的值 //用于建立与 kafka 集群连接的 host/port //继承的hashtable,保证了线程安全 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG,"1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432"); properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000"); //将消息发送到kafka server, 所以肯定需要用到序列化的操作 我们这里发送的消息是string类型的,所以使用string的序列化类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //设置事务ID 如果配置了transactional.id属性,则enable.idempotence 会被设置为true. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transactional-id"); return new KafkaProducer<>(properties);}

创建消费者

将配置中的自动提交属性(auto.commit)进行关闭而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )设置isolation.level

public static Consumer createConsumer() { Properties properties = new Properties(); // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开 properties.put("bootstrap.servers", "IP:9092"); // 消费者群组 properties.put("group.id", "groupxt"); // 设置隔离级别 properties.put("isolation.level","read_committed"); // 关闭自动提交 properties.put("enable.auto.commit", "false"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return new KafkaConsumer(properties);}

只有Producer生产消息

public static void onlyProduceInTransaction(){ Producer producer = ProducerTransaction.createProducer(); // 1.初始化事务 producer.initTransactions(); try { // 2.开启事务 producer.beginTransaction(); // 3.kafka写操作集合 // 3.1 do业务逻辑 // 3.2 发送消息 // 消息对象 - ProducerRecoder for(int i=0;i<10;i++){ ProducerRecord record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i); //就是多传入一个回调实例 producer.send(record, new Callback() { @Override public void onCompletion(Recordmetadata recordmetadata, Exception e) { System.out.println( "partition : "+recordmetadata.partition()+" , offset : "+recordmetadata.offset()); } }); } // 3.3 do其他业务逻辑,还可以发送其他topic的消息。 // 4.事务提交 producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. // 5.放弃事务 producer.abortTransaction(); }finally{ // 所有的通道打开都需要关闭 close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,避免这个程序的进程突然挂掉,然后内存里面的消息丢失,所以这个方法结束的时候,将消息数据都发送出去 producer.close(); }}

只有consumer消费消息

public static void onlyConsumeInTransaction() { // 1.构建上产者 Producer producer = ProducerTransaction.createProducer(); // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作 producer.initTransactions(); // 3.构建消费者和订阅主题 Consumer consumer = createConsumer(); consumer.subscribe(Arrays.asList("xt")); while (true) { // 4.开启事务 producer.beginTransaction(); // 5.1 接受消息 Duration duration = Duration.ofMillis(500); ConsumerRecords records = consumer.poll(duration); try { // 5.2 do业务逻辑; System.out.println("customer Message---"); Map commits = new HashMap<>(); for (ConsumerRecord record : records) { // 5.2.1 处理消息 print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %sn", record.offset(), record.key(), record.value()); // 5.2.2 记录提交偏移量 commits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndmetadata(record.offset())); } // 6.提交偏移量 producer.sendOffsetsToTransaction(commits, "groupxt"); // 7.事务提交 producer.commitTransaction(); }catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. // 8.放弃事务 System.out.println(e.getMessage()); producer.abortTransaction(); }finally{ producer.flush(); } }}

消费消息和生产消息并存(consume-transform-produce)

在一个事务中,既有生产消息操作又有消费消息操作,即常说的Consume-tansform-produce模式。如下实例代码

public static void consumeTransferProduce() { // 1.构建上产者 Producer producer = ProducerTransaction.createProducer(); // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作 producer.initTransactions(); // 3.构建消费者和订阅主题 Consumer consumer = createConsumer(); consumer.subscribe(Arrays.asList("xt")); while (true) { // 4.开启事务 producer.beginTransaction(); // 5.1 接受消息 Duration duration = Duration.ofMillis(5000); ConsumerRecords records = consumer.poll(duration); System.out.println(records.count()); try { // 5.2 do业务逻辑; System.out.println("customer Message---"); Map commits = new HashMap<>(); for (ConsumerRecord record : records) { // 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %sn", record.offset(), record.key(), record.value()); // 5.2.2 记录提交的偏移量 commits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndmetadata(record.offset())); // 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息 producer.send(new ProducerRecord("xt", "data")); } // 7.提交偏移量 producer.sendOffsetsToTransaction(commits, "groupxt"); // 8.事务提交 producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. // 7.放弃事务 producer.abortTransaction(); }finally{ producer.flush(); } }}

Consumer事务

上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其是无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

相关配置文件字段 Broker configs 配置项描述transactional.id.timeout.mstransaction coordinator没有从生产者哪里接收到任何事务状态更新的等待时间, 过了时间之后会主动地使生产者。
Transactional Id过期默认为604800000(7天)。这允许制作人每周定期工作来维护他们的idmax.transaction.timeout.ms事务允许的最大超时时间。
如果客户端请求的事务时间超过这个值,那么broker将在InitPidRequest中返回一个InvalidTransactionTimeout错误。这可以防止客户端超时时间过长,否则会导致客户读取事务中包含的Topic 时出现停顿。
默认为900000(15分钟)。这是需要发送事务消息的时间的保守上限。transaction.state.log.replication.factor事务状态主题的副本数量。默认为3transaction.state.log.num.partitions事务状态主题的分区数。默认为50transaction.state.log.min.isr事务状态主题的每个分区需要考虑在线的insync副本的最小数量。默认为2transaction.state.log.segment.bytes事务状态主题的段大小。Default: 104857600 bytes.Producer configs 配置项描述enable.idempotence是否启用幂等(默认为false)。
如果禁用,生产者将不设置PID字段在生产请求和当前生产者交付语义中。注意,为了使用事务,必须启用幂等性。当幂等性启用时,我们强制acks=all,retries > 1,并且max、flight.requests.per.connection=1。 如果这些配置没有这些值,我们就不能保证幂等性。
如果这些设置没有被应用程序显式覆盖,当幂等功能启用时,生产者将设置acks=all, retries=Integer.MAX_VALUE, max.inflight.requests.per.connection=1transaction.timeout.ms事务协调器在主动中止正在进行的事务之前,等待事务状态更新的最长时间,这个配置值将与InitPidRequest一起发送到事务协调器。
如果该值大于代理中设置的max.transaction.timeout.ms,则请求将失败,并出现InvalidTransactionTimeout错误。
默认是60000ms。这使得事务不会阻塞下游消费超过一分钟,这在实时应用中通常是允许的。transactional.id用于事务交付的TransactionalId。
这支持跨多个生产者会话的可靠性语义,因为它允许客户端保证使用相同TransactionalId的事务在启动任何新事务之前已经完成。
如果没有提供TransactionalId,那么生产者将被限制为幂等交付。
如果配置了transactional.id属性,则enable.idempotence 会被设置为true.Consumer configs 配置项描述isolation.level(default is read_uncommitted)
read_uncommitted:按照偏移量顺序使用已提交和未提交的消息。
read_committed:只使用非事务性消息或按偏移顺序提交的事务性消息。为了保持偏移顺序,这个设置意味着我们必须在消费者中缓冲消息,直到我们看到给定事务中的所有消息。幂等性和事务性的关系

事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

幂等性引入了Porducer ID事务属性引入了Transaction Id属性

transactionalId 、producerId 和 producerEpoch

一个app有一个tid,同一个应用的不同实例PID是一样的,只是epoch的值不同。

对于同一个事务ID,先保证epoch小的producer执行init-transaction和committransaction,然后epoch较大的procuder才能开始执行init-transaction和commit-transaction,如下顺序:

有了transactionId后,Kafka可保证:

跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作。kafka保证了关联同一个事务的所有producer(一个应用有多个实例)必须按照顺序初始化事务、和提交事务,否则就会有问题,这保证了同一事务ID中消息是有序的(不同实例得按顺序创建事务和提交事务)。

spring-kafka的事务设置

kafka是跨Session的数据幂等发送,即如果应用部署多个实例时常会遇到“org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch、Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.”,必须保证这些实例生成者的提交事务顺序和创建顺序保持一致才可以,否则就无法成功。其实,在实践中,我们更多的是如何实现对应用单实例的事务性。可以通过spring-kafaka实现思路来学习,即每次创建生成者都设置一个不同的transactionId的值,如下代码:

====================================类名:ProducerFactoryUtils==================================== public static KafkaResourceHolder getTransactionalResourceHolder( final ProducerFactory producerFactory) { Assert.notNull(producerFactory, "ProducerFactory must not be null"); // 1.对于每一个线程会生成一个唯一key,然后根据key去查找resourceHolder @SuppressWarnings("unchecked") KafkaResourceHolder resourceHolder = (KafkaResourceHolder) TransactionSynchronizationManager .getResource(producerFactory); if (resourceHolder == null) { // 2.创建一个消费者 Producer producer = producerFactory.createProducer(); // 3.开启事务 producer.beginTransaction(); resourceHolder = new KafkaResourceHolder(producer); bindResourceToTransaction(resourceHolder, producerFactory); } return resourceHolder; }

在spring-kafka中,对于一个线程创建一个producer,事务提交之后,还会关闭这个producer并清除,后续同一个线程或者新的线程重新执行事务时,此时就会重新创建producer。

创建消费者代码

====================================类名:DefaultKafkaProducerFactory====================================protected Producer createTransactionalProducer() {Producer producer = this.cache.poll();if (producer == null) {Map configs = new HashMap<>(this.configs); // 对于每一次生成producer时,都设置一个不同的transactionIdconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());producer = new KafkaProducer(configs, this.keySerializer, this.valueSerializer); // 1.初始化话事务。producer.initTransactions();return new CloseSafeProducer(producer, this.cache);}else {return producer;}}

Consume-transform-Produce 的流程图

流程1 :查找Tranaction Corordinator。
Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址。

流程2:初始化事务 initTransaction
Producer发送InitpidRequest给事务协调器,获取一个Pid。InitpidRequest的处理过程是同步阻塞的,一旦该调用正确返回,Producer就可以开始新的事务。TranactionalId通过InitpidRequest发送给Tranciton Corordinator,然后在Tranaciton Log中记录这的映射关系。除了返回PID之外,还具有如下功能:

对PID对应的epoch进行递增,这样可以保证同一个app的不同实例对应的PID是一样的,但是epoch是不同的。
回滚之前的Producer未完成的事务(如果有)

流程3: 开始事务beginTransaction
执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。

注意:这个操作并没有通知Transaction Coordinator。

流程4: Consume-transform-produce loop

流程4.0: 通过Consumtor消费消息,处理业务逻辑

流程4.1: producer向TransactionCordinantro发送AddPartitionsToTxnRequest
在producer执行send操作时,如果是第一次给发送数据,此时会向Trasaction Corrdinator发送一个AddPartitionsToTxnRequest请求,Transaction Corrdinator会在transaction log中记录下tranasactionId和一个映射关系,并将状态改为begin。AddPartionsToTxnRequest的数据结构如下:

AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]] TransactionalId => string PID => int64 Epoch => int16 Topic => string Partition => int32

流程4.2: producer#send发送 ProduceRequst
生产者发送数据,虽然没有还没有执行commit或者absrot,但是此时消息已经保存到kafka上,而且即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。

流程4.3: AddOffsetCommitsToTxnRequest
Producer通过KafkaProducer.sendOffsetsToTransaction 向事务协调器器发送一个AddOffesetCommitsToTxnRequests:

AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID TransactionalId => string PID => int64 Epoch => int16 ConsumerGroupID => string

在执行事务提交时,可以根据ConsumerGroupID来推断_customer_offsets主题中相应的TopicPartions信息。这样在

流程4.4: TxnOffsetCommitRequest
Producer通过KafkaProducer.sendOffsetsToTransaction还会向消费者协调器Cosumer Corrdinator发送一个TxnOffsetCommitRequest,在主题_consumer_offsets中保存消费者的偏移量信息。

TxnOffsetCommitRequest => ConsumerGroupID PID Epoch RetentionTime OffsetAndmetadata ConsumerGroupID => string PID => int64 Epoch => int32 RetentionTime => int64 OffsetAndmetadata => [TopicName [Partition Offset metadata]] TopicName => string Partition => int32 Offset => int64 metadata => string

流程5: 事务提交和事务终结(放弃事务)
通过生产者的commitTransaction或abortTransaction方法来提交事务和终结事务,这两个操作都会发送一个EndTxnRequest给Transaction Coordinator。

流程5.1:EndTxnRequest。Producer发送一个EndTxnRequest给Transaction Coordinator,然后执行如下操作:

Transaction Coordinator会把PREPARE_COMMIT or PREPARE_ABORT
消息写入到transaction log中记录执行流程5.2执行流程5.3

流程5.2:WriteTxnMarkerRequest

WriteTxnMarkersRequest => [CoorinadorEpoch PID Epoch Marker [Topic [Partition]]] CoordinatorEpoch => int32 PID => int64 Epoch => int16 Marker => boolean (false(0) means ABORT, true(1) means COMMIT) Topic => string Partition => int32

对于Producer生产的消息。Tranaction Coordinator会发送WriteTxnMarkerRequest给当前事务涉及到每个的leader,leader收到请求后,会写入一个COMMIT(PID) 或者 ABORT(PID)的控制信息到data log中对于消费者偏移量信息,如果在这个事务里面包含_consumer-offsets主题。Tranaction
Coordinator会发送WriteTxnMarkerRequest给Transaction Coordinartor,Transaction Coordinartor收到请求后,会写入一个COMMIT(PID) 或者ABORT(PID)的控制信息到 data log中。

流程5.3:Transaction Coordinator会将最终COMPLETE_COMMIT或COMPLETE_ABORT消息写入Transaction Log中以标明该事务结束。

只会保留这个事务对应的PID和timstamp。然后把当前事务其他相关消息删除掉,包括PID和tranactionId的映射关系。 References:

https://blog.csdn.net/weixin_44758876/article/details/120195566https://www.cnblogs.com/fnlingnzb-learner/p/13646390.htmlhttps://www.orchome.com/303https://blog.csdn.net/looo000ngname/article/details/107183144http://www.heartthinkdo.com/?p=2040#4

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。