feat(hub): multi-node broker dispatch — affinity, durable intent, lifecycle + message routing#305
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Code Review
This pull request introduces PostgreSQL support to Scion, enabling multi-node deployments through Postgres LISTEN/NOTIFY for event publishing and command dispatching. It also adds a database migration command to copy state from SQLite to Postgres, refactors tests to use the Ent-backed memory database, and updates design documentation. The review feedback highlights several critical issues, including potential Windows sharing violations during SQLite file deletion, a stale nil capture of the control channel manager, incorrect parsing of file URLs with triple slashes, and SQLite database initialization failures in Go subtests due to unescaped slashes.
| if err != nil { | ||
| return fmt.Errorf("opening source sqlite: %w", err) | ||
| } | ||
| defer src.Close() |
| // parseSQLiteSourceDSN normalizes the --from value into a modernc.org/sqlite DSN | ||
| // and returns the bare filesystem path (for --drop-source). It accepts: | ||
| // | ||
| // sqlite:///abs/path/hub.db -> /abs/path/hub.db | ||
| // sqlite://rel/path/hub.db -> rel/path/hub.db | ||
| // file:/abs/path/hub.db -> passed through, path extracted | ||
| // /abs/path/hub.db -> bare path | ||
| func parseSQLiteSourceDSN(raw string) (dsn, path string, err error) { | ||
| if raw == "" { |
Explicitly close the source database connection and set src to nil before calling dropSQLiteFile. This prevents deleting a file that is still actively held open by the connection pool, which is especially critical on Windows where it would otherwise fail with a sharing violation.
if migrateDropSource {
	_ = src.Close()
	src = nil
	fmt.Fprintf(out, "Dropping source SQLite file: %s\n", srcPath)
	if err := dropSQLiteFile(srcPath); err != nil {
		return fmt.Errorf("dropping source: %w", err)
	}
	fmt.Fprintln(out, "Source dropped.")
} else {
	fmt.Fprintf(out, "Source left in place: %s\n", srcPath)
}
| mgr := hubSrv.GetControlChannelManager() | ||
| ownsLocally := func(brokerID string) bool { | ||
| if mgr == nil { | ||
| return false | ||
| } | ||
| return mgr.IsConnected(brokerID) | ||
| } |
Evaluate hubSrv.GetControlChannelManager() inside the closure instead of capturing a local variable. If GetControlChannelManager() returns nil at the time newCommandBus is called but is initialized later, the closure would otherwise permanently capture the stale nil value, causing ownsLocally to always return false.
| mgr := hubSrv.GetControlChannelManager() | |
| ownsLocally := func(brokerID string) bool { | |
| if mgr == nil { | |
| return false | |
| } | |
| return mgr.IsConnected(brokerID) | |
| } | |
| ownsLocally := func(brokerID string) bool { | |
| mgr := hubSrv.GetControlChannelManager() | |
| if mgr == nil { | |
| return false | |
| } | |
| return mgr.IsConnected(brokerID) | |
| } |
| case strings.HasPrefix(raw, "file:"): | ||
| path = strings.TrimPrefix(raw, "file:") |
Trim the file:// prefix first before checking for file:. Standard file URLs often use the file:///path/to/db format, and only trimming file: would result in an invalid path like ///path/to/db on Unix-like systems.
case strings.HasPrefix(raw, "file://"):
	path = strings.TrimPrefix(raw, "file://")
case strings.HasPrefix(raw, "file:"):
	path = strings.TrimPrefix(raw, "file:")
| }, | ||
| { | ||
| name: "file url with query", | ||
| in: "file:/tmp/hub.db?cache=shared", | ||
| wantDSN: "file:/tmp/hub.db?cache=shared", | ||
| wantPath: "/tmp/hub.db", |
Add a test case to verify that file URLs with triple slashes (e.g., file:///tmp/hub.db) are correctly parsed and normalized to /tmp/hub.db.
| }, | |
| { | |
| name: "file url with query", | |
| in: "file:/tmp/hub.db?cache=shared", | |
| wantDSN: "file:/tmp/hub.db?cache=shared", | |
| wantPath: "/tmp/hub.db", | |
| { | |
| name: "file url with query", | |
| in: "file:/tmp/hub.db?cache=shared", | |
| wantDSN: "file:/tmp/hub.db?cache=shared", | |
| wantPath: "/tmp/hub.db", | |
| }, | |
| { | |
| name: "file url with triple slashes", | |
| in: "file:///tmp/hub.db", | |
| wantDSN: "file:/tmp/hub.db?cache=shared", | |
| wantPath: "/tmp/hub.db", | |
| }, |
| func newTestStore(t *testing.T) store.Store { | ||
| t.Helper() | ||
| s, err := sqlite.New(":memory:") | ||
| client, err := entc.OpenSQLite("file:"+t.Name()+"?mode=memory&cache=shared", entc.PoolConfig{}) |
Sanitize t.Name() by replacing slashes (/) with underscores. Subtests in Go contain slashes in their names (e.g., TestParent/Subtest), which SQLite interprets as directory paths even for in-memory databases, causing "unable to open database file" errors if the directory does not exist on disk. Note: you may need to import "strings" if it is not already present.
| client, err := entc.OpenSQLite("file:"+t.Name()+"?mode=memory&cache=shared", entc.PoolConfig{}) | |
| dbName := strings.ReplaceAll(t.Name(), "/", "_") | |
| client, err := entc.OpenSQLite("file:"+dbName+"?mode=memory&cache=shared", entc.PoolConfig{}) |
- server_migrate.go: use nil-checked deferred close for src DB, and explicitly close src before dropSQLiteFile to prevent Windows sharing violations
- server_migrate.go: handle file:// prefix before file: to correctly parse file:///path/to/db URLs
- server_foreground.go: evaluate GetControlChannelManager() inside the ownsLocally closure to avoid capturing a stale nil value
- server_migrate_test.go: add test case for file:/// URL format
- server_test.go: sanitize t.Name() slashes in newTestStore to prevent SQLite path errors in subtests
* Add engineering glossary (GLOSSARY.md) with canonical terms and cleanup tracker

Add a root-level GLOSSARY.md capturing canonical Scion terminology in the ubiquitous-language format (preferred term + synonyms to avoid), grouped by domain cluster, plus an Exceptions & Future Cleanup section tracking known naming-convergence work. Link it from agents.md as the canonical engineering glossary.

* Revise glossary: broker reframe, Event Bus, Hub-managed, and term refinements

Refine entries from review: redefine Message Broker as the pluggable messaging-integration system (add Broker plugin, Built-in broker); add Event Bus for the NATS real-time/event capability; collapse hub-native/Hub Workspace into Hub-managed project/workspace; tighten Template (harness-agnostic, optional default harness-config), Skill (template-only, Agent Skills link), Profile (named runtime-broker settings bundle), Harness/Harness-config; reframe Hub as the control plane in both modes; add Group and Message Group. Expand Exceptions & Future Cleanup to nine tracked items.

* Glossary: restructure headings, add cross-refs, modes table, and new terms

- Retitle to "Scion Glossary"; drop the "Language" wrapper and promote the thematic categories to top-level sections
- Add an Operations section (Attach, Dispatch) and move Profile next to Runtime Broker
- Add a Local/Workstation/Hosted comparison table and "See also" cross-refs across the main confusable term clusters
- Reframe the intro around the three-way broker collision (incl. Event Bus) and defer to the disambiguation rule; sentence-case "Shared directory"
- Add canonical entries for Secret, Notification, and Schedule
- Add a "Potential Future Additions" section cataloguing candidate terms

* Glossary: remove Exceptions & Future Cleanup tracker

The cleanup items are now tracked by dedicated agents that open GitHub issues and implementation PRs, so the staged tracker no longer lives in the glossary.
Reword the two intro/disambiguation references that pointed at the removed section to point at GitHub issues instead.

---------

Co-authored-by: Preston Holmes <ptone@google.com>
- Add github.com/jackc/pgx/v5/stdlib (registers as "pgx")
- driver_postgres.go: blank import pgx stdlib instead of lib/pq
- OpenPostgres: open via sql.Open("pgx", dsn) + entsql.OpenDB
- Introduce PoolConfig (applied to *sql.DB); thread through
OpenSQLite/OpenPostgres and update all callers
- go mod tidy drops lib/pq
- DatabaseConfig gains MaxOpenConns / MaxIdleConns / ConnMaxLifetime plus ConnMaxLifetimeDuration() helper
- DefaultGlobalConfig sets sqlite pool defaults (MaxOpenConns=1, load-bearing for write serialization)
- applyDatabasePoolDefaults fills postgres defaults (20/5/30m) and forces sqlite MaxOpenConns=1; called in both load paths
- Mirror fields in V1DatabaseConfig + both conversion directions
- Wire pool settings into entc.OpenSQLite in initStore
P0-3: pkg/store/storetest/ - backend-agnostic, table-driven CRUD oracle. A Factory(t) -> store.Store is injected; generic Domain[T] descriptors drive Create/Read/Update/Delete (+optional soft-delete)/List-paginate/List-filter. Ships group + policy domains and runs green against today's CompositeStore (SQLite base + Ent DB). Ready to accept a postgresFactory for P3-2.

P0-4: internal/fixturegen/ - Go-defined spec seeding >=1 row per table across all 30 domain tables, with edge cases (NULL optionals, max-length strings, nested/unicode JSON, soft-deleted agent, BLOB). Deterministic. 'go run ./internal/fixturegen' emits testdata/hub-v46-fixture.db, prints a 30-table coverage report, and caches the blob to the scratchpad mount. CI gate fails if any table has zero rows.
Add Ent-backed implementations of the notification, GCP service account, GitHub App installation, and user access token store sub-interfaces:
- notification_store.go: NotificationStore (subscriptions, notifications, templates). Dispatch uses an atomic conditional update as the multi-replica claim primitive, and an optional NotificationPublisher designs in the LISTEN/NOTIFY fan-out for created/dispatched events.
- external_store.go: GCPServiceAccountStore + GitHubInstallationStore + UserAccessTokenStore. GitHub create is idempotent (INSERT OR IGNORE semantics), repositories/scopes are JSON, default_scopes is CSV, and tokens support key-hash lookup. Legacy api_keys is intentionally not surfaced.
- storetest: add GCPServiceAccount, SubscriptionTemplate, and NotificationSubscription CRUD-parity domains.
Does not modify composite.go.
- schedule_store.go: ScheduleStore + ScheduledEventStore sub-interfaces with dialect-aware SELECT FOR UPDATE SKIP LOCKED claim helper for the ListDueSchedules / ListPendingScheduledEvents job-claim paths (plain SELECT on SQLite, SKIP LOCKED on Postgres).
- maintenance_store.go: run-state RMW, AbortRunningMaintenanceOps, Go-side seed (uuid.New) replacing SQLite randomblob() UUID seeds.
- message_store.go: CRUD, read flags, PurgeOldMessages, design-in PublishUserMessage hook for Postgres LISTEN/NOTIFY.
- pkg/ent/client_driver.go: hand-written Client.Driver() accessor for dialect detection + raw locking queries.
Implements the Ent-backed store adapters for the user and
allowlist/invite domains, plus their CRUD-parity oracle descriptors.
pkg/store/entadapter/user_store.go (store.UserStore):
- CreateUser/GetUser/GetUserByEmail/UpdateUser/UpdateUserLastSeen/
DeleteUser/ListUsers.
- Case-insensitive email: emails are normalized to lower case on write
(so the plain unique index enforces case-insensitive uniqueness,
equivalent to the legacy UNIQUE COLLATE NOCASE) and matched with
EmailEqualFold (lower(email)=lower($1)) on read. ent codegen +
AutoMigrate cannot emit a real lower(email) functional index across
both SQLite (tests) and Postgres, so the invariant is enforced at the
port layer.
- Offset-based pagination matching the legacy SQLite store.
pkg/store/entadapter/allowlist_store.go (store.AllowListStore +
store.InviteCodeStore):
- Full allow-list + invite-code CRUD.
- BulkAddAllowListEntries uses CreateBulk + OnConflictColumns(email).
Ignore() for race-safe INSERT-OR-IGNORE; added/skipped counts mirror
the legacy per-row semantics (existing + within-batch dups skipped).
- IncrementInviteUseCount is a single atomic conditional UPDATE
(revoked=false AND not expired AND (max_uses=0 OR use_count<max_uses)),
which is race-free on both backends without SELECT...FOR UPDATE. The
sql/lock feature is enabled and ForUpdate is available for genuine
multi-statement RMW paths.
- ListAllowListEntriesWithInvites batch-joins invite codes (invite_id is
a plain column, not an Ent edge).
Schema:
- pkg/ent/schema/user.go: add nillable last_seen field (+ index) needed
by UpdateUserLastSeen / lastSeen sort; document the case-insensitive
email strategy.
- pkg/ent/generate.go: enable --feature sql/upsert,sql/lock (required for
OnConflict and ForUpdate).
Tests (all passing):
- pkg/store/storetest/domains_user.go: UserDomain, AllowListDomain,
InviteCodeDomain oracle descriptors (kept in a separate file to avoid
contending on domains.go).
- entadapter oracle test runs the shared CRUD-parity suite directly
against the new adapters; behavior tests cover case-insensitivity,
bulk idempotency, conditional increment, stats, and the invite join.
NOTE: Generated Ent code under pkg/ent/** is intentionally NOT included.
This is a shared worktree where sibling port agents concurrently modify
schemas and the same feature flags; the generated code must be
regenerated at wave integration via:
go generate ./pkg/ent/...
Verified locally that regeneration + full build + tests pass.
Per P2 scope: composite.go wiring and ensureEntUser shadow removal are
deferred to P2-collapse.
Add Ent-backed store implementations for the secret/env and template/harness domains, mirroring the legacy SQLite semantics:
- entadapter/secret_store.go: SecretStore implementing store.SecretStore + store.EnvVarStore. Polymorphic (scope, scope_id) addressing, COALESCE target->key projection, version bump on update, get-then-update upsert, and transitive ListProgenySecrets via a created_by IN-list over the ancestor set (user scope + allow_progeny only; encrypted value withheld).
- entadapter/template_store.go: TemplateStore implementing store.TemplateStore + store.HarnessConfigStore. base_template hierarchy, scope/project_id backwards-compat lookups, content_hash, JSON config/files columns, DeleteByScope. Subscription templates are owned by NotificationStore.
- Direct Ent unit tests incl. a progeny-inheritance parity test.
- storetest: Template/HarnessConfig/Secret/EnvVar domain descriptors wired into RunStoreSuite for cross-backend CRUD parity.
Port the project/broker domain (projects, runtime_brokers, project_contributors,
project_sync_state) and the broker-auth domain (broker_secrets,
broker_join_tokens) from raw SQL to Ent adapters.
- pkg/store/entadapter/project_store.go: implements ProjectStore,
RuntimeBrokerStore, ProjectProviderStore and ProjectSyncStateStore.
* provider + sync-state upserts use Ent OnConflict().UpdateNewValues()
(sql/upsert) keyed on the (project_id, broker_id) unique index.
* runtime broker heartbeat/update use an optimistic version-CAS loop on a
new internal lock_version token, serializing concurrent writers portably
across SQLite (tests) and Postgres without SELECT ... FOR UPDATE.
* slug lookups support case-insensitive matching (EqualFold).
* project computed fields (AgentCount, ActiveBrokerCount, ProjectType) are
derived via Ent queries, matching the legacy SQLite store.
- pkg/store/entadapter/brokersecret_store.go: implements BrokerSecretStore
(per-broker HMAC secrets + short-lived join tokens, expiry cleanup).
- Project Ent schema: add operational fields for full parity
(default_runtime_broker_id, shared_dirs, github_*, git_identity).
- RuntimeBroker Ent schema: relax vestigial type column to Optional, add
internal lock_version concurrency token.
- Regenerate Ent with sql/upsert,sql/lock features.
- storetest: add Project, RuntimeBroker, BrokerSecret and BrokerJoinToken
CRUD-parity domains.
- Unit tests for both adapters.
Per the integration plan, composite.go wiring and ensureEntProject shadow
removal are deferred to P2-collapse.
Regenerated with --feature sql/upsert,sql/lock to support OnConflict upserts and ForUpdate/SKIP LOCKED job claims.
Wire all Ent-backed sub-stores into CompositeStore via embedding, removing the raw-SQL base store and the User/Agent/Project shadow-sync machinery (ensureEntUser/ensureEntAgent/ensureEntProject). CompositeStore now serves every domain from a single Ent client and implements Close/Ping/Migrate directly. Collapse initStore() to open one Ent SQLite DB (no _ent shadow DSN, no MigrateGroveToProjectData, no raw sqlite.New). Register the User, AllowList, and InviteCode domains in the storetest CRUD-parity suite. Update entadapter tests for the single-DB NewCompositeStore(client) signature. go build ./... green; go test ./pkg/store/entadapter/... ./pkg/store/storetest/... green.
Delete the ~6k-LOC raw-SQL store (sqlite.go) and its per-domain sibling files (brokersecret, gcp_service_account, github_installation, maintenance, messages, notification, project_sync_state, schedule, scheduled_event) plus their tests, including the inline schema-migration scaffold. Keep driver.go, which registers the pure-Go SQLite driver used by Ent's SQLite backend. Repoint the two non-test consumers to the Ent-backed store: - cmd/hub_secret_migrate.go now opens an Ent client + CompositeStore. - internal/fixturegen opens via entc and seeds the Ent schema's *sql.DB. go build ./... green; no remaining production references to the raw store.
…y PK
Replace the removed raw-SQL store in downstream tests with an Ent-backed newTestStore helper (pkg/hub, pkg/secret) and update cmd/server_test.go and internal/fixturegen tests. Port the 8 raw-SQL DB() access sites in hub tests via a new CompositeStore.DB() escape-hatch accessor. Fix a production bug surfaced by the collapse: hub/server.go signingKeySecretID generated a non-UUID secret primary key, which the Ent secret store rejects; it now derives a deterministic UUIDv5. go build ./... green; entadapter and storetest suites green. NOTE: hub/secret/fixturegen suites now COMPILE but many tests still fail because their fixtures seed non-UUID string IDs that the UUID-PK Ent schema rejects; addressed in follow-up commits (tid() helper).
Wrap human-readable test identifiers in tid() (deterministic UUIDv5) so the UUID-PK Ent store accepts them while preserving cross-reference consistency and ID-equality assertions. Reduces pkg/hub failures from 611 to 79; remaining failures are behavioral, not ID-format, and are addressed separately.
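A helper in the spirit of tid() can be written with the standard library alone. The sketch below derives a name-based UUIDv5 (SHA-1 over namespace plus name, then version and variant bits); the namespace chosen here is the RFC 4122 DNS namespace purely for illustration, and the PR's actual helper and namespace are not shown in this conversation.

```go
package main

import (
	"crypto/sha1"
	"fmt"
)

// testNamespace is an assumed namespace (the RFC 4122 DNS namespace UUID,
// 6ba7b810-9dad-11d1-80b4-00c04fd430c8); the real helper may use another.
var testNamespace = [16]byte{
	0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1,
	0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8,
}

// tid maps a human-readable test identifier to a deterministic UUIDv5 string,
// so the same name always yields the same ID across fixtures and assertions.
func tid(name string) string {
	h := sha1.New()
	h.Write(testNamespace[:])
	h.Write([]byte(name))
	sum := h.Sum(nil)

	var u [16]byte
	copy(u[:], sum[:16])
	u[6] = (u[6] & 0x0f) | 0x50 // set version 5
	u[8] = (u[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

func main() {
	fmt.Println(tid("agent-1") == tid("agent-1")) // true
	fmt.Println(tid("agent-1") == tid("agent-2")) // false
	fmt.Println(len(tid("agent-1")))              // 36
}
```

Because the mapping is deterministic, a fixture that creates `tid("agent-1")` and an assertion that looks up `tid("agent-1")` still refer to the same row.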
Restore raw-SQL parity: CompositeStore.Migrate now runs AutoMigrate and seeds built-in maintenance operations (the raw store seeded these in its migrations). initStore and hub test helpers call s.Migrate() so production and tests seed consistently. Fixes the maintenance-operation hub tests (404 'Operation not found'). pkg/hub failures 79 -> 71.
Add slugs/broker names to test fixtures that previously relied on the raw store's lenient (no-validator) inserts: project/agent slugs in the logs test helper, broker slugs in embedded/profile/authz fixtures, and BrokerName on envgather ProjectProvider literals. pkg/hub failures 71 -> 57.
Restore raw-SQL store parity: a malformed identifier cannot match any UUID primary key, so get-by-id lookups now report store.ErrNotFound instead of store.ErrInvalidInput. This matches the raw store (a lookup with a bad id simply returned no row) and is what callers depend on — e.g. resolveTemplate passes a template *name* to GetTemplate and relies on ErrNotFound to fall back to slug-based resolution. New parseGetID helper applied across all 17 get-by-id methods. pkg/hub failures 56 -> 40; entadapter/storetest stay green.
- controlchannel_client_test: revert tid() wraps (store-less path-builder test; IDs must match the expected literal paths). - github/envgather: project-scoped route handlers resolve the project by UUID id, so build paths with tid(rawID) via fmt.Sprintf instead of the old raw-id literal. pkg/hub failures 40 -> 32.
The tid() sweep over-wrapped a non-ID expected value in a pure-function test; restore the literal GCP project id.
The GCP service account project_id holds the GCP *cloud project* identifier (e.g. 'my-project-123'), a free-form string — not a UUID. The schema declared it field.UUID, so entadapter CreateGCPServiceAccount/Update did parseUUID(sa.ProjectID) and rejected real GCP project ids, breaking SA mint/create with a 400 in production (storetest masked it by passing a UUID). Change the schema field to field.String, regenerate Ent, and store/read project_id as a string in external_store.go. Fixes ~7 hub GCP tests; pkg/hub 31 -> 23.
Unwrap the over-wrapped 'my-project' expectation now that project_id is a string, and wrap the dynamic project-settings project ID with tid().
events_test exercises the in-memory ChannelEventPublisher directly; its ProjectID/IDs are subject-string components, not stored UUIDs. The tid() sweep wrongly rewrote them so published subjects no longer matched the subscriptions (timeouts). Restore the literal values. pkg/hub 19 -> 12.
Use tid() UUIDs in the maintenance run-detail path and the notifications agentId query params; guard list indexing with require.Len so a mismatch fails cleanly instead of panicking (panics truncate the package run).
…eared
Panics ([0] on empty lists) had been truncating the package run, hiding many failures and starving the tid() sweep. With those guarded, sweep the newly reached tests: wrap dynamic rune-suffix IDs and the setupProjectWithBroker / seedCreatedAgentForHarnessTest helper IDs, and convert raw query-param project IDs to tid(). No UUID-parse errors remain in pkg/hub.
… race (B1-4, B1-5)
B1-4: HandleUpgrade returns sessionID; markBrokerOnline(brokerID, sessionID) now calls ClaimRuntimeBrokerConnection(brokerID, instanceID, sessionID), recording affinity + online + heartbeat in one CAS write.
B1-5: SetOnDisconnect callback gains sessionID; the handler compare-and-clears via ReleaseRuntimeBrokerConnection and skips the offline stamp when affinity has moved (flap). removeConnection now only removes/fires for the matching session, so an old connection's teardown can't drop a newer live socket.
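The session-scoped release in B1-5 is a compare-and-clear: teardown only clears the record if it still belongs to the session being torn down. A minimal in-memory sketch, with hypothetical names (`connections`, `claim`, `release`, `online`) rather than the PR's store methods:

```go
package main

import (
	"fmt"
	"sync"
)

// connections tracks which session currently owns each broker.
type connections struct {
	mu      sync.Mutex
	session map[string]string // brokerID -> current sessionID
}

func newConnections() *connections {
	return &connections{session: map[string]string{}}
}

// claim records sessionID as the current owner, replacing any older session.
func (c *connections) claim(brokerID, sessionID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.session[brokerID] = sessionID
}

// release clears the record only when the stored session matches, so a late
// teardown of an old connection cannot drop a newer live one.
func (c *connections) release(brokerID, sessionID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	cur, ok := c.session[brokerID]
	if !ok || cur != sessionID {
		return false
	}
	delete(c.session, brokerID)
	return true
}

// online reports whether any session currently owns the broker.
func (c *connections) online(brokerID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.session[brokerID]
	return ok
}

func main() {
	c := newConnections()
	c.claim("broker-1", "s1")
	c.claim("broker-1", "s2") // broker reconnected before s1 was torn down

	fmt.Println(c.release("broker-1", "s1"), c.online("broker-1")) // false true
	fmt.Println(c.release("broker-1", "s2"), c.online("broker-1")) // true false
}
```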
…(B2-1, B2-2)
B2-1: new BrokerDispatch ent entity (table broker_dispatch): id, broker_id, agent_id(null), agent_slug, project_id(null), op, args(JSON), state, result, claimed_by, attempts, error, created_at/updated_at, deadline_at(null); index (broker_id,state). store.BrokerDispatch model + state constants.
B2-2: messages.dispatch_state (default 'pending') + dispatched_at; wired through store.Message + entadapter conversion/create. Dialect-neutral.
…broker_cmd (B2-4)
Introduce a CommandBus interface and PostgresCommandBus implementation that listens on the new global channel scion_broker_cmd for broker dispatch wakeup signals. This is a sibling of PostgresEventPublisher, reusing the same connect/reconnect/keepalive helpers but maintaining its own independent pgx connection and pool (design §5.1).
Key components:
- PostgresCommandBus: LISTEN loop with backoff-reconnect on its own dedicated connection; filters signals by local broker ownership via an injected ownsLocally func (wired to ControlChannelManager.IsConnected); invokes an injected onSignal reconcile callback (to be wired to the reconcile drain in B2-5).
- NotifyBrokerCmd: issues NOTIFY inside the caller's transaction so the signal commits atomically with the durable intent row (mirrors PublishTx).
- NoopCommandBus: safe no-op for the SQLite backend (single-process, all brokers are local).
- Backend selection in newCommandBus mirrors newEventPublisher: Postgres driver → PostgresCommandBus; otherwise → NoopCommandBus.
- Server.SetCommandBus/CommandBus() setter/getter; cleanup in both Shutdown and CleanupResources paths.
BrokerDispatchStore: Insert/Claim(CAS pending->in_progress)/Complete/Fail/ ListPendingDispatch + MarkMessageDispatched(CAS)/ListPendingMessages (via agent runtime_broker_id). Wired into CompositeStore + store.Store. Tests: concurrent claim single-winner (exactly-once), drain pending-only, message CAS dedupe, complete/fail transitions, pending-messages-by-broker-agent.
… (B2-5)
Server.reconcileBroker drains pending broker_dispatch rows (CAS-claim -> exec -> done/fail) and pending messages (CAS MarkMessageDispatched -> deliver) for a broker this node owns. Exactly-once via store CAS; idempotent + concurrent-safe. Wired as durability backstop into markBrokerOnline (async on reconnect) and as the command-bus signal handler (SetOnSignal -> ReconcileBroker). Op executors are seams (executeDispatch/deliverMessage) that Phase 3/4 fill with local tunnel ops.
routeLocal (IsConnected, unchanged fast path) | routeForward (affinity owner alive) | routeHTTP (broker endpoint set) | routeUndeliverable. Affinity is a hint only (StoreAffinityLookup over connected_hub_id + last_heartbeat freshness), injectable for testing. Not yet wired into dispatch (B3-2 wires message path). Table-driven tests over all branches incl. local-precedence + nil-affinity.
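The four-way routing decision reads naturally as an ordered switch. The sketch below is one possible shape, assuming the precedence follows the order listed above (local first, then forward, then HTTP); the function `decide` and its boolean inputs are hypothetical simplifications of the real route() and its affinity lookup.

```go
package main

import "fmt"

type route int

const (
	routeLocal route = iota
	routeForward
	routeHTTP
	routeUndeliverable
)

func (r route) String() string {
	return [...]string{"routeLocal", "routeForward", "routeHTTP", "routeUndeliverable"}[r]
}

// decide picks a route. connectedLocally models IsConnected on this node,
// ownerAlive models "affinity owner exists and its heartbeat is fresh", and
// endpoint is the broker's HTTP endpoint (empty if unset).
func decide(connectedLocally, ownerAlive bool, endpoint string) route {
	switch {
	case connectedLocally:
		return routeLocal // local connection always wins
	case ownerAlive:
		return routeForward // affinity is only a hint toward another node
	case endpoint != "":
		return routeHTTP
	default:
		return routeUndeliverable
	}
}

func main() {
	cases := []struct {
		local, owner bool
		endpoint     string
	}{
		{true, true, "http://broker"},
		{false, true, ""},
		{false, false, "http://broker"},
		{false, false, ""},
	}
	for _, c := range cases {
		fmt.Println(decide(c.local, c.owner, c.endpoint))
	}
}
```

Keeping the decision a pure function of its inputs is what makes the table-driven tests over all branches straightforward.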
…r drain (B3-2, B3-3)
Route-gate the message send path: HybridBrokerClient.MessageAgent now uses route(brokerID, endpoint) to decide delivery. routeLocal and routeHTTP follow existing paths unchanged. routeForward/routeUndeliverable return ErrMessageDeferred; the message row (already persisted with dispatch_state=pending) is the durable intent. All call sites (handleAgentMessage, set[], broadcastDirect, messagebroker, notifications, scheduler) catch the sentinel, emit a best-effort NOTIFY wakeup via SignalBrokerCmd, and return 202 Accepted (or log as deferred).
Fill the deliverMessage seam in reconcile.go: resolves the agent from the message's AgentID, obtains the dispatcher, and calls DispatchAgentMessage for local tunnel delivery. reconcileBroker already CAS-marks dispatched before calling this.
Wire SetAffinityLookup(StoreAffinityLookup(store, 0)) on the HybridBrokerClient in CreateAuthenticatedDispatcher so route() can return routeForward when another node owns the broker.
Add SignalBrokerCmd to the CommandBus interface: a best-effort NOTIFY using the bus's own pool, used by the message path where the durable intent is the message row itself and the NOTIFY is only a wakeup hint.
…t/stop/restart) (B4-1, B4-2)
B4-1: Rolling-timeout wait helper (dispatch_wait.go)
- waitForAgentTransition subscribes to agent.<id>.status events and loops with a rolling window (dispatchRollingTimeout=90s) that resets on ANY AgentStatusEvent (phase/activity/detail change).
- Terminal phase → return phase, nil. Window expiry → ErrDispatchFailed. Context cancellation → ctx.Err().
- Caller subscribes BEFORE writing intent, passes the channel + unsub.
B4-2: Cross-node start/stop/restart dispatch
- Route-gated HybridBrokerClient.StartAgent/StopAgent/RestartAgent exactly like MessageAgent: routeLocal → control-channel tunnel (unchanged fast path), routeHTTP → HTTP fallback, routeForward/routeUndeliverable → ErrLifecycleDeferred.
- Dispatch args structs (dispatch_args.go): StartDispatchArgs captures task, resolvedEnv, resolvedSecrets, inlineConfig, sharedDirs, sharedWorkspace, projectPath, projectSlug, harnessConfig. RestartDispatchArgs captures resolvedEnv. StopDispatchArgs is empty. All JSON-serializable for broker_dispatch.args column.
- Owner-side executeDispatch (reconcile.go): start/stop/restart cases deserialize args, load agent from store, call local DispatchAgentStart/Stop/Restart via the dispatcher. Unknown ops (delete, finalize_env, etc.) still fail cleanly for B4-3/B4-4.
Tests: waitForAgentTransition (terminal, error, rolling reset, silence expiry, context cancel, unsub); route-gating of Start/Stop/Restart returns ErrLifecycleDeferred when non-local; executeDispatch lifecycle cases invoke the local dispatcher; args round-trip (serialize→deserialize) is lossless; reconcile end-to-end lifecycle path.
…omplete)
The originator-side orchestration was missing: ErrLifecycleDeferred was
returned by HybridBrokerClient but nothing caught it. Now the full
cross-node start/stop/restart flow works transparently to all handler
call sites.
Originator side (HTTPAgentDispatcher):
- DispatchAgentStart/Stop/Restart catch ErrLifecycleDeferred after
env/secret resolution and invoke deferredLifecycle:
1. Subscribe("agent.<id>.status") BEFORE writing intent
2. InsertBrokerDispatch{op, agent_id, broker_id, args}
3. Best-effort SignalBrokerCmd (row is durable backstop)
4. waitForAgentTransition with terminal set per op
5. Return nil on success, error on error-phase/timeout
- SetCrossNodeDeps(events, commandBus) wired in server.go's
getOrCreateDispatcher, so all handler call sites get cross-node
for free with synchronous semantics preserved.
- Local path (routeLocal) is unchanged at zero added latency — no
subscribe, no intent row, no wait.
Args decision: owner RE-RESOLVES env/secrets via DispatchAgentStart
(all hub instances share the same store + secret backend), so
StartDispatchArgs carries only {Task}. RestartDispatchArgs and
StopDispatchArgs are empty. This avoids serializing potentially large
env/secrets into the DB while remaining correct because all hubs read
from the same shared store.
waitForAgentTransition refactored to a standalone function (no Server
receiver) so the dispatcher can call it directly.
Tests:
- TestDeferredStart_WritesIntentAndWaits: deferred start writes a
broker_dispatch row, waits, returns success on "running" event
- TestDeferredStart_ReturnsErrorOnErrorPhase: error phase → error
- TestLocalStart_SkipsIntentRow: local path calls tunnel directly,
no intent row written
- All existing tests pass (no regressions)
OAuth login behind the load balancer intermittently failed with state_mismatch: the CSRF state token (and the entire web session) was stored in a gorilla FilesystemStore on the handling replica's local disk, while the browser only carried a session-ID cookie. When the LB routed /auth/login and /auth/callback to different replicas, the callback replica had no matching session file -> empty state -> state_mismatch. It only "worked" when both hops happened to hit the same backend.

The same flaw affected the post-login session: sessionToBearerMiddleware reads the Hub access/refresh JWTs from that disk-local store on every API request, so sessions silently dropped whenever a follow-up request landed on a different replica.

Replace the FilesystemStore with an encrypted, signed gorilla CookieStore so the whole session lives in the client's cookie and any replica sharing SESSION_SECRET can read it. Keys are derived deterministically from SESSION_SECRET (32-byte HMAC auth key + 32-byte AES-256 encryption key, domain-separated). No DB, no migration; works with N replicas.

The original switch to disk was motivated by a "JWT tokens exceed 4096 bytes" concern. Measured against the current compact HS256 tokens the full session (identity + access + refresh) encodes to ~2.6 KB, well under the browser's ~4 KB per-cookie cap, so the securecookie length limit is left in force (oversize would now error+log, not silently drop).

Tests: replace the obsolete NoMaxLengthLimit test with a cross-replica round-trip regression test (cookie minted by replica A decodes on replica B with the same secret; carries OAuth state + post-login tokens) plus a negative test (a different secret cannot decode the cookie).
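One way to derive two domain-separated 32-byte keys from a single secret is to HMAC a distinct label per key. The sketch below shows that idea only; the commit does not say which construction it uses (it could equally be HKDF or hashing with prefixes), and the labels and function names are hypothetical.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// deriveKey returns HMAC-SHA256(secret, label): 32 bytes that depend on both
// the shared secret and the purpose label.
func deriveKey(secret []byte, label string) []byte {
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(label))
	return mac.Sum(nil)
}

// sessionKeys derives an auth key and an encryption key from one secret.
// Every replica given the same secret computes the same pair.
func sessionKeys(secret string) (authKey, encKey []byte) {
	s := []byte(secret)
	return deriveKey(s, "session-auth"), deriveKey(s, "session-enc") // labels assumed
}

func main() {
	authA, encA := sessionKeys("shared-secret") // replica A
	authB, encB := sessionKeys("shared-secret") // replica B

	fmt.Println(len(authA), len(encA))                                    // 32 32
	fmt.Println(string(authA) == string(authB), string(encA) == string(encB)) // true true
	fmt.Println(string(authA) == string(encA))                            // false
}
```

A pair like this matches the shape gorilla's `sessions.NewCookieStore` accepts: an authentication key followed by an encryption key, where a 32-byte encryption key selects AES-256.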
…4-4)
Route-gate HybridBrokerClient.DeleteAgent, CheckAgentPrompt,
CreateAgentWithGather, and FinalizeEnv through route() so
routeForward/routeUndeliverable return ErrLifecycleDeferred (matching
start/stop/restart pattern from B4-2).
B4-3 (delete dispatch):
- deferredDelete on ErrLifecycleDeferred: subscribe
broker.dispatch.<id>.done → InsertBrokerDispatch{op:delete} →
SignalBrokerCmd → waitForDispatchDone (reads DB row, authoritative).
- Owner executeDispatch case "delete": deserializes DeleteDispatchArgs →
local DispatchAgentDelete (idempotent, 404 ok).
- DeleteDispatchArgs struct + UnmarshalDeleteArgs for args round-trip.
B4-4 (create-time data ops):
- deferredDataOp/deferredDataOpResult: common originator flow for ops
that return results via the dispatch row (design §6.3). Subscribe to
broker.dispatch.<id>.done BEFORE writing intent, insert dispatch,
signal, waitForDispatchDone, read result from GetBrokerDispatch.
- deferredCheckPrompt: returns bool from CheckPromptResult in row.
- deferredFinalizeEnv: fire-and-forget via deferredDataOp.
- deferredCreateWithGather: returns envRequirements from row result.
- Owner executeDispatch cases: check_prompt, finalize_env, create —
run local op, marshal result JSON, return it.
- PublishDispatchDone on EventPublisher: slim completion event
broker.dispatch.<id>.done emitted by reconcile loop on complete/fail.
- waitForDispatchDone: event-driven wait with bounded re-read at
rolling timeout (missed event recovery, design §6.3).
- GetBrokerDispatch added to BrokerDispatchStore interface + entadapter.
Local fast path unchanged (routeLocal → zero added latency).
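The event-driven wait with a bounded re-read can be sketched as below. This is an illustration under assumptions, not the PR's implementation: `dispatchStore` is a hypothetical in-memory stand-in for the dispatch row, `waitForDone` stands in for waitForDispatchDone, and the state strings are invented. The point it shows is that the row is authoritative: a completion that happens without its event is still picked up on the next re-read.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// dispatchStore is a hypothetical in-memory stand-in for the broker_dispatch table.
type dispatchStore struct {
	mu    sync.Mutex
	state map[string]string // dispatch ID -> "pending", "done", or "failed"
}

func (s *dispatchStore) get(id string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state[id]
}

func (s *dispatchStore) set(id, state string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state[id] = state
}

var errDispatchFailed = errors.New("dispatch failed on owner")

// waitForDone returns once the row is terminal. It re-reads the row after every
// done event and also every reread interval, so a missed event only adds latency.
func waitForDone(ctx context.Context, s *dispatchStore, id string, done <-chan struct{}, reread time.Duration) error {
	ticker := time.NewTicker(reread)
	defer ticker.Stop()
	for {
		switch s.get(id) {
		case "done":
			return nil
		case "failed":
			return errDispatchFailed
		}
		select {
		case <-done: // completion event arrived; loop to read the row
		case <-ticker.C: // bounded re-read for missed-event recovery
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// missedEventDemo completes the row without ever publishing the done event.
func missedEventDemo() error {
	s := &dispatchStore{state: map[string]string{"d1": "pending"}}
	done := make(chan struct{}) // subscribed before the intent is written; never fires here
	go func() {
		time.Sleep(10 * time.Millisecond)
		s.set("d1", "done")
	}()
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return waitForDone(ctx, s, "d1", done, 5*time.Millisecond)
}

func main() {
	fmt.Println("missed-event wait returned:", missedEventDemo())
}
```

Subscribing before the intent is written closes the window in which the owner could finish and publish before the originator is listening; the ticker covers the remaining case where the event is lost in transit.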
Add observability for the multi-node broker dispatch pipeline:

Sweep:
- CountStuckPendingMessages store method (messages pending > threshold)
- brokerMessageSweepHandler registered as RecurringSingleton with LockBrokerMessageSweep (0x5C100007), runs every 1m

Metrics (pkg/observability/dispatchmetrics):
- Counters: dispatch published/claimed/done/failed, message dispatched
- Gauge: message stuck (pending beyond 5m threshold)
- Histograms: intent-to-done latency, reconcile drain duration
- Counter: command bus reconnects

Emit sites:
- InsertBrokerDispatch → IncPublished (httpdispatcher.go)
- ClaimBrokerDispatch → IncClaimed (reconcile.go)
- CompleteBrokerDispatch → IncDone + RecordDispatchLatency (reconcile.go)
- FailBrokerDispatch → IncFailed (reconcile.go)
- MarkMessageDispatched → IncMessageDispatched (reconcile.go)
- reconcileBroker → RecordReconcileDrainDuration (reconcile.go)
- command bus reconnect → IncCmdBusReconnects (command_bus.go)
- sweep handler → ObserveMessageStuck (sweep.go)
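As a rough illustration of what the stuck-message gauge measures, the sketch below counts messages that are still pending past a threshold. The `message` type, its field names, the "dispatched" state string, and measuring age from creation time are all assumptions; the actual CountStuckPendingMessages is a store query and may differ.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical, trimmed-down view of a messages row.
type message struct {
	id            string
	dispatchState string
	createdAt     time.Time
}

// countStuckPending counts messages that are still pending and older than threshold.
func countStuckPending(msgs []message, now time.Time, threshold time.Duration) int {
	cutoff := now.Add(-threshold)
	n := 0
	for _, m := range msgs {
		if m.dispatchState == "pending" && m.createdAt.Before(cutoff) {
			n++
		}
	}
	return n
}

// sampleStuckCount runs the count over fixed sample data with the 5m threshold.
func sampleStuckCount() int {
	now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	msgs := []message{
		{"m1", "pending", now.Add(-10 * time.Minute)},    // stuck
		{"m2", "pending", now.Add(-1 * time.Minute)},     // still within threshold
		{"m3", "dispatched", now.Add(-30 * time.Minute)}, // old but delivered
	}
	return countStuckPending(msgs, now, 5*time.Minute)
}

func main() {
	fmt.Println("stuck messages:", sampleStuckCount())
}
```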
…ross-replica login loop

The cookie-store fix (0515e2a) made the web session replica-portable, but the Hub JWT *inside* the cookie is still signed with a per-replica key: ensureSigningKey scopes signing keys to (scope=hub, scope_id=hubID) and hubID = sha256(hostname)[:12]. The integration env runs two replicas of one logical hub behind a single LB, sharing one Postgres DB and one SESSION_SECRET but with different hostnames -> different hubIDs -> different HS256 signing keys.

So a user JWT minted on replica A failed signature verification on replica B (go-jose: error in cryptographic primitive); refresh failed too (refresh token signed with the same foreign key), so sessionToBearerMiddleware declared the session irrecoverably invalid, DELETED the cookie (MaxAge=-1) and returned session_expired. The cookie deletion turns it into a redirect loop: dashboard flashes, then /login?error=session_expired.

Fix: extend the 0515e2a approach (replica-portable via the shared secret) from the cookie to the keys inside it. Add ServerConfig.SharedSigningSecret; when set, ensureSigningKey derives the agent and user signing keys deterministically from it (domain-separated by key name) and bypasses per-host secret-backend storage. cmd feeds the same --session-secret / SESSION_SECRET value into both the web cookie store and the hub config via a new resolveSessionSecret() helper. Empty secret keeps the existing per-hub behavior (no regression for single-node/local dev).

Tests: cross-replica round trip (different hubID + same secret -> identical keys, token minted on A validates on B; different secret cannot) plus pre-configured-key precedence.

Note: rollout rotates the signing keys (now derived from SESSION_SECRET), so existing web/CLI tokens are invalidated once and users re-login.
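The mismatch and the fix can be illustrated with a short sketch. It assumes hubID is the first 12 hex characters of the SHA-256 of the hostname (the commit only writes sha256(hostname)[:12]) and that the shared-secret derivation is an HMAC over the key name; `sharedSigningKey`, `sameKey`, the label prefix, and the hostnames are hypothetical.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hubID mirrors hubID = sha256(hostname)[:12].
// Assumption: the slice is taken over the hex encoding of the digest.
func hubID(hostname string) string {
	sum := sha256.Sum256([]byte(hostname))
	return hex.EncodeToString(sum[:])[:12]
}

// sharedSigningKey derives a signing key from the shared secret, domain-separated
// by key name. Assumption: HMAC-SHA256; the actual derivation is not stated.
func sharedSigningKey(secret, keyName string) []byte {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte("signing-key/" + keyName))
	return mac.Sum(nil)
}

// sameKey compares two keys in constant time.
func sameKey(a, b []byte) bool { return hmac.Equal(a, b) }

func main() {
	// Per-replica scoping: different hostnames give different hub IDs,
	// so keys stored under (scope=hub, scope_id=hubID) never line up.
	fmt.Println("hub IDs differ:", hubID("hub-replica-a") != hubID("hub-replica-b"))

	// Shared-secret derivation ignores the hostname entirely.
	onA := sharedSigningKey("shared-secret", "user")
	onB := sharedSigningKey("shared-secret", "user")
	fmt.Println("user keys match across replicas:", sameKey(onA, onB))
	fmt.Println("user key == agent key:", sameKey(onA, sharedSigningKey("shared-secret", "agent")))
}
```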
- server_migrate.go: use nil-checked deferred close for src DB, and explicitly close src before dropSQLiteFile to prevent Windows sharing violations
- server_migrate.go: handle file:// prefix before file: to correctly parse file:///path/to/db URLs
- server_foreground.go: evaluate GetControlChannelManager() inside the ownsLocally closure to avoid capturing a stale nil value
- server_migrate_test.go: add test case for file:/// URL format
- server_test.go: sanitize t.Name() slashes in newTestStore to prevent SQLite path errors in subtests
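Two of these fixes are easy to show in isolation. The sketch below is not the code from server_migrate.go or server_test.go; `sqlitePath` and `safeDBName` are hypothetical helpers that only demonstrate why the longer `file://` prefix must be tried before `file:`, and how a subtest name can be made safe to embed in a SQLite path.

```go
package main

import (
	"fmt"
	"strings"
)

// sqlitePath strips a known scheme prefix and returns the filesystem path.
// Order matters: if "file:" were tried before "file://", the input
// "file:///var/lib/hub.db" would come out as "///var/lib/hub.db".
func sqlitePath(raw string) string {
	for _, prefix := range []string{"sqlite://", "file://", "file:"} {
		if strings.HasPrefix(raw, prefix) {
			return strings.TrimPrefix(raw, prefix)
		}
	}
	return raw // bare path
}

// safeDBName replaces the slashes that Go inserts into subtest names
// (for example "TestStore/claims") so the name can be used inside a file path.
func safeDBName(testName string) string {
	return strings.ReplaceAll(testName, "/", "_")
}

func main() {
	for _, in := range []string{
		"sqlite:///abs/path/hub.db",
		"sqlite://rel/path/hub.db",
		"file:///abs/path/hub.db",
		"file:/abs/path/hub.db",
		"/abs/path/hub.db",
	} {
		fmt.Printf("%-28s -> %s\n", in, sqlitePath(in))
	}
	fmt.Println(safeDBName("TestStore/claims"))
}
```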
…ansitions

C1: Call MarkMessageDispatched after successful local dispatch in messagebroker.go and handlers.go (single-recipient, set[], broadcast). Without this, successfully dispatched messages remained dispatch_state=pending and were re-delivered on every broker reconnect via reconcileBroker.

C2: Return immediately in messagebroker.go deliverToAgent when CreateMessage fails: without a durable row, a deferred signal has nothing for the owning node to reconcile.

C3: Guard CompleteBrokerDispatch and FailBrokerDispatch with state=in_progress CAS predicate so a done dispatch cannot be flipped to failed or vice versa. Update tests to claim before completing/failing to match the new CAS guard.
88c9f9a to 828ea4b
…s after rebase

Post-rebase fixups to align the feature branch with main's refactoring:
- broker package → eventbus package rename (types, imports, methods)
- SetRecipient → GroupRecipient, SetMessageResponse → GroupMessageResponse
- hubNativeProjectPath → hubManagedProjectPath
- ProjectTypeHubNative → ProjectTypeHubManaged
- populateAgentConfig gains ctx parameter
- Add missing handleResourcesImport and handleMessageChannels handlers
- Add ListChannels method to MessageBrokerProxy
- Wire newCommandBus in server_foreground.go
- Restore main's test fixtures for renamed APIs
828ea4b to ffe5373
Summary
Adds cross-node broker dispatch so API calls hitting any hub replica are correctly routed to the replica that holds the broker's control-channel WebSocket. Fixes the silent split-brain where messages sent to the wrong replica were dropped.
Depends on #304 (Postgres Core); base branch set to `main`.
What's included
- `instanceID`; `connected_hub_id`/`session_id`/`connected_at` columns on `runtime_brokers`; `Claim`/`Release` CAS methods; `sessionID` threading + disconnect-race fix (stale offline stamp on broker reconnect)
- `broker_dispatch` durable intent table; `messages.dispatch_state` column; `BrokerDispatch` store methods with CAS claim; `PostgresCommandBus` LISTEN/NOTIFY signal listener on `scion_broker_cmd`; reconcile-on-connect drain
- `route()` decision in `HybridBrokerClient` (local → forward → HTTP → undeliverable); transactional intent+signal for cross-node messages; owner-side drain with `MarkMessageDispatched`
- `start`/`stop`/`restart`/`delete` cross-node dispatch; create-time data ops (`finalize_env`, `check_prompt`, `gather`)
- `SESSION_SECRET` (eliminates cross-replica login loop)
Test plan
- `go build ./...` passes
- `go test ./pkg/hub/...` passes
- `start` from Hub A for a broker on Hub B → agent reaches `running`