
A custom end-to-end exactly-once implementation in Flink (Java version)

Date: 2023-06-18

Flink ships with two built-in exactly-once sink implementations: the Kafka sink (FlinkKafkaProducer) and the StreamingFileSink.

By following the logic of those implementations, we can build our own custom end-to-end exactly-once sink.


Analysis:

Flink's checkpoint mechanism (based on the Chandy-Lamport algorithm):

The JobManager's CheckpointCoordinator injects barriers into the stream. Only when every operator has finished checkpointing the data preceding a barrier and acknowledged back to the CheckpointCoordinator is that checkpoint considered complete.
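For reference, the checkpoint interval and exactly-once mode are configured on the execution environment; a minimal sketch (the same settings appear again in the full program below):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The CheckpointCoordinator injects a barrier into the sources every 10 s;
        // EXACTLY_ONCE means operators align barriers before snapshotting their state.
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    }
}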

The checkpoint mechanism guarantees that no data is lost, because recovery always resumes processing from the last successful checkpoint. However, some records may be reprocessed after a restart, so on its own this gives at-least-once, not exactly-once semantics.

To close this gap, Flink provides a two-phase commit (TwoPhaseCommit) mechanism: a pre-commit followed by the real commit. The pre-commit is not a true commit and can still be rolled back. If an operator fails between two checkpoints, the data the sink pre-committed is rolled back, and processing resumes from the last successful checkpoint, which yields exactly-once semantics. A skeleton of this mapping is sketched below.
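Flink exposes this protocol through the abstract TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>, whose callbacks line up with the checkpoint lifecycle roughly as follows. This is a sketch only: the "transaction" is just an in-memory buffer I use as a placeholder so the class compiles on its own; the real MySQL sink follows in the implementation section.

import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.util.ArrayList;
import java.util.List;

class TwoPhaseCommitSketch extends TwoPhaseCommitSinkFunction<String, List<String>, Void> {

    TwoPhaseCommitSketch() {
        super(new ListSerializer<>(StringSerializer.INSTANCE), VoidSerializer.INSTANCE);
    }

    @Override
    protected List<String> beginTransaction() {
        // At job start and after every completed checkpoint: open a fresh transaction.
        return new ArrayList<>();
    }

    @Override
    protected void invoke(List<String> txn, String value, Context ctx) {
        // Per record: write into the still-uncommitted transaction.
        txn.add(value);
    }

    @Override
    protected void preCommit(List<String> txn) {
        // The checkpoint barrier has reached the sink: flush / pre-commit,
        // but nothing becomes visible downstream yet.
    }

    @Override
    protected void commit(List<String> txn) {
        // The JobManager has confirmed the checkpoint completed on every operator:
        // only now is the transaction made permanent and visible.
    }

    @Override
    protected void abort(List<String> txn) {
        // Failure or restore before commit: discard the pre-committed transaction.
        txn.clear();
    }
}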


Implementation:

The implementation extends TwoPhaseCommitSinkFunction and overrides its methods: MySQL auto-commit is disabled, the real commit happens in commit(), and abort() performs the rollback.

The main code is as follows:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class TwoPhaseCommitMysqlConsumerDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint settings
        env.setStateBackend(new FsStateBackend("hdfs://zcx1:9000/flink/ck"));
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zcx4:9092,zcx5:9092,zcx6:9092");
        // Only read committed Kafka data
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        FlinkKafkaConsumer<String> kafka =
                new FlinkKafkaConsumer<>("zcx2222", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafka);

        SingleOutputStreamOperator<WC> sum = dataStreamSource
                .flatMap(new FlatMapFunction<String, WC>() {
                    @Override
                    public void flatMap(String s, Collector<WC> collector) throws Exception {
                        String[] split = s.split(" ");
                        for (String ss : split) {
                            collector.collect(new WC(ss, 1));
                        }
                    }
                })
                .keyBy("word")
                .sum("num");
        sum.print();

        TwoPhaseCommitSinkFunction<WC, MyConnection, Void> twoPhaseCommitSinkFunction =
                new TwoPhaseCommitSinkFunction<WC, MyConnection, Void>(
                        new KryoSerializer<>(MyConnection.class, new ExecutionConfig()),
                        new VoidSerializer()) {

                    @Override
                    protected void invoke(MyConnection myConnection, WC wc, Context context) throws Exception {
                        // Writes go into the still-open (uncommitted) transaction
                        PreparedStatement preparedStatement = myConnection.connection.prepareStatement(
                                "insert into wc values(?,?) on duplicate key update num=?");
                        preparedStatement.setString(1, wc.word);
                        preparedStatement.setInt(2, wc.num);
                        preparedStatement.setInt(3, wc.num);
                        preparedStatement.executeUpdate();
                        preparedStatement.close();
                    }

                    @Override
                    protected MyConnection beginTransaction() throws Exception {
                        Class.forName("com.mysql.jdbc.Driver");
                        Connection connection =
                                DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
                        // Disable auto-commit so nothing becomes visible before commit()
                        connection.setAutoCommit(false);
                        return new MyConnection(connection);
                    }

                    @Override
                    protected void preCommit(MyConnection myConnection) throws Exception {
                        // Nothing to do here: the writes were already issued in invoke()
                    }

                    @Override
                    protected void commit(MyConnection myConnection) {
                        // Called once the checkpoint is globally complete: the real commit
                        try {
                            myConnection.connection.commit();
                            myConnection.connection.close();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    protected void abort(MyConnection myConnection) {
                        // Called on failure/restore: roll back the pre-committed data
                        try {
                            myConnection.connection.rollback();
                            myConnection.connection.close();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                };

        sum.addSink(twoPhaseCommitSinkFunction);
        env.execute();
    }
}

class MyConnection {
    // transient: the JDBC connection itself is not serialized into checkpoint state
    transient Connection connection;

    public MyConnection(Connection connection) {
        this.connection = connection;
    }
}
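The WC class is not shown in the original listing. The job above assumes a POJO with public word and num fields and a no-argument constructor, which is what keyBy("word"), sum("num"), and Flink's POJO serialization require. A minimal sketch:

// Minimal WC POJO assumed by the job above (not part of the original listing).
public class WC {
    public String word;
    public int num;

    public WC() {} // no-arg constructor required for Flink's POJO handling

    public WC(String word, int num) {
        this.word = word;
        this.num = num;
    }

    @Override
    public String toString() {
        return "WC{word='" + word + "', num=" + num + "}";
    }
}

The "insert ... on duplicate key update" statement likewise assumes a MySQL table in which word is the primary (or a unique) key, e.g. something like create table wc(word varchar(255) primary key, num int).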
