diff --git a/src/powermem/core/async_memory.py b/src/powermem/core/async_memory.py index d46d68eb..d6075a18 100644 --- a/src/powermem/core/async_memory.py +++ b/src/powermem/core/async_memory.py @@ -539,11 +539,13 @@ async def _simple_add_async( # enhanced_metadata = await self.intelligence.process_metadata_async(content, metadata) enhanced_metadata = metadata # Use original metadata without LLM evaluation - # Intelligent plugin annotations - extra_fields = {} + # Intelligent plugin annotations: merge into metadata for persistence if self._intelligence_plugin and self._intelligence_plugin.enabled: extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata) - + if extra_fields: + if enhanced_metadata is None: + enhanced_metadata = {} + enhanced_metadata = {**enhanced_metadata, **extra_fields} # Generate content hash for deduplication content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() @@ -586,9 +588,6 @@ async def _simple_add_async( "updated_at": get_current_datetime(), } - if extra_fields: - memory_data.update(extra_fields) - memory_id = await self.storage.add_memory_async(memory_data) # Log audit event diff --git a/src/powermem/core/memory.py b/src/powermem/core/memory.py index bde203c6..d925e4fd 100644 --- a/src/powermem/core/memory.py +++ b/src/powermem/core/memory.py @@ -716,11 +716,15 @@ def _simple_add( # enhanced_metadata = self.intelligence.process_metadata(content, metadata) enhanced_metadata = metadata # Use original metadata without LLM evaluation - # Intelligent plugin annotations - extra_fields = {} + # Intelligent plugin annotations: merge into metadata so they are persisted + # in the metadata JSON column (OceanBase only saves the metadata column, + # not arbitrary top-level payload fields). if self._intelligence_plugin and self._intelligence_plugin.enabled: extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata) - + if extra_fields: + if enhanced_metadata is None: + enhanced_metadata = {} + enhanced_metadata = {**enhanced_metadata, **extra_fields} # Generate content hash for deduplication content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() @@ -763,9 +767,6 @@ def _simple_add( "updated_at": get_current_datetime(), } - if extra_fields: - memory_data.update(extra_fields) - memory_id = self.storage.add_memory(memory_data) # Log audit event @@ -1094,7 +1095,15 @@ def _create_memory( # Process metadata # enhanced_metadata = self.intelligence.process_metadata(content, metadata) enhanced_metadata = metadata # Use original metadata without LLM evaluation - + + # Intelligent plugin annotations: merge into metadata for persistence + if self._intelligence_plugin and self._intelligence_plugin.enabled: + extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata) + if extra_fields: + if enhanced_metadata is None: + enhanced_metadata = {} + enhanced_metadata = {**enhanced_metadata, **extra_fields} + # Extract category from metadata; prefer explicit memory_type param category = "" if enhanced_metadata and isinstance(enhanced_metadata, dict): @@ -1109,7 +1118,7 @@ def _create_memory( enhanced_metadata = {**enhanced_metadata, "scope": scope} else: enhanced_metadata = {"scope": scope} - + # Generate content hash content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() diff --git a/src/powermem/intelligence/plugin.py b/src/powermem/intelligence/plugin.py index e526ae0b..c94f8391 100644 --- a/src/powermem/intelligence/plugin.py +++ b/src/powermem/intelligence/plugin.py @@ -122,43 +122,72 @@ def on_get(self, memory: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], bool if not self.enabled or not self._algo: return None, False try: + # Normalize: intelligence fields may be stored inside the metadata + # JSON column and not exposed at the top level of the memory dict. + meta = memory.get("metadata") or {} + memory_type = memory.get("memory_type") or meta.get("memory_type") + access_count_old = memory.get("access_count") + if access_count_old is None: + access_count_old = meta.get("access_count", 0) or 0 + importance_score = memory.get("importance_score") + if importance_score is None: + importance_score = meta.get("importance_score", 0.5) + if importance_score is None: + importance_score = 0.5 + + new_access_count = access_count_old + 1 updates: Dict[str, Any] = { - "access_count": (memory.get("access_count") or 0) + 1, + "access_count": new_access_count, "updated_at": get_current_datetime(), } - + # Track which fields need updating inside the metadata JSON column + meta_updates: Dict[str, Any] = {"access_count": new_access_count} + + # Provide normalized values to algorithm checks + normalized = { + **memory, + "memory_type": memory_type, + "access_count": access_count_old, + "importance_score": importance_score, + } + # Check if memory should be forgotten - if self._algo.should_forget(memory): + if self._algo.should_forget(normalized): return None, True - + # Check if memory should be promoted - if self._algo.should_promote(memory): - current = memory.get("memory_type") - if current == "working": + new_memory_type = memory_type + if self._algo.should_promote(normalized): + if memory_type == "working": + new_memory_type = "short_term" updates["memory_type"] = "short_term" - elif current == "short_term": + meta_updates["memory_type"] = "short_term" + elif memory_type == "short_term": + new_memory_type = "long_term" updates["memory_type"] = "long_term" - + meta_updates["memory_type"] = "long_term" + # Check if memory should be archived - if self._algo.should_archive(memory): - meta = memory.get("metadata") or {} - meta["archived"] = True - updates["metadata"] = meta - - # Re-process content if memory type changed or if it's been accessed multiple times - access_count = memory.get("access_count", 0) + 1 - if (updates.get("memory_type") != memory.get("memory_type") or - access_count % 5 == 0): # Re-process every 5 accesses - - original_content = memory.get("original_content") or memory.get("content", "") - importance_score = memory.get("importance_score", 0.5) - memory_type = updates.get("memory_type") or memory.get("memory_type", "working") - - # Re-process with updated parameters - intelligence_metadata = self._algo.process_memory_metadata(original_content, importance_score, memory_type) - updates.update(intelligence_metadata) + if self._algo.should_archive(normalized): + meta_updates["archived"] = True + + # Persist all metadata changes back to the metadata JSON column + updates["metadata"] = {**meta, **meta_updates} + + # Re-process content if memory type changed or accessed in multiples of 5 + if new_memory_type != memory_type or new_access_count % 5 == 0: + original_content = ( + memory.get("original_content") + or memory.get("content", "") + or memory.get("memory", "") + ) + intelligence_metadata = self._algo.process_memory_metadata( + original_content, importance_score, new_memory_type or "working" + ) + if "intelligence" in intelligence_metadata: + updates["metadata"]["intelligence"] = intelligence_metadata["intelligence"] updates["last_reprocessed_at"] = get_current_datetime() - + return updates, False except Exception as e: logger.warning(f"Failed to process memory in on_get: {e}")