Parallelize() In Spark
parallelize() is a SparkContext method used to create an RDD from a local collection (such as a list) of elements.
RDD (Resilient Distributed Dataset) is the fundamental, immutable data structure in Apache Spark. Spark logically divides an RDD into multiple partitions and distributes the data to the available nodes for further processing.
Read about Resilient Distributed Dataset - RDD
Since parallelize() is defined on SparkContext, you need a SparkContext (usually obtained from a SparkSession) before you can call it.
Read about SparkContext and SparkSession
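For reference, the Scala signature looks roughly like the sketch below. The second parameter, numSlices, controls how many partitions the resulting RDD gets and defaults to the context's default parallelism (this is a simplified sketch; the actual definition also carries an implicit ClassTag bound on T).
// Simplified sketch of SparkContext.parallelize (ClassTag bound omitted)
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]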
Parallelize Usage In spark-shell
When we start spark-shell or pyspark, a SparkSession is created for us automatically before we get the REPL prompt in the CLI. The SparkSession in turn creates a SparkContext.
The example below shows how to get the SparkContext object from the SparkSession instance and how to use the parallelize method.
// get spark context
scala> spark
// res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2af5eab6
scala> spark.sparkContext
// res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4374c051
// We can also use `sc` in spark-shell to get spark context
scala> sc
// res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4374c051
// Usage of parallelize method to produce RDD
scala> val range = Range(0, 100)
// range: scala.collection.immutable.Range = Range 0 until 100
scala> val rdd = sc.parallelize(range)
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:26
scala> rdd.collect
// res3: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
scala> rdd.getNumPartitions
// res4: Int = 16
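The 16 partitions above simply reflect sc.defaultParallelism on the machine the shell was started on, so the number will differ on your setup. parallelize also accepts a second argument to set the partition count explicitly; a minimal spark-shell sketch:
scala> val rdd4 = sc.parallelize(Range(0, 100), 4)
scala> rdd4.getNumPartitions
// Int = 4
The same steps in pyspark: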
####################
## Get spark context
####################
>>> sc
# OR
>>> sc = spark.sparkContext
#############################
## Generate Data using range
#############################
>>> data = range(0, 100)
# apply parallelize on the data
>>> rdd = sc.parallelize(data)
>>> rdd.getNumPartitions()
# 16
>>> rdd.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
Parallelize Usage In Spark Applications
package live.whiletrue.sde

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Parallelize {

  def main(args: Array[String]): Unit = {

    // create spark session
    val spark = SparkSession
      .builder()
      .master("local[1]")
      .appName("sde.whiletrue.live")
      .getOrCreate()

    // generate sample data
    val sampleData = Range(0, 100)

    // create rdd from sample data using parallelize
    val rdd: RDD[Int] = spark.sparkContext.parallelize(sampleData)

    // print rdd info
    println(s"Default number of partitions: ${rdd.getNumPartitions}")
    println(s"Get first element from RDD: ${rdd.first()}")

    // Apply action on rdd to pull the data
    val collectRDD: Array[Int] = rdd.collect()

    // print rdd data
    collectRDD.foreach(println)
  }
}
The same example in PySpark (note that SparkSession must be imported from pyspark.sql):
from pyspark.sql import SparkSession


def parallelize():
    # Create SparkSession
    spark = SparkSession \
        .builder \
        .master('local[1]') \
        .appName('sde.whiletrue.live') \
        .getOrCreate()

    # Get SparkContext from the session
    sc = spark.sparkContext

    # Create sample data
    data = range(100)

    # Create RDD from the sample data
    rdd = sc.parallelize(data)

    # Print the rdd partitions and first element
    print(f'Default rdd partitions: {rdd.getNumPartitions()}')
    print(f'Get rdd first element: {rdd.first()}')

    # Collect rdd data back to the driver
    rdd_collect = rdd.collect()

    # Print rdd elements
    print(rdd_collect)


if __name__ == '__main__':
    parallelize()
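parallelize is not limited to ranges of integers; any local collection works, which makes it handy for building small test datasets. Keep in mind that the whole collection is materialized on the driver first, so parallelize is intended for small inputs, not for loading large datasets. A minimal spark-shell sketch:
scala> val words = sc.parallelize(Seq("spark", "rdd", "parallelize"))
scala> words.map(_.toUpperCase).collect()
// Array[String] = Array(SPARK, RDD, PARALLELIZE)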