Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
2de12d8
feat(shard distributor): add shard key helpers and metrics state
AndreasHolt Oct 19, 2025
5d95067
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
6e57536
fix(shard distributor): update LastMoveTime in the case where a shard…
AndreasHolt Oct 19, 2025
595d320
test(shard distributor): add tests for shard metrics
AndreasHolt Oct 19, 2025
d9ba54d
fix(shard distributor): modify comment
AndreasHolt Oct 19, 2025
32d2ecd
fix(shard distributor): add atomic check to prevent metrics race
AndreasHolt Oct 19, 2025
b624a00
fix(shard distributor): apply shard metric updates in a second phase …
AndreasHolt Oct 19, 2025
aad7b2e
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
6360f8a
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
1536d0a
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
f316fbf
test(shard distributor): BuildShardPrefix, BuildShardKey, ParseShardKey
AndreasHolt Oct 22, 2025
4524da9
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
126f725
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc53f68
test(shard distributor): small changes to shard key tests s.t. they l…
AndreasHolt Oct 25, 2025
733bbcb
fix(shard distributor): no longer check for key type ShardStatisticsK…
AndreasHolt Oct 25, 2025
6816b8e
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
f97e0cf
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
92ba56c
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
154e9be
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
d17b38c
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
dda32c9
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
799e90e
feat(shard distributor): add tags for aggregated load and assigned co…
AndreasHolt Oct 27, 2025
5ff359b
feat(shard distributor): initial placement based on initial placement…
AndreasHolt Oct 27, 2025
d73026e
test(shard distributor): tests that verify we pick the least loaded e…
AndreasHolt Oct 27, 2025
6ed2554
test(shard distributor): add test for GetShardOwner ShardNotFound cas…
AndreasHolt Oct 28, 2025
513e88c
feat(shard distributor): clean up the shard statistics
AndreasHolt Oct 29, 2025
9833525
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
0332fe5
fix(shard distributor): add mapping (new metric)
AndreasHolt Oct 29, 2025
d5a13d9
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
634bc02
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
812e854
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
b9813e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
dfb7448
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
36ec08f
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
38a6e81
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Nov 6, 2025
af733e6
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
a52e86f
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
abfc80e
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
df0feaf
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
8546a26
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
dde87ef
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
415e80c
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
c67d5c3
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
9ffcefb
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc769bf
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
8c22663
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
5ac3c5d
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
3973b82
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
3830d5e
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
443c0b1
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
9d159e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
18e63b7
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
e08a286
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
08eb635
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
f63664a
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
10e2ffa
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
8c6b0c8
chore: removed an old struct that appeared during rebase
Theis-Mathiassen Nov 11, 2025
158e030
feat(shard distributor): throttle shard-stat writes
AndreasHolt Nov 13, 2025
dd45ff0
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Nov 13, 2025
05e0d1d
fix(shard distributor): linter error
AndreasHolt Nov 13, 2025
e0779ec
feat(shard distributor): decouple shard stats write-throttling decisi…
AndreasHolt Nov 18, 2025
a732a1a
Merge branch 'heartbeat-shard-statistics' into initial-placement-with…
AndreasHolt Nov 18, 2025
9546f24
Merge branch 'master' into heartbeat-shard-statistics
AndreasHolt Nov 19, 2025
db70702
fix(shard-distributor): inverted condition in shard stats cleanup loop
AndreasHolt Nov 19, 2025
019749f
Merge branch 'heartbeat-shard-statistics' into initial-placement-with…
AndreasHolt Nov 19, 2025
b5c277a
feat(shard-distributor): only assign shards to ACTIVE executors and a…
AndreasHolt Nov 19, 2025
6d32bb8
fix: added guard to check if prev load is in invalid state
Theis-Mathiassen Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,9 @@ type (
}

