How to use the Kale SDK

This guide describes how to use the Kale SDK to write and deploy pipelines to Kubeflow Pipelines.

Kale provides the simplest way possible to convert your Python code into a scalable pipeline, run workflows with hyperparameter tuning, track them automatically using MLMD, and version pipelines by taking Rok snapshots every step of the way.

Whether running code from a Jupyter Notebook, or writing a more complex Python project (for example, using VSCode), Kale has you covered.

You can find extensive documentation about how to use Kale to convert notebooks to pipelines in the following guides:

  1. Codelab: Titanic example - Notebook to single Kubeflow pipeline run
  2. Codelab: Dog Breed example - Notebook to HP tuning Kubeflow pipeline runs orchestrated by Katib

If you are developing a more complex Python project, read on to learn how the Kale SDK allows you to seamlessly convert it to a scalable Kubeflow pipeline.

Software Development Kit (SDK)

Note

The Kale SDK is available in the jupyter-kale, vscode-kale (and other *-kale) images, accessible from Kubeflow’s dashboard after completing the Kubeflow integration guide.

Overview

The aim of the Kale SDK is to allow you to write plain Python code and then be able to convert it to fully reproducible Kubeflow pipelines without making any change to the original source code.

All you have to do is decorate the Python functions that will become pipeline steps, and decorate the single function that acts as the pipeline's main entry point.

Let’s take a look at a simple example:

def process(timestamp):
    import remote_service_lib
    from local_package import data_processor

    data = remote_service_lib.get_data(timestamp)
    train, validate = data_processor(data)
    return train, validate

def validate(train_data, validate_data):
    from local_package import data_validator

    # data_validator raises an exception if data is not valid
    train, validate = data_validator(train_data, validate_data)
    return train, validate

def train(train_data, validate_data, training_iterations):
    from local_package import Model

    model = Model(training_iterations)
    model.train(train_data)
    print(model.predict(validate_data))

In the above snippet we have defined three functions that perform a trivial machine learning task:

  1. Fetch some data from an external service, given an input timestamp, and split it into a training and a validation dataset
  2. Validate the processed data
  3. Train a model and show its performance on the validation data

After you write and test these functions separately, the time comes to make them work together. A common approach is to write another function that orchestrates them. Here is an example:

import sys

def ml_experiment(ts, iters):
    train_data, validate_data = process(ts)
    train_valid, validate_valid = validate(train_data, validate_data)
    train(train_valid, validate_valid, iters)

if __name__ == "__main__":
    # Note: reading arguments directly from `sys.argv` is not a good
    # practice in general. Consider using a dedicated library, like
    # `argparse`
    ts = sys.argv[1]
    iters = int(sys.argv[2])
    ml_experiment(ts, iters)

Let’s now see how the above code can be converted to a pipeline using the Kale SDK:

from kale.sdk import pipeline, step

@step(name="data_processing")
def process(timestamp):
    # <same code as above>

@step(name="data_validation")
def validate(train_data, validate_data):
    # <same code as above>

@step(name="model_training")
def train(train_data, validate_data, training_iterations):
    # <same code as above>

@pipeline(name="model_training", experiment="project_phoenix")
def ml_experiment(ts="19990312", iters=10):
    # <same code as above>

if __name__ == "__main__":
    # provide values to the pipeline parameters
    ml_experiment(ts="...", iters=100)

Instructing Kale to use these functions as pipeline steps is as simple as decorating them with the step decorator. Kale then infers the pipeline structure on its own, by analyzing how the steps are called inside the pipeline-decorated function. This way, defining branching pipelines that run some steps in parallel becomes straightforward.

Writing a diamond-shaped pipeline would look like this:

@pipeline(name="parallel_steps", experiment="cool_pipelines")
def multiple_trainings():
    data = get_data()
    res1 = train_and_predict_random_forest(data)
    res2 = train_and_predict_svm(data)
    compare(res1, res2)

In this example, Kale detects that the two training steps depend only on the output of get_data and can thus run in parallel. A final step, compare, then runs with the results of the two parallel steps as input.

Running a Python script whose entry point is a pipeline-decorated function from the CLI (e.g., python run.py) will just run your Python code, with one important addition: Kale simulates the data passing between functions (steps) that would happen in Kubeflow Pipelines, so that you can detect and fix errors early during development.

Once you are ready to see your pipeline running in Kubeflow, all you need to do is run the same script with the addition of the --kfp flag (e.g., python run.py --kfp).

Pipeline parameters

You can create a parameterized pipeline with the Kale SDK by adding arguments to the function decorated with @pipeline. For example, the pipeline resulting from the compilation of the ml_experiment function above will have two parameters, named ts and iters.

Note that you always need to provide default values for the parameters and that these defaults will end up in the definition of the uploaded pipeline. You can then override them by calling the pipeline function with new argument values, or set different values when creating a Run from the KFP UI.
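
For instance, the following sketch shows how defaults and overrides interact; the step and pipeline names below are illustrative and not part of the SDK:

from kale.sdk import pipeline, step

@step(name="print_config")
def print_config(dataset, epochs):
    # The step receives the (possibly overridden) parameter values as inputs
    print("Training on %s for %s epochs" % (dataset, epochs))

@pipeline(name="parameterized_example", experiment="project_phoenix")
def run_training(dataset="titanic", epochs=10):
    # `dataset` and `epochs` become pipeline parameters; the defaults above
    # end up in the uploaded pipeline definition
    print_config(dataset, epochs)

if __name__ == "__main__":
    # Override the defaults for this particular run. You can also set
    # different values when creating a Run from the KFP UI.
    run_training(dataset="dogbreed", epochs=50)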

Head to the KFP macros guide to learn how to provide dynamic values as input to your pipelines.

Library

kale.sdk.step

The step decorator can be called with the following arguments:

  1. name (str): The name of the resulting pipeline step.
  2. timeout (int): Set a timeout in seconds that starts just before the user-code execution. When the timeout expires, Kale will stop the step execution and mark it as failed. Useful when, for example, the step might get stuck in some blocking call to an external service.
  3. retry_count (int): The number of times the step will be retried, in case of failure.
  4. retry_interval (str): The time interval between retries. Defaults to 0 seconds (retry immediately). In case you specify a simple number, the unit defaults to seconds. You can also specify a different unit, for instance, 2m (2 minutes), 1h (1 hour).
  5. retry_factor (int): The exponential backoff factor applied to retry_interval. For example, if retry_interval="60" (60 seconds) and retry_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on.
  6. retry_max_interval: The maximum interval that can be reached with the backoff strategy.
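
Putting these options together, a step configured with a timeout and an exponential-backoff retry policy could look like the following sketch (the step body and the external library are illustrative):

from kale.sdk import step

@step(name="fetch_data",
      timeout=300,               # fail the step if it runs longer than 5 minutes
      retry_count=3,             # retry up to 3 times in case of failure
      retry_interval="1m",       # wait 1 minute before the first retry
      retry_factor=2,            # then wait 2m, 4m, ... between subsequent retries
      retry_max_interval="10m")  # never wait more than 10 minutes between retries
def fetch_data(timestamp):
    import remote_service_lib

    # A blocking call to an external service is a typical case where
    # timeouts and retries are useful.
    return remote_service_lib.get_data(timestamp)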

kale.sdk.pipeline

The pipeline decorator can be called with the following arguments:

  1. name (str): The name of the resulting pipeline.
  2. timeout (int): Sets a TTL for the entire pipeline run. When the timeout expires, all the running steps are stopped and marked as failed.
  3. experiment (str): The name of the experiment that Kale will create (if it does not already exist) and where the pipeline will run.
  4. description (str): A textual description to be applied to the pipeline.
  5. autosnapshot (bool): Set to True to enable Rok to take snapshots of the volumes before and after the pipeline steps.
  6. steps_defaults (dict): Use this dict to apply the same configuration to all the steps of the pipeline. For example, use {"retry_count": 3} if you want every step of the pipeline to be retried at most 3 times in case of failure.
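
For example, reusing the steps defined earlier, a pipeline that sets most of these options could look like the following sketch:

@pipeline(name="model_training",
          experiment="project_phoenix",
          description="Train and validate the model",
          timeout=3600,                       # stop the entire run after 1 hour
          autosnapshot=True,                  # take Rok snapshots around each step
          steps_defaults={"retry_count": 3})  # retry every step up to 3 times
def ml_experiment(ts="19990312", iters=10):
    train_data, validate_data = process(ts)
    train_valid, validate_valid = validate(train_data, validate_data)
    train(train_valid, validate_valid, iters)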

Limitations

The @pipeline decorated function poses some syntax restrictions, as Kale needs to parse it to create a corresponding pipeline representation. Whenever these restrictions are not met, Kale will try to fail gracefully and inform you of what you need to fix. These are the notable constraints (see the sketch after this list):

  1. You can add input arguments to define pipeline parameters. All input arguments require a default value.
  2. The body of the function does not accept arbitrary Python statements. All you can write is function calls, chaining them together through their return values.
  3. Each line should contain a single function call, optionally assigning its return value(s).
  4. Use tuple unpacking to receive multiple return values.
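
As a sketch, the following pipeline body respects these constraints; the step names (load, split, fit, evaluate) are illustrative and assumed to be @step-decorated functions:

@pipeline(name="constraints_example", experiment="project_phoenix")
def my_pipeline(param="default"):      # every parameter needs a default value
    data = load(param)                 # a single function call per line
    train_set, test_set = split(data)  # tuple unpacking for multiple return values
    model = fit(train_set)
    evaluate(model, test_set)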

Regarding the @step decorated functions:

  1. The function should have all of its import statements inside the function itself; otherwise, Kale will not be able to resolve them when running the Kubeflow pipeline. In other words, the function should be self-contained and not rely on module-level imports.
  2. The function should be defined either in the same file as the pipeline function, or in its own file. In the latter case, the file should live in the same directory as the pipeline function (see the sketch after this list).
  3. The function cannot use other functions or objects defined in the same source file; it should import what it needs from other files.
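
A minimal sketch of a layout that satisfies these rules, assuming a project made of two files (processing.py and run.py, both illustrative names) living in the same directory:

# processing.py
from kale.sdk import step

@step(name="data_processing")
def process(timestamp):
    # Imports live inside the step, so Kale can resolve them at run time
    import remote_service_lib

    return remote_service_lib.get_data(timestamp)

# run.py
from kale.sdk import pipeline

from processing import process

@pipeline(name="layout_example", experiment="project_phoenix")
def my_pipeline(ts="19990312"):
    process(ts)

if __name__ == "__main__":
    my_pipeline(ts="20210101")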

Important

Some of these limitations are temporary and will be addressed in future versions of Kale. Our aim is to allow any Python project to be converted to a scalable Kubeflow pipeline just by using decorators (i.e., without needing to touch your code).

Relative Imports

Take a look back at the first example. We are importing local_package, a package that lives in the current directory and contains some useful helper functions that we want to use in our pipeline.

When running python <script> --kfp, Rok takes an immutable snapshot of the current environment and uses a clone of this snapshot to fully reproduce your environment in the pipeline steps. This way, all the code you develop locally and the libraries you install will be available in the pipeline steps.

Using relative imports works seamlessly between your dev environment and the Kubeflow pipelines execution, making it easy to build complex Python projects that span multiple files and packages, while still being able to orchestrate everything with a few decorators.

Besides importing local packages or scripts, having Rok as a storage management layer also allows you to seamlessly read from other local files. A typical example might be to store some common configuration in a settings.json file that can be parsed by the pipeline steps.

# settings.json
# {
#   "setting1": "value1",
#   "setting2": "value2",
#   ...
# }

@step(name="step1")
def foo():
    import json

    # Parse the JSON file into a dict so individual settings can be accessed
    with open("settings.json", "r") as f:
        settings = json.load(f)
    # configure code based on settings["setting1"]

@step(name="step2")
def bar():
    import json

    with open("settings.json", "r") as f:
        settings = json.load(f)
    # configure code based on settings["setting2"]

General orchestration

The Kale SDK allows you to easily write pipelines that run any kind of workload. Python can run any software or CLI utility, so a pipeline step can become the entry point and orchestrator for any process.
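
For instance, a step can shell out to an external tool using Python's standard subprocess module; the command below is just a placeholder (the guide linked next covers Arrikto’s cmdutils library, which offers a richer interface):

from kale.sdk import step

@step(name="run_external_tool")
def run_external_tool(input_path):
    import subprocess

    # Invoke an arbitrary CLI utility; check=True makes the step fail if
    # the process exits with a non-zero status.
    subprocess.run(["some-cli-tool", "--input", input_path], check=True)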

Navigate to the Command execution guide to learn how you can leverage Arrikto’s cmdutils library to run any process from Python.