Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 116 additions & 40 deletions backend/app/api/v1/endpoints/health.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,126 @@
from fastapi import APIRouter, Depends, Response, status
from sqlalchemy import text
from sqlalchemy.orm import Session
import time
import logging

from fastapi import APIRouter, Depends
from redis.asyncio import Redis

from app.core.cache import cache
from app.core.config import settings
from app.db.session import get_db
from app.services.soroban import soroban_server
from app.services.webhook_listener import WebhookListenerService, EventHandler

router = APIRouter()


@router.get("/health")
async def health_check(response: Response, db: Session = Depends(get_db)):
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/health", tags=["health"])
@router.get("/", response_model=dict, summary="Health check for all services")
async def health_check(
db=Depends(get_db),
redis_client: Redis = Depends(lambda: settings.REDIS_URL),
) -> dict:
"""
Health check endpoint to verify server, database, and Redis connectivity.

Returns HTTP 200 when all dependencies are healthy, and HTTP 503 when any
dependency is unavailable. Designed to be consumed by external uptime
monitors (e.g. UptimeRobot) that trigger email/Slack alerts on failure.
Comprehensive health check endpoint.

Returns the health status of:
- Database connectivity
- Redis connectivity
- Soroban RPC server
- Webhook listener service (if enabled)
"""
# Check database connectivity
checks = {
"status": "healthy",
"checks": {},
"version": "1.0.0",
}

# Database health check
try:
db.execute(text("SELECT 1"))
db_status = "healthy"
except Exception:
db_status = "unhealthy"

# Check Redis connectivity
result = await db.execute("SELECT 1")
checks["database"] = {
"status": "healthy",
"details": "Database connection successful",
}
except Exception as e:
checks["database"] = {
"status": "unhealthy",
"error": str(e),
}
checks["status"] = "degraded"

# Redis health check
try:
if cache.redis is None:
redis_status = "unhealthy"
else:
await cache.redis.ping()
redis_status = "healthy"
except Exception:
redis_status = "unhealthy"

overall_healthy = db_status == "healthy" and redis_status == "healthy"
overall_status = "healthy" if overall_healthy else "unhealthy"

if not overall_healthy:
response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE

await redis_client.ping()
checks["redis"] = {
"status": "healthy",
"details": "Redis connection successful",
}
except Exception as e:
checks["redis"] = {
"status": "unhealthy",
"error": str(e),
}
checks["status"] = "degraded"

# Soroban server health check
try:
network_passphrase = settings.SOROBAN_NETWORK_PASSPHRASE
server_info = await soroban_server.server_info()
checks["soroban"] = {
"status": "healthy",
"details": f"Connected to network: {server_info.network_passphrase}",
"network_passphrase": network_passphrase,
}
except Exception as e:
checks["soroban"] = {
"status": "unhealthy",
"error": str(e),
}
checks["status"] = "degraded"

# Webhook listener health check
if settings.WEBHOOK_LISTENER_ENABLED:
try:
# Check Redis cursor storage capability
test_key = f"webhook_listener:health_check:{int(time.time())}"
await redis_client.setex(test_key, 60, "healthy")
await redis_client.delete(test_key)

checks["webhook_listener"] = {
"status": "healthy",
"enabled": True,
"details": "Webhook listener is operational",
"contract_address": settings.ESCROW_CONTRACT_ID or "not configured",
}
except Exception as e:
checks["webhook_listener"] = {
"status": "unhealthy",
"error": str(e),
"enabled": True,
}
checks["status"] = "degraded"
else:
checks["webhook_listener"] = {
"status": "disabled",
"enabled": False,
}

# Overall status determination
unhealthy_count = sum(
1 for check in checks["checks"].values()
if check["status"] == "unhealthy"
)
degraded_count = sum(
1 for check in checks["checks"].values()
if check["status"] == "degraded"
)

if unhealthy_count > 0:
checks["status"] = "unhealthy"
elif degraded_count > 0:
checks["status"] = "degraded"
else:
checks["status"] = "healthy"

return {
"status": overall_status,
"project": settings.PROJECT_NAME,
"database": db_status,
"redis": redis_status,
"debug": settings.DEBUG,
"status": checks["status"],
"checks": checks["checks"],
"timestamp": int(time.time()),
}
22 changes: 7 additions & 15 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

from pydantic import AnyHttpUrl, field_validator
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
# API
API_V1_STR: str = "/api/v1"
Expand Down Expand Up @@ -59,21 +57,15 @@ def assemble_cors_origins(cls, v: str | list[str]) -> list[str] | str:
STRIPE_PUBLISHABLE_KEY: str | None = None

