Topic: This post shows how to generate histograms using Apache Spark. You will find examples using the Spark DataFrame API and the SparkHistogram helper package. Additional examples extend the work to histogram generation for several other databases and SQL engines.
Disambiguation: here we refer to computing histograms for data analysis, rather than histograms of table columns or the statistics used by cost-based optimizers.
Why histograms with Apache Spark?
Histograms are common tools for data analysis and are a key element in most High Energy Physics analyses. See also the post Can High Energy Physics Analysis Profit from Apache Spark APIs?
The advantage of generating histograms with Apache Spark, or other distributed data engines, is that the computation runs at scale, with higher bandwidth to the data. This is useful for large datasets, for example datasets that require distributed computing because they cannot be processed in a timely manner on a single machine.
When handling smaller amounts of data, however, you can evaluate an alternative: run the filters and map functions at scale with Spark, fetch the reduced results into the driver, and then generate the histograms with state-of-the-art libraries such as Pandas' histogram, NumPy's histogram, or boost-histogram, as sketched below.
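As a minimal sketch of this approach (the toy dataset and the 20-bin configuration are made up for illustration), Spark does the filtering and the reduced data is histogrammed on the driver with NumPy:

from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()

# toy data: a Spark DataFrame with 100000 random values in [0, 100)
df = spark.range(100000).selectExpr("rand() * 100 as value")

# filter at scale in Spark, then fetch the (reduced) result to the driver
values = df.filter("value between 0 and 100").toPandas()["value"].to_numpy()

# compute the histogram on the driver with NumPy
counts, bin_edges = np.histogram(values, bins=20, range=(0.0, 100.0))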
Vanilla solution: Spark's native histogram_numeric function
Since version 3.3.0, Spark provides histogram_numeric, a DataFrame aggregate function for generating approximate histograms (see SPARK-16280). There are a few implementation details and limitations to keep in mind when using histogram_numeric:
- it produces as output an array of (x, y) pairs representing the centers of the histogram bins and their corresponding values
- the bins do not have a uniform size
- the result is an approximate calculation
- with a large number of bins (e.g. more than 1000), histogram_numeric can become quite slow
See also this link for an example of how to use histogram_numeric.
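As a minimal illustration (with toy inline data; an active SparkSession named spark is assumed), histogram_numeric can be invoked from Spark SQL as follows:

# compute an approximate 3-bin histogram of a small set of inline values
spark.sql("""
SELECT histogram_numeric(col, 3)
FROM VALUES (1), (2), (2), (3), (10) AS tab(col)
""").show(truncate=False)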
Given the limitations of histogram_numeric, we have developed a different solution based on the DataFrame API (see next paragraph).
An improved solution: reduce boilerplate code with SparkHistogram
It is easy to implement basic histogram generation using the DataFrame API or Spark SQL. For simple cases, a wrapper around the width_bucket function can do the job. width_bucket is a common function in many SQL engines and has been available in Apache Spark since version 3.1.0.
A simple expression for computing the histogram works by mapping each data value into a bucket and then aggregating the values in each bucket using the count function, as in this example:
hist = (df
    .selectExpr(f"width_bucket({data_column}, {min_val}, {max_val}, {num_bins}) as bucket")
    .groupBy("bucket")
    .count()
    .orderBy("bucket")
    )

Note that width_bucket maps values below min_val to bucket 0 and values above max_val to bucket num_bins + 1.
The SparkHistogram package is built with the idea of reducing boilerplate code and contains helper functions for generating frequency histograms and a close variant, weighted histograms. Computing a histogram with SparkHistogram becomes simply:
from sparkhistogram import computeHistogram

# df is the input DataFrame; the result is a DataFrame with the histogram, one row per bucket
hist = computeHistogram(df, data_column, min_val, max_val, num_bins)
More information on the SparkHistogram package for Python and Scala at:
- Python: SparkHistogram on Pypi
- Scala: SparkHistogram in Scala
Examples
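As a minimal end-to-end sketch (the toy dataset and bin configuration are made up, and the output column names value and count are assumptions following the SparkHistogram documentation), here is how a histogram can be computed at scale and then plotted on the driver:

from pyspark.sql import SparkSession
from sparkhistogram import computeHistogram
import matplotlib.pyplot as plt

spark = SparkSession.builder.getOrCreate()

# toy data: 100000 random values in [0, 100)
df = spark.range(100000).selectExpr("rand() * 100 as value")

# compute the frequency histogram at scale: 20 bins over [0, 100]
hist = computeHistogram(df, "value", 0, 100, 20)

# the histogram DataFrame is small (one row per bucket), so fetching it to the driver is cheap
# assumes output columns named value (bucket center) and count
pdf = hist.toPandas()
pdf.plot(x="value", y="count", kind="bar")
plt.show()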
Extend the work on histogram generation to more SQL engines
- frequency histograms using Spark SQL (see the sketch after this list)
- frequency histograms using Trino or Presto
- frequency histograms using PostgreSQL
- frequency histograms using YugabyteDB
- frequency histograms using Oracle
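For reference, a minimal Spark SQL version of the width_bucket-based frequency histogram could look like the following sketch (my_table and column_name are placeholders; the inner query follows the same pattern usable in other engines that provide width_bucket):

# frequency histogram in Spark SQL: 20 bins over the range [0, 100]
spark.sql("""
SELECT bucket, count(*) AS count
FROM (
  SELECT width_bucket(column_name, 0, 100, 20) AS bucket
  FROM my_table
) t
GROUP BY bucket
ORDER BY bucket
""").show()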