From 7efcc283d0a5098f377a49c57959c25e2372555c Mon Sep 17 00:00:00 2001
From: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com>
Date: Wed, 20 May 2026 18:01:51 +0200
Subject: [PATCH] fix(worker): quarantine invalid async operations

---
 .../hindsight_api/worker/poller.py            |  80 ++++++++
 .../tests/test_worker_quarantine.py           | 182 ++++++++++++++++++
 2 files changed, 262 insertions(+)
 create mode 100644 hindsight-api-slim/tests/test_worker_quarantine.py

diff --git a/hindsight-api-slim/hindsight_api/worker/poller.py b/hindsight-api-slim/hindsight_api/worker/poller.py
index 8e4cfa5db..f36d3b728 100644
--- a/hindsight-api-slim/hindsight_api/worker/poller.py
+++ b/hindsight-api-slim/hindsight_api/worker/poller.py
@@ -195,6 +195,84 @@ async def _get_schemas(self) -> list[str | None]:
         # Convert default schema to None for SQL compatibility (no prefix), keep others as-is
         return [t.schema if t.schema != DEFAULT_DATABASE_SCHEMA else None for t in tenants]
 
+    async def _quarantine_unclaimable_pending_operations(self, schemas: list[str | None]) -> int:
+        """Move deterministic null-payload poison rows out of the pending lane.
+
+        Worker claim queries intentionally skip ``task_payload IS NULL`` because
+        there is no executable task to hand to the executor. Most such rows are
+        legitimate ``batch_retain`` parent aggregators while their child rows
+        are still active. A null-payload row with no child references, or any
+        non-parent operation with a null payload, is unclaimable forever and
+        makes queue health look like ordinary backlog. Mark those rows failed so
+        the pending lane reflects claimable/retryable work only.
+
+        Args:
+            schemas: Schemas to sweep; ``None`` denotes the default schema.
+
+        Returns:
+            Number of rows marked ``failed`` across all schemas. Always 0 on
+            non-PostgreSQL backends, where the JSONB-based query is skipped.
+        """
+        if self._backend.backend_type != "postgresql":
+            return 0
+
+        total = 0
+        error_message = "Quarantined unclaimable async operation: task_payload is NULL"
+        async with self._backend.acquire() as conn:
+            for schema in schemas:
+                table = fq_table("async_operations", schema)
+                schema_display = schema or "default"
+                # NOTE(review): a batch_retain parent that is committed before its
+                # child rows would match this query and be failed prematurely --
+                # confirm parent and children are inserted in one transaction.
+                try:
+                    result = await conn.execute(
+                        f"""
+                        UPDATE {table} AS op
+                        SET status = 'failed',
+                            error_message = $1,
+                            result_metadata = COALESCE(op.result_metadata, '{{}}'::jsonb) || jsonb_build_object(
+                                'quarantined', true,
+                                'quarantine_reason', 'task_payload_null',
+                                'quarantine_payload_null', true
+                            ),
+                            updated_at = now()
+                        WHERE op.status = 'pending'
+                          AND op.task_payload IS NULL
+                          AND (
+                              op.operation_type != 'batch_retain'
+                              OR NOT EXISTS (
+                                  SELECT 1
+                                  FROM {table} AS child
+                                  WHERE child.bank_id = op.bank_id
+                                    AND child.result_metadata::jsonb @> jsonb_build_object(
+                                        'parent_operation_id', op.operation_id::text
+                                    )
+                              )
+                          )
+                        """,
+                        error_message,
+                    )
+                    count = int(result.split()[-1]) if result else 0
+                    if count:
+                        total += count
+                        logger.warning(
+                            "Worker %s quarantined %d unclaimable pending async operation(s) in schema %s",
+                            self._worker_id,
+                            count,
+                            schema_display,
+                        )
+                except Exception as e:
+                    # Best-effort: keep polling resilient on partially provisioned
+                    # schemas. Non-PostgreSQL backends never reach this point.
+                    logger.debug(
+                        "Worker %s could not quarantine unclaimable operations for schema %s: %s",
+                        self._worker_id,
+                        schema_display,
+                        e,
+                    )
+        return total
+
     async def _scan_active_schemas(self, schemas: list[str | None]) -> set[str | None]:
         """Find which schemas have pending work.
 
@@ -317,6 +395,8 @@ async def claim_batch(self) -> list[ClaimedTask]:
         if not schemas:
             return []
 
+        await self._quarantine_unclaimable_pending_operations(schemas)
+
         # Scan: find which schemas have pending work using a lightweight
         # EXISTS check (no locks). Then only claim from those schemas
         # using the expensive FOR UPDATE SKIP LOCKED query.
diff --git a/hindsight-api-slim/tests/test_worker_quarantine.py b/hindsight-api-slim/tests/test_worker_quarantine.py
new file mode 100644
index 000000000..d46ab474e
--- /dev/null
+++ b/hindsight-api-slim/tests/test_worker_quarantine.py
@@ -0,0 +1,182 @@
+"""Worker quarantine tests for unclaimable async operations."""
+
+import json
+import uuid
+from typing import Any, cast
+
+import pytest
+import pytest_asyncio
+
+pytestmark = pytest.mark.xdist_group("worker_tests")
+
+
+async def _ensure_bank(pool, bank_id: str) -> None:
+    """Upsert a minimal bank row so FK on async_operations passes."""
+    await pool.execute(
+        "INSERT INTO banks (bank_id, name) VALUES ($1, $2) ON CONFLICT DO NOTHING",
+        bank_id,
+        bank_id,
+    )
+
+
+@pytest_asyncio.fixture
+async def backend(pg0_db_url):
+    """Create a DatabaseBackend for worker tests."""
+    from hindsight_api.engine.db import create_database_backend
+    from hindsight_api.pg0 import resolve_database_url
+
+    resolved_url = await resolve_database_url(pg0_db_url)
+    b = create_database_backend("postgresql")
+    await b.initialize(resolved_url, min_size=2, max_size=10, command_timeout=30)
+    yield b
+    await b.shutdown()
+
+
+@pytest_asyncio.fixture
+async def pool(backend):
+    """Expose the raw asyncpg pool from the backend for direct DB access in tests."""
+    yield backend.get_pool()
+
+
+@pytest_asyncio.fixture
+async def clean_operations(pool):
+    """Remove rows that can interfere with worker claim/quarantine tests."""
+    await pool.execute("DELETE FROM async_operations WHERE bank_id LIKE 'test-quarantine-%'")
+    yield
+    await pool.execute("DELETE FROM async_operations WHERE bank_id LIKE 'test-quarantine-%'")
+
+
+class _NonPostgresBackend:
+    backend_type = "oracle"
+
+    def acquire(self):  # pragma: no cover - guard should return before DB access
+        raise AssertionError("non-PostgreSQL quarantine path should not acquire a connection")
+
+
+class TestWorkerPollerQuarantine:
+    @pytest.mark.asyncio
+    async def test_quarantine_guard_skips_non_postgresql_backend(self):
+        """The PostgreSQL JSONB quarantine query must not run on other backends."""
+        from hindsight_api.worker import WorkerPoller
+
+        async def noop_executor(_task_dict):
+            return None
+
+        poller = WorkerPoller(
+            backend=cast(Any, _NonPostgresBackend()),
+            worker_id="test-worker-quarantine",
+            executor=noop_executor,
+            slot_reservations={},
+        )
+
+        assert await poller._quarantine_unclaimable_pending_operations([None, "tenant_a"]) == 0
+
+    @pytest.mark.asyncio
+    async def test_claim_batch_quarantines_null_payload_batch_parent_without_children(
+        self, pool, backend, clean_operations
+    ):
+        """A parent with no executable payload and no children cannot ever be claimed.
+
+        This is a poison queue shape observed after crash recovery: a pending
+        batch_retain parent row with task_payload=NULL but no child rows. The
+        poller should move it out of the normal pending lane instead of letting
+        queue health report ordinary backlog forever.
+        """
+        from hindsight_api.worker import WorkerPoller
+
+        bank_id = f"test-quarantine-{uuid.uuid4().hex[:8]}"
+        await _ensure_bank(pool, bank_id)
+        parent_id = uuid.uuid4()
+        await pool.execute(
+            """
+            INSERT INTO async_operations (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
+            VALUES ($1, $2, 'batch_retain', $3::jsonb, 'pending', NULL)
+            """,
+            parent_id,
+            bank_id,
+            json.dumps({"items_count": 5, "num_sub_batches": 1, "is_parent": True}),
+        )
+
+        async def executor(_task_dict):  # pragma: no cover - must not run
+            raise AssertionError("unclaimable null-payload parent should not execute")
+
+        poller = WorkerPoller(
+            backend=backend,
+            worker_id="test-worker-quarantine",
+            executor=executor,
+            slot_reservations={},
+        )
+
+        claimed = await poller.claim_batch()
+        assert claimed == []
+
+        row = await pool.fetchrow(
+            "SELECT status, error_message, task_payload, result_metadata, completed_at FROM async_operations WHERE operation_id = $1",
+            parent_id,
+        )
+        assert row["status"] == "failed"
+        assert row["task_payload"] is None
+        assert row["completed_at"] is None
+        assert "unclaimable" in row["error_message"]
+        assert "task_payload is NULL" in row["error_message"]
+        metadata = json.loads(row["result_metadata"]) if isinstance(row["result_metadata"], str) else row["result_metadata"]
+        assert metadata["quarantined"] is True
+        assert metadata["quarantine_reason"] == "task_payload_null"
+
+    @pytest.mark.asyncio
+    async def test_claim_batch_keeps_null_payload_batch_parent_with_children_pending(
+        self, pool, backend, clean_operations
+    ):
+        """Valid batch parents are aggregators and may have task_payload=NULL.
+
+        A parent with children is not executable itself, but it is not poison:
+        child completion/failure should reconcile the parent. The quarantine
+        guard must not fail these legitimate aggregate rows.
+        """
+        from hindsight_api.worker import WorkerPoller
+
+        bank_id = f"test-quarantine-{uuid.uuid4().hex[:8]}"
+        await _ensure_bank(pool, bank_id)
+        parent_id = uuid.uuid4()
+        child_id = uuid.uuid4()
+        await pool.execute(
+            """
+            INSERT INTO async_operations (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
+            VALUES ($1, $2, 'batch_retain', $3::jsonb, 'pending', NULL)
+            """,
+            parent_id,
+            bank_id,
+            json.dumps({"items_count": 1, "num_sub_batches": 1, "is_parent": True}),
+        )
+        await pool.execute(
+            """
+            INSERT INTO async_operations (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
+            VALUES ($1, $2, 'retain', $3::jsonb, 'pending', $4::jsonb)
+            """,
+            child_id,
+            bank_id,
+            json.dumps({"parent_operation_id": str(parent_id), "sub_batch_index": 1, "total_sub_batches": 1}),
+            json.dumps({"type": "batch_retain", "bank_id": bank_id, "contents": [{"content": "hello"}]}),
+        )
+
+        async def noop_executor(_task_dict):
+            return None
+
+        poller = WorkerPoller(
+            backend=backend,
+            worker_id="test-worker-quarantine",
+            executor=noop_executor,
+            max_slots=1,
+            slot_reservations={},
+        )
+
+        claimed = await poller.claim_batch()
+        assert len(claimed) == 1
+        assert claimed[0].operation_id == str(child_id)
+
+        parent = await pool.fetchrow(
+            "SELECT status, error_message FROM async_operations WHERE operation_id = $1",
+            parent_id,
+        )
+        assert parent["status"] == "pending"
+        assert parent["error_message"] is None