TL;DR: Apache Spark 4 lets you build first-class data sources in pure Python. If your reader yields Arrow RecordBatch objects, Spark ingests them with reduced Python↔JVM serialization overhead. I used this to ship a ROOT data format reader for PySpark.
A PySpark reader for the ROOT format
ROOT is the de facto data format across High-Energy Physics; CERN experiments alone store more than one exabyte of data in ROOT files. With Spark 4’s Python API, building a ROOT reader becomes a focused Python exercise: parse with Uproot/Awkward, produce a PyArrow Table, yield RecordBatch objects, and let Spark do the rest.
- ROOT Datasource for PySpark: pyspark-root-datasource and PyPI
- Spark docs: Spark Python Datasource
Yield Arrow batches, skip the row-by-row serialization tax
Spark 4 introduces a Python data source API, which makes it simple to perform data ingestion into Spark with Python libraries. However, the vanilla version has to yield Python tuples/Rows, incurring per-row Python/JVM hops. With direct Arrow batch support introduced in SPARK-48493, your DataSourceReader.read() can yield pyarrow.RecordBatch objects. Spark ingests them in batches using Arrow, avoiding the row-by-row overhead and considerably reducing the time spent crossing the Python/JVM boundary, as sketched below.
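To make the contrast concrete, here is a minimal sketch of the two reader styles (class names and columns are illustrative, not part of any package): the first yields Python tuples row by row, the second yields a single pyarrow.RecordBatch.

import pyarrow as pa
from pyspark.sql.datasource import DataSourceReader

class TupleReader(DataSourceReader):
    # Row-based path: each tuple is converted individually, incurring per-row overhead.
    def read(self, partition):
        for i in range(100_000):
            yield (i, float(i))

class ArrowReader(DataSourceReader):
    # Arrow path (SPARK-48493): a whole columnar batch crosses the boundary at once.
    def read(self, partition):
        ids = pa.array(range(100_000), type=pa.int64())
        vals = pa.array([float(i) for i in range(100_000)], type=pa.float64())
        yield pa.RecordBatch.from_arrays([ids, vals], names=["id", "value"])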
ROOT → Arrow → Spark in a few moving parts
Here’s the essence of the ROOT data format reader I implemented and packaged in pyspark-root-datasource (a writer is not needed at the moment and is left for future work):
- Subclass DataSource and DataSourceReader from pyspark.sql.datasource.
- In read(partition), produce pyarrow.RecordBatch objects.
- Register your source with spark.dataSource.register(...), then spark.read.format("root")... (a short registration sketch follows the code below).
Minimal, schematic code (pared down for clarity):
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition


class RootDataSource(DataSource):
    @classmethod
    def name(cls):
        return "root"

    def schema(self):
        # Return a Spark schema (DDL string or StructType) or infer in the reader.
        return "nMuon int, Muon_pt array<float>, Muon_eta array<float>"

    def reader(self, schema):
        return RootReader(schema, self.options)


class RootReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.path = options.get("path")
        self.tree = options.get("tree", "Events")
        self.step_size = int(options.get("step_size", 1_000_000))

    def partitions(self):
        # Example: split work into N partitions
        N = 8
        return [InputPartition(i) for i in range(N)]

    def read(self, partition):
        import uproot, awkward as ak, pyarrow as pa

        start = partition.value * self.step_size
        stop = start + self.step_size
        with uproot.open(self.path) as f:
            arrays = f[self.tree].arrays(entry_start=start, entry_stop=stop, library="ak")
        # Convert to plain Arrow types (no Awkward extension types)
        table = ak.to_arrow_table(arrays, list_to32=True, extensionarray=False)
        for batch in table.to_batches():
            # <-- yield Arrow RecordBatches directly
            yield batch
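With the classes above defined in your application (or installed as a package), wiring them into a Spark session is short; a minimal sketch using the schematic RootDataSource and the example file from the quick start below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("root-reader-demo").getOrCreate()

# Register the Python data source under the short name returned by RootDataSource.name()
spark.dataSource.register(RootDataSource)

df = (spark.read.format("root")
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .load())
df.show(3)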
Why this felt easy
- Tiny surface area: one DataSource, one DataSourceReader, and a read() generator. The Spark docs walk through the exact hooks and options (Apache Spark docs).
- Python-native stack: Uproot + Awkward + PyArrow cover reading the ROOT format and producing Arrow, with no additional Scala/Java code.
- Arrow batches = speed: SPARK-48493 wires Arrow batches into the reader path to avoid per-row conversion (issues.apache.org).
Practical tips
- Provide a schema if you can. It enables early pruning and tighter I/O; the Spark guide covers schema handling and type conversions. (See the sketch after this list.)
- Partitioning matters. ROOT TTrees can be large and jagged: tune step_size to balance task parallelism and batch size. (The reader exposes options for this.)
- Arrow knobs. For very large datasets, controlling Arrow batch sizes can improve downstream pipeline behavior; see Spark’s Arrow notes.
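As a sketch of the first two tips (assuming a SparkSession with the "root" source already registered, and the example file from the quick start), the schema can be passed as a StructType instead of a DDL string, and step_size tuned per workload:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, FloatType

spark = SparkSession.builder.getOrCreate()   # assumes the "root" source is registered

schema = StructType([
    StructField("nMuon", IntegerType()),
    StructField("Muon_pt", ArrayType(FloatType())),
    StructField("Muon_eta", ArrayType(FloatType())),
])

df = (spark.read.format("root")
      .schema(schema)                        # explicit schema: no inference pass over the file
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "500000")         # smaller steps -> more partitions, smaller batches
      .load())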
“Show me the full thing”
If you want a working reader with options for local files, directories, globs, and the XRootD protocol (root://
), check out the package at: pyspark-root-datasource
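For remote data, the same read pattern should work with an XRootD URL in the path option; a sketch with a placeholder server and path, using the same option names as in the quick start below:

df_remote = (spark.read.format("root")
             .option("path", "root://<xrootd-server>//path/to/file.root")  # placeholder URL
             .option("tree", "Events")
             .load())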
Quick start
1. Install
pip install pyspark-root-datasource
2. Grab an example ROOT file:
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/Run2012BC_DoubleMuParked_Muons.root
3. Read with PySpark (Python code):

from pyspark.sql import SparkSession
from pyspark_root_datasource import register

spark = SparkSession.builder.appName("ROOT via PySpark").getOrCreate()
register(spark)  # registers the "root" format

schema = "nMuon int, Muon_pt array<float>, Muon_eta array<float>, Muon_phi array<float>"

df = (spark.read.format("root")
      .schema(schema)
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "1000000")
      .load())

df.printSchema()
df.show(3, truncate=False)
df.count()
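Once loaded, the array columns are ordinary Spark arrays, so standard DataFrame operations apply; for example, flattening the jagged Muon_pt column (plain PySpark, nothing reader-specific):

from pyspark.sql.functions import col, explode

(df.select(explode(col("Muon_pt")).alias("pt"))
   .summary("count", "mean", "min", "max")
   .show())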