Ray Compute Engine (contrib)

The Ray compute engine is a distributed compute implementation that uses Ray to execute feature pipelines, including transformations, aggregations, joins, and materializations. It provides scalable, efficient distributed processing for both materialize() and get_historical_features() operations.

Quick Start with Ray Template

Ray RAG Template - Batch Embedding at Scale

For RAG (Retrieval-Augmented Generation) applications with distributed embedding generation:

feast init -t ray_rag my_rag_project
cd my_rag_project/feature_repo

The Ray RAG template demonstrates:

  • Parallel Embedding Generation: Uses Ray compute engine to generate embeddings across multiple workers
  • Vector Search Integration: Works with Milvus for semantic similarity search
  • Complete RAG Pipeline: Data → Embeddings → Search workflow

The Ray compute engine automatically distributes the embedding generation across available workers, making it ideal for processing large datasets efficiently.

Overview

The Ray compute engine provides:

  • Distributed DAG Execution: Executes feature computation DAGs across Ray clusters
  • Intelligent Join Strategies: Automatic selection between broadcast and distributed joins
  • Lazy Evaluation: Deferred execution for optimal performance
  • Resource Management: Automatic scaling and resource optimization
  • Point-in-Time Joins: Efficient temporal joins for historical feature retrieval

Architecture

The Ray compute engine follows Feast's DAG-based architecture:

EntityDF → RayReadNode → RayJoinNode → RayFilterNode → RayAggregationNode → RayTransformationNode → Output

Core Components

  • RayComputeEngine: Main engine implementing the ComputeEngine interface
  • RayFeatureBuilder: Constructs the DAG from feature view definitions
  • RayDAGNode: Ray-specific DAG node implementations
  • RayDAGRetrievalJob: Executes retrieval plans and returns results
  • RayMaterializationJob: Handles materialization job tracking

Configuration

Configure the Ray compute engine in your feature_store.yaml:

project: my_project
registry: data/registry.db
provider: local
offline_store:
    type: ray
    storage_path: data/ray_storage
batch_engine:
    type: ray.engine
    max_workers: 4                         # Optional: Maximum number of workers
    enable_optimization: true              # Optional: Enable performance optimizations
    broadcast_join_threshold_mb: 100       # Optional: Broadcast join threshold (MB)
    max_parallelism_multiplier: 2          # Optional: Parallelism multiplier
    target_partition_size_mb: 64           # Optional: Target partition size (MB)
    window_size_for_joins: "1H"            # Optional: Time window for distributed joins
    ray_address: localhost:10001           # Optional: Ray cluster address

Configuration Options

  • type (string, default "ray.engine"): Must be ray.engine
  • max_workers (int, default: all available cores): Maximum number of Ray workers
  • enable_optimization (boolean, default true): Enable performance optimizations
  • broadcast_join_threshold_mb (int, default 100): Size threshold for broadcast joins (MB)
  • max_parallelism_multiplier (int, default 2): Parallelism as a multiple of CPU cores
  • target_partition_size_mb (int, default 64): Target partition size (MB)
  • window_size_for_joins (string, default "1H"): Time window for distributed joins
  • ray_address (string, default None): Ray cluster address (triggers remote mode)
  • use_kuberay (boolean, default None): Enable KubeRay mode (overrides ray_address)
  • kuberay_conf (dict, default None): KubeRay configuration with keys cluster_name (required), namespace (default "default"), auth_token, auth_server, and skip_tls (default false)
  • enable_ray_logging (boolean, default false): Enable Ray progress bars and logging
  • enable_distributed_joins (boolean, default true): Enable distributed joins for large datasets
  • staging_location (string, default None): Remote path for batch materialization jobs
  • ray_conf (dict, default None): Ray configuration parameters (e.g. memory and CPU limits)

Mode Detection Precedence

The Ray compute engine automatically detects the execution mode:

  1. Environment Variables → KubeRay mode (if FEAST_RAY_USE_KUBERAY=true)
  2. Config kuberay_conf → KubeRay mode
  3. Config ray_address → Remote mode
  4. Default → Local mode
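As a minimal sketch, this precedence might be expressed as follows (select_mode is a hypothetical helper for illustration; the actual detection happens inside the engine):

import os

def select_mode(config: dict) -> str:
    # 1. Environment variable takes precedence over everything else.
    if os.environ.get("FEAST_RAY_USE_KUBERAY") == "true":
        return "kuberay"
    # 2. Explicit KubeRay configuration.
    if config.get("kuberay_conf"):
        return "kuberay"
    # 3. A configured cluster address implies remote mode.
    if config.get("ray_address"):
        return "remote"
    # 4. Otherwise, run Ray locally.
    return "local"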

Usage Examples

Basic Historical Feature Retrieval

from feast import FeatureStore
import pandas as pd
from datetime import datetime

# Initialize feature store with Ray compute engine
store = FeatureStore("feature_store.yaml")

# Create entity DataFrame
entity_df = pd.DataFrame({
    "driver_id": [1, 2, 3, 4, 5],
    "event_timestamp": [datetime.now()] * 5
})

# Get historical features using Ray compute engine
features = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_stats:avg_daily_trips",
        "driver_stats:total_distance"
    ]
)

# Convert to DataFrame
df = features.to_df()
print(f"Retrieved {len(df)} rows with {len(df.columns)} columns")

Batch Materialization

from datetime import datetime, timedelta

# Materialize features using Ray compute engine
store.materialize(
    start_date=datetime.now() - timedelta(days=7),
    end_date=datetime.now(),
    feature_views=["driver_stats", "customer_stats"]
)

# The Ray compute engine handles:
# - Distributed data processing
# - Optimal join strategies
# - Resource management
# - Progress tracking

Large-Scale Feature Retrieval

# Handle large entity datasets efficiently
large_entity_df = pd.DataFrame({
    "driver_id": range(1, 1_000_001),  # 1M entities
    "event_timestamp": [datetime.now()] * 1_000_000
})

# Ray compute engine automatically:
# - Partitions data optimally
# - Selects appropriate join strategies
# - Distributes computation across cluster
features = store.get_historical_features(
    entity_df=large_entity_df,
    features=[
        "driver_stats:avg_daily_trips",
        "driver_stats:total_distance",
        "customer_stats:lifetime_value"
    ]
).to_df()

Advanced Configuration

# Production-ready configuration
batch_engine:
    type: ray.engine
    # Resource configuration
    max_workers: 16
    max_parallelism_multiplier: 4
    
    # Performance optimization
    enable_optimization: true
    broadcast_join_threshold_mb: 50
    target_partition_size_mb: 128
    
    # Distributed join configuration
    window_size_for_joins: "30min"
    
    # Ray cluster configuration
    ray_address: "ray://head-node:10001"

Complete Example Configuration

Here's a complete example configuration showing how to use the Ray offline store with the Ray compute engine:

# Complete example configuration for Ray offline store + Ray compute engine
# This shows how to use both components together for distributed processing

project: my_feast_project
registry: data/registry.db
provider: local

# Ray offline store configuration
# Handles data I/O operations (reading/writing data)
offline_store:
    type: ray
    storage_path: s3://my-bucket/feast-data    # Optional: Path for storing datasets
    ray_address: localhost:10001               # Optional: Ray cluster address

# Ray compute engine configuration  
# Handles complex feature computation and distributed processing
batch_engine:
    type: ray.engine
    
    # Resource configuration
    max_workers: 8                             # Maximum number of Ray workers
    max_parallelism_multiplier: 2              # Parallelism as multiple of CPU cores
    
    # Performance optimization
    enable_optimization: true                  # Enable performance optimizations
    broadcast_join_threshold_mb: 100           # Broadcast join threshold (MB)
    target_partition_size_mb: 64               # Target partition size (MB)
    
    # Distributed join configuration
    window_size_for_joins: "1H"                # Time window for distributed joins
    
    # Ray cluster configuration (inherits from offline_store if not specified)
    ray_address: localhost:10001               # Ray cluster address

DAG Node Types

The Ray compute engine implements several specialized DAG nodes:

RayReadNode

Reads data from Ray-compatible sources:

  • Supports Parquet, CSV, and other formats
  • Handles partitioning and schema inference
  • Applies field mappings and filters

RayJoinNode

Performs distributed joins:

  • Broadcast Join: For small datasets (below broadcast_join_threshold_mb, 100 MB by default)
  • Distributed Join: For large datasets with time-based windowing
  • Automatic Strategy Selection: Based on dataset size and cluster resources

RayFilterNode

Applies filters and time-based constraints:

  • TTL-based filtering
  • Timestamp range filtering
  • Custom predicate filtering

RayAggregationNode

Handles feature aggregations:

  • Windowed aggregations
  • Grouped aggregations
  • Custom aggregation functions

RayTransformationNode

Applies feature transformations:

  • Row-level transformations
  • Column-level transformations
  • Custom transformation functions

RayWriteNode

Writes results to various targets:

  • Online stores
  • Offline stores
  • Temporary storage
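Conceptually, these nodes compose into the linear plan shown under Architecture. The following is an illustrative sketch only (these are not the actual Feast classes), showing how such a plan might chain node executions over a Ray Dataset:

import ray.data

# Illustrative stand-in for the Ray DAG node types above.
class Node:
    def __init__(self, fn, upstream=None):
        self.fn = fn
        self.upstream = upstream

    def execute(self) -> ray.data.Dataset:
        # Each node transforms the dataset produced by its upstream node.
        upstream_ds = self.upstream.execute() if self.upstream else None
        return self.fn(upstream_ds)

# Hypothetical two-node plan: read, then filter.
read = Node(lambda _: ray.data.read_parquet("data/driver_stats.parquet"))
filtered = Node(lambda ds: ds.filter(lambda row: row["avg_daily_trips"] > 0), read)
result = filtered.execute()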

Join Strategies

The Ray compute engine automatically selects optimal join strategies:

Broadcast Join

Used for small feature datasets:

  • Automatically selected when feature data is below broadcast_join_threshold_mb (default 100 MB)
  • Features are cached in Ray's object store
  • Entities are distributed across cluster
  • Each worker gets a copy of feature data

Distributed Windowed Join

Used for large feature datasets:

  • Automatically selected when feature data exceeds broadcast_join_threshold_mb (default 100 MB)
  • Data is partitioned by time windows
  • Point-in-time joins within each window
  • Results are combined across windows
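To make the windowing idea concrete, here is a hypothetical sketch (not Feast's internal implementation) of bucketing entity rows into time windows so that point-in-time joins can run independently per window:

import pandas as pd

def assign_join_windows(df: pd.DataFrame, window: str = "1H") -> pd.DataFrame:
    # Bucket each event timestamp into a window; joins then run per bucket,
    # and the per-window results are concatenated afterwards.
    df = df.copy()
    df["join_window"] = df["event_timestamp"].dt.floor(window)
    return df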

Strategy Selection Logic

def select_join_strategy(feature_size_mb, threshold_mb):
    if feature_size_mb < threshold_mb:
        return "broadcast"
    else:
        return "distributed_windowed"

Performance Optimization

Automatic Optimization

The Ray compute engine includes several automatic optimizations:

  1. Partition Optimization: Automatically determines optimal partition sizes
  2. Join Strategy Selection: Chooses between broadcast and distributed joins
  3. Resource Allocation: Scales workers based on available resources
  4. Memory Management: Handles out-of-core processing for large datasets
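As an illustration of the partition sizing idea (a hypothetical formula, not the engine's actual code), a partition count can be derived from the dataset size and target_partition_size_mb:

import math

def num_partitions(dataset_size_mb: float, target_partition_size_mb: int = 64) -> int:
    # e.g. a 640 MB dataset with a 64 MB target yields 10 partitions.
    return max(1, math.ceil(dataset_size_mb / target_partition_size_mb))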

Manual Tuning

For specific workloads, you can fine-tune performance:

batch_engine:
    type: ray.engine
    # Fine-tuning for high-throughput scenarios
    broadcast_join_threshold_mb: 200      # Larger broadcast threshold
    max_parallelism_multiplier: 1        # Conservative parallelism
    target_partition_size_mb: 512        # Larger partitions
    window_size_for_joins: "2H"          # Larger time windows

Monitoring and Metrics

Monitor Ray compute engine performance:

import ray

# Check cluster resources
resources = ray.cluster_resources()
print(f"Available CPUs: {resources.get('CPU', 0)}")
print(f"Available memory: {resources.get('memory', 0) / 1e9:.2f} GB")

# Monitor job progress
job = store.get_historical_features(...)
# Ray compute engine provides built-in progress tracking

Integration Examples

With Spark Offline Store

# Use Ray compute engine with Spark offline store
offline_store:
    type: spark
    spark_conf:
        spark.executor.memory: "4g"
        spark.executor.cores: "2"
batch_engine:
    type: ray.engine
    max_workers: 8
    enable_optimization: true

With Cloud Storage

# Use Ray compute engine with cloud storage
offline_store:
    type: ray
    storage_path: s3://my-bucket/feast-data
batch_engine:
    type: ray.engine
    ray_address: "ray://ray-cluster:10001"
    broadcast_join_threshold_mb: 50

With Feature Transformations

On-Demand Transformations

from feast import Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64
import pandas as pd

@on_demand_feature_view(
    sources=[driver_stats],  # the driver_stats FeatureView object, defined elsewhere
    schema=[Field(name="trips_per_hour", dtype=Float64)]
)
def trips_per_hour(features_df: pd.DataFrame) -> pd.DataFrame:
    # Return a DataFrame containing the new feature column
    df = pd.DataFrame()
    df["trips_per_hour"] = features_df["avg_daily_trips"] / 24
    return df

# Ray compute engine handles transformations efficiently
features = store.get_historical_features(
    entity_df=entity_df,
    features=["trips_per_hour:trips_per_hour"]
)

Ray Native Transformations

For distributed transformations that leverage Ray Data and Ray's parallel processing capabilities, use mode="ray" in your BatchFeatureView:

# Feature view with Ray transformation mode
document_embeddings_view = BatchFeatureView(
    name="document_embeddings",
    entities=[document],
    mode="ray",  # Enable Ray native transformation
    ttl=timedelta(days=365),
    schema=[
        Field(name="document_id", dtype=String),
        Field(name="embedding", dtype=Array(Float32), vector_index=True),
        Field(name="movie_name", dtype=String),
        Field(name="movie_director", dtype=String),
    ],
    source=movies_source,
    udf=generate_embeddings_ray_native,
    online=True,
)
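The BatchFeatureView above assumes that document (an Entity), movies_source (a data source), and the generate_embeddings_ray_native UDF are defined elsewhere in the project. A minimal sketch of such a Ray-native UDF, assuming the UDF receives and returns a ray.data.Dataset (the embedding model call below is a placeholder, not a real encoder):

import numpy as np
import pandas as pd
import ray.data

def generate_embeddings_ray_native(dataset: ray.data.Dataset) -> ray.data.Dataset:
    def embed_batch(batch: pd.DataFrame) -> pd.DataFrame:
        # Placeholder: swap in a real encoder (e.g. sentence-transformers).
        batch["embedding"] = [
            np.random.rand(384).astype(np.float32) for _ in range(len(batch))
        ]
        return batch

    # map_batches distributes the per-batch work across Ray workers.
    return dataset.map_batches(embed_batch, batch_format="pandas")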

For more information, see the Ray documentation and Ray Data guide.