diff --git a/Makefile b/Makefile index 908291e..2ce222b 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ lib_name := appoptics-go build: - go build -i -o $(lib_name) + go build -o $(lib_name) clean: rm $(lib_name) diff --git a/client.go b/client.go index eb2c6db..a47d998 100644 --- a/client.go +++ b/client.go @@ -4,20 +4,19 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io" + "io/ioutil" "net/http" "net/http/httputil" "net/url" "regexp" - - "fmt" - + "strconv" "time" log "github.com/sirupsen/logrus" ) - const ( // MeasurementPostMaxBatchSize defines the max number of Measurements to send to the API at once MeasurementPostMaxBatchSize = 1000 @@ -32,7 +31,7 @@ var ( regexpIllegalNameChars = regexp.MustCompile("[^A-Za-z0-9.:_-]") // from https://www.AppOptics.com/docs/api/#measurements // ErrBadStatus is returned if the AppOptics API returns a non-200 error code. ErrBadStatus = errors.New("Received non-OK status from AppOptics POST") - client = &http.Client{ + client = &http.Client{ Timeout: 30 * time.Second, } ) @@ -90,8 +89,8 @@ type ClientOption func(*Client) error func NewClient(token string, opts ...func(*Client) error) *Client { baseURL, _ := url.Parse(defaultBaseURL) c := &Client{ - token: token, - baseURL: baseURL, + token: token, + baseURL: baseURL, httpClient: client, } @@ -231,7 +230,6 @@ func (c *Client) SpacesService() SpacesCommunicator { return c.spacesService } - // DefaultPaginationParameters provides a *PaginationParameters with minimum required fields func (c *Client) DefaultPaginationParameters(length int) *PaginationParameters { return &PaginationParameters{ @@ -291,11 +289,10 @@ func checkError(resp *http.Response) error { errResponse.Status = resp.Status errResponse.Response = resp if resp.ContentLength != 0 { - decoder := json.NewDecoder(resp.Body) - err := decoder.Decode(errResponse) - + body, _ := ioutil.ReadAll(resp.Body) + err := json.Unmarshal(body, errResponse) if err != nil { - return err + errResponse.Errors = strconv.Quote(string(body)) } log.Debugf("error: %+v\n", errResponse) return errResponse @@ -310,7 +307,7 @@ func dumpResponse(resp *http.Response) { fmt.Printf("response status: %s\n", resp.Status) dump, err := httputil.DumpResponse(resp, true) - if err != nil{ + if err != nil { log.Printf("error dumping response: %s", err) return } @@ -318,13 +315,13 @@ func dumpResponse(resp *http.Response) { } // dumpRequest is a debugging function which dumps the HTTP request to stdout -func dumpRequest(req *http.Request) { - if req.Body == nil{ +func dumpRequest(req *http.Request) { + if req.Body == nil { return } - dump, err := httputil.DumpRequestOut(req, true); - if err != nil{ + dump, err := httputil.DumpRequestOut(req, true) + if err != nil { log.Printf("error dumping request: %s", err) return } diff --git a/go.mod b/go.mod index 9eeec43..543e24d 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,20 @@ module github.com/appoptics/appoptics-api-go -go 1.13 +go 1.15 require ( - github.com/davecgh/go-spew v1.1.0 - github.com/golang/protobuf v1.1.0 - github.com/gorilla/context v1.1.1 + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v1.0.0 // indirect + github.com/golang/protobuf v1.1.0 // indirect + github.com/gorilla/context v1.1.1 // indirect github.com/gorilla/mux v1.6.2 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/magiconair/properties v1.8.0 - github.com/pmezard/go-difflib v1.0.0 - github.com/sirupsen/logrus v1.0.6 + github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.2.2 - golang.org/x/crypto v0.0.0-20180802221240-56440b844dfe - golang.org/x/net v0.0.0-20180801234040-f4c29de78a2a - golang.org/x/sys v0.0.0-20180806082429-34b17bdb4300 - golang.org/x/text v0.3.0 - google.golang.org/genproto v0.0.0-20180731170733-daca94659cb5 + golang.org/x/net v0.0.0-20220121210141-e204ce36a2ba + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect + google.golang.org/genproto v0.0.0-20180731170733-daca94659cb5 // indirect google.golang.org/grpc v1.14.0 ) diff --git a/go.sum b/go.sum index af3aebd..5ee2c88 100644 --- a/go.sum +++ b/go.sum @@ -1,28 +1,66 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/protobuf v1.1.0 h1:0iH4Ffd/meGoXqF2lSAhZHt8X+cPgkfn/cb6Cce5Vpc= github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.0.6 h1:hcP1GmhGigz/O7h1WVUM5KklBp1JoNS9FggWKdj/j3s= -github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -golang.org/x/crypto v0.0.0-20180802221240-56440b844dfe h1:APBCFlxGVQi3YDSHtTbNXRZhDEuz9rrnVPXZA4YbUx8= -golang.org/x/crypto v0.0.0-20180802221240-56440b844dfe/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/net v0.0.0-20180801234040-f4c29de78a2a h1:8fCF9zjAir2SP3N+axz9xs+0r4V8dqPzqsWO10t8zoo= -golang.org/x/net v0.0.0-20180801234040-f4c29de78a2a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/sys v0.0.0-20180806082429-34b17bdb4300 h1:eJa+6+7jje7fOYUrLnwKNR9kcpvLANj1Asw0Ou1pBiI= -golang.org/x/sys v0.0.0-20180806082429-34b17bdb4300/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20220121210141-e204ce36a2ba h1:6u6sik+bn/y7vILcYkK3iwTBWN7WtBvB0+SZswQnbf8= +golang.org/x/net v0.0.0-20220121210141-e204ce36a2ba/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20180731170733-daca94659cb5 h1:2PjFmwzH/sxgW9CRJDlEiwMHO8rOk1eMDzVL14HC1e4= google.golang.org/genproto v0.0.0-20180731170733-daca94659cb5/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.14.0 h1:ArxJuB1NWfPY6r9Gp9gqwplT0Ge7nqv9msgu03lHLmo= diff --git a/multi_reporter.go b/multi_reporter.go deleted file mode 100644 index fb89b5e..0000000 --- a/multi_reporter.go +++ /dev/null @@ -1,41 +0,0 @@ -package appoptics - -import ( - "math/rand" - "time" -) - -type MultiReporter struct { - measurementSet *MeasurementSet - reporters []*Reporter -} - -func NewMultiReporter(m *MeasurementSet, reporters []*Reporter) *MultiReporter { - return &MultiReporter{measurementSet: m, reporters: reporters} -} - -func (m *MultiReporter) Start() { - for _, r := range m.reporters { - go r.postMeasurementBatches() - } - - go m.flushReportsForever() -} - -func (m *MultiReporter) flushReport(report *MeasurementSetReport) { - for _, r := range m.reporters { - r.flushReport(report) - } -} - -func (m *MultiReporter) flushReportsForever() { - // Sleep for a random duration between 0 and outputMeasurementsInterval in order to randomize the counters output cycle. - time.Sleep(time.Duration(rand.Int63n(int64(outputMeasurementsInterval)))) - m.flushReport(m.measurementSet.Reset()) - - // After the initial random sleep, start a regular interval timer. This will output measurements at a consistent time - // modulo outputMeasurementsInterval. - for range time.Tick(outputMeasurementsInterval) { - m.flushReport(m.measurementSet.Reset()) - } -} diff --git a/reporter.go b/reporter.go index d800b6c..fec166c 100644 --- a/reporter.go +++ b/reporter.go @@ -1,73 +1,93 @@ package appoptics import ( - "math/rand" - "os" + "sync" "time" log "github.com/sirupsen/logrus" + "golang.org/x/net/context" ) const ( - outputMeasurementsIntervalSeconds = 15 - outputMeasurementsInterval = outputMeasurementsIntervalSeconds * time.Second - maxRetries = 3 - maxMeasurementsPerBatch = 1000 + maxMeasurementsPerBatch = 1000 ) // Reporter provides a way to persist data from a set collection of Aggregators and Counters at a regular interval type Reporter struct { - measurementSet *MeasurementSet - measurementsComm MeasurementsCommunicator - prefix string - - batchChan chan *MeasurementsBatch - measurementSetReports chan *MeasurementSetReport - - globalTags map[string]string + *ReporterOpts + batchChans []chan *MeasurementsBatch + stopOnce sync.Once + stopChan chan struct{} + stoppedWaitGroup sync.WaitGroup } // NewReporter returns a reporter for a given MeasurementSet, providing a way to sync metric information // to AppOptics for a collection of running metrics. -func NewReporter(measurementSet *MeasurementSet, communicator MeasurementsCommunicator, prefix string) *Reporter { +func NewReporter(optFns ...ReporterOptsFn) *Reporter { + opts := &ReporterOpts{} + *opts = defaultReporterOpts + withDefaultGlobalTags(opts) + for _, optFn := range optFns { + optFn(opts) + } + if opts.MeasurementSet == nil { + opts.MeasurementSet = NewMeasurementSet() + } r := &Reporter{ - measurementSet: measurementSet, - measurementsComm: communicator, - prefix: prefix, - batchChan: make(chan *MeasurementsBatch, 100), - measurementSetReports: make(chan *MeasurementSetReport, 1000), + ReporterOpts: opts, + stopChan: make(chan struct{}), + } + for range r.measurementsComms { + r.batchChans = append(r.batchChans, make(chan *MeasurementsBatch, 10)) } - r.initGlobalTags() return r } // Start kicks off two goroutines that help batch and report metrics measurements to AppOptics. func (r *Reporter) Start() { - go r.postMeasurementBatches() + for workerIndex := range r.measurementsComms { + go r.postMeasurementBatches(workerIndex) + } go r.flushReportsForever() } -func (r *Reporter) initGlobalTags() { - hostname, err := os.Hostname() - if err != nil { - hostname = "na" - } - r.globalTags = map[string]string{ - "hostname": hostname + os.Getenv("HOST_SUFFIX"), +// Close forces an immediate flush of the metrics and stops further reporting. +func (r *Reporter) Close(ctx context.Context) error { + // Notify the flushReportsForever worker that it should exit. + r.stopOnce.Do(func() { + close(r.stopChan) + }) + r.stoppedWaitGroup.Add(len(r.measurementsComms)) + // Wait for all postMeasurementBatches workers to return. + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + allDone := make(chan struct{}) + go func() { + defer close(allDone) + r.stoppedWaitGroup.Wait() + }() + select { + case <-allDone: + return nil + case <-ctx.Done(): + return ctx.Err() } } -func (r *Reporter) postMeasurementBatches() { - for batch := range r.batchChan { +func (r *Reporter) postMeasurementBatches(workerIndex int) { + defer r.stoppedWaitGroup.Done() + batchChan := r.batchChans[workerIndex] + measurementsComm := r.measurementsComms[workerIndex] + for batch := range batchChan { tryCount := 0 for { log.Debug("Uploading AppOptics measurements batch", "time", time.Unix(batch.Time, 0), "numMeasurements", len(batch.Measurements), "globalTags", r.globalTags) - _, err := r.measurementsComm.Create(batch) + _, err := measurementsComm.Create(batch) if err == nil { break } tryCount++ - aborting := tryCount == maxRetries + aborting := tryCount == r.maxPostRetries log.Error("Error uploading AppOptics measurements batch", "err", err, "tryCount", tryCount, "aborting", aborting) if aborting { break @@ -76,34 +96,33 @@ func (r *Reporter) postMeasurementBatches() { } } -func (r *Reporter) flushReport(report *MeasurementSetReport) { - batchTimeUnixSecs := (time.Now().Unix() / outputMeasurementsIntervalSeconds) * outputMeasurementsIntervalSeconds - +func (r *Reporter) flushReport(report *MeasurementSetReport, reportTime time.Time) { var batch *MeasurementsBatch resetBatch := func() { batch = &MeasurementsBatch{ - Time: batchTimeUnixSecs, - Period: outputMeasurementsIntervalSeconds, + Time: reportTime.Unix(), + Period: int64(r.period / time.Second), } } flushBatch := func() { - r.batchChan <- batch + for _, batchChan := range r.batchChans { + batchChan <- batch + } } addMeasurement := func(measurement Measurement) { batch.Measurements = append(batch.Measurements, measurement) - // AppOptics API docs advise sending very large numbers of metrics in multiple HTTP requests; so we'll flush - // batches of 500 measurements at a time. + // AppOptics API docs advise sending very large numbers of metrics in multiple HTTP requests; so we'll limit each + // request to a batch of 1000 measurements. if len(batch.Measurements) >= maxMeasurementsPerBatch { flushBatch() resetBatch() } } resetBatch() - report.Counts["num_measurements"] = int64(len(report.Counts)) + int64(len(report.Aggregators)) + 1 for key, value := range report.Counts { metricName, tags := parseMeasurementKey(key) m := Measurement{ - Name: r.prefix + regexpIllegalNameChars.ReplaceAllString(metricName, "_"), + Name: r.metricPrefix + regexpIllegalNameChars.ReplaceAllString(metricName, "_"), Tags: r.mergeGlobalTags(tags), } if value != 0 { @@ -115,15 +134,11 @@ func (r *Reporter) flushReport(report *MeasurementSetReport) { for key, agg := range report.Aggregators { metricName, tags := parseMeasurementKey(key) m := Measurement{ - Name: r.prefix + regexpIllegalNameChars.ReplaceAllString(metricName, "_"), + Name: r.metricPrefix + regexpIllegalNameChars.ReplaceAllString(metricName, "_"), Tags: r.mergeGlobalTags(tags), } - if agg.Sum != 0 { - m.Sum = agg.Sum - } - if agg.Count != 0 { - m.Count = agg.Count - } + m.Sum = agg.Sum + m.Count = agg.Count if agg.Min != 0 { m.Min = agg.Min } @@ -141,15 +156,25 @@ func (r *Reporter) flushReport(report *MeasurementSetReport) { } func (r *Reporter) flushReportsForever() { - // Sleep for a random duration between 0 and outputMeasurementsInterval in order to randomize the counters output cycle. - time.Sleep(time.Duration(rand.Int63n(int64(outputMeasurementsInterval)))) - report := r.measurementSet.Reset() - r.flushReport(report) - // After the initial random sleep, start a regular interval timer. This will output measurements at a consistent time - // modulo outputMeasurementsInterval. - for range time.Tick(outputMeasurementsInterval) { - report := r.measurementSet.Reset() - r.flushReport(report) + defer func() { + for _, batchChan := range r.batchChans { + close(batchChan) + } + }() + shutdown := false + for !shutdown { + // Sleep until the beginning of the next reporting period. + now := time.Now() + nextInterval := now.Truncate(r.period).Add(r.period) + select { + case <-time.After(nextInterval.Sub(now)): + case <-r.stopChan: + shutdown = true + } + report := r.MeasurementSet.Reset() + if len(report.Aggregators) > 0 || len(report.Counts) > 0 { + r.flushReport(report, nextInterval) + } } } diff --git a/reporter_opts.go b/reporter_opts.go new file mode 100644 index 0000000..6baa401 --- /dev/null +++ b/reporter_opts.go @@ -0,0 +1,90 @@ +package appoptics + +import ( + "os" + "strings" + "time" +) + +// ReporterOpts is a container for Reporter configuration. This struct should not +// be configured directly; instead, users should pass a slice of OptsFn to NewReporter. +type ReporterOpts struct { + MeasurementSet *MeasurementSet + measurementsComms []MeasurementsCommunicator + period time.Duration + metricPrefix string + globalTags map[string]string + maxPostRetries int +} + +var defaultReporterOpts = ReporterOpts{ + period: time.Minute, + maxPostRetries: 3, +} + +// ReporterOptsFn is a mutator to configure an aspect of the Opts struct. This interface +// is implemented by the return values of the various ReporterWithXX functions in this package. +type ReporterOptsFn func(*ReporterOpts) + +func withDefaultGlobalTags(opts *ReporterOpts) { + hostname, err := os.Hostname() + if err != nil { + hostname = "na" + } + opts.globalTags = map[string]string{ + "hostname": hostname + os.Getenv("HOST_SUFFIX"), + } +} + +// ReporterWithMeasurementSet sets the MeasurementSet for a Reporter. +// +// If no MeasurementSet is set, a new one will be created and can be retrieved via +// `reporter.MeasurementSet`. +func ReporterWithMeasurementSet(measurementSet *MeasurementSet) ReporterOptsFn { + return func(opts *ReporterOpts) { + opts.MeasurementSet = measurementSet + } +} + +// ReporterWithMeasurementsCommunicator adds a MeasurementsCommunicator to the Reporter. +// Multiple communicators can be attached to a single Reporter, and they each receive +// identical batches of events. +func ReporterWithMeasurementsCommunicator(communicator MeasurementsCommunicator) ReporterOptsFn { + return func(opts *ReporterOpts) { + opts.measurementsComms = append(opts.measurementsComms, communicator) + } +} + +// ReporterWithMetricNamespace sets the metric namespace for metrics emitted by the Reporter. +func ReporterWithMetricNamespace(metricNamespace string) ReporterOptsFn { + return func(opts *ReporterOpts) { + opts.metricPrefix = metricNamespace + if opts.metricPrefix != "" && !strings.HasSuffix(opts.metricPrefix, ".") { + opts.metricPrefix += "." + } + } +} + +// ReporterWithReportingPeriod sets the interval at which the Reporter emits metrics. +func ReporterWithReportingPeriod(period time.Duration) ReporterOptsFn { + return func(opts *ReporterOpts) { + opts.period = period + } +} + +// ReporterWithGlobalTags sets a global set of tags to apply to all metrics emitted +// by the reporter. These tags may be overridden by individual metrics that contain +// a tag with a matching key. +func ReporterWithGlobalTags(globalTags map[string]string) ReporterOptsFn { + return func(opts *ReporterOpts) { + opts.globalTags = globalTags + } +} + +// ReporterWithMaxPostRetries sets the maximum number of retries to attempt for +// each batch of metrics. +func ReporterWithMaxPostRetries(maxPostRetries int) ReporterOptsFn { + return func(opts *ReporterOpts) { + opts.maxPostRetries = maxPostRetries + } +}