diff --git a/backend/protocol_rpc/health.py b/backend/protocol_rpc/health.py index 99aa590a1..e184190d4 100644 --- a/backend/protocol_rpc/health.py +++ b/backend/protocol_rpc/health.py @@ -313,6 +313,9 @@ async def _run_health_checks() -> None: "max_recovery_exhausted_count": consensus_health.get( "max_recovery_exhausted_count", 0 ), + "max_recovery_exhausted_transactions": consensus_health.get( + "max_recovery_exhausted_transactions", [] + ), "no_consensus_progress": consensus_health.get( "no_consensus_progress", False ), @@ -617,6 +620,9 @@ async def _check_consensus_health() -> Dict[str, Any]: MAX_RECOVERY_EXHAUSTED_NOTICE_WINDOW_MINUTES = int( os.environ.get("HEALTH_MAX_RECOVERY_EXHAUSTED_NOTICE_WINDOW_MINUTES", "60") ) + MAX_RECOVERY_EXHAUSTED_EVENT_LIMIT = int( + os.environ.get("HEALTH_MAX_RECOVERY_EXHAUSTED_EVENT_LIMIT", "10") + ) # Statuses where the consensus state machine is actively working. # The "head of queue stuck" check uses ONLY these: ACCEPTED-class @@ -767,37 +773,66 @@ def _query_consensus(): recovery_row.max_recovery_count if recovery_row else 0 ) - max_recovery_exhausted_row = conn.execute( + max_recovery_exhausted_rows = conn.execute( text( """ - SELECT COUNT(*) AS n - FROM transactions - WHERE status = 'CANCELED' - AND consensus_data ->> 'error' - = 'max_recovery_cycles_exceeded' - AND CASE - WHEN consensus_data - ->> 'max_recovery_exhausted_at' - ~ '^[0-9]+(\\.[0-9]+)?$' - THEN to_timestamp( - ( - consensus_data - ->> 'max_recovery_exhausted_at' - )::double precision - ) - ELSE created_at - END > NOW() - CAST(:notice_window AS INTERVAL) + WITH exhausted AS ( + SELECT + hash AS tx_hash, + to_address, + recovery_count, + CASE + WHEN consensus_data + ->> 'max_recovery_exhausted_at' + ~ '^[0-9]+(\\.[0-9]+)?$' + THEN to_timestamp( + ( + consensus_data + ->> 'max_recovery_exhausted_at' + )::double precision + ) + ELSE created_at + END AS exhausted_at + FROM transactions + WHERE status = 'CANCELED' + AND consensus_data ->> 'error' + = 'max_recovery_cycles_exceeded' + ) + SELECT + COUNT(*) OVER() AS total_count, + tx_hash, + to_address, + recovery_count, + EXTRACT(EPOCH FROM exhausted_at)::bigint + AS exhausted_at_epoch + FROM exhausted + WHERE exhausted_at + > NOW() - CAST(:notice_window AS INTERVAL) + ORDER BY exhausted_at DESC + LIMIT :event_limit """ ), { "notice_window": ( f"{MAX_RECOVERY_EXHAUSTED_NOTICE_WINDOW_MINUTES} minutes" - ) + ), + "event_limit": MAX_RECOVERY_EXHAUSTED_EVENT_LIMIT, }, - ).fetchone() + ).fetchall() max_recovery_exhausted_count = ( - max_recovery_exhausted_row.n if max_recovery_exhausted_row else 0 + max_recovery_exhausted_rows[0].total_count + if max_recovery_exhausted_rows + else 0 ) + max_recovery_exhausted_transactions = [ + { + "hash": row.tx_hash, + "contract_address": row.to_address, + "recovery_count": row.recovery_count, + "exhausted_at": row.exhausted_at_epoch, + } + for row in max_recovery_exhausted_rows + ] # Total in-flight (non-final) tx count, for context. # Consensus-active only — finalization-pending rows @@ -960,6 +995,9 @@ def _query_consensus(): "recovery_storm_count": recovery_storm_count, "max_recovery_count": max_recovery_count, "max_recovery_exhausted_count": max_recovery_exhausted_count, + "max_recovery_exhausted_transactions": ( + max_recovery_exhausted_transactions + ), "no_consensus_progress": no_consensus_progress, "no_progress_backlog_count": no_progress_backlog_count, "oldest_backlog_age_seconds": oldest_backlog_age_seconds, diff --git a/backend/services/usage_metrics_service.py b/backend/services/usage_metrics_service.py index 3606ca2f6..a6f4e1966 100644 --- a/backend/services/usage_metrics_service.py +++ b/backend/services/usage_metrics_service.py @@ -103,6 +103,21 @@ async def send_system_health_metrics(self, health_cache) -> None: if mapped_status != "healthy": system_health["instanceHealthReasons"] = health_cache.issues + max_recovery_events = health_cache.services.get("consensus", {}).get( + "max_recovery_exhausted_transactions", [] + ) + if max_recovery_events: + system_health["instanceHealthEvents"] = [ + { + "type": "max_recovery_cycles_exhausted", + "transactionHash": event.get("hash"), + "contractAddress": event.get("contract_address"), + "recoveryCount": event.get("recovery_count"), + "occurredAt": event.get("exhausted_at"), + } + for event in max_recovery_events + ] + # Add pending contracts breakdown if available pending_contracts = getattr(health_cache, "pending_contracts", []) if pending_contracts: diff --git a/tests/db-sqlalchemy/test_health_orphan_detection.py b/tests/db-sqlalchemy/test_health_orphan_detection.py index 50facc380..c4c6aea5e 100644 --- a/tests/db-sqlalchemy/test_health_orphan_detection.py +++ b/tests/db-sqlalchemy/test_health_orphan_detection.py @@ -667,6 +667,14 @@ async def test_recent_max_recovery_exhaustion_flags_notice( result = await health_module._check_consensus_health() assert result["max_recovery_exhausted_count"] == 1 + assert result["max_recovery_exhausted_transactions"] == [ + { + "hash": "0x" + "ab" * 32, + "contract_address": "0x" + "ab" * 20, + "recovery_count": 3, + "exhausted_at": int(now.timestamp()), + } + ] assert result["status"] == "degraded" diff --git a/tests/unit/test_rpc_health_genvm_tracking.py b/tests/unit/test_rpc_health_genvm_tracking.py index 9885adb0c..3fa1a1141 100644 --- a/tests/unit/test_rpc_health_genvm_tracking.py +++ b/tests/unit/test_rpc_health_genvm_tracking.py @@ -5,6 +5,7 @@ import pytest from unittest.mock import patch, MagicMock, AsyncMock +from types import SimpleNamespace import backend.protocol_rpc.health as health_module @@ -278,6 +279,86 @@ async def fake_memory_health(): assert health_module._health_cache.genvm_available_permits == 4 assert health_module._health_cache.genvm_active_executions == 1 + @pytest.mark.asyncio + async def test_consensus_health_includes_max_recovery_exhaustion_events( + self, monkeypatch + ): + exhausted_tx = SimpleNamespace( + total_count=1, + tx_hash="0xabc", + to_address="0xcontract", + recovery_count=3, + exhausted_at_epoch=1779938084, + ) + + class FakeResult: + def __init__(self, row=None, rows=None): + self.row = row + self.rows = rows or [] + + def fetchone(self): + return self.row + + def fetchall(self): + return self.rows + + class FakeConnection: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def execute(self, statement, params=None): + query = str(statement) + if "COUNT(DISTINCT worker_id)" in query: + return FakeResult(SimpleNamespace(n=0)) + if "COUNT(*) AS stuck_heads" in query: + return FakeResult(SimpleNamespace(stuck_heads=0)) + if "timestamp_awaiting_finalization" in query: + return FakeResult(SimpleNamespace(n=0)) + if "COALESCE(MAX(recovery_count), 0)" in query: + return FakeResult(SimpleNamespace(n=0, max_recovery_count=0)) + if "max_recovery_cycles_exceeded" in query: + return FakeResult(rows=[exhausted_tx]) + if "WHERE status IN ('ACTIVATED'" in query: + return FakeResult(SimpleNamespace(n=0)) + if "backlog_count" in query: + return FakeResult( + SimpleNamespace( + backlog_count=0, + oldest_created_at=None, + oldest_backlog_age_seconds=None, + ) + ) + raise AssertionError(f"unexpected query: {query}") + + class FakeEngine: + def connect(self): + return FakeConnection() + + import backend.database_handler.session_factory as session_factory + + monkeypatch.setattr( + session_factory, + "get_database_manager", + lambda: SimpleNamespace(engine=FakeEngine()), + ) + monkeypatch.setattr(health_module, "_rpc_router_ref", object()) + + result = await health_module._check_consensus_health() + + assert result["status"] == "degraded" + assert result["max_recovery_exhausted_count"] == 1 + assert result["max_recovery_exhausted_transactions"] == [ + { + "hash": "0xabc", + "contract_address": "0xcontract", + "recovery_count": 3, + "exhausted_at": 1779938084, + } + ] + class TestThresholdConfig: """Test threshold configuration via environment variable.""" diff --git a/tests/unit/test_usage_metrics_service.py b/tests/unit/test_usage_metrics_service.py new file mode 100644 index 000000000..822628a72 --- /dev/null +++ b/tests/unit/test_usage_metrics_service.py @@ -0,0 +1,52 @@ +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from backend.services.usage_metrics_service import UsageMetricsService + + +@pytest.mark.asyncio +async def test_system_health_metrics_include_max_recovery_events(): + service = UsageMetricsService() + service._enabled = True + service._send_to_api = AsyncMock() + + health_cache = SimpleNamespace( + status="degraded", + genvm_healthy=True, + uptime_percent=100.0, + pending_transactions=1, + total_decisions=2, + total_users=3, + issues=["max_recovery_cycles_exhausted"], + pending_contracts=[], + services={ + "consensus": { + "active_workers": 1, + "max_recovery_exhausted_transactions": [ + { + "hash": "0xabc", + "contract_address": "0xcontract", + "recovery_count": 3, + "exhausted_at": 1779938084, + } + ], + }, + "memory": {"percent": 4.0, "cpu_percent": 5.0}, + }, + ) + + await service.send_system_health_metrics(health_cache) + + service._send_to_api.assert_awaited_once() + payload = service._send_to_api.await_args.args[0] + assert payload["systemHealth"]["instanceHealthEvents"] == [ + { + "type": "max_recovery_cycles_exhausted", + "transactionHash": "0xabc", + "contractAddress": "0xcontract", + "recoveryCount": 3, + "occurredAt": 1779938084, + } + ]