Skip to content

Commit 39993c5

Browse files
Alex Wangwangyb-A
authored and committed
feat: exit early when pending
- suspend execution of step and wait_for_condition when the checkpointed result is pending
- refactor suspend, create a new suspend module
- test cases
1 parent 1a6262f commit 39993c5

File tree

11 files changed

+323
-61
lines changed

11 files changed

+323
-61
lines changed

src/aws_durable_execution_sdk_python/exceptions.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
import time
99
from dataclasses import dataclass
10+
from typing import TYPE_CHECKING
11+
12+
if TYPE_CHECKING:
13+
import datetime
1014

1115

1216
class DurableExecutionsError(Exception):
@@ -96,6 +100,21 @@ def from_delay(cls, message: str, delay_seconds: int) -> TimedSuspendExecution:
96100
resume_time = time.time() + delay_seconds
97101
return cls(message, scheduled_timestamp=resume_time)
98102

103+
@classmethod
def from_datetime(
    cls, message: str, datetime_timestamp: datetime.datetime
) -> TimedSuspendExecution:
    """Create a timed suspension that resumes at an absolute point in time.

    Unlike ``from_delay``, no offset from the current time is computed here:
    the given datetime is converted directly to a Unix timestamp.

    Args:
        message: Descriptive message for the suspension
        datetime_timestamp: The datetime at which to resume. It is converted
            with ``datetime.timestamp()``, which interprets a naive datetime
            as local time; pass a timezone-aware value to avoid ambiguity.

    Returns:
        TimedSuspendExecution: Instance constructed with scheduled_timestamp
            set to the Unix timestamp (seconds since the epoch) of
            datetime_timestamp
    """
    return cls(message, scheduled_timestamp=datetime_timestamp.timestamp())
117+
99118

