Set Kubernetes Configurations
This guide will walk you through setting Kubernetes metadata and spec details for the steps of your pipeline.
What You’ll Need
- An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
- An understanding of how the Kale SDK works.
Procedure
Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

Note
The <IMAGE_TAG> varies based on the MiniKF or EKF release.

Connect to the server, open a terminal, and install scikit-learn:

    $ pip3 install --user scikit-learn==0.23.0

Create a new Python file and name it kale_k8sconfig.py:

    $ touch kale_k8sconfig.py

Copy and paste the following code inside kale_k8sconfig.py:

sdk.py

    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split")
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="model_training")
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    @pipeline(name="binary-classification", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x=x, y=y)
        train(x, x_test, y, training_iterations=iters)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)

This code sample starts from a standard Python script that trains a Logistic Regression model, with its functions decorated using the Kale SDK. To read more about how to create this file, head to the corresponding Kale SDK user guide.
Extend the data_split step declaration and use the deploy_config argument to enforce limits on it. Kale will instruct Kubernetes to apply these limits on the Pod that runs this step. The following snippet summarizes the changes in code (unchanged regions are collapsed as [...]):

limits.py

    - # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
    + # Copyright © 2021 Arrikto Inc. All Rights Reserved.
    [...]
    - @step(name="data_split")
    + @step(name="data_split",
    +       deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}})
      def split(x, y):
          """Split the data into train and test sets."""
    [...]
      def ml_pipeline(rs=42, iters=100):
          """Run the ML pipeline."""
          x, y = load(rs)
    -     x, x_test, y, y_test = split(x=x, y=y)
    -     train(x, x_test, y, training_iterations=iters)
    +     x, x_test, y, y_test = split(x, y)
    +     train(x, x_test, y, iters)

Note
In the deploy_config argument you can pass either a DeployConfig object or a dictionary from which Kale constructs a DeployConfig object.

Note
Check out the DeployConfig API to find all the available configurations.
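For readers unfamiliar with the Kubernetes quantity notation used in the limits above, the following minimal sketch converts the two values from this guide ("100m" of CPU and "1Gi" of memory) into plain numbers. The helpers parse_cpu and parse_memory are hypothetical names introduced here for illustration; they are not part of Kale or of any Kubernetes client library, and they cover only the most common suffixes.

```python
from decimal import Decimal

# Binary (power-of-two) and decimal suffixes used by Kubernetes quantities.
# Only a common subset is handled in this sketch.
_BINARY = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
_DECIMAL = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}


def parse_cpu(quantity: str) -> Decimal:
    """Return the number of CPU cores described by a quantity such as "100m"."""
    if quantity.endswith("m"):
        # The "m" suffix means millicores: 1000m equals one core.
        return Decimal(quantity[:-1]) / 1000
    return Decimal(quantity)


def parse_memory(quantity: str) -> int:
    """Return the number of bytes described by a quantity such as "1Gi"."""
    # Binary suffixes are checked first so that "Gi" is not mistaken for "G".
    for suffix, factor in {**_BINARY, **_DECIMAL}.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * factor)
    return int(quantity)


limits = {"cpu": "100m", "memory": "1Gi"}
print(parse_cpu(limits["cpu"]))        # one tenth of a core
print(parse_memory(limits["memory"]))  # 2**30 bytes
```

In other words, the data_split step in this guide is limited to a tenth of one CPU core and one gibibyte of memory.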
Extend the model_training step declaration and use the deploy_config argument to set labels on it. Kale will instruct Kubernetes to apply these labels on the Pod that runs this step. The following snippet summarizes the changes in code (unchanged regions are collapsed as [...]):

labels.py

    [...]
    - @step(name="model_training")
    + @step(name="model_training",
    +       deploy_config={"labels": {"training-step": "true"}})
      def train(x, x_test, y, training_iterations):
          """Train a Logistic Regression model."""
          iters = int(training_iterations)
    [...]

You can also specify Kubernetes configurations to be applied on all steps using the deploy_config argument in the pipeline decorator. Extend the pipeline declaration to set the MY_POD_IP environment variable on all steps; each step receives the IP of the Pod it runs in. The following snippet summarizes the changes in code (unchanged regions are collapsed as [...]):

env.py

    [...]
    - @pipeline(name="binary-classification", experiment="kale-tutorial")
    + pod_ip_env = {"name": "MY_POD_IP",
    +               "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}}
    +
    +
    + @pipeline(name="binary-classification", experiment="kale-tutorial",
    +           deploy_config={"env": [pod_ip_env]})
      def ml_pipeline(rs=42, iters=100):
          """Run the ML pipeline."""
          x, y = load(rs)
    [...]

Note
As with the step decorator, in the deploy_config argument you can pass either a DeployConfig object or a dictionary from which Kale constructs a DeployConfig object.

Note
Kale tries to merge the pipeline-level DeployConfig with that of each step, giving precedence to the individual step configuration. Kale merges the dictionary attributes, such as limits, requests, and annotations. For other attributes, Kale uses the pipeline-level configuration if the corresponding step-level one evaluates as empty.

Create a function that prints the contents of the environment variable MY_POD_IP. The step runs in parallel with the other steps and shows that the environment variable gets set. The following snippet summarizes the changes in code (unchanged regions are collapsed as [...]):

printenv.py

    [...]
    + @step(name="print_ip")
    + def show_ip():
    +     """Print the IP provided by 'MY_POD_IP' env var."""
    +     import os
    +     print("My IP is: %s" % os.getenv("MY_POD_IP", "'MY_POD_IP' IS NOT SET"))
    +
    +
      pod_ip_env = {"name": "MY_POD_IP",
                    "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}}
    [...]
          x, y = load(rs)
          x, x_test, y, y_test = split(x, y)
          train(x, x_test, y, iters)
    +     show_ip()
    [...]

Run the script locally to test whether your code runs successfully using Kale’s marshalling mechanism:

    $ python3 -m kale kale_k8sconfig.py

Note
These configurations only affect the Kubernetes resources that are created during the pipeline execution, so in a local run we expect the print_ip step to display 'MY_POD_IP' IS NOT SET:

    ... [INFO] Running step 'print_ip'...
    ... [INFO] ---------- Running step 'print_ip'... ----------
    ... [INFO] Starting timeout. User code TTL set to 0.0 seconds.
    My IP is: 'MY_POD_IP' IS NOT SET
    ... [INFO] User code executed successfully.
    ... [INFO] ---------- Successfully ran step 'print_ip' ----------
    ... [INFO] This step has no artifacts

(Optional) Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_k8sconfig.py --compile

After the successful execution of this command, look for the workflow YAML file inside a .kale directory inside your working directory. This is a file that you could upload and submit to Kubeflow manually through its User Interface (KFP UI).

Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_k8sconfig.py --kfp

Note
Inspecting the logs of the print_ip step, you should notice a line similar to the following:

    My IP is: 172.17.0.85

Note
To see the complete list of arguments and their respective usage, run python3 -m kale --help.
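The merge rule described in the note on pipeline-level and step-level DeployConfig can be pictured with plain dictionaries. The sketch below is only an illustration of that rule as stated in this guide, not Kale's actual implementation; the function merge_deploy_config and the pipeline-level limits values are hypothetical.

```python
def merge_deploy_config(pipeline_cfg: dict, step_cfg: dict) -> dict:
    """Hypothetical helper: combine a pipeline-level config with a step-level one."""
    merged = {}
    for key in pipeline_cfg.keys() | step_cfg.keys():
        pipeline_value = pipeline_cfg.get(key)
        step_value = step_cfg.get(key)
        if isinstance(pipeline_value, dict) and isinstance(step_value, dict):
            # Dictionary attributes are merged key by key; the step wins on conflicts.
            merged[key] = {**pipeline_value, **step_value}
        elif step_value:
            # A non-empty step-level value takes precedence.
            merged[key] = step_value
        else:
            # The step-level value is empty, so fall back to the pipeline level.
            merged[key] = pipeline_value
    return merged


pod_ip_env = {"name": "MY_POD_IP",
              "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}}

# Assumed pipeline-level values, chosen only to show a conflict on "cpu".
pipeline_cfg = {"env": [pod_ip_env], "limits": {"cpu": "500m", "memory": "2Gi"}}
step_cfg = {"limits": {"cpu": "100m"}, "labels": {"training-step": "true"}}

merged = merge_deploy_config(pipeline_cfg, step_cfg)
print(merged["limits"])  # cpu comes from the step, memory from the pipeline
print(merged["env"])     # inherited from the pipeline
print(merged["labels"])  # defined only on the step
```

Under these assumptions, a step that sets only a CPU limit would still inherit the pipeline-level memory limit and environment variables.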