Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2576,10 +2576,10 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
d.cfg.IngestLimitsDryRunEnabled = test.ingestLimitsDryRunEnabled

mockClient := mockIngestLimitsFrontendClient{
t: t,
expectedRequest: test.expectedLimitsRequest,
response: test.limitsResponse,
responseErr: test.limitsResponseErr,
t: t,
expectedExceedsLimitsRequest: test.expectedLimitsRequest,
exceedsLimitsResponse: test.limitsResponse,
exceedsLimitsResponseErr: test.limitsResponseErr,
}
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
d.ingestLimits = l
Expand Down
78 changes: 62 additions & 16 deletions pkg/distributor/ingest_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
"github.com/grafana/loki/v3/pkg/limits/proto"
)

// ingestLimitsFrontendClient is used for tests.
// The ingestLimitsFrontendClient interface is used to mock calls in tests.
type ingestLimitsFrontendClient interface {
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
UpdateRates(context.Context, *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error)
}

// ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend
Expand All @@ -34,7 +35,7 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
}
}

// Implements the ingestLimitsFrontendClient interface.
// Implements the [ingestLimitsFrontendClient] interface.
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
var resp *proto.ExceedsLimitsResponse
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
Expand All @@ -45,6 +46,17 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req
return resp, err
}

// Implements the [ingestLimitsFrontendClient] interface.
func (c *ingestLimitsFrontendRingClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
var resp *proto.UpdateRatesResponse
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
var clientErr error
resp, clientErr = client.UpdateRates(ctx, req)
return clientErr
})
return resp, err
}

// withRandomShuffle gets all healthy frontends in the ring, randomly shuffles
// them, and then calls f.
func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error {
Expand Down Expand Up @@ -82,21 +94,21 @@ func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context,

type ingestLimits struct {
client ingestLimitsFrontendClient
requests prometheus.Counter
requestsFailed prometheus.Counter
requests *prometheus.CounterVec
requestsFailed *prometheus.CounterVec
}

func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer) *ingestLimits {
return &ingestLimits{
client: client,
requests: promauto.With(r).NewCounter(prometheus.CounterOpts{
requests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "loki_distributor_ingest_limits_requests_total",
Help: "The total number of requests.",
}),
requestsFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
}, []string{"operation"}),
requestsFailed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "loki_distributor_ingest_limits_requests_failed_total",
Help: "The total number of requests that failed.",
}),
}, []string{"operation"}),
}
}

Expand Down Expand Up @@ -144,20 +156,16 @@ func (l *ingestLimits) EnforceLimits(ctx context.Context, tenant string, streams
// an error if the client failed to send the request or receive a response
// from the server. Any streams that could not have their limits checked
// and returned in the results with the reason "ReasonFailed".
func (l *ingestLimits) ExceedsLimits(
ctx context.Context,
tenant string,
streams []KeyedStream,
) ([]*proto.ExceedsLimitsResult, error) {
l.requests.Inc()
func (l *ingestLimits) ExceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.ExceedsLimitsResult, error) {
l.requests.WithLabelValues("ExceedsLimits").Inc()
req, err := newExceedsLimitsRequest(tenant, streams)
if err != nil {
l.requestsFailed.Inc()
l.requestsFailed.WithLabelValues("ExceedsLimits").Inc()
return nil, err
}
resp, err := l.client.ExceedsLimits(ctx, req)
if err != nil {
l.requestsFailed.Inc()
l.requestsFailed.WithLabelValues("ExceedsLimits").Inc()
return nil, err
}
return resp.Results, nil
Expand All @@ -182,3 +190,41 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
Streams: streamMetadata,
}, nil
}

// UpdateRates updates the rates for the streams and returns a slice of the
// updated rates for all streams. Any streams that could not have rates updated
// have a rate of zero.
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) {
l.requests.WithLabelValues("UpdateRates").Inc()
req, err := newUpdateRatesRequest(tenant, streams)
if err != nil {
l.requestsFailed.WithLabelValues("UpdateRates").Inc()
return nil, err
}
resp, err := l.client.UpdateRates(ctx, req)
if err != nil {
l.requestsFailed.WithLabelValues("UpdateRates").Inc()
return nil, err
}
return resp.Results, nil
}

