The Programmers Book

# Spark Pair RDD Functions Explanation

Spark's PairRDDFunctions class provides several functions for working with pair RDDs, i.e., RDDs of key-value pairs.

In this post, we will walk through these functions with Scala examples.

Like regular RDD operations, they are grouped into transformations and actions.

## Spark Pair RDD Transformations and Actions

#### Pair RDD Functions Examples

I will explain the Spark pair RDD functions with Scala examples. Before we get started, let's create a pair RDD.

``````
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkBy")
  .master("local")
  .getOrCreate()

val rdd = spark.sparkContext.parallelize(
  List("Germany India USA", "USA India Russia", "India Brazil Canada China")
)
val wordsRdd = rdd.flatMap(_.split(" "))
val pairRDD = wordsRdd.map(w => (w, 1))
pairRDD.foreach(println)
``````

This snippet creates a pair RDD by splitting every element of the RDD on spaces, flattening the results so that each element is a single word, and finally mapping every word to the tuple (word, 1).

``````
(Germany,1)
(India,1)
(USA,1)
(USA,1)
(India,1)
(Russia,1)
(India,1)
(Brazil,1)
(Canada,1)
(China,1)
``````
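The same pipeline can be sketched with plain Scala collections (no Spark needed), which makes the flatMap/map steps easy to follow; this is only an illustration of the semantics, not the Spark API:

```scala
// Plain-Scala sketch of the pair-RDD creation above, using ordinary collections.
val lines = List("Germany India USA", "USA India Russia", "India Brazil Canada China")
val words = lines.flatMap(_.split(" "))   // split each line and flatten, like flatMap on the RDD
val pairs = words.map(w => (w, 1))        // pair every word with the count 1
pairs.foreach(println)
```

The result is one `(word, 1)` tuple per word, ten in total, exactly mirroring the RDD output above.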

#### distinct – Transformation that returns an RDD with duplicate elements removed.

``````
pairRDD.distinct().foreach(println)

//Prints below output
(Germany,1)
(India,1)
(Brazil,1)
(Canada,1)
(China,1)
(USA,1)
(Russia,1)
``````

#### sortByKey – Transformation returns an RDD after sorting by key

``````
println("Sort by Key ==>")
val sortRDD = pairRDD.sortByKey()
sortRDD.foreach(println)
``````

Output

``````
Sort by Key ==>
(Brazil,1)
(Canada,1)
(China,1)
(Germany,1)
(India,1)
(India,1)
(India,1)
(Russia,1)
(USA,1)
(USA,1)
``````

#### reduceByKey – Transformation that merges the values for each key using the given function (here, addition).

The resulting RDD contains one entry per unique key.

``````
println("Reduce by Key ==>")
val wordCount = pairRDD.reduceByKey((a,b)=>a+b)
wordCount.foreach(println)
``````

This sums the values for each key, effectively producing a word count. It yields the output below.

``````
Reduce by Key ==>
(Brazil,1)
(Canada,1)
(China,1)
(USA,2)
(Germany,1)
(Russia,1)
(India,3)

``````
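The semantics of reduceByKey can be sketched with plain Scala collections, assuming a single partition: group the pairs by key, then reduce each group's values with the supplied function. This is only an illustration, not how Spark implements it internally (Spark also combines values on the map side before shuffling):

```scala
// Plain-Scala sketch of reduceByKey: group by key, then fold each group's values.
val pairs = List(("Germany", 1), ("India", 1), ("USA", 1), ("USA", 1), ("India", 1),
  ("Russia", 1), ("India", 1), ("Brazil", 1), ("Canada", 1), ("China", 1))
val reduced = pairs
  .groupBy(_._1)                                           // Map[String, List[(String, Int)]]
  .map { case (k, kvs) => (k, kvs.map(_._2).reduce(_ + _)) } // sum the values per key
```

`reduced("India")` is 3 and `reduced("USA")` is 2, matching the RDD output above.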

#### aggregateByKey – Transformation similar to reduceByKey

aggregateByKey takes an initial "zero" value and two functions: one that merges a value into the accumulator within a partition, and one that merges accumulators across partitions. For our word count it produces the same result as reduceByKey, just via a different approach.

``````
def param1= (accu:Int,v:Int) => accu + v
def param2= (accu1:Int,accu2:Int) => accu1 + accu2
println("Aggregate by Key ==> wordcount")
val wordCount2 = pairRDD.aggregateByKey(0)(param1,param2)
wordCount2.foreach(println)
``````

``````
(Brazil,1)
(Canada,1)
(China,1)
(USA,2)
(Germany,1)
(Russia,1)
(India,3)
``````
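To see how the two functions differ in role, here is a hedged plain-Scala sketch that simulates two partitions by hand: `seqOp` folds values into the zero value within each partition, and `combOp` then merges the per-partition accumulators. The partition split below is invented purely for illustration:

```scala
// Sketch of aggregateByKey semantics with two hand-simulated partitions.
val zero = 0
val seqOp  = (acc: Int, v: Int) => acc + v        // merge a value into the accumulator (per partition)
val combOp = (a: Int, b: Int) => a + b            // merge accumulators across partitions

val partitions = List(
  List(("India", 1), ("USA", 1), ("India", 1)),   // hypothetical partition 1
  List(("India", 1), ("USA", 1))                  // hypothetical partition 2
)
// Apply seqOp within each partition...
val perPartition = partitions.map(_.groupBy(_._1).map { case (k, kvs) =>
  (k, kvs.map(_._2).foldLeft(zero)(seqOp))
})
// ...then combOp across partitions.
val merged = perPartition.flatten.groupBy(_._1).map { case (k, kvs) =>
  (k, kvs.map(_._2).reduce(combOp))
}
```

Because `seqOp` and `combOp` here are both plain addition, the merged result matches reduceByKey; aggregateByKey becomes more interesting when the accumulator type differs from the value type.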

#### keys – Returns an RDD[K] with all keys in the dataset

``````
println("Keys ==>")
wordCount2.keys.foreach(println)
``````

Yields below output

``````
Brazil
Canada
China
USA
Germany
Russia
India
``````

#### values – Returns an RDD[V] with all values in the dataset

``````
println("Values ==>")
wordCount2.values.foreach(println)
``````

#### count – Action that returns the number of elements in the dataset

``````
println("Count :"+wordCount2.count()) //Prints Count :7
``````

#### collectAsMap – Action that collects all key-value pairs from the dataset and returns them to the driver as a Map. Since a Map holds only one value per key, duplicate keys are collapsed.

``````
println("collectAsMap ==>")
pairRDD.collectAsMap().foreach(println)
``````

Yields below output:

``````
(Brazil,1)
(Canada,1)
(Germany,1)
(China,1)
(Russia,1)
(India,1)
(USA,1)
``````
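The duplicate-collapsing behaviour can be sketched with plain Scala: converting a list of pairs to a Map keeps only one entry per key, which is why repeated words such as "India" and "USA" appear once in the collectAsMap result even though pairRDD holds them multiple times. This is an illustration of Map semantics, not Spark internals:

```scala
// Sketch of why collectAsMap collapses duplicate keys: a Scala Map
// cannot hold two entries with the same key, so later pairs overwrite earlier ones.
val pairs = List(("India", 1), ("USA", 1), ("India", 1), ("USA", 1))
val asMap = pairs.toMap                           // one entry per key survives
```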

#### Complete Example

``````
package com.theprogrammersbook.spark.rdd

import org.apache.spark.sql.SparkSession

object OperationsOnPairRDD {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark")
      .master("local")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val rdd = spark.sparkContext.parallelize(
      List("Germany India USA", "USA India Russia", "India Brazil Canada China")
    )

    val wordsRdd = rdd.flatMap(_.split(" "))
    val pairRDD = wordsRdd.map(f => (f, 1))
    pairRDD.foreach(println)

    //distinct
    println("Distinct ==>")
    pairRDD.distinct().foreach(println)

    //sortByKey
    println("Sort by Key ==>")
    val sortRDD = pairRDD.sortByKey()
    sortRDD.foreach(println)

    //reduceByKey
    println("Reduce by Key ==>")
    val wordCount = pairRDD.reduceByKey((a, b) => a + b)
    wordCount.foreach(println)

    //aggregateByKey
    def param1 = (accu: Int, v: Int) => accu + v
    def param2 = (accu1: Int, accu2: Int) => accu1 + accu2
    println("Aggregate by Key ==> wordcount")
    val wordCount2 = pairRDD.aggregateByKey(0)(param1, param2)
    wordCount2.foreach(println)

    //keys
    println("Keys ==>")
    wordCount2.keys.foreach(println)

    //values
    println("values ==>")
    wordCount2.values.foreach(println)

    //count
    println("Count :" + wordCount2.count())

    //collectAsMap
    println("collectAsMap ==>")
    pairRDD.collectAsMap().foreach(println)
  }
}
``````

#### References:

https://spark.apache.org/docs/latest/index.html