Skip to content

Commit f0d8c65

Browse files
author
Brian Picciano
committed
changes suggested by golint
1 parent 1736a54 commit f0d8c65

14 files changed

+121
-90
lines changed

clients/clients.go

+17-14
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,27 @@ func init() {
1818
}()
1919
}
2020

21-
// Obstensibly a net.Conn, but for testing we don't want to have to set up a
22-
// real listen socket and all that noise
21+
// ClientConn is obstensibly a net.Conn, but for testing we don't want to have
22+
// to set up a real listen socket and all that noise
2323
type ClientConn interface {
2424
io.ReadWriteCloser
2525
RemoteAddr() net.Addr
2626
}
2727

28-
// Represents a single client, either one just submitting events or a consumer.
29-
// It expects to be handled in a single threaded context, except for methods
30-
// marked otherwise (specifically Notify and DrainNotifyCh)
28+
// Client represents a single okq client, either one just submitting events or a
29+
// consumer. It expects to be handled in a single threaded context, except for
30+
// methods marked otherwise (specifically Notify and DrainNotifyCh)
3131
type Client struct {
32-
Id string
32+
ID string
3333
Queues []string
3434
Conn ClientConn
3535
NotifyCh chan string
3636
}
3737

38+
// NewClient creates a new Client structure around the given ClientConn
3839
func NewClient(conn ClientConn) *Client {
3940
client := &Client{
40-
Id: <-uuidCh,
41+
ID: <-uuidCh,
4142
Conn: conn,
4243
Queues: []string{},
4344
NotifyCh: make(chan string, 1),
@@ -46,34 +47,36 @@ func NewClient(conn ClientConn) *Client {
4647
return client
4748
}
4849

49-
// Notifies the client that queueName has an event on it. This may be called
50-
// from another thread besides the one which "owns" the client
50+
// Notify notifies the client that queueName has an event on it. This may be
51+
// called from another thread besides the one which "owns" the client
5152
func (client *Client) Notify(queueName string) {
5253
select {
5354
case client.NotifyCh <- queueName:
5455
default:
5556
}
5657
}
5758

58-
// Removes any queue notifications that may be buffered in the client. This may
59-
// be called from another thread besides the one which "owns" the client
59+
// DrainNotifyCh removes any queue notifications that may be buffered in the
60+
// client. This may be called from another thread besides the one which "owns"
61+
// the client
6062
func (client *Client) DrainNotifyCh() {
6163
select {
6264
case <-client.NotifyCh:
6365
default:
6466
}
6567
}
6668

69+
// Close closes the client's connection
6770
func (client *Client) Close() {
6871
client.Conn.Close()
6972
}
7073

71-
// Returns the given formatted string with a little extra info about the client
72-
// prepended to it
74+
// Sprintf returns the given formatted string with a little extra info about the
75+
// client prepended to it
7376
func (client *Client) Sprintf(format string, args ...interface{}) error {
7477
fullFormat := "client %v %v - " + format
7578
fullArgs := make([]interface{}, 0, len(args)+2)
76-
fullArgs = append(fullArgs, client.Id)
79+
fullArgs = append(fullArgs, client.ID)
7780
fullArgs = append(fullArgs, client.Conn.RemoteAddr())
7881
fullArgs = append(fullArgs, args...)
7982

clients/consumers/active.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,20 @@ import (
1616
//
1717
// * Update the timestamps of all the currently live consumers on this instance
1818
// in redis, so other okq instances don't truncate them
19+
1920
const (
20-
STALE_CONSUMER_TIMEOUT = 30 * time.Second
21+
// StaleConsumerTimeout is the time a consumer has to update its register
22+
// status on a queue before it is considered stale and removed
23+
StaleConsumerTimeout = 30 * time.Second
2124
)
2225

2326
func activeSpin() {
2427
tick := time.Tick(10 * time.Second)
25-
for _ = range tick {
28+
for range tick {
2629
if err := updateActiveConsumers(); err != nil {
2730
log.L.Printf("updating active consumers: %s", err)
2831
}
29-
if err := removeStaleConsumers(STALE_CONSUMER_TIMEOUT); err != nil {
32+
if err := removeStaleConsumers(StaleConsumerTimeout); err != nil {
3033
log.L.Printf("removing stale consumers: %s", err)
3134
}
3235
}
@@ -50,7 +53,7 @@ func updateActiveConsumers() error {
5053
consumersArgs[queue] = args
5154
}
5255

53-
consumersArgs[queue] = append(args, ts, client.Id)
56+
consumersArgs[queue] = append(args, ts, client.ID)
5457
}
5558
}
5659

clients/consumers/consumers.go

+21-21
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ type state struct {
3333

3434
// Returns a slice containing a client's current queues, or empty slice if there
3535
// are none registered
36-
func (s *state) clientQueues(clientId string) []string {
36+
func (s *state) clientQueues(clientID string) []string {
3737
queues := make([]string, 0, 8)
3838
for q, clients := range s.queueM {
3939
for _, client := range clients {
40-
if client.Id == clientId {
40+
if client.ID == clientID {
4141
queues = append(queues, q)
4242
}
4343
}
@@ -50,9 +50,9 @@ func (s *state) clientQueues(clientId string) []string {
5050
func (s *state) addClientQueues(client *clients.Client, queues []string) {
5151
for _, q := range queues {
5252
if mc := s.queueM[q]; mc != nil {
53-
mc[client.Id] = client
53+
mc[client.ID] = client
5454
} else {
55-
s.queueM[q] = map[string]*clients.Client{client.Id: client}
55+
s.queueM[q] = map[string]*clients.Client{client.ID: client}
5656
}
5757
}
5858
}
@@ -61,7 +61,7 @@ func (s *state) addClientQueues(client *clients.Client, queues []string) {
6161
func (s *state) removeClientQueues(client *clients.Client, queues []string) {
6262
for _, q := range queues {
6363
if mc := s.queueM[q]; mc != nil {
64-
delete(mc, client.Id)
64+
delete(mc, client.ID)
6565
if len(mc) == 0 {
6666
delete(s.queueM, q)
6767
}
@@ -72,33 +72,33 @@ func (s *state) removeClientQueues(client *clients.Client, queues []string) {
7272
// Returns a slice containing a queue's currently registered clients, or empty
7373
// slice if there are none registered
7474
func (s *state) queueClients(queue string) []*clients.Client {
75-
if mc := s.queueM[queue]; mc == nil {
75+
mc := s.queueM[queue]
76+
if mc == nil {
7677
return []*clients.Client{}
77-
} else {
78-
cs := make([]*clients.Client, 0, len(mc))
79-
for _, c := range mc {
80-
cs = append(cs, c)
81-
}
82-
return cs
8378
}
79+
cs := make([]*clients.Client, 0, len(mc))
80+
for _, c := range mc {
81+
cs = append(cs, c)
82+
}
83+
return cs
8484
}
8585

8686
// Returns a slice of all queues currently registered by at least one client
8787
func (s *state) registeredQueues() []string {
8888
qs := make([]string, 0, len(s.queueM))
89-
for q, _ := range s.queueM {
89+
for q := range s.queueM {
9090
qs = append(qs, q)
9191
}
9292
return qs
9393
}
9494

95-
// Should be called whenever a client changes what queues it's registered for.
96-
// The new full list of registered queues should be passed in, this method will
97-
// do a diff and figure it out what was removed
95+
// UpdateQueues should be called whenever a client changes what queues it's
96+
// registered for. The new full list of registered queues should be passed in,
97+
// this method will do a diff and figure it out what was removed
9898
func UpdateQueues(client *clients.Client, queues []string) error {
9999
respCh := make(chan error)
100100
callCh <- func(s *state) {
101-
oldQueues := s.clientQueues(client.Id)
101+
oldQueues := s.clientQueues(client.ID)
102102
removed := stringSliceSub(oldQueues, queues)
103103
s.addClientQueues(client, queues)
104104
s.removeClientQueues(client, removed)
@@ -120,12 +120,12 @@ func UpdateQueues(client *clients.Client, queues []string) error {
120120

121121
for _, queueName := range removed {
122122
consumersKey := db.ConsumersKey(queueName)
123-
redisClient.Append("ZREM", consumersKey, client.Id)
123+
redisClient.Append("ZREM", consumersKey, client.ID)
124124
pipelineSize++
125125
}
126126
for _, queueName := range queues {
127127
consumersKey := db.ConsumersKey(queueName)
128-
redisClient.Append("ZADD", consumersKey, ts, client.Id)
128+
redisClient.Append("ZADD", consumersKey, ts, client.ID)
129129
pipelineSize++
130130
}
131131
for i := 0; i < pipelineSize; i++ {
@@ -173,8 +173,8 @@ outer:
173173
return ret
174174
}
175175

176-
// Returns the total number of consumers registered for the given queue, either
177-
// on this okq instance or others
176+
// QueueConsumerCount returns the total number of consumers registered for the
177+
// given queue, either on this okq instance or others
178178
func QueueConsumerCount(queue string) (int64, error) {
179179
consumersKey := db.ConsumersKey(queue)
180180

clients/consumers/consumers_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestUpdateQueues(t *T) {
2929
// Make sure the client.Id appears in the consumers set for those queues
3030
for i := range queues {
3131
key := db.ConsumersKey(queues[i])
32-
res := redisClient.Cmd("ZRANK", key, client.Id)
32+
res := redisClient.Cmd("ZRANK", key, client.ID)
3333
assert.Equal(t, redis.IntegerReply, res.Type, "res: %s", res)
3434
}
3535

@@ -38,13 +38,13 @@ func TestUpdateQueues(t *T) {
3838

3939
// Make sure the first queue had this clientId removed from it
4040
key := db.ConsumersKey(queues[0])
41-
res := redisClient.Cmd("ZRANK", key, client.Id)
41+
res := redisClient.Cmd("ZRANK", key, client.ID)
4242
assert.Equal(t, redis.NilReply, res.Type, "res: %s", res)
4343

4444
// Make sure the rest of the queues still have it
4545
for i := range queues[1:] {
4646
key := db.ConsumersKey(queues[1:][i])
47-
res := redisClient.Cmd("ZRANK", key, client.Id)
47+
res := redisClient.Cmd("ZRANK", key, client.ID)
4848
assert.Equal(t, redis.IntegerReply, res.Type, "res: %s", res)
4949
}
5050

@@ -54,7 +54,7 @@ func TestUpdateQueues(t *T) {
5454
// Make sure the clientId appears nowhere
5555
for i := range queues {
5656
key := db.ConsumersKey(queues[i])
57-
res := redisClient.Cmd("ZRANK", key, client.Id)
57+
res := redisClient.Cmd("ZRANK", key, client.ID)
5858
assert.Equal(t, redis.NilReply, res.Type, "res: %s", res)
5959
}
6060

@@ -73,7 +73,7 @@ func TestStaleCleanup(t *T) {
7373

7474
// Make sure the queue has this clientId as a consumer
7575
key := db.ConsumersKey(queue)
76-
res := redisClient.Cmd("ZRANK", key, client.Id)
76+
res := redisClient.Cmd("ZRANK", key, client.ID)
7777
assert.Equal(t, redis.IntegerReply, res.Type, "res: %s", res)
7878

7979
// Remove all knowledge about this client from the consumer state
@@ -87,7 +87,7 @@ func TestStaleCleanup(t *T) {
8787
require.Nil(t, err)
8888

8989
// Make sure this client is no longer a consumer
90-
res = redisClient.Cmd("ZRANK", key, client.Id)
91-
assert.Equal(t, redis.NilReply, res.Type, "key: %s clientId: %s res: %s", key, client.Id, res)
90+
res = redisClient.Cmd("ZRANK", key, client.ID)
91+
assert.Equal(t, redis.NilReply, res.Type, "key: %s clientId: %s res: %s", key, client.ID, res)
9292

9393
}

clients/fake_client.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,36 @@ import (
77
"net"
88
)
99

10-
// Implements clients.ClientConn, but isn't an actual network connection. Can
11-
// still be passed into resp.ReadMessage and other things that take in
10+
// FakeClientConn implements ClientConn, but isn't an actual network connection.
11+
// Can still be passed into resp.ReadMessage and other things that take in
1212
// io.Readers and io.Writers. Used for testing here and other places
1313
type FakeClientConn struct {
1414
*bytes.Buffer
1515
}
1616

17+
// NewFakeClientConn initializes a FakeClientConn and returns it
1718
func NewFakeClientConn() *FakeClientConn {
1819
return &FakeClientConn{
1920
bytes.NewBuffer(make([]byte, 0, 1024)),
2021
}
2122
}
2223

24+
// Close does nothing and always returns nil. It is only here to implement the
25+
// ClientConn interface
2326
func (fconn *FakeClientConn) Close() error {
2427
return nil
2528
}
2629

30+
// RemoteAddr always returns nil. It is only here to implement the ClientConn
31+
// interface
2732
func (fconn *FakeClientConn) RemoteAddr() net.Addr {
2833
return nil
2934
}
3035

31-
// Kind of an odd place for this, but it has to go somewhere. Returns a random
32-
// string to use as a queue name. Used for testing in a few places
36+
// Kind of an odd place for this, but it has to go somewhere.
37+
38+
// RandQueueName returns a random string to use as a queue name. Used for
39+
// testing in a few places
3340
func RandQueueName() string {
3441
b := make([]byte, 10)
3542
if _, err := rand.Read(b); err != nil {

commands/commands.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Contains all commands callable by a client
1+
// Package commands contains all commands callable by a client
22
package commands
33

44
import (
@@ -40,10 +40,9 @@ var commandMap = map[string]commandInfo{
4040

4141
var okSS = resp.NewSimpleString("OK")
4242

43-
// All commands take in a client whose command has already been read off the
43+
// Dispatch takes in a client whose command has already been read off the
4444
// socket, a list of arguments from that command (not including the command name
45-
// itself), and return an error ONLY if the error is worth logging (disconnect
46-
// from redis, etc...)
45+
// itself), and handles that command
4746
func Dispatch(client *clients.Client, cmd string, args []string) {
4847
cmdInfo, ok := commandMap[strings.ToUpper(cmd)]
4948
if !ok {

commands/commands_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -81,22 +81,22 @@ func TestQRegister(t *T) {
8181
func TestBasicFunctionality(t *T) {
8282
client := newClient()
8383
queue := clients.RandQueueName()
84-
events := []struct{ eventId, event string }{
84+
events := []struct{ eventID, event string }{
8585
{"0", "foo"},
8686
{"1", "bar"},
8787
{"2", "baz"},
8888
}
8989

9090
for i := range events {
91-
Dispatch(client, "qlpush", []string{queue, events[i].eventId, events[i].event})
91+
Dispatch(client, "qlpush", []string{queue, events[i].eventID, events[i].event})
9292
readAndAssertStr(t, client, "OK")
9393
}
9494

9595
for i := range events {
9696
Dispatch(client, "qrpop", []string{queue})
97-
readAndAssertArr(t, client, []string{events[i].eventId, events[i].event})
97+
readAndAssertArr(t, client, []string{events[i].eventID, events[i].event})
9898

99-
Dispatch(client, "qack", []string{queue, events[i].eventId})
99+
Dispatch(client, "qack", []string{queue, events[i].eventID})
100100
readAndAssertInt(t, client, 1)
101101
}
102102
}
@@ -118,7 +118,7 @@ func TestQStatus(t *T) {
118118
func TestPeeks(t *T) {
119119
client := newClient()
120120
queue := clients.RandQueueName()
121-
events := []struct{ eventId, event string }{
121+
events := []struct{ eventID, event string }{
122122
{"0", "foo"},
123123
{"1", "bar"},
124124
{"2", "baz"},
@@ -133,15 +133,15 @@ func TestPeeks(t *T) {
133133
readAndAssertNil(t, client)
134134

135135
for i := range events {
136-
Dispatch(client, "qlpush", []string{queue, events[i].eventId, events[i].event})
136+
Dispatch(client, "qlpush", []string{queue, events[i].eventID, events[i].event})
137137
readAndAssertStr(t, client, "OK")
138138
}
139139

140140
Dispatch(client, "qrpeek", []string{queue})
141-
readAndAssertArr(t, client, []string{eventFirst.eventId, eventFirst.event})
141+
readAndAssertArr(t, client, []string{eventFirst.eventID, eventFirst.event})
142142

143143
Dispatch(client, "qlpeek", []string{queue})
144-
readAndAssertArr(t, client, []string{eventLast.eventId, eventLast.event})
144+
readAndAssertArr(t, client, []string{eventLast.eventID, eventLast.event})
145145

146146
// Make sure the actual status of the queue hasn't been affected
147147
Dispatch(client, "qstatus", []string{queue})

0 commit comments

Comments
 (0)