diff --git a/README.md b/README.md index 9aaa428c..880d0b99 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ Compared to stuffing full conversation context on [LOCOMO](https://github.com/sn **Interfaces and tooling** — [Python integration](docs/examples/scenario_1_basic_usage.md); [CLI](docs/guides/0012-cli_usage.md) (`pmem`); [HTTP API / Dashboard](docs/api/0005-api_server.md); [MCP](docs/api/0004-mcp.md) (optional); [IDE apps](apps/README.md) (VS Code / Cursor, Claude Code, and more). -**Memory pipeline and retrieval** — [Smart extraction and updates](docs/examples/scenario_2_intelligent_memory.md); [Ebbinghaus-style decay](docs/examples/scenario_8_ebbinghaus_forgetting_curve.md); [Hybrid retrieval (vector / full-text / graph)](docs/examples/scenario_2_intelligent_memory.md); [Sub stores and routing](docs/examples/scenario_6_sub_stores.md). +**Memory pipeline and retrieval** — [Smart extraction and updates](docs/examples/scenario_2_intelligent_memory.md); [Ebbinghaus-style decay and access-based reinforcement](docs/examples/scenario_8_ebbinghaus_forgetting_curve.md); [Hybrid retrieval (vector / full-text / graph)](docs/examples/scenario_2_intelligent_memory.md); [Sub stores and routing](docs/examples/scenario_6_sub_stores.md). **Profiles and multi-agent** — [User profile](docs/examples/scenario_9_user_memory.md); [Shared / isolated memory and scopes](docs/examples/scenario_3_multi_agent.md). diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md index 55396e15..c5ce0a81 100644 --- a/docs/architecture/overview.md +++ b/docs/architecture/overview.md @@ -262,8 +262,8 @@ The `ImportanceEvaluator` uses LLM capabilities to evaluate memory importance: The `EbbinghausAlgorithm` implements the forgetting curve theory: - **Decay Calculation**: `R = e^(-t/S)` where R is retention, t is time, S is strength -- **Reinforcement**: Increases retention strength when memories are accessed -- **Memory Promotion**: Automatically promotes memories between layers based on retention scores +- **Reinforcement**: Accesses and reviews refresh the active review timestamp before decay is evaluated +- **Memory Promotion**: Access-triggered promotion is evaluated before forgetting so reinforced memories can move between layers - **Forgetting Detection**: Identifies memories that should be forgotten or archived ### Memory Processor diff --git a/src/powermem/core/async_memory.py b/src/powermem/core/async_memory.py index d46d68eb..131c8bfe 100644 --- a/src/powermem/core/async_memory.py +++ b/src/powermem/core/async_memory.py @@ -564,6 +564,12 @@ async def _simple_add_async( else: enhanced_metadata = {"scope": scope} + # Keep infer=True aligned with the sync/simple add paths so lifecycle + # fields are persisted for later promotion/forgetting decisions. + extra_fields = {} + if self._intelligence_plugin and self._intelligence_plugin.enabled: + extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata) + # Final validation before storage if not content or not content.strip(): raise ValueError(f"Refusing to store empty content. Original messages: {messages}") @@ -937,6 +943,10 @@ async def _create_memory_async( enhanced_metadata = {**enhanced_metadata, "scope": scope} else: enhanced_metadata = {"scope": scope} + + extra_fields = {} + if self._intelligence_plugin and self._intelligence_plugin.enabled: + extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata) # Generate content hash content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() @@ -958,6 +968,9 @@ async def _create_memory_async( "created_at": get_current_datetime(), "updated_at": get_current_datetime(), } + + if extra_fields: + memory_data.update(extra_fields) memory_id = await self.storage.add_memory_async(memory_data) @@ -1111,6 +1124,9 @@ async def search( for key in ["id", "created_at", "updated_at", "user_id", "agent_id", "run_id"]: if key in result: transformed_result[key] = result[key] + for key, value in result.items(): + if key not in transformed_result and key not in {"memory", "metadata", "score"}: + transformed_result[key] = value transformed_results.append(transformed_result) # Log audit event @@ -1172,8 +1188,11 @@ async def get( updates, delete_flag = self._intelligence_plugin.on_get(result) try: if delete_flag: - await self.storage.delete_memory_async(memory_id, user_id, agent_id) - return None + logger.info(f"Memory {memory_id} marked as 'should_forget' by intelligence plugin") + if updates is None: + updates = {} + updates["should_forget"] = True + updates["marked_for_forgetting_at"] = get_current_datetime().isoformat() if updates: await self.storage.update_memory_async(memory_id, {**updates}, user_id, agent_id) except Exception: diff --git a/src/powermem/core/memory.py b/src/powermem/core/memory.py index bde203c6..6860250f 100644 --- a/src/powermem/core/memory.py +++ b/src/powermem/core/memory.py @@ -741,6 +741,12 @@ def _simple_add( else: enhanced_metadata = {"scope": scope} + # Preserve the same intelligent annotations as the simple add path so + # infer=True writes memories that can later be promoted/forgotten. + extra_fields = {} + if self._intelligence_plugin and self._intelligence_plugin.enabled: + extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata) + # Final validation before storage if not content or not content.strip(): raise ValueError(f"Refusing to store empty content. Original messages: {messages}") @@ -1109,6 +1115,12 @@ def _create_memory( enhanced_metadata = {**enhanced_metadata, "scope": scope} else: enhanced_metadata = {"scope": scope} + + # Keep infer=True aligned with the simple add path so lifecycle fields + # are persisted for later promotion/forgetting decisions. + extra_fields = {} + if self._intelligence_plugin and self._intelligence_plugin.enabled: + extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata) # Generate content hash content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() @@ -1130,6 +1142,9 @@ def _create_memory( "created_at": get_current_datetime(), "updated_at": get_current_datetime(), } + + if extra_fields: + memory_data.update(extra_fields) memory_id = self.storage.add_memory(memory_data) @@ -1296,6 +1311,11 @@ def search( for key in ["id", "created_at", "updated_at", "user_id", "agent_id", "run_id"]: if key in result: transformed_result[key] = result[key] + + # Preserve intelligent lifecycle fields surfaced by the storage layer. + for key, value in result.items(): + if key not in transformed_result and key not in {"memory", "metadata", "score"}: + transformed_result[key] = value # Ensure memory_id field exists (for API compatibility) if "id" in transformed_result and "memory_id" not in transformed_result: diff --git a/src/powermem/intelligence/ebbinghaus_algorithm.py b/src/powermem/intelligence/ebbinghaus_algorithm.py index b4648c07..93721627 100644 --- a/src/powermem/intelligence/ebbinghaus_algorithm.py +++ b/src/powermem/intelligence/ebbinghaus_algorithm.py @@ -230,10 +230,11 @@ def should_forget(self, memory: Dict[str, Any]) -> bool: True if memory should be forgotten """ try: + review_reference = self._get_review_reference_time(memory) + # Check decay factor - created_at = memory.get("created_at") - if created_at: - decay_factor = self.calculate_decay(created_at) + if review_reference: + decay_factor = self.calculate_decay(review_reference) if decay_factor < self.working_threshold: return True @@ -241,11 +242,8 @@ def should_forget(self, memory: Dict[str, Any]) -> bool: access_count = memory.get("access_count", 0) if access_count == 0: # Check if memory is old enough to be forgotten - if created_at: - # Parse string to datetime if needed - if isinstance(created_at, str): - created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) - time_elapsed = get_current_datetime() - created_at + if review_reference: + time_elapsed = get_current_datetime() - review_reference if time_elapsed > timedelta(days=7): return True @@ -331,6 +329,36 @@ def _get_decay_rate_for_type(self, memory_type: str) -> float: "long_term": self.decay_rate, # Standard decay for long-term } return decay_rates.get(memory_type, self.decay_rate) + + def _parse_datetime(self, value: Any) -> datetime: + """Normalize stored timestamps into datetime objects.""" + if isinstance(value, datetime): + return value + if isinstance(value, str): + return datetime.fromisoformat(value.replace('Z', '+00:00')) + raise TypeError(f"Unsupported datetime value: {type(value)!r}") + + def _get_review_reference_time(self, memory: Dict[str, Any]) -> Optional[datetime]: + """ + Use the most recent review/access timestamp for decay checks. + + Falling back to created_at preserves compatibility with older records + that do not yet store review metadata. + """ + intelligence = memory.get("intelligence") + candidates = [] + if isinstance(intelligence, dict): + candidates.append(intelligence.get("last_reviewed")) + candidates.extend([memory.get("updated_at"), memory.get("created_at")]) + + for candidate in candidates: + if not candidate: + continue + try: + return self._parse_datetime(candidate) + except Exception: + continue + return None def _generate_review_schedule(self, importance_score: float, created_at: datetime) -> List[datetime]: """Generate review schedule based on importance and Ebbinghaus curve.""" diff --git a/src/powermem/intelligence/plugin.py b/src/powermem/intelligence/plugin.py index e526ae0b..d58cc98b 100644 --- a/src/powermem/intelligence/plugin.py +++ b/src/powermem/intelligence/plugin.py @@ -122,42 +122,77 @@ def on_get(self, memory: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], bool if not self.enabled or not self._algo: return None, False try: + review_time = get_current_datetime() + reviewed_access_count = (memory.get("access_count") or 0) + 1 + reviewed_memory = memory.copy() + + reviewed_intelligence = memory.get("intelligence") + if isinstance(reviewed_intelligence, dict): + reviewed_intelligence = reviewed_intelligence.copy() + else: + reviewed_intelligence = {} + reviewed_intelligence["last_reviewed"] = review_time.isoformat() + reviewed_intelligence["review_count"] = reviewed_intelligence.get("review_count", 0) + 1 + reviewed_intelligence["access_count"] = reviewed_access_count + updates: Dict[str, Any] = { - "access_count": (memory.get("access_count") or 0) + 1, - "updated_at": get_current_datetime(), + "access_count": reviewed_access_count, + "updated_at": review_time, + "intelligence": reviewed_intelligence, } - - # Check if memory should be forgotten - if self._algo.should_forget(memory): - return None, True - - # Check if memory should be promoted - if self._algo.should_promote(memory): + + reviewed_memory["access_count"] = reviewed_access_count + reviewed_memory["updated_at"] = review_time + reviewed_memory["intelligence"] = reviewed_intelligence + + # Access should first strengthen/potentially promote the memory + # before we consider forgetting it. + promoted = False + if self._algo.should_promote(reviewed_memory): current = memory.get("memory_type") if current == "working": updates["memory_type"] = "short_term" + reviewed_memory["memory_type"] = "short_term" + promoted = True elif current == "short_term": updates["memory_type"] = "long_term" - + reviewed_memory["memory_type"] = "long_term" + promoted = True + + if not promoted and self._algo.should_forget(reviewed_memory): + return None, True + # Check if memory should be archived - if self._algo.should_archive(memory): + if self._algo.should_archive(reviewed_memory): meta = memory.get("metadata") or {} meta["archived"] = True updates["metadata"] = meta # Re-process content if memory type changed or if it's been accessed multiple times - access_count = memory.get("access_count", 0) + 1 + access_count = reviewed_access_count if (updates.get("memory_type") != memory.get("memory_type") or access_count % 5 == 0): # Re-process every 5 accesses - original_content = memory.get("original_content") or memory.get("content", "") + original_content = ( + memory.get("original_content") + or memory.get("content", "") + or memory.get("memory", "") + ) importance_score = memory.get("importance_score", 0.5) memory_type = updates.get("memory_type") or memory.get("memory_type", "working") # Re-process with updated parameters intelligence_metadata = self._algo.process_memory_metadata(original_content, importance_score, memory_type) - updates.update(intelligence_metadata) - updates["last_reprocessed_at"] = get_current_datetime() + refreshed_intelligence = intelligence_metadata.get("intelligence", {}) + if refreshed_intelligence: + refreshed_intelligence["last_reviewed"] = reviewed_intelligence["last_reviewed"] + refreshed_intelligence["review_count"] = reviewed_intelligence["review_count"] + refreshed_intelligence["access_count"] = reviewed_access_count + updates["intelligence"] = refreshed_intelligence + if intelligence_metadata.get("memory_management"): + updates["memory_management"] = intelligence_metadata["memory_management"] + updates["processing_applied"] = True + updates["last_reprocessed_at"] = review_time return updates, False except Exception as e: diff --git a/src/powermem/storage/adapter.py b/src/powermem/storage/adapter.py index 23e3efb3..5baeeb5e 100644 --- a/src/powermem/storage/adapter.py +++ b/src/powermem/storage/adapter.py @@ -18,6 +18,24 @@ class StorageAdapter: """Adapter that bridges VectorStoreBase interface with Memory class expectations.""" + + _PAYLOAD_RESERVED_KEYS = { + "id", + "data", + "content", + "fulltext_content", + "sparse_embedding", + "hash", + "created_at", + "updated_at", + "metadata", + "user_id", + "agent_id", + "run_id", + "actor_id", + "role", + "category", + } def __init__(self, vector_store: VectorStoreBase, embedding_service=None, sparse_embedder_service=None): """Initialize the adapter with a vector store and embedding service.""" @@ -33,6 +51,14 @@ def __init__(self, vector_store: VectorStoreBase, embedding_service=None, sparse # Ensure collection exists (will be created with actual vector size when first vector is added) # self.vector_store.create_col(self.collection_name, vector_size=1536, distance="cosine") + + def _extract_additional_payload_fields(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """Expose intelligent lifecycle fields that are stored at payload top level.""" + return { + key: value + for key, value in payload.items() + if key not in self._PAYLOAD_RESERVED_KEYS and value is not None + } def _generate_sparse_embedding(self, content: str, memory_action: str) -> Optional[Any]: """ @@ -268,6 +294,8 @@ def search_memories( **promoted_fields, # Add promoted fields at top level "metadata": user_metadata if user_metadata else {}, # Add user metadata } + + memory.update(self._extract_additional_payload_fields(payload)) # No need to apply filters here - filters are already applied at the database level # in vector_store.search(), so all returned results should already match the filters @@ -296,7 +324,9 @@ def get_memory( "metadata": result.payload.get("metadata", {}), "created_at": result.payload.get("created_at"), "updated_at": result.payload.get("updated_at"), + "category": result.payload.get("category"), } + memory.update(self._extract_additional_payload_fields(result.payload)) # Check access control if user_id and memory.get("user_id") != user_id: @@ -315,14 +345,16 @@ def get_memory( content = result.payload.get("data") or result.payload.get("content") or "" memory = { "id": result.id, - "content": content, + "content": content, "user_id": result.payload.get("user_id"), "agent_id": result.payload.get("agent_id"), "run_id": result.payload.get("run_id"), "metadata": result.payload.get("metadata", {}), "created_at": result.payload.get("created_at"), "updated_at": result.payload.get("updated_at"), + "category": result.payload.get("category"), } + memory.update(self._extract_additional_payload_fields(result.payload)) # Check access control if user_id and memory.get("user_id") != user_id: @@ -540,6 +572,7 @@ def get_all_memories( "created_at": created_at, "updated_at": updated_at, } + memory.update(self._extract_additional_payload_fields(payload)) # Apply filters (as double-check if database didn't filter) # Note: If filters were applied at database level, these will all pass diff --git a/tests/integration/test_storage_integration.py b/tests/integration/test_storage_integration.py index 499f5f20..8961c0e0 100644 --- a/tests/integration/test_storage_integration.py +++ b/tests/integration/test_storage_integration.py @@ -6,8 +6,9 @@ import pytest import uuid +import asyncio from unittest.mock import MagicMock, patch -from powermem import Memory +from powermem import AsyncMemory, Memory from powermem.storage.sqlite.sqlite_vector_store import SQLiteVectorStore @@ -50,6 +51,43 @@ def sqlite_memory(self): yield memory finally: patcher.stop() + + @pytest.fixture + def sqlite_intelligent_memory(self): + """Create a Memory instance with intelligent memory enabled.""" + config = { + "vector_store": { + "provider": "sqlite", + "config": { + "database_path": ":memory:", + "collection_name": f"test_collection_intelligent_{uuid.uuid4().hex[:8]}" + } + }, + "llm": { + "provider": "openai", + "config": { + "model": "gpt-4o-mini", + "api_key": "mock-key" + } + }, + "embedder": { + "provider": "mock", + "config": {} + }, + "intelligent_memory": { + "enabled": True + } + } + + patcher = patch('powermem.integrations.llm.factory.LLMFactory.create') + mock_llm_factory = patcher.start() + mock_llm_factory.return_value = MagicMock() + + try: + memory = Memory(config=config) + yield memory + finally: + patcher.stop() def test_sqlite_storage_initialization(self, sqlite_memory): """Test SQLite storage initialization.""" @@ -88,6 +126,94 @@ def test_sqlite_search_functionality(self, sqlite_memory): assert "results" in results assert len(results["results"]) > 0 + def test_intelligent_add_persists_and_exposes_lifecycle_fields(self, sqlite_intelligent_memory): + """infer=True adds should store and expose lifecycle fields.""" + memory = sqlite_intelligent_memory + user_id = "test_user_intelligent" + + with patch.object(memory, "_extract_facts", return_value=["Remember that Alice likes coffee"]), \ + patch.object( + memory, + "_decide_memory_actions", + return_value=[{"event": "ADD", "text": "Remember that Alice likes coffee"}], + ): + add_result = memory.add("Remember that Alice likes coffee", user_id=user_id) + + memory_id = add_result["results"][0]["id"] + retrieved = memory.get(memory_id, user_id=user_id) + assert retrieved is not None + assert "memory_type" in retrieved + assert "importance_score" in retrieved + assert "intelligence" in retrieved + assert "access_count" in retrieved + + listed = memory.get_all(user_id=user_id) + assert listed["results"] + assert "memory_type" in listed["results"][0] + assert "importance_score" in listed["results"][0] + + searched = memory.search("coffee", user_id=user_id) + assert searched["results"] + assert "memory_type" in searched["results"][0] + assert "importance_score" in searched["results"][0] + + def test_async_intelligent_add_persists_lifecycle_fields(self): + """Async infer=True adds should also persist lifecycle fields.""" + config = { + "vector_store": { + "provider": "sqlite", + "config": { + "database_path": ":memory:", + "collection_name": f"test_collection_async_intelligent_{uuid.uuid4().hex[:8]}" + } + }, + "llm": { + "provider": "openai", + "config": { + "model": "gpt-4o-mini", + "api_key": "mock-key" + } + }, + "embedder": { + "provider": "mock", + "config": {} + }, + "intelligent_memory": { + "enabled": True + } + } + + patcher = patch('powermem.integrations.llm.factory.LLMFactory.create') + mock_llm_factory = patcher.start() + mock_llm_factory.return_value = MagicMock() + + try: + async def run_test(): + memory = AsyncMemory(config=config) + + async def fake_extract(_messages): + return ["Remember that Bob likes tea"] + + async def fake_decide(_facts, _existing_memories, _user_id, _agent_id): + return [{"event": "ADD", "text": "Remember that Bob likes tea"}] + + memory._extract_facts = fake_extract + memory._decide_memory_actions = fake_decide + + add_result = await memory.add("Remember that Bob likes tea", user_id="async_user") + memory_id = add_result["results"][0]["id"] + retrieved = await memory.get(memory_id, user_id="async_user") + + assert retrieved is not None + assert "memory_type" in retrieved + assert "importance_score" in retrieved + assert "intelligence" in retrieved + assert "access_count" in retrieved + + asyncio.run(run_test()) + finally: + patcher.stop() + def test_sqlite_delete_functionality(self, sqlite_memory): """Test SQLite delete functionality.""" memory = sqlite_memory diff --git a/tests/unit/intelligence/test_ebbinghaus_plugin.py b/tests/unit/intelligence/test_ebbinghaus_plugin.py new file mode 100644 index 00000000..4c3c7741 --- /dev/null +++ b/tests/unit/intelligence/test_ebbinghaus_plugin.py @@ -0,0 +1,59 @@ +from datetime import timedelta + +from powermem.intelligence.ebbinghaus_algorithm import EbbinghausAlgorithm +from powermem.intelligence.plugin import EbbinghausIntelligencePlugin +from powermem.utils.utils import get_current_datetime + + +def _plugin_config() -> dict: + return { + "enabled": True, + "decay_rate": 0.1, + "working_threshold": 0.3, + "short_term_threshold": 0.6, + "long_term_threshold": 0.8, + } + + +def test_should_forget_uses_last_reviewed_before_created_at(): + algo = EbbinghausAlgorithm(_plugin_config()) + now = get_current_datetime() + stale_created_at = (now - timedelta(days=30)).isoformat() + recent_review = (now - timedelta(minutes=5)).isoformat() + + memory = { + "created_at": stale_created_at, + "updated_at": recent_review, + "intelligence": {"last_reviewed": recent_review}, + "access_count": 0, + } + + assert algo.should_forget(memory) is False + + +def test_on_get_promotes_recently_accessed_memory_before_forgetting(): + plugin = EbbinghausIntelligencePlugin(_plugin_config()) + now = get_current_datetime() + + memory = { + "content": "Remember this launch checklist", + "created_at": (now - timedelta(days=30)).isoformat(), + "updated_at": (now - timedelta(days=2)).isoformat(), + "memory_type": "working", + "importance_score": 0.1, + "access_count": 2, + "intelligence": { + "last_reviewed": (now - timedelta(minutes=5)).isoformat(), + "review_count": 2, + "access_count": 2, + }, + "metadata": {}, + } + + updates, delete_flag = plugin.on_get(memory) + + assert delete_flag is False + assert updates is not None + assert updates["memory_type"] == "short_term" + assert updates["access_count"] == 3 + assert updates["intelligence"]["review_count"] == 3