Tuesday, January 30, 2024

Enhancing Apache Spark and Parquet Efficiency: A Deep Dive into Column Indexes and Bloom Filters

In the ever-evolving landscape of big data, Apache Spark and Apache Parquet continue to introduce game-changing features. Their latest updates have brought forward significant enhancements, including column indexes, bloom filters. This blog post delves into these new features, exploring their applications and benefits. This post is based on the extended notes at:


Key Advantages of Parquet in Spark

This is not an introductory article, however here is a quick recap of why you may want to spend time learning more about Apache Parquet and Spark. Parquet is a columnar storage file format optimized for use with data processing frameworks like Apache Spark. It offers efficient data compression and encoding schemes.

  • Parquet is a columnar format enabling efficient data storage and retrieval
  • It supports compression and encoding
  • Optimizations in Spark for Parquet include:
    • Vectorized Parquet reader
    • Filter push down capabilities
    • Enhanced support for partitioning and handling large files

Another key aspect of Parquet with Spark that are important to know for the following is:
  • Row Group Organization: Parquet files consist of one or more 'row groups,' typically sized around 128 MB, although this is adjustable.
  • Parallel Processing Capabilities: Both Spark and other engines can process Parquet data in parallel, leveraging the row group level or the file level for enhanced efficiency.
  • Row Group Statistics: Each row group holds vital statistics like minimum and maximum values, counts, and the number of nulls. These stats enable the 'skip data' feature when filters are applied, essentially serving as a zone map to optimize query performance.

ORC: For a comparison of Apache Parquet with another popular data format, Apache ORC, refer to Parquet-ORC Comparison.


Understanding Column Indexes and Bloom Filters in Parquet

Column Indexes: Enhancing Query Efficiency

Column indexes, introduced in Parquet 1.11 and utilized in Spark 3.2.0 and higher, offer a fine-grained approach to data filtering. These indexes store min and max values at the Parquet page level, allowing Spark to efficiently execute filter predicates at a much finer granularity than the default 128 MB row group size. Particularly effective for sorted data, column indexes can drastically reduce the data read from disk, improving query performance.

Bloom Filters: A Leap in Filter Operations

Parquet 1.12 (utilized by Spark 3.2.0 and higher) introduced Bloom filters, a probabilistic data structure that efficiently determines whether an element is in a set. They are particularly useful for columns with high cardinality and scenarios where filter operations are based on values likely to be absent from the dataset. Using bloom filters can lead to significant improvements in read performance.


Example: Spark using Parquet column indexes

Test dataset and preparation

  • The Parquet test file used below parquet112file_sorted is extracted from the TPCDS benchmark table web_sales
  • the table (parquet file) contains data sorted on the column ws_sold_time_sk
  • it's important that the data is sorted, this groups together values in the filter column "ws_sold_time_sk", if the values are scattered the column index min-max statistics will have a wide range and will not be able to help with skipping data
  • the sorted dataset has been created using spark.read.parquet("path + "web_sales_piece.parquet").sort("ws_sold_time_sk").coalesce(1).write.parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")
  • Download the test data:

Run the tests

  1. Fast (reads only 20k rows):
    Spark will read the Parquet using a filter and makes use of column and offset indexes:
bin/spark-shell

val path = "./" 
val df = spark.read.parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")

// Read the file using a filter, this will use column and offset indexes
val q1 = df.filter("ws_sold_time_sk=28801")
val plan = q1.queryExecution.executedPlan
q1.collect

// Use Spark metrics to see how many rows were processed
// This is also available for the WebUI in graphical form
val metrics = plan.collectLeaves().head.metrics
metrics("numOutputRows").value

res: Long = 20000

The result shows that only 20000 rows were processed, this corresponds to processing just a few pages, as opposed to reading and processing the entire file. This is made possible by the use of the min-max value statistics in the column index for column ws_sold_time_sk.
Column indexes are created by default in Spark version 3.2.x and higher.


  1. Slow (reads 2M rows):
    Same as above, but this time we disable the use of column indexes.
    Note this is also what happens if you use Spark versions prior to Spark 3.2.0 (notably Spark 2.x) to read the file.
