
Conversation


@Prajna1999 Prajna1999 commented Dec 2, 2025

Summary

Target issue is #460

Adds a thread-pool-based concurrency model that allows up to 4 worker threads to handle inserting the (input, output) pairs, instead of one blocking process doing all the heavy lifting. This enhances the endpoint's ability to insert CSV files of up to 1,000 line items within 60 seconds.

Chore: Also added a Celery doc explaining how Celery is implemented in the repo. Largely unrelated to the core task.

Checklist

Before submitting a pull request, please ensure that you complete these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested.
  • If you've fixed a bug or added code, ensure it is tested and has test cases.

Summary by CodeRabbit

  • Documentation

    • Added a comprehensive guide for the asynchronous task processing system covering architecture, configuration, scheduling, worker operation, monitoring, error handling, and best practices.
  • Improvements

    • Faster dataset uploads via concurrent processing, reducing overall synchronization time and improving reliability.



coderabbitai bot commented Dec 2, 2025

Walkthrough

Adds a new Celery architecture and operations guide and refactors Langfuse dataset uploading to use concurrent uploads via ThreadPoolExecutor with aggregated success counting and a single final flush.

Changes

Cohort / File(s) Summary

  • Documentation — backend/CELERY_OVERVIEW.md
    New comprehensive Celery overview describing architecture (RabbitMQ broker, Redis result backend, Beat), file layout, task routing/queues, worker/beat management, monitoring, error-handling/reliability practices, dynamic task execution pattern, CLI/examples and operational guidance.
  • Concurrent Upload Optimization — backend/app/crud/evaluations/langfuse.py
    Replaces sequential per-item uploads with a ThreadPoolExecutor-based concurrent upload flow: adds an upload_item helper, builds upload_tasks, uses as_completed to count successes, removes per-item flushes and keeps a single final flush; adds necessary concurrency imports.
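The described flow can be sketched roughly as follows. This is a minimal illustration using a stand-in client object, not the actual code in backend/app/crud/evaluations/langfuse.py; the real upload_item wraps a Langfuse SDK call and the real function takes different parameters.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def upload_dataset_sketch(client, items: list[dict[str, str]], duplication_factor: int = 1) -> int:
    """Rough shape of the concurrent upload: submit every (item, duplicate)
    pair to a small thread pool, count successes as futures complete, then
    flush once at the end instead of per item."""

    def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
        try:
            # Stand-in for the real Langfuse dataset-item creation call
            client.create_dataset_item(item, duplicate_num)
            return True
        except Exception:
            return False

    # Flatten the (item x duplicate) nested loops into one task list up front
    upload_tasks = [
        (item, duplicate_num)
        for item in items
        for duplicate_num in range(duplication_factor)
    ]

    total_uploaded = 0
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(upload_item, item, dup) for item, dup in upload_tasks]
        for future in as_completed(futures):
            if future.result():
                total_uploaded += 1

    client.flush()  # single final flush instead of per-item flushes
    return total_uploaded
```

The key property is that failures are absorbed inside upload_item and reported via the boolean result, so one bad item does not abort the batch.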

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Review ThreadPoolExecutor usage and potential shared-state/thread-safety in upload_dataset_to_langfuse.
  • Validate error handling in upload_item and correctness of success counting with as_completed.
  • Confirm final flush semantics with Langfuse API and that concurrency does not violate API expectations.

Suggested labels

enhancement

Suggested reviewers

  • avirajsingh7

Poem

🐰 Hop, hop — the tasks align in rows,
Threads bustle where the dataset grows,
Celery hums, schedulers keep time,
Workers leap — each job a rhyme.
I nibble code and twitch my nose. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 50.00%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
  • Title check — ❓ Inconclusive. The PR title 'Evaluation: Uploading dataset concurrently' is vague and doesn't clearly convey the primary change. While it mentions 'concurrently', it's unclear what is being evaluated or what the specific improvement is. Resolution: refine the title to be more specific, such as 'Add threadpool-based concurrency to Langfuse dataset uploads' or 'Speed up Langfuse dataset uploads with concurrent processing'.
✅ Passed checks (1 passed)
  • Description Check — ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
backend/app/crud/evaluations/langfuse.py (3)

251-270: Preserve the return type annotation on the inner function.

Per coding guidelines, type hints should be used throughout. The upload_item helper already declares its return type:

    def upload_item(item: dict[str, str], duplicate_num: int) -> bool:

Ensure the -> bool annotation is preserved when Black reformats the file.


285-285: Consider making max_workers configurable.

The hardcoded max_workers=4 may not be optimal for all environments. Consider exposing this as a configuration parameter or deriving it from settings (similar to CELERY_WORKER_CONCURRENCY).

```diff
+from app.core.config import settings
+
+# In function or at module level
+MAX_UPLOAD_WORKERS = getattr(settings, "LANGFUSE_UPLOAD_WORKERS", 4)
+
 # Then use:
-        with ThreadPoolExecutor(max_workers=4) as executor:
+        with ThreadPoolExecutor(max_workers=MAX_UPLOAD_WORKERS) as executor:
```

287-290: Simplify by using executor.map or list comprehension with submit.

The current pattern of appending futures to a list in a loop can be simplified.

```diff
-            futures = []
-            for item, dup_num in upload_tasks:
-                future = executor.submit(upload_item, item, dup_num)
-                futures.append(future)
+            futures = [executor.submit(upload_item, item, dup_num) for item, dup_num in upload_tasks]
```
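For reference, the executor.map variant mentioned above would look roughly like this. This is a sketch with a caller-supplied upload_item, not the repository's code; note that unlike as_completed, map yields results in submission order.

```python
from concurrent.futures import ThreadPoolExecutor


def count_successes_with_map(upload_item, upload_tasks, max_workers: int = 4) -> int:
    """Count successful uploads using executor.map instead of submit/as_completed.

    executor.map(fn, xs, ys) calls fn(x, y) for each pair, so the
    (item, dup_num) tuples are split into two parallel argument lists.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(
            upload_item,
            [item for item, _ in upload_tasks],
            [dup for _, dup in upload_tasks],
        )
        return sum(1 for ok in results if ok)
```

One trade-off: map re-raises the first worker exception while iterating results, whereas the submit/as_completed pattern lets each future's error be handled individually.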
backend/CELERY_OVERVIEW.md (1)

15-25: Add language specifier to fenced code block.

The file structure code block is missing a language identifier, which triggers markdown lint warning MD040. Use text or plaintext for directory listings.

````diff
-```
+```text
 app/celery/
 ├── __init__.py              # Package initialization, exports celery_app
````
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17f427e and cc015ec.

📒 Files selected for processing (2)
  • backend/CELERY_OVERVIEW.md (1 hunks)
  • backend/app/crud/evaluations/langfuse.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • backend/app/crud/evaluations/langfuse.py
backend/app/crud/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Implement database access operations in backend/app/crud/

Files:

  • backend/app/crud/evaluations/langfuse.py
🧠 Learnings (2)
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/celery/**/*.py : Keep Celery app configuration (priority queues, beat scheduler, workers) under backend/app/celery/

Applied to files:

  • backend/CELERY_OVERVIEW.md
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/celery/tasks/**/*.py : Define Celery tasks under backend/app/celery/tasks/

Applied to files:

  • backend/CELERY_OVERVIEW.md
🧬 Code graph analysis (1)
backend/app/crud/evaluations/langfuse.py (1)
backend/app/core/langfuse/langfuse.py (1)
  • flush (108-109)
🪛 GitHub Actions: AI Platform CI
backend/app/crud/evaluations/langfuse.py

[error] 1-1: Trailing whitespace detected and removed by pre-commit hook 'trailing-whitespace'. Re-run pre-commit to finalize changes.


[error] 1-1: Black formatter reformatted the file. Re-run pre-commit to commit the changes.

🪛 LanguageTool
backend/CELERY_OVERVIEW.md

[uncategorized] ~171-~171: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...Identical to high priority but uses the low priority queue get_task_status() (lines 7...

(EN_COMPOUND_ADJECTIVE_INTERNAL)


[grammar] ~555-~555: Use a hyphen to join words.
Context: ...vice** (app/services/llm/) - Uses high priority queue for real-time API calls -...

(QB_NEW_EN_HYPHEN)

🪛 markdownlint-cli2 (0.18.1)
backend/CELERY_OVERVIEW.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (2)
backend/CELERY_OVERVIEW.md (1)

1-588: Documentation looks comprehensive and well-structured.

This is a valuable addition that clearly explains the Celery architecture, queue priorities, task routing, and best practices. The end-to-end flow example and integration patterns are particularly helpful for onboarding developers. Based on learnings, the file structure aligns with project conventions for Celery configuration under backend/app/celery/.

backend/app/crud/evaluations/langfuse.py (1)

283-295: Langfuse client thread-safety is properly handled.

The Langfuse Python SDK is thread-safe for concurrent create_dataset_item calls when using a single client instance, which your code does. The internal queue and background worker handle batching, and langfuse.flush() at line 298 correctly ensures all items are delivered before the function returns.

Comment on lines +292 to 295

```python
            for future in as_completed(futures):
                upload_successful = future.result()
                if upload_successful:
                    total_uploaded += 1
```


⚠️ Potential issue | 🟡 Minor

Handle exceptions from future.result() to avoid silent failures.

If upload_item raises an unexpected exception (not caught internally), future.result() will propagate it here and abort the loop. Consider wrapping in a try-except to ensure all futures are processed and failures are logged.

```diff
             for future in as_completed(futures):
-                upload_successful = future.result()
-                if upload_successful:
-                    total_uploaded += 1
+                try:
+                    if future.result():
+                        total_uploaded += 1
+                except Exception as e:
+                    logger.error(f"[upload_dataset_to_langfuse] Unexpected error in upload task | {e}")
```
🤖 Prompt for AI Agents
In backend/app/crud/evaluations/langfuse.py around lines 292 to 295, the loop
calling future.result() does not catch exceptions from the completed futures
which can abort processing; wrap the future.result() call in a try-except block,
log the exception (with context such as which item or index failed), and treat
that future as a failed upload (do not increment total_uploaded). Ensure the
except block continues the loop so all futures are processed and consider
returning/recording failure details for higher-level handling.

@Prajna1999 Prajna1999 requested a review from nishika26 December 12, 2025 04:22
Comment on lines +286 to +291

```python
            futures = []
            for item, dup_num in upload_tasks:
                future = executor.submit(upload_item, item, dup_num)
                futures.append(future)

            for future in as_completed(futures):
```
Collaborator


use better names than futures and future

Collaborator Author


While the naming might be a bit generic, future, futures, and executor are the conventional variable names used with the concurrent.futures library. Re https://docs.python.org/3/library/concurrent.futures.html

Comment on lines +276 to +280
```python
    upload_tasks = [
        (item, duplicate_num)
        for item in items
        for duplicate_num in range(duplication_factor)
    ]
```
Collaborator


Here we are using two loops, an inner loop and an outer loop. In the last iteration of the previous code, after every inner loop completed we flushed Langfuse, as we had seen cases of data inconsistency while uploading. Can we ensure the same is followed in the new version of the code?

Collaborator Author


Since all upload tasks are stored in an array before uploading, each (input, output, duplicate_num) triplet should not be inconsistent. I checked once and the ordering of duplicates can be inconsistent (i.e. numbered 5,2,1,4,3 instead of the sequence 1-5), but the QnA pairs are correctly mapped. Anyway, I will test with a couple more examples and update if I find any errors.
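The out-of-order duplicate numbering is expected with this pattern: as_completed yields futures in completion order, not submission order, so duplicate numbers can come back shuffled while each (input, output) pair stays intact. A toy illustration (the work function and sleep times are made up for the demo):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(n: int) -> int:
    # Later submissions sleep less, so they tend to finish first
    time.sleep((5 - n) * 0.05)
    return n


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(work, n) for n in range(1, 6)]
    # as_completed: whichever future finishes first comes out first
    completion_order = [f.result() for f in as_completed(futures)]
    # iterating the original list preserves submission order
    submission_order = [f.result() for f in futures]

# completion_order is typically shuffled relative to submission_order,
# but every submitted value is still present exactly once
```

So if strict ordering of duplicates ever matters, iterate the futures list directly (or use executor.map) instead of as_completed.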

@AkhileshNegi AkhileshNegi changed the title fix: add threadpool based concurrency to speeden up langfuse dataset upload functionality Evaluation: Uploading dataset concurrently Dec 12, 2025