
Feature Store Playbook using MLOps SDK


Feature Store

The Feature Store enables Data Scientists to:

  1. Understand existing features → Reuse existing features for model experimentation and avoid duplicate feature creation where possible
  2. Create new features for use in model experimentation → Quickly register data sources and SQL queries to generate new features during model experimentation and iteration
  3. Bring features over alongside the production models → No need to recreate features for use with the production model and environment
  4. Track features used by models through the wandb integration → Proper feature versioning for models

Feast

The MLOps SDK creates convenience APIs around Feast and abstracts away a lot of the setup required. The preferred way of interacting with the feature store is through the MLOps SDK, which is what this playbook demonstrates.

The Almanak Feature Store is built upon Feast. Key concepts in Feast:
https://docs.feast.dev/getting-started/concepts/overview

https://docs.feast.dev/getting-started/concepts/data-ingestion

https://docs.feast.dev/getting-started/concepts/entity

https://docs.feast.dev/getting-started/concepts/feature-view

https://docs.feast.dev/getting-started/concepts/feature-retrieval#concepts

https://docs.feast.dev/getting-started/concepts/point-in-time-joins

This playbook will demonstrate the recommended way of using the MLOps SDK for the following interactions with the feature store:

Entities

An entity is a collection of semantically related features.
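
For example, the sender address of an Ethereum transaction can be modelled as an entity. A minimal sketch (this same definition appears later in this playbook):

from feast import Entity

# The join key is the column used to join feature values to entity rows.
from_address_entity = Entity(name="from_address", join_keys=["from_address"])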

Features

A feature is an individual measurable property observed on an entity. For example, a feature of a customer entity could be the number of transactions they have made in an average month.

To formalise features, Data Science will submit the data sources and queries/transformations required to create the features to Data Engineering. These features will be registered into DBT and version controlled.

Features are primarily categorised into experimental features and production features.

  • Experimental features are features created during model experimentation.
    • They are registered to the feature store directly by the data scientists using the MLOps SDK and should not be treated as permanent features.
  • Production features are features that have been determined to be useful during experimentation and are to be formalised for use in training of production models.
    • A production feature may have a change in data source when moving from experimentation to production. E.g. the training data was coming from BigQuery, but in a simulation it would be generated dynamically during the simulation run. In these scenarios, the production feature would have its data source switched over to a Feast PushSource while retaining its original feature views and feature service; see the sketch below. The new data source must have a schema that is compatible with the feature view schema.
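
As a hedged sketch of this switch-over (the names below are illustrative; the actual registration is owned by Data Engineering), a Feast PushSource wraps the original batch source while the feature view and feature service keep their definitions:

from feast import BigQuerySource, PushSource

# Illustrative batch source used during experimentation.
eth_transactions_batch_source = BigQuerySource(
    name="eth_transactions",
    table="almanak-production.crypto_ethereum.transactions",
    timestamp_field="block_timestamp",
)

# In production the feature view's source becomes a PushSource, so the
# simulation can push feature rows dynamically. Pushed rows must conform to
# the feature view schema.
eth_transactions_push_source = PushSource(
    name="eth_transactions_push",
    batch_source=eth_transactions_batch_source,
)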

Feature View

A feature view represents a logical group of feature data from a data source.

Feature Service

Use a feature service as the primary reference to features in model training and inference.

The feature data defined in a feature service should be the final result used to directly train a model. If further transformations are still being applied to the features retrieved from a feature service, those transformations should instead be done before the data enters the feature store. The outputs of the feature store should not be used as input for further data transformation.

A feature service is an object that represents a logical group of features from one or more feature views. A feature service allows features from within a feature view to be used as needed by a model. Users should create one feature service per model, allowing the features used by each model to be tracked. A sketch of both retrieval paths follows the list below.

Feature services are used during

  • The generation of training datasets when querying feature views in order to find historical feature values. A single training dataset may consist of features from multiple feature views.
  • Retrieval of features from the online store. The features retrieved from the online store may also belong to multiple feature views.
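
As a minimal sketch of those two retrieval paths, using the standard Feast APIs that AlmanakFeatureStore inherits (the feature service name is illustrative):

import pandas as pd

from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()
feature_service = store.get_feature_service("transactions_service_v1")

# Historical retrieval: point-in-time join against an entity dataframe.
entity_df = pd.DataFrame({"from_address": ["0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73"]})
entity_df["event_timestamp"] = pd.to_datetime("2023-03-09", utc=True)
training_df = store.get_historical_features(
    entity_df=entity_df, features=feature_service
).to_df()

# Online retrieval: low-latency lookup from the online store.
online_features = store.get_online_features(
    features=feature_service,
    entity_rows=[{"from_address": "0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73"}],
).to_dict()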

Configuring the MLOps SDK to access the Feature Store APIs

Installing the SDK

  1. Git clone the MLOps SDK repo into your working directory https://github.com/almanak-co/almanak-mlops
    cd your/notebook/working/directory
    git clone https://github.com/almanak-co/almanak-mlops
  2. Install the SDK with python setup.py install

Installing credentials to interact with the SDK

The SDK uses Google application credentials to access the feature store cloud components.

  1. In 1Password, open the Almanak Team Vault. Download the github-actions-airflow google application credentials [Data Science] JSON key to your local directory and take note of its path.
  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to your local key file path. The SDK will pick this up; a quick sanity check is sketched below.
    export GOOGLE_APPLICATION_CREDENTIALS=/path/to/the/key/file
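
As a quick sanity check before constructing the store (assumption: the SDK reads this standard variable through the Google auth libraries):

import os

# Fail fast if the credentials variable is missing or points at nothing.
key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
assert os.path.isfile(key_path), (
    "GOOGLE_APPLICATION_CREDENTIALS must point to the downloaded JSON key file"
)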

Interacting with the Feature Store

from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()

The AlmanakFeatureStore object has the APIs that will be used to interact with the feature store. It should be constructed with no parameters. It inherits from the Feast FeatureStore object.

It has the following additional methods:

Feature Store UI launcher

This launches the Feast UI for browsing the contents of the feature store.

This API will not work in a notebook because it launches a local web server (FastAPI) that serves the Feast UI as a webpage. Instead, run it as a script from your local machine’s shell.

AlmanakFeatureStore().launch_ui()

Creating feature views

Create an experimental feature view from a BigQuery sql query

from feast import Entity, Field

AlmanakFeatureStore().create_feature_view(
    feature_view_name: str,
    select_sql: str,
    entities: List[Entity],
    timestamp_field: str,
    schema: Optional[List[Field]],
    owner: str,
    description: str = None,
    tags: Dict[str, str] = None,
)

Create a feature set from a feature view or service and return it as a Ray Dataset for model training

Ray Dataset is the recommended DataFrame format for model serving scalability reasons

AlmanakFeatureStore().materialize_to_ray_dataset(
    entity_df: pd.DataFrame,
    features: Union[List[str], FeatureService],
) -> ray.data.Dataset
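
A hypothetical call, assuming a registered feature service and an entity df with an event_timestamp column (materialize_to_polars below follows the same call pattern):

import pandas as pd

from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()
entity_df = pd.DataFrame({"from_address": ["0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73"]})
entity_df["event_timestamp"] = pd.to_datetime("2023-03-09", utc=True)

# Returns a ray.data.Dataset ready to feed into distributed training.
training_features = store.materialize_to_ray_dataset(
    entity_df,
    store.get_feature_service("transactions_service_v1"),
)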

Creating a feature set as a Polars DataFrame

Note that Ray Dataset, rather than Polars, is the recommended DataFrame format for model serving scalability reasons.

AlmanakFeatureStore().materialize_to_polars(
    entity_df: pd.DataFrame,
    features: Union[List[str], FeatureService],
) -> polars.DataFrame

Logging training features and trained model to wandb

AlmanakFeatureStore().log_model_and_feature_service(
    wandb: ModuleType,
    run: wandb.wandb_run.Run,
    model: Union[onnx.ModelProto, RayModel],
    feature_service: FeatureService,
    entity_df: pd.DataFrame,
    model_output_description: str,
)
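
A hedged usage sketch (model, feature_service and entity_df are placeholders produced by your training code; the run must be an active wandb run):

import wandb

from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()
run = wandb.init(project="your_project_name")

store.log_model_and_feature_service(
    wandb=wandb,
    run=run,
    model=model,  # e.g. an onnx.ModelProto from your training pipeline
    feature_service=feature_service,
    entity_df=entity_df,
    model_output_description="What the model's output represents",
)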

Examining an existing training feature wandb artifact to see the original feature service, feature view and data sources involved

Provide the path to the pickle in the training feature artifact:

from typing import Dict, Any, List

from feast import FeatureService, FeatureView
from almanak_mlops.feature_store import AlmanakFeatureStore

(
    entity_records,
    feature_service,
    feature_views,
) = AlmanakFeatureStore().retrieve_training_features_from_artifact_pickle("")  # pass the pickle path here
entity_records: Dict[str, Any]
feature_service: FeatureService
feature_views: List[FeatureView]

print(f"Feature service: {feature_service}")
print(f"Entity records: {entity_records}")
for feature_view in feature_views:
    print(
        f"""Feature view name: {feature_view.name}
Feature view source: {feature_view.batch_source}
"""
    )

Triggering materialisation of features into the online store for use by production models

This API should be invoked by the model serving environment. It should not be called explicitly by a user or the simulation.

AlmanakFeatureStore().trigger_materialization()

Viewing existing features

There are two places to view existing features:

  1. DAAS
    1. Production features (DBT will generate the feature tables)
    2. Materialised experimental offline features
  2. Feast UI (see the Feature Store UI launcher section above)
    1. Data sources, Data Entities, Feature views, Feature Services

Creating experimental features for model experimentation

See this as a script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/create_experimental_feature_in_notebook.py

There are several ways to set up experimental features during model experimentation:

Creating a feature view directly from a BigQuery SQL statement

  1. Import the following and instantiate the feature_store
    from almanak_mlops.feature_store import AlmanakFeatureStore
    from feast import Field, Entity
    from feast import types
    feature_store = AlmanakFeatureStore()
  2. Create a feature entity
    from_address_entity = Entity(name="from_address", join_keys=["from_address"])
  3. Define the sql query
    sql_query = """
    SELECT
      from_address,
      to_address,
      gas,
      block_timestamp,
      block_number
    FROM
      `almanak-production.crypto_ethereum.transactions`
    LIMIT
      100
    ;
    """

The query result must have a timestamp field. If a timestamp field is not required in the feature, a placeholder timestamp column should be used. You can add it as an additional column as part of your query.
E.g. SELECT *, TIMESTAMP('2000-01-01 00:00:00 UTC') AS `PLACEHOLDER_TIMESTAMP` FROM `your_table`;

The SQL query used to define a data source provides a degree of freedom to transform the underlying table, so that the queried data can serve as a Feast data source that works with entities and event timestamps for Feast’s point-in-time joins with other feature views in a feature service.

For example, the query for a data source can be fashioned so that the last 100 records are always retrieved from a table that has no monotonically increasing column values.

WITH
  numbered_rows AS (
    SELECT
      *,
      ROW_NUMBER() OVER () AS row_number
    FROM
      `almanak-production.presentation_layer.uniswap_v3_swaps`
  )
SELECT
  *,
  'ETH' AS token
FROM
  numbered_rows
ORDER BY
  row_number DESC
LIMIT
  100

4. Call the create_feature_view API on the feature store. This will automatically generate and register the data source, entity and feature view into the feature store.
The schema of the SQL result should be defined in the schema parameter using feast.Field objects. The types should be described with feast.types objects.

The schema of the feature view can be a subset of the schema of the data source it is using

If the timestamp field of the BigQuerySource data source (defined in the previous step) uses placeholder values, exclude that timestamp field from the schema.

The MLOps SDK includes a BigQuery query cost estimator that can wrap around the query string to estimate the cost of the query and warn you if the cost is too high.

from almanak_mlops.bq import estimate_query_cost

# Signature: estimate_query_cost(query: str, query_size_limit_GB=10) -> str

feature_store.create_feature_view(
    feature_view_name="eth_transactions_view_from_query_v1",
    select_sql=estimate_query_cost(sql_query),
    entities=[from_address_entity],
    timestamp_field="block_timestamp",
    schema=[
        Field(name="from_address", dtype=types.String),
        Field(name="to_address", dtype=types.String),
        Field(name="gas", dtype=types.Int64),
        Field(name="block_timestamp", dtype=types.UnixTimestamp),  # exclude this field if it uses placeholder values
        Field(name="block_number", dtype=types.Int64),
    ],
    owner="rx@almanak.co",
    description="A feature view of ethereum transactions",
    tags={"production": "False"},
)

Retrieve existing data sources, entities, feature views and compose them into new feature views and services

  1. Using the Feast UI, you can see a list of existing definitions in the feature store.
  2. Alternatively, you can also list them via these APIs on the feature store
    from almanak_mlops.feature_store import AlmanakFeatureStore
    feature_store = AlmanakFeatureStore()
    feature_store.list_data_sources()
    feature_store.list_entities()
    feature_store.list_feature_views()
    feature_store.list_feature_services()
  3. To retrieve an existing definition for use, invoke the appropriate API
    from feast import BigQuerySource, Entity, FeatureService, FeatureView

    source: BigQuerySource = feature_store.get_data_source(name="eth_transactions_source")
    from_address_entity: Entity = feature_store.get_entity(name="from_address")
    transactions_feature_view: FeatureView = feature_store.get_feature_view(
        name="eth_transactions_view_v1"
    )
    transactions_feature_service: FeatureService = feature_store.get_feature_service(
        name="eth_transactions_feature_service_v1"
    )

4. Use an existing resource to create a new feature view or service

from datetime import timedelta

from almanak_mlops.feature_store import AlmanakFeatureStore
from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field, types

feature_store = AlmanakFeatureStore()

eth_transactions_source: BigQuerySource = feature_store.get_data_source(
    name="eth_transactions_source"
)
from_address_entity: Entity = feature_store.get_entity(name="from_address")

eth_transactions_features = FeatureView(
    name="eth_transactions_view_v1",
    source=eth_transactions_source,
    schema=[
        Field(name="from_address", dtype=types.String),
        Field(name="to_address", dtype=types.String),
        Field(name="gas", dtype=types.Int64),
        Field(name="block_timestamp", dtype=types.UnixTimestamp),
        Field(name="block_number", dtype=types.Int64),
    ],
    ttl=timedelta(days=1),
    entities=[from_address_entity],
    description="A feature view of ethereum transactions",
    tags={"production": "False"},
    owner="rx@almanak.co",
)

transactions_feature_service = FeatureService(
    name="transactions_service_v1",
    features=[eth_transactions_features],
)

5. Apply the experimental feature and service to the feature store

feature_store.apply(objects=[eth_transactions_features, transactions_feature_service])

6. To clean up resources that are no longer required, pass them to apply via the objects_to_delete parameter. Note that with the Feast apply this method inherits, objects_to_delete only takes effect together with partial=False.

feature_store.apply(
    objects=[],
    objects_to_delete=[eth_transactions_features, transactions_feature_service],
    partial=False,
)

Define new data sources, entities, feature views and feature services

  1. Defining the resource

    1. Data source
      Note that the query result/bigquery table must have a timestamp column. This is a requirement for feast data sources. If a table does not have a timestamp column, you can create one with a placeholder value.

      from feast import BigQuerySource

      eth_transactions_source = BigQuerySource(
          name="eth_transactions",
          table="almanak-production.crypto_ethereum.transactions",
          timestamp_field="block_timestamp",
          description="A table listing ethereum transactions",
          owner="rx@almanak.co",
      )

      Alternatively, you can also define a data source from a SQL query.

      from feast import BigQuerySource
      from almanak_mlops.bq import estimate_query_cost

      source_query = "SELECT * FROM `almanak-production.crypto_ethereum.transactions`"
      eth_transactions_source = BigQuerySource(
          name="eth_transactions",
          query=estimate_query_cost(query=source_query, query_size_limit_GB=2),
          timestamp_field="block_timestamp",
          description="A table listing ethereum transactions",
          owner="rx@almanak.co",
      )
    2. Entity

      from feast import Entity
      from_address_entity = Entity(name="from_address", join_keys=["from_address"])
    3. Feature View

      from datetime import timedelta

      from feast import FeatureView, Field
      from feast import types

      eth_transactions_features = FeatureView(
          name="eth_transactions_view_v1",
          source=eth_transactions_source,
          schema=[
              Field(name="hash", dtype=types.String),
              Field(name="from_address", dtype=types.String),
              Field(name="to_address", dtype=types.String),
              Field(name="gas", dtype=types.Int64),
              Field(name="block_timestamp", dtype=types.UnixTimestamp),
              Field(name="block_number", dtype=types.Int64),
          ],
          ttl=timedelta(days=1),
          entities=[from_address_entity],
          description="A feature view of ethereum transactions",
          tags={"production": "False"},
          owner="rx@almanak.co",
      )
    4. Feature Service

      from feast import FeatureService

      transactions_feature_service = FeatureService(
          name="transactions_service_v1",
          features=[eth_transactions_features],
      )
  2. Applying the new resources to the feature store

    from almanak_mlops.feature_store import AlmanakFeatureStore

    AlmanakFeatureStore().apply(
        [
            eth_transactions_source,
            from_address_entity,
            eth_transactions_features,
            transactions_feature_service,
        ]
    )

Training models with experimental features

See this as a script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/demo_fetch_contract_features.py

From existing feature views or services, a feature set will be created as the training data for the model.

  1. From within the model training environment, use the feature store API. Instantiate a feature store object to interact with the feature store
    from almanak_mlops.feature_store import AlmanakFeatureStore

    store = AlmanakFeatureStore()
  2. Create an entity df from which the feature set will be created.
    It should include a timestamp column (event_timestamp) to provide the time range over which the feature set is created from the feature view.
    import pandas as pd

    entity_df = pd.DataFrame.from_dict(
        {
            "address": [
                "0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73",
                "0x239f30aa3e17d352bfefede4c379e9d744538a00",
                "0x08ed8ef6ca0bf831ef952a51bc2ff2568e49100e",
                "0x3f8312f04db82bb0abe0826c701566a227c8deb3",
                "0x7420c6120340e9da3779d154d4f4577ec2ec8eca",
                "0xd48b1f39e70184d3397e195b00faa84e5788fd1b",
                "0xe2edecf85f8abee17c39e83e62bd36c09676bac6",
                "0xb4bc610617a7efbdfdcd946fbe400e6c4253232b",
                "0x9079fbcda2ba70047241e0ff6120ff5fa45355f6",
                "0xb324e697a4e1122d023a2b690b9fc1d14b058ed4",
            ],
        }
    )
    entity_df["event_timestamp"] = pd.to_datetime("2023-03-09", utc=True)
  3. Retrieve the feature view or service
    feature_service = store.get_feature_service("eth_contracts_service_v1")
  4. Pass the entity df and feature view or service to the feature store to retrieve the features as a ray dataset. Use this ray dataset as the training input.
    import ray.data

    feature_config_input = entity_df, feature_service
    training_features: ray.data.Dataset = store.materialize_to_ray_dataset(
        entity_df, feature_service
    )

Register the feature_config_input to wandb as the feature configuration associated with the trained model.

5. Registering the feature view set configuration (feature_config_input) into wandb

Use the procedure described in the WandB Playbook.

import pickle

import wandb

# Initialize W&B run
config = {}  # placeholder for your run configuration
wandb.init(project="your_project_name", config=config)

# your code for creating features and training the model goes here

# store = AlmanakFeatureStore()

store.log_training_features_as_artifact(
    wandb,
    entity_records,
    feature_service,
    name="Training features name",
    type="training_features",
    description="your description here",
    metadata={"version": "your feature view config version"},
    # the optional incremental and use_as arguments are omitted here
)

Creating production features for model serving

  1. With the finalised query and entity df in hand, Data Science team will submit a request to Data Engineering team to register a production feature.
  2. The query will be registered in DBT by Data Engineering and placed on a regular schedule to generate a production feature table in BigQuery.
  3. Data Engineering will register the following in the feature repo and create a pull request on the feature repo
    1. Production feature table: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/data_sources.py
    2. Entity DF: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/entities.py
    3. Feature view using the production feature table and feature entity: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/features.py
    4. Feature service using the feature view: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/feature_services.py
  4. Upon merging of the pull request, the CI/CD on the feature repo will add the production feature to the feature store, making it accessible via the MLOps SDK.
  5. Data Science can now look into the feature store UI or SDK to confirm the production feature is now present.

If a model was trained and registered with an experimental feature service, then when that experimental feature is promoted to production, the name of the production feature service must be identical to that of the experimental feature service. That feature service name will be used by the model serving environment to retrieve the production online features, as sketched below.
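
Sketch (with an illustrative service name shared between experimentation and production):

from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()

# The serving environment resolves features by the feature service name
# recorded at training time; because the production service keeps the
# experimental name, no model-side change is needed.
feature_service = store.get_feature_service("transactions_service_v1")
online_features = store.get_online_features(
    features=feature_service,
    entity_rows=[{"from_address": "0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73"}],
).to_df()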

Using production features with simulations and production models

The procedure in this section has been updated to reflect the change in the Almanak Broker’s business logic.


Step 1: Retrieving models and feature references

The simulation will provide the model serving environment with a collection of WandB model artifact references.

In each artifact, the feature service (which contains the feature references) used by the model is encoded as feature metadata.

The model serving environment will download the artifacts to retrieve the model and the corresponding feature service reference.

Step 2: Identifying the feature views contained in a feature service

The model serving environment will retrieve the feature service object from the feature store using the feature service reference.

From the feature service object, the feature view references and feature references used can be obtained.

Step 3: Materializing online features

The model serving environment will trigger materialization of the features identified by those feature view and feature references into the online store.

Step 4: Triggering materialisation before a simulation run

See this as a script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/demo_almanak_feature_store.py

Trigger feature materialisation to the online store prior to a simulation run to ensure feature data is updated.
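
A minimal sketch of that pre-run refresh, using the trigger API described earlier:

from almanak_mlops.feature_store import AlmanakFeatureStore

# Invoked by the model serving environment before the simulation starts so the
# online store holds up-to-date feature values.
AlmanakFeatureStore().trigger_materialization()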

Examining a FeatureService to determine the feature views and data sources it contains

This is useful when looking at an arbitrary experimental feature logged in wandb as the training feature of a model, in order to understand how the training feature was derived.
Get this script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/examine_feature_view_and_data_source_from_a_feature_service_object.py

from typing import List

from almanak_mlops.feature_store import AlmanakFeatureStore
from feast import FeatureService, FeatureView
from feast.feature_view_projection import FeatureViewProjection

store = AlmanakFeatureStore()

eth_feature_service: FeatureService = store.get_feature_service(
    "eth_contracts_service_v1"
)
feature_view_projections: List[FeatureViewProjection] = (
    eth_feature_service.feature_view_projections
)

for feature_view_projection in feature_view_projections:
    feature_view: FeatureView = store.get_feature_view(feature_view_projection.name)
    feature_view_source = feature_view.batch_source
    print(
        f"""Feature view projection: {feature_view_projection}
Feature view name: {feature_view.name}
Feature view entities: {feature_view.entities}
Features in feature view: {feature_view.features}
Feature view source: {feature_view_source}
"""
    )

Output:

Feature view projection: FeatureViewProjection(name='eth_contracts_view_v1', name_alias='', desired_features=[], features=[block_timestamp-UnixTimestamp, block_number-Int64, block_hash-String], join_key_map={})
Feature view name: eth_contracts_view_v1
Feature view entities: ['contract_address']
Features in feature view: [block_timestamp-UnixTimestamp, block_number-Int64, block_hash-String]
Feature view source: {
  "type": "BATCH_BIGQUERY",
  "timestampField": "block_timestamp",
  "bigqueryOptions": {
    "table": "almanak-production.crypto_ethereum.contracts"
  },
  "name": "eth_contracts_source",
  "description": "A table listing ethereum contracts",
  "owner": "rx@almanak.co"
}