Skip to content

Move beat receiver component logic to the otel manager #8737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ fleet.yml.lock
fleet.yml.old
pkg/component/fake/component/component
internal/pkg/agent/install/testblocking/testblocking
internal/pkg/otel/manager/testing/testing
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func New(
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode)
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
}
Expand Down
158 changes: 54 additions & 104 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"

"go.opentelemetry.io/collector/component/componentstatus"

Expand Down Expand Up @@ -149,15 +148,22 @@ type RuntimeManager interface {
PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error)
}

// OTelManager provides an interface to run and update the runtime.
// OTelManager provides an interface to run components and plain otel configurations in an otel collector.
type OTelManager interface {
Runner

// Update updates the current configuration for OTel.
Update(cfg *confmap.Conf)
// Update updates the current plain configuration for the otel collector and components.
Update(*confmap.Conf, []component.Component)

// Watch returns the chanel to watch for configuration changes.
Watch() <-chan *status.AggregateStatus
// WatchCollector returns a channel to watch for collector status updates.
WatchCollector() <-chan *status.AggregateStatus

// WatchComponents returns a channel to watch for component state updates.
WatchComponents() <-chan []runtime.ComponentComponentState

// MergedOtelConfig returns the merged Otel collector configuration, containing both the plain config and the
// component config.
MergedOtelConfig() *confmap.Conf
}

