1
+ from mem0 import Memory
2
+ import logging
3
+ import json
4
+ import os
5
+ import time
6
+ from datetime import datetime
7
+ from typing import Dict , Any , Optional
8
+
9
+ # Configure logging
10
+ logging .basicConfig (
11
+ level = logging .INFO ,
12
+ format = '%(asctime)s - %(levelname)s - %(message)s'
13
+ )
14
+ logger = logging .getLogger (__name__ )
15
+
16
+ class MemoryManager :
17
+ def __init__ (self ):
18
+ """Initialize Mem0 Memory manager with configuration"""
19
+ # Define the complete configuration with only supported fields
20
+ config = {
21
+ "version" : "v1.1" ,
22
+ "custom_prompt" : None ,
23
+ "vector_store" : {
24
+ "provider" : "chroma" ,
25
+ "config" : {
26
+ "collection_name" : "praison" ,
27
+ "path" : os .path .abspath ("./.praison/storage" ),
28
+ "host" : None ,
29
+ "port" : None
30
+ }
31
+ },
32
+ "embedder" : {
33
+ "provider" : "openai" ,
34
+ "config" : {
35
+ "model" : "text-embedding-3-small" ,
36
+ "embedding_dims" : 1536
37
+ }
38
+ },
39
+ "llm" : {
40
+ "provider" : "openai" ,
41
+ "config" : {
42
+ "model" : "gpt-4o-mini" ,
43
+ "temperature" : 0 ,
44
+ "max_tokens" : 1000
45
+ }
46
+ }
47
+ }
48
+
49
+ # Ensure storage directory exists
50
+ storage_path = os .path .abspath ("./.praison/storage" )
51
+ if os .path .exists (storage_path ):
52
+ logger .info (f"Clearing existing storage at { storage_path } " )
53
+ import shutil
54
+ shutil .rmtree (storage_path )
55
+ os .makedirs (storage_path , exist_ok = True )
56
+ logger .info (f"Created storage path: { storage_path } " )
57
+
58
+ # Initialize memory
59
+ self .memory = Memory .from_config (config )
60
+ logger .info ("Memory manager initialized" )
61
+
62
+ def pretty_print (self , data : Any ) -> None :
63
+ """Helper function to print memory data in a readable format"""
64
+ if isinstance (data , (dict , list )):
65
+ print (json .dumps (data , indent = 2 ))
66
+ else :
67
+ print (data )
68
+
69
+ def extract_memory_id (self , result : Any ) -> Optional [str ]:
70
+ """Extract memory ID from the result, handling different formats"""
71
+ try :
72
+ if isinstance (result , dict ):
73
+ # v1.1 format
74
+ memories = result .get ('results' , [])
75
+ return memories [0 ]['id' ] if memories else None
76
+ elif isinstance (result , list ):
77
+ # v1.0 format
78
+ return result [0 ]['id' ] if result else None
79
+ return None
80
+ except (KeyError , IndexError ) as e :
81
+ logger .warning (f"Could not extract memory ID from result: { e } " )
82
+ return None
83
+
84
+ def prepare_metadata (self , metadata : Optional [Dict ]) -> Dict :
85
+ """Prepare metadata by converting values to acceptable types"""
86
+ if not metadata :
87
+ return {}
88
+
89
+ clean_metadata = {}
90
+ for key , value in metadata .items ():
91
+ if isinstance (value , (str , int , float , bool )):
92
+ clean_metadata [key ] = value
93
+ elif isinstance (value , list ):
94
+ clean_metadata [key ] = ',' .join (map (str , value ))
95
+ elif isinstance (value , dict ):
96
+ clean_metadata [key ] = json .dumps (value )
97
+ else :
98
+ clean_metadata [key ] = str (value )
99
+ return clean_metadata
100
+
101
+ def verify_memory_addition (self , user_id : str , max_retries : int = 3 ) -> bool :
102
+ """Verify that memory was added successfully"""
103
+ for attempt in range (max_retries ):
104
+ try :
105
+ time .sleep (0.5 ) # Brief pause to allow indexing
106
+ memories = self .memory .get_all (user_id = user_id )
107
+ if len (memories .get ('results' , [])) > 0 :
108
+ return True
109
+ logger .warning (f"Memory not found on attempt { attempt + 1 } " )
110
+ except Exception as e :
111
+ logger .warning (f"Verification attempt { attempt + 1 } failed: { e } " )
112
+ if attempt < max_retries - 1 :
113
+ time .sleep (1 )
114
+ return False
115
+
116
+ def add_memory (self , text : str , user_id : Optional [str ] = None , metadata : Optional [Dict ] = None ) -> Dict :
117
+ """Add a new memory with error handling and verification"""
118
+ try :
119
+ logger .info (f"Adding memory for user { user_id } " )
120
+ clean_metadata = self .prepare_metadata (metadata )
121
+ logger .debug (f"Prepared metadata: { clean_metadata } " )
122
+
123
+ messages = [{"role" : "user" , "content" : text }]
124
+ result = self .memory .add (
125
+ messages = messages ,
126
+ user_id = user_id ,
127
+ metadata = clean_metadata
128
+ )
129
+
130
+ if self .verify_memory_addition (user_id ):
131
+ logger .info ("Memory added and verified successfully" )
132
+ else :
133
+ logger .warning ("Memory addition could not be verified" )
134
+
135
+ return result
136
+
137
+ except Exception as e :
138
+ logger .error (f"Failed to add memory: { str (e )} " )
139
+ raise
140
+
141
+ def get_all_memories (self , user_id : Optional [str ] = None ) -> Dict :
142
+ """Retrieve all memories for a user"""
143
+ try :
144
+ logger .info (f"Retrieving all memories for user { user_id } " )
145
+ memories = self .memory .get_all (user_id = user_id )
146
+ count = len (memories .get ('results' , []))
147
+ logger .info (f"Retrieved { count } memories" )
148
+ return memories
149
+ except Exception as e :
150
+ logger .error (f"Failed to retrieve memories: { str (e )} " )
151
+ raise
152
+
153
+ def search_memories (self , query : str , user_id : Optional [str ] = None , limit : int = 5 ) -> Dict :
154
+ """Search memories with a query"""
155
+ try :
156
+ logger .info (f"Searching memories with query: { query } " )
157
+ results = self .memory .search (
158
+ query = query ,
159
+ user_id = user_id ,
160
+ limit = limit
161
+ )
162
+ count = len (results .get ('results' , []))
163
+ logger .info (f"Found { count } matching memories" )
164
+ return results
165
+ except Exception as e :
166
+ logger .error (f"Search failed: { str (e )} " )
167
+ raise
168
+
169
+ def main ():
170
+ try :
171
+ # Initialize memory manager
172
+ manager = MemoryManager ()
173
+
174
+ # Add a test memory
175
+ print ("\n 1. Adding a new memory:" )
176
+ metadata = {
177
+ "category" : "hobbies" ,
178
+ "timestamp" : datetime .now ().isoformat (),
179
+ "tags" : "tennis,learning" ,
180
+ "priority" : 1 ,
181
+ "source" : "user_input"
182
+ }
183
+
184
+ result = manager .add_memory (
185
+ text = "I am working on improving my tennis skills. Suggest some online courses." ,
186
+ user_id = "alice" ,
187
+ metadata = metadata
188
+ )
189
+ manager .pretty_print (result )
190
+
191
+ # Wait briefly for indexing
192
+ time .sleep (1 )
193
+
194
+ print ("\n 2. Retrieving all memories:" )
195
+ all_memories = manager .get_all_memories (user_id = "alice" )
196
+ manager .pretty_print (all_memories )
197
+
198
+ print ("\n 3. Searching memories:" )
199
+ search_results = manager .search_memories (
200
+ query = "What are Alice's hobbies?" ,
201
+ user_id = "alice" ,
202
+ limit = 5
203
+ )
204
+ manager .pretty_print (search_results )
205
+
206
+ # Print summary
207
+ print ("\n Memory Store Summary:" )
208
+ total_memories = len (all_memories .get ('results' , []))
209
+ print (f"Total memories stored: { total_memories } " )
210
+ if total_memories > 0 :
211
+ print ("Available memories:" )
212
+ for mem in all_memories ['results' ]:
213
+ print (f"- { mem .get ('memory' , 'No content' )} " )
214
+
215
+ except Exception as e :
216
+ logger .error (f"Main process failed: { str (e )} " , exc_info = True )
217
+ raise
218
+
219
+ if __name__ == "__main__" :
220
+ main ()
0 commit comments