欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

从ES中读取数据,并将数据输出到本地(本地执行)

时间:2023-06-30
从ES中读取数据,并将数据输出到本地(本地执行ES) 文章目录

从ES中读取数据,并将数据输出到本地(本地执行ES)

pom文件mapper程序如下:job执行文件
本程序总共需要两个文件,一个是job执行文件,另一个是对读取的数据进行处理的mapper文件。

因程序是maven程序,需要有一个pom文件,pom文件如下:

pom文件

<?xml version="1.0" encoding="UTF-8"?> 4.0.0 org.example ES2hadoop 1.0-SNAPSHOT huaweicloudsdk https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/ true true alimaven aliyun maven https://maven.aliyun.com/repository/public true false 1.18.14 1.8 8 8 UTF-8 UTF-8 org.apache.hadoop hadoop-client 3.1.1-hw-ei-312005 provided org.elasticsearch elasticsearch-hadoop 7.16.2 org.slf4j slf4j-api 1.7.28 org.apache.hadoop hadoop-common 3.1.1-hw-ei-312005 org.apache.hadoop hadoop-mapreduce-client-core 3.1.1-hw-ei-312005 org.apache.maven.plugins maven-shade-plugin 3.2.4 package shade com.google.code.findbugs:jsr305 org.slf4j:* log4j:* org.apache.hadoop:* *:* meta-INF @Override protected void map(Text key, linkedMapWritable value, Context context) throws IOException, InterruptedException { LOG.info("key {} value {}", key, value); context.write(key, value); //数据不做任何处理,直接输出 } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); }}

job执行文件

package com.es_hadoop_example;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.elasticsearch.hadoop.mr.EsInputFormat;import org.elasticsearch.hadoop.mr.linkedMapWritable;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class E2HJob { private static Logger LOG = LoggerFactory.getLogger(E2HJob.class); public static void main(String[] args) { try { Configuration conf = new Configuration(); //禁止speculative(推测执行)机制,该机制会启动多个相同task,使数据重复索引 conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); conf.set("es.nodes", "127.0.0.1:9200"); //ElasticSearch节点 conf.set("es.resource", "ecommerce/product"); //ElaticSearch source: Index/Type// conf.set("es.resource", "user/_doc");// conf.set("es.resource", "kibana_sample_data_ecommerce/_doc"); //无法执行成功??? conf.set("es.nodes.wan.only","true"); // 禁用网络中其他节点的自动发现.强制系统使用“es.nodes”属性,默认情况下会尝试连接到 localhost. Job job = Job.getInstance(conf, "JOBE2H"); //构建job对象 job.setJarByClass(E2HJob.class); //指定jar包运行主类 job.setInputFormatClass(EsInputFormat.class); //指定输入格式的类 job.setMapperClass(E2HMapper.class); //指定map类 job.setMapOutputKeyClass(Text.class); //指定map输出的 key的类 job.setMapOutputValueClass(linkedMapWritable.class); //指定map输出的 value的类 FileSystem fs = FileSystem.get(conf); Path outPath =new Path("D:\test\es_data"); if(fs.exists(outPath)) { fs.delete(outPath, true); } FileOutputFormat.setOutputPath(job, outPath); //指定输出路径 System.out.println(job.waitForCompletion(true));//打印执行结果,结果为true表明执行成功 } catch (Exception e) { LOG.error(e.getMessage(), e); } }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。