Skip to content

Commit 363bd9c

Browse files
author
Rene Kaufmann
committed
nats: fix endless watch loop
1 parent 3c79cb1 commit 363bd9c

File tree

1 file changed

+24
-10
lines changed

1 file changed

+24
-10
lines changed

nats/client.go

+24-10
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ var cleanReplacer = strings.NewReplacer(".", "/")
2323
type Client struct {
2424
nc *nats.Conn
2525
kv nats.KeyValue
26+
27+
revisionMap map[string]uint64
2628
}
2729

2830
// New returns a new client
@@ -67,8 +69,9 @@ func New(nodes []string, bucket string, opts ...Option) (*Client, error) {
6769
}
6870

6971
return &Client{
70-
nc: nc,
71-
kv: kv,
72+
nc: nc,
73+
kv: kv,
74+
revisionMap: make(map[string]uint64),
7275
}, nil
7376
}
7477

@@ -130,19 +133,30 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, opts ...easykv.
130133
}
131134

132135
defer watcher.Stop()
136+
133137
for v := range watcher.Updates() {
134138
if v == nil {
135139
break
136140
}
137-
for _, k := range options.Keys {
138-
if strings.HasPrefix(clean(string(v.Key())), k) {
139-
return v.Revision(), nil
140-
}
141-
}
141+
c.revisionMap[v.Key()] = v.Revision()
142142
}
143143

144-
if ctx.Err() == context.Canceled {
145-
return options.WaitIndex, easykv.ErrWatchCanceled
144+
for {
145+
select {
146+
case v := <-watcher.Updates():
147+
if v == nil {
148+
break
149+
}
150+
151+
for _, k := range options.Keys {
152+
if strings.HasPrefix(clean(string(v.Key())), k) {
153+
if v.Revision() != c.revisionMap[v.Key()] {
154+
return v.Revision(), nil
155+
}
156+
}
157+
}
158+
case <-ctx.Done():
159+
return 0, easykv.ErrWatchCanceled
160+
}
146161
}
147-
return 0, err
148162
}

0 commit comments

Comments
 (0)