Skip to content

Commit 59e61d3

Browse files
committed
feat: make the observers asynchronous
1 parent eeb6fde commit 59e61d3

File tree

5 files changed

+139
-21
lines changed

5 files changed

+139
-21
lines changed

async.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package async
22

33
import (
4+
"math"
45
"time"
56
)
67

78
const (
89
// DefaultRoutineSnapshottingInterval defines how often the routine manager checks routine status
910
DefaultRoutineSnapshottingInterval = 30 * time.Second
11+
DefaultObserverTimeout = time.Duration(math.MaxInt64)
1012
)
1113

1214
// A RoutinesObserver is an object that observes the status of the executions of routines.
@@ -29,3 +31,40 @@ type RoutinesObserver interface {
2931
// currently running
3032
RunningRoutineByNameCount(name string, count int)
3133
}
34+
35+
type routineEventType int
36+
37+
const (
38+
routineStarted routineEventType = iota
39+
routineEnded
40+
routineTimeboxExceeded
41+
takeSnapshot
42+
)
43+
44+
type routineEvent struct {
45+
Type routineEventType
46+
routine AsyncRoutine
47+
snapshot Snapshot
48+
}
49+
50+
func newRoutineEvent(eventType routineEventType) routineEvent {
51+
return routineEvent{
52+
Type: eventType,
53+
}
54+
}
55+
56+
func routineStartedEvent() routineEvent {
57+
return newRoutineEvent(routineStarted)
58+
}
59+
60+
func routineFinishedEvent() routineEvent {
61+
return newRoutineEvent(routineEnded)
62+
}
63+
64+
func routineTimeboxExceededEvent() routineEvent {
65+
return newRoutineEvent(routineTimeboxExceeded)
66+
}
67+
68+
func takeSnapshotEvent() routineEvent {
69+
return newRoutineEvent(takeSnapshot)
70+
}

async_routine.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,10 @@ func (r *asyncRoutine) run(manager AsyncRoutineManager) {
145145
r.status = RoutineStatusFinished
146146
}
147147
manager.deregister(r)
148-
manager.notify(func(observer RoutinesObserver) {
149-
observer.RoutineFinished(r)
150-
})
148+
manager.notifyAll(r, routineFinishedEvent())
151149
}
152150

153-
manager.notify(func(observer RoutinesObserver) {
154-
observer.RoutineStarted(r)
155-
})
151+
manager.notifyAll(r, routineStartedEvent())
156152

