Example: Create a PyTorch Distributed KFP Step with the Kale SDK

This section will guide you through creating a PyTorch distributed KFP step on Kubeflow, using the Kale SDK and the PyTorch operator.

What You'll Need

  • An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
  • An understanding of how the Kale SDK works.

Procedure

  1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py36:<IMAGE_TAG>
    

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, and install the torch and torchvision packages:

    $ pip3 install --user torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
    

    Note

    The PyTorch version may be different. Please head to the PyTorch website for the latest releases.

    Note

    In this example, you will distribute the training job over multiple GPU devices. Thus, this command installs the CUDA version of the PyTorch package. If you are using CPU-only nodes, you can install the CPU-only version of the PyTorch package instead, and distribute the job over multiple CPU cores. You will see how to achieve that later in the user guide.
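    For example, a CPU-only installation could look like the following (the version tags are illustrative; check the PyTorch website for the ones matching your setup):

    $ pip3 install --user torch==1.9.0+cpu torchvision==0.10.0+cpu -f https://download.pytorch.org/whl/torch_stable.html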

  3. Create a new Python file and name it kale_dist.py:

    $ touch kale_dist.py
    
  4. Copy and paste the following code inside kale_dist.py:

    starter.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
    """

    import torch
    import torch.nn as nn
    import torchvision.models as models

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader

    from kale.sdk import pipeline, step


    def _load_data():
        """Load and prepare the data."""
        train_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=True,
            download=True,
            transform=ToTensor()
        )

        valid_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=False,
            download=True,
            transform=ToTensor()
        )

        train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
        valid_data_loader = DataLoader(valid_data, batch_size=1024)

        return train_data_loader, valid_data_loader


    @step(name="create_model")
    def create_model():
        """Define the model."""
        model = models.resnet18(pretrained=True)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                padding=(3, 3), bias=False)

        for param in model.parameters():
            param.requires_grad = False

        features_in = model.fc.in_features
        model.fc = nn.Linear(features_in, 10)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.03)
        criterion = nn.CrossEntropyLoss()

        return model, optimizer, criterion


    @pipeline(name="fmnist", experiment="kale-dist")
    def ml_pipeline():
        """Run the ML pipeline."""
        model, optimizer, criterion = create_model()


    if __name__ == "__main__":
        ml_pipeline()

    This script creates a pipeline with a single step. The create_model step builds a PyTorch model, an optimizer, and a criterion (i.e., a loss function).

    Important

    Always use absolute paths when specifying the path to a file or folder. For example, in the _load_data function, we use an absolute path (/home/jovyan/data) to specify where PyTorch should download the Fashion MNIST dataset.

  5. Define the training and evaluation step functions. The following snippet summarizes the changes in code:

    steps.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
    """

    import torch
    import torch.nn as nn
    import torchvision.models as models

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader

    from kale.sdk import pipeline, step
    +
    +
    +def _train_step(model, data_loader, criterion, optimizer, device, args):
    +    import logging
    +
    +    log = logging.getLogger(__name__)
    +
    +    for i, (features, labels) in enumerate(data_loader):
    +        features = features.to(device)
    +        labels = labels.to(device)
    +
    +        pred = model(features)
    +        loss = criterion(pred, labels.reshape(-1))
    +
    +        loss.backward()
    +
    +        optimizer.step()
    +        optimizer.zero_grad()
    +
    +        if i % args.get("log_interval", 2) == 0:
    +            log.info("\tLoss = {}".format(loss.item()))
    +
    +    return loss
    +
    +
    +def _eval_step(model, data_loader, criterion, device, args):
    +    import logging
    +
    +    log = logging.getLogger(__name__)
    +
    +    for i, (features, labels) in enumerate(data_loader):
    +        features = features.to(device)
    +        labels = labels.to(device)
    +
    +        pred = model(features)
    +        loss = criterion(pred, labels.reshape(-1))
    +
    +        log.info("\tLoss = {}".format(loss.item()))
    +
    +    return loss


    def _load_data():
        """Load and prepare the data."""
        train_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=True,
            download=True,
            transform=ToTensor()
        )

        valid_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=False,
            download=True,
            transform=ToTensor()
        )

        train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
        valid_data_loader = DataLoader(valid_data, batch_size=1024)

        return train_data_loader, valid_data_loader


    @step(name="create_model")
    def create_model():
        """Define the model."""
        model = models.resnet18(pretrained=True)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                padding=(3, 3), bias=False)

        for param in model.parameters():
            param.requires_grad = False

        features_in = model.fc.in_features
        model.fc = nn.Linear(features_in, 10)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.03)
        criterion = nn.CrossEntropyLoss()

        return model, optimizer, criterion


    @pipeline(name="fmnist", experiment="kale-dist")
    def ml_pipeline():
        """Run the ML pipeline."""
        model, optimizer, criterion = create_model()


    if __name__ == "__main__":
        ml_pipeline()

    The _train_step and _eval_step functions are user-defined functions that dictate what happens during training and evaluation. You must provide a training step function; the evaluation step function is optional, but highly recommended.

    Important

    The _train_step and _eval_step functions must have exactly this signature, and they must be standalone functions. This means you should import every Python module a function depends on within its body, and pass any extra arguments through the args dictionary.
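
    For example, here is a minimal sketch of how an extra, hypothetical clip_norm argument could travel through the args dictionary (the argument name and the gradient-clipping logic are illustrative, not part of the Kale API):

    def _train_step(model, data_loader, criterion, optimizer, device, args):
        # Standalone function: import every dependency inside the body.
        import torch

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            loss = criterion(model(features), labels.reshape(-1))
            loss.backward()

            # Read the extra argument from the args dictionary, with a default.
            torch.nn.utils.clip_grad_norm_(model.parameters(),
                                           args.get("clip_norm", 1.0))

            optimizer.step()
            optimizer.zero_grad()

        return loss

    You would then pass the extra argument when launching the job, for example with train_args={"clip_norm": 0.5}.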

  6. Distribute the training job over multiple GPU devices. The following snippet summarizes the changes in code:

    pytorchjob.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
    """

    import torch
    import torch.nn as nn
    import torchvision.models as models

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader

    from kale.sdk import pipeline, step
    +from kale.distributed import pytorch


    def _train_step(model, data_loader, criterion, optimizer, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if i % args.get("log_interval", 2) == 0:
                log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _eval_step(model, data_loader, criterion, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _load_data():
        """Load and prepare the data."""
        train_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=True,
            download=True,
            transform=ToTensor()
        )

        valid_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=False,
            download=True,
            transform=ToTensor()
        )

        train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
        valid_data_loader = DataLoader(valid_data, batch_size=1024)

        return train_data_loader, valid_data_loader


    @step(name="create_model")
    def create_model():
        """Define the model."""
        model = models.resnet18(pretrained=True)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                padding=(3, 3), bias=False)

        for param in model.parameters():
            param.requires_grad = False

        features_in = model.fc.in_features
        model.fc = nn.Linear(features_in, 10)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.03)
        criterion = nn.CrossEntropyLoss()

        return model, optimizer, criterion


    +@step(name="model_training")
    +def distribute_training(model, optimizer, criterion):
    +    """Train the model."""
    +    train_data_loader, valid_data_loader = _load_data()
    +    job = pytorch.distribute(model,
    +                             train_data_loader,
    +                             criterion,
    +                             optimizer,
    +                             _train_step,
    +                             eval_data_loader=valid_data_loader,
    +                             eval_step=_eval_step,
    +                             epochs=1,
    +                             number_of_processes=2,
    +                             cuda=True,
    +                             train_args={"log_interval": 2})
    +
    +    return job.name
    +
    +
    @pipeline(name="fmnist", experiment="kale-dist")
    def ml_pipeline():
        """Run the ML pipeline."""
        model, optimizer, criterion = create_model()
    +    distribute_training(model, optimizer, criterion)


    if __name__ == "__main__":
        ml_pipeline()

    To find out more about the process the distribute function follows, head to the corresponding section of the PyTorch distributed with Kale guide.

    Important

    You should create the PyTorch DataLoader inside the same step that launches the training job. This is a limitation due to the way Kale serializes these assets, and we plan to remove it in future versions.

    Important

    If you need to use a custom PyTorch Dataset, created by subclassing the torch.utils.data.Dataset class, you must keep the class definition inside this Python script, at module-level scope. This is a limitation due to the way Kale serializes these assets.
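
    For example, a module-level Dataset definition could look like the following sketch (the class and its in-memory tensors are illustrative):

    from torch.utils.data import Dataset

    # Defined at module-level scope, next to _load_data, not inside a step.
    class InMemoryDataset(Dataset):
        """Illustrative custom dataset wrapping in-memory tensors."""

        def __init__(self, features, labels):
            self.features = features
            self.labels = labels

        def __len__(self):
            return len(self.labels)

        def __getitem__(self, idx):
            return self.features[idx], self.labels[idx]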

    Note

    If you want to distribute the training job across multiple CPU cores instead, set the cuda argument to False. By default, Kale launches two processes (the minimum number of processes the PyTorchJob CR requires) on two different GPU devices.
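
    For example, the call in the distribute_training step would then become:

    job = pytorch.distribute(model,
                             train_data_loader,
                             criterion,
                             optimizer,
                             _train_step,
                             eval_data_loader=valid_data_loader,
                             eval_step=_eval_step,
                             epochs=1,
                             number_of_processes=2,
                             cuda=False,  # distribute over CPU cores instead of GPUs
                             train_args={"log_interval": 2})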

  7. Monitor the PyTorchJob CR you submitted. The following snippet summarizes the changes in code:

    final.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
    """

    import torch
    import torch.nn as nn
    import torchvision.models as models

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader

    from kale.sdk import pipeline, step
    from kale.distributed import pytorch


    def _train_step(model, data_loader, criterion, optimizer, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if i % args.get("log_interval", 2) == 0:
                log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _eval_step(model, data_loader, criterion, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _load_data():
        """Load and prepare the data."""
        train_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=True,
            download=True,
            transform=ToTensor()
        )

        valid_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=False,
            download=True,
            transform=ToTensor()
        )

        train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
        valid_data_loader = DataLoader(valid_data, batch_size=1024)

        return train_data_loader, valid_data_loader


    @step(name="create_model")
    def create_model():
        """Define the model."""
        model = models.resnet18(pretrained=True)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                padding=(3, 3), bias=False)

        for param in model.parameters():
            param.requires_grad = False

        features_in = model.fc.in_features
        model.fc = nn.Linear(features_in, 10)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.03)
        criterion = nn.CrossEntropyLoss()

        return model, optimizer, criterion


    @step(name="model_training")
    def distribute_training(model, optimizer, criterion):
        """Train the model."""
        train_data_loader, valid_data_loader = _load_data()
        job = pytorch.distribute(model,
                                 train_data_loader,
                                 criterion,
                                 optimizer,
                                 _train_step,
                                 eval_data_loader=valid_data_loader,
                                 eval_step=_eval_step,
                                 epochs=1,
                                 number_of_processes=2,
                                 cuda=True,
                                 train_args={"log_interval": 2})

        return job.name


    +@step(name="monitor")
    +def monitor(name):
    +    """Monitor the PyTorchJob CR."""
    +    job = pytorch.PyTorchJob(name)
    +    # Block until the job starts running, then stream its logs.
    +    while job.get_job_status() != "Running":
    +        continue
    +    job.stream_logs()
    +
    +
    @pipeline(name="fmnist", experiment="kale-dist")
    def ml_pipeline():
        """Run the ML pipeline."""
        model, optimizer, criterion = create_model()
    -    distribute_training(model, optimizer, criterion)
    +    name = distribute_training(model, optimizer, criterion)
    +    monitor(name)


    if __name__ == "__main__":
        ml_pipeline()
  8. (Optional) Delete the PyTorchJob CR after completion. The following snippet summarizes the changes in code:

    delete.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a PyTorch distributed KFP step to solve Fashion MNIST.
    """

    import torch
    import torch.nn as nn
    import torchvision.models as models

    from torchvision import datasets
    from torchvision.transforms import ToTensor
    from torch.utils.data import DataLoader

    from kale.sdk import pipeline, step
    from kale.distributed import pytorch


    def _train_step(model, data_loader, criterion, optimizer, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if i % args.get("log_interval", 2) == 0:
                log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _eval_step(model, data_loader, criterion, device, args):
        import logging

        log = logging.getLogger(__name__)

        for i, (features, labels) in enumerate(data_loader):
            features = features.to(device)
            labels = labels.to(device)

            pred = model(features)
            loss = criterion(pred, labels.reshape(-1))

            log.info("\tLoss = {}".format(loss.item()))

        return loss


    def _load_data():
        """Load and prepare the data."""
        train_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=True,
            download=True,
            transform=ToTensor()
        )

        valid_data = datasets.FashionMNIST(
            root="/home/jovyan/data",
            train=False,
            download=True,
            transform=ToTensor()
        )

        train_data_loader = DataLoader(train_data, batch_size=1024, shuffle=True)
        valid_data_loader = DataLoader(valid_data, batch_size=1024)

        return train_data_loader, valid_data_loader


    @step(name="create_model")
    def create_model():
        """Define the model."""
        model = models.resnet18(pretrained=True)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2),
                                padding=(3, 3), bias=False)

        for param in model.parameters():
            param.requires_grad = False

        features_in = model.fc.in_features
        model.fc = nn.Linear(features_in, 10)

        optimizer = torch.optim.Adam(model.parameters(), lr=0.03)
        criterion = nn.CrossEntropyLoss()

        return model, optimizer, criterion


    @step(name="model_training")
    def distribute_training(model, optimizer, criterion):
        """Train the model."""
        train_data_loader, valid_data_loader = _load_data()
        job = pytorch.distribute(model,
                                 train_data_loader,
                                 criterion,
                                 optimizer,
                                 _train_step,
                                 eval_data_loader=valid_data_loader,
                                 eval_step=_eval_step,
                                 epochs=1,
                                 number_of_processes=2,
                                 cuda=True,
                                 train_args={"log_interval": 2})

        return job.name


    @step(name="monitor")
    def monitor(name):
        """Monitor the PyTorchJob CR."""
        job = pytorch.PyTorchJob(name)
        # Block until the job starts running, then stream its logs.
        while job.get_job_status() != "Running":
            continue
        job.stream_logs()


    +@step(name="delete")
    +def delete(name):
    +    """Delete the PyTorchJob CR."""
    +    job = pytorch.PyTorchJob(name)
    +    # Block until the job completes, then remove the CR.
    +    while job.get_job_status() != "Succeeded":
    +        continue
    +    job.delete()
    +
    +
    @pipeline(name="fmnist", experiment="kale-dist")
    def ml_pipeline():
        """Run the ML pipeline."""
        model, optimizer, criterion = create_model()
        name = distribute_training(model, optimizer, criterion)
        monitor(name)
    +    delete(name)


    if __name__ == "__main__":
        ml_pipeline()

    Important

    After the training process completes, the controller does not remove the resources it creates. If you do not want to leave stale resources behind, you have to delete the CR manually. However, note that deleting the CR also deletes the PVC(s) attached to the Pods running the processes, so you will lose access to the resulting saved model. In future versions of Kale, we plan to make this behavior configurable and also log the resulting trained model to MLMD.
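
    If you prefer to clean up outside the pipeline, you can also delete the CR manually with kubectl (the job name and namespace below are placeholders for your own):

    $ kubectl delete pytorchjob <JOB_NAME> -n <NAMESPACE>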

  9. Run the script locally to verify that your code executes successfully using Kale's marshalling mechanism:

    $ python3 -m kale kale_dist.py
    
  10. (Optional) Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_dist.py --compile
    

    After this command executes successfully, look for the workflow YAML file inside the .kale directory, under your working directory. This is a file you can upload and submit to Kubeflow manually through its user interface (KFP UI).

  11. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_dist.py --kfp
    

    Note

    To see the complete list of arguments and their respective usage, run python3 -m kale --help.

Summary

You have successfully created a pipeline with a step that runs a PyTorch distributed process, using Kale and the Kubeflow PyTorch operator.

What's Next

Check out the rest of the Kale user guides.