Skip to content

Commit

Permalink
Merge pull request #555 from mreiferson/internal_pkg_reorg_555
Browse files Browse the repository at this point in the history
internal pkg reorg
  • Loading branch information
jehiah committed Feb 27, 2015
2 parents d9193b6 + ed21587 commit 77a46db
Show file tree
Hide file tree
Showing 63 changed files with 722 additions and 849 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ github.com/bmizerany/perks/quantile 6cb9d9d729303ee2628580d9aec5db968da3a607
github.com/mreiferson/go-options 2cf7eb1fdd83e2bb3375fef6fdadb04c3ad564da
github.com/mreiferson/go-snappystream 307a466b220aaf34bcee2d19c605ed9e96b4bcdb # v0.2.0
github.com/bitly/timer_metrics afad1794bb13e2a094720aeb27c088aa64564895
github.com/blang/semver 9bf7bff48b0388cb75991e58c6df7d13e982f1f2
16 changes: 9 additions & 7 deletions apps/nsq_pubsub/nsq_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ import (
"time"

"github.com/bitly/go-nsq"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/http_api"
"github.com/bitly/nsq/internal/version"
)

var (
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http-address", "0.0.0.0:8080", "<addr>:<port> to listen on for HTTP clients")
maxInFlight = flag.Int("max-in-flight", 100, "max number of messages to allow in flight")
nsqdTCPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
)

