
Spark Memory Management Part 2 – Push It to the Limits

June 27, 2017, Norbert Kozłowski

In Spark Memory Management Part 1 – Push it to the Limits, I mentioned that memory plays a crucial role in Big Data applications.

This article analyses a few popular memory contentions and describes how Apache Spark handles them.

Contention #1: Execution and Storage

The following section deals with the problem of choosing the correct sizes of execution and storage regions within an executor’s process.

Static assignment

From Spark 1.0+, May 2014

The first approach to this problem involved using fixed execution and storage sizes. The problem with this approach is that when we run out of memory in one region (even though there is plenty of it available in the other), Spark starts to spill to disk, which is obviously bad for performance.
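To make the drawback concrete, here is a minimal Python sketch of two fixed regions that never lend memory to each other. The class name, the units and the region sizes are hypothetical; it models only the idea, not Spark's legacy memory manager.

```python
# Toy model of static (legacy) assignment: two fixed-size regions that
# never lend memory to each other. Sizes are in arbitrary units.
class StaticRegions:
    def __init__(self, execution_size, storage_size):
        self.capacity = {"execution": execution_size, "storage": storage_size}
        self.used = {"execution": 0, "storage": 0}

    def acquire(self, region, amount):
        """Grant `amount` if it fits in `region`; otherwise report a spill.

        Free memory in the *other* region is never considered.
        """
        free = self.capacity[region] - self.used[region]
        if amount <= free:
            self.used[region] += amount
            return "in-memory"
        return "spill-to-disk"


regions = StaticRegions(execution_size=20, storage_size=60)
print(regions.acquire("execution", 30))  # spill-to-disk, although storage has 60 units free
```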

Caching is expressed in terms of blocks, so when we run out of storage memory, Spark evicts the LRU (“least recently used”) block to disk.
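The LRU eviction described above can be sketched with an `OrderedDict`. The `LruBlockStore` class and the `rdd_<id>_<partition>`-style block names are illustrative assumptions; Spark's real block store is considerably more involved.

```python
from collections import OrderedDict


class LruBlockStore:
    """Toy storage region: cached blocks are evicted to 'disk' in LRU order.

    Sizes are arbitrary units; this is an illustration, not Spark's block store.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = OrderedDict()  # block_id -> size, least recently used first
        self.disk = {}               # blocks evicted from memory

    def used(self):
        return sum(self.memory.values())

    def get(self, block_id):
        # Reading a cached block marks it as the most recently used one.
        if block_id in self.memory:
            self.memory.move_to_end(block_id)
            return "memory"
        return "disk" if block_id in self.disk else None

    def put(self, block_id, size):
        if size > self.capacity:
            self.disk[block_id] = size  # too large to cache at all
            return
        # Evict least recently used blocks until the new block fits.
        while self.used() + size > self.capacity:
            victim, victim_size = self.memory.popitem(last=False)
            self.disk[victim] = victim_size
        self.memory[block_id] = size


store = LruBlockStore(capacity=3)
for block in ["rdd_0_0", "rdd_0_1", "rdd_0_2"]:
    store.put(block, 1)
store.get("rdd_0_0")         # touch rdd_0_0, so rdd_0_1 becomes the LRU block
store.put("rdd_0_3", 1)      # evicts rdd_0_1 to disk
print(store.get("rdd_0_1"))  # disk
```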

To use this method, the user is advised to adjust many parameters, which increases the overall complexity of the application.

Unified memory management

From Spark 1.6+, Jan 2016

Instead of expressing execution and storage as two separate chunks, Spark can use one unified region (M), which they both share. When execution memory is not used, storage can acquire all the available memory, and vice versa. Execution may evict storage if necessary, but only until the total storage memory usage falls to a certain threshold (R).

R is the storage space within M where cached blocks are immune to being evicted by execution; its size is set with the spark.memory.storageFraction property.

In other words, R describes a subregion within M where cached blocks are never evicted. The reverse does not hold: storage cannot evict execution, due to complications in the implementation. This solution tends to work as expected, and it is used by default in current Spark releases.
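A minimal sketch of these borrowing rules, assuming sizes in arbitrary units and a single pool. `UnifiedPool` is a hypothetical name, and the model ignores everything except the two eviction rules above.

```python
class UnifiedPool:
    """Toy model of unified memory management (sizes in arbitrary units).

    M is shared by execution and storage. Execution may evict cached blocks,
    but only until storage usage drops to R; storage never evicts execution.
    """

    def __init__(self, m, r):
        self.m, self.r = m, r
        self.execution = 0
        self.storage = 0

    def free(self):
        return self.m - self.execution - self.storage

    def acquire_execution(self, amount):
        # Evict cached data if needed, but never below the protected size R.
        shortfall = amount - self.free()
        if shortfall > 0:
            evictable = max(0, self.storage - self.r)
            self.storage -= min(shortfall, evictable)
        granted = min(amount, self.free())
        self.execution += granted
        return granted

    def acquire_storage(self, amount):
        # Storage may use any free memory but cannot evict execution.
        if amount <= self.free():
            self.storage += amount
            return True
        return False


pool = UnifiedPool(m=100, r=50)
pool.acquire_storage(80)           # storage borrows beyond R while execution is idle
print(pool.acquire_execution(40))  # 40: 20 free units + 20 evicted from storage
print(pool.acquire_execution(30))  # 10: storage cannot shrink below R = 50
print(pool.acquire_storage(10))    # False: storage cannot evict execution
```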

Properties

Static assignment:

  • spark.memory.useLegacyMode – the option to divide heap space into fixed-size regions (default false)
  • spark.shuffle.memoryFraction – the fraction of the heap used for aggregation and cogroup during shuffles. Works only if spark.memory.useLegacyMode=true (default 0.2)
  • spark.storage.memoryFraction – the fraction of the heap used for Spark’s memory cache. Works only if spark.memory.useLegacyMode=true (default 0.6)
  • spark.storage.unrollFraction – the fraction of spark.storage.memoryFraction used for unrolling blocks in memory. This is dynamically allocated by dropping existing blocks when
    there is not enough free storage space to unroll the new block in its entirety. Works only if spark.memory.useLegacyMode=true (default 0.2).
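As a rough illustration, the nominal region sizes implied by the legacy defaults above can be computed as follows. The function name is hypothetical, and the sketch deliberately ignores the additional safety margins Spark applied on top of these fractions in legacy mode, so the real usable sizes were smaller.

```python
def legacy_region_sizes(heap_mb,
                        shuffle_memory_fraction=0.2,   # spark.shuffle.memoryFraction
                        storage_memory_fraction=0.6,   # spark.storage.memoryFraction
                        storage_unroll_fraction=0.2):  # spark.storage.unrollFraction
    """Nominal region sizes (MB) when spark.memory.useLegacyMode=true.

    Simplification: the extra safety margins of the legacy manager
    are not modelled here.
    """
    storage = heap_mb * storage_memory_fraction
    return {
        "execution": heap_mb * shuffle_memory_fraction,
        "storage": storage,
        "unroll": storage * storage_unroll_fraction,  # carved out of storage
    }


sizes = legacy_region_sizes(4096)
print(sizes)
```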

Unified memory management:

  • spark.memory.storageFraction – expresses the size of R as a fraction of M. The higher it is, the less working memory may be available for execution, and the more often tasks may spill
    to disk (default 0.5).
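The sketch below shows how M and R relate, assuming the Spark 2.x behaviour: roughly 300 MB of the heap is reserved by Spark, M is (heap - 300 MB) * spark.memory.fraction (default 0.6), and R is M * spark.memory.storageFraction. These formulas and defaults are not stated in this article and differ between releases (Spark 1.6 used a spark.memory.fraction default of 0.75), so treat the numbers as an approximation.

```python
RESERVED_MB = 300  # assumption: memory Spark reserves for itself (Spark 2.x)


def unified_region_sizes(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate sizes (MB) of M, R and user memory under unified management."""
    if heap_mb <= RESERVED_MB:
        raise ValueError("heap must be larger than the reserved memory")
    usable = heap_mb - RESERVED_MB
    m = usable * memory_fraction  # spark.memory.fraction
    r = m * storage_fraction      # spark.memory.storageFraction
    return {"M": m, "R": r, "user": usable - m}


print(unified_region_sizes(4096))                        # defaults
print(unified_region_sizes(4096, storage_fraction=0.7))  # larger R: execution can evict fewer cached blocks
```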

Contention #2: Tasks running in parallel

In this case, we are referring to tasks running in parallel within a single executor (each in its own thread) and competing for the executor’s resources.

Static assignment

From Spark 1.0+, May 2014

The user specifies the maximum amount of resources for a fixed number of tasks (N), which is shared equally among them. The problem is that often not all of the available resources are used (for instance, when fewer than N tasks are running), which leads to suboptimal performance.
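A minimal sketch of why static shares waste memory, assuming a hypothetical pool of 1000 units split across N = 4 task slots while only one task is actually running.

```python
def static_task_shares(pool_size, n_slots, running_tasks):
    """Static assignment: every one of N slots gets pool_size / N up front."""
    share = pool_size / n_slots
    idle = pool_size - share * running_tasks  # reserved for slots with no task
    return share, idle


print(static_task_shares(1000, n_slots=4, running_tasks=1))  # (250.0, 750.0)
```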

Dynamic assignment

From Spark 1.0+, May 2014

The amount of resources allocated to each task depends on the number of actively running tasks (N changes dynamically). This handles “stragglers” well: these are the last running tasks, usually the result of skew in the partitions, and as the other tasks finish, they can use a larger share of memory.
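Spark's execution memory pool bounds each task by the current number of active tasks N: a task can grow to at most 1/N of the pool and is guaranteed at least 1/(2N) before it is forced to spill. These bounds come from Spark's implementation rather than from this article; the sketch below, with a hypothetical pool of 1000 units, shows how a straggler's ceiling rises as other tasks finish.

```python
def dynamic_task_bounds(pool_size, active_tasks):
    """Per-task (guaranteed, maximum) memory when N = number of active tasks."""
    n = active_tasks
    return pool_size / (2 * n), pool_size / n


for n in (4, 2, 1):  # tasks finish one after another; the straggler's ceiling rises
    low, high = dynamic_task_bounds(1000, n)
    print(f"N={n}: min {low:.0f}, max {high:.0f}")
# N=4: min 125, max 250
# N=2: min 250, max 500
# N=1: min 500, max 1000
```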

Properties

There are no tuning possibilities – the dynamic assignment is used by default.

Contention #3: Operators running within the same task

When running a query (such as an aggregation), Spark creates an internal query plan consisting of operators such as scan, aggregate, and sort, all of which run within one task. The available task memory therefore also has to be distributed among them.

We assume that each task has a certain number of memory pages (the size of each page does not matter).

A page for each operator

Each operator reserves one page of memory. This is simple but not optimal: it poses problems for a larger number of operators, or for highly complex operators such as aggregate, which may need more than one page.

Cooperative spilling

From Spark 1.6+, Jan 2016

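Operators negotiate their need for pages with each other dynamically during task execution: when one operator needs more memory than is free, the others can be asked to spill their data to disk to release pages.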
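The negotiation can be sketched as a toy page allocator in which a requesting operator asks the others, largest holders first, to spill. `TaskPages` and its spill order are simplifying assumptions; in Spark this coordination happens inside the task's memory manager, and the requesting operator may also spill its own data, which is not modelled here.

```python
class TaskPages:
    """Toy model of cooperative spilling among the operators of one task."""

    def __init__(self, total_pages):
        self.total = total_pages
        self.held = {}     # operator -> pages held in memory
        self.spilled = {}  # operator -> pages written to disk

    def free(self):
        return self.total - sum(self.held.values())

    def acquire(self, op, pages):
        self.held.setdefault(op, 0)
        # Ask the other operators, biggest holders first, to spill pages.
        others = sorted((o for o in self.held if o != op),
                        key=lambda o: self.held[o], reverse=True)
        for other in others:
            if self.free() >= pages:
                break
            released = min(self.held[other], pages - self.free())
            self.held[other] -= released
            self.spilled[other] = self.spilled.get(other, 0) + released
        granted = min(pages, self.free())
        self.held[op] += granted
        return granted


task = TaskPages(total_pages=8)
task.acquire("sort", 6)
print(task.acquire("aggregate", 5))  # 5: sort spills 3 pages to make room
print(task.held, task.spilled)       # {'sort': 3, 'aggregate': 5} {'sort': 3}
```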

Properties

There are no tuning possibilities – cooperative spilling is used by default.

Project Tungsten

Project Tungsten is a Spark SQL component, which makes operations more efficient by working directly at the byte level.

This feature became the default in Spark 1.5 and can be enabled in earlier versions by setting spark.sql.tungsten.enabled=true. It is optimised for the underlying hardware architecture and works for all available interfaces (SQL, Python, Java/Scala, R) by using the DataFrame abstraction.

Even when Tungsten is disabled, Spark still tries to minimise memory overhead by using the columnar storage format and Kryo serialisation.

Underneath, Tungsten uses encoders/decoders to represent JVM objects as highly specialised Spark SQL Types objects, which can then be serialised and operated on in a highly performant way (efficient and GC-friendly).

Some improvements include:

  • storing data in binary row format – reduces the overall memory footprint
  • no need for serialisation and deserialisation – the row is already serialised
  • cache-aware computation – records are laid out in memory in a way that is more conducive to higher L1, L2, and L3 cache hit rates.
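The binary-row idea can be illustrated in plain Python with the `struct` module: fixed-width fields are packed into contiguous bytes and read in place by offset. The schema is hypothetical and the layout is not Spark's actual `UnsafeRow` format; the sketch only shows why such rows are compact and are scanned sequentially through memory.

```python
import struct

# Hypothetical schema: id (int64), price (float64), qty (int32).
ROW_FORMAT = "<qdi"                     # little-endian, standard sizes, no padding
ROW_SIZE = struct.calcsize(ROW_FORMAT)  # 8 + 8 + 4 = 20 bytes
QTY_OFFSET = 16                         # byte offset of qty within a row


def encode(row):
    return struct.pack(ROW_FORMAT, *row)


# Pack many rows back to back into one contiguous buffer (a "page").
rows = [(i, i * 0.5, i % 7) for i in range(1000)]
page = b"".join(encode(r) for r in rows)

# Aggregate one column by reading only the qty field of each row by offset,
# walking the buffer sequentially instead of decoding whole rows.
total_qty = sum(struct.unpack_from("<i", page, off + QTY_OFFSET)[0]
                for off in range(0, len(page), ROW_SIZE))
print(ROW_SIZE, total_qty)  # 20 2997
```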

The take-away checklist

Below is a brief checklist worth considering when dealing with performance issues:

  • Are my cached RDDs’ partitions being evicted and rebuilt over time (check in Spark’s UI)?
  • Is the GC phase taking too long (maybe it would be better to use off-heap memory)?
  • Maybe there is too much unused user memory (adjust it with the spark.memory.fraction property)?
  • Is data stored in DataFrames (allowing Tungsten optimisations to take place)?
  • Is there any data skew (tune the partitioning within the app)?
