The Market Data Service is a production-ready microservice that fetches market data from external providers, processes it through a streaming pipeline, and serves it via REST APIs. Raw prices are published to Kafka as real-time events and turned into derived metrics such as moving averages. The service is built with FastAPI, PostgreSQL, and Apache Kafka, and is designed for scalability, reliability, and modularity.
Features:
- Market Data Fetching:
  - Fetches real-time price data for specified symbols from external providers (e.g., Yahoo Finance).
  - Supports configurable polling intervals.
- Streaming Pipeline:
  - Publishes raw price data to the Kafka `price-events` topic.
  - Computes 5-point moving averages and stores the results in the `symbol_averages` table.
- REST API:
  - Provides endpoints for fetching the latest price and scheduling polling jobs.
  - Serves OpenAPI documentation for easy integration.
- Database Integration:
  - Stores raw market data, processed price points, polling job configurations, and moving averages.
  - Indexes the `timestamp` and `symbol` columns to speed up lookups.
- Dockerized Deployment:
  - Fully containerized with Docker Compose for local development and production environments.
  - Includes PostgreSQL, Kafka, and FastAPI services.
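The polling and streaming features can be pictured as a loop that fetches each configured symbol on a fixed interval and publishes every result as a `price-events` message. The sketch below is a hypothetical illustration, not the service's actual poller: `fetch_price` and `publish` are injected callables standing in for the provider client and the Kafka producer, the event fields mirror the `GET /prices/latest` response, and `interval` is assumed to be in seconds.

```python
import json
import time
from typing import Callable, Dict, List


def build_price_event(symbol: str, price: float, timestamp: str, provider: str) -> bytes:
    """Serialize one price observation as a JSON message (hypothetical event schema)."""
    event = {"symbol": symbol, "price": price, "timestamp": timestamp, "provider": provider}
    return json.dumps(event).encode("utf-8")


def run_polling_job(
    symbols: List[str],
    interval: float,
    provider: str,
    fetch_price: Callable[[str, str], Dict],  # hypothetical provider client
    publish: Callable[[str, bytes], None],  # hypothetical Kafka producer wrapper
    iterations: int,
) -> int:
    """Poll every symbol `iterations` times, sleeping `interval` seconds between rounds.

    Returns the number of events published.
    """
    published = 0
    for round_number in range(iterations):
        for symbol in symbols:
            quote = fetch_price(symbol, provider)
            message = build_price_event(symbol, quote["price"], quote["timestamp"], provider)
            publish("price-events", message)
            published += 1
        # Skip the sleep after the final round.
        if round_number < iterations - 1:
            time.sleep(interval)
    return published
```

Injecting the fetch and publish functions keeps the loop testable without a network connection or a running broker; a real poller would presumably run until its job is cancelled rather than for a fixed number of rounds.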
Architecture:

```mermaid
sequenceDiagram
    participant C as Client
    participant A as FastAPI
    participant M as Market API
    participant K as Kafka
    participant MA as MA Consumer
    participant DB as PostgreSQL
    C->>A: GET /prices/latest
    A->>DB: Check cache
    alt Cache miss
        A->>M: Fetch latest price
        M-->>A: Price data
        A->>DB: Store raw response
        A->>K: Produce price event
    end
    A-->>C: Return price
    K->>MA: Consume price event
    MA->>DB: Fetch last 5 prices
    MA->>MA: Calculate MA
    MA->>DB: Store MA result
```
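The MA consumer's steps in the diagram (fetch the last 5 prices, average them, store the result) reduce to a rolling window per symbol. This sketch keeps that window in memory with `collections.deque(maxlen=5)` as a stand-in for the database query the real consumer performs; the class and method names are hypothetical, and returning no average until five prices have arrived is an assumption about how short histories are handled.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional

WINDOW = 5  # 5-point moving average, as described in the features list


class MovingAverageConsumer:
    """Hypothetical in-memory stand-in for the MA consumer."""

    def __init__(self, window: int = WINDOW) -> None:
        self.window = window
        # One bounded buffer per symbol; appending past maxlen discards the oldest price.
        self.prices: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=self.window))

    def on_price_event(self, symbol: str, price: float) -> Optional[float]:
        """Record a price and return the moving average once the window is full."""
        buffer = self.prices[symbol]
        buffer.append(price)
        if len(buffer) < self.window:
            return None  # not enough history yet
        return sum(buffer) / self.window
```

Feeding prices 1 through 6 for one symbol yields no average for the first four events, then 3.0 and 4.0; each symbol keeps its own window.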
API Endpoints:

`GET /prices/latest`
Fetch the latest price for a given symbol.

Query Parameters:
- `symbol` (required): The stock symbol (e.g., `AAPL`).
- `provider` (optional): The data provider (default: Yahoo Finance).

Response:
```json
{
  "symbol": "AAPL",
  "price": 150.25,
  "timestamp": "2024-03-20T10:30:00Z",
  "provider": "yahoo_finance"
}
```

Schedule a polling job for specified symbols.

Request Body:
```json
{
  "symbols": ["AAPL", "MSFT"],
  "interval": 60,
  "provider": "yahoo_finance"
}
```

Response:
```json
{
  "job_id": "poll_123",
  "status": "accepted",
  "config": {
    "symbols": ["AAPL", "MSFT"],
    "interval": 60
  }
}
```
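A minimal client for `GET /prices/latest` needs only the Python standard library. The base URL `http://localhost:8000` matches the Docker setup; the `LatestPrice` dataclass and the helper names are hypothetical, and the field names follow the sample response above.

```python
import json
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Optional


@dataclass
class LatestPrice:
    symbol: str
    price: float
    timestamp: str
    provider: str


def latest_price_url(base_url: str, symbol: str, provider: Optional[str] = None) -> str:
    """Build the request URL; omitting `provider` lets the server apply its default."""
    params = {"symbol": symbol}
    if provider is not None:
        params["provider"] = provider
    return f"{base_url.rstrip('/')}/prices/latest?{urllib.parse.urlencode(params)}"


def parse_latest_price(body: str) -> LatestPrice:
    """Convert the JSON response body into a typed record."""
    data = json.loads(body)
    return LatestPrice(
        symbol=data["symbol"],
        price=float(data["price"]),
        timestamp=data["timestamp"],
        provider=data["provider"],
    )


def get_latest_price(symbol: str, base_url: str = "http://localhost:8000") -> LatestPrice:
    """Fetch and parse the latest price (requires the service to be running)."""
    with urllib.request.urlopen(latest_price_url(base_url, symbol), timeout=10) as resp:
        return parse_latest_price(resp.read().decode("utf-8"))
```

With the containers running, `get_latest_price("AAPL")` should return a `LatestPrice` populated from a response like the sample above.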
Database Schema:
- `polling_job`:
  - Stores configurations for polling jobs.
  - Columns: `id`, `symbols`, `interval`, `provider`, `created_at`, `status`.
- `price_point`:
  - Stores raw price data fetched from external providers.
  - Columns: `id`, `symbol`, `timestamp`, `price`, `provider`, `raw_response_id`.
- `symbol_averages`:
  - Stores computed moving averages for symbols.
  - Columns: `id`, `symbol`, `timestamp`, `average_price`.
- `raw_market_data`:
  - Stores raw responses from external market data providers.
  - Columns: `id`, `symbol`, `timestamp`, `source`, `data`.
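To make the table layout concrete, here is a hypothetical, trimmed-down version of `price_point` and `symbol_averages` built with Python's `sqlite3` module in place of PostgreSQL. The column types, the composite `(symbol, timestamp)` index (one way to realize the indexing described above), and the helper functions are assumptions for illustration; the service itself defines its tables as SQLAlchemy models with Alembic migrations. `last_prices` corresponds to the "Fetch last 5 prices" step in the diagram.

```python
import sqlite3
from typing import List

SCHEMA = """
CREATE TABLE price_point (
    id INTEGER PRIMARY KEY,
    symbol TEXT NOT NULL,
    timestamp TEXT NOT NULL,      -- ISO 8601 UTC strings in one format sort chronologically
    price REAL NOT NULL,
    provider TEXT NOT NULL,
    raw_response_id INTEGER       -- would reference raw_market_data.id
);
CREATE INDEX ix_price_point_symbol_timestamp ON price_point (symbol, timestamp);

CREATE TABLE symbol_averages (
    id INTEGER PRIMARY KEY,
    symbol TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    average_price REAL NOT NULL
);
"""


def last_prices(conn: sqlite3.Connection, symbol: str, n: int = 5) -> List[float]:
    """Return the n most recent prices for a symbol, oldest first."""
    rows = conn.execute(
        "SELECT price FROM price_point WHERE symbol = ? ORDER BY timestamp DESC LIMIT ?",
        (symbol, n),
    ).fetchall()
    return [price for (price,) in reversed(rows)]


def store_average(conn: sqlite3.Connection, symbol: str, timestamp: str, n: int = 5) -> float:
    """Average the last n prices and insert the result into symbol_averages."""
    prices = last_prices(conn, symbol, n)
    if len(prices) < n:
        raise ValueError(f"need {n} prices for {symbol}, found {len(prices)}")
    average = sum(prices) / n
    conn.execute(
        "INSERT INTO symbol_averages (symbol, timestamp, average_price) VALUES (?, ?, ?)",
        (symbol, timestamp, average),
    )
    return average
```

Create an in-memory database with `conn = sqlite3.connect(":memory:")` and `conn.executescript(SCHEMA)` to experiment with the queries.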
Prerequisites:
- Docker and Docker Compose installed.
- Python 3.10+ installed locally (optional, for development).

Setup:
- Clone the repository:
  ```bash
  git clone https://github.com/an-siuu-man/market-data-service
  cd market-data-service
  ```
- Build and start the services:
  ```bash
  docker-compose up --build
  ```
- Access the API documentation by opening http://localhost:8000/docs in your browser.
Troubleshooting:
- Database Connection Refused:
  - Ensure the `db` service is running and accessible.
  - Check the `DATABASE_URL` environment variable for correct credentials.
- Kafka Topic Not Found:
  - Ensure the producer has published at least one message to the `price-events` topic. Publishing creates the topic automatically only if the broker allows automatic topic creation (`auto.create.topics.enable`).
  - Otherwise, create the topic manually using the Kafka CLI tools (e.g., `kafka-topics.sh --create`).
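For the "connection refused" case, a quick first check is to parse `DATABASE_URL` and confirm that it points at the `db` service. The sketch below assumes a URL of the form `postgresql://user:password@db:5432/dbname`; the scheme, port, and database name are assumptions rather than values from this project's configuration, and the function only inspects the string without opening a connection.

```python
import os
from typing import List
from urllib.parse import urlsplit


def check_database_url(url: str, expected_host: str = "db") -> List[str]:
    """Return a list of problems found in a PostgreSQL-style connection URL."""
    problems = []
    parts = urlsplit(url)
    # Accepts variants such as postgresql:// and postgresql+asyncpg://
    if not parts.scheme.startswith("postgres"):
        problems.append(f"unexpected scheme {parts.scheme!r}")
    if parts.hostname != expected_host:
        problems.append(f"host is {parts.hostname!r}, expected {expected_host!r}")
    if not parts.username or parts.password is None:
        problems.append("missing username or password")
    if not parts.path.strip("/"):
        problems.append("missing database name")
    return problems


if __name__ == "__main__":
    issues = check_database_url(os.environ.get("DATABASE_URL", ""))
    print("\n".join(issues) or "DATABASE_URL looks well-formed")
```

Inside the Compose network, containers reach PostgreSQL by the service name `db`; a process running directly on the host would instead use `localhost` (with the port published in `docker-compose.yml`), so pass `expected_host="localhost"` in that case.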
Project Structure:

```
market-data-service/
├── app/
│   ├── api/              # FastAPI routes
│   ├── core/             # Core utilities (e.g., database)
│   ├── models/           # SQLAlchemy models
│   ├── services/         # External integrations (e.g., Kafka, providers)
│   ├── schemas/          # Pydantic schemas
│   └── main.py           # FastAPI entrypoint
├── migrations/           # Alembic migration scripts
├── postgres-data/        # Local Postgres data (for Docker)
├── scripts/              # Standalone scripts (e.g., poller, init_db)
├── myenv/                # Python virtual environment (local)
├── Dockerfile            # Docker build file
├── docker-compose.yml    # Docker Compose config
├── requirements.txt      # Project dependencies
└── README.md             # Project documentation
```
Future Improvements:
- Caching:
  - Add Redis for caching frequently accessed data.
- Rate Limiting:
  - Implement rate limiting for API endpoints.
- Monitoring:
  - Integrate Prometheus and Grafana for system monitoring.
- Deployment:
  - Deploy to AWS or Heroku for production use.
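Rate limiting is planned rather than implemented, so the following reflects no existing code. One common approach is a per-client token bucket, which permits short bursts while capping the sustained request rate. The class below is a framework-agnostic sketch with hypothetical names and parameters; a FastAPI dependency or middleware could call `allow()` with a client key (such as an IP address) before handling each request.

```python
import time
from typing import Callable, Dict, Tuple


class TokenBucketLimiter:
    """Per-key token bucket: bursts of up to `capacity`, refilled at `rate` tokens per second."""

    def __init__(self, capacity: float, rate: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = capacity
        self.rate = rate
        self.clock = clock
        # key -> (tokens remaining, time of last update)
        self.buckets: Dict[str, Tuple[float, float]] = {}

    def allow(self, key: str) -> bool:
        """Consume one token for `key` if available; return whether the request may proceed."""
        now = self.clock()
        tokens, last = self.buckets.get(key, (self.capacity, now))
        # Refill in proportion to elapsed time, never beyond capacity.
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        if tokens >= 1:
            self.buckets[key] = (tokens - 1, now)
            return True
        self.buckets[key] = (tokens, now)
        return False
```

The injectable `clock` makes the limiter deterministic in tests. Because the buckets live in process memory, multiple API replicas would each enforce their own limit; a shared store such as the planned Redis instance would be needed for a global limit.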
Due to limited time and the steep learning curve of tools such as SQLAlchemy, Pydantic, and FastAPI, this project does not currently include a Postman collection, automated testing, or Redis caching. These features are valuable and were recommended in the assignment description, but I decided to focus on core functionality and learning the main frameworks in this implementation.