Merged
45 changes: 45 additions & 0 deletions backend/docs/CACHE_PRIMING_STRATEGY.md
@@ -0,0 +1,45 @@
# Cache Priming Strategy

## Overview
To ensure low-latency performance from the first user request after a deployment or restart, the system implements a **Cache Priming** mechanism. This pre-populates the Redis cache with critical data that would otherwise require expensive computations or external API calls.

## Priming Priorities

### High Priority (Hourly)
These entries are critical for the dashboard and main landing pages.
- **Protocol Stats**: Global TVL, volume, and active bridge counts.
- **Bridge Comparisons**: Cross-chain stats for all registered bridges.
- **Major Prices**: Real-time prices for top assets (XLM, USDC, USDT, BTC, ETH).
- **Top Performers**: Health score and TVL rankings for top 10 assets/bridges.

### Low Priority (Daily)
These entries are less frequently accessed but still beneficial to have cached.
- **All Asset Rankings**: Detailed health and liquidity stats for all supported assets.
- **Long-tail Prices**: Prices for all remaining supported assets.
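
The two tiers above map onto `PrimingTask` entries in `CachePrimerService`. A minimal self-contained sketch of the task shape (type names mirror the service; the `execute` body here is a placeholder, not the real implementation):

```typescript
// Priority tiers, mirroring CachePriority in cachePrimer.service.ts.
enum CachePriority {
  HIGH = "high",
  LOW = "low",
}

// A priming task: a name (used as the Prometheus task_name label),
// a priority tier, and the work to run.
interface PrimingTask {
  name: string;
  priority: CachePriority;
  execute: () => Promise<void>;
}

// Example HIGH-priority task (placeholder body).
const protocolStatsTask: PrimingTask = {
  name: "protocol_stats",
  priority: CachePriority.HIGH,
  execute: async () => {
    /* refresh protocol stats into Redis */
  },
};
```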

## Implementation Details

### Service
The `CachePrimerService` manages the execution of priming tasks. It handles:
- **Partial fill**: A failure in one task is recorded but does not abort the remaining tasks.
- **Concurrency**: Tasks run sequentially or in small batches to avoid overloading external APIs (e.g., CoinGecko rate limits).
- **Metrics**: Every attempt, success, and failure is recorded in Prometheus.
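
The partial-fill behaviour can be illustrated with a simplified sketch of the service's task loop (plain counters stand in for the Prometheus metrics):

```typescript
// Run tasks sequentially; a failing task is counted but does not
// abort the remaining tasks (the "partial fill" guarantee).
async function runTasks(
  tasks: Array<{ name: string; execute: () => Promise<void> }>
): Promise<{ success: number; failure: number }> {
  let success = 0;
  let failure = 0;
  for (const task of tasks) {
    try {
      await task.execute();
      success++; // real service: increment cache_priming_success_total
    } catch {
      failure++; // real service: increment cache_priming_failure_total
    }
  }
  return { success, failure };
}
```

A batch in which one task throws still completes the others, so a single upstream outage (say, CoinGecko) does not leave the whole cache cold.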

### Execution Hooks
1. **Startup**: The system runs a `HIGH` priority priming job immediately after the server starts.
2. **Scheduled**:
- **Hourly**: Refreshes high-priority entries.
- **Daily (03:00 UTC)**: Performs a full priming of all entries.
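
The scheduled hooks correspond to two repeatable `cache-priming` jobs registered in `backend/src/workers/index.ts`; their cron expressions are:

```typescript
// Cron schedules for the repeatable "cache-priming" jobs
// (as registered in backend/src/workers/index.ts).
const HOURLY_HIGH_PRIORITY = "0 * * * *"; // top of every hour, HIGH tier only
const DAILY_FULL = "0 3 * * *";           // 03:00 UTC daily, all tiers
```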

## Monitoring
Monitor the following metrics in Prometheus/Grafana:
- `cache_priming_total`: Number of priming attempts.
- `cache_priming_success_total`: Successful completions.
- `cache_priming_failure_total`: Failures with error reasons.
- `cache_priming_duration_seconds`: Time taken for each task.
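
As an example, a Grafana panel for the per-task failure ratio could be built from these counters (hypothetical PromQL, assuming the metric names above):

```promql
# Fraction of priming attempts failing over the last hour, per task
sum by (task_name) (rate(cache_priming_failure_total[1h]))
/
sum by (task_name) (rate(cache_priming_total[1h]))
```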

## Troubleshooting
If failure rates increase:
1. Check upstream API status (Circle, CoinGecko).
2. Verify Redis connectivity.
3. Check logs for specific task errors (e.g. `protocol_stats` failing).
50 changes: 8 additions & 42 deletions backend/src/jobs/cacheWarming.ts
@@ -1,49 +1,14 @@
import { AnalyticsService } from "../services/analytics.service.js";
import { PriceService } from "../services/price.service.js";
import { SUPPORTED_ASSETS } from "../config/index.js";
import { cachePrimerService, CachePriority } from "../services/cachePrimer.service.js";
import { logger } from "../utils/logger.js";

/**
* Cache warming script/job to pre-fetch expensive data.
* Cache warming script to manually trigger priming from CLI.
*/
export async function runCacheWarming() {
logger.info("Starting cache warming process...");
const analyticsService = new AnalyticsService();
const priceService = new PriceService();

export async function runCacheWarming(priority?: CachePriority) {
logger.info({ priority }, "Starting manual cache warming process...");

try {
// We pass bypassCache=true to force a refresh of the underlying data into Redis
const bypassCache = true;

// Pre-fetch protocol stats
logger.info("Warming protocol stats");
await analyticsService.getProtocolStats(bypassCache);

// Pre-fetch bridge comparisons
logger.info("Warming bridge comparisons");
await analyticsService.getBridgeComparisons(bypassCache);

// Pre-fetch asset rankings
logger.info("Warming asset rankings");
await analyticsService.getAssetRankings(bypassCache);

// Pre-fetch health/tvl/volume top performers
logger.info("Warming top performers");
await analyticsService.getTopPerformers("assets", "health", 10, bypassCache);
await analyticsService.getTopPerformers("bridges", "tvl", 10, bypassCache);

// Provide warm price aggregated data
logger.info("Warming price aggregates for supported assets");
for (const asset of SUPPORTED_ASSETS) {
if (asset.code !== "native" && asset.code !== "XLM") {
try {
await priceService.getAggregatedPrice(asset.code, bypassCache);
} catch (e) {
logger.warn({ asset: asset.code }, "Could not warm price for asset");
}
}
}

await cachePrimerService.prime(priority);
logger.info("Cache warming successfully completed");
} catch (error) {
logger.error({ error }, "Cache warming failed");
@@ -54,5 +19,6 @@ export async function runCacheWarming() {
// Automatically runs if executed as script directly
// @ts-expect-error - Required for direct execution script detection
if (import.meta.url === `file://${process.argv[1]}`) {
runCacheWarming().then(() => process.exit(0)).catch(() => process.exit(1));
const priority = process.argv[2] as CachePriority;
runCacheWarming(priority).then(() => process.exit(0)).catch(() => process.exit(1));
}
114 changes: 114 additions & 0 deletions backend/src/services/cachePrimer.service.ts
@@ -0,0 +1,114 @@
import { AnalyticsService } from "./analytics.service.js";
import { PriceService } from "./price.service.js";
import { SUPPORTED_ASSETS } from "../config/index.js";
import { logger } from "../utils/logger.js";
import { getMetricsService } from "./metrics.service.js";

export enum CachePriority {
HIGH = "high",
LOW = "low",
}

export interface PrimingTask {
name: string;
priority: CachePriority;
execute: () => Promise<void>;
}

export class CachePrimerService {
private analyticsService = new AnalyticsService();
private priceService = new PriceService();
private metricsService = getMetricsService();

/**
* Prime the cache with all high and low priority entries.
*/
async prime(priorityFilter?: CachePriority): Promise<void> {
const startTime = Date.now();
logger.info({ priorityFilter }, "Starting cache priming job");

const tasks: PrimingTask[] = [
// HIGH PRIORITY: Protocol Stats
{
name: "protocol_stats",
priority: CachePriority.HIGH,
execute: () => this.analyticsService.getProtocolStats(true),
},
// HIGH PRIORITY: Bridge Comparisons
{
name: "bridge_comparisons",
priority: CachePriority.HIGH,
execute: () => this.analyticsService.getBridgeComparisons(true),
},
// HIGH PRIORITY: Top Assets by Health
{
name: "top_assets_health",
priority: CachePriority.HIGH,
execute: () => this.analyticsService.getTopPerformers("assets", "health", 10, true),
},
// HIGH PRIORITY: Prices for major assets
{
name: "major_prices",
priority: CachePriority.HIGH,
execute: async () => {
const majorAssets = ["XLM", "USDC", "USDT", "BTC", "ETH"];
await Promise.allSettled(
majorAssets.map((symbol) => this.priceService.getAggregatedPrice(symbol, true))
);
},
},
// LOW PRIORITY: Remaining Asset Rankings
{
name: "asset_rankings",
priority: CachePriority.LOW,
execute: () => this.analyticsService.getAssetRankings(true),
},
// LOW PRIORITY: All other prices
{
name: "all_prices",
priority: CachePriority.LOW,
execute: async () => {
const otherAssets = SUPPORTED_ASSETS.filter(
(a) => !["XLM", "USDC", "USDT", "BTC", "ETH"].includes(a.code) && a.code !== "native"
);
for (const asset of otherAssets) {
await this.priceService.getAggregatedPrice(asset.code, true).catch(() => {});
}
},
},
];

const filteredTasks = priorityFilter
? tasks.filter(t => t.priority === priorityFilter)
: tasks;

let successCount = 0;
let failureCount = 0;

for (const task of filteredTasks) {
const taskStart = Date.now();
try {
this.metricsService.cachePrimingTotal.inc({ task_name: task.name });
await task.execute();
const duration = (Date.now() - taskStart) / 1000;
this.metricsService.cachePrimingSuccess.inc({ task_name: task.name });
this.metricsService.cachePrimingDuration.observe({ task_name: task.name }, duration);
successCount++;
logger.debug({ task: task.name, duration }, "Cache priming task completed");
} catch (error) {
failureCount++;
const reason = error instanceof Error ? error.message : "unknown";
this.metricsService.cachePrimingFailure.inc({ task_name: task.name, reason });
logger.error({ task: task.name, error }, "Cache priming task failed");
}
}

const totalDuration = Date.now() - startTime;
logger.info(
{ successCount, failureCount, totalDuration },
"Cache priming job completed"
);
}
}

export const cachePrimerService = new CachePrimerService();
37 changes: 37 additions & 0 deletions backend/src/services/metrics.service.ts
@@ -45,6 +45,10 @@ class MetricsService {
public cacheMisses: Counter;
public cacheSize: Gauge;
public cacheEvictions: Counter;
public cachePrimingTotal: Counter;
public cachePrimingSuccess: Counter;
public cachePrimingFailure: Counter;
public cachePrimingDuration: Histogram;

// API Key Metrics
public apiKeyRequests: Counter;
@@ -85,6 +89,10 @@
this.cacheMisses = undefined as any;
this.cacheSize = undefined as any;
this.cacheEvictions = undefined as any;
this.cachePrimingTotal = undefined as any;
this.cachePrimingSuccess = undefined as any;
this.cachePrimingFailure = undefined as any;
this.cachePrimingDuration = undefined as any;
this.apiKeyRequests = undefined as any;
this.apiKeyRateLimitHits = undefined as any;
this.websocketConnections = undefined as any;
@@ -297,6 +305,35 @@
registers: [this.registry],
});

this.cachePrimingTotal = new Counter({
name: "cache_priming_total",
help: "Total number of cache priming attempts",
labelNames: ["task_name"],
registers: [this.registry],
});

this.cachePrimingSuccess = new Counter({
name: "cache_priming_success_total",
help: "Total number of successful cache priming tasks",
labelNames: ["task_name"],
registers: [this.registry],
});

this.cachePrimingFailure = new Counter({
name: "cache_priming_failure_total",
help: "Total number of failed cache priming tasks",
labelNames: ["task_name", "reason"],
registers: [this.registry],
});

this.cachePrimingDuration = new Histogram({
name: "cache_priming_duration_seconds",
help: "Duration of cache priming tasks in seconds",
labelNames: ["task_name"],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
registers: [this.registry],
});

// API Key Metrics
this.apiKeyRequests = new Counter({
name: "api_key_requests_total",
20 changes: 20 additions & 0 deletions backend/src/workers/cachePrimer.job.ts
@@ -0,0 +1,20 @@
import { Job } from "bullmq";
import { cachePrimerService, CachePriority } from "../services/cachePrimer.service.js";
import { logger } from "../utils/logger.js";

/**
* Worker processor for cache priming jobs.
*/
export async function processCachePriming(job: Job): Promise<any> {
const { priority } = job.data as { priority?: CachePriority };

logger.info({ jobId: job.id, priority }, "Processing cache priming job");

try {
await cachePrimerService.prime(priority);
return { success: true, priority };
} catch (error) {
logger.error({ jobId: job.id, error }, "Cache priming job failed");
throw error;
}
}
18 changes: 13 additions & 5 deletions backend/src/workers/index.ts
@@ -11,17 +11,18 @@ import { processExternalDependencyMonitor } from "./externalDependencyMonitor.jo
import { logger } from "../utils/logger.js";
import { initSupplyVerificationJob } from "../jobs/supplyVerification.job.js";
import { runAuditRetentionJob } from "../jobs/auditRetention.job.js";
import { runPriceCacheWarmup } from "../jobs/priceCacheWarmup.job.js";
import { processCachePriming } from "./cachePrimer.job.js";

export async function initJobSystem() {
const jobQueue = JobQueue.getInstance();

// Run price cache warmup on startup
// Run high-priority cache priming on startup
try {
logger.info("Running startup price cache warmup");
await runPriceCacheWarmup();
const { cachePrimerService, CachePriority } = await import("../services/cachePrimer.service.js");
logger.info("Running startup high-priority cache priming");
await cachePrimerService.prime(CachePriority.HIGH);
} catch (error) {
logger.error({ error }, "Startup price cache warmup failed, continuing with job initialization");
logger.error({ error }, "Startup cache priming failed, continuing with job initialization");
}

// Initialize worker with processor
@@ -57,6 +58,9 @@ export async function initJobSystem() {
case "external-dependency-monitor":
await processExternalDependencyMonitor(job);
break;
case "cache-priming":
await processCachePriming(job);
break;
default:
logger.warn({ jobName: job.name }, "Unknown job name in worker");
}
@@ -126,5 +130,9 @@ export async function initJobSystem() {
// External dependency checks: every 2 minutes
await jobQueue.addRepeatableJob("external-dependency-monitor", {}, "*/2 * * * *");

// Cache priming: High priority every hour, Full every day at 03:00 UTC
await jobQueue.addRepeatableJob("cache-priming", { priority: "high" }, "0 * * * *");
await jobQueue.addRepeatableJob("cache-priming", {}, "0 3 * * *");

logger.info("Scheduled job system initialized");
}