Contents
1. Tables and views
2. Table API Connectors
2.1 filesystem, print, blackhole
3. timestamp and timestamp_ltz

1. Tables and views
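Tables are either temporary or permanent. When a temporary table and a permanent table share the same name, the temporary table takes precedence.
A permanent table needs a database to store its metadata, for example a Hive database.
createTemporaryTable is typically used to connect to an external data system, and createTemporaryView is typically used for intermediate result tables, as shown below: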
tEnv.createTemporaryTable("table_name", tableDescriptor)
tEnv.createTemporaryView("table_name", table)
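The precedence rule can be checked with a small experiment. The following is a minimal sketch, assuming the Flink dependencies used elsewhere in this post are on the classpath and that the default in-memory catalog is active; the names ShadowingSketch, demo_table, perm_col and tmp_col are made up for illustration. Note that a "permanent" table in the in-memory catalog lives only as long as the session, whereas a Hive-backed catalog would persist it.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object ShadowingSketch {

  // Returns the column names that "demo_table" resolves to,
  // first while the temporary table exists, then after it is dropped.
  def columnsBeforeAndAfterDrop(): (String, String) = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    // Permanent table: metadata is stored in the current catalog
    tEnv.executeSql("create table demo_table (perm_col string) with ('connector' = 'datagen')")
    // Temporary table registered under the same name
    tEnv.executeSql("create temporary table demo_table (tmp_col bigint) with ('connector' = 'datagen')")

    // While both exist, the name resolves to the temporary table
    val before = tEnv.from("demo_table").getResolvedSchema.getColumnNames.toString

    // Dropping the temporary table makes the permanent one visible again
    tEnv.executeSql("drop temporary table demo_table")
    val after = tEnv.from("demo_table").getResolvedSchema.getColumnNames.toString

    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = columnsBeforeAndAfterDrop()
    println(s"with temporary table: $before")
    println(s"after dropping it:    $after")
  }
}
```

If the rule holds, the first line should list the temporary table's column and the second line the permanent table's column.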
2. Table API Connectors
2.1 filesystem, print, blackhole
Add the required dependencies to pom.xml.
The program is as follows:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FormatDescriptor, Schema, TableDescriptor, long2Literal, row, string2Literal}
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.ipc.StandbyException

import scala.util.control.Breaks.{break, breakable}

object flink_test {

  // Find the URI of the active HDFS NameNode by probing each candidate
  def getActiveHdfsUri(): String = {
    val hadoopConf = new Configuration()
    val hdfsUris = Array(
      "hdfs://192.168.23.101:8020",
      "hdfs://192.168.23.102:8020",
      "hdfs://192.168.23.103:8020"
    )
    var hdfsCli: FileSystem = null
    var hdfsCapacity: Long = -1L
    var activeHdfsUri: String = null

    breakable {
      for (hdfsUri <- hdfsUris) {
        hadoopConf.set("fs.defaultFS", hdfsUri)
        hdfsCli = FileSystem.get(hadoopConf)
        try {
          // Only the active NameNode answers this call; a standby throws
          hdfsCapacity = hdfsCli.getStatus.getCapacity
          activeHdfsUri = hdfsUri
          break
        } catch {
          // Standby NameNode: ignore it and try the next URI
          case hdfsException: StandbyException => {}
        }
      }
    }
    activeHdfsUri
  }

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val hdfsFilePath = s"${getActiveHdfsUri()}/test/test.txt"

    // HDFS table
    val fileSystemTable = tEnv.from(
      TableDescriptor.forConnector("filesystem")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        )
        .option("path", hdfsFilePath)
        .format(FormatDescriptor
          .forFormat("csv")
          .option("field-delimiter", ",")
          .build()
        ).build()
    )
    tEnv.createTemporaryView("fileSystemTable", fileSystemTable)

    // print table
    tEnv.createTemporaryTable("printSink",
      TableDescriptor.forConnector("print")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        ).build()
    )

    // Read the HDFS table and write it to the print sink; the output is
    // the same as converting the table to a DataStream and calling print
    fileSystemTable.executeInsert("printSink")

    // blackhole table
    tEnv.executeSql("create temporary table blackholeSink with ('connector' = 'blackhole') like printSink")

    // Read the HDFS table and write it to the blackhole sink
    tEnv.executeSql("insert into blackholeSink select * from fileSystemTable")

    // Convert to a DataStream and discard every record
    val fileSystemDatastream = tEnv.toDataStream(fileSystemTable)
    fileSystemDatastream.addSink(new DiscardingSink[Row]())

    senv.execute()
  }
}
The output is as follows:
6> +I[zhang_san, 30]
4> +I[li_si, 40]
3. timestamp and timestamp_ltz
timestamp(p): p is the precision of the fractional seconds. It ranges from 0 to 9 and defaults to 6.
val table = tEnv.sqlQuery("select timestamp '1970-01-01 00:00:04.001'")
table.execute().print()
The output is as follows:
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+
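timestamp_ltz(p) describes an absolute point in time on the timeline. It stores the number of milliseconds since the epoch in a long and the nanoseconds within that millisecond in an int.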
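To make the long-plus-int layout concrete, here is a minimal sketch that uses only java.time. It illustrates the idea and is not Flink's internal implementation; LtzStorageSketch, split and join are hypothetical names.

```scala
import java.time.Instant

object LtzStorageSketch {

  // Split an instant into (milliseconds since epoch, nanoseconds within that millisecond)
  def split(instant: Instant): (Long, Int) = {
    val millis = instant.toEpochMilli
    val nanoOfMilli = instant.getNano % 1000000
    (millis, nanoOfMilli)
  }

  // Rebuild the instant from the two stored parts
  def join(millis: Long, nanoOfMilli: Int): Instant =
    Instant.ofEpochMilli(millis).plusNanos(nanoOfMilli.toLong)

  def main(args: Array[String]): Unit = {
    val instant = Instant.parse("1970-01-01T00:00:04.001002003Z")
    val (millis, nanoOfMilli) = split(instant)
    println(s"millis = $millis, nanoOfMilli = $nanoOfMilli") // millis = 4001, nanoOfMilli = 2003
    println(join(millis, nanoOfMilli) == instant)            // true
  }
}
```

Neither part carries a time zone, which is why the same stored value denotes the same instant everywhere.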
It has no string literal form, but it can be converted from a long epoch time. At any given instant, System.currentTimeMillis() returns the same value on every machine in the world.
tEnv.executeSql("create view t1 as select to_timestamp_ltz(4001, 3)")
val table = tEnv.sqlQuery("select * from t1")
table.execute().print()
The output is as follows (the value is rendered in the session time zone, which is UTC+8 here):
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 08:00:04.001 |
+----+-------------------------+
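The 08:00:04.001 in this output comes from rendering the epoch value 4001 in a UTC+8 time zone. The following minimal sketch reproduces that arithmetic with java.time only. The zone names are assumptions chosen for illustration, and TimestampLtzSketch and render are hypothetical names, not Flink APIs.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}

object TimestampLtzSketch {

  // Render an epoch-millisecond value as wall-clock time in the given zone
  def render(epochMillis: Long, zone: String): LocalDateTime =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(zone))

  def main(args: Array[String]): Unit = {
    val epochMillis = 4001L
    println(render(epochMillis, "UTC"))           // 1970-01-01T00:00:04.001
    println(render(epochMillis, "Asia/Shanghai")) // 1970-01-01T08:00:04.001
  }
}
```

The stored value is identical in both calls; only the displayed wall-clock time changes with the zone. A plain timestamp, by contrast, is shown exactly as written.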
The various current-time functions:
tEnv.executeSql("create view myView1 as select localtime, localtimestamp, current_date, current_time, current_timestamp, current_row_timestamp(), now(), proctime()")
val table = tEnv.sqlQuery("select * from myView1")
table.printSchema()
table.execute().print()
The output is as follows:
(
  `localtime` TIME(0) NOT NULL,
  `localtimestamp` TIMESTAMP(3) NOT NULL,
  `current_date` DATE NOT NULL,
  `current_time` TIME(0) NOT NULL,
  `current_timestamp` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$5` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$6` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$7` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*
)
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | localtime |          localtimestamp | current_date | current_time |       current_timestamp |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  12:59:06 | 2022-02-07 12:59:06.859 |   2022-02-07 |     12:59:06 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.862 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+