Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies = [

[project.optional-dependencies]
aws = ["boto3==1.38.27", "fsspec<=2024.9.0", "aiobotocore>2,<3"]
dax = ["boto3>=1.26.0", "amazon-dax-client>=2.0.0,<3"]
azure = [
"azure-storage-blob>=0.37.0",
"azure-identity>=1.6.1",
Expand Down
121 changes: 115 additions & 6 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union

from aiobotocore.config import AioConfig
from pydantic import StrictBool, StrictStr
from pydantic import StrictBool, StrictStr, model_validator

from feast import Entity, FeatureView, utils
from feast.infra.online_stores.helpers import compute_entity_id
Expand Down Expand Up @@ -62,6 +62,23 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
"""DynamoDB endpoint URL. Use for local development (e.g., http://localhost:8000)
or VPC endpoints for improved latency."""

use_dax: bool = False
"""Enable DAX (DynamoDB Accelerator) for sub-millisecond read latency.
Requires amazon-dax-client package and a running DAX cluster.

IMPORTANT: DAX is only supported for synchronous operations. When using
Feast feature server (which uses async), DAX will NOT be used and requests
will fall back to direct DynamoDB access. DAX works with:
- Direct Python SDK usage (FeatureStore.get_online_features)
- Batch operations via CLI

For async feature server workloads, consider using DynamoDB VPC endpoints
with endpoint_url configuration instead."""

dax_endpoint: Union[str, None] = None
"""DAX cluster endpoint URL (e.g., dax://my-cluster.xxx.dax-clusters.us-east-1.amazonaws.com).
Required when use_dax is True. Supports both 'dax://' and 'daxs://' (TLS) schemes."""

region: StrictStr
"""AWS Region Name"""

Expand Down Expand Up @@ -110,6 +127,16 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
'adaptive' mode provides intelligent retry with client-side rate limiting.
"""

@model_validator(mode="after")
def _validate_dax_config(self):
    """Ensure a DAX endpoint accompanies an enabled ``use_dax`` flag.

    Registered as an "after" model validator, so it inspects the already
    populated ``use_dax`` and ``dax_endpoint`` attributes.

    Returns:
        The config instance itself, unchanged, when DAX is disabled or an
        endpoint has been supplied.

    Raises:
        ValueError: If ``use_dax`` is truthy while ``dax_endpoint`` is
            ``None`` or empty.
    """
    # Nothing to check unless DAX was requested without an endpoint.
    if not self.use_dax or self.dax_endpoint:
        return self

    message = (
        "dax_endpoint is required when use_dax is True. "
        "Provide the DAX cluster endpoint URL (e.g., dax://my-cluster.xxx.dax-clusters.us-east-1.amazonaws.com)"
    )
    raise ValueError(message)


class DynamoDBOnlineStore(OnlineStore):
"""
Expand Down Expand Up @@ -141,6 +168,14 @@ def __init__(self):
async def initialize(self, config: RepoConfig):
online_config = config.online_store

# Warn if DAX is enabled but async mode is being used
if online_config.use_dax and online_config.dax_endpoint:
logger.warning(
"DAX is enabled but async mode (feature server) does not support DAX. "
"Requests will use direct DynamoDB access. DAX only works with "
"synchronous operations (e.g., FeatureStore.get_online_features())."
)

await self._get_aiodynamodb_client(
online_config.region,
online_config.max_pool_connections,
Expand Down Expand Up @@ -270,15 +305,22 @@ def update(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client = self._get_dynamodb_client(
# Table operations (describe, create, delete) are NOT supported by DAX.
# Create fresh non-DAX clients directly (don't use cached _get_dynamodb_* methods
# as those may cache DAX clients for data operations).
dynamodb_client = _initialize_dynamodb_client(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
use_dax=False,
dax_endpoint=None,
)
dynamodb_resource = self._get_dynamodb_resource(
dynamodb_resource = _initialize_dynamodb_resource(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
use_dax=False,
dax_endpoint=None,
)

do_tag_updates = defaultdict(bool)
Expand Down Expand Up @@ -369,10 +411,15 @@ def teardown(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
# Table operations (delete) are NOT supported by DAX.
# Create a fresh non-DAX resource directly (don't use the cached _get_dynamodb_resource,
# as it may hold a DAX-backed resource created for data operations).
dynamodb_resource = _initialize_dynamodb_resource(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
use_dax=False,
dax_endpoint=None,
)

for table in tables:
Expand Down Expand Up @@ -410,6 +457,8 @@ def online_write_batch(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
online_config.use_dax,
online_config.dax_endpoint,
)

table_instance = dynamodb_resource.Table(
Expand Down Expand Up @@ -483,6 +532,8 @@ def online_read(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
online_config.use_dax,
online_config.dax_endpoint,
)
table_name = _get_table_name(online_config, config, table)

Expand Down Expand Up @@ -516,6 +567,8 @@ def online_read(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
online_config.use_dax,
online_config.dax_endpoint,
)

def fetch_batch(batch: List[str]) -> Dict[str, Any]:
Expand Down Expand Up @@ -640,10 +693,12 @@ def _get_dynamodb_client(
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
use_dax: bool = False,
dax_endpoint: Optional[str] = None,
):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(
region, endpoint_url, session_based_auth
region, endpoint_url, session_based_auth, use_dax, dax_endpoint
)
return self._dynamodb_client

Expand All @@ -652,10 +707,12 @@ def _get_dynamodb_resource(
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
use_dax: bool = False,
dax_endpoint: Optional[str] = None,
):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(
region, endpoint_url, session_based_auth
region, endpoint_url, session_based_auth, use_dax, dax_endpoint
)
return self._dynamodb_resource

Expand Down Expand Up @@ -811,6 +868,8 @@ def update_online_store(
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
online_config.use_dax,
online_config.dax_endpoint,
)

table_instance = dynamodb_resource.Table(
Expand Down Expand Up @@ -1119,7 +1178,32 @@ def _initialize_dynamodb_client(
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
use_dax: bool = False,
dax_endpoint: Optional[str] = None,
):
"""
Initialize DynamoDB client, optionally using DAX for caching.

When use_dax=True, returns a DAX client that is API-compatible with
the boto3 DynamoDB client but routes requests through DAX cluster.
"""
if use_dax and dax_endpoint:
try:
from amazondax import AmazonDaxClient

logger.info(f"Initializing DAX client with endpoint: {dax_endpoint}")
# AmazonDaxClient() constructor creates a client (not .client() method)
# endpoint_url should be in format: dax://cluster.xxx.dax-clusters.region.amazonaws.com
return AmazonDaxClient(
endpoint_url=dax_endpoint,
region_name=region,
)
except ImportError:
logger.warning(
"amazon-dax-client not installed. Install with: pip install amazon-dax-client. "
"Falling back to standard DynamoDB client."
)

if session_based_auth:
return boto3.Session().client(
"dynamodb",
Expand All @@ -1140,7 +1224,32 @@ def _initialize_dynamodb_resource(
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
use_dax: bool = False,
dax_endpoint: Optional[str] = None,
):
"""
Initialize DynamoDB resource, optionally using DAX for caching.

When use_dax=True, returns a DAX resource that is API-compatible with
the boto3 DynamoDB resource but routes requests through DAX cluster.
"""
if use_dax and dax_endpoint:
try:
from amazondax import AmazonDaxClient

logger.info(f"Initializing DAX resource with endpoint: {dax_endpoint}")
# AmazonDaxClient.resource() creates a resource interface
# endpoint_url should be in format: dax://cluster.xxx.dax-clusters.region.amazonaws.com
return AmazonDaxClient.resource(
endpoint_url=dax_endpoint,
region_name=region,
)
except ImportError:
logger.warning(
"amazon-dax-client not installed. Install with: pip install amazon-dax-client. "
"Falling back to standard DynamoDB resource."
)

if session_based_auth:
return boto3.Session().resource(
"dynamodb", region_name=region, endpoint_url=endpoint_url
Expand Down
Loading
Loading