Run Steps in Parallel¶
This guide will walk you through creating a diamond-shaped pipeline that executes two steps in parallel using the Kale SDK.
Overview
What You'll Need¶
- An EKF or MiniKF deployment with the default Kale Docker image.
- An understanding of how Kale SDK works.
Procedure¶
Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:
gcr.io/arrikto/jupyter-kale-py36:<IMAGE_TAG>
Note
The
<IMAGE_TAG>
varies based on the MiniKF or EKF release.Connect to the server, open a terminal, and install
scikit-learn
:$ pip3 install --user scikit-learn==0.23.0
Create a new python file and name it
kale_parallel.py
:$ touch kale_parallel.py
Copy and paste the following code inside
kale_parallel.py
:sdk.py1 # Copyright © 2021 Arrikto Inc. All Rights Reserved. 2 3 """Kale SDK. 4 5 This script trains an ML pipeline to solve a binary classification task. 6 """ 7 8 from kale.sdk import pipeline, step 9 from sklearn.datasets import make_classification 10 from sklearn.linear_model import LogisticRegression 11 from sklearn.model_selection import train_test_split 12 13 14 @step(name="data_loading") 15 def load(random_state): 16 """Create a random dataset for binary classification.""" 17 rs = int(random_state) 18 x, y = make_classification(random_state=rs) 19 return x, y 20 21 22 @step(name="data_split") 23 def split(x, y): 24 """Split the data into train and test sets.""" 25 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1) 26 return x, x_test, y, y_test 27 28 29 @step(name="model_training") 30 def train(x, x_test, y, training_iterations): 31 """Train a Logistic Regression model.""" 32 iters = int(training_iterations) 33 model = LogisticRegression(max_iter=iters) 34 model.fit(x, y) 35 print(model.predict(x_test)) 36 37 38 @pipeline(name="binary-classification", experiment="kale-tutorial") 39 def ml_pipeline(rs=42, iters=100): 40 """Run the ML pipeline.""" 41 x, y = load(rs) 42 x, x_test, y, y_test = split(x, y) 43 train(x, x_test, y, iters) 44 45 46 if __name__ == "__main__": 47 ml_pipeline(rs=42, iters=100) In this code sample, you start with a standard Python script that trains a Logistic Regression model. Moreover, you have decorated the functions using the Kale SDK. To read more about how to create this file, head to the corresponding Kale SDK user guide.
Add another function that trains a Random Forest ensemble and call it with the right arguments inside the
ml_pipeline
function. The following snippet summarizes the changes in code:parallel.py1 # Copyright © 2021 Arrikto Inc. All Rights Reserved. 2 3 """Kale SDK. 4-6 4 5 This script trains an ML pipeline to solve a binary classification task. 6 """ 7 8 from kale.sdk import pipeline, step 9 from sklearn.datasets import make_classification 10 + from sklearn.ensemble import RandomForestClassifier 11 from sklearn.linear_model import LogisticRegression 12 from sklearn.model_selection import train_test_split 13 14-26 14 15 @step(name="data_loading") 16 def load(random_state): 17 """Create a random dataset for binary classification.""" 18 rs = int(random_state) 19 x, y = make_classification(random_state=rs) 20 return x, y 21 22 23 @step(name="data_split") 24 def split(x, y): 25 """Split the data into train and test sets.""" 26 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1) 27 return x, x_test, y, y_test 28 29 30 - @step(name="model_training") 31 - def train(x, x_test, y, training_iterations): 32 + @step(name="random_forest") 33 + def train_random_forest(x, x_test, y, n_estimators): 34 + """Train a Random Forest ensemble model.""" 35 + n_estimators = int(n_estimators) 36 + model = RandomForestClassifier(n_estimators=n_estimators) 37 + model.fit(x, y) 38 + print(model.predict(x_test)) 39 + 40 + 41 + @step(name="logistic_regression") 42 + def train_logistic_regression(x, x_test, y, training_iterations): 43 """Train a Logistic Regression model.""" 44 iters = int(training_iterations) 45 model = LogisticRegression(max_iter=iters) 46-46 46 model.fit(x, y) 47 print(model.predict(x_test)) 48 49 50 - @pipeline(name="binary-classification", experiment="kale-tutorial") 51 - def ml_pipeline(rs=42, iters=100): 52 + @pipeline(name="parallel-steps", experiment="kale-tutorial") 53 + def ml_pipeline(rs=42, iters=42, n_estimators=100): 54 """Run the ML pipeline.""" 55 x, y = load(rs) 56 x, x_test, y, y_test = split(x, y) 57 - train(x, x_test, y, iters) 58 + train_logistic_regression(x, x_test, y, iters) 59 + train_random_forest(x, x_test, y, n_estimators) 60 61 62 if __name__ == "__main__": 63 - ml_pipeline(rs=42, iters=100) 64 + ml_pipeline(rs=42, iters=100, n_estimators=100) This Python script trains two models, a Logistic Regression model and a Random Forest ensemble on a binary classification task. Kale will detect that the two training functions have the same dependencies, and thus, it can execute them in parallel.
Run the script locally to test whether your code runs successfully using Kale's marshalling mechanism:
$ python3 -m kale kale_parallel.py
(Optional) Produce a workflow YAML file that you can inspect:
$ python3 -m kale kale_parallel.py --compile
After the successful execution of this command, look for the workflow YAML file inside a
.kale
directory inside your working directory. This is a file that you could upload and submit to Kubeflow manually through its User Interface (KFP UI).Deploy and run your code as a KFP pipeline:
$ python3 -m kale kale_parallel.py --kfp
Note
To see the complete list of arguments and their respective usage run
python3 -m kale --help
.
Summary¶
You have successfully created a diamond-shaped pipeline that executes two training steps in parallel.
What’s Next¶
The next step is to create pipelines with container-based steps.