service/sharddistributor/leader/election/election.go — 10 changes: 9 additions & 1 deletion
@@ -112,6 +112,7 @@ func (e *elector) Run(ctx context.Context) <-chan bool {
 	defer close(leaderCh)
 	defer cancelRun() // Ensure child context is canceled on exit
 	defer func() {
+		e.logger.Info("Leader election process exiting")
 		if r := recover(); r != nil {
 			e.logger.Error("Panic in election process", tag.Value(r))
 		}
@@ -131,13 +132,16 @@ func (e *elector) Run(ctx context.Context) <-chan bool {
 
 				select {
 				case <-runCtx.Done():
+					e.logger.Info("Context canceled, stopping election loop, exit immediately", tag.Error(runCtx.Err()))
 					return // Context was canceled, exit immediately
 				case <-e.clock.After(e.cfg.FailedElectionCooldown):
+					e.logger.Info("Cooldown period ended, retrying election")
 					// Continue after cooldown
 				}
 			}
 		}
 		if runCtx.Err() != nil {
+			e.logger.Info("Context canceled, stopping election loop", tag.Error(runCtx.Err()))
 			break
 		}
 	}
@@ -148,18 +152,21 @@ func (e *elector) Run(ctx context.Context) <-chan bool {
 
 // runElection runs a single election attempt
 func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err error) {
+	e.logger.Info("Run election")
 	// Add random delay before campaigning to spread load across instances
 	delay := time.Duration(rand.Intn(int(e.cfg.MaxRandomDelay)))
 
 	e.logger.Debug("Adding random delay before campaigning", tag.ElectionDelay(delay))
 
 	select {
 	case <-e.clock.After(delay):
+		e.logger.Debug("Random delay before campaigning completed")
 		// Continue after delay
 	case <-ctx.Done():
 		return fmt.Errorf("context cancelled during pre-campaign delay: %w", ctx.Err())
 	}
 
+	e.logger.Debug("Creating election")
 	election, err := e.leaderStore.CreateElection(ctx, e.namespace.Name)
 	if err != nil {
 		return fmt.Errorf("create session: %w", err)
@@ -183,13 +190,15 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err error) {
 		leaderCh <- false
 	}()
 
+	e.logger.Debug("Starting campaign for leader")
 	// Campaign to become leader
 	if err := election.Campaign(ctx, e.hostname); err != nil {
 		return fmt.Errorf("failed to campaign: %w", err)
 	}
 
 	leaderProcess = e.processFactory.CreateProcessor(e.namespace, e.store, election)
 
+	e.logger.Debug("Run leader process")
 	err = leaderProcess.Run(ctx)
 	if err != nil {
 		return fmt.Errorf("onLeader: %w", err)
@@ -217,7 +226,6 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err error) {
 
 	case <-leaderTimer.Chan():
 		e.logger.Info("Leadership period ended, voluntarily resigning")
-
 		return errSelfResign
 	}
 }
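For context on what the new logs trace, here is a minimal sketch (not part of this PR) of how a caller might consume the leadership channel that Run returns. The Elector interface and watchLeadership function are assumptions for illustration, inferred from the signatures visible in the diff above.

package example

import (
	"context"
	"fmt"
)

// Elector mirrors the interface implied by the diff: Run starts the election
// loop and reports leadership changes on a channel that is closed when the
// loop exits (via the deferred close(leaderCh) above). This type is an
// illustrative assumption, not the actual Cadence interface.
type Elector interface {
	Run(ctx context.Context) <-chan bool
}

// watchLeadership is a hypothetical consumer of the leadership channel.
func watchLeadership(ctx context.Context, e Elector) {
	for isLeader := range e.Run(ctx) {
		if isLeader {
			fmt.Println("gained leadership")
		} else {
			fmt.Println("lost leadership")
		}
	}
	// range exits once the election loop stops and closes the channel,
	// which is when the new "Leader election process exiting" log fires.
	fmt.Println("election loop exited")
}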
service/sharddistributor/leader/process/processor.go — 18 changes: 10 additions & 8 deletions
@@ -324,7 +324,12 @@ func (p *namespaceProcessor) identifyStaleShardStats(namespaceState *store.Names
 
 // rebalanceShards is the core logic for distributing shards among active executors.
 func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
-	metricsLoopScope := p.metricsClient.Scope(metrics.ShardDistributorAssignLoopScope)
+	metricsLoopScope := p.metricsClient.Scope(
+		metrics.ShardDistributorAssignLoopScope,
+		metrics.NamespaceTag(p.namespaceCfg.Name),
+		metrics.NamespaceTypeTag(p.namespaceCfg.Type),
+	)
+
 	metricsLoopScope.AddCounter(metrics.ShardDistributorAssignLoopAttempts, 1)
 	defer func() {
 		if err != nil {
Expand Down Expand Up @@ -352,7 +357,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
}

if namespaceState.GlobalRevision <= p.lastAppliedRevision {
p.logger.Debug("No changes detected. Skipping rebalance.")
p.logger.Info("No changes detected. Skipping rebalance.")
return nil
}
p.lastAppliedRevision = namespaceState.GlobalRevision
@@ -365,7 +370,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 
 	activeExecutors := p.getActiveExecutors(namespaceState, staleExecutors)
 	if len(activeExecutors) == 0 {
-		p.logger.Warn("No active executors found. Cannot assign shards.")
+		p.logger.Info("No active executors found. Cannot assign shards.")
 		return nil
 	}
 	p.logger.Info("Active executors", tag.ShardExecutors(activeExecutors))
@@ -381,7 +386,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 
 	distributionChanged := len(deletedShards) > 0 || len(staleExecutors) > 0 || assignedToEmptyExecutors || updatedAssignments
 	if !distributionChanged {
-		p.logger.Debug("No changes to distribution detected. Skipping rebalance.")
+		p.logger.Info("No changes to distribution detected. Skipping rebalance.")
 		return nil
 	}
 
@@ -401,10 +406,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 	for _, assignedState := range namespaceState.ShardAssignments {
 		totalActiveShards += len(assignedState.AssignedShards)
 	}
-	metricsLoopScope.Tagged(
-		metrics.NamespaceTag(p.namespaceCfg.Name),
-		metrics.NamespaceTypeTag(p.namespaceCfg.Type),
-	).UpdateGauge(metrics.ShardDistributorActiveShards, float64(totalActiveShards))
+	metricsLoopScope.UpdateGauge(metrics.ShardDistributorActiveShards, float64(totalActiveShards))
 
 	return nil
 }
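The processor.go change moves the namespace tags from the single gauge emission onto the loop scope itself, so the attempts counter (and any other metric emitted via metricsLoopScope) now carries them too. Below is a minimal sketch of that before/after tagging pattern using a toy Scope type; the real Cadence metrics types differ, this only illustrates the idea.

package example

import "fmt"

// Tag and Scope are toy stand-ins for the metrics API used in the diff.
type Tag struct{ Key, Value string }

type Scope struct{ tags []Tag }

func NewScope(tags ...Tag) *Scope { return &Scope{tags: tags} }

// Tagged returns a child scope with extra tags, leaving the parent untouched.
func (s *Scope) Tagged(tags ...Tag) *Scope {
	return &Scope{tags: append(append([]Tag{}, s.tags...), tags...)}
}

func (s *Scope) AddCounter(name string, v int64)    { fmt.Println(name, s.tags, v) }
func (s *Scope) UpdateGauge(name string, v float64) { fmt.Println(name, s.tags, v) }

func Demo() {
	// Before: only the gauge was tagged at its emission site, so the
	// attempts counter went out without namespace tags.
	old := NewScope()
	old.AddCounter("assign_loop_attempts", 1) // no namespace tags here
	old.Tagged(Tag{"namespace", "ns1"}).UpdateGauge("active_shards", 42)

	// After: tags are attached when the scope is created, so every metric
	// emitted through it carries the namespace and namespace-type tags.
	scoped := NewScope(Tag{"namespace", "ns1"}, Tag{"namespace_type", "fixed"})
	scoped.AddCounter("assign_loop_attempts", 1)
	scoped.UpdateGauge("active_shards", 42)
}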