From 64efedf6a8bc4224b9d142db7f2fbf0842f51e0b Mon Sep 17 00:00:00 2001 From: JordonPhillips Date: Fri, 27 Mar 2026 16:23:28 +0100 Subject: [PATCH] Refactor RetryStrategy to be async This refactors retry strategies to be async. This is needed for when strategies need to internally wait or if they need to protect access to a shared resource. --- .../python/codegen/ClientGenerator.java | 4 +- .../codegen/HttpProtocolTestGenerator.java | 2 +- .../codegen/generators/ConfigGenerator.java | 2 +- .../src/smithy_aws_core/identity/imds.py | 4 +- .../tests/unit/identity/test_imds.py | 2 +- ...king-e1d59c61d8344b88832fc481dbc16531.json | 4 + .../smithy-core/src/smithy_core/aio/client.py | 8 +- .../src/smithy_core/aio/interfaces/retries.py | 56 +++++ .../src/smithy_core/aio/retries.py | 227 ++++++++++++++++++ .../src/smithy_core/interfaces/retries.py | 51 ---- .../smithy-core/src/smithy_core/retries.py | 216 +---------------- .../tests/functional/test_retries.py | 10 +- .../tests/unit/aio/test_retries.py | 168 +++++++++++++ .../smithy-core/tests/unit/test_retries.py | 155 ------------ 14 files changed, 473 insertions(+), 436 deletions(-) create mode 100644 packages/smithy-core/.changes/next-release/smithy-core-breaking-e1d59c61d8344b88832fc481dbc16531.json create mode 100644 packages/smithy-core/src/smithy_core/aio/interfaces/retries.py create mode 100644 packages/smithy-core/src/smithy_core/aio/retries.py create mode 100644 packages/smithy-core/tests/unit/aio/test_retries.py diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java index a56aff818..24f87ef78 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java @@ -71,7 +71,7 @@ private void generateService(PythonWriter writer) { } writer.addDependency(SmithyPythonDependency.SMITHY_CORE); - writer.addImport("smithy_core.retries", "RetryStrategyResolver"); + writer.addImport("smithy_core.aio.retries", "RetryStrategyResolver"); writer.write(""" def __init__(self, config: $1T | None = None, plugins: list[$2T] | None = None): $3C @@ -215,7 +215,7 @@ private void writeSharedOperationInit( writer.addImport("smithy_core.aio.client", "RequestPipeline"); writer.addImport("smithy_core.exceptions", "ExpectationNotMetError"); writer.addImport("smithy_core.retries", "RetryStrategyOptions"); - writer.addImport("smithy_core.interfaces.retries", "RetryStrategy"); + writer.addImport("smithy_core.aio.interfaces.retries", "RetryStrategy"); writer.addStdlibImport("copy", "deepcopy"); writer.write(""" diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java index 0a06f15bf..5adf93f95 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/HttpProtocolTestGenerator.java @@ -181,7 +181,7 @@ private void generateRequestTest(OperationShape operation, HttpRequestTestCase t } else { path = ""; } - writer.addImport("smithy_core.retries", "SimpleRetryStrategy"); + writer.addImport("smithy_core.aio.retries", "SimpleRetryStrategy"); writeClientBlock(context.symbolProvider().toSymbol(service), testCase, Optional.of(() -> { writer.write(""" config = $T( diff --git a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java index de03d42d0..333362160 100644 --- a/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java +++ b/codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ConfigGenerator.java @@ -58,7 +58,7 @@ public final class ConfigGenerator implements Runnable { .name("RetryStrategy | RetryStrategyOptions") .addReference(Symbol.builder() .name("RetryStrategy") - .namespace("smithy_core.interfaces.retries", ".") + .namespace("smithy_core.aio.interfaces.retries", ".") .addDependency(SmithyPythonDependency.SMITHY_CORE) .build()) .addReference(Symbol.builder() diff --git a/packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py b/packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py index c2b266e90..737d37d76 100644 --- a/packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py +++ b/packages/smithy-aws-core/src/smithy_aws_core/identity/imds.py @@ -9,9 +9,9 @@ from smithy_core import URI from smithy_core.aio.interfaces.identity import IdentityResolver +from smithy_core.aio.interfaces.retries import RetryStrategy +from smithy_core.aio.retries import SimpleRetryStrategy from smithy_core.exceptions import SmithyIdentityError -from smithy_core.interfaces.retries import RetryStrategy -from smithy_core.retries import SimpleRetryStrategy from smithy_http import Field, Fields from smithy_http.aio import HTTPRequest from smithy_http.aio.interfaces import HTTPClient diff --git a/packages/smithy-aws-core/tests/unit/identity/test_imds.py b/packages/smithy-aws-core/tests/unit/identity/test_imds.py index 81a1df95f..f5146fbe3 100644 --- a/packages/smithy-aws-core/tests/unit/identity/test_imds.py +++ b/packages/smithy-aws-core/tests/unit/identity/test_imds.py @@ -16,7 +16,7 @@ TokenCache, ) from smithy_core import URI -from smithy_core.retries import SimpleRetryStrategy +from smithy_core.aio.retries import SimpleRetryStrategy from smithy_http.aio import HTTPRequest diff --git a/packages/smithy-core/.changes/next-release/smithy-core-breaking-e1d59c61d8344b88832fc481dbc16531.json b/packages/smithy-core/.changes/next-release/smithy-core-breaking-e1d59c61d8344b88832fc481dbc16531.json new file mode 100644 index 000000000..484d18598 --- /dev/null +++ b/packages/smithy-core/.changes/next-release/smithy-core-breaking-e1d59c61d8344b88832fc481dbc16531.json @@ -0,0 +1,4 @@ +{ + "type": "breaking", + "description": "Refactored retry strategies to be async, allowing them to wait internally or use async synchronization primitives if necessary." +} \ No newline at end of file diff --git a/packages/smithy-core/src/smithy_core/aio/client.py b/packages/smithy-core/src/smithy_core/aio/client.py index 6060727b4..e84c9a94b 100644 --- a/packages/smithy-core/src/smithy_core/aio/client.py +++ b/packages/smithy-core/src/smithy_core/aio/client.py @@ -22,7 +22,6 @@ ) from ..interfaces import Endpoint, TypedProperties from ..interfaces.auth import AuthOption, AuthSchemeResolver -from ..interfaces.retries import RetryStrategy from ..schemas import APIOperation from ..serializers import SerializeableShape from ..shapes import ShapeID @@ -37,6 +36,7 @@ ) from .interfaces.auth import AuthScheme from .interfaces.eventstream import EventReceiver +from .interfaces.retries import RetryStrategy from .utils import seek if TYPE_CHECKING: @@ -330,7 +330,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape]( return await self._handle_attempt(call, request_context, request_future) retry_strategy = call.retry_strategy - retry_token = retry_strategy.acquire_initial_retry_token( + retry_token = await retry_strategy.acquire_initial_retry_token( token_scope=call.retry_scope ) @@ -349,7 +349,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape]( if isinstance(output_context.response, Exception): try: - retry_token = retry_strategy.refresh_retry_token_for_retry( + retry_token = await retry_strategy.refresh_retry_token_for_retry( token_to_renew=retry_token, error=output_context.response, ) @@ -364,7 +364,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape]( await seek(request_context.transport_request.body, 0) else: - retry_strategy.record_success(token=retry_token) + await retry_strategy.record_success(token=retry_token) return output_context async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape]( diff --git a/packages/smithy-core/src/smithy_core/aio/interfaces/retries.py b/packages/smithy-core/src/smithy_core/aio/interfaces/retries.py new file mode 100644 index 000000000..fbe188b55 --- /dev/null +++ b/packages/smithy-core/src/smithy_core/aio/interfaces/retries.py @@ -0,0 +1,56 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from typing import Protocol, runtime_checkable + +from ...interfaces.retries import RetryBackoffStrategy, RetryToken + + +@runtime_checkable +class RetryStrategy(Protocol): + """Issuer of :py:class:`RetryToken`s.""" + + backoff_strategy: RetryBackoffStrategy + """The strategy used by returned tokens to compute delay duration values.""" + + max_attempts: int + """Upper limit on total attempt count (initial attempt plus retries).""" + + async def acquire_initial_retry_token( + self, *, token_scope: str | None = None + ) -> RetryToken: + """Create a base retry token for the start of a request. + + :param token_scope: An arbitrary string accepted by the retry strategy to + separate tokens into scopes. + :returns: A retry token, to be used for determining the retry delay, refreshing + the token after a failure, and recording success after success. + :raises RetryError: If the retry strategy has no available tokens. + """ + ... + + async def refresh_retry_token_for_retry( + self, *, token_to_renew: RetryToken, error: Exception + ) -> RetryToken: + """Replace an existing retry token from a failed attempt with a new token. + + After a failed operation call, this method is called to exchange a retry token + that was previously obtained by calling :py:func:`acquire_initial_retry_token` + or this method with a new retry token for the next attempt. This method can + either choose to allow another retry and send a new or updated token, or reject + the retry attempt and raise the error. + + :param token_to_renew: The token used for the previous failed attempt. + :param error: The error that triggered the need for a retry. + :raises RetryError: If no further retry attempts are allowed. + """ + ... + + async def record_success(self, *, token: RetryToken) -> None: + """Return token after successful completion of an operation. + + Upon successful completion of the operation, a user calls this function to + record that the operation was successful. + + :param token: The token used for the previous successful attempt. + """ + ... diff --git a/packages/smithy-core/src/smithy_core/aio/retries.py b/packages/smithy-core/src/smithy_core/aio/retries.py new file mode 100644 index 000000000..bf9d255ed --- /dev/null +++ b/packages/smithy-core/src/smithy_core/aio/retries.py @@ -0,0 +1,227 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +from functools import lru_cache +from typing import Any, Literal + +from ..exceptions import RetryError +from ..interfaces import retries as retries_interface +from ..retries import ( + ExponentialBackoffJitterType, + ExponentialRetryBackoffStrategy, + RetryStrategyOptions, + SimpleRetryToken, + StandardRetryQuota, + StandardRetryToken, +) +from .interfaces.retries import RetryStrategy + +RetryStrategyType = Literal["simple", "standard"] + + +class RetryStrategyResolver: + """Retry strategy resolver that caches retry strategies based on configuration options. + + This resolver caches retry strategy instances based on their configuration to reuse existing + instances of RetryStrategy with the same settings. Uses LRU cache for thread-safe caching. + """ + + async def resolve_retry_strategy( + self, *, retry_strategy: RetryStrategy | RetryStrategyOptions | None + ) -> RetryStrategy: + """Resolve a retry strategy from the provided options, using cache when possible. + + :param retry_strategy: An explicitly configured retry strategy or options for creating one. + """ + if isinstance(retry_strategy, RetryStrategy): + return retry_strategy + elif retry_strategy is None: + retry_strategy = RetryStrategyOptions() + elif not isinstance(retry_strategy, RetryStrategyOptions): # type: ignore[reportUnnecessaryIsInstance] + raise TypeError( + f"retry_strategy must be RetryStrategy, RetryStrategyOptions, or None, " + f"got {type(retry_strategy).__name__}" + ) + return self._create_retry_strategy( + retry_strategy.retry_mode, retry_strategy.max_attempts + ) + + @lru_cache + def _create_retry_strategy( + self, retry_mode: RetryStrategyType, max_attempts: int | None + ) -> RetryStrategy: + kwargs = {"max_attempts": max_attempts} + filtered_kwargs: dict[str, Any] = { + k: v for k, v in kwargs.items() if v is not None + } + match retry_mode: + case "simple": + return SimpleRetryStrategy(**filtered_kwargs) + case "standard": + return StandardRetryStrategy(**filtered_kwargs) + case _: + raise ValueError(f"Unknown retry mode: {retry_mode}") + + +class SimpleRetryStrategy: + def __init__( + self, + *, + backoff_strategy: retries_interface.RetryBackoffStrategy | None = None, + max_attempts: int = 5, + ): + """Retry strategy that simply invokes the given backoff strategy. + + :param backoff_strategy: The backoff strategy used by returned tokens to compute + the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`. + + :param max_attempts: Upper limit on total number of attempts made, including + initial attempt and retries. + """ + self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy() + self.max_attempts = max_attempts + + async def acquire_initial_retry_token( + self, *, token_scope: str | None = None + ) -> SimpleRetryToken: + """Create a base retry token for the start of a request. + + :param token_scope: This argument is ignored by this retry strategy. + """ + retry_delay = self.backoff_strategy.compute_next_backoff_delay(0) + return SimpleRetryToken(retry_count=0, retry_delay=retry_delay) + + async def refresh_retry_token_for_retry( + self, + *, + token_to_renew: retries_interface.RetryToken, + error: Exception, + ) -> SimpleRetryToken: + """Replace an existing retry token from a failed attempt with a new token. + + This retry strategy always returns a token until the attempt count stored in + the new token exceeds the ``max_attempts`` value. + + :param token_to_renew: The token used for the previous failed attempt. + :param error: The error that triggered the need for a retry. + :raises RetryError: If no further retry attempts are allowed. + """ + if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe: + retry_count = token_to_renew.retry_count + 1 + if retry_count >= self.max_attempts: + raise RetryError( + f"Reached maximum number of allowed attempts: {self.max_attempts}" + ) from error + retry_delay = self.backoff_strategy.compute_next_backoff_delay(retry_count) + return SimpleRetryToken(retry_count=retry_count, retry_delay=retry_delay) + else: + raise RetryError(f"Error is not retryable: {error}") from error + + async def record_success(self, *, token: retries_interface.RetryToken) -> None: + """Not used by this retry strategy.""" + + def __deepcopy__(self, memo: Any) -> "SimpleRetryStrategy": + return self + + +class StandardRetryStrategy: + def __init__( + self, + *, + backoff_strategy: retries_interface.RetryBackoffStrategy | None = None, + max_attempts: int = 3, + retry_quota: StandardRetryQuota | None = None, + ): + """Standard retry strategy using truncated binary exponential backoff + with full jitter. + + :param backoff_strategy: The backoff strategy used by returned tokens to compute + the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`. + + :param max_attempts: Upper limit on total number of attempts made, including + initial attempt and retries. + + :param retry_quota: The retry quota to use for managing retry capacity. Defaults + to a new :py:class:`StandardRetryQuota` instance. + """ + if max_attempts < 0: + raise ValueError( + f"max_attempts must be a non-negative integer, got {max_attempts}" + ) + + self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy( + backoff_scale_value=1, + max_backoff=20, + jitter_type=ExponentialBackoffJitterType.FULL, + ) + self.max_attempts = max_attempts + self._retry_quota = retry_quota or StandardRetryQuota() + + async def acquire_initial_retry_token( + self, *, token_scope: str | None = None + ) -> StandardRetryToken: + """Create a base retry token for the start of a request. + + :param token_scope: This argument is ignored by this retry strategy. + """ + retry_delay = self.backoff_strategy.compute_next_backoff_delay(0) + return StandardRetryToken(retry_count=0, retry_delay=retry_delay) + + async def refresh_retry_token_for_retry( + self, + *, + token_to_renew: retries_interface.RetryToken, + error: Exception, + ) -> StandardRetryToken: + """Replace an existing retry token from a failed attempt with a new token. + + This retry strategy always returns a token until the attempt count stored in + the new token exceeds the ``max_attempts`` value. + + :param token_to_renew: The token used for the previous failed attempt. + :param error: The error that triggered the need for a retry. + :raises RetryError: If no further retry attempts are allowed. + """ + if not isinstance(token_to_renew, StandardRetryToken): + raise TypeError( + f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}" + ) + + if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe: + retry_count = token_to_renew.retry_count + 1 + if retry_count >= self.max_attempts: + raise RetryError( + f"Reached maximum number of allowed attempts: {self.max_attempts}" + ) from error + + # Acquire additional quota for this retry attempt + # (may raise a RetryError if none is available) + quota_acquired = self._retry_quota.acquire(error=error) + + if error.retry_after is not None: + retry_delay = error.retry_after + else: + retry_delay = self.backoff_strategy.compute_next_backoff_delay( + retry_count + ) + + return StandardRetryToken( + retry_count=retry_count, + retry_delay=retry_delay, + quota_acquired=quota_acquired, + ) + else: + raise RetryError(f"Error is not retryable: {error}") from error + + async def record_success(self, *, token: retries_interface.RetryToken) -> None: + """Release retry quota back based on the amount consumed by the last retry. + + :param token: The token used for the previous successful attempt. + """ + if not isinstance(token, StandardRetryToken): + raise TypeError( + f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}" + ) + self._retry_quota.release(release_amount=token.quota_acquired) + + def __deepcopy__(self, memo: Any) -> "StandardRetryStrategy": + return self diff --git a/packages/smithy-core/src/smithy_core/interfaces/retries.py b/packages/smithy-core/src/smithy_core/interfaces/retries.py index a7f8c4e2d..cff7cb498 100644 --- a/packages/smithy-core/src/smithy_core/interfaces/retries.py +++ b/packages/smithy-core/src/smithy_core/interfaces/retries.py @@ -53,54 +53,3 @@ class RetryToken(Protocol): retry_delay: float """Delay in seconds to wait before the retry attempt.""" - - -@runtime_checkable -class RetryStrategy(Protocol): - """Issuer of :py:class:`RetryToken`s.""" - - backoff_strategy: RetryBackoffStrategy - """The strategy used by returned tokens to compute delay duration values.""" - - max_attempts: int - """Upper limit on total attempt count (initial attempt plus retries).""" - - def acquire_initial_retry_token( - self, *, token_scope: str | None = None - ) -> RetryToken: - """Create a base retry token for the start of a request. - - :param token_scope: An arbitrary string accepted by the retry strategy to - separate tokens into scopes. - :returns: A retry token, to be used for determining the retry delay, refreshing - the token after a failure, and recording success after success. - :raises RetryError: If the retry strategy has no available tokens. - """ - ... - - def refresh_retry_token_for_retry( - self, *, token_to_renew: RetryToken, error: Exception - ) -> RetryToken: - """Replace an existing retry token from a failed attempt with a new token. - - After a failed operation call, this method is called to exchange a retry token - that was previously obtained by calling :py:func:`acquire_initial_retry_token` - or this method with a new retry token for the next attempt. This method can - either choose to allow another retry and send a new or updated token, or reject - the retry attempt and raise the error. - - :param token_to_renew: The token used for the previous failed attempt. - :param error: The error that triggered the need for a retry. - :raises RetryError: If no further retry attempts are allowed. - """ - ... - - def record_success(self, *, token: RetryToken) -> None: - """Return token after successful completion of an operation. - - Upon successful completion of the operation, a user calls this function to - record that the operation was successful. - - :param token: The token used for the previous successful attempt. - """ - ... diff --git a/packages/smithy-core/src/smithy_core/retries.py b/packages/smithy-core/src/smithy_core/retries.py index 67c63c1a3..1b106c68c 100644 --- a/packages/smithy-core/src/smithy_core/retries.py +++ b/packages/smithy-core/src/smithy_core/retries.py @@ -5,12 +5,10 @@ from collections.abc import Callable from dataclasses import dataclass from enum import Enum -from functools import lru_cache -from typing import Any, Literal +from typing import Literal from .exceptions import RetryError from .interfaces import retries as retries_interface -from .interfaces.retries import RetryStrategy RetryStrategyType = Literal["simple", "standard"] @@ -26,50 +24,6 @@ class RetryStrategyOptions: """Maximum number of attempts (initial attempt plus retries). If None, uses the strategy's default.""" -class RetryStrategyResolver: - """Retry strategy resolver that caches retry strategies based on configuration options. - - This resolver caches retry strategy instances based on their configuration to reuse existing - instances of RetryStrategy with the same settings. Uses LRU cache for thread-safe caching. - """ - - async def resolve_retry_strategy( - self, *, retry_strategy: RetryStrategy | RetryStrategyOptions | None - ) -> RetryStrategy: - """Resolve a retry strategy from the provided options, using cache when possible. - - :param retry_strategy: An explicitly configured retry strategy or options for creating one. - """ - if isinstance(retry_strategy, RetryStrategy): - return retry_strategy - elif retry_strategy is None: - retry_strategy = RetryStrategyOptions() - elif not isinstance(retry_strategy, RetryStrategyOptions): # type: ignore[reportUnnecessaryIsInstance] - raise TypeError( - f"retry_strategy must be RetryStrategy, RetryStrategyOptions, or None, " - f"got {type(retry_strategy).__name__}" - ) - return self._create_retry_strategy( - retry_strategy.retry_mode, retry_strategy.max_attempts - ) - - @lru_cache - def _create_retry_strategy( - self, retry_mode: RetryStrategyType, max_attempts: int | None - ) -> RetryStrategy: - kwargs = {"max_attempts": max_attempts} - filtered_kwargs: dict[str, Any] = { - k: v for k, v in kwargs.items() if v is not None - } - match retry_mode: - case "simple": - return SimpleRetryStrategy(**filtered_kwargs) - case "standard": - return StandardRetryStrategy(**filtered_kwargs) - case _: - raise ValueError(f"Unknown retry mode: {retry_mode}") - - class ExponentialBackoffJitterType(Enum): """Jitter mode for exponential backoff. @@ -231,8 +185,7 @@ def _next_delay_decorrelated_jitter(self, previous_delay: float) -> float: class SimpleRetryToken: """Basic retry token that stores only the attempt count and backoff strategy. - Retry tokens should always be obtained from an implementation of - :py:class:`retries_interface.RetryStrategy`. + Retry tokens should always be obtained from a retry strategy implementation. """ retry_count: int @@ -247,67 +200,6 @@ def attempt_count(self) -> int: return self.retry_count + 1 -class SimpleRetryStrategy(retries_interface.RetryStrategy): - def __init__( - self, - *, - backoff_strategy: retries_interface.RetryBackoffStrategy | None = None, - max_attempts: int = 5, - ): - """Basic retry strategy that simply invokes the given backoff strategy. - - :param backoff_strategy: The backoff strategy used by returned tokens to compute - the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`. - - :param max_attempts: Upper limit on total number of attempts made, including - initial attempt and retries. - """ - self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy() - self.max_attempts = max_attempts - - def acquire_initial_retry_token( - self, *, token_scope: str | None = None - ) -> SimpleRetryToken: - """Create a base retry token for the start of a request. - - :param token_scope: This argument is ignored by this retry strategy. - """ - retry_delay = self.backoff_strategy.compute_next_backoff_delay(0) - return SimpleRetryToken(retry_count=0, retry_delay=retry_delay) - - def refresh_retry_token_for_retry( - self, - *, - token_to_renew: retries_interface.RetryToken, - error: Exception, - ) -> SimpleRetryToken: - """Replace an existing retry token from a failed attempt with a new token. - - This retry strategy always returns a token until the attempt count stored in - the new token exceeds the ``max_attempts`` value. - - :param token_to_renew: The token used for the previous failed attempt. - :param error: The error that triggered the need for a retry. - :raises RetryError: If no further retry attempts are allowed. - """ - if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe: - retry_count = token_to_renew.retry_count + 1 - if retry_count >= self.max_attempts: - raise RetryError( - f"Reached maximum number of allowed attempts: {self.max_attempts}" - ) from error - retry_delay = self.backoff_strategy.compute_next_backoff_delay(retry_count) - return SimpleRetryToken(retry_count=retry_count, retry_delay=retry_delay) - else: - raise RetryError(f"Error is not retryable: {error}") from error - - def record_success(self, *, token: retries_interface.RetryToken) -> None: - """Not used by this retry strategy.""" - - def __deepcopy__(self, memo: Any) -> "SimpleRetryStrategy": - return self - - class StandardRetryQuota: """Retry quota used by :py:class:`StandardRetryStrategy`.""" @@ -376,107 +268,3 @@ class StandardRetryToken: quota_acquired: int = 0 """The amount of quota acquired for this retry attempt.""" - - -class StandardRetryStrategy(retries_interface.RetryStrategy): - def __init__( - self, - *, - backoff_strategy: retries_interface.RetryBackoffStrategy | None = None, - max_attempts: int = 3, - retry_quota: StandardRetryQuota | None = None, - ): - """Standard retry strategy using truncated binary exponential backoff with full - jitter. - - :param backoff_strategy: The backoff strategy used by returned tokens to compute - the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`. - - :param max_attempts: Upper limit on total number of attempts made, including - initial attempt and retries. - - :param retry_quota: The retry quota to use for managing retry capacity. Defaults - to a new :py:class:`StandardRetryQuota` instance. - """ - if max_attempts < 0: - raise ValueError( - f"max_attempts must be a non-negative integer, got {max_attempts}" - ) - - self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy( - backoff_scale_value=1, - max_backoff=20, - jitter_type=ExponentialBackoffJitterType.FULL, - ) - self.max_attempts = max_attempts - self._retry_quota = retry_quota or StandardRetryQuota() - - def acquire_initial_retry_token( - self, *, token_scope: str | None = None - ) -> StandardRetryToken: - """Create a base retry token for the start of a request. - - :param token_scope: This argument is ignored by this retry strategy. - """ - retry_delay = self.backoff_strategy.compute_next_backoff_delay(0) - return StandardRetryToken(retry_count=0, retry_delay=retry_delay) - - def refresh_retry_token_for_retry( - self, - *, - token_to_renew: retries_interface.RetryToken, - error: Exception, - ) -> StandardRetryToken: - """Replace an existing retry token from a failed attempt with a new token. - - This retry strategy always returns a token until the attempt count stored in - the new token exceeds the ``max_attempts`` value. - - :param token_to_renew: The token used for the previous failed attempt. - :param error: The error that triggered the need for a retry. - :raises RetryError: If no further retry attempts are allowed. - """ - if not isinstance(token_to_renew, StandardRetryToken): - raise TypeError( - f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}" - ) - - if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe: - retry_count = token_to_renew.retry_count + 1 - if retry_count >= self.max_attempts: - raise RetryError( - f"Reached maximum number of allowed attempts: {self.max_attempts}" - ) from error - - # Acquire additional quota for this retry attempt - # (may raise a RetryError if none is available) - quota_acquired = self._retry_quota.acquire(error=error) - - if error.retry_after is not None: - retry_delay = error.retry_after - else: - retry_delay = self.backoff_strategy.compute_next_backoff_delay( - retry_count - ) - - return StandardRetryToken( - retry_count=retry_count, - retry_delay=retry_delay, - quota_acquired=quota_acquired, - ) - else: - raise RetryError(f"Error is not retryable: {error}") from error - - def record_success(self, *, token: retries_interface.RetryToken) -> None: - """Release retry quota back based on the amount consumed by the last retry. - - :param token: The token used for the previous successful attempt. - """ - if not isinstance(token, StandardRetryToken): - raise TypeError( - f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}" - ) - self._retry_quota.release(release_amount=token.quota_acquired) - - def __deepcopy__(self, memo: Any) -> "StandardRetryStrategy": - return self diff --git a/packages/smithy-core/tests/functional/test_retries.py b/packages/smithy-core/tests/functional/test_retries.py index 9a72b491b..4889ebe37 100644 --- a/packages/smithy-core/tests/functional/test_retries.py +++ b/packages/smithy-core/tests/functional/test_retries.py @@ -4,13 +4,13 @@ from asyncio import gather, sleep import pytest +from smithy_core.aio.interfaces import retries as retries_interface +from smithy_core.aio.retries import StandardRetryStrategy from smithy_core.exceptions import CallError, ClientTimeoutError, RetryError -from smithy_core.interfaces import retries as retries_interface from smithy_core.retries import ( ExponentialBackoffJitterType, ExponentialRetryBackoffStrategy, StandardRetryQuota, - StandardRetryStrategy, ) @@ -19,7 +19,7 @@ async def retry_operation( strategy: retries_interface.RetryStrategy, responses: list[int | Exception], ) -> tuple[str, int]: - token = strategy.acquire_initial_retry_token() + token = await strategy.acquire_initial_retry_token() response_iter = iter(responses) while True: @@ -31,7 +31,7 @@ async def retry_operation( # Success case if response == 200: - strategy.record_success(token=token) + await strategy.record_success(token=token) return "success", attempt # Error case - either status code or exception @@ -45,7 +45,7 @@ async def retry_operation( ) try: - token = strategy.refresh_retry_token_for_retry( + token = await strategy.refresh_retry_token_for_retry( token_to_renew=token, error=error ) except RetryError: diff --git a/packages/smithy-core/tests/unit/aio/test_retries.py b/packages/smithy-core/tests/unit/aio/test_retries.py new file mode 100644 index 000000000..f35c50750 --- /dev/null +++ b/packages/smithy-core/tests/unit/aio/test_retries.py @@ -0,0 +1,168 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import pytest +from smithy_core.aio.retries import ( + RetryStrategyOptions, + RetryStrategyResolver, + SimpleRetryStrategy, + StandardRetryStrategy, +) +from smithy_core.exceptions import CallError, RetryError +from smithy_core.retries import ( + ExponentialRetryBackoffStrategy, +) + + +@pytest.mark.parametrize("max_attempts", [2, 3, 10]) +async def test_simple_retry_strategy(max_attempts: int) -> None: + strategy = SimpleRetryStrategy( + backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), + max_attempts=max_attempts, + ) + error = CallError(is_retry_safe=True) + token = await strategy.acquire_initial_retry_token() + for _ in range(max_attempts - 1): + token = await strategy.refresh_retry_token_for_retry( + token_to_renew=token, error=error + ) + with pytest.raises(RetryError): + await strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) + + +async def test_simple_retry_does_not_retry_unclassified() -> None: + strategy = SimpleRetryStrategy( + backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), + max_attempts=2, + ) + token = await strategy.acquire_initial_retry_token() + with pytest.raises(RetryError): + await strategy.refresh_retry_token_for_retry( + token_to_renew=token, error=Exception() + ) + + +async def test_simple_retry_does_not_retry_when_safety_unknown() -> None: + strategy = SimpleRetryStrategy( + backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), + max_attempts=2, + ) + error = CallError(is_retry_safe=None) + token = await strategy.acquire_initial_retry_token() + with pytest.raises(RetryError): + await strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) + + +async def test_simple_retry_does_not_retry_unsafe() -> None: + strategy = SimpleRetryStrategy( + backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), + max_attempts=2, + ) + error = CallError(fault="client", is_retry_safe=False) + token = await strategy.acquire_initial_retry_token() + with pytest.raises(RetryError): + await strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) + + +@pytest.mark.parametrize("max_attempts", [2, 3, 10]) +async def test_standard_retry_strategy(max_attempts: int) -> None: + strategy = StandardRetryStrategy(max_attempts=max_attempts) + error = CallError(is_retry_safe=True) + token = await strategy.acquire_initial_retry_token() + for _ in range(max_attempts - 1): + token = await strategy.refresh_retry_token_for_retry( + token_to_renew=token, error=error + ) + with pytest.raises(RetryError): + await strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) + + +@pytest.mark.parametrize( + "error", + [ + Exception(), + CallError(is_retry_safe=None), + CallError(fault="client", is_retry_safe=False), + ], + ids=[ + "unclassified_error", + "safety_unknown_error", + "unsafe_error", + ], +) +async def test_standard_retry_does_not_retry(error: Exception | CallError) -> None: + strategy = StandardRetryStrategy() + token = await strategy.acquire_initial_retry_token() + with pytest.raises(RetryError): + await strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) + + +async def test_standard_retry_after_overrides_backoff() -> None: + strategy = StandardRetryStrategy() + error = CallError(is_retry_safe=True, retry_after=5.5) + token = await strategy.acquire_initial_retry_token() + token = await strategy.refresh_retry_token_for_retry( + token_to_renew=token, error=error + ) + assert token.retry_delay == 5.5 + + +async def test_standard_retry_invalid_max_attempts() -> None: + with pytest.raises(ValueError): + StandardRetryStrategy(max_attempts=-1) + + +async def test_retry_strategy_resolver_none_returns_default() -> None: + resolver = RetryStrategyResolver() + + strategy = await resolver.resolve_retry_strategy(retry_strategy=None) + + assert isinstance(strategy, StandardRetryStrategy) + assert strategy.max_attempts == 3 + + +async def test_retry_strategy_resolver_creates_different_strategies() -> None: + resolver = RetryStrategyResolver() + + options1 = RetryStrategyOptions(max_attempts=3) + options2 = RetryStrategyOptions(max_attempts=5) + + strategy1 = await resolver.resolve_retry_strategy(retry_strategy=options1) + strategy2 = await resolver.resolve_retry_strategy(retry_strategy=options2) + + assert strategy1.max_attempts == 3 + assert strategy2.max_attempts == 5 + assert strategy1 is not strategy2 + + +async def test_retry_strategy_resolver_caches_strategies() -> None: + resolver = RetryStrategyResolver() + + strategy1 = await resolver.resolve_retry_strategy(retry_strategy=None) + strategy2 = await resolver.resolve_retry_strategy(retry_strategy=None) + options = RetryStrategyOptions(max_attempts=5) + strategy3 = await resolver.resolve_retry_strategy(retry_strategy=options) + strategy4 = await resolver.resolve_retry_strategy(retry_strategy=options) + + assert strategy1 is strategy2 + assert strategy3 is strategy4 + assert strategy1 is not strategy3 + + +async def test_retry_strategy_resolver_returns_existing_strategy() -> None: + resolver = RetryStrategyResolver() + provided_strategy = SimpleRetryStrategy(max_attempts=7) + + strategy = await resolver.resolve_retry_strategy(retry_strategy=provided_strategy) + + assert strategy is provided_strategy + assert strategy.max_attempts == 7 + + +async def test_retry_strategy_resolver_rejects_invalid_type() -> None: + resolver = RetryStrategyResolver() + + with pytest.raises( + TypeError, + match="retry_strategy must be RetryStrategy, RetryStrategyOptions, or None", + ): + await resolver.resolve_retry_strategy(retry_strategy="invalid") # type: ignore diff --git a/packages/smithy-core/tests/unit/test_retries.py b/packages/smithy-core/tests/unit/test_retries.py index c36c5b758..65f9a2c47 100644 --- a/packages/smithy-core/tests/unit/test_retries.py +++ b/packages/smithy-core/tests/unit/test_retries.py @@ -5,11 +5,7 @@ from smithy_core.retries import ExponentialBackoffJitterType as EBJT from smithy_core.retries import ( ExponentialRetryBackoffStrategy, - RetryStrategyOptions, - RetryStrategyResolver, - SimpleRetryStrategy, StandardRetryQuota, - StandardRetryStrategy, ) @@ -60,100 +56,6 @@ def test_exponential_backoff_strategy( assert delay_actual == pytest.approx(delay_expected) # type: ignore -@pytest.mark.parametrize("max_attempts", [2, 3, 10]) -def test_simple_retry_strategy(max_attempts: int) -> None: - strategy = SimpleRetryStrategy( - backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), - max_attempts=max_attempts, - ) - error = CallError(is_retry_safe=True) - token = strategy.acquire_initial_retry_token() - for _ in range(max_attempts - 1): - token = strategy.refresh_retry_token_for_retry( - token_to_renew=token, error=error - ) - with pytest.raises(RetryError): - strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) - - -def test_simple_retry_does_not_retry_unclassified() -> None: - strategy = SimpleRetryStrategy( - backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), - max_attempts=2, - ) - token = strategy.acquire_initial_retry_token() - with pytest.raises(RetryError): - strategy.refresh_retry_token_for_retry(token_to_renew=token, error=Exception()) - - -def test_simple_retry_does_not_retry_when_safety_unknown() -> None: - strategy = SimpleRetryStrategy( - backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), - max_attempts=2, - ) - error = CallError(is_retry_safe=None) - token = strategy.acquire_initial_retry_token() - with pytest.raises(RetryError): - strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) - - -def test_simple_retry_does_not_retry_unsafe() -> None: - strategy = SimpleRetryStrategy( - backoff_strategy=ExponentialRetryBackoffStrategy(backoff_scale_value=5), - max_attempts=2, - ) - error = CallError(fault="client", is_retry_safe=False) - token = strategy.acquire_initial_retry_token() - with pytest.raises(RetryError): - strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) - - -@pytest.mark.parametrize("max_attempts", [2, 3, 10]) -def test_standard_retry_strategy(max_attempts: int) -> None: - strategy = StandardRetryStrategy(max_attempts=max_attempts) - error = CallError(is_retry_safe=True) - token = strategy.acquire_initial_retry_token() - for _ in range(max_attempts - 1): - token = strategy.refresh_retry_token_for_retry( - token_to_renew=token, error=error - ) - with pytest.raises(RetryError): - strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) - - -@pytest.mark.parametrize( - "error", - [ - Exception(), - CallError(is_retry_safe=None), - CallError(fault="client", is_retry_safe=False), - ], - ids=[ - "unclassified_error", - "safety_unknown_error", - "unsafe_error", - ], -) -def test_standard_retry_does_not_retry(error: Exception | CallError) -> None: - strategy = StandardRetryStrategy() - token = strategy.acquire_initial_retry_token() - with pytest.raises(RetryError): - strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) - - -def test_standard_retry_after_overrides_backoff() -> None: - strategy = StandardRetryStrategy() - error = CallError(is_retry_safe=True, retry_after=5.5) - token = strategy.acquire_initial_retry_token() - token = strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error) - assert token.retry_delay == 5.5 - - -def test_standard_retry_invalid_max_attempts() -> None: - with pytest.raises(ValueError): - StandardRetryStrategy(max_attempts=-1) - - @pytest.fixture def retry_quota() -> StandardRetryQuota: return StandardRetryQuota(initial_capacity=10) @@ -218,60 +120,3 @@ def test_retry_quota_acquire_timeout_error( acquired = retry_quota.acquire(error=timeout_error) assert acquired == StandardRetryQuota.TIMEOUT_RETRY_COST assert retry_quota.available_capacity == 0 - - -async def test_retry_strategy_resolver_none_returns_default() -> None: - resolver = RetryStrategyResolver() - - strategy = await resolver.resolve_retry_strategy(retry_strategy=None) - - assert isinstance(strategy, StandardRetryStrategy) - assert strategy.max_attempts == 3 - - -async def test_retry_strategy_resolver_creates_different_strategies() -> None: - resolver = RetryStrategyResolver() - - options1 = RetryStrategyOptions(max_attempts=3) - options2 = RetryStrategyOptions(max_attempts=5) - - strategy1 = await resolver.resolve_retry_strategy(retry_strategy=options1) - strategy2 = await resolver.resolve_retry_strategy(retry_strategy=options2) - - assert strategy1.max_attempts == 3 - assert strategy2.max_attempts == 5 - assert strategy1 is not strategy2 - - -async def test_retry_strategy_resolver_caches_strategies() -> None: - resolver = RetryStrategyResolver() - - strategy1 = await resolver.resolve_retry_strategy(retry_strategy=None) - strategy2 = await resolver.resolve_retry_strategy(retry_strategy=None) - options = RetryStrategyOptions(max_attempts=5) - strategy3 = await resolver.resolve_retry_strategy(retry_strategy=options) - strategy4 = await resolver.resolve_retry_strategy(retry_strategy=options) - - assert strategy1 is strategy2 - assert strategy3 is strategy4 - assert strategy1 is not strategy3 - - -async def test_retry_strategy_resolver_returns_existing_strategy() -> None: - resolver = RetryStrategyResolver() - provided_strategy = SimpleRetryStrategy(max_attempts=7) - - strategy = await resolver.resolve_retry_strategy(retry_strategy=provided_strategy) - - assert strategy is provided_strategy - assert strategy.max_attempts == 7 - - -async def test_retry_strategy_resolver_rejects_invalid_type() -> None: - resolver = RetryStrategyResolver() - - with pytest.raises( - TypeError, - match="retry_strategy must be RetryStrategy, RetryStrategyOptions, or None", - ): - await resolver.resolve_retry_strategy(retry_strategy="invalid") # type: ignore