Create Parallel Steps with Loops

This guide walks you through creating Kale pipelines that use loops to generate parallel steps in bulk.

What You’ll Need

  • An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
  • An understanding of how the Kale SDK works.

Procedure

  1. Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Create a new Python file and name it loop.py:

    $ touch loop.py
  3. Copy and paste the following code inside loop.py:

    loop.py
    # Copyright © 2021 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script defines a pipeline with a looping statement.
    """

    import random

    from kale.sdk import step, pipeline


    @step(name="generate")
    def generate():
        gen = random.sample(range(0, 20), 2)
        return gen


    @step(name="printer")
    def squared(value):
        value = float(value)
        print(value * value)


    @pipeline(name="loop", experiment="kale-tutorial")
    def looping_pipeline():
        res = generate()
        for x in res:
            squared(x)


    if __name__ == "__main__":
        looping_pipeline()

    The pipeline above shows how you can iterate over the result of a step to distribute an action over each item as parallel pipeline steps.

    Important

    Kale does not support a map-reduce-like approach: you cannot aggregate the results of parallel steps that were produced by a for loop. This is due to limitations in Kubeflow Pipelines. If this use case is important to you, reach out to your Arrikto contact person.

  4. You can also use pipeline parameters and nested loops in your loop statements. The following snippet summarizes the changes in code:

    loop_param.py
     # Copyright © 2021 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script defines a pipeline with a looping statement.
     """

     import random

     from kale.sdk import step, pipeline


     @step(name="generate")
     def generate():
         gen = random.sample(range(0, 20), 2)
         return gen


     @step(name="printer")
     def squared(value):
         value = float(value)
         print(value * value)


    +@step(name="printer2")
    +def printer(value):
    +    value = int(value)
    +    print(value)
    +
    +
     @pipeline(name="loop", experiment="kale-tutorial")
    -def looping_pipeline():
    +def looping_pipeline(arg_list=[0, 1]):
         res = generate()
         for x in res:
             squared(x)
    +        for i in arg_list:
    +            printer(i)


     if __name__ == "__main__":
         looping_pipeline()

    Important

    Due to a Kubeflow Pipelines limitation (upstream issue) you cannot iterate over items within nested loops if the list comes from outside the parent loop. For example, the following fails:

    list1 = gen_list1_step()
    list2 = gen_list2_step()
    for i in list1:
        for j in list2:
            do(i, j)
  5. Kale also supports a constant list as an input to a for loop statement. The following snippet summarizes the changes in code:

    loop_const.py
     # Copyright © 2021 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script defines a pipeline with a looping statement.
     """

     import random

     from kale.sdk import step, pipeline


     @step(name="generate")
     def generate():
         gen = random.sample(range(0, 20), 2)
         return gen


     @step(name="printer")
     def squared(value):
         value = float(value)
         print(value * value)


     @step(name="printer2")
     def printer(value):
         value = int(value)
         print(value)


    +@step(name="printer3")
    +def mult(value1, value2, value3):
    +    value1 = float(value1)
    +    value2 = float(value2)
    +    value3 = float(value3)
    +    print(value1 * value2 * value3)
    +
    +
    +const_list = [2, 3]
    +
    +
     @pipeline(name="loop", experiment="kale-tutorial")
     def looping_pipeline(arg_list=[0, 1]):
         res = generate()
         for x in res:
             squared(x)
             for i in arg_list:
                 printer(i)
    +            for j in const_list:
    +                mult(x, i, j)


     if __name__ == "__main__":
         looping_pipeline()

    Important

    Kale does not support a constant list initialized inside the pipeline function.

  6. Finally, you can also pass a list directly to the for loop statement. The following snippet summarizes the changes in code:

    loop_item.py
     # Copyright © 2021 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script defines a pipeline with a looping statement.
     """

     import random

     from kale.sdk import step, pipeline


     @step(name="generate")
     def generate():
         gen = random.sample(range(0, 20), 2)
         return gen


     @step(name="printer")
     def squared(value):
         value = float(value)
         print(value * value)


     @step(name="printer2")
     def printer(value):
         value = int(value)
         print(value)


     @step(name="printer3")
     def mult(value1, value2, value3):
         value1 = float(value1)
         value2 = float(value2)
         value3 = float(value3)
         print(value1 * value2 * value3)


    +@step(name="printer4")
    +def hello():
    +    print("Hello!")
    +
    +
     const_list = [2, 3]


     @pipeline(name="loop", experiment="kale-tutorial")
     def looping_pipeline(arg_list=[0, 1]):
         res = generate()
         for x in res:
             squared(x)
             for i in arg_list:
                 printer(i)
                 for j in const_list:
                     mult(x, i, j)
    +                for z in [4, 5]:
    +                    hello()


     if __name__ == "__main__":
         looping_pipeline()
  7. Deploy and run your code as a KFP pipeline:

    $ python3 -m kale loop.py --kfp

    Important

    Kale does not currently support iterating over a dictionary’s keys, as Kubeflow Pipelines does. This may be supported in the future. Contact Arrikto for more information.
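
To get a rough feel for how many step invocations the loops in loop_item.py fan out into, the following sketch replays the same loop structure in plain Python. It does not import or run Kale. `record_step` is a hypothetical stand-in for `@step` that only counts calls, the fixed list `[3, 7]` is a made-up replacement for the random output of `generate`, and each loop is assumed to be nested inside the previous one (as the use of `x`, `i`, and `j` in `mult` suggests). The number of steps that Kale and Kubeflow Pipelines actually create may differ.

```python
from collections import Counter

# Counts how many times each (hypothetical) step is invoked.
calls = Counter()


def record_step(name):
    """Hypothetical stand-in for kale.sdk.step: it only counts invocations."""
    def decorator(func):
        def wrapper(*args):
            calls[name] += 1
            return func(*args)
        return wrapper
    return decorator


@record_step("generate")
def generate():
    # Made-up values; the real step samples two random numbers.
    return [3, 7]


@record_step("printer")
def squared(value):
    return float(value) ** 2


@record_step("printer2")
def printer(value):
    return int(value)


@record_step("printer3")
def mult(value1, value2, value3):
    return float(value1) * float(value2) * float(value3)


@record_step("printer4")
def hello():
    return "Hello!"


const_list = [2, 3]


def looping_pipeline(arg_list=(0, 1)):
    # Same loop structure as loop_item.py, assuming one nesting level per loop.
    res = generate()
    for x in res:
        squared(x)
        for i in arg_list:
            printer(i)
            for j in const_list:
                mult(x, i, j)
                for z in [4, 5]:
                    hello()


looping_pipeline()
print(dict(calls))
```

In this plain-Python run, each nesting level doubles the number of invocations because every list has two items, so deep nesting over longer lists can grow the step count quickly.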

Summary

You have successfully created and run a Kale pipeline with parallel steps produced by loop statements.

What’s Next

Check out how you can set Kubernetes configurations for your steps.