package org.example.scalaimport org.apache.flink.api.java.ExecutionEnvironmentimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}object Flink_2021_0323_1443 { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 设置并行度 doPlan02(env) env.execute("Flink_2021_0323_1443") } def doPlan01(env: StreamExecutionEnvironment): Unit = { val file = "G:\workspace01\flink\src\main\resources\test.txt" val dataStream: DataStream[String] = env.readTextFile(file) val splitStream: DataStream[(String, Int)] = dataStream .flatMap(_.toUpperCase.split(" ")) .map((_, 1)) .keyBy(0) .sum(1) splitStream.print() } def doPlan02(env: StreamExecutionEnvironment): Unit = { val socketStream: DataStream[String] = env.socketTextStream("192.168.195.178", 9999) val filterStream:DataStream[String]=socketStream.filter(_.contains('a')) // 过滤条件 filterStream.print() }}
声明:本文档仅是自己学习总结,其中有些知识点可能存在错误,若是学友偶然搜到参考,望斟酌后再使用,以免给您带来困扰,若是发现错误也希望您指出更正,在此提前感谢!! 总结过程中要是有些地方借鉴了各路大神成果,您觉得侵犯了您的知识产权,对您有所冒犯,烦请通知鄙人,鄙人将会尽快修正! 邮箱地址:390835164@qq.com