From dce7f2088cd96a9dc03e1b5e57c2b3e2e8ddf172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BAlio=20Almeida?= Date: Fri, 7 Jun 2024 16:00:59 +0100 Subject: [PATCH 1/2] Concurrent Tesseract and queue added. --- .../document_loader_tesseract.py | 84 +++++++++++++++---- tests/document_loader_tesseract.py | 8 ++ 2 files changed, 74 insertions(+), 18 deletions(-) diff --git a/extract_thinker/document_loader/document_loader_tesseract.py b/extract_thinker/document_loader/document_loader_tesseract.py index 6c87af3..2411d38 100644 --- a/extract_thinker/document_loader/document_loader_tesseract.py +++ b/extract_thinker/document_loader/document_loader_tesseract.py @@ -1,6 +1,7 @@ from io import BytesIO from operator import attrgetter import os +import threading from typing import Any, List, Union from PIL import Image import pytesseract @@ -10,9 +11,9 @@ from cachetools import cachedmethod from cachetools.keys import hashkey -import concurrent.futures +from queue import Queue -SUPPORTED_IMAGE_FORMATS = ["jpeg", "png", "bmp", "tiff"] +SUPPORTED_IMAGE_FORMATS = ["jpeg", "png", "bmp", "tiff", "pdf"] class DocumentLoaderTesseract(CachedDocumentLoader): @@ -20,7 +21,6 @@ def __init__(self, tesseract_cmd, isContainer=False, content=None, cache_ttl=300 super().__init__(content, cache_ttl) self.tesseract_cmd = tesseract_cmd if isContainer: - # docker path to tesseract self.tesseract_cmd = os.environ.get("TESSERACT_PATH", "tesseract") pytesseract.pytesseract.tesseract_cmd = self.tesseract_cmd if not os.path.isfile(self.tesseract_cmd): @@ -54,35 +54,83 @@ def load_content_from_stream(self, stream: Union[BytesIO, str]) -> Union[str, ob except Exception as e: raise Exception(f"Error processing stream: {e}") from e - def process_image(self, image): + def process_image(self, image: BytesIO) -> str: for attempt in range(3): - raw_text = str(pytesseract.image_to_string(Image.open(BytesIO(image)))) - if raw_text: - return raw_text - raise Exception("Failed to process image after 3 attempts") 
+ try: + raw_text = str(pytesseract.image_to_string(Image.open(image))) + if raw_text: + return raw_text + except Exception as e: + if attempt == 2: + raise Exception(f"Failed to process image after 3 attempts: {e}") + return "" + + def worker(self, input_queue: Queue, output_queue: Queue): + while True: + image = input_queue.get() + if image is None: # Sentinel to indicate shutdown + break + try: + text = self.process_image(image) + output_queue.put((image, text)) + except Exception as e: + output_queue.put((image, str(e))) + input_queue.task_done() - @cachedmethod(cache=attrgetter('cache'), key=lambda self, stream: hashkey(id(stream))) def load_content_from_stream_list(self, stream: BytesIO) -> List[Any]: images = self.convert_to_images(stream) + input_queue = Queue() + output_queue = Queue() + + for img in images.values(): + input_queue.put(BytesIO(img)) + + threads = [] + for _ in range(4): # Number of worker threads + t = threading.Thread(target=self.worker, args=(input_queue, output_queue)) + t.start() + threads.append(t) + + input_queue.join() + + for _ in range(4): + input_queue.put(None) - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = {i: executor.submit(self.process_image, image[i]) for i, image in enumerate(images.values())} + for t in threads: + t.join() contents = [] - for i, future in futures.items(): - contents.append({"image": images[i], "content": future.result()}) + while not output_queue.empty(): + image, content = output_queue.get() + contents.append({"image": image, "content": content}) return contents - @cachedmethod(cache=attrgetter('cache'), key=lambda self, input_list: hashkey(id(input_list))) def load_content_from_file_list(self, input: List[Union[str, BytesIO]]) -> List[Any]: images = self.convert_to_images(input) + input_queue = Queue() + output_queue = Queue() + + for img in images.values(): + input_queue.put(BytesIO(img)) + + threads = [] + for _ in range(4): # Number of worker threads + t = 
threading.Thread(target=self.worker, args=(input_queue, output_queue)) + t.start() + threads.append(t) + + input_queue.join() + + for _ in range(4): + input_queue.put(None) - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = {i: executor.submit(self.process_image, image[i]) for i, image in enumerate(images.values())} + for t in threads: + t.join() contents = [] - for i, future in futures.items(): - contents.append({"image": Image.open(BytesIO(images[i][i])), "content": future.result()}) + while not output_queue.empty(): + image, content = output_queue.get() + contents.append({"image": Image.open(image), "content": content}) return contents diff --git a/tests/document_loader_tesseract.py b/tests/document_loader_tesseract.py index 0fc783e..b781fa9 100644 --- a/tests/document_loader_tesseract.py +++ b/tests/document_loader_tesseract.py @@ -43,3 +43,11 @@ def test_cache_for_file(): # Assert assert content1 is content2 + + +def test_queue_load(): + for _ in range(10): + # Act + content = loader.load_content_from_file(test_file_path) + # Assert + assert "0000001" in content From 16b88b2573046107489ed1059430af4fad036e50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BAlio=20Almeida?= Date: Fri, 7 Jun 2024 16:02:34 +0100 Subject: [PATCH 2/2] bumping up version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3ac71e8..fa3c6ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "extract_thinker" -version = "0.0.3" +version = "0.0.5" description = "Library to extract data from files and documents agnositicaly using LLMs" authors = ["Júlio Almeida "] readme = "README.md"