RDD Programming Guide – Spark 3.4.0 Documentation

  • Overview
  • Linking with Spark
  • Initializing Spark
    • Using the Shell
  • Resilient Distributed Datasets (RDDs)
    • Parallelized Collections
    • External Datasets
    • RDD Operations
      • Basics
      • Passing Functions to Spark
      • Understanding closures
        • Example
        • Local vs. cluster modes
        • Printing elements of an RDD
      • Working with Key-Value Pairs
      • Transformations
      • Actions
      • Shuffle operations
        • Background
        • Performance Impact
    • RDD Persistence
      • Which Storage Level to Choose?
      • Removing Data
  • Shared Variables
    • Broadcast Variables
    • Accumulators
  • Deploying to a Cluster
  • Launching Spark jobs from Java / Scala
  • Unit Testing
  • Where to Go from Here

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

This guide shows each of these features in each of the languages Spark supports. It is easiest to follow along if you launch Spark’s interactive shell, either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

Initializing Spark

The appName parameter is a name for your application to show in the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.

Using the Shell

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

One important parameter for parallelized collections is the number of partitions to cut the dataset into. Spark will run one task for each partition. Typically you want 2 to 4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (for example, sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
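The effect of the partition count can be sketched in plain Python, without Spark. The helper slice_collection below is hypothetical, and the contiguous, nearly equal slicing it uses is an assumption made for illustration rather than a statement about how parallelize cuts a collection.

```python
def slice_collection(data, num_slices):
    """Cut a list into num_slices contiguous, nearly equal partitions."""
    n = len(data)
    return [
        data[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]


data = list(range(1, 11))
partitions = slice_collection(data, 3)
print(partitions)                 # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# One task per partition: here each "task" just sums its own slice.
task_results = [sum(p) for p in partitions]
print(task_results)               # [6, 15, 34]
print(sum(task_results))          # 55
```

More partitions mean more, smaller tasks; fewer partitions mean fewer, larger ones.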

External Datasets

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (for example, a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, Spark can recognize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you can also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicating them across multiple nodes.
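The laziness and recomputation described above can be modeled in a few lines of plain Python. ToyRDD is a hypothetical class written for this sketch; it is not part of Spark and only imitates the behavior of recording transformations and replaying them when an action runs.

```python
from functools import reduce as fold


class ToyRDD:
    """Toy stand-in for an RDD: records transformations, computes only on an action."""

    def __init__(self, source, lineage=()):
        self._source = source      # base dataset (here: a plain list)
        self._lineage = lineage    # functions recorded by map()
        self.evaluations = 0       # element-level function calls performed so far

    def map(self, func):
        # Transformation: nothing is computed, the function is only remembered.
        return ToyRDD(self._source, self._lineage + (func,))

    def reduce(self, func):
        # Action: replay the lineage over the base data, then aggregate.
        def compute(x):
            for f in self._lineage:
                x = f(x)
                self.evaluations += 1
            return x
        return fold(func, (compute(x) for x in self._source))


lengths = ToyRDD(["a", "bb", "ccc"]).map(len)
evaluations_before_action = lengths.evaluations   # 0: map alone did no work
total = lengths.reduce(lambda a, b: a + b)
print(total)                                      # 6
print(lengths.evaluations)                        # 3
lengths.reduce(lambda a, b: a + b)                # no caching: lineage is replayed
print(lengths.evaluations)                        # 6
```

The second reduce repeats all the map work, which is the cost that persisting an RDD avoids.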

Basics

Passing Functions to Spark

Understanding closures

One of the hardest things about Spark is understanding the scope and lifecycle of variables and methods when running code in a cluster. RDD operations that modify variables outside their scope can be a frequent source of confusion. In the following example, we’ll look at the code that uses foreach() to increment a counter, but similar problems can also occur for other operations.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (for example, via spark-submit to YARN):

Local vs. cluster modes

The behavior of the preceding code is undefined and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies, and therefore when the counter is referenced within the foreach function, it is no longer the counter on the driver node. There is still a counter in the memory of the driver node, but it is no longer visible to the executors. The executors only see the copy from the serialized closure. Therefore, the final value of the counter will still be zero, since all operations on the counter referenced the value within the serialized closure.

In local mode, in some circumstances, the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.
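The copy semantics of the cluster case can be imitated without Spark. In this sketch, pickle stands in for closure serialization, and run_task and the two-way split of the data are hypothetical names and choices made for illustration.

```python
import pickle

counter = {"value": 0}            # lives in the "driver"
data = [1, 2, 3, 4]
partitions = [data[:2], data[2:]]


def run_task(serialized_closure, partition):
    # Each task deserializes its own copy of the captured state.
    local_counter = pickle.loads(serialized_closure)
    for x in partition:
        local_counter["value"] += x
    return local_counter["value"]


shipped = pickle.dumps(counter)   # the closure is serialized, then sent out
task_results = [run_task(shipped, p) for p in partitions]

print(task_results)               # [3, 7]: each copy was updated
print(counter["value"])           # 0: the driver's counter never changed
```

Every task increments its private copy, so the sum is never visible in the driver's variable.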

To ensure well-defined behavior in these sorts of scenarios, one should use an accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses them in more detail.

In general, closures (constructs such as loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that is just by accident, and such code will not behave as expected in distributed mode. Use an accumulator instead if some global aggregation is needed.

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is written to the executor’s stdout, not the one on the driver, so stdout on the driver will not show it. To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use take(): rdd.take(100).foreach(println).

Working with Key-Value Pairs

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details.

Transformation: Meaning

  • map(func): Returns a new distributed dataset formed by passing each element of the source through a function func.
  • filter(func): Returns a new dataset formed by selecting those elements of the source on which func returns true.
  • flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
  • mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
  • mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
  • sample(withReplacement, fraction, seed): Samples a fraction fraction of the data, with or without replacement, using a given random number generator seed.
  • union(otherDataset): Returns a new dataset that contains the union of the elements in the source dataset and the argument.
  • intersection(otherDataset): Returns a new RDD that contains the intersection of elements in the source dataset and the argument.
  • distinct([numPartitions]): Returns a new dataset that contains the distinct elements of the source dataset.
  • groupByKey([numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: if you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: by default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
  • reduceByKey(func, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. As in groupByKey, the number of reduce tasks is configurable through an optional second argument.
  • aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. As in groupByKey, the number of reduce tasks is configurable through an optional second argument.
  • sortByKey([ascending], [numPartitions]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
  • join(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
  • cogroup(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
  • cartesian(otherDataset): When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
  • pipe(command, [envVars]): Pipes each partition of the RDD through a shell command, for example a Perl or bash script. RDD elements are written to the process’s stdin, and lines output to its stdout are returned as an RDD of strings.
  • coalesce(numPartitions): Decreases the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
  • repartition(numPartitions): Reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
  • repartitionAndSortWithinPartitions(partitioner): Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
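The roles of zeroValue, seqOp and combOp in aggregateByKey are easier to see in a small model. The function aggregate_by_key below is a hypothetical plain-Python imitation over a list of partitions, not Spark's implementation; it computes a per-key average, where the aggregated type (sum, count) differs from the input value type.

```python
def aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of aggregateByKey over a list of partitions."""
    # Step 1: within each partition, fold values into a per-key accumulator.
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_op(acc.get(key, zero_value), value)
        partials.append(acc)
    # Step 2: merge the per-partition accumulators for each key.
    merged = {}
    for acc in partials:
        for key, partial in acc.items():
            merged[key] = comb_op(merged[key], partial) if key in merged else partial
    return merged


partitions = [[("a", 1), ("b", 4)], [("a", 3), ("a", 5)]]
sums_counts = aggregate_by_key(
    partitions,
    (0, 0),                                     # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),    # seqOp: fold a V into a U
    lambda x, y: (x[0] + y[0], x[1] + y[1]),    # combOp: merge two U values
)
averages = {k: s / c for k, (s, c) in sums_counts.items()}
print(averages)   # {'a': 3.0, 'b': 4.0}
```

Only the small (sum, count) pairs cross partition boundaries, which is why this style of aggregation tends to move less data than grouping all values first.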

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and the pair RDD functions doc (Scala, Java) for details.

Action: Meaning

  • reduce(func): Aggregates the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
  • collect(): Returns all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
  • count(): Returns the number of elements in the dataset.
  • first(): Returns the first element of the dataset (similar to take(1)).
  • take(n): Returns an array with the first n elements of the dataset.
  • takeSample(withReplacement, num, [seed]): Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
  • takeOrdered(n, [ordering]): Returns the first n elements of the RDD using either their natural order or a custom comparator.
  • saveAsTextFile(path): Writes the elements of the dataset as a text file (or set of text files) in a given directory in the local file system, HDFS, or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
  • saveAsSequenceFile(path) (Java and Scala): Writes the elements of the dataset as a Hadoop SequenceFile in a given path in the local file system, HDFS, or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types such as Int, Double, String, etc.).
  • saveAsObjectFile(path) (Java and Scala): Writes the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
  • countByKey(): Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
  • foreach(func): Runs a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator or interacting with external storage systems. Note: modifying variables other than accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
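Why reduce asks for a commutative and associative function can be checked with a small experiment. The helper parallel_reduce is a hypothetical model that reduces each partition separately and then reduces the partial results; it is not Spark code.

```python
from functools import reduce


def parallel_reduce(partitions, func):
    # Reduce each partition on its own, then reduce the partial results.
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


data = [1, 2, 3, 4, 5, 6]
two_way = [data[:3], data[3:]]
three_way = [data[:2], data[2:4], data[4:]]


def add(a, b):
    return a + b


def sub(a, b):
    return a - b


print(parallel_reduce(two_way, add), parallel_reduce(three_way, add))   # 21 21
print(parallel_reduce(two_way, sub), parallel_reduce(three_way, sub))   # 3 1
```

Addition gives the same answer however the data is partitioned. Subtraction is neither commutative nor associative, so its result depends on the partitioning.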

The Spark RDD API also exposes asynchronous versions of some actions, such as foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for redistributing data so that it is grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background

To understand what happens during the shuffle, we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple: the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.

In Spark, data is generally not distributed across partitions so as to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition. Thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key. This is called the shuffle.
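The all-to-all step can be sketched in plain Python. The function shuffle_reduce_by_key is a hypothetical toy: it routes records by hashing the key, which is an assumption made for illustration, and it keeps everything in memory.

```python
from collections import defaultdict


def shuffle_reduce_by_key(map_partitions, num_reducers, func):
    """Toy all-to-all shuffle followed by a per-key reduce."""
    # Map side: every input partition routes each record to the reduce
    # partition chosen by hashing its key.
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for part in map_partitions:
        for key, value in part:
            buckets[hash(key) % num_reducers][key].append(value)
    # Reduce side: each reduce task now holds all values for its keys.
    result = {}
    for bucket in buckets:
        for key, values in bucket.items():
            acc = values[0]
            for v in values[1:]:
                acc = func(acc, v)
            result[key] = acc
    return result


# The values for "a" start out on two different partitions.
map_partitions = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
totals = shuffle_reduce_by_key(map_partitions, 2, lambda x, y: x + y)
print(sorted(totals.items()))   # [('a', 4), ('b', 2), ('c', 4)]
```

Every reduce bucket may receive records from every input partition, which is what makes the operation all-to-all.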

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of the partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following a shuffle, then it is possible to use:

  • mapPartitions to sort each partition using, for example, .sorted
  • repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
  • sortBy to make a globally ordered RDD

Operations which can cause a shuffle include repartition operations such as repartition and coalesce, ‘ByKey operations (except for counting) such as groupByKey and reduceByKey, and join operations such as cogroup and join.

Performance Impact

The shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks: map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they no longer fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and ‘ByKey operations generate these on the reduce side. When data does not fit in memory, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

The shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so that the shuffle files do not need to be re-created if the lineage is recomputed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the “Shuffle Behavior” section within the Spark Configuration Guide.

RDD persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant: if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
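Caching on first use and recomputing only what was lost can be modeled with a small class. CachedPartitions and its lose_partition method are hypothetical names for this sketch; the model keeps one cached result per partition and rebuilds a missing one from the base data and the transformation.

```python
class CachedPartitions:
    """Toy model of a persisted dataset: one cached result per partition."""

    def __init__(self, base_partitions, func):
        self._base = base_partitions   # input partitions (the lineage source)
        self._func = func              # transformation that produced this dataset
        self._cache = {}               # partition index -> computed elements
        self.computations = 0          # partition-level computations so far

    def get(self, index):
        if index not in self._cache:   # first use, or the cached copy was lost
            self._cache[index] = [self._func(x) for x in self._base[index]]
            self.computations += 1
        return self._cache[index]

    def collect(self):
        return [x for i in range(len(self._base)) for x in self.get(i)]

    def lose_partition(self, index):
        self._cache.pop(index, None)   # simulate a node failure


squares = CachedPartitions([[1, 2], [3, 4]], lambda x: x * x)
squares.collect()                      # first action computes both partitions
squares.collect()                      # second action is served from the cache
computations_after_two_actions = squares.computations
print(computations_after_two_actions)  # 2
squares.lose_partition(1)
recovered = squares.collect()
print(recovered)                       # [1, 4, 9, 16]
print(squares.computations)            # 3: only the lost partition was recomputed
```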

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

Storage Level: Meaning

  • MEMORY_ONLY: Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they are needed. This is the default level.
  • MEMORY_AND_DISK: Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that do not fit on disk, and read them from there when they are needed.
  • MEMORY_ONLY_SER (Java and Scala): Store the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
  • MEMORY_AND_DISK_SER (Java and Scala): Similar to MEMORY_ONLY_SER, but spill partitions that do not fit in memory to disk instead of recomputing them on the fly each time they are needed.
  • DISK_ONLY: Store the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
  • OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. The available storage levels in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, DISK_ONLY_2, and DISK_ONLY_3.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend that users call persist on the resulting RDD if they plan to reuse it.

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

  • If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

  • Do not spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

  • Use the replicated storage levels if you want fast fault recovery (for example, if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method. Note that this method does not block by default. To block until resources are freed, specify blocking=true when calling this method.
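Least-recently-used eviction and manual removal can be illustrated with a small cache built on OrderedDict. LruBlockCache, its methods, and the block ids such as rdd_1_0 are hypothetical and chosen only for this sketch; capacity is counted in blocks rather than bytes to keep the model simple.

```python
from collections import OrderedDict


class LruBlockCache:
    """Toy cache that evicts the least-recently-used partition when full."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._blocks = OrderedDict()   # oldest entry first

    def get(self, block_id):
        if block_id in self._blocks:
            self._blocks.move_to_end(block_id)   # mark as recently used
            return self._blocks[block_id]
        return None

    def put(self, block_id, data):
        self._blocks[block_id] = data
        self._blocks.move_to_end(block_id)
        while len(self._blocks) > self.capacity:
            self._blocks.popitem(last=False)     # drop the least recently used

    def unpersist(self, prefix):
        # Manual removal: drop every block whose id starts with the prefix.
        for block_id in [b for b in self._blocks if b.startswith(prefix)]:
            del self._blocks[block_id]

    def block_ids(self):
        return list(self._blocks)


cache = LruBlockCache(capacity=2)
cache.put("rdd_1_0", [1, 2])
cache.put("rdd_1_1", [3, 4])
cache.get("rdd_1_0")              # touch block 0 so block 1 becomes the oldest
cache.put("rdd_2_0", [5, 6])      # over capacity: rdd_1_1 is evicted
after_eviction = cache.block_ids()
print(after_eviction)             # ['rdd_1_0', 'rdd_2_0']
cache.unpersist("rdd_1_")
after_unpersist = cache.block_ids()
print(after_unpersist)            # ['rdd_2_0']
```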

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcast this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a container around v, and its value can be accessed by calling the value method. The following code shows this:

After the broadcast variable is created, it must be used instead of the value v in any function executed in the cluster so that v is not sent to the nodes more than once. In addition, object v should not be modified after it is broadcast to ensure that all nodes get the same value from the broadcast variable (for example, if the variable is sent to a new node later).
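The hazard of modifying v after broadcasting it can be illustrated with a deliberately simplified model. ToyBroadcast and its value_on method are hypothetical and do not reflect how Spark implements broadcast internally; the sketch only assumes that a node copies the value the first time it needs it, so a node that arrives later can observe a later state of v.

```python
import pickle


class ToyBroadcast:
    """Toy wrapper: each executor caches its own copy on first access."""

    def __init__(self, value):
        self._driver_value = value     # the driver keeps a reference to v
        self._executor_copies = {}     # executor id -> cached local copy

    def value_on(self, executor_id):
        # An executor fetches and caches a copy the first time it needs one.
        if executor_id not in self._executor_copies:
            self._executor_copies[executor_id] = pickle.loads(
                pickle.dumps(self._driver_value)
            )
        return self._executor_copies[executor_id]


v = {"threshold": 10}
b = ToyBroadcast(v)
early = b.value_on("executor-1")["threshold"]

v["threshold"] = 99                    # mutating v after broadcasting it
late = b.value_on("executor-2")["threshold"]          # a node that joined later
still_early = b.value_on("executor-1")["threshold"]

print(early, still_early, late)        # 10 10 99: the nodes now disagree
```

Treating the broadcast object as immutable avoids this kind of disagreement regardless of when a node receives its copy.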

To release the resources that the broadcast variable copied onto executors, call .unpersist(). If the broadcast is used again afterwards, it will be re-broadcast. To permanently release all resources used by the broadcast variable, call .destroy(). The broadcast variable cannot be used after that. Note that these methods do not block by default. To block until resources are freed, specify blocking=true when calling them.

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.

As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this case, counter) will be displayed in the web UI for the stage that modifies that accumulator. Spark displays the value of each accumulator modified by a task in the “Tasks” table.

Accumulator tracking in the UI can be useful for understanding the progress of the execution stages (NOTE: this is not yet supported by Python).

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware that each task’s update may be applied more than once if tasks or job stages are re-executed.
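The difference between the two guarantees can be modeled with a small add-only counter. ToyAccumulator, its merge method and the dedupe flag are hypothetical; the deduplication by task id is an assumption used here to imitate the applied-once behavior for actions, not a description of Spark's internals.

```python
class ToyAccumulator:
    """Add-only value that merges per-task partial updates."""

    def __init__(self, zero=0):
        self.value = zero
        self._applied = set()          # task ids whose update was already merged

    def merge(self, task_id, partial, dedupe):
        # With dedupe=True a re-run of the same task is ignored, which models
        # the applied-once guarantee for updates made inside actions.
        if dedupe and task_id in self._applied:
            return
        self._applied.add(task_id)
        self.value += partial


# (task id, partial sum) pairs; task 1 appears twice because it was re-run.
updates = [(0, 3), (1, 7), (1, 7), (2, 5)]

in_action = ToyAccumulator()
for task_id, partial in updates:
    in_action.merge(task_id, partial, dedupe=True)

in_transformation = ToyAccumulator()
for task_id, partial in reversed(updates):     # arrival order does not matter
    in_transformation.merge(task_id, partial, dedupe=False)

print(in_action.value)            # 15
print(in_transformation.value)    # 22
```

Because addition is associative and commutative, the order in which partial updates arrive does not change the total; only the duplicated update does.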

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The following code snippet shows this property:

Deploying to a Cluster

The application submission guide describes how to submit applications to a cluster. In short, once you package your application into a JAR (for Java/Scala) or a set of .py or .zip files (for Python), the bin/spark-submit script lets you submit it to any supported cluster manager.

Launching Spark jobs from Java / Scala

The org.apache.spark.launcher package provides classes for launching Spark jobs as child processes using a simple Java API.

Unit Testing

Spark is friendly to unit testing with any popular unit test framework. Simply create a SparkContext in your test with the master URL set to local, run your operations, and then call SparkContext.stop() to tear it down. Make sure you stop the context within a finally block or the test framework’s tearDown method, as Spark does not support two contexts running concurrently in the same program.

Where to Go from Here

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala, Java, Python, R). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script. For Python and R examples, use spark-submit instead.

For help on optimizing your programs, the configuration and tuning guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. For help on deploying, the cluster mode overview describes the components involved in distributed operation and the supported cluster managers.

Finally, full API documentation is available in Scala, Java, Python and R.
