Skip to content

Commit 7087a7d

Browse files
author
Brian Picciano
committed
use lever instead of flagconfig, use radix.v2 instead of radix, fix Lua call for cluster
1 parent a5bd057 commit 7087a7d

File tree

11 files changed

+154
-111
lines changed

11 files changed

+154
-111
lines changed

.go.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
path: github.com/mc0/okq
33
deps:
4-
- loc: github.com/fzzy/radix/redis
5-
- loc: github.com/mediocregopher/flagconfig
4+
- loc: github.com/mediocregopher/lever
65
- loc: github.com/mediocregopher/pubsubch
6+
- loc: github.com/mediocregopher/radix.v2
77
- loc: github.com/stretchr/testify

clients/consumers/consumers_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
. "testing"
55
"time"
66

7-
"github.com/fzzy/radix/redis"
7+
"github.com/mediocregopher/radix.v2/redis"
88
"github.com/stretchr/testify/assert"
99
"github.com/stretchr/testify/require"
1010

@@ -27,7 +27,7 @@ func TestUpdateQueues(t *T) {
2727
for i := range queues {
2828
key := db.ConsumersKey(queues[i])
2929
res := db.Cmd("ZRANK", key, client.ID)
30-
assert.Equal(t, redis.IntegerReply, res.Type, "res: %s", res)
30+
assert.Equal(t, true, res.IsType(redis.Int), "res: %s", res)
3131
}
3232

3333
err = UpdateQueues(client, queues[1:])
@@ -36,13 +36,13 @@ func TestUpdateQueues(t *T) {
3636
// Make sure the first queue had this clientId removed from it
3737
key := db.ConsumersKey(queues[0])
3838
res := db.Cmd("ZRANK", key, client.ID)
39-
assert.Equal(t, redis.NilReply, res.Type, "res: %s", res)
39+
assert.Equal(t, true, res.IsType(redis.Nil), "res: %s", res)
4040

4141
// Make sure the rest of the queues still have it
4242
for i := range queues[1:] {
4343
key := db.ConsumersKey(queues[1:][i])
4444
res := db.Cmd("ZRANK", key, client.ID)
45-
assert.Equal(t, redis.IntegerReply, res.Type, "res: %s", res)
45+
assert.Equal(t, true, res.IsType(redis.Int), "res: %s", res)
4646
}
4747

4848
err = UpdateQueues(client, []string{})
@@ -52,7 +52,7 @@ func TestUpdateQueues(t *T) {
5252
for i := range queues {
5353
key := db.ConsumersKey(queues[i])
5454
res := db.Cmd("ZRANK", key, client.ID)
55-
assert.Equal(t, redis.NilReply, res.Type, "res: %s", res)
55+
assert.Equal(t, true, res.IsType(redis.Nil), "res: %s", res)
5656
}
5757
}
5858

@@ -66,7 +66,7 @@ func TestStaleCleanup(t *T) {
6666
// Make sure the queue has this clientId as a consumer
6767
key := db.ConsumersKey(queue)
6868
res := db.Cmd("ZRANK", key, client.ID)
69-
assert.Equal(t, redis.IntegerReply, res.Type, "res: %s", res)
69+
assert.Equal(t, true, res.IsType(redis.Int), "res: %s", res)
7070

7171
// Remove all knowledge about this client from the consumer state
7272
callCh <- func(s *state) {
@@ -80,5 +80,5 @@ func TestStaleCleanup(t *T) {
8080

8181
// Make sure this client is no longer a consumer
8282
res = db.Cmd("ZRANK", key, client.ID)
83-
assert.Equal(t, redis.NilReply, res.Type, "key: %s clientId: %s res: %s", key, client.ID, res)
83+
assert.Equal(t, true, res.IsType(redis.Nil), "key: %s clientId: %s res: %s", key, client.ID, res)
8484
}

commands/commands.go

+11-13
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@ import (
99
"strings"
1010
"time"
1111

12-
"github.com/fzzy/radix/redis"
13-
"github.com/fzzy/radix/redis/resp"
14-
1512
"github.com/mc0/okq/clients"
1613
"github.com/mc0/okq/clients/consumers"
1714
"github.com/mc0/okq/db"
1815
"github.com/mc0/okq/log"
16+
"github.com/mediocregopher/radix.v2/redis"
1917
)
2018

2119
type commandFunc func(*clients.Client, []string) (interface{}, error)
@@ -38,7 +36,7 @@ var commandMap = map[string]commandInfo{
3836
"PING": {ping, 0},
3937
}
4038

41-
var okSS = resp.NewSimpleString("OK")
39+
var okSS = redis.NewRespSimple("OK")
4240

4341
// Dispatch takes in a client whose command has already been read off the
4442
// socket, a list of arguments from that command (not including the command name
@@ -62,7 +60,7 @@ func Dispatch(client *clients.Client, cmd string, args []string) {
6260
return
6361
}
6462

65-
resp.WriteArbitrary(client.Conn, ret)
63+
redis.NewResp(ret).WriteTo(client.Conn)
6664
}
6765

6866
func parseInt(from, as string) (int, error) {
@@ -77,8 +75,8 @@ func parseInt(from, as string) (int, error) {
7775

7876
func drainPipeline(redisClient *redis.Client) error {
7977
for {
80-
err := redisClient.GetReply().Err
81-
if err == redis.PipelineQueueEmptyError {
78+
err := redisClient.PipeResp().Err
79+
if err == redis.ErrPipelineEmpty {
8280
break
8381
} else if err != nil {
8482
return err
@@ -87,9 +85,9 @@ func drainPipeline(redisClient *redis.Client) error {
8785
return nil
8886
}
8987

90-
func writeErrf(w io.Writer, format string, args ...interface{}) error {
88+
func writeErrf(w io.Writer, format string, args ...interface{}) {
9189
err := fmt.Errorf(format, args...)
92-
return resp.WriteArbitrary(w, err)
90+
redis.NewResp(err).WriteTo(w)
9391
}
9492

9593
func qregister(client *clients.Client, args []string) (interface{}, error) {
@@ -134,7 +132,7 @@ func qpeekgeneric(
134132

135133
var eventRaw string
136134
reply := db.Cmd("HGET", itemsKey, eventID)
137-
if reply.Type == redis.NilReply {
135+
if reply.IsType(redis.Nil) {
138136
return nil, nil
139137
}
140138
if eventRaw, err = reply.Str(); err != nil {
@@ -164,7 +162,7 @@ func qrpop(client *clients.Client, args []string) (interface{}, error) {
164162
unclaimedKey := db.UnclaimedKey(queueName)
165163
claimedKey := db.ClaimedKey(queueName)
166164
reply := db.Cmd("RPOPLPUSH", unclaimedKey, claimedKey)
167-
if reply.Type == redis.NilReply {
165+
if reply.IsType(redis.Nil) {
168166
return nil, nil
169167
}
170168

@@ -365,8 +363,8 @@ func qstatus(client *clients.Client, args []string) (interface{}, error) {
365363
return queueStatuses, nil
366364
}
367365

368-
var pong = resp.NewSimpleString("PONG")
366+
var pongSS = redis.NewRespSimple("PONG")
369367

370368
func ping(client *clients.Client, args []string) (interface{}, error) {
371-
return pong, nil
369+
return pongSS, nil
372370
}

commands/commands_test.go

+14-10
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,42 @@ import (
66
. "testing"
77
"time"
88

9-
"github.com/fzzy/radix/redis/resp"
9+
"github.com/mediocregopher/radix.v2/redis"
1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
1212

1313
"github.com/mc0/okq/clients"
1414
)
1515

1616
func readAndAssertStr(t *T, client *clients.Client, expected string) {
17-
m, err := resp.ReadMessage(client.Conn)
18-
require.Nil(t, err, "stack:\n%s", debug.Stack())
17+
rr := redis.NewRespReader(client.Conn)
18+
m := rr.Read()
19+
require.Nil(t, m.Err, "stack:\n%s", debug.Stack())
1920
s, err := m.Str()
2021
require.Nil(t, err, "stack:\n%s", debug.Stack())
2122
assert.Equal(t, expected, s, "m: %v stack:\n%s", m, debug.Stack())
2223
}
2324

2425
func readAndAssertInt(t *T, client *clients.Client, expected int64) {
25-
m, err := resp.ReadMessage(client.Conn)
26-
require.Nil(t, err, "stack:\n%s", debug.Stack())
26+
rr := redis.NewRespReader(client.Conn)
27+
m := rr.Read()
28+
require.Nil(t, m.Err, "stack:\n%s", debug.Stack())
2729
i, err := m.Int()
2830
require.Nil(t, err, "stack:\n%s", debug.Stack())
2931
assert.Equal(t, expected, i, "m: %v stack:\n%s", m, debug.Stack())
3032
}
3133

3234
func readAndAssertNil(t *T, client *clients.Client) {
33-
m, err := resp.ReadMessage(client.Conn)
34-
require.Nil(t, err, "stack:\n%s", debug.Stack())
35-
assert.Equal(t, resp.Nil, m.Type, "m: %v stack:\n%s", m, debug.Stack())
35+
rr := redis.NewRespReader(client.Conn)
36+
m := rr.Read()
37+
require.Nil(t, m.Err, "stack:\n%s", debug.Stack())
38+
assert.Equal(t, true, m.IsType(redis.Nil), "m: %v stack:\n%s", m, debug.Stack())
3639
}
3740

3841
func readAndAssertArr(t *T, client *clients.Client, expected []string) {
39-
m, err := resp.ReadMessage(client.Conn)
40-
require.Nil(t, err, "stack:\n%s", debug.Stack())
42+
rr := redis.NewRespReader(client.Conn)
43+
m := rr.Read()
44+
require.Nil(t, m.Err, "stack:\n%s", debug.Stack())
4145

4246
arr, err := m.Array()
4347
require.Nil(t, err, "stack:\n%s", debug.Stack())

config/config.go

+28-17
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
package config
44

55
import (
6-
"log" // we don't use our log because it imports this package
7-
8-
"github.com/mediocregopher/flagconfig"
6+
"github.com/mediocregopher/lever"
97
)
108

119
// Variables populated by the flag parsing process at runtime
@@ -17,19 +15,32 @@ var (
1715
)
1816

1917
func init() {
20-
fc := flagconfig.New("redeqeue")
21-
22-
fc.StrParam("listen-addr", "Address to listen for client connections on", ":4777")
23-
fc.StrParam("redis-addr", "Address redis is listening on", "127.0.0.1:6379")
24-
fc.FlagParam("redis-cluster", "Whether or not to treat the redis address as a node in a larger cluster", false)
25-
fc.FlagParam("debug", "Turn on debug logging", false)
26-
27-
if err := fc.Parse(); err != nil {
28-
log.Fatal(err)
29-
}
18+
l := lever.New("okq", nil)
19+
l.Add(lever.Param{
20+
Name: "--listen-addr",
21+
Description: "Address to listen for client connections on",
22+
Default: ":4777",
23+
})
24+
l.Add(lever.Param{
25+
Name: "--redis-addr",
26+
Description: "Address redis is listening on",
27+
Default: "127.0.0.1:6379",
28+
})
29+
l.Add(lever.Param{
30+
Name: "--redis-cluster",
31+
Description: "Whether or not to treat the redis address as a node in a larger cluster",
32+
Flag: true,
33+
})
34+
l.Add(lever.Param{
35+
Name: "--debug",
36+
Aliases: []string{"-d"},
37+
Description: "Turn on debug logging",
38+
Flag: true,
39+
})
40+
l.Parse()
3041

31-
ListenAddr = fc.GetStr("listen-addr")
32-
RedisAddr = fc.GetStr("redis-addr")
33-
RedisCluster = fc.GetFlag("redis-cluster")
34-
Debug = fc.GetFlag("debug")
42+
ListenAddr, _ = l.ParamStr("--listen-addr")
43+
RedisAddr, _ = l.ParamStr("--redis-addr")
44+
RedisCluster = l.ParamFlag("--redis-cluster")
45+
Debug = l.ParamFlag("debug")
3546
}

db/db.go

+21-7
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,28 @@ import (
66
"errors"
77
"strings"
88

9-
"github.com/fzzy/radix/redis"
109
"github.com/mc0/okq/config"
1110
"github.com/mc0/okq/log"
11+
"github.com/mediocregopher/radix.v2/redis"
1212
)
1313

1414
// These functions are filled in dynamically at runtime
1515
var (
1616
// Cmd is a function which will perform the given cmd/args in redis and
17-
// returns the reply. It automatically handles using redis cluster, if that
17+
// returns the resp. It automatically handles using redis cluster, if that
1818
// is enabled
19-
Cmd func(string, ...interface{}) *redis.Reply
19+
Cmd func(string, ...interface{}) *redis.Resp
2020

2121
// Pipe runs a set of commands (given by p) one after the other. It is *not*
2222
// guaranteed that all the commands will be run on the same client. If any
2323
// commands return an error the pipeline will stop and return that error.
24-
// Otherwise the Reply from each command is returned in a slice
24+
// Otherwise the Resp from each command is returned in a slice
2525
//
2626
// r, err := db.Pipe(
2727
// db.PP("SET", "foo", "bar"),
2828
// db.PP("GET", "foo"),
2929
// )
30-
Pipe func(...*PipePart) ([]*redis.Reply, error)
30+
Pipe func(...*PipePart) ([]*redis.Resp, error)
3131

3232
// Scan is a function which returns a channel to which keys matching the
3333
// given pattern are written to. The channel must be read from until it is
@@ -36,6 +36,16 @@ var (
3636
//
3737
// This should not be used in any critical paths
3838
Scan func(string) <-chan string
39+
40+
// Lua performs one of the preloaded Lua scripts that have been built-in.
41+
// It's *possible* that the script wasn't loaded in initLuaScripts() for
42+
// some strange reason, this tries to handle that case as well. The integer
43+
// passed in is the number of keys the command takes in
44+
//
45+
// Example:
46+
//
47+
// db.Lua(redisClient, "LREMRPUSH", 2, "foo", "bar", "value")
48+
Lua func(string, int, ...interface{}) *redis.Resp
3949
)
4050

4151
// PipePart is a single command to be run in a pipe. See Pipe for an example on
@@ -147,8 +157,12 @@ func scanHelper(redisClient *redis.Client, pattern string, retCh chan string) er
147157
if r.Err != nil {
148158
return r.Err
149159
}
160+
elems, err := r.Array()
161+
if err != nil {
162+
return err
163+
}
150164

151-
results, err := r.Elems[1].List()
165+
results, err := elems[1].List()
152166
if err != nil {
153167
return err
154168
}
@@ -157,7 +171,7 @@ func scanHelper(redisClient *redis.Client, pattern string, retCh chan string) er
157171
retCh <- results[i]
158172
}
159173

160-
if cursor, err = r.Elems[0].Str(); err != nil {
174+
if cursor, err = elems[0].Str(); err != nil {
161175
return err
162176
} else if cursor == "0" {
163177
return nil

0 commit comments

Comments
 (0)