func init() {
Expand Down Expand Up @@ -119,13 +121,13 @@ func (s *StreamServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

reqParams, err := util.NewReqParams(req)
reqParams, err := http_api.NewReqParams(req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

topicName, channelName, err := util.GetTopicChannelArgs(reqParams)
topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -143,7 +145,7 @@ func (s *StreamServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_pubsub/%s go-nsq/%s", util.BinaryVersion, nsq.VERSION)
cfg.UserAgent = fmt.Sprintf("nsq_pubsub/%s go-nsq/%s", version.Binary, nsq.VERSION)
cfg.MaxInFlight = *maxInFlight
r, err := nsq.NewConsumer(topicName, channelName, cfg)
if err != nil {
Expand Down Expand Up @@ -221,7 +223,7 @@ func main() {
flag.Parse()

if *showVersion {
fmt.Printf("nsq_pubsub v%s\n", util.BinaryVersion)
fmt.Printf("nsq_pubsub v%s\n", version.Binary)
return
}

Expand Down
9 changes: 5 additions & 4 deletions apps/nsq_stat/nsq_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
"syscall"
"time"

"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/lookupd"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/version"
)

var (
Expand All @@ -27,8 +28,8 @@ var (
statusEvery = flag.Duration("status-every", -1, "(deprecated) duration of time between polling/printing output")
interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
countNum = numValue{}
nsqdHTTPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
nsqdHTTPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
)

type numValue struct {
Expand Down Expand Up @@ -134,7 +135,7 @@ func main() {
flag.Parse()

if *showVersion {
fmt.Printf("nsq_stat v%s\n", util.BinaryVersion)
fmt.Printf("nsq_stat v%s\n", version.Binary)
return
}

Expand Down
15 changes: 8 additions & 7 deletions apps/nsq_tail/nsq_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"time"

"github.com/bitly/go-nsq"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/version"
)

var (
Expand All @@ -22,9 +23,9 @@ var (
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
totalMessages = flag.Int("n", 0, "total messages to show (will wait if starved)")

consumerOpts = util.StringArray{}
nsqdTCPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
consumerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
)

func init() {
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
flag.Parse()

if *showVersion {
fmt.Printf("nsq_tail v%s\n", util.BinaryVersion)
fmt.Printf("nsq_tail v%s\n", version.Binary)
return
}

Expand Down Expand Up @@ -90,8 +91,8 @@ func main() {
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_tail/%s go-nsq/%s", util.BinaryVersion, nsq.VERSION)
err := util.ParseOpts(cfg, consumerOpts)
cfg.UserAgent = fmt.Sprintf("nsq_tail/%s go-nsq/%s", version.Binary, nsq.VERSION)
err := app.ParseOpts(cfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
Expand Down
17 changes: 9 additions & 8 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"time"

"github.com/bitly/go-nsq"
"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/lookupd"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/version"
)

var (
Expand All @@ -43,10 +44,10 @@ var (
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

consumerOpts = util.StringArray{}
nsqdTCPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
topics = util.StringArray{}
consumerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}

// TODO: remove, deprecated
gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)")
Expand Down Expand Up @@ -366,8 +367,8 @@ func newConsumerFileLogger(topic string) (*ConsumerFileLogger, error) {
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", util.BinaryVersion, nsq.VERSION)
err = util.ParseOpts(cfg, consumerOpts)
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
err = app.ParseOpts(cfg, consumerOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -468,7 +469,7 @@ func main() {
flag.Parse()

if *showVersion {
fmt.Printf("nsq_to_file v%s\n", util.BinaryVersion)
fmt.Printf("nsq_to_file v%s\n", version.Binary)
return
}

Expand Down
7 changes: 4 additions & 3 deletions apps/nsq_to_http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"fmt"
"net/http"

"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/http_api"
"github.com/bitly/nsq/internal/version"
)

var httpclient *http.Client
var userAgent string

func init() {
httpclient = &http.Client{Transport: util.NewDeadlineTransport(*httpTimeout)}
userAgent = fmt.Sprintf("nsq_to_http v%s", util.BinaryVersion)
httpclient = &http.Client{Transport: http_api.NewDeadlineTransport(*httpTimeout)}
userAgent = fmt.Sprintf("nsq_to_http v%s", version.Binary)
}

func HTTPGet(endpoint string) (*http.Response, error) {
Expand Down
23 changes: 12 additions & 11 deletions apps/nsq_to_http/nsq_to_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (

"github.com/bitly/go-hostpool"
"github.com/bitly/go-nsq"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/version"
"github.com/bitly/timer_metrics"
)

Expand All @@ -45,11 +46,11 @@ var (
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables")
contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests")

consumerOpts = util.StringArray{}
getAddrs = util.StringArray{}
postAddrs = util.StringArray{}
nsqdTCPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
consumerOpts = app.StringArray{}
getAddrs = app.StringArray{}
postAddrs = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}

// TODO: remove, deprecated
roundRobin = flag.Bool("round-robin", false, "(deprecated) use --mode=round-robin, enable round robin mode")
Expand Down Expand Up @@ -78,7 +79,7 @@ type PublishHandler struct {
counter uint64

Publisher
addresses util.StringArray
addresses app.StringArray
mode int
hostPool hostpool.HostPool

Expand Down Expand Up @@ -171,13 +172,13 @@ func hasArg(s string) bool {

func main() {
var publisher Publisher
var addresses util.StringArray
var addresses app.StringArray
var selectedMode int

flag.Parse()

if *showVersion {
fmt.Printf("nsq_to_http v%s\n", util.BinaryVersion)
fmt.Printf("nsq_to_http v%s\n", version.Binary)
return
}

Expand Down Expand Up @@ -259,8 +260,8 @@ func main() {
}

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("nsq_to_http/%s go-nsq/%s", util.BinaryVersion, nsq.VERSION)
err := util.ParseOpts(cfg, consumerOpts)
cfg.UserAgent = fmt.Sprintf("nsq_to_http/%s go-nsq/%s", version.Binary, nsq.VERSION)
err := app.ParseOpts(cfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
Expand Down
32 changes: 17 additions & 15 deletions apps/nsq_to_nsq/nsq_to_nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"github.com/bitly/go-hostpool"
"github.com/bitly/go-nsq"
"github.com/bitly/go-simplejson"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/protocol"
"github.com/bitly/nsq/internal/version"
"github.com/bitly/timer_metrics"
)

Expand All @@ -40,12 +42,12 @@ var (
statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per destination), 0 disables")
mode = flag.String("mode", "round-robin", "the upstream request mode options: round-robin (default), hostpool")

consumerOpts = util.StringArray{}
producerOpts = util.StringArray{}
nsqdTCPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
destNsqdTCPAddrs = util.StringArray{}
whitelistJSONFields = util.StringArray{}
consumerOpts = app.StringArray{}
producerOpts = app.StringArray{}
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
destNsqdTCPAddrs = app.StringArray{}
whitelistJSONFields = app.StringArray{}

requireJSONField = flag.String("require-json-field", "", "for JSON messages: only pass messages that contain this field")
requireJSONValue = flag.String("require-json-value", "", "for JSON messages: only pass messages in which the required field has this value")
Expand All @@ -71,7 +73,7 @@ type PublishHandler struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
counter uint64

addresses util.StringArray
addresses app.StringArray
producers map[string]*nsq.Producer
mode int
hostPool hostpool.HostPool
Expand Down Expand Up @@ -275,7 +277,7 @@ func main() {
flag.Parse()

if *showVersion {
fmt.Printf("nsq_to_nsq v%s\n", util.BinaryVersion)
fmt.Printf("nsq_to_nsq v%s\n", version.Binary)
return
}

Expand All @@ -287,15 +289,15 @@ func main() {
*destTopic = *topic
}

if !util.IsValidTopicName(*topic) {
if !protocol.IsValidTopicName(*topic) {
log.Fatal("--topic is invalid")
}

if !util.IsValidTopicName(*destTopic) {
if !protocol.IsValidTopicName(*destTopic) {
log.Fatal("--destination-topic is invalid")
}

if !util.IsValidChannelName(*channel) {
if !protocol.IsValidChannelName(*channel) {
log.Fatal("--channel is invalid")
}

Expand All @@ -320,11 +322,11 @@ func main() {
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

defaultUA := fmt.Sprintf("nsq_to_nsq/%s go-nsq/%s", util.BinaryVersion, nsq.VERSION)
defaultUA := fmt.Sprintf("nsq_to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION)

cCfg := nsq.NewConfig()
cCfg.UserAgent = defaultUA
err := util.ParseOpts(cCfg, consumerOpts)
err := app.ParseOpts(cCfg, consumerOpts)
if err != nil {
log.Fatal(err)
}
Expand All @@ -339,7 +341,7 @@ func main() {
pCfg := nsq.NewConfig()
pCfg.UserAgent = defaultUA

err = util.ParseOpts(pCfg, producerOpts)
err = app.ParseOpts(pCfg, producerOpts)
if err != nil {
log.Fatal(err)
}
Expand Down
11 changes: 6 additions & 5 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/bitly/nsq/internal/util"
"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/version"
"github.com/bitly/nsq/nsqd"
"github.com/mreiferson/go-options"
)
Expand Down Expand Up @@ -85,11 +86,11 @@ func nsqFlagset() *flag.FlagSet {
flagSet.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
flagSet.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")

authHTTPAddresses := util.StringArray{}
authHTTPAddresses := app.StringArray{}
flagSet.Var(&authHTTPAddresses, "auth-http-address", "<addr>:<port> to query auth server (may be given multiple times)")

flagSet.String("broadcast-address", "", "address that will be registered with lookupd (defaults to the OS hostname)")
lookupdTCPAddrs := util.StringArray{}
lookupdTCPAddrs := app.StringArray{}
flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")

// diskqueue options
Expand Down Expand Up @@ -121,7 +122,7 @@ func nsqFlagset() *flag.FlagSet {
flagSet.String("statsd-prefix", "nsq.%s", "prefix used for keys sent to statsd (%s for host replacement)")

// End to end percentile flags
e2eProcessingLatencyPercentiles := util.FloatArray{}
e2eProcessingLatencyPercentiles := app.FloatArray{}
flagSet.Var(&e2eProcessingLatencyPercentiles, "e2e-processing-latency-percentile", "message processing time percentiles to keep track of (can be specified multiple times or comma separated, default none)")
flagSet.Duration("e2e-processing-latency-window-time", 10*time.Minute, "calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds)")

Expand Down Expand Up @@ -181,7 +182,7 @@ func main() {
rand.Seed(time.Now().UTC().UnixNano())

if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(util.Version("nsqd"))
fmt.Println(version.String("nsqd"))
return
}

Expand Down
Loading

0 comments on commit 77a46db

Please sign in to comment.