10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
@@ -4316,6 +4316,16 @@ discover_generic_fields:
# CLI flag: -querier.query-timeout
[query_timeout: <duration> | default = 1m]

# Maximum bytes per second for bucket GetObject operations during a query. 0
# means unlimited. Also expressible in human-readable forms (1MB, 256KB, etc).
# CLI flag: -querier.query-bucket-get-object-rate-limit
[query_bucket_get_object_rate_limit: <int> | default = 0B]

# Maximum burst bytes for bucket GetObject operations during a query. 0 means
# unlimited. Also expressible in human-readable forms (1MB, 256KB, etc).
# CLI flag: -querier.query-bucket-get-object-rate-limit-burst
[query_bucket_get_object_rate_limit_burst: <int> | default = 0B]

# Split queries by a time interval and execute in parallel. The value 0 disables
# splitting by time. This also determines how cache keys are chosen when result
# caching is enabled.
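For reference, a minimal sketch of how these two options might be set, assuming they live in `limits_config` alongside `query_timeout` as the surrounding context suggests (the values are illustrative only):

```yaml
limits_config:
  query_timeout: 1m
  # Throttle each query's bucket GetObject reads to 50MB/s, with a 10MB burst.
  query_bucket_get_object_rate_limit: 50MB
  query_bucket_get_object_rate_limit_burst: 10MB
```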
21 changes: 21 additions & 0 deletions pkg/querier/http.go
@@ -36,6 +36,7 @@ import (
querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
"github.com/grafana/loki/v3/pkg/querier/pattern"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/storage/bucket"
index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/tracing"
"github.com/grafana/loki/v3/pkg/util/constants"
@@ -92,6 +93,16 @@ func (q *QuerierAPI) RangeQueryHandler(ctx context.Context, req *queryrange.Loki
return result, err
}

// Add bandwidth limit to context for this query if configured
tenantID, err := tenant.TenantID(ctx)
if err == nil {
rateLimit := q.limits.QueryBucketGetObjectRateLimit(ctx, tenantID)
if rateLimit > 0 {
burstLimit := q.limits.QueryBucketGetObjectRateLimitBurst(ctx, tenantID)
ctx = bucket.WithQueryBandwidthLimit(ctx, rateLimit, burstLimit)
}
}

if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(q.cfg, params.Start(), params.End()) {
query := q.engineV2.Query(params)
result, err = query.Exec(ctx)
@@ -133,6 +144,16 @@ func (q *QuerierAPI) InstantQueryHandler(ctx context.Context, req *queryrange.Lo
return logqlmodel.Result{}, err
}

// Add bandwidth limit to context for this query if configured
tenantID, err := tenant.TenantID(ctx)
if err == nil {
rateLimit := q.limits.QueryBucketGetObjectRateLimit(ctx, tenantID)
if rateLimit > 0 {
burstLimit := q.limits.QueryBucketGetObjectRateLimitBurst(ctx, tenantID)
ctx = bucket.WithQueryBandwidthLimit(ctx, rateLimit, burstLimit)
}
}

if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(q.cfg, params.Start(), params.End()) {
query := q.engineV2.Query(params)
result, err := query.Exec(ctx)
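The guard above is duplicated verbatim in `RangeQueryHandler` and `InstantQueryHandler`. A small helper could fold it into one place; this sketch is built only from code already in this diff, and `withQueryBandwidthLimit` is a hypothetical name:

```go
// Hypothetical helper (not part of this diff): attach the per-tenant
// bandwidth limit to the context, or return the context unchanged when
// the tenant is unknown or no limit is configured.
func (q *QuerierAPI) withQueryBandwidthLimit(ctx context.Context) context.Context {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return ctx
	}
	rateLimit := q.limits.QueryBucketGetObjectRateLimit(ctx, tenantID)
	if rateLimit <= 0 {
		return ctx
	}
	burstLimit := q.limits.QueryBucketGetObjectRateLimitBurst(ctx, tenantID)
	return bucket.WithQueryBandwidthLimit(ctx, rateLimit, burstLimit)
}
```

Each handler would then reduce to `ctx = q.withQueryBandwidthLimit(ctx)`.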
2 changes: 2 additions & 0 deletions pkg/querier/limits/definitions.go
@@ -21,4 +21,6 @@ type Limits interface {
MaxStreamsMatchersPerQuery(context.Context, string) int
MaxConcurrentTailRequests(context.Context, string) int
MaxEntriesLimitPerQuery(context.Context, string) int
QueryBucketGetObjectRateLimit(context.Context, string) int64
QueryBucketGetObjectRateLimitBurst(context.Context, string) int64
}
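The diff does not show the concrete implementation of these two methods. A plausible sketch, assuming Loki's usual per-tenant override pattern in `pkg/validation` (the `getOverridesForUser` accessor, the field names, and the cast from a bytes-typed flag value are all assumptions inferred from the `0B` defaults in the docs):

```go
// Assumed companion change in pkg/validation (not shown in this diff).
func (o *Overrides) QueryBucketGetObjectRateLimit(_ context.Context, userID string) int64 {
	return int64(o.getOverridesForUser(userID).QueryBucketGetObjectRateLimit)
}

func (o *Overrides) QueryBucketGetObjectRateLimitBurst(_ context.Context, userID string) int64 {
	return int64(o.getOverridesForUser(userID).QueryBucketGetObjectRateLimitBurst)
}
```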
8 changes: 8 additions & 0 deletions pkg/querier/testutil/limits.go
@@ -99,3 +99,11 @@ func (m *MockLimits) DebugEngineStreams(_ string) bool {
func (m *MockLimits) DebugEngineTasks(_ string) bool {
return false
}

func (m *MockLimits) QueryBucketGetObjectRateLimit(_ context.Context, _ string) int64 {
return 0 // 0 means unlimited
}

func (m *MockLimits) QueryBucketGetObjectRateLimitBurst(_ context.Context, _ string) int64 {
return 0 // 0 means unlimited
}
17 changes: 16 additions & 1 deletion pkg/storage/bucket/object_client_adapter.go
@@ -128,11 +128,26 @@ func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) (
level.Warn(o.logger).Log("msg", "failed to get size of object", "err", err)
}

// Wrap with rate-limited reader if query has a bandwidth limit
if limiter := getQueryRateLimiter(ctx); limiter != nil {
reader = newRateLimitedReader(ctx, reader, o.logger)
}

return reader, size, err
}

func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
return o.hedgedBucket.GetRange(ctx, objectKey, offset, length)
reader, err := o.hedgedBucket.GetRange(ctx, objectKey, offset, length)
if err != nil {
return nil, err
}

// Wrap with rate-limited reader if query has a bandwidth limit
if limiter := getQueryRateLimiter(ctx); limiter != nil {
reader = newRateLimitedReader(ctx, reader, o.logger)
}

return reader, nil
}

// List objects with given prefix.
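A key property of this wiring, per the `WithQueryBandwidthLimit` doc comment below: every reader created for one query draws from a single token bucket, so concurrent GetObject and GetObjectRange calls are throttled in aggregate rather than individually. A minimal illustration, assumed to live in `pkg/storage/bucket` since `newRateLimitedReader` is unexported:

```go
// Illustrative only; assumes imports bytes, context, io, and go-kit/log.
func sharedLimiterExample() {
	// Both readers share the limiter stored in ctx, so their combined
	// throughput is capped at 1MB/s in total, not 1MB/s each.
	ctx := WithQueryBandwidthLimit(context.Background(), 1<<20, 0)
	logger := log.NewNopLogger()

	r1 := newRateLimitedReader(ctx, io.NopCloser(bytes.NewReader(make([]byte, 512<<10))), logger)
	r2 := newRateLimitedReader(ctx, io.NopCloser(bytes.NewReader(make([]byte, 512<<10))), logger)

	_, _ = io.Copy(io.Discard, r1)
	_, _ = io.Copy(io.Discard, r2)
}
```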
177 changes: 177 additions & 0 deletions pkg/storage/bucket/rate_limited_reader.go
@@ -0,0 +1,177 @@
package bucket

import (
"context"
"fmt"
"io"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"golang.org/x/time/rate"

util_log "github.com/grafana/loki/v3/pkg/util/log"
)

// minReadSize is the minimum chunk size for reading data.
// This ensures we read in reasonable-sized batches rather than very small ones.
// For typical 1-2MB objects (most of our chunks), 64KB provides a good balance between efficiency
// and rate limiting responsiveness.
// E.g. for a 2MB object, a 64KB read size means 32 reads, which is reasonable.
const minReadSize = 64 * 1024 // 64KB

type rateLimiterKey struct{}

// queryRateLimiter holds a rate limiter for a query.
type queryRateLimiter struct {
limiter *rate.Limiter
}

// WithQueryBandwidthLimit adds a bandwidth limit to the context for this query.
// All GetObject calls within this query will share the same rate limiter.
// bytesPerSecond is the maximum bytes per second for this query.
// burstBytes is the maximum burst bytes allowed.
// If bytesPerSecond is 0 or negative, rate limiting is disabled.
func WithQueryBandwidthLimit(ctx context.Context, bytesPerSecond int64, burstBytes int64) context.Context {
if bytesPerSecond <= 0 {
return ctx
}

// Burst defaults to the rate; a positive burstBytes overrides it
burst := int(bytesPerSecond)
if burstBytes > 0 {
burst = int(burstBytes)
}

// Create a limiter with the specified rate and burst
limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), burst)

return context.WithValue(ctx, rateLimiterKey{}, &queryRateLimiter{
limiter: limiter,
})
}

// getQueryRateLimiter extracts the rate limiter from context.
// Returns nil if no rate limiter is configured.
func getQueryRateLimiter(ctx context.Context) *rate.Limiter {
rl, ok := ctx.Value(rateLimiterKey{}).(*queryRateLimiter)
if !ok || rl == nil {
return nil
}
return rl.limiter
}

// rateLimitedReader wraps an io.ReadCloser and limits the read rate using a shared limiter.
type rateLimitedReader struct {
io.ReadCloser
limiter *rate.Limiter
ctx context.Context
logger log.Logger
}

func newRateLimitedReader(ctx context.Context, readCloser io.ReadCloser, logger log.Logger) *rateLimitedReader {
return &rateLimitedReader{
ReadCloser: readCloser,
limiter: getQueryRateLimiter(ctx),
ctx: ctx,
logger: logger,
}
}

// Read reads data from the underlying reader while respecting the rate limit.
// It reads in batches that don't exceed the burst size, waiting for rate limiter
// approval before each read to ensure we don't read ahead of the rate limit.
func (r *rateLimitedReader) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}

burst := r.limiter.Burst()
if burst <= 0 {
// This should never happen with limiters created by WithQueryBandwidthLimit
// but handle it defensively - if burst is invalid, we can't rate limit
return r.ReadCloser.Read(p)
}

	// Cap the chunk size to the minimum read size and the burst
	chunkSize := min(minReadSize, burst)
totalRead := 0

// Other logging stats
var (
rateLimitedCount int
totalWaitTime time.Duration
maxWaitTime time.Duration
)

// Defer logging to ensure it happens on all exit paths
defer func() {
if rateLimitedCount > 0 && r.logger != nil {
logger := util_log.WithContext(r.ctx, r.logger)
level.Debug(logger).Log(
"msg", "query rate limited during bucket read operation",
"rateLimitedCount", rateLimitedCount,
"totalWaitTime", totalWaitTime.String(),
"maxWaitTime", maxWaitTime.String(),
"readBufferSize", humanize.Bytes(uint64(len(p))),
"readBytes", humanize.Bytes(uint64(totalRead)),
"remainingBytes", humanize.Bytes(uint64(len(p)-totalRead)),
"err", err,
)
}
}()

for totalRead < len(p) {
remaining := len(p) - totalRead
		// Read in chunks of chunkSize, capped to the remaining buffer
		readSize := min(chunkSize, remaining)

// Reserve rate limiter tokens for this batch read
reservation := r.limiter.ReserveN(time.Now(), readSize)
if !reservation.OK() {
// Reservation failed (e.g., readSize > burst), return error
// This should not happen in practice since we cap readSize to burst
if totalRead > 0 {
return totalRead, nil
}
		return 0, fmt.Errorf("rate limited reader: reservation failed: readSize (%d) exceeds burst (%d)", readSize, burst)
}

// If we need to wait, record the logging stats and wait for the delay
if delay := reservation.Delay(); delay > 0 {
rateLimitedCount++
totalWaitTime += delay
maxWaitTime = max(maxWaitTime, delay)

timer := time.NewTimer(delay)
select {
case <-timer.C:
// Delay completed, proceed
case <-r.ctx.Done():
timer.Stop()
reservation.Cancel()
if totalRead > 0 {
return totalRead, nil
}
return 0, r.ctx.Err()
}
}

// Read from underlying reader (up to the approved read size)
batch := p[totalRead : totalRead+readSize]
read, err := r.ReadCloser.Read(batch)
totalRead += read

if err != nil {
return totalRead, err
}

		// If we read less than requested, the underlying reader is exhausted
		// for now (EOF will surface on the next call); return what we have.
		if read < readSize {
			return totalRead, nil
		}
}

return totalRead, nil
}
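A test sketch for the throttling behavior (not part of the diff; assumed to sit in the same package so it can use the unexported constructor). 256KB at 128KB/s with a 64KB burst should take roughly (256KB - 64KB) / 128KB/s = 1.5s, since the burst covers the first 64KB:

```go
package bucket

import (
	"bytes"
	"context"
	"io"
	"testing"
	"time"

	"github.com/go-kit/log"
)

func TestRateLimitedReader_Throttles(t *testing.T) {
	payload := bytes.Repeat([]byte{'x'}, 256<<10) // 256KB
	ctx := WithQueryBandwidthLimit(context.Background(), 128<<10, 64<<10)
	r := newRateLimitedReader(ctx, io.NopCloser(bytes.NewReader(payload)), log.NewNopLogger())

	start := time.Now()
	n, err := io.Copy(io.Discard, r)
	if err != nil {
		t.Fatal(err)
	}
	if n != int64(len(payload)) {
		t.Fatalf("read %d bytes, want %d", n, len(payload))
	}
	// The burst covers the first 64KB; the remaining 192KB must wait on the limiter.
	if elapsed := time.Since(start); elapsed < time.Second {
		t.Fatalf("expected throttling, finished in %v", elapsed)
	}
}
```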