Kubeflow Pipelines Catalog
In the mlops pipelines repository, the src/update_catalog_to_db.py
module retrieves all existing Kubeflow pipelines and their versions and builds them into a JSON catalog. This JSON catalog is pushed to AlloyDB in both the prod and staging environments so it can be exposed through the Hasura backend.
JSON Catalog Data Model
The data model of the JSON catalog is described below:
from datetime import datetime, timezone
from typing import Optional, Dict, List

from pydantic import BaseModel, Field
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, SQLModel
class ResourceReference(BaseModel):
    """Model for a resource reference in a pipeline version of a pipeline in the pipelines catalog."""

    # Reference key; presumably a {"type": ..., "id": ...} pair as produced
    # by the KFP API — TODO confirm against update_catalog_to_db.py.
    key: Dict[str, str]
    # Explicit default keeps this field optional under pydantic v2 as well:
    # v1 implicitly defaulted Optional fields to None, v2 treats an
    # Optional annotation without a default as required.
    name: Optional[str] = None
    relationship: str
class PipelineVersion(BaseModel):
    """Model for a pipeline version of a pipeline in the pipelines catalog."""

    id: str
    name: str
    created_at: datetime
    # Explicit None defaults keep the optional fields optional under
    # pydantic v2 as well: v1 implicitly defaulted Optional fields to None,
    # v2 treats an Optional annotation without a default as required.
    parameters: Optional[List[Dict[str, Optional[str]]]] = None
    code_source_url: Optional[str] = None
    package_url: Optional[str] = None
    resource_references: List[ResourceReference]
    description: Optional[str] = None
class Pipeline(BaseModel):
    """Model for a pipeline in the pipelines catalog.

    Attributes:
        default_version_id: ID of the version to use when no version is
            specified; expected to be a key of ``pipeline_versions``.
        pipeline_versions: Mapping of version ID to its PipelineVersion.
    """

    default_version_id: str
    pipeline_versions: Dict[str, PipelineVersion]
class PipelinesCatalog(SQLModel, table=True):
    """SQLModel for the pipelines catalog table. The catalog is a map of all pipelines present in KFP, including their versions.
    The dict value of the catalog is a Pipeline model, which contains a dict of PipelineVersion models.
    """

    __tablename__ = "pipelines_catalog"

    # Surrogate primary key; None until the row is persisted.
    id: Optional[int] = Field(default=None, primary_key=True)
    # JSONB column holding the serialized catalog (pipeline id -> Pipeline dict).
    catalog: Dict[str, dict] = Field(sa_column=Column(JSONB))
    # Naive UTC timestamp of row creation. datetime.utcnow is deprecated
    # (Python 3.12+); computing an aware UTC now() and stripping tzinfo
    # yields the exact same naive-UTC value, so stored data is unchanged.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )