从ES中读取数据,并将数据输出到本地(本地执行ES)
pom文件mapper程序如下:job执行文件
本程序总共需要两个文件,一个是job执行文件,另一个是对读取的数据进行处理的mapper文件。
因程序是maven程序,需要有一个pom文件,pom文件如下:
pom文件<?xml version="1.0" encoding="UTF-8"?>
package com.es_hadoop_example;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.elasticsearch.hadoop.mr.EsInputFormat;import org.elasticsearch.hadoop.mr.linkedMapWritable;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class E2HJob { private static Logger LOG = LoggerFactory.getLogger(E2HJob.class); public static void main(String[] args) { try { Configuration conf = new Configuration(); //禁止speculative(推测执行)机制,该机制会启动多个相同task,使数据重复索引 conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); conf.set("es.nodes", "127.0.0.1:9200"); //ElasticSearch节点 conf.set("es.resource", "ecommerce/product"); //ElaticSearch source: Index/Type// conf.set("es.resource", "user/_doc");// conf.set("es.resource", "kibana_sample_data_ecommerce/_doc"); //无法执行成功??? conf.set("es.nodes.wan.only","true"); // 禁用网络中其他节点的自动发现.强制系统使用“es.nodes”属性,默认情况下会尝试连接到 localhost. Job job = Job.getInstance(conf, "JOBE2H"); //构建job对象 job.setJarByClass(E2HJob.class); //指定jar包运行主类 job.setInputFormatClass(EsInputFormat.class); //指定输入格式的类 job.setMapperClass(E2HMapper.class); //指定map类 job.setMapOutputKeyClass(Text.class); //指定map输出的 key的类 job.setMapOutputValueClass(linkedMapWritable.class); //指定map输出的 value的类 FileSystem fs = FileSystem.get(conf); Path outPath =new Path("D:\test\es_data"); if(fs.exists(outPath)) { fs.delete(outPath, true); } FileOutputFormat.setOutputPath(job, outPath); //指定输出路径 System.out.println(job.waitForCompletion(true));//打印执行结果,结果为true表明执行成功 } catch (Exception e) { LOG.error(e.getMessage(), e); } }}