Skip to content

Commit 4212667

Browse files
feat: add support for UpdateRates RPC to distributors
1 parent bd99fa5 commit 4212667

File tree

1 file changed

+61
-15
lines changed

1 file changed

+61
-15
lines changed

pkg/distributor/ingest_limits.go

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ import (
1515
"github.com/grafana/loki/v3/pkg/limits/proto"
1616
)
1717

18-
// ingestLimitsFrontendClient is used for tests.
18+
// The ingestLimitsFrontendClient interface is used to mock calls in tests.
1919
type ingestLimitsFrontendClient interface {
2020
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
21+
UpdateRates(context.Context, *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error)
2122
}
2223

2324
// ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend
@@ -34,7 +35,7 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
3435
}
3536
}
3637

37-
// Implements the ingestLimitsFrontendClient interface.
38+
// Implements the [ingestLimitsFrontendClient] interface.
3839
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
3940
var resp *proto.ExceedsLimitsResponse
4041
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
@@ -45,6 +46,17 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req
4546
return resp, err
4647
}
4748

49+
// Implements the [ingestLimitsFrontendClient] interface.
50+
func (c *ingestLimitsFrontendRingClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
51+
var resp *proto.UpdateRatesResponse
52+
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
53+
var clientErr error
54+
resp, clientErr = client.UpdateRates(ctx, req)
55+
return clientErr
56+
})
57+
return resp, err
58+
}
59+
4860
// withRandomShuffle gets all healthy frontends in the ring, randomly shuffles
4961
// them, and then calls f.
5062
func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error {
@@ -82,21 +94,21 @@ func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context,
8294

8395
type ingestLimits struct {
8496
client ingestLimitsFrontendClient
85-
requests prometheus.Counter
86-
requestsFailed prometheus.Counter
97+
requests *prometheus.CounterVec
98+
requestsFailed *prometheus.CounterVec
8799
}
88100

89101
func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer) *ingestLimits {
90102
return &ingestLimits{
91103
client: client,
92-
requests: promauto.With(r).NewCounter(prometheus.CounterOpts{
104+
requests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
93105
Name: "loki_distributor_ingest_limits_requests_total",
94106
Help: "The total number of requests.",
95-
}),
96-
requestsFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
107+
}, []string{"operation"}),
108+
requestsFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
97109
Name: "loki_distributor_ingest_limits_requests_failed_total",
98110
Help: "The total number of requests that failed.",
99-
}),
111+
}, []string{"operation"}),
100112
}
101113
}
102114

@@ -144,20 +156,16 @@ func (l *ingestLimits) EnforceLimits(ctx context.Context, tenant string, streams
144156
// an error if the client failed to send the request or receive a response
145157
// from the server. Any streams that could not have their limits checked
146158
// and returned in the results with the reason "ReasonFailed".
147-
func (l *ingestLimits) ExceedsLimits(
148-
ctx context.Context,
149-
tenant string,
150-
streams []KeyedStream,
151-
) ([]*proto.ExceedsLimitsResult, error) {
152-
l.requests.Inc()
159+
func (l *ingestLimits) ExceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.ExceedsLimitsResult, error) {
160+
l.requests.WithLabelValues("ExceedsLimits").Inc()
153161
req, err := newExceedsLimitsRequest(tenant, streams)
154162
if err != nil {
155163
l.requestsFailed.Inc()
156164
return nil, err
157165
}
158166
resp, err := l.client.ExceedsLimits(ctx, req)
159167
if err != nil {
160-
l.requestsFailed.Inc()
168+
l.requestsFailed.WithLabelValues("ExceedsLimits").Inc()
161169
return nil, err
162170
}
163171
return resp.Results, nil
@@ -182,3 +190,41 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
182190
Streams: streamMetadata,
183191
}, nil
184192
}
193+
194+
// UpdateRates updates the rates for the streams and returns a slice of the
195+
// updated rates for all streams. Any streams that could not have rates updated
196+
// have a rate of zero.
197+
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) {
198+
l.requests.WithLabelValues("UpdateRates").Inc()
199+
req, err := newUpdateRatesRequest(tenant, streams)
200+
if err != nil {
201+
l.requestsFailed.Inc()
202+
return nil, err
203+
}
204+
resp, err := l.client.UpdateRates(ctx, req)
205+
if err != nil {
206+
l.requestsFailed.WithLabelValues("UpdateRates").Inc()
207+
return nil, err
208+
}
209+
return resp.Results, nil
210+
}
211+
212+
func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) {
213+
// The distributor sends the hashes of all streams in the request to the
214+
// limits-frontend. The limits-frontend is responsible for deciding if
215+
// the request would exceed the tenants limits, and if so, which streams
216+
// from the request caused it to exceed its limits.
217+
streamMetadata := make([]*proto.StreamMetadata, 0, len(streams))
218+
for _, stream := range streams {
219+
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
220+
streamMetadata = append(streamMetadata, &proto.StreamMetadata{
221+
StreamHash: stream.HashKeyNoShard,
222+
TotalSize: entriesSize + structuredMetadataSize,
223+
IngestionPolicy: stream.Policy,
224+
})
225+
}
226+
return &proto.UpdateRatesRequest{
227+
Tenant: tenant,
228+
Streams: streamMetadata,
229+
}, nil
230+
}

0 commit comments

Comments
 (0)