Spark Session
SparkSession is the entry point for Spark applications to create RDDs, DataFrames, and Datasets.
In this article, we will discuss different ways and options for creating and configuring a Spark session.
The SparkSession object spark is the default variable available in spark-shell and pyspark when we start the REPL, and it can also be created programmatically using the SparkSession builder pattern. In application development, we create the SparkSession object ourselves to interact with the Spark cluster.
Introduction To SparkSession
Below are a few key points about SparkSession.
- The org.apache.spark.sql.SparkSession class is used to create the Spark session object.
- The SparkSession.builder() builder pattern is used for creating a SparkSession object.
- Prior to Spark 2.0, SparkContext, HiveContext, and SQLContext were used to work with Spark SQL, RDDs, etc. SparkSession was introduced to act as a single entry point while working with Spark SQL and RDDs.
- However, SparkContext, HiveContext, SQLContext, etc., can still be used while working with Spark. In fact, SparkSession internally creates a SparkConf and a SparkContext with the configurations provided to the SparkSession builder.
- SparkSession includes the APIs available in contexts like SparkContext, SQLContext, StreamingContext, and HiveContext (see the sketch after this list).
- SparkSession.builder() or SparkSession.newSession() can be used to create one or more Spark sessions. More than one SparkSession object may be required when we want to keep Spark relational entities (tables) logically separated.
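As a quick sketch of these points (assuming it runs inside spark-shell, where spark already exists), the older contexts and the catalog are all reachable from a single SparkSession:
// Minimal sketch, intended for spark-shell where `spark` is already available
val sc     = spark.sparkContext   // underlying SparkContext
val sqlCtx = spark.sqlContext     // legacy SQLContext, still reachable
val cat    = spark.catalog        // catalog API for databases and tables
println(sc.appName)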
SparkSession In Spark Shell
By default, the Spark shell creates a spark object when we start spark-shell. The spark object is an instance of the SparkSession class.
We can start the Spark shell using spark-shell from the CLI as shown below,
spark-shell
We will see logs in the CLI, and after successful creation of the SparkSession object, we see the Scala REPL prompt as shown.
scala>
Notice the logs in the CLI:
- We can see the web UI URI
- Spark context available as sc
- master = local[*]
- app id = local-1661178478951
- Spark session available as spark
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.104:4040
Spark context available as 'sc' (master = local[*], app id = local-1661178478951).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.3
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_282)
Type in expressions to have them evaluated.
Type :help for more information.
We can start the PySpark shell using pyspark from the CLI as shown below,
pyspark
After creation of the Spark session object, the Python prompt will be shown in the CLI as >>>.
Example
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.3
/_/
Using Python version 3.8.5 (default, Sep 4 2020 02:22:02)
SparkSession available as 'spark'.
>>>
SparkSession In Spark Applications
To create a SparkSession in Scala or Python, we need to use the builder pattern method builder() and call the getOrCreate() method. If a SparkSession already exists, getOrCreate() returns it; otherwise it creates a new SparkSession.
Here, let’s see SparkSession creation and usage in a Spark application with an example.
package live.whiletrue.sde
import org.apache.spark.sql.SparkSession
object Sample extends App {
// Create SparkSession from builder
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("sde.whiletrue.live")
.getOrCreate();
println("First SparkContext:")
println("APP Name :" + spark.sparkContext.appName)
println("Deploy Mode :" + spark.sparkContext.deployMode)
println("Master :" + spark.sparkContext.master)
}
# Create SparkSession from builder
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
.appName('sde.whiletrue.live') \
.getOrCreate()
print('First SparkContext:')
print(f'App Name: {spark.sparkContext.appName}')
print(f'Deploy Mode : {spark.sparkContext.deployMode}')
print(f'Master : {spark.sparkContext.master}')
import org.apache.spark.sql.SparkSession;
public class SparkJava {
public static void main(String[] args) {
// Create SparkSession from builder
SparkSession spark = SparkSession.builder()
.master("local[1]")
.appName("sparkInJava")
.getOrCreate();
System.out.println("First SparkContext:");
System.out.println("APP Name :" + spark.sparkContext().appName());
System.out.println("Deploy Mode :" + spark.sparkContext().deployMode());
System.out.println("Master :" + spark.sparkContext().master());
}
}
master() – If you are running Spark on a cluster, you need to pass your master name as an argument to master(). Usually, it is either yarn or mesos, depending on your cluster setup.
Use local[x] when running in local mode. x should be an integer value greater than 0; it represents how many partitions are created by default when using RDDs, DataFrames, and Datasets. Ideally, the x value should be the number of CPU cores you have (see the sketch after this list).
appName() – Used to set your application name.
getOrCreate() – Returns the SparkSession object if one already exists, and creates a new one if not.
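As a small sketch of how the master setting influences parallelism (the local[4] value and app name here are assumptions chosen for illustration), we can check defaultParallelism on the resulting context:
// Sketch only: local[4] is an arbitrary choice for illustration
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[4]")               // 4 threads => default parallelism of 4
  .appName("master-sketch")
  .getOrCreate()

println(spark.sparkContext.defaultParallelism)  // typically prints 4 for local[4]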
Working With SparkSession
Whether we use the Spark REPL or a Spark application, the way we interact with Spark is the same. So, in the following sections we will use the Spark REPL.
Note: In most tools, like Jupyter notebooks and Azure Databricks, the environment creates a default SparkSession object named spark.
In this section we will discuss:
- Setting Spark configs
- Getting Spark configs
- Getting the existing Spark session
- Creating another new Spark session
- Creating a SparkSession with Hive enabled
- Creating a DataFrame using the Spark session
- Creating a Hive table
- Working with Hive catalogs
Set Spark Configurations
Use the config() method to add configurations while creating the SparkSession object.
Syntax:
// Usage of config()
builder.config("<spark.some.config.option>", "<config-value>")
Example:
import org.apache.spark.sql.SparkSession
// Usage of config()
val spark = SparkSession.builder()
.master("local[1]")
.appName("sde.whiletrue.live")
.config("spark.executor.memory", "3g")
.getOrCreate()
// Set a Spark configuration at runtime
spark.conf.set("spark.executor.memory", "4g")
Syntax:
# Usage of config()
builder.config("<spark.some.config.option>", "<config-value>")
Example:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[1]")\
.appName("sde.whiletrue.live")\
.config("spark.executor.memory", "3g")\
.getOrCreate()
# Set a Spark configuration at runtime
spark.conf.set("spark.executor.memory", "4g")
Syntax:
builder.config("<spark.some.config.option>", "<config-value>")
Example:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession.builder()
.master("local[1]")
.appName("sde.whiletrue.live")
.config("spark.executor.memory", "3g")
.getOrCreate();
// Set a Spark configuration at runtime
spark.conf.set("spark.executor.memory", "4g");
Get Spark Configurations
To get Spark configurations at runtime, use the getAll() method.
val configMap: Map[String, String] = spark.conf.getAll
spark_configs = spark.sparkContext.getConf().getAll()
scala.collection.immutable.Map<String, String> configMap = spark.conf().getAll();
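As a small follow-up sketch (run in spark-shell, or anywhere a spark object exists), the returned map can be iterated to inspect individual settings:
// Sketch: print every runtime configuration key/value pair
spark.conf.getAll.foreach { case (key, value) =>
  println(s"$key = $value")
}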
Get Existing SparkSession
The getOrCreate() method returns the existing SparkSession if one is already active; otherwise it creates a new one.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.getOrCreate();
Create Another SparkSession
We can also create another new SparkSession using the newSession() method. This uses the same app name and master as the existing session. However, the underlying SparkContext will be the same for both sessions, as we can have only one context per Spark application.
Note: A Spark application can have only one SparkContext object.
// Create a new SparkSession
val spark3 = spark.newSession()
# Create a new SparkSession
spark3 = spark.newSession()
SparkSession spark3 = spark.newSession();
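To make the shared-context behaviour concrete, here is a small sketch (the view name nums is made up for the example) showing that the two sessions share one SparkContext while keeping their temporary views separate:
// Sketch: two sessions, one SparkContext
val spark3 = spark.newSession()
println(spark.sparkContext eq spark3.sparkContext)   // true: same underlying context

// Temporary views are scoped per session
spark.range(3).createOrReplaceTempView("nums")
println(spark.catalog.tableExists("nums"))   // true in the original session
println(spark3.catalog.tableExists("nums"))  // false: not visible in the new session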
Create SparkSession With Hive Enabled
We need to call the enableHiveSupport() method on the builder to use Hive with Spark.
import org.apache.spark.sql.SparkSession
// Enabling Hive to use in Spark
val spark = SparkSession.builder()
.master("local[1]")
.appName("sde.whiletrue.live")
.config("spark.sql.warehouse.dir", "<path>/spark-warehouse")
.enableHiveSupport()
.getOrCreate()
from pyspark.sql import SparkSession
# Enabling Hive to use in Spark
spark = SparkSession.builder\
.master("local[1]")\
.appName("sde.whiletrue.live")\
.config("spark.sql.warehouse.dir", "<path>/spark-warehouse")\
.enableHiveSupport()\
.getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession.builder()
.master("local[1]")
.appName("sde.whiletrue.live")
.config("spark.sql.warehouse.dir", "<path>/spark-warehouse")
.enableHiveSupport()
.getOrCreate();
Create DataFrame
SparkSession also provides methods to create Spark DataFrame and Dataset objects. The example below uses the createDataFrame() method, which takes a collection of data.
// Create DataFrame
val df = spark.createDataFrame(
List(
("Rohith", 28),
("Anusha", 29)
)
)
df.show()
# Create DataFrame
df = spark.createDataFrame(
[
("Rohith", 28),
("Anusha", 29)
]
)
df.show()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schemata = DataTypes.createStructType(
new StructField[]{
createStructField("NAME", StringType, false),
createStructField("AGE", IntegerType, false),
});
Row r1 = RowFactory.create("Rohith", 28);
Row r2 = RowFactory.create("Anusha", 29);
List<Row> rowList = Arrays.asList(r1, r2);
Dataset<Row> df = spark.createDataFrame(rowList, schemata);
Create Hive Table
As explained above, SparkSession is used to create and query Hive tables. Note that you don’t need Hive installed in order to test this. saveAsTable() creates a Hive managed table. Query the table using spark.sql().
// Create Hive table named my_hive_table & query the table.
spark.table("my_table").write.saveAsTable("my_hive_table")
val df = spark.sql("SELECT _1,_2 FROM my_hive_table")
# Create Hive table named my_hive_table & query the table.
spark.table("my_table").write.saveAsTable("my_hive_table")
df = spark.sql("SELECT _1,_2 FROM my_hive_table")
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Create Hive table named my_hive_table & query the table.
spark.table("my_table").write.saveAsTable("my_hive_table");
Dataset<Row> df = spark.sql("SELECT _1,_2 FROM my_hive_table");
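The snippets above assume a table or view named my_table already exists. As a hedged sketch of one way to set that up (the DataFrame contents are just placeholders), a temporary view can be registered first:
// Sketch: register a temporary view named my_table before running the snippet above
val sourceDf = spark.createDataFrame(List(("Rohith", 28), ("Anusha", 29)))
sourceDf.createOrReplaceTempView("my_table")

// Now spark.table("my_table") resolves, and saveAsTable persists it as a Hive managed table
spark.table("my_table").write.saveAsTable("my_hive_table")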
Working With Catalogs
To get catalog metadata, SparkSession exposes a catalog variable. Note that the methods spark.catalog.listDatabases and spark.catalog.listTables return a Dataset.
val db_df = spark.catalog.listDatabases
val dt_df = spark.catalog.listTables
db_df = spark.catalog.listDatabases()
dt_df = spark.catalog.listTables()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> db_df = spark.catalog().listDatabases();
Dataset<Row> dt_df = spark.catalog().listTables();
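Since these calls return Datasets in Scala, they can be inspected like any other Dataset. The sketch below (the table name my_hive_table is carried over from the earlier example) also shows the related listColumns method:
// Sketch: inspect catalog metadata as regular Datasets
spark.catalog.listDatabases.show(false)
spark.catalog.listTables.show(false)
spark.catalog.listColumns("my_hive_table").show(false)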
Frequently Used SparkSession Methods
Method | Description |
---|---|
version | Returns Spark version where your application is running, probably the Spark version your cluster is configured with. |
catalog | Returns the catalog object to access metadata. |
conf | Returns the RuntimeConfig object. |
builder() | Used to create a new SparkSession; this returns a SparkSession.Builder. |
newSession() | Creates a new SparkSession. |
range(n) | Returns a single-column Dataset of LongType named id, containing elements in a range from 0 to n (exclusive) with a step value of 1. |
createDataFrame() | Creates a DataFrame from a collection or an RDD. |
createDataset() | Creates a Dataset from a collection, DataFrame, or RDD. |
emptyDataFrame() | Creates an empty DataFrame. |
emptyDataset() | Creates an empty Dataset. |
getActiveSession() | Returns an active Spark session for the current thread. |
getDefaultSession() | Returns the default SparkSession that is returned by the builder. |
implicits() | To access the nested Scala object. |
read() | Returns an instance of the DataFrameReader class, used to read records from CSV, Parquet, Avro, and more file formats into a DataFrame. |
readStream() | Returns an instance of the DataStreamReader class, used to read streaming data. |
sparkContext() | Returns a SparkContext. |
sql(String sql) | Returns a DataFrame after executing the SQL mentioned. |
sqlContext() | Returns SQLContext. |
stop() | Stop the current SparkContext. |
table() | Returns a DataFrame of a table or view. |
udf() | Creates a Spark UDF to use it on DataFrame, Dataset, and SQL. |
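As a short, hedged illustration of a few of these methods in spark-shell (the UDF name plus_one and the view name ids are made up for the example):
// Sketch: a few frequently used SparkSession methods
println(spark.version)                       // e.g. 3.0.3

val ids = spark.range(5)                     // single-column Dataset named "id", values 0..4
ids.show()

// Register a simple UDF and use it from SQL (name and logic are illustrative)
spark.udf.register("plus_one", (x: Long) => x + 1)
ids.createOrReplaceTempView("ids")
spark.sql("SELECT id, plus_one(id) AS next_id FROM ids").show()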