欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flink学习之DataStreamAPI(python版本)

时间:2023-05-15

今天我们来学习flink中较为基础的DataStream API,DataStream API用来处理流数据。本文主要是以pyflink的形式来进行讲解,对往期内容感兴趣的小伙伴:

hadoop专题: hadoop系列文章.spark专题: spark系列文章.flink专题: Flink系列文章、

本博客的API都是python的,根据流数据处理的不同阶段,去官方的pyflink文档中寻找对应的python API 总结而成,如有遗漏的地方,请大家指正。

目录

1、安装pyflink2、DataStream API

2.1 DataSources数据输入2.2 DataSteam转换操作2.3 DataSinks数据输出 3、DataSet4、参考资料 1、安装pyflink

Flink支持python3.6、3.7和3.8,同时Flink1.11以后也支持windows系统了,大家只要直接运行命令即可安装。

#安装命令python3 -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple/

我是在ubuntu中安装的,记得安装java8或11哦,出现如下界面即成功了。

2、DataStream API

DataStream API是Flink框架处理无界数据流的重要接口。前面提到,任何一个完整的Flink应用程序应该包含如下三个部分:

数据源(DataSource)。转换操作(Transformation)。数据汇(DataSink)。 2.1 DataSources数据输入 从文件读取数据

env.read_text_file(file_path: str, charset_name: str = 'UTF-8')

从集合Collection中读取数据

env.from_collection(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None)

自定义数据源

env.add_source(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None)

还支持其他的数据源,上面几种较为常见。 2.2 DataSteam转换操作

当Flink应用程序生成数据源后,就需要根据业务需求,通过一系列转换操作对数据流上的元素进行各种计算,从而输出最终的结果。

map

有时候,我们需要对数据流上的每个元素进行处理,比如将单个文本转换成一个元组,即1对1的转换操作,此时可以通过map转换操作完成。

datastreamsource.map(func, output_type) #Parameters#func – The MapFunction that is called for each element of the DataStream.#output_type – The type information of the MapFunction output data.#Returns#The transformed DataStream.

flat_map

在某些情况下,需要对数据流中每个元素生成多个输出,即1对N的转换操作,那么此时可以利用flatMap操作。

datastreamsource.flat_map(func, output_type) #Parameters#func – The FlatMapFunction that is called for each element of the DataStream.#output_type – The type information of output data.#Returns#The transformed DataStream.

fliter

有时要从数据流中筛选出符合预期的数据,那就需要对数据流进行过滤处理,即利用filter转换操作。

datastreamsource.filter(func) #Parameters#func – The FilterFunction that is called for each element of the DataStream.#Returns#The filtered DataStream.

key_by

针对不同的数据流元素,有时需要根据某些字段值,作为分区的Key来并行处理数据,此时就需要用到keyBy转换操作。它将一个DataStream类型的数据流转换成一个KeyedStream数据流类型

datastreamsource.key_by(key_selector,key_type) #Parameters#key_selector – The KeySelector to be used for extracting the key for partitioning.#key_type – The type information describing the key type.#Returns#The DataStream with partitioned state(i.e、KeyedStream).

reduce

对于分区的数据流,对数据进行reduce处理,它实际上是一种聚合操作,将两个输入元素合并成一个输出元素。它是KeyedStream流上的操作

datastreamsource.reduce(func)#Parameters#func – The ReduceFunction that is called for each element of the DataStream.#Returns#The transformed DataStream.

例如:

ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b'])ds.key_by(lambda x: x[1]).reduce(lambda a, b: a[0] + b[0], b[1])

union

在流操作场景中,有时需要合并多个流,即将多个数据流合并成一个数据流,此时可以使用union转换操作(最多合并3个)

#流1合并2,3datastreamsource1.union(datastreamsource2,datastreamsource3) #Parameters#datastreamsource – The DataStream to union outputwith.#Returns The DataStream.

connect

除了union可以合并流,还可以使用connect对2个数据流进行合并,且两个流的数据类型可以不相同。

datastreamsource.connect(ds)#Parameters#ds – The DataStream with which this stream will be connected.#Returns#The ConnectedStreams.

project

#dataStreamSource.project(1, 0)方法从数据源dataStreamSource中筛选出2个字段,其字段索引分别是1和0,此时列也重新进行排序。datastreamsource.project(*field_indexes: int) #Parameters#field_indexes – The field indexes of the input tuples that are retained、The order of fields in the output tuple corresponds to the order of field indexes.#Returns#The projected DataStream.

partition_custom

partition_custom转换操作可以根据自身需要,自行制定分区规则,partitionCustom只能对单个Key进行分区,不支持复合Key。

datastreamsource.partition_custom(partitioner, key_selector) #Parameters#partitioner – The partitioner to assign partitions to keys.#key_selector – The KeySelector with which the DataStream is partitioned.#Returns#The partitioned DataStream.

window转换操作

Flink通过window机制,将无界数据流划分成多个有界的数据流,从而对有界数据流进行数据统计分析,window上还有多种转换操作,如max求窗口最大值,sum求窗口中元素和等。当窗口中的内置转换操作不能满足业务需求时,可以自定义内部的处理逻辑,即用apply方法传入一个自定义的WindowFunction

#CountWindow将datastream分成几个窗口datastreamsource.CountWindow(id: int)

2.3 DataSinks数据输出

当数据流经过一系列的转换后,需要将计算结果进行输出,那么负责输出结果的算子称为Sink。

sink_to

datastreamsource.sink_to(sink: pyflink.datastream.connectors.Sink) #Adds the given sink to this DataStream、only streams with sinks added will be executed once the execute() method is called.#Parameters#sink – The user defined sink.#Returns#The closed DataStream.

add_sink

datastreamsource.add_sink(sink_func: pyflink.datastream.functions.SinkFunction) #Adds the given sink to this DataStream、only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.#Parameters#sink_func – The SinkFunction object.#Returns#The closed DataStream.

3、DataSet

上面的部分,我们主要讲述了流处理DataStream的DataSource数据源、DataStream转换操作以及DataSink数据汇,在Flink中将批数据称为DataSet,关于批数据的处理总结如下:

数据源:和DataStream相似转换操作:参考spark的批处理api数据汇:和DataStream相似

DataSet在这里就不做过多讲述。

4、参考资料

《PyDocs》(pyflink官方文档)
《Flink入门与实战》
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。