欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flinkcep基础知识以及相关api说明

时间:2023-07-12

Flink CEP用于处理复杂事件模式匹配,整个CEP知识和体系还是比较复杂的,这里主要对一些常用的模式进行整理,为了简化内容,对于很少用的模式及api就省略了。

1、模式

模式API可以从输入流中提取的复杂模式序列。

注意每个模式必须具有唯一的名称,以便后续可以使用该名称来标识匹配的事件。

1.1 单个模式

单个模式接受单个事件,而循环模式可以接受多个事件。

Pattern pattern = Pattern.begin("start").where( new SimpleCondition() { @Override public boolean filter(Event event) { return event.getId() == 42; } } );

只针对单个事件进行判断。 比如上述代码只要id为42,则表示符合名为start的模式。

1.1.1 循环模式

循环模式针对多个事件进行判断,实现方式有多种。

对于名为start的模式,以下是有效的数量关系:

// 匹配4次 start.times(4); // 匹配0或4次 start.times(4).optional(); // 匹配 2, 3 或 4 次 start.times(2, 4); // 匹配2, 3 or 4 次,尽可能多地匹配 start.times(2, 4).greedy(); // 匹配0,2, 3 or 4 次 start.times(2, 4).optional(); // 匹配0,2, 3 or 4 次,尽可能多地匹配 start.times(2, 4).optional().greedy(); // 匹配 1 或多次 start.oneOrMore(); // 匹配 1 或多次,尽可能多地匹配 start.oneOrMore().greedy(); // 匹配 0 或多次 start.oneOrMore().optional(); // 匹配 0 或多次,尽可能多地匹配 start.oneOrMore().optional().greedy(); // 匹配2 或更多次 start.timesOrMore(2); // 匹配2 或更多次 尽可能多地匹配 start.timesOrMore(2).greedy(); // 匹配0, 2 或更多次 尽可能多地匹配 start.timesOrMore(2).optional().greedy();

1.1.2 匹配条件

主要通过where指定条件。

.where() // 条件相连为and.or() // 条件相连为or// 终止条件.until() // 当使用了oneOrMore或者oneOrMore.optional时需要进行终止,以便清楚状态

简单条件

start.where(new SimpleCondition() { @Override public boolean filter(Event value) { return value.getName().startsWith("foo"); }});

迭代条件

middle.oneOrMore().where(new IterativeCondition() { @Override public boolean filter(SubEvent value, Context ctx) throws Exception { double sum = value.getPrice(); for (Event event : ctx.getEventsForPattern("middle")) { sum += event.getPrice(); } return Double.compare(sum, 5.0) < 0; }});

组合条件

