Create a Pipeline¶
A Machine Learning (ML) project usually consists of multiple interconnected steps. These steps can be thought of as separate processes, with clearly defined inputs and outputs. Thus, it becomes natural to think of ML projects as workflows or pipelines. This guide will walk you through transforming a Python script into a Kubeflow Pipelines (KFP) pipeline and deploying it on Kubeflow using the Kale SDK.
What You'll Need¶
- An EKF or MiniKF deployment with the default Kale Docker image.
Procedure¶
Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

```
gcr.io/arrikto/jupyter-kale-py36:<IMAGE_TAG>
```

Note

The `<IMAGE_TAG>` varies based on the MiniKF or EKF release.

Connect to the server, open a terminal, and install `scikit-learn`:

```
$ pip3 install --user scikit-learn==0.23.0
```
Create a new Python file and name it `kale_sdk.py`:

```
$ touch kale_sdk.py
```

Copy and paste the following code inside `kale_sdk.py`, or download it (`starter.py`):

```python
# Copyright © 2021 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script trains an ML pipeline to solve a binary classification task.
"""

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split


def load(random_state):
    """Create a random dataset for binary classification."""
    rs = int(random_state)
    x, y = make_classification(random_state=rs)
    return x, y


def split(x, y):
    """Split the data into train and test sets."""
    x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    return x, x_test, y, y_test


def train(x, x_test, y, training_iterations):
    """Train a Logistic Regression model."""
    iters = int(training_iterations)
    model = LogisticRegression(max_iter=iters)
    model.fit(x, y)
    print(model.predict(x_test))


def ml_pipeline(rs=42, iters=100):
    """Run the ML pipeline."""
    x, y = load(rs)
    x, x_test, y, y_test = split(x, y)
    train(x, x_test, y, iters)


if __name__ == "__main__":
    ml_pipeline(rs=42, iters=100)
```

In this code sample, you define three functions that could form individual steps in an ML pipeline:
- The first function (`load`) creates a random dataset that you can use to simulate a binary classification task.
- The second function (`split`) processes the dataset by splitting it into training and test subsets.
- The third function (`train`) fits a Logistic Regression model and prints its predictions on the test subset.

An ML engineer would write similar functions, test them separately, and, in the end, group them to launch an ML experiment. To this end, you define a fourth function that calls each step and weaves everything together. This is the `ml_pipeline` function, the main entry point. Finally, you add the standard Python boilerplate that calls the `ml_pipeline` function when you execute the script.
Decorate every function that you want to be a step in the pipeline with the `step` decorator. The following snippet (`step.py`) summarizes the changes in the code:

```python
# Copyright © 2021 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script trains an ML pipeline to solve a binary classification task.
"""

from kale.sdk import step
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split


@step(name="data_loading")
def load(random_state):
    """Create a random dataset for binary classification."""
    rs = int(random_state)
    x, y = make_classification(random_state=rs)
    return x, y


@step(name="data_split")
def split(x, y):
    """Split the data into train and test sets."""
    x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    return x, x_test, y, y_test


@step(name="model_training")
def train(x, x_test, y, training_iterations):
    """Train a Logistic Regression model."""
    iters = int(training_iterations)
    model = LogisticRegression(max_iter=iters)
    model.fit(x, y)
    print(model.predict(x_test))


def ml_pipeline(rs=42, iters=100):
    """Run the ML pipeline."""
    x, y = load(rs)
    x, x_test, y, y_test = split(x, y)
    train(x, x_test, y, iters)


if __name__ == "__main__":
    ml_pipeline(rs=42, iters=100)
```

Decorate the main entry point function with the Kale `pipeline` decorator. The following snippet (`sdk.py`) summarizes the changes in the code:

```python
# Copyright © 2021 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script trains an ML pipeline to solve a binary classification task.
"""

from kale.sdk import pipeline, step
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split


@step(name="data_loading")
def load(random_state):
    """Create a random dataset for binary classification."""
    rs = int(random_state)
    x, y = make_classification(random_state=rs)
    return x, y


@step(name="data_split")
def split(x, y):
    """Split the data into train and test sets."""
    x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    return x, x_test, y, y_test


@step(name="model_training")
def train(x, x_test, y, training_iterations):
    """Train a Logistic Regression model."""
    iters = int(training_iterations)
    model = LogisticRegression(max_iter=iters)
    model.fit(x, y)
    print(model.predict(x_test))


@pipeline(name="binary-classification", experiment="kale-tutorial")
def ml_pipeline(rs=42, iters=100):
    """Run the ML pipeline."""
    x, y = load(rs)
    x, x_test, y, y_test = split(x, y)
    train(x, x_test, y, iters)


if __name__ == "__main__":
    ml_pipeline(rs=42, iters=100)
```

Kale can infer the pipeline structure by examining the order of the function calls inside the `pipeline`-decorated function. Thus, the step that runs the `load` function will run first, followed by the step that runs the `split` function. Finally, the step that trains the model will complete the KFP run.

Note
Kale does not support calling step functions using keyword arguments. At the moment, you can only use positional arguments. This is a temporary limitation that Arrikto will address in the near future.
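This limitation concerns how you invoke the step functions inside the pipeline function. A plain-Python illustration of the distinction, using a trivial stand-in for a step (not a real Kale step):

```python
# Illustration only: a trivial stand-in for a Kale step function.
def split(x, y, test_size):
    """Pretend step: return the inputs untouched."""
    return x, y, test_size


# Supported inside a @pipeline-decorated function: positional arguments.
result = split([1, 2, 3], [0, 1, 0], 0.1)

# NOT supported inside a @pipeline-decorated function: keyword arguments,
# for example:
#     result = split([1, 2, 3], [0, 1, 0], test_size=0.1)
```

In plain Python both forms are of course valid; the restriction applies only to step calls that Kale translates into pipeline tasks.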
Run the script locally to test whether your code runs successfully using Kale's marshalling mechanism:

```
$ python3 -m kale kale_sdk.py
```

(Optional) Produce a workflow YAML file that you can inspect:

```
$ python3 -m kale kale_sdk.py --compile
```

After this command executes successfully, look for the workflow YAML file inside a `.kale` directory in your working directory. This is a file that you could upload and submit to Kubeflow manually through its user interface (KFP UI).

Deploy and run your code as a KFP pipeline:

```
$ python3 -m kale kale_sdk.py --kfp
```
Note

To see the complete list of arguments and their respective usage, run `python3 -m kale --help`.
Note
Casting the variables into the correct data type inside the step functions is a temporary limitation. Arrikto will address it in future versions of the Kale SDK.
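This is why each step in the tutorial casts its parameters explicitly (for example, `rs = int(random_state)` in `load`). The sketch below illustrates the pattern, under the assumption (for illustration only) that a parameter may reach a step with a type other than the expected one, such as a string:

```python
# Sketch: why the tutorial's steps cast their parameters explicitly.
# Assumption for illustration: a pipeline parameter may arrive as a
# string rather than an int, so the step casts it defensively.

def to_iterations(training_iterations):
    """Mirror the cast performed in the tutorial's train() step."""
    return int(training_iterations)


# The cast makes the step robust to both forms:
assert to_iterations("100") == 100
assert to_iterations(100) == 100
```

Without the cast, passing `"100"` straight to `LogisticRegression(max_iter=...)` would fail, since scikit-learn expects an integer there.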
Summary¶
You have successfully transformed a simple Python script into an ML pipeline and deployed it on Kubeflow.
What’s Next¶
The next step is to understand how Kale uses volumes to marshal data.