Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS sports_rate_budget;
19 changes: 19 additions & 0 deletions channels/sports/service/migrations/120000000008_rate_budget.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Per-host daily request consumption for the api-sports.io rate budget.
--
-- The in-memory RateLimiter (src/types.rs) budgets 7,500 requests/day per
-- sport host, but a pod restart (deploy, OOM, node drain) used to reset
-- every bucket back to the full quota while the upstream counter only
-- resets at UTC midnight — letting a restarted pod overshoot the daily
-- quota. The service flushes its consumed counts here periodically and
-- seeds the limiter from today's row on startup.
--
-- One row per (host, UTC day). `consumed` is monotonically increasing
-- within a day; rows older than a week are pruned by the daily reset task.

CREATE TABLE IF NOT EXISTS sports_rate_budget (
host TEXT NOT NULL,
day DATE NOT NULL,
consumed INTEGER NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (host, day)
);
77 changes: 75 additions & 2 deletions channels/sports/service/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{env, time::Duration, sync::Arc};
use std::{collections::HashMap, env, time::Duration, sync::Arc};
use anyhow::{Context, Result};
use sqlx::postgres::PgPoolOptions;
pub use sqlx::PgPool;
use sqlx::{FromRow, query, query_as};
use chrono::Utc;
use chrono::{NaiveDate, Utc};
use serde::Deserialize;

/// Build the sqlx migrator for this service.
Expand Down Expand Up @@ -421,6 +421,79 @@ pub async fn cleanup_old_games(pool: &Arc<PgPool>) -> Result<u64> {
Ok(result.rows_affected())
}

// =============================================================================
// Rate budget persistence — sports_rate_budget (host, UTC day) → consumed
// =============================================================================

/// Load today's (UTC) consumed request count per host. Used once on startup
/// to seed the RateLimiter so a mid-day restart resumes from the quota
/// actually spent instead of a fresh full budget.
///
/// Errors are logged and yield an empty map (fail open: the service starts
/// with full budgets, which is exactly the pre-persistence behavior). The
/// header clamp in RateLimiter::update still catches the overshoot.
pub async fn get_consumed_today(pool: &Arc<PgPool>) -> HashMap<String, u32> {
let today = Utc::now().date_naive();
let result: Result<Vec<(String, i32)>, sqlx::Error> = async {
let mut conn = pool.acquire().await?;
let rows = query_as("SELECT host, consumed FROM sports_rate_budget WHERE day = $1")
.bind(today)
.fetch_all(&mut *conn)
.await?;
Ok(rows)
}.await;

match result {
Ok(rows) => rows
.into_iter()
.map(|(host, consumed)| (host, consumed.max(0) as u32))
.collect(),
Err(e) => {
log::error!("Failed to load persisted rate-budget consumption, starting from full quota: {}", e);
HashMap::new()
}
}
}

/// Add per-host consumption deltas to the given UTC day's rows. Additive
/// upsert (`consumed + delta`, not overwrite) so flushes are safe even if
/// another writer touched the row between flushes.
pub async fn add_consumed(
pool: &Arc<PgPool>,
day: NaiveDate,
deltas: &HashMap<String, u32>,
) -> Result<()> {
let mut conn = pool.acquire().await?;
for (host, delta) in deltas {
query(
"INSERT INTO sports_rate_budget (host, day, consumed)
VALUES ($1, $2, $3)
ON CONFLICT (host, day) DO UPDATE SET
consumed = sports_rate_budget.consumed + EXCLUDED.consumed,
updated_at = NOW()"
)
.bind(host)
.bind(day)
.bind(*delta as i32)
.execute(&mut *conn)
.await?;
}
Ok(())
}

/// Delete rate-budget rows older than a week. Only today's row is ever read
/// back; a week of history is kept for operator debugging (e.g. comparing
/// against the api-sports dashboard after a quota incident).
pub async fn prune_rate_budget(pool: &Arc<PgPool>) -> Result<u64> {
let cutoff = Utc::now().date_naive() - chrono::Days::new(7);
let mut conn = pool.acquire().await?;
let result = query("DELETE FROM sports_rate_budget WHERE day < $1")
.bind(cutoff)
.execute(&mut *conn)
.await?;
Ok(result.rows_affected())
}

// =============================================================================
// Game upsert
// =============================================================================
Expand Down
6 changes: 6 additions & 0 deletions channels/sports/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ pub async fn poll_standings(
url = format!("{}&sport={}", url, league.sport_api);
}

