WIP: Parallel import #1588

Open, wants to merge 11 commits into base: main
28 changes: 28 additions & 0 deletions packages/opni-agent/opni-agent/charts/templates/deployment.yaml
@@ -59,6 +59,10 @@ spec:
mountPath: /etc/opni
- name: plugins
mountPath: /var/lib/opni-agent/plugins
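{{- /* the "import-buffer" values key contains a hyphen, so it must be looked up with index rather than dot notation */}}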
{{- if index .Values "import-buffer" "enabled" }}
- name: import-buffer
mountPath: /var/lib/opni-agent/import-buffer
{{- end }}
{{- if .Values.volumeMounts }}
{{- toYaml .Values.volumeMounts | nindent 12 }}
{{- end }}
@@ -175,6 +179,11 @@ spec:
{{- if .Values.volumes }}
{{- toYaml .Values.volumes | nindent 8 }}
{{- end }}
{{- if index .Values "import-buffer" "enabled" }}
- name: import-buffer
persistentVolumeClaim:
claimName: {{ include "opni-agent.fullname" .}}-import-buffer
{{- end }}
{{- if eq .Values.persistence.mode "pvc" }}
---
apiVersion: v1
@@ -194,4 +203,23 @@ spec:
resources:
requests:
storage: 2Gi
{{- end }}
{{- if index .Values "import-buffer" "enabled" }}
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ include "onpi-agent.fullname" .}}-import-buffer
namespace: {{ include "opni-agent.namespace" . }}
labels:
{{- include "opni-agent.labels" . | nindent 4 }}
opni.io/app: agent
spec:
accessModes:
- ReadWriteOnce
{{- if .Values.global.storageClass }}
storageClassName: {{ .Values.global.storageClass }}
{{- end }}
resources:
{{- toYaml (index .Values "import-buffer" "resources") | nindent 4 }}
{{- end }}
6 changes: 6 additions & 0 deletions packages/opni-agent/opni-agent/charts/values.yaml
@@ -102,6 +102,12 @@ kube-prometheus-stack:
alertmanager:
enabled: false # disable the default Alertmanager deployment

import-buffer:
enabled: false
resources:
requests:
storage: 10Gi

global:
cattle:
systemDefaultRegistry: ""
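For reference, a minimal override that turns the new buffer on might look like the following sketch; the file name and storage size are illustrative, not recommendations:

# my-values.yaml (hypothetical override file)
import-buffer:
  enabled: true
  resources:
    requests:
      storage: 20Gi

With this applied, the conditionals above render the extra volume, mount, and PersistentVolumeClaim.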
1 change: 0 additions & 1 deletion pkg/gateway/delegate.go
@@ -88,7 +88,6 @@ func (d *DelegateServer) Request(ctx context.Context, req *streamv1.DelegatedMes
"target", targetId,
"request", req.GetRequest().QualifiedMethodName(),
)
lg.Debug("delegating rpc request")
target, ok := d.activeAgents[targetId]
if ok {
fwdResp := &totem.RPC{}
227 changes: 227 additions & 0 deletions plugins/metrics/pkg/agent/buffer.go
@@ -0,0 +1,227 @@
package agent

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path"
"sync"

"github.com/golang/snappy"
"github.com/google/uuid"
)

var ErrBufferNotFound = errors.New("buffer not found")

type ChunkBuffer interface {
// Add blocks until the value can be added to the buffer.
Add(context.Context, string, ChunkMetadata) error

// Get blocks until a value can be retrieved from the buffer.
Get(context.Context, string) (ChunkMetadata, error)

// Delete removes the named task's buffer and any data it holds.
Delete(context.Context, string) error
}

type memoryBuffer struct {
chanLocker sync.RWMutex
chunkChan map[string]chan ChunkMetadata
}

func NewMemoryBuffer() ChunkBuffer {
return &memoryBuffer{
chunkChan: make(map[string]chan ChunkMetadata),
}
}

func (b *memoryBuffer) Add(ctx context.Context, name string, meta ChunkMetadata) error {
b.chanLocker.RLock()
chunkChan, found := b.chunkChan[name]
b.chanLocker.RUnlock()

if !found {
b.chanLocker.Lock()
// Re-check under the write lock: a concurrent Add may have created the
// channel in the meantime, and overwriting it would strand the other
// sender's channel.
if existing, ok := b.chunkChan[name]; ok {
chunkChan = existing
} else {
chunkChan = make(chan ChunkMetadata)
b.chunkChan[name] = chunkChan
}
b.chanLocker.Unlock()
}

// The send blocks until a Get is ready to receive, so honor cancellation.
select {
case <-ctx.Done():
return ctx.Err()
case chunkChan <- meta:
return nil
}
}

func (b *memoryBuffer) Get(ctx context.Context, name string) (ChunkMetadata, error) {
b.chanLocker.RLock()
chunkChan, found := b.chunkChan[name]
b.chanLocker.RUnlock()

if !found {
return ChunkMetadata{}, ErrBufferNotFound
}

select {
case <-ctx.Done():
return ChunkMetadata{}, ctx.Err()
case meta := <-chunkChan:
return meta, nil
}
}

func (b *memoryBuffer) Delete(_ context.Context, name string) error {
b.chanLocker.Lock()
delete(b.chunkChan, name)
b.chanLocker.Unlock()

return nil
}

type diskBuffer struct {
dir string

diskWriteLock sync.Mutex

chanLocker sync.RWMutex
chunkChans map[string]chan string
}

func NewDiskBuffer(dir string) (ChunkBuffer, error) {
buffer := &diskBuffer{
dir: dir,
chunkChans: make(map[string]chan string),
}

if err := os.MkdirAll(buffer.dir, 0755); err != nil {
return nil, fmt.Errorf("could not create buffer directory: %w", err)
}

if err := buffer.reconcileExistingChunks(); err != nil {
return nil, err
}

return buffer, nil
}

func (b *diskBuffer) reconcileExistingChunks() error {
entries, err := os.ReadDir(b.dir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
}

return fmt.Errorf("could not reconcile existing chunks: %w", err)
}

for _, e := range entries {
if !e.IsDir() {
continue
}

chunkChan := make(chan string, 100)

b.chanLocker.Lock()
b.chunkChans[e.Name()] = chunkChan
b.chanLocker.Unlock()

subBufferDir := path.Join(b.dir, e.Name())
subEntries, err := os.ReadDir(subBufferDir)
if err != nil {
return fmt.Errorf("could not reconcile existing chunks: %w", err)
}

for _, se := range subEntries {
chunkChan <- path.Join(subBufferDir, se.Name())
}
}
return nil
}

func (b *diskBuffer) Add(_ context.Context, name string, meta ChunkMetadata) error {
b.chanLocker.RLock()
chunkChan, found := b.chunkChans[name]
b.chanLocker.RUnlock()

if !found {
b.chanLocker.Lock()
// Re-check under the write lock so a concurrent Add for the same target
// doesn't overwrite the channel.
if existing, ok := b.chunkChans[name]; ok {
chunkChan = existing
} else {
chunkChan = make(chan string, 100)
b.chunkChans[name] = chunkChan
}
b.chanLocker.Unlock()
}

filePath := path.Join(b.dir, name, uuid.New().String())

if err := os.MkdirAll(path.Dir(filePath), 0755); err != nil && !errors.Is(err, os.ErrExist) {
return fmt.Errorf("could not create buffer directory for target '%s': %w", name, err)
}

uncompressed, err := json.Marshal(meta)
if err != nil {
return fmt.Errorf("could not marshal chunk for buffer: %w", err)
}

compressed := snappy.Encode(nil, uncompressed)

b.diskWriteLock.Lock()
writeErr := os.WriteFile(filePath, compressed, 0644)
b.diskWriteLock.Unlock()
if writeErr != nil {
return fmt.Errorf("could not write chunk to buffer: %w", writeErr)
}

chunkChan <- filePath

return nil
}

func (b *diskBuffer) Get(ctx context.Context, name string) (ChunkMetadata, error) {
b.chanLocker.RLock()
chunkChan, found := b.chunkChans[name]
b.chanLocker.RUnlock()

if !found {
return ChunkMetadata{}, ErrBufferNotFound
}

select {
case <-ctx.Done():
return ChunkMetadata{}, ctx.Err()
// Use filePath rather than path to avoid shadowing the path package.
case filePath := <-chunkChan:
compressed, err := os.ReadFile(filePath)
if err != nil {
return ChunkMetadata{}, fmt.Errorf("could not read chunk from buffer: %w", err)
}

uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
return ChunkMetadata{}, fmt.Errorf("could not decompress chunk from buffer: %w", err)
}

var meta ChunkMetadata
if err := json.Unmarshal(uncompressed, &meta); err != nil {
return ChunkMetadata{}, fmt.Errorf("could not unmarshal chunk from buffer: %w", err)
}

if err := os.Remove(filePath); err != nil {
return ChunkMetadata{}, fmt.Errorf("could not remove chunk file from disk, data may linger on system longer than expected: %w", err)
}

return meta, nil
}
}

func (b *diskBuffer) Delete(_ context.Context, name string) error {
b.chanLocker.Lock()
delete(b.chunkChans, name)
b.chanLocker.Unlock()

subBufferDir := path.Join(b.dir, name)
if err := os.RemoveAll(subBufferDir); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("could not remove buffer directory: %w", err)
}

return nil
}
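Since buffer.go is all new code, a short sketch of the intended producer/consumer flow may help review. The target name here is made up, ChunkMetadata is used as a zero value, and the directory just mirrors the chart's import-buffer mountPath:

// Illustrative only: enqueue one chunk for a hypothetical target, then drain it.
func exampleBufferUsage(ctx context.Context) error {
	buffer, err := NewDiskBuffer("/var/lib/opni-agent/import-buffer")
	if err != nil {
		return err
	}

	// Producer side: Add persists the chunk metadata to disk as
	// snappy-compressed JSON and signals the per-target channel.
	if err := buffer.Add(ctx, "my-target", ChunkMetadata{}); err != nil {
		return err
	}

	// Consumer side: Get blocks until a chunk is available (or ctx is done),
	// decodes it, and removes the backing file.
	meta, err := buffer.Get(ctx, "my-target")
	if err != nil {
		return err
	}
	_ = meta // hand off to the import worker

	// When the import task finishes, drop the per-target state and any
	// remaining files.
	return buffer.Delete(ctx, "my-target")
}

Because the disk buffer reconciles existing chunk files on construction, chunks that were buffered before an agent restart are re-queued rather than lost.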
2 changes: 0 additions & 2 deletions plugins/metrics/pkg/agent/node.go
@@ -3,7 +3,6 @@ package agent
import (
"context"
"fmt"
"net/http"
"sort"
"strings"
"sync"
@@ -68,7 +67,6 @@ func NewMetricsNode(ct health.ConditionTracker, lg *zap.SugaredLogger) *MetricsN
targetRunner: NewTargetRunner(lg),
}
mn.conditions.AddListener(mn.sendHealthUpdate)
mn.targetRunner.SetRemoteReaderClient(NewRemoteReader(&http.Client{}))

// FIXME: this is a hack, update the old sync code to use delegates instead
mn.conditions.AddListener(func(key string) {