diff --git a/bus/multi.go b/bus/multi.go index a8a12a54..96c60c9e 100644 --- a/bus/multi.go +++ b/bus/multi.go @@ -164,3 +164,179 @@ func ExecCollectValues[T directive.Value]( } } } + +// ExecCollectValuesWatch collects values and calls a callback whenever the slice changes. +// +// The callback is called with a snapshot of the current values. Its behavior is +// determined by the waitIdle flag and the directive's state. +// +// Rules for Emitting: +// 1. Initial State: Before the first emit, if waitIdle is true, all adds and +// removals will be buffered. The first emit occurs only when the directive +// becomes idle. This first emit will happen even if the collected value set +// is empty. +// 2. If waitIdle is false, any change (add, remove, error) triggers an immediate emit. +// 3. After the First Emit: Once the initial collection has been emitted, any +// subsequent change (add, remove, error) will trigger an immediate emit, +// regardless of the waitIdle flag. +// +// The callback runs in a separate goroutine. If the callback returns an error, +// it will not be called again, and the error will be passed to errorCb. +// +// errorCb can be nil. +// +// Returns the directive instance and reference for cleanup. If err != nil, ref == nil. +func ExecCollectValuesWatch[T directive.Value]( + ctx context.Context, + bus Bus, + dir directive.Directive, + waitIdle bool, + callback func(resErr error, vals []T) error, + errorCb func(err error), +) (directive.Instance, directive.Reference, error) { + // bcast guards these variables + var bcast broadcast.Broadcast + + var vals []T + var valIDs []uint32 + var resErr error + var idle bool + var emittedOnce bool // Set to true after the first emit and never reset. + var pendingEmit bool + + // scheduleEmitIfReady checks if an emit should be triggered now or deferred. + scheduleEmitIfReady := func(broadcast func()) { + // Emit immediately if we are not waiting for an initial idle, OR if the + // initial emit has already happened. + if !waitIdle || emittedOnce { + pendingEmit = true + broadcast() + } + } + + di, ref, err := bus.AddDirective( + dir, + NewCallbackHandler( + func(v directive.AttachedValue) { // Add handler + val, valOk := v.GetValue().(T) + if !valOk { + return + } + bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) { + vals = append(vals, val) + valIDs = append(valIDs, v.GetValueID()) + scheduleEmitIfReady(broadcast) + }) + }, + func(v directive.AttachedValue) { // Remove handler + _, valOk := v.GetValue().(T) + if !valOk { + return + } + bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) { + id := v.GetValueID() + for i, valID := range valIDs { + if valID == id { + // In-place removal for efficiency. + valIDs[i] = valIDs[len(valIDs)-1] + valIDs = valIDs[:len(valIDs)-1] + vals[i] = vals[len(vals)-1] + var empty T + vals[len(vals)-1] = empty + vals = vals[:len(vals)-1] + scheduleEmitIfReady(broadcast) + break + } + } + }) + }, + func() { // Dispose handler + bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) { + if resErr == nil { + resErr = directive.ErrDirectiveDisposed + pendingEmit = true + broadcast() + } + }) + }, + ), + ) + if err != nil { + if ref != nil { + ref.Release() + } + return nil, nil, err + } + + defer di.AddIdleCallback(func(isIdle bool, errs []error) { + bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) { + if resErr != nil { + return + } + + wasIdle := idle + idle = isIdle + + // Errors always trigger an emit. + for _, err := range errs { + if err != nil { + resErr = err + pendingEmit = true + broadcast() + return + } + } + + // If we just became idle and haven't emitted the initial set yet, + // trigger the emit now. This is the core of the waitIdle logic. + if !wasIdle && idle && !emittedOnce { + pendingEmit = true + broadcast() + } + }) + })() + + // Goroutine to handle callbacks + go func() { + for { + var currVals []T + var currErr error + var shouldEmit bool + var waitCh <-chan struct{} + + bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) { + currErr = resErr + shouldEmit = pendingEmit + pendingEmit = false + waitCh = getWaitCh() + + if shouldEmit { + // Mark that the initial emit has happened. + // This is the one and only time this flag is set. + emittedOnce = true + if currErr == nil { + currVals = slices.Clone(vals) + } + } + }) + + if shouldEmit { + if err := callback(currErr, currVals); err != nil { + if errorCb != nil { + errorCb(err) + } + return // Stop processing on callback error. + } + } + + select { + case <-ctx.Done(): + return + case <-waitCh: + // Await next broadcast + } + } + }() + + return di, ref, nil +}