bin/spark-shell

val path = "./"
// disable the use of column indexes for testing purposes
val df = spark.read.option("parquet.filter.columnindex.enabled","false").parquet(path + "web_sales_piece_sorted_ws_sold_time_sk.parquet")

val q1 = df.filter("ws_sold_time_sk=28801")
val plan = q1.queryExecution.executedPlan
q1.collect

// Use Spark metrics to see how many rows were processed
val metrics = plan.collectLeaves().head.metrics
metrics("numOutputRows").value

res: Long = 2088626

The result is that all the rows in the row group (2088626 rows in the example) were read as Spark could not push the filter down to the Parquet page level. This example runs more slowly than the example below and in general performs more work (uses more CPU cycles and reads more data from the filesystem).


Diagnostics and Internals of Column and Offset Indexes

Column indexes in Parquet are key structures designed to optimize filter performance during data reads. They are particularly effective for managing and querying large datasets.

Key Aspects of Column Indexes:

  • Purpose and Functionality: Column indexes offer statistical data (minimum and maximum values) at the page level, facilitating efficient filter evaluation and optimization.
  • Default Activation: By default, column indexes are enabled to ensure optimal query performance.
  • Granularity Insights: While column indexes provide page-level statistics, similar statistics are also available at the row group level. Typically, a row group is approximately 128MB, contrasting with pages usually around 1MB.
  • Customization Options: Both rowgroup and page sizes are configurable, offering flexibility to tailor data organization. For further details, see Parquet Configuration Options.

Complementary Role of Offset Indexes:

  • Association with Column Indexes: Offset indexes work in tandem with column indexes and are stored in the file's footer in Parquet versions 1.11 and above.
  • Scanning Efficiency: A key benefit of these indexes is their role in data scanning. When filters are not applied in Parquet file scans, the footers with column index data can be efficiently skipped, enhancing the scanning process.

Additional Resources:

For an in-depth explanation of column and offset indexes in Parquet, consider reading this detailed description.

The integration of column and offset indexes significantly improves Parquet's capability in efficiently handling large-scale data, especially in scenarios involving filtered reads. Proper understanding and utilization of these indexes can lead to marked performance improvements in data processing workflows.


Tools to drill down on column index metadata in Parquet files

  • parquet-cli

    • example: hadoop jar target/parquet-cli-1.13.1-runtime.jar org.apache.parquet.cli.Main column-index -c ws_sold_time_sk <path>/my_parquetfile
    • More details on how to use parquet-cli at Tools for Parquet Diagnostics
  • Example with the Java API from Spark-shell

    // customize with the file path and name
    val fullPathUri = java.net.URI.create("<path>/myParquetFile")
    
    // crate a Hadoop input file and opens it with ParquetFileReader
    val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(fullPathUri), spark.sessionState.newHadoopConf())
    val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
    
    // Get the Parquet file version
    pf.getFooter.getFileMetaData.getCreatedBy
    
    // columns index
    val columnIndex = pf.readColumnIndex(columns.get(0))
    columnIndex.toString.foreach(print)
    
    // offset index
    pf.readOffsetIndex(columns.get(0))
    print(pf.readOffsetIndex(columns.get(0)))
    

The output on a column that is sorted looks like:

row-group 0:
column index for column ws_sold_time_sk:
Boudary order: ASCENDING
                      null count  min                                       max
