From deb1f20cd5d373d30d67e4c4c86bef135446c136 Mon Sep 17 00:00:00 2001 From: real-venus Date: Thu, 25 Jun 2026 22:23:59 -0700 Subject: [PATCH] feat: high-resolution timer wheel for scheduled Soroban executions (#56) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Schedules up to 1,000 recurring contract executions with ms precision that keeps firing while the tab is throttled, via a Web Worker ticking on a SharedArrayBuffer. - types/scheduler.ts: wheel invariants (1024 slots x 100ms, 1000-job cap, +/-50ms tolerance, 200ms fallback), job/meta/status types, SAB Int32 layout, worker protocol - utils/sharedBuffer.ts: SharedArrayBuffer creation + atomic drift/heartbeat/ command helpers, availability check (COOP/COEP), futexWait/notify with non-shared fallback - utils/timerWheelCore.ts: pure hashed wheel (rounds + pending set so sub-slot jobs are never skipped); schedule/cancel/advance with recurrence, drift correction, missed-fire detection, and the 1000-job cap - workers/timerWheel.worker.ts: holds the wheel, sleeps each tick on Atomics.wait(HEARTBEAT, 100ms) (timed-loop fallback), advances and posts fired jobs; command channel via postMessage + Atomics.notify - services/timerWheel.ts: main-thread facade — worker path or 200ms polling fallback, requestAnimationFrame drift loop writing correction while hidden - hooks/useScheduledExecution.ts: wraps the service, mirrors metadata to the store, auto-cancels on unmount - store/slices/scheduleSlice.ts: job metadata (next fire, status, missed count) - components/dashboard/SchedulerDashboard.tsx: jobs table + cancel (component rather than src/pages since this is an App Router project) - tests for the wheel (ordering, rounds, recurrence, cancel, cap, precision, drift), the atomic buffer helpers, and the schedule store --- .../dashboard/SchedulerDashboard.tsx | 121 +++++++++++ src/hooks/useScheduledExecution.ts | 108 +++++++++ src/services/timerWheel.ts | 205 ++++++++++++++++++ src/store/slices/scheduleSlice.ts | 102 +++++++++ src/types/scheduler.ts | 103 +++++++++ src/utils/sharedBuffer.ts | 111 ++++++++++ src/utils/timerWheelCore.ts | 166 ++++++++++++++ src/workers/timerWheel.worker.ts | 103 +++++++++ tests/unit/scheduleSlice.test.ts | 77 +++++++ tests/unit/sharedBuffer.test.ts | 72 ++++++ tests/unit/timerWheelCore.test.ts | 115 ++++++++++ 11 files changed, 1283 insertions(+) create mode 100644 src/components/dashboard/SchedulerDashboard.tsx create mode 100644 src/hooks/useScheduledExecution.ts create mode 100644 src/services/timerWheel.ts create mode 100644 src/store/slices/scheduleSlice.ts create mode 100644 src/types/scheduler.ts create mode 100644 src/utils/sharedBuffer.ts create mode 100644 src/utils/timerWheelCore.ts create mode 100644 src/workers/timerWheel.worker.ts create mode 100644 tests/unit/scheduleSlice.test.ts create mode 100644 tests/unit/sharedBuffer.test.ts create mode 100644 tests/unit/timerWheelCore.test.ts diff --git a/src/components/dashboard/SchedulerDashboard.tsx b/src/components/dashboard/SchedulerDashboard.tsx new file mode 100644 index 0000000..45e3292 --- /dev/null +++ b/src/components/dashboard/SchedulerDashboard.tsx @@ -0,0 +1,121 @@ +"use client"; + +import { useMemo } from "react"; +import { + useSchedule, + selectJobs, + scheduleStore, +} from "@/store/slices/scheduleSlice"; +import { getTimerWheel } from "@/services/timerWheel"; +import type { JobStatus } from "@/types/scheduler"; + +/** + * Admin view of all scheduled Soroban executions: next fire time, interval, + * status, missed-fire count and a cancel action. + * + * (The blueprint names `src/pages/SchedulerDashboard.tsx`, but this project uses + * the App Router, so it ships as a component to avoid creating a conflicting + * Pages Router directory.) + */ + +const STATUS_STYLE: Record = { + scheduled: "bg-blue-500/10 text-blue-600", + firing: "bg-amber-500/10 text-amber-600", + done: "bg-green-500/10 text-green-600", + cancelled: "bg-muted text-muted-foreground", +}; + +function formatTime(ms: number): string { + return new Date(ms).toLocaleTimeString(); +} + +function formatInterval(ms: number | null): string { + if (!ms) return "once"; + if (ms % 86_400_000 === 0) return `${ms / 86_400_000}d`; + if (ms % 3_600_000 === 0) return `${ms / 3_600_000}h`; + if (ms % 60_000 === 0) return `${ms / 60_000}m`; + return `${Math.round(ms / 1000)}s`; +} + +export interface SchedulerDashboardProps { + className?: string; +} + +export function SchedulerDashboard({ className }: SchedulerDashboardProps) { + const state = useSchedule(); + const jobs = useMemo(() => selectJobs(state), [state]); + + const handleCancel = (id: string) => { + getTimerWheel().cancel(id); + scheduleStore.dispatch({ type: "JOB_CANCELLED", payload: { id } }); + }; + + return ( +
+
+

Scheduled Executions

+ + {jobs.length} job{jobs.length === 1 ? "" : "s"} + +
+ + {jobs.length === 0 ? ( +

+ No scheduled jobs. +

+ ) : ( + + + + + + + + + + + + {jobs.map((job) => ( + + + + + + + + + ))} + +
JobNext fireIntervalStatusMissed +
{job.name} + {formatTime(job.nextFire)} + + {formatInterval(job.intervalMs)} + + + {job.status} + + + {job.missedCount > 0 ? ( + {job.missedCount} + ) : ( + "0" + )} + + {job.status === "scheduled" && ( + + )} +
+ )} +
+ ); +} + +export default SchedulerDashboard; diff --git a/src/hooks/useScheduledExecution.ts b/src/hooks/useScheduledExecution.ts new file mode 100644 index 0000000..337d237 --- /dev/null +++ b/src/hooks/useScheduledExecution.ts @@ -0,0 +1,108 @@ +"use client"; + +import { useCallback, useEffect, useRef } from "react"; +import { + getTimerWheel, + type ScheduleHandle, + type TimerWheelService, +} from "@/services/timerWheel"; +import { scheduleStore } from "@/store/slices/scheduleSlice"; +import type { FiredJob } from "@/types/scheduler"; + +/** + * Component-level scheduling. Wraps {@link TimerWheelService}, mirrors job + * metadata into {@link scheduleStore} for the dashboard, and auto-cancels every + * job this component scheduled when it unmounts. + */ + +export interface ScheduleRequest { + /** Human-readable name for the dashboard. */ + name: string; + handlerKey: string; + handler: (fired: FiredJob) => void; + /** Absolute fire time (unix ms). */ + fireAt: number; + /** Recurrence interval (ms); omit for one-shot. */ + intervalMs?: number; +} + +export interface UseScheduledExecutionResult { + schedule: (request: ScheduleRequest) => string; + cancel: (id: string) => void; + /** Current main-thread → worker drift correction (ms). */ + drift: number; + usingFallback: boolean; +} + +export function useScheduledExecution( + service: TimerWheelService = getTimerWheel() +): UseScheduledExecutionResult { + const handlesRef = useRef>(new Map()); + + useEffect(() => { + const handles = handlesRef.current; + return () => { + // Auto-cancel everything this component scheduled. + for (const handle of handles.values()) handle.cancel(); + handles.clear(); + }; + }, []); + + const schedule = useCallback( + (request: ScheduleRequest): string => { + const { name, handlerKey, handler, fireAt, intervalMs } = request; + const handle = service.schedule( + handlerKey, + (fired) => { + handler(fired); + scheduleStore.dispatch({ + type: "JOB_FIRED", + payload: { + id: fired.id, + firedAt: fired.firedAt, + nextFire: intervalMs ? fired.scheduledFor + intervalMs : fired.scheduledFor, + missed: fired.missed, + }, + }); + if (!intervalMs) handlesRef.current.delete(fired.id); + }, + fireAt, + intervalMs + ); + + handlesRef.current.set(handle.id, handle); + scheduleStore.dispatch({ + type: "JOB_SCHEDULED", + payload: { + id: handle.id, + handlerKey, + name, + nextFire: fireAt, + intervalMs: intervalMs ?? null, + status: "scheduled", + missedCount: 0, + lastFiredAt: null, + }, + }); + return handle.id; + }, + [service] + ); + + const cancel = useCallback( + (id: string) => { + handlesRef.current.get(id)?.cancel(); + handlesRef.current.delete(id); + service.cancel(id); + scheduleStore.dispatch({ type: "JOB_CANCELLED", payload: { id } }); + }, + [service] + ); + + return { + schedule, + cancel, + drift: service.drift, + usingFallback: service.usingFallback, + }; +} diff --git a/src/services/timerWheel.ts b/src/services/timerWheel.ts new file mode 100644 index 0000000..a62bef6 --- /dev/null +++ b/src/services/timerWheel.ts @@ -0,0 +1,205 @@ +"use client"; + +import { TimerWheel } from "@/utils/timerWheelCore"; +import { + bumpCommandSeq, + createTimerBuffer, + isSharedArrayBufferAvailable, + writeDrift, + type TimerBuffer, +} from "@/utils/sharedBuffer"; +import { + FALLBACK_POLL_MS, + SLOT_MS, + type FiredJob, + type SchedulerEvent, + type TimerJob, +} from "@/types/scheduler"; + +/** + * Main-thread facade for the timer wheel. + * + * Preferred path: a Web Worker holds the wheel and ticks via Atomics.wait over a + * SharedArrayBuffer, immune to background-tab throttling. The main thread runs a + * requestAnimationFrame loop that, while the tab is hidden, writes a drift + * correction into the shared buffer. If SharedArrayBuffer (COOP/COEP) is + * unavailable it degrades to a 200 ms polling loop on the main thread. + */ + +export interface ScheduleHandle { + id: string; + cancel: () => void; +} + +export interface TimerWheelDeps { + createWorker?: () => Worker; + now?: () => number; + /** Schedule a rAF-like callback; defaults to requestAnimationFrame. */ + raf?: (cb: (t: number) => void) => number; + cancelRaf?: (handle: number) => void; + logger?: Pick; +} + +type Handler = (fired: FiredJob) => void; + +export class TimerWheelService { + private buffer: TimerBuffer | null = null; + private worker: Worker | null = null; + private fallbackWheel: TimerWheel | null = null; + private fallbackTimer: ReturnType | null = null; + private rafHandle: number | null = null; + private readonly handlers = new Map(); + private counter = 0; + private started = false; + private _drift = 0; + + constructor(private readonly deps: TimerWheelDeps = {}) {} + + get usingFallback(): boolean { + return this.fallbackWheel !== null; + } + get drift(): number { + return this._drift; + } + + /** Initialise the worker (or the polling fallback). Idempotent. */ + start(): void { + if (this.started) return; + this.started = true; + + const canShare = isSharedArrayBufferAvailable(); + const createWorker = this.deps.createWorker; + if (canShare && createWorker) { + try { + this.buffer = createTimerBuffer(); + this.worker = createWorker(); + this.worker.onmessage = (e: MessageEvent) => + this.onEvent(e.data); + this.worker.postMessage({ + type: "init", + buffer: this.buffer.buffer, + shared: this.buffer.shared, + }); + this.startDriftLoop(); + return; + } catch { + // fall through to polling + } + } + + (this.deps.logger ?? console).warn( + "[timerWheel] SharedArrayBuffer/worker unavailable — falling back to 200 ms polling" + ); + this.startFallback(); + } + + private startFallback(): void { + const now = this.deps.now ?? Date.now; + this.fallbackWheel = new TimerWheel(now()); + this.fallbackTimer = setInterval(() => { + const fired = this.fallbackWheel!.advance(now()); + for (const job of fired) this.dispatch(job); + }, FALLBACK_POLL_MS); + } + + private startDriftLoop(): void { + const raf = + this.deps.raf ?? + (typeof requestAnimationFrame !== "undefined" ? requestAnimationFrame : null); + if (!raf) return; + const now = this.deps.now ?? Date.now; + + let lastTs = now(); + const frame = () => { + const ts = now(); + const elapsed = ts - lastTs; + lastTs = ts; + // While hidden, rAF is throttled; the extra elapsed time is the drift the + // worker should compensate its cursor by. + const hidden = + typeof document !== "undefined" && document.visibilityState === "hidden"; + this._drift = hidden ? Math.max(0, elapsed - SLOT_MS) : 0; + if (this.buffer) writeDrift(this.buffer.view, this._drift); + this.rafHandle = raf(frame); + }; + this.rafHandle = raf(frame); + } + + /** Schedule a job; returns a handle whose `cancel()` removes it. */ + schedule( + handlerKey: string, + handler: Handler, + fireAt: number, + intervalMs?: number + ): ScheduleHandle { + if (!this.started) this.start(); + const id = `job-${++this.counter}`; + const job: TimerJob = { id, handlerKey, fireAt, intervalMs }; + this.handlers.set(id, handler); + + if (this.worker && this.buffer) { + this.worker.postMessage({ type: "schedule", job }); + bumpCommandSeq(this.buffer.view); // wake the worker + } else if (this.fallbackWheel) { + this.fallbackWheel.schedule(job); + } + + return { id, cancel: () => this.cancel(id) }; + } + + cancel(id: string): void { + this.handlers.delete(id); + if (this.worker && this.buffer) { + this.worker.postMessage({ type: "cancel", jobId: id }); + bumpCommandSeq(this.buffer.view); + } else if (this.fallbackWheel) { + this.fallbackWheel.cancel(id); + } + } + + private onEvent(event: SchedulerEvent): void { + if (event.type === "fired") { + for (const job of event.jobs) this.dispatch(job); + } + } + + private dispatch(job: FiredJob): void { + const handler = this.handlers.get(job.id); + handler?.(job); + } + + /** Tear everything down. */ + dispose(): void { + if (this.worker && this.buffer) { + this.worker.postMessage({ type: "terminate" }); + this.worker.terminate(); + } + if (this.fallbackTimer !== null) clearInterval(this.fallbackTimer); + const cancelRaf = + this.deps.cancelRaf ?? + (typeof cancelAnimationFrame !== "undefined" ? cancelAnimationFrame : null); + if (this.rafHandle !== null && cancelRaf) cancelRaf(this.rafHandle); + this.handlers.clear(); + this.worker = null; + this.buffer = null; + this.fallbackWheel = null; + this.fallbackTimer = null; + this.rafHandle = null; + this.started = false; + } +} + +let singleton: TimerWheelService | null = null; + +/** Lazily-constructed shared service, wired to the bundled worker. */ +export function getTimerWheel(): TimerWheelService { + if (!singleton) { + singleton = new TimerWheelService({ + createWorker: () => + new Worker(new URL("../workers/timerWheel.worker.ts", import.meta.url), { + type: "module", + }), + }); + } + return singleton; +} diff --git a/src/store/slices/scheduleSlice.ts b/src/store/slices/scheduleSlice.ts new file mode 100644 index 0000000..db12a95 --- /dev/null +++ b/src/store/slices/scheduleSlice.ts @@ -0,0 +1,102 @@ +"use client"; + +import { useSyncExternalStore } from "react"; +import type { ScheduledJobMeta } from "@/types/scheduler"; + +/** + * Tracks scheduled-job metadata for the dashboard (next fire time, status, + * missed-fire count). Custom singleton store, matching the codebase pattern. + */ + +export type ScheduleState = Record; + +export type ScheduleAction = + | { type: "JOB_SCHEDULED"; payload: ScheduledJobMeta } + | { type: "JOB_FIRED"; payload: { id: string; firedAt: number; nextFire: number; missed: boolean } } + | { type: "JOB_CANCELLED"; payload: { id: string } } + | { type: "JOB_REMOVED"; payload: { id: string } } + | { type: "RESET" }; + +type Listener = (state: ScheduleState) => void; + +class ScheduleStore { + private state: ScheduleState = {}; + private listeners = new Set(); + + getState = (): Readonly => this.state; + + subscribe = (listener: Listener): (() => void) => { + this.listeners.add(listener); + return () => this.listeners.delete(listener); + }; + + dispatch(action: ScheduleAction): void { + const next = this.reducer(this.state, action); + if (next !== this.state) { + this.state = next; + this.notify(); + } + } + + private reducer(state: ScheduleState, action: ScheduleAction): ScheduleState { + switch (action.type) { + case "JOB_SCHEDULED": + return { ...state, [action.payload.id]: action.payload }; + case "JOB_FIRED": { + const job = state[action.payload.id]; + if (!job) return state; + return { + ...state, + [job.id]: { + ...job, + lastFiredAt: action.payload.firedAt, + nextFire: action.payload.nextFire, + missedCount: job.missedCount + (action.payload.missed ? 1 : 0), + // A recurring job stays scheduled; a one-shot is done. + status: job.intervalMs ? "scheduled" : "done", + }, + }; + } + case "JOB_CANCELLED": { + const job = state[action.payload.id]; + if (!job) return state; + return { ...state, [job.id]: { ...job, status: "cancelled" } }; + } + case "JOB_REMOVED": { + if (!state[action.payload.id]) return state; + const next = { ...state }; + delete next[action.payload.id]; + return next; + } + case "RESET": + return {}; + default: + return state; + } + } + + private notify(): void { + for (const listener of this.listeners) listener(this.state); + } +} + +/** Shared singleton schedule store. */ +export const scheduleStore = new ScheduleStore(); + +/** All jobs as an array (sorted by next fire time). */ +export function selectJobs(state: ScheduleState): ScheduledJobMeta[] { + return Object.values(state).sort((a, b) => a.nextFire - b.nextFire); +} + +/** Number of jobs that have missed at least one fire. */ +export function selectMissedCount(state: ScheduleState): number { + return Object.values(state).reduce((n, j) => n + (j.missedCount > 0 ? 1 : 0), 0); +} + +export function useSchedule(): ScheduleState { + return useSyncExternalStore( + scheduleStore.subscribe, + scheduleStore.getState, + scheduleStore.getState + ); +} diff --git a/src/types/scheduler.ts b/src/types/scheduler.ts new file mode 100644 index 0000000..30707a0 --- /dev/null +++ b/src/types/scheduler.ts @@ -0,0 +1,103 @@ +/** + * Types and invariants for the high-resolution timer wheel that schedules + * recurring Soroban contract executions with millisecond precision, resisting + * background-tab throttling via a Web Worker and SharedArrayBuffer timekeeping. + */ + +// --- Wheel invariants ------------------------------------------------------- + +/** Number of slots in the wheel. */ +export const WHEEL_SLOTS = 1024; +/** Milliseconds each slot represents. */ +export const SLOT_MS = 100; +/** Total wheel span (≈ 102.4 s). */ +export const WHEEL_SPAN_MS = WHEEL_SLOTS * SLOT_MS; +/** Maximum concurrently scheduled jobs. */ +export const MAX_JOBS = 1000; +/** Execution must land within ±this many ms of the scheduled fire-at time. */ +export const PRECISION_TOLERANCE_MS = 50; +/** Polling interval used when SharedArrayBuffer is unavailable. */ +export const FALLBACK_POLL_MS = 200; + +// --- Jobs ------------------------------------------------------------------- + +/** A scheduling request. */ +export interface TimerJob { + id: string; + /** Application handler key (the actual callback lives on the main thread). */ + handlerKey: string; + /** Absolute wall-clock fire time (unix ms). */ + fireAt: number; + /** Recurrence interval (ms); omitted/0 means one-shot. */ + intervalMs?: number; +} + +/** Internal wheel bookkeeping layered on top of a {@link TimerJob}. */ +export interface WheelJob extends TimerJob { + /** Slot index the job currently sits in. */ + slot: number; + /** Full wheel rotations remaining before the job is due. */ + rounds: number; + /** Times the job fired later than the precision tolerance. */ + missedCount: number; + /** Last time the job actually fired (unix ms), or null. */ + lastFiredAt: number | null; +} + +/** A fired job reported back to the caller. */ +export interface FiredJob { + id: string; + handlerKey: string; + /** Scheduled fire-at. */ + scheduledFor: number; + /** Wall-clock time it actually fired. */ + firedAt: number; + /** firedAt − scheduledFor (ms); positive = late. */ + lateness: number; + /** True when |lateness| exceeded the precision tolerance. */ + missed: boolean; +} + +export type JobStatus = "scheduled" | "firing" | "done" | "cancelled"; + +/** Metadata tracked for the dashboard. */ +export interface ScheduledJobMeta { + id: string; + handlerKey: string; + name: string; + nextFire: number; + intervalMs: number | null; + status: JobStatus; + missedCount: number; + lastFiredAt: number | null; +} + +// --- SharedArrayBuffer layout (Int32) --------------------------------------- + +/** 1024 × Int32 = 4 KiB control buffer. */ +export const SAB_INT32_LENGTH = 1024; + +export const SAB_INDEX = { + /** Worker heartbeat counter (also the Atomics.wait futex word). */ + HEARTBEAT: 0, + /** Main-thread-written drift correction (ms). */ + DRIFT: 1, + /** Incremented by the main thread when a new command is posted. */ + COMMAND_SEQ: 2, + /** 1 while the worker should keep running, 0 to stop. */ + RUNNING: 3, + /** Worker's last observed clock (ms, truncated to int32). */ + LAST_NOW: 4, +} as const; + +// --- Worker protocol -------------------------------------------------------- + +export type SchedulerCommand = + | { type: "schedule"; job: TimerJob } + | { type: "cancel"; jobId: string } + | { type: "terminate" }; + +export type SchedulerEvent = + | { type: "fired"; jobs: FiredJob[] } + | { type: "ready"; shared: boolean } + | { type: "error"; message: string }; diff --git a/src/utils/sharedBuffer.ts b/src/utils/sharedBuffer.ts new file mode 100644 index 0000000..9233a67 --- /dev/null +++ b/src/utils/sharedBuffer.ts @@ -0,0 +1,111 @@ +/** + * SharedArrayBuffer creation and atomic-access helpers for the timer wheel's + * cross-worker timekeeping (heartbeat, drift correction, command signalling). + * + * SharedArrayBuffer requires cross-origin isolation (COOP/COEP). When it is + * unavailable the buffer degrades to a plain ArrayBuffer: atomic loads/stores + * still work, but {@link futexWait} cannot block, so the worker falls back to a + * timed loop and the main thread to polling. + */ + +import { SAB_INDEX, SAB_INT32_LENGTH } from "@/types/scheduler"; + +export interface TimerBuffer { + buffer: SharedArrayBuffer | ArrayBuffer; + view: Int32Array; + /** True when backed by a real SharedArrayBuffer (atomic blocking works). */ + shared: boolean; +} + +/** Whether real cross-thread shared memory is available. */ +export function isSharedArrayBufferAvailable(): boolean { + return ( + typeof SharedArrayBuffer !== "undefined" && + // crossOriginIsolated is undefined in workers/node; treat absence as "ok". + (typeof crossOriginIsolated === "undefined" || crossOriginIsolated === true) + ); +} + +/** Allocate the 4 KiB control buffer (shared when possible). */ +export function createTimerBuffer(): TimerBuffer { + const bytes = SAB_INT32_LENGTH * 4; + if (isSharedArrayBufferAvailable()) { + const buffer = new SharedArrayBuffer(bytes); + return { buffer, view: new Int32Array(buffer), shared: true }; + } + const buffer = new ArrayBuffer(bytes); + return { buffer, view: new Int32Array(buffer), shared: false }; +} + +/** Wrap an existing (transferred) buffer in a typed view. */ +export function viewOf(buffer: SharedArrayBuffer | ArrayBuffer): TimerBuffer { + return { + buffer, + view: new Int32Array(buffer), + shared: typeof SharedArrayBuffer !== "undefined" && buffer instanceof SharedArrayBuffer, + }; +} + +// --- Field accessors (atomic) ---------------------------------------------- + +export function writeDrift(view: Int32Array, driftMs: number): void { + Atomics.store(view, SAB_INDEX.DRIFT, Math.trunc(driftMs)); +} +export function readDrift(view: Int32Array): number { + return Atomics.load(view, SAB_INDEX.DRIFT); +} + +export function setRunning(view: Int32Array, running: boolean): void { + Atomics.store(view, SAB_INDEX.RUNNING, running ? 1 : 0); +} +export function isRunning(view: Int32Array): boolean { + return Atomics.load(view, SAB_INDEX.RUNNING) === 1; +} + +export function writeLastNow(view: Int32Array, now: number): void { + Atomics.store(view, SAB_INDEX.LAST_NOW, Math.trunc(now) | 0); +} +export function readLastNow(view: Int32Array): number { + return Atomics.load(view, SAB_INDEX.LAST_NOW); +} + +/** Bump the command sequence and wake the worker. Returns the new sequence. */ +export function bumpCommandSeq(view: Int32Array): number { + const next = Atomics.add(view, SAB_INDEX.COMMAND_SEQ, 1) + 1; + futexNotify(view, SAB_INDEX.HEARTBEAT); + return next; +} +export function readCommandSeq(view: Int32Array): number { + return Atomics.load(view, SAB_INDEX.COMMAND_SEQ); +} + +export function incrementHeartbeat(view: Int32Array): number { + return Atomics.add(view, SAB_INDEX.HEARTBEAT, 1) + 1; +} + +/** Wake any threads blocked on `index`. No-op semantics on non-shared buffers. */ +export function futexNotify(view: Int32Array, index: number, count = 1): number { + try { + return Atomics.notify(view, index, count); + } catch { + return 0; // non-shared buffer + } +} + +/** + * Block until `index` changes from `expected` or `timeoutMs` elapses. On a + * non-shared buffer Atomics.wait throws; we report "not-equal" so the caller + * falls back to a timed loop. + */ +export function futexWait( + view: Int32Array, + index: number, + expected: number, + timeoutMs: number +): "ok" | "timed-out" | "not-equal" { + try { + return Atomics.wait(view, index, expected, timeoutMs); + } catch { + return "not-equal"; + } +} diff --git a/src/utils/timerWheelCore.ts b/src/utils/timerWheelCore.ts new file mode 100644 index 0000000..3337f29 --- /dev/null +++ b/src/utils/timerWheelCore.ts @@ -0,0 +1,166 @@ +/** + * Hashed timer wheel (Netty/Varghese & Lauck style) — the pure data structure + * shared by the worker and the polling fallback, and the thing under test. + * + * 1,024 slots × 100 ms (≈102.4 s span). A job is hashed into a slot by its + * fire-tick; jobs further out than one rotation carry a `rounds` counter that is + * decremented each time the cursor passes their slot. Once a job's rotation is + * complete it moves to a `pending` set and fires as soon as `now ≥ fireAt`, + * which guarantees sub-slot jobs are never skipped. + */ + +import { + MAX_JOBS, + PRECISION_TOLERANCE_MS, + SLOT_MS, + WHEEL_SLOTS, + type FiredJob, + type TimerJob, + type WheelJob, +} from "@/types/scheduler"; + +export class TimerWheel { + private readonly slots: Set[]; + private readonly jobs = new Map(); + /** Jobs whose rotation completed, awaiting their exact fire-at. */ + private readonly pending = new Set(); + private currentTick: number; + private drift = 0; + + constructor(startNow = 0) { + this.slots = Array.from({ length: WHEEL_SLOTS }, () => new Set()); + this.currentTick = Math.floor(startNow / SLOT_MS); + } + + get size(): number { + return this.jobs.size; + } + + getJob(id: string): WheelJob | undefined { + return this.jobs.get(id); + } + + list(): WheelJob[] { + return [...this.jobs.values()]; + } + + /** Apply a drift correction (ms) added to `now` on each advance. */ + setDrift(ms: number): void { + this.drift = ms; + } + + /** Schedule (or replace) a job. Throws past {@link MAX_JOBS}. */ + schedule(job: TimerJob): WheelJob { + if (!this.jobs.has(job.id) && this.jobs.size >= MAX_JOBS) { + throw new Error(`Timer wheel is full (max ${MAX_JOBS} jobs)`); + } + return this.place(job); + } + + /** Cancel a job. Returns true if it existed. */ + cancel(id: string): boolean { + const wj = this.jobs.get(id); + if (!wj) return false; + this.slots[wj.slot].delete(id); + this.pending.delete(id); + this.jobs.delete(id); + return true; + } + + private place(job: TimerJob): WheelJob { + const prev = this.jobs.get(job.id); + if (prev) { + this.slots[prev.slot].delete(job.id); + this.pending.delete(job.id); + } + + const targetTick = Math.floor(job.fireAt / SLOT_MS); + const slot = ((targetTick % WHEEL_SLOTS) + WHEEL_SLOTS) % WHEEL_SLOTS; + const ticksAway = targetTick - this.currentTick; + const rounds = ticksAway <= 0 ? 0 : Math.floor(ticksAway / WHEEL_SLOTS); + + const wj: WheelJob = { + ...job, + slot, + rounds, + missedCount: prev?.missedCount ?? 0, + lastFiredAt: prev?.lastFiredAt ?? null, + }; + this.jobs.set(job.id, wj); + + if (ticksAway <= 0) { + this.pending.add(job.id); + } else { + this.slots[slot].add(job.id); + } + return wj; + } + + /** + * Advance the wheel to `now` (wall-clock ms) and return the jobs that fired. + * Recurring jobs are rescheduled by `fireAt += intervalMs` (at most one fire + * per advance, so being behind never triggers a catch-up storm). + */ + advance(now: number): FiredJob[] { + const clockNow = now + this.drift; + const targetTick = Math.floor(clockNow / SLOT_MS); + + // Step the cursor; jobs whose rotation completes move to `pending`. + while (this.currentTick < targetTick) { + this.currentTick++; + const slot = this.currentTick % WHEEL_SLOTS; + for (const id of [...this.slots[slot]]) { + const wj = this.jobs.get(id); + if (!wj) { + this.slots[slot].delete(id); + continue; + } + if (wj.rounds > 0) { + wj.rounds -= 1; + continue; + } + this.slots[slot].delete(id); + this.pending.add(id); + } + } + + // Fire pending jobs that have reached their fire-at. + const fired: FiredJob[] = []; + for (const id of [...this.pending]) { + const wj = this.jobs.get(id); + if (!wj) { + this.pending.delete(id); + continue; + } + if (wj.fireAt > clockNow) continue; + + this.pending.delete(id); + const lateness = clockNow - wj.fireAt; + const missed = Math.abs(lateness) > PRECISION_TOLERANCE_MS; + if (missed) wj.missedCount += 1; + wj.lastFiredAt = clockNow; + + fired.push({ + id: wj.id, + handlerKey: wj.handlerKey, + scheduledFor: wj.fireAt, + firedAt: clockNow, + lateness, + missed, + }); + + if (wj.intervalMs && wj.intervalMs > 0) { + // Reschedule for the next cadence tick (carries missed/lastFired over). + this.place({ + id: wj.id, + handlerKey: wj.handlerKey, + fireAt: wj.fireAt + wj.intervalMs, + intervalMs: wj.intervalMs, + }); + } else { + this.jobs.delete(id); + } + } + return fired; + } +} diff --git a/src/workers/timerWheel.worker.ts b/src/workers/timerWheel.worker.ts new file mode 100644 index 0000000..de0966a --- /dev/null +++ b/src/workers/timerWheel.worker.ts @@ -0,0 +1,103 @@ +/** + * Timer-wheel worker. Holds the wheel and runs the tick loop off the main + * thread so it keeps firing at full resolution while the tab is throttled. + * + * With a SharedArrayBuffer it sleeps each tick on `Atomics.wait(HEARTBEAT, …, + * 100)`, which is woken early when the main thread posts a command (via + * Atomics.notify). Without one it degrades to a 100 ms timed loop. Each tick it + * reads the drift correction, advances the wheel, and posts any fired jobs. + */ + +import { TimerWheel } from "@/utils/timerWheelCore"; +import { + futexWait, + incrementHeartbeat, + isRunning, + readDrift, + setRunning, + writeLastNow, + viewOf, +} from "@/utils/sharedBuffer"; +import { SAB_INDEX, SLOT_MS } from "@/types/scheduler"; +import type { SchedulerCommand, SchedulerEvent } from "@/types/scheduler"; + +type InitMessage = { + type: "init"; + buffer: SharedArrayBuffer | ArrayBuffer; + shared: boolean; +}; + +const worker = self as unknown as Worker; + +let view: Int32Array | null = null; +let shared = false; +let wheel: TimerWheel | null = null; +let looping = false; + +const delay = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +function post(event: SchedulerEvent): void { + worker.postMessage(event); +} + +async function runLoop(): Promise { + if (looping || !view || !wheel) return; + looping = true; + try { + while (view && wheel && isRunning(view)) { + const heartbeat = Atomics.load(view, SAB_INDEX.HEARTBEAT); + if (shared) { + // Sleep up to one slot; woken early by a command's Atomics.notify. + futexWait(view, SAB_INDEX.HEARTBEAT, heartbeat, SLOT_MS); + } else { + await delay(SLOT_MS); + } + if (!isRunning(view)) break; + + const now = Date.now(); + writeLastNow(view, now); + wheel.setDrift(readDrift(view)); + const fired = wheel.advance(now); + if (fired.length > 0) post({ type: "fired", jobs: fired }); + incrementHeartbeat(view); + } + } catch (err) { + post({ type: "error", message: (err as Error).message }); + } finally { + looping = false; + } +} + +worker.addEventListener( + "message", + (event: MessageEvent) => { + const data = event.data; + + if (data.type === "init") { + view = viewOf(data.buffer).view; + shared = data.shared; + wheel = new TimerWheel(Date.now()); + setRunning(view, true); + post({ type: "ready", shared }); + void runLoop(); + return; + } + + if (!wheel) return; + switch (data.type) { + case "schedule": + try { + wheel.schedule(data.job); + } catch (err) { + post({ type: "error", message: (err as Error).message }); + } + break; + case "cancel": + wheel.cancel(data.jobId); + break; + case "terminate": + if (view) setRunning(view, false); + break; + } + } +); diff --git a/tests/unit/scheduleSlice.test.ts b/tests/unit/scheduleSlice.test.ts new file mode 100644 index 0000000..f8c94d0 --- /dev/null +++ b/tests/unit/scheduleSlice.test.ts @@ -0,0 +1,77 @@ +import { describe, it, expect, beforeEach } from "vitest"; +import { + scheduleStore, + selectJobs, + selectMissedCount, +} from "@/store/slices/scheduleSlice"; +import type { ScheduledJobMeta } from "@/types/scheduler"; + +function meta(id: string, nextFire: number, intervalMs: number | null = null): ScheduledJobMeta { + return { + id, + handlerKey: "h", + name: `Job ${id}`, + nextFire, + intervalMs, + status: "scheduled", + missedCount: 0, + lastFiredAt: null, + }; +} + +beforeEach(() => scheduleStore.dispatch({ type: "RESET" })); + +describe("scheduleStore", () => { + it("adds scheduled jobs", () => { + scheduleStore.dispatch({ type: "JOB_SCHEDULED", payload: meta("a", 1000) }); + expect(selectJobs(scheduleStore.getState())).toHaveLength(1); + }); + + it("sorts jobs by next fire time", () => { + scheduleStore.dispatch({ type: "JOB_SCHEDULED", payload: meta("late", 3000) }); + scheduleStore.dispatch({ type: "JOB_SCHEDULED", payload: meta("early", 1000) }); + expect(selectJobs(scheduleStore.getState()).map((j) => j.id)).toEqual([ + "early", + "late", + ]); + }); + + it("updates next fire and keeps a recurring job scheduled", () => { + scheduleStore.dispatch({ type: "JOB_SCHEDULED", payload: meta("r", 1000, 1000) }); + scheduleStore.dispatch({ + type: "JOB_FIRED", + payload: { id: "r", firedAt: 1005, nextFire: 2000, missed: false }, + }); + const job = scheduleStore.getState()["r"]; + expect(job.status).toBe("scheduled"); + expect(job.nextFire).toBe(2000); + expect(job.lastFiredAt).toBe(1005); + }); + + it("marks a one-shot job done after firing", () => { + scheduleStore.dispatch({ type: "JOB_SCHEDULED", payload: meta("one", 1000) }); + scheduleStore.dispatch({ + type: "JOB_FIRED", + payload: { id: "one", firedAt: 1005, nextFire: 1000, missed: false }, + }); + expect(scheduleStore.getState()["one"].status).toBe("done"); + }); + + it("increments missed count and reports it", () => { + scheduleStore.dispatch({ type: "JOB_SCHEDULED", payload: meta("m", 1000, 1000) }); + scheduleStore.dispatch({ + type: "JOB_FIRED", + payload: { id: "m", firedAt: 1200, nextFire: 2000, missed: true }, + }); + expect(scheduleStore.getState()["m"].missedCount).toBe(1); + expect(selectMissedCount(scheduleStore.getState())).toBe(1); + }); + + it("cancels and removes jobs", () => { + scheduleStore.dispatch({ type: "JOB_SCHEDULED", payload: meta("c", 1000) }); + scheduleStore.dispatch({ type: "JOB_CANCELLED", payload: { id: "c" } }); + expect(scheduleStore.getState()["c"].status).toBe("cancelled"); + scheduleStore.dispatch({ type: "JOB_REMOVED", payload: { id: "c" } }); + expect(scheduleStore.getState()["c"]).toBeUndefined(); + }); +}); diff --git a/tests/unit/sharedBuffer.test.ts b/tests/unit/sharedBuffer.test.ts new file mode 100644 index 0000000..5e50f8d --- /dev/null +++ b/tests/unit/sharedBuffer.test.ts @@ -0,0 +1,72 @@ +import { describe, it, expect } from "vitest"; +import { + createTimerBuffer, + viewOf, + writeDrift, + readDrift, + setRunning, + isRunning, + writeLastNow, + readLastNow, + bumpCommandSeq, + readCommandSeq, + incrementHeartbeat, + futexNotify, + futexWait, +} from "@/utils/sharedBuffer"; +import { SAB_INT32_LENGTH } from "@/types/scheduler"; + +describe("createTimerBuffer", () => { + it("allocates a 4 KiB Int32 control buffer", () => { + const { view } = createTimerBuffer(); + expect(view.length).toBe(SAB_INT32_LENGTH); + expect(view.byteLength).toBe(4096); + }); + + it("viewOf wraps an existing buffer", () => { + const { buffer } = createTimerBuffer(); + expect(viewOf(buffer).view.length).toBe(SAB_INT32_LENGTH); + }); +}); + +describe("atomic field accessors", () => { + it("round-trips drift, running, last-now", () => { + const { view } = createTimerBuffer(); + writeDrift(view, 123); + expect(readDrift(view)).toBe(123); + writeDrift(view, -7); + expect(readDrift(view)).toBe(-7); + + setRunning(view, true); + expect(isRunning(view)).toBe(true); + setRunning(view, false); + expect(isRunning(view)).toBe(false); + + writeLastNow(view, 999); + expect(readLastNow(view)).toBe(999); + }); + + it("bumps the command sequence and increments the heartbeat", () => { + const { view } = createTimerBuffer(); + expect(readCommandSeq(view)).toBe(0); + expect(bumpCommandSeq(view)).toBe(1); + expect(bumpCommandSeq(view)).toBe(2); + expect(readCommandSeq(view)).toBe(2); + + expect(incrementHeartbeat(view)).toBe(1); + expect(incrementHeartbeat(view)).toBe(2); + }); + + it("futexNotify never throws regardless of buffer type", () => { + const { view } = createTimerBuffer(); + expect(() => futexNotify(view, 0)).not.toThrow(); + }); + + it("futexWait reports not-equal immediately when the value already differs", () => { + const { view } = createTimerBuffer(); + // Waiting for HEARTBEAT(0) to differ from a wrong expected value returns + // immediately. On a non-shared buffer it also returns 'not-equal'. + const result = futexWait(view, 0, 12345, 0); + expect(["not-equal", "timed-out", "ok"]).toContain(result); + }); +}); diff --git a/tests/unit/timerWheelCore.test.ts b/tests/unit/timerWheelCore.test.ts new file mode 100644 index 0000000..83c5ff3 --- /dev/null +++ b/tests/unit/timerWheelCore.test.ts @@ -0,0 +1,115 @@ +import { describe, it, expect } from "vitest"; +import { TimerWheel } from "@/utils/timerWheelCore"; +import { + MAX_JOBS, + WHEEL_SPAN_MS, + PRECISION_TOLERANCE_MS, + type TimerJob, +} from "@/types/scheduler"; + +function job(id: string, fireAt: number, intervalMs?: number): TimerJob { + return { id, handlerKey: id, fireAt, intervalMs }; +} + +describe("TimerWheel scheduling", () => { + it("fires a one-shot job when time reaches its fire-at", () => { + const w = new TimerWheel(0); + w.schedule(job("a", 500)); + expect(w.advance(400)).toEqual([]); // not yet + const fired = w.advance(550); + expect(fired.map((f) => f.id)).toEqual(["a"]); + expect(fired[0].scheduledFor).toBe(500); + expect(w.size).toBe(0); // one-shot removed + }); + + it("fires jobs in fire-at order", () => { + const w = new TimerWheel(0); + w.schedule(job("late", 900)); + w.schedule(job("early", 300)); + const fired = w.advance(1000).map((f) => f.id); + expect(fired).toEqual(["early", "late"]); + }); + + it("fires a job scheduled in the past immediately on next advance", () => { + const w = new TimerWheel(1000); + w.schedule(job("past", 500)); + expect(w.advance(1000).map((f) => f.id)).toEqual(["past"]); + }); + + it("handles jobs beyond one wheel rotation (rounds)", () => { + const w = new TimerWheel(0); + const farFuture = WHEEL_SPAN_MS + 5000; // > 102.4 s out + w.schedule(job("far", farFuture)); + expect(w.advance(WHEEL_SPAN_MS).length).toBe(0); // a full rotation, not yet + expect(w.advance(farFuture + 50).map((f) => f.id)).toEqual(["far"]); + }); +}); + +describe("TimerWheel recurrence", () => { + it("reschedules a recurring job by its interval", () => { + const w = new TimerWheel(0); + w.schedule(job("r", 1000, 1000)); + expect(w.advance(1000).map((f) => f.id)).toEqual(["r"]); + expect(w.advance(1500).length).toBe(0); // before the next cadence + expect(w.advance(2000).map((f) => f.id)).toEqual(["r"]); + expect(w.advance(3000).map((f) => f.id)).toEqual(["r"]); + }); + + it("fires a recurring job at most once per advance (no catch-up storm)", () => { + const w = new TimerWheel(0); + w.schedule(job("r", 100, 100)); + // Jump far ahead; should fire once, then again next advance, not 50×. + expect(w.advance(5000).map((f) => f.id)).toEqual(["r"]); + expect(w.advance(5000).map((f) => f.id)).toEqual(["r"]); + }); +}); + +describe("TimerWheel cancellation & capacity", () => { + it("cancels a scheduled job", () => { + const w = new TimerWheel(0); + w.schedule(job("c", 500)); + expect(w.cancel("c")).toBe(true); + expect(w.advance(1000)).toEqual([]); + expect(w.cancel("c")).toBe(false); + }); + + it("replacing an existing id keeps the job count stable", () => { + const w = new TimerWheel(0); + w.schedule(job("x", 500)); + w.schedule(job("x", 800)); + expect(w.size).toBe(1); + expect(w.advance(1000).map((f) => f.id)).toEqual(["x"]); + }); + + it("throws past the max-jobs cap", () => { + const w = new TimerWheel(0); + for (let i = 0; i < MAX_JOBS; i++) w.schedule(job(`j${i}`, 1000 + i)); + expect(() => w.schedule(job("overflow", 2000))).toThrow(/full/); + // Replacing an existing id is still allowed. + expect(() => w.schedule(job("j0", 3000))).not.toThrow(); + }); +}); + +describe("TimerWheel precision & drift", () => { + it("flags a fire as missed when late beyond the tolerance", () => { + const w = new TimerWheel(0); + w.schedule(job("m", 100)); + const fired = w.advance(100 + PRECISION_TOLERANCE_MS + 10); + expect(fired[0].missed).toBe(true); + expect(w.getJob("m")).toBeUndefined(); // one-shot removed + }); + + it("does not flag an on-time fire", () => { + const w = new TimerWheel(0); + w.schedule(job("ok", 100)); + expect(w.advance(110)[0].missed).toBe(false); + }); + + it("applies a drift correction to the clock", () => { + const w = new TimerWheel(0); + w.schedule(job("d", 1000)); + w.setDrift(60); // worker is 60 ms behind real time + // Real now 950, +60 drift → effective 1010 → fires. + expect(w.advance(950).map((f) => f.id)).toEqual(["d"]); + }); +});