Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions analyzer/analyzer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package analyzer

import (
"time"

"github.com/relex/slog-agent/base"
)

// simpleAnalyzer is a LogAnalyzer implementation that watches per-tick traffic
// volume and schedules a deep analysis once traffic has stayed above the
// high-volume thresholds for a sustained period.
type simpleAnalyzer struct {
	tickTraffic       tickTrafficInfo // clean traffic accumulated since the last Tick
	highVolumeStart   time.Time       // start of the current high-volume streak; zero value when volume is normal
	analysisScheduled bool            // set by Tick after sustained high volume; cleared by Analyze or when volume drops
}

// tickTrafficInfo holds the traffic counters collected during one tick interval.
type tickTrafficInfo struct {
	startTime  time.Time // beginning of the current tick interval
	numRecords int       // clean (non-spam) records counted in this interval
	numBytes   int64     // clean (non-spam) bytes counted in this interval
}

// NewAnalyzer constructs a LogAnalyzer that tracks traffic volume per tick and
// schedules a batch analysis once sustained high volume is observed.
func NewAnalyzer() base.LogAnalyzer {
	analyzer := &simpleAnalyzer{}
	// counters and scheduling flags start at their zero values;
	// only the tick start time needs explicit initialization
	analyzer.tickTraffic.startTime = time.Now()
	return analyzer
}

// ShouldAnalyze reports whether the given batch should go through deep
// analysis: an analysis must have been scheduled, and the batch must be full.
func (a *simpleAnalyzer) ShouldAnalyze(batch base.LogRecordBatch) bool {
	if !a.analysisScheduled {
		return false
	}
	// a batch that is not full would contain too few samples to analyze
	return batch.Full
}

// TrackTraffic accumulates the clean (non-spam) record and byte counts seen
// since the last Tick into the per-tick traffic counters.
func (a *simpleAnalyzer) TrackTraffic(numCleanRecords int, numCleanBytes int64) {
	traffic := &a.tickTraffic
	traffic.numRecords += numCleanRecords
	traffic.numBytes += numCleanBytes
}

// Analyze performs the scheduled analysis of a full batch and resets the
// high-volume tracking state afterwards.
//
// The analysis is skipped when the batch does not contain enough clean
// (non-spam) samples: if both the clean-record ratio and the clean-byte ratio
// fall below minimalSampleRatio, the sample set is not representative.
func (a *simpleAnalyzer) Analyze(batch base.LogRecordBatch, numCleanRecords int, numCleanBytes int64) {
	// Guard against empty batches: dividing by zero below would produce NaN,
	// and "NaN < minimalSampleRatio" is false, which would wrongly let an
	// empty batch fall through to analysis.
	if len(batch.Records) == 0 || batch.NumBytes == 0 {
		return
	}

	if float64(numCleanRecords)/float64(len(batch.Records)) < minimalSampleRatio &&
		float64(numCleanBytes)/float64(batch.NumBytes) < minimalSampleRatio {
		return
	}

	// the scheduled analysis is now done; start a fresh high-volume streak
	a.highVolumeStart = time.Time{}
	a.analysisScheduled = false

	// TODO: analysis
}

// Tick evaluates the traffic collected since the previous tick. If the rate
// exceeds either high-volume threshold for at least highVolumeDuration, an
// analysis is scheduled; otherwise any pending schedule is cancelled.
func (a *simpleAnalyzer) Tick() {
	now := time.Now()
	// +1ns prevents a zero duration (and thus division by zero) when two
	// ticks share the same timestamp
	durationSec := float64(now.Sub(a.tickTraffic.startTime)+1) / float64(time.Second)
	highVolume := float64(a.tickTraffic.numRecords)/durationSec >= highVolumeRecordsPerSec ||
		float64(a.tickTraffic.numBytes)/durationSec >= highVolumeBytesPerSec

	if highVolume {
		if a.highVolumeStart.IsZero() {
			a.highVolumeStart = now
			a.analysisScheduled = false
		} else if float64(now.Sub(a.highVolumeStart)) >= highVolumeDuration {
			// highVolumeDuration is float64(60 * time.Second), i.e. expressed
			// in nanoseconds, so the elapsed duration must be compared in
			// nanoseconds as well. The previous code divided the elapsed time
			// by time.Second first, comparing seconds against nanoseconds —
			// the threshold could effectively never be reached and no
			// analysis was ever scheduled.
			a.analysisScheduled = true
		}
	} else {
		a.highVolumeStart = time.Time{}
		a.analysisScheduled = false // cancel any if scheduled but not done
	}

	// reset the per-tick counters for the next interval
	a.tickTraffic = tickTrafficInfo{
		startTime:  now,
		numRecords: 0,
		numBytes:   0,
	}
}
13 changes: 13 additions & 0 deletions analyzer/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package analyzer

