Merged
pkg/segmentwriter/service.go (65 changes: 52 additions, 13 deletions)
@@ -41,19 +41,21 @@ const (
)

type Config struct {
- GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."`
- LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
- SegmentDuration time.Duration `yaml:"segment_duration,omitempty" category:"advanced"`
- FlushConcurrency uint `yaml:"flush_concurrency,omitempty" category:"advanced"`
- UploadTimeout time.Duration `yaml:"upload-timeout,omitempty" category:"advanced"`
- UploadMaxRetries int `yaml:"upload-retry_max_retries,omitempty" category:"advanced"`
- UploadMinBackoff time.Duration `yaml:"upload-retry_min_period,omitempty" category:"advanced"`
- UploadMaxBackoff time.Duration `yaml:"upload-retry_max_period,omitempty" category:"advanced"`
- UploadHedgeAfter time.Duration `yaml:"upload-hedge_upload_after,omitempty" category:"advanced"`
- UploadHedgeRateMax float64 `yaml:"upload-hedge_rate_max,omitempty" category:"advanced"`
- UploadHedgeRateBurst uint `yaml:"upload-hedge_rate_burst,omitempty" category:"advanced"`
- MetadataDLQEnabled bool `yaml:"metadata_dlq_enabled,omitempty" category:"advanced"`
- MetadataUpdateTimeout time.Duration `yaml:"metadata_update_timeout,omitempty" category:"advanced"`
+ GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."`
+ LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
+ SegmentDuration time.Duration `yaml:"segment_duration,omitempty" category:"advanced"`
+ FlushConcurrency uint `yaml:"flush_concurrency,omitempty" category:"advanced"`
+ UploadTimeout time.Duration `yaml:"upload-timeout,omitempty" category:"advanced"`
+ UploadMaxRetries int `yaml:"upload-retry_max_retries,omitempty" category:"advanced"`
+ UploadMinBackoff time.Duration `yaml:"upload-retry_min_period,omitempty" category:"advanced"`
+ UploadMaxBackoff time.Duration `yaml:"upload-retry_max_period,omitempty" category:"advanced"`
+ UploadHedgeAfter time.Duration `yaml:"upload-hedge_upload_after,omitempty" category:"advanced"`
+ UploadHedgeRateMax float64 `yaml:"upload-hedge_rate_max,omitempty" category:"advanced"`
+ UploadHedgeRateBurst uint `yaml:"upload-hedge_rate_burst,omitempty" category:"advanced"`
+ MetadataDLQEnabled bool `yaml:"metadata_dlq_enabled,omitempty" category:"advanced"`
+ MetadataUpdateTimeout time.Duration `yaml:"metadata_update_timeout,omitempty" category:"advanced"`
+ BucketHealthCheckEnabled bool `yaml:"bucket_health_check_enabled,omitempty" category:"advanced"`
+ BucketHealthCheckTimeout time.Duration `yaml:"bucket_health_check_timeout,omitempty" category:"advanced"`
}

func (cfg *Config) Validate() error {
@@ -79,6 +81,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.UintVar(&cfg.UploadHedgeRateBurst, prefix+".upload-hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
f.BoolVar(&cfg.MetadataDLQEnabled, prefix+".metadata-dlq-enabled", true, "Enables dead letter queue (DLQ) for metadata. If the metadata update fails, it will be stored and updated asynchronously.")
f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", 2*time.Second, "Timeout for metadata update requests.")
+ f.BoolVar(&cfg.BucketHealthCheckEnabled, prefix+".bucket-health-check-enabled", true, "Enables bucket health check on startup. This both validates credentials and warms up the connection to reduce latency for the first write.")
+ f.DurationVar(&cfg.BucketHealthCheckTimeout, prefix+".bucket-health-check-timeout", 10*time.Second, "Timeout for bucket health check operations.")
}

type Limits interface {
@@ -155,7 +159,42 @@ func New(
return i, nil
}

+ // performBucketHealthCheck performs a lightweight bucket operation to warm up the connection
+ // and detect any object storage issues early. This serves the dual purpose of validating
+ // bucket accessibility and reducing latency for the first actual write operation.
+ func (i *SegmentWriterService) performBucketHealthCheck(ctx context.Context) error {
+ if !i.config.BucketHealthCheckEnabled {
+ return nil
+ }
+
+ level.Debug(i.logger).Log("msg", "starting bucket health check", "timeout", i.config.BucketHealthCheckTimeout.String())
+
+ healthCheckCtx, cancel := context.WithTimeout(ctx, i.config.BucketHealthCheckTimeout)
+ defer cancel()
+
+ err := i.storageBucket.Iter(healthCheckCtx, "", func(string) error {
Contributor:

Probably quicker to do an Exists()/Get()/Attributes() and expect a not found.

What I am also unsure about: if the bucket name is wrong, do we get a "not found" or another error?

Contributor Author @oleg-kozlyuk-grafana (Sep 25, 2025):

My idea here is to avoid depending on implementation specifics and on whether the bucket is empty. To my best understanding, the bucket not being present should indeed emit a not-found error, while any other scenario would result in a successful request.

Speaking frankly, I am also not sure whether the error is the same, but this approach feels the most robust of the alternatives I've considered.

Contributor:

I think the reasoning makes sense, let's keep it like that.

In case the bucket listing is very long or expensive to gather, we could consider adding a prefix that likely would never exist; then the check would always expect an empty reply.

Also, for the functioning of the segment writer it is critical that we have write permission, so another alternative could be testing whether an Upload is allowed.

+ // We only care about connectivity, not the actual contents
+ // Return an error to stop iteration after first item (if any)
+ return errors.New("stop iteration")
+ })
+
+ // Ignore the "stop iteration" error we intentionally return
+ // and any "object not found" type errors as they indicate the bucket is accessible
+ if err == nil || i.storageBucket.IsObjNotFoundErr(err) || err.Error() == "stop iteration" {
+ level.Debug(i.logger).Log("msg", "bucket health check succeeded")
+ return nil
+ }
+
+ level.Warn(i.logger).Log("msg", "bucket health check failed", "err", err)
+ return nil // Don't fail startup, just warn
+ }
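
The thread above floats two alternatives: listing under a prefix that should never exist, and probing a single object while accepting "not found". A minimal sketch of how that could look, not part of this PR; it assumes storageBucket is a Thanos-style objstore bucket (the code above already relies on Iter and IsObjNotFoundErr; the Attributes call, the method name, and the probe keys are assumptions for illustration):

// probeBucket is a hypothetical alternative health check (editor's sketch, not
// in the PR): list under a prefix that should never exist so the reply is
// always empty, then look up a made-up object and accept "not found" as proof
// that the bucket itself is reachable.
func (i *SegmentWriterService) probeBucket(ctx context.Context) error {
    ctx, cancel := context.WithTimeout(ctx, i.config.BucketHealthCheckTimeout)
    defer cancel()

    // An unlikely prefix keeps the listing cheap even for very large buckets.
    if err := i.storageBucket.Iter(ctx, "__health-check-probe__/", func(string) error { return nil }); err != nil {
        return err
    }

    // "Not found" on a metadata lookup still proves connectivity and credentials.
    if _, err := i.storageBucket.Attributes(ctx, "__health-check-probe__"); err != nil && !i.storageBucket.IsObjNotFoundErr(err) {
        return err
    }
    return nil
}

Either probe keeps the check constant-time regardless of bucket contents, at the price of relying on consistent not-found semantics across providers, which is exactly the uncertainty raised in the thread.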

func (i *SegmentWriterService) starting(ctx context.Context) error {
+ // Perform bucket health check before ring registration to warm up the connection
+ // and avoid slow first requests affecting p99 latency
+ // On error, will emit a warning but continue startup
+ _ = i.performBucketHealthCheck(ctx)
Contributor:

I guess the bigger question is how we would know that something is wrong if we mark ourselves as ready/healthy anyway. Maybe the right thing here is to exit with an error code.

Contributor Author:

I'd like to take a cautious approach here to make sure I don't accidentally create an incident because I didn't consider something. As discussed over a call, I'll add a TODO comment and an issue to the board, so we can change this into a fatal error once we're sure it doesn't break PROD.


if err := services.StartManagerAndAwaitHealthy(ctx, i.subservices); err != nil {
return err
}
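
Following up on the thread above: once the team is confident it is safe, the TODO could turn the check into a startup failure. A sketch of that future shape, assuming performBucketHealthCheck is also changed to return its error instead of only logging it (editor's illustration, not code from this PR; the remainder of starting() is elided):

// Hypothetical future variant: a failed health check aborts startup so a
// misconfigured bucket never registers in the ring or reports ready.
func (i *SegmentWriterService) starting(ctx context.Context) error {
    if err := i.performBucketHealthCheck(ctx); err != nil {
        return fmt.Errorf("bucket health check failed: %w", err)
    }
    if err := services.StartManagerAndAwaitHealthy(ctx, i.subservices); err != nil {
        return err
    }
    // ... rest of starting() as before ...
    return nil
}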
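
For completeness, a small usage sketch for the two options added above. Only the RegisterFlags signature shown in this diff is relied on; the "segment-writer" flag prefix and the helper name are assumptions (the real prefix constant lives elsewhere in the package):

// Editor's sketch (assumed to live in the segmentwriter package so Config
// resolves; needs the standard "flag" import): disabling the startup health
// check and tightening its timeout via flags. The flag prefix below is an
// assumption, not taken from the PR.
func exampleBucketHealthCheckFlags() Config {
    fs := flag.NewFlagSet("segment-writer", flag.ContinueOnError)
    var cfg Config
    cfg.RegisterFlags(fs)
    _ = fs.Parse([]string{
        "-segment-writer.bucket-health-check-enabled=false",
        "-segment-writer.bucket-health-check-timeout=5s",
    })
    // If the prefix matches, cfg.BucketHealthCheckEnabled is now false and
    // cfg.BucketHealthCheckTimeout is 5s. The same options appear in YAML as
    // bucket_health_check_enabled and bucket_health_check_timeout.
    return cfg
}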