Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 59 additions & 21 deletions backend/protocol_rpc/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ async def _run_health_checks() -> None:
"max_recovery_exhausted_count": consensus_health.get(
"max_recovery_exhausted_count", 0
),
"max_recovery_exhausted_transactions": consensus_health.get(
"max_recovery_exhausted_transactions", []
),
"no_consensus_progress": consensus_health.get(
"no_consensus_progress", False
),
Expand Down Expand Up @@ -617,6 +620,9 @@ async def _check_consensus_health() -> Dict[str, Any]:
MAX_RECOVERY_EXHAUSTED_NOTICE_WINDOW_MINUTES = int(
os.environ.get("HEALTH_MAX_RECOVERY_EXHAUSTED_NOTICE_WINDOW_MINUTES", "60")
)
MAX_RECOVERY_EXHAUSTED_EVENT_LIMIT = int(
os.environ.get("HEALTH_MAX_RECOVERY_EXHAUSTED_EVENT_LIMIT", "10")
)

# Statuses where the consensus state machine is actively working.
# The "head of queue stuck" check uses ONLY these: ACCEPTED-class
Expand Down Expand Up @@ -767,37 +773,66 @@ def _query_consensus():
recovery_row.max_recovery_count if recovery_row else 0
)

max_recovery_exhausted_row = conn.execute(
max_recovery_exhausted_rows = conn.execute(
text(
"""
SELECT COUNT(*) AS n
FROM transactions
WHERE status = 'CANCELED'
AND consensus_data ->> 'error'
= 'max_recovery_cycles_exceeded'
AND CASE
WHEN consensus_data
->> 'max_recovery_exhausted_at'
~ '^[0-9]+(\\.[0-9]+)?$'
THEN to_timestamp(
(
consensus_data
->> 'max_recovery_exhausted_at'
)::double precision
)
ELSE created_at
END > NOW() - CAST(:notice_window AS INTERVAL)
WITH exhausted AS (
SELECT
hash AS tx_hash,
to_address,
recovery_count,
CASE
WHEN consensus_data
->> 'max_recovery_exhausted_at'
~ '^[0-9]+(\\.[0-9]+)?$'
THEN to_timestamp(
(
consensus_data
->> 'max_recovery_exhausted_at'
)::double precision
)
ELSE created_at
END AS exhausted_at
FROM transactions
WHERE status = 'CANCELED'
AND consensus_data ->> 'error'
= 'max_recovery_cycles_exceeded'
)
SELECT
COUNT(*) OVER() AS total_count,
tx_hash,
to_address,
recovery_count,
EXTRACT(EPOCH FROM exhausted_at)::bigint
AS exhausted_at_epoch
FROM exhausted
WHERE exhausted_at
> NOW() - CAST(:notice_window AS INTERVAL)
ORDER BY exhausted_at DESC
LIMIT :event_limit
"""
),
{
"notice_window": (
f"{MAX_RECOVERY_EXHAUSTED_NOTICE_WINDOW_MINUTES} minutes"
)
),
"event_limit": MAX_RECOVERY_EXHAUSTED_EVENT_LIMIT,
},
).fetchone()
).fetchall()
max_recovery_exhausted_count = (
max_recovery_exhausted_row.n if max_recovery_exhausted_row else 0
max_recovery_exhausted_rows[0].total_count
if max_recovery_exhausted_rows
else 0
)
max_recovery_exhausted_transactions = [
{
"hash": row.tx_hash,
"contract_address": row.to_address,
"recovery_count": row.recovery_count,
"exhausted_at": row.exhausted_at_epoch,
}
for row in max_recovery_exhausted_rows
]

# Total in-flight (non-final) tx count, for context.
# Consensus-active only — finalization-pending rows
Expand Down Expand Up @@ -960,6 +995,9 @@ def _query_consensus():
"recovery_storm_count": recovery_storm_count,
"max_recovery_count": max_recovery_count,
"max_recovery_exhausted_count": max_recovery_exhausted_count,
"max_recovery_exhausted_transactions": (
max_recovery_exhausted_transactions
),
"no_consensus_progress": no_consensus_progress,
"no_progress_backlog_count": no_progress_backlog_count,
"oldest_backlog_age_seconds": oldest_backlog_age_seconds,
Expand Down
15 changes: 15 additions & 0 deletions backend/services/usage_metrics_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ async def send_system_health_metrics(self, health_cache) -> None:
if mapped_status != "healthy":
system_health["instanceHealthReasons"] = health_cache.issues

max_recovery_events = health_cache.services.get("consensus", {}).get(
"max_recovery_exhausted_transactions", []
)
if max_recovery_events:
system_health["instanceHealthEvents"] = [
{
"type": "max_recovery_cycles_exhausted",
"transactionHash": event.get("hash"),
"contractAddress": event.get("contract_address"),
"recoveryCount": event.get("recovery_count"),
"occurredAt": event.get("exhausted_at"),
}
for event in max_recovery_events
]

