Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions hindsight-api-slim/hindsight_api/worker/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,75 @@ async def _get_schemas(self) -> list[str | None]:
# Convert default schema to None for SQL compatibility (no prefix), keep others as-is
return [t.schema if t.schema != DEFAULT_DATABASE_SCHEMA else None for t in tenants]

async def _quarantine_unclaimable_pending_operations(self, schemas: list[str | None]) -> int:
"""Move deterministic null-payload poison rows out of the pending lane.

Worker claim queries intentionally skip ``task_payload IS NULL`` because
there is no executable task to hand to the executor. Most such rows are
legitimate ``batch_retain`` parent aggregators while their child rows
are still active. A null-payload row with no child references, or any
non-parent operation with a null payload, is unclaimable forever and
makes queue health look like ordinary backlog. Mark those rows failed so
the pending lane reflects claimable/retryable work only.
"""
if self._backend.backend_type != "postgresql":
return 0

total = 0
error_message = "Quarantined unclaimable async operation: task_payload is NULL"
async with self._backend.acquire() as conn:
for schema in schemas:
table = fq_table("async_operations", schema)
try:
result = await conn.execute(
f"""
UPDATE {table} AS op
SET status = 'failed',
error_message = $1,
result_metadata = COALESCE(op.result_metadata, '{{}}'::jsonb) || jsonb_build_object(
'quarantined', true,
'quarantine_reason', 'task_payload_null',
'quarantine_payload_null', true
),
updated_at = now()
WHERE op.status = 'pending'
AND op.task_payload IS NULL
AND (
op.operation_type != 'batch_retain'
OR NOT EXISTS (
SELECT 1
FROM {table} AS child
WHERE child.bank_id = op.bank_id
AND child.result_metadata::jsonb @> jsonb_build_object(
'parent_operation_id', op.operation_id::text
)
)
)
""",
error_message,
)
count = int(result.split()[-1]) if result else 0
if count:
total += count
schema_display = schema or "default"
logger.warning(
"Worker %s quarantined %d unclaimable pending async operation(s) in schema %s",
self._worker_id,
count,
schema_display,
)
except Exception as e:
# Keep polling resilient on partially provisioned schemas or
# non-PostgreSQL backends whose JSON predicate syntax differs.
schema_display = f'"{schema}"' if schema else str(schema)
logger.debug(
"Worker %s could not quarantine unclaimable operations for schema %s: %s",
self._worker_id,
schema_display,
e,
)
return total

async def _scan_active_schemas(self, schemas: list[str | None]) -> set[str | None]:
"""Find which schemas have pending work.

Expand Down Expand Up @@ -317,6 +386,8 @@ async def claim_batch(self) -> list[ClaimedTask]:
if not schemas:
return []

await self._quarantine_unclaimable_pending_operations(schemas)

# Scan: find which schemas have pending work using a lightweight
# EXISTS check (no locks). Then only claim from those schemas
# using the expensive FOR UPDATE SKIP LOCKED query.
Expand Down
182 changes: 182 additions & 0 deletions hindsight-api-slim/tests/test_worker_quarantine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
"""Worker quarantine tests for unclaimable async operations."""

import json
import uuid
from typing import Any, cast

import pytest
import pytest_asyncio

pytestmark = pytest.mark.xdist_group("worker_tests")


async def _ensure_bank(pool, bank_id: str) -> None:
"""Upsert a minimal bank row so FK on async_operations passes."""
await pool.execute(
"INSERT INTO banks (bank_id, name) VALUES ($1, $2) ON CONFLICT DO NOTHING",
bank_id,
bank_id,
)


@pytest_asyncio.fixture
async def backend(pg0_db_url):
"""Create a DatabaseBackend for worker tests."""
from hindsight_api.engine.db import create_database_backend
from hindsight_api.pg0 import resolve_database_url

resolved_url = await resolve_database_url(pg0_db_url)
b = create_database_backend("postgresql")
await b.initialize(resolved_url, min_size=2, max_size=10, command_timeout=30)
yield b
await b.shutdown()


@pytest_asyncio.fixture
async def pool(backend):
"""Expose the raw asyncpg pool from the backend for direct DB access in tests."""
yield backend.get_pool()


@pytest_asyncio.fixture
async def clean_operations(pool):
"""Remove rows that can interfere with worker claim/quarantine tests."""
await pool.execute("DELETE FROM async_operations WHERE bank_id LIKE 'test-quarantine-%'")
yield
await pool.execute("DELETE FROM async_operations WHERE bank_id LIKE 'test-quarantine-%'")


class _NonPostgresBackend:
backend_type = "oracle"

def acquire(self): # pragma: no cover - guard should return before DB access
raise AssertionError("non-PostgreSQL quarantine path should not acquire a connection")


class TestWorkerPollerQuarantine:
@pytest.mark.asyncio
async def test_quarantine_guard_skips_non_postgresql_backend(self):
"""The PostgreSQL JSONB quarantine query must not run on other backends."""
from hindsight_api.worker import WorkerPoller

async def noop_executor(_task_dict):
return None

poller = WorkerPoller(
backend=cast(Any, _NonPostgresBackend()),
worker_id="test-worker-quarantine",
executor=noop_executor,
slot_reservations={},
)

assert await poller._quarantine_unclaimable_pending_operations([None, "tenant_a"]) == 0

@pytest.mark.asyncio
async def test_claim_batch_quarantines_null_payload_batch_parent_without_children(
self, pool, backend, clean_operations
):
"""A parent with no executable payload and no children cannot ever be claimed.

