Create a Pipeline

A Machine Learning (ML) project usually consists of multiple interconnected steps. These steps can be thought of as separate processes, with clearly defined inputs and outputs. Thus, it becomes natural to think of ML projects as workflows or pipelines. This guide will walk you through transforming a Python script into a Kubeflow Pipeline (KFP), and deploy it on Kubeflow using the Kale SDK.

What You’ll Need

  • An Arrikto EKF or MiniKF deployment with the default Kale Docker image.

Procedure

  1. Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, and install scikit-learn:

    $ pip3 install --user scikit-learn==0.23.0
  3. Create a new python file and name it kale_sdk.py:

    $ touch kale_sdk.py
  4. Copy and paste the following code inside kale_sdk.py, or download it:

    starter.py
    1# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
    2
    3"""Kale SDK.
    4
    5This script trains an ML pipeline to solve a binary classification task.
    6"""
    7
    8from sklearn.datasets import make_classification
    9from sklearn.linear_model import LogisticRegression
    10from sklearn.model_selection import train_test_split
    11
    12
    13def load(random_state):
    14 """Create a random dataset for binary classification."""
    15 rs = int(random_state)
    16 x, y = make_classification(random_state=rs)
    17 return x, y
    18
    19
    20def split(x, y):
    21 """Split the data into train and test sets."""
    22 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    23 return x, x_test, y, y_test
    24
    25
    26def train(x, x_test, y, training_iterations):
    27 """Train a Logistic Regression model."""
    28 iters = int(training_iterations)
    29 model = LogisticRegression(max_iter=iters)
    30 model.fit(x, y)
    31 print(model.predict(x_test))
    32
    33
    34def ml_pipeline(rs=42, iters=100):
    35 """Run the ML pipeline."""
    36 x, y = load(rs)
    37 x, x_test, y, y_test = split(x=x, y=y)
    38 train(x, x_test, y, training_iterations=iters)
    39
    40
    41if __name__ == "__main__":
    42 ml_pipeline(rs=42, iters=100)

    In this code sample, you define three functions that could form individual steps in an ML pipeline:

    • The first function (load) creates a random dataset that we can use to simulate a binary classification task.
    • The second function (split) processes the dataset by splitting it into training and test subsets.
    • The third function (train) fits a Logistic Regression model and prints its performance on the test subset.

    An ML engineer would write similar functions, test them separately, and in the end, group them to launch an ML experiment. To this end, you can define a fourth function that calls each step and weaves everything together. This is the ml_pipeline function, the main entry point.

    Finally, we add the standard python boilerplate code to parse user arguments and call the ml_pipeline function with those arguments.

  5. Decorate every function that you want to be a step in the pipeline with the step decorator. The following snippet summarizes the changes in the code:

    step.py
    1# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
    2
    3"""Kale SDK.
    4-4
    4
    5This script trains an ML pipeline to solve a binary classification task.
    6"""
    7
    8+from kale.sdk import step
    9from sklearn.datasets import make_classification
    10from sklearn.linear_model import LogisticRegression
    11from sklearn.model_selection import train_test_split
    12
    13
    14+@step(name="data_loading")
    15def load(random_state):
    16 """Create a random dataset for binary classification."""
    17 rs = int(random_state)
    18-18
    18 x, y = make_classification(random_state=rs)
    19 return x, y
    20
    21
    22+@step(name="data_split")
    23def split(x, y):
    24 """Split the data into train and test sets."""
    25 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    26 return x, x_test, y, y_test
    27
    28
    29+@step(name="model_training")
    30def train(x, x_test, y, training_iterations):
    31 """Train a Logistic Regression model."""
    32 iters = int(training_iterations)
    33-43
    33 model = LogisticRegression(max_iter=iters)
    34 model.fit(x, y)
    35 print(model.predict(x_test))
    36
    37
    38def ml_pipeline(rs=42, iters=100):
    39 """Run the ML pipeline."""
    40 x, y = load(rs)
    41 x, x_test, y, y_test = split(x=x, y=y)
    42 train(x, x_test, y, training_iterations=iters)
    43
    44
    45if __name__ == "__main__":
    46 ml_pipeline(rs=42, iters=100)
  6. Decorate the main entry point function with the Kale pipeline decorator. The following snippet summarizes the changes in code:

    sdk.py
    1# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
    2
    3"""Kale SDK.
    4-4
    4
    5This script trains an ML pipeline to solve a binary classification task.
    6"""
    7
    8-from kale.sdk import step
    9+from kale.sdk import pipeline, step
    10from sklearn.datasets import make_classification
    11from sklearn.linear_model import LogisticRegression
    12from sklearn.model_selection import train_test_split
    13-35
    13
    14
    15@step(name="data_loading")
    16def load(random_state):
    17 """Create a random dataset for binary classification."""
    18 rs = int(random_state)
    19 x, y = make_classification(random_state=rs)
    20 return x, y
    21
    22
    23@step(name="data_split")
    24def split(x, y):
    25 """Split the data into train and test sets."""
    26 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    27 return x, x_test, y, y_test
    28
    29
    30@step(name="model_training")
    31def train(x, x_test, y, training_iterations):
    32 """Train a Logistic Regression model."""
    33 iters = int(training_iterations)
    34 model = LogisticRegression(max_iter=iters)
    35 model.fit(x, y)
    36 print(model.predict(x_test))
    37
    38
    39+@pipeline(name="binary-classification", experiment="kale-tutorial")
    40def ml_pipeline(rs=42, iters=100):
    41 """Run the ML pipeline."""
    42 x, y = load(rs)
    43-45
    43 x, x_test, y, y_test = split(x=x, y=y)
    44 train(x, x_test, y, training_iterations=iters)
    45
    46
    47if __name__ == "__main__":
    48 ml_pipeline(rs=42, iters=100)

    Kale can infer the pipeline structure by examining the order of the function calls inside the pipeline decorated function. Thus, the step that runs the load function will run first, followed by the step that runs the split function. Finally, the step that trains the model will complete the KFP run.

    Important

    Kale now supports type hints in step definitions and pipeline parameters:

    In the future, the above pipeline will not work. Kale will consider steps that don’t declare types for their outputs to have no outputs.

  7. Run the script locally to test whether your code runs successfully using Kale’s marshalling mechanism:

    $ python3 -m kale kale_sdk.py
  8. Optional

    Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_sdk.py --compile

    After the successful execution of this command, look for the workflow YAML file inside a .kale directory inside your working directory. This is a file that you could upload and submit to Kubeflow manually through its User Interface (KFP UI).

  9. Optional

    Upload a KFP pipeline definition that you can inspect from the KFP UI:

    $ python3 -m kale kale_sdk.py --upload

    This command compiles your pipeline (includes --compile action) and uploads it to your private namespace that only you (and the ones who have access to this namespace) can see. If you want to share your pipeline among all namespaces and make it visible to everyone in the cluster use the below command:

    $ python3 -m kale kale_sdk.py --upload --shared-pipeline
  10. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_sdk.py --kfp

    Note

    This command includes both --compile and --upload actions. That is, Kale first compiles and uploads your pipeline to KFP and then runs it. The KFP pipeline will always run in your private namespace. This is also where Kale will upload your pipeline definition by default before running it. If you want to share your pipeline definition run: python3 -m kale kale_sdk.py --kfp --shared-pipeline

    This command will upload a shared pipeline definition and then run your pipeline in your private namespace.

    Note

    To see the complete list of arguments and their respective usage, run python3 -m kale --help.

Important

The steps in the pipeline above do not declare the types of their inputs. In the future, such pipelines may stop working. To see how you can declare the types of step and pipeline parameters, head over to:

Summary

You have successfully transformed a simple Python script into an ML pipeline and deployed it on Kubeflow.

What’s Next

The next step is to understand how Kale uses volumes to marshal data