157153
if r.errGroup != nil {
158154
r.errGroup.Go(func() error {

async_routine_manager.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type AsyncRoutineManager interface {
1616
RemoveObserver(observerId string)
1717
IsEnabled() bool
1818
GetSnapshot() Snapshot
19-
notify(eventSource func(observer RoutinesObserver))
19+
notifyAll(src AsyncRoutine, evt routineEvent)
2020
Monitor() AsyncRoutineMonitor
2121
register(routine AsyncRoutine)
2222
deregister(routine AsyncRoutine)
@@ -30,7 +30,7 @@ type asyncRoutineManager struct {
3030
snapshottingToggle Toggle
3131
snapshottingInterval time.Duration
3232
routines cmap.ConcurrentMap[string, AsyncRoutine]
33-
observers cmap.ConcurrentMap[string, RoutinesObserver]
33+
observers cmap.ConcurrentMap[string, *observerProxy]
3434

3535
monitorLock sync.Mutex // user to sync the `Start` and `Stop` methods that are used to start the
3636
// snapshotting routine
@@ -47,12 +47,26 @@ func (arm *asyncRoutineManager) IsEnabled() bool {
4747
// Assigns and returns an observer ID to the RoutineObserver
4848
func (arm *asyncRoutineManager) AddObserver(observer RoutinesObserver) string {
4949
uid := uuid.New().String()
50-
arm.observers.Set(uid, observer)
50+
arm.observers.Set(uid, newObserverProxy(uid, observer, arm, DefaultObserverTimeout))
51+
return uid
52+
}
53+
54+
// AddObserverWithTimeout registers a new RoutinesObserver with the asyncRoutineManager,
55+
// associating it with a unique identifier and a specified timeout duration.
56+
// The function returns the unique ID assigned to the observer.
57+
func (arm *asyncRoutineManager) AddObserverWithTimeout(observer RoutinesObserver, timeout time.Duration) string {
58+
uid := uuid.New().String()
59+
arm.observers.Set(uid, newObserverProxy(uid, observer, arm, timeout))
5160
return uid
5261
}
5362

5463
// RemoveObserver removes the given RoutineObserver from the list of observers
5564
func (arm *asyncRoutineManager) RemoveObserver(observerId string) {
65+
observer, ok := arm.observers.Get(observerId)
66+
if !ok {
67+
return
68+
}
69+
close(observer.channel)
5670
arm.observers.Remove(observerId)
5771
}
5872

@@ -64,9 +78,10 @@ func (arm *asyncRoutineManager) GetSnapshot() Snapshot {
6478
return snapshot
6579
}
6680

67-
func (arm *asyncRoutineManager) notify(eventSource func(observer RoutinesObserver)) {
81+
// notifyAll notifies all the observers of the event evt received from the routine src
82+
func (arm *asyncRoutineManager) notifyAll(src AsyncRoutine, evt routineEvent) {
6883
for _, observer := range arm.observers.Items() {
69-
eventSource(observer)
84+
observer.notify(src, evt)
7085
}
7186
}
7287

@@ -113,7 +128,7 @@ var lock sync.RWMutex
113128
func newAsyncRoutineManager(options ...AsyncManagerOption) AsyncRoutineManager {
114129
mgr := &asyncRoutineManager{
115130
routines: cmap.New[AsyncRoutine](),
116-
observers: cmap.New[RoutinesObserver](),
131+
observers: cmap.New[*observerProxy](),
117132
snapshottingInterval: DefaultRoutineSnapshottingInterval,
118133
ctx: context.Background(),
119134
managerToggle: func() bool { return true }, // manager is enabled by default

async_routine_monitor.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,8 @@ func (arm *asyncRoutineManager) snapshot() {
6666
snapshot := arm.GetSnapshot()
6767

6868
for _, r := range snapshot.GetTimedOutRoutines() {
69-
arm.notify(func(observer RoutinesObserver) {
70-
observer.RoutineExceededTimebox(r)
71-
})
69+
arm.notifyAll(r, routineTimeboxExceededEvent())
7270
}
7371

74-
arm.notify(func(observer RoutinesObserver) {
75-
observer.RunningRoutineCount(snapshot.totalRoutineCount)
76-
for _, name := range snapshot.GetRunningRoutinesNames() {
77-
observer.RunningRoutineByNameCount(name, snapshot.GetRunningRoutinesCount(name))
78-
}
79-
})
72+
arm.notifyAll(nil, takeSnapshotEvent())
8073
}

async_routine_observer_proxy.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// observerProxy acts as an intermediary between the AsyncRoutineManager and a RoutinesObserver.
9+
// It receives routine events via a channel and dispatches them to the observer's callback methods.
10+
// The proxy manages event notification asynchronously and can enforce a timeout on the observer's lifecycle.
11+
type observerProxy struct {
12+
manager AsyncRoutineManager
13+
observer RoutinesObserver
14+
channel chan routineEvent
15+
}
16+
17+
// newObserverProxy creates and initializes a new observerProxy instance.
18+
// It sets up an asynchronous routine that listens for routine events on the proxy's channel
19+
// and forwards them to the appropriate methods of the provided RoutinesObserver.
20+
//
21+
// Parameters:
22+
// - observerId: a unique identifier for the observer instance.
23+
// - observer: the RoutinesObserver to be notified of routine events.
24+
// - manager: the AsyncRoutineManager responsible for managing routines.
25+
// - timeout: the duration after which the observer routine is considered 'exceeding the timebox'.
26+
//
27+
// Returns:
28+
// - A pointer to the initialized observerProxy.
29+
func newObserverProxy(observerId string, observer RoutinesObserver, manager AsyncRoutineManager, timeout time.Duration) *observerProxy {
30+
proxy := &observerProxy{
31+
manager: manager,
32+
observer: observer,
33+
channel: make(chan routineEvent),
34+
}
35+
NewAsyncRoutine("async-observer-notifier", context.Background(), func() {
36+
for evt := range proxy.channel {
37+
switch evt.Type {
38+
case routineStarted:
39+
observer.RoutineStarted(evt.routine)
40+
case routineEnded:
41+
observer.RoutineFinished(evt.routine)
42+
case routineTimeboxExceeded:
43+
observer.RoutineExceededTimebox(evt.routine)
44+
case takeSnapshot:
45+
observer.RunningRoutineCount(evt.snapshot.GetTotalRoutineCount())
46+
for _, routineName := range evt.snapshot.GetRunningRoutinesNames() {
47+
observer.RunningRoutineByNameCount(routineName, evt.snapshot.GetRunningRoutinesCount(routineName))
48+
}
49+
}
50+
}
51+
}).
52+
Timebox(timeout).
53+
WithData("observer-id", observerId).
54+
Run()
55+
56+
return proxy
57+
}
58+
59+
// notify sends a routine event to the observerProxy's channel.
60+
// Depending on the event type, it either forwards the routine information
61+
// or triggers a snapshot retrieval from the manager.
62+
func (proxy *observerProxy) notify(routine AsyncRoutine, evt routineEvent) {
63+
switch evt.Type {
64+
case routineStarted, routineEnded, routineTimeboxExceeded:
65+
proxy.channel <- routineEvent{
66+
Type: evt.Type,
67+
routine: routine,
68+
}
69+
case takeSnapshot:
70+
proxy.channel <- routineEvent{
71+
Type: takeSnapshot,
72+
snapshot: proxy.manager.GetSnapshot(),
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)