Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 109 additions & 139 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
)

const (
Expand Down Expand Up @@ -191,122 +192,31 @@ func (client *Client) openConnection() (c net.Conn, err error) {
return
}

func (client *Client) sendCommand(cmd string, args ...string) (data interface{}, err error) {
func (client *Client) sendCommand(cmd string, args ...string) (interface{}, error) {
// grab a connection from the pool
var b []byte
c, err := client.popCon()
defer func() {
//add the client back to the queue
client.pushCon(c)
}()
if err != nil {
println(err.Error())
goto End
return nil, err
}

b = commandBytes(cmd, args...)
data, err = client.rawSend(c, b)
b := commandBytes(cmd, args...)
data, err := client.rawSend(c, b)
if err == io.EOF {
c, err = client.openConnection()
if err != nil {
println(err.Error())
goto End
return nil, err
}

data, err = client.rawSend(c, b)
}

End:

//add the client back to the queue
client.pushCon(c)

return data, err
}

func (client *Client) sendCommands(cmdArgs <-chan []string, data chan<- interface{}) (err error) {
// grab a connection from the pool
c, err := client.popCon()
var reader *bufio.Reader
var pong interface{}
var errs chan error
var errsClosed = false

if err != nil {
goto End
}

reader = bufio.NewReader(c)

// Ping first to verify connection is open
err = writeRequest(c, "PING")

// On first attempt permit a reconnection attempt
if err == io.EOF {
// Looks like we have to open a new connection
c, err = client.openConnection()
if err != nil {
goto End
}
reader = bufio.NewReader(c)
} else {
// Read Ping response
pong, err = readResponse(reader)
if pong != "PONG" {
return RedisError("Unexpected response to PING.")
}
if err != nil {
goto End
}
}

errs = make(chan error)

go func() {
for cmdArg := range cmdArgs {
err = writeRequest(c, cmdArg[0], cmdArg[1:]...)
if err != nil {
if !errsClosed {
errs <- err
}
break
}
}
if !errsClosed {
errsClosed = true
close(errs)
}
}()

go func() {
for {
response, err := readResponse(reader)
if err != nil {
if !errsClosed {
errs <- err
}
break
}
data <- response
}
if !errsClosed {
errsClosed = true
close(errs)
}
}()

// Block until errs channel closes
for e := range errs {
err = e
}

End:

// Close client and synchronization issues are a nightmare to solve.
c.Close()

// Push nil back onto queue
client.pushCon(nil)

return err
}

func (client *Client) popCon() (net.Conn, error) {
if client.pool == nil {
poolSize := client.MaxPoolSize
Expand Down Expand Up @@ -1334,69 +1244,129 @@ type Message struct {
Message []byte
}

// Subscribe to redis serve channels, this method will block until one of the sub/unsub channels are closed.
// Subscribe to redis serve channels, this method will block until one of the sub/unsub channels is closed.
// There are two pairs of channels subscribe/unsubscribe & psubscribe/punsubscribe.
// The former does an exact match on the channel, the later uses glob patterns on the redis channels.
// Closing either of these channels will unblock this method call.
// Messages that are received are sent down the messages channel.
func (client *Client) Subscribe(subscribe <-chan string, unsubscribe <-chan string, psubscribe <-chan string, punsubscribe <-chan string, messages chan<- Message) error {
cmds := make(chan []string, 0)
data := make(chan interface{}, 0)
c, err := client.popCon()
// Can't reuse connection
client.pushCon(nil)
if err != nil {
return err
}
defer c.Close()

reader := bufio.NewReader(c)

// Ping first to verify connection is open
err = writeRequest(c, "PING")

// On first attempt permit a reconnection attempt
if err == io.EOF {
// Looks like we have to open a new connection
c, err = client.openConnection()
if err != nil {
return err
}
reader = bufio.NewReader(c)
} else {
// Read ping response
pong, err := readResponse(reader)
if err != nil {
return err
}
if pong != "PONG" {
return RedisError("Unexpected response to PING.")
}
}

var wg sync.WaitGroup
cancel := make(chan struct{})
errs := make(chan error)

wg.Add(1)
go func() {
defer wg.Done()
for {
var ok bool
var channel string
var cmd string

select {
case channel = <-subscribe:
case <-cancel:
return
case channel, ok = <-subscribe:
cmd = "SUBSCRIBE"
case channel = <-unsubscribe:
case channel, ok = <-unsubscribe:
cmd = "UNSUBSCRIBE"
case channel = <-psubscribe:
case channel, ok = <-psubscribe:
cmd = "PSUBSCRIBE"
case channel = <-punsubscribe:
cmd = "UNPSUBSCRIBE"

case channel, ok = <-punsubscribe:
cmd = "PUNSUBSCRIBE"
}
if !ok { // one of the channels closed
errs <- nil
return
}
if channel == "" {
break
} else {
cmds <- []string{cmd, channel}

err := writeRequest(c, cmd, channel)
if err != nil {
errs <- err
return
}
}
close(cmds)
close(data)
}()

wg.Add(1)
go func() {
for response := range data {
db := response.([][]byte)
messageType := string(db[0])
switch messageType {
case "message":
channel, message := string(db[1]), db[2]
messages <- Message{channel, channel, message}
case "subscribe":
// Ignore
case "unsubscribe":
// Ignore
case "pmessage":
channelMatched, channel, message := string(db[1]), string(db[2]), db[3]
messages <- Message{channelMatched, channel, message}
case "psubscribe":
// Ignore
case "punsubscribe":
// Ignore

default:
// log.Printf("Unknown message '%s'", messageType)
defer wg.Done()
responses := make(chan interface{})
for {
go func() {
r, err := readResponse(reader)
if err != nil {
errs <- err
} else {
responses <- r
}
}()

select {
case <- cancel:
return

case r := <- responses:
db := r.([][]byte)
messageType := string(db[0])

switch messageType {
case "message":
channel, message := string(db[1]), db[2]
messages <- Message{channel, channel, message}
case "subscribe":
// Ignore
case "unsubscribe":
// Ignore
case "pmessage":
channelMatched, channel, message := string(db[1]), string(db[2]), db[3]
messages <- Message{channelMatched, channel, message}
case "psubscribe":
// Ignore
case "punsubscribe":
// Ignore
default:
errs <- RedisError(fmt.Sprintf("Unknown message '%s'", messageType))
return
}

}
}
}()

err := client.sendCommands(cmds, data)

err =<- errs
close(cancel)
wg.Wait()
return err
}

Expand Down
Loading