
Example: Writing Data from Flink to MySQL

Date: 2023-04-17
Preparation:

1. Start MySQL, create a database named flinkdb, and create the table sensor_temp in it:

CREATE TABLE sensor_temp (id varchar(32), temp double);

Implementation:

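import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

// The enclosing object name is arbitrary; the original snippet shows only main.
// SensorReading and MyDefSource are defined elsewhere and are not shown here.
object FlinkMysqlSinkDemo {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[SensorReading] = env.addSource(new MyDefSource)
    dataStream.addSink(new MyJdbcSinkFunction())
    env.execute()
  }
}

class MyJdbcSinkFunction extends RichSinkFunction[SensorReading] {
  var connection: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // Called once per parallel instance before any record arrives:
  // open the connection and prepare both statements.
  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection(
      "jdbc:mysql://192.168.91.180:3306/flinkdb?useSSL=false", "root", "123123")
    insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) values(?,?)")
    updateStmt = connection.prepareStatement("update sensor_temp set temp=? where id=?")
  }

  // Called once per record: try to update the row first, and insert it
  // only if no row with this id exists yet.
  // Depending on the Flink version, SinkFunction.Context may need to be
  // written without the type parameter.
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // Release JDBC resources; the null checks cover the case where open() failed part-way.
  override def close(): Unit = {
    if (insertStmt != null) insertStmt.close()
    if (updateStmt != null) updateStmt.close()
    if (connection != null) connection.close()
  }
}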
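The code above uses SensorReading and MyDefSource without showing their definitions. What follows is only a minimal sketch of what they might look like, not the author's code: the timestamp field, the sensor_1 to sensor_5 ids, the temperature range and the one-second interval are all assumptions. The only facts taken from the original are that SensorReading has an id and a temperature, and that MyDefSource can be passed to env.addSource.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

// Assumed shape: the sink only reads `id` and `temperature`;
// `timestamp` is added here for illustration.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Hypothetical source that emits one random reading per sensor every second.
class MyDefSource extends SourceFunction[SensorReading] {
  // Flipped by cancel() so the loop in run() can stop.
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      val now = System.currentTimeMillis()
      (1 to 5).foreach { i =>
        // Temperatures scattered around 20 degrees; the values are arbitrary.
        ctx.collect(SensorReading(s"sensor_$i", now, 20.0 + rand.nextGaussian() * 5))
      }
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}
```

Because the ids repeat every second, a source like this exercises both the insert path (first reading per sensor) and the update path (every later reading) of the sink.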

Result:

Query the data: select * from sensor_temp;
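The original post does not include the query output. Given the update-then-insert logic in invoke, the table should end up with one row per sensor id, holding the most recent temperature. The sketch below models that behaviour with an in-memory map instead of MySQL. The Reading type, the UpsertModel object and the sample values are made up for illustration.

```scala
import scala.collection.mutable

// Hypothetical record type carrying the two fields the sink writes.
final case class Reading(id: String, temperature: Double)

object UpsertModel {
  // Applies readings in order to an in-memory stand-in for sensor_temp (id -> temp).
  def applyAll(readings: Seq[Reading]): Map[String, Double] = {
    val table = mutable.LinkedHashMap.empty[String, Double]
    readings.foreach { r =>
      if (table.contains(r.id)) {
        // Corresponds to "update sensor_temp set temp=? where id=?" matching a row.
        table(r.id) = r.temperature
      } else {
        // Update count was 0, so fall back to the insert statement.
        table += (r.id -> r.temperature)
      }
    }
    table.toMap
  }
}

object UpsertModelDemo {
  def main(args: Array[String]): Unit = {
    val rows = UpsertModel.applyAll(Seq(
      Reading("sensor_1", 35.8),
      Reading("sensor_2", 15.4),
      Reading("sensor_1", 36.1) // overwrites the earlier sensor_1 value
    ))
    rows.foreach { case (id, temp) => println(s"$id\t$temp") }
  }
}
```

In this model three readings produce two rows, with sensor_1 holding its latest value. Note that the real sink is not atomic: with a parallelism greater than one and no primary key on id, two subtasks could both see an update count of 0 and insert the same id twice.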