func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) {
// The distributor sends the hashes of all streams in the request to the
// limits-frontend. The limits-frontend is responsible for deciding if
// the request would exceed the tenants limits, and if so, which streams
// from the request caused it to exceed its limits.
streamMetadata := make([]*proto.StreamMetadata, 0, len(streams))
for _, stream := range streams {
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
streamMetadata = append(streamMetadata, &proto.StreamMetadata{
StreamHash: stream.HashKeyNoShard,
TotalSize: entriesSize + structuredMetadataSize,
IngestionPolicy: stream.Policy,
})
}
return &proto.UpdateRatesRequest{
Tenant: tenant,
Streams: streamMetadata,
}, nil
}
121 changes: 103 additions & 18 deletions pkg/distributor/ingest_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,37 @@ import (

// mockIngestLimitsFrontendClient mocks the RPC calls for tests.
type mockIngestLimitsFrontendClient struct {
t *testing.T
calls atomic.Uint64
expectedRequest *proto.ExceedsLimitsRequest
response *proto.ExceedsLimitsResponse
responseErr error
t *testing.T
calls atomic.Uint64
expectedExceedsLimitsRequest *proto.ExceedsLimitsRequest
exceedsLimitsResponse *proto.ExceedsLimitsResponse
exceedsLimitsResponseErr error
expectedUpdateRatesRequest *proto.UpdateRatesRequest
updateRatesResponse *proto.UpdateRatesResponse
updateRatesResponseErr error
}

// Implements the ingestLimitsFrontendClient interface.
func (c *mockIngestLimitsFrontendClient) ExceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
c.calls.Add(1)
if c.expectedRequest != nil {
require.Equal(c.t, c.expectedRequest, r)
if c.expectedExceedsLimitsRequest != nil {
require.Equal(c.t, c.expectedExceedsLimitsRequest, r)
}
if c.responseErr != nil {
return nil, c.responseErr
if c.exceedsLimitsResponseErr != nil {
return nil, c.exceedsLimitsResponseErr
}
return c.response, nil
return c.exceedsLimitsResponse, nil
}

func (c *mockIngestLimitsFrontendClient) UpdateRates(_ context.Context, r *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
c.calls.Add(1)
if c.expectedUpdateRatesRequest != nil {
require.Equal(c.t, c.expectedUpdateRatesRequest, r)
}
if c.updateRatesResponseErr != nil {
return nil, c.updateRatesResponseErr
}
return c.updateRatesResponse, nil
}

func TestIngestLimits_EnforceLimits(t *testing.T) {
Expand Down Expand Up @@ -176,10 +190,10 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mockClient := mockIngestLimitsFrontendClient{
t: t,
expectedRequest: test.expectedRequest,
response: test.response,
responseErr: test.responseErr,
t: t,
expectedExceedsLimitsRequest: test.expectedRequest,
exceedsLimitsResponse: test.response,
exceedsLimitsResponseErr: test.responseErr,
}
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -264,10 +278,10 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mockClient := mockIngestLimitsFrontendClient{
t: t,
expectedRequest: test.expectedRequest,
response: test.response,
responseErr: test.responseErr,
t: t,
expectedExceedsLimitsRequest: test.expectedRequest,
exceedsLimitsResponse: test.response,
exceedsLimitsResponseErr: test.responseErr,
}
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -283,3 +297,74 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
})
}
}

func TestIngestLimits_UpdateRates(t *testing.T) {
tests := []struct {
name string
tenant string
streams []KeyedStream
expectedRequest *proto.UpdateRatesRequest
response *proto.UpdateRatesResponse
responseErr error
expectedResult []*proto.UpdateRatesResult
expectedErr string
}{{
name: "error should be returned if rates cannot be updated",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
}},
expectedRequest: &proto.UpdateRatesRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 1,
}},
},
responseErr: errors.New("failed to update rates"),
expectedErr: "failed to update rates",
}, {
name: "updates rates",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
}},
expectedRequest: &proto.UpdateRatesRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 1,
}},
},
response: &proto.UpdateRatesResponse{
Results: []*proto.UpdateRatesResult{{
StreamHash: 1,
Rate: 1024,
}},
},
expectedResult: []*proto.UpdateRatesResult{{
StreamHash: 1,
Rate: 1024,
}},
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mockClient := mockIngestLimitsFrontendClient{
t: t,
expectedUpdateRatesRequest: test.expectedRequest,
updateRatesResponse: test.response,
updateRatesResponseErr: test.responseErr,
}
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := l.UpdateRates(ctx, test.tenant, test.streams)
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
require.Nil(t, res)
} else {
require.Nil(t, err)
require.Equal(t, test.expectedResult, res)
}
})
}
}
Loading