Skip to content

Commit 5032cca

Browse files
committed
Multitenant mode: fetch live data from collectors
Collectors hold recent reports in memory. When querier needs 'live' data, fetch it from collectors instead of from the long-term store. Send reports from collector to querier in msgpack; disable compression on REST call, otherwise Go silently decompresses, which takes longer.
1 parent 667daef commit 5032cca

File tree

6 files changed

+143
-5
lines changed

6 files changed

+143
-5
lines changed

app/api_report.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func makeRawReportHandler(rep Reporter) CtxHandlerFunc {
1919
return
2020
}
2121
censorCfg := report.GetCensorConfigFromRequest(r)
22-
respondWith(ctx, w, http.StatusOK, report.CensorRawReport(rawReport, censorCfg))
22+
respondWithReport(ctx, w, r, report.CensorRawReport(rawReport, censorCfg))
2323
}
2424
}
2525

app/multitenant/aws_collector.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -242,11 +242,17 @@ func (c *awsCollector) flushPending(ctx context.Context) {
242242
entry := value.(*pendingEntry)
243243

244244
entry.Lock()
245-
rpt, count := entry.report, entry.count
246-
entry.report, entry.count = report.MakeReport(), 0
245+
rpt := entry.report
246+
entry.report = nil
247+
if entry.older == nil {
248+
entry.older = make([]*report.Report, c.cfg.Window/c.cfg.StoreInterval)
249+
} else {
250+
copy(entry.older[1:], entry.older) // move everything down one
251+
}
252+
entry.older[0] = rpt
247253
entry.Unlock()
248254

249-
if count > 0 {
255+
if rpt != nil {
250256
// serialise reports on one goroutine to limit CPU usage
251257
buf, err := rpt.WriteBinary()
252258
if err != nil {
@@ -459,6 +465,8 @@ func (c *awsCollector) massageReport(userid string, report report.Report) report
459465
return report
460466
}
461467

468+
// If we are running as a Query service, fetch data and merge into a report
469+
// If we are running as a Collector and the request is for live data, merge in-memory data and return
462470
func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
463471
span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
464472
defer span.Finish()
@@ -468,7 +476,11 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.
468476
}
469477
span.SetTag("userid", userid)
470478
var reports []report.Report
471-
reports, err = c.reportsFromStore(ctx, userid, timestamp)
479+
if time.Since(timestamp) < c.cfg.Window {
480+
reports, err = c.reportsFromLive(ctx, userid)
481+
} else {
482+
reports, err = c.reportsFromStore(ctx, userid, timestamp)
483+
}
472484
if err != nil {
473485
return report.MakeReport(), err
474486
}

app/multitenant/collector.go

+100
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,27 @@ package multitenant
33
// Collect reports from probes per-tenant, and supply them to queriers on demand
44

55
import (
6+
"fmt"
7+
"io"
8+
"net"
9+
"net/http"
10+
"strconv"
611
"sync"
712

813
"context"
914

15+
"github.com/opentracing-contrib/go-stdlib/nethttp"
16+
opentracing "github.com/opentracing/opentracing-go"
17+
log "github.com/sirupsen/logrus"
18+
"github.com/weaveworks/common/user"
1019
"github.com/weaveworks/scope/report"
1120
)
1221

1322
// if StoreInterval is set, reports are merged into here and held until flushed to store
1423
type pendingEntry struct {
1524
sync.Mutex
1625
report *report.Report
26+
older []*report.Report
1727
}
1828

1929
// We are building up a report in memory; merge into that and it will be saved shortly
@@ -32,3 +42,93 @@ func (c *awsCollector) addToLive(ctx context.Context, userid string, rep report.
3242
entry.Unlock()
3343
}
3444

45+
func (c *awsCollector) reportsFromLive(ctx context.Context, userid string) ([]report.Report, error) {
46+
span, ctx := opentracing.StartSpanFromContext(ctx, "reportsFromLive")
47+
defer span.Finish()
48+
if c.cfg.StoreInterval != 0 {
49+
// We are a collector
50+
e, found := c.pending.Load(userid)
51+
if !found {
52+
return nil, nil
53+
}
54+
entry := e.(*pendingEntry)
55+
entry.Lock()
56+
ret := make([]report.Report, 0, len(entry.older)+1)
57+
if entry.report != nil {
58+
ret = append(ret, entry.report.Copy()) // Copy contents because this report is being unsafe-merged to
59+
}
60+
for _, v := range entry.older {
61+
if v != nil {
62+
ret = append(ret, *v) // no copy because older reports are immutable
63+
}
64+
}
65+
entry.Unlock()
66+
return ret, nil
67+
}
68+
69+
// We are a querier: fetch the most up-to-date reports from collectors
70+
// TODO: resolve c.collectorAddress periodically instead of every time we make a call
71+
addrs := resolve(c.cfg.CollectorAddr)
72+
ret := make([]report.Report, 0, len(addrs))
73+
// make a call to each collector and fetch its data for this userid
74+
// TODO: do them in parallel
75+
for _, addr := range addrs {
76+
body, err := oneCall(ctx, addr, "/api/report", userid)
77+
if err != nil {
78+
log.Warnf("error calling '%s': %v", addr, err)
79+
continue
80+
}
81+
rpt, err := report.MakeFromBinary(ctx, body, false, true)
82+
body.Close()
83+
if err != nil {
84+
log.Warnf("error decoding: %v", err)
85+
continue
86+
}
87+
ret = append(ret, *rpt)
88+
}
89+
90+
return ret, nil
91+
}
92+
93+
func resolve(name string) []string {
94+
_, addrs, err := net.LookupSRV("", "", name)
95+
if err != nil {
96+
log.Warnf("Cannot resolve '%s': %v", name, err)
97+
return []string{}
98+
}
99+
endpoints := make([]string, 0, len(addrs))
100+
for _, addr := range addrs {
101+
port := strconv.Itoa(int(addr.Port))
102+
endpoints = append(endpoints, net.JoinHostPort(addr.Target, port))
103+
}
104+
return endpoints
105+
}
106+
107+
func oneCall(ctx context.Context, endpoint, path, userid string) (io.ReadCloser, error) {
108+
fullPath := "http://" + endpoint + path
109+
req, err := http.NewRequest("GET", fullPath, nil)
110+
if err != nil {
111+
return nil, fmt.Errorf("error making request %s: %w", fullPath, err)
112+
}
113+
req = req.WithContext(ctx)
114+
req.Header.Set(user.OrgIDHeaderName, userid)
115+
req.Header.Set("Accept", "application/msgpack")
116+
req.Header.Set("Accept-Encoding", "identity") // disable compression
117+
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
118+
var ht *nethttp.Tracer
119+
req, ht = nethttp.TraceRequest(parentSpan.Tracer(), req, nethttp.OperationName("Collector Fetch"))
120+
defer ht.Finish()
121+
}
122+
client := &http.Client{Transport: &nethttp.Transport{}}
123+
res, err := client.Do(req)
124+
if err != nil {
125+
return nil, fmt.Errorf("error getting %s: %w", fullPath, err)
126+
}
127+
if res.StatusCode != http.StatusOK {
128+
content, _ := io.ReadAll(res.Body)
129+
res.Body.Close()
130+
return nil, fmt.Errorf("error from collector: %s (%s)", res.Status, string(content))
131+
}
132+
133+
return res.Body, nil
134+
}

app/server_helpers.go

+24
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package app
33
import (
44
"context"
55
"net/http"
6+
"strings"
67

78
opentracing "github.com/opentracing/opentracing-go"
89
"github.com/ugorji/go/codec"
@@ -33,3 +34,26 @@ func respondWith(ctx context.Context, w http.ResponseWriter, code int, response
3334
log.Errorf("Error encoding response: %v", err)
3435
}
3536
}
37+
38+
// Similar to the above function, but respect the request's Accept header.
39+
// Possibly we should do a complete parse of Accept, but for now just rudimentary check
40+
func respondWithReport(ctx context.Context, w http.ResponseWriter, req *http.Request, response interface{}) {
41+
accept := req.Header.Get("Accept")
42+
if strings.HasPrefix(accept, "application/msgpack") {
43+
w.Header().Set("Content-Type", "application/msgpack")
44+
w.WriteHeader(http.StatusOK)
45+
encoder := codec.NewEncoder(w, &codec.MsgpackHandle{})
46+
if err := encoder.Encode(response); err != nil {
47+
log.Errorf("Error encoding response: %v", err)
48+
}
49+
return
50+
}
51+
52+
w.Header().Set("Content-Type", "application/json")
53+
w.Header().Add("Cache-Control", "no-cache")
54+
w.WriteHeader(http.StatusOK)
55+
encoder := codec.NewEncoder(w, &codec.JsonHandle{})
56+
if err := encoder.Encode(response); err != nil {
57+
log.Errorf("Error encoding response: %v", err)
58+
}
59+
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ require (
5252
github.com/nats-io/nuid v0.0.0-20160402145409-a5152d67cf63 // indirect
5353
github.com/opencontainers/runc v1.0.0-rc5 // indirect
5454
github.com/openebs/k8s-snapshot-client v0.0.0-20180831100134-a6506305fb16
55+
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
5556
github.com/opentracing/opentracing-go v1.1.0
5657
github.com/paypal/ionet v0.0.0-20130919195445-ed0aaebc5417
5758
github.com/pborman/uuid v0.0.0-20150824212802-cccd189d45f7

vendor/modules.txt

+1
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ github.com/openebs/k8s-snapshot-client/snapshot/pkg/client/clientset/versioned
293293
github.com/openebs/k8s-snapshot-client/snapshot/pkg/client/clientset/versioned/scheme
294294
github.com/openebs/k8s-snapshot-client/snapshot/pkg/client/clientset/versioned/typed/volumesnapshot/v1
295295
# github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
296+
## explicit
296297
github.com/opentracing-contrib/go-stdlib/nethttp
297298
# github.com/opentracing/opentracing-go v1.1.0
298299
## explicit

0 commit comments

Comments
 (0)