Create Custom Container Steps

The Kale SDK allows you to define custom containers and use them as pipeline steps with inputs and outputs. This guide walks you through creating pipelines that execute both function-based and container-based steps.

What You’ll Need

  • An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
  • An understanding of how the Kale SDK works.

Procedure

Choose one of the following options based on how you plan to use your custom container step. If you plan to pass plain values as its inputs and outputs, follow the Plain values procedure. If you plan to pass large data objects, follow the Data files procedure.

Plain values

  1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, and create a new Python file named container_pipeline.py:

    $ touch container_pipeline.py
  3. Copy and paste the following code inside container_pipeline.py:

    container_pipeline.py
    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates a pipeline that generates and adds 2 random integers and
    prints the result.
    """

    from random import randint

    from kale import ContainerStep
    from kale.sdk import step, pipeline


    @step(name="gen_random_ints")
    def random_ints():
        """Generate 2 random integers in the range [1,100]."""
        return randint(1, 100), randint(1, 100)


    add = ContainerStep(
        name="add_random_ints",
        image="gcr.io/arrikto/example-add-split",
        command=["/bin/bash", "/add.sh"],
        args=["{{input.x}}", "{{input.y}}",
              "{{outfile.res}}"])


    @step(name="print_result")
    def print_result(x):
        """Print an input integer."""
        print("The result is: %d" % int(x))


    @pipeline(name="add-random-ints",
              experiment="kale-tutorial")
    def pipeline_func():
        """Run the ML pipeline."""
        a, b = random_ints()
        c = add(x=a, y=b)
        print_result(c)


    if __name__ == "__main__":
        pipeline_func()

    In this code example, you define a pipeline with three steps:

    • The first step (gen_random_ints) is function-based. It generates and returns two random integers.
    • The second step (add_random_ints) is container-based. It adds the two outputs of gen_random_ints and returns the result.
    • The third step (print_result) is function-based. It prints the output of add_random_ints.

    Notice that the command input parameter of the ContainerStep runs a custom bash script which adds two numbers and writes the result to a file. The script expects three arguments: the first two are the numbers to add, and the third is the output file. We pass these arguments using the args parameter.

    The add_random_ints step executes the following script:

    add.sh
    #!/bin/bash
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    # Add the two input numbers and write the sum to the output file path
    # passed as the third argument.
    sum=$(( $1 + $2 ))
    mkdir -p "$(dirname "$3")"
    echo "$sum" > "$3"

    The output of the step is the contents of this file, that is, the result of the addition. In our pipeline definition, this result is represented by the variable c, which subsequent steps may consume.

    Important

    • Use {{input.<NAME>}} to declare an input argument for your ContainerStep.
    • Use {{outfile.<NAME>}} as the file path where your step writes its output data (see the sketch below).
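
    For example, here is a minimal sketch of this mapping. The image, script, and step names are hypothetical placeholders, not a published example:

    from kale import ContainerStep

    # Hypothetical container step. "{{input.text}}" is replaced with the value
    # you pass as `text=...` when calling the step inside a pipeline, and
    # "{{outfile.upper}}" is a file path generated for the step; whatever the
    # script writes to that path becomes the step's output.
    to_upper = ContainerStep(
        name="to_upper",
        image="<YOUR_REGISTRY>/to-upper:latest",
        command=["/bin/bash", "/to_upper.sh"],
        args=["{{input.text}}", "{{outfile.upper}}"])

    # Inside a @pipeline-decorated function you would then call:
    #     result = to_upper(text=some_string)
    # and `result` would hold the contents of the file the script wrote.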
  4. In your Notebook server, deploy and run your code as a KFP pipeline:

    $ python3 -m kale container_pipeline.py --kfp

Data files

In this option, we use a shared mounted volume to pass large data objects as inputs and outputs to the ContainerStep.
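
The two pieces that enable this, condensed from the full example that follows, are the volume_mounts argument of the ContainerStep and the marshal_volume and marshal_path arguments of the @pipeline decorator. A minimal sketch, with hypothetical image, script, and step names:

    from kale import ContainerStep

    # volume_mounts mounts the shared marshal directory inside the custom
    # container, so the step can read and write files that the function-based
    # steps also see.
    my_step = ContainerStep(
        name="my_container_step",
        image="<YOUR_IMAGE>",
        command=["python", "/my_script.py"],
        args=["{{input.x}}", "{{outfile.out}}"],
        volume_mounts=["/kale_marshal_dir"])

    # On the pipeline side, pass marshal_volume=True and
    # marshal_path="/kale_marshal_dir" to the @pipeline decorator so that the
    # function-based steps use the same shared directory (see the full example
    # below).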

  1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, and install scikit-learn:

    $ pip3 install --user scikit-learn==0.23.0
  3. Create a new Python file named container_pipeline_file_passing.py:

    $ touch container_pipeline_file_passing.py
  4. Copy and paste the following code inside container_pipeline_file_passing.py:

    container_pipeline_file_passing.py
    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script creates an ML pipeline to split a dataset.
    """

    import json

    from sklearn.datasets import load_digits

    from kale import ContainerStep
    from kale.sdk import step, pipeline


    @step(name="load_data")
    def load_dataset():
        """Load the Digits dataset."""
        x, y = load_digits(return_X_y=True)
        filename_x = "/kale_marshal_dir/x.txt"
        filename_y = "/kale_marshal_dir/y.txt"

        with open(filename_x, "w") as f:
            json.dump(x.tolist(), f)
        with open(filename_y, "w") as f:
            json.dump(y.tolist(), f)

        return filename_x, filename_y


    split_dataset = ContainerStep(
        name="split_dataset",
        image="gcr.io/arrikto/example-add-split",
        command=["python", "/split.py"],
        args=["{{input.x}}", "{{input.y}}",
              "{{outfile.x_test}}", "{{outfile.y_test}}"],
        volume_mounts=["/kale_marshal_dir"])


    @step(name="print_data")
    def print_dataset(x, y):
        """Print a slice of the Digits dataset."""
        with open(x, "r") as f:
            x_test = json.load(f)

        with open(y, "r") as f:
            y_test = json.load(f)
        print("The 1st digit is:", y_test[0])
        print("The 1st image array is:", x_test[0])


    @pipeline(name="split-dataset",
              experiment="kale-tutorial",
              marshal_volume=True,
              marshal_path="/kale_marshal_dir")
    def pipeline_func():
        """Run the ML pipeline."""
        x, y = load_dataset()
        x_new, y_new = split_dataset(x, y)
        print_dataset(x=x_new, y=y_new)


    if __name__ == "__main__":
        pipeline_func()

    In this code example, we define a pipeline with three steps:

    • The first step (load_dataset) is function-based. It loads the digits dataset and returns its samples and targets.
    • The second step (split_dataset) is container-based. It splits the dataset into train and test subsets and returns the test subset as file paths on the shared volume.
    • The third step (print_dataset) is function-based. It prints the first element of the test subset.

    Note

    Notice how we extend the pipeline to use a marshal volume. We use this volume to save large data objects to files and then pass their file paths between steps, as the sketch below illustrates.
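
    Stripped of the Kale SDK, the pattern is just the following. This is a standalone illustration, not part of the pipeline, with a temporary directory standing in for /kale_marshal_dir: the producer writes the data to the shared directory and returns only the file path, and the consumer opens that path.

    import json
    import os
    import tempfile

    marshal_dir = tempfile.mkdtemp()  # stands in for /kale_marshal_dir

    def produce():
        """Write a large object to the shared directory; return only its path."""
        path = os.path.join(marshal_dir, "x.txt")
        with open(path, "w") as f:
            json.dump(list(range(1000)), f)
        return path  # only this small string travels between steps

    def consume(path):
        """Receive the path and load the data from the shared directory."""
        with open(path, "r") as f:
            data = json.load(f)
        print("Loaded", len(data), "values")

    consume(produce())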

    The split_dataset step executes the following script. Note that it saves the test subsets to the marshal volume and writes those file paths, not the data itself, into its output files, so downstream steps receive paths they can open:

    split.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """This script takes a dataset as input and splits it."""

    import os
    import sys
    import json

    from sklearn.model_selection import train_test_split

    if __name__ == "__main__":
        # Load dataset from the input args
        file_x = sys.argv[1]
        file_y = sys.argv[2]
        with open(file_x, "r") as fp:
            x = json.load(fp)
        with open(file_y, "r") as fp:
            y = json.load(fp)
        print("Dataset read successfully.")

        # Read the file paths to write the outputs
        x_test_out_filepath = sys.argv[3]
        y_test_out_filepath = sys.argv[4]

        # Create the parent directories for the output files
        if not os.path.exists(os.path.dirname(x_test_out_filepath)):
            os.makedirs(os.path.dirname(x_test_out_filepath))
        if not os.path.exists(os.path.dirname(y_test_out_filepath)):
            os.makedirs(os.path.dirname(y_test_out_filepath))

        print("Creating a test set...")
        _, x_test, _, y_test = train_test_split(x, y, test_size=0.6)
        print("Test dataset created successfully.")

        print("Saving files to marshal directory...")
        with open("/kale_marshal_dir/x_test.txt", "w") as f:
            json.dump(x_test, f)
        with open("/kale_marshal_dir/y_test.txt", "w") as f:
            json.dump(y_test, f)
        print("Saved files to marshal directory successfully.")

        print("Writing output files...")
        with open(x_test_out_filepath, "w") as f:
            f.write("/kale_marshal_dir/x_test.txt")
        with open(y_test_out_filepath, "w") as f:
            f.write("/kale_marshal_dir/y_test.txt")
        print("Done")
  5. In your Notebook server, deploy and run your code as a KFP pipeline:

    $ python3 -m kale container_pipeline_file_passing.py --kfp

Summary

You have successfully created container-based steps and run them in pipelines.

What’s Next

The next step is to parameterize your pipeline.