Skip to main content

Kubeflow Pipelines Catalog

In the mlops pipelines repository, the src/update_catalog_to_db.py module retrieves all existing kubeflow pipelines and their versions and builds it into a JSON catalog. This JSON catalog is pushed to Alloy DB in both prod and staging environments to be exposed over the backend Hasura.

JSON Catalog Data Model

The data model of the JSON catalog is as described below

from typing import Optional, Dict, List
from datetime import datetime

from pydantic import BaseModel, Field
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, SQLModel


class ResourceReference(BaseModel):
"""Model for a resource reference in a pipeline version of a pipeline in the pipelines catalog"""

key: Dict[str, str]
name: Optional[str]
relationship: str


class PipelineVersion(BaseModel):
"""Model for a pipeline version of a pipeline in the pipelines catalog"""

id: str
name: str
created_at: datetime
parameters: Optional[List[Dict[str, Optional[str]]]]
code_source_url: Optional[str]
package_url: Optional[str]
resource_references: List[ResourceReference]
description: Optional[str]


class Pipeline(BaseModel):
"""Model for a pipeline in the pipelines catalog"""

default_version_id: str
pipeline_versions: Dict[str, PipelineVersion]


class PipelinesCatalog(SQLModel, table=True):
"""SQLModel for the pipelines catalog table. The catalog is a map of all pipelines present in KFP, including their versions.

The dict value of the catalog is a Pipeline model, which contains a dict of PipelineVersion models.
"""

__tablename__ = "pipelines_catalog"
id: Optional[int] = Field(default=None, primary_key=True)
catalog: Dict[str, dict] = Field(sa_column=Column(JSONB))
created_at: datetime = Field(default_factory=datetime.utcnow)