page-0                        45  29                                        12320
page-1                         0  12320                                     19782
page-2                         0  19782                                     26385
page-3                         0  26385                                     31758
page-4                         0  31758                                     36234
page-5                         0  36234                                     40492
page-6                         0  40492                                     44417
page-7                         0  44417                                     47596
page-8                         0  47596                                     52972
page-9                         0  52972                                     58388
page-10                        0  58388                                     62482
page-11                        0  62482                                     65804
page-12                        0  65804                                     68647
page-13                        0  68647                                     71299
page-14                        0  71303                                     74231
page-15                        0  74231                                     77978
page-16                        0  77978                                     85712
page-17                        0  85712                                     86399

offset index for column ws_sold_time_sk:
                          offset   compressed size       first row index
page-0                     94906              4759                     0
page-1                     99665              4601                 20000
page-2                    104266              4549                 40000
page-3                    108815              4415                 60000
page-4                    113230              4343                 80000
page-5                    117573              4345                100000
page-6                    121918              4205                120000
page-7                    126123              3968                140000
page-8                    130091              4316                160000
page-9                    134407              4370                180000
page-10                   138777              4175                200000
page-11                   142952              4012                220000
page-12                   146964              3878                240000
page-13                   150842              3759                260000
page-14                   154601              3888                280000
page-15                   158489              4048                300000
page-16                   162537              4444                320000
page-17                   166981               200                340000


Bloom filters in Parquet

With the release of Parquet 1.12, there's now the capability to generate and store Bloom filters within the file footer's metadata. This addition significantly enhances query performance for specific filtering operations. Bloom filters are especially advantageous in the following scenarios:

High Cardinality Columns: They effectively address the limitations inherent in using Parquet dictionaries for columns with a vast range of unique values.

Absent Value Filtering: Bloom filters are highly efficient for queries that filter based on values likely to be missing from the table or DataFrame. This efficiency stems from the characteristic of Bloom filters where false positives (erroneously concluding that a non-existent value is present) are possible, but false negatives (failing to identify an existing value) do not occur.

For a comprehensive understanding and technical details of implementing Bloom filters in Apache Parquet, refer to the official documentation on bloom filters in Apache Parquet


Configuration

Important configurations for writing bloom filters in Parquet files are:

.option("parquet.bloom.filter.enabled","true") // write bloom filters for all columns, default is false
.option("parquet.bloom.filter.enabled#column_name", "true") // write bloom filter for the given column
.option("parquet.bloom.filter.expected.ndv#column_name", num_values) // tuning for bloom filters, ndv = number of distinct values
.option("parquet.bloom.filter.max.bytes", 1024*1024) // The maximum number of bytes for a bloom filter bitset, default 1 MB


Write Parquet files with Bloom filters

This is an example of how to read a Parquet file without bloom filter (for example because it had been created with an older version of Spark/Parquet) and add the bloom filter, with additional tuning of the bloom filter parameters for one of the columns:

val df = spark.read.parquet("<path>/web_sales")
df.coalesce(1).write.option("parquet.bloom.filter.enabled","true").option("parquet.bloom.filter.expected.ndv#ws_sold_time_sk", 25000).parquet("<myfilepath")


Example: Checking I/O Performance in Parquet: With and Without Bloom Filters

Understanding the impact of using bloom filters on I/O performance during Parquet file reads can be important for optimizing data processing. This example outlines the steps to compare I/O performance when reading Parquet files, both with and without the utilization of bloom filters.

This example uses Parquet bloom filters to improve Spark read performance

1. Prepare the test table


bin/spark-shell
val numDistinctVals=1e6.toInt
val df=sql(s"select id, int(random()*100*$numDistinctVals) randomval from range($numDistinctVals)")
val path = "./"

// Write the test DataFrame into a Parquet file with a Bloom filter
df.coalesce(1).write.mode("overwrite").option("parquet.bloom.filter.enabled","true").option("parquet.bloom.filter.enabled#randomval", "true").option("parquet.bloom.filter.expected.ndv#randomval", numDistinctVals).parquet(path + "spark320_test_bloomfilter")

// Write the same DataFrame in Parquet, but this time without Bloom filters 
df.coalesce(1).write.mode("overwrite").option("parquet.bloom.filter.enabled","false").parquet(path + "spark320_test_bloomfilter_nofilter")

