Set Kubernetes Configurations¶
This guide will walk you through setting Kubernetes metadata and spec details for the steps of your pipeline.
Overview
What You’ll Need¶
- An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
- An understanding of how the Kale SDK works.
Procedure¶
Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:
gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>Note
The
<IMAGE_TAG>varies based on the MiniKF or EKF release.Connect to the server, open a terminal, and install
scikit-learn:$ pip3 install --user scikit-learn==0.23.0Create a new python file and name it
kale_k8sconfig.py:$ touch kale_k8sconfig.pyCopy and paste the following code inside
kale_k8sconfig.py:sdk.py1 # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved. 2 3 """Kale SDK. 4 5 This script trains an ML pipeline to solve a binary classification task. 6 """ 7 8 from kale.sdk import pipeline, step 9 from sklearn.datasets import make_classification 10 from sklearn.linear_model import LogisticRegression 11 from sklearn.model_selection import train_test_split 12 13 14 @step(name="data_loading") 15 def load(random_state): 16 """Create a random dataset for binary classification.""" 17 rs = int(random_state) 18 x, y = make_classification(random_state=rs) 19 return x, y 20 21 22 @step(name="data_split") 23 def split(x, y): 24 """Split the data into train and test sets.""" 25 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1) 26 return x, x_test, y, y_test 27 28 29 @step(name="model_training") 30 def train(x, x_test, y, training_iterations): 31 """Train a Logistic Regression model.""" 32 iters = int(training_iterations) 33 model = LogisticRegression(max_iter=iters) 34 model.fit(x, y) 35 print(model.predict(x_test)) 36 37 38 @pipeline(name="binary-classification", experiment="kale-tutorial") 39 def ml_pipeline(rs=42, iters=100): 40 """Run the ML pipeline.""" 41 x, y = load(rs) 42 x, x_test, y, y_test = split(x=x, y=y) 43 train(x, x_test, y, training_iterations=iters) 44 45 46 if __name__ == "__main__": 47 ml_pipeline(rs=42, iters=100) In this code sample, you start with a standard Python script that trains a Logistic Regression model. Moreover, you have decorated the functions using the Kale SDK. To read more about how to create this file, head to the corresponding Kale SDK user guide.
Extend the
data_splitstep declaration and use thedeploy_configargument to enforcelimitson it. Kale will instruct Kubernetes to apply theselimitson the Pod that runs this step. The following snippet summarizes the changes in code:limits.py1 - # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved. 2 + # Copyright © 2021 Arrikto Inc. All Rights Reserved. 3 4 """Kale SDK. 5 6-19 6 This script trains an ML pipeline to solve a binary classification task. 7 """ 8 9 from kale.sdk import pipeline, step 10 from sklearn.datasets import make_classification 11 from sklearn.linear_model import LogisticRegression 12 from sklearn.model_selection import train_test_split 13 14 15 @step(name="data_loading") 16 def load(random_state): 17 """Create a random dataset for binary classification.""" 18 rs = int(random_state) 19 x, y = make_classification(random_state=rs) 20 return x, y 21 22 23 - @step(name="data_split") 24 + @step(name="data_split", 25 + deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}}) 26 def split(x, y): 27 """Split the data into train and test sets.""" 28 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1) 29-41 29 return x, x_test, y, y_test 30 31 32 @step(name="model_training") 33 def train(x, x_test, y, training_iterations): 34 """Train a Logistic Regression model.""" 35 iters = int(training_iterations) 36 model = LogisticRegression(max_iter=iters) 37 model.fit(x, y) 38 print(model.predict(x_test)) 39 40 41 @pipeline(name="binary-classification", experiment="kale-tutorial") 42 def ml_pipeline(rs=42, iters=100): 43 """Run the ML pipeline.""" 44 x, y = load(rs) 45 - x, x_test, y, y_test = split(x=x, y=y) 46 - train(x, x_test, y, training_iterations=iters) 47 + x, x_test, y, y_test = split(x, y) 48 + train(x, x_test, y, iters) 49 50 51 if __name__ == "__main__": 52 ml_pipeline(rs=42, iters=100) Note
In the
deploy_configargument you can either pass aDeployConfigobject or a dictionary for Kale to construct aDeployConfigobject.Note
Check out the DeployConfig API to find all the available configurations.
Extend the
model_trainingstep declaration and use thedeploy_configargument to setlabelson it. Kale will instruct Kubernetes to apply theselabelson the Pod that runs this step. The following snippet summarizes the changes in code:labels.py1 # Copyright © 2021 Arrikto Inc. All Rights Reserved. 2 3 """Kale SDK. 4-26 4 5 This script trains an ML pipeline to solve a binary classification task. 6 """ 7 8 from kale.sdk import pipeline, step 9 from sklearn.datasets import make_classification 10 from sklearn.linear_model import LogisticRegression 11 from sklearn.model_selection import train_test_split 12 13 14 @step(name="data_loading") 15 def load(random_state): 16 """Create a random dataset for binary classification.""" 17 rs = int(random_state) 18 x, y = make_classification(random_state=rs) 19 return x, y 20 21 22 @step(name="data_split", 23 deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}}) 24 def split(x, y): 25 """Split the data into train and test sets.""" 26 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1) 27 return x, x_test, y, y_test 28 29 30 - @step(name="model_training") 31 + @step(name="model_training", 32 + deploy_config={"labels": {"training-step": "true"}}) 33 def train(x, x_test, y, training_iterations): 34 """Train a Logistic Regression model.""" 35 iters = int(training_iterations) 36-47 36 model = LogisticRegression(max_iter=iters) 37 model.fit(x, y) 38 print(model.predict(x_test)) 39 40 41 @pipeline(name="binary-classification", experiment="kale-tutorial") 42 def ml_pipeline(rs=42, iters=100): 43 """Run the ML pipeline.""" 44 x, y = load(rs) 45 x, x_test, y, y_test = split(x, y) 46 train(x, x_test, y, iters) 47 48 49 if __name__ == "__main__": 50 ml_pipeline(rs=42, iters=100) You can also specify Kubernetes configurations to be applied on all steps using the
deploy_configargument in thepipelinedecorator. Extend the pipeline declaration to set theMY_POD_IPenvironment variable on all steps, which will be assigned with the IP of their running pod. The following snippet summarizes the changes in code:env.py1 # Copyright © 2021 Arrikto Inc. All Rights Reserved. 2 3 """Kale SDK. 4-36 4 5 This script trains an ML pipeline to solve a binary classification task. 6 """ 7 8 from kale.sdk import pipeline, step 9 from sklearn.datasets import make_classification 10 from sklearn.linear_model import LogisticRegression 11 from sklearn.model_selection import train_test_split 12 13 14 @step(name="data_loading") 15 def load(random_state): 16 """Create a random dataset for binary classification.""" 17 rs = int(random_state) 18 x, y = make_classification(random_state=rs) 19 return x, y 20 21 22 @step(name="data_split", 23 deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}}) 24 def split(x, y): 25 """Split the data into train and test sets.""" 26 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1) 27 return x, x_test, y, y_test 28 29 30 @step(name="model_training", 31 deploy_config={"labels": {"training-step": "true"}}) 32 def train(x, x_test, y, training_iterations): 33 """Train a Logistic Regression model.""" 34 iters = int(training_iterations) 35 model = LogisticRegression(max_iter=iters) 36 model.fit(x, y) 37 print(model.predict(x_test)) 38 39 40 - @pipeline(name="binary-classification", experiment="kale-tutorial") 41 + pod_ip_env = {"name": "MY_POD_IP", 42 + "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}} 43 + 44 + 45 + @pipeline(name="binary-classification", experiment="kale-tutorial", 46 + deploy_config={"env": [pod_ip_env]}) 47 def ml_pipeline(rs=42, iters=100): 48 """Run the ML pipeline.""" 49 x, y = load(rs) 50-52 50 x, x_test, y, y_test = split(x, y) 51 train(x, x_test, y, iters) 52 53 54 if __name__ == "__main__": 55 ml_pipeline(rs=42, iters=100) Note
Same as the
stepdecorator, in thedeploy_configargument you can either pass aDeployConfigobject or a dictionary for Kale to construct aDeployConfigobject.Note
Kale tries to merge the pipeline-level
DeployConfigwith the one of each step, giving precedence to the individual step configuration.Kale merges the dictionary attributes, such as
limits,requests, andannotations. In other cases, Kale will use the pipeline-level configuration if the corresponding step-level one evaluates as empty.Create a function that prints the contents of the environment variable
MY_POD_IP. The step runs in parallel to the other ones and proves that the environment variable gets set. The following snippet summarizes the changes in code:printenv.py1 # Copyright © 2021 Arrikto Inc. All Rights Reserved. 2 3 """Kale SDK. 4-36 4 5 This script trains an ML pipeline to solve a binary classification task. 6 """ 7 8 from kale.sdk import pipeline, step 9 from sklearn.datasets import make_classification 10 from sklearn.linear_model import LogisticRegression 11 from sklearn.model_selection import train_test_split 12 13 14 @step(name="data_loading") 15 def load(random_state): 16 """Create a random dataset for binary classification.""" 17 rs = int(random_state) 18 x, y = make_classification(random_state=rs) 19 return x, y 20 21 22 @step(name="data_split", 23 deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}}) 24 def split(x, y): 25 """Split the data into train and test sets.""" 26 x, x_test, y, y_test = train_test_split(x, y, test_size=0.1) 27 return x, x_test, y, y_test 28 29 30 @step(name="model_training", 31 deploy_config={"labels": {"training-step": "true"}}) 32 def train(x, x_test, y, training_iterations): 33 """Train a Logistic Regression model.""" 34 iters = int(training_iterations) 35 model = LogisticRegression(max_iter=iters) 36 model.fit(x, y) 37 print(model.predict(x_test)) 38 39 40 + @step(name="print_ip") 41 + def show_ip(): 42 + """Print the IP provided by 'MY_POD_IP' env var.""" 43 + import os 44 + print("My IP is: %s" % os.getenv("MY_POD_IP", "'MY_POD_IP' IS NOT SET")) 45 + 46 + 47 pod_ip_env = {"name": "MY_POD_IP", 48 "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}} 49 50-54 50 51 @pipeline(name="binary-classification", experiment="kale-tutorial", 52 deploy_config={"env": [pod_ip_env]}) 53 def ml_pipeline(rs=42, iters=100): 54 """Run the ML pipeline.""" 55 x, y = load(rs) 56 x, x_test, y, y_test = split(x, y) 57 train(x, x_test, y, iters) 58 + show_ip() 59 60 61 if __name__ == "__main__": 62 ml_pipeline(rs=42, iters=100) Run the script locally to test whether your code runs successfully using Kale’s marshalling mechanism:
$ python3 -m kale kale_k8sconfig.pyNote
Since these configurations affect the Kubernetes resources that are created during the pipeline execution, we expect the
print_ipstep to display'MY_POD_IP' IS NOT SET:... [INFO] Running step 'print_ip'... ... [INFO] ---------- Running step 'print_ip'... ---------- ... [INFO] Starting timeout. User code TTL set to 0.0 seconds. My IP is: 'MY_POD_IP' IS NOT SET ... [INFO] User code executed successfully. ... [INFO] ---------- Successfully ran step 'print_ip' ---------- ... [INFO] This step has no artifacts(Optional) Produce a workflow YAML file that you can inspect:
$ python3 -m kale kale_k8sconfig.py --compileAfter the successful execution of this command, look for the workflow YAML file inside a
.kaledirectory inside your working directory. This is a file that you could upload and submit to Kubeflow manually through its User Interface (KFP UI).Deploy and run your code as a KFP pipeline:
$ python3 -m kale kale_k8sconfig.py --kfpNote
Inspecting the logs of the
print_ipstep you should notice a line similar to the following:My IP is: 172.17.0.85Note
To see the complete list of arguments and their respective usage run
python3 -m kale --help.