Monday, May 23, 2022

Making histograms with Apache Spark and other SQL engines

Topic: This post will show you how to generate histograms using Apache Spark. You will find examples using the Spark DataFrame API and with a custom helper package, SparkHistogram. Additional examples will extend the work to histogram generation for several other databases and SQL engines.  

Disambiguation: we refer here to computing histograms for data analysis, rather than histograms of table columns or statistics used by cost- based optimizers.

  

Why histograms with Apache Spark?

Histograms are common tools for data analysis and are a key element in most High Energy Physics analyses. See also the post Can High Energy Physics Analysis Profit from Apache Spark APIs?

The advantage of generating histograms using Apache Spark, or other distributed data engines, is that the computation can be run at scale, with higher bandwidth to the data. This is useful if you have large datasets, for example, datasets that require distributed computing as they cannot be timely computed by one machine.

When handling smaller amounts of data, however, you can evaluate the alternative of just processing filters and map functions at scale, then fetching all the results into the driver, and finally using state-of-the-art libraries to generate histograms, such as Pandas histogram or numpy histogram or boost-histogram.

 

Vanilla solution: Spark's native histogram_numeric function

Spark has a DataFrame aggregate function for generating approximate histograms, histogram_numeric, since Spark version 3.3.0 (see SPARK-16280). There are a few implementation details and limitations to keep in mind when using histogram_numeric:

  • it produces as output an array of (x,y) pairs representing the center of the histogram bins and their corresponding value.
  • bins don't have a uniform size
  • the result is an approximate calculation
  • when using a large number of bins (e.g. more than 1000 bins) the histogram_numeric can become quite slow

See also this link for an example of how to use histogram_numeric.

Given the limitations of histogram_numeric, we have developed a different solution based on the DataFrame API (see next paragraph).

  

An improved solution: reduce boilerplate code with SparkHistogram

It is easy to implement some basic histogram generation using the DataFrame API or Spark SQL. For a few simple cases, a wrapper around  the width_bucket function can do the job. Width_bucket is a common function in many SQL engines including Apache Spark since version 3.1.0.

A simple expression for computing the histogram works by mapping each data value into a bucket and then aggregating the values in each bucket using the count function, as in this example:

  
hist = (df
.selectExpr("width_bucket(column_name, min_val, max_val, num_bins) as bucket")
.groupBy("bucket")
.count() )
  
The implementation is straightforward, however, additional code is needed to make it more useful in practice: we need to take care of buckets with no elements, and of  computing the data value to assign to each bucket. The resulting expression can be found, for example, in the code of the computeHistogram function. 

The SparkHistogram package is built with the idea of reducing boilerplate code and contains helper functions for generating frequency histograms and also a close variant of it, weighted histograms. Computing histograms with SparkHistogram becomes simply:


from sparkhistogram import computeHistogram

hist = computeHistogram(df, f"{data_column}", min_val, max_val, num_bins)

# or, in alternative:
hist = df.transform(computeHistogram, f"{data_column}", min_val, max_val, num_bins)
  

More information on the SparkHistogram package for Python and Scala at:

     
   

Examples

  
Jupyter notebooks showing how to generate histograms using PySpark and SparkHistogram (see further in this post for Spark SQL examples):

Additional examples in the context of Physics analysis:
     
This histogram has been generated using ATLAS Open Data collected at the LHC at CERN and processed using PySpark and the SparkHistogram package.

   

Extend the work on histogram generation to more SQL engines


You can also use SQL to generate your histograms. The following examples work with minor modifications across different data/database systems and can be easily extended to run on all SQL engines that implement the width_bucket function. Example notebooks:
For more complex histogram use cases with Spark see also Histogrammar
  
This work has been done in the context of the CERN databases and analytics services and the ATLAS data engineering efforts. Additional thanks to Jim Pivarski for discussions.
  

No comments:

Post a Comment