pattern.where(new SimpleCondition() { @Override public boolean filter(Event value) { return ..、// some condition }}).or(new SimpleCondition() { @Override public boolean filter(Event value) { return ..、// or condition }});

停止条件

在循环模式(oneOrMore()和oneOrMore().optional())的情况下,还可以指定停止条件,此时使用until进行指定。

1.1.3 Pattern API where(condition),定义当前模式的条件。 为了匹配模式,事件必须满足条件。 多个连续的where(),其关系为AND。or(condition),添加与现有条件进行OR运算的新条件。 只有在至少通过其中一个条件时,事件才能匹配该模式:

until(condition),指定循环模式的停止条件。 意味着如果匹配给定条件的事件发生,则不再接受该模式中的事件。仅适用于oneOrMore()

timesOrMore(#times),指定此模式至少需要#times次出现匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。

times(#ofTimes),指定此模式需要匹配事件的确切出现次数。默认情况下,使用宽松的内部连续性(在后续事件之间)。

times(#fromTimes, #toTimes),指定此模式期望在匹配事件的#fromTimes次和#toTimes次之间出现。默认情况下,使用宽松的内部连续性。

greedy(),指定此模式是贪婪的,即它将尽可能多地重复。 这仅适用于quantifiers,目前不支持组模式。

consecutive(),与oneOrMore()和times()一起使用并在匹配事件之间强加严格的连续性,即任何不匹配的元素都会中断匹配。如果不使用,则使用宽松的连续性(如followBy())。

1.2 组合模式

多个个体模式组合起来就形成了一个组合模式。

严格连续性:希望所有匹配事件一个接一个地出现,中间没有任何不匹配的事件。宽松连续性:忽略匹配的事件之间出现的不匹配事件。 不能忽略两个事件之间的匹配事件。非确定性轻松连续性:进一步放宽连续性,允许忽略某些匹配事件的其他匹配。

对多个事件组成规则严格性的宽容度,使用近邻。

严格近邻:所有事件严格按照顺序进行,中间没有任何不匹配的事件。使用next()指定。宽松近邻:允许中间出现不匹配事件。使用followedBy()指定。非确定宽松近邻:一个匹配的事件能够再次使用。使用followedByAny()指定。

对于循环模式(例如oneOrMore()和times()),默认是宽松的连续性。 如果你想要严格的连续性,你必须使用consecutive()显式指定它。

1.2.1 简介

模式序列必须以初始模式开始。

Pattern start = Pattern.begin("start");

接下来,您可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。 

// 严格连续性Pattern strict = start.next("middle").where(...); // 宽松连续性Pattern relaxed = start.followedBy("middle").where(...); // 非确定性轻松连续性Pattern nonDetermin = start.followedByAny("middle").where(...); // NOT pattern with strict contiguityPattern strictNot = start.notNext("not").where(...); // NOT pattern with relaxed contiguityPattern relaxedNot = start.notFollowedBy("not").where(...);

notNext() 如果不希望一个事件类型紧接着另一个类型出现。

注意 模式序列不能以notFollowedBy()结束。

注意 not模式前面不能有可选模式。

1.2.2 相关api

begin(#name) 或者 begin(#pattern_sequence)

定义一个开始模式

Pattern start = Pattern.begin("start");

next(#name) 或者 next(#pattern_sequence)

追加一个新的模式。匹配事件必须直接跟着先前的匹配事件(严格连续性):

Pattern next = start.next("middle");

followedBy(#name) 或者 followedBy(#pattern_sequence)

追加加新模式。 匹配事件和先前匹配事件(宽松连续)之间可能发生其他非匹配事件:

Pattern followedBy = start.followedBy("middle");

notNext()

添加新的否定模式。 匹配(否定)事件必须直接跟着先前的匹配事件(严格连续性)才能丢弃部分匹配:

Pattern notNext = start.notNext("not");

 notFollowedBy()

追加一个新的否定模式匹配。即使在匹配(否定)事件和先前匹配事件(宽松连续性)之间发生其他事件,也将丢弃部分匹配事件序列:

Pattern notFollowedBy = start.notFollowedBy("not");

within(time)

定义事件序列进行模式匹配的最大时间间隔。 如果未完成的事件序列超过此时间,则将其丢弃:

pattern.within(Time.seconds(10));

1.2.3 匹配后的跳过策略

对于给定模式,可以将同一事件分配给多个成功匹配。 要控制将分配事件的匹配数,需要指定名为AfterMatchSkipStrategy的跳过策略。

NO_SKIP:将发出每个可能的匹配。

SKIP_PAST_LAST_EVENT:丢弃包含匹配事件的每个部分匹配。

SKIP_TO_FIRST:丢弃包含PatternName第一个之前匹配事件的每个部分匹配。

SKIP_TO_LAST:丢弃包含PatternName最后一个匹配事件之前的每个部分匹配。

AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();Pattern.begin("patternName", strategy );

1.3 检测模式-Detecting Patterns

指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。 

PatternStream patternStream = CEP.pattern(dataStream.keyBy(DataModel::getCode), pattern);

1.4 读取匹配数据

获得PatternStream后,您可以通过select或flatSelect方法从检测到的事件序列中进行查询。

SingleOutputStreamOperator select = patternStream.select(new MyPatternSelectFunction());select.print();public static class MyPatternSelectFunction implements PatternSelectFunction { @Override public com.demo.entity.ResultModel select(Map> pattern) { com.demo.entity.DataModel startEvent = pattern.get("start").get(0); com.demo.entity.DataModel endEvent = pattern.get("end").get(0); return new ResultModel(startEvent.getCode(), startEvent.getPrice(), endEvent.getPrice()); } }

1.5 处理超时部分模式

每当模式具有通过within关键字附加的时间窗口长度时,部分事件序列可能因为超出时间窗口长度而被丢弃。 为了对这些超时的部分匹配作出相应的处理,select和flatSelect API调用允许指定超时处理程序。

PatternStreamPatte patternStream = CEP.pattern(input, pattern); OutputTag outputTag = new OutputTag("side-output"){}; SingleOutputStreamOperator result = patternStream.select( new PatternTimeoutFunction() {...}, outputTag, new PatternSelectFunction() {...}); // 获取超时数据DataStream timeoutResult = result.getSideOutput(outputTag); // flatSelect方式SingleOutputStreamOperator flatResult = patternStream.flatSelect( new PatternFlatTimeoutFunction() {...}, outputTag, new PatternFlatSelectFunction() {...}); DataStream timeoutFlatResult = flatResult.getSideOutput(outputTag);

1.6 事件事件模式下处理滞后数据

在CEP中,元素处理的顺序很重要。为了保证在采用事件事件时以正确的顺序处理事件,最初将传入的事件放入缓冲区,其中事件基于它们的时间戳以升序排序,

请注意,在采用事件时间时,CEP library会假设watermark是正确的。

为了保证跨watermark的记录按照事件时间顺序处理,Flink的CEP库假定watermark是正确的,并将时间戳小于上次可见watermark的时间视为滞后事件。滞后事件不会被进一步处理。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。