1+ import json
2+ import numpy as np
3+ import faiss
4+ from collections import deque
5+ try :
6+ from .utils import get_timestamp , get_embedding , normalize_vector , ensure_directory_exists
7+ except ImportError :
8+ from utils import get_timestamp , get_embedding , normalize_vector , ensure_directory_exists
9+
class LongTermMemory:
    """Capacity-bounded long-term memory with embedding-based retrieval.

    Stores per-user profile strings plus two bounded knowledge stores
    (general/user knowledge and assistant-specific knowledge). Each
    knowledge entry carries a normalized embedding so it can be retrieved
    via a FAISS inner-product (cosine) search. All state is persisted to
    a single JSON file at ``file_path``.
    """

    # Inputs (after strip().lower()) treated as "nothing to save".
    _EMPTY_KNOWLEDGE_MARKERS = {"", "none", "- none", "- none."}

    def __init__(self, file_path, knowledge_capacity=100,
                 embedding_model_name: str = "all-MiniLM-L6-v2",
                 embedding_model_kwargs: dict = None):
        """Create the store and immediately load any persisted state.

        Args:
            file_path: JSON file used by save()/load().
            knowledge_capacity: max entries kept per knowledge deque.
            embedding_model_name: model passed to get_embedding().
            embedding_model_kwargs: extra kwargs for get_embedding();
                None means "no extras" (avoids a mutable default).
        """
        self.file_path = file_path
        ensure_directory_exists(self.file_path)
        self.knowledge_capacity = knowledge_capacity
        # {user_id: {"data": profile_string, "last_updated": timestamp}}
        self.user_profiles = {}
        # deque(maxlen=...) silently evicts the oldest entry once full.
        self.knowledge_base = deque(maxlen=self.knowledge_capacity)       # general/user private knowledge
        self.assistant_knowledge = deque(maxlen=self.knowledge_capacity)  # assistant-specific knowledge

        self.embedding_model_name = embedding_model_name
        self.embedding_model_kwargs = embedding_model_kwargs if embedding_model_kwargs is not None else {}
        self.load()

    def update_user_profile(self, user_id, new_data, merge=True):
        """Store or merge profile data for ``user_id`` and persist.

        With merge=True and an existing non-empty string profile, the new
        data is appended under a timestamped separator; in every other
        case (merge=False, no prior data, or non-string data) the profile
        is simply overwritten.
        """
        current_data = self.user_profiles.get(user_id, {}).get("data") if merge else None
        if current_data and isinstance(current_data, str) and isinstance(new_data, str):
            updated_data = f"{current_data}\n\n--- Updated on {get_timestamp()} ---\n{new_data}"
        else:
            # Overwrite: merge disabled, nothing stored yet, or the data
            # types are too complex for a simple string concatenation.
            updated_data = new_data

        self.user_profiles[user_id] = {
            "data": updated_data,
            "last_updated": get_timestamp()
        }
        print(f"LongTermMemory: Updated user profile for {user_id} (merge={merge}).")
        self.save()

    def get_raw_user_profile(self, user_id):
        """Return the stored profile string, or the literal "None" string
        when no profile exists (callers rely on this sentinel)."""
        return self.user_profiles.get(user_id, {}).get("data", "None")

    def get_user_profile_data(self, user_id):
        """Return the full profile record (data + last_updated), or {}."""
        return self.user_profiles.get(user_id, {})

    def add_knowledge_entry(self, knowledge_text, knowledge_deque: deque, type_name="knowledge"):
        """Embed ``knowledge_text`` and append it to ``knowledge_deque``.

        Empty/placeholder inputs are ignored. When the deque is at
        capacity, appending automatically evicts the oldest entry.
        Persists the whole store after a successful add.
        """
        if not knowledge_text or knowledge_text.strip().lower() in self._EMPTY_KNOWLEDGE_MARKERS:
            print(f"LongTermMemory: Empty {type_name} received, not saving.")
            return

        vec = get_embedding(
            knowledge_text,
            model_name=self.embedding_model_name,
            **self.embedding_model_kwargs
        )
        # Normalize so FAISS inner product equals cosine similarity;
        # stored as a plain list so the entry is JSON-serializable.
        vec = normalize_vector(vec).tolist()
        entry = {
            "knowledge": knowledge_text,
            "timestamp": get_timestamp(),
            "knowledge_embedding": vec
        }
        knowledge_deque.append(entry)
        print(f"LongTermMemory: Added {type_name}. Current count: {len(knowledge_deque)}.")
        self.save()

    def add_user_knowledge(self, knowledge_text):
        """Append an entry to the general/user knowledge store."""
        self.add_knowledge_entry(knowledge_text, self.knowledge_base, "user knowledge")

    def add_assistant_knowledge(self, knowledge_text):
        """Append an entry to the assistant-specific knowledge store."""
        self.add_knowledge_entry(knowledge_text, self.assistant_knowledge, "assistant knowledge")

    def get_user_knowledge(self):
        """Return all user-knowledge entries as a list (oldest first)."""
        return list(self.knowledge_base)

    def get_assistant_knowledge(self):
        """Return all assistant-knowledge entries as a list (oldest first)."""
        return list(self.assistant_knowledge)

    def _search_knowledge_deque(self, query, knowledge_deque: deque, threshold=0.1, top_k=5):
        """Return up to ``top_k`` entries similar to ``query``, best first.

        Embeds the query, builds an in-memory FAISS inner-product index
        over the stored (normalized) embeddings, and keeps only matches
        with similarity >= ``threshold``.
        """
        if not knowledge_deque:
            return []

        query_vec = get_embedding(
            query,
            model_name=self.embedding_model_name,
            **self.embedding_model_kwargs
        )
        query_vec = normalize_vector(query_vec)

        # Keep only entries that actually carry an embedding.
        embeddings = []
        valid_entries = []
        for entry in knowledge_deque:
            if "knowledge_embedding" in entry and entry["knowledge_embedding"]:
                embeddings.append(np.array(entry["knowledge_embedding"], dtype=np.float32))
                valid_entries.append(entry)
            else:
                print(f"Warning: Entry without embedding found in knowledge_deque: {entry.get('knowledge','N/A')[:50]}")

        if not embeddings:
            return []

        embeddings_np = np.array(embeddings, dtype=np.float32)
        if embeddings_np.ndim == 1:  # defensive: a single flat vector
            if embeddings_np.shape[0] == 0:
                return []
            embeddings_np = embeddings_np.reshape(1, -1)

        if embeddings_np.shape[0] == 0:
            return []

        dim = embeddings_np.shape[1]
        # Vectors are normalized, so inner product == cosine similarity.
        index = faiss.IndexFlatIP(dim)
        index.add(embeddings_np)

        query_arr = np.array([query_vec], dtype=np.float32)
        distances, indices = index.search(query_arr, min(top_k, len(valid_entries)))

        # FIX: reuse the FAISS scores for the final ordering instead of
        # re-running np.dot on every result (the scores ARE those dot
        # products for IndexFlatIP with normalized vectors).
        scored = []
        for rank, idx in enumerate(indices[0]):
            if idx == -1:  # faiss pads missing results with -1
                continue
            similarity = float(distances[0][rank])
            if similarity >= threshold:
                scored.append((similarity, valid_entries[idx]))

        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in scored]

    def search_user_knowledge(self, query, threshold=0.1, top_k=5):
        """Similarity-search the general/user knowledge store."""
        results = self._search_knowledge_deque(query, self.knowledge_base, threshold, top_k)
        print(f"LongTermMemory: Searched user knowledge for '{query[:30]}...'. Found {len(results)} matches.")
        return results

    def search_assistant_knowledge(self, query, threshold=0.1, top_k=5):
        """Similarity-search the assistant knowledge store."""
        results = self._search_knowledge_deque(query, self.assistant_knowledge, threshold, top_k)
        print(f"LongTermMemory: Searched assistant knowledge for '{query[:30]}...'. Found {len(results)} matches.")
        return results

    def save(self):
        """Write profiles and both knowledge stores to ``file_path`` as JSON.

        I/O failures are reported but not raised — persistence is
        best-effort so a disk problem never crashes the caller.
        """
        data = {
            "user_profiles": self.user_profiles,
            # Deques are not JSON-serializable; dump them as lists.
            "knowledge_base": list(self.knowledge_base),
            "assistant_knowledge": list(self.assistant_knowledge)
        }
        try:
            with open(self.file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except IOError as e:
            print(f"Error saving LongTermMemory to {self.file_path}: {e}")

    def load(self):
        """Load persisted state from ``file_path``, if present.

        Missing or corrupt files are tolerated: the memory simply starts
        empty. The broad final except is a deliberate best-effort
        boundary since load() runs during construction.
        """
        try:
            with open(self.file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.user_profiles = data.get("user_profiles", {})
            # Re-wrap as deques so the capacity limit keeps applying.
            kb_data = data.get("knowledge_base", [])
            self.knowledge_base = deque(kb_data, maxlen=self.knowledge_capacity)

            ak_data = data.get("assistant_knowledge", [])
            self.assistant_knowledge = deque(ak_data, maxlen=self.knowledge_capacity)

            print(f"LongTermMemory: Loaded from {self.file_path}.")
        except FileNotFoundError:
            print(f"LongTermMemory: No history file found at {self.file_path}. Initializing new memory.")
        except json.JSONDecodeError:
            print(f"LongTermMemory: Error decoding JSON from {self.file_path}. Initializing new memory.")
        except Exception as e:
            print(f"LongTermMemory: An unexpected error occurred during load from {self.file_path}: {e}. Initializing new memory.")
0 commit comments