Almanak MLOps Playbook
Overview of MLOps
Machine Learning Operations (MLOps) is a set of practices designed to streamline the process of deploying, monitoring, and maintaining machine learning models in production environments. This playbook provides a guide on implementing MLOps, focusing on model and feature lifecycle management.
Goals and Objectives
- Facilitate collaboration between data scientists, machine learning engineers and stakeholders.
- Streamline the model and feature lifecycle, from creation to deployment.
- Ensure model and feature quality, maintainability, and scalability.
- Leverage distributed compute to execute arbitrary functions and services required in the model development and serving lifecycle.
Who is this for
This playbook is designed for data scientists, data engineers and machine learning engineers working on training, deploying, and maintaining ML models using the Almanak MLOps SDK and WandB.
Feature Lifecycle
See the Feature Store Playbook for more details on using the Feast Feature Store with the MLOps SDK.
What is a feature?
A feature is an individual measurable property observed on an entity. For example, a feature of a customer entity could be the number of transactions they make in an average month.
What is an entity?
An entity is a collection of semantically related features.
Experimental features
- Used for model experimentation
- Feature Engineering:
- Identify relevant features based on domain knowledge, exploratory data analysis, and feature importance techniques.
- Transform raw data into features suitable for ML models (e.g., scaling, encoding, imputation).
- Keep track of the original raw data source and the transformation query/code used to create the features. The data source and transformation logic will be supplied to Data Engineering when promoting the feature to a production feature
- Register the feature tables as data sources into the feature store
- Define feature entities from the feature tables
- Create a feature view from entities, features and a data source
- A feature view represents a logical group of feature data from a data source
- Create a feature service from one or more feature views.
- A model’s input features should be captured by a feature service. For each model, create a feature service and associate it with the model (see the Feast definition sketch after this list)
- Feature Validation
- Ensure data consistency, quality, and relevance of features through statistical tests and visualisations
- Feature Versioning
- Register the model’s features into the model’s metadata using the register_metadata_to_model method
- Log the feature service alongside the model in WandB as a single artifact
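For reference, the Feast objects described above (data source, entity, feature view, feature service) can be defined in a few lines. The following is a minimal sketch assuming a recent Feast release; the BigQuery table, field names, view name, and TTL are hypothetical placeholders, while the feature service name matches the one used in the Data Preparation example later in this playbook.
from datetime import timedelta
from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field
from feast.types import Float64, Int64

# Hypothetical raw feature table registered as a data source
contract_source = BigQuerySource(
    name="eth_contracts_source",
    table="almanak.features.eth_contracts",
    timestamp_field="event_timestamp",
)

# Entity: the key the features are joined on
address = Entity(name="address", join_keys=["address"])

# Feature view: a logical group of features from the data source
eth_contracts_view = FeatureView(
    name="eth_contracts_view",
    entities=[address],
    ttl=timedelta(days=1),
    schema=[
        Field(name="tx_count", dtype=Int64),
        Field(name="avg_gas_used", dtype=Float64),
    ],
    source=contract_source,
)

# Feature service: captures the full set of input features for one model
eth_contracts_service_v1 = FeatureService(
    name="eth_contracts_service_v1",
    features=[eth_contracts_view],
)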
Production Features
- Used for creating and serving production models
- Establish criteria for promoting experimental features to production
- Data Science will make a production feature request with Data Engineering
- The original raw data source, transformation query/code and Feast feature configuration (data sources, entities, feature views and feature service) will be submitted as part of that request
- Data Engineering will
- Backfill historical data for the new production feature if required
- Create a scheduled DBT query that transforms the raw data source into a production feature table
- Register the production feature table into DAAS
- DBT acts as the data transformation and data model versioning layer for the raw data source
- Update the feature store repo with:
- The feast definitions to define it as a production feature
- Register the production feature view into the feature materialisation cloud composer dag located in the feature registry github repo
- Upon merge to main, the production feature view can be materialised to the online store by triggering materialisation from the MLOps feature store SDK, and its features can be retrieved as historical features from the offline store
- Upon update of the feature repo, the production feature will be viewable through DAAS (as a production feature table) and as a production feature service through the Feast UI (see the Feature Store Playbook for more details)
- Upon confirming the production feature is accurate, the production feature request can be wrapped up
Maintenance and Monitoring
- Monitor feature drift and data quality issues to detect data problems and determine when new model weights are needed
Quality Assurance
- Set up monitoring dashboards and alerts for feature statistics and model performance metrics.
- Periodically review and update feature definitions, data sources, and transformations as needed.
Model Lifecycle
Use WandB SDK to track and log your experiments
- Runs https://docs.wandb.ai/guides/runs
- A single unit of computation logged by W&B is called a Run.
- Consider a W&B Run as an atomic element of your whole project. You should create and initiate a new Run if you change a hyperparameter, use a different model, create a W&B Artifact and so on.
- Use W&B Runs for tasks such as:
- Each time you train a model.
- Log data or a model as a W&B Artifact.
- Download a W&B Artifact.
- Anything you log with wandb.log is recorded in that Run.
- Experiments https://docs.wandb.ai/guides/track
- wandb.init(): Initialize a new run at the top of your script. This returns a Run object and creates a local directory where all logs and files are saved, then streamed asynchronously to a W&B server. If you want to use a private server instead of our hosted cloud server, we offer Self-Hosting.
- wandb.config: Save a dictionary of hyperparameters such as learning rate or model type. The model settings you capture in config are useful later to organize and query your results.
- wandb.log(): Log metrics over time in a training loop, such as accuracy and loss. By default, when you call wandb.log it appends a new step to the history object and updates the summary object.
- wandb.log_artifact: Save outputs of a run, like the model weights or a table of predictions. This lets you track not just model training, but all the pipeline steps that affect the final model.
- history: An array of dictionary-like objects that tracks metrics over time. These time series values are shown as default line plots in the UI.
- summary: By default, the final value of a metric logged with wandb.log(). You can set the summary for a metric manually to capture the highest accuracy or lowest loss instead of the final value. These values are used in the table and in plots that compare runs; for example, you could visualize the final accuracy for all runs in your project.
- A minimal tracking sketch using these primitives appears after this list.
- Artifacts https://docs.wandb.ai/guides/artifacts
- Use W&B Artifacts to track datasets, models, dependencies, and results through each step of your machine learning pipeline. Artifacts make it easy to get a complete and auditable history of changes to your files.
- Artifacts can be thought of as a versioned directory. Artifacts are either an input of a run or an output of a run.
- WandB Sweeps https://docs.wandb.ai/guides/sweeps
- Tool for automated hyperparameter tuning
- Model Registry https://docs.wandb.ai/guides/models
- Create Registered Models to organize your best model versions for a given task
- Track a model moving into staging and production
- See a history of all changes, including who moved a model to production
- Features:
- Model Versioning
- Model Lineage
- Model Lifecycle
- Data Visualisation https://docs.wandb.ai/guides/data-vis
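The W&B primitives above combine into a short tracking script. The following is a minimal sketch; the project name, hyperparameters, placeholder metrics, and the model.onnx path are illustrative only, not Almanak conventions.
import wandb

# Hypothetical project and hyperparameters for illustration
run = wandb.init(project="almanak-experiments", config={"learning_rate": 0.01, "epochs": 5})

for epoch in range(run.config.epochs):
    # placeholder metrics standing in for real training output
    train_loss = 0.5 / (epoch + 1)
    val_accuracy = 0.8 + 0.02 * epoch
    wandb.log({"train_loss": train_loss, "val_accuracy": val_accuracy})

# Log the trained weights as a versioned artifact
artifact = wandb.Artifact("my-model", type="model")
artifact.add_file("model.onnx")  # path to a serialized model produced by your training step (placeholder)
run.log_artifact(artifact)
run.finish()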
Data Preparation
Ray Datasets is the recommended Dataframe format for dealing with features. It can be used with Ray Train and Ray Tune for model training and hyperparameter tuning respectively
- Training features from a feature service should be retrieved as a Ray Dataset using the MLOps SDK
from typing import Tuple
from ray.data import Dataset
import pandas as pd
from almanak_mlops.feature_store import AlmanakFeatureStore


def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    store = AlmanakFeatureStore()
    entity_df = pd.DataFrame.from_dict(
        {
            "address": [
                "0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73",
                "0x239f30aa3e17d352bfefede4c379e9d744538a00",
            ],
        }
    )
    entity_df["event_timestamp"] = pd.to_datetime("2023-03-09", utc=True)
    dataset = store.materialize_to_ray_dataset(
        entity_df, store.get_feature_service("eth_contracts_service_v1")
    )
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    test_dataset = valid_dataset.drop_columns(cols=["target"])
    return train_dataset, valid_dataset, test_dataset
- Pandas APIs to Ray Dataset APIs https://docs.ray.io/en/latest/data/api/from_other_data_libs.html#for-pandas-users
- Ray Dataset APIs https://docs.ray.io/en/latest/data/api/api.html
- If you are dealing with a model training library that cannot work with Ray Dataset, the dataset can be exported to other common Dataframe formats (see the sketch after this list)
- We understand that there are scenarios where some new experimental data was just obtained and will take time to be onboarded into BigQuery to be used as a feature data source.
You can still directly load that data in as a Ray Dataset to train an experimental model to check if the feature data is useful. Once that feature data is verified to be useful, do put in a data request to Data Engineering to onboard that data, such that it is now accessible through the feature store and formally defined as a feature data source.
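As a minimal sketch of the export path mentioned above, using standard Ray Data APIs; the toy in-memory dataset is a placeholder for features retrieved from the feature store.
import ray

# Toy dataset standing in for features retrieved from the feature store
dataset = ray.data.from_items([{"a": i, "target": i % 2} for i in range(100)])

# Export to a single pandas DataFrame (only for datasets that fit in memory)
df = dataset.to_pandas()

# Or stream batches as pandas DataFrames for larger datasets
for batch in dataset.iter_batches(batch_size=32, batch_format="pandas"):
    pass  # feed each batch to the training library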
What problems do Ray Datasets solve?
Ray Datasets aims to solve the problems of slow, resource-inefficient, unscalable data loading and preprocessing pipelines for two core use cases:
Model training: resulting in poor training throughput and low GPU utilization as the trainers are bottlenecked on preprocessing and data loading.
Batch inference: resulting in poor batch inference throughput and low GPU utilization.
In order to solve these problems without sacrificing usability, Ray Datasets simplifies parallel and pipelined data processing on Ray, providing a higher-level API while internally handling data batching, task parallelism and pipelining, and memory management.
Model Training
It is highly encouraged to use model libraries shipped with Ray Train for scalability reasons
- Use the Ray Dataset to perform distributed model training with Ray Train (a minimal trainer sketch follows this list)
- Trainers in Ray Train https://docs.ray.io/en/latest/ray-air/trainers.html
- Ray Train model training examples https://docs.ray.io/en/latest/ray-air/examples/
- There are three broad categories of Trainers that Ray AI Runtime (AIR) offers:
- Deep Learning Trainers (Pytorch, Tensorflow, Horovod)
- Tree-based Trainers (XGBoost, LightGBM)
- Other ML frameworks (Hugging Face, Scikit-Learn, RLlib)
- If Ray Train does not have a model that suits the solution required, distributed compute can still be applied during the model training process. See more here: https://almanak.atlassian.net/wiki/spaces/MLEO/pages/249167954/Almanak+MLOps+Playbook#Using-distributed-compute-to-run-python-functions-and-services-with-Ray-Core
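As a minimal sketch of a tree-based trainer from the list above, assuming the Ray AIR XGBoostTrainer API and reusing the datasets returned by prepare_data() from the Data Preparation section; the params and worker count are placeholders.
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# train_dataset and valid_dataset come from the prepare_data() example above
train_dataset, valid_dataset, test_dataset = prepare_data()

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    label_column="target",
    params={"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    datasets={"train": train_dataset, "valid": valid_dataset},
)
result = trainer.fit()
print(result.metrics)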
Hyperparameter Tuning
What is hyperparameter tuning?
Hyperparameter tuning is the process of selecting the best combination of hyperparameters for a model to achieve optimal performance on a specific task. Hyperparameters are adjustable parameters that are not learned from data but determined prior to training.
Hyperparameter tuning involves trying out different combinations of hyperparameters, training the model with each set of hyperparameters, and evaluating the performance of the resulting models. The objective is to find the hyperparameters that result in the best performance on a validation set.
How Does Automated Hyperparameter Tuning Work?
- Configure hyperparameter optimisation with the following attributes
- Hyperparameter search space
- Metric and mode for measuring performance of objective function
- Method for iterating through search space
- Creating an objective function that wraps around the model training function and model validation function
- The objective function will retrieve hyperparameter values from the config reference and use it to configure the model training function
- The objective function will return the model score
- Passing the objective function to an optimisation function that will iteratively run the objective function to train the model over different hyperparameters in the search space. The validation metric will be used in a feedback loop to approach the optimal hyperparameter for model training.
Hyperparameter Tuning Tools
Use these tools to perform distributed hyperparameter tuning
- (Recommended) Ray Tune https://docs.ray.io/en/latest/tune/index.html
- Allows for distributed hyperparameter tuning on the ray cluster
- Closely integrates with Ray Datasets and Ray Train
- WandB Sweeps https://docs.wandb.ai/guides/sweeps
- Supports multi-cpu and multi-gpu hyperparameter tuning on local machine
Example: Performing distributed hyperparameter tuning with a quadratic model
- Ray Tune
- Write the tuning script
tune_script.py
import argparse

import ray
from ray import tune

parser = argparse.ArgumentParser()
parser.add_argument("--address")
args = parser.parse_args()

ray.init(address=args.address)


def objective_function(config):
    score = config["a"] ** 2 + config["b"]
    return {"score": score}


search_space = {
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective_function, param_space=search_space)
results = tuner.fit()
print(results.get_best_result(metric="score", mode="min").config)
- Submit the tune script to the Ray Cluster by providing the ray address
ray submit tune-default.yaml tune_script.py -- --ray-address=XXXX:6379
- (Optional) This script can also be executed on your local machine’s ray cluster when debugging
- WandB Sweep
- Parallelising tuning across multi-cpu and multi-gpu: https://docs.wandb.ai/guides/sweeps/parallelize-agents
import wandb

sweep_configuration = {
    "name": "sweep",
    "metric": {"goal": "minimize", "name": "validation_score"},
    "method": "grid",
    "parameters": {
        "a": {"values": [0.001, 0.01, 0.1, 1.0]},
        "b": {"values": [1, 2, 3]},
    },
}

sweep_id = wandb.sweep(sweep=sweep_configuration, project="my-first-sweep")


def objective_function():
    wandb.init()
    # note that we read values from `wandb.config` instead of defining hard values
    a = wandb.config.a
    b = wandb.config.b
    # use a real validation score when working with ML models; the logged key
    # must match the metric name in the sweep configuration
    score = a**2 + b
    wandb.log(
        {
            "a": a,
            "b": b,
            "validation_score": score,
        }
    )


wandb.agent(sweep_id, function=objective_function, count=10)
Model Validation and Evaluation
- Decide on a model evaluation metric and use it to score models
- Perform model validation (a minimal scoring sketch follows)
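A minimal, hedged scoring sketch, assuming a fitted binary classifier exposing predict_proba and the valid_dataset from the Data Preparation example; ROC AUC is only one possible choice of metric.
from sklearn.metrics import roc_auc_score

# valid_dataset comes from the prepare_data() example in the Data Preparation section
valid_df = valid_dataset.to_pandas()
features = valid_df.drop(columns=["target"])

# `model` is assumed to be a fitted estimator with a predict_proba method
predictions = model.predict_proba(features)[:, 1]
validation_score = roc_auc_score(valid_df["target"], predictions)
print(f"validation AUC: {validation_score:.4f}")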
Model Registration
- Log the model and features to WandB using the MLOps SDK’s log_model_and_feature_service API (a usage sketch follows this list)
AlmanakFeatureStore().log_model_and_feature_service(
    wandb: ModuleType,
    run: wandb.wandb_run.Run,
    model: Union[onnx.ModelProto, RayModel],
    feature_service: FeatureService,
    entity_df: pd.DataFrame,
    model_output_description: str,
    model_name: Optional[str] = None,
)
- This API does the following:
- Retrieves the feature views for the feature service and converts it to feature metadata
- Embeds the following metadata into the onnx model:
- List of feature’s name, data type and description
- Description of the model output
- Saves the onnx model to a temporary local file and logs it to wandb as a model artifact
- Creates high-resolution feature metadata that describes the feature service, feature views, feature entities and feature sources
- This includes the original BigQuery query/table used to define the data source for each feature
- The high resolution feature metadata is embedded into the wandb model artifact
- Embeds the serialized experimental feature protos to allow for recreation later
- Logs the model artifact
- If no model name is provided, the model will take on the name of the feature service
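A hedged usage sketch of the API above, inferred from the signature; the project name and the onnx_model variable are placeholders, and the feature service and entity DataFrame reuse the names from the Data Preparation example.
import wandb
from almanak_mlops.feature_store import AlmanakFeatureStore

run = wandb.init(project="almanak-experiments")  # placeholder project name
store = AlmanakFeatureStore()

store.log_model_and_feature_service(
    wandb=wandb,
    run=run,
    model=onnx_model,  # an onnx.ModelProto produced by your training/export step
    feature_service=store.get_feature_service("eth_contracts_service_v1"),
    entity_df=entity_df,  # the entity DataFrame used to materialise training features
    model_output_description="Describe what the model output represents here",
)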
Model Serving
This section will be expanded and become more detailed when the model serving environment is formally defined
- Provide a list of models to be served to the model serving environment
- The models to be used should be provided as a list of WandB Model Registry model artifacts
- When the model serving environment initialises, it should immediately trigger materialisation of online features
- The online features to be materialised are to be registered to the cloud composer materialisation DAG in the feature repo by Data Engineering
- The model artifacts will be downloaded by the model serving environment and a deployment dag will be created
- The list of features each model requires will be extracted from the model artifact’s feast feature metadata. The features should match production features in the available online feature store.
- When the Almanak Broker pushes the EVM state to the model serving environment as part of an inference request, the feature entities selection should be made available.
- The model serving environment will use the feature entities selection, alongside its features metadata, to retrieve the correct feature view set from the online store through the MLOps SDK and use that as input for model inference
- The model inference result will be returned to the Almanak Broker as a response to the inference request
Using distributed compute to run python functions and services with Ray Core
https://docs.ray.io/en/latest/ray-core/user-guide.html
Why use distributed compute with Ray Core?
Using Ray Core offers several advantages compared to running functions and services locally:
- Distributed computing: Ray allows you to execute tasks and services on a cluster of machines rather than on a single machine. This enables you to scale your computations horizontally and take advantage of multiple cores or machines to perform your work. This is particularly useful when working with large datasets or running computationally intensive tasks that can benefit from parallel execution.
- Parallelism and concurrency: Ray automatically handles parallel and concurrent execution of tasks and services. This means you can easily execute multiple tasks or method calls at the same time without worrying about manually managing threads, processes, or synchronization. Ray's underlying scheduler efficiently manages the execution of tasks on the available resources, optimizing for throughput and minimizing latency.
- Fault tolerance: Ray provides fault tolerance features that allow you to handle node failures and ensure that your tasks and services can recover from failures. When a Ray task or actor fails, the system will automatically reschedule the task or restart the actor on another node, allowing your application to continue running despite hardware or software issues.
- Resource management: Ray allows you to specify resource requirements for your tasks and actors, such as the number of CPUs, GPUs, or custom resources needed for execution. This enables Ray to efficiently manage and allocate resources across your cluster, ensuring that your tasks and services run on the appropriate hardware and that resources are not wasted.
Key Concepts of Ray Core https://docs.ray.io/en/latest/ray-core/key-concepts.html
- Tasks
- Ray enables arbitrary functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called “tasks”. Ray enables tasks to specify their resource requirements in terms of CPUs, GPUs, and custom resources. These resource requests are used by the cluster scheduler to distribute tasks across the cluster for parallelized execution.
- See the User Guide for Tasks.
- Actors
- Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.
- See the User Guide for Actors.
MLOps SDK for Ray Core
Using the MLOps SDK, existing python scripts require only minor modifications to execute computationally heavy functions and services on the ray cluster in a distributed fashion.
This allows for more efficient and consistent development through a more powerful and standardised compute environment.
Converting regular python functions to run on Ray Core
The MLOps SDK has an API to wrap a function, almanak_mlops.ray_.remote.ray_task_wrapper. It can be used as either a decorator or a higher order function.
def ray_task_wrapper(ray: ModuleType, async_task: bool = False) -> Callable
The wrapped function can be interacted with as a regular function, or as a coroutine if the async_task=True parameter is passed to the wrapper.
import argparse
import random
import asyncio

import ray

from almanak_mlops.ray_.remote import ray_task_wrapper

parser = argparse.ArgumentParser()
parser.add_argument("--address")
args = parser.parse_args()

ray.init(address=args.address)

"""
This script demonstrates how to execute an arbitrary python function in a distributed fashion using Ray Core.
The ray_task_wrapper can be applied to a target function as either a decorator or a higher order function. The wrapper returns a function that can be executed on the Ray cluster. The wrapper also takes care of serializing and deserializing the arguments and return values of the target function.
"""


def multiplier_task(input: int) -> float:
    """
    Ray enables arbitrary functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called “tasks”. Ray enables tasks to specify their resource requirements in terms of CPUs, GPUs, and custom resources. These resource requests are used by the cluster scheduler to distribute tasks across the cluster for parallelized execution.
    """
    result = input * 2
    print(f"I am running on the ray cluster. Input: {input} Result: {result}")
    return result


def run_remote_task():
    print("I am running on the local machine")

    """Using ray_task_wrapper as a higher order function"""
    multiplication_results = [
        ray_task_wrapper(ray)(multiplier_task)(random.randint(1, 100))
        for _ in range(10)
    ]
    print("Multiplication results")
    print(multiplication_results)

    """Using ray_task_wrapper as a decorator"""

    @ray_task_wrapper(ray)
    def division_task(x: int):
        result = x / 2
        print(f"I am running on the ray cluster. Input: {x} Result: {result}")
        return result

    division_results = [division_task(random.randint(1, 100)) for _ in range(10)]
    print("Division results")
    print(division_results)


async def run_remote_task_async():
    print("I am running on the local machine")

    """Using ray_task_wrapper as a higher order function"""
    multiplication_results = await asyncio.gather(
        *(
            ray_task_wrapper(ray, async_task=True)(multiplier_task)(
                random.randint(1, 100)
            )
            for _ in range(10)
        )
    )
    print("Multiplication results")
    print(multiplication_results)

    """Using ray_task_wrapper as a decorator"""

    @ray_task_wrapper(ray, async_task=True)
    def division_task(x: int):
        result = x / 2
        print(f"I am running on the ray cluster. Input: {x} Result: {result}")
        return result

    division_results = await asyncio.gather(
        *(division_task(random.randint(1, 100)) for _ in range(10))
    )
    print("Division results")
    print(division_results)


if __name__ == "__main__":
    run_remote_task()
    asyncio.run(run_remote_task_async())
Creating a Ray Core Service with a python class
The MLOps SDK provides an API, almanak_mlops.ray_.remote.ray_actor_wrapper, that allows a service to be described as a python class.
def ray_actor_wrapper(
    ray, async_actor: bool = False, num_cpus=1, num_gpus=0, resources={}
) -> Type
The methods of that service are described as methods attached to the class, and the service can be interacted with as an instance of the class.
If the async_actor=True parameter is passed to the API, the methods of the class become coroutines that return an Awaitable.
import argparse
import asyncio

import ray

from almanak_mlops.ray_.remote import ray_actor_wrapper

parser = argparse.ArgumentParser()
parser.add_argument("--address")
args = parser.parse_args()

ray.init(address=args.address)

"""
This script demonstrates how to write an arbitrary python class to be used as a service with Ray Core.
The ray_actor_wrapper can be applied to a target class as either a decorator or a higher order function. The wrapper returns an object that represents a service running on the Ray cluster. The wrapper also takes care of serializing and deserializing the arguments and return values of the service object.
"""


def run_remote_actor():
    """Creating a synchronous actor class"""

    """Using the ray_actor_wrapper as a decorator"""

    @ray_actor_wrapper(ray)
    class CounterService:
        """
        Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.
        """

        def __init__(self) -> None:
            self._state = 0

        def increment(self) -> None:
            print("I am running on the ray cluster. Incrementing state")
            self._state += 1

        def decrement(self) -> None:
            print("I am running on the ray cluster. Decrementing state")
            self._state -= 1

        def state(self) -> int:
            print("I am running on the ray cluster. Returning state")
            return self._state

    counter_service = CounterService()
    print(counter_service.state())
    counter_service.increment()
    print(counter_service.state())
    counter_service.decrement()
    print(counter_service.state())

    class AdderService:
        def __init__(self) -> None:
            self._state = 0

        def add(self, x) -> None:
            print("I am running on the ray cluster. Adding state")
            self._state += x

        def state(self) -> int:
            """NOTE: Ray Actors do not support property getters and setters."""
            print("I am running on the ray cluster. Returning state")
            return self._state

    """Using the ray_actor_wrapper as a higher order function"""
    AdderService = ray_actor_wrapper(ray)(AdderService)
    adder_service = AdderService()
    print(adder_service.state())
    adder_service.add(42)
    print(adder_service.state())


async def run_remote_actor_async():
    """Creating an asynchronous actor class"""

    """Using the ray_actor_wrapper as a decorator"""

    @ray_actor_wrapper(ray, async_actor=True)
    class CounterService:
        """
        Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.
        """

        def __init__(self) -> None:
            self._state = 0

        def increment(self) -> None:
            print("I am running on the ray cluster. Incrementing state")
            self._state += 1

        def decrement(self) -> None:
            print("I am running on the ray cluster. Decrementing state")
            self._state -= 1

        def state(self) -> int:
            print("I am running on the ray cluster. Returning state")
            return self._state

    counter_service = CounterService()
    print(await counter_service.state())
    await counter_service.increment()
    print(await counter_service.state())
    await counter_service.decrement()
    print(await counter_service.state())

    class AdderService:
        def __init__(self) -> None:
            self._state = 0

        def add(self, x) -> None:
            print("I am running on the ray cluster. Adding state")
            self._state += x

        def state(self) -> int:
            """NOTE: Ray Actors do not support property getters and setters."""
            print("I am running on the ray cluster. Returning state")
            return self._state

    """Using the ray_actor_wrapper as a higher order function"""
    AdderService = ray_actor_wrapper(ray, async_actor=True)(AdderService)
    adder_service = AdderService()
    print(await adder_service.state())
    await adder_service.add(42)
    print(await adder_service.state())


if __name__ == "__main__":
    print("Running synchronous actor")
    run_remote_actor()
    print("Running asynchronous actor")
    asyncio.run(run_remote_actor_async())
Connecting/submitting jobs to the Ray Cluster
Ray cluster endpoint for use with ray.init
ray cluster address: 35.204.61.141
The Almanak MLOps SDK is installed and available for use on the ray cluster.
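A minimal connection sketch from a local script follows. The address is the one listed above; the ray:// prefix and port 10001 assume the default Ray Client setup and should be confirmed against the actual cluster configuration.
import ray

# 35.204.61.141 is the cluster address listed above; 10001 is Ray's default
# Ray Client port and is an assumption, not a confirmed Almanak setting.
ray.init(address="ray://35.204.61.141:10001")


@ray.remote
def ping() -> str:
    return "pong from the ray cluster"


print(ray.get(ping.remote()))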
ray-config.yaml
Create a yaml file locally with these contents:
cluster_name: almanak-cluster

provider:
  type: gcp
  region: europe-west4
  availability_zone: europe-west4-a
  project_id: almanak-production

max_workers: 5

initialization_commands:
  - echo "Running initialization_commands"

head_setup_commands:
  - pip install google-api-python-client==1.7.8
The ray-config.yaml file provides instructions to the ray CLI on how it should interact with the ray cluster.
To ssh into the ray cluster, run the shell command:
ray attach <path to the ray config yaml file>