Showing posts with label flame graph. Show all posts

Wednesday, September 27, 2023

Enhancing Apache Spark Performance with Flame Graphs: A Practical Example Using Grafana Pyroscope

TL;DR Explore a step-by-step example of troubleshooting Apache Spark job performance using flame graph visualization and profiling. Discover the seamless integration of Grafana Pyroscope with Spark for streamlined data collection and visualization.


The Puzzle of the Slow Query

Set within the framework of data analysis for the ATLAS experiment's Data Control System, our exploration uses data stored in the Parquet format and deploys Apache Spark for queries. The setup: Jupyter notebooks operating on the SWAN service at CERN interfacing with the Hadoop and Spark service.

The Hiccup: A notably slow query during data analysis where two tables are joined. Running on 32 cores, this query takes 27 minutes—surprisingly long given the amount of data in play.

The tables involved:

  • EVENTHISTORY: A log of events for specific sub-detectors, each row contains a timestamp, the subsystem id and a value
  • LUMINOSITY, a table containing the details of time intervals called "luminosity blocks", see Luminosity block - Particle Wiki

Data size:
EVENTHISTORY is a large table, it can collect millions of data points per day, while LUMINOSITY is a much smaller table (only thousands of points per day). In the test case reported here we used data collected over 1 day, with EVENTHISTORY -> 75M records, and LUMINOSITY -> 2K records.


The join condition between EVENTHISTORY and LUMINOSITY is an expression that matches events in EVENTHISTORY with intervals in LUMINOSITY (note that this is not a join based on an equality predicate). This is what the query looks like in SQL:


spark.sql("""
select l.LUMI_NUMBER, e.ELEMENT_ID, e.VALUE_NUMBER
from eventhistory e, luminosity l
where e.ts between l.starttime and l.endtime
""")


An alternative version of the same query written using the DataFrame API:

eventhistory_df.join(
    luminosity_df, 
    (eventhistory_df.ts >= luminosity_df.starttime) & 
    (eventhistory_df.ts <= luminosity_df.endtime)
    ).select(luminosity_df.LUMI_NUMBER,
             eventhistory_df.ELEMENT_ID,
             eventhistory_df.VALUE_NUMBER)


Cracking the Performance Case

WebUI: The first point of entry for troubleshooting was the Spark WebUI. There we could find the execution time of the query (27 minutes), along with details on the execution plan and SQL metrics under the "SQL/DataFrame" tab. Figure 1 shows a relevant snippet where we could clearly see that a broadcast nested loop join was used for this query.


Execution Plan: The execution plan is the one we wanted for this query: the small LUMINOSITY table is broadcast to all the executors and then joined with each partition of the larger EVENTHISTORY table.


Figure 1: A relevant snippet of the execution graph from the Spark WebUI. The slow query discussed in this post runs using a broadcast nested loop join. This means that the small table is broadcast to all the nodes and then joined to each partition of the larger table.
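To make the cost of this plan concrete, here is a minimal sketch in plain Python (not Spark code) of what a nested loop join over an interval condition does: every row of the large side is compared against every row of the broadcast side. The function name and the sample rows are hypothetical; only the approximate row counts (75M and 2K) come from the test case above.

```python
from datetime import datetime, timedelta


def nested_loop_interval_join(events, intervals):
    """Return matching (lumi_number, element_id, value) rows and the
    number of times the join predicate was evaluated."""
    matches, evaluations = [], 0
    for ts, element_id, value in events:            # large, streamed side
        for lumi_number, start, end in intervals:   # small, broadcast side
            evaluations += 1
            if start <= ts <= end:
                matches.append((lumi_number, element_id, value))
    return matches, evaluations


# Hypothetical sample data: three one-minute intervals and three events.
t0 = datetime(2023, 1, 1)
intervals = [
    (n, t0 + timedelta(minutes=n), t0 + timedelta(minutes=n + 1, seconds=-1))
    for n in range(3)
]
events = [
    (t0 + timedelta(seconds=30), 101, 1.5),
    (t0 + timedelta(seconds=90), 102, 2.5),
    (t0 + timedelta(minutes=10), 103, 3.5),   # falls outside every interval
]

matches, evaluations = nested_loop_interval_join(events, intervals)
print(matches)
print(evaluations)   # 3 events x 3 intervals

# The same product for the approximate sizes quoted in this post.
approx_pairs = 75_000_000 * 2_000
print(f"{approx_pairs:.1e} predicate evaluations")
```

With roughly 1.5e11 predicate evaluations, even a small amount of extra work per evaluation adds up to a lot of CPU time.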


CPU utilization measured with Spark Dashboard

Spark Dashboard instrumentation provides a way to collect and visualize Spark execution metrics. This makes it easy to plot the CPU used during the SQL execution. From there we could see that the workload was CPU-bound.


The Clue: Profiling with Flame Graphs and Pyroscope

Stack profiling and Flame Graph visualization are powerful techniques for investigating CPU-bound workloads. We use them here to find where the CPU cycles are consumed, and thus what makes the query slow.

First, a short recap of what stack profiling with Flame Graph visualization is, and which tools we can use to apply it to Apache Spark workloads:

Stack profiling and Flame Graph visualization in brief:
  • Flame Graphs provide information on the "hot methods" consuming CPU
  • Flame Graphs and profiling can also be used to profile time spent waiting (off-cpu) and memory allocation
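As an illustration of how stack samples turn into a flame graph, the sketch below folds a handful of samples into the "collapsed" form (frames joined by semicolons, followed by a count) used by the FlameGraph scripts, and counts which leaf method was on CPU most often. The frame names are hypothetical and simplified, not real Spark method names.

```python
from collections import Counter

# Hypothetical stack samples, each listed from the root frame to the leaf.
samples = [
    ["Executor.run", "BroadcastNestedLoopJoin", "castToTimestamp"],
    ["Executor.run", "BroadcastNestedLoopJoin", "castToTimestamp"],
    ["Executor.run", "BroadcastNestedLoopJoin", "compare"],
    ["Executor.run", "ParquetScan"],
]


def fold_stacks(stack_samples):
    """Count identical stacks, keyed by the collapsed root;...;leaf string."""
    return Counter(";".join(frames) for frames in stack_samples)


def self_time(stack_samples):
    """Count samples per leaf frame, i.e. the method that was on CPU."""
    return Counter(frames[-1] for frames in stack_samples)


folded = fold_stacks(samples)
hot = self_time(samples)

# One line per distinct stack: this is the input a flame graph is drawn from.
for stack, count in sorted(folded.items()):
    print(stack, count)

# The widest box at the top of the flame graph is the hottest leaf method.
print(hot.most_common(1))
```

The width of each box in a flame graph is proportional to these counts, which is why a single wide box at the top points directly at the "hot method".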

Grafana Pyroscope simplifies data collection and visualization, using agents and a custom WebUI. Key motivations for using it with Spark are:
  • Streamlined Data Collection & Visualization: The Pyroscope project page offers a simplified approach to data gathering and visualization with its custom WebUI and agent integration.
  • Java Integration: The Pyroscope java agent is tailored to work seamlessly with Spark. This integration shines especially when Spark is running on various clusters such as YARN, K8S, or standalone Spark clusters.
  • Correlation with Grafana: Grafana’s integration with Pyroscope lets you juxtapose metrics with other instruments, including the Spark metrics dashboard.
  • Proven Underlying Technology: For Java and Python, the tech essentials for collecting stack profiling data, async-profiler and py-spy, are time-tested and reliable.
  • Functional & Detailed WebUI: Pyroscope’s WebUI stands out with features that allow users to:
    • Select specific data periods
    • Store and display data across various measurements
    • Compare and contrast measurements
    • View collected data for all Spark executors, with an option to focus on individual executors or machines
  • Lightweight Data Acquisition: The Pyroscope java agent is efficient in data gathering. By default, stacks are sampled every 10 milliseconds and uploaded every 10 seconds. We did not observe any measurable performance or stability impact of the instrumentation.
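For a rough sense of the data volume implied by those defaults, the arithmetic below is a back-of-the-envelope sketch. It assumes at most one stack sample per busy thread per sampling interval, which is a simplification of how the agent actually samples.

```python
sample_interval_ms = 10   # default sampling interval quoted above
upload_interval_s = 10    # default upload interval quoted above

samples_per_second = 1000 // sample_interval_ms
samples_per_upload = samples_per_second * upload_interval_s

# Upper bound for the slow query in this post: 27 minutes, 32 busy task threads
# (assumption: one sample per busy thread per interval).
busy_threads = 32
query_seconds = 27 * 60
max_samples = samples_per_second * busy_threads * query_seconds

print(samples_per_second)   # samples per thread per second
print(samples_per_upload)   # samples per thread per upload
print(max_samples)          # upper bound for the whole query
```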

Spark Configuration


To use Pyroscope with Spark we used some additional configurations. Note this uses a specialized Spark Plugin from this repo. It is also possible to use java agents. The details are at:  

This is how we profiled and visualized the Flame Graph of the query execution:

1. Start Pyroscope
  • Download from https://github.com/grafana/pyroscope/releases
  • CLI start: ./pyroscope -server.http-listen-port 5040
  • Or use docker: docker run -it -p 5040:4040 grafana/pyroscope
  • Note: customize the port number, I used port 5040 to avoid confusion with the Spark WebUI which defaults to port 4040 too
2. Start Spark with custom configuration, as in this example with PySpark:

# Get the Spark session
from pyspark.sql import SparkSession
spark = (SparkSession.builder
      .appName("DCS analysis").master("yarn")
      # Pull in the Spark plugin and the Pyroscope agent (comma-separated, no spaces)
      .config("spark.jars.packages",
      "ch.cern.sparkmeasure:sparkplugins_2.12:0.3,io.pyroscope:agent:0.12.0")
      # Load the plugin and point it at the Pyroscope server started in step 1
      .config("spark.plugins", "ch.cern.PyroscopePlugin")
      .config("spark.pyroscope.server", "http://pyroscope_hostname:5040")
      .getOrCreate()
    )



Figure 2: This is a snapshot from the Grafana Pyroscope dashboard with data collected during the execution of the slow query (join between EVENTHISTORY and LUMINOSITY). The query runs in 27 minutes, using 32 cores. The dashboard shows the top executed methods alongside the Flame Graph. Notably, a large fraction of the execution time appears to be spent in SparkDateTimeUtils performing date-datatype conversion operations. This is a crucial finding for the rest of the troubleshooting and the proposed fix.


The Insight  


Using profiling data from Pyroscope, we pinpointed the root cause of the query's sluggishness. Spark was expending excessive CPU cycles on data type conversion operations during the evaluation of the join predicate. Upon revisiting the WebUI and delving deeper into the execution plan under the SQL/DataFrame tab, we discovered, almost concealed in plain view, the specific step responsible for the heightened CPU consumption:

(9) BroadcastNestedLoopJoin [codegen id : 2]
Join condition: ((ts#1 >= cast(starttime_dec#57 as timestamp)) AND (ts#1 <= cast(endtime_dec#58 as timestamp)))

The extra operations of "cast to timestamp" appear to be key in explaining the issue.
Why do we have date format conversions? 
By inspecting the schema of the involved tables, it turns out that in the LUMINOSITY table the fields used for joining with the timestamp are of type Decimal.

To recap, profiling data, together with the execution plan, showed that the query was slow because it forced data type conversion over and over for each row where the join condition was evaluated.

The fix:  
The solution we applied was simple: we used the same data type for all the columns involved in the join. In particular, we converted the starttime and endtime columns of the LUMINOSITY table to timestamp.
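One possible way to apply such a fix in PySpark is sketched below; the post does not show the exact code that was used, so treat this as an assumption-laden illustration. It assumes the Decimal columns are named starttime_dec and endtime_dec (names taken from the execution plan above), that a temporary view called eventhistory already exists, and it introduces a hypothetical view name, luminosity_ts. The casts are applied once to the small table before the join, rather than inside the join condition.

```python
def join_with_timestamp_bounds(spark, luminosity_df):
    """Cast the Decimal interval bounds once, then run the range join.

    Assumptions: a temporary view named `eventhistory` is already registered,
    and the Decimal columns are called starttime_dec and endtime_dec
    (adjust these names to the real schema).
    """
    # Project the small table with timestamp-typed bounds.
    luminosity_ts = luminosity_df.selectExpr(
        "LUMI_NUMBER",
        "cast(starttime_dec as timestamp) as starttime",
        "cast(endtime_dec as timestamp) as endtime",
    )
    # Hypothetical view name, used only for this example.
    luminosity_ts.createOrReplaceTempView("luminosity_ts")
    # Same query as before, now comparing timestamp with timestamp.
    return spark.sql("""
        select l.LUMI_NUMBER, e.ELEMENT_ID, e.VALUE_NUMBER
        from eventhistory e, luminosity_ts l
        where e.ts between l.starttime and l.endtime
    """)
```

After a change of this kind, calling explain() on the returned DataFrame is a quick way to check whether the join condition still contains a cast.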

Results: performance improved 70x
After the change, the query runs in 23 seconds, compared to the previous runtime of 27 minutes. Figure 3 shows the Flame Graph after the fix was applied.



Figure 3: This is a snapshot of the Grafana Pyroscope dashboard with data collected during the execution of the query after tuning. The query takes only 23 seconds compared to 27 minutes before tuning (see Figure 2)
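The speedup, and the implied rate of join predicate evaluations, can be checked with a few lines of arithmetic. This is a rough estimate only: it uses the approximate row counts quoted earlier (75M and 2K) and assumes that all 32 cores spend the whole elapsed time on the join.

```python
before_s = 27 * 60   # runtime before the fix, in seconds
after_s = 23         # runtime after the fix, in seconds
speedup = before_s / after_s

# Approximate number of (event, interval) pairs compared by the nested loop join.
pairs = 75_000_000 * 2_000
cores = 32

# Assumption: every core is busy evaluating the join for the whole run.
rate_before = pairs / (before_s * cores)
rate_after = pairs / (after_s * cores)

print(f"speedup: {speedup:.1f}x")
print(f"evaluations per core-second before: {rate_before:.2e}")
print(f"evaluations per core-second after:  {rate_after:.2e}")
```

Under these assumptions the per-core rate goes from a few million to a few hundred million evaluations per second, which is consistent with the cast dominating the cost of each evaluation.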

Related work and links

Details of how to use Pyroscope with Spark can be found in the note:  
Related work of interest for Apache Spark performance troubleshooting:
  • Spark Dashboard - tooling and configuration for deploying an Apache Spark Performance Dashboard using containers technology.
  • Spark Measure - a tool for performance troubleshooting of Apache Spark workloads. It simplifies the collection and analysis of Spark task and stage metrics data.
  • Spark Plugins - Code and examples of how to write and deploy Apache Spark Plugins.
  • Spark Notes and Performance Testing notes

Wrapping up

Stack profiling and Flame Graph visualization are more than jargon: they are practical, high-impact tools. In this post they helped improve the performance of an Apache Spark query by 70x. Using Grafana Pyroscope with Spark, we demonstrated an end-to-end approach to gathering, analyzing, and acting on stack profile data.

A hearty thank you to my colleagues at CERN for their guidance. A special nod to the CERN data analytics, monitoring, and web notebook services, and to the ATLAS database team.

Thursday, September 15, 2016

Apache Spark 2.0 Performance Improvements Investigated With Flame Graphs

Topic: This post is about performance optimizations introduced in Apache Spark 2.0, in particular whole-stage code generation. A test case is introduced and investigated with diagnostic tools.


Introduction: performance troubleshooting of a slow query using parallel query execution in a Hadoop cluster

The idea for this post comes from a performance troubleshooting case that came up recently at CERN database services. It started with a user reporting slow response time from a query for a custom report in a relational database. After investigation and initial troubleshooting, the query was still slow (running in about 12 hours). We understood that the query was mostly running "on CPU", spending most of its time evaluating a non-equijoin condition repeated hundreds of millions of times. Most importantly, we also found that the query was easily parallelizable. This was good news, as it meant that we could simply "throw hardware at it" to make it run faster. One way the team (see the acknowledgments section at the end of the post) parallelized the workload, without affecting the production database, was to export the data to a Hadoop cluster and run the query there using Spark SQL (the cluster used has 14 nodes, installed with CDH 5.7 and Spark version 1.6). This way it was possible to bring the execution time down to less than 20 minutes, all with relatively low effort, as the query could be run basically unchanged.
This post is about Spark; however, it is interesting to note that the query was also tested using Apache Impala (version 2.5) on the same cluster and produced comparable speedup results. Later I also ran the query on a "beefy Oracle server" with 60 cores, adding the relevant configuration to activate Oracle parallel query (test done on Oracle RDBMS version 12.1.0.2), and reproduced execution times and speedups comparable to those of Spark and Impala.


Spark 2.0 enters the scene

As I write this post, Spark 1.6 is installed in our production clusters and Spark 2.0 is still relatively new (it was released at the end of July 2016). Notably, Spark 2.0 has very interesting improvements over the previous versions, among them performance improvements that I was eager to test (see this blog post by Databricks).
My first test was to try the query discussed in the previous paragraph on a test server with Spark 2.0 and I found that it was running considerably faster than in the tests with Spark 1.6. The best result I achieved, this time on a large box with 60 CPU cores and using Spark 2.0, was an elapsed time of about 2 minutes (to be compared with 20 minutes in Spark 1.6). I was impressed by Spark 2.0's speedup compared to Spark 1.6 and decided to investigate further.


The test case

Rather than using the original query and data, I will report here on a synthetic test case that hopefully illustrates the main points of the original case and at the same time is simple and easy to reproduce on your test systems, if you wish to do so. This test uses pyspark, the Python interface to Spark, in the simplest configuration for a test machine: local mode (that is, without Hadoop or YARN). If needed you can add "--master local" to the command line. If you are not familiar with how to run Spark, see the hints on how to build a test system further on in this post.

The preparation of the test data proceeds as follows: (1) it creates a DataFrame and registers it as table "t0" with 10 million rows. (2) Table t0 is used to create the actual test data, which is composed of an "id" column and three additional columns of randomly generated data, all integers. The resulting DataFrame is cached in memory and "registered" as a temporary table called "t1". Spark SQL interface for DataFrames makes this preparation task straightforward:

$ pyspark --driver-memory 2g

test_numrows = 1e7

sqlContext.range(0,test_numrows,1).registerTempTable("t0")

sqlContext.sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().registerTempTable("t1")

The following commands are additional checks to make sure the table t1 has been created correctly and is first read into memory. In particular, note that "t1" has the required test_numrows (10M) rows and the description of its column from the output of the command "desc":

sqlContext.sql("select count(*) from t1").show()

+--------+
|count(1)|
+--------+
|10000000|
+--------+

sqlContext.sql("desc t1").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|   bigint|       |
|  bucket|   bigint|       |
|    val1|   bigint|       |
|    val2|   bigint|       |
+--------+---------+-------+

The actual test query is shown below. It consists of a join with two conditions: an equality predicate on the column bucket, which provides an obvious way to split the query for parallel execution, and a more resource-intensive non-equality condition. Notably, the query also has an aggregation operation. Some additional boilerplate code is added for timing the duration of the query:

import time
starttime=time.time()

sqlContext.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()

print("Delta time = %f" % (time.time()-starttime))


The results are that Spark 2.0 is about 7 times faster than Spark 1.6 when running the test query on the test server (see details below): Spark 2.0 completed the job in about 15 minutes of elapsed time and 390 minutes of CPU time, while Spark 1.6 took about 100 minutes of elapsed time and 2840 minutes of CPU time. There are fluctuations on the actual job execution time between runs, however you can ignore the fine details and focus on the main finding that the performance difference is striking between runs using Spark 1.6 and Spark 2.0 (Spark 2.0 being much faster). This is worth further investigations into the internals and root causes.
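A back-of-the-envelope estimate helps put these timings in context. The sketch below assumes the 10 million rows are spread evenly over the 200 buckets, so that the non-equality condition is evaluated once for every pair of rows that share a bucket. The timings are the approximate figures quoted above.

```python
rows = 10_000_000
buckets = 200

# Assumption: rand() spreads the rows evenly over the buckets.
rows_per_bucket = rows // buckets
pairs = buckets * rows_per_bucket ** 2   # pairs tested by a.val1+b.val1<1000

# (elapsed minutes, CPU minutes) as reported in this post.
runs = {"Spark 1.6": (100, 2840), "Spark 2.0": (15, 390)}

summary = {}
for name, (elapsed_min, cpu_min) in runs.items():
    summary[name] = {
        # Average number of threads busy on CPU during the run.
        "busy_threads": cpu_min / elapsed_min,
        # Pairs processed per second of CPU time.
        "pairs_per_cpu_second": pairs / (cpu_min * 60),
    }

print(f"pairs to evaluate: {pairs:.1e}")
for name, stats in summary.items():
    print(name, round(stats["busy_threads"], 1),
          f"{stats['pairs_per_cpu_second']:.2e}")
```

Under this assumption the query has to test about 5e11 row pairs, and both runs keep most of the 32 hardware threads busy; the difference is in how many pairs each CPU-second gets through.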

I ran the tests using Spark in its simplest configuration (local mode) on a standalone (non-clustered) server with 16 cores (2 x E5-2650) and 128 GB of RAM (the virtual memory allocated by the test workload is about 16 GB) running Linux (kernel 2.6.32, RHEL 6.7). If you want to run it on a smaller machine, you can scale down the preparation phase by setting test_numrows to a smaller value (for example 1e6). In that case the default value of 1g for driver-memory is probably sufficient.
The tests have been performed on a single server alternating runs with Spark 1.6 and 2.0. In both cases monitoring with OS tools showed that the jobs were CPU-bound, that is with 32 threads (16 cores x 2 for multithreading) running on CPU and utilizing the available resources on the test box. During the tests, no additional significant workload was running on the box.


Drilling down into the execution plans

The physical execution plan generated and executed by Spark (in particular by Catalyst, the optimizer, and Tungsten, the execution engine) has important differences in Spark 2.0 compared to Spark 1.6. In both cases, however, the plan uses a sort merge join. Note that in the Spark 2.0 execution plan reported here several steps are marked with a star (*). This marks steps optimized with whole-stage code generation.


Physical execution plan in Spark 1.6:

Note that a sort merge join operation is central to the execution plan of this query. Another important step after the join is the aggregation operation, used to compute "sum(a.val2)" as seen in the query text:



Physical execution plan in Spark 2.0:

Note in particular the steps marked with (*): they are optimized with whole-stage code generation:




Details of the SQL execution from the Spark Web UI, Spark 1.6 vs. Spark 2.0. This reproduces the physical execution plan with additional metrics gathered at run-time. Note in particular the steps marked as "Whole Stage Codegen" in Spark 2.0.


Code generation is the key

The key to understanding the improved performance lies in the new Spark 2.0 features for whole-stage code generation. This is expected and detailed, for example, in the blog post by Databricks Engineering "Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop Deep dive into the new Tungsten execution engine". The main point is that Spark 2.0 compiles query execution into bytecode that is then executed, as opposed to looping with an iterator over result sets. A detailed discussion of the benefits of query compilation and code generation vs. the "traditional approach" to query execution, also called the volcano model, can be found in the lecture by Andy Pavlo on Query Compilation.
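The difference between the two execution models can be illustrated with a toy example in plain Python. This is not how Spark implements it (Spark generates Java source code and compiles it to bytecode); the operator and function names below are hypothetical. The first version chains one iterator per operator, as in the volcano model, while the second generates the source of a single fused loop specialized for the query and compiles it at run time.

```python
# Volcano-style execution: each operator is an iterator pulling from its child.
def scan(rows):
    for row in rows:
        yield row


def filter_op(child, predicate):
    for row in child:
        if predicate(row):
            yield row


def project_op(child, fn):
    for row in child:
        yield fn(row)


def volcano_sum(rows):
    plan = project_op(
        filter_op(scan(rows), lambda r: r[0] + r[1] < 1000),
        lambda r: r[2],
    )
    return sum(plan)


# Code generation: build the source of one loop for this query, then compile it.
def generate_fused_sum(predicate_src, projection_src):
    source = (
        "def fused_sum(rows):\n"
        "    total = 0\n"
        "    for a, b, c in rows:\n"
        f"        if {predicate_src}:\n"
        f"            total += {projection_src}\n"
        "    return total\n"
    )
    namespace = {}
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["fused_sum"]


# Small synthetic data set: (val1 of a, val1 of b, val2 of a).
rows = [(i % 1000, (i * 7) % 1000, i % 10) for i in range(10_000)]
fused_sum = generate_fused_sum("a + b < 1000", "c")

# Both execution strategies compute the same answer.
print(volcano_sum(rows) == fused_sum(rows))
```

The fused version avoids the per-row calls between operators, which is the kind of overhead that whole-stage code generation is designed to remove.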


Run time investigations with flame graphs

Flame graph visualization of stack profiles provides additional insights into which parts of the code are executed and where CPU cycles are consumed. The upper layers of the flame graph highlight where CPU cycles are spent. The lower layers add context by detailing the parent functions/methods that called the "upper layers". The idea for this paragraph is to use stack profiles and flame graphs to further drill down on the differences in the execution model between Spark 2.0 and Spark 1.6.

To collect and generate the flame graphs I used the methods described by Kay Ousterhout in "Generating Flame Graphs for Apache Spark using Java Flight Recorder". I used Java Flight Recorder on Oracle's Java 8, starting pyspark with the following options:

pyspark --conf "spark.driver.extraJavaOptions"="-XX:+UnlockCommercialFeatures -XX:+FlightRecorder" --conf "spark.executor.extraJavaOptions"="-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"

Note added October 2017: see this link for more info on tools and techniques available for stack profiling and flame graph visualization for Spark.

Here below you can find two flame graphs that visualize the stack profiles collected for Spark 1.6 and Spark 2.0 while running the test workload/query. The graphs represent samples collected over 100 seconds. The major differences you should notice between the two flame graphs are that on Spark 1.6 the execution iterates over rows of data, looping on Row Iterator to Scala for example. In the Spark 2.0 example, however, you can see in the flame graph that the methods executing the bulk of the work are built/optimized with whole-stage code generation. For example the method where most time is spent during execution is code-generated and performs operations on Hash Maps in vector form.

What you can learn from flame graphs:

  • The flame graph for Spark 1.6 shows that a considerable amount of CPU cycles are spent on the Scala collection iterator. This can be linked with Spark 1.6 using the "traditional volcano model" for SQL execution. This is the part that is optimized in Spark 2.0 (see next bullet points).
  • Spark 2.0 is making use of whole-stage code generation and does not use Scala collection iterator.
  • Spark 2.0 is also using Vectorized Hash Maps to perform aggregations that are also code generated. The use of vectorized operations is likely introducing further performance improvements.


Spark 1.6:


Flame graph for a sample of the execution of the test query using Spark 1.6 in local mode (on a machine with 16 cores). Note that most of the time is spent processing data in an iterative way (which is not optimal). Click on this link for an SVG version of the graph where you can drill down on the details of each step.


Spark 2.0:


Flame graph for a sample of the execution of the test query using Spark 2.0 in local mode (on a machine with 16 cores). Note that most of the time is spent executing code that is generated dynamically via whole-stage code generation. Click on this link for a SVG version of the graph where you can drill down on the details of each step.


Note: the process of collecting stack profiles for Spark in this test is made easier by the fact that I used Spark in local mode, which results in only one (multi-threaded) process to trace on a single box. In the general case, tracing Spark is more complicated because of the distributed nature of the workload, for example when running on a cluster.


Linux Perf stat counters

In this paragraph you can find the output of Linux Perf stat counters measured during the execution of the test query. The idea is to find differences in the run-time usage of resources that can further highlight the origin of the performance improvement that was measured in Spark 2.0 compared to Spark 1.6. The selection of stat counters to measure is taken from Tanel Poder's blog post "RAM is the new disk – and how to measure its performance – Part 2 – Tools". Notably, you can also find there a short explanation of the meaning of the counters.

# perf stat -e task-clock,cycles,instructions,branches,branch-misses \
          -e stalled-cycles-frontend,stalled-cycles-backend \
          -e cache-references,cache-misses \
          -e LLC-loads,LLC-load-misses,LLC-stores,LLC-store-misses \
          -e L1-dcache-loads,L1-dcache-load-misses,L1-dcache-stores,L1-dcache-store-misses \
          -p <pid_spark_process> sleep 100


What you can learn from comparing perf stat counters between Spark 1.6 and Spark 2.0 runs:
  • In both cases the workload is CPU-bound. The machine has 16 cores and is configured with multi-threading support (i.e. 32 execution threads). Perf stat counters report an average CPU utilization of about 31 CPU threads in both cases, which confirms the fact that the workload is CPU bound.
  • Reading from main memory seems to be key, and Spark 2.0 appears to access memory with much higher throughput than Spark 1.6. In particular, I believe it is important to look at the counters LLC-loads and LLC-load-misses, which count respectively how many times a cache line was requested from the last level cache (LLC) and the fraction of those requests that resulted in access from main memory. Notably, Spark 2.0 in the given sample reports 33 M/sec LLC-loads with ~63% of loads resulting in misses (reads from main memory), while Spark 1.6 has 0.7 M/sec LLC-loads and also ~60% misses. I have noticed that these values fluctuate over different samples, but Spark 2.0 always presents a much higher access rate to LLC and memory than Spark 1.6.
  • It is interesting to note that the measurements in the case of Spark 1.6 run present a higher ratio of instructions per cycle than the run with Spark 2.0. Spark 2.0 workload is stalling for memory access more frequently. A higher ratio of instructions per cycle is often an indicator of better performance, however, in this case the opposite appears to be true. I believe a possible interpretation of what is happening is that Spark 2.0 is more efficient at using CPU resources and high throughput to memory, therefore it quickly gets into what appears to be the bottleneck for this workload: stalling for memory access.

This is the output of perf stat while running the test workload with Spark 1.6:

    3091790.707578      task-clock (msec)         #   30.915 CPUs utilized
 7,325,423,029,717      cycles                    #    2.369 GHz                     [25.01%]
 9,577,944,921,957      instructions              #    1.31  insns per cycle
                                                  #    0.45  stalled cycles per insn [31.25%]
 1,323,763,955,367      branches                  #  428.154 M/sec                   [31.06%]
     3,540,000,205      branch-misses             #    0.27% of all branches         [25.02%]
 4,332,575,248,710      stalled-cycles-frontend   #   59.14% frontend cycles idle    [25.02%]
 1,809,219,108,190      stalled-cycles-backend    #   24.70% backend  cycles idle    [25.01%]
     4,025,591,940      cache-references          #    1.302 M/sec                   [27.12%]
     2,688,865,386      cache-misses              #   66.794 % of all cache refs     [29.14%]
     2,305,317,283      LLC-loads                 #    0.746 M/sec                   [25.40%]
     1,382,318,864      LLC-load-misses           #   59.96% of all LL-cache hits    [25.40%]
     1,265,162,587      LLC-stores                #    0.409 M/sec                   [24.60%]
     1,256,986,002      LLC-store-misses          #    0.407 M/sec                   [26.51%]
 3,084,754,630,344      L1-dcache-loads           #  997.724 M/sec                   [29.69%]
    18,141,140,551      L1-dcache-load-misses     #    0.59% of all L1-dcache hits   [28.93%]
   891,386,721,821      L1-dcache-stores          #  288.308 M/sec                   [25.97%]
     1,281,601,143      L1-dcache-store-misses    #    0.415 M/sec                   [25.20%]


This is the output of perf stat while running the test workload with Spark 2.0:

    3095743.023060      task-clock (msec)         #   30.955 CPUs utilized
 7,267,137,941,598      cycles                    #    2.347 GHz                     [25.01%]
 5,810,442,547,610      instructions              #    0.80  insns per cycle
                                                  #    1.00  stalled cycles per insn [31.25%]
 1,142,058,628,367      branches                  #  368.913 M/sec                   [31.07%]
    17,683,392,720      branch-misses             #    1.55% of all branches         [25.43%]
 5,791,745,210,259      stalled-cycles-frontend   #   79.70% frontend cycles idle    [25.23%]
 3,993,653,110,520      stalled-cycles-backend    #   54.95% backend  cycles idle    [25.03%]
    51,936,035,185      cache-references          #   16.777 M/sec                   [30.84%]
    14,525,498,107      cache-misses              #   27.968 % of all cache refs     [25.21%]
   104,521,878,980      LLC-loads                 #   33.763 M/sec                   [25.01%]
    66,262,327,528      LLC-load-misses           #   63.40% of all LL-cache hits    [28.30%]
       301,797,039      LLC-stores                #    0.097 M/sec                   [26.72%]
       215,020,365      LLC-store-misses          #    0.069 M/sec                   [24.58%]
 2,520,703,012,324      L1-dcache-loads           #  814.248 M/sec                   [24.80%]
    96,261,558,827      L1-dcache-load-misses     #    3.82% of all L1-dcache hits   [24.99%]
   178,644,475,917      L1-dcache-stores          #   57.706 M/sec                   [29.09%]
     1,045,403,531      L1-dcache-store-misses    #    0.338 M/sec                   [27.73%]
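The derived figures discussed in this section (instructions per cycle, LLC load rate and LLC miss ratio) can be recomputed from the raw counters in the two perf stat outputs. The short script below is a convenience sketch: the numbers are copied from the outputs, while the dictionary layout and function name are my own.

```python
# Raw counters copied from the perf stat outputs reported in this post.
counters = {
    "Spark 1.6": {
        "task_clock_ms": 3091790.707578,
        "cycles": 7_325_423_029_717,
        "instructions": 9_577_944_921_957,
        "llc_loads": 2_305_317_283,
        "llc_load_misses": 1_382_318_864,
    },
    "Spark 2.0": {
        "task_clock_ms": 3095743.023060,
        "cycles": 7_267_137_941_598,
        "instructions": 5_810_442_547_610,
        "llc_loads": 104_521_878_980,
        "llc_load_misses": 66_262_327_528,
    },
}


def derive(c):
    """Compute the ratios that perf prints in the right-hand column."""
    cpu_seconds = c["task_clock_ms"] / 1000
    return {
        "ipc": c["instructions"] / c["cycles"],
        "llc_loads_m_per_sec": c["llc_loads"] / cpu_seconds / 1e6,
        "llc_miss_pct": 100 * c["llc_load_misses"] / c["llc_loads"],
    }


derived = {name: derive(c) for name, c in counters.items()}
for name, d in derived.items():
    print(name, f"IPC={d['ipc']:.2f}",
          f"LLC-loads={d['llc_loads_m_per_sec']:.3f} M/sec",
          f"LLC miss={d['llc_miss_pct']:.2f}%")

# How much more often Spark 2.0 goes to the last level cache per CPU-second.
llc_ratio = (derived["Spark 2.0"]["llc_loads_m_per_sec"]
             / derived["Spark 1.6"]["llc_loads_m_per_sec"])
print(f"LLC load rate ratio: {llc_ratio:.0f}x")
```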



Source code

If you want to further drill down on the changes in Spark 2.0 that benefit the performance of the test workload, you can head to GitHub and browse the source code of Spark. For example, from the flame graphs you can find the names of the relevant classes with their paths, and/or you can use the search function in GitHub. So far I have only skimmed through the source code with these methods and found a few links that I believe are interesting as examples of the drill-down analysis that one can do thanks to the fact that Spark is an open source project:

  • One link of interest is "org.apache.spark.sql.execution.WholeStageCodegenExec". This is code introduced in the Spark 2.0 branch. You can also find there comments that shed some light on the mechanism used for code generation.
  • Another interesting point is the use of "vectorized hash maps" in Spark 2.0, which appears important as it is on the top line of the Spark 2.0 flame graph: "org.apache.spark.sql.execution.aggregate.VectorizedHashMapGenerator.scala" has additional details about the implementation. You can find there that this is an implementation for fast index lookup, also introduced in the Spark 2.0 branch. It is also mentioned there that the execution can be code generated to boost its performance, which is what you can see happening in the flame graph of the Spark 2.0 workload.


Tips on how to build a test environment 

For readers who are not familiar with running Spark, here are some tips on how to build a test environment:
  • Download Spark from http://spark.apache.org/downloads.html
  • You will not need Hadoop and/or a YARN cluster to run the tests described in this post.
  • An easy way to install a Python environment is by downloading Anaconda from https://www.continuum.io/downloads
  • You can download Java 8 from Oracle technet: http://www.oracle.com/technetwork/java/javase/downloads/index.html
  • Code for generating flame graphs for Spark using Java Flight Recorder (see the recipe at this link) is available at: https://github.com/brendangregg/FlameGraph and https://github.com/chrishantha/jfr-flame-graph


Summary

Apache Spark 2.0 has important performance optimizations compared to Spark version 1.6. Notably, the Spark optimizer and execution engine in version 2.0 can take advantage of whole-stage code generation and of vector operations to make more efficient use of CPU cycles and memory bandwidth. This post briefly discusses an example of how Spark SQL and its parallel execution engine have been useful for tuning a query from a production RDBMS. Moreover, an example comparing Spark 1.6 and Spark 2.0 performance has been discussed and drilled down using execution plan details, flame graphs and Linux Perf stat counters.


Additional comments and my take-away from the tests in this post

The Hadoop ecosystem provides a powerful and easy-to-use environment for running reports and analytics queries. The point is nicely illustrated for me by the fact that we could simply take data and a query from production RDBMS and run it on the Hadoop cluster (with Spark and Impala) to make it run with parallelism and fast. This provides a simple and quick way to throw HW at a performance problem.

I am impressed by the work on Spark 2.0 optimizations for whole-stage code generation, in particular by how these new features address the important point of how to optimize CPU-bound workloads. This makes a great addition to Spark and strengthens its position as a leading player in data processing at scale.

Query compilation and/or code generation for executing SQL has become a common feature for many of the new databases appearing on the market optimized for "in memory" (i.e. processing an important fraction of their workload in main memory). This is implemented in various forms for different products, however it is proven to give significant gains in performance, typically of the order of one order of magnitude, for queries where it is applicable. The test case examined in this post provides an example of this type of optimization.

How are the mainstream RDBMS engines, which typically process result sets in an iterative way (similarly to what was found in this post with Spark 1.6, and often referred to as the volcano model), going to respond to this performance-based challenge?


Acknowledgements and references

This work has been made possible and funded by CERN IT, in particular in the context of the CERN IT Hadoop Service and Database Services. I would like to thank the CERN colleagues who have contributed to the performance troubleshooting case mentioned in this post: Raul Garcia Martinez, Zbigniew Baranowski and Luca Menichetti.

On the topic of Spark 2.0 improvements for code generation, see the blog post "Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop" and the references therein, notably including "Efficiently compiling efficient query plans for modern hardware" and JIRA ticket SPARK-12795.
On the topic of query compilation on modern database systems vs. the volcano model, see also the lecture by Andy Pavlo on Query Compilation.
Flame graphs are the brainchild of Brendan Gregg.
Additional links on using Linux Perf to measure performance counters: this article by Brendan Gregg and the 3-part blog posts by Tanel Poder on "RAM is the new disk".
On the topic of connecting Hadoop and relational databases see also Tanel's presentation "Connecting Hadoop and Oracle".


Wednesday, November 18, 2015

Oracle Wait Events Investigated with Extended Stack Profiling and Flame Graphs

Topic: this post is about investigating Oracle wait events using stack profiles and flame graphs extended with OS-process state and Oracle wait event details.

Context: The case of the DB Time > CPU Time + Wait Time

Oracle instrumentation provides wait event and CPU time accounting, a powerful and readily accessible data source for performance troubleshooting. An Oracle session at a given point in time is either on CPU, for example when processing data from cache, or it is in a waiting state, for example waiting for an I/O operation to complete.
A recent talk by John Beresniewicz (JB) at the Oak Table World 2015 goes into the details of an interesting case, where the CPU time and wait time measurements from Oracle's V$ tables (and AWR reports) appear to violate the equation "CPU time + Wait time = DB Time".
In particular JB discusses in detail a case of I/O testing, originally raised by Kevin Closson, where this discrepancy is apparent and originates from the use of modern "fast" storage.
To further set the context for the following discussion, you can watch JB's presentation and the slides. In short the root cause of the issue is that a significant amount of CPU cycles spent serving I/Os by the OS can be double counted both into Oracle I/O wait events and CPU counters. In this post you will learn of a technique and related tools for extended stack tracing that can be used to investigate this and similar cases and provide direct evidence of what is happening "inside Oracle wait events".

A test system reproducing high CPU usage inside I/O wait events

This is about reproducing a case where AWR reports %Wait + %CPU > 100% (of DB Time). Following the original case reported by JB, the test cases are generated using Kevin Closson's SLOB. One crucial point for the following is to have a system that serves random I/O with low latency. For this you don't necessarily need expensive HW: for testing, you can emulate a fast I/O subsystem in various ways, typically using RAM-based devices.
The test system used in the following is a Virtualbox VM with 4 CPU/cores and 4GB of RAM. The VM runs OL 6.7 and the test database is Oracle version 12.1.0.2. Storage has been configured as three block devices handled with ASM on the database in the VM, but it is actually served by 3 files on the host machine. The buffer cache of the test database is set to a very low size, 64MB.
The important point is that the host where Virtualbox runs has enough RAM to be able to cache the files used as storage for the test DB. A host with 128 GB of RAM has been used and the active set for the test DB is 40 GB.

Running the test workload, I/O bound with low latency I/O and high number of IOPS

This first test consists of running SLOB for a few minutes to drive the (I/O bound) workload. It is a simple read-only workload (random reads of single Oracle blocks, 8 KB) with no updates. Only one SLOB schema is needed, which also has the advantage of keeping the overall server CPU utilization low. Some relevant slob.conf parameters are: SCALE=40000M, UPDATE_PCT=0, RUN_TIME=1000. The command to run the test, after SLOB schema setup, is simply ./runit.sh 1.
Additional checks and boundary conditions are:
  • The system is not CPU-bound. This should be the case if the only workload on the database is the one-schema SLOB run. However it is important to double check with OS commands, for example "vmstat" or "top". In a CPU-bound server interpreting CPU and wait time becomes more complicated, due to the additional time spent waiting in the run queue.
  • I/Os are served at low latency and high IOPS. If you build your test machine using a VM created with Virtualbox, as described earlier in this post, this can be achieved by using a host with enough RAM to cache the files acting as back-end for the VM storage. The caching on the host filesystem would normally happen during SLOB schema creation, if the host server has enough RAM and no concurrent activity is competing for the filesystem cache. However if needed, caching can be triggered by running a few full scans of the USER1.CF1 table.

The test reproduces the case "%Wait + %CPU > 100%"

The AWR report of the SLOB workload shows that the main activity on the database is random reads, instrumented by Oracle with the wait events "db file parallel read" and "db file sequential read", and by CPU time. The AWR report also shows that the system did not have any other significant activity besides the SLOB workload. The notable point (see also Figure 1) is that the percentage of DB time spent on non-idle wait events plus the percentage spent on CPU adds up to a value significantly higher than 100%. This is an anomaly, as it seems to violate the concept that an Oracle process at a given point in time is either on CPU or waiting for a wait event.

Figure 1: Extract from the AWR report of an I/O bound workload (generated with SLOB) showing %Wait + %CPU > 100% (DB Time). This is an apparent anomaly of the instrumentation as DB time is expected to be constituted of DB CPU and non-idle wait time.

Another way to measure the workload is with Tanel Poder's snapper. Snapper has the advantage of drilling down to session level rather than aggregating at the system level. The output of a 10-second sample is reported in Figure 2. From the metrics reported there, you can see that the workload is mostly single-block reads at a rate of 10K IOPS and that the time accounted to I/O wait events and to CPU adds up to a value larger than DB Time. Note that DB Time for this workload is equal to the elapsed time, as there is only 1 session active and it is active all the time serving DB calls. An extract of relevant snapper measurements from Figure 2 is:

TIME, DB CPU: 71.5%
WAIT, db file sequential read: 9.1%
WAIT, db file parallel read: 78.7%
STAT, physical read IO requests: 10.31k


Figure 2: Snapper output of the SLOB workload in the case of low-latency I/O. The sum of DB CPU and non-idle Wait Time is larger than DB Time, which is an anomaly of the instrumentation. ASH data, however, reports what looks like a correct accounting of the percentage of DB Time spent into DB CPU and Wait events.
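
As a quick check of the arithmetic, the following Python sketch sums the percentages from the snapper extract above. The dictionary name and layout are illustrative choices, and the values are copied by hand from the extract rather than read from snapper.

```python
# Percentages of DB Time copied from the snapper extract above (Figure 2).
snapper_pct = {
    "DB CPU": 71.5,
    "db file sequential read": 9.1,
    "db file parallel read": 78.7,
}

# Sum of the non-idle wait events only.
wait_pct = sum(v for k, v in snapper_pct.items() if k != "DB CPU")
total_pct = snapper_pct["DB CPU"] + wait_pct
# With a single, always-active session, anything above 100% cannot be
# real elapsed time: it has to be time accounted more than once.
excess_pct = total_pct - 100.0

print(f"wait = {wait_pct:.1f}%, CPU + wait = {total_pct:.1f}%, excess = {excess_pct:.1f}%")
```

The sum comes to roughly 159% of DB Time, so about 59% of DB Time appears to be accounted twice, assuming the individual measurements are otherwise accurate.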


Something is wrong: the sum of CPU time and wait time should not be larger than DB time. A first check is to investigate whether the CPU time measurement reported by Oracle is accurate. It turns out that the problem is not there, as you can verify using OS commands, for example with "pidstat". Pidstat output (not reported here for brevity) is also interesting because it adds two pieces of information: it confirms that the process spends most of its time on CPU and that the CPU cycles are accounted as "sys" CPU time, i.e. time spent in kernel operations.
The wait events are timed by Oracle, at the beginning of the wait and measured again at the end. The Oracle functions kskthbwt (Kernel service Kompile thread begin wait) and kskthewt (Kernel service Kompile thread end wait) appear to take care of measuring the wait time and update the instrumentation with the relevant metrics. In particular the wait time measurement for "db file sequential read" and "db file parallel read" seems to correctly account for the time spent in OS calls from the tracing tests performed by the author and reported in the post "Life of an Oracle I/O: Tracing Logical and Physical I/O with SystemTap".

Moving into the active session sampling data of snapper reported at the bottom of Figure 2, you can find a more correct picture of where DB Time is spent: most of the DB time is spent in wait and a small amount on CPU. Most importantly the sum of DB CPU and Wait events adds up to 100%, as expected.
Following JB's analysis, the collected evidence points to the fact that many CPU cycles spent performing kernel-related activities for serving I/O are accounted into Oracle wait events. Therefore the sum of %CPU and %Wait is higher than 100% because of the double counting that happens in these events, especially when the I/O interface is working at a high rate of IOPS. In the following you will see how profiling of OS-process state together with wait events and kernel stacks provides direct evidence for this hypothesis.

Direct investigation with extended stack traces and flame graphs

The idea is to sample the Oracle process running the test workload (random I/O driven by SLOB) using kernel stack profiles extended with additional sampling of the Oracle OS-process state and the Oracle wait event interface. This is an extension of the work discussed in the blog article "Linux Kernel Stack Profiling and Flame Graphs Applied to Oracle Investigations". A custom tool ORA_KStackProfiler is introduced to perform the following actions:
  • sampling of /proc/<pid>/stat to measure the process state (Running, Stopped, Disk, etc),
  • sampling /proc/<pid>/stack to sample the kernel stack trace if any
  • sampling the status of the Oracle session (on CPU or waiting for a wait event). The sampling is done directly from SGA by reading the memory locations corresponding to X$KSUSE.KSUSEOPC and X$KSUSE.KSUSETIM.
Note that ORA_KStackProfiler is an experimental tool with several limitations, including the fact that sampling is not done atomically (the stack profile, process state and wait event are gathered with subsequent calls), which can introduce errors. See also the README file.
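
To illustrate the first sampling step, here is a minimal Python sketch of reading the process state from /proc/<pid>/stat. It is not the code of ORA_KStackProfiler; the function names parse_state and sample_state and the STATE_NAMES table are made up for this example, and only the most common state letters from the proc(5) manual page are listed.

```python
import os

# Single-letter state codes as documented in proc(5); only common ones listed.
STATE_NAMES = {
    "R": "Running",
    "S": "Sleep",
    "D": "Disk sleep",
    "T": "Stopped",
    "Z": "Zombie",
}

def parse_state(stat_line: str) -> str:
    """Return the one-letter process state from a /proc/<pid>/stat line."""
    # The command name is wrapped in parentheses and may itself contain
    # spaces or parentheses, so split on the LAST closing parenthesis.
    after_comm = stat_line.rsplit(")", 1)[1]
    return after_comm.split()[0]

def sample_state(pid: int) -> str:
    """Read /proc/<pid>/stat once and return the state letter (Linux only)."""
    with open(f"/proc/{pid}/stat") as f:
        return parse_state(f.read())

if __name__ == "__main__" and os.path.exists("/proc/self/stat"):
    code = sample_state(os.getpid())
    print(code, STATE_NAMES.get(code, "other"))
```

A profiler would call something like sample_state in a loop, with a delay between samples, and record the result next to the kernel stack and the wait event gathered in the same iteration.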
These are the steps used for testing and the resulting flame graph can be found in Figure 3:

1. Run "make" to build the executable: ora_kstackprofiler

2. Run the scripts in the oracle_scripts directory 
     - run eventsname.sql using SQL*plus to generate the file eventsname.sed translating event# into event names 
     - run find_pid_ksustim_ksuseopc.sql using SQL*plus to find the pid and details of X$KSUSE of the Oracle session to trace
Example: 

SQL> @find_pid_ksustim_ksuseopc.sql
This script finds the OS pid and address of X$KSUSE.KSUSEOPC and X$KSUSE.KSUSETIM for a given Oracle session. To be used together with ora_kstackprofiler. Run as user SYS.

Enter Oracle SID to be investigated: 111
OS pid = 1234, ksuseopc = 5796061762, ksusetim = 5796061792

3. Stack traces sampling extended with Oracle wait event data as in the following example:

./ora_kstackprofiler --pid 1234 --delay 100 --count 1000 --ksuseopc 5796061762 --ksusetim 5796061792 > ora_stack_slob_fastio.txt

4. Process the output with additional filtering and with flame graphs for visualization using https://github.com/brendangregg/FlameGraph

cat ora_stack_slob_fastio.txt| grep -v 0xffffffffffffffff | sed -f oracle_scripts/eventsname.sed | ../FlameGraph/stackcollapse-stap.pl| ../FlameGraph/flamegraph.pl --title "Kernel stack, OS State and Wait event profiling of Oracle random I/O (SLOB workload, low latency I/O)" > ora_stack_slob_fastio.svg


Figure 3: Flame graph visualization of kernel stack traces extended with process state sampling and wait event details from SGA, showing a large amount of CPU cycles consumed inside the wait events "db file parallel read" and "db file sequential read" for an I/O bound workload (low-latency random reads generated with SLOB). Note that the wait event details from SGA are reported at the bottom of the graph. The line above that is the process state from /proc/<pid>/stat. On top is the kernel stack trace from /proc/<pid>/stack. Click here for a svg version of the graph.


Figure 3 shows a flame graph of the Kernel stack traces extended with process state sampling and wait event details from SGA. The wait event data is sampled from SGA (and in particular from the memory locations for X$KSUSE.KSUSEOPC and X$KSUSE.KSUSETIM) and displayed at the bottom line of the graph. Just on top of that, the OS process state from /proc/<pid>/stat, finally the kernel stack data from /proc/<pid>/stack.
The flame graph of Figure 3 provides direct evidence of the fact that a significant amount of the time accounted by Oracle as wait events for I/O, is actually spent consuming CPU cycles for serving I/O.
This is consistent with the fact that I/O in this test is served at a high rate. The percentage of samples marked as "On CPU (Oracle state)" is about 10%, which is consistent with what was measured with ASH (see snapper output in Figure 2).
There are some important limitations in the method of extended stack sampling: sampling of the stack profile, process state and Oracle wait event details is not atomic, so we can expect mismatches. Moreover in Figure 3 you can see that the percentage of samples in OS state "running" is higher than the amount of CPU time measured with pidstat, in this example by about 20%. Note that the state "running" reported from /proc/<pid>/stat means that the process is either actually running on CPU or in runnable state (in the run queue). When reading the flame graph you should also remember that it is based on sampling, therefore it shows the stack traces that are most active. For example in Figure 3 you can see io_submit as one of the most active functions/code paths for asynchronous I/O (in the context of the wait event "db file parallel read"). The corresponding call io_getevents does not appear in the graph, as the low latency and high rate of I/O make this operation very fast. Tracing with strace or SystemTap can be used to further drill down on this observation with measurements of the I/O calls and their timing.
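
The layering used in Figure 3 (wait event at the bottom, OS process state above it, kernel stack on top) maps naturally onto the "folded" input format read by flamegraph.pl, where each line is a semicolon-separated stack, root first, followed by a sample count. The Python sketch below shows the idea only: it is not the output format of ORA_KStackProfiler, the function names are invented for this example, and the samples and kernel function names are hypothetical, not measured data.

```python
from collections import Counter

def fold_sample(wait_event, os_state, kernel_stack):
    """Build one folded-stack key: root frame first, leaf frame last."""
    # kernel_stack is listed leaf-first, as /proc/<pid>/stack prints it,
    # so it is reversed to put the outermost caller next to the OS state.
    frames = [wait_event, os_state] + list(reversed(kernel_stack))
    return ";".join(frames)

def fold(samples):
    """Count identical stacks and return lines in 'frames count' format."""
    counts = Counter(fold_sample(*s) for s in samples)
    return [f"{stack} {n}" for stack, n in sorted(counts.items())]

# Hypothetical samples, for illustration only (not measured data).
samples = [
    ("db file parallel read", "Running",
     ["io_submit_one", "do_io_submit", "sys_io_submit"]),
    ("db file parallel read", "Running",
     ["io_submit_one", "do_io_submit", "sys_io_submit"]),
    ("On CPU (Oracle state)", "Running", []),
]

for line in fold(samples):
    print(line)
```

Because the wait event is the root frame of every stack, the width of each box in the bottom line of the graph is the share of samples taken while the session was in that wait event, and everything drawn above it shows what the OS process was doing at the time.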

For additional understanding of what the system is doing it is useful to measure also the userspace stack. For this an extended tool is introduced: Ptrace_Profiler.
The output of Ptrace_Profiler is reported in Figure 4, with some annotations of relevant functions in the userspace and their meaning. You can see that most of the time is spent in operations related to asynchronous I/O, both on the kernel level and on the userspace libaio function. This is consistent with the workload executing random reads under the wait event "db file parallel read" at high speed.


Figure 4: Flame graph visualization of kernel and userspace stack traces for Oracle I/O workload (SLOB) extended with OS process state and wait event details. Data is collected with a custom tool: Ptrace_Profiler. Some of the relevant functions are annotated with their role. Most of the time in the workload is spent executing I/O and in particular io_submit in libaio or in the kernel. Click here for a svg version of the graph.


I/O latency

The nature of the workload and in particular the high number of IOPS and low latency of the storage play an important role in amplifying the amount of CPU consumed in Oracle wait events per unit of time (more work requires more CPU cycles). A direct confirmation that the storage is serving I/O at low latency can be found with OS-based tools reading from block I/O tracepoints. The following tools can be used based on SystemTap and Ftrace: blockio_rq_issue_filter_latencyhistogram.stp and iolatency_micro. Some sample measurements are reported here below. Note that a large number of I/O requests are served in the bucket 64-to-128 microseconds.

# ./iolatency_micro 10
Tracing block I/O. Output every DeltaT = 10 seconds. Ctrl-C to end.
 >=(mus) .. <(mus)  : IOPS       IO_latency/DeltaT |IOPS Distribution                     |
      16 -> 32      : 0          0                 |                                      |
      32 -> 64      : 623        29947             |####                                  |
      64 -> 128     : 6302       605049            |######################################|
     128 -> 256     : 3482       668659            |#####################                 |
     256 -> 512     : 664        255206            |#####                                 |
     512 -> 1024    : 47         36249             |#                                     |
    1024 -> 2048    : 3          5529              |#                                     |
    2048 -> 4096    : 4          13209             |#                                     |
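
To put numbers on the histogram above, the following Python sketch computes the share of I/O requests in each latency bucket. The bucket bounds and IOPS values are copied by hand from the sample output; the variable names are illustrative and the script does not parse the output of iolatency_micro.

```python
# Bucket rows copied from the iolatency_micro output above:
# (lower bound in microseconds, upper bound in microseconds, IOPS)
buckets = [
    (16, 32, 0),
    (32, 64, 623),
    (64, 128, 6302),
    (128, 256, 3482),
    (256, 512, 664),
    (512, 1024, 47),
    (1024, 2048, 3),
    (2048, 4096, 4),
]

total_iops = sum(iops for _, _, iops in buckets)

# Print the per-bucket and cumulative share of I/O requests.
cumulative = 0
for low, high, iops in buckets:
    cumulative += iops
    share = 100.0 * iops / total_iops
    cum_share = 100.0 * cumulative / total_iops
    print(f"{low:>5} -> {high:<5} us: {share:5.1f}% of I/Os, {cum_share:5.1f}% cumulative")

# Bucket with the highest IOPS.
busiest = max(buckets, key=lambda b: b[2])
print(f"total IOPS = {total_iops}, busiest bucket = {busiest[0]}-{busiest[1]} us")
```

For this sample, more than half of the requests fall in the 64-to-128 microsecond bucket and more than 90% complete in under 256 microseconds.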



No double counting of CPU usage into wait events with slow I/O

You may not have found a situation like the one described above in your production DBs. Here we want to investigate what made the test "special". The key element of the example above is the low latency and high IOPS of the test workload. This causes the system to be very busy serving I/O and this amplifies the effect of double counting CPU in wait events. In this paragraph an additional test will hopefully make the matter more clear. The idea is to run the same SLOB workload against a storage system serving I/Os with much higher latency. In the test system used for this blog post, based on a VM, the I/O can be slowed down simply by flushing the filesystem cache on the host system (that is the system running Virtualbox):

# free && sync && echo 3 > /proc/sys/vm/drop_caches && free

The next step is to start the SLOB workload using the same test parameters as in the previous test, with the main difference that the I/O workload now runs against a much slower storage system: I/Os are not cached in the Virtualbox host anymore and are served by slow SATA disks. Using OS instrumentation and the AWR report one can quickly see that the system is I/O bound, serving only 300 IOPS (it was 10K IOPS in the previous test with low-latency I/O). Note also from Figure 5 that in this test %DB CPU + %Wait = 102% (of DB Time). This number is quite close to the ideal value of 100% and could easily be interpreted as measurement noise, if you did not have the above analysis to explain the effect of double counting of CPU in I/O wait events. Here below is an example of the latency histogram taken during this test; note the predominance of very high I/O latency.

# ./iolatency_micro 10
Tracing block I/O. Output every DeltaT = 10 seconds. Ctrl-C to end.
 >=(mus) .. <(mus)  : IOPS       IO_latency/DeltaT |IOPS Distribution                     |
      64 -> 128     : 0          48                |#                                     |
     128 -> 256     : 2          480               |#                                     |
     256 -> 512     : 5          2150              |##                                    |
     512 -> 1024    : 3          2918              |##                                    |
    1024 -> 2048    : 3          5222              |#                                     |
    2048 -> 4096    : 1          3686              |#                                     |
    4096 -> 8192    : 13         80486             |####                                  |
    8192 -> 16384   : 27         333004            |########                              |
   16384 -> 32768   : 44         1103462           |#############                         |
   32768 -> 65536   : 75         3706060           |######################                |
   65536 -> 131072  : 136        13389004          |######################################|
  131072 -> 262144  : 38         7569408           |###########                           |
  262144 -> 524288  : 0          157286            |#                                     |



Figure 5: Extract of the AWR report showing the top wait time for a test using SLOB workload against "slow I/O". Note that DB CPU and non-idle Wait sum up to 102% of DB Time, which is close to the ideal case of a sum of 100%.

Using ORA_KStackProfiler you can get a more detailed view of what the system is doing while running the SLOB workload: most of the time is spent inside an I/O wait event, the most important being "db file parallel read". However, contrary to the case of Figure 3, here the process is mostly in Sleep or Disk Sleep state, waiting for the io_getevents call to reap I/Os. The flame graphs of Figure 6 (slow I/O) and Figure 3 (fast I/O) appear to be quite different, but they just reflect two aspects of the asynchronous I/O underlying the "db file parallel read" wait event. In the case of low latency I/O (Figure 3) most of the I/O wait time is spent submitting I/O, while getting the results is very fast. In the opposite example of slow I/O (Figure 6), most of the execution time is spent waiting to reap I/Os (io_getevents). The code path is the same: stack profiling and flame graphs highlight in each case the part of the code path where most of the time is spent.


Figure 6: Flame graph of the kernel stack profile extended with OS process state and wait event details for an I/O-bound Oracle workload generated with SLOB. The I/O system is serving I/Os at about 300 IOPS. Most of the time is spent waiting for I/O. During the I/O wait time the Oracle process is mostly in Sleep or Disk sleep state, although it is also in running/runnable state for a fraction of the time. Click here for a svg version of the graph.


Conclusions

Extended stack profiling and flame graphs combine visualization of kernel stack trace profiles, userspace stack, OS process state and Oracle wait event data. This technique can be used for advanced troubleshooting and root cause analysis to find the code paths that are most frequently executed and correlate this information with the application context. For Oracle troubleshooting this leads to additional data beyond what is available with the wait interface, to be used in special cases where low-level investigations are needed. An example is provided of using extended stack profiling for drilling down on a case of high utilization of CPU inside I/O wait events, where one observes the anomaly DB CPU Time + Wait Time >> DB time. Extended stack profiles and flame graphs provide direct evidence of what happens in the system and why CPU is consumed inside Oracle wait events.

Download the tools and previous work

The tool discussed in this post can be downloaded from this webpage or from Github.
Relevant previous work on stack profiling, flame graphs and investigations of Oracle workload and wait events can be found at: "Oracle Optimizer Investigated with Flame Graphs", "Flame Graphs for Oracle", "Linux Kernel Stack Profiling and Flame Graphs Applied to Oracle Investigations", "Life of an Oracle I/O: Tracing Logical and Physical I/O with SystemTap".

Acknowledgements and references

Brendan Gregg is the inventor of flame graphs and has published very interesting material on the topic. John Beresniewicz (JB)'s talk at the Oak Table World 2015 has discussed the original test case for this blog post. Kevin Closson is the author of  SLOB. Tanel Poder has covered the topic of profiling /proc/<pid>/stack in his blog. Frits Hoogland has many details on Oracle internals and I/O investigations in his blog.