// Standings requests don't go through try_consume (has_budget gate
// above), so record them against the persisted consumption count.
rate_limiter.note_consumed(&league.sport_api, 1);
match client.get(&url).send().await {
Ok(resp) => {
if let Some(remaining) = resp.headers()
Expand Down Expand Up @@ -920,6 +923,9 @@ pub async fn poll_teams(
url = format!("{}&sport={}", url, league.sport_api);
}

// Teams requests don't go through try_consume (has_budget gate
// above), so record them against the persisted consumption count.
rate_limiter.note_consumed(&league.sport_api, 1);
match client.get(&url).send().await {
Ok(resp) => {
if let Some(remaining) = resp.headers()
Expand Down
85 changes: 83 additions & 2 deletions channels/sports/service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use sports_service::{
database::initialize_pool,
database::{add_consumed, get_consumed_today, initialize_pool, prune_rate_budget, PgPool},
init::{fatal, spawn_supervised, ReadinessGate, ReadinessSnapshot},
init_sports_service,
log::init_async_logger,
Expand Down Expand Up @@ -52,6 +52,31 @@ const READINESS_BRIDGE_INTERVAL: Duration = Duration::from_secs(10);
/// the per-league budget allocation.
const SPORTS_DAILY_QUOTA: u32 = 7500;

/// How often accumulated per-host request consumption is flushed to the
/// `sports_rate_budget` table. Batched rather than per-request to keep DB
/// write load negligible (at most one upsert per host per interval). An
/// unclean shutdown can lose up to one interval's worth of counts — small,
/// bounded slack compared to the full-quota reset this persistence prevents.
const BUDGET_FLUSH_INTERVAL_SECS: u64 = 60;

/// Persist accumulated per-host consumption to Postgres, attributed to the
/// current UTC day. On write failure the deltas are handed back to the
/// in-memory buffer so the next flush retries them — dropping them would
/// under-count after a restart and let the service overshoot the quota.
async fn flush_consumed(pool: &Arc<PgPool>, rate_limiter: &Arc<RateLimiter>) {
let deltas = rate_limiter.take_consumed();
if deltas.is_empty() {
return;
}
let day = chrono::Utc::now().date_naive();
if let Err(e) = add_consumed(pool, day, &deltas).await {
eprintln!("[Rate Budget] Failed to persist consumed counts, will retry: {e:#}");
for (host, n) in deltas {
rate_limiter.note_consumed(&host, n);
}
}
}

/// Initialize Sentry. The returned guard MUST live for the lifetime of
/// the process — Drop flushes pending events on shutdown. Sentry MUST
/// initialize before the Tokio runtime starts (the crate's docs forbid
Expand Down Expand Up @@ -219,7 +244,24 @@ async fn run_service() -> Result<()> {
// in-season league can borrow from when its reserved budget is
// exhausted. Prevents Champions League knockout nights from starving
// Premier League polls.
let rate_limiter = Arc::new(RateLimiter::new_per_league(&leagues, SPORTS_DAILY_QUOTA));
//
// Budgets are seeded from today's persisted consumption so a pod
// restart mid-day (deploy, OOM, node drain) resumes from the quota
// actually remaining — the upstream counter only resets at UTC
// midnight, while a fresh in-memory limiter would grant itself the
// full 7,500 again.
let consumed_today = get_consumed_today(&pool).await;
if !consumed_today.is_empty() {
println!(
"[Rate Budget] Seeding budgets from persisted consumption: {:?}",
consumed_today
);
}
let rate_limiter = Arc::new(RateLimiter::new_per_league_seeded(
&leagues,
SPORTS_DAILY_QUOTA,
&consumed_today,
));

let client = Arc::new(client);
let leagues = Arc::new(leagues);
Expand Down Expand Up @@ -355,7 +397,35 @@ async fn run_service() -> Result<()> {
}
});

// ── Periodic flush: persist consumed counts to Postgres ──────────
// Drains the RateLimiter's per-host consumption buffer into the
// sports_rate_budget table so the counts survive a pod restart
// (see the seeding above). Final flush on shutdown so a graceful
// termination loses nothing.
let pool_flush = pool.clone();
let rl_flush = rate_limiter.clone();
let cancel_flush = cancel_bg.clone();
spawn_supervised("sports-budget-flush", async move {
println!(
"Starting rate-budget flush loop (every {}s)...",
BUDGET_FLUSH_INTERVAL_SECS
);
loop {
tokio::select! {
_ = cancel_flush.cancelled() => {
flush_consumed(&pool_flush, &rl_flush).await;
println!("Budget flush loop shutting down (final flush done)...");
break;
}
_ = tokio::time::sleep(Duration::from_secs(BUDGET_FLUSH_INTERVAL_SECS)) => {
flush_consumed(&pool_flush, &rl_flush).await;
}
}
}
});

// ── Daily reset: rate budgets at UTC midnight ─────────────────────
let pool_reset = pool.clone();
let leagues_reset = leagues.clone();
let rl_reset = rate_limiter.clone();
let cancel_reset = cancel_bg.clone();
Expand All @@ -377,7 +447,18 @@ async fn run_service() -> Result<()> {
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(wait_secs)) => {
// Drain the consumption buffer before resetting. We're
// just past midnight here, so up to one flush-interval
// of pre-midnight requests lands in the new day's row —
// a small, conservative error (shrinks the new day's
// seed slightly on a later restart).
flush_consumed(&pool_reset, &rl_reset).await;
rl_reset.reset_daily(&leagues_reset, SPORTS_DAILY_QUOTA);
match prune_rate_budget(&pool_reset).await {
Ok(n) if n > 0 => println!("[Rate Budget] Pruned {n} old budget rows"),
Ok(_) => {}
Err(e) => eprintln!("[Rate Budget] Failed to prune old budget rows: {e:#}"),
}
println!("[Rate Budget] Daily reset completed at UTC midnight");
}
}
Expand Down
Loading