Run steps in parallel

This guide walks you through using the Kale SDK to create a diamond-shaped pipeline that executes two steps in parallel.

What You’ll Need

  • An EKF or MiniKF deployment with the default Kale Docker image.
  • An understanding of how the Kale SDK works.

Procedure

  1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py36:<IMAGE_TAG>
    

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, and install scikit-learn:

    $ pip3 install --user scikit-learn==0.23.0
    
  3. Create a new Python file and name it kale_parallel.py:

    $ touch kale_parallel.py
    
  4. Copy and paste the following code inside kale_parallel.py:

    # Copyright © 2021 Arrikto Inc.  All Rights Reserved.
    
    """Kale SDK.
    
    This script trains an ML pipeline to solve a binary classification task.
    """
    
    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split
    
    
    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y
    
    
    @step(name="data_split")
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test
    
    
    @step(name="model_training")
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))
    
    
    @pipeline(name="binary-classification", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
        train(x, x_test, y, iters)
    
    
    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)
    

    Alternatively, download the kale_parallel_starter_code.py Python file.

    In this code sample, you start from a standard Python script that trains a Logistic Regression model, with its functions decorated using the Kale SDK. To read more about how to create this file, head to the corresponding Kale SDK user guide.

  5. Add another function that trains a Random Forest ensemble and call it with the right arguments inside the ml_pipeline function. The following snippet summarizes the changes in code:

    --- examples/sdk/sdk.py
    +++ examples/parallel.py
    @@ -7,6 +7,7 @@
     
     from kale.sdk import pipeline, step
     from sklearn.datasets import make_classification
    +from sklearn.ensemble import RandomForestClassifier
     from sklearn.linear_model import LogisticRegression
     from sklearn.model_selection import train_test_split
     
    @@ -26,8 +27,17 @@
         return x, x_test, y, y_test
     
     
    -@step(name="model_training")
    -def train(x, x_test, y, training_iterations):
    +@step(name="random_forest")
    +def train_random_forest(x, x_test, y, n_estimators):
    +    """Train a Random Forest ensemble model."""
    +    n_estimators = int(n_estimators)
    +    model = RandomForestClassifier(n_estimators=n_estimators)
    +    model.fit(x, y)
    +    print(model.predict(x_test))
    +
    +
    +@step(name="logistic_regression")
    +def train_logistic_regression(x, x_test, y, training_iterations):
         """Train a Logistic Regression model."""
         iters = int(training_iterations)
         model = LogisticRegression(max_iter=iters)
    @@ -35,13 +45,14 @@
         print(model.predict(x_test))
     
     
    -@pipeline(name="binary-classification", experiment="kale-tutorial")
    -def ml_pipeline(rs=42, iters=100):
    +@pipeline(name="parallel-steps", experiment="kale-tutorial")
    +def ml_pipeline(rs=42, iters=100, n_estimators=100):
         """Run the ML pipeline."""
         x, y = load(rs)
         x, x_test, y, y_test = split(x, y)
    -    train(x, x_test, y, iters)
    +    train_logistic_regression(x, x_test, y, iters)
    +    train_random_forest(x, x_test, y, n_estimators)
     
     
     if __name__ == "__main__":
    -    ml_pipeline(rs=42, iters=100)
    +    ml_pipeline(rs=42, iters=100, n_estimators=100)
    

    Copy the resulting code below or download the kale_parallel.py Python file.
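
    Applying the diff above to the starter script produces the following kale_parallel.py:

    # Copyright © 2021 Arrikto Inc.  All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split")
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="random_forest")
    def train_random_forest(x, x_test, y, n_estimators):
        """Train a Random Forest ensemble model."""
        n_estimators = int(n_estimators)
        model = RandomForestClassifier(n_estimators=n_estimators)
        model.fit(x, y)
        print(model.predict(x_test))


    @step(name="logistic_regression")
    def train_logistic_regression(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    @pipeline(name="parallel-steps", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100, n_estimators=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
        train_logistic_regression(x, x_test, y, iters)
        train_random_forest(x, x_test, y, n_estimators)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100, n_estimators=100)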

    This Python script trains two models, a Logistic Regression model and a Random Forest ensemble, on a binary classification task. Kale detects that the two training steps share the same dependency, the output of the data_split step, and can therefore execute them in parallel, giving the pipeline its diamond shape.

  6. Run the script locally, using Kale’s marshalling mechanism, to test whether your code runs successfully:

    $ python3 -m kale kale_parallel.py
    
  7. (Optional) Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_parallel.py --compile
    

    After this command completes successfully, you will find the workflow YAML file in a .kale directory within your working directory. You can upload and submit this file to Kubeflow manually through the Kubeflow Pipelines UI (KFP UI).
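
    If you want to verify the diamond shape programmatically, the short sketch below prints each task and its dependencies from the compiled file. This is a minimal sketch, not part of the Kale SDK: it assumes the compiled output follows the Argo Workflows schema that KFP uses, that PyYAML is installed, and the glob pattern is illustrative, since the exact file name under .kale varies:

    import glob

    import yaml  # PyYAML, e.g.: pip3 install --user pyyaml

    # The exact file name under .kale varies, so this pattern is illustrative.
    path = sorted(glob.glob(".kale/*.yaml"))[-1]

    with open(path) as f:
        workflow = yaml.safe_load(f)

    # KFP compiles pipelines to Argo Workflows: the DAG lives in one of the
    # templates, and tasks that share the same dependencies run in parallel.
    for template in workflow["spec"]["templates"]:
        for task in template.get("dag", {}).get("tasks", []):
            print(task["name"], "depends on", task.get("dependencies", []))

    Both training tasks should list only the data split task as a dependency, which is what allows KFP to schedule them concurrently.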

  8. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_parallel.py --kfp
    

    Note

    To see the complete list of arguments and their respective usage, run python3 -m kale --help.

Summary

You have successfully created a diamond-shaped pipeline that executes two training steps in parallel.

What’s Next

The next step is to parameterize your pipeline.