欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

flinkelasticsearchsink

时间:2023-05-04

flink elasticsearch sink 目前我这边电脑资源不够耍,写入虚拟机很慢数据有差异.
1.data 数据

sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718207,36.3sensor_1,1547718209,32.8sensor_1,1547718212,37.1sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718207,36.3sensor_1,1547718209,32.8sensor_1,1547718212,37.1

2.pom.xml

org.apache.flink flink-connector-elasticsearch6_2.11 1.10.1

3.java代码

public class sink_es { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource txtSink = env.readTextFile("D:\ideaProject\flink-java\flink-java-api\src\main\resources\data.txt"); DataStream streamOperator = txtSink.map(new MapFunction() { public SensorReading map(String line) throws Exception { String[] split = line.split(","); return new SensorReading(split[0], new Long(split[1]), new Double(split[2])); } }); streamOperator.print(); //定义 httphost ArrayList list = new ArrayList(); list.add(new HttpHost("192.168.174.204",9200)); list.add(new HttpHost("192.168.174.205",9200)); ElasticsearchSink readingElasticsearchSink = new ElasticsearchSink.Builder(list, new ElasticsearchSinkFunction() { public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { //定义写入的数据source HashMap map = new HashMap(); map.put("id", sensorReading.getId()); map.put("temp", sensorReading.getTemperature().toString()); map.put("ts", sensorReading.getTimestamp().toString()); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest().index("book").type("serson").source(map); //用 requestIndexer 发送最后的请求 requestIndexer.add(indexRequest); } }).build(); streamOperator.addSink(readingElasticsearchSink); env.execute(); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。