Skip to content

Commit 1da67cf

Browse files
committed
fix(shard distributor): batch shard deletes and fix cleanup loop condition
Signed-off-by: Andreas Holt <[email protected]>
1 parent c6959bf commit 1da67cf

File tree

3 files changed

+59
-3
lines changed

3 files changed

+59
-3
lines changed

service/sharddistributor/leader/process/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
237237
continue
238238
}
239239
staleShardStats := p.identifyStaleShardStats(namespaceState)
240-
if len(staleShardStats) > 0 {
240+
if len(staleShardStats) == 0 {
241241
// No stale shard stats to delete
242242
continue
243243
}

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ var (
2929
_executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE)
3030
)
3131

32+
const deleteShardStatsBatchSize = 64
33+
3234
type executorStoreImpl struct {
3335
client *clientv3.Client
3436
prefix string
@@ -625,11 +627,29 @@ func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace strin
625627
return nil
626628
}
627629

630+
// DeleteShardStats deletes shard statistics in batches to avoid hitting etcd transaction limits (128 ops).
631+
// If any batch fails (e.g. due to leadership loss), the operation returns immediately.
632+
// Partial deletions are acceptable as the periodic cleanup loop will retry remaining keys.
628633
func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error {
629634
if len(shardIDs) == 0 {
630635
return nil
631636
}
632-
var ops []clientv3.Op
637+
638+
for start := 0; start < len(shardIDs); start += deleteShardStatsBatchSize {
639+
end := start + deleteShardStatsBatchSize
640+
if end > len(shardIDs) {
641+
end = len(shardIDs)
642+
}
643+
644+
if err := s.deleteShardStatsBatch(ctx, namespace, shardIDs[start:end], guard); err != nil {
645+
return err
646+
}
647+
}
648+
return nil
649+
}
650+
651+
func (s *executorStoreImpl) deleteShardStatsBatch(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error {
652+
ops := make([]clientv3.Op, 0, len(shardIDs))
633653
for _, shardID := range shardIDs {
634654
shardStatsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey)
635655
if err != nil {
@@ -640,10 +660,10 @@ func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace stri
640660

641661
nativeTxn := s.client.Txn(ctx)
642662
guardedTxn, err := guard(nativeTxn)
643-
644663
if err != nil {
645664
return fmt.Errorf("apply transaction guard: %w", err)
646665
}
666+
647667
etcdGuardedTxn, ok := guardedTxn.(clientv3.Txn)
648668
if !ok {
649669
return fmt.Errorf("guard function returned invalid transaction type")

service/sharddistributor/store/etcd/executorstore/etcdstore_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,42 @@ func TestGetShardStatisticsForMissingShard(t *testing.T) {
579579
assert.NotContains(t, st.ShardStats, "unknown")
580580
}
581581

582+
// TestDeleteShardStatsDeletesLargeBatches verifies that shard statistics are correctly deleted in batches.
583+
func TestDeleteShardStatsDeletesLargeBatches(t *testing.T) {
584+
tc := testhelper.SetupStoreTestCluster(t)
585+
executorStore := createStore(t, tc)
586+
587+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
588+
defer cancel()
589+
590+
totalShardStats := deleteShardStatsBatchSize*2 + 7 // two batches + 7 extra (remainder)
591+
shardIDs := make([]string, 0, totalShardStats)
592+
593+
// Create stale stats
594+
for i := 0; i < totalShardStats; i++ {
595+
shardID := "stale-stats-" + strconv.Itoa(i)
596+
shardIDs = append(shardIDs, shardID)
597+
598+
statsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey)
599+
require.NoError(t, err)
600+
stats := store.ShardStatistics{
601+
SmoothedLoad: float64(i),
602+
LastUpdateTime: int64(i),
603+
LastMoveTime: int64(i),
604+
}
605+
payload, err := json.Marshal(stats)
606+
require.NoError(t, err)
607+
_, err = tc.Client.Put(ctx, statsKey, string(payload))
608+
require.NoError(t, err)
609+
}
610+
611+
require.NoError(t, executorStore.DeleteShardStats(ctx, tc.Namespace, shardIDs, store.NopGuard()))
612+
613+
nsState, err := executorStore.GetState(ctx, tc.Namespace)
614+
require.NoError(t, err)
615+
assert.Empty(t, nsState.ShardStats)
616+
}
617+
582618
// --- Test Setup ---
583619

584620
func stringStatus(s types.ExecutorStatus) string {

0 commit comments

Comments
 (0)