Spark repartition() vs coalesce() explained


In Spark or PySpark, repartition() is used to increase or decrease the number of RDD, DataFrame, or Dataset partitions, whereas coalesce() is used only to decrease the number of partitions, in a more efficient way.

In this post, we will learn what the Spark repartition() and coalesce() methods are and the difference between repartition and coalesce, with Scala examples.

Here, I will explain examples with RDD and DataFrame using Scala; the same approach can be used with PySpark (Spark with Python).

  1. Spark partitioning
    • Local
    • Cluster
    • Configuration
  2. RDD Partition
    • RDD repartition
    • RDD coalesce
  3. DataFrame Partition
    • DataFrame repartition
    • DataFrame coalesce

One important point to note: Spark repartition() and coalesce() are expensive operations because they shuffle data across partitions, so try to minimize repartitioning as much as possible.

1. How Spark Partitions data files

One of the main advantages of Spark is that it splits data into multiple partitions and executes operations on all partitions in parallel, which allows jobs to complete faster. While working with partitioned data, we often need to increase or decrease the number of partitions based on the data distribution. The repartition() and coalesce() methods help us do this.

When not specified programmatically or through configuration, Spark partitions data based on a number of factors, and these factors differ depending on where you run your job and in which mode.

1.1 Local mode

When running in local mode, Spark partitions data into the number of CPU cores available on your system, or the value you specify at the time of creating the SparkSession object.
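
As a minimal sketch (the application name is illustrative), creating a SparkSession that runs locally with 5 partitions looks like this:

  import org.apache.spark.sql.SparkSession

  // Run locally, using 5 cores/partitions regardless of how many cores the machine actually has
  val spark: SparkSession = SparkSession.builder()
    .master("local[5]")
    .appName("repartition-coalesce-example")   // illustrative app name
    .getOrCreate()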

The above example provides local[5] as an argument to the master() method, meaning the job runs locally with 5 partitions. Even if you have just 2 cores on your system, it still creates 5 partition tasks.

1.2 HDFS cluster mode

When you run Spark jobs on a Hadoop cluster, the default number of partitions is based on the following.

  • On the HDFS cluster, by default, Spark creates one Partition for each block of the file.
  • In Hadoop version 1 the HDFS block size is 64 MB, and in Hadoop version 2 it is 128 MB.
  • The total number of cores on all executor nodes in the cluster, or 2, whichever is larger.

For example, if you have a 640 MB file and run it on Hadoop version 2 (128 MB block size), Spark creates 5 partitions, each corresponding to one 128 MB block (5 blocks * 128 MB = 640 MB). If you repartition to 10, it creates 2 partitions for each block.
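
As a hedged illustration (the HDFS path is hypothetical and assumes the SparkSession created above), reading such a file and checking the partition count could look like this:

  // Hypothetical 640 MB file on HDFS; with 128 MB blocks this typically yields 5 partitions
  val rddFromHdfs = spark.sparkContext.textFile("hdfs:///user/hadoop/data/sample-640mb.txt")
  println("Partitions : " + rddFromHdfs.getNumPartitions)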

1.3 Spark configuration

  • The spark.default.parallelism configuration value defaults to the total number of cores on all executor nodes in a cluster; in local mode it is set to the number of cores on your system.
  • The spark.sql.shuffle.partitions configuration value defaults to 200 and is used when you call shuffle operations like reduceByKey(), groupByKey(), join() and many more. This property applies only to the DataFrame API, not to RDDs.

You can change the values of these properties programmatically, as shown below.
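
A minimal sketch with illustrative values; note that spark.default.parallelism has to be set before the SparkContext is created, while spark.sql.shuffle.partitions can also be changed at runtime:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[5]")
    .appName("repartition-coalesce-example")          // illustrative app name
    .config("spark.default.parallelism", "100")       // must be set before the context is created
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()

  // spark.sql.shuffle.partitions can also be changed later at runtime
  spark.conf.set("spark.sql.shuffle.partitions", "200")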

You can also set these configurations when submitting the job with the spark-submit command.
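
For example (the class name and application jar are placeholders), the same properties can be passed with --conf flags:

  spark-submit \
    --conf spark.default.parallelism=100 \
    --conf spark.sql.shuffle.partitions=100 \
    --class com.example.YourApp \
    your-application.jar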

2. RDD Partition and repartition

For RDDs, you can specify the number of partitions at the time an RDD is created, using parallelize(), textFile() and wholeTextFiles().
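
A minimal sketch, assuming the local[5] SparkSession created in Section 1.1 (the text file path and output path are illustrative):

  // Partition count comes from local[5] when not specified
  val rdd = spark.sparkContext.parallelize(Range(0, 20))
  println("From local[5] : " + rdd.partitions.size)            // 5

  // Explicitly ask for 6 partitions
  val rdd1 = spark.sparkContext.parallelize(Range(0, 20), 6)
  println("parallelize : " + rdd1.partitions.size)             // 6
  rdd1.saveAsTextFile("/tmp/partition")                        // one part file per partition

  // textFile() takes a minimum number of partitions as the second argument
  val rddFromFile = spark.sparkContext.textFile("src/main/resources/test.txt", 10)
  println("textFile : " + rddFromFile.partitions.size)         // at least 10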

spark.sparkContext.parallelize(Range(0,20),6) distributes the RDD into 6 partitions, and saveAsTextFile() writes one part file per partition, which shows how the data is distributed across them.

2.1 RDD repartition()

The Spark RDD repartition() method is used to increase or decrease the number of partitions. The below example decreases the partitions from 10 to 4 by moving data from all partitions.
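
A hedged sketch, continuing the example above (the output path is illustrative):

  // Start with 10 partitions, then repartition down to 4 with a full shuffle
  val rdd2 = spark.sparkContext.parallelize(Range(0, 25), 10)
  val rdd3 = rdd2.repartition(4)
  println("Repartition size : " + rdd3.partitions.size)   // Repartition size : 4
  rdd3.saveAsTextFile("/tmp/re-partition")                 // one part file per partition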

This prints Repartition size : 4, and repartition() redistributes the data from all partitions, which is a full shuffle and a very expensive operation when dealing with billions or trillions of rows.

2.2 RDD coalesce()

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition(), where the movement of data across partitions is lower.
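
A hedged sketch, applying coalesce() to the 6-partition rdd1 from the earlier example (the output path is illustrative):

  // Merge 6 partitions down to 4 without a full shuffle
  val rdd4 = rdd1.coalesce(4)
  println("Coalesce size : " + rdd4.partitions.size)   // 4
  rdd4.saveAsTextFile("/tmp/coalesce")                 // one part file per partition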

If you compare the part files written by coalesce() with the 6-partition output from the earlier RDD example, you will notice that the data of partition 3 has been moved into partition 2 and partition 6 into partition 5, resulting in data movement from just 2 partitions.

2.3 Complete Example of Spark RDD repartition and coalesce

Below is a complete example of Spark RDD repartition and coalesce in Scala.
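
A complete, self-contained sketch of the above (the object name, app name and output paths are illustrative):

  import org.apache.spark.sql.SparkSession

  object RDDRepartitionExample extends App {

    val spark: SparkSession = SparkSession.builder()
      .master("local[5]")
      .appName("repartition-coalesce-example")
      .getOrCreate()

    // RDD created with an explicit number of partitions
    val rdd1 = spark.sparkContext.parallelize(Range(0, 25), 6)
    println("parallelize : " + rdd1.partitions.size)
    rdd1.saveAsTextFile("/tmp/partition")

    // repartition() performs a full shuffle across all partitions
    val rdd2 = rdd1.repartition(4)
    println("Repartition size : " + rdd2.partitions.size)
    rdd2.saveAsTextFile("/tmp/re-partition")

    // coalesce() merges partitions and avoids a full shuffle
    val rdd3 = rdd1.coalesce(4)
    println("Coalesce size : " + rdd3.partitions.size)
    rdd3.saveAsTextFile("/tmp/coalesce")
  }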

3. DataFrame Partition and repartition

Unlike RDDs, you can't specify the number of partitions while creating a DataFrame. A DataFrame or Dataset by default uses the rules described in Section 1 to determine the number of partitions and splits the data accordingly for parallelism.
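
A minimal sketch, assuming the local[5] SparkSession from Section 1.1 (the output path is illustrative):

  // spark.range() creates a Dataset of Longs; the partition count follows the rules in Section 1
  val df = spark.range(0, 20)
  println("Default partitions : " + df.rdd.getNumPartitions)   // 5, from local[5]
  df.write.mode("overwrite").csv("/tmp/df-partition")          // one part file per partition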

The above example creates 5 partitions, as specified in master("local[5]"), and the data is distributed across all 5 partitions.

3.1 DataFrame repartition()

Similar to RDDs, the Spark DataFrame repartition() method is used to increase or decrease the number of partitions. The below example increases the partitions from 5 to 6 by moving data from all partitions.
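
A hedged sketch, continuing from the DataFrame above (the output path is illustrative):

  // Increase from 5 to 6 partitions; this triggers a full shuffle
  val df2 = df.repartition(6)
  println("DataFrame repartition : " + df2.rdd.getNumPartitions)   // 6
  df2.write.mode("overwrite").csv("/tmp/df-re-partition")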

Adding just 1 partition results in data movement from all partitions.

Even decreasing the partitions with repartition() results in moving data from all partitions; hence, when you want to decrease the number of partitions, the recommendation is to use coalesce().

3.2 DataFrame coalesce()

Spark DataFrame coalesce() is used only to decrease the number of partitions. It is an optimized version of repartition(), where the movement of data across partitions is lower.
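
A hedged sketch, again continuing from the 5-partition DataFrame above (the output path is illustrative):

  // Reduce from 5 to 2 partitions without a full shuffle
  val df3 = df.coalesce(2)
  println("DataFrame coalesce : " + df3.rdd.getNumPartitions)   // 2
  df3.write.mode("overwrite").csv("/tmp/df-coalesce")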

This yields 2 as the number of partitions.

Since we are reducing from 5 to 2 partitions, data moves out of only 3 partitions and into the remaining 2 partitions.
