欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

MapReduce经典习题练习

时间:2023-06-17
以下是各题实现过程所需的Java代码。 1、在HDFS目录/tmp/input/wordcount中有一系列文件,内容均以","号分隔,求按","号分隔的各个元素的出现频率,输出到HDFS目录/tmp/个人用户名中。

实现代码:

package com.tledu.hadoop.mr.homework;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/**
 * Exercise 1: counts how often each comma-separated element occurs in every
 * {@code .txt} file found (recursively) under the input directory.
 *
 * <p>Usage: {@code HomeWork1 <input dir> <output dir>}
 */
public class HomeWork1 {

    /** Emits {@code (token, 1)} for every comma-separated token of an input line. */
    public static class HomeWork1Mapper extends Mapper<Object, Text, Text, IntWritable> {
        // Writables are reused across records to avoid one allocation per token.
        final Text word = new Text();
        final IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // StringTokenizer never yields empty tokens, so "a,,b" produces only "a" and "b".
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Sums the counts received for one token. Summation is associative, which is
     * why {@link #main} also registers this class as the combiner.
     */
    public static class HomeWork1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        final IntWritable sumRes = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            sumRes.set(sum);
            context.write(key, sumRes);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure
     * (2 when the arguments are missing).
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} the output directory
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 2) {
            System.err.println("Usage: HomeWork1 <input dir> <output dir>");
            System.exit(2);
            return;
        }

        // 1. Create the configuration.
        Configuration conf = new Configuration();
        // 2. Create the job.
        Job job = Job.getInstance(conf, "homework1");
        // 3. Register the job classes.
        job.setJarByClass(HomeWork1.class);
        job.setMapperClass(HomeWork1Mapper.class);
        job.setCombinerClass(HomeWork1Reducer.class);
        job.setReducerClass(HomeWork1Reducer.class);
        // 4. Output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 5. Input paths (every .txt file under args[0]) and the output path.
        for (String filePath : listTxtFiles(new Path(args[0]), conf)) {
            FileInputFormat.addInputPath(job, new Path(filePath));
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Run the job and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Recursively lists the {@code .txt} files below a directory, using a
     * freshly created default {@link Configuration}.
     *
     * @param folderPath directory to scan
     * @return the string form of each matching file's path; empty when there are none
     * @throws IOException if the directory cannot be listed
     */
    public static List<String> getTxtFileListFromPath(String folderPath) throws IOException {
        return listTxtFiles(new Path(folderPath), new Configuration());
    }

    /** Same as {@link #getTxtFileListFromPath(String)} but with a caller-supplied configuration. */
    private static List<String> listTxtFiles(Path root, Configuration conf) throws IOException {
        // Resolve the file system from the path itself so that fully qualified
        // paths on a non-default file system can be listed as well.
        FileSystem fs = root.getFileSystem(conf);
        List<String> txtFiles = new ArrayList<>();
        collectTxtFiles(fs, root, txtFiles);
        return txtFiles;
    }

    /** Appends every {@code .txt} file below {@code dir} to {@code out}, descending into sub-directories. */
    private static void collectTxtFiles(FileSystem fs, Path dir, List<String> out)
            throws IOException {
        for (FileStatus status : fs.listStatus(dir)) {
            Path entry = status.getPath();
            if (status.isFile() && entry.getName().endsWith(".txt")) {
                out.add(entry.toString());
            } else if (status.isDirectory()) {
                collectTxtFiles(fs, entry, out);
            }
        }
    }
}

2、在HDFS目录/tmp/input/wordcount中有一系列文件,内容以","号分隔,分隔后的元素为数值、字母或中文,求所有出现的数值的平均值。

实现代码:

package com.tledu.hadoop.mr.homework;

import com.tledu.hadoop.utils.RegUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/**
 * Exercise 2: computes the average of all numeric comma-separated elements in
 * every {@code .txt} file found (recursively) under the input directory.
 *
 * <p>Usage: {@code HomeWork3 <input dir> <output dir>}
 */
public class HomeWork3 {

