
Commit 82d621c

Merge branch 'main' into feature/websocket-streaming-api

2 parents: 70200e4 + ad03ede


54 files changed (+1014, −557011 lines)

Dockerfile

Lines changed: 1 addition & 2 deletions
@@ -10,11 +10,10 @@ WORKDIR /ragflow
 # Copy models downloaded via download_deps.py
 RUN mkdir -p /ragflow/rag/res/deepdoc /root/.ragflow
 RUN --mount=type=bind,from=infiniflow/ragflow_deps:latest,source=/huggingface.co,target=/huggingface.co \
-    cp /huggingface.co/InfiniFlow/huqie/huqie.txt.trie /ragflow/rag/res/ && \
     tar --exclude='.*' -cf - \
         /huggingface.co/InfiniFlow/text_concat_xgb_v1.0 \
         /huggingface.co/InfiniFlow/deepdoc \
-        | tar -xf - --strip-components=3 -C /ragflow/rag/res/deepdoc
+    | tar -xf - --strip-components=3 -C /ragflow/rag/res/deepdoc

 # https://github.com/chrismattmann/tika-python
 # This is the only way to run python-tika without internet access. Without this set, the default is to check the tika version and pull latest every time from Apache.

agent/canvas.py

Lines changed: 0 additions & 3 deletions
@@ -91,9 +91,6 @@ def __init__(self, dsl: str, tenant_id=None, task_id=None):
     def load(self):
         self.components = self.dsl["components"]
         cpn_nms = set([])
-        for k, cpn in self.components.items():
-            cpn_nms.add(cpn["obj"]["component_name"])
-
         for k, cpn in self.components.items():
             cpn_nms.add(cpn["obj"]["component_name"])
             param = component_class(cpn["obj"]["component_name"] + "Param")()

agent/component/agent_with_tools.py

Lines changed: 47 additions & 192 deletions
Large diffs are not rendered by default.

agent/component/llm.py

Lines changed: 27 additions & 33 deletions
@@ -327,7 +327,7 @@ def delta(txt):
             self.set_output("content", answer)

     @timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60)))
-    def _invoke(self, **kwargs):
+    async def _invoke_async(self, **kwargs):
         if self.check_if_canceled("LLM processing"):
             return

@@ -338,22 +338,25 @@ def clean_formated_answer(ans: str) -> str:

         prompt, msg, _ = self._prepare_prompt_variables()
         error: str = ""
-        output_structure=None
+        output_structure = None
         try:
-            output_structure = self._param.outputs['structured']
+            output_structure = self._param.outputs["structured"]
         except Exception:
             pass
         if output_structure and isinstance(output_structure, dict) and output_structure.get("properties") and len(output_structure["properties"]) > 0:
-            schema=json.dumps(output_structure, ensure_ascii=False, indent=2)
-            prompt += structured_output_prompt(schema)
-            for _ in range(self._param.max_retries+1):
+            schema = json.dumps(output_structure, ensure_ascii=False, indent=2)
+            prompt_with_schema = prompt + structured_output_prompt(schema)
+            for _ in range(self._param.max_retries + 1):
                 if self.check_if_canceled("LLM processing"):
                     return

-                _, msg = message_fit_in([{"role": "system", "content": prompt}, *msg], int(self.chat_mdl.max_length * 0.97))
+                _, msg_fit = message_fit_in(
+                    [{"role": "system", "content": prompt_with_schema}, *deepcopy(msg)],
+                    int(self.chat_mdl.max_length * 0.97),
+                )
                 error = ""
-                ans = self._generate(msg)
-                msg.pop(0)
+                ans = await self._generate_async(msg_fit)
+                msg_fit.pop(0)
                 if ans.find("**ERROR**") >= 0:
                     logging.error(f"LLM response error: {ans}")
                     error = ans
@@ -362,26 +365,31 @@ def clean_formated_answer(ans: str) -> str:
                     self.set_output("structured", json_repair.loads(clean_formated_answer(ans)))
                     return
                 except Exception:
-                    msg.append({"role": "user", "content": "The answer can't not be parsed as JSON"})
+                    msg_fit.append({"role": "user", "content": "The answer can't not be parsed as JSON"})
                     error = "The answer can't not be parsed as JSON"
             if error:
                 self.set_output("_ERROR", error)
             return

         downstreams = self._canvas.get_component(self._id)["downstream"] if self._canvas.get_component(self._id) else []
         ex = self.exception_handler()
-        if any([self._canvas.get_component_obj(cid).component_name.lower()=="message" for cid in downstreams]) and not (ex and ex["goto"]):
-            self.set_output("content", partial(self._stream_output_async, prompt, msg))
+        if any([self._canvas.get_component_obj(cid).component_name.lower() == "message" for cid in downstreams]) and not (
+            ex and ex["goto"]
+        ):
+            self.set_output("content", partial(self._stream_output_async, prompt, deepcopy(msg)))
             return

-        for _ in range(self._param.max_retries+1):
+        error = ""
+        for _ in range(self._param.max_retries + 1):
             if self.check_if_canceled("LLM processing"):
                 return

-            _, msg = message_fit_in([{"role": "system", "content": prompt}, *msg], int(self.chat_mdl.max_length * 0.97))
+            _, msg_fit = message_fit_in(
+                [{"role": "system", "content": prompt}, *deepcopy(msg)], int(self.chat_mdl.max_length * 0.97)
+            )
             error = ""
-            ans = self._generate(msg)
-            msg.pop(0)
+            ans = await self._generate_async(msg_fit)
+            msg_fit.pop(0)
             if ans.find("**ERROR**") >= 0:
                 logging.error(f"LLM response error: {ans}")
                 error = ans
@@ -395,23 +403,9 @@ def clean_formated_answer(ans: str) -> str:
             else:
                 self.set_output("_ERROR", error)

-    def _stream_output(self, prompt, msg):
-        _, msg = message_fit_in([{"role": "system", "content": prompt}, *msg], int(self.chat_mdl.max_length * 0.97))
-        answer = ""
-        for ans in self._generate_streamly(msg):
-            if self.check_if_canceled("LLM streaming"):
-                return
-
-            if ans.find("**ERROR**") >= 0:
-                if self.get_exception_default_value():
-                    self.set_output("content", self.get_exception_default_value())
-                    yield self.get_exception_default_value()
-                else:
-                    self.set_output("_ERROR", ans)
-                return
-            yield ans
-            answer += ans
-        self.set_output("content", answer)
+    @timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60)))
+    def _invoke(self, **kwargs):
+        return asyncio.run(self._invoke_async(**kwargs))

     def add_memory(self, user:str, assist:str, func_name: str, params: dict, results: str, user_defined_prompt:dict={}):
         summ = tool_call_summary(self.chat_mdl, func_name, params, results, user_defined_prompt)
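Note on the refactor above: _invoke becomes a thin synchronous wrapper around a new _invoke_async, and every retry now feeds message_fit_in a fresh deepcopy(msg), so trimming or error-note appends on one attempt can no longer corrupt the shared history for the next. A minimal, self-contained sketch of both ideas (illustrative names, not RAGFlow's actual classes):

import asyncio
from copy import deepcopy

async def invoke_async(messages, max_retries=2):
    for _ in range(max_retries + 1):
        # Private copy per attempt: mutations here never leak back
        # into the caller's message history.
        msg_fit = [{"role": "system", "content": "system prompt"}, *deepcopy(messages)]
        ans = await asyncio.to_thread(lambda m: "answer", msg_fit)  # stand-in for the LLM call
        if "**ERROR**" not in ans:
            return ans
    return None

def invoke(messages):
    # Synchronous entry point preserved for non-async callers.
    return asyncio.run(invoke_async(messages))

print(invoke([{"role": "user", "content": "hi"}]))  # -> answer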

agent/tools/base.py

Lines changed: 6 additions & 3 deletions
@@ -49,16 +49,19 @@ def __init__(self, tools_map: dict[str, object], callback: partial):
         self.callback = callback

     def tool_call(self, name: str, arguments: dict[str, Any]) -> Any:
+        return asyncio.run(self.tool_call_async(name, arguments))
+
+    async def tool_call_async(self, name: str, arguments: dict[str, Any]) -> Any:
         assert name in self.tools_map, f"LLM tool {name} does not exist"
         st = timer()
         tool_obj = self.tools_map[name]
         if isinstance(tool_obj, MCPToolCallSession):
-            resp = tool_obj.tool_call(name, arguments, 60)
+            resp = await asyncio.to_thread(tool_obj.tool_call, name, arguments, 60)
         else:
             if hasattr(tool_obj, "invoke_async") and asyncio.iscoroutinefunction(tool_obj.invoke_async):
-                resp = asyncio.run(tool_obj.invoke_async(**arguments))
+                resp = await tool_obj.invoke_async(**arguments)
             else:
-                resp = asyncio.run(asyncio.to_thread(tool_obj.invoke, **arguments))
+                resp = await asyncio.to_thread(tool_obj.invoke, **arguments)

         self.callback(name, arguments, resp, elapsed_time=timer()-st)
         return resp
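The rewrite above removes the nested asyncio.run calls: inside tool_call_async, native coroutines are awaited directly, while blocking callables (including the MCP session) are pushed onto a worker thread with asyncio.to_thread so the event loop stays free. A runnable sketch of the same dispatch rule with placeholder tool classes:

import asyncio
import time

class BlockingTool:
    def invoke(self, x):
        time.sleep(0.1)  # simulates a blocking call, e.g. an MCP round-trip
        return x * 2

class AsyncTool:
    async def invoke_async(self, x):
        await asyncio.sleep(0.1)
        return x + 1

async def call_tool(tool, x):
    # Same rule as tool_call_async: await coroutines, thread-off blockers.
    if hasattr(tool, "invoke_async") and asyncio.iscoroutinefunction(tool.invoke_async):
        return await tool.invoke_async(x)
    return await asyncio.to_thread(tool.invoke, x)

async def main():
    # Both tools run concurrently; the blocking one no longer stalls the loop.
    print(await asyncio.gather(call_tool(BlockingTool(), 3), call_tool(AsyncTool(), 3)))  # [6, 4]

asyncio.run(main())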

api/apps/sdk/doc.py

Lines changed: 4 additions & 6 deletions
@@ -33,7 +33,7 @@
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from api.db.services.llm_service import LLMBundle
 from api.db.services.tenant_llm_service import TenantLLMService
-from api.db.services.task_service import TaskService, queue_tasks
+from api.db.services.task_service import TaskService, queue_tasks, cancel_all_task_of
 from api.db.services.dialog_service import meta_filter, convert_conditions
 from api.utils.api_utils import check_duplicate_ids, construct_json_result, get_error_data_result, get_parser_config, get_result, server_error_response, token_required, \
     get_request_json

@@ -321,9 +321,7 @@ async def update_doc(tenant_id, dataset_id, document_id):
     try:
         if not DocumentService.update_by_id(doc.id, {"status": str(status)}):
             return get_error_data_result(message="Database error (Document update)!")
-
         settings.docStoreConn.update({"doc_id": doc.id}, {"available_int": status}, search.index_name(kb.tenant_id), doc.kb_id)
-        return get_result(data=True)
     except Exception as e:
         return server_error_response(e)

@@ -350,12 +348,10 @@ async def update_doc(tenant_id, dataset_id, document_id):
     }
     renamed_doc = {}
     for key, value in doc.to_dict().items():
-        if key == "run":
-            renamed_doc["run"] = run_mapping.get(str(value))
         new_key = key_mapping.get(key, key)
         renamed_doc[new_key] = value
         if key == "run":
-            renamed_doc["run"] = run_mapping.get(value)
+            renamed_doc["run"] = run_mapping.get(str(value))

     return get_result(data=renamed_doc)

@@ -839,6 +835,8 @@ async def stop_parsing(tenant_id, dataset_id):
             return get_error_data_result(message=f"You don't own the document {id}.")
         if int(doc[0].progress) == 1 or doc[0].progress == 0:
             return get_error_data_result("Can't stop parsing document with progress at 0 or 1")
+        # Send cancellation signal via Redis to stop background task
+        cancel_all_task_of(id)
         info = {"run": "2", "progress": 0, "chunk_num": 0}
         DocumentService.update_by_id(id, info)
         settings.docStoreConn.delete({"doc_id": doc[0].id}, search.index_name(tenant_id), dataset_id)
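The diff documents cancel_all_task_of(id) as sending a cancellation signal via Redis; the helper itself lives in api.db.services.task_service and is not shown in this commit. A hedged sketch of the flag-style protocol such a helper typically implements — the key name, TTL, and polling helper below are assumptions, not RAGFlow's actual code:

import redis

r = redis.Redis()

def cancel_all_task_of(doc_id: str) -> None:
    # Assumed shape: set a short-lived flag that background workers
    # poll between pipeline stages and abort on.
    r.set(f"task_cancel:{doc_id}", 1, ex=3600)

def is_canceled(doc_id: str) -> bool:
    return r.exists(f"task_cancel:{doc_id}") > 0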

common/constants.py

Lines changed: 1 addition & 0 deletions
@@ -148,6 +148,7 @@ class Storage(Enum):
     AWS_S3 = 4
     OSS = 5
     OPENDAL = 6
+    GCS = 7

 # environment
 # ENV_STRONG_TEST_COUNT = "STRONG_TEST_COUNT"

common/settings.py

Lines changed: 7 additions & 2 deletions
@@ -31,6 +31,7 @@
 import rag.utils.opensearch_conn
 from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob
 from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob
+from rag.utils.gcs_conn import RAGFlowGCS
 from rag.utils.minio_conn import RAGFlowMinio
 from rag.utils.opendal_conn import OpenDALStorage
 from rag.utils.s3_conn import RAGFlowS3

@@ -109,6 +110,7 @@
 OB = {}
 OSS = {}
 OS = {}
+GCS = {}

 DOC_MAXIMUM_SIZE: int = 128 * 1024 * 1024
 DOC_BULK_SIZE: int = 4

@@ -151,7 +153,8 @@ class StorageFactory:
         Storage.AZURE_SAS: RAGFlowAzureSasBlob,
         Storage.AWS_S3: RAGFlowS3,
         Storage.OSS: RAGFlowOSS,
-        Storage.OPENDAL: OpenDALStorage
+        Storage.OPENDAL: OpenDALStorage,
+        Storage.GCS: RAGFlowGCS,
     }

     @classmethod

@@ -250,7 +253,7 @@ def init_settings():
     else:
         raise Exception(f"Not supported doc engine: {DOC_ENGINE}")

-    global AZURE, S3, MINIO, OSS
+    global AZURE, S3, MINIO, OSS, GCS
     if STORAGE_IMPL_TYPE in ['AZURE_SPN', 'AZURE_SAS']:
         AZURE = get_base_config("azure", {})
     elif STORAGE_IMPL_TYPE == 'AWS_S3':

@@ -259,6 +262,8 @@ def init_settings():
         MINIO = decrypt_database_config(name="minio")
     elif STORAGE_IMPL_TYPE == 'OSS':
         OSS = get_base_config("oss", {})
+    elif STORAGE_IMPL_TYPE == 'GCS':
+        GCS = get_base_config("gcs", {})

     global STORAGE_IMPL
     STORAGE_IMPL = StorageFactory.create(Storage[STORAGE_IMPL_TYPE])
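The wiring above is an enum-keyed factory: the new Storage.GCS member maps to its backend class, and init_settings resolves the implementation from the configured STORAGE_IMPL_TYPE string. A stripped-down sketch of the pattern with placeholder classes:

from enum import Enum

class Storage(Enum):
    MINIO = 1
    GCS = 7

class RAGFlowMinio: ...
class RAGFlowGCS: ...

storage_impl = {Storage.MINIO: RAGFlowMinio, Storage.GCS: RAGFlowGCS}

def create(kind: str):
    # Storage["GCS"] -> Storage.GCS, then look up and instantiate the backend.
    return storage_impl[Storage[kind]]()

print(type(create("GCS")).__name__)  # RAGFlowGCS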

conf/service_conf.yaml

Lines changed: 2 additions & 0 deletions
@@ -60,6 +60,8 @@ user_default_llm:
 #  access_key: 'access_key'
 #  secret_key: 'secret_key'
 #  region: 'region'
+#gcs:
+#  bucket: 'bridgtl-edm-d-bucket-ragflow'
 # oss:
 #  access_key: 'access_key'
 #  secret_key: 'secret_key'
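For context, a hedged sketch of what a GCS backend such as RAGFlowGCS might do with the bucket configured above, using the official google-cloud-storage client (credentials are resolved from GOOGLE_APPLICATION_CREDENTIALS); RAGFlow's actual method names are not visible in this commit, so the interface below is an assumption:

from google.cloud import storage

class GCSBlobStore:
    def __init__(self, bucket: str):
        # One client per store; bucket name comes from the gcs: config block.
        self.bucket = storage.Client().bucket(bucket)

    def put(self, name: str, data: bytes) -> None:
        self.bucket.blob(name).upload_from_string(data)

    def get(self, name: str) -> bytes:
        return self.bucket.blob(name).download_as_bytes()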

deepdoc/parser/figure_parser.py

Lines changed: 7 additions & 1 deletion
@@ -25,6 +25,8 @@


 def vision_figure_parser_figure_data_wrapper(figures_data_without_positions):
+    if not figures_data_without_positions:
+        return []
     return [
         (
             (figure_data[1], [figure_data[0]]),

@@ -35,7 +37,9 @@ def vision_figure_parser_figure_data_wrapper(figures_data_without_positions):
     ]


-def vision_figure_parser_docx_wrapper(sections,tbls,callback=None,**kwargs):
+def vision_figure_parser_docx_wrapper(sections, tbls, callback=None,**kwargs):
+    if not tbls:
+        return []
     try:
         vision_model = LLMBundle(kwargs["tenant_id"], LLMType.IMAGE2TEXT)
         callback(0.7, "Visual model detected. Attempting to enhance figure extraction...")

@@ -53,6 +57,8 @@ def vision_figure_parser_docx_wrapper(sections, tbls, callback=None,**kwargs):


 def vision_figure_parser_pdf_wrapper(tbls, callback=None, **kwargs):
+    if not tbls:
+        return []
     try:
         vision_model = LLMBundle(kwargs["tenant_id"], LLMType.IMAGE2TEXT)
         callback(0.7, "Visual model detected. Attempting to enhance figure extraction...")
