
feat: Redis leader election for processor coordination #3

Open
Dafuriousis wants to merge 9 commits into develop from feat/redis-leader-election


@Dafuriousis
Owner

Implements Redis-based leader election for horizontal scaling.

Changes:

  • LeaderElection struct in lock_manager.rs:
    • try_acquire_leadership(): SET NX EX 30s, renews if already leader
    • publish_heartbeat(): processor:heartbeat:{instance_id} key TTL 45s
    • list_active_instances(): scans heartbeat keys
    • current_leader(): reads leader key
  • processor.rs: run_processor_with_leader_election() integrates leader election;
    every instance still runs process_batch, which is safe because rows are
    claimed with FOR UPDATE SKIP LOCKED
  • services/mod.rs: export LeaderElection
  • main.rs: leader election + heartbeat task spawned at startup (10s interval)
  • GET /admin/instances: lists active instances and current leader
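
The lease and heartbeat behaviour listed above can be sketched without a Redis connection. The example below is a minimal in-memory simulation, not the code in lock_manager.rs: `FakeRedis`, the `processor:leader` key name, and the free-function signatures are all assumptions made for illustration. Only the 30s lease, the 45s heartbeat TTL, and the `processor:heartbeat:{instance_id}` key pattern come from the description. Against a real Redis server the check-then-renew step would also need to be atomic; how the PR handles that is not shown here.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory stand-in for the Redis keyspace (hypothetical): key -> (value, expiry).
#[derive(Default)]
struct FakeRedis {
    keys: HashMap<String, (String, Instant)>,
}

impl FakeRedis {
    /// Returns the value only if the key exists and has not expired at `now`.
    fn get(&self, key: &str, now: Instant) -> Option<&str> {
        self.keys
            .get(key)
            .filter(|(_, expires)| *expires > now)
            .map(|(v, _)| v.as_str())
    }

    /// Mimics `SET key value NX EX ttl`: writes only if the key is absent or expired.
    fn set_nx_ex(&mut self, key: &str, value: &str, ttl: Duration, now: Instant) -> bool {
        if self.get(key, now).is_some() {
            return false;
        }
        self.keys.insert(key.to_string(), (value.to_string(), now + ttl));
        true
    }

    /// Mimics `SET key value EX ttl`: unconditional write with a fresh TTL.
    fn set_ex(&mut self, key: &str, value: &str, ttl: Duration, now: Instant) {
        self.keys.insert(key.to_string(), (value.to_string(), now + ttl));
    }
}

const LEADER_KEY: &str = "processor:leader"; // assumed key name, not from the PR
const LEASE_TTL: Duration = Duration::from_secs(30);
const HEARTBEAT_TTL: Duration = Duration::from_secs(45);
const HEARTBEAT_PREFIX: &str = "processor:heartbeat:";

/// Takes the lease if it is free, or renews it if `id` already holds it.
fn try_acquire_leadership(r: &mut FakeRedis, id: &str, now: Instant) -> bool {
    if r.set_nx_ex(LEADER_KEY, id, LEASE_TTL, now) {
        return true;
    }
    if r.get(LEADER_KEY, now) == Some(id) {
        // Not atomic here; a real Redis client would need a script or transaction.
        r.set_ex(LEADER_KEY, id, LEASE_TTL, now);
        return true;
    }
    false
}

/// Refreshes this instance's heartbeat key.
fn publish_heartbeat(r: &mut FakeRedis, id: &str, now: Instant) {
    r.set_ex(&format!("{HEARTBEAT_PREFIX}{id}"), id, HEARTBEAT_TTL, now);
}

/// Lists instances whose heartbeat key is still live, sorted by id.
fn list_active_instances(r: &FakeRedis, now: Instant) -> Vec<String> {
    let mut ids: Vec<String> = r
        .keys
        .iter()
        .filter(|(k, (_, exp))| k.starts_with(HEARTBEAT_PREFIX) && *exp > now)
        .map(|(_, (v, _))| v.clone())
        .collect();
    ids.sort();
    ids
}

/// Reads the current leader, if the lease has not expired.
fn current_leader(r: &FakeRedis, now: Instant) -> Option<String> {
    r.get(LEADER_KEY, now).map(str::to_string)
}

fn main() {
    let mut redis = FakeRedis::default();
    let now = Instant::now();
    for id in ["instance-a", "instance-b"] {
        publish_heartbeat(&mut redis, id, now);
        let is_leader = try_acquire_leadership(&mut redis, id, now);
        println!("{id}: leader = {is_leader}");
    }
    println!("active: {:?}", list_active_instances(&redis, now));
    println!("leader: {:?}", current_leader(&redis, now));
}
```

With a 10s task interval, a healthy leader renews well inside the 30s lease, and a heartbeat key survives up to four missed ticks before the instance drops out of the active list.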

Mac-5 and others added 9 commits April 22, 2026 05:37
- Fix queries.rs: restore missing get_audit_logs body, fix get_daily_totals mapping, add get_asset_stats
- Fix duplicate admin module (remove admin.rs, keep admin/mod.rs)
- Add opentelemetry/tracing-opentelemetry crates to Cargo.toml
- Fix telemetry.rs API for opentelemetry_sdk 0.21 (with_config, force_flush)
- Add profiling_manager field to AppState and all AppState constructors
- Add tenant_configs field + get_tenant_config/load_tenant_configs to AppState
- Add AppState::test_new for integration tests
- Add Debug impl for ApiState (manual, avoids AppSchema constraint)
- Add Clone derive to ProfilingManager
- Fix stellar/client.rs: restore trace propagation imports
- Fix webhook_replay.rs: add missing sqlx::Row import
- Fix startup.rs tests: add missing otlp_endpoint field
- Fix startup_validation_test.rs: add missing otlp_endpoint field
- Add assert_cmd to dev-dependencies
- Add missing down migration files for 4 migrations
- Mark all external-service-dependent tests as #[ignore]
- Add migration to backfill partitions from 2025-01 through 3 months ahead
- Fix dlq_test setup_db to ensure current month partition exists before inserting
…ok-replay-fk-constraint

fix: use composite FK for webhook_replay_history referencing partitio…
…ch sizing

- Back-pressure: AtomicU64 pending_queue_depth refreshed every 5s; callback
  returns 503 + Retry-After when depth >= MAX_PENDING_QUEUE (default 10000)
- Dynamic DB pool: DB_MIN_CONNECTIONS/DB_MAX_CONNECTIONS config (default 5/50);
  pool_monitor_task logs CRITICAL after 3 consecutive checks at >=80% utilization
- Concurrent ProcessorPool: N workers (PROCESSOR_WORKERS default 4) with
  FOR UPDATE SKIP LOCKED, graceful shutdown via watch channel, per-worker metrics
- Adaptive batch sizing: EMA-based BatchSizer scaled by PROCESSOR_SCALING_FACTOR,
  clamped between PROCESSOR_MIN_BATCH and PROCESSOR_MAX_BATCH; exposed in /health
- LeaderElection struct in lock_manager.rs:
  - try_acquire_leadership(): SET NX EX 30s, renews if already leader
  - publish_heartbeat(): processor:heartbeat:{instance_id} key TTL 45s
  - list_active_instances(): scans heartbeat keys
  - current_leader(): reads leader key
- processor.rs: run_processor_with_leader_election() uses LeaderElection;
  all instances run process_batch (SKIP LOCKED safe), leader renews lease
- services/mod.rs: export LeaderElection
- main.rs: spawn leader election + heartbeat task (10s interval)
- admin/mod.rs: GET /admin/instances endpoint lists active instances + leader
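
The adaptive batch sizing item in this commit message can be illustrated with a short sketch. It assumes the EMA is taken over observed queue depth and that the smoothing factor is a constructor argument; the commit message does not say what is averaged or how `alpha` is chosen, so the struct layout and method names below are hypothetical. Only the scale-then-clamp shape comes from the commit text.

```rust
/// Hypothetical EMA-based batch sizer; fields stand in for the
/// PROCESSOR_SCALING_FACTOR / PROCESSOR_MIN_BATCH / PROCESSOR_MAX_BATCH settings.
struct BatchSizer {
    ema: f64,
    alpha: f64,          // smoothing factor in (0, 1]; assumed, not from the PR
    scaling_factor: f64, // stands in for PROCESSOR_SCALING_FACTOR
    min_batch: usize,    // stands in for PROCESSOR_MIN_BATCH
    max_batch: usize,    // stands in for PROCESSOR_MAX_BATCH
}

impl BatchSizer {
    fn new(alpha: f64, scaling_factor: f64, min_batch: usize, max_batch: usize) -> Self {
        // `clamp` panics if min > max, so reject that configuration up front.
        assert!(min_batch <= max_batch, "min_batch must not exceed max_batch");
        Self { ema: 0.0, alpha, scaling_factor, min_batch, max_batch }
    }

    /// Folds a new queue-depth observation into the EMA and returns the
    /// next batch size: EMA times the scaling factor, clamped to the bounds.
    fn observe(&mut self, queue_depth: u64) -> usize {
        self.ema = self.alpha * queue_depth as f64 + (1.0 - self.alpha) * self.ema;
        let scaled = (self.ema * self.scaling_factor).round() as usize;
        scaled.clamp(self.min_batch, self.max_batch)
    }
}

fn main() {
    let mut sizer = BatchSizer::new(0.5, 0.5, 10, 500);
    for depth in [0u64, 400, 10_000] {
        println!("depth {depth} -> batch {}", sizer.observe(depth));
    }
}
```

The clamp keeps a sudden backlog from producing an oversized batch, while the EMA stops the batch size from oscillating on every poll.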
…ckpressure-pool-processor

feat: back-pressure, dynamic pool, concurrent processor, adaptive bat…
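
The back-pressure check summarized in this merge can be sketched as follows. The threshold of 10000 and the 503 + Retry-After response come from the commit message; the `BackPressure` and `Rejection` types, the method names, and the 5-second Retry-After value are assumptions for illustration, and the real handler presumably returns an HTTP response type rather than a plain struct.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const MAX_PENDING_QUEUE: u64 = 10_000; // default named in the commit message
const RETRY_AFTER_SECS: u64 = 5;       // assumed value, not from the PR

/// What the callback handler would turn into an HTTP response (hypothetical type).
#[derive(Debug, PartialEq)]
struct Rejection {
    status: u16,
    retry_after_secs: u64,
}

/// Shared gauge refreshed by a background task and read on each request.
struct BackPressure {
    pending_queue_depth: AtomicU64,
    max_pending: u64,
}

impl BackPressure {
    fn new(max_pending: u64) -> Self {
        Self { pending_queue_depth: AtomicU64::new(0), max_pending }
    }

    /// Called periodically (every 5s in the PR) with a fresh queue count.
    fn refresh(&self, depth: u64) {
        self.pending_queue_depth.store(depth, Ordering::Relaxed);
    }

    /// Rejects with 503 once the cached depth reaches the limit.
    fn check(&self) -> Result<(), Rejection> {
        if self.pending_queue_depth.load(Ordering::Relaxed) >= self.max_pending {
            Err(Rejection { status: 503, retry_after_secs: RETRY_AFTER_SECS })
        } else {
            Ok(())
        }
    }
}

fn main() {
    let bp = BackPressure::new(MAX_PENDING_QUEUE);
    for depth in [9_999u64, 10_000] {
        bp.refresh(depth);
        println!("depth {depth}: {:?}", bp.check());
    }
}
```

Reading a cached atomic keeps the per-request cost to a single load, at the price of the gauge being up to one refresh interval stale.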

3 participants