// ConfigChange provides an interface for receiving a new configuration.
Expand Down Expand Up @@ -240,8 +246,6 @@ type Coordinator struct {

otelMgr OTelManager
otelCfg *confmap.Conf
// the final config sent to the manager, contains both config from hybrid mode and from components
finalOtelCfg *confmap.Conf

caps capabilities.Capabilities
modifiers []ComponentsModifier
Expand Down Expand Up @@ -364,8 +368,9 @@ type managerChans struct {
varsManagerUpdate <-chan []*transpiler.Vars
varsManagerError <-chan error

otelManagerUpdate chan *status.AggregateStatus
otelManagerError <-chan error
otelManagerCollectorUpdate <-chan *status.AggregateStatus
otelManagerComponentUpdate <-chan []runtime.ComponentComponentState
otelManagerError <-chan error

upgradeMarkerUpdate <-chan upgrade.UpdateMarker
}
Expand Down Expand Up @@ -393,7 +398,24 @@ type UpdateComponentChange struct {
}

// New creates a new coordinator.
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, otelMgr OTelManager, fleetAcker acker.Acker, modifiers ...ComponentsModifier) *Coordinator {
func New(
logger *logger.Logger,
cfg *configuration.Configuration,
logLevel logp.Level,
agentInfo info.Agent,
specs component.RuntimeSpecs,
reexecMgr ReExecManager,
upgradeMgr UpgradeManager,
runtimeMgr RuntimeManager,
configMgr ConfigManager,
varsMgr VarsManager,
caps capabilities.Capabilities,
monitorMgr MonitorManager,
isManaged bool,
otelMgr OTelManager,
fleetAcker acker.Acker,
modifiers ...ComponentsModifier,
) *Coordinator {
var fleetState cproto.State
var fleetMessage string
if !isManaged {
Expand Down Expand Up @@ -480,7 +502,8 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
if otelMgr != nil {
// The otel manager sends updates to the watchRuntimeComponents function, which extracts component status
// and forwards the rest to this channel.
c.managerChans.otelManagerUpdate = make(chan *status.AggregateStatus)
c.managerChans.otelManagerCollectorUpdate = otelMgr.WatchCollector()
c.managerChans.otelManagerComponentUpdate = otelMgr.WatchComponents()
c.managerChans.otelManagerError = otelMgr.Errors()
}
if upgradeMgr != nil && upgradeMgr.MarkerWatcher() != nil {
Expand Down Expand Up @@ -774,70 +797,29 @@ func (c *Coordinator) SetLogLevel(ctx context.Context, lvl *logp.Level) error {
func (c *Coordinator) watchRuntimeComponents(
ctx context.Context,
runtimeComponentStates <-chan runtime.ComponentComponentState,
otelStatuses <-chan *status.AggregateStatus,
otelComponentStates <-chan []runtime.ComponentComponentState,
) {
// We need to track otel component state separately because otel components may not always get a STOPPED status
// If we receive an otel status without the state of a component we're tracking, we need to emit a fake STOPPED
// status for it. Process component states should not be affected by this logic.
processState := make(map[string]runtime.ComponentState)
otelState := make(map[string]runtime.ComponentState)
state := make(map[string]runtime.ComponentState)

for {
select {
case <-ctx.Done():
return
case componentState := <-runtimeComponentStates:
logComponentStateChange(c.logger, processState, &componentState)
logComponentStateChange(c.logger, state, &componentState)
// Forward the final changes back to Coordinator, unless our context
// has ended.
select {
case c.managerChans.runtimeManagerUpdate <- componentState:
case <-ctx.Done():
return
}
case otelStatus := <-otelStatuses:
// We don't break on errors here, because we want to forward the status
// even if there was an error, and the rest of the code gracefully handles componentStates being nil
componentStates, err := translate.GetAllComponentStates(otelStatus, c.componentModel)
if err != nil {
c.setOTelError(err)
}
finalOtelStatus, err := translate.DropComponentStateFromOtelStatus(otelStatus)
if err != nil {
c.setOTelError(err)
finalOtelStatus = otelStatus
}

// forward the remaining otel status
// TODO: Implement subscriptions for otel manager status to avoid the need for this
select {
case c.managerChans.otelManagerUpdate <- finalOtelStatus:
case <-ctx.Done():
return
}

// drop component states which don't exist in the configuration anymore
// we need to do this because we aren't guaranteed to receive a STOPPED state when the component is removed
componentIds := make(map[string]bool)
for _, componentState := range componentStates {
componentIds[componentState.Component.ID] = true
}
for id := range otelState {
if _, ok := componentIds[id]; !ok {
// this component is not in the configuration anymore, emit a fake STOPPED state
componentStates = append(componentStates, runtime.ComponentComponentState{
Component: component.Component{
ID: id,
},
State: runtime.ComponentState{
State: client.UnitStateStopped,
},
})
}
}
// now handle the component states
case componentStates := <-otelComponentStates:
for _, componentState := range componentStates {
logComponentStateChange(c.logger, otelState, &componentState)
logComponentStateChange(c.logger, state, &componentState)
// Forward the final changes back to Coordinator, unless our context
// has ended.
select {
Expand Down Expand Up @@ -930,15 +912,15 @@ func (c *Coordinator) Run(ctx context.Context) error {
defer watchCanceller()

var subChan <-chan runtime.ComponentComponentState
var otelChan <-chan *status.AggregateStatus
var otelChan <-chan []runtime.ComponentComponentState
// A real Coordinator will always have a runtime manager, but unit tests
// may not initialize all managers -- in that case we leave subChan nil,
// and just idle until Coordinator shuts down.
if c.runtimeMgr != nil {
subChan = c.runtimeMgr.SubscribeAll(ctx).Ch()
}
if c.otelMgr != nil {
otelChan = c.otelMgr.Watch()
otelChan = c.otelMgr.WatchComponents()
}
go c.watchRuntimeComponents(watchCtx, subChan, otelChan)

Expand Down Expand Up @@ -1234,10 +1216,11 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.",
ContentType: "application/yaml",
Hook: func(_ context.Context) []byte {
if c.finalOtelCfg == nil {
mergedCfg := c.otelMgr.MergedOtelConfig()
if mergedCfg == nil {
return []byte("no active OTel configuration")
}
o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap())
o, err := yaml.Marshal(mergedCfg.ToStringMap())
if err != nil {
return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err))
}
Expand Down Expand Up @@ -1431,7 +1414,7 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
c.processVars(ctx, vars)
}

case collectorStatus := <-c.managerChans.otelManagerUpdate:
case collectorStatus := <-c.managerChans.otelManagerCollectorUpdate:
c.state.Collector = collectorStatus
c.stateNeedsRefresh = true

Expand Down Expand Up @@ -1649,58 +1632,25 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {

c.logger.Info("Updating running component model")
c.logger.With("components", model.Components).Debug("Updating running component model")
return c.updateManagersWithConfig(model)
c.updateManagersWithConfig(model)
return nil
}

// updateManagersWithConfig updates runtime managers with the component model and config.
// Components may be sent to different runtimes depending on various criteria.
func (c *Coordinator) updateManagersWithConfig(model *component.Model) error {
func (c *Coordinator) updateManagersWithConfig(model *component.Model) {
runtimeModel, otelModel := c.splitModelBetweenManagers(model)
c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model")
c.runtimeMgr.Update(*runtimeModel)
return c.updateOtelManagerConfig(otelModel)
}

// updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration
// from the component model passed in and from the hybrid-mode otel config set on the Coordinator.
func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
finalOtelCfg := confmap.New()
var componentOtelCfg *confmap.Conf
if len(model.Components) > 0 {
var err error
c.logger.With("components", model.Components).Debug("Updating otel manager model")
componentOtelCfg, err = translate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig)
if err != nil {
c.logger.Errorf("failed to generate otel config: %v", err)
}
componentIDs := make([]string, 0, len(model.Components))
for _, comp := range model.Components {
c.logger.With("components", otelModel.Components).Debug("Updating otel manager model")
if len(otelModel.Components) > 0 {
componentIDs := make([]string, 0, len(otelModel.Components))
for _, comp := range otelModel.Components {
componentIDs = append(componentIDs, comp.ID)
}
c.logger.With("component_ids", componentIDs).Warn("The Otel runtime manager is HIGHLY EXPERIMENTAL and only intended for testing. Use at your own risk.")
}
if componentOtelCfg != nil {
err := finalOtelCfg.Merge(componentOtelCfg)
if err != nil {
c.logger.Error("failed to merge otel config: %v", err)
}
}

if c.otelCfg != nil {
err := finalOtelCfg.Merge(c.otelCfg)
if err != nil {
c.logger.Error("failed to merge otel config: %v", err)
}
}

if len(finalOtelCfg.AllKeys()) == 0 {
// if the config is empty, we want to send nil to the manager, so it knows to stop the collector
finalOtelCfg = nil
}

c.otelMgr.Update(finalOtelCfg)
c.finalOtelCfg = finalOtelCfg
return nil
c.otelMgr.Update(c.otelCfg, otelModel.Components)
}

// splitModelBetweenManager splits the model components between the runtime manager and the otel manager.
Expand Down
47 changes: 31 additions & 16 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,9 +1078,8 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt
cfg.Port = 0
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg)
require.NoError(t, err)
otelMgr, err := otelmanager.NewOTelManager(l, logp.InfoLevel, l, otelmanager.EmbeddedExecutionMode)
otelMgr, err := otelmanager.NewOTelManager(l, logp.InfoLevel, l, otelmanager.EmbeddedExecutionMode, ai, monitoringMgr.ComponentMonitoringConfig)
require.NoError(t, err)

caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
require.NoError(t, err)

Expand Down Expand Up @@ -1339,11 +1338,14 @@ func (f *fakeVarsManager) DefaultProvider() string {
return ""
}

var _ OTelManager = (*fakeOTelManager)(nil)

type fakeOTelManager struct {
updateCallback func(*confmap.Conf) error
result error
errChan chan error
statusChan chan *status.AggregateStatus
updateCollectorCallback func(*confmap.Conf) error
updateComponentCallback func([]component.Component) error
errChan chan error
collectorStatusChan chan *status.AggregateStatus
componentStateChan chan []runtime.ComponentComponentState
}

func (f *fakeOTelManager) Run(ctx context.Context) error {
Expand All @@ -1352,24 +1354,37 @@ func (f *fakeOTelManager) Run(ctx context.Context) error {
}

func (f *fakeOTelManager) Errors() <-chan error {
return nil
return f.errChan
}

func (f *fakeOTelManager) Update(cfg *confmap.Conf) {
f.result = nil
if f.updateCallback != nil {
f.result = f.updateCallback(cfg)
func (f *fakeOTelManager) Update(cfg *confmap.Conf, components []component.Component) {
var collectorResult, componentResult error
if f.updateCollectorCallback != nil {
collectorResult = f.updateCollectorCallback(cfg)
}
if f.errChan != nil {
// If a reporting channel is set, send the result to it
f.errChan <- f.result
if f.errChan != nil && collectorResult != nil {
// If a reporting channel is set, send the collectorResult to it
f.errChan <- collectorResult
}
if f.updateComponentCallback != nil {
componentResult = f.updateComponentCallback(components)
}
if f.errChan != nil && componentResult != nil {
// If a reporting channel is set, send the componentResult to it
f.errChan <- componentResult
}
}

func (f *fakeOTelManager) Watch() <-chan *status.AggregateStatus {
return f.statusChan
func (f *fakeOTelManager) WatchCollector() <-chan *status.AggregateStatus {
return f.collectorStatusChan
}

func (f *fakeOTelManager) WatchComponents() <-chan []runtime.ComponentComponentState {
return f.componentStateChan
}

func (f *fakeOTelManager) MergedOtelConfig() *confmap.Conf { return nil }

// An implementation of the RuntimeManager interface for use in testing.
type fakeRuntimeManager struct {
state []runtime.ComponentComponentState
Expand Down
Loading