Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion hindsight-api-slim/hindsight_api/_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import logging
import os

from sqlalchemy import text
from sqlalchemy.engine import Connection
Expand Down Expand Up @@ -34,7 +35,7 @@
"pgvector": "USING hnsw (embedding vector_cosine_ops)",
"pgvectorscale": "USING diskann (embedding vector_cosine_ops) WITH (num_neighbors = 50)",
"pg_diskann": "USING diskann (embedding vector_cosine_ops) WITH (max_neighbors = 50)",
"vchord": "USING vchordrq (embedding vector_l2_ops)",
"vchord": "USING vchordrq (embedding vector_cosine_ops)",
"scann": "USING scann (embedding cosine) WITH (mode = 'AUTO')",
}

Expand All @@ -46,6 +47,32 @@
"scann": "scann",
}

# Per-backend ANN search-time tuning GUCs. Each entry is a tuple of
# (guc_name, value) pairs the caller can apply with SET or SET LOCAL.
#
# - pgvector exposes hnsw.ef_search. The 60 / 200 pair is unchanged from the
# pre-dispatcher code (internal benchmarks tuned around our embedding count
# and recall floor; see the link_utils / pool init call sites for the
# latency-vs-recall framing).
# - vchord exposes vchordrq.probes (no default; see VectorChord issue #392)
# and vchordrq.epsilon (default 1.9). probes = 10 / 30 are starting
# defaults pending a workload-specific sweep — vchordrq's recall curve
# shape differs from HNSW's, so the pgvector numbers don't translate
# directly. Revisit with a per-cluster benchmark once we have production
# recall data; until then these are deliberately conservative on the
# high-recall path. We leave epsilon at its default; tightening it is a
# separate trade-off.
# - pgvectorscale / pg_diskann / scann do not expose an equivalent per-statement
# knob in the engine today, so the dispatcher returns no statements for them.
# Settings selected by kind="low_latency" in ann_search_tuning_settings().
# Keys are backend names; each value is a tuple of (guc_name, value) pairs.
_ANN_TUNING_LOW_LATENCY: dict[str, tuple[tuple[str, str], ...]] = {
    "pgvector": (("hnsw.ef_search", "60"),),
    "vchord": (("vchordrq.probes", "10"),),
}
# Settings selected by kind="high_recall" in ann_search_tuning_settings().
# Same shape as the low-latency table, with larger search budgets.
_ANN_TUNING_HIGH_RECALL: dict[str, tuple[tuple[str, str], ...]] = {
    "pgvector": (("hnsw.ef_search", "200"),),
    "vchord": (("vchordrq.probes", "30"),),
}

