Joining files with PySpark

PySpark allows us to process files in a big data/Hadoop environment. In another post I showed how PySpark can be started and how it can be used.
The concept behind PySpark is very interesting. It allows us to circumvent the limitations of the MapReduce framework. MapReduce is somewhat limiting as we have exactly two steps: a map phase where records are processed and a reduce phase where aggregates are computed.
PySpark is more flexible, as we may mix both phases: first a mapping, then a reduce, then back to mapping.
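
As a small illustration of that flexibility (a sketch only, separate from the exercise below; it assumes the pyspark shell, where the sc object is already available), here is a word count that maps, reduces and then maps again:

words = sc.parallelize(['spark', 'hadoop', 'spark', 'hive'])
pairs = words.map(lambda w: (w, 1))              # map phase
counts = pairs.reduceByKey(lambda a, b: a + b)   # reduce phase
flipped = counts.map(lambda kv: (kv[1], kv[0]))  # and back to a map phase
print(flipped.collect())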

To show how it works, we start with two files. One file is called “namen” and it has the following content:

tom,1
ine,2
paula,3
stella,4
bart,5

A second file is called “leeftijd” and it has the following content:

1,59
2,58
3,28
4,26
5,22

The two files both have a key (‘1’, ‘2’, ‘3’, etc.). In the first file, we have a name with its key; in the second file, we have a key with the age attached.
The purpose of my exercise is to join the two files on their key. The result should be a file that contains the name and the age. The program reads:

namen = sc.textFile('/user/hdfs/namen')
# split each line on the comma and key the record on the second field (the key)
namen1 = namen.map(lambda line: line.split(',')).map(lambda fields: (fields[1], (fields[1], fields[0])))
leeftijd = sc.textFile('/user/hdfs/leeftijd')
# here the key is the first field
leeftijd1 = leeftijd.map(lambda line: line.split(',')).map(lambda fields: (fields[0], (fields[0], fields[1])))
# join the two RDDs on their common key
samen = leeftijd1.join(namen1)
# keep only the name and the age from the joined structure
samen1 = samen.map(lambda x: (x[1][1][1], x[1][0][1]))
samen1.saveAsTextFile('/user/hdfs/naamleeftijd')

PySpark works with RDDs. Very roughly stated, these are tables with rows that may contain data elements or structures of data elements. On these RDDs we have map and filter functions that allow us to modify an RDD. There are several ways to create such an RDD. One possibility is to make use of the “sc” object. It has a method textFile that reads a file and stores the content as an RDD.
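
As a quick check (assuming the pyspark shell, where sc already exists), we can inspect the first elements of such an RDD with take:

namen = sc.textFile('/user/hdfs/namen')
print(namen.take(2))   # e.g. ['tom,1', 'ine,2'] -- each line is one element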

When a file is read, each line is one element. As we have a key and a name in the file namen, we would like to access the key and the name as two separate elements. So the line is split into two fields. In a subsequent step, the key is separated from the remainder of the line.
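
To make these two steps concrete (using the RDDs from the program above), the intermediate results look roughly like this:

splitted = namen.map(lambda line: line.split(','))
print(splitted.take(1))   # roughly [['tom', '1']]
print(namen1.take(1))     # roughly [('1', ('1', 'tom'))] -- the key now comes first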

The same exercise is repeated for the leeftijd file. There too, we would like to have a key and a structure that contains the remainder of the values.

We then have two RDDs, both with a key and additional information. They can be joined into a new RDD.
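
One element of the joined RDD samen then looks roughly as follows (the exact element returned by take may vary with partitioning):

print(samen.take(1))
# roughly [('1', (('1', '59'), ('1', 'tom')))]
# i.e. the key, then the leeftijd value and the namen value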

Finally, we take the relevant items (the name and the age) and write the content to a file on Hadoop.
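
If everything ran, the output directory should contain lines roughly like these (the order depends on the partitioning; reading the result back is a quick way to verify):

print(sc.textFile('/user/hdfs/naamleeftijd').collect())
# roughly ["('tom', '59')", "('ine', '58')", ...]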


By tom