Real-time anomaly detection over live data streams using Kafka, PyOD ensemble ML, FastAPI, and Grafana.
Ingests three types of real-world data streams, scores every event using an ensemble of three ML models, and surfaces anomalies in real time via a REST API, WebSocket feed, and Grafana dashboard — all orchestrated with Docker Compose.
```
[Server Metrics] ──┐
[Transactions]   ──┼──► Kafka ──► PyOD Ensemble ──► FastAPI + WebSocket
[IoT Sensors]    ──┘                      │
                                          └──► PostgreSQL ──► Grafana
```
| Stream | Features | Anomaly type |
|---|---|---|
| `server_metrics` | CPU %, memory %, latency ms, error rate | Incident / outage spike |
| `transactions` | Amount, hour of day, tx frequency | Fraud signal |
| `iot_sensors` | Temperature, vibration, pressure, humidity | Equipment fault |
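For illustration, one event from each stream might look like the following. All field names here are assumptions for the sketch; the actual schemas live in `producer/simulator.py`.

```python
# Hypothetical event payloads, one per stream. Field names are
# illustrative and may not match producer/simulator.py exactly.
server_event = {
    "stream": "server_metrics",
    "cpu_pct": 87.5,
    "memory_pct": 91.2,
    "latency_ms": 430.0,
    "error_rate": 0.08,
}

transaction_event = {
    "stream": "transactions",
    "amount": 1249.99,
    "hour_of_day": 3,
    "tx_frequency": 14,
}

iot_event = {
    "stream": "iot_sensors",
    "temperature": 78.4,
    "vibration": 0.92,
    "pressure": 101.3,
    "humidity": 55.0,
}
```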
Three PyOD detectors vote on every event. Majority rules.
| Model | Strength |
|---|---|
| IForest (Isolation Forest) | Global outliers — sudden spikes |
| LOF (Local Outlier Factor) | Local density anomalies |
| HBOS (Histogram-Based) | Fast univariate anomalies |
A normalised ensemble score (0–1) is computed for every event. Score ≥ 0.6 → anomaly.
The detector warms up on the first 200 events per stream, then re-fits every 500 events to handle concept drift.
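The normalise-and-vote step can be sketched as follows. This is a minimal illustration assuming each detector exposes raw outlier scores per event (as PyOD's `decision_function` does); it is not the repo's exact implementation in `detector/engine.py`, which may normalise differently.

```python
import numpy as np

def normalise(raw: np.ndarray) -> np.ndarray:
    """Min-max scale one detector's raw scores into [0, 1]."""
    lo, hi = float(raw.min()), float(raw.max())
    if hi == lo:                          # degenerate case: constant scores
        return np.zeros_like(raw, dtype=float)
    return (raw - lo) / (hi - lo)

def ensemble_score(raw_scores: list, threshold: float = 0.6):
    """Average the normalised per-model scores; flag events >= threshold."""
    norm = np.mean([normalise(r) for r in raw_scores], axis=0)
    return norm, norm >= threshold

def majority_vote(raw_scores: list, per_model_threshold: float = 0.6):
    """Each detector votes independently; a strict majority flags the event."""
    votes = np.stack([normalise(r) >= per_model_threshold for r in raw_scores])
    return votes.sum(axis=0) > len(raw_scores) // 2
```

With three raw-score arrays from IForest, LOF, and HBOS, `ensemble_score` yields the 0–1 ensemble score and `majority_vote` the 2-of-3 decision.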
| Layer | Technology |
|---|---|
| Message broker | Apache Kafka + Zookeeper |
| ML detection | PyOD (IForest + LOF + HBOS) |
| API | FastAPI + WebSockets |
| Database | PostgreSQL + SQLAlchemy |
| Dashboard | Grafana (auto-provisioned) |
| Testing | Pytest — 44 tests |
| Infrastructure | Docker Compose |
```bash
git clone https://github.com/YOUR_USERNAME/anomaly-detection
cd anomaly-detection
cp .env.example .env
docker compose up --build
```

Services started:
| Service | URL |
|---|---|
| FastAPI docs | http://localhost:8000/docs |
| Grafana | http://localhost:3000 (admin/admin) |
| Kafka | localhost:9092 |
| PostgreSQL | localhost:5432 |
| Endpoint | Description |
|---|---|
| `GET /health` | Liveness + DB check |
| `GET /api/anomalies` | Paginated anomaly history |
| `GET /api/anomalies?stream=server_metrics` | Filter by stream |
| `GET /api/anomalies?min_score=0.8` | Filter by severity |
| `GET /api/anomalies/{id}` | Single record |
| `GET /api/anomalies/stream/{stream}` | Stream history |
| `GET /api/stats` | Per-stream summary |
| `DELETE /api/anomalies/{id}` | Delete record |
| `WS /ws/alerts` | Real-time anomaly push |
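A minimal consumer for the alert feed could look like the sketch below. It assumes the third-party `websockets` package (`pip install websockets`) and a JSON payload carrying `stream` and `score` fields; the payload shape is an assumption about the broadcast format, not confirmed by the repo.

```python
import asyncio
import json

def parse_alert(raw: str) -> dict:
    """Decode one alert frame. The stream/score field names are assumptions."""
    alert = json.loads(raw)
    return {
        "stream": alert.get("stream"),
        "score": float(alert.get("score", 0.0)),
    }

async def listen(url: str = "ws://localhost:8000/ws/alerts") -> None:
    # Lazy import so parse_alert stays usable without the dependency.
    import websockets
    async with websockets.connect(url) as ws:
        async for message in ws:
            alert = parse_alert(message)
            if alert["score"] >= 0.8:
                print(f"HIGH severity on {alert['stream']}: {alert['score']:.2f}")

if __name__ == "__main__":
    asyncio.run(listen())
```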
```bash
pip install -r requirements.txt
pytest tests/ -v
```

```
44 passed in 11.2s ✅
```

All 44 tests run with no Kafka, no Postgres, no network — fully isolated.
```
anomaly-detection/
├── producer/
│   └── simulator.py        # Three synthetic data stream generators
├── detector/
│   ├── models.py           # PyOD ensemble (IForest + LOF + HBOS)
│   ├── engine.py           # Warm-up buffer + scoring logic
│   └── consumer.py         # Kafka consumer loop
├── api/
│   ├── main.py             # FastAPI + WebSocket broadcaster
│   ├── database.py         # SQLAlchemy + PostgreSQL
│   └── routers/
│       └── anomalies.py    # REST endpoints
├── grafana/
│   └── provisioning/       # Auto-provisioned datasource + dashboard
├── tests/                  # 44 pytest tests
├── docker-compose.yml      # Full stack: Kafka + Postgres + API + Grafana
├── Dockerfile
└── requirements.txt
```
After `docker compose up`, open http://localhost:3000 (admin / admin).
The PostgreSQL datasource is auto-provisioned. Create a panel with:
```sql
SELECT timestamp, stream, score
FROM anomalies
WHERE $__timeFilter(created_at)
ORDER BY created_at DESC
```

- Add a new stream — add features to `STREAM_FEATURES` in `detector/models.py` and a new generator in `producer/simulator.py`
- Swap a model — replace any detector in `EnsembleDetector.__post_init__` with any PyOD model
- Add email/Slack alerts — hook into the `anomaly_queue` in `detector/consumer.py`
- Add TimescaleDB — swap the PostgreSQL image for `timescale/timescaledb` for better time-series queries
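A new stream generator might look like the following sketch: a hypothetical fourth stream (`network_traffic`), with invented field names, that occasionally injects an obvious spike so the detectors have something to find. The real generators in `producer/simulator.py` may be structured differently.

```python
import random
from typing import Iterator

def network_traffic_stream(anomaly_rate: float = 0.02) -> Iterator[dict]:
    """Hypothetical extra stream: yields one event dict per iteration.
    With probability `anomaly_rate`, throughput fields spike 10x."""
    while True:
        spike = random.random() < anomaly_rate
        factor = 10.0 if spike else 1.0
        yield {
            "stream": "network_traffic",
            "packets_per_sec": random.gauss(5000, 500) * factor,
            "bytes_per_sec": random.gauss(4e6, 4e5) * factor,
            "active_connections": random.randint(100, 500),
        }
```

The matching change on the detector side would be registering the same feature names for `network_traffic` in `STREAM_FEATURES`.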
MIT
Raj Mandaliya — github.com/RajMandaliya