Python in a MapReduce environment

I have written a very small Python programme that follows the mapper / reducer sequence. It replaces the more complicated set of Java programmes that would otherwise be written for such a sequence. The idea is relatively simple. We create a stream from an input file. That stream is processed by a mapper programme (written in Python) that produces a series of name, value pairs. These pairs are written to disk. They must then be sorted by name and streamed to a reducer programme (also written in Python), which produces the final outcome. This is also written to disk.
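The whole sequence can be tried out in a single process before any files or Hadoop are involved. The sketch below is only an illustration: the names map_line, reduce_sorted and run_pipeline are made up here, Python's sorted() stands in for the sort step on disk, and the sample lines are modelled on the test file shown further down.

```python
def map_line(line):
    """Return a (store, cost) pair, or None for a line without exactly 6 fields."""
    fields = line.strip().split("\t")
    if len(fields) != 6:
        return None
    date, time, store, item, cost, payment = fields
    return store, float(cost)


def reduce_sorted(pairs):
    """Sum the values per name; the pairs must already be sorted by name."""
    totals = []
    old_key, total = None, 0.0
    for key, value in pairs:
        if old_key is not None and key != old_key:
            # the name changed: the previous store is complete
            totals.append((old_key, total))
            total = 0.0
        old_key = key
        total += value
    if old_key is not None:
        totals.append((old_key, total))
    return totals


def run_pipeline(lines):
    """Map every line, drop the rejected ones, sort, then reduce."""
    mapped = [pair for pair in map(map_line, lines) if pair is not None]
    return reduce_sorted(sorted(mapped))


if __name__ == "__main__":
    sample = [
        "2013-10-09\t13:22\tNew York\tIphone\t455.76\tMasterCard",
        "2013-10-09\t13:22\tGouda\tWafels\t2.98\tVisa",
        "2013-10-09\t13:22\tNew York\tRommel\t354.76\tMasterCard",
        "2016-10-09\tI/O error",
    ]
    for store, total in run_pipeline(sample):
        print("{0}\t{1:.2f}".format(store, total))
```

The sample is deliberately not ordered by store, so the sort step is what brings the two New York lines together before they are summed.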
This is the logic used on a Hadoop platform. There, streaming data to a mapper becomes something that can run concurrently on different nodes. The intermediate output is then sent as a stream of name, value pairs to a reducer, where the final calculations are made. The results are written to a file of name, value pairs.
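When more than one reducer is running, all pairs with the same name have to arrive at the same reducer, otherwise the totals would be split. By default Hadoop achieves this by hashing the name and taking the result modulo the number of reducers. A rough Python illustration of that idea follows. It is a sketch under assumptions: zlib.crc32 is only a stand-in for the hash Hadoop computes in Java, and partition_for and split_over_reducers are hypothetical names.

```python
import zlib


def partition_for(key, num_reducers):
    """Pick a reducer index for a name; equal names always get the same index."""
    # crc32 is a stand-in hash chosen for this sketch, not what Hadoop uses
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def split_over_reducers(pairs, num_reducers):
    """Distribute (name, value) pairs over num_reducers buckets."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition_for(key, num_reducers)].append((key, value))
    return buckets


if __name__ == "__main__":
    pairs = [("Gouda", 2.98), ("New York", 455.76), ("New York", 354.76)]
    for index, bucket in enumerate(split_over_reducers(pairs, 2)):
        print(index, sorted(bucket))
```

Which bucket a given store lands in depends on the hash, but both New York pairs always land in the same one.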
Let us first look at the input file to be analysed. It looks like:

2013-10-09	13:22	Gouda	Wafels	2.98	Visa
2013-10-09	13:22	New York	Iphone	455.76	tMasterCard
2013-10-09	13:22	New York	Rommel	354.76	tMasterCard
2016-10-09	I/O error

This input set is processed by this Python programme:

#!/usr/bin/python
# Mapper: emits store<TAB>cost for every valid sales line.
# Corrupt data lines must not make it fail; they are ignored and processing continues.
import sys

def mapper():
    # read standard input line by line
    for line in sys.stdin:
        # strip off extra whitespace, split on tab and put the data in an array
        data = line.strip().split("\t")

        # Defensive programming: a valid line has exactly 6 fields,
        # anything else is treated as corrupt and skipped
        if len(data) != 6:
            continue

        # this next line is called 'multiple assignment' in Python
        # this is not really necessary, we could access the data
        # with data[2] and data[4], but we do this for convenience
        # and to make the code easier to read
        date, time, store, item, cost, payment = data

        # Now print out the data that will be passed to the reducer
        # print() with a single formatted string runs under Python 2 and Python 3
        print("{0}\t{1}".format(store, cost))


def main():
    mapper()


if __name__ == "__main__":
    main()

One can test whether it actually works with the command cat test | ./mapper.py (the script must be executable, for instance after chmod +x mapper.py). This should produce a set of name, value pairs. My outcome was:

[training@localhost ~]$ cat test | ./mapper.py
Gouda	2.98
New York	455.76
New York	354.76
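The length check catches truncated lines, but a line with six fields and a cost that is not a number would still pass the mapper and only cause trouble once the value is converted with float(). One possible, stricter variant is sketched below. parse_sale is a hypothetical helper, not part of the mapper above, and the second test line is invented for the example.

```python
def parse_sale(line):
    """Return (store, cost) for a well-formed line, otherwise None."""
    fields = line.rstrip("\r\n").split("\t")
    if len(fields) != 6:
        return None
    store, cost = fields[2], fields[4]
    try:
        return store, float(cost)
    except ValueError:
        # six fields, but the cost column is not a number
        return None


if __name__ == "__main__":
    print(parse_sale("2013-10-09\t13:22\tGouda\tWafels\t2.98\tVisa"))
    print(parse_sale("2013-10-09\t13:22\tGouda\tWafels\tn/a\tVisa"))
    print(parse_sale("2016-10-09\tI/O error"))
```

Only the first line yields a pair; the other two return None and would be skipped.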

The next programme is the reducer, which looks like:

#!/usr/bin/python

import sys


salesTotal = 0
oldKey = None

# Loop around the data
# It will be in the format key\tval
# Where key is the store name, val is the sale amount
#
# All the sales for a particular store will be presented,
# then the key will change and we'll be dealing with the next store

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 2:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisSale = data_mapped

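    if oldKey is not None and oldKey != thisKey:
        # The key changed: emit the total for the previous store.
        # The "key \tvalue" layout of the original output is kept.
        print("{0} \t{1}".format(oldKey, salesTotal))
        salesTotal = 0

    oldKey = thisKey
    salesTotal += float(thisSale)

# Emit the total for the last store, if there was any input at all
if oldKey is not None:
    print("{0} \t{1}".format(oldKey, salesTotal))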

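The mapper and reducer can also be chained on the Linux command line. The reducer expects its input grouped by name; the small test file happens to be in that order already, but for arbitrary input a sort belongs between the two steps (cat test | ./mapper.py | sort | ./reducer.py):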

[training@localhost ~]$ cat test | ./mapper.py | ./reducer.py
Gouda 	2.98
New York 	810.52
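The bookkeeping with oldKey can also be left to itertools.groupby from the standard library, which groups consecutive items that share a key. The following is an alternative sketch, not the reducer that produced the results in this post; parse_pairs and reduce_stream are names chosen for illustration. Like the original, it relies on the input being sorted by name.

```python
import sys
from itertools import groupby
from operator import itemgetter


def parse_pairs(lines):
    """Yield (name, value) from name<TAB>value lines, skipping malformed ones."""
    for line in lines:
        parts = line.strip().split("\t")
        if len(parts) == 2:
            yield parts[0], float(parts[1])


def reduce_stream(lines):
    """Yield (name, total) for each run of consecutive identical names."""
    for key, group in groupby(parse_pairs(lines), key=itemgetter(0)):
        yield key, sum(value for _, value in group)


if __name__ == "__main__":
    for key, total in reduce_stream(sys.stdin):
        print("{0}\t{1}".format(key, total))
```

Because groupby only looks at neighbouring items, unsorted input would give several partial totals for the same store, exactly as with the hand-written loop.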

The next step is to run everything on Hadoop as a streaming job:

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input /myinput/test -output joboutput10

which generates:

packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-training/hadoop-unjar5879319460019186346/] [] /tmp/streamjob1136107035137838419.jar tmpDir=null
16/08/27 12:01:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/08/27 12:01:12 WARN snappy.LoadSnappy: Snappy native library is available
16/08/27 12:01:12 INFO snappy.LoadSnappy: Snappy native library loaded
16/08/27 12:01:12 INFO mapred.FileInputFormat: Total input paths to process : 1
16/08/27 12:01:12 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-hdfs/cache/training/mapred/local]
16/08/27 12:01:12 INFO streaming.StreamJob: Running job: job_201608271023_0015
16/08/27 12:01:12 INFO streaming.StreamJob: To kill this job, run:
16/08/27 12:01:12 INFO streaming.StreamJob: UNDEF/bin/hadoop job  -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201608271023_0015
16/08/27 12:01:12 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201608271023_0015
16/08/27 12:01:13 INFO streaming.StreamJob:  map 0%  reduce 0%
16/08/27 12:01:16 INFO streaming.StreamJob:  map 100%  reduce 0%
16/08/27 12:01:19 INFO streaming.StreamJob:  map 100%  reduce 100%
16/08/27 12:01:21 INFO streaming.StreamJob: Job complete: job_201608271023_0015
16/08/27 12:01:21 INFO streaming.StreamJob: Output: joboutput10

The output can be inspected with:

[training@localhost ~]$ hadoop fs -cat /user/training/joboutput10/part-00000
Gouda 	2.98
New York 	810.52

By tom