We are in a golden age for distributed data processing, with an abundance of tools and solutions emerging from industry and open source. High Energy Physics (HEP) experiments at the LHC stand to profit from all this progress, as they are data-intensive operations with hundreds of petabytes of data to collect and process.
This post collects examples of code and open data where Apache Spark, a very popular tool in industry and open source, is used for simple HEP data analyses. It aims to be a general overview both for physicists wanting to know more about what Spark can do, and for data scientists wanting to get a feel for what a HEP data analysis looks like.
The code and discussion here are proposed as a technology exploration and do not reflect any particular official activity by an experiment team.
This post comes with a series of notebooks at this link.
TLDR; Apache Spark (PySpark) APIs can easily be used for simple HEP analysis tasks, for example running the analysis in notebook environments and profiting from a cluster infrastructure for computing power. Complex analyses can be challenging to implement and often require developing UDFs (user defined functions), which may increase complexity and reduce performance. Follow this link to an example analysis notebook in Colab, where you can play with code and open data.
A High-level view of particle physics analysis
The input to the analysis work is a set of files containing event data. For each event, a large set of attributes is provided, with details on the particles and physical quantities associated with it (photons, electrons, muons, jets, etc.). Events come from particle collisions recorded by a detector, followed by the processing steps needed to prepare them, such as reconstruction and calibration; in other cases, event data is generated from simulations.
Data is sliced with projection and filter operations, then specific computations are performed for each event of interest. In the final processing steps, data is typically aggregated into one or more histograms: these are the output "plots" showing the physical quantities of interest.
There is good news for data engines based on DataFrame and/or table abstractions, such as Spark or SQL platforms: event data has a fixed schema, and events are statistically independent, so you will typically not need to perform joins across events. Engines and data formats for columnar processing are also quite a good fit, as often only a subset of attributes is processed for a given analysis.
The hard part, for data processing engines, is that event data is nested, typically containing arrays. Moreover, complex formulas, and in some cases algorithms, are needed to process event data, which requires highly efficient CPU utilization. Finally, there is a lot of data, and many different tests to be run to find the "good plot".
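To make this pattern concrete, here is a minimal PySpark sketch of the flow described above: read event data, apply a projection and a filter, compute a derived quantity, and reduce it to a histogram. The input path and the column names (nMuon, Muon_pt) are placeholders, assuming a nanoaod-like layout similar to the open data used in the examples below.

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("HEP analysis sketch").getOrCreate()

# Read event data; the path and schema are placeholders (nanoaod-like layout assumed)
events = spark.read.parquet("/path/to/events.parquet")

# Projection and filter: keep only the attributes and events of interest
selected = (events
            .select("nMuon", "Muon_pt")
            .where("nMuon >= 2"))

# Reduction to a histogram: bin the leading-muon pt and count events per bin
histogram = (selected
             .withColumn("pt_bin", F.floor(F.col("Muon_pt")[0] / 2.0))  # 2 GeV bins
             .groupBy("pt_bin")
             .count()
             .orderBy("pt_bin"))

# Only here is the computation triggered (lazy evaluation); fetch the result for plotting
histogram_pdf = histogram.toPandas()
```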
Example analyses: notebooks and open data
- Dimuon mass spectrum analysis, link to the notebooks and data: This is a sort of "Hello World!" example for High Energy Physics analysis. It's a dimuon mass spectrum analysis accompanied by open data with 6.5 billion events (200 GB). There you can see what HEP data looks like and how to perform simple filters and reductions to histograms, finally rewarded by a nice plot (see also the sketch after this list for a taste of the core computation).
- HEP benchmark, link to the notebooks and data: This implements a series of tasks typical of particle physics analysis, organized as a benchmark with an accompanying dataset of 53 million events from CERN open data. There you can find examples of how to process array data for HEP using the Spark DataFrame API, and some examples of UDFs.
- ATLAS Higgs boson analysis of the decay channel H → ZZ* → 4lep: This is an outreach-style analysis, directly inspired by the original ATLAS analysis and paper for the discovery of the Higgs boson.
- LHCb matter-antimatter asymmetries analysis, outreach-style: An analysis meant for outreach by the LHCb experiment, re-implemented using the Apache Spark DataFrame API.
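As a taste of the dimuon example above, here is a hedged sketch of its core computation, reusing the events DataFrame from the earlier sketch: select events with exactly two opposite-charge muons and compute the dimuon invariant mass in the standard massless-muon approximation. Column names follow the nanoaod-like layout assumed above; refer to the linked notebooks for the actual analysis code.

```python
import pyspark.sql.functions as F

# Events with exactly two muons of opposite charge (nanoaod-like columns assumed)
dimuon = (events
          .where("nMuon == 2")
          .where(F.col("Muon_charge")[0] != F.col("Muon_charge")[1]))

# Invariant mass in the massless-muon approximation:
# M = sqrt(2 * pt1 * pt2 * (cosh(eta1 - eta2) - cos(phi1 - phi2)))
dimuon_mass = dimuon.withColumn(
    "Dimuon_mass",
    F.sqrt(2 * F.col("Muon_pt")[0] * F.col("Muon_pt")[1] *
           (F.cosh(F.col("Muon_eta")[0] - F.col("Muon_eta")[1]) -
            F.cos(F.col("Muon_phi")[0] - F.col("Muon_phi")[1]))))

# From here, reduce Dimuon_mass to a histogram and plot the mass spectrum
```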
Lessons learned
- (+) The DataFrame API and Spark SQL work well for structured data like HEP data. Moreover, the key HEP data processing operations, map, filter, and reduction to histograms, are well supported by the Spark DataFrame API.
- (+) Physics datasets consist of a large number of statistically independent events (from GBs to 100s of TBs of data), which can be processed in parallel. This fits the Spark execution model well.
- (+) Lazy evaluation in Spark allows building the analysis from small steps, each in a different piece of code, which helps exploration and allows detailed comments inside the code. All operations are optimized together at execution time (when an action is triggered, such as fetching the histogram for plotting).
- (+) The function width_bucket provides an acceptable solution for computing histograms with the DataFrame API and SQL (see the sketch after this list).
- (+) The Spark DataFrame API and SQL can handle complex data types with arrays and structs: they implement the explode and posexplode functions, several array functions, and higher-order functions specialized for array processing.
- (-) Spark (3.2 and 3.3) does not implement the SQL UNNEST operator, and it does not have functions to natively handle 4-vectors.
- (-) Some of the more complex data processing is hard to implement with the DataFrame API or SQL and requires UDFs.
- (+) Spark is optimized (with a vectorized reader) to ingest columnar formats such as Apache Parquet and ORC. This brings performance-enhancing features such as filter pushdown, min-max filtering with row-group and page-index statistics, and bloom filters. Spark has additional optimizations for handling complex data types (e.g. arrays) with ORC (Spark 3.2) and Parquet (Spark 3.3, see SPARK-34863).
- (+/-) The Laurelin library allows reading ROOT, the specialized HEP data format. However, it is still experimental and not optimized for performance; it is better suited to format conversion than to analysis workloads.
- (+/-) The examples reported here use data in a relatively flat structure (the nanoaod format), which plays well with the Spark DataFrame API. HEP data with more deeply nested structures, which was common in the recent past, introduces additional performance issues when using Spark.
- (-) The large majority of HEP data is currently stored in the ROOT format. This "adds friction" when using tools from industry and open source that do not fully support it.
- (+) PySpark works well in notebooks. Spark sessions can run locally and on clusters (standalone, YARN, Kubernetes), which makes it a good building block for a data analysis platform. At CERN we have integrated the web-based analysis service, SWAN, with Spark services running on YARN and Kubernetes.
- (+) Spark integrates well with cloud environments. Connectors are available for the major object stores (S3 and more); for the CERN storage system EOS, there is the Hadoop-XRootD connector.
- (+) Spark is a well-known platform, with many libraries and integrations available. Users like the idea of learning Spark, as it is widely used in industry.
- (+/-) Hardware resources for physics are made available on HPC and batch systems; some work is needed there to use Spark's standalone cluster mode.
- (-) Python UDFs in Spark have improved their performance with the latest releases, but the need to serialize and deserialize the data passed to the Python workers can still impose a considerable performance hit, even when using Apache Arrow (see the vectorized UDF sketch after this list).
- (-) The state-of-the-art platforms for HEP analysis have large parts written in C/C++ and optimized for performance of numerical computations on HEP data, typically using vectorized computations. Apache Spark (3.2) does not have vectorized execution.
- (+/-) Using UDFs written in Scala via PySpark can be useful to combine performance and advanced features (see benchmark examples Q6 and Q8); however, they add complexity and will require most users to spend time learning how to do this.
- (+/-) Spark higher-order functions for array processing are expressive, but their performance in Apache Spark (3.2) could be improved (compare the two solutions to benchmark Q7).
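To illustrate the width_bucket point above, here is a sketch of a histogram computed entirely with the DataFrame API (Spark 3.1 or later), binning the Dimuon_mass column from the earlier sketch into 100 bins between 0 and 120 GeV; the column name and range are illustrative placeholders.

```python
import pyspark.sql.functions as F

# Bin the values with the SQL function width_bucket(value, min, max, num_buckets),
# then count entries per bin: this is the histogram, computed fully in Spark
histogram = (dimuon_mass
             .withColumn("bin", F.expr("width_bucket(Dimuon_mass, 0.0, 120.0, 100)"))
             .groupBy("bin")
             .count()
             .orderBy("bin"))

# Collect the (small) histogram to the driver for plotting
histogram_pdf = histogram.toPandas()
```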
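And to make the UDF point above concrete, this is a sketch of a vectorized Python UDF (pandas_udf), which exchanges data with the Python workers via Apache Arrow. The quantity computed here (the angular difference between the two muons, wrapped into [-pi, pi]) is only an illustration and is not taken from the linked notebooks.

```python
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def delta_phi(phi1: pd.Series, phi2: pd.Series) -> pd.Series:
    """Angular difference wrapped into [-pi, pi], computed with vectorized numpy."""
    dphi = phi1 - phi2
    return (dphi + np.pi) % (2 * np.pi) - np.pi

# Apply the UDF on column expressions; data is shipped to Python workers via Arrow
with_dphi = dimuon_mass.withColumn(
    "Dimuon_dphi",
    delta_phi(F.col("Muon_phi")[0], F.col("Muon_phi")[1]))
```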
Conclusions
What is reported here complements previous work on using Apache Spark for data reduction at scale and data preparation for ML tasks (see references below).
Related work and acknowledgments
ROOT is the reference platform for running HEP data analysis, using C++ and also Python bindings. Its current evolution implements the dataframe abstraction with RDataFrame, and integrates with Apache Spark and Dask to scale out computations.
Coffea, Awkward Array, Uproot, and ServiceX are components of a suite of Python libraries and packages for building a HEP data analysis platform. The platform integrates with Dask, Apache Spark, Parsl, and Work Queue Executor for scaling out computations.
The Laurelin library integrates with Apache Spark for reading ROOT files (by Andrew Melo). The Hadoop-XRootD connector integrates with Apache Spark to access the "root://" filesystem (by the CERN Hadoop and Spark service).
The work on implementing the HEP benchmark with Apache Spark reported here stems from:
- The IRIS-HEP benchmark specifications and solutions linked there.
- The article and related code: Evaluating Query Languages and Systems for High-Energy Physics Data.
Previous work on the topic of using Apache Spark for physics, for ML data preparation and data reduction at scale, includes:
- Machine Learning Pipelines with Modern Big Data Tools for High Energy Physics, Matteo Migliorini, Riccardo Castellotti, Luca Canali, Marco Zanetti, Comput Softw Big Sci 4, 8 (2020)
- Using Big Data Technologies for HEP Analysis, M. Cremonesi et al., EPJ Web of Conferences 214, 06030 (2019)
- CMS Analysis and Data Reduction with Apache Spark, O. Gutsche et al., J. Phys.: Conf. Ser. 1085, 042030 (2018)
- Big Data Tools and Cloud Services for High Energy Physics Analysis in TOTEM Experiment, V. Avati et al., Proceedings of the 2018 IEEE/ACM International Conference on Utility and Cloud Computing Companion (UCC Companion)
Many thanks go to Jim Pivarski, Lindsey Gray, Andrew Melo, Lukas Heinrich, Gordon Watts, Ghislain Fourny, and Ingo Müller for discussions; to Ruslan Dautkhanov and Hyukjin Kwon from Databricks for their support and work on mapInArrow (see SPARK-37227 and SPARK-30153); and to the Hadoop and Spark team and the SWAN (web-based analysis platform) team at CERN, in particular Riccardo Castellotti.
This work was done in the context of the Hadoop, Spark, and SWAN services at CERN, and of the data engineering efforts with the ATLAS experiment.