Skip to content

Commit 98295d6

Browse files
authored
Implement document transformation pipeline to improve RAG performance (#363)
1 parent 6685714 commit 98295d6

31 files changed

+3635
-262
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
- [docker](https://docs.docker.com/get-started/get-docker/) Docker
1313
- [uv](https://docs.astral.sh/uv/) for Python package and environment management.
14+
- **Poppler** – Install Poppler, required for PDF processing.
1415

1516
## Project Setup
1617

backend/Dockerfile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ ENV PYTHONUNBUFFERED=1
77
# Set working directory
88
WORKDIR /app/
99

10-
# Install system dependencies
11-
RUN apt-get update && apt-get install -y curl
10+
# Install system dependencies (added poppler-utils)
11+
RUN apt-get update && apt-get install -y \
12+
curl \
13+
poppler-utils \
14+
&& rm -rf /var/lib/apt/lists/*
1215

1316
# Install uv package manager
1417
COPY --from=ghcr.io/astral-sh/uv:0.5.11 /uv /uvx /bin/
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""create doc transformation job table
2+
3+
Revision ID: 9f8a4af9d6fd
4+
Revises: b5b9412d3d2a
5+
Create Date: 2025-08-29 16:00:47.848950
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
import sqlmodel.sql.sqltypes
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "9f8a4af9d6fd"
15+
down_revision = "b5b9412d3d2a"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade():
    """Create the ``doc_transformation_job`` table.

    Each row references a source document and, optionally, a transformed
    document (both foreign keys point at ``document.id``), and carries a
    status drawn from the ``transformationstatus`` enum.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    job_status = sa.Enum(
        "PENDING",
        "PROCESSING",
        "COMPLETED",
        "FAILED",
        name="transformationstatus",
    )
    op.create_table(
        "doc_transformation_job",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("source_document_id", sa.Uuid(), nullable=False),
        # Nullable: no transformed document is required when the row is created.
        sa.Column("transformed_document_id", sa.Uuid(), nullable=True),
        sa.Column("status", job_status, nullable=False),
        sa.Column("error_message", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["source_document_id"], ["document.id"]),
        sa.ForeignKeyConstraint(["transformed_document_id"], ["document.id"]),
        sa.PrimaryKeyConstraint("id"),
    )
    # ### end Alembic commands ###
52+
53+
54+
def downgrade():
    """Drop the ``doc_transformation_job`` table and its status enum type."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("doc_transformation_job")
    # On PostgreSQL, creating the table also created the named enum type
    # "transformationstatus". drop_table() does not remove it, so without
    # this explicit drop a later re-run of upgrade() would fail because the
    # type already exists. checkfirst=True keeps this safe if it is absent.
    sa.Enum(name="transformationstatus").drop(op.get_bind(), checkfirst=True)
    # ### end Alembic commands ###
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""add source document id to document table
2+
3+
Revision ID: b5b9412d3d2a
4+
Revises: 40307ab77e9f
5+
Create Date: 2025-08-29 15:59:34.347031
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
import sqlmodel.sql.sqltypes
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "b5b9412d3d2a"
15+
down_revision = "40307ab77e9f"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade():
    """Add the nullable self-referential ``document.source_document_id`` column."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column("document", sa.Column("source_document_id", sa.Uuid(), nullable=True))
    # Name the foreign key explicitly so downgrade() can refer to it.
    # "document_source_document_id_fkey" is the name PostgreSQL generates for
    # an unnamed FK on this column, so databases that already ran this
    # migration with an unnamed constraint should carry the same name
    # (NOTE(review): confirm if a metadata naming convention is configured).
    op.create_foreign_key(
        "document_source_document_id_fkey",
        "document",
        "document",
        ["source_document_id"],
        ["id"],
    )
    # ### end Alembic commands ###


def downgrade():
    """Remove ``document.source_document_id`` and its foreign key."""
    # ### commands auto generated by Alembic - please adjust! ###
    # drop_constraint() needs a real constraint name; passing None (as the
    # autogenerated code did) makes the downgrade fail.
    op.drop_constraint(
        "document_source_document_id_fkey", "document", type_="foreignkey"
    )
    op.drop_column("document", "source_document_id")
    # ### end Alembic commands ###
Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,17 @@
1-
Upload a document to the AI platform. The response will contain an ID,
2-
which is the document ID required by other routes.
1+
Upload a document to the AI platform.
2+
3+
- If only a file is provided, the document will be uploaded and stored, and its ID will be returned.
4+
- If a target format is specified, a transformation job will also be created to transform the document into the target format in the background. The response will include both the uploaded document details and information about the transformation job.
5+
6+
### Supported Transformations
7+
8+
The following (source_format → target_format) transformations are supported:
9+
10+
- pdf → markdown
11+
- zerox
12+
13+
### Transformers
14+
15+
Available transformer names and their implementations (the default transformer is `zerox`):
16+
17+
- `zerox`

backend/app/api/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
assistants,
66
collections,
77
documents,
8+
doc_transformation_job,
89
login,
910
organization,
1011
openai_conversation,
@@ -26,6 +27,7 @@
2627
api_router.include_router(collections.router)
2728
api_router.include_router(credentials.router)
2829
api_router.include_router(documents.router)
30+
api_router.include_router(doc_transformation_job.router)
2931
api_router.include_router(login.router)
3032
api_router.include_router(onboarding.router)
3133
api_router.include_router(openai_conversation.router)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from uuid import UUID
2+
3+
from fastapi import APIRouter, HTTPException, Query, Path as FastPath
4+
5+
from app.api.deps import CurrentUserOrgProject, SessionDep
6+
from app.crud.doc_transformation_job import DocTransformationJobCrud
7+
from app.models import DocTransformationJob, DocTransformationJobs
8+
from app.utils import APIResponse
9+
10+
router = APIRouter(prefix="/documents/transformations", tags=["doc_transformation_job"])
11+
12+
13+
@router.get(
    "/{job_id}",
    description="Get the status and details of a document transformation job.",
    response_model=APIResponse[DocTransformationJob],
)
def get_transformation_job(
    session: SessionDep,
    current_user: CurrentUserOrgProject,
    job_id: UUID = FastPath(description="Transformation job ID"),
):
    """Return a single transformation job wrapped in a success response.

    The job is read through a ``DocTransformationJobCrud`` built from the
    request session and the caller's ``project_id``.
    """
    job_crud = DocTransformationJobCrud(session, current_user.project_id)
    return APIResponse.success_response(job_crud.read_one(job_id))
26+
27+
28+
@router.get(
    "/",
    description="Get the status and details of multiple document transformation jobs by IDs.",
    response_model=APIResponse[DocTransformationJobs],
)
def get_multiple_transformation_jobs(
    session: SessionDep,
    current_user: CurrentUserOrgProject,
    job_ids: list[UUID] = Query(
        # min_length/max_length bound the number of IDs in the list; the
        # previous ``min=1`` was not a validation keyword and was ignored.
        description="List of transformation job IDs",
        min_length=1,
        max_length=100,
    ),
):
    """Return the requested transformation jobs plus the IDs that were not found.

    Duplicate IDs in the request are collapsed. ``jobs_not_found`` lists, in
    request order, every requested ID for which ``read_each`` returned no job.
    """
    # De-duplicate while keeping request order so the response is deterministic.
    requested_ids = list(dict.fromkeys(job_ids))

    crud = DocTransformationJobCrud(session, project_id=current_user.project_id)
    jobs = crud.read_each(set(requested_ids))

    found_ids = {job.id for job in jobs}
    jobs_not_found = [
        requested_id for requested_id in requested_ids if requested_id not in found_ids
    ]
    return APIResponse.success_response(
        DocTransformationJobs(jobs=jobs, jobs_not_found=jobs_not_found)
    )

backend/app/api/routes/documents.py

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,38 @@
11
import logging
2-
from uuid import UUID, uuid4
3-
from typing import List
42
from pathlib import Path
3+
from uuid import UUID, uuid4
54

6-
from fastapi import APIRouter, File, UploadFile, Query, HTTPException
5+
from fastapi import (
6+
APIRouter,
7+
BackgroundTasks,
8+
File,
9+
Form,
10+
HTTPException,
11+
Query,
12+
UploadFile,
13+
)
714
from fastapi import Path as FastPath
815

9-
from app.crud import DocumentCrud, CollectionCrud
10-
from app.models import Document, DocumentPublic, Message
11-
from app.utils import APIResponse, load_description, get_openai_client
12-
from app.api.deps import CurrentUser, SessionDep, CurrentUserOrgProject
16+
from app.api.deps import CurrentUserOrgProject, SessionDep
1317
from app.core.cloud import get_cloud_storage
18+
from app.core.doctransform import service as transformation_service
19+
from app.core.doctransform.registry import (
20+
get_available_transformers,
21+
get_file_format,
22+
is_transformation_supported,
23+
resolve_transformer,
24+
)
25+
from app.crud import CollectionCrud, DocumentCrud
1426
from app.crud.rag import OpenAIAssistantCrud
27+
from app.models import (
28+
Document,
29+
DocumentPublic,
30+
DocumentUploadResponse,
31+
Message,
32+
TransformationJobInfo,
33+
)
34+
from app.utils import APIResponse, get_openai_client, load_description
35+
1536

1637
logger = logging.getLogger(__name__)
1738
router = APIRouter(prefix="/documents", tags=["documents"])
@@ -20,7 +41,7 @@
2041
@router.get(
2142
"/list",
2243
description=load_description("documents/list.md"),
23-
response_model=APIResponse[List[DocumentPublic]],
44+
response_model=APIResponse[list[DocumentPublic]],
2445
)
2546
def list_docs(
2647
session: SessionDep,
@@ -36,13 +57,53 @@ def list_docs(
3657
@router.post(
3758
"/upload",
3859
description=load_description("documents/upload.md"),
39-
response_model=APIResponse[DocumentPublic],
60+
response_model=APIResponse[DocumentUploadResponse],
4061
)
41-
def upload_doc(
62+
async def upload_doc(
4263
session: SessionDep,
4364
current_user: CurrentUserOrgProject,
65+
background_tasks: BackgroundTasks,
4466
src: UploadFile = File(...),
67+
target_format: str
68+
| None = Form(
69+
None,
70+
description="Desired output format for the uploaded document (e.g., pdf, docx, txt). ",
71+
),
72+
transformer: str
73+
| None = Form(
74+
None, description="Name of the transformer to apply when converting. "
75+
),
4576
):
77+
# Determine source file format
78+
try:
79+
source_format = get_file_format(src.filename)
80+
except ValueError as e:
81+
raise HTTPException(status_code=400, detail=str(e))
82+
83+
# validate if transformation is possible or not
84+
if target_format:
85+
if not is_transformation_supported(source_format, target_format):
86+
raise HTTPException(
87+
status_code=400,
88+
detail=f"Transformation from {source_format} to {target_format} is not supported",
89+
)
90+
91+
# Resolve the transformer to use
92+
if not transformer:
93+
transformer = "default"
94+
try:
95+
actual_transformer = resolve_transformer(
96+
source_format, target_format, transformer
97+
)
98+
except ValueError as e:
99+
available_transformers = get_available_transformers(
100+
source_format, target_format
101+
)
102+
raise HTTPException(
103+
status_code=400,
104+
detail=f"{str(e)}. Available transformers: {list(available_transformers.keys())}",
105+
)
106+
46107
storage = get_cloud_storage(session=session, project_id=current_user.project_id)
47108
document_id = uuid4()
48109

@@ -54,8 +115,38 @@ def upload_doc(
54115
fname=src.filename,
55116
object_store_url=str(object_store_url),
56117
)
57-
data = crud.update(document)
58-
return APIResponse.success_response(data)
118+
source_document = crud.update(document)
119+
120+
job_info: TransformationJobInfo | None = None
121+
if target_format and actual_transformer:
122+
job_id = transformation_service.start_job(
123+
db=session,
124+
current_user=current_user,
125+
source_document_id=source_document.id,
126+
transformer_name=actual_transformer,
127+
target_format=target_format,
128+
background_tasks=background_tasks,
129+
)
130+
job_info = TransformationJobInfo(
131+
message=f"Document accepted for transformation from {source_format} to {target_format}.",
132+
job_id=str(job_id),
133+
source_format=source_format,
134+
target_format=target_format,
135+
transformer=actual_transformer,
136+
status_check_url=f"/documents/transformations/{job_id}",
137+
)
138+
139+
document_schema = DocumentPublic.model_validate(
140+
source_document, from_attributes=True
141+
)
142+
document_schema.signed_url = storage.get_signed_url(
143+
source_document.object_store_url
144+
)
145+
response = DocumentUploadResponse(
146+
**document_schema.model_dump(), transformation_job=job_info
147+
)
148+
149+
return APIResponse.success_response(response)
59150

60151

61152
@router.delete(

0 commit comments

Comments
 (0)