// use the OS (ls -l) to compare the size of the files with bloom filter and without
// in my test (Spark 3.5.0, Parquet 1.13.1) it was 10107275 with bloom filter and 8010077 without

:quit

2. Read data using the Bloom filter, for improved performance

bin/spark-shell

val path = "./"
val df =spark.read.option("parquet.filter.bloom.enabled","true").parquet(path + "spark320_test_bloomfilter")
val q1 = df.filter("randomval=1000000") // filter for a value that is not in the file
q1.collect

// print I/O metrics
org.apache.hadoop.fs.FileSystem.printStatistics()

// Output
FileSystem org.apache.hadoop.fs.RawLocalFileSystem: 1091611 bytes read, ...

:quit

3. Read disabling the Bloom filter (this will read more data from the filesystem and have worse performance)

bin/spark-shell

val path = "./"
val df =spark.read.option("parquet.filter.bloom.enabled","false").parquet(path + "spark320_test_bloomfilter")
val q1 = df.filter("randomval=1000000") // filter for a value that is not in the file
q1.collect

// print I/O metrics
org.apache.hadoop.fs.FileSystem.printStatistics()

// Output
FileSystem org.apache.hadoop.fs.RawLocalFileSystem: 8299656 bytes read, ...


Reading Parquet Bloom Filter Metadata with Apache Parquet Java API

To extract metadata about the bloom filter from a Parquet file using the Apache Parquet Java API in spark-shell, follow these steps:

 

  1. Initialize the File Path: define the full path of your Parquet file
    bin/spark-shell
    val fullPathUri = java.net.URI.create("<my_file_path>")
  2. Create Input File: utilize HadoopInputFile to create an input file from the specified path
    val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(
             new org.apache.hadoop.fs.Path(fullPathUri), 
             spark.sessionState.newHadoopConf()
             )
    
     
  3. Open Parquet File Reader: open the Parquet file reader for the input file
    val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
    
     
  4. Retrieve Blocks and Columns: extract the blocks from the file footer and then get the columns from the first block
    val blocks = pf.getFooter.getBlocks
    val columns = blocks.get(0).getColumns
    
     
  5. Read Bloom Filter: finally, read the bloom filter from the first column
    val bloomFilter = pf.readBloomFilter(columns.get(0))
    bloomFilter.getBitsetSize
     

By following these steps, you can successfully read the bloom filter metadata from a Parquet file using the Java API in the spark-shell environment.

Discovering Parquet Version

The Parquet file format is constantly evolving, incorporating additional metadata to support emerging features. Each Parquet file embeds the version information within its metadata, reflecting the Parquet version used during its creation.

Importance of Version Awareness:

Compatibility Considerations: When working with Parquet files generated by older versions of Spark and its corresponding Parquet library, it's important to be aware that certain newer features may not be supported. For instance, column indexes, which are available in the Spark DataFrame Parquet writer from version 3.2.0, might not be present in files created with older versions.

Upgrading for Enhanced Features: Upon upgrading your Spark version, it's beneficial to also update the metadata in existing Parquet files. This update allows you to utilize the latest features introduced in newer versions of Parquet.

Checking the Parquet File Version:

