Spark Session
SparkSession is the entry point for Spark applications to create RDDs, DataFrames, and Datasets.
In this article, we will discuss different ways and options for creating and configuring a Spark session.
The SparkSession object spark is the default variable available in spark-shell and pyspark when we start the REPL; it can also be created programmatically using the SparkSession builder pattern.
In application development, we create the SparkSession object ourselves to interact with the Spark cluster.
Introduction To SparkSession
Below are a few key points about SparkSession.
- The org.apache.spark.sql.SparkSession class is used to create the Spark session object. The SparkSession.builder() builder pattern is used for creating a SparkSession object.
- Prior to Spark 2.0, SparkContext, HiveContext, and SQLContext were used to work with Spark SQL, RDDs, etc. SparkSession was introduced to act as a single entry point while working with Spark SQL and RDDs.
- However, SparkContext, HiveContext, SQLContext, etc., can still be used while working with Spark. In fact, SparkSession internally creates SparkConf and SparkContext with the provided configurations (see the short sketch after this list).
- SparkSession includes the APIs available in contexts like:
  - SparkContext
  - SQLContext
  - StreamingContext
  - HiveContext
- SparkSession.builder() or SparkSession.newSession() can be used to create one or more Spark sessions. More than one Spark session object may be required when we want to keep Spark relational entities (tables) logically separated.
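As a quick illustration of these points, here is a minimal sketch (assuming an already created SparkSession named spark, e.g., the one provided by spark-shell) showing that the older contexts are still reachable through it:

// The session wraps the lower-level contexts
val sc = spark.sparkContext        // underlying SparkContext
val sqlCtx = spark.sqlContext      // SQLContext, kept for backward compatibility
println(sc.appName)                // app name carried by the shared context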
SparkSession In Spark Shell
By default, the Spark shell creates a spark object when we start spark-shell. The spark object is an instance of the SparkSession class.
We can start the Spark shell using spark-shell from the CLI as shown below:
spark-shell
We will see logs in the CLI, and after successful creation of the SparkSession object, we see the Scala REPL prompt as shown below:
scala>
Notice the following in the logs:
- The web UI URL
- Spark context available as sc (master = local[*], app id = local-1661178478951)
- Spark session available as spark
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.104:4040
Spark context available as 'sc' (master = local[*], app id = local-1661178478951).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.3
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_282)
Type in expressions to have them evaluated.
Type :help for more information.
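Once the prompt appears, the default spark object can be used directly. A few quick checks we might run at the scala> prompt (a sketch, not required output):

spark.version                  // e.g., 3.0.3, matching the banner above
spark.range(5).show()          // single-column Dataset with values 0 to 4
spark.sparkContext.master      // local[*] in the default local shell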
We can start the PySpark shell using pyspark from the CLI as shown below:
pyspark
After creation of the Spark session object, the Python prompt will be shown in the CLI as >>>.
Example
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.3
/_/
Using Python version 3.8.5 (default, Sep 4 2020 02:22:02)
SparkSession available as 'spark'.
>>>
SparkSession In Spark Applications
To create a SparkSession in Scala or Python, we need to use the builder pattern method builder() and call the getOrCreate() method. If a SparkSession already exists, it is returned; otherwise, a new SparkSession is created.
Here, let's see Spark session creation and usage in a Spark application with an example.
package live.whiletrue.sde
import org.apache.spark.sql.SparkSession
object Sample extends App {

  // Create SparkSession from builder
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("sde.whiletrue.live")
    .getOrCreate()

  println("First SparkContext:")
  println("APP Name :" + spark.sparkContext.appName)
  println("Deploy Mode :" + spark.sparkContext.deployMode)
  println("Master :" + spark.sparkContext.master)
}
# Create SparkSession from builder
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName('sde.whiletrue.live') \
    .getOrCreate()
print('First SparkContext:')
print(f'App Name: {spark.sparkContext.appName}')
print(f'Deploy Mode : {spark.sparkContext.deployMode}')
print(f'Master : {spark.sparkContext.master}')
import org.apache.spark.sql.SparkSession;
public class SparkJava {
    public static void main(String[] args) {
        // Create SparkSession from builder
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("sparkInJava")
            .getOrCreate();

        System.out.println("First SparkContext:");
        System.out.println("APP Name :" + spark.sparkContext().appName());
        System.out.println("Deploy Mode :" + spark.sparkContext().deployMode());
        System.out.println("Master :" + spark.sparkContext().master());
    }
}
- master() – If you are running on a cluster, pass your master name as an argument to master(). Usually, it is either yarn or mesos, depending on your cluster setup (see the short YARN sketch after this list).
- Use local[x] when running in local mode. x should be an integer greater than 0; it sets the number of worker threads, which also determines the default number of partitions when creating RDDs, DataFrames, and Datasets. Ideally, x should match the number of CPU cores you have.
- appName() – Used to set your application name.
- getOrCreate() – Returns an existing SparkSession if one exists; otherwise, it creates a new one.
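For example, here is a minimal sketch of a builder configured for a YARN cluster (the app name and memory value are placeholders; in practice the master is often supplied via spark-submit --master yarn rather than hard-coded):

import org.apache.spark.sql.SparkSession

// Sketch: targeting a YARN cluster instead of local mode
val spark = SparkSession.builder()
  .master("yarn")                          // cluster manager
  .appName("sde.whiletrue.live")           // application name shown in the UI
  .config("spark.executor.memory", "4g")   // example resource setting
  .getOrCreate()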
Working With SparkSession
Whether we use the Spark REPL or a Spark application, the way we interact with Spark is the same. So, in the following sections, we will use the Spark REPL.
Note: In most tools like Jupyter notebooks and Azure Databricks, the environment creates a default SparkSession object named spark.
In this section, we will discuss:
- Setting Spark configs
- Getting Spark configs
- Getting the existing Spark session
- Creating another new Spark session
- Creating a SparkSession with Hive enabled
- Creating a DataFrame using the Spark session
- Creating a Hive table
- Working with Hive catalogs
Set Spark Configurations
Use the config() method to add configurations while creating the Spark session object.
Syntax:
// Usage of config()
builder.config("<spark.some.config.option>", "<config-value>")
Example:
import org.apache.spark.sql.SparkSession
// Usage of config()
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("sde.whiletrue.live")
  .config("spark.executor.memory", "3g")
  .getOrCreate()

// Set a runtime-modifiable config on the existing session
// (core configs like spark.executor.memory cannot be changed after startup)
spark.conf.set("spark.sql.shuffle.partitions", "100")
Syntax:
# Usage of config()
builder.config("<spark.some.config.option>", "<config-value>")
Example:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("sde.whiletrue.live") \
    .config("spark.executor.memory", "3g") \
    .getOrCreate()

# Set a runtime-modifiable config on the existing session
# (core configs like spark.executor.memory cannot be changed after startup)
spark.conf.set("spark.sql.shuffle.partitions", "100")
Syntax:
builder.config("<spark.some.config.option>", "<config-value>")
Example:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession.builder()
    .master("local[1]")
    .appName("sde.whiletrue.live")
    .config("spark.executor.memory", "3g")
    .getOrCreate();

// Set a runtime-modifiable config on the existing session
// (core configs like spark.executor.memory cannot be changed after startup)
spark.conf().set("spark.sql.shuffle.partitions", "100");
Get Spark Configurations
To get Spark configurations at runtime, use the getAll() method. In PySpark, the runtime conf object does not expose getAll(), so we read them from the underlying SparkConf instead.
val configMap: Map[String, String] = spark.conf.getAll
spark_configs = spark.sparkContext.getConf().getAll()  # list of (key, value) tuples
scala.collection.Map<String, String> configMap = spark.conf().getAll();
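To read a single configuration value at runtime, the conf object also exposes get(); a short Scala sketch using keys from the examples above:

// Read one config value (throws if the key is unset and no default is given)
val execMem = spark.conf.get("spark.executor.memory")
// With a fallback default
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions", "200")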
Get Existing SparkSession
The getOrCreate() method returns the existing SparkSession if one exists; otherwise, it creates a new one.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
    .builder()
    .getOrCreate();
Create Another SparkSession
We can also create another new SparkSession using the newSession() method. It uses the same app name and master as the existing session. However, the underlying SparkContext will be the same for both sessions, as we can have only one context per Spark application.
Note: A spark application can have only one spark context object.
// Create a new SparkSession
val spark2 = spark.newSession()
# Create new SparkSession
spark2 = spark.newSession()
SparkSession spark2 = spark.newSession();
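We can check that the new session shares the original session's context while keeping its own SQL configuration; a quick Scala sketch:

// Both sessions point to the same underlying SparkContext
println(spark.sparkContext == spark2.sparkContext)        // true
// But SQL configs (and temp views) are isolated per session
spark2.conf.set("spark.sql.shuffle.partitions", "10")
println(spark.conf.get("spark.sql.shuffle.partitions"))   // unchanged in the original session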
Create SparkSession With Hive Enabled
We need to call the enableHiveSupport() method on the builder to use Hive with Spark.
import org.apache.spark.sql.SparkSession
// Enabling Hive to use in Spark
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("sde.whiletrue.live")
  .config("spark.sql.warehouse.dir", "<path>/spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()
from pyspark.sql import SparkSession
# Enabling Hive to use in Spark
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("sde.whiletrue.live") \
    .config("spark.sql.warehouse.dir", "<path>/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession.builder()
    .master("local[1]")
    .appName("sde.whiletrue.live")
    .config("spark.sql.warehouse.dir", "<path>/spark-warehouse")
    .enableHiveSupport()
    .getOrCreate();
Create DataFrame
SparkSession also provides methods to create Spark DataFrame and Dataset objects. The example below uses the createDataFrame() method, which takes a list of data.
// Create DataFrame
val df = spark.createDataFrame(
  List(
    ("Rohith", 28),
    ("Anusha", 29)
  )
)
df.show()
# Create DataFrame
df = spark.createDataFrame(
    [
        ("Rohith", 28),
        ("Anusha", 29)
    ]
)
df.show()
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schemata = DataTypes.createStructType(
    new StructField[]{
        createStructField("NAME", StringType, false),
        createStructField("AGE", IntegerType, false),
    });

Row r1 = RowFactory.create("Rohith", 28);
Row r2 = RowFactory.create("Anusha", 29);
List<Row> rowList = Arrays.asList(r1, r2);

Dataset<Row> df = spark.createDataFrame(rowList, schemata);
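Because createDataFrame() on a list of tuples produces the default column names _1 and _2 (which the Hive example below queries), we can optionally rename the columns with toDF(); a short Scala sketch:

val named = spark.createDataFrame(
  List(("Rohith", 28), ("Anusha", 29))
).toDF("name", "age")    // rename _1, _2 to friendlier column names
named.printSchema()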
Create Hive Table
As explained above, SparkSession can be used to create and query Hive tables. Note that in order to do this for testing, you don't need Hive to be installed. saveAsTable() creates a Hive managed table, and we can query it using spark.sql(). The examples below assume that a table or view named my_table already exists (for instance, the DataFrame created above registered as a temp view).
// Create Hive table named my_hive_table & query the table.
spark.table("my_table").write.saveAsTable("my_hive_table")
val df = spark.sql("SELECT _1,_2 FROM my_hive_table")
# Create Hive table named my_hive_table & query the table.
spark.table("my_table").write.saveAsTable("my_hive_table")
df = spark.sql("SELECT _1,_2 FROM my_hive_table")
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Create Hive table named my_hive_table & query the table.
spark.table("my_table").write().saveAsTable("my_hive_table");
Dataset<Row> df = spark.sql("SELECT _1,_2 FROM my_hive_table");
Working With Catalogs
To get the catalog metadata, SparkSession exposes the catalog variable. Note that spark.catalog.listDatabases and spark.catalog.listTables return a Dataset in Scala and Java (in PySpark, they return a Python list).
val db_df = spark.catalog.listDatabases
val dt_df = spark.catalog.listTables
db_list = spark.catalog.listDatabases()  # returns a Python list in PySpark
tbl_list = spark.catalog.listTables()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> db_df = spark.catalog().listDatabases();
Dataset<Row> dt_df = spark.catalog().listTables();
Frequently Used SparkSession Methods
| Method | Description |
|---|---|
| version | Returns the Spark version your application is running on, typically the version your cluster is configured with. |
| catalog | Returns the catalog object to access metadata. |
| conf | Returns the RuntimeConfig object. |
| builder() | Used to create a new SparkSession; returns a SparkSession.Builder. |
| newSession() | Creates a new SparkSession. |
| range(n) | Returns a single-column Dataset of LongType with a column named id, containing elements from 0 to n (exclusive) with a step value of 1. |
| createDataFrame() | Creates a DataFrame from a collection or an RDD. |
| createDataset() | Creates a Dataset from a collection, DataFrame, or RDD. |
| emptyDataFrame() | Creates an empty DataFrame. |
| emptyDataset() | Creates an empty Dataset. |
| getActiveSession() | Returns the active SparkSession for the current thread. |
| getDefaultSession() | Returns the default SparkSession that is returned by the builder. |
| implicits() | Gives access to the nested implicits object for Scala implicit conversions (e.g., converting collections to Datasets). |
| read() | Returns an instance of the DataFrameReader class, used to read records from CSV, Parquet, Avro, and more file formats into a DataFrame. |
| readStream() | Returns an instance of the DataStreamReader class, used to read streaming data into a streaming DataFrame. |
| sparkContext() | Returns the SparkContext. |
| sql(String sql) | Returns a DataFrame after executing the given SQL statement. |
| sqlContext() | Returns the SQLContext. |
| stop() | Stops the underlying SparkContext. |
| table() | Returns a DataFrame for a table or view. |
| udf() | Creates a Spark UDF to use on DataFrames, Datasets, and in SQL. |
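As a small illustration of a few of these methods, here is a quick Scala sketch (assuming the default spark object from the shell):

// Version string and a quick range Dataset
println(spark.version)                // e.g., 3.0.3
spark.range(3).show()                 // column "id" with values 0, 1, 2

// Empty DataFrame and a simple SQL query
val empty = spark.emptyDataFrame
spark.sql("SELECT 1 AS col").show()   // DataFrame with a single row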