    /**
     * Emits every numeric token under the single shared key {@code "sum="}, so that
     * all numbers arrive in one reduce call.
     */
    public static class HomeWork3Mapper extends Mapper<Object, Text, Text, DoubleWritable> {
        // Writables are reused across records to avoid one allocation per token.
        final Text word = new Text("sum=");
        final DoubleWritable number = new DoubleWritable();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            while (tokens.hasMoreTokens()) {
                String text = tokens.nextToken();
                // NOTE(review): assumes RegUtils.isNumber only accepts strings that
                // Double.parseDouble can parse - confirm against RegUtils.
                if (RegUtils.isNumber(text)) {
                    number.set(Double.parseDouble(text));
                    context.write(word, number);
                }
            }
        }
    }

    /** Writes the arithmetic mean of all values received for a key. */
    public static class HomeWork3Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        final DoubleWritable sumRes = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            long count = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
                count++;
            }
            sumRes.set(sum / count);
            context.write(key, sumRes);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure
     * (2 when the arguments are missing).
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} the output directory
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 2) {
            System.err.println("Usage: HomeWork3 <input dir> <output dir>");
            System.exit(2);
            return;
        }

        // 1. Create the configuration.
        Configuration conf = new Configuration();
        // 2. Create the job.
        Job job = Job.getInstance(conf, "homework2");
        // 3. Register the job classes. The reducer must NOT be used as a combiner:
        //    an average of partial averages is not the overall average.
        job.setJarByClass(HomeWork3.class);
        job.setMapperClass(HomeWork3Mapper.class);
        job.setReducerClass(HomeWork3Reducer.class);
        // 4. Output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 5. Input paths (every .txt file under args[0]) and the output path.
        for (String filePath : listTxtFiles(new Path(args[0]), conf)) {
            FileInputFormat.addInputPath(job, new Path(filePath));
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Run the job and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Recursively lists the {@code .txt} files below a directory, using a
     * freshly created default {@link Configuration}.
     *
     * @param folderPath directory to scan
     * @return the string form of each matching file's path; empty when there are none
     * @throws IOException if the directory cannot be listed
     */
    public static List<String> getTxtFileListFromPath(String folderPath) throws IOException {
        return listTxtFiles(new Path(folderPath), new Configuration());
    }

    /** Same as {@link #getTxtFileListFromPath(String)} but with a caller-supplied configuration. */
    private static List<String> listTxtFiles(Path root, Configuration conf) throws IOException {
        // Resolve the file system from the path itself so that fully qualified
        // paths on a non-default file system can be listed as well.
        FileSystem fs = root.getFileSystem(conf);
        List<String> txtFiles = new ArrayList<>();
        collectTxtFiles(fs, root, txtFiles);
        return txtFiles;
    }

    /** Appends every {@code .txt} file below {@code dir} to {@code out}, descending into sub-directories. */
    private static void collectTxtFiles(FileSystem fs, Path dir, List<String> out)
            throws IOException {
        for (FileStatus status : fs.listStatus(dir)) {
            Path entry = status.getPath();
            if (status.isFile() && entry.getName().endsWith(".txt")) {
                out.add(entry.toString());
            } else if (status.isDirectory()) {
                collectTxtFiles(fs, entry, out);
            }
        }
    }
}

3、在HDFS目录/tmp/input/wordcount中有一系列文件,内容以","号分隔;同时HDFS路径/tmp/black.txt是一个黑名单文件,一行一个单词,用于存放不计入统计的单词列表。求按","号分隔的各个元素去除掉黑名单中的单词后的出现频率,输出到HDFS目录/tmp/output/个人用户名中。

实现代码:

package com.tledu.hadoop.mr.homework;

import com.tledu.hadoop.utils.RegUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/**
 * Exercise 3: counts how often each comma-separated element occurs, leaving out
 * every word listed (one per line) in a blacklist file.
 *
 * <p>Usage: {@code HomeWork5 [generic options] <input dir> <output dir> [blacklist file]}
 *
 * <p>A file is treated as a blacklist when its name contains {@code "blacklist"}
 * or, if the optional third argument is given, when its name equals the name of
 * that file. The third argument is also added as a job input, so the blacklist
 * may live outside the input directory.
 */
public class HomeWork5 {

    /** Configuration key carrying the file name of an explicitly supplied blacklist. */
    private static final String BLACKLIST_NAME_KEY = "homework5.blacklist.name";

    /** File-name fragment that always marks an input file as a blacklist. */
    private static final String DEFAULT_BLACKLIST_MARKER = "blacklist";

