AI multi-agent pipeline over 150K+ Amazon product reviews (McAuley-Lab Amazon-Reviews-2023, loaded via direct JSONL download from Hugging Face). The stack combines FAISS (dense vectors), BM25 (keywords), DuckDB (SQL analytics), cross-encoder reranking, and a multi-agent flow with dynamic routing (Planner -> Router -> [Decompose | SQL-first | Direct] -> Analyst -> Critic) using LangGraph. LLM calls use Google Gemini (AI Studio) when GEMINI_API_KEY or GOOGLE_API_KEY is set, with Groq as automatic fallback on errors or rate limits; with no Gemini key, the app uses Groq only.
| Item | Detail |
|---|---|
| Source | McAuley-Lab/Amazon-Reviews-2023 on Hugging Face Hub |
| Access | Raw JSONL files under raw/review_categories/*.jsonl via huggingface_hub.hf_hub_download (the datasets Python API does not load this repo due to deprecated loading scripts). |
| Subset used | 5 categories x 30,000 reviews = 150,000 rows: All_Beauty, Appliances, Amazon_Fashion, Arts_Crafts_and_Sewing, Baby_Products |
| Fields stored | id, asin, category, rating, title, text, doc_text, timestamp, helpful_vote, verified_purchase |
| Preprocessing | Normalize Unicode (NFKC), strip HTML-like tags, build doc_text = "[Category] " + title + " " + text (category prefix boosts BM25/FAISS category signal), coerce verified_purchase to bool, drop empty rows |
| Artifacts | Parquet (data/processed/), DuckDB + FAISS + BM25 (data/indices/) -- large files gitignored; rebuild with python scripts/ingest.py |
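The preprocessing row above can be illustrated with a minimal sketch. It assumes a simple regex is sufficient for "HTML-like" tags and that a row counts as empty when both title and text are blank; the function names `clean_text` and `build_row` are hypothetical and not taken from src/ingestion/.

```python
import re
import unicodedata
from typing import Optional

# Assumption: anything shaped like <...> is treated as an HTML-like tag.
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def clean_text(value: Optional[str]) -> str:
    """NFKC-normalize, replace HTML-like tags with spaces, collapse whitespace."""
    text = unicodedata.normalize("NFKC", value or "")
    text = _TAG_RE.sub(" ", text)
    return _WS_RE.sub(" ", text).strip()


def build_row(raw: dict, category: str) -> Optional[dict]:
    """Return a cleaned row, or None when title and text are both empty."""
    title = clean_text(raw.get("title"))
    text = clean_text(raw.get("text"))
    if not title and not text:
        return None  # corresponds to "drop empty rows"
    return {
        "category": category,
        "title": title,
        "text": text,
        # Category prefix, then title, then text, as described in the table.
        "doc_text": f"[{category}] {title} {text}".strip(),
        # Assumption: the raw value is already a JSON boolean or 0/1.
        "verified_purchase": bool(raw.get("verified_purchase")),
    }
```

Calling `build_row(record, "All_Beauty")` on each parsed JSONL record and skipping `None` results would give rows ready for Parquet; the real pipeline also stores the remaining fields (id, asin, rating, timestamp, helpful_vote).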
- Python 3.11+ (tested on 3.13)
- ~15GB disk for caches + indices (first run downloads category JSONL files)
- At least one LLM key:
  - Gemini (recommended primary): Google AI Studio; set GEMINI_API_KEY or GOOGLE_API_KEY
  - Groq (recommended fallback): console.groq.com; set GROQ_API_KEY
With both keys, Gemini is invoked first; if the call fails (quota, 429, network, invalid response path, etc.), LangChain automatically retries with Groq. Gemini’s free tier can be tight on RPM/RPD for multi-agent flows, so keeping Groq configured avoids hard failures.
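The primary-then-fallback behaviour described above can be sketched in plain Python. This is an illustration of the pattern only, not the code in src/llm/chat.py; `call_with_fallback` and the provider callables are hypothetical names. LangChain exposes the same idea through `Runnable.with_fallbacks`, though whether this repo uses that exact call is not stated here.

```python
from typing import Callable, List, Sequence, Tuple

Provider = Tuple[str, Callable[[str], str]]


def call_with_fallback(prompt: str, providers: Sequence[Provider]) -> Tuple[str, str]:
    """Try each (name, call) pair in order; return (name, answer) from the first success."""
    errors: List[str] = []
    for name, call in providers:
        try:
            return name, call(prompt)
        except Exception as exc:  # quota, 429, network, malformed response, ...
            errors.append(f"{name}: {exc}")
    # Only reached when every provider raised.
    raise RuntimeError("all providers failed: " + "; ".join(errors))
```

Passing `[("gemini", gemini_call), ("groq", groq_call)]` mirrors the documented order; with a single entry the function behaves like a plain call that surfaces the provider error.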
Defaults use gemini-2.5-flash / gemini-2.5-pro (older gemini-1.5-* IDs often return 404 on the current API). Override with MODEL_GEMINI_PLANNER, MODEL_GEMINI_ANALYST, MODEL_GEMINI_CRITIC if your project lists different model strings in Google’s model docs.
cd C:\Projects\BigData_QnA
pip install -r requirements.txt
copy .env.example .env
# Edit .env: set GEMINI_API_KEY or GOOGLE_API_KEY, and GROQ_API_KEY for fallback (or Groq-only).
# Optional: LLM_PRIMARY=groq to force Groq and skip Gemini.
Reproducible installs: CI and the Dockerfile install from requirements.lock (generated from requirements.in). To refresh pins after editing constraints, run python -m pip install pip-tools, then python -m piptools compile requirements.in -o requirements.lock (use the same Python major/minor you deploy with, e.g. 3.11 on Linux CI).
Production-oriented env vars (see .env.example): USE_SHARED_HYBRID, DUCKDB_READ_ONLY, LOG_QUERY_PREVIEWS, LLM_REQUEST_TIMEOUT_S, optional PIPELINE_TIMEOUT_S (wall-clock cap on the full LangGraph run; the worker thread may still finish in the background after a timeout), optional Gradio HTTP basic auth (GRADIO_AUTH_USER / GRADIO_AUTH_PASSWORD).
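The PIPELINE_TIMEOUT_S note (a wall-clock cap where the worker thread may still finish in the background) follows from how Python threads work: they cannot be killed from outside. A minimal sketch of that behaviour, assuming a thread-pool approach; `run_with_timeout` is a hypothetical helper, not the repo's implementation.

```python
import concurrent.futures
import time


def run_with_timeout(fn, timeout_s, *args):
    """Run fn(*args) in a worker thread and wait at most timeout_s seconds.

    Raises concurrent.futures.TimeoutError when the cap is hit. The worker
    is not cancelled; it keeps running until fn returns on its own.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args)
    try:
        return future.result(timeout=timeout_s)
    finally:
        # wait=False hands control back immediately instead of joining the worker.
        pool.shutdown(wait=False)
```

The caller gets a prompt timeout error, but any LLM calls already in flight inside the worker still consume quota, which is one reason to pair this cap with LLM_REQUEST_TIMEOUT_S.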
Default: 5 categories x 30,000 reviews = 150,000 rows (may take 30-90+ minutes on CPU for embedding).
python scripts/ingest.py
Smaller smoke test:
python scripts/ingest.py --rows-per-category 100
Outputs (gitignored):
- data/processed/reviews.parquet
- data/indices/reviews.duckdb
- data/indices/faiss.index + data/indices/faiss_meta.json
- data/indices/bm25.pkl
python scripts/query.py "What is the average rating by category?"
python -m src.ui.app
Open the printed local URL (default http://localhost:7860).
python -m src.api.main
Default http://localhost:8000: GET /healthz, GET /readyz (indices present), POST /v1/query with body {"query": "..."}. A minimal static client is served at / from src/api/static/. Set API_PORT to change the listen port.
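A small standard-library client for the POST /v1/query endpoint might look like the sketch below. The request body shape comes from the text above; the response schema is not documented here, so the sketch only assumes the server returns a JSON object. `build_query_request` and `ask` are hypothetical names.

```python
import json
import urllib.request


def build_query_request(question: str, base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build the POST /v1/query request with body {"query": "..."}."""
    body = json.dumps({"query": question}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/v1/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(question: str, base_url: str = "http://localhost:8000", timeout_s: float = 120.0) -> dict:
    """Send the query and return the parsed JSON response (schema is an assumption)."""
    request = build_query_request(question, base_url)
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the API running, `ask("What is the average rating by category?")` would return whatever JSON the server produces; checking GET /readyz first avoids querying before the indices exist.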
After ingestion, rebuild ground truth from DuckDB and run the evaluation pipeline:
python scripts/build_ground_truth.py # deterministic, content-aligned ground truth
python scripts/evaluate.py # Recall, Precision, MRR, nDCG, Hit@K
scripts/evaluate.py runs the retrieval batch with eval_mode awareness (retrieval-primary vs SQL-primary questions) and category-filtered retrieval for single-category queries, then regenerates EVALUATION_REPORT.md.
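For readers unfamiliar with the metrics named above, here is a minimal per-query sketch with binary relevance. It assumes each ranked list contains unique review ids and that nDCG uses gain 1 for relevant documents; the functions are illustrative and may differ from the definitions in evaluation/.

```python
import math
from typing import Sequence, Set


def recall_at_k(ranked: Sequence[str], relevant: Set[str], k: int) -> float:
    """Fraction of all relevant ids that appear in the top k."""
    if not relevant:
        return 0.0
    return len(set(ranked[:k]) & relevant) / len(relevant)


def precision_at_k(ranked: Sequence[str], relevant: Set[str], k: int) -> float:
    """Fraction of the k slots filled by relevant ids."""
    return len(set(ranked[:k]) & relevant) / k


def hit_at_k(ranked: Sequence[str], relevant: Set[str], k: int) -> float:
    """1.0 if any relevant id is in the top k, else 0.0."""
    return 1.0 if set(ranked[:k]) & relevant else 0.0


def reciprocal_rank_at_k(ranked: Sequence[str], relevant: Set[str], k: int) -> float:
    """1/rank of the first relevant id in the top k; MRR is the mean over queries."""
    for rank, doc_id in enumerate(ranked[:k], start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0


def ndcg_at_k(ranked: Sequence[str], relevant: Set[str], k: int) -> float:
    """DCG of the ranking divided by the DCG of an ideal ranking."""
    dcg = sum(
        1.0 / math.log2(rank + 1)
        for rank, doc_id in enumerate(ranked[:k], start=1)
        if doc_id in relevant
    )
    ideal_hits = min(len(relevant), k)
    idcg = sum(1.0 / math.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    return dcg / idcg if idcg else 0.0
```

Averaging each function over the retrieval-primary questions gives the batch-level numbers; SQL-primary questions are the ones the eval_mode separation keeps out of these averages.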
Compare retrieval modes side-by-side (vector-only, BM25-only, hybrid, hybrid + cross-encoder):
python scripts/ablation.py
Run all 20 test questions through the full agent pipeline and collect critic scores and latency:
python scripts/answer_quality.py
The script opens DuckDB read-only, so it can run while the Gradio UI (or another job) has the database open. It sets USE_CROSS_ENCODER=0 by default (same as CI) so the cross-encoder is not loaded once per query.
On Groq only, analyst and critic default to llama-3.1-8b-instant for this script, because a full 20-question batch on llama-3.3-70b-versatile can exceed that model's free-tier tokens-per-day (TPD) limit. To force 70B for analyst/critic (higher quality, may hit TPD), set ANSWER_QUALITY_USE_70B=1 or add --use-70b to the command line. With Gemini as primary (GEMINI_API_KEY set), the Groq model env vars apply mainly to fallback.
Generate reference answers for SQL-primary questions:
python scripts/sql_eval.py
60+ tests covering metrics, RRF, preprocessor, SQL safety, agents, pipeline timeout, and synthesizer:
python -m pytest -v
CI (.github/workflows/ci.yml) runs tests with USE_CROSS_ENCODER=0.
Assignment steps (you complete on GitHub / Google / recording):
- Repo public or private with collaborator uptiq-chaitanya (read) per assignment.
- Submit Google Form with repo link: forms.gle/9iPeUBHKcdHhuSq67.
- Demo video: URL in DEMO_VIDEO/README.md (Google Drive; set file sharing to Anyone with the link in Drive).
Repo evaluation artifacts (regenerate anytime):
- evaluation/cache/eval_results.json, ablation.json, and answer_quality.json are populated; EVALUATION_REPORT.md is generated from them via python scripts/evaluate.py (includes report) or python -m evaluation.report_generator after ablation/answer-quality runs.
(After you finish each assignment step above, change its [ ] to [x].)
The system uses 6 specialized agents orchestrated by LangGraph with dynamic routing:
User Query
|
v
[Planner] -- classifies query type + decides route
|
v
[Router] -- conditional edges based on route
|
+-- "direct" --> [Retriever] --+
+-- "sql_first" --> [SQL-first] --+--> [Analyst] --> [Critic] --> Answer
+-- "decompose" --> [Decomposer] --> [Sub-Retriever] --> [Synthesizer] --+
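The routing in the diagram can be read as a lookup from the planner's route to a branch of nodes, followed by the shared Analyst and Critic tail. The sketch below expresses that in plain Python; the node names are hypothetical labels taken from the diagram, and rejecting unknown routes is an assumption. In LangGraph the same branching is typically wired with `add_conditional_edges`.

```python
from typing import Dict, List

# Branch taken for each route value emitted by the Planner (labels from the diagram).
ROUTE_BRANCHES: Dict[str, List[str]] = {
    "direct": ["retriever"],
    "sql_first": ["sql_first"],
    "decompose": ["decomposer", "sub_retriever", "synthesizer"],
}

# Every branch rejoins here before the answer is returned.
SHARED_TAIL: List[str] = ["analyst", "critic"]


def plan_path(route: str) -> List[str]:
    """Return the ordered node names a query visits for the given route."""
    if route not in ROUTE_BRANCHES:
        # Assumption: unknown routes are rejected rather than silently defaulted.
        raise ValueError(f"unknown route: {route!r}")
    return ["planner", "router", *ROUTE_BRANCHES[route], *SHARED_TAIL]
```

For example, `plan_path("decompose")` lists the multi-hop path through the Decomposer, Sub-Retriever, and Synthesizer before the Analyst and Critic run.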
| Agent | Model (primary / fallback) | Role |
|---|---|---|
| Planner | MODEL_GEMINI_PLANNER (default gemini-2.5-flash) / MODEL_PLANNER (Groq llama-3.1-8b-instant) | Classify query, route, SQL hint, optional category_filter for retrieval |
| Decomposer | Same stack as Planner | Break multi-hop / comparison queries into sub-questions |
| Retriever | -- | Hybrid search (FAISS + BM25 + RRF + optional cross-encoder); respects planner category_filter |
| Synthesizer | -- | Merge per-sub-question retrieval results |
| Analyst | MODEL_GEMINI_ANALYST (default gemini-2.5-pro) / MODEL_ANALYST (Groq llama-3.3-70b-versatile) | Grounded answer with citations |
| Critic | MODEL_GEMINI_CRITIC (default gemini-2.5-flash) / MODEL_CRITIC (Groq llama-3.3-70b-versatile) | Score quality, trigger retry |
When GEMINI_API_KEY or GOOGLE_API_KEY is set and LLM_PRIMARY is not groq, Gemini is used first; Groq is the fallback on errors/rate limits. With no Gemini key, Groq only. See src/llm/chat.py.
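The selection rule above can be written as a small function over the environment. This is a sketch of the stated rule only; `provider_order` is a hypothetical name, and raising when no usable key is present is an assumption about edge-case behaviour rather than what src/llm/chat.py does.

```python
from typing import List, Mapping


def provider_order(env: Mapping[str, str]) -> List[str]:
    """Return provider names in the order they would be tried."""
    has_gemini = bool(env.get("GEMINI_API_KEY") or env.get("GOOGLE_API_KEY"))
    has_groq = bool(env.get("GROQ_API_KEY"))
    force_groq = env.get("LLM_PRIMARY", "").strip().lower() == "groq"

    order: List[str] = []
    if has_gemini and not force_groq:
        order.append("gemini")  # primary when a Gemini key exists and is not overridden
    if has_groq:
        order.append("groq")  # fallback, or the only provider
    if not order:
        # Assumption: a missing or unusable configuration fails fast.
        raise RuntimeError("set GEMINI_API_KEY/GOOGLE_API_KEY and/or GROQ_API_KEY")
    return order
```

Passing `os.environ` as `env` gives the order for the current process; passing a plain dict keeps the rule easy to unit test.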
docker build -t bigdata-qna .
docker run -p 7860:7860 --env-file .env bigdata-qna
The image installs requirements.lock for reproducible dependency versions.
- Push to GitHub.
- Create a new Web Service on Render, point to the repo.
- Render auto-detects render.yaml (Docker runtime, persistent disk for data).
- Set GROQ_API_KEY in Render environment.
- Run ingestion once via Render shell: python scripts/ingest.py --rows-per-category 100.
Gradio apps deploy directly to HF Spaces -- push the repo and set secrets.
Structured JSON logs (stdout + logs/app.log) via src/observability/logger.py with trace_id correlation:
| Event | What is logged |
|---|---|
| pipeline_query | User question length + preview + trace_id |
| agent_planner | query_type, route |
| agent_decomposer | Sub-question count + text |
| retrieval_sql / retrieval_sql_error | SQL row count or error |
| retrieval_hybrid | Doc count, retrieved reviews.id list (up to 30), fusion scores |
| agent_analyst | Answer length |
| agent_critic | Score, needs_retry |
| ui_query | Query length (Gradio) |
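A minimal sketch of structured JSON logging with trace_id correlation, using only the standard library. It illustrates the idea behind the events in the table; the class and function names, the output field names, and the example field values are assumptions, not the contents of src/observability/logger.py.

```python
import io
import json
import logging
import uuid


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "event": record.getMessage(),
            "trace_id": getattr(record, "trace_id", None),
        }
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, ensure_ascii=False)


def make_logger(stream, name: str = "bigdata_qna_sketch") -> logging.Logger:
    """Create a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger(name)
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate output through the root logger
    return logger


def log_event(logger: logging.Logger, event: str, trace_id: str, **fields) -> None:
    """Log one event; trace_id ties together all events of a single query."""
    logger.info(event, extra={"trace_id": trace_id, "fields": fields})
```

Generating one `uuid.uuid4().hex` per incoming question and passing it to every `log_event` call lets a single grep over logs/app.log reconstruct the path a query took through the agents.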
Uncomment in .env:
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=lsv2_your_key_here
LANGCHAIN_PROJECT=bigdata-qna
All LangChain/LangGraph calls are automatically traced in LangSmith (free tier).
OTEL_ENABLED=true
OTEL_EXPORTER=console # or "otlp" for a collector
OTEL_ENDPOINT=http://localhost:4317
Per-agent spans with attributes (query, doc_count, latency) are exported to the console or to an OTLP collector.
| Path | Role |
|---|---|
| src/ingestion/ | Download JSONL, preprocess, orchestration |
| src/embeddings/ | sentence-transformers embeddings |
| src/storage/ | DuckDB, FAISS, BM25 persistence |
| src/retrieval/ | Hybrid search + RRF + optional cross-encoder rerank |
| src/agents/ | Planner, Decomposer, Retriever, Synthesizer, Analyst, Critic, LangGraph |
| src/llm/ | Gemini + Groq routing (get_chat_llm, fallbacks) |
| src/observability/ | Structured logging + OTel tracing |
| src/ui/ | Gradio app with tabbed trace/observability UI |
| src/api/ | FastAPI JSON API + minimal static client |
| evaluation/ | Questions, metrics, batch eval, ablation, answer quality, SQL accuracy |
| scripts/ | ingest.py, query.py, evaluate.py, compile_requirements.py, … |
| tests/ | Unit tests (metrics, RRF, preprocessor, SQL safety, agents, API smoke) |
| requirements.in / requirements.lock | Pinned deps for Docker / CI (pip-compile) |
| .github/workflows/ | CI: ruff, mypy, pytest, Docker build |
| Dockerfile | Container build for deployment |
| render.yaml | Render deployment config |
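The src/retrieval/ row mentions RRF (Reciprocal Rank Fusion), which merges the FAISS and BM25 rankings by summing 1 / (k + rank) for each document across lists. A minimal sketch follows; the constant k=60 is the value commonly used for RRF and is an assumption here, as is the tie-breaking rule. `rrf_fuse` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple


def rrf_fuse(
    rankings: Sequence[Sequence[str]],
    k: int = 60,
    top_n: Optional[int] = None,
) -> List[Tuple[str, float]]:
    """Fuse ranked id lists; each list contributes 1 / (k + rank) per document."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by id so the output is deterministic.
    fused = sorted(scores.items(), key=lambda item: (-item[1], item[0]))
    return fused if top_n is None else fused[:top_n]
```

Because only ranks are used, the dense similarity scores and BM25 scores never need to be put on a common scale; the fused top-N can then be passed to the optional cross-encoder for reranking.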
- ARCHITECTURE.md -- diagrams and data flow
- ADVANCED.md -- retrieval theory (RRF, cross-encoder), evaluation nuance, talking points
- EVALUATION_REPORT.md -- metrics (after you run eval)
- ANY_OTHER_DIAGRAMS.md -- sequence diagrams, retrieval fusion, evaluation pipeline
Record a short walkthrough (complex query + a failure case) and paste the link in DEMO_VIDEO/README.md. Optional until you record it -- everything else can be submitted without the video.
- Data platform: ingestion + vector (FAISS) + hybrid (SQL + vector + BM25)
- Retrieval: embeddings + similarity; bonus hybrid search + reranking + metadata filtering
- Agents: Planner, Decomposer, Retriever, Synthesizer, Analyst, Critic (LangGraph multi-agent)
- Query complexity: multi-hop (decomposition), aggregations (SQL-first), comparisons (decompose per entity)
- Evaluation:
  - Retrieval: Recall@K, Precision@K, Hit@K, MRR@K, nDCG@K with eval_mode separation
  - Ablation study: vector-only vs BM25 vs hybrid vs hybrid+CE
  - Answer quality: batch critic scores (mean 4.79/5)
  - SQL accuracy: reference answers for aggregation questions
  - Category-filtered retrieval for fair single-category evaluation
- Observability: structured logs with trace_id + optional LangSmith + optional OpenTelemetry
- Deployment: Docker + Render
Educational / assignment use. Dataset subject to McAuley-Lab / Amazon license on Hugging Face.