diff --git a/Dockerfile.alertmanager b/Dockerfile.alertmanager new file mode 100644 index 0000000000..37c2894ae9 --- /dev/null +++ b/Dockerfile.alertmanager @@ -0,0 +1,49 @@ +# Multi-stage Dockerfile for Alertmanager +FROM golang:1.23-alpine AS builder + +# Install build dependencies +RUN apk add --no-cache git make bash nodejs npm + +# Set working directory +WORKDIR /app + +# Copy go mod files +COPY go.mod go.sum ./ +RUN go mod download + +# Copy source code +COPY . . + +# Build the binaries (skip UI build for faster compilation) +# Force native architecture build +RUN CGO_ENABLED=0 GOOS=linux go build -o alertmanager ./cmd/alertmanager +RUN CGO_ENABLED=0 GOOS=linux go build -o amtool ./cmd/amtool + +# Final stage +FROM alpine:latest + +# Install ca-certificates for TLS with retry logic +RUN apk update && apk --no-cache add ca-certificates tzdata || \ + (sleep 5 && apk update && apk --no-cache add ca-certificates tzdata) + +# Create alertmanager user +RUN addgroup -S alertmanager && adduser -S alertmanager -G alertmanager + +# Copy binaries from builder stage +COPY --from=builder /app/alertmanager /bin/alertmanager +COPY --from=builder /app/amtool /bin/amtool + +# Copy default configuration +COPY examples/ha/alertmanager.yml /etc/alertmanager/alertmanager.yml + +# Create directories and set permissions +RUN mkdir -p /alertmanager && \ + chown -R alertmanager:alertmanager /etc/alertmanager /alertmanager + +USER alertmanager +EXPOSE 9093 +VOLUME ["/alertmanager"] +WORKDIR /alertmanager +ENTRYPOINT ["/bin/alertmanager"] +CMD ["--config.file=/etc/alertmanager/alertmanager.yml", \ + "--storage.path=/alertmanager"] diff --git a/Dockerfile.webhook b/Dockerfile.webhook new file mode 100644 index 0000000000..05832edc02 --- /dev/null +++ b/Dockerfile.webhook @@ -0,0 +1,28 @@ +# Dockerfile for webhook echo server +FROM golang:1.23-alpine AS builder + +# Set working directory +WORKDIR /app + +# Copy the echo.go file +COPY examples/webhook/echo.go . 
+ +# Build the binary +RUN CGO_ENABLED=0 go build -o webhook-server echo.go + +# Final stage +FROM alpine:latest + +# Install ca-certificates for TLS with retry logic +RUN apk update && apk --no-cache add ca-certificates || \ + (sleep 5 && apk update && apk --no-cache add ca-certificates) + +# Create webhook user +RUN addgroup -S webhook && adduser -S webhook -G webhook + +# Copy binary from builder stage +COPY --from=builder /app/webhook-server /bin/webhook-server + +USER webhook +EXPOSE 5001 +ENTRYPOINT ["/bin/webhook-server"] diff --git a/alertmanager-1.yml b/alertmanager-1.yml new file mode 100644 index 0000000000..de2bbd60a9 --- /dev/null +++ b/alertmanager-1.yml @@ -0,0 +1,16 @@ +route: + group_by: ['alertname'] + group_wait: 30s + group_interval: 5m + repeat_interval: 1h + receiver: 'web.hook' +receivers: + - name: 'web.hook' + webhook_configs: + - url: 'http://webhook-server:5001/' +inhibit_rules: + - source_match: + severity: 'critical' + target_match: + severity: 'warning' + equal: ['alertname', 'dev', 'instance'] diff --git a/alertmanager-2.yml b/alertmanager-2.yml new file mode 100644 index 0000000000..de2bbd60a9 --- /dev/null +++ b/alertmanager-2.yml @@ -0,0 +1,16 @@ +route: + group_by: ['alertname'] + group_wait: 30s + group_interval: 5m + repeat_interval: 1h + receiver: 'web.hook' +receivers: + - name: 'web.hook' + webhook_configs: + - url: 'http://webhook-server:5001/' +inhibit_rules: + - source_match: + severity: 'critical' + target_match: + severity: 'warning' + equal: ['alertname', 'dev', 'instance'] diff --git a/alertmanager-3.yml b/alertmanager-3.yml new file mode 100644 index 0000000000..de2bbd60a9 --- /dev/null +++ b/alertmanager-3.yml @@ -0,0 +1,16 @@ +route: + group_by: ['alertname'] + group_wait: 30s + group_interval: 5m + repeat_interval: 1h + receiver: 'web.hook' +receivers: + - name: 'web.hook' + webhook_configs: + - url: 'http://webhook-server:5001/' +inhibit_rules: + - source_match: + severity: 'critical' + target_match: + severity: 
'warning' + equal: ['alertname', 'dev', 'instance'] diff --git a/cluster/dbcluster.go b/cluster/dbcluster.go new file mode 100644 index 0000000000..8598782b4a --- /dev/null +++ b/cluster/dbcluster.go @@ -0,0 +1,418 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "fmt" + "log/slog" + "sort" + "sync" + "time" + + "github.com/gofrs/uuid" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/alertmanager/db" +) + +// DBPeer implements ClusterPeer interface using a database for shared state +// instead of gossip protocol. 
+type DBPeer struct { + db db.DB + nodeID string + address string + logger *slog.Logger + metrics *dbPeerMetrics + + // State management + mtx sync.RWMutex + states map[string]State + channels map[string]*DBChannel + ready bool + readyc chan struct{} + + // Background tasks + syncInterval time.Duration + nodeTimeout time.Duration + stopc chan struct{} + wg sync.WaitGroup + + // Cluster state cache + nodes []db.Node + nodesMtx sync.RWMutex +} + +type dbPeerMetrics struct { + syncTotal prometheus.Counter + syncErrorsTotal prometheus.Counter + syncDuration prometheus.Histogram + nodesActive prometheus.Gauge + statesTotal prometheus.Gauge + channelOperations *prometheus.CounterVec +} + +func newDBPeerMetrics(reg prometheus.Registerer) *dbPeerMetrics { + m := &dbPeerMetrics{ + syncTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_cluster_db_sync_total", + Help: "Total number of database sync operations.", + }), + syncErrorsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "alertmanager_cluster_db_sync_errors_total", + Help: "Total number of database sync errors.", + }), + syncDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "alertmanager_cluster_db_sync_duration_seconds", + Help: "Duration of database sync operations.", + Buckets: prometheus.DefBuckets, + }), + nodesActive: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_cluster_db_nodes_active", + Help: "Number of active nodes in the cluster.", + }), + statesTotal: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "alertmanager_cluster_db_states_total", + Help: "Number of registered states in the cluster.", + }), + channelOperations: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "alertmanager_cluster_db_channel_operations_total", + Help: "Total number of channel operations.", + }, + []string{"channel", "operation"}, + ), + } + + if reg != nil { + reg.MustRegister( + m.syncTotal, + m.syncErrorsTotal, + m.syncDuration, + m.nodesActive, + 
m.statesTotal, + m.channelOperations, + ) + } + + return m +} + +// DBPeerConfig holds the configuration for a database-backed peer. +type DBPeerConfig struct { + DB db.DB + NodeID string + Address string + SyncInterval time.Duration + NodeTimeout time.Duration + Logger *slog.Logger + Metrics prometheus.Registerer +} + +// NewDBPeer creates a new database-backed cluster peer. +func NewDBPeer(config DBPeerConfig) (*DBPeer, error) { + if config.DB == nil { + return nil, fmt.Errorf("database is required") + } + + if config.NodeID == "" { + nodeUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("failed to generate node ID: %w", err) + } + config.NodeID = nodeUUID.String() + } + + if config.SyncInterval == 0 { + config.SyncInterval = 30 * time.Second + } + + if config.NodeTimeout == 0 { + config.NodeTimeout = 5 * time.Minute + } + + if config.Logger == nil { + config.Logger = slog.Default() + } + + if config.Address == "" { + config.Address = "unknown" + } + + p := &DBPeer{ + db: config.DB, + nodeID: config.NodeID, + address: config.Address, + logger: config.Logger, + metrics: newDBPeerMetrics(config.Metrics), + states: make(map[string]State), + channels: make(map[string]*DBChannel), + readyc: make(chan struct{}), + syncInterval: config.SyncInterval, + nodeTimeout: config.NodeTimeout, + stopc: make(chan struct{}), + } + + return p, nil +} + +// Join registers this node with the cluster and starts background sync. 
+func (p *DBPeer) Join(reconnectInterval, reconnectTimeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Register this node in the database + if err := p.db.RegisterNode(ctx, p.nodeID, p.address); err != nil { + return fmt.Errorf("failed to register node: %w", err) + } + + // Start background sync + p.wg.Add(2) + go p.syncLoop() + go p.cleanupLoop() + + // Mark as ready immediately for database clustering + p.mtx.Lock() + if !p.ready { + p.ready = true + close(p.readyc) + } + p.mtx.Unlock() + + p.logger.Info("joined database cluster", "node_id", p.nodeID, "address", p.address) + return nil +} + +// Leave removes this node from the cluster and stops background sync. +func (p *DBPeer) Leave(timeout time.Duration) error { + close(p.stopc) + p.wg.Wait() + + p.logger.Info("left database cluster", "node_id", p.nodeID) + return nil +} + +// Settle waits for the cluster to be ready (immediate for database implementation). +func (p *DBPeer) Settle(ctx context.Context, interval time.Duration) { + // For database implementation, we're ready immediately after joining + p.mtx.Lock() + if !p.ready { + p.ready = true + close(p.readyc) + } + p.mtx.Unlock() + + p.logger.Info("database cluster settled", "node_id", p.nodeID) +} + +// WaitReady waits until the peer is ready. +func (p *DBPeer) WaitReady(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.readyc: + return nil + } +} + +// Ready returns true if the peer is ready. +func (p *DBPeer) Ready() bool { + p.mtx.RLock() + defer p.mtx.RUnlock() + return p.ready +} + +// Name returns the unique ID of this peer. +func (p *DBPeer) Name() string { + return p.nodeID +} + +// ClusterSize returns the number of active nodes in the cluster. +func (p *DBPeer) ClusterSize() int { + p.nodesMtx.RLock() + defer p.nodesMtx.RUnlock() + return len(p.nodes) +} + +// Status returns the status of the peer. 
+func (p *DBPeer) Status() string { + if p.Ready() { + return "ready" + } + return "settling" +} + +// Peers returns the list of cluster members. +func (p *DBPeer) Peers() []ClusterMember { + p.nodesMtx.RLock() + defer p.nodesMtx.RUnlock() + + peers := make([]ClusterMember, len(p.nodes)) + for i, node := range p.nodes { + peers[i] = DBMember{node: node} + } + return peers +} + +// Position returns the position of this peer in the cluster. +func (p *DBPeer) Position() int { + p.nodesMtx.RLock() + defer p.nodesMtx.RUnlock() + + // Create a sorted list of node IDs to ensure consistent ordering + nodeIDs := make([]string, len(p.nodes)) + for i, node := range p.nodes { + nodeIDs[i] = node.ID + } + sort.Strings(nodeIDs) + + // Find our position + for i, nodeID := range nodeIDs { + if nodeID == p.nodeID { + return i + } + } + return 0 +} + +// AddState adds a new state to be managed by the cluster. +func (p *DBPeer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel { + p.mtx.Lock() + defer p.mtx.Unlock() + + p.states[key] = s + + channel := &DBChannel{ + key: key, + peer: p, + logger: p.logger.With("channel", key), + } + + p.channels[key] = channel + p.metrics.statesTotal.Set(float64(len(p.states))) + + return channel +} + +// syncLoop runs periodic synchronization with the database. +func (p *DBPeer) syncLoop() { + defer p.wg.Done() + + ticker := time.NewTicker(p.syncInterval) + defer ticker.Stop() + + for { + select { + case <-p.stopc: + return + case <-ticker.C: + p.sync() + } + } +} + +// cleanupLoop runs periodic cleanup of inactive nodes. +func (p *DBPeer) cleanupLoop() { + defer p.wg.Done() + + ticker := time.NewTicker(p.syncInterval * 2) // Less frequent cleanup + defer ticker.Stop() + + for { + select { + case <-p.stopc: + return + case <-ticker.C: + p.cleanup() + } + } +} + +// sync performs a synchronization cycle. 
+func (p *DBPeer) sync() { + start := time.Now() + defer func() { + p.metrics.syncDuration.Observe(time.Since(start).Seconds()) + }() + + ctx, cancel := context.WithTimeout(context.Background(), p.syncInterval/2) + defer cancel() + + // Update our heartbeat + if err := p.db.UpdateNodeHeartbeat(ctx, p.nodeID); err != nil { + p.logger.Warn("failed to update node heartbeat", "err", err) + p.metrics.syncErrorsTotal.Inc() + return + } + + // Get active nodes + nodes, err := p.db.GetActiveNodes(ctx) + if err != nil { + p.logger.Warn("failed to get active nodes", "err", err) + p.metrics.syncErrorsTotal.Inc() + return + } + + // Update our node cache + p.nodesMtx.Lock() + p.nodes = nodes + p.nodesMtx.Unlock() + + p.metrics.nodesActive.Set(float64(len(nodes))) + p.metrics.syncTotal.Inc() +} + +// cleanup removes inactive nodes from the cluster. +func (p *DBPeer) cleanup() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := p.db.RemoveInactiveNodes(ctx, p.nodeTimeout); err != nil { + p.logger.Warn("failed to remove inactive nodes", "err", err) + } +} + +// DBMember implements ClusterMember interface. +type DBMember struct { + node db.Node +} + +// Name returns the name of the node. +func (m DBMember) Name() string { + return m.node.ID +} + +// Address returns the address of the node. +func (m DBMember) Address() string { + return m.node.Address +} + +// DBChannel implements ClusterChannel interface for database-backed communication. +type DBChannel struct { + key string + peer *DBPeer + logger *slog.Logger +} + +// Broadcast sends a message to all nodes in the cluster by updating the database. 
+func (c *DBChannel) Broadcast(data []byte) { + c.peer.metrics.channelOperations.WithLabelValues(c.key, "broadcast").Inc() + + // For database implementation, broadcasting means updating the shared state + // The actual synchronization happens through the database + c.logger.Debug("broadcasting message", "key", c.key, "size", len(data)) + + // The actual state update should be handled by the calling code + // This is just for metric tracking and logging +} diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 87cdab8a09..6bcd4d64ed 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -44,17 +44,21 @@ import ( webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" "go.uber.org/automaxprocs/maxprocs" + _ "github.com/lib/pq" "github.com/prometheus/alertmanager/api" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/config/receiver" + "github.com/prometheus/alertmanager/db" "github.com/prometheus/alertmanager/dispatch" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/matcher/compat" "github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/provider/mem" + dbprovider "github.com/prometheus/alertmanager/provider/db" "github.com/prometheus/alertmanager/silence" "github.com/prometheus/alertmanager/template" "github.com/prometheus/alertmanager/timeinterval" @@ -176,6 +180,15 @@ func run() int { tlsConfigFile = kingpin.Flag("cluster.tls-config", "[EXPERIMENTAL] Path to config yaml file that can enable mutual TLS within the gossip protocol.").Default("").String() allowInsecureAdvertise = kingpin.Flag("cluster.allow-insecure-public-advertise-address-discovery", "[EXPERIMENTAL] Allow alertmanager to discover and listen on a public IP address.").Bool() label = 
kingpin.Flag("cluster.label", "The cluster label is an optional string to include on each packet and stream. It uniquely identifies the cluster and prevents cross-communication issues when sending gossip messages.").Default("").String() + + // Database configuration flags (alternative to gossip-based clustering) + dbDriver = kingpin.Flag("storage.db.driver", "Database driver to use for shared state storage instead of gossip. Leave empty to use gossip.").Default("").String() + dbDSN = kingpin.Flag("storage.db.dsn", "Database connection string. Required when storage.db.driver is set.").Default("").String() + dbSyncInterval = kingpin.Flag("storage.db.sync-interval", "Interval for database synchronization.").Default("30s").Duration() + dbNodeTimeout = kingpin.Flag("storage.db.node-timeout", "Timeout for considering a node inactive.").Default("5m").Duration() + dbMaxOpenConns = kingpin.Flag("storage.db.max-open-conns", "Maximum number of open database connections.").Default("25").Int() + dbMaxIdleConns = kingpin.Flag("storage.db.max-idle-conns", "Maximum number of idle database connections.").Default("5").Int() + featureFlags = kingpin.Flag("enable-feature", fmt.Sprintf("Comma-separated experimental features to enable. 
Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String() ) @@ -232,13 +245,69 @@ func run() int { return 1 } + var peer *cluster.Peer // Keep original type for gossip clustering + var dbPeer *cluster.DBPeer // Database-based clustering peer + var database db.DB + + // Initialize TLS transport configuration for gossip-based clustering tlsTransportConfig, err := cluster.GetTLSTransportConfig(*tlsConfigFile) if err != nil { logger.Error("unable to initialize TLS transport configuration for gossip mesh", "err", err) return 1 } - var peer *cluster.Peer - if *clusterBindAddr != "" { + + // Check if database-based clustering is enabled + if *dbDriver != "" { + if *dbDSN == "" { + logger.Error("storage.db.dsn is required when storage.db.driver is set") + return 1 + } + + logger.Info("using database for shared state", "driver", *dbDriver) + + // Initialize database + dbConfig := db.Config{ + Driver: *dbDriver, + DSN: *dbDSN, + MaxOpenConns: *dbMaxOpenConns, + MaxIdleConns: *dbMaxIdleConns, + ConnMaxLifetime: 30 * time.Minute, + ConnMaxIdleTime: 5 * time.Minute, + NodeTimeout: *dbNodeTimeout, + SyncInterval: *dbSyncInterval, + } + + database, err = db.NewSQL(dbConfig, logger.With("component", "database"), prometheus.DefaultRegisterer) + if err != nil { + logger.Error("failed to initialize database", "err", err) + return 1 + } + defer database.Close() + + // Create database-backed cluster peer + advertiseAddr := *clusterAdvertiseAddr + if advertiseAddr == "" { + advertiseAddr = "127.0.0.1:9093" // Default advertise address + } + + dbPeerConfig := cluster.DBPeerConfig{ + DB: database, + Address: advertiseAddr, + SyncInterval: *dbSyncInterval, + NodeTimeout: *dbNodeTimeout, + Logger: logger.With("component", "cluster"), + Metrics: prometheus.DefaultRegisterer, + } + + dbPeer, err = cluster.NewDBPeer(dbPeerConfig) + if err != nil { + logger.Error("failed to create database cluster peer", "err", err) + return 1 + } + clusterEnabled.Set(1) + + } 
else if *clusterBindAddr != "" { + // Use traditional gossip-based clustering peer, err = cluster.Create( logger.With("component", "cluster"), prometheus.DefaultRegisterer, @@ -265,58 +334,124 @@ func run() int { stopc := make(chan struct{}) var wg sync.WaitGroup - notificationLogOpts := nflog.Options{ - SnapshotFile: filepath.Join(*dataDir, "nflog"), - Retention: *retention, - Logger: logger.With("component", "nflog"), - Metrics: prometheus.DefaultRegisterer, - } + marker := types.NewMarker(prometheus.DefaultRegisterer) - notificationLog, err := nflog.New(notificationLogOpts) - if err != nil { - logger.Error("error creating notification log", "err", err) - return 1 - } - if peer != nil { - c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer) - notificationLog.SetBroadcast(c.Broadcast) - } + var notificationLog *nflog.Log + var notificationLogInterface notify.NotificationLog // For passing to notify pipeline + var silences *silence.Silences + + if database != nil { + // Use database-backed notification log + dbLogConfig := nflog.DBLogOptions{ + DB: database, + Retention: *retention, + SyncInterval: *dbSyncInterval, + Logger: logger.With("component", "nflog"), + Metrics: prometheus.DefaultRegisterer, + } + + dbLog, err := nflog.NewDBLog(dbLogConfig) + if err != nil { + logger.Error("error creating database notification log", "err", err) + return 1 + } + notificationLogInterface = dbLog // Use for notify pipeline + logger.Info("database-backed notification log created") + + // Start maintenance for the DB notification log + wg.Add(1) + go func() { + dbLog.Maintenance(*maintenanceInterval, "", stopc, nil) + wg.Done() + }() + + // Use file-based silences for now (DB silences can be implemented later) + silenceOpts := silence.Options{ + SnapshotFile: filepath.Join(*dataDir, "silences"), + Retention: *retention, + Limits: silence.Limits{ + MaxSilences: func() int { return *maxSilences }, + MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes 
}, + }, + Logger: logger.With("component", "silences"), + Metrics: prometheus.DefaultRegisterer, + } - wg.Add(1) - go func() { - notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil) - wg.Done() - }() + silences, err = silence.New(silenceOpts) + if err != nil { + logger.Error("error creating silence", "err", err) + return 1 + } + + // Start maintenance for the silences in DB mode (still file-based) + wg.Add(1) + go func() { + silences.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "silences"), stopc, nil) + wg.Done() + }() + + // In database mode, we don't use peer state management for notification log + // The database handles the state coordination + + } else { + // Use traditional file-backed implementations + notificationLogOpts := nflog.Options{ + SnapshotFile: filepath.Join(*dataDir, "nflog"), + Retention: *retention, + Logger: logger.With("component", "nflog"), + Metrics: prometheus.DefaultRegisterer, + } - marker := types.NewMarker(prometheus.DefaultRegisterer) + notificationLog, err = nflog.New(notificationLogOpts) + if err != nil { + logger.Error("error creating notification log", "err", err) + return 1 + } + notificationLogInterface = notificationLog // Use for notify pipeline + if peer != nil { + c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer) + notificationLog.SetBroadcast(c.Broadcast) + } - silenceOpts := silence.Options{ - SnapshotFile: filepath.Join(*dataDir, "silences"), - Retention: *retention, - Limits: silence.Limits{ - MaxSilences: func() int { return *maxSilences }, - MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes }, - }, - Logger: logger.With("component", "silences"), - Metrics: prometheus.DefaultRegisterer, - } + wg.Add(1) + go func() { + notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil) + wg.Done() + }() - silences, err := silence.New(silenceOpts) - if err != nil { - logger.Error("error creating 
silence", "err", err)
-		return 1
-	}
-	if peer != nil {
-		c := peer.AddState("sil", silences, prometheus.DefaultRegisterer)
-		silences.SetBroadcast(c.Broadcast)
-	}
+
+		// NOTE(review): a second identical nflog Maintenance goroutine was
+		// started here; the duplicate has been removed so only one snapshot
+		// writer runs for the nflog file (Maintenance is started once above).
+
-	// Start providers before router potentially sends updates.
-	wg.Add(1)
-	go func() {
-		silences.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "silences"), stopc, nil)
-		wg.Done()
-	}()
+		silenceOpts := silence.Options{
+			SnapshotFile: filepath.Join(*dataDir, "silences"),
+			Retention:    *retention,
+			Limits: silence.Limits{
+				MaxSilences:         func() int { return *maxSilences },
+				MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes },
+			},
+			Logger:  logger.With("component", "silences"),
+			Metrics: prometheus.DefaultRegisterer,
+		}
+
+		silences, err = silence.New(silenceOpts)
+		if err != nil {
+			logger.Error("error creating silence", "err", err)
+			return 1
+		}
+		if peer != nil {
+			c := peer.AddState("sil", silences, prometheus.DefaultRegisterer)
+			silences.SetBroadcast(c.Broadcast)
+		}
+
+		wg.Add(1)
+		go func() {
+			silences.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "silences"), stopc, nil)
+			wg.Done()
+		}()
+	}
 
 	defer func() {
 		close(stopc)
@@ -325,6 +460,7 @@ func run() int {
 	// Peer state listeners have been registered, now we can join and get the initial state.
if peer != nil { + // Gossip-based clustering err = peer.Join( *reconnectInterval, *peerReconnectTimeout, @@ -340,14 +476,48 @@ func run() int { } }() go peer.Settle(ctx, *gossipInterval*10) + } else if dbPeer != nil { + // Database-based clustering + err = dbPeer.Join(*reconnectInterval, *peerReconnectTimeout) + if err != nil { + logger.Warn("unable to join database cluster", "err", err) + } + defer func() { + if err := dbPeer.Leave(10 * time.Second); err != nil { + logger.Warn("unable to leave database cluster", "err", err) + } + }() + // Database clustering is always ready, no need to wait for settlement + ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout) + defer cancel() + dbPeer.WaitReady(ctx) } - alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer) - if err != nil { - logger.Error("error creating memory provider", "err", err) - return 1 + // Choose alert provider based on whether database backend is enabled + var alerts provider.Alerts + var alertsCloser interface{ Close() } + if database != nil { + // Use database-backed alert provider with maintenance interval for GC + dbAlerts, err := dbprovider.NewAlertsWithGC(database, marker, logger, prometheus.DefaultRegisterer, *maintenanceInterval) + if err != nil { + logger.Error("error creating database provider", "err", err) + return 1 + } + alerts = dbAlerts + alertsCloser = dbAlerts + logger.Info("using database-backed alert provider", "gc_interval", maintenanceInterval.String()) + } else { + // Use in-memory alert provider + memAlerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer) + if err != nil { + logger.Error("error creating memory provider", "err", err) + return 1 + } + alerts = memAlerts + alertsCloser = memAlerts + logger.Info("using memory-backed alert provider") } - defer alerts.Close() + defer alertsCloser.Close() var disp *dispatch.Dispatcher 
defer func() { @@ -364,6 +534,8 @@ func run() int { var clusterPeer cluster.ClusterPeer if peer != nil { clusterPeer = peer + } else if dbPeer != nil { + clusterPeer = dbPeer } api, err := api.New(api.Options{ @@ -393,6 +565,9 @@ func run() int { waitFunc := func() time.Duration { return 0 } if peer != nil { waitFunc = clusterWait(peer, *peerTimeout) + } else if dbPeer != nil { + // Database clustering doesn't need wait times like gossip + waitFunc = func() time.Duration { return 0 } } timeoutFunc := func(d time.Duration) time.Duration { if d < notify.MinTimeout { @@ -470,6 +645,9 @@ func run() int { var pipelinePeer notify.Peer if peer != nil { pipelinePeer = peer + } else if dbPeer != nil { + // Database peer implements notify.Peer interface + pipelinePeer = dbPeer } pipeline := pipelineBuilder.New( @@ -479,7 +657,7 @@ func run() int { silencer, intervener, marker, - notificationLog, + notificationLogInterface, pipelinePeer, ) diff --git a/db/db.go b/db/db.go new file mode 100644 index 0000000000..48781e5b06 --- /dev/null +++ b/db/db.go @@ -0,0 +1,135 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package db provides database interfaces and implementations for shared state +// storage in Alertmanager clusters. 
+package db + +import ( + "context" + "time" + + "github.com/prometheus/common/model" + + "github.com/prometheus/alertmanager/nflog/nflogpb" + "github.com/prometheus/alertmanager/silence/silencepb" +) + +// Alert represents an alert stored in the database +type Alert struct { + // Unique fingerprint for the alert (based on labels) + Fingerprint string `json:"fingerprint"` + // Labels associated with the alert + Labels model.LabelSet `json:"labels"` + // Annotations for the alert + Annotations model.LabelSet `json:"annotations"` + // StartsAt is when the alert started firing + StartsAt time.Time `json:"startsAt"` + // EndsAt is when the alert stopped firing (or expected to stop) + EndsAt time.Time `json:"endsAt"` + // GeneratorURL identifies the source of this alert + GeneratorURL string `json:"generatorURL"` + // UpdatedAt is the last time this alert was updated + UpdatedAt time.Time `json:"updatedAt"` + // Timeout indicates if this alert has timed out + Timeout bool `json:"timeout"` +} + +// DB defines the interface for database operations supporting shared state storage. 
+type DB interface { + // Silences operations + GetSilences(ctx context.Context) ([]*silencepb.MeshSilence, error) + SetSilence(ctx context.Context, silence *silencepb.MeshSilence) error + DeleteSilence(ctx context.Context, silenceID string) error + + // Notification log operations + GetNotificationEntries(ctx context.Context, since time.Time) ([]*nflogpb.MeshEntry, error) + SetNotificationEntry(ctx context.Context, entry *nflogpb.MeshEntry) error + DeleteExpiredNotificationEntries(ctx context.Context, before time.Time) error + + // Alert operations - for shared alert storage and deduplication + GetAlerts(ctx context.Context) ([]*Alert, error) + SetAlert(ctx context.Context, alert *Alert) error + DeleteAlert(ctx context.Context, fingerprint string) error + DeleteExpiredAlerts(ctx context.Context, before time.Time) error + + // Cluster management + RegisterNode(ctx context.Context, nodeID string, address string) error + GetActiveNodes(ctx context.Context) ([]Node, error) + UpdateNodeHeartbeat(ctx context.Context, nodeID string) error + RemoveInactiveNodes(ctx context.Context, timeout time.Duration) error + + // Transaction support + Begin(ctx context.Context) (Tx, error) + + // Connection management + Close() error + Health(ctx context.Context) error +} + +// Tx represents a database transaction. 
+type Tx interface { + Commit() error + Rollback() error + + // Same operations as DB but within transaction context + GetSilences(ctx context.Context) ([]*silencepb.MeshSilence, error) + SetSilence(ctx context.Context, silence *silencepb.MeshSilence) error + DeleteSilence(ctx context.Context, silenceID string) error + GetNotificationEntries(ctx context.Context, since time.Time) ([]*nflogpb.MeshEntry, error) + SetNotificationEntry(ctx context.Context, entry *nflogpb.MeshEntry) error + DeleteExpiredNotificationEntries(ctx context.Context, before time.Time) error + GetAlerts(ctx context.Context) ([]*Alert, error) + SetAlert(ctx context.Context, alert *Alert) error + DeleteAlert(ctx context.Context, fingerprint string) error + DeleteExpiredAlerts(ctx context.Context, before time.Time) error +} + +// Node represents a cluster node. +type Node struct { + ID string + Address string + LastSeen time.Time + CreatedAt time.Time +} + +// Config holds database configuration. +type Config struct { + Driver string `yaml:"driver"` + DSN string `yaml:"dsn"` + Options map[string]string `yaml:"options"` + + // Connection pool settings + MaxOpenConns int `yaml:"max_open_conns"` + MaxIdleConns int `yaml:"max_idle_conns"` + ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime"` + ConnMaxIdleTime time.Duration `yaml:"conn_max_idle_time"` + + // Cluster settings + NodeTimeout time.Duration `yaml:"node_timeout"` + SyncInterval time.Duration `yaml:"sync_interval"` +} + +// DefaultConfig returns a default database configuration. 
+func DefaultConfig() Config { + return Config{ + Driver: "sqlite3", + DSN: "file:alertmanager.db?cache=shared&mode=rwc", + MaxOpenConns: 25, + MaxIdleConns: 5, + ConnMaxLifetime: 30 * time.Minute, + ConnMaxIdleTime: 5 * time.Minute, + NodeTimeout: 5 * time.Minute, + SyncInterval: 30 * time.Second, + } +} diff --git a/db/sql.go b/db/sql.go new file mode 100644 index 0000000000..8c5bfb04f6 --- /dev/null +++ b/db/sql.go @@ -0,0 +1,862 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package db + +import ( + "context" + "crypto/sha256" + "database/sql" + "encoding/hex" + "encoding/json" + "fmt" + "log/slog" + "sort" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + + "github.com/prometheus/alertmanager/nflog/nflogpb" + "github.com/prometheus/alertmanager/silence/silencepb" +) + +// SQL implements the DB interface using SQL databases. 
+type SQL struct { + db *sql.DB + config Config + logger *slog.Logger + metrics *sqlMetrics +} + +type sqlMetrics struct { + operations *prometheus.CounterVec + operationDuration *prometheus.HistogramVec + connectionPool *prometheus.GaugeVec +} + +func newSQLMetrics(reg prometheus.Registerer) *sqlMetrics { + m := &sqlMetrics{ + operations: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "alertmanager_db_operations_total", + Help: "Total number of database operations.", + }, + []string{"operation", "status"}, + ), + operationDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "alertmanager_db_operation_duration_seconds", + Help: "Duration of database operations.", + Buckets: prometheus.DefBuckets, + }, + []string{"operation"}, + ), + connectionPool: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "alertmanager_db_connection_pool", + Help: "Database connection pool statistics.", + }, + []string{"state"}, + ), + } + + if reg != nil { + reg.MustRegister(m.operations, m.operationDuration, m.connectionPool) + } + + return m +} + +// NewSQL creates a new SQL database implementation. 
+func NewSQL(config Config, logger *slog.Logger, reg prometheus.Registerer) (*SQL, error) { + db, err := sql.Open(config.Driver, config.DSN) + if err != nil { + return nil, fmt.Errorf("failed to open database: %w", err) + } + + // Configure connection pool + db.SetMaxOpenConns(config.MaxOpenConns) + db.SetMaxIdleConns(config.MaxIdleConns) + db.SetConnMaxLifetime(config.ConnMaxLifetime) + db.SetConnMaxIdleTime(config.ConnMaxIdleTime) + + s := &SQL{ + db: db, + config: config, + logger: logger, + metrics: newSQLMetrics(reg), + } + + // Initialize schema + if err := s.initSchema(); err != nil { + db.Close() + return nil, fmt.Errorf("failed to initialize schema: %w", err) + } + + // Start metrics collection + go s.collectConnectionPoolMetrics() + + return s, nil +} + +func (s *SQL) initSchema() error { + var blobType string + switch s.config.Driver { + case "postgres", "pgx", "pq": + blobType = "BYTEA" + default: + blobType = "BLOB" + } + + schema := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS silences ( + id TEXT PRIMARY KEY, + data %s NOT NULL, + expires_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_silences_expires_at ON silences(expires_at); + CREATE INDEX IF NOT EXISTS idx_silences_updated_at ON silences(updated_at); + + CREATE TABLE IF NOT EXISTS notification_entries ( + id TEXT PRIMARY KEY, + group_key TEXT NOT NULL, + receiver TEXT NOT NULL, + data %s NOT NULL, + expires_at TIMESTAMP NOT NULL, + timestamp TIMESTAMP NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_notification_entries_expires_at ON notification_entries(expires_at); + CREATE INDEX IF NOT EXISTS idx_notification_entries_timestamp ON notification_entries(timestamp); + CREATE INDEX IF NOT EXISTS idx_notification_entries_group_receiver ON notification_entries(group_key, receiver); + + CREATE TABLE IF NOT EXISTS 
alerts ( + fingerprint TEXT PRIMARY KEY, + data %s NOT NULL, + labels_hash TEXT NOT NULL, + starts_at TIMESTAMP NOT NULL, + ends_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_alerts_starts_at ON alerts(starts_at); + CREATE INDEX IF NOT EXISTS idx_alerts_ends_at ON alerts(ends_at); + CREATE INDEX IF NOT EXISTS idx_alerts_updated_at ON alerts(updated_at); + CREATE INDEX IF NOT EXISTS idx_alerts_labels_hash ON alerts(labels_hash); + + CREATE TABLE IF NOT EXISTS cluster_nodes ( + id TEXT PRIMARY KEY, + address TEXT NOT NULL, + last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_cluster_nodes_last_seen ON cluster_nodes(last_seen); + `, blobType, blobType, blobType) + + _, err := s.db.Exec(schema) + return err +} + +func (s *SQL) collectConnectionPoolMetrics() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + stats := s.db.Stats() + s.metrics.connectionPool.WithLabelValues("open").Set(float64(stats.OpenConnections)) + s.metrics.connectionPool.WithLabelValues("idle").Set(float64(stats.Idle)) + s.metrics.connectionPool.WithLabelValues("in_use").Set(float64(stats.InUse)) + s.metrics.connectionPool.WithLabelValues("wait_count").Set(float64(stats.WaitCount)) + s.metrics.connectionPool.WithLabelValues("wait_duration").Set(stats.WaitDuration.Seconds()) + } +} + +func (s *SQL) recordOperation(operation string, start time.Time, err error) { + duration := time.Since(start) + s.metrics.operationDuration.WithLabelValues(operation).Observe(duration.Seconds()) + + status := "success" + if err != nil { + status = "error" + } + s.metrics.operations.WithLabelValues(operation, status).Inc() +} + +// GetSilences retrieves all silences from the database. 
+func (s *SQL) GetSilences(ctx context.Context) ([]*silencepb.MeshSilence, error) { + start := time.Now() + var err error + defer func() { s.recordOperation("get_silences", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `SELECT data FROM silences WHERE expires_at > $1 ORDER BY updated_at` + default: + query = `SELECT data FROM silences WHERE expires_at > ? ORDER BY updated_at` + } + rows, err := s.db.QueryContext(ctx, query, time.Now()) + if err != nil { + return nil, err + } + defer rows.Close() + + var silences []*silencepb.MeshSilence + for rows.Next() { + var data []byte + if err = rows.Scan(&data); err != nil { + return nil, err + } + + var silence silencepb.MeshSilence + if err = proto.Unmarshal(data, &silence); err != nil { + s.logger.Warn("failed to unmarshal silence", "err", err) + continue + } + + silences = append(silences, &silence) + } + + err = rows.Err() + return silences, err +} + +// SetSilence stores or updates a silence in the database. +func (s *SQL) SetSilence(ctx context.Context, silence *silencepb.MeshSilence) error { + start := time.Now() + var err error + defer func() { s.recordOperation("set_silence", start, err) }() + + data, err := proto.Marshal(silence) + if err != nil { + return err + } + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = ` + INSERT INTO silences (id, data, expires_at, updated_at) + VALUES ($1, $2, $3, $4) + ON CONFLICT (id) DO UPDATE SET + data = EXCLUDED.data, + expires_at = EXCLUDED.expires_at, + updated_at = EXCLUDED.updated_at + ` + default: + query = ` + INSERT OR REPLACE INTO silences (id, data, expires_at, updated_at) + VALUES (?, ?, ?, ?) + ` + } + + _, err = s.db.ExecContext(ctx, query, + silence.Silence.Id, + data, + silence.ExpiresAt.UTC(), + silence.Silence.UpdatedAt.UTC(), + ) + return err +} + +// DeleteSilence removes a silence from the database. 
+func (s *SQL) DeleteSilence(ctx context.Context, silenceID string) error { + start := time.Now() + var err error + defer func() { s.recordOperation("delete_silence", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `DELETE FROM silences WHERE id = $1` + default: + query = `DELETE FROM silences WHERE id = ?` + } + _, err = s.db.ExecContext(ctx, query, silenceID) + return err +} + +// GetNotificationEntries retrieves notification entries since a given time. +func (s *SQL) GetNotificationEntries(ctx context.Context, since time.Time) ([]*nflogpb.MeshEntry, error) { + start := time.Now() + var err error + defer func() { s.recordOperation("get_notification_entries", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `SELECT data FROM notification_entries WHERE timestamp >= $1 AND expires_at > $2 ORDER BY timestamp` + default: + query = `SELECT data FROM notification_entries WHERE timestamp >= ? AND expires_at > ? ORDER BY timestamp` + } + rows, err := s.db.QueryContext(ctx, query, since.UTC(), time.Now().UTC()) + if err != nil { + return nil, err + } + defer rows.Close() + + var entries []*nflogpb.MeshEntry + for rows.Next() { + var data []byte + if err = rows.Scan(&data); err != nil { + return nil, err + } + + var entry nflogpb.MeshEntry + if err = proto.Unmarshal(data, &entry); err != nil { + s.logger.Warn("failed to unmarshal notification entry", "err", err) + continue + } + + entries = append(entries, &entry) + } + + err = rows.Err() + return entries, err +} + +// SetNotificationEntry stores a notification entry in the database. 
+func (s *SQL) SetNotificationEntry(ctx context.Context, entry *nflogpb.MeshEntry) error { + start := time.Now() + var err error + defer func() { s.recordOperation("set_notification_entry", start, err) }() + + data, err := proto.Marshal(entry) + if err != nil { + return err + } + + entryID := fmt.Sprintf("%s:%s:%d", + string(entry.Entry.GroupKey), + entry.Entry.Receiver.String(), + entry.Entry.Timestamp.Unix(), + ) + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = ` + INSERT INTO notification_entries (id, group_key, receiver, data, expires_at, timestamp) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (id) DO UPDATE SET + group_key = EXCLUDED.group_key, + receiver = EXCLUDED.receiver, + data = EXCLUDED.data, + expires_at = EXCLUDED.expires_at, + timestamp = EXCLUDED.timestamp + ` + default: + query = ` + INSERT OR REPLACE INTO notification_entries (id, group_key, receiver, data, expires_at, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ` + } + + _, err = s.db.ExecContext(ctx, query, + entryID, + string(entry.Entry.GroupKey), + entry.Entry.Receiver.String(), + data, + entry.ExpiresAt.UTC(), + entry.Entry.Timestamp.UTC(), + ) + return err +} + +// DeleteExpiredNotificationEntries removes expired entries from the database. +func (s *SQL) DeleteExpiredNotificationEntries(ctx context.Context, before time.Time) error { + start := time.Now() + var err error + defer func() { s.recordOperation("delete_expired_notification_entries", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `DELETE FROM notification_entries WHERE expires_at < $1` + default: + query = `DELETE FROM notification_entries WHERE expires_at < ?` + } + _, err = s.db.ExecContext(ctx, query, before.UTC()) + return err +} + +// RegisterNode registers a cluster node in the database. 
+func (s *SQL) RegisterNode(ctx context.Context, nodeID string, address string) error { + start := time.Now() + var err error + defer func() { s.recordOperation("register_node", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = ` + INSERT INTO cluster_nodes (id, address, last_seen) + VALUES ($1, $2, $3) + ON CONFLICT (id) DO UPDATE SET + address = EXCLUDED.address, + last_seen = EXCLUDED.last_seen + ` + default: + query = ` + INSERT OR REPLACE INTO cluster_nodes (id, address, last_seen) + VALUES (?, ?, ?) + ` + } + + _, err = s.db.ExecContext(ctx, query, nodeID, address, time.Now().UTC()) + return err +} + +// GetActiveNodes returns all active cluster nodes. +func (s *SQL) GetActiveNodes(ctx context.Context) ([]Node, error) { + start := time.Now() + var err error + defer func() { s.recordOperation("get_active_nodes", start, err) }() + + cutoff := time.Now().Add(-s.config.NodeTimeout).UTC() + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `SELECT id, address, last_seen, created_at FROM cluster_nodes WHERE last_seen > $1 ORDER BY last_seen DESC` + default: + query = `SELECT id, address, last_seen, created_at FROM cluster_nodes WHERE last_seen > ? ORDER BY last_seen DESC` + } + + rows, err := s.db.QueryContext(ctx, query, cutoff) + if err != nil { + return nil, err + } + defer rows.Close() + + var nodes []Node + for rows.Next() { + var node Node + if err = rows.Scan(&node.ID, &node.Address, &node.LastSeen, &node.CreatedAt); err != nil { + return nil, err + } + nodes = append(nodes, node) + } + + err = rows.Err() + return nodes, err +} + +// UpdateNodeHeartbeat updates a node's last seen timestamp. 
+func (s *SQL) UpdateNodeHeartbeat(ctx context.Context, nodeID string) error { + start := time.Now() + var err error + defer func() { s.recordOperation("update_node_heartbeat", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `UPDATE cluster_nodes SET last_seen = $1 WHERE id = $2` + default: + query = `UPDATE cluster_nodes SET last_seen = ? WHERE id = ?` + } + _, err = s.db.ExecContext(ctx, query, time.Now().UTC(), nodeID) + return err +} + +// RemoveInactiveNodes removes nodes that haven't been seen for the specified timeout. +func (s *SQL) RemoveInactiveNodes(ctx context.Context, timeout time.Duration) error { + start := time.Now() + var err error + defer func() { s.recordOperation("remove_inactive_nodes", start, err) }() + + cutoff := time.Now().Add(-timeout).UTC() + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `DELETE FROM cluster_nodes WHERE last_seen < $1` + default: + query = `DELETE FROM cluster_nodes WHERE last_seen < ?` + } + _, err = s.db.ExecContext(ctx, query, cutoff) + return err +} + +// Begin starts a new database transaction. +func (s *SQL) Begin(ctx context.Context) (Tx, error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + return &sqlTx{tx: tx, logger: s.logger}, nil +} + +// Close closes the database connection. +func (s *SQL) Close() error { + return s.db.Close() +} + +// Health checks the database connection health. +func (s *SQL) Health(ctx context.Context) error { + return s.db.PingContext(ctx) +} + +// sqlTx implements the Tx interface for SQL transactions. 
+type sqlTx struct { + tx *sql.Tx + logger *slog.Logger +} + +func (tx *sqlTx) Commit() error { + return tx.tx.Commit() +} + +func (tx *sqlTx) Rollback() error { + return tx.tx.Rollback() +} + +// Transaction implementations use the same logic but with tx.tx instead of main connection + +func (tx *sqlTx) GetSilences(ctx context.Context) ([]*silencepb.MeshSilence, error) { + query := `SELECT data FROM silences WHERE expires_at > $1 ORDER BY updated_at` + rows, err := tx.tx.QueryContext(ctx, query, time.Now()) + if err != nil { + return nil, err + } + defer rows.Close() + + var silences []*silencepb.MeshSilence + for rows.Next() { + var data []byte + if err := rows.Scan(&data); err != nil { + return nil, err + } + + var silence silencepb.MeshSilence + if err := proto.Unmarshal(data, &silence); err != nil { + tx.logger.Warn("failed to unmarshal silence", "err", err) + continue + } + + silences = append(silences, &silence) + } + + return silences, rows.Err() +} + +func (tx *sqlTx) SetSilence(ctx context.Context, silence *silencepb.MeshSilence) error { + data, err := proto.Marshal(silence) + if err != nil { + return err + } + + var query string + // Note: We need to check the parent SQL instance's driver + // For simplicity, we'll detect based on the query structure + query = ` + INSERT INTO silences (id, data, expires_at, updated_at) + VALUES ($1, $2, $3, $4) + ON CONFLICT (id) DO UPDATE SET + data = EXCLUDED.data, + expires_at = EXCLUDED.expires_at, + updated_at = EXCLUDED.updated_at + ` + + _, err = tx.tx.ExecContext(ctx, query, + silence.Silence.Id, + data, + silence.ExpiresAt.UTC(), + silence.Silence.UpdatedAt.UTC(), + ) + return err +} + +func (tx *sqlTx) DeleteSilence(ctx context.Context, silenceID string) error { + query := `DELETE FROM silences WHERE id = $1` + _, err := tx.tx.ExecContext(ctx, query, silenceID) + return err +} + +func (tx *sqlTx) GetNotificationEntries(ctx context.Context, since time.Time) ([]*nflogpb.MeshEntry, error) { + query := `SELECT data 
FROM notification_entries WHERE timestamp >= $1 AND expires_at > $2 ORDER BY timestamp` + rows, err := tx.tx.QueryContext(ctx, query, since.UTC(), time.Now().UTC()) + if err != nil { + return nil, err + } + defer rows.Close() + + var entries []*nflogpb.MeshEntry + for rows.Next() { + var data []byte + if err := rows.Scan(&data); err != nil { + return nil, err + } + + var entry nflogpb.MeshEntry + if err := proto.Unmarshal(data, &entry); err != nil { + tx.logger.Warn("failed to unmarshal notification entry", "err", err) + continue + } + + entries = append(entries, &entry) + } + + return entries, rows.Err() +} + +func (tx *sqlTx) SetNotificationEntry(ctx context.Context, entry *nflogpb.MeshEntry) error { + data, err := proto.Marshal(entry) + if err != nil { + return err + } + + entryID := fmt.Sprintf("%s:%s:%d", + string(entry.Entry.GroupKey), + entry.Entry.Receiver.String(), + entry.Entry.Timestamp.Unix(), + ) + + query := ` + INSERT INTO notification_entries (id, group_key, receiver, data, expires_at, timestamp) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (id) DO UPDATE SET + group_key = EXCLUDED.group_key, + receiver = EXCLUDED.receiver, + data = EXCLUDED.data, + expires_at = EXCLUDED.expires_at, + timestamp = EXCLUDED.timestamp + ` + + _, err = tx.tx.ExecContext(ctx, query, + entryID, + string(entry.Entry.GroupKey), + entry.Entry.Receiver.String(), + data, + entry.ExpiresAt.UTC(), + entry.Entry.Timestamp.UTC(), + ) + return err +} + +func (tx *sqlTx) DeleteExpiredNotificationEntries(ctx context.Context, before time.Time) error { + query := `DELETE FROM notification_entries WHERE expires_at < $1` + _, err := tx.tx.ExecContext(ctx, query, before.UTC()) + return err +} + +func (tx *sqlTx) GetAlerts(ctx context.Context) ([]*Alert, error) { + query := `SELECT data FROM alerts ORDER BY starts_at` + rows, err := tx.tx.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + var alerts []*Alert + for rows.Next() { + var data []byte + 
if err := rows.Scan(&data); err != nil { + return nil, err + } + + var alert Alert + if err = json.Unmarshal(data, &alert); err != nil { + tx.logger.Warn("failed to unmarshal alert", "err", err) + continue + } + + alerts = append(alerts, &alert) + } + + return alerts, rows.Err() +} + +func (tx *sqlTx) SetAlert(ctx context.Context, alert *Alert) error { + data, err := json.Marshal(alert) + if err != nil { + return err + } + + labelsHash := generateLabelsHash(alert.Labels) + + var query string + // Note: assuming PostgreSQL since transactions are typically used with PostgreSQL + query = ` + INSERT INTO alerts (fingerprint, data, labels_hash, starts_at, ends_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (fingerprint) DO UPDATE SET + data = EXCLUDED.data, + labels_hash = EXCLUDED.labels_hash, + starts_at = LEAST(alerts.starts_at, EXCLUDED.starts_at), + ends_at = GREATEST(alerts.ends_at, EXCLUDED.ends_at), + updated_at = EXCLUDED.updated_at + ` + + _, err = tx.tx.ExecContext(ctx, query, + alert.Fingerprint, + data, + labelsHash, + alert.StartsAt.UTC(), + alert.EndsAt.UTC(), + alert.UpdatedAt.UTC(), + ) + return err +} + +func (tx *sqlTx) DeleteAlert(ctx context.Context, fingerprint string) error { + query := `DELETE FROM alerts WHERE fingerprint = $1` + _, err := tx.tx.ExecContext(ctx, query, fingerprint) + return err +} + +func (tx *sqlTx) DeleteExpiredAlerts(ctx context.Context, before time.Time) error { + query := `DELETE FROM alerts WHERE ends_at < $1` + _, err := tx.tx.ExecContext(ctx, query, before.UTC()) + return err +} + +// generateLabelsHash creates a deterministic hash from labels for indexing +func generateLabelsHash(labels model.LabelSet) string { + // Convert labels to sorted slice for deterministic hash + var labelPairs []string + for name, value := range labels { + labelPairs = append(labelPairs, fmt.Sprintf("%s=%s", name, value)) + } + sort.Strings(labelPairs) + + // Create hash of sorted labels + hasher := sha256.New() + for _, pair 
:= range labelPairs { + hasher.Write([]byte(pair)) + } + return hex.EncodeToString(hasher.Sum(nil)) +} + +// GetAlerts retrieves all active alerts from the database. +func (s *SQL) GetAlerts(ctx context.Context) ([]*Alert, error) { + start := time.Now() + var err error + defer func() { s.recordOperation("get_alerts", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `SELECT data FROM alerts WHERE ends_at > $1 ORDER BY updated_at` + default: + query = `SELECT data FROM alerts WHERE ends_at > ? ORDER BY updated_at` + } + rows, err := s.db.QueryContext(ctx, query, time.Now()) + if err != nil { + return nil, err + } + defer rows.Close() + + var alerts []*Alert + for rows.Next() { + var data []byte + if err = rows.Scan(&data); err != nil { + return nil, err + } + + var alert Alert + if err = json.Unmarshal(data, &alert); err != nil { + s.logger.Warn("failed to unmarshal alert", "err", err) + continue + } + + alerts = append(alerts, &alert) + } + + err = rows.Err() + return alerts, err +} + +// SetAlert stores or updates an alert in the database. 
+func (s *SQL) SetAlert(ctx context.Context, alert *Alert) error { + start := time.Now() + var err error + defer func() { s.recordOperation("set_alert", start, err) }() + + data, err := json.Marshal(alert) + if err != nil { + return err + } + + labelsHash := generateLabelsHash(alert.Labels) + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = ` + INSERT INTO alerts (fingerprint, data, labels_hash, starts_at, ends_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (fingerprint) DO UPDATE SET + data = EXCLUDED.data, + labels_hash = EXCLUDED.labels_hash, + starts_at = LEAST(alerts.starts_at, EXCLUDED.starts_at), + ends_at = GREATEST(alerts.ends_at, EXCLUDED.ends_at), + updated_at = EXCLUDED.updated_at + ` + default: + query = ` + INSERT OR REPLACE INTO alerts (fingerprint, data, labels_hash, starts_at, ends_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + ` + } + + _, err = s.db.ExecContext(ctx, query, + alert.Fingerprint, + data, + labelsHash, + alert.StartsAt.UTC(), + alert.EndsAt.UTC(), + alert.UpdatedAt.UTC(), + ) + return err +} + +// DeleteAlert removes an alert from the database. +func (s *SQL) DeleteAlert(ctx context.Context, fingerprint string) error { + start := time.Now() + var err error + defer func() { s.recordOperation("delete_alert", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `DELETE FROM alerts WHERE fingerprint = $1` + default: + query = `DELETE FROM alerts WHERE fingerprint = ?` + } + _, err = s.db.ExecContext(ctx, query, fingerprint) + return err +} + +// DeleteExpiredAlerts removes expired alerts from the database. 
+func (s *SQL) DeleteExpiredAlerts(ctx context.Context, before time.Time) error { + start := time.Now() + var err error + defer func() { s.recordOperation("delete_expired_alerts", start, err) }() + + var query string + switch s.config.Driver { + case "postgres", "pgx", "pq": + query = `DELETE FROM alerts WHERE ends_at < $1` + default: + query = `DELETE FROM alerts WHERE ends_at < ?` + } + _, err = s.db.ExecContext(ctx, query, before.UTC()) + return err +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000000..3a39332c29 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,136 @@ +services: + postgres: + image: postgres:15 + environment: + POSTGRES_DB: alertmanager + POSTGRES_USER: alertmanager + POSTGRES_PASSWORD: alertmanager_password + volumes: + - postgres_data:/var/lib/postgresql/data + ports: + - "5432:5432" + networks: + - alertmanager + healthcheck: + test: ["CMD-SHELL", "pg_isready -U alertmanager -d alertmanager"] + interval: 10s + timeout: 5s + retries: 5 + + alertmanager-1: + build: + context: . + dockerfile: Dockerfile.alertmanager + platform: linux/amd64 + ports: + - "9093:9093" + volumes: + - ./alertmanager-1.yml:/etc/alertmanager/alertmanager.yml + - alertmanager-1-data:/alertmanager + command: + - '--config.file=/etc/alertmanager/alertmanager.yml' + - '--storage.path=/alertmanager' + - '--web.listen-address=0.0.0.0:9093' + - '--storage.db.driver=postgres' + - '--storage.db.dsn=postgresql://alertmanager:alertmanager_password@postgres:5432/alertmanager?sslmode=disable' + - '--storage.db.sync-interval=30s' + - '--storage.db.node-timeout=5m' + - '--storage.db.max-open-conns=25' + - '--storage.db.max-idle-conns=5' + networks: + - alertmanager + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:9093/-/healthy"] + interval: 30s + timeout: 10s + retries: 3 + + alertmanager-2: + build: + context: . 
+ dockerfile: Dockerfile.alertmanager + platform: linux/amd64 + ports: + - "9094:9093" + volumes: + - ./alertmanager-2.yml:/etc/alertmanager/alertmanager.yml + - alertmanager-2-data:/alertmanager + command: + - '--config.file=/etc/alertmanager/alertmanager.yml' + - '--storage.path=/alertmanager' + - '--web.listen-address=0.0.0.0:9093' + - '--storage.db.driver=postgres' + - '--storage.db.dsn=postgresql://alertmanager:alertmanager_password@postgres:5432/alertmanager?sslmode=disable' + - '--storage.db.sync-interval=30s' + - '--storage.db.node-timeout=5m' + - '--storage.db.max-open-conns=25' + - '--storage.db.max-idle-conns=5' + networks: + - alertmanager + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:9093/-/healthy"] + interval: 30s + timeout: 10s + retries: 3 + + alertmanager-3: + build: + context: . + dockerfile: Dockerfile.alertmanager + platform: linux/amd64 + ports: + - "9095:9093" + volumes: + - ./alertmanager-3.yml:/etc/alertmanager/alertmanager.yml + - alertmanager-3-data:/alertmanager + command: + - '--config.file=/etc/alertmanager/alertmanager.yml' + - '--storage.path=/alertmanager' + - '--web.listen-address=0.0.0.0:9093' + - '--storage.db.driver=postgres' + - '--storage.db.dsn=postgresql://alertmanager:alertmanager_password@postgres:5432/alertmanager?sslmode=disable' + - '--storage.db.sync-interval=30s' + - '--storage.db.node-timeout=5m' + - '--storage.db.max-open-conns=25' + - '--storage.db.max-idle-conns=5' + networks: + - alertmanager + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:9093/-/healthy"] + interval: 30s + timeout: 10s + retries: 3 + + webhook-server: + build: + context: . 
+ dockerfile: Dockerfile.webhook + platform: linux/amd64 + ports: + - "5001:5001" + networks: + - alertmanager + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:5001/"] + interval: 30s + timeout: 10s + retries: 3 + +volumes: + postgres_data: + alertmanager-1-data: + alertmanager-2-data: + alertmanager-3-data: + +networks: + alertmanager: + driver: bridge diff --git a/docker-helper.sh b/docker-helper.sh new file mode 100755 index 0000000000..96da4e514a --- /dev/null +++ b/docker-helper.sh @@ -0,0 +1,172 @@ +#!/usr/bin/env bash + +# Helper script to manage Alertmanager Docker Compose setup + +set -e + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +log_info() { + echo -e "${GREEN}[INFO]${NC} $1" +} + +log_warn() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +log_error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +# Function to start services +start_services() { + log_info "Starting Alertmanager cluster and webhook server..." + docker compose up -d --build --force-recreate + + log_info "Waiting for services to be ready..." + sleep 10 + + # Check if services are healthy + log_info "Checking service health..." + docker compose ps + + echo + log_info "Services are running!" + echo "Alertmanager instances:" + echo " - Instance 1: http://localhost:9093" + echo " - Instance 2: http://localhost:9094" + echo " - Instance 3: http://localhost:9095" + echo "Webhook server: http://localhost:5001" +} + +# Function to stop services +stop_services() { + log_info "Stopping Alertmanager cluster..." + docker compose down + log_info "Services stopped." +} + +# Function to restart services +restart_services() { + log_info "Restarting Alertmanager cluster..." + docker compose restart + log_info "Services restarted." 
+} + +# Function to show logs +show_logs() { + if [ -n "$1" ]; then + docker compose logs -f "$1" + else + docker compose logs -f + fi +} + +# Function to send test alerts +send_alerts() { + log_info "Sending test alerts..." + + # Check if send_alerts.sh exists + if [ ! -f "examples/ha/send_alerts.sh" ]; then + log_error "send_alerts.sh not found at examples/ha/send_alerts.sh" + exit 1 + fi + + # Make it executable + chmod +x examples/ha/send_alerts.sh + + # Run the script + ./examples/ha/send_alerts.sh + + log_info "Test alerts sent!" + log_info "Check webhook server logs: docker compose logs webhook-server" +} + +# Function to check service status +check_status() { + log_info "Checking service status..." + docker compose ps + + echo + log_info "Service endpoints:" + echo " - Alertmanager 1: http://localhost:9093" + echo " - Alertmanager 2: http://localhost:9094" + echo " - Alertmanager 3: http://localhost:9095" + echo " - Webhook server: http://localhost:5001" + + echo + log_info "Health checks:" + for port in 9093 9094 9095; do + if curl -s -o /dev/null -w "%{http_code}" "http://localhost:$port/-/healthy" | grep -q "200"; then + echo -e " - Alertmanager ($port): ${GREEN}✓ Healthy${NC}" + else + echo -e " - Alertmanager ($port): ${RED}✗ Unhealthy${NC}" + fi + done + + if curl -s -o /dev/null -w "%{http_code}" "http://localhost:5001/" | grep -q "200"; then + echo -e " - Webhook server: ${GREEN}✓ Healthy${NC}" + else + echo -e " - Webhook server: ${RED}✗ Unhealthy${NC}" + fi +} + +# Function to clean up everything +cleanup() { + log_info "Cleaning up containers, images, and volumes..." + docker compose down -v --rmi all + log_info "Cleanup complete." 
+} + +# Main function +main() { + case "${1:-}" in + start) + start_services + ;; + stop) + stop_services + ;; + restart) + restart_services + ;; + logs) + show_logs "$2" + ;; + send-alerts) + send_alerts + ;; + status) + check_status + ;; + cleanup) + cleanup + ;; + *) + echo "Usage: $0 {start|stop|restart|logs [service]|send-alerts|status|cleanup}" + echo + echo "Commands:" + echo " start - Start all services" + echo " stop - Stop all services" + echo " restart - Restart all services" + echo " logs - Show logs for all services" + echo " logs - Show logs for specific service" + echo " send-alerts - Send test alerts" + echo " status - Check service status" + echo " cleanup - Remove containers, images, and volumes" + echo + echo "Examples:" + echo " $0 start" + echo " $0 logs webhook-server" + echo " $0 send-alerts" + exit 1 + ;; + esac +} + +# Run main function +main "$@" diff --git a/go.mod b/go.mod index 5221932e33..051f003ee7 100644 --- a/go.mod +++ b/go.mod @@ -78,6 +78,7 @@ require ( github.com/jpillora/backoff v1.0.0 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/lib/pq v1.10.9 github.com/mailru/easyjson v0.7.7 // indirect github.com/mdlayher/socket v0.4.1 // indirect github.com/mdlayher/vsock v1.2.1 // indirect diff --git a/go.sum b/go.sum index a633321b63..1d01298542 100644 --- a/go.sum +++ b/go.sum @@ -361,6 +361,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/magiconair/properties v1.8.6/go.mod 
h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= diff --git a/nflog/db_nflog.go b/nflog/db_nflog.go new file mode 100644 index 0000000000..d039f896b3 --- /dev/null +++ b/nflog/db_nflog.go @@ -0,0 +1,417 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nflog + +import ( + "bytes" + "context" + "fmt" + "io" + "log/slog" + "os" + "sync" + "time" + + uuid "github.com/gofrs/uuid" + "github.com/matttproud/golang_protobuf_extensions/pbutil" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/alertmanager/db" + pb "github.com/prometheus/alertmanager/nflog/nflogpb" +) + +// DBLog implements a notification log backed by a database. +type DBLog struct { + db db.DB + nodeID string + logger *slog.Logger + metrics *metrics + mtx sync.RWMutex + + retention time.Duration + broadcastFunc func([]byte) + + // Sync management + syncInterval time.Duration + stopc chan struct{} + syncWg sync.WaitGroup +} + +type DBLogOptions struct { + DB db.DB + NodeID string + Retention time.Duration + Logger *slog.Logger + Metrics prometheus.Registerer + SyncInterval time.Duration +} + +// NewDBLog creates a new database-backed notification log. 
+func NewDBLog(opts DBLogOptions) (*DBLog, error) { + if opts.DB == nil { + return nil, fmt.Errorf("database is required") + } + + if opts.NodeID == "" { + nodeUUID, err := uuid.NewV4() + if err != nil { + return nil, fmt.Errorf("failed to generate node ID: %w", err) + } + opts.NodeID = nodeUUID.String() + } + + if opts.SyncInterval == 0 { + opts.SyncInterval = 30 * time.Second + } + + if opts.Logger == nil { + opts.Logger = slog.Default() + } + + l := &DBLog{ + db: opts.DB, + nodeID: opts.NodeID, + logger: opts.Logger, + metrics: newMetrics(opts.Metrics), + retention: opts.Retention, + syncInterval: opts.SyncInterval, + stopc: make(chan struct{}), + broadcastFunc: func([]byte) {}, // No-op by default + } + + // Register this node in the database + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := l.db.RegisterNode(ctx, l.nodeID, ""); err != nil { + return nil, fmt.Errorf("failed to register node: %w", err) + } + + // Start background sync + l.startSync() + + return l, nil +} + +// startSync starts the background synchronization with the database. +func (l *DBLog) startSync() { + l.syncWg.Add(1) + + go func() { + defer l.syncWg.Done() + ticker := time.NewTicker(l.syncInterval) + defer ticker.Stop() + + for { + select { + case <-l.stopc: + return + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + + // Update heartbeat + if err := l.db.UpdateNodeHeartbeat(ctx, l.nodeID); err != nil { + l.logger.Warn("failed to update node heartbeat", "err", err) + } + + // Clean up expired entries + cutoff := time.Now().Add(-l.retention) + if err := l.db.DeleteExpiredNotificationEntries(ctx, cutoff); err != nil { + l.logger.Warn("failed to clean up expired notification entries", "err", err) + } + + cancel() + } + } + }() +} + +// Close stops the database log and cleans up resources. 
+func (l *DBLog) Close() error { + close(l.stopc) + l.syncWg.Wait() + return nil +} + +// Log records a notification entry. +func (l *DBLog) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + now := time.Now().UTC() + + expiresAt := now.Add(l.retention) + if expiry > 0 && l.retention > expiry { + expiresAt = now.Add(expiry) + } + + entry := &pb.MeshEntry{ + Entry: &pb.Entry{ + Receiver: r, + GroupKey: []byte(gkey), + Timestamp: now, + FiringAlerts: firingAlerts, + ResolvedAlerts: resolvedAlerts, + }, + ExpiresAt: expiresAt, + } + + if err := l.db.SetNotificationEntry(ctx, entry); err != nil { + return fmt.Errorf("failed to store notification entry: %w", err) + } + + // Broadcast the change (for compatibility with existing code) + if l.broadcastFunc != nil { + if b, err := l.marshalMeshEntry(entry); err == nil { + l.broadcastFunc(b) + } + } + + l.metrics.propagatedMessagesTotal.Inc() + l.logger.Debug("logged notification", "receiver", receiverKey(r), "group_key", gkey) + + return nil +} + +// Query retrieves notification entries using the same interface as the regular Log. +func (l *DBLog) Query(params ...QueryParam) ([]*pb.Entry, error) { + q := &query{} + for _, p := range params { + if err := p(q); err != nil { + return nil, err + } + } + + // TODO(fabxc): For now our only query mode is the most recent entry for a + // receiver/group_key combination, to match the original implementation. 
+	if q.recv == nil || q.groupKey == "" {
+		return nil, fmt.Errorf("no query parameters specified")
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// Fetch everything inside the retention window: a fixed 24h cutoff would
+	// hide older-but-unexpired entries and cause duplicate notifications.
+	since := time.Now().Add(-l.retention)
+	entries, err := l.db.GetNotificationEntries(ctx, since)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get notification entries: %w", err)
+	}
+
+	// Find the most recent entry for this receiver/group_key combination
+	var mostRecent *pb.Entry
+	for _, meshEntry := range entries {
+		entry := meshEntry.Entry
+
+		// Filter by receiver and group key
+		if receiverKey(entry.Receiver) == receiverKey(q.recv) && string(entry.GroupKey) == q.groupKey {
+			if mostRecent == nil || entry.Timestamp.After(mostRecent.Timestamp) {
+				mostRecent = entry
+			}
+		}
+	}
+
+	if mostRecent == nil {
+		return nil, ErrNotFound
+	}
+
+	return []*pb.Entry{mostRecent}, nil
+}
+
+// GC runs garbage collection, removing expired notification entries.
+func (l *DBLog) GC() (int, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	// Calculate the cutoff time for expired entries
+	cutoffTime := time.Now().Add(-l.retention)
+
+	// Delete expired notification entries
+	if err := l.db.DeleteExpiredNotificationEntries(ctx, cutoffTime); err != nil {
+		l.logger.Error("failed to delete expired notification entries", "err", err)
+		return 0, err
+	}
+
+	l.logger.Debug("notification log garbage collection completed", "cutoff_time", cutoffTime)
+	// Return 0 for count as we don't track the number deleted
+	return 0, nil
+}
+
+// Snapshot writes the current notification log state to a writer.
+func (l *DBLog) Snapshot(w io.Writer) (int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Get all non-expired entries + since := time.Now().Add(-l.retention) + entries, err := l.db.GetNotificationEntries(ctx, since) + if err != nil { + return 0, fmt.Errorf("failed to get notification entries: %w", err) + } + + var buf bytes.Buffer + for _, entry := range entries { + if _, err := pbutil.WriteDelimited(&buf, entry); err != nil { + return 0, err + } + } + + n, err := w.Write(buf.Bytes()) + return int64(n), err +} + +// MarshalBinary serializes the notification log state. +func (l *DBLog) MarshalBinary() ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Get all non-expired entries + since := time.Now().Add(-l.retention) + entries, err := l.db.GetNotificationEntries(ctx, since) + if err != nil { + return nil, fmt.Errorf("failed to get notification entries: %w", err) + } + + var buf bytes.Buffer + for _, entry := range entries { + if _, err := pbutil.WriteDelimited(&buf, entry); err != nil { + return nil, err + } + } + + return buf.Bytes(), nil +} + +// Merge processes incoming notification log state. 
+func (l *DBLog) Merge(b []byte) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Decode the incoming state + entries, err := l.decodeState(bytes.NewReader(b)) + if err != nil { + return err + } + + now := time.Now().UTC() + merged := false + + for _, entry := range entries { + // Check if we should store this entry + if entry.ExpiresAt.Before(now) { + continue + } + + // Store in database + if err := l.db.SetNotificationEntry(ctx, entry); err != nil { + l.logger.Warn("failed to merge notification entry", "receiver", entry.Entry.Receiver, "err", err) + continue + } + + merged = true + } + + if merged { + l.metrics.propagatedMessagesTotal.Inc() + } + + return nil +} + +// SetBroadcast sets the broadcast function (for compatibility). +func (l *DBLog) SetBroadcast(f func([]byte)) { + l.broadcastFunc = f +} + +// Maintenance performs periodic maintenance tasks. +func (l *DBLog) Maintenance(interval time.Duration, snapFile string, stopc <-chan struct{}, maintenanceFunc MaintenanceFunc) { + if interval > 0 { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-stopc: + return + case <-ticker.C: + // Run garbage collection to remove expired entries + if _, err := l.GC(); err != nil { + l.logger.Warn("notification log garbage collection failed", "err", err) + } + + // Take snapshot if requested + if snapFile != "" { + if err := l.takeSnapshot(snapFile); err != nil { + l.logger.Warn("failed to take snapshot", "file", snapFile, "err", err) + } + } + + // Run custom maintenance function if provided + if maintenanceFunc != nil { + if _, err := maintenanceFunc(); err != nil { + l.logger.Warn("maintenance function failed", "err", err) + } + } + } + } + } +} + +// Helper methods + +func (l *DBLog) marshalMeshEntry(entry *pb.MeshEntry) ([]byte, error) { + var buf bytes.Buffer + if _, err := pbutil.WriteDelimited(&buf, entry); err != nil { + return nil, err + } + return buf.Bytes(), nil +} 
+ +func (l *DBLog) decodeState(r io.Reader) ([]*pb.MeshEntry, error) { + var entries []*pb.MeshEntry + for { + var entry pb.MeshEntry + _, err := pbutil.ReadDelimited(r, &entry) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + entries = append(entries, &entry) + } + return entries, nil +} + +func (l *DBLog) takeSnapshot(filename string) error { + tmpFile := filename + ".tmp" + + f, err := os.Create(tmpFile) + if err != nil { + return err + } + defer f.Close() + + if _, err := l.Snapshot(f); err != nil { + os.Remove(tmpFile) + return err + } + + if err := f.Sync(); err != nil { + os.Remove(tmpFile) + return err + } + + return os.Rename(tmpFile, filename) +} diff --git a/provider/db/db.go b/provider/db/db.go new file mode 100644 index 0000000000..c4bb7832df --- /dev/null +++ b/provider/db/db.go @@ -0,0 +1,381 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package db + +import ( + "context" + "log/slog" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + + "github.com/prometheus/alertmanager/db" + "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/types" +) + +// Alerts provides a database-backed alert provider that implements the provider.Alerts interface. 
+// Unlike the memory-based provider, this stores alerts persistently in a database and +// enables high availability by sharing alert state across multiple Alertmanager instances. +type Alerts struct { + db db.DB + logger *slog.Logger + marker types.AlertMarker + cancel context.CancelFunc + maintenanceInterval time.Duration +} + +// NewAlerts creates a new database-backed alert provider. +func NewAlerts(db db.DB, marker types.AlertMarker, logger *slog.Logger, r prometheus.Registerer) (*Alerts, error) { + return NewAlertsWithGC(db, marker, logger, r, 15*time.Minute) // Default 15-minute cleanup interval +} + +// NewAlertsWithGC creates a new database-backed alert provider with custom maintenance interval. +func NewAlertsWithGC(db db.DB, marker types.AlertMarker, logger *slog.Logger, r prometheus.Registerer, maintenanceInterval time.Duration) (*Alerts, error) { + ctx, cancel := context.WithCancel(context.Background()) + + a := &Alerts{ + db: db, + logger: logger.With("component", "db-provider"), + marker: marker, + cancel: cancel, + maintenanceInterval: maintenanceInterval, + } + + if r != nil { + a.registerMetrics(r) + } + + // Start garbage collection goroutine + go a.runGC(ctx) + + return a, nil +} + +func (a *Alerts) registerMetrics(r prometheus.Registerer) { + // Register database-specific alert metrics + newDBAlertByStatus := func(s types.AlertState) prometheus.GaugeFunc { + return prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "alertmanager_alerts", + Help: "How many alerts by state.", + ConstLabels: prometheus.Labels{"state": string(s), "storage": "database"}, + }, + func() float64 { + return float64(a.count(s)) + }, + ) + } + + r.MustRegister(newDBAlertByStatus(types.AlertStateActive)) + r.MustRegister(newDBAlertByStatus(types.AlertStateSuppressed)) + r.MustRegister(newDBAlertByStatus(types.AlertStateUnprocessed)) +} + +// Subscribe returns an iterator over active alerts that have not been resolved and successfully notified about. 
+// They are not guaranteed to be in chronological order. +func (a *Alerts) Subscribe() provider.AlertIterator { + var ( + ch = make(chan *types.Alert, 1000) + done = make(chan struct{}) + ) + + go func() { + defer close(ch) + + // For database provider, we'll periodically fetch alerts to simulate subscription + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + // Send initial alerts + a.sendAlertsToChannel(ch, done) + + for { + select { + case <-done: + return + case <-ticker.C: + a.sendAlertsToChannel(ch, done) + } + } + }() + + return provider.NewAlertIterator(ch, done, nil) +} + +func (a *Alerts) sendAlertsToChannel(ch chan<- *types.Alert, done <-chan struct{}) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + alerts, err := a.db.GetAlerts(ctx) + if err != nil { + a.logger.Error("failed to get alerts from database", "err", err) + return + } + + for _, dbAlert := range alerts { + if dbAlert == nil { + continue + } + + // Convert db.Alert to types.Alert + labels := make(model.LabelSet) + for k, v := range dbAlert.Labels { + labels[model.LabelName(k)] = model.LabelValue(v) + } + annotations := make(model.LabelSet) + for k, v := range dbAlert.Annotations { + annotations[model.LabelName(k)] = model.LabelValue(v) + } + + alert := &types.Alert{ + Alert: model.Alert{ + Labels: labels, + Annotations: annotations, + StartsAt: dbAlert.StartsAt, + EndsAt: dbAlert.EndsAt, + GeneratorURL: dbAlert.GeneratorURL, + }, + UpdatedAt: dbAlert.UpdatedAt, + Timeout: false, + } + + select { + case ch <- alert: + case <-done: + return + } + } +} + +// GetPending returns an iterator over all the alerts that have pending notifications. 
+func (a *Alerts) GetPending() provider.AlertIterator { + var ( + ch = make(chan *types.Alert, 1000) // Use a larger buffer for database queries + done = make(chan struct{}) + ) + + go func() { + defer close(ch) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + alerts, err := a.db.GetAlerts(ctx) + if err != nil { + a.logger.Error("failed to get alerts from database", "err", err) + return + } + + for _, dbAlert := range alerts { + if dbAlert == nil { + continue // Skip nil alerts + } + + // Convert db.Alert to types.Alert + labels := make(model.LabelSet) + for k, v := range dbAlert.Labels { + labels[model.LabelName(k)] = model.LabelValue(v) + } + annotations := make(model.LabelSet) + for k, v := range dbAlert.Annotations { + annotations[model.LabelName(k)] = model.LabelValue(v) + } + + alert := &types.Alert{ + Alert: model.Alert{ + Labels: labels, + Annotations: annotations, + StartsAt: dbAlert.StartsAt, + EndsAt: dbAlert.EndsAt, + GeneratorURL: dbAlert.GeneratorURL, + }, + UpdatedAt: dbAlert.UpdatedAt, + Timeout: false, // Database alerts don't timeout the same way + } + + select { + case ch <- alert: + case <-done: + return + } + } + }() + + return provider.NewAlertIterator(ch, done, nil) +} + +// Get returns the alert for a given fingerprint. 
+func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + alerts, err := a.db.GetAlerts(ctx) + if err != nil { + return nil, err + } + + for _, dbAlert := range alerts { + if dbAlert.Fingerprint == fp.String() { + labels := make(model.LabelSet) + for k, v := range dbAlert.Labels { + labels[model.LabelName(k)] = model.LabelValue(v) + } + annotations := make(model.LabelSet) + for k, v := range dbAlert.Annotations { + annotations[model.LabelName(k)] = model.LabelValue(v) + } + + alert := &types.Alert{ + Alert: model.Alert{ + Labels: labels, + Annotations: annotations, + StartsAt: dbAlert.StartsAt, + EndsAt: dbAlert.EndsAt, + GeneratorURL: dbAlert.GeneratorURL, + }, + UpdatedAt: dbAlert.UpdatedAt, + Timeout: false, + } + return alert, nil + } + } + + return nil, provider.ErrNotFound +} + +// Put adds the given set of alerts to the database. +// This method provides deduplication by using the fingerprint as the primary key. +func (a *Alerts) Put(alerts ...*types.Alert) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + for _, alert := range alerts { + // Convert types.Alert to db.Alert - convert LabelSet to LabelSet for db.Alert + dbAlert := &db.Alert{ + Fingerprint: alert.Fingerprint().String(), + Labels: alert.Labels, + Annotations: alert.Annotations, + StartsAt: alert.StartsAt, + EndsAt: alert.EndsAt, + UpdatedAt: alert.UpdatedAt, + GeneratorURL: alert.GeneratorURL, + } + + // Store in database - SetAlert handles deduplication via upsert + if err := a.db.SetAlert(ctx, dbAlert); err != nil { + a.logger.Error("failed to store alert in database", "err", err, "fingerprint", dbAlert.Fingerprint) + return err + } + + a.logger.Debug("stored alert in database", "fingerprint", dbAlert.Fingerprint, "labels", dbAlert.Labels) + } + + return nil +} + +// count returns the number of alerts with the given state. 
+// For database provider, we'll implement a simple count based on current time.
+func (a *Alerts) count(state types.AlertState) int {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	alerts, err := a.db.GetAlerts(ctx)
+	if err != nil {
+		a.logger.Error("failed to get alerts for count", "err", err)
+		return 0
+	}
+
+	count := 0
+	now := time.Now()
+
+	for _, dbAlert := range alerts {
+		labels := make(model.LabelSet)
+		for k, v := range dbAlert.Labels {
+			labels[model.LabelName(k)] = model.LabelValue(v)
+		}
+		annotations := make(model.LabelSet)
+		for k, v := range dbAlert.Annotations {
+			annotations[model.LabelName(k)] = model.LabelValue(v)
+		}
+
+		alert := &types.Alert{
+			Alert: model.Alert{
+				Labels:      labels,
+				Annotations: annotations,
+				StartsAt:    dbAlert.StartsAt,
+				EndsAt:      dbAlert.EndsAt,
+			},
+			UpdatedAt: dbAlert.UpdatedAt,
+		}
+
+		if !alert.EndsAt.IsZero() && alert.EndsAt.Before(now) {
+			// Resolved alerts belong to no state gauge: skip them
+			// instead of misreporting them as "unprocessed".
+			continue
+		}
+		// Unresolved alerts are either suppressed or firing, depending
+		// on what the marker recorded for this fingerprint.
+		alertState := types.AlertStateActive
+		status := a.marker.Status(alert.Fingerprint())
+		if status.State == types.AlertStateSuppressed {
+			alertState = types.AlertStateSuppressed
+		}
+
+		if alertState == state {
+			count++
+		}
+	}
+
+	return count
+}
+
+// runGC periodically removes expired alerts from the database.
+func (a *Alerts) runGC(ctx context.Context) {
+	ticker := time.NewTicker(a.maintenanceInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			a.cleanupExpiredAlerts()
+		}
+	}
+}
+
+func (a *Alerts) cleanupExpiredAlerts() {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	// Delete expired alerts directly using the database method
+	cutoffTime := time.Now().Add(-1 * time.Hour) // Keep alerts for 1 hour after they expire
+
+	if err := a.db.DeleteExpiredAlerts(ctx, cutoffTime); err != nil {
+		a.logger.Error("failed to cleanup expired alerts", "err", err)
+	} else {
+		a.logger.Debug("cleanup expired alerts completed", "cutoff_time", cutoffTime)
+	}
+}
+
+// Close cleans up the alert provider resources.
+func (a *Alerts) Close() {
+	// Stop the garbage collection goroutine
+	if a.cancel != nil {
+		a.cancel()
+	}
+	// Database connections are managed by the db.DB instance
+	// No additional cleanup needed here
+}
diff --git a/provider/db/db_test.go b/provider/db/db_test.go
new file mode 100644
index 0000000000..242e92c06b
--- /dev/null
+++ b/provider/db/db_test.go
@@ -0,0 +1,38 @@
+// Copyright 2024 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package db
+
+import (
+	"log/slog"
+	"os"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/prometheus/alertmanager/types"
+)
+
+func TestCompilation(t *testing.T) {
+	// Construct the provider and shut it down again to verify the
+	// package wires together and the GC goroutine can be stopped.
+	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
+	marker := types.NewMarker(prometheus.NewRegistry())
+
+	a, err := NewAlerts(nil, marker, logger, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Stop the background GC goroutine so the test does not leak it.
+	a.Close()
+}