@@ -31,13 +31,12 @@ var (
3131)
3232
3333type executorStoreImpl struct {
34- client * clientv3.Client
35- prefix string
36- logger log.Logger
37- shardCache * shardcache.ShardToExecutorCache
38- timeSource clock.TimeSource
39- // Max interval (seconds) before we force a shard-stat persist.
40- maxStatsPersistIntervalSeconds int64
34+ client * clientv3.Client
35+ prefix string
36+ logger log.Logger
37+ shardCache * shardcache.ShardToExecutorCache
38+ timeSource clock.TimeSource
39+ maxStatsPersistIntervalSeconds int64 // Max interval (seconds) before we force a shard-stat persist.
4140}
4241
4342// Constants for gating shard statistics writes to reduce etcd load.
@@ -175,10 +174,7 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
175174
176175func deriveStatsPersistInterval (shardStatsTTL time.Duration ) int64 {
177176 ttlSeconds := int64 (shardStatsTTL .Seconds ())
178- if ttlSeconds <= 1 {
179- return 1
180- }
181- return ttlSeconds - 1
177+ return max (1 , ttlSeconds - 1 )
182178}
183179
184180func (s * executorStoreImpl ) recordShardStatistics (ctx context.Context , namespace , executorID string , reported map [string ]* types.ShardStatusReport ) {
@@ -251,15 +247,14 @@ func (s *executorStoreImpl) recordShardStatistics(ctx context.Context, namespace
251247 // Update smoothed load via EWMA.
252248 prevSmoothed := stats .SmoothedLoad
253249 prevUpdate := stats .LastUpdateTime
254- newSmoothed := ewmaSmoothedLoad (prevSmoothed , load , prevUpdate , now )
255250
256251 // Decide whether to persist this update. We always persist if this is the
257252 // first observation (prevUpdate == 0). Otherwise, if the change is small
258253 // and the previous persist is recent, skip the write to reduce etcd load.
259254 shouldPersist := true
260255 if prevUpdate > 0 {
261256 age := now - prevUpdate
262- delta := math .Abs (newSmoothed - prevSmoothed )
257+ delta := math .Abs (load - prevSmoothed )
263258 if delta < shardStatsEpsilon && age < s .maxStatsPersistIntervalSeconds {
264259 shouldPersist = false
265260 }
@@ -270,6 +265,7 @@ func (s *executorStoreImpl) recordShardStatistics(ctx context.Context, namespace
270265 continue
271266 }
272267
268+ newSmoothed := ewmaSmoothedLoad (prevSmoothed , load , prevUpdate , now )
273269 stats .SmoothedLoad = newSmoothed
274270 stats .LastUpdateTime = now
275271
0 commit comments