Feature Store Playbook using MLOps SDK
Feature Store
The Feature Store enables Data Scientists to:
- Understand existing features that are present → Reuse existing features for model experimentation and avoid duplicate feature creation where possible
- Create new features for use in model experimentation → Quickly register data sources and SQL queries to generate new features during model experimentation and iteration
- Bring over features alongside the production models → No need to recreate features for use with the production model and environment
- Track features used by models through wandb integration → Proper feature versioning for models
Feast
The MLOps SDK provides convenience APIs around Feast and abstracts away much of the required setup. The preferred way of interacting with the feature store is through the MLOps SDK, as demonstrated in this playbook.
The Almanak Feature Store is built upon Feast. Key concepts in Feast:
https://docs.feast.dev/getting-started/concepts/overview
https://docs.feast.dev/getting-started/concepts/data-ingestion
https://docs.feast.dev/getting-started/concepts/entity
https://docs.feast.dev/getting-started/concepts/feature-view
https://docs.feast.dev/getting-started/concepts/feature-retrieval#concepts
https://docs.feast.dev/getting-started/concepts/point-in-time-joins
This playbook will demonstrate the recommended way of using the MLOps SDK for the following interactions with the feature store:
Entities
An entity is a collection of semantically related features.
Features
A feature is an individual measurable property observed on an entity. For example, a feature of a customer entity could be the number of transactions they make in an average month.
To formalise features, Data Science will submit the data sources and queries/transformations required to create the features to Data Engineering. These features will be registered into DBT and version controlled.
Features are primarily categorised into experimental features and production features.
- Experimental features are features created during model experimentation.
  - They are registered to the feature store directly by the data scientists using the MLOps SDK and should not be treated as permanent features.
- Production features are features that have been determined to be useful during experimentation and are to be formalised for use in training of production models.
  - A production feature may have a change in data source when moving from experimentation to production. E.g. the training data was coming from BigQuery, but in a simulation it would be generated dynamically during the simulation run. In these scenarios, the production feature would have its data source switched over to a Feast PushSource while retaining its original feature views and feature service. The new data source must have a schema compatible with the feature view schema; a sketch of such a switch follows below.
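As a minimal sketch of such a data source switch (the names here are illustrative; the actual batch source definition appears later in this playbook), the experimental batch source is wrapped in a Feast PushSource while the feature view and feature service stay unchanged:
from feast import BigQuerySource, PushSource

# The original batch source used during experimentation
eth_transactions_batch_source = BigQuerySource(
    name="eth_transactions",
    table="almanak-production.crypto_ethereum.transactions",
    timestamp_field="block_timestamp",
)

# A push source wrapping the batch source. At simulation time, rows are
# pushed into it dynamically; the pushed rows must match the feature
# view's schema for the swap to be transparent to the feature service.
eth_transactions_push_source = PushSource(
    name="eth_transactions_push",
    batch_source=eth_transactions_batch_source,
)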
Feature View
A feature view represents a logical group of feature data from a data source.
Feature Service
Use a feature service as the primary reference to features in model training and inference.
The feature data defined in a feature service should be the final result used to directly train a model. If further transformations are still being applied to the features retrieved from a feature service, those transformations should be moved upstream so they happen before the data enters the feature store. The outputs of the feature store should not be used as input for further data transformation.
A feature service is an object that represents a logical group of features from one or more feature views. Feature services allow features from within a feature view to be used as needed by a model. Users can expect to create one feature service per model, allowing for tracking of the features used by models.
Feature services are used during:
- The generation of training datasets when querying feature views in order to find historical feature values. A single training dataset may consist of features from multiple feature views.
- Retrieval of features from the online store. The features retrieved from the online store may also belong to multiple feature views.
Configuring the MLOps SDK to access the Feature Store APIs
Installing the SDK
- Git clone the MLOps SDK repo into your working directory: https://github.com/almanak-co/almanak-mlops
cd your/notebook/working/directory
git clone https://github.com/almanak-co/almanak-mlops
- Install the SDK with:
python setup.py install
Installing credentials to interact with the SDK
The SDK uses Google application credentials to access the feature store cloud components.
- In 1Password, enter the Almanak Team Vault. Download the github-actions-airflow google application credentials [Data Science] JSON key to your local directory and take note of its path.
- Set the bash variable GOOGLE_APPLICATION_CREDENTIALS to your local key file path. The SDK will pick this up.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/the/key/file
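To sanity-check that the credentials resolve, a quick check using the google-auth library (which the SDK's GCP clients rely on):
import google.auth

# Resolves credentials from GOOGLE_APPLICATION_CREDENTIALS; raises
# DefaultCredentialsError if the variable is unset or points to a bad key
credentials, project_id = google.auth.default()
print(f"Authenticated for project: {project_id}")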
Interacting with the Feature Store
from almanak_mlops.feature_store import AlmanakFeatureStore
store = AlmanakFeatureStore()
The AlmanakFeatureStore object has the APIs that will be used to interact with the feature store. It should be constructed with no parameters provided. It inherits from the Feast FeatureStore object.
It has the following additional methods:
Feature Store UI launcher
This launches the Feast UI for browsing the contents of the feature store.
This API will not work in a notebook as it launches a local web server (FastAPI) that serves the Feast UI as a webpage. Instead, run it as a script executed in your local machine's shell.
AlmanakFeatureStore().launch_ui()
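For example, save the call in a small script (the file name is illustrative) and run it from your shell with python launch_feature_store_ui.py:
# launch_feature_store_ui.py -- run from a local shell, not a notebook
from almanak_mlops.feature_store import AlmanakFeatureStore

AlmanakFeatureStore().launch_ui()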
Creating feature views
Create an experimental feature view from a BigQuery SQL query
from feast import Entity, Field
AlmanakFeatureStore().create_feature_view(
feature_view_name: str,
select_sql: str,
entities: List[Entity],
timestamp_field: str,
schema: Optional[List[Field]],
owner: str,
description: str = None,
tags: Dict[str, str] = None,
)
Create a feature set from a feature view or service and return it as a Ray Dataset for model training
Ray Dataset is the recommended DataFrame format for model serving scalability reasons
AlmanakFeatureStore().materialize_to_ray_dataset(
entity_df: pd.DataFrame,
features: Union[List[str], FeatureService],
) -> ray.data.Dataset:
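A minimal usage sketch, assuming the feature service name and entity address that appear elsewhere in this playbook:
import pandas as pd
from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()

# Entity keys plus an event timestamp for the point-in-time join
entity_df = pd.DataFrame(
    {
        "from_address": ["0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73"],
        "event_timestamp": [pd.Timestamp("2023-03-09", tz="UTC")],
    }
)
training_ds = store.materialize_to_ray_dataset(
    entity_df=entity_df,
    features=store.get_feature_service("eth_transactions_feature_service_v1"),
)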
Creating a feature set as a Polars DataFrame
Note that Ray Dataset, rather than Polars, remains the recommended DataFrame format for model serving scalability reasons.
AlmanakFeatureStore().materialize_to_polars(
entity_df: pd.DataFrame,
features: Union[List[str], FeatureService],
) -> polars.DataFrame
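The call mirrors materialize_to_ray_dataset; a sketch assuming the same store and entity_df as the Ray example above:
# Same entity_df as in the Ray Dataset example above
features_df = store.materialize_to_polars(
    entity_df=entity_df,
    features=store.get_feature_service("eth_transactions_feature_service_v1"),
)
print(features_df.head())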
Logging training features and trained model to wandb
AlmanakFeatureStore().log_model_and_feature_service(
wandb: ModuleType,
run: wandb.wandb_run.Run,
model: Union[onnx.ModelProto, RayModel],
feature_service: FeatureService,
entity_df: pd.DataFrame,
model_output_description: str,
)
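A usage sketch, assuming a trained ONNX model saved to disk plus the feature service and entity df used in the examples above (the model path and description strings are illustrative):
import onnx
import pandas as pd
import wandb
from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()
run = wandb.init(project="your_project_name")

entity_df = pd.DataFrame(
    {
        "from_address": ["0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73"],
        "event_timestamp": [pd.Timestamp("2023-03-09", tz="UTC")],
    }
)
store.log_model_and_feature_service(
    wandb=wandb,
    run=run,
    model=onnx.load("model.onnx"),  # path to your trained ONNX model (illustrative)
    feature_service=store.get_feature_service("eth_transactions_feature_service_v1"),
    entity_df=entity_df,
    model_output_description="free-text description of what the model outputs",
)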
Examining an existing training feature wandb artifact to see the original feature service, feature view and data sources involved
Provide the path to the pickle in the training feature artifact:
from typing import Dict, Any, List
from feast import FeatureService, FeatureView
from almanak_mlops.feature_store import AlmanakFeatureStore
(
entity_records,
feature_service,
feature_views,
) = AlmanakFeatureStore().retrieve_training_features_from_artifact_pickle("")  # pass the artifact pickle's local path here
entity_records: Dict[str, Any]
feature_service: FeatureService
feature_views: List[FeatureView]
print(f"Feature service: {feature_service}")
print(f"Entity records: {entity_records}")
for feature_view in feature_views:
    print(
        f"""Feature view name: {feature_view.name}
Feature view source: {feature_view.batch_source}
"""
    )
Triggering materialisation of features into the online store for use by production models
This API should be invoked by the model serving environment. It should not be called explicitly by a user or the simulation.
AlmanakFeatureStore().trigger_materialization()
Viewing existing features
There are two places to view existing features:
- DAAS
  - Production features (DBT will generate the feature tables)
  - Materialised experimental offline features
- Feast UI https://almanak.atlassian.net/wiki/spaces/MLEO/pages/edit-v2/239239171#Feature-Store-UI-launcher
  - Data sources, Data Entities, Feature views, Feature Services
Creating experimental features for model experimentation
See this as a script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/create_experimental_feature_in_notebook.py
There are several key ways to set up experimental features during model experimentation:
Creating a feature view directly from a BigQuery SQL statement
1. Import the following and instantiate the feature_store:
from almanak_mlops.feature_store import AlmanakFeatureStore
from feast import Field, Entity
from feast import types
feature_store = AlmanakFeatureStore()
2. Create a feature entity:
from_address_entity = Entity(name="from_address", join_keys=["from_address"])
3. Define the SQL query:
sql_query = """
SELECT
from_address,
to_address,
gas,
block_timestamp,
block_number
FROM
`almanak-production.crypto_ethereum.transactions`
LIMIT
100
;
"""
The query result must have a timestamp field. If a timestamp field is not required in the feature, a placeholder timestamp column should be used. You can add it as an additional column as part of your query.
E.g. SELECT *, TIMESTAMP('2000-01-01 00:00:00 UTC') AS `PLACEHOLDER_TIMESTAMP` FROM `your_table`;
The SQL query used to define a data source provides a degree of freedom to perform necessary transformations, so that the queried data used as a Feast data source can work with entities and event timestamps for Feast's point-in-time joins with other feature views in a feature service.
For example, a query used for the data source can be fashioned such that the last 100 records are always retrieved from a table that has no monotonically increasing columnar values.
WITH
numbered_rows AS (
SELECT
*,
ROW_NUMBER() OVER () AS row_number
FROM
`almanak-production.presentation_layer.uniswap_v3_swaps` )
SELECT
*,
'ETH' AS token
FROM
numbered_rows
ORDER BY
row_number DESC
LIMIT
100
4. Call the create_feature_view API on the feature store. This will automatically generate and register the data source, entity and feature view into the feature store.
The schema of the SQL result should be defined in the schema parameter with feast.Field objects. The types should be described with feast.types objects.
The schema of the feature view can be a subset of the schema of the data source it is using.
If the timestamp field of the BigQuerySource data source (defined in the previous step) uses placeholder values, exclude that timestamp field from the schema.
The MLOps SDK includes a BigQuery query cost estimator that can wrap around the query string to estimate the cost of the query and warn you if the cost is too high.
from almanak_mlops.bq import estimate_query_cost
# Signature: estimate_query_cost(query: str, query_size_limit_GB=10) -> str
feature_store.create_feature_view(
feature_view_name="eth_transactions_view_from_query_v1",
select_sql=estimate_query_cost(sql_query),
entities=[from_address_entity],
timestamp_field="block_timestamp",
schema=[
Field(name="from_address", dtype=types.String),
Field(name="to_address", dtype=types.String),
Field(name="gas", dtype=types.Int64),
Field(name="block_timestamp", dtype=types.UnixTimestamp), # exclude this timestamp field if it uses placeholder values
Field(name="block_number", dtype=types.Int64),
],
owner="rx@almanak.co",
description="A feature view of ethereum transactions",
tags={"production": "False"},
)
Retrieve existing data sources, entities, feature views and compose them into new feature views and services
1. Using the Feast UI, you can see a list of existing definitions in the feature store.
2. Alternatively, you can also list them via these APIs on the feature store:
from almanak_mlops.feature_store import AlmanakFeatureStore
feature_store = AlmanakFeatureStore()
feature_store.list_data_sources()
feature_store.list_entities()
feature_store.list_feature_views()
feature_store.list_feature_services()
3. To retrieve an existing definition for use, invoke the appropriate API:
from feast import BigQuerySource, Entity, FeatureView, FeatureService
source: BigQuerySource = feature_store.get_data_source(name="eth_transactions_source")
from_address_entity: Entity = feature_store.get_entity(name="from_address")
transactions_feature_view: FeatureView = feature_store.get_feature_view(
name="eth_transactions_view_v1"
)
transactions_feature_service: FeatureService = feature_store.get_feature_service(
name="eth_transactions_feature_service_v1"
)
4. Use an existing resource to create a new feature view or service
from datetime import timedelta
from almanak_mlops.feature_store import AlmanakFeatureStore
from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field, types
feature_store = AlmanakFeatureStore()
eth_transactions_source: BigQuerySource = feature_store.get_data_source(
name="eth_transactions_source"
)
from_address_entity: Entity = feature_store.get_entity(name="from_address")
eth_transactions_features = FeatureView(
name="eth_transactions_view_v1",
source=eth_transactions_source,
schema=[
Field(name="from_address", dtype=types.String),
Field(name="to_address", dtype=types.String),
Field(name="gas", dtype=types.Int64),
Field(name="block_timestamp", dtype=types.UnixTimestamp),
Field(name="block_number", dtype=types.Int64),
],
ttl=timedelta(days=1),
entities=[from_address_entity],
description="A feature view of ethereum transactions",
tags={"production": "False"},
owner="rx@almanak.co",
)
transactions_feature_service = FeatureService(
name="transactions_service_v1",
features=[eth_transactions_features],
)
5. Apply the experimental feature and service to the feature store
feature_store.apply(objects=[eth_transactions_features, transactions_feature_service])
6. To clean up any resources that are no longer required, pass them to apply for deletion in the following manner:
feature_store.apply(objects_to_delete=[eth_transactions_features, transactions_feature_service])
Define new data sources, entities, feature views and feature services
Defining the resources
Data source
Note that the query result/BigQuery table must have a timestamp column. This is a requirement for Feast data sources. If a table does not have a timestamp column, you can create one with a placeholder value.
from feast import BigQuerySource
eth_transactions_source = BigQuerySource(
name="eth_transactions",
table="almanak-production.crypto_ethereum.transactions",
timestamp_field="block_timestamp",
description="A table listing ethereum transactions",
owner="rx@almanak.co",
)
Alternatively, you can also define a data source from a SQL query.
from feast import BigQuerySource
from almanak_mlops.bq import estimate_query_cost
source_query = "SELECT * FROM `almanak-production.crypto_ethereum.transactions`"
eth_transactions_source = BigQuerySource(
name="eth_transactions",
query=estimate_query_cost(query=source_query, query_size_limit_GB=2),
timestamp_field="block_timestamp",
description="A table listing ethereum transactions",
owner="rx@almanak.co",
)
Entity
from feast import Entity
from_address_entity = Entity(name="from_address", join_keys=["from_address"])
Feature View
from feast import FeatureView, Field
from feast import types
from datetime import timedelta
eth_transactions_features = FeatureView(
name="eth_transactions_view_v1",
source=eth_transactions_source,
schema=[
Field(name="hash", dtype=types.String),
Field(name="from_address", dtype=types.String),
Field(name="to_address", dtype=types.String),
Field(name="gas", dtype=types.Int64),
Field(name="block_timestamp", dtype=types.UnixTimestamp),
Field(name="block_number", dtype=types.Int64),
],
ttl=timedelta(days=1),
entities=[from_address_entity],
description="A feature view of ethereum transactions",
tags={"production": "False"},
owner="rx@almanak.co",
)
Feature Service
from feast import FeatureService
transactions_feature_service = FeatureService(
name="transactions_service_v1",
features=[eth_transactions_features],
)
Applying the new resources to the feature store
from almanak_mlops.feature_store import AlmanakFeatureStore
AlmanakFeatureStore().apply(
[
eth_transactions_source,
from_address_entity,
eth_transactions_features,
transactions_feature_service,
]
)
Training models with experimental features
See this as a script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/demo_fetch_contract_features.py
From existing feature views or services, a feature view set will be created as the training data for the model.
1. From within the model training environment, use the feature store API. Instantiate a feature store object to interact with the feature store:
import pandas as pd
import ray.data
from almanak_mlops.feature_store import AlmanakFeatureStore
store = AlmanakFeatureStore()
2. Create an entity df to create a feature view set from a feature view. It should include a timestamp column to provide a time range for the feature view set to be created from the feature view.
entity_df = pd.DataFrame.from_dict(
{
"address": [
"0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73",
"0x239f30aa3e17d352bfefede4c379e9d744538a00",
"0x08ed8ef6ca0bf831ef952a51bc2ff2568e49100e",
"0x3f8312f04db82bb0abe0826c701566a227c8deb3",
"0x7420c6120340e9da3779d154d4f4577ec2ec8eca",
"0xd48b1f39e70184d3397e195b00faa84e5788fd1b",
"0xe2edecf85f8abee17c39e83e62bd36c09676bac6",
"0xb4bc610617a7efbdfdcd946fbe400e6c4253232b",
"0x9079fbcda2ba70047241e0ff6120ff5fa45355f6",
"0xb324e697a4e1122d023a2b690b9fc1d14b058ed4",
],
}
)
entity_df["event_timestamp"] = pd.to_datetime("2023-03-09", utc=True) - Retrieve the feature view or service
feature_service = store.get_feature_service("eth_contracts_service_v1")
4. Pass the entity df and feature view or service to the feature store to retrieve the features as a Ray dataset. Use this Ray dataset as the training input:
feature_config_input = entity_df, feature_service
training_features: ray.data.Dataset = store.materialize_to_ray_dataset(
entity_df, feature_service
)
Register the feature_config_input to wandb as the feature configuration associated with the trained model.
5. Register the feature view set configuration (feature_config_input) into wandb, using the procedure described in the WANDB PlayBook:
import wandb
# Initialize the W&B run; config is your run configuration dict
wandb.init(project="your_project_name", config=config)
# your code for creating features and training the model goes here
# store = AlmanakFeatureStore()
store.log_training_features_as_artifact(
wandb,
entity_records,
feature_service,
name="Training features name",
type="training_features",
description="your description here",
metadata={"version": "your feature view config version"},
# incremental and use_as are optional arguments; see the W&B Artifact API
)
Creating production features for model serving
- With the finalised query and entity df in hand, the Data Science team will submit a request to the Data Engineering team to register a production feature.
- The query will be registered in DBT by Data Engineering and placed on a regular schedule to generate a production feature table in BigQuery.
- Data Engineering will register the following in the feature repo and create a pull request:
- Production feature table: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/data_sources.py
- Entity DF: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/entities.py
- Feature view using the production feature table and feature entity: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/features.py
- Feature service using the feature view: https://github.com/almanak-co/feature-registry/blob/main/feature_repo_europe_west4/feature_services.py
- Upon merging of the pull request, the CI/CD on this feature repo will add the production feature to the feature store, making it accessible via the MLOps SDK.
- Data Science can now look into the feature store UI or SDK to confirm the production feature is now present.
If a model was trained and registered with an experimental feature service, then when that experimental feature is promoted to a production feature, the name of the production feature service must be identical to that of the experimental feature service. That feature service name will be used by the model serving environment to retrieve the production online features.
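For example, if a model was trained against an experimental service named eth_transactions_feature_service_v1, the serving environment resolves the promoted production service by that exact name; a sketch:
from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()
# Works both before and after promotion, because the feature service
# name is preserved when the feature moves to production
feature_service = store.get_feature_service("eth_transactions_feature_service_v1")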
Using production features with simulations and production models
The procedure in this section has been updated to reflect the change of the Almanak Broker’s business logic
Step 1: Retrieving models and feature references
The simulation will provide the model serving environment with a collection of WandB model artifacts references.
In each artifact, the feature service (which contains the feature references) used by the model will be encoded as feature metadata in the artifact
The model serving environment will download the artifacts to retrieve the model and the corresponding feature service reference
Step 2: Identifying feature views contained in a feature service
The model serving environment will retrieve the feature service object from the feature store using the feature service reference.
From the feature service object, the feature view references and feature references used can be obtained.
Step 3: Materializing online features
The model serving environment will trigger materialization of the features into the online store, using the feature view references and feature references.
Step 4: Triggering feature materialisation before a simulation
See this as a script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/demo_almanak_feature_store.py
Trigger feature materialisation to the online store prior to a simulation run to ensure feature data is updated.
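A sketch of the serving-side flow, using the inherited Feast get_online_features API and the from_address entity and feature service name from earlier examples:
from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()
store.trigger_materialization()  # invoked by the model serving environment

# Read the freshly materialised features from the online store
online_features = store.get_online_features(
    features=store.get_feature_service("eth_transactions_feature_service_v1"),
    entity_rows=[{"from_address": "0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73"}],
).to_dict()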
Examining a FeatureService to determine the feature views and data sources it contains
This is useful when looking at an arbitrary experimental feature logged in wandb as the training feature of a model, in order to understand how the training feature was derived.
Get this script here: https://github.com/almanak-co/feature-registry/blob/main/demo_clients/examine_feature_view_and_data_source_from_a_feature_service_object.py
from typing import List
from almanak_mlops.feature_store import AlmanakFeatureStore
from feast import FeatureService, FeatureView
from feast.feature_view_projection import FeatureViewProjection
store = AlmanakFeatureStore()
eth_feature_service: FeatureService = store.get_feature_service(
"eth_contracts_service_v1"
)
feature_view_projections: List[FeatureViewProjection] = (
    eth_feature_service.feature_view_projections
)
for feature_view_projection in feature_view_projections:
    feature_view: FeatureView = store.get_feature_view(feature_view_projection.name)
    feature_view_source = feature_view.batch_source
    print(
        f"""Feature view projection:{feature_view_projection}
Feature view name: {feature_view.name}
Feature view entities: {feature_view.entities}
Features in feature view: {feature_view.features}
Feature view source: {feature_view_source}
"""
    )
Output:
Feature view projection:FeatureViewProjection(name='eth_contracts_view_v1', name_alias='', desired_features=[], features=[block_timestamp-UnixTimestamp, block_number-Int64, block_hash-String], join_key_map={})
Feature view name: eth_contracts_view_v1
Feature view entities: ['contract_address']
Features in feature view: [block_timestamp-UnixTimestamp, block_number-Int64, block_hash-String]
Feature view source: {
"type": "BATCH_BIGQUERY",
"timestampField": "block_timestamp",
"bigqueryOptions": {
"table": "almanak-production.crypto_ethereum.contracts"
},
"name": "eth_contracts_source",
"description": "A table listing ethereum contracts",
"owner": "rx@almanak.co"
}