Almanak KFP SDK Playbook Documentation

Table of Contents

  1. Introduction
  2. Requirements
  3. Authentication
  4. Error Handling
  5. AlmanakKFPClient Playbook
  6. Example Usage
  7. Pipeline Documentation API
  8. Customization and Extensibility
  9. Limitations and Known Issues

Introduction

The Almanak KFP SDK, part of the broader Almanak MLOps SDK, provides APIs that facilitate writing, managing, documenting, and uploading KFP pipelines as part of the MLE backend.

Use Cases:

  • Triggering KFP pipelines from the backend
  • Writing, documenting, and updating KFP pipelines for backend usage

Requirements

Prerequisites:

  • Python: Version 3.9 or above
  • Kubeflow Pipelines Cluster: Version 1.7 or above
  • SDK: The Almanak MLOps SDK must be installed

Authentication

Authentication with the Almanak KFP SDK requires the following setup:

  • Host: Point to the Almanak KFP cluster
  • OAuth ID: Use the Kubeflow OAuth client ID
  • GCP Service Account Keyfile: Set the JSON keyfile path in the `GOOGLE_APPLICATION_CREDENTIALS` environment variable
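
A minimal environment setup sketch (all values shown are placeholders, not real endpoints or credentials):

import os

# Almanak KFP cluster endpoint (placeholder URL)
os.environ["KFP_HOST"] = "https://kfp.example-almanak-host.dev"
# Kubeflow OAuth client ID (placeholder value)
os.environ["OAUTH_CLIENT_ID"] = "example-client-id.apps.googleusercontent.com"
# Path to the GCP service account JSON keyfile (placeholder path)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/keyfile.json"

In practice these variables would be exported in the shell or injected by the deployment environment rather than set in code.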

AlmanakKFPClient Playbook

Overview

The AlmanakKFPClient class is an extended version of the Kubeflow Pipelines (KFP) Client and provides a comprehensive interface for interacting with Kubeflow Pipelines in the Almanak environment. It includes methods for listing, managing, compiling, uploading, and running pipelines and their versions.

Initialization

import os

from almanak_mlops.kfp import AlmanakKFPClient

client = AlmanakKFPClient(
    host=os.environ["KFP_HOST"],
    client_id=os.environ["OAUTH_CLIENT_ID"],
)

Methods

get_experiment_by_name(experiment_name: Optional[str] = None) -> ApiExperiment
  • Description: Retrieves an experiment by name; if no name is given, returns the default experiment.

  • Example:

    experiment = client.get_experiment_by_name("my-experiment")
get_pipeline_by_name(pipeline_name: str) -> Optional[ApiPipeline]
  • Description: Retrieves the pipeline by its name.

  • Example:

    pipeline = client.get_pipeline_by_name("my-pipeline")
run_pipeline_by_name(pipeline_name: str, job_name: str, pipeline_parameters: Dict[str, Any]) -> ApiRun
  • Description: Runs a pipeline by its name, given a unique job name, parameters, and an optional version.

  • Example:

    run = client.run_pipeline_by_name(
        pipeline_name="hello-world",
        job_name="hello-world-job",
        pipeline_parameters={"run_id": "123456"},
    )
get_pipelines() -> Dict[str, ApiPipeline]
  • Description: Retrieves a dictionary of all pipelines.

  • Example:

    pipelines = client.get_pipelines()
get_all_pipeline_versions_by_name(pipeline_name: str) -> List[ApiPipelineVersion]
  • Description: Retrieves all versions of a specific pipeline.

  • Example:

    versions = client.get_all_pipeline_versions_by_name("my-pipeline")
compile_pipeline(pipeline_function: Callable, ir_yaml_package_path: str)
  • Description: Compiles a pipeline function into the specified YAML package path.

  • Example:

    client.compile_pipeline(my_pipeline_function, "path/to/compiled.yaml")
upload_a_pipeline(...)
  • Description: Uploads a compiled pipeline to the KFP API, either creating a new pipeline or updating an existing one.

  • Example:

    client.upload_a_pipeline("path/to/compiled.yaml", "my-pipeline")
compile_and_upload_pipeline(...)
  • Description: Compiles and uploads a pipeline in a single step.

  • Example:

    client.compile_and_upload_pipeline(my_pipeline_function, "path/to/compiled.yaml", "my-pipeline")
upload_pipeline_source_version_to_gcs(...)
  • Description: Creates and uploads a component spec YAML based on the pipeline's source code to a Google Cloud Storage location.

  • Example:

    url = client.upload_pipeline_source_version_to_gcs("my-pipeline", "version-id", "source-code")

Example Usage

Using the Almanak KFP SDK with an example hello world pipeline:

from kfp.v2 import dsl

PIPELINE_NAME = "hello-world-pipeline"


@dsl.component(
    base_image="europe-west4-docker.pkg.dev/almanak-production/almanak-mlops-sdk/almanak-mlops-sdk",
)
def hello_world(run_id: str):
    print(f"Hello world from run id {run_id}")


@dsl.pipeline(
    name="hello-world-pipeline",
    description="Prints hello world",
)
def hello_world_pipeline(
    run_id: str,
):
    """Using run_id to make pipeline runs unique"""
    hello_world(run_id=run_id)


if __name__ == "__main__":
    from pathlib import Path
    from almanak_mlops.kfp import AlmanakKFPClient
    import os

    pipeline_yaml_location = str(
        Path(__file__).parent / f"{PIPELINE_NAME}.pipeline.yaml"
    )

    client = AlmanakKFPClient(
        host=os.environ["KFP_HOST"],
        client_id=os.environ["OAUTH_CLIENT_ID"],
    )

    client.compile_and_upload_pipeline(
        hello_world_pipeline, pipeline_yaml_location, PIPELINE_NAME
    )

Using the Almanak KFP SDK to trigger a KFP pipeline run:

import os
from uuid import uuid4

from almanak_mlops.kfp import AlmanakKFPClient
from kfp_server_api import ApiRun

client = AlmanakKFPClient(
    host=os.environ["KFP_HOST"],
    client_id=os.environ["OAUTH_CLIENT_ID"],
)

# List all pipelines and their data
print(client.get_pipelines())

# For a given pipeline, list all versions and their data
print(client.get_all_pipeline_versions_by_name("hello-world-rx"))

# Call a pipeline by name and, optionally, a version
run: ApiRun = client.run_pipeline_by_name(
    pipeline_name="hello-world-rx",
    job_name=f"hello-world-rx-{uuid4().hex}",
    pipeline_parameters={"run_id": uuid4().hex},
)
run_id: str = run.id

# Get the status of a pipeline run by its run id
retrieved_run: ApiRun = client.get_run(run_id)
print(retrieved_run)
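
Since AlmanakKFPClient extends the standard KFP client, the inherited wait_for_run_completion method can also be used to block until a run finishes. A sketch, assuming the upstream KFP v1 client API (the timeout is in seconds):

# Block until the run completes or the timeout elapses, then inspect its status
run_detail = client.wait_for_run_completion(run_id, timeout=3600)
print(run_detail.run.status)  # e.g. "Succeeded"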

Pipeline Documentation API

The Almanak KFP SDK includes an API for documenting the structure and parameters of KFP pipelines. This documentation API leverages Pydantic models and JSON Schema to define and describe both external and internal parameters.

Table of Contents

  1. Introduction
  2. JSONSchema Generation
  3. Internal and External Metadata
  4. Generate Pipeline Description
  5. Example Usage

Introduction

The pipeline documentation API enables developers to define detailed metadata for pipelines, including parameter descriptions, default values, sample values, and data types. This information can be used to generate human-readable documentation, as well as to guide the user interface of frontend applications that interact with the pipelines.

JSONSchema Generation

Overview

JSONSchema is used to describe the structure of the external metadata. It can be generated from a Pydantic model using the schema_json() method.

Example:

from typing import Optional

from pydantic import BaseModel

class MyModel(BaseModel):
    id: int
    name: str
    is_active: Optional[bool]

# JSONSchema (provided by the SDK's documentation API) wraps the
# generated schema for use as external metadata
external_metadata_schema = MyModel.schema_json()
external_metadata = JSONSchema(external_metadata_schema)

Internal and External Metadata

Overview

Pipelines can have both internal and external metadata.

  • Internal Metadata: Defines parameters used by the backend. Created using PipelineInternalParameterMetadata.
  • External Metadata: Defines parameters to be displayed on the frontend. Created using PipelineExternalParameterMetadata or a JSONSchema derived from a Pydantic model.

Internal Metadata

from typing import Optional

from pydantic import BaseModel

class PipelineInternalParameterMetadata(BaseModel):
    argument_name: str
    datatype: str = "string"
    usage_description: str
    sample_value: Optional[str]
    default_value: Optional[str]

External Metadata

class PipelineExternalParameterMetadata(BaseModel):
    argument_name: str
    display_name: str
    usage_description: str
    datatype: str = "string"
    sample_value: Optional[str]
    default_value: Optional[str]
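
For illustration, an external parameter entry might be declared like this (the parameter and all field values are hypothetical):

external_param = PipelineExternalParameterMetadata(
    argument_name="simulation_length",  # hypothetical parameter
    display_name="Simulation length",
    usage_description="Number of steps the simulation should run for",
    datatype="integer",
    sample_value="1000",
    default_value="100",
)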

Generate Pipeline Description

The function generate_pipeline_description combines internal and external metadata into a JSON-formatted description.

import json

def generate_pipeline_description(pipeline_metadata: PipelineMetadata) -> str:
    return json.dumps(pipeline_metadata.dict())

Example Usage

Here's how you can combine all of these elements into a full pipeline description:

external_metadata_schema = MyModel.schema_json()
external_metadata = JSONSchema(external_metadata_schema)

internal_metadata = [
    PipelineInternalParameterMetadata(
        argument_name="namespace",
        default_value="rx",
        usage_description="The kfp namespace to use for the simulation",
    ),
]

pipeline_metadata = PipelineMetadata(
    external_metadata=external_metadata,
    internal_metadata=internal_metadata,
    is_public_pipeline=True,
)

pipeline_description = generate_pipeline_description(pipeline_metadata)
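
The result is a plain JSON string whose shape follows the PipelineMetadata fields used above (abridged, illustrative output only):

print(pipeline_description)
# {"external_metadata": {...}, "internal_metadata": [{"argument_name": "namespace", ...}], "is_public_pipeline": true}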