_EXTENSION_INSTALL_SQL = {
"pgvector": ("CREATE EXTENSION IF NOT EXISTS vector",),
"pgvectorscale": (
Expand All @@ -67,6 +94,18 @@
}


def configured_vector_extension() -> str:
    """Resolve the vector backend extension selected for this process.

    The name is taken from the ``HINDSIGHT_API_VECTOR_EXTENSION`` environment
    variable, falling back to ``"pgvector"`` when the variable is unset, and
    is passed through :func:`validate_extension` before being returned.

    Runtime code that dispatches on the vector backend should call this
    helper instead of reading the environment itself, so the default value
    and the lookup mechanism stay in one place.
    """
    requested = os.getenv("HINDSIGHT_API_VECTOR_EXTENSION", "pgvector")
    return validate_extension(requested)


def validate_extension(name: str) -> str:
"""Return a normalized configurable vector extension name or raise.

Expand Down Expand Up @@ -115,6 +154,25 @@ def should_defer_index_creation(ext: str, row_count: int) -> bool:
return minimum_rows > 0 and row_count < minimum_rows


def ann_search_tuning_settings(ext: str, *, kind: str) -> tuple[tuple[str, str], ...]:
    """Look up the ANN search-time GUC settings for a backend.

    Args:
        ext: Vector backend name; it is normalized before the lookup.
        kind: ``"low_latency"`` (retain-side link probing: smaller probe
            count, lower recall, lower latency) or ``"high_recall"``
            (connection init in the pool: larger probe count, higher recall).

    Returns:
        A tuple of ``(guc_name, value)`` pairs. The caller wraps each pair in
        ``SET`` or ``SET LOCAL`` itself, so the same lookup serves both
        session-scoped and transaction-scoped use. Backends with no entry in
        the selected table yield an empty tuple.

    Raises:
        ValueError: If ``kind`` is neither of the two supported values.
    """
    # Reject unknown kinds up front, before the backend name is touched.
    if kind not in ("low_latency", "high_recall"):
        raise ValueError(f"Unknown ANN tuning kind: {kind!r}")

    settings_by_backend = _ANN_TUNING_LOW_LATENCY if kind == "low_latency" else _ANN_TUNING_HIGH_RECALL
    backend = _normalize_resolved(ext)
    return settings_by_backend.get(backend, ())


def uses_per_bank_vector_indexes(ext: str) -> bool:
    """Tell whether per-bank partial vector indexes apply to this backend.

    The answer is ``True`` for every backend except ``"scann"``; the
    comparison is made on the normalized backend name.
    """
    backend = _normalize_resolved(ext)
    return backend != "scann"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _vector_index_using_clause(ext: str) -> str:
if ext == "pg_diskann":
return "USING diskann (embedding vector_cosine_ops) WITH (max_neighbors = 50)"
if ext == "vchord":
return "USING vchordrq (embedding vector_l2_ops)"
return "USING vchordrq (embedding vector_cosine_ops)"
if ext == "scann":
return "USING scann (embedding cosine) WITH (mode = 'AUTO')"
return "USING hnsw (embedding vector_cosine_ops)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _vector_index_using_clause(ext: str) -> str:
if ext == "pgvectorscale":
return "USING diskann (embedding vector_cosine_ops) WITH (num_neighbors = 50)"
if ext == "vchord":
return "USING vchordrq (embedding vector_l2_ops)"
return "USING vchordrq (embedding vector_cosine_ops)"
if ext == "scann":
return "USING scann (embedding cosine) WITH (mode = 'AUTO')"
return "USING hnsw (embedding vector_cosine_ops)"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Re-create vchord vector indexes with vector_cosine_ops

Revision ID: b8c9d0e1f2a3
Revises: 86f7a033d372
Create Date: 2026-05-20

vchordrq operator classes are bound 1:1 to operators in PostgreSQL:
vector_l2_ops only matches ``<->``, while every Hindsight ANN query uses
``<=>`` (cosine distance). The previous vchord mapping used vector_l2_ops,
so vchord deployments could never use the index — every ANN query fell
back to a sequential scan with per-row cosine computation.

This migration finds any vchordrq index built with vector_l2_ops in the
target schema and re-creates it with vector_cosine_ops, using
``CREATE INDEX CONCURRENTLY`` so it can run online. It is a no-op when:

* the configured vector extension is not vchord, or
* no matching indexes exist (already on cosine ops).

Only PostgreSQL is affected; the Oracle 23ai dialect uses its own native
vector index and does not depend on this mapping.
"""

from __future__ import annotations

import re
from collections.abc import Sequence

from alembic import context, op
from sqlalchemy import text

from hindsight_api._vector_index import configured_vector_extension
from hindsight_api.alembic._dialect import run_for_dialect

revision: str = "b8c9d0e1f2a3"
down_revision: str | Sequence[str] | None = "86f7a033d372"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def _pg_schema_prefix() -> str:
    """Build the schema qualifier used in raw PG SQL (multi-tenant search_path).

    Returns ``"<schema>".`` (double-quoted, with a trailing dot) when the
    Alembic ``target_schema`` main option holds a non-empty value, and an
    empty string otherwise so identifiers are left unqualified.
    """
    target = context.config.get_main_option("target_schema")
    if not target:
        return ""
    return f'"{target}".'


def _rebuild_vchordrq_indexes(old_ops: str, new_ops: str) -> None:
    """Rebuild vchordrq indexes using ``old_ops`` so they use ``new_ops``.

    Each matching index in the target schema is rebuilt with
    ``CREATE INDEX CONCURRENTLY`` under a temporary name; once the new index
    is confirmed valid, the old index is dropped and the new one renamed to
    take its place.

    Must be called inside an ``autocommit_block()`` because CONCURRENTLY
    cannot run inside a transaction.

    Args:
        old_ops: Operator class name to look for in existing index definitions.
        new_ops: Operator class name to substitute in the rebuilt index.

    Raises:
        RuntimeError: If a rebuilt index is missing or not marked valid after
            creation. The original index is left untouched in that case.
    """
    bind = op.get_bind()
    # `or None` collapses both unset and explicit empty-string Alembic options
    # into NULL so the COALESCE below falls back to current_schema() in either
    # case. Without it, an empty-string option would filter on `schemaname = ''`
    # and skip every real schema.
    target_schema = context.config.get_main_option("target_schema") or None
    prefix = _pg_schema_prefix()

    rows = bind.execute(
        text(
            "SELECT indexname, indexdef FROM pg_indexes "
            "WHERE schemaname = COALESCE(:target_schema, current_schema()) "
            "AND indexdef ILIKE '%vchordrq%' "
            "AND indexdef ILIKE :ops_like"
        ),
        {"target_schema": target_schema, "ops_like": f"%{old_ops}%"},
    ).fetchall()

    for idx_name, indexdef in rows:
        # ILIKE is only a coarse prefilter: it is case-insensitive and treats
        # `_` as a single-character wildcard. If the literal opclass name is
        # not present, the substitution below would be a no-op and we would
        # rebuild an identical index for nothing, so skip the row.
        if old_ops not in indexdef:
            continue

        # Keep the temporary name inside PostgreSQL's identifier limit so the
        # server never truncates it into a name we did not choose.
        temp_name = _swap_index_name(idx_name)

        # pg_get_indexdef() emits the canonical form `CREATE INDEX <name> ON …`,
        # so <name> is the first textual occurrence — the name substitution
        # below relies on that.
        new_def = indexdef.replace(old_ops, new_ops, 1)
        new_def = new_def.replace(idx_name, temp_name, 1)
        new_def = re.sub(
            r"^CREATE\s+INDEX\b",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS",
            new_def,
            count=1,
        )

        # A failed CREATE INDEX CONCURRENTLY (disk pressure, lock conflict,
        # signal) can leave a half-built index behind, marked INVALID. Without
        # this drop the CONCURRENTLY IF NOT EXISTS below would skip creation,
        # then we'd drop the original and rename the broken index into its
        # place — silently restoring the seq-scan bug this migration fixes.
        op.execute(f'DROP INDEX IF EXISTS {prefix}"{temp_name}"')
        op.execute(new_def)

        # Refuse to promote unless the catalog reports the new index as valid,
        # so we never alias an INVALID (or missing) index over a working one.
        # scalar() yields None when no row matches, which is rejected too.
        is_valid = bind.execute(
            text(
                "SELECT i.indisvalid "
                "FROM pg_class c "
                "JOIN pg_index i ON c.oid = i.indexrelid "
                "JOIN pg_namespace n ON c.relnamespace = n.oid "
                "WHERE c.relname = :name "
                "  AND n.nspname = COALESCE(:target_schema, current_schema())"
            ),
            {"name": temp_name, "target_schema": target_schema},
        ).scalar()
        if not is_valid:
            raise RuntimeError(
                f"vchordrq index rebuild produced an INVALID index ({temp_name}); "
                "drop it manually and re-run the migration."
            )

        # DROP + RENAME atomically. A crash between the two would leave
        # `temp_name` as a valid orphan and the canonical name missing —
        # next run's `pg_indexes` filter (looking for the old opclass) wouldn't
        # find anything to recover from, so the index would stay gone. The DO
        # block is sent as a single statement, so either both steps succeed
        # or both roll back.
        op.execute(
            f"""
            DO $$
            BEGIN
                DROP INDEX IF EXISTS {prefix}"{idx_name}";
                ALTER INDEX {prefix}"{temp_name}" RENAME TO "{idx_name}";
            END $$;
            """
        )


def _swap_index_name(idx_name: str, suffix: str = "__opclass_swap", max_bytes: int = 63) -> str:
    """Return ``idx_name`` plus ``suffix``, trimmed to PostgreSQL's identifier limit.

    PostgreSQL silently truncates identifiers longer than 63 bytes. With a
    naive ``idx_name + suffix`` a 63-byte index name would truncate back to
    the original name, so the "temporary" index would collide with the live
    one. Trimming the stem instead keeps the suffix intact, so the result is
    identical to ``idx_name + suffix`` for short names and stays distinct from
    ``idx_name`` for long ones.
    """
    stem_budget = max_bytes - len(suffix.encode("utf-8"))
    # Cut on bytes (the limit is in bytes) and drop any split multi-byte char.
    stem = idx_name.encode("utf-8")[:stem_budget].decode("utf-8", errors="ignore")
    return f"{stem}{suffix}"


def _pg_upgrade() -> None:
    """Rebuild vchordrq indexes from ``vector_l2_ops`` to ``vector_cosine_ops``.

    Does nothing unless the configured vector extension is ``"vchord"``.
    """
    if configured_vector_extension() == "vchord":
        # The rebuild issues CREATE INDEX CONCURRENTLY, which cannot run
        # inside a transaction, hence the autocommit block.
        with op.get_context().autocommit_block():
            _rebuild_vchordrq_indexes("vector_l2_ops", "vector_cosine_ops")


def _pg_downgrade() -> None:
    """Rebuild vchordrq indexes from ``vector_cosine_ops`` back to ``vector_l2_ops``.

    Does nothing unless the configured vector extension is ``"vchord"``.
    """
    if configured_vector_extension() == "vchord":
        # The rebuild issues CREATE INDEX CONCURRENTLY, which cannot run
        # inside a transaction, hence the autocommit block.
        with op.get_context().autocommit_block():
            _rebuild_vchordrq_indexes("vector_cosine_ops", "vector_l2_ops")


def upgrade() -> None:
    """Alembic upgrade entry point.

    Passes ``_pg_upgrade`` to ``run_for_dialect`` as the ``pg`` handler; no
    handler is supplied for any other dialect.
    """
    run_for_dialect(pg=_pg_upgrade)


def downgrade() -> None:
    """Alembic downgrade entry point.

    Passes ``_pg_downgrade`` to ``run_for_dialect`` as the ``pg`` handler; no
    handler is supplied for any other dialect.
    """
    run_for_dialect(pg=_pg_downgrade)
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _vector_index_using_clause(ext: str) -> str:
if ext == "pgvectorscale":
return "USING diskann (embedding vector_cosine_ops) WITH (num_neighbors = 50)"
if ext == "vchord":
return "USING vchordrq (embedding vector_l2_ops)"
return "USING vchordrq (embedding vector_cosine_ops)"
if ext == "scann":
return "USING scann (embedding cosine) WITH (mode = 'AUTO')"
return "USING hnsw (embedding vector_cosine_ops)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _vector_index_using_clause(ext: str) -> str:
if ext == "pg_diskann":
return "USING diskann (embedding vector_cosine_ops) WITH (max_neighbors = 50)"
if ext == "vchord":
return "USING vchordrq (embedding vector_l2_ops)"
return "USING vchordrq (embedding vector_cosine_ops)"
if ext == "scann":
return "USING scann (embedding cosine) WITH (mode = 'AUTO')"
return "USING hnsw (embedding vector_cosine_ops)"
Expand Down
23 changes: 16 additions & 7 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import httpx
import tiktoken

from .._vector_index import ann_search_tuning_settings, configured_vector_extension
from ..config import (
DEFAULT_RECALL_CHUNKS_MAX_TOKENS,
DEFAULT_RECALL_INCLUDE_CHUNKS,
Expand Down Expand Up @@ -2047,13 +2048,21 @@ async def verify_llm():

# Per-connection initialization callback (PostgreSQL-specific for now)
async def _init_connection(conn: asyncpg.Connection) -> None:
# SET (not SET LOCAL) so it persists for the connection lifetime.
# ef_search=200 improves HNSW recall quality for the per-fact_type
# semantic queries in retrieve_semantic_bm25_combined().
try:
await conn.execute("SET hnsw.ef_search = 200")
except Exception:
logger.debug("Could not set hnsw.ef_search — extension may not support it")
# SET (not SET LOCAL) so per-backend ANN tuning persists for the
# connection lifetime. Each backend exposes its own GUC: pgvector
# uses hnsw.ef_search, vchord uses vchordrq.probes. The dispatcher
# returns the right one for the configured extension, tuned for
# the higher recall the per-fact_type semantic queries in
# retrieve_semantic_bm25_combined() need.
for guc, value in ann_search_tuning_settings(configured_vector_extension(), kind="high_recall"):
try:
await conn.execute(f"SET {guc} = {value}")
except asyncpg.exceptions.PostgresError:
# Defensive net for env mis-config (e.g. extension configured
# for vchord but the cluster only has pgvector). Narrow to
# PostgresError so genuine bugs in the pool/conn layer surface
# instead of being silently logged at debug level.
logger.debug("Could not set %s — extension may not support it", guc)

# Server-side safety net for runaway queries. Migrations use a
# separate SQLAlchemy/psycopg2 engine, so long-running DDL is
Expand Down
19 changes: 11 additions & 8 deletions hindsight-api-slim/hindsight_api/engine/retain/link_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import UTC, datetime, timedelta
from uuid import UUID

from ..._vector_index import ann_search_tuning_settings, configured_vector_extension
from ..memory_engine import fq_table
from .types import EntityLink

Expand Down Expand Up @@ -701,16 +702,18 @@ async def compute_semantic_links_ann(
# `relation "_ann_seeds" does not exist` on the second statement.
#
# Using ON COMMIT DROP + SET LOCAL also means we don't have to remember to
# manually drop the temp table or reset hnsw.ef_search — the transaction
# end handles both.
# manually drop the temp table or reset the per-backend ANN tuning GUC —
# the transaction end handles both.
rows: list = []
async with conn.transaction():
# Transaction-local ef_search. Default 400 is tuned for recall precision
# but at 164k units each HNSW probe takes 94ms. ef_search=60 gives 2.7ms
# per probe (35x faster) with sufficient accuracy for top-50 semantic
# link creation. SET LOCAL auto-reverts at commit, so we don't pollute
# the pool for subsequent recall queries.
await conn.execute("SET LOCAL hnsw.ef_search = 60")
# Transaction-local ANN tuning. Each supported backend exposes its own
# GUC (hnsw.ef_search on pgvector, vchordrq.probes on vchord); the
# dispatcher returns the right knob for the configured backend with a
# value tuned for top-50 semantic link creation (lower recall but much
# lower latency than the recall-side default). SET LOCAL auto-reverts
# at commit, so we don't pollute the pool for subsequent queries.
for guc, value in ann_search_tuning_settings(configured_vector_extension(), kind="low_latency"):
await conn.execute(f"SET LOCAL {guc} = {value}")

t_setup = time_mod.time()
await conn.execute("CREATE TEMP TABLE _ann_seeds (unit_id text, emb_text text, fact_type text) ON COMMIT DROP")
Expand Down
25 changes: 15 additions & 10 deletions hindsight-api-slim/tests/test_link_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,16 @@ async def test_no_manual_drop_or_truncate(self, mock_conn):
)

@pytest.mark.asyncio
async def test_uses_set_local_for_ef_search(self, mock_conn):
"""hnsw.ef_search must be set with SET LOCAL so the change is scoped
to the transaction. Without SET LOCAL, the setting would leak onto
the pooled backend and affect subsequent recall queries that land
on the same backend."""
@pytest.mark.parametrize(
("ext", "guc"),
[("pgvector", "hnsw.ef_search"), ("vchord", "vchordrq.probes")],
)
async def test_uses_set_local_for_ann_tuning(self, mock_conn, monkeypatch, ext, guc):
"""The per-backend ANN tuning GUC must be set with SET LOCAL so the
change is scoped to the transaction. Without SET LOCAL, the setting
would leak onto the pooled backend and affect subsequent recall
queries that land on the same backend."""
monkeypatch.setenv("HINDSIGHT_API_VECTOR_EXTENSION", ext)
emb = [0.1] * 384
await compute_semantic_links_ann(
conn=mock_conn,
Expand All @@ -524,11 +529,11 @@ async def test_uses_set_local_for_ef_search(self, mock_conn):
)

executed_sql = [call.args[0] for call in mock_conn.execute.call_args_list]
ef_statements = [s for s in executed_sql if "hnsw.ef_search" in s]
assert ef_statements, "ef_search must be tuned down for retain ANN"
for stmt in ef_statements:
tuning_statements = [s for s in executed_sql if guc in s]
assert tuning_statements, f"{guc} must be tuned for retain ANN under ext={ext}"
for stmt in tuning_statements:
assert stmt.strip().startswith("SET LOCAL"), (
f"hnsw.ef_search must use SET LOCAL, got: {stmt}"
f"{guc} must use SET LOCAL, got: {stmt}"
)
# And there must not be a RESET — SET LOCAL handles it at commit.
assert not any("RESET hnsw.ef_search" in s for s in executed_sql)
assert not any(f"RESET {guc}" in s for s in executed_sql)
Loading