Almanak MLOps Playbook


Overview of MLOps

Machine Learning Operations (MLOps) is a set of practices designed to streamline the process of deploying, monitoring, and maintaining machine learning models in production environments. This playbook provides a guide on implementing MLOps, focusing on model and feature lifecycle management.

Goals and Objectives

  • Facilitate collaboration between data scientists, machine learning engineers and stakeholders.
  • Streamline the model and feature lifecycle, from creation to deployment.
  • Ensure model and feature quality, maintainability, and scalability.
  • Leverage distributed compute to execute arbitrary functions and services required in the model development and serving lifecycle.

Who is this for

This playbook is designed for data scientists, data engineers and machine learning engineers working on training, deploying, and maintaining ML models using the Almanak MLOps SDK and WandB.

Feature Lifecycle

See the Feature Store Playbook for more details on using the Feast Feature Store with the MLOps SDK.

What is a feature?

A feature is an individual measurable property observed on an entity. For example, a feature of a customer entity could be the number of transactions they have made in an average month.

What is an entity?

An entity is a collection of semantically related features.

Experimental features

  • Used for model experimentation
  • Feature Engineering:
    • Identify relevant features based on domain knowledge, exploratory data analysis, and feature importance techniques.
    • Transform raw data into features suitable for ML models (e.g., scaling, encoding, imputation).
    • Keep track of the original raw data source and the transformation query/code used to create the features. The data source and transformation logic will be supplied to Data Engineering when promoting the feature to a production feature
    • Register the feature tables as data sources into the feature store
    • Define feature entities from the feature tables
    • Create a feature view from entities, features and a data source
      • A feature view represents a logical group of feature data from a data source
    • Create a feature service from one or more feature views.
      • A model’s input features should be captured by a feature service. For each model, create a feature service and associate it with the model (a minimal Feast definition sketch follows this list)
  • Feature Validation
    • Ensure data consistency, quality, and relevance of features through statistical tests and visualisations
  • Feature Versioning
    • Register the model’s features into the model’s metadata using the register_metadata_to_model method
    • Log the feature service alongside the model in WandB as a single artifact
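
Below is a minimal sketch of how the Feast objects described above fit together for an experimental feature. The table, entity, field and feature view names are illustrative placeholders (only the eth_contracts_service_v1 service name is reused from the data preparation example later in this playbook), and the exact constructor arguments can differ between Feast versions.

from datetime import timedelta

from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field
from feast.types import Float32, Int64

# Data source: the experimental feature table produced by the transformation query/code.
contracts_source = BigQuerySource(
    table="my_project.my_dataset.eth_contract_features",  # illustrative table name
    timestamp_field="event_timestamp",
)

# Entity: the key the features are observed on.
contract_address = Entity(name="address", join_keys=["address"])

# Feature view: a logical group of feature data from one data source.
contract_stats_view = FeatureView(
    name="eth_contract_stats",
    entities=[contract_address],
    ttl=timedelta(days=1),
    schema=[
        Field(name="tx_count_30d", dtype=Int64),
        Field(name="avg_gas_used", dtype=Float32),
    ],
    source=contracts_source,
)

# Feature service: captures the full set of input features for one model.
eth_contracts_service_v1 = FeatureService(
    name="eth_contracts_service_v1",
    features=[contract_stats_view],
)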

Production Features

  • Used for creating and serving production models
  • Establish criteria for promoting experimental features to production
  • Data Science will make a production feature request with Data Engineering
    • The original raw data source, transformation query/code and Feast feature configuration (data sources, entities, feature views and feature service) will be submitted as part of that request
  • Data Engineering will
    • Backfill historical data for the new production feature if required
    • Create a scheduled DBT query that transforms the raw data source into a production feature table
    • Register the production feature table into DAAS
      • DBT provides data transformation and data model versioning for the raw data source
    • Update the feature store repo with:
      • The feast definitions to define it as a production feature
      • Register the production feature view into the feature materialisation cloud composer dag located in the feature registry github repo
      • Upon merge to main, the production feature view can be materialised to the online store by triggering materialisation from the MLOps feature store SDK, and its features can be retrieved as historical features from the offline store
    • Upon update of the feature repo, the production feature will be viewable through DAAS (as a production feature table) and as a production feature service through the Feast UI (see the feature store playbook for more details)
  • Upon confirming the production feature is accurate, the production feature request can be wrapped up

Maintenance and Monitoring

  • Monitor feature drift and data quality to detect data issues and the need to retrain model weights

Quality Assurance

  • Set up monitoring dashboards and alerts for feature statistics and model performance metrics.
  • Periodically review and update feature definitions, data sources, and transformations as needed.

Model Lifecycle

Use the WandB SDK to track and log your experiments. A minimal tracking sketch follows the list below.

https://docs.wandb.ai/guide

  • Runs https://docs.wandb.ai/guides/runs
    • A single unit of computation logged by W&B is called a Run.
    • Consider a W&B Run as an atomic element of your whole project. You should create and initiate a new Run if you change a hyperparameter, use a different model, create a W&B Artifact and so on.
    • Use W&B Runs as the unit of tracked work: anything you log with wandb.log is recorded in that Run.
  • Experiments https://docs.wandb.ai/guides/track
    • wandb.init(): Initialize a new run at the top of your script. This returns a Run object and creates a local directory where all logs and files are saved, then streamed asynchronously to a W&B server. If you want to use a private server instead of our hosted cloud server, we offer Self-Hosting.
    • wandb.config: Save a dictionary of hyperparameters such as learning rate or model type. The model settings you capture in config are useful later to organize and query your results.
    • wandb.log(): Log metrics over time in a training loop, such as accuracy and loss. By default, when you call wandb.log it appends a new step to the history object and updates the summary object.
    • wandb.log_artifact: Save outputs of a run, like the model weights or a table of predictions. This lets you track not just model training, but all the pipeline steps that affect the final model.
    • history: An array of dictionary-like objects that tracks metrics over time. These time series values are shown as default line plots in the UI.
    • summary: By default, the final value of a metric logged with wandb.log(). You can set the summary for a metric manually to capture the highest accuracy or lowest loss instead of the final value. These values are used in the table and in plots that compare runs; for example, you could visualize the final accuracy for all runs in your project.
  • Artifacts https://docs.wandb.ai/guides/artifacts
    • Use W&B Artifacts to track datasets, models, dependencies, and results through each step of your machine learning pipeline. Artifacts make it easy to get a complete and auditable history of changes to your files.
    • Artifacts can be thought of as a versioned directory. Artifacts are either an input of a run or an output of a run.
  • WandB Sweeps https://docs.wandb.ai/guides/sweeps
    • Tool for automated hyperparameter tuning
  • Model Registry https://docs.wandb.ai/guides/models
    • Create Registered Models to organize your best model versions for a given task
    • Track a model moving into staging and production
    • See a history of all changes, including who moved a model to production
    • Features:
      • Model Versioning
      • Model Lineage
      • Model Lifecycle
  • Data Visualisation https://docs.wandb.ai/guides/data-vis
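
As a concrete starting point, the sketch below strings the calls above together for a single run. The project name, hyperparameters, metrics and artifact file are illustrative placeholders, not part of any existing Almanak project.

import wandb

# Start a run and record hyperparameters (illustrative values).
run = wandb.init(
    project="almanak-experiments",
    config={"learning_rate": 1e-3, "epochs": 5},
)

for epoch in range(run.config.epochs):
    train_loss = 1.0 / (epoch + 1)  # placeholder for a real training loop
    wandb.log({"epoch": epoch, "train_loss": train_loss})

# Version an output file as an artifact (assumes model.onnx exists locally).
artifact = wandb.Artifact(name="example-model", type="model")
artifact.add_file("model.onnx")
run.log_artifact(artifact)

run.finish()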

Data Preparation

Ray Datasets is the recommended dataframe format for working with features. It can be used with Ray Train and Ray Tune for model training and hyperparameter tuning, respectively.

  • Training features from a feature service should be retrieved as a Ray Dataset using the MLOps SDK
    from typing import Tuple

    from ray.data import Dataset
    import pandas as pd
    from almanak_mlops.feature_store import AlmanakFeatureStore


    def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
        store = AlmanakFeatureStore()
        entity_df = pd.DataFrame.from_dict(
            {
                "address": [
                    "0xb793c026a29b5aab9a4c02d4a84f5fdeb697ad73",
                    "0x239f30aa3e17d352bfefede4c379e9d744538a00",
                ],
            }
        )
        entity_df["event_timestamp"] = pd.to_datetime("2023-03-09", utc=True)
        dataset = store.materialize_to_ray_dataset(
            entity_df, store.get_feature_service("eth_contracts_service_v1")
        )
        train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
        test_dataset = valid_dataset.drop_columns(cols=["target"])
        return train_dataset, valid_dataset, test_dataset
  • We understand there are scenarios where new experimental data has just been obtained and will take time to be onboarded into BigQuery as a feature data source.
    You can still load that data directly as a Ray Dataset to train an experimental model and check whether the feature data is useful. Once it is verified to be useful, put in a data request to Data Engineering to onboard it, so that it becomes accessible through the feature store and is formally defined as a feature data source.

What problems do Ray Datasets solve?

Ray Datasets aims to solve the problems of slow, resource-inefficient, unscalable data loading and preprocessing pipelines for two core use cases:

Model training: resulting in poor training throughput and low GPU utilization as the trainers are bottlenecked on preprocessing and data loading.

Batch inference: resulting in poor batch inference throughput and low GPU utilization.

In order to solve these problems without sacrificing usability, Ray Datasets simplifies parallel and pipelined data processing on Ray, providing a higher-level API while internally handling data batching, task parallelism and pipelining, and memory management.

Model Training

It is highly encouraged to use the model libraries shipped with Ray Train for scalability reasons; a minimal sketch is shown below.
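
As a rough illustration, the sketch below uses Ray Train's XGBoostTrainer with the Ray Datasets returned by prepare_data() from the Data Preparation section. The trainer class and ScalingConfig are standard Ray AIR-era APIs, but module paths, arguments and the "target" label column are assumptions that should be checked against the Ray version and feature schema in use.

from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# train/valid/test Ray Datasets from the Data Preparation section.
train_dataset, valid_dataset, test_dataset = prepare_data()

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),  # scale across the Ray cluster
    label_column="target",
    params={"objective": "binary:logistic", "eval_metric": "logloss"},
    datasets={"train": train_dataset, "valid": valid_dataset},
)
result = trainer.fit()
print(result.metrics)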

Hyperparameter Tuning

What is hyperparameter tuning?

Hyperparameter tuning is the process of selecting the best combination of hyperparameters for a model to achieve optimal performance on a specific task. Hyperparameters are adjustable parameters that are not learned from data but determined prior to training.

Hyperparameter tuning involves trying out different combinations of hyperparameters, training the model with each set of hyperparameters, and evaluating the performance of the resulting models. The objective is to find the hyperparameters that result in the best performance on a validation set.

How Does Automated Hyperparameter Tuning Work?

  1. Configure hyperparameter optimisation with the following attributes:
    1. Hyperparameter search space
    2. Metric and mode for measuring the performance of the objective function
    3. Method for iterating through the search space
  2. Create an objective function that wraps the model training function and model validation function
    1. The objective function retrieves hyperparameter values from the config reference and uses them to configure the model training function
    2. The objective function returns the model score
  3. Pass the objective function to an optimisation function that iteratively runs the objective function to train the model over different hyperparameters in the search space. The validation metric is used in a feedback loop to approach the optimal hyperparameters for model training.

Hyperparameter Tuning Tools

Use these tools to perform distributed hyperparameter tuning

  1. (Recommended) Ray Tune https://docs.ray.io/en/latest/tune/index.html
    1. Allows for distributed hyperparameter tuning on the ray cluster
    2. Closely integrates with Ray Datasets and Ray Train
  2. WandB Sweeps https://docs.wandb.ai/guides/sweeps
    1. Supports multi-cpu and multi-gpu hyperparameter tuning on local machine

Example: Performing distributed hyperparameter tuning with a quadratic model

  1. Ray Tune
    1. Write the tuning script tune_script.py
      import argparse

      import ray
      from ray import tune

      parser = argparse.ArgumentParser()
      parser.add_argument("--address")
      args = parser.parse_args()
      ray.init(address=args.address)


      def objective_function(config):
          score = config["a"] ** 2 + config["b"]
          return {"score": score}


      search_space = {
          "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
          "b": tune.choice([1, 2, 3]),
      }

      tuner = tune.Tuner(objective_function, param_space=search_space)
      results = tuner.fit()
      print(results.get_best_result(metric="score", mode="min").config)
    2. Submit the tune script to the Ray Cluster by providing the ray address
      ray submit tune-default.yaml tune_script.py -- --address=XXXX:6379
    3. (Optional) This script can also be executed on your local machine’s ray cluster when debugging
  2. WandB Sweep
    import wandb

    sweep_configuration = {
        "name": "sweep",
        "metric": {"goal": "minimize", "name": "validation_score"},
        "method": "grid",
        "parameters": {
            "a": {"values": [0.001, 0.01, 0.1, 1.0]},
            "b": {"values": [1, 2, 3]},
        },
    }

    sweep_id = wandb.sweep(sweep=sweep_configuration, project="my-first-sweep")


    def objective_function():
        wandb.init()
        # note that we define values from `wandb.config` instead of defining hard values
        a = wandb.config.a
        b = wandb.config.b
        # use a validation score when working with ML models
        score = a**2 + b
        wandb.log(
            {
                "a": a,
                "b": b,
                # log under the same name the sweep's metric is configured to optimise
                "validation_score": score,
            }
        )


    wandb.agent(sweep_id, function=objective_function, count=10)
    Parallelising tuning across multi-cpu and multi-gpu: https://docs.wandb.ai/guides/sweeps/parallelize-agents

Model Validation and Evaluation

  • Decide on a model evaluation metric and use it to score models
  • Perform model validation; an illustrative scoring sketch follows this list
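
The sketch below shows one way this could look: scoring a trained model on the validation split with a single agreed metric and logging it to the active W&B run. The model's predict interface, the "target" column and ROC AUC as the metric are illustrative assumptions.

import wandb
from sklearn.metrics import roc_auc_score

# valid_dataset is the Ray Dataset returned by prepare_data(); convert to pandas for scoring.
valid_df = valid_dataset.to_pandas()
X_valid = valid_df.drop(columns=["target"])
y_valid = valid_df["target"]

# `model` stands in for whatever estimator the training step produced.
y_scores = model.predict(X_valid)

validation_score = roc_auc_score(y_valid, y_scores)
wandb.log({"validation_score": validation_score})
print(f"Validation ROC AUC: {validation_score:.4f}")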

Model Registration

  • Log the model and features to WandB using the MLOps SDK’s log_model_and_feature_service API (a usage sketch follows this list)
    AlmanakFeatureStore().log_model_and_feature_service(
        wandb: ModuleType,
        run: wandb.wandb_run.Run,
        model: Union[onnx.ModelProto, RayModel],
        feature_service: FeatureService,
        entity_df: pd.DataFrame,
        model_output_description: str,
        model_name: Optional[str] = None,
    )
  • This API does the following:
    1. Retrieves the feature views for the feature service and converts them to feature metadata
    2. Embeds the following metadata into the onnx model:
      1. List of feature’s name, data type and description
      2. Description of the model output
    3. Saves the onnx model to a temporary local file and logs it to wandb as a model artifact
    4. Creates high-resolution feature metadata that describes the feature service, feature views, feature entities and feature sources
      1. This includes the original BigQuery query/table used to define the data source for each feature
    5. The high-resolution feature metadata is embedded into the wandb model artifact
    6. Embeds the serialized experimental feature protos to allow for recreation later
    7. Logs the model artifact
      1. If no model name is provided, the model will take on the name of the feature service
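
A usage sketch, assuming an active W&B run, an exported onnx.ModelProto from the training step, and the entity dataframe and feature service from the data preparation example; the project and model names are illustrative.

import wandb

from almanak_mlops.feature_store import AlmanakFeatureStore

store = AlmanakFeatureStore()
run = wandb.init(project="almanak-experiments")

store.log_model_and_feature_service(
    wandb=wandb,
    run=run,
    model=onnx_model,  # exported onnx.ModelProto from the training step
    feature_service=store.get_feature_service("eth_contracts_service_v1"),
    entity_df=entity_df,  # same entity dataframe used to retrieve training features
    model_output_description="Describe what the model output represents",
    model_name="eth_contracts_model_v1",  # optional; defaults to the feature service name
)
run.finish()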

Model Serving

This section will be expanded and become more detailed when the model serving environment is formally defined

  • Provide a list of models to be served to the model serving environment
    • The models to be used should be provided as a list of WandB Model Registry model artifacts
  • When the model serving environment initialises, it should immediately trigger materialisation of online features
    • The online features to be materialised are to be registered to the cloud composer materialisation DAG in the feature repo by Data Engineering
  • The model artifacts will be downloaded by the model serving environment and a deployment dag will be created
  • The list of features each model requires will be extracted from the model artifact’s feast feature metadata. The features should match production features in the available online feature store.
  • When the Almanak Broker pushes the EVM state over to the model serving environment as part of an inference request, the feature entities selection should be made available.
  • The model serving environment will use the feature entities selection, alongside its features metadata, to retrieve the correct feature view set from the online store through the MLOps SDK and use that as input for model inference
  • The model inference result will be returned to the Almanak Broker as a response to the inference request

Using distributed compute to run python functions and services with Ray Core

https://docs.ray.io/en/latest/ray-core/user-guide.html

Why use distributed compute with Ray Core?

Using Ray Core offers several advantages compared to running functions and services locally:

  1. Distributed computing: Ray allows you to execute tasks and services on a cluster of machines rather than on a single machine. This enables you to scale your computations horizontally and take advantage of multiple cores or machines to perform your work. This is particularly useful when working with large datasets or running computationally intensive tasks that can benefit from parallel execution.
  2. Parallelism and concurrency: Ray automatically handles parallel and concurrent execution of tasks and services. This means you can easily execute multiple tasks or method calls at the same time without worrying about manually managing threads, processes, or synchronization. Ray's underlying scheduler efficiently manages the execution of tasks on the available resources, optimizing for throughput and minimizing latency.
  3. Fault tolerance: Ray provides fault tolerance features that allow you to handle node failures and ensure that your tasks and services can recover from failures. When a Ray task or actor fails, the system will automatically reschedule the task or restart the actor on another node, allowing your application to continue running despite hardware or software issues.
  4. Resource management: Ray allows you to specify resource requirements for your tasks and actors, such as the number of CPUs, GPUs, or custom resources needed for execution. This enables Ray to efficiently manage and allocate resources across your cluster, ensuring that your tasks and services run on the appropriate hardware and that resources are not wasted.

Key Concepts of Ray Core https://docs.ray.io/en/latest/ray-core/key-concepts.html

  • Tasks
    • Ray enables arbitrary functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called “tasks”. Ray enables tasks to specify their resource requirements in terms of CPUs, GPUs, and custom resources. These resource requests are used by the cluster scheduler to distribute tasks across the cluster for parallelized execution.
    • See the User Guide for Tasks.
  • Actors
    • Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.
    • See the User Guide for Actors.

MLOps SDK for Ray Core

Using the MLOps SDK, existing python scripts require minor modifications to allow computationally heavy functions and services to be executed on the ray cluster in a distributed fashion.

This will allow for more efficient and consistent development through the use of a more powerful and standardised compute environment

Converting regular python functions to run on Ray Core

The MLOps SDK has an API to wrap a function, almanak_mlops.ray_.remote.ray_task_wrapper. It can be used as either a decorator or a higher-order function.

def ray_task_wrapper(ray: ModuleType, async_task: bool = False) -> Callable

The wrapped function can be interacted with as a regular function, or as a coroutine if the async_task=True parameter is passed to the wrapper

import argparse
import asyncio
import random

import ray

from almanak_mlops.ray_.remote import ray_task_wrapper

parser = argparse.ArgumentParser()
parser.add_argument("--address")
args = parser.parse_args()
ray.init(address=args.address)

"""
This script demonstrates how to execute an arbitrary python function in a distributed fashion using Ray Core.

The ray_task_wrapper can be applied to a target function as either a decorator or a higher order function. The wrapper returns a function that can be executed on the Ray cluster. The wrapper also takes care of serializing and deserializing the arguments and return values of the target function.
"""


def multiplier_task(input: int) -> int:
    """
    Ray enables arbitrary functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called “tasks”. Ray enables tasks to specify their resource requirements in terms of CPUs, GPUs, and custom resources. These resource requests are used by the cluster scheduler to distribute tasks across the cluster for parallelized execution.
    """
    result = input * 2
    print(f"I am running on the ray cluster. Input: {input} Result: {result}")
    return result


def run_remote_task():
    print("I am running on the local machine")

    """Using ray_task_wrapper as a higher order function"""
    multiplication_results = [
        ray_task_wrapper(ray)(multiplier_task)(random.randint(1, 100))
        for _ in range(10)
    ]
    print("Multiplication results")
    print(multiplication_results)

    """Using ray_task_wrapper as a decorator"""

    @ray_task_wrapper(ray)
    def division_task(x: int) -> float:
        result = x / 2
        print(f"I am running on the ray cluster. Input: {x} Result: {result}")
        return result

    division_results = [division_task(random.randint(1, 100)) for _ in range(10)]
    print("Division results")
    print(division_results)


async def run_remote_task_async():
    print("I am running on the local machine")

    """Using ray_task_wrapper as a higher order function"""
    multiplication_results = await asyncio.gather(
        *(
            ray_task_wrapper(ray, async_task=True)(multiplier_task)(
                random.randint(1, 100)
            )
            for _ in range(10)
        )
    )
    print("Multiplication results")
    print(multiplication_results)

    """Using ray_task_wrapper as a decorator"""

    @ray_task_wrapper(ray, async_task=True)
    def division_task(x: int) -> float:
        result = x / 2
        print(f"I am running on the ray cluster. Input: {x} Result: {result}")
        return result

    division_results = await asyncio.gather(
        *(division_task(random.randint(1, 100)) for _ in range(10))
    )
    print("Division results")
    print(division_results)


if __name__ == "__main__":
    run_remote_task()
    asyncio.run(run_remote_task_async())

Creating a Ray Core Service with a python class

The MLOps SDK provides an API, almanak_mlops.ray_.remote.ray_actor_wrapper, that allows a service to be described as a python class.

def ray_actor_wrapper(
    ray, async_actor: bool = False, num_cpus=1, num_gpus=0, resources={}
) -> Type

The methods of that service can be described as methods attached to the class.

The service can be interacted with as an instance of the class

If the async_actor=True parameter is passed to the API, the methods of the class become coroutines that return an Awaitable.

import argparse
import asyncio

import ray

from almanak_mlops.ray_.remote import ray_actor_wrapper

parser = argparse.ArgumentParser()
parser.add_argument("--address")
args = parser.parse_args()
ray.init(address=args.address)

"""
This script demonstrates how to write an arbitrary python class to be used as a service with Ray Core.

The ray_actor_wrapper can be applied to a target class as either a decorator or a higher order function. The wrapper returns an object that represents a service running on the Ray cluster. The wrapper also takes care of serializing and deserializing the arguments and return values of the service object.
"""


def run_remote_actor():
    """Creating a synchronous actor class"""

    """Using the ray_actor_wrapper as a decorator"""

    @ray_actor_wrapper(ray)
    class CounterService:
        """
        Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.
        """

        def __init__(self) -> None:
            self._state = 0

        def increment(self) -> None:
            print("I am running on the ray cluster. Incrementing state")
            self._state += 1

        def decrement(self) -> None:
            print("I am running on the ray cluster. Decrementing state")
            self._state -= 1

        def state(self) -> int:
            print("I am running on the ray cluster. Returning state")
            return self._state

    counter_service = CounterService()
    print(counter_service.state())
    counter_service.increment()
    print(counter_service.state())
    counter_service.decrement()
    print(counter_service.state())

    class AdderService:
        def __init__(self) -> None:
            self._state = 0

        def add(self, x) -> None:
            print("I am running on the ray cluster. Adding state")
            self._state += x

        def state(self) -> int:
            """NOTE: Ray Actors do not support property getters and setters."""
            print("I am running on the ray cluster. Returning state")
            return self._state

    """Using the ray_actor_wrapper as a higher order function"""
    AdderService = ray_actor_wrapper(ray)(AdderService)
    adder_service = AdderService()
    print(adder_service.state())
    adder_service.add(42)
    print(adder_service.state())


async def run_remote_actor_async():
    """Creating an asynchronous actor class"""

    """Using the ray_actor_wrapper as a decorator"""

    @ray_actor_wrapper(ray, async_actor=True)
    class CounterService:
        """
        Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.
        """

        def __init__(self) -> None:
            self._state = 0

        def increment(self) -> None:
            print("I am running on the ray cluster. Incrementing state")
            self._state += 1

        def decrement(self) -> None:
            print("I am running on the ray cluster. Decrementing state")
            self._state -= 1

        def state(self) -> int:
            print("I am running on the ray cluster. Returning state")
            return self._state

    counter_service = CounterService()
    print(await counter_service.state())
    await counter_service.increment()
    print(await counter_service.state())
    await counter_service.decrement()
    print(await counter_service.state())

    class AdderService:
        def __init__(self) -> None:
            self._state = 0

        def add(self, x) -> None:
            print("I am running on the ray cluster. Adding state")
            self._state += x

        def state(self) -> int:
            """NOTE: Ray Actors do not support property getters and setters."""
            print("I am running on the ray cluster. Returning state")
            return self._state

    """Using the ray_actor_wrapper as a higher order function"""
    AdderService = ray_actor_wrapper(ray, async_actor=True)(AdderService)
    adder_service = AdderService()
    print(await adder_service.state())
    await adder_service.add(42)
    print(await adder_service.state())


if __name__ == "__main__":
    print("Running synchronous actor")
    run_remote_actor()
    print("Running asynchronous actor")
    asyncio.run(run_remote_actor_async())

Connecting/submitting jobs to the Ray Cluster

Ray cluster endpoint for use with ray.init

ray cluster address: 35.204.61.141

The Almanak MLOps SDK is installed and available for use on the ray cluster.
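
A minimal sketch of connecting a local script to the cluster follows; 6379 is the default GCS port used by the tuning example above, but the exact port (or a ray:// Ray Client address) depends on how the cluster is exposed, so confirm it with the cluster configuration.

import ray

# Connect this driver to the shared Ray cluster head node (the port is an assumption).
ray.init(address="35.204.61.141:6379")

# Sanity check: list the CPU/GPU/memory resources the cluster reports.
print(ray.cluster_resources())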

ray-config.yaml

Create a yaml file locally with these contents:

cluster_name: almanak-cluster

provider:
    type: gcp
    region: europe-west4
    availability_zone: europe-west4-a
    project_id: almanak-production

max_workers: 5

initialization_commands:
    - echo "Running initialization_commands"

head_setup_commands:
    - pip install google-api-python-client==1.7.8

The ray-config.yaml provides instructions to the ray CLI on how it should interact with the ray cluster.

To ssh into the ray cluster, run the shell command:

ray attach <path to the ray config yaml file>