Thursday, March 10, 2022

Can High Energy Physics Analysis Profit from Apache Spark APIs?

We are in a golden age for distributed data processing, with an abundance of tools and solutions emerging from industry and open source. High Energy Physics (HEP) experiments at the LHC stand to profit from all this progress, as they are data-intensive operations with several hundreds of Petabytes of data to collect and process.

This post collects a few examples of code and open data, where Apache Spark, a very popular tool in industry and open source, is used for a few simple HEP data analyses. This post aims to be a general overview both for physicists wanting to know more about what Spark can do, and for data scientists wanting to get a feeling of what a HEP data analysis looks like.
The code and discussion here are proposed as a technology exploration and do not reflect any particular official activity by an experiment team.
This post comes with a series of notebooks at this link.

TLDR; Apache Spark (PySpark) APIs can easily be used for simple HEP analysis tasks, for example running the analysis in notebook environments and profiting from a cluster infrastructure for computing power. Complex analyses can be challenging to implement and often require developing UDFs (user-defined functions), which may increase complexity and reduce performance. Follow this link to an example analysis notebook in Colab, where you can play with code and open data.


A high-level view of particle physics analysis

The input to the analysis work is a set of files containing event data. For each event, a large set of attributes is provided, detailing the particles and physical quantities associated with it (photons, electrons, muons, jets, etc.). Events come from particle collisions recorded by a detector, followed by processing steps such as reconstruction and calibration; in other cases, event data is produced by simulation.

Data is sliced with projection and filter operations, then specific computations are performed for each event of interest. In the final processing steps, data is typically aggregated into one or more histograms. These are the output "plots" of the physical quantities of interest.
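
As a concrete illustration, here is a minimal PySpark sketch of this flow. The file path and the column names (nMuon, Muon_pt) are placeholders, loosely modeled on nanoaod-style data, and the 2 GeV binning is arbitrary.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("HEP analysis sketch").getOrCreate()

    # Projection and filter: read only the needed columns, keep the events of interest
    events = (spark.read.parquet("path/to/events.parquet")
                   .select("nMuon", "Muon_pt")
                   .filter(F.col("nMuon") >= 2))

    # Compute a per-event quantity and aggregate it into a histogram
    histogram = (events
                 .withColumn("pt_first", F.col("Muon_pt")[0])          # pt of the first muon in the event
                 .withColumn("bin", F.floor(F.col("pt_first") / 2.0))  # 2 GeV-wide bins
                 .groupBy("bin").count().orderBy("bin"))

    histogram.show()   # or .toPandas() to plot with matplotlib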

Some good news for data engines based on DataFrame and/or table abstractions, like Spark or SQL platforms: event data has a fixed schema, and events are statistically independent, so you will typically not need to perform joins across events. Engines and data formats for columnar processing are also quite a good fit, as often only a subset of the attributes is processed for a given analysis.

The hard part, for data processing engines, is that event data is nested, typically containing arrays. Moreover, complex formulas, and in some cases algorithms, are needed to process event data, which requires highly efficient CPU utilization. Finally, there are tons of data, and many different tests to be executed to find the "good plot".
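
For example, a small muon collection mapped to Spark data types could look like the following sketch; the field names are illustrative, not an official HEP schema. Each Muon_* field holds one value per muon, so the array length varies from event to event.

    from pyspark.sql.types import (StructType, StructField, IntegerType,
                                   LongType, FloatType, ArrayType)

    event_schema = StructType([
        StructField("run",         IntegerType()),
        StructField("event",       LongType()),
        StructField("nMuon",       IntegerType()),
        # one entry per muon in the event: variable-length arrays
        StructField("Muon_pt",     ArrayType(FloatType())),
        StructField("Muon_eta",    ArrayType(FloatType())),
        StructField("Muon_phi",    ArrayType(FloatType())),
        StructField("Muon_charge", ArrayType(IntegerType())),
    ])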


Example analyses: notebooks and open data





Lessons learned


Apache Spark API for HEP:
  • (+) The DataFrame API and Spark SQL work well for structured data like HEP data. Moreover, the key HEP data processing operations are map, filter, and reduction to histograms, which are well supported by the Spark DataFrame API.
  • (+) Physics datasets are large (GBs to 100s of TBs) and consist of statistically independent events, which can be processed in parallel. This fits well with the Spark execution model.
  • (+) Lazy evaluation in Spark allows building the analysis from small steps, each in a separate piece of code, which helps exploration and allows detailed comments inside the code. All operations are optimized together at execution time (when an action is triggered, such as fetching the histogram for plotting).
  • (+) The function width_bucket provides an acceptable solution for computing histograms with the DataFrame API and SQL (see the sketch after this list).
  • (+) The Spark DataFrame API and SQL can handle complex data types with arrays and structs. Spark implements the explode and posexplode functions, several array functions, and higher-order functions specialized for array processing (also illustrated in the sketch after this list).
  • (-) Spark (3.2 and 3.3) does not implement the SQL UNNEST operator. Spark does not have functions to natively handle 4-vectors.
  • (-) Some of the more complex data processing is hard to implement with the DataFrame API or SQL, and requires UDFs.
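
A minimal sketch of the width_bucket and array-processing points above, assuming a DataFrame named events with a precomputed mass column and a Muon_pt array column (both placeholders):

    from pyspark.sql import functions as F

    # Histogram of "mass" into 100 bins over [0, 200] GeV using width_bucket
    histogram = (events
                 .withColumn("bin", F.expr("width_bucket(mass, 0.0, 200.0, 100)"))
                 .groupBy("bin").count().orderBy("bin"))

    # Array processing without exploding, using higher-order functions
    per_event = (events
                 .withColumn("Muon_pt2",   F.transform("Muon_pt", lambda pt: pt * pt))
                 .withColumn("sum_pt",     F.aggregate("Muon_pt", F.lit(0.0),
                                                       lambda acc, pt: acc + pt))
                 .withColumn("Muon_pt_hi", F.filter("Muon_pt", lambda pt: pt > 20.0)))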
  
Data formats:
  • (+) Spark is optimized (with a vectorized reader) to ingest columnar formats such as Apache Parquet and ORC. This brings performance-enhancing features such as filter pushdown, min-max filtering with row group and page index statistics, and bloom filters (see the sketch after this list). Spark has additional optimizations for handling complex data types (e.g. arrays) with ORC (Spark 3.2) and Parquet (Spark 3.3, see SPARK-34863).
  • (+/-) The Laurelin library allows reading the HEP-specific ROOT data format. However, it is still experimental and not optimized for performance; it is better suited for format conversion.
  • (+/-) The examples reported here use data in a relatively flat structure (the nanoaod format), which plays well with the Spark DataFrame API. HEP data with more deeply nested structures, which was common in the recent past, introduces additional performance issues when used with Spark.
  • (-) The large majority of HEP data is stored in ROOT format at present. This "adds friction" when using tools from industry and open source that do not fully support it.
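
As a small illustration of the filter pushdown point above, one can inspect the physical plan when reading Parquet; the path and column names are placeholders, and a SparkSession named spark is assumed:

    df = (spark.read.parquet("path/to/events.parquet")
               .select("nMuon", "Muon_pt")
               .filter("nMuon >= 2"))

    # The physical plan should list the pushed filters and the pruned read schema
    df.explain()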
   
Platform and ecosystem:

  • (+) PySpark works well in notebooks. Spark sessions can run locally and on clusters (standalone, YARN, Kubernetes), which makes it a good building block for a data analysis platform (see the session-configuration sketch after this list). At CERN we have integrated the web-based analysis service SWAN with Spark services running on YARN and Kubernetes.
  • (+) Spark integrates well with cloud environments. Connectors are available for major object stores (S3 and others). For CERN's storage system EOS, there is the Hadoop-XRootD connector.
  • (+) Spark is a well-known platform, with many libraries and integrations available. Users like the idea of learning Spark, as it is widely used in industry.
  • (+/-) Hardware resources for physics are made available on HPC and batch systems; some work is needed there to run Spark in standalone cluster mode.
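
A sketch of the session configuration: the same notebook code can run locally or on a cluster, only the master URL and resource settings change (the values below are placeholders):

    from pyspark.sql import SparkSession

    # Local development: use the cores of the notebook machine
    spark = (SparkSession.builder
             .appName("HEP analysis")
             .master("local[*]")
             .getOrCreate())

    # On a cluster, the master would instead point to YARN or Kubernetes, e.g.
    #   .master("yarn")
    #   .master("k8s://https://<kubernetes-api-server>:6443")
    # together with executor resources, e.g.
    #   .config("spark.executor.memory", "8g")
    #   .config("spark.executor.cores", "4")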
  
Performance:
  • (-) Python UDFs in Spark have improved their performance with the latest releases, but the need to serialize and deserialize the data exchanged with Python workers can still cause a considerable performance hit, even when using Apache Arrow (see the sketch after this list).
  • (-) The state-of-the-art platforms for HEP analysis have large parts written in C/C++ and optimized for performance of numerical computations on HEP data, typically using vectorized computations. Apache Spark (3.2) does not have vectorized execution.
  • (+/-) Using UDFs written in Scala and called from PySpark can be useful to combine performance and advanced features (see benchmark examples Q6 and Q8); however, they add complexity, and most users will need to spend time learning how to do this.
  • (+/-) Spark's higher-order functions for array processing are expressive, but their performance in Apache Spark (3.2) could be improved (compare the two solutions to benchmark Q7).
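
A sketch comparing a vectorized (pandas) UDF, which exchanges data with the Python workers via Apache Arrow, to a built-in higher-order function computing the same per-event quantity; the events DataFrame and the Muon_pt column are placeholders:

    import pandas as pd
    from pyspark.sql import functions as F
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import DoubleType

    @pandas_udf(DoubleType())
    def pt_scalar_sum(pt: pd.Series) -> pd.Series:
        # each element of the Series is the array of muon pt values of one event
        return pt.apply(lambda a: float(sum(a)))

    # UDF version: data crosses the JVM/Python boundary (via Arrow)
    with_udf = events.withColumn("sum_pt", pt_scalar_sum(F.col("Muon_pt")))

    # Built-in expression: stays entirely inside the JVM, usually faster
    with_builtin = events.withColumn("sum_pt", F.aggregate("Muon_pt", F.lit(0.0),
                                                           lambda acc, x: acc + x))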
   
Note: these comments refer to the tests run for this work in 2022, using Apache Spark versions 3.2.1 and 3.3.0.
  

Conclusions

  
Apache Spark provides a suitable API, platform, and ecosystem for High Energy Physics data analysis, with some caveats. The examples shown here demonstrate how PySpark on notebooks can be used to write simple analysis code and run it locally or at scale on clusters. For several years, CERN has run a notebook service integrated with YARN and Kubernetes clusters and with cloud storage.
The Spark DataFrame API works surprisingly well for several simple HEP use cases, but it needs to be supplemented with user-defined functions (UDFs) for complex real-world cases. The performance of UDFs, in particular when written in Python, is a concern. Also a concern is the current need to read and convert files in ROOT format, as Spark is optimized for data formats common in industry, such as Apache Parquet and ORC.
What is reported here complements previous work on using Apache Spark for data reduction at scale and data preparation for ML tasks (see references below).
Additional work is needed both on the HEP and Apache Spark sides to bring Apache Spark up-to-speed with specialized HEP analysis software in their optimization domain (see links below in related work).
  

Related work and acknowledgments

ROOT is the reference platform for running HEP data analysis, using C++ and also Python bindings. Its current evolution implements the dataframe abstraction with "RDataFrame", and integrates with Apache Spark and Dask to scale out computations.

Coffea, Awkward Array, Uproot, and ServiceX are components of a suite of Python libraries and packages to build a HEP data analysis platform. The platform integrates with Dask, Apache Spark, Parsl, and Work Queue Executor for scaling out computations.

The Laurelin library integrates with Apache Spark for reading ROOT files (by Andrew Melo). The Hadoop-XRootD connector integrates with Apache Spark to access "the root:// filesystem" (by the CERN Hadoop and Spark service).


The work on implementing the HEP benchmark with Apache Spark reported here stems from:


Previous work on the topic of using Apache Spark for physics, for ML data preparation and data reduction at scale, includes:


Many thanks go to Jim Pivarski, Lindsey Gray, Andrew Melo, Lukas Heinrich, Gordon Watts, Ghislain Fourny, and Ingo Müller for discussions; to Ruslan Dautkhanov and Hyukjin Kwon from Databricks for their support and work on mapInArrow, see SPARK-37227 and SPARK-30153; and to the Hadoop and Spark team and the SWAN (platform for web-based analysis) team at CERN, in particular Riccardo Castellotti.

This work was done in the context of the Hadoop, Spark, and SWAN services at CERN, and of the data engineering efforts with the ATLAS experiment.

