This repository has been archived by the owner on Oct 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathtools.py
224 lines (171 loc) · 9.03 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import json
from typing import List, Optional
from phi.document import Document
from phi.tools import ToolRegistry
from phi.knowledge import AssistantKnowledge
from phi.vectordb.pgvector import PgVector2
from pdf_ai.knowledge import get_pdf_knowledge_base_for_user
from utils.log import logger
class PDFTools(ToolRegistry):
    """Tools for reading and searching the documents in one user's PDF knowledge base.

    Every tool works against the ``PgVector2`` vector database attached to the
    knowledge base returned by ``get_pdf_knowledge_base_for_user``. When that
    database is missing or is not a ``PgVector2`` instance, the tools return a
    short apology string (or ``None`` for ``get_document_names``) instead of raising.
    """

    def __init__(self, user_id: str):
        """Build the tool registry for ``user_id`` and register the public tools.

        Args:
            user_id (str): Identifier of the user whose knowledge base is queried.
        """
        super().__init__(name="pdf_tools")
        self.user_id = user_id
        self.knowledge_base: AssistantKnowledge = get_pdf_knowledge_base_for_user(user_id=user_id)

        # NOTE(review): get_document_names is not registered as a tool; the set of
        # registered tools is kept as is -- TODO confirm whether that is intended.
        self.register(self.get_latest_document_contents)
        self.register(self.search_latest_document)
        self.register(self.search_document)
        self.register(self.get_document_contents)

    def _get_vector_db(self) -> Optional[PgVector2]:
        """Return the knowledge base's vector db if it is a ``PgVector2``, else ``None``."""
        vector_db = self.knowledge_base.vector_db
        # isinstance() is False for None, so this also covers a missing vector db.
        if isinstance(vector_db, PgVector2):
            return vector_db
        return None

    def _get_latest_document_name(self, session, table) -> Optional[str]:
        """Return the ``name`` of the row with the newest ``created_at``, or ``None`` if the table is empty.

        ``session`` is an open session of the vector db and ``table`` is its table object;
        the caller owns the session so that follow-up queries can share the same transaction.
        """
        latest_query = session.query(table).order_by(table.c.created_at.desc()).limit(1)
        row = session.execute(latest_query).fetchone()
        if row is None:
            return None
        return row.name

    def get_latest_document_contents(self, limit: int = 5000) -> Optional[str]:
        """Use this function to get the content of the latest document uploaded by the user.

        Args:
            limit (int): Maximum number of characters to return. Defaults to 5000.

        Returns:
            str: Content of the latest document, truncated to `limit` characters
        """
        logger.debug(f"Getting latest document for user {self.user_id}")
        vector_db = self._get_vector_db()
        if vector_db is None:
            return "Sorry could not find latest document"

        table = vector_db.table
        with vector_db.Session() as session, session.begin():
            latest_document_name = self._get_latest_document_name(session, table)
            if latest_document_name is None:
                return "Sorry could not find latest document"
            logger.debug(f"Latest document name: {latest_document_name}")

            # A document is stored as several rows sharing one name; concatenate all of them.
            document_query = session.query(table).filter(table.c.name == latest_document_name)
            document_rows = session.execute(document_query).fetchall()
            return "".join(document_row.content for document_row in document_rows)[:limit]

    def search_latest_document(self, query: str, num_documents: int = 5) -> Optional[str]:
        """Use this function to search the latest document uploaded by the user for a query.

        Args:
            query (str): Query to search for
            num_documents (int): Number of documents to return. Defaults to 5.

        Returns:
            str: JSON string of the search results
        """
        logger.debug(f"Searching latest document for query: {query}")
        vector_db = self._get_vector_db()
        if vector_db is None:
            return "Sorry could not search latest document"

        with vector_db.Session() as session, session.begin():
            latest_document_name = self._get_latest_document_name(session, vector_db.table)
        if latest_document_name is None:
            return "Sorry could not find latest document"
        logger.debug(f"Latest document name: {latest_document_name}")

        # The search runs outside the lookup session; only the name is carried over.
        search_results: List[Document] = vector_db.search(
            query=query, limit=num_documents, filters={"name": latest_document_name}
        )
        logger.debug(f"Search result: {search_results}")
        if not search_results:
            return "Sorry could not find any results from latest document"
        return json.dumps([doc.to_dict() for doc in search_results])

    def get_document_names(self, limit: int = 20) -> Optional[str]:
        """Use this function to get the names of the documents uploaded by the user.

        Args:
            limit (int): Maximum number of documents to return. Defaults to 20.

        Returns:
            str: JSON string of the document names
        """
        logger.debug("Getting all document names")
        vector_db = self._get_vector_db()
        if vector_db is None:
            return None

        table = vector_db.table
        # The try wraps the whole session block so that a failing query propagates through
        # the transaction context manager (which rolls back) before being logged here.
        try:
            with vector_db.Session() as session, session.begin():
                names_query = session.query(table).distinct(table.c.name).limit(limit)
                rows = session.execute(names_query).fetchall()
        except Exception as e:
            logger.error(f"Error getting document names: {e}")
            return None

        # fetchall() yields an empty list (never None) when nothing matches.
        if not rows:
            return "Sorry could not find any documents"
        return json.dumps([row.name for row in rows])

    def search_document(self, query: str, document_name: str, num_documents: int = 5) -> Optional[str]:
        """Use this function to search the document with name=document_name for a query.

        Args:
            query (str): Query to search for
            document_name (str): Name of the document to search
            num_documents (int): Number of documents to return. Defaults to 5.

        Returns:
            str: JSON string of the search results
        """
        logger.debug(f"Searching document {document_name} for query: {query}")
        vector_db = self._get_vector_db()
        if vector_db is None:
            return "Sorry could not search document"

        search_results: List[Document] = vector_db.search(
            query=query, limit=num_documents, filters={"name": document_name}
        )
        logger.debug(f"Search result: {search_results}")
        if not search_results:
            return "Sorry could not find any results from document"
        return json.dumps([doc.to_dict() for doc in search_results])

    def get_document_contents(self, document_name: str, limit: int = 5000) -> Optional[str]:
        """Use this function to get the content of the document with name=document_name.

        Args:
            document_name (str): Name of the document to get contents of
            limit (int): Maximum number of characters to return. Defaults to 5000.

        Returns:
            str: Content of the document, truncated to `limit` characters
        """
        logger.debug(f"Getting contents of document {document_name}")
        vector_db = self._get_vector_db()
        if vector_db is None:
            return "Sorry could not find document"

        table = vector_db.table
        with vector_db.Session() as session, session.begin():
            document_query = (
                session.query(table).filter(table.c.name == document_name).order_by(table.c.created_at.desc())
            )
            document_rows = session.execute(document_query).fetchall()

        # An unknown name matches no rows; say so instead of returning an empty string.
        if not document_rows:
            return "Sorry could not find document"
        return "".join(document_row.content for document_row in document_rows)[:limit]
# def get_documents_with_intro_section(self) -> Optional[str]:
# """Use this function to get a quick introduction to the documents uploaded by the user.
# This function will return a dictionary of document names and their first 200 characters.
# Returns:
# str: JSON string of the document names and their first 200 characters
# """
# logger.debug("Getting document introduction")
# if self.knowledge_base.vector_db is None or not isinstance(self.knowledge_base.vector_db, PgVector2):
# return None
# vector_db: PgVector2 = self.knowledge_base.vector_db
# table = vector_db.table
# with vector_db.Session() as session, session.begin():
# try:
# query = select(table.c.name, table.c.meta_data, table.c.content).order_by(table.c.created_at)
# result = session.execute(query)
# rows = result.fetchall()
# if rows is None:
# return "Sorry could not find any documents"
# document_intro = {}
# for row in rows:
# document_name = row.name
# document_names.append(document_name)
# return document_names
# except Exception as e:
# logger.error(f"Error getting document names: {e}")
# return None