Skip to content

Commit d283576

Browse files
FullyTypedAstraea Quinn S
authored andcommitted
Log background thread errors at top level
1 parent 4eecf0e commit d283576

File tree

2 files changed

+186
-3
lines changed

2 files changed

+186
-3
lines changed

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from aws_durable_execution_sdk_python.context import DurableContext, ExecutionState
1111
from aws_durable_execution_sdk_python.exceptions import (
1212
BackgroundThreadError,
13+
BotoClientError,
1314
CheckpointError,
1415
DurableExecutionsError,
1516
ExecutionError,
@@ -314,7 +315,13 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
314315
except BackgroundThreadError as bg_error:
315316
# Background checkpoint system failed - propagated through CompletionEvent
316317
# Do not attempt to checkpoint anything, just terminate immediately
317-
logger.exception("Checkpoint processing failed")
318+
if isinstance(bg_error.source_exception, BotoClientError):
319+
logger.exception(
320+
"Checkpoint processing failed",
321+
extra=bg_error.source_exception.build_logger_extras(),
322+
)
323+
else:
324+
logger.exception("Checkpoint processing failed")
318325
execution_state.stop_checkpointing()
319326
# Raise the original exception
320327
raise bg_error.source_exception from bg_error
@@ -327,10 +334,13 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
327334
status=InvocationStatus.PENDING
328335
).to_dict()
329336

330-
except CheckpointError:
337+
except CheckpointError as e:
331338
# Checkpoint system is broken - stop background thread and exit immediately
332339
execution_state.stop_checkpointing()
333-
logger.exception("Checkpoint system failed")
340+
logger.exception(
341+
"Checkpoint system failed",
342+
extra=e.build_logger_extras(),
343+
)
334344
raise # Terminate Lambda immediately
335345
except InvocationError:
336346
execution_state.stop_checkpointing()