The following sections will guide you on how to check the Parquet version used in your files, ensuring that you can effectively manage and upgrade your Parquet datasets. This format provides a structured and detailed approach to understanding and managing Parquet file versions, emphasizing the importance of version compatibility and the process of upgrading.

  • Details at Tools for Parquet Diagnostics

  • parquet-cli

    • example: hadoop jar parquet-cli/target/parquet-cli-1.13.1-runtime.jar org.apache.parquet.cli.Main meta <path>/myParquetFile
  • Hadoop API ...

    • example of using Hadoop API from the spark-shell CLI
    // customize with the file path and name
    val fullPathUri = java.net.URI.create("<path>/myParquetFile")
     
    // crate a Hadoop input file and opens it with ParquetFileReader
    val in = org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(fullPathUri), spark.sessionState.newHadoopConf())
    val pf = org.apache.parquet.hadoop.ParquetFileReader.open(in)
    
    // Get the Parquet file version
    pf.getFooter.getFileMetaData.getCreatedBy
    
    // Info on file metadata
    print(pf.getFileMetaData)
    print(pf.getRowGroups)
    
  • Spark extension library
    The spark-extension library allows to query Parquet metadata using Apache Spark.
    Example:

    bin/spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5
    
    import uk.co.gresearch.spark.parquet._
    spark.read.parquetMetadata("...path..").show()
    spark.read.parquetBlockColumns(...path..").show()
    

Updating Parquet File Versions

Upgrading your Parquet files to a newer version can be achieved by copying them using a more recent version of Spark. This section covers the steps to convert your Parquet files to an updated version.

Conversion Method:

Using Recent Spark Versions: To update Parquet files, read them with a newer version of Spark and then save them again. This process effectively updates the files to the Parquet version used by that Spark release.
For instance, using Spark 3.5.0 will allow you to write files in Parquet version 1.13.1.

Approach Note: This method is somewhat brute-force as there isn't a direct mechanism solely for upgrading Parquet metadata.

Practical Example: Copying and converting Parquet version by reading and re-writing, applied to the TPCDS benchmark:

bin/spark-shell --master yarn --driver-memory 4g --executor-memory 50g --executor-cores 10 --num-executors 20 --conf spark.sql.shuffle.partitions=400

val inpath="/project/spark/TPCDS/tpcds_1500_parquet_1.10.1/"
val outpath="/project/spark/TPCDS/tpcds_1500_parquet_1.13.1/"
val compression_type="snappy" // may experiment with "zstd"

// we need to do this in two separate groups: partitioned and non-partitioned tables

// copy the **partitioned tables** of the TPCDS benchmark
// compact each directory into 1 file with repartition
val tables_partition=List(("catalog_returns","cr_returned_date_sk"), ("catalog_sales","cs_sold_date_sk"), ("inventory","inv_date_sk"), ("store_returns","sr_returned_date_sk"), ("store_sales","ss_sold_date_sk"), ("web_returns","wr_returned_date_sk"), ("web_sales","ws_sold_date_sk"))
for (t <- tables_partition) {
  println(s"Copying partitioned table $t")
  spark.read.parquet(inpath + t._1).repartition(col(t._2)).write.partitionBy(t._2).mode("overwrite").option("compression", compression_type).parquet(outpath + t._1)
}

// copy non-partitioned tables of the TPCDS benchmark
// compact each directory into 1 file with repartition
val tables_nopartition=List("call_center","catalog_page","customer","customer_address","customer_demographics","date_dim","household_demographics","income_band","item","promotion","reason","ship_mode","store","time_dim","warehouse","web_page","web_site")
for (t <- tables_nopartition) {
  println(s"Copying table $t")
  spark.read.parquet(inpath + t).coalesce(1).write.mode("overwrite").option("compression", compression_type).parquet(outpath + t)
}

Conclusions

Apache Spark and Apache Parquet continue to innovate and are constantly upping their game in big data. They've rolled out cool features like column indexes and bloom filters, really pushing the envelope on speed and efficiency. It's a smart move to keep your Spark updated, especially to Spark 3.x or newer, to get the most out of these perks. Also, don’t forget to give your Parquet files a quick refresh to the latest format – the blog post has got you covered with a how-to. Staying on top of these updates is key to keeping your data game strong!

I extend my deepest gratitude to my colleagues at CERN for their invaluable guidance and support. A special acknowledgment goes out to the teams behind the CERN data analytics, monitoring, and web notebook services, as well as the dedicated members of the ATLAS database team.

Further details on the topics covered here can be found at: