Example: Configure a PyTorch Distributed KFP Step with the Kale SDK

This section will guide you through configuring and running a PyTorch distributed step on Kubeflow, using the Kale SDK and the Kubeflow training operator.

What You’ll Need

Procedure

  1. Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Increase the size of the workspace volume to 10GB:

    (Screenshot: increasing the notebook workspace volume size to 10 GB.)
  3. Connect to the server, open a terminal, and install the torch and torchvision packages:

    $ pip3 install --user torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html

    Note

    The PyTorch version may be different. Please head to the PyTorch website for the latest releases.

    Note

    In this example, you will distribute the training job over multiple GPU devices. Thus, this command installs the CUDA version of the PyTorch package. If you are using CPU-only nodes, you can install the CPU-only version of the PyTorch package instead, and distribute the job over multiple CPU cores. You will see how to achieve that later in the user guide.
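
    For example, a CPU-only installation might look like the following. The package versions here simply mirror the command above and are illustrative only; check the PyTorch website for the exact command that matches your environment:

    $ pip3 install --user torch==1.9.0+cpu torchvision==0.10.0+cpu -f https://download.pytorch.org/whl/torch_stable.html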

  4. Create a new Python file and name it kale_dist.py:

    $ touch kale_dist.py
  5. Copy and paste the following code inside kale_dist.py:

    final.py
    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
    """

    import time
    import torch
    import torch.nn as nn
    import torchvision.models as models

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader
    from rok_kubernetes.exceptions import ApiException

    from kale.sdk import pipeline, step
    from kale.distributed import pytorch


    def _train_step(model, data_loader, criterion, optimizer, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if i % args.get("log_interval", 2) == 0:
                log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _eval_step(model, data_loader, criterion, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _load_data():
        """Load and prepare the data."""
        train_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=True,
            download=True,
            transform=ToTensor()
        )

        valid_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=False,
            download=True,
            transform=ToTensor()
        )

        train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
        valid_data_loader = DataLoader(valid_data, batch_size=2014)

        return train_data_loader, valid_data_loader


    @step(name="create_model")
    def create_model():
        """Define the model."""
        model = models.resnet18(pretrained=True)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                padding=(3, 3), bias=False)

        for param in model.parameters():
            param.requires_grad = False

        features_in = model.fc.in_features
        model.fc = nn.Linear(features_in, 10)

        optimizer = torch.optim.Adam(model.parameters(), lr=.03)
        criterion = nn.CrossEntropyLoss()

        return model, optimizer, criterion


    @step(name="model_training")
    def distribute_training(model, optimizer, criterion):
        """Train the model."""
        train_data_loader, valid_data_loader = _load_data()
        job = pytorch.distribute(model,
                                 train_data_loader,
                                 criterion,
                                 optimizer,
                                 _train_step,
                                 eval_data_loader=valid_data_loader,
                                 eval_step=_eval_step,
                                 epochs=1,
                                 number_of_processes=2,
                                 cuda=True,
                                 train_args={"log_interval": 2})

        return job.name


    @step(name="monitor")
    def monitor(name):
        """Monitor the PyTorchJob CR."""
        job = pytorch.PyTorchJob(name)
        while True:  # Iterate if streaming logs raises an ApiException
            while True:
                cr = job.get()
                if (job.get_job_status() not in ["", "Created", "Restarting"]
                        and (cr.get("status", {})
                             .get("replicaStatuses", {})
                             .get("Master", {}))):
                    break
                print("Job pending...")
                time.sleep(2)
            try:
                job.stream_logs()
                break
            except ApiException as e:
                print("Streaming the logs failed with: %s" % str(e))
                print("Retrying...")


    @pipeline(name="fmnist", experiment="kale-dist")
    def ml_pipeline():
        """Run the ML pipeline."""
        model, optimizer, criterion = create_model()
        name = distribute_training(model, optimizer, criterion)
        monitor(name)


    if __name__ == "__main__":
        ml_pipeline()

    This script creates a pipeline with a step that launches a PyTorch distributed training job, as well as a monitor step that tracks the progress of that job and streams its logs.
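
    If you also want to inspect the underlying PyTorchJob custom resource that the training operator creates, and assuming kubectl is configured for your namespace, commands along the following lines can help (the placeholders are hypothetical and yours to fill in):

    $ kubectl get pytorchjobs -n <your-namespace>
    $ kubectl describe pytorchjob <job-name> -n <your-namespace>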

  6. Specify your desired Kubernetes configuration for the deployment. The following snippet summarizes the required changes to the code:

    config.py
     # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
     """

    -import time
     import torch
     import torch.nn as nn
     import torchvision.models as models

     from torchvision import datasets
     from torchvision.transforms import ToTensor
     from torch.utils.data import DataLoader
    -from rok_kubernetes.exceptions import ApiException

     from kale.sdk import pipeline, step
     from kale.distributed import pytorch


     def _train_step(model, data_loader, criterion, optimizer, device, args):
         import logging

         log = logging.getLogger(__name__)

         for i, (features, labels) in enumerate(data_loader):
             features = features.to(device)
             labels = labels.to(device)

             pred = model(features)
             loss = criterion(pred, labels.reshape(-1))

             loss.backward()

             optimizer.step()
             optimizer.zero_grad()

             if i % args.get("log_interval", 2) == 0:
                 log.info("\tLoss = {}".format(loss.item()))

         return loss


     def _eval_step(model, data_loader, criterion, device, args):
         import logging

         log = logging.getLogger(__name__)

         for i, (features, labels) in enumerate(data_loader):
             features = features.to(device)
             labels = labels.to(device)

             pred = model(features)
             loss = criterion(pred, labels.reshape(-1))

             log.info("\tLoss = {}".format(loss.item()))

         return loss


     def _load_data():
         """Load and prepare the data."""
         train_data = datasets.FashionMNIST(
             root="/home/jovyan/data",
             train=True,
             download=True,
             transform=ToTensor()
         )

         valid_data = datasets.FashionMNIST(
             root="/home/jovyan/data",
             train=False,
             download=True,
             transform=ToTensor()
         )

         train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
         valid_data_loader = DataLoader(valid_data, batch_size=2014)

         return train_data_loader, valid_data_loader


     @step(name="create_model")
     def create_model():
         """Define the model."""
         model = models.resnet18(pretrained=True)
         model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                 padding=(3, 3), bias=False)

         for param in model.parameters():
             param.requires_grad = False

         features_in = model.fc.in_features
         model.fc = nn.Linear(features_in, 10)

         optimizer = torch.optim.Adam(model.parameters(), lr=.03)
         criterion = nn.CrossEntropyLoss()

         return model, optimizer, criterion


     @step(name="model_training")
     def distribute_training(model, optimizer, criterion):
         """Train the model."""
    +    distributed_config = {"limits": {"cpu": "1", "memory": "4Gi"},
    +                          "requests": {"cpu": "100m", "memory": "3Gi"},
    +                          "run_policy": {"cleanPodPolicy": "All"}}
    +
         train_data_loader, valid_data_loader = _load_data()
         job = pytorch.distribute(model,
                                  train_data_loader,
                                  criterion,
                                  optimizer,
                                  _train_step,
                                  eval_data_loader=valid_data_loader,
                                  eval_step=_eval_step,
                                  epochs=1,
                                  number_of_processes=2,
                                  cuda=True,
    -                             train_args={"log_interval": 2})
    +                             train_args={"log_interval": 2},
    +                             config=distributed_config)

         return job.name


     @step(name="monitor")
     def monitor(name):
         """Monitor the PyTorchJob CR."""
         job = pytorch.PyTorchJob(name)
    -    while True:  # Iterate if streaming logs raises an ApiException
    -        while True:
    -            cr = job.get()
    -            if (job.get_job_status() not in ["", "Created", "Restarting"]
    -                    and (cr.get("status", {})
    -                         .get("replicaStatuses", {})
    -                         .get("Master", {}))):
    -                break
    -            print("Job pending...")
    -            time.sleep(2)
    -        try:
    -            job.stream_logs()
    -            break
    -        except ApiException as e:
    -            print("Streaming the logs failed with: %s" % str(e))
    -            print("Retrying...")
    +    while job.get_job_status() != "Running":
    +        continue
    +    job.stream_logs()


     @pipeline(name="fmnist", experiment="kale-dist")
     def ml_pipeline():
         """Run the ML pipeline."""
         model, optimizer, criterion = create_model()
         name = distribute_training(model, optimizer, criterion)
         monitor(name)


     if __name__ == "__main__":
         ml_pipeline()

    In this example, you set the requests and limits for every container that is part of the distributed training process. Furthermore, by setting cleanPodPolicy to All, you instruct the controller to delete every Pod once the training job completes.

    Important

    The container-level options that you set in the configuration object, such as env, labels, annotations, limits, requests, etc., are propagated to every container that is part of the distributed training process. Thus, the master and every worker pod will have the same container-level options.

    Note

    To find out about all available configurations you can specify, take a look at the DistributedConfig API.
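
    As an illustration of the note above, the sketch below extends the configuration with env, labels, and annotations entries. The limits, requests, and run_policy fields come from the snippet above; the remaining keys and their value formats are assumptions, so verify them against the DistributedConfig API before relying on them:

    distributed_config = {
        "limits": {"cpu": "1", "memory": "4Gi"},
        "requests": {"cpu": "100m", "memory": "3Gi"},
        "run_policy": {"cleanPodPolicy": "All"},
        # The keys below are illustrative assumptions; check the
        # DistributedConfig API reference for the exact names and formats.
        "env": {"LOG_LEVEL": "INFO"},
        "labels": {"app": "fmnist-dist"},
        "annotations": {"owner": "data-science-team"}}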

  7. Optional

    Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_dist.py --compile
  8. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_dist.py --kfp

    Note

    To see the complete list of arguments and their respective usage, run python3 -m kale --help.

Summary

You have successfully configured and run a PyTorch distributed step on Kubeflow, using Kale and the Kubeflow training operator.

What’s Next

The next step is to schedule the learning rate of a PyTorch distributed KFP step using the Kale SDK.