// LeaderProcess holds time.Duration settings, each read from the YAML key
// named in its field tag (period, heartbeatTTL, shardStatsTTL).
LeaderProcess struct {
Period time.Duration `yaml:"period"`
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
Period time.Duration `yaml:"period"`
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
// NOTE(review): presumably how long shard statistics are retained before
// cleanup — confirm against the code that reads this field.
ShardStatsTTL time.Duration `yaml:"shardStatsTTL"`
}
)

Expand Down
9 changes: 9 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,9 +1185,18 @@ func ShardKey(shardKey string) Tag {
return newStringTag("shard-key", shardKey)
}

// AggregateLoad returns a float64 tag with the key "aggregated-load" whose
// value is load.
func AggregateLoad(load float64) Tag {
return newFloat64Tag("aggregated-load", load)
}

// AssignedCount returns an int tag with the key "assigned-count" whose value
// is count.
func AssignedCount(count int) Tag {
return newInt("assigned-count", count)
}

// ShardStatus returns a string tag with the key "shard-status" whose value is
// status.
func ShardStatus(status string) Tag {
return newStringTag("shard-status", status)
}

// ShardLoad returns a string tag with the key "shard-load" whose value is
// load. The load is passed as a string, not as a number.
func ShardLoad(load string) Tag {
return newStringTag("shard-load", load)
}
Expand Down
1 change: 1 addition & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,4 @@ shardDistribution:
process:
period: 1s
heartbeatTTL: 2s
shardStatsTTL: 60s
9 changes: 7 additions & 2 deletions service/sharddistributor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ type (
}

// LeaderProcess holds time.Duration settings, each read from the YAML key
// named in its field tag (period, heartbeatTTL, shardStatsTTL).
LeaderProcess struct {
Period time.Duration `yaml:"period"`
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
Period time.Duration `yaml:"period"`
HeartbeatTTL time.Duration `yaml:"heartbeatTTL"`
// NOTE(review): presumably how long shard statistics are retained before
// cleanup — confirm against the code that reads this field.
ShardStatsTTL time.Duration `yaml:"shardStatsTTL"`
}
)

Expand All @@ -97,6 +98,10 @@ const (
MigrationModeONBOARDED = "onboarded"
)

const (
// DefaultShardStatsTTL is one minute.
// NOTE(review): presumably the fallback used when LeaderProcess.ShardStatsTTL
// is not configured — confirm at the use site.
DefaultShardStatsTTL = time.Minute
)

