仅适用于Blink 2.0及以上版本。仅适用于独享模式。Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。二进制数据不支持本地调试,语法检查没有问题请进行线上调试。
从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。
DDL定义Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。
create table kafka_stream( --必须和Kafka源表中的5个字段的顺序和类型保持一致。 messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT ) with ( type ='kafka010', topic = '
通用配置 参数 注释说明 是否必选 备注 typeKafka对应版本是Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。topic读取的单个topic是无topicPattern读取一批topic的表达式否Topic用竖线(|)分隔。例如:topic1|topic2|topic3。startupMode启动位点否启动位点取值如下:
GROUP_OFFSETS(默认值):根据Group读取。
EARLIEST:从Kafka最早分区开始读取。
LATEST:从Kafka最新位点开始读取。
TIMESTAMP:从指定的时间点读取。partitionDiscoveryIntervalMS定时检查是否有新分区产生否Kafka 08版本:系统默认开启该功能。
Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。extraConfig额外的KafkaConsumer配置项目否不在可选配置项中,但是期望额外增加的配置。
Kafka08配置
Kafka08必选配置
可选配置Key
consumer.idsocket.timeout.msfetch.message.max.bytesnum.consumer.fetchersauto.commit.enableauto.commit.interval.msqueued.max.message.chunksrebalance.max.retriesfetch.min.bytesfetch.wait.max.msrebalance.backoff.msrefresh.leader.backoff.msauto.offset.resetconsumer.timeout.msexclude.internal.topicspartition.assignment.strategyclient.idzookeeper.session.timeout.mszookeeper.connection.timeout.mszookeeper.sync.time.msoffsets.storageoffsets.channel.backoff.msoffsets.channel.socket.timeout.msoffsets.commit.max.retriesdual.commit.enabledpartition.assignment.strategysocket.receive.buffer.bytesfetch.min.bytes
Kafka09/Kafka010/Kafka011配置
Kafka09/Kafka010/Kafka011必选配置
Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。
Kafka09Kafka010Kafka011
当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加security.protocol、sasl.mechanism和sasl.jaas.config3个参数,示例如下。
create table kafka_stream( messageKey varbinary, `message` varbinary, topic varchar, `partition` int, `offset` bigint) with ( type ='kafka010', topic = '
场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。
Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。
{ "name":"Alice", "age":13, "grade":"A"}
方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK
Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。
CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT) WITH ( type = 'kafka010' --请参见Kafka版本对应关系。);CREATE TABLE rds_sink ( `name` VARCHAR, age VARCHAR, grade VARCHAR) WITH( type='rds');CREATE VIEW input_view AS SELECt CAST(`message` as VARCHAR ) as `message`FROM kafka_src;INSERT INTO rds_sinkSELECt JSON_VALUE(`message`,'$.name'), JSON_VALUE(`message`,'$.age'), JSON_VALUE(`message`,'$.grade')FROM input_view;
方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink
针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。
SQL
-- 定义解析Kafka message的UDTF。CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF';-- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT) WITH ( type = 'kafka010', --请参见Kafka版本对应关系。 topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3');CREATE TABLE rds_sink ( name VARCHAR, age INT, grade VARCHAR, updateTime TIMESTAMP) WITH( type='rds', url='jdbc:mysql://localhost:3306/test', tableName='test4', userName='test', password='
UDTF
Blink 2.2.4版本Maven依赖,示例如下。
package com.alibaba;import com.alibaba.fastjson.JSONObject;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.DataTypes;import org.apache.flink.types.Row;import java.io.UnsupportedEncodingException;import java.sql.Timestamp;public class kafkaUDTF extends TableFunction
场景2:从Kafka读取数据,输入实时计算Flink版进行窗口计算。
按照实时计算Flink版目前的设计,滚动或滑动等窗口操作,必须在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以Kafka中message字段中的Event Time进行窗口操作,需要先从message字段使用UDX解析出Event Time,才能定义Watermark。在Kafka源表场景中,需要使用计算列。例如Kafka中写入数据:2018-11-11 00:00:00|1|Anna|female 。计算流程为:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink。
方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK
Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。
CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT, ts as to_timestamp(json_value(cast(`message` as VARCHAR ),'$.nodes.time')), WATERMARK wk FOR ts as withOffset(ts, 2000)) WITH (type = 'kafka' --请参见Kafka版本对应关系。);CREATE TABLE rds_sink ( starttime TIMESTAMP , endtime TIMESTAMP , `message` BIGINT ) WITH (type = 'rds');INSERT INTO rds_sinkSELECT TUMBLE_START(ts, INTERVAL '1' MINUTE), TUMBLE_END(ts, INTERVAL '1' MINUTE), count(`message`)FROM kafka_srcGROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
方法2:Kafka SOURCE->UDTF->Realtime Compute for Apache Flink->RDS SINK
SQL
-- 定义解析Kafka message的UDTF。CREATE FUNCTION kafkapaser AS 'com.alibaba.kafkaUDTF';CREATE FUNCTION kafkaUDF AS 'com.alibaba.kafkaUDF';-- 定义源表,注意:Kafka源表DDL字段必须与以下示例一模一样。WITH中参数可改。create table kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT, ctime AS TO_TIMESTAMP(kafkaUDF(`message`)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意:计算列的类型必须为TIMESTAMP才能创建Watermark。 watermark for `ctime` as withoffset(`ctime`,0) -- 在计算列上定义Watermark。) WITH ( type = 'kafka010', -- 请参见Kafka版本对应关系。 topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3');create table rds_sink ( `name` VARCHAR, age INT, grade VARCHAR, updateTime TIMESTAMP) WITH( type='rds', url='jdbc:mysql://localhost:3306/test', tableName='test4', userName='test', password='
UDF&UDTF
Blink 2.2.4版本Maven依赖,示例如下。
UDTF
package com.alibaba;import com.alibaba.fastjson.JSONObject;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.DataTypes;import org.apache.flink.types.Row;import java.io.UnsupportedEncodingException;public class kafkaUDTF extends TableFunction
UDF
package com.alibaba;package com.hjc.test.blink.sql.udx;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;public class KafkaUDF extends ScalarFunction { // 可选,open方法可以不写。 // 需要import org.apache.flink.table.functions.FunctionContext; public String eval(byte[] message) { // 读入一个二进制数据,并将其转换为String格式。 String msg = new String(message, "UTF-8"); return msg.split('\|')[0]; } public long eval(String b, String c) { return eval(b) + eval(c); } //可选,close方法可以不写。 @Override public void close() { }}
自建Kafka
示例
create table kafka_stream( messageKey VARBINARY, `message` VARBINARY, topic varchar, `partition` int, `offset` bigint) with ( type ='kafka011', topic = 'kafka_01', `group.id` = 'CID_blink', bootstrap.servers = '192.168.0.251:****');
WITH参数
bootstrap.servers参数需要填写自建的地址和端口号。
仅在Blink 2.2.6及以上版本支持阿里云Kafka或自建Kafka显示TPS和RPS等指标信息。