PyTorch Distributed with Kale

This section will guide you through creating and managing a PyTorchJob CR on Kubeflow, with Kale and the Kubeflow Training Operator.

For this guide, we leverage the interactive environment of JupyterLab, but this is completely optional. We do it to demonstrate how you can monitor such a job using Kale’s PyTorchJob client.

What You’ll Need

  • An Arrikto EKF or MiniKF deployment with the default Kale Docker image.

Procedure

  1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Increase the size of the workspace volume to 10GB:

    ../../../../_images/notebook-volume.png
  3. Connect to the server, open a terminal, and install the torch and torchvision packages:

    $ pip3 install --user torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html

    Note

    The PyTorch version may be different. Please head to the PyTorch website for the latest releases.

    Note

    In this example, you will distribute the training job over multiple GPU devices. Thus, this command installs the CUDA version of the PyTorch package. If you are using CPU-only nodes, you can install the CPU-only version of the PyTorch package instead, and distribute the job over multiple CPU cores. You will see how to achieve that later in the user guide.

  4. Create a new Jupyter Notebook (that is, an IPYNB file) using the JupyterLab UI and name it kale_dist.ipynb:

    ../../../../_images/ipynb.png
  5. Copy and paste the following import statements into the top code cell. Then run it:

    import torch
    import torch.nn as nn
    import torchvision.models as models
    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader
    from kale.distributed import pytorch

    This is what your notebook cell will look like:

    ../../../../_images/imports.png
  6. Download the Fashion MNIST dataset. Copy and paste the following code snippet to a new code cell. Then run it:

    train_data = datasets.FashionMNIST(
        root="/home/jovyan/data",
        train=True,
        download=True,
        transform=ToTensor()
    )

    valid_data = datasets.FashionMNIST(
        root="/home/jovyan/data",
        train=False,
        download=True,
        transform=ToTensor()
    )

    train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
    valid_data_loader = DataLoader(valid_data, batch_size=2014)

    This is what your notebook cell will look like:

    ../../../../_images/fmnist.png

    This cell downloads the Fashion MNIST training and validation splits, wraps them in PyTorch datasets, and builds PyTorch DataLoader objects to iterate over the data.
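
As a rough check of what these loaders will yield, the sketch below computes the number of batches per epoch from the standard Fashion MNIST split sizes (60,000 training and 10,000 validation images) and the batch sizes used above. It assumes the DataLoader default of drop_last=False, so the final, smaller batch is kept. batch_stats is a hypothetical helper, not part of PyTorch or Kale.

```python
import math


def batch_stats(num_samples, batch_size):
    """Return (number of batches, size of the last batch), keeping the partial batch."""
    num_batches = math.ceil(num_samples / batch_size)
    last_batch = num_samples - (num_batches - 1) * batch_size
    return num_batches, last_batch


train_batches, train_last = batch_stats(60_000, 1024)  # (59, 608)
valid_batches, valid_last = batch_stats(10_000, 2014)  # (5, 1944)
```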

    Important

    Always use absolute paths when specifying the path to a file or folder. For example, when creating the train_data dataset, the root argument tells PyTorch where to download the Fashion MNIST dataset, and it is given as an absolute path.
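
One way to catch a relative path early is a small guard that runs before the datasets are built. This is a minimal sketch. require_absolute is a hypothetical helper and not part of Kale or PyTorch.

```python
from pathlib import PurePosixPath


def require_absolute(path):
    """Return the path unchanged if it is absolute, otherwise raise ValueError."""
    if not PurePosixPath(path).is_absolute():
        raise ValueError(f"expected an absolute path, got {path!r}")
    return path


data_root = require_absolute("/home/jovyan/data")
```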

    Important

    Working inside a JupyterLab environment is only supported for built-in PyTorch Datasets. This includes torchvision, torchaudio, and torchtext datasets. The limitation is due to how serialization works: pickle and torch.save cannot follow the dependency graph of an object to create a hermetic package, so external dependencies, such as a custom Dataset definition, usually break the process. If you need to define a custom Dataset by subclassing the torch.utils.data.Dataset class, you will need to work with the Kale SDK. For this, head to the Kale SDK example.
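
The serialization limitation can be seen with the standard library alone. The sketch below pickles a class and inspects the payload: pickle records only the module path and the class name, not the code. A class defined in a notebook is recorded in the same way, as a reference that a process without that definition cannot resolve. OrderedDict is used here only as a convenient stand-in for a Dataset class.

```python
import pickle
from collections import OrderedDict

# Pickling a class stores a reference to it, not its implementation.
payload = pickle.dumps(OrderedDict)

stores_module = b"collections" in payload
stores_name = b"OrderedDict" in payload
payload_size = len(payload)  # a few dozen bytes: a reference, not the code

# Loading works only because the class can be imported by name here.
restored = pickle.loads(payload)
```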

  7. Define a Convolutional Neural Network architecture, the optimizer, and the criterion (i.e., loss function). Copy and paste the following code snippet to a new code cell. Then run it:

    model = models.resnet18(pretrained=True)
    model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

    for param in model.parameters():
        param.requires_grad = False

    features_in = model.fc.in_features
    model.fc = nn.Linear(features_in, 10)

    optimizer = torch.optim.Adam(model.parameters(), lr=.03)
    criterion = nn.CrossEntropyLoss()

    This is what your notebook cell will look like:

    ../../../../_images/model.png

    This cell downloads a pretrained ResNet-18, a common model architecture for computer vision tasks, and adapts it to the dataset: the first convolutional layer is replaced so that it accepts single-channel images, and the final fully connected layer is replaced so that it outputs 10 classes. You will fine-tune this model on the Fashion MNIST dataset in a distributed manner. Finally, the cell instantiates the optimizer and the criterion (that is, the loss function).
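
To make the effect of these changes concrete, the sketch below works through the arithmetic for the replaced layers. It assumes 28x28 single-channel Fashion MNIST images and the standard ResNet-18 value of 512 for fc.in_features. The helper names are hypothetical. Note that, because the new conv1 layer is created before the freezing loop, it is frozen as well, so only the new fc layer remains trainable.

```python
def conv_output_size(size, kernel, stride, padding):
    """Spatial output size of a convolution (floor division, no dilation)."""
    return (size + 2 * padding - kernel) // stride + 1


def linear_params(in_features, out_features):
    """Weights plus biases of a fully connected layer."""
    return in_features * out_features + out_features


conv1_out = conv_output_size(28, kernel=7, stride=2, padding=3)  # 14
conv1_weights = 1 * 64 * 7 * 7  # 3136, no bias term
fc_trainable = linear_params(512, 10)  # 5130
```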

  8. Define the step of the training procedure. Copy and paste the following code snippet to a new code cell. Then run it:

    def train_step(model, data_loader, criterion, optimizer, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            if i % args.get("log_interval", 2) == 0:
                log.info(f"\tProgress: {100. * i / len(data_loader):.2f}%"
                         f"\tLoss = {loss.item():.4f}")

        return loss

    This is what your notebook cell will look like:

    ../../../../_images/trainstep.png

    You can customize what happens during training using this function. You can pass any additional arguments using the args Python dictionary. In this example, you use the log_interval argument to control how often you log the training progress.
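
As an illustration of how log_interval plays out, the sketch below lists which batch indices would be logged and the progress value reported for the last of them. It assumes 59 batches per epoch (60,000 training images at a batch size of 1024). logged_batches is a hypothetical helper that mirrors the condition used in train_step.

```python
def logged_batches(num_batches, args):
    """Batch indices for which train_step would emit a log line."""
    interval = args.get("log_interval", 2)
    return [i for i in range(num_batches) if i % interval == 0]


indices = logged_batches(59, {"log_interval": 2})
last_progress = f"{100. * indices[-1] / 59:.2f}%"  # "98.31%"
```

Because progress is computed from the zero-based batch index, the last logged value stays below 100%.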

    Important

    The train_step function must have exactly this signature and it must be a standalone function. This means that you should import all the Python modules the function depends on within its body and you must pass any extra arguments inside the args dictionary.

  9. Optional

    Define the evaluation step of the training procedure. Copy and paste the following code snippet to a new code cell. Then run it:

    def eval_step(model, data_loader, criterion, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            log.info(f"\tProgress: {100. * i / len(data_loader):.2f}%"
                     f"\tLoss = {loss.item():.4f}")

        return loss

    This is what your notebook cell will look like:

    ../../../../_images/evalstep.png

    With this function you can log the performance of your model on a validation dataset. Kale will call this function at the end of each epoch. This step is optional, but it is highly recommended to evaluate your model on a validation dataset that is separate from the one used to train the model.

    Important

    The eval_step function must have exactly this signature and it must be a standalone function. This means that you should import all the Python modules the function depends on within its body and you must pass any extra arguments inside the args dictionary.

  10. Create and submit a PyTorchJob CR. Copy and paste the following code snippet to a new code cell. Then run it:

    pytorch_job = pytorch.distribute(
        model,
        train_data_loader,
        criterion,
        optimizer,
        train_step,
        eval_data_loader=valid_data_loader,
        eval_step=eval_step,
        epochs=1,
        number_of_processes=2,
        cuda=True,
        train_args={"log_interval": 2}
    )

    This is what your notebook cell will look like:

    ../../../../_images/distribute.png

    At a high level, the distribute function follows this process:

    • Save several assets to a local folder, including the training and evaluation functions, the dataset, and the DataLoader.
    • Snapshot the volumes mounted to the Notebook Server.
    • Hydrate new PVCs starting from the snapshots of the previous step.
    • Create and submit a PyTorchJob CR. The master and the workers all mount the newly created PVCs as RWX.

    Upon submission of the CR, the Training Operator creates the two processes you requested with the number_of_processes argument (one is always the master, so the number of workers is number_of_processes - 1). By default, each process requests one GPU device. The Pods running these processes execute a Kale entrypoint that:

    • Looks for the assets saved during the preparation phase in the local FS (backed by one of the RWX PVCs), and loads them into memory.
    • Prepares the PyTorch objects for distributed training.
    • Initializes the process group so that the master and the workers can synchronize.
    • Starts the training and evaluation loop, calling train_step and eval_step.
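
The relationship between number_of_processes and the replicas in the CR can be sketched as follows. This is an illustrative skeleton of a PyTorchJob manifest, not the manifest Kale actually generates: the function name, the job name, and the image are hypothetical placeholders, and the volume mounts are omitted.

```python
def pytorch_job_skeleton(name, image, number_of_processes, cuda=True):
    """Build a minimal PyTorchJob manifest as a plain dictionary."""
    if number_of_processes < 2:
        raise ValueError("a master and at least one worker are required")

    def replica(count):
        container = {"name": "pytorch", "image": image}
        if cuda:
            # Each process requests one GPU device.
            container["resources"] = {"limits": {"nvidia.com/gpu": 1}}
        return {
            "replicas": count,
            "restartPolicy": "OnFailure",
            "template": {"spec": {"containers": [container]}},
        }

    return {
        "apiVersion": "kubeflow.org/v1",
        "kind": "PyTorchJob",
        "metadata": {"name": name},
        "spec": {
            "pytorchReplicaSpecs": {
                # One process is always the master; the rest are workers.
                "Master": replica(1),
                "Worker": replica(number_of_processes - 1),
            }
        },
    }


# Hypothetical name and image, used only to exercise the function.
job = pytorch_job_skeleton("kale-dist-example", "example.invalid/image:tag", 2)
```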

    Note

    If you want to distribute your model across multiple CPU cores, you can set the cuda argument to False. By default, Kale will launch two processes (the minimum number of processes required by the PyTorchJob CR), on two different GPU devices.

  11. Monitor the PyTorchJob CR. Copy and paste the following code snippet to a new code cell. Then run it:

    pytorch_job.get(watch=True)

    This is what your notebook cell will look like:

    ../../../../_images/monitor.png

    Note

    In this step you monitor the state of the Job, which can be one of the following: Created, Running, Failed, or Succeeded. This call blocks until the training process is finished. To continue with the next step and view the logs, you can stop the interpreter by pressing the stop button in the Notebook UI. Alternatively, you can call the get_job_status function of the client with no arguments, which returns immediately and reports the current status of the Job.
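
If you prefer polling to a blocking watch, a loop along the following lines is one option. It is a sketch under assumptions: get_status stands for any zero-argument callable that returns the current state as a string (for example, a thin wrapper around the client's status call, whose exact return type is not documented here), and wait_for_job is a hypothetical helper.

```python
import time

TERMINAL_STATES = {"Succeeded", "Failed"}


def wait_for_job(get_status, poll_seconds=10.0, timeout_seconds=3600.0):
    """Poll get_status until the Job reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still in state {status!r} after {timeout_seconds}s")
        time.sleep(poll_seconds)


# Example with a canned sequence of states instead of a real cluster.
states = iter(["Created", "Running", "Running", "Succeeded"])
final_state = wait_for_job(lambda: next(states), poll_seconds=0.0)
```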

  12. Stream the logs of the master process. Copy and paste the following code snippet to a new code cell. Then run it:

    pytorch_job.stream_logs()

    This is what your notebook cell will look like:

    ../../../../_images/logs.png

    Note

    In this step you view the logs of the Pod running the master process. You can view the logs of the workers as well; however, in most cases they are identical. This call blocks until the training process is finished. If you want to continue executing other Notebook cells, you can stop the interpreter by pressing the stop button in the Notebook UI.

  13. When the training process is completed, you can delete the PyTorchJob CR. Copy and paste the following code snippet to a new code cell. Then run it:

    pytorch_job.delete()

    This is what your notebook cell will look like:

    ../../../../_images/delete.png

    Important

    After the training process completes, the controller does not remove the resources it created. If you do not want to leave stale resources behind, you have to delete the CR manually.
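
To make sure the CR is deleted even if a later cell raises an error, the delete call can be wrapped in a context manager. This is a minimal sketch: delete_on_exit and FakeJob are hypothetical names, and FakeJob only stands in for the object returned by pytorch.distribute so that the example runs without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def delete_on_exit(job):
    """Yield the job and call job.delete() when the block exits, even on error."""
    try:
        yield job
    finally:
        job.delete()


class FakeJob:
    """Stand-in exposing the same delete() method as the real job object."""

    def __init__(self):
        self.deleted = False

    def delete(self):
        self.deleted = True


fake = FakeJob()
with delete_on_exit(fake) as job:
    pass  # monitor the job or stream its logs here
```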

Summary

You have successfully run a PyTorch distributed process using Kale and the Kubeflow Training Operator.

What’s Next

The next step is to create a PyTorch distributed KFP step using the Kale SDK.