100119
class OrderedLockError(DurableExecutionsError):
101120
"""An error from OrderedLock.

src/aws_durable_execution_sdk_python/lambda_service.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,7 @@ def to_callable_runtime_error(self) -> CallableRuntimeError:
149149
@dataclass(frozen=True)
150150
class StepDetails:
151151
attempt: int = 0
152-
next_attempt_timestamp: str | None = (
153-
None # TODO: confirm type, depending on how serialized
154-
)
152+
next_attempt_timestamp: datetime.datetime | None = None
155153
result: str | None = None
156154
error: ErrorObject | None = None
157155

src/aws_durable_execution_sdk_python/operation/invoke.py

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,15 @@
88
from aws_durable_execution_sdk_python.config import InvokeConfig
99
from aws_durable_execution_sdk_python.exceptions import (
1010
FatalError,
11-
SuspendExecution,
12-
TimedSuspendExecution,
1311
)
1412
from aws_durable_execution_sdk_python.lambda_service import (
1513
InvokeOptions,
1614
OperationUpdate,
1715
)
1816
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
17+
from aws_durable_execution_sdk_python.suspend import suspend_with_optional_timeout
1918

2019
if TYPE_CHECKING:
21-
from typing import NoReturn
22-
2320
from aws_durable_execution_sdk_python.identifier import OperationIdentifier
2421
from aws_durable_execution_sdk_python.state import ExecutionState
2522

@@ -108,21 +105,3 @@ def invoke_handler(
108105
# This line should never be reached since suspend_with_optional_timeout always raises
109106
msg = "suspend_with_optional_timeout should have raised an exception, but did not."
110107
raise FatalError(msg) from None
111-
112-
113-
def suspend_with_optional_timeout(
114-
msg: str, timeout_seconds: int | None = None
115-
) -> NoReturn:
116-
"""Suspend execution with optional timeout.
117-
118-
Args:
119-
msg: Descriptive message for the suspension
120-
timeout_seconds: Duration to suspend in seconds, or None/0 for indefinite
121-
122-
Raises:
123-
TimedSuspendExecution: When timeout_seconds > 0
124-
SuspendExecution: When timeout_seconds is None or <= 0
125-
"""
126-
if timeout_seconds and timeout_seconds > 0:
127-
raise TimedSuspendExecution.from_delay(msg, timeout_seconds)
128-
raise SuspendExecution(msg)

src/aws_durable_execution_sdk_python/operation/step.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from __future__ import annotations
44

55
import logging
6-
import time
76
from typing import TYPE_CHECKING, TypeVar
87

98
from aws_durable_execution_sdk_python.config import (
@@ -14,12 +13,18 @@
1413
from aws_durable_execution_sdk_python.exceptions import (
1514
FatalError,
1615
StepInterruptedError,
17-
TimedSuspendExecution,
1816
)
19-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
17+
from aws_durable_execution_sdk_python.lambda_service import (
18+
ErrorObject,
19+
OperationUpdate,
20+
)
2021
from aws_durable_execution_sdk_python.logger import Logger, LogInfo
2122
from aws_durable_execution_sdk_python.retries import RetryPresets
2223
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
24+
from aws_durable_execution_sdk_python.suspend import (
25+
suspend_with_optional_timeout,
26+
suspend_with_optional_timestamp,
27+
)
2328
from aws_durable_execution_sdk_python.types import StepContext
2429

2530
if TYPE_CHECKING:
@@ -52,7 +57,9 @@ def step_handler(
5257
if not config:
5358
config = StepConfig()
5459

55-
checkpointed_result = state.get_checkpoint_result(operation_identifier.operation_id)
60+
checkpointed_result: CheckpointedResult = state.get_checkpoint_result(
61+
operation_identifier.operation_id
62+
)
5663
if checkpointed_result.is_succeeded():
5764
logger.debug(
5865
"Step already completed, skipping execution for id: %s, name: %s",
@@ -73,6 +80,13 @@ def step_handler(
7380
# have to throw the exact same error on replay as the checkpointed failure
7481
checkpointed_result.raise_callable_error()
7582

83+
if checkpointed_result.is_pending():
84+
scheduled_timestamp = checkpointed_result.get_next_attempt_timestamp()
85+
suspend_with_optional_timestamp(
86+
msg=f"Retry scheduled for {operation_identifier.name or operation_identifier.operation_id} will retry at timestamp {scheduled_timestamp}",
87+
datetime_timestamp=scheduled_timestamp,
88+
)
89+
7690
if checkpointed_result.is_started():
7791
# step was previously interrupted
7892
if config.step_semantics is StepSemantics.AT_MOST_ONCE_PER_RETRY:
@@ -193,7 +207,10 @@ def retry_handler(
193207

194208
state.create_checkpoint(operation_update=retry_operation)
195209

196-
_suspend(operation_identifier, retry_decision)
210+
suspend_with_optional_timeout(
211+
msg=f"Retry scheduled for {operation_identifier.operation_id} in {retry_decision.delay_seconds} seconds",
212+
timeout_seconds=retry_decision.delay_seconds,
213+
)
197214

198215
# no retry
199216
fail_operation: OperationUpdate = OperationUpdate.create_step_fail(
@@ -206,12 +223,3 @@ def retry_handler(
206223
raise error
207224

208225
raise error_object.to_callable_runtime_error()
209-
210-
211-
def _suspend(operation_identifier: OperationIdentifier, retry_decision: RetryDecision):
212-
scheduled_timestamp = time.time() + retry_decision.delay_seconds
213-
msg = f"Retry scheduled for {operation_identifier.operation_id} in {retry_decision.delay_seconds} seconds"
214-
raise TimedSuspendExecution(
215-
msg,
216-
scheduled_timestamp=scheduled_timestamp,
217-
)

src/aws_durable_execution_sdk_python/operation/wait_for_condition.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,21 @@
33
from __future__ import annotations
44

55
import logging
6-
import time
76
from typing import TYPE_CHECKING, TypeVar
87

98
from aws_durable_execution_sdk_python.exceptions import (
109
FatalError,
11-
TimedSuspendExecution,
1210
)
13-
from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate
11+
from aws_durable_execution_sdk_python.lambda_service import (
12+
ErrorObject,
13+
OperationUpdate,
14+
)
1415
from aws_durable_execution_sdk_python.logger import LogInfo
1516
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
17+
from aws_durable_execution_sdk_python.suspend import (
18+
suspend_with_optional_timeout,
19+
suspend_with_optional_timestamp,
20+
)
1621
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext
1722

1823
if TYPE_CHECKING:
@@ -24,7 +29,10 @@
2429
)
2530
from aws_durable_execution_sdk_python.identifier import OperationIdentifier
2631
from aws_durable_execution_sdk_python.logger import Logger
27-
from aws_durable_execution_sdk_python.state import ExecutionState
32+
from aws_durable_execution_sdk_python.state import (
33+
CheckpointedResult,
34+
ExecutionState,
35+
)
2836

2937

3038
T = TypeVar("T")
@@ -49,7 +57,9 @@ def wait_for_condition_handler(
4957
operation_identifier.name,
5058
)
5159

52-
checkpointed_result = state.get_checkpoint_result(operation_identifier.operation_id)
60+
checkpointed_result: CheckpointedResult = state.get_checkpoint_result(
61+
operation_identifier.operation_id
62+
)
5363

5464
# Check if already completed
5565
if checkpointed_result.is_succeeded():
@@ -70,6 +80,13 @@ def wait_for_condition_handler(
7080
if checkpointed_result.is_failed():
7181
checkpointed_result.raise_callable_error()
7282

83+
if checkpointed_result.is_pending():
84+
scheduled_timestamp = checkpointed_result.get_next_attempt_timestamp()
85+
suspend_with_optional_timestamp(
86+
msg=f"wait_for_condition {operation_identifier.name or operation_identifier.operation_id} will retry at timestamp {scheduled_timestamp}",
87+
datetime_timestamp=scheduled_timestamp,
88+
)
89+
7390
attempt: int = 1
7491
if checkpointed_result.is_started_or_ready():
7592
# This is a retry - get state from previous checkpoint
@@ -164,7 +181,10 @@ def wait_for_condition_handler(
164181

165182
state.create_checkpoint(operation_update=retry_operation)
166183

167-
_suspend_execution(operation_identifier, decision)
184+
suspend_with_optional_timeout(
185+
msg=f"wait_for_condition {operation_identifier.name or operation_identifier.operation_id} will retry in {decision.delay_seconds} seconds",
186+
timeout_seconds=decision.delay_seconds,
187+
)
168188

169189
except Exception as e:
170190
# Mark as failed - waitForCondition doesn't have its own retry logic for errors
@@ -184,14 +204,3 @@ def wait_for_condition_handler(
184204

185205
msg: str = "wait_for_condition should never reach this point"
186206
raise FatalError(msg)
187-
188-
189-
def _suspend_execution(
190-
operation_identifier: OperationIdentifier, decision: WaitForConditionDecision
191-
) -> None:
192-
scheduled_timestamp = time.time() + (decision.delay_seconds or 0)
193-
msg = f"wait_for_condition {operation_identifier.name or operation_identifier.operation_id} will retry in {decision.delay_seconds} seconds"
194-
raise TimedSuspendExecution(
195-
msg,
196-
scheduled_timestamp=scheduled_timestamp,
197-
)

src/aws_durable_execution_sdk_python/state.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from aws_durable_execution_sdk_python.threading import OrderedLock
2121

2222
if TYPE_CHECKING:
23+
import datetime
2324
from collections.abc import MutableMapping
2425

2526

@@ -106,6 +107,13 @@ def is_started_or_ready(self) -> bool:
106107
return False
107108
return op.status in (OperationStatus.STARTED, OperationStatus.READY)
108109

110+
def is_pending(self) -> bool:
    """Tell whether a checkpointed operation exists and its status is PENDING."""
    checkpointed_op = self.operation
    # A missing operation can never be pending.
    return checkpointed_op.status is OperationStatus.PENDING if checkpointed_op else False
116+
109117
def is_timed_out(self) -> bool:
110118
"""Return True if the checkpointed operation is TIMED_OUT."""
111119
op = self.operation
@@ -126,6 +134,11 @@ def raise_callable_error(self) -> None:
126134

127135
raise self.error.to_callable_runtime_error()
128136

137+
def get_next_attempt_timestamp(self) -> datetime.datetime | None:
    """Return the next attempt timestamp recorded in the operation's step details.

    Returns:
        The ``next_attempt_timestamp`` of the operation's step details, or
        None when there is no operation or the operation has no step details.
    """
    operation = self.operation
    if not operation:
        return None
    details = operation.step_details
    if not details:
        return None
    return details.next_attempt_timestamp
141+
129142

130143
# shared so don't need to create an instance for each not found check
131144
CHECKPOINT_NOT_FOUND = CheckpointedResult.create_not_found()
src/aws_durable_execution_sdk_python/suspend.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import datetime
from typing import NoReturn

from aws_durable_execution_sdk_python.exceptions import (
    SuspendExecution,
    TimedSuspendExecution,
)
7+
8+
9+
def suspend_with_optional_timestamp(
    msg: str, datetime_timestamp: datetime.datetime | None = None
) -> NoReturn:
    """Suspend execution, optionally until an absolute point in time.

    This function never returns normally; it always raises.

    Args:
        msg: Descriptive message for the suspension
        datetime_timestamp: Datetime to suspend until, or None for an
            indefinite suspension. A naive datetime is interpreted as local
            time, matching the ``datetime.timestamp()`` conversion used by
            ``TimedSuspendExecution.from_datetime``.

    Raises:
        TimedSuspendExecution: When datetime_timestamp is in the future
        SuspendExecution: When datetime_timestamp is None or not in the future
    """
    # TODO: confirm with backend about the behaviour of 0 time suspend
    if datetime_timestamp is not None:
        # Compare Unix timestamps instead of datetime objects: ordering a naive
        # datetime against an aware one raises TypeError, whereas timestamp()
        # is defined for both.
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        if datetime_timestamp.timestamp() > now.timestamp():
            raise TimedSuspendExecution.from_datetime(msg, datetime_timestamp)
    msg = f"Invalid timestamp {datetime_timestamp}, suspending without retry timestamp, original operation: [{msg}]"
    raise SuspendExecution(msg)
29+
30+
31+
def suspend_with_optional_timeout(
    msg: str, timeout_seconds: int | None = None
) -> NoReturn:
    """Suspend execution with optional timeout.

    This function never returns normally; it always raises.

    Args:
        msg: Descriptive message for the suspension
        timeout_seconds: Duration to suspend in seconds, or None/0 for indefinite

    Raises:
        TimedSuspendExecution: When timeout_seconds > 0
        SuspendExecution: When timeout_seconds is None or <= 0
    """
    # TODO: confirm with backend about the behaviour of 0 time suspend
    # NOTE: a delay of 0 (or a missing delay) falls through to the untimed
    # suspension below rather than scheduling an immediate resume.
    if timeout_seconds and timeout_seconds > 0:
        raise TimedSuspendExecution.from_delay(msg, timeout_seconds)
    msg = f"Invalid timeout seconds {timeout_seconds}, suspending without retry timestamp, original operation: [{msg}]"
    raise SuspendExecution(msg)

0 commit comments

Comments
 (0)