Skip to content

Commit e2d4527

Browse files
FullyTypedAstraea Quinn S
authored andcommitted
fix(sdk): match reference behaviour for large error payloads
Changes: - When payloads are large, we checkpoint the error and return only failed. - When payloads are small, we return back the error fixes: #41
1 parent 3b70eb1 commit e2d4527

File tree

3 files changed

+94
-31
lines changed

3 files changed

+94
-31
lines changed

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import contextlib
34
import json
45
import logging
56
from concurrent.futures import ThreadPoolExecutor
@@ -250,9 +251,12 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
250251
)
251252

252253
# Use ThreadPoolExecutor for concurrent execution of user code and background checkpoint processing
253-
with ThreadPoolExecutor(
254-
max_workers=2, thread_name_prefix="dex-handler"
255-
) as executor:
254+
with (
255+
ThreadPoolExecutor(
256+
max_workers=2, thread_name_prefix="dex-handler"
257+
) as executor,
258+
contextlib.closing(execution_state) as execution_state,
259+
):
256260
# Thread 1: Run background checkpoint processing
257261
executor.submit(execution_state.checkpoint_batches_forever)
258262

@@ -296,18 +300,12 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
296300
# Must ensure the result is persisted before returning to Lambda.
297301
# Large results exceed Lambda response limits and must be stored durably
298302
# before the execution completes.
299-
execution_state.create_checkpoint_sync(success_operation)
300-
301-
# Stop background checkpointing thread
302-
execution_state.stop_checkpointing()
303+
execution_state.create_checkpoint(success_operation, is_sync=True)
303304

304305
return DurableExecutionInvocationOutput.create_succeeded(
305306
result=""
306307
).to_dict()
307308

308-
# Stop background checkpointing thread
309-
execution_state.stop_checkpointing()
310-
311309
return DurableExecutionInvocationOutput.create_succeeded(
312310
result=serialized_result
313311
).to_dict()
@@ -322,33 +320,28 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
322320
)
323321
else:
324322
logger.exception("Checkpoint processing failed")
325-
execution_state.stop_checkpointing()
326323
# Raise the original exception
327324
raise bg_error.source_exception from bg_error
328325

329326
except SuspendExecution:
330327
# User code suspended - stop background checkpointing thread
331328
logger.debug("Suspending execution...")
332-
execution_state.stop_checkpointing()
333329
return DurableExecutionInvocationOutput(
334330
status=InvocationStatus.PENDING
335331
).to_dict()
336332

