The Spark shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions. Spark shuffle is a very expensive operation because it moves data between executors on the same worker node, or even between worker nodes in a cluster. When a Spark job has performance issues, transformations that involve shuffling are one of the first places to look.
In this tutorial, you will learn which RDD and DataFrame transformations trigger a shuffle, using Scala examples. The same approach can also be used with PySpark (Spark with Python).
What is Spark Shuffle?
Shuffling is the mechanism Spark uses to redistribute data across executors and even across machines. A shuffle is triggered by certain transformations, such as reduceByKey() on RDDs, groupBy() on DataFrames, and join() on both RDDs and DataFrames.
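To make the redistribution concrete, here is a minimal plain-Scala sketch (no Spark required) of how a hash-based shuffle decides which partition each key-value record goes to. It follows the idea behind Spark's RDD HashPartitioner: a record goes to the non-negative remainder of key.hashCode divided by the number of partitions, and null keys go to partition 0. DataFrame shuffles use a different hash function. The names HashShuffleSketch, partitionFor, and shuffle are hypothetical.

```scala
// Plain-Scala sketch of hash-based shuffle routing; not Spark's actual code.
object HashShuffleSketch {

  // Target partition for a key: non-negative (hashCode mod numPartitions),
  // with null keys sent to partition 0 (the same idea as RDD HashPartitioner)
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  // Re-partition records (already split into input partitions) by key
  def shuffle[K, V](input: Seq[Seq[(K, V)]], numPartitions: Int): Vector[Seq[(K, V)]] = {
    val routed = input.flatten.groupBy { case (k, _) => partitionFor(k, numPartitions) }
    Vector.tabulate(numPartitions)(p => routed.getOrElse(p, Seq.empty[(K, V)]))
  }

  def main(args: Array[String]): Unit = {
    // Records for the same key start out scattered across input partitions
    val input = Seq(
      Seq("a" -> 1, "b" -> 1),
      Seq("a" -> 1, "c" -> 1),
      Seq("b" -> 1, "a" -> 1)
    )
    shuffle(input, 2).zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: $part")
    }
  }
}
```

Every record with the same key ends up in the same output partition, regardless of which input partition it started in. In a real cluster those output partitions live on different executors, and moving records between them is where the disk and network cost comes from.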
Spark shuffle is an expensive operation because it involves:
- Disk I/O
- Network I/O
- Data serialization and deserialization
When Spark creates an RDD or DataFrame, records with the same key are not necessarily stored in the same partition, because at creation time Spark has no key by which to partition the data set.
Hence, when we run reduceByKey() to aggregate the data by key, Spark does the following:
- Spark first runs map tasks on all partitions. For reduceByKey(), each map task combines the values for each key within its own partition.
- The results of the map tasks are buffered in memory.
- When the results do not fit in memory, Spark spills the data to disk.
- Spark shuffles the map output across partitions so that all records for a key end up in the same partition. The shuffle output is also kept on disk, so Spark can reuse it instead of recomputing it if a later stage needs it again.
- JVM garbage collection runs to reclaim the intermediate objects.
- Finally, Spark runs reduce tasks on each partition, merging the values for each key.
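The steps above can be sketched in plain Scala as follows. This is a hypothetical in-memory simulation, not Spark's implementation. It models the map-side combine, the routing of combined records to target partitions, and the final reduce. It leaves out memory buffering, disk spills, serialization, and network transfer. The names ReduceByKeySketch, combine, and partitionFor are illustrative only.

```scala
// Plain-Scala simulation of the reduceByKey() steps; not Spark's actual code.
object ReduceByKeySketch {

  // Non-negative hash routing, as in the RDD HashPartitioner idea
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Merge all values per key with f (used on both the map and reduce side)
  private def combine[K, V](records: Seq[(K, V)], f: (V, V) => V): Map[K, V] =
    records.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(f(_, v)).getOrElse(v))
    }

  def reduceByKey[K, V](input: Seq[Seq[(K, V)]], numPartitions: Int)(f: (V, V) => V): Vector[Map[K, V]] = {
    // 1. Map side: combine values per key inside each input partition
    val mapOutputs = input.map(part => combine(part, f))
    // 2. Shuffle: route each combined (key, value) to its target partition
    val shuffled = mapOutputs.flatMap(_.toSeq).groupBy { case (k, _) => partitionFor(k, numPartitions) }
    // 3. Reduce side: merge the partial results for each key
    Vector.tabulate(numPartitions)(p => combine(shuffled.getOrElse(p, Seq.empty[(K, V)]), f))
  }

  def main(args: Array[String]): Unit = {
    val words = Seq(
      Seq("spark" -> 1, "shuffle" -> 1, "spark" -> 1),
      Seq("shuffle" -> 1, "spark" -> 1)
    )
    reduceByKey(words, 2)(_ + _).zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: $part")
    }
  }
}
```

In this example, the first input partition sends ("spark", 2) through the simulated shuffle instead of two separate ("spark", 1) records. That reduction in records is the point of the map-side combine.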
Several Spark RDD operations, such as reduceByKey() and join(), trigger a shuffle and repartition the data. The following example uses reduceByKey():
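import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
val spark:SparkSession = SparkSession.builder()
  .master("local")
  .appName("Spark")
  .getOrCreate()
val sc = spark.sparkContext
// Read a text file into an RDD of lines
val rdd:RDD[String] = sc.textFile("src/main/resources/test.txt")
println(rdd.getNumPartitions)
// Split lines into words and pair each word with a count of 1
val rdd2 = rdd.flatMap(f=>f.split(" "))
  .map(m=>(m,1))
// reduceByKey() transformation: triggers a shuffle
val rdd5 = rdd2.reduceByKey(_ + _)
println(rdd5.getNumPartitions)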
Both getNumPartitions calls in the example above return the same number. Although reduceByKey() triggers a shuffle, by default its result has the same number of partitions as the parent RDD.
Unlike the RDD API, the Spark SQL DataFrame API sets the number of partitions to a configured value when an operation shuffles data, which in the following example increases it.
import org.apache.spark.sql.SparkSession
val spark:SparkSession = SparkSession.builder()
  .master("local")
  .appName("Spark")
  .getOrCreate()
import spark.implicits._
val simpleData = Seq(("James","Sales","NY",90000,34,10000),
  ("Michael","Sales","NY",86000,56,20000),
  ("Robert","Sales","CA",81000,30,23000),
  ("Maria","Finance","CA",90000,24,23000),
  ("Raman","Finance","CA",99000,40,24000),
  ("Scott","Finance","NY",83000,36,19000),
  ("Jen","Finance","NY",79000,53,15000),
  ("Jeff","Marketing","CA",80000,25,18000),
  ("Kumar","Marketing","NY",91000,50,21000)
)
val df = simpleData.toDF("employee_name","department","state","salary","age","bonus")
// groupBy() shuffles the rows so that all rows for a state land in the same partition
val df2 = df.groupBy("state").count()
println(df2.rdd.getNumPartitions)
This outputs 200 as the partition count. When a DataFrame operation shuffles data, Spark takes the number of resulting partitions from the Spark SQL configuration spark.sql.shuffle.partitions, which defaults to 200. (On Spark 3.2 and later, Adaptive Query Execution is enabled by default and may coalesce small shuffle partitions at runtime, so you can see a smaller number.)
You can change this default shuffle partition value through the conf object of the SparkSession (spark.conf).
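A minimal sketch, assuming a Spark 2.x or 3.x dependency on the classpath. The values 100 and 50 are arbitrary examples, not recommendations. The sketch sets the property when building the session and then changes it at runtime through spark.conf. The setting applies only to DataFrame and Spark SQL shuffles, not to RDD operations such as reduceByKey().

```scala
import org.apache.spark.sql.SparkSession

object ShufflePartitionsConfig {
  def main(args: Array[String]): Unit = {
    // Option 1: set the value when building the session (100 is an arbitrary example)
    val spark = SparkSession.builder()
      .master("local")
      .appName("Spark")
      .config("spark.sql.shuffle.partitions", "100")
      .getOrCreate()

    // Option 2: change it at runtime; affects DataFrame/SQL queries run afterwards
    spark.conf.set("spark.sql.shuffle.partitions", "50")
    println(spark.conf.get("spark.sql.shuffle.partitions")) // prints 50

    spark.stop()
  }
}
```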
Shuffle partition size
Depending on your dataset size, number of cores, and memory, the shuffle partition count can help or hurt your jobs. When you are dealing with a small amount of data, you should typically reduce the shuffle partitions. Otherwise you end up with many partition files that each hold only a few records, which means running many tasks that each have very little data to process.
On the other hand, a large amount of data spread over too few partitions results in fewer, longer-running tasks, and sometimes in out-of-memory errors.
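A quick back-of-the-envelope calculation shows both extremes. The sketch below assumes the data is spread evenly across partitions, with no skew. The 128 MB target passed to the hypothetical partitionsFor helper is an illustrative starting point for experiments, not a Spark default or an official recommendation.

```scala
// Back-of-the-envelope arithmetic for shuffle partition sizing (no Spark needed).
object PartitionSizing {
  // Average bytes per shuffle partition, assuming an even spread (no skew)
  def avgPartitionBytes(totalBytes: Long, numPartitions: Int): Long =
    totalBytes / numPartitions

  // Hypothetical heuristic: enough partitions that each holds about targetBytes (ceiling division)
  def partitionsFor(totalBytes: Long, targetBytes: Long): Int =
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt

  def main(args: Array[String]): Unit = {
    val MB = 1024L * 1024
    val GB = 1024L * MB
    println(avgPartitionBytes(10 * MB, 200))   // 52428 bytes (about 51 KiB): many tiny tasks
    println(avgPartitionBytes(500 * GB, 200))  // 2684354560 bytes (2.5 GiB): few huge tasks
    println(partitionsFor(500 * GB, 128 * MB)) // 4000
  }
}
```

Treat the result only as a first value to refine with test runs, because real data is rarely spread evenly.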
Getting the shuffle partition count right is tricky and often takes several runs with different values to find a good number. It is one of the key properties to check when a Spark job has performance issues.