Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ Compared to stuffing full conversation context on [LOCOMO](https://github.com/sn

**Interfaces and tooling** — [Python integration](docs/examples/scenario_1_basic_usage.md); [CLI](docs/guides/0012-cli_usage.md) (`pmem`); [HTTP API / Dashboard](docs/api/0005-api_server.md); [MCP](docs/api/0004-mcp.md) (optional); [IDE apps](apps/README.md) (VS Code / Cursor, Claude Code, and more).

**Memory pipeline and retrieval** — [Smart extraction and updates](docs/examples/scenario_2_intelligent_memory.md); [Ebbinghaus-style decay](docs/examples/scenario_8_ebbinghaus_forgetting_curve.md); [Hybrid retrieval (vector / full-text / graph)](docs/examples/scenario_2_intelligent_memory.md); [Sub stores and routing](docs/examples/scenario_6_sub_stores.md).
**Memory pipeline and retrieval** — [Smart extraction and updates](docs/examples/scenario_2_intelligent_memory.md); [Ebbinghaus-style decay and access-based reinforcement](docs/examples/scenario_8_ebbinghaus_forgetting_curve.md); [Hybrid retrieval (vector / full-text / graph)](docs/examples/scenario_2_intelligent_memory.md); [Sub stores and routing](docs/examples/scenario_6_sub_stores.md).

**Profiles and multi-agent** — [User profile](docs/examples/scenario_9_user_memory.md); [Shared / isolated memory and scopes](docs/examples/scenario_3_multi_agent.md).

Expand Down
4 changes: 2 additions & 2 deletions docs/architecture/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ The `ImportanceEvaluator` uses LLM capabilities to evaluate memory importance:
The `EbbinghausAlgorithm` implements the forgetting curve theory:

- **Decay Calculation**: `R = e^(-t/S)` where R is retention, t is time, S is strength
- **Reinforcement**: Increases retention strength when memories are accessed
- **Memory Promotion**: Automatically promotes memories between layers based on retention scores
- **Reinforcement**: Accesses and reviews refresh the active review timestamp before decay is evaluated
- **Memory Promotion**: Access-triggered promotion is evaluated before forgetting so reinforced memories can move between layers
- **Forgetting Detection**: Identifies memories that should be forgotten or archived

### Memory Processor
Expand Down
23 changes: 21 additions & 2 deletions src/powermem/core/async_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,12 @@ async def _simple_add_async(
else:
enhanced_metadata = {"scope": scope}

# Keep infer=True aligned with the sync/simple add paths so lifecycle
# fields are persisted for later promotion/forgetting decisions.
extra_fields = {}
if self._intelligence_plugin and self._intelligence_plugin.enabled:
extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata)

# Final validation before storage
if not content or not content.strip():
raise ValueError(f"Refusing to store empty content. Original messages: {messages}")
Expand Down Expand Up @@ -937,6 +943,10 @@ async def _create_memory_async(
enhanced_metadata = {**enhanced_metadata, "scope": scope}
else:
enhanced_metadata = {"scope": scope}

extra_fields = {}
if self._intelligence_plugin and self._intelligence_plugin.enabled:
extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata)

# Generate content hash
content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
Expand All @@ -958,6 +968,9 @@ async def _create_memory_async(
"created_at": get_current_datetime(),
"updated_at": get_current_datetime(),
}

if extra_fields:
memory_data.update(extra_fields)

memory_id = await self.storage.add_memory_async(memory_data)

Expand Down Expand Up @@ -1111,6 +1124,9 @@ async def search(
for key in ["id", "created_at", "updated_at", "user_id", "agent_id", "run_id"]:
if key in result:
transformed_result[key] = result[key]
for key, value in result.items():
if key not in transformed_result and key not in {"memory", "metadata", "score"}:
transformed_result[key] = value
transformed_results.append(transformed_result)

# Log audit event
Expand Down Expand Up @@ -1172,8 +1188,11 @@ async def get(
updates, delete_flag = self._intelligence_plugin.on_get(result)
try:
if delete_flag:
await self.storage.delete_memory_async(memory_id, user_id, agent_id)
return None
logger.info(f"Memory {memory_id} marked as 'should_forget' by intelligence plugin")
if updates is None:
updates = {}
updates["should_forget"] = True
updates["marked_for_forgetting_at"] = get_current_datetime().isoformat()
if updates:
await self.storage.update_memory_async(memory_id, {**updates}, user_id, agent_id)
except Exception:
Expand Down
20 changes: 20 additions & 0 deletions src/powermem/core/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,12 @@ def _simple_add(
else:
enhanced_metadata = {"scope": scope}

# Preserve the same intelligent annotations as the simple add path so
# infer=True writes memories that can later be promoted/forgotten.
extra_fields = {}
if self._intelligence_plugin and self._intelligence_plugin.enabled:
extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata)

# Final validation before storage
if not content or not content.strip():
raise ValueError(f"Refusing to store empty content. Original messages: {messages}")
Expand Down Expand Up @@ -1109,6 +1115,12 @@ def _create_memory(
enhanced_metadata = {**enhanced_metadata, "scope": scope}
else:
enhanced_metadata = {"scope": scope}

# Keep infer=True aligned with the simple add path so lifecycle fields
# are persisted for later promotion/forgetting decisions.
extra_fields = {}
if self._intelligence_plugin and self._intelligence_plugin.enabled:
extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata)

# Generate content hash
content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
Expand All @@ -1130,6 +1142,9 @@ def _create_memory(
"created_at": get_current_datetime(),
"updated_at": get_current_datetime(),
}

if extra_fields:
memory_data.update(extra_fields)

memory_id = self.storage.add_memory(memory_data)

Expand Down Expand Up @@ -1296,6 +1311,11 @@ def search(
for key in ["id", "created_at", "updated_at", "user_id", "agent_id", "run_id"]:
if key in result:
transformed_result[key] = result[key]

# Preserve intelligent lifecycle fields surfaced by the storage layer.
for key, value in result.items():
if key not in transformed_result and key not in {"memory", "metadata", "score"}:
transformed_result[key] = value

# Ensure memory_id field exists (for API compatibility)
if "id" in transformed_result and "memory_id" not in transformed_result:
Expand Down
44 changes: 36 additions & 8 deletions src/powermem/intelligence/ebbinghaus_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,22 +230,20 @@ def should_forget(self, memory: Dict[str, Any]) -> bool:
True if memory should be forgotten
"""
try:
review_reference = self._get_review_reference_time(memory)

# Check decay factor
created_at = memory.get("created_at")
if created_at:
decay_factor = self.calculate_decay(created_at)
if review_reference:
decay_factor = self.calculate_decay(review_reference)
if decay_factor < self.working_threshold:
return True

# Check access frequency
access_count = memory.get("access_count", 0)
if access_count == 0:
# Check if memory is old enough to be forgotten
if created_at:
# Parse string to datetime if needed
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
time_elapsed = get_current_datetime() - created_at
if review_reference:
time_elapsed = get_current_datetime() - review_reference
if time_elapsed > timedelta(days=7):
return True

Expand Down Expand Up @@ -331,6 +329,36 @@ def _get_decay_rate_for_type(self, memory_type: str) -> float:
"long_term": self.decay_rate, # Standard decay for long-term
}
return decay_rates.get(memory_type, self.decay_rate)

def _parse_datetime(self, value: Any) -> datetime:
"""Normalize stored timestamps into datetime objects."""
if isinstance(value, datetime):
return value
if isinstance(value, str):
return datetime.fromisoformat(value.replace('Z', '+00:00'))
raise TypeError(f"Unsupported datetime value: {type(value)!r}")

def _get_review_reference_time(self, memory: Dict[str, Any]) -> Optional[datetime]:
"""
Use the most recent review/access timestamp for decay checks.

Falling back to created_at preserves compatibility with older records
that do not yet store review metadata.
"""
intelligence = memory.get("intelligence")
candidates = []
if isinstance(intelligence, dict):
candidates.append(intelligence.get("last_reviewed"))
candidates.extend([memory.get("updated_at"), memory.get("created_at")])

for candidate in candidates:
if not candidate:
continue
try:
return self._parse_datetime(candidate)
except Exception:
continue
return None

def _generate_review_schedule(self, importance_score: float, created_at: datetime) -> List[datetime]:
"""Generate review schedule based on importance and Ebbinghaus curve."""
Expand Down
65 changes: 50 additions & 15 deletions src/powermem/intelligence/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,42 +122,77 @@ def on_get(self, memory: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], bool
if not self.enabled or not self._algo:
return None, False
try:
review_time = get_current_datetime()
reviewed_access_count = (memory.get("access_count") or 0) + 1
reviewed_memory = memory.copy()

reviewed_intelligence = memory.get("intelligence")
if isinstance(reviewed_intelligence, dict):
reviewed_intelligence = reviewed_intelligence.copy()
else:
reviewed_intelligence = {}
reviewed_intelligence["last_reviewed"] = review_time.isoformat()
reviewed_intelligence["review_count"] = reviewed_intelligence.get("review_count", 0) + 1
reviewed_intelligence["access_count"] = reviewed_access_count

updates: Dict[str, Any] = {
"access_count": (memory.get("access_count") or 0) + 1,
"updated_at": get_current_datetime(),
"access_count": reviewed_access_count,
"updated_at": review_time,
"intelligence": reviewed_intelligence,
}

# Check if memory should be forgotten
if self._algo.should_forget(memory):
return None, True

# Check if memory should be promoted
if self._algo.should_promote(memory):

reviewed_memory["access_count"] = reviewed_access_count
reviewed_memory["updated_at"] = review_time
reviewed_memory["intelligence"] = reviewed_intelligence

# Access should first strengthen/potentially promote the memory
# before we consider forgetting it.
promoted = False
if self._algo.should_promote(reviewed_memory):
current = memory.get("memory_type")
if current == "working":
updates["memory_type"] = "short_term"
reviewed_memory["memory_type"] = "short_term"
promoted = True
elif current == "short_term":
updates["memory_type"] = "long_term"

reviewed_memory["memory_type"] = "long_term"
promoted = True

if not promoted and self._algo.should_forget(reviewed_memory):
return None, True

# Check if memory should be archived
if self._algo.should_archive(memory):
if self._algo.should_archive(reviewed_memory):
meta = memory.get("metadata") or {}
meta["archived"] = True
updates["metadata"] = meta

# Re-process content if memory type changed or if it's been accessed multiple times
access_count = memory.get("access_count", 0) + 1
access_count = reviewed_access_count
if (updates.get("memory_type") != memory.get("memory_type") or
access_count % 5 == 0): # Re-process every 5 accesses

original_content = memory.get("original_content") or memory.get("content", "")
original_content = (
memory.get("original_content")
or memory.get("content", "")
or memory.get("memory", "")
)
importance_score = memory.get("importance_score", 0.5)
memory_type = updates.get("memory_type") or memory.get("memory_type", "working")

# Re-process with updated parameters
intelligence_metadata = self._algo.process_memory_metadata(original_content, importance_score, memory_type)
updates.update(intelligence_metadata)
updates["last_reprocessed_at"] = get_current_datetime()
refreshed_intelligence = intelligence_metadata.get("intelligence", {})
if refreshed_intelligence:
refreshed_intelligence["last_reviewed"] = reviewed_intelligence["last_reviewed"]
refreshed_intelligence["review_count"] = reviewed_intelligence["review_count"]
refreshed_intelligence["access_count"] = reviewed_access_count
updates["intelligence"] = refreshed_intelligence
if intelligence_metadata.get("memory_management"):
updates["memory_management"] = intelligence_metadata["memory_management"]
updates["processing_applied"] = True
updates["last_reprocessed_at"] = review_time

return updates, False
except Exception as e:
Expand Down
35 changes: 34 additions & 1 deletion src/powermem/storage/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@

class StorageAdapter:
"""Adapter that bridges VectorStoreBase interface with Memory class expectations."""

_PAYLOAD_RESERVED_KEYS = {
"id",
"data",
"content",
"fulltext_content",
"sparse_embedding",
"hash",
"created_at",
"updated_at",
"metadata",
"user_id",
"agent_id",
"run_id",
"actor_id",
"role",
"category",
}

def __init__(self, vector_store: VectorStoreBase, embedding_service=None, sparse_embedder_service=None):
"""Initialize the adapter with a vector store and embedding service."""
Expand All @@ -33,6 +51,14 @@ def __init__(self, vector_store: VectorStoreBase, embedding_service=None, sparse

# Ensure collection exists (will be created with actual vector size when first vector is added)
# self.vector_store.create_col(self.collection_name, vector_size=1536, distance="cosine")

def _extract_additional_payload_fields(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Expose intelligent lifecycle fields that are stored at payload top level."""
return {
key: value
for key, value in payload.items()
if key not in self._PAYLOAD_RESERVED_KEYS and value is not None
}

def _generate_sparse_embedding(self, content: str, memory_action: str) -> Optional[Any]:
"""
Expand Down Expand Up @@ -268,6 +294,8 @@ def search_memories(
**promoted_fields, # Add promoted fields at top level
"metadata": user_metadata if user_metadata else {}, # Add user metadata
}

memory.update(self._extract_additional_payload_fields(payload))

# No need to apply filters here - filters are already applied at the database level
# in vector_store.search(), so all returned results should already match the filters
Expand Down Expand Up @@ -296,7 +324,9 @@ def get_memory(
"metadata": result.payload.get("metadata", {}),
"created_at": result.payload.get("created_at"),
"updated_at": result.payload.get("updated_at"),
"category": result.payload.get("category"),
}
memory.update(self._extract_additional_payload_fields(result.payload))

# Check access control
if user_id and memory.get("user_id") != user_id:
Expand All @@ -315,14 +345,16 @@ def get_memory(
content = result.payload.get("data") or result.payload.get("content") or ""
memory = {
"id": result.id,
"content": content,
"content": content,
"user_id": result.payload.get("user_id"),
"agent_id": result.payload.get("agent_id"),
"run_id": result.payload.get("run_id"),
"metadata": result.payload.get("metadata", {}),
"created_at": result.payload.get("created_at"),
"updated_at": result.payload.get("updated_at"),
"category": result.payload.get("category"),
}
memory.update(self._extract_additional_payload_fields(result.payload))

# Check access control
if user_id and memory.get("user_id") != user_id:
Expand Down Expand Up @@ -540,6 +572,7 @@ def get_all_memories(
"created_at": created_at,
"updated_at": updated_at,
}
memory.update(self._extract_additional_payload_fields(payload))

# Apply filters (as double-check if database didn't filter)
# Note: If filters were applied at database level, these will all pass
Expand Down
Loading
Loading