337333
except CheckpointError as e:
338334
# Checkpoint system is broken - stop background thread and exit immediately
339-
execution_state.stop_checkpointing()
340335
logger.exception(
341336
"Checkpoint system failed",
342337
extra=e.build_logger_extras(),
343338
)
344339
raise # Terminate Lambda immediately
345340
except InvocationError:
346-
execution_state.stop_checkpointing()
347341
logger.exception("Invocation error. Must terminate.")
348342
# Throw the error to trigger Lambda retry
349343
raise
350344
except ExecutionError as e:
351-
execution_state.stop_checkpointing()
352345
logger.exception("Execution error. Must terminate without retry.")
353346
return DurableExecutionInvocationOutput(
354347
status=InvocationStatus.FAILED,
@@ -357,15 +350,36 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
357350
except Exception as e:
358351
# all user-space errors go here
359352
logger.exception("Execution failed")
360-
failed_operation = OperationUpdate.create_execution_fail(
361-
error=ErrorObject.from_exception(e)
362-
)
363-
# TODO: can optimize, if not too large can just return response rather than checkpoint
364-
execution_state.create_checkpoint_sync(failed_operation)
365353

366-
execution_state.stop_checkpointing()
367-
return DurableExecutionInvocationOutput(
368-
status=InvocationStatus.FAILED
354+
result = DurableExecutionInvocationOutput(
355+
status=InvocationStatus.FAILED, error=ErrorObject.from_exception(e)
369356
).to_dict()
370357

358+
serialized_result = json.dumps(result)
359+
360+
if (
361+
serialized_result
362+
and len(serialized_result) > LAMBDA_RESPONSE_SIZE_LIMIT
363+
):
364+
logger.debug(
365+
"Response size (%s bytes) exceeds Lambda limit (%s) bytes). Checkpointing result.",
366+
len(serialized_result),
367+
LAMBDA_RESPONSE_SIZE_LIMIT,
368+
)
369+
failed_operation = OperationUpdate.create_execution_fail(
370+
error=ErrorObject.from_exception(e)
371+
)
372+
373+
# Checkpoint large result with blocking (is_sync=True, default).
374+
# Must ensure the result is persisted before returning to Lambda.
375+
# Large results exceed Lambda response limits and must be stored durably
376+
# before the execution completes.
377+
execution_state.create_checkpoint_sync(failed_operation)
378+
379+
return DurableExecutionInvocationOutput(
380+
status=InvocationStatus.FAILED
381+
).to_dict()
382+
383+
return result
384+
371385
return wrapper

src/aws_durable_execution_sdk_python/state.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,3 +731,6 @@ def _calculate_operation_size(queued_op: QueuedOperation) -> int:
731731
# Use JSON serialization to estimate size
732732
serialized = json.dumps(queued_op.operation_update.to_dict()).encode("utf-8")
733733
return len(serialized)
734+
735+
def close(self):
736+
self.stop_checkpointing()

tests/execution_test.py

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -592,17 +592,63 @@ def test_handler(event: Any, context: DurableContext) -> dict:
592592

593593
result = test_handler(invocation_input, lambda_context)
594594

595+
# small error, should not call checkpoint
595596
assert result["Status"] == InvocationStatus.FAILED.value
597+
assert result["Error"] == {"ErrorMessage": "Test error", "ErrorType": "ValueError"}
598+
599+
assert not mock_client.checkpoint.called
600+
601+
602+
def test_durable_execution_with_large_error_payload():
603+
"""Test that large error payloads trigger checkpoint."""
604+
mock_client = Mock(spec=DurableServiceClient)
605+
mock_output = CheckpointOutput(
606+
checkpoint_token="new_token", # noqa: S106
607+
new_execution_state=CheckpointUpdatedExecutionState(),
608+
)
609+
mock_client.checkpoint.return_value = mock_output
610+
611+
@durable_execution
612+
def test_handler(event: Any, context: DurableContext) -> dict:
613+
raise ValueError(LARGE_RESULT)
614+
615+
operation = Operation(
616+
operation_id="exec1",
617+
operation_type=OperationType.EXECUTION,
618+
status=OperationStatus.STARTED,
619+
execution_details=ExecutionDetails(input_payload="{}"),
620+
)
621+
622+
initial_state = InitialExecutionState(operations=[operation], next_marker="")
623+
624+
invocation_input = DurableExecutionInvocationInputWithClient(
625+
durable_execution_arn="arn:test:execution",
626+
checkpoint_token="token123", # noqa: S106
627+
initial_execution_state=initial_state,
628+
is_local_runner=False,
629+
service_client=mock_client,
630+
)
631+
632+
lambda_context = Mock()
633+
lambda_context.aws_request_id = "test-request"
634+
lambda_context.client_context = None
635+
lambda_context.identity = None
636+
lambda_context._epoch_deadline_time_in_ms = 1000000 # noqa: SLF001
637+
lambda_context.invoked_function_arn = None
638+
lambda_context.tenant_id = None
639+
640+
result = test_handler(invocation_input, lambda_context)
641+
642+
assert result["Status"] == InvocationStatus.FAILED.value
643+
assert "Error" not in result
596644
mock_client.checkpoint.assert_called_once()
597645

598-
# Verify the checkpoint call was for execution failure
599646
call_args = mock_client.checkpoint.call_args
600647
updates = call_args[1]["updates"]
601648
assert len(updates) == 1
602649
assert updates[0].operation_type == OperationType.EXECUTION
603650
assert updates[0].action.value == "FAIL"
604-
assert updates[0].error.message == "Test error"
605-
assert updates[0].error.type == "ValueError"
651+
assert updates[0].error.message == LARGE_RESULT
606652

607653

608654
def test_durable_execution_fatal_error_handling():
@@ -1404,11 +1450,11 @@ def test_handler(event: Any, context: DurableContext) -> str:
14041450
# Make the service client checkpoint call fail on error handling
14051451
mock_client.checkpoint.side_effect = failing_checkpoint
14061452

1407-
# Verify that the checkpoint error is raised (not the original ValueError)
1408-
with pytest.raises(
1409-
RuntimeError, match="Background checkpoint failed on error handling"
1410-
):
1411-
test_handler(invocation_input, lambda_context)
1453+
# Verify that errors are not raised, but returned because response is small
1454+
resp = test_handler(invocation_input, lambda_context)
1455+
assert resp["Error"]["ErrorMessage"] == "User function error"
1456+
assert resp["Error"]["ErrorType"] == "ValueError"
1457+
assert resp["Status"] == InvocationStatus.FAILED.value
14121458

14131459

14141460
def test_durable_execution_logs_checkpoint_error_extras_from_background_thread():

0 commit comments

Comments
 (0)