// ConfigMode maps string migration mode values to types.MigrationMode
var ConfigMode = map[string]types.MigrationMode{
MigrationModeINVALID: types.MigrationModeINVALID,
Expand Down
88 changes: 77 additions & 11 deletions service/sharddistributor/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sync"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
Expand Down Expand Up @@ -113,34 +114,99 @@ func (h *handlerImpl) GetShardOwner(ctx context.Context, request *types.GetShard

// assignEphemeralShard chooses an executor for shardID in namespace, persists
// the choice through h.storage.AssignShard, and returns the chosen owner
// together with the namespace in a GetShardOwnerResponse.
//
// Errors from h.storage.GetState and h.storage.AssignShard are wrapped with
// %w; an error from pickLeastLoadedExecutor is logged and returned unchanged.
//
// NOTE(review): this span comes from a rendered diff, so the superseded lines
// (the `executor` variable, the min-assigned-shards loop, the first
// AssignShard call and the first Owner field) appear next to their
// replacements — confirm against the checked-in file before relying on it.
func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string, shardID string) (*types.GetShardOwnerResponse, error) {

// Get the current state of the namespace and find the executor with the least assigned shards
// Get the current state of the namespace and evaluate executor load to choose a placement target.
state, err := h.storage.GetState(ctx, namespace)
if err != nil {
return nil, fmt.Errorf("get state: %w", err)
}

var executor string
minAssignedShards := math.MaxInt

// Keep the executor whose assignment holds the fewest shards seen so far.
for assignedExecutor, assignment := range state.ShardAssignments {
if len(assignment.AssignedShards) < minAssignedShards {
minAssignedShards = len(assignment.AssignedShards)
executor = assignedExecutor
}
// Select the target executor; the aggregated load and assigned count are
// returned only so they can be logged below.
executorID, aggregatedLoad, assignedCount, err := pickLeastLoadedExecutor(state)
if err != nil {
h.logger.Error(
"no eligible executor found for ephemeral assignment",
tag.ShardNamespace(namespace),
tag.ShardKey(shardID),
tag.Error(err),
)
return nil, err
}

// Record which executor was selected and the values the selection was based on.
h.logger.Info(
"selected executor for ephemeral shard assignment",
tag.AggregateLoad(aggregatedLoad),
tag.AssignedCount(assignedCount),
tag.ShardNamespace(namespace),
tag.ShardKey(shardID),
tag.ShardExecutor(executorID),
)

// Assign the shard to the executor with the least assigned shards
err = h.storage.AssignShard(ctx, namespace, shardID, executor)
err = h.storage.AssignShard(ctx, namespace, shardID, executorID)
if err != nil {
h.logger.Error(
"failed to assign ephemeral shard",
tag.ShardNamespace(namespace),
tag.ShardKey(shardID),
tag.ShardExecutor(executorID),
tag.Error(err),
)
return nil, fmt.Errorf("assign ephemeral shard: %w", err)
}

return &types.GetShardOwnerResponse{
Owner: executor,
Owner: executorID,
Namespace: namespace,
}, nil
}

// pickLeastLoadedExecutor selects the placement target for a new shard.
//
// Only executors that have an entry in state.ShardAssignments and are listed in
// state.Executors with status ACTIVE are considered. An executor's load is the
// sum of SmoothedLoad over its assigned shards that have an entry in
// state.ShardStats; NaN and infinite values are left out of the sum. The
// executor with the smallest sum wins, and equal sums are decided in favour of
// the executor with fewer assigned shards. Any remaining tie is settled by map
// iteration order.
//
// It returns the chosen executor ID, its aggregated load and its assigned
// shard count. An error is returned when state is nil, when it holds no shard
// assignments, or when no candidate qualifies.
func pickLeastLoadedExecutor(state *store.NamespaceState) (executorID string, aggregatedLoad float64, assignedCount int, err error) {
	switch {
	case state == nil:
		return "", 0, 0, fmt.Errorf("namespace state is nil")
	case len(state.ShardAssignments) == 0:
		return "", 0, 0, fmt.Errorf("namespace state has no executors")
	}

	// Start from the largest representable values so that any candidate with a
	// smaller load (or an equal load and fewer shards) replaces them.
	bestID := ""
	bestLoad := math.MaxFloat64
	bestCount := math.MaxInt

	for id, assigned := range state.ShardAssignments {
		heartbeat, known := state.Executors[id]
		if !known || heartbeat.Status != types.ExecutorStatusACTIVE {
			continue
		}

		var load float64
		for shardID := range assigned.AssignedShards {
			stats, tracked := state.ShardStats[shardID]
			if !tracked {
				continue
			}
			// Skip values that would poison the sum.
			if math.IsNaN(stats.SmoothedLoad) || math.IsInf(stats.SmoothedLoad, 0) {
				continue
			}
			load += stats.SmoothedLoad
		}

		shardCount := len(assigned.AssignedShards)
		lighter := load < bestLoad
		tiedWithFewerShards := load == bestLoad && shardCount < bestCount
		if !lighter && !tiedWithFewerShards {
			continue
		}
		bestID, bestLoad, bestCount = id, load, shardCount
	}

	if bestID == "" {
		return "", 0, 0, fmt.Errorf("no active executors available")
	}
	return bestID, bestLoad, bestCount, nil
}

func (h *handlerImpl) WatchNamespaceState(request *types.WatchNamespaceStateRequest, server WatchNamespaceStateServer) error {
h.startWG.Wait()

Expand Down
Loading