Monday, March 17, 2025

ATLAS DCS Analysis with Apache Spark and Jupyter Notebooks

The ATLAS Detector Control System (DCS) at CERN is essential for ensuring optimal detector performance. Each year, the system generates tens of billions of time-stamped sensor readings, presenting considerable challenges for large-scale data analysis. Although these data are stored in Oracle databases that excel in real-time transactional processing, the configuration—optimized with limited CPU resources to manage licensing costs—makes them less suited for extensive historical time-series analysis.

To overcome these challenges, a modern data pipeline has been developed that leverages Apache Spark, CERN’s Hadoop service, and the Service for Web-based Analysis (SWAN) platform. This scalable, high-performance framework enables researchers to efficiently process and analyze DCS data over extended periods, unlocking valuable insights into detector operations. By integrating advanced big data technologies, the new system enhances performance monitoring, aids in troubleshooting Data Acquisition (DAQ) link failures, and supports predictive maintenance, thereby ensuring the continued reliability of the ATLAS detector systems.

Note: this blog post is a reduced version of the article Advancing ATLAS DCS Data Analysis with a Modern Data Platform by Luca Canali, Andrea Formica and Michelle Solis.


The Data Pipeline: From Storage to Analysis

Figure 1: Overview of the Big Data architecture for Detector Control System (DCS) data analysis. The system integrates data from Oracle databases (including DCS, luminosity, and run information) and file-based metadata and mappings into the Hadoop ecosystem using Parquet files. Apache Spark serves as the core processing engine, enabling scalable analysis within an interactive environment powered by Jupyter notebooks on CERN SWAN. Reproduced with permission from Advancing ATLAS DCS Data Analysis with a Modern Data Platform.


Data Storage in Oracle Databases

The ATLAS Detector Control System (DCS) data is primarily stored in Oracle databases using a commercial product, the WinCC OA system, optimized for real-time monitoring and transactional operations. Each detector’s data is managed within dedicated database schemas, ensuring structured organization and efficient access.

At the core of this storage model is the EVENTHISTORY table, a high-volume repository that records sensor IDs, timestamps, and measurement values across thousands of monitoring channels. This table grows rapidly, exceeding one billion rows annually, requiring advanced partitioning strategies to facilitate efficient data access. To improve performance, range partitioning is implemented, segmenting the table into smaller, manageable partitions based on predefined time intervals, such as monthly partitions.

Since direct querying of this vast dataset for large-scale analysis can impose a heavy load on the production Oracle systems, a read-only replica is used as the data source for many querying use cases and for data extraction into CERN’s Hadoop-based analytics platform. This approach ensures that the primary database remains unaffected by analytical workloads, allowing detector experts to access and process historical data efficiently without impacting real-time operations.


Leveraging CERN’s Hadoop Service

To address the challenges of handling large-scale DCS data analysis, CERN’s Hadoop cluster, Analytix, provides a scalable and high-performance infrastructure tailored for parallelized computation and distributed storage. With over 1,400 physical cores and 20 PB of distributed storage, it enables efficient ingestion, processing, and querying of massive datasets.

Currently, approximately 3 TB of DCS data—representing 30% of the total available records—has been migrated into the Hadoop ecosystem, covering data from 2022 onward. Data extraction is performed via Apache Spark, leveraging the Spark JDBC connector to read from the read-only Oracle replica. Daily import jobs incrementally update the core EVENTHISTORY table, appending new records without reprocessing the entire dataset. Smaller, less dynamic tables undergo full replacements to maintain consistency.
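
The daily incremental import described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the production job: the column name timestamp, the helper names, and the day=YYYY-MM-DD directory layout are hypothetical, and jdbc_options is expected to hold the url, driver, user, and password settings for the read-only replica.

```python
from datetime import date, timedelta


def daily_extract_query(day, table="EVENTHISTORY", ts_column="timestamp"):
    """Build a query selecting one calendar day as a half-open time interval."""
    start = day.isoformat()
    end = (day + timedelta(days=1)).isoformat()
    return (
        f"SELECT * FROM {table} "
        f"WHERE {ts_column} >= TIMESTAMP '{start} 00:00:00' "
        f"AND {ts_column} < TIMESTAMP '{end} 00:00:00'"
    )


def import_one_day(spark, jdbc_options, day, dataset_path):
    """Copy one day of rows from Oracle into its own Parquet partition directory."""
    df = (spark.read.format("jdbc")
          .options(**jdbc_options)  # url, driver, user, password (assumed supplied)
          .option("query", daily_extract_query(day))
          .load())
    # Overwriting only this day's directory keeps a re-run from duplicating rows.
    df.write.mode("overwrite").parquet(f"{dataset_path}/day={day.isoformat()}")


# Example: the query a job would run for 16 March 2025.
print(daily_extract_query(date(2025, 3, 16)))
```

The half-open interval (>= start, < next day) assigns every row to exactly one day, so consecutive daily runs neither overlap nor leave gaps.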

For optimized storage and performance, all ingested data is converted to Apache Parquet format, a columnar storage system designed for high-speed analytical queries. The dataset is partitioned by day, enabling partition pruning—a technique that allows queries to efficiently filter relevant time slices, significantly reducing query execution times. The system can use Spark's parallel processing to rapidly process queries that target billions of individual data rows, completing such operations in just a few seconds and making it an ideal solution for correlation studies, anomaly detection, and long-term trend analysis of detector performance.
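
To make partition pruning concrete, here is a small sketch. It assumes a Hive-style directory layout with one day=YYYY-MM-DD directory per day and a partition column named day; both are illustrative assumptions rather than the documented layout of the DCS dataset.

```python
from datetime import date, timedelta


def day_partitions(start, end):
    """List the partition directories that cover the inclusive range [start, end]."""
    n_days = (end - start).days + 1
    return [f"day={(start + timedelta(days=i)).isoformat()}" for i in range(n_days)]


def read_time_slice(spark, dataset_path, start, end):
    """Filter on the partition column so that Spark only scans matching directories."""
    return (spark.read.parquet(dataset_path)
            .where(f"day >= '{start.isoformat()}' AND day <= '{end.isoformat()}'"))


# A five-day query touches five directories, however many years are stored.
touched = day_partitions(date(2024, 2, 27), date(2024, 3, 2))
print(touched)
```

Because the filter is expressed on the partition column, the amount of data scanned grows with the length of the requested time window rather than with the size of the full archive.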

This modern data pipeline integrates seamlessly with CERN’s Jupyter notebooks service (SWAN), providing detector experts with a Python-based interactive environment for exploratory data analysis, visualization, and machine learning applications. The combination of Apache Spark, Parquet, and Hadoop enables the scalable processing of DCS data, facilitating key analyses such as monitoring DAQ link instabilities, tracking high-voltage performance, and diagnosing hardware failures in the ATLAS New Small Wheel (NSW) detector.


The Role of Apache Spark

Apache Spark plays a pivotal role in transforming how this data is accessed and analyzed. The Spark-based data pipeline extracts data from a read-only replica of the primary production database, ensuring minimal disruption to live operations. Using JDBC connectivity, Spark jobs are scheduled to run daily, incrementally updating Parquet files stored in CERN’s Hadoop cluster.

Key optimizations include:

  • Partitioning: Data is partitioned by day to facilitate faster querying and improved storage efficiency.

  • Incremental Updates: Only new data is ingested daily, preventing redundant processing.

  • Columnar Storage with Parquet: Apache Parquet enables efficient data retrieval, reducing query execution time and storage costs.


Extracting Data from Oracle using Apache Spark

Below is an example of how to create a Spark DataFrame that reads from an Oracle table using JDBC:

Run Oracle Free 23ai in a container, using the image from the gvenzl Docker Hub repository (https://github.com/gvenzl/oci-oracle-free):

  • docker run -d --name mydb1 -e ORACLE_PASSWORD=oracle -p 1521:1521 gvenzl/oracle-free:23-slim
  • Wait until the database is fully started by following the startup log: docker logs -f mydb1

You need an Oracle JDBC client jar, available from Maven Central or as a download from the Oracle website. For example, start PySpark with:

bin/pyspark --packages com.oracle.database.jdbc:ojdbc11:23.7.0.25.01


Edit with the target database username:

db_user = "system"

Database server connection string (modify for the actual setup):

db_connect_string = "localhost:1521/FREEPDB1"

Database password:

db_pass = "oracle"

Query to extract data from the target database (example query):

myquery = "SELECT rownum AS id FROM dual CONNECT BY level<=10"

Mapping the Oracle query/table to a Spark DataFrame:

df = (spark.read.format("jdbc")
           .option("url", f"jdbc:oracle:thin:@{db_connect_string}")
           .option("driver", "oracle.jdbc.driver.OracleDriver")
           .option("query", myquery)
           .option("user", db_user)
           .option("password", db_pass)
           .option("fetchsize", 10000)
           .load())

Show schema and data for testing purposes:

df.printSchema()
df.show()


For more details on using Spark to read data from Oracle databases, see this note.


Implementing Time Partitioning

To partition the data efficiently by time, custom post-processing code in PySpark is used. Below is an example of how partitioning is applied:

Import necessary functions:

from pyspark.sql.functions import col, year, month, dayofmonth

Read data from Oracle as a DataFrame:

df = (spark.read.format("jdbc")
           .option("url", f"jdbc:oracle:thin:@{db_connect_string}")
           .option("driver", "oracle.jdbc.driver.OracleDriver")
           .option("dbtable", "EVENTHISTORY")
           .option("user", db_user)
           .option("password", db_pass)
           .option("fetchsize", 10000)
           .load())

# Extract partitioning keys (year, month, day) from the 'timestamp' column

df = df.withColumn("year", year(col("timestamp"))) \
       .withColumn("month", month(col("timestamp"))) \
       .withColumn("day", dayofmonth(col("timestamp")))

# Write the DataFrame as Parquet files partitioned by year, month, and day

output_path = "hdfs://path/to/output_directory"

df.write.partitionBy("year", "month", "day").parquet(output_path)


For more details on writing data to Parquet with Spark, see this note.


Analysis Framework: A User-Friendly Approach

Apache Spark as the Core Processing Engine

The Apache Spark ecosystem allows for seamless querying and processing of vast datasets. Spark DataFrames and Spark SQL APIs offer a familiar and flexible interface for data manipulation, similar to Pandas for Python users. By enabling distributed computation, Spark ensures that billions of rows can be processed within seconds.

Benefits of Spark in the ATLAS DCS framework:

  • Scalability and Performance: Spark efficiently uses the available cores on each node and distributes workloads across multiple nodes.

  • Powerful APIs: Spark natively uses the DataFrame API and also supports the SQL language. Both are powerful, expressive APIs that help boost performance.

  • Fault Tolerance: Spark has a proven architecture that provides automatic recovery and retries from many types of failures in a distributed environment.
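
As an illustration of the two APIs, the sketch below computes the same per-sensor summary with Spark SQL and with the DataFrame API. The column names sensor_id and value, the view name, and the function names are hypothetical and chosen only for this example; df is assumed to be a Spark DataFrame of DCS readings.

```python
def stats_sql(view_name):
    """Spark SQL text: number of readings and average value per sensor."""
    return (
        "SELECT sensor_id, count(*) AS n_readings, avg(value) AS avg_value "
        f"FROM {view_name} GROUP BY sensor_id"
    )


def stats_with_sql(spark, df, view_name="dcs_readings"):
    """Register the DataFrame as a temporary view and query it with SQL."""
    df.createOrReplaceTempView(view_name)
    return spark.sql(stats_sql(view_name))


def stats_with_dataframe_api(df):
    """The same aggregation expressed with the DataFrame API."""
    # Imported here so the SQL helper above can be used without PySpark installed.
    from pyspark.sql import functions as F
    return (df.groupBy("sensor_id")
              .agg(F.count("*").alias("n_readings"),
                   F.avg("value").alias("avg_value")))


print(stats_sql("dcs_readings"))
```

Both forms are planned by the same Spark engine, so the choice between them is mostly a matter of which style reads better for a given analysis.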


Platform integration with Jupyter notebooks and Spark

Front-end analysis is conducted via Jupyter notebooks on CERN’s SWAN platform, offering researchers an interactive and intuitive interface. Key capabilities include:

  • Spark integration: A dedicated component, the Spark Connector, abstracts the complexities of Spark configuration, ensuring seamless interaction with the Hadoop ecosystem.

  • Python environment and Dynamic Visualization: The platform harnesses the robust Python ecosystem for data processing, enabling the dynamic creation of tables, charts, and plots.

  • Data Integration: Seamless connectivity to diverse data sources—including Oracle databases and web services—simplifies the integration process, providing comprehensive access to all relevant data.


Figure 2: Analysis of ATLAS Detector Control System (DCS) data using Python and Apache Spark. The figure highlights specific elements of the ATLAS New Small Wheel (NSW) MicroMegas (MMG) subdetector that exhibited unstable behavior, prompting further investigation. This visualization was generated using a modern data platform that integrates Jupyter notebooks, CERN’s Hadoop service, and Spark-based analytics. The approach enables large-scale processing and efficient troubleshooting of detector performance. Reproduced with permission from Advancing ATLAS DCS Data Analysis with a Modern Data Platform.


Future Enhancements

To further optimize scalability, performance, and analytical capabilities, we are exploring several key improvements:

  • Kubernetes for Spark Orchestration: Moving from a Hadoop-based cluster to Kubernetes-managed Spark deployments will streamline resource allocation, optimize workload scheduling, and enable dynamic scaling during peak analysis periods. This transition also facilitates a smoother shift toward cloud-based architectures.

  • Cloud Storage Solutions: We are evaluating cloud-based storage options such as Amazon S3, which would further ease migration to a cloud environment and enhance data accessibility and scalability.

  • Advanced Data Formats: We are considering the adoption of modern data formats like Apache Iceberg and Delta Lake. These formats offer improved data ingestion workflows, better query performance and support for evolving data schemas, and enhanced data management capabilities in general.

  • Machine Learning and AI Integration: Leveraging GPU resources available on CERN’s SWAN platform will enable advanced machine learning techniques for predictive analytics, anomaly detection, and automated troubleshooting. This integration aims to identify detector inefficiencies and potential failures in real time, ultimately improving operational reliability and reducing downtime.

These enhancements aim to future-proof the DCS data analysis framework, ensuring it remains a highly efficient, scalable, and adaptable platform for ongoing and future ATLAS detector operations.


Conclusion

The integration of Apache Spark with CERN’s Hadoop infrastructure and CERN's notebook service has significantly enhanced ATLAS DCS data processing and analysis by enabling a scalable, high-performance, and user-friendly platform. This framework empowers researchers to extract meaningful insights, enhance detector performance monitoring, and streamline troubleshooting, improving operational efficiency. As the project continues to evolve, the adoption of cloud-based storage, Kubernetes orchestration, and AI-driven analytics will further enhance the platform’s capabilities, supporting the needs of the scientific and engineering community.


Acknowledgements and Links

This work is based on the article Advancing ATLAS DCS Data Analysis with a Modern Data Platform by Luca Canali, Andrea Formica and Michelle Solis. Many thanks to our ATLAS colleagues, in particular from the ADAM (Atlas Data and Metadata) team and ATLAS DCS. Special thanks to the CERN Databases and Data Analytics group for their help and support with Oracle, Hadoop, SWAN and Spark services.



Thursday, February 27, 2025

Kepler’s Mars Orbit Analysis with Python Notebooks & AI-Assisted Coding

Johannes Kepler’s analysis of Mars’ orbit stands as one of the greatest achievements in scientific history, revealing the elliptical nature of planetary paths and establishing the foundational laws of planetary motion. In this post, you will explore how you can recreate Kepler’s revolutionary findings using Python’s robust data science ecosystem. 
 
Our goal is not to produce a specialized scientific paper but to provide a clear, interactive, and visually appealing demonstration suitable for a broad audience. Python libraries like NumPy, Pandas, SciPy, and Matplotlib provide an efficient environment for numerical computations, data manipulation, and visualization. Jupyter Notebooks further enhance this process by providing an interactive and user-friendly platform to run code, visualize results, and document your insights clearly. Additionally, AI-assisted coding significantly simplifies technical tasks such as ellipse fitting, data interpolation, and creating insightful visualizations. This integration allows us to focus more on understanding the insights behind Kepler’s discoveries, making complex analyses accessible and engaging.

This project showcases:

  • A structured approach to data analysis using a handful of short Jupyter Notebooks.
  • How Python’s ecosystem (NumPy, Pandas, SciPy, Matplotlib) facilitates computational research.
  • The benefits of AI-assisted coding in accelerating development and improving workflow efficiency.
  • An interactive, visually engaging reproduction of Kepler’s findings.

The full code and notebooks are available at: GitHub Repository


Jupyter Notebooks and AI-Assisted Coding: A Powerful Combination for Data Science

Jupyter Notebooks have become the standard environment for data science, offering an interactive and flexible platform for scientific computing. They can be run on local machines or on cloud services such as Google Colab, Amazon SageMaker, IBM Watson Studio, Microsoft Azure, GitHub Codespaces, Databricks, and others. CERN users can also run the notebooks on SWAN (Service for Web-based ANalysis), the CERN-hosted Jupyter notebooks service, which is widely used by engineers and physicists across CERN for large-scale scientific analysis.

How Python and AI Tools Enhance This Project

  • Data Interpolation & Curve Fitting: Python libraries like SciPy and AI-assisted tools help generate optimal curve fits in seconds.

  • Plotting & Visualization: AI-driven code completion and Matplotlib make it easier and faster to generate plots.

  • Error Handling & Debugging: AI suggestions help identify and fix errors quickly, improving workflow efficiency.

  • Exploring Alternative Approaches: AI can suggest different computational methods, allowing for a more robust and exploratory approach to the analysis.

Why Use Jupyter Notebooks and AI-Assisted Coding?

  • Saves Time: Avoids writing repetitive, boilerplate code.

  • Enhances Accuracy: Reduces human error in complex calculations.

  • Boosts Creativity: Frees up cognitive resources to focus on insights rather than syntax.

  • Flexible & Scalable: Python notebooks can be used locally or on powerful cloud-based platforms for large-scale computations.

  • Widely Adopted: Used by researchers, engineers, and data scientists across academia, industry, and institutions like CERN.


Overview of the Analysis

The project is structured into a series of Jupyter notebooks, each building on the previous one to triangulate Mars' orbit and verify Kepler’s laws.  

Click on the notebook links below to explore the details of each step.

  1. Notebook Generating Mars Ephemeris

    Generate the measurements of Mars' celestial positions

    • Data is key to the success of this analysis. Kepler used Tycho Brahe's data; we use NASA JPL's DE421 ephemeris via the Skyfield library to generate accurate planetary positions over a period of 12 Martian years (approximately 22 Earth years), starting from January 1, 2000.

    • Determine the ecliptic longitude of Mars and the Sun in the plane of Earth's orbit. Filter out observations where Mars is obscured by the Sun.

    • Save the filtered ephemeris data into a CSV file (ephemeris_mars_sun.csv).

    • Key attributes in the saved data are: Date, Mars Ecliptic Longitude (deg), Sun Ecliptic Longitude (deg)
  2. Notebook Key Insight of Kepler's Analysis

    Understanding how Earth-based observations reveal Mars’ trajectory.

    • Mars completes one full revolution around the Sun in 687 days (one Mars year). During this period, Earth occupies a different position in its orbit at each observation. By selecting measurements taken exactly one Mars year apart, we capture Mars' apparent position from varied vantage points. With enough observations over several Mars years, these multiple perspectives enable us to triangulate the position of Mars.


    • Figure 1, Triangulating Mars' Position:
      • Select observations spaced 687 days apart (one Mars year) so that Mars is observed at nearly the same position relative to the Sun for each measurement.

      • For each observation, compute Earth's position in the ecliptic and derive Mars' line-of-sight vectors.
      • Apply least-squares estimation to solve for Mars' ecliptic coordinates.
  3. Notebook Computing Mars' Orbit

    Calculate Mars orbit by triangulating Mars' position using all available observations.

    • Load the dataset (line_of_sight_mars_from_earth.csv) with Mars and Sun observations, notably the following fields: Date, Mars Ecliptic Longitude (deg), and Sun Ecliptic Longitude (deg). Compute Mars' heliocentric coordinates and estimate its orbit.

    • Generalized Triangulation

      • For each start date within the first Mars year, iterate through subsequent measurements at 687-day intervals (one Mars year), so that Mars is observed at nearly the same position relative to the Sun for each measurement.
      • Triangulate Mars' position from the accumulated data when at least two valid measurements are available.
      • Gracefully handle missing data and singular matrices to ensure robust estimation.
    • Compile the computed Mars positions into a results DataFrame and save the results to a CSV file (computed_Mars_orbit.csv) for further analysis.
  4. Notebook Kepler’s Laws

    Verifying Kepler’s three laws with real data.

    • First Law (Figure 2): Demonstrate Kepler's First Law by fitting an elliptical model to confirm that Mars’ orbit is an ellipse with the Sun at one focus. The fitted parameters match accepted values, notably eccentricity e ~ 0.09 and semi-major axis a ~ 1.52 AU.

    • Second Law: Demonstrate that Mars sweeps out equal areas in equal time intervals using the measured values of Mars' orbit.

    • Third Law: Validate the harmonic law by comparing the ratio T^2/a^3 for Mars and Earth.

  5. Notebook Estimating Earth's Orbit

    Use Mars' ephemeris and line-of-sight data to determine Earth’s orbit.

    • Earth Position Computation:

      • For each selected observation, compute Earth's heliocentric position by solving for the Earth-Sun distance using the observed Sun and Mars ecliptic longitudes and the estimated Mars position (found in notebook 3 of this series, "Computing Mars' Orbit").
      • Use a numerical solver (fsolve) to ensure that the computed Earth position yields the correct line-of-sight (LOS) angle towards Mars.
    • Fit Earth’s computed positions to an elliptical model and compare the results with accepted astronomical values.

    • Visualize Earth’s orbit alongside the positions of Mars and the Sun.
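
The triangulation step used in notebooks 2 and 3 can be sketched in a few lines. This is a simplified illustration, not the notebooks' code: it works in two dimensions in the ecliptic plane, assumes a circular Earth orbit of radius 1 AU to place the observer, and uses only the standard library. The function names and the synthetic test point are hypothetical.

```python
import math


def earth_position(sun_longitude_deg, radius_au=1.0):
    """Earth's heliocentric (x, y), assuming a circular orbit (simplification).

    Seen from the Sun, Earth lies opposite to where the Sun appears from Earth.
    """
    lon = math.radians(sun_longitude_deg + 180.0)
    return radius_au * math.cos(lon), radius_au * math.sin(lon)


def triangulate(observations):
    """Least-squares intersection of lines of sight in the ecliptic plane.

    observations: iterable of (earth_x, earth_y, mars_longitude_deg).
    Returns the point minimizing the summed squared distance to all lines.
    """
    a11 = a12 = a22 = b1 = b2 = 0.0
    for ex, ey, lon_deg in observations:
        lon = math.radians(lon_deg)
        dx, dy = math.cos(lon), math.sin(lon)
        # Projector onto the normal of the line of sight: P = I - d d^T
        p11, p12, p22 = 1.0 - dx * dx, -dx * dy, 1.0 - dy * dy
        a11 += p11
        a12 += p12
        a22 += p22
        b1 += p11 * ex + p12 * ey
        b2 += p12 * ex + p22 * ey
    det = a11 * a22 - a12 * a12
    if abs(det) < 1e-12:
        raise ValueError("lines of sight are nearly parallel; cannot triangulate")
    return (a22 * b1 - a12 * b2) / det, (a11 * b2 - a12 * b1) / det


# Synthetic check: place Mars at a known point and observe it from three
# Earth positions, as if the observations were taken one Mars year apart.
true_mars = (1.2, 0.9)
observations = []
for sun_lon in (180.0, 260.0, 20.0):
    ex, ey = earth_position(sun_lon)
    mars_lon = math.degrees(math.atan2(true_mars[1] - ey, true_mars[0] - ex))
    observations.append((ex, ey, mars_lon))

estimate = triangulate(observations)
print(estimate)
```

With noise-free synthetic observations the estimate coincides with the chosen point; with real ephemeris data the least-squares formulation averages out small inconsistencies between the lines of sight.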



Conclusion

Kepler’s groundbreaking work reshaped our understanding of planetary motion, and today, we can revisit his analysis with modern computational tools. By combining Jupyter Notebooks, Python’s scientific libraries, and AI-assisted coding, we demonstrate how complex data analysis can be performed efficiently and interactively.

This project serves as an example of how AI and open-source tools empower researchers, educators, and enthusiasts to explore scientific discoveries with greater ease and depth.


👉 Check out the full project and try the notebooks yourself! GitHub Repository



References

This work is directly inspired by Terence Tao's project Climbing the Cosmic Distance Ladder. In particular see the two-part video series with Grant Sanderson (3Blue1Brown): Part 1 and Part 2

Further details on Kepler's analysis can be found in Tao's draft book chapter, Chapter 4: Fourth Rung - The Planets (download here).

Another insightful video on Kepler’s discoveries is How the Bizarre Path of Mars Reshaped Astronomy [Kepler's Laws Part 2] by Welch Labs.

Mars-Orbit-Workshop contains material to conduct a workshop recreating Kepler's analysis.

The original work of Kepler was published in Astronomia Nova (New Astronomy) in 1609. The book is available on archive.org. See for example this link to chapter 42 of Astronomia Nova 

Figure 3: An illustration from Chapter 42 of Astronomia Nova (1609) by Kepler, depicting the key concept of triangulating Mars' position using observations taken 687 days apart (one Martian year). This is the original version of Figures 1 and 2 in this post.



Acknowledgements

This work has been conducted in the context of the Databases and Analytics activities at CERN. In particular, I'd like to thank my colleagues in the SWAN (Service for Web-based ANalysis) team.

Friday, April 26, 2024

Building an Apache Spark Performance Lab: Tools and Techniques for Spark Optimization

Apache Spark is renowned for its speed and efficiency in handling large-scale data processing. However, optimizing Spark to achieve maximum performance requires a precise understanding of its inner workings. This blog post will guide you through establishing a Spark Performance Lab with essential tools and techniques aimed at enhancing Spark performance through detailed metrics analysis.

Why a Spark Performance Lab

The purpose of a Spark Performance Lab isn't just to measure the elapsed time of your Spark jobs but to understand the underlying performance metrics deeply. By using these metrics, you can create models that explain what's happening within Spark's execution and identify areas for improvement. Here are some key reasons to set up a Spark Performance Lab:

  • Hands-on learning and testing: A controlled lab setting allows for safer experimentation with Spark configurations and tuning, and for learning how to use the monitoring tools and interpret Spark-generated metrics.
  • Load and scale: Our lab uses a workload generator that runs TPCDS queries. This is a well-known set of complex queries that is representative of OLAP workloads and can easily be scaled up for testing, from GBs to 100s of TBs.
  • Improving your toolkit: Having a toolbox is invaluable, but you need to practice with the tools and understand their output in a sandbox environment before moving to production.
  • Get value from the Spark metric system: Instead of focusing solely on how long a job takes, use detailed metrics to understand the performance and spot inefficiencies.

Tools and Components

In our Spark Performance Lab, several key tools and components form the backbone of our testing and monitoring environment:

  • Workload generator: 
    • We use a custom tool, TPCDS-PySpark, to generate a consistent set of queries (TPCDS benchmark), creating a reliable testing framework.
  • Spark instrumentation: 
    • Spark’s built-in Web UI for initial metrics and job visualization.
  • Custom tools:
    • SparkMeasure: Use this for detailed performance metrics collection.
    • Spark-Dashboard: Use this to monitor Spark jobs and visualize key performance metrics.

Demos

These quick demos and tutorials will show you how to use the tools in this Spark Performance Lab. You can follow along and get the same results on your own, which will help you start learning and exploring.


Figure 1: The graph illustrates the dynamic task allocation in a Spark application during a TPCDS 10TB benchmark on a YARN cluster with 256 cores. It showcases the variability in the number of active tasks over time, highlighting instances of execution "long tails" and straggler tasks, as seen in the periodic spikes and troughs.

How to Make the Best of Spark Metrics System

Understanding and utilizing Spark's metrics system is crucial for optimization:
  • Importance of Metrics: Metrics provide insights beyond simple timing, revealing details about task execution, resource utilization, and bottlenecks.

  • Execution Time is Not Enough: Measuring the execution time of a job (how long it took to run) is useful, but it doesn’t show the whole picture. Say the job ran in 10 seconds. It's crucial to understand why it took 10 seconds instead of 100 seconds or just 1 second. What was slowing things down? Was it the CPU, data input/output, or something else, like data shuffling? This helps us identify the root causes of performance issues.

  • Key Metrics to Collect:
    • Executor Run Time: Total time executors spend processing tasks.
    • Executor CPU Time: Direct CPU time consumed by tasks.
    • JVM GC Time: Time spent in garbage collection, affecting performance.
    • Shuffle and I/O Metrics: Critical for understanding data movement and disk interactions.
    • Memory Metrics: Key for performance and troubleshooting Out Of Memory errors.

  • Metrics Analysis, what to look for:
    • Look for bottlenecks: are there resources that are the bottleneck? Are the jobs running mostly on CPU or waiting for I/O or spending a lot of time on Garbage Collection?
    • USE method: The Utilization, Saturation, and Errors (USE) method is a methodology for analyzing the performance of any system.
      • The tools described here can help you to measure and understand Utilization and Saturation.
    • Can your job use a significant fraction of the available CPU cores? 
      • Examine the actual number of active tasks vs. time.
      • Figure 1 shows the number of active tasks measured while running TPCDS 10TB on a YARN cluster, with 256 cores allocated. The graph shows spikes and troughs.
      • Understand the root causes of the troughs using metrics and monitoring data. The reasons can be many: resource allocation, partition skew, straggler tasks, stage boundaries, etc.
    • Which tool should I use?
      • Start with using the Spark Web UI
      • Instrument your jobs with sparkMeasure. This is recommended early in application development and testing, and for Continuous Integration (CI) pipelines.
      • Observe your Spark application execution profile with Spark-Dashboard.
      • Use available tools for OS metrics too. See also the Spark-Dashboard extended instrumentation, which collects and visualizes OS metrics (from cgroup statistics) such as network stats.
    • Drill down:
Figure 2: This technical drawing outlines the integrated monitoring pipeline for Apache Spark implemented by Spark-Dashboard using open-source components. The flow of the diagram illustrates the Spark metrics source and the components used to store and visualize the metrics.
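
The bottleneck questions in the metrics analysis list can be turned into a few simple ratios. The sketch below is a hedged illustration: it assumes aggregated task metrics are available as a dictionary keyed by the Spark metric names executorRunTime, executorCpuTime, and jvmGCTime, with all values already converted to milliseconds. The function name and the sample numbers are hypothetical, not measurements.

```python
def summarize_stage_metrics(metrics, elapsed_ms, allocated_cores):
    """Derive simple bottleneck indicators from aggregated Spark task metrics.

    All times are assumed to be in milliseconds.
    """
    run_time = metrics["executorRunTime"]
    # Total task run time divided by wall-clock time estimates how many
    # tasks were running concurrently on average.
    avg_active_tasks = run_time / elapsed_ms
    return {
        "cpu_fraction": metrics["executorCpuTime"] / run_time,
        "gc_fraction": metrics["jvmGCTime"] / run_time,
        "avg_active_tasks": avg_active_tasks,
        "core_utilization": avg_active_tasks / allocated_cores,
    }


# Hypothetical numbers for illustration only.
sample = {
    "executorRunTime": 9_600_000,
    "executorCpuTime": 7_200_000,
    "jvmGCTime": 480_000,
}
summary = summarize_stage_metrics(sample, elapsed_ms=60_000, allocated_cores=256)
for name, value in summary.items():
    print(f"{name}: {value:.3f}")
```

A CPU fraction well below 1 suggests that tasks spend time waiting (for example on I/O or shuffle), while a core utilization well below 1 points to troughs in the number of active tasks, such as those visible in Figure 1.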


Lessons Learned and Conclusions

From setting up and running a Spark Performance Lab, here are some key takeaways:

  • Collect, analyze and visualize metrics: Go beyond just measuring jobs' execution times to troubleshoot and fine-tune Spark performance effectively.
  • Use the Right Tools: Familiarize yourself with tools for performance measurement and monitoring.
  • Start Small, Scale Up: Begin with smaller datasets and configurations, then gradually scale to test larger, more complex scenarios.
  • Tuning is an Iterative Process: Experiment with different configurations, parallelism levels, and data partitioning strategies to find the best setup for your workload.

Establishing a Spark Performance Lab is a fundamental step for any data engineer aiming to master Spark's performance aspects. By integrating tools like Web UI, TPCDS_PySpark, sparkMeasure, and Spark-Dashboard, developers and data engineers can gain unprecedented insights into Spark operations and optimizations. 

Explore this lab setup to turn theory into expertise in managing and optimizing Apache Spark. Learn by doing and experimenting!

Acknowledgements: A special acknowledgment goes out to the teams behind the CERN data analytics, monitoring, and web notebook services, as well as the dedicated members of the ATLAS database group.


Tuesday, January 30, 2024

Enhancing Apache Spark and Parquet Efficiency: A Deep Dive into Column Indexes and Bloom Filters

In the ever-evolving landscape of big data, Apache Spark and Apache Parquet continue to introduce game-changing features. Their latest updates have brought forward significant enhancements, including column indexes and bloom filters. This blog post delves into these new features, exploring their applications and benefits. This post is based on the extended notes at:


Key Advantages of Parquet in Spark

This is not an introductory article; however, here is a quick recap of why you may want to spend time learning more about Apache Parquet and Spark. Parquet is a columnar storage file format optimized for use with data processing frameworks like Apache Spark. It offers efficient data compression and encoding schemes.

  • Parquet is a columnar format enabling efficient data storage and retrieval
  • It supports compression and encoding
  • Optimizations in Spark for Parquet include:
    • Vectorized Parquet reader
    • Filter push down capabilities
    • Enhanced support for partitioning and handling large files

Other key aspects of Parquet with Spark that are important for what follows are:
  • Row Group Organization: Parquet files consist of one or more 'row groups,' typically sized around 128 MB, although this is adjustable.
  • Parallel Processing Capabilities: Both Spark and other engines can process Parquet data in parallel, leveraging the row group level or the file level for enhanced efficiency.
  • Row Group Statistics: Each row group holds vital statistics like minimum and maximum values, counts, and the number of nulls. These stats enable the 'skip data' feature when filters are applied, essentially serving as a zone map to optimize query performance.

ORC: For a comparison of Apache Parquet with another popular data format, Apache ORC, refer to Parquet-ORC Comparison.


Understanding Column Indexes and Bloom Filters in Parquet

Column Indexes: Enhancing Query Efficiency

Column indexes, introduced in Parquet 1.11 and utilized in Spark 3.2.0 and higher, offer a fine-grained approach to data filtering. These indexes store min and max values at the Parquet page level, allowing Spark to efficiently execute filter predicates at a much finer granularity than the default 128 MB row group size. Particularly effective for sorted data, column indexes can drastically reduce the data read from disk, improving query performance.
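
To illustrate why page-level min/max statistics help most on sorted data, here is a minimal sketch in plain Scala that can be pasted into spark-shell or any Scala REPL. It is a toy model, not how Parquet implements column indexes: the names PageStats, buildPageStats and pagesToRead are hypothetical, and the page size of 1000 values and the filter value are arbitrary assumptions.

```scala
// Toy model of a column index: one (min, max) pair per page.
// All names here are illustrative, not part of the Parquet API.
case class PageStats(min: Int, max: Int)

// Split the column values into fixed-size pages and record min/max for each page
def buildPageStats(values: Seq[Int], pageSize: Int): Seq[PageStats] =
  values.grouped(pageSize).map(p => PageStats(p.min, p.max)).toVector

// A page has to be read only if the filter value falls within its [min, max] range
def pagesToRead(stats: Seq[PageStats], filterValue: Int): Int =
  stats.count(s => s.min <= filterValue && filterValue <= s.max)

val sorted = (0 until 100000).toVector
val shuffled = new scala.util.Random(42).shuffle(sorted)

val sortedPages = pagesToRead(buildPageStats(sorted, 1000), 28801)
val shuffledPages = pagesToRead(buildPageStats(shuffled, 1000), 28801)

println(s"pages to read, sorted data: $sortedPages of 100")
println(s"pages to read, shuffled data: $shuffledPages of 100")
```

With sorted values, each page covers a narrow, non-overlapping range, so an equality filter matches very few pages. With shuffled values, almost every page spans nearly the full range and cannot be skipped.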

Bloom Filters: A Leap in Filter Operations

Parquet 1.12 (utilized by Spark 3.2.0 and higher) introduced Bloom filters, a probabilistic data structure that efficiently determines whether an element is in a set. They are particularly useful for columns with high cardinality and scenarios where filter operations are based on values likely to be absent from the dataset. Using bloom filters can lead to significant improvements in read performance.


Example: Spark using Parquet column indexes

Test dataset and preparation

  • The Parquet test file used below, parquet112file_sorted, is extracted from the TPCDS benchmark table web_sales
  • The table (Parquet file) contains data sorted on the column ws_sold_time_sk
  • It's important that the data is sorted: sorting groups together values in the filter column "ws_sold_time_sk". If the values are scattered, the column index min-max statistics will have a wide range and will not help with skipping data
  • The sorted dataset has been created using spark.read.parquet(path + "web_sales_piece.parquet").sort("ws_sold_time_sk").coalesce(1).write.parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")
  • Download the test data:

Run the tests

  1. Fast (reads only 20k rows):
    Spark reads the Parquet file using a filter and makes use of column and offset indexes:
bin/spark-shell

val path = "./" 
val df = spark.read.parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")

// Read the file using a filter, this will use column and offset indexes
val q1 = df.filter("ws_sold_time_sk=28801")
val plan = q1.queryExecution.executedPlan
q1.collect

// Use Spark metrics to see how many rows were processed
// This is also available in the Web UI in graphical form
val metrics = plan.collectLeaves().head.metrics
metrics("numOutputRows").value

res: Long = 20000

The result shows that only 20000 rows were processed. This corresponds to processing just a few pages, as opposed to reading and processing the entire file. This is made possible by the use of the min-max value statistics in the column index for column ws_sold_time_sk.
Column indexes are created by default in Spark version 3.2.x and higher.


  2. Slow (reads 2M rows):
    Same as above, but this time we disable the use of column indexes.
    Note this is also what happens if you use Spark versions prior to Spark 3.2.0 (notably Spark 2.x) to read the file.
bin/spark-shell

val path = "./"
// disable the use of column indexes for testing purposes
val df = spark.read.option("parquet.filter.columnindex.enabled","false").parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")

val q1 = df.filter("ws_sold_time_sk=28801")
val plan = q1.queryExecution.executedPlan
q1.collect

// Use Spark metrics to see how many rows were processed
val metrics = plan.collectLeaves().head.metrics
metrics("numOutputRows").value

res: Long = 2088626

The result is that all the rows in the row group (2088626 rows in the example) were read, as Spark could not push the filter down to the Parquet page level. This example runs more slowly than the example above and in general performs more work (uses more CPU cycles and reads more data from the filesystem).


Diagnostics and Internals of Column and Offset Indexes

Column indexes in Parquet are key structures designed to optimize filter performance during data reads. They are particularly effective for managing and querying large datasets.

Key Aspects of Column Indexes:

  • Purpose and Functionality: Column indexes offer statistical data (minimum and maximum values) at the page level, facilitating efficient filter evaluation and optimization.
  • Default Activation: By default, column indexes are enabled to ensure optimal query performance.
  • Granularity Insights: While column indexes provide page-level statistics, similar statistics are also available at the row group level. Typically, a row group is approximately 128MB, contrasting with pages usually around 1MB.
  • Customization Options: Both rowgroup and page sizes are configurable, offering flexibility to tailor data organization. For further details, see Parquet Configuration Options.

Complementary Role of Offset Indexes:

  • Association with Column Indexes: Offset indexes work in tandem with column indexes and are stored in the file's footer in Parquet versions 1.11 and above.
  • Scanning Efficiency: A key benefit of these indexes is their role in data scanning. When filters are not applied in Parquet file scans, the footers with column index data can be efficiently skipped, enhancing the scanning process.

Additional Resources:

For an in-depth explanation of column and offset indexes in Parquet, consider reading this detailed description.

The integration of column and offset indexes significantly improves Parquet's capability in efficiently handling large-scale data, especially in scenarios involving filtered reads. Proper understanding and utilization of these indexes can lead to marked performance improvements in data processing workflows.


Tools to drill down on column index metadata in Parquet files

  • parquet-cli

    • example: hadoop jar target/parquet-cli-1.13.1-runtime.jar org.apache.parquet.cli.Main column-index -c ws_sold_time_sk <path>/my_parquetfile
    • More details on how to use parquet-cli at Tools for Parquet Diagnostics
  • Example with the Java API from Spark-shell

    // customize with the file path and name
    val fullPathUri = java.net.URI.create("<path>/myParquetFile")
    
    // create a Hadoop input file and open it with ParquetFileReader
    val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(fullPathUri), spark.sessionState.newHadoopConf())
    val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
    
    // Get the Parquet file version
    pf.getFooter.getFileMetaData.getCreatedBy
    
    // get the column chunk metadata of the first row group
    val columns = pf.getFooter.getBlocks.get(0).getColumns
    
    // column index of the first column
    val columnIndex = pf.readColumnIndex(columns.get(0))
    print(columnIndex)
    
    // offset index of the first column
    print(pf.readOffsetIndex(columns.get(0)))
    

The output on a column that is sorted looks like:

row-group 0:
column index for column ws_sold_time_sk:
Boudary order: ASCENDING
                      null count  min                                       max
page-0                        45  29                                        12320
page-1                         0  12320                                     19782
page-2                         0  19782                                     26385
page-3                         0  26385                                     31758
page-4                         0  31758                                     36234
page-5                         0  36234                                     40492
page-6                         0  40492                                     44417
page-7                         0  44417                                     47596
page-8                         0  47596                                     52972
page-9                         0  52972                                     58388
page-10                        0  58388                                     62482
page-11                        0  62482                                     65804
page-12                        0  65804                                     68647
page-13                        0  68647                                     71299
page-14                        0  71303                                     74231
page-15                        0  74231                                     77978
page-16                        0  77978                                     85712
page-17                        0  85712                                     86399

offset index for column ws_sold_time_sk:
                          offset   compressed size       first row index
page-0                     94906              4759                     0
page-1                     99665              4601                 20000
page-2                    104266              4549                 40000
page-3                    108815              4415                 60000
page-4                    113230              4343                 80000
page-5                    117573              4345                100000
page-6                    121918              4205                120000
page-7                    126123              3968                140000
page-8                    130091              4316                160000
page-9                    134407              4370                180000
page-10                   138777              4175                200000
page-11                   142952              4012                220000
page-12                   146964              3878                240000
page-13                   150842              3759                260000
page-14                   154601              3888                280000
page-15                   158489              4048                300000
page-16                   162537              4444                320000
page-17                   166981               200                340000
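
The statistics printed above can be used to reproduce the pruning logic by hand. The following sketch in plain Scala copies the first six pages of the output and checks which of them can contain the value 28801, the filter used in the earlier test. It only illustrates the idea: PageEntry and the variable names are hypothetical, and the real evaluation is done inside the Parquet reader.

```scala
// (page number, min, max, first row index), copied from the first six pages of the output above
case class PageEntry(page: Int, min: Int, max: Int, firstRow: Long)

val pageIndex = Seq(
  PageEntry(0, 29, 12320, 0L),
  PageEntry(1, 12320, 19782, 20000L),
  PageEntry(2, 19782, 26385, 40000L),
  PageEntry(3, 26385, 31758, 60000L),
  PageEntry(4, 31758, 36234, 80000L),
  PageEntry(5, 36234, 40492, 100000L)
)

val filterValue = 28801

// Column index step: keep only the pages whose [min, max] range can contain the filter value
val candidates = pageIndex.filter(p => p.min <= filterValue && filterValue <= p.max)

// Offset index step: rows in a page = first row index of the next page - first row index of this page
// (a page with no following entry in this excerpt is ignored)
val rowsToRead = candidates.flatMap { p =>
  pageIndex.lift(p.page + 1).map(next => next.firstRow - p.firstRow)
}.sum

println(s"candidate pages: ${candidates.map(_.page).mkString(",")}, rows to read: $rowsToRead")
```

Only page-3 has a range that includes 28801, and the offset index shows that it holds 20000 rows, which is consistent with the 20000 rows reported by the fast test.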


Bloom filters in Parquet

With the release of Parquet 1.12, there's now the capability to generate and store Bloom filters within the file footer's metadata. This addition significantly enhances query performance for specific filtering operations. Bloom filters are especially advantageous in the following scenarios:

High Cardinality Columns: They effectively address the limitations inherent in using Parquet dictionaries for columns with a vast range of unique values.

Absent Value Filtering: Bloom filters are highly efficient for queries that filter based on values likely to be missing from the table or DataFrame. This efficiency stems from the characteristic of Bloom filters where false positives (erroneously concluding that a non-existent value is present) are possible, but false negatives (failing to identify an existing value) do not occur.

For a comprehensive understanding and technical details of implementing Bloom filters in Apache Parquet, refer to the official documentation on bloom filters in Apache Parquet.
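
The "false positives are possible, false negatives are not" property can be seen with a toy Bloom filter. The sketch below is plain Scala and is not the split-block Bloom filter that Parquet uses: ToyBloomFilter, its sizing (100000 bits, 5 hash functions) and the test values are all assumptions made for illustration.

```scala
import scala.util.hashing.MurmurHash3

// Toy Bloom filter backed by a java.util.BitSet; numBits and numHashes are arbitrary choices
class ToyBloomFilter(numBits: Int, numHashes: Int) {
  private val bits = new java.util.BitSet(numBits)

  // Derive numHashes bit positions for a value by varying the hash seed
  private def positions(value: Int): Seq[Int] =
    (0 until numHashes).map { seed =>
      val h = MurmurHash3.stringHash(value.toString, seed)
      java.lang.Math.floorMod(h, numBits)
    }

  def add(value: Int): Unit = positions(value).foreach(i => bits.set(i))

  // false => the value is definitely absent; true => the value may be present
  def mightContain(value: Int): Boolean = positions(value).forall(i => bits.get(i))
}

val bf = new ToyBloomFilter(numBits = 100000, numHashes = 5)
val stored = (0 until 10000).map(_ * 7)      // the values "in the file"
stored.foreach(bf.add)

// No false negatives: every stored value is reported as possibly present
val falseNegatives = stored.count(v => !bf.mightContain(v))

// Some absent values may still be reported as present (false positives)
val absent = (0 until 10000).map(_ * 7 + 1)  // values that were never added
val falsePositives = absent.count(bf.mightContain)

println(s"false negatives: $falseNegatives, false positives: $falsePositives of ${absent.size}")
```

A reader can skip a row group whenever mightContain returns false. That is why the benefit is largest for filters on values that are absent from the data.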


Configuration

Important configurations for writing bloom filters in Parquet files are:

.option("parquet.bloom.filter.enabled","true") // write bloom filters for all columns, default is false
.option("parquet.bloom.filter.enabled#column_name", "true") // write bloom filter for the given column
.option("parquet.bloom.filter.expected.ndv#column_name", num_values) // tuning for bloom filters, ndv = number of distinct values
.option("parquet.bloom.filter.max.bytes", 1024*1024) // The maximum number of bytes for a bloom filter bitset, default 1 MB
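
To get a feeling for how the expected number of distinct values interacts with the maximum filter size, the sketch below uses the textbook sizing formula for a classic Bloom filter, m = -n ln(p) / (ln 2)^2 bits. This is only a rough estimate: Parquet uses a split-block Bloom filter whose exact sizing differs, and the 1% false-positive probability and the function name classicBloomBits are assumptions made for this example.

```scala
// Textbook sizing of a classic Bloom filter (an approximation, not Parquet's exact formula)
// n = expected number of distinct values, p = target false-positive probability
def classicBloomBits(n: Long, p: Double): Long =
  math.ceil(-n * math.log(p) / math.pow(math.log(2), 2)).toLong

val maxBytes = 1024L * 1024L   // same value as parquet.bloom.filter.max.bytes above
val p = 0.01                   // assumed target false-positive probability

for (ndv <- Seq(25000L, 1000000L, 10000000L)) {
  val bytes = (classicBloomBits(ndv, p) + 7) / 8   // round up to whole bytes
  val capped = math.min(bytes, maxBytes)
  println(s"ndv=$ndv: ~$bytes bytes estimated, $capped bytes after applying the cap")
}
```

Under these assumptions, a column with around one million distinct values already needs more than the 1 MB cap, so for very high cardinality columns it may be worth reviewing both the expected ndv and the maximum size settings.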


Write Parquet files with Bloom filters

This is an example of how to read a Parquet file without bloom filter (for example because it had been created with an older version of Spark/Parquet) and add the bloom filter, with additional tuning of the bloom filter parameters for one of the columns:

val df = spark.read.parquet("<path>/web_sales")
df.coalesce(1).write.option("parquet.bloom.filter.enabled","true").option("parquet.bloom.filter.expected.ndv#ws_sold_time_sk", 25000).parquet("<myfilepath>")


Example: Checking I/O Performance in Parquet: With and Without Bloom Filters

Understanding the impact of using bloom filters on I/O performance during Parquet file reads can be important for optimizing data processing. This example outlines the steps to compare I/O performance when reading Parquet files, both with and without the utilization of bloom filters.

The steps below write the same test data with and without bloom filters, then compare the bytes read when filtering for a value that is not in the file.

1. Prepare the test table


bin/spark-shell
val numDistinctVals=1e6.toInt
val df=sql(s"select id, int(random()*100*$numDistinctVals) randomval from range($numDistinctVals)")
val path = "./"

// Write the test DataFrame into a Parquet file with a Bloom filter
df.coalesce(1).write.mode("overwrite").option("parquet.bloom.filter.enabled","true").option("parquet.bloom.filter.enabled#randomval", "true").option("parquet.bloom.filter.expected.ndv#randomval", numDistinctVals).parquet(path + "spark320_test_bloomfilter")

// Write the same DataFrame in Parquet, but this time without Bloom filters 
df.coalesce(1).write.mode("overwrite").option("parquet.bloom.filter.enabled","false").parquet(path + "spark320_test_bloomfilter_nofilter")

// use the OS (ls -l) to compare the size of the files with bloom filter and without
// in my test (Spark 3.5.0, Parquet 1.13.1) it was 10107275 with bloom filter and 8010077 without

:quit

2. Read data using the Bloom filter, for improved performance

bin/spark-shell

val path = "./"
val df =spark.read.option("parquet.filter.bloom.enabled","true").parquet(path + "spark320_test_bloomfilter")
val q1 = df.filter("randomval=1000000") // filter for a value that is not in the file
q1.collect

// print I/O metrics
org.apache.hadoop.fs.FileSystem.printStatistics()

// Output
FileSystem org.apache.hadoop.fs.RawLocalFileSystem: 1091611 bytes read, ...

:quit

3. Read disabling the Bloom filter (this will read more data from the filesystem and have worse performance)

bin/spark-shell

val path = "./"
val df =spark.read.option("parquet.filter.bloom.enabled","false").parquet(path + "spark320_test_bloomfilter")
val q1 = df.filter("randomval=1000000") // filter for a value that is not in the file
q1.collect

// print I/O metrics
org.apache.hadoop.fs.FileSystem.printStatistics()

// Output
FileSystem org.apache.hadoop.fs.RawLocalFileSystem: 8299656 bytes read, ...


Reading Parquet Bloom Filter Metadata with Apache Parquet Java API

To extract metadata about the bloom filter from a Parquet file using the Apache Parquet Java API in spark-shell, follow these steps:

 

  1. Initialize the File Path: define the full path of your Parquet file
    bin/spark-shell
    val fullPathUri = java.net.URI.create("<my_file_path>")
  2. Create Input File: utilize HadoopInputFile to create an input file from the specified path
    val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(
             new org.apache.hadoop.fs.Path(fullPathUri), 
             spark.sessionState.newHadoopConf()
             )
    
     
  3. Open Parquet File Reader: open the Parquet file reader for the input file
    val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
    
     
  4. Retrieve Blocks and Columns: extract the blocks from the file footer and then get the columns from the first block
    val blocks = pf.getFooter.getBlocks
    val columns = blocks.get(0).getColumns
    
     
  5. Read Bloom Filter: finally, read the bloom filter from the first column
    val bloomFilter = pf.readBloomFilter(columns.get(0))
    bloomFilter.getBitsetSize
     

By following these steps, you can successfully read the bloom filter metadata from a Parquet file using the Java API in the spark-shell environment.

Discovering Parquet Version

The Parquet file format is constantly evolving, incorporating additional metadata to support emerging features. Each Parquet file embeds the version information within its metadata, reflecting the Parquet version used during its creation.

Importance of Version Awareness:

Compatibility Considerations: When working with Parquet files generated by older versions of Spark and its corresponding Parquet library, it's important to be aware that certain newer features may not be supported. For instance, column indexes, which are available in the Spark DataFrame Parquet writer from version 3.2.0, might not be present in files created with older versions.

Upgrading for Enhanced Features: Upon upgrading your Spark version, it's beneficial to also update the metadata in existing Parquet files. This update allows you to utilize the latest features introduced in newer versions of Parquet.

Checking the Parquet File Version:

The following tools can be used to check which Parquet version was used to write your files, which helps when managing and upgrading Parquet datasets.

  • Details at Tools for Parquet Diagnostics

  • parquet-cli

    • example: hadoop jar parquet-cli/target/parquet-cli-1.13.1-runtime.jar org.apache.parquet.cli.Main meta <path>/myParquetFile
  • Hadoop API ...

    • example of using Hadoop API from the spark-shell CLI
    // customize with the file path and name
    val fullPathUri = java.net.URI.create("<path>/myParquetFile")
     
    // create a Hadoop input file and open it with ParquetFileReader
    val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(fullPathUri), spark.sessionState.newHadoopConf())
    val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
    
    // Get the Parquet file version
    pf.getFooter.getFileMetaData.getCreatedBy
    
    // Info on file metadata
    print(pf.getFileMetaData)
    print(pf.getRowGroups)
    
  • Spark extension library
    The spark-extension library allows querying Parquet metadata using Apache Spark.
    Example:

    bin/spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5
    
    import uk.co.gresearch.spark.parquet._
    spark.read.parquetMetadata("...path..").show()
    spark.read.parquetBlockColumns("...path..").show()
    

Updating Parquet File Versions

Upgrading your Parquet files to a newer version can be achieved by copying them using a more recent version of Spark. This section covers the steps to convert your Parquet files to an updated version.

Conversion Method:

Using Recent Spark Versions: To update Parquet files, read them with a newer version of Spark and then save them again. This process effectively updates the files to the Parquet version used by that Spark release.
For instance, using Spark 3.5.0 will allow you to write files in Parquet version 1.13.1.

Approach Note: This method is somewhat brute-force as there isn't a direct mechanism solely for upgrading Parquet metadata.

Practical Example: Copying and converting Parquet version by reading and re-writing, applied to the TPCDS benchmark:

bin/spark-shell --master yarn --driver-memory 4g --executor-memory 50g --executor-cores 10 --num-executors 20 --conf spark.sql.shuffle.partitions=400

val inpath="/project/spark/TPCDS/tpcds_1500_parquet_1.10.1/"
val outpath="/project/spark/TPCDS/tpcds_1500_parquet_1.13.1/"
val compression_type="snappy" // may experiment with "zstd"

// we need to do this in two separate groups: partitioned and non-partitioned tables

// copy the **partitioned tables** of the TPCDS benchmark
// compact each directory into 1 file with repartition
val tables_partition=List(("catalog_returns","cr_returned_date_sk"), ("catalog_sales","cs_sold_date_sk"), ("inventory","inv_date_sk"), ("store_returns","sr_returned_date_sk"), ("store_sales","ss_sold_date_sk"), ("web_returns","wr_returned_date_sk"), ("web_sales","ws_sold_date_sk"))
for (t <- tables_partition) {
  println(s"Copying partitioned table $t")
  spark.read.parquet(inpath + t._1).repartition(col(t._2)).write.partitionBy(t._2).mode("overwrite").option("compression", compression_type).parquet(outpath + t._1)
}

// copy non-partitioned tables of the TPCDS benchmark
// compact each directory into 1 file with coalesce
val tables_nopartition=List("call_center","catalog_page","customer","customer_address","customer_demographics","date_dim","household_demographics","income_band","item","promotion","reason","ship_mode","store","time_dim","warehouse","web_page","web_site")
for (t <- tables_nopartition) {
  println(s"Copying table $t")
  spark.read.parquet(inpath + t).coalesce(1).write.mode("overwrite").option("compression", compression_type).parquet(outpath + t)
}

Conclusions

Apache Spark and Apache Parquet continue to innovate and are constantly upping their game in big data. They've rolled out cool features like column indexes and bloom filters, really pushing the envelope on speed and efficiency. It's a smart move to keep your Spark updated, especially to Spark 3.2 or newer, to get the most out of these perks. Also, don’t forget to give your Parquet files a quick refresh to the latest format: this post shows how to do it by reading and re-writing them with a recent Spark version. Staying on top of these updates is key to keeping your data game strong!

I extend my deepest gratitude to my colleagues at CERN for their invaluable guidance and support. A special acknowledgment goes out to the teams behind the CERN data analytics, monitoring, and web notebook services, as well as the dedicated members of the ATLAS database team.

Further details on the topics covered here can be found at: