Skip to content

Commit 1a66d2d

Browse files
feat: ability to send query context for limit enforcement (#19900)
1 parent 06da42a commit 1a66d2d

File tree

13 files changed

+488
-60
lines changed

13 files changed

+488
-60
lines changed

Makefile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,6 @@ cmd/loki/loki:
223223
cmd/loki/loki-debug:
224224
CGO_ENABLED=0 go build $(DEBUG_GO_FLAGS) -o $@ ./$(@D)
225225

226-
ui-assets:
227-
make -C pkg/ui/frontend build
228226
###############
229227
# Loki-Canary #
230228
###############

pkg/querier/queryrange/codec.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -717,13 +717,21 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
717717
}
718718

719719
// Add limits
720-
if limits := querylimits.ExtractQueryLimitsContext(ctx); limits != nil {
720+
if limits := querylimits.ExtractQueryLimitsFromContext(ctx); limits != nil {
721721
err := querylimits.InjectQueryLimitsHeader(&header, limits)
722722
if err != nil {
723723
return nil, err
724724
}
725725
}
726726

727+
// Add limits context
728+
if limitsCtx := querylimits.ExtractQueryLimitsContextFromContext(ctx); limitsCtx != nil {
729+
err := querylimits.InjectQueryLimitsContextHeader(&header, limitsCtx)
730+
if err != nil {
731+
return nil, err
732+
}
733+
}
734+
727735
// Add org id
728736
orgID, err := user.ExtractOrgID(ctx)
729737
if err != nil {

pkg/querier/queryrange/limits.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/grafana/loki/v3/pkg/util/constants"
3838
"github.com/grafana/loki/v3/pkg/util/httpreq"
3939
util_log "github.com/grafana/loki/v3/pkg/util/log"
40+
"github.com/grafana/loki/v3/pkg/util/querylimits"
4041
"github.com/grafana/loki/v3/pkg/util/spanlogger"
4142
"github.com/grafana/loki/v3/pkg/util/validation"
4243
)
@@ -281,7 +282,30 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra
281282
ctx, sp := tracer.Start(ctx, "querySizeLimiter.getBytesReadForRequest")
282283
defer sp.End()
283284

284-
expr, err := syntax.ParseExpr(r.GetQuery())
285+
queryLimitCtx := querylimits.ExtractQueryLimitsContextFromContext(ctx)
286+
fullCtxBytes := uint64(0)
287+
if queryLimitCtx != nil && queryLimitCtx.Expr != "" && !queryLimitCtx.From.IsZero() && !queryLimitCtx.To.IsZero() {
288+
var err error
289+
fullCtxBytes, err = q.getBytesForQueryAndRange(ctx, queryLimitCtx.Expr, queryLimitCtx.From, queryLimitCtx.To)
290+
if err != nil {
291+
return 0, nil
292+
}
293+
}
294+
295+
queryBytes, err := q.getBytesForQueryAndRange(ctx, r.GetQuery(), r.GetStart(), r.GetEnd())
296+
if err != nil {
297+
return 0, nil
298+
}
299+
300+
if fullCtxBytes > queryBytes {
301+
return fullCtxBytes, nil
302+
}
303+
304+
return queryBytes, nil
305+
}
306+
307+
func (q *querySizeLimiter) getBytesForQueryAndRange(ctx context.Context, query string, from, to time.Time) (uint64, error) {
308+
expr, err := syntax.ParseExpr(query)
285309
if err != nil {
286310
return 0, err
287311
}
@@ -294,7 +318,16 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra
294318
// TODO: Set concurrency dynamically as in shardResolverForConf?
295319
start := time.Now()
296320
const maxConcurrentIndexReq = 10
297-
matcherStats, err := getStatsForMatchers(ctx, q.logger, q.statsHandler, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli()), matcherGroups, maxConcurrentIndexReq, q.maxLookBackPeriod)
321+
matcherStats, err := getStatsForMatchers(
322+
ctx,
323+
q.logger,
324+
q.statsHandler,
325+
model.Time(from.UnixMilli()),
326+
model.Time(to.UnixMilli()),
327+
matcherGroups,
328+
maxConcurrentIndexReq,
329+
q.maxLookBackPeriod,
330+
)
298331
if err != nil {
299332
return 0, err
300333
}

