diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index f37703a78..d41918b70 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -57,28 +57,27 @@ def context_collection(name: str, vector_dim: int) -> Dict[str, Any]: Returns: Schema definition for the context collection """ - return { - "CollectionName": name, - "Description": "Unified context collection", - "Fields": [ - {"FieldName": "id", "FieldType": "string", "IsPrimaryKey": True}, - {"FieldName": "uri", "FieldType": "path"}, - # type 字段:当前版本未使用,保留用于未来扩展 - # 预留用于表示资源的具体类型,如 "file", "directory", "image", "video", "repository" 等 - {"FieldName": "type", "FieldType": "string"}, - # context_type 字段:区分上下文的大类 - # 枚举值:"resource"(资源,默认), "memory"(记忆), "skill"(技能) - # 推导规则: - # - URI 以 viking://agent/skills 开头 → "skill" - # - URI 包含 "memories" → "memory" - # - 其他情况 → "resource" - {"FieldName": "context_type", "FieldType": "string"}, - {"FieldName": "vector", "FieldType": "vector", "Dim": vector_dim}, - {"FieldName": "sparse_vector", "FieldType": "sparse_vector"}, - {"FieldName": "created_at", "FieldType": "date_time"}, - {"FieldName": "updated_at", "FieldType": "date_time"}, - {"FieldName": "active_count", "FieldType": "int64"}, - {"FieldName": "parent_uri", "FieldType": "path"}, + fields = [ + {"FieldName": "id", "FieldType": "string", "IsPrimaryKey": True}, + {"FieldName": "uri", "FieldType": "path"}, + # type 字段:当前版本未使用,保留用于未来扩展 + # 预留用于表示资源的具体类型,如 "file", "directory", "image", "video", "repository" 等 + {"FieldName": "type", "FieldType": "string"}, + # context_type 字段:区分上下文的大类 + # 枚举值:"resource"(资源,默认), "memory"(记忆), "skill"(技能) + # 推导规则: + # - URI 以 viking://agent/skills 开头 → "skill" + # - URI 包含 "memories" → "memory" + # - 其他情况 → "resource" + {"FieldName": "context_type", "FieldType": "string"}, + {"FieldName": "vector", "FieldType": "vector", "Dim": vector_dim}, + {"FieldName": "sparse_vector", "FieldType": "sparse_vector"}, + {"FieldName": "created_at", "FieldType": "date_time"}, + {"FieldName": "updated_at", "FieldType": "date_time"}, + {"FieldName": "active_count", "FieldType": "int64"}, + ] + fields.extend( + [ # level 字段:区分 L0/L1/L2 层级 # 枚举值: # - 0 = L0(abstract,摘要) @@ -95,21 +94,30 @@ def context_collection(name: str, vector_dim: int) -> Dict[str, Any]: {"FieldName": "abstract", "FieldType": "string"}, {"FieldName": "account_id", "FieldType": "string"}, {"FieldName": "owner_space", "FieldType": "string"}, - ], - "ScalarIndex": [ - "uri", - "type", - "context_type", - "created_at", - "updated_at", - "active_count", - "parent_uri", + ] + ) + scalar_index = [ + "uri", + "type", + "context_type", + "created_at", + "updated_at", + "active_count", + ] + scalar_index.extend( + [ "level", "name", "tags", "account_id", "owner_space", - ], + ] + ) + return { + "CollectionName": name, + "Description": "Unified context collection", + "Fields": fields, + "ScalarIndex": scalar_index, } diff --git a/openviking/storage/vectordb/collection/volcengine_collection.py b/openviking/storage/vectordb/collection/volcengine_collection.py index 18cfa86cb..30845b15d 100644 --- a/openviking/storage/vectordb/collection/volcengine_collection.py +++ b/openviking/storage/vectordb/collection/volcengine_collection.py @@ -132,7 +132,7 @@ def _sanitize_uri_value(v: Any) -> Any: @classmethod def _sanitize_payload(cls, obj: Any) -> Any: - """Recursively sanitize URI values in payload (including data and filter DSL), and forcefully add parent_uri if missing""" + """Recursively sanitize URI values in payload, including filter DSL.""" # Dictionary node if isinstance(obj, dict): return cls._sanitize_dict_payload(obj) @@ -167,8 +167,6 @@ def _sanitize_dict_payload(cls, obj: Dict[str, Any]) -> Any: if not new_obj: return None - # Forcefully add parent_uri: when the dictionary looks like a data record (contains uri) - cls._ensure_parent_uri(new_obj) return new_obj @classmethod @@ -211,13 +209,6 @@ def _sanitize_dict_keys(cls, obj: Dict[str, Any]) -> Dict[str, Any]: new_obj[k] = y return new_obj - @classmethod - def _ensure_parent_uri(cls, obj: Dict[str, Any]) -> None: - """Forcefully add parent_uri: when the dictionary looks like a data record (contains uri)""" - if "uri" in obj: - if "parent_uri" not in obj or not obj.get("parent_uri"): - obj["parent_uri"] = "/" - @classmethod def _sanitize_list_payload(cls, obj: List[Any]) -> List[Any]: """Sanitize list-type payload""" diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index 98c505b94..0bf15ea51 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -1380,7 +1380,7 @@ async def _update_vector_store_uris( ) -> None: """Update URIs in vector store (when moving files). - Preserves vector data, only updates uri and parent_uri fields, no need to regenerate embeddings. + Preserves vector data and updates URI-derived identifiers without regenerating embeddings. """ vector_store = self._get_vector_store() if not vector_store: @@ -1392,13 +1392,11 @@ async def _update_vector_store_uris( for uri in uris: try: new_uri = uri.replace(old_base_uri, new_base_uri, 1) - new_parent_uri = VikingURI(new_uri).parent.uri await vector_store.update_uri_mapping( ctx=self._ctx_or_default(ctx), uri=uri, new_uri=new_uri, - new_parent_uri=new_parent_uri, levels=levels, ) logger.debug(f"[VikingFS] Updated URI: {uri} -> {new_uri}") diff --git a/openviking/storage/viking_vector_index_backend.py b/openviking/storage/viking_vector_index_backend.py index 43925ac4d..6be6dd1c5 100644 --- a/openviking/storage/viking_vector_index_backend.py +++ b/openviking/storage/viking_vector_index_backend.py @@ -17,6 +17,67 @@ logger = get_logger(__name__) +RETRIEVAL_OUTPUT_FIELDS = [ + "uri", + "level", + "context_type", + "abstract", + "active_count", + "updated_at", +] + +LOOKUP_OUTPUT_FIELDS = [ + "uri", + "level", + "active_count", +] + +MEMORY_DEDUP_OUTPUT_FIELDS = [ + "uri", + "abstract", + "context_type", + "created_at", + "updated_at", + "active_count", + "level", + "account_id", + "owner_space", +] + +FETCH_BY_URI_OUTPUT_FIELDS = [ + "uri", + "type", + "context_type", + "created_at", + "updated_at", + "active_count", + "level", + "name", + "description", + "tags", + "abstract", + "account_id", + "owner_space", +] + +URI_REWRITE_OUTPUT_FIELDS = [ + "uri", + "type", + "context_type", + "vector", + "sparse_vector", + "created_at", + "updated_at", + "active_count", + "level", + "name", + "description", + "tags", + "abstract", + "account_id", + "owner_space", +] + class _SingleAccountBackend: """绑定单个 account 的后端实现(内部类)""" @@ -210,6 +271,7 @@ async def fetch_by_uri(self, uri: str) -> Optional[Dict[str, Any]]: records = await self.query( filter={"op": "must", "field": "uri", "conds": [uri]}, limit=2, + output_fields=FETCH_BY_URI_OUTPUT_FIELDS, ) if len(records) == 1: return records[0] @@ -300,6 +362,7 @@ async def remove_by_uri(self, uri: str) -> int: target_records = await self.filter( {"op": "must", "field": "uri", "conds": [uri]}, limit=10, + output_fields=LOOKUP_OUTPUT_FIELDS, ) if not target_records: return 0 @@ -319,8 +382,9 @@ async def remove_by_uri(self, uri: str) -> int: async def _remove_descendants(self, parent_uri: str) -> int: total_deleted = 0 children = await self.filter( - {"op": "must", "field": "parent_uri", "conds": [parent_uri]}, + PathScope("uri", parent_uri, depth=1), limit=100000, + output_fields=LOOKUP_OUTPUT_FIELDS, ) for child in children: child_uri = child.get("uri") @@ -715,6 +779,7 @@ async def search_in_tenant( filter=scope_filter, limit=limit, offset=offset, + output_fields=RETRIEVAL_OUTPUT_FIELDS, ctx=ctx, ) @@ -745,6 +810,7 @@ async def search_global_roots_in_tenant( sparse_query_vector=sparse_query_vector, filter=merged_filter, limit=limit, + output_fields=RETRIEVAL_OUTPUT_FIELDS, ctx=ctx, ) @@ -773,6 +839,7 @@ async def search_children_in_tenant( sparse_query_vector=sparse_query_vector, filter=merged_filter, limit=limit, + output_fields=RETRIEVAL_OUTPUT_FIELDS, ctx=ctx, ) @@ -800,6 +867,7 @@ async def search_similar_memories( query_vector=query_vector, filter=And(conds), limit=limit, + output_fields=MEMORY_DEDUP_OUTPUT_FIELDS, ) async def get_context_by_uri( @@ -818,7 +886,11 @@ async def get_context_by_uri( conds.append(Eq("level", level)) backend = self._get_backend_for_context(ctx) - return await backend.filter(filter=And(conds), limit=limit) + return await backend.filter( + filter=And(conds), + limit=limit, + output_fields=LOOKUP_OUTPUT_FIELDS, + ) async def delete_account_data(self, account_id: str, *, ctx: RequestContext) -> int: """删除指定 account 的所有数据(仅限,root 角色操作)""" @@ -848,7 +920,6 @@ async def update_uri_mapping( ctx: RequestContext, uri: str, new_uri: str, - new_parent_uri: str, levels: Optional[List[int]] = None, ) -> bool: import hashlib @@ -864,7 +935,12 @@ async def update_uri_mapping( ) conds.append(Eq("owner_space", owner_space)) - records = await self.filter(filter=And(conds), limit=100, ctx=ctx) + records = await self.filter( + filter=And(conds), + limit=100, + output_fields=URI_REWRITE_OUTPUT_FIELDS, + ctx=ctx, + ) if not records: return False @@ -894,7 +970,6 @@ def _seed_uri_for_id(uri: str, level: int) -> str: **record, "id": new_id, "uri": new_uri, - "parent_uri": new_parent_uri, } if await self.upsert(updated, ctx=ctx): success = True diff --git a/openviking/storage/vikingdb_manager.py b/openviking/storage/vikingdb_manager.py index f332ab32d..1777b380b 100644 --- a/openviking/storage/vikingdb_manager.py +++ b/openviking/storage/vikingdb_manager.py @@ -500,13 +500,11 @@ async def update_uri_mapping( self, uri: str, new_uri: str, - new_parent_uri: str, ) -> bool: return await self._manager.update_uri_mapping( self._ctx, uri=uri, new_uri=new_uri, - new_parent_uri=new_parent_uri, ) async def increment_active_count(self, uris: List[str]) -> int: diff --git a/tests/storage/test_collection_schemas.py b/tests/storage/test_collection_schemas.py index c25dbbda6..da5efa7a2 100644 --- a/tests/storage/test_collection_schemas.py +++ b/tests/storage/test_collection_schemas.py @@ -1,13 +1,18 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. # SPDX-License-Identifier: Apache-2.0 +import inspect import json from types import SimpleNamespace import pytest from openviking.models.embedder.base import EmbedResult -from openviking.storage.collection_schemas import TextEmbeddingHandler +from openviking.storage.collection_schemas import ( + CollectionSchemas, + TextEmbeddingHandler, + init_context_collection, +) from openviking.storage.queuefs.embedding_msg import EmbeddingMsg @@ -21,8 +26,8 @@ def embed(self, text: str) -> EmbedResult: class _DummyConfig: - def __init__(self, embedder: _DummyEmbedder): - self.storage = SimpleNamespace(vectordb=SimpleNamespace(name="context")) + def __init__(self, embedder: _DummyEmbedder, backend: str = "volcengine"): + self.storage = SimpleNamespace(vectordb=SimpleNamespace(name="context", backend=backend)) self.embedding = SimpleNamespace( dimension=2, get_embedder=lambda: embedder, @@ -104,3 +109,97 @@ async def upsert(self, _data, *, ctx): assert embedder.calls == 1 assert status["success"] == 1 assert status["error"] == 0 + + +@pytest.mark.asyncio +async def test_embedding_handler_preserves_parent_uri_for_backend_upsert_logic(monkeypatch): + captured = {} + + class _CapturingVikingDB: + is_closing = False + mode = "local" + + async def upsert(self, data, *, ctx): + captured["data"] = dict(data) + return "rec-1" + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder), + ) + + handler = TextEmbeddingHandler(_CapturingVikingDB()) + payload = _build_queue_payload() + queue_data = json.loads(payload["data"]) + queue_data["context_data"]["parent_uri"] = "viking://resources" + payload["data"] = json.dumps(queue_data) + + result = await handler.on_dequeue(payload) + + assert result is not None + assert "data" in captured + assert captured["data"]["parent_uri"] == "viking://resources" + + +def test_context_collection_excludes_parent_uri(): + schema = CollectionSchemas.context_collection("ctx", 8) + + field_names = [field["FieldName"] for field in schema["Fields"]] + + assert "parent_uri" not in field_names + assert "parent_uri" not in schema["ScalarIndex"] + + +def test_context_collection_signature_has_no_include_parent_uri(): + signature = inspect.signature(CollectionSchemas.context_collection) + + assert "include_parent_uri" not in signature.parameters + + +@pytest.mark.asyncio +async def test_init_context_collection_uses_backend_specific_schema(monkeypatch): + captured = {} + + class _Storage: + async def create_collection(self, name, schema): + captured["name"] = name + captured["schema"] = schema + return True + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder, backend="volcengine"), + ) + + created = await init_context_collection(_Storage()) + + assert created is True + field_names = [field["FieldName"] for field in captured["schema"]["Fields"]] + assert "parent_uri" not in field_names + assert "parent_uri" not in captured["schema"]["ScalarIndex"] + + +@pytest.mark.asyncio +async def test_init_context_collection_excludes_parent_uri_for_local_backend(monkeypatch): + captured = {} + + class _Storage: + async def create_collection(self, name, schema): + captured["name"] = name + captured["schema"] = schema + return True + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder, backend="local"), + ) + + created = await init_context_collection(_Storage()) + + assert created is True + field_names = [field["FieldName"] for field in captured["schema"]["Fields"]] + assert "parent_uri" not in field_names + assert "parent_uri" not in captured["schema"]["ScalarIndex"] diff --git a/tests/storage/test_semantic_processor_mv_vector_store.py b/tests/storage/test_semantic_processor_mv_vector_store.py index 44e1c0698..a7bab1c68 100644 --- a/tests/storage/test_semantic_processor_mv_vector_store.py +++ b/tests/storage/test_semantic_processor_mv_vector_store.py @@ -21,7 +21,6 @@ async def update_uri_mapping( ctx: RequestContext, uri: str, new_uri: str, - new_parent_uri: str, levels: Optional[List[int]] = None, ) -> bool: def seed_uri_for_id(target_uri: str, level: int) -> str: @@ -58,7 +57,7 @@ def seed_uri_for_id(target_uri: str, level: int) -> str: new_record = dict(record) new_record["id"] = new_id new_record["uri"] = new_uri - new_record["parent_uri"] = new_parent_uri + new_record.pop("parent_uri", None) self.records.append(new_record) touched = True @@ -132,9 +131,27 @@ async def test_mv_vector_store_moves_records(monkeypatch): store = _FakeVectorStore( [ - {"id": "l0", "uri": old_uri, "level": 0, "account_id": ctx.account_id, "owner_space": ""}, - {"id": "l1", "uri": old_uri, "level": 1, "account_id": ctx.account_id, "owner_space": ""}, - {"id": "l2", "uri": old_uri, "level": 2, "account_id": ctx.account_id, "owner_space": ""}, + { + "id": "l0", + "uri": old_uri, + "level": 0, + "account_id": ctx.account_id, + "owner_space": "", + }, + { + "id": "l1", + "uri": old_uri, + "level": 1, + "account_id": ctx.account_id, + "owner_space": "", + }, + { + "id": "l2", + "uri": old_uri, + "level": 2, + "account_id": ctx.account_id, + "owner_space": "", + }, { "id": "child-l0", "uri": f"{old_uri}/x", @@ -175,6 +192,7 @@ def _ensure_access(self, uri, ctx): assert {r["id"] for r in store.records if r.get("uri") == old_uri} == {"l2"} assert {r["id"] for r in store.records if r.get("uri") == f"{old_uri}/x"} == {"child-l0"} assert {int(r["level"]) for r in store.records if r.get("uri") == new_uri} == {0, 1} + assert all("parent_uri" not in r for r in store.records if r.get("uri") == new_uri) assert set(store.deleted_ids) == {"l0", "l1"}