Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ terraform.rc

### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
Expand Down
208 changes: 143 additions & 65 deletions cmd/alert_enricher/enricher/enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package enricher
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/harishhary/blink/internal/broker"
Expand All @@ -19,9 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
defaultEnrichmentTimeout = 5 * time.Second
)
const defaultEnrichmentTimeout = 5 * time.Second

var (
alertsIn = promauto.NewCounter(prometheus.CounterOpts{Namespace: "blink", Subsystem: "alert_enricher", Name: "alerts_in_total"})
Expand All @@ -34,6 +31,13 @@ var (
writeErrors = promauto.NewCounter(prometheus.CounterOpts{Namespace: "blink", Subsystem: "alert_enricher", Name: "write_errors_total"})
)

// alertState holds a decoded alert and its enrichment outcome for a batch entry.
// One alertState is created per successfully decoded message in processBatch.
type alertState struct {
// key is the key of the source message (m.Key). It is reused as the Key of the
// message written through service.writer or service.dlq.
key []byte
// alert is the alert decoded from the source message's value.
alert *alerts.Alert
// deadLetter is set to true when an enrichment this alert still needs was
// reported as removed or absent by the pool. processBatch reads it in the
// write phase to decide between the dead-letter path and the normal output.
deadLetter bool
}

// EnricherService reads alerts from Kafka, enriches them, and writes to the formatter topic.
type EnricherService struct {
svcctx.ServiceContext
Expand Down Expand Up @@ -71,73 +75,147 @@ func NewEnricherService(pool *enrichcatalog.Pool) (*EnricherService, error) {

// Name returns the fixed name of this service, "alert-enricher".
func (service *EnricherService) Name() string {
	const name = "alert-enricher"
	return name
}

// Run reads alerts from Kafka, applies enrichments declared by the alert's rule, and writes to the formatter topic.
//
// The batch loop visible below reads up to 50 messages with
// service.reader.ReadBatch, hands them to service.processBatch, and then
// commits them with service.reader.CommitMessages. It returns nil when a read
// or commit fails after ctx has been cancelled.
//
// NOTE(review): this span appears to be a rendered diff in which lines of the
// removed services.RunAlertPipeline callback (anyMissing, succeeded, enrName,
// wg.Wait) are interleaved with the added ReadBatch loop. The comments below
// describe individual statements only; confirm against the real file.
func (service *EnricherService) Run(ctx context.Context) errors.Error {
return services.RunAlertPipeline(ctx, service.Logger, service.reader, service.writer, service.dlq, 50,
services.PipelineCounters{
In: alertsIn.Inc, Out: alertsOut.Inc, DLQ: alertsDLQ.Inc,
ParseError: parseErrors.Inc, WriteError: writeErrors.Inc,
},
func(ctx context.Context, _ []byte, alert *alerts.Alert) (skip bool, deadLetter bool) {
service.Info("enriching alert %s", alert.AlertID)

applied := make(map[string]struct{}, len(alert.EnrichmentsApplied))
for _, name := range alert.EnrichmentsApplied {
applied[name] = struct{}{}
for {
msgs, err := service.reader.ReadBatch(ctx, 50)
if err != nil {
// A read error observed after ctx is done is treated as a clean shutdown.
if ctx.Err() != nil {
return nil
}
service.Error(errors.NewE(err))
// NOTE(review): the read is retried immediately with no delay. If ReadBatch
// can fail without blocking, this loop would spin; confirm or add a backoff.
continue
}

var (
anyMissing atomic.Bool
mu sync.Mutex
succeeded []string
wg sync.WaitGroup
)
for _, name := range alert.Rule.Enrichments() {
if _, done := applied[name]; done {
continue
}
wg.Add(1)
go func(enrName string) {
defer wg.Done()

// Each enrichment call is bounded by defaultEnrichmentTimeout.
cctx, cancel := context.WithTimeout(ctx, defaultEnrichmentTimeout)
defer cancel()
start := time.Now()
absent, removed, err := service.pool.Enrich(cctx, enrName, alert, "")
switch {
case removed:
anyMissing.Store(true)
service.Error(errors.NewF("enrichment %s removed - alert %s missing enrichment", enrName, alert.AlertID))
case absent:
anyMissing.Store(true)
service.Error(errors.NewF("enrichment %s not found - alert %s missing enrichment", enrName, alert.AlertID))
case err != nil:
enrichmentErrors.WithLabelValues(enrName).Inc()
service.Error(errors.NewF("enrichment %s failed: %v", enrName, err))
default:
enrichmentsApplied.WithLabelValues(enrName).Inc()
mu.Lock()
succeeded = append(succeeded, enrName)
mu.Unlock()
}
enrichmentLatency.WithLabelValues(enrName).Observe(time.Since(start).Seconds())
}(name)
service.processBatch(ctx, msgs)

// NOTE(review): the batch is committed unconditionally. processBatch returns
// nothing and only logs marshal/write failures, so a message whose write
// failed is still committed here. Confirm that dropping it is intended.
if err := service.reader.CommitMessages(ctx, msgs...); err != nil {
if ctx.Err() != nil {
return nil
}
wg.Wait()
// A commit failure while ctx is still live is logged and the loop continues.
service.Error(errors.NewE(err))
}
}
}

alert.EnrichmentsApplied = append(alert.EnrichmentsApplied, succeeded...)
// processBatch handles one batch of messages in three phases: it decodes each
// message into an alert, runs every pending enrichment once over all alerts
// that still need it (one goroutine per enrichment name), and then writes each
// alert through service.writer or service.dlq. Failures are logged and counted
// on the package-level metrics; nothing is returned to the caller.
//
// NOTE(review): this span appears to be a rendered diff. The lines that
// reference anyMissing, a bare `alert` variable, or `return false, ...` look
// like leftovers of the removed per-alert callback; confirm against the real file.
func (service *EnricherService) processBatch(ctx context.Context, msgs []broker.Message) {
// Decode all alerts.
states := make([]*alertState, 0, len(msgs))
for _, m := range msgs {
alert, err := alerts.Unmarshal(m.Value)
if err != nil {
// Undecodable messages are counted and logged, then dropped from the batch;
// they are not written to the DLQ here.
parseErrors.Inc()
service.Error(errors.NewE(err))
continue
}
alertsIn.Inc()
states = append(states, &alertState{key: m.Key, alert: alert})
}
if len(states) == 0 {
return
}

if anyMissing.Load() {
alert.Attempts++
if alert.Attempts >= services.MaxPluginAttempts || service.dlq == nil {
service.Info("alert %s passed through after %d attempts (enrichment unavailable)", alert.AlertID, alert.Attempts)
alert.EnrichmentsApplied = nil
return false, false
// Group by enrichment name: name → indices into states.
// Respect already-applied enrichments from prior DLQ retries.
byEnrichment := make(map[string][]int)
for i, s := range states {
applied := make(map[string]struct{}, len(s.alert.EnrichmentsApplied))
for _, name := range s.alert.EnrichmentsApplied {
applied[name] = struct{}{}
}
for _, name := range s.alert.Rule.Enrichments() {
if _, done := applied[name]; done {
continue
}
byEnrichment[name] = append(byEnrichment[name], i)
}
}

// Fan out: one goroutine per enrichment with all its alerts.
// mu serialises the result handling below, which writes to states and to each
// alert's EnrichmentsApplied slice.
var mu sync.Mutex
var wg sync.WaitGroup
for name, idxs := range byEnrichment {
wg.Add(1)
go func(name string, idxs []int) {
defer wg.Done()

batch := make([]*alerts.Alert, len(idxs))
for j, idx := range idxs {
batch[j] = states[idx].alert
}

// The whole batch call for this enrichment shares one defaultEnrichmentTimeout.
cctx, cancel := context.WithTimeout(ctx, defaultEnrichmentTimeout)
defer cancel()
start := time.Now()
// NOTE(review): an alert that needs several enrichments appears in several
// batches, so the same *alerts.Alert can be passed to concurrent Enrich calls.
// Confirm that Enrich does not mutate alerts in a way that races.
absent, removed, errs := service.pool.Enrich(cctx, name, batch, "")
enrichmentLatency.WithLabelValues(name).Observe(time.Since(start).Seconds())

mu.Lock()
defer mu.Unlock()
switch {
case removed:
service.Error(errors.NewF("enrichment %s removed", name))
for _, idx := range idxs {
states[idx].deadLetter = true
}
case absent:
service.Error(errors.NewF("enrichment %s not found", name))
for _, idx := range idxs {
states[idx].deadLetter = true
}
default:
for j, idx := range idxs {
// NOTE(review): errs is indexed by batch position without a length check.
// This assumes Enrich returns one entry per alert; a nil or shorter slice
// would panic inside this goroutine. Confirm the contract.
if errs[j] != nil {
// A failed enrichment is counted and logged but does not set deadLetter.
enrichmentErrors.WithLabelValues(name).Inc()
service.Error(errs[j])
} else {
enrichmentsApplied.WithLabelValues(name).Inc()
states[idx].alert.EnrichmentsApplied = append(states[idx].alert.EnrichmentsApplied, name)
}
}
return false, true
}
alert.EnrichmentsApplied = nil
return false, false
},
)
}(name, idxs)
}
wg.Wait()

// Write results.
for _, s := range states {
if s.deadLetter {
s.alert.Attempts++
// Once the attempt limit is reached, or when no DLQ is configured, the alert
// is forwarded to the normal output instead of being dead-lettered.
if s.alert.Attempts >= services.MaxPluginAttempts || service.dlq == nil {
service.Info("alert %s passed through after %d attempts (enrichment unavailable)", s.alert.AlertID, s.alert.Attempts)
s.alert.EnrichmentsApplied = nil
// fall through to write
} else {
// Dead-letter path: EnrichmentsApplied is kept on the alert so the
// grouping step above can skip those enrichments on a later pass.
payload, err := alerts.Marshal(s.alert)
if err != nil {
writeErrors.Inc()
service.Error(errors.NewE(err))
continue
}
err = service.dlq.WriteMessages(ctx, broker.Message{Key: s.key, Value: payload})
if err != nil {
writeErrors.Inc()
service.Error(errors.NewE(err))
} else {
alertsDLQ.Inc()
}
continue
}
}

// Normal output path: EnrichmentsApplied is cleared before the alert is
// marshalled and forwarded.
s.alert.EnrichmentsApplied = nil
payload, err := alerts.Marshal(s.alert)
if err != nil {
writeErrors.Inc()
service.Error(errors.NewE(err))
continue
}
err = service.writer.WriteMessages(ctx, broker.Message{Key: s.key, Value: payload})
if err != nil {
writeErrors.Inc()
service.Error(errors.NewE(err))
continue
}
alertsOut.Inc()
}
}
14 changes: 11 additions & 3 deletions cmd/alert_enricher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

"github.com/harishhary/blink/cmd/alert_enricher/enricher"
"github.com/harishhary/blink/internal/logger"
"github.com/harishhary/blink/internal/pluginmgr"
"github.com/harishhary/blink/internal/plugin"
"github.com/harishhary/blink/internal/services"
"github.com/harishhary/blink/pkg/enrichments"
enrichmentconfig "github.com/harishhary/blink/pkg/enrichments/config"
pools "github.com/harishhary/blink/internal/pools"
enrichcatalog "github.com/harishhary/blink/pkg/enrichments/pool"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -29,15 +30,21 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

pluginDir := os.Getenv("ENRICHER_PLUGIN_DIR")
cfgWatcher, err := enrichmentconfig.NewWatcher(pluginDir)
if err != nil {
log.Fatalf("enrichment config watcher: %v", err)
}

routingTable := pools.NewRoutingTable()
enricherPool := enrichcatalog.NewPool(routingTable, 0)

syncSvc, err := services.NewPluginSyncService(
"alert-enricher-sync",
"BLINK-ALERT-ENRICHER - SYNC",
"ENRICHER_PLUGIN_DIR",
func(log *logger.Logger, dir string) pluginmgr.Plugin {
return enrichments.NewManager(log, enricherPool.Sync, dir)
func(log *logger.Logger, dir string) plugin.Plugin {
return enrichments.NewManager(log, enricherPool.Sync, dir, cfgWatcher)
},
)
if err != nil {
Expand All @@ -50,6 +57,7 @@ func main() {

runner := services.New()
runner.Register(
cfgWatcher,
syncSvc,
enricherSvc,
)
Expand Down
Loading
Loading