This is a poison queue shape observed after crash recovery: a pending
batch_retain parent row with task_payload=NULL but no child rows. The
poller should move it out of the normal pending lane instead of letting
queue health report ordinary backlog forever.
"""
from hindsight_api.worker import WorkerPoller

bank_id = f"test-quarantine-{uuid.uuid4().hex[:8]}"
await _ensure_bank(pool, bank_id)
parent_id = uuid.uuid4()
await pool.execute(
"""
INSERT INTO async_operations (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
VALUES ($1, $2, 'batch_retain', $3::jsonb, 'pending', NULL)
""",
parent_id,
bank_id,
json.dumps({"items_count": 5, "num_sub_batches": 1, "is_parent": True}),
)

async def executor(_task_dict): # pragma: no cover - must not run
raise AssertionError("unclaimable null-payload parent should not execute")

poller = WorkerPoller(
backend=backend,
worker_id="test-worker-quarantine",
executor=executor,
slot_reservations={},
)

claimed = await poller.claim_batch()
assert claimed == []

row = await pool.fetchrow(
"SELECT status, error_message, task_payload, result_metadata, completed_at FROM async_operations WHERE operation_id = $1",
parent_id,
)
assert row["status"] == "failed"
assert row["task_payload"] is None
assert row["completed_at"] is None
assert "unclaimable" in row["error_message"]
assert "task_payload is NULL" in row["error_message"]
metadata = json.loads(row["result_metadata"]) if isinstance(row["result_metadata"], str) else row["result_metadata"]
assert metadata["quarantined"] is True
assert metadata["quarantine_reason"] == "task_payload_null"

@pytest.mark.asyncio
async def test_claim_batch_keeps_null_payload_batch_parent_with_children_pending(
self, pool, backend, clean_operations
):
"""Valid batch parents are aggregators and may have task_payload=NULL.

A parent with children is not executable itself, but it is not poison:
child completion/failure should reconcile the parent. The quarantine
guard must not fail these legitimate aggregate rows.
"""
from hindsight_api.worker import WorkerPoller

bank_id = f"test-quarantine-{uuid.uuid4().hex[:8]}"
await _ensure_bank(pool, bank_id)
parent_id = uuid.uuid4()
child_id = uuid.uuid4()
await pool.execute(
"""
INSERT INTO async_operations (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
VALUES ($1, $2, 'batch_retain', $3::jsonb, 'pending', NULL)
""",
parent_id,
bank_id,
json.dumps({"items_count": 1, "num_sub_batches": 1, "is_parent": True}),
)
await pool.execute(
"""
INSERT INTO async_operations (operation_id, bank_id, operation_type, result_metadata, status, task_payload)
VALUES ($1, $2, 'retain', $3::jsonb, 'pending', $4::jsonb)
""",
child_id,
bank_id,
json.dumps({"parent_operation_id": str(parent_id), "sub_batch_index": 1, "total_sub_batches": 1}),
json.dumps({"type": "batch_retain", "bank_id": bank_id, "contents": [{"content": "hello"}]}),
)

async def noop_executor(_task_dict):
return None

poller = WorkerPoller(
backend=backend,
worker_id="test-worker-quarantine",
executor=noop_executor,
max_slots=1,
slot_reservations={},
)

claimed = await poller.claim_batch()
assert len(claimed) == 1
assert claimed[0].operation_id == str(child_id)

parent = await pool.fetchrow(
"SELECT status, error_message FROM async_operations WHERE operation_id = $1",
parent_id,
)
assert parent["status"] == "pending"
assert parent["error_message"] is None