pkg/querier/queryrange/limits_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/grafana/loki/v3/pkg/util/constants"
3030
"github.com/grafana/loki/v3/pkg/util/httpreq"
3131
util_log "github.com/grafana/loki/v3/pkg/util/log"
32+
"github.com/grafana/loki/v3/pkg/util/querylimits"
3233
)
3334

3435
func TestLimits(t *testing.T) {
@@ -1051,6 +1052,145 @@ func Test_MaxQuerySize(t *testing.T) {
10511052
}
10521053
}
10531054

1055+
func Test_MaxQuerySize_WithQueryLimitsContext(t *testing.T) {
1056+
// a sentinal query value to control when our mock stats handler returns context stats
1057+
ctxSentinal := `{context="true"}`
1058+
schemas := []config.PeriodConfig{
1059+
{
1060+
From: config.DayTime{Time: model.TimeFromUnix(testTime.Add(-48 * time.Hour).Unix())},
1061+
IndexType: types.TSDBType,
1062+
},
1063+
}
1064+
1065+
for _, tc := range []struct {
1066+
desc string
1067+
query string
1068+
queryStart time.Time
1069+
queryEnd time.Time
1070+
queryBytes uint64
1071+
contextStart time.Time
1072+
contextEnd time.Time
1073+
contextBytes uint64
1074+
limit int
1075+
shouldErr bool
1076+
expectedStatsHits int
1077+
}{
1078+
{
1079+
desc: "No context, query under limit",
1080+
query: `{app="foo"} |= "foo"`,
1081+
queryStart: testTime.Add(-1 * time.Hour),
1082+
queryEnd: testTime,
1083+
queryBytes: 500,
1084+
limit: 1000,
1085+
shouldErr: false,
1086+
expectedStatsHits: 1,
1087+
},
1088+
{
1089+
desc: "Context range larger, both under limit",
1090+
query: `{app="foo"} |= "foo"`,
1091+
queryStart: testTime.Add(-1 * time.Hour),
1092+
queryEnd: testTime,
1093+
queryBytes: 200,
1094+
contextStart: testTime.Add(-24 * time.Hour),
1095+
contextEnd: testTime,
1096+
contextBytes: 800,
1097+
limit: 1000,
1098+
shouldErr: false,
1099+
expectedStatsHits: 2,
1100+
},
1101+
{
1102+
desc: "Context range larger, context exceeds limit",
1103+
query: `{app="foo"} |= "foo"`,
1104+
queryStart: testTime.Add(-1 * time.Hour),
1105+
queryEnd: testTime,
1106+
queryBytes: 200,
1107+
contextStart: testTime.Add(-24 * time.Hour),
1108+
contextEnd: testTime,
1109+
contextBytes: 1200,
1110+
limit: 1000,
1111+
shouldErr: true,
1112+
expectedStatsHits: 2,
1113+
},
1114+
{
1115+
desc: "Query range larger, query exceeds limit",
1116+
query: `{app="foo"} |= "foo"`,
1117+
queryStart: testTime.Add(-24 * time.Hour),
1118+
queryEnd: testTime,
1119+
queryBytes: 1200,
1120+
contextStart: testTime.Add(-1 * time.Hour),
1121+
contextEnd: testTime,
1122+
contextBytes: 200,
1123+
limit: 1000,
1124+
shouldErr: true,
1125+
expectedStatsHits: 2,
1126+
},
1127+
} {
1128+
t.Run(tc.desc, func(t *testing.T) {
1129+
statsHits := atomic.NewInt32(0)
1130+
1131+
statsHandler := queryrangebase.HandlerFunc(func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
1132+
statsHits.Inc()
1133+
1134+
bytes := tc.queryBytes
1135+
if req.GetQuery() == ctxSentinal {
1136+
bytes = tc.contextBytes
1137+
}
1138+
1139+
return &IndexStatsResponse{
1140+
Response: &logproto.IndexStatsResponse{
1141+
Bytes: bytes,
1142+
},
1143+
}, nil
1144+
})
1145+
1146+
promHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
1147+
return &LokiPromResponse{
1148+
Response: &queryrangebase.PrometheusResponse{
1149+
Status: "success",
1150+
},
1151+
}, nil
1152+
})
1153+
1154+
lokiReq := &LokiRequest{
1155+
Query: tc.query,
1156+
StartTs: tc.queryStart,
1157+
EndTs: tc.queryEnd,
1158+
Direction: logproto.FORWARD,
1159+
Path: "/query_range",
1160+
Plan: &plan.QueryPlan{
1161+
AST: syntax.MustParseExpr(tc.query),
1162+
},
1163+
}
1164+
1165+
ctx := user.InjectOrgID(context.Background(), "foo")
1166+
1167+
if !tc.contextStart.IsZero() && !tc.contextEnd.IsZero() {
1168+
ctx = querylimits.InjectQueryLimitsContextIntoContext(ctx, querylimits.Context{
1169+
Expr: ctxSentinal, // a hack to make mocking the stats handler easier, irl this should be the same query as in the request
1170+
From: tc.contextStart,
1171+
To: tc.contextEnd,
1172+
})
1173+
}
1174+
1175+
middlewares := []queryrangebase.Middleware{
1176+
NewQuerySizeLimiterMiddleware(schemas, testEngineOpts, util_log.Logger, fakeLimits{
1177+
maxQueryBytesRead: tc.limit,
1178+
}, statsHandler),
1179+
}
1180+
1181+
_, err := queryrangebase.MergeMiddlewares(middlewares...).Wrap(promHandler).Do(ctx, lokiReq)
1182+
1183+
if tc.shouldErr {
1184+
require.Error(t, err)
1185+
} else {
1186+
require.NoError(t, err)
1187+
}
1188+
1189+
require.Equal(t, tc.expectedStatsHits, int(statsHits.Load()))
1190+
})
1191+
}
1192+
}
1193+
10541194
func Test_MaxQuerySize_MaxLookBackPeriod(t *testing.T) {
10551195
engineOpts := testEngineOpts
10561196
engineOpts.MaxLookBackPeriod = 1 * time.Hour

pkg/querier/queryrange/marshal.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,16 @@ func (Codec) QueryRequestUnwrap(ctx context.Context, req *QueryRequest) (queryra
342342
if err != nil {
343343
return nil, ctx, err
344344
}
345-
ctx = querylimits.InjectQueryLimitsContext(ctx, *limits)
345+
ctx = querylimits.InjectQueryLimitsIntoContext(ctx, *limits)
346+
}
347+
348+
// Add limits context
349+
if encodedLimitsCtx, ok := req.Metadata[querylimits.HTTPHeaderQueryLimitsContextKey]; ok {
350+
limitsCtx, err := querylimits.UnmarshalQueryLimitsContext([]byte(encodedLimitsCtx))
351+
if err != nil {
352+
return nil, ctx, err
353+
}
354+
ctx = querylimits.InjectQueryLimitsContextIntoContext(ctx, *limitsCtx)
346355
}
347356

348357
// Add query time
@@ -454,7 +463,7 @@ func (Codec) QueryRequestWrap(ctx context.Context, r queryrangebase.Request) (*Q
454463
}
455464

456465
// Add limits
457-
limits := querylimits.ExtractQueryLimitsContext(ctx)
466+
limits := querylimits.ExtractQueryLimitsFromContext(ctx)
458467
if limits != nil {
459468
encodedLimits, err := querylimits.MarshalQueryLimits(limits)
460469
if err != nil {
@@ -463,6 +472,16 @@ func (Codec) QueryRequestWrap(ctx context.Context, r queryrangebase.Request) (*Q
463472
result.Metadata[querylimits.HTTPHeaderQueryLimitsKey] = string(encodedLimits)
464473
}
465474

475+
// Add limits context
476+
limitsCtx := querylimits.ExtractQueryLimitsContextFromContext(ctx)
477+
if limitsCtx != nil {
478+
encodedLimitsCtx, err := querylimits.MarshalQueryLimitsContext(limitsCtx)
479+
if err != nil {
480+
return nil, err
481+
}
482+
result.Metadata[querylimits.HTTPHeaderQueryLimitsContextKey] = string(encodedLimitsCtx)
483+
}
484+
466485
// Add org ID
467486
orgID, err := user.ExtractOrgID(ctx)
468487
if err != nil {

pkg/util/querylimits/grpc.go

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,43 @@ package querylimits
22

33
import (
44
"context"
5-
"fmt"
65

76
"google.golang.org/grpc"
87
"google.golang.org/grpc/metadata"
98
)
109

1110
const (
12-
lowerQueryLimitsHeaderName = "x-loki-query-limits"
11+
lowerQueryLimitsHeaderName = "x-loki-query-limits"
12+
lowerQueryLimitsContextHeaderName = "x-loki-query-limits-context"
1313
)
1414

1515
func injectIntoGRPCRequest(ctx context.Context) (context.Context, error) {
16-
limits := ExtractQueryLimitsContext(ctx)
17-
fmt.Printf("extract limits grpc: %v", limits)
18-
if limits == nil {
19-
return ctx, nil
20-
}
2116
// inject into GRPC metadata
2217
md, ok := metadata.FromOutgoingContext(ctx)
2318
if !ok {
2419
md = metadata.New(map[string]string{})
2520
}
2621
md = md.Copy()
27-
headerValue, err := MarshalQueryLimits(limits)
28-
if err != nil {
29-
return nil, err
22+
23+
limits := ExtractQueryLimitsFromContext(ctx)
24+
if limits != nil {
25+
headerValue, err := MarshalQueryLimits(limits)
26+
if err != nil {
27+
return nil, err
28+
}
29+
md.Set(lowerQueryLimitsHeaderName, string(headerValue))
30+
}
31+
32+
limitsCtx := ExtractQueryLimitsContextFromContext(ctx)
33+
if limitsCtx != nil {
34+
headerValue, err := MarshalQueryLimitsContext(limitsCtx)
35+
if err != nil {
36+
return nil, err
37+
}
38+
md.Set(lowerQueryLimitsContextHeaderName, string(headerValue))
3039
}
31-
md.Set(lowerQueryLimitsHeaderName, string(headerValue))
32-
newCtx := metadata.NewOutgoingContext(ctx, md)
3340

41+
newCtx := metadata.NewOutgoingContext(ctx, md)
3442
return newCtx, nil
3543
}
3644

@@ -60,21 +68,27 @@ func extractFromGRPCRequest(ctx context.Context) (context.Context, error) {
6068
}
6169

6270
headerValues, ok := md[lowerQueryLimitsHeaderName]
63-
if !ok {
64-
// No QueryLimits header in metadata, just return context
65-
return ctx, nil
71+
if ok && len(headerValues) > 0 {
72+
// Pick first header
73+
limits, err := UnmarshalQueryLimits([]byte(headerValues[0]))
74+
if err != nil {
75+
return ctx, err
76+
}
77+
ctx = InjectQueryLimitsIntoContext(ctx, *limits)
6678
}
6779

68-
if len(headerValues) == 0 {
69-
return ctx, nil
80+
// Extract QueryLimitsContext if present
81+
headerContextValues, ok := md[lowerQueryLimitsContextHeaderName]
82+
if ok && len(headerContextValues) > 0 {
83+
// Pick first header
84+
limitsCtx, err := UnmarshalQueryLimitsContext([]byte(headerContextValues[0]))
85+
if err != nil {
86+
return ctx, err
87+
}
88+
ctx = InjectQueryLimitsContextIntoContext(ctx, *limitsCtx)
7089
}
7190

72-
// Pick first header
73-
limits, err := UnmarshalQueryLimits([]byte(headerValues[0]))
74-
if err != nil {
75-
return ctx, err
76-
}
77-
return InjectQueryLimitsContext(ctx, *limits), nil
91+
return ctx, nil
7892
}
7993

8094
func ServerQueryLimitsInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {

0 commit comments

Comments
 (0)