    /**
     * Emits {@code (token, 1)} for data files and {@code (line, 0)} for blacklist
     * files; the zero is the marker the reducer looks for.
     */
    public static class HomeWork5Mapper extends Mapper<Object, Text, Text, IntWritable> {
        // Writables are reused across records to avoid one allocation per token.
        final Text word = new Text();
        final IntWritable one = new IntWritable(1);
        final IntWritable blackVal = new IntWritable(0);

        /** Whether the split handled by this mapper belongs to a blacklist file. */
        private boolean fromBlacklist;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The input file is fixed for the whole split, so classify it once
            // here instead of once per record.
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            String explicitName = context.getConfiguration().get(BLACKLIST_NAME_KEY);
            fromBlacklist = fileName.contains(DEFAULT_BLACKLIST_MARKER)
                    || fileName.equals(explicitName);
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            if (fromBlacklist) {
                // One blacklisted word per line.
                word.set(value.toString());
                context.write(word, blackVal);
                return;
            }
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, one);
            }
        }
    }

    /** Sums the counts for a word and writes it, unless a blacklist marker (0) is among the values. */
    public static class HomeWork5Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        final IntWritable sumRes = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                if (val.get() == 0) {
                    // A zero can only come from a blacklist file: drop the word.
                    return;
                }
                sum += val.get();
            }
            sumRes.set(sum);
            context.write(key, sumRes);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure
     * (2 when the arguments are missing).
     *
     * @param args generic Hadoop options followed by the input directory, the output
     *             directory and, optionally, the path of the blacklist file
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the configuration and strip the generic Hadoop options.
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if (remainingArgs.length < 2) {
            System.err.println("Usage: HomeWork5 <input dir> <output dir> [blacklist file]");
            System.exit(2);
            return;
        }

        // 2. Create the job.
        Job job = Job.getInstance(conf, "homework3");
        // 3. Register the job classes. The reducer must NOT be used as a combiner:
        //    it would drop a blacklisted word only on the map task that read the
        //    blacklist, while counts from the other map tasks would still be written.
        job.setJarByClass(HomeWork5.class);
        job.setMapperClass(HomeWork5Mapper.class);
        job.setReducerClass(HomeWork5Reducer.class);
        // 4. Output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 5. Input paths (every .txt file under the input directory) and the output path.
        for (String filePath : listTxtFiles(new Path(remainingArgs[0]), conf)) {
            FileInputFormat.addInputPath(job, new Path(filePath));
        }
        if (remainingArgs.length >= 3) {
            // Optional blacklist outside the input directory: feed it to the
            // mappers and tell them which file name identifies it.
            Path blacklist = new Path(remainingArgs[2]);
            job.getConfiguration().set(BLACKLIST_NAME_KEY, blacklist.getName());
            FileInputFormat.addInputPath(job, blacklist);
        }
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // 6. Run the job and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Recursively lists the {@code .txt} files below a directory, using a
     * freshly created default {@link Configuration}.
     *
     * @param folderPath directory to scan
     * @return the string form of each matching file's path; empty when there are none
     * @throws IOException if the directory cannot be listed
     */
    public static List<String> getTxtFileListFromPath(String folderPath) throws IOException {
        return listTxtFiles(new Path(folderPath), new Configuration());
    }

    /** Same as {@link #getTxtFileListFromPath(String)} but with a caller-supplied configuration. */
    private static List<String> listTxtFiles(Path root, Configuration conf) throws IOException {
        // Resolve the file system from the path itself so that fully qualified
        // paths on a non-default file system can be listed as well.
        FileSystem fs = root.getFileSystem(conf);
        List<String> txtFiles = new ArrayList<>();
        collectTxtFiles(fs, root, txtFiles);
        return txtFiles;
    }

    /** Appends every {@code .txt} file below {@code dir} to {@code out}, descending into sub-directories. */
    private static void collectTxtFiles(FileSystem fs, Path dir, List<String> out)
            throws IOException {
        for (FileStatus status : fs.listStatus(dir)) {
            Path entry = status.getPath();
            if (status.isFile() && entry.getName().endsWith(".txt")) {
                out.add(entry.toString());
            } else if (status.isDirectory()) {
                collectTxtFiles(fs, entry, out);
            }
        }
    }
}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作日内妥善处理。