Configure the system environment variables in /etc/profile and Flink's configuration file flink-conf.yaml
Add the following to /etc/profile (this assumes the JDK and Hadoop are already configured):
# flink config
export Flink_HOME=/opt/module/flink-1.14.3
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$PATH:$Flink_HOME/bin
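After saving, reload the profile so the new variables take effect in the current shell, and confirm they were picked up:
source /etc/profile
# both should print non-empty values
echo $Flink_HOME
echo $HADOOP_CLASSPATH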
Modify flink-conf.yaml as follows:
# changed so that each TaskManager provides 2 task slots
taskmanager.numberOfTaskSlots: 2
# add this line
classloader.check-leaked-classloader: false
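Before restarting anything, a quick grep from the Flink home directory confirms both settings landed in the file:
grep -E 'numberOfTaskSlots|check-leaked-classloader' conf/flink-conf.yaml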
Also add the following environment variable at the top of bin/config.sh:
export HADOOP_CLASSPATH=`hadoop classpath`
3、Startup configuration: create a new sql-conf.sql file that configures the HiveCatalog
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/hdp/3.1.4.0-315/hive/conf',
    'hadoop-conf-dir' = '/usr/hdp/3.1.4.0-315/hadoop/etc/hadoop/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
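Once the client has started with this init file, the catalog can be verified with standard Flink SQL statements; existing Hive databases and tables should show up:
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;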
4、Start
# note: switch to the hive user first
su hive
# run the following from the flink-1.14.3 directory
bin/yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm flinkConnect -qu flink -d
bin/sql-client.sh embedded -i conf/sql-conf.sql -s yarn-session
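Before opening the SQL client, it is worth confirming the detached session is actually running on YARN; flinkConnect is the application name set via -nm above:
yarn application -list | grep flinkConnect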
5、DDL
# set the SQL dialect to hive
set table.sql-dialect=hive;
# display results in a Hive-like table layout
set sql-client.execution.result-mode = tableau;
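Note that the dialect is toggled per group of statements: the Hive-format tables in the next step need the hive dialect, while the HBase and Kafka connector tables (standard Flink DDL with a WITH clause) need the default dialect:
# before Hive-style DDL (STORED AS ...)
set table.sql-dialect=hive;
# before connector DDL (WITH ('connector' = ...))
set table.sql-dialect=default;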
6、Test
# create a Hive table
CREATE TABLE hive_table (
    user_id STRING,
    order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='1 h',
    'sink.partition-commit.policy.kind'='metastore,success-file'
);
# create an HBase table
# hbase shell:
create 'flink_test22', {NAME => 'info', COMPRESSION => 'SNAPPY', TTL => 'FOREVER', VERSIONS => 3}
put 'flink_test22','1004','info:sex','male'
put 'flink_test22','1004','info:age','18'
put 'flink_test22','1004','info:name','Tom'
-- Flink SQL
CREATE TABLE Test_P (
    rowkey STRING,
    info ROW<age STRING, name STRING, sex STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'flink_test22',
    'zookeeper.quorum' = 'tp001:2181,tp002:2181,tp003:2181',
    'zookeeper.znode.parent' = '/hbase-unsecure'
);
-- copy one HBase table's data into another
insert into Test_P select rowkey, ROW(age, name, sex) from htable_test2;
# create a Kafka stream table
# sample message: {"uid":"1001","shoid":"1036","money":"111","ts":"1644477567913"}
CREATE TABLE kafka_table_test (
    `uid` STRING,
    `shoid` STRING,
    `money` STRING,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_table_test',
    'properties.bootstrap.servers' = 'tp001:6667,tp002:6667,tp003:6667',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
# create the Hive table that Kafka data will be written into
CREATE TABLE kafka_table_sink_hive (
    uid STRING,
    shoid STRING,
    money STRING,
    ts STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='1 h',
    'sink.partition-commit.policy.kind'='metastore,success-file'
);
# insert rows into the Hive table
insert into hive_table select '002',26,'2022-02-09','13';
insert into hive_table select '1001',25,'2022-02-10','11';
insert into hive_table select '1002',27,'2022-02-10','12';
# join the Hive table with the HBase table
select *
from hive_table a
left join htable_test2 b
on a.user_id = b.rowkey;
# join the Kafka table with the Hive table
select *
from kafka_table_test a
left join hive_table b
on a.uid = b.user_id;
select *
from kafka_table_test a
left join htable_test2 b
on a.uid = b.rowkey;
# count rows per day
select dt, count(1) from hive_table group by dt;
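The test list creates kafka_table_sink_hive but never shows the streaming insert that feeds it. A minimal sketch, assuming the dt/hr partition values are derived from the Kafka record timestamp ts (this derivation is an assumption, not from the original):
INSERT INTO kafka_table_sink_hive
SELECT
    uid,
    shoid,
    money,
    CAST(ts AS STRING),
    DATE_FORMAT(ts, 'yyyy-MM-dd'),  -- dt partition (assumed derivation)
    DATE_FORMAT(ts, 'HH')           -- hr partition (assumed derivation)
FROM kafka_table_test;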
Startup script:
#!/bin/bash
bin/sql-client.sh embedded -i conf/sql-conf.sql -s yarn-session
Statements executed inside the client:
CREATE TABLE Test_P (
    rowkey STRING,
    info ROW<age STRING, name STRING, sex STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'flink_test22',
    'zookeeper.quorum' = 'tp001:2181,tp002:2181,tp003:2181',
    'zookeeper.znode.parent' = '/hbase-unsecure'
);
insert into Test_P select rowkey, ROW(age, name, sex) from htable_test2;
# data file
0001,zhangsan,99,98,100,school1,class1
0002,lisi,59,89,79,school2,class1
0003,wangwu,89,99,100,school3,class1
0004,zhangsan2,99,98,100,school1,class1
0005,lisi2,59,89,79,school2,class1
0006,wangwu2,89,99,100,school3,class1
# create an ordinary table
create table score1(
    id string comment 'ID',
    name string comment 'name',
    Chinese double comment 'Chinese',
    English double comment 'English',
    math double comment 'math',
    school string comment 'school',
    class string comment 'class'
) comment 'score1'
row format delimited fields terminated by ','
stored as textfile;
# put the file into HDFS
hadoop fs -put /opt/module/flink-1.14.3/testData/score.txt /flink1.14-test-data
# load local data into the table
load data local inpath '/opt/module/flink-1.14.3/testData/score.txt' into table score1;
# insert data directly into the table
1、Hive SQL syntax (ids quoted so the leading zeros survive, since id is a string column)
insert into table score1 values('0006','wangwu2',89,99,100,'school3','class1'),('0005','lisi2',59,89,79,'school2','class1');
2、Flink SQL syntax
INSERT INTO score1 SELECT '0004','zhangsan2',99,98,100,'school1','class1';
-- note: Flink SQL does not support operations on ACID (transactional) tables
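After loading, a quick sanity check confirms the rows landed; both statements work in Hive and in the Flink SQL client:
select count(1) from score1;
select * from score1 limit 3;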