diff --git a/.travis.yml b/.travis.yml index 285387c97..aca21f34f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ env: - GOARCH=386 install: - go get github.com/bmizerany/assert + - go get github.com/bitly/go-nsq - go get github.com/bitly/go-hostpool - go get github.com/bitly/go-simplejson script: diff --git a/ChangeLog.md b/ChangeLog.md index 377e9d1ed..a3ac31f21 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -279,72 +279,10 @@ removed in a future release. * Initial public release. -## Go Client Library +## go-nsq Client Library -### 0.3.2 - 2013-08-26 - -**Upgrading from 0.3.1**: This release requires NSQ binary version `0.2.22+` for TLS support. - -New Features/Improvements: - - * #227 - TLS feature negotiation - * #164/#202/#255 - add `Writer` - * #186 - `MaxBackoffDuration` of `0` disables backoff - * #175 - support for `nsqd` config option `--max-rdy-count` - * #169 - auto-reconnect to hard-coded `nsqd` - -Bug Fixes: - - * #254/#256/#257 - new connection RDY starvation - * #250 - `nsqlookupd` polling improvements - * #243 - limit `IsStarved()` to connections w/ in-flight messages - * #169 - use last RDY count for `IsStarved()`; redistribute RDY state - * #204 - fix early termination blocking - * #177 - support `broadcast_address` - * #161 - connection pool goroutine safety - -### 0.3.1 - 2013-02-07 - -**Upgrading from 0.3.0**: This release requires NSQ binary version `0.2.17+` for `TOUCH` support. - - * #119 - add TOUCH command - * #133 - improved handling of errors/magic - * #127 - send IDENTIFY (missed in #90) - * #16 - add backoff to Reader - -### 0.3.0 - 2013-01-07 - -**Upgrading from 0.2.4**: There are no backward incompatible changes to applications -written against the public `nsq.Reader` API. - -However, there *are* a few backward incompatible changes to the API for applications that -directly use other public methods, or properties of a few NSQ data types: - -`nsq.Message` IDs are now a type `nsq.MessageID` (a `[16]byte` array). The signatures of -`nsq.Finish()` and `nsq.Requeue()` reflect this change. - -`nsq.SendCommand()` and `nsq.Frame()` were removed in favor of `nsq.SendFramedResponse()`. - -`nsq.Subscribe()` no longer accepts `shortId` and `longId`. If upgrading your consumers -before upgrading your `nsqd` binaries to `0.2.16-rc.1` they will not be able to send the -optional custom identifiers. - - * #90 performance optimizations - * #81 reader performance improvements / MPUB support - -### 0.2.4 - 2012-10-15 - - * #69 added IsStarved() to reader API - -### 0.2.3 - 2012-10-11 - - * #64 timeouts on reader queries to lookupd - * #54 fix crash issue with reader cleaning up from unexpectedly closed nsqd connections - -### 0.2.2 - 2012-10-09 - - * Initial public release + * #264 moved **go-nsq** to its own [repository](https://github.com/bitly/go-nsq) ## pynsq Python Client Library - * #88 moved **pynsq** to its own [repository](https://github.com/bitly/pynsq). + * #88 moved **pynsq** to its own [repository](https://github.com/bitly/pynsq) diff --git a/README.md b/README.md index 32e46263f..1afc79154 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -

+

