Skip to content

Commit 86c9bb8

Browse files
swiatekm and mergify[bot]
authored and committed
Move beat receiver component logic to the otel manager (#8737)
* Add initial otel component manager implementation
* Update coordinator to use the new manager
* Move logging to the coordinator
* Add more tests
* Don't use a real otel manager in tests
* Move the logic to the otel manager
* Ignore the test collector binary
* Rename some dangling attributes back
* Comment out temporarily unused code
* Restore manager e2e test
* Fix import order
* Write synthetic status updates directly into the external channel
* Update collector config and components in one call
* Rename the mutex in the otel manager
* Discard intermediate statuses
* Emit component updates in a single batch
* Undo timeout increase in test

(cherry picked from commit 503421f)

# Conflicts:
#   internal/pkg/agent/application/coordinator/coordinator.go
#   internal/pkg/agent/application/coordinator/coordinator_unit_test.go
1 parent 4a0b3fb commit 86c9bb8

File tree

9 files changed

+1308
-376
lines changed

9 files changed

+1308
-376
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,4 @@ fleet.yml.lock
6060
fleet.yml.old
6161
pkg/component/fake/component/component
6262
internal/pkg/agent/install/testblocking/testblocking
63+
internal/pkg/otel/manager/testing/testing

internal/pkg/agent/application/application.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func New(
224224
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
225225
}
226226

227-
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode)
227+
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig)
228228
if err != nil {
229229
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
230230
}

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 58 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
<<<<<<< HEAD
1617
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
18+
=======
19+
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
20+
>>>>>>> 503421fc8 (Move beat receiver component logic to the otel manager (#8737))
1721

1822
"go.opentelemetry.io/collector/component/componentstatus"
1923

@@ -134,15 +138,22 @@ type RuntimeManager interface {
134138
PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error)
135139
}
136140

137-
// OTelManager provides an interface to run and update the runtime.
141+
// OTelManager provides an interface to run components and plain otel configurations in an otel collector.
138142
type OTelManager interface {
139143
Runner
140144

141-
// Update updates the current configuration for OTel.
142-
Update(cfg *confmap.Conf)
145+
// Update updates the current plain configuration for the otel collector and components.
146+
Update(*confmap.Conf, []component.Component)
143147

144-
// Watch returns the chanel to watch for configuration changes.
145-
Watch() <-chan *status.AggregateStatus
148+
// WatchCollector returns a channel to watch for collector status updates.
149+
WatchCollector() <-chan *status.AggregateStatus
150+
151+
// WatchComponents returns a channel to watch for component state updates.
152+
WatchComponents() <-chan []runtime.ComponentComponentState
153+
154+
// MergedOtelConfig returns the merged Otel collector configuration, containing both the plain config and the
155+
// component config.
156+
MergedOtelConfig() *confmap.Conf
146157
}
147158

148159
// ConfigChange provides an interface for receiving a new configuration.
@@ -225,8 +236,6 @@ type Coordinator struct {
225236

226237
otelMgr OTelManager
227238
otelCfg *confmap.Conf
228-
// the final config sent to the manager, contains both config from hybrid mode and from components
229-
finalOtelCfg *confmap.Conf
230239

231240
caps capabilities.Capabilities
232241
modifiers []ComponentsModifier
@@ -349,8 +358,9 @@ type managerChans struct {
349358
varsManagerUpdate <-chan []*transpiler.Vars
350359
varsManagerError <-chan error
351360

352-
otelManagerUpdate chan *status.AggregateStatus
353-
otelManagerError <-chan error
361+
otelManagerCollectorUpdate <-chan *status.AggregateStatus
362+
otelManagerComponentUpdate <-chan []runtime.ComponentComponentState
363+
otelManagerError <-chan error
354364

355365
upgradeMarkerUpdate <-chan upgrade.UpdateMarker
356366
}
@@ -378,7 +388,24 @@ type UpdateComponentChange struct {
378388
}
379389

380390
// New creates a new coordinator.
381-
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, otelMgr OTelManager, fleetAcker acker.Acker, modifiers ...ComponentsModifier) *Coordinator {
391+
func New(
392+
logger *logger.Logger,
393+
cfg *configuration.Configuration,
394+
logLevel logp.Level,
395+
agentInfo info.Agent,
396+
specs component.RuntimeSpecs,
397+
reexecMgr ReExecManager,
398+
upgradeMgr UpgradeManager,
399+
runtimeMgr RuntimeManager,
400+
configMgr ConfigManager,
401+
varsMgr VarsManager,
402+
caps capabilities.Capabilities,
403+
monitorMgr MonitorManager,
404+
isManaged bool,
405+
otelMgr OTelManager,
406+
fleetAcker acker.Acker,
407+
modifiers ...ComponentsModifier,
408+
) *Coordinator {
382409
var fleetState cproto.State
383410
var fleetMessage string
384411
if !isManaged {
@@ -465,7 +492,8 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
465492
if otelMgr != nil {
466493
// The otel manager exposes collector status updates and component state updates on two separate channels;
467494
// the collector status channel is consumed directly, without being filtered by watchRuntimeComponents.
468-
c.managerChans.otelManagerUpdate = make(chan *status.AggregateStatus)
495+
c.managerChans.otelManagerCollectorUpdate = otelMgr.WatchCollector()
496+
c.managerChans.otelManagerComponentUpdate = otelMgr.WatchComponents()
469497
c.managerChans.otelManagerError = otelMgr.Errors()
470498
}
471499
if upgradeMgr != nil && upgradeMgr.MarkerWatcher() != nil {
@@ -657,70 +685,29 @@ func (c *Coordinator) SetLogLevel(ctx context.Context, lvl *logp.Level) error {
657685
func (c *Coordinator) watchRuntimeComponents(
658686
ctx context.Context,
659687
runtimeComponentStates <-chan runtime.ComponentComponentState,
660-
otelStatuses <-chan *status.AggregateStatus,
688+
otelComponentStates <-chan []runtime.ComponentComponentState,
661689
) {
662690
// state is the per-component state map handed to logComponentStateChange for both runtime (process) and otel
663691
// component updates. Synthetic STOPPED states for otel components that disappeared from the configuration are
664692
// no longer generated in this function; that logic was moved to the otel manager.
665-
processState := make(map[string]runtime.ComponentState)
666-
otelState := make(map[string]runtime.ComponentState)
693+
state := make(map[string]runtime.ComponentState)
667694

668695
for {
669696
select {
670697
case <-ctx.Done():
671698
return
672699
case componentState := <-runtimeComponentStates:
673-
logComponentStateChange(c.logger, processState, &componentState)
700+
logComponentStateChange(c.logger, state, &componentState)
674701
// Forward the final changes back to Coordinator, unless our context
675702
// has ended.
676703
select {
677704
case c.managerChans.runtimeManagerUpdate <- componentState:
678705
case <-ctx.Done():
679706
return
680707
}
681-
case otelStatus := <-otelStatuses:
682-
// We don't break on errors here, because we want to forward the status
683-
// even if there was an error, and the rest of the code gracefully handles componentStates being nil
684-
componentStates, err := translate.GetAllComponentStates(otelStatus, c.componentModel)
685-
if err != nil {
686-
c.setOTelError(err)
687-
}
688-
finalOtelStatus, err := translate.DropComponentStateFromOtelStatus(otelStatus)
689-
if err != nil {
690-
c.setOTelError(err)
691-
finalOtelStatus = otelStatus
692-
}
693-
694-
// forward the remaining otel status
695-
// TODO: Implement subscriptions for otel manager status to avoid the need for this
696-
select {
697-
case c.managerChans.otelManagerUpdate <- finalOtelStatus:
698-
case <-ctx.Done():
699-
return
700-
}
701-
702-
// drop component states which don't exist in the configuration anymore
703-
// we need to do this because we aren't guaranteed to receive a STOPPED state when the component is removed
704-
componentIds := make(map[string]bool)
705-
for _, componentState := range componentStates {
706-
componentIds[componentState.Component.ID] = true
707-
}
708-
for id := range otelState {
709-
if _, ok := componentIds[id]; !ok {
710-
// this component is not in the configuration anymore, emit a fake STOPPED state
711-
componentStates = append(componentStates, runtime.ComponentComponentState{
712-
Component: component.Component{
713-
ID: id,
714-
},
715-
State: runtime.ComponentState{
716-
State: client.UnitStateStopped,
717-
},
718-
})
719-
}
720-
}
721-
// now handle the component states
708+
case componentStates := <-otelComponentStates:
722709
for _, componentState := range componentStates {
723-
logComponentStateChange(c.logger, otelState, &componentState)
710+
logComponentStateChange(c.logger, state, &componentState)
724711
// Forward the final changes back to Coordinator, unless our context
725712
// has ended.
726713
select {
@@ -813,15 +800,15 @@ func (c *Coordinator) Run(ctx context.Context) error {
813800
defer watchCanceller()
814801

815802
var subChan <-chan runtime.ComponentComponentState
816-
var otelChan <-chan *status.AggregateStatus
803+
var otelChan <-chan []runtime.ComponentComponentState
817804
// A real Coordinator will always have a runtime manager, but unit tests
818805
// may not initialize all managers -- in that case we leave subChan nil,
819806
// and just idle until Coordinator shuts down.
820807
if c.runtimeMgr != nil {
821808
subChan = c.runtimeMgr.SubscribeAll(ctx).Ch()
822809
}
823810
if c.otelMgr != nil {
824-
otelChan = c.otelMgr.Watch()
811+
otelChan = c.otelMgr.WatchComponents()
825812
}
826813
go c.watchRuntimeComponents(watchCtx, subChan, otelChan)
827814

@@ -1117,10 +1104,11 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
11171104
Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.",
11181105
ContentType: "application/yaml",
11191106
Hook: func(_ context.Context) []byte {
1120-
if c.finalOtelCfg == nil {
1107+
mergedCfg := c.otelMgr.MergedOtelConfig()
1108+
if mergedCfg == nil {
11211109
return []byte("no active OTel configuration")
11221110
}
1123-
o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap())
1111+
o, err := yaml.Marshal(mergedCfg.ToStringMap())
11241112
if err != nil {
11251113
return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err))
11261114
}
@@ -1314,7 +1302,7 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
13141302
c.processVars(ctx, vars)
13151303
}
13161304

1317-
case collectorStatus := <-c.managerChans.otelManagerUpdate:
1305+
case collectorStatus := <-c.managerChans.otelManagerCollectorUpdate:
13181306
c.state.Collector = collectorStatus
13191307
c.stateNeedsRefresh = true
13201308

@@ -1532,58 +1520,25 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) {
15321520

15331521
c.logger.Info("Updating running component model")
15341522
c.logger.With("components", model.Components).Debug("Updating running component model")
1535-
return c.updateManagersWithConfig(model)
1523+
c.updateManagersWithConfig(model)
1524+
return nil
15361525
}
15371526

15381527
// updateManagersWithConfig updates runtime managers with the component model and config.
15391528
// Components may be sent to different runtimes depending on various criteria.
1540-
func (c *Coordinator) updateManagersWithConfig(model *component.Model) error {
1529+
func (c *Coordinator) updateManagersWithConfig(model *component.Model) {
15411530
runtimeModel, otelModel := c.splitModelBetweenManagers(model)
15421531
c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model")
15431532
c.runtimeMgr.Update(*runtimeModel)
1544-
return c.updateOtelManagerConfig(otelModel)
1545-
}
1546-
1547-
// updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration
1548-
// from the component model passed in and from the hybrid-mode otel config set on the Coordinator.
1549-
func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error {
1550-
finalOtelCfg := confmap.New()
1551-
var componentOtelCfg *confmap.Conf
1552-
if len(model.Components) > 0 {
1553-
var err error
1554-
c.logger.With("components", model.Components).Debug("Updating otel manager model")
1555-
componentOtelCfg, err = translate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig)
1556-
if err != nil {
1557-
c.logger.Errorf("failed to generate otel config: %v", err)
1558-
}
1559-
componentIDs := make([]string, 0, len(model.Components))
1560-
for _, comp := range model.Components {
1533+
c.logger.With("components", otelModel.Components).Debug("Updating otel manager model")
1534+
if len(otelModel.Components) > 0 {
1535+
componentIDs := make([]string, 0, len(otelModel.Components))
1536+
for _, comp := range otelModel.Components {
15611537
componentIDs = append(componentIDs, comp.ID)
15621538
}
15631539
c.logger.With("component_ids", componentIDs).Warn("The Otel runtime manager is HIGHLY EXPERIMENTAL and only intended for testing. Use at your own risk.")
15641540
}
1565-
if componentOtelCfg != nil {
1566-
err := finalOtelCfg.Merge(componentOtelCfg)
1567-
if err != nil {
1568-
c.logger.Error("failed to merge otel config: %v", err)
1569-
}
1570-
}
1571-
1572-
if c.otelCfg != nil {
1573-
err := finalOtelCfg.Merge(c.otelCfg)
1574-
if err != nil {
1575-
c.logger.Error("failed to merge otel config: %v", err)
1576-
}
1577-
}
1578-
1579-
if len(finalOtelCfg.AllKeys()) == 0 {
1580-
// if the config is empty, we want to send nil to the manager, so it knows to stop the collector
1581-
finalOtelCfg = nil
1582-
}
1583-
1584-
c.otelMgr.Update(finalOtelCfg)
1585-
c.finalOtelCfg = finalOtelCfg
1586-
return nil
1541+
c.otelMgr.Update(c.otelCfg, otelModel.Components)
15871542
}
15881543

15891544
// splitModelBetweenManagers splits the model components between the runtime manager and the otel manager.

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,9 +1078,8 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt
10781078
cfg.Port = 0
10791079
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg)
10801080
require.NoError(t, err)
1081-
otelMgr, err := otelmanager.NewOTelManager(l, logp.InfoLevel, l, otelmanager.EmbeddedExecutionMode)
1081+
otelMgr, err := otelmanager.NewOTelManager(l, logp.InfoLevel, l, otelmanager.EmbeddedExecutionMode, ai, monitoringMgr.ComponentMonitoringConfig)
10821082
require.NoError(t, err)
1083-
10841083
caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
10851084
require.NoError(t, err)
10861085

@@ -1339,11 +1338,14 @@ func (f *fakeVarsManager) DefaultProvider() string {
13391338
return ""
13401339
}
13411340

1341+
var _ OTelManager = (*fakeOTelManager)(nil)
1342+
13421343
type fakeOTelManager struct {
1343-
updateCallback func(*confmap.Conf) error
1344-
result error
1345-
errChan chan error
1346-
statusChan chan *status.AggregateStatus
1344+
updateCollectorCallback func(*confmap.Conf) error
1345+
updateComponentCallback func([]component.Component) error
1346+
errChan chan error
1347+
collectorStatusChan chan *status.AggregateStatus
1348+
componentStateChan chan []runtime.ComponentComponentState
13471349
}
13481350

13491351
func (f *fakeOTelManager) Run(ctx context.Context) error {
@@ -1352,24 +1354,37 @@ func (f *fakeOTelManager) Run(ctx context.Context) error {
13521354
}
13531355

13541356
func (f *fakeOTelManager) Errors() <-chan error {
1355-
return nil
1357+
return f.errChan
13561358
}
13571359

1358-
func (f *fakeOTelManager) Update(cfg *confmap.Conf) {
1359-
f.result = nil
1360-
if f.updateCallback != nil {
1361-
f.result = f.updateCallback(cfg)
1360+
func (f *fakeOTelManager) Update(cfg *confmap.Conf, components []component.Component) {
1361+
var collectorResult, componentResult error
1362+
if f.updateCollectorCallback != nil {
1363+
collectorResult = f.updateCollectorCallback(cfg)
13621364
}
1363-
if f.errChan != nil {
1364-
// If a reporting channel is set, send the result to it
1365-
f.errChan <- f.result
1365+
if f.errChan != nil && collectorResult != nil {
1366+
// If a reporting channel is set, send the collectorResult to it
1367+
f.errChan <- collectorResult
1368+
}
1369+
if f.updateComponentCallback != nil {
1370+
componentResult = f.updateComponentCallback(components)
1371+
}
1372+
if f.errChan != nil && componentResult != nil {
1373+
// If a reporting channel is set, send the componentResult to it
1374+
f.errChan <- componentResult
13661375
}
13671376
}
13681377

1369-
func (f *fakeOTelManager) Watch() <-chan *status.AggregateStatus {
1370-
return f.statusChan
1378+
func (f *fakeOTelManager) WatchCollector() <-chan *status.AggregateStatus {
1379+
return f.collectorStatusChan
13711380
}
13721381

1382+
func (f *fakeOTelManager) WatchComponents() <-chan []runtime.ComponentComponentState {
1383+
return f.componentStateChan
1384+
}
1385+
1386+
func (f *fakeOTelManager) MergedOtelConfig() *confmap.Conf { return nil }
1387+
13731388
// An implementation of the RuntimeManager interface for use in testing.
13741389
type fakeRuntimeManager struct {
13751390
state []runtime.ComponentComponentState

0 commit comments

Comments
 (0)