Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 40 additions & 32 deletions openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,27 @@ def context_collection(name: str, vector_dim: int) -> Dict[str, Any]:
Returns:
Schema definition for the context collection
"""
return {
"CollectionName": name,
"Description": "Unified context collection",
"Fields": [
{"FieldName": "id", "FieldType": "string", "IsPrimaryKey": True},
{"FieldName": "uri", "FieldType": "path"},
# type 字段:当前版本未使用,保留用于未来扩展
# 预留用于表示资源的具体类型,如 "file", "directory", "image", "video", "repository" 等
{"FieldName": "type", "FieldType": "string"},
# context_type 字段:区分上下文的大类
# 枚举值:"resource"(资源,默认), "memory"(记忆), "skill"(技能)
# 推导规则:
# - URI 以 viking://agent/skills 开头 → "skill"
# - URI 包含 "memories" → "memory"
# - 其他情况 → "resource"
{"FieldName": "context_type", "FieldType": "string"},
{"FieldName": "vector", "FieldType": "vector", "Dim": vector_dim},
{"FieldName": "sparse_vector", "FieldType": "sparse_vector"},
{"FieldName": "created_at", "FieldType": "date_time"},
{"FieldName": "updated_at", "FieldType": "date_time"},
{"FieldName": "active_count", "FieldType": "int64"},
{"FieldName": "parent_uri", "FieldType": "path"},
fields = [
{"FieldName": "id", "FieldType": "string", "IsPrimaryKey": True},
{"FieldName": "uri", "FieldType": "path"},
# type 字段:当前版本未使用,保留用于未来扩展
# 预留用于表示资源的具体类型,如 "file", "directory", "image", "video", "repository" 等
{"FieldName": "type", "FieldType": "string"},
# context_type 字段:区分上下文的大类
# 枚举值:"resource"(资源,默认), "memory"(记忆), "skill"(技能)
# 推导规则:
# - URI 以 viking://agent/skills 开头 → "skill"
# - URI 包含 "memories" → "memory"
# - 其他情况 → "resource"
{"FieldName": "context_type", "FieldType": "string"},
{"FieldName": "vector", "FieldType": "vector", "Dim": vector_dim},
{"FieldName": "sparse_vector", "FieldType": "sparse_vector"},
{"FieldName": "created_at", "FieldType": "date_time"},
{"FieldName": "updated_at", "FieldType": "date_time"},
{"FieldName": "active_count", "FieldType": "int64"},
]
fields.extend(
[
# level 字段:区分 L0/L1/L2 层级
# 枚举值:
# - 0 = L0(abstract,摘要)
Expand All @@ -95,21 +94,30 @@ def context_collection(name: str, vector_dim: int) -> Dict[str, Any]:
{"FieldName": "abstract", "FieldType": "string"},
{"FieldName": "account_id", "FieldType": "string"},
{"FieldName": "owner_space", "FieldType": "string"},
],
"ScalarIndex": [
"uri",
"type",
"context_type",
"created_at",
"updated_at",
"active_count",
"parent_uri",
]
)
scalar_index = [
"uri",
"type",
"context_type",
"created_at",
"updated_at",
"active_count",
]
scalar_index.extend(
[
"level",
"name",
"tags",
"account_id",
"owner_space",
],
]
)
return {
"CollectionName": name,
"Description": "Unified context collection",
"Fields": fields,
"ScalarIndex": scalar_index,
}


Expand Down
11 changes: 1 addition & 10 deletions openviking/storage/vectordb/collection/volcengine_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _sanitize_uri_value(v: Any) -> Any:

@classmethod
def _sanitize_payload(cls, obj: Any) -> Any:
"""Recursively sanitize URI values in payload (including data and filter DSL), and forcefully add parent_uri if missing"""
"""Recursively sanitize URI values in payload, including filter DSL."""
# Dictionary node
if isinstance(obj, dict):
return cls._sanitize_dict_payload(obj)
Expand Down Expand Up @@ -167,8 +167,6 @@ def _sanitize_dict_payload(cls, obj: Dict[str, Any]) -> Any:
if not new_obj:
return None

# Forcefully add parent_uri: when the dictionary looks like a data record (contains uri)
cls._ensure_parent_uri(new_obj)
return new_obj

@classmethod
Expand Down Expand Up @@ -211,13 +209,6 @@ def _sanitize_dict_keys(cls, obj: Dict[str, Any]) -> Dict[str, Any]:
new_obj[k] = y
return new_obj

@classmethod
def _ensure_parent_uri(cls, obj: Dict[str, Any]) -> None:
    """Give record-like dictionaries a default ``parent_uri``.

    A dictionary is treated as a data record when it has a ``uri`` key. For
    such a dictionary, a missing or falsy ``parent_uri`` is replaced in place
    with ``"/"``. Dictionaries without ``uri`` are left untouched.

    Args:
        obj: Dictionary to update in place.
    """
    if "uri" not in obj:
        return
    # ``get`` yields None for an absent key, so a single falsiness test
    # covers both the missing-key and the empty-value cases.
    if not obj.get("parent_uri"):
        obj["parent_uri"] = "/"

@classmethod
def _sanitize_list_payload(cls, obj: List[Any]) -> List[Any]:
"""Sanitize list-type payload"""
Expand Down
4 changes: 1 addition & 3 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,7 @@ async def _update_vector_store_uris(
) -> None:
"""Update URIs in vector store (when moving files).

