Run Steps in Parallel

This guide will walk you through creating a diamond-shaped pipeline that executes two steps in parallel using the Kale SDK.

What You'll Need

  • An EKF or MiniKF deployment with the default Kale Docker image.
  • An understanding of how the Kale SDK works.

Procedure

  1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py36:<IMAGE_TAG>
    

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, and install scikit-learn:

    $ pip3 install --user scikit-learn==0.23.0
    
  3. Create a new Python file and name it kale_parallel.py:

    $ touch kale_parallel.py
    
  4. Copy and paste the following code inside kale_parallel.py:

    sdk.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split")
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="model_training")
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    @pipeline(name="binary-classification", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
        train(x, x_test, y, iters)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)

    In this code sample, you start with a standard Python script that trains a Logistic Regression model, and you decorate its functions using the Kale SDK. To read more about how to create this file, head to the corresponding Kale SDK user guide.
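
    If you want to see the pattern in isolation, the following minimal sketch (illustrative only, not part of this guide's pipeline; the names produce, consume, and minimal-example are made up for the example) shows the essence of what the decorators do: plain Python functions become pipeline steps, and the @pipeline function defines the data flow between them through ordinary calls and return values:

    # Illustrative sketch only: a two-step pipeline using the same
    # @step/@pipeline pattern as sdk.py. The step and pipeline names
    # here are hypothetical.
    from kale.sdk import pipeline, step


    @step(name="produce")
    def produce():
        """Return some data for the next step to consume."""
        return "some-data"


    @step(name="consume")
    def consume(data):
        """Print the data received from the previous step."""
        print(data)


    @pipeline(name="minimal-example", experiment="kale-tutorial")
    def minimal_pipeline():
        """Wire the steps together: consume depends on produce's output."""
        data = produce()
        consume(data)


    if __name__ == "__main__":
        minimal_pipeline()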

  5. Add another function that trains a Random Forest ensemble, and call it with the right arguments inside the ml_pipeline function. The following snippet summarizes the code changes:

    parallel.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    +from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split")
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    -@step(name="model_training")
    -def train(x, x_test, y, training_iterations):
    +@step(name="random_forest")
    +def train_random_forest(x, x_test, y, n_estimators):
    +    """Train a Random Forest ensemble model."""
    +    n_estimators = int(n_estimators)
    +    model = RandomForestClassifier(n_estimators=n_estimators)
    +    model.fit(x, y)
    +    print(model.predict(x_test))
    +
    +
    +@step(name="logistic_regression")
    +def train_logistic_regression(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    -@pipeline(name="binary-classification", experiment="kale-tutorial")
    -def ml_pipeline(rs=42, iters=100):
    +@pipeline(name="parallel-steps", experiment="kale-tutorial")
    +def ml_pipeline(rs=42, iters=100, n_estimators=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
    -    train(x, x_test, y, iters)
    +    train_logistic_regression(x, x_test, y, iters)
    +    train_random_forest(x, x_test, y, n_estimators)


    if __name__ == "__main__":
    -    ml_pipeline(rs=42, iters=100)
    +    ml_pipeline(rs=42, iters=100, n_estimators=100)

    This Python script trains two models, a Logistic Regression model and a Random Forest ensemble, on a binary classification task. Kale detects that the two training steps depend only on the output of the data_split step, and not on each other, so it can execute them in parallel. The result is the diamond-shaped pipeline this guide set out to build; the complete file appears below for reference.
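
    For reference, here is the complete kale_parallel.py after applying the changes, assembled directly from the diff above:

    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split")
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="random_forest")
    def train_random_forest(x, x_test, y, n_estimators):
        """Train a Random Forest ensemble model."""
        n_estimators = int(n_estimators)
        model = RandomForestClassifier(n_estimators=n_estimators)
        model.fit(x, y)
        print(model.predict(x_test))


    @step(name="logistic_regression")
    def train_logistic_regression(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    @pipeline(name="parallel-steps", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100, n_estimators=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
        train_logistic_regression(x, x_test, y, iters)
        train_random_forest(x, x_test, y, n_estimators)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100, n_estimators=100)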

  6. Run the script locally, using Kale's marshalling mechanism, to test whether your code runs successfully:

    $ python3 -m kale kale_parallel.py
    
  7. (Optional) Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_parallel.py --compile
    

    After the command completes successfully, look for the workflow YAML file inside the .kale directory in your working directory. This is a file you could upload and submit to Kubeflow manually through the Kubeflow Pipelines UI (KFP UI).
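
    If you want to verify the parallel structure programmatically, the following sketch prints each task in the workflow's DAG together with its dependencies. It is not part of the Kale CLI, and it assumes the compiled file has a .yaml extension and follows the usual Argo Workflows schema that KFP pipelines compile to:

    # Illustrative sketch, assuming the compiled workflow follows the Argo
    # Workflows schema (spec.templates containing a DAG template) and that
    # the file inside .kale/ has a .yaml extension.
    import glob

    import yaml  # pip3 install --user pyyaml

    # Pick the most recently compiled workflow file in .kale/.
    path = sorted(glob.glob(".kale/*.yaml"))[-1]
    with open(path) as f:
        workflow = yaml.safe_load(f)

    # Walk every DAG template and print each task with its dependencies.
    for template in workflow["spec"]["templates"]:
        for task in template.get("dag", {}).get("tasks", []):
            print(task["name"], "depends on:", task.get("dependencies", []))

    If Kale detected the diamond shape, both training tasks should list only the data split step as a dependency, which is what allows them to run in parallel.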

  8. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_parallel.py --kfp
    

    Note

    To see the complete list of arguments and their respective usage, run python3 -m kale --help.

Summary

You have successfully created a diamond-shaped pipeline that executes two training steps in parallel.

What’s Next

The next step is to create pipelines with container-based steps.