Skip to content

Commit 603fcac

Browse files
committed
chore(redis): support block time in XReadGroup
ref: #15 Signed-off-by: Bo-Yi.Wu <[email protected]>
1 parent 655c933 commit 603fcac

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redisdb
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/golang-queue/queue"
78
"github.com/golang-queue/queue/core"
@@ -22,6 +23,7 @@ type options struct {
2223
group string
2324
consumer string
2425
maxLength int64
26+
blockTime time.Duration
2527
}
2628

2729
// WithAddr setup the addr of redis
@@ -38,6 +40,15 @@ func WithMaxLength(m int64) Option {
3840
}
3941
}
4042

43+
// WithBlockTime setup the block time for publish messages
44+
// we use the block command to make sure if no entry is found we wait
45+
// until an entry is found
46+
func WithBlockTime(m time.Duration) Option {
47+
return func(w *options) {
48+
w.blockTime = m
49+
}
50+
}
51+
4152
// WithPassword redis password
4253
func WithDB(db int) Option {
4354
return func(w *options) {
@@ -111,6 +122,7 @@ func newOptions(opts ...Option) options {
111122
runFunc: func(context.Context, core.QueuedMessage) error {
112123
return nil
113124
},
125+
blockTime: 60 * time.Second,
114126
}
115127

116128
// Loop through each option

redis.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ import (
1616

1717
var _ core.Worker = (*Worker)(nil)
1818

19-
const blockTime = 60000
20-
2119
// Worker for Redis
2220
type Worker struct {
2321
// redis config
@@ -103,10 +101,10 @@ func (w *Worker) fetchTask() {
103101
Count: 1,
104102
// we use the block command to make sure if no entry is found we wait
105103
// until an entry is found
106-
Block: blockTime,
104+
Block: w.opts.blockTime,
107105
}).Result()
108106
if err != nil {
109-
w.opts.logger.Errorf("error while reading from redis %v",err)
107+
w.opts.logger.Errorf("error while reading from redis %v", err)
110108
continue
111109
}
112110
// we have received the data we should loop it and queue the messages

0 commit comments

Comments
 (0)