Declare Input and Output Data Types

This guide walks you through annotating the step parameters of a Kale pipeline, using the Kale SDK, to declare the type of each step's input and output data.

What You’ll Need

  • An Arrikto EKF or MiniKF deployment with the default Kale Docker image.
  • An understanding of how the Kale SDK works.

Procedure

  1. Create a new notebook server using the default Kale Docker image. The image will have the following naming scheme:

    gcr.io/arrikto/jupyter-kale-py38:<IMAGE_TAG>

    Note

    The <IMAGE_TAG> varies based on the MiniKF or EKF release.

  2. Connect to the server, open a terminal, create a new Python file, and name it kale_typing.py:

    $ touch kale_typing.py
  3. Copy and paste the following code inside kale_typing.py:

    kale_typing.py
    # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

    """Kale SDK.

    This script runs a pipeline that calculates the outer product of two lists.
    """

    import numpy as np

    from kale.sdk import step, pipeline
    from random import randint


    @step(name="generate-lists")
    def list_gen(len_1, len_2):
        """Generate two random lists of given lengths."""
        list_1 = [randint(1, 100) for _ in range(int(len_1))]
        list_2 = [randint(1, 100) for _ in range(int(len_2))]
        return list_1, list_2


    @step(name="multiply-lists")
    def list_mul(l_1, l_2):
        """Multiply two lists and create two outer-product matrices."""
        return np.outer(l_1, l_2), np.outer(l_2, l_1)


    @step(name="print-matrices")
    def mat_print(mat_1, mat_2):
        """Print two matrices."""
        print(mat_1, mat_2)


    @pipeline(name="typed-pipeline", experiment="kale-tutorial")
    def pipeline_func(len_1=42, len_2=17):
        """The pipeline function."""
        l_1, l_2 = list_gen(len_1, len_2)
        mat_1, mat_2 = list_mul(l_1, l_2)
        mat_print(mat_1, mat_2)


    if __name__ == "__main__":
        pipeline_func()

    In this code example, we define a pipeline with three steps:

    • The first step (list_gen) creates two lists with random integers and returns them.
    • The second step (list_mul) calculates the outer products (l_1 X l_2 and l_2 X l_1) of these two lists and returns them.
    • The third step (mat_print) prints the results of the previous step.
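
To make the data flow concrete, the sketch below reproduces the arithmetic of the steps in plain Python, without Kale or NumPy. The `outer` helper is a hypothetical stand-in for `np.outer`, written out so the computation is visible; it is not part of the tutorial code.

```python
from random import randint


def outer(a, b):
    """Pure-Python stand-in for np.outer: result[i][j] = a[i] * b[j]."""
    return [[x * y for y in b] for x in a]


def list_gen(len_1, len_2):
    """Generate two random lists of the given lengths."""
    return ([randint(1, 100) for _ in range(len_1)],
            [randint(1, 100) for _ in range(len_2)])


# Fixed inputs make the result easy to check by hand.
mat_1 = outer([1, 2], [3, 4, 5])   # 2 x 3 matrix
mat_2 = outer([3, 4, 5], [1, 2])   # 3 x 2 matrix
print(mat_1)
print(mat_2)
```

Swapping the arguments yields the transpose, which is why the second step returns two matrices of different shapes when the lists differ in length.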
  4. Add type hints for each pipeline parameter and each input and output parameter of the pipeline steps. The following snippet summarizes the changes in code:

    kale_typing_hints.py
     # Copyright © 2021-2022 Arrikto Inc. All Rights Reserved.

     """Kale SDK.

     This script runs a pipeline that calculates the outer product of two lists.
     """

     import numpy as np

    +from typing import Tuple, List, NamedTuple
    +from kale.types import MarshalData
     from kale.sdk import step, pipeline
     from random import randint


     @step(name="generate-lists")
    -def list_gen(len_1, len_2):
    +def list_gen(len_1: int = 7, len_2: int = 13) -> Tuple[List, List]:
         """Generate two random lists of given lengths."""
    -    list_1 = [randint(1, 100) for _ in range(int(len_1))]
    -    list_2 = [randint(1, 100) for _ in range(int(len_2))]
    +    list_1 = [randint(1, 100) for _ in range(len_1)]
    +    list_2 = [randint(1, 100) for _ in range(len_2)]
         return list_1, list_2


     @step(name="multiply-lists")
    -def list_mul(l_1, l_2):
    +def list_mul(l_1: List, l_2: List) \
    +        -> NamedTuple("outputs", [("arr_1", MarshalData[np.ndarray]),
    +                                  ("arr_2", MarshalData[np.ndarray])]):
         """Multiply two lists and create two outer-product matrices."""
         return np.outer(l_1, l_2), np.outer(l_2, l_1)


     @step(name="print-matrices")
    -def mat_print(mat_1, mat_2):
    +def mat_print(mat_1: MarshalData[np.ndarray], mat_2: MarshalData[np.ndarray]):
         """Print two matrices."""
         print(mat_1, mat_2)


     @pipeline(name="typed-pipeline", experiment="kale-tutorial")
    -def pipeline_func(len_1=42, len_2=17):
    +def pipeline_func(len_1: int = 42, len_2: int = 17):
         """The pipeline function."""
    -    l_1, l_2 = list_gen(len_1, len_2)
    +    l_1, l_2 = list_gen()  # now we can even use default inputs: 7 and 13
         mat_1, mat_2 = list_mul(l_1, l_2)
         mat_print(mat_1, mat_2)


     if __name__ == "__main__":
         pipeline_func()

    Declaring input and output types allows you to decide which of the following data-passing mechanisms to use for each parameter:

    1. Plain value data passing: Use the underlying KFP mechanism for passing plain Python values that are string-serializable. These plain values can be bool, int, float, str, or small lists or dicts. This mechanism is required in certain cases, such as when using conditional statements or loops.
    2. Marshalling: Use Kale’s marshalling mechanism for larger, more complex objects. This mechanism uses a shared volume where Kale serializes and deserializes data to pass it from one step to the next.
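
The difference between the two mechanisms can be illustrated with the standard library alone. The sketch below is a conceptual analogy, not Kale's or KFP's implementation: `pass_plain_value`, `marshal_to_volume`, and `unmarshal_from_volume` are hypothetical helpers, JSON stands in for string serialization, pickle stands in for whatever serialization backend Kale actually uses, and a temporary directory stands in for the shared volume.

```python
import json
import pickle
import tempfile
from pathlib import Path


def pass_plain_value(value):
    """Round-trip a small value through a string, as a plain-value channel would."""
    serialized = json.dumps(value)  # the value travels as text
    return json.loads(serialized)


def marshal_to_volume(obj, volume, name):
    """Write an object to a file on the (stand-in) shared volume."""
    path = Path(volume) / f"{name}.pkl"
    with path.open("wb") as f:
        pickle.dump(obj, f)
    return path


def unmarshal_from_volume(path):
    """Load an object that a previous step wrote to the volume."""
    with Path(path).open("rb") as f:
        return pickle.load(f)


# Small, string-serializable values fit the plain-value channel.
lengths = pass_plain_value([7, 13])

# Larger or more complex objects go through the volume instead.
with tempfile.TemporaryDirectory() as volume:
    matrix = [[i * j for j in range(100)] for i in range(100)]
    path = marshal_to_volume(matrix, volume, "mat_1")
    restored = unmarshal_from_volume(path)

print(lengths, restored == matrix)
```

The plain-value path keeps the data visible to the orchestrator as text, which is what makes it usable in conditions and loops; the volume path only hands the next step a location to read from.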

    In the above code example, we use both of these mechanisms. We declare the type and the data-passing mechanism of step inputs in two ways:

    • By using regular type hints, such as int in step list_gen or List in step list_mul. These type hints declare that the step expects its input to be a plain value of the declared type.
    • By using Kale’s MarshalData[<type>] type hint, such as MarshalData[np.ndarray] in step mat_print. This type hint declares that the step expects its input to be of type <type>, passed using Kale’s marshalling mechanism.

    Similarly, we declare the type and the data-passing mechanism of step outputs in two ways:

    • By using regular type hints, such as List in step list_gen. This type hint declares that the output is a plain value of the declared type.
    • By using Kale’s MarshalData[<type>] type hint, such as MarshalData[np.ndarray] in step list_mul. This type hint declares that the output is of type <type>, passed using Kale’s marshalling mechanism.

    Downstream steps that consume these outputs must declare the corresponding inputs with the same type hint.
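
As a rough illustration of how annotations alone can select a data-passing mechanism, the sketch below inspects function signatures with the standard library. It does not use Kale: the `MarshalData` class here is a hypothetical stand-in for `kale.types.MarshalData`, and `passing_mechanism` and `describe_inputs` are invented helpers that encode only the rules stated on this page for step parameters.

```python
import inspect
from typing import List


class MarshalData:
    """Hypothetical stand-in for kale.types.MarshalData (illustration only)."""

    def __init__(self, inner=None):
        self.inner = inner

    def __class_getitem__(cls, inner):
        # MarshalData[<type>] returns an instance that remembers <type>.
        return cls(inner)


def passing_mechanism(annotation):
    """Classify one step-parameter annotation."""
    if annotation is inspect.Parameter.empty:
        # A step parameter without a hint is treated like bare MarshalData.
        return "marshalling"
    if annotation is MarshalData or isinstance(annotation, MarshalData):
        return "marshalling"
    return "plain value"


def describe_inputs(func):
    """Map each parameter name of func to its data-passing mechanism."""
    params = inspect.signature(func).parameters
    return {name: passing_mechanism(p.annotation) for name, p in params.items()}


def list_mul(l_1: List, l_2: List):
    ...


def mat_print(mat_1: MarshalData[list], mat_2):
    ...


print(describe_inputs(list_mul))
print(describe_inputs(mat_print))
```

Both parameters of `list_mul` are classified as plain values, while both parameters of `mat_print` fall under marshalling: one through an explicit hint, the other because it has no hint at all.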

    Note

    The MarshalData[<type>] annotation can also:

    • take a string as its <type> argument (e.g., MarshalData["MyCustomType"]).
    • be used without a <type> argument (plain MarshalData).

    Note

    Omitting the type annotation of a step parameter entirely is equivalent to using MarshalData. Omitting it for a pipeline parameter is equivalent to using str.

    Important

    MarshalData type hints are not supported in:

    • pipeline parameters,
    • step inputs with default values.

    In these cases, you can only use primitive types.

    Important

    To declare multiple outputs, use either:

    • a NamedTuple, in which case you also need to declare names for the parameters, or
    • a Tuple, in which case you declare only the types of the parameters and Kale decides on their names.

    The names of the output parameters appear only in the reusable KFP component that corresponds to the step.
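
The practical difference between the two declarations shows up in what each annotation carries. The sketch below uses only the standard `typing` module; the names `NamedOutputs` and `PlainOutputs` are illustrative and do not come from Kale.

```python
from typing import List, NamedTuple, Tuple, get_args

# NamedTuple: every output has an explicit name and a type.
NamedOutputs = NamedTuple("outputs", [("arr_1", List), ("arr_2", List)])

# Tuple: only the types are declared; naming is left to the framework.
PlainOutputs = Tuple[List, List]

print(NamedOutputs._fields)         # the declared output names
print(len(get_args(PlainOutputs)))  # number of declared output types

# A NamedTuple instance still unpacks like an ordinary tuple,
# so the calling code looks the same in both cases.
result = NamedOutputs([1, 2], [3, 4])
arr_1, arr_2 = result
print(result.arr_1, arr_2)
```

Because both forms unpack positionally, choosing between them affects the names recorded for the outputs rather than how the pipeline function consumes them.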

Summary

You have successfully created steps with annotated parameters and run them in pipelines.

What’s Next

The next step is to create and run parallel steps.