pyflink用jdbc连接mysql例子
准备:
面向 flink1.13.5
安装模块:python -m pip install apache-flink==1.13.5
添加 mysql 的 jdbc jar 插件、pyflink 的 flink-connector-jdbc_2.11 两个插件
到pyflink模块的 lib 文件夹(注意mysql的jdbc版本、pyflink的路径、flink的版本)
${PYTHON_HOME} Python37site-packagespyflinklib
创建mysql表
CREATE TABLE `print_table` ( `f0` int(11) DEFAULT NULL, `f1` int(11) DEFAULT NULL, `f2` varchar(500) DEFAULT NULL)`
具体python代码(注意 jdbc url 的 hostname、database 和 user、password根据自己的进行更改)
from pyflink.table import EnvironmentSettings, TableEnvironment# 1、创建 TableEnvironmentenv_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()table_env = TableEnvironment.create(env_settings)# 2、创建 source 表table_env.execute_sql("""CREATE TABLE source_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5' )""")# 3、创建 sink 表table_env.execute_sql("""CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://hostname:3306/test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', 'username'='root', 'password'='password', 'table-name' = 'print_table' )""")# 或者通过 SQL 查询语句来写入 sink 表:table_env.execute_sql("insert into print_table select f0,f1,f2 from source_table").wait(60000)# wait(60000)的60000是超时时间,60000毫秒,即60秒,这个可以根据自己的需求进行更改
转载说明出处