Skip to content

Commit dd27ab1

Browse files
committed
feat(shard distributor): decouple shard stats write-throttling decision from heartbeat TTL
Signed-off-by: Andreas Holt <[email protected]>
1 parent 05e0d1d commit dd27ab1

File tree

7 files changed

+31
-15
lines changed

7 files changed

+31
-15
lines changed

common/config/config.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -658,8 +658,9 @@ type (
658658
}
659659

660660
LeaderProcess struct {
661-
Period time.Duration `yaml:"period"`
662-
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
661+
Period time.Duration `yaml:"period"`
662+
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
663+
ShardStatsTTL time.Duration `yaml:"shardStatsTTL"`
663664
}
664665
)
665666

config/development.yaml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -186,3 +186,4 @@ shardDistribution:
186186
process:
187187
period: 1s
188188
heartbeatTTL: 2s
189+
shardStatsTTL: 60s

service/sharddistributor/config/config.go

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -79,8 +79,9 @@ type (
7979
}
8080

8181
LeaderProcess struct {
82-
Period time.Duration `yaml:"period"`
83-
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
82+
Period time.Duration `yaml:"period"`
83+
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
84+
ShardStatsTTL time.Duration `yaml:"shardStatsTTL"`
8485
}
8586
)
8687

@@ -97,6 +98,10 @@ const (
9798
MigrationModeONBOARDED = "onboarded"
9899
)
99100

101+
const (
102+
DefaultShardStatsTTL = time.Minute
103+
)
104+
100105
// ConfigMode maps string migration mode values to types.MigrationMode
101106
var ConfigMode = map[string]types.MigrationMode{
102107
MigrationModeINVALID: types.MigrationModeINVALID,

service/sharddistributor/leader/process/processor.go

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,9 @@ func NewProcessorFactory(
8181
if cfg.Process.HeartbeatTTL == 0 {
8282
cfg.Process.HeartbeatTTL = _defaultHearbeatTTL
8383
}
84+
if cfg.Process.ShardStatsTTL == 0 {
85+
cfg.Process.ShardStatsTTL = config.DefaultShardStatsTTL
86+
}
8487

8588
return &processorFactory{
8689
logger: logger,
@@ -277,7 +280,7 @@ func (p *namespaceProcessor) cleanupStaleShardStats(ctx context.Context, namespa
277280

278281
activeShards := make(map[string]struct{})
279282
now := p.timeSource.Now().Unix()
280-
shardStatsTTL := int64(p.cfg.HeartbeatTTL.Seconds())
283+
shardStatsTTL := int64(p.cfg.ShardStatsTTL.Seconds())
281284

282285
// 1. build set of active executors
283286

service/sharddistributor/leader/process/processor_test.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -44,8 +44,9 @@ func setupProcessorTest(t *testing.T, namespaceType string) *testDependencies {
4444
mockedClock,
4545
config.ShardDistribution{
4646
Process: config.LeaderProcess{
47-
Period: time.Second,
48-
HeartbeatTTL: time.Second,
47+
Period: time.Second,
48+
HeartbeatTTL: time.Second,
49+
ShardStatsTTL: 10 * time.Second,
4950
},
5051
},
5152
),
@@ -215,7 +216,7 @@ func TestCleanupStaleShardStats(t *testing.T) {
215216
shardStats := map[string]store.ShardStatistics{
216217
"shard-1": {SmoothedLoad: 1.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
217218
"shard-2": {SmoothedLoad: 2.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
218-
"shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Add(-2 * time.Second).Unix(), LastMoveTime: now.Add(-2 * time.Second).Unix()},
219+
"shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Add(-11 * time.Second).Unix(), LastMoveTime: now.Add(-11 * time.Second).Unix()},
219220
}
220221

221222
namespaceState := &store.NamespaceState{

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -96,13 +96,18 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
9696
timeSource = clock.NewRealTimeSource()
9797
}
9898

99+
shardStatsTTL := p.Cfg.Process.ShardStatsTTL
100+
if shardStatsTTL <= 0 {
101+
shardStatsTTL = config.DefaultShardStatsTTL
102+
}
103+
99104
store := &executorStoreImpl{
100105
client: etcdClient,
101106
prefix: etcdCfg.Prefix,
102107
logger: p.Logger,
103108
shardCache: shardCache,
104109
timeSource: timeSource,
105-
maxStatsPersistIntervalSeconds: deriveStatsPersistInterval(p.Cfg.Process.HeartbeatTTL),
110+
maxStatsPersistIntervalSeconds: deriveStatsPersistInterval(shardStatsTTL),
106111
}
107112

108113
p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
@@ -168,13 +173,12 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
168173
return nil
169174
}
170175

171-
func deriveStatsPersistInterval(heartbeatTTL time.Duration) int64 {
172-
ttlSeconds := int64(heartbeatTTL.Seconds())
173-
interval := ttlSeconds - 1
174-
if interval < 1 {
175-
interval = 1
176+
func deriveStatsPersistInterval(shardStatsTTL time.Duration) int64 {
177+
ttlSeconds := int64(shardStatsTTL.Seconds())
178+
if ttlSeconds <= 1 {
179+
return 1
176180
}
177-
return interval
181+
return ttlSeconds - 1
178182
}
179183

180184
func (s *executorStoreImpl) recordShardStatistics(ctx context.Context, namespace, executorID string, reported map[string]*types.ShardStatusReport) {

service/sharddistributor/store/etcd/executorstore/etcdstore_test.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,7 @@ func TestRecordHeartbeatSkipsShardStatisticsWithNilReport(t *testing.T) {
182182
func TestRecordHeartbeatShardStatisticsThrottlesWrites(t *testing.T) {
183183
tc := testhelper.SetupStoreTestCluster(t)
184184
tc.LeaderCfg.Process.HeartbeatTTL = 10 * time.Second
185+
tc.LeaderCfg.Process.ShardStatsTTL = 10 * time.Second
185186
mockTS := clock.NewMockedTimeSourceAt(time.Unix(1000, 0))
186187
executorStore := createStoreWithTimeSource(t, tc, mockTS)
187188
esImpl, ok := executorStore.(*executorStoreImpl)

0 commit comments

Comments
 (0)