Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions .design/project-log/2026-06-05-broker-disconnect-race-fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Project Log: Broker Disconnect Reconnect Race Fix (Issue #131)

**Date:** 2026-06-05
**Task:** Unify broker disconnect race fix from two branches into PR #303

## Problem

When a broker disconnects and reconnects rapidly, the stale disconnect callback's
offline stamp can clobber the new connection's online status. The root cause is a
TOCTOU race: `ReleaseRuntimeBrokerConnection` and `UpdateRuntimeBrokerHeartbeat`
were separate calls — the heartbeat update has no session guard and unconditionally
overwrites status. Provider statuses are also clobbered and never restored by
heartbeats, leaving the broker permanently invisible until hub restart.

## Solution

Added `ReleaseAndMarkBrokerOffline` to the store interface — a single CAS write
that atomically clears affinity AND stamps status=offline, only if the session
still matches. If a concurrent reconnect has already claimed the broker with a
new session, the compare fails and the callback is a no-op.

Also added a re-check guard in `server.go` before updating provider statuses:
after the atomic release, re-read the broker to confirm no concurrent
`markBrokerOnline` has re-claimed it before stamping providers offline.

## Branch Unification

Two branches addressed this issue:
- `scion/dev-issue-131` (PR #303): had only a docs/project-log commit, no code fix
- `origin/fix/session-guarded-broker-disconnect` (fork PR #144): had the complete
code fix with tests

The fork branch's fix was the more complete solution. Rebased PR #303 onto
upstream main and cherry-picked the fork's fix commit to produce a single
unified branch.

## Files Changed

- `pkg/store/store.go` — added `ReleaseAndMarkBrokerOffline` to `RuntimeBrokerStore` interface
- `pkg/store/entadapter/project_store.go` — implemented `ReleaseAndMarkBrokerOffline` with CAS retry loop
- `pkg/hub/server.go` — rewired `SetOnDisconnect` callback to use the atomic method + provider re-check guard
- `pkg/store/entadapter/broker_affinity_test.go` — 4 new tests covering the atomic method

## Verification

- All 10 broker affinity tests pass (4 new + 6 existing)
- Hub package compiles cleanly
- Pre-existing test failures in `pkg/hub` (unrelated to this change) confirmed on upstream main
25 changes: 16 additions & 9 deletions pkg/hub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,29 +724,36 @@ func New(cfg ServerConfig, s store.Store) (*Server, error) {
Debug: cfg.Debug,
}, logging.Subsystem("hub.control-channel"))
// Set disconnect callback to mark broker offline when WebSocket drops.
// Compare-and-clear affinity first: only stamp offline if this hub instance +
// session still owns the broker. If affinity has moved (broker flapped to
// another replica or re-dialed with a newer session), this is a stale
// disconnect and we must NOT clobber the live owner's online status.
// ReleaseAndMarkBrokerOffline atomically clears affinity AND stamps
// status=offline in a single CAS write — if a concurrent reconnect has
// already claimed the broker with a new session, the compare fails and the
// callback is a no-op. This eliminates the TOCTOU race where a separate
// ReleaseRuntimeBrokerConnection + UpdateRuntimeBrokerHeartbeat allowed
// the offline stamp to clobber a concurrent markBrokerOnline (issue #131).
srv.controlChannel.SetOnDisconnect(func(brokerID, sessionID string) {
ctx := context.Background()

cleared, err := s.ReleaseRuntimeBrokerConnection(ctx, brokerID, srv.instanceID, sessionID)
cleared, err := s.ReleaseAndMarkBrokerOffline(ctx, brokerID, srv.instanceID, sessionID)
if err != nil {
slog.Error("Failed to release broker affinity on disconnect", "brokerID", brokerID, "sessionID", sessionID, "error", err)
return
}
if !cleared {
// Another replica (or a newer session on this replica) already owns
// the socket. Skip the offline stamp to avoid clobbering it.
slog.Info("broker reconnected elsewhere; skipping offline stamp", "brokerID", brokerID, "staleSession", sessionID)
return
}

slog.Info("Broker disconnected, marking offline", "brokerID", brokerID, "sessionID", sessionID)

if err := s.UpdateRuntimeBrokerHeartbeat(ctx, brokerID, store.BrokerStatusOffline); err != nil {
slog.Error("Failed to mark broker offline", "brokerID", brokerID, "error", err)
// Guard: re-read the broker before updating provider statuses. A
// concurrent markBrokerOnline may have already re-claimed the broker
// between our atomic release+offline and now. If so, skip provider
// updates to avoid clobbering the new session's online providers.
broker, rerr := s.GetRuntimeBroker(ctx, brokerID)
if rerr == nil && broker.ConnectedSessionID != nil && *broker.ConnectedSessionID != "" {
slog.Info("broker re-claimed by new session after release; skipping provider offline stamp",
"brokerID", brokerID, "staleSession", sessionID, "newSession", *broker.ConnectedSessionID)
return
}

// Update all project provider records for this broker
Expand Down
84 changes: 84 additions & 0 deletions pkg/store/entadapter/broker_affinity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,90 @@ func TestReleaseRuntimeBrokerConnection_NoOpWhenUnclaimed(t *testing.T) {
assert.False(t, cleared)
}

// ---------------------------------------------------------------------------
// ReleaseAndMarkBrokerOffline — atomic release + offline stamp
// ---------------------------------------------------------------------------

func TestReleaseAndMarkBrokerOffline_StampsOffline(t *testing.T) {
ps := newTestProjectStore(t)
ctx := context.Background()
b := newOfflineBroker(t, ps)

require.NoError(t, ps.ClaimRuntimeBrokerConnection(ctx, b.ID, "hub-1", "sess-1"))

cleared, err := ps.ReleaseAndMarkBrokerOffline(ctx, b.ID, "hub-1", "sess-1")
require.NoError(t, err)
assert.True(t, cleared)

got, err := ps.GetRuntimeBroker(ctx, b.ID)
require.NoError(t, err)
assert.Nil(t, got.ConnectedHubID)
assert.Nil(t, got.ConnectedSessionID)
assert.Nil(t, got.ConnectedAt)
assert.Equal(t, store.BrokerStatusOffline, got.Status)
assert.False(t, got.LastHeartbeat.IsZero())
}

func TestReleaseAndMarkBrokerOffline_NoopOnSessionMismatch(t *testing.T) {
ps := newTestProjectStore(t)
ctx := context.Background()
b := newOfflineBroker(t, ps)

require.NoError(t, ps.ClaimRuntimeBrokerConnection(ctx, b.ID, "hub-1", "sess-NEW"))

// Stale session tries to release+offline: must be a no-op.
cleared, err := ps.ReleaseAndMarkBrokerOffline(ctx, b.ID, "hub-1", "sess-OLD")
require.NoError(t, err)
assert.False(t, cleared)

got, err := ps.GetRuntimeBroker(ctx, b.ID)
require.NoError(t, err)
require.NotNil(t, got.ConnectedHubID)
assert.Equal(t, "hub-1", *got.ConnectedHubID)
require.NotNil(t, got.ConnectedSessionID)
assert.Equal(t, "sess-NEW", *got.ConnectedSessionID)
assert.Equal(t, store.BrokerStatusOnline, got.Status, "status must remain online")
}

// TestReleaseAndMarkBrokerOffline_NoopAfterReclaim reproduces the exact race
// from issue #131: old session releases + stamps offline, but a new session
// has already re-claimed the broker. The stale release must be a no-op.
func TestReleaseAndMarkBrokerOffline_NoopAfterReclaim(t *testing.T) {
ps := newTestProjectStore(t)
ctx := context.Background()
b := newOfflineBroker(t, ps)

// t0: session A claims.
require.NoError(t, ps.ClaimRuntimeBrokerConnection(ctx, b.ID, "hub-1", "sess-A"))
// t1: session A disconnects, but before the callback runs, session B re-claims.
require.NoError(t, ps.ClaimRuntimeBrokerConnection(ctx, b.ID, "hub-1", "sess-B"))

// t2: stale callback tries to release+offline for session A.
cleared, err := ps.ReleaseAndMarkBrokerOffline(ctx, b.ID, "hub-1", "sess-A")
require.NoError(t, err)
assert.False(t, cleared, "stale session must not stamp offline")

got, err := ps.GetRuntimeBroker(ctx, b.ID)
require.NoError(t, err)
require.NotNil(t, got.ConnectedSessionID)
assert.Equal(t, "sess-B", *got.ConnectedSessionID, "new session must still own the broker")
assert.Equal(t, store.BrokerStatusOnline, got.Status, "status must remain online")
}

func TestReleaseAndMarkBrokerOffline_NoopWhenUnclaimed(t *testing.T) {
ps := newTestProjectStore(t)
ctx := context.Background()
b := newOfflineBroker(t, ps)

cleared, err := ps.ReleaseAndMarkBrokerOffline(ctx, b.ID, "hub-1", "sess-1")
require.NoError(t, err)
assert.False(t, cleared)
}

// ---------------------------------------------------------------------------
// Flap / cross-hub scenarios
// ---------------------------------------------------------------------------

// TestBrokerAffinity_FlapAtoB reproduces the design §9.4 disconnect race: a
// broker flaps from hub A to hub B; A's delayed onDisconnect must NOT clobber
// B's live ownership.
Expand Down
42 changes: 42 additions & 0 deletions pkg/store/entadapter/project_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,48 @@ func (s *ProjectStore) ReleaseRuntimeBrokerConnection(ctx context.Context, broke
return false, store.ErrVersionConflict
}

// ReleaseAndMarkBrokerOffline atomically clears broker affinity AND stamps
// status=offline in a single CAS write, ONLY IF affinity still names
// (hubInstanceID, sessionID). This eliminates the TOCTOU race between a
// separate release and a separate offline stamp: if a concurrent reconnect
// has already claimed the broker with a new session, the compare fails and
// this is a no-op — the new connection's online status is not clobbered.
func (s *ProjectStore) ReleaseAndMarkBrokerOffline(ctx context.Context, brokerID, hubInstanceID, sessionID string) (bool, error) {
uid, err := parseUUID(brokerID)
if err != nil {
return false, err
}

now := time.Now()
for attempt := 0; attempt < maxCASRetries; attempt++ {
cur, err := s.client.RuntimeBroker.Get(ctx, uid)
if err != nil {
return false, mapError(err)
}
if cur.ConnectedHubID == nil || *cur.ConnectedHubID != hubInstanceID ||
cur.ConnectedSessionID == nil || *cur.ConnectedSessionID != sessionID {
return false, nil
}
affected, err := s.client.RuntimeBroker.Update().
Where(runtimebroker.IDEQ(uid), runtimebroker.LockVersionEQ(cur.LockVersion)).
ClearConnectedHubID().
ClearConnectedSessionID().
ClearConnectedAt().
SetStatus(store.BrokerStatusOffline).
SetLastHeartbeat(now).
SetUpdated(now).
AddLockVersion(1).
Save(ctx)
if err != nil {
return false, mapError(err)
}
if affected == 1 {
return true, nil
}
}
return false, store.ErrVersionConflict
}

// ReapStaleBrokerAffinity clears affinity (connected_hub_id/connected_session_id/
// connected_at) for brokers that still claim affinity but whose last_heartbeat
// is older than staleBefore. Does not change broker status.
Expand Down
9 changes: 9 additions & 0 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ type RuntimeBrokerStore interface {
// It does not change status (the caller decides offline based on cleared).
ReleaseRuntimeBrokerConnection(ctx context.Context, brokerID, hubInstanceID, sessionID string) (cleared bool, err error)

// ReleaseAndMarkBrokerOffline atomically clears broker affinity AND stamps
// status=offline, ONLY IF affinity still names (hubInstanceID, sessionID).
// This prevents a stale disconnect callback from clobbering a concurrent
// reconnect's online status — the session check and the offline stamp happen
// in the same CAS write with no TOCTOU window.
// Returns cleared=true when affinity matched and the broker was stamped offline.
// Returns cleared=false (no-op) when affinity has already moved.
ReleaseAndMarkBrokerOffline(ctx context.Context, brokerID, hubInstanceID, sessionID string) (cleared bool, err error)

// ReapStaleBrokerAffinity clears connected_hub_id/connected_session_id/
// connected_at for brokers whose last_heartbeat is older than staleBefore
// and whose connected_hub_id is not NULL (i.e. they still claim affinity).
Expand Down