Monday, November 21, 2016

IPython/Jupyter SQL Magic Functions for PySpark

Topic: this post is about a simple implementation with examples of IPython custom magic functions for running SQL in Apache Spark using PySpark and Jupyter notebooks.

If you are already familiar with Apache Spark and Jupyter notebooks you may want to go directly to the example notebook and code. If you want additional context and introduction to the topic of using Spark on notebooks, please read on.


Jupyter notebooks and Spark SQL

Notebooks are very useful and popular environments for data analysis. Among other things, they provide a user-friendly environment for exploratory analysis and simplify the task of sharing your work, such as preparing presentations and tutorials. Jupyter notebooks in particular are very popular, especially with Python users and data scientists. One of the neat tricks that you can do with IPython and Jupyter notebooks is to define "custom magic functions": commands processed by IPython that can be used as shortcuts for frequently needed actions and functions. The custom magic functions extend the list of built-in magic commands.
In particular when running SQL in notebook environments, %sql magic functions provide handy shortcuts to the code. Custom magic functions come in two flavors: line functions, such as %sql, which take their input from a single line, and cell functions, such as %%sql, which work on cell input and are useful when your SQL statements span multiple lines.
Apache Spark is a popular engine for data processing at scale. Spark SQL in particular provides a scalable and fast engine and API for processing structured data (see also docs in the references section of this post).
In this post you will find a simple way to implement magic functions for running SQL in Spark using PySpark (the Python API for Spark) with IPython and Jupyter notebooks.


IPython magic

One typical way to process and execute SQL in PySpark from the pyspark shell is by using the following syntax: sqlContext.sql("<SQL statement>") (code tested for pyspark versions 1.6 and 2.0). It is easy to define %sql magic commands for IPython that are effectively wrappers/aliases taking the SQL statement as argument and feeding it to sqlContext (see the docs at "custom magic functions"). An example of magic functions for running SQL in pyspark can be found at this link to the code. The following magic functions are defined in the accompanying example code:

%sql <statement>          - return a Spark DataFrame for lazy evaluation of the SQL
%sql_show <statement>     - run the SQL statement and show max_show_lines (50) lines 
%sql_display <statement>  - run the SQL statement and display the results using an HTML table. This is implemented by converting the results to a Pandas DataFrame and displays up to max_show_lines (50) lines
%sql_explain <statement>  - display the execution plan of the SQL statement

An example of how the listed magic functions can be used to run SQL in PySpark can be found at this link to the example notebook. The code is simple and you can easily modify it to fit your needs, if different from the provided examples.
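To give an idea of how little code is involved, here is a minimal sketch of how such magic functions can be registered (the actual code linked above may differ in the details; the sketch assumes the notebook has been started via pyspark, so that sqlContext is already defined):

from IPython.core.magic import register_line_magic

max_show_lines = 50    # limit on the number of lines printed by %sql_show

@register_line_magic
def sql(line):
    "Return a Spark DataFrame for lazy evaluation of the SQL"
    return sqlContext.sql(line)

@register_line_magic
def sql_show(line):
    "Run the SQL statement and show up to max_show_lines lines of output"
    sqlContext.sql(line).show(max_show_lines)

The cell-magic variant %%sql can be defined in a similar way with register_cell_magic.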




Tips on how to build a test environment

If you are not familiar with using Spark SQL and notebooks, here are some links that can help you get started.
Download Spark from http://spark.apache.org/downloads.html
Note: you will not need to have Hadoop and/or a YARN cluster installed to run the tests described in this post.
An easy way to install a Python environment and Jupyter is by downloading Anaconda from https://www.continuum.io/downloads
If not yet installed in your test machine, you can download Java 8 from Oracle Technet: http://www.oracle.com/technetwork/java/javase/downloads/index.html

The Python shell for Spark can be started simply by running "pyspark".
If you want to run pyspark inside a Jupyter notebook, as in the example notebook provided with this post, you can do that by setting the environment variable PYSPARK_DRIVER_PYTHON prior to running pyspark. Example:
export PYSPARK_DRIVER_PYTHON=$PATH_TO_ANACONDA/bin/jupyter-notebook

I also find this additional (optional) configuration useful:
export PYSPARK_DRIVER_PYTHON_OPTS="--ip=`hostname` --browser='/dev/null' --port=8888"

Similarly if you just want to run pyspark under IPython using the command line (rather than the web notebook interface with Jupyter), you can set PYSPARK_DRIVER_PYTHON to point to the executable for IPython prior to running pyspark. Example:
export PYSPARK_DRIVER_PYTHON=$PATH_TO_ANACONDA/bin/ipython

This paragraph just lists a few examples of how you can get started with running the code described in this post. There are other ways to start your Python notebook environment for Spark. Notably, you can start the notebook with IPython/Jupyter first and later manually start the Spark Context and SQLContext, thereby bypassing the use of the pyspark tool/shell.
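For example, here is a minimal sketch of how the contexts can be created manually from a plain Jupyter/IPython session (Spark 1.6-style API; this assumes the pyspark Python modules are available on the PYTHONPATH, for example by adding $SPARK_HOME/python and the py4j zip that ships with Spark):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Run Spark in local mode, using all the cores of the machine
conf = SparkConf().setMaster("local[*]").setAppName("notebook_test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)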


Links to documentation, references and previous work

Link to Apache Spark documentation on Spark SQL and DataFrames and PySpark API.
Spark SQL Language Manual, which is part of the Databricks User Guide.
Overview article on Spark: "Apache Spark: A Unified Engine for Big Data Processing" in Communications of the ACM November 2016.
Article with details on Spark SQL: "Spark SQL: Relational Data Processing in Spark", proceeding of SIGMOD 2015
I have covered in this blog other examples of how to implement and use %sql magic functions, see: "IPython/Jupyter Notebooks for Oracle" and "IPython Notebooks for Querying Apache Impala".
Previous work published in this blog on Spark SQL includes "Apache Spark 2.0 Performance Improvements Investigated With Flame Graphs".
This blog post and accompanying example code have been developed in the context of the CERN IT Hadoop and Spark service.


Thursday, September 15, 2016

Apache Spark 2.0 Performance Improvements Investigated With Flame Graphs

Topic: This post is about performance optimizations introduced in Apache Spark 2.0, in particular whole-stage code generation. A test case is introduced and investigated with diagnostic tools.


Introduction: performance troubleshooting of a slow query using parallel query execution in a Hadoop cluster

The idea for this post comes from a performance troubleshooting case that has come up recently at CERN database services. It started with a user reporting slow response time from a query for a custom report in a relational database. After investigations and initial troubleshooting, the query was still slow (taking about 12 hours to run). It was understood that the query was mostly running "on CPU", spending most of its time evaluating a non-equijoin condition repeated hundreds of millions of times. Most importantly, it was also found that the query was easily parallelizable; this was good news as it meant that we could simply "throw hardware at it" to make it run faster. One way the team (see the acknowledgments section at the end of the post) used to parallelize the workload (without affecting the production database) was to export the data to a Hadoop cluster and run the query there using Spark SQL (the cluster used has 14 nodes, installed with CDH 5.7, Spark version 1.6). This way it was possible to bring the execution time down to less than 20 minutes. All this with relatively low effort, as the query could be run basically unchanged.
This post is about Spark; however, it is interesting to note that the query was also tested using Apache Impala (version 2.5) on the same cluster and produced comparable speedup results. Later I also ran the query on a "beefy Oracle server" with 60 cores, adding the relevant configuration for activating Oracle parallel query (test done on Oracle RDBMS version 12.1.0.2), reproducing execution times/speedups comparable to the Spark and Impala cases.


Spark 2.0 enters the scene

As I write this post, Spark 1.6 is installed in our production clusters and Spark 2.0 is still relatively new (it was released at the end of July 2016). Notably, Spark 2.0 has very interesting improvements over the previous versions, among them improvements in the area of performance that I was eager to test (see this blog post by Databricks).
My first test was to try the query discussed in the previous paragraph on a test server with Spark 2.0 and I found that it was running considerably faster than in the tests with Spark 1.6. The best result I achieved, this time on a large box with 60 CPU cores and using Spark 2.0, was an elapsed time of about 2 minutes (to be compared with 20 minutes in Spark 1.6). I was impressed by Spark 2.0's speedup compared to Spark 1.6 and decided to investigate further.


The test case

Rather than using the original query and data, I will report here on a synthetic test case that hopefully illustrates the main points of the original case and at the same time is simple and easy to reproduce on your test systems, if you wish to do so. This test uses pyspark, the Python interface to Spark, in the simplest configuration for a test machine: local mode (that is, without Hadoop or YARN). If needed, you can add "--master local" to the command line. If you are not familiar with how to run Spark, see further on in this post some hints on how to build a test system.

The preparation of the test data proceeds as follows: (1) a DataFrame with 10 million rows is created and registered as table "t0". (2) Table t0 is used to create the actual test data, which is composed of an "id" column and three additional columns of randomly generated data, all integers. The resulting DataFrame is cached in memory and "registered" as a temporary table called "t1". The Spark SQL interface for DataFrames makes this preparation task straightforward:

$ pyspark --driver-memory 2g

test_numrows = 1e7

sqlContext.range(0,test_numrows,1).registerTempTable("t0")

sqlContext.sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().registerTempTable("t1")

The following commands are additional checks to make sure the table t1 has been created correctly and is first read into memory. In particular, note that "t1" has the required test_numrows (10M) rows and see the description of its columns from the output of the command "desc":

sqlContext.sql("select count(*) from t1").show()

+--------+
|count(1)|
+--------+
|10000000|
+--------+

sqlContext.sql("desc t1").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|   bigint|       |
|  bucket|   bigint|       |
|    val1|   bigint|       |
|    val2|   bigint|       |
+--------+---------+-------+

The actual test query is here below. It consists of a join with two conditions: an equality predicate on the column bucket, which provides an obvious way to parallelize the query, and a more resource-intensive non-equality condition. Notably, the query also has an aggregation operation. Some additional boilerplate code is added for timing the duration of the query:

import time
starttime=time.time()

sqlContext.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()

print("Delta time = %f" % (time.time()-starttime))


The result is that Spark 2.0 is about 7 times faster than Spark 1.6 when running the test query on the test server (see details below): Spark 2.0 completed the job in about 15 minutes of elapsed time and 390 minutes of CPU time, while Spark 1.6 took about 100 minutes of elapsed time and 2840 minutes of CPU time. There are fluctuations in the actual job execution time between runs; however, you can ignore the fine details and focus on the main finding, namely that the performance difference between runs using Spark 1.6 and Spark 2.0 is striking (Spark 2.0 being much faster). This is worth further investigation into the internals and root causes.

I have run the tests using Spark in its simplest configuration (local mode) using a standalone (non-clustered) server with 16 cores (2 x E5-2650) and 128 GB of RAM (the virtual memory allocated by the test workload is about 16 GB) running Linux (kernel 2.6.32, RHEL 6.7). If you want to run it on a smaller machine, you can scale down the preparation phase by setting test_numrows to a smaller value (for example 1e6). In that case you can probably also use the default value of 1g for the driver memory.
The tests have been performed on a single server alternating runs with Spark 1.6 and 2.0. In both cases monitoring with OS tools showed that the jobs were CPU-bound, that is with 32 threads (16 cores x 2 for multithreading) running on CPU and utilizing the available resources on the test box. During the tests, no additional significant workload was running on the box.


Drilling down into the execution plans

The physical execution plan generated and executed by Spark (in particular by Catalyst, the optimizer, and Tungsten, the execution engine) has important differences in Spark 2.0 compared to Spark 1.6. The logical plan for executing the query, however, deploys a sort merge join in both cases. Please note in the execution plans reported here that in the case of Spark 2.0 several steps are marked with a star (*): this marks the steps optimized with whole-stage code generation.


Physical execution plan in Spark 1.6:

Note that a sort merge join operation is central to the execution plan of this query. Another important step after the join is the aggregation operation, used to compute "sum(a.val2)" as seen in the query text:



Physical execution plan in Spark 2.0:

Note in particular the steps marked with (*): they are optimized with whole-stage code generation:




Details of the SQL execution from the Spark Web UI, Spark 1.6 vs. Spark 2.0. This reproduces the physical execution plan with additional metrics gathered at run-time. Note in particular in Spark 2.0 the steps marked as "Whole Stage Codegen".


Code generation is the key

The key to understanding the improved performance lies in the new Spark 2.0 features for whole-stage code generation. This is expected and detailed for example in the blog post by Databricks Engineering "Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop. Deep dive into the new Tungsten execution engine". The main point is that Spark 2.0 compiles query execution into bytecode that is then executed, as opposed to looping with an iterator over result sets. A detailed discussion of the benefits of query compilation and code generation vs. the "traditional approach" to query execution, also called the volcano model, can be found in the lecture by Andy Pavlo on Query Compilation.
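As a toy illustration of the difference between the two execution models (plain Python, not Spark code, and a deliberately simplified analogy), consider a scan-filter-project pipeline:

# Volcano-style execution: each operator is an iterator producing one row per call,
# rows are pulled one at a time through the whole operator chain
def scan(data):
    for row in data:
        yield row

def filter_even(child):
    for row in child:
        if row % 2 == 0:
            yield row

def multiply_by_10(child):
    for row in child:
        yield row * 10

volcano_result = list(multiply_by_10(filter_even(scan(range(1000000)))))

# "Code generation"-style execution: the same pipeline fused into a single tight loop,
# removing the per-row iterator overhead
fused_result = [row * 10 for row in range(1000000) if row % 2 == 0]

Whole-stage code generation in Spark 2.0 goes much further (it generates code that is compiled to JVM bytecode), but the basic idea of collapsing a chain of operators into one loop is similar.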


Run time investigations with flame graphs

Flame graph visualization of stack profiles provides additional insights into which parts of the code are executed and where CPU cycles are consumed. The upper layers of the flame graph highlight where CPU cycles are spent. The lower layers add context by detailing the information on the parent functions/methods that called the "upper layers". The idea for this paragraph is to use stack profiles and flame graphs to further drill down into the differences in the execution model between Spark 2.0 and Spark 1.6.

To collect and generate the flame graphs I have used the methods described by Kay Ousterhout in "Generating Flame Graphs for Apache Spark using Java Flight Recorder". I have used Java Flight Recorder on Oracle's Java 8, starting pyspark with the following options:

pyspark --conf "spark.driver.extraJavaOptions"="-XX:+UnlockCommercialFeatures -XX:+FlightRecorder" --conf "spark.executor.extraJavaOptions"="-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"

Note added October 2017: see this link for more info on tools and techniques available for stack profiling and flame graph visualization for Spark.

Here below you can find two flame graphs that visualize the stack profiles collected for Spark 1.6 and Spark 2.0 while running the test workload/query. The graphs represent samples collected over 100 seconds. The major differences you should notice between the two flame graphs are that on Spark 1.6 the execution iterates over rows of data, looping on Row Iterator to Scala for example. In the Spark 2.0 example, however, you can see in the flame graph that the methods executing the bulk of the work are built/optimized with whole-stage code generation. For example the method where most time is spent during execution is code-generated and performs operations on Hash Maps in vector form.

What you can learn from flame graphs:

  • The flame graph for Spark 1.6 shows that a considerable amount of CPU cycles are spent on the Scala collection iterator. This can be linked with Spark 1.6 using the "traditional volcano model" for SQL execution. This is the part that is optimized in Spark 2.0 (see next bullet points).
  • Spark 2.0 is making use of whole-stage code generation and does not use Scala collection iterator.
  • Spark 2.0 is also using Vectorized Hash Maps to perform aggregations that are also code generated. The use of vectorized operations is likely introducing further performance improvements.


Spark 1.6:


Flame graph for a sample of the execution of the test query using Spark 1.6 in local mode (on a machine with 16 cores). Note that most of the time is spent processing data in an iterative way (which is not optimal). Click on this link for a SVG version of the graph where you can drill down on the details of each step.


Spark 2.0:


Flame graph for a sample of the execution of the test query using Spark 2.0 in local mode (on a machine with 16 cores). Note that most of the time is spent executing code that is generated dynamically via whole-stage code generation. Click on this link for a SVG version of the graph where you can drill down on the details of each step.


Note: the process of collecting stack profiles for Spark in this test is made easier by the fact that I have used Spark in local mode, which results in only one (multi-threaded) process to trace in a single box. In the general case, tracing Spark is more complicated due to the distributed nature of the workload, for example when running on a cluster.


Linux Perf stat counters

In this paragraph you can find the output of Linux Perf stat counters measured during the execution of the test query. The idea is to find differences in the run-time usage of resources that can further highlight the origin of the performance improvement measured in Spark 2.0 compared to Spark 1.6. The selection of stat counters to measure is taken from Tanel Poder's blog post "RAM is the new disk – and how to measure its performance – Part 2 – Tools". Notably, you can also find there a short explanation of the meaning of the counters.

# perf stat -e task-clock,cycles,instructions,branches,branch-misses \
          -e stalled-cycles-frontend,stalled-cycles-backend \
          -e cache-references,cache-misses \
          -e LLC-loads,LLC-load-misses,LLC-stores,LLC-store-misses \
          -e L1-dcache-loads,L1-dcache-load-misses,L1-dcache-stores,L1-dcache-store-misses \
          -p <pid_spark_process> sleep 100


What you can learn from comparing perf stat counters between Spark 1.6 and Spark 2.0 runs:
  • In both cases the workload is CPU-bound. The machine has 16 cores and is configured with multi-threading support (i.e. 32 execution threads). Perf stat counters report an average CPU utilization of about 31 CPU threads in both cases, which confirms the fact that the workload is CPU bound.
  • Reading from main memory seems to be key and Spark 2.0 appears to access memory with much higher throughput than Spark 1.6. In particular, I believe it is important to look at the counters LLC-loads and LLC-load-misses; those count respectively how many times a cache line was requested from the last level cache (LLC) and the fraction of those requests that resulted in access from main memory. Notably, Spark 2.0 in the given sample reports 33 M/sec LLC-loads with ~63% of loads resulting in misses (reads from main memory), while Spark 1.6 has 0.7 M/sec LLC-loads and also ~60% misses. I have noticed that these values fluctuate over different samples, but Spark 2.0 always presents a much higher access rate to LLC and memory than Spark 1.6.
  • It is interesting to note that the measurements for the Spark 1.6 run present a higher ratio of instructions per cycle than the run with Spark 2.0: the Spark 2.0 workload stalls for memory access more frequently. A higher ratio of instructions per cycle is often an indicator of better performance; however, in this case the opposite appears to be true. I believe a possible interpretation of what is happening is that Spark 2.0 is more efficient at using CPU resources and high throughput to memory, and therefore it quickly hits what appears to be the bottleneck for this workload: stalling for memory access.

This is the output of perf stat while running the test workload with Spark 1.6:

    3091790.707578      task-clock (msec)         #   30.915 CPUs utilized
 7,325,423,029,717      cycles                    #    2.369 GHz                     [25.01%]
 9,577,944,921,957      instructions              #    1.31  insns per cycle
                                                  #    0.45  stalled cycles per insn [31.25%]
 1,323,763,955,367      branches                  #  428.154 M/sec                   [31.06%]
     3,540,000,205      branch-misses             #    0.27% of all branches         [25.02%]
 4,332,575,248,710      stalled-cycles-frontend   #   59.14% frontend cycles idle    [25.02%]
 1,809,219,108,190      stalled-cycles-backend    #   24.70% backend  cycles idle    [25.01%]
     4,025,591,940      cache-references          #    1.302 M/sec                   [27.12%]
     2,688,865,386      cache-misses              #   66.794 % of all cache refs     [29.14%]
     2,305,317,283      LLC-loads                 #    0.746 M/sec                   [25.40%]
     1,382,318,864      LLC-load-misses           #   59.96% of all LL-cache hits    [25.40%]
     1,265,162,587      LLC-stores                #    0.409 M/sec                   [24.60%]
     1,256,986,002      LLC-store-misses          #    0.407 M/sec                   [26.51%]
 3,084,754,630,344      L1-dcache-loads           #  997.724 M/sec                   [29.69%]
    18,141,140,551      L1-dcache-load-misses     #    0.59% of all L1-dcache hits   [28.93%]
   891,386,721,821      L1-dcache-stores          #  288.308 M/sec                   [25.97%]
     1,281,601,143      L1-dcache-store-misses    #    0.415 M/sec                   [25.20%]


This is the output of perf stat while running the test workload with Spark 2.0:

    3095743.023060      task-clock (msec)         #   30.955 CPUs utilized
 7,267,137,941,598      cycles                    #    2.347 GHz                     [25.01%]
 5,810,442,547,610      instructions              #    0.80  insns per cycle
                                                  #    1.00  stalled cycles per insn [31.25%]
 1,142,058,628,367      branches                  #  368.913 M/sec                   [31.07%]
    17,683,392,720      branch-misses             #    1.55% of all branches         [25.43%]
 5,791,745,210,259      stalled-cycles-frontend   #   79.70% frontend cycles idle    [25.23%]
 3,993,653,110,520      stalled-cycles-backend    #   54.95% backend  cycles idle    [25.03%]
    51,936,035,185      cache-references          #   16.777 M/sec                   [30.84%]
    14,525,498,107      cache-misses              #   27.968 % of all cache refs     [25.21%]
   104,521,878,980      LLC-loads                 #   33.763 M/sec                   [25.01%]
    66,262,327,528      LLC-load-misses           #   63.40% of all LL-cache hits    [28.30%]
       301,797,039      LLC-stores                #    0.097 M/sec                   [26.72%]
       215,020,365      LLC-store-misses          #    0.069 M/sec                   [24.58%]
 2,520,703,012,324      L1-dcache-loads           #  814.248 M/sec                   [24.80%]
    96,261,558,827      L1-dcache-load-misses     #    3.82% of all L1-dcache hits   [24.99%]
   178,644,475,917      L1-dcache-stores          #   57.706 M/sec                   [29.09%]
     1,045,403,531      L1-dcache-store-misses    #    0.338 M/sec                   [27.73%]
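
As a cross-check of the figures quoted above, the derived metrics (instructions per cycle, LLC-loads per second and LLC-load miss ratio) can be recomputed from the raw counters with a few lines of Python (the values are copied from the two listings; task-clock is reported in milliseconds):

spark16 = {"task_clock_ms": 3091790.7, "cycles": 7325423029717,
           "instructions": 9577944921957,
           "llc_loads": 2305317283, "llc_load_misses": 1382318864}
spark20 = {"task_clock_ms": 3095743.0, "cycles": 7267137941598,
           "instructions": 5810442547610,
           "llc_loads": 104521878980, "llc_load_misses": 66262327528}

for name, c in (("Spark 1.6", spark16), ("Spark 2.0", spark20)):
    ipc = float(c["instructions"]) / c["cycles"]                      # instructions per cycle
    llc_rate = c["llc_loads"] / (c["task_clock_ms"] / 1000.0) / 1e6   # LLC-loads in M/sec
    miss_pct = 100.0 * c["llc_load_misses"] / c["llc_loads"]          # LLC-load miss ratio
    print("%s: %.2f insns per cycle, %.1f M/sec LLC-loads, %.0f%% LLC-load misses"
          % (name, ipc, llc_rate, miss_pct))

The expected output is about 1.31 insns per cycle, 0.7 M/sec and 60% for Spark 1.6, and about 0.80 insns per cycle, 33.8 M/sec and 63% for Spark 2.0, consistent with the ratios printed by perf stat.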



Source code

If you want to further drill down on the changes in Spark 2.0 that benefit the performance of the test workload, you can head to GitHub and browse the source code of Spark. For example, from the flame graphs you can find the names of the relevant classes (with their package paths) and/or you can use the search function in GitHub. So far I have only skimmed through the source code with these methods and found a few links that I believe are interesting as an example of the drill-down analysis that one can do thanks to the fact that Spark is an open source project:

  • One link of interest is "org.apache.spark.sql.execution.WholeStageCodegenExec". This is code introduced in the Spark 2.0 branch; you can also find there comments that shed some light on the mechanism used for code generation.
  • Another interesting point is the use of "vectorized hash maps" in Spark 2.0, which appears important as it is on the top line of the Spark 2.0 flame graph: "org.apache.spark.sql.execution.aggregate.VectorizedHashMapGenerator.scala" has additional details about the implementation. You can find there that this is an implementation for fast index lookup, also introduced in the Spark 2.0 branch. It is also mentioned there that the execution can be code-generated to boost its performance, which is what you can see happening in the flame graph of the Spark 2.0 workload.


Tips on how to build a test environment 

For the readers who are not familiar with running Spark, here are some tips on how to build a test environment:
Download Spark from http://spark.apache.org/downloads.html
You will not need to have Hadoop and/or a YARN cluster to run the tests described in this post.
An easy way to install a Python environment is by downloading Anaconda from https://www.continuum.io/downloads
You can download Java 8 from Oracle Technet: http://www.oracle.com/technetwork/java/javase/downloads/index.html
Code for generating flame graphs for Spark using Java Flight Recorder (see the recipe at this link) at: https://github.com/brendangregg/FlameGraph and https://github.com/chrishantha/jfr-flame-graph


Summary

Apache Spark 2.0 has important performance optimizations compared to Spark version 1.6. Notably, the Spark optimizer and execution engine in version 2.0 can take advantage of whole-stage code generation and of vector operations to make more efficient use of CPU cycles and memory bandwidth for improved performance. This post briefly discusses an example of how Spark SQL and its parallel execution engine have been useful to tune a query from a production RDBMS. Moreover, an example comparing Spark 1.6 and Spark 2.0 performance has been discussed and drilled down into using execution plan details, flame graphs and Linux Perf stat counters.


Additional comments and my take-away from the tests in this post

The Hadoop ecosystem provides a powerful and easy-to-use environment for running reports and analytics queries. The point is nicely illustrated for me by the fact that we could simply take data and a query from a production RDBMS and run them on the Hadoop cluster (with Spark and Impala), making the query run in parallel and fast. This provides a simple and quick way to throw hardware at a performance problem.

I am impressed by the work on Spark 2.0 optimizations for whole-stage code generation, in particular by how these new features address the important point of how to optimize CPU-bound workloads. This makes a great addition to Spark and strengthens its position as a leading player in data processing at scale.

Query compilation and/or code generation for executing SQL has become a common feature for many of the new databases appearing on the market optimized for "in memory" processing (i.e. processing an important fraction of their workload in main memory). This is implemented in various forms for different products; however, it is proven to give significant gains in performance, typically around one order of magnitude, for queries where it is applicable. The test case examined in this post provides an example of this type of optimization.

How are the mainstream RDBMS engines, which typically process result sets in an iterative way (similar to what was found in this post with Spark 1.6 and often referred to as the volcano model), going to respond to this performance-based challenge?


Acknowledgements and references

This work has been made possible and funded by CERN IT, in particular in the context of the CERN IT Hadoop Service and Database Services. I would like to thank the CERN colleagues who have contributed to the performance troubleshooting case mentioned in this post: Raul Garcia Martinez, Zbigniew Baranowski and Luca Menichetti.

On the topic of Spark 2.0 improvements for code generation, see the blog post "Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop" and the references therein, notably including "Efficiently compiling efficient query plans for modern hardware" and JIRA ticket SPARK-12795.
On the topic of query compilation on modern database systems vs. the volcano model, see also the lecture by Andy Pavlo on Query Compilation.
Flame graphs are the brain child of Brendan Gregg.
Additional links on using Linux Perf to measure performance counters: this article by Brendan Gregg and the 3-part blog posts by Tanel Poder on "RAM is the new disk".
On the topic of connecting Hadoop and relational databases see also Tanel's presentation "Connecting Hadoop and Oracle".


Tuesday, July 26, 2016

How to Build a Neural Network Scoring Engine in PL/SQL

Topic: In this post, you will find an example of how to build and deploy a basic artificial neural network scoring engine using PL/SQL for recognizing handwritten digits. This post is intended for learning purposes, in particular for Oracle practitioners who want a hands-on introduction to neural networks.


Introduction

Machine learning, and neural networks in particular, are currently hot topics in data processing. Many tools and platforms are now easily available to work and experiment with neural networks and deep learning (see also the links at the end of this post). Recognizing handwritten digits, in particular using the MNIST database by Yann LeCun et al., is currently the "hello world" example for neural networks.
In this post, you will see how to build and deploy a simple neural network scoring engine to recognize handwritten digits using Oracle and PL/SQL. The final result is a short PL/SQL package with an accuracy of about 98%. The neural network is built and trained using TensorFlow and then transferred to Oracle for serving it.

One of the ideas that this post wants to illustrate is that scoring neural networks is much easier than training them: the operations required for serving a trained network can be implemented relatively easily in many computing languages/environments. Discussions on these topics are normally centered around platforms for "Big Data" (see for example Spark and MLlib). I find it interesting to note that neural networks can also be successfully applied to the RDBMS world. This can be useful as large quantities of valuable data are currently stored in relational databases. In the case of Oracle, the implementation of a scoring engine is also made easier by the availability of a mature PL/SQL environment with a package for linear algebra: UTL_NLA.


Let's start from the end: how to deploy the PL/SQL package MNIST and recognize handwritten digits using Oracle

One short PL/SQL package and two tables are all you need to replay the following example (you can find the details of the code on Github). The tables are:
  • TENSORS_ARRAY: this table contains the numerical values for the vectors and matrices (tensors) that constitute the neural network. There is a total of 79510 floating point numbers encoded into four tensors using the data type UTL_NLA_ARRAY_FLT.
  • TESTDATA_ARRAY: this table contains the test images. There are 10K images, each composed of 28x28 = 784 pixels. Image data is also encoded using the data type UTL_NLA_ARRAY_FLT.
The engine for scoring the example neural network is in a package called MNIST. It has a procedure called INIT that loads the components of the neural network from the table tensors_array into PL/SQL variables, and a function called SCORE that takes an image as input and returns a number, the predicted value of the digit.
Here is an example of its usage, where the first image in the table testdata_array is examined and correctly predicted to represent the number 7 (the image label agrees with the prediction by MNIST.SCORE):

SQL> exec mnist.init

PL/SQL procedure successfully completed.

SQL> select mnist.score(image_array), label from testdata_array where rownum=1;

MNIST.SCORE(IMAGE_ARRAY)      LABEL
------------------------ ----------
                       7          7

Figure 1: This is a bitmap display of the test image used in the example. This confirms that the prediction of MNIST.SCORE is correct and indeed the image is a representation of the number 7 handwritten and encoded in a grid of 28x28 gray-scale pixels.


Processing all the test images is also a matter of a simple SQL command. In the example of Figure 2 it takes 2 minutes to process 10000 test images, that is about 12 ms per image on average. The accuracy of the scoring function is about 98%. It is calculated as follows: out of 10000 images, 9787 are scored correctly according to the data labels. Note also that the set of test images is disjoint from the images used to train the neural network. Therefore we can expect that the MNIST package has an accuracy of about 98% for recognizing digits also when used on generic input (additional evaluations of the quality of the MNIST package as a classifier are beyond the scope of this post).
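As an illustration of the "simple SQL command" mentioned above, here is a hedged sketch of how the accuracy figure can be computed, driven from Python with cx_Oracle (which is introduced later in this post); the SQL statement is my assumption of how such a check can be written and the actual commands used for Figure 2 may differ:

import cx_Oracle

ora_conn = cx_Oracle.connect('mnist/mnist@ORCL')
cursor = ora_conn.cursor()
# count how many of the 10000 test images are scored correctly according to their labels
cursor.execute("select count(*) from testdata_array where mnist.score(image_array) = label")
matches = cursor.fetchone()[0]
print("Accuracy = %.2f%%" % (100.0 * matches / 10000))   # about 98%, i.e. 9787 out of 10000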


The full PL/SQL code and the datapump dump file with the relevant tables can be found on Github. In the following paragraphs, you can read how to build and train the neural network.


Figure 2: The accuracy of the PL/SQL scoring function MNIST.SCORE on the test set of 10K images is about 98%. Processing takes about 12 ms per image.


The neural network

The neural network used in this post is composed of three layers (see also Figure 3): one input layer, one hidden layer and one output layer. If this topic is new to you, I recommend doing some additional reading (see references) and in particular reading Michael Nielsen's "Neural Networks and Deep Learning", which provides an excellent introduction to the topic and a series of step-by-step examples on the problem of recognizing handwritten digits.


Figure 3: The artificial neural network used in this post is composed of three layers. The input layer has 784 neurons, one per pixel of the input image. A hidden layer of 100 neurons is added to improve the accuracy. The output layer has 10 neurons, one per each possible output value (that is digits from 0 to 9).


Get the training and test data, build and train the neural network using TensorFlow

Another important step for deploying neural networks is training. For this you need data, lots of it if possible. You also need an engine to do the necessary computation. Luckily there are many platforms available for working with neural networks that are free and relatively easy to deploy (see references). In this post, you will see how to use Google's TensorFlow and the Python environment. TensorFlow comes with a tutorial for recognizing handwritten digits in the MNIST database. Included in the tutorial are training and test data with labels and also example code.

You can find the code I used for training the neural network on Github. Some highlights and code snippets are discussed in the following.

Importing the data: The example dataset that comes with TensorFlow provides 55000 images for training and 10000 images for testing. These originally come from the work of Yann LeCun and coworkers. Having large amounts of high-quality data is very important to the success of the process. Moreover, the images come with labels: the labels tell which number each image is intended to depict and provide a very important piece of information as the exercise is to do supervised learning.

Defining the neural network: there are four tensors (vectors and matrices in this case) in the network: W0, W1, b0 and b1. They are defined in the following snippet of code. To better understand their role and the key role that the cross entropy and the gradient descent optimizer play in training the network, see the references, in particular "Neural Networks and Deep Learning" and the TensorFlow tutorial.
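The original snippet is reported as an image in the blog post and the full code is on Github; here is a minimal sketch of how such a network can be defined with the TensorFlow API of that era (version 0.9), where the choice of initialization values and learning rate are my assumptions:

import tensorflow as tf

x  = tf.placeholder(tf.float32, [None, 784])    # input images, 28x28 pixels flattened
y_ = tf.placeholder(tf.float32, [None, 10])     # labels, one-hot encoded

W0 = tf.Variable(tf.truncated_normal([784, 100], stddev=0.1))   # input -> hidden layer weights
b0 = tf.Variable(tf.zeros([100]))
W1 = tf.Variable(tf.truncated_normal([100, 10], stddev=0.1))    # hidden -> output layer weights
b1 = tf.Variable(tf.zeros([10]))

hidden_layer = tf.nn.sigmoid(tf.matmul(x, W0) + b0)             # sigmoid activation
y = tf.nn.softmax(tf.matmul(hidden_layer, W1) + b1)             # output as probability distribution

cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)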



Training the neural network: training proceeds with multiple steps of optimization. Training is performed using 55000 images with labels. It runs over 30000 iterations using a "mini-batch" size of 100 images. At each step, the gradient descent algorithm computes an update of the weights and biases (W0, W1, b0 and b1) with the goal of minimizing the loss function (cross_entropy). The relevant snippet of the code is:
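Again, the original snippet is an image in the blog post; the following is a hedged sketch of the training loop, continuing from the definitions above (the data loading helper and session setup follow the conventions of the TensorFlow 0.9 MNIST tutorial):

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)   # 55000 training and 10000 test images

sess = tf.Session()
sess.run(tf.initialize_all_variables())    # TensorFlow 0.9-era variable initializer

for step in range(30000):                  # 30000 iterations, as quoted above
    batch_xs, batch_ys = mnist.train.next_batch(100)             # mini-batch of 100 images
    sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})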


Result: as a result, the trained network has an accuracy of about 98% in recognizing the images in the test set. Note that the test set is composed of 10000 images and is disjoint from the set of images used for training (the training set contains 55000 images).
It is possible to get higher accuracy with more advanced neural network configurations (see references for details), but that is beyond the scope of this post.


Manually scoring the neural network, a Python example

The main result of the training operations is that the tensors (matrices and vectors in this case) that make the neural network are now populated with useful values. I believe that a good way to understand how all this works is to "run the network manually", that is run as an example of how to go from an image of a handwritten digit to the prediction of its value by the trained neural network. As a first step we extract the values of the trained tensors in our model into numpy arrays for later processing:
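One possible way to do this, as a sketch assuming the session and variables of the snippets above (the variable names match those used in the description further below):

W0_matrix = sess.run(W0)   # 784x100 matrix of weights, input layer -> hidden layer
b0_array  = sess.run(b0)   # 100-element bias vector for the hidden layer
W1_matrix = sess.run(W1)   # 100x10 matrix of weights, hidden layer -> output layer
b1_array  = sess.run(b1)   # 10-element bias vector for the output layer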


An example of "manually" operating the network in Python is as follows:
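The original code is on Github; the sketch below follows the description given in the next paragraph (the choice of the first image of the test set as input is consistent with the example of Figure 1):

import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def softmax(x):
    e = np.exp(x - np.max(x))
    return e / e.sum()

testimage = mnist.test.images[0]    # first image of the test set, 784 pixels

hidden_layer = sigmoid(np.dot(testimage, W0_matrix) + b0_array)    # 100-element hidden layer
predicted = softmax(np.dot(hidden_layer, W1_matrix) + b1_array)    # probability for each digit 0..9
print(np.argmax(predicted))         # expected to print 7 for this test image (see Figure 1)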



W0_matrix, b0_array, W1_matrix and b1_array are the tensors that constitute the neural network after training, "testimage" is the input, sigmoid() is used as activation function, "hidden_layer" represents the hidden layer of the network, "predicted" is the output layer and softmax() is a function used to normalize the output as a probability distribution. At the end of the calculation, the array predicted[n] contains the prediction that the input image represents the digit "n". The function argmax() finds the value of "n" where predicted[n] is maximized.
The code shown above predicts the value 7 for a test image. The prediction is confirmed as correct by the value of the label and can also be visually confirmed by the bitmap display of the test image (see Figure 1).


Move test data and neural network tensors to an Oracle database

The example in the previous paragraph on how to manually run the scoring engine illustrates that serving a neural network can be straightforward: in some cases it is just a matter of performing some basic computations with matrices. This contrasts with the complexity of training neural network models, where one often needs a specialized engine, large quantities of training data and, in the more complex cases, also specialized hardware, such as GPU cards.
The discussion of the previous paragraph has also prepared the terrain for the following development: moving the neural network tensors and test data to Oracle and implementing a serving engine there.
There are many ways to export Python's numpy arrays. One way is to save them in a text format. Here you will see instead a method targeted to exporting directly into Oracle using cx_Oracle, the Python library to interact with Oracle. See also the notebook "Oracle and Python with cx_Oracle" for additional examples and references on how to use cx_Oracle.

You can find the code on Github, here are some relevant snippets:

- Create the tables to host the tensor definition and test data:

SQL> create table tensors (name varchar2(20), val_id number, val binary_float, primary key(name, val_id));

SQL> create table testdata (image_id number, label number, val_id number, val binary_float, primary key(image_id, val_id));

- From Python, open a connection to Oracle:

import cx_Oracle
ora_conn = cx_Oracle.connect('mnist/mnist@ORCL')
cursor = ora_conn.cursor()

- Example of how to transfer the matrix W0 into the Oracle table "tensors"

i=0
sql="insert into tensors values ('W0', :val_id, :val)"
for column in W0_matrix:
    array_values = []
    for element in column:
        array_values.append((i, float(element)))
        i += 1
    cursor.executemany(sql, array_values)
ora_conn.commit()


Oracle's optimizations for linear algebra using UTL_NLA

From Oracle documentation: "The UTL_NLA package exposes a subset of the BLAS and LAPACK (Version 3.0) operations on vectors and matrices represented as VARRAYS". This is very useful for implementing the calculations needed to serve the neural network of this post.
A snippet of the MNIST code, to give the gist of how this works in practice, is reported below. The code performs the calculation v_Y0 = v_Y0 + g_W0_matrix * p_testimage_array, where g_W0_matrix is a 784x100 matrix, p_testimage_array is a vector of 784 elements (encoding the 28x28 images) and v_Y0 is a vector of 100 elements.



utl_nla.blas_gemv(
                      trans => 'N',
                      m => 100,
                      n => 784,
                      alpha => 1.0,
                      a => g_W0_matrix,
                      lda => 100,
                      x => p_testimage_array,
                      incx => 1,
                      beta => 1.0,
                      y => v_Y0,
                      incy => 1,
                      pack => 'C'
        );


In order to use UTL_NLA the tensors that make the neural network and the test images need to be stored in varrays of binary_float, or rather be declared of data type UTL_NLA_ARRAY.
For this reason it is also convenient to post-process the tables "tensors" and "testdata" as follows:

SQL> create table testdata_array as
select a.image_id, a.label, 
cast(multiset(select val from testdata where image_id=a.image_id order by val_id) as utl_nla_array_flt) image_array 
from (select distinct image_id, label from testdata) a order by image_id;

SQL> create table tensors_array as
select a.name, cast(multiset(select val from tensors where name=a.name order by val_id) as utl_nla_array_flt) tensor_vals 
from (select distinct name from tensors) a;

Finally, you can export the tables for later use. In the Github repository you can find a dump file obtained with the following command (run as Oracle):

$ expdp mnist/mnist tables=testdata,tensors directory=DATA_PUMP_DIR dumpfile=MNIST_tables.dmp

The final step, which brings you back to the discussion in the paragraph "Let's start from the end: how to deploy the PL/SQL package MNIST", is to create the PL/SQL package MNIST that loads the tensors and performs the operations needed to score the neural network. See the details of the code on Github.


Conclusions and comments

This post describes an example of how to implement a scoring engine for an artificial neural network using the Oracle RDBMS and PL/SQL. The discussion is about a simple implementation of the "hello world" example of neural networks: recognizing handwritten digits of the MNIST database. The network is trained using TensorFlow and later exported into Oracle. The final result is a short PL/SQL package which provides digit recognition with an accuracy of about 98%.

We can expect in the near future to find increasing deployments of neural networks close to data sources and data stores. The example in this post of how to implement a neural network serving engine on an Oracle database shows that this is not only possible but also easy to implement.
Serving neural networks is much simpler than training them. While training requires specialized software/platforms and domain knowledge and large amounts of training data, trained networks can be imported into target systems and executed there, in many cases requiring low usage of computing resources.
This post is intended as learning material: a simple feed-forward neural network has been used instead of the better-performing convolutional networks (see references). Moreover, the data movement from TensorFlow to Oracle and the implementation of the serving engine in PL/SQL are somewhat of a hack in their present state and are not intended for production usage.

The code accompanying this post is available on Github.


Notes on how to build the test environment

The main components and tools for testing the scripts in this post are:
The Python environment (on Linux with CentOS 7) installed using Anaconda 4.1: Python 2.7, Jupyter/IPython notebooks.
TensorFlow, version 0.9 (the latest as I write this), installed following the instructions at https://www.tensorflow.org/versions/r0.9/get_started/os_setup.html
Oracle RDBMS running on Linux. The Oracle scripts have been tested on Oracle 11.2.0.4 and 12.1.0.2


References and acknowledgments

An excellent introduction to neural networks and an inspiration for this blog post is Michael Nielsen's book "Neural Networks and Deep Learning".
The code for neural network training used in this post is an extension of Google's TensorFlow MNIST tutorial.
See also: tutorial on TensorFlow by Martin Gorner
Basic techniques for TensorFlow by Aaron Schumacher
Visualizing MNIST by Christopher Olah
Python Machine Learning by Sebastian Raschka
Other popular frameworks for working with neural networks and deep learning besides TensorFlow include Theano and Torch among many others, see also this page on Wikipedia.


Monday, June 13, 2016

IPython/Jupyter Notebooks for Oracle

Topic: In this short post you can find examples of how to use IPython/Jupyter notebooks for running SQL on Oracle.

IPython/Jupyter notebooks are one of the leading free platforms for data analysis, with many advantages, notably the interactive web-based interface and a large ecosystem of readily available packages for data analysis and visualization. Moreover IPython/Jupyter notebooks are a very handy format for sharing code and data as you will see in the examples.
See also this blog post with examples on how to use Jupyter for querying Apache Impala.

It is of interest to integrate many data sources into Jupyter notebooks to make the platform versatile and to fulfill many different use cases. In this short post you can find examples of how to query data from Oracle using Jupyter notebooks and simple integration with pandas and matplotlib.
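As a minimal sketch of what the cx_Oracle and pandas integration in the notebooks looks like (the connection string, table and query used here are placeholders):

import cx_Oracle
import pandas as pd

ora_conn = cx_Oracle.connect('scott/tiger@dbserver:1521/orcl.mydomain.com')
df = pd.read_sql('select * from emp', con=ora_conn)   # fetch the query results into a pandas DataFrame
df.head()

From there the usual pandas and matplotlib machinery applies, for example df.plot() in a notebook cell with %matplotlib inline.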


The example notebooks are:

  • Oracle_IPython_sqlplus: examples of how to use sqlplus inside Jupyter notebooks. It is based on the use of the %%bash cell magic and here documents to wrap up sqlplus inside Jupyter cells.
  • Oracle_IPython_cx_Oracle_pandas: examples of how to query Oracle from Python using cx_Oracle and how to integrate with pandas and visualization with matplotlib.
  • Oracle_IPython_SQL_magic: examples of how to query Oracle using the %sql line magic (or %%sql cell magic) and of the integration with cx_Oracle and pandas.


Dependencies and pointers to build a test environment:

  • Install IPython and Jupyter (the following assumes Anaconda from Continuum Analytics)
  • Install the Oracle client
    • Download the software from OTN: http://www.oracle.com/technetwork/topics/linuxx86-64soft-092277.html
    • On that same link you can find the installation instructions (scroll down to the end of the page)
    • Example of the actions: perform ln -s libclntsh.so.12.1 libclntsh.so (12c version) and export LD_LIBRARY_PATH={oracle client home}
    • I have tested this with Oracle client versions 12.1.0.2 and 11.2.0.4 on Linux
  • Post client installation:
    • set environment: export ORACLE_HOME={path to the Oracle client installation}
    • If not already installed, install libaio (yum install libaio)
  • Check that the Oracle client works and all dependencies are set by running sqlplus from the Oracle client home, example:
    • check client connectivity with: sqlplus username/password@dbserver:port/service_name
  • Install cx_Oracle, for example with pip install cx_Oracle
  • Install ipython-sql
Reference: a web page by Julian Dyke with examples on how to use cx_Oracle


Tuesday, May 31, 2016

Linux BPF/bcc for Oracle Tracing

Topic: In this post you will find a short discussion and pointers to the code of a few sample scripts that I have written using Linux BPF/bcc and uprobes for Oracle tracing.

Previous work and motivations

Tools for dynamic tracing are very useful for troubleshooting and internals investigations of Oracle workloads. Dynamic tracing probes on the OS/kernel can be used, for example, to measure the details of I/O latency. Moreover, probes on the Oracle userspace functions can be used to complement Oracle instrumentation and probe deeper into the internals of the Oracle engine, when needed. For example, in work done in collaboration with Frits Hoogland, we have investigated how to link the Oracle wait event interface with tools able to probe the userspace, such as gdb and dynamic tracing tools like DTrace, SystemTap and Perf. More details and examples on this topic can be found in previous work: Modern Linux Tools for Oracle Troubleshooting (presentation at UKOUG Tech14), Life of an Oracle I/O: Tracing Logical and Physical I/O with SystemTap, Linux Perf Probes for Oracle Tracing, and Flame Graphs for Oracle.


What's new with BPF/bcc

BPF, together with its frontend bcc, are new and exciting technologies in the Linux dynamic tracing ecosystem. If you are not familiar with them, you can get up to speed using the excellent material in Brendan Gregg's blog. While the tools are still in development as I write this, a large number of important features are already available starting from kernel 4.5, notably the support for uprobes that I have used in the scripts discussed later in this post.

I have started experimenting with porting a few probes for the Oracle userspace that I had written using SystemTap and discussed in this blog. While the language/syntax is completely different between SystemTap and BPF/bcc, porting the probes has turned out to be relatively straightforward. The work has been made substantially easier by the availability of a large selection of well-written and documented scripts in the tools directory of bcc. At present, writing BPF/bcc scripts is a mixture of coding in Python and C, which feels to me both familiar and at the same time strangely low-level compared to the more mature environments of DTrace and SystemTap.
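To give a feeling for what this mixture of Python and C looks like, here is a minimal, hypothetical sketch of attaching a uprobe to an Oracle userspace function with bcc (the path to the Oracle binary is just an example; the actual scripts listed below do considerably more, such as reading function arguments from CPU registers):

from bcc import BPF

# The C part: a probe function that fires at each call of the traced userspace function
bpf_text = """
int trace_kskthewt(struct pt_regs *ctx) {
    bpf_trace_printk("kskthewt called\\n");
    return 0;
}
"""

# The Python part: compile the BPF program and attach it as a uprobe to the Oracle binary
b = BPF(text=bpf_text)
b.attach_uprobe(name="/u01/app/oracle/product/11.2.0.4/db_1/bin/oracle",
                sym="kskthewt", fn_name="trace_kskthewt")
b.trace_print()   # print trace output to stdout, Ctrl-C to stop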

The example scripts that I have written using BPF/bcc are on Github. Here is a list with comments:

  • ora_sqlparse_trace.py: tracing of Oracle SQL parsing. This script traces SQL hard parsing on Oracle binaries by hooking on the Oracle function "opiprs" and reads from function arguments (CPU registers) and from process memory.
  • ora_wait_trace.py: tracing of Oracle wait events. This script traces Oracle sessions by hooking on the functions "kskthewt" and "kews_update_wait_time" and reads from function arguments (CPU registers).
  • ora_logicalIO_histogram.py: logical IO latency histograms. This script measures the latency between call and return time for the Oracle function "kcbgtcr", which is an important part of the logical IO processing for consistent reads.
  • ora_wait_histogram.py: wait event latency histograms. This script traces Oracle sessions by hooking on the functions "kskthewt" and "kews_update_wait_time" and reads from function arguments (CPU registers). BPF computes the latency histogram for the wait events and the script prints the values on stdout.

An example of the usage of ora_wait_histogram.py to measure and display wait event latency:

# stdbuf -oL ./ora_wait_histogram.py 10 10|sed -e 's/event# = /event#=/g' -f eventsname.sed
Start tracing oracle wait events... Hit Ctrl-C to end.

event=db file sequential read
     wait time, microsec : count     distribution
         0 -> 1          : 0        |                                        |
         2 -> 3          : 0        |                                        |
         4 -> 7          : 0        |                                        |
         8 -> 15         : 0        |                                        |
        16 -> 31         : 0        |                                        |
        32 -> 63         : 0        |                                        |
        64 -> 127        : 25       |                                        |
       128 -> 255        : 24521    |********************                    |
       256 -> 511        : 46788    |****************************************|
       512 -> 1023       : 12169    |**********                              |
      1024 -> 2047       : 1132     |                                        |
      2048 -> 4095       : 660      |                                        |
      4096 -> 8191       : 248      |                                        |
      8192 -> 16383      : 29       |                                        |


Latency heat maps to display histograms collected with BPF/bcc

PyLatencyMap is a command-line tool for visualizing latency histograms as heat maps rendered with terminal graphics (ANSI escape codes). PyLatencyMap can be used to investigate I/O performance for random I/O, and is especially suited for the cases of multiple modes of response time from the storage (SSD cache, HDD, latency outliers). The original idea for PyLatencyMap comes from the work of Brendan Gregg on latency heat maps. I have added to the PyLatencyMap repository a modified version of the biolatency.py script to measure I/O latency histograms: this is the link to the script pylatencymap_biolatency.py and an example of heat maps generated with PyLatencyMap with the script Example11_BPF-bcc_blockIO_latency.sh:



The test workload has been generated using Kevin Closson's SLOB. Additional references with a more detailed discussion of the topic of testing Oracle I/O with SLOB and measuring latency heat maps are:
Heat Map Visualization of I/O Latency with SystemTap and PyLatencyMap and OraLatencyMap v1.1 and Testing I/O with SLOB 2.


Notes on the test environment

The scripts discussed in this post have been developed on Fedora 24 (alpha) running Linux kernel version 4.6 (using the rawhide kernel) and have been tested on workloads generated using Oracle version 11.2.0.4. This is not an Oracle-supported configuration and the scripts are intended mainly as a demonstration of the technology and for learning purposes.
Here are some pointers on the steps I used to set up a lab environment for testing:

Note in particular the step for configuring the rawhide kernel, probably a good choice when testing BPF, as new features are being added on a regular basis as I write this:
# sudo dnf config-manager --add-repo=http://alt.fedoraproject.org/pub/alt/rawhide-kernel-nodebug/fedora-rawhide-kernel-nodebug.repo

# sudo dnf update

Additional pointers and recipes on how to install Oracle 11.2 on Fedora can be found on Tim Hall's website at: https://oracle-base.com/articles/11g/articles-11g


Conclusions

BPF with its bcc frontend are new and powerful tools for dynamic tracing for Linux. A discussion of the merits and shortfalls of BPF/bcc vs other existing solutions in the Linux dynamic tracing ecosystem is beyond the scope of this post. What you can find in this post are a few example scripts that I have written for tracing Oracle using BPF/bcc and uprobes and an additional script for integrating BPF/bcc with PyLatencyMap, which provides visualization as heat maps of the histograms generated using BPF/bcc.
Although the BPF/bcc environment is currently under evolution, it already appears to be a very useful addition to the toolbox for troubleshooting and performance investigations of Linux workloads. The set of example scripts/tools that comes with the bcc repository is of great help for getting started both with using BPF/bcc and with writing new scripts. BPF/bcc can only run on relatively new kernels (as I write this, I have tested the scripts discussed in this post on kernel version 4.6) and this is an obstacle to its adoption in many environments, at least in the short term.

Note added, February 2019: Red Hat and Oracle Linux 7.6 have backported BPF and can run bcc scripts (yum install bcc*). I have updated the script repository accordingly, see https://github.com/LucaCanali/Linux_tracing_scripts/tree/master/BPF-bcc_Userspace_Oracle


Credits and acknowledgements

Many of the original ideas and tools discussed here are inspired or directly derived from the awesome published work of Brendan Gregg.
Many thanks also to the development teams of BPF and bcc for providing and supporting these powerful new tools. In particular, thanks to Brenden Blanco for his work on uprobes for bcc and for his support on issue #478.
The work of investigating Oracle userspace with dynamic tracing tools has been done in collaboration with Frits Hoogland.