
Complex Event Pattern Matching with CEP in Flink 1.10.1 (Java)

Date: 2023-07-05

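The base environment for this article is the one described in the earlier post "Flink 1.10.1 Java WordCount demo (nc + socket)". Building on that setup, this article adds CEP and tests complex pattern matching.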

1. Add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.10.1</version>
</dependency>

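The version here should match the Flink version in use (and the Scala suffix, _2.11 here, should match the other Flink artifacts). A mismatch can cause a variety of errors.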

2. Program code

package com.demo.cep;

import com.demo.entity.DataModel;
import com.demo.entity.ResultModel;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class FlinkCEPDemo {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the data from a file
        String inputPath = "data/testdata.txt";
        DataStream<String> inputDataSet = env.readTextFile(inputPath);

        // Convert each text line into a DataModel
        DataStream<DataModel> dataStream = inputDataSet.map(new MapFunction<String, DataModel>() {
            @Override
            public DataModel map(String s) throws Exception {
                String[] splits = s.split(",");
                return new DataModel(splits[0],
                        Float.parseFloat(splits[1]),
                        Float.parseFloat(splits[2]),
                        Float.parseFloat(splits[3]),
                        Float.parseFloat(splits[4]));
            }
        });

        Pattern<DataModel, DataModel> pattern = Pattern.<DataModel>begin("start").where(
                new SimpleCondition<DataModel>() {
                    @Override
                    public boolean filter(DataModel dataModel) throws Exception {
                        return dataModel.getPrice() > 4;
                    }
                })
                .times(1)
                .followedBy("middle").where(
                new SimpleCondition<DataModel>() {
                    @Override
                    public boolean filter(DataModel dataModel) throws Exception {
                        return dataModel.getPrice() <= 3;
                    }
                }).times(2, 4).greedy()
                .followedBy("end").where(
                new IterativeCondition<DataModel>() {
                    @Override
                    public boolean filter(DataModel dataModel, Context<DataModel> ctx) throws Exception {
                        // Price of the current event
                        double sum = dataModel.getPrice();
                        // Events that have already matched the "middle" pattern
                        Iterable<DataModel> middle = ctx.getEventsForPattern("middle");
                        for (DataModel model : middle) {
                            sum += model.getPrice();
                        }
                        // sum only demonstrates access to earlier matches;
                        // the condition itself checks just the current price
                        return dataModel.getPrice() > 4;
                    }
                });

        PatternStream<DataModel> patternStream =
                CEP.pattern(dataStream.keyBy(DataModel::getCode), pattern);
        SingleOutputStreamOperator<ResultModel> select =
                patternStream.select(new MyPatternSelectFunction());
        select.print();

        env.execute();
    }

    public static class MyPatternSelectFunction implements PatternSelectFunction<DataModel, ResultModel> {
        @Override
        public ResultModel select(Map<String, List<DataModel>> pattern) {
            DataModel startEvent = pattern.get("start").get(0);
            DataModel endEvent = pattern.get("end").get(0);
            return new ResultModel(startEvent.getCode(), startEvent.getPrice(), endEvent.getPrice());
        }
    }
}

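The program starts from an event whose price is greater than 4, then looks for 2 to 4 events whose price is at most 3, matching as many as possible (greedy), and finally looks for another event whose price is greater than 4. At that point the pattern match is complete. Note that followedBy uses relaxed contiguity, so non-matching events may occur between the matched ones.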
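To make the matching rule concrete, the following is a minimal plain-Java sketch that scans a list of prices for the same shape: one price above 4, then 2 to 4 prices of at most 3, then one price above 4. It is an illustration only and not how Flink's CEP engine works internally: the class name PatternScanSketch and the method findFirstMatch are hypothetical, the scan assumes the events are strictly adjacent, and it returns only the first match. The prices in main are taken from the test data in section 5.

```java
import java.util.Arrays;
import java.util.List;

public class PatternScanSketch {

    /** Returns {startPrice, endPrice} of the first match, or null if there is none. */
    static float[] findFirstMatch(List<Float> prices) {
        for (int i = 0; i < prices.size(); i++) {
            if (prices.get(i) <= 4) {
                continue; // "start" requires price > 4
            }
            // Greedily take up to 4 directly following prices that are <= 3 ("middle").
            int j = i + 1;
            int middleCount = 0;
            while (j < prices.size() && middleCount < 4 && prices.get(j) <= 3) {
                middleCount++;
                j++;
            }
            // "end" requires at least 2 middle events followed by a price > 4.
            if (middleCount >= 2 && j < prices.size() && prices.get(j) > 4) {
                return new float[] {prices.get(i), prices.get(j)};
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Price column of the test data, in file order.
        List<Float> prices = Arrays.asList(1f, 1f, 1f, 5f, 2f, 1f, 2f, 3f, 6f);
        float[] match = findFirstMatch(prices);
        System.out.println(match == null
                ? "no match"
                : "start=" + match[0] + ", end=" + match[1]);
    }
}
```

On these prices the sketch selects 5 as the start, the four values 2, 1, 2, 3 as the middle, and 6 as the end, which agrees with the start and end prices the Flink job reports.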

3. Helper code

package com.demo.entity;

public class DataModel {
    private String code;
    private float price;
    private float high;
    private float low;
    private float open;

    public DataModel() {
    }

    public DataModel(String code, float price, float high, float low, float open) {
        this.code = code;
        this.price = price;
        this.high = high;
        this.low = low;
        this.open = open;
    }

    public String getCode() { return code; }
    public void setCode(String code) { this.code = code; }

    public float getPrice() { return price; }
    public void setPrice(float price) { this.price = price; }

    public float getHigh() { return high; }
    public void setHigh(float high) { this.high = high; }

    public float getLow() { return low; }
    public void setLow(float low) { this.low = low; }

    public float getOpen() { return open; }
    public void setOpen(float open) { this.open = open; }

    @Override
    public String toString() {
        return "DataModel{" +
                "code='" + code + '\'' +
                ", price=" + price +
                ", high=" + high +
                ", low=" + low +
                ", open=" + open +
                '}';
    }
}

4. Result class definition

package com.demo.entity;

public class ResultModel {
    private String code;
    private float startPrice;
    private float endPrice;

    public ResultModel() {
    }

    public ResultModel(String code, float startPrice, float endPrice) {
        this.code = code;
        this.startPrice = startPrice;
        this.endPrice = endPrice;
    }

    public String getCode() { return code; }
    public void setCode(String code) { this.code = code; }

    public float getStartPrice() { return startPrice; }
    public void setStartPrice(float startPrice) { this.startPrice = startPrice; }

    public float getEndPrice() { return endPrice; }
    public void setEndPrice(float endPrice) { this.endPrice = endPrice; }

    @Override
    public String toString() {
        return "ResultModel{" +
                "code='" + code + '\'' +
                ", startPrice=" + startPrice +
                ", endPrice=" + endPrice +
                '}';
    }
}

5. Test data

data/testdata.txt

0000001,1,1,1,1
0000001,1,1,1,1
0000001,1,1,1,1
0000001,5,1,1,1
0000001,2,1,1,1
0000001,1,1,1,1
0000001,2,1,1,1
0000001,3,1,1,1
0000001,6,1,1,1
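Each record in the test file is one comma-separated line whose fields follow the order of the DataModel constructor: code, price, high, low, open. The sketch below shows that mapping in plain Java, without Flink. The class RecordParseSketch, its nested ParsedRecord type, and the field-count check are assumptions added for illustration; the program in section 2 performs the split inline and does no validation.

```java
public class RecordParseSketch {

    /** Hypothetical stand-in for DataModel, holding the five parsed fields. */
    static class ParsedRecord {
        final String code;
        final float price;
        final float high;
        final float low;
        final float open;

        ParsedRecord(String code, float price, float high, float low, float open) {
            this.code = code;
            this.price = price;
            this.high = high;
            this.low = low;
            this.open = open;
        }
    }

    /** Parses one line of the form code,price,high,low,open. */
    static ParsedRecord parse(String line) {
        String[] parts = line.trim().split(",");
        if (parts.length != 5) {
            // Extra check that the original program does not perform.
            throw new IllegalArgumentException(
                    "expected 5 fields but got " + parts.length + ": " + line);
        }
        return new ParsedRecord(
                parts[0],
                Float.parseFloat(parts[1]),
                Float.parseFloat(parts[2]),
                Float.parseFloat(parts[3]),
                Float.parseFloat(parts[4]));
    }

    public static void main(String[] args) {
        ParsedRecord r = parse("0000001,5,1,1,1");
        System.out.println("code=" + r.code + ", price=" + r.price);
    }
}
```

Only the code and price fields matter for the pattern in this article: the code is the key used by keyBy, and the price is what the three conditions test.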

6. Start the program and run the test

The output shows that a match was found. The result is built from the first and the last event of the matched pattern.

ResultModel{code='0000001', startPrice=5.0, endPrice=6.0}
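The way the result is assembled can be sketched without Flink. A select function receives a map from pattern name to the list of events matched under that name, and the program reads the first element of the "start" and "end" lists. In the sketch below, the class SelectSketch and the method summarize are hypothetical, prices stand in for full DataModel objects, and the contents of the "middle" list are an assumption about what the greedy match collects.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectSketch {

    /** Builds the result text from the first "start" and first "end" price. */
    static String summarize(String code, Map<String, List<Float>> match) {
        float startPrice = match.get("start").get(0);
        float endPrice = match.get("end").get(0);
        return "ResultModel{code='" + code + "', startPrice=" + startPrice
                + ", endPrice=" + endPrice + "}";
    }

    public static void main(String[] args) {
        Map<String, List<Float>> match = new HashMap<>();
        match.put("start", Collections.singletonList(5f));
        // Assumed middle events; they are not used in the result.
        match.put("middle", Arrays.asList(2f, 1f, 2f, 3f));
        match.put("end", Collections.singletonList(6f));

        System.out.println(summarize("0000001", match));
    }
}
```

The "middle" events take part in the match but are not read when the result is built, which is why only the prices 5.0 and 6.0 appear in the output.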
