The Programmers Book

Convert Spark RDD to DataFrame | Dataset

While working in Apache Spark with Scala, we often need to convert RDD to DataFrame and Dataset as these provide more advantages over RDD. For instance, DataFrame is a distributed collection of data organised into named columns similar to Database tables and provides optimisation and performance improvement.

  1. Create Spark RDD
  2. Convert Spark RDD to DataFrame
    • using toDF()
    • using createDataFrame()
    • using RDD row type & schema
  3. Convert Spark RDD to Dataset

Create Spark RDD

Now, let’s create an RDD by passing Seq object to sparkContext.parallelize() function.

import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)

Convert Spark RDD to DataFrame

Converting Spark RDD to DataFrame can be done using toDF(), createDataFrame() and transforming rdd[Row] to the data frame.

Using rdd.toDF() function

Spark provides an implicit function toDF() which would be used to convert RDD, Seq[T], List[T] to DataFrame. In order to use toDF() function, we should import implicits first using import spark.implicits._.

val dfFromRDD1 = rdd.toDF()

By default, toDF() function creates column names as “_1” and “_2” like Tuples. Outputs below schema.


 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)

toDF() has another signature that takes arguments to define column names as shown below.

val dfFromRDD1 = rdd.toDF("language","users_count")


Outputs below schema.

 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)

By default, the datatype of these columns infers to the type of data and set’s nullable to true. We can change this behavior by supplying schema using StructType – where we can specify a column name, data type and nullable for each field/column.

Using spark createDataFrame() function

SparkSession class provides createDataFrame() method to create DataFrame and it takes rdd object as an argument. and chain it with toDF() to specify names to the columns.

val columns = Seq("language","users_count")
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)

Here, we are using scala operator :_* to explode columns array to comma-separated values.

Using RDD Row type RDD[Row] to DataFrame

Spark createDataFrame() has another signature which takes the RDD[Row] type and schema for column names as arguments. To use this first, we need to convert our “rdd” object from RDD[T] to RDD[Row]. To define a schema, we use StructType that takes an array of StructField. And StructField takes column name, data type and nullable/not as arguments.

    //From RDD (USING createDataFrame and Adding schema using StructType)
    val schema = StructType(columns
      .map(fieldName => StructField(fieldName, StringType, nullable = true)))
    //convert RDD[T] to RDD[Row]
    val rowRDD = => Row(attributes._1, attributes._2))
    val dfFromRDD3 = spark.createDataFrame(rowRDD,schema)

This creates a data frame from RDD and assigns column names using schema.

Convert Spark RDD to Dataset

The DataFrame API is radically different from the RDD API because it is an API for building a relational query plan that Spark’s Catalyst optimizer can then execute.

The Dataset API aims to provide the best of both worlds: the familiar object-oriented programming style and compile-time type-safety of the RDD API but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.

DataFrame is an alias to Dataset[Row]. As we mentioned before, Datasets are optimized for typed engineering tasks, for which you want types checking and object-oriented programming interface, while DataFrames are faster for interactive analytics and close to SQL style.

About data serializing. The Dataset API has the concept of encoders which translate between JVM representations (objects) and Spark’s internal binary format. Spark has built-in encoders that are very advanced in that they generate byte code to interact with off-heap data and provide on-demand access to individual attributes without having to de-serialize an entire object.

val ds = spark.createDataset(rdd)

Have any Question or Comment?

Leave a Reply

Your email address will not be published. Required fields are marked *