Python: yet another way to implement map/ reduce

In this blog, I will discuss the word count problem as done with Python. It is often used to show how map reduce works. In most examples, it is developed within the context of a Java programme. The idea is that the programme is split into two stages. In one stage, calculations are made on a part of the total data set. The results are stored in a name, value set. This stage is called the mapper stage. In a subsequent stage, the reduce stage, the results as stored in name, value sets are taken together. They are summarised into a single solution. This stage is called the reduce stage and it will generate one final outcome.
In Python, the mapper programme looks like:

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

The outcomes are stored as word,1. This is the name, value set that is sent by the mapper as an intermediate outcome. This programme can also be run on its own via:

/usr/local/hadoop/bin/hadoop dfs -cat /user/drink/drink | /home/hduser/

leading to:

hduser@ubuntu:~$ /usr/local/hadoop/bin/hadoop dfs -cat /user/drink/drink | /home/hduser/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

15/10/18 12:27:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
choco	1
sinas	1
cola	1
water	1

The programme works on streams. Hence the input must be generated as a stream that is sent to the STDIN.
The second part, the reduce stage looks like:

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Both parts can be run via:

/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar 
-file /home/hduser/  -mapper /home/hduser/
-file /home/hduser/  -reducer /home/hduser/
-input /user/drink/* 
-output /user/output91

The intermediate results are sent as a stream from mapper to reducer. To handle this on the hadoop platform, the streaming jar is used. Roughly stated, the system works as follows: stream input > mapper > reducer > outcome. Thanks to the hadoop streaming jar, we do not have to indicate that intermediate outcomes are generated on one server and are sent to another server where the reduce part happens.