流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将您详细介绍如何使用自定义标量函数(UDF),对随机产生的数据进行处理后存入 MySQL 中。
前置准备 创建流计算 Oceanus 集群进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
创建 MySQL 实例进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。进入实例后,单击右上角【登陆】即可登陆 MySQL 数据库。
创建 MySQL 表
-- 建表语句,用于接收 Sink 端数据CREATE TABLE `udf_output` ( `id` int(10) NOT NULL, `len_name` int(10) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8
开发 UDF
这里定义一个获取字符串字段长度的函数。如果传入一个字段,则获取这个字段的长度后返回;如果传入两个字段,则获取这两个字段的长度和后返回。
1、代码编写
在本地IDE中创建 maven 项目,编写自定义函数UDF的代码。
// 类名:StringLengthUdfpackage demos.UDF;import org.apache.flink.table.functions.ScalarFunction;public class StringLengthUdf extends ScalarFunction { public long eval(String a) { return a == null ? 0 : a.length(); } public long eval(String b, String c) { return eval(b) + eval(c); }}
2、项目打包
使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:
mvn clean package
命令行打包后生成的 JAR 包可以在项目 target 目录下找到。
流计算 Oceanus 作业 上传依赖在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 JAR 包。
创建 SQL 作业在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。单击【作业参数】,在【引用程序包】处选择刚才上传的 JAR 包。
1、创建 Function
CREATE TEMPORARY SYSTEM FUNCTION StringLengthUdf AS 'demos.UDF.StringLengthUdf';
StringLengthUdf代表创建的函数名,demos.UDF.StringLengthUdf代表类路径。
2、创建 Source
CREATE TABLE random_source (id INT,name1 VARCHAR,name2 VARCHAR ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', -- 每秒产生的数据条数 'fields.id.kind'='sequence', -- 无界的随机数 'fields.id.start'='1', -- 随机数的最小值 'fields.id.end'='5', -- 随机数的最大值 'fields.name1.length'='10', -- 随机字符串的长度 'fields.name2.length'='10' -- 随机字符串的长度);
3、创建 Sink
CREATE TABLE `jdbc_upsert_sink_table` ( `id` INT, `len_name` INT, PRIMARY KEY(id) NOT ENFORCED) WITH ( -- 指定数据库连接参数 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数 'table-name' = 'udf_output', -- 需要写入的数据表 'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限) 'password' = 'xxxxxxxxx', -- 数据库访问的密码 'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数 'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔);
4、编写业务 SQL
INSERT INTO jdbc_upsert_sink_tableSELECtid,CAST(StringLengthUdf(name1,name2) AS INT) AS `len_name`FROM random_source;
运行作业
点击【发布草稿】->【运行版本】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。
总结本文首先在本地开发 UDF 函数,将其打成 JAR 包后上传到 Oceanus 平台引用。接下来使用 Datagen 连接器产生虚拟数据,调用 UDF 函数进行不同字段的字符串长度的加和操作后存入 MySQL 中。
自定义标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。
UDF 需要在 ScalarFunction 类中实现 eval 方法,且必须声明为 public 类型;自定义函数中 open 方法和 close 方法可选;可被重载,即在一个 UDF 中实现多个 eval 方法。
参考链接[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[3] MySQL 控制台:https://console.cloud.tencent.com/cdb
[4] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433