Example: Schedule the Learning Rate in a PyTorch Distributed Run

This section will guide you through creating a PyTorch distributed experiment that reduces the value of the learning rate parameter as the experiment progresses.

What You’ll Need

Procedure

  1. Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Increase the size of the workspace volume to 10GB:

    [Screenshot: configuring the notebook workspace volume size]
  3. Connect to the server, open a terminal, and install the torch and torchvision packages:

    $ pip3 install --user torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html

    Note

    The PyTorch version may be different. Please head to the PyTorch website for the latest releases.
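
    To confirm which versions were actually installed and whether CUDA is visible to PyTorch, you can run a quick check from the same terminal:

    $ python3 -c "import torch, torchvision; print(torch.__version__, torchvision.__version__, torch.cuda.is_available())"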

    Note

    In this example, you will distribute the training job over multiple GPU devices. Thus, this command installs the CUDA version of the PyTorch package. If you are using CPU-only nodes, you can install the CPU-only version of the PyTorch package instead, and distribute the job over multiple CPU cores. You will see how to achieve that later in the user guide.
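
    For reference, a CPU-only installation of the same releases would look roughly like the following; the exact wheel tags depend on the PyTorch version you pick:

    $ pip3 install --user torch==1.9.0+cpu torchvision==0.10.0+cpu -f https://download.pytorch.org/whl/torch_stable.html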

  4. Create a new Python file and name it kale_dist.py:

    $ touch kale_dist.py
  5. Copy and paste the following code inside kale_dist.py:

    final.py

    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
    """

    import time
    import torch
    import torch.nn as nn
    import torchvision.models as models

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader
    from rok_kubernetes.exceptions import ApiException

    from kale.sdk import pipeline, step
    from kale.distributed import pytorch


    def _train_step(model, data_loader, criterion, optimizer, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if i % args.get("log_interval", 2) == 0:
                log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _eval_step(model, data_loader, criterion, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _load_data():
        """Load and prepare the data."""
        train_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=True,
            download=True,
            transform=ToTensor()
        )

        valid_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=False,
            download=True,
            transform=ToTensor()
        )

        train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
        valid_data_loader = DataLoader(valid_data, batch_size=2014)

        return train_data_loader, valid_data_loader


    @step(name="create_model")
    def create_model():
        """Define the model."""
        model = models.resnet18(pretrained=True)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                padding=(3, 3), bias=False)

        for param in model.parameters():
            param.requires_grad = False

        features_in = model.fc.in_features
        model.fc = nn.Linear(features_in, 10)

        optimizer = torch.optim.Adam(model.parameters(), lr=.03)
        criterion = nn.CrossEntropyLoss()

        return model, optimizer, criterion


    @step(name="model_training")
    def distribute_training(model, optimizer, criterion):
        """Train the model."""
        train_data_loader, valid_data_loader = _load_data()
        job = pytorch.distribute(model,
                                 train_data_loader,
                                 criterion,
                                 optimizer,
                                 _train_step,
                                 eval_data_loader=valid_data_loader,
                                 eval_step=_eval_step,
                                 epochs=1,
                                 number_of_processes=2,
                                 cuda=True,
                                 train_args={"log_interval": 2})

        return job.name


    @step(name="monitor")
    def monitor(name):
        """Monitor the PyTorchJob CR."""
        job = pytorch.PyTorchJob(name)
        while True:  # Iterate if streaming logs raises an ApiException
            while True:
                cr = job.get()
                if (job.get_job_status() not in ["", "Created", "Restarting"]
                        and (cr.get("status", {})
                             .get("replicaStatuses", {})
                             .get("Master", {}))):
                    break
                print("Job pending...")
                time.sleep(2)
            try:
                job.stream_logs()
                break
            except ApiException as e:
                print("Streaming the logs failed with: %s" % str(e))
                print("Retrying...")


    @pipeline(name="fmnist", experiment="kale-dist")
    def ml_pipeline():
        """Run the ML pipeline."""
        model, optimizer, criterion = create_model()
        name = distribute_training(model, optimizer, criterion)
        monitor(name)


    if __name__ == "__main__":
        ml_pipeline()

    This script creates a pipeline with a step that launches a PyTorch distributed training job, as well as a monitor step that tracks the progress of that job.
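
    As the earlier note mentioned, the same job can also be distributed over CPU processes. Assuming the cuda argument of pytorch.distribute() simply toggles GPU usage, the call in distribute_training would change roughly as sketched below; this is an illustration only, and the dedicated user guide covers the full CPU-only setup:

    # Hypothetical CPU-only variant of the pytorch.distribute() call:
    # passing cuda=False asks Kale to distribute the job over CPU
    # processes instead of GPU devices.
    job = pytorch.distribute(model,
                             train_data_loader,
                             criterion,
                             optimizer,
                             _train_step,
                             eval_data_loader=valid_data_loader,
                             eval_step=_eval_step,
                             epochs=1,
                             number_of_processes=2,
                             cuda=False,
                             train_args={"log_interval": 2})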

  6. Reduce the value of the learning rate parameter as the number of epochs increases. The following snippet summarizes the changes to the code:

    scheduler.py

    -# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
    +# Copyright © 2022 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
     """

    -import time
     import torch
     import torch.nn as nn
     import torchvision.models as models

     from torchvision import datasets
     from torchvision.transforms import ToTensor
     from torch.utils.data import DataLoader
    -from rok_kubernetes.exceptions import ApiException

     from kale.sdk import pipeline, step
     from kale.distributed import pytorch


     def _train_step(model, data_loader, criterion, optimizer, device, args):
         import logging

         log = logging.getLogger(__name__)
    +
    +    epoch = args.get("epoch")
    +
    +    for g in optimizer.param_groups:
    +        if epoch > 2:
    +            g["lr"] = g["lr"] * torch.exp(torch.tensor(-1))

         for i, (features, labels) in enumerate(data_loader):
             features = features.to(device)
             labels = labels.to(device)

             pred = model(features)
             loss = criterion(pred, labels.reshape(-1))

             loss.backward()

             optimizer.step()
             optimizer.zero_grad()

             if i % args.get("log_interval", 2) == 0:
                 log.info("\tLoss = {}".format(loss.item()))

         return loss


     def _eval_step(model, data_loader, criterion, device, args):
         import logging

         log = logging.getLogger(__name__)

         for i, (features, labels) in enumerate(data_loader):
             features = features.to(device)
             labels = labels.to(device)

             pred = model(features)
             loss = criterion(pred, labels.reshape(-1))

             log.info("\tLoss = {}".format(loss.item()))

         return loss


     def _load_data():
         """Load and prepare the data."""
         train_data = datasets.FashionMNIST(
             root="/home/jovyan/data",
             train=True,
             download=True,
             transform=ToTensor()
         )

         valid_data = datasets.FashionMNIST(
             root="/home/jovyan/data",
             train=False,
             download=True,
             transform=ToTensor()
         )

         train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
         valid_data_loader = DataLoader(valid_data, batch_size=2014)

         return train_data_loader, valid_data_loader


     @step(name="create_model")
     def create_model():
         """Define the model."""
         model = models.resnet18(pretrained=True)
         model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                 padding=(3, 3), bias=False)

         for param in model.parameters():
             param.requires_grad = False

         features_in = model.fc.in_features
         model.fc = nn.Linear(features_in, 10)

         optimizer = torch.optim.Adam(model.parameters(), lr=.03)
         criterion = nn.CrossEntropyLoss()

         return model, optimizer, criterion


     @step(name="model_training")
     def distribute_training(model, optimizer, criterion):
         """Train the model."""
         train_data_loader, valid_data_loader = _load_data()
         job = pytorch.distribute(model,
                                  train_data_loader,
                                  criterion,
                                  optimizer,
                                  _train_step,
                                  eval_data_loader=valid_data_loader,
                                  eval_step=_eval_step,
    -                             epochs=1,
    +                             epochs=4,
                                  number_of_processes=2,
                                  cuda=True,
                                  train_args={"log_interval": 2})

         return job.name


     @step(name="monitor")
     def monitor(name):
         """Monitor the PyTorchJob CR."""
         job = pytorch.PyTorchJob(name)
    -    while True:  # Iterate if streaming logs raises an ApiException
    -        while True:
    -            cr = job.get()
    -            if (job.get_job_status() not in ["", "Created", "Restarting"]
    -                    and (cr.get("status", {})
    -                         .get("replicaStatuses", {})
    -                         .get("Master", {}))):
    -                break
    -            print("Job pending...")
    -            time.sleep(2)
    -        try:
    -            job.stream_logs()
    -            break
    -        except ApiException as e:
    -            print("Streaming the logs failed with: %s" % str(e))
    -            print("Retrying...")
    +    while job.get_job_status() != "Running":
    +        continue
    +    job.stream_logs()


     @pipeline(name="fmnist", experiment="kale-dist")
     def ml_pipeline():
         """Run the ML pipeline."""
         model, optimizer, criterion = create_model()
         name = distribute_training(model, optimizer, criterion)
         monitor(name)


     if __name__ == "__main__":
         ml_pipeline()

    Kale passes the current epoch number in the args dictionary of both the train and eval step functions. In this example, you retrieve it inside the _train_step function and use it to schedule the value of the learning rate parameter.

    Specifically, the training process runs for 4 epochs. The value of the learning rate parameter remains constant for the first 2 epochs. From the third epoch onward, the learning rate is reduced according to the formula you defined, multiplying it by exp(-1) at the start of each epoch.
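
    To make the schedule concrete, the short snippet below replays the same decay outside the pipeline, assuming Kale numbers epochs starting from 1 (which is what the epoch > 2 check implies):

    import math

    lr = 0.03  # initial learning rate set in create_model()
    for epoch in range(1, 5):  # the 4 training epochs
        if epoch > 2:
            lr *= math.exp(-1)  # same decay applied in _train_step
        print("epoch {}: lr = {:.5f}".format(epoch, lr))
    # epoch 1: lr = 0.03000
    # epoch 2: lr = 0.03000
    # epoch 3: lr = 0.01104
    # epoch 4: lr = 0.00406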

  7. Optional

    Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_dist.py --compile
  8. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_dist.py --kfp

    Note

    To see the complete list of arguments and their respective usage, run python3 -m kale --help.

Summary

You have successfully created a PyTorch distributed run that reduces the value of the learning rate parameter as the experiment progresses.

What’s Next

Check out the rest of the Kale user guides.