Spark Session

SparkSession is the entry point for Spark applications to create RDDs, DataFrames, and Datasets.

In this article, we will discuss different ways and options for creating and configuring a SparkSession.

The SparkSession object spark is the default variable available in spark-shell and pyspark when we start the REPL. In application development, we create the SparkSession object ourselves, using the SparkSession builder pattern, to interact with the Spark cluster.

Introduction To SparkSession

Below are a few key points about SparkSession.

  • The org.apache.spark.sql.SparkSession class is used to create the SparkSession object.

  • The SparkSession.builder() builder pattern is used to create a SparkSession object.

  • Prior to Spark 2.0, SparkContext, HiveContext, and SQLContext were used to work with Spark SQL, RDDs, etc. SparkSession was introduced to act as a single entry point for working with Spark SQL and RDDs.

  • However, SparkContext, HiveContext, SQLContext, etc., can still be used while working with Spark. In fact, SparkSession internally creates a SparkConf and SparkContext with the configurations provided to SparkSession.

  • SparkSession includes the APIs available in the following contexts:

    • SparkContext

    • SQLContext

    • StreamingContext

    • HiveContext

  • SparkSession.builder() or SparkSession.newSession() can be used to create one or more Spark sessions. Multiple SparkSession objects may be required when we want to keep Spark relational entities (tables, temporary views) logically separated, as shown in the sketch below.
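
For example, a minimal Scala sketch (the session and view names are illustrative) showing that a temporary view registered in one session is not visible in another session created with newSession():

import org.apache.spark.sql.SparkSession

// Base session plus a second, logically separated session on the same SparkContext
val spark = SparkSession.builder().master("local[1]").appName("sessions-demo").getOrCreate()
val spark2 = spark.newSession()

// A temporary view registered in one session is not visible in the other
spark.range(5).createOrReplaceTempView("numbers")
println(spark.catalog.tableExists("numbers"))   // true
println(spark2.catalog.tableExists("numbers"))  // false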

SparkSession In Spark Shell

By default, the Spark shell creates the spark object when we start spark-shell. The spark object is an instance of the SparkSession class.

We can start the Spark shell by running spark-shell from the CLI as shown below,

spark-shell

We will see logs in the CLI, and after the SparkSession object has been created successfully, the Scala REPL prompt appears as shown.

scala>

Notice the following in the CLI logs:

  • The Spark web UI URL
  • Spark context available as sc
  • master = local[*]
  • app id = local-1661178478951
  • Spark session available as spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.104:4040
Spark context available as 'sc' (master = local[*], app id = local-1661178478951).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.3
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_282)
Type in expressions to have them evaluated.
Type :help for more information.

We can start the PySpark shell by running pyspark from the CLI as shown below,

pyspark

After the SparkSession object has been created, the Python prompt >>> is shown in the CLI.

Example

Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /__ / .__/\_,_/_/ /_/\_\   version 3.0.3
    /_/

Using Python version 3.8.5 (default, Sep  4 2020 02:22:02)
SparkSession available as 'spark'.
>>>

SparkSession In Spark Applications

To create a SparkSession in Scala or Python, we use the builder pattern method builder() and call the getOrCreate() method. If a SparkSession already exists, it is returned; otherwise, a new SparkSession is created.

Here, let's look at SparkSession creation and usage in a Spark application with an example.

package live.whiletrue.sde

import org.apache.spark.sql.SparkSession

object Sample extends App {
  // Create SparkSession from builder
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("sde.whiletrue.live")
    .getOrCreate();

  println("First SparkContext:")
  println("APP Name :" + spark.sparkContext.appName)
  println("Deploy Mode :" + spark.sparkContext.deployMode)
  println("Master :" + spark.sparkContext.master)
}

# Create SparkSession from builder
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
                    .appName('sde.whiletrue.live') \
                    .getOrCreate()

print('First SparkContext:')
print(f'App Name: {spark.sparkContext.appName}')
print(f'Deploy Mode : {spark.sparkContext.deployMode}')
print(f'Master : {spark.sparkContext.master}')
import org.apache.spark.sql.SparkSession;

public class SparkJava {
  public static void main(String[] args) {
    // Create SparkSession from builder
    SparkSession spark = SparkSession.builder()
      .master("local[1]")
      .appName("sparkInJava")
      .getOrCreate();

    System.out.println("First SparkContext:");
    System.out.println("APP Name :" + spark.sparkContext().appName());
    System.out.println("Deploy Mode :" + spark.sparkContext().deployMode());
    System.out.println("Master :" + spark.sparkContext().master());
  }
}
  • Use master() – If you are running on a cluster, pass your cluster manager's master URL as the argument to master(); usually this is yarn or mesos, depending on your cluster setup.

  • Use local[x] when running in local/standalone mode. x should be an integer greater than 0 and represents the number of worker threads, which also determines how many partitions are created by default for RDDs, DataFrames, and Datasets. Ideally, x should be the number of CPU cores you have (see the sketch after this list).

  • appName() – Used to set your application name.

  • getOrCreate() – Returns an existing SparkSession if one exists, and creates a new one otherwise.
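
As a quick sketch of the master() values above (the actual master URL depends on your environment):

import org.apache.spark.sql.SparkSession

// Local mode: local[4] runs with 4 worker threads; local[*] uses all available cores
val localSpark = SparkSession.builder()
  .master("local[4]")
  .appName("sde.whiletrue.live")
  .getOrCreate()

// On a YARN cluster the master is simply "yarn"; resource settings depend on your cluster setup
// val clusterSpark = SparkSession.builder().master("yarn").appName("sde.whiletrue.live").getOrCreate()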

Working With SparkSession

Whether we use the Spark REPL or a Spark application, the way we interact with Spark is the same, so in the following sections we will use the Spark REPL.

Note: Most tools, such as Jupyter notebooks and Azure Databricks, create a default SparkSession object named spark.

In this section, we will discuss setting and getting Spark configurations, getting an existing SparkSession, creating another SparkSession, enabling Hive support, creating DataFrames and Hive tables, and working with catalogs.

Set Spark Configurations

Use the config() method to add configurations while creating the SparkSession object.

Syntax:

// Usage of config()
builder.config("<spark.some.config.option>", "<config-value>")

Example:

import org.apache.spark.sql.SparkSession

// Usage of config()
val spark = SparkSession.builder()
      .master("local[1]")
      .appName("sde.whiletrue.live")
      .config("spark.executor.memory", "3g")
      .getOrCreate()

// Set Spark configuration at runtime
spark.conf.set("spark.executor.memory", "4g")

Syntax:

# Usage of config()
builder.config("<spark.some.config.option>", "<config-value>")

Example:

from pyspark.sql import SparkSession

spark = SparkSession.builder\
      .master("local[1]")\
      .appName("sde.whiletrue.live")\
      .config("spark.executor.memory", "3g")\
      .getOrCreate()

# Set Spark configuration at runtime
spark.conf.set("spark.executor.memory", "4g")

Syntax:

builder.config("<spark.some.config.option>", "<config-value>");

Example:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
      .master("local[1]")
      .appName("sde.whiletrue.live")
      .config("spark.executor.memory", "3g")
      .getOrCreate();

// Set Spark configuration at runtime
spark.conf.set("spark.executor.memory", "4g");
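
Note that not every property can be changed after the session has started; executor-level settings like spark.executor.memory are fixed at launch, while runtime SQL configs can be updated. A small Scala sketch (assumes Spark 2.4+, where isModifiable() is available):

// Check whether a configuration key is modifiable in the current session
println(spark.conf.isModifiable("spark.sql.shuffle.partitions")) // true: runtime SQL config
println(spark.conf.isModifiable("spark.executor.memory"))        // false: fixed at startup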

Get Spark Configurations

To get all Spark configurations at runtime, use the getAll() method (on spark.conf in Scala/Java, or on the SparkContext's SparkConf in PySpark).

val configMap: Map[String, String] = spark.conf.getAll
spark_configs = spark.sparkContext.getConf().getAll()
scala.collection.immutable.Map<String, String> configMap = spark.conf().getAll();
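
To read a single configuration value (optionally with a default when the key is not set), use get(); for example, in Scala:

// Read one configuration value, falling back to a default if it is not set
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions", "200")
println(shufflePartitions)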

Get Existing SparkSession

The getOrCreate() method returns the existing SparkSession if one has already been created, and creates it otherwise.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .getOrCreate()
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
                      .builder()
                      .getOrCreate();
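
Alternatively, SparkSession.getActiveSession returns the session active on the current thread as an Option, without creating one. A small Scala sketch:

import org.apache.spark.sql.SparkSession

// Some(session) if a session is active on the current thread, None otherwise
SparkSession.getActiveSession match {
  case Some(existing) => println(s"Active session: ${existing.sparkContext.appName}")
  case None           => println("No active SparkSession")
}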

Create Another SparkSession

We can also create another SparkSession using the newSession() method. It uses the same app name and master as the existing session. However, the underlying SparkContext is the same for both sessions, since we can have only one context per Spark application.

Note: A Spark application can have only one SparkContext object.

// Create a new SparkSession
val spark2 = spark.newSession()
# Create a new SparkSession
spark2 = spark.newSession()
SparkSession spark2 = spark.newSession();
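
To confirm that both sessions wrap the same SparkContext (using the spark2 name from the snippet above):

// Reference equality: the new session reuses the existing SparkContext
println(spark.sparkContext eq spark2.sparkContext) // true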

Create SparkSession With Hive Enabled

We need to call the enableHiveSupport() method on the builder to use Hive with Spark.

import org.apache.spark.sql.SparkSession

// Enabling Hive to use in Spark
val spark = SparkSession.builder()
      .master("local[1]")
      .appName("sde.whiletrue.live")
      .config("spark.sql.warehouse.dir", "<path>/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()
from pyspark.sql import SparkSession

# Enabling Hive to use in Spark
spark = SparkSession.builder\
      .master("local[1]")\
      .appName("sde.whiletrue.live")\
      .config("spark.sql.warehouse.dir", "<path>/spark-warehouse")\
      .enableHiveSupport()\
      .getOrCreate()
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
      .master("local[1]")
      .appName("sde.whiletrue.live")
      .config("spark.executor.memory", "3g")
      .enableHiveSupport()
      .getOrCreate();
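
Once Hive support is enabled, SQL DDL statements run against the Hive metastore and data is persisted under the warehouse directory. A small Scala sketch (demo_db is an illustrative name):

// Create a database in the metastore and list the databases
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")
spark.sql("SHOW DATABASES").show()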

Create DataFrame

SparkSession also provides methods to create Spark DataFrame and Dataset objects. The example below uses the createDataFrame() method, which takes a list of data.

// Create DataFrame
val df = spark.createDataFrame(
  List(
    ("Rohith", 28), 
    ("Anusha", 29)
  )
)

df.show()
# Create DataFrame
df = spark.createDataFrame(
  [
    ("Rohith", 28), 
    ("Anusha", 29)
  ]
)

df.show()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schemata = DataTypes.createStructType(
        new StructField[]{
                createStructField("NAME", StringType, false),
                createStructField("AGE", IntegerType, false),
        });
Row r1 = RowFactory.create("Rohith", 28);
Row r2 = RowFactory.create("Anusha", 29);
List<Row> rowList = Arrays.asList(r1, r2);
Dataset<Row> df = spark.createDataFrame(rowList, schemata);
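
Since no schema or column names were supplied in the Scala and Python examples, the columns get the default names _1 and _2, which the Hive example below selects. A quick check in Scala:

// Default column names when creating a DataFrame from a list of tuples
df.printSchema()
// root
//  |-- _1: string (nullable = true)
//  |-- _2: integer (nullable = false)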

Create Hive Table

As explained above, SparkSession can be used to create and query Hive tables; note that you don't need Hive to be installed to try this out. saveAsTable() creates a Hive managed table, which can then be queried using spark.sql(). Here we save the DataFrame created in the previous section:

// Save the DataFrame created above as a Hive managed table & query it.
df.write.saveAsTable("my_hive_table")
val df2 = spark.sql("SELECT _1, _2 FROM my_hive_table")
# Save the DataFrame created above as a Hive managed table & query it.
df.write.saveAsTable("my_hive_table")
df2 = spark.sql("SELECT _1, _2 FROM my_hive_table")
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Save the DataFrame created above as a Hive managed table & query it.
// (the Java DataFrame above used the NAME and AGE column names)
df.write().saveAsTable("my_hive_table");
Dataset<Row> df2 = spark.sql("SELECT NAME, AGE FROM my_hive_table");

Working With Catalogs

To access catalog metadata, SparkSession exposes the catalog field. The methods spark.catalog.listDatabases and spark.catalog.listTables return a Dataset in Scala and Java (and a Python list in PySpark).

val db_df = spark.catalog.listDatabases
val dt_df = spark.catalog.listTables
db_df = spark.catalog.listDatabases()
dt_df = spark.catalog.listTables()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> db_df = spark.catalog().listDatabases();
Dataset<Row> dt_df = spark.catalog().listTables();
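
The returned Datasets can be inspected like any other Dataset; for example, in Scala:

// Show the current database and the catalog metadata
println(spark.catalog.currentDatabase) // "default" unless changed
db_df.show(truncate = false)
dt_df.show(truncate = false)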

Frequently Used SparkSession Methods

  • version – Returns the Spark version your application is running on (the Spark version the cluster is configured with).
  • catalog – Returns the Catalog object to access metadata.
  • conf – Returns the RuntimeConfig object.
  • builder() – Used to create a new SparkSession; returns a SparkSession.Builder.
  • newSession() – Creates a new SparkSession.
  • range(n) – Returns a single-column Dataset of LongType named id, containing elements from 0 to n (exclusive) with a step value of 1.
  • createDataFrame() – Creates a DataFrame from a collection or an RDD.
  • createDataset() – Creates a Dataset from a collection, DataFrame, or RDD.
  • emptyDataFrame() – Creates an empty DataFrame.
  • emptyDataset() – Creates an empty Dataset.
  • getActiveSession() – Returns the active SparkSession for the current thread.
  • getDefaultSession() – Returns the default SparkSession that is returned by the builder.
  • implicits() – Gives access to the nested Scala object containing implicit conversions (import spark.implicits._).
  • read() – Returns an instance of DataFrameReader, used to read records from CSV, Parquet, Avro, and other file formats into a DataFrame.
  • readStream() – Returns an instance of DataStreamReader, used to read streaming data into a streaming DataFrame.
  • sparkContext() – Returns the underlying SparkContext.
  • sql(String sql) – Returns a DataFrame after executing the given SQL statement.
  • sqlContext() – Returns the SQLContext.
  • stop() – Stops the underlying SparkContext.
  • table() – Returns a DataFrame for the given table or view.
  • udf() – Used to register a Spark UDF for use on DataFrames, Datasets, and in SQL.
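
A few of these methods in action in the Scala REPL (a small sketch):

println(spark.version)             // e.g. 3.0.3
spark.range(3).show()              // single LongType column "id": 0, 1, 2
spark.emptyDataFrame.printSchema() // prints just "root" (no fields)
spark.sql("SELECT 1 AS one").show()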