Spark Memory Management
The main feature of Apache Spark is its ability to run computations in memory, so memory management plays a central role in the whole system. In this article we will dive into Spark memory management.
It is very important for data engineers and developers to understand Apache Spark memory management in order to unleash Spark's potential when processing high-volume data.
In this article, we consider Yarn as the cluster manager.
Apache Spark Memory Overview
While running a Spark application, memory must be managed at many levels, and it is important to monitor it at each of the levels below:
- Operating System
- Spark Application
- Cluster Manager (Yarn in our case)
- JVM
When a Spark application is launched, the cluster starts two kinds of processes:
- Spark Driver Process
- Executor Process
As explained in the Spark architecture, the driver process is the master process responsible for creating the Spark Context, submitting Spark jobs, and translating the Spark pipeline into computational units called tasks. The driver also coordinates task scheduling and orchestration on each executor process. Driver memory management is like that of a typical JVM process.
The executor process is responsible for performing specific computational tasks on the worker nodes and returning the results to the driver, as well as providing storage for RDDs. Understanding the internal memory management of the executor process is crucial for running Spark programs efficiently.
Yarn Container Execution
When we submit a Spark job to a cluster with Yarn, Yarn allocates containers in which the executors perform the job on different nodes.
The Yarn ResourceManager handles memory requests and allocates executor containers up to the maximum allocation size set by the yarn.scheduler.maximum-allocation-mb config. Memory requests higher than this value will not take effect.
On each node, container scheduling is handled by the NodeManager. As the NodeManager is limited by the resources of its node, there is an upper limit on the resources available to it. In Yarn, the NodeManager memory limit can be set with the yarn.nodemanager.resource.memory-mb config: the amount of physical memory per NodeManager (in MB) that can be allocated for Yarn containers at execution time.
An ExecutorContainer is a JVM process, and the entire ExecutorContainer memory area is divided into three sections:
Heap memory
We can call the heap memory Executor Memory. It is the memory every Spark developer should know about. The heap size can be specified with --executor-memory during spark-submit or by setting spark.executor.memory. It is the maximum JVM heap (-Xmx), and the objects here are managed by the garbage collector (GC).
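As a quick sketch, the two settings are equivalent; the application file name app.py and the 4g value below are arbitrary examples:

```python
# Two equivalent ways to give each executor a 4 GiB JVM heap (-Xmx4g).
# "app.py" and "4g" are example values, not from any real deployment.

# 1. Via the spark-submit flag:
flag_form = "spark-submit --executor-memory 4g app.py"

# 2. Via the configuration property:
conf_form = "spark-submit --conf spark.executor.memory=4g app.py"
```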
Off-heap memory
In Apache Spark it is possible to use off-heap storage for certain operations, and we can specify the size of the off-heap memory the application will use.
Overhead memory
It is used for various internal Spark overheads when executing Spark pipelines.
Executor Heap Memory
In this section, let’s deep dive into Executor Heap Memory.
Detailed Executor Heap Memory
In Apache Spark, MemoryManager is the interface for memory management, and StaticMemoryManager and UnifiedMemoryManager are its two implementations. StaticMemoryManager was the default until Spark 1.6 and was deprecated afterward, but it could still be enabled with the spark.memory.useLegacyMode parameter (removed in Spark 3.0).
The default MemoryManager implementation since Spark 1.6 is UnifiedMemoryManager, and it is the one we dive into in this article.
The amount of memory available to each executor is managed with the spark.executor.memory configuration. Apache Spark divides spark.executor.memory logically and physically in order to use and manage it more efficiently.
Spark Memory Blocks
A Spark application physically groups data in blocks. Blocks are transferable objects: they can be used as inputs to Spark tasks, returned as outputs, serve as intermediate steps in the shuffle process, and store temporary files.
The BlockManager is the key-value store for blocks of data in Spark. It works as a local cache that runs on every node of the Spark application and manages the data in the Spark driver and executors.
Blocks can be stored on disk or in memory (either on-heap or off-heap), either locally or remotely, for some time; after that, they are evicted (more on eviction later in this article).
Blocks also help achieve more concurrency: if a given block can be fetched from four different executor nodes, it is faster to fetch it from them in parallel.
Reserved Memory
The reserved memory is the most boring part of the memory: Spark sets it aside to store internal objects, guaranteeing sufficient memory for the system even with small JVM heaps.
Reserved Memory is hard-coded to 300 MB (the RESERVED_SYSTEM_MEMORY_BYTES constant in the source code). In a test environment (when spark.testing is set) it can be modified with spark.testing.reservedMemory. The rest of the heap, usableMemory = spark.executor.memory - reservedMemory, is what the fractions below are applied to.
Storage Memory
Storage Memory is used for caching and broadcasting data during execution.
Its size is calculated as
storageMemory = usableMemory * spark.memory.fraction * spark.memory.storageFraction
With the defaults (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5), Storage Memory is 30% of usable memory: 1 * 0.6 * 0.5 = 0.3.
Execution Memory
Execution Memory is mainly used to store temporary data in the shuffle process, joins, sorts, aggregations, etc. If a pipeline runs for too long, the most likely problem is a lack of space here.
executionMemory = usableMemory * spark.memory.fraction * (1 - spark.memory.storageFraction)
Like Storage Memory, Execution Memory is also 30% of usable memory by default: 1 * 0.6 * (1 - 0.5) = 0.3.
In the UnifiedMemoryManager implementation, these two parts of memory can borrow from each other. See the Dynamic Occupancy Mechanism section below for the specifics of the borrowing mechanism.
User Memory
User Memory is mainly used to store data needed for RDD transformation operations. You can store your own data structures in user memory and use them inside transformations; it is up to the developer what is stored in this memory and how. Spark does no accounting of what you do there.
userMemory = usableMemory * (1 - spark.memory.fraction)
With the defaults: 1 * (1 - 0.6) = 0.4, which is 40% of usable memory.
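The three formulas above can be put together in a short sketch. The 4 GiB executor size is an arbitrary example; the fractions and the 300 MB reservation are the defaults described in this article:

```python
# Sketch of the unified on-heap memory split for a hypothetical 4 GiB executor,
# using the default Spark fractions.
MIB = 1024 * 1024

executor_memory  = 4096 * MIB   # spark.executor.memory (example value)
reserved_memory  = 300 * MIB    # RESERVED_SYSTEM_MEMORY_BYTES (hard-coded)
memory_fraction  = 0.6          # spark.memory.fraction (default)
storage_fraction = 0.5          # spark.memory.storageFraction (default)

usable_memory    = executor_memory - reserved_memory
storage_memory   = usable_memory * memory_fraction * storage_fraction
execution_memory = usable_memory * memory_fraction * (1 - storage_fraction)
user_memory      = usable_memory * (1 - memory_fraction)

print(f"usable:    {usable_memory / MIB:.0f} MiB")    # 3796 MiB
print(f"storage:   {storage_memory / MIB:.1f} MiB")   # 1138.8 MiB
print(f"execution: {execution_memory / MIB:.1f} MiB") # 1138.8 MiB
print(f"user:      {user_memory / MIB:.1f} MiB")      # 1518.4 MiB
```

Note that the fractions apply to usable memory (heap minus the 300 MB reservation), not to the raw heap size.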
Dynamic Occupancy Mechanism
Execution and Storage share a region of memory and can borrow it from each other. This sharing process is called the dynamic occupancy mechanism.
There are two parts of the shared memory.
- Storage Memory
- Execution Memory
Shared Storage memory can be used up to a certain threshold, called onHeapStorageRegionSize in the code. Beyond that threshold, Storage can only use memory that is not occupied by Execution memory and has to wait for used memory to be released by the executor processes. By default, onHeapStorageRegionSize is all of Storage Memory.
When Execution memory is not in use, Storage can borrow as much Execution memory as is available, until Execution reclaims its space. When that happens, cached memory blocks are evicted until enough of the borrowed memory is released to satisfy the Execution memory request.
The creators of the dynamic occupancy mechanism decided that Execution memory has priority over Storage memory: executing the task is more important than the cached data, since the whole job can crash on an OutOfMemoryError (OOM) during execution.
If space on both sides is insufficient, blocks are evicted according to their respective storage levels using an LRU algorithm.
Eviction has an overhead, and its cost depends on the storage level. MEMORY_ONLY may be the most expensive, because evicted data has to be recalculated, whereas MEMORY_AND_DISK_SER is the opposite: data is stored at runtime in a compact serialized format, the serialization overhead is low, and eviction only involves disk I/O operations.
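The borrowing and eviction rules can be illustrated with a toy model. All sizes and the function name here are invented for illustration; the real accounting lives in UnifiedMemoryManager:

```python
# Toy illustration of the dynamic occupancy mechanism (all numbers invented).
# Storage may borrow idle Execution memory, but when Execution asks for its
# space back, cached blocks are evicted; Execution is never evicted for Storage.

storage_region = 500    # onHeapStorageRegionSize, MiB (protected for Storage)
execution_region = 500  # Execution side of the unified region, MiB

storage_used = 800      # Storage has borrowed 300 MiB of idle Execution memory
execution_used = 0

def execution_acquire(request):
    """Execution reclaims borrowed memory by evicting cached blocks."""
    global storage_used, execution_used
    free = (storage_region + execution_region) - storage_used - execution_used
    if request > free:
        # Evict Storage blocks, but only down to the protected storage region.
        evictable = max(0, storage_used - storage_region)
        evicted = min(request - free, evictable)
        storage_used -= evicted
    execution_used += request

execution_acquire(400)
print(storage_used, execution_used)  # 600 400 -- 200 MiB of blocks evicted
```

In the reverse direction no eviction happens: a Storage request that exceeds the free space simply fails to cache the block, reflecting Execution's priority.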
Off-heap memory
While most operations happen entirely in on-heap memory with the help of the garbage collector, Spark also makes it possible to use off-heap storage for certain operations.
Off-heap memory is not bound to the garbage collector; instead, Spark calls the Java API for unsafe operations (sun.misc.Unsafe), which, like malloc() in C, uses operating system memory directly. This way Spark can operate on off-heap memory directly, reducing unnecessary memory overhead and frequent GC scanning and collection, and improving processing performance.
Direct memory handling can provide significant performance benefits, but it requires careful management of these pieces of memory, which might not be desired or even possible in some deployment scenarios. It is good practice to restrict unsafe operations in the Java security manager configuration.
If off-heap memory is enabled, Executor will have both on-heap and off-heap memory. They are complementary to each other.
The Execution memory is the sum of on-heap Execution memory and off-heap Execution memory.
Execution memory = on-heap Execution memory + off-heap Execution memory
The same applies to the Storage memory.
Off-heap memory was mostly introduced for Tungsten, to perform bookkeeping for memory that Tungsten may use. Read more about Tungsten in the next section.
Tungsten
A goal of project Tungsten is to enhance the memory management and binary processing of applications. Tungsten uses custom encoders and decoders to represent JVM objects in a compact format, ensuring high performance and a low memory footprint.
Even when working in the default on-heap mode, Tungsten tries to manage memory explicitly and eliminate the overhead of the JVM object model and GC. In this mode Tungsten does allocate objects on the heap for its internal purposes, and those allocations can be huge, but they happen much less frequently and put less pressure on the GC's short-lived (young) generation.
If Tungsten is configured to use off-heap execution memory for allocating data, then all data page allocations must fit within this off-heap size limit. Allocating memory outside the JVM process has to be carefully planned: it can cause difficulties with container managers, where you need to allow for additional pieces of memory besides the JVM process configuration.
Note:
Off-heap memory is disabled by default, but you can enable it with the spark.memory.offHeap.enabled parameter and set the memory size with the spark.memory.offHeap.size parameter. It has no effect on heap memory usage, but make sure not to exceed your executor's total limits.
Compared to on-heap memory, the off-heap memory model is relatively simple: it includes only Storage memory and Execution memory, each accounting for about 50% of the off-heap space by default. In UnifiedMemory management, these two parts can borrow from each other as we described earlier.
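As a sketch, assuming a 1 GiB off-heap size (an arbitrary example value), the 50/50 split follows from spark.memory.storageFraction:

```python
# Sketch: enabling off-heap memory and its Storage/Execution split.
# The 1 GiB size is an arbitrary example; keys are real Spark config names.
conf = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": str(1024 * 1024 * 1024),  # bytes, 1 GiB
    "spark.memory.storageFraction": "0.5",                 # default
}

off_heap_size = int(conf["spark.memory.offHeap.size"])
storage_fraction = float(conf["spark.memory.storageFraction"])

off_heap_storage = off_heap_size * storage_fraction    # 512 MiB
off_heap_execution = off_heap_size - off_heap_storage  # 512 MiB
```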
Overhead memory
When an ExecutorContainer is allocated in cluster mode, additional memory is also allocated for things like VM overheads, interned strings, other native overheads, etc. This memory is set with the spark.executor.memoryOverhead configuration (or the deprecated spark.yarn.executor.memoryOverhead). Its default size is 10% of executor memory, with a minimum of 384 MB.
The overhead memory includes memory for PySpark executors when spark.executor.pyspark.memory is not configured, as well as memory used by other processes running in the same container.
Since Spark 3.0, this memory does not include off-heap memory.
The overall memory is calculated using
val totalMemMiB =
(executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
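A rough Python rendering of that calculation, with example values (a 4096 MiB heap, off-heap disabled, no separate PySpark memory); the 384 MB minimum and the 10% factor are the defaults mentioned above:

```python
# Sketch of the total YARN container memory requested for one executor.
MIN_OVERHEAD_MIB = 384
OVERHEAD_FACTOR = 0.10       # default overhead factor (10% of executor memory)

executor_memory_mib = 4096   # spark.executor.memory (example value)
memory_off_heap_mib = 0      # spark.memory.offHeap.size (disabled here)
pyspark_mem_mib = 0          # spark.executor.pyspark.memory (unset)

memory_overhead_mib = max(MIN_OVERHEAD_MIB,
                          int(OVERHEAD_FACTOR * executor_memory_mib))

total_mem_mib = (executor_memory_mib + memory_overhead_mib
                 + memory_off_heap_mib + pyspark_mem_mib)
print(total_mem_mib)  # 4505
```

So a "4 GB" executor actually asks YARN for about 4.4 GB, which is worth remembering when sizing yarn.nodemanager.resource.memory-mb.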
Task Memory Manager
Spark tasks never interact directly with the MemoryManager. TaskMemoryManager is used to manage the memory of individual tasks: it acquires and releases memory and calculates the memory allocations requested from on-heap or off-heap memory.
Without going into details of the implementation of this class, let us describe one of the problems it solves.
Tasks in an executor are executed in threads, and each thread shares JVM resources, that is, Execution memory. There is no strong isolation of memory resources between tasks, because each TaskMemoryManager shares the memory managed by the MemoryManager. It is therefore possible for the task that came first to take up a lot of memory, leaving the next task to hang due to a lack of memory.
TaskMemoryManager, via memory pools, limits the memory that can be allocated to each task to a range from 1/(2 * n) to 1/n, where n is the number of currently running tasks. Therefore, the more tasks run concurrently, the less memory is available to each of them.
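These bounds can be sketched numerically; the 2048 MiB pool size is an invented example:

```python
# Per-task Execution memory bounds enforced via memory pools: each task gets
# between 1/(2*n) and 1/n of the pool, where n is the number of running tasks.
pool_mib = 2048  # example Execution memory pool size

def task_share_bounds(n_tasks):
    """Return the (min, max) Execution memory per task in MiB."""
    return pool_mib / (2 * n_tasks), pool_mib / n_tasks

print(task_share_bounds(4))  # (256.0, 512.0)
print(task_share_bounds(8))  # (128.0, 256.0) -- more tasks, less per task
```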
Conclusion
We can customize Spark for a specific use case: Spark has many config parameters that can be changed to process any data on any cluster efficiently.
Efficient memory usage is critical for good performance in Spark. Spark has its own internal model of memory management that is able to do its work with the default configuration.
Simply enabling dynamic resource allocation is also not sufficient. Data engineers also have to understand how executor memory is laid out and used by Spark so that executors are not starved of memory or troubled by JVM garbage collection.
Apache Spark 2.x introduced the second-generation Tungsten engine, which features whole-stage code generation and a vectorized column-based memory layout. Built on ideas and techniques from modern compilers, this new version also capitalizes on modern CPUs and cache architectures for fast parallel data processing.