Skip to content

Commit 325e564

Browse files
committed
Add ability to unsubscribe
from reader or sink prior to closing them.
1 parent 14418bf commit 325e564

File tree

6 files changed

+54
-27
lines changed

6 files changed

+54
-27
lines changed

rmap/map.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,8 @@ func (sm *Map) run() {
488488
}
489489
}
490490

491-
// runLuaScript runs the given Lua script, the furst argument must be the key.
491+
// runLuaScript runs the given Lua script, the first argument must be the key.
492+
// It is the caller's responsibility to make sure the map is locked.
492493
func (sm *Map) runLuaScript(ctx context.Context, name string, script *redis.Script, args ...any) (any, error) {
493494
key := args[0].(string)
494495
if len(key) == 0 {

streaming/reader.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type (
8080
)
8181

8282
// newReader creates a new reader.
83-
func newReader(ctx context.Context, stream *Stream, opts ...ReaderOption) (*Reader, error) {
83+
func newReader(stream *Stream, opts ...ReaderOption) (*Reader, error) {
8484
options := defaultReaderOptions()
8585
for _, option := range opts {
8686
option(&options)
@@ -117,7 +117,7 @@ func newReader(ctx context.Context, stream *Stream, opts ...ReaderOption) (*Read
117117
}
118118

119119
// Subscribe returns a channel that receives events from the stream.
120-
// The channel is closed when the reader is stopped.
120+
// The channel is closed when the reader is closed.
121121
func (r *Reader) Subscribe() <-chan *Event {
122122
c := make(chan *Event, r.bufferSize)
123123
r.lock.Lock()
@@ -126,6 +126,19 @@ func (r *Reader) Subscribe() <-chan *Event {
126126
return c
127127
}
128128

129+
// Unsubscribe removes the channel from the reader subscribers and closes it.
130+
func (r *Reader) Unsubscribe(c <-chan *Event) {
131+
r.lock.Lock()
132+
defer r.lock.Unlock()
133+
for i, ch := range r.chans {
134+
if ch == c {
135+
close(ch)
136+
r.chans = append(r.chans[:i], r.chans[i+1:]...)
137+
return
138+
}
139+
}
140+
}
141+
129142
// AddStream adds the stream to the sink. By default the stream cursor starts at
130143
// the same timestamp as the sink main stream cursor. This can be overridden
131144
// with opts. AddStream does nothing if the stream is already part of the sink.

streaming/reader_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestNewReader(t *testing.T) {
1818
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
1919
defer cleanup(t, rdb, testName)
2020
ctx := testContext(t)
21-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
21+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
2222
assert.NoError(t, err)
2323
reader, err := s.NewReader(ctx)
2424
assert.NoError(t, err)
@@ -31,7 +31,7 @@ func TestReaderReadOnce(t *testing.T) {
3131
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
3232
defer cleanup(t, rdb, testName)
3333
ctx := testContext(t)
34-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
34+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
3535
assert.NoError(t, err)
3636
reader, err := s.NewReader(ctx, WithReaderStartAtOldest(), WithReaderBlockDuration(testBlockDuration))
3737
require.NoError(t, err)
@@ -50,7 +50,7 @@ func TestReaderReadSinceLastEvent(t *testing.T) {
5050
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
5151
defer cleanup(t, rdb, testName)
5252
ctx := testContext(t)
53-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
53+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
5454
assert.NoError(t, err)
5555

5656
// Add and read 2 events consecutively
@@ -97,7 +97,7 @@ func TestCleanupReader(t *testing.T) {
9797
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
9898
defer cleanup(t, rdb, testName)
9999
ctx := testContext(t)
100-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
100+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
101101
assert.NoError(t, err)
102102
reader, err := s.NewReader(ctx, WithReaderStartAtOldest(), WithReaderBlockDuration(testBlockDuration))
103103
require.NoError(t, err)
@@ -123,11 +123,11 @@ func TestAddReaderStream(t *testing.T) {
123123
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
124124
defer cleanup(t, rdb, testName)
125125
ctx := testContext(t)
126-
s, err := NewStream(ctx, "testAddStream", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
126+
s, err := NewStream("testAddStream", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
127127
assert.NoError(t, err)
128128
reader, err := s.NewReader(ctx, WithReaderStartAtOldest(), WithReaderBlockDuration(testBlockDuration))
129129
require.NoError(t, err)
130-
s2, err := NewStream(ctx, "testAddStream2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
130+
s2, err := NewStream("testAddStream2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
131131
assert.NoError(t, err)
132132
assert.NoError(t, reader.AddStream(ctx, s2))
133133
assert.NoError(t, reader.AddStream(ctx, s2)) // Make sure it's idempotent
@@ -155,9 +155,9 @@ func TestRemoveReaderStream(t *testing.T) {
155155
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
156156
defer cleanup(t, rdb, testName)
157157
ctx := testContext(t)
158-
s, err := NewStream(ctx, "testRemoveStream", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
158+
s, err := NewStream("testRemoveStream", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
159159
assert.NoError(t, err)
160-
s2, err := NewStream(ctx, "testRemoveStream2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
160+
s2, err := NewStream("testRemoveStream2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
161161
assert.NoError(t, err)
162162
reader, err := s.NewReader(ctx, WithReaderStartAtOldest(), WithReaderBlockDuration(testBlockDuration))
163163
require.NoError(t, err)

streaming/sink.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,19 @@ func (s *Sink) Subscribe() <-chan *Event {
165165
return c
166166
}
167167

168+
// Unsubscribe removes the channel from the sink and closes it.
169+
func (s *Sink) Unsubscribe(c <-chan *Event) {
170+
s.lock.Lock()
171+
defer s.lock.Unlock()
172+
for i, ch := range s.chans {
173+
if ch == c {
174+
close(ch)
175+
s.chans = append(s.chans[:i], s.chans[i+1:]...)
176+
return
177+
}
178+
}
179+
}
180+
168181
// Ack acknowledges the event.
169182
func (s *Sink) Ack(ctx context.Context, e *Event) error {
170183
err := e.rdb.XAck(ctx, e.streamKey, e.SinkName, e.ID).Err()

streaming/sink_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestNewSink(t *testing.T) {
2727
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
2828
defer cleanup(t, rdb, testName)
2929
ctx := testContext(t)
30-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
30+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
3131
assert.NoError(t, err)
3232
sink, err := s.NewSink(ctx, "sink")
3333
assert.NoError(t, err)
@@ -40,7 +40,7 @@ func TestReadOnce(t *testing.T) {
4040
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
4141
defer cleanup(t, rdb, testName)
4242
ctx := testContext(t)
43-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
43+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
4444
assert.NoError(t, err)
4545
sink, err := s.NewSink(ctx, "sink",
4646
WithSinkStartAtOldest(),
@@ -61,7 +61,7 @@ func TestReadSinceLastEvent(t *testing.T) {
6161
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
6262
defer cleanup(t, rdb, testName)
6363
ctx := testContext(t)
64-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
64+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
6565
assert.NoError(t, err)
6666

6767
// Add and read 2 events consecutively
@@ -115,7 +115,7 @@ func TestCleanup(t *testing.T) {
115115
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
116116
defer cleanup(t, rdb, testName)
117117
ctx := testContext(t)
118-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
118+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
119119
assert.NoError(t, err)
120120
sink, err := s.NewSink(ctx, "sink",
121121
WithSinkStartAtOldest(),
@@ -143,13 +143,13 @@ func TestAddStream(t *testing.T) {
143143
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
144144
defer cleanup(t, rdb, testName)
145145
ctx := testContext(t)
146-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
146+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
147147
assert.NoError(t, err)
148148
sink, err := s.NewSink(ctx, "sink",
149149
WithSinkStartAtOldest(),
150150
WithSinkBlockDuration(testBlockDuration))
151151
require.NoError(t, err)
152-
s2, err := NewStream(ctx, testName+"2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
152+
s2, err := NewStream(testName+"2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
153153
assert.NoError(t, err)
154154
assert.NoError(t, sink.AddStream(ctx, s2))
155155
assert.NoError(t, sink.AddStream(ctx, s2)) // Make sure it's idempotent
@@ -177,13 +177,13 @@ func TestRemoveStream(t *testing.T) {
177177
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
178178
defer cleanup(t, rdb, testName)
179179
ctx := testContext(t)
180-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
180+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
181181
assert.NoError(t, err)
182182
sink, err := s.NewSink(ctx, "sink",
183183
WithSinkStartAtOldest(),
184184
WithSinkBlockDuration(testBlockDuration))
185185
require.NoError(t, err)
186-
s2, err := NewStream(ctx, "testRemoveStream2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
186+
s2, err := NewStream("testRemoveStream2", rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
187187
assert.NoError(t, err)
188188
err = sink.AddStream(ctx, s2)
189189
assert.NoError(t, err)
@@ -229,7 +229,7 @@ func TestMultipleConsumers(t *testing.T) {
229229
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
230230
defer cleanup(t, rdb, testName)
231231
ctx := testContext(t)
232-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
232+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
233233
assert.NoError(t, err)
234234
sink, err := s.NewSink(ctx, "sink",
235235
WithSinkStartAtOldest(),
@@ -286,7 +286,7 @@ func TestClaimStaleMessages(t *testing.T) {
286286
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
287287
defer cleanup(t, rdb, testName)
288288
ctx := testContext(t)
289-
s, err := NewStream(ctx, testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
289+
s, err := NewStream(testName, rdb, WithStreamLogger(ponos.ClueLogger(ctx)))
290290
assert.NoError(t, err)
291291
sink, err := s.NewSink(ctx, "sink",
292292
WithSinkStartAtOldest(),

streaming/streams_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ func TestDestroy(t *testing.T) {
2121
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
2222
ctx := testContext(t)
2323

24-
s, err := NewStream(ctx, "testDestroy", rdb)
24+
s, err := NewStream("testDestroy", rdb)
2525
assert.NoError(t, err)
2626
assert.NoError(t, s.Destroy(ctx))
2727
exists, err := rdb.Exists(ctx, s.key).Result()
2828
assert.NoError(t, err)
2929
assert.Equal(t, int64(0), exists)
3030

31-
s2, err := NewStream(ctx, "testDestroy2", rdb)
31+
s2, err := NewStream("testDestroy2", rdb)
3232
assert.NoError(t, err)
3333
_, err = s2.Add(ctx, "foo", []byte("bar"))
3434
assert.NoError(t, err)
@@ -40,7 +40,7 @@ func TestDestroy(t *testing.T) {
4040

4141
func TestOptions(t *testing.T) {
4242
ctx := testContext(t)
43-
s, err := NewStream(ctx, "testOptions", nil, WithStreamMaxLen(10), WithStreamLogger(nil))
43+
s, err := NewStream("testOptions", nil, WithStreamMaxLen(10), WithStreamLogger(nil))
4444
assert.NoError(t, err)
4545
assert.Equal(t, 10, s.MaxLen)
4646
assert.Equal(t, ponos.NoopLogger(), s.logger)
@@ -49,7 +49,7 @@ func TestOptions(t *testing.T) {
4949
func TestAdd(t *testing.T) {
5050
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
5151
ctx := testContext(t)
52-
s, err := NewStream(ctx, "testAdd", rdb)
52+
s, err := NewStream("testAdd", rdb)
5353
assert.NoError(t, err)
5454

5555
_, err = s.Add(ctx, "foo", []byte("bar"))
@@ -68,7 +68,7 @@ func TestAdd(t *testing.T) {
6868
func TestRemove(t *testing.T) {
6969
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
7070
ctx := testContext(t)
71-
s, err := NewStream(ctx, "testRemove", rdb)
71+
s, err := NewStream("testRemove", rdb)
7272
assert.NoError(t, err)
7373

7474
_, err = s.Add(ctx, "foo", []byte("bar"))
@@ -94,7 +94,7 @@ func TestRemove(t *testing.T) {
9494
func TestTopic(t *testing.T) {
9595
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
9696
ctx := testContext(t)
97-
s, err := NewStream(ctx, "testTopic", rdb)
97+
s, err := NewStream("testTopic", rdb)
9898
assert.NoError(t, err)
9999

100100
_, err = s.Add(ctx, "bar", []byte("baz"), WithTopic("foo"))

0 commit comments

Comments
 (0)