欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flink和Kafka端到端数据一致性

时间:2023-05-02
Flink内部

把Checkpoint开启,设置Checkpoint模式为EXACTLY_ONCE

env.enableCheckpointing(1000*10L);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

KafkaSource

当Flink开启Checkpoint时,Kafka的offset会在Checkpoint的时候,把偏移量保存到状态后端(也会提交到kafka中一份)。 注意,在这个场景中,Kafka设置的Properties 中的自动定期 offset 提交设置会被完全忽略。
注意:Kafka source 不依赖于 broker 上提交的位点来恢复失败的作业。提交位点只是为了上报 Kafka consumer 和消费组的消费进度,以在 broker 端进行监控。

KafkaSink

DataStream stream = …;Properties properties = new Properties(); properties.setProperty(“bootstrap.servers”, “localhost:9092”);properties.setProperty(“transaction.timeout.ms”,1000*60*5+"");FlinkKafkaProducer myProducer = new FlinkKafkaProducer( “my-topic”, // 目标 topic new SimpleStringSchema(), // 序列化 schema properties, // producer 配置 FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 容错stream.addSink(myProducer);

1、设置FlinkKafkaProducer的语义为EXACTLY_onCE(默认为AT_LEAST_ONCE)
2、设置Kafka的事务隔离级别isolation.level = read_committed(默认为read_uncommitted)
3、设置transaction.timeout.ms小于15分钟
注意:默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_onCE 模式之前应该增加 transaction.max.timeout.ms 的值。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。