Almanak KFP SDK Playbook Documentation
Table of Contents
- Introduction
- Requirements
- Authentication
- Error Handling
- Pipeline Documentation API
- AlmanakKFPClient Playbook
- Example Usage
- Customization and Extensibility
- Limitations and Known Issues
Introduction
The Almanak KFP SDK, part of the broader Almanak MLOps SDK, provides APIs that facilitate the writing, management, documentation, and uploading of KFP pipelines as part of the MLE backend.
Use Cases:
- Triggering KFP from the backend
- Writing, documenting, and updating KFP for backend usage
Requirements
Prerequisites:
- Python: Version 3.9 or above
- Kubeflow Pipelines Cluster: Version 1.7 or above
- SDK: The Almanak MLOps SDK must be installed
Authentication
Authentication with the Almanak KFP SDK requires the following setup:
- Host: Point to the Almanak KFP cluster
- OAuth ID: Use the Kubeflow OAuth client ID
- GCP Service Account Keyfile: Set the path to the service account JSON keyfile in the `GOOGLE_APPLICATION_CREDENTIALS` environment variable (a minimal setup sketch follows below)
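A minimal sketch of this setup, assuming the environment variable names used in the examples later in this playbook (KFP_HOST, OAUTH_CLIENT_ID) and a hypothetical keyfile path:
import os

# The keyfile path below is a placeholder; point it at your own service account keyfile.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"

kfp_host = os.environ["KFP_HOST"]                # Almanak KFP cluster endpoint
oauth_client_id = os.environ["OAUTH_CLIENT_ID"]  # Kubeflow OAuth client ID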
AlmanakKFPClient Playbook
Overview
The AlmanakKFPClient class is an extended version of the Kubeflow Pipelines (KFP) Client and provides a comprehensive interface for interacting with Kubeflow Pipelines in the Almanak environment. It includes methods for listing, managing, compiling, uploading, and running pipelines and their versions.
Initialization
import os
from almanak_mlops.kfp import AlmanakKFPClient

client = AlmanakKFPClient(
    host=os.environ["KFP_HOST"],
    client_id=os.environ["OAUTH_CLIENT_ID"],
)
Methods
get_experiment_by_name(experiment_name: Optional[str] = None) -> ApiExperiment
Description: Retrieves an experiment by its name. If no name is provided, the default experiment is returned.
Example:
experiment = client.get_experiment_by_name("my-experiment")
get_pipeline_by_name(pipeline_name: str) -> Optional[ApiPipeline]
Description: Retrieves the pipeline by its name.
Example:
pipeline = client.get_pipeline_by_name("my-pipeline")
run_pipeline_by_name(pipeline_name: str, job_name: str, pipeline_parameters: Dict[str, Any]) -> ApiRun
Description: Runs a pipeline by its name, given a unique job name, parameters, and an optional version.
Example:
run = client.run_pipeline_by_name(
pipeline_name="hello-world",
job_name="hello-world-job",
pipeline_parameters={"run_id": "123456"}
)
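Because AlmanakKFPClient extends the standard KFP Client, the upstream run-monitoring helpers should also be available; a hedged sketch using wait_for_run_completion (not documented in this playbook, so treat it as an assumption):
# Block until the run finishes (or the timeout elapses), then inspect its status.
result = client.wait_for_run_completion(run.id, timeout=3600)
print(result.run.status)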
get_pipelines() -> Dict[str, ApiPipeline]
Description: Retrieves a dictionary of all pipelines.
Example:
pipelines = client.get_pipelines()
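Based on the Dict[str, ApiPipeline] return type, the dictionary appears to be keyed by pipeline name; a small sketch iterating it:
# Print each deployed pipeline's name and its server-side ID.
for name, pipeline in client.get_pipelines().items():
    print(name, pipeline.id)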
get_all_pipeline_versions_by_name(pipeline_name: str) -> List[ApiPipelineVersion]
Description: Retrieves all versions of a specific pipeline.
Example:
versions = client.get_all_pipeline_versions_by_name("my-pipeline")
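For example, to pick the most recently created version (assuming the returned ApiPipelineVersion objects expose a created_at timestamp, as in the upstream KFP API):
# Sort versions by creation time and take the newest one.
versions = client.get_all_pipeline_versions_by_name("my-pipeline")
latest = max(versions, key=lambda v: v.created_at)
print(latest.name, latest.created_at)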
compile_pipeline(pipeline_function: Callable, ir_yaml_package_path: str)
Description: Compiles a pipeline function into the specified YAML package path.
Example:
client.compile_pipeline(my_pipeline_function, "path/to/compiled.yaml")
upload_a_pipeline(...)
Description: Uploads a compiled pipeline to the KFP API, either creating a new pipeline or updating an existing one.
Example:
client.upload_a_pipeline("path/to/compiled.yaml", "my-pipeline")
compile_and_upload_pipeline(...)
Description: Compiles and uploads a pipeline in a single step.
Example:
client.compile_and_upload_pipeline(my_pipeline_function, "path/to/compiled.yaml", "my-pipeline")
upload_pipeline_source_version_to_gcs(...)
Description: Creates and uploads a component spec YAML based on the pipeline's source code to a Google Cloud Storage location.
Example:
url = client.upload_pipeline_source_version_to_gcs("my-pipeline", "version-id", "source-code")
Example Usage
Using the Almanak KFP SDK with an example hello world pipeline:
from kfp.v2 import dsl

PIPELINE_NAME = "hello-world-pipeline"


@dsl.component(
    base_image="europe-west4-docker.pkg.dev/almanak-production/almanak-mlops-sdk/almanak-mlops-sdk",
)
def hello_world(run_id: str):
    print(f"Hello world from run id {run_id}")


@dsl.pipeline(
    name="hello-world-pipeline",
    description="Prints hello world",
)
def hello_world_pipeline(
    run_id: str,
):
    """Using run_id to make pipeline runs unique"""
    hello_world(run_id=run_id)


if __name__ == "__main__":
    from pathlib import Path
    from almanak_mlops.kfp import AlmanakKFPClient
    import os

    pipeline_yaml_location = str(
        Path(__file__).parent / f"{PIPELINE_NAME}.pipeline.yaml"
    )
    client = AlmanakKFPClient(
        host=os.environ["KFP_HOST"],
        client_id=os.environ["OAUTH_CLIENT_ID"],
    )
    client.compile_and_upload_pipeline(
        hello_world_pipeline, pipeline_yaml_location, PIPELINE_NAME
    )
Using the Almanak KFP SDK to trigger a KFP pipeline run:
from uuid import uuid4
import os

from almanak_mlops.kfp import AlmanakKFPClient
# ApiRun is the run model exposed by the generated KFP server API client.
from kfp_server_api import ApiRun

client = AlmanakKFPClient(
    host=os.environ["KFP_HOST"],
    client_id=os.environ["OAUTH_CLIENT_ID"],
)

# List all pipelines and their data
print(client.get_pipelines())

# For a given pipeline, list all versions and their data
print(client.get_all_pipeline_versions_by_name("hello-world-rx"))

# Call a pipeline by name and optionally a version
run: ApiRun = client.run_pipeline_by_name(
    pipeline_name="hello-world-rx",
    job_name=f"hello-world-rx-{uuid4().hex}",
    pipeline_parameters={"run_id": uuid4().hex},
)
run_id: str = run.id

# Get the status of a pipeline run by its run ID
retrieved_run: ApiRun = client.get_run(run_id)
print(retrieved_run)
Pipeline Documentation API
The Almanak KFP SDK includes an API for documenting the structure and parameters of KFP pipelines. This documentation API leverages Pydantic models and JSON Schema to define and describe both external and internal parameters.
Table of Contents
- Introduction
- JSONSchema Generation
- Internal and External Metadata
- Generate Pipeline Description
- Example Usage
Introduction
The pipeline documentation API enables developers to define detailed metadata for pipelines, including parameter descriptions, default values, sample values, and data types. This information can be used to generate human-readable documentation, as well as to guide the user interface of the frontend applications that interact with the pipelines.
JSONSchema Generation
Overview
JSONSchema is used to describe the structure of the external metadata. It can be generated from a Pydantic model using the schema_json() method.
Example:
from typing import Optional
from pydantic import BaseModel

class MyModel(BaseModel):
    id: int
    name: str
    is_active: Optional[bool]

# JSONSchema is provided by the Almanak pipeline documentation API (import path not shown here)
external_metadata_schema = MyModel.schema_json()
external_metadata = JSONSchema(external_metadata_schema)
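Since schema_json() returns a plain JSON string, the generated schema can be inspected directly; a quick sketch:
import json

# Pretty-print the generated JSON Schema to see field names, types, and required keys.
print(json.dumps(json.loads(MyModel.schema_json()), indent=2))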
Internal and External Metadata
Overview
Pipelines can have both internal and external metadata.
- Internal Metadata: Defines parameters used by the backend. Created using PipelineInternalParameterMetadata.
- External Metadata: Defines parameters to be displayed on the frontend. Created using PipelineExternalParameterMetadata or a JSONSchema derived from a Pydantic model.
Internal Metadata
from typing import Optional
from pydantic import BaseModel

class PipelineInternalParameterMetadata(BaseModel):
    argument_name: str
    datatype: str = "string"
    usage_description: str
    sample_value: Optional[str]
    default_value: Optional[str]
External Metadata
class PipelineExternalParameterMetadata(BaseModel):
    argument_name: str
    display_name: str
    usage_description: str
    datatype: str = "string"
    sample_value: Optional[str]
    default_value: Optional[str]
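For completeness, a hedged sketch of constructing an external parameter entry directly (the field values here are illustrative, not taken from a real pipeline):
# An example external parameter as it might be surfaced to the frontend.
run_id_param = PipelineExternalParameterMetadata(
    argument_name="run_id",
    display_name="Run ID",
    usage_description="Unique identifier that makes each pipeline run distinct",
    sample_value="123456",
)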
Generate Pipeline Description
The function generate_pipeline_description combines internal and external metadata into a JSON-formatted description.
import json

def generate_pipeline_description(pipeline_metadata: PipelineMetadata) -> str:
    return json.dumps(pipeline_metadata.dict())
Example Usage
Here's how you can combine all of these elements into a full pipeline description:
external_metadata_schema = MyModel.schema_json()
external_metadata = JSONSchema(external_metadata_schema)

internal_metadata = [
    PipelineInternalParameterMetadata(
        argument_name="namespace",
        default_value="rx",
        usage_description="The kfp namespace to use for the simulation",
    ),
]

pipeline_metadata = PipelineMetadata(
    external_metadata=external_metadata,
    internal_metadata=internal_metadata,
    is_public_pipeline=True,
)

pipeline_description = generate_pipeline_description(pipeline_metadata)