Set Kubernetes Configurations

This guide will walk you through setting Kubernetes metadata and spec details for the steps of your pipeline.

What You’ll Need

  • An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
  • An understanding of how the Kale SDK works.

Procedure

  1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, and install scikit-learn:

    $ pip3 install --user scikit-learn==0.23.0
  3. Create a new Python file and name it kale_k8sconfig.py:

    $ touch kale_k8sconfig.py
  4. Copy and paste the following code inside kale_k8sconfig.py:

    sdk.py

    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split")
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="model_training")
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    @pipeline(name="binary-classification", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x=x, y=y)
        train(x, x_test, y, training_iterations=iters)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)

    In this code sample, you start with a standard Python script that trains a Logistic Regression model and decorate its functions using the Kale SDK. To read more about how to create this file, head to the corresponding Kale SDK user guide.

  5. Extend the data_split step declaration and use the deploy_config argument to enforce resource limits on it. Kale will instruct Kubernetes to apply these limits to the Pod that runs this step. The following snippet summarizes the changes in code:

    limits.py

    -# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.
    +# Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    -@step(name="data_split")
    +@step(name="data_split",
    +      deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}})
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="model_training")
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    @pipeline(name="binary-classification", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
    -    x, x_test, y, y_test = split(x=x, y=y)
    -    train(x, x_test, y, training_iterations=iters)
    +    x, x_test, y, y_test = split(x, y)
    +    train(x, x_test, y, iters)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)

    Note

    In the deploy_config argument you can pass either a DeployConfig object or a dictionary from which Kale constructs a DeployConfig object.

    Note

    Check out the DeployConfig API to find all the available configurations.
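
    For reference, here is a hedged sketch of what a richer dictionary-based deploy_config could look like. It combines the limits key used above with the requests and annotations keys that the merge note further down this guide also mentions, plus a purely illustrative label. Treat the exact set of accepted keys as an assumption and confirm it against the DeployConfig API:

    from kale.sdk import step
    from sklearn.model_selection import train_test_split


    @step(name="data_split",
          deploy_config={
              # resource limits, as used in this guide
              "limits": {"cpu": "100m", "memory": "1Gi"},
              # requests and annotations are mentioned in the merge note of step 7
              "requests": {"cpu": "50m", "memory": "512Mi"},
              "annotations": {"owner": "kale-tutorial"},
              # hypothetical label, for illustration only
              "labels": {"stage": "preprocessing"},
          })
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test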

  6. Extend the model_training step declaration and use the deploy_config argument to set labels on it. Kale will instruct Kubernetes to apply these labels to the Pod that runs this step. The following snippet summarizes the changes in code:

    labels.py

    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split",
          deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}})
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    -@step(name="model_training")
    +@step(name="model_training",
    +      deploy_config={"labels": {"training-step": "true"}})
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    @pipeline(name="binary-classification", experiment="kale-tutorial")
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
        train(x, x_test, y, iters)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)
  7. You can also specify Kubernetes configurations that apply to all steps by using the deploy_config argument in the pipeline decorator. Extend the pipeline declaration to set the MY_POD_IP environment variable on all steps; in each step, the variable will be assigned the IP of its running Pod. The following snippet summarizes the changes in code:

    env.py

    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split",
          deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}})
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="model_training",
          deploy_config={"labels": {"training-step": "true"}})
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    -@pipeline(name="binary-classification", experiment="kale-tutorial")
    +pod_ip_env = {"name": "MY_POD_IP",
    +              "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}}
    +
    +
    +@pipeline(name="binary-classification", experiment="kale-tutorial",
    +          deploy_config={"env": [pod_ip_env]})
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
        train(x, x_test, y, iters)


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)
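
    The pod_ip_env dictionary above mirrors the Kubernetes EnvVar schema, using the Downward API fieldPath status.podIP to expose the Pod IP. If you want to convince yourself that the dictionary is well-formed, the following optional sketch builds the same entry with the official Kubernetes Python client and serializes it. It assumes the kubernetes package is available, which this guide does not otherwise require:

    from kubernetes import client

    # Build the same environment variable using the typed Kubernetes models.
    env_var = client.V1EnvVar(
        name="MY_POD_IP",
        value_from=client.V1EnvVarSource(
            field_ref=client.V1ObjectFieldSelector(field_path="status.podIP")))

    # Serializing the model yields the same camelCase dictionary used above:
    # {'name': 'MY_POD_IP', 'valueFrom': {'fieldRef': {'fieldPath': 'status.podIP'}}}
    print(client.ApiClient().sanitize_for_serialization(env_var))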

    Note

    As with the step decorator, in the deploy_config argument you can pass either a DeployConfig object or a dictionary from which Kale constructs a DeployConfig object.

    Note

    Kale merges the pipeline-level DeployConfig with that of each step, giving precedence to the individual step configuration.

    Kale merges dictionary attributes, such as limits, requests, and annotations. For all other attributes, Kale uses the pipeline-level configuration only if the corresponding step-level one evaluates as empty.
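
    The following sketch illustrates these merge rules with plain dictionaries. It is an illustration of the documented behavior, not Kale's actual implementation:

    pod_ip_env = {"name": "MY_POD_IP",
                  "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}}

    pipeline_cfg = {"limits": {"cpu": "200m"}, "env": [pod_ip_env]}
    step_cfg = {"limits": {"cpu": "100m", "memory": "1Gi"}, "env": []}

    merged = {
        # dictionary attributes are merged, with the step value winning on conflicts
        "limits": {**pipeline_cfg["limits"], **step_cfg["limits"]},
        # other attributes fall back to the pipeline-level value when the
        # step-level one evaluates as empty
        "env": step_cfg["env"] or pipeline_cfg["env"],
    }

    print(merged["limits"])  # {'cpu': '100m', 'memory': '1Gi'}
    print(merged["env"])     # the pipeline-level env entry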

  8. Create a function that prints the contents of the MY_POD_IP environment variable. The resulting step runs in parallel to the other ones and demonstrates that the environment variable gets set. The following snippet summarizes the changes in code:

    printenv.py

    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script trains an ML pipeline to solve a binary classification task.
    """

    from kale.sdk import pipeline, step
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split


    @step(name="data_loading")
    def load(random_state):
        """Create a random dataset for binary classification."""
        rs = int(random_state)
        x, y = make_classification(random_state=rs)
        return x, y


    @step(name="data_split",
          deploy_config={"limits": {"cpu": "100m", "memory": "1Gi"}})
    def split(x, y):
        """Split the data into train and test sets."""
        x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
        return x, x_test, y, y_test


    @step(name="model_training",
          deploy_config={"labels": {"training-step": "true"}})
    def train(x, x_test, y, training_iterations):
        """Train a Logistic Regression model."""
        iters = int(training_iterations)
        model = LogisticRegression(max_iter=iters)
        model.fit(x, y)
        print(model.predict(x_test))


    +@step(name="print_ip")
    +def show_ip():
    +    """Print the IP provided by 'MY_POD_IP' env var."""
    +    import os
    +    print("My IP is: %s" % os.getenv("MY_POD_IP", "'MY_POD_IP' IS NOT SET"))
    +
    +
    pod_ip_env = {"name": "MY_POD_IP",
                  "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}}


    @pipeline(name="binary-classification", experiment="kale-tutorial",
              deploy_config={"env": [pod_ip_env]})
    def ml_pipeline(rs=42, iters=100):
        """Run the ML pipeline."""
        x, y = load(rs)
        x, x_test, y, y_test = split(x, y)
        train(x, x_test, y, iters)
    +    show_ip()


    if __name__ == "__main__":
        ml_pipeline(rs=42, iters=100)
  9. Run the script locally to test whether your code runs successfully using Kale’s marshalling mechanism:

    $ python3 -m kale kale_k8sconfig.py

    Note

    Since these configurations affect only the Kubernetes resources created during the pipeline execution, we expect the print_ip step of this local run to display 'MY_POD_IP' IS NOT SET:

    ... [INFO] Running step 'print_ip'...
    ... [INFO] ---------- Running step 'print_ip'... ----------
    ... [INFO] Starting timeout. User code TTL set to 0.0 seconds.
    My IP is: 'MY_POD_IP' IS NOT SET
    ... [INFO] User code executed successfully.
    ... [INFO] ---------- Successfully ran step 'print_ip' ----------
    ... [INFO] This step has no artifacts
  10. (Optional) Produce a workflow YAML file that you can inspect:

    $ python3 -m kale kale_k8sconfig.py --compile

    After this command executes successfully, look for the workflow YAML file inside the .kale directory within your working directory. This is a file that you could upload and submit to Kubeflow manually through the Kubeflow Pipelines user interface (KFP UI).
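
    If you want to verify that your Kubernetes configurations made it into the compiled workflow, the following optional sketch parses the file and prints the relevant fields for each pipeline step. It assumes PyYAML is available and that the compiled file in .kale carries a .yaml extension; KFP compiles pipelines into Argo Workflow resources, where each step appears as an entry under spec.templates:

    import glob
    import os

    import yaml

    # Pick the most recently modified compiled workflow (the exact name varies).
    path = max(glob.glob(".kale/*.yaml"), key=os.path.getmtime)
    with open(path) as f:
        workflow = yaml.safe_load(f)

    # Print the per-step container resources, env entries, and Pod labels.
    for template in workflow["spec"]["templates"]:
        container = template.get("container") or {}
        metadata = template.get("metadata") or {}
        print(template["name"])
        print("  resources:", container.get("resources", {}))
        print("  env:", container.get("env", []))
        print("  labels:", metadata.get("labels", {}))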

  11. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale kale_k8sconfig.py --kfp

    Note

    Inspecting the logs of the print_ip step, you should notice a line similar to the following:

    My IP is: 172.17.0.85

    Note

    To see the complete list of arguments and their respective usage run python3 -m kale --help.
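
    To double-check the labels and the Pod IP from outside the pipeline, you can query the cluster while the run is in progress. The following optional sketch uses the official Kubernetes Python client, which is an assumption: it is not required by this guide and may need to be installed separately. Replace the namespace placeholder with the namespace in which your pipeline runs:

    from kubernetes import client, config

    config.load_kube_config()  # use config.load_incluster_config() from inside the cluster

    v1 = client.CoreV1Api()
    namespace = "<YOUR_NAMESPACE>"  # placeholder: the namespace running the pipeline

    # List the Pods carrying the label that Kale set on the model_training step.
    pods = v1.list_namespaced_pod(namespace, label_selector="training-step=true")
    for pod in pods.items:
        print(pod.metadata.name, pod.status.pod_ip)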

Summary

You have successfully configured Kubernetes metadata and spec for your steps, both individually and pipeline-wide.

What’s Next

Check out the rest of the Kale user guides.