In this blog, I will discuss the word count problem in Python. Word count is often used to show how MapReduce works, and in most examples it is developed within the context of a Java programme. The idea is that the programme is split into two stages. In the first stage, the mapper stage, calculations are made on a part of the total data set and the results are stored as name, value pairs. In the subsequent stage, the reduce stage, the name, value pairs are taken together and summarised; this reduce stage generates one final outcome.
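Before turning to the Hadoop version, the two stages can be sketched in plain Python. This is a minimal, stand-alone illustration of the idea on an in-memory list, not the Hadoop implementation itself:

```python
# Mapper stage: emit a (name, value) pair for every word,
# with the trivial count 1 as the value.
def map_stage(lines):
    pairs = []
    for line in lines:
        for word in line.split():
            pairs.append((word, 1))
    return pairs

# Reduce stage: take the pairs together and sum the
# counts per word into a single outcome.
def reduce_stage(pairs):
    totals = {}
    for word, count in pairs:
        totals[word] = totals.get(word, 0) + count
    return totals

lines = ["choco sinas", "cola water cola"]
print(reduce_stage(map_stage(lines)))
```

On a real cluster the pairs would travel between machines, but the division of labour is exactly this.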
In Python, the mapper programme looks like:
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
The outcomes are written as word, 1 pairs. These are the name, value pairs that the mapper emits as intermediate outcome. The programme can also be run on its own via:
/usr/local/hadoop/bin/hadoop dfs -cat /user/drink/drink | /home/hduser/mapper.py
hduser@ubuntu:~$ /usr/local/hadoop/bin/hadoop dfs -cat /user/drink/drink | /home/hduser/mapper.py
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
15/10/18 12:27:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
choco 1
sinas 1
cola 1
water 1
hduser@ubuntu:~$
The programme works on streams; hence the input must be supplied as a stream that is sent to STDIN.
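Because the mapper only reads from a stream, its logic can also be tested locally without Hadoop. A sketch, using the same splitting logic as mapper.py with io.StringIO standing in for STDIN:

```python
import io

def run_mapper(stream):
    # same logic as mapper.py, but reading from any stream
    out = []
    for line in stream:
        line = line.strip()
        for word in line.split():
            out.append('%s\t%s' % (word, 1))
    return out

# io.StringIO plays the role of STDIN here
stdin = io.StringIO("choco sinas\ncola water\n")
for pair in run_mapper(stdin):
    print(pair)
```

Any object that yields lines will do, which is what makes the streaming approach easy to test.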
The second part, the reduce stage, looks like:
#!/usr/bin/env python

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
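The IF-switch in the reducer keeps track of the current word by hand. Assuming the input is sorted by key, as Hadoop guarantees, the same grouping can also be expressed with itertools.groupby from the standard library. This is an alternative sketch, not the code used above:

```python
from itertools import groupby

def reduce_sorted(lines):
    # parse the tab-delimited word, count pairs
    pairs = (line.strip().split('\t', 1) for line in lines)
    # groupby relies on the sort order, just like the IF-switch
    for word, group in groupby(pairs, key=lambda p: p[0]):
        yield word, sum(int(count) for _, count in group)

sorted_lines = ["cola\t1", "cola\t1", "water\t1"]
for word, total in reduce_sorted(sorted_lines):
    print('%s\t%s' % (word, total))
```

Both versions break if the stream is not sorted: a word that reappears later would simply start a new group.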
Both parts can be run via:
/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar -file /home/hduser/mapper.py -mapper /home/hduser/mapper.py -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py -input /user/drink/* -output /user/output91
The intermediate results are sent as a stream from mapper to reducer. To handle this on the Hadoop platform, the streaming jar is used. Roughly stated, the system works as follows: stream input > mapper > reducer > outcome. Thanks to the Hadoop streaming jar, we do not have to indicate that intermediate outcomes are generated on one server and sent to another server where the reduce part happens.
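Outside Hadoop, that flow can be mimicked in a single Python pipeline. In this sketch the sorted() call stands in for the shuffle-and-sort step that Hadoop performs between mapper and reducer; everything else mirrors the logic of the two scripts:

```python
def mapper(lines):
    # emit a (word, 1) pair for every word in the stream
    for line in lines:
        for word in line.strip().split():
            yield (word, 1)

def reducer(sorted_pairs):
    # same IF-switch logic as reducer.py: it assumes
    # the pairs arrive sorted by key
    current_word, current_count = None, 0
    for word, count in sorted_pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield current_word, current_count
            current_word, current_count = word, count
    if current_word is not None:
        yield current_word, current_count

stream = ["cola water", "water water"]
# stream input > mapper > (sort) > reducer > outcome
for word, total in reducer(sorted(mapper(stream))):
    print('%s\t%s' % (word, total))
```

On the cluster, the only difference is that Hadoop distributes these three steps over machines; the streaming jar hides that distribution from us.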