This article builds on the environment from "Flink 1.10.1 Java WordCount demo (nc + socket)" and adds CEP on top of it to test complex pattern matching.
1. Add the dependency
The version here should match your Flink version; a mismatch can cause all kinds of errors.
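As a rough guide, the Maven entry for the CEP library usually looks like the fragment below. The version 1.10.1 comes from the Flink version this article uses; the Scala suffix `_2.11` is an assumption (a `_2.12` build is also published), so pick the one that matches the other Flink artifacts in your pom.xml.

```xml
<!-- Sketch only: the Scala suffix (_2.11) is assumed; keep it consistent with your other Flink dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
```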
2. Program code
package com.demo.cep;

import com.demo.entity.DataModel;
import com.demo.entity.ResultModel;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class FlinkCEPDemo {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        String inputPath = "data/testdata.txt";
        DataStream
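The program starts from a record greater than 4, then looks for 2 to 4 consecutive records less than 3, matching as many as possible, and finally looks for another record greater than 4. At that point the pattern match is complete.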
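To make the rule above concrete, here is a minimal plain-Java sketch that does not use the Flink CEP API. It assumes the comparisons apply to the price field, that the 2 to 4 low prices directly follow the start record, and that unrelated records may sit between the low run and the closing record. `PatternSketch` and `firstMatch` are hypothetical names, and the loop is a simplification, not Flink's actual matching engine.

```java
import java.util.Arrays;
import java.util.List;

public class PatternSketch {

    /**
     * Returns {startPrice, endPrice} for the first match, or null if there is none.
     * Rule: one price > 4, then 2 to 4 prices < 3 (as many as possible),
     * then, possibly after skipping other prices, one price > 4.
     */
    public static float[] firstMatch(List<Float> prices) {
        for (int i = 0; i < prices.size(); i++) {
            if (prices.get(i) <= 4f) {
                continue; // not a valid start record
            }
            // Greedily take up to 4 directly following prices below 3.
            int j = i + 1;
            int lowCount = 0;
            while (j < prices.size() && lowCount < 4 && prices.get(j) < 3f) {
                lowCount++;
                j++;
            }
            if (lowCount < 2) {
                continue; // fewer than 2 low prices: try a later start record
            }
            // Assumed relaxed step: skip records until one is above 4.
            for (int k = j; k < prices.size(); k++) {
                if (prices.get(k) > 4f) {
                    return new float[] {prices.get(i), prices.get(k)};
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Prices taken from the second column of the test data in step 5.
        List<Float> prices = Arrays.asList(1f, 1f, 1f, 5f, 2f, 1f, 2f, 3f, 6f);
        float[] m = firstMatch(prices);
        System.out.println(m == null
                ? "no match"
                : "startPrice=" + m[0] + ", endPrice=" + m[1]);
    }
}
```

In Flink CEP itself, a rule of this shape is usually expressed with `Pattern.begin(...)`, `where(...)`, `times(2, 4)`, `greedy()` and `followedBy(...)`.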
3. Supporting code
package com.demo.entity;

public class DataModel {
    private String code;
    private float price;
    private float high;
    private float low;
    private float open;

    public DataModel() {
    }

    public DataModel(String code, float price, float high, float low, float open) {
        this.code = code;
        this.price = price;
        this.high = high;
        this.low = low;
        this.open = open;
    }

    public String getCode() { return code; }
    public void setCode(String code) { this.code = code; }

    public float getPrice() { return price; }
    public void setPrice(float price) { this.price = price; }

    public float getHigh() { return high; }
    public void setHigh(float high) { this.high = high; }

    public float getLow() { return low; }
    public void setLow(float low) { this.low = low; }

    public float getOpen() { return open; }
    public void setOpen(float open) { this.open = open; }

    @Override
    public String toString() {
        return "DataModel{" +
                "code='" + code + '\'' +
                ", price=" + price +
                ", high=" + high +
                ", low=" + low +
                ", open=" + open +
                '}';
    }
}
4. Result class definition
package com.demo.entity;

public class ResultModel {
    private String code;
    private float startPrice;
    private float endPrice;

    public ResultModel() {
    }

    public ResultModel(String code, float startPrice, float endPrice) {
        this.code = code;
        this.startPrice = startPrice;
        this.endPrice = endPrice;
    }

    public String getCode() { return code; }
    public void setCode(String code) { this.code = code; }

    public float getStartPrice() { return startPrice; }
    public void setStartPrice(float startPrice) { this.startPrice = startPrice; }

    public float getEndPrice() { return endPrice; }
    public void setEndPrice(float endPrice) { this.endPrice = endPrice; }

    @Override
    public String toString() {
        return "ResultModel{" +
                "code='" + code + '\'' +
                ", startPrice=" + startPrice +
                ", endPrice=" + endPrice +
                '}';
    }
}
5. Test data: data/testdata.txt
0000001,1,1,1,1
0000001,1,1,1,1
0000001,1,1,1,1
0000001,5,1,1,1
0000001,2,1,1,1
0000001,1,1,1,1
0000001,2,1,1,1
0000001,3,1,1,1
0000001,6,1,1,1
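Each record in the test data is one comma-separated line. A minimal sketch of turning such a line into an object is shown below. It assumes the field order follows the DataModel constructor (code, price, high, low, open); `ParseSketch` and `Tick` are hypothetical stand-ins so that the example runs without the project classes or Flink.

```java
public class ParseSketch {

    /** Hypothetical stand-in for com.demo.entity.DataModel. */
    public static class Tick {
        public final String code;
        public final float price;
        public final float high;
        public final float low;
        public final float open;

        public Tick(String code, float price, float high, float low, float open) {
            this.code = code;
            this.price = price;
            this.high = high;
            this.low = low;
            this.open = open;
        }
    }

    /** Parses one line such as "0000001,5,1,1,1" (assumed order: code, price, high, low, open). */
    public static Tick parse(String line) {
        String[] fields = line.trim().split(",");
        if (fields.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but got " + fields.length);
        }
        return new Tick(
                fields[0].trim(), // kept as a string so leading zeros survive
                Float.parseFloat(fields[1].trim()),
                Float.parseFloat(fields[2].trim()),
                Float.parseFloat(fields[3].trim()),
                Float.parseFloat(fields[4].trim()));
    }

    public static void main(String[] args) {
        Tick t = parse("0000001,5,1,1,1");
        System.out.println(t.code + " price=" + t.price);
    }
}
```

In a Flink job the same parsing logic would typically sit inside a `MapFunction` applied to the stream of input lines.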
6. Start the program and run the test
You can see that a match was found. The result combines the first and the last record of the matched pattern.
ResultModel{code='0000001', startPrice=5.0, endPrice=6.0}