TL;DR: When your database “feels slow,” metrics are key to the investigation; however, relying on averages can mislead you. Wait-latency histograms and heat-map visualizations help you understand where and how wait time is spent, so you can fix what actually matters.
I recently chased an Oracle performance issue where most reads were sub-millisecond (cache), but a thin band around ~10 ms (spindles) dominated total wait time. Classic bimodal latency: the fast band looked fine in averages, yet the rare slow band owned the delay.
To investigate the issue, and to prove the diagnosis, I refreshed two of my older tools:
OraLatencyMap (SQL*Plus script): samples Oracle’s microsecond wait-event histograms and renders two terminal heat maps with wait event latency details over time
PyLatencyMap (Python): a general latency heat-map visualizer that reads record-oriented histogram streams from Oracle scripts, BPF/bcc, SystemTap, DTrace, trace files, etc.
Both now have fresh releases with minor refactors and dependency checks.
Why heat maps for wait latency?
Latency data naturally forms histograms (bucketed counts by latency). That’s a 2D view. But performance evolves over time, adding a third dimension (latency × time × magnitude). A heat map projects this 3D story onto your terminal so you can spot patterns at a glance.
Why two heat maps?
Frequency heat map answers: Are many calls slow? (events/sec)
Intensity heat map answers: Are a few slow calls consuming most time? (time/sec)
In my case, frequency showed a bright <1 ms band (healthy) while intensity lit up at ~10 ms (spindles). Rare tail, real culprit.
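To make the distinction concrete, here is a small conceptual sketch (illustrative only, not code from OraLatencyMap or PyLatencyMap) of how a frequency column and an intensity column can be derived from two consecutive snapshots of a cumulative latency histogram; the bucket bounds and counts are made-up numbers:
import numpy as np
# Bucket upper bounds (microseconds) and cumulative wait counts from two snapshots
bucket_upper_us = np.array([1000, 2000, 4000, 8000, 16000])
prev_counts = np.array([500000, 20000, 9000, 7000, 60000])
curr_counts = np.array([510000, 20200, 9100, 7150, 61500])
interval_s = 5.0
waits = curr_counts - prev_counts            # waits completed per bucket during the interval
frequency = waits / interval_s               # events/sec per bucket -> Frequency heat map column
bucket_mid_s = 0.75 * bucket_upper_us / 1e6  # rough bucket midpoint in seconds (power-of-two buckets)
intensity = frequency * bucket_mid_s         # wait seconds per second -> Intensity heat map column
for ub, f, i in zip(bucket_upper_us, frequency, intensity):
    print(f"<= {ub:>6} us   {f:8.1f} events/s   {i:8.4f} s/s")
With these numbers the sub-millisecond bucket dominates the Frequency column (2000 events/s), while the rare 8-16 ms bucket dominates the Intensity column (3.6 seconds of wait per second): exactly the bimodal pattern described above.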
Reading the canvas
Y-axis = latency buckets (displayed in ms).
X-axis = time, newest at the right.
Top = Frequency (events/sec). Bottom = Intensity (time/sec).
Look for bands (stable tiers), streaks (bursts), and hot tails (small but expensive).
Example output from OraLatencyMap v1.3
This example shows the latency heatmap measured and displayed with OraLatencyMap for the db file sequential read event.
The system is experiencing a bimodal latency distribution, indicating two distinct latency patterns:
Reads from fast storage (SSD) with latency < 1 ms (visible in the Frequency Heatmap, blue area).
Reads from slower storage (spinning disks) with latency ≈ 10 ms (visible in the Intensity Heatmap, yellow-red areas).
Quick starts
Oracle (OraLatencyMap)
OraLatencyMap is a command-line script/widget for Oracle databases.
Prerequisites • SQL*Plus • Privileges: SELECT on GV$EVENT_HISTOGRAM_MICRO and EXECUTE on DBMS_LOCK
Example (focus on single-block reads, 5s sampling, 11 bins, 100 columns, only inst_id=1): SQL> @OraLatencyMap_advanced 5 "db file sequential read" 11 100 "and inst_id=1"
Parameter notes • <interval_s> — sampling interval in seconds • <event_name> — Oracle wait event to analyze (quoted) • <bins> — number of latency buckets • <columns> — width (time axis) of the heat map • <where_clause> — optional extra filter (e.g., RAC: and inst_id=1)
Which wait events to start with? • db file sequential read — typical for random single-block I/O; good starting point for read latency issues. • log file sync — measures commit latency; use when users report slow commits.
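OraLatencyMap does all of this sampling for you inside SQL*Plus. Purely as an illustration of the data source, here is a hedged Python sketch (not part of the tool) that samples the same GV$EVENT_HISTOGRAM_MICRO view with the python-oracledb driver; the connection details, the instance filter, and the 5-second interval are placeholders:
import time
import oracledb
conn = oracledb.connect(user="perfuser", password="change_me", dsn="dbhost/service_name")
query = """
    select wait_time_micro, wait_count
    from gv$event_histogram_micro
    where event = :event_name and inst_id = 1
    order by wait_time_micro"""
def snapshot(event_name):
    with conn.cursor() as cur:
        cur.execute(query, event_name=event_name)
        return dict(cur.fetchall())
prev = snapshot("db file sequential read")
time.sleep(5)
curr = snapshot("db file sequential read")
# Counters are cumulative since instance startup, so the per-interval histogram is the delta
for bucket_us in sorted(curr):
    print(f"<= {bucket_us} us: {curr[bucket_us] - prev.get(bucket_us, 0)} waits")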
Linux & everything else (PyLatencyMap)
PyLatencyMap is a Python, terminal-based visualizer for latency histograms, intended for performance tuning and troubleshooting. It renders two scrolling heat maps (Frequency and Intensity) so you can see how latency distributions evolve over time. It works from the command line and plays nicely with sources that output latency histograms (Oracle wait histograms, BPF/bcc, DTrace, SystemTap, trace files, etc.).
Suspect Oracle waits? Start with OraLatencyMap on db file sequential read or log file sync.
Need cross-stack visibility (OS/storage/trace)? Use PyLatencyMap and feed it histograms from your favorite tracer.
See it, prove it, and fix it
Bottom line: If you only look at averages, you may miss complex behaviors like multimodal I/O (fast I/O with a slow tail). Heat maps also show you the time evolution of the latency patterns. With two small tools and two heat maps, you can see it, prove it, and fix it.
The ATLAS Detector Control System (DCS) at CERN is essential for ensuring optimal detector performance. Each year, the system generates tens of billions of time-stamped sensor readings, presenting considerable challenges for large-scale data analysis. Although these data are stored in Oracle databases that excel in real-time transactional processing, the configuration, optimized with limited CPU resources to manage licensing costs, makes them less suited for extensive historical time-series analysis.
To overcome these challenges, a modern data pipeline has been developed that leverages Apache Spark, CERN's Hadoop service, and the Service for Web-based Analysis (SWAN) platform. This scalable, high-performance framework enables researchers to efficiently process and analyze DCS data over extended periods, unlocking valuable insights into detector operations. By integrating advanced big data technologies, the new system enhances performance monitoring, aids in troubleshooting Data Acquisition (DAQ) link failures, and supports predictive maintenance, thereby ensuring the continued reliability of the ATLAS detector systems.
Figure 1: Overview of the Big Data architecture for Detector Control System (DCS) data analysis. The system integrates data from Oracle databases (including DCS, luminosity, and run information) and file-based metadata and mappings into the Hadoop ecosystem using Parquet files. Apache Spark serves as the core processing engine, enabling scalable analysis within an interactive environment powered by Jupyter notebooks on CERN SWAN. Reproduced with permission from Advancing ATLAS DCS Data Analysis with a Modern Data Platform.
Data Storage in Oracle Databases
The ATLAS Detector Control System (DCS) data is primarily stored in Oracle databases using a commercial product, the WinCC OA system, optimized for real-time monitoring and transactional operations. Each detector’s data is managed within dedicated database schemas, ensuring structured organization and efficient access.
At the core of this storage model is the EVENTHISTORY table, a high-volume repository that records sensor IDs, timestamps, and measurement values across thousands of monitoring channels. This table grows rapidly, exceeding one billion rows annually, requiring advanced partitioning strategies to facilitate efficient data access. To improve performance, range partitioning is implemented, segmenting the table into smaller, manageable partitions based on predefined time intervals, such as monthly partitions.
Since direct querying of this vast dataset for large-scale analysis can impose a heavy load on the production Oracle systems, a read-only replica is used as the data source for many data-querying use cases and for data extraction into CERN's Hadoop-based analytics platform. This approach ensures that the primary database remains unaffected by analytical workloads, allowing detector experts to access and process historical data efficiently without impacting real-time operations.
Leveraging CERN’s Hadoop Service
To address the challenges of handling large-scale DCS data analysis, CERN’s Hadoop cluster, Analytix, provides a scalable and high-performance infrastructure tailored for parallelized computation and distributed storage. With over 1,400 physical cores and 20 PB of distributed storage, it enables efficient ingestion, processing, and querying of massive datasets.
Currently, approximately 3 TB of DCS data—representing 30% of the total available records—has been migrated into the Hadoop ecosystem, covering data from 2022 onward. Data extraction is performed via Apache Spark, leveraging the Spark JDBC connector to read from the read-only Oracle replica. Daily import jobs incrementally update the core EVENTHISTORY table, appending new records without reprocessing the entire dataset. Smaller, less dynamic tables undergo full replacements to maintain consistency.
For optimized storage and performance, all ingested data is converted to Apache Parquet, a columnar storage format designed for high-speed analytical queries. The dataset is partitioned by day, enabling partition pruning, a technique that allows queries to efficiently filter the relevant time slices and significantly reduces query execution times. Spark's parallel processing can then answer queries that target billions of individual rows in just a few seconds, making the platform an ideal solution for correlation studies, anomaly detection, and long-term trend analysis of detector performance.
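As an illustration of the partition-pruning pattern, here is a minimal PySpark sketch; the paths, the timestamp column ts, and the sensor columns are assumptions, not the production schema:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("dcs_partition_pruning").getOrCreate()
# Write DCS-style records partitioned by day
events = spark.read.parquet("/project/dcs/eventhistory")
(events
    .withColumn("day", F.to_date("ts"))
    .write.mode("overwrite")
    .partitionBy("day")
    .parquet("/project/dcs/eventhistory_by_day"))
# A query with a day filter only scans the matching partitions (partition pruning)
january = (spark.read.parquet("/project/dcs/eventhistory_by_day")
    .where("day between '2023-01-01' and '2023-01-31'"))
january.groupBy("element_name").agg(F.avg("value").alias("avg_value")).show()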
This modern data pipeline integrates seamlessly with CERN’s Jupyter notebooks service (SWAN), providing detector experts with a Python-based interactive environment for exploratory data analysis, visualization, and machine learning applications. The combination of Apache Spark, Parquet, and Hadoop enables the scalable processing of DCS data, facilitating key analyses such as monitoring DAQ link instabilities, tracking high-voltage performance, and diagnosing hardware failures in the ATLAS New Small Wheel (NSW) detector.
The Role of Apache Spark
Apache Spark plays a pivotal role in transforming how this data is accessed and analyzed. The Spark-based data pipeline extracts data from a read-only replica of the primary production database, ensuring minimal disruption to live operations. Using JDBC connectivity, Spark jobs are scheduled to run daily, incrementally updating Parquet files stored in CERN’s Hadoop cluster.
Key optimizations include:
Partitioning: Data is partitioned by day to facilitate faster querying and improved storage efficiency.
Incremental Updates: Only new data is ingested daily, preventing redundant processing.
Columnar Storage with Parquet: Apache Parquet enables efficient data retrieval, reducing query execution time and storage costs.
Extracting Data from Oracle using Apache Spark
Below is an example of how to create a Spark DataFrame that reads from an Oracle table using JDBC:
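A minimal PySpark sketch of this step is shown below; the connection URL, credentials, schema, and column names are placeholders, and it assumes the Oracle JDBC driver is available on the Spark classpath (e.g., added with --jars). Options such as fetchsize (and, for parallel reads, partitionColumn with lowerBound, upperBound, and numPartitions) are standard Spark JDBC options that can be tuned for bulk extraction:
from pyspark.sql import functions as F
jdbc_url = "jdbc:oracle:thin:@//oracle-replica.example.org:1521/DCS_SERVICE"
eventhistory = (spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "ATLAS_DCS.EVENTHISTORY")   # placeholder schema.table
    .option("user", "dcs_reader")
    .option("password", "********")
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("fetchsize", 10000)                    # larger fetch size helps bulk reads
    .load())
# Append the newly extracted rows to the day-partitioned Parquet dataset on HDFS
(eventhistory
    .withColumn("day", F.to_date("TS"))            # "TS" is a placeholder timestamp column
    .write.mode("append")
    .partitionBy("day")
    .parquet("/project/dcs/eventhistory_by_day"))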
For more details on writing data to Parquet with Spark, see this note.
Analysis Framework: A User-Friendly Approach
Apache Spark as the Core Processing Engine
The Apache Spark ecosystem allows for seamless querying and processing of vast datasets. Spark DataFrames and Spark SQL APIs offer a familiar and flexible interface for data manipulation, similar to Pandas for Python users. By enabling distributed computation, Spark ensures that billions of rows can be processed within seconds.
Benefits of Spark in the ATLAS DCS framework:
Scalability and Performance: Spark efficiently uses the available cores on each node and distributes workloads across multiple nodes.
Powerful APIs: Spark natively exposes the DataFrame API and also supports SQL; both are powerful, expressive interfaces for data manipulation that help boost performance.
Fault Tolerance: Spark has a proven architecture that provides automatic recovery and retries from many types of failures in a distributed environment.
Platform integration with Jupyter notebooks and Spark
Front-end analysis is conducted via Jupyter notebooks on CERN's SWAN platform, offering researchers an interactive and intuitive interface. Key capabilities include:
Spark integration: A dedicated component, the Spark Connector, abstracts the complexities of Spark configuration, ensuring seamless interaction with the Hadoop ecosystem.
Python environment and Dynamic Visualization: The platform harnesses the robust Python ecosystem for data processing, enabling the dynamic creation of tables, charts, and plots.
Data Integration: Seamless connectivity to diverse data sources—including Oracle databases and web services—simplifies the integration process, providing comprehensive access to all relevant data.
Figure 2: Analysis of ATLAS Detector Control System (DCS) data using Python and Apache Spark. The figure highlights specific elements of the ATLAS New Small Wheel (NSW) MicroMegas (MMG) subdetector that exhibited unstable behavior, prompting further investigation. This visualization was generated using a modern data platform that integrates Jupyter notebooks, CERN’s Hadoop service, and Spark-based analytics. The approach enables large-scale processing and efficient troubleshooting of detector performance. Reproduced with permission from Advancing ATLAS DCS Data Analysis with a Modern Data Platform.
Future Enhancements
To further optimize scalability, performance, and analytical capabilities, we are exploring several key improvements:
Kubernetes for Spark Orchestration: Moving from a Hadoop-based cluster to Kubernetes-managed Spark deployments will streamline resource allocation, optimize workload scheduling, and enable dynamic scaling during peak analysis periods. This transition also facilitates a smoother shift toward cloud-based architectures.
Cloud Storage Solutions: We are evaluating cloud-based storage options such as Amazon S3, which would further ease migration to a cloud environment and enhance data accessibility and scalability.
Advanced Data Formats: We are considering the adoption of modern data formats like Apache Iceberg and Delta Lake. These formats offer improved data ingestion workflows, better query performance and support for evolving data schemas, and enhanced data management capabilities in general.
Machine Learning and AI Integration: Leveraging GPU resources available on CERN’s SWAN platform will enable advanced machine learning techniques for predictive analytics, anomaly detection, and automated troubleshooting. This integration aims to identify detector inefficiencies and potential failures in real time, ultimately improving operational reliability and reducing downtime.
These enhancements aim to future-proof the DCS data analysis framework, ensuring it remains a highly efficient, scalable, and adaptable platform for ongoing and future ATLAS detector operations.
Conclusion
The integration of Apache Spark with CERN's Hadoop infrastructure and CERN's notebook service has significantly enhanced ATLAS DCS data processing and analysis by enabling a scalable, high-performance, and user-friendly platform. This framework empowers researchers to extract meaningful insights, enhance detector performance monitoring, and streamline troubleshooting processes, significantly improving operational efficiency. As the project continues to evolve, the adoption of cloud-based storage, Kubernetes orchestration, and AI-driven analytics will further enhance the platform's capabilities, supporting the needs of the scientific and engineering community.
Acknowledgements and Links
This work is based on the article Advancing ATLAS DCS Data Analysis with a Modern Data Platform by Luca Canali, Andrea Formica and Michelle Solis. Many thanks to our ATLAS colleagues, in particular from the ADAM (Atlas Data and Metadata) team and ATLAS DCS. Special thanks to the CERN Databases and Data Analytics group for their help and support with Oracle, Hadoop, SWAN and Spark services.
Johannes Kepler's analysis of Mars' orbit stands as one of the greatest achievements in scientific history, revealing the elliptical nature of planetary paths and establishing the foundational laws of planetary motion.
In this post, we’ll explore how you can recreate Kepler’s revolutionary findings using Python’s robust data science ecosystem. Our goal is not to produce a specialized scientific paper but to provide a clear, interactive, and visually appealing demonstration suitable for a broad audience.
Python libraries like NumPy, Pandas, SciPy, and Matplotlib, provide an efficient environment for numerical computations, data manipulation, and visualization. Jupyter Notebooks further enhance this process by providing an interactive and user-friendly platform to run code, visualize results, and document your insights clearly.
Additionally, AI-assisted coding significantly simplifies technical tasks such as ellipse fitting, data interpolation, and creating insightful visualizations. This integration allows us to focus more on understanding the insights behind Kepler’s discoveries, making complex analyses accessible and engaging.
This project showcases:
A structured approach to data analysis using a handful of short Jupyter Notebooks.
How Python’s ecosystem (NumPy, Pandas, SciPy, Matplotlib) facilitates computational research.
The benefits of AI-assisted coding in accelerating development and improving workflow efficiency.
An interactive, visually engaging reproduction of Kepler’s findings.
Jupyter Notebooks and AI-Assisted Coding: A Powerful Combination for Data Science
Jupyter Notebooks have become the standard environment for data science, offering an interactive and flexible platform for scientific computing. They can be run on local machines or on cloud services such as Google Colab, Amazon SageMaker, IBM Watson Studio, Microsoft Azure, GitHub Codespaces, Databricks, etc. CERN users can also run notebooks on the CERN-hosted Jupyter notebooks service SWAN (Service for Web-based ANalysis), widely used by engineers and physicists across CERN for large-scale scientific analysis.
How Python and AI Tools Enhance This Project
Data Interpolation & Curve Fitting: Python libraries like SciPy and AI-assisted tools help generate optimal curve fits in seconds.
Plotting & Visualization: AI-driven code completion and Matplotlib make it easier and faster to generate plots.
Error Handling & Debugging: AI suggestions help identify and fix errors quickly, improving workflow efficiency.
Exploring Alternative Approaches: AI can suggest different computational methods, allowing for a more robust and exploratory approach to the analysis.
Generate the measurements of Mars' celestial positions
Data is key to the success of this analysis. Kepler used Tycho Brahe's observations; we instead use NASA JPL's DE421 ephemeris, accessed via the Skyfield library, to generate accurate planetary positions over a period of 12 Martian years (approximately 22 Earth years), starting from January 1, 2000.
Determine the ecliptic longitude of Mars and the Sun in the plane of Earth's orbit, and filter out observations where Mars is obscured by the Sun.
Save the filtered ephemeris data into a CSV file (ephemeris_mars_sun.csv).
Key attributes in the saved data are: Date, Mars Ecliptic Longitude (deg), Sun Ecliptic Longitude (deg)
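A minimal sketch of this step follows, assuming the Skyfield library and the DE421 ephemeris file are available; the 15-degree threshold used to drop observations with Mars close to the Sun's direction is an illustrative choice, not the exact rule used in the notebooks:
import numpy as np
import pandas as pd
from skyfield.api import load
ts = load.timescale()
eph = load('de421.bsp')
sun, earth, mars = eph['sun'], eph['earth'], eph['mars']
# Daily samples covering roughly 12 Mars years (~22 Earth years) from 2000-01-01
t = ts.utc(2000, 1, range(1, 12 * 687 + 1))
_, mars_lon, _ = earth.at(t).observe(mars).ecliptic_latlon()
_, sun_lon, _ = earth.at(t).observe(sun).ecliptic_latlon()
df = pd.DataFrame({
    "Date": t.utc_strftime("%Y-%m-%d"),
    "Mars Ecliptic Longitude (deg)": mars_lon.degrees,
    "Sun Ecliptic Longitude (deg)": sun_lon.degrees,
})
# Drop observations where Mars is within ~15 degrees of the Sun's direction (obscured)
sep = np.abs((df["Mars Ecliptic Longitude (deg)"] - df["Sun Ecliptic Longitude (deg)"] + 180) % 360 - 180)
df[sep > 15].to_csv("ephemeris_mars_sun.csv", index=False)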
Understand how Earth-based observations reveal Mars’ trajectory
Mars completes one full revolution around the Sun in 687 days (one Mars year). During this period, Earth occupies a different position in its orbit at each observation. By selecting measurements taken exactly one Mars year apart, we capture Mars' apparent position from varied vantage points. With enough observations over several Mars years, these multiple perspectives enable us to triangulate the position of Mars.
Figure 1, Triangulating Mars' Position:
Select observations spaced 687 days apart (one Mars year) so that Mars is observed at nearly the same position relative to the Sun for each measurement.
For each observation, compute Earth's position in the ecliptic and derive Mars' line-of-sight vectors.
Apply least-squares estimation to solve for Mars' ecliptic coordinates.
Calculate Mars orbit by triangulating Mars' position using all available observations.
Load the dataset (line_of_sight_mars_from_earth.csv) with Mars and Sun observations, notably the following fields: Date, Mars Ecliptic Longitude (deg), and Sun Ecliptic Longitude (deg). Compute Mars' heliocentric coordinates and estimate its orbit.
Generalized Triangulation
For each start date within the first Mars year, iterate through subsequent measurements at 687-day intervals (one Mars year), so that Mars is observed at nearly the same position relative to the Sun for each measurement.
Triangulate Mars' position from the accumulated data when at least two valid measurements are available.
Gracefully handle missing data and singular matrices to ensure robust estimation.
Compile the computed Mars positions into a results DataFrame and save the results to a CSV file (computed_Mars_orbit.csv) for further analysis.
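The core of the triangulation step can be sketched as a small least-squares problem in the ecliptic plane (an illustrative implementation under the assumptions above, with Earth positions and line-of-sight longitudes already computed; the function and variable names are mine, not the notebook's):
import numpy as np
def triangulate_mars(earth_xy, los_longitude_deg):
    # earth_xy: (N, 2) Earth positions in the ecliptic plane (AU)
    # los_longitude_deg: (N,) observed ecliptic longitude of Mars as seen from Earth (deg)
    # Returns the point minimizing the summed squared distance to all lines of sight.
    A = np.zeros((2, 2))
    b = np.zeros(2)
    for e, ang in zip(np.asarray(earth_xy), np.radians(los_longitude_deg)):
        d = np.array([np.cos(ang), np.sin(ang)])   # unit vector along the line of sight
        P = np.eye(2) - np.outer(d, d)             # projector orthogonal to the line of sight
        A += P
        b += P @ e
    # np.linalg.solve raises LinAlgError when A is singular (e.g., nearly parallel lines);
    # those cases are the "singular matrices" handled gracefully in the notebook.
    return np.linalg.solve(A, b)                   # least-squares Mars position (x, y) in AU
# Example with two synthetic observations (at least two are needed)
earth_positions = [(0.98, 0.10), (-0.45, 0.89)]
mars_longitudes = [40.0, 95.0]
print(triangulate_mars(earth_positions, mars_longitudes))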
Figure 2: Demonstration of Kepler's First Law: fitting an elliptical model confirms that Mars' orbit is an ellipse with the Sun at one focus. The fitted parameters match accepted values, notably eccentricity e ≈ 0.09 and semi-major axis a ≈ 1.52 AU.
Second Law: Demonstrate that Mars sweeps out equal areas in equal time intervals using the measured values of Mars' orbit.
Third Law: Validate the harmonic law by comparing the ratio T^2/a^3 for Mars and Earth.
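As a quick sanity check of the third law using the values quoted above (a worked arithmetic example, not output copied from the notebooks):
T_mars = 687 / 365.25            # Mars sidereal period in Earth years (~1.88)
a_mars = 1.52                    # fitted semi-major axis in AU (from the ellipse fit above)
print(T_mars**2 / a_mars**3)     # ~1.01, essentially matching Earth's T^2/a^3 = 1 yr^2/AU^3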
Use Mars' ephemeris and line-of-sight data to determine Earth’s orbit
Earth Position Computation: for each selected observation, compute Earth's heliocentric position by solving for the Earth-Sun distance using the observed Sun and Mars ecliptic longitudes and the estimated Mars position (found in notebook 3 of this series, "Compute Mars Orbit").
Utilize a numerical solver (via fsolve) to ensure that the computed Earth position yields the correct LOS angle towards Mars.
Fit Earth's computed positions to an elliptical model and compare the results with accepted astronomical values.
Visualize Earth's orbit alongside the positions of Mars and the Sun.
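A hedged sketch of the solver step (in the ecliptic plane, with names of my own choosing; the Mars position comes from the previous notebook and the angles are the observed ecliptic longitudes):
import numpy as np
from scipy.optimize import fsolve
def earth_position(sun_lon_deg, mars_lon_deg, mars_xy):
    # Earth lies in the direction opposite the Sun's apparent ecliptic longitude.
    # Solve for the Earth-Sun distance r so that the line of sight from Earth to the
    # known Mars position matches the observed Mars ecliptic longitude.
    lam_sun = np.radians(sun_lon_deg)
    lam_mars = np.radians(mars_lon_deg)
    def residual(r):
        e = -float(r[0]) * np.array([np.cos(lam_sun), np.sin(lam_sun)])
        los = np.arctan2(mars_xy[1] - e[1], mars_xy[0] - e[0])
        return (los - lam_mars + np.pi) % (2 * np.pi) - np.pi   # wrapped angle difference
    r = fsolve(residual, x0=1.0)[0]                             # start the search from 1 AU
    return -r * np.array([np.cos(lam_sun), np.sin(lam_sun)])
# Example with synthetic inputs: Mars position in AU, longitudes in degrees
print(earth_position(sun_lon_deg=100.0, mars_lon_deg=56.0, mars_xy=np.array([1.3, 0.7])))
# returns roughly (0.17, -0.97), i.e. an Earth-Sun distance close to 1 AU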
Conclusion
Kepler’s groundbreaking work reshaped our understanding of planetary motion, and today, we can revisit his analysis with modern computational tools. By combining Jupyter Notebooks, Python’s scientific libraries, and AI-assisted coding, we demonstrate how complex data analysis can be performed efficiently and interactively.
This project serves as an example of how AI and open-source tools empower researchers, educators, and enthusiasts to explore scientific discoveries with greater ease and depth.
👉 Check out the full project and try the notebooks yourself! GitHub Repository
References
This work is directly inspired by Terence Tao's project Climbing the Cosmic Distance Ladder. In particular see the two-part video series with Grant Sanderson (3Blue1Brown): Part 1 and Part 2
Further details on Kepler's analysis can be found in Tao's draft book chapter Chapter 4: Fourth Rung - The Planets: Download here
Mars-Orbit-Workshop contains material to conduct a workshop recreating Kepler's analysis.
The original work of Kepler was published in Astronomia Nova (New Astronomy) in 1609. The book is available on archive.org. See for example this link to chapter 42 of Astronomia Nova
Figure 3: An illustration from Chapter 42 of Astronomia Nova (1609) by Kepler, depicting the key concept of triangulating Mars' position using observations taken 687 days apart (one Martian year). This is the original version of Figures 1 and 2 in this post.
Acknowledgements
This work has been conducted in the context of the Databases and Analytics activities at CERN. In particular, I'd like to thank my colleagues in the SWAN (Service for Web-based ANalysis) team.
Apache Spark is renowned for its speed and efficiency in handling large-scale data processing. However, optimizing Spark to achieve maximum performance requires a precise understanding of its inner workings. This blog post will guide you through establishing a Spark Performance Lab with essential tools and techniques aimed at enhancing Spark performance through detailed metrics analysis.
Why a Spark Performance Lab
The purpose of a Spark Performance Lab isn't just to measure the elapsed time of your Spark jobs but to understand the underlying performance metrics deeply. By using these metrics, you can create models that explain what's happening within Spark's execution and identify areas for improvement. Here are some key reasons to set up a Spark Performance Lab:
Hands-on learning and testing: A controlled lab setting allows for safer experimentation with Spark configurations and tuning, and for getting familiar with the monitoring tools and Spark-generated metrics.
Load and scale: Our lab uses a workload generator, running TPCDS queries. This is a well-known set of complex queries that is representative of OLAP workloads, and that can easily be scaled up for testing from GBs to 100s of TBs.
Improving your toolkit: Having a toolbox is invaluable; however, you need to practice with the tools and understand their output in a sandbox environment before moving to production.
Get value from the Spark metric system: Instead of focusing solely on how long a job takes, use detailed metrics to understand the performance and spot inefficiencies.
Tools and Components
In our Spark Performance Lab, several key tools and components form the backbone of our testing and monitoring environment:
Workload generator:
We use a custom tool, TPCDS-PySpark, to run a consistent set of queries (the TPCDS benchmark), providing a reliable and repeatable testing workload.
Spark instrumentation:
Spark's built-in Web UI for initial metrics and job visualization.
Custom tools:
SparkMeasure: Use this for detailed performance metrics collection.
Spark-Dashboard: Use this to monitor Spark jobs and visualize key performance metrics.
Additional tools for performance measurement and OS-level monitoring can complement this setup.
Quick demos and tutorials show you how to use the tools in this Spark Performance Lab. You can follow along and reproduce the same results on your own, which will help you start learning and exploring.
Figure 1: The graph illustrates the dynamic task allocation in a Spark application during a TPCDS 10TB benchmark on a YARN cluster with 256 cores. It showcases the variability in the number of active tasks over time, highlighting instances of execution "long tails" and straggler tasks, as seen in the periodic spikes and troughs.
How to Make the Best of Spark Metrics System
Understanding and utilizing Spark's metrics system is crucial for optimization:
Importance of Metrics: Metrics provide insights beyond simple timing, revealing details about task execution, resource utilization, and bottlenecks.
Execution Time is Not Enough: Measuring the execution time of a job (how long it took to run it), is useful, but it doesn’t show the whole picture. Say the job ran in 10 seconds. It's crucial to understand why it took 10 seconds instead of 100 seconds or just 1 second. What was slowing things down? Was it the CPU, data input/output, or something else, like data shuffling? This helps us identify the root causes of performance issues.
Key Metrics to Collect:
Executor Run Time: Total time executors spend processing tasks.
Executor CPU Time: Direct CPU time consumed by tasks.
JVM GC Time: Time spent in garbage collection, affecting performance.
Shuffle and I/O Metrics: Critical for understanding data movement and disk interactions.
Memory Metrics: Key for performance and troubleshooting Out Of Memory errors.
Metrics analysis, what to look for:
Look for bottlenecks: is a particular resource the bottleneck? Are the jobs running mostly on CPU, waiting for I/O, or spending a lot of time on garbage collection?
USE method: the Utilization, Saturation, and Errors (USE) method is a methodology for analyzing the performance of any system.
The tools described here can help you to measure and understand Utilization and Saturation.
Can your job use a significant fraction of the available CPU cores?
Examine the measurement of the actual number of active tasks vs. time.
Figure 1 shows the number of active tasks measured while running TPCDS 10TB on a YARN cluster, with 256 cores allocated. The graph shows spikes and troughs.
Understand the root causes of the troughs using metrics and monitoring data. The reasons can be many: resource allocation, partition skew, straggler tasks, stage boundaries, etc.
Which tool should I use?
Start with using the Spark Web UI
Instrument your jobs with sparkMeasure. This is recommended early in application development, for testing, and for Continuous Integration (CI) pipelines.
Observe your Spark application execution profile with Spark-Dashboard.
Use the available tools for OS metrics too. See also the Spark-Dashboard extended instrumentation: it collects and visualizes OS metrics (from cgroup statistics), such as network statistics.
For those interested in delving deeper into Spark instrumentation and metrics, the Spark documentation offers a comprehensive guide.
SparkMeasure: This tool captures metrics directly from Spark’s instrumentation via the Listener Bus. For a detailed understanding of how it operates, refer to the SparkMeasure architecture. It specifically gathers data from Spark's Task Metrics System, which you can explore further here.
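For example, a minimal PySpark snippet instrumenting a query with sparkMeasure might look like the following (assuming the spark-measure package has been added to the session, e.g. with --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24, and the Python wrapper installed with pip install sparkmeasure):
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()
stagemetrics.end()
# Aggregated task metrics: executor run time, CPU time, JVM GC time, shuffle and I/O metrics, etc.
stagemetrics.print_report()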
Figure 2: This technical drawing outlines the integrated monitoring pipeline for Apache Spark implemented by Spark-Dashboard using open-source components. The flow of the diagram illustrates the Spark metrics source and the components used to store and visualize the metrics.
Lessons Learned and Conclusions
From setting up and running a Spark Performance Lab, here are some key takeaways:
Collect, analyze and visualize metrics: Go beyond just measuring jobs' execution times to troubleshoot and fine-tune Spark performance effectively.
Use the Right Tools: Familiarize yourself with tools for performance measurement and monitoring.
Start Small, Scale Up: Begin with smaller datasets and configurations, then gradually scale to test larger, more complex scenarios.
Tuning is an Iterative Process: Experiment with different configurations, parallelism levels, and data partitioning strategies to find the best setup for your workload.
Establishing a Spark Performance Lab is a fundamental step for any data engineer aiming to master Spark's performance aspects. By integrating tools like Web UI, TPCDS_PySpark, sparkMeasure, and Spark-Dashboard, developers and data engineers can gain unprecedented insights into Spark operations and optimizations.
Explore this lab setup to turn theory into expertise in managing and optimizing Apache Spark. Learn by doing and experimentation!
Acknowledgements: A special acknowledgment goes out to the teams behind the CERN data analytics, monitoring, and web notebook services, as well as the dedicated members of the ATLAS database group.
Resources
To get started with the tools mentioned in this blog:
In the ever-evolving landscape of big data, Apache Spark and Apache Parquet continue to introduce game-changing features. Their latest updates have brought significant enhancements, including column indexes and bloom filters. This blog post delves into these new features, exploring their applications and benefits. This post is based on the extended notes at:
This is not an introductory article, but here is a quick recap of why you may want to spend time learning more about Apache Parquet and Spark. Parquet is a columnar storage file format optimized for use with data processing frameworks like Apache Spark. It offers efficient data compression and encoding schemes.
Parquet is a columnar format enabling efficient data storage and retrieval
It supports compression and encoding
Optimizations in Spark for Parquet include:
Vectorized Parquet reader
Filter push down capabilities
Enhanced support for partitioning and handling large files
Other key aspects of Parquet with Spark that are important for what follows:
Row Group Organization: Parquet files consist of one or more 'row groups,' typically sized around 128 MB, although this is adjustable.
Parallel Processing Capabilities: Both Spark and other engines can process Parquet data in parallel, leveraging the row group level or the file level for enhanced efficiency.
Row Group Statistics: Each row group holds vital statistics like minimum and maximum values, counts, and the number of nulls. These stats enable the 'skip data' feature when filters are applied, essentially serving as a zone map to optimize query performance.
ORC: For a comparison of Apache Parquet with another popular data format, Apache ORC, refer to Parquet-ORC Comparison.
Understanding Column Indexes and Bloom Filters in Parquet
Column Indexes: Enhancing Query Efficiency
Column indexes, introduced in Parquet 1.11 and utilized in Spark 3.2.0 and higher, offer a fine-grained approach to data filtering. These indexes store min and max values at the Parquet page level, allowing Spark to efficiently execute filter predicates at a much finer granularity than the default 128 MB row group size. Particularly effective for sorted data, column indexes can drastically reduce the data read from disk, improving query performance.
Bloom Filters: A Leap in Filter Operations
Parquet 1.12 (utilized by Spark 3.2.0 and higher) introduced Bloom filters, a probabilistic data structure that efficiently determines whether an element is in a set. They are particularly useful for columns with high cardinality and scenarios where filter operations are based on values likely to be absent from the dataset. Using bloom filters can lead to significant improvements in read performance.
Example: Spark using Parquet column indexes
Test dataset and preparation
The Parquet test file used below, parquet112file_sorted, is extracted from the TPCDS benchmark table web_sales:
the table (parquet file) contains data sorted on the column ws_sold_time_sk
it's important that the data is sorted: this groups together values in the filter column "ws_sold_time_sk"; if the values are scattered, the column index min-max statistics will cover a wide range and will not help with skipping data
the sorted dataset has been created using spark.read.parquet(path + "web_sales_piece.parquet").sort("ws_sold_time_sk").coalesce(1).write.parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")
Download the test data:
Retrieve the test data using wget, a web browser, or any method of your choosing
Fast (reads only 20k rows): Spark reads the Parquet file using a filter and makes use of the column and offset indexes:
bin/spark-shell
val path = "./"
val df = spark.read.parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")
// Read the file using a filter, this will use column and offset indexes
val q1 = df.filter("ws_sold_time_sk=28801")
val plan = q1.queryExecution.executedPlan
q1.collect
// Use Spark metrics to see how many rows were processed
// This is also available for the WebUI in graphical form
val metrics = plan.collectLeaves().head.metrics
metrics("numOutputRows").value
res: Long = 20000
The result shows that only 20000 rows were processed, which corresponds to reading just a few pages, as opposed to reading and processing the entire file. This is made possible by the min-max value statistics in the column index for the column ws_sold_time_sk. Column indexes are created by default in Spark version 3.2.x and higher.
Slow (reads 2M rows): Same as above, but this time we disable the use of column indexes. Note this is also what happens if you use Spark versions prior to Spark 3.2.0 (notably Spark 2.x) to read the file.
bin/spark-shell
val path = "./"
// disable the use of column indexes for testing purposes
val df = spark.read.option("parquet.filter.columnindex.enabled","false").parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")
val q1 = df.filter("ws_sold_time_sk=28801")
val plan = q1.queryExecution.executedPlan
q1.collect
// Use Spark metrics to see how many rows were processed
val metrics = plan.collectLeaves().head.metrics
metrics("numOutputRows").value
res: Long = 2088626
The result is that all the rows in the row group (2088626 rows in this example) were read, as Spark could not push the filter down to the Parquet page level. This example runs more slowly than the example above and in general performs more work (it uses more CPU cycles and reads more data from the filesystem).
Diagnostics and Internals of Column and Offset Indexes
Column indexes in Parquet are key structures designed to optimize filter performance during data reads. They are particularly effective for managing and querying large datasets.
Key Aspects of Column Indexes:
Purpose and Functionality: Column indexes offer statistical data (minimum and maximum values) at the page level, facilitating efficient filter evaluation and optimization.
Default Activation: By default, column indexes are enabled to ensure optimal query performance.
Granularity Insights: While column indexes provide page-level statistics, similar statistics are also available at the row group level. Typically, a row group is approximately 128MB, contrasting with pages usually around 1MB.
Customization Options: Both row group and page sizes are configurable, offering flexibility to tailor data organization; a minimal write-time example follows this list. For further details, see Parquet Configuration Options.
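A hedged PySpark sketch of setting these sizes at write time, assuming df is a Spark DataFrame (parquet.block.size and parquet.page.size are standard Parquet-Hadoop settings passed through as writer options, the same mechanism used for the bloom filter options later in this post; the values and path are just examples):
# Smaller row groups and pages give finer-grained statistics (and finer skipping)
# at the cost of some metadata and compression overhead.
(df.write
   .option("parquet.block.size", 64 * 1024 * 1024)   # row group size: 64 MB (default is about 128 MB)
   .option("parquet.page.size", 512 * 1024)          # page size: 512 KB (default is about 1 MB)
   .mode("overwrite")
   .parquet("/tmp/web_sales_custom_rowgroups"))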
Complementary Role of Offset Indexes:
Association with Column Indexes: Offset indexes work in tandem with column indexes and are stored in the file's footer in Parquet versions 1.11 and above.
Scanning Efficiency: A key benefit of these indexes is their role in data scanning. When filters are not applied in Parquet file scans, the footers with column index data can be efficiently skipped, enhancing the scanning process.
Additional Resources:
For an in-depth explanation of column and offset indexes in Parquet, consider reading this detailed description.
The integration of column and offset indexes significantly improves Parquet's capability in efficiently handling large-scale data, especially in scenarios involving filtered reads. Proper understanding and utilization of these indexes can lead to marked performance improvements in data processing workflows.
Tools to drill down on column index metadata in Parquet files
parquet-cli
example: hadoop jar target/parquet-cli-1.13.1-runtime.jar org.apache.parquet.cli.Main column-index -c ws_sold_time_sk <path>/my_parquetfile
Alternatively, the same metadata can be inspected with the Parquet Java API from spark-shell:
// customize with the file path and name
val fullPathUri = java.net.URI.create("<path>/myParquetFile")
// create a Hadoop input file and open it with ParquetFileReader
val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(fullPathUri), spark.sessionState.newHadoopConf())
val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
// Get the Parquet file version
pf.getFooter.getFileMetaData.getCreatedBy
// Get the columns of the first row group, needed to read the indexes
val blocks = pf.getFooter.getBlocks
val columns = blocks.get(0).getColumns
// column index
val columnIndex = pf.readColumnIndex(columns.get(0))
print(columnIndex)
// offset index
val offsetIndex = pf.readOffsetIndex(columns.get(0))
print(offsetIndex)
With the release of Parquet 1.12, there's now the capability to generate and store Bloom filters within the file footer's metadata. This addition significantly enhances query performance for specific filtering operations. Bloom filters are especially advantageous in the following scenarios:
High Cardinality Columns: They effectively address the limitations inherent in using Parquet dictionaries for columns with a vast range of unique values.
Absent Value Filtering: Bloom filters are highly efficient for queries that filter based on values likely to be missing from the table or DataFrame. This efficiency stems from the characteristic of Bloom filters where false positives (erroneously concluding that a non-existent value is present) are possible, but false negatives (failing to identify an existing value) do not occur.
Important configurations for writing bloom filters in Parquet files are:
.option("parquet.bloom.filter.enabled","true") // write bloom filters for all columns, default is false
.option("parquet.bloom.filter.enabled#column_name", "true") // write bloom filter for the given column
.option("parquet.bloom.filter.expected.ndv#column_name", num_values) // tuning for bloom filters, ndv = number of distinct values
.option("parquet.bloom.filter.max.bytes", 1024*1024) // The maximum number of bytes for a bloom filter bitset, default 1 MB
Write Parquet files with Bloom filters
This is an example of how to read a Parquet file without bloom filter (for example because it had been created with an older version of Spark/Parquet) and add the bloom filter, with additional tuning of the bloom filter parameters for one of the columns:
val df = spark.read.parquet("<path>/web_sales")
df.coalesce(1).write.option("parquet.bloom.filter.enabled","true").option("parquet.bloom.filter.expected.ndv#ws_sold_time_sk", 25000).parquet("<myfilepath>")
Example: Checking I/O Performance in Parquet: With and Without Bloom Filters
Understanding the impact of bloom filters on I/O performance during Parquet file reads can be important for optimizing data processing. This example outlines the steps to compare read performance with and without bloom filters, and shows how Parquet bloom filters can improve Spark read performance.
1. Prepare the test table
bin/spark-shell
val numDistinctVals=1e6.toInt
val df=sql(s"select id, int(random()*100*$numDistinctVals) randomval from range($numDistinctVals)")
val path = "./"
// Write the test DataFrame into a Parquet file with a Bloom filter
df.coalesce(1).write.mode("overwrite").option("parquet.bloom.filter.enabled","true").option("parquet.bloom.filter.enabled#randomval", "true").option("parquet.bloom.filter.expected.ndv#randomval", numDistinctVals).parquet(path + "spark320_test_bloomfilter")
// Write the same DataFrame in Parquet, but this time without Bloom filters
df.coalesce(1).write.mode("overwrite").option("parquet.bloom.filter.enabled","false").parquet(path + "spark320_test_bloomfilter_nofilter")
// use the OS (ls -l) to compare the size of the files with bloom filter and without
// in my test (Spark 3.5.0, Parquet 1.13.1) it was 10107275 with bloom filter and 8010077 without
:quit
2. Read data using the Bloom filter, for improved performance
bin/spark-shell
val path = "./"
val df =spark.read.option("parquet.filter.bloom.enabled","true").parquet(path + "spark320_test_bloomfilter")
val q1 = df.filter("randomval=1000000") // filter for a value that is not in the file
q1.collect
// print I/O metrics
org.apache.hadoop.fs.FileSystem.printStatistics()
// Output
FileSystem org.apache.hadoop.fs.RawLocalFileSystem: 1091611 bytes read, ...
:quit
3. Read disabling the Bloom filter (this will read more data from the filesystem and have worse performance)
bin/spark-shell
val path = "./"
val df =spark.read.option("parquet.filter.bloom.enabled","false").parquet(path + "spark320_test_bloomfilter")
val q1 = df.filter("randomval=1000000") // filter for a value that is not in the file
q1.collect
// print I/O metrics
org.apache.hadoop.fs.FileSystem.printStatistics()
// Output
FileSystem org.apache.hadoop.fs.RawLocalFileSystem: 8299656 bytes read, ...
Reading Parquet Bloom Filter Metadata with Apache Parquet Java API
To extract metadata about the bloom filter from a Parquet file using the Apache Parquet Java API in spark-shell, follow these steps:
Initialize the File Path: define the full path of your Parquet file
val fullPathUri = java.net.URI.create("<path>/myParquetFile")
Create Input File: utilize HadoopInputFile to create an input file from the specified path
val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(
new org.apache.hadoop.fs.Path(fullPathUri),
spark.sessionState.newHadoopConf()
)
Open Parquet File Reader: open the Parquet file reader for the input file
val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
Retrieve Blocks and Columns: extract the blocks from the file footer and then get the columns from the first block
val blocks = pf.getFooter.getBlocks
val columns = blocks.get(0).getColumns
Read Bloom Filter: finally, read the bloom filter from the first column
val bloomFilter = pf.readBloomFilter(columns.get(0))
bloomFilter.getBitsetSize
By following these steps, you can successfully read the bloom filter metadata from a Parquet file using the Java API in the spark-shell environment.
Discovering Parquet Version
The Parquet file format is constantly evolving, incorporating additional metadata to support emerging features. Each Parquet file embeds the version information within its metadata, reflecting the Parquet version used during its creation.
Importance of Version Awareness:
Compatibility Considerations: When working with Parquet files generated by older versions of Spark and its corresponding Parquet library, it's important to be aware that certain newer features may not be supported. For instance, column indexes, which are available in the Spark DataFrame Parquet writer from version 3.2.0, might not be present in files created with older versions.
Upgrading for Enhanced Features: Upon upgrading your Spark version, it's beneficial to also update the metadata in existing Parquet files. This update allows you to utilize the latest features introduced in newer versions of Parquet.
Checking the Parquet File Version:
The following sections show how to check the Parquet version used in your files, so that you can manage and upgrade your Parquet datasets effectively.
example: hadoop jar parquet-cli/target/parquet-cli-1.13.1-runtime.jar org.apache.parquet.cli.Main meta <path>/myParquetFile
Hadoop API ...
example of using Hadoop API from the spark-shell CLI
// customize with the file path and name
val fullPathUri = java.net.URI.create("<path>/myParquetFile")
// create a Hadoop input file and open it with ParquetFileReader
val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(fullPathUri), spark.sessionState.newHadoopConf())
val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
// Get the Parquet file version
pf.getFooter.getFileMetaData.getCreatedBy
// Info on file metadata
print(pf.getFileMetaData)
print(pf.getRowGroups)
Spark extension library: the spark-extension library allows querying Parquet metadata using Apache Spark. Example:
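A hedged PySpark sketch, assuming the spark-extension package is on the classpath (e.g. started with --packages uk.co.gresearch.spark:spark-extension_2.12:<version>) and using the method names documented by that project; check the spark-extension documentation for the exact API matching your Spark release:
# Import the Python helpers provided by spark-extension (adds Parquet metadata readers)
from gresearch.spark.parquet import *
# One row per Parquet file: writer version (created_by), schema, blocks, sizes, row counts, ...
spark.read.parquet_metadata("<path>/myParquetFile").show()
# One row per row group (block), with block-level details
spark.read.parquet_blocks("<path>/myParquetFile").show()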
Upgrading your Parquet files to a newer version can be achieved by copying them using a more recent version of Spark. This section covers the steps to convert your Parquet files to an updated version.
Conversion Method:
Using Recent Spark Versions: To update Parquet files, read them with a newer version of Spark and then save them again. This process effectively updates the files to the Parquet version used by that Spark release. For instance, using Spark 3.5.0 will allow you to write files in Parquet version 1.13.1.
Approach Note: This method is somewhat brute-force as there isn't a direct mechanism solely for upgrading Parquet metadata.
Practical Example: Copying and converting Parquet version by reading and re-writing, applied to the TPCDS benchmark:
bin/spark-shell --master yarn --driver-memory 4g --executor-memory 50g --executor-cores 10 --num-executors 20 --conf spark.sql.shuffle.partitions=400
val inpath="/project/spark/TPCDS/tpcds_1500_parquet_1.10.1/"
val outpath="/project/spark/TPCDS/tpcds_1500_parquet_1.13.1/"
val compression_type="snappy" // may experiment with "zstd"
// we need to do this in two separate groups: partitioned and non-partitioned tables
// copy the **partitioned tables** of the TPCDS benchmark
// compact each directory into 1 file with repartition
val tables_partition=List(("catalog_returns","cr_returned_date_sk"), ("catalog_sales","cs_sold_date_sk"), ("inventory","inv_date_sk"), ("store_returns","sr_returned_date_sk"), ("store_sales","ss_sold_date_sk"), ("web_returns","wr_returned_date_sk"), ("web_sales","ws_sold_date_sk"))
for (t <- tables_partition) {
println(s"Copying partitioned table $t")
spark.read.parquet(inpath + t._1).repartition(col(t._2)).write.partitionBy(t._2).mode("overwrite").option("compression", compression_type).parquet(outpath + t._1)
}
// copy non-partitioned tables of the TPCDS benchmark
// compact each directory into 1 file with repartition
val tables_nopartition=List("call_center","catalog_page","customer","customer_address","customer_demographics","date_dim","household_demographics","income_band","item","promotion","reason","ship_mode","store","time_dim","warehouse","web_page","web_site")
for (t <- tables_nopartition) {
println(s"Copying table $t")
spark.read.parquet(inpath + t).coalesce(1).write.mode("overwrite").option("compression", compression_type).parquet(outpath + t)
}
Conclusions
Apache Spark and Apache Parquet continue to innovate and are constantly upping their game in big data. They've rolled out cool features like column indexes and bloom filters, really pushing the envelope on speed and efficiency. It's a smart move to keep your Spark updated, especially to Spark 3.x or newer, to get the most out of these perks. Also, don’t forget to give your Parquet files a quick refresh to the latest format – the blog post has got you covered with a how-to. Staying on top of these updates is key to keeping your data game strong!
I extend my deepest gratitude to my colleagues at CERN for their invaluable guidance and support. A special acknowledgment goes out to the teams behind the CERN data analytics, monitoring, and web notebook services, as well as the dedicated members of the ATLAS database team.
Further details on the topics covered here can be found at: