Map and reduce – what happens?

In Big Data, the concept of mapping and reducing plays a huge role. The idea is that a massive dataset is split over several servers. On each server, a part of the data is processed; this step is called the map part and the code that performs it is the mapper. In a subsequent step, these partial results are merged into one outcome; this latter step is called the reduce part. The communication between these two parts goes via key-value pairs.
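
To make the idea concrete before turning to Hadoop, here is a minimal sketch in plain Java (no Hadoop involved; the class name and the sample records are made up for this illustration) that mimics the map, group and reduce steps on a few records:

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapReduceSketch {
  public static void main(String[] args) {
    // Raw input records; in a real cluster every mapper only sees its own part of these.
    List<String> records = List.of("1975,12341", "1975,12342", "1976,11000");

    // Map step: turn every record into a key-value pair (year, temperature).
    List<SimpleEntry<String, Integer>> pairs = new ArrayList<>();
    for (String record : records) {
      String[] fields = record.split(",");
      pairs.add(new SimpleEntry<>(fields[0], Integer.parseInt(fields[1])));
    }

    // Shuffle step: group all values that share the same key.
    Map<String, List<Integer>> grouped = new HashMap<>();
    for (SimpleEntry<String, Integer> pair : pairs) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }

    // Reduce step: collapse every group into one outcome, here the maximum per year.
    for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
      int max = Integer.MIN_VALUE;
      for (int value : entry.getValue()) {
        max = Math.max(max, value);
      }
      System.out.println(entry.getKey() + "\t" + max);
    }
  }
}
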
In a well-known example (MaxTemperature), this mechanism is demonstrated in a Java programme. The programme consists of three classes: a supervisory (driver) class, a mapper and a reducer. The supervisory class is shown below.

// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature [input path] [output path]");
      System.exit(-1);
    }

    @SuppressWarnings("deprecation")
    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    System.out.println("invoer is " + args[0]);   // debug output: "invoer" = input path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.out.println("uitvoer is " + args[1]);  // debug output: "uitvoer" = output path

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
// ^^ MaxTemperature
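
Once these three classes have been compiled and packaged into a jar, such a job is typically started with a command along the lines of /usr/local/hadoop/bin/hadoop jar maxtemperature.jar MaxTemperature [input path] /user/output51 (the jar name maxtemperature.jar and the input path are assumptions here; only the output directory /user/output51 is taken from the run described at the end of this article).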

This programme refers to two other classes: the mapper and the reducer. The mapper is registered via job.setMapperClass and its code is shown below:

// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    // The value is one line of the weather dataset; the year and the
    // temperature sit at fixed positions within that line.
    String line = value.toString();
    String year = line.substring(15, 19);

    // Debug output: append every processed record to a local file on the node.
    File file = new File("/home/hduser/example-mapper.txt");
    if (!file.exists()) {
      file.createNewFile();
    }
    FileWriter fw = new FileWriter(file.getAbsoluteFile(), true);
    BufferedWriter output = new BufferedWriter(fw);
    Date date = new Date();

    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    output.append("mappert is jaar " + date.toString() + ">" + year + " temp  " + airTemperature + "\n");
    output.close();

    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      // Emit the key-value pair (year, temperature) to the framework.
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
// ^^ MaxTemperatureMapper

In this class, the input is read as a key-value pair. In turn, the output is written as a new key-value pair, which consists of a year and a temperature measurement. To know exactly which values are communicated, the key-value pairs are also written to a local file. The file (“/home/hduser/example-mapper.txt”) contains these lines:

mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp  12341
mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp  12342
mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp  12343
mappert is jaar Mon Jun 15 05:58:29 PDT 2015>1975 temp  12345

The key-value pairs that are communicated are 1975 – 12341, 1975 – 12342, and so on. The framework groups these pairs by key, so the reduce step receives the key (the year 1975) together with all of its temperature values. These grouped key-value pairs are processed in the subsequent reduce part, which has this code:


// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;

    // Debug output: write every value received for this key to a local file.
    File file = new File("/home/hduser/example-reducer.txt");
    BufferedWriter output = new BufferedWriter(new FileWriter(file));

    // The framework has grouped all temperatures for this key (year);
    // keep track of the running maximum while iterating over them.
    for (IntWritable value : values) {
      int waarde = value.get();
      maxValue = Math.max(maxValue, waarde);
      output.write("mappert is gelezen waarde " + waarde + " max  " + maxValue + "\n");
    }
    // Emit the final key-value pair (year, maximum temperature).
    context.write(key, new IntWritable(maxValue));
    output.close();
  }
}
// ^^ MaxTemperatureReducer

In this part, too, a file (“/home/hduser/example-reducer.txt”) is written that contains the values as they are processed. The values are 12345, 12343, and so on:

mappert is gelezen waarde 12345 max  12345
mappert is gelezen waarde 12343 max  12345
mappert is gelezen waarde 12342 max  12345
mappert is gelezen waarde 12341 max  12345

From these values the maximum is calculated.
The final result (key and maximum) can finally be read from the HDFS output file with:
/usr/local/hadoop/bin/hadoop dfs -cat /user/output51/part-r-00000. This shows: 1975 12345, which is the final outcome of this exercise.

By tom