diff --git a/analyzer/analyzer.go b/analyzer/analyzer.go new file mode 100644 index 0000000..f4b0fe5 --- /dev/null +++ b/analyzer/analyzer.go @@ -0,0 +1,78 @@ +package analyzer + +import ( + "time" + + "github.com/relex/slog-agent/base" +) + +type simpleAnalyzer struct { + tickTraffic tickTrafficInfo + highVolumeStart time.Time + analysisScheduled bool +} + +type tickTrafficInfo struct { + startTime time.Time + numRecords int + numBytes int64 +} + +func NewAnalyzer() base.LogAnalyzer { + return &simpleAnalyzer{ + tickTraffic: tickTrafficInfo{ + startTime: time.Now(), + numRecords: 0, + numBytes: 0, + }, + highVolumeStart: time.Time{}, + analysisScheduled: false, + } +} + +func (a *simpleAnalyzer) ShouldAnalyze(batch base.LogRecordBatch) bool { + // we cannot analyze any batch that is not full, as there would be too few samples. + return a.analysisScheduled && batch.Full +} + +func (a *simpleAnalyzer) TrackTraffic(numCleanRecords int, numCleanBytes int64) { + a.tickTraffic.numRecords += numCleanRecords + a.tickTraffic.numBytes += numCleanBytes +} + +func (a *simpleAnalyzer) Analyze(batch base.LogRecordBatch, numCleanRecords int, numCleanBytes int64) { + if float64(numCleanRecords)/float64(len(batch.Records)) < minimalSampleRatio && + float64(numCleanBytes)/float64(batch.NumBytes) < minimalSampleRatio { + return + } + + a.highVolumeStart = time.Time{} + a.analysisScheduled = false + + // TODO: analysis +} + +func (a *simpleAnalyzer) Tick() { + now := time.Now() + durationSec := float64(now.Sub(a.tickTraffic.startTime)+1) / float64(time.Second) + highVolume := float64(a.tickTraffic.numRecords)/durationSec >= highVolumeRecordsPerSec || + float64(a.tickTraffic.numBytes)/durationSec >= highVolumeBytesPerSec + + if highVolume { + if a.highVolumeStart.IsZero() { + a.highVolumeStart = now + a.analysisScheduled = false + } else if float64(now.Sub(a.highVolumeStart))/float64(time.Second) >= highVolumeDuration { + a.analysisScheduled = true + } + } else { + a.highVolumeStart = time.Time{} + a.analysisScheduled = false // cancel any if scheduled but not done + } + + a.tickTraffic = tickTrafficInfo{ + startTime: now, + numRecords: 0, + numBytes: 0, + } +} diff --git a/analyzer/params.go b/analyzer/params.go new file mode 100644 index 0000000..5063bf2 --- /dev/null +++ b/analyzer/params.go @@ -0,0 +1,13 @@ +package analyzer + +import ( + "time" +) + +const ( + highVolumeDuration float64 = float64(60 * time.Second) + highVolumeRecordsPerSec float64 = 10.0 * 1000 + highVolumeBytesPerSec float64 = 100.0 * 1024 * 1024 + + minimalSampleRatio = 0.4 +) diff --git a/base/bsupport/logprocessingworker.go b/base/bsupport/logprocessingworker.go index 2e850f8..f02f566 100644 --- a/base/bsupport/logprocessingworker.go +++ b/base/bsupport/logprocessingworker.go @@ -10,11 +10,13 @@ import ( // LogProcessingWorker is a worker for transformation, serialization and chunk making type LogProcessingWorker struct { - PipelineWorkerBase[[]*base.LogRecord] + PipelineWorkerBase[base.LogRecordBatch] deallocator *base.LogAllocator procCounter *base.LogProcessCounterSet + analyzer base.LogAnalyzer transformList []base.LogTransformFunc outputList []OutputInterface + lastChunkTime time.Time } @@ -28,8 +30,8 @@ type OutputInterface struct { // NewLogProcessingWorker creates LogProcessingWorker func NewLogProcessingWorker(parentLogger logger.Logger, - input <-chan []*base.LogRecord, deallocator *base.LogAllocator, procCounter *base.LogProcessCounterSet, - transforms []base.LogTransformFunc, outputInterfaces []OutputInterface, + input <-chan base.LogRecordBatch, deallocator *base.LogAllocator, procCounter *base.LogProcessCounterSet, + analyzer base.LogAnalyzer, transforms []base.LogTransformFunc, outputInterfaces []OutputInterface, ) *LogProcessingWorker { worker := &LogProcessingWorker{ PipelineWorkerBase: NewPipelineWorkerBase( @@ -38,23 +40,41 @@ func NewLogProcessingWorker(parentLogger logger.Logger, ), deallocator: deallocator, procCounter: procCounter, + analyzer: analyzer, transformList: transforms, outputList: outputInterfaces, + lastChunkTime: time.Now(), } worker.InitInternal(worker.onInput, worker.onTick, worker.onStop) return worker } -func (worker *LogProcessingWorker) onInput(buffer []*base.LogRecord) { - if len(buffer) == 0 { +func (worker *LogProcessingWorker) onInput(batch base.LogRecordBatch) { + if len(batch.Records) == 0 { return } - for _, record := range buffer { + worker.lastChunkTime = batch.Records[0].Timestamp + + releaseRecord := worker.deallocator.Release + analyzingBatch := worker.analyzer.ShouldAnalyze(batch) + if analyzingBatch { + releaseRecord = func(record *base.LogRecord) {} + } + + var numCleanRecords int + var numCleanBytes int64 + + for _, record := range batch.Records { icounter := worker.procCounter.SelectMetricKeySet(record) - if RunTransforms(record, worker.transformList) == base.DROP { + result := RunTransforms(record, worker.transformList) + if !record.Spam { + numCleanRecords++ + numCleanBytes += int64(record.RawLength) + } + if result == base.DROP { icounter.CountRecordDrop(record) - worker.deallocator.Release(record) + releaseRecord(record) continue } icounter.CountRecordPass(record) @@ -63,13 +83,10 @@ func (worker *LogProcessingWorker) onInput(buffer []*base.LogRecord) { // TODO: //if RunTransforms(record, output.transformList) == base.DROP { // icounter.CountOutputFilter(i, record) - // worker.deallocator.Release(record) + // releaseRecord(record) // continue //} stream := output.SerializeRecord(record) - // TODO: decide whether to release once at the end or release here after per-output transform is implemented - // It will depend on whether records are duplicated for additional outputs, or the same record with all transforms run in place. - worker.deallocator.Release(record) worker.procCounter.CountStream(i, stream) maybeChunk := output.WriteStream(stream) if maybeChunk != nil { @@ -77,15 +94,26 @@ func (worker *LogProcessingWorker) onInput(buffer []*base.LogRecord) { output.AcceptChunk(*maybeChunk) } } + releaseRecord(record) + } + + if analyzingBatch { + worker.analyzer.Analyze(batch, numCleanRecords, numCleanBytes) + for _, record := range batch.Records { + releaseRecord(record) + } + } else { + worker.analyzer.TrackTraffic(numCleanRecords, numCleanBytes) } } func (worker *LogProcessingWorker) onTick() { - // send buffered streams as a chunk if X seconds have passed - if time.Since(worker.lastChunkTime) < defs.IntermediateFlushInterval { - return + worker.analyzer.Tick() + + // flush buffered streams only if one full second has passed + if time.Since(worker.lastChunkTime) >= defs.IntermediateFlushInterval { + worker.flushChunk() } - worker.flushChunk() worker.procCounter.UpdateMetrics() } diff --git a/base/logallocator.go b/base/logallocator.go index ccab861..316935c 100644 --- a/base/logallocator.go +++ b/base/logallocator.go @@ -14,11 +14,10 @@ import ( type LogAllocator struct { recordPool *sync.Pool // pool of *LogRecord backbufPools util.BytesPoolBy2n // pools of the backing buffers of LogRecord(s), i.e. pools of raw input copies - initialRefCount int // equal to the total amount of outputs } // NewLogAllocator creates LogAllocator linked to the given schema -func NewLogAllocator(schema LogSchema, outputCount int) *LogAllocator { +func NewLogAllocator(schema LogSchema) *LogAllocator { maxFields := schema.GetMaxFields() recordPool := &sync.Pool{} recordPool.New = func() interface{} { @@ -27,7 +26,6 @@ func NewLogAllocator(schema LogSchema, outputCount int) *LogAllocator { return &LogAllocator{ recordPool: recordPool, backbufPools: util.NewBytesPoolBy2n(), - initialRefCount: outputCount, } } @@ -46,7 +44,7 @@ func newLogRecord(maxFields int) *LogRecord { func (alloc *LogAllocator) NewRecord(input []byte) (*LogRecord, string) { // pooling speeds up 10% in agent benchmarks but minus 20% in pipeline benchmarks record := alloc.recordPool.Get().(*LogRecord) - record._refCount += alloc.initialRefCount + record._refCount = 1 if len(input) > defs.InputLogMinMessageBytesToPool { backbuf := alloc.backbufPools.Get(len(input)) record._backbuf = backbuf diff --git a/base/loganalyzer.go b/base/loganalyzer.go new file mode 100644 index 0000000..b162dbb --- /dev/null +++ b/base/loganalyzer.go @@ -0,0 +1,10 @@ +package base + +type LogAnalyzer interface { + ShouldAnalyze(batch LogRecordBatch) bool + + TrackTraffic(numCleanRecords int, numCleanBytes int64) + Analyze(batch LogRecordBatch, numCleanRecords int, numCleanBytes int64) + + Tick() +} diff --git a/base/logrecord.go b/base/logrecord.go index b95a48c..4726a71 100644 --- a/base/logrecord.go +++ b/base/logrecord.go @@ -10,6 +10,7 @@ type LogRecord struct { RawLength int // Input length or approximated length of entire record, for statistics Timestamp time.Time // Timestamp, might be zero until processed by a LogTransform Unescaped bool // Whether the main message field has been un-escaped. Multi-line logs start with true. + Spam bool _backbuf *[]byte // Backing buffer where initial field values come from, nil if buffer pooling isn't used _refCount int // reference count, + outputs_length for new, -1 for release (back to pool) } diff --git a/base/logrecordbatch.go b/base/logrecordbatch.go new file mode 100644 index 0000000..e2910a8 --- /dev/null +++ b/base/logrecordbatch.go @@ -0,0 +1,8 @@ +package base + +// LogRecordBatch represents a batch of logs passed from orchestrators to transformation pipelines +type LogRecordBatch struct { + Records []*LogRecord + Full bool // true if the batch is sent due to buffer limit reached (as opposed to periodic flushing) + NumBytes int +} diff --git a/input/sysloginput/sysloginput.go b/input/sysloginput/sysloginput.go index 77bdfef..7f3cefe 100644 --- a/input/sysloginput/sysloginput.go +++ b/input/sysloginput/sysloginput.go @@ -117,7 +117,7 @@ func (cfg *Config) VerifyConfig(schema base.LogSchema) error { if err := func() error { dummyMetricFactory := promreg.NewMetricFactory("verify_", nil, nil) dummyInputCounter := base.NewLogInputCounter(dummyMetricFactory) - dummyLogAllocator := base.NewLogAllocator(schema, 1) + dummyLogAllocator := base.NewLogAllocator(schema) _, err := syslogparser.NewParser(logger.Root(), dummyLogAllocator, schema, cfg.LevelMapping, dummyInputCounter) return err }(); err != nil { diff --git a/input/sysloginput/sysloginput_test.go b/input/sysloginput/sysloginput_test.go index 9677a7b..990b8dc 100644 --- a/input/sysloginput/sysloginput_test.go +++ b/input/sysloginput/sysloginput_test.go @@ -24,7 +24,7 @@ func TestSyslogTCPInputConfig(t *testing.T) { testOversizedLogLine := "<163>1 2019-08-15T15:50:46.866916+03:00 local my-app 456 fn - Something" + strings.Repeat("x", defs.InputLogMaxMessageBytes) + "\n" schema := syslogprotocol.RFC5424Schema - allocator := base.NewLogAllocator(schema, 1) + allocator := base.NewLogAllocator(schema) selLevel := schema.MustCreateFieldLocator("level") selLog := schema.MustCreateFieldLocator("log") diff --git a/input/syslogparser/syslogparser_test.go b/input/syslogparser/syslogparser_test.go index d4751c8..daaabeb 100644 --- a/input/syslogparser/syslogparser_test.go +++ b/input/syslogparser/syslogparser_test.go @@ -14,7 +14,7 @@ import ( func TestSyslogParser(t *testing.T) { schema := syslogprotocol.RFC5424Schema - allocator := base.NewLogAllocator(schema, 1) + allocator := base.NewLogAllocator(schema) const line1 = "<163>1 2019-08-15T15:50:46.866915+03:00 local1 my-app1 123 fn1 - Something" const line2 = "<163>1 2020-09-17T16:51:47.867Z local2 my-app2 456 fn2 - Something else" mfactory := promreg.NewMetricFactory("syslog_parser_", nil, nil) diff --git a/orchestrate/obase/pipelines.go b/orchestrate/obase/pipelines.go index c6e2565..42146d1 100644 --- a/orchestrate/obase/pipelines.go +++ b/orchestrate/obase/pipelines.go @@ -23,12 +23,12 @@ type outputWorkerSettings struct { // // Launched workers should start shutting down as soon as the input channel is closed and call onStopped at the end type PipelineStarter func(parentLogger logger.Logger, metricCreator promreg.MetricCreator, - input <-chan []*base.LogRecord, bufferID string, outputTag string, onStopped func()) + input <-chan base.LogRecordBatch, bufferID string, outputTag string, onStopped func()) // PrepareSequentialPipeline makes a starter for pipelines including transformer, serializer and output forwarder func PrepareSequentialPipeline(args bconfig.PipelineArgs) PipelineStarter { return func(parentLogger logger.Logger, metricCreator promreg.MetricCreator, - input <-chan []*base.LogRecord, bufferID string, outputTag string, onStopped func(), + input <-chan base.LogRecordBatch, bufferID string, outputTag string, onStopped func(), ) { outputSettingsSlice := lo.Map(args.OutputBufferPairs, func(pair bconfig.OutputBufferConfig, _ int) outputWorkerSettings { outputLogger := parentLogger.WithField("output", pair.Name) diff --git a/orchestrate/obykeyset/channelinputbuffer.go b/orchestrate/obykeyset/channelinputbuffer.go index d623648..e03e573 100644 --- a/orchestrate/obykeyset/channelinputbuffer.go +++ b/orchestrate/obykeyset/channelinputbuffer.go @@ -16,13 +16,13 @@ import ( // // A buffer is NOT thread-safe and it should be created for each of gorouting sending logs to a channel type channelInputBuffer struct { - Channel chan<- []*base.LogRecord // point to channel in global map - PendingLogs []*base.LogRecord // locally-buffered log records to be sent to channel + Channel chan<- base.LogRecordBatch // point to channel in global map + PendingLogs []*base.LogRecord // locally-buffered log records to be sent to channel PendingBytes int LastFlushTime time.Time } -func newInputBufferForChannel(ch chan<- []*base.LogRecord) *channelInputBuffer { +func newInputBufferForChannel(ch chan<- base.LogRecordBatch) *channelInputBuffer { return &channelInputBuffer{ Channel: ch, PendingLogs: make([]*base.LogRecord, 0, defs.IntermediateBufferMaxNumLogs), @@ -43,18 +43,22 @@ func (cache *channelInputBuffer) Append(record *base.LogRecord) bool { // xx:inl } // Flush flushes all pending logs to the channel -func (cache *channelInputBuffer) Flush(now time.Time, parentLogger logger.Logger, loggingKey interface{}) { +func (cache *channelInputBuffer) Flush(dueToOverflow bool, now time.Time, parentLogger logger.Logger, loggingKey interface{}) { pendingLogs := cache.PendingLogs - reusableLogBuffer := bsupport.CopyLogBuffer(pendingLogs) + newBatch := base.LogRecordBatch{ + Records: bsupport.CopyLogBuffer(pendingLogs), + Full: dueToOverflow, + NumBytes: cache.PendingBytes, + } cache.PendingLogs = pendingLogs[:0] cache.PendingBytes = 0 cache.LastFlushTime = now // Send with timeout; There is enough buffering to make on-demand timer allocations trivial. select { - case cache.Channel <- reusableLogBuffer: + case cache.Channel <- newBatch: // TODO: update metrics case <-time.After(defs.IntermediateChannelTimeout): - parentLogger.Errorf("BUG: timeout flushing: %d records for %s. stack=%s", len(reusableLogBuffer), loggingKey, util.Stack()) + parentLogger.Errorf("BUG: timeout flushing: %d records for %s. stack=%s", len(newBatch.Records), loggingKey, util.Stack()) } } diff --git a/orchestrate/obykeyset/orchestrator.go b/orchestrate/obykeyset/orchestrator.go index 54922f9..2c47704 100644 --- a/orchestrate/obykeyset/orchestrator.go +++ b/orchestrate/obykeyset/orchestrator.go @@ -19,7 +19,7 @@ import ( // A per-connection version has been tried and abandoned because a client may create a new connection after the old one dies, and both need to share info here type byKeySetOrchestrator struct { logger logger.Logger - workerMap *localcachedmap.GlobalCachedMap[chan<- []*base.LogRecord, *channelInputBuffer] // append-only global map of merged keys => worker channel + workerMap *localcachedmap.GlobalCachedMap[chan<- base.LogRecordBatch, *channelInputBuffer] // append-only global map of merged keys => worker channel keyLocators []base.LogFieldLocator tagBuilder *obase.TagBuilder // builder to construct tag from keys, used when protected by globalPipelineChannelMap's mutex metricCreator promreg.MetricCreator @@ -32,8 +32,8 @@ type byKeySetOrchestrator struct { // It holds local buffer of pending logs to a set of global channels to backend workers, used by this input sink and flushes on demand type byKeySetOrchestratorSink struct { logger logger.Logger - workerMap *localcachedmap.LocalCachedMap[chan<- []*base.LogRecord, *channelInputBuffer] // append-only locac cache of byKeySetOrchestrator.workerMap - keySetExtractor base.FieldSetExtractor // extractor to fetch keys from LogRecord(s) + workerMap *localcachedmap.LocalCachedMap[chan<- base.LogRecordBatch, *channelInputBuffer] // append-only locac cache of byKeySetOrchestrator.workerMap + keySetExtractor base.FieldSetExtractor // extractor to fetch keys from LogRecord(s) } // NewOrchestrator creates an Orchestrator to distribute logs to different pipelines by unique combinations of key labels (key set) @@ -68,7 +68,7 @@ func NewOrchestrator(parentLogger logger.Logger, schema base.LogSchema, keyField } o.workerMap = localcachedmap.NewGlobalMap( o.newPipeline, - func(ch chan<- []*base.LogRecord) { close(ch) }, + func(ch chan<- base.LogRecordBatch) { close(ch) }, newInputBufferForChannel, ) @@ -103,10 +103,10 @@ func (o *byKeySetOrchestrator) Shutdown() { } // newPipeline creates channel and pipeline workers for a new key-set, must be protected by global mutex -func (o *byKeySetOrchestrator) newPipeline(keys []string, onStopped func()) chan<- []*base.LogRecord { +func (o *byKeySetOrchestrator) newPipeline(keys []string, onStopped func()) chan<- base.LogRecordBatch { outputTag := o.tagBuilder.Build(keys) workerID := strings.Join(keys, ",") - inputChannel := make(chan []*base.LogRecord, defs.IntermediateBufferedChannelSize) + inputChannel := make(chan base.LogRecordBatch, defs.IntermediateBufferedChannelSize) pipelineLogger := o.logger.WithField(defs.LabelName, workerID) pipelineLogger.Infof("new pipeline tag=%s", outputTag) pipelineMetricCreator := o.metricCreator.AddOrGetPrefix( @@ -127,7 +127,7 @@ func (oc *byKeySetOrchestratorSink) Accept(buffer []*base.LogRecord) { tempKeySet := keySetExtractor.Extract(record) cache := workerMap.GetOrCreate(tempKeySet, oc.onNewLinkToPipeline) if cache.Append(record) { - cache.Flush(now, oc.logger, tempKeySet) + cache.Flush(true, now, oc.logger, tempKeySet) } } } @@ -158,6 +158,6 @@ func (oc *byKeySetOrchestratorSink) flushAllLocalBuffers(forceAll bool) { if !forceAll && now.Sub(cache.LastFlushTime) < defs.IntermediateFlushInterval { return } - cache.Flush(now, oc.logger, mergedKey) + cache.Flush(false, now, oc.logger, mergedKey) }) } diff --git a/orchestrate/obykeyset/orchestrator_test.go b/orchestrate/obykeyset/orchestrator_test.go index 54a330d..fd4a80b 100644 --- a/orchestrate/obykeyset/orchestrator_test.go +++ b/orchestrate/obykeyset/orchestrator_test.go @@ -19,7 +19,7 @@ func TestByKeySetOrchestrator(t *testing.T) { schema := base.MustNewLogSchema([]string{"level", "app", "msg"}) collectedLogsByTag := make(map[string]*[]*base.LogRecord) startPipeline := func(parentLogger logger.Logger, metricCreator promreg.MetricCreator, - input <-chan []*base.LogRecord, bufferID string, outputTag string, onStopped func()) { + input <-chan base.LogRecordBatch, bufferID string, outputTag string, onStopped func()) { t.Logf("new worker %s: %s", outputTag, bufferID) _, ok := collectedLogsByTag[outputTag] @@ -28,9 +28,9 @@ func TestByKeySetOrchestrator(t *testing.T) { collectedLogsByTag[outputTag] = &collectedLogs go func() { counter := metricCreator.AddOrGetCounter("mycounter", "", nil, nil) - for rec := range input { - collectedLogs = append(collectedLogs, rec...) - counter.Add(uint64(len(rec))) + for batch := range input { + collectedLogs = append(collectedLogs, batch.Records...) + counter.Add(uint64(len(batch.Records))) } onStopped() }() diff --git a/orchestrate/osingleton/orchestrator.go b/orchestrate/osingleton/orchestrator.go index fac1a4c..23a09e6 100644 --- a/orchestrate/osingleton/orchestrator.go +++ b/orchestrate/osingleton/orchestrator.go @@ -11,24 +11,25 @@ import ( "github.com/relex/slog-agent/defs" "github.com/relex/slog-agent/orchestrate/obase" "github.com/relex/slog-agent/util" + "github.com/samber/lo" ) type singletonOrchestrator struct { logger logger.Logger - inputChannel chan []*base.LogRecord + inputChannel chan base.LogRecordBatch stopSignal *channels.SignalAwaitable } type singletonOrchestratorChild struct { logger logger.Logger - inputChannel chan []*base.LogRecord + inputChannel chan base.LogRecordBatch } // NewOrchestrator creates a singleton Orchestrator backed by one pipeline to aggregate and process all incoming logs func NewOrchestrator(parentLogger logger.Logger, tag string, metricCreator promreg.MetricCreator, startPipeline obase.PipelineStarter) base.Orchestrator { o := &singletonOrchestrator{ logger: parentLogger.WithField(defs.LabelComponent, "SingletonOrchestrator"), - inputChannel: make(chan []*base.LogRecord, defs.IntermediateBufferedChannelSize), + inputChannel: make(chan base.LogRecordBatch, defs.IntermediateBufferedChannelSize), stopSignal: channels.NewSignalAwaitable(), } startPipeline(o.logger, metricCreator, o.inputChannel, "", tag, o.stopSignal.Signal) @@ -49,12 +50,16 @@ func (o *singletonOrchestrator) Shutdown() { // Accept accepts input logs from LogInput, the buffer is only usable within the function func (oc *singletonOrchestratorChild) Accept(buffer []*base.LogRecord) { - reusableBuffer := bsupport.CopyLogBuffer(buffer) + newBatch := base.LogRecordBatch{ + Records: bsupport.CopyLogBuffer(buffer), + Full: true, + NumBytes: lo.SumBy(buffer, func(b *base.LogRecord) int { return b.RawLength }), + } select { - case oc.inputChannel <- reusableBuffer: + case oc.inputChannel <- newBatch: // TODO: update metrics case <-time.After(defs.IntermediateChannelTimeout): - oc.logger.Errorf("BUG: timeout flushing: %d records. stack=%s", len(reusableBuffer), util.Stack()) + oc.logger.Errorf("BUG: timeout flushing: %d records. stack=%s", len(newBatch.Records), util.Stack()) } } diff --git a/run/loader.go b/run/loader.go index 0f11d79..8eb94f9 100644 --- a/run/loader.go +++ b/run/loader.go @@ -74,7 +74,7 @@ func NewLoaderFromConfigFile(filepath string, metricPrefix string) (*Loader, err PipelineArgs: bconfig.PipelineArgs{ Schema: schema, - Deallocator: base.NewLogAllocator(schema, len(config.OutputBuffersPairs)), + Deallocator: base.NewLogAllocator(schema), MetricKeyLocators: schema.MustCreateFieldLocators(config.MetricKeys), // should have been verified in config parsing TransformConfigs: config.Transformations, OutputBufferPairs: config.OutputBuffersPairs, diff --git a/test/pipeline.go b/test/pipeline.go index f62cab7..5a6c3ca 100644 --- a/test/pipeline.go +++ b/test/pipeline.go @@ -39,7 +39,7 @@ func preparePipeline(configFile string, tagOverride string, metricCreator promre } inputConfig := conf.Inputs[0].Value // we support only one input for testing - allocator := base.NewLogAllocator(schema, len(conf.OutputBuffersPairs)) + allocator := base.NewLogAllocator(schema) inputCounter := base.NewLogInputCounter(metricCreator.AddOrGetPrefix("input_", nil, nil)) parser, perr := inputConfig.NewParser(logger.Root(), allocator, schema, inputCounter)