TL;DR: Apache Spark 4 lets you build first-class data sources in pure Python. If your reader yields Arrow RecordBatch objects, Spark ingests them with reduced Python↔JVM serialization overhead. I used this to ship a ROOT data format reader for PySpark.
A PySpark reader for the ROOT format
ROOT is the de facto data format across High-Energy Physics; CERN experiments alone store more than 1 exabyte of data in ROOT files. With Spark 4’s Python API, building a ROOT reader becomes a focused Python exercise: parse with Uproot/Awkward, produce a PyArrow Table, yield RecordBatch objects, and let Spark do the rest.
- ROOT Datasource for PySpark: pyspark-root-datasource and PyPI
- Spark docs: Spark Python Datasource
Yield Arrow batches, skip the row-by-row serialization tax
Spark 4 introduces a Python Data Source API, which makes it simple to perform data ingestion into Spark via Python libraries. However, the vanilla version has to yield Python tuples/Rows, incurring per-row Python↔JVM hops. With the direct Arrow batch support introduced in SPARK-48493, your DataSourceReader.read() can yield pyarrow.RecordBatch objects. Spark ingests them in batches using Arrow, avoiding the row-by-row overhead and considerably reducing the time spent crossing the Python↔JVM boundary.
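To make the gain concrete, here is a minimal sketch (a toy reader, not part of the package) contrasting the two styles: the first read() yields Python tuples and pays a per-row conversion, while the second yields a single pyarrow.RecordBatch that crosses the boundary in one Arrow transfer.

from pyspark.sql.datasource import DataSourceReader
import pyarrow as pa

class RowByRowReader(DataSourceReader):
    def read(self, partition):
        # Each tuple is converted and shipped to the JVM individually.
        for i in range(1_000_000):
            yield (i, f"name_{i}")

class ArrowBatchReader(DataSourceReader):
    def read(self, partition):
        # The whole batch is handed to Spark as Arrow data in one go.
        ids = pa.array(range(1_000_000), type=pa.int64())
        names = pa.array([f"name_{i}" for i in range(1_000_000)])
        yield pa.record_batch([ids, names], names=["id", "name"])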
ROOT → Arrow → Spark in a few moving parts
Here’s the essence of the ROOT data format reader (the writer is not needed at the moment and is left for future work) that I implemented and packaged in pyspark-root-datasource:
- Subclass DataSource and DataSourceReader from pyspark.sql.datasource.
- In read(partition), produce pyarrow.RecordBatch objects.
- Register your source with spark.dataSource.register(...), then spark.read.format("root")... .
Minimal, schematic code (pared down for clarity):
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class RootDataSource(DataSource):
    @classmethod
    def name(cls):
        return "root"

    def schema(self):
        # Return a Spark schema (string or StructType) or infer in the reader.
        return "nMuon int, Muon_pt array<float>, Muon_eta array<float>"

    def reader(self, schema):
        return RootReader(schema, self.options)

class RootReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.path = options.get("path")
        self.tree = options.get("tree", "Events")
        self.step_size = int(options.get("step_size", 1_000_000))

    def partitions(self):
        # Example: split work into N partitions
        N = 8
        return [InputPartition(i) for i in range(N)]

    def read(self, partition):
        import uproot, awkward as ak, pyarrow as pa

        start = partition.value * self.step_size
        stop = start + self.step_size
        with uproot.open(self.path) as f:
            arrays = f[self.tree].arrays(entry_start=start, entry_stop=stop, library="ak")
        table = ak.to_arrow_table(arrays, list_to32=True)
        for batch in table.to_batches():
            # <-- yield Arrow RecordBatches directly
            yield batch
Why this felt easy
- Tiny surface area: one DataSource, one DataSourceReader, and a read() generator. The Spark docs walk through the exact hooks and options.
- Python-native stack: I could leverage existing Python libraries for reading the ROOT format and for processing Arrow (Uproot + Awkward + PyArrow), with no additional Scala/Java code.
- Arrow batches = speed: SPARK-48493 (issues.apache.org) wires Arrow batches into the reader path to avoid per-row conversion.
Practical tips
- Provide a schema if you can. It enables early pruning and tighter I/O; the Spark guide covers schema handling and type conversions.
- Partitioning matters. ROOT TTrees can be large and jagged: tune step_size to balance task parallelism and batch size. (The reader exposes options for this; see the sketch after this list.)
- Arrow knobs. For very large datasets, controlling Arrow batch sizes can improve downstream pipeline behavior; see Spark’s Arrow notes.
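As an illustration of the partitioning tip, here is a sketch (reusing the schematic reader above and its assumed options) that derives the partition count from the TTree length instead of hard-coding it, so each partition covers one step_size slice of entries:

import uproot
from pyspark.sql.datasource import DataSourceReader, InputPartition

class RootReader(DataSourceReader):
    # __init__ and read() as in the schematic reader above; only partitions() changes.

    def partitions(self):
        # One partition per step_size-sized slice of the tree, instead of a fixed N.
        with uproot.open(self.path) as f:
            num_entries = f[self.tree].num_entries
        n_parts = max(1, (num_entries + self.step_size - 1) // self.step_size)  # ceiling division
        return [InputPartition(i) for i in range(n_parts)]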
“Show me the full thing”
If you want a working reader with options for local files, directories, globs, and the XRootD protocol (root://), check out the package at: pyspark-root-datasource
Quick start
1. Install
pip install pyspark-root-datasource
2. Grab an example ROOT file:
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/Run2012BC_DoubleMuParked_Muons.root
3. Read with PySpark (Python code):

from pyspark.sql import SparkSession
from pyspark_root_datasource import register

spark = SparkSession.builder.appName("ROOT via PySpark").getOrCreate()
register(spark)  # registers the "root" format

schema = "nMuon int, Muon_pt array<float>, Muon_eta array<float>, Muon_phi array<float>"

df = (spark.read.format("root")
      .schema(schema)
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "1000000")
      .load())

df.printSchema()
df.show(3, truncate=False)
df.count()
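As a variant, here is a sketch of reading over XRootD instead of from the local filesystem (the root:// URL is a placeholder; swap in an endpoint you can reach, and make sure fsspec-xrootd is installed on the client):

df_xrootd = (spark.read.format("root")
             .schema(schema)
             .option("path", "root://some.xrootd.host//path/to/Run2012BC_DoubleMuParked_Muons.root")
             .option("tree", "Events")
             .option("step_size", "1000000")
             .load())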
A tiny physics “hello world”
Once the ROOT data is loaded, you can compute a dimuon mass spectrum from CERN Open Data in a few lines; it doubles as a smoke test and a familiar HEP demo: Run a basic Physics analysis using CERN Opendata, link to the Notebook.
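For flavor, here is a minimal sketch of such a computation (not the notebook itself), using the df from the quick start and the usual approximation m^2 = 2 * pt1 * pt2 * (cosh(eta1 - eta2) - cos(phi1 - phi2)); a real analysis would also select opposite-charge pairs via a Muon_charge column, omitted here:

from pyspark.sql import functions as F

dimuons = (df.filter(F.col("nMuon") == 2)
             .select(F.element_at("Muon_pt", 1).alias("pt1"),
                     F.element_at("Muon_pt", 2).alias("pt2"),
                     F.element_at("Muon_eta", 1).alias("eta1"),
                     F.element_at("Muon_eta", 2).alias("eta2"),
                     F.element_at("Muon_phi", 1).alias("phi1"),
                     F.element_at("Muon_phi", 2).alias("phi2"))
             .withColumn("mass", F.sqrt(F.lit(2) * F.col("pt1") * F.col("pt2") *
                                        (F.cosh(F.col("eta1") - F.col("eta2")) -
                                         F.cos(F.col("phi1") - F.col("phi2"))))))

dimuons.select("mass").summary("count", "mean", "min", "max").show()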
Beyond HEP: a general recipe
The same approach works for any niche or binary format:
- Read bytes with your Python parser.
- Build a PyArrow Table.
- Yield RecordBatch objects from read().
Spark handles distribution, schema, projection, joins, and writes, so “we should write a connector” becomes a focused Python task rather than a long JVM project.
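Here is that recipe as a sketch for a hypothetical format (every name below is made up; parse_my_format stands in for whatever parser your ecosystem already provides):

import pyarrow as pa
from pyspark.sql.datasource import DataSource, DataSourceReader

def parse_my_format(path):
    # Stand-in for an existing Python parser; returns a list of dicts.
    return [{"id": i, "payload": f"record-{i}"} for i in range(100)]

class MyFormatDataSource(DataSource):
    @classmethod
    def name(cls):
        return "myformat"

    def schema(self):
        return "id long, payload string"

    def reader(self, schema):
        return MyFormatReader(self.options)

class MyFormatReader(DataSourceReader):
    def __init__(self, options):
        self.path = options.get("path")

    def read(self, partition):
        records = parse_my_format(self.path)    # 1. read bytes with your Python parser
        table = pa.Table.from_pylist(records)   # 2. build a PyArrow Table
        yield from table.to_batches()           # 3. yield RecordBatch objects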
Performance note
Python data sources still cross the Python↔JVM boundary. Direct Arrow batches greatly reduce the serialization tax and improve throughput, but if you need absolute peak performance, a native JVM DataSource V2 (like Laurelin for ROOT) will usually win. Choose based on your team’s skills and performance envelope (see spark.apache.org).
Limitations
This is a Python DataSource. Even with direct Arrow RecordBatch support, it won’t match a native Scala/Java V2 connector for raw throughput or advanced pushdown.
- Projection and predicate pushdown are limited.
- Schema inference on large TTrees is slow and may widen types; prefer explicit schemas.
- ROOT quirks apply: unsigned integers must be cast to Spark’s signed types, and deeply nested or jagged arrays may need options like extension arrays to stay practical.
- Watch memory: oversized Arrow batches or partitions can OOM executors; tune arrow_max_chunksize, step_size, and partition counts (see the sketch after this list).
- XRootD works but depends on your client and network; misconfigured dependencies (uproot, awkward, pyarrow, fsspec-xrootd) are common failure points.
- Finally, the project is read-only (no writer), not streaming, and multi-file datasets with inconsistent schemas may require pre-validation or per-file options.
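For the memory point, here is a sketch of the knobs mentioned above (the option names come from this post; the concrete values, and the assumption that arrow_max_chunksize is passed as a reader option, are illustrative):

df = (spark.read.format("root")
      .schema("nMuon int, Muon_pt array<float>")   # explicit schema; declare unsigned branches as wider signed types
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "200000")               # smaller slices per partition -> lower peak memory
      .option("arrow_max_chunksize", "100000")     # cap rows per yielded RecordBatch
      .load())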
Pointers & references
- The ROOT format is part of the ROOT project
- Key dependencies from scikit-hep: uproot and awkward array
- Spark Python Datasources: Python Data Source API, Spark Python datasources, Datasource for Huggingface datasets
- SPARK-48493 - Arrow batch support for improved performance
- Example notebooks on Apache Spark for Physics + a note on reading ROOT files with Spark
- Laurelin, a Spark DataSource V2 implementation in Java