一、flink水印,state,窗口(哪些,区别,适用场景)二、Flink架构三、Flink时间类型有那些,他们有什么区别?四、Flink窗口类型有哪些,你们目前用的什么窗口?五、Flink的状态你们有没有用过,用的什么类型的状态?六、Flink如何处理延迟数据?七、Flink中managed state和raw state区别?八、Flink的keyedstate有什么不足,优点是什么,缺点是什么?九、Flink的watermark有哪几种?十、Flink自定义sink和source有没有写过,遇到了什么问题?十一、flink内存管理机制目前面试暂时没有问到过十二、Flink处理完数据,写到mysql与redis中的时候mysql成功而redis失败十三、Flink的流批统一是怎么做的十四、Flink的状态是怎么保存的十五、flink如果不用水印怎么代码实现延迟数据?十六、Flink的反压机制十七、窗口有那些?十八、窗口函数十九、flink写入reidis,怎么增加效率?直白一点:设置什么参数可以让一个核有多个slot?二十、state有几种?代码中怎么获取state?
一、flink水印,state,窗口(哪些,区别,适用场景)
时间语义:
Event Time、 Ingestion Time和Processing Time
四种窗口操作:
reduce()、fold()、sum()、max() 待定
水印WaterMaker:
作用:解决乱序数据概念: 给数据额外加一个时间列(时间戳),即最大允许延时时间或乱序时间计算公式:WaterMaker当前窗口最大的事件时间 - 最大允许的延迟时间或乱序时间触发条件:a、窗口中有数据;b.WaterMaker>=窗口结束时间
Allowed Lateness:
作用:解决延迟数据概念:表示允许延迟数据最多可以迟到多久,还可以进行计算(保存窗口,并触发窗口计算)方法:WindowedStream.allowedLateness()方法来设置窗口的允许延迟。即正常情况下,窗口触发计算完成后就会被销毁,但设置了允许延迟后,窗口会等待设置的时长后再销毁,在该时间区间内迟到的数据仍可以进入窗口中,并触发新的计算。
OutputTag侧边流:
解决超延时数据,防止数据丢失。如果数据再设置延时时间之外到达,可以通过侧边流保存数据,进行后续处理。
窗口类型:
时间窗口TimeWindow:按时间间隔划分窗口;计数窗口CountWindow:按数据条目数划分窗口;会话窗口SessionWindow:按超时时间间隔划分窗口;用窗口大小和滑动大小来决定是滑动窗口还是滚动窗口
窗口API: 分两种
针对keyedStream的窗口API:window()针对非keyedStream的窗口API:windowAll()
并行度设置:
一个Operator(source、transformation、sink)由多个并行的Task(线程)执行。并行的线程数就是并行度(Parallel)
Flink中,Operator并行度设置方式有4中:
Operator Level(算子级别)(可以使用)
对某一个算子、数据源和sink的并行度设置可以调用setParallelism()方法来指定。
x.flatMap().keyBy().timeWindow().sum().setParallelism()Execution Environment Level(Env级别)(可以使用)
为执行环境的所有算子、数据源和sink设置并行度 用setParallelism()方法指定。
env.setParallelism()Clinet Level(客户端级别,推荐使用)(可以使用)
在客户端提交Job是设定,用 -p 参数指定并行度
/export/server/flink/bin/flink run -p 10 WordCount-java.jarSystem Level(系统默认级别,尽量不使用)
在配置文件flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度
State:
1、状态组织形式(forms)划分:Managed(管理) 和 Raw(原始)
2、状态基本类型划分:Keyed State 和 Operator State
Keyed State :
只能用在KeyedStream上的算子中每个Key对应一个State:一个Operator实例处理多个Key,访问相应的多个State并发改变,State随着Key在实例间迁移通过RuntimeContext访问:Rich Function支持的数据结构:
ValueState,ListState,ReducingState,AggregatingState,MapState
在Flink Stream模型中,DataStream经过KeyBy的操作可以变为KeyedStream。KeyedState是基于KeyedStream上的状态,这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个State,如stream.keyBy()
Operator State:
可以用于所有算子:常用于source,例如FlinkKafkaConsumer一个Operator实例对应一个State并发改变时有多种重新分配方式可选:均匀分配;合并后每个得到全量实现CheckPointedFunction或ListCheckpointed接口支持的数据结构:ListState
Operator State又称为non-keyed state,与key无关的State,每一个operator state都仅与一个operator的实例绑定。Operator State可以用于所有算子,但一般常用语source
3、KeyedState与OperatorState区别
current key:
OperatorState没有current key概念KeyedState的数值总是与一个current key对应的
heap:
OperatorState只有堆内存一种实现KeyedState有堆内存和RocksDB两种实现
snapshot:
OperatorState需要手动实现snapshot和restore方法KeyedState由backend实现,对用户透明
size:
OperatorState一般被认为是规模比较小的KeyedState一般是相对规模较大的
4、存储State数据结构
有状态计算其实需要考虑历史数据,而历史数据需要搞个地方存储起来。Flink为了方便不同分类的State的存储和管理,提供了如下API/数据结构来存储State
1)、Keyed State,保存Keyed State的数据结构
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态。它可以通过update方法更新状态值,通过vlaue()方法获取状态值。如求按用户id统计用户交易总额ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。如统计按用户id统计用户经常登录的IP。ReducingState: 这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。MapState 2)、Operator State 需要自己实现CheckpointedFunction或ListCheckpointed接口,保存OperatorState的数据结构:ListState和BroadcastState 定位反压节点: 通过Flink Web UI自带的反压监控面板;通过Flink Task Metrics、 反压监控面板: 该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的Operator(如flatmap)下游的节点接收速率较慢,通过反压机制限制了该节点的发送速率。 值得注意:反压的根源节点并不一定会在反压面板体现出高反呀,因为反压面板监控的是发送端,如果某个节点是性能瓶颈并不会导致它本身出现高反压,而是导致它的上游出现高反压。总体来看,如果我们找到第一个出现反压的节点,那么反压根源要么是这个节点,要么是紧接着的下游节点。 分析反压的大致思路是: 在实践中,很多情况下的反压是由于数据倾斜造成的。这点可以通过WebUI各个SubTask的Recods Sent和 Record Received来确认。另外Checkpoint detail里不同SubTask的State Szie也是一个分析数据倾斜的有用指标。 此外,最常见的问题可能是用户代码的执行效率问题(频繁被阻塞或者性能问题)。最有用的方法就是对TaskManager进行CPU profile,从中分析到Task Thread是否跑满了一个CPU核,若是的话,要分析CPU主要花费在哪些函数里面,比如卡在Regex的用户函数(ReDoS);若不是的话,可能是checkpoint或GC等系统活动导致的暂时系统暂停。 当然,性能分析的结果可能是正常的,只是作业申请的资源不足导致了反压,这就要求拓展并行度。 另外TaskManager的内存以及GC问题也可能导致反压,包括TaskManager JVM各区内存不合理导致的频繁Full GC甚至失联。推荐使用通过给TaskManager启用G1垃圾回收器来优化GC,并打印GC日志来观察GC问题。 总结:反压是Flink应用运维中常见的问题,它不仅意味着性能瓶颈还可能导致作业的不稳定性。定位反压可以从Web UI的反压监控面板和Task Metric两者入手,前者方便简单分析,后者适合深入挖掘。定位到反压节点后我们可以通过数据分布、CPU Profile和GC指标日志等手段来进一步分析反压背后的具体原因并进行针对性的优化。 Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。 JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。 TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。 Flink 中的计算资源通过 Task Slot 来定义。每个 task slot 代表了 TaskManager 的一个固定大小的资源子集。例如,一个拥有3个slot的 TaskManager,会将其管理的内存平均分成三分分给各个 slot。将资源 slot 化意味着来自不同job的task不会为了内存而竞争,而是每个task都拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的内存。 Event Time 是每条数据在其生产设备上发生的时间。这段时间通常嵌入在记录数据中,然后进入Flink,可以从记录中提取事件的时间戳;Event Time即使在数据发生乱序,延迟或者从备份或持久性日志中重新获取数据的情况下,也能提供正确的结果。这个时间是最有价值的,和挂在任何电脑/操作系统的时钟时间无关。 Processing Time 是指执行相应操作的机器的系统时间。如果流计算系统基于Processing Time来处理,对流处理系统来说是最简单的,所有基于时间的操作(如Time Window)将使用运行相应算子的机器的系统时钟。然而,在分布式和异步环境中,Processing Time并不能保证确定性,它容易受到Event到达系统的速度(例如来自消息队列)以及数据在Flink系统内部处理的先后顺序的影响,所以Processing Time不能准确地反应数据发生的时间序列情况。 Ingestion Time是事件进入Flink的时间。 在Source算子处产生,也就是在Source处获取到这个数据的时间,Ingestion Time在概念上位于Event Time和Processing Time之间。在Source处获取数据的时间,不受Flink分布式系统内部处理Event的先后顺序和数据传输的影响,相对稳定一些,但是Ingestion Time和Processing Time一样,不能准确地反应数据发生的时间序列情况。 Tumbling window(固定时间窗口),Sliding Windows(滑动窗口),Session window(一个session window关闭通常是由于一段时间没有收到元素。),Global window(相同keyed的元素分配到一个窗口) keyed state 和operator state。参考 Operator State:该类State与key无关,整个operator对应一个state,该类State没有Keyed Key支持的数据结构多,仅支持ListState。举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。 默认丢弃 keyed state和operator state。以两种形式存在:managed state和raw state Keyed State始终与键有关,只能在KeyedStream的函数和运算符中使用。您可以将“Keyed State”视为已分区或分片的operator state,每个键仅具有一个状态分区。每个键状态在逻辑上都绑定到 watermark是用于处理乱序和延迟事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。 import java.sql.{Connection, DriverManager, PreparedStatement}import bean.Irisimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.sink.RichSinkFunction class MysqlSink extends RichSinkFunction[Iris] { private var connection: Connection = null private var ps: PreparedStatement = null 在open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。 override def open(parameters: Configuration): Unit = { super.open(parameters) val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://localhost:3306/bike?characterEncoding=utf-8&useSSL=true" val username = "root" val password = "123456" //1.加载驱动 Class.forName(driver) //2.创建连接 connection = DriverManager.getConnection(url, username, password) val sql = "insert into tb_iris(category,sepalLength,sepalWidth,petalLength,petalWidth,insertTime) values(?,?,?,?,?,?);" 获得执行语句 ps = connection.prepareStatement(sql) } override def invoke(value: Iris): Unit = { try { ps.setString(1, value.category) ps.setDouble(2, value.sepalLength) ps.setDouble(3, value.sepalWidth) ps.setDouble(4, value.petalLength) ps.setDouble(5, value.petalWidth) ps.setLong(6, System.currentTimeMillis()) ps.executeUpdate() } catch { case e: Exception => println(e.getMessage) } } 关闭连接 override def close(): Unit = { super.close() if (connection != null) { connection.close() } if (ps != null) { ps.close() } }} 主函数 object mongoSourceTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 获取source的数据 val iris = env.addSource(new MongodbSource()) .flatMap(_.toSeq) 中间的数据处理逻辑省略 iris.addSink(new MysqlSink) env.execute("Flink add mongo data sourc") }} Source mongodb import com.alibaba.fastjson.JSON case class Iris( category: String, sepalLength: Double, sepalWidth: Double, petalLength: Double, petalWidth: Double ) {}object Iris { def apply(jsonString: String): Iris = { val jobject = JSON.parseObject(jsonString) new Iris(jobject.getString("class"), jobject.getDouble("sepalLength"), jobject.getDouble("sepalWidth"), jobject.getDouble("petalLength"), jobject.getDouble("petalWidth")) }}import bean.Irisimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}import org.mongodb.scala.{document, MongoClient, MongoCollection, MongoDatabase}import Helpers._// https://github.com/mongodb/mongo-scala-driver/blob/master/examples/src/test/scala/tour/Helpers.scala class MongodbSource extends RichSourceFunction[Seq[Iris]] { // mongodb url private val mongourl = "mongodb://root:123456@localhost:27017/admin" var mongoClient: MongoClient = null var collection: MongoCollection[document] = null override def cancel(): Unit = {} // open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 override def open(parameters: Configuration): Unit = { super.open(parameters) mongoClient = MongoClient(mongourl) val database: MongoDatabase = mongoClient.getDatabase("datatest") collection = database.getCollection("iris") } // 关闭连接 override def close(): Unit = { super.close() if (mongoClient != null) { mongoClient.close() } } // DataStream 调用 run() 方法用来获取数据 override def run(ctx: SourceFunction.SourceContext[Seq[Iris]]) = { val irises = collection.find().results() .map(_.toJson()) .map(Iris.apply) ctx.collect(irises) }} 在 JVM 堆内或堆外实现显式的内存管理,即用自定义内存池来进行内存块的分配和回收并将对象序列化后存储到内存块,比如 Spark 的 Project Tungsten 和 Hbase 的 BlockCache。因为内存可以被精准地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以组件可以对内存有更好的掌控,这种内存管理方式也被认为是相比 Java 更像 C 语言化的做法。 Flink 内存管理概览 Network Buffers 区: 网络模块用于网络传输的一组缓存块对象,单个缓存块对象默认是32KB大小。Flink 会根据TaskManager 的最大内存来计算该区大小,默认范围是64MB至1GB。Memory Manager 区: 用于为算子缓存运行时消息记录的大缓存池(比如 Sort、Join这类耗费大量内存的操作),消息记录会被序列化之后存进这些缓存块对象。这部分区域默认占最大 heap 内存减去 Network Buffers 后的70%,单个缓存块同样默认是32KB。Free 区: 除去上述两个区域的内存剩余部分便是 Free heap,这个区域用于存放用户代码所产生的数据结构,比如用户定义的State。 目前 Memory Manager 的内存初始化方式有两种: 第一种是启动时即为 Network Buffers 区和 MemoryManager 区分配全部内存,这样 TaskManager 启动过程中会产生一次到多次的 full GC,导致 TaskManager 的启动慢一点,但是节省了后续执行作业时的 GC 时长。第二种方式是采用”懒分配”的方法,在内存紧张时再增量向操作系统申请内存,避免一下吃完所有的内存导致后续的其他操作内存不足,例如流计算作业的 StateBackend 保存在内存的 State 对象。 Network Buffers 和 MemoryManager 的存在会贯穿 TaskManager 的整个生命周期。它们管理的 Memory Segment 不断被重用,因此不会被 JVM 回收。经过若干次 GC 之后它们会进入老年代,变成常驻的对象。 Memory Segment 不管消息数据实际存储在 on-heap 还在是 off-heap,Flink 都会将它序列化成为一个或多个的 Memory Segment(内部又称 page)。系统可能会在其他的数据结构里存指向条消息的指针(一条消息通常会被构造成一个 Memory Segment 的集合),这意味着 Flink 需要依赖于一个有 page 概念并支持跨 page 消息的高效率序列化器。因此 Flink 实现了自己的类型信息系统(Type Information System)和序列化栈。 熟悉 Spark 的同学看到这里可能会联想到 Spark 的 Catalyst,它可以通过运行时编译来生成 Java bytecode 并提供了直接访问二进制对象字段的能力。然而 Flink 的开发语言是 Java,不便利用 Scala 的 quasiquotes 特性,因此 Flink 采用的办法是使用定制化的序列化机制和类型系统。 Flink 序列化机制 & 类型信息系统 Flink 作业可以处理任意 Java 或 Scala 对象类型的数据。要对作业执行计划进行优化,首先要识别出作业数据流的每一个步骤的数据类型。对于 Java 程序,Flink 实现了基于反射的类型提取器来分析用户定义函数的返回类型;对于 Scala 程序,Flink 可以直接利用 Scala 编译器的类型推导特性。其后 Flink 会用TypeInformation作为类型描述符来表示每种数据类型。 Flink 类型系统存下以下内建分类: 基础类型: 所有 Java 原始类型和它们的包装类型(包括 Hadoop Writable类型),加上void、String、Date、BigDecimal和BigInteger。原始类型的数组和对象数组。复合类型: Flink Java Tuple (Flink Java API 的一部分): 最多支持25个字段,不支持 null 字段。Scala case classe (包括 Scala 元组): 最多支持22个字段,不支持 null 字段。Row: Flink API 对象,代表任意数量字段的元组,支持 null 字段。POJO: 遵循 Java Bean 模式的类。 辅助类型: Option、Either、Lists、Maps等等。通用类型: 不能被识别为以上任意类型的数据类型。Flink 不会自己序列化这些类型,而是交由 Kyro 序列化。 每个数据类型都会有专属的序列化器。举个例子,BasicTypeInfo(基础类型)的序列化器为TypeSerializer,用于序列化 T 原始类型的数据;WritableTypeInfo(Hadoop Writable类型)的序列化器为WritableSerializer,用于序列化实现类 Writable 接口的 T 类型的数据。 对象序列化后会写到 DataOutput (由 MemorySegement 所支持),并高效地自动通过 Java 的 Unsafe API 操作写到内存。对于作为 key 的数据类型,TypeInformation还会提供类型比较器(TypeComparator)。类型比较器负责对对象进行哈希和比较,取决于具体的数据类型还可以高效地比较二进制形式的对象和提取固定长度的 key 前缀。 复合类型的对象可能包含内嵌的数据类型,在这种情况下,它们的序列化器和类型比较器同样是复合的。它们会将内嵌类型的序列化和比较大小任务委托给对应类型的序列化器和类型比较器。下图描述了一个典型的复合类型对象Tuple 最后,如果内建的数据类型和序列化方式不能满足你的需求,Flink 的类型信息系统也支持用户拓展。用户只需要实现TypeInformation、TypeSerializer和TypeComparator即可定制自己类型的序列化和比较大小方式。 对 GC 的影响 因此在 JVM 内存结构规划上,Flink 也作了相应的调整: MemoryManager 和 Network Buffers 两个实现了显式内存管理的子系统分配到老年代,而留给用户代码的 Free 区域分配到新生代,见下图。 为了证明 Flink 内存管理和序列化器的优势,Flink 官方对 Object-on-Heap (直接 Java 对象存储)、Flink-Serialized (内建序列化器 + 显式内存管理)和 Kryo-Serialized (Kryo 序列化器 + 显式内存管理)三种方案进行了 GC 表现的对比测试。 测试方法是对一千万个 Tuple2 测试在 JVM GC 上的表现如下图: 幂等写和事务写 Flinksql实现了flink的流批统一 检查点我们是保存在外部分布式文件系统中的 Spark Streaming 反压 这种限制就比较明显了,需要用户自己去预估,如果后端的处理能力远大于我们限制的数据量,那么就会造成资源的浪费。在Spark1.5以后,引入了pid的概念。 TCP的反压,是通过callback实现的,当socket发送数据去receive buffer后,receiver会反馈给send端,目前receiver端的buffer还有多少剩余空间,然后send会根据剩余空间,控制发送速率。 具体的,我可以来看下Flink的ExecutionGraph: 其中RP用来发送数据,IG用来接收数据。 先来看第一个问题,跨TM的反压。 当sink端数据处理不过来的时候,IG会不断向LocalBufferPool申请内存,导致LocalBufferPool会不断向NetWorkBufferPool申请内存。最终导致NetworkBufferPool的可用内存被申请完。 所以此时,数据就会被不断缓存在上游的netty,当上游的netty的buffer写到限制大小后,就会不能被写入。这个时候netty 的 channel.isWritable() 就会返回false。 在1.5版本以后 1.因为TM中会有多个Task运行,所以单个Task的反压会阻断整个TM的socket,而其他的task却无法向下游发送数据,连checkpoint的barrier也无法发出。 2.反压传播路径长,导致生效延迟比较大。 credit的反压是类似于TCP的反压,但它是在Flink层面上的。可以看下图: 长此以往,当credit返回0的时候,表示没有内存缓存了,那么RS接收到credit的时候,就不会继续往netty写数据了。这样socket就不会堵塞了,同时生效延迟也降低了。同时RP也会不断去探测IG是否有空余的空间。 答案当然不是,这个很大程度上取决于sink端数据存储在哪里。譬如说,数据最终落地在es中,那么由于前面数据量过大,es端直接报错,整个任务就停止了,这个时候动态反压就没有了,那么静态的限制consumer的量就很关键了,但是这个设置是在1.8版本里才有。 窗口的分类: // Stream of (userId, buyCnt)val buyCnts: DataStream[(Int, Int)] = ...val tumblingCnts: DataStream[(Int, Int)] = buyCnts // key stream by userId .keyBy(0) // tumbling time window of 1 minute length .timeWindow(Time.minutes(1)) // compute sum over buyCnt .sum(1) 滑动窗口: val slidingCnts: DataStream[(Int, Int)] = buyCnts .keyBy(0) // sliding time window of 1 minute length and 30 secs trigger interval .timeWindow(Time.minutes(1), Time.seconds(30)) .sum(1) 事件窗口:count window 滚动窗口(Tumbling Window) // Stream of (userId, buyCnts)val buyCnts: DataStream[(Int, Int)] = ...val tumblingCnts: DataStream[(Int, Int)] = buyCnts // key stream by sensorId .keyBy(0) // tumbling count window of 100 elements size .countWindow(100) // compute the buyCnt sum .sum(1) 滑动窗口(Sliding Window) val slidingCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // sliding count window of 100 elements size and 10 elements trigger interval .countWindow(100, 10) .sum(1) 滚动窗口以及滑动窗口:区别是滚动窗口没有重叠 滑动窗口有重叠 会话窗口:session window // Stream of (userId, buyCnts)val buyCnts: DataStream[(Int, Int)] = ... val sessionCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // session window based on a 30 seconds session gap interval .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) .sum(1) 窗口函数主要分为两种,一种是增量计算,如reduce和aggregate,一种是全量计算,如process。增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算。 说说并行度、slot和taskManager的关系 除了考虑实际情况,一个TM内存和core不能设置过大,那么在一个可选择的区间里,要怎么选择呢? 这下知道怎么去平衡taskmanager个数及slot个数/TM的关系了吧。
要解决反压首先要做的是定位到造成反压的节点,主要有两种方法:
Flink Web UI的反压监控提供了SubTask级别的反压监控,原理是通过周期性对Task线程的栈信息采样,得到线程被阻塞在请求Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。默认配置下,这个频率在0.1以下则为OK,0.1至0.5为Low,超过0.5则为High。
若处反压状态,可能有两种可能性:
如果一个SubTask的发送端Buffer占用率很高,则表明它被下游反压限速了;如果一个SubTask的接收端Buffer占用很高,则它表明它被反压传到至上游。
Keyed State:该类状态是基于 KeyedStream 上的状态,这个状态是根据特定的 key 绑定的,对 keyedStream 流上的每一个 key,都对应着一个 state。stream.keyBy(…).window().apply(new KeyedStateRichFunction())
Allowedlateness指定允许数据延迟时间
Sideoutputlatedate侧边流
managed state用Flink运行时控制的数据结构表示,例如内部哈希表或RocksDB。例如“ ValueState”,“ ListState”等。Flink的运行时对状态进行编码,并将其写入检查点。
原始状态是operator保留其自己的数据结构的状态。被检查点时,它们仅将字节序列写入检查点。Flink对状态的数据结构一无所知,只看到原始字节。
所有数据流功能都可以使用托管状态,但是原始状态接口只能在实现运算符时使用。建议使用托管状态(而不是原始状态),因为有了托管状态,Flink可以在更改并行性时自动重新分配状态,并且还可以进行更好的内存管理。
Keyed State被进一步组织成所谓的Key Groups。Key Groups是Flink可以重新分配Keyed State的原子单位; 与定义的最大并行度完全相同的键组数。在执行期间,Keyed运算符的每个并行实例都使用一个或多个键组的键。
window划分
window的设定无关数据本身,而是系统定义好了的。
window是flink中划分数据一个基本单位,window的划分方式是固定的,默认会根据自然时间划分window,并且划分方式是前闭后开。
水印
(1)周期性水印
AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印(可能取决于流元素,或纯粹基于处理时间)。
(2)带标点水印(特定事件出发的水印)
要在某个事件指示可能生成新水印时生成水印,请使用 AssignerWithPunctuatedWatermarks、对于这个类,Flink 将首先调用该extractTimestamp(…)方法为元素分配时间戳,然后立即调用该checkAndGetNextWatermark(…)元素上的 方法。
该checkAndGetNextWatermark(…)方法传递在方法中分配的时间戳extractTimestamp(…) ,并且可以决定是否要生成水印。每当该checkAndGetNextWatermark(…) 方法返回一个非空水印,并且该水印大于最新的先前水印时,就会发出该新水印。
flink官网参考链接
Flink内置的source或者sink如果解决不了你的问题,你可以自定义source或者sink
如:
Mysql sink
以鸢尾花数据集作为mongodb中的数据原
mongodb-driver 的pom依赖
Flink 的内存管理也并不例外,采用了显式的内存管理并用序列化方式存储对象,同时支持 on-heap 和 off-heap。在这点上 Flink 与 Spark 的MEMORY_ONLY_SER存储级别十分相似,不同点在于 Spark 仍以 on-heap 对象存储为主,而 Flink 则只支持序列化的对象存储。
Flink 内存主要指 TaskManager 运行时提供的内存资源。TaskManager 主要由几个内部组件构成: 负责和 JobManager 等进程通信的 actor 系统,负责在内存不足时将数据溢写到磁盘和读回的 IOManager,还有负责内存管理的 MemoryManager。其中 actor 系统和 MemoryManager 会要求大量的内存。相应地,Flink 将 TaskManager 的运行时 JVM heap 分为 Network Buffers、MemoryManager 和 Free 三个区域(在 streaming 模式下只存在 Network Buffers 和 Free 两个区域,因为算子不需要缓存一次读入的大量数据)。
各个区域的功能如下:
Memory Segment 是 Flink 内存管理的核心概念,是在 JVM 内存上的进一步抽象(包括 on-heap 和 off-heap),代表了 Flink Managed Memory 分配的单元。每个 Memory Segment 默认占32KB,支持存储和访问不同类型的数据,包括 long、int、byte、数组等。你可以将 Memory Segment 想象为 Flink 版本的 java.nio.ByteBuffer。
序列化的格式由 Flink 序列化器定义,并且可以描述到消息记录的单个字段,这意味着 Flink 只需要序列化目标字段而不是整个对象。这点非常重要,因为 Flink 的输入数据是以序列化方式存储的,在访问时需要反序列化,这在减小了存储空间的同时会带来一定的计算开销,所以提升序列化和反序列化(SerDe)效率可以明显提高整体性能。
Java 生态圈实际上已经有不少出色的序列化库,包括 Kryo、Apache Avro、Apache Thrift 和 Google 的 ProtoBuf,然而 Flink 毅然重新造了一套轮子以定制数据的二进制格式。这带来了三点重要的优势:其一,掌握了对序列化后的数据结构信息,使得二进制数据间的比较甚至于直接操作二进制数据成为可能;其二,Flink 依据计划要执行的操作来提前优化序列化结构,极大地提高了性能;其三,Flink 可以在作业执行之前确定对象的类型,并在序列化时利用这个信息进行优化。
其中比较特别的是GenericTypeInfo,作为后备计划的它会委托 Kyro 进行序列化。
可以看出序列化后的对象存储是非常紧凑的,POJO 首部序列化后仅占1字节的空间,String 这种不固定长度的对象也以实际长度来存储。如同上文所讲,一个对象并不要求完整地存放在一个 MemorySegment 内,而是可以跨 MemorySegment 存放,这意味着将不会有内存碎片产生。
作为简单回顾,Flink 不会将消息记录当作对象直接放到 heap 上,而是序列化后存在长期缓存对象里。这意味着将不会出现低效的短期对象,消息对象只用来在用户函数内传递和被序列化。而长期对象是 MemorySegment 本身,它们并不会被 GC 清理。
JVM 参数中控制新生代和老年代比例的参数是-XX:NewRatio,这表示了老年代空间与新生代空间之比,默认为2,即(不计meta区)新生代占 heap 的三分之一,老年代占 heap 的三分之二。
显而易见,使用显式内存管理可以显著地减少 GC 频率。在 Object-on-Heap 的测试中,GC 频繁地被触发并导致 CPU 峰值达到90%。在测试中使用的8核机器上单线程的作业最多只占用12.5%的 CPU ,机器花费在 GC 的成本显然超过了实际运行作业的成本。而在另外两个依赖显式内存管理和序列化的测试中,GC 很少被触发,CPU 使用率也一直稳定在较低的水平。
总结
Flink 在内存管理上较大程度上借鉴了 Spark 的方案,包括存储不足时的溢写机制和内存区域的划分。不过 Flink 的管理粒度更细更精确,最为典型的一点是 Spark 的序列化存储是以 RDD 的一个 Partiton 为单位,而 Flink 的序列化则是以消息记录为单位,这也体现了两者最大的区别: Spark 的哲学是将 Streaming 视作 Batch 的特例 (micro batch) ,相反地,Flink 的哲学是将 Batch 视作 Streaming 的特例。
状态算子数据是保存在内存中的
Spark Streaming的反压是从1.5版本以后引入的。在这之前,基本就是通过控制最大接受速率来控制的。譬如,如果是基于Receiver形式,可以配置spark.streaming.receiver.maxRate。限制每个receiver每秒可以接受的数据。对于Direct方式,我们可以配置spark.streaming.kafka.maxRatePerPartition,来限制每个分区每次所能接受的最大记录数,这个在追kafka数据的时候,极为重要。
它会从buffer以及各个process中拉去数据,通过processingDelay 、schedulingDelay 、当前 Batch 处理的记录条数以及处理完成事件来估算出一个速率,就是流每秒能处理的最大条数。然后反馈给source端。
Flink反压
flink的反压又分为两个阶段,一个是1.5版本之前,一个是1.5版本以后
在1.5版本以前
Flink的反压是通过TCP的反压机制来控制的
我们来看下flink的网络传输
可以看到Flink在Produce产生数据后,经过netty使用socket传输,使用的是TCP协议。而TCP自带了反压机制。
我们可以看到,上游task向下游task传输数据的时候,有ResultPartition和InputGate两个组件。
这个问题可以被看成两个问题,一个是夸TaskManager,一个是TaskManager内部。譬如说下游的sink性能出了点问题,那么inputChannel就会告知上个TM的ResultPartition。然后第二个TM中的,RP也会通知该TM中的IG。
这个是跨TM的网络传输的流程。理解过Flink内存管理的同学都会知道,Flink会向off-heap内存申请一段固定的内存来使用,作为NetWork BufferPool。然后RP和IG都会向LocalBufferPool申请内存资源,LocalBufferPool会向NetWorkBufferPool申请资源。至于Netty直接走的JVM。
当IG无法写入的时候,就不会去读netty里的数据。当netty的规定缓存被写满了以后,socket就无法往netty里写数据。这个时候,socket的缓存就很快会被用满,使用TCP的ACK机制,通知上游Socket。然后上游的Socket由于无法往外写,所以上游的Socket也很快会被上游的netty写满。
上游的RS每次往netty写入数据的时候,都会通过 netty的channel.isWritable()。来判断netty是否能被写入。当channel.isWritable()返回false的时候,就会发生阻塞。所以当RS被写满的时候,就会去向LocalBufferPool请求内存,导致LocalBufferPool会向NetworkBufferPool请求内存,如果下游处理数据的速度一直跟不上产生数据的速度,那么会最终导致,上游的NetworkBufferPool的可用内存被申请完,产生堵塞。
以上是跨TaskManager的反压过程,下面是TaskManager内部的反压过程,也就是RS到IG。其实和上面的类似。
这就是 1.5版本以前的反压机制。
引进了credit这种反压机制。先来说下TCP这种方式的弊端:
RS会向IG发送这个要发送的量,IG返回当前空余量,包含LocalBufferPool的。如果这个时候发现backlog > credit,那么LocalBufferPool就会向NetWorkPool申请内存。
问题:
有了动态反压,那么静态反压是不是就没有用了?
Flink页面上监控反压的数据来源
在源码包中,有个md文件,对获取back pressure的ratio值做了解释。
默认情况下,每50ms发送 100此探测,通过调用Thrad.getStackTrace()方法来探测,从网络堆栈请求缓存区。如果有的任务比较紧张,那么有的请求就会卡主,以此来监控任务是否存在反压。中期,0.01表示100次中有1次探测卡住了。
时间窗口:time window
滚动窗口以及滑动窗口:区别是滚动窗口没有重叠滑动窗口有重叠
滚动窗口:
taskmanager.numberOfTaskSlots
每个TM的slot个数TM个数=最大并行度
一般我们设置TM内存:0.5gslot个数/TM,core:0.5*slot个数/TM。
假设并行度为1000,
方式一:设置100个TM,每个TM设置10个slot,每个设置5g内存,5core;
方式二:设置200个TM,每个TM设置5个slot,每个设置2.5g内存,2.5core(忽略小数的问题)。
这两种方式哪个会好一些?这个问题有没有困扰你很久?
答案:没有绝对的好坏,更分散会增加TM 之间数据交换开销,更集中的话,如果对状态访问较多,会导致对磁盘压力太大。
首先更分散会增加TM 之间数据交换开销容易理解吧。
其次,为何更集中的话,如果对状态访问较多,会导致对磁盘压力太大?
如果只有50台机器的话,那么每台机器上分布方式一的2个TM,或方式二的4个TM,感觉对磁盘来说压力应该还是一致的。但实际不会这么理想。在集群上已经有很多任务的情况下,假设100个TM大概会分布在10台机器上,200个TM会分布在10-20台机器上?这时候对单机的磁盘压力差别就出来了,如果磁盘资源确实紧俏,那么就考虑增加TM的数量。