Spark Memory Management
The main feature of Apache Spark is its ability to run computations in memory, so memory management plays a very important role in the whole system. In this article we will dive into Spark memory management.
It is very important for data engineers and developers to understand Apache Spark memory management in order to unleash Spark's potential when processing high volumes of data.
In this article, we are considering Yarn as the cluster manager.
Apache Spark Memory Overview
While running a Spark application, memory must be managed at several levels, and it is important to monitor it at each of them:
- Operating System
- Spark Application
- Cluster Manager (Yarn in our case)
When a Spark application is launched, the Spark cluster starts two kinds of processes:
- Spark Driver Process
- Executor Process
As explained in the Spark architecture, the driver process is the master process responsible for creating the Spark Context, submitting Spark jobs, and translating the Spark pipeline into computational units called tasks. The driver process also coordinates task scheduling and orchestration on each executor process. Driver memory management is similar to that of a typical JVM process.
The executor process is responsible for performing specific computational tasks on the worker nodes, returning the results to the driver process, and providing in-memory storage for RDDs. Understanding the internal memory management of the executor process is crucial for running Spark programs efficiently.
Yarn Container Execution
When we submit a Spark job to a cluster with Yarn, Yarn allocates containers on different nodes in which the executors perform the job.
The ResourceManager handles memory requests and caps executor container allocations at the maximum allocation size set by the yarn.scheduler.maximum-allocation-mb config. Any memory request higher than this value will not be granted.
On a single node, scheduling is done by the NodeManager. The NodeManager is limited by the resources of its node, so it has an upper limit on the resources available to it. The NodeManager memory limit can be set using the yarn.nodemanager.resource.memory-mb config: it is the amount of physical memory, in MB, that can be allocated to Yarn containers on that node.
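As a rough sketch of the capping behavior: YARN normalizes a container request and never grants more than the configured maximum. The rounding to a multiple of the minimum allocation is an assumption based on YARN's default resource calculator, and the limit values below are hypothetical examples, not defaults.

```python
def yarn_container_grant(requested_mb: int,
                         max_allocation_mb: int = 8192,
                         min_allocation_mb: int = 1024) -> int:
    """Sketch of how the YARN scheduler sizes a container: the request is
    rounded up to a multiple of the minimum allocation and capped at
    yarn.scheduler.maximum-allocation-mb."""
    # Round the request up to the nearest multiple of the minimum allocation.
    rounded = ((requested_mb + min_allocation_mb - 1)
               // min_allocation_mb) * min_allocation_mb
    # Requests above the maximum allocation are capped, not granted in full.
    return min(rounded, max_allocation_mb)
```

For example, a 10 000 MB request against an 8192 MB maximum is silently capped at 8192 MB, which is why executor memory settings must be planned against the YARN limits.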
The ExecutorContainer is a JVM process, and the entire ExecutorContainer memory area is divided into three sections:
- Heap Memory
- Off-Heap Memory
- Overhead Memory
We can refer to the heap memory as Executor Memory.
Heap memory is the memory that every Spark developer should know. Its size can be specified with spark-submit or by setting spark.executor.memory. It is the maximum JVM heap size (-Xmx), and the objects here are managed by the garbage collector (GC).
In Apache Spark it is also possible to use off-heap storage for certain operations, and we can provide the size of the off-heap memory that will be used by the application. Overhead memory is used for the various internal Spark overheads incurred while executing Spark pipelines.
Executor Heap Memory
In this section, let’s deep dive into Executor Heap Memory.
Detailed Executor Heap Memory
In Apache Spark, MemoryManager is the interface for memory management. StaticMemoryManager and UnifiedMemoryManager are its two implementations. StaticMemoryManager was used until Spark version 1.6 and was deprecated later, but it can still be enabled with the spark.memory.useLegacyMode config. The default MemoryManager implementation since Spark version 1.6 is UnifiedMemoryManager, and that is the one we will be diving deep into in this article.
The amount of memory available to each executor is managed using the spark.executor.memory configuration. Apache Spark divides spark.executor.memory both logically and physically to use and manage it more efficiently.
Spark Memory Blocks
A Spark application physically groups data into blocks. Blocks are transferable objects: they can be used as inputs to Spark tasks, returned as outputs, used as intermediate steps in the shuffle process, and used to store temporary files.
BlockManager is the key-value store for blocks of data in Spark. It works as a local cache that runs on every node of the Spark application and manages the data in the Spark driver and executors.
The Blocks can be stored on disk or in memory either on-heap or off-heap, either locally or remotely, for some time. After that, they are evicted. More on eviction later in this article.
Blocks also help achieve more concurrency: if a given block can be fetched from 4 different executor nodes, it is faster to fetch it from them in parallel.
Reserved Memory
The reserved memory is the most boring part of the memory: Spark reserves it to store internal objects. It guarantees sufficient memory for the system even on small JVM heaps.
Reserved Memory is hard-coded and equal to 300 MB by default (the RESERVED_SYSTEM_MEMORY_BYTES value in the source code). In a test environment (when spark.testing is set) we can modify it with the spark.testing.reservedMemory config.
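The reserved-memory arithmetic can be sketched as follows; the 1.5x minimum-heap check mirrors the validation Spark performs at startup, though the exact error handling here is illustrative.

```python
RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024  # 300 MB, hard-coded in Spark

def usable_memory(heap_bytes: int) -> int:
    """Usable memory is the JVM heap minus the reserved 300 MB.
    Spark refuses to start if the heap is smaller than 1.5x the reserved size."""
    min_system_memory = int(RESERVED_SYSTEM_MEMORY_BYTES * 1.5)
    if heap_bytes < min_system_memory:
        raise ValueError(
            f"Executor memory must be at least {min_system_memory} bytes")
    return heap_bytes - RESERVED_SYSTEM_MEMORY_BYTES
```

So with a 1 GB heap, only about 724 MB is usable memory; the remaining regions described below are carved out of that usable portion.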
Storage Memory
Storage Memory is used for caching and broadcasting data during execution. Its size is given by
storageMemory = (usableMemory * spark.memory.fraction * spark.memory.storageFraction)
Storage Memory is 30% of the usable memory by default: in our calculation, 1 * 0.6 * 0.5 = 0.3.
Execution Memory
Execution Memory is mainly used to store temporary data for shuffles, joins, sorts, aggregations, etc. If a pipeline runs for too long, the problem is most likely a lack of space here.
executionMemory = (usableMemory * spark.memory.fraction * (1 - spark.memory.storageFraction))
Execution Memory is also equal to 30% of the usable memory by default: in our calculation, 1 * 0.6 * (1 - 0.5) = 0.3.
In the UnifiedMemoryManager implementation, these two parts of memory can borrow from each other; see the Dynamic Occupancy Mechanism section below for more about the specific borrowing mechanism.
User Memory
User memory is mainly used to store data needed for RDD transformation operations. You can keep your own data structures in user memory and use them inside transformations; it is up to the developer what is stored in this memory and how. Spark does no accounting at all of what you do there.
userMemory = (usableMemory * (1 - spark.memory.fraction))
In our calculation, 1 * (1 - 0.6) = 0.4, which is 40% of the usable memory by default.
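Putting the formulas together, here is a sketch of how the whole heap splits with the default fractions. The integer truncation is a simplification of Spark's exact arithmetic, and the function name is our own.

```python
def heap_breakdown(heap_bytes: int,
                   memory_fraction: float = 0.6,    # spark.memory.fraction
                   storage_fraction: float = 0.5):  # spark.memory.storageFraction
    """Split the executor heap into reserved, storage, execution and
    user memory, following the formulas above with Spark's defaults."""
    reserved = 300 * 1024 * 1024                 # hard-coded reserved memory
    usable = heap_bytes - reserved
    storage = int(usable * memory_fraction * storage_fraction)
    execution = int(usable * memory_fraction * (1 - storage_fraction))
    user = usable - storage - execution          # = usable * (1 - memory_fraction)
    return {"reserved": reserved, "storage": storage,
            "execution": execution, "user": user}
```

For a 4 GB executor heap, storage and execution each get roughly 1.1 GB, user memory roughly 1.5 GB, and 300 MB stays reserved.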
Dynamic Occupancy Mechanism
Execution and Storage have a shared memory. They can borrow it from each other. The process of sharing the memory is called the Dynamic occupancy mechanism.
There are two parts of the shared memory.
- Storage Memory
- Execution Memory
The shared Storage memory can be used up to a certain threshold; in the code this threshold is called onHeapStorageRegionSize. This region is used by Execution only if it is not occupied by Storage; otherwise Execution has to wait for the memory to be released. The default size of onHeapStorageRegionSize is all of Storage Memory.
When Execution memory is not in use, Storage can borrow as much Execution memory as is available, until Execution reclaims its space. When that happens, cached memory blocks are evicted until enough borrowed memory is released to satisfy the Execution memory request.
The creators of the dynamic occupancy mechanism decided that Execution memory has priority over Storage memory, the reasoning being that executing a task is more important than keeping cached data: the whole job can crash if there is an Out Of Memory error (OOM) during execution.
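To make the asymmetry concrete, here is a toy model, not Spark's actual code: Execution can evict cached Storage blocks to reclaim space, while Storage can only use memory Execution is not holding. For simplicity it ignores the onHeapStorageRegionSize protection described above.

```python
class ToyUnifiedPool:
    """Toy model of the dynamic occupancy mechanism: one shared pool in
    which Execution may evict Storage blocks, but not the other way round."""

    def __init__(self, total: int):
        self.total = total
        self.storage_used = 0
        self.execution_used = 0

    def free(self) -> int:
        return self.total - self.storage_used - self.execution_used

    def acquire_storage(self, n: int) -> bool:
        # Storage may only take memory Execution is not using; it cannot evict.
        if n <= self.free():
            self.storage_used += n
            return True
        return False

    def acquire_execution(self, n: int) -> bool:
        if n > self.total - self.execution_used:
            return False
        shortfall = n - self.free()
        if shortfall > 0:
            # Execution has priority: evict cached blocks to reclaim space.
            self.storage_used -= shortfall
        self.execution_used += n
        return True
```

In a 100-unit pool where Storage has cached 80 units, an Execution request for 50 units evicts 30 units of cached blocks, while a later Storage request simply fails until Execution releases memory.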
If space on both sides is insufficient, blocks are evicted according to their respective storage levels using an LRU algorithm.
The eviction process has an overhead, and the cost of evicting a block depends on its storage level. MEMORY_ONLY may be the most expensive, because evicted data has to be recalculated, whereas MEMORY_AND_DISK_SER is the opposite: the data is stored in a compact serialized format at runtime, the serialization overhead is low, and eviction only involves disk I/O.
Off-Heap Memory
While most operations happen entirely in on-heap memory with the help of the garbage collector, Spark also makes it possible to use off-heap storage for certain operations.
Off-heap memory is not bound to the garbage collector; instead, Spark uses a Java API (sun.misc.Unsafe) for unsafe operations, similar to malloc() in C, to use operating system memory directly. This way Spark can operate on off-heap memory directly, reducing unnecessary memory overhead as well as frequent GC scanning and collection, and improving processing performance.
Direct memory handling can provide significant performance benefits, but it requires careful management of these pieces of memory, which might not be desired or even possible in some deployment scenarios. It is good practice to restrict unsafe operations in the Java security manager config.
If off-heap memory is enabled, Executor will have both on-heap and off-heap memory. They are complementary to each other.
The Execution memory is the sum of on-heap Execution memory and off-heap Execution memory.
Execution memory = on-heap Execution memory + off-heap Execution memory
The same applies to the Storage memory.
Off-heap memory was mostly introduced for Tungsten, to perform bookkeeping for the memory that Tungsten may use. Read more about Tungsten in the next section.
Project Tungsten
A goal of project Tungsten is to enhance the memory management and binary processing of applications. Tungsten uses custom encoders and decoders to represent JVM objects in a compact format, ensuring high performance and a low memory footprint.
Even when working in the default on-heap mode, Tungsten tries to manage memory explicitly and eliminate the overhead of the JVM object model and GC. Tungsten in this mode really does allocate objects on the heap for its internal purposes, and those pieces of allocated memory can be huge, but the allocations happen much less frequently and are better tolerated by the GC's young generation.
If Tungsten is configured to use off-heap execution memory for allocating data, then all data page allocations must fit within this off-heap size limit. These are things that need to be carefully designed in order to allocate memory outside the JVM process.
Off-heap memory is disabled by default, but you can enable it with the spark.memory.offHeap.enabled parameter and set its size with the spark.memory.offHeap.size parameter. It has no effect on heap memory usage, but make sure not to exceed your executor's total limits.
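For example, the configuration might look like the following; the helper that parses JVM-style size suffixes is a simplified stand-in for Spark's own parsing, shown only to make the byte arithmetic explicit.

```python
def parse_size_to_bytes(size: str) -> int:
    """Parse a JVM-style size string like '512m' or '2g' (simplified sketch)."""
    units = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}
    s = size.strip().lower()
    if s and s[-1] in units:
        return int(float(s[:-1]) * units[s[-1]])
    return int(s)  # plain byte count

# Enabling off-heap memory: both settings go together, and
# spark.memory.offHeap.size must be positive when enabled.
offheap_conf = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "2g",
}
```

These pairs would typically be passed via --conf on spark-submit or set on the SparkConf when building the session.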
Compared to on-heap memory, the off-heap memory model is relatively simple: it only includes Storage memory and Execution memory, each of which accounts for about 50% of the off-heap memory by default. Under UnifiedMemoryManager management, these two parts can borrow from each other, as we described earlier.
Overhead Memory
For the ExecutorContainer in cluster mode, additional memory is also allocated for things like VM overheads, interned strings, and other native overheads. This memory is set using the spark.executor.memoryOverhead configuration (or the deprecated spark.yarn.executor.memoryOverhead). The default size is 10% of Executor memory, with a minimum of 384 MB.
The overhead memory includes memory for PySpark executors (when spark.executor.pyspark.memory is not configured) and memory used by other non-executor processes running in the same container.
From Spark 3.0, this memory no longer includes off-heap memory.
The overall memory is calculated using
val totalMemMiB = (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
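The container-sizing formula above can be sketched in code. The 10% factor and 384 MiB floor match Spark's documented defaults; the function name and shape are our own.

```python
def yarn_executor_request_mib(executor_memory_mib: int,
                              memory_offheap_mib: int = 0,
                              pyspark_mem_mib: int = 0,
                              overhead_factor: float = 0.10,
                              min_overhead_mib: int = 384) -> int:
    """Total memory Spark asks YARN for per executor container, mirroring
    totalMemMiB above (Spark 3.x, where off-heap is added separately)."""
    # Overhead is 10% of executor memory, but never less than 384 MiB.
    overhead = max(int(executor_memory_mib * overhead_factor), min_overhead_mib)
    return (executor_memory_mib + overhead
            + memory_offheap_mib + pyspark_mem_mib)
```

This is why a 4 GiB spark.executor.memory setting actually requests about 4.4 GiB from YARN, and why requests can unexpectedly exceed yarn.scheduler.maximum-allocation-mb.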
Task Memory Manager
Spark tasks never interact with the MemoryManager directly. Instead, the TaskMemoryManager is used to manage the memory of individual tasks: it acquires memory, releases memory, and calculates the memory allocation requested from on or off the heap.
Without going into details of the implementation of this class, let us describe one of the problems it solves.
Tasks in an executor are executed in threads, and each thread shares the JVM's resources, that is, Execution memory. There is no strong isolation of memory resources between tasks, because each TaskMemoryManager shares the memory managed by the MemoryManager. Therefore, a task that arrives first may take up a lot of memory, and the next task may hang due to a lack of memory.
Via memory pools, TaskMemoryManager limits the memory that can be allocated to each task to a range from 1/(2 * n) to 1/n of the pool size, where n is the number of tasks currently running. Therefore, the more tasks running concurrently, the less memory is available to each of them.
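The per-task bounds can be sketched as follows; integer division is a simplification of the actual pool bookkeeping.

```python
def task_memory_bounds(pool_size: int, num_tasks: int) -> tuple:
    """Per-task share of the execution pool: each active task is guaranteed
    at least 1/(2*n) of the pool and may use at most 1/n of it."""
    return pool_size // (2 * num_tasks), pool_size // num_tasks
```

With a 1000 MB execution pool and 4 concurrent tasks, each task is guaranteed 125 MB and capped at 250 MB, which shows how raising executor concurrency shrinks each task's share.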
Conclusion
We can customize Spark for a specific use case: Spark has many config parameters that can be tuned to process any data on any cluster efficiently.
Efficient memory usage is critical for good performance in Spark. It has its own internal model of memory management that copes well under the default configuration.
Simply enabling dynamic resource allocation is not sufficient, either. Data engineers also have to understand how executor memory is laid out and used by Spark, so that executors are not starved of memory or troubled by JVM garbage collection.
Apache Spark 2.x introduced the second-generation Tungsten engine, which features whole-stage code generation and a vectorized column-based memory layout. Built on ideas and techniques from modern compilers, this new version also capitalizes on modern CPUs and cache architectures for fast parallel data processing.