欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

BlinkSQL之创建消息队列Kafka源表

时间:2023-07-24
创建消息队列Kafka源表 注意事项

仅适用于Blink 2.0及以上版本。仅适用于独享模式。Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。二进制数据不支持本地调试,语法检查没有问题请进行线上调试。

从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。

DDL定义

Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。

create table kafka_stream( --必须和Kafka源表中的5个字段的顺序和类型保持一致。 messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT ) with ( type ='kafka010', topic = '', `group.id` = '', ...);

WITH参数

通用配置 参数注释说明是否必选备注typeKafka对应版本是Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。topic读取的单个topic是无topicPattern读取一批topic的表达式否Topic用竖线(|)分隔。例如:topic1|topic2|topic3。startupMode启动位点否启动位点取值如下:
GROUP_OFFSETS(默认值):根据Group读取。
EARLIEST:从Kafka最早分区开始读取。
LATEST:从Kafka最新位点开始读取。
TIMESTAMP:从指定的时间点读取。partitionDiscoveryIntervalMS定时检查是否有新分区产生否Kafka 08版本:系统默认开启该功能。
Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。extraConfig额外的KafkaConsumer配置项目否不在可选配置项中,但是期望额外增加的配置。

Kafka08配置

Kafka08必选配置

参数注释说明是否必选group.id消费组ID是zookeeper.connectzk链接地址是

可选配置Key

consumer.idsocket.timeout.msfetch.message.max.bytesnum.consumer.fetchersauto.commit.enableauto.commit.interval.msqueued.max.message.chunksrebalance.max.retriesfetch.min.bytesfetch.wait.max.msrebalance.backoff.msrefresh.leader.backoff.msauto.offset.resetconsumer.timeout.msexclude.internal.topicspartition.assignment.strategyclient.idzookeeper.session.timeout.mszookeeper.connection.timeout.mszookeeper.sync.time.msoffsets.storageoffsets.channel.backoff.msoffsets.channel.socket.timeout.msoffsets.commit.max.retriesdual.commit.enabledpartition.assignment.strategysocket.receive.buffer.bytesfetch.min.bytes

Kafka09/Kafka010/Kafka011配置

Kafka09/Kafka010/Kafka011必选配置

参数注释说明group.id消费组IDbootstrap.serversKafka集群地址

Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。

Kafka09Kafka010Kafka011

当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加security.protocol、sasl.mechanism和sasl.jaas.config3个参数,示例如下。

create table kafka_stream( messageKey varbinary, `message` varbinary, topic varchar, `partition` int, `offset` bigint) with ( type ='kafka010', topic = '', `group.id` = '', ..., `security.protocol`='SASL_PLAINTEXT', `sasl.mechanism`='PLAIN', `sasl.jaas.config`='org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";');

Kafka版本对应关系 typeKafka版本Kafka080.8.22Kafka090.9.0.1Kafka0100.10.2.1Kafka0110.11.0.2及以上Kafka消息解析示例

场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。

Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。

{ "name":"Alice", "age":13, "grade":"A"}

方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK

Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。

CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT) WITH ( type = 'kafka010' --请参见Kafka版本对应关系。);CREATE TABLE rds_sink ( `name` VARCHAR, age VARCHAR, grade VARCHAR) WITH( type='rds');CREATE VIEW input_view AS SELECt CAST(`message` as VARCHAR ) as `message`FROM kafka_src;INSERT INTO rds_sinkSELECt JSON_VALUE(`message`,'$.name'), JSON_VALUE(`message`,'$.age'), JSON_VALUE(`message`,'$.grade')FROM input_view;

方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink

针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。

SQL

-- 定义解析Kafka message的UDTF。CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF';-- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT) WITH ( type = 'kafka010', --请参见Kafka版本对应关系。 topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3');CREATE TABLE rds_sink ( name VARCHAR, age INT, grade VARCHAR, updateTime TIMESTAMP) WITH( type='rds', url='jdbc:mysql://localhost:3306/test', tableName='test4', userName='test', password='');-- 使用UDTF,将二进制数据解析成格式化数据。CREATE VIEW input_view ( name, age, grade, updateTime) ASSELECT T.name, T.age, T.grade, T.updateTimeFROM kafka_src as S, LATERAL TABLE (kafkaparser (`message`)) as T ( name, age, grade, updateTime);-- 使用解析出的格式化数据进行计算,并将结果输出到RDS。INSERT INTO rds_sinkSELECT name, age, grade, updateTimeFROM input_view;

UDTF

Blink 2.2.4版本Maven依赖,示例如下。

org.apache.flink flink-core blink-2.2.4-SNAPSHOT provided org.apache.flink flink-streaming-java_2.11 blink-2.2.4-SNAPSHOT provided org.apache.flink flink-table_2.11 blink-2.2.4-SNAPSHOT provided com.alibaba fastjson 1.2.9