# Soroban Configuration
# Optional vision model configuration for completion verification
VISION_API_URL: str | None = None
VISION_API_KEY: str | None = None
VISION_MODEL: str = "gpt-4o-mini"
JOB_COMPLETION_ACCEPTANCE_THRESHOLD: float = 0.75

# Stellar/Soroban Configuration
STELLAR_NETWORK: str = "standalone" # standalone, testnet, or mainnet
STELLAR_RPC_URL: str = "http://localhost:8002/soroban/rpc"
STELLAR_NETWORK_PASSPHRASE: str = "Standalone Network ; September 2022"
STELLAR_ESCROW_PUBLIC: str | None = None
SOROBAN_RPC_URL: str = "https://soroban-testnet.stellar.org"
SOROBAN_NETWORK_PASSPHRASE: str = "Test SDF Network ; September 2015"
ESCROW_CONTRACT_ID: str | None = None
REPUTATION_CONTRACT_ID: str | None = None

# Webhook Listener Configuration
WEBHOOK_LISTENER_ENABLED: bool = True
EVENT_STREAM_INTERVAL: int = 5 # seconds between poll retries
EVENT_CURSOR_TTL: int = 86400 # 24 hours in seconds

model_config = {"env_file": ".env", "case_sensitive": True, "extra": "ignore"}


settings = Settings()
67 changes: 67 additions & 0 deletions backend/app/db/migrations/004_add_events_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Migration to add events table for tracking on-chain events.

This table stores Soroban events with their processing status
and cursor information for idempotent event handling.
"""

import logging
from sqlalchemy import (
Column,
String,
DateTime,
Text,
Boolean,
Index,
Integer,
Table,
func,
)
from app.db.base import Base

logger = logging.getLogger(__name__)

def upgrade():
"""Create events table."""
events_table = Table(
"events",
Base.metadata,
Column("id", String(64), primary_key=True, index=True),
Column("transaction_hash", String(64), nullable=False, index=True),
Column("contract_address", String(64), nullable=False, index=True),
Column("topic", String(128), nullable=False, index=True),
Column("body", Text, nullable=False),
Column("ledger_sequence", Integer, nullable=False, index=True),
Column("created_at", DateTime(timezone=True), server_default=func.now()),
Column("processed_at", DateTime(timezone=True), nullable=True),
Column("processing_status", String(20), default="pending"),
Column("error_message", Text, nullable=True),
Column("retry_count", Integer, default=0),
)

# Index for efficient lookup of unprocessed events
Index(
"ix_events_unprocessed",
events_table.c.contract_address,
events_table.c.topic,
events_table.c.processing_status,
)

# Index for cursor-based pagination
Index(
"ix_events_cursor",
events_table.c.contract_address,
events_table.c.ledger_sequence,
)

events_table.create(checkfirst=True)
logger.info("Events table created successfully")

def downgrade():
"""Drop events table."""
events_table = Table(
"events",
Base.metadata,
)
events_table.drop(checkfirst=True)
logger.info("Events table dropped successfully")
23 changes: 15 additions & 8 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ async def lifespan(app: FastAPI):
yield
# Shutdown
await cache.close() # Cambio: disconnect() -> close()


app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
Expand Down Expand Up @@ -54,8 +52,6 @@ async def lifespan(app: FastAPI):

# Include API router
app.include_router(api_router, prefix=settings.API_V1_STR)


@app.get("/")
def root():
"""
Expand All @@ -66,9 +62,8 @@ def root():
"version": "1.0.0",
"docs": "/docs",
"health": f"{settings.API_V1_STR}/health",
"webhook_listener": f"{settings.API_V1_STR}/webhook/listener",
}


@app.get("/test-redis")
async def test_redis():
"""Test Redis connection and basic operations"""
Expand All @@ -87,8 +82,6 @@ async def test_redis():
}
except Exception as e:
return {"redis_status": "error", "error": str(e)}


@app.get("/test-db")
async def test_database(db: Session = Depends(get_db)):
"""Test database connection"""
Expand All @@ -104,3 +97,17 @@ async def test_database(db: Session = Depends(get_db)):
}
except Exception as e:
return {"database_status": "error", "error": str(e)}
@app.post("/webhook/listener", summary="Webhook listener endpoint for Soroban events")
async def webhook_listener():
"""
HTTP endpoint to receive webhook notifications from external systems.

This endpoint allows external systems to notify about event updates
that may need to be processed. The main event streaming should still
use the SorobanServer.stream() in WebhookListenerService.
"""
return {
"status": "available",
"message": "Webhook listener is operational",
"note": "Main event streaming runs via SorobanServer.stream()",
}
Loading
Loading