PySpark RDD Basics Cheat Sheet

This PySpark RDD Basics Cheat Sheet with code samples covers the basics like initializing Spark in Python, loading data, sorting, and repartitioning.

Apache Spark is a fast, general-purpose, open-source engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. It is promoted as running analytic applications up to 100 times faster than comparable technologies. You can interface Spark with Python through PySpark, the Spark Python API that exposes the Spark programming model to Python.

Working with Spark will in many ways remind you of working with pandas DataFrames, but it can still be tough to get familiar with all the functions you can use to query, transform and inspect your data. What’s more, if you’ve never worked with any other programming language or if you’re new to the field, it might be hard to distinguish between RDD operations.

Let’s face it, map() and flatMap() are different enough, but it might still come as a challenge to decide which one you really need when you’re faced with them in your analysis. Or what about other functions, like reduce() and reduceByKey()?
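As a rough illustration of those two pairs of operations, here is a plain-Python analogy that needs no Spark installation. It is only a sketch of the semantics, assuming small in-memory lists instead of real RDDs; the helper reduce_by_key is a hypothetical stand-in and not part of the PySpark API.

```python
from functools import reduce
from itertools import chain

pairs = [("a", 7), ("a", 2), ("b", 2)]

def f(x):
    # Append the reversed pair to the original tuple
    return x + (x[1], x[0])

# map-style: exactly one output element per input element
mapped = [f(x) for x in pairs]

# flatMap-style: each result is treated as an iterable and flattened
flat_mapped = list(chain.from_iterable(f(x) for x in pairs))

# reduce-style: fold all elements into a single value
reduced = reduce(lambda a, b: a + b, pairs)

def reduce_by_key(func, kv_pairs):
    # Hypothetical helper: fold the values separately for each key
    out = {}
    for key, value in kv_pairs:
        out[key] = func(out[key], value) if key in out else value
    return sorted(out.items())

by_key = reduce_by_key(lambda x, y: x + y, pairs)

print(mapped)       # [('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]
print(flat_mapped)  # ['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']
print(reduced)      # ('a', 7, 'a', 2, 'b', 2)
print(by_key)       # [('a', 9), ('b', 2)]
```

In short: reach for the "flat" variant when each element expands into several output elements, and for the "ByKey" variant when you want one result per key rather than one result overall.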

Even though the documentation is very elaborate, it never hurts to have a cheat sheet by your side, especially when you’re just getting into it.

This PySpark cheat sheet covers the basics, from initializing Spark and loading your data, to retrieving RDD information, sorting, filtering and sampling your data. But that’s not all. You’ll also see that topics such as repartitioning, iterating, merging, saving your data and stopping the SparkContext are included in the cheat sheet.

Note that the examples in the document take small data sets to illustrate the effect of specific functions on your data. In real life data analysis, you’ll be using Spark to analyze big data.

PySpark is the Spark Python API that exposes the Spark programming model to Python.

Initializing Spark 

SparkContext 

from pyspark import SparkContext
sc = SparkContext(master = 'local[2]')

Inspect SparkContext 

sc.version #Retrieve SparkContext version
sc.pythonVer #Retrieve Python version
sc.master #Master URL to connect to
str(sc.sparkHome) #Path where Spark is installed on worker nodes
str(sc.sparkUser()) #Retrieve name of the Spark User running SparkContext
sc.appName #Return application name
sc.applicationId #Retrieve application ID
sc.defaultParallelism #Return default level of parallelism
sc.defaultMinPartitions #Default minimum number of partitions for RDDs

Configuration

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
    .setMaster("local")
    .setAppName("My app")
    .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

Using the Shell

In the PySpark shell, a special interpreter-aware SparkContext is already created in the variable called sc.

./bin/pyspark --master local[2]
./bin/pyspark --master local[4] --py-files code.py

Set which master the context connects to with the --master argument, and add Python .zip, .egg or .py files to the runtime path by passing a comma-separated list to --py-files.

Loading Data 

Parallelized Collections 

rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),
               ("b",["p","r"])])

External Data

Read either one text file from HDFS, a local file system or any Hadoop-supported file system URI with textFile(), or read in a directory of text files with wholeTextFiles().

textFile = sc.textFile("/my/directory/*.txt")
textFile2 = sc.wholeTextFiles("/my/directory")
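The difference between the two loaders is the shape of the records they return: textFile() yields one record per line, while wholeTextFiles() yields one (file, content) pair per file. The sketch below imitates that with plain Python and a temporary directory, so it runs without Spark; the file names and contents are made up for illustration.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as directory:
    # Two small, made-up files standing in for /my/directory
    for name, text in [("a.txt", "one\ntwo\n"), ("b.txt", "three\n")]:
        with open(os.path.join(directory, name), "w") as handle:
            handle.write(text)

    lines = []  # textFile-style: one record per line, file boundaries are lost
    whole = []  # wholeTextFiles-style: one (name, content) record per file
    for name in sorted(os.listdir(directory)):
        with open(os.path.join(directory, name)) as handle:
            content = handle.read()
        lines.extend(content.splitlines())
        # Spark uses the full file path as the key; the bare name is used here
        whole.append((name, content))

print(lines)  # ['one', 'two', 'three']
print(whole)  # [('a.txt', 'one\ntwo\n'), ('b.txt', 'three\n')]
```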

Retrieving RDD Information

Basic Information

rdd.getNumPartitions() #List the number of partitions
rdd.count() #Count RDD instances
#3
rdd.countByKey() #Count RDD instances by key
#defaultdict(<type 'int'>,{'a':2,'b':1})
rdd.countByValue() #Count RDD instances by value
#defaultdict(<type 'int'>,{('b',2):1,('a',2):1,('a',7):1})
rdd.collectAsMap() #Return (key,value) pairs as a dictionary
#  {'a': 2, 'b': 2}
rdd3.sum() #Sum of RDD elements
#4950
sc.parallelize([]).isEmpty() #Check whether RDD is empty
#True

Summary

rdd3.max() #Maximum value of RDD elements 
#99
rdd3.min() #Minimum value of RDD elements
#0
rdd3.mean() #Mean value of RDD elements 
#49.5
rdd3.stdev() #Standard deviation of RDD elements 
#28.866070047722118
rdd3.variance() #Compute variance of RDD elements 
#833.25
rdd3.histogram(3) #Compute histogram by bins
#([0,33,66,99],[33,33,34])
rdd3.stats() #Summary statistics (count, mean, stdev, max & min)
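The stdev() and variance() outputs above are population statistics (dividing by n); PySpark offers sampleStdev() and sampleVariance() for the n - 1 versions. As a quick sanity check that needs no Spark, the same figures can be reproduced with the standard library, assuming the data is simply the integers 0 to 99 as in rdd3:

```python
import math
import statistics

data = list(range(100))  # same values as rdd3

mean = statistics.mean(data)
variance = statistics.pvariance(data)   # population variance, divides by n
stdev = math.sqrt(variance)             # population standard deviation
sample_stdev = statistics.stdev(data)   # sample version, divides by n - 1

print(mean)      # 49.5
print(variance)  # 833.25
print(stdev)
print(sample_stdev)
```

The sample standard deviation comes out slightly larger than the population one, so it is worth checking which of the two your analysis calls for.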

Applying Functions

#Apply a function to each RDD element
rdd.map(lambda x: x+(x[1],x[0])).collect()
#[('a' ,7,7, 'a'),('a' ,2,2, 'a'), ('b' ,2,2, 'b')]
#Apply a function to each RDD element and flatten the result
rdd5 = rdd.flatMap(lambda x: x+(x[1],x[0]))
rdd5.collect()
#['a',7 , 7 ,  'a' , 'a' , 2,  2,  'a', 'b', 2 , 2, 'b']
#Apply a flatMap function to each (key,value) pair of rdd4 without changing the keys
rdd4.flatMapValues(lambda x: x).collect()
#[('a', 'x'), ('a', 'y'), ('a', 'z'),('b', 'p'),('b', 'r')]

Selecting Data

Getting

rdd.collect() #Return a list with all RDD elements 
#[('a', 7), ('a', 2), ('b', 2)]
rdd.take(2) #Take first 2 RDD elements 
#[('a', 7),  ('a', 2)]
rdd.first() #Take first RDD element
#('a', 7)
rdd.top(2) #Take top 2 RDD elements 
#[('b', 2), ('a', 7)]

Sampling

rdd3.sample(False, 0.15, 81).collect() #Return sampled subset of rdd3
 #[3,4,27,31,40,41,42,43,60,76,79,80,86,97]

Filtering

rdd.filter(lambda x: "a" in x).collect() #Filter the RDD
#[('a',7),('a',2)]
rdd5.distinct().collect() #Return distinct RDD values
#['a' ,2, 'b',7]
rdd.keys().collect() #Return (key,value) RDD's keys
#['a',  'a',  'b']

Iterating

def g(x): print(x)
rdd.foreach(g) #Apply a function to all RDD elements
#('a', 7)
#('b', 2)
#('a', 2)

Reshaping Data

Reducing

rdd.reduceByKey(lambda x,y : x+y).collect() #Merge the rdd values for each key
#[('a',9),('b',2)]
rdd.reduce(lambda a, b: a+ b) #Merge the rdd values
#('a', 7, 'a' , 2 , 'b' , 2)

Grouping by

#Return RDD of grouped values
(rdd3.groupBy(lambda x: x % 2)
     .mapValues(list)
     .collect())
#Group rdd by key
(rdd.groupByKey()
    .mapValues(list)
    .collect())
#[('a',[7,2]),('b',[2])]

Aggregating

seqOp = (lambda x,y: (x[0]+y,x[1]+1))
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
#Aggregate RDD elements of each partition and then the results
rdd3.aggregate((0,0),seqOp,combOp) 
#(4950,100)
#Aggregate values of each RDD key
rdd.aggregateByKey((0,0),seqOp,combOp).collect()
#    [('a',(9,2)), ('b',(2,1))]
#Aggregate the elements of each partition, and then the results
from operator import add
rdd3.fold(0,add)
#     4950
#Merge the values for each key
rdd.foldByKey(0, add).collect()
#[('a' ,9), ('b' ,2)]
#Create tuples of RDD elements by applying a function
rdd3.keyBy(lambda x: x+x).collect()
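To make the roles of seqOp and combOp more concrete, here is a minimal plain-Python sketch of how an aggregate works across partitions. It assumes the data is split into two hand-made lists standing in for partitions; the aggregate function below is a hypothetical imitation, not Spark's implementation.

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    # Step 1: fold each partition on its own, starting from the zero value
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # Step 2: merge the per-partition results, again starting from zero
    return reduce(comb_op, partials, zero)

# Accumulator is a (running sum, running count) tuple
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

data = list(range(100))
partitions = [data[:50], data[50:]]  # two made-up partitions

total, count = aggregate(partitions, (0, 0), seq_op, comb_op)
print(total, count)   # 4950 100
print(total / count)  # 49.5
```

Because the zero value is used once per partition and once more in the final merge, it should be neutral for the operation (0 for sums, an empty container for concatenation), otherwise the result depends on the number of partitions.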

Mathematical Operations

rdd.subtract(rdd2).collect() #Return each rdd value not contained in rdd2
#[('b' ,2), ('a' ,7)]
#Return each (key,value) pair of rdd2 with no matching key in rdd
rdd2.subtractByKey(rdd).collect()
#[('d', 1)]
rdd.cartesian(rdd2).collect() #Return the Cartesian product of rdd and rdd2
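The following plain-Python sketch mirrors the three operations above on ordinary lists, which may help when deciding between subtract() and subtractByKey(). It is an analogy under the assumption of small in-memory data, not Spark code, and the variable names are made up for illustration.

```python
from itertools import product

left = [("a", 7), ("a", 2), ("b", 2)]   # same pairs as rdd
right = [("a", 2), ("d", 1), ("b", 1)]  # same pairs as rdd2

# subtract-style: compare whole elements
subtracted = [x for x in left if x not in right]

# subtractByKey-style: compare keys only
left_keys = {key for key, _ in left}
subtracted_by_key = [(k, v) for k, v in right if k not in left_keys]

# cartesian-style: every pairing of a left element with a right element
pairs = list(product(left, right))

print(subtracted)         # [('a', 7), ('b', 2)]
print(subtracted_by_key)  # [('d', 1)]
print(len(pairs))         # 9
```

Unlike these lists, a real RDD gives no ordering guarantee for such results, so the order of collected elements may differ.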

Sort

rdd2.sortBy(lambda x: x[1]).collect() #Sort RDD by given function
#[('d',1),('b',1),('a',2)]
rdd2.sortByKey().collect() #Sort (key, value) RDD by key
#[('a' ,2), ('b' ,1), ('d' ,1)]

Repartitioning

rdd.repartition(4) #New RDD with 4 partitions
rdd.coalesce(1) #Decrease the number of partitions in the RDD to 1

Saving

rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",
              'org.apache.hadoop.mapred.TextOutputFormat')

Stopping SparkContext

sc.stop()

Execution

./bin/spark-submit examples/src/main/python/pi.py
