Create Custom Container Steps¶
Overview¶
The Kale SDK allows you to define custom containers and use them as actual pipeline steps with inputs and outputs. This guide will walk you through creating pipelines that execute both function-based and container-based steps using the Kale SDK.
What You’ll Need¶
- An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
- An understanding of how the Kale SDK works.
Procedure¶
Choose one of the following options based on how you plan to use your custom container step. If you plan to use plain values as its inputs or outputs, follow the Plain values section below. If you plan to pass large data objects, follow the Data files section.
Plain values¶

1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

```
gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>
```

Note

The <IMAGE_TAG> varies based on the MiniKF or EKF release.

2. Connect to the server, open a terminal, and create a new Python file named container_pipeline.py:

```
$ touch container_pipeline.py
```

3. Copy and paste the following code inside container_pipeline.py:

```python
# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script creates a pipeline that generates and adds 2 random integers and
prints the result.
"""

from random import randint

from kale import ContainerStep
from kale.sdk import step, pipeline


@step(name="gen_random_ints")
def random_ints():
    """Generate 2 random integers in the range [1,100]."""
    return randint(1, 100), randint(1, 100)


add = ContainerStep(
    name="add_random_ints",
    image="gcr.io/arrikto/example-add-split",
    command=["/bin/bash", "/add.sh"],
    args=["{{input.x}}", "{{input.y}}",
          "{{outfile.res}}"])


@step(name="print_result")
def print_result(x):
    """Print an input integer."""
    print("The result is: %d" % int(x))


@pipeline(name="add-random-ints",
          experiment="kale-tutorial")
def pipeline_func():
    """Run the ML pipeline."""
    a, b = random_ints()
    c = add(x=a, y=b)
    print_result(c)


if __name__ == "__main__":
    pipeline_func()
```

In this code example, you define a pipeline with three steps:
- The first step (gen_random_ints) is function-based. It generates and returns two random integers.
- The second step (add_random_ints) is container-based. It adds the two outputs of gen_random_ints and returns the result.
- The third step (print_result) is function-based. It prints the output of add_random_ints (see the illustration below for how this output arrives).
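
One detail worth noting: the output of a ContainerStep reaches downstream steps as the raw text written to the output file, which is why print_result converts its argument with int(x). A two-line illustration of that conversion (the literal value here is made up):

```python
raw = "42\n"  # the script writes the sum followed by a newline into the output file
print("The result is: %d" % int(raw))  # int() tolerates surrounding whitespace
```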
Notice that the command input parameter of the ContainerStep runs a custom bash script, which adds two numbers and writes the result to a file. The script expects three arguments: the first two are the numbers to add, and the third is the path of the output file. We pass these arguments using the args parameter.

The add_random_ints step executes the following script:

```bash
#!/bin/bash
# Copyright © 2021 Arrikto Inc. All Rights Reserved.

sum=$(( $1 + $2 ))
mkdir -p $(dirname "$3")
echo "$sum" > $3
```

The output of the step is the contents of this file, that is, the result of the addition. In our pipeline definition, this result is represented by the variable c, and subsequent steps may consume it.
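
If you want to sanity-check the script outside of any container, you can run it directly in a terminal (this assumes you have saved it locally as add.sh; the output path is arbitrary):

```
$ bash add.sh 2 3 /tmp/add/res
$ cat /tmp/add/res
5
```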
Important

- Use {{input.<NAME>}} to declare an input argument for your ContainerStep.
- Use {{outfile.<NAME>}} as a file path to write your output data (see the sketch below).
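
To isolate this convention, here is a minimal, hypothetical ContainerStep that simply forwards one input into its output file. The image name and the /echo.sh script are illustrative assumptions, not artifacts that ship with this guide:

```python
from kale import ContainerStep

# Hypothetical step: /echo.sh is an assumed script in an assumed image that
# copies its first argument ($1) into the file path given as $2.
echo_step = ContainerStep(
    name="echo_msg",
    image="example.com/echo-image",  # hypothetical image
    command=["/bin/bash", "/echo.sh"],
    args=["{{input.msg}}",      # resolved from the call echo_step(msg=...)
          "{{outfile.out}}"])   # path where the step must write its output
```

Calling echo_step(msg=value) inside a @pipeline function would then yield the contents of the output file as the step's result, mirroring how add is used above.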
4. In your Notebook server, deploy and run your code as a KFP pipeline:

```
$ python3 -m kale container_pipeline.py --kfp
```
Data files¶

In this example, we will use a shared mounted volume to pass large data objects as inputs and outputs to the ContainerStep.
1. Create a new Notebook server using the default Kale Docker image. The image will have the following naming scheme:

```
gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>
```

Note

The <IMAGE_TAG> varies based on the MiniKF or EKF release.

2. Connect to the server, open a terminal, and install scikit-learn:

```
$ pip3 install --user scikit-learn==0.23.0
```

3. Create a new Python file and name it container_pipeline_file_passing.py:

```
$ touch container_pipeline_file_passing.py
```

4. Copy and paste the following code inside container_pipeline_file_passing.py:

```python
# Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script creates an ML pipeline to split a dataset.
"""

import json

from sklearn.datasets import load_digits

from kale import ContainerStep
from kale.sdk import step, pipeline


@step(name="load_data")
def load_dataset():
    """Load the Digits dataset."""
    x, y = load_digits(return_X_y=True)
    filename_x = "/kale_marshal_dir/x.txt"
    filename_y = "/kale_marshal_dir/y.txt"

    with open(filename_x, "w") as f:
        json.dump(x.tolist(), f)
    with open(filename_y, "w") as f:
        json.dump(y.tolist(), f)

    return filename_x, filename_y


split_dataset = ContainerStep(
    name="split_dataset",
    image="gcr.io/arrikto/example-add-split",
    command=["python", "/split.py"],
    args=["{{input.x}}", "{{input.y}}",
          "{{outfile.x_test}}", "{{outfile.y_test}}"],
    volume_mounts=["/kale_marshal_dir"])


@step(name="print_data")
def print_dataset(x, y):
    """Print a slice of the Digits dataset."""
    with open(x, "r") as f:
        x_test = json.load(f)

    with open(y, "r") as f:
        y_test = json.load(f)

    print("The 1st digit is:", y_test[0])
    print("The 1st image array is:", x_test[0])


@pipeline(name="split-dataset",
          experiment="kale-tutorial",
          marshal_volume=True,
          marshal_path="/kale_marshal_dir")
def pipeline_func():
    """Run the ML pipeline."""
    x, y = load_dataset()
    x_new, y_new = split_dataset(x, y)
    print_dataset(x=x_new, y=y_new)


if __name__ == "__main__":
    pipeline_func()
```

In this code example, we define a pipeline with three steps:
- The first step (load_dataset) is function-based. It loads the Digits dataset and returns its samples and targets.
- The second step (split_dataset) is container-based. It splits the dataset into train and test subsets, and returns the test subset.
- The third step (print_dataset) is function-based. It prints the first element of the test subset.
Note

Notice how we extend the pipeline to use a marshal volume. We use this volume to save large data objects in files, and then we pass their file paths between steps.
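
The underlying pattern is straightforward: the producing step serializes its payload under the marshal path and returns only the file path, and the consuming step loads the payload back from that path. Here is a minimal sketch of the two halves; save_payload and load_payload are illustrative helpers, not part of the Kale SDK:

```python
import json

MARSHAL_DIR = "/kale_marshal_dir"  # must match marshal_path in @pipeline


def save_payload(obj, name):
    """Serialize obj under the marshal volume and return its file path."""
    path = "%s/%s.txt" % (MARSHAL_DIR, name)
    with open(path, "w") as f:
        json.dump(obj, f)
    return path  # only this short string travels between steps


def load_payload(path):
    """Load a payload previously written by save_payload."""
    with open(path, "r") as f:
        return json.load(f)
```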
The split_dataset step executes the following script:

```python
# Copyright © 2021 Arrikto Inc. All Rights Reserved.

"""This script takes a dataset as input and splits it."""

import os
import sys
import json

from sklearn.model_selection import train_test_split

if __name__ == "__main__":
    # Load dataset from the input args
    file_x = sys.argv[1]
    file_y = sys.argv[2]
    with open(file_x, "r") as fp:
        x = json.load(fp)
    with open(file_y, "r") as fp:
        y = json.load(fp)
    print("Dataset read successfully.")

    # Read the file paths to write the outputs
    x_test_out_filepath = sys.argv[3]
    y_test_out_filepath = sys.argv[4]

    # Create the parent directories for the output files
    if not os.path.exists(os.path.dirname(x_test_out_filepath)):
        os.makedirs(os.path.dirname(x_test_out_filepath))
    if not os.path.exists(os.path.dirname(y_test_out_filepath)):
        os.makedirs(os.path.dirname(y_test_out_filepath))

    print("Creating a test set...")
    _, x_test, _, y_test = train_test_split(x, y, test_size=0.6)
    print("Test dataset created successfully.")

    print("Saving files to marshal directory...")
    with open("/kale_marshal_dir/x_test.txt", "w") as f:
        json.dump(x_test, f)
    with open("/kale_marshal_dir/y_test.txt", "w") as f:
        json.dump(y_test, f)
    print("Saved files to marshal directory successfully.")

    print("Writing output files...")
    with open(x_test_out_filepath, "w") as f:
        f.write("/kale_marshal_dir/x_test.txt")
    with open(y_test_out_filepath, "w") as f:
        f.write("/kale_marshal_dir/y_test.txt")
    print("Done")
```
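
Note the two-level indirection here: split.py saves the actual test data under /kale_marshal_dir and writes only the resulting paths into the Kale output files ({{outfile.x_test}} and {{outfile.y_test}}). Downstream, print_dataset therefore receives paths and dereferences them itself. A sketch of what this looks like at runtime (the values are illustrative):

```python
import json

# What split.py wrote into the {{outfile.x_test}} file, and hence what the
# pipeline passes to print_dataset as x: a path string, not the data itself.
x_new = "/kale_marshal_dir/x_test.txt"

# print_dataset then opens that path to obtain the real payload.
with open(x_new, "r") as f:
    x_test = json.load(f)
```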
5. In your Notebook server, deploy and run your code as a KFP pipeline:

```
$ python3 -m kale container_pipeline_file_passing.py --kfp
```