diff --git a/.gitignore b/.gitignore index d6a422b0552..8e1dd040e4c 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ fleet.yml.lock fleet.yml.old pkg/component/fake/component/component internal/pkg/agent/install/testblocking/testblocking +internal/pkg/otel/manager/testing/testing diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 5b6ad0364af..dcaa2ddc570 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -226,7 +226,7 @@ func New( return nil, nil, nil, errors.New(err, "failed to initialize composable controller") } - otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode) + otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig) if err != nil { return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err) } diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 1e6d166beb2..76a53486d5b 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -14,7 +14,6 @@ import ( "time" "github.com/elastic/elastic-agent/internal/pkg/core/backoff" - "github.com/elastic/elastic-agent/internal/pkg/otel/translate" "go.opentelemetry.io/collector/component/componentstatus" @@ -149,15 +148,22 @@ type RuntimeManager interface { PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) } -// OTelManager provides an interface to run and update the runtime. +// OTelManager provides an interface to run components and plain otel configurations in an otel collector. type OTelManager interface { Runner - // Update updates the current configuration for OTel. - Update(cfg *confmap.Conf) + // Update updates the current plain configuration for the otel collector and components. + Update(*confmap.Conf, []component.Component) - // Watch returns the chanel to watch for configuration changes. - Watch() <-chan *status.AggregateStatus + // WatchCollector returns a channel to watch for collector status updates. + WatchCollector() <-chan *status.AggregateStatus + + // WatchComponents returns a channel to watch for component state updates. + WatchComponents() <-chan []runtime.ComponentComponentState + + // MergedOtelConfig returns the merged Otel collector configuration, containing both the plain config and the + // component config. + MergedOtelConfig() *confmap.Conf } // ConfigChange provides an interface for receiving a new configuration. 
@@ -240,8 +246,6 @@ type Coordinator struct { otelMgr OTelManager otelCfg *confmap.Conf - // the final config sent to the manager, contains both config from hybrid mode and from components - finalOtelCfg *confmap.Conf caps capabilities.Capabilities modifiers []ComponentsModifier @@ -364,8 +368,9 @@ type managerChans struct { varsManagerUpdate <-chan []*transpiler.Vars varsManagerError <-chan error - otelManagerUpdate chan *status.AggregateStatus - otelManagerError <-chan error + otelManagerCollectorUpdate <-chan *status.AggregateStatus + otelManagerComponentUpdate <-chan []runtime.ComponentComponentState + otelManagerError <-chan error upgradeMarkerUpdate <-chan upgrade.UpdateMarker } @@ -393,7 +398,24 @@ type UpdateComponentChange struct { } // New creates a new coordinator. -func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, otelMgr OTelManager, fleetAcker acker.Acker, modifiers ...ComponentsModifier) *Coordinator { +func New( + logger *logger.Logger, + cfg *configuration.Configuration, + logLevel logp.Level, + agentInfo info.Agent, + specs component.RuntimeSpecs, + reexecMgr ReExecManager, + upgradeMgr UpgradeManager, + runtimeMgr RuntimeManager, + configMgr ConfigManager, + varsMgr VarsManager, + caps capabilities.Capabilities, + monitorMgr MonitorManager, + isManaged bool, + otelMgr OTelManager, + fleetAcker acker.Acker, + modifiers ...ComponentsModifier, +) *Coordinator { var fleetState cproto.State var fleetMessage string if !isManaged { @@ -480,7 +502,8 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. if otelMgr != nil { // The otel manager sends updates to the watchRuntimeComponents function, which extracts component status // and forwards the rest to this channel. - c.managerChans.otelManagerUpdate = make(chan *status.AggregateStatus) + c.managerChans.otelManagerCollectorUpdate = otelMgr.WatchCollector() + c.managerChans.otelManagerComponentUpdate = otelMgr.WatchComponents() c.managerChans.otelManagerError = otelMgr.Errors() } if upgradeMgr != nil && upgradeMgr.MarkerWatcher() != nil { @@ -774,20 +797,19 @@ func (c *Coordinator) SetLogLevel(ctx context.Context, lvl *logp.Level) error { func (c *Coordinator) watchRuntimeComponents( ctx context.Context, runtimeComponentStates <-chan runtime.ComponentComponentState, - otelStatuses <-chan *status.AggregateStatus, + otelComponentStates <-chan []runtime.ComponentComponentState, ) { // We need to track otel component state separately because otel components may not always get a STOPPED status // If we receive an otel status without the state of a component we're tracking, we need to emit a fake STOPPED // status for it. Process component states should not be affected by this logic. - processState := make(map[string]runtime.ComponentState) - otelState := make(map[string]runtime.ComponentState) + state := make(map[string]runtime.ComponentState) for { select { case <-ctx.Done(): return case componentState := <-runtimeComponentStates: - logComponentStateChange(c.logger, processState, &componentState) + logComponentStateChange(c.logger, state, &componentState) // Forward the final changes back to Coordinator, unless our context // has ended. 
select { @@ -795,49 +817,9 @@ func (c *Coordinator) watchRuntimeComponents( case <-ctx.Done(): return } - case otelStatus := <-otelStatuses: - // We don't break on errors here, because we want to forward the status - // even if there was an error, and the rest of the code gracefully handles componentStates being nil - componentStates, err := translate.GetAllComponentStates(otelStatus, c.componentModel) - if err != nil { - c.setOTelError(err) - } - finalOtelStatus, err := translate.DropComponentStateFromOtelStatus(otelStatus) - if err != nil { - c.setOTelError(err) - finalOtelStatus = otelStatus - } - - // forward the remaining otel status - // TODO: Implement subscriptions for otel manager status to avoid the need for this - select { - case c.managerChans.otelManagerUpdate <- finalOtelStatus: - case <-ctx.Done(): - return - } - - // drop component states which don't exist in the configuration anymore - // we need to do this because we aren't guaranteed to receive a STOPPED state when the component is removed - componentIds := make(map[string]bool) - for _, componentState := range componentStates { - componentIds[componentState.Component.ID] = true - } - for id := range otelState { - if _, ok := componentIds[id]; !ok { - // this component is not in the configuration anymore, emit a fake STOPPED state - componentStates = append(componentStates, runtime.ComponentComponentState{ - Component: component.Component{ - ID: id, - }, - State: runtime.ComponentState{ - State: client.UnitStateStopped, - }, - }) - } - } - // now handle the component states + case componentStates := <-otelComponentStates: for _, componentState := range componentStates { - logComponentStateChange(c.logger, otelState, &componentState) + logComponentStateChange(c.logger, state, &componentState) // Forward the final changes back to Coordinator, unless our context // has ended. select { @@ -930,7 +912,7 @@ func (c *Coordinator) Run(ctx context.Context) error { defer watchCanceller() var subChan <-chan runtime.ComponentComponentState - var otelChan <-chan *status.AggregateStatus + var otelChan <-chan []runtime.ComponentComponentState // A real Coordinator will always have a runtime manager, but unit tests // may not initialize all managers -- in that case we leave subChan nil, // and just idle until Coordinator shuts down. @@ -938,7 +920,7 @@ func (c *Coordinator) Run(ctx context.Context) error { subChan = c.runtimeMgr.SubscribeAll(ctx).Ch() } if c.otelMgr != nil { - otelChan = c.otelMgr.Watch() + otelChan = c.otelMgr.WatchComponents() } go c.watchRuntimeComponents(watchCtx, subChan, otelChan) @@ -1234,10 +1216,11 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { Description: "Final otel configuration used by the Elastic Agent. 
Includes hybrid mode config and component config.", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { - if c.finalOtelCfg == nil { + mergedCfg := c.otelMgr.MergedOtelConfig() + if mergedCfg == nil { return []byte("no active OTel configuration") } - o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap()) + o, err := yaml.Marshal(mergedCfg.ToStringMap()) if err != nil { return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err)) } @@ -1431,7 +1414,7 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { c.processVars(ctx, vars) } - case collectorStatus := <-c.managerChans.otelManagerUpdate: + case collectorStatus := <-c.managerChans.otelManagerCollectorUpdate: c.state.Collector = collectorStatus c.stateNeedsRefresh = true @@ -1649,58 +1632,25 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) { c.logger.Info("Updating running component model") c.logger.With("components", model.Components).Debug("Updating running component model") - return c.updateManagersWithConfig(model) + c.updateManagersWithConfig(model) + return nil } // updateManagersWithConfig updates runtime managers with the component model and config. // Components may be sent to different runtimes depending on various criteria. -func (c *Coordinator) updateManagersWithConfig(model *component.Model) error { +func (c *Coordinator) updateManagersWithConfig(model *component.Model) { runtimeModel, otelModel := c.splitModelBetweenManagers(model) c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model") c.runtimeMgr.Update(*runtimeModel) - return c.updateOtelManagerConfig(otelModel) -} - -// updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration -// from the component model passed in and from the hybrid-mode otel config set on the Coordinator. -func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { - finalOtelCfg := confmap.New() - var componentOtelCfg *confmap.Conf - if len(model.Components) > 0 { - var err error - c.logger.With("components", model.Components).Debug("Updating otel manager model") - componentOtelCfg, err = translate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig) - if err != nil { - c.logger.Errorf("failed to generate otel config: %v", err) - } - componentIDs := make([]string, 0, len(model.Components)) - for _, comp := range model.Components { + c.logger.With("components", otelModel.Components).Debug("Updating otel manager model") + if len(otelModel.Components) > 0 { + componentIDs := make([]string, 0, len(otelModel.Components)) + for _, comp := range otelModel.Components { componentIDs = append(componentIDs, comp.ID) } c.logger.With("component_ids", componentIDs).Warn("The Otel runtime manager is HIGHLY EXPERIMENTAL and only intended for testing. 
Use at your own risk.") } - if componentOtelCfg != nil { - err := finalOtelCfg.Merge(componentOtelCfg) - if err != nil { - c.logger.Error("failed to merge otel config: %v", err) - } - } - - if c.otelCfg != nil { - err := finalOtelCfg.Merge(c.otelCfg) - if err != nil { - c.logger.Error("failed to merge otel config: %v", err) - } - } - - if len(finalOtelCfg.AllKeys()) == 0 { - // if the config is empty, we want to send nil to the manager, so it knows to stop the collector - finalOtelCfg = nil - } - - c.otelMgr.Update(finalOtelCfg) - c.finalOtelCfg = finalOtelCfg - return nil + c.otelMgr.Update(c.otelCfg, otelModel.Components) } // splitModelBetweenManager splits the model components between the runtime manager and the otel manager. diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 4a8c0d47b85..62c9e5e2733 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1078,9 +1078,8 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt cfg.Port = 0 rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg) require.NoError(t, err) - otelMgr, err := otelmanager.NewOTelManager(l, logp.InfoLevel, l, otelmanager.EmbeddedExecutionMode) + otelMgr, err := otelmanager.NewOTelManager(l, logp.InfoLevel, l, otelmanager.EmbeddedExecutionMode, ai, monitoringMgr.ComponentMonitoringConfig) require.NoError(t, err) - caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l) require.NoError(t, err) @@ -1339,11 +1338,14 @@ func (f *fakeVarsManager) DefaultProvider() string { return "" } +var _ OTelManager = (*fakeOTelManager)(nil) + type fakeOTelManager struct { - updateCallback func(*confmap.Conf) error - result error - errChan chan error - statusChan chan *status.AggregateStatus + updateCollectorCallback func(*confmap.Conf) error + updateComponentCallback func([]component.Component) error + errChan chan error + collectorStatusChan chan *status.AggregateStatus + componentStateChan chan []runtime.ComponentComponentState } func (f *fakeOTelManager) Run(ctx context.Context) error { @@ -1352,24 +1354,37 @@ func (f *fakeOTelManager) Run(ctx context.Context) error { } func (f *fakeOTelManager) Errors() <-chan error { - return nil + return f.errChan } -func (f *fakeOTelManager) Update(cfg *confmap.Conf) { - f.result = nil - if f.updateCallback != nil { - f.result = f.updateCallback(cfg) +func (f *fakeOTelManager) Update(cfg *confmap.Conf, components []component.Component) { + var collectorResult, componentResult error + if f.updateCollectorCallback != nil { + collectorResult = f.updateCollectorCallback(cfg) } - if f.errChan != nil { - // If a reporting channel is set, send the result to it - f.errChan <- f.result + if f.errChan != nil && collectorResult != nil { + // If a reporting channel is set, send the collectorResult to it + f.errChan <- collectorResult + } + if f.updateComponentCallback != nil { + componentResult = f.updateComponentCallback(components) + } + if f.errChan != nil && componentResult != nil { + // If a reporting channel is set, send the componentResult to it + f.errChan <- componentResult } } -func (f *fakeOTelManager) Watch() <-chan *status.AggregateStatus { - return f.statusChan +func (f *fakeOTelManager) WatchCollector() <-chan *status.AggregateStatus { + return f.collectorStatusChan } +func (f *fakeOTelManager) WatchComponents() <-chan 
[]runtime.ComponentComponentState { + return f.componentStateChan +} + +func (f *fakeOTelManager) MergedOtelConfig() *confmap.Conf { return nil } + // An implementation of the RuntimeManager interface for use in testing. type fakeRuntimeManager struct { state []runtime.ComponentComponentState diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 2252b9ce123..deb2cda4c3f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -15,7 +15,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "net" "net/http" "net/http/httptest" @@ -26,7 +25,6 @@ import ( "time" "github.com/elastic/elastic-agent-client/v7/pkg/proto" - "github.com/elastic/elastic-agent/internal/pkg/otel/translate" "github.com/elastic/elastic-agent/internal/pkg/testutils" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" @@ -206,7 +204,7 @@ func TestCoordinatorReportsUnhealthyOTelComponents(t *testing.T) { InputChan: stateChan, }, managerChans: managerChans{ - otelManagerUpdate: otelChan, + otelManagerCollectorUpdate: otelChan, }, componentPIDTicker: time.NewTicker(time.Second * 30), } @@ -829,7 +827,7 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManager(t *testing.T) { var otelUpdated bool // Set by otel manager callback var otelConfig *confmap.Conf // Set by otel manager callback otelManager := &fakeOTelManager{ - updateCallback: func(cfg *confmap.Conf) error { + updateCollectorCallback: func(cfg *confmap.Conf) error { otelUpdated = true otelConfig = cfg return nil @@ -972,7 +970,7 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t var otelUpdated bool // Set by otel manager callback var otelConfig *confmap.Conf // Set by otel manager callback otelManager := &fakeOTelManager{ - updateCallback: func(cfg *confmap.Conf) error { + updateCollectorCallback: func(cfg *confmap.Conf) error { otelUpdated = true otelConfig = cfg return nil @@ -1158,7 +1156,7 @@ func TestCoordinatorReportsOTelManagerUpdateFailure(t *testing.T) { const errorStr = "update failed for testing reasons" runtimeManager := &fakeRuntimeManager{} otelManager := &fakeOTelManager{ - updateCallback: func(retrieved *confmap.Conf) error { + updateCollectorCallback: func(retrieved *confmap.Conf) error { return errors.New(errorStr) }, errChan: updateErrChan, @@ -1358,9 +1356,8 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) { defer cancel() logger := logp.NewLogger("testing") - statusChan := make(chan *status.AggregateStatus) - runtimeStateChan := make(chan runtime.ComponentComponentState) + componentUpdateChan := make(chan []runtime.ComponentComponentState) otelComponent := component.Component{ ID: "filestream-default", @@ -1392,33 +1389,15 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) { }, }, } + processComponent := otelComponent processComponent.RuntimeManager = component.ProcessRuntimeManager processComponent.ID = "filestream-process" - otelStatus := &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - fmt.Sprintf("pipeline:logs/%sfilestream-default", translate.OtelNamePrefix): { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - fmt.Sprintf("receiver:filebeat/%sfilestream-unit", 
translate.OtelNamePrefix): { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - fmt.Sprintf("exporter:elasticsearch/%sfilestream-default", translate.OtelNamePrefix): { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - }, - }, - }, - } - - invalidOtelStatus := &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - "unknown:logs/filestream-default": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, + compState := runtime.ComponentComponentState{ + Component: otelComponent, + State: runtime.ComponentState{ + State: client.UnitStateHealthy, }, } @@ -1427,31 +1406,21 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) { agentInfo: &info.AgentInfo{}, stateBroadcaster: broadcaster.New(State{}, 0, 0), managerChans: managerChans{ - otelManagerUpdate: make(chan *status.AggregateStatus), - runtimeManagerUpdate: make(chan runtime.ComponentComponentState), + otelManagerComponentUpdate: componentUpdateChan, + runtimeManagerUpdate: make(chan runtime.ComponentComponentState), }, state: State{}, } // start runtime status watching - go coord.watchRuntimeComponents(ctx, runtimeStateChan, statusChan) + go coord.watchRuntimeComponents(ctx, runtimeStateChan, componentUpdateChan) // no component status assert.Empty(t, coord.state.Components) - coord.componentModel = []component.Component{otelComponent} - - // push the status into the coordinator + // push the otel component state into the coordinator select { - case statusChan <- otelStatus: - case <-ctx.Done(): - t.Fatal("timeout waiting for coordinator to receive status") - } - - select { - case finalOtelStatus := <-coord.managerChans.otelManagerUpdate: - // we shouldn't have any status remaining for the otel collector - assert.Empty(t, finalOtelStatus.ComponentStatusMap) + case componentUpdateChan <- []runtime.ComponentComponentState{compState}: case <-ctx.Done(): t.Fatal("timeout waiting for coordinator to receive status") } @@ -1465,9 +1434,6 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) { assert.Len(t, coord.state.Components, 1) - // Add both a process component and an otel component, in that order. Both should appear in the state. 
- coord.componentModel = []component.Component{otelComponent, processComponent} - // push the process component state into the coordinator select { case runtimeStateChan <- runtime.ComponentComponentState{ @@ -1487,45 +1453,16 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) { t.Fatal("timeout waiting for coordinator to receive status") } - // push the otel status into the coordinator - select { - case statusChan <- otelStatus: - case <-ctx.Done(): - t.Fatal("timeout waiting for coordinator to receive status") - } - - select { - case finalOtelStatus := <-coord.managerChans.otelManagerUpdate: - // we shouldn't have any status remaining for the otel collector, as the status we've pushed earlier only - // contains beats receiver status for the "filestream-default" component - // this status is removed from the otel collector status, because it's reported as component state instead - assert.Empty(t, finalOtelStatus.ComponentStatusMap) - case <-ctx.Done(): - t.Fatal("timeout waiting for coordinator to receive status") - } - - select { - case componentState := <-coord.managerChans.runtimeManagerUpdate: - coord.applyComponentState(componentState) - case <-ctx.Done(): - t.Fatal("timeout waiting for coordinator to receive status") - } - assert.Len(t, coord.state.Components, 2) - // Now, we remove the component and resend the same status. The component state should be deleted. - coord.componentModel = []component.Component{} - coord.state = State{} + // Push a stopped status, there should be no otel component state select { - case statusChan <- otelStatus: - case <-ctx.Done(): - t.Fatal("timeout waiting for coordinator to receive status") - } - - select { - case finalOtelStatus := <-coord.managerChans.otelManagerUpdate: - // we shouldn't have any status remaining for the otel collector - assert.Empty(t, finalOtelStatus.ComponentStatusMap) + case componentUpdateChan <- []runtime.ComponentComponentState{{ + Component: otelComponent, + State: runtime.ComponentState{ + State: client.UnitStateStopped, + }, + }}: case <-ctx.Done(): t.Fatal("timeout waiting for coordinator to receive status") } @@ -1537,25 +1474,7 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) { t.Fatal("timeout waiting for coordinator to receive status") } - assert.Empty(t, coord.state.Components) - - // Push an invalid status, there should be no otel component state, but there should be an otel status - select { - case statusChan <- invalidOtelStatus: - case <-ctx.Done(): - t.Fatal("timeout waiting for coordinator to receive status") - } - - select { - case finalOtelStatus := <-coord.managerChans.otelManagerUpdate: - // we should have otel status with pipelines that didn't parse correctly - assert.NotEmpty(t, finalOtelStatus.ComponentStatusMap) - case <-ctx.Done(): - t.Fatal("timeout waiting for coordinator to receive status") - } - - assert.Empty(t, coord.state.Components) - assert.Equal(t, coord.otelErr.Error(), "pipeline status id unknown:logs/filestream-default is not a pipeline") + assert.Len(t, coord.state.Components, 1) } func TestCoordinatorInitiatesUpgrade(t *testing.T) { diff --git a/internal/pkg/otel/manager/common.go b/internal/pkg/otel/manager/common.go index 218bbebd95d..3127a70e5aa 100644 --- a/internal/pkg/otel/manager/common.go +++ b/internal/pkg/otel/manager/common.go @@ -33,12 +33,22 @@ func reportErr(ctx context.Context, errCh chan error, err error) { } } -// reportStatus sends the new status to the status channel. 
-func reportStatus(ctx context.Context, statusCh chan *status.AggregateStatus, statuses *status.AggregateStatus) { +// reportCollectorStatus sends a status to the provided channel. It first drains the channel +// to ensure that only the most recent status is kept, as intermediate statuses can be safely discarded. +// This ensures the receiver always observes the latest reported status. +func reportCollectorStatus(ctx context.Context, statusCh chan *status.AggregateStatus, collectorStatus *status.AggregateStatus) { + select { + case <-ctx.Done(): + // context is already done + return + case <-statusCh: + // drain the channel first + default: + } select { case <-ctx.Done(): return - case statusCh <- statuses: + case statusCh <- collectorStatus: } } diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index c94620e2991..d925461ca3d 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -124,7 +124,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger close(healthCheckDone) }() currentStatus := aggregateStatus(componentstatus.StatusStarting, nil) - reportStatus(ctx, statusCh, currentStatus) + reportCollectorStatus(ctx, statusCh, currentStatus) // specify a max duration of not being able to get the status from the collector const maxFailuresDuration = 130 * time.Second @@ -140,7 +140,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger if err != nil { switch { case errors.Is(err, context.Canceled): - reportStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) + reportCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) return } } else { @@ -148,13 +148,13 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger if !compareStatuses(currentStatus, statuses) { currentStatus = statuses - reportStatus(procCtx, statusCh, statuses) + reportCollectorStatus(procCtx, statusCh, statuses) } } select { case <-procCtx.Done(): - reportStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) + reportCollectorStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) return case <-healthCheckPollTimer.C: healthCheckPollTimer.Reset(healthCheckPollDuration) @@ -165,7 +165,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger ) if !compareStatuses(currentStatus, failedToConnectStatuses) { currentStatus = statuses - reportStatus(procCtx, statusCh, statuses) + reportCollectorStatus(procCtx, statusCh, statuses) } } } diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 8a2057adb43..46b2d7bf6c5 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -9,9 +9,18 @@ import ( "errors" "fmt" "os" + "sync" "sync/atomic" "time" + "go.uber.org/zap" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/elastic/elastic-agent/internal/pkg/otel/translate" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/confmap" @@ -40,6 +49,11 @@ type collectorRecoveryTimer interface { C() <-chan time.Time } +type configUpdate struct { + collectorCfg *confmap.Conf + components 
[]component.Component +} + // OTelManager is a manager that manages the lifecycle of the OTel collector inside of the Elastic Agent. type OTelManager struct { // baseLogger is the base logger for the otel collector, and doesn't include any agent-specific fields. @@ -47,16 +61,30 @@ type OTelManager struct { logger *logger.Logger errCh chan error + // Agent info and monitoring config getter for otel config generation + agentInfo info.Agent + beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter + + collectorCfg *confmap.Conf + components []component.Component + // The current configuration that the OTel collector is using. In the case that - // the cfg is nil then the collector is not running. - cfg *confmap.Conf + // the mergedCollectorCfg is nil then the collector is not running. + mergedCollectorCfg *confmap.Conf + + currentCollectorStatus *status.AggregateStatus + currentComponentStates map[string]runtime.ComponentComponentState - // cfg is changed by sending its new value on cfgCh, where it is - // handled by (*OTelManager).Run. - cfgCh chan *confmap.Conf + // Update channels for forwarding updates to the run loop + updateCh chan configUpdate - // stateCh passes the state information of the collector. - statusCh chan *status.AggregateStatus + // Status channels for reading status from the run loop + collectorStatusCh chan *status.AggregateStatus + componentStateCh chan []runtime.ComponentComponentState + + // This mutex is used to protect access to attributes read outside the run loop. This happens when reading the + // merged config and generating diagnostics. + mx sync.RWMutex // doneChan is closed when Run is stopped to signal that any // pending update calls should be ignored. @@ -71,10 +99,19 @@ type OTelManager struct { // execution is used to invoke the collector into different execution modes execution collectorExecution + + proc collectorHandle } // NewOTelManager returns a OTelManager. -func NewOTelManager(logger *logger.Logger, logLevel logp.Level, baseLogger *logger.Logger, mode ExecutionMode) (*OTelManager, error) { +func NewOTelManager( + logger *logger.Logger, + logLevel logp.Level, + baseLogger *logger.Logger, + mode ExecutionMode, + agentInfo info.Agent, + beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter, +) (*OTelManager, error) { var exec collectorExecution var recoveryTimer collectorRecoveryTimer switch mode { @@ -97,43 +134,49 @@ func NewOTelManager(logger *logger.Logger, logLevel logp.Level, baseLogger *logg logger.Debugf("Using collector execution mode: %s", mode) return &OTelManager{ - logger: logger, - baseLogger: baseLogger, - errCh: make(chan error, 1), // holds at most one error - cfgCh: make(chan *confmap.Conf), - statusCh: make(chan *status.AggregateStatus), - doneChan: make(chan struct{}), - execution: exec, - recoveryTimer: recoveryTimer, + logger: logger, + baseLogger: baseLogger, + agentInfo: agentInfo, + beatMonitoringConfigGetter: beatMonitoringConfigGetter, + errCh: make(chan error, 1), // holds at most one error + collectorStatusCh: make(chan *status.AggregateStatus, 1), + componentStateCh: make(chan []runtime.ComponentComponentState, 1), + updateCh: make(chan configUpdate), + doneChan: make(chan struct{}), + execution: exec, + recoveryTimer: recoveryTimer, }, nil } // Run runs the lifecycle of the manager. 
func (m *OTelManager) Run(ctx context.Context) error { - var ( - err error - proc collectorHandle - ) + var err error + m.proc = nil // signal that the run loop is ended to unblock any incoming update calls defer close(m.doneChan) // collectorRunErr is used to signal that the collector has exited. collectorRunErr := make(chan error) + + // collectorStatusCh is used internally by the otel collector to send status updates to the manager + // this channel is buffered because it's possible for the collector to send a status update while the manager is + // waiting for the collector to exit + collectorStatusCh := make(chan *status.AggregateStatus, 1) for { select { case <-ctx.Done(): m.recoveryTimer.Stop() // our caller context is cancelled so stop the collector and return // has exited. - if proc != nil { - proc.Stop(ctx) + if m.proc != nil { + m.proc.Stop(ctx) } return ctx.Err() case <-m.recoveryTimer.C(): m.recoveryTimer.Stop() - if m.cfg == nil || proc != nil || ctx.Err() != nil { + if m.mergedCollectorCfg == nil || m.proc != nil || ctx.Err() != nil { // no configuration, or the collector is already running, or the context // is cancelled. continue @@ -141,7 +184,7 @@ func (m *OTelManager) Run(ctx context.Context) error { newRetries := m.recoveryRetries.Add(1) m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) - proc, err = m.execution.startCollector(ctx, m.baseLogger, m.cfg, collectorRunErr, m.statusCh) + m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh) if err != nil { reportErr(ctx, m.errCh, err) // reset the restart timer to the next backoff @@ -155,13 +198,16 @@ func (m *OTelManager) Run(ctx context.Context) error { m.recoveryTimer.Stop() if err == nil { // err is nil means that the collector has exited cleanly without an error - if proc != nil { - proc.Stop(ctx) - proc = nil - reportStatus(ctx, m.statusCh, nil) + if m.proc != nil { + m.proc.Stop(ctx) + m.proc = nil + updateErr := m.reportOtelStatusUpdate(ctx, nil) + if updateErr != nil { + reportErr(ctx, m.errCh, updateErr) + } } - if m.cfg == nil { + if m.mergedCollectorCfg == nil { // no configuration then the collector should not be // running. // ensure that the coordinator knows that there is no error @@ -174,7 +220,7 @@ func (m *OTelManager) Run(ctx context.Context) error { // in this rare case the collector stopped running but a configuration was // provided and the collector stopped with a clean exit - proc, err = m.execution.startCollector(ctx, m.baseLogger, m.cfg, collectorRunErr, m.statusCh) + m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh) if err != nil { // failed to create the collector (this is different then // it's failing to run). 
we do not retry creation on failure @@ -195,12 +241,15 @@ func (m *OTelManager) Run(ctx context.Context) error { // // in the case that the configuration is invalid there is no reason to // try again as it will keep failing so we do not trigger a restart - if proc != nil { - proc.Stop(ctx) - proc = nil + if m.proc != nil { + m.proc.Stop(ctx) + m.proc = nil // don't wait here for <-collectorRunErr, already occurred // clear status, no longer running - reportStatus(ctx, m.statusCh, nil) + updateErr := m.reportOtelStatusUpdate(ctx, nil) + if updateErr != nil { + err = errors.Join(err, updateErr) + } } // pass the error to the errCh so the coordinator, unless it's a cancel error if !errors.Is(err, context.Canceled) { @@ -211,49 +260,25 @@ func (m *OTelManager) Run(ctx context.Context) error { } } - case cfg := <-m.cfgCh: + case cfgUpdate := <-m.updateCh: // we received a new configuration, thus stop the recovery timer // and reset the retry count m.recoveryTimer.Stop() m.recoveryRetries.Store(0) - m.cfg = cfg - - if proc != nil { - proc.Stop(ctx) - proc = nil - select { - case <-collectorRunErr: - case <-ctx.Done(): - // our caller ctx is Done - return ctx.Err() - } - reportStatus(ctx, m.statusCh, nil) + err = m.handleConfigUpdate(cfgUpdate.collectorCfg, cfgUpdate.components) + if err != nil { + reportErr(ctx, m.errCh, err) + continue } - if cfg == nil { - // no configuration then the collector should not be - // running. - // ensure that the coordinator knows that there is no error - // as the collector is not running anymore - reportErr(ctx, m.errCh, nil) - } else { - // either a new configuration or the first configuration - // that results in the collector being started - proc, err = m.execution.startCollector(ctx, m.baseLogger, m.cfg, collectorRunErr, m.statusCh) - if err != nil { - // failed to create the collector (this is different then - // it's failing to run). we do not retry creation on failure - // as it will always fail. A new configuration is required for - // it not to fail (a new configuration will result in the retry) - reportErr(ctx, m.errCh, err) - // since this is a new configuration we want to start the timer - // from the initial delay - recoveryDelay := m.recoveryTimer.ResetInitial() - m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) - } else { - // all good at the moment (possible that it will fail) - reportErr(ctx, m.errCh, nil) - } + err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr) + // report the error unconditionally to indicate that the config was applied + reportErr(ctx, m.errCh, err) + + case otelStatus := <-collectorStatusCh: + err = m.reportOtelStatusUpdate(ctx, otelStatus) + if err != nil { + reportErr(ctx, m.errCh, err) } } } @@ -264,20 +289,244 @@ func (m *OTelManager) Errors() <-chan error { return m.errCh } -// Update updates the configuration. -// -// When nil is passed for the cfg, then the collector is stopped. -func (m *OTelManager) Update(cfg *confmap.Conf) { +// handleConfigUpdate processes collector and component configuration updates received through the updateCh. +// This method updates the internal collector and component configurations and triggers a rebuild of the merged +// configuration that combines them. 
+func (m *OTelManager) handleConfigUpdate(cfg *confmap.Conf, components []component.Component) error { + m.collectorCfg = cfg + m.components = components + return m.updateMergedConfig() +} + +// buildMergedConfig combines collector configuration with component-derived configuration. +func (m *OTelManager) buildMergedConfig() (*confmap.Conf, error) { + mergedOtelCfg := confmap.New() + + // Generate component otel config if there are components + var componentOtelCfg *confmap.Conf + if len(m.components) > 0 { + model := &component.Model{Components: m.components} + var err error + componentOtelCfg, err = translate.GetOtelConfig(model, m.agentInfo, m.beatMonitoringConfigGetter) + if err != nil { + return nil, fmt.Errorf("failed to generate otel config: %w", err) + } + } + + // If both configs are nil, return nil so the manager knows to stop the collector + if componentOtelCfg == nil && m.collectorCfg == nil { + return nil, nil + } + + // Merge component config if it exists + if componentOtelCfg != nil { + err := mergedOtelCfg.Merge(componentOtelCfg) + if err != nil { + return nil, fmt.Errorf("failed to merge component otel config: %w", err) + } + } + + // Merge with base collector config if it exists + if m.collectorCfg != nil { + err := mergedOtelCfg.Merge(m.collectorCfg) + if err != nil { + return nil, fmt.Errorf("failed to merge collector otel config: %w", err) + } + } + + return mergedOtelCfg, nil +} + +// updateMergedConfig builds the merged configuration for the otel manager by merging the base collector configuration +// with the component configuration, and updates the otel manager with the merged configuration. +func (m *OTelManager) updateMergedConfig() error { + mergedCfg, err := m.buildMergedConfig() + if err != nil { + return err + } + + m.mx.Lock() + defer m.mx.Unlock() + m.mergedCollectorCfg = mergedCfg + return nil +} + +func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error { + if m.proc != nil { + m.proc.Stop(ctx) + m.proc = nil + select { + case <-collectorRunErr: + case <-ctx.Done(): + // our caller ctx is Done + return ctx.Err() + } + // drain the internal status update channel + // this status handling is normally done in the main loop, but in this case we want to ensure that we emit a + // nil status after the collector has stopped + select { + case statusCh := <-collectorStatusCh: + updateErr := m.reportOtelStatusUpdate(ctx, statusCh) + if updateErr != nil { + m.logger.Error("failed to update otel status", zap.Error(updateErr)) + } + case <-ctx.Done(): + // our caller ctx is Done + return ctx.Err() + default: + } + err := m.reportOtelStatusUpdate(ctx, nil) + if err != nil { + return err + } + } + + if m.mergedCollectorCfg == nil { + // no configuration then the collector should not be + // running. + // ensure that the coordinator knows that there is no error + // as the collector is not running anymore + return nil + } else { + // either a new configuration or the first configuration + // that results in the collector being started + proc, err := m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh) + if err != nil { + // failed to create the collector (this is different than + // it's failing to run). we do not retry creation on failure + // as it will always fail. 
A new configuration is required for + // it not to fail (a new configuration will result in the retry) + // since this is a new configuration we want to start the timer + // from the initial delay + recoveryDelay := m.recoveryTimer.ResetInitial() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) + return err + } else { + // all good at the moment (possible that it will fail) + m.proc = proc + } + } + return nil +} + +// Update sends collector configuration and component updates to the manager's run loop. +func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component) { + cfgUpdate := configUpdate{ + collectorCfg: cfg, + components: components, + } select { - case m.cfgCh <- cfg: + case m.updateCh <- cfgUpdate: case <-m.doneChan: - // shutting down, ignore the update + // Manager is shutting down, ignore the update } } -// Watch returns a channel to watch for state information. -// -// This must be called and the channel must be read from, or it will block this manager. -func (m *OTelManager) Watch() <-chan *status.AggregateStatus { - return m.statusCh +// WatchCollector returns a read-only channel that provides collector status updates. +func (m *OTelManager) WatchCollector() <-chan *status.AggregateStatus { + return m.collectorStatusCh +} + +// WatchComponents returns a read-only channel that provides component state updates. +func (m *OTelManager) WatchComponents() <-chan []runtime.ComponentComponentState { + return m.componentStateCh +} + +func (m *OTelManager) MergedOtelConfig() *confmap.Conf { + m.mx.RLock() + defer m.mx.RUnlock() + return m.mergedCollectorCfg +} + +// handleOtelStatusUpdate processes status updates from the underlying otel collector. +// This method extracts component states from the aggregate status, updates internal state tracking, +// and prepares component state updates for distribution to watchers. +// Returns component state updates and any error encountered during processing. +func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus) ([]runtime.ComponentComponentState, error) { + // Extract component states from otel status + componentStates, err := translate.GetAllComponentStates(otelStatus, m.components) + if err != nil { + return nil, fmt.Errorf("failed to extract component states: %w", err) + } + + // Drop component state information from otel status + finalStatus, err := translate.DropComponentStateFromOtelStatus(otelStatus) + if err != nil { + return nil, fmt.Errorf("failed to drop component state from otel status: %w", err) + } + + // Update the current collector status to the cleaned status (after dropping component states) + m.currentCollectorStatus = finalStatus + + // Handle component state updates + return m.processComponentStates(componentStates), nil +} + +// reportOtelStatusUpdate processes status updates from the underlying otel collector and reports separate collector +// and component state updates to the external watch channels. +func (m *OTelManager) reportOtelStatusUpdate(ctx context.Context, otelStatus *status.AggregateStatus) error { + componentUpdates, err := m.handleOtelStatusUpdate(otelStatus) + if err != nil { + return err + } + reportCollectorStatus(ctx, m.collectorStatusCh, m.currentCollectorStatus) + m.reportComponentStateUpdates(ctx, componentUpdates) + return nil +} + +// processComponentStates updates the internal component state tracking and handles cleanup +// of components that are no longer in the configuration. 
This method ensures that removed +// components are properly marked as STOPPED even if no explicit stop event was received. +func (m *OTelManager) processComponentStates(componentStates []runtime.ComponentComponentState) []runtime.ComponentComponentState { + // Drop component states which don't exist in the configuration anymore + // we need to do this because we aren't guaranteed to receive a STOPPED state when the component is removed + componentIds := make(map[string]bool) + for _, componentState := range componentStates { + componentIds[componentState.Component.ID] = true + } + for id := range m.currentComponentStates { + if _, ok := componentIds[id]; !ok { + // this component is not in the configuration anymore, emit a fake STOPPED state + componentStates = append(componentStates, runtime.ComponentComponentState{ + Component: component.Component{ + ID: id, + }, + State: runtime.ComponentState{ + State: client.UnitStateStopped, + }, + }) + } + } + + // update the current state + m.currentComponentStates = make(map[string]runtime.ComponentComponentState) + for _, componentState := range componentStates { + if componentState.State.State == client.UnitStateStopped { + delete(m.currentComponentStates, componentState.Component.ID) + } else { + m.currentComponentStates[componentState.Component.ID] = componentState + } + } + + return componentStates +} + +// reportComponentStateUpdates sends component state updates to the component watch channel. It first drains +// the channel to ensure that only the most recent status is kept, as intermediate statuses can be safely discarded. +// This ensures the receiver always observes the latest reported status. +func (m *OTelManager) reportComponentStateUpdates(ctx context.Context, componentUpdates []runtime.ComponentComponentState) { + select { + case <-ctx.Done(): + // context is already done + return + case <-m.componentStateCh: + // drain the channel first + default: + } + select { + case m.componentStateCh <- componentUpdates: + case <-ctx.Done(): + // Manager is shutting down, ignore the update + return + } } diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 68f482ba63a..4998c849ae1 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -15,6 +15,14 @@ import ( "testing" "time" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/internal/pkg/otel/translate" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/version" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -67,26 +75,75 @@ var ( } ) -type mockExecution struct { +type testExecution struct { mtx sync.Mutex exec collectorExecution handle collectorHandle } -func (m *mockExecution) startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) { - m.mtx.Lock() - defer m.mtx.Unlock() +func (e *testExecution) startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) { + e.mtx.Lock() + defer e.mtx.Unlock() var err error 
- m.handle, err = m.exec.startCollector(ctx, logger, cfg, errCh, statusCh) - return m.handle, err + e.handle, err = e.exec.startCollector(ctx, logger, cfg, errCh, statusCh) + return e.handle, err +} + +func (e *testExecution) getProcessHandle() collectorHandle { + e.mtx.Lock() + defer e.mtx.Unlock() + + return e.handle +} + +var _ collectorExecution = &mockExecution{} + +type mockExecution struct { + errCh chan error + statusCh chan *status.AggregateStatus + cfg *confmap.Conf + collectorStarted chan struct{} +} + +func (e *mockExecution) startCollector( + ctx context.Context, + _ *logger.Logger, + cfg *confmap.Conf, + errCh chan error, + statusCh chan *status.AggregateStatus, +) (collectorHandle, error) { + e.errCh = errCh + e.statusCh = statusCh + e.cfg = cfg + stopCh := make(chan struct{}) + collectorCtx, collectorCancel := context.WithCancel(ctx) + go func() { + <-collectorCtx.Done() + close(stopCh) + reportErr(ctx, errCh, nil) + }() + handle := &mockCollectorHandle{ + stopCh: stopCh, + cancel: collectorCancel, + } + e.collectorStarted <- struct{}{} + return handle, nil } -func (m *mockExecution) getProcessHandle() collectorHandle { - m.mtx.Lock() - defer m.mtx.Unlock() +var _ collectorHandle = &mockCollectorHandle{} + +type mockCollectorHandle struct { + stopCh chan struct{} + cancel context.CancelFunc +} - return m.handle +func (h *mockCollectorHandle) Stop(ctx context.Context) { + h.cancel() + select { + case <-ctx.Done(): + case <-h.stopCh: + } } // EventListener listens to the events from the OTelManager and stores the latest error and status. @@ -138,8 +195,13 @@ func (e *EventListener) EnsureHealthy(t *testing.T, u time.Time) { e.mtx.Unlock() // we expect to have a reported error which is nil and a reported status which is StatusOK - require.False(collect, latestErr == nil || latestErr.Before(u) || latestErr.Value() != nil) - require.False(collect, latestStatus == nil || latestStatus.Value() == nil || latestStatus.Before(u) || latestStatus.Value().Status() != componentstatus.StatusOK) + require.NotNil(collect, latestErr) + assert.Nil(collect, latestErr.Value()) + assert.False(collect, latestErr.Before(u)) + require.NotNil(collect, latestStatus) + require.NotNil(collect, latestStatus.Value()) + assert.False(collect, latestStatus.Before(u)) + require.Equal(collect, componentstatus.StatusOK, latestStatus.Value().Status()) }, 60*time.Second, 1*time.Second, "otel collector never got healthy") } @@ -152,8 +214,12 @@ func (e *EventListener) EnsureOffWithoutError(t *testing.T, u time.Time) { e.mtx.Unlock() // we expect to have a reported error which is nil and a reported status which is nil - require.False(collect, latestErr == nil || latestErr.Before(u) || latestErr.Value() != nil) - require.False(collect, latestStatus == nil || latestStatus.Before(u) || latestStatus.Value() != nil) + require.NotNil(collect, latestErr) + assert.Nil(collect, latestErr.Value()) + assert.False(collect, latestErr.Before(u)) + require.NotNil(collect, latestStatus) + assert.Nil(collect, latestStatus.Value()) + assert.False(collect, latestStatus.Before(u)) }, 60*time.Second, 1*time.Second, "otel collector never stopped without an error") } @@ -208,53 +274,53 @@ func TestOTelManager_Run(t *testing.T) { for _, tc := range []struct { name string - exec *mockExecution + exec *testExecution restarter collectorRecoveryTimer skipListeningErrors bool - testFn func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) + testFn func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) }{ { 
name: "embedded collector config updates", - exec: &mockExecution{exec: newExecutionEmbedded()}, + exec: &testExecution{exec: newExecutionEmbedded()}, restarter: newRestarterNoop(), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg) + m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) // trigger update (no config compare is due externally to otel collector) updateTime = time.Now() - m.Update(cfg) + m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) // no configuration should stop the runner updateTime = time.Now() - m.Update(nil) + m.Update(nil, nil) e.EnsureOffWithoutError(t, updateTime) require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") }, }, { name: "subprocess collector config updates", - exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg) + m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) // trigger update (no config compare is due externally to otel collector) updateTime = time.Now() - m.Update(cfg) + m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) // no configuration should stop the runner updateTime = time.Now() - m.Update(nil) + m.Update(nil, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") @@ -262,13 +328,13 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "embedded collector stopped gracefully outside manager", - exec: &mockExecution{exec: newExecutionEmbedded()}, + exec: &testExecution{exec: newExecutionEmbedded()}, restarter: newRestarterNoop(), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg) + m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) // stop it, this should be restarted by the manager @@ -279,20 +345,20 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil) + m.Update(nil, nil) e.EnsureOffWithoutError(t, updateTime) require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") }, }, { name: "subprocess collector stopped gracefully outside manager", - exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := 
time.Now() - m.Update(cfg) + m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) // stop it, this should be restarted by the manager @@ -303,7 +369,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil) + m.Update(nil, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") @@ -311,13 +377,13 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector killed outside manager", - exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) updateTime := time.Now() - m.Update(cfg) + m.Update(cfg, nil) e.EnsureHealthy(t, updateTime) var oldPHandle *procHandle @@ -342,7 +408,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil) + m.Update(nil, nil) e.EnsureOffWithoutError(t, updateTime) assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.Equal(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") @@ -350,9 +416,9 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector panics", - exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { err := os.Setenv("TEST_SUPERVISED_COLLECTOR_PANIC", (3 * time.Second).String()) require.NoError(t, err, "failed to set TEST_SUPERVISED_COLLECTOR_PANIC env var") t.Cleanup(func() { @@ -361,7 +427,7 @@ func TestOTelManager_Run(t *testing.T) { // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg) + m.Update(cfg, nil) seenRecoveredTimes := uint32(0) require.Eventually(t, func() bool { @@ -376,7 +442,7 @@ func TestOTelManager_Run(t *testing.T) { // no configuration should stop the runner updateTime = time.Now() - m.Update(nil) + m.Update(nil, nil) e.EnsureOffWithoutError(t, updateTime) require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") assert.GreaterOrEqual(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") @@ -384,15 +450,15 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "embedded collector invalid config", - exec: &mockExecution{exec: newExecutionEmbedded()}, + exec: &testExecution{exec: newExecutionEmbedded()}, restarter: newRestarterNoop(), skipListeningErrors: true, - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { // Errors channel is non-blocking, should be able to send an Update that causes an error multiple // times without it blocking on sending over the errCh. 
for range 3 { cfg := confmap.New() // invalid config - m.Update(cfg) + m.Update(cfg, nil) // delay between updates to ensure the collector will have to fail <-time.After(100 * time.Millisecond) @@ -423,15 +489,15 @@ func TestOTelManager_Run(t *testing.T) { }, { name: "subprocess collector invalid config", - exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + exec: &testExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), skipListeningErrors: true, - testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution) { // Errors channel is non-blocking, should be able to send an Update that causes an error multiple // times without it blocking on sending over the errCh. for range 3 { cfg := confmap.New() // invalid config - m.Update(cfg) + m.Update(cfg, nil) // delay between updates to ensure the collector will have to fail <-time.After(100 * time.Millisecond) @@ -464,18 +530,18 @@ func TestOTelManager_Run(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() l, _ := loggertest.New("otel") - base, _ := loggertest.New("otel") + base, obs := loggertest.New("otel") m := &OTelManager{ - logger: l, - baseLogger: base, - errCh: make(chan error, 1), // holds at most one error - cfgCh: make(chan *confmap.Conf), - statusCh: make(chan *status.AggregateStatus), - doneChan: make(chan struct{}), - recoveryTimer: tc.restarter, - execution: tc.exec, + logger: l, + baseLogger: base, + errCh: make(chan error, 1), // holds at most one error + updateCh: make(chan configUpdate), + collectorStatusCh: make(chan *status.AggregateStatus), + componentStateCh: make(chan []runtime.ComponentComponentState, 1), + doneChan: make(chan struct{}), + recoveryTimer: tc.restarter, + execution: tc.exec, } eListener := &EventListener{} @@ -485,6 +551,9 @@ func TestOTelManager_Run(t *testing.T) { } t.Logf("latest received err: %s", eListener.getError()) t.Logf("latest received status: %s", statusToYaml(eListener.getStatus())) + for _, entry := range obs.All() { + t.Logf("%+v", entry) + } }() runWg := sync.WaitGroup{} @@ -492,9 +561,9 @@ func TestOTelManager_Run(t *testing.T) { go func() { defer runWg.Done() if !tc.skipListeningErrors { - eListener.Listen(ctx, m.Errors(), m.Watch()) + eListener.Listen(ctx, m.Errors(), m.WatchCollector()) } else { - eListener.Listen(ctx, nil, m.Watch()) + eListener.Listen(ctx, nil, m.WatchCollector()) } }() @@ -521,7 +590,7 @@ func TestOTelManager_Logging(t *testing.T) { defer cancel() base, obs := loggertest.New("otel") l, _ := loggertest.New("otel-manager") - m, err := NewOTelManager(l, logp.DebugLevel, base, EmbeddedExecutionMode) + m, err := NewOTelManager(l, logp.DebugLevel, base, EmbeddedExecutionMode, nil, nil) require.NoError(t, err, "could not create otel manager") go func() { @@ -533,7 +602,7 @@ func TestOTelManager_Logging(t *testing.T) { go func() { for { select { - case <-m.Watch(): + case <-m.WatchCollector(): case <-ctx.Done(): return } @@ -541,7 +610,7 @@ func TestOTelManager_Logging(t *testing.T) { }() cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg) + m.Update(cfg, nil) // the collector should log to the base logger assert.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -585,3 +654,712 @@ func toSerializableStatus(s *status.AggregateStatus) *serializableStatus { } return
outputStruct } + +// Mock function for BeatMonitoringConfigGetter +func mockBeatMonitoringConfigGetter(unitID, binary string) map[string]any { + return map[string]any{"test": "config"} +} + +// Helper function to create test logger +func newTestLogger() *logger.Logger { + l, _ := loggertest.New("test") + return l +} + +func TestOTelManager_buildMergedConfig(t *testing.T) { + // Common parameters used across all test cases + commonAgentInfo := &info.AgentInfo{} + commonBeatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + testComp := testComponent("test-component") + + tests := []struct { + name string + collectorCfg *confmap.Conf + components []component.Component + expectedKeys []string + expectedErrorString string + }{ + { + name: "nil config returns nil", + collectorCfg: nil, + components: nil, + }, + { + name: "empty config returns empty config", + collectorCfg: nil, + components: nil, + expectedKeys: []string{}, + }, + { + name: "collector config only", + collectorCfg: confmap.NewFromStringMap(map[string]any{"receivers": map[string]any{"nop": map[string]any{}}}), + components: nil, + expectedKeys: []string{"receivers"}, + }, + { + name: "components only", + collectorCfg: nil, + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service"}, + }, + { + name: "both collector config and components", + collectorCfg: confmap.NewFromStringMap(map[string]any{"processors": map[string]any{"batch": map[string]any{}}}), + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + }, + { + name: "component config generation error", + collectorCfg: nil, + components: []component.Component{{ + ID: "test-component", + InputType: "filestream", // Supported input type + OutputType: "elasticsearch", // Supported output type + // Missing InputSpec which should cause an error during config generation + }}, + expectedErrorString: "failed to generate otel config: unknown otel receiver type for input type: filestream", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + collectorCfg: tt.collectorCfg, + components: tt.components, + agentInfo: commonAgentInfo, + beatMonitoringConfigGetter: commonBeatMonitoringConfigGetter, + } + + result, err := mgr.buildMergedConfig() + + if tt.expectedErrorString != "" { + assert.Error(t, err) + assert.Equal(t, tt.expectedErrorString, err.Error()) + assert.Nil(t, result) + return + } + + assert.NoError(t, err) + + if len(tt.expectedKeys) == 0 { + assert.Nil(t, result) + return + } + + require.NotNil(t, result) + for _, key := range tt.expectedKeys { + assert.True(t, result.IsSet(key), "Expected key %s to be set", key) + } + }) + } +} + +func TestOTelManager_handleConfigUpdate(t *testing.T) { + testComp := testComponent("test-component") + t.Run("successful update with empty collector config and components", func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + agentInfo: &info.AgentInfo{}, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + } + + err := mgr.handleConfigUpdate(nil, nil) + + assert.NoError(t, err) + assert.Nil(t, mgr.components) + // Verify that Update was called with nil config (empty components should result in nil config) + assert.Nil(t, mgr.mergedCollectorCfg) + }) + + t.Run("successful update with components and empty collector config", func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + agentInfo: 
&info.AgentInfo{}, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + } + + components := []component.Component{testComp} + err := mgr.handleConfigUpdate(nil, components) + + assert.NoError(t, err) + assert.Equal(t, components, mgr.components) + // Verify that Update was called with a valid configuration + assert.NotNil(t, mgr.mergedCollectorCfg) + // Verify that the configuration contains expected OpenTelemetry sections + assert.True(t, mgr.mergedCollectorCfg.IsSet("receivers"), "Expected receivers section in config") + assert.True(t, mgr.mergedCollectorCfg.IsSet("exporters"), "Expected exporters section in config") + assert.True(t, mgr.mergedCollectorCfg.IsSet("service"), "Expected service section in config") + }) + + t.Run("successful update with empty components and collector config", func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + agentInfo: &info.AgentInfo{}, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + } + + collectorConfig := confmap.NewFromStringMap(map[string]any{ + "receivers": map[string]any{ + "nop": map[string]any{}, + }, + "processors": map[string]any{ + "batch": map[string]any{}, + }, + }) + + err := mgr.handleConfigUpdate(collectorConfig, nil) + + assert.NoError(t, err) + assert.Equal(t, collectorConfig, mgr.collectorCfg) + assert.Equal(t, collectorConfig, mgr.MergedOtelConfig()) + // Verify that Update was called with the collector configuration + assert.NotNil(t, mgr.mergedCollectorCfg) + // Verify that the configuration contains expected collector sections + assert.True(t, mgr.mergedCollectorCfg.IsSet("receivers"), "Expected receivers section in config") + assert.True(t, mgr.mergedCollectorCfg.IsSet("processors"), "Expected processors section in config") + }) + + t.Run("successful update with both collector config and components", func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + agentInfo: &info.AgentInfo{}, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + } + components := []component.Component{testComponent("test-component")} + + collectorConfig := confmap.NewFromStringMap(map[string]any{ + "processors": map[string]any{ + "batch": map[string]any{}, + }, + }) + + err := mgr.handleConfigUpdate(collectorConfig, components) + + assert.NoError(t, err) + assert.Equal(t, collectorConfig, mgr.collectorCfg) + // Verify that the configuration contains both collector and component sections + assert.True(t, mgr.mergedCollectorCfg.IsSet("receivers"), "Expected receivers section from components") + assert.True(t, mgr.mergedCollectorCfg.IsSet("exporters"), "Expected exporters section from components") + assert.True(t, mgr.mergedCollectorCfg.IsSet("service"), "Expected service section from components") + assert.True(t, mgr.mergedCollectorCfg.IsSet("processors"), "Expected processors section from collector config") + }) +} + +func TestOTelManager_handleOtelStatusUpdate(t *testing.T) { + // Common test component used across test cases + testComp := testComponent("test-component") + + tests := []struct { + name string + components []component.Component + inputStatus *status.AggregateStatus + expectedErrorString string + expectedCollectorStatus *status.AggregateStatus + expectedComponentStates []runtime.ComponentComponentState + }{ + { + name: "successful status update with component states", + components: []component.Component{testComp}, + inputStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: 
map[string]*status.AggregateStatus{ + // This represents a pipeline for our component (with OtelNamePrefix) + "pipeline:logs/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "receiver:filebeat/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "exporter:elasticsearch/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + // This represents a regular collector pipeline (should remain after cleaning) + "pipeline:logs": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + expectedCollectorStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a regular collector pipeline (should remain after cleaning) + "pipeline:logs": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + expectedComponentStates: []runtime.ComponentComponentState{ + { + Component: testComp, + State: runtime.ComponentState{ + State: client.UnitStateHealthy, + Message: "HEALTHY", + Units: map[runtime.ComponentUnitKey]runtime.ComponentUnitState{ + runtime.ComponentUnitKey{ + UnitID: "filestream-unit", + UnitType: client.UnitTypeInput, + }: { + State: client.UnitStateHealthy, + Message: "Healthy", + Payload: map[string]any{ + "streams": map[string]map[string]string{ + "test-1": { + "error": "", + "status": client.UnitStateHealthy.String(), + }, + "test-2": { + "error": "", + "status": client.UnitStateHealthy.String(), + }, + }, + }, + }, + runtime.ComponentUnitKey{ + UnitID: "filestream-default", + UnitType: client.UnitTypeOutput, + }: { + State: client.UnitStateHealthy, + Message: "Healthy", + }, + }, + VersionInfo: runtime.ComponentVersionInfo{ + Name: translate.OtelComponentName, + Meta: map[string]string{ + "build_time": version.BuildTime().String(), + "commit": version.Commit(), + }, + BuildHash: version.Commit(), + }, + }, + }, + }, + }, + { + name: "handles nil otel status", + components: []component.Component{}, + inputStatus: nil, + expectedCollectorStatus: nil, + expectedComponentStates: nil, + }, + { + name: "handles empty components list", + components: []component.Component{}, + inputStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + expectedErrorString: "", + expectedCollectorStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + expectedComponentStates: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + components: tt.components, + currentComponentStates: make(map[string]runtime.ComponentComponentState), + } + + componentStates, err := mgr.handleOtelStatusUpdate(tt.inputStatus) + + // Verify error expectation + if tt.expectedErrorString != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedErrorString) + return + } + + require.NoError(t, err) + + // Compare component states + assert.Equal(t, tt.expectedComponentStates, componentStates) + + // Compare collector status + assertOtelStatusesEqualIgnoringTimestamps(t, tt.expectedCollectorStatus, mgr.currentCollectorStatus) + }) + } +} + +func TestOTelManager_processComponentStates(t *testing.T) { + tests := []struct { + name string + currentComponentStates 
map[string]runtime.ComponentComponentState + inputComponentStates []runtime.ComponentComponentState + expectedOutputStates []runtime.ComponentComponentState + expectedCurrentStatesAfter map[string]runtime.ComponentComponentState + }{ + { + name: "empty input and current states", + currentComponentStates: map[string]runtime.ComponentComponentState{}, + inputComponentStates: []runtime.ComponentComponentState{}, + expectedOutputStates: []runtime.ComponentComponentState{}, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + { + name: "new component state added", + currentComponentStates: map[string]runtime.ComponentComponentState{}, + inputComponentStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + }, + { + name: "component removed from config generates STOPPED state", + currentComponentStates: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + inputComponentStates: []runtime.ComponentComponentState{}, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + { + name: "component stopped removes from current states", + currentComponentStates: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + inputComponentStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + currentComponentStates: tt.currentComponentStates, + } + + result := mgr.processComponentStates(tt.inputComponentStates) + + assert.ElementsMatch(t, tt.expectedOutputStates, result) + assert.Equal(t, tt.expectedCurrentStatesAfter, mgr.currentComponentStates) + }) + } +} + +// TestOTelManagerEndToEnd tests the full lifecycle of the OTelManager +// including configuration updates, status updates, and error handling. 
+func TestOTelManagerEndToEnd(t *testing.T) { + // Setup test logger and dependencies + testLogger, _ := loggertest.New("test") + agentInfo := &info.AgentInfo{} + beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + collectorStarted := make(chan struct{}) + + execution := &mockExecution{ + collectorStarted: collectorStarted, + } + + // Create manager with test dependencies + mgr := OTelManager{ + logger: testLogger, + baseLogger: testLogger, + errCh: make(chan error, 1), // holds at most one error + updateCh: make(chan configUpdate), + collectorStatusCh: make(chan *status.AggregateStatus, 1), + componentStateCh: make(chan []runtime.ComponentComponentState, 1), + doneChan: make(chan struct{}), + recoveryTimer: newRestarterNoop(), + execution: execution, + agentInfo: agentInfo, + beatMonitoringConfigGetter: beatMonitoringConfigGetter, + } + + // Start manager in a goroutine + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1) + defer cancel() + + go func() { + err := mgr.Run(ctx) + assert.ErrorIs(t, err, context.Canceled) + }() + + collectorCfg := confmap.NewFromStringMap(map[string]interface{}{ + "receivers": map[string]interface{}{ + "nop": map[string]interface{}{}, + }, + "exporters": map[string]interface{}{"nop": map[string]interface{}{}}, + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "metrics": map[string]interface{}{ + "receivers": []string{"nop"}, + "exporters": []string{"nop"}, + }, + }, + }, + }) + + testComp := testComponent("test") + components := []component.Component{testComp} + + t.Run("collector config is passed down to the collector execution", func(t *testing.T) { + mgr.Update(collectorCfg, nil) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + assert.Equal(t, collectorCfg, execution.cfg) + + }) + + t.Run("collector status is passed up to the component manager", func(t *testing.T) { + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: + } + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + require.NoError(t, err) + assert.Equal(t, otelStatus, collectorStatus) + }) + + t.Run("component config is passed down to the otel manager", func(t *testing.T) { + mgr.Update(collectorCfg, components) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + cfg := execution.cfg + require.NotNil(t, cfg) + receivers, err := cfg.Sub("receivers") + require.NoError(t, err) + require.NotNil(t, receivers) + assert.True(t, receivers.IsSet("nop")) + assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + assert.Nil(t, err) + assert.Nil(t, collectorStatus) + }) + + t.Run("empty collector config leaves the component config running", func(t *testing.T) { + mgr.Update(nil, components) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + cfg := execution.cfg + require.NotNil(t, cfg) + receivers, err := cfg.Sub("receivers") + require.NoError(t, err) + require.NotNil(t, receivers) + assert.False(t, receivers.IsSet("nop")) + assert.True(t, 
receivers.IsSet("filebeatreceiver/_agent-component/test")) + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + assert.Nil(t, err) + assert.Nil(t, collectorStatus) + }) + + t.Run("collector status with components is passed up to the component manager", func(t *testing.T) { + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a pipeline for our component (with OtelNamePrefix) + "pipeline:logs/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "receiver:filebeatreceiver/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "exporter:elasticsearch/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + }, + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: + } + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, collectorStatus) + assert.Len(t, collectorStatus.ComponentStatusMap, 0) + + componentState, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, componentState) + require.Len(t, componentState, 1) + assert.Equal(t, componentState[0].Component, testComp) + }) + + t.Run("collector error is passed up to the component manager", func(t *testing.T) { + collectorErr := errors.New("collector error") + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.errCh <- collectorErr: + } + + // we should get a nil status and an error + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case s := <-mgr.WatchCollector(): + assert.Nil(t, s) + } + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case err := <-mgr.Errors(): + assert.Equal(t, collectorErr, err) + } + }) +} + +func testComponent(componentId string) component.Component { + fileStreamConfig := map[string]any{ + "id": "test", + "use_output": "default", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + filepath.Join(paths.TempDir(), "nonexistent.log"), + }, + }, + map[string]any{ + "id": "test-2", + "data_stream": map[string]any{ + "dataset": "generic-2", + }, + "paths": []any{ + filepath.Join(paths.TempDir(), "nonexistent.log"), + }, + }, + }, + } + + esOutputConfig := map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "username": "elastic", + "password": "password", + "preset": "balanced", + "queue.mem.events": 3200, + } + + return component.Component{ + ID: componentId, + RuntimeManager: component.OtelRuntimeManager, + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: 
component.MustExpectedConfig(esOutputConfig), + }, + }, + } +} + +func getFromChannelOrErrorWithContext[T any](t *testing.T, ctx context.Context, ch <-chan T, errCh <-chan error) (T, error) { + t.Helper() + var result T + var err error + for err == nil { + select { + case result = <-ch: + return result, nil + case err = <-errCh: + case <-ctx.Done(): + err = ctx.Err() + } + } + return result, err +} + +func assertOtelStatusesEqualIgnoringTimestamps(t require.TestingT, a, b *status.AggregateStatus) bool { + if a == nil || b == nil { + return assert.Equal(t, a, b) + } + + if !assert.Equal(t, a.Status(), b.Status()) { + return false + } + + if !assert.Equal(t, len(a.ComponentStatusMap), len(b.ComponentStatusMap)) { + return false + } + + for k, v := range a.ComponentStatusMap { + if !assertOtelStatusesEqualIgnoringTimestamps(t, v, b.ComponentStatusMap[k]) { + return false + } + } + + return true +}
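
A quick usage sketch to accompany the tests above: the reworked manager API is driven by constructing the manager, running it, pushing a plain collector config (and optionally a component list) via Update, and consuming the WatchCollector, WatchComponents, and Errors channels. The function below is hypothetical (runManagerSketch is not part of this change); it assumes it would compile inside the same otelmanager package as these tests and reuses identifiers already present in this file (testConfig, loggertest, logp, confmap, EmbeddedExecutionMode). The nil agent info and monitoring-config getter mirror the arguments used in TestOTelManager_Logging.

// runManagerSketch is an illustrative driver loop, not part of this change.
func runManagerSketch(ctx context.Context) error {
	l, _ := loggertest.New("otel-manager") // the manager's own logger
	base, _ := loggertest.New("otel")      // base logger the collector logs through

	// nil agent info and monitoring-config getter, as in TestOTelManager_Logging.
	m, err := NewOTelManager(l, logp.DebugLevel, base, EmbeddedExecutionMode, nil, nil)
	if err != nil {
		return err
	}

	// Run blocks until the context is cancelled.
	go func() { _ = m.Run(ctx) }()

	// Push a plain collector config with no components; passing a
	// []component.Component as the second argument merges component-derived
	// pipelines into the same collector configuration.
	m.Update(confmap.NewFromStringMap(testConfig), nil)

	for {
		select {
		case st := <-m.WatchCollector():
			_ = st // aggregate collector status; a nil status is sent when the collector reports an error
		case states := <-m.WatchComponents():
			_ = states // per-component states derived from the collector status
		case err := <-m.Errors():
			if err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Update is asynchronous, which is why the end-to-end test above synchronizes on the mock execution's collectorStarted channel before asserting on the configuration the collector actually received.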