diff --git a/service/sharddistributor/leader/election/election.go b/service/sharddistributor/leader/election/election.go
index 8324c4c2672..97eceb544e7 100644
--- a/service/sharddistributor/leader/election/election.go
+++ b/service/sharddistributor/leader/election/election.go
@@ -112,6 +112,7 @@ func (e *elector) Run(ctx context.Context) <-chan bool {
 		defer close(leaderCh)
 		defer cancelRun() // Ensure child context is canceled on exit
 		defer func() {
+			e.logger.Info("Leader election process exiting")
 			if r := recover(); r != nil {
 				e.logger.Error("Panic in election process", tag.Value(r))
 			}
@@ -131,13 +132,16 @@ func (e *elector) Run(ctx context.Context) <-chan bool {
 					select {
 					case <-runCtx.Done():
+						e.logger.Info("Context canceled, stopping election loop, exit immediately", tag.Error(runCtx.Err()))
 						return // Context was canceled, exit immediately
 					case <-e.clock.After(e.cfg.FailedElectionCooldown):
+						e.logger.Info("Cooldown period ended, retrying election")
 						// Continue after cooldown
 					}
 				}
 			}
 
 			if runCtx.Err() != nil {
+				e.logger.Info("Context canceled, stopping election loop", tag.Error(runCtx.Err()))
 				break
 			}
 		}
@@ -148,6 +152,7 @@ func (e *elector) Run(ctx context.Context) <-chan bool {
 
 // runElection runs a single election attempt
 func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err error) {
+	e.logger.Info("Run election")
 	// Add random delay before campaigning to spread load across instances
 	delay := time.Duration(rand.Intn(int(e.cfg.MaxRandomDelay)))
 
@@ -155,11 +160,13 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err er
 	select {
 	case <-e.clock.After(delay):
+		e.logger.Debug("Random delay before campaigning completed")
 		// Continue after delay
 	case <-ctx.Done():
 		return fmt.Errorf("context cancelled during pre-campaign delay: %w", ctx.Err())
 	}
 
+	e.logger.Debug("Creating election")
 	election, err := e.leaderStore.CreateElection(ctx, e.namespace.Name)
 	if err != nil {
 		return fmt.Errorf("create session: %w", err)
 	}
@@ -183,6 +190,7 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err er
 		leaderCh <- false
 	}()
+	e.logger.Debug("Starting campaign for leader")
 	// Campaign to become leader
 	if err := election.Campaign(ctx, e.hostname); err != nil {
 		return fmt.Errorf("failed to campaign: %w", err)
 	}
@@ -190,6 +198,7 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err er
 	leaderProcess = e.processFactory.CreateProcessor(e.namespace, e.store, election)
 
+	e.logger.Debug("Run leader process")
 	err = leaderProcess.Run(ctx)
 	if err != nil {
 		return fmt.Errorf("onLeader: %w", err)
 	}
@@ -217,7 +226,6 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err er
 
 	case <-leaderTimer.Chan():
 		e.logger.Info("Leadership period ended, voluntarily resigning")
-		return errSelfResign
 	}
 }
 
diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go
index 8e177e50119..41ec954f3f4 100644
--- a/service/sharddistributor/leader/process/processor.go
+++ b/service/sharddistributor/leader/process/processor.go
@@ -324,7 +324,12 @@ func (p *namespaceProcessor) identifyStaleShardStats(namespaceState *store.Names
 
 // rebalanceShards is the core logic for distributing shards among active executors.
 func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
-	metricsLoopScope := p.metricsClient.Scope(metrics.ShardDistributorAssignLoopScope)
+	metricsLoopScope := p.metricsClient.Scope(
+		metrics.ShardDistributorAssignLoopScope,
+		metrics.NamespaceTag(p.namespaceCfg.Name),
+		metrics.NamespaceTypeTag(p.namespaceCfg.Type),
+	)
+	metricsLoopScope.AddCounter(metrics.ShardDistributorAssignLoopAttempts, 1)
 
 	defer func() {
 		if err != nil {
@@ -352,7 +357,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 	}
 
 	if namespaceState.GlobalRevision <= p.lastAppliedRevision {
-		p.logger.Debug("No changes detected. Skipping rebalance.")
+		p.logger.Info("No changes detected. Skipping rebalance.")
 		return nil
 	}
 	p.lastAppliedRevision = namespaceState.GlobalRevision
@@ -365,7 +370,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 
 	activeExecutors := p.getActiveExecutors(namespaceState, staleExecutors)
 	if len(activeExecutors) == 0 {
-		p.logger.Warn("No active executors found. Cannot assign shards.")
+		p.logger.Info("No active executors found. Cannot assign shards.")
 		return nil
 	}
 	p.logger.Info("Active executors", tag.ShardExecutors(activeExecutors))
@@ -381,7 +386,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 
 	distributionChanged := len(deletedShards) > 0 || len(staleExecutors) > 0 || assignedToEmptyExecutors || updatedAssignments
 	if !distributionChanged {
-		p.logger.Debug("No changes to distribution detected. Skipping rebalance.")
+		p.logger.Info("No changes to distribution detected. Skipping rebalance.")
 		return nil
 	}
 
@@ -401,10 +406,7 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
 	for _, assignedState := range namespaceState.ShardAssignments {
 		totalActiveShards += len(assignedState.AssignedShards)
 	}
-	metricsLoopScope.Tagged(
-		metrics.NamespaceTag(p.namespaceCfg.Name),
-		metrics.NamespaceTypeTag(p.namespaceCfg.Type),
-	).UpdateGauge(metrics.ShardDistributorActiveShards, float64(totalActiveShards))
+	metricsLoopScope.UpdateGauge(metrics.ShardDistributorActiveShards, float64(totalActiveShards))
 
 	return nil
 }