Example: Configure a PyTorch Distributed KFP Step with the Kale SDK¶
This section will guide you through configuring and running a PyTorch distributed step on Kubeflow, using the Kale SDK and the Kubeflow training operator.
Overview¶
What You’ll Need¶
- An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
- An understanding of how the Kale SDK works.
- An understanding of how to create and submit a PyTorchJob CR using Kale and the training operator.
Procedure¶
1. Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:

gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

Note
The <IMAGE_TAG> varies based on the MiniKF or EKF release.

2. Increase the size of the workspace volume to 10GB.
3. Connect to the server, open a terminal, and install the torch and torchvision packages:

$ pip3 install --user torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html

Note
The PyTorch version may be different. Please head to the PyTorch website for the latest releases.

Note
In this example, you will distribute the training job over multiple GPU devices. Thus, this command installs the CUDA version of the PyTorch package. If you are using CPU-only nodes, you can install the CPU-only version of the PyTorch package instead, and distribute the job over multiple CPU cores. You will see how to achieve that later in the user guide.
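To confirm that the packages are installed and that the CUDA build can see the GPUs on your node, you can run a quick check from a Python shell. This is a minimal sketch; the reported versions and device count depend on your node and on the wheels you installed:

# Optional sanity check for the PyTorch installation.
import torch
import torchvision

print("torch:", torch.__version__)              # e.g. 1.9.0+cu111
print("torchvision:", torchvision.__version__)  # e.g. 0.10.0+cu111

# On CPU-only nodes this prints False, which is expected if you installed
# the CPU-only wheels instead.
print("CUDA available:", torch.cuda.is_available())
print("GPU count:", torch.cuda.device_count())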
4. Create a new python file and name it kale_dist.py:

$ touch kale_dist.py

5. Copy and paste the following code inside kale_dist.py:

final.py

# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
"""

import time
import torch
import torch.nn as nn
import torchvision.models as models

from torchvision import datasets
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
from rok_kubernetes.exceptions import ApiException

from kale.sdk import pipeline, step
from kale.distributed import pytorch


def _train_step(model, data_loader, criterion, optimizer, device, args):
    import logging

    log = logging.getLogger(__name__)

    for i, (features, labels) in enumerate(data_loader):
        features = features.to(device)
        labels = labels.to(device)

        pred = model(features)
        loss = criterion(pred, labels.reshape(-1))

        loss.backward()

        optimizer.step()
        optimizer.zero_grad()

        if i % args.get("log_interval", 2) == 0:
            log.info("\tLoss = {}".format(loss.item()))

    return loss


def _eval_step(model, data_loader, criterion, device, args):
    import logging

    log = logging.getLogger(__name__)

    for i, (features, labels) in enumerate(data_loader):
        features = features.to(device)
        labels = labels.to(device)

        pred = model(features)
        loss = criterion(pred, labels.reshape(-1))

        log.info("\tLoss = {}".format(loss.item()))

    return loss


def _load_data():
    """Load and prepare the data."""
    train_data = datasets.FashionMNIST(
        root="/home/jovyan/data",
        train=True,
        download=True,
        transform=ToTensor()
    )

    valid_data = datasets.FashionMNIST(
        root="/home/jovyan/data",
        train=False,
        download=True,
        transform=ToTensor()
    )

    train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
    valid_data_loader = DataLoader(valid_data, batch_size=2014)

    return train_data_loader, valid_data_loader


@step(name="create_model")
def create_model():
    """Define the model."""
    model = models.resnet18(pretrained=True)
    model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                            padding=(3, 3), bias=False)

    for param in model.parameters():
        param.requires_grad = False

    features_in = model.fc.in_features
    model.fc = nn.Linear(features_in, 10)

    optimizer = torch.optim.Adam(model.parameters(), lr=.03)
    criterion = nn.CrossEntropyLoss()

    return model, optimizer, criterion


@step(name="model_training")
def distribute_training(model, optimizer, criterion):
    """Train the model."""
    train_data_loader, valid_data_loader = _load_data()
    job = pytorch.distribute(model,
                             train_data_loader,
                             criterion,
                             optimizer,
                             _train_step,
                             eval_data_loader=valid_data_loader,
                             eval_step=_eval_step,
                             epochs=1,
                             number_of_processes=2,
                             cuda=True,
                             train_args={"log_interval": 2})

    return job.name


@step(name="monitor")
def monitor(name):
    """Monitor the PyTorchJob CR."""
    job = pytorch.PyTorchJob(name)
    while True:  # Iterate if streaming logs raises an ApiException
        while True:
            cr = job.get()
            if (job.get_job_status() not in ["", "Created", "Restarting"]
                    and (cr.get("status", {})
                         .get("replicaStatuses", {})
                         .get("Master", {}))):
                break
            print("Job pending...")
            time.sleep(2)
        try:
            job.stream_logs()
            break
        except ApiException as e:
            print("Streaming the logs failed with: %s" % str(e))
            print("Retrying...")


@pipeline(name="fmnist", experiment="kale-dist")
def ml_pipeline():
    """Run the ML pipeline."""
    model, optimizer, criterion = create_model()
    name = distribute_training(model, optimizer, criterion)
    monitor(name)


if __name__ == "__main__":
    ml_pipeline()

This script creates a pipeline with a step that runs a PyTorch distributed process, as well as a monitor step that tracks the progress of the distributed training job.
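Before deploying anything, you can optionally smoke test the training step locally, without Kale or the training operator involved. The following is a minimal sketch that assumes you run it on the notebook server with kale_dist.py in your current working directory (so that the kale and rok_kubernetes imports at the top of that file resolve); it rebuilds the same model inline and runs _train_step on CPU over just two batches:

# Optional local smoke test for the training step (CPU, two batches only).
from itertools import islice

import torch
import torch.nn as nn
import torchvision.models as models

from kale_dist import _train_step, _load_data  # the file created above

device = torch.device("cpu")

# Rebuild the model the same way create_model() does.
model = models.resnet18(pretrained=True)
model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                        padding=(3, 3), bias=False)
for param in model.parameters():
    param.requires_grad = False
model.fc = nn.Linear(model.fc.in_features, 10)

optimizer = torch.optim.Adam(model.parameters(), lr=.03)
criterion = nn.CrossEntropyLoss()

train_data_loader, _ = _load_data()

# Feed only the first two batches through the training step.
loss = _train_step(model, list(islice(train_data_loader, 2)),
                   criterion, optimizer, device, {"log_interval": 1})
print("Loss after two batches:", loss.item())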
6. Specify your desired Kubernetes configuration for the deployment. The following snippet summarizes the changes in code:
config.py

- # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
+ # Copyright © 2021 Arrikto Inc. All Rights Reserved.

  """Kale SDK.

  This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
  """

- import time
  import torch
  import torch.nn as nn
  import torchvision.models as models

  from torchvision import datasets
  from torchvision.transforms import ToTensor
  from torch.utils.data import DataLoader
- from rok_kubernetes.exceptions import ApiException

  from kale.sdk import pipeline, step
  from kale.distributed import pytorch

  # ... (unchanged: _train_step, _eval_step, _load_data, create_model) ...

  @step(name="model_training")
  def distribute_training(model, optimizer, criterion):
      """Train the model."""
+     distributed_config = {"limits": {"cpu": "1", "memory": "4Gi"},
+                           "requests": {"cpu": "100m", "memory": "3Gi"},
+                           "run_policy": {"cleanPodPolicy": "All"}}
+
      train_data_loader, valid_data_loader = _load_data()
      job = pytorch.distribute(model,
                               train_data_loader,
                               criterion,
                               optimizer,
                               _train_step,
                               eval_data_loader=valid_data_loader,
                               eval_step=_eval_step,
                               epochs=1,
                               number_of_processes=2,
                               cuda=True,
-                              train_args={"log_interval": 2})
+                              train_args={"log_interval": 2},
+                              config=distributed_config)

      return job.name


  @step(name="monitor")
  def monitor(name):
      """Monitor the PyTorchJob CR."""
      job = pytorch.PyTorchJob(name)
-     while True:  # Iterate if streaming logs raises an ApiException
-         while True:
-             cr = job.get()
-             if (job.get_job_status() not in ["", "Created", "Restarting"]
-                     and (cr.get("status", {})
-                          .get("replicaStatuses", {})
-                          .get("Master", {}))):
-                 break
-             print("Job pending...")
-             time.sleep(2)
-         try:
-             job.stream_logs()
-             break
-         except ApiException as e:
-             print("Streaming the logs failed with: %s" % str(e))
-             print("Retrying...")
+     while job.get_job_status() != "Running":
+         continue
+     job.stream_logs()


  @pipeline(name="fmnist", experiment="kale-dist")
  def ml_pipeline():
      """Run the ML pipeline."""
      model, optimizer, criterion = create_model()
      name = distribute_training(model, optimizer, criterion)
      monitor(name)


  if __name__ == "__main__":
      ml_pipeline()

In this example, you are setting the requests and limits for every container that is part of the distributed training process. Furthermore, by setting the cleanPodPolicy to All, the controller will delete every pod when the training job is complete.

Important
The container-level options that you set in the configuration object, such as env, labels, annotations, limits, requests, etc., are propagated to every container that is part of the distributed training process. Thus, the master and every worker pod will have the same container-level options.

Note
To find out about all available configurations you can specify, take a look at the DistributedConfig API.
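As an illustration of the note above, the snippet below sketches what a richer configuration object could look like. Only the limits, requests, and run_policy keys appear in this guide; the env, labels, and annotations keys, and their value formats, are assumptions based on the note and should be verified against the DistributedConfig API before you rely on them:

# Illustrative configuration object. "limits", "requests", and "run_policy"
# are the keys used in this guide; the remaining keys are assumed from the
# note above and must be checked against the DistributedConfig API.
distributed_config = {
    "limits": {"cpu": "1", "memory": "4Gi"},
    "requests": {"cpu": "100m", "memory": "3Gi"},
    "run_policy": {"cleanPodPolicy": "All"},
    "env": {"LOG_LEVEL": "INFO"},                 # assumed key and format
    "labels": {"team": "ml-platform"},            # assumed key
    "annotations": {"owner": "user@example.com"}, # assumed key
}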
7. (Optional) Produce a workflow YAML file that you can inspect:

$ python3 -m kale kale_dist.py --compile
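If you want a quick look at which steps made it into the compiled pipeline, you can parse the generated file with a short script. This is only a sketch: the filename below is a placeholder, so substitute the path that the --compile command reports, and it assumes the output is a standard Argo Workflow manifest:

# Hypothetical inspection of the compiled workflow; replace the path below
# with the file that the --compile command actually produced.
import yaml

workflow_path = "fmnist.kfp.yaml"  # placeholder name, not guaranteed by Kale

with open(workflow_path) as f:
    workflow = yaml.safe_load(f)

# In an Argo Workflow manifest, each template roughly corresponds to a
# pipeline step (plus the top-level DAG template).
for template in workflow.get("spec", {}).get("templates", []):
    print(template.get("name"))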
8. Deploy and run your code as a KFP pipeline:

$ python3 -m kale kale_dist.py --kfp

Note
To see the complete list of arguments and their respective usage, run python3 -m kale --help.
Summary¶
You have successfully configured and run a PyTorch distributed step on Kubeflow, using Kale and the Kubeflow training operator.
What’s Next¶
The next step is to schedule the value of the learning rate parameter in a PyTorch distributed KFP step using the Kale SDK.