欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

pyflink用jdbc写入mysql例子

时间:2023-05-04

pyflink用jdbc连接mysql例子
准备:

软件:python3.7 pycharm

面向 flink1.13.5
安装模块:python -m pip install apache-flink==1.13.5
添加 mysql 的 jdbc jar 插件、pyflink 的 flink-connector-jdbc_2.11 两个插件
到pyflink模块的 lib 文件夹(注意mysql的jdbc版本、pyflink的路径、flink的版本)

${PYTHON_HOME} Python37site-packagespyflinklib

创建mysql表

CREATE TABLE `print_table` ( `f0` int(11) DEFAULT NULL, `f1` int(11) DEFAULT NULL, `f2` varchar(500) DEFAULT NULL)`

具体python代码

(注意 jdbc url 的 hostname、database 和 user、password根据自己的进行更改)

from pyflink.table import EnvironmentSettings, TableEnvironment# 1、创建 TableEnvironmentenv_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()table_env = TableEnvironment.create(env_settings)# 2、创建 source 表table_env.execute_sql("""CREATE TABLE source_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5' )""")# 3、创建 sink 表table_env.execute_sql("""CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://hostname:3306/test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', 'username'='root', 'password'='password', 'table-name' = 'print_table' )""")# 或者通过 SQL 查询语句来写入 sink 表:table_env.execute_sql("insert into print_table select f0,f1,f2 from source_table").wait(60000)# wait(60000)的60000是超时时间,60000毫秒,即60秒,这个可以根据自己的需求进行更改

转载说明出处

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。