I have written a very small Python programme that follows the mapper / reducer sequence. It serves as a replacement for the more complicated set of Java programmes that would normally be written to implement such a sequence. The idea is relatively simple. We create a stream from an input file. That stream is processed by a mapper programme (written in Python) that produces a series of name, value pairs, which are written to disk. That intermediate output must then be sorted and subsequently streamed to a reducer programme (also written in Python), which produces the final outcome. This too is written to disk.
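To make the sequence concrete, here is a minimal in-memory simulation of the three phases in plain Python. The mapper and reducer below are hypothetical stand-ins of my own, not the programmes used later in this post:

def simulate(lines, mapper, reducer):
    pairs = []
    for line in lines:
        pairs.extend(mapper(line))       # map phase: emit (name, value) pairs
    pairs.sort(key=lambda kv: kv[0])     # sort phase: group equal names together
    return reducer(pairs)                # reduce phase: final calculation

# Hypothetical example: count words and sum the counts per word.
def reducer(pairs):
    totals = {}
    for name, value in pairs:
        totals[name] = totals.get(name, 0) + value
    return totals

mapper = lambda line: [(word, 1) for word in line.split()]
print(simulate(["a b a", "b b"], mapper, reducer))   # {'a': 2, 'b': 3}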
Such logic is used on a Hadoop platform. There, the idea of streaming data to a mapper translates into something that can be run concurrently on different nodes. After the map phase, the intermediate output, a stream of name, value pairs, is sorted and sent to a reducer, where the final calculations are made. The results are stored as a file with name, value pairs.
Let us first look at the input file that will be analysed. Each line holds the fields date, time, store, item, cost and payment, separated by tabs (shown here as spaces); the last line is a corrupt record. It looks like:
2013-10-09 13:22 Gouda Wafels 2.98 Visa
2013-10-09 13:22 New York Iphone 455.76 tMasterCard
2013-10-09 13:22 New York Rommel 354.76 tMasterCard
2016-10-09 I/O error
This input set is processed by the following Python mapper programme:
#!/usr/bin/python
# Your task is to make sure that this mapper code does not fail on corrupt
# data lines, but instead just ignores them and continues working
import sys

def mapper():
    # read standard input line by line
    for line in sys.stdin:
        # strip off extra whitespace, split on tab and put the data in an array
        data = line.strip().split("\t")

        # This is the place you need to do some defensive programming
        # what if there are not exactly 6 fields in that line?
        if len(data) != 6:
            continue

        # this next line is called 'multiple assignment' in Python
        # this is not really necessary, we could access the data
        # with data[2] and data[4], but we do this for convenience
        # and to make the code easier to read
        date, time, store, item, cost, payment = data

        # Now print out the data that will be passed to the reducer
        print "{0}\t{1}".format(store, cost)

def main():
    mapper()
    sys.stdin = sys.__stdin__

main()
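Incidentally, the main() wrapper restores sys.__stdin__ because the same mapper function can be fed test lines from a string instead of a real file. A minimal sketch of that pattern (the test_text value is my own example, not part of the programme above):

import sys
from StringIO import StringIO  # Python 2, matching the mapper above

test_text = ("2013-10-09\t13:22\tGouda\tWafels\t2.98\tVisa\n"
             "a corrupt line\n")

sys.stdin = StringIO(test_text)   # temporarily replace standard input
mapper()                          # prints: Gouda\t2.98 ; corrupt line skipped
sys.stdin = sys.__stdin__         # restore the real standard input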
One might test whether it actually works with the command cat test | ./mapper.py. This should produce a set of name, value pairs. I got as outcome:
[training@localhost ~]$ cat test | ./mapper.py
Gouda 2.98
New York 455.76
New York 354.76
The next programme is the reducer, which looks like:
#!/usr/bin/python
import sys

salesTotal = 0
oldKey = None

# Loop around the data
# It will be in the format key\tval
# Where key is the store name, val is the sale amount
#
# All the sales for a particular store will be presented,
# then the key will change and we'll be dealing with the next store
for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 2:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisSale = data_mapped

    if oldKey and oldKey != thisKey:
        print oldKey, "\t", salesTotal
        oldKey = thisKey
        salesTotal = 0

    oldKey = thisKey
    salesTotal += float(thisSale)

if oldKey != None:
    print oldKey, "\t", salesTotal
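Note that the reducer only works because equal keys arrive next to each other, which is what the sort phase guarantees. As a sanity check (a sketch of my own, not part of the Hadoop job), the same aggregation can be done in memory with a dictionary, which does not depend on the input being sorted:

#!/usr/bin/python
import sys
from collections import defaultdict

# Sum the sales per store without relying on sorted input.
totals = defaultdict(float)
for line in sys.stdin:
    parts = line.strip().split("\t")
    if len(parts) != 2:
        continue
    store, sale = parts
    totals[store] += float(sale)

for store in sorted(totals):
    print "{0}\t{1}".format(store, totals[store])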
This can also be run on Linux. Strictly, the mapper output should be piped through sort before it reaches the reducer (cat test | ./mapper.py | sort | ./reducer.py); it works without the sort here because the mapper output happens to arrive already grouped per store:
[training@localhost ~]$ cat test | ./mapper.py | ./reducer.py
Gouda 2.98
New York 810.52
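The full pipeline, including the sort phase, can also be driven from Python. The sketch below (the output file name local.out is my own choice) mimics what the streaming job will do on the cluster:

#!/usr/bin/python
import subprocess

# Run mapper | sort | reducer over the local test file and
# write the result to a local output file.
with open("test") as infile, open("local.out", "w") as outfile:
    mapper = subprocess.Popen(["./mapper.py"], stdin=infile,
                              stdout=subprocess.PIPE)
    sorter = subprocess.Popen(["sort"], stdin=mapper.stdout,
                              stdout=subprocess.PIPE)
    mapper.stdout.close()  # let the mapper see SIGPIPE if sort exits early
    subprocess.check_call(["./reducer.py"], stdin=sorter.stdout,
                          stdout=outfile)
    sorter.stdout.close()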
The next step is to run everything on Hadoop as a streaming job:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar \
  -mapper mapper.py -reducer reducer.py \
  -file mapper.py -file reducer.py \
  -input /myinput/test -output joboutput
which generates:
packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-training/hadoop-unjar5879319460019186346/] [] /tmp/streamjob1136107035137838419.jar tmpDir=null
16/08/27 12:01:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/08/27 12:01:12 WARN snappy.LoadSnappy: Snappy native library is available
16/08/27 12:01:12 INFO snappy.LoadSnappy: Snappy native library loaded
16/08/27 12:01:12 INFO mapred.FileInputFormat: Total input paths to process : 1
16/08/27 12:01:12 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-hdfs/cache/training/mapred/local]
16/08/27 12:01:12 INFO streaming.StreamJob: Running job: job_201608271023_0015
16/08/27 12:01:12 INFO streaming.StreamJob: To kill this job, run:
16/08/27 12:01:12 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201608271023_0015
16/08/27 12:01:12 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201608271023_0015
16/08/27 12:01:13 INFO streaming.StreamJob: map 0% reduce 0%
16/08/27 12:01:16 INFO streaming.StreamJob: map 100% reduce 0%
16/08/27 12:01:19 INFO streaming.StreamJob: map 100% reduce 100%
16/08/27 12:01:21 INFO streaming.StreamJob: Job complete: job_201608271023_0015
16/08/27 12:01:21 INFO streaming.StreamJob: Output: joboutput10
The output can be inspected as follows:
[training@localhost ~]$ hadoop fs -cat /user/training/joboutput10/part-00000
Gouda 2.98
New York 810.52