Thursday, May 4, 2023

CPU Load Testing Exercises: Tools and Analysis for Oracle Database Servers

This document describes some basic CPU load testing exercises on three different types of database servers used by the Oracle Service at CERN. It reports on the tests performed, tools used for data gathering, data analysis, findings, and lessons learned.

Motivations

CPU usage is important for data processing: We observe that workloads on Oracle database services at CERN are often CPU-bound. Database workloads for transactional processing perform many random read operations. In the past, this mostly stressed the I/O subsystem; these days we deploy databases with large buffer caches (400 GB or more of data block caches), and most operations are CPU-bound, reading data from the buffer cache.

Server consolidation, quality of service and licensing: We deploy on commodity hardware under various constraints, striking a balance between consolidating workloads and isolating critical workloads from different user communities. Moreover, Oracle licensing costs, which are proportional to the number of deployed CPUs, are a key input in the efforts to streamline CPU deployments across the database service.


Description and limitations of the tests

The tests reported here are extremely limited in scope, as they focus only on CPU performance and use two specific and “narrow” workloads. However, I believe they provide some indication of the per-CPU performance and of the overall CPU capacity of the installed servers. The comparison between three different server models is the original motivation of this work, as we wanted to understand how newer models can be deployed to replace older ones. This work is not a benchmark of the tested systems.


Tools used for load testing

The first workload generator and testing tool is a simple script that burns CPU cycles in a loop and is executed using multiple workers running in parallel. Two implementations have been used, one in Python and one in Rust compiled to a binary. Both provide similar results.

The second workload generator is SLOB, a tool that runs on top of Oracle databases for testing and specifically stresses “Logical IO”, that is, reading blocks from the Oracle buffer cache (memory).

Links to the code, measured data, and data analyses using notebooks:

CPU load testing kit - Python version

Kit for load testing and measuring CPU-intensive workloads, Python version.

CPU load testing kit - Rust version

Kit for load testing and measuring CPU-intensive workloads, Rust version.

Oracle CPU load testing using SLOB

Load testing Oracle using the SLOB test kit.



Key findings

-          RAC55 is the newest server model of the three tested and shows the highest CPU per-thread performance and highest CPU total throughput at saturation.

-          RAC55 shows about a 2.0x single-thread performance increase compared to RAC52.

-          RAC55 shows about a 1.5x single-thread performance increase over RAC54, but this holds only at low load, as RAC54 has only 8 physical cores vs. 16 cores in RAC55. Moreover, RAC54 provides considerably less total CPU throughput than RAC52 and RAC55.

-          RAC55 provides about 2.0x more total CPU throughput at saturation than RAC52, despite having only 16 physical cores vs. 20 physical cores in RAC52.

 

Description of the platforms

CPU load tests have been performed on three dedicated test servers representative of the production database servers in March 2023: RAC52, RAC54, and RAC55.

The servers were installed with RHEL 7.9, and the Oracle tests used Oracle 19c (version 19.17). We omit the configuration of networking and I/O, as it is not relevant for these tests. We also do not report the exact CPU models in this document.

 

RAC52 configuration:

  • 20 physical cores (2 sockets, 10 physical cores each), 40 logical cores visible on the OS due to hyperthreading
  • CPU nominal frequency: 2.20 GHz
  • CPU from 2016, L1 caches: 32K + 32K, L2 cache 256K, L3 cache 25600K
  • RAM: DDR4, 512 GB


RAC54 configuration:

  • 8 physical cores (2 sockets, 4 physical cores each), 16 logical cores visible on the OS due to hyperthreading
  • CPU nominal frequency: 3.80 GHz
  • CPU from 2019, L1 caches: 32K + 32K, L2 cache 1024K, L3 cache 16896K
  • RAM: DDR4, 768 GB


RAC55 configuration:

  • 16 physical cores (2 sockets, 8 physical cores each), 32 logical cores visible on the OS due to hyperthreading
  • CPU nominal frequency: 3.7 GHz
  • CPU from 2019, L1 caches: 32K + 32K, L2 cache 512K, L3 cache 32768K
  • RAM: DDR4, 1 TB



Test 1 – Concurrent workers burning CPU cycles in a loop and in parallel

 

The workload generator and testing tool is a simple Python script burning CPU cycles in a loop.

The script is executed by a configurable number of concurrent workers. The script measures the time spent executing a simple CPU-burning loop.

This provides a simple way to generate CPU load on the system.
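For illustration, here is a minimal sketch of this kind of load generator in Python. The actual kits linked below may differ in details such as the loop content, timing, and output format:

import time
from multiprocessing import Pool

def burn_cpu(n_iterations=10_000_000):
    # One "job": a tight arithmetic loop, mostly instruction-bound, with little memory access
    start = time.time()
    x = 0
    for i in range(n_iterations):
        x += i * i
    return time.time() - start

if __name__ == "__main__":
    num_workers = 8  # vary this across test runs to change the server load
    with Pool(num_workers) as pool:
        job_times = pool.map(burn_cpu, [10_000_000] * num_workers)
    print(f"workers={num_workers}, job run times (s): {[round(t, 2) for t in job_times]}")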

Example of how the data was collected with the testing tool written in Rust and compiled to binary:

./test_cpu_parallel --num_workers 8 --full --output myout.csv


See the code at this link 

The advantage of this approach is that the testing tool is easy to write and can be easily automated.

The weak point of testing this way is that the test workload is somewhat “artificial” and disconnected from the server's actual purpose as a DB server. For example, the CPU-burning loop used for this test is mostly instruction-intensive on the CPU and does not spend much time on memory access.

 

Measurements and results:

See also Data and Notebooks at this link

The following figures represent the same data in different ways to highlight different performance and scalability characteristics.


Figure 1 – Raw data

-          The figure reports the testing job execution time, measured for varying server load on the three tested servers.

-          A common pattern is that at low load (see data with just a few parallel workers) the job run time is almost constant.

-          An important difference is that the job run time is different on the different platforms, in order of increasing performance: RAC52, RAC54, RAC55 (the newest server and the fastest).

-          Another pattern is that the job run time starts to increase linearly at higher load.

-          The job execution time curve starts to bend upwards as the load increases. Typically, we see this happening when the number of parallel workers is greater than the number of physical cores on the server (20 cores on RAC52, 8 cores on RAC54, and 16 cores on RAC55).

 

Figure 2 - Speed

-          This plot reports the number of jobs per minute per worker

-          Data points can be interpreted as a measure of the “speed of the CPU” for a new job coming into the system given a defined system load

-          We see that the “effective CPU speed” decreases as the load increases, with sudden changes at the points where the number of parallel workers is equal to the number of physical cores

-          The CPU speed per thread is also different depending on the CPU architecture, in order of increasing performance: RAC52, RAC54, RAC55 (the newest server and the fastest).


Figure 3 - Capacity

-          This plot shows the number of jobs executed per minute summed over all the running worker threads.

-          As the load increases, the server capacity increases, reaching a maximum value when the number of workers equals the number of logical cores (40 for RAC52, 16 for RAC54, 32 for RAC55)

-          This allows us to compare the “Total CPU capacity” of the three servers. In order of increasing capacity: the lowest is RAC54 (the server with the fewest cores), then RAC52, and finally RAC55 with the highest CPU throughput (it’s the newest server)


Figure 4 - Scalability

-          This shows the speedup, a measure of scalability. For the scope of this plot, speedup is calculated as N * (job execution time at load 1) / (job execution time at load N); a short code sketch of this calculation follows this list

-          We see almost linear scalability for low loads (up to the number of physical cores), then a slower increase up to the number of logical cores, and, eventually, the speedup reaches saturation

-          RAC54 and RAC55 appear to scale almost linearly up to the number of physical cores (respectively 8 and 16)
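For reference, a minimal sketch of the speedup calculation described above, applied to hypothetical job-time values (illustration only, not measured data):

def speedup(job_times_by_load, baseline_load=1):
    # job_times_by_load: dict {number of parallel workers: average job run time in seconds}
    t_baseline = job_times_by_load[baseline_load]
    return {n: n * t_baseline / t_n for n, t_n in job_times_by_load.items()}

# Hypothetical values, for illustration only
example = {1: 10.0, 8: 10.2, 16: 11.5, 32: 19.0, 48: 28.0}
print(speedup(example))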


Notes:

-          Of the tested servers, RAC55 appears the fastest in per-thread CPU performance at both low and high load, and the one with the highest CPU capacity.

-          The difference in performance between RAC52 (oldest) and RAC55 (newest) is roughly 1.5x in per-thread CPU performance and 2x in overall CPU capacity at high load.

-          RAC54 performs similarly to RAC55 but only at low loads (<= 8 concurrent workers)

 

Test 2 – Parallel workers running “SLOB tests”, measuring Oracle logical IO throughput

 

The second workload generator is SLOB, a tool by Kevin Closson that runs on top of Oracle databases for load testing and specifically stresses Physical and Logical IO. In the configuration used for these tests, we only stressed Logical IO, that is, accessing blocks from the Oracle buffer cache (memory).

The tool creates test tables on the database and performs block IO reading from the test tables with a tunable number of concurrent workers.

See also the official SLOB page.

The Oracle database used for testing was configured with a large SGA (Oracle’s shared memory area), able to cache the test tables in the Oracle buffer cache. The test workload was therefore stressing Oracle’s “Logical IO”, that is, the part of the Oracle internal code that takes care of reading data blocks from the buffer cache (Oracle’s consistent read operations). Minimal additional operations were performed on the blocks accessed this way by SLOB. During these tests, a Logical IO is an operation with a duration of the order of 1 microsecond. Notably, it includes the time for Oracle-internal operations and serialization needed for reading data from the Oracle buffer cache, and finally the Oracle-internal code for reading the block header. The SLOB Logical IO test workload does not appear to saturate the CPU-memory channel on the tested servers (for example on RAC55, using OS tools, we could see the CPU-memory bandwidth utilized during SLOB Logical IO tests go up to 40 GB/s, while using memory-intensive load generators the system could scale at least up to 220 GB/s; see also Tools_Linux_Memory_Perf_Measure).

Standard Oracle instrumentation, notably SQL*Plus (Oracle’s CLI tool) and AWR reports (Oracle’s performance reports), was used to collect the number of Logical IOs recorded during the tests and for other sanity checks, notably checking that the test workload was fully reading from memory rather than performing Physical I/O.
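As an illustration only (not the exact procedure used for these tests), the statistics behind such checks can also be sampled with a small script. The sketch below assumes the python-oracledb driver, placeholder connection details, and access to v$sysstat:

import time
import oracledb  # python-oracledb driver, assumed available

STATS = ("session logical reads", "physical reads")

def snapshot(conn):
    # Sample cumulative instance-wide statistics from v$sysstat
    with conn.cursor() as cur:
        cur.execute("select name, value from v$sysstat where name in (:1, :2)", STATS)
        return dict(cur.fetchall())

# Placeholder credentials and connect string
conn = oracledb.connect(user="system", password="change_me", dsn="dbserver/orclpdb")
before = snapshot(conn)
time.sleep(60)  # sampling interval while the SLOB workload is running
after = snapshot(conn)
for name in STATS:
    print(f"{name}: {(after[name] - before[name]) / 60.0:.0f} per second")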

SLOB test tables have been created with a size of 16 GB per user, with each concurrent worker running on a dedicated user. Tests for loads higher than 16 concurrent users used 8 GB test tables per user. The Oracle buffer cache for testing was allocated using Linux large pages, as recommended by Oracle, and was 400 GB in size, therefore large enough to cache all the test data. We took care of caching the Oracle tables' data with a couple of executions of the test workload before starting to measure Logical IO performance. The fact that Oracle was not performing Physical I/O during the tests was also double-checked using monitoring tools and by inspection of the AWR reports.

The advantage of this approach is that this is stressing the system on a key operation for the Oracle DB workloads that are CPU-bound: Oracle RDBMS Logical I/O.

These tests take longer to run than the simple CPU-burning-script tests described above; moreover, they require some Oracle DBA expertise to configure and to validate the results.

 

Measurements and results:

The following figures represent the same data in different ways to highlight different performance and scalability characteristics.

Data with the graphs on Notebooks at this link

 

Figure 5 – Raw Data and Capacity

-          The figure shows how the cumulative Oracle logical IO throughput increases with the number of parallel workers for the three servers tested.

-          The common trend is that the Logical IO throughput increases with load up to the number of logical CPUs (16 for RAC54, 32 for RAC55, and 40 for RAC52).

-          Measurements are “noisy” so we should take about 10% as the error margin on the collected data points.

-          There are differences in performance and total throughput, with RAC55 being the most performant and having the highest throughput.

-          At low load, (<= 8 concurrent processes) RAC54 and RAC55 have similar performance.

-          At high load, RAC55 has about 20% more capacity/throughput of Logical IO than RAC52.

 

Figure 6 - Speed

-          The figure shows Oracle logical IO throughput per worker as a function of load.

-          The performance of logical IOs decays with increasing load.

-          Logical IO performance appears close to constant up to the number of physical cores of the server (8 for RAC54, 16 for RAC55 and 20 for RAC52) and then decays for higher load, saturating when the number of logical cores is reached.

-          Measurements are “noisy” so we should take about 10% as the error margin on the collected data points.

-          RAC55 shows the highest performance overall for Logical I/O throughput.

-          At load below 16 concurrent workers, RAC55 appears 1.5x faster than RAC52, the gap closes to about 20% at high load.


Figure 7 - Scalability

-          The figure shows the speedup, a measure of scalability, as a function of load. For this plot, speedup is calculated as the ratio (cumulative Logical IO at load N) / (cumulative Logical IO at load 1)

o   Linear scalability would be represented by a line with speedup = number of parallel workers.

-          A general trend observed in the data is that the scalability curves start close to the ideal linear scalability and then bend downwards due to contention.

-          RAC55 has the best scalability behavior of the three servers tested. At low load (fewer than 8 concurrent workers), RAC55 and RAC54 behave similarly.

  

Conclusions

This work collects a few tests and measurements on stress testing and CPU loading on three different platforms of interest for the CERN Oracle database service.

The tests performed are narrow in scope, just addressing the CPU load.

Two different testing tools have been used for these tests: testing with a simple CPU-burning script loop run in parallel, and testing with an Oracle-specific workload generator for Logical I/O. 

The tools used, as well as the measured data and their analyses, are available at this link.

We find that the newest server model (RAC55) has the highest CPU per-core performance, scalability, and overall CPU throughput.

This work has been done in the context of the CERN databases and analytics services and the ATLAS data engineering efforts, many thanks to my colleagues for their help and suggestions.

Thursday, February 23, 2023

Introduction to Spark APIs for Data Processing

Introduction to Apache Spark APIs for Data Processing

This is a self-paced and open introduction course to Apache Spark. Theory and demos cover the main Spark APIs: DataFrame API, Spark SQL, Streaming, Machine Learning. You will also learn how to deploy Spark on CERN computing resources, notably using the CERN SWAN service. Most tutorials and exercises are in Python and run on Jupyter notebooks.

The main course website can be found at https://sparktraining.web.cern.ch/

Apache Spark is a popular engine for data processing at scale. Spark provides an expressive API and a scalable engine that integrates very well with the Hadoop ecosystem as well as with Cloud resources. Spark is currently used by several projects at CERN, notably by IT monitoring, by the security team, by the BE NXCALS project, by teams in ATLAS and CMS. Moreover, Spark is integrated with the CERN Hadoop service, the CERN Cloud service, and the CERN SWAN web notebooks service.

  

Accompanying notebooks

·       Get the notebooks from:

o   https://github.com/cerndb/SparkTraining

o   https://gitlab.cern.ch/db/SparkTraining

·     How to run the notebooks:

o   CERN SWAN (recommended option):

See also the SWAN gallery and the video:

o   Colab , Binder

o   Your local/private Jupyter notebook

   

Course lectures and tutorials

·       Introduction and objectives: slides and video

          


·       Session 1: Apache Spark fundamentals

o   Lecture “Spark architecture and intro to DataFrames”: slides and video


o   Notebooks:

o   Tutorial on DataFrames with exercises, and video

o   Solutions to the exercises

·       Session 2: Working with Spark DataFrames and SQL

o   Lecture “Introduction to Spark SQL”: slides and video



o   Notebooks:

o   Tutorial on Spark SQL, and video

o   Exercises on Spark SQL

o   Solutions to the exercises

·       Session 3: Building on top of the DataFrame API

o   Lecture “Spark as a Data Platform”: slides and video



o   Lecture “Spark Streaming”: slides and video



o   Lecture “Spark and Machine Learning”: slides and video



o   Notebooks:

o   Tutorial on Spark Streaming, and video

o   Tutorial on Spark Machine Learning – regression task, and video

o   Tutorial on Spark Machine Learning – classification task with the Higgs dataset

o   Demo of the Spark JDBC data source: how to read Oracle tables from Spark

o   Note on Spark and Parquet format

·       Session 4: How to scale out Spark jobs

o   Lecture “Running Spark on CERN resources”: slides and video



o   Notebooks:

o   Demo on using SWAN with Spark on Hadoop, and video

o   Demo of Spark processing Physics data using CERN private Cloud resources, and video

o   Example notebook for the NXCALS project

 

·       Bonus material:

o   How to monitor Spark execution: slides and video

o   Spark as a library, examples of how to use Spark in Scala and Python programs: code and video

o   Next steps: reading material and links, miscellaneous Spark notes

 

·       Read and watch at your pace:

o   Download the course material for offline use:
 slides.zip, github_repo.zip, videos.zip

o   Watch the videos on YouTube


Acknowledgements and feedback

Author and contact for feedback and questions: Luca Canali - Luca.Canali@cern.ch

CERN-IT Spark and data analytics services

Former contributors: Riccardo Castellotti, Prasanth Kothuri

Many thanks to CERN Technical Training for their collaboration and support

 

License: CC BY-SA 4.0

Published in November 2022

Reposted from https://sparktraining.web.cern.ch/

 

Monday, May 23, 2022

Making histograms with Apache Spark and other SQL engines

Topic: This post will show you how to generate histograms using Apache Spark. You will find examples using the Spark DataFrame API and with a custom helper package, SparkHistogram. Additional examples will extend the work to histogram generation for several other databases and SQL engines.  

Disambiguation: here we refer to computing histograms for data analysis, rather than to the histograms of table columns collected as statistics for cost-based optimizers.

  

Why histograms with Apache Spark?

Histograms are common tools for data analysis and are a key element in most High Energy Physics analyses. See also the post Can High Energy Physics Analysis Profit from Apache Spark APIs?

The advantage of generating histograms using Apache Spark, or other distributed data engines, is that the computation can be run at scale, with higher bandwidth to the data. This is useful if you have large datasets, for example datasets that require distributed computing as they cannot be processed in a timely manner by a single machine.

When handling smaller amounts of data, however, you can evaluate the alternative of just running filters and map functions at scale, then fetching the results into the driver, and finally using state-of-the-art libraries to generate histograms, such as Pandas histogram, NumPy histogram, or boost-histogram.

 

Vanilla solution: Spark's native histogram_numeric function

Spark has a DataFrame aggregate function for generating approximate histograms, histogram_numeric, since Spark version 3.3.0 (see SPARK-16280). There are a few implementation details and limitations to keep in mind when using histogram_numeric:

  • it produces as output an array of (x,y) pairs representing the centers of the histogram bins and their corresponding values.
  • bins don't have a uniform size
  • the result is an approximate calculation
  • when using a large number of bins (e.g. more than 1000 bins), histogram_numeric can become quite slow

See also this link for an example of how to use histogram_numeric.
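For reference, a minimal usage sketch (assuming Spark 3.3.0 or higher; the input data, column name, and number of bins are arbitrary illustration values):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data: 100k values in a column named "value"
df = spark.range(0, 100000).withColumnRenamed("id", "value")

# histogram_numeric returns an array of (x, y) structs: approximate bin centers and counts
hist = df.selectExpr("histogram_numeric(value, 10) as hist")
hist.show(truncate=False)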

Given the limitations of histogram_numeric, we have developed a different solution based on the DataFrame API (see next paragraph).

  

An improved solution: reduce boilerplate code with SparkHistogram

It is easy to implement basic histogram generation using the DataFrame API or Spark SQL. For a few simple cases, a wrapper around the width_bucket function can do the job. width_bucket is a common function in many SQL engines and is available in Apache Spark since version 3.1.0.

A simple expression for computing the histogram works by mapping each data value into a bucket and then aggregating the values in each bucket using the count function, as in this example:

  
hist = (df
.selectExpr("width_bucket(column_name, min_val, max_val, num_bins) as bucket")
.groupBy("bucket")
.count() )
  
The implementation is straightforward; however, additional code is needed to make it more useful in practice: we need to take care of buckets with no elements, and of computing the data value to assign to each bucket. The resulting expression can be found, for example, in the code of the computeHistogram function.
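For illustration, here is a minimal sketch of that idea; the actual computeHistogram implementation in SparkHistogram may differ in details, and the function name below is just a placeholder:

from pyspark.sql import SparkSession, functions as F

def histogram_df(df, col_name, min_val, max_val, num_bins):
    spark = SparkSession.getActiveSession()
    step = (max_val - min_val) / num_bins
    # Map each value to a bucket (1..num_bins) and count the values per bucket.
    # Values outside [min_val, max_val] fall in buckets 0 and num_bins+1 and are dropped below.
    buckets = (df
        .selectExpr(f"width_bucket({col_name}, {min_val}, {max_val}, {num_bins}) as bucket")
        .groupBy("bucket")
        .count())
    # Generate all bucket ids so that empty buckets appear with count 0,
    # and compute the central value of each bucket
    all_buckets = spark.range(1, num_bins + 1).withColumnRenamed("id", "bucket")
    return (all_buckets
        .join(buckets, "bucket", "left_outer")
        .withColumn("count", F.coalesce(F.col("count"), F.lit(0)))
        .withColumn("value", F.lit(min_val) + (F.col("bucket") - 0.5) * F.lit(step))
        .orderBy("bucket")
        .select("bucket", "value", "count"))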

The SparkHistogram package is built with the idea of reducing boilerplate code and contains helper functions for generating frequency histograms and also a close variant of it, weighted histograms. Computing histograms with SparkHistogram becomes simply:


from sparkhistogram import computeHistogram

hist = computeHistogram(df, f"{data_column}", min_val, max_val, num_bins)

# or, in alternative:
hist = df.transform(computeHistogram, f"{data_column}", min_val, max_val, num_bins)
  

More information on the SparkHistogram package for Python and Scala at:

     
   

Examples

  
Jupyter notebooks showing how to generate histograms using PySpark and SparkHistogram (see further in this post for Spark SQL examples):

Additional examples in the context of Physics analysis:
     
This histogram has been generated using ATLAS Open Data collected at the LHC at CERN and processed using PySpark and the SparkHistogram package.

   

Extend the work on histogram generation to more SQL engines


You can also use SQL to generate your histograms. The following examples work with minor modifications across different data/database systems and can be easily extended to run on all SQL engines that implement the width_bucket function. Example notebooks:
For more complex histogram use cases with Spark see also Histogrammar
  
This work has been done in the context of the CERN databases and analytics services and the ATLAS data engineering efforts. Additional thanks to Jim Pivarski for discussions.
  

Thursday, March 10, 2022

Can High Energy Physics Analysis Profit from Apache Spark APIs?

We are in a golden age for distributed data processing, with an abundance of tools and solutions emerging from industry and open source. High Energy Physics (HEP) experiments at the LHC stand to profit from all this progress, as they are data-intensive operations with several hundreds of Petabytes of data to collect and process.

This post collects a few examples of code and open data, where Apache Spark, a very popular tool in industry and open source, is used for a few simple HEP data analyses. This post aims to be a general overview both for physicists wanting to know more about what Spark can do, and for data scientists wanting to get a feeling of what a HEP data analysis looks like.
The code and discussion here are proposed as a technology exploration and do not reflect any particular official activity by an experiment team.
This post comes with a series of notebooks at this link.

TLDR; Apache Spark (PySpark) APIs can easily be used for simple HEP analysis tasks, for example running the analysis in notebook environments and profiting from a cluster infrastructure for computing power. Complex analyses can be challenging to implement and often require developing UDFs (user-defined functions), which may increase complexity and reduce performance. Follow this link to an example analysis notebook in Colab, where you can play with code and open data.

     

A High-level view of particle physics analysis

The input to the analysis work is a set of files containing event data. For each event, a large set of attributes is provided, with details on the particles and physical quantities associated with it (photons, electrons, muons, jets, etc.). Events are what comes from particle collisions collected at a detector, plus all the processing steps in between to prepare the data via reconstruction, calibration, etc. In other cases, event data is generated from simulations.

Data is sliced with projection and filter operations, then specific computations are processed for each event of interest. In the final processing steps, data is typically aggregated into one or more histograms. These are the output "plots" with physical quantities of interest.

Some good news for data engines based on DataFrame and/or table abstractions, like Spark or SQL platforms: event data has a fixed schema, and events are statistically independent, so you will typically not need to perform joins across events. Engines and data formats for columnar processing are also quite a good fit, as often only a subset of the attributes is processed for a given analysis.

The hard part, for data processing engines, is that event data is nested, typically containing arrays. Moreover, complex formulas, and in some cases algorithms, are needed to process event data, which requires high efficiency in CPU utilization. Finally, there are tons of data and many different tests to be executed to find the "good plot".


Example analyses: notebooks and open data





Lessons learned


Apache Spark API for HEP:
  • (+) The DataFrame API and Spark SQL work well for structured data like HEP data. Moreover, the key HEP data processing operations are map, filter, and reduction to histograms, which are well supported by the Spark DataFrame API.
  • (+) Physics datasets are large (GBs to 100s of TBs) and consist of statistically independent events, which can be processed in parallel. This fits well with the Spark execution model.
  • (+) Lazy evaluation in Spark allows building the analysis from small steps, each in a different piece of code, which helps exploration and allows detailed comments inside the code. All operations are optimized together at execution time (when an action is triggered, such as fetching the histogram for plotting).
  • (+) The function width_bucket provides an acceptable solution for computing histograms with the DataFrame API and SQL.
  • (+) The Spark DataFrame API and SQL can handle complex data types with arrays and structs. Spark implements the explode and posexplode functions, has several array functions, and also has higher-order functions specialized for array processing (see the short sketch after this list).
  • (-) Spark (3.2 and 3.3) does not implement the SQL UNNEST operator. Spark does not have functions to natively handle 4-vectors.
  • (-) Some of the complex data processing is hard to implement with the DataFrame API or SQL, and requires UDFs.
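As an illustration of the array-handling point above, here is a small PySpark sketch on a toy, hypothetical event schema (not taken from the benchmark notebooks):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy events: each event carries an array of muon transverse momenta (hypothetical schema)
events = spark.createDataFrame(
    [(1, [25.3, 11.2]), (2, [54.1]), (3, [])],
    schema="event_id INT, muon_pt ARRAY<DOUBLE>")

# Higher-order function: filter the array in place, without exploding it
selected = events.withColumn("good_muons", F.expr("filter(muon_pt, pt -> pt > 20.0)"))

# explode() to get one row per muon, e.g. as input for a histogram
per_muon = events.select("event_id", F.explode("muon_pt").alias("pt"))
per_muon.groupBy("event_id").agg(F.count("*").alias("n_muons"), F.max("pt")).show()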
  
Data formats:
  • (+) Spark is optimized (with a vectorized reader) to ingest columnar formats such as Apache Parquet and ORC. This brings to the table performance-enhancing features such as: filter pushdown, min-max filtering with rowgroup and page index statistics, bloom filters. Spark has additional optimizations for handling complex data types (e.g. arrays) with ORC (Spark 3.2) and Parquet (Spark 3.3, see SPARK-34863).
  • (+/-) The Laurelin library allows reading the HEP-specific data format ROOT. However, it is still experimental and not optimized for performance; it is rather to be used for format conversion.
  • (+/-) The examples reported here use data in a relatively flat structure (nanoaod format), which plays well with the Spark DataFrame API. HEP data with more nested structures, which is common for HEP data in the recent past, introduces additional performance issues when using Spark.
  • (-) The large majority of HEP data is stored in ROOT format at present. This "adds friction" when using tools from industry and open source that do not fully support it.
   
Platform and ecosystem:

  • (+) PySpark works well on notebooks. Spark sessions can run locally and on clusters (stand-alone, YARN, Kubernetes) and this makes it a good building block for a data analysis platform. At CERN we have integrated the web analysis service, called SWAN, with Spark services running on YARN and Kubernetes.
  • (+) Spark integrates well with cloud environments. Connectors are available to major object stores, s3 and more. For CERN storage system EOS, there is the Hadoop-XRootD connector.
  • (+) Spark is a well-known platform, with many libraries and integrations available. Users like the idea of learning Spark, as it is widely used in industry.
  • (+/-) Hardware resources for physics are made available on HPC systems and on batch systems; some work is needed there to use the standalone cluster mode.
  
Performance:
  • (-) Python UDFs in Spark have improved their performance with the latest releases, but the need to serialize and deserialize the data passed to Python workers can still take a considerable toll on performance, even when using Apache Arrow.
  • (-) The state-of-the-art platforms for HEP analysis have large parts written in C/C++ and optimized for performance of numerical computations on HEP data, typically using vectorized computations. Apache Spark (3.2) does not have vectorized execution.
  • (+/-) Using UDFs written in Scala via PySpark can be useful to combine performance and advanced features (see benchmark examples Q6 and Q8); however, they add complexity and will require most users to spend time learning how to do this.
  • (+/-) Spark higher-order functions for array processing are expressive, but their performance in Apache Spark (3.2) could be improved (compare the two solutions to benchmark Q7).
   
Note: these comments refer to the tests run for this work in 2022, using Apache Spark version 3.2.1 and 3.3.0.
  

Conclusions

  
Apache Spark provides a suitable API, platform, and ecosystem for High Energy Physics data analysis, with some caveats. The examples shown here demonstrate how PySpark on notebooks can be used to write simple analysis code and run it locally or at scale on clusters. For several years, CERN has run a notebook service integrated with YARN and Kubernetes clusters and with cloud storage.
The Spark DataFrame API works surprisingly well for several simple HEP use cases, but it needs to be supplemented with user-defined functions (UDFs) for complex real-world cases. The performance of UDFs, in particular when written in Python, is a concern. Also a concern is the current need to read/convert files in ROOT format, as Spark is rather optimized for data formats common in industry, like Apache Parquet and ORC.
What is reported here complements previous work on using Apache Spark for data reduction at scale and data preparation for ML tasks (see references below).
Additional work is needed both on the HEP and Apache Spark sides to bring Apache Spark up-to-speed with specialized HEP analysis software in their optimization domain (see links below in related work).
  

Related work and acknowledgments

ROOT is the reference platform for running HEP data analysis, using C++ and also Python bindings. Its current evolution implements the dataframe abstraction with "RDataFrame" and integrates with Apache Spark and Dask to scale out computations.

Coffea, Awkward Array, Uproot, and ServiceX are components of a suite of Python libraries and packages to build a HEP data analysis platform. The platform is integrated with Dask, Apache Spark, Parsl, and Work Queue Executor for scaling out computations.

The Laurelin library integrates with Apache Spark for reading ROOT files (by Andrew Melo). The Hadoop-XRootD connector integrates with Apache Spark to access "the root:// filesystem" (by the CERN Hadoop and Spark service).


The work on implementing the HEP benchmark with Apache Spark reported here stems from:


Previous work on the topic of using Apache Spark for physics, for ML data preparation and data reduction at scale, include:


Many thanks go to Jim Pivarski, Lindsey Gray, Andrew Melo, Lukas Heinrich, Gordon Watts, Ghislain Fourny, Ingo Müller, for discussions. To Ruslan Dautkhanov and Hyukjin Kwon from Databricks for their support and work with mapInArrow, see SPARK-37227 and SPARK-30153. To the Hadoop and Spark team and the SWAN (platform for web-based analysis) team at CERN, in particular Riccardo Castellotti.

This work was done in the context of the Hadoop, Spark, and SWAN services at CERN, and of the data engineering efforts with the ATLAS experiment.


Friday, August 28, 2020

Apache Spark 3.0 Memory Monitoring Improvements

TLDR; Apache Spark 3.0 comes with many improvements, including new features for memory monitoring. This can help you troubleshoot memory usage and optimize the memory configuration of your Spark jobs for better performance and stability; see SPARK-23429 and SPARK-27189.

The problem with memory
Memory is key for the performance and stability of Spark jobs. If you don't allocate enough memory for your Spark executors, you are more likely to run into the much-dreaded Java OOM (out of memory) errors or to substantially degrade your jobs' performance. Memory is needed by Spark to execute DataFrame/RDD operations efficiently and to improve the performance of algorithms that would otherwise have to swap to disk during their processing (e.g. shuffle operations); moreover, it can be used for caching data, reducing I/O. This is all good in theory, but in practice how do you know how much memory you need?

A basic solution
One first basic approach to memory sizing for Spark jobs is to start by giving the executors ample amounts of memory, provided your system has enough resources. For example, by setting the spark.executor.memory configuration parameter to several GBs. Note: in local mode you would set spark.driver.memory instead. You can further tune the configuration by trial and error, reducing or increasing memory with each test and observing the results. This approach may give good results quickly, but it is not a very solid approach to the problem.

A more structured approach to memory usage troubleshooting and to sizing memory for Spark jobs is to use monitoring data to understand how much memory is used by the Spark application, which jobs request more memory, and which memory areas are used, finally linking this back to the application details and to the utilization of other resources (for example, CPU usage).
This approach helps with drilling down on OOM issues and also allows being more precise in allocating memory for Spark applications, aiming at using just enough memory as needed, without wasting memory that can be a scarce shared resource in some systems. It is still an experimental and iterative process, but more informed than the basic trial-and-error solution.

How memory is allocated and used by Spark

Configuration of executor memory

The main configuration parameter used to request the allocation of executor memory is spark.executor.memory. Spark running on YARN, Kubernetes, or Mesos adds to that a memory overhead to cover for additional memory usage (OS, redundancy, filesystem cache, off-heap allocations, etc.), which is calculated as memory_overhead_factor * spark.executor.memory (with a minimum of 384 MB). The default overhead factor is 0.1 (10%); it can be configured when running on Kubernetes (only) using spark.kubernetes.memoryOverheadFactor.
When using PySpark, additional memory can be allocated for the Python workers using spark.executor.pyspark.memory.
Additional memory for off-heap allocation is configured using spark.memory.offHeap.size=<size> and spark.memory.offHeap.enabled=true. This works on YARN; for K8S, see SPARK-32661.
Note also the parameters for driver memory allocation: spark.driver.memory and spark.driver.memoryOverhead.
Note: this covers recent versions of Spark at the time of this writing, notably Spark 3.0 and 2.4. See also the Spark documentation.
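As a rough worked example of the overhead formula above (a sketch, assuming the default 0.10 overhead factor and the 384 MB minimum):

def executor_container_memory_gb(executor_memory_gb, overhead_factor=0.10, min_overhead_gb=0.384):
    # Memory requested from the resource manager = executor heap + memory overhead
    overhead_gb = max(overhead_factor * executor_memory_gb, min_overhead_gb)
    return executor_memory_gb + overhead_gb

print(executor_container_memory_gb(32))  # 32 GB heap -> about 35.2 GB requested per executor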
  
Figure 1: Pictorial representation of the memory areas allocated and used by Spark executors and the main parameters for their configuration. 
  

Spark unified memory pool

Spark tasks allocate memory for execution and storage from the JVM heap of the executors, using a unified memory pool managed by the Spark memory management system. Unified memory occupies by default 60% of the JVM heap: 0.6 * (spark.executor.memory - 300 MB). The factor 0.6 (60%) is the default value of the configuration parameter spark.memory.fraction. 300 MB is a hard-coded value of "reserved memory". The rest of the memory is used for user data structures, internal metadata in Spark, and safeguarding against OOM errors.
Spark manages execution and storage memory requests using the unified memory pool. When little execution memory is used, storage can acquire most of the available memory, and vice versa. Additional structure in the working of the storage and execution memory is exposed with the configuration parameter spark.memory.storageFraction (default is 0.5), which guarantees that the stored blocks will not be evicted from the unified memory by execution below the specified threshold.
The unified memory pool can optionally be allocated using off-heap memory; the relevant configuration parameters are spark.memory.offHeap.size and spark.memory.offHeap.enabled.
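A similar back-of-the-envelope calculation for the unified memory pool, using the default values of spark.memory.fraction and spark.memory.storageFraction (illustration only):

def unified_memory_gb(executor_memory_gb, memory_fraction=0.6, storage_fraction=0.5, reserved_gb=0.3):
    unified = memory_fraction * (executor_memory_gb - reserved_gb)   # execution + storage pool
    protected_storage = storage_fraction * unified                   # storage not evictable by execution
    return unified, protected_storage

print(unified_memory_gb(32))  # about (19.0, 9.5) GB for a 32 GB executor heap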
  

Opportunities for memory configuration settings

The first key configuration to get right is spark.executor.memory. Monitoring data (see the following paragraphs) can help you understand if you need to increase the memory allocated to Spark executors and/or if you are already allocating plenty of memory and can consider reducing the memory footprint.
There are other memory-related configuration parameters that may need some adjustments for specific workloads: this can be analyzed and tested using memory monitoring data.
In particular, increasing spark.memory.fraction (default is 0.6) may be useful when deploying a large Java heap, as there is a chance that you will not need to set aside 40% of the JVM heap for user memory. With similar reasoning, when using a large Java heap allocation, manually setting spark.executor.memoryOverhead to a value lower than the default (0.1 * spark.executor.memory) can be tested.
  

Memory monitoring improvements in Spark 3.0

Two notable improvements in Spark 3.0 for memory monitoring are:
  • SPARK-23429: Add executor memory metrics to heartbeat and expose in executors REST API
    • see also the umbrella ticket SPARK-23206: Additional Memory Tuning Metrics
  • SPARK-27189: Add Executor metrics and memory usage instrumentation to the metrics system
When troubleshooting memory usage, it is important to investigate how much memory was used as the workload progresses and to measure peak values of memory usage. Peak values are particularly important, as this is where you get possible slowdowns or even OOM errors. Spark 3.0 instrumentation adds monitoring data on the amount of memory used, drilling down on unified memory and on memory used by Python (when using PySpark). This is implemented using a new set of metrics called "executor metrics" and can be helpful for memory sizing and troubleshooting performance.
    

Measuring memory usage and peak values using the REST API

An example of the data you can get from the REST API in Spark 3.0:

WebUI URL + /api/v1/applications/<application_id>/executors

Here below you can find a snippet of the peak executor memory metrics, sampled on a snapshot and limited to one of the executors used for testing:
"peakMemoryMetrics" : {
    "JVMHeapMemory" : 29487812552,
    "JVMOffHeapMemory" : 149957200,
    "OnHeapExecutionMemory" : 12458956272,
    "OffHeapExecutionMemory" : 0,
    "OnHeapStorageMemory" : 83578970,
    "OffHeapStorageMemory" : 0,
    "OnHeapUnifiedMemory" : 12540212490,
    "OffHeapUnifiedMemory" : 0,
    "DirectPoolMemory" : 66809076,
    "MappedPoolMemory" : 0,
    "ProcessTreeJVMVMemory" : 38084534272,
    "ProcessTreeJVMRSSMemory" : 36998328320,
    "ProcessTreePythonVMemory" : 0,
    "ProcessTreePythonRSSMemory" : 0,
    "ProcessTreeOtherVMemory" : 0,
    "ProcessTreeOtherRSSMemory" : 0,
    "MinorGCCount" : 561,
    "MinorGCTime" : 49918,
    "MajorGCCount" : 0,
    "MajorGCTime" : 0
  },
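As an illustration, a short Python snippet along these lines can pull the peak executor memory metrics from the REST API (the endpoint URL below is a placeholder, and the metrics printed are just a selection):

import requests

base_url = "http://localhost:4040/api/v1"  # Spark WebUI URL, placeholder
app_id = requests.get(f"{base_url}/applications").json()[0]["id"]

executors = requests.get(f"{base_url}/applications/{app_id}/executors").json()
for executor in executors:
    peak = executor.get("peakMemoryMetrics", {})
    print(executor["id"], peak.get("JVMHeapMemory"), peak.get("OnHeapUnifiedMemory"),
          peak.get("ProcessTreeJVMRSSMemory"))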

Notes:
  • Procfs metrics (SPARK-24958) provide a view on the process usage from "the OS point of observation".
    • Notably, procfs metrics provide a way to measure memory usage by Python, when using PySpark and in general other processes that may be spawned by Spark tasks.
  • Procfs metrics are gathered conditionally:
    • if the /proc filesystem exists
    • if spark.executor.processTreeMetrics.enabled=true
    • The optional configuration spark.executor.metrics.pollingInterval allows gathering executor metrics at a higher frequency; see the documentation.
  • Additional improvements of the memory instrumentation via REST API (targeting Spark 3.1) are in "SPARK-23431 Expose the new executor memory metrics at the stage level".
  

Improvements to the Spark metrics system and Spark performance dashboard

The Spark metrics system based on the Dropwizard metrics library provides the data source to build a Spark performance dashboard. A dashboard naturally leads to time series visualization of Spark performance and workload metrics. Spark 3.0 instrumentation (SPARK-27189) hooks to the executor metrics data source and makes available the time series data with the evolution of memory usage. 
Some of the advantages of collecting metrics values and visualizing them with Grafana are:
  • The possibility to see the evolution of the metrics values in real time and to compare them with other key metrics of the workload. 
  • Metrics can be examined as aggregated values or drilled down at the executor level. This allows you to understand if there are outliers or stragglers.
  • It is possible to study the evolution of the metrics values with time and understand which part of the workload has generated certain spikes in a given metric, for example. It is also possible to annotate the dashboard graphs, as explained at this link, with details of query id, job id, and stage id.
Here are a few examples of dashboard graphs related to memory usage:


Figure 2: Graphs of memory-related  metrics collected and visualized using a Spark performance dashboard. Metrics reported in the figure are: Java heap memory, RSS memory, Execution memory, and Storage memory. The Grafana dashboard allows us to drill down on the metrics values per executor. These types of plots can be used to study the time evolution of key metrics.
  

What if you are using Spark 2.x?

Some monitoring features related to memory usage are already available in Spark 2.x and still useful in Spark 3.0:
  • Task metrics are available in the REST API and in the dropwizard-based metrics and provide information:
    • Garbage collection time: when garbage collection takes a significant amount of time, you typically want to investigate the need to allocate more memory (or to reduce memory usage).
    • Shuffle-related metrics: sufficient memory can prevent some shuffle operations from spilling to storage, which is beneficial for performance.
    • Task peak execution memory metric.
  • The WebUI reports storage memory usage per executor.
  • Spark dropwizard-based metrics system provides a JVM source with memory-related utilization metrics.

  

Lab configuration:

When experimenting and trying to get a grasp of the many parameters related to memory and monitoring, I found it useful to set up a small test workload. Some notes on the setup I used:


bin/spark-shell --master yarn --num-executors 16 --executor-cores 8 \
--driver-memory 4g --executor-memory 32g \
--jars /home/luca/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar \
--conf spark.eventLog.enabled=false \
--conf spark.sql.shuffle.partitions=512 \
--conf spark.sql.autoBroadcastJoinThreshold=100000000 \
--conf spark.executor.processTreeMetrics.enabled=true
  
  
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val tables = new TPCDSTables(spark.sqlContext, "/home/luca/tpcds-kit/tools","1500")
tables.createTemporaryTables("/project/spark/TPCDS/tpcds_1500_parquet_1.10.1", "parquet")
val tpcds = new com.databricks.spark.sql.perf.tpcds.TPCDS(spark.sqlContext)
val experiment = tpcds.runExperiment(tpcds.tpcds2_4Queries)

  

Limitations and caveats

  • Spark metrics and instrumentation are still an area in active development. There is room for improvement both in their implementation and documentation. I found that some of the metrics may be difficult to understand or may present what looks like strange behaviors in some circumstances. In general, more testing and sharing experience between Spark users may be highly beneficial for further improving Spark instrumentation.
  • The tools and methods discussed here are based on metrics, they are reactive by nature, and suitable for troubleshooting and iterative experimentation.
  • This post is centered on describing Spark 3.0 new features for memory monitoring and how you can experiment with them. A key piece left for future work is to show some real-world examples of troubleshooting using memory metrics and instrumentation.
  • For the scope of this post, we assume that the workload to troubleshoot is a black box and that we just want to try to optimize the memory allocation and use. This post does not cover techniques to improve the memory footprint of Spark jobs; however, they are very important for correctly using Spark. Examples of techniques that are useful in this area are: implementing the correct partitioning scheme for the data and operations, reducing partition skew, using the appropriate join mechanisms, streamlining caching, and many others, covered elsewhere.
  

References

Talks:
Spark documentation and blogs:

JIRAs: SPARK-23206, SPARK-23429, and SPARK-27189 contain most of the details of the improvements in Apache Spark discussed here.

  

Conclusions and acknowledgments

It is important to correctly size memory configurations for Spark applications. This improves performance, stability, and resource utilization in multi-tenant environments. Spark 3.0 has important improvements to memory monitoring instrumentation. The analysis of peak memory usage, and of memory usage broken down by area and plotted as a function of time, provides important insights for troubleshooting OOM errors and for Spark job memory sizing.
Many thanks to the Apache Spark community, and in particular the committers and reviewers who have helped with the improvements in SPARK-27189.
This work has been developed in the context of the data analytics services at CERN, many thanks to my colleagues for help and suggestions.