# Add pending contracts breakdown if available
pending_contracts = getattr(health_cache, "pending_contracts", [])
if pending_contracts:
Expand Down
8 changes: 8 additions & 0 deletions tests/db-sqlalchemy/test_health_orphan_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,14 @@ async def test_recent_max_recovery_exhaustion_flags_notice(
result = await health_module._check_consensus_health()

assert result["max_recovery_exhausted_count"] == 1
assert result["max_recovery_exhausted_transactions"] == [
{
"hash": "0x" + "ab" * 32,
"contract_address": "0x" + "ab" * 20,
"recovery_count": 3,
"exhausted_at": int(now.timestamp()),
}
]
assert result["status"] == "degraded"


Expand Down
81 changes: 81 additions & 0 deletions tests/unit/test_rpc_health_genvm_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
from unittest.mock import patch, MagicMock, AsyncMock
from types import SimpleNamespace

import backend.protocol_rpc.health as health_module

Expand Down Expand Up @@ -278,6 +279,86 @@ async def fake_memory_health():
assert health_module._health_cache.genvm_available_permits == 4
assert health_module._health_cache.genvm_active_executions == 1

@pytest.mark.asyncio
async def test_consensus_health_includes_max_recovery_exhaustion_events(
self, monkeypatch
):
exhausted_tx = SimpleNamespace(
total_count=1,
tx_hash="0xabc",
to_address="0xcontract",
recovery_count=3,
exhausted_at_epoch=1779938084,
)

class FakeResult:
def __init__(self, row=None, rows=None):
self.row = row
self.rows = rows or []

def fetchone(self):
return self.row

def fetchall(self):
return self.rows

class FakeConnection:
def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

def execute(self, statement, params=None):
query = str(statement)
if "COUNT(DISTINCT worker_id)" in query:
return FakeResult(SimpleNamespace(n=0))
if "COUNT(*) AS stuck_heads" in query:
return FakeResult(SimpleNamespace(stuck_heads=0))
if "timestamp_awaiting_finalization" in query:
return FakeResult(SimpleNamespace(n=0))
if "COALESCE(MAX(recovery_count), 0)" in query:
return FakeResult(SimpleNamespace(n=0, max_recovery_count=0))
if "max_recovery_cycles_exceeded" in query:
return FakeResult(rows=[exhausted_tx])
if "WHERE status IN ('ACTIVATED'" in query:
return FakeResult(SimpleNamespace(n=0))
if "backlog_count" in query:
return FakeResult(
SimpleNamespace(
backlog_count=0,
oldest_created_at=None,
oldest_backlog_age_seconds=None,
)
)
raise AssertionError(f"unexpected query: {query}")

class FakeEngine:
def connect(self):
return FakeConnection()

import backend.database_handler.session_factory as session_factory

monkeypatch.setattr(
session_factory,
"get_database_manager",
lambda: SimpleNamespace(engine=FakeEngine()),
)
monkeypatch.setattr(health_module, "_rpc_router_ref", object())

result = await health_module._check_consensus_health()

assert result["status"] == "degraded"
assert result["max_recovery_exhausted_count"] == 1
assert result["max_recovery_exhausted_transactions"] == [
{
"hash": "0xabc",
"contract_address": "0xcontract",
"recovery_count": 3,
"exhausted_at": 1779938084,
}
]


class TestThresholdConfig:
"""Test threshold configuration via environment variable."""
Expand Down
52 changes: 52 additions & 0 deletions tests/unit/test_usage_metrics_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock

import pytest

from backend.services.usage_metrics_service import UsageMetricsService


@pytest.mark.asyncio
async def test_system_health_metrics_include_max_recovery_events():
service = UsageMetricsService()
service._enabled = True
service._send_to_api = AsyncMock()

health_cache = SimpleNamespace(
status="degraded",
genvm_healthy=True,
uptime_percent=100.0,
pending_transactions=1,
total_decisions=2,
total_users=3,
issues=["max_recovery_cycles_exhausted"],
pending_contracts=[],
services={
"consensus": {
"active_workers": 1,
"max_recovery_exhausted_transactions": [
{
"hash": "0xabc",
"contract_address": "0xcontract",
"recovery_count": 3,
"exhausted_at": 1779938084,
}
],
},
"memory": {"percent": 4.0, "cpu_percent": 5.0},
},
)

await service.send_system_health_metrics(health_cache)

service._send_to_api.assert_awaited_once()
payload = service._send_to_api.await_args.args[0]
assert payload["systemHealth"]["instanceHealthEvents"] == [
{
"type": "max_recovery_cycles_exhausted",
"transactionHash": "0xabc",
"contractAddress": "0xcontract",
"recoveryCount": 3,
"occurredAt": 1779938084,
}
]
Loading