
Thursday, March 26, 2020

Distributed Deep Learning for Physics with TensorFlow and Kubernetes

Summary: This post details a solution for distributed deep learning training for a High Energy Physics use case, deployed using cloud resources and Kubernetes. You will find the results for training using CPU and GPU nodes. This post also describes an experimental tool that we developed, TF-Spawner, and how we used it to run distributed TensorFlow on a Kubernetes cluster.

Authors: Riccardo.Castellotti@cern.ch and Luca.Canali@cern.ch

A Particle Classifier

  
This work was developed as part of the pipeline described in Machine Learning Pipelines with Modern Big Data Tools for High Energy Physics. The main goal is to build a particle classifier to improve the quality of data filtering for online systems at the LHC experiments. The classifier is implemented using a neural network model described in this research article.
The datasets used for training and testing are stored in TFRecord format, with a cumulative size of about 250 GB and 34 million events in the training dataset. A key part of the neural network (see Figure 1) is a GRU layer that is trained using lists of 801 particles with 19 low-level features each, which account for most of the training dataset. The datasets used for this work have been produced using Apache Spark, see details and code. The original pipeline produces files in Apache Parquet format; we have used Spark and the spark-tensorflow-connector to convert the datasets into TFRecord format, see also the code.
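For illustration, the conversion from Parquet to TFRecord with the spark-tensorflow-connector can be sketched in PySpark as follows. This is not the exact conversion code linked above; the paths are placeholders and the connector JAR is assumed to be on the Spark classpath.

from pyspark.sql import SparkSession

# Hypothetical sketch: write a Parquet dataset out as TFRecord using spark-tensorflow-connector.
spark = (SparkSession.builder
         .appName("parquet-to-tfrecord")
         .getOrCreate())

df = spark.read.parquet("/path/to/training_dataset.parquet")   # placeholder input path

# The connector registers a "tfrecords" data source; rows are serialized as tf.Example records.
(df.write
   .format("tfrecords")
   .option("recordType", "Example")
   .save("/path/to/training_dataset.tfrecord"))                # placeholder output path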

Data: download the datasets used for this work at this link
Code: see the code used for the tests reported in this post at this link



Figure 1: (left) Diagram of the neural network for the Inclusive Classifier model, from T. Nguyen et al. (right) TF.Keras implementation used in this work.

Distributed Training on Cloud Resources

  
Cloud resources provide a suitable environment for scaling distributed training of neural networks. One of the key advantages of using cloud resources is the elasticity of the platform, which allows allocating resources when needed. Moreover, container orchestration systems, in particular Kubernetes, provide a powerful and flexible API for deploying many types of workloads on cloud resources, including machine learning and data processing pipelines. CERN physicists, and data scientists in general, can access cloud resources and Kubernetes clusters via the CERN OpenStack private cloud. The use of public clouds is also being actively tested for High Energy Physics (HEP) workloads. The tests reported here have been run using resources from Oracle Cloud Infrastructure (OCI).
For this work, we have developed a custom launcher script, TF-Spawner (see also the paragraph on TF-Spawner for more details), for running distributed TensorFlow training code on Kubernetes clusters.
Training and test datasets have been copied to the cloud object storage prior to running the tests, OCI object storage in this case, while for tests run at CERN we used an S3 installation based on Ceph. Our model training job with TensorFlow used training and test data in TFRecord format, produced at the end of the data preparation part of the pipeline, as discussed in the previous paragraph. TensorFlow natively reads the TFRecord format and offers tunable parameters and optimizations when ingesting this type of data via the tf.data and tf.io modules. We found that reading from OCI object storage can become a bottleneck for distributed training, as it requires reading data over the network, which can suffer from bandwidth saturation, latency spikes and/or multi-tenancy noise. We followed TensorFlow's documentation recommendations for improving the data pipeline performance, by using prefetching, parallel data extraction, sequential interleaving, caching, and a large read buffer. Notably, caching proved very useful for distributed training with GPUs and for some of the largest tests on CPU, where we observed that the first training epoch, which has to read the data into the cache, was much slower than subsequent epochs, which find the data already cached.
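The kind of input pipeline tuning described above can be sketched with tf.data as follows. This is a minimal illustration, not the exact settings used in the tests: the file pattern, buffer sizes and the feature specification are placeholders, and parse_example here only reads a label field.

import tensorflow as tf

AUTO = tf.data.experimental.AUTOTUNE

def parse_example(serialized):
    # Placeholder feature spec; the real datasets use the schema produced by the
    # Spark data preparation step (particle lists and high-level features).
    features = {"label": tf.io.FixedLenFeature([], tf.int64)}
    return tf.io.parse_single_example(serialized, features)

files = tf.data.Dataset.list_files("s3://mybucket/train/part-*.tfrecord")  # placeholder path

dataset = files.interleave(
    lambda f: tf.data.TFRecordDataset(f, buffer_size=8 * 1024 * 1024),  # large read buffer
    cycle_length=4)                                # sequential interleaving across files

dataset = (dataset
           .map(parse_example, num_parallel_calls=AUTO)   # parallel data extraction
           .cache()                                       # cache after the first epoch (needs enough RAM)
           .batch(128)
           .prefetch(AUTO))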
Tests were run using TensorFlow version 2.0.1, with the tf.distribute strategy "multi worker mirrored strategy". Additional care was taken to make sure that the different tests would also yield the same good accuracy on the test dataset as found with the training methods tested in previous work. To achieve this, we found that additional tuning of the learning rate for the optimizer was needed (we use the Adam optimizer for all the tests discussed in this article). We scaled the learning rate with the number of workers, to match the increase in effective batch size (we used a batch size of 128 for each worker). In addition, we found that slowly reducing the learning rate as the epochs progressed was beneficial to the convergence of the network. This additional step is an ad hoc tuning that we developed by trial and error and that we validated by monitoring the accuracy and loss on the test dataset at the end of each training.
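A schematic sketch of how the strategy and the learning-rate handling described above can be wired together is shown below. The numbers, the decay schedule, the placeholder model and the tiny synthetic dataset are illustrative only, not the values or the model used in our tests.

import numpy as np
import tensorflow as tf

# Multi-worker strategy: each worker reads TF_CONFIG to discover its peers.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
num_workers = strategy.num_replicas_in_sync

base_lr = 0.001                      # illustrative per-worker learning rate
scaled_lr = base_lr * num_workers    # scale with the effective batch size

def build_model():
    # Placeholder model; the real Inclusive Classifier uses a GRU over particle lists (see Figure 1).
    return tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation="relu", input_shape=(19,)),
        tf.keras.layers.Dense(3, activation="softmax")])

with strategy.scope():
    model = build_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=scaled_lr),
                  loss="categorical_crossentropy",
                  metrics=["accuracy"])

def lr_schedule(epoch, lr):
    # Keep the scaled rate for the first epoch, then decay it slowly (illustrative schedule).
    return lr if epoch == 0 else lr * 0.9

# Tiny synthetic dataset so the sketch runs end to end; the real training uses the
# TFRecord pipeline described earlier in the post.
x = np.random.rand(1024, 19).astype("float32")
y = tf.keras.utils.to_categorical(np.random.randint(0, 3, size=1024), 3)
train_dataset = tf.data.Dataset.from_tensor_slices((x, y)).batch(128)

model.fit(train_dataset, epochs=6,
          callbacks=[tf.keras.callbacks.LearningRateScheduler(lr_schedule)])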
To gather performance data, we ran the training for 6 epochs, which provided accuracy and loss very close to the best results we would obtain by training the network for up to 12 epochs. We also tested adding shuffling between epochs, using the shuffle method of the tf.data API; however, it did not show measurable improvements, so this technique was not used further in the tests reported here.

Figure 2: Measured speedup for the distributed training of the Inclusive Classifier model using TensorFlow and tf.distribute with “multi worker mirrored strategy”, running on cloud resources with CPU and GPU nodes (Nvidia P100), training for 6 epochs. The speedup values indicate how well the distributed training scales as the number of worker nodes, with CPU and GPU resources, increases.
  

Results and Performance Measurements, CPU and GPU Tests

  
We deployed our tests using Oracle's OCI. Cloud resources were used to build Kubernetes clusters using virtual machines (VMs). We used a set of Terraform scripts to automate the configuration process. The cluster for CPU tests used VMs of the flavor "VM.Standard2.16", based on 2.0 GHz Intel Xeon Platinum 8167M processors, each providing 16 physical cores (Oracle Cloud refers to these as OCPUs) and 240 GB of RAM. In our configuration, the tests deployed 3 pods on each VM (Kubernetes node), each pod running one TensorFlow worker. Additional OS-based measurements on the VMs confirmed that this was a suitable configuration, as we could measure that the CPU utilization on each VM matched the number of available physical cores (OCPUs), therefore providing good utilization without saturation. The available RAM in the worker nodes was used to cache the training dataset using the tf.data API (data populates the cache during the first epoch).
Figure 2 shows the results of the Inclusive Classifier model training speedup for a variable number of nodes and CPU cores. Tests have been run using TF-Spawner. Measurements show that the training time decreases as the number of allocated cores increases. The speedup grows close to linearly in the range tested, from 32 cores to 480 cores. The largest distributed training test that we ran using CPUs used 480 physical cores (OCPUs), distributed over 30 VMs, each running 3 workers (each worker running in a separate container in a pod), for a total of 90 workers.

Similarly, we have performed tests using GPU resources on OCI and running the workload with TF-Spawner. For the GPU tests we used the VM flavor "GPU 2.1", which comes equipped with one Nvidia P100 GPU, 12 physical cores (OCPUs) and 72 GB of RAM. We tested distributed training with up to 10 GPUs and found that scalability was close to linear in the tested range. One important lesson learned when using GPUs is that the slow performance of reading data from OCI storage makes the first training epoch much slower than the rest of the epochs (up to 3-4 times slower). It was therefore very important to use TensorFlow's caching of the training dataset for our tests with GPUs. However, we could only cache the training dataset for tests using 4 nodes or more, given the limited amount of memory in the VM flavor used (72 GB of RAM per node) compared to the size of the training set (200 GB).
Distributed training tests with CPUs and GPUs were performed using the same infrastructure, namely a Kubernetes cluster built on cloud resources and cloud storage allocated on OCI. Moreover, we used the same training script for CPU and GPU, the same APIs (tf.distribute and tf.keras), and the same TensorFlow version. The TensorFlow runtime differed in the two cases, as training on GPU resources took advantage of TensorFlow's optimizations for CUDA and Nvidia GPUs. Figure 3 shows the distributed training time measured for some selected cluster configurations. We can use these results to compare the performance found when training on GPUs and on CPUs. For example, we find in Figure 3 that the training time of the Inclusive Classifier for 6 epochs using 400 CPU cores (distributed over 25 VMs equipped with 16 physical cores each) is about 2000 seconds, which is similar to the training time we measured when distributing the training over 6 nodes equipped with GPUs.
When training using GPU resources (Nvidia P100), we measured that each batch is processed in about 59 ms (except for epoch 1, which is I/O bound and about 3x slower). Each batch contains 128 records and has a size of about 7.4 MB. This corresponds to a measured throughput of training data flowing through the GPU of about 125 MB/sec per node (i.e. 1.2 GB/sec when training using 10 GPUs). When training on CPU, the measured processing time per batch is about 930 ms, which corresponds to about 8 MB/sec per worker, and amounts to 716 MB/sec for the training test with 90 workers and 480 CPU cores.
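As a back-of-the-envelope consistency check of the throughput figures quoted above (a simple calculation, not a new measurement):

# Quick check of the quoted throughput numbers.
batch_size_mb = 7.4          # size of one batch of 128 records

gpu_batch_time = 0.059       # seconds per batch on one P100 worker
print("GPU: %.0f MB/s per worker" % (batch_size_mb / gpu_batch_time))                   # ~125 MB/s
print("GPU: %.1f GB/s with 10 GPUs" % (batch_size_mb / gpu_batch_time * 10 / 1000.0))   # ~1.2 GB/s

cpu_batch_time = 0.93        # seconds per batch on one CPU worker
print("CPU: %.0f MB/s per worker" % (batch_size_mb / cpu_batch_time))                   # ~8 MB/s
print("CPU: %.0f MB/s with 90 workers" % (batch_size_mb / cpu_batch_time * 90))         # ~716 MB/s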
We do not believe these results can be easily generalized to other environments and models, however, they are reported here as they can be useful as an example and for future reference.

Figure 3: Selected measurements of the distributed training time for the Inclusive Classifier model using TensorFlow and tf.distribute with “multi worker mirrored strategy”, training for 6 epochs, running on cloud resources, using CPU (2.0 GHz Intel Xeon Platinum 8167M) and GPU (Nvidia P100) nodes, on Oracle's OCI.

TF-Spawner

  
TF-Spawner is an experimental tool for running TensorFlow distributed training on Kubernetes clusters.
TF-Spawner takes as input the user's Python code for TensorFlow training, which is expected to use a tf.distribute strategy for multi-worker training, and runs it on a Kubernetes cluster. TF-Spawner takes care of requesting the desired number of workers, each running in a container image inside a dedicated pod (unit of execution) on a Kubernetes cluster. We used the official TensorFlow images from Docker Hub for this work. Moreover, TF-Spawner handles the distribution of the necessary credentials for authenticating with cloud storage and manages the TF_CONFIG environment variable needed by tf.distribute.
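For reference, the TF_CONFIG variable that tf.distribute expects on each worker is a JSON document along the following lines; the host names, port and worker index below are placeholders for illustration.

# Illustrative TF_CONFIG content for worker 0 of a 3-worker job (host names are placeholders).
# tf.distribute's multi-worker strategy reads this variable to discover its peers and its own role.
import json, os

tf_config = {
    "cluster": {"worker": ["worker-0:2222", "worker-1:2222", "worker-2:2222"]},
    "task": {"type": "worker", "index": 0}
}
os.environ["TF_CONFIG"] = json.dumps(tf_config)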

Examples:


TensorBoard metrics visualization:

TensorBoard provides monitoring and instrumentation for TensorFlow operations. To use TensorBoard with TF-Spawner you can follow a few additional steps detailed in the documentation.

Figure 4: TensorBoard visualization of the distributed training metrics for the Inclusive Classifier, trained on 10 GPU nodes on a Kubernetes cluster using TF-Spawner. Measurements show that training converges smoothly. Note: the reason why we see lower accuracy and greater loss for the training dataset compared to the validation dataset is the use of dropout in the model.

Limitations: We found TF-Spawner powerful and easy to use for the scope of this work. However, it is an experimental tool. Notably, there is no validation of the user-provided training script; it is simply passed to Python for execution. Users need to make sure that all the requested pods are effectively running, and have to manually take care of possible failures. At the end of the training, the pods will be found in "Completed" state; users can then manually retrieve the information they need, such as the training time, from the pods' log files. Similarly, other common operations, such as fetching the saved trained model or monitoring training with TensorBoard, need to be performed manually. These are all relatively easy tasks, but they require additional effort and some familiarity with the Kubernetes environment.
Another limitation to the proposed approach is that the use of TF-Spawner does not naturally fit with the use of Jupyter Notebooks, which are often the preferred environment for ML development. Ideas for future work in this direction and other tools that can be helpful in this area are listed in the conclusions.
If you try TF-Spawner and find it useful for your work, we welcome your feedback.

Conclusions and Acknowledgements

  
This work shows an example of how we implemented distributed deep learning for a High Energy Physics use case, using commonly used tools and platforms from industry and open source, namely TensorFlow and Kubernetes. A key point of this work is demonstrating the use of cloud resources to scale out distributed training.
Machine learning and deep learning on large amounts of data are standard tools for particle physics, and their use is expected to increase in the HEP community in the coming years, both for data acquisition and for data analysis workflows, notably in the context of the challenges of the High Luminosity LHC project. Improvements in productivity and cost reduction for the development, deployment, and maintenance of machine learning pipelines on HEP data are of high interest.
We have developed and used a simple tool for running TensorFlow distributed training on Kubernetes clusters, TF-Spawner. Previously reported work has addressed the implementation of the pipeline and distributed training using Apache Spark. Future work may address the use of other solutions for distributed training, using cloud resources and open source tools, such as Horovod on Spark and KubeFlow. In particular, we are interested in further exploring the integration of distributed training with the analytics platforms based on Jupyter Notebooks.

This work has been developed in the context of the Data Analytics services at CERN and of the CERN openlab project on machine learning in the cloud in collaboration with Oracle. Additional information on the work described here can be found in the article Machine Learning Pipelines with Modern Big Data Tools for High Energy Physics. The authors would like to thank Matteo Migliorini and Marco Zanetti of the University of Padova for their collaboration and joint work, and Thong Nguyen and Maurizio Pierini for their help, suggestions, and for providing the dataset and models for this work. Many thanks also to CERN openlab, to our Oracle contacts for this project, and to our colleagues at the Spark and Hadoop Service at CERN.




Tuesday, July 26, 2016

How to Build a Neural Network Scoring Engine in PL/SQL

Topic: In this post, you will find an example of how to build and deploy a basic artificial neural network scoring engine using PL/SQL for recognizing handwritten digits. This post is intended for learning purposes, in particular for Oracle practitioners who want a hands-on introduction to neural networks.


Introduction

Machine learning, and neural networks in particular, are currently hot topics in data processing. Many tools and platforms are now easily available to work and experiment with neural networks and deep learning (see also the links at the end of this post). Recognizing handwritten digits, in particular using the MNIST database by Yann LeCun et al., is currently the "hello world" example for neural networks.
In this post, you will see how to build and deploy a simple neural network scoring engine to recognize handwritten digits using Oracle and PL/SQL. The final result is a short PL/SQL package with an accuracy of about 98%. The neural network is built and trained using TensorFlow and then transferred to Oracle for serving.

One of the ideas that this post wants to illustrate is that scoring neural networks is much easier than training them: the operations required for serving a trained network can be implemented relatively easily in many computing languages/environments. Discussions on these topics are normally centered on platforms for "Big Data" (see for example Spark and MLlib). I find it interesting to note that neural networks can also be successfully applied to the RDBMS world. This can be useful, as large quantities of valuable data are currently stored in relational databases. In the case of Oracle, the implementation of a scoring engine is also made easier by the availability of a mature PL/SQL environment with a package for linear algebra: UTL_NLA.


Let's start from the end: how to deploy the PL/SQL package MNIST and recognize handwritten digits using Oracle

One short PL/SQL package and two tables are all you need to replay the following example (you can find the details of the code on Github). The tables are:
  • TENSORS_ARRAY: this table contains the numerical values for the vectors and matrices (tensors) that constitute the neural network. There is a total of 79510 floating point numbers encoded into four tensors using the data type UTL_NLA_ARRAY_FLT.
  • TESTDATA_ARRAY: this table contains the test images. There are 10K images, each composed of 28x28 = 784 pixels. Image data is also encoded using the data type UTL_NLA_ARRAY_FLT.
The engine for scoring the example neural network is in a package called MNIST. It has a procedure called INIT that loads the components of the neural network from the table tensors_array into PL/SQL variables, and a function called SCORE that takes an image as input and returns a number, the predicted value of the digit.
Here is an example of its usage, where the first image in the table testdata_array is examined and correctly predicted to represent the number 7 (the image label agrees with the prediction by MNIST.SCORE):

SQL> exec mnist.init

PL/SQL procedure successfully completed.

SQL> select mnist.score(image_array), label from testdata_array where rownum=1;

MNIST.SCORE(IMAGE_ARRAY)      LABEL
------------------------ ----------
                       7          7

Figure 1: This is a bitmap display of the test image used in the example. This confirms that the prediction of MNIST.SCORE is correct and indeed the image is a representation of the number 7 handwritten and encoded in a grid of 28x28 gray-scale pixels.


Processing all the test images is also a matter of a simple SQL command. In the example of Figure 2, it takes 2 minutes to process 10000 test images, that is about 12 ms per image on average. The accuracy of the scoring function is about 98%: out of 10000 images, 9787 are scored correctly according to the data labels. Note also that the set of test images is disjoint from the images used to train the neural network. Therefore, we can expect the MNIST package to have an accuracy of about 98% for recognizing digits also when used on generic input (additional evaluations of the quality of the MNIST package as a classifier are beyond the scope of this post).
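As an illustration of the kind of query involved, the accuracy figure can be computed with a single SQL statement over the tables and package described in this post; the snippet below is a sketch, run from Python with cx_Oracle (introduced later in this post), and the connection string is the example one used there.

# Illustrative accuracy check over the whole test set; query and connection string are examples only.
import cx_Oracle

ora_conn = cx_Oracle.connect('mnist/mnist@ORCL')
cursor = ora_conn.cursor()
cursor.callproc("mnist.init")   # load the network tensors into PL/SQL variables first

cursor.execute("""
    select count(*),
           sum(case when mnist.score(image_array) = label then 1 else 0 end)
    from testdata_array""")
total, correct = cursor.fetchone()
print("Scored correctly: %d out of %d (%.1f%%)" % (correct, total, 100.0 * correct / total))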


The full PL/SQL code and the datapump dump file with the relevant tables can be found on Github. In the following paragraphs, you can read how to build and train the neural network.


Figure 2: The accuracy of the PL/SQL scoring function MNIST.SCORE on the test set of 10K images is about 98%. Processing takes about 12 ms per image.


The neural network

The neural network used in this post is composed of three layers (see also Figure 3): one input layer, one hidden layer and one output layer. If this topic is new to you, I recommend doing some additional reading (see references) and in particular reading Michael Nielsen's "Neural Networks and Deep Learning", which provides an excellent introduction to the topic and a series of step-by-step examples on the problem of recognizing handwritten digits.


Figure 3: The artificial neural network used in this post is composed of three layers. The input layer has 784 neurons, one per pixel of the input image. A hidden layer of 100 neurons is added to improve the accuracy. The output layer has 10 neurons, one for each possible output value (that is, the digits 0 to 9).


Get the training and test data, build and train the neural network using TensorFlow

Another important step for deploying neural networks is training. For this you need data, lots of it if possible. You also need an engine to do the necessary computation. Luckily, there are many platforms available for working with neural networks that are free and relatively easy to deploy (see references). In this post, you will see how to use Google's TensorFlow and the Python environment. TensorFlow comes with a tutorial for recognizing handwritten digits in the MNIST database. Included in the tutorial are training and test data with labels and also example code.

You can find the code I used for training the neural network on Github. Some highlights and code snippets are discussed in the following.

Importing the data: the example dataset that comes with TensorFlow provides 55000 images for training and 10000 images for testing. These originally come from the work of Yann LeCun and coworkers. Having large amounts of high-quality data is very important to the success of the process. Moreover, the images come with labels: the labels tell which number each image is intended to depict and provide a very important piece of information, as the exercise is to do supervised learning.
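Loading the data follows the TensorFlow tutorial; a minimal sketch (module paths as in the TensorFlow 0.x tutorials) looks like this:

# Sketch of loading the MNIST tutorial dataset (TensorFlow 0.x tutorial module).
from tensorflow.examples.tutorials.mnist import input_data

# Downloads the data on first use; one-hot labels suit the softmax output layer.
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)

print(mnist.train.images.shape)   # (55000, 784)
print(mnist.test.images.shape)    # (10000, 784)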

Defining the neural network: there are four tensors (vectors and matrices in this case) in the network: W0, W1, b0 and b1. They are defined in the following snippet of code. To better understand their role, and the key role that the cross entropy and the gradient descent optimizer play in training the network, see the references, in particular "Neural Networks and Deep Learning" and the TensorFlow tutorial.
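The following is a sketch of an equivalent definition in TensorFlow 0.x style; the exact code used for this post is on Github, and the initializers and learning rate below are illustrative choices.

# Sketch of the network definition: four tensors, cross entropy loss, gradient descent.
import tensorflow as tf

x  = tf.placeholder(tf.float32, [None, 784])   # input images, 28x28 pixels flattened
y_ = tf.placeholder(tf.float32, [None, 10])    # one-hot labels

W0 = tf.Variable(tf.truncated_normal([784, 100], stddev=0.1))  # input -> hidden layer weights
b0 = tf.Variable(tf.zeros([100]))
W1 = tf.Variable(tf.truncated_normal([100, 10], stddev=0.1))   # hidden -> output layer weights
b1 = tf.Variable(tf.zeros([10]))

hidden = tf.nn.sigmoid(tf.matmul(x, W0) + b0)
y = tf.nn.softmax(tf.matmul(hidden, W1) + b1)

# Cross entropy loss and gradient descent optimizer, as discussed above (learning rate is illustrative).
cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)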



Training the neural network: training proceeds with multiple steps of optimization. Training is performed using 55000 images with labels. It runs over 30000 iterations using a "mini-batch" size of 100 images. At each step, the gradient descent algorithm computes an update of the weights and biases (W0, W1, b0 and b1) with the goal of minimizing the loss function (cross_entropy). The relevant snippet of the code is:
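A sketch of the training loop, building on the definitions in the previous snippet (session handling and the final accuracy check are illustrative; the exact code is on Github):

# Sketch of the training loop: 30000 iterations with mini-batches of 100 images.
sess = tf.Session()
sess.run(tf.initialize_all_variables())   # TensorFlow 0.x initialization API

for i in range(30000):
    batch_xs, batch_ys = mnist.train.next_batch(100)
    sess.run(train_step, feed_dict={x: batch_xs, y_: batch_ys})

# Evaluate the accuracy on the (disjoint) test set.
correct = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
print(sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))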


Result: the trained network has an accuracy of about 98% in recognizing the images in the test set. Note that the test set is composed of 10000 images and is disjoint from the set of images used for training (the training set contains 55000 images).
It is possible to get higher accuracy with more advanced neural network configurations (see references for details), but that is beyond the scope of this post.


Manually scoring the neural network, a Python example

The main result of the training operations is that the tensors (matrices and vectors in this case) that make up the neural network are now populated with useful values. I believe that a good way to understand how all this works is to "run the network manually": that is, to work through an example of how to go from an image of a handwritten digit to the prediction of its value by the trained neural network. As a first step we extract the values of the trained tensors in our model into numpy arrays for later processing:
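A minimal sketch of this extraction step, building on the training session above (variable names are chosen to match the description that follows):

# Evaluate the trained TensorFlow variables into numpy arrays.
W0_matrix = sess.run(W0)   # shape (784, 100)
b0_array  = sess.run(b0)   # shape (100,)
W1_matrix = sess.run(W1)   # shape (100, 10)
b1_array  = sess.run(b1)   # shape (10,)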


An example of "manually" operating the network in Python is as follows:
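A sketch of this manual forward pass in numpy, using the arrays extracted above (the choice of the first test image is just an example):

# "Manually" scoring one test image with numpy, as described below.
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def softmax(z):
    e = np.exp(z - np.max(z))
    return e / e.sum()

testimage = mnist.test.images[0]    # first image of the test set (its label is 7)

hidden_layer = sigmoid(np.dot(testimage, W0_matrix) + b0_array)
predicted = softmax(np.dot(hidden_layer, W1_matrix) + b1_array)

print(np.argmax(predicted))         # predicted digit, 7 for this image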



W0_matrix, b0_array, W1_matrix and b1_array are the tensors that constitute the neural network after training, "testimage" is the input, sigmoid() is used as the activation function, "hidden_layer" represents the hidden layer of the network, "predicted" is the output layer and softmax() is a function used to normalize the output as a probability distribution. At the end of the calculation, the array predicted[n] contains the prediction that the input image represents the digit "n". The function argmax() finds the value of "n" for which predicted[n] is maximized.
The code shown above predicts the value 7 for a test image. The prediction is confirmed as correct by the value of the label and can also be visually confirmed by the bitmap display of the test image (see Figure 1).


Move test data and neural network tensors to an Oracle database

The example in the previous paragraph of how to manually run the scoring engine illustrates that serving a neural network can be straightforward: in some cases it is just a matter of performing some basic computations with matrices. This contrasts with the complexity of training neural network models, where one often needs a specialized engine, large quantities of training data and, in the more complex cases, also specialized hardware, such as GPU cards.
The discussion in the previous paragraph has also prepared the ground for the following development: moving the neural network tensors and test data to Oracle and implementing a serving engine there.
There are many ways to export Python's numpy arrays. One way is to save them in a text format. Here you will see instead a method for exporting them directly into Oracle using cx_Oracle, the Python library for interacting with Oracle. See also the notebook "Oracle and Python with cx_Oracle" for additional examples and references on how to use cx_Oracle.

You can find the code on Github, here are some relevant snippets:

- Create the tables to host the tensor definition and test data:

SQL> create table tensors (name varchar2(20), val_id number, val binary_float, primary key(name, val_id));

SQL> create table testdata (image_id number, label number, val_id number, val binary_float, primary key(image_id, val_id));

- From Python, open a connection to Oracle:

import cx_Oracle
ora_conn = cx_Oracle.connect('mnist/mnist@ORCL')
cursor = ora_conn.cursor()

- Example of how to transfer the matrix W0 into the Oracle table "tensors"

i=0
sql="insert into tensors values ('W0', :val_id, :val)"
for column in W0_matrix:
    array_values = []
    for element in column:
        array_values.append((i, float(element)))
        i += 1
    cursor.executemany(sql, array_values)
ora_conn.commit()


Oracle's optimizations for linear algebra using UTL_NLA

From Oracle documentation: "The UTL_NLA package exposes a subset of the BLAS and LAPACK (Version 3.0) operations on vectors and matrices represented as VARRAYS". This is very useful for implementing the calculations needed to serve the neural network of this post.
A snippet of the MNIST code, to give the gist of how this works in practice, is reported below. The code performs the calculation v_Y0 = v_Y0 + g_W0_matrix * p_testimage_array, where g_W0_matrix is a 784x100 matrix, p_testimage_array is a vector of 784 elements (encoding the 28x28 images) and v_Y0 is a vector of 100 elements.



utl_nla.blas_gemv(
                      trans => 'N',
                      m => 100,
                      n => 784,
                      alpha => 1.0,
                      a => g_W0_matrix,
                      lda => 100,
                      x => p_testimage_array,
                      incx => 1,
                      beta => 1.0,
                      y => v_Y0,
                      incy => 1,
                      pack => 'C'
        );


In order to use UTL_NLA, the tensors that make up the neural network and the test images need to be stored in varrays of binary_float, or rather be declared of data type UTL_NLA_ARRAY_FLT.
For this reason it is also convenient to post-process the tables "tensors" and "testdata" as follows:

SQL> create table testdata_array as
select a.image_id, a.label, 
cast(multiset(select val from testdata where image_id=a.image_id order by val_id) as utl_nla_array_flt) image_array 
from (select distinct image_id, label from testdata) a order by image_id;

SQL> create table tensors_array as
select a.name, cast(multiset(select val from tensors where name=a.name order by val_id) as utl_nla_array_flt) tensor_vals 
from (select distinct name from tensors) a;

Finally, you can export the tables for later use. In the Github repository you can find a dump file obtained with the following command (run as Oracle):

$ expdp mnist/mnist tables=testdata,tensors directory=DATA_PUMP_DIR dumpfile=MNIST_tables.dmp

The final step, which brings you back to the discussion in the paragraph "Let's start from the end: how to deploy the PL/SQL package MNIST and recognize handwritten digits using Oracle", is to create the PL/SQL package MNIST that loads the tensors and performs the operations needed to score the neural network. See the details of the code on Github.


Conclusions and comments

This post describes an example of how to implement a scoring engine for an artificial neural network using the Oracle RDBMS and PL/SQL. The discussion is about a simple implementation of the "hello world" example of neural networks: recognizing handwritten digits of the MNIST database. The network is trained using TensorFlow and later exported into Oracle. The final result is a short PL/SQL package which provides digit recognition with an accuracy of about 98%.

We can expect in the near future to find increasing deployments of neural networks close to data sources and data stores. The example in this post of how to implement a neural network serving engine on an Oracle database shows that this is not only possible but also easy to implement.
Serving neural networks is much simpler than training them. While training requires specialized software/platforms, domain knowledge and large amounts of training data, trained networks can be imported into target systems and executed there, in many cases requiring only modest computing resources.
This post is intended as learning material: a simple feed-forward neural network has been used instead of better-performing convolutional networks (see references). Moreover, the data movement from TensorFlow to Oracle and the implementation of the serving engine in PL/SQL are something of a hack in their present state and are not intended for production use.

The code accompanying this post is available on Github.


Notes on how to build the test environment

The main components and tools for testing the scripts in this post are:
  • the Python environment (on Linux with CentOS 7), installed using Anaconda 4.1: Python 2.7, Jupyter IPython notebook.
  • TensorFlow, version 0.9 (the latest as I write this), installed following the instructions at https://www.tensorflow.org/versions/r0.9/get_started/os_setup.html
  • Oracle RDBMS running on Linux. The Oracle scripts have been tested on Oracle 11.2.0.4 and 12.1.0.2.


References and acknowledgments

An excellent introduction to neural networks and an inspiration for this blog post is Michael Nielsen's book "Neural Networks and Deep Learning".
The code for neural network training used in this post is an extension of Google's TensorFlow MNIST tutorial.
See also:
  • Tutorial on TensorFlow by Martin Gorner
  • Basic techniques for TensorFlow by Aaron Schumacher
  • Visualizing MNIST by Christopher Olah
  • Python Machine Learning by Sebastian Raschka
Other popular frameworks for working with neural networks and deep learning besides TensorFlow include Theano and Torch among many others, see also this page on Wikipedia.