diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 3ebb201c36e18..4b800bdc60689 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2576,10 +2576,10 @@ func TestDistributor_PushIngestLimits(t *testing.T) { d.cfg.IngestLimitsDryRunEnabled = test.ingestLimitsDryRunEnabled mockClient := mockIngestLimitsFrontendClient{ - t: t, - expectedRequest: test.expectedLimitsRequest, - response: test.limitsResponse, - responseErr: test.limitsResponseErr, + t: t, + expectedExceedsLimitsRequest: test.expectedLimitsRequest, + exceedsLimitsResponse: test.limitsResponse, + exceedsLimitsResponseErr: test.limitsResponseErr, } l := newIngestLimits(&mockClient, prometheus.NewRegistry()) d.ingestLimits = l diff --git a/pkg/distributor/ingest_limits.go b/pkg/distributor/ingest_limits.go index f60aefa24b55a..4e6febcb91b3a 100644 --- a/pkg/distributor/ingest_limits.go +++ b/pkg/distributor/ingest_limits.go @@ -15,9 +15,10 @@ import ( "github.com/grafana/loki/v3/pkg/limits/proto" ) -// ingestLimitsFrontendClient is used for tests. +// The ingestLimitsFrontendClient interface is used to mock calls in tests. type ingestLimitsFrontendClient interface { ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) + UpdateRates(context.Context, *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) } // ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend @@ -34,7 +35,7 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo } } -// Implements the ingestLimitsFrontendClient interface. +// Implements the [ingestLimitsFrontendClient] interface. func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) { var resp *proto.ExceedsLimitsResponse err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error { @@ -45,6 +46,17 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req return resp, err } +// Implements the [ingestLimitsFrontendClient] interface. +func (c *ingestLimitsFrontendRingClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) { + var resp *proto.UpdateRatesResponse + err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error { + var clientErr error + resp, clientErr = client.UpdateRates(ctx, req) + return clientErr + }) + return resp, err +} + // withRandomShuffle gets all healthy frontends in the ring, randomly shuffles // them, and then calls f. func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error { @@ -82,21 +94,21 @@ func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, type ingestLimits struct { client ingestLimitsFrontendClient - requests prometheus.Counter - requestsFailed prometheus.Counter + requests *prometheus.CounterVec + requestsFailed *prometheus.CounterVec } func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer) *ingestLimits { return &ingestLimits{ client: client, - requests: promauto.With(r).NewCounter(prometheus.CounterOpts{ + requests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "loki_distributor_ingest_limits_requests_total", Help: "The total number of requests.", - }), - requestsFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{ + }, []string{"operation"}), + requestsFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "loki_distributor_ingest_limits_requests_failed_total", Help: "The total number of requests that failed.", - }), + }, []string{"operation"}), } } @@ -144,20 +156,16 @@ func (l *ingestLimits) EnforceLimits(ctx context.Context, tenant string, streams // an error if the client failed to send the request or receive a response // from the server. Any streams that could not have their limits checked // and returned in the results with the reason "ReasonFailed". -func (l *ingestLimits) ExceedsLimits( - ctx context.Context, - tenant string, - streams []KeyedStream, -) ([]*proto.ExceedsLimitsResult, error) { - l.requests.Inc() +func (l *ingestLimits) ExceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.ExceedsLimitsResult, error) { + l.requests.WithLabelValues("ExceedsLimits").Inc() req, err := newExceedsLimitsRequest(tenant, streams) if err != nil { - l.requestsFailed.Inc() + l.requestsFailed.WithLabelValues("ExceedsLimits").Inc() return nil, err } resp, err := l.client.ExceedsLimits(ctx, req) if err != nil { - l.requestsFailed.Inc() + l.requestsFailed.WithLabelValues("ExceedsLimits").Inc() return nil, err } return resp.Results, nil @@ -182,3 +190,41 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee Streams: streamMetadata, }, nil } + +// UpdateRates updates the rates for the streams and returns a slice of the +// updated rates for all streams. Any streams that could not have rates updated +// have a rate of zero. +func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) { + l.requests.WithLabelValues("UpdateRates").Inc() + req, err := newUpdateRatesRequest(tenant, streams) + if err != nil { + l.requestsFailed.WithLabelValues("UpdateRates").Inc() + return nil, err + } + resp, err := l.client.UpdateRates(ctx, req) + if err != nil { + l.requestsFailed.WithLabelValues("UpdateRates").Inc() + return nil, err + } + return resp.Results, nil +} + +func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) { + // The distributor sends the hashes of all streams in the request to the + // limits-frontend. The limits-frontend is responsible for deciding if + // the request would exceed the tenants limits, and if so, which streams + // from the request caused it to exceed its limits. + streamMetadata := make([]*proto.StreamMetadata, 0, len(streams)) + for _, stream := range streams { + entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream) + streamMetadata = append(streamMetadata, &proto.StreamMetadata{ + StreamHash: stream.HashKeyNoShard, + TotalSize: entriesSize + structuredMetadataSize, + IngestionPolicy: stream.Policy, + }) + } + return &proto.UpdateRatesRequest{ + Tenant: tenant, + Streams: streamMetadata, + }, nil +} diff --git a/pkg/distributor/ingest_limits_test.go b/pkg/distributor/ingest_limits_test.go index a39afb4771189..9786d3ce82cd0 100644 --- a/pkg/distributor/ingest_limits_test.go +++ b/pkg/distributor/ingest_limits_test.go @@ -18,23 +18,37 @@ import ( // mockIngestLimitsFrontendClient mocks the RPC calls for tests. type mockIngestLimitsFrontendClient struct { - t *testing.T - calls atomic.Uint64 - expectedRequest *proto.ExceedsLimitsRequest - response *proto.ExceedsLimitsResponse - responseErr error + t *testing.T + calls atomic.Uint64 + expectedExceedsLimitsRequest *proto.ExceedsLimitsRequest + exceedsLimitsResponse *proto.ExceedsLimitsResponse + exceedsLimitsResponseErr error + expectedUpdateRatesRequest *proto.UpdateRatesRequest + updateRatesResponse *proto.UpdateRatesResponse + updateRatesResponseErr error } // Implements the ingestLimitsFrontendClient interface. func (c *mockIngestLimitsFrontendClient) ExceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) { c.calls.Add(1) - if c.expectedRequest != nil { - require.Equal(c.t, c.expectedRequest, r) + if c.expectedExceedsLimitsRequest != nil { + require.Equal(c.t, c.expectedExceedsLimitsRequest, r) } - if c.responseErr != nil { - return nil, c.responseErr + if c.exceedsLimitsResponseErr != nil { + return nil, c.exceedsLimitsResponseErr } - return c.response, nil + return c.exceedsLimitsResponse, nil +} + +func (c *mockIngestLimitsFrontendClient) UpdateRates(_ context.Context, r *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) { + c.calls.Add(1) + if c.expectedUpdateRatesRequest != nil { + require.Equal(c.t, c.expectedUpdateRatesRequest, r) + } + if c.updateRatesResponseErr != nil { + return nil, c.updateRatesResponseErr + } + return c.updateRatesResponse, nil } func TestIngestLimits_EnforceLimits(t *testing.T) { @@ -176,10 +190,10 @@ func TestIngestLimits_EnforceLimits(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { mockClient := mockIngestLimitsFrontendClient{ - t: t, - expectedRequest: test.expectedRequest, - response: test.response, - responseErr: test.responseErr, + t: t, + expectedExceedsLimitsRequest: test.expectedRequest, + exceedsLimitsResponse: test.response, + exceedsLimitsResponseErr: test.responseErr, } l := newIngestLimits(&mockClient, prometheus.NewRegistry()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -264,10 +278,10 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { mockClient := mockIngestLimitsFrontendClient{ - t: t, - expectedRequest: test.expectedRequest, - response: test.response, - responseErr: test.responseErr, + t: t, + expectedExceedsLimitsRequest: test.expectedRequest, + exceedsLimitsResponse: test.response, + exceedsLimitsResponseErr: test.responseErr, } l := newIngestLimits(&mockClient, prometheus.NewRegistry()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -283,3 +297,74 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { }) } } + +func TestIngestLimits_UpdateRates(t *testing.T) { + tests := []struct { + name string + tenant string + streams []KeyedStream + expectedRequest *proto.UpdateRatesRequest + response *proto.UpdateRatesResponse + responseErr error + expectedResult []*proto.UpdateRatesResult + expectedErr string + }{{ + name: "error should be returned if rates cannot be updated", + tenant: "test", + streams: []KeyedStream{{ + HashKeyNoShard: 1, + }}, + expectedRequest: &proto.UpdateRatesRequest{ + Tenant: "test", + Streams: []*proto.StreamMetadata{{ + StreamHash: 1, + }}, + }, + responseErr: errors.New("failed to update rates"), + expectedErr: "failed to update rates", + }, { + name: "updates rates", + tenant: "test", + streams: []KeyedStream{{ + HashKeyNoShard: 1, + }}, + expectedRequest: &proto.UpdateRatesRequest{ + Tenant: "test", + Streams: []*proto.StreamMetadata{{ + StreamHash: 1, + }}, + }, + response: &proto.UpdateRatesResponse{ + Results: []*proto.UpdateRatesResult{{ + StreamHash: 1, + Rate: 1024, + }}, + }, + expectedResult: []*proto.UpdateRatesResult{{ + StreamHash: 1, + Rate: 1024, + }}, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mockClient := mockIngestLimitsFrontendClient{ + t: t, + expectedUpdateRatesRequest: test.expectedRequest, + updateRatesResponse: test.response, + updateRatesResponseErr: test.responseErr, + } + l := newIngestLimits(&mockClient, prometheus.NewRegistry()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + res, err := l.UpdateRates(ctx, test.tenant, test.streams) + if test.expectedErr != "" { + require.EqualError(t, err, test.expectedErr) + require.Nil(t, res) + } else { + require.Nil(t, err) + require.Equal(t, test.expectedResult, res) + } + }) + } +}