import (
"time"
)

const (
	// highVolumeDuration is how long traffic must continuously stay above the
	// thresholds before an analysis is scheduled; expressed in nanoseconds
	// (a float64 conversion of a time.Duration).
	highVolumeDuration float64 = float64(60 * time.Second)
	// highVolumeRecordsPerSec is the record-rate threshold (records/second)
	// above which traffic counts as high volume.
	highVolumeRecordsPerSec float64 = 10.0 * 1000
	// highVolumeBytesPerSec is the byte-rate threshold (bytes/second) above
	// which traffic counts as high volume.
	highVolumeBytesPerSec float64 = 100.0 * 1024 * 1024

	// minimalSampleRatio is the minimum fraction of clean records (or clean
	// bytes) a batch must contain for its samples to be considered
	// representative enough to analyze.
	minimalSampleRatio = 0.4
)
60 changes: 44 additions & 16 deletions base/bsupport/logprocessingworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (

// LogProcessingWorker is a worker for transformation, serialization and chunk making
type LogProcessingWorker struct {
PipelineWorkerBase[[]*base.LogRecord]
PipelineWorkerBase[base.LogRecordBatch]
deallocator *base.LogAllocator
procCounter *base.LogProcessCounterSet
analyzer base.LogAnalyzer
transformList []base.LogTransformFunc
outputList []OutputInterface

lastChunkTime time.Time
}

Expand All @@ -28,8 +30,8 @@ type OutputInterface struct {

// NewLogProcessingWorker creates LogProcessingWorker
func NewLogProcessingWorker(parentLogger logger.Logger,
input <-chan []*base.LogRecord, deallocator *base.LogAllocator, procCounter *base.LogProcessCounterSet,
transforms []base.LogTransformFunc, outputInterfaces []OutputInterface,
input <-chan base.LogRecordBatch, deallocator *base.LogAllocator, procCounter *base.LogProcessCounterSet,
analyzer base.LogAnalyzer, transforms []base.LogTransformFunc, outputInterfaces []OutputInterface,
) *LogProcessingWorker {
worker := &LogProcessingWorker{
PipelineWorkerBase: NewPipelineWorkerBase(
Expand All @@ -38,23 +40,41 @@ func NewLogProcessingWorker(parentLogger logger.Logger,
),
deallocator: deallocator,
procCounter: procCounter,
analyzer: analyzer,
transformList: transforms,
outputList: outputInterfaces,

lastChunkTime: time.Now(),
}
worker.InitInternal(worker.onInput, worker.onTick, worker.onStop)
return worker
}

func (worker *LogProcessingWorker) onInput(buffer []*base.LogRecord) {
if len(buffer) == 0 {
func (worker *LogProcessingWorker) onInput(batch base.LogRecordBatch) {
if len(batch.Records) == 0 {
return
}
for _, record := range buffer {
worker.lastChunkTime = batch.Records[0].Timestamp

releaseRecord := worker.deallocator.Release
analyzingBatch := worker.analyzer.ShouldAnalyze(batch)
if analyzingBatch {
releaseRecord = func(record *base.LogRecord) {}
}

var numCleanRecords int
var numCleanBytes int64

for _, record := range batch.Records {
icounter := worker.procCounter.SelectMetricKeySet(record)
if RunTransforms(record, worker.transformList) == base.DROP {
result := RunTransforms(record, worker.transformList)
if !record.Spam {
numCleanRecords++
numCleanBytes += int64(record.RawLength)
}
if result == base.DROP {
icounter.CountRecordDrop(record)
worker.deallocator.Release(record)
releaseRecord(record)
continue
}
icounter.CountRecordPass(record)
Expand All @@ -63,29 +83,37 @@ func (worker *LogProcessingWorker) onInput(buffer []*base.LogRecord) {
// TODO:
//if RunTransforms(record, output.transformList) == base.DROP {
// icounter.CountOutputFilter(i, record)
// worker.deallocator.Release(record)
// releaseRecord(record)
// continue
//}
stream := output.SerializeRecord(record)
// TODO: decide whether to release once at the end or release here after per-output transform is implemented
// It will depend on whether records are duplicated for additional outputs, or the same record with all transforms run in place.
worker.deallocator.Release(record)
worker.procCounter.CountStream(i, stream)
maybeChunk := output.WriteStream(stream)
if maybeChunk != nil {
worker.procCounter.CountChunk(i, maybeChunk)
output.AcceptChunk(*maybeChunk)
}
}
releaseRecord(record)
}

if analyzingBatch {
worker.analyzer.Analyze(batch, numCleanRecords, numCleanBytes)
for _, record := range batch.Records {
releaseRecord(record)
}
} else {
worker.analyzer.TrackTraffic(numCleanRecords, numCleanBytes)
}
}

func (worker *LogProcessingWorker) onTick() {
// send buffered streams as a chunk if X seconds have passed
if time.Since(worker.lastChunkTime) < defs.IntermediateFlushInterval {
return
worker.analyzer.Tick()

// flush buffered streams only if one full second has passed
if time.Since(worker.lastChunkTime) >= defs.IntermediateFlushInterval {
worker.flushChunk()
}
worker.flushChunk()
worker.procCounter.UpdateMetrics()
}

Expand Down
6 changes: 2 additions & 4 deletions base/logallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
type LogAllocator struct {
recordPool *sync.Pool // pool of *LogRecord
backbufPools util.BytesPoolBy2n // pools of the backing buffers of LogRecord(s), i.e. pools of raw input copies
initialRefCount int // equal to the total amount of outputs
}

// NewLogAllocator creates LogAllocator linked to the given schema
func NewLogAllocator(schema LogSchema, outputCount int) *LogAllocator {
func NewLogAllocator(schema LogSchema) *LogAllocator {
maxFields := schema.GetMaxFields()
recordPool := &sync.Pool{}
recordPool.New = func() interface{} {
Expand All @@ -27,7 +26,6 @@ func NewLogAllocator(schema LogSchema, outputCount int) *LogAllocator {
return &LogAllocator{
recordPool: recordPool,
backbufPools: util.NewBytesPoolBy2n(),
initialRefCount: outputCount,
}
}

Expand All @@ -46,7 +44,7 @@ func newLogRecord(maxFields int) *LogRecord {
func (alloc *LogAllocator) NewRecord(input []byte) (*LogRecord, string) {
// pooling speeds up 10% in agent benchmarks but minus 20% in pipeline benchmarks
record := alloc.recordPool.Get().(*LogRecord)
record._refCount += alloc.initialRefCount
record._refCount = 1
if len(input) > defs.InputLogMinMessageBytesToPool {
backbuf := alloc.backbufPools.Get(len(input))
record._backbuf = backbuf
Expand Down
10 changes: 10 additions & 0 deletions base/loganalyzer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package base

// LogAnalyzer decides when a batch of log records warrants analysis and
// performs that analysis.
type LogAnalyzer interface {
	// ShouldAnalyze reports whether the given batch should be analyzed.
	ShouldAnalyze(batch LogRecordBatch) bool

	// TrackTraffic accumulates clean (non-spam) record and byte counts for
	// traffic-volume tracking between ticks.
	TrackTraffic(numCleanRecords int, numCleanBytes int64)
	// Analyze processes a batch together with its clean record/byte counts.
	Analyze(batch LogRecordBatch, numCleanRecords int, numCleanBytes int64)

	// Tick is invoked periodically to evaluate the traffic volume tracked
	// since the previous tick.
	Tick()
}
1 change: 1 addition & 0 deletions base/logrecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type LogRecord struct {
RawLength int // Input length or approximated length of entire record, for statistics
Timestamp time.Time // Timestamp, might be zero until processed by a LogTransform
Unescaped bool // Whether the main message field has been un-escaped. Multi-line logs start with true.
Spam bool
_backbuf *[]byte // Backing buffer where initial field values come from, nil if buffer pooling isn't used
_refCount int // reference count, + outputs_length for new, -1 for release (back to pool)
}
Expand Down
8 changes: 8 additions & 0 deletions base/logrecordbatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package base

// LogRecordBatch represents a batch of logs passed from orchestrators to transformation pipelines
type LogRecordBatch struct {
	Records  []*LogRecord // the records contained in this batch
	Full     bool         // true if the batch is sent due to buffer limit reached (as opposed to periodic flushing)
	NumBytes int          // total bytes of the batched records, as counted by the producing buffer
}
2 changes: 1 addition & 1 deletion input/sysloginput/sysloginput.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (cfg *Config) VerifyConfig(schema base.LogSchema) error {
if err := func() error {
dummyMetricFactory := promreg.NewMetricFactory("verify_", nil, nil)
dummyInputCounter := base.NewLogInputCounter(dummyMetricFactory)
dummyLogAllocator := base.NewLogAllocator(schema, 1)
dummyLogAllocator := base.NewLogAllocator(schema)
_, err := syslogparser.NewParser(logger.Root(), dummyLogAllocator, schema, cfg.LevelMapping, dummyInputCounter)
return err
}(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion input/sysloginput/sysloginput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestSyslogTCPInputConfig(t *testing.T) {
testOversizedLogLine := "<163>1 2019-08-15T15:50:46.866916+03:00 local my-app 456 fn - Something" + strings.Repeat("x", defs.InputLogMaxMessageBytes) + "\n"

schema := syslogprotocol.RFC5424Schema
allocator := base.NewLogAllocator(schema, 1)
allocator := base.NewLogAllocator(schema)

selLevel := schema.MustCreateFieldLocator("level")
selLog := schema.MustCreateFieldLocator("log")
Expand Down
2 changes: 1 addition & 1 deletion input/syslogparser/syslogparser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestSyslogParser(t *testing.T) {
schema := syslogprotocol.RFC5424Schema
allocator := base.NewLogAllocator(schema, 1)
allocator := base.NewLogAllocator(schema)
const line1 = "<163>1 2019-08-15T15:50:46.866915+03:00 local1 my-app1 123 fn1 - Something"
const line2 = "<163>1 2020-09-17T16:51:47.867Z local2 my-app2 456 fn2 - Something else"
mfactory := promreg.NewMetricFactory("syslog_parser_", nil, nil)
Expand Down
4 changes: 2 additions & 2 deletions orchestrate/obase/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ type outputWorkerSettings struct {
//
// Launched workers should start shutting down as soon as the input channel is closed and call onStopped at the end
type PipelineStarter func(parentLogger logger.Logger, metricCreator promreg.MetricCreator,
input <-chan []*base.LogRecord, bufferID string, outputTag string, onStopped func())
input <-chan base.LogRecordBatch, bufferID string, outputTag string, onStopped func())

// PrepareSequentialPipeline makes a starter for pipelines including transformer, serializer and output forwarder
func PrepareSequentialPipeline(args bconfig.PipelineArgs) PipelineStarter {
return func(parentLogger logger.Logger, metricCreator promreg.MetricCreator,
input <-chan []*base.LogRecord, bufferID string, outputTag string, onStopped func(),
input <-chan base.LogRecordBatch, bufferID string, outputTag string, onStopped func(),
) {
outputSettingsSlice := lo.Map(args.OutputBufferPairs, func(pair bconfig.OutputBufferConfig, _ int) outputWorkerSettings {
outputLogger := parentLogger.WithField("output", pair.Name)
Expand Down
18 changes: 11 additions & 7 deletions orchestrate/obykeyset/channelinputbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
//
// A buffer is NOT thread-safe and it should be created for each of gorouting sending logs to a channel
type channelInputBuffer struct {
Channel chan<- []*base.LogRecord // point to channel in global map
PendingLogs []*base.LogRecord // locally-buffered log records to be sent to channel
Channel chan<- base.LogRecordBatch // point to channel in global map
PendingLogs []*base.LogRecord // locally-buffered log records to be sent to channel
PendingBytes int
LastFlushTime time.Time
}

func newInputBufferForChannel(ch chan<- []*base.LogRecord) *channelInputBuffer {
func newInputBufferForChannel(ch chan<- base.LogRecordBatch) *channelInputBuffer {
return &channelInputBuffer{
Channel: ch,
PendingLogs: make([]*base.LogRecord, 0, defs.IntermediateBufferMaxNumLogs),
Expand All @@ -43,18 +43,22 @@ func (cache *channelInputBuffer) Append(record *base.LogRecord) bool { // xx:inl
}

// Flush flushes all pending logs to the channel
func (cache *channelInputBuffer) Flush(now time.Time, parentLogger logger.Logger, loggingKey interface{}) {
func (cache *channelInputBuffer) Flush(dueToOverflow bool, now time.Time, parentLogger logger.Logger, loggingKey interface{}) {
pendingLogs := cache.PendingLogs
reusableLogBuffer := bsupport.CopyLogBuffer(pendingLogs)
newBatch := base.LogRecordBatch{
Records: bsupport.CopyLogBuffer(pendingLogs),
Full: dueToOverflow,
NumBytes: cache.PendingBytes,
}
cache.PendingLogs = pendingLogs[:0]
cache.PendingBytes = 0
cache.LastFlushTime = now

// Send with timeout; There is enough buffering to make on-demand timer allocations trivial.
select {
case cache.Channel <- reusableLogBuffer:
case cache.Channel <- newBatch:
// TODO: update metrics
case <-time.After(defs.IntermediateChannelTimeout):
parentLogger.Errorf("BUG: timeout flushing: %d records for %s. stack=%s", len(reusableLogBuffer), loggingKey, util.Stack())
parentLogger.Errorf("BUG: timeout flushing: %d records for %s. stack=%s", len(newBatch.Records), loggingKey, util.Stack())
}
}
Loading