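Flink 1.10.1 (Java): writing processed results to MySQL with a JDBC sink

Date: 2023-06-26

The base environment for this article is described in "Flink 1.10.1 Java WordCount demo (nc + socket)". This article builds on that demo by also writing the results to MySQL.

1. Add the dependency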

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>

2. Test code

package com.demo.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class FlinkMySqlSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read lines of text from the nc socket.
        DataStream<String> dataStream = env.socketTextStream("192.168.0.181", 9000);

        // Split each line into words.
        SingleOutputStreamOperator<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] strings = value.split(" ");
                for (String s : strings) {
                    out.collect(s);
                }
            }
        });

        // Map each word to (word, 1).
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        // Key by the word and keep a running sum of the counts.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);

        // Format each result as "word:count".
        DataStream<String> result = sum.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> data) throws Exception {
                return data.f0 + ":" + data.f1;
            }
        });

        result.addSink(new MyJdbcSink());
        result.print();

        env.execute();
    }

    public static class MyJdbcSink extends RichSinkFunction<String> {
        Connection connection = null;
        PreparedStatement insertStat = null;
        PreparedStatement updateStat = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8", "username", "password");
            insertStat = connection.prepareStatement("insert into tbl_flink_wordcount(word_count, word_name) values (?, ?)");
            updateStat = connection.prepareStatement("update tbl_flink_wordcount set word_count = ? where word_name = ?");
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // Split on the last ':' so that a word containing ':' still parses.
            int idx = value.lastIndexOf(':');
            String wordName = value.substring(0, idx);
            int wordCount = Integer.parseInt(value.substring(idx + 1));

            updateStat.setInt(1, wordCount);
            updateStat.setString(2, wordName);
            int updatedRows = updateStat.executeUpdate();

            // If the update matched no row, insert a new one.
            if (updatedRows == 0) {
                insertStat.setInt(1, wordCount);
                insertStat.setString(2, wordName);
                insertStat.executeUpdate();
            }
        }

        @Override
        public void close() throws Exception {
            // Guard against open() having failed part-way through.
            if (insertStat != null) {
                insertStat.close();
            }
            if (updateStat != null) {
                updateStat.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

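The MySQL server used here is version 8.0.14, and the connection must specify a time zone (the serverTimezone parameter in the JDBC URL).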
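As a small illustration of the time zone parameter, the sketch below builds a JDBC URL of the same shape as the one in the demo. The class and method names (JdbcUrlSketch, buildUrl) are hypothetical and not part of the original code. The point it shows is that "GMT+8" has to be URL-encoded, because a raw "+" in a query string would not survive as a plus sign.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; not part of the original demo.
class JdbcUrlSketch {

    // Builds a MySQL JDBC URL with a URL-encoded serverTimezone value.
    static String buildUrl(String host, int port, String database, String timeZone) {
        try {
            String encodedZone = URLEncoder.encode(timeZone, "UTF-8");
            return "jdbc:mysql://" + host + ":" + port + "/" + database
                    + "?serverTimezone=" + encodedZone;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 support is mandatory on every JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same host, port, database and zone as the demo code.
        System.out.println(buildUrl("localhost", 3306, "test", "GMT+8"));
    }
}
```

The encoder emits "%2B" in upper case, while the demo writes "%2b"; percent-encoded hex digits are case-insensitive, so both decode to "GMT+8".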

3. Start the program and run the test

Enter the test strings in nc:

[test@bogon ~]# nc -l 9000
hello world
hello flink
hello mysql

The counts appear in the IDEA console:

hello:1
world:1
hello:2
flink:1
hello:3
mysql:1
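To see why each input word produces its own output record, the running count can be reproduced in plain Java without Flink. This is only a rough sketch of the keyBy plus sum behaviour under a parallelism of 1. The class name RunningCountSketch is made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; it does not use Flink.
class RunningCountSketch {

    // Emits one "word:count" record per input word, in arrival order,
    // where count is the number of times the word has been seen so far.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> records = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // merge() stores and returns the updated count for this word.
                int count = counts.merge(word, 1, Integer::sum);
                records.add(word + ":" + count);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world", "hello flink", "hello mysql");
        for (String record : runningCounts(lines)) {
            System.out.println(record);
        }
    }
}
```

With the three test lines from the nc session, this prints the same six records as the console output, one per line.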

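The corresponding rows then appear in the MySQL database test.

As the results show, repeated occurrences of the same word update the existing row rather than inserting a new one.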
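The update-then-insert behaviour of the sink can be traced without a database. The sketch below is an in-memory stand-in, not the sink itself: a LinkedHashMap plays the role of tbl_flink_wordcount (word_name mapped to word_count), and the class name UpsertSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the table; for illustration only.
class UpsertSketch {

    // Applies "word:count" records with the same update-then-insert
    // logic as the sink and returns the resulting table contents.
    static Map<String, Integer> applyRecords(List<String> records) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (String record : records) {
            int idx = record.lastIndexOf(':');
            String word = record.substring(0, idx);
            int count = Integer.parseInt(record.substring(idx + 1));

            // "UPDATE": replace() only changes a row that already exists
            // and returns null when there is no such row.
            boolean updated = table.replace(word, count) != null;

            // "INSERT": add the row only when the update matched nothing.
            if (!updated) {
                table.put(word, count);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
                "hello:1", "world:1", "hello:2", "flink:1", "hello:3", "mysql:1");
        System.out.println(applyRecords(records));
    }
}
```

For the six records from the test run, the stand-in table ends with four rows: hello=3, world=1, flink=1, mysql=1.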

Table creation script:

CREATE TABLE `tbl_flink_wordcount` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word_name` varchar(64) DEFAULT NULL,
  `word_count` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
