Skip to content

Commit 4a20d30

Browse files
mc0Mark Caudill
authored and
Mark Caudill
committed
initial commit with working version
0 parents  commit 4a20d30

7 files changed

+818
-0
lines changed

.gitignore

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Compiled Object files, Static and Dynamic libs (Shared Objects)
2+
*.o
3+
*.a
4+
*.so
5+
6+
# Folders
7+
_obj
8+
_test
9+
.idea
10+
11+
# Architecture specific extensions/prefixes
12+
*.[568vq]
13+
[568vq].out
14+
15+
*.cgo1.go
16+
*.cgo2.c
17+
_cgo_defun.c
18+
_cgo_gotypes.go
19+
_cgo_export.*
20+
21+
_testmain.go
22+
23+
*.exe
24+
*.test
25+
*.prof

LICENSE

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2014 Mark Caudill
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.
22+

README.md

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
Redeque
2+
=======
3+
4+
A simple go-based reliable event queueing with at-least-once support.
5+
6+
The underlying data storage is [Redis](http://redis.io) and the protocol is [RESP](http://redis.io/topics/protocol).
7+
This allows swapping connecting directly to the service from any redis client.
8+
9+
Commands
10+
--------
11+
* QREGISTER queue [queue ...]
12+
13+
optionally register the queues a consumer will pop from
14+
15+
* QRPOP queue
16+
17+
Grab the right-most event from a queue
18+
19+
* QLPEEK queue
20+
21+
Look at the contents of the left-most event in a queue
22+
23+
* QRPEEK queue
24+
25+
See QLPEEK; except the right-most
26+
27+
* QREM queue eventId
28+
29+
Remove an event from a queue. Should be done after the event is successfully consumed.
30+
31+
* QLPUSH queue eventId contents
32+
33+
Add an event as the new left-most event in the queue
34+
35+
* QRPUSH queue eventId contents
36+
37+
See QLPUSH; except the right-most. Usually for high-priority events.
38+
39+
* QSTATUS
40+
41+
Get the status of all active queues on the system in the format: [queue] [total] [processing]

commands.go

+279
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/fzzy/radix/redis"
7+
"github.com/fzzy/radix/redis/resp"
8+
"strconv"
9+
)
10+
11+
func qregister(client *Client, args []string) {
12+
// reset the queue list to what was sent
13+
client.queues = args
14+
conn := *client.conn
15+
16+
throttledWriteAllConsumers()
17+
18+
conn.Write([]byte("+OK\r\n"))
19+
}
20+
21+
func qpeekgeneric(client *Client, args []string, direction string) {
22+
var err error
23+
conn := *client.conn
24+
if len(args) < 1 {
25+
err = errors.New("ERR missing args")
26+
resp.WriteArbitrary(conn, err)
27+
return
28+
}
29+
30+
}
31+
32+
func qrpop(client *Client, args []string) {
33+
var err error
34+
conn := *client.conn
35+
if len(args) < 1 {
36+
err = errors.New("ERR missing args")
37+
resp.WriteArbitrary(conn, err)
38+
return
39+
}
40+
41+
queueName := args[0]
42+
expires := 30
43+
//delete := false
44+
if len(args) > 2 && args[1] == "EX" {
45+
argsRest := args[1:]
46+
for i := range argsRest {
47+
switch argsRest[i] {
48+
case "EX":
49+
expires, err = strconv.Atoi(args[2])
50+
if err != nil {
51+
err = errors.New("ERR bad expires value")
52+
resp.WriteArbitrary(conn, err)
53+
return
54+
}
55+
case "DELETE":
56+
//delete = true
57+
}
58+
}
59+
}
60+
61+
redisClient, err := redisPool.Get()
62+
if err != nil {
63+
err = errors.New(fmt.Sprintf("ERR failed to get redis conn %q", err))
64+
logger.Printf("%s", err)
65+
resp.WriteArbitrary(conn, err)
66+
return
67+
}
68+
defer redisPool.Put(redisClient)
69+
70+
reply := redisClient.Cmd("RPOPLPUSH", "queue:"+queueName, "queue:claimed:"+queueName)
71+
if reply.Err != nil {
72+
err = errors.New(fmt.Sprintf("ERR rpoplpush redis replied %q", reply.Err))
73+
logger.Printf("%s", err)
74+
resp.WriteArbitrary(conn, err)
75+
return
76+
}
77+
if reply.Type == redis.NilReply {
78+
resp.WriteArbitrary(conn, nil)
79+
return
80+
}
81+
82+
eventID, err := reply.Str()
83+
if err != nil {
84+
err = errors.New(fmt.Sprintf("ERR cast failed %q", err))
85+
logger.Printf("%s", err)
86+
resp.WriteArbitrary(conn, err)
87+
return
88+
}
89+
90+
reply = redisClient.Cmd("SET", "queue:lock:"+queueName+":"+eventID, 1, "EX", strconv.Itoa(expires), "NX")
91+
if reply.Err != nil {
92+
err = errors.New(fmt.Sprintf("ERR set redis replied %q", reply.Err))
93+
logger.Printf("%s", err)
94+
resp.WriteArbitrary(conn, err)
95+
return
96+
}
97+
98+
reply = redisClient.Cmd("HGET", "queue:items:"+queueName, eventID)
99+
if reply.Err != nil {
100+
err = errors.New(fmt.Sprintf("ERR hget redis replied %q", reply.Err))
101+
logger.Printf("%s", err)
102+
resp.WriteArbitrary(conn, err)
103+
return
104+
}
105+
106+
var eventRaw string
107+
if reply.Type != redis.NilReply {
108+
eventRaw, err = reply.Str()
109+
if err != nil {
110+
err = errors.New(fmt.Sprintf("ERR cast failed %q", err))
111+
logger.Printf("%s", err)
112+
resp.WriteArbitrary(conn, err)
113+
return
114+
}
115+
}
116+
117+
result := [2]string{eventID, eventRaw}
118+
119+
resp.WriteArbitrary(conn, result)
120+
121+
go func(client Client, queueName string, eventID string) {
122+
// TODO: don't send the real client here
123+
//fakeClient := Client{conn: &io.PipeWriter{}}
124+
qrem(&client, []string{queueName, eventID})
125+
}(*client, queueName, eventID)
126+
}
127+
128+
func qrem(client *Client, args []string) {
129+
var err error
130+
conn := *client.conn
131+
if len(args) < 2 {
132+
err = errors.New("ERR missing args")
133+
resp.WriteArbitrary(conn, err)
134+
return
135+
}
136+
redisClient, err := redisPool.Get()
137+
if err != nil {
138+
return
139+
}
140+
defer redisPool.Put(redisClient)
141+
142+
reply := redisClient.Cmd("LREM", "queue:claimed:"+args[0], -1, args[1])
143+
if reply.Err != nil {
144+
err = errors.New(fmt.Sprintf("ERR lrem redis replied %q", reply.Err))
145+
logger.Printf("%s", err)
146+
resp.WriteArbitrary(conn, err)
147+
return
148+
}
149+
150+
delReply := redisClient.Cmd("HDEL", "queue:items:"+args[0], args[1])
151+
if delReply.Err != nil {
152+
err = errors.New(fmt.Sprintf("ERR hdel redis replied %q", delReply.Err))
153+
logger.Printf("%s", err)
154+
resp.WriteArbitrary(conn, err)
155+
return
156+
}
157+
158+
delReply = redisClient.Cmd("DEL", "queue:lock:"+args[0]+":"+args[1])
159+
if delReply.Err != nil {
160+
err = errors.New(fmt.Sprintf("ERR del redis replied %q", delReply.Err))
161+
logger.Printf("%s", err)
162+
resp.WriteArbitrary(conn, err)
163+
return
164+
}
165+
166+
numRemoved, _ := reply.Int()
167+
168+
resp.WriteArbitrary(conn, numRemoved)
169+
}
170+
171+
func qpushgeneric(client *Client, args []string, direction string) {
172+
var err error
173+
conn := *client.conn
174+
if len(args) < 3 {
175+
err = errors.New("ERR missing args")
176+
resp.WriteArbitrary(conn, err)
177+
return
178+
}
179+
180+
redisClient, err := redisPool.Get()
181+
if err != nil {
182+
return
183+
}
184+
defer redisPool.Put(redisClient)
185+
186+
reply := redisClient.Cmd("HSETNX", "queue:items:"+args[0], args[1], args[2])
187+
if reply.Err != nil {
188+
err = errors.New(fmt.Sprintf("ERR redis replied %q", reply.Err))
189+
logger.Printf("%s", err)
190+
resp.WriteArbitrary(conn, err)
191+
return
192+
}
193+
194+
created, err := reply.Int()
195+
if err != nil {
196+
err = errors.New(fmt.Sprintf("ERR cast failed %q", err))
197+
logger.Printf("%s", err)
198+
resp.WriteArbitrary(conn, err)
199+
return
200+
}
201+
if created == 0 {
202+
err = errors.New(fmt.Sprintf("ERR duplicate event %q", args[1]))
203+
resp.WriteArbitrary(conn, err)
204+
return
205+
}
206+
207+
if direction == "left" {
208+
reply = redisClient.Cmd("LPUSH", "queue:"+args[0], args[1])
209+
} else {
210+
reply = redisClient.Cmd("RPUSH", "queue:"+args[0], args[1])
211+
}
212+
if reply.Err != nil {
213+
err = errors.New(fmt.Sprintf("ERR redis replied %q", reply.Err))
214+
logger.Printf("%s", err)
215+
resp.WriteArbitrary(conn, err)
216+
return
217+
}
218+
219+
conn.Write([]byte("+OK\r\n"))
220+
}
221+
222+
func qstatus(client *Client, args []string) {
223+
var err error
224+
conn := *client.conn
225+
if len(args) > 0 {
226+
err = errors.New("wrong number of arguments for 'qstatus' command")
227+
resp.WriteArbitrary(conn, err)
228+
return
229+
}
230+
231+
redisClient, err := redisPool.Get()
232+
if err != nil {
233+
return
234+
}
235+
defer redisPool.Put(redisClient)
236+
237+
queueNames, err := getAllQueueNames(redisClient)
238+
if err != nil {
239+
resp.WriteArbitrary(conn, err)
240+
return
241+
}
242+
243+
var queueStatuses []string
244+
245+
for i := range queueNames {
246+
queueName := queueNames[i]
247+
claimedCount := 0
248+
availableCount := 0
249+
totalCount := 0
250+
251+
claimedReply := redisClient.Cmd("LLEN", "queue:claimed:"+queueName)
252+
if claimedReply.Err != nil {
253+
err = errors.New(fmt.Sprintf("ERR llen redis replied %q", claimedReply.Err))
254+
logger.Printf("%s", err)
255+
resp.WriteArbitrary(conn, err)
256+
return
257+
}
258+
if claimedReply.Type == redis.IntegerReply {
259+
claimedCount, _ = claimedReply.Int()
260+
}
261+
262+
availableReply := redisClient.Cmd("LLEN", "queue:"+queueName)
263+
if availableReply.Err != nil {
264+
err = errors.New(fmt.Sprintf("ERR llen redis replied %q", availableReply.Err))
265+
logger.Printf("%s", err)
266+
resp.WriteArbitrary(conn, err)
267+
return
268+
}
269+
if availableReply.Type == redis.IntegerReply {
270+
availableCount, _ = availableReply.Int()
271+
}
272+
273+
totalCount = availableCount + claimedCount
274+
275+
queueStatuses = append(queueStatuses, queueName+" "+strconv.Itoa(totalCount)+" "+strconv.Itoa(claimedCount))
276+
}
277+
278+
resp.WriteArbitrary(conn, queueStatuses)
279+
}

0 commit comments

Comments
 (0)