Of all the developer delights, none is more appealing than a set of APIs that makes developers productive, that is easy to use, and that is intuitive and expressive. One of the attractions of Apache Spark for developers has been its easy-to-use APIs for operating on large datasets, across languages: Scala, Java, Python, and R.
In this blog, I explore three sets of APIs (RDDs, DataFrames, and Datasets) available in Apache Spark 2.2 and later; outline why and when you should use each set; describe their performance and optimization benefits; and list scenarios in which DataFrames and Datasets should be used instead of RDDs. I will focus mostly on DataFrames and Datasets, because in Apache Spark 2.0 these two APIs were unified.
Our main motivation behind this unification is our quest to simplify Spark by limiting the number of concepts you have to learn and by offering ways to process structured data. Through structure, Spark can offer higher-level abstractions and APIs as domain-specific language constructs.
Resilient Distributed Dataset (RDD)
RDD has been the primary user-facing API in Spark since its inception. At its core, an RDD is an immutable distributed collection of elements of your data, partitioned across nodes in your cluster, that can be operated on in parallel with a low-level API offering transformations and actions.
When to use RDDs?
Consider these common scenarios or use cases for using RDDs when:
- you want low-level transformations, actions, and control over your dataset;
- your data is unstructured, such as media streams or streams of text;
- you want to manipulate your data with functional programming constructs rather than domain-specific expressions;
- you don't care about imposing a schema, such as a columnar format, while processing or accessing data attributes by name or column; and
- you can forgo some of the optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data.
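For instance, here is a minimal sketch of that kind of low-level control, counting words in an unstructured text file with RDD transformations and actions (the input path is a hypothetical placeholder):

```scala
// Minimal sketch: low-level RDD transformations and actions on unstructured text.
// Assumes an existing SparkSession `spark`; the input path is hypothetical.
val lines = spark.sparkContext.textFile("/path/to/streams-of-text.txt")  // RDD[String]
val counts = lines
  .flatMap(_.split("\\s+"))          // transformation: split lines into words
  .map(word => (word, 1))            // transformation: pair each word with a count of 1
  .reduceByKey(_ + _)                // transformation: sum the counts per word
counts.take(10).foreach(println)     // action: materialize the first 10 results
```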
What about RDDs in Apache Spark 2.0?
You may ask: Are RDDs being relegated as second-class citizens?
The answer is a resounding NO!
What's more, as you'll notice below, you can seamlessly move between DataFrames or Datasets and RDDs at will, using simple API method calls, and DataFrames and Datasets are built on top of RDDs.
DataFrames
Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, the data is organized into named columns, like a table in a relational database. Designed to make processing large datasets even easier, DataFrames allow developers to impose a structure onto a distributed collection of data, allowing higher-level abstraction; they provide a domain-specific language API for manipulating distributed data; and they make Spark accessible to a wider audience, beyond specialized data engineers.
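As a hedged illustration of that table-like, named-column view (the file path and column names below are assumptions for the sake of example):

```scala
// Sketch: a DataFrame organizes distributed data into named columns,
// so it can be queried much like a relational table.
// Assumes a SparkSession `spark`; the path and columns are illustrative.
import spark.implicits._

val df = spark.read.json("/path/to/devices.json")
df.printSchema()                     // inferred schema with named columns
df.select("device_name", "temp")
  .where($"temp" > 25)
  .groupBy("device_name")
  .count()
  .show()
```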
In our Apache Spark 2.0 webinar preview and the subsequent blog, we mentioned that in Spark 2.0 the DataFrame APIs would be merged with the Dataset APIs, unifying data processing capabilities across libraries. Because of this unification, developers now have fewer concepts to learn or remember, and they work with a single, high-level, type-safe API called Dataset.
<img src="https://www.databricks.com/wp-content/uploads/2016/06/Unified-Apache-Spark-2.0-API-1.png" alt="Unified
Dataset API Diagram in Apache Spark 2.0″ />
Datasets
Starting with Spark 2.0, Dataset takes on two distinct API characteristics: a strongly typed API and an untyped API, as shown in the table below. Conceptually, consider a DataFrame as an alias for a collection of generic objects, Dataset[Row], where Row is a generic untyped JVM object. A Dataset, by contrast, is a collection of strongly typed JVM objects, dictated by a case class you define in Scala or a class in Java.
Typed and untyped APIs

| Language | Main abstraction |
| --- | --- |
| Scala | Dataset[T] & DataFrame (alias for Dataset[Row]) |
| Java | Dataset[T] |
| Python* | DataFrame |
| R* | DataFrame |
Note: Since Python and R have no compile-time type safety, we only have untyped APIs, namely DataFrames.
Benefits of Dataset APIs
As a Spark developer, you benefit from the unified DataFrame and Dataset APIs in Spark 2.0 in several ways.
1. Static typing and runtime type safety
Consider static typing and runtime safety as a spectrum, with SQL being the least restrictive and Dataset the most restrictive. For instance, in your Spark SQL string queries, you won't know about a syntax error until runtime (which could be costly), whereas in DataFrames and Datasets you can catch errors at compile time (which saves developer time and cost). That is, if you invoke a function in the DataFrame API that is not part of the API, the compiler will catch it. However, it will not detect a non-existent column name until runtime.
At the other end of the spectrum is Dataset, the most restrictive. Because all Dataset APIs are expressed as lambda functions over JVM typed objects, any mismatch of typed parameters will be detected at compile time. Analysis errors can also be caught at compile time when using Datasets, which saves developer time and cost.
All of this translates into a spectrum of type safety along the syntax and analysis error axes in your Spark code, with Datasets being the most restrictive yet the most productive for a developer.
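To make that spectrum concrete, here is a hedged, illustrative sketch, assuming a DataFrame df and a typed Dataset ds of the DeviceIoTData case class introduced later in this post:

```scala
// Illustrative sketch of where each kind of error surfaces.
// Assumes `df` is a DataFrame and `ds` is a Dataset[DeviceIoTData].
import spark.implicits._

// SQL string: a syntax typo only fails at runtime, when the query is parsed.
// spark.sql("selct device_name from iot")   // runtime parse error

// DataFrame: a misspelled method does not compile, but a bad column name
// only fails at runtime, during analysis.
// df.selct("device_name")                   // compile-time error
// df.select("device_nme")                   // runtime analysis error

// Dataset: lambdas operate on typed JVM objects, so a bad field name
// is caught by the compiler.
// ds.map(d => d.device_nme)                 // compile-time error
ds.map(d => d.device_name)                   // type-checked at compile time
```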
<img src="https://www.databricks.com/wp-content/uploads/2016/07/sql-vs-dataframes-vs-datasets-type-safety-spectrum.png" alt="Type security spectrum between SQL
, DataFrames, and DataSets” />2. High-level abstraction and custom view in
DataFrames
of structured and semi-structured data
as a collection of DataSets[Row] represent a structured custom view in your semi-structured data. For example, suppose you have a huge dataset of IoT device events, expressed as JSON. Since JSON is a semi-structured format, it lends itself well to employing Dataset as a strongly type-specific Dataset collection [DeviceIoTData].
{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, "lcd": "red", "timestamp": 1458081226051}
You can express each JSON entry as DeviceIoTData, a custom object, with a Scala case class.
case class DeviceIoTData(battery_level: Long, c02_level: Long, cca2: String, cca3: String, cn: String, device_id: Long, device_name: String, humidity: Long, ip: String, latitude: Double, lcd: String, longitude: Double, scale: String, temp: Long, timestamp: Long)
We can then read the data from a JSON file:

// read the JSON file and create the Dataset from the case class DeviceIoTData
// ds is now a collection of JVM Scala objects of type DeviceIoTData
val ds = spark.read.json("/databricks-public-datasets/data/iot/iot_devices.json").as[DeviceIoTData]
Three things happen under the hood in the code above:
- Spark reads the JSON, infers the schema, and creates a collection of DataFrames.
- At this point, Spark converts your data into DataFrame = Dataset[Row], a collection of generic Row objects, since it does not yet know the exact type.
- Spark then converts Dataset[Row] -> Dataset[DeviceIoTData], type-specific Scala JVM objects, as dictated by the case class DeviceIoTData.
Most of us who work with structured data are accustomed to viewing and processing data in a columnar manner or accessing specific attributes within an object. With Dataset as a collection of Dataset[ElementType] typed objects, you seamlessly get both compile-time safety and a custom view of strongly typed JVM objects. And the resulting strongly typed Dataset[T] from the code above can be easily displayed or processed with high-level methods.
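As a small sketch of what that looks like in practice (the printed fields are just examples):

```scala
// Sketch: the typed Dataset can be displayed with high-level methods,
// and fields are accessed as ordinary case-class members, checked at compile time.
ds.show(5)                                       // columnar, tabular view
val first: DeviceIoTData = ds.first()
println(s"${first.device_name} in ${first.cn} reports ${first.temp} ${first.scale}")
```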
3. Ease of use of structured APIs
Although structure may limit control over what your Spark program can do with data, it introduces rich semantics and an easy set of domain-specific operations that can be expressed as high-level constructs. Most computations, however, can be accomplished with Dataset's high-level APIs. For example, it is much easier to perform agg, select, sum, avg, map, filter, or groupBy operations by accessing a Dataset of typed DeviceIoTData objects than by using the data fields of RDD rows.
Expressing your computation in a domain-specific API is far simpler and easier than with relational-algebra-type expressions (in RDDs). For instance, the code below uses filter() and map() to create another immutable Dataset.
// Use filter(), map(), groupBy() country, and calculate avg()
// for temperatures and humidity. This operation results in
// another immutable Dataset. The query is easier to read,
// and expressive
val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()
// display the resulting dataset
display(dsAvgTmp)
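For contrast, here is a rough sketch, written as an assumption for comparison, of how the same filter-and-average-by-country computation might look against an RDD of DeviceIoTData objects:

```scala
// Rough comparison sketch: the same filter + average-by-country computation
// expressed against RDD[DeviceIoTData], using manual pairing and reduction.
val avgByCountryRDD = ds.rdd
  .filter(d => d.temp > 25)
  .map(d => (d.cca3, (d.temp.toDouble, d.humidity.toDouble, 1L)))
  .reduceByKey { case ((t1, h1, n1), (t2, h2, n2)) => (t1 + t2, h1 + h2, n1 + n2) }
  .mapValues { case (t, h, n) => (t / n, h / n) }   // (avg temp, avg humidity)
avgByCountryRDD.take(10).foreach(println)
```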
4. Performance and optimization
Along with all the above benefits, you cannot overlook the space efficiency and performance gains of using the DataFrame and Dataset APIs, for two reasons.
First, because the DataFrame and Dataset APIs are built on top of the Spark SQL engine, they use Catalyst to generate an optimized logical and physical query plan. Across the R, Java, Scala, and Python DataFrame/Dataset APIs, all relational-type queries undergo the same code optimizer, providing space and speed efficiency. While the typed Dataset[T] API is optimized for data engineering tasks, the untyped Dataset[Row] (an alias for DataFrame) is even faster and suitable for interactive analysis.
Second, since Spark as a compiler understands your Dataset type JVM object, it maps your type-specific JVM object to Tungsten's internal memory representation using encoders. As a result, Tungsten encoders can efficiently serialize/deserialize JVM objects, as well as generate compact bytecode that can execute at superior speeds.
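As a minimal sketch, assuming the DeviceIoTData case class defined earlier: the encoder for a case class is normally supplied implicitly via spark.implicits._, but you can also reference one explicitly.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._

// Encoders map case-class fields to Tungsten's compact internal format,
// avoiding generic serialization of whole JVM objects.
val deviceEncoder: Encoder[DeviceIoTData] = Encoders.product[DeviceIoTData]

// The same kind of product encoder backs the .as[DeviceIoTData] conversion
// used earlier to turn the raw JSON DataFrame into a typed Dataset.
val typedDS = spark.read
  .json("/databricks-public-datasets/data/iot/iot_devices.json")
  .as[DeviceIoTData]
```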
When should I use DataFrames or Datasets?
- If you want rich semantics, high-level abstractions, and domain-specific APIs, use DataFrame or Dataset.
- If processing requires high-level expressions, filters, maps, aggregation, averages, sum, SQL queries, column access, and lambda function usage on semi-structured data, use DataFrame or Dataset.
- If you want a higher degree of compile-time type security, want typed JVM objects, take advantage of Catalyst optimization, and benefit from Tungsten’s efficient code generation, use Dataset.
- If you want to unify and simplify APIs across Spark libraries, use DataFrame or Dataset.
- If you are an R user, use DataFrames.
- If you’re a Python user, use DataFrames and turn to RDD if you need more control.
Note that you can always seamlessly interoperate or convert from a DataFrame and/or Dataset to an RDD, using a simple method call, .rdd. For example:

// select specific fields from the Dataset, apply a predicate
// using the where() method, convert to an RDD, and show the first 10
// RDD rows
val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300)
// convert to RDDs and take the first 10 rows
val eventsRDD = deviceEventsDS.rdd.take(10)
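And the conversion works in the other direction too; here is a small sketch, assuming the implicits from spark.implicits._ are in scope:

```scala
// Sketch: going back from an RDD to a typed Dataset or a DataFrame.
// Requires the implicit encoders from spark.implicits._.
import spark.implicits._

val deviceRDD = ds.rdd            // Dataset[DeviceIoTData] -> RDD[DeviceIoTData]
val backToDS  = deviceRDD.toDS()  // RDD[DeviceIoTData] -> Dataset[DeviceIoTData]
val backToDF  = deviceRDD.toDF()  // RDD[DeviceIoTData] -> DataFrame
```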
Bringing it all together
In short, the choice of when to use an RDD or a DataFrame and/or Dataset seems obvious. While the former gives you low-level functionality and control, the latter allow for custom views and structure, offer high-level and domain-specific operations, save space, and execute at superior speeds.
As we examined the lessons we learned from early versions of Spark (how to simplify Spark for developers, how to optimize it, and make it efficient), we decided to elevate low-level RDD APIs to a high-level abstraction like DataFrame and Dataset and build this unified data abstraction across libraries on top of Catalyst optimizer and Tungsten.
Whichever you choose, DataFrames and/or Datasets or the RDD APIs, based on your needs and use cases, I would not be surprised if you fall into the camp of most developers who work with structured and semi-structured data.
What’s next?
You can try Apache Spark 2.2 on Databricks.
You can also watch the Spark Summit presentation on A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets.
If you haven't signed up yet, try Databricks now.
In the coming weeks, we’ll have a series of blogs about Structured Streaming. Stay tuned.