tests/execution_test.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from aws_durable_execution_sdk_python.config import StepConfig, StepSemantics
1212
from aws_durable_execution_sdk_python.context import DurableContext
1313
from aws_durable_execution_sdk_python.exceptions import (
14+
BotoClientError,
1415
CheckpointError,
1516
ExecutionError,
1617
InvocationError,
@@ -1408,3 +1409,175 @@ def test_handler(event: Any, context: DurableContext) -> str:
14081409
RuntimeError, match="Background checkpoint failed on error handling"
14091410
):
14101411
test_handler(invocation_input, lambda_context)
1412+
1413+
1414+
def test_durable_execution_logs_checkpoint_error_extras_from_background_thread():
1415+
"""Test that CheckpointError extras are logged when raised from background thread."""
1416+
mock_client = Mock(spec=DurableServiceClient)
1417+
mock_logger = Mock()
1418+
1419+
error_obj = {"Code": "TestError", "Message": "Test checkpoint error"}
1420+
metadata_obj = {"RequestId": "test-request-id"}
1421+
1422+
def failing_checkpoint(*args, **kwargs):
1423+
raise CheckpointError( # noqa TRY003
1424+
"Checkpoint failed", # noqa EM101
1425+
error=error_obj,
1426+
response_metadata=metadata_obj, # EM101
1427+
)
1428+
1429+
@durable_execution
1430+
def test_handler(event: Any, context: DurableContext) -> dict:
1431+
context.step(lambda ctx: "step_result")
1432+
return {"result": "success"}
1433+
1434+
operation = Operation(
1435+
operation_id="exec1",
1436+
operation_type=OperationType.EXECUTION,
1437+
status=OperationStatus.STARTED,
1438+
execution_details=ExecutionDetails(input_payload="{}"),
1439+
)
1440+
1441+
initial_state = InitialExecutionState(operations=[operation], next_marker="")
1442+
1443+
invocation_input = DurableExecutionInvocationInputWithClient(
1444+
durable_execution_arn="arn:test:execution",
1445+
checkpoint_token="token123", # noqa: S106
1446+
initial_execution_state=initial_state,
1447+
is_local_runner=False,
1448+
service_client=mock_client,
1449+
)
1450+
1451+
lambda_context = Mock()
1452+
lambda_context.aws_request_id = "test-request"
1453+
lambda_context.client_context = None
1454+
lambda_context.identity = None
1455+
lambda_context._epoch_deadline_time_in_ms = 1000000 # noqa: SLF001
1456+
lambda_context.invoked_function_arn = None
1457+
lambda_context.tenant_id = None
1458+
1459+
mock_client.checkpoint.side_effect = failing_checkpoint
1460+
1461+
with patch("aws_durable_execution_sdk_python.execution.logger", mock_logger):
1462+
with pytest.raises(CheckpointError):
1463+
test_handler(invocation_input, lambda_context)
1464+
1465+
mock_logger.exception.assert_called_once()
1466+
call_args = mock_logger.exception.call_args
1467+
assert "Checkpoint processing failed" in call_args[0][0]
1468+
assert call_args[1]["extra"]["Error"] == error_obj
1469+
assert call_args[1]["extra"]["ResponseMetadata"] == metadata_obj
1470+
1471+
1472+
def test_durable_execution_logs_boto_client_error_extras_from_background_thread():
1473+
"""Test that BotoClientError extras are logged when raised from background thread."""
1474+
1475+
mock_client = Mock(spec=DurableServiceClient)
1476+
mock_logger = Mock()
1477+
1478+
error_obj = {"Code": "ServiceError", "Message": "Boto3 service error"}
1479+
metadata_obj = {"RequestId": "boto-request-id"}
1480+
1481+
def failing_checkpoint(*args, **kwargs):
1482+
raise BotoClientError( # noqa TRY003
1483+
"Boto3 error", # noqa EM101
1484+
error=error_obj,
1485+
response_metadata=metadata_obj, # EM101
1486+
)
1487+
1488+
@durable_execution
1489+
def test_handler(event: Any, context: DurableContext) -> dict:
1490+
context.step(lambda ctx: "step_result")
1491+
return {"result": "success"}
1492+
1493+
operation = Operation(
1494+
operation_id="exec1",
1495+
operation_type=OperationType.EXECUTION,
1496+
status=OperationStatus.STARTED,
1497+
execution_details=ExecutionDetails(input_payload="{}"),
1498+
)
1499+
1500+
initial_state = InitialExecutionState(operations=[operation], next_marker="")
1501+
1502+
invocation_input = DurableExecutionInvocationInputWithClient(
1503+
durable_execution_arn="arn:test:execution",
1504+
checkpoint_token="token123", # noqa: S106
1505+
initial_execution_state=initial_state,
1506+
is_local_runner=False,
1507+
service_client=mock_client,
1508+
)
1509+
1510+
lambda_context = Mock()
1511+
lambda_context.aws_request_id = "test-request"
1512+
lambda_context.client_context = None
1513+
lambda_context.identity = None
1514+
lambda_context._epoch_deadline_time_in_ms = 1000000 # noqa: SLF001
1515+
lambda_context.invoked_function_arn = None
1516+
lambda_context.tenant_id = None
1517+
1518+
mock_client.checkpoint.side_effect = failing_checkpoint
1519+
1520+
with patch("aws_durable_execution_sdk_python.execution.logger", mock_logger):
1521+
with pytest.raises(BotoClientError):
1522+
test_handler(invocation_input, lambda_context)
1523+
1524+
mock_logger.exception.assert_called_once()
1525+
call_args = mock_logger.exception.call_args
1526+
assert "Checkpoint processing failed" in call_args[0][0]
1527+
assert call_args[1]["extra"]["Error"] == error_obj
1528+
assert call_args[1]["extra"]["ResponseMetadata"] == metadata_obj
1529+
1530+
1531+
def test_durable_execution_logs_checkpoint_error_extras_from_user_code():
1532+
"""Test that CheckpointError extras are logged when raised directly from user code."""
1533+
mock_client = Mock(spec=DurableServiceClient)
1534+
mock_logger = Mock()
1535+
1536+
error_obj = {
1537+
"Code": "UserCheckpointError",
1538+
"Message": "User raised checkpoint error",
1539+
}
1540+
metadata_obj = {"RequestId": "user-request-id"}
1541+
1542+
@durable_execution
1543+
def test_handler(event: Any, context: DurableContext) -> dict:
1544+
raise CheckpointError( # noqa TRY003
1545+
"User checkpoint error", # noqa EM101
1546+
error=error_obj,
1547+
response_metadata=metadata_obj, # EM101
1548+
)
1549+
1550+
operation = Operation(
1551+
operation_id="exec1",
1552+
operation_type=OperationType.EXECUTION,
1553+
status=OperationStatus.STARTED,
1554+
execution_details=ExecutionDetails(input_payload="{}"),
1555+
)
1556+
1557+
initial_state = InitialExecutionState(operations=[operation], next_marker="")
1558+
1559+
invocation_input = DurableExecutionInvocationInputWithClient(
1560+
durable_execution_arn="arn:test:execution",
1561+
checkpoint_token="token123", # noqa: S106
1562+
initial_execution_state=initial_state,
1563+
is_local_runner=False,
1564+
service_client=mock_client,
1565+
)
1566+
1567+
lambda_context = Mock()
1568+
lambda_context.aws_request_id = "test-request"
1569+
lambda_context.client_context = None
1570+
lambda_context.identity = None
1571+
lambda_context._epoch_deadline_time_in_ms = 1000000 # noqa: SLF001
1572+
lambda_context.invoked_function_arn = None
1573+
lambda_context.tenant_id = None
1574+
1575+
with patch("aws_durable_execution_sdk_python.execution.logger", mock_logger):
1576+
with pytest.raises(CheckpointError):
1577+
test_handler(invocation_input, lambda_context)
1578+
1579+
mock_logger.exception.assert_called_once()
1580+
call_args = mock_logger.exception.call_args
1581+
assert call_args[0][0] == "Checkpoint system failed"
1582+
assert call_args[1]["extra"]["Error"] == error_obj
1583+
assert call_args[1]["extra"]["ResponseMetadata"] == metadata_obj

0 commit comments

Comments
 (0)