From 778427ffedbc5105379c1477fd6d6b87d849efb5 Mon Sep 17 00:00:00 2001
From: joshmeradna
Date: Thu, 6 Jul 2023 08:04:59 -0400
Subject: [PATCH 01/11] parallelize import read and push

---
 plugins/metrics/pkg/agent/buffer.go |  22 ++++
 plugins/metrics/pkg/agent/runner.go | 190 ++++++++++++++++++----------
 2 files changed, 143 insertions(+), 69 deletions(-)
 create mode 100644 plugins/metrics/pkg/agent/buffer.go

diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go
new file mode 100644
index 0000000000..a400bf541f
--- /dev/null
+++ b/plugins/metrics/pkg/agent/buffer.go
@@ -0,0 +1,22 @@
+package agent
+
+type Buffer[T any] interface {
+	// Add blocks until the value can be added to the buffer.
+	Add(T) error
+
+	// Get blocks until a value can be retrieved from the buffer.
+	Get() (T, error)
+}
+
+type memoryBuffer[T any] struct {
+	ch chan T
+}
+
+func (b memoryBuffer[T]) Add(t T) error {
+	b.ch <- t
+	return nil
+}
+
+func (b memoryBuffer[T]) Get() (T, error) {
+	return <-b.ch, nil
+}
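The channel-backed Buffer above is what lets the read and push sides of an import run concurrently in the runner changes below: one goroutine Adds read results while another Gets and pushes them, each call blocking until the other side is ready. A minimal standalone sketch of that handoff, assuming the same interface shape; the int payload, loop counts, and print statement are illustrative only and not part of the patch:

```go
package main

import (
	"fmt"
	"sync"
)

// Buffer mirrors the interface added in buffer.go: Add and Get both block
// until the other side is ready to hand off a value.
type Buffer[T any] interface {
	Add(T) error
	Get() (T, error)
}

type memoryBuffer[T any] struct {
	ch chan T
}

func (b memoryBuffer[T]) Add(t T) error {
	b.ch <- t
	return nil
}

func (b memoryBuffer[T]) Get() (T, error) {
	return <-b.ch, nil
}

func main() {
	var buf Buffer[int] = memoryBuffer[int]{ch: make(chan int)}

	var wg sync.WaitGroup
	wg.Add(2)

	// "read" side: hands chunks to the buffer as they are produced.
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			_ = buf.Add(i)
		}
	}()

	// "push" side: consumes chunks as they become available.
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			v, _ := buf.Get()
			fmt.Println("pushed chunk", v)
		}
	}()

	wg.Wait()
}
```

Because the underlying channel is unbuffered, Add also acts as backpressure: the reader cannot run arbitrarily far ahead of the pusher.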
diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go
index 5f58d7cea9..09eedcc001 100644
--- a/plugins/metrics/pkg/agent/runner.go
+++ b/plugins/metrics/pkg/agent/runner.go
@@ -15,8 +15,10 @@ import (
 	"github.com/rancher/opni/pkg/storage"
 	"github.com/rancher/opni/pkg/task"
 	"github.com/rancher/opni/pkg/util"
+	"github.com/rancher/opni/pkg/util/waitctx"
 	"github.com/rancher/opni/plugins/metrics/apis/remoteread"
 	"github.com/rancher/opni/plugins/metrics/apis/remotewrite"
+	"github.com/samber/lo"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"google.golang.org/grpc/codes"
@@ -54,16 +56,6 @@ func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.Label
 	return pbLabelMatchers
 }
 
-func dereferenceResultTimeseries(in []*prompb.TimeSeries) []prompb.TimeSeries {
-	dereferenced := make([]prompb.TimeSeries, 0, len(in))
-
-	for _, ref := range in {
-		dereferenced = append(dereferenced, *ref)
-	}
-
-	return dereferenced
-}
-
 func getMessageFromTaskLogs(logs []*corev1.LogEntry) string {
 	if len(logs) == 0 {
 		return ""
@@ -84,6 +76,12 @@ type TargetRunMetadata struct {
 	Query *remoteread.Query
 }
 
+// todo: could probably find a better name for this
+type ReadMetadata struct {
+	Query   *prompb.Query
+	Request *prompb.WriteRequest
+}
+
 type targetStore struct {
 	innerMu sync.RWMutex
 	inner   map[string]*corev1.TaskStatus
@@ -203,94 +201,148 @@ func (tr *taskRunner) doPush(ctx context.Context, writeRequest *prompb.WriteRequ
 	}
 }
 
-func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error {
-	limit := util.DefaultWriteLimit()
-	run := &TargetRunMetadata{}
-	activeTask.LoadTaskMetadata(run)
+func (tr *taskRunner) doRead(ctx context.Context, run *TargetRunMetadata, readRequest *prompb.ReadRequest) (*prompb.ReadResponse, error) {
+	expbackoff := tr.backoffPolicy.Start(ctx)
 
-	labelMatchers := toLabelMatchers(run.Query.Matchers)
+	for {
+		select {
+		case <-expbackoff.Done():
+			return nil, ctx.Err()
+		case <-expbackoff.Next():
+			tr.remoteReaderMu.Lock()
+			readResponse, err := tr.remoteReader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest)
+			tr.remoteReaderMu.Unlock()
+
+			// todo: check if recoverable
+			if err != nil {
+				return nil, fmt.Errorf("failed to read from target endpoint: %w", err)
+			}
 
-	importEnd := run.Query.EndTimestamp.AsTime().UnixMilli()
-	nextStart := run.Query.StartTimestamp.AsTime().UnixMilli()
-	nextEnd := nextStart
+			return readResponse, nil
+		}
+	}
+}
 
-	progressDelta := nextStart
+func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error {
+	limit := util.DefaultWriteLimit()
 
-	progress := &corev1.Progress{
-		Current: 0,
-		Total:   uint64(importEnd - progressDelta),
+	b := &memoryBuffer[ReadMetadata]{
+		ch: make(chan ReadMetadata),
 	}
-	activeTask.SetProgress(progress)
+	writeChan := make(chan *ReadMetadata)
 
-	activeTask.AddLogEntry(zapcore.InfoLevel, "import running")
+	wc := waitctx.FromContext(ctx)
 
-	for nextStart < importEnd {
-		select {
-		case <-ctx.Done():
-			activeTask.AddLogEntry(zapcore.InfoLevel, "import stopped")
-			return ctx.Err()
-		default: // continue with import
-		}
+	run := &TargetRunMetadata{}
+	activeTask.LoadTaskMetadata(run)
 
-		nextStart = nextEnd
-		nextEnd = nextStart + TimeDeltaMillis
+	waitctx.Go(wc, func() {
+		labelMatchers := toLabelMatchers(run.Query.Matchers)
 
-		if nextStart >= importEnd {
-			break
-		}
+		importEnd := run.Query.EndTimestamp.AsTime().UnixMilli()
+		nextStart := run.Query.StartTimestamp.AsTime().UnixMilli()
+		nextEnd := nextStart
 
-		if nextEnd >= importEnd {
-			nextEnd = importEnd
-		}
+		activeTask.AddLogEntry(zapcore.InfoLevel, "import read running")
 
-		readRequest := &prompb.ReadRequest{
-			Queries: []*prompb.Query{
-				{
-					StartTimestampMs: nextStart,
-					EndTimestampMs:   nextEnd,
-					Matchers:         labelMatchers,
-				},
-			},
-		}
+		for nextStart < importEnd {
+			select {
+			case <-ctx.Done():
+				activeTask.AddLogEntry(zapcore.InfoLevel, "import read stopped")
+				return
+			default: // continue with import
+			}
 
-		readResponse, err := tr.remoteReader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest)
+			nextStart = nextEnd
+			nextEnd = nextStart + TimeDeltaMillis
 
-		if err != nil {
-			activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("failed to read from target endpoint: %s", err))
-			return fmt.Errorf("failed to read from target endpoint: %w", err)
-		}
+			if nextStart >= importEnd {
+				break
+			}
 
-		for _, result := range readResponse.Results {
-			if len(result.Timeseries) == 0 {
-				continue
+			if nextEnd >= importEnd {
+				nextEnd = importEnd
 			}
 
-			writeRequest := prompb.WriteRequest{
-				Timeseries: dereferenceResultTimeseries(result.Timeseries),
+			readRequest := &prompb.ReadRequest{
+				Queries: []*prompb.Query{
+					{
+						StartTimestampMs: nextStart,
+						EndTimestampMs:   nextEnd,
+						Matchers:         labelMatchers,
+					},
+				},
 			}
 
-			chunkedRequests, err := util.SplitChunksWithLimit(&writeRequest, limit)
+			readResponse, err := tr.doRead(wc, run, readRequest)
 			if err != nil {
-				return fmt.Errorf("failed to chunk request: %w", err)
+				activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error())
+				return
 			}
 
-			if len(chunkedRequests) > 1 {
-				activeTask.AddLogEntry(zapcore.DebugLevel, fmt.Sprintf("split write request into %d chunks", len(chunkedRequests)))
+			meta := &ReadMetadata{
+				Query: readRequest.Queries[0],
+				Request: &prompb.WriteRequest{
+					Timeseries: lo.Map(readResponse.Results[0].GetTimeseries(), func(t *prompb.TimeSeries, _ int) prompb.TimeSeries {
+						return lo.FromPtr(t)
+					}),
+					Metadata: []prompb.MetricMetadata{},
+				},
 			}
 
-			for i, request := range chunkedRequests {
-				if err := tr.doPush(ctx, request); err != nil {
+			writeChan <- meta
+		}
+	})
+
+	waitctx.Go(wc, func() {
+		for {
+			select {
+			case <-ctx.Done():
+				break
+			case meta := <-writeChan:
+				chunks, err := util.SplitChunksWithLimit(meta.Request, limit)
+				if err != nil {
 					activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error())
-					return err
+					return
 				}
-				activeTask.AddLogEntry(zapcore.DebugLevel, fmt.Sprintf("pushed chunk %d of %d", i+1, len(chunkedRequests)))
+
+				lo.ForEach(chunks, func(chunk 
*prompb.WriteRequest, _ int) { + b.Add(ReadMetadata{ + Query: meta.Query, + Request: chunk, + }) + }) } + } + }) - progress.Current = uint64(nextEnd - progressDelta) - activeTask.SetProgress(progress) + waitctx.Go(wc, func() { + progress := &corev1.Progress{ + Current: 0, + Total: uint64(run.Query.EndTimestamp.AsTime().UnixMilli()), } - } + activeTask.SetProgress(progress) + + activeTask.AddLogEntry(zapcore.InfoLevel, "import push running") + + for { + select { + case <-ctx.Done(): + return + case meta := <-b.ch: + if err := tr.doPush(wc, meta.Request); err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) + return + } + + progress.Current += uint64(meta.Query.EndTimestampMs - meta.Query.StartTimestampMs) + activeTask.SetProgress(progress) + } + } + }) + + waitctx.Wait(wc) return nil } From 66ea27fa498e9112405c03dc73a478addeabc965 Mon Sep 17 00:00:00 2001 From: joshmeradna Date: Wed, 19 Jul 2023 22:45:41 -0400 Subject: [PATCH 02/11] give each import task its own remote reader --- pkg/gateway/delegate.go | 1 - plugins/metrics/pkg/agent/node.go | 2 - plugins/metrics/pkg/agent/runner.go | 116 +++++++++++++--------------- test/plugins/metrics/import_test.go | 21 ++++- test/plugins/metrics/runner_test.go | 18 +++-- 5 files changed, 80 insertions(+), 78 deletions(-) diff --git a/pkg/gateway/delegate.go b/pkg/gateway/delegate.go index e85b35c52a..52764984e3 100644 --- a/pkg/gateway/delegate.go +++ b/pkg/gateway/delegate.go @@ -88,7 +88,6 @@ func (d *DelegateServer) Request(ctx context.Context, req *streamv1.DelegatedMes "target", targetId, "request", req.GetRequest().QualifiedMethodName(), ) - lg.Debug("delegating rpc request") target, ok := d.activeAgents[targetId] if ok { fwdResp := &totem.RPC{} diff --git a/plugins/metrics/pkg/agent/node.go b/plugins/metrics/pkg/agent/node.go index dcc6fd27cd..655efa521a 100644 --- a/plugins/metrics/pkg/agent/node.go +++ b/plugins/metrics/pkg/agent/node.go @@ -3,7 +3,6 @@ package agent import ( "context" "fmt" - "net/http" "sort" "strings" "sync" @@ -68,7 +67,6 @@ func NewMetricsNode(ct health.ConditionTracker, lg *zap.SugaredLogger) *MetricsN targetRunner: NewTargetRunner(lg), } mn.conditions.AddListener(mn.sendHealthUpdate) - mn.targetRunner.SetRemoteReaderClient(NewRemoteReader(&http.Client{})) // FIXME: this is a hack, update the old sync code to use delegates instead mn.conditions.AddListener(func(key string) { diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 09eedcc001..858e74c119 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "strings" "sync" "time" @@ -77,9 +78,12 @@ type TargetRunMetadata struct { } // todo: could probably find a better name for this -type ReadMetadata struct { - Query *prompb.Query - Request *prompb.WriteRequest +type WriteMetadata struct { + Query *prompb.Query + WriteChunk *prompb.WriteRequest + + // ProgressRatio is the ratio of the progress of this chunk to the total progress of the original request. 
+ ProgressRatio float64 } type targetStore struct { @@ -128,9 +132,6 @@ func (store *targetStore) ListKeys(_ context.Context, prefix string) ([]string, type taskRunner struct { remoteWriteClient clients.Locker[remotewrite.RemoteWriteClient] - remoteReaderMu sync.RWMutex - remoteReader RemoteReader - backoffPolicy backoff.Policy logger *zap.SugaredLogger @@ -152,12 +153,12 @@ func (tr *taskRunner) SetRemoteWriteClient(client clients.Locker[remotewrite.Rem tr.remoteWriteClient = client } -func (tr *taskRunner) SetRemoteReaderClient(client RemoteReader) { - tr.remoteReaderMu.Lock() - defer tr.remoteReaderMu.Unlock() +// func (tr *taskRunner) SetRemoteReaderClient(client RemoteReader) { +// tr.remoteReaderMu.Lock() +// defer tr.remoteReaderMu.Unlock() - tr.remoteReader = client -} +// tr.remoteReader = client +// } func (tr *taskRunner) OnTaskPending(_ context.Context, _ task.ActiveTask) error { return nil @@ -201,7 +202,7 @@ func (tr *taskRunner) doPush(ctx context.Context, writeRequest *prompb.WriteRequ } } -func (tr *taskRunner) doRead(ctx context.Context, run *TargetRunMetadata, readRequest *prompb.ReadRequest) (*prompb.ReadResponse, error) { +func (tr *taskRunner) doRead(ctx context.Context, reader RemoteReader, run *TargetRunMetadata, readRequest *prompb.ReadRequest) (*prompb.ReadResponse, error) { expbackoff := tr.backoffPolicy.Start(ctx) for { @@ -209,9 +210,7 @@ func (tr *taskRunner) doRead(ctx context.Context, run *TargetRunMetadata, readRe case <-expbackoff.Done(): return nil, ctx.Err() case <-expbackoff.Next(): - tr.remoteReaderMu.Lock() - readResponse, err := tr.remoteReader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest) - tr.remoteReaderMu.Unlock() + readResponse, err := reader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest) // todo: check if recoverable if err != nil { @@ -226,12 +225,10 @@ func (tr *taskRunner) doRead(ctx context.Context, run *TargetRunMetadata, readRe func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error { limit := util.DefaultWriteLimit() - b := &memoryBuffer[ReadMetadata]{ - ch: make(chan ReadMetadata), + b := &memoryBuffer[WriteMetadata]{ + ch: make(chan WriteMetadata), } - writeChan := make(chan *ReadMetadata) - wc := waitctx.FromContext(ctx) run := &TargetRunMetadata{} @@ -244,12 +241,11 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT nextStart := run.Query.StartTimestamp.AsTime().UnixMilli() nextEnd := nextStart - activeTask.AddLogEntry(zapcore.InfoLevel, "import read running") + reader := NewRemoteReader(&http.Client{}) for nextStart < importEnd { select { case <-ctx.Done(): - activeTask.AddLogEntry(zapcore.InfoLevel, "import read stopped") return default: // continue with import } @@ -275,69 +271,64 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT }, } - readResponse, err := tr.doRead(wc, run, readRequest) + readResponse, err := tr.doRead(wc, reader, run, readRequest) if err != nil { activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) return } - meta := &ReadMetadata{ - Query: readRequest.Queries[0], - Request: &prompb.WriteRequest{ - Timeseries: lo.Map(readResponse.Results[0].GetTimeseries(), func(t *prompb.TimeSeries, _ int) prompb.TimeSeries { - return lo.FromPtr(t) - }), - Metadata: []prompb.MetricMetadata{}, - }, + writeRequest := &prompb.WriteRequest{ + Timeseries: lo.Map(readResponse.Results[0].GetTimeseries(), func(t *prompb.TimeSeries, _ int) prompb.TimeSeries { + return lo.FromPtr(t) + }), + 
Metadata: []prompb.MetricMetadata{}, } - writeChan <- meta - } - }) + chunks, err := util.SplitChunksWithLimit(writeRequest, limit) + if err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) + return + } - waitctx.Go(wc, func() { - for { - select { - case <-ctx.Done(): - break - case meta := <-writeChan: - chunks, err := util.SplitChunksWithLimit(meta.Request, limit) - if err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) - return - } + activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) - lo.ForEach(chunks, func(chunk *prompb.WriteRequest, _ int) { - b.Add(ReadMetadata{ - Query: meta.Query, - Request: chunk, - }) - }) - } + lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { + if err := b.Add(WriteMetadata{ + Query: readRequest.Queries[0], + WriteChunk: chunk, + ProgressRatio: 1.0 / float64(len(chunks)), + }); err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not add chunk to buffer: %s", err.Error())) + } + }) } }) waitctx.Go(wc, func() { progress := &corev1.Progress{ Current: 0, - Total: uint64(run.Query.EndTimestamp.AsTime().UnixMilli()), + Total: uint64(run.Query.EndTimestamp.AsTime().UnixMilli() - run.Query.StartTimestamp.AsTime().UnixMilli()), } activeTask.SetProgress(progress) - activeTask.AddLogEntry(zapcore.InfoLevel, "import push running") - - for { + for progress.Current < progress.Total { select { case <-ctx.Done(): return case meta := <-b.ch: - if err := tr.doPush(wc, meta.Request); err != nil { + activeTask.AddLogEntry(zapcore.DebugLevel, "received chunk from buffer") + + if err := tr.doPush(wc, meta.WriteChunk); err != nil { activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) return } - progress.Current += uint64(meta.Query.EndTimestampMs - meta.Query.StartTimestampMs) + progressDelta := uint64(float64(meta.Query.EndTimestampMs-meta.Query.StartTimestampMs) * meta.ProgressRatio) + + progress.Current += progressDelta activeTask.SetProgress(progress) + + fmt.Printf("=== [taskRunner.OnTaskRunning %s] %F ===\n", run.Target.Meta.Name, float64(progress.Current)/float64(progress.Total)) } } }) @@ -363,7 +354,6 @@ type TargetRunner interface { Stop(name string) error GetStatus(name string) (*remoteread.TargetStatus, error) SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient]) - SetRemoteReaderClient(client RemoteReader) } type taskingTargetRunner struct { @@ -499,9 +489,9 @@ func (runner *taskingTargetRunner) SetRemoteWriteClient(client clients.Locker[re runner.runner.SetRemoteWriteClient(client) } -func (runner *taskingTargetRunner) SetRemoteReaderClient(client RemoteReader) { - runner.runnerMu.Lock() - defer runner.runnerMu.Unlock() +// func (runner *taskingTargetRunner) SetRemoteReaderClient(client RemoteReader) { +// runner.runnerMu.Lock() +// defer runner.runnerMu.Unlock() - runner.runner.SetRemoteReaderClient(client) -} +// runner.runner.SetRemoteReaderClient(client) +// } diff --git a/test/plugins/metrics/import_test.go b/test/plugins/metrics/import_test.go index a0a4c0d92c..d5f67f9d01 100644 --- a/test/plugins/metrics/import_test.go +++ b/test/plugins/metrics/import_test.go @@ -13,12 +13,14 @@ import ( . 
"github.com/onsi/gomega" "github.com/prometheus/prometheus/prompb" "github.com/samber/lo" + "go.uber.org/zap" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned" "github.com/rancher/opni/plugins/metrics/apis/remoteread" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" + "github.com/rancher/opni/pkg/logger" "github.com/rancher/opni/pkg/test" "github.com/rancher/opni/pkg/test/freeport" "github.com/rancher/opni/pkg/test/testk8s" @@ -34,11 +36,22 @@ import ( const testNamespace = "test-ns" -// blockingHttpHandler is only here to keep a remote reader connection open to keep it running indefinitely -type blockingHttpHandler struct { +// readHttpHandler is only here to keep a remote reader connection open to keep it running indefinitely +type readHttpHandler struct { + lg *zap.SugaredLogger } -func (h blockingHttpHandler) ServeHTTP(w http.ResponseWriter, request *http.Request) { +func NewReadHandler() http.Handler { + return readHttpHandler{ + lg: logger.New( + logger.WithLogLevel(zap.DebugLevel), + ).Named("read-handler"), + } +} + +func (h readHttpHandler) ServeHTTP(w http.ResponseWriter, request *http.Request) { + fmt.Printf("=== [readHttpHandler.ServeHTTP] %s ===\n", request.URL.Path) + switch request.URL.Path { case "/block": // select {} will block forever without using CPU. @@ -212,7 +225,7 @@ var _ = Describe("Remote Read Import", Ordered, Label("integration", "slow"), fu server := http.Server{ Addr: addr, - Handler: blockingHttpHandler{}, + Handler: NewReadHandler(), } go func() { diff --git a/test/plugins/metrics/runner_test.go b/test/plugins/metrics/runner_test.go index 4f99783b20..9b87c911ed 100644 --- a/test/plugins/metrics/runner_test.go +++ b/test/plugins/metrics/runner_test.go @@ -50,9 +50,11 @@ func newRespondingReader() *mockRemoteReader { var _ = Describe("Target Runner", Ordered, Label("unit"), func() { var ( - failingReader = &mockRemoteReader{ - Error: fmt.Errorf("failed"), - } + // todo: set up a mock prometheus endpoint since we no longer handle readers + + // failingReader = &mockRemoteReader{ + // Error: fmt.Errorf("failed"), + // } runner agent.TargetRunner @@ -108,7 +110,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("target runner cannot reach target endpoint", func() { It("should fail", func() { - runner.SetRemoteReaderClient(failingReader) + // runner.SetRemoteReaderClient(failingReader) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) @@ -137,7 +139,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("editing and restarting failed import", func() { It("should succeed", func() { - runner.SetRemoteReaderClient(newRespondingReader()) + // runner.SetRemoteReaderClient(newRespondingReader()) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) @@ -175,7 +177,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("target runner can reach target endpoint", func() { It("should complete", func() { - runner.SetRemoteReaderClient(newRespondingReader()) + // runner.SetRemoteReaderClient(newRespondingReader()) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) @@ -213,7 +215,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("target is stopped during push", func() { It("should be marked as stopped", func() { // new reader with the longest possible delay - 
runner.SetRemoteReaderClient(newRespondingReader()) + // runner.SetRemoteReaderClient(newRespondingReader()) runner.SetRemoteWriteClient(clients.NewLocker(nil, func(connInterface grpc.ClientConnInterface) remotewrite.RemoteWriteClient { return &mockRemoteWriteClient{ Delay: math.MaxInt64, @@ -247,7 +249,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { Error: fmt.Errorf("context canceled"), } })) - runner.SetRemoteReaderClient(newRespondingReader()) + // runner.SetRemoteReaderClient(newRespondingReader()) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) From 89573ab8b017cfc0def924234590890fcb70bcb5 Mon Sep 17 00:00:00 2001 From: joshmeradna Date: Thu, 20 Jul 2023 10:50:48 -0400 Subject: [PATCH 03/11] fix runner tests --- plugins/metrics/pkg/agent/buffer.go | 10 ++- plugins/metrics/pkg/agent/runner.go | 81 +++++++++--------- test/plugins/metrics/import_test.go | 64 -------------- test/plugins/metrics/runner_test.go | 125 +++++++++++----------------- test/plugins/metrics/utils_test.go | 116 ++++++++++++++++++++------ 5 files changed, 189 insertions(+), 207 deletions(-) diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index a400bf541f..4e65f6846c 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -1,22 +1,24 @@ package agent +import "context" + type Buffer[T any] interface { // Add blo cks until the value can be added to the buffer. - Add(T) error + Add(context.Context, T) error // Get blocks until a value can be retrieved from the buffer. - Get() (T, error) + Get(context.Context) (T, error) } type memoryBuffer[T any] struct { ch chan T } -func (b memoryBuffer[T]) Add(t T) error { +func (b memoryBuffer[T]) Add(ctx context.Context, t T) error { b.ch <- t return nil } -func (b memoryBuffer[T]) Get() (T, error) { +func (b memoryBuffer[T]) Get(ctx context.Context) (T, error) { return <-b.ch, nil } diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 858e74c119..c6972d48f3 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -153,13 +153,6 @@ func (tr *taskRunner) SetRemoteWriteClient(client clients.Locker[remotewrite.Rem tr.remoteWriteClient = client } -// func (tr *taskRunner) SetRemoteReaderClient(client RemoteReader) { -// tr.remoteReaderMu.Lock() -// defer tr.remoteReaderMu.Unlock() - -// tr.remoteReader = client -// } - func (tr *taskRunner) OnTaskPending(_ context.Context, _ task.ActiveTask) error { return nil } @@ -212,12 +205,14 @@ func (tr *taskRunner) doRead(ctx context.Context, reader RemoteReader, run *Targ case <-expbackoff.Next(): readResponse, err := reader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest) - // todo: check if recoverable - if err != nil { - return nil, fmt.Errorf("failed to read from target endpoint: %w", err) + if err == nil { + return readResponse, nil } - return readResponse, nil + tr.logger.With( + zap.Error(err), + "endpoint", run.Target.Spec.Endpoint, + ).Warn("failed to read from target endpoint, retrying...") } } } @@ -234,6 +229,8 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT run := &TargetRunMetadata{} activeTask.LoadTaskMetadata(run) + var err error + waitctx.Go(wc, func() { labelMatchers := toLabelMatchers(run.Query.Matchers) @@ -243,11 +240,11 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT reader := NewRemoteReader(&http.Client{}) - for nextStart < importEnd { + 
for err == nil && nextStart < importEnd { select { - case <-ctx.Done(): + case <-wc.Done(): return - default: // continue with import + default: // continue reading } nextStart = nextEnd @@ -271,9 +268,10 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT }, } - readResponse, err := tr.doRead(wc, reader, run, readRequest) + var readResponse *prompb.ReadResponse + + readResponse, err = tr.doRead(wc, reader, run, readRequest) if err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) return } @@ -284,16 +282,17 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT Metadata: []prompb.MetricMetadata{}, } - chunks, err := util.SplitChunksWithLimit(writeRequest, limit) + var chunks []*prompb.WriteRequest + + chunks, err = util.SplitChunksWithLimit(writeRequest, limit) if err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) return } activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { - if err := b.Add(WriteMetadata{ + if err := b.Add(wc, WriteMetadata{ Query: readRequest.Queries[0], WriteChunk: chunk, ProgressRatio: 1.0 / float64(len(chunks)), @@ -311,31 +310,42 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT } activeTask.SetProgress(progress) - for progress.Current < progress.Total { + for err == nil && progress.Current < progress.Total { select { - case <-ctx.Done(): + case <-wc.Done(): return - case meta := <-b.ch: - activeTask.AddLogEntry(zapcore.DebugLevel, "received chunk from buffer") + default: // continue pushing + } - if err := tr.doPush(wc, meta.WriteChunk); err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) - return - } + var meta WriteMetadata - progressDelta := uint64(float64(meta.Query.EndTimestampMs-meta.Query.StartTimestampMs) * meta.ProgressRatio) + meta, err = b.Get(wc) + if err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", err.Error())) + continue + } - progress.Current += progressDelta - activeTask.SetProgress(progress) + activeTask.AddLogEntry(zapcore.DebugLevel, "received chunk from buffer") - fmt.Printf("=== [taskRunner.OnTaskRunning %s] %F ===\n", run.Target.Meta.Name, float64(progress.Current)/float64(progress.Total)) + if err = tr.doPush(wc, meta.WriteChunk); err != nil { + return } + + progressDelta := uint64(float64(meta.Query.EndTimestampMs-meta.Query.StartTimestampMs) * meta.ProgressRatio) + + progress.Current += progressDelta + activeTask.SetProgress(progress) } }) waitctx.Wait(wc) - return nil + if err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) + return err + } + + return ctx.Err() } func (tr *taskRunner) OnTaskCompleted(_ context.Context, activeTask task.ActiveTask, state task.State, _ ...any) { @@ -488,10 +498,3 @@ func (runner *taskingTargetRunner) SetRemoteWriteClient(client clients.Locker[re runner.runner.SetRemoteWriteClient(client) } - -// func (runner *taskingTargetRunner) SetRemoteReaderClient(client RemoteReader) { -// runner.runnerMu.Lock() -// defer runner.runnerMu.Unlock() - -// runner.runner.SetRemoteReaderClient(client) -// } diff --git a/test/plugins/metrics/import_test.go b/test/plugins/metrics/import_test.go index d5f67f9d01..cf00249d8d 100644 --- a/test/plugins/metrics/import_test.go +++ b/test/plugins/metrics/import_test.go @@ -7,20 +7,14 @@ import ( "os" "time" - "github.com/gogo/protobuf/proto" - 
"github.com/golang/snappy" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/prometheus/prometheus/prompb" - "github.com/samber/lo" - "go.uber.org/zap" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned" "github.com/rancher/opni/plugins/metrics/apis/remoteread" managementv1 "github.com/rancher/opni/pkg/apis/management/v1" - "github.com/rancher/opni/pkg/logger" "github.com/rancher/opni/pkg/test" "github.com/rancher/opni/pkg/test/freeport" "github.com/rancher/opni/pkg/test/testk8s" @@ -36,64 +30,6 @@ import ( const testNamespace = "test-ns" -// readHttpHandler is only here to keep a remote reader connection open to keep it running indefinitely -type readHttpHandler struct { - lg *zap.SugaredLogger -} - -func NewReadHandler() http.Handler { - return readHttpHandler{ - lg: logger.New( - logger.WithLogLevel(zap.DebugLevel), - ).Named("read-handler"), - } -} - -func (h readHttpHandler) ServeHTTP(w http.ResponseWriter, request *http.Request) { - fmt.Printf("=== [readHttpHandler.ServeHTTP] %s ===\n", request.URL.Path) - - switch request.URL.Path { - case "/block": - // select {} will block forever without using CPU. - select {} - case "/large": - uncompressed, err := proto.Marshal(&prompb.ReadResponse{ - Results: []*prompb.QueryResult{ - { - Timeseries: []*prompb.TimeSeries{ - { - Labels: []prompb.Label{ - { - Name: "__name__", - Value: "test_metric", - }, - }, - // Samples: lo.Map(make([]prompb.Sample, 4194304), func(sample prompb.Sample, i int) prompb.Sample { - Samples: lo.Map(make([]prompb.Sample, 65536), func(sample prompb.Sample, i int) prompb.Sample { - sample.Timestamp = time.Now().UnixMilli() - return sample - }), - }, - }, - }, - }, - }) - if err != nil { - panic(err) - } - - compressed := snappy.Encode(nil, uncompressed) - - _, err = w.Write(compressed) - if err != nil { - panic(err) - } - case "/health": - default: - panic(fmt.Sprintf("unsupported endpoint: %s", request.URL.Path)) - } -} - var _ = Describe("Remote Read Import", Ordered, Label("integration", "slow"), func() { ctx := context.Background() agentId := "import-agent" diff --git a/test/plugins/metrics/runner_test.go b/test/plugins/metrics/runner_test.go index 9b87c911ed..c1017c74f0 100644 --- a/test/plugins/metrics/runner_test.go +++ b/test/plugins/metrics/runner_test.go @@ -3,13 +3,14 @@ package metrics_test import ( "fmt" "math" + "net/http" "time" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - "github.com/prometheus/prometheus/prompb" "github.com/rancher/opni/pkg/clients" "github.com/rancher/opni/pkg/logger" + "github.com/rancher/opni/pkg/test/freeport" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" "github.com/rancher/opni/plugins/metrics/pkg/agent" @@ -17,59 +18,13 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -func newRespondingReader() *mockRemoteReader { - return &mockRemoteReader{ - Responses: []*prompb.ReadResponse{ - { - Results: []*prompb.QueryResult{ - { - Timeseries: []*prompb.TimeSeries{ - { - Labels: []prompb.Label{}, - Samples: []prompb.Sample{ - { - Value: 100, - Timestamp: 100, - }, - }, - Exemplars: []prompb.Exemplar{ - { - Labels: nil, - Value: 0, - Timestamp: 0, - }, - }, - }, - }, - }, - }, - }, - }, - } -} - var _ = Describe("Target Runner", Ordered, Label("unit"), func() { var ( // todo: set up a mock prometheus endpoint since we no longer handle readers - - // failingReader = &mockRemoteReader{ - // Error: fmt.Errorf("failed"), - // } - - runner agent.TargetRunner - + addr string + runner agent.TargetRunner writerClient *mockRemoteWriteClient - - target = &remoteread.Target{ - Meta: &remoteread.TargetMeta{ - Name: "test", - ClusterId: "00000-00000", - }, - Spec: &remoteread.TargetSpec{ - Endpoint: "http://127.0.0.1:9090/api/v1/read", - }, - Status: nil, - } + target *remoteread.Target query = &remoteread.Query{ StartTimestamp: ×tamppb.Timestamp{}, @@ -80,6 +35,26 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { } ) + BeforeAll(func() { + By("adding a remote read server") + addr = fmt.Sprintf("127.0.0.1:%d", freeport.GetFreePort()) + + server := http.Server{ + Addr: addr, + Handler: NewReadHandler(), + } + + go func() { + server.ListenAndServe() + }() + DeferCleanup(server.Close) + + Eventually(func() error { + _, err := (&http.Client{}).Get(fmt.Sprintf("http://%s/health", addr)) + return err + }).Should(Not(HaveOccurred())) + }) + BeforeEach(func() { lg := logger.NewPluginLogger().Named("test-runner") @@ -89,6 +64,18 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { runner.SetRemoteWriteClient(clients.NewLocker(nil, func(connInterface grpc.ClientConnInterface) remotewrite.RemoteWriteClient { return writerClient })) + + target = &remoteread.Target{ + Meta: &remoteread.TargetMeta{ + Name: "testTarget", + ClusterId: "testCluster", + }, + Spec: &remoteread.TargetSpec{ + Endpoint: "http://127.0.0.1:9090/api/v1/read", + }, + Status: nil, + } + }) When("target status is not running", func() { @@ -109,37 +96,24 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { }) When("target runner cannot reach target endpoint", func() { - It("should fail", func() { - // runner.SetRemoteReaderClient(failingReader) + It("should retry until success", func() { + target.Spec.Endpoint = "http://i.do.not.exist:9090/api/v1/read" err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) - var status *remoteread.TargetStatus - Eventually(func() remoteread.TargetState { - status, _ = runner.GetStatus(target.Meta.Name) - return status.State - }).Should(Equal(remoteread.TargetState_Failed)) + status, err := runner.GetStatus(target.Meta.Name) + Expect(err).NotTo(HaveOccurred()) - expected := &remoteread.TargetStatus{ - Progress: &remoteread.TargetProgress{ - StartTimestamp: ×tamppb.Timestamp{}, - LastReadTimestamp: ×tamppb.Timestamp{}, - EndTimestamp: ×tamppb.Timestamp{ - Seconds: agent.TimeDeltaMillis / 2 
/ time.Second.Milliseconds(), - }, - }, - Message: "failed to read from target endpoint: failed", - State: remoteread.TargetState_Failed, - } + time.Sleep(time.Second) - AssertTargetStatus(expected, status) + Expect(status.State).To(Equal(remoteread.TargetState_Running)) }) }) When("editing and restarting failed import", func() { It("should succeed", func() { - // runner.SetRemoteReaderClient(newRespondingReader()) + target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) @@ -177,7 +151,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("target runner can reach target endpoint", func() { It("should complete", func() { - // runner.SetRemoteReaderClient(newRespondingReader()) + target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) @@ -214,8 +188,8 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("target is stopped during push", func() { It("should be marked as stopped", func() { - // new reader with the longest possible delay - // runner.SetRemoteReaderClient(newRespondingReader()) + target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) + runner.SetRemoteWriteClient(clients.NewLocker(nil, func(connInterface grpc.ClientConnInterface) remotewrite.RemoteWriteClient { return &mockRemoteWriteClient{ Delay: math.MaxInt64, @@ -244,12 +218,13 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("target pushes with unrecoverable error", func() { It("should fail", func() { + target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) + runner.SetRemoteWriteClient(clients.NewLocker(nil, func(connInterface grpc.ClientConnInterface) remotewrite.RemoteWriteClient { return &mockRemoteWriteClient{ - Error: fmt.Errorf("context canceled"), + Error: fmt.Errorf("some unrecoverable error"), } })) - // runner.SetRemoteReaderClient(newRespondingReader()) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) diff --git a/test/plugins/metrics/utils_test.go b/test/plugins/metrics/utils_test.go index 7d8bf88989..3a66aaf9d5 100644 --- a/test/plugins/metrics/utils_test.go +++ b/test/plugins/metrics/utils_test.go @@ -3,14 +3,20 @@ package metrics_test import ( "context" "fmt" + "net/http" "time" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/prometheus/prompb" + "github.com/rancher/opni/pkg/logger" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" + "github.com/samber/lo" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" ) @@ -34,31 +40,6 @@ func AssertTargetStatus(expected *remoteread.TargetStatus, actual *remoteread.Ta AssertTargetProgress(expected.Progress, actual.Progress) } -type mockRemoteReader struct { - Error error - Responses []*prompb.ReadResponse - - // Delay is the time.Duration to wait before returning the next response. 
- Delay time.Duration - i int -} - -func (reader *mockRemoteReader) Read(_ context.Context, _ string, _ *prompb.ReadRequest) (*prompb.ReadResponse, error) { - if reader.Error != nil { - return nil, reader.Error - } - - if reader.i >= len(reader.Responses) { - return nil, fmt.Errorf("all reader responses have alaredy been consumed") - } - - time.Sleep(reader.Delay) - - response := reader.Responses[reader.i] - reader.i++ - return response, reader.Error -} - type mockRemoteWriteClient struct { Error error Payloads []*cortexpb.WriteRequest @@ -85,3 +66,88 @@ func (client *mockRemoteWriteClient) Push(ctx context.Context, in *cortexpb.Writ func (client *mockRemoteWriteClient) SyncRules(_ context.Context, _ *remotewrite.Payload, _ ...grpc.CallOption) (*emptypb.Empty, error) { return &emptypb.Empty{}, nil } + +// readHttpHandler is only here to keep a remote reader connection open to keep it running indefinitely +type readHttpHandler struct { + lg *zap.SugaredLogger +} + +func NewReadHandler() http.Handler { + return readHttpHandler{ + lg: logger.New( + logger.WithLogLevel(zap.DebugLevel), + ).Named("read-handler"), + } +} + +func (h readHttpHandler) writeReadResponse(w http.ResponseWriter, r *prompb.ReadResponse) { + uncompressed, err := proto.Marshal(r) + if err != nil { + panic(err) + } + + compressed := snappy.Encode(nil, uncompressed) + + _, err = w.Write(compressed) + if err != nil { + panic(err) + } +} + +func (h readHttpHandler) ServeHTTP(w http.ResponseWriter, request *http.Request) { + switch request.URL.Path { + case "/block": + // select {} will block forever without using CPU. + select {} + case "/large": + h.writeReadResponse(w, &prompb.ReadResponse{ + Results: []*prompb.QueryResult{ + { + Timeseries: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "test_metric", + }, + }, + // Samples: lo.Map(make([]prompb.Sample, 4194304), func(sample prompb.Sample, i int) prompb.Sample { + Samples: lo.Map(make([]prompb.Sample, 65536), func(sample prompb.Sample, i int) prompb.Sample { + sample.Timestamp = time.Now().UnixMilli() + return sample + }), + }, + }, + }, + }, + }) + case "/small": + h.writeReadResponse(w, &prompb.ReadResponse{ + Results: []*prompb.QueryResult{ + { + Timeseries: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{}, + Samples: []prompb.Sample{ + { + Value: 100, + Timestamp: 100, + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: nil, + Value: 0, + Timestamp: 0, + }, + }, + }, + }, + }, + }, + }) + case "/health": + default: + panic(fmt.Sprintf("unsupported endpoint: %s", request.URL.Path)) + } +} From e2f91a17aa4ced18c15a9dc6b494d4965606f65b Mon Sep 17 00:00:00 2001 From: joshmeradna Date: Thu, 20 Jul 2023 15:09:32 -0400 Subject: [PATCH 04/11] fix linting issues --- plugins/metrics/pkg/agent/buffer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index 4e65f6846c..ae10b99f7d 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -14,11 +14,11 @@ type memoryBuffer[T any] struct { ch chan T } -func (b memoryBuffer[T]) Add(ctx context.Context, t T) error { +func (b memoryBuffer[T]) Add(_ context.Context, t T) error { b.ch <- t return nil } -func (b memoryBuffer[T]) Get(ctx context.Context) (T, error) { +func (b memoryBuffer[T]) Get(_ context.Context) (T, error) { return <-b.ch, nil } From ba90671159848ce5e6fbfa010fcf018821243a09 Mon Sep 17 00:00:00 2001 From: joshmeranda Date: Fri, 21 Jul 2023 
09:04:16 -0400 Subject: [PATCH 05/11] add import buffer volume --- .../charts/templates/deployment.yaml | 28 +++++++++++++++++++ .../opni-agent/opni-agent/charts/values.yaml | 6 ++++ 2 files changed, 34 insertions(+) diff --git a/packages/opni-agent/opni-agent/charts/templates/deployment.yaml b/packages/opni-agent/opni-agent/charts/templates/deployment.yaml index b23bf89e2d..834deccff1 100644 --- a/packages/opni-agent/opni-agent/charts/templates/deployment.yaml +++ b/packages/opni-agent/opni-agent/charts/templates/deployment.yaml @@ -59,6 +59,10 @@ spec: mountPath: /etc/opni - name: plugins mountPath: /var/lib/opni-agent/plugins + {{- if .Values.import-buffer.enabled }} + - name: import-buffer + mountPath: /var/lib/opni-agent/import-buffer + {{- end }} {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} @@ -175,6 +179,11 @@ spec: {{- if .Values.volumes }} {{- toYaml .Values.volumes | nindent 8 }} {{- end }} + {{- .Values.import-buffer.enabled }} + - name: import-buffer + persistentVolumeClaim: + claimName: {{ include "opni-agent.fullname" .}}-import-buffer + {{- end }} {{- if eq .Values.persistence.mode "pvc" }} --- apiVersion: v1 @@ -194,4 +203,23 @@ spec: resources: requests: storage: 2Gi +{{- end }} +{{- if .Values.import-buffer.enabled }} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ include "onpi-agent.fullname" .}}-import-buffer + namespace: {{ include "opni-agent.namespace" . }} + labels: + {{- include "opni-agent.labels" . | nindent 4 }} + opni.io/app: agent +spec: + accessModes: + - ReadWriteOnce + {{- if .Values.global.storageClass }} + storageClassName: {{ .Values.global.storageClass }} + {{- end }} + resources: + {{- toYaml .Values.importBuffer.resources | nindent 4 }} {{- end }} \ No newline at end of file diff --git a/packages/opni-agent/opni-agent/charts/values.yaml b/packages/opni-agent/opni-agent/charts/values.yaml index 59da633a1a..8cc9aa22ea 100644 --- a/packages/opni-agent/opni-agent/charts/values.yaml +++ b/packages/opni-agent/opni-agent/charts/values.yaml @@ -102,6 +102,12 @@ kube-prometheus-stack: alertmanager: enabled: false # disable the default Alertmanager deployment +import-buffer: + enabled: false + resources: + requests: + storage: 10Gi + global: cattle: systemDefaultRegistry: "" From b01a0df56d905909ff8ab042de04df674330bca7 Mon Sep 17 00:00:00 2001 From: joshmeranda Date: Fri, 21 Jul 2023 21:40:06 -0400 Subject: [PATCH 06/11] add disk buffer --- plugins/metrics/pkg/agent/buffer.go | 79 ++++++++++++++++++++++++++--- plugins/metrics/pkg/agent/runner.go | 31 +++++++---- 2 files changed, 94 insertions(+), 16 deletions(-) diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index ae10b99f7d..a753da7cf8 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -1,6 +1,16 @@ package agent -import "context" +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path" + + "github.com/golang/snappy" + "github.com/google/uuid" +) type Buffer[T any] interface { // Add blo cks until the value can be added to the buffer. 
@@ -10,15 +20,70 @@ type Buffer[T any] interface { Get(context.Context) (T, error) } -type memoryBuffer[T any] struct { - ch chan T +type diskBuffer struct { + dir string + queue chan string +} + +// todo: reconcile pre-existing chunks (useful for pod restarts during import) +func NewDiskBuffer(dir string) (Buffer[WriteMetadata], error) { + buffer := &diskBuffer{ + dir: path.Join(BufferDir), + queue: make(chan string, 100), + } + + if err := os.MkdirAll(buffer.dir, 0755); err != nil { + return nil, fmt.Errorf("could not create buffer directory: %w", err) + } + + return buffer, nil } -func (b memoryBuffer[T]) Add(_ context.Context, t T) error { - b.ch <- t +func (b diskBuffer) Add(_ context.Context, meta WriteMetadata) error { + // todo: will create a new directory for each target name which will not be cleaned up internally + filePath := path.Join(b.dir, meta.Target, uuid.New().String()) + + if err := os.MkdirAll(path.Dir(filePath), 0755); err != nil && !errors.Is(err, os.ErrExist) { + return fmt.Errorf("could not create buffer directory for target '%s': %w", meta.Target, err) + } + + uncompressed, err := json.Marshal(meta) + if err != nil { + return fmt.Errorf("could not marshal chunk for buffer: %w", err) + } + + compressed := snappy.Encode(nil, uncompressed) + + if err := os.WriteFile(filePath, compressed, 0644); err != nil { + return fmt.Errorf("could not write chunk to buffer: %w", err) + } + + b.queue <- filePath + return nil } -func (b memoryBuffer[T]) Get(_ context.Context) (T, error) { - return <-b.ch, nil +func (b diskBuffer) Get(_ context.Context) (WriteMetadata, error) { + path := <-b.queue + + compressed, err := os.ReadFile(path) + if err != nil { + return WriteMetadata{}, fmt.Errorf("could not read chunk from buffer: %w", err) + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return WriteMetadata{}, fmt.Errorf("could not decompress chunk from buffer: %w", err) + } + + var meta WriteMetadata + if err := json.Unmarshal(uncompressed, &meta); err != nil { + return WriteMetadata{}, fmt.Errorf("could not unmarshal chunk from buffer: %w", err) + } + + if err := os.Remove(path); err != nil { + return WriteMetadata{}, fmt.Errorf("could not remove chunk file from disk, data may linger on system longer than expected: %w", err) + } + + return meta, nil } diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index c6972d48f3..7620b2b571 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -28,6 +28,8 @@ import ( var TimeDeltaMillis = time.Minute.Milliseconds() +const BufferDir = "/var/lib/opni-agent/import-buffer" + func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.LabelMatcher { pbLabelMatchers := make([]*prompb.LabelMatcher, 0, len(rrLabelMatchers)) @@ -78,7 +80,9 @@ type TargetRunMetadata struct { } // todo: could probably find a better name for this +// todo: replace ProgressRatio and Query with a ProgressDelta type WriteMetadata struct { + Target string Query *prompb.Query WriteChunk *prompb.WriteRequest @@ -135,9 +139,17 @@ type taskRunner struct { backoffPolicy backoff.Policy logger *zap.SugaredLogger + + buffer Buffer[WriteMetadata] } -func newTaskRunner(logger *zap.SugaredLogger) *taskRunner { +func newTaskRunner(logger *zap.SugaredLogger) (*taskRunner, error) { + buffer, err := NewDiskBuffer(BufferDir) + if err != nil { + err := fmt.Errorf("could not create buffer: %w", err) + return nil, fmt.Errorf("could not create buffer: %w", err) + } + return 
&taskRunner{ backoffPolicy: backoff.Exponential( backoff.WithMaxRetries(0), @@ -146,7 +158,8 @@ func newTaskRunner(logger *zap.SugaredLogger) *taskRunner { backoff.WithMultiplier(1.1), ), logger: logger.Named("task-runner"), - } + buffer: buffer, + }, nil } func (tr *taskRunner) SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient]) { @@ -220,10 +233,6 @@ func (tr *taskRunner) doRead(ctx context.Context, reader RemoteReader, run *Targ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error { limit := util.DefaultWriteLimit() - b := &memoryBuffer[WriteMetadata]{ - ch: make(chan WriteMetadata), - } - wc := waitctx.FromContext(ctx) run := &TargetRunMetadata{} @@ -292,7 +301,8 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { - if err := b.Add(wc, WriteMetadata{ + if err := tr.buffer.Add(wc, WriteMetadata{ + Target: run.Target.Meta.Name, Query: readRequest.Queries[0], WriteChunk: chunk, ProgressRatio: 1.0 / float64(len(chunks)), @@ -319,7 +329,7 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT var meta WriteMetadata - meta, err = b.Get(wc) + meta, err = tr.buffer.Get(wc) if err != nil { activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", err.Error())) continue @@ -380,7 +390,10 @@ func NewTargetRunner(logger *zap.SugaredLogger) TargetRunner { inner: make(map[string]*corev1.TaskStatus), } - runner := newTaskRunner(logger) + runner, err := newTaskRunner(logger) + if err != nil { + panic(fmt.Sprintf("bug: failed to create target task runner: %s", err)) + } controller, err := task.NewController(context.Background(), "target-runner", store, runner) if err != nil { From 9508a7be14deadd46ffa36ffc2b1276b8a279945 Mon Sep 17 00:00:00 2001 From: joshmeranda Date: Sat, 22 Jul 2023 00:21:49 -0400 Subject: [PATCH 07/11] consolodate each task buffer into one --- plugins/metrics/pkg/agent/buffer.go | 98 +++++++++++++++++++++-------- plugins/metrics/pkg/agent/runner.go | 20 +++--- test/plugins/metrics/runner_test.go | 36 ++++++++++- 3 files changed, 120 insertions(+), 34 deletions(-) diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index a753da7cf8..e436959d11 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -7,29 +7,39 @@ import ( "fmt" "os" "path" + "sync" "github.com/golang/snappy" "github.com/google/uuid" ) -type Buffer[T any] interface { +var ErrBufferNotFound error = fmt.Errorf("buffer not found") + +type ChunkBuffer interface { // Add blo cks until the value can be added to the buffer. - Add(context.Context, T) error + Add(context.Context, string, WriteMetadata) error // Get blocks until a value can be retrieved from the buffer. - Get(context.Context) (T, error) + Get(context.Context, string) (WriteMetadata, error) + + // Delete removes a buffer for the named task from the buffer. 
+ Delete(context.Context, string) error } type diskBuffer struct { - dir string - queue chan string + dir string + + diskWriteLock sync.Mutex + + chanLocker sync.RWMutex + chunkChans map[string]chan string } // todo: reconcile pre-existing chunks (useful for pod restarts during import) -func NewDiskBuffer(dir string) (Buffer[WriteMetadata], error) { +func NewDiskBuffer(dir string) (ChunkBuffer, error) { buffer := &diskBuffer{ - dir: path.Join(BufferDir), - queue: make(chan string, 100), + dir: path.Join(BufferDir), + chunkChans: make(map[string]chan string), } if err := os.MkdirAll(buffer.dir, 0755); err != nil { @@ -39,7 +49,19 @@ func NewDiskBuffer(dir string) (Buffer[WriteMetadata], error) { return buffer, nil } -func (b diskBuffer) Add(_ context.Context, meta WriteMetadata) error { +func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) error { + b.chanLocker.RLock() + chunkChan, found := b.chunkChans[name] + b.chanLocker.RUnlock() + + if !found { + chunkChan = make(chan string, 100) + + b.chanLocker.Lock() + b.chunkChans[name] = chunkChan + b.chanLocker.Unlock() + } + // todo: will create a new directory for each target name which will not be cleaned up internally filePath := path.Join(b.dir, meta.Target, uuid.New().String()) @@ -54,36 +76,62 @@ func (b diskBuffer) Add(_ context.Context, meta WriteMetadata) error { compressed := snappy.Encode(nil, uncompressed) + b.diskWriteLock.Lock() if err := os.WriteFile(filePath, compressed, 0644); err != nil { return fmt.Errorf("could not write chunk to buffer: %w", err) } + b.diskWriteLock.Unlock() - b.queue <- filePath + chunkChan <- filePath return nil } -func (b diskBuffer) Get(_ context.Context) (WriteMetadata, error) { - path := <-b.queue +func (b *diskBuffer) Get(ctx context.Context, name string) (WriteMetadata, error) { + b.chanLocker.RLock() + chunkChan, found := b.chunkChans[name] + b.chanLocker.RUnlock() - compressed, err := os.ReadFile(path) - if err != nil { - return WriteMetadata{}, fmt.Errorf("could not read chunk from buffer: %w", err) + if !found { + return WriteMetadata{}, ErrBufferNotFound } - uncompressed, err := snappy.Decode(nil, compressed) - if err != nil { - return WriteMetadata{}, fmt.Errorf("could not decompress chunk from buffer: %w", err) + select { + case <-ctx.Done(): + return WriteMetadata{}, ctx.Err() + case path := <-chunkChan: + compressed, err := os.ReadFile(path) + if err != nil { + return WriteMetadata{}, fmt.Errorf("could not read chunk from buffer: %w", err) + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return WriteMetadata{}, fmt.Errorf("could not decompress chunk from buffer: %w", err) + } + + var meta WriteMetadata + if err := json.Unmarshal(uncompressed, &meta); err != nil { + return WriteMetadata{}, fmt.Errorf("could not unmarshal chunk from buffer: %w", err) + } + + if err := os.Remove(path); err != nil { + return WriteMetadata{}, fmt.Errorf("could not remove chunk file from disk, data may linger on system longer than expected: %w", err) + } + + return meta, nil } +} - var meta WriteMetadata - if err := json.Unmarshal(uncompressed, &meta); err != nil { - return WriteMetadata{}, fmt.Errorf("could not unmarshal chunk from buffer: %w", err) - } +func (b *diskBuffer) Delete(ctx context.Context, name string) error { + b.chanLocker.Lock() + delete(b.chunkChans, name) + b.chanLocker.Unlock() - if err := os.Remove(path); err != nil { - return WriteMetadata{}, fmt.Errorf("could not remove chunk file from disk, data may linger on system longer than 
expected: %w", err) + subBufferDir := path.Join(b.dir, name) + if err := os.RemoveAll(subBufferDir); err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("could not remove buffer directory: %w", err) } - return meta, nil + return nil } diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 7620b2b571..038f8e6419 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -3,6 +3,7 @@ package agent import ( "context" "encoding/json" + "errors" "fmt" "net/http" "strings" @@ -28,7 +29,8 @@ import ( var TimeDeltaMillis = time.Minute.Milliseconds() -const BufferDir = "/var/lib/opni-agent/import-buffer" +// const BufferDir = "/var/lib/opni-agent/import-buffer" +const BufferDir = "import-buffer" func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.LabelMatcher { pbLabelMatchers := make([]*prompb.LabelMatcher, 0, len(rrLabelMatchers)) @@ -140,13 +142,12 @@ type taskRunner struct { logger *zap.SugaredLogger - buffer Buffer[WriteMetadata] + buffer ChunkBuffer } func newTaskRunner(logger *zap.SugaredLogger) (*taskRunner, error) { buffer, err := NewDiskBuffer(BufferDir) if err != nil { - err := fmt.Errorf("could not create buffer: %w", err) return nil, fmt.Errorf("could not create buffer: %w", err) } @@ -301,7 +302,7 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { - if err := tr.buffer.Add(wc, WriteMetadata{ + if err := tr.buffer.Add(wc, run.Target.Meta.Name, WriteMetadata{ Target: run.Target.Meta.Name, Query: readRequest.Queries[0], WriteChunk: chunk, @@ -329,9 +330,11 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT var meta WriteMetadata - meta, err = tr.buffer.Get(wc) + meta, err = tr.buffer.Get(wc, run.Target.Meta.Name) if err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", err.Error())) + if !errors.Is(err, ErrBufferNotFound) { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", err.Error())) + } continue } @@ -358,14 +361,15 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT return ctx.Err() } -func (tr *taskRunner) OnTaskCompleted(_ context.Context, activeTask task.ActiveTask, state task.State, _ ...any) { +func (tr *taskRunner) OnTaskCompleted(ctx context.Context, activeTask task.ActiveTask, state task.State, _ ...any) { switch state { case task.StateCompleted: activeTask.AddLogEntry(zapcore.InfoLevel, "completed") case task.StateFailed: - // a log will be added in OnTaskRunning for failed imports so we don't need to log anything here + tr.buffer.Delete(ctx, activeTask.TaskId()) case task.StateCanceled: activeTask.AddLogEntry(zapcore.WarnLevel, "canceled") + tr.buffer.Delete(ctx, activeTask.TaskId()) } } diff --git a/test/plugins/metrics/runner_test.go b/test/plugins/metrics/runner_test.go index c1017c74f0..ebb6a37d1e 100644 --- a/test/plugins/metrics/runner_test.go +++ b/test/plugins/metrics/runner_test.go @@ -124,7 +124,6 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { return status.State }).Should(Equal(remoteread.TargetState_Completed)) - // log message timing is not guaranteed Eventually(func() string { status, _ = runner.GetStatus(target.Meta.Name) return status.Message @@ -184,6 +183,41 @@ var _ = 
Describe("Target Runner", Ordered, Label("unit"), func() { AssertTargetStatus(expected, status) Expect(len(writerClient.Payloads)).To(Equal(1)) }) + + It("should complete with large payload", func() { + target.Spec.Endpoint = fmt.Sprintf("http://%s/large", addr) + + err := runner.Start(target, query) + Expect(err).NotTo(HaveOccurred()) + + var status *remoteread.TargetStatus + Eventually(func() remoteread.TargetState { + status, _ = runner.GetStatus(target.Meta.Name) + return status.State + }).Should(Equal(remoteread.TargetState_Completed)) + + Eventually(func() string { + status, _ = runner.GetStatus(target.Meta.Name) + return status.Message + }).Should(Equal("completed")) + + expected := &remoteread.TargetStatus{ + Progress: &remoteread.TargetProgress{ + StartTimestamp: ×tamppb.Timestamp{}, + LastReadTimestamp: ×tamppb.Timestamp{ + Seconds: agent.TimeDeltaMillis / 2 / time.Second.Milliseconds(), + }, + EndTimestamp: ×tamppb.Timestamp{ + Seconds: agent.TimeDeltaMillis / 2 / time.Second.Milliseconds(), + }, + }, + Message: "completed", + State: remoteread.TargetState_Completed, + } + + AssertTargetStatus(expected, status) + Expect(len(writerClient.Payloads)).To(Equal(4)) + }) }) When("target is stopped during push", func() { From fa3f59444afdebd8739fc548c8e2c9edbee653e1 Mon Sep 17 00:00:00 2001 From: joshmeranda Date: Sat, 22 Jul 2023 01:11:37 -0400 Subject: [PATCH 08/11] properlly handle buffer not found err --- plugins/metrics/pkg/agent/buffer.go | 4 ++-- plugins/metrics/pkg/agent/runner.go | 26 ++++++++++---------- test/plugins/metrics/runner_test.go | 37 ----------------------------- 3 files changed, 16 insertions(+), 51 deletions(-) diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index e436959d11..82e2136da6 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -63,10 +63,10 @@ func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) err } // todo: will create a new directory for each target name which will not be cleaned up internally - filePath := path.Join(b.dir, meta.Target, uuid.New().String()) + filePath := path.Join(b.dir, name, uuid.New().String()) if err := os.MkdirAll(path.Dir(filePath), 0755); err != nil && !errors.Is(err, os.ErrExist) { - return fmt.Errorf("could not create buffer directory for target '%s': %w", meta.Target, err) + return fmt.Errorf("could not create buffer directory for target '%s': %w", name, err) } uncompressed, err := json.Marshal(meta) diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 038f8e6419..14683f1cac 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -84,7 +84,6 @@ type TargetRunMetadata struct { // todo: could probably find a better name for this // todo: replace ProgressRatio and Query with a ProgressDelta type WriteMetadata struct { - Target string Query *prompb.Query WriteChunk *prompb.WriteRequest @@ -303,7 +302,6 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { if err := tr.buffer.Add(wc, run.Target.Meta.Name, WriteMetadata{ - Target: run.Target.Meta.Name, Query: readRequest.Queries[0], WriteChunk: chunk, ProgressRatio: 1.0 / float64(len(chunks)), @@ -328,12 +326,10 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT default: // continue pushing } - var meta WriteMetadata - - meta, err = tr.buffer.Get(wc, run.Target.Meta.Name) - if 
err != nil { - if !errors.Is(err, ErrBufferNotFound) { - activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", err.Error())) + meta, getErr := tr.buffer.Get(wc, run.Target.Meta.Name) + if getErr != nil { + if !errors.Is(getErr, ErrBufferNotFound) { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", getErr.Error())) } continue } @@ -362,14 +358,16 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT } func (tr *taskRunner) OnTaskCompleted(ctx context.Context, activeTask task.ActiveTask, state task.State, _ ...any) { + if err := tr.buffer.Delete(ctx, activeTask.TaskId()); err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not delete task from buffer: %s", err.Error())) + } + switch state { case task.StateCompleted: activeTask.AddLogEntry(zapcore.InfoLevel, "completed") case task.StateFailed: - tr.buffer.Delete(ctx, activeTask.TaskId()) case task.StateCanceled: activeTask.AddLogEntry(zapcore.WarnLevel, "canceled") - tr.buffer.Delete(ctx, activeTask.TaskId()) } } @@ -502,11 +500,15 @@ func (runner *taskingTargetRunner) GetStatus(name string) (*remoteread.TargetSta state = remoteread.TargetState_Canceled } - return &remoteread.TargetStatus{ + status := &remoteread.TargetStatus{ Progress: statusProgress, Message: getMessageFromTaskLogs(taskStatus.Logs), State: state, - }, nil + } + + // fmt.Printf("=== [taskingTargetRunner.GetStatus] 000 %s ===\n", status) + + return status, nil } func (runner *taskingTargetRunner) SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient]) { diff --git a/test/plugins/metrics/runner_test.go b/test/plugins/metrics/runner_test.go index ebb6a37d1e..2a024619c1 100644 --- a/test/plugins/metrics/runner_test.go +++ b/test/plugins/metrics/runner_test.go @@ -111,43 +111,6 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { }) }) - When("editing and restarting failed import", func() { - It("should succeed", func() { - target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) - - err := runner.Start(target, query) - Expect(err).NotTo(HaveOccurred()) - - var status *remoteread.TargetStatus - Eventually(func() remoteread.TargetState { - status, _ = runner.GetStatus(target.Meta.Name) - return status.State - }).Should(Equal(remoteread.TargetState_Completed)) - - Eventually(func() string { - status, _ = runner.GetStatus(target.Meta.Name) - return status.Message - }).Should(Equal("completed")) - - expected := &remoteread.TargetStatus{ - Progress: &remoteread.TargetProgress{ - StartTimestamp: &timestamppb.Timestamp{}, - LastReadTimestamp: &timestamppb.Timestamp{ - Seconds: agent.TimeDeltaMillis / 2 / time.Second.Milliseconds(), - }, - EndTimestamp: &timestamppb.Timestamp{ - Seconds: agent.TimeDeltaMillis / 2 / time.Second.Milliseconds(), - }, - }, - Message: "completed", - State: remoteread.TargetState_Completed, - } - - AssertTargetStatus(expected, status) - Expect(len(writerClient.Payloads)).To(Equal(1)) - }) - }) - When("target runner can reach target endpoint", func() { It("should complete", func() { target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) From bac6b0265a897339a68c9f863cc28243e25e7a2e Mon Sep 17 00:00:00 2001 From: joshmeradna Date: Sat, 22 Jul 2023 02:44:05 -0400 Subject: [PATCH 09/11] reconcile chunks on pod restart --- plugins/metrics/pkg/agent/buffer.go | 40 ++++++++++++++++++++++++++--- plugins/metrics/pkg/agent/runner.go | 5 +--- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git 
a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index 82e2136da6..12cd77b41a 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -13,7 +13,7 @@ import ( "github.com/google/uuid" ) -var ErrBufferNotFound error = fmt.Errorf("buffer not found") +var ErrBufferNotFound = fmt.Errorf("buffer not found") type ChunkBuffer interface { // Add blo cks until the value can be added to the buffer. @@ -35,7 +35,6 @@ type diskBuffer struct { chunkChans map[string]chan string } -// todo: reconcile pre-existing chunks (useful for pod restarts during import) func NewDiskBuffer(dir string) (ChunkBuffer, error) { buffer := &diskBuffer{ dir: path.Join(BufferDir), @@ -46,9 +45,43 @@ func NewDiskBuffer(dir string) (ChunkBuffer, error) { return nil, fmt.Errorf("could not create buffer directory: %w", err) } + if err := buffer.reconcileExistingChunks(); err != nil { + return nil, err + } + return buffer, nil } +func (b *diskBuffer) reconcileExistingChunks() error { + entries, err := os.ReadDir(b.dir) + if err != nil { + return fmt.Errorf("could not reconcile existing chunks: %w", err) + } + + for _, e := range entries { + if !e.IsDir() { + continue + } + + chunkChan := make(chan string, 100) + + b.chanLocker.Lock() + b.chunkChans[e.Name()] = chunkChan + b.chanLocker.Unlock() + + subBufferDir := path.Join(b.dir, e.Name()) + subEntries, err := os.ReadDir(subBufferDir) + if err != nil { + return fmt.Errorf("could not reconcile existing chunks: %w", err) + } + + for _, se := range subEntries { + chunkChan <- path.Join(subBufferDir, se.Name()) + } + } + return nil +} + func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) error { b.chanLocker.RLock() chunkChan, found := b.chunkChans[name] @@ -62,7 +95,6 @@ func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) err b.chanLocker.Unlock() } - // todo: will create a new directory for each target name which will not be cleaned up internally filePath := path.Join(b.dir, name, uuid.New().String()) if err := os.MkdirAll(path.Dir(filePath), 0755); err != nil && !errors.Is(err, os.ErrExist) { @@ -123,7 +155,7 @@ func (b *diskBuffer) Get(ctx context.Context, name string) (WriteMetadata, error } } -func (b *diskBuffer) Delete(ctx context.Context, name string) error { +func (b *diskBuffer) Delete(_ context.Context, name string) error { b.chanLocker.Lock() delete(b.chunkChans, name) b.chanLocker.Unlock() diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 14683f1cac..934d2c666a 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -29,8 +29,7 @@ import ( var TimeDeltaMillis = time.Minute.Milliseconds() -// const BufferDir = "/var/lib/opni-agent/import-buffer" -const BufferDir = "import-buffer" +const BufferDir = "/var/lib/opni-agent/import-buffer" func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.LabelMatcher { pbLabelMatchers := make([]*prompb.LabelMatcher, 0, len(rrLabelMatchers)) @@ -81,8 +80,6 @@ type TargetRunMetadata struct { Query *remoteread.Query } -// todo: could probably find a better name for this -// todo: replace ProgressRatio and Query with a ProgressDelta type WriteMetadata struct { Query *prompb.Query WriteChunk *prompb.WriteRequest From 54689706d9c80db883b8e206e53dfd58e3424cff Mon Sep 17 00:00:00 2001 From: joshmeradna Date: Sat, 22 Jul 2023 05:22:30 -0400 Subject: [PATCH 10/11] fix data race with err --- plugins/metrics/pkg/agent/runner.go | 206 
+++++++++++++++------------- 1 file changed, 111 insertions(+), 95 deletions(-) diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 934d2c666a..c3fb6720e8 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -17,7 +17,6 @@ import ( "github.com/rancher/opni/pkg/storage" "github.com/rancher/opni/pkg/task" "github.com/rancher/opni/pkg/util" - "github.com/rancher/opni/pkg/util/waitctx" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" "github.com/samber/lo" @@ -205,6 +204,46 @@ func (tr *taskRunner) doPush(ctx context.Context, writeRequest *prompb.WriteRequ } } +func (tr *taskRunner) runPush(ctx context.Context, stopChan chan struct{}, activeTask task.ActiveTask, run *TargetRunMetadata) error { + progress := &corev1.Progress{ + Current: 0, + Total: uint64(run.Query.EndTimestamp.AsTime().UnixMilli() - run.Query.StartTimestamp.AsTime().UnixMilli()), + } + activeTask.SetProgress(progress) + + for progress.Current < progress.Total { + select { + case <-ctx.Done(): + return nil + case <-stopChan: + return nil + default: // continue pushing + } + + meta, getErr := tr.buffer.Get(ctx, run.Target.Meta.Name) + if getErr != nil { + if !errors.Is(getErr, ErrBufferNotFound) { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", getErr.Error())) + } + continue + } + + activeTask.AddLogEntry(zapcore.DebugLevel, "received chunk from buffer") + + if err := tr.doPush(ctx, meta.WriteChunk); err != nil { + close(stopChan) + return err + } + + progressDelta := uint64(float64(meta.Query.EndTimestampMs-meta.Query.StartTimestampMs) * meta.ProgressRatio) + + progress.Current += progressDelta + activeTask.SetProgress(progress) + } + + return nil +} + func (tr *taskRunner) doRead(ctx context.Context, reader RemoteReader, run *TargetRunMetadata, readRequest *prompb.ReadRequest) (*prompb.ReadResponse, error) { expbackoff := tr.backoffPolicy.Start(ctx) @@ -227,126 +266,103 @@ func (tr *taskRunner) doRead(ctx context.Context, reader RemoteReader, run *Targ } } -func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error { +func (tr *taskRunner) runRead(ctx context.Context, stopChan chan struct{}, activeTask task.ActiveTask, run *TargetRunMetadata) error { limit := util.DefaultWriteLimit() - wc := waitctx.FromContext(ctx) - - run := &TargetRunMetadata{} - activeTask.LoadTaskMetadata(run) - - var err error + labelMatchers := toLabelMatchers(run.Query.Matchers) - waitctx.Go(wc, func() { - labelMatchers := toLabelMatchers(run.Query.Matchers) + importEnd := run.Query.EndTimestamp.AsTime().UnixMilli() + nextStart := run.Query.StartTimestamp.AsTime().UnixMilli() + nextEnd := nextStart - importEnd := run.Query.EndTimestamp.AsTime().UnixMilli() - nextStart := run.Query.StartTimestamp.AsTime().UnixMilli() - nextEnd := nextStart + reader := NewRemoteReader(&http.Client{}) - reader := NewRemoteReader(&http.Client{}) - - for err == nil && nextStart < importEnd { - select { - case <-wc.Done(): - return - default: // continue reading - } + for nextStart < importEnd { + select { + case <-ctx.Done(): + return nil + case <-stopChan: + return nil + default: // continue reading + } - nextStart = nextEnd - nextEnd = nextStart + TimeDeltaMillis + nextStart = nextEnd + nextEnd = nextStart + TimeDeltaMillis - if nextStart >= importEnd { - break - } + if nextStart >= importEnd { + break + } - if nextEnd >= importEnd { - nextEnd = 
importEnd - } + if nextEnd >= importEnd { + nextEnd = importEnd + } - readRequest := &prompb.ReadRequest{ - Queries: []*prompb.Query{ - { - StartTimestampMs: nextStart, - EndTimestampMs: nextEnd, - Matchers: labelMatchers, - }, + readRequest := &prompb.ReadRequest{ + Queries: []*prompb.Query{ + { + StartTimestampMs: nextStart, + EndTimestampMs: nextEnd, + Matchers: labelMatchers, }, - } - - var readResponse *prompb.ReadResponse - - readResponse, err = tr.doRead(wc, reader, run, readRequest) - if err != nil { - return - } - - writeRequest := &prompb.WriteRequest{ - Timeseries: lo.Map(readResponse.Results[0].GetTimeseries(), func(t *prompb.TimeSeries, _ int) prompb.TimeSeries { - return lo.FromPtr(t) - }), - Metadata: []prompb.MetricMetadata{}, - } + }, + } - var chunks []*prompb.WriteRequest + readResponse, err := tr.doRead(ctx, reader, run, readRequest) + if err != nil { + close(stopChan) + return err + } - chunks, err = util.SplitChunksWithLimit(writeRequest, limit) - if err != nil { - return - } + writeRequest := &prompb.WriteRequest{ + Timeseries: lo.Map(readResponse.Results[0].GetTimeseries(), func(t *prompb.TimeSeries, _ int) prompb.TimeSeries { + return lo.FromPtr(t) + }), + Metadata: []prompb.MetricMetadata{}, + } - activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) + var chunks []*prompb.WriteRequest - lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { - if err := tr.buffer.Add(wc, run.Target.Meta.Name, WriteMetadata{ - Query: readRequest.Queries[0], - WriteChunk: chunk, - ProgressRatio: 1.0 / float64(len(chunks)), - }); err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not add chunk to buffer: %s", err.Error())) - } - }) + chunks, err = util.SplitChunksWithLimit(writeRequest, limit) + if err != nil { + close(stopChan) + return err } - }) - waitctx.Go(wc, func() { - progress := &corev1.Progress{ - Current: 0, - Total: uint64(run.Query.EndTimestamp.AsTime().UnixMilli() - run.Query.StartTimestamp.AsTime().UnixMilli()), - } - activeTask.SetProgress(progress) + activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) - for err == nil && progress.Current < progress.Total { - select { - case <-wc.Done(): - return - default: // continue pushing + lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { + if err := tr.buffer.Add(ctx, run.Target.Meta.Name, WriteMetadata{ + Query: readRequest.Queries[0], + WriteChunk: chunk, + ProgressRatio: 1.0 / float64(len(chunks)), + }); err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not add chunk to buffer: %s", err.Error())) } + }) + } - meta, getErr := tr.buffer.Get(wc, run.Target.Meta.Name) - if getErr != nil { - if !errors.Is(getErr, ErrBufferNotFound) { - activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", getErr.Error())) - } - continue - } + return nil +} - activeTask.AddLogEntry(zapcore.DebugLevel, "received chunk from buffer") +func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error { + run := &TargetRunMetadata{} + activeTask.LoadTaskMetadata(run) - if err = tr.doPush(wc, meta.WriteChunk); err != nil { - return - } + stopChan := make(chan struct{}) - progressDelta := uint64(float64(meta.Query.EndTimestampMs-meta.Query.StartTimestampMs) * meta.ProgressRatio) + var eg util.MultiErrGroup - progress.Current += progressDelta - activeTask.SetProgress(progress) - } + eg.Go(func() error { + return tr.runRead(ctx, 
stopChan, activeTask, run) }) - waitctx.Wait(wc) + eg.Go(func() error { + return tr.runPush(ctx, stopChan, activeTask, run) + }) - if err != nil { + eg.Wait() + + if err := eg.Error(); err != nil { activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) return err } From e65f9f583f9bd61db873b3f4c5b6d2340287d73c Mon Sep 17 00:00:00 2001 From: joshmeranda Date: Mon, 24 Jul 2023 11:35:26 -0400 Subject: [PATCH 11/11] use in-memory buffer when volume is not mounted --- plugins/metrics/pkg/agent/buffer.go | 80 +++++++++++++++++++++++++---- plugins/metrics/pkg/agent/runner.go | 31 ++++++++--- test/plugins/metrics/runner_test.go | 2 +- 3 files changed, 93 insertions(+), 20 deletions(-) diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index 12cd77b41a..9c2f82bbcb 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -17,15 +17,69 @@ var ErrBufferNotFound = fmt.Errorf("buffer not found") type ChunkBuffer interface { // Add blo cks until the value can be added to the buffer. - Add(context.Context, string, WriteMetadata) error + Add(context.Context, string, ChunkMetadata) error // Get blocks until a value can be retrieved from the buffer. - Get(context.Context, string) (WriteMetadata, error) + Get(context.Context, string) (ChunkMetadata, error) // Delete removes a buffer for the named task from the buffer. Delete(context.Context, string) error } +type memoryBuffer struct { + chanLocker sync.RWMutex + chunkChan map[string]chan ChunkMetadata +} + +func NewMemoryBuffer() ChunkBuffer { + return &memoryBuffer{ + chunkChan: make(map[string]chan ChunkMetadata), + } +} + +func (b *memoryBuffer) Add(_ context.Context, name string, meta ChunkMetadata) error { + b.chanLocker.RLock() + chunkChan, found := b.chunkChan[name] + b.chanLocker.RUnlock() + + if !found { + chunkChan = make(chan ChunkMetadata) + + b.chanLocker.Lock() + b.chunkChan[name] = chunkChan + b.chanLocker.Unlock() + } + + chunkChan <- meta + + return nil +} + +func (b *memoryBuffer) Get(ctx context.Context, name string) (ChunkMetadata, error) { + b.chanLocker.RLock() + chunkChan, found := b.chunkChan[name] + b.chanLocker.RUnlock() + + if !found { + return ChunkMetadata{}, ErrBufferNotFound + } + + select { + case <-ctx.Done(): + return ChunkMetadata{}, ctx.Err() + case meta := <-chunkChan: + return meta, nil + } +} + +func (b *memoryBuffer) Delete(_ context.Context, name string) error { + b.chanLocker.Lock() + delete(b.chunkChan, name) + b.chanLocker.Unlock() + + return nil +} + type diskBuffer struct { dir string @@ -55,6 +109,10 @@ func NewDiskBuffer(dir string) (ChunkBuffer, error) { func (b *diskBuffer) reconcileExistingChunks() error { entries, err := os.ReadDir(b.dir) if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return fmt.Errorf("could not reconcile existing chunks: %w", err) } @@ -82,7 +140,7 @@ func (b *diskBuffer) reconcileExistingChunks() error { return nil } -func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) error { +func (b *diskBuffer) Add(_ context.Context, name string, meta ChunkMetadata) error { b.chanLocker.RLock() chunkChan, found := b.chunkChans[name] b.chanLocker.RUnlock() @@ -119,36 +177,36 @@ func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) err return nil } -func (b *diskBuffer) Get(ctx context.Context, name string) (WriteMetadata, error) { +func (b *diskBuffer) Get(ctx context.Context, name string) (ChunkMetadata, error) { b.chanLocker.RLock() chunkChan, found := 
b.chunkChans[name] b.chanLocker.RUnlock() if !found { - return WriteMetadata{}, ErrBufferNotFound + return ChunkMetadata{}, ErrBufferNotFound } select { case <-ctx.Done(): - return WriteMetadata{}, ctx.Err() + return ChunkMetadata{}, ctx.Err() case path := <-chunkChan: compressed, err := os.ReadFile(path) if err != nil { - return WriteMetadata{}, fmt.Errorf("could not read chunk from buffer: %w", err) + return ChunkMetadata{}, fmt.Errorf("could not read chunk from buffer: %w", err) } uncompressed, err := snappy.Decode(nil, compressed) if err != nil { - return WriteMetadata{}, fmt.Errorf("could not decompress chunk from buffer: %w", err) + return ChunkMetadata{}, fmt.Errorf("could not decompress chunk from buffer: %w", err) } - var meta WriteMetadata + var meta ChunkMetadata if err := json.Unmarshal(uncompressed, &meta); err != nil { - return WriteMetadata{}, fmt.Errorf("could not unmarshal chunk from buffer: %w", err) + return ChunkMetadata{}, fmt.Errorf("could not unmarshal chunk from buffer: %w", err) } if err := os.Remove(path); err != nil { - return WriteMetadata{}, fmt.Errorf("could not remove chunk file from disk, data may linger on system longer than expected: %w", err) + return ChunkMetadata{}, fmt.Errorf("could not remove chunk file from disk, data may linger on system longer than expected: %w", err) } return meta, nil diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index c3fb6720e8..4d427172a0 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "os" "strings" "sync" "time" @@ -74,12 +75,17 @@ func getMessageFromTaskLogs(logs []*corev1.LogEntry) string { return "" } +func dirExists(p string) bool { + _, err := os.Stat(p) + return err == nil +} + type TargetRunMetadata struct { Target *remoteread.Target Query *remoteread.Query } -type WriteMetadata struct { +type ChunkMetadata struct { Query *prompb.Query WriteChunk *prompb.WriteRequest @@ -141,9 +147,20 @@ type taskRunner struct { } func newTaskRunner(logger *zap.SugaredLogger) (*taskRunner, error) { - buffer, err := NewDiskBuffer(BufferDir) - if err != nil { - return nil, fmt.Errorf("could not create buffer: %w", err) + logger = logger.Named("task-runner") + + var buffer ChunkBuffer + var err error + + // if buffer volume is not mounted use a simple blocking buffer + if !dirExists(BufferDir) { + logger.Infof("buffer not enabled, using in memory buffer") + buffer = NewMemoryBuffer() + } else { + buffer, err = NewDiskBuffer(BufferDir) + if err != nil { + return nil, fmt.Errorf("could not create buffer: %w", err) + } } return &taskRunner{ @@ -153,7 +170,7 @@ func newTaskRunner(logger *zap.SugaredLogger) (*taskRunner, error) { backoff.WithMaxInterval(5*time.Minute), backoff.WithMultiplier(1.1), ), - logger: logger.Named("task-runner"), + logger: logger, buffer: buffer, }, nil } @@ -331,7 +348,7 @@ func (tr *taskRunner) runRead(ctx context.Context, stopChan chan struct{}, activ activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { - if err := tr.buffer.Add(ctx, run.Target.Meta.Name, WriteMetadata{ + if err := tr.buffer.Add(ctx, run.Target.Meta.Name, ChunkMetadata{ Query: readRequest.Queries[0], WriteChunk: chunk, ProgressRatio: 1.0 / float64(len(chunks)), @@ -519,8 +536,6 @@ func (runner *taskingTargetRunner) GetStatus(name string) (*remoteread.TargetSta State: state, } - // fmt.Printf("=== 
[taskingTargetRunner.GetStatus] 000 %s ===\n", status) - return status, nil } diff --git a/test/plugins/metrics/runner_test.go b/test/plugins/metrics/runner_test.go index 2a024619c1..55e29f2c21 100644 --- a/test/plugins/metrics/runner_test.go +++ b/test/plugins/metrics/runner_test.go @@ -157,7 +157,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { Eventually(func() remoteread.TargetState { status, _ = runner.GetStatus(target.Meta.Name) return status.State - }).Should(Equal(remoteread.TargetState_Completed)) + }, "5s").Should(Equal(remoteread.TargetState_Completed)) Eventually(func() string { status, _ = runner.GetStatus(target.Meta.Name)