Commit 11b1dcb

chore(engine): introduce execution capture (#19821)
1 parent bd99fa5 commit 11b1dcb


41 files changed: +3881 −194 lines changed

pkg/engine/basic_engine.go

Lines changed: 4 additions & 0 deletions
@@ -28,6 +28,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/util/httpreq"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
 	"github.com/grafana/loki/v3/pkg/util/rangeio"
+	"github.com/grafana/loki/v3/pkg/xcap"
 )
 
 var tracer = otel.Tracer("pkg/engine")
@@ -190,6 +191,7 @@ func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Re
 		return logqlmodel.Result{}, err
 	}
 
+	ctx, capture := xcap.NewCapture(ctx, nil)
 	builder, err := func() (ResultBuilder, error) {
 		ctx, span := tracer.Start(ctx, "QueryEngine.Execute.Process")
 		defer span.End()
@@ -203,6 +205,7 @@ func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Re
 		MergePrefetchCount: e.cfg.MergePrefetchCount,
 		Bucket:             e.bucket,
 	}
+
 	pipeline := executor.Run(ctx, cfg, physicalPlan, logger)
 	defer pipeline.Close()
 
@@ -238,6 +241,7 @@ func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Re
 		span.SetStatus(codes.Error, "failed to build results")
 		return logqlmodel.Result{}, err
 	}
+	capture.End()
 
 	durFull := time.Since(startTime)
 	queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
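
Taken together, the basic_engine.go hunks bracket the whole of Basic.Execute with a capture. A minimal sketch of that lifecycle, assuming only the two calls visible in the diff (xcap.NewCapture with a nil second argument, and Capture.End); the run callback is a stand-in for the planning and pipeline work:

package main

import (
	"context"

	"github.com/grafana/loki/v3/pkg/xcap"
)

// executeWithCapture mirrors the shape of Basic.Execute after this commit.
func executeWithCapture(ctx context.Context, run func(context.Context) error) error {
	// Derive a context carrying the capture so downstream stages can attach
	// regions and statistics to it.
	ctx, capture := xcap.NewCapture(ctx, nil)

	err := run(ctx)

	// End the capture once results are built (the diff does this explicitly,
	// just before computing durations, rather than with defer).
	capture.End()
	return err
}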

pkg/engine/engine.go

Lines changed: 10 additions & 8 deletions
@@ -33,6 +33,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/util/httpreq"
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 	"github.com/grafana/loki/v3/pkg/util/rangeio"
+	"github.com/grafana/loki/v3/pkg/xcap"
 )
 
 var (
@@ -154,9 +155,10 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R
 	// This pain point will eventually go away as remaining usages of
 	// [logql.Engine] disappear.
 
+	ctx, _ = xcap.NewCapture(ctx, nil)
 	startTime := time.Now()
 
-	ctx, span := tracer.Start(ctx, "Engine.Execute", trace.WithAttributes(
+	ctx, region := xcap.StartRegion(ctx, "Engine.Execute", xcap.WithRegionAttributes(
 		attribute.String("type", string(logql.GetRangeType(params))),
 		attribute.String("query", params.QueryString()),
 		attribute.Stringer("start", params.Start()),
@@ -165,7 +167,7 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R
 		attribute.Stringer("length", params.End().Sub(params.Start())),
 		attribute.StringSlice("shards", params.Shards()),
 	))
-	defer span.End()
+	defer region.End()
 
 	ctx = e.buildContext(ctx)
 	logger := util_log.WithContext(ctx, e.logger)
@@ -176,21 +178,21 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R
 	logicalPlan, durLogicalPlanning, err := e.buildLogicalPlan(ctx, logger, params)
 	if err != nil {
 		e.metrics.subqueries.WithLabelValues(statusNotImplemented).Inc()
-		span.SetStatus(codes.Error, "failed to create logical plan")
+		region.SetStatus(codes.Error, "failed to create logical plan")
 		return logqlmodel.Result{}, ErrNotSupported
 	}
 
 	physicalPlan, durPhysicalPlanning, err := e.buildPhysicalPlan(ctx, logger, params, logicalPlan)
 	if err != nil {
 		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
-		span.SetStatus(codes.Error, "failed to create physical plan")
+		region.SetStatus(codes.Error, "failed to create physical plan")
 		return logqlmodel.Result{}, ErrPlanningFailed
 	}
 
 	wf, durWorkflowPlanning, err := e.buildWorkflow(ctx, logger, physicalPlan)
 	if err != nil {
 		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
-		span.SetStatus(codes.Error, "failed to create execution plan")
+		region.SetStatus(codes.Error, "failed to create execution plan")
 		return logqlmodel.Result{}, ErrPlanningFailed
 	}
 	defer wf.Close()
@@ -200,15 +202,15 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R
 		level.Error(logger).Log("msg", "failed to execute query", "err", err)
 
 		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
-		span.SetStatus(codes.Error, "failed to execute query")
+		region.SetStatus(codes.Error, "failed to execute query")
 		return logqlmodel.Result{}, ErrSchedulingFailed
 	}
 	defer pipeline.Close()
 
 	builder, durExecution, err := e.collectResult(ctx, logger, params, pipeline)
 	if err != nil {
 		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
-		span.SetStatus(codes.Error, "error during query execution")
+		region.SetStatus(codes.Error, "error during query execution")
 		return logqlmodel.Result{}, err
 	}
 
@@ -228,7 +230,7 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R
 	stats := statsCtx.Result(durFull, queueTime, builder.Len())
 	md := metadata.FromContext(ctx)
 
-	span.SetStatus(codes.Ok, "")
+	region.SetStatus(codes.Ok, "")
 	result := builder.Build(stats, md)
 
 	logql.RecordRangeAndInstantQueryMetrics(ctx, logger, params, strconv.Itoa(http.StatusOK), stats, result.Data)
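
The engine.go change is a mechanical swap of an OTel span for an xcap region: StartRegion mirrors tracer.Start, WithRegionAttributes carries the same attribute.KeyValue list, and SetStatus/End keep the span-like lifecycle. A condensed sketch of the pattern, assuming the xcap signatures implied by the diff (the OTel attribute and codes packages are real):

package main

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"

	"github.com/grafana/loki/v3/pkg/xcap"
)

func runQuery(ctx context.Context, query string, plan func(context.Context) error) error {
	// StartRegion replaces tracer.Start; the attribute list is unchanged.
	ctx, region := xcap.StartRegion(ctx, "Engine.Execute", xcap.WithRegionAttributes(
		attribute.String("query", query),
	))
	defer region.End()

	if err := plan(ctx); err != nil {
		// Error statuses are reported on the region exactly as they were on the span.
		region.SetStatus(codes.Error, "failed to create logical plan")
		return err
	}

	region.SetStatus(codes.Ok, "")
	return nil
}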

pkg/engine/internal/executor/compat.go

Lines changed: 6 additions & 3 deletions
@@ -11,12 +11,13 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
 	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
+	"github.com/grafana/loki/v3/pkg/xcap"
 )
 
-func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipeline) Pipeline {
+func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipeline, region *xcap.Region) Pipeline {
 	const extracted = "_extracted"
 
-	return newGenericPipeline(func(ctx context.Context, inputs []Pipeline) (arrow.RecordBatch, error) {
+	return newGenericPipelineWithRegion(func(ctx context.Context, inputs []Pipeline) (arrow.RecordBatch, error) {
 		input := inputs[0]
 		batch, err := input.Read(ctx)
 		if err != nil {
@@ -61,6 +62,8 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
 			return batch, nil
 		}
 
+		region.Record(statCompatCollisionFound.Observe(true))
+
 		// Next, update the schema with the new columns that have the _extracted suffix.
 		newSchema := batch.Schema()
 		duplicateCols := make([]duplicateColumn, 0, len(duplicates))
@@ -144,7 +147,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
 		}
 
 		return array.NewRecordBatch(newSchema, newSchemaColumns, batch.NumRows()), nil
-	}, input)
+	}, region, input)
 }
 
 // duplicate holds indexes to a duplicate values in two slices
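
Note that newColumnCompatibilityPipeline records on region without a nil check, while the tests below pass nil for the new parameter; that only works if xcap.Region methods tolerate a nil receiver. A hypothetical illustration of that Go idiom (not the actual xcap implementation):

package main

// Observation and Region are stand-ins for the real types in pkg/xcap,
// which this diff does not show.
type Observation struct{}

type Region struct {
	observations []Observation
}

// Record tolerates a nil receiver, so call sites such as the compat
// pipeline need no guards even when no capture is active.
func (r *Region) Record(obs Observation) {
	if r == nil {
		return
	}
	r.observations = append(r.observations, obs)
}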

pkg/engine/internal/executor/compat_test.go

Lines changed: 4 additions & 4 deletions
@@ -485,7 +485,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
 			}
 
 			// Create compatibility pipeline
-			pipeline := newColumnCompatibilityPipeline(tt.compat, input)
+			pipeline := newColumnCompatibilityPipeline(tt.compat, input, nil)
 			defer pipeline.Close()
 
 			if tt.expectError {
@@ -551,7 +551,7 @@ func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) {
 			{"invalid-field-name": "test"},
 		})
 
-		pipeline := newColumnCompatibilityPipeline(compat, input)
+		pipeline := newColumnCompatibilityPipeline(compat, input, nil)
 		defer pipeline.Close()
 
 		_, err := pipeline.Read(t.Context())
@@ -570,7 +570,7 @@ func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) {
 		expectedErr := errors.New("test error")
 		input := errorPipeline(t.Context(), expectedErr)
 
-		pipeline := newColumnCompatibilityPipeline(compat, input)
+		pipeline := newColumnCompatibilityPipeline(compat, input, nil)
 		defer pipeline.Close()
 
 		_, err := pipeline.Read(t.Context())
@@ -595,7 +595,7 @@ func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) {
 			{"utf8.label.status": "200", "int64.metadata.status": int64(200)},
 		})
 
-		pipeline := newColumnCompatibilityPipeline(compat, input)
+		pipeline := newColumnCompatibilityPipeline(compat, input, nil)
 		defer pipeline.Close()
 
 		// This should panic with "invalid column type: only string columns can be checked for collisions"

pkg/engine/internal/executor/dataobjscan.go

Lines changed: 49 additions & 5 deletions
@@ -19,6 +19,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
 	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
 	"github.com/grafana/loki/v3/pkg/engine/internal/types"
+	"github.com/grafana/loki/v3/pkg/xcap"
 )
 
 type dataobjScanOptions struct {
@@ -34,6 +35,7 @@ type dataobjScanOptions struct {
 type dataobjScan struct {
 	opts   dataobjScanOptions
 	logger log.Logger
+	region *xcap.Region
 
 	initialized   bool
 	initializedAt time.Time
@@ -49,10 +51,11 @@ var _ Pipeline = (*dataobjScan)(nil)
 // [arrow.RecordBatch] composed of the requested log section in a data object. Rows
 // in the returned record are ordered by timestamp in the direction specified
 // by opts.Direction.
-func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger) *dataobjScan {
+func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger, region *xcap.Region) *dataobjScan {
 	return &dataobjScan{
 		opts:   opts,
 		logger: logger,
+		region: region,
 	}
 }
 
@@ -370,11 +373,10 @@ func (s *dataobjScan) read(ctx context.Context) (arrow.RecordBatch, error) {
 
 // Close closes s and releases all resources.
 func (s *dataobjScan) Close() {
-	if s.reader != nil {
-		// TODO(ashwanth): remove this once we have stats collection via executor
-		s.reader.Stats().LogSummary(s.logger, time.Since(s.initializedAt))
+	if s.region != nil && s.reader != nil {
+		s.recordReaderStats()
+		s.region.End()
 	}
-
 	if s.streams != nil {
 		s.streams.Close()
 	}
@@ -386,4 +388,46 @@ func (s *dataobjScan) Close() {
 	s.streams = nil
 	s.streamsInjector = nil
 	s.reader = nil
+	s.region = nil
+}
+
+// recordReaderStats records statistics from the [logs.Reader] to the xcap region.
+// TODO: [dataset.ReaderStats] should be replaced by xcap statistics.
+func (s *dataobjScan) recordReaderStats() {
+	if s.region == nil || s.reader == nil {
+		return
+	}
+
+	stats := s.reader.Stats()
+	if stats == nil {
+		return
+	}
+
+	// Record basic stats
+	s.region.Record(statDatasetPrimaryColumns.Observe(int64(stats.PrimaryColumns)))
+	s.region.Record(statDatasetSecondaryColumns.Observe(int64(stats.SecondaryColumns)))
+	s.region.Record(statDatasetPrimaryColumnPages.Observe(int64(stats.PrimaryColumnPages)))
+	s.region.Record(statDatasetSecondaryColumnPages.Observe(int64(stats.SecondaryColumnPages)))
+	s.region.Record(statDatasetMaxRows.Observe(int64(stats.MaxRows)))
+	s.region.Record(statDatasetRowsAfterPruning.Observe(int64(stats.RowsToReadAfterPruning)))
+	s.region.Record(statDatasetPrimaryRowsRead.Observe(int64(stats.PrimaryRowsRead)))
+	s.region.Record(statDatasetSecondaryRowsRead.Observe(int64(stats.SecondaryRowsRead)))
+	s.region.Record(statDatasetPrimaryRowBytes.Observe(int64(stats.PrimaryRowBytes)))
+	s.region.Record(statDatasetSecondaryRowBytes.Observe(int64(stats.SecondaryRowBytes)))
+
+	// Record download stats
+	downloadStats := stats.DownloadStats
+	s.region.Record(statDatasetPagesScanned.Observe(int64(downloadStats.PagesScanned)))
+	s.region.Record(statDatasetPagesFoundInCache.Observe(int64(downloadStats.PagesFoundInCache)))
+	s.region.Record(statDatasetBatchDownloadRequests.Observe(int64(downloadStats.BatchDownloadRequests)))
+	s.region.Record(statDatasetPageDownloadTime.Observe(downloadStats.PageDownloadTime.Nanoseconds()))
+	s.region.Record(statDatasetPrimaryColumnBytes.Observe(int64(downloadStats.PrimaryColumnBytes)))
+	s.region.Record(statDatasetSecondaryColumnBytes.Observe(int64(downloadStats.SecondaryColumnBytes)))
+	s.region.Record(statDatasetPrimaryColumnUncompressedBytes.Observe(int64(downloadStats.PrimaryColumnUncompressedBytes)))
+	s.region.Record(statDatasetSecondaryColumnUncompressedBytes.Observe(int64(downloadStats.SecondaryColumnUncompressedBytes)))
+}
+
+// Region implements RegionProvider.
+func (s *dataobjScan) Region() *xcap.Region {
+	return s.region
 }
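
The final hunk's comment says Region implements RegionProvider. The interface itself is defined elsewhere in the commit, so the shape below is an inference from that comment alone:

// RegionProvider, as inferred from the method that satisfies it; the real
// definition may carry more methods.
type RegionProvider interface {
	Region() *xcap.Region
}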

pkg/engine/internal/executor/dataobjscan_test.go

Lines changed: 7 additions & 7 deletions
@@ -82,7 +82,7 @@ func Test_dataobjScan(t *testing.T) {
 		Projections: nil, // All columns
 
 		BatchSize: 512,
-	}, log.NewNopLogger())
+	}, log.NewNopLogger(), nil)
 
 	expectFields := []arrow.Field{
 		semconv.FieldFromFQN("utf8.label.env", true),
@@ -115,7 +115,7 @@ prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`
 		},
 
 		BatchSize: 512,
-	}, log.NewNopLogger())
+	}, log.NewNopLogger(), nil)
 
 	expectFields := []arrow.Field{
 		semconv.FieldFromFQN("utf8.label.env", true),
@@ -141,7 +141,7 @@ prod,1970-01-01 00:00:02`
 		Projections: nil, // All columns
 
 		BatchSize: 512,
-	}, log.NewNopLogger())
+	}, log.NewNopLogger(), nil)
 
 	expectFields := []arrow.Field{
 		semconv.FieldFromFQN("utf8.label.env", true),
@@ -172,7 +172,7 @@ prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`
 			&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}},
 		},
 		BatchSize: 512,
-	}, log.NewNopLogger())
+	}, log.NewNopLogger(), nil)
 
 	expectFields := []arrow.Field{
 		semconv.FieldFromFQN("utf8.label.env", true),
@@ -251,7 +251,7 @@ func Test_dataobjScan_DuplicateColumns(t *testing.T) {
 		StreamIDs:   []int64{1, 2, 3}, // All streams
 		Projections: nil,              // All columns
 		BatchSize:   512,
-	}, log.NewNopLogger())
+	}, log.NewNopLogger(), nil)
 
 	expectFields := []arrow.Field{
 		semconv.FieldFromFQN("utf8.label.env", true),
@@ -285,7 +285,7 @@ prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1`
 			&physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}},
 		},
 		BatchSize: 512,
-	}, log.NewNopLogger())
+	}, log.NewNopLogger(), nil)
 
 	expectFields := []arrow.Field{
 		semconv.FieldFromFQN("utf8.label.pod", true),
@@ -311,7 +311,7 @@ pod-1,override`
 			&physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}},
 		},
 		BatchSize: 512,
-	}, log.NewNopLogger())
+	}, log.NewNopLogger(), nil)
 
 	expectFields := []arrow.Field{
 		semconv.FieldFromFQN("utf8.label.namespace", true),
