Create Parallel Steps with Loops
This guide will walk you through creating Kale pipelines with loops that create parallel steps in bulk.
What You’ll Need
- An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
- An understanding of how the Kale SDK works.
Procedure
Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:
```
gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>
```

Note
The <IMAGE_TAG> varies based on the MiniKF or EKF release.

Create a new Python file and name it loop.py:

```
$ touch loop.py
```

Copy and paste the following code inside loop.py:

```python
# Copyright © 2021 Arrikto Inc. All Rights Reserved.

"""Kale SDK.

This script defines a pipeline with a looping statement.
"""

import random

from kale.sdk import step, pipeline


@step(name="generate")
def generate():
    gen = random.sample(range(0, 20), 2)
    return gen


@step(name="printer")
def squared(value):
    value = float(value)
    print(value * value)


@pipeline(name="loop", experiment="kale-tutorial")
def looping_pipeline():
    res = generate()
    for x in res:
        squared(x)


if __name__ == "__main__":
    looping_pipeline()
```

The pipeline above shows how you can iterate over the result of a step to distribute an action over each item as parallel pipeline steps.
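To build intuition for what the loop does, the following sketch mimics the fan-out locally with plain Python and `concurrent.futures`: each item returned by `generate` is handed to an independent call of `squared`, much as Kale runs one `printer` step per item. This is only an analogy under our own assumptions, not how Kale executes pipelines (Kale turns each iteration into a separate pipeline step, not a thread), and unlike the pipeline, this local version collects the results. The helper name `fan_out` is hypothetical.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def generate():
    # Same logic as the "generate" step: two distinct integers from 0 to 19.
    return random.sample(range(0, 20), 2)


def squared(value):
    # Same logic as the "printer" step, but it also returns the result
    # so that this local sketch can be checked.
    value = float(value)
    result = value * value
    print(result)
    return result


def fan_out(items, func):
    # Hypothetical helper: one independent task per item, similar to one
    # pipeline step per loop iteration. pool.map keeps the input order.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, items))


if __name__ == "__main__":
    res = generate()
    fan_out(res, squared)
```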
Important
Kale does not support a map-reduce-style approach: you cannot aggregate the results of parallel steps produced by a for loop. This is due to limitations in Kubeflow Pipelines. If this use case is important to you, reach out to your Arrikto contact person.
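If you do need an aggregated result, one option, sketched below under the assumption that giving up parallelism is acceptable, is to keep the loop inside a single step's function body, where it is ordinary sequential Python instead of a set of parallel pipeline steps. The function name `sum_of_squares` is hypothetical; in loop.py you would presumably decorate such a function with `@step` and call it once with the output of `generate()`, which we have not verified against a specific Kale release.

```python
def sum_of_squares(values):
    # Hypothetical step body: square each item and aggregate in one place.
    # The loop runs sequentially inside one function, so there is no
    # fan-out and nothing has to be aggregated across pipeline steps.
    total = 0.0
    for value in values:
        value = float(value)  # mirror the float() conversion used in squared
        total += value * value
    print(total)
    return total


if __name__ == "__main__":
    sum_of_squares([3, 4])
```

The trade-off is that every item is processed in the same step, one after another, rather than in separate parallel steps.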
You can also use pipeline parameters and nested loops in your loop statements. The following snippet shows the changes to the code:
loop_param.py

```diff
 # Copyright © 2021 Arrikto Inc. All Rights Reserved.

 """Kale SDK.

 This script defines a pipeline with a looping statement.
 """

 import random

 from kale.sdk import step, pipeline


 @step(name="generate")
 def generate():
     gen = random.sample(range(0, 20), 2)
     return gen


 @step(name="printer")
 def squared(value):
     value = float(value)
     print(value * value)


+@step(name="printer2")
+def printer(value):
+    value = int(value)
+    print(value)
+
+
 @pipeline(name="loop", experiment="kale-tutorial")
-def looping_pipeline():
+def looping_pipeline(arg_list=[0, 1]):
     res = generate()
     for x in res:
         squared(x)
+        for i in arg_list:
+            printer(i)


 if __name__ == "__main__":
     looping_pipeline()
```

Important
Due to a Kubeflow Pipelines limitation (upstream issue), you cannot iterate over a list inside a nested loop if that list is produced outside the parent loop. For example, the following fails:
```python
# Inside a @pipeline-decorated function:
list1 = gen_list1_step()
list2 = gen_list2_step()  # produced outside the parent loop
for i in list1:
    for j in list2:
        do(i, j)
```

Kale also supports a constant list as an input to a for loop statement. The following snippet shows the changes to the code:
loop_const.py

```diff
 # Copyright © 2021 Arrikto Inc. All Rights Reserved.

 """Kale SDK.

 This script defines a pipeline with a looping statement.
 """

 import random

 from kale.sdk import step, pipeline


 @step(name="generate")
 def generate():
     gen = random.sample(range(0, 20), 2)
     return gen


 @step(name="printer")
 def squared(value):
     value = float(value)
     print(value * value)


 @step(name="printer2")
 def printer(value):
     value = int(value)
     print(value)


+@step(name="printer3")
+def mult(value1, value2, value3):
+    value1 = float(value1)
+    value2 = float(value2)
+    value3 = float(value3)
+    print(value1 * value2 * value3)
+
+
+const_list = [2, 3]
+
+
 @pipeline(name="loop", experiment="kale-tutorial")
 def looping_pipeline(arg_list=[0, 1]):
     res = generate()
     for x in res:
         squared(x)
         for i in arg_list:
             printer(i)
+            for j in const_list:
+                mult(x, i, j)


 if __name__ == "__main__":
     looping_pipeline()
```

Important
Kale does not support a constant list initialized inside the pipeline function. Define it at module level instead, as const_list is defined in the snippet above.
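Because the three loops in loop_const.py are nested, the `printer3` (`mult`) step runs once for every combination of `x`, `i`, and `j`. The sketch below enumerates those combinations in plain Python with `itertools.product`. It assumes that Kale expands nested loops into one step instance per combination, and it uses `res = [3, 7]` as a hypothetical stand-in for the random output of `generate`. With the default `arg_list=[0, 1]` and `const_list = [2, 3]`, that gives 2 × 2 × 2 = 8 instances of `printer3`. The helper name `mult_instances` is hypothetical.

```python
from itertools import product

const_list = [2, 3]  # module-level constant, as in loop_const.py


def mult_instances(res, arg_list, const_list):
    # One tuple per printer3 (mult) step instance: (x, i, j, printed value).
    # product() enumerates combinations in the same order as the nested
    # loops: for x in res -> for i in arg_list -> for j in const_list.
    return [
        (x, i, j, float(x) * float(i) * float(j))
        for x, i, j in product(res, arg_list, const_list)
    ]


if __name__ == "__main__":
    res = [3, 7]  # hypothetical output of generate(); the real one is random
    for instance in mult_instances(res, [0, 1], const_list):
        print(instance)
```

Every instance with `i = 0` prints `0.0`, so half of the eight `printer3` steps produce zero in this configuration.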
Finally, you can also pass a list directly to the for loop statement. The following snippet shows the changes to the code:
loop_item.py

```diff
 # Copyright © 2021 Arrikto Inc. All Rights Reserved.

 """Kale SDK.

 This script defines a pipeline with a looping statement.
 """

 import random

 from kale.sdk import step, pipeline


 @step(name="generate")
 def generate():
     gen = random.sample(range(0, 20), 2)
     return gen


 @step(name="printer")
 def squared(value):
     value = float(value)
     print(value * value)


 @step(name="printer2")
 def printer(value):
     value = int(value)
     print(value)


 @step(name="printer3")
 def mult(value1, value2, value3):
     value1 = float(value1)
     value2 = float(value2)
     value3 = float(value3)
     print(value1 * value2 * value3)


+@step(name="printer4")
+def hello():
+    print("Hello!")
+
+
 const_list = [2, 3]


 @pipeline(name="loop", experiment="kale-tutorial")
 def looping_pipeline(arg_list=[0, 1]):
     res = generate()
     for x in res:
         squared(x)
         for i in arg_list:
             printer(i)
             for j in const_list:
                 mult(x, i, j)
+    for z in [4, 5]:
+        hello()


 if __name__ == "__main__":
     looping_pipeline()
```

Deploy and run your code as a KFP pipeline:
```
$ python3 -m kale loop.py --kfp
```

Important
We currently don’t support iterating over a dictionary’s keys the way Kubeflow Pipelines does. We may support this in the future; contact Arrikto for more information.
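Since a for loop can iterate over a list returned by a step (as with `generate` in loop.py), one possible workaround, which is our assumption rather than an Arrikto recommendation and which we have not verified, is to have a step turn the dictionary into a list of its keys and loop over that list instead. The function name `generate_keys` and the sample dictionary are hypothetical.

```python
def generate_keys():
    # Hypothetical step body: build a dictionary, but return only a list of
    # its keys, because the loop statement can iterate over lists but not
    # over a dictionary's keys. Dicts keep insertion order, so the order of
    # the returned list is deterministic.
    config = {"alpha": 0.1, "beta": 0.2, "gamma": 0.3}
    return list(config.keys())


if __name__ == "__main__":
    print(generate_keys())
```

In the pipeline function you would then loop over the returned list the same way the examples loop over `res`; any per-key values would have to be looked up inside the downstream step.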
Summary
You have successfully created and run a Kale pipeline with parallel steps produced by loop statements.
What’s Next
Check out how you can set Kubernetes configurations for your steps.