A fault-tolerant collection of elements that can be operated on in parallel: “Resilient Distributed Dataset” a.k.a. RDD
RDD (Resilient Distributed Dataset) is the fundamental data structure of Apache Spark which are an immutable collection of objects which computes on the different node of the cluster. Each and every dataset in Spark RDD is logically partitioned across many servers so that they can be computed on different nodes of the cluster.
RDDs provide a restricted form of shared memory, based on coarse-grained transformations rather than fine-grained updates to shared state.
Coarse-grained transformations are those that are applied over an entire dataset. On the other hand, a fine grained transaction is one applied on smaller set, may be a single row. But with fine grained transactions you have to save the updates which can be costlier but it is flexible than a coarse grained one.
Need for RDD?
Data reuse is common in many iterative machine learning and graph algorithms, including PageRank, K-means clustering, and logistic regression. Another compelling use case is interactive data mining, where a user runs multiple ad-hoc queries on the same subset of the data.
Unfortunately, in most frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times.
RDDs solve these problems by enabling fault-tolerant distributed In-memory computations.
Spark runtime: The user’s driver program launches multiple workers, which read data blocks from a distributed file system and can persist computed RDD partitions in memory.
Features of RDDs:
- Resilient, i.e. fault-tolerant with the help of RDD lineage graph [DAG] and so able to recompute missing or damaged partitions due to node failures.
- Distributed, since Data resides on multiple nodes.
- Dataset represents records of the data you work with. The user can load the dataset externally which can be either JSON file, CSV file, text file or database via JDBC with no specific data structure.
- Lazy evaluated, i.e. the data inside RDD is not available or transformed until an action is executed that triggers the execution.
- Cacheable, i.e. you can hold all the data in a persistent “storage” like memory (default and the most preferred) or disk (the least preferred due to access speed).
- Location-Stickiness — RDD can define placement preferences to compute partitions (as close to the records as possible). Preferred Locations is basically information about the locations of RDD records (that Spark’s DAGScheduler uses to place computing partitions on to have the tasks as close to the data as possible)
- Immutable or Read-Only, i.e. it does not change once created and can only be transformed using transformations to new RDDs.
RDD Api in Spark
RDD is a common interface that exposes five pieces of information: a set of partitions, which are atomic pieces of the dataset; a set of dependencies on parent RDDs; a function for computing the dataset based on its parents; and metadata about its partitioning scheme and data placement.
For example, its partitioning scheme and data placement. For example, an RDD representing an HDFS file has a partition for each block of the file and knows which machines each block is on. Meanwhile, the result of a map on this RDD has the same partitions, but applies the map function to the parent’s data when computing its elements.
Types of RDD
RDD is an abstract class in Scala. There are several subclasses that extend RDD. Some of them are:
- ParallelCollectionRDD – An RDD created by Spark Context through parallelizing an existing collection. Eg. sc.parallelize, sc.makeRDD
- CoGroupedRDD – An RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a tuple with the list of values for that key.
- HadoopRDD is an RDD that provides core functionality for reading data stored in HDFS using the older MapReduce API. The most notable use case is the return RDD of SparkContext.textFile.
- MapPartitionsRDD – An RDD that applies the provided function to every partition of the parent RDD. A result of calling operations like map, flatMap, filter, mapPartitions, etc.
- CoalescedRDD – a result of repartition or coalesce transformations.
- ShuffledRDD – a result of shuffling, e.g. after repartition or coalesce transformations.
- PipedRDD – an RDD created by piping elements to a forked external process.
- SequenceFileRDD – is an RDD that can be saved as a SequenceFile.
Who should use RDD?
RDDs are best suited for batch applications that apply the same operation to all elements of a dataset. In these cases, RDDs can efficiently remember each transformation as one step in a lineage graph and can recover lost partitions without having to log large amounts of data.
RDDs would be less suitable for applications that make asynchronous fine-grained updates to shared state, such as a storage system for a web application or an incremental web crawler. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases, RAMCloud, Percolator and Piccolo.
We have enough background knowledge about RDD that what is, how it is and why it is. Its time to do some practical and bring these theories into existence.
There are three ways to create RDD:
1. Parallelized Collection: The first way for creating RDD is through the parallelizing the collection.
SparkContext.parallelize method distributes a local Scala collection to form an RDD.
val sc = new SparkContext(newSparkConf().setMaster("local[*]").setAppName("Demo App")) val rdd = sc.parallelize(List(1,2,3,4,5)) val result = rdd.map(_ *5).filter(_ <span id="mce_SELREST_start" style="overflow:hidden;line-height:0;"></span><15).count println(result)
2. External Datasets: We can load the data from external data source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase etc. We can create textfile RDDs by sparkcontext’s textfile method. This method uses the URL for the file (either a local path on the machine or database or a hdfs://, s3n://, e
tc URL). It also reads whole as a collection of lines.
NOTE: The path of the local system and worker node should always be similar. The file should be available at the same place in the local file system and worker node.
DataFrameReader Interface is used to load a Dataset from external storage systems (e.g. file systems, key-value stores, etc). Use SparkSession.read to access an instance of DataFrameReader.DataFrameReader supports many file formats-
(i) CSV File:
val sparkSession = SparkSession .builder() .master("local[*]") .appName("My_app") .getOrCreate() val csvData = sparkSession.read.csv("/home/wokspace/data.csv).rdd
Here .rdd method is used to convert Dataset to RDD.
(ii) JSON file
It loads a JSON file (one object per line) and returns the result as a Dataset
val jsonData = sparkSession.read.json("/home/workspace/data.json").rdd
It loads text files and returns a Dataset of String.
val textFileData = sparkSession.read.textFile("home/workspace/data.txt").rdd
3. Existing RDD: As mentioned before, transformations create a new RDD from the existing RDD. Any transformation function would take an RDD as a parameter and returns an RDD without changing the original RDD since RDD are immutable objects. Transformation operations include map, flatMap, filter etc.
val naturalNumbersRdd: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) val evenNumbersRdd: RDD[Int] = naturalNumbersRdd.filter(_ % 2 == 0) val oddNumbersRdd: RDD[Int] = naturalNumbersRdd.filter(_ % 2 != 0) oddNumbersRdd.foreach(println)
As you can see, evenNumbersRdd and oddNumbersRdd are RDDs created from the existing naturalNumbersRdd.
These were the major three ways to create RDD in Spark.
So, that was ground knowledge for Spark’s powerful weapon RDD. I hope it helped you. 🙂