diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d828e2ffb0..b084b9e5d0 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -321,7 +321,12 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push for _, series := range req.Series { profName := phlaremodel.Labels(series.Labels).Get(ProfileName) + groups := usageGroups.GetUsageGroups(tenantID, series.Labels) + if err := d.checkUsageGroupsIngestLimit(tenantID, groups.Names(), req); err != nil { + return nil, err + } + profLanguage := d.GetProfileLanguage(series) for _, raw := range series.Samples { @@ -765,6 +770,30 @@ func (d *Distributor) checkIngestLimit(tenantID string, req *distributormodel.Pu return nil } +func (d *Distributor) checkUsageGroupsIngestLimit(tenantID string, groupsInRequest []string, req *distributormodel.PushRequest) error { + l := d.limits.IngestionLimit(tenantID) + if l == nil || len(l.UsageGroups) == 0 { + return nil + } + + for _, group := range groupsInRequest { + limit, ok := l.UsageGroups[group] + if !ok || !limit.LimitReached { + continue + } + if d.ingestionLimitsSampler.AllowRequest(tenantID, l.Sampling) { + return nil + } + limitResetTime := time.Unix(l.LimitResetTime, 0).UTC().Format(time.RFC3339) + validation.DiscardedProfiles.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalProfiles)) + validation.DiscardedBytes.WithLabelValues(string(validation.IngestLimitReached), tenantID).Add(float64(req.TotalBytesUncompressed)) + return connect.NewError(connect.CodeResourceExhausted, + fmt.Errorf("limit of %s/%s reached for usage group %s, next reset at %s", humanize.IBytes(uint64(limit.PeriodLimitMb*1024*1024)), l.PeriodType, group, limitResetTime)) + } + + return nil +} + type profileTracker struct { profile *distributormodel.ProfileSeries minSuccess int diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 1de122e347..9304439f33 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -322,6 +322,51 @@ func Test_Limits(t *testing.T) { expectedCode: connect.CodeResourceExhausted, expectedValidationReason: validation.IngestLimitReached, }, + { + description: "ingest_limit_reached_for_usage_group", + pushReq: &pushv1.PushRequest{ + Series: []*pushv1.RawProfileSeries{ + { + Labels: []*typesv1.LabelPair{ + {Name: "__name__", Value: "cpu"}, + {Name: phlaremodel.LabelNameServiceName, Value: "svc"}, + }, + Samples: []*pushv1.RawSample{ + { + RawProfile: collectTestProfileBytes(t), + }, + }, + }, + }, + }, + overrides: validation.MockOverrides(func(defaults *validation.Limits, tenantLimits map[string]*validation.Limits) { + l := validation.MockDefaultLimits() + l.IngestionLimit = &ingest_limits.Config{ + PeriodType: "hour", + PeriodLimitMb: 128, + LimitResetTime: 1737721086, + LimitReached: false, + Sampling: ingest_limits.SamplingConfig{ + NumRequests: 0, + Period: time.Minute, + }, + UsageGroups: map[string]ingest_limits.UsageGroup{ + "group-1": { + PeriodLimitMb: 64, + LimitReached: true, + }, + }, + } + usageGroupCfg, err := validation.NewUsageGroupConfig(map[string]string{ + "group-1": "{service_name=\"svc\"}", + }) + require.NoError(t, err) + l.DistributorUsageGroups = &usageGroupCfg + tenantLimits["user-1"] = l + }), + expectedCode: connect.CodeResourceExhausted, + expectedValidationReason: validation.IngestLimitReached, + }, } for _, tc := range testCases { diff --git a/pkg/distributor/ingest_limits/config.go b/pkg/distributor/ingest_limits/config.go index 33e042d80f..171f28b408 100644 --- a/pkg/distributor/ingest_limits/config.go +++ b/pkg/distributor/ingest_limits/config.go @@ -13,6 +13,8 @@ type Config struct { LimitReached bool `yaml:"limit_reached" json:"limit_reached"` // Sampling controls the sampling parameters when the limit is reached. Sampling SamplingConfig `yaml:"sampling" json:"sampling"` + // UsageGroups controls ingestion for pre-configured usage groups. + UsageGroups map[string]UsageGroup `yaml:"usage_groups" json:"usage_groups"` } // SamplingConfig describes the params of a simple probabilistic sampling mechanism. @@ -23,3 +25,8 @@ type SamplingConfig struct { NumRequests int `yaml:"num_requests" json:"num_requests"` Period time.Duration `yaml:"period" json:"period"` } + +type UsageGroup struct { + PeriodLimitMb int `yaml:"period_limit_mb" json:"period_limit_mb"` + LimitReached bool `yaml:"limit_reached" json:"limit_reached"` +} diff --git a/pkg/validation/usage_groups.go b/pkg/validation/usage_groups.go index 5ef79e3420..5af2bf6e2b 100644 --- a/pkg/validation/usage_groups.go +++ b/pkg/validation/usage_groups.go @@ -126,6 +126,10 @@ func (m UsageGroupMatch) CountDiscardedBytes(reason string, n int64) { } } +func (m UsageGroupMatch) Names() []string { + return m.names +} + func NewUsageGroupConfig(m map[string]string) (UsageGroupConfig, error) { if len(m) > maxUsageGroups { return UsageGroupConfig{}, fmt.Errorf("maximum number of usage groups is %d, got %d", maxUsageGroups, len(m)) @@ -176,11 +180,19 @@ func matchesAll(matchers []*labels.Matcher, lbls phlaremodel.Labels) bool { } for _, m := range matchers { + matched := false for _, lbl := range lbls { - if lbl.Name == m.Name && !m.Matches(lbl.Value) { - return false + if lbl.Name == m.Name { + if !m.Matches(lbl.Value) { + return false + } + matched = true + break } } + if !matched { + return false + } } return true } diff --git a/pkg/validation/usage_groups_test.go b/pkg/validation/usage_groups_test.go index 2400fef0b2..baf0dbe90d 100644 --- a/pkg/validation/usage_groups_test.go +++ b/pkg/validation/usage_groups_test.go @@ -104,6 +104,21 @@ func TestUsageGroupConfig_GetUsageGroups(t *testing.T) { tenantID: "tenant1", }, }, + { + Name: "disjoint_labels_do_not_match", + TenantID: "tenant1", + Config: UsageGroupConfig{ + config: map[string][]*labels.Matcher{ + "app/foo": testMustParseMatcher(t, `{namespace="foo", container="bar"}`), + }, + }, + Labels: phlaremodel.Labels{ + {Name: "service_name", Value: "foo"}, + }, + Want: UsageGroupMatch{ + tenantID: "tenant1", + }, + }, } for _, tt := range tests {