Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/api/0005-api_server.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ curl -X POST "http://localhost:8848/api/v1/memories/batch" \
**Query Parameters**:
- `user_id` (optional): Filter by user ID
- `agent_id` (optional): Filter by agent ID
- `scope` (optional): Filter by metadata scope, such as personal or group memory
- `limit` (optional, default: 100): Maximum number of results (1-1000)
- `offset` (optional, default: 0): Number of results to skip
- `sort_by` (optional): Field to sort by. Options: `created_at`, `updated_at`, `id`. If not specified, results are returned in their original order
Expand All @@ -560,6 +561,10 @@ curl -X GET "http://localhost:8848/api/v1/memories?user_id=user-123&limit=20&off
curl -X GET "http://localhost:8848/api/v1/memories?agent_id=agent-456&limit=50&offset=0" \
-H "X-API-Key: test-api-key-123"

# Filter by metadata scope
curl -X GET "http://localhost:8848/api/v1/memories?user_id=user-123&agent_id=agent-456&scope=personal&limit=20&offset=0" \
-H "X-API-Key: test-api-key-123"

# Sort by updated_at (descending - most recent first)
curl -X GET "http://localhost:8848/api/v1/memories?user_id=user-123&limit=10&sort_by=updated_at&order=desc" \
-H "X-API-Key: test-api-key-123"
Expand Down
1 change: 0 additions & 1 deletion src/powermem/core/async_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,6 @@ async def get_all(
if self.enable_graph:
filters = {**(filters or {}), "user_id": user_id, "agent_id": agent_id, "run_id": run_id}
graph_results = await asyncio.to_thread(self.graph_store.get_all, filters, limit + offset)
results.extend(graph_results)
return {"results": results, "relations": graph_results}

return {"results": results}
Expand Down
3 changes: 1 addition & 2 deletions src/powermem/core/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2180,7 +2180,6 @@ def get_all(
if self.enable_graph:
filters = {**(filters or {}), "user_id": user_id, "agent_id": agent_id, "run_id": run_id}
graph_results = self.graph_store.get_all(filters, limit + offset)
results.extend(graph_results)
return {"results": results, "relations": graph_results}

return {"results": results}
Expand Down Expand Up @@ -2209,7 +2208,7 @@ def count_all(
"""
try:
count = self.storage.count_all_memories(
user_id, agent_id, run_id
user_id, agent_id, run_id, filters=filters
)

self.audit.log_event("memory.count_all", {
Expand Down
59 changes: 40 additions & 19 deletions src/powermem/storage/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

class StorageAdapter:
"""Adapter that bridges VectorStoreBase interface with Memory class expectations."""

_SYSTEM_FILTER_KEYS = {"user_id", "agent_id", "run_id"}

def __init__(self, vector_store: VectorStoreBase, embedding_service=None, sparse_embedder_service=None):
"""Initialize the adapter with a vector store and embedding service."""
Expand Down Expand Up @@ -54,6 +56,40 @@ def _generate_sparse_embedding(self, content: str, memory_action: str) -> Option
logger.warning(f"Failed to generate sparse embedding ({memory_action}): {e}")
return None

def _metadata_filter_key_for_store(self, key: str) -> str:
"""Translate logical metadata filters to backend-specific payload paths."""
store_module = self.vector_store.__class__.__module__
if (
store_module.endswith("sqlite.sqlite_vector_store")
or ".pgvector." in store_module
):
return f"metadata.{key}"
return key

def _build_db_filters(
self,
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
run_id: Optional[str] = None,
filters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Build filters that can be executed by the vector store."""
db_filters: Dict[str, Any] = {}
if user_id:
db_filters["user_id"] = user_id
if agent_id:
db_filters["agent_id"] = agent_id
if run_id:
db_filters["run_id"] = run_id
if filters:
for key, value in filters.items():
if key in self._SYSTEM_FILTER_KEYS:
if key not in db_filters:
db_filters[key] = value
else:
db_filters[self._metadata_filter_key_for_store(key)] = value
return db_filters

def add_memory(self, memory_data: Dict[str, Any]) -> int:
"""Add a memory to the store."""
# ID will be generated using Snowflake algorithm before insertion
Expand Down Expand Up @@ -478,15 +514,8 @@ def get_all_memories(
filters: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
"""Get all memories with optional filtering and sorting."""
# Build filters for database-level filtering (only scope keys; backends use payload top-level)
db_filters: Dict[str, Any] = {}
if user_id:
db_filters["user_id"] = user_id
if agent_id:
db_filters["agent_id"] = agent_id
if run_id:
db_filters["run_id"] = run_id
# Pass only scope to DB; extra filters (e.g. metadata.name) applied in-memory below
db_filters = self._build_db_filters(user_id, agent_id, run_id, filters)

results = self.vector_store.list(
filters=db_filters if db_filters else None,
limit=limit,
Expand Down Expand Up @@ -566,7 +595,7 @@ def get_all_memories(
continue # one extra filter did not match, skip this memory

memories.append(memory)

return memories

def count_all_memories(
Expand All @@ -577,15 +606,7 @@ def count_all_memories(
filters: Optional[Dict[str, Any]] = None,
) -> int:
"""Count all memories with optional filtering."""
db_filters: Dict[str, Any] = {}
if user_id:
db_filters["user_id"] = user_id
if agent_id:
db_filters["agent_id"] = agent_id
if run_id:
db_filters["run_id"] = run_id
if filters:
db_filters.update(filters)
db_filters = self._build_db_filters(user_id, agent_id, run_id, filters)

try:
if hasattr(self.vector_store, "count"):
Expand Down
21 changes: 18 additions & 3 deletions src/powermem/storage/pgvector/pgvector.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,12 @@ def search(

if filters:
for k, v in filters.items():
filter_conditions.append("payload->>%s = %s")
if "." in k:
filter_conditions.append(
"payload #>> string_to_array(%s, '.') = %s"
)
else:
filter_conditions.append("payload->>%s = %s")
filter_params.extend([k, str(v)])

filter_clause = "WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
Expand Down Expand Up @@ -403,7 +408,12 @@ def list(

if filters:
for k, v in filters.items():
filter_conditions.append("payload->>%s = %s")
if "." in k:
filter_conditions.append(
"payload #>> string_to_array(%s, '.') = %s"
)
else:
filter_conditions.append("payload->>%s = %s")
filter_params.extend([k, str(v)])

filter_clause = "WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
Expand Down Expand Up @@ -457,7 +467,12 @@ def count(self, filters: Optional[dict] = None) -> int:

if filters:
for k, v in filters.items():
filter_conditions.append("payload->>%s = %s")
if "." in k:
filter_conditions.append(
"payload #>> string_to_array(%s, '.') = %s"
)
else:
filter_conditions.append("payload->>%s = %s")
filter_params.extend([k, str(v)])

filter_clause = "WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
Expand Down
80 changes: 44 additions & 36 deletions src/server/api/v1/memories.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ async def list_memories(
request: Request,
user_id: Optional[str] = Query(None, description="Filter by user ID"),
agent_id: Optional[str] = Query(None, description="Filter by agent ID"),
scope: Optional[str] = Query(None, description="Filter by metadata scope"),
limit: int = Query(100, ge=1, le=1000, description="Maximum number of results"),
offset: int = Query(0, ge=0, description="Number of results to skip"),
sort_by: Optional[str] = Query(None, description="Field to sort by: 'created_at', 'updated_at', 'id'"),
Expand All @@ -202,17 +203,23 @@ async def list_memories(
):
"""List memories with pagination and sorting"""
cutoff_date = parse_time_range_cutoff(time_range)
filters = {"scope": scope} if scope is not None else None

if cutoff_date is None:
# Fast path: no time filter — storage handles pagination + sorting natively
total_count = service.count_memories(user_id=user_id, agent_id=agent_id)
total_count = service.count_memories(
user_id=user_id,
agent_id=agent_id,
filters=filters,
)
memories = service.list_memories(
user_id=user_id,
agent_id=agent_id,
limit=limit,
offset=offset,
sort_by=sort_by,
order=order,
filters=filters,
)
else:
# Storage doesn't natively filter by created_at range; pull a wide page,
Expand All @@ -225,6 +232,7 @@ async def list_memories(
offset=0,
sort_by=sort_by,
order=order,
filters=filters,
)

def _created_at(m):
Expand Down Expand Up @@ -351,6 +359,41 @@ async def get_unique_users(
)


@router.get(
"/export",
summary="Export memories",
description="Export memories to JSON or CSV file",
)
@limiter.limit(get_rate_limit_string())
async def export_memories(
request: Request,
format: str = Query("json", description="Export format (json/csv)"),
user_id: Optional[str] = Query(None, description="Filter by user ID"),
agent_id: Optional[str] = Query(None, description="Filter by agent ID"),
run_id: Optional[str] = Query(None, description="Filter by run ID"),
limit: int = Query(1000, ge=1, le=10000, description="Max memories to export"),
api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service),
):
"""Export memories"""
content = service.memory.export_memories(
format=format,
user_id=user_id,
agent_id=agent_id,
run_id=run_id,
limit=limit,
)

media_type = "application/json" if format.lower() == "json" else "text/csv"
filename = f"memories_export.{format.lower()}"

return Response(
content=content,
media_type=media_type,
headers={"Content-Disposition": f"attachment; filename={filename}"},
)


@router.get(
"/{memory_id}",
response_model=APIResponse,
Expand Down Expand Up @@ -546,41 +589,6 @@ async def delete_memory(
)


@router.get(
"/export",
summary="Export memories",
description="Export memories to JSON or CSV file",
)
@limiter.limit(get_rate_limit_string())
async def export_memories(
request: Request,
format: str = Query("json", description="Export format (json/csv)"),
user_id: Optional[str] = Query(None, description="Filter by user ID"),
agent_id: Optional[str] = Query(None, description="Filter by agent ID"),
run_id: Optional[str] = Query(None, description="Filter by run ID"),
limit: int = Query(1000, ge=1, le=10000, description="Max memories to export"),
api_key: str = Depends(verify_api_key),
service: MemoryService = Depends(get_memory_service),
):
"""Export memories"""
content = service.memory.export_memories(
format=format,
user_id=user_id,
agent_id=agent_id,
run_id=run_id,
limit=limit,
)

media_type = "application/json" if format.lower() == "json" else "text/csv"
filename = f"memories_export.{format.lower()}"

return Response(
content=content,
media_type=media_type,
headers={"Content-Disposition": f"attachment; filename={filename}"},
)


@router.post(
"/import",
response_model=APIResponse,
Expand Down
8 changes: 7 additions & 1 deletion src/server/services/memory_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ def list_memories(
offset: int = 0,
sort_by: Optional[str] = None,
order: str = "desc",
filters: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
"""
List memories with pagination and sorting.
Expand All @@ -236,6 +237,7 @@ def list_memories(
offset: Number of results to skip
sort_by: Optional field to sort by: 'created_at', 'updated_at', 'id'
order: Sort order: 'desc' (descending) or 'asc' (ascending)
filters: Additional metadata filters

Returns:
List of memories
Expand All @@ -246,6 +248,7 @@ def list_memories(
agent_id=agent_id,
limit=limit,
offset=offset,
filters=filters,
sort_by=sort_by,
order=order,
)
Expand Down Expand Up @@ -276,13 +279,15 @@ def count_memories(
self,
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
filters: Optional[Dict[str, Any]] = None,
) -> int:
"""
Count total memories matching the filters.

Args:
user_id: Filter by user ID
agent_id: Filter by agent ID
filters: Additional metadata filters

Returns:
Total count of memories
Expand All @@ -292,6 +297,7 @@ def count_memories(
count = self.memory.count_all(
user_id=user_id,
agent_id=agent_id,
filters=filters,
)
return count

Expand Down Expand Up @@ -769,4 +775,4 @@ async def analyze_memory_quality(
code=ErrorCode.INTERNAL_ERROR,
message=f"Failed to analyze memory quality: {str(e)}",
status_code=500,
)
)
11 changes: 10 additions & 1 deletion src/server/services/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ def update_user_memory(
agent_id=agent_id,
metadata=metadata,
)
if result is None:
raise APIError(
code=ErrorCode.MEMORY_NOT_FOUND,
message=f"Memory not found or access denied: {memory_id}",
status_code=404,
)
if isinstance(result, dict) and "id" not in result:
result["id"] = memory_id
result["memory_id"] = memory_id

logger.info(f"User memory updated: user_id={user_id}, memory_id={memory_id}")
return result
Expand Down Expand Up @@ -429,4 +438,4 @@ def count_profiles(self, user_id: Optional[str] = None, fuzzy: bool = False) ->

except Exception as e:
logger.error(f"Failed to count profiles: {e}", exc_info=True)
return 0
return 0
Loading
Loading