把Checkpoint开启,设置Checkpoint模式为EXACTLY_ONCE
env.enableCheckpointing(1000*10L);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
KafkaSource 当Flink开启Checkpoint时,Kafka的offset会在Checkpoint的时候,把偏移量保存到状态后端(也会提交到kafka中一份)。 注意,在这个场景中,Kafka设置的Properties 中的自动定期 offset 提交设置会被完全忽略。
注意:Kafka source 不依赖于 broker 上提交的位点来恢复失败的作业。提交位点只是为了上报 Kafka consumer 和消费组的消费进度,以在 broker 端进行监控。
DataStream stream = …;Properties properties = new Properties(); properties.setProperty(“bootstrap.servers”, “localhost:9092”);properties.setProperty(“transaction.timeout.ms”,1000*60*5+"");FlinkKafkaProducer myProducer = new FlinkKafkaProducer( “my-topic”, // 目标 topic new SimpleStringSchema(), // 序列化 schema properties, // producer 配置 FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 容错stream.addSink(myProducer);
1、设置FlinkKafkaProducer的语义为EXACTLY_onCE(默认为AT_LEAST_ONCE)
2、设置Kafka的事务隔离级别isolation.level = read_committed(默认为read_uncommitted)
3、设置transaction.timeout.ms小于15分钟
注意:默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_onCE 模式之前应该增加 transaction.max.timeout.ms 的值。