Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ type Limits interface {
IngestionRelabelingRules(tenantID string) []*relabel.Config
SampleTypeRelabelingRules(tenantID string) []*relabel.Config
DistributorUsageGroups(tenantID string) *validation.UsageGroupConfig
WritePathOverrides(tenantID string) writepath.Config
validation.ProfileValidationLimits
aggregator.Limits
writepath.Overrides
}

func New(
Expand Down Expand Up @@ -188,11 +188,7 @@ func New(

ingesterRoute := writepath.IngesterFunc(d.sendRequestsToIngester)
segmentWriterRoute := writepath.IngesterFunc(d.sendRequestsToSegmentWriter)
d.router = writepath.NewRouter(
logger, reg, limits,
ingesterRoute,
segmentWriterRoute,
)
d.router = writepath.NewRouter(logger, reg, ingesterRoute, segmentWriterRoute)

var err error
subservices := []services.Service(nil)
Expand Down Expand Up @@ -560,7 +556,8 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
// functions to send the request to the appropriate service; these are
// called independently, and may be called concurrently: the request is
// cloned in this case – the callee may modify the request safely.
return d.router.Send(ctx, req)
config := d.limits.WritePathOverrides(req.TenantID)
return d.router.Send(ctx, req, config)
}

func noNewProfilesReceivedError() *connect.Error {
Expand Down Expand Up @@ -635,7 +632,8 @@ func (d *Distributor) aggregate(ctx context.Context, req *distributormodel.Profi
Profile: pprof.RawFromProto(p.Profile()),
Annotations: annotations,
}
return d.router.Send(localCtx, aggregated)
config := d.limits.WritePathOverrides(req.TenantID)
return d.router.Send(localCtx, aggregated, config)
})()
if sendErr != nil {
_ = level.Error(d.logger).Log("msg", "failed to handle aggregation", "tenant", req.TenantID, "err", err)
Expand Down
44 changes: 29 additions & 15 deletions pkg/distributor/writepath/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/pyroscope/pkg/tenant"
"github.com/grafana/pyroscope/pkg/util"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
"github.com/grafana/pyroscope/pkg/util/delayhandler"
httputil "github.com/grafana/pyroscope/pkg/util/http"
)

Expand All @@ -44,17 +45,12 @@ func (f IngesterFunc) Push(
return f(ctx, req)
}

type Overrides interface {
WritePathOverrides(tenantID string) Config
}

type Router struct {
service services.Service
inflight sync.WaitGroup

logger log.Logger
overrides Overrides
metrics *metrics
logger log.Logger
metrics *metrics

ingester IngesterClient
segwriter IngesterClient
Expand All @@ -63,13 +59,11 @@ type Router struct {
func NewRouter(
logger log.Logger,
registerer prometheus.Registerer,
overrides Overrides,
ingester IngesterClient,
segwriter IngesterClient,
) *Router {
r := &Router{
logger: logger,
overrides: overrides,
metrics: newMetrics(registerer),
ingester: ingester,
segwriter: segwriter,
Expand All @@ -93,17 +87,19 @@ func (m *Router) running(ctx context.Context) error {
return nil
}

func (m *Router) Send(ctx context.Context, req *distributormodel.ProfileSeries) error {
func (m *Router) Send(ctx context.Context, req *distributormodel.ProfileSeries, config Config) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "Router.Send")
defer sp.Finish()
config := m.overrides.WritePathOverrides(req.TenantID)
if config.AsyncIngest {
delayhandler.CancelDelay(ctx)
}
switch config.WritePath {
case SegmentWriterPath:
return m.send(m.segwriterRoute(true))(ctx, req)
return m.sendToSegmentWriterOnly(ctx, req, &config)
case CombinedPath:
return m.sendToBoth(ctx, req, &config)
default:
return m.send(m.ingesterRoute())(ctx, req)
return m.sendToIngesterOnly(ctx, req)
}
}

Expand Down Expand Up @@ -163,7 +159,7 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.ProfileSe
// The request is to be sent to both asynchronously, therefore we're
// cloning it. We do not wait for the secondary request to complete.
// On shutdown, however, we will wait for all inflight requests.
segwriter.client = m.sendClone(ctx, req.Clone(), segwriter.client, config)
segwriter.client = m.detachedClient(ctx, req.Clone(), segwriter.client, config)
}

if segwriter != nil {
Expand All @@ -182,6 +178,22 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.ProfileSe
return nil
}

func (m *Router) sendToSegmentWriterOnly(ctx context.Context, req *distributormodel.ProfileSeries, config *Config) error {
r := m.segwriterRoute(true)
if !config.AsyncIngest {
return m.send(r)(ctx, req)
}
r.client = m.detachedClient(ctx, req, r.client, config)
m.sendAsync(ctx, req, r)
return nil
}

func (m *Router) sendToIngesterOnly(ctx context.Context, req *distributormodel.ProfileSeries) error {
// NOTE(kolesnikovae): If we also want to support async requests to ingesters,
// we should implement it here and in sendToBoth.
return m.send(m.ingesterRoute())(ctx, req)
}

type sendFunc func(context.Context, *distributormodel.ProfileSeries) error

type route struct {
Expand All @@ -190,7 +202,9 @@ type route struct {
primary bool
}

func (m *Router) sendClone(ctx context.Context, req *distributormodel.ProfileSeries, client IngesterClient, config *Config) IngesterFunc {
// detachedClient creates a new IngesterFunc that wraps the call with a local context
// that has a timeout and tenant ID injected so it can be used for asynchronous requests.
func (m *Router) detachedClient(ctx context.Context, req *distributormodel.ProfileSeries, client IngesterClient, config *Config) IngesterFunc {
return func(context.Context, *distributormodel.ProfileSeries) (*connect.Response[pushv1.PushResponse], error) {
localCtx, cancel := context.WithTimeout(context.Background(), config.SegmentWriterTimeout)
localCtx = tenant.InjectTenantID(localCtx, req.TenantID)
Expand Down
Loading