Preserves vector data, only updates uri and parent_uri fields, no need to regenerate embeddings.
Preserves vector data and updates URI-derived identifiers without regenerating embeddings.
"""
vector_store = self._get_vector_store()
if not vector_store:
Expand All @@ -1392,13 +1392,11 @@ async def _update_vector_store_uris(
for uri in uris:
try:
new_uri = uri.replace(old_base_uri, new_base_uri, 1)
new_parent_uri = VikingURI(new_uri).parent.uri

await vector_store.update_uri_mapping(
ctx=self._ctx_or_default(ctx),
uri=uri,
new_uri=new_uri,
new_parent_uri=new_parent_uri,
levels=levels,
)
logger.debug(f"[VikingFS] Updated URI: {uri} -> {new_uri}")
Expand Down
85 changes: 80 additions & 5 deletions openviking/storage/viking_vector_index_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,67 @@

logger = get_logger(__name__)

# Field projections passed as ``output_fields`` to the query/filter/search
# calls in this module, so each call names the fields it wants returned.

# Requested by the tenant search entry points (search_in_tenant,
# search_global_roots_in_tenant, search_children_in_tenant).
RETRIEVAL_OUTPUT_FIELDS = [
"uri",
"level",
"context_type",
"abstract",
"active_count",
"updated_at",
]

# Smallest projection; requested by remove_by_uri, _remove_descendants and
# get_context_by_uri.
LOOKUP_OUTPUT_FIELDS = [
"uri",
"level",
"active_count",
]

# Requested by search_similar_memories.
MEMORY_DEDUP_OUTPUT_FIELDS = [
"uri",
"abstract",
"context_type",
"created_at",
"updated_at",
"active_count",
"level",
"account_id",
"owner_space",
]

# Requested by fetch_by_uri; lists the scalar fields but neither vector field.
FETCH_BY_URI_OUTPUT_FIELDS = [
"uri",
"type",
"context_type",
"created_at",
"updated_at",
"active_count",
"level",
"name",
"description",
"tags",
"abstract",
"account_id",
"owner_space",
]

# Requested by update_uri_mapping, which spreads every fetched record into the
# replacement it upserts under a new id and uri. This is the only projection
# that also lists "vector" and "sparse_vector".
# NOTE(review): presumably any field missing here is dropped from the
# rewritten record -- confirm against the upsert semantics.
URI_REWRITE_OUTPUT_FIELDS = [
"uri",
"type",
"context_type",
"vector",
"sparse_vector",
"created_at",
"updated_at",
"active_count",
"level",
"name",
"description",
"tags",
"abstract",
"account_id",
"owner_space",
]


class _SingleAccountBackend:
"""绑定单个 account 的后端实现(内部类)"""
Expand Down Expand Up @@ -210,6 +271,7 @@ async def fetch_by_uri(self, uri: str) -> Optional[Dict[str, Any]]:
records = await self.query(
filter={"op": "must", "field": "uri", "conds": [uri]},
limit=2,
output_fields=FETCH_BY_URI_OUTPUT_FIELDS,
)
if len(records) == 1:
return records[0]
Expand Down Expand Up @@ -300,6 +362,7 @@ async def remove_by_uri(self, uri: str) -> int:
target_records = await self.filter(
{"op": "must", "field": "uri", "conds": [uri]},
limit=10,
output_fields=LOOKUP_OUTPUT_FIELDS,
)
if not target_records:
return 0
Expand All @@ -319,8 +382,9 @@ async def remove_by_uri(self, uri: str) -> int:
async def _remove_descendants(self, parent_uri: str) -> int:
total_deleted = 0
children = await self.filter(
{"op": "must", "field": "parent_uri", "conds": [parent_uri]},
PathScope("uri", parent_uri, depth=1),
limit=100000,
output_fields=LOOKUP_OUTPUT_FIELDS,
)
for child in children:
child_uri = child.get("uri")
Expand Down Expand Up @@ -715,6 +779,7 @@ async def search_in_tenant(
filter=scope_filter,
limit=limit,
offset=offset,
output_fields=RETRIEVAL_OUTPUT_FIELDS,
ctx=ctx,
)

Expand Down Expand Up @@ -745,6 +810,7 @@ async def search_global_roots_in_tenant(
sparse_query_vector=sparse_query_vector,
filter=merged_filter,
limit=limit,
output_fields=RETRIEVAL_OUTPUT_FIELDS,
ctx=ctx,
)

Expand Down Expand Up @@ -773,6 +839,7 @@ async def search_children_in_tenant(
sparse_query_vector=sparse_query_vector,
filter=merged_filter,
limit=limit,
output_fields=RETRIEVAL_OUTPUT_FIELDS,
ctx=ctx,
)

Expand Down Expand Up @@ -800,6 +867,7 @@ async def search_similar_memories(
query_vector=query_vector,
filter=And(conds),
limit=limit,
output_fields=MEMORY_DEDUP_OUTPUT_FIELDS,
)

async def get_context_by_uri(
Expand All @@ -818,7 +886,11 @@ async def get_context_by_uri(
conds.append(Eq("level", level))

backend = self._get_backend_for_context(ctx)
return await backend.filter(filter=And(conds), limit=limit)
return await backend.filter(
filter=And(conds),
limit=limit,
output_fields=LOOKUP_OUTPUT_FIELDS,
)

async def delete_account_data(self, account_id: str, *, ctx: RequestContext) -> int:
"""Delete all data belonging to the specified account (root role only)."""
Expand Down Expand Up @@ -848,7 +920,6 @@ async def update_uri_mapping(
ctx: RequestContext,
uri: str,
new_uri: str,
new_parent_uri: str,
levels: Optional[List[int]] = None,
) -> bool:
import hashlib
Expand All @@ -864,7 +935,12 @@ async def update_uri_mapping(
)
conds.append(Eq("owner_space", owner_space))

records = await self.filter(filter=And(conds), limit=100, ctx=ctx)
records = await self.filter(
filter=And(conds),
limit=100,
output_fields=URI_REWRITE_OUTPUT_FIELDS,
ctx=ctx,
)
if not records:
return False

Expand Down Expand Up @@ -894,7 +970,6 @@ def _seed_uri_for_id(uri: str, level: int) -> str:
**record,
"id": new_id,
"uri": new_uri,
"parent_uri": new_parent_uri,
}
if await self.upsert(updated, ctx=ctx):
success = True
Expand Down
2 changes: 0 additions & 2 deletions openviking/storage/vikingdb_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,11 @@ async def update_uri_mapping(
self,
uri: str,
new_uri: str,
new_parent_uri: str,
) -> bool:
return await self._manager.update_uri_mapping(
self._ctx,
uri=uri,
new_uri=new_uri,
new_parent_uri=new_parent_uri,
)

async def increment_active_count(self, uris: List[str]) -> int:
Expand Down
Loading
Loading