Example: Schedule the Learning Rate in a PyTorch Distributed Run
This section guides you through creating a PyTorch distributed experiment that reduces the value of the learning rate parameter as training progresses.
What You’ll Need
- An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
- An understanding of how the Kale SDK works.
- An understanding of how to create and submit a `PyTorchJob` CR, using Kale and the training operator.
Procedure
Create a new notebook server using the default Kale Docker image. The image has the following naming scheme:

```
gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>
```

Note
The `<IMAGE_TAG>` varies based on the MiniKF or EKF release.

Increase the size of the workspace volume to 10GB.
Connect to the server, open a terminal, and install the `torch` and `torchvision` packages:

```console
$ pip3 install --user torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
```

Note
The exact PyTorch version you need may differ. Visit the PyTorch website for the latest releases.
Note
In this example, you will distribute the training job over multiple GPU devices. Thus, this command installs the CUDA version of the PyTorch package. If you are using CPU-only nodes, you can install the CPU-only version of the PyTorch package instead, and distribute the job over multiple CPU cores. You will see how to achieve that later in the user guide.
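If you want to confirm which build `pip` actually installed before going further, a quick check from the notebook can help. The following is a minimal sketch, not part of the Kale workflow, that reads only standard PyTorch attributes (`torch.__version__`, `torch.version.cuda`, `torch.cuda.is_available()`, `torch.cuda.device_count()`). The helper name `describe_torch_install` is hypothetical.

```python
import importlib.util


def describe_torch_install():
    """Summarize the installed PyTorch build, or return None if torch is missing."""
    if importlib.util.find_spec("torch") is None:
        return None
    import torch

    return {
        # Full version string, including the local build tag (e.g. "+cu111")
        "version": str(torch.__version__),
        # CUDA version the wheel was built against; None for CPU-only wheels
        "built_with_cuda": torch.version.cuda,
        # Whether a GPU is actually visible to this process right now
        "gpu_available": torch.cuda.is_available(),
        # Number of visible GPUs (0 on CPU-only nodes)
        "gpu_count": torch.cuda.device_count(),
    }


if __name__ == "__main__":
    print(describe_torch_install())
```

On a CPU-only build, `built_with_cuda` is `None`. On a CUDA build running on a node without a visible GPU, `gpu_available` is `False` and `gpu_count` is `0`.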
Create a new Python file and name it `kale_dist.py`:

```console
$ touch kale_dist.py
```

Copy and paste the following code inside `kale_dist.py`:

final.py

```python
# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
"""

import time
import torch
import torch.nn as nn
import torchvision.models as models

from torchvision import datasets
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
from rok_kubernetes.exceptions import ApiException

from kale.sdk import pipeline, step
from kale.distributed import pytorch


def _train_step(model, data_loader, criterion, optimizer, device, args):
    import logging

    log = logging.getLogger(__name__)

    for i, (features, labels) in enumerate(data_loader):
        features = features.to(device)
        labels = labels.to(device)

        pred = model(features)
        loss = criterion(pred, labels.reshape(-1))

        loss.backward()

        optimizer.step()
        optimizer.zero_grad()

        if i % args.get("log_interval", 2) == 0:
            log.info("\tLoss = {}".format(loss.item()))

    return loss


def _eval_step(model, data_loader, criterion, device, args):
    import logging

    log = logging.getLogger(__name__)

    for i, (features, labels) in enumerate(data_loader):
        features = features.to(device)
        labels = labels.to(device)

        pred = model(features)
        loss = criterion(pred, labels.reshape(-1))

        log.info("\tLoss = {}".format(loss.item()))

    return loss


def _load_data():
    """Load and prepare the data."""
    train_data = datasets.FashionMNIST(
        root="/home/jovyan/data",
        train=True,
        download=True,
        transform=ToTensor()
    )

    valid_data = datasets.FashionMNIST(
        root="/home/jovyan/data",
        train=False,
        download=True,
        transform=ToTensor()
    )

    train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
    valid_data_loader = DataLoader(valid_data, batch_size=2014)

    return train_data_loader, valid_data_loader


@step(name="create_model")
def create_model():
    """Define the model."""
    model = models.resnet18(pretrained=True)
    model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                            padding=(3, 3), bias=False)

    for param in model.parameters():
        param.requires_grad = False

    features_in = model.fc.in_features
    model.fc = nn.Linear(features_in, 10)

    optimizer = torch.optim.Adam(model.parameters(), lr=.03)
    criterion = nn.CrossEntropyLoss()

    return model, optimizer, criterion


@step(name="model_training")
def distribute_training(model, optimizer, criterion):
    """Train the model."""
    train_data_loader, valid_data_loader = _load_data()
    job = pytorch.distribute(model,
                             train_data_loader,
                             criterion,
                             optimizer,
                             _train_step,
                             eval_data_loader=valid_data_loader,
                             eval_step=_eval_step,
                             epochs=1,
                             number_of_processes=2,
                             cuda=True,
                             train_args={"log_interval": 2})

    return job.name


@step(name="monitor")
def monitor(name):
    """Monitor the PyTorchJob CR."""
    job = pytorch.PyTorchJob(name)
    while True:  # Iterate if streaming logs raises an ApiException
        while True:
            cr = job.get()
            if (job.get_job_status() not in ["", "Created", "Restarting"]
                    and (cr.get("status", {})
                         .get("replicaStatuses", {})
                         .get("Master", {}))):
                break
            print("Job pending...")
            time.sleep(2)
        try:
            job.stream_logs()
            break
        except ApiException as e:
            print("Streaming the logs failed with: %s" % str(e))
            print("Retrying...")


@pipeline(name="fmnist", experiment="kale-dist")
def ml_pipeline():
    """Run the ML pipeline."""
    model, optimizer, criterion = create_model()
    name = distribute_training(model, optimizer, criterion)
    monitor(name)


if __name__ == "__main__":
    ml_pipeline()
```

This script creates a pipeline with a step that runs a PyTorch distributed process. There is also a `monitor` step that monitors the progress of the distributed training job.

Reduce the value of the learning rate parameter as the number of epochs increases. The following snippet summarizes the changes in code:
scheduler.py

```diff
-# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
+# Copyright © 2022 Arrikto Inc. All Rights Reserved.

 """Kale SDK.

 This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
 """

 import time
 import torch
 import torch.nn as nn
 import torchvision.models as models

 from torchvision import datasets
 from torchvision.transforms import ToTensor
 from torch.utils.data import DataLoader
-from rok_kubernetes.exceptions import ApiException

 from kale.sdk import pipeline, step
 from kale.distributed import pytorch


 def _train_step(model, data_loader, criterion, optimizer, device, args):
     import logging

     log = logging.getLogger(__name__)
+
+    epoch = args.get("epoch")
+
+    for g in optimizer.param_groups:
+        if epoch > 2:
+            # .item() keeps the learning rate a plain Python float
+            g["lr"] = g["lr"] * torch.exp(torch.tensor(-1.0)).item()

     for i, (features, labels) in enumerate(data_loader):
         features = features.to(device)
         labels = labels.to(device)

         pred = model(features)
         loss = criterion(pred, labels.reshape(-1))

         loss.backward()

         optimizer.step()
         optimizer.zero_grad()

         if i % args.get("log_interval", 2) == 0:
             log.info("\tLoss = {}".format(loss.item()))

     return loss


 def _eval_step(model, data_loader, criterion, device, args):
     import logging

     log = logging.getLogger(__name__)

     for i, (features, labels) in enumerate(data_loader):
         features = features.to(device)
         labels = labels.to(device)

         pred = model(features)
         loss = criterion(pred, labels.reshape(-1))

         log.info("\tLoss = {}".format(loss.item()))

     return loss


 def _load_data():
     """Load and prepare the data."""
     train_data = datasets.FashionMNIST(
         root="/home/jovyan/data",
         train=True,
         download=True,
         transform=ToTensor()
     )

     valid_data = datasets.FashionMNIST(
         root="/home/jovyan/data",
         train=False,
         download=True,
         transform=ToTensor()
     )

     train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
     valid_data_loader = DataLoader(valid_data, batch_size=2014)

     return train_data_loader, valid_data_loader


 @step(name="create_model")
 def create_model():
     """Define the model."""
     model = models.resnet18(pretrained=True)
     model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                             padding=(3, 3), bias=False)

     for param in model.parameters():
         param.requires_grad = False

     features_in = model.fc.in_features
     model.fc = nn.Linear(features_in, 10)

     optimizer = torch.optim.Adam(model.parameters(), lr=.03)
     criterion = nn.CrossEntropyLoss()

     return model, optimizer, criterion


 @step(name="model_training")
 def distribute_training(model, optimizer, criterion):
     """Train the model."""
     train_data_loader, valid_data_loader = _load_data()
     job = pytorch.distribute(model,
                              train_data_loader,
                              criterion,
                              optimizer,
                              _train_step,
                              eval_data_loader=valid_data_loader,
                              eval_step=_eval_step,
-                             epochs=1,
+                             epochs=4,
                              number_of_processes=2,
                              cuda=True,
                              train_args={"log_interval": 2})

     return job.name


 @step(name="monitor")
 def monitor(name):
     """Monitor the PyTorchJob CR."""
     job = pytorch.PyTorchJob(name)
-    while True:  # Iterate if streaming logs raises an ApiException
-        while True:
-            cr = job.get()
-            if (job.get_job_status() not in ["", "Created", "Restarting"]
-                    and (cr.get("status", {})
-                         .get("replicaStatuses", {})
-                         .get("Master", {}))):
-                break
-            print("Job pending...")
-            time.sleep(2)
-        try:
-            job.stream_logs()
-            break
-        except ApiException as e:
-            print("Streaming the logs failed with: %s" % str(e))
-            print("Retrying...")
+    # Sleep between polls instead of spinning, and stop waiting on any
+    # non-pending status so a failed job does not block forever
+    while job.get_job_status() in ["", "Created", "Restarting"]:
+        time.sleep(2)
+    job.stream_logs()


 @pipeline(name="fmnist", experiment="kale-dist")
 def ml_pipeline():
     """Run the ML pipeline."""
     model, optimizer, criterion = create_model()
     name = distribute_training(model, optimizer, criterion)
     monitor(name)


 if __name__ == "__main__":
     ml_pipeline()
```

Kale passes the current epoch number to the `args` dictionary of both the train and eval step functions. In this example, you retrieve it inside the `_train_step` function and use it to schedule the value of the learning rate parameter.

Specifically, the training process runs for 4 epochs. The value of the learning rate parameter remains constant for the first 2 epochs. After that, at the start of each epoch, `_train_step` multiplies the learning rate of every parameter group by e^-1 (about 0.368).
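Because the schedule rescales the values stored in `optimizer.param_groups` in place, the reduction compounds: every epoch after the second multiplies the previous learning rate by e^-1 again. The sketch below reproduces that arithmetic in plain Python so the resulting values can be checked without a cluster. It assumes that Kale numbers epochs starting from 1, as the description above implies, and that each worker reuses the same optimizer across epochs. `apply_schedule` and `simulate` are hypothetical helpers, and `math.exp(-1)` stands in for the `torch.exp` call. If epochs were numbered from 0 instead, the learning rate would stay at its initial value for three epochs rather than two.

```python
import math


def apply_schedule(param_groups, epoch):
    """Apply the rule from the modified _train_step to param_groups in place.

    For every epoch greater than 2, each group's learning rate is multiplied
    by e^-1, so the reduction compounds from one epoch to the next.
    """
    for g in param_groups:
        if epoch > 2:
            g["lr"] = g["lr"] * math.exp(-1)
    return param_groups[0]["lr"]


def simulate(initial_lr=0.03, epochs=4):
    """Return the learning rate used in each epoch, assuming 1-based epochs."""
    # A list of dicts stands in for optimizer.param_groups, which the
    # optimizer keeps (and the schedule mutates) across epochs.
    param_groups = [{"lr": initial_lr}]
    return [apply_schedule(param_groups, epoch) for epoch in range(1, epochs + 1)]


if __name__ == "__main__":
    for epoch, lr in enumerate(simulate(), start=1):
        print(f"epoch {epoch}: lr = {lr:.5f}")
```

Run as a script, it prints 0.03000, 0.03000, 0.01104, and 0.00406 for epochs 1 to 4. From the third epoch on, the value equals 0.03 · e^-(epoch - 2).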
Optional
Produce a workflow YAML file that you can inspect:

```console
$ python3 -m kale kale_dist.py --compile
```

Deploy and run your code as a KFP pipeline:

```console
$ python3 -m kale kale_dist.py --kfp
```

Note
To see the complete list of arguments and their respective usage, run `python3 -m kale --help`.
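When the pipeline runs on KFP, the `monitor` step in `kale_dist.py` waits for the PyTorchJob to leave its pending states (`""`, `Created`, `Restarting`) before streaming logs. The following is a minimal, standard-library sketch of that wait loop with a poll interval and an overall timeout added. `get_status` stands in for `job.get_job_status` from the script; the helper name `wait_until_started`, the timeout, and the injectable `sleep` and `clock` parameters are assumptions for illustration. Unlike the original `monitor` step, it does not inspect the `Master` replica status or retry on `ApiException`.

```python
import time

# Statuses treated as "not started yet", as in the original monitor step
PENDING_STATUSES = {"", "Created", "Restarting"}


def wait_until_started(get_status, poll_interval=2.0, timeout=600.0,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it returns a non-pending status.

    Returns that status (for example "Running"), or raises TimeoutError
    once `timeout` seconds have elapsed while the job is still pending.
    """
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status not in PENDING_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError("job still pending after %.0f s" % timeout)
        print("Job pending...")
        # Sleep between polls instead of spinning on the API server
        sleep(poll_interval)


if __name__ == "__main__":
    # Hypothetical stand-in for pytorch.PyTorchJob(name).get_job_status
    fake_statuses = iter(["", "Created", "Running"])
    print(wait_until_started(lambda: next(fake_statuses), poll_interval=0.01))
```

Running the file prints `Job pending...` twice and then `Running`. Injecting `sleep` and `clock` keeps the loop testable without real waiting.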