In Big Data, the concepts of mapping and reducing play a huge role. The idea is that a massive dataset is split over several servers. On each server, a part of the data is examined; this step is called the map part, and the class that performs it a mapper. In a subsequent step, these partial results are merged into a single outcome; this is called the reduce part. The communication between these two parts takes place via key-value pairs.
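This flow can be sketched in ordinary Java, outside Hadoop. The sketch below is a minimal, single-process illustration of the idea only; the class name, the helper method and the sample (year, temperature) pairs are invented for this example and are not part of Hadoop.

```java
import java.util.*;
import java.util.stream.*;

// Minimal single-process sketch of map -> shuffle -> reduce.
public class MiniMapReduce {

    // "Shuffle + reduce": group the (key, value) pairs by key, then keep
    // the maximum value per key -- the same shape as a max-temperature
    // reducer.
    static Map<String, Integer> reduceMax(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey, TreeMap::new,
                Collectors.reducing(Integer.MIN_VALUE, Map.Entry::getValue, Integer::max)));
    }

    public static void main(String[] args) {
        // "Map" step: each input record has already been turned into a
        // (year, temperature) key-value pair, as a mapper would do.
        List<Map.Entry<String, Integer>> mapped = List.of(
                Map.entry("1975", 120), Map.entry("1975", 155),
                Map.entry("1976", 98), Map.entry("1976", 143));

        System.out.println(reduceMax(mapped)); // {1975=155, 1976=143}
    }
}
```

In Hadoop the grouping step (the shuffle) happens between distributed mappers and reducers; here it is collapsed into one stream pipeline purely to show the key-value contract.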
In a well-known example (MaxTemperature), this mechanism is demonstrated in a Java programme. This programme consists of three classes: a supervisory (driver) class, a mapper and a reducer. The driver class is shown below.
// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      // Note: args[0] and args[1] must not be accessed here, since fewer
      // than two arguments may have been supplied.
      System.err.println("Usage: MaxTemperature [input path] [output path]");
      System.exit(-1);
    }

    @SuppressWarnings("deprecation")
    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    System.out.println("invoer is " + args[0]);  // "invoer" is Dutch for input
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.out.println("uitvoer is " + args[1]); // "uitvoer" is Dutch for output

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
// ^^ MaxTemperature
This driver programme registers two other classes. The mapper is registered via job.setMapperClass and is coded below:
// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);

    // Debug logging: append every (year, temperature) pair to a local file.
    File file = new File("/home/hduser/example-mapper.txt");
    if (!file.exists()) {
      file.createNewFile();
    }
    FileWriter fw = new FileWriter(file.getAbsoluteFile(), true);
    BufferedWriter output = new BufferedWriter(fw);
    Date date = new Date();

    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    output.append("mappert is jaar " + date.toString() + ">" + year
        + " temp " + airTemperature + "\n"); // Dutch: "mapper is year ... temp ..."
    output.close();

    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
// ^^ MaxTemperatureMapper
In this class, the input is read as a key-value pair: the key is the offset of the line in the input file, and the value is the line itself. In turn, the output is written as a new key-value pair, consisting of a year and a temperature measurement. To show exactly which values are communicated, each pair is also written to a debug file. The file ("/home/hduser/example-mapper.txt") contains these lines:
mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp 12341
mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp 12342
mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp 12343
mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp 12345
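The fixed-width parsing that produces these pairs can be tried out in isolation. The record below is constructed so that it has the same column layout the mapper relies on (year in columns 15-18, signed temperature in columns 87-91, quality flag in column 92); it is a synthetic example, not a real weather record.

```java
// Standalone sketch of the mapper's fixed-width parsing logic.
public class ParseRecord {

    // Synthetic record: padding, then "1975" at index 15, more padding,
    // then "+0123" at index 87 and the quality flag "1" at index 92.
    static String sampleLine() {
        return " ".repeat(15) + "1975" + " ".repeat(68) + "+0123" + "1";
    }

    public static void main(String[] args) {
        String line = sampleLine();
        String year = line.substring(15, 19);               // "1975"
        int airTemperature = line.charAt(87) == '+'
                ? Integer.parseInt(line.substring(88, 92))  // skip the '+'
                : Integer.parseInt(line.substring(87, 92));
        String quality = line.substring(92, 93);            // "1"
        // The mapper would now emit (year, airTemperature) as a key-value pair.
        System.out.println(year + " -> " + airTemperature); // prints: 1975 -> 123
    }
}
```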
The key-value pairs that are communicated are (1975, 12341), (1975, 12342), and so on. These pairs are processed in the subsequent reduce part, which has this code:
// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;

    // Debug logging: record every value the reducer sees for this key.
    File file = new File("/home/hduser/example-reducer.txt");
    BufferedWriter output = new BufferedWriter(new FileWriter(file));

    for (IntWritable value : values) {
      int waarde = value.get(); // "waarde" is Dutch for value
      maxValue = Math.max(maxValue, waarde);
      output.write("mappert is gelezen waarde " + waarde
          + " max " + maxValue + "\n"); // Dutch: "read value ... max ..."
    }
    context.write(key, new IntWritable(maxValue));
    output.close();
  }
}
// ^^ MaxTemperatureReducer
In this part, too, a file is written that contains the values as they are processed: 12345, 12343, and so on.
mappert is gelezen waarde 12345 max 12345
mappert is gelezen waarde 12343 max 12345
mappert is gelezen waarde 12342 max 12345
mappert is gelezen waarde 12341 max 12345
From these values the maximum is calculated.
The final result (key and maximum) can finally be read from the HDFS output file with:

/usr/local/hadoop/bin/hadoop dfs -cat /user/output51/part-r-00000

(On newer Hadoop versions, hdfs dfs -cat is preferred over hadoop dfs -cat.) This shows 1975 12345, which is the final outcome of this exercise.