我们在做实时数据开发的时候,通常要用spark、flink去消费kafka的数据,拿到数据流后会和外部数据库(Hbase、MySQL等)进行维表关联来把数据流打宽。当然了,有些外部数据库不只是存储维度数据,也会有很多事实数据,并且这些数据更新频繁,数据量巨大,但是我们的Flink流也会去实时的join这些巨大的事实表,这就需要选择一个合适的外部数据库作为支持,这个外部数据库一定要满足海量数据高效的读写性能,这样才能满足实时场景的需求,说到这,我们的目光自然而然的落到了Hbase上,来吧,我们直接上图,下面这张图就是以上所说场景的一个基本架构
那么问题来了,FlinkSQL如何去关联Hbase大表呢,如果关联字段不是hbase维表的rowkey那么将会触发全表扫描,如果这个表很大,全表扫描效率就很不乐观了,耗时少则几秒,多则无限延长,所以我们一定是要走hbase二级索引的,但是很遗憾,FlinkSQL里的Hbase connector不会处理索引,它要么scan,要么就get,那么我们该怎么办呢,别急,我们也有笨办法,那就是我们自己维护索引表,如果你还不懂hbase二级索引的实现方式请自行补充这方面知识,下面的内容就是有关二级索引的试用了,看图吧
来,我描述一下上图的流程,首先消费到kafka数据后我们的流不能直接去join hbase的数据表而是要先去join索引表,这样就拿到了数据表的rowkey,然后我们再join数据表,这样就不会触发全表扫描了,而是通过rowkey查询,效率就一下子有了质的提升,那么代码是怎么实现呢?上案例!稍等,我先说一下这个案例,我们需要在Flink流中创建三个表:kafka表、hbase维度索引表、hbase维度表,然后我们就可以愉快的写SQL了
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //接入socket流,测试数据如下 //{'name':'kafka_tb','type':'INSERT','new':{'id':'1','name':'lxz'}} DataStream dataStream = env.socketTextStream("localhost", 9999); //定义kafka_tb表类型(有序) TypeInformation[] kafka_tb_types = new TypeInformation[]{Types.STRING,Types.STRING}; RowTypeInfo kafka_tb_rowType = new RowTypeInfo(kafka_tb_types); //socket接收到的流转换后注册成kafka_tb表 DataStream ds = dataStream.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { String type = JSON.parseObject(value).getString("type"); JSonObject new_row = JSON.parseObject(value).getJSonObject("new"); switch (type) { case "INSERT": out.collect(Row.ofKind(RowKind.INSERT, new_row.getString("id"), new_row.getString("name")));break; } } }).returns(kafka_tb_rowType); //注册kafka表kafka_tb Schema schema01 = Schema.newBuilder().build(); Table tab1 = tEnv.fromChangelogStream(ds,schema01).as("id","name"); tEnv.createTemporaryView("kafka_tb", tab1); //注册Hbase索引表hbase_index_tb tEnv.executeSql("CREATE TABLE hbase_index_tb (n" + " ID STRING,n" + " CF ROW,n" + " PRIMARY KEY (ID) NOT ENFORCEDn" + ") WITH (n" + " 'connector' = 'hbase-2.2',n" + " 'table-name' = 'hbase_index_tb',n" + " 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',n" + " 'zookeeper.znode.parent' = '/hbase-unsecure'n"+ ")"); //注册Hbase数据表hbase_data_tb tEnv.executeSql("CREATE TABLE hbase_data_tb (n" + " ID STRING,n" + " CF ROW,n" + " PRIMARY KEY (ID) NOT ENFORCEDn" + ") WITH (n" + " 'connector' = 'hbase-2.2',n" + " 'table-name' = 'hbase_data_tb',n" + " 'zookeeper.quorum' = 'prod-bigdata-pc10:2181,prod-bigdata-pc14:2181,prod-bigdata-pc15:2181',n" + " 'zookeeper.znode.parent' = '/hbase-unsecure'n"+ ")"); //执行关联查询 tEnv.executeSql("select a.* " + "from hbase_data_tb a " + "join hbase_index_tb b " + "on a.ID = b.ID " + "join kafka_tb c " + "on c.name=b.NAME").print(); }