Create a Pipeline¶
A Machine Learning (ML) project usually consists of multiple interconnected steps. These steps can be thought of as separate processes, with clearly defined inputs and outputs. Thus, it becomes natural to think of ML projects as workflows or pipelines. This guide will walk you through transforming a Python script into a Kubeflow Pipeline (KFP) and deploying it on Kubeflow using the Kale SDK.
What You’ll Need¶
- An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
Procedure¶
1. Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:

   gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

   Note
   The <IMAGE_TAG> varies based on the MiniKF or EKF release.

2. Connect to the server, open a terminal, and install scikit-learn:

   $ pip3 install --user scikit-learn==0.23.0

3. Create a new Python file and name it kale_sdk.py:

   $ touch kale_sdk.py

4. Copy and paste the following code inside kale_sdk.py:

   starter.py

   # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

   """Kale SDK.

   This script trains an ML pipeline to solve a binary classification task.
   """

   from sklearn.datasets import make_classification
   from sklearn.linear_model import LogisticRegression
   from sklearn.model_selection import train_test_split


   def load(random_state):
       """Create a random dataset for binary classification."""
       rs = int(random_state)
       x, y = make_classification(random_state=rs)
       return x, y


   def split(x, y):
       """Split the data into train and test sets."""
       x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
       return x, x_test, y, y_test


   def train(x, x_test, y, training_iterations):
       """Train a Logistic Regression model."""
       iters = int(training_iterations)
       model = LogisticRegression(max_iter=iters)
       model.fit(x, y)
       print(model.predict(x_test))


   def ml_pipeline(rs=42, iters=100):
       """Run the ML pipeline."""
       x, y = load(rs)
       x, x_test, y, y_test = split(x=x, y=y)
       train(x, x_test, y, training_iterations=iters)


   if __name__ == "__main__":
       ml_pipeline(rs=42, iters=100)

   In this code sample, you define three functions that could form individual steps in an ML pipeline:
   - The first function (load) creates a random dataset that we can use to simulate a binary classification task.
   - The second function (split) processes the dataset by splitting it into training and test subsets.
   - The third function (train) fits a Logistic Regression model and prints its performance on the test subset.
   An ML engineer would write similar functions, test them separately, and, in the end, group them to launch an ML experiment. To this end, you can define a fourth function that calls each step and weaves everything together. This is the ml_pipeline function, the main entry point. Finally, you add the standard Python boilerplate that invokes the ml_pipeline function when you run the file directly; if you want to parse user arguments and pass them along, see the sketch below.
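   As one way to wire up argument parsing, here is a minimal sketch of an alternative __main__ block using Python's standard argparse module. The flag names --rs and --iters are illustrative, not part of the Kale SDK:

   # Hypothetical variant of the __main__ block: parse user arguments
   # with argparse (standard library) and forward them to ml_pipeline.
   # The flag names --rs and --iters are illustrative choices.
   import argparse

   if __name__ == "__main__":
       parser = argparse.ArgumentParser(description="Run the ML pipeline.")
       parser.add_argument("--rs", type=int, default=42,
                           help="Random state for the generated dataset.")
       parser.add_argument("--iters", type=int, default=100,
                           help="Max iterations for Logistic Regression.")
       args = parser.parse_args()
       ml_pipeline(rs=args.rs, iters=args.iters)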
5. Decorate every function that you want to be a step in the pipeline with the step decorator. The following snippet summarizes the changes in the code:

   step.py

     # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script trains an ML pipeline to solve a binary classification task.
     """

   + from kale.sdk import step
     from sklearn.datasets import make_classification
     from sklearn.linear_model import LogisticRegression
     from sklearn.model_selection import train_test_split


   + @step(name="data_loading")
     def load(random_state):
         """Create a random dataset for binary classification."""
         rs = int(random_state)
         x, y = make_classification(random_state=rs)
         return x, y


   + @step(name="data_split")
     def split(x, y):
         """Split the data into train and test sets."""
         x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
         return x, x_test, y, y_test


   + @step(name="model_training")
     def train(x, x_test, y, training_iterations):
         """Train a Logistic Regression model."""
         iters = int(training_iterations)
         model = LogisticRegression(max_iter=iters)
         model.fit(x, y)
         print(model.predict(x_test))


     def ml_pipeline(rs=42, iters=100):
         """Run the ML pipeline."""
         x, y = load(rs)
         x, x_test, y, y_test = split(x=x, y=y)
         train(x, x_test, y, training_iterations=iters)


     if __name__ == "__main__":
         ml_pipeline(rs=42, iters=100)

6. Decorate the main entry point function with the Kale pipeline decorator. The following snippet summarizes the changes in the code:

   sdk.py

     # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script trains an ML pipeline to solve a binary classification task.
     """

   - from kale.sdk import step
   + from kale.sdk import pipeline, step
     from sklearn.datasets import make_classification
     from sklearn.linear_model import LogisticRegression
     from sklearn.model_selection import train_test_split


     @step(name="data_loading")
     def load(random_state):
         """Create a random dataset for binary classification."""
         rs = int(random_state)
         x, y = make_classification(random_state=rs)
         return x, y


     @step(name="data_split")
     def split(x, y):
         """Split the data into train and test sets."""
         x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
         return x, x_test, y, y_test


     @step(name="model_training")
     def train(x, x_test, y, training_iterations):
         """Train a Logistic Regression model."""
         iters = int(training_iterations)
         model = LogisticRegression(max_iter=iters)
         model.fit(x, y)
         print(model.predict(x_test))


   + @pipeline(name="binary-classification", experiment="kale-tutorial")
     def ml_pipeline(rs=42, iters=100):
         """Run the ML pipeline."""
         x, y = load(rs)
         x, x_test, y, y_test = split(x=x, y=y)
         train(x, x_test, y, training_iterations=iters)


     if __name__ == "__main__":
         ml_pipeline(rs=42, iters=100)

   Kale can infer the pipeline structure by examining the order of the function calls inside the pipeline-decorated function. Thus, the step that runs the load function will run first, followed by the step that runs the split function. Finally, the step that trains the model will complete the KFP run.
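   For example, if you wanted a feature-scaling stage between data_split and model_training, you would call it between split and train inside ml_pipeline, and Kale would place it accordingly in the resulting graph. Here is a minimal sketch, assuming the load, split, and train steps defined above; the data_preprocessing step and its scaling logic are illustrative, not part of this guide's pipeline:

   # Hypothetical extra step: standardize features before training.
   # Any plain Python function decorated with @step can slot into
   # the call sequence of the pipeline function.
   from kale.sdk import pipeline, step
   from sklearn.preprocessing import StandardScaler


   @step(name="data_preprocessing")
   def preprocess(x, x_test):
       """Fit a scaler on the train set and scale both subsets."""
       scaler = StandardScaler().fit(x)
       return scaler.transform(x), scaler.transform(x_test)


   @pipeline(name="binary-classification", experiment="kale-tutorial")
   def ml_pipeline(rs=42, iters=100):
       """Run the ML pipeline with an extra preprocessing step."""
       x, y = load(rs)
       x, x_test, y, y_test = split(x=x, y=y)
       x, x_test = preprocess(x, x_test)  # runs between split and train
       train(x, x_test, y, training_iterations=iters)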
   Important
   Kale now supports type hints in step definitions and pipeline parameters. In the future, the above pipeline will not work as is: Kale will consider steps that don't declare types for their outputs to have no outputs.
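   As a rough illustration of what a typed step could look like, here is the load step with annotations. This sketch assumes Kale accepts standard Python typing annotations; consult the guides on declaring step and pipeline parameter types for the exact supported types:

   # Illustrative only: the load step with typed inputs and outputs,
   # assuming standard Python typing annotations are accepted.
   from typing import Tuple

   import numpy as np
   from kale.sdk import step
   from sklearn.datasets import make_classification


   @step(name="data_loading")
   def load(random_state: int) -> Tuple[np.ndarray, np.ndarray]:
       """Create a random dataset for binary classification."""
       x, y = make_classification(random_state=random_state)
       return x, y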
7. Run the script locally to test whether your code runs successfully using Kale's marshalling mechanism:

   $ python3 -m kale kale_sdk.py
   Optional
   Produce a workflow YAML file that you can inspect:

   $ python3 -m kale kale_sdk.py --compile

   After the successful execution of this command, look for the workflow YAML file inside a .kale directory in your working directory. This is a file that you could upload and submit to Kubeflow manually through its user interface (KFP UI).
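   To locate the generated file, you can simply list that directory (the exact file name may vary across Kale versions):

   $ ls .kale/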
   Optional
   Upload a KFP pipeline definition that you can inspect from the KFP UI:

   $ python3 -m kale kale_sdk.py --upload

   This command compiles your pipeline (it includes the --compile action) and uploads it to your private namespace, where only you (and anyone else with access to that namespace) can see it. If you want to share your pipeline among all namespaces and make it visible to everyone in the cluster, use the following command:

   $ python3 -m kale kale_sdk.py --upload --shared-pipeline

8. Deploy and run your code as a KFP pipeline:

   $ python3 -m kale kale_sdk.py --kfp

   Note
   This command includes both the --compile and --upload actions. That is, Kale first compiles and uploads your pipeline to KFP, and then runs it. The KFP pipeline will always run in your private namespace. This is also where Kale will upload your pipeline definition by default before running it. If you want to share your pipeline definition, run:

   $ python3 -m kale kale_sdk.py --kfp --shared-pipeline

   This command will upload a shared pipeline definition and then run your pipeline in your private namespace.
Note
To see the complete list of arguments and their respective usage, run python3 -m kale --help.
Important
The steps in the pipeline above do not declare the types of their inputs. In the future, such pipelines may stop working. To see how you can declare the types of step and pipeline parameters, head over to the guides on declaring step and pipeline parameter types.
Summary¶
You have successfully transformed a simple Python script into an ML pipeline and deployed it on Kubeflow.
What’s Next¶
The next step is to understand how Kale uses volumes to marshal data between pipeline steps.