In a previous post, I showed how two files can be merged in Scala. The idea there was that the RDDs were converted to DataFrames and a join was performed on those.
In this post, the approach is slightly different. Now each RDD is rewritten as a set of key-value pairs with a unique key, which then allows a merge on that key.
Let us first see how an RDD can be created with a unique key:
val counts = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,1)).reduceByKey((v1,v2) => v1+v2)
A file ("keyvalue") is read and each line is split on commas, so every word becomes its own record: if the original file contains 6 words, we end up with 6 records. Each word is then mapped to a pair (word, 1), with the word acting as the key. reduceByKey then sums the 1s over all records sharing a key. The result is in effect a word count.
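The same pipeline can be sketched on plain Scala collections, so the logic can be followed without a Spark cluster. The two sample lines below are made up, and groupBy plus a sum stands in for Spark's reduceByKey:

```scala
// Minimal sketch of the split/map/reduce pipeline on plain collections;
// the input lines are made-up sample data, not the real "keyvalue" file.
val lines = Seq("apple,pear,apple", "pear,plum,apple")

val counts = lines
  .flatMap(line => line.split(','))   // each word becomes its own record
  .map(word => (word, 1))             // pair every word with a count of 1
  .groupBy(_._1)                      // plain-collection stand-in for reduceByKey
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

counts.foreach(println)               // e.g. (apple,3), (pear,2), (plum,1)
```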
I created a similar RDD ("counts1") that also has the words as keys:
val counts1 = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,2)).reduceByKey((v1,v2) => v1+v2)
The join can then be undertaken as:
val pipo = counts1.join(counts)
The outcome can be shown with pipo.foreach(println).
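On key-value pairs, join matches up the values that share a key, so each output record has the shape (key, (valueFromLeft, valueFromRight)). That semantics can be sketched on plain Scala maps; the sample words and counts below are made up:

```scala
// Sketch of pair-RDD join semantics on plain Scala maps; sample data is made up.
val counts  = Map("apple" -> 3, "pear" -> 2)
val counts1 = Map("apple" -> 6, "plum" -> 4)

// keep only keys present on both sides, pairing the two values
val joined = for {
  (k, v1) <- counts1
  v2      <- counts.get(k)
} yield (k, (v1, v2))

joined.foreach(println)   // (apple,(6,3)) -- keys missing on either side are dropped
```

Like Spark's join, this is an inner join: "pear" and "plum" appear on only one side and therefore drop out.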
A similar script runs as:
val kbreqs = sc.textFile("/user/hdfs/keyuser").filter(line => line.contains("KBDOC")).keyBy(line => line.split(' ')(5))
val kblist = sc.textFile("/user/hdfs/keydoc").keyBy(line => line.split(':')(0))
val titlereqs = kbreqs.join(kblist)
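Here keyBy derives the key from each record while keeping the whole record as the value. Spark's keyBy(f) behaves roughly like map(rec => (f(rec), rec)), which can be sketched on a plain Scala collection (the colon-separated sample lines are made up):

```scala
// keyBy keeps the full record as the value; plain-Scala sketch with made-up data.
val lines = Seq("alpha:first doc", "beta:second doc")

// rough equivalent of Spark's keyBy(line => line.split(':')(0))
val keyed = lines.map(line => (line.split(':')(0), line))

keyed.foreach(println)   // (alpha,alpha:first doc), (beta,beta:second doc)
```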
A final script:
val logs = sc.textFile("/user/hdfs/naamtoev")
val userreqs = logs.map(line => line.split(' ')).map(words => (words(0), 1)).reduceByKey((v1, v2) => v1 + v2)
val accountsdata = "/user/hdfs/naamstraat"
val accounts = sc.textFile(accountsdata).keyBy(line => line.split(',')(0))
val accounthits = accounts.join(userreqs)
for (pair <- accounthits) { printf("%s, %s, %s, %s\n", pair._1, pair._2._1, " score= ", pair._2._2) }