Run a Pipeline on Arrikto vGPUs using the Kale SDK

Without Kiwi, each step of a Kubeflow pipeline requiring an NVIDIA GPU occupies a whole GPU. This means that in order for N independent steps to run in parallel, we would need N GPUs. Kiwi enables multiple steps of a Kubeflow pipeline to run in parallel on the same GPU.

In this section you will use the Kale SDK to deploy a Kubeflow pipeline whose GPU steps all run in parallel on the same physical device through Kiwi. You will run Kaggle’s dog breed classification example, which classifies images of dogs by breed.

Procedure

  1. Create a new notebook server using the Kale TensorFlow GPU Docker image. The image has the following naming scheme:

    gcr.io/arrikto/jupyter-kale-gpu-tf-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

    Note

    You do not need to add an Arrikto vGPU to the notebook server when you create it in the Jupyter Web App. You will not run any GPU work from inside that notebook server. You will only create the Kubeflow pipeline, whose steps are going to use Arrikto vGPUs.

  2. Connect to the server, open a terminal, and install the required packages:

    $ pip3 install --user pillow==7.2.0 tensorflow==2.6.2 matplotlib==3.3.1 fastapi==0.76.0
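
To confirm that the pinned versions are the ones actually installed, a check along the following lines may help. This is a minimal sketch that uses only the Python standard library; the `PINNED` dictionary and the `check_pins` helper are hypothetical names introduced here, and the package names are assumed to be the distribution names used in the `pip3` command above.

```python
from importlib import metadata

# Versions pinned by the pip3 command in this step.
PINNED = {
    "pillow": "7.2.0",
    "tensorflow": "2.6.2",
    "matplotlib": "3.3.1",
    "fastapi": "0.76.0",
}


def check_pins(pins, get_version=metadata.version):
    """Return {package: (expected, found)} for each mismatch or missing package.

    `found` is None when the package is not installed at all.
    """
    problems = {}
    for name, expected in pins.items():
        try:
            found = get_version(name)
        except metadata.PackageNotFoundError:
            found = None
        if found != expected:
            problems[name] = (expected, found)
    return problems
```

Calling `check_pins(PINNED)` from the notebook server returns an empty dictionary when all four installed versions match the pins.
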
  3. Download and extract the required dog images:

    $ wget https://s3-us-west-1.amazonaws.com/udacity-aind/dog-project/dogImages.zip \
    > && unzip -qo dogImages.zip \
    > && rm dogImages.zip
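
Before building the pipeline, it can be worth checking that the archive extracted into the layout the script expects. The sketch below assumes what the script itself implies: a `dogImages` directory with `train`, `valid`, and `test` subdirectories, each holding one folder per breed. The `count_images` helper and the list of image suffixes are hypothetical, introduced only for this check.

```python
from pathlib import Path

# File suffixes treated as images; an assumption, adjust if the dataset differs.
IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png"}


def count_images(root="dogImages", splits=("train", "valid", "test")):
    """Return {split: (number of class folders, number of image files)}."""
    summary = {}
    for split in splits:
        split_dir = Path(root) / split
        class_dirs = []
        if split_dir.is_dir():
            class_dirs = [d for d in split_dir.iterdir() if d.is_dir()]
        n_images = sum(
            1
            for d in class_dirs
            for f in d.iterdir()
            if f.is_file() and f.suffix.lower() in IMAGE_SUFFIXES
        )
        summary[split] = (len(class_dirs), n_images)
    return summary
```

Since the classifiers in the script end in a `Dense(133)` layer, each split would be expected to report 133 class folders. A split that reports `(0, 0)` is missing or empty.
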
  4. Create a new Python file and name it kiwi_dogbreed.py:

    $ touch kiwi_dogbreed.py
  5. Copy and paste the following code inside kiwi_dogbreed.py, or download it:

    kiwi_dogbreed.py
    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kiwi example using the Kale SDK.

    This script creates an ML pipeline that classifies dog images
    and runs in parallel on the same Kiwi-enabled GPU
    """

    from kale.types import MarshalData
    import tensorflow as tf

    from tensorflow.keras.preprocessing.image import ImageDataGenerator
    from kubernetes.client import (V1Affinity, V1PodAffinity, V1PodAffinityTerm,
                                   V1LabelSelector)

    from kale.sdk import pipeline, step
    from PIL import ImageFile

    ImageFile.LOAD_TRUNCATED_IMAGES = True

    affinity = V1Affinity(pod_affinity=V1PodAffinity(
        required_during_scheduling_ignored_during_execution=[
            V1PodAffinityTerm(
                topology_key="kubernetes.io/hostname",
                label_selector=V1LabelSelector(
                    match_labels={"app": "dogbreed"}))]))


    @step(name="load-data")
    def load_data(img_size: int = 224, batch_size: int = 32):
        def get_train_generator():
            data_datagen = ImageDataGenerator(rescale=1. / 255,
                                              width_shift_range=.2,
                                              height_shift_range=.2,
                                              brightness_range=[0.5, 1.5],
                                              horizontal_flip=True)
            return data_datagen.flow_from_directory(
                "dogImages/train/", target_size=(img_size, img_size),
                batch_size=batch_size)

        def get_valid_generator():
            data_datagen = ImageDataGenerator(rescale=1. / 255)
            return data_datagen.flow_from_directory(
                "dogImages/valid/", target_size=(img_size, img_size),
                batch_size=batch_size)

        def get_test_generator():
            data_datagen = ImageDataGenerator(rescale=1. / 255)
            return data_datagen.flow_from_directory(
                "dogImages/test/", target_size=(img_size, img_size),
                batch_size=batch_size)

        return get_train_generator, get_valid_generator, get_test_generator


    @step(name="custom-classifier",
          deploy_config={"limits": {"arrikto.com/gpu": "1"}})
    def custom_classifier(get_train_generator: MarshalData,
                          get_valid_generator: MarshalData, epochs: int = 2,
                          img_size: int = 224, number_of_nodes: int = 512,
                          lr: float = 0.001):
        model = tf.keras.models.Sequential(
            [tf.keras.layers.Conv2D(16, 3, activation="relu",
                                    input_shape=(img_size, img_size, 3)),
             tf.keras.layers.MaxPool2D(),
             tf.keras.layers.Conv2D(32, 3, activation="relu"),
             tf.keras.layers.MaxPool2D(),
             tf.keras.layers.Conv2D(64, 3, activation="relu"),
             tf.keras.layers.MaxPool2D(),
             tf.keras.layers.GlobalAveragePooling2D(),
             tf.keras.layers.Dense(int(number_of_nodes), activation="relu"),
             tf.keras.layers.Dense(133, activation="softmax")])

        model.compile(
            optimizer=tf.optimizers.Adam(learning_rate=float(lr)),
            loss=tf.losses.categorical_crossentropy,
            metrics=["accuracy"]
        )

        train_generator = get_train_generator()
        valid_generator = get_valid_generator()

        tb_callback = tf.keras.callbacks.TensorBoard(
            log_dir="custom_classifier_logs")

        model.fit(train_generator, epochs=epochs, validation_data=valid_generator,
                  callbacks=[tb_callback])


    @step(name="vgg16-classifier",
          deploy_config={"limits": {"arrikto.com/gpu": "1"}})
    def vgg16_classifier(get_train_generator: MarshalData,
                         get_valid_generator: MarshalData, epochs: int = 2,
                         img_size: int = 224, lr: float = 0.001):
        vgg_body = tf.keras.applications.VGG16(
            weights="imagenet",
            include_top=False,
            input_shape=(img_size, img_size, 3)
        )

        vgg_body.trainable = False

        inputs = tf.keras.layers.Input(shape=(img_size, img_size, 3))
        x = vgg_body(inputs, training=False)
        x = tf.keras.layers.GlobalAveragePooling2D()(x)
        outputs = tf.keras.layers.Dense(133, activation="softmax")(x)
        vgg_model = tf.keras.Model(inputs, outputs)

        vgg_model.summary()

        vgg_model.compile(
            optimizer=tf.optimizers.Adam(learning_rate=float(lr)),
            loss=tf.losses.categorical_crossentropy,
            metrics=["accuracy"]
        )

        train_generator = get_train_generator()
        valid_generator = get_valid_generator()

        vgg_model.fit(train_generator, epochs=epochs,
                      validation_data=valid_generator)


    @step(name="resnet50-classifier",
          deploy_config={"limits": {"arrikto.com/gpu": "1"}})
    def resnet50_classifier(get_train_generator: MarshalData,
                            get_valid_generator: MarshalData, epochs: int = 2,
                            img_size: int = 224, lr: float = 0.001):
        resnet_body = tf.keras.applications.ResNet50V2(
            weights="imagenet",
            include_top=False,
            input_shape=(img_size, img_size, 3)
        )

        resnet_body.trainable = False

        inputs = tf.keras.layers.Input(shape=(img_size, img_size, 3))

        x = resnet_body(inputs, training=False)
        x = tf.keras.layers.Flatten()(x)
        outputs = tf.keras.layers.Dense(133, activation="softmax")(x)

        resnet_model = tf.keras.Model(inputs, outputs)

        resnet_model.compile(
            optimizer=tf.optimizers.Adam(learning_rate=float(lr)),
            loss=tf.losses.categorical_crossentropy,
            metrics=["accuracy"]
        )

        train_generator = get_train_generator()
        valid_generator = get_valid_generator()

        resnet_model.fit(train_generator, epochs=epochs,
                         validation_data=valid_generator)


    @pipeline(name="dog-classifier", experiment="kiwi-tutorial",
              deploy_config={"affinity": affinity, "labels": {"app": "dogbreed"}},
              autosnapshot=False)
    def dog_classifier(epochs: int = 2, lr: float = 6e-4, batch_size: int = 32,
                       img_size: int = 224, number_of_nodes: int = 512):
        train_generator, valid_generator, junk = load_data(img_size, batch_size)
        custom_classifier(train_generator, valid_generator, epochs, img_size,
                          number_of_nodes, lr)
        vgg16_classifier(train_generator, valid_generator, epochs, img_size, lr)
        resnet50_classifier(train_generator, valid_generator, epochs, img_size, lr)


    if __name__ == "__main__":
        dog_classifier(epochs=4, lr=6e-4, batch_size=32,
                       img_size=224, number_of_nodes=512)

    In this code sample, we define three distinct pipeline steps, each of which trains an ML model to distinguish dog breeds: custom-classifier, vgg16-classifier, and resnet50-classifier.

    We then run the pipeline, ensuring (via Pod Affinities) that all the steps run on the same Kiwi-enabled NVIDIA GPU. We do this just for illustration purposes.

    To showcase Kiwi’s GPU sharing capabilities in a multi-node GPU cluster, we first create a Pod Affinity object using the following snippet from our example program:

    affinity = V1Affinity(pod_affinity=V1PodAffinity(
        required_during_scheduling_ignored_during_execution=[
            V1PodAffinityTerm(
                topology_key="kubernetes.io/hostname",
                label_selector=V1LabelSelector(
                    match_labels={"app": "dogbreed"}))]))

    Then we attach this Affinity to each step (Pod) of the pipeline by providing it as an argument when creating the pipeline using the following snippet:

    @pipeline(name="dog-classifier", experiment="kiwi-tutorial",
              deploy_config={"affinity": affinity, "labels": {"app": "dogbreed"}},
              autosnapshot=False)
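
To make the effect of the Affinity and the label more concrete, the sketch below builds the same rule as a plain dictionary, using the field names of the Kubernetes Pod spec (`podAffinity`, `requiredDuringSchedulingIgnoredDuringExecution`, `topologyKey`, `labelSelector`, `matchLabels`). It is an illustration only and does not go through Kale or the Kubernetes client; the `pod_affinity` and `selector_matches` helpers are hypothetical. The point it shows is that the selector targets the very label the pipeline attaches to its own Pods, which is why the steps gravitate to the same node.

```python
import json

# Label that the pipeline's deploy_config attaches to every step Pod.
POD_LABELS = {"app": "dogbreed"}


def pod_affinity(match_labels, topology_key="kubernetes.io/hostname"):
    """Build the Pod-spec 'affinity' section as a plain dictionary."""
    return {
        "podAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": [
                {
                    "topologyKey": topology_key,
                    "labelSelector": {"matchLabels": dict(match_labels)},
                }
            ]
        }
    }


def selector_matches(affinity, labels):
    """True if `labels` satisfy every matchLabels entry of every required term."""
    terms = affinity["podAffinity"][
        "requiredDuringSchedulingIgnoredDuringExecution"
    ]
    return all(
        labels.get(key) == value
        for term in terms
        for key, value in term["labelSelector"]["matchLabels"].items()
    )


# Print the section roughly as it would appear in a Pod manifest.
print(json.dumps(pod_affinity(POD_LABELS), indent=2))
```

With `topology_key` set to `kubernetes.io/hostname`, the co-location domain is a single node, so a Pod carrying this rule must be scheduled on a node that hosts a Pod labeled `app: dogbreed`.
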
  6. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kiwi_dogbreed.py --kfp
  7. View the pipeline run via the Runs tab on the Kubeflow UI. You will eventually see all three GPU steps running in parallel on the same physical GPU.

    Note

    The GPU steps are running on the same physical GPU because we forced them to run on the same node using Pod Affinities. Since Kiwi only uses one physical GPU per node, they are all running on that GPU.

    [Figure: kiwi-kfp.png]
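
One way to double-check the co-location described in the note above is to collect the node name of each step Pod (for example, from the NODE column of `kubectl get pods -l app=dogbreed -o wide`) and group the Pods by node. The following is a minimal sketch of that grouping; the helper names are hypothetical, and the Pod and node names in the usage note are made up for illustration.

```python
from collections import defaultdict


def group_by_node(pod_nodes):
    """Map each node name to the sorted list of Pods scheduled on it.

    `pod_nodes` is an iterable of (pod_name, node_name) pairs.
    """
    by_node = defaultdict(list)
    for pod, node in pod_nodes:
        by_node[node].append(pod)
    return {node: sorted(pods) for node, pods in by_node.items()}


def share_one_node(pod_nodes):
    """True if there is at least one Pod and all Pods run on the same node."""
    return len(group_by_node(pod_nodes)) == 1
```

For instance, `share_one_node([("custom-classifier-x", "node-a"), ("vgg16-classifier-y", "node-a")])` evaluates to `True`, while any pair list that mentions two different nodes evaluates to `False`.
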

Summary

In this section you ran a Kubeflow pipeline on vGPUs using the Kale SDK.

What’s Next

Check out the rest of the documentation regarding Kiwi.