* **Source**: [https://github.com/bitly/nsq][github] * **Issues**: [https://github.com/bitly/nsq/issues][issues] @@ -30,13 +30,13 @@ NOTE: master is our *development* branch and may not be stable at all times. ## In Production
- - - - - - - + + + + + + +
## Documentation diff --git a/apps/nsq_pubsub/nsq_pubsub.go b/apps/nsq_pubsub/nsq_pubsub.go index 9909dcef1..274f4dc89 100644 --- a/apps/nsq_pubsub/nsq_pubsub.go +++ b/apps/nsq_pubsub/nsq_pubsub.go @@ -6,7 +6,7 @@ import ( "bufio" "flag" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "io" "log" diff --git a/apps/nsq_tail/nsq_tail.go b/apps/nsq_tail/nsq_tail.go index 6283e8075..26f16007e 100644 --- a/apps/nsq_tail/nsq_tail.go +++ b/apps/nsq_tail/nsq_tail.go @@ -4,7 +4,7 @@ import ( "crypto/tls" "flag" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "log" "math/rand" diff --git a/apps/nsq_to_file/nsq_to_file.go b/apps/nsq_to_file/nsq_to_file.go index f8326557d..b406c3dd3 100644 --- a/apps/nsq_to_file/nsq_to_file.go +++ b/apps/nsq_to_file/nsq_to_file.go @@ -8,7 +8,7 @@ import ( "errors" "flag" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "log" "os" diff --git a/apps/nsq_to_http/http.go b/apps/nsq_to_http/http.go index edcbd200c..6e08bd8c8 100644 --- a/apps/nsq_to_http/http.go +++ b/apps/nsq_to_http/http.go @@ -3,7 +3,7 @@ package main import ( "bytes" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "net/http" "time" diff --git a/apps/nsq_to_http/nsq_to_http.go b/apps/nsq_to_http/nsq_to_http.go index e2f5bcb07..0d266a661 100644 --- a/apps/nsq_to_http/nsq_to_http.go +++ b/apps/nsq_to_http/nsq_to_http.go @@ -10,7 +10,7 @@ import ( "flag" "fmt" "github.com/bitly/go-hostpool" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "log" "math" diff --git a/apps/nsq_to_nsq/nsq_to_nsq.go b/apps/nsq_to_nsq/nsq_to_nsq.go index 1236d3a80..80dea6334 100644 --- a/apps/nsq_to_nsq/nsq_to_nsq.go +++ b/apps/nsq_to_nsq/nsq_to_nsq.go @@ -9,7 +9,7 @@ import ( "flag" "fmt" "github.com/bitly/go-hostpool" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "log" "math" diff --git a/bench/bench_reader/bench_reader.go b/bench/bench_reader/bench_reader.go index fddc51caa..5c6b1efd7 100644 --- a/bench/bench_reader/bench_reader.go +++ b/bench/bench_reader/bench_reader.go @@ -3,7 +3,7 @@ package main import ( "bufio" "flag" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "log" "math" "net" diff --git a/bench/bench_writer/bench_writer.go b/bench/bench_writer/bench_writer.go index 69e84f9d1..eb2ae557a 100644 --- a/bench/bench_writer/bench_writer.go +++ b/bench/bench_writer/bench_writer.go @@ -4,7 +4,7 @@ import ( "bufio" "bytes" "flag" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "log" "net" "runtime" diff --git a/contrib/nsq.spec b/contrib/nsq.spec index b643b19db..de0a6e5e5 100644 --- a/contrib/nsq.spec +++ b/contrib/nsq.spec @@ -1,5 +1,5 @@ %define name nsq -%define version 0.2.19 +%define version 0.2.22 %define release 1 %define path usr/local %define group Database/Applications @@ -41,7 +41,10 @@ make PREFIX=/${path} DESTDIR=$RPM_BUILD_ROOT install /%{path}/bin/nsqadmin /%{path}/bin/nsqd /%{path}/bin/nsqlookupd -/%{path}/bin/nsq_to_file /%{path}/bin/nsq_pubsub +/%{path}/bin/nsq_to_file /%{path}/bin/nsq_to_http +/%{path}/bin/nsq_to_nsq +/%{path}/bin/nsq_tail +/%{path}/bin/nsq_stat /%{path}/share/nsqadmin/templates diff --git a/dist.sh b/dist.sh index c234d38dd..d221a6a8f 100755 --- a/dist.sh +++ b/dist.sh @@ -21,8 +21,9 @@ export GOPATH="$TMPGOPATH:$GOROOT" echo "... getting dependencies" go get -v github.com/bitly/go-simplejson -go get -v github.com/bmizerany/assert go get -v github.com/bitly/go-hostpool +go get -v github.com/bitly/go-nsq +go get -v github.com/bmizerany/assert pushd $TMPGOPATH/src/github.com/bitly/nsq diff --git a/docs/images/bitly_logo.png b/docs/images/bitly_logo.png deleted file mode 100644 index 9ba88410d..000000000 Binary files a/docs/images/bitly_logo.png and /dev/null differ diff --git a/docs/images/energyhub_logo.png b/docs/images/energyhub_logo.png deleted file mode 100644 index c2a4e5bcf..000000000 Binary files a/docs/images/energyhub_logo.png and /dev/null differ diff --git a/docs/images/hailo_logo.png b/docs/images/hailo_logo.png deleted file mode 100644 index 0d6ffd37f..000000000 Binary files a/docs/images/hailo_logo.png and /dev/null differ diff --git a/docs/images/life360_logo.png b/docs/images/life360_logo.png deleted file mode 100644 index ae65f6f4f..000000000 Binary files a/docs/images/life360_logo.png and /dev/null differ diff --git a/docs/images/nsq.png b/docs/images/nsq.png deleted file mode 100644 index c54305da3..000000000 Binary files a/docs/images/nsq.png and /dev/null differ diff --git a/docs/images/path_logo.png b/docs/images/path_logo.png deleted file mode 100644 index 280beaf9d..000000000 Binary files a/docs/images/path_logo.png and /dev/null differ diff --git a/docs/images/simplereach_logo.png b/docs/images/simplereach_logo.png deleted file mode 100644 index 756505d5c..000000000 Binary files a/docs/images/simplereach_logo.png and /dev/null differ diff --git a/docs/images/trendrr_logo.png b/docs/images/trendrr_logo.png deleted file mode 100644 index 0488ce2be..000000000 Binary files a/docs/images/trendrr_logo.png and /dev/null differ diff --git a/nsq/README.md b/nsq/README.md index 5f82cb5c2..0cf0cded5 100644 --- a/nsq/README.md +++ b/nsq/README.md @@ -1,29 +1 @@ -## go-nsq - -`go-nsq` is the official Go package for [NSQ][nsq]. - -It provides the building blocks for developing applications on the [NSQ][nsq] platform in Go. - -Low-level functions and types are provided to communicate over the [NSQ protocol][protocol] as well -as a high-level [Reader][reader] and [Writer][writer] library to implement consumers and producers. - -### Installing - - $ go get github.com/bitly/nsq/nsq - -### Importing - - import "github.com/bitly/nsq/nsq" - -### Docs - -See [godoc][nsq_gopkgdoc] for pretty documentation or: - - # in the nsq package directory - $ go doc - -[nsq]: https://github.com/bitly/nsq -[nsq_gopkgdoc]: http://godoc.org/github.com/bitly/nsq/nsq -[protocol]: http://bitly.github.io/nsq/clients/tcp_protocol_spec.html -[reader]: http://godoc.org/github.com/bitly/nsq/nsq#Reader -[writer]: http://godoc.org/github.com/bitly/nsq/nsq#Writer +**go-nsq** has moved to https://github.com/bitly/go-nsq diff --git a/nsq/command.go b/nsq/command.go deleted file mode 100644 index 4e12a50db..000000000 --- a/nsq/command.go +++ /dev/null @@ -1,204 +0,0 @@ -package nsq - -import ( - "bytes" - "encoding/binary" - "encoding/json" - "fmt" - "io" - "strconv" -) - -var byteSpace = []byte(" ") -var byteNewLine = []byte("\n") - -// Command represents a command from a client to an NSQ daemon -type Command struct { - Name []byte - Params [][]byte - Body []byte -} - -// String returns the name and parameters of the Command -func (c *Command) String() string { - if len(c.Params) > 0 { - return fmt.Sprintf("%s %s", c.Name, string(bytes.Join(c.Params, byteSpace))) - } - return string(c.Name) -} - -// Write serializes the Command to the supplied Writer. -// -// It is suggested that the target Writer is buffered to avoid performing many system calls. -func (c *Command) Write(w io.Writer) error { - _, err := w.Write(c.Name) - if err != nil { - return err - } - - for _, param := range c.Params { - _, err := w.Write(byteSpace) - if err != nil { - return err - } - _, err = w.Write(param) - if err != nil { - return err - } - } - - _, err = w.Write(byteNewLine) - if err != nil { - return err - } - - if c.Body != nil { - bodySize := int32(len(c.Body)) - err := binary.Write(w, binary.BigEndian, &bodySize) - if err != nil { - return err - } - _, err = w.Write(c.Body) - if err != nil { - return err - } - } - - return nil -} - -// Identify creates a new Command to provide information about the client. After connecting, -// it is generally the first message sent. -// -// The supplied map is marshaled into JSON to provide some flexibility -// for this command to evolve over time. -// -// nsqd currently supports the following keys: -// -// short_id - short identifier, typically client's short hosname -// long_id - long identifier, typically client's long hostname -// buffer_size - size in bytes for nsqd to buffer before writing to the wire for this client -// -// nsqlookupd currently supports the following keys: -// -// version - the version of the nsqd peer -// tcp_port - the nsqd port where TCP clients can connect -// http_port - the nsqd port where HTTP clients can connect -// address - the address where clients can connect (generally DNS resolvable hostname) -func Identify(js map[string]interface{}) (*Command, error) { - body, err := json.Marshal(js) - if err != nil { - return nil, err - } - return &Command{[]byte("IDENTIFY"), nil, body}, nil -} - -// Register creates a new Command to add a topic/channel for the connected nsqd -func Register(topic string, channel string) *Command { - params := [][]byte{[]byte(topic)} - if len(channel) > 0 { - params = append(params, []byte(channel)) - } - return &Command{[]byte("REGISTER"), params, nil} -} - -// Unregister creates a new Command to remove a topic/channel for the connected nsqd -func UnRegister(topic string, channel string) *Command { - params := [][]byte{[]byte(topic)} - if len(channel) > 0 { - params = append(params, []byte(channel)) - } - return &Command{[]byte("UNREGISTER"), params, nil} -} - -// Ping creates a new Command to keep-alive the state of all the -// announced topic/channels for a given client -func Ping() *Command { - return &Command{[]byte("PING"), nil, nil} -} - -// Publish creates a new Command to write a message to a given topic -func Publish(topic string, body []byte) *Command { - var params = [][]byte{[]byte(topic)} - return &Command{[]byte("PUB"), params, body} -} - -// MultiPublish creates a new Command to write more than one message to a given topic. -// This is useful for high-throughput situations to avoid roundtrips and saturate the pipe. -func MultiPublish(topic string, bodies [][]byte) (*Command, error) { - var params = [][]byte{[]byte(topic)} - - num := uint32(len(bodies)) - bodySize := 4 - for _, b := range bodies { - bodySize += len(b) + 4 - } - body := make([]byte, 0, bodySize) - buf := bytes.NewBuffer(body) - - err := binary.Write(buf, binary.BigEndian, &num) - if err != nil { - return nil, err - } - for _, b := range bodies { - err = binary.Write(buf, binary.BigEndian, int32(len(b))) - if err != nil { - return nil, err - } - _, err = buf.Write(b) - if err != nil { - return nil, err - } - } - - return &Command{[]byte("MPUB"), params, buf.Bytes()}, nil -} - -// Subscribe creates a new Command to subscribe to the given topic/channel -func Subscribe(topic string, channel string) *Command { - var params = [][]byte{[]byte(topic), []byte(channel)} - return &Command{[]byte("SUB"), params, nil} -} - -// Ready creates a new Command to specify -// the number of messages a client is willing to receive -func Ready(count int) *Command { - var params = [][]byte{[]byte(strconv.Itoa(count))} - return &Command{[]byte("RDY"), params, nil} -} - -// Finish creates a new Command to indiciate that -// a given message (by id) has been processed successfully -func Finish(id MessageID) *Command { - var params = [][]byte{id[:]} - return &Command{[]byte("FIN"), params, nil} -} - -// Requeue creates a new Command to indicate that -// a given message (by id) should be requeued after the given timeout (in ms) -// NOTE: a timeout of 0 indicates immediate requeue -func Requeue(id MessageID, timeoutMs int) *Command { - var params = [][]byte{id[:], []byte(strconv.Itoa(timeoutMs))} - return &Command{[]byte("REQ"), params, nil} -} - -// Touch creates a new Command to reset the timeout for -// a given message (by id) -func Touch(id MessageID) *Command { - var params = [][]byte{id[:]} - return &Command{[]byte("TOUCH"), params, nil} -} - -// StartClose creates a new Command to indicate that the -// client would like to start a close cycle. nsqd will no longer -// send messages to a client in this state and the client is expected -// finish pending messages and close the connection -func StartClose() *Command { - return &Command{[]byte("CLS"), nil, nil} -} - -// Nop creates a new Command that has no effect server side. -// Commonly used to respond to heartbeats -func Nop() *Command { - return &Command{[]byte("NOP"), nil, nil} -} diff --git a/nsq/command_test.go b/nsq/command_test.go deleted file mode 100644 index b7070db90..000000000 --- a/nsq/command_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package nsq - -import ( - "bytes" - "io/ioutil" - "log" - "os" - "testing" -) - -func BenchmarkCommand(b *testing.B) { - b.StopTimer() - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - data := make([]byte, 2048) - cmd := Publish("test", data) - var buf bytes.Buffer - b.StartTimer() - - for i := 0; i < b.N; i++ { - cmd.Write(&buf) - } -} diff --git a/nsq/message.go b/nsq/message.go deleted file mode 100644 index 72aff4f87..000000000 --- a/nsq/message.go +++ /dev/null @@ -1,105 +0,0 @@ -package nsq - -import ( - "bytes" - "encoding/binary" - "io" - "io/ioutil" - "time" -) - -// The number of bytes for a Message.Id -const MsgIdLength = 16 - -type MessageID [MsgIdLength]byte - -// Message is the fundamental data type containing -// the id, body, and metadata -type Message struct { - Id MessageID - Body []byte - Timestamp int64 - Attempts uint16 -} - -// NewMessage creates a Message, initializes some metadata, -// and returns a pointer -func NewMessage(id MessageID, body []byte) *Message { - return &Message{ - Id: id, - Body: body, - Timestamp: time.Now().UnixNano(), - } -} - -// EncodeBytes serializes the message into a new, returned, []byte -func (m *Message) EncodeBytes() ([]byte, error) { - var buf bytes.Buffer - err := m.Write(&buf) - if err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -// Write serializes the message into the supplied writer. -// -// It is suggested that the target Writer is buffered to avoid performing many system calls. -func (m *Message) Write(w io.Writer) error { - err := binary.Write(w, binary.BigEndian, &m.Timestamp) - if err != nil { - return err - } - - err = binary.Write(w, binary.BigEndian, &m.Attempts) - if err != nil { - return err - } - - _, err = w.Write(m.Id[:]) - if err != nil { - return err - } - - _, err = w.Write(m.Body) - if err != nil { - return err - } - - return nil -} - -// DecodeMessage deseralizes data (as []byte) and creates a new Message -func DecodeMessage(byteBuf []byte) (*Message, error) { - var timestamp int64 - var attempts uint16 - var msg Message - - buf := bytes.NewBuffer(byteBuf) - - err := binary.Read(buf, binary.BigEndian, ×tamp) - if err != nil { - return nil, err - } - - err = binary.Read(buf, binary.BigEndian, &attempts) - if err != nil { - return nil, err - } - - _, err = io.ReadFull(buf, msg.Id[:]) - if err != nil { - return nil, err - } - - body, err := ioutil.ReadAll(buf) - if err != nil { - return nil, err - } - - msg.Body = body - msg.Timestamp = timestamp - msg.Attempts = attempts - - return &msg, nil -} diff --git a/nsq/protocol.go b/nsq/protocol.go deleted file mode 100644 index 3da043c21..000000000 --- a/nsq/protocol.go +++ /dev/null @@ -1,132 +0,0 @@ -package nsq - -import ( - "encoding/binary" - "errors" - "io" - "net" - "regexp" - "time" -) - -var MagicV1 = []byte(" V1") -var MagicV2 = []byte(" V2") - -const ( - // when successful - FrameTypeResponse int32 = 0 - // when an error occurred - FrameTypeError int32 = 1 - // when it's a serialized message - FrameTypeMessage int32 = 2 -) - -// The amount of time nsqd will allow a client to idle, can be overriden -const DefaultClientTimeout = 60 * time.Second - -var validTopicNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+$`) -var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`) - -// IsValidTopicName checks a topic name for correctness -func IsValidTopicName(name string) bool { - if len(name) > 32 || len(name) < 1 { - return false - } - return validTopicNameRegex.MatchString(name) -} - -// IsValidChannelName checks a channel name for correctness -func IsValidChannelName(name string) bool { - if len(name) > 32 || len(name) < 1 { - return false - } - return validChannelNameRegex.MatchString(name) -} - -// Protocol describes the basic behavior of any protocol in the system -type Protocol interface { - IOLoop(conn net.Conn) error -} - -// SendResponse is a server side utility function to prefix data with a length header -// and write to the supplied Writer -func SendResponse(w io.Writer, data []byte) (int, error) { - err := binary.Write(w, binary.BigEndian, int32(len(data))) - if err != nil { - return 0, err - } - - n, err := w.Write(data) - if err != nil { - return 0, err - } - - return (n + 4), nil -} - -// SendFramedResponse is a server side utility function to prefix data with a length header -// and frame header and write to the supplied Writer -func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error) { - beBuf := make([]byte, 4) - size := uint32(len(data)) + 4 - - binary.BigEndian.PutUint32(beBuf, size) - n, err := w.Write(beBuf) - if err != nil { - return n, err - } - - binary.BigEndian.PutUint32(beBuf, uint32(frameType)) - n, err = w.Write(beBuf) - if err != nil { - return n + 4, err - } - - n, err = w.Write(data) - return n + 8, err -} - -// ReadResponse is a client-side utility function to read from the supplied Reader -// according to the NSQ protocol spec: -// -// [x][x][x][x][x][x][x][x]... -// | (int32) || (binary) -// | 4-byte || N-byte -// ------------------------... -// size data -func ReadResponse(r io.Reader) ([]byte, error) { - var msgSize int32 - - // message size - err := binary.Read(r, binary.BigEndian, &msgSize) - if err != nil { - return nil, err - } - - // message binary data - buf := make([]byte, msgSize) - _, err = io.ReadFull(r, buf) - if err != nil { - return nil, err - } - - return buf, nil -} - -// UnpackResponse is a client-side utility function that unpacks serialized data -// according to NSQ protocol spec: -// -// [x][x][x][x][x][x][x][x]... -// | (int32) || (binary) -// | 4-byte || N-byte -// ------------------------... -// frame ID data -// -// Returns a triplicate of: frame type, data ([]byte), error -func UnpackResponse(response []byte) (int32, []byte, error) { - if len(response) < 4 { - return -1, nil, errors.New("length of response is too small") - } - - return int32(binary.BigEndian.Uint32(response)), response[4:], nil -} diff --git a/nsq/reader.go b/nsq/reader.go deleted file mode 100644 index d2c18b4ef..000000000 --- a/nsq/reader.go +++ /dev/null @@ -1,1092 +0,0 @@ -package nsq - -import ( - "bufio" - "bytes" - "crypto/tls" - "encoding/json" - "errors" - "fmt" - "io" - "log" - "math" - "math/rand" - "net" - "net/url" - "os" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" -) - -// returned from ConnectToNSQ() when already connected -var ErrAlreadyConnected = errors.New("already connected") - -// return from updateRdy if over max-in-flight -var ErrOverMaxInFlight = errors.New("over configure max-inflight") - -// Handler is the synchronous interface to Reader. -// -// Implement this interface for handlers that return whether or not message -// processing completed successfully. -// -// When the return value is nil Reader will automatically handle FINishing. -// -// When the returned value is non-nil Reader will automatically handle REQueing. -type Handler interface { - HandleMessage(message *Message) error -} - -// AsyncHandler is the asynchronous interface to Reader. -// -// Implement this interface for handlers that wish to defer responding until later. -// This is particularly useful if you want to batch work together. -// -// An AsyncHandler must send: -// -// &FinishedMessage{messageID, requeueDelay, true|false} -// -// To the supplied responseChannel to indicate that a message is processed. -type AsyncHandler interface { - HandleMessage(message *Message, responseChannel chan *FinishedMessage) -} - -// FinishedMessage is the data type used over responseChannel in AsyncHandlers -type FinishedMessage struct { - Id MessageID - RequeueDelayMs int - Success bool -} - -// FailedMessageLogger is an interface that can be implemented by handlers that wish -// to receive a callback when a message is deemed "failed" (i.e. the number of attempts -// exceeded the Reader specified MaxAttemptCount) -type FailedMessageLogger interface { - LogFailedMessage(message *Message) -} - -type incomingMessage struct { - *Message - responseChannel chan *FinishedMessage -} - -type nsqConn struct { - // 64bit atomic vars need to be first for proper alignment on 32bit platforms - messagesInFlight int64 - messagesReceived uint64 - messagesFinished uint64 - messagesRequeued uint64 - maxRdyCount int64 - rdyCount int64 - lastRdyCount int64 - lastMsgTimestamp int64 - - sync.Mutex - net.Conn - - tlsConn *tls.Conn - r io.Reader - w io.Writer - addr string - stopFlag int32 - finishedMessages chan *FinishedMessage - readTimeout time.Duration - writeTimeout time.Duration - stopper sync.Once - dying chan int - drainReady chan int - readyChan chan int - exitChan chan int - backoffCounter int32 - rdyRetryTimer *time.Timer -} - -func newNSQConn(addr string, readTimeout time.Duration, writeTimeout time.Duration) (*nsqConn, error) { - conn, err := net.DialTimeout("tcp", addr, time.Second) - if err != nil { - return nil, err - } - - nc := &nsqConn{ - Conn: conn, - r: bufio.NewReader(conn), - w: conn, - addr: addr, - finishedMessages: make(chan *FinishedMessage), - readTimeout: readTimeout, - writeTimeout: writeTimeout, - dying: make(chan int, 1), - drainReady: make(chan int), - readyChan: make(chan int, 1), - exitChan: make(chan int), - maxRdyCount: 2500, - lastMsgTimestamp: time.Now().UnixNano(), - } - - _, err = nc.Write(MagicV2) - if err != nil { - nc.Close() - return nil, fmt.Errorf("[%s] failed to write magic - %s", addr, err.Error()) - } - - return nc, nil -} - -func (c *nsqConn) String() string { - return c.addr -} - -func (c *nsqConn) Read(p []byte) (int, error) { - c.SetReadDeadline(time.Now().Add(c.readTimeout)) - return c.r.Read(p) -} - -func (c *nsqConn) Write(p []byte) (int, error) { - c.SetWriteDeadline(time.Now().Add(c.writeTimeout)) - return c.w.Write(p) -} - -func (c *nsqConn) sendCommand(buf *bytes.Buffer, cmd *Command) error { - c.Lock() - defer c.Unlock() - - buf.Reset() - err := cmd.Write(buf) - if err != nil { - return err - } - - _, err = buf.WriteTo(c) - if err != nil { - return err - } - - return nil -} - -func (c *nsqConn) readUnpackedResponse() (int32, []byte, error) { - resp, err := ReadResponse(c) - if err != nil { - return -1, nil, err - } - return UnpackResponse(resp) -} - -func (c *nsqConn) upgradeTLS(conf *tls.Config) error { - c.tlsConn = tls.Client(c.Conn, conf) - err := c.tlsConn.Handshake() - if err != nil { - return err - } - c.r = bufio.NewReader(c.tlsConn) - c.w = c.tlsConn - frameType, data, err := c.readUnpackedResponse() - if err != nil { - return err - } - if frameType != FrameTypeResponse || !bytes.Equal(data, []byte("OK")) { - return errors.New("invalid response from TLS upgrade") - } - return nil -} - -func (c *nsqConn) tryUpdateRDY() { - select { - case c.readyChan <- 1: - default: - } -} - -// Reader is a high-level type to consume from NSQ. -// -// A Reader instance is supplied handler(s) that will be executed -// concurrently via goroutines to handle processing the stream of messages -// consumed from the specified topic/channel. See: AsyncHandler and Handler -// for details on implementing those interfaces to create handlers. -// -// If configured, it will poll nsqlookupd instances and handle connection (and -// reconnection) to any discovered nsqds. -type Reader struct { - // 64bit atomic vars need to be first for proper alignment on 32bit platforms - MessagesReceived uint64 // an atomic counter - # of messages received - MessagesFinished uint64 // an atomic counter - # of messages FINished - MessagesRequeued uint64 // an atomic counter - # of messages REQueued - totalRdyCount int64 - messagesInFlight int64 - - sync.RWMutex - - TopicName string // name of topic to subscribe to - ChannelName string // name of channel to subscribe to - LookupdPollInterval time.Duration // duration between polling lookupd for new connections - LookupdPollJitter float64 // Maximum fractional amount of jitter to add to the lookupd pool loop. This helps evenly distribute requests even if multiple consumers restart at the same time. - MaxAttemptCount uint16 // maximum number of times this reader will attempt to process a message - DefaultRequeueDelay time.Duration // the default duration when REQueueing - MaxRequeueDelay time.Duration // the maximum duration when REQueueing (for doubling backoff) - LowRdyIdleTimeout time.Duration // the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (ie. max_in_flight < num_producers) - VerboseLogging bool // enable verbose logging - ShortIdentifier string // an identifier to send to nsqd when connecting (defaults: short hostname) - LongIdentifier string // an identifier to send to nsqd when connecting (defaults: long hostname) - ReadTimeout time.Duration // the deadline set for network reads - WriteTimeout time.Duration // the deadline set for network writes - ExitChan chan int // read from this channel to block your main loop - TLSv1 bool // negotiate enabling TLS - TLSConfig *tls.Config // client TLS configuration - - // internal variables - redistributeOnce sync.Once - maxBackoffDuration time.Duration - maxBackoffCount int32 - maxInFlight int - incomingMessages chan *incomingMessage - pendingConnections map[string]bool - nsqConnections map[string]*nsqConn - lookupdExitChan chan int - lookupdRecheckChan chan int - stopFlag int32 - runningHandlers int32 - lookupdHTTPAddrs []string - stopHandler sync.Once - lookupdQueryIndex int -} - -// NewReader creates a new instance of Reader for the specified topic/channel -// -// The returned Reader instance is setup with sane default values. To modify -// configuration, update the values on the returned instance before connecting. -func NewReader(topic string, channel string) (*Reader, error) { - if !IsValidTopicName(topic) { - return nil, errors.New("invalid topic name") - } - - if !IsValidChannelName(channel) { - return nil, errors.New("invalid channel name") - } - - hostname, err := os.Hostname() - if err != nil { - log.Fatalf("ERROR: unable to get hostname %s", err.Error()) - } - q := &Reader{ - TopicName: topic, - ChannelName: channel, - incomingMessages: make(chan *incomingMessage), - ExitChan: make(chan int), - pendingConnections: make(map[string]bool), - nsqConnections: make(map[string]*nsqConn), - MaxAttemptCount: 5, - LookupdPollInterval: 60 * time.Second, - LookupdPollJitter: 0.3, - LowRdyIdleTimeout: 10 * time.Second, - lookupdExitChan: make(chan int), - lookupdRecheckChan: make(chan int, 1), // used at connection close to force a possible reconnect - DefaultRequeueDelay: 90 * time.Second, - MaxRequeueDelay: 15 * time.Minute, - ShortIdentifier: strings.Split(hostname, ".")[0], - LongIdentifier: hostname, - ReadTimeout: DefaultClientTimeout, - WriteTimeout: time.Second, - maxInFlight: 1, - } - q.SetMaxBackoffDuration(120 * time.Second) - return q, nil -} - -// ConnectionMaxInFlight calculates the per-connection max-in-flight count. -// -// This may change dynamically based on the number of connections to nsqd the Reader -// is responsible for. -func (q *Reader) ConnectionMaxInFlight() int64 { - q.RLock() - defer q.RUnlock() - - b := float64(q.maxInFlight) - s := b / float64(len(q.nsqConnections)) - return int64(math.Min(math.Max(1, s), b)) -} - -// IsStarved indicates whether any connections for this reader are blocked on processing -// before being able to receive more messages (ie. RDY count of 0 and not exiting) -func (q *Reader) IsStarved() bool { - q.RLock() - defer q.RUnlock() - - for _, conn := range q.nsqConnections { - threshold := int64(float64(atomic.LoadInt64(&conn.lastRdyCount)) * 0.85) - inFlight := atomic.LoadInt64(&conn.messagesInFlight) - if inFlight >= threshold && inFlight > 0 && atomic.LoadInt32(&conn.stopFlag) != 1 { - return true - } - } - return false -} - -// SetMaxInFlight sets the maximum number of messages this reader instance -// will allow in-flight. -// -// If already connected, it updates the reader RDY state for each connection. -func (q *Reader) SetMaxInFlight(maxInFlight int) { - if atomic.LoadInt32(&q.stopFlag) == 1 { - return - } - - if q.maxInFlight == maxInFlight { - return - } - q.maxInFlight = maxInFlight - - q.RLock() - defer q.RUnlock() - - for _, c := range q.nsqConnections { - c.tryUpdateRDY() - } -} - -// SetMaxBackoffDuration sets the maximum duration a connection will backoff from message processing -func (q *Reader) SetMaxBackoffDuration(duration time.Duration) { - q.maxBackoffDuration = duration - q.maxBackoffCount = int32(math.Max(1, math.Ceil(math.Log2(duration.Seconds())))) -} - -// MaxInFlight returns the configured maximum number of messages to allow in-flight. -func (q *Reader) MaxInFlight() int { - return q.maxInFlight -} - -// ConnectToLookupd adds a nsqlookupd address to the list for this Reader instance. -// -// If it is the first to be added, it initiates an HTTP request to discover nsqd -// producers for the configured topic. -// -// A goroutine is spawned to handle continual polling. -func (q *Reader) ConnectToLookupd(addr string) error { - // make a HTTP req to the lookupd, and ask it for endpoints that have the - // topic we are interested in. - // this is a go loop that fires every x seconds - for _, x := range q.lookupdHTTPAddrs { - if x == addr { - return errors.New("lookupd address already exists") - } - } - q.lookupdHTTPAddrs = append(q.lookupdHTTPAddrs, addr) - - // if this is the first one, kick off the go loop - if len(q.lookupdHTTPAddrs) == 1 { - q.queryLookupd() - go q.lookupdLoop() - } - - return nil -} - -// poll all known lookup servers every LookupdPollInterval -func (q *Reader) lookupdLoop() { - // add some jitter so that multiple consumers discovering the same topic, - // when restarted at the same time, dont all connect at once. - rand.Seed(time.Now().UnixNano()) - - jitter := time.Duration(int64(rand.Float64() * q.LookupdPollJitter * float64(q.LookupdPollInterval))) - ticker := time.NewTicker(q.LookupdPollInterval) - - select { - case <-time.After(jitter): - case <-q.lookupdExitChan: - goto exit - } - - for { - select { - case <-ticker.C: - q.queryLookupd() - case <-q.lookupdRecheckChan: - q.queryLookupd() - case <-q.lookupdExitChan: - goto exit - } - } - -exit: - ticker.Stop() - log.Printf("exiting lookupdLoop") -} - -// make a HTTP req to the /lookup endpoint on one lookup server -// to find what nsq's provide the topic we are consuming. -// for any new topics, initiate a connection to those NSQ's -func (q *Reader) queryLookupd() { - i := q.lookupdQueryIndex - - if i >= len(q.lookupdHTTPAddrs) { - i = 0 - } - q.lookupdQueryIndex += 1 - - addr := q.lookupdHTTPAddrs[i] - endpoint := fmt.Sprintf("http://%s/lookup?topic=%s", addr, url.QueryEscape(q.TopicName)) - - log.Printf("LOOKUPD: querying %s", endpoint) - - data, err := ApiRequest(endpoint) - if err != nil { - log.Printf("ERROR: lookupd %s - %s", addr, err.Error()) - return - } - - // {"data":{"channels":[],"producers":[{"address":"jehiah-air.local", "tpc_port":4150, "http_port":4151}],"timestamp":1340152173},"status_code":200,"status_txt":"OK"} - producers, _ := data.Get("producers").Array() - for _, producer := range producers { - producerData, _ := producer.(map[string]interface{}) - address := producerData["address"].(string) - broadcastAddress, ok := producerData["broadcast_address"] - if ok { - address = broadcastAddress.(string) - } - port := int(producerData["tcp_port"].(float64)) - - // make an address, start a connection - joined := net.JoinHostPort(address, strconv.Itoa(port)) - err = q.ConnectToNSQ(joined) - if err != nil && err != ErrAlreadyConnected { - log.Printf("ERROR: failed to connect to nsqd (%s) - %s", joined, err.Error()) - continue - } - } -} - -// ConnectToNSQ takes a nsqd address to connect directly to. -// -// It is recommended to use ConnectToLookupd so that topics are discovered -// automatically. This method is useful when you want to connect to a single, local, -// instance. -func (q *Reader) ConnectToNSQ(addr string) error { - var buf bytes.Buffer - - if atomic.LoadInt32(&q.stopFlag) == 1 { - return errors.New("reader stopped") - } - - if atomic.LoadInt32(&q.runningHandlers) == 0 { - return errors.New("no handlers") - } - - q.RLock() - _, ok := q.nsqConnections[addr] - _, pendingOk := q.pendingConnections[addr] - if ok || pendingOk { - q.RUnlock() - return ErrAlreadyConnected - } - q.RUnlock() - - q.redistributeOnce.Do(func() { go q.redistributeRdyState() }) - - log.Printf("[%s] connecting to nsqd", addr) - - connection, err := newNSQConn(addr, q.ReadTimeout, q.WriteTimeout) - if err != nil { - return err - } - cleanupConnection := func() { - q.Lock() - delete(q.pendingConnections, addr) - q.Unlock() - connection.Close() - } - q.pendingConnections[addr] = true - - ci := make(map[string]interface{}) - ci["short_id"] = q.ShortIdentifier - ci["long_id"] = q.LongIdentifier - ci["tls_v1"] = q.TLSv1 - ci["feature_negotiation"] = true - cmd, err := Identify(ci) - if err != nil { - cleanupConnection() - return fmt.Errorf("[%s] failed to create identify command - %s", connection, err.Error()) - } - - err = connection.sendCommand(&buf, cmd) - if err != nil { - cleanupConnection() - return fmt.Errorf("[%s] failed to identify - %s", connection, err.Error()) - } - - _, data, err := connection.readUnpackedResponse() - if err != nil { - cleanupConnection() - return fmt.Errorf("[%s] error reading response %s", connection, err.Error()) - } - - // check to see if the server was able to respond w/ capabilities - if data[0] == '{' { - resp := struct { - MaxRdyCount int64 `json:"max_rdy_count"` - TLSv1 bool `json:"tls_v1"` - }{} - err := json.Unmarshal(data, &resp) - if err != nil { - cleanupConnection() - return fmt.Errorf("[%s] error (%s) unmarshaling IDENTIFY response %s", connection, err.Error(), data) - } - - log.Printf("[%s] IDENTIFY response: %+v", connection, resp) - - connection.maxRdyCount = resp.MaxRdyCount - if resp.MaxRdyCount < int64(q.maxInFlight) { - log.Printf("[%s] max RDY count %d < reader max in flight %d, truncation possible", - connection, resp.MaxRdyCount, q.maxInFlight) - } - - if resp.TLSv1 { - log.Printf("[%s] upgrading to TLS", connection) - err := connection.upgradeTLS(q.TLSConfig) - if err != nil { - cleanupConnection() - return fmt.Errorf("[%s] error (%s) upgrading to TLS", connection, err.Error()) - } - } - } - - cmd = Subscribe(q.TopicName, q.ChannelName) - err = connection.sendCommand(&buf, cmd) - if err != nil { - cleanupConnection() - return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s", connection, q.TopicName, q.ChannelName, err.Error()) - } - - q.Lock() - delete(q.pendingConnections, addr) - q.nsqConnections[connection.String()] = connection - // signal to existing connections to lower their RDY count - for _, c := range q.nsqConnections { - c.tryUpdateRDY() - } - q.Unlock() - - go q.readLoop(connection) - go q.finishLoop(connection) - go q.rdyLoop(connection) - - // prime RDY state - connection.tryUpdateRDY() - - return nil -} - -func handleError(q *Reader, c *nsqConn, errMsg string) { - log.Printf(errMsg) - atomic.StoreInt32(&c.stopFlag, 1) - - if len(q.lookupdHTTPAddrs) == 0 { - go func(addr string) { - for { - log.Printf("[%s] re-connecting in 15 seconds...", addr) - time.Sleep(15 * time.Second) - if atomic.LoadInt32(&q.stopFlag) == 1 { - break - } - err := q.ConnectToNSQ(addr) - if err != nil { - log.Printf("ERROR: failed to connect to %s - %s", addr, err.Error()) - continue - } - break - } - }(c.RemoteAddr().String()) - } -} - -func (q *Reader) readLoop(c *nsqConn) { - for { - if atomic.LoadInt32(&c.stopFlag) == 1 || atomic.LoadInt32(&q.stopFlag) == 1 { - // start the connection close - if atomic.LoadInt64(&c.messagesInFlight) == 0 { - q.stopFinishLoop(c) - } else { - log.Printf("[%s] delaying close of FinishedMesages channel; %d outstanding messages", c, c.messagesInFlight) - } - goto exit - } - - frameType, data, err := c.readUnpackedResponse() - if err != nil { - handleError(q, c, fmt.Sprintf("[%s] error (%s) reading response %d %s", c, err.Error(), frameType, data)) - continue - } - - switch frameType { - case FrameTypeMessage: - msg, err := DecodeMessage(data) - if err != nil { - handleError(q, c, fmt.Sprintf("[%s] error (%s) decoding message %s", c, err.Error(), data)) - continue - } - - remain := atomic.AddInt64(&c.rdyCount, -1) - atomic.AddInt64(&q.totalRdyCount, -1) - atomic.AddUint64(&c.messagesReceived, 1) - atomic.AddUint64(&q.MessagesReceived, 1) - atomic.AddInt64(&c.messagesInFlight, 1) - atomic.AddInt64(&q.messagesInFlight, 1) - atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano()) - - if q.VerboseLogging { - log.Printf("[%s] (remain %d) FrameTypeMessage: %s - %s", c, remain, msg.Id, msg.Body) - } - - q.incomingMessages <- &incomingMessage{msg, c.finishedMessages} - - c.tryUpdateRDY() - case FrameTypeResponse: - switch { - case bytes.Equal(data, []byte("CLOSE_WAIT")): - // server is ready for us to close (it ack'd our StartClose) - // we can assume we will not receive any more messages over this channel - // (but we can still write back responses) - log.Printf("[%s] received ACK from nsqd - now in CLOSE_WAIT", c) - atomic.StoreInt32(&c.stopFlag, 1) - case bytes.Equal(data, []byte("_heartbeat_")): - var buf bytes.Buffer - log.Printf("[%s] heartbeat received", c) - err := c.sendCommand(&buf, Nop()) - if err != nil { - handleError(q, c, fmt.Sprintf("[%s] error sending NOP - %s", c, err.Error())) - goto exit - } - } - case FrameTypeError: - log.Printf("[%s] error from nsqd %s", c, data) - default: - log.Printf("[%s] unknown message type %d", c, frameType) - } - } - -exit: - log.Printf("[%s] readLoop exiting", c) -} - -func (q *Reader) finishLoop(c *nsqConn) { - var buf bytes.Buffer - var backoffCounter int32 - var backoffUpdated bool - var backoffDeadline time.Time - - for { - select { - case <-c.dying: - log.Printf("[%s] breaking out of finish loop ", c) - // Indicate drainReady because we will not pull any more off finishedMessages - c.drainReady <- 1 - goto exit - case msg := <-c.finishedMessages: - // Decrement this here so it is correct even if we can't respond to nsqd - atomic.AddInt64(&q.messagesInFlight, -1) - atomic.AddInt64(&c.messagesInFlight, -1) - now := time.Now() - - if msg.Success { - if q.VerboseLogging { - log.Printf("[%s] finishing %s", c, msg.Id) - } - - err := c.sendCommand(&buf, Finish(msg.Id)) - if err != nil { - log.Printf("[%s] error finishing %s - %s", c, msg.Id, err.Error()) - q.stopFinishLoop(c) - continue - } - - atomic.AddUint64(&c.messagesFinished, 1) - atomic.AddUint64(&q.MessagesFinished, 1) - - if backoffCounter > 0 && now.After(backoffDeadline) { - backoffCounter-- - backoffUpdated = true - } - } else { - if q.VerboseLogging { - log.Printf("[%s] requeuing %s", c, msg.Id) - } - - err := c.sendCommand(&buf, Requeue(msg.Id, msg.RequeueDelayMs)) - if err != nil { - log.Printf("[%s] error requeueing %s - %s", c, msg.Id, err.Error()) - q.stopFinishLoop(c) - continue - } - - atomic.AddUint64(&c.messagesRequeued, 1) - atomic.AddUint64(&q.MessagesRequeued, 1) - - if backoffCounter < q.maxBackoffCount && now.After(backoffDeadline) { - backoffCounter++ - backoffUpdated = true - } - } - - atomic.StoreInt32(&c.backoffCounter, backoffCounter) - // prevent many async failures/successes from immediately resulting in - // max backoff/normal rate (by ensuring that we dont continually incr/decr - // the counter during a backoff period) - if backoffCounter > 0 && backoffUpdated { - backoffDuration := q.backoffDuration(backoffCounter) - backoffDeadline = now.Add(backoffDuration) - - // going into backoff. set all RDY to 0 immediately - q.RLock() - for _, c := range q.nsqConnections { - if q.VerboseLogging { - log.Printf("[%s] in backoff. sending RDY 0", c) - } - q.updateRDY(c, 0) - - // prime the rdy loop so it starts waiting on the backoff timer (if not already doing so) - c.tryUpdateRDY() - } - q.RUnlock() - } - - if atomic.LoadInt64(&c.messagesInFlight) == 0 && - (atomic.LoadInt32(&c.stopFlag) == 1 || atomic.LoadInt32(&q.stopFlag) == 1) { - q.stopFinishLoop(c) - continue - } - } - } - -exit: - log.Printf("[%s] finishLoop exiting", c) -} - -func (q *Reader) stopFinishLoop(c *nsqConn) { - c.stopper.Do(func() { - log.Printf("[%s] beginning stopFinishLoop logic", c) - // This doesn't block because dying has buffer of 1 - c.dying <- 1 - - // Drain the finishedMessages channel - go func() { - <-c.drainReady - for atomic.AddInt64(&c.messagesInFlight, -1) >= 0 { - <-c.finishedMessages - } - }() - close(c.exitChan) - c.Close() - - atomic.AddInt64(&q.totalRdyCount, atomic.LoadInt64(&c.rdyCount)*-1) - - q.Lock() - delete(q.nsqConnections, c.String()) - left := len(q.nsqConnections) - q.Unlock() - - log.Printf("there are %d connections left alive", left) - - // ie: we were the last one, and stopping - if left == 0 && atomic.LoadInt32(&q.stopFlag) == 1 { - q.stopHandlers() - } - - if len(q.lookupdHTTPAddrs) != 0 && atomic.LoadInt32(&q.stopFlag) == 0 { - // trigger a poll of the lookupd - select { - case q.lookupdRecheckChan <- 1: - default: - } - } - }) -} - -func (q *Reader) backoffDuration(count int32) time.Duration { - backoffDuration := time.Second * time.Duration(math.Pow(2, float64(count))) - if backoffDuration > q.maxBackoffDuration { - backoffDuration = q.maxBackoffDuration - } - return backoffDuration -} - -func (q *Reader) rdyLoop(c *nsqConn) { - readyChan := c.readyChan - var backoffTimer *time.Timer - var backoffTimerChan <-chan time.Time - - for { - select { - case <-backoffTimerChan: - log.Printf("[%s] backoff time expired, continuing with RDY 1...", c) - // while in backoff only ever let 1 message at a time through - q.updateRDY(c, 1) - readyChan = c.readyChan - backoffTimer = nil - case <-readyChan: - backoffCounter := atomic.LoadInt32(&c.backoffCounter) - - if backoffCounter != 0 && q.maxBackoffDuration != 0 { - if backoffTimer != nil { - // dont overwrite an existing backoff timer - continue - } - backoffDuration := q.backoffDuration(backoffCounter) - backoffTimer = time.NewTimer(backoffDuration) - backoffTimerChan = backoffTimer.C - readyChan = nil - log.Printf("[%s] backing off for %.02f seconds (backoff level %d)", c, backoffDuration.Seconds(), backoffCounter) - continue - } - - // send ready immediately - remain := atomic.LoadInt64(&c.rdyCount) - lastRdyCount := atomic.LoadInt64(&c.lastRdyCount) - count := q.ConnectionMaxInFlight() - // refill when at 1, or at 25%, or if connections have changed and we have too many RDY - if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) { - if q.VerboseLogging { - log.Printf("[%s] sending RDY %d (%d remain from last RDY %d)", c, count, remain, lastRdyCount) - } - q.updateRDY(c, count) - } else { - if q.VerboseLogging { - log.Printf("[%s] skip sending RDY %d (%d remain out of last RDY %d)", c, count, remain, lastRdyCount) - } - } - case <-c.exitChan: - if backoffTimer != nil { - backoffTimer.Stop() - } - goto exit - } - } - -exit: - log.Printf("[%s] rdyLoop exiting", c) -} - -func (q *Reader) updateRDY(c *nsqConn, count int64) error { - if atomic.LoadInt32(&c.stopFlag) != 0 { - return nil - } - - // never exceed the nsqd's configured max RDY count - if count > c.maxRdyCount { - count = c.maxRdyCount - } - - // stop any pending retry of an old RDY update - if c.rdyRetryTimer != nil { - c.rdyRetryTimer.Stop() - c.rdyRetryTimer = nil - } - - // never exceed our global max in flight. truncate if possible. - // this could help a new connection get partial max-in-flight - maxPossibleRdy := int64(q.maxInFlight) - atomic.LoadInt64(&q.totalRdyCount) + atomic.LoadInt64(&c.rdyCount) - if maxPossibleRdy > 0 && maxPossibleRdy < count { - count = maxPossibleRdy - } - if maxPossibleRdy <= 0 && count > 0 { - if atomic.LoadInt64(&c.rdyCount) == 0 { - // we wanted a to exit a zero RDY count but we couldn't send it... - // in order to prevent eternal starvation we reschedule this attempt - // (if any other RDY update succeeds this timer will be stopped) - c.rdyRetryTimer = time.AfterFunc(5*time.Second, func() { - q.updateRDY(c, count) - }) - } - return ErrOverMaxInFlight - } - - return q.sendRDY(c, count) -} - -func (q *Reader) sendRDY(c *nsqConn, count int64) error { - var buf bytes.Buffer - - if count == 0 && atomic.LoadInt64(&c.lastRdyCount) == 0 { - // no need to send. It's already that RDY count - return nil - } - - atomic.AddInt64(&q.totalRdyCount, atomic.LoadInt64(&c.rdyCount)*-1+count) - atomic.StoreInt64(&c.rdyCount, count) - atomic.StoreInt64(&c.lastRdyCount, count) - err := c.sendCommand(&buf, Ready(int(count))) - if err != nil { - handleError(q, c, fmt.Sprintf("[%s] error sending RDY %d - %s", c, count, err.Error())) - return err - } - return nil -} - -func (q *Reader) redistributeRdyState() { - for { - time.Sleep(5 * time.Second) - - q.RLock() - l := len(q.nsqConnections) - q.RUnlock() - if l <= q.maxInFlight { - continue - } - - log.Printf("redistributing ready state (%d conns > %d max_in_flight)", l, q.maxInFlight) - q.RLock() - possibleConns := make([]*nsqConn, 0, len(q.nsqConnections)) - for _, c := range q.nsqConnections { - lastMsgTimestamp := atomic.LoadInt64(&c.lastMsgTimestamp) - lastMsgDuration := time.Now().Sub(time.Unix(0, lastMsgTimestamp)) - rdyCount := atomic.LoadInt64(&c.rdyCount) - if q.VerboseLogging { - log.Printf("[%s] rdy: %d (last message received %s)", c, rdyCount, lastMsgDuration) - } - if rdyCount > 0 && lastMsgDuration > q.LowRdyIdleTimeout { - log.Printf("[%s] idle connection, giving up RDY count", c) - q.updateRDY(c, 0) - } - possibleConns = append(possibleConns, c) - } - availableMaxInFlight := int64(q.maxInFlight) - atomic.LoadInt64(&q.totalRdyCount) - for len(possibleConns) > 0 && availableMaxInFlight > 0 { - availableMaxInFlight-- - i := rand.Int() % len(possibleConns) - c := possibleConns[i] - // delete - possibleConns = append(possibleConns[:i], possibleConns[i+1:]...) - log.Printf("[%s] redistributing RDY", c) - q.updateRDY(c, 1) - } - q.RUnlock() - } -} - -// Stop will gracefully stop the Reader -func (q *Reader) Stop() { - var buf bytes.Buffer - - if !atomic.CompareAndSwapInt32(&q.stopFlag, 0, 1) { - return - } - - log.Printf("stopping reader") - - q.RLock() - l := len(q.nsqConnections) - q.RUnlock() - - if l == 0 { - q.stopHandlers() - } else { - q.RLock() - for _, c := range q.nsqConnections { - err := c.sendCommand(&buf, StartClose()) - if err != nil { - log.Printf("[%s] failed to start close - %s", c, err.Error()) - } - } - q.RUnlock() - - go func() { - <-time.After(time.Duration(30) * time.Second) - q.stopHandlers() - }() - } - - if len(q.lookupdHTTPAddrs) != 0 { - q.lookupdExitChan <- 1 - } -} - -func (q *Reader) stopHandlers() { - q.stopHandler.Do(func() { - log.Printf("closing incomingMessages") - close(q.incomingMessages) - }) -} - -// AddHandler adds a Handler for messages received by this Reader. -// -// See Handler for details on implementing this interface. -// -// It's ok to start more than one handler simultaneously, they -// are concurrently executed in goroutines. -func (q *Reader) AddHandler(handler Handler) { - atomic.AddInt32(&q.runningHandlers, 1) - log.Println("starting Handler go-routine") - go func() { - for { - message, ok := <-q.incomingMessages - if !ok { - log.Printf("closing Handler (after self.incomingMessages closed)") - if atomic.AddInt32(&q.runningHandlers, -1) == 0 { - q.ExitChan <- 1 - } - break - } - - err := handler.HandleMessage(message.Message) - if err != nil { - log.Printf("ERROR: handler returned %s for msg %s %s", err.Error(), message.Id, message.Body) - } - - // message passed the max number of attempts - if err != nil && q.MaxAttemptCount > 0 && message.Attempts > q.MaxAttemptCount { - log.Printf("WARNING: msg attempted %d times. giving up %s %s", message.Attempts, message.Id, message.Body) - logger, ok := handler.(FailedMessageLogger) - if ok { - logger.LogFailedMessage(message.Message) - } - message.responseChannel <- &FinishedMessage{message.Id, 0, true} - continue - } - - // linear delay - requeueDelay := q.DefaultRequeueDelay * time.Duration(message.Attempts) - // bound the requeueDelay to configured max - if requeueDelay > q.MaxRequeueDelay { - requeueDelay = q.MaxRequeueDelay - } - - message.responseChannel <- &FinishedMessage{message.Id, int(requeueDelay / time.Millisecond), err == nil} - } - }() -} - -// AddAsyncHandler adds an AsyncHandler for messages received by this Reader. -// -// See AsyncHandler for details on implementing this interface. -// -// It's ok to start more than one handler simultaneously, they -// are concurrently executed in goroutines. -func (q *Reader) AddAsyncHandler(handler AsyncHandler) { - atomic.AddInt32(&q.runningHandlers, 1) - log.Println("starting AsyncHandler go-routine") - go func() { - for { - message, ok := <-q.incomingMessages - if !ok { - log.Printf("closing AsyncHandler (after self.incomingMessages closed)") - if atomic.AddInt32(&q.runningHandlers, -1) == 0 { - q.ExitChan <- 1 - } - break - } - - // message passed the max number of attempts - // note: unfortunately it's not straight forward to do this after passing to async handler, so we don't. - if q.MaxAttemptCount > 0 && message.Attempts > q.MaxAttemptCount { - log.Printf("WARNING: msg attempted %d times. giving up %s %s", message.Attempts, message.Id, message.Body) - logger, ok := handler.(FailedMessageLogger) - if ok { - logger.LogFailedMessage(message.Message) - } - message.responseChannel <- &FinishedMessage{message.Id, 0, true} - continue - } - - handler.HandleMessage(message.Message, message.responseChannel) - } - }() -} diff --git a/nsq/reader_test.go b/nsq/reader_test.go deleted file mode 100644 index b98d5ebdd..000000000 --- a/nsq/reader_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package nsq - -import ( - "bytes" - "crypto/tls" - "errors" - "fmt" - "github.com/bitly/go-simplejson" - "io/ioutil" - "log" - "net/http" - "os" - "strconv" - "testing" - "time" -) - -type MyTestHandler struct { - t *testing.T - q *Reader - messagesSent int - messagesReceived int - messagesFailed int -} - -func (h *MyTestHandler) LogFailedMessage(message *Message) { - h.messagesFailed++ - h.q.Stop() -} - -func (h *MyTestHandler) HandleMessage(message *Message) error { - if string(message.Body) == "TOBEFAILED" { - h.messagesReceived++ - return errors.New("fail this message") - } - - data, err := simplejson.NewJson(message.Body) - if err != nil { - return err - } - - msg, _ := data.Get("msg").String() - if msg != "single" && msg != "double" { - h.t.Error("message 'action' was not correct: ", msg, data) - } - h.messagesReceived++ - return nil -} - -func SendMessage(t *testing.T, port int, topic string, method string, body []byte) { - httpclient := &http.Client{} - endpoint := fmt.Sprintf("http://127.0.0.1:%d/%s?topic=%s", port, method, topic) - req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(body)) - resp, err := httpclient.Do(req) - if err != nil { - t.Fatalf(err.Error()) - return - } - resp.Body.Close() -} - -func TestReader(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - readerTest(t, false) -} - -func TestReaderTLS(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - readerTest(t, true) -} - -func readerTest(t *testing.T, tlsv1 bool) { - topicName := "reader_test" - if tlsv1 { - topicName = topicName + "_tls" - } - topicName = topicName + strconv.Itoa(int(time.Now().Unix())) - - q, _ := NewReader(topicName, "ch") - q.VerboseLogging = true - // so that the test can simulate reaching max requeues and a call to LogFailedMessage - q.DefaultRequeueDelay = 0 - // so that the test wont timeout from backing off - q.SetMaxBackoffDuration(time.Millisecond * 50) - - if tlsv1 { - q.TLSv1 = true - q.TLSConfig = &tls.Config{ - InsecureSkipVerify: true, - } - } - - h := &MyTestHandler{ - t: t, - q: q, - } - q.AddHandler(h) - - SendMessage(t, 4151, topicName, "put", []byte(`{"msg":"single"}`)) - SendMessage(t, 4151, topicName, "mput", []byte("{\"msg\":\"double\"}\n{\"msg\":\"double\"}")) - SendMessage(t, 4151, topicName, "put", []byte("TOBEFAILED")) - h.messagesSent = 4 - - addr := "127.0.0.1:4150" - err := q.ConnectToNSQ(addr) - if err != nil { - t.Fatalf(err.Error()) - } - - err = q.ConnectToNSQ(addr) - if err == nil { - t.Fatalf("should not be able to connect to the same NSQ twice") - } - - <-q.ExitChan - - if h.messagesReceived != 9 || h.messagesSent != 4 { - t.Fatalf("end of test. should have handled a diff number of messages (got %d, sent %d)", h.messagesReceived, h.messagesSent) - } - if h.messagesFailed != 1 { - t.Fatal("failed message not done") - } -} diff --git a/nsq/states.go b/nsq/states.go deleted file mode 100644 index b906da631..000000000 --- a/nsq/states.go +++ /dev/null @@ -1,10 +0,0 @@ -package nsq - -const ( - StateInit = iota - StateDisconnected - StateConnected - StateSubscribed - // close has started. responses are ok, but no new messages will be sent - StateClosing -) diff --git a/nsq/version.go b/nsq/version.go deleted file mode 100644 index 0f577ec17..000000000 --- a/nsq/version.go +++ /dev/null @@ -1,9 +0,0 @@ -// nsq is the official Go package for https://github.com/bitly/nsq -// -// It provides the building blocks for developing applications on the NSQ platform in Go. -// -// Low-level functions and types are provided to communicate over the NSQ protocol as well -// as a high-level Reader library to implement robust consumers. -package nsq - -const VERSION = "0.3.2" diff --git a/nsq/writer.go b/nsq/writer.go deleted file mode 100644 index 9a65d34b4..000000000 --- a/nsq/writer.go +++ /dev/null @@ -1,302 +0,0 @@ -package nsq - -import ( - "bufio" - "bytes" - "errors" - "log" - "net" - "os" - "strings" - "sync" - "sync/atomic" - "time" -) - -type Writer struct { - net.Conn - - WriteTimeout time.Duration - Addr string - HeartbeatInterval time.Duration - ShortIdentifier string - LongIdentifier string - - transactionChan chan *WriterTransaction - dataChan chan []byte - transactions []*WriterTransaction - state int32 - stopFlag int32 - exitChan chan int - closeChan chan int - wg sync.WaitGroup -} - -type WriterTransaction struct { - cmd *Command - doneChan chan *WriterTransaction - FrameType int32 - Data []byte - Error error - Args []interface{} -} - -var ErrNotConnected = errors.New("not connected") -var ErrStopped = errors.New("stopped") - -func NewWriter(addr string) *Writer { - hostname, err := os.Hostname() - if err != nil { - log.Fatalf("ERROR: unable to get hostname %s", err.Error()) - } - return &Writer{ - transactionChan: make(chan *WriterTransaction), - exitChan: make(chan int), - closeChan: make(chan int), - dataChan: make(chan []byte), - - // can be overriden before connecting - Addr: addr, - WriteTimeout: time.Second, - HeartbeatInterval: DefaultClientTimeout / 2, - ShortIdentifier: strings.Split(hostname, ".")[0], - LongIdentifier: hostname, - } -} - -func (w *Writer) String() string { - return w.Addr -} - -func (w *Writer) Stop() { - if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) { - return - } - w.close() - w.wg.Wait() -} - -func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, args ...interface{}) error { - return w.sendCommandAsync(Publish(topic, body), doneChan, args) -} - -func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, args ...interface{}) error { - cmd, err := MultiPublish(topic, body) - if err != nil { - return err - } - return w.sendCommandAsync(cmd, doneChan, args) -} - -func (w *Writer) Publish(topic string, body []byte) (int32, []byte, error) { - return w.sendCommand(Publish(topic, body)) -} - -func (w *Writer) MultiPublish(topic string, body [][]byte) (int32, []byte, error) { - cmd, err := MultiPublish(topic, body) - if err != nil { - return -1, nil, err - } - return w.sendCommand(cmd) -} - -func (w *Writer) sendCommand(cmd *Command) (int32, []byte, error) { - doneChan := make(chan *WriterTransaction) - err := w.sendCommandAsync(cmd, doneChan, nil) - if err != nil { - close(doneChan) - return -1, nil, err - } - t := <-doneChan - return t.FrameType, t.Data, t.Error -} - -func (w *Writer) sendCommandAsync(cmd *Command, doneChan chan *WriterTransaction, args []interface{}) error { - if atomic.LoadInt32(&w.state) != StateConnected { - err := w.connect() - if err != nil { - return err - } - } - t := &WriterTransaction{ - cmd: cmd, - doneChan: doneChan, - FrameType: -1, - Args: args, - } - select { - case w.transactionChan <- t: - case <-w.exitChan: - return ErrStopped - } - return nil -} - -func (w *Writer) connect() error { - if atomic.LoadInt32(&w.stopFlag) == 1 { - return ErrStopped - } - - if !atomic.CompareAndSwapInt32(&w.state, StateInit, StateConnected) { - return nil - } - - conn, err := net.DialTimeout("tcp", w.Addr, time.Second*5) - if err != nil { - log.Printf("ERROR: [%s] failed to dial %s - %s", w, w.Addr, err) - atomic.StoreInt32(&w.state, StateInit) - return err - } - - w.closeChan = make(chan int) - w.Conn = conn - - w.SetWriteDeadline(time.Now().Add(w.WriteTimeout)) - _, err = w.Write(MagicV2) - if err != nil { - log.Printf("ERROR: [%s] failed to write magic - %s", w, err) - w.close() - return err - } - - ci := make(map[string]interface{}) - ci["short_id"] = w.ShortIdentifier - ci["long_id"] = w.LongIdentifier - ci["heartbeat_interval"] = int64(w.HeartbeatInterval / time.Millisecond) - ci["feature_negotiation"] = true - cmd, err := Identify(ci) - if err != nil { - log.Printf("ERROR: [%s] failed to create IDENTIFY command - %s", w, err) - w.close() - return err - } - - w.SetWriteDeadline(time.Now().Add(w.WriteTimeout)) - err = cmd.Write(w) - if err != nil { - log.Printf("ERROR: [%s] failed to write IDENTIFY - %s", w, err) - w.close() - return err - } - - w.SetReadDeadline(time.Now().Add(w.HeartbeatInterval * 2)) - resp, err := ReadResponse(w) - if err != nil { - log.Printf("ERROR: [%s] failed to read IDENTIFY response - %s", w, err) - w.close() - return err - } - - frameType, data, err := UnpackResponse(resp) - if err != nil { - log.Printf("ERROR: [%s] failed to unpack IDENTIFY response - %s", w, resp) - w.close() - return err - } - - if frameType == FrameTypeError { - return errors.New(string(data)) - } - - w.wg.Add(1) - go w.readLoop() - - w.wg.Add(1) - go w.messageRouter() - - return nil -} - -func (w *Writer) close() { - if !atomic.CompareAndSwapInt32(&w.state, StateConnected, StateDisconnected) { - return - } - close(w.closeChan) - w.Conn.Close() - go func() { - w.wg.Wait() - atomic.StoreInt32(&w.state, StateInit) - }() -} - -func (w *Writer) messageRouter() { - defer w.transactionCleanup() - - for { - select { - case t := <-w.transactionChan: - w.transactions = append(w.transactions, t) - w.SetWriteDeadline(time.Now().Add(w.WriteTimeout)) - err := t.cmd.Write(w.Conn) - if err != nil { - log.Printf("ERROR: [%s] failed writing %s", w, err) - w.close() - goto exit - } - case buf := <-w.dataChan: - frameType, data, err := UnpackResponse(buf) - if err != nil { - log.Printf("ERROR: [%s] failed (%s) unpacking response %d %s", w, err, frameType, data) - w.close() - goto exit - } - - if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) { - log.Printf("[%s] heartbeat received", w) - w.SetWriteDeadline(time.Now().Add(w.WriteTimeout)) - err := Nop().Write(w.Conn) - if err != nil { - log.Printf("ERROR: [%s] failed sending heartbeat - %s", w, err) - w.close() - goto exit - } - continue - } - - t := w.transactions[0] - w.transactions = w.transactions[1:] - t.FrameType = frameType - t.Data = data - t.Error = err - t.doneChan <- t - case <-w.closeChan: - goto exit - } - } - -exit: - w.wg.Done() - log.Printf("[%s] exiting messageRouter()", w) -} - -func (w *Writer) transactionCleanup() { - for _, t := range w.transactions { - t.Error = ErrNotConnected - t.doneChan <- t - } - w.transactions = w.transactions[:0] -} - -func (w *Writer) readLoop() { - rbuf := bufio.NewReader(w.Conn) - for { - w.SetReadDeadline(time.Now().Add(w.HeartbeatInterval * 2)) - resp, err := ReadResponse(rbuf) - if err != nil { - log.Printf("ERROR: [%s] reading response %s", w, err) - if !strings.Contains(err.Error(), "use of closed network connection") { - w.close() - } - goto exit - } - select { - case w.dataChan <- resp: - case <-w.closeChan: - goto exit - } - } - -exit: - w.wg.Done() - log.Printf("[%s] exiting readLoop()", w) -} diff --git a/nsq/writer_test.go b/nsq/writer_test.go deleted file mode 100644 index 19211c2eb..000000000 --- a/nsq/writer_test.go +++ /dev/null @@ -1,261 +0,0 @@ -package nsq - -import ( - "errors" - "io/ioutil" - "log" - "os" - "strconv" - "testing" - "time" -) - -type ReaderHandler struct { - t *testing.T - q *Reader - messagesGood int - messagesFailed int -} - -func (h *ReaderHandler) LogFailedMessage(message *Message) { - h.messagesFailed++ - h.q.Stop() -} - -func (h *ReaderHandler) HandleMessage(message *Message) error { - msg := string(message.Body) - if msg == "bad_test_case" { - return errors.New("fail this message") - } - if msg != "multipublish_test_case" && msg != "publish_test_case" { - h.t.Error("message 'action' was not correct:", msg) - } - h.messagesGood++ - return nil -} - -func TestWriterConnection(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - w := NewWriter("127.0.0.1:4150") - - _, _, err := w.Publish("write_test", []byte("test")) - if err != nil { - t.Fatalf("should lazily connect") - } - - w.Stop() - - _, _, err = w.Publish("write_test", []byte("fail test")) - if err != ErrStopped { - t.Fatalf("should not be able to write after Stop()") - } -} - -func TestWriterPublish(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - topicName := "publish" + strconv.Itoa(int(time.Now().Unix())) - msgCount := 10 - - w := NewWriter("127.0.0.1:4150") - defer w.Stop() - - for i := 0; i < msgCount; i++ { - frameType, data, err := w.Publish(topicName, []byte("publish_test_case")) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - } - - frameType, data, err := w.Publish(topicName, []byte("bad_test_case")) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - - readMessages(topicName, t, msgCount) -} - -func TestWriterMultiPublish(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - topicName := "multi_publish" + strconv.Itoa(int(time.Now().Unix())) - msgCount := 10 - - w := NewWriter("127.0.0.1:4150") - defer w.Stop() - - var testData [][]byte - for i := 0; i < msgCount; i++ { - testData = append(testData, []byte("multipublish_test_case")) - } - - frameType, data, err := w.MultiPublish(topicName, testData) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - - frameType, data, err = w.Publish(topicName, []byte("bad_test_case")) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - - readMessages(topicName, t, msgCount) -} - -func TestWriterPublishAsync(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - topicName := "async_publish" + strconv.Itoa(int(time.Now().Unix())) - msgCount := 10 - - w := NewWriter("127.0.0.1:4150") - defer w.Stop() - - responseChan := make(chan *WriterTransaction, msgCount) - for i := 0; i < msgCount; i++ { - err := w.PublishAsync(topicName, []byte("publish_test_case"), responseChan, "test") - if err != nil { - t.Fatalf(err.Error()) - } - } - - for i := 0; i < msgCount; i++ { - trans := <-responseChan - if trans.Error != nil { - t.Fatalf(trans.Error.Error()) - } - if trans.FrameType != int32(0) { - t.Fatalf("FrameType %d != 0", trans.FrameType) - } - if trans.Args[0].(string) != "test" { - t.Fatalf(`proxied arg "%s" != "test"`, trans.Args[0].(string)) - } - } - - frameType, data, err := w.Publish(topicName, []byte("bad_test_case")) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - - readMessages(topicName, t, msgCount) -} - -func TestWriterMultiPublishAsync(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - topicName := "multi_publish" + strconv.Itoa(int(time.Now().Unix())) - msgCount := 10 - - w := NewWriter("127.0.0.1:4150") - defer w.Stop() - - var testData [][]byte - for i := 0; i < msgCount; i++ { - testData = append(testData, []byte("multipublish_test_case")) - } - - responseChan := make(chan *WriterTransaction) - err := w.MultiPublishAsync(topicName, testData, responseChan, "test0", 1) - if err != nil { - t.Fatalf(err.Error()) - } - - trans := <-responseChan - if trans.Error != nil { - t.Fatalf(trans.Error.Error()) - } - if trans.FrameType != int32(0) { - t.Fatalf("FrameType %d != 0", trans.FrameType) - } - if trans.Args[0].(string) != "test0" { - t.Fatalf(`proxied arg "%s" != "test0"`, trans.Args[0].(string)) - } - if trans.Args[1].(int) != 1 { - t.Fatalf(`proxied arg %d != 1`, trans.Args[1].(int)) - } - - frameType, data, err := w.Publish(topicName, []byte("bad_test_case")) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - - readMessages(topicName, t, msgCount) -} - -func TestWriterHeartbeat(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stdout) - - topicName := "heartbeat" + strconv.Itoa(int(time.Now().Unix())) - - w := NewWriter("127.0.0.1:4150") - defer w.Stop() - w.HeartbeatInterval = 1 * time.Millisecond - - _, _, err := w.Publish(topicName, []byte("publish_test_case")) - if err == nil { - t.Fatalf("error should not be nil") - } - if err.Error() != "E_BAD_BODY IDENTIFY heartbeat interval (1) is invalid" { - t.Fatalf("wrong error - %s", err) - } - - w = NewWriter("127.0.0.1:4150") - defer w.Stop() - w.HeartbeatInterval = 1000 * time.Millisecond - - _, _, err = w.Publish(topicName, []byte("publish_test_case")) - if err != nil { - t.Fatalf(err.Error()) - } - - time.Sleep(1100 * time.Millisecond) - - msgCount := 10 - for i := 0; i < msgCount; i++ { - frameType, data, err := w.Publish(topicName, []byte("publish_test_case")) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - } - - frameType, data, err := w.Publish(topicName, []byte("bad_test_case")) - if err != nil { - t.Fatalf("frametype %d data %s error %s", frameType, string(data), err.Error()) - } - - readMessages(topicName, t, msgCount+1) -} - -func readMessages(topicName string, t *testing.T, msgCount int) { - q, _ := NewReader(topicName, "ch") - q.VerboseLogging = true - q.DefaultRequeueDelay = 0 - q.SetMaxBackoffDuration(time.Millisecond * 50) - - h := &ReaderHandler{ - t: t, - q: q, - } - q.AddHandler(h) - - err := q.ConnectToNSQ("127.0.0.1:4150") - if err != nil { - t.Fatalf(err.Error()) - } - <-q.ExitChan - - if h.messagesGood != msgCount { - t.Fatalf("end of test. should have handled a diff number of messages %d != %d", h.messagesGood, msgCount) - } - - if h.messagesFailed != 1 { - t.Fatal("failed message not done") - } -} diff --git a/nsqadmin/http.go b/nsqadmin/http.go index e1fc3b024..97ee1c3fd 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -2,7 +2,7 @@ package main import ( "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "github.com/bitly/nsq/util/lookupd" "html/template" @@ -44,7 +44,7 @@ func NewSingleHostReverseProxy(target *url.URL, timeout time.Duration) *httputil } return &httputil.ReverseProxy{ Director: director, - Transport: nsq.NewDeadlineTransport(timeout), + Transport: util.NewDeadlineTransport(timeout), } } @@ -294,7 +294,7 @@ func createTopicChannelHandler(w http.ResponseWriter, req *http.Request) { for _, addr := range lookupdHTTPAddrs { endpoint := fmt.Sprintf("http://%s/create_topic?topic=%s", addr, url.QueryEscape(topicName)) log.Printf("LOOKUPD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: lookupd %s - %s", endpoint, err.Error()) continue @@ -308,7 +308,7 @@ func createTopicChannelHandler(w http.ResponseWriter, req *http.Request) { endpoint := fmt.Sprintf("http://%s/create_channel?topic=%s&channel=%s", addr, url.QueryEscape(topicName), url.QueryEscape(channelName)) log.Printf("LOOKUPD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: lookupd %s - %s", endpoint, err.Error()) continue @@ -321,7 +321,7 @@ func createTopicChannelHandler(w http.ResponseWriter, req *http.Request) { endpoint := fmt.Sprintf("http://%s/create_channel?topic=%s&channel=%s", addr, url.QueryEscape(topicName), url.QueryEscape(channelName)) log.Printf("NSQD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error()) continue @@ -363,7 +363,7 @@ func tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request) { endpoint := fmt.Sprintf("http://%s/tombstone_topic_producer?topic=%s&node=%s", addr, url.QueryEscape(topicName), url.QueryEscape(node)) log.Printf("LOOKUPD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: lookupd %s - %s", endpoint, err.Error()) } @@ -372,7 +372,7 @@ func tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request) { // delete the topic on the producer endpoint := fmt.Sprintf("http://%s/delete_topic?topic=%s", node, url.QueryEscape(topicName)) log.Printf("NSQD: querying %s", endpoint) - _, err = nsq.ApiRequest(endpoint) + _, err = util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error()) } @@ -409,7 +409,7 @@ func deleteTopicHandler(w http.ResponseWriter, req *http.Request) { endpoint := fmt.Sprintf("http://%s/delete_topic?topic=%s", addr, url.QueryEscape(topicName)) log.Printf("LOOKUPD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: lookupd %s - %s", endpoint, err.Error()) continue @@ -420,7 +420,7 @@ func deleteTopicHandler(w http.ResponseWriter, req *http.Request) { for _, addr := range producers { endpoint := fmt.Sprintf("http://%s/delete_topic?topic=%s", addr, url.QueryEscape(topicName)) log.Printf("NSQD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error()) continue @@ -456,7 +456,7 @@ func deleteChannelHandler(w http.ResponseWriter, req *http.Request) { addr, url.QueryEscape(topicName), url.QueryEscape(channelName)) log.Printf("LOOKUPD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: lookupd %s - %s", endpoint, err.Error()) continue @@ -468,7 +468,7 @@ func deleteChannelHandler(w http.ResponseWriter, req *http.Request) { endpoint := fmt.Sprintf("http://%s/delete_channel?topic=%s&channel=%s", addr, url.QueryEscape(topicName), url.QueryEscape(channelName)) log.Printf("NSQD: querying %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error()) continue @@ -499,7 +499,7 @@ func emptyTopicHandler(w http.ResponseWriter, req *http.Request) { endpoint := fmt.Sprintf("http://%s/empty_topic?topic=%s", addr, url.QueryEscape(topicName)) log.Printf("NSQD: calling %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error()) continue @@ -531,7 +531,7 @@ func emptyChannelHandler(w http.ResponseWriter, req *http.Request) { addr, url.QueryEscape(topicName), url.QueryEscape(channelName)) log.Printf("NSQD: calling %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error()) continue @@ -563,7 +563,7 @@ func pauseChannelHandler(w http.ResponseWriter, req *http.Request) { addr, req.URL.Path, url.QueryEscape(topicName), url.QueryEscape(channelName)) log.Printf("NSQD: calling %s", endpoint) - _, err := nsq.ApiRequest(endpoint) + _, err := util.ApiRequest(endpoint) if err != nil { log.Printf("ERROR: nsqd %s - %s", endpoint, err.Error()) continue diff --git a/nsqadmin/notify.go b/nsqadmin/notify.go index a7b368f40..c8482c8d1 100644 --- a/nsqadmin/notify.go +++ b/nsqadmin/notify.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/base64" "encoding/json" - "github.com/bitly/nsq/nsq" + "github.com/bitly/nsq/util" "log" "net/http" "strings" @@ -28,7 +28,7 @@ func HandleAdminActions() { if err != nil { log.Printf("Error serializing admin action! %s", err) } - httpclient := &http.Client{Transport: nsq.NewDeadlineTransport(10 * time.Second)} + httpclient := &http.Client{Transport: util.NewDeadlineTransport(10 * time.Second)} log.Printf("Posting notification to %s", *notificationHTTPEndpoint) _, err = httpclient.Post(*notificationHTTPEndpoint, "application/json", bytes.NewBuffer(content)) if err != nil { diff --git a/nsqd/channel.go b/nsqd/channel.go index a90ded6f2..0abafd453 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -4,7 +4,7 @@ import ( "bytes" "container/heap" "errors" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "github.com/bitly/nsq/util/pqueue" "log" diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 7051884ca..bed834b27 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -1,7 +1,7 @@ package main import ( - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bmizerany/assert" "io/ioutil" "log" diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 325224616..cd4ac580c 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -5,7 +5,7 @@ import ( "crypto/tls" "errors" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "log" "net" "sync" diff --git a/nsqd/guid.go b/nsqd/guid.go index ddf7c51cd..d53fc8511 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -12,7 +12,7 @@ package main import ( "encoding/hex" "errors" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "time" ) diff --git a/nsqd/guid_test.go b/nsqd/guid_test.go index 6b4196013..b0bc14f6e 100644 --- a/nsqd/guid_test.go +++ b/nsqd/guid_test.go @@ -1,7 +1,7 @@ package main import ( - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "testing" "unsafe" ) diff --git a/nsqd/http.go b/nsqd/http.go index 5f2b0e0d8..c522b81c0 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -4,7 +4,7 @@ import ( "bufio" "errors" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "io" "io/ioutil" diff --git a/nsqd/lookup.go b/nsqd/lookup.go index 7b5573cc3..1481aa40a 100644 --- a/nsqd/lookup.go +++ b/nsqd/lookup.go @@ -3,7 +3,7 @@ package main import ( "bytes" "encoding/json" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "log" "net" @@ -13,7 +13,7 @@ import ( ) func (n *NSQd) lookupLoop() { - syncTopicChan := make(chan *nsq.LookupPeer) + syncTopicChan := make(chan *LookupPeer) hostname, err := os.Hostname() if err != nil { @@ -22,7 +22,7 @@ func (n *NSQd) lookupLoop() { for _, host := range n.lookupdTCPAddrs { log.Printf("LOOKUP: adding peer %s", host) - lookupPeer := nsq.NewLookupPeer(host, func(lp *nsq.LookupPeer) { + lookupPeer := NewLookupPeer(host, func(lp *LookupPeer) { ci := make(map[string]interface{}) ci["version"] = util.BINARY_VERSION ci["tcp_port"] = n.tcpAddr.Port diff --git a/nsq/lookup_peer.go b/nsqd/lookup_peer.go similarity index 87% rename from nsq/lookup_peer.go rename to nsqd/lookup_peer.go index 18e5362d5..8a0b32eb1 100644 --- a/nsq/lookup_peer.go +++ b/nsqd/lookup_peer.go @@ -1,6 +1,7 @@ -package nsq +package main import ( + "github.com/bitly/go-nsq" "log" "net" "time" @@ -34,7 +35,7 @@ type PeerInfo struct { func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer { return &LookupPeer{ addr: addr, - state: StateDisconnected, + state: nsq.StateDisconnected, connectCallback: connectCallback, } } @@ -69,7 +70,7 @@ func (lp *LookupPeer) Write(data []byte) (int, error) { // Close implements the io.Closer interface func (lp *LookupPeer) Close() error { - lp.state = StateDisconnected + lp.state = nsq.StateDisconnected return lp.conn.Close() } @@ -79,16 +80,16 @@ func (lp *LookupPeer) Close() error { // reconnecting in the event of a failure. // // It returns the response from nsqlookupd as []byte -func (lp *LookupPeer) Command(cmd *Command) ([]byte, error) { +func (lp *LookupPeer) Command(cmd *nsq.Command) ([]byte, error) { initialState := lp.state - if lp.state != StateConnected { + if lp.state != nsq.StateConnected { err := lp.Connect() if err != nil { return nil, err } - lp.state = StateConnected - lp.Write(MagicV1) - if initialState == StateDisconnected { + lp.state = nsq.StateConnected + lp.Write(nsq.MagicV1) + if initialState == nsq.StateDisconnected { lp.connectCallback(lp) } } @@ -100,7 +101,7 @@ func (lp *LookupPeer) Command(cmd *Command) ([]byte, error) { lp.Close() return nil, err } - resp, err := ReadResponse(lp) + resp, err := nsq.ReadResponse(lp) if err != nil { lp.Close() return nil, err diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 08370355c..765724f97 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -5,8 +5,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/bitly/go-nsq" "github.com/bitly/go-simplejson" - "github.com/bitly/nsq/nsq" "github.com/bitly/nsq/util" "github.com/bitly/nsq/util/lookupd" "io/ioutil" @@ -31,7 +31,7 @@ type NSQd struct { topicMap map[string]*Topic lookupdTCPAddrs util.StringArray - lookupPeers []*nsq.LookupPeer + lookupPeers []*LookupPeer tcpAddr *net.TCPAddr httpAddr *net.TCPAddr diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index b052e1f3c..f3f6de389 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -2,8 +2,8 @@ package main import ( "fmt" + "github.com/bitly/go-nsq" "github.com/bitly/go-simplejson" - "github.com/bitly/nsq/nsq" "github.com/bmizerany/assert" "io/ioutil" "log" diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 1c8e9dd33..20e7f2b63 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -5,7 +5,7 @@ import ( "encoding/binary" "encoding/json" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "io" "log" @@ -62,7 +62,7 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error { response, err := p.Exec(client, params) if err != nil { context := "" - if parentErr := err.(nsq.ChildError).Parent(); parentErr != nil { + if parentErr := err.(util.ChildErr).Parent(); parentErr != nil { context = " - " + parentErr.Error() } log.Printf("ERROR: [%s] - %s%s", client, err.Error(), context) @@ -73,7 +73,7 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error { } // errors of type FatalClientErr should forceably close the connection - if _, ok := err.(*nsq.FatalClientErr); ok { + if _, ok := err.(*util.FatalClientErr); ok { break } continue @@ -123,7 +123,7 @@ func (p *ProtocolV2) Send(client *ClientV2, frameType int32, data []byte) error defer client.Unlock() client.SetWriteDeadline(time.Now().Add(time.Second)) - _, err := nsq.SendFramedResponse(client.Writer, frameType, data) + _, err := util.SendFramedResponse(client.Writer, frameType, data) if err != nil { return err } @@ -166,7 +166,7 @@ func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) { case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) } - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) + return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) } func (p *ProtocolV2) messagePump(client *ClientV2) { @@ -276,35 +276,35 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) var err error if atomic.LoadInt32(&client.State) != nsq.StateInit { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY in current state") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY in current state") } bodyLen, err := p.readLen(client) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } if int64(bodyLen) > p.context.nsqd.options.maxBodySize { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_BODY", + return nil, util.NewFatalClientErr(nil, "E_BAD_BODY", fmt.Sprintf("IDENTIFY body too big %d > %d", bodyLen, p.context.nsqd.options.maxBodySize)) } body := make([]byte, bodyLen) _, err = io.ReadFull(client.Reader, body) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") } // body is a json structure with producer information var identifyData IdentifyDataV2 err = json.Unmarshal(body, &identifyData) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } err = client.Identify(identifyData) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error()) + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error()) } // bail out early if we're not negotiating features @@ -333,19 +333,19 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) err = p.Send(client, nsq.FrameTypeResponse, resp) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) + return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) } if tlsv1 { log.Printf("PROTOCOL(V2): [%s] upgrading connection to TLS", client) err = client.UpgradeTLS() if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) + return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) } err = p.Send(client, nsq.FrameTypeResponse, okBytes) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) + return nil, util.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) } } @@ -354,26 +354,26 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) func (p *ProtocolV2) SUB(client *ClientV2, params [][]byte) ([]byte, error) { if atomic.LoadInt32(&client.State) != nsq.StateInit { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot SUB in current state") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot SUB in current state") } if client.HeartbeatInterval < 0 { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot SUB with heartbeats disabled") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot SUB with heartbeats disabled") } if len(params) < 3 { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "SUB insufficient number of parameters") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "SUB insufficient number of parameters") } topicName := string(params[1]) if !nsq.IsValidTopicName(topicName) { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_TOPIC", + return nil, util.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("SUB topic name '%s' is not valid", topicName)) } channelName := string(params[2]) if !nsq.IsValidChannelName(channelName) { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_CHANNEL", + return nil, util.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("SUB channel name '%s' is not valid", channelName)) } @@ -399,14 +399,14 @@ func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) { } if state != nsq.StateSubscribed { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot RDY in current state") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot RDY in current state") } count := int64(1) if len(params) > 1 { b10, err := util.ByteToBase10(params[1]) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_INVALID", + return nil, util.NewFatalClientErr(err, "E_INVALID", fmt.Sprintf("RDY could not parse count %s", params[1])) } count = int64(b10) @@ -415,7 +415,7 @@ func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) { if count < 0 || count > p.context.nsqd.options.maxRdyCount { // this needs to be a fatal error otherwise clients would have // inconsistent state - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", + return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("RDY count %d out of range 0-%d", count, p.context.nsqd.options.maxRdyCount)) } @@ -427,17 +427,17 @@ func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) { func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state") } if len(params) < 2 { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params") } id := *(*nsq.MessageID)(unsafe.Pointer(¶ms[1][0])) err := client.Channel.FinishMessage(client.ID, id) if err != nil { - return nil, nsq.NewClientErr(err, "E_FIN_FAILED", + return nil, util.NewClientErr(err, "E_FIN_FAILED", fmt.Sprintf("FIN %s failed %s", id, err.Error())) } @@ -449,29 +449,29 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state") } if len(params) < 3 { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "REQ insufficient number of params") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "REQ insufficient number of params") } id := *(*nsq.MessageID)(unsafe.Pointer(¶ms[1][0])) timeoutMs, err := util.ByteToBase10(params[2]) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_INVALID", + return nil, util.NewFatalClientErr(err, "E_INVALID", fmt.Sprintf("REQ could not parse timeout %s", params[2])) } timeoutDuration := time.Duration(timeoutMs) * time.Millisecond if timeoutDuration < 0 || timeoutDuration > maxTimeout { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", + return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("REQ timeout %d out of range 0-%d", timeoutDuration, maxTimeout)) } err = client.Channel.RequeueMessage(client.ID, id, timeoutDuration) if err != nil { - return nil, nsq.NewClientErr(err, "E_REQ_FAILED", + return nil, util.NewClientErr(err, "E_REQ_FAILED", fmt.Sprintf("REQ %s failed %s", id, err.Error())) } @@ -482,7 +482,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { func (p *ProtocolV2) CLS(client *ClientV2, params [][]byte) ([]byte, error) { if atomic.LoadInt32(&client.State) != nsq.StateSubscribed { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot CLS in current state") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot CLS in current state") } client.StartClose() @@ -498,41 +498,41 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) { var err error if len(params) < 2 { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters") } topicName := string(params[1]) if !nsq.IsValidTopicName(topicName) { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_TOPIC", + return nil, util.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("PUB topic name '%s' is not valid", topicName)) } bodyLen, err := p.readLen(client) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size") + return nil, util.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size") } if bodyLen <= 0 { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_MESSAGE", + return nil, util.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("PUB invalid message body size %d", bodyLen)) } if int64(bodyLen) > p.context.nsqd.options.maxMessageSize { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_MESSAGE", + return nil, util.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.context.nsqd.options.maxMessageSize)) } messageBody := make([]byte, bodyLen) _, err = io.ReadFull(client.Reader, messageBody) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") + return nil, util.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") } topic := p.context.nsqd.GetTopic(topicName) msg := nsq.NewMessage(<-p.context.nsqd.idChan, messageBody) err = topic.PutMessage(msg) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) + return nil, util.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) } return okBytes, nil @@ -542,37 +542,37 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { var err error if len(params) < 2 { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "MPUB insufficient number of parameters") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "MPUB insufficient number of parameters") } topicName := string(params[1]) if !nsq.IsValidTopicName(topicName) { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_TOPIC", + return nil, util.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("E_BAD_TOPIC MPUB topic name '%s' is not valid", topicName)) } bodyLen, err := p.readLen(client) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size") } if bodyLen <= 0 { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_BODY", + return nil, util.NewFatalClientErr(nil, "E_BAD_BODY", fmt.Sprintf("MPUB invalid body size %d", bodyLen)) } if int64(bodyLen) > p.context.nsqd.options.maxBodySize { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_BODY", + return nil, util.NewFatalClientErr(nil, "E_BAD_BODY", fmt.Sprintf("MPUB body too big %d > %d", bodyLen, p.context.nsqd.options.maxBodySize)) } numMessages, err := p.readLen(client) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count") } if numMessages <= 0 { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", fmt.Sprintf("MPUB invalid message count %d", numMessages)) } @@ -580,24 +580,24 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { for i := int32(0); i < numMessages; i++ { messageSize, err := p.readLen(client) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", + return nil, util.NewFatalClientErr(err, "E_BAD_MESSAGE", fmt.Sprintf("MPUB failed to read message(%d) body size", i)) } if messageSize <= 0 { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_MESSAGE", + return nil, util.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("MPUB invalid message(%d) body size %d", i, messageSize)) } if int64(messageSize) > p.context.nsqd.options.maxMessageSize { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_MESSAGE", + return nil, util.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("MPUB message too big %d > %d", messageSize, p.context.nsqd.options.maxMessageSize)) } msgBody := make([]byte, messageSize) _, err = io.ReadFull(client.Reader, msgBody) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body") + return nil, util.NewFatalClientErr(err, "E_BAD_MESSAGE", "MPUB failed to read message body") } messages = append(messages, nsq.NewMessage(<-p.context.nsqd.idChan, msgBody)) @@ -610,7 +610,7 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { // this next call (and no messages will be queued in that case) err = topic.PutMessages(messages) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error()) + return nil, util.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error()) } return okBytes, nil @@ -619,17 +619,17 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot TOUCH in current state") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot TOUCH in current state") } if len(params) < 2 { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "TOUCH insufficient number of params") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "TOUCH insufficient number of params") } id := *(*nsq.MessageID)(unsafe.Pointer(¶ms[1][0])) err := client.Channel.TouchMessage(client.ID, id) if err != nil { - return nil, nsq.NewClientErr(err, "E_TOUCH_FAILED", + return nil, util.NewClientErr(err, "E_TOUCH_FAILED", fmt.Sprintf("TOUCH %s failed %s", id, err.Error())) } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index ca7367430..fb37c5c54 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -6,7 +6,7 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bmizerany/assert" "io/ioutil" "log" @@ -250,7 +250,7 @@ func TestClientHeartbeat(t *testing.T) { topicName := "test_hb_v2" + strconv.Itoa(int(time.Now().Unix())) options := NewNsqdOptions() - options.clientTimeout = 100 * time.Millisecond + options.clientTimeout = 200 * time.Millisecond tcpAddr, _, nsqd := mustStartNSQd(options) defer nsqd.Exit() @@ -267,13 +267,13 @@ func TestClientHeartbeat(t *testing.T) { _, data, _ := nsq.UnpackResponse(resp) assert.Equal(t, data, []byte("_heartbeat_")) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) err = nsq.Nop().Write(conn) assert.Equal(t, err, nil) // wait long enough that would have timed out (had we not sent the above cmd) - time.Sleep(50 * time.Millisecond) + time.Sleep(100 * time.Millisecond) err = nsq.Nop().Write(conn) assert.Equal(t, err, nil) diff --git a/nsqd/queue.go b/nsqd/queue.go index 5cfe94d0a..b3f6df270 100644 --- a/nsqd/queue.go +++ b/nsqd/queue.go @@ -2,7 +2,7 @@ package main import ( "bytes" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" ) // BackendQueue represents the behavior for the secondary message diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 592b0a6af..8ea3f049e 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -1,7 +1,7 @@ package main import ( - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bmizerany/assert" "io/ioutil" "log" diff --git a/nsqd/tcp.go b/nsqd/tcp.go index 716dc225f..c372d619f 100644 --- a/nsqd/tcp.go +++ b/nsqd/tcp.go @@ -1,7 +1,8 @@ package main import ( - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" + "github.com/bitly/nsq/util" "io" "log" "net" @@ -27,12 +28,12 @@ func (p *tcpServer) Handle(clientConn net.Conn) { log.Printf("CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) - var prot nsq.Protocol + var prot util.Protocol switch protocolMagic { case " V2": prot = &ProtocolV2{context: p.context} default: - nsq.SendFramedResponse(clientConn, nsq.FrameTypeError, []byte("E_BAD_PROTOCOL")) + util.SendFramedResponse(clientConn, nsq.FrameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() log.Printf("ERROR: client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return diff --git a/nsq/cluster_test.go b/nsqd/test/cluster_test.go similarity index 82% rename from nsq/cluster_test.go rename to nsqd/test/cluster_test.go index cf1b2729d..4721d2b36 100644 --- a/nsq/cluster_test.go +++ b/nsqd/test/cluster_test.go @@ -1,6 +1,8 @@ package nsq import ( + "fmt" + "github.com/bitly/nsq/util" "github.com/bmizerany/assert" "io/ioutil" "log" @@ -10,12 +12,6 @@ import ( "time" ) -type MyOtherTestHandler struct{} - -func (h *MyOtherTestHandler) HandleMessage(message *Message) error { - return nil -} - func TestNsqdToLookupd(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) @@ -27,11 +23,12 @@ func TestNsqdToLookupd(t *testing.T) { t.Fatalf("ERROR: failed to get hostname - %s", err.Error()) } - q, _ := NewReader(topicName, "ch") - q.VerboseLogging = true - q.AddHandler(&MyOtherTestHandler{}) + _, err = util.ApiRequest(fmt.Sprintf("http://127.0.0.1:4151/create_topic?topic=%s", topicName)) + if err != nil { + t.Fatalf(err.Error()) + } - err = q.ConnectToNSQ("127.0.0.1:4150") + _, err = util.ApiRequest(fmt.Sprintf("http://127.0.0.1:4151/create_channel?topic=%s&channel=ch", topicName)) if err != nil { t.Fatalf(err.Error()) } @@ -39,7 +36,7 @@ func TestNsqdToLookupd(t *testing.T) { // allow some time for nsqd to push info to nsqlookupd time.Sleep(350 * time.Millisecond) - data, err := ApiRequest("http://127.0.0.1:4161/debug") + data, err := util.ApiRequest("http://127.0.0.1:4161/debug") if err != nil { t.Fatalf(err.Error()) } @@ -77,7 +74,7 @@ func TestNsqdToLookupd(t *testing.T) { assert.Equal(t, port, 4150) assert.Equal(t, tombstoned, false) - data, err = ApiRequest("http://127.0.0.1:4161/lookup?topic=" + topicName) + data, err = util.ApiRequest("http://127.0.0.1:4161/lookup?topic=" + topicName) if err != nil { t.Fatalf(err.Error()) } @@ -102,7 +99,7 @@ func TestNsqdToLookupd(t *testing.T) { channel := channels[0].(string) assert.Equal(t, channel, "ch") - data, err = ApiRequest("http://127.0.0.1:4151/delete_topic?topic=" + topicName) + data, err = util.ApiRequest("http://127.0.0.1:4151/delete_topic?topic=" + topicName) if err != nil { t.Fatalf(err.Error()) } @@ -110,7 +107,7 @@ func TestNsqdToLookupd(t *testing.T) { // allow some time for nsqd to push info to nsqlookupd time.Sleep(350 * time.Millisecond) - data, err = ApiRequest("http://127.0.0.1:4161/lookup?topic=" + topicName) + data, err = util.ApiRequest("http://127.0.0.1:4161/lookup?topic=" + topicName) if err != nil { t.Fatalf(err.Error()) } @@ -118,7 +115,7 @@ func TestNsqdToLookupd(t *testing.T) { producers, _ = data.Get("producers").Array() assert.Equal(t, len(producers), 0) - data, err = ApiRequest("http://127.0.0.1:4161/debug") + data, err = util.ApiRequest("http://127.0.0.1:4161/debug") if err != nil { t.Fatalf(err.Error()) } @@ -128,7 +125,4 @@ func TestNsqdToLookupd(t *testing.T) { producers, _ = data.Get("channel:" + topicName + ":ch").Array() assert.Equal(t, len(producers), 0) - - q.Stop() - <-q.ExitChan } diff --git a/nsqd/topic.go b/nsqd/topic.go index 175a249f8..ef5396b92 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -3,7 +3,7 @@ package main import ( "bytes" "errors" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "log" "sync" diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index d9ab9ce1f..103dff4db 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -1,7 +1,7 @@ package main import ( - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bmizerany/assert" "io/ioutil" "log" diff --git a/nsqlookupd/http.go b/nsqlookupd/http.go index 6b627e356..46c724fe1 100644 --- a/nsqlookupd/http.go +++ b/nsqlookupd/http.go @@ -2,7 +2,7 @@ package main import ( "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "io" "log" diff --git a/nsqlookupd/lookup_protocol_v1.go b/nsqlookupd/lookup_protocol_v1.go index ce8da7f77..ca4c3385f 100644 --- a/nsqlookupd/lookup_protocol_v1.go +++ b/nsqlookupd/lookup_protocol_v1.go @@ -5,7 +5,7 @@ import ( "encoding/binary" "encoding/json" "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" "github.com/bitly/nsq/util" "io" "log" @@ -38,25 +38,25 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { response, err := p.Exec(client, reader, params) if err != nil { context := "" - if parentErr := err.(nsq.ChildError).Parent(); parentErr != nil { + if parentErr := err.(util.ChildErr).Parent(); parentErr != nil { context = " - " + parentErr.Error() } log.Printf("ERROR: [%s] - %s%s", client, err.Error(), context) - _, err = nsq.SendResponse(client, []byte(err.Error())) + _, err = util.SendResponse(client, []byte(err.Error())) if err != nil { break } // errors of type FatalClientErr should forceably close the connection - if _, ok := err.(*nsq.FatalClientErr); ok { + if _, ok := err.(*util.FatalClientErr); ok { break } continue } if response != nil { - _, err = nsq.SendResponse(client, response) + _, err = util.SendResponse(client, response) if err != nil { break } @@ -87,12 +87,12 @@ func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params [ case "UNREGISTER": return p.UNREGISTER(client, reader, params[1:]) } - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) + return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) } func getTopicChan(command string, params []string) (string, string, error) { if len(params) == 0 { - return "", "", nsq.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command)) + return "", "", util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command)) } topicName := params[0] @@ -102,11 +102,11 @@ func getTopicChan(command string, params []string) (string, string, error) { } if !nsq.IsValidTopicName(topicName) { - return "", "", nsq.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName)) + return "", "", util.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName)) } if channelName != "" && !nsq.IsValidChannelName(channelName) { - return "", "", nsq.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName)) + return "", "", util.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName)) } return topicName, channelName, nil @@ -114,7 +114,7 @@ func getTopicChan(command string, params []string) (string, string, error) { func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } topic, channel, err := getTopicChan("REGISTER", params) @@ -140,7 +140,7 @@ func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, para func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { - return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") + return nil, util.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } topic, channel, err := getTopicChan("UNREGISTER", params) @@ -186,26 +186,26 @@ func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, para var err error if client.peerInfo != nil { - return nil, nsq.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") + return nil, util.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") } var bodyLen int32 err = binary.Read(reader, binary.BigEndian, &bodyLen) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } body := make([]byte, bodyLen) _, err = io.ReadFull(reader, body) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") } // body is a json structure with producer information peerInfo := PeerInfo{id: client.RemoteAddr().String()} err = json.Unmarshal(body, &peerInfo) if err != nil { - return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") + return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } peerInfo.RemoteAddress = client.RemoteAddr().String() @@ -216,7 +216,7 @@ func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, para // require all fields if peerInfo.BroadcastAddress == "" || peerInfo.TcpPort == 0 || peerInfo.HttpPort == 0 || peerInfo.Version == "" { - return nil, nsq.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields") + return nil, util.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields") } peerInfo.lastUpdate = time.Now() diff --git a/nsqlookupd/nsqlookupd_test.go b/nsqlookupd/nsqlookupd_test.go index 4c1ed8be8..62a175e55 100644 --- a/nsqlookupd/nsqlookupd_test.go +++ b/nsqlookupd/nsqlookupd_test.go @@ -2,7 +2,8 @@ package main import ( "fmt" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" + "github.com/bitly/nsq/util" lookuputil "github.com/bitly/nsq/util/lookupd" "github.com/bmizerany/assert" "io/ioutil" @@ -72,7 +73,7 @@ func TestBasicLookupd(t *testing.T) { assert.Equal(t, v, []byte("OK")) endpoint := fmt.Sprintf("http://%s/nodes", httpAddr) - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) log.Printf("got %v", data) returnedProducers, err := data.Get("producers").Array() assert.Equal(t, err, nil) @@ -92,7 +93,7 @@ func TestBasicLookupd(t *testing.T) { assert.Equal(t, producer.peerInfo.HttpPort, httpPort) endpoint = fmt.Sprintf("http://%s/topics", httpAddr) - data, err = nsq.ApiRequest(endpoint) + data, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) returnedTopics, err := data.Get("topics").Array() log.Printf("got returnedTopics %v", returnedTopics) @@ -100,7 +101,7 @@ func TestBasicLookupd(t *testing.T) { assert.Equal(t, len(returnedTopics), 1) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err = nsq.ApiRequest(endpoint) + data, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) returnedChannels, err := data.Get("channels").Array() assert.Equal(t, err, nil) @@ -135,7 +136,7 @@ func TestBasicLookupd(t *testing.T) { time.Sleep(10 * time.Millisecond) // now there should be no producers, but still topic/channel entries - data, err = nsq.ApiRequest(endpoint) + data, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) returnedChannels, err = data.Get("channels").Array() assert.Equal(t, err, nil) @@ -187,7 +188,7 @@ func TestChannelUnregister(t *testing.T) { assert.Equal(t, len(channels), 1) endpoint := fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) assert.Equal(t, err, nil) returnedProducers, err := data.Get("producers").Array() assert.Equal(t, err, nil) @@ -217,17 +218,17 @@ func TestTombstoneRecover(t *testing.T) { assert.Equal(t, err, nil) endpoint := fmt.Sprintf("http://%s/tombstone_topic_producer?topic=%s&node=%s", httpAddr, topicName, "ip.address:5555") - _, err = nsq.ApiRequest(endpoint) + _, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) assert.Equal(t, err, nil) producers, _ := data.Get("producers").Array() assert.Equal(t, len(producers), 0) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName2) - data, err = nsq.ApiRequest(endpoint) + data, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) producers, _ = data.Get("producers").Array() assert.Equal(t, len(producers), 1) @@ -235,7 +236,7 @@ func TestTombstoneRecover(t *testing.T) { time.Sleep(55 * time.Millisecond) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err = nsq.ApiRequest(endpoint) + data, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) producers, _ = data.Get("producers").Array() assert.Equal(t, len(producers), 1) @@ -259,11 +260,11 @@ func TestTombstoneUnregister(t *testing.T) { assert.Equal(t, err, nil) endpoint := fmt.Sprintf("http://%s/tombstone_topic_producer?topic=%s&node=%s", httpAddr, topicName, "ip.address:5555") - _, err = nsq.ApiRequest(endpoint) + _, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) assert.Equal(t, err, nil) producers, _ := data.Get("producers").Array() assert.Equal(t, len(producers), 0) @@ -275,7 +276,7 @@ func TestTombstoneUnregister(t *testing.T) { time.Sleep(55 * time.Millisecond) endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", httpAddr, topicName) - data, err = nsq.ApiRequest(endpoint) + data, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) producers, _ = data.Get("producers").Array() assert.Equal(t, len(producers), 0) @@ -338,7 +339,7 @@ func TestTombstonedNodes(t *testing.T) { assert.Equal(t, producers[0].Topics[0].Tombstoned, false) endpoint := fmt.Sprintf("http://%s/tombstone_topic_producer?topic=%s&node=%s", httpAddr, topicName, "ip.address:5555") - _, err = nsq.ApiRequest(endpoint) + _, err = util.ApiRequest(endpoint) assert.Equal(t, err, nil) producers, _ = lookuputil.GetLookupdProducers(lookupdHTTPAddrs) diff --git a/nsqlookupd/tcp.go b/nsqlookupd/tcp.go index 5a2555616..8108c3048 100644 --- a/nsqlookupd/tcp.go +++ b/nsqlookupd/tcp.go @@ -1,7 +1,7 @@ package main import ( - "github.com/bitly/nsq/nsq" + "github.com/bitly/nsq/util" "io" "log" "net" @@ -27,12 +27,12 @@ func (p *tcpServer) Handle(clientConn net.Conn) { log.Printf("CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) - var prot nsq.Protocol + var prot util.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{context: p.context} default: - nsq.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) + util.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) clientConn.Close() log.Printf("ERROR: client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return diff --git a/test.sh b/test.sh index 94e2ad17e..0e80ea308 100755 --- a/test.sh +++ b/test.sh @@ -34,7 +34,7 @@ cleanup() { } trap cleanup INT TERM EXIT -pushd nsq >/dev/null +pushd nsqd/test >/dev/null echo "testing nsq" go test -v -timeout 15s popd >/dev/null diff --git a/nsq/api_request.go b/util/api_request.go similarity index 99% rename from nsq/api_request.go rename to util/api_request.go index ea4463aea..ef9c416e2 100644 --- a/nsq/api_request.go +++ b/util/api_request.go @@ -1,4 +1,4 @@ -package nsq +package util import ( "errors" diff --git a/nsq/errors.go b/util/errors.go similarity index 96% rename from nsq/errors.go rename to util/errors.go index 01738c617..c59eb444c 100644 --- a/nsq/errors.go +++ b/util/errors.go @@ -1,6 +1,6 @@ -package nsq +package util -type ChildError interface { +type ChildErr interface { Parent() error } diff --git a/util/lookupd/lookupd.go b/util/lookupd/lookupd.go index 180a8d721..cf35c77b1 100644 --- a/util/lookupd/lookupd.go +++ b/util/lookupd/lookupd.go @@ -3,7 +3,6 @@ package lookupd import ( "errors" "fmt" - "github.com/bitly/nsq/nsq" "github.com/bitly/nsq/util" "github.com/bitly/nsq/util/semver" "log" @@ -27,7 +26,7 @@ func GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error) { log.Printf("LOOKUPD: querying %s", endpoint) go func(endpoint string) { - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) lock.Lock() defer lock.Unlock() defer wg.Done() @@ -61,7 +60,7 @@ func GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, endpoint := fmt.Sprintf("http://%s/channels?topic=%s", addr, url.QueryEscape(topic)) log.Printf("LOOKUPD: querying %s", endpoint) go func(endpoint string) { - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) lock.Lock() defer lock.Unlock() defer wg.Done() @@ -98,7 +97,7 @@ func GetLookupdProducers(lookupdHTTPAddrs []string) ([]*Producer, error) { endpoint := fmt.Sprintf("http://%s/nodes", addr) log.Printf("LOOKUPD: querying %s", endpoint) go func(addr string, endpoint string) { - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) lock.Lock() defer lock.Unlock() defer wg.Done() @@ -204,7 +203,7 @@ func GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) ([]string log.Printf("LOOKUPD: querying %s", endpoint) go func(endpoint string) { - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) lock.Lock() defer lock.Unlock() defer wg.Done() @@ -248,7 +247,7 @@ func GetNSQDTopics(nsqdHTTPAddrs []string) ([]string, error) { log.Printf("NSQD: querying %s", endpoint) go func(endpoint string) { - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) lock.Lock() defer lock.Unlock() defer wg.Done() @@ -286,7 +285,7 @@ func GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) ([]string, erro log.Printf("NSQD: querying %s", endpoint) go func(endpoint string) { - data, err := nsq.ApiRequest(endpoint) + data, err := util.ApiRequest(endpoint) lock.Lock() defer lock.Unlock() defer wg.Done() @@ -331,8 +330,7 @@ func GetNSQDStats(nsqdHTTPAddrs []string, selectedTopic string) ([]*TopicStats, log.Printf("NSQD: querying %s", endpoint) go func(endpoint string, addr string) { - data, err := nsq.ApiRequest(endpoint) - + data, err := util.ApiRequest(endpoint) lock.Lock() defer lock.Unlock() defer wg.Done() diff --git a/util/mock_conn.go b/util/mock_conn.go deleted file mode 100644 index fbf5a21b6..000000000 --- a/util/mock_conn.go +++ /dev/null @@ -1,35 +0,0 @@ -package util - -import ( - "io" - "net" - "time" -) - -type MockConn struct { - io.ReadWriter -} - -func (c MockConn) Close() error { - return nil -} - -func (c MockConn) LocalAddr() net.Addr { - return &net.TCPAddr{IP: net.IPv4zero} -} - -func (c MockConn) RemoteAddr() net.Addr { - return &net.TCPAddr{IP: net.IPv4zero} -} - -func (c MockConn) SetDeadline(t time.Time) error { - return nil -} - -func (c MockConn) SetReadDeadline(t time.Time) error { - return nil -} - -func (c MockConn) SetWriteDeadline(t time.Time) error { - return nil -} diff --git a/util/protocol.go b/util/protocol.go new file mode 100644 index 000000000..af929de23 --- /dev/null +++ b/util/protocol.go @@ -0,0 +1,50 @@ +package util + +import ( + "encoding/binary" + "io" + "net" +) + +// Protocol describes the basic behavior of any protocol in the system +type Protocol interface { + IOLoop(conn net.Conn) error +} + +// SendResponse is a server side utility function to prefix data with a length header +// and write to the supplied Writer +func SendResponse(w io.Writer, data []byte) (int, error) { + err := binary.Write(w, binary.BigEndian, int32(len(data))) + if err != nil { + return 0, err + } + + n, err := w.Write(data) + if err != nil { + return 0, err + } + + return (n + 4), nil +} + +// SendFramedResponse is a server side utility function to prefix data with a length header +// and frame header and write to the supplied Writer +func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error) { + beBuf := make([]byte, 4) + size := uint32(len(data)) + 4 + + binary.BigEndian.PutUint32(beBuf, size) + n, err := w.Write(beBuf) + if err != nil { + return n, err + } + + binary.BigEndian.PutUint32(beBuf, uint32(frameType)) + n, err = w.Write(beBuf) + if err != nil { + return n + 4, err + } + + n, err = w.Write(data) + return n + 8, err +} diff --git a/util/topic_channel_args.go b/util/topic_channel_args.go index f45e5bf30..8e2df674f 100644 --- a/util/topic_channel_args.go +++ b/util/topic_channel_args.go @@ -2,7 +2,7 @@ package util import ( "errors" - "github.com/bitly/nsq/nsq" + "github.com/bitly/go-nsq" ) type Getter interface {