diff --git a/backend/docs/CACHE_PRIMING_STRATEGY.md b/backend/docs/CACHE_PRIMING_STRATEGY.md new file mode 100644 index 0000000..dd7c104 --- /dev/null +++ b/backend/docs/CACHE_PRIMING_STRATEGY.md @@ -0,0 +1,45 @@ +# Cache Priming Strategy + +## Overview +To ensure low-latency performance from the first user request after a deployment or restart, the system implements a **Cache Priming** mechanism. This pre-populates the Redis cache with critical data that would otherwise require expensive computations or external API calls. + +## Priming Priorities + +### High Priority (Hourly) +These entries are critical for the dashboard and main landing pages. +- **Protocol Stats**: Global TVL, volume, and active bridge counts. +- **Bridge Comparisons**: Cross-chain stats for all registered bridges. +- **Major Prices**: Real-time prices for top assets (XLM, USDC, USDT, BTC, ETH). +- **Top Performers**: Health score rankings for the top 10 assets. + +### Low Priority (Daily) +These entries are less frequently accessed but still beneficial to have cached. +- **All Asset Rankings**: Detailed health and liquidity stats for all supported assets. +- **Long-tail Prices**: Prices for all remaining supported assets. + +## Implementation Details + +### Service +The `CachePrimerService` manages the execution of priming tasks. It handles: +- **Fault Tolerance**: Failure in one task does not stop the entire process. +- **Concurrency**: Tasks are executed sequentially or in small batches to avoid overloading external APIs (e.g. CoinGecko rate limits). +- **Metrics**: Every attempt, success, and failure is recorded in Prometheus. + +### Execution Hooks +1. **Startup**: The system runs a `HIGH` priority priming job immediately after the server starts. +2. **Scheduled**: + - **Hourly**: Refreshes high-priority entries. + - **Daily (03:00 UTC)**: Performs a full priming of all entries. 
+ +## Monitoring +Monitor the following metrics in Prometheus/Grafana: +- `cache_priming_total`: Number of priming attempts. +- `cache_priming_success_total`: Successful completions. +- `cache_priming_failure_total`: Failures with error reasons. +- `cache_priming_duration_seconds`: Time taken for each task. + +## Troubleshooting +If failure rates increase: +1. Check upstream API status (Circle, CoinGecko). +2. Verify Redis connectivity. +3. Check logs for specific task errors (e.g. `protocol_stats` failing). diff --git a/backend/src/jobs/cacheWarming.ts b/backend/src/jobs/cacheWarming.ts index 38d66df..8efeadb 100644 --- a/backend/src/jobs/cacheWarming.ts +++ b/backend/src/jobs/cacheWarming.ts @@ -1,49 +1,14 @@ -import { AnalyticsService } from "../services/analytics.service.js"; -import { PriceService } from "../services/price.service.js"; -import { SUPPORTED_ASSETS } from "../config/index.js"; +import { cachePrimerService, CachePriority } from "../services/cachePrimer.service.js"; import { logger } from "../utils/logger.js"; /** - * Cache warming script/job to pre-fetch expensive data. + * Cache warming script to manually trigger priming from CLI. 
*/ -export async function runCacheWarming() { - logger.info("Starting cache warming process..."); - const analyticsService = new AnalyticsService(); - const priceService = new PriceService(); - +export async function runCacheWarming(priority?: CachePriority) { + logger.info({ priority }, "Starting manual cache warming process..."); + try { - // We pass bypassCache=true to force a refresh of the underlying data into Redis - const bypassCache = true; - - // Pre-fetch protocol stats - logger.info("Warming protocol stats"); - await analyticsService.getProtocolStats(bypassCache); - - // Pre-fetch bridge comparisons - logger.info("Warming bridge comparisons"); - await analyticsService.getBridgeComparisons(bypassCache); - - // Pre-fetch asset rankings - logger.info("Warming asset rankings"); - await analyticsService.getAssetRankings(bypassCache); - - // Pre-fetch health/tvl/volume top performers - logger.info("Warming top performers"); - await analyticsService.getTopPerformers("assets", "health", 10, bypassCache); - await analyticsService.getTopPerformers("bridges", "tvl", 10, bypassCache); - - // Provide warm price aggregated data - logger.info("Warming price aggregates for supported assets"); - for (const asset of SUPPORTED_ASSETS) { - if (asset.code !== "native" && asset.code !== "XLM") { - try { - await priceService.getAggregatedPrice(asset.code, bypassCache); - } catch (e) { - logger.warn({ asset: asset.code }, "Could not warm price for asset"); - } - } - } - + await cachePrimerService.prime(priority); logger.info("Cache warming successfully completed"); } catch (error) { logger.error({ error }, "Cache warming failed"); @@ -54,5 +19,6 @@ export async function runCacheWarming() { // Automatically runs if executed as script directly // @ts-expect-error - Required for direct execution script detection if (import.meta.url === `file://${process.argv[1]}`) { - runCacheWarming().then(() => process.exit(0)).catch(() => process.exit(1)); + const priority = process.argv[2] as 
CachePriority; + runCacheWarming(priority).then(() => process.exit(0)).catch(() => process.exit(1)); } diff --git a/backend/src/services/cachePrimer.service.ts b/backend/src/services/cachePrimer.service.ts new file mode 100644 index 0000000..5283c07 --- /dev/null +++ b/backend/src/services/cachePrimer.service.ts @@ -0,0 +1,114 @@ +import { AnalyticsService } from "./analytics.service.js"; +import { PriceService } from "./price.service.js"; +import { SUPPORTED_ASSETS } from "../config/index.js"; +import { logger } from "../utils/logger.js"; +import { getMetricsService } from "./metrics.service.js"; + +export enum CachePriority { + HIGH = "high", + LOW = "low", +} + +export interface PrimingTask { + name: string; + priority: CachePriority; + execute: () => Promise; +} + +export class CachePrimerService { + private analyticsService = new AnalyticsService(); + private priceService = new PriceService(); + private metricsService = getMetricsService(); + + /** + * Prime the cache with all high and low priority entries. 
+ */ + async prime(priorityFilter?: CachePriority): Promise { + const startTime = Date.now(); + logger.info({ priorityFilter }, "Starting cache priming job"); + + const tasks: PrimingTask[] = [ + // HIGH PRIORITY: Protocol Stats + { + name: "protocol_stats", + priority: CachePriority.HIGH, + execute: () => this.analyticsService.getProtocolStats(true), + }, + // HIGH PRIORITY: Bridge Comparisons + { + name: "bridge_comparisons", + priority: CachePriority.HIGH, + execute: () => this.analyticsService.getBridgeComparisons(true), + }, + // HIGH PRIORITY: Top Assets by Health + { + name: "top_assets_health", + priority: CachePriority.HIGH, + execute: () => this.analyticsService.getTopPerformers("assets", "health", 10, true), + }, + // HIGH PRIORITY: Prices for major assets + { + name: "major_prices", + priority: CachePriority.HIGH, + execute: async () => { + const majorAssets = ["XLM", "USDC", "USDT", "BTC", "ETH"]; + await Promise.allSettled( + majorAssets.map((symbol) => this.priceService.getAggregatedPrice(symbol, true)) + ); + }, + }, + // LOW PRIORITY: Remaining Asset Rankings + { + name: "asset_rankings", + priority: CachePriority.LOW, + execute: () => this.analyticsService.getAssetRankings(true), + }, + // LOW PRIORITY: All other prices + { + name: "all_prices", + priority: CachePriority.LOW, + execute: async () => { + const otherAssets = SUPPORTED_ASSETS.filter( + (a) => !["XLM", "USDC", "USDT", "BTC", "ETH"].includes(a.code) && a.code !== "native" + ); + for (const asset of otherAssets) { + await this.priceService.getAggregatedPrice(asset.code, true).catch(() => {}); + } + }, + }, + ]; + + const filteredTasks = priorityFilter + ? 
tasks.filter(t => t.priority === priorityFilter) + : tasks; + + let successCount = 0; + let failureCount = 0; + + for (const task of filteredTasks) { + const taskStart = Date.now(); + try { + this.metricsService.cachePrimingTotal.inc({ task_name: task.name }); + await task.execute(); + const duration = (Date.now() - taskStart) / 1000; + this.metricsService.cachePrimingSuccess.inc({ task_name: task.name }); + this.metricsService.cachePrimingDuration.observe({ task_name: task.name }, duration); + successCount++; + logger.debug({ task: task.name, duration }, "Cache priming task completed"); + } catch (error) { + failureCount++; + const reason = error instanceof Error ? error.message : "unknown"; + this.metricsService.cachePrimingFailure.inc({ task_name: task.name, reason }); + logger.error({ task: task.name, error }, "Cache priming task failed"); + } + } + + const totalDuration = Date.now() - startTime; + logger.info( + { successCount, failureCount, totalDuration }, + "Cache priming job completed" + ); + } +} + +export const cachePrimerService = new CachePrimerService(); diff --git a/backend/src/services/metrics.service.ts b/backend/src/services/metrics.service.ts index 9024e02..1c762f1 100644 --- a/backend/src/services/metrics.service.ts +++ b/backend/src/services/metrics.service.ts @@ -45,6 +45,10 @@ class MetricsService { public cacheMisses: Counter; public cacheSize: Gauge; public cacheEvictions: Counter; + public cachePrimingTotal: Counter; + public cachePrimingSuccess: Counter; + public cachePrimingFailure: Counter; + public cachePrimingDuration: Histogram; // API Key Metrics public apiKeyRequests: Counter; @@ -85,6 +89,10 @@ class MetricsService { this.cacheMisses = undefined as any; this.cacheSize = undefined as any; this.cacheEvictions = undefined as any; + this.cachePrimingTotal = undefined as any; + this.cachePrimingSuccess = undefined as any; + this.cachePrimingFailure = undefined as any; + this.cachePrimingDuration = undefined as any; this.apiKeyRequests 
= undefined as any; this.apiKeyRateLimitHits = undefined as any; this.websocketConnections = undefined as any; @@ -297,6 +305,35 @@ class MetricsService { registers: [this.registry], }); + this.cachePrimingTotal = new Counter({ + name: "cache_priming_total", + help: "Total number of cache priming attempts", + labelNames: ["task_name"], + registers: [this.registry], + }); + + this.cachePrimingSuccess = new Counter({ + name: "cache_priming_success_total", + help: "Total number of successful cache priming tasks", + labelNames: ["task_name"], + registers: [this.registry], + }); + + this.cachePrimingFailure = new Counter({ + name: "cache_priming_failure_total", + help: "Total number of failed cache priming tasks", + labelNames: ["task_name", "reason"], + registers: [this.registry], + }); + + this.cachePrimingDuration = new Histogram({ + name: "cache_priming_duration_seconds", + help: "Duration of cache priming tasks in seconds", + labelNames: ["task_name"], + buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60], + registers: [this.registry], + }); + // API Key Metrics this.apiKeyRequests = new Counter({ name: "api_key_requests_total", diff --git a/backend/src/workers/cachePrimer.job.ts b/backend/src/workers/cachePrimer.job.ts new file mode 100644 index 0000000..5b004af --- /dev/null +++ b/backend/src/workers/cachePrimer.job.ts @@ -0,0 +1,20 @@ +import { Job } from "bullmq"; +import { cachePrimerService, CachePriority } from "../services/cachePrimer.service.js"; +import { logger } from "../utils/logger.js"; + +/** + * Worker processor for cache priming jobs. 
+ */ +export async function processCachePriming(job: Job): Promise { + const { priority } = job.data as { priority?: CachePriority }; + + logger.info({ jobId: job.id, priority }, "Processing cache priming job"); + + try { + await cachePrimerService.prime(priority); + return { success: true, priority }; + } catch (error) { + logger.error({ jobId: job.id, error }, "Cache priming job failed"); + throw error; + } +} diff --git a/backend/src/workers/index.ts b/backend/src/workers/index.ts index 2a668c1..5e28e74 100644 --- a/backend/src/workers/index.ts +++ b/backend/src/workers/index.ts @@ -11,17 +11,18 @@ import { processExternalDependencyMonitor } from "./externalDependencyMonitor.jo import { logger } from "../utils/logger.js"; import { initSupplyVerificationJob } from "../jobs/supplyVerification.job.js"; import { runAuditRetentionJob } from "../jobs/auditRetention.job.js"; -import { runPriceCacheWarmup } from "../jobs/priceCacheWarmup.job.js"; +import { processCachePriming } from "./cachePrimer.job.js"; export async function initJobSystem() { const jobQueue = JobQueue.getInstance(); - // Run price cache warmup on startup + // Run high-priority cache priming on startup try { - logger.info("Running startup price cache warmup"); - await runPriceCacheWarmup(); + const { cachePrimerService, CachePriority } = await import("../services/cachePrimer.service.js"); + logger.info("Running startup high-priority cache priming"); + await cachePrimerService.prime(CachePriority.HIGH); } catch (error) { - logger.error({ error }, "Startup price cache warmup failed, continuing with job initialization"); + logger.error({ error }, "Startup cache priming failed, continuing with job initialization"); } // Initialize worker with processor @@ -57,6 +58,9 @@ export async function initJobSystem() { case "external-dependency-monitor": await processExternalDependencyMonitor(job); break; + case "cache-priming": + await processCachePriming(job); + break; default: logger.warn({ jobName: job.name }, 
"Unknown job name in worker"); } @@ -126,5 +130,9 @@ export async function initJobSystem() { // External dependency checks: every 2 minutes await jobQueue.addRepeatableJob("external-dependency-monitor", {}, "*/2 * * * *"); + // Cache priming: High priority every hour, Full every day at 03:00 UTC + await jobQueue.addRepeatableJob("cache-priming", { priority: "high" }, "0 * * * *"); + await jobQueue.addRepeatableJob("cache-priming", {}, "0 3 * * *"); + logger.info("Scheduled job system initialized"); }