
[Hadoop learning project] 9. Multiple MapReduce jobs with dependencies

Date: 2023-06-18
0. Project structure


Data (columns: row id, company name, total revenue, total expense):

1 apple 1520 100
2 apple 3421 254
3 apple 4500 364
1 huawei 3700 254
2 huawei 2700 354
3 huawei 5700 554
1 xiaomi 3521 254
2 xiaomi 3123 354
3 xiaomi 3412 554

Goal: compute each company's total profit, i.e. total revenue minus total expense.

1. company

DoubleMr.java

package hadoop_test.mutil_mr_10.company;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Composite record: company name plus profit. Implementing
// WritableComparable<DoubleMr> lets Hadoop both serialize it and
// sort by it during the shuffle.
public class DoubleMr implements WritableComparable<DoubleMr> {

    private String name;
    private int profit;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(profit);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.profit = in.readInt();
    }

    @Override
    public int compareTo(DoubleMr o) {
        // Ascending by profit; Integer.compare avoids the overflow that
        // plain subtraction could hit on extreme values.
        return Integer.compare(this.profit, o.profit);
    }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    public int getProfit() { return profit; }

    public void setProfit(int profit) { this.profit = profit; }

    @Override
    public String toString() {
        return name + " " + profit;
    }
}
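Hadoop calls write() and readFields() on DoubleMr whenever it moves records between the map and reduce phases, so the two methods must mirror each other exactly. A quick way to convince yourself is a round trip through a byte buffer; the class below is a hypothetical helper, not part of the original project:

package hadoop_test.mutil_mr_10.company;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Hypothetical check: serialize a DoubleMr with write(), read it back
// with readFields(), and confirm the copy matches the original.
public class DoubleMrRoundTripTest {
    public static void main(String[] args) throws Exception {
        DoubleMr original = new DoubleMr();
        original.setName("apple");
        original.setProfit(8723);

        // Serialize with write().
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields().
        DoubleMr copy = new DoubleMr();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);                     // expected: apple 8723
        System.out.println(original.compareTo(copy)); // expected: 0
    }
}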

2. mr1: FirstMrDriver

package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FirstMrDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FirstMrDriver.class);
        job.setMapperClass(FirstMrMapper.class);
        job.setReducerClass(FirstMrReducer.class);

        // Map output: company name -> DoubleMr record.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleMr.class);
        // Reduce output: the aggregated DoubleMr as key, no value.
        job.setOutputKeyClass(DoubleMr.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/hadoop_test/muti_mr/mutil_mr.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/muti_mr/result"));
        job.waitForCompletion(true);
    }
}

FirstMrMapper

package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FirstMrMapper extends Mapper<LongWritable, Text, Text, DoubleMr> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] data = line.split(" ");
        DoubleMr mr = new DoubleMr();
        // Company name (field 1) becomes the output key.
        mr.setName(data[1]);
        // Profit = revenue (field 2) - expense (field 3) becomes the output value.
        mr.setProfit(Integer.parseInt(data[2]) - Integer.parseInt(data[3]));
        context.write(new Text(mr.getName()), mr);
    }
}
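Nothing Hadoop-specific happens in the parsing itself, so it can be checked in isolation. A hypothetical standalone snippet (MapLineCheck is an invented name) applying the same split to the first sample row:

// Hypothetical check of the mapper's field layout: data[0] is the row id,
// data[1] the company, data[2] the revenue, data[3] the expense.
public class MapLineCheck {
    public static void main(String[] args) {
        String line = "1 apple 1520 100";
        String[] data = line.split(" ");
        int profit = Integer.parseInt(data[2]) - Integer.parseInt(data[3]);
        System.out.println(data[1] + " " + profit); // expected: apple 1420
    }
}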

FirstMrReducer

package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FirstMrReducer extends Reducer<Text, DoubleMr, DoubleMr, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<DoubleMr> values, Context context)
            throws IOException, InterruptedException {
        // Sum the per-record profits for one company.
        DoubleMr tmp = new DoubleMr();
        tmp.setName(key.toString());
        for (DoubleMr mr : values) {
            tmp.setProfit(tmp.getProfit() + mr.getProfit());
        }
        context.write(tmp, NullWritable.get());
    }
}
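For one key the reducer is just a running sum. Worked by hand on the "apple" group (a hypothetical check, not part of the project):

import java.util.Arrays;
import java.util.List;

// Hypothetical check of the reduce logic for the key "apple": the three
// per-record profits from the sample input are accumulated exactly as
// the reducer adds mr.getProfit() into tmp.
public class ReduceCheck {
    public static void main(String[] args) {
        List<Integer> profits = Arrays.asList(1520 - 100, 3421 - 254, 4500 - 364);
        int total = 0;
        for (int p : profits) {
            total += p;
        }
        System.out.println("apple " + total); // expected: apple 8723
    }
}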

3. mr2: SecondMrDriver

package hadoop_test.mutil_mr_10.mr2;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondMrDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SecondMrDriver.class);
        job.setMapperClass(SecondMrMapper.class);

        // No custom reducer: the default identity reducer passes keys
        // through, so the shuffle's sort on DoubleMr orders the output.
        job.setMapOutputKeyClass(DoubleMr.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(DoubleMr.class);
        job.setOutputValueClass(NullWritable.class);

        // Input is the directory written by mr1.
        FileInputFormat.setInputPaths(job, new Path("/hadoop_test/muti_mr/result"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/muti_mr/result01"));
        job.waitForCompletion(true);
    }
}

mr2's input is the result directory produced by mr1 (its own output goes to result01), so mr2 can start only after mr1 has finished writing its results. One way to encode that dependency in code, instead of launching the two drivers by hand in order, is sketched below.
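Hadoop ships a JobControl / ControlledJob API for exactly this pattern. The following is a minimal, hypothetical ChainDriver (the class name and job-name strings are invented; paths and job settings mirror the two drivers above): job2 is declared to depend on job1, and JobControl starts it only once job1 has succeeded.

package hadoop_test.mutil_mr_10;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import hadoop_test.mutil_mr_10.mr1.FirstMrMapper;
import hadoop_test.mutil_mr_10.mr1.FirstMrReducer;
import hadoop_test.mutil_mr_10.mr2.SecondMrMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();

        // Job 1: aggregate profit per company (same settings as FirstMrDriver).
        Job job1 = Job.getInstance(conf, "profit_sum");
        job1.setJarByClass(ChainDriver.class);
        job1.setMapperClass(FirstMrMapper.class);
        job1.setReducerClass(FirstMrReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(DoubleMr.class);
        job1.setOutputKeyClass(DoubleMr.class);
        job1.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job1, new Path("/hadoop_test/muti_mr/mutil_mr.txt"));
        FileOutputFormat.setOutputPath(job1, new Path("/hadoop_test/muti_mr/result"));

        // Job 2: global sort by profit (same settings as SecondMrDriver).
        Job job2 = Job.getInstance(conf, "profit_sort");
        job2.setJarByClass(ChainDriver.class);
        job2.setMapperClass(SecondMrMapper.class);
        job2.setMapOutputKeyClass(DoubleMr.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(DoubleMr.class);
        job2.setOutputValueClass(NullWritable.class);
        job2.setNumReduceTasks(1); // one reducer => one globally sorted file
        FileInputFormat.setInputPaths(job2, new Path("/hadoop_test/muti_mr/result"));
        FileOutputFormat.setOutputPath(job2, new Path("/hadoop_test/muti_mr/result01"));

        // Wrap both jobs and declare that job2 depends on job1.
        ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
        cj1.setJob(job1);
        ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
        cj2.setJob(job2);
        cj2.addDependingJob(cj1); // cj2 is held back until cj1 succeeds

        JobControl control = new JobControl("profit_chain");
        control.addJob(cj1);
        control.addJob(cj2);

        // JobControl is a Runnable that keeps polling job states; run it
        // on a background thread and wait until both jobs have finished.
        Thread t = new Thread(control);
        t.setDaemon(true);
        t.start();
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();
    }
}

If the extra API feels heavy, the same sequential behavior can be had by calling job1.waitForCompletion(true) and, only if it returns true, job2.waitForCompletion(true) in a single main().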

SecondMrMapper

package hadoop_test.mutil_mr_10.mr2;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SecondMrMapper extends Mapper<LongWritable, Text, DoubleMr, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] data = line.split(" ");
        DoubleMr mr = new DoubleMr();
        mr.setName(data[0]);
        mr.setProfit(Integer.parseInt(data[1]));
        // Sorting on the mapper side would only be local to that mapper's
        // split; for a global order, emit DoubleMr itself as the key so
        // the shuffle sorts by profit.
        // context.write(1, mr);
        context.write(mr, NullWritable.get());
    }
}
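Because the shuffle sorts map output keys with DoubleMr.compareTo(), which orders ascending by profit, the reducer receives the companies from smallest to largest total. A hypothetical standalone check of that ordering (SortOrderCheck is an invented name), using totals derived by hand from the sample input:

package hadoop_test.mutil_mr_10.company;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the original project: sorts the three
// aggregated records with DoubleMr.compareTo(), mimicking the order the
// shuffle hands them to mr2's reduce phase.
public class SortOrderCheck {
    private static DoubleMr record(String name, int profit) {
        DoubleMr mr = new DoubleMr();
        mr.setName(name);
        mr.setProfit(profit);
        return mr;
    }

    public static void main(String[] args) {
        List<DoubleMr> totals = new ArrayList<>();
        totals.add(record("apple", 8723));   // totals worked out by hand
        totals.add(record("huawei", 10938)); // from the sample input
        totals.add(record("xiaomi", 8894));
        Collections.sort(totals);
        for (DoubleMr mr : totals) {
            System.out.println(mr); // apple 8723, xiaomi 8894, huawei 10938
        }
    }
}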

Output

mr1 output (the screenshot in the original is omitted); worked by hand from the sample data it should be one total per company, in key order:

apple 8723
huawei 10938
xiaomi 8894

After mr2 (screenshot likewise omitted), the same records sorted ascending by profit:

apple 8723
xiaomi 8894
huawei 10938
