Skip to content

feat: Add async DynamoDB timeout and retry configuration #5178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
keepalive_timeout: float = 12.0
"""Keep-alive timeout in seconds for async Dynamodb connections."""

connect_timeout: Union[int, float] = 60
"""The time in seconds until a timeout exception is thrown when attempting to make
an async connection."""

read_timeout: Union[int, float] = 60
"""The time in seconds until a timeout exception is thrown when attempting to read
from an async connection."""

total_max_retry_attempts: Union[int, None] = None
"""Maximum number of total attempts that will be made on a single request.

Maps to `retries.total_max_attempts` in botocore.config.Config.
"""

retry_mode: Union[Literal["legacy", "standard", "adaptive"], None] = None
"""The type of retry mode (aio)botocore should use.

Maps to `retries.mode` in botocore.config.Config.
"""


class DynamoDBOnlineStore(OnlineStore):
"""
Expand All @@ -99,10 +119,16 @@ class DynamoDBOnlineStore(OnlineStore):
_dynamodb_resource = None

async def initialize(self, config: RepoConfig):
online_config = config.online_store

await _get_aiodynamodb_client(
config.online_store.region,
config.online_store.max_pool_connections,
config.online_store.keepalive_timeout,
online_config.region,
online_config.max_pool_connections,
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.total_max_retry_attempts,
online_config.retry_mode,
)

async def close(self):
Expand Down Expand Up @@ -280,6 +306,10 @@ async def online_write_batch_async(
online_config.region,
online_config.max_pool_connections,
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.total_max_retry_attempts,
online_config.retry_mode,
)
await dynamo_write_items_async(client, table_name, items)

Expand Down Expand Up @@ -387,6 +417,10 @@ def to_tbl_resp(raw_client_response):
online_config.region,
online_config.max_pool_connections,
online_config.keepalive_timeout,
online_config.connect_timeout,
online_config.read_timeout,
online_config.total_max_retry_attempts,
online_config.retry_mode,
)
response_batches = await asyncio.gather(
*[
Expand Down Expand Up @@ -546,16 +580,32 @@ def _get_aioboto_session():


async def _get_aiodynamodb_client(
region: str, max_pool_connections: int, keepalive_timeout: float
region: str,
max_pool_connections: int,
keepalive_timeout: float,
connect_timeout: Union[int, float],
read_timeout: Union[int, float],
total_max_retry_attempts: Union[int, None],
retry_mode: Union[Literal["legacy", "standard", "adaptive"], None],
):
global _aioboto_client
if _aioboto_client is None:
logger.debug("initializing the aiobotocore dynamodb client")

retries: Dict[str, Any] = {}
if total_max_retry_attempts is not None:
retries["total_max_attempts"] = total_max_retry_attempts
if retry_mode is not None:
retries["mode"] = retry_mode

client_context = _get_aioboto_session().create_client(
"dynamodb",
region_name=region,
config=AioConfig(
max_pool_connections=max_pool_connections,
connect_timeout=connect_timeout,
read_timeout=read_timeout,
retries=retries if retries else None,
connector_args={"keepalive_timeout": keepalive_timeout},
),
)
Expand Down
Loading