本文的基础环境可以参考《flink 1.10.1 java版本wordcount演示 (nc + socket)》一文,本文在此基础上增加将统计结果输出到mysql的功能。
1、添加依赖
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>
2、测试代码
package com.demo.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Streaming word count that reads text lines from a socket, keeps a running
 * count per word, and writes every updated count both to stdout and to the
 * MySQL table {@code tbl_flink_wordcount}.
 */
public class FlinkMySqlSinkDemo {

    /**
     * Builds and runs the job: socket source -> split into words -> (word, 1)
     * -> keyed running sum -> "word:count" strings -> JDBC sink and print sink.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> dataStream = env.socketTextStream("192.168.0.181", 9000);

        // Split each incoming line on single spaces and emit one record per word.
        SingleOutputStreamOperator<String> flatMap =
                dataStream.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        for (String word : value.split(" ")) {
                            // Blank lines and repeated spaces yield empty tokens; they are not words.
                            if (!word.isEmpty()) {
                                out.collect(word);
                            }
                        }
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> map =
                flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });

        // Key by the word (tuple field f0) and keep a running sum of field 1.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy("f0").sum(1);

        // Serialize each updated count as "word:count" for the sinks below.
        DataStream<String> result = sum.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> data) throws Exception {
                return data.f0 + ":" + data.f1;
            }
        });

        result.addSink(new MyJdbcSink());
        result.print();

        env.execute();
    }

    /**
     * Sink that upserts "word:count" records into {@code tbl_flink_wordcount}:
     * it first tries an UPDATE by word name and falls back to an INSERT when
     * no row was updated.
     */
    public static class MyJdbcSink extends RichSinkFunction<String> {

        // JDBC handles are created in open(); transient because the function
        // object itself is serialized and these are not serializable.
        private transient Connection connection = null;
        private transient PreparedStatement insertStat = null;
        private transient PreparedStatement updateStat = null;

        /**
         * Opens the JDBC connection and prepares the insert/update statements.
         *
         * @param parameters configuration passed in by the runtime (unused here)
         * @throws Exception if the connection or statements cannot be created
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8", "username", "password");
            insertStat = connection.prepareStatement(
                    "insert into tbl_flink_wordcount(word_count, word_name) values (?, ?)");
            updateStat = connection.prepareStatement(
                    "update tbl_flink_wordcount set word_count = ? where word_name = ?");
        }

        /**
         * Writes one "word:count" record to the database.
         *
         * @param value   record in the form {@code word:count}
         * @param context sink context (unused)
         * @throws IllegalArgumentException if {@code value} contains no ':' separator
         * @throws NumberFormatException    if the count part is not an integer
         * @throws Exception                if a JDBC call fails
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            // Split on the LAST ':' so that words which themselves contain ':' stay intact.
            int separator = value.lastIndexOf(':');
            if (separator < 0) {
                throw new IllegalArgumentException("Expected \"word:count\" but got: " + value);
            }
            String wordName = value.substring(0, separator);
            int wordCount = Integer.parseInt(value.substring(separator + 1));

            updateStat.setInt(1, wordCount);
            updateStat.setString(2, wordName);
            int updatedRows = updateStat.executeUpdate();

            // If the update touched no row, the word is not stored yet: insert it.
            if (updatedRows == 0) {
                insertStat.setInt(1, wordCount);
                insertStat.setString(2, wordName);
                insertStat.executeUpdate();
            }
        }

        /**
         * Releases the statements and the connection. Each resource is
         * null-checked because open() may have failed part-way, and the
         * nesting guarantees later resources are still closed when an earlier
         * close() throws.
         *
         * @throws Exception if closing a JDBC resource fails
         */
        @Override
        public void close() throws Exception {
            try {
                if (insertStat != null) {
                    insertStat.close();
                }
            } finally {
                try {
                    if (updateStat != null) {
                        updateStat.close();
                    }
                } finally {
                    if (connection != null) {
                        connection.close();
                    }
                }
            }
        }
    }
}
这里的mysql版本是8.0.14,连接的时候需要在JDBC连接串中指定时区(即上面代码里的 serverTimezone=GMT%2b8 参数)。
3、启动程序,执行测试
在nc输入测试字符串:
[test@bogon ~]# nc -l 9000
hello world
hello flink
hello mysql
在idea看到统计结果:
hello:1
world:1
hello:2
flink:1
hello:3
mysql:1
在mysql数据库test中,看到对应的数据。
可以看到,这里的结果,对于相同单词的数据统计,进行的是更新操作。
数据库建表脚本:
-- Result table for the Flink word-count sink.
CREATE TABLE `tbl_flink_wordcount` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word_name` varchar(64) DEFAULT NULL,
  `word_count` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  -- The sink looks rows up with "where word_name = ?"; the unique key lets
  -- that lookup use an index and prevents duplicate rows for the same word.
  UNIQUE KEY `uk_word_name` (`word_name`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;