Spark Memory Management

The main feature of Apache Spark is its ability to run computations in memory, so memory management plays a central role in the whole system. In this article we will dive into Spark memory management.

Data engineers and developers need to understand Apache Spark memory management to unlock Spark's potential for processing high-volume data.

In this article, we consider YARN as the cluster manager.

Apache Spark Memory Overview

While a Spark application runs, memory has to be managed and monitored at several levels:

  • Operating System
  • Spark Application
  • Cluster Manager (YARN in our case)
  • JVM

When a Spark application is launched, the cluster starts two kinds of processes:

  • Spark Driver Process
  • Executor Process

Figure: Spark Memory Management

As explained in the Spark architecture, the driver process is the master process responsible for creating the SparkContext, submitting Spark jobs, and translating the Spark pipeline into computational units called tasks. The driver also coordinates task scheduling and orchestration across the executor processes. Driver memory management works like that of a typical JVM process.

The executor process performs specific computational tasks on the worker nodes, returns the results to the driver process, and provides storage for RDDs. Understanding the internal memory management of the executor process is crucial for running Spark programs efficiently.

YARN Container Execution

When we submit a Spark job to a YARN cluster, YARN allocates containers on different nodes, and the executors run inside those containers to perform the job.

The YARN ResourceManager handles memory requests and allocates executor containers up to the maximum allocation size set by the yarn.scheduler.maximum-allocation-mb config. A memory request larger than this value cannot be satisfied, so the executor container size has to stay at or below it.

On each individual node, resources are managed by the NodeManager. Because a NodeManager is limited by the resources of its own node, there is an upper limit on what it can offer. In YARN this limit is set using the yarn.nodemanager.resource.memory-mb config: the amount of physical memory (in MB) on that NodeManager that can be allocated to YARN containers at execution time.
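The two YARN limits above can be combined into a quick capacity check. The following is a minimal sketch, not YARN's actual scheduling logic: the function name and the example sizes are hypothetical, and it ignores any rounding YARN applies to container requests.

```python
def executors_per_node(node_memory_mb, container_mb, max_allocation_mb):
    """Estimate how many executor containers fit on one NodeManager.

    node_memory_mb    -> yarn.nodemanager.resource.memory-mb
    max_allocation_mb -> yarn.scheduler.maximum-allocation-mb
    container_mb      -> full size of one executor container (hypothetical input)
    """
    if container_mb > max_allocation_mb:
        # A request above the scheduler maximum cannot be satisfied.
        raise ValueError(
            f"container of {container_mb} MB exceeds the maximum allocation "
            f"of {max_allocation_mb} MB"
        )
    # Whole containers only: integer division drops the unusable remainder.
    return node_memory_mb // container_mb


# Illustrative numbers: a 64 GiB NodeManager and 4505 MB executor containers.
print(executors_per_node(65536, 4505, 8192))
```

With these illustrative numbers, 14 containers fit on the node and roughly 2.4 GB of the NodeManager's memory stays unallocated.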

An ExecutorContainer is a JVM process, and the entire ExecutorContainer memory area is divided into three sections:

Figure: Spark Executor Memory

Heap memory

Heap memory is also called Executor Memory.

Heap memory is the memory every Spark developer should know. Its size can be specified with --executor-memory during spark-submit or by setting spark.executor.memory. This is the maximum JVM heap size (-Xmx), and the objects here are managed by the garbage collector (GC).

Off-heap memory

In Apache Spark it is possible to use off-heap storage for certain operations, and we can set the size of the off-heap memory that the application will use.

Overhead memory

It is used for the various internal overheads of running Spark pipelines, such as VM overheads and other native overheads.

Executor Heap Memory

In this section, let’s deep dive into Executor Heap Memory.

Detailed Executor Heap Memory

In Apache Spark, MemoryManager is the abstraction for memory management, and StaticMemoryManager and UnifiedMemoryManager are its two implementations. StaticMemoryManager was the default before Spark 1.6 and was deprecated afterwards. It could still be selected with the spark.memory.useLegacyMode parameter until legacy mode was removed in Spark 3.0.

The default MemoryManager implementation since Spark 1.6 is UnifiedMemoryManager. We will be diving deep into UnifiedMemoryManager in this article.

Figure: Detailed Spark Executor Memory

The amount of memory available to each executor is controlled by the spark.executor.memory configuration. Apache Spark divides spark.executor.memory, both logically and physically, in order to use and manage it more efficiently.

Spark Memory Blocks

A Spark application physically groups data into blocks. Blocks are transferable objects: they can be used as inputs to Spark tasks, returned as outputs, used as intermediate data in the shuffle process, and used to store temporary files.

The BlockManager is the key-value store for blocks of data in Spark. It works as a local cache that runs on every node of the Spark application, managing data on both the driver and the executors.

Blocks can be stored on disk or in memory (on-heap or off-heap), locally or remotely, for some time. After that, they are evicted. More on eviction later in this article.

Blocks help achieve more concurrency. For example, if a given block can be fetched from 4 different executor nodes, it is faster to fetch it from them in parallel.

Reserved Memory

The reserved memory is the most boring part of the memory. Spark sets it aside to store its internal objects, which guarantees sufficient memory for the system even with small JVM heaps.

Reserved Memory is hard-coded to 300 MB (the value RESERVED_SYSTEM_MEMORY_BYTES in the source code). In a test environment (when spark.testing is set) we can modify it with spark.testing.reservedMemory. The memory left after subtracting Reserved Memory from the heap is the usable memory referred to in the calculations below.
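As a rough illustration, the usable memory can be computed by subtracting the reserved 300 MB from the executor heap. This is a sketch under stated assumptions: the function name is hypothetical, the heap is taken to be exactly spark.executor.memory (the JVM typically reports slightly less), and the refusal of heaps smaller than 1.5 times the reserved memory reflects my reading of UnifiedMemoryManager rather than anything stated in this article.

```python
RESERVED_SYSTEM_MEMORY_MB = 300  # mirrors RESERVED_SYSTEM_MEMORY_BYTES, expressed in MB


def usable_memory_mb(heap_mb, reserved_mb=RESERVED_SYSTEM_MEMORY_MB):
    """Return the heap memory left after the reserved region is set aside."""
    # Assumption: heaps below 1.5x the reserved memory are rejected outright.
    min_heap_mb = reserved_mb * 1.5
    if heap_mb < min_heap_mb:
        raise ValueError(
            f"heap of {heap_mb} MB is below the minimum of {min_heap_mb:.0f} MB"
        )
    return heap_mb - reserved_mb


# A 4 GiB executor heap leaves 4096 - 300 = 3796 MB of usable memory.
print(usable_memory_mb(4096))
```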

Storage Memory

Storage Memory is used for caching data and for holding broadcast data at execution time.

Storage Memory size can be found by:

storageMemory = (usableMemory 
                  * spark.memory.fraction 
                  * spark.memory.storageFraction)

By default, Storage Memory is 30% of the usable memory: 1 * 0.6 * 0.5 = 0.3, using the default spark.memory.fraction of 0.6 and spark.memory.storageFraction of 0.5.

Execution Memory

Execution Memory is mainly used to store temporary data for shuffles, joins, sorts, aggregations, and similar operations. If a pipeline runs for too long, a likely cause is a shortage of this memory, because operators then have to spill to disk.

executionMemory = (usableMemory 
                      * spark.memory.fraction 
                      * (1 - spark.memory.storageFraction))

Like Storage Memory, Execution Memory is 30% of the usable memory by default: 1 * 0.6 * (1 - 0.5) = 0.3.

In the UnifiedMemoryManager implementation, these two parts of memory can borrow from each other. See the Dynamic Occupancy Mechanism section for the specific borrowing mechanism.

User Memory

User memory is mainly used to store data needed for RDD transformations. You can store your own data structures here and use them inside transformations. It is up to the developer what is stored in this memory and how. Spark does no accounting of what you do there.

userMemory = (usableMemory * (1 - spark.memory.fraction))

In our calculations, 1 * (1 - 0.6) = 0.4, which is 40% of the usable memory by default.
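To make the three formulas concrete, here is a small sketch that applies them to an example heap. The function name and the 4 GiB heap are illustrative assumptions. The fractions are the defaults for spark.memory.fraction and spark.memory.storageFraction, and the usable memory is taken as the heap minus the 300 MB reserved region.

```python
def heap_regions_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5,
                    reserved_mb=300):
    """Split an executor heap into the regions described in this article."""
    usable = heap_mb - reserved_mb
    unified = usable * memory_fraction    # shared by Storage and Execution
    storage = unified * storage_fraction  # storageMemory formula
    execution = unified - storage         # executionMemory formula
    user = usable - unified               # userMemory formula
    return {
        "reserved": reserved_mb,
        "storage": storage,
        "execution": execution,
        "user": user,
    }


# Example: spark.executor.memory of 4g (4096 MB) with default fractions.
for name, size in heap_regions_mb(4096).items():
    print(f"{name:>9}: {size:8.1f} MB")
```

For this example heap, Storage and Execution each come to about 1138.8 MB and User Memory to about 1518.4 MB, so the four regions add up to the full 4096 MB.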

Dynamic Occupancy Mechanism

Execution and Storage share a region of memory and can borrow from each other. This sharing process is called the Dynamic Occupancy Mechanism.

There are two parts of the shared memory.

  • Storage Memory
  • Execution Memory

Storage memory is protected up to a certain threshold. In the code, this threshold is called onHeapStorageRegionSize, and by default it equals all of Storage Memory. Cached blocks within this region are not evicted by Execution. Execution may use the unused part of the region, however, and in that case Storage cannot evict it: Storage has to wait until the tasks release the memory.

When Execution memory is not in use, Storage can borrow as much of it as is available, until Execution reclaims its space. When that happens, cached blocks are evicted from memory until enough borrowed memory has been released to satisfy the Execution memory request.

The creators of the dynamic occupancy mechanism decided that Execution memory has priority over Storage memory. The reasoning is that executing a task is more important than keeping cached data: the whole job can crash if execution hits an out-of-memory error (OOM).

If space on both sides is insufficient and usage is within the respective boundaries, cached blocks are evicted in least-recently-used (LRU) order and handled according to their storage levels.

The eviction process has an overhead, and its cost depends on the storage level. MEMORY_ONLY may be the most expensive, because an evicted block has to be recomputed when it is needed again. MEMORY_AND_DISK_SER is the opposite: at this storage level the data is kept in a compact serialized format, so the serialization overhead is low and eviction only involves disk I/O.
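The borrowing and eviction rules can be illustrated with a toy model. This is a deliberately simplified sketch and not Spark's implementation: the class and method names are hypothetical, sizes are plain MB integers, storage never evicts its own blocks to make room for new ones, and evicted blocks are simply dropped.

```python
from collections import OrderedDict


class UnifiedPoolSketch:
    """Toy model of the memory shared by Storage and Execution (sizes in MB)."""

    def __init__(self, total_mb, storage_region_mb):
        self.total_mb = total_mb
        # Storage below this size is protected from eviction by execution.
        self.storage_region_mb = storage_region_mb
        self.execution_used_mb = 0
        self.blocks = OrderedDict()  # block id -> size, least recently used first

    @property
    def storage_used_mb(self):
        return sum(self.blocks.values())

    @property
    def free_mb(self):
        return self.total_mb - self.execution_used_mb - self.storage_used_mb

    def cache_block(self, block_id, size_mb):
        """Storage may take any free memory but never evicts execution memory."""
        if size_mb > self.free_mb:
            return False
        self.blocks[block_id] = size_mb
        return True

    def read_block(self, block_id):
        """Reading a block marks it as most recently used."""
        self.blocks.move_to_end(block_id)

    def acquire_execution(self, size_mb):
        """Grant execution memory, evicting LRU blocks that storage borrowed."""
        shortfall = size_mb - self.free_mb
        # Only storage above the protected region can be reclaimed.
        reclaimable = max(0, self.storage_used_mb - self.storage_region_mb)
        to_free = min(shortfall, reclaimable)
        freed = 0
        while freed < to_free and self.blocks:
            _, evicted_size = self.blocks.popitem(last=False)  # oldest first
            freed += evicted_size
        granted = min(size_mb, self.free_mb)
        self.execution_used_mb += granted
        return granted


pool = UnifiedPoolSketch(total_mb=1000, storage_region_mb=500)
pool.cache_block("a", 400)
pool.cache_block("b", 400)      # storage now borrows 300 MB beyond its region
pool.read_block("a")            # "b" becomes the least recently used block
first = pool.acquire_execution(500)   # evicts "b" to reclaim borrowed memory
second = pool.acquire_execution(400)  # "a" is protected, so only free memory is granted
print(first, second, list(pool.blocks))
```

In this run the first request is granted in full after block "b" is evicted, while the second request only receives the 100 MB that is still free, because block "a" sits inside the protected storage region.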

Figure: Dynamic Occupancy Mechanism

Off-heap memory

Most operations happen entirely in on-heap memory with the help of the garbage collector, but Spark also makes it possible to use off-heap storage for certain operations.

Off-heap memory is not managed by the garbage collector. Instead, Spark calls the Java API for unsafe operations (sun.misc.Unsafe) to request memory from the operating system, much as a C program does with malloc(). This way Spark operates on off-heap memory directly, which reduces unnecessary memory overhead and frequent GC scanning and collection, and improves processing performance.

Figure: Worker Node Memory

Direct memory handling can provide significant performance benefits, but it requires careful management of these pieces of memory. This might not be desired, or even possible, in some deployment scenarios. It’s good practice to restrict unsafe operations in the Java security manager config.

If off-heap memory is enabled, Executor will have both on-heap and off-heap memory. They are complementary to each other.

The Execution memory is the sum of on-heap Execution memory and off-heap Execution memory.

Execution memory = on-heap Execution memory + off-heap Execution memory

The same applies to the Storage memory.

Off-heap memory was introduced mostly for Tungsten, to perform bookkeeping for the memory that Tungsten may use. Read more about Tungsten in the next section.

Tungsten

The goal of Project Tungsten is to improve memory management and binary processing in Spark applications. Tungsten uses custom encoders and decoders to represent JVM objects in a compact format, which gives high performance and a low memory footprint.

Even in on-heap mode, which is the default, Tungsten tries to manage memory explicitly and eliminate the overhead of the JVM object model and GC. In this mode Tungsten still allocates objects on the heap for its internal purposes, and those pieces of allocated memory can be huge, but this happens much less frequently and the allocations survive the GC's short-lived (young) generation.

If Tungsten is configured to use off-heap execution memory for allocating data, then all data page allocations must fit within this off-heap size limit. Allocating memory outside the JVM heap is something that has to be designed carefully.

It can cause some difficulties with container managers, because you need to allow and plan for additional pieces of memory on top of the JVM heap configuration.

Note: Off-heap memory is disabled by default. You can enable it with the spark.memory.offHeap.enabled parameter and set its size with the spark.memory.offHeap.size parameter. It has no effect on heap memory usage, but make sure the total does not exceed your executor’s overall limits.

Compared to on-heap memory, the off-heap memory model is relatively simple. It only includes Storage memory and Execution memory, and by default each accounts for about 50% of the off-heap memory. As with on-heap memory under the UnifiedMemoryManager, these two parts can borrow from each other as described earlier.
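The off-heap split, and the way on-heap and off-heap Execution memory add up, can be sketched in a few lines. The function names and sizes are illustrative assumptions. The sketch assumes the off-heap split is driven by spark.memory.storageFraction, as on-heap, and that no reserved or user region applies off-heap.

```python
def offheap_regions_mb(offheap_size_mb, storage_fraction=0.5):
    """Split spark.memory.offHeap.size into (storage, execution) parts."""
    storage = offheap_size_mb * storage_fraction
    execution = offheap_size_mb - storage
    return storage, execution


def total_execution_mb(on_heap_execution_mb, off_heap_execution_mb):
    """Execution memory = on-heap Execution memory + off-heap Execution memory."""
    return on_heap_execution_mb + off_heap_execution_mb


# Example: 2 GiB of off-heap memory next to 1138.8 MB of on-heap execution memory.
off_storage, off_execution = offheap_regions_mb(2048)
print(off_storage, off_execution)
print(total_execution_mb(1138.8, off_execution))
```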

Overhead memory

When an ExecutorContainer is allocated in cluster mode, additional memory is also allocated for things like VM overheads, interned strings, and other native overheads. This memory is set using the spark.executor.memoryOverhead configuration (or the deprecated spark.yarn.executor.memoryOverhead). The default size is 10% of the executor memory, with a minimum of 384 MB.

The overhead memory also covers PySpark executor memory when spark.executor.pyspark.memory is not configured, as well as memory used by other non-executor processes running in the same container.

From Spark 3.0, this memory does not include off-heap memory, which is added to the container request separately.

The overall memory is calculated using:

val totalMemMiB =
      (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
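The same sum can be worked through with concrete numbers. The sketch below is a hedged illustration: the function name and example sizes are hypothetical, and the default overhead is modelled as 10% of the executor memory, truncated to whole MiB, with a floor of 384 MiB, as described above.

```python
def executor_container_mib(executor_memory_mib, overhead_mib=None,
                           offheap_mib=0, pyspark_mib=0,
                           overhead_factor=0.10, min_overhead_mib=384):
    """Total memory requested for one executor container, in MiB."""
    if overhead_mib is None:
        # Default overhead: 10% of executor memory, but never below 384 MiB.
        overhead_mib = max(int(executor_memory_mib * overhead_factor),
                           min_overhead_mib)
    return executor_memory_mib + overhead_mib + offheap_mib + pyspark_mib


print(executor_container_mib(4096))                    # 4096 + 409
print(executor_container_mib(2048))                    # 2048 + 384 (minimum applies)
print(executor_container_mib(4096, offheap_mib=1024))  # off-heap added on top
```

The result is the figure to compare against yarn.scheduler.maximum-allocation-mb, since the container request covers more than spark.executor.memory alone.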

Task Memory Manager

Spark tasks never interact with the MemoryManager directly. TaskMemoryManager manages the memory of individual tasks: it acquires memory, releases memory, and tracks the allocations requested on-heap or off-heap.

Without going into details of the implementation of this class, let us describe one of the problems it solves.

Figure: Memory Manager

Tasks in an executor run in threads, and the threads share JVM resources, including Execution memory. There is no strong isolation of memory resources between tasks, because each TaskMemoryManager draws on the memory managed by the shared MemoryManager. It is therefore possible that the task that arrived first takes up a lot of memory and the next task hangs for lack of memory.

Through memory pools, the memory that can be allocated to each task is limited to between 1/(2 * n) and 1/n of the Execution memory, where n is the number of tasks currently running. The more tasks run concurrently, the less memory is available to each of them.
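The 1/(2 * n) to 1/n range is easy to tabulate. The following sketch assumes an illustrative Execution memory pool of 1000 MB, and the function name is hypothetical.

```python
def task_memory_bounds_mb(execution_pool_mb, active_tasks):
    """Return the (lower, upper) memory bounds per task for n active tasks."""
    if active_tasks < 1:
        raise ValueError("at least one active task is required")
    lower = execution_pool_mb / (2 * active_tasks)  # 1/(2 * n) of the pool
    upper = execution_pool_mb / active_tasks        # 1/n of the pool
    return lower, upper


# Per-task bounds shrink as more tasks run concurrently in the executor.
for n in (1, 2, 4, 8):
    low, high = task_memory_bounds_mb(1000, n)
    print(f"{n} task(s): between {low:.1f} MB and {high:.1f} MB each")
```

This is one reason why raising the number of cores per executor without raising executor memory tends to increase spilling: each concurrent task is entitled to a smaller share.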

Conclusion

We can customize Spark for a specific use case. Spark has many config parameters that can be changed to process data efficiently on a given cluster.

Efficient memory usage is critical for good performance in Spark. Spark has its own internal model of memory management that copes well with its work under the default configuration.

Simply enabling dynamic resource allocation is also not sufficient. Data engineers also have to understand how executor memory is laid out and used by Spark so that executors are not starved of memory or troubled by JVM garbage collection.

Apache Spark 2.x introduced the second-generation Tungsten engine, which features whole-stage code generation and a vectorized column-based memory layout. Built on ideas and techniques from modern compilers, this new version also capitalizes on modern CPUs and cache architectures for fast parallel data processing.
