Skip to content

Commit d38a8fc

Browse files
aleks-pshelldandy
authored andcommitted
Usage group support for ingestion limits (grafana#3914)
* Apply ingestion limits to usage groups * Check actual usage groups in the request * Make test more robust * Fix case where usage groups can match against a disjoint set of request labels * Reduce nesting in checkUsageGroupsIngestLimit
1 parent 4f2d470 commit d38a8fc

File tree

5 files changed

+110
-2
lines changed

5 files changed

+110
-2
lines changed

pkg/distributor/distributor.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,12 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push
321321

322322
for _, series := range req.Series {
323323
profName := phlaremodel.Labels(series.Labels).Get(ProfileName)
324+
324325
groups := usageGroups.GetUsageGroups(tenantID, series.Labels)
326+
if err := d.checkUsageGroupsIngestLimit(tenantID, groups.Names(), req); err != nil {
327+
return nil, err
328+
}
329+
325330
profLanguage := d.GetProfileLanguage(series)
326331

327332
for _, raw := range series.Samples {
@@ -765,6 +770,30 @@ func (d *Distributor) checkIngestLimit(tenantID string, req *distributormodel.Pu
765770
return nil
766771
}
767772

773+
func (d *Distributor) checkUsageGroupsIngestLimit(tenantID string, groupsInRequest []string, req *distributormodel.PushRequest) error {
774+
l := d.limits.IngestionLimit(tenantID)
775+
if l == nil || len(l.UsageGroups) == 0 {
776+
return nil
777+
}
778+
779+
for _, group := range groupsInRequest {
780+
limit, ok := l.UsageGroups[group]
781+
if !ok || !limit.LimitReached {
782+
continue
783+
}
784+
if d.ingestionLimitsSampler.AllowRequest(tenantID, l.Sampling) {
785+
return nil
786+
}
787+
limitResetTime := time.Unix(l.LimitResetTime, 0).UTC().Format(time.RFC3339)
788+
validation.DiscardedProfiles.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalProfiles))
789+
validation.DiscardedBytes.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalBytesUncompressed))
790+
return connect.NewError(connect.CodeResourceExhausted,
791+
fmt.Errorf("limit of %s/%s reached for usage group %s, next reset at %s", humanize.IBytes(uint64(limit.PeriodLimitMb*1024*1024)), l.PeriodType, group, limitResetTime))
792+
}
793+
794+
return nil
795+
}
796+
768797
type profileTracker struct {
769798
profile *distributormodel.ProfileSeries
770799
minSuccess int

pkg/distributor/distributor_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,51 @@ func Test_Limits(t *testing.T) {
322322
expectedCode: connect.CodeResourceExhausted,
323323
expectedValidationReason: validation.IngestLimitReached,
324324
},
325+
{
326+
description: "ingest_limit_reached_for_usage_group",
327+
pushReq: &pushv1.PushRequest{
328+
Series: []*pushv1.RawProfileSeries{
329+
{
330+
Labels: []*typesv1.LabelPair{
331+
{Name: "__name__", Value: "cpu"},
332+
{Name: phlaremodel.LabelNameServiceName, Value: "svc"},
333+
},
334+
Samples: []*pushv1.RawSample{
335+
{
336+
RawProfile: collectTestProfileBytes(t),
337+
},
338+
},
339+
},
340+
},
341+
},
342+
overrides: validation.MockOverrides(func(defaults *validation.Limits, tenantLimits map[string]*validation.Limits) {
343+
l := validation.MockDefaultLimits()
344+
l.IngestionLimit = &ingest_limits.Config{
345+
PeriodType: "hour",
346+
PeriodLimitMb: 128,
347+
LimitResetTime: 1737721086,
348+
LimitReached: false,
349+
Sampling: ingest_limits.SamplingConfig{
350+
NumRequests: 0,
351+
Period: time.Minute,
352+
},
353+
UsageGroups: map[string]ingest_limits.UsageGroup{
354+
"group-1": {
355+
PeriodLimitMb: 64,
356+
LimitReached: true,
357+
},
358+
},
359+
}
360+
usageGroupCfg, err := validation.NewUsageGroupConfig(map[string]string{
361+
"group-1": "{service_name=\"svc\"}",
362+
})
363+
require.NoError(t, err)
364+
l.DistributorUsageGroups = &usageGroupCfg
365+
tenantLimits["user-1"] = l
366+
}),
367+
expectedCode: connect.CodeResourceExhausted,
368+
expectedValidationReason: validation.IngestLimitReached,
369+
},
325370
}
326371

327372
for _, tc := range testCases {

pkg/distributor/ingest_limits/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ type Config struct {
1313
LimitReached bool `yaml:"limit_reached" json:"limit_reached"`
1414
// Sampling controls the sampling parameters when the limit is reached.
1515
Sampling SamplingConfig `yaml:"sampling" json:"sampling"`
16+
// UsageGroups controls ingestion for pre-configured usage groups.
17+
UsageGroups map[string]UsageGroup `yaml:"usage_groups" json:"usage_groups"`
1618
}
1719

1820
// SamplingConfig describes the params of a simple probabilistic sampling mechanism.
@@ -23,3 +25,8 @@ type SamplingConfig struct {
2325
NumRequests int `yaml:"num_requests" json:"num_requests"`
2426
Period time.Duration `yaml:"period" json:"period"`
2527
}
28+
29+
type UsageGroup struct {
30+
PeriodLimitMb int `yaml:"period_limit_mb" json:"period_limit_mb"`
31+
LimitReached bool `yaml:"limit_reached" json:"limit_reached"`
32+
}

pkg/validation/usage_groups.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ func (m UsageGroupMatch) CountDiscardedBytes(reason string, n int64) {
126126
}
127127
}
128128

129+
func (m UsageGroupMatch) Names() []string {
130+
return m.names
131+
}
132+
129133
func NewUsageGroupConfig(m map[string]string) (UsageGroupConfig, error) {
130134
if len(m) > maxUsageGroups {
131135
return UsageGroupConfig{}, fmt.Errorf("maximum number of usage groups is %d, got %d", maxUsageGroups, len(m))
@@ -176,11 +180,19 @@ func matchesAll(matchers []*labels.Matcher, lbls phlaremodel.Labels) bool {
176180
}
177181

178182
for _, m := range matchers {
183+
matched := false
179184
for _, lbl := range lbls {
180-
if lbl.Name == m.Name && !m.Matches(lbl.Value) {
181-
return false
185+
if lbl.Name == m.Name {
186+
if !m.Matches(lbl.Value) {
187+
return false
188+
}
189+
matched = true
190+
break
182191
}
183192
}
193+
if !matched {
194+
return false
195+
}
184196
}
185197
return true
186198
}

pkg/validation/usage_groups_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,21 @@ func TestUsageGroupConfig_GetUsageGroups(t *testing.T) {
104104
tenantID: "tenant1",
105105
},
106106
},
107+
{
108+
Name: "disjoint_labels_do_not_match",
109+
TenantID: "tenant1",
110+
Config: UsageGroupConfig{
111+
config: map[string][]*labels.Matcher{
112+
"app/foo": testMustParseMatcher(t, `{namespace="foo", container="bar"}`),
113+
},
114+
},
115+
Labels: phlaremodel.Labels{
116+
{Name: "service_name", Value: "foo"},
117+
},
118+
Want: UsageGroupMatch{
119+
tenantID: "tenant1",
120+
},
121+
},
107122
}
108123

109124
for _, tt := range tests {

0 commit comments

Comments
 (0)