今天我们来学习flink中较为基础的DataStream API,DataStream API用来处理流数据。本文主要是以pyflink的形式来进行讲解,对往期内容感兴趣的小伙伴:
hadoop专题: hadoop系列文章.spark专题: spark系列文章.flink专题: Flink系列文章、
本博客的API都是python的,根据流数据处理的不同阶段,去官方的pyflink文档中寻找对应的python API 总结而成,如有遗漏的地方,请大家指正。
目录1、安装pyflink2、DataStream API
2.1 DataSources数据输入2.2 DataSteam转换操作2.3 DataSinks数据输出 3、DataSet4、参考资料 1、安装pyflink
Flink支持python3.6、3.7和3.8,同时Flink1.11以后也支持windows系统了,大家只要直接运行命令即可安装。
#安装命令python3 -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple/
我是在ubuntu中安装的,记得安装java8或11哦,出现如下界面即成功了。
DataStream API是Flink框架处理无界数据流的重要接口。前面提到,任何一个完整的Flink应用程序应该包含如下三个部分:
数据源(DataSource)。转换操作(Transformation)。数据汇(DataSink)。 2.1 DataSources数据输入 从文件读取数据
env.read_text_file(file_path: str, charset_name: str = 'UTF-8')
从集合Collection中读取数据env.from_collection(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None)
自定义数据源env.add_source(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None)
还支持其他的数据源,上面几种较为常见。 2.2 DataSteam转换操作当Flink应用程序生成数据源后,就需要根据业务需求,通过一系列转换操作对数据流上的元素进行各种计算,从而输出最终的结果。
map有时候,我们需要对数据流上的每个元素进行处理,比如将单个文本转换成一个元组,即1对1的转换操作,此时可以通过map转换操作完成。
datastreamsource.map(func, output_type) #Parameters#func – The MapFunction that is called for each element of the DataStream.#output_type – The type information of the MapFunction output data.#Returns#The transformed DataStream.
flat_map在某些情况下,需要对数据流中每个元素生成多个输出,即1对N的转换操作,那么此时可以利用flatMap操作。
datastreamsource.flat_map(func, output_type) #Parameters#func – The FlatMapFunction that is called for each element of the DataStream.#output_type – The type information of output data.#Returns#The transformed DataStream.
fliter有时要从数据流中筛选出符合预期的数据,那就需要对数据流进行过滤处理,即利用filter转换操作。
datastreamsource.filter(func) #Parameters#func – The FilterFunction that is called for each element of the DataStream.#Returns#The filtered DataStream.
key_by针对不同的数据流元素,有时需要根据某些字段值,作为分区的Key来并行处理数据,此时就需要用到keyBy转换操作。它将一个DataStream类型的数据流转换成一个KeyedStream数据流类型
datastreamsource.key_by(key_selector,key_type) #Parameters#key_selector – The KeySelector to be used for extracting the key for partitioning.#key_type – The type information describing the key type.#Returns#The DataStream with partitioned state(i.e、KeyedStream).
reduce对于分区的数据流,对数据进行reduce处理,它实际上是一种聚合操作,将两个输入元素合并成一个输出元素。它是KeyedStream流上的操作
datastreamsource.reduce(func)#Parameters#func – The ReduceFunction that is called for each element of the DataStream.#Returns#The transformed DataStream.
例如:
ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b'])ds.key_by(lambda x: x[1]).reduce(lambda a, b: a[0] + b[0], b[1])
union在流操作场景中,有时需要合并多个流,即将多个数据流合并成一个数据流,此时可以使用union转换操作(最多合并3个)
#流1合并2,3datastreamsource1.union(datastreamsource2,datastreamsource3) #Parameters#datastreamsource – The DataStream to union outputwith.#Returns The DataStream.
connect除了union可以合并流,还可以使用connect对2个数据流进行合并,且两个流的数据类型可以不相同。
datastreamsource.connect(ds)#Parameters#ds – The DataStream with which this stream will be connected.#Returns#The ConnectedStreams.
project#dataStreamSource.project(1, 0)方法从数据源dataStreamSource中筛选出2个字段,其字段索引分别是1和0,此时列也重新进行排序。datastreamsource.project(*field_indexes: int) #Parameters#field_indexes – The field indexes of the input tuples that are retained、The order of fields in the output tuple corresponds to the order of field indexes.#Returns#The projected DataStream.
partition_custompartition_custom转换操作可以根据自身需要,自行制定分区规则,partitionCustom只能对单个Key进行分区,不支持复合Key。
datastreamsource.partition_custom(partitioner, key_selector) #Parameters#partitioner – The partitioner to assign partitions to keys.#key_selector – The KeySelector with which the DataStream is partitioned.#Returns#The partitioned DataStream.
window转换操作Flink通过window机制,将无界数据流划分成多个有界的数据流,从而对有界数据流进行数据统计分析,window上还有多种转换操作,如max求窗口最大值,sum求窗口中元素和等。当窗口中的内置转换操作不能满足业务需求时,可以自定义内部的处理逻辑,即用apply方法传入一个自定义的WindowFunction
#CountWindow将datastream分成几个窗口datastreamsource.CountWindow(id: int)
2.3 DataSinks数据输出当数据流经过一系列的转换后,需要将计算结果进行输出,那么负责输出结果的算子称为Sink。
sink_todatastreamsource.sink_to(sink: pyflink.datastream.connectors.Sink) #Adds the given sink to this DataStream、only streams with sinks added will be executed once the execute() method is called.#Parameters#sink – The user defined sink.#Returns#The closed DataStream.
add_sinkdatastreamsource.add_sink(sink_func: pyflink.datastream.functions.SinkFunction) #Adds the given sink to this DataStream、only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.#Parameters#sink_func – The SinkFunction object.#Returns#The closed DataStream.
3、DataSet上面的部分,我们主要讲述了流处理DataStream的DataSource数据源、DataStream转换操作以及DataSink数据汇,在Flink中将批数据称为DataSet,关于批数据的处理总结如下:
数据源:和DataStream相似转换操作:参考spark的批处理api数据汇:和DataStream相似
DataSet在这里就不做过多讲述。
4、参考资料 《PyDocs》(pyflink官方文档)
《Flink入门与实战》
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》