Skip to content

Commit 7aa0d0e

Browse files
feat: add support for UpdateRates RPC to distributors
1 parent bd99fa5 commit 7aa0d0e

File tree

3 files changed

+169
-38
lines changed

3 files changed

+169
-38
lines changed

pkg/distributor/distributor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2576,10 +2576,10 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
25762576
d.cfg.IngestLimitsDryRunEnabled = test.ingestLimitsDryRunEnabled
25772577

25782578
mockClient := mockIngestLimitsFrontendClient{
2579-
t: t,
2580-
expectedRequest: test.expectedLimitsRequest,
2581-
response: test.limitsResponse,
2582-
responseErr: test.limitsResponseErr,
2579+
t: t,
2580+
expectedExceedsLimitsRequest: test.expectedLimitsRequest,
2581+
exceedsLimitsResponse: test.limitsResponse,
2582+
exceedsLimitsResponseErr: test.limitsResponseErr,
25832583
}
25842584
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
25852585
d.ingestLimits = l

pkg/distributor/ingest_limits.go

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ import (
1515
"github.com/grafana/loki/v3/pkg/limits/proto"
1616
)
1717

18-
// ingestLimitsFrontendClient is used for tests.
18+
// The ingestLimitsFrontendClient interface is used to mock calls in tests.
1919
type ingestLimitsFrontendClient interface {
2020
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
21+
UpdateRates(context.Context, *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error)
2122
}
2223

2324
// ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend
@@ -34,7 +35,7 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
3435
}
3536
}
3637

37-
// Implements the ingestLimitsFrontendClient interface.
38+
// Implements the [ingestLimitsFrontendClient] interface.
3839
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
3940
var resp *proto.ExceedsLimitsResponse
4041
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
@@ -45,6 +46,17 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req
4546
return resp, err
4647
}
4748

49+
// Implements the [ingestLimitsFrontendClient] interface.
50+
func (c *ingestLimitsFrontendRingClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
51+
var resp *proto.UpdateRatesResponse
52+
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
53+
var clientErr error
54+
resp, clientErr = client.UpdateRates(ctx, req)
55+
return clientErr
56+
})
57+
return resp, err
58+
}
59+
4860
// withRandomShuffle gets all healthy frontends in the ring, randomly shuffles
4961
// them, and then calls f.
5062
func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error {
@@ -82,21 +94,21 @@ func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context,
8294

8395
type ingestLimits struct {
8496
client ingestLimitsFrontendClient
85-
requests prometheus.Counter
86-
requestsFailed prometheus.Counter
97+
requests *prometheus.CounterVec
98+
requestsFailed *prometheus.CounterVec
8799
}
88100

89101
func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer) *ingestLimits {
90102
return &ingestLimits{
91103
client: client,
92-
requests: promauto.With(r).NewCounter(prometheus.CounterOpts{
104+
requests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
93105
Name: "loki_distributor_ingest_limits_requests_total",
94106
Help: "The total number of requests.",
95-
}),
96-
requestsFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
107+
}, []string{"operation"}),
108+
requestsFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
97109
Name: "loki_distributor_ingest_limits_requests_failed_total",
98110
Help: "The total number of requests that failed.",
99-
}),
111+
}, []string{"operation"}),
100112
}
101113
}
102114

