Wednesday, October 1, 2025

Why I’m Loving Spark 4’s Python Data Source (and How It Makes ROOT Easy)

TL;DR: Apache Spark 4 lets you build first-class data sources in pure Python. If your reader yields Arrow RecordBatch objects, Spark ingests them with reduced Python↔JVM serialization overhead. I used this to ship a ROOT data format reader for PySpark.

A PySpark reader for the ROOT format

ROOT is the de facto data format across High-Energy Physics; CERN experiments alone store more than one exabyte of data in ROOT files. With Spark 4's Python API, building a ROOT reader becomes a focused Python exercise: parse with Uproot/Awkward, produce a PyArrow Table, yield RecordBatch objects, and let Spark do the rest.
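To make that pipeline concrete before bringing Spark into the picture, here is a minimal sketch of the ROOT → Arrow step on its own. It assumes the example file and tree name used in the quick start below; the branch list and entry count are just illustrative.

import uproot
import awkward as ak

# Read a slice of a few branches from the TTree into an Awkward array.
with uproot.open("Run2012BC_DoubleMuParked_Muons.root") as f:
    events = f["Events"].arrays(
        ["nMuon", "Muon_pt", "Muon_eta"],
        entry_stop=100_000,
        library="ak",
    )

# Awkward hands the (possibly jagged) columns to Arrow directly.
table = ak.to_arrow_table(events)
print(table.schema)
print(sum(b.num_rows for b in table.to_batches()))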


Yield Arrow batches, skip the row-by-row serialization tax

Spark 4 introduces the Python Data Source API, which makes it simple to perform data ingestion into Spark via Python libraries. However, the vanilla version has to yield Python tuples/Rows, incurring per-row Python/JVM hops. With the direct Arrow batch support introduced in SPARK-48493, your DataSourceReader.read() can yield pyarrow.RecordBatch objects. Spark ingests them in batch using Arrow, avoiding the row-by-row overhead and considerably reducing the time spent crossing the Python↔JVM boundary.
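The difference is easiest to see side by side. The toy sketch below (column names and data are illustrative, not the package's code) contrasts the two styles of read():

import pyarrow as pa

# Toy columns standing in for data parsed from a ROOT file.
pts  = [25.3, 41.7, 12.9]
etas = [0.12, -1.3, 2.1]

# Row-based reader: each tuple is serialized and crosses the
# Python↔JVM boundary one row at a time.
def read_rows(partition):
    for pt, eta in zip(pts, etas):
        yield (pt, eta)

# Arrow-based reader (SPARK-48493): a whole columnar batch crosses
# the boundary in one go, with no per-row conversion.
def read_batches(partition):
    yield pa.RecordBatch.from_pydict({"Muon_pt": pts, "Muon_eta": etas})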


ROOT → Arrow → Spark in a few moving parts

Here’s the essence of the ROOT data format reader I implemented and packaged in pyspark-root-datasource (a writer is not needed at the moment and is left for future work):

  1. Subclass the DataSource and DataSourceReader from pyspark.sql.datasource.

  2. In read(partition), produce pyarrow.RecordBatch objects.

  3. Register your source with spark.dataSource.register(...), then spark.read.format("root")....

Minimal, schematic code (pared down for clarity):

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition


class RootDataSource(DataSource):
    @classmethod
    def name(cls):
        return "root"

    def schema(self):
        # Return a Spark schema (string or StructType) or infer in the reader.
        return "nMuon int, Muon_pt array<float>, Muon_eta array<float>"

    def reader(self, schema):
        return RootReader(schema, self.options)


class RootReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.path = options.get("path")
        self.tree = options.get("tree", "Events")
        self.step_size = int(options.get("step_size", 1_000_000))

    def partitions(self):
        # Example: split work into N fixed-size partitions.
        N = 8
        return [InputPartition(i) for i in range(N)]

    def read(self, partition):
        import uproot, awkward as ak, pyarrow as pa

        # InputPartition is a dataclass; its payload is available as .value.
        start = partition.value * self.step_size
        stop = start + self.step_size
        with uproot.open(self.path) as f:
            arrays = f[self.tree].arrays(entry_start=start, entry_stop=stop, library="ak")
        table = ak.to_arrow_table(arrays, list_to32=True)
        for batch in table.to_batches():
            # <-- yield Arrow RecordBatches directly
            yield batch
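Wiring the two classes above into a Spark session (step 3 of the list) looks roughly like this; a sketch that assumes Spark 4 and the schematic reader exactly as written:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("root-datasource-demo").getOrCreate()

# Register the Python data source class, then read through its short name.
spark.dataSource.register(RootDataSource)

df = (spark.read.format("root")
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .load())

df.printSchema()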


Why this felt easy

  • Tiny surface area: one DataSource, one DataSourceReader, and a read() generator. The Apache Spark docs walk through the exact hooks and options.

  • Python-native stack: I could leverage existing Python libraries, Uproot + Awkward + PyArrow, for reading the ROOT format and producing Arrow, with no additional Scala/Java code.

  • Arrow batches = speed: SPARK-48493 wires Arrow batches into the reader path to avoid per-row conversion (see the JIRA on issues.apache.org).


Practical tips

  • Provide a schema if you can. It enables early pruning and tighter I/O; the Spark guide covers schema handling and type conversions.

  • Partitioning matters. ROOT TTrees can be large and jagged: tune step_size to balance task parallelism and batch size. (The reader exposes options for this.)

  • Arrow knobs. For very large datasets, controlling Arrow batch sizes can improve downstream pipeline behavior; see Spark's Arrow notes. (A combined sketch of these tips follows this list.)
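Putting the tips together, a hedged sketch of a tuned read. It assumes a SparkSession with the "root" source already registered as in the quick start below, and uses the step_size and arrow_max_chunksize options mentioned in this post; the numbers are examples, not recommended defaults.

schema = "nMuon int, Muon_pt array<float>, Muon_eta array<float>"

df = (spark.read.format("root")
      .schema(schema)                           # explicit schema: skips inference
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "500000")            # entries per read step / partition
      .option("arrow_max_chunksize", "100000")  # cap rows per Arrow RecordBatch
      .load())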


“Show me the full thing”

If you want a working reader with options for local files, directories, globs, and the XRootD protocol (root://), check out the package at: pyspark-root-datasource
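As a taste of those path options, a hedged sketch follows. The paths are placeholders, the source is assumed to be registered as in the quick start below, and the package README is the authoritative reference for the exact option names.

# Local file, glob, and XRootD URL (placeholders, not real datasets).
df_local = (spark.read.format("root")
            .option("path", "/data/sample.root")
            .option("tree", "Events").load())

df_glob = (spark.read.format("root")
           .option("path", "/data/2012/*.root")
           .option("tree", "Events").load())

df_xrootd = (spark.read.format("root")
             .option("path", "root://xrootd.example.org//store/sample.root")
             .option("tree", "Events").load())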

Quick start

1. Install

pip install pyspark-root-datasource

2. Grab an example ROOT file:

wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/Run2012BC_DoubleMuParked_Muons.root

3. Read with PySpark (Python code):

from pyspark.sql import SparkSession
from pyspark_root_datasource import register

spark = SparkSession.builder.appName("ROOT via PySpark").getOrCreate()
register(spark)  # registers the "root" format

schema = "nMuon int, Muon_pt array<float>, Muon_eta array<float>, Muon_phi array<float>"

df = (spark.read.format("root")
      .schema(schema)
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "1000000")
      .load())

df.printSchema()
df.show(3, truncate=False)
df.count()


A tiny physics “hello world”

Once the ROOT data is loaded, you can compute a dimuon mass spectrum from CERN Open Data in a few lines; it is both a smoke test and a familiar HEP demo (see the linked notebook, "Run a basic Physics analysis using CERN Open Data").
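For concreteness, here is a hedged sketch of that computation in Spark SQL, assuming the DataFrame df from the quick start above. It uses the standard massless-muon approximation M² = 2·pt1·pt2·(cosh(η1−η2) − cos(φ1−φ2)), keeps events with exactly two muons, and skips the opposite-charge selection; the linked notebook is the authoritative version.

from pyspark.sql import functions as F

dimuon = (df.filter(F.col("nMuon") == 2)
            .withColumn("pt1",  F.element_at("Muon_pt", 1))
            .withColumn("pt2",  F.element_at("Muon_pt", 2))
            .withColumn("eta1", F.element_at("Muon_eta", 1))
            .withColumn("eta2", F.element_at("Muon_eta", 2))
            .withColumn("phi1", F.element_at("Muon_phi", 1))
            .withColumn("phi2", F.element_at("Muon_phi", 2))
            .withColumn("mass", F.sqrt(
                2 * F.col("pt1") * F.col("pt2") *
                (F.cosh(F.col("eta1") - F.col("eta2")) -
                 F.cos(F.col("phi1") - F.col("phi2"))))))

# Summary statistics of the dimuon invariant mass (GeV).
dimuon.select("mass").summary("count", "min", "max").show()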


The dimuon mass spectrum, a "Hello World!" example for data analysis in High-Energy Physics. This image is the output of an analysis using CERN Open Data and pyspark-root-datasource.



Beyond HEP: a general recipe

The same approach works for any niche or binary format:

  1. Read bytes with your Python parser.

  2. Build a PyArrow Table.

  3. Yield RecordBatch objects from read().

Spark handles distribution, schema, projection, joins, and writes, so "we should write a connector" becomes a focused Python task rather than a long JVM project.
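Here is a generic skeleton of that recipe, under the same assumptions as the ROOT reader above. The "myformat" name, its options, and the toy parse_records() parser are hypothetical stand-ins for your own format and parsing code.

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
import pyarrow as pa


def parse_records(path):
    # Stand-in for a real binary parser: return plain Python columns.
    ids = list(range(10))
    values = [0.5 * i for i in ids]
    return ids, values


class MyFormatDataSource(DataSource):
    @classmethod
    def name(cls):
        return "myformat"

    def schema(self):
        return "id bigint, value double"

    def reader(self, schema):
        return MyFormatReader(schema, self.options)


class MyFormatReader(DataSourceReader):
    def __init__(self, schema, options):
        self.path = options.get("path")

    def partitions(self):
        # A real source would return one partition per file or chunk.
        return [InputPartition(0)]

    def read(self, partition):
        ids, values = parse_records(self.path)          # 1. parse with Python
        table = pa.table({"id": ids, "value": values})  # 2. build a PyArrow Table
        yield from table.to_batches()                   # 3. yield RecordBatches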


Performance note

Python data sources still cross the Python↔JVM boundary. Direct Arrow batches greatly reduce the serialization tax and improve throughput, but if you need absolute peak performance, a native JVM DataSource V2 connector (like Laurelin for ROOT) will usually win. Choose based on your team's skills and performance envelope.

Limitations

This is a Python DataSource. Even with direct Arrow RecordBatch support, it won't match a native Scala/Java V2 connector for raw throughput or advanced pushdown. In particular:

  • Projection and predicate pushdown are limited.

  • Schema inference on large TTrees is slow and may widen types; prefer explicit schemas.

  • ROOT quirks apply: unsigned integers must be cast to Spark's signed types, and deeply nested or jagged arrays may need options such as extension arrays to stay practical.

  • Watch memory: oversized Arrow batches or partitions can OOM executors; tune arrow_max_chunksize, step_size, and partition counts.

  • XRootD works but depends on your client and network; misconfigured dependencies (uproot, awkward, pyarrow, fsspec-xrootd) are common failure points.

  • Finally, the project is read-only (no writer), not streaming, and multi-file datasets with inconsistent schemas may require pre-validation or per-file options.

Pointers & references

  • Apache Spark documentation: Python Data Source API.

  • SPARK-48493: direct Arrow batch support for Python data source readers (issues.apache.org).

  • pyspark-root-datasource on PyPI (pip install pyspark-root-datasource).

  • Example ROOT file from CERN Open Data: Run2012BC_DoubleMuParked_Muons.root.

Acknowledgements

Many thanks to ATLAS colleagues, especially the ADAM (Atlas Data and Metadata) team, and to the CERN Databases and Data Analytics group for their support with Hadoop, Spark, and the SWAN hosted notebook services. We are also grateful to Allison Wang for assistance with SPARK-48493 and to Jim Pivarski for guidance on using Uproot and Awkward Array.

