
Volga - Feature Engine for real-time AI/ML

Volga is an open-source, self-serve, scalable data/feature engine tailored for modern real-time AI/ML applications. It features a convenient pandas-like API for defining data entities, online/offline pipelines and sources, consistent online+offline feature calculation semantics, pluggable and configurable hot and cold storage, feature lookups, real-time serving and on-demand request-time calculations. It aims to be a completely standalone, self-serve system that removes any heavyweight dependency on general-purpose data processors (Flink, Spark) or cloud-based feature platforms (Tecton.ai, Fennel.ai), and it can run on a laptop or on a 1000-node cluster.

See our blog for the motivation and technical details.

Features:

  • Utilizes a custom scalable stream processing engine built with Ray Actors for orchestration, ZeroMQ for messaging and Rust for perf-critical parts. Kappa architecture - no Flink or Spark
  • Built on top of Ray - easily integrates with the Ray ecosystem (cluster/job/cloud management, model training/serving, zero-copy data transfers, etc.) as well as your custom ML infrastructure
  • Kubernetes ready, no vendor lock-in - use KubeRay to run multitenant scalable jobs or create your own deployment/scheduling logic in pure Python
  • Pure Python, no heavy JVM setups - minimal setup and maintenance effort in production
  • Standalone - launch on your laptop or a cluster with no heavyweight external dependencies
  • Cold + hot storage model allows for customizable offline storage and caches and fast real-time feature queries
  • Easy-to-use declarative pandas-like API to simultaneously define online and offline feature pipelines, including operators like transform, filter, join, groupby/aggregate and drop (see the sketch after this list)
  • [Experimental] Perform heavy embedding dot products, query meta-models or simply calculate a user's age in milliseconds at request time using On-Demand Features
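
As a taste of the API, here is a minimal sketch of a pipeline chaining several of these operators. It is not from the Volga codebase: the PageView/PageViewStats datasets are hypothetical, the operator names come from the feature list above, the drop() signature is an assumption, and the overall pattern mirrors the Quick start below.

import datetime

from volga.api.dataset.aggregate import Count
from volga.api.dataset.dataset import dataset, field, Dataset
from volga.api.dataset.pipeline import pipeline

@dataset
class PageView:
    user_id: str = field(key=True)
    viewed_at: datetime.datetime = field(timestamp=True)
    url: str


@dataset
class PageViewStats:
    user_id: str = field(key=True)
    viewed_at: datetime.datetime = field(timestamp=True)
    num_views_1d: int

    @pipeline(inputs=[PageView])
    def gen(cls, views: Dataset):
        # filter -> drop -> groupby/aggregate, chained declaratively
        external_views = views.filter(lambda df: df['url'].str.startswith('http'))
        trimmed = external_views.drop(columns=['url'])  # drop() signature is an assumption
        return trimmed.group_by(keys=['user_id']).aggregate([
            Count(window='1d', into='num_views_1d'),
        ])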

Volga's API design was inspired by Fennel.

Why use Volga

There are no self-serve open-source feature calculation engines/platforms that provide consistent online-offline pipelines without vendor lock-in, without setting up and managing complex infrastructure like Spark or Flink, and without depending on proprietary closed-source tech (e.g. Tecton.ai, Fennel.ai, Chalk.ai). Volga fills this spot. More info here.

Quick start

Define data sources

from volga.api.source.source import MysqlSource, KafkaSource

kafka = KafkaSource.get(bootstrap_servers='127.0.0.1:9092', username='root', password='')
mysql = MysqlSource.get(host='127.0.0.1', port='3306', user='root', password='', database='db')

Define input datasets

import datetime

from volga.api.dataset.dataset import dataset, field, Dataset
from volga.api.source.source import source

@source(mysql.table('users'))
@dataset
class User:
    user_id: str = field(key=True)
    registered_at: datetime.datetime = field(timestamp=True)
    name: str


@source(kafka.topic('orders'), tag='online')
@source(mysql.table('orders'), tag='offline')
@dataset
class Order:
    buyer_id: str = field(key=True)
    product_id: str = field(key=True)
    product_type: str # 'ON_SALE' or 'REGULAR' 
    purchased_at: datetime.datetime = field(timestamp=True)
    product_price: float

Define feature dataset and calculation pipeline

from volga.api.dataset.pipeline import pipeline
from volga.api.dataset.aggregate import Avg, Count

@dataset
class OnSaleUserSpentInfo:
    user_id: str = field(key=True)
    product_id: str = field(key=True)
    timestamp: datetime.datetime = field(timestamp=True)

    avg_spent_7d: float
    avg_spent_1h: float
    num_purchases_1d: int

    @pipeline(inputs=[User, Order])
    def gen(cls, users: Dataset, orders: Dataset):
        on_sale_purchases = orders.filter(lambda df: df['product_type'] == 'ON_SALE')       
        per_user = on_sale_purchases.join(users, right_on=['user_id'], left_on=['buyer_id'])

        return per_user.group_by(keys=['user_id']).aggregate([
            Avg(on='product_price', window='7d', into='avg_spent_7d'),
            Avg(on='product_price', window='1h', into='avg_spent_1h'),
            Count(window='1d', into='num_purchases_1d'),
        ])

Define Client and Storage interfaces for online (HotStorage) and offline (ColdStorage) features

from volga.client.client import Client
from volga.storage.common.simple_in_memory_actor_storage import SimpleInMemoryActorStorage

storage = SimpleInMemoryActorStorage() # default in-memory storage, can be used as both hot and cold
client = Client(hot=storage, cold=storage)
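
Since hot and cold storage are passed in separately, they can also be distinct instances; presumably other implementations of the storage interface can be plugged in the same way (that part is an assumption, only SimpleInMemoryActorStorage is shown here):

# sketch: separate stores for the hot and cold paths, both in-memory
hot_storage = SimpleInMemoryActorStorage()   # low-latency online lookups
cold_storage = SimpleInMemoryActorStorage()  # offline/historical queries
client = Client(hot=hot_storage, cold=cold_storage)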

Run an offline feature calculation job and get the results (e.g. for model training)

# run batch materialization job
client.materialize_offline(
    target=OnSaleUserSpentInfo, 
    source_tags={Order: 'offline'},
    parallelism=1
)

# query cold offline storage
historical_on_sale_user_spent_df = client.get_offline_data(
    dataset_name=OnSaleUserSpentInfo.__name__, 
    keys=[{'user_id': 0}], 
    start=None, end=None # whole dataset
)
historical_on_sale_user_spent_df
...
  user_id product_id                   timestamp  avg_spent_7d  avg_spent_1h  num_purchases_1d
0       0     prod_0  2024-03-27 11:26:45.514375           100           100                 1
1       0     prod_2  2024-03-27 12:24:45.514375           100           100                 2
2       0     prod_4  2024-03-27 13:22:45.514375           100           100                 3
3       0     prod_6  2024-03-27 14:20:45.514375           100           100                 4
4       0     prod_8  2024-03-27 15:18:45.514375           100           100                 5
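
The start/end parameters take a datetime range; here is a sketch for pulling only a bounded slice of history for training (that start/end accept datetime objects is an assumption based on the signature above):

import datetime

# query a bounded datetime range instead of the whole dataset
train_df = client.get_offline_data(
    dataset_name=OnSaleUserSpentInfo.__name__,
    keys=[{'user_id': 0}],
    start=datetime.datetime(2024, 3, 27, 11, 0, 0),
    end=datetime.datetime(2024, 3, 27, 14, 0, 0)
)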

Run an online feature calculation job and query real-time updates (e.g. for model inference)

# run real-time job
client.materialize_online(
    target=OnSaleUserSpentInfo, 
    source_tags={Order: 'online'},
    scaling_config={'Join_1': 4}, # optional parallelism per operator
    _async=True
)

# query hot cache
import time

live_on_sale_user_spent = None
while True:
    res = client.get_online_latest_data(
        dataset_name=OnSaleUserSpentInfo.__name__,
        keys=[{'user_id': 0}]
    )
    if live_on_sale_user_spent == res:
        # same event as before - back off briefly and poll again
        time.sleep(0.1)
        continue
    live_on_sale_user_spent = res
    print(f'[{time.time()}]{res}')
[1711537166.856853][{'user_id': '0', 'product_id': 'prod_0', 'timestamp': '2024-03-27 14:59:20.124752', 'avg_spent_7d': 100, 'avg_spent_1h': 100, 'num_purchases_1d': 1}]
[1711537167.867083][{'user_id': '0', 'product_id': 'prod_2', 'timestamp': '2024-03-27 15:57:20.124752', 'avg_spent_7d': 100, 'avg_spent_1h': 100, 'num_purchases_1d': 2}]
[1711537169.8647628][{'user_id': '0', 'product_id': 'prod_4', 'timestamp': '2024-03-27 16:55:20.124752', 'avg_spent_7d': 100, 'avg_spent_1h': 100, 'num_purchases_1d': 3}]
...
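
Each query returns the latest feature values for the requested keys, ready to be turned into a feature vector for inference. A minimal sketch - the list-of-dicts result shape is assumed from the printed output above, and model is a hypothetical pre-loaded inference model:

# sketch: assemble a feature vector from the latest online features
latest = res[0]  # one dict per queried key (shape assumed from the output above)
features = [latest['avg_spent_7d'], latest['avg_spent_1h'], latest['num_purchases_1d']]
prediction = model.predict([features])  # `model` is hypothetical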

On-Demand Features (Experimental work in progress)

On-Demand features allow performing stateless transformations at request time, in both online and offline settings. This can be helpful when a transformation is too resource-heavy for streaming or when input data is available only at request time (e.g. GPS coordinates, meta-model outputs, etc.)

Define the resulting @dataset with an @on_demand function. On-Demand features can depend on regular datasets (with a @pipeline function) as well as on other on-demand datasets; this is configured via the deps=[Dataset] parameter. When materialization is launched (both offline and online), the framework builds an on-demand task DAG and executes it in parallel on an on-demand worker pool (Ray Actors).

import datetime
from typing import Dict

from volga.on_demand.on_demand import on_demand

@dataset
class UserOnSaleTransactionTooBig:
    user_id: str = field(key=True)
    tx_id: str = field(key=True)
    tx_ts: datetime.datetime = field(timestamp=True)

    above_7d_avg: bool
    above_1h_avg: bool

    @on_demand(deps=[OnSaleUserSpentInfo])
    def gen(cls, ts: datetime.datetime, transaction: Dict):
        on_sale_spent_info = OnSaleUserSpentInfo.get(keys=[{'user_id': transaction['user_id'], 'product_id': transaction['product_id']}], ts=ts) # returns Dict-like object
        above_7d_avg = transaction['tx_amount'] > on_sale_spent_info['avg_spent_7d']
        above_1h_avg = transaction['tx_amount'] > on_sale_spent_info['avg_spent_1h']
        
        # output schema should match dataset schema
        return {
            'user_id': transaction['user_id'],
            'tx_id': transaction['tx_id'],
            'tx_ts': ts,
            'above_7d_avg': above_7d_avg,
            'above_1h_avg': above_1h_avg
        }

Calculate the on-demand transformation; this can be done against both online and offline storage. This step first checks the availability of all dependent datasets and fails if any are missing.

res = client.get_on_demand(
    target=UserOnSaleTransactionTooBig,
    online=True, # False for offline storage source
    start=None, end=None, # datetime range in case of offline request
    inputs=[{
        'user_id': '1',
        'tx_id': 'tx_0',
        'product_id': 'prod_0',
        'tx_amount': 150
    }]
)
...

{'user_id': '1', 'timestamp': '2024-03-27 14:59:20.124752', 'tx_id': 'tx_0', 'above_7d_avg': 'false', 'above_1h_avg': 'true'}
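
The same call can run against offline storage by flipping the online flag and supplying a datetime range; a sketch based on the signature above (that start/end accept datetime objects is an assumption):

import datetime

# offline variant: resolve dependencies from cold storage over a datetime range
res_offline = client.get_on_demand(
    target=UserOnSaleTransactionTooBig,
    online=False,
    start=datetime.datetime(2024, 3, 27, 0, 0, 0),
    end=datetime.datetime(2024, 3, 28, 0, 0, 0),
    inputs=[{
        'user_id': '1',
        'tx_id': 'tx_0',
        'product_id': 'prod_0',
        'tx_amount': 150
    }]
)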

Installation

The project is currently in the dev stage and has no published packages. To run and develop locally, clone the repository and run the following in your dev env:

pip install .

After that, make sure to compile the Rust binaries and build the Python bindings using PyO3's maturin:

cd rust
maturin develop

On Apple Silicon, you may get an error regarding grpcio when installing Ray. To fix it, run:

pip uninstall grpcio; conda install grpcio

To enable graph visualization, install GraphViz:

brew install graphviz
pip install pygraphviz

To develop locally and have Ray pick up local changes, uninstall the local volga package and append the path to your local volga project folder to the PYTHONPATH env var:

pip uninstall volga
export PYTHONPATH=/path/to/local/volga/folder:$PYTHONPATH # set this in each new shell/window, or configure it via your virtual env manager's env var settings

Running locally

Since Volga uses Ray's distributed runtime, you'll need a running Ray cluster to run pipelines. The easiest way is to launch a local one-node cluster:

ray start --head

Make sure your program's entry point is in the same virtual env where you launched the cluster. You can run the sample e2e tests to see the engine's workflow:

python test_volga_e2e.py

python test_streaming_e2e.py

Development is done with Python 3.10.8 and Ray 2.22.0; in case of any import/installation-related errors, please try rolling your dev env back to these versions.

Development

Volga is in a proof-of-concept state and requires some key features to be prod-ready (checkpointing and state backend, watermarks, etc.). You can see the backlog here:

BACKLOG

Any feedback is extremely valuable, issues and PRs are always welcome.