@@ -144,20 +156,16 @@ func (l *ingestLimits) EnforceLimits(ctx context.Context, tenant string, streams
144156
// an error if the client failed to send the request or receive a response
145157
// from the server. Any streams that could not have their limits checked
146158
// and returned in the results with the reason "ReasonFailed".
147-
func (l *ingestLimits) ExceedsLimits(
148-
ctx context.Context,
149-
tenant string,
150-
streams []KeyedStream,
151-
) ([]*proto.ExceedsLimitsResult, error) {
152-
l.requests.Inc()
159+
func (l *ingestLimits) ExceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.ExceedsLimitsResult, error) {
160+
l.requests.WithLabelValues("ExceedsLimits").Inc()
153161
req, err := newExceedsLimitsRequest(tenant, streams)
154162
if err != nil {
155-
l.requestsFailed.Inc()
163+
l.requestsFailed.WithLabelValues("ExceedsLimits").Inc()
156164
return nil, err
157165
}
158166
resp, err := l.client.ExceedsLimits(ctx, req)
159167
if err != nil {
160-
l.requestsFailed.Inc()
168+
l.requestsFailed.WithLabelValues("ExceedsLimits").Inc()
161169
return nil, err
162170
}
163171
return resp.Results, nil
@@ -182,3 +190,41 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
182190
Streams: streamMetadata,
183191
}, nil
184192
}
193+
194+
// UpdateRates updates the rates for the streams and returns a slice of the
195+
// updated rates for all streams. Any streams that could not have rates updated
196+
// have a rate of zero.
197+
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) {
198+
l.requests.WithLabelValues("UpdateRates").Inc()
199+
req, err := newUpdateRatesRequest(tenant, streams)
200+
if err != nil {
201+
l.requestsFailed.WithLabelValues("UpdateRates").Inc()
202+
return nil, err
203+
}
204+
resp, err := l.client.UpdateRates(ctx, req)
205+
if err != nil {
206+
l.requestsFailed.WithLabelValues("UpdateRates").Inc()
207+
return nil, err
208+
}
209+
return resp.Results, nil
210+
}
211+
212+
func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) {
213+
// The distributor sends the hashes of all streams in the request to the
214+
// limits-frontend. The limits-frontend is responsible for deciding if
215+
// the request would exceed the tenants limits, and if so, which streams
216+
// from the request caused it to exceed its limits.
217+
streamMetadata := make([]*proto.StreamMetadata, 0, len(streams))
218+
for _, stream := range streams {
219+
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
220+
streamMetadata = append(streamMetadata, &proto.StreamMetadata{
221+
StreamHash: stream.HashKeyNoShard,
222+
TotalSize: entriesSize + structuredMetadataSize,
223+
IngestionPolicy: stream.Policy,
224+
})
225+
}
226+
return &proto.UpdateRatesRequest{
227+
Tenant: tenant,
228+
Streams: streamMetadata,
229+
}, nil
230+
}

