Skip to content

Commit e7884b0

Browse files
authored
Scope producer and ticker names with node name (#6)
To make it possible to attach the same producers to different pools. Also fix race conditions in tests.
1 parent 1e9f0ec commit e7884b0

File tree

4 files changed

+46
-13
lines changed

4 files changed

+46
-13
lines changed

pool/node_test.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"os"
77
"strings"
8+
"sync"
89
"testing"
910
"time"
1011

@@ -119,9 +120,9 @@ func testContext(t *testing.T) context.Context {
119120
return log.Context(context.Background(), log.WithDebug())
120121
}
121122

122-
func testLogContext(t *testing.T) (context.Context, *bytes.Buffer) {
123+
func testLogContext(t *testing.T) (context.Context, *buffer) {
123124
t.Helper()
124-
var buf bytes.Buffer
125+
var buf buffer
125126
return log.Context(context.Background(), log.WithOutput(&buf), log.WithFormat(log.FormatText), log.WithDebug()), &buf
126127
}
127128

@@ -159,3 +160,21 @@ type workerMock struct {
159160
func (w *workerMock) Start(job *Job) error { return w.startFunc(job) }
160161
func (w *workerMock) Stop(key string) error { return w.stopFunc(key) }
161162
func (w *workerMock) Notify(p []byte) error { return w.notifyFunc(p) }
163+
164+
// buffer is a goroutine safe bytes.Buffer
165+
type buffer struct {
166+
buffer bytes.Buffer
167+
mutex sync.Mutex
168+
}
169+
170+
func (s *buffer) Write(p []byte) (n int, err error) {
171+
s.mutex.Lock()
172+
defer s.mutex.Unlock()
173+
return s.buffer.Write(p)
174+
}
175+
176+
func (s *buffer) String() string {
177+
s.mutex.Lock()
178+
defer s.mutex.Unlock()
179+
return s.buffer.String()
180+
}

pool/scheduler.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,17 @@ var ErrScheduleStop = fmt.Errorf("stop")
7171
// returns ErrScheduleStop. Plan is called on only one of the nodes that
7272
// scheduled the same producer.
7373
func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval time.Duration) error {
74-
jobMap, err := rmap.Join(ctx, producer.Name(), node.rdb, rmap.WithLogger(node.logger))
74+
name := node.Name + ":" + producer.Name()
75+
jobMap, err := rmap.Join(ctx, name, node.rdb, rmap.WithLogger(node.logger))
7576
if err != nil {
76-
return fmt.Errorf("failed to join job map: %w", err)
77+
return fmt.Errorf("failed to join job map %s: %w", name, err)
7778
}
7879
ticker, err := node.NewTicker(ctx, producer.Name(), interval)
7980
if err != nil {
80-
return fmt.Errorf("failed to create ticker: %w", err)
81+
return fmt.Errorf("failed to create ticker %s: %w", name, err)
8182
}
8283
sched := &scheduler{
83-
name: producer.Name(),
84+
name: name,
8485
interval: interval,
8586
producer: producer,
8687
node: node,

pool/scheduler_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pool
22

33
import (
4+
"sync"
45
"testing"
56
"time"
67

@@ -20,12 +21,16 @@ func TestSchedule(t *testing.T) {
2021
worker = newTestWorker(t, ctx, node)
2122
d = 10 * time.Millisecond
2223
iter = 0
24+
lock sync.Mutex
2325
)
2426
defer cleanup(t, rdb, false, testName)
2527

28+
inc := func() { lock.Lock(); iter++; lock.Unlock() }
29+
it := func() int { lock.Lock(); defer lock.Unlock(); return iter }
30+
2631
producer := newTestProducer(testName, func() (*JobPlan, error) {
27-
iter++
28-
switch iter {
32+
inc()
33+
switch it() {
2934
case 1:
3035
assert.Equal(t, 0, numJobs(t, worker), "unexpected number of jobs")
3136
// First iteration: start a job
@@ -60,17 +65,19 @@ func TestSchedule(t *testing.T) {
6065
// Seventh iteration: stop schedule
6166
return nil, ErrScheduleStop
6267
}
63-
t.Errorf("unexpected iteration %d", iter)
68+
t.Errorf("unexpected iteration %d", it())
6469
return nil, nil
6570
})
6671

6772
// Observe call to reset
68-
jobMap, err := rmap.Join(ctx, testName, rdb)
73+
jobMap, err := rmap.Join(ctx, testName+":"+testName, rdb)
6974
require.NoError(t, err)
7075
var reset bool
7176
c := jobMap.Subscribe()
7277
defer jobMap.Unsubscribe(c)
78+
done := make(chan struct{})
7379
go func() {
80+
defer close(done)
7481
for ev := range c {
7582
if ev == rmap.EventReset {
7683
reset = true
@@ -82,9 +89,14 @@ func TestSchedule(t *testing.T) {
8289
err = node.Schedule(ctx, producer, d)
8390
require.NoError(t, err)
8491

85-
jobMap.Subscribe()
86-
assert.Eventually(t, func() bool { return iter == 7 }, max, delay, "schedule should have stopped")
87-
assert.Eventually(t, func() bool { return reset }, max, delay, "job map should have been reset")
92+
assert.Eventually(t, func() bool { return it() == 7 }, max, delay, "schedule should have stopped")
93+
select {
94+
case <-done:
95+
reset = true
96+
case <-time.After(time.Second):
97+
break
98+
}
99+
assert.True(t, reset, "job map should have been reset")
88100
assert.NotContains(t, buf.String(), "level=error", "unexpected logged error")
89101
}
90102

pool/ticker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o
3636
if node.clientOnly {
3737
return nil, fmt.Errorf("cannot create ticker on client-only node")
3838
}
39+
name = node.Name + ":" + name
3940
o := parseTickerOptions(opts...)
4041
logger := o.logger
4142
if logger == nil {

0 commit comments

Comments
 (0)