Introduction to the Pipelines SDK

Overview of using the SDK to build components and pipelines

The Kubeflow Pipelines SDK provides a set of Python packages that you can use to specify and run your machine learning (ML) workflows. A pipeline is a description of an ML workflow, including all of the components that make up the steps in the workflow and how the components interact with each other.

Note: The SDK documentation here refers to Kubeflow Pipelines with Argo, which is the default. If you are running Kubeflow Pipelines with Tekton instead, follow the Kubeflow Pipelines SDK for Tekton documentation.

SDK packages

The Kubeflow Pipelines SDK includes the following packages:

  • kfp.compiler includes classes and methods for compiling pipeline Python DSL into a workflow YAML spec. Methods in this package include, but are not limited to, the following:

    • kfp.compiler.Compiler.compile compiles your Python DSL code into a single static configuration (in YAML format) that the Kubeflow Pipelines service can process. The Kubeflow Pipelines service converts the static configuration into a set of Kubernetes resources for execution.
  • kfp.components includes classes and methods for interacting with pipeline components. Methods in this package include, but are not limited to, the following:

    • kfp.components.func_to_container_op converts a Python function to a pipeline component and returns a factory function. You can then call the factory function to construct an instance of a pipeline task (ContainerOp) that runs the original function in a container.

    • kfp.components.load_component_from_file loads a pipeline component from a file and returns a factory function. You can then call the factory function to construct an instance of a pipeline task (ContainerOp) that runs the component container image.

    • kfp.components.load_component_from_url loads a pipeline component from a URL and returns a factory function. You can then call the factory function to construct an instance of a pipeline task (ContainerOp) that runs the component container image.

  • kfp.dsl contains the domain-specific language (DSL) that you can use to define and interact with pipelines and components. Methods, classes, and modules in this package include, but are not limited to, the following:

    • kfp.dsl.PipelineParam represents a pipeline parameter that you can pass from one pipeline component to another. See the guide to pipeline parameters.

    • kfp.dsl.component is a decorator for DSL functions that returns a pipeline component (ContainerOp).

    • kfp.dsl.pipeline is a decorator for Python functions that returns a pipeline.

    • kfp.dsl.python_component is a decorator for Python functions that adds pipeline component metadata to the function object.

    • kfp.dsl.types contains a list of types defined by the Kubeflow Pipelines SDK. Types include basic types like String, Integer, Float, and Bool, as well as domain-specific types like GCPProjectID and GCRPath. See the guide to DSL static type checking.

    • kfp.dsl.ResourceOp represents a pipeline task (op) which lets you directly manipulate Kubernetes resources (create, get, apply, …).

    • kfp.dsl.VolumeOp represents a pipeline task (op) which creates a new PersistentVolumeClaim (PVC). It aims to make the common case of creating a PersistentVolumeClaim fast.

    • kfp.dsl.VolumeSnapshotOp represents a pipeline task (op) which creates a new VolumeSnapshot. It aims to make the common case of creating a VolumeSnapshot fast.

    • kfp.dsl.PipelineVolume represents a volume used to pass data between pipeline steps. ContainerOps can mount a PipelineVolume through either the constructor’s pvolumes argument or the add_pvolumes() method.

    • kfp.dsl.ParallelFor represents a parallel for loop over a static or dynamic set of items in a pipeline. Each iteration of the for loop is executed in parallel.

    • kfp.dsl.ExitHandler represents an exit handler that is invoked upon exiting a pipeline. A typical usage of ExitHandler is garbage collection.

    • kfp.dsl.Condition represents a group of ops that are executed only when a certain condition is met. The condition must be determined at runtime, by including at least one task output or PipelineParam in the boolean expression.

  • kfp.Client contains the Python client libraries for the Kubeflow Pipelines API. Methods in this package include, but are not limited to, the following:

    • kfp.Client.create_experiment creates a pipeline experiment and returns an experiment object.
    • kfp.Client.run_pipeline runs a pipeline and returns a run object.
    • kfp.Client.create_run_from_pipeline_func compiles a pipeline function and submits it for execution on Kubeflow Pipelines.
    • kfp.Client.create_run_from_pipeline_package runs a local pipeline package on Kubeflow Pipelines.
    • kfp.Client.upload_pipeline uploads a local file to create a new pipeline in Kubeflow Pipelines.
    • kfp.Client.upload_pipeline_version uploads a local file to create a pipeline version. Follow an example to learn more about creating a pipeline version.
  • Kubeflow Pipelines extension modules include classes and functions for specific platforms on which you can use Kubeflow Pipelines. Examples include utility functions for on premises, Google Cloud Platform (GCP), Amazon Web Services (AWS), and Microsoft Azure.

  • Kubeflow Pipelines diagnose_me modules include classes and functions that help with environment diagnostic tasks.

    • kfp.cli.diagnose_me.dev_env reports on diagnostic metadata from your development environment, such as your Python library version.
    • kfp.cli.diagnose_me.kubernetes_cluster reports on diagnostic data from your Kubernetes cluster, such as Kubernetes secrets.
    • kfp.cli.diagnose_me.gcp reports on diagnostic data related to your GCP environment.
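
Several of the kfp.components methods listed above return a factory function instead of running anything directly. The following plain-Python sketch models that pattern without using the SDK. The names Task and make_factory are hypothetical and exist only for illustration. The real SDK returns ContainerOp tasks and does considerably more.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class Task:
    """Hypothetical stand-in for a pipeline task; not the SDK's ContainerOp."""
    component_name: str
    arguments: Dict[str, Any] = field(default_factory=dict)


def make_factory(func: Callable[..., Any]) -> Callable[..., Task]:
    """Return a factory that records a call to func instead of running it."""
    def factory(**kwargs: Any) -> Task:
        # Nothing executes here: the task only describes what should run later.
        return Task(component_name=func.__name__, arguments=dict(kwargs))
    return factory


def add(a: int, b: int) -> int:
    return a + b


add_op = make_factory(add)   # analogous to converting a function to a component
task = add_op(a=1, b=2)      # analogous to creating a task inside a pipeline
print(task)
```

By analogy, calling a factory function inside a pipeline function adds a step to the pipeline; it does not compute the result at definition time.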

Kubeflow Pipelines CLI tool

The Kubeflow Pipelines CLI tool enables you to use a subset of the Kubeflow Pipelines SDK directly from the command line. The Kubeflow Pipelines CLI tool provides the following commands:

  • kfp diagnose_me runs environment diagnostics with the specified parameters.

    • --json - Indicates that this command must return its results as JSON. Otherwise, results are returned in human-readable format.
    • --namespace TEXT - Specifies the Kubernetes namespace to use. all-namespaces is the default value.
    • --project-id TEXT - For GCP deployments, this value specifies the GCP project to use. If this value is not specified, the environment default is used.
  • kfp pipeline <COMMAND> provides the following commands to help you manage pipelines.

    • get - Gets detailed information about a Kubeflow pipeline from your Kubeflow Pipelines cluster.
    • list - Lists the pipelines that have been uploaded to your Kubeflow Pipelines cluster.
    • upload - Uploads a pipeline to your Kubeflow Pipelines cluster.
  • kfp run <COMMAND> provides the following commands to help you manage pipeline runs.

    • get - Displays the details of a pipeline run.
    • list - Lists recent pipeline runs.
    • submit - Submits a pipeline run.
  • kfp --endpoint <ENDPOINT> - Specifies the endpoint that the Kubeflow Pipelines CLI should connect to.
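
If you drive the CLI from a script, it can help to assemble the command line programmatically. The sketch below builds argument lists from the commands and options listed above. The helper name kfp_argv, the endpoint URL, and the namespace value are assumptions made for illustration, and the sketch only prints the commands; it does not run them.

```python
from typing import List, Optional, Sequence


def kfp_argv(group: str,
             command: Optional[str] = None,
             endpoint: Optional[str] = None,
             options: Sequence[str] = ()) -> List[str]:
    """Build an argument list for the kfp CLI (hypothetical helper)."""
    argv = ["kfp"]
    if endpoint:
        # The endpoint option is given before the command group.
        argv += ["--endpoint", endpoint]
    argv.append(group)
    if command:
        argv.append(command)
    argv.extend(options)
    return argv


# Placeholder endpoint and namespace; replace them with your own values.
list_pipelines = kfp_argv("pipeline", "list", endpoint="http://localhost:8080")
diagnose = kfp_argv("diagnose_me", options=["--json", "--namespace", "kubeflow"])

print(" ".join(list_pipelines))
print(" ".join(diagnose))
```

To execute a command, you could pass one of these lists to subprocess.run from the Python standard library.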

Installing the SDK

Follow the guide to installing the Kubeflow Pipelines SDK.

Building pipelines and components

This section summarizes the ways you can use the SDK to build pipelines and components.

The diagrams provide a conceptual guide to the relationships between the following concepts:

  • Your Python code
  • A pipeline component
  • A Docker container image
  • A pipeline

Creating components from existing application code

This section describes how to create a component and a pipeline outside your Python application, by creating components from existing containerized applications. This technique is useful when you have already created a TensorFlow program, for example, and you want to use it in a pipeline.

[Figure: Creating components outside your application code]

Below is a more detailed explanation of the above diagram:

  1. Write your application code, my-app-code.py. For example, write code to transform data or train a model.

  2. Create a Docker container image that packages your program (my-app-code.py) and upload the container image to a registry. To build a container image based on a given Dockerfile, you can use the Docker command-line interface or the kfp.compiler.build_docker_image method from the Kubeflow Pipelines SDK.

  3. Write a component function using the Kubeflow Pipelines DSL to define your pipeline’s interactions with the component’s Docker container. Your component function must return a kfp.dsl.ContainerOp. Optionally, you can use the kfp.dsl.component decorator to enable static type checking in the DSL compiler. To use the decorator, you can add the @kfp.dsl.component annotation to your component function:

    import kfp

    @kfp.dsl.component
    def my_component(my_param):
      ...
      return kfp.dsl.ContainerOp(
        name='My component name',
        image='gcr.io/path/to/container/image'
      )
    
  4. Write a pipeline function using the Kubeflow Pipelines DSL to define the pipeline and include all the pipeline components. Use the kfp.dsl.pipeline decorator to build a pipeline from your pipeline function. To use the decorator, you can add the @kfp.dsl.pipeline annotation to your pipeline function:

    @kfp.dsl.pipeline(
      name='My pipeline',
      description='My machine learning pipeline'
    )
    def my_pipeline(param_1: kfp.dsl.PipelineParam, param_2: kfp.dsl.PipelineParam):
      my_step = my_component(my_param='a')
    
  5. Compile the pipeline to generate a compressed YAML definition of the pipeline. The Kubeflow Pipelines service converts the static configuration into a set of Kubernetes resources for execution.

    To compile the pipeline, you can choose one of the following options:

    • Use the kfp.compiler.Compiler.compile method:

      kfp.compiler.Compiler().compile(my_pipeline, 'my-pipeline.zip')
      
    • Alternatively, use the dsl-compile command on the command line.

      dsl-compile --py [path/to/python/file] --output my-pipeline.zip
      
  6. Use the Kubeflow Pipelines SDK to run the pipeline:

    client = kfp.Client()
    my_experiment = client.create_experiment(name='demo')
    my_run = client.run_pipeline(my_experiment.id, 'my-pipeline', 
      'my-pipeline.zip')
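
The component function in step 3 accepts my_param but does not show how the value reaches the container. One possible approach, sketched below, passes it as a command-line argument. The helper my_component_kwargs, the python entry point, and the --my-param flag are assumptions for illustration; match them to what your own container image expects.

```python
from typing import Any, Dict


def my_component_kwargs(my_param: Any) -> Dict[str, Any]:
    """Build keyword arguments for kfp.dsl.ContainerOp (hypothetical helper)."""
    return {
        'name': 'My component name',
        'image': 'gcr.io/path/to/container/image',
        # Hypothetical entry point and flag; adjust them to your own program.
        'command': ['python', 'my-app-code.py'],
        'arguments': ['--my-param', my_param],
    }


def my_component(my_param):
    # Imported lazily so that the helper above can be checked without the SDK.
    import kfp
    return kfp.dsl.ContainerOp(**my_component_kwargs(my_param))


# Inspect the arguments that the container would receive.
print(my_component_kwargs('a')['arguments'])
```

Keeping the arguments in a plain dictionary makes it possible to check them locally before you compile the pipeline.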
    

You can also choose to share your pipeline as follows:

Creating components within your application code

This section describes how to create a pipeline component inside your Python application, as part of the application. The DSL code for creating a component therefore runs inside your Docker container.

[Figure: Building components within your application code]

Below is a more detailed explanation of the above diagram:

  1. Write your code in a Python function. For example, write code to transform data or train a model:

    def my_python_func(a: str, b: str) -> str:
      ...
    
  2. Use the kfp.dsl.python_component decorator to convert your Python function into a pipeline component. To use the decorator, you can add the @kfp.dsl.python_component annotation to your function:

    @kfp.dsl.python_component(
      name='My awesome component',
      description='Come and play',
    )
    def my_python_func(a: str, b: str) -> str:
      ...
    
  3. Use kfp.compiler.build_python_component to create a container image for the component.

    # OUTPUT_DIR and TARGET_IMAGE are placeholders; define them for your environment.
    my_op = kfp.compiler.build_python_component(
      component_func=my_python_func,
      staging_gcs_path=OUTPUT_DIR,
      target_image=TARGET_IMAGE)
    
  4. Write a pipeline function using the Kubeflow Pipelines DSL to define the pipeline and include all the pipeline components. Use the kfp.dsl.pipeline decorator to build a pipeline from your pipeline function, by adding the @kfp.dsl.pipeline annotation to your pipeline function:

    @kfp.dsl.pipeline(
      name='My pipeline',
      description='My machine learning pipeline'
    )
    def my_pipeline(param_1: kfp.dsl.PipelineParam, param_2: kfp.dsl.PipelineParam):
      my_step = my_op(a='a', b='b')
    
  5. Compile the pipeline to generate a compressed YAML definition of the pipeline. The Kubeflow Pipelines service converts the static configuration into a set of Kubernetes resources for execution.

    To compile the pipeline, you can choose one of the following options:

    • Use the kfp.compiler.Compiler.compile method:

      kfp.compiler.Compiler().compile(my_pipeline, 'my-pipeline.zip')
      
    • Alternatively, use the dsl-compile command on the command line.

      dsl-compile --py [path/to/python/file] --output my-pipeline.zip
      
  6. Use the Kubeflow Pipelines SDK to run the pipeline:

    client = kfp.Client()
    my_experiment = client.create_experiment(name='demo')
    my_run = client.run_pipeline(my_experiment.id, 'my-pipeline', 
      'my-pipeline.zip')
    

You can also choose to share your pipeline as follows:

Creating lightweight components

This section describes how to create lightweight Python components that do not require you to build a container image. Lightweight components simplify prototyping and rapid development, especially in a Jupyter notebook environment.

[Figure: Building lightweight Python components]

Below is a more detailed explanation of the above diagram:

  1. Write your code in a Python function. For example, write code to transform data or train a model:

    def my_python_func(a: str, b: str) -> str:
      ...
    
  2. Use kfp.components.func_to_container_op to convert your Python function into a pipeline component:

    my_op = kfp.components.func_to_container_op(my_python_func)
    

    Optionally, you can write the component to a file that you can share or use in another pipeline:

    my_op = kfp.components.func_to_container_op(my_python_func, 
      output_component_file='my-op.component')
    
  3. If you stored your lightweight component in a file as described in the previous step, use kfp.components.load_component_from_file to load the component:

    my_op = kfp.components.load_component_from_file('my-op.component')
    
  4. Write a pipeline function using the Kubeflow Pipelines DSL to define the pipeline and include all the pipeline components. Use the kfp.dsl.pipeline decorator to build a pipeline from your pipeline function, by adding the @kfp.dsl.pipeline annotation to your pipeline function:

    @kfp.dsl.pipeline(
      name='My pipeline',
      description='My machine learning pipeline'
    )
    def my_pipeline(param_1: kfp.dsl.PipelineParam, param_2: kfp.dsl.PipelineParam):
      my_step = my_op(a='a', b='b')
    
  5. Compile the pipeline to generate a compressed YAML definition of the pipeline. The Kubeflow Pipelines service converts the static configuration into a set of Kubernetes resources for execution.

    To compile the pipeline, you can choose one of the following options:

    • Use the kfp.compiler.Compiler.compile method:

      kfp.compiler.Compiler().compile(my_pipeline, 'my-pipeline.zip')
      
    • Alternatively, use the dsl-compile command on the command line.

      dsl-compile --py [path/to/python/file] --output my-pipeline.zip
      
  6. Use the Kubeflow Pipelines SDK to run the pipeline:

    client = kfp.Client()
    my_experiment = client.create_experiment(name='demo')
    my_run = client.run_pipeline(my_experiment.id, 'my-pipeline', 
      'my-pipeline.zip')
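
Because a lightweight component starts as an ordinary Python function, you can check it locally before converting it. The sketch below fills in a hypothetical body for my_python_func and keeps its import inside the function so that the function stays self-contained. That is an assumption based on general SDK guidance for lightweight components, not something this page states. The bucket name and the build_op helper are also illustrative.

```python
def my_python_func(a: str, b: str) -> str:
    """Join two path segments; stands in for real transform or training code."""
    # Import inside the function body so the function stays self-contained.
    import posixpath
    return posixpath.join(a, b)


def build_op():
    # Imported lazily so that the local check below runs without the SDK.
    import kfp
    return kfp.components.func_to_container_op(my_python_func)


# Local check before containerizing: call the function like any other.
result = my_python_func('gs://my-bucket', 'data.csv')
print(result)
```

Calling build_op() requires the Kubeflow Pipelines SDK. The local check does not.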
    

Using prebuilt, reusable components in your pipeline

A reusable component is one that someone has built and made available for others to use. To use the component in your pipeline, you need the YAML file that defines the component.

[Figure: Using prebuilt, reusable components in your pipeline]

Below is a more detailed explanation of the above diagram:

  1. Find the YAML file that defines the reusable component. For example, take a look at the reusable components and other shared resources.

  2. Use kfp.components.load_component_from_url to load the component:

    my_op = kfp.components.load_component_from_url('https://path/to/component.yaml')
    
  3. Write a pipeline function using the Kubeflow Pipelines DSL to define the pipeline and include all the pipeline components. Use the kfp.dsl.pipeline decorator to build a pipeline from your pipeline function, by adding the @kfp.dsl.pipeline annotation to your pipeline function:

    @kfp.dsl.pipeline(
      name='My pipeline',
      description='My machine learning pipeline'
    )
    def my_pipeline(param_1: kfp.dsl.PipelineParam, param_2: kfp.dsl.PipelineParam):
      my_step = my_op(a='a', b='b')
    
  4. Compile the pipeline to generate a compressed YAML definition of the pipeline. The Kubeflow Pipelines service converts the static configuration into a set of Kubernetes resources for execution.

    To compile the pipeline, you can choose one of the following options:

    • Use the kfp.compiler.Compiler.compile method:

      kfp.compiler.Compiler().compile(my_pipeline, 'my-pipeline.zip')
      
    • Alternatively, use the dsl-compile command on the command line.

      dsl-compile --py [path/to/python/file] --output my-pipeline.zip
      
  5. Use the Kubeflow Pipelines SDK to run the pipeline:

    client = kfp.Client()
    my_experiment = client.create_experiment(name='demo')
    my_run = client.run_pipeline(my_experiment.id, 'my-pipeline', 
      'my-pipeline.zip')
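
To give a sense of what the YAML file for a reusable component contains, the sketch below embeds a minimal component definition as a string and loads it with kfp.components.load_component_from_text. The component name, its message input, and the alpine image are assumptions chosen for illustration and do not refer to a published component.

```python
# A minimal, illustrative component definition (not a published component).
ECHO_COMPONENT_YAML = """\
name: Echo
description: Prints the message that it receives.
inputs:
- {name: message, type: String}
implementation:
  container:
    image: alpine
    command: [echo, {inputValue: message}]
"""


def load_echo_op():
    # Imported lazily so that the definition can be inspected without the SDK.
    import kfp
    return kfp.components.load_component_from_text(ECHO_COMPONENT_YAML)


print(ECHO_COMPONENT_YAML)
```

With the SDK installed, load_echo_op() should return a factory function whose parameter matches the declared input, so a pipeline function could call it as echo_op(message='hello').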
    

Next steps