diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 901e6822ef7..50348b5e0d2 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -237,7 +237,7 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { continue } staleShardStats := p.identifyStaleShardStats(namespaceState) - if len(staleShardStats) > 0 { + if len(staleShardStats) == 0 { // No stale shard stats to delete continue } diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index 9bd2b465d61..213a707bd9b 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -29,6 +29,8 @@ var ( _executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE) ) +const deleteShardStatsBatchSize = 64 + type executorStoreImpl struct { client *clientv3.Client prefix string @@ -625,11 +627,29 @@ func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace strin return nil } +// DeleteShardStats deletes shard statistics in batches to avoid hitting etcd transaction limits (128 ops). +// If any batch fails (e.g. due to leadership loss), the operation returns immediately. +// Partial deletions are acceptable as the periodic cleanup loop will retry remaining keys. func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error { if len(shardIDs) == 0 { return nil } - var ops []clientv3.Op + + for start := 0; start < len(shardIDs); start += deleteShardStatsBatchSize { + end := start + deleteShardStatsBatchSize + if end > len(shardIDs) { + end = len(shardIDs) + } + + if err := s.deleteShardStatsBatch(ctx, namespace, shardIDs[start:end], guard); err != nil { + return err + } + } + return nil +} + +func (s *executorStoreImpl) deleteShardStatsBatch(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error { + ops := make([]clientv3.Op, 0, len(shardIDs)) for _, shardID := range shardIDs { shardStatsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) if err != nil { @@ -640,10 +660,10 @@ func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace stri nativeTxn := s.client.Txn(ctx) guardedTxn, err := guard(nativeTxn) - if err != nil { return fmt.Errorf("apply transaction guard: %w", err) } + etcdGuardedTxn, ok := guardedTxn.(clientv3.Txn) if !ok { return fmt.Errorf("guard function returned invalid transaction type") diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go index a47565768bd..53478238a51 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -579,6 +579,42 @@ func TestGetShardStatisticsForMissingShard(t *testing.T) { assert.NotContains(t, st.ShardStats, "unknown") } +// TestDeleteShardStatsDeletesLargeBatches verifies that shard statistics are correctly deleted in batches. +func TestDeleteShardStatsDeletesLargeBatches(t *testing.T) { + tc := testhelper.SetupStoreTestCluster(t) + executorStore := createStore(t, tc) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + totalShardStats := deleteShardStatsBatchSize*2 + 7 // two batches + 7 extra (remainder) + shardIDs := make([]string, 0, totalShardStats) + + // Create stale stats + for i := 0; i < totalShardStats; i++ { + shardID := "stale-stats-" + strconv.Itoa(i) + shardIDs = append(shardIDs, shardID) + + statsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey) + require.NoError(t, err) + stats := store.ShardStatistics{ + SmoothedLoad: float64(i), + LastUpdateTime: int64(i), + LastMoveTime: int64(i), + } + payload, err := json.Marshal(stats) + require.NoError(t, err) + _, err = tc.Client.Put(ctx, statsKey, string(payload)) + require.NoError(t, err) + } + + require.NoError(t, executorStore.DeleteShardStats(ctx, tc.Namespace, shardIDs, store.NopGuard())) + + nsState, err := executorStore.GetState(ctx, tc.Namespace) + require.NoError(t, err) + assert.Empty(t, nsState.ShardStats) +} + // --- Test Setup --- func stringStatus(s types.ExecutorStatus) string {