TensorFlow Keras Distributed with Kale

This section will guide you through creating and managing a TFJob CR on Kubeflow, with Kale and the Kubeflow Training Operator.

For this guide, we use the interactive JupyterLab environment, but this is entirely optional. We do so to demonstrate how you can monitor such a job using Kale’s TFJob client.

Note

Currently, Kale supports the TensorFlow Keras API for creating and launching TensorFlow distributed jobs. In future versions, we plan to support the TensorFlow Estimator API as well.

What You’ll Need

  • An Arrikto EKF or MiniKF deployment.
  • The Kale TensorFlow Docker image.

Procedure

  1. Create a new notebook server using the Kale TensorFlow Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-gpu-tf-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

    This is not the default Kale image, so you must carefully choose it from the dropdown menu.

    Note

    If you want to have access to a GPU device, you must specifically request one or more from the Jupyter Web App UI. For this user guide, access to a GPU device is not required.

    ../../../../_images/gpu.png
  2. Increase the size of the workspace volume to 10GB:

    ../../../../_images/notebook-volume1.png
  3. Connect to the server, open a terminal, and install the tensorflow-datasets package:

    $ pip3 install --user tensorflow-datasets
  4. Create a new Jupyter notebook (that is, an IPYNB file) using the JupyterLab UI:

    ../../../../_images/ipynb1.png
  5. Copy and paste the import statements into the top code cell. Then run it:

    import tensorflow as tf
    import tensorflow_datasets as tfds
    from typing import Tuple
    from kale.distributed import tf as kale_tf

    This is how your notebook cell will look:

    ../../../../_images/imports1.png
  6. Create a function which downloads the MNIST dataset. Copy and paste the following code snippet to a new code cell. Then run it:

    def create_dataset() -> tf.data.Dataset:
        """Create the training dataset."""
        import tensorflow as tf
        import tensorflow_datasets as tfds

        buffer_size = 10000
        batch_size = 64

        def scale(image, label):
            """Scale MNIST data from (0, 255] to (0., 1.]"""
            image = tf.cast(image, tf.float32)
            image /= 255
            return image, label

        datasets, _ = tfds.load(name='mnist', with_info=True,
                                as_supervised=True)
        train_dataset = (datasets['train']
                         .map(scale)
                         .cache()
                         .shuffle(buffer_size)
                         .batch(batch_size))
        return train_dataset

    This is how your notebook cell will look:

    ../../../../_images/mnist.png

    This cell creates a function which downloads the MNIST dataset. You will use the training split, scale it, and create batches of 64 samples.

  7. Define a function that creates, compiles, and returns a tf.keras Model. Copy and paste the following code snippet to a new code cell. Then run it:

    def create_model() -> tf.keras.Model:
        """Create the model architecture."""
        import tensorflow as tf

        model = tf.keras.models.Sequential()
        model.add(tf.keras.layers.Conv2D(32, (3, 3), activation='relu',
                                         input_shape=(28, 28, 1)))
        model.add(tf.keras.layers.MaxPooling2D((2, 2)))
        model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(tf.keras.layers.MaxPooling2D((2, 2)))
        model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(tf.keras.layers.Flatten())
        model.add(tf.keras.layers.Dense(64, activation='relu'))
        model.add(tf.keras.layers.Dense(10, activation='softmax'))
        model.summary()

        model.compile(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])
        return model

    This is how your notebook cell will look:

    ../../../../_images/model1.png

    This cell defines a function that creates a standard Convolutional Neural Network architecture. It uses adam as the training optimizer, sparse_categorical_crossentropy as the loss function, and accuracy as the metric. You will train this model architecture on the MNIST dataset in a distributed manner.

  8. Create and submit a TFJob CR. Copy and paste the following code snippet to a new code cell. Then run it:

    tfjob = kale_tf.distribute(
        model_fn=create_model,
        train_data_fn=create_dataset,
        epochs=4,
        number_of_processes=2,
        cuda=True,
        fit_kwargs={"steps_per_epoch": 70})

    This is how your notebook cell will look:

    ../../../../_images/distribute1.png

    At a high level, the distribute function follows this process:

    • Save several assets to a local folder, including the training dataset and the model definition function.
    • Snapshot the volumes mounted to the notebook server.
    • Hydrate new PVCs starting from the snapshots of the previous step.
    • Create and submit a TFJob CR. All the workers mount the newly created PVCs as RWX.

    Upon submission of the CR, the Training Operator creates the two processes you requested with the number_of_processes argument. By default, each process requests to consume a GPU device. These Pods run a Kale entrypoint which:

    • Looks for the assets saved during the preparation phase in the local FS (backed by one of the RWX PVCs), and loads them into memory.
    • Creates a suitable TensorFlow distribution strategy.
    • Instantiates and compiles the model inside the scope of this strategy.
    • Calls the model’s fit function to start the training process.
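    The entrypoint flow described above can be sketched as follows. This is an illustrative simplification, not Kale’s actual implementation; the strategy factory is injected to make the flow clear. On a TFJob worker it would typically be something like tf.distribute.MultiWorkerMirroredStrategy, configured through the TF_CONFIG environment variable the Training Operator sets on each Pod:

```python
# Illustrative sketch of the worker entrypoint flow; not Kale's actual code.
def run_entrypoint(strategy_factory, model_fn, train_data_fn,
                   epochs, fit_kwargs=None):
    """Load assets, build the model inside a distribution strategy, train."""
    fit_kwargs = dict(fit_kwargs or {})
    strategy = strategy_factory()  # e.g. tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():         # the model must be created and compiled in scope
        model = model_fn()         # model_fn also compiles the model (see step 7)
    dataset = train_data_fn()      # recreate the training dataset (see step 6)
    return model.fit(dataset, epochs=epochs, **fit_kwargs)
```

    Instantiating the model inside the strategy scope is what makes the model’s variables distribution-aware; creating it outside the scope would silently produce a non-distributed model.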

    Note

    The TF Keras fit function accepts a number of different arguments; see the TF Keras documentation for the full list. You can pass any of those arguments as a Python dictionary in the fit_kwargs argument of the distribute function, and Kale will pass them down the line. However, you cannot pass values for the x and y arguments. Kale expects you to provide a dataset-creating function through the train_data_fn argument; from that, Kale knows how to populate x and y.
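    As a hypothetical illustration of this restriction, forwarding fit_kwargs while rejecting the reserved x and y arguments could look like the helper below. The function name is illustrative and not part of Kale’s API:

```python
# Hypothetical helper illustrating the fit_kwargs restriction; not Kale's API.
RESERVED_FIT_ARGS = {"x", "y"}  # Kale populates these from train_data_fn instead

def check_fit_kwargs(fit_kwargs):
    """Return a copy of fit_kwargs, rejecting arguments Kale manages itself."""
    clash = RESERVED_FIT_ARGS & set(fit_kwargs)
    if clash:
        raise ValueError("reserved fit() arguments: %s" % sorted(clash))
    return dict(fit_kwargs)
```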

    Note

    If you want to distribute your model across multiple CPU cores, set the cuda argument to False. By default, Kale launches two processes (the minimum number required by the TFJob CR) on two different GPU devices.
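    To make these defaults concrete, the mapping from the cuda flag to per-worker resource limits could be sketched as below. This is a hypothetical helper; the actual Pod spec Kale generates may differ:

```python
# Hypothetical sketch of per-worker resource limits; not Kale's actual code.
def worker_gpu_limits(cuda=True, number_of_processes=2):
    """One resource-limits dict per worker; each requests a GPU when cuda=True."""
    if number_of_processes < 2:
        raise ValueError("a TFJob CR requires at least two processes")
    limits = {"nvidia.com/gpu": 1} if cuda else {}
    return [dict(limits) for _ in range(number_of_processes)]
```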

  9. Monitor the TFJob CR. Copy and paste the following code snippet to a new code cell. Then run it:

    tfjob.get(watch=True)

    This is how your notebook cell will look:

    ../../../../_images/monitor1.png

    Note

    In this step, you monitor the state of the Job, which can be one of the following: Created, Running, Failed, or Succeeded. This call blocks until the training process finishes. To continue with the next step and view the logs, stop the interpreter by pressing the stop button in the notebook UI. Alternatively, call the client’s get_job_status function with no arguments; it returns immediately, reporting the current status of the Job.
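    If you prefer a non-blocking poll over get(watch=True), a generic polling loop could look like the sketch below. It assumes a status callable that returns one of the four states listed above (for example, tfjob.get_job_status); the poller itself is illustrative and not part of Kale:

```python
import time

# The two terminal states of a TFJob, per the states listed above.
TERMINAL_STATES = {"Succeeded", "Failed"}

def wait_for_job(get_status, poll_interval=5.0, max_polls=720):
    """Poll a status callable until the job reaches a terminal state."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_interval)
    raise TimeoutError("job did not reach a terminal state")
```

    For example, wait_for_job(tfjob.get_job_status) would return "Succeeded" or "Failed" once training ends, while leaving the notebook free to interrupt the loop at any time.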

  10. Stream the logs of the master process. Copy and paste the following code snippet to a new code cell. Then run it:

    tfjob.stream_logs()

    This is how your notebook cell will look:

    ../../../../_images/logs1.png

    Note

    In this step, you view the logs of the Pod running the worker with index 0. You can view the logs of any other worker as well; however, in most cases, they are identical. This call blocks until the training process finishes. If you want to continue executing other notebook cells, stop the interpreter by pressing the stop button in the notebook UI.

  11. Optional

    When the training process completes, you can delete the TFJob CR. Copy and paste the following code snippet to a new code cell. Then run it:

    tfjob.delete()

    This is how your notebook cell will look:

    ../../../../_images/delete1.png

    Important

    After the training process completes, the controller does not remove the resources it created. To avoid leaving stale resources behind, you must manually delete the CR using the above command.

Summary

You have successfully run a TF Keras distributed process using Kale and the Kubeflow Training Operator.

What’s Next

The next step is to create a TF Keras distributed KFP step using the Kale SDK.