欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flink之State状态编程

时间:2023-06-19
文章目录

1.State分类2.算子状态(Operator State)

2.1 算子状态的数据结构(non-keyed state) 3.键控状态(keyed State)

3.1 键控状态数据结构 4.状态后端(state backends)5.状态编程 1.State分类


State[ValueState、ReadOnlyBroadcastState、MapState、AppendingState]

AppendingState[FoldingState、MergingState]

MergingState[ListState、AggregatingState、ReducingState]

在flink中,状态始终与特定算子相关联,像reduce、sum等算子都是默认带状态的,而map、flatmap本身时不带状态的,如果需要用到状态,可以自定义

为了使运行的flink了解算子的状态,算子需要预先注册其状态

算子状态(Operator State): 算子状态的作用范围限定为算子任务键控状态(keyed State):生产中应用案例较多, 根据输入数据流中定义的key来维护和访问

不管是哪种类型的State,都有2种不同的状态,raw(原生状态)、managed(Flink设定好的状态)

managed状态由Flink-runtime控制,类似于RocksDB、HashTable、Fs;类似于ValueState、ListState,Flink-runtime能将状态进行特定的编码,然后写入到检查点,所有的算子都能使用managed-stateraw状态而是将state维护在自己的数据结构,当checkpoint的时候,只会将state以序列化的形式写进checkpoint,flink只能看到原生的字节,而对state的数据结构一无所知 2.算子状态(Operator State) 2.1 算子状态的数据结构(non-keyed state)

列表状态(list state)
联合列表状态(union list state)
广播状态(broadast)

3.键控状态(keyed State) 3.1 键控状态数据结构

值状态(value state)
列表状态 list state
映射状态 map state
聚合状态(reducing state & aggregating state)

4.状态后端(state backends)

MemoryStateBackend(一般用于测试环境)

FsStateBackend(将checkpoint存在文件系统中,本地状态还是存在taskmanager本地内存中,不适合超大状态的存储)

RocksDBStateBackend(将所有状态序列化后,存入本地RocksDB(kv存储介质)中存储)

5.状态编程

package com.shufang.flink.stateimport com.shufang.flink.bean.SensorReadingimport org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.KeyedProcessFunctionimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.util.Collectorobject StateDemo { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(300) // env.setStateBackend(new MemoryStateBackend()) val sensorStream: DataStream[SensorReading] = env.socketTextStream("localhost", 9999) .map(a => { val strings: Array[String] = a.split(",") SensorReading(strings(0), strings(1).trim.toLong, strings(2).trim.toDouble) }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) { override def extractTimestamp(element: SensorReading): Long = { element.timeStamp } }) val processedStream: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id) .process(new MyProcessFunction01) val processedStream02: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id) .flatMap(new MyFlatMapFunction) val processStream03: DataStream[(String, Double, Double)] = sensorStream.keyBy(_.id) .flatMapWithState[(String, Double, Double), Double] { //如果状态为空,那么只更新state为当前temp case (sensor: SensorReading, None) => (List.empty, Some(sensor.temperture)) //实际上,这里使用Option来维持状态的,没有状态保存而获取的话,就相当于getorelse(0) case (sensor: SensorReading, pretemp: Some[Double]) => val lastTemp: Double = pretemp.get val diff: Double = (sensor.temperture - lastTemp).abs if (diff > 10) { (Seq((sensor.id, lastTemp, sensor.temperture)), Some(sensor.temperture)) } else { (List(), Some(sensor.temperture)) } } processStream03.print("flatMapWith-result")// processedStream02.print("报警信息-温度波动过大") sensorStream.print("输入数据") env.execute("state") }}class MyFlatMapFunction extends RichFlatMapFunction[SensorReading, (String, Double, Double)] { private var preTemp: ValueState[Double] = _ //利用open函数的特性,在初始化的时候就执行 override def open(parameters: Configuration): Unit = { preTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) } override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { val lastTemp: Double = preTemp.value() if ((value.temperture - lastTemp).abs > 10) { out.collect((value.id, lastTemp, value.temperture)) } preTemp.update(value.temperture) }}class MyProcessFunction01 extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { //声明State lazy val pretemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("pretemp", classOf[Double])) override def processElement( value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = { //调用state val lastTemp: Double = pretemp.value() val currentTemp: Double = value.temperture if ((currentTemp - lastTemp).abs > 10) { out.collect((value.id, lastTemp, currentTemp)) } //更新state pretemp.update(currentTemp) }}

state设置ttl

// 设置ttl的配置val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build //声明状态描述器 val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])//state.enableTimeToLive(ttl配置) stateDescriptor.enableTimeToLive(ttlConfig)

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。