pkg/distributor/ingest_limits_test.go

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,37 @@ import (
1818

1919
// mockIngestLimitsFrontendClient mocks the RPC calls for tests.
2020
type mockIngestLimitsFrontendClient struct {
21-
t *testing.T
22-
calls atomic.Uint64
23-
expectedRequest *proto.ExceedsLimitsRequest
24-
response *proto.ExceedsLimitsResponse
25-
responseErr error
21+
t *testing.T
22+
calls atomic.Uint64
23+
expectedExceedsLimitsRequest *proto.ExceedsLimitsRequest
24+
exceedsLimitsResponse *proto.ExceedsLimitsResponse
25+
exceedsLimitsResponseErr error
26+
expectedUpdateRatesRequest *proto.UpdateRatesRequest
27+
updateRatesResponse *proto.UpdateRatesResponse
28+
updateRatesResponseErr error
2629
}
2730

2831
// Implements the ingestLimitsFrontendClient interface.
2932
func (c *mockIngestLimitsFrontendClient) ExceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
3033
c.calls.Add(1)
31-
if c.expectedRequest != nil {
32-
require.Equal(c.t, c.expectedRequest, r)
34+
if c.expectedExceedsLimitsRequest != nil {
35+
require.Equal(c.t, c.expectedExceedsLimitsRequest, r)
3336
}
34-
if c.responseErr != nil {
35-
return nil, c.responseErr
37+
if c.exceedsLimitsResponseErr != nil {
38+
return nil, c.exceedsLimitsResponseErr
3639
}
37-
return c.response, nil
40+
return c.exceedsLimitsResponse, nil
41+
}
42+
43+
func (c *mockIngestLimitsFrontendClient) UpdateRates(_ context.Context, r *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
44+
c.calls.Add(1)
45+
if c.expectedUpdateRatesRequest != nil {
46+
require.Equal(c.t, c.expectedUpdateRatesRequest, r)
47+
}
48+
if c.updateRatesResponseErr != nil {
49+
return nil, c.updateRatesResponseErr
50+
}
51+
return c.updateRatesResponse, nil
3852
}
3953

4054
func TestIngestLimits_EnforceLimits(t *testing.T) {
@@ -176,10 +190,10 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
176190
for _, test := range tests {
177191
t.Run(test.name, func(t *testing.T) {
178192
mockClient := mockIngestLimitsFrontendClient{
179-
t: t,
180-
expectedRequest: test.expectedRequest,
181-
response: test.response,
182-
responseErr: test.responseErr,
193+
t: t,
194+
expectedExceedsLimitsRequest: test.expectedRequest,
195+
exceedsLimitsResponse: test.response,
196+
exceedsLimitsResponseErr: test.responseErr,
183197
}
184198
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
185199
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -264,10 +278,10 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
264278
for _, test := range tests {
265279
t.Run(test.name, func(t *testing.T) {
266280
mockClient := mockIngestLimitsFrontendClient{
267-
t: t,
268-
expectedRequest: test.expectedRequest,
269-
response: test.response,
270-
responseErr: test.responseErr,
281+
t: t,
282+
expectedExceedsLimitsRequest: test.expectedRequest,
283+
exceedsLimitsResponse: test.response,
284+
exceedsLimitsResponseErr: test.responseErr,
271285
}
272286
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
273287
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -283,3 +297,74 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
283297
})
284298
}
285299
}
300+
301+
func TestIngestLimits_UpdateRates(t *testing.T) {
302+
tests := []struct {
303+
name string
304+
tenant string
305+
streams []KeyedStream
306+
expectedRequest *proto.UpdateRatesRequest
307+
response *proto.UpdateRatesResponse
308+
responseErr error
309+
expectedResult []*proto.UpdateRatesResult
310+
expectedErr string
311+
}{{
312+
name: "error should be returned if rates cannot be updated",
313+
tenant: "test",
314+
streams: []KeyedStream{{
315+
HashKeyNoShard: 1,
316+
}},
317+
expectedRequest: &proto.UpdateRatesRequest{
318+
Tenant: "test",
319+
Streams: []*proto.StreamMetadata{{
320+
StreamHash: 1,
321+
}},
322+
},
323+
responseErr: errors.New("failed to update rates"),
324+
expectedErr: "failed to update rates",
325+
}, {
326+
name: "updates rates",
327+
tenant: "test",
328+
streams: []KeyedStream{{
329+
HashKeyNoShard: 1,
330+
}},
331+
expectedRequest: &proto.UpdateRatesRequest{
332+
Tenant: "test",
333+
Streams: []*proto.StreamMetadata{{
334+
StreamHash: 1,
335+
}},
336+
},
337+
response: &proto.UpdateRatesResponse{
338+
Results: []*proto.UpdateRatesResult{{
339+
StreamHash: 1,
340+
Rate: 1024,
341+
}},
342+
},
343+
expectedResult: []*proto.UpdateRatesResult{{
344+
StreamHash: 1,
345+
Rate: 1024,
346+
}},
347+
}}
348+
349+
for _, test := range tests {
350+
t.Run(test.name, func(t *testing.T) {
351+
mockClient := mockIngestLimitsFrontendClient{
352+
t: t,
353+
expectedUpdateRatesRequest: test.expectedRequest,
354+
updateRatesResponse: test.response,
355+
updateRatesResponseErr: test.responseErr,
356+
}
357+
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
358+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
359+
defer cancel()
360+
res, err := l.UpdateRates(ctx, test.tenant, test.streams)
361+
if test.expectedErr != "" {
362+
require.EqualError(t, err, test.expectedErr)
363+
require.Nil(t, res)
364+
} else {
365+
require.Nil(t, err)
366+
require.Equal(t, test.expectedResult, res)
367+
}
368+
})
369+
}
370+
}

0 commit comments

Comments
 (0)