Mavn的依赖
1.8 ${java.version} ${java.version} 1.12.0 2.12 3.1.3 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.version} ${flink.version} org.apache.flink flink-clients_${scala.version} ${flink.version} org.apache.flink flink-cep_${scala.version} ${flink.version} org.apache.flink flink-json ${flink.version} com.alibaba fastjson 1.2.68 org.apache.hadoop hadoop-client ${hadoop.version} mysql mysql-connector-java 5.1.49 com.alibaba.ververica flink-connector-mysql-cdc 1.2.0 org.slf4j slf4j-api 1.7.25 org.slf4j slf4j-log4j12 1.7.25 org.apache.logging.log4j log4j-to-slf4j 2.14.0 org.projectlombok lombok 1.18.12 org.apache.flink flink-connector-jdbc_${scala.version} ${flink.version} org.apache.phoenix phoenix-spark 5.0.0-Hbase-2.0 org.glassfish javax.el commons-beanutils commons-beanutils 1.9.3 com.google.guava guava 29.0-jre redis.clients jedis 3.3.0 ru.yandex.clickhouse clickhouse-jdbc 0.2.4 com.fasterxml.jackson.core jackson-databind com.fasterxml.jackson.core jackson-core org.apache.flink flink-table-api-java-bridge_${scala.version} ${flink.version} org.apache.flink flink-table-planner-blink_${scala.version} ${flink.version} com.janeluo ikanalyzer 2012_u6 com.clickhouse clickhouse-jdbc 0.3.2-patch4 http * * org.apache.maven.plugins maven-assembly-plugin 3.0.0 jar-with-dependencies make-assembly package single
核心代码
//流代码 EnvironmentSettings environment = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, environment); //dataStream.print(); //以一个的分钟作为周期 SingleOutputStreamOperator> streamOperator = dataStreams.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction, TimeWindow>() { @Override public void apply(TimeWindow timeWindow, Iterable iterable, Collector> collector) throws Exception { ArrayList list = Lists.newArrayList(iterable); if (list.size() > 0) { collector.collect(list); } } }); //dataStreams.print(); Table table = streamTableEnvironment.fromDataStream(dataStreams, "user_id,item_id,cate_id,times,name,keyword,factory,price,pro,city,par,brank"); streamTableEnvironment.createTemporaryView("t1", table); streamOperator.addSink(new OrderSinkFunc()); //,tumble(times, interval '1' day) Table table1 = streamTableEnvironment.sqlQuery("select item_id,name,count(*)as num ,sum(price) as total from t1 group by item_id,name "); //支持撤回 streamTableEnvironment.toRetractStream(table1, Row.class).print("输出结果");