package com.alibaba;import com.alibaba.fastjson.JSONObject;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.DataTypes;import org.apache.flink.types.Row;import java.io.UnsupportedEncodingException;import java.sql.Timestamp;public class kafkaUDTF extends TableFunction { public void eval(byte[] message) { try { String msg = new String(message, "UTF-8"); try { JSONObject data = JSON.parseObject(msg); if (data != null) { String name = data.getString("name") == null ? "null" : data.getString("name"); Integer age = data.getInteger("age") == null ? 0 : data.getInteger("age"); String grade = data.getString("grade") == null ? "null" : data.getString("grade"); Timestamp updateTime = data.getTimestamp("updateTime"); Row row = new Row(4); row.setField(0, name); row.setField(1, age); row.setField(2, grade); row.setField(3,updateTime ); System.out.println("Kafka message str ==>" + row.toString()); collect(row); } } catch (ClassCastException e) { System.out.println("Input data format error、Input data " + msg + "is not json string"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override // 如果返回值是Row,重新加载UDTF这个方法,并指明系统返回的字段类型。 public DataType getResultType(Object[] arguments, Class[] argTypes) { return DataTypes.createRowType(DataTypes.STRING, DataTypes.INT, DataTypes.STRING, DataTypes.TIMESTAMP); }}

场景2:从Kafka读取数据,输入实时计算Flink版进行窗口计算。

按照实时计算Flink版目前的设计,滚动或滑动等窗口操作,必须在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以Kafka中message字段中的Event Time进行窗口操作,需要先从message字段使用UDX解析出Event Time,才能定义Watermark。在Kafka源表场景中,需要使用计算列。例如Kafka中写入数据:2018-11-11 00:00:00|1|Anna|female 。计算流程为:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink。

方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK

Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。

CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT, ts as to_timestamp(json_value(cast(`message` as VARCHAR ),'$.nodes.time')), WATERMARK wk FOR ts as withOffset(ts, 2000)) WITH (type = 'kafka' --请参见Kafka版本对应关系。);CREATE TABLE rds_sink ( starttime TIMESTAMP , endtime TIMESTAMP , `message` BIGINT ) WITH (type = 'rds');INSERT INTO rds_sinkSELECT TUMBLE_START(ts, INTERVAL '1' MINUTE), TUMBLE_END(ts, INTERVAL '1' MINUTE), count(`message`)FROM kafka_srcGROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);

方法2:Kafka SOURCE->UDTF->Realtime Compute for Apache Flink->RDS SINK

SQL

-- 定义解析Kafka message的UDTF。CREATE FUNCTION kafkapaser AS 'com.alibaba.kafkaUDTF';CREATE FUNCTION kafkaUDF AS 'com.alibaba.kafkaUDF';-- 定义源表,注意:Kafka源表DDL字段必须与以下示例一模一样。WITH中参数可改。create table kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT, ctime AS TO_TIMESTAMP(kafkaUDF(`message`)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意:计算列的类型必须为TIMESTAMP才能创建Watermark。 watermark for `ctime` as withoffset(`ctime`,0) -- 在计算列上定义Watermark。) WITH ( type = 'kafka010', -- 请参见Kafka版本对应关系。 topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3');create table rds_sink ( `name` VARCHAR, age INT, grade VARCHAR, updateTime TIMESTAMP) WITH( type='rds', url='jdbc:mysql://localhost:3306/test', tableName='test4', userName='test', password='');-- 使用UDTF,将二进制数据解析成格式化数据。CREATE VIEW input_view ASSELECT S.ctime, T.`order`, T.`name`, T.sex from kafka_src as S, LATERAL TABLE (kafkapaser (`message`)) as T ( ctime, `order`, `name`, sex);-- 对input_view中输出的数据做计算。CREATE VIEW view2 ( cnt, sex) AS SELECt COUNT(*) as cnt, T.sex from input_viewGroup BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE);-- 使用解析出的格式化数据进行计算,并将结果输出到RDS。insert into rds_sink SELECt cnt,sexfrom view2;

UDF&UDTF

Blink 2.2.4版本Maven依赖,示例如下。

org.apache.flink flink-core blink-2.2.4-SNAPSHOT provided org.apache.flink flink-streaming-java_2.11 blink-2.2.4-SNAPSHOT provided org.apache.flink flink-table_2.11 blink-2.2.4-SNAPSHOT provided com.alibaba fastjson 1.2.9

UDTF

package com.alibaba;import com.alibaba.fastjson.JSONObject;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.DataTypes;import org.apache.flink.types.Row;import java.io.UnsupportedEncodingException;public class kafkaUDTF extends TableFunction { public void eval(byte[] message) { try { // 读入一个二进制数据,并将其转换为String格式。 String msg = new String(message, "UTF-8"); // 提取JSON Object中各字段。 String ctime = Timestamp.valueOf(data.split('\|')[0]); String order = data.split('\|')[1]; String name = data.split('\|')[2]; String sex = data.split('\|')[3]; // 将解析出的字段放到要输出的Row()对象。 Row row = new Row(4); row.setField(0, ctime); row.setField(1, age); row.setField(2, grade); row.setField(3, updateTime); System.out.println("Kafka message str ==>" + row.toString()); // 输出一行。 collect(row); } catch (ClassCastException e) { System.out.println("Input data format error、Input data " + msg + "is not json string"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override // 如果返回值是Row,重新加载UDTF这个方法,并指明系统返回的字段类型。 // 定义输出Row()对象的字段类型。 public DataType getResultType(Object[] arguments, Class[] argTypes) { return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING); }}

UDF

package com.alibaba;package com.hjc.test.blink.sql.udx;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;public class KafkaUDF extends ScalarFunction { // 可选,open方法可以不写。 // 需要import org.apache.flink.table.functions.FunctionContext; public String eval(byte[] message) { // 读入一个二进制数据,并将其转换为String格式。 String msg = new String(message, "UTF-8"); return msg.split('\|')[0]; } public long eval(String b, String c) { return eval(b) + eval(c); } //可选,close方法可以不写。 @Override public void close() { }}

自建Kafka

示例

create table kafka_stream( messageKey VARBINARY, `message` VARBINARY, topic varchar, `partition` int, `offset` bigint) with ( type ='kafka011', topic = 'kafka_01', `group.id` = 'CID_blink', bootstrap.servers = '192.168.0.251:****');

WITH参数

bootstrap.servers参数需要填写自建的地址和端口号。

仅在Blink 2.2.6及以上版本支持阿里云Kafka或自建Kafka显示TPS和RPS等指标信息。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。