From 71032780151a2e21a8647916f5b873cb2537b724 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Mon, 17 Nov 2025 12:00:05 +0100 Subject: [PATCH 1/5] change State structures' timestamp to etcdtypes.Time --- service/sharddistributor/handler/executor.go | 6 +- .../sharddistributor/handler/executor_test.go | 35 +++++----- .../leader/process/processor.go | 16 ++--- .../leader/process/processor_test.go | 34 ++++----- .../store/etcd/etcdtypes/state.go | 70 +++++++++++++++++++ .../store/etcd/etcdtypes/time.go | 50 +++++++++++++ .../store/etcd/executorstore/etcdstore.go | 53 +++++++------- .../etcd/executorstore/etcdstore_test.go | 22 +++--- .../shardcache/namespaceshardcache.go | 3 +- .../shardcache/namespaceshardcache_test.go | 4 +- service/sharddistributor/store/state.go | 31 +++++--- .../store/wrappers/metered/metered_test.go | 4 +- 12 files changed, 232 insertions(+), 96 deletions(-) create mode 100644 service/sharddistributor/store/etcd/etcdtypes/state.go create mode 100644 service/sharddistributor/store/etcd/etcdtypes/time.go diff --git a/service/sharddistributor/handler/executor.go b/service/sharddistributor/handler/executor.go index 9187bb94ce9..4f617280165 100644 --- a/service/sharddistributor/handler/executor.go +++ b/service/sharddistributor/handler/executor.go @@ -74,14 +74,14 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe // If the state has changed we need to update heartbeat data. // Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate. 
if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED { - lastHeartbeatTime := time.Unix(previousHeartbeat.LastHeartbeat, 0) + lastHeartbeatTime := previousHeartbeat.LastHeartbeat if now.Sub(lastHeartbeatTime) < _heartbeatRefreshRate { return _convertResponse(assignedShards, mode), nil } } newHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: request.Status, ReportedShards: request.ShardStatusReports, Metadata: request.GetMetadata(), @@ -103,7 +103,7 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest) (*store.AssignedState, error) { assignedShards := store.AssignedState{ AssignedShards: make(map[string]*types.ShardAssignment), - LastUpdated: h.timeSource.Now().Unix(), + LastUpdated: h.timeSource.Now().UTC(), ModRevision: int64(0), } err := h.storage.DeleteExecutors(ctx, request.GetNamespace(), []string{request.GetExecutorID()}, store.NopGuard()) diff --git a/service/sharddistributor/handler/executor_test.go b/service/sharddistributor/handler/executor_test.go index df60e9eaebd..ac05926f9c3 100644 --- a/service/sharddistributor/handler/executor_test.go +++ b/service/sharddistributor/handler/executor_test.go @@ -41,7 +41,7 @@ func TestHeartbeat(t *testing.T) { mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(nil, nil, store.ErrExecutorNotFound) mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, }) @@ -65,7 +65,7 @@ func TestHeartbeat(t *testing.T) { } previousHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, } @@ -91,7 +91,7 @@ func TestHeartbeat(t *testing.T) { } previousHeartbeat := 
store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, } @@ -100,7 +100,7 @@ func TestHeartbeat(t *testing.T) { mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, nil, nil) mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{ - LastHeartbeat: mockTimeSource.Now().Unix(), + LastHeartbeat: mockTimeSource.Now().UTC(), Status: types.ExecutorStatusACTIVE, }) @@ -124,13 +124,13 @@ func TestHeartbeat(t *testing.T) { } previousHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, } mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, nil, nil) mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusDRAINING, }) @@ -178,7 +178,7 @@ func TestHeartbeat(t *testing.T) { Status: types.ExecutorStatusACTIVE, } previousHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, } @@ -207,7 +207,7 @@ func TestHeartbeat(t *testing.T) { Status: types.ExecutorStatusACTIVE, } previousHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, } @@ -240,7 +240,7 @@ func TestHeartbeat(t *testing.T) { } previousHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, ReportedShards: map[string]*types.ShardStatusReport{ "shard1": {Status: types.ShardStatusREADY, ShardLoad: 1.0}, @@ -269,13 +269,14 @@ func TestHeartbeat(t *testing.T) { return nil }, ) - mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, store.HeartbeatState{ - LastHeartbeat: now.Unix(), - Status: types.ExecutorStatusACTIVE, - ReportedShards: 
map[string]*types.ShardStatusReport{ - "shard0": {Status: types.ShardStatusREADY, ShardLoad: 1.0}, + mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, gomock.AssignableToTypeOf(store.HeartbeatState{})).DoAndReturn( + func(_ context.Context, _ string, _ string, hb store.HeartbeatState) error { + // Validate status and reported shards, ignore exact timestamp + require.Equal(t, types.ExecutorStatusACTIVE, hb.Status) + require.Contains(t, hb.ReportedShards, "shard0") + return nil }, - }) + ) _, err := handler.Heartbeat(ctx, req) require.NoError(t, err) @@ -303,7 +304,7 @@ func TestHeartbeat(t *testing.T) { } previousHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, ReportedShards: map[string]*types.ShardStatusReport{ "shard1": {Status: types.ShardStatusREADY, ShardLoad: 1.0}, @@ -346,7 +347,7 @@ func TestHeartbeat(t *testing.T) { } previousHeartbeat := store.HeartbeatState{ - LastHeartbeat: now.Unix(), + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, ReportedShards: map[string]*types.ShardStatusReport{ "shard1": {Status: types.ShardStatusREADY, ShardLoad: 1.0}, diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 901e6822ef7..127324249f8 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -251,11 +251,10 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { // identifyStaleExecutors returns a list of executors who have not reported a heartbeat recently. 
func (p *namespaceProcessor) identifyStaleExecutors(namespaceState *store.NamespaceState) map[string]int64 { expiredExecutors := make(map[string]int64) - now := p.timeSource.Now().Unix() - heartbeatTTL := int64(p.cfg.HeartbeatTTL.Seconds()) + now := p.timeSource.Now().UTC() for executorID, state := range namespaceState.Executors { - if (now - state.LastHeartbeat) > heartbeatTTL { + if now.Sub(state.LastHeartbeat) > p.cfg.HeartbeatTTL { expiredExecutors[executorID] = namespaceState.ShardAssignments[executorID].ModRevision } } @@ -266,8 +265,7 @@ func (p *namespaceProcessor) identifyStaleExecutors(namespaceState *store.Namesp // identifyStaleShardStats returns a list of shard statistics that are no longer relevant. func (p *namespaceProcessor) identifyStaleShardStats(namespaceState *store.NamespaceState) []string { activeShards := make(map[string]struct{}) - now := p.timeSource.Now().Unix() - shardStatsTTL := int64(p.cfg.HeartbeatTTL.Seconds()) + now := p.timeSource.Now().UTC() // 1. build set of active executors @@ -279,7 +277,7 @@ func (p *namespaceProcessor) identifyStaleShardStats(namespaceState *store.Names } isActive := executor.Status == types.ExecutorStatusACTIVE - isNotStale := (now - executor.LastHeartbeat) <= shardStatsTTL + isNotStale := now.Sub(executor.LastHeartbeat) <= p.cfg.HeartbeatTTL if isActive && isNotStale { for shardID := range assignedState.AssignedShards { activeShards[shardID] = struct{}{} @@ -304,8 +302,8 @@ func (p *namespaceProcessor) identifyStaleShardStats(namespaceState *store.Names if _, ok := activeShards[shardID]; ok { continue } - recentUpdate := stats.LastUpdateTime > 0 && (now-stats.LastUpdateTime) <= shardStatsTTL - recentMove := stats.LastMoveTime > 0 && (now-stats.LastMoveTime) <= shardStatsTTL + recentUpdate := !stats.LastUpdateTime.IsZero() && now.Sub(stats.LastUpdateTime) <= p.cfg.HeartbeatTTL + recentMove := !stats.LastMoveTime.IsZero() && now.Sub(stats.LastMoveTime) <= p.cfg.HeartbeatTTL if recentUpdate || recentMove { // 
Preserve stats that have been updated recently to allow cooldown/load history to // survive executor churn. These shards are likely awaiting reassignment, @@ -488,7 +486,7 @@ func (p *namespaceProcessor) addAssignmentsToNamespaceState(namespaceState *stor newState[executorID] = store.AssignedState{ AssignedShards: assignedShardsMap, - LastUpdated: p.timeSource.Now().Unix(), + LastUpdated: p.timeSource.Now().UTC(), ModRevision: modRevision, } } diff --git a/service/sharddistributor/leader/process/processor_test.go b/service/sharddistributor/leader/process/processor_test.go index 4027e9a496b..f023bb2d5b5 100644 --- a/service/sharddistributor/leader/process/processor_test.go +++ b/service/sharddistributor/leader/process/processor_test.go @@ -86,7 +86,7 @@ func TestRebalanceShards_InitialDistribution(t *testing.T) { defer mocks.ctrl.Finish() processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) - now := mocks.timeSource.Now().Unix() + now := mocks.timeSource.Now() state := map[string]store.HeartbeatState{ "exec-1": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}, "exec-2": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}, @@ -112,7 +112,7 @@ func TestRebalanceShards_ExecutorRemoved(t *testing.T) { defer mocks.ctrl.Finish() processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) - now := mocks.timeSource.Now().Unix() + now := mocks.timeSource.Now() heartbeats := map[string]store.HeartbeatState{ "exec-1": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}, "exec-2": {Status: types.ExecutorStatusDRAINING, LastHeartbeat: now}, @@ -150,8 +150,8 @@ func TestRebalanceShards_ExecutorStale(t *testing.T) { now := mocks.timeSource.Now() heartbeats := map[string]store.HeartbeatState{ - "exec-1": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now.Unix()}, - "exec-2": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now.Add(-2 * time.Second).Unix()}, + 
"exec-1": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}, + "exec-2": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now.Add(-2 * time.Second)}, } assignments := map[string]store.AssignedState{ "exec-1": { @@ -219,8 +219,8 @@ func TestCleanupStaleExecutors(t *testing.T) { now := mocks.timeSource.Now() heartbeats := map[string]store.HeartbeatState{ - "exec-active": {LastHeartbeat: now.Unix()}, - "exec-stale": {LastHeartbeat: now.Add(-2 * time.Second).Unix()}, + "exec-active": {LastHeartbeat: now}, + "exec-stale": {LastHeartbeat: now.Add(-2 * time.Second)}, } namespaceState := &store.NamespaceState{Executors: heartbeats} @@ -235,11 +235,11 @@ func TestCleanupStaleShardStats(t *testing.T) { defer mocks.ctrl.Finish() processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) - now := mocks.timeSource.Now() + now := mocks.timeSource.Now().UTC() heartbeats := map[string]store.HeartbeatState{ - "exec-active": {LastHeartbeat: now.Unix(), Status: types.ExecutorStatusACTIVE}, - "exec-stale": {LastHeartbeat: now.Add(-2 * time.Second).Unix()}, + "exec-active": {LastHeartbeat: now, Status: types.ExecutorStatusACTIVE}, + "exec-stale": {LastHeartbeat: now.Add(-2 * time.Second)}, } assignments := map[string]store.AssignedState{ @@ -257,9 +257,9 @@ func TestCleanupStaleShardStats(t *testing.T) { } shardStats := map[string]store.ShardStatistics{ - "shard-1": {SmoothedLoad: 1.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()}, - "shard-2": {SmoothedLoad: 2.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()}, - "shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Add(-2 * time.Second).Unix(), LastMoveTime: now.Add(-2 * time.Second).Unix()}, + "shard-1": {SmoothedLoad: 1.0, LastUpdateTime: now, LastMoveTime: now}, + "shard-2": {SmoothedLoad: 2.0, LastUpdateTime: now, LastMoveTime: now}, + "shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Add(-2 * time.Second), LastMoveTime: now.Add(-2 * time.Second)}, } 
namespaceState := &store.NamespaceState{ @@ -279,14 +279,14 @@ func TestCleanupStaleShardStats(t *testing.T) { now := mocks.timeSource.Now() - expiredExecutor := now.Add(-2 * time.Second).Unix() + expiredExecutor := now.Add(-2 * time.Second) namespaceState := &store.NamespaceState{ Executors: map[string]store.HeartbeatState{ "exec-stale": {LastHeartbeat: expiredExecutor}, }, ShardAssignments: map[string]store.AssignedState{}, ShardStats: map[string]store.ShardStatistics{ - "shard-1": {SmoothedLoad: 5.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()}, + "shard-1": {SmoothedLoad: 5.0, LastUpdateTime: now, LastMoveTime: now}, }, } @@ -307,7 +307,7 @@ func TestRebalance_StoreErrors(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), expectedErr.Error()) - now := mocks.timeSource.Now().Unix() + now := mocks.timeSource.Now() mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{ Executors: map[string]store.HeartbeatState{"e": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}}, GlobalRevision: 1, @@ -391,7 +391,7 @@ func TestRebalanceShards_NoShardsToReassign(t *testing.T) { defer mocks.ctrl.Finish() processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) - now := mocks.timeSource.Now().Unix() + now := mocks.timeSource.Now() heartbeats := map[string]store.HeartbeatState{ "exec-1": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}, } @@ -419,7 +419,7 @@ func TestRebalanceShards_WithUnassignedShards(t *testing.T) { defer mocks.ctrl.Finish() processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) - now := mocks.timeSource.Now().Unix() + now := mocks.timeSource.Now() heartbeats := map[string]store.HeartbeatState{ "exec-1": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}, } diff --git a/service/sharddistributor/store/etcd/etcdtypes/state.go b/service/sharddistributor/store/etcd/etcdtypes/state.go 
new file mode 100644 index 00000000000..cf9890cafc6 --- /dev/null +++ b/service/sharddistributor/store/etcd/etcdtypes/state.go @@ -0,0 +1,70 @@ +package etcdtypes + +import ( + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/store" +) + +type AssignedState struct { + AssignedShards map[string]*types.ShardAssignment `json:"assigned_shards"` + LastUpdated Time `json:"last_updated"` + ModRevision int64 `json:"mod_revision"` +} + +// ToAssignedState converts the current AssignedState to store.AssignedState. +func (s *AssignedState) ToAssignedState() *store.AssignedState { + if s == nil { + return nil + } + + return &store.AssignedState{ + AssignedShards: s.AssignedShards, + LastUpdated: s.LastUpdated.ToTime(), + ModRevision: s.ModRevision, + } +} + +// FromAssignedState creates an AssignedState from a store.AssignedState. +func FromAssignedState(src *store.AssignedState) *AssignedState { + if src == nil { + return nil + } + + return &AssignedState{ + AssignedShards: src.AssignedShards, + LastUpdated: Time(src.LastUpdated), + ModRevision: src.ModRevision, + } +} + +type ShardStatistics struct { + SmoothedLoad float64 `json:"smoothed_load"` + LastUpdateTime Time `json:"last_update_time"` + LastMoveTime Time `json:"last_move_time"` +} + +// ToShardStatistics converts the current ShardStatistics to store.ShardStatistics. +func (s *ShardStatistics) ToShardStatistics() *store.ShardStatistics { + if s == nil { + return nil + } + + return &store.ShardStatistics{ + SmoothedLoad: s.SmoothedLoad, + LastUpdateTime: s.LastUpdateTime.ToTime(), + LastMoveTime: s.LastMoveTime.ToTime(), + } +} + +// FromShardStatistics creates a ShardStatistics from a store.ShardStatistics. 
+func FromShardStatistics(src *store.ShardStatistics) *ShardStatistics { + if src == nil { + return nil + } + + return &ShardStatistics{ + SmoothedLoad: src.SmoothedLoad, + LastUpdateTime: Time(src.LastUpdateTime), + LastMoveTime: Time(src.LastMoveTime), + } +} diff --git a/service/sharddistributor/store/etcd/etcdtypes/time.go b/service/sharddistributor/store/etcd/etcdtypes/time.go new file mode 100644 index 00000000000..744f963db12 --- /dev/null +++ b/service/sharddistributor/store/etcd/etcdtypes/time.go @@ -0,0 +1,50 @@ +package etcdtypes + +import "time" + +// Time is a wrapper around time.Time that implements JSON marshalling/unmarshalling +// in time.RFC3339Nano format to keep precision when storing in etcd. +// Values are converted to UTC when marshalled and when parsed to ensure consistency. +type Time time.Time + +// ToTime converts Time back to time.Time. +func (t Time) ToTime() time.Time { + return time.Time(t) +} + +// MarshalJSON implements the json.Marshaler interface. +// It encodes the time in time.RFC3339Nano format. +func (t Time) MarshalJSON() ([]byte, error) { + s := time.Time(t).UTC().Format(time.RFC3339Nano) + return []byte(`"` + s + `"`), nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +// It decodes the time from time.RFC3339Nano format. +func (t *Time) UnmarshalJSON(data []byte) error { + str := string(data) + if len(str) >= 2 && str[0] == '"' && str[len(str)-1] == '"' { + str = str[1 : len(str)-1] + } + parsed, err := time.Parse(time.RFC3339Nano, str) + if err != nil { + return err + } + *t = Time(parsed.UTC()) + return nil +} + +// ToTime parses a string in time.RFC3339Nano format and returns a time.Time in UTC. +func ToTime(s string) (time.Time, error) { + parsed, err := time.Parse(time.RFC3339Nano, s) + if err != nil { + return time.Time{}, err + } + return parsed.UTC(), nil +} + +// FromTime converts t to UTC and +// formats it as a string in time.RFC3339Nano format.
+func FromTime(t time.Time) string { + return t.UTC().Format(time.RFC3339Nano) +} diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index 9bd2b465d61..dbc6d090cf8 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -8,7 +8,6 @@ import ( "encoding/json" "errors" "fmt" - "strconv" "time" clientv3 "go.etcd.io/etcd/client/v3" @@ -21,6 +20,7 @@ import ( "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes" "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common" "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/shardcache" ) @@ -42,8 +42,8 @@ type executorStoreImpl struct { type shardStatisticsUpdate struct { key string shardID string - stats store.ShardStatistics - desiredLastMove int64 // intended LastMoveTime for this update + stats etcdtypes.ShardStatistics + desiredLastMove etcdtypes.Time // intended LastMoveTime for this update } // ExecutorStoreParams defines the dependencies for the etcd store, for use with fx. 
@@ -138,7 +138,7 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec // Build all operations including metadata ops := []clientv3.Op{ - clientv3.OpPut(heartbeatETCDKey, strconv.FormatInt(request.LastHeartbeat, 10)), + clientv3.OpPut(heartbeatETCDKey, etcdtypes.FromTime(request.LastHeartbeat)), clientv3.OpPut(stateETCDKey, string(jsonState)), clientv3.OpPut(reportedShardsETCDKey, string(reportedShardsData)), } @@ -173,7 +173,7 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, } heartbeatState := &store.HeartbeatState{} - assignedState := &store.AssignedState{} + assignedState := &etcdtypes.AssignedState{} found := false for _, kv := range resp.Kvs { @@ -187,11 +187,10 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, found = true // We found at least one valid key part for the executor. switch keyType { case etcdkeys.ExecutorHeartbeatKey: - timestamp, err := strconv.ParseInt(value, 10, 64) + heartbeatState.LastHeartbeat, err = etcdtypes.ToTime(value) if err != nil { return nil, nil, fmt.Errorf("parse heartbeat timestamp: %w", err) } - heartbeatState.LastHeartbeat = timestamp case etcdkeys.ExecutorStatusKey: if err := common.DecompressAndUnmarshal(kv.Value, &heartbeatState.Status); err != nil { return nil, nil, fmt.Errorf("parse executor status: %w", err) @@ -212,7 +211,7 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, return nil, nil, store.ErrExecutorNotFound } - return heartbeatState, assignedState, nil + return heartbeatState, assignedState.ToAssignedState(), nil } // --- ShardStore Implementation --- @@ -237,10 +236,13 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st } heartbeat := heartbeatStates[executorID] assigned := assignedStates[executorID] + switch keyType { case etcdkeys.ExecutorHeartbeatKey: - timestamp, _ := strconv.ParseInt(value, 10, 64) - heartbeat.LastHeartbeat = timestamp + 
heartbeat.LastHeartbeat, err = etcdtypes.ToTime(value) + if err != nil { + return nil, fmt.Errorf("parse heartbeat timestamp: %w", err) + } case etcdkeys.ExecutorStatusKey: if err := common.DecompressAndUnmarshal(kv.Value, &heartbeat.Status); err != nil { return nil, fmt.Errorf("parse executor status: %w", err) @@ -250,9 +252,11 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st return nil, fmt.Errorf("parse reported shards: %w", err) } case etcdkeys.ExecutorAssignedStateKey: - if err := common.DecompressAndUnmarshal(kv.Value, &assigned); err != nil { + var assignedRaw etcdtypes.AssignedState + if err := common.DecompressAndUnmarshal(kv.Value, &assignedRaw); err != nil { return nil, fmt.Errorf("parse assigned shards: %w, %s", err, value) } + assigned = *assignedRaw.ToAssignedState() assigned.ModRevision = kv.ModRevision } heartbeatStates[executorID] = heartbeat @@ -273,11 +277,11 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st if shardKeyType != etcdkeys.ShardStatisticsKey { continue } - var shardStatistic store.ShardStatistics + var shardStatistic etcdtypes.ShardStatistics if err := common.DecompressAndUnmarshal(kv.Value, &shardStatistic); err != nil { continue } - shardStats[shardID] = shardStatistic + shardStats[shardID] = *shardStatistic.ToShardStatistics() } return &store.NamespaceState{ @@ -370,7 +374,7 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, if err != nil { return fmt.Errorf("build executor assigned state key: %w", err) } - value, err := json.Marshal(state) + value, err := json.Marshal(etcdtypes.FromAssignedState(&state)) if err != nil { return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err) } @@ -453,8 +457,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, return fmt.Errorf("get executor assigned state: %w", err) } - var state store.AssignedState - var shardStats store.ShardStatistics 
+ var state etcdtypes.AssignedState + var shardStats etcdtypes.ShardStatistics modRevision := int64(0) // A revision of 0 means the key doesn't exist yet. if len(resp.Kvs) > 0 { @@ -473,7 +477,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, if err != nil { return fmt.Errorf("get shard statistics: %w", err) } - now := s.timeSource.Now().Unix() + + now := s.timeSource.Now().UTC() statsModRevision := int64(0) if len(statsResp.Kvs) > 0 { statsModRevision = statsResp.Kvs[0].ModRevision @@ -483,12 +488,12 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, // Statistics already exist, update the last move time. // This can happen if the shard was previously assigned to an executor, and a lookup happens after the executor is deleted, // AssignShard is then called to assign the shard to a new executor. - shardStats.LastMoveTime = now + shardStats.LastMoveTime = etcdtypes.Time(now) } else { // Statistics don't exist, initialize them. shardStats.SmoothedLoad = 0 - shardStats.LastUpdateTime = now - shardStats.LastMoveTime = now + shardStats.LastUpdateTime = etcdtypes.Time(now) + shardStats.LastMoveTime = etcdtypes.Time(now) } // 2. Get the executor state. 
@@ -669,7 +674,7 @@ func (s *executorStoreImpl) prepareShardStatisticsUpdates(ctx context.Context, n for executorID, state := range newAssignments { for shardID := range state.AssignedShards { - now := s.timeSource.Now().Unix() + now := s.timeSource.Now().UTC() oldOwner, err := s.shardCache.GetShardOwner(ctx, namespace, shardID) if err != nil && !errors.Is(err, store.ErrShardNotFound) { @@ -691,7 +696,7 @@ func (s *executorStoreImpl) prepareShardStatisticsUpdates(ctx context.Context, n return nil, fmt.Errorf("get shard statistics: %w", err) } - stats := store.ShardStatistics{} + stats := etcdtypes.ShardStatistics{} if len(statsResp.Kvs) > 0 { if err := common.DecompressAndUnmarshal(statsResp.Kvs[0].Value, &stats); err != nil { @@ -699,14 +704,14 @@ func (s *executorStoreImpl) prepareShardStatisticsUpdates(ctx context.Context, n } } else { stats.SmoothedLoad = 0 - stats.LastUpdateTime = now + stats.LastUpdateTime = etcdtypes.Time(now) } updates = append(updates, shardStatisticsUpdate{ key: shardStatisticsKey, shardID: shardID, stats: stats, - desiredLastMove: now, + desiredLastMove: etcdtypes.Time(now), }) } } diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go index a47565768bd..2a2ef1fc102 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -3,7 +3,6 @@ package executorstore import ( "context" "encoding/json" - "strconv" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes" "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common" "github.com/uber/cadence/service/sharddistributor/store/etcd/leaderstore" 
"github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper" @@ -28,11 +28,11 @@ func TestRecordHeartbeat(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - nowTS := time.Now().Unix() + now := time.Now().UTC() executorID := "executor-TestRecordHeartbeat" req := store.HeartbeatState{ - LastHeartbeat: nowTS, + LastHeartbeat: now, Status: types.ExecutorStatusACTIVE, ReportedShards: map[string]*types.ShardStatusReport{ "shard-TestRecordHeartbeat": {Status: types.ShardStatusREADY}, @@ -59,7 +59,7 @@ func TestRecordHeartbeat(t *testing.T) { resp, err := tc.Client.Get(ctx, heartbeatKey) require.NoError(t, err) assert.Equal(t, int64(1), resp.Count, "Heartbeat key should exist") - assert.Equal(t, strconv.FormatInt(nowTS, 10), string(resp.Kvs[0].Value)) + assert.Equal(t, etcdtypes.FromTime(now), string(resp.Kvs[0].Value)) resp, err = tc.Client.Get(ctx, stateKey) require.NoError(t, err) @@ -97,12 +97,12 @@ func TestGetHeartbeat(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - nowTS := time.Now().Unix() + now := time.Now().UTC() executorID := "executor-get" req := store.HeartbeatState{ Status: types.ExecutorStatusDRAINING, - LastHeartbeat: nowTS, + LastHeartbeat: now, } // 1. Record a heartbeat @@ -130,7 +130,7 @@ func TestGetHeartbeat(t *testing.T) { // 3. 
Verify the state assert.Equal(t, types.ExecutorStatusDRAINING, hb.Status) - assert.Equal(t, nowTS, hb.LastHeartbeat) + assert.Equal(t, now, hb.LastHeartbeat) require.NotNil(t, assignedFromDB.AssignedShards) assert.Equal(t, assignState[executorID].AssignedShards, assignedFromDB.AssignedShards) @@ -310,11 +310,11 @@ func TestGuardedOperations(t *testing.T) { elector, err := leaderstore.NewLeaderStore(leaderstore.StoreParams{Client: tc.Client, Cfg: tc.LeaderCfg, Lifecycle: fxtest.NewLifecycle(t)}) require.NoError(t, err) election1, err := elector.CreateElection(ctx, namespace) - require.NoError(t, err) defer election1.Cleanup(ctx) + defer func() { _ = election1.Cleanup(ctx) }() election2, err := elector.CreateElection(ctx, namespace) - require.NoError(t, err) defer election2.Cleanup(ctx) + defer func() { _ = election2.Cleanup(ctx) }() // 2. First node becomes leader require.NoError(t, election1.Campaign(ctx, "host-1")) @@ -541,10 +541,10 @@ func TestShardStatisticsPersistence(t *testing.T) { require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE})) // 2. 
Pre-create shard statistics as if coming from prior history - stats := store.ShardStatistics{SmoothedLoad: 12.5, LastUpdateTime: 1234, LastMoveTime: 5678} + stats := store.ShardStatistics{SmoothedLoad: 12.5, LastUpdateTime: time.Unix(1234, 0).UTC(), LastMoveTime: time.Unix(5678, 0).UTC()} shardStatsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey) require.NoError(t, err) - payload, err := json.Marshal(stats) + payload, err := json.Marshal(etcdtypes.FromShardStatistics(&stats)) require.NoError(t, err) _, err = tc.Client.Put(ctx, shardStatsKey, string(payload)) require.NoError(t, err) diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go index c59b15f2d3d..f35ba4b73fc 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go @@ -12,6 +12,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes" "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common" ) @@ -175,7 +176,7 @@ func (n *namespaceShardToExecutor) refreshExecutorState(ctx context.Context) err case etcdkeys.ExecutorAssignedStateKey: shardOwner := getOrCreateShardOwner(shardOwners, executorID) - var assignedState store.AssignedState + var assignedState etcdtypes.AssignedState err = common.DecompressAndUnmarshal(kv.Value, &assignedState) if err != nil { return fmt.Errorf("parse assigned state: %w", err) diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go 
b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go index 0eb0607b574..dd5061f608f 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go @@ -12,15 +12,15 @@ import ( "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/types" - "github.com/uber/cadence/service/sharddistributor/store" "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys" + "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes" "github.com/uber/cadence/service/sharddistributor/store/etcd/testhelper" ) // setupExecutorWithShards creates an executor in etcd with assigned shards and metadata func setupExecutorWithShards(t *testing.T, testCluster *testhelper.StoreTestCluster, namespace, executorID string, shards []string, metadata map[string]string) { // Create assigned state - assignedState := &store.AssignedState{ + assignedState := &etcdtypes.AssignedState{ AssignedShards: make(map[string]*types.ShardAssignment), } for _, shardID := range shards { diff --git a/service/sharddistributor/store/state.go b/service/sharddistributor/store/state.go index 70c75ea0df6..25f6bb3ee07 100644 --- a/service/sharddistributor/store/state.go +++ b/service/sharddistributor/store/state.go @@ -1,20 +1,26 @@ package store import ( + "time" + "github.com/uber/cadence/common/types" ) type HeartbeatState struct { - LastHeartbeat int64 `json:"last_heartbeat"` - Status types.ExecutorStatus `json:"status"` - ReportedShards map[string]*types.ShardStatusReport `json:"reported_shards"` - Metadata map[string]string `json:"metadata"` + // LastHeartbeat is the time of the last heartbeat received from the executor + LastHeartbeat time.Time + Status types.ExecutorStatus + ReportedShards map[string]*types.ShardStatusReport + Metadata map[string]string } type AssignedState struct { - AssignedShards 
map[string]*types.ShardAssignment `json:"assigned_shards"` // What we assigned - LastUpdated int64 `json:"last_updated"` - ModRevision int64 `json:"mod_revision"` + // AssignedShards is the map of shard ID to shard assignment + AssignedShards map[string]*types.ShardAssignment + + // LastUpdated is the time we last updated this assignment + LastUpdated time.Time + ModRevision int64 } type NamespaceState struct { @@ -29,9 +35,14 @@ type ShardState struct { } type ShardStatistics struct { - SmoothedLoad float64 `json:"smoothed_load"` // EWMA of shard load that persists across executor changes - LastUpdateTime int64 `json:"last_update_time"` // heartbeat timestamp that last updated the EWMA - LastMoveTime int64 `json:"last_move_time"` // timestamp for the latest reassignment, used for cooldowns + // EWMA of shard load that persists across executor changes + SmoothedLoad float64 + + // LastUpdateTime is the heartbeat timestamp that last updated the EWMA + LastUpdateTime time.Time + + // LastMoveTime is the timestamp when this shard was last reassigned + LastMoveTime time.Time } type ShardOwner struct { diff --git a/service/sharddistributor/store/wrappers/metered/metered_test.go b/service/sharddistributor/store/wrappers/metered/metered_test.go index e9d0383aa3d..5874cc736f2 100644 --- a/service/sharddistributor/store/wrappers/metered/metered_test.go +++ b/service/sharddistributor/store/wrappers/metered/metered_test.go @@ -24,10 +24,10 @@ const ( func TestMeteredStore_GetHeartbeat(t *testing.T) { heartbeatRes := &store.HeartbeatState{ - LastHeartbeat: time.Now().Unix(), + LastHeartbeat: time.Now().UTC(), } assignedState := &store.AssignedState{ - LastUpdated: time.Now().Unix(), + LastUpdated: time.Now().UTC(), } tests := []struct { From 4901c1b524c218a424ae1e88e1a4cfd06191805c Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 18 Nov 2025 10:43:17 +0100 Subject: [PATCH 2/5] simplify etcd key building --- .../store/etcd/etcdkeys/etcdkeys.go | 136 +++++++++++------- 
.../store/etcd/etcdkeys/etcdkeys_test.go | 39 +++-- .../store/etcd/executorstore/etcdstore.go | 68 +++------ .../etcd/executorstore/etcdstore_test.go | 20 +-- .../shardcache/namespaceshardcache.go | 9 +- 5 files changed, 127 insertions(+), 145 deletions(-) diff --git a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go index 39e3f917de8..f0d25c8a05f 100644 --- a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go +++ b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go @@ -5,51 +5,68 @@ import ( "strings" ) -const ( - ExecutorHeartbeatKey = "heartbeat" - ExecutorStatusKey = "status" - ExecutorReportedShardsKey = "reported_shards" - ExecutorAssignedStateKey = "assigned_state" - ShardAssignedKey = "assigned" - ShardStatisticsKey = "statistics" - ExecutorMetadataKey = "metadata" -) +// BuildNamespacePrefix constructs the etcd key prefix for a given namespace. +// result: <prefix>/<namespace> +func BuildNamespacePrefix(prefix, namespace string) string { + return fmt.Sprintf("%s/%s", prefix, namespace) +} -var validKeyTypes = []string{ - ExecutorHeartbeatKey, - ExecutorStatusKey, - ExecutorReportedShardsKey, - ExecutorAssignedStateKey, - ExecutorMetadataKey, +// BuildExecutorsPrefix constructs the etcd key prefix for executors within a given namespace. +// result: <prefix>/<namespace>/executors/ +func BuildExecutorsPrefix(prefix, namespace string) string { + return fmt.Sprintf("%s/executors/", BuildNamespacePrefix(prefix, namespace)) } -func isValidKeyType(key string) bool { - for _, validKey := range validKeyTypes { - if key == validKey { - return true - } - } - return false +// BuildExecutorIDPrefix constructs the etcd key prefix for a specific executor within a namespace.
+// result: <prefix>/<namespace>/executors/<executorID>/ +func BuildExecutorIDPrefix(prefix, namespace, executorID string) string { + return fmt.Sprintf("%s%s/", BuildExecutorsPrefix(prefix, namespace), executorID) } -func BuildNamespacePrefix(prefix string, namespace string) string { - return fmt.Sprintf("%s/%s", prefix, namespace) +// BuildShardsPrefix constructs the etcd key prefix for shards within a given namespace. +// result: <prefix>/<namespace>/shards/ +func BuildShardsPrefix(prefix, namespace string) string { + return fmt.Sprintf("%s/shards/", BuildNamespacePrefix(prefix, namespace)) } -func BuildExecutorPrefix(prefix string, namespace string) string { - return fmt.Sprintf("%s/executors/", BuildNamespacePrefix(prefix, namespace)) +// ExecutorKeyType represents the allowed executor-level key types in etcd. +// Use BuildExecutorKey to construct keys of these types. +type ExecutorKeyType string + +const ( + ExecutorHeartbeatKey ExecutorKeyType = "heartbeat" + ExecutorStatusKey ExecutorKeyType = "status" + ExecutorReportedShardsKey ExecutorKeyType = "reported_shards" + ExecutorAssignedStateKey ExecutorKeyType = "assigned_state" + ExecutorMetadataKey ExecutorKeyType = "metadata" +) + +// validExecutorKeyTypes defines the set of valid executor key types. +var validExecutorKeyTypes = map[ExecutorKeyType]struct{}{ + ExecutorHeartbeatKey: {}, + ExecutorStatusKey: {}, + ExecutorReportedShardsKey: {}, + ExecutorAssignedStateKey: {}, + ExecutorMetadataKey: {}, } -func BuildExecutorKey(prefix string, namespace, executorID, keyType string) (string, error) { - // We allow an empty key, to build the full prefix - if !isValidKeyType(keyType) && keyType != "" { - return "", fmt.Errorf("invalid key type: %s", keyType) - } - return fmt.Sprintf("%s%s/%s", BuildExecutorPrefix(prefix, namespace), executorID, keyType), nil +// isValidExecutorKeyType checks if the provided key type is valid.
+func isValidExecutorKeyType(keyType ExecutorKeyType) bool { + _, exist := validExecutorKeyTypes[keyType] + return exist } -func ParseExecutorKey(prefix string, namespace, key string) (executorID, keyType string, err error) { - prefix = BuildExecutorPrefix(prefix, namespace) +// BuildExecutorKey constructs the etcd key for a specific executor and key type. +// result: <prefix>/<namespace>/executors/<executorID>/<keyType> +func BuildExecutorKey(prefix, namespace, executorID string, keyType ExecutorKeyType) string { + return fmt.Sprintf("%s%s", BuildExecutorIDPrefix(prefix, namespace, executorID), keyType) +} + +// ParseExecutorKey parses an etcd key and extracts the executor ID and key type. +// It returns an error if the key does not conform to the expected format. +// Expected format of key: <prefix>/<namespace>/executors/<executorID>/<keyType> +func ParseExecutorKey(prefix, namespace, key string) (executorID string, keyType ExecutorKeyType, err error) { + prefix = BuildExecutorsPrefix(prefix, namespace) if !strings.HasPrefix(key, prefix) { return "", "", fmt.Errorf("key '%s' does not have expected prefix '%s'", key, prefix) } @@ -61,29 +78,44 @@ func ParseExecutorKey(prefix string, namespace, key string) (executorID, keyType // For metadata keys, the format is: executorID/metadata/metadataKey // For other keys, the format is: executorID/keyType // We return executorID and the first keyType (e.g., "metadata") - if len(parts) > 2 && parts[1] == ExecutorMetadataKey { + if len(parts) > 2 && ExecutorKeyType(parts[1]) == ExecutorMetadataKey { // This is a metadata key, return "metadata" as the keyType - return parts[0], parts[1], nil + return parts[0], ExecutorMetadataKey, nil } if len(parts) != 2 { return "", "", fmt.Errorf("unexpected key format: %s", key) } - return parts[0], parts[1], nil + if !isValidExecutorKeyType(ExecutorKeyType(parts[1])) { + return "", "", fmt.Errorf("invalid executor key type: %s", parts[1]) + } + return parts[0], ExecutorKeyType(parts[1]), nil } -func BuildShardPrefix(prefix string, namespace string) string { - return
fmt.Sprintf("%s/shards/", BuildNamespacePrefix(prefix, namespace)) +// BuildMetadataKey constructs the etcd key for a specific metadata entry of an executor. +// result: <prefix>/<namespace>/executors/<executorID>/metadata/<metadataKey> +func BuildMetadataKey(prefix string, namespace, executorID, metadataKey string) string { + return fmt.Sprintf("%s/%s", BuildExecutorKey(prefix, namespace, executorID, ExecutorMetadataKey), metadataKey) } -func BuildShardKey(prefix string, namespace, shardID, keyType string) (string, error) { - if keyType != ShardStatisticsKey { - return "", fmt.Errorf("invalid shard key type: %s", keyType) - } - return fmt.Sprintf("%s%s/%s", BuildShardPrefix(prefix, namespace), shardID, keyType), nil +// ShardKeyType represents the allowed shard-level key types in etcd. +// Use BuildShardKey to construct keys of these types. +type ShardKeyType string + +const ( + ShardStatisticsKey ShardKeyType = "statistics" +) + +// BuildShardKey constructs the etcd key for a specific shard and key type. +// result: <prefix>/<namespace>/shards/<shardID>/<keyType> +func BuildShardKey(prefix string, namespace, shardID string, keyType ShardKeyType) string { + return fmt.Sprintf("%s%s/%s", BuildShardsPrefix(prefix, namespace), shardID, keyType) } -func ParseShardKey(prefix string, namespace, key string) (shardID, keyType string, err error) { - prefix = BuildShardPrefix(prefix, namespace) +// ParseShardKey parses an etcd key and extracts the shard ID and key type. +// It returns an error if the key does not conform to the expected format.
+// Expected format of key: <prefix>/<namespace>/shards/<shardID>/<keyType> +func ParseShardKey(prefix string, namespace, key string) (shardID string, keyType ShardKeyType, err error) { + prefix = BuildShardsPrefix(prefix, namespace) if !strings.HasPrefix(key, prefix) { return "", "", fmt.Errorf("key '%s' does not have expected prefix '%s'", key, prefix) } @@ -92,14 +124,8 @@ func ParseShardKey(prefix string, namespace, key string) (shardID, keyType strin if len(parts) != 2 { return "", "", fmt.Errorf("unexpected shard key format: %s", key) } - return parts[0], parts[1], nil -} - -func BuildMetadataKey(prefix string, namespace, executorID, metadataKey string) string { - metadataKeyPrefix, err := BuildExecutorKey(prefix, namespace, executorID, ExecutorMetadataKey) - if err != nil { - // This should never happen since ExecutorMetadataKey is a valid constant - panic(fmt.Sprintf("BuildMetadataKey: unexpected error building executor key: %v", err)) + if parts[1] != string(ShardStatisticsKey) { + return "", "", fmt.Errorf("invalid shard key type: %s", parts[1]) } - return fmt.Sprintf("%s/%s", metadataKeyPrefix, metadataKey) + return parts[0], ShardStatisticsKey, nil } diff --git a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go index de041e0ce72..601ab4541c0 100644 --- a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go +++ b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go @@ -11,44 +11,32 @@ func TestBuildNamespacePrefix(t *testing.T) { assert.Equal(t, "/cadence/test-ns", got) } -func TestBuildExecutorPrefix(t *testing.T) { - got := BuildExecutorPrefix("/cadence", "test-ns") +func TestBuildExecutorsPrefix(t *testing.T) { + got := BuildExecutorsPrefix("/cadence", "test-ns") assert.Equal(t, "/cadence/test-ns/executors/", got) } -func TestBuildShardPrefix(t *testing.T) { - got := BuildShardPrefix("/cadence", "test-ns") +func TestBuildShardsPrefix(t *testing.T) { + got := BuildShardsPrefix("/cadence",
"test-ns") assert.Equal(t, "/cadence/test-ns/shards/", got) } func TestBuildExecutorKey(t *testing.T) { - got, err := BuildExecutorKey("/cadence", "test-ns", "exec-1", "heartbeat") - assert.NoError(t, err) + got := BuildExecutorKey("/cadence", "test-ns", "exec-1", "heartbeat") assert.Equal(t, "/cadence/test-ns/executors/exec-1/heartbeat", got) } -func TestBuildExecutorKeyFail(t *testing.T) { - _, err := BuildExecutorKey("/cadence", "test-ns", "exec-1", "invalid") - assert.ErrorContains(t, err, "invalid key type: invalid") -} - func TestBuildShardKey(t *testing.T) { - got, err := BuildShardKey("/cadence", "test-ns", "shard-1", "statistics") - assert.NoError(t, err) + got := BuildShardKey("/cadence", "test-ns", "shard-1", ShardStatisticsKey) assert.Equal(t, "/cadence/test-ns/shards/shard-1/statistics", got) } -func TestBuildShardKeyFail(t *testing.T) { - _, err := BuildShardKey("/cadence", "test-ns", "shard-1", "invalid") - assert.ErrorContains(t, err, "invalid shard key type: invalid") -} - func TestParseExecutorKey(t *testing.T) { // Valid key executorID, keyType, err := ParseExecutorKey("/cadence", "test-ns", "/cadence/test-ns/executors/exec-1/heartbeat") assert.NoError(t, err) assert.Equal(t, "exec-1", executorID) - assert.Equal(t, "heartbeat", keyType) + assert.Equal(t, ExecutorHeartbeatKey, keyType) // Prefix missing _, _, err = ParseExecutorKey("/cadence", "test-ns", "/wrong/prefix") @@ -64,7 +52,7 @@ func TestParseShardKey(t *testing.T) { shardID, keyType, err := ParseShardKey("/cadence", "test-ns", "/cadence/test-ns/shards/shard-1/statistics") assert.NoError(t, err) assert.Equal(t, "shard-1", shardID) - assert.Equal(t, "statistics", keyType) + assert.Equal(t, ShardStatisticsKey, keyType) // Prefix missing _, _, err = ParseShardKey("/cadence", "test-ns", "/cadence/other/shards/shard-1/statistics") @@ -75,6 +63,11 @@ func TestParseShardKey(t *testing.T) { assert.ErrorContains(t, err, "unexpected shard key format: 
/cadence/test-ns/shards/shard-1/statistics/extra") } +func TestParseShardKey_InvalidType(t *testing.T) { + _, _, err := ParseShardKey("/cadence", "test-ns", "/cadence/test-ns/shards/shard-1/unknown") + assert.ErrorContains(t, err, "invalid shard key type: unknown") +} + func TestBuildMetadataKey(t *testing.T) { got := BuildMetadataKey("/cadence", "test-ns", "exec-1", "my-metadata-key") assert.Equal(t, "/cadence/test-ns/executors/exec-1/metadata/my-metadata-key", got) @@ -90,3 +83,9 @@ func TestParseExecutorKey_MetadataKey(t *testing.T) { assert.Equal(t, "exec-1", executorID) assert.Equal(t, ExecutorMetadataKey, keyType) } + +func TestParseExecutorKey_InvalidKeyType(t *testing.T) { + key := BuildExecutorIDPrefix("/cadence", "test-ns", "exec-1") + "invalid_type" + _, _, err := ParseExecutorKey("/cadence", "test-ns", key) + assert.ErrorContains(t, err, "invalid executor key type: invalid_type") +} diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index dbc6d090cf8..e43503f9ba1 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -113,18 +113,9 @@ func (s *executorStoreImpl) Stop() { // --- HeartbeatStore Implementation --- func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, executorID string, request store.HeartbeatState) error { - heartbeatETCDKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorHeartbeatKey) - if err != nil { - return fmt.Errorf("build executor heartbeat key: %w", err) - } - stateETCDKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) - if err != nil { - return fmt.Errorf("build executor status key: %w", err) - } - reportedShardsETCDKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorReportedShardsKey) - if err != nil { - 
return fmt.Errorf("build executor reported shards key: %w", err) - } + heartbeatETCDKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorHeartbeatKey) + stateETCDKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) + reportedShardsETCDKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorReportedShardsKey) reportedShardsData, err := json.Marshal(request.ReportedShards) if err != nil { @@ -159,11 +150,8 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec // GetHeartbeat retrieves the last known heartbeat state for a single executor. func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, executorID string) (*store.HeartbeatState, *store.AssignedState, error) { // The prefix for all keys related to a single executor. - executorPrefix, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, "") - if err != nil { - return nil, nil, fmt.Errorf("build executor prefix: %w", err) - } - resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) + executorIDPrefix := etcdkeys.BuildExecutorIDPrefix(s.prefix, namespace, executorID) + resp, err := s.client.Get(ctx, executorIDPrefix, clientv3.WithPrefix()) if err != nil { return nil, nil, fmt.Errorf("etcd get failed for executor %s: %w", executorID, err) } @@ -221,7 +209,7 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st assignedStates := make(map[string]store.AssignedState) shardStats := make(map[string]store.ShardStatistics) - executorPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace) + executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace) resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) if err != nil { return nil, fmt.Errorf("get executor data: %w", err) @@ -264,8 +252,8 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st } // Fetch 
shard-level statistics stored under shard namespace keys. - shardPrefix := etcdkeys.BuildShardPrefix(s.prefix, namespace) - shardResp, err := s.client.Get(ctx, shardPrefix, clientv3.WithPrefix()) + shardsPrefix := etcdkeys.BuildShardsPrefix(s.prefix, namespace) + shardResp, err := s.client.Get(ctx, shardsPrefix, clientv3.WithPrefix()) if err != nil { return nil, fmt.Errorf("get shard data: %w", err) } @@ -298,7 +286,7 @@ func (s *executorStoreImpl) SubscribeToAssignmentChanges(ctx context.Context, na func (s *executorStoreImpl) Subscribe(ctx context.Context, namespace string) (<-chan int64, error) { revisionChan := make(chan int64, 1) - watchPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace) + watchPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace) go func() { defer close(revisionChan) watchChan := s.client.Watch(ctx, watchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV()) @@ -370,10 +358,7 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, // and comparisons to check for concurrent modifications. for executorID, state := range request.NewState.ShardAssignments { // Update the executor's assigned_state key. 
- executorStateKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) - if err != nil { - return fmt.Errorf("build executor assigned state key: %w", err) - } + executorStateKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) value, err := json.Marshal(etcdtypes.FromAssignedState(&state)) if err != nil { return fmt.Errorf("marshal assigned shards for executor %s: %w", executorID, err) @@ -436,18 +421,9 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, } func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, executorID string) error { - assignedState, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) - if err != nil { - return fmt.Errorf("build executor assigned state key: %w", err) - } - statusKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) - if err != nil { - return fmt.Errorf("build executor status key: %w", err) - } - shardStatsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) - if err != nil { - return fmt.Errorf("build shard statistics key: %w", err) - } + assignedState := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) + statusKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) + shardStatsKey := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) // Use a read-modify-write loop to handle concurrent updates safely. 
for { @@ -598,11 +574,8 @@ func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace strin var ops []clientv3.Op for _, executorID := range executorIDs { - executorPrefix, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, "") - if err != nil { - return fmt.Errorf("build executor prefix: %w", err) - } - ops = append(ops, clientv3.OpDelete(executorPrefix, clientv3.WithPrefix())) + executorIDPrefix := etcdkeys.BuildExecutorIDPrefix(s.prefix, namespace, executorID) + ops = append(ops, clientv3.OpDelete(executorIDPrefix, clientv3.WithPrefix())) } if len(ops) == 0 { @@ -636,10 +609,7 @@ func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace stri } var ops []clientv3.Op for _, shardID := range shardIDs { - shardStatsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) - if err != nil { - return fmt.Errorf("build shard statistics key: %w", err) - } + shardStatsKey := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) ops = append(ops, clientv3.OpDelete(shardStatsKey)) } @@ -686,11 +656,7 @@ func (s *executorStoreImpl) prepareShardStatisticsUpdates(ctx context.Context, n continue } - shardStatisticsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) - if err != nil { - return nil, fmt.Errorf("build shard statistics key: %w", err) - } - + shardStatisticsKey := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) statsResp, err := s.client.Get(ctx, shardStatisticsKey) if err != nil { return nil, fmt.Errorf("get shard statistics: %w", err) diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go index 2a2ef1fc102..e4f1d2fa547 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -47,12 
+47,9 @@ func TestRecordHeartbeat(t *testing.T) { require.NoError(t, err) // Verify directly in etcd - heartbeatKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorHeartbeatKey) - require.NoError(t, err) - stateKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorStatusKey) - require.NoError(t, err) - reportedShardsKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorReportedShardsKey) - require.NoError(t, err) + heartbeatKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorHeartbeatKey) + stateKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorStatusKey) + reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, etcdkeys.ExecutorReportedShardsKey) metadataKey1 := etcdkeys.BuildMetadataKey(tc.EtcdPrefix, tc.Namespace, executorID, "key-1") metadataKey2 := etcdkeys.BuildMetadataKey(tc.EtcdPrefix, tc.Namespace, executorID, "key-2") @@ -359,8 +356,7 @@ func TestSubscribe(t *testing.T) { require.NoError(t, err) // Manually put a heartbeat update, which is an insignificant change - heartbeatKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "heartbeat") - require.NoError(t, err) + heartbeatKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "heartbeat") _, err = tc.Client.Put(ctx, heartbeatKey, "timestamp") require.NoError(t, err) @@ -372,8 +368,7 @@ func TestSubscribe(t *testing.T) { } // Now update the reported shards, which IS a significant change - reportedShardsKey, err := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards") - require.NoError(t, err) + reportedShardsKey := etcdkeys.BuildExecutorKey(tc.EtcdPrefix, tc.Namespace, executorID, "reported_shards") _, err = tc.Client.Put(ctx, reportedShardsKey, `{"shard-1":{"status":"running"}}`) 
require.NoError(t, err) @@ -460,7 +455,7 @@ func TestParseExecutorKey_Errors(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "does not have expected prefix") - key := etcdkeys.BuildExecutorPrefix(tc.EtcdPrefix, tc.Namespace) + "too/many/parts" + key := etcdkeys.BuildExecutorsPrefix(tc.EtcdPrefix, tc.Namespace) + "too/many/parts" _, _, err = etcdkeys.ParseExecutorKey(tc.EtcdPrefix, tc.Namespace, key) require.Error(t, err) assert.Contains(t, err.Error(), "unexpected key format") @@ -542,8 +537,7 @@ func TestShardStatisticsPersistence(t *testing.T) { // 2. Pre-create shard statistics as if coming from prior history stats := store.ShardStatistics{SmoothedLoad: 12.5, LastUpdateTime: time.Unix(1234, 0).UTC(), LastMoveTime: time.Unix(5678, 0).UTC()} - shardStatsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey) - require.NoError(t, err) + shardStatsKey := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey) payload, err := json.Marshal(etcdtypes.FromShardStatistics(&stats)) require.NoError(t, err) _, err = tc.Client.Put(ctx, shardStatsKey, string(payload)) diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go index f35ba4b73fc..5f7be93b284 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go @@ -33,7 +33,7 @@ type namespaceShardToExecutor struct { func newNamespaceShardToExecutor(etcdPrefix, namespace string, client *clientv3.Client, stopCh chan struct{}, logger log.Logger) (*namespaceShardToExecutor, error) { // Start listening - watchPrefix := etcdkeys.BuildExecutorPrefix(etcdPrefix, namespace) + watchPrefix := etcdkeys.BuildExecutorsPrefix(etcdPrefix, namespace) watchChan := 
client.Watch(context.Background(), watchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV()) return &namespaceShardToExecutor{ @@ -89,10 +89,7 @@ func (n *namespaceShardToExecutor) GetExecutorModRevisionCmp() ([]clientv3.Cmp, defer n.RUnlock() comparisons := []clientv3.Cmp{} for executor, revision := range n.executorRevision { - executorAssignedStateKey, err := etcdkeys.BuildExecutorKey(n.etcdPrefix, n.namespace, executor, etcdkeys.ExecutorAssignedStateKey) - if err != nil { - return nil, fmt.Errorf("build executor assigned state key: %w", err) - } + executorAssignedStateKey := etcdkeys.BuildExecutorKey(n.etcdPrefix, n.namespace, executor, etcdkeys.ExecutorAssignedStateKey) comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorAssignedStateKey), "=", revision)) } @@ -151,7 +148,7 @@ func (n *namespaceShardToExecutor) refresh(ctx context.Context) error { } func (n *namespaceShardToExecutor) refreshExecutorState(ctx context.Context) error { - executorPrefix := etcdkeys.BuildExecutorPrefix(n.etcdPrefix, n.namespace) + executorPrefix := etcdkeys.BuildExecutorsPrefix(n.etcdPrefix, n.namespace) resp, err := n.client.Get(ctx, executorPrefix, clientv3.WithPrefix()) if err != nil { From 00b22196d4cdd8aa21ba2c8a0af30c2143a4ca00 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 18 Nov 2025 12:57:58 +0100 Subject: [PATCH 3/5] fix etcd keys --- .../store/etcd/executorstore/etcdstore.go | 22 +++++++------------ .../shardcache/namespaceshardcache_test.go | 3 +-- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index e43503f9ba1..2961dee51d7 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -113,9 +113,9 @@ func (s *executorStoreImpl) Stop() { // --- HeartbeatStore Implementation --- func (s *executorStoreImpl) 
RecordHeartbeat(ctx context.Context, namespace, executorID string, request store.HeartbeatState) error { - heartbeatETCDKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorHeartbeatKey) - stateETCDKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) - reportedShardsETCDKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorReportedShardsKey) + heartbeatKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorHeartbeatKey) + stateKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorStatusKey) + reportedShardsKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorReportedShardsKey) reportedShardsData, err := json.Marshal(request.ReportedShards) if err != nil { @@ -129,9 +129,9 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec // Build all operations including metadata ops := []clientv3.Op{ - clientv3.OpPut(heartbeatETCDKey, etcdtypes.FromTime(request.LastHeartbeat)), - clientv3.OpPut(stateETCDKey, string(jsonState)), - clientv3.OpPut(reportedShardsETCDKey, string(reportedShardsData)), + clientv3.OpPut(heartbeatKey, etcdtypes.FromTime(request.LastHeartbeat)), + clientv3.OpPut(stateKey, string(jsonState)), + clientv3.OpPut(reportedShardsKey, string(reportedShardsData)), } for key, value := range request.Metadata { metadataKey := etcdkeys.BuildMetadataKey(s.prefix, namespace, executorID, key) @@ -337,20 +337,14 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string, // 1. 
Prepare operations to delete stale executors and add comparisons to ensure they haven't been modified for executorID, expectedModRevision := range request.ExecutorsToDelete { // Build the assigned state key to check for concurrent modifications - executorStateKey, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) - if err != nil { - return fmt.Errorf("build executor assigned state key for comparison: %w", err) - } + executorStateKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) // Add a comparison to ensure the executor's assigned state hasn't changed // This prevents deleting an executor that just received a shard assignment comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(executorStateKey), "=", expectedModRevision)) // Delete all keys for this executor - executorPrefix, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, "") - if err != nil { - return fmt.Errorf("build executor prefix key for deletion: %w", err) - } + executorPrefix := etcdkeys.BuildExecutorIDPrefix(s.prefix, namespace, executorID) ops = append(ops, clientv3.OpDelete(executorPrefix, clientv3.WithPrefix())) } diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go index dd5061f608f..e708efefe14 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go @@ -29,8 +29,7 @@ func setupExecutorWithShards(t *testing.T, testCluster *testhelper.StoreTestClus assignedStateJSON, err := json.Marshal(assignedState) require.NoError(t, err) - executorAssignedStateKey, err := etcdkeys.BuildExecutorKey(testCluster.EtcdPrefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) - require.NoError(t, 
err) + executorAssignedStateKey := etcdkeys.BuildExecutorKey(testCluster.EtcdPrefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey) testCluster.Client.Put(context.Background(), executorAssignedStateKey, string(assignedStateJSON)) // Add metadata From 66ce47eeb792f8ac53bad0576e4782397bdd3a9b Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 18 Nov 2025 15:03:08 +0100 Subject: [PATCH 4/5] add more tests --- common/types/sharddistributor.go | 4 +- ...rddistributor_statuses_enumer_generated.go | 110 ++++---- .../store/etcd/etcdkeys/etcdkeys.go | 6 +- .../store/etcd/etcdtypes/state_test.go | 243 ++++++++++++++++++ .../store/etcd/etcdtypes/time.go | 31 ++- .../store/etcd/etcdtypes/time_test.go | 217 ++++++++++++++++ .../store/etcd/executorstore/etcdstore.go | 6 +- .../etcd/executorstore/etcdstore_test.go | 2 +- .../shardcache/namespaceshardcache_test.go | 3 +- 9 files changed, 549 insertions(+), 73 deletions(-) create mode 100644 service/sharddistributor/store/etcd/etcdtypes/state_test.go create mode 100644 service/sharddistributor/store/etcd/etcdtypes/time_test.go diff --git a/common/types/sharddistributor.go b/common/types/sharddistributor.go index 7db14d3c9ff..3e4b960f69d 100644 --- a/common/types/sharddistributor.go +++ b/common/types/sharddistributor.go @@ -24,7 +24,7 @@ package types import "fmt" -//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go +//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -trimprefix=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go type GetShardOwnerRequest struct { ShardKey string @@ -198,7 +198,7 @@ func (v *ExecutorHeartbeatResponse) GetMigrationPhase() (o MigrationMode) { } type ShardAssignment struct { - Status AssignmentStatus + Status AssignmentStatus `json:"status"` } func (v *ShardAssignment) GetStatus() (o 
AssignmentStatus) { diff --git a/common/types/sharddistributor_statuses_enumer_generated.go b/common/types/sharddistributor_statuses_enumer_generated.go index 9e1f0f873e0..b120dfe7b66 100644 --- a/common/types/sharddistributor_statuses_enumer_generated.go +++ b/common/types/sharddistributor_statuses_enumer_generated.go @@ -1,4 +1,4 @@ -// Code generated by "enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go"; DO NOT EDIT. +// Code generated by "enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -trimprefix=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go"; DO NOT EDIT. package types @@ -8,11 +8,11 @@ import ( "strings" ) -const _ExecutorStatusName = "ExecutorStatusINVALIDExecutorStatusACTIVEExecutorStatusDRAININGExecutorStatusDRAINED" +const _ExecutorStatusName = "INVALIDACTIVEDRAININGDRAINED" -var _ExecutorStatusIndex = [...]uint8{0, 21, 41, 63, 84} +var _ExecutorStatusIndex = [...]uint8{0, 7, 13, 21, 28} -const _ExecutorStatusLowerName = "executorstatusinvalidexecutorstatusactiveexecutorstatusdrainingexecutorstatusdrained" +const _ExecutorStatusLowerName = "invalidactivedrainingdrained" func (i ExecutorStatus) String() string { if i < 0 || i >= ExecutorStatus(len(_ExecutorStatusIndex)-1) { @@ -34,21 +34,21 @@ func _ExecutorStatusNoOp() { var _ExecutorStatusValues = []ExecutorStatus{ExecutorStatusINVALID, ExecutorStatusACTIVE, ExecutorStatusDRAINING, ExecutorStatusDRAINED} var _ExecutorStatusNameToValueMap = map[string]ExecutorStatus{ - _ExecutorStatusName[0:21]: ExecutorStatusINVALID, - _ExecutorStatusLowerName[0:21]: ExecutorStatusINVALID, - _ExecutorStatusName[21:41]: ExecutorStatusACTIVE, - _ExecutorStatusLowerName[21:41]: ExecutorStatusACTIVE, - _ExecutorStatusName[41:63]: ExecutorStatusDRAINING, - _ExecutorStatusLowerName[41:63]: ExecutorStatusDRAINING, - _ExecutorStatusName[63:84]: 
ExecutorStatusDRAINED, - _ExecutorStatusLowerName[63:84]: ExecutorStatusDRAINED, + _ExecutorStatusName[0:7]: ExecutorStatusINVALID, + _ExecutorStatusLowerName[0:7]: ExecutorStatusINVALID, + _ExecutorStatusName[7:13]: ExecutorStatusACTIVE, + _ExecutorStatusLowerName[7:13]: ExecutorStatusACTIVE, + _ExecutorStatusName[13:21]: ExecutorStatusDRAINING, + _ExecutorStatusLowerName[13:21]: ExecutorStatusDRAINING, + _ExecutorStatusName[21:28]: ExecutorStatusDRAINED, + _ExecutorStatusLowerName[21:28]: ExecutorStatusDRAINED, } var _ExecutorStatusNames = []string{ - _ExecutorStatusName[0:21], - _ExecutorStatusName[21:41], - _ExecutorStatusName[41:63], - _ExecutorStatusName[63:84], + _ExecutorStatusName[0:7], + _ExecutorStatusName[7:13], + _ExecutorStatusName[13:21], + _ExecutorStatusName[21:28], } // ExecutorStatusString retrieves an enum value from the enum constants string name. @@ -103,11 +103,11 @@ func (i *ExecutorStatus) UnmarshalJSON(data []byte) error { return err } -const _ShardStatusName = "ShardStatusINVALIDShardStatusREADYShardStatusDONE" +const _ShardStatusName = "INVALIDREADYDONE" -var _ShardStatusIndex = [...]uint8{0, 18, 34, 49} +var _ShardStatusIndex = [...]uint8{0, 7, 12, 16} -const _ShardStatusLowerName = "shardstatusinvalidshardstatusreadyshardstatusdone" +const _ShardStatusLowerName = "invalidreadydone" func (i ShardStatus) String() string { if i < 0 || i >= ShardStatus(len(_ShardStatusIndex)-1) { @@ -128,18 +128,18 @@ func _ShardStatusNoOp() { var _ShardStatusValues = []ShardStatus{ShardStatusINVALID, ShardStatusREADY, ShardStatusDONE} var _ShardStatusNameToValueMap = map[string]ShardStatus{ - _ShardStatusName[0:18]: ShardStatusINVALID, - _ShardStatusLowerName[0:18]: ShardStatusINVALID, - _ShardStatusName[18:34]: ShardStatusREADY, - _ShardStatusLowerName[18:34]: ShardStatusREADY, - _ShardStatusName[34:49]: ShardStatusDONE, - _ShardStatusLowerName[34:49]: ShardStatusDONE, + _ShardStatusName[0:7]: ShardStatusINVALID, + _ShardStatusLowerName[0:7]: 
ShardStatusINVALID, + _ShardStatusName[7:12]: ShardStatusREADY, + _ShardStatusLowerName[7:12]: ShardStatusREADY, + _ShardStatusName[12:16]: ShardStatusDONE, + _ShardStatusLowerName[12:16]: ShardStatusDONE, } var _ShardStatusNames = []string{ - _ShardStatusName[0:18], - _ShardStatusName[18:34], - _ShardStatusName[34:49], + _ShardStatusName[0:7], + _ShardStatusName[7:12], + _ShardStatusName[12:16], } // ShardStatusString retrieves an enum value from the enum constants string name. @@ -194,11 +194,11 @@ func (i *ShardStatus) UnmarshalJSON(data []byte) error { return err } -const _AssignmentStatusName = "AssignmentStatusINVALIDAssignmentStatusREADY" +const _AssignmentStatusName = "INVALIDREADY" -var _AssignmentStatusIndex = [...]uint8{0, 23, 44} +var _AssignmentStatusIndex = [...]uint8{0, 7, 12} -const _AssignmentStatusLowerName = "assignmentstatusinvalidassignmentstatusready" +const _AssignmentStatusLowerName = "invalidready" func (i AssignmentStatus) String() string { if i < 0 || i >= AssignmentStatus(len(_AssignmentStatusIndex)-1) { @@ -218,15 +218,15 @@ func _AssignmentStatusNoOp() { var _AssignmentStatusValues = []AssignmentStatus{AssignmentStatusINVALID, AssignmentStatusREADY} var _AssignmentStatusNameToValueMap = map[string]AssignmentStatus{ - _AssignmentStatusName[0:23]: AssignmentStatusINVALID, - _AssignmentStatusLowerName[0:23]: AssignmentStatusINVALID, - _AssignmentStatusName[23:44]: AssignmentStatusREADY, - _AssignmentStatusLowerName[23:44]: AssignmentStatusREADY, + _AssignmentStatusName[0:7]: AssignmentStatusINVALID, + _AssignmentStatusLowerName[0:7]: AssignmentStatusINVALID, + _AssignmentStatusName[7:12]: AssignmentStatusREADY, + _AssignmentStatusLowerName[7:12]: AssignmentStatusREADY, } var _AssignmentStatusNames = []string{ - _AssignmentStatusName[0:23], - _AssignmentStatusName[23:44], + _AssignmentStatusName[0:7], + _AssignmentStatusName[7:12], } // AssignmentStatusString retrieves an enum value from the enum constants string name. 
@@ -281,11 +281,11 @@ func (i *AssignmentStatus) UnmarshalJSON(data []byte) error { return err } -const _MigrationModeName = "MigrationModeINVALIDMigrationModeLOCALPASSTHROUGHMigrationModeLOCALPASSTHROUGHSHADOWMigrationModeDISTRIBUTEDPASSTHROUGHMigrationModeONBOARDED" +const _MigrationModeName = "INVALIDLOCALPASSTHROUGHLOCALPASSTHROUGHSHADOWDISTRIBUTEDPASSTHROUGHONBOARDED" -var _MigrationModeIndex = [...]uint8{0, 20, 49, 84, 119, 141} +var _MigrationModeIndex = [...]uint8{0, 7, 23, 45, 67, 76} -const _MigrationModeLowerName = "migrationmodeinvalidmigrationmodelocalpassthroughmigrationmodelocalpassthroughshadowmigrationmodedistributedpassthroughmigrationmodeonboarded" +const _MigrationModeLowerName = "invalidlocalpassthroughlocalpassthroughshadowdistributedpassthroughonboarded" func (i MigrationMode) String() string { if i < 0 || i >= MigrationMode(len(_MigrationModeIndex)-1) { @@ -308,24 +308,24 @@ func _MigrationModeNoOp() { var _MigrationModeValues = []MigrationMode{MigrationModeINVALID, MigrationModeLOCALPASSTHROUGH, MigrationModeLOCALPASSTHROUGHSHADOW, MigrationModeDISTRIBUTEDPASSTHROUGH, MigrationModeONBOARDED} var _MigrationModeNameToValueMap = map[string]MigrationMode{ - _MigrationModeName[0:20]: MigrationModeINVALID, - _MigrationModeLowerName[0:20]: MigrationModeINVALID, - _MigrationModeName[20:49]: MigrationModeLOCALPASSTHROUGH, - _MigrationModeLowerName[20:49]: MigrationModeLOCALPASSTHROUGH, - _MigrationModeName[49:84]: MigrationModeLOCALPASSTHROUGHSHADOW, - _MigrationModeLowerName[49:84]: MigrationModeLOCALPASSTHROUGHSHADOW, - _MigrationModeName[84:119]: MigrationModeDISTRIBUTEDPASSTHROUGH, - _MigrationModeLowerName[84:119]: MigrationModeDISTRIBUTEDPASSTHROUGH, - _MigrationModeName[119:141]: MigrationModeONBOARDED, - _MigrationModeLowerName[119:141]: MigrationModeONBOARDED, + _MigrationModeName[0:7]: MigrationModeINVALID, + _MigrationModeLowerName[0:7]: MigrationModeINVALID, + _MigrationModeName[7:23]: MigrationModeLOCALPASSTHROUGH, + 
_MigrationModeLowerName[7:23]: MigrationModeLOCALPASSTHROUGH, + _MigrationModeName[23:45]: MigrationModeLOCALPASSTHROUGHSHADOW, + _MigrationModeLowerName[23:45]: MigrationModeLOCALPASSTHROUGHSHADOW, + _MigrationModeName[45:67]: MigrationModeDISTRIBUTEDPASSTHROUGH, + _MigrationModeLowerName[45:67]: MigrationModeDISTRIBUTEDPASSTHROUGH, + _MigrationModeName[67:76]: MigrationModeONBOARDED, + _MigrationModeLowerName[67:76]: MigrationModeONBOARDED, } var _MigrationModeNames = []string{ - _MigrationModeName[0:20], - _MigrationModeName[20:49], - _MigrationModeName[49:84], - _MigrationModeName[84:119], - _MigrationModeName[119:141], + _MigrationModeName[0:7], + _MigrationModeName[7:23], + _MigrationModeName[23:45], + _MigrationModeName[45:67], + _MigrationModeName[67:76], } // MigrationModeString retrieves an enum value from the enum constants string name. diff --git a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go index f0d25c8a05f..bb83ea78f43 100644 --- a/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go +++ b/service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go @@ -50,8 +50,8 @@ var validExecutorKeyTypes = map[ExecutorKeyType]struct{}{ ExecutorMetadataKey: {}, } -// isValidExecutorKeyType checks if the provided key type is valid. -func isValidExecutorKeyType(keyType ExecutorKeyType) bool { +// IsValidExecutorKeyType checks if the provided key type is valid. 
+func IsValidExecutorKeyType(keyType ExecutorKeyType) bool { _, exist := validExecutorKeyTypes[keyType] return exist } @@ -85,7 +85,7 @@ func ParseExecutorKey(prefix, namespace, key string) (executorID string, keyType if len(parts) != 2 { return "", "", fmt.Errorf("unexpected key format: %s", key) } - if !isValidExecutorKeyType(ExecutorKeyType(parts[1])) { + if !IsValidExecutorKeyType(ExecutorKeyType(parts[1])) { return "", "", fmt.Errorf("invalid executor key type: %s", parts[1]) } return parts[0], ExecutorKeyType(parts[1]), nil diff --git a/service/sharddistributor/store/etcd/etcdtypes/state_test.go b/service/sharddistributor/store/etcd/etcdtypes/state_test.go new file mode 100644 index 00000000000..c2f104235f2 --- /dev/null +++ b/service/sharddistributor/store/etcd/etcdtypes/state_test.go @@ -0,0 +1,243 @@ +package etcdtypes + +import ( + "encoding/json" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/store" +) + +func TestAssignedState_FieldNumberMatched(t *testing.T) { + require.Equal(t, + reflect.TypeOf(AssignedState{}).NumField(), + reflect.TypeOf(store.AssignedState{}).NumField(), + "AssignedState field count mismatch with store.AssignedState; ensure conversion is updated", + ) +} + +func TestAssignedState_ToAssignedState(t *testing.T) { + tests := map[string]struct { + input *AssignedState + expect *store.AssignedState + }{ + "nil": { + input: nil, + expect: nil, + }, + "success": { + input: &AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "1": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + ModRevision: 42, + }, + expect: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "1": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC), + ModRevision: 42, + }, + 
}, + } + + for name, c := range tests { + t.Run(name, func(t *testing.T) { + got := c.input.ToAssignedState() + + if c.expect == nil { + require.Nil(t, got) + return + } + + require.NotNil(t, got) + require.Equal(t, len(c.input.AssignedShards), len(got.AssignedShards)) + for k := range c.input.AssignedShards { + require.Equal(t, c.input.AssignedShards[k].Status, got.AssignedShards[k].Status) + } + require.Equal(t, time.Time(c.input.LastUpdated).UnixNano(), got.LastUpdated.UnixNano()) + require.Equal(t, c.input.ModRevision, got.ModRevision) + }) + } +} +func TestAssignedState_FromAssignedState(t *testing.T) { + tests := map[string]struct { + input *store.AssignedState + expect *AssignedState + }{ + "nil": { + input: nil, + expect: nil, + }, + "success": { + input: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "9": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: time.Date(2025, 11, 18, 13, 0, 0, 987654321, time.UTC), + ModRevision: 77, + }, + expect: &AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "9": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: Time(time.Date(2025, 11, 18, 13, 0, 0, 987654321, time.UTC)), + ModRevision: 77, + }, + }, + } + + for name, c := range tests { + t.Run(name, func(t *testing.T) { + got := FromAssignedState(c.input) + if c.expect == nil { + require.Nil(t, got) + return + } + require.NotNil(t, got) + require.Equal(t, len(c.input.AssignedShards), len(got.AssignedShards)) + for k := range c.input.AssignedShards { + require.Equal(t, c.input.AssignedShards[k].Status, got.AssignedShards[k].Status) + } + require.Equal(t, c.input.LastUpdated.UnixNano(), time.Time(got.LastUpdated).UnixNano()) + require.Equal(t, c.input.ModRevision, got.ModRevision) + }) + } +} + +func TestAssignedState_JSONMarshalling(t *testing.T) { + const jsonStr = `{"assigned_shards":{"1":{"status":"READY"}},"last_updated":"2025-11-18T12:00:00.123456789Z","mod_revision":42}` + + state := 
&AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "1": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + ModRevision: 42, + } + + // Marshal to JSON + b, err := json.Marshal(state) + require.NoError(t, err) + require.JSONEq(t, jsonStr, string(b)) + + // Unmarshal from JSON + var unmarshalled AssignedState + err = json.Unmarshal([]byte(jsonStr), &unmarshalled) + require.NoError(t, err) + require.Equal(t, state.AssignedShards["1"].Status, unmarshalled.AssignedShards["1"].Status) + require.Equal(t, time.Time(state.LastUpdated).UnixNano(), time.Time(unmarshalled.LastUpdated).UnixNano()) + require.Equal(t, state.ModRevision, unmarshalled.ModRevision) +} +func TestShardStatistics_FieldNumberMatched(t *testing.T) { + require.Equal(t, + reflect.TypeOf(ShardStatistics{}).NumField(), + reflect.TypeOf(store.ShardStatistics{}).NumField(), + "ShardStatistics field count mismatch with store.ShardStatistics; ensure conversion is updated", + ) +} + +func TestShardStatistics_ToShardStatistics(t *testing.T) { + tests := map[string]struct { + input *ShardStatistics + expect *store.ShardStatistics + }{ + "nil": { + input: nil, + expect: nil, + }, + "success": { + input: &ShardStatistics{ + SmoothedLoad: 12.34, + LastUpdateTime: Time(time.Date(2025, 11, 18, 14, 0, 0, 111111111, time.UTC)), + LastMoveTime: Time(time.Date(2025, 11, 18, 15, 0, 0, 222222222, time.UTC)), + }, + expect: &store.ShardStatistics{ + SmoothedLoad: 12.34, + LastUpdateTime: time.Date(2025, 11, 18, 14, 0, 0, 111111111, time.UTC), + LastMoveTime: time.Date(2025, 11, 18, 15, 0, 0, 222222222, time.UTC), + }, + }, + } + + for name, c := range tests { + t.Run(name, func(t *testing.T) { + got := c.input.ToShardStatistics() + if c.expect == nil { + require.Nil(t, got) + return + } + require.NotNil(t, got) + require.Equal(t, c.input.SmoothedLoad, got.SmoothedLoad) + require.Equal(t, time.Time(c.input.LastUpdateTime).UnixNano(), 
got.LastUpdateTime.UnixNano()) + require.Equal(t, time.Time(c.input.LastMoveTime).UnixNano(), got.LastMoveTime.UnixNano()) + }) + } +} + +func TestShardStatistics_FromShardStatistics(t *testing.T) { + tests := map[string]struct { + input *store.ShardStatistics + expect *ShardStatistics + }{ + "nil": { + input: nil, + expect: nil, + }, + "success": { + input: &store.ShardStatistics{ + SmoothedLoad: 99.01, + LastUpdateTime: time.Date(2025, 11, 18, 16, 0, 0, 333333333, time.UTC), + LastMoveTime: time.Date(2025, 11, 18, 17, 0, 0, 444444444, time.UTC), + }, + expect: &ShardStatistics{ + SmoothedLoad: 99.01, + LastUpdateTime: Time(time.Date(2025, 11, 18, 16, 0, 0, 333333333, time.UTC)), + LastMoveTime: Time(time.Date(2025, 11, 18, 17, 0, 0, 444444444, time.UTC)), + }, + }, + } + + for name, c := range tests { + t.Run(name, func(t *testing.T) { + got := FromShardStatistics(c.input) + if c.expect == nil { + require.Nil(t, got) + return + } + require.NotNil(t, got) + require.InDelta(t, c.input.SmoothedLoad, got.SmoothedLoad, 0.0000001) + require.Equal(t, c.input.LastUpdateTime.UnixNano(), time.Time(got.LastUpdateTime).UnixNano()) + require.Equal(t, c.input.LastMoveTime.UnixNano(), time.Time(got.LastMoveTime).UnixNano()) + }) + } +} + +func TestShardStatistics_JSONMarshalling(t *testing.T) { + const jsonStr = `{"smoothed_load":12.34,"last_update_time":"2025-11-18T14:00:00.111111111Z","last_move_time":"2025-11-18T15:00:00.222222222Z"}` + + state := &ShardStatistics{ + SmoothedLoad: 12.34, + LastUpdateTime: Time(time.Date(2025, 11, 18, 14, 0, 0, 111111111, time.UTC)), + LastMoveTime: Time(time.Date(2025, 11, 18, 15, 0, 0, 222222222, time.UTC)), + } + + // Marshal to JSON + b, err := json.Marshal(state) + require.NoError(t, err) + require.JSONEq(t, jsonStr, string(b)) + + // Unmarshal from JSON + var unmarshalled ShardStatistics + err = json.Unmarshal([]byte(jsonStr), &unmarshalled) + require.NoError(t, err) + require.InDelta(t, state.SmoothedLoad, unmarshalled.SmoothedLoad, 
0.0000001) + require.Equal(t, time.Time(state.LastUpdateTime).UnixNano(), time.Time(unmarshalled.LastUpdateTime).UnixNano()) + require.Equal(t, time.Time(state.LastMoveTime).UnixNano(), time.Time(unmarshalled.LastMoveTime).UnixNano()) +} diff --git a/service/sharddistributor/store/etcd/etcdtypes/time.go b/service/sharddistributor/store/etcd/etcdtypes/time.go index 744f963db12..f77f1bca536 100644 --- a/service/sharddistributor/store/etcd/etcdtypes/time.go +++ b/service/sharddistributor/store/etcd/etcdtypes/time.go @@ -7,15 +7,30 @@ import "time" // Convert to UTC before storing/parsing to ensure consistency. type Time time.Time +// ToTimePtr converts *time.Time to *Time. +func ToTimePtr(t *time.Time) *Time { + if t == nil { + return nil + } + tt := Time(*t) + return &tt +} + // ToTime converts Time back to time.Time. func (t Time) ToTime() time.Time { return time.Time(t) } +// ToTimePtr converts Time back to *time.Time. +func (t Time) ToTimePtr() *time.Time { + tt := time.Time(t) + return &tt +} + // MarshalJSON implements the json.Marshaler interface. // It encodes the time in time.RFC3339Nano format. func (t Time) MarshalJSON() ([]byte, error) { - s := time.Time(t).UTC().Format(time.RFC3339Nano) + s := FormatTime(time.Time(t)) return []byte(`"` + s + `"`), nil } @@ -26,25 +41,25 @@ func (t *Time) UnmarshalJSON(data []byte) error { if len(str) >= 2 && str[0] == '"' && str[len(str)-1] == '"' { str = str[1 : len(str)-1] } - parsed, err := time.Parse(time.RFC3339Nano, str) + parsed, err := ParseTime(str) if err != nil { return err } - *t = Time(parsed.UTC()) + *t = Time(parsed) return nil } -// ToTime parses a string in time.RFC3339Nano format and returns a time.Time in UTC. -func ToTime(s string) (time.Time, error) { - parsed, err := time.Parse(time.RFC3339Nano, s) +// ParseTime parses a string in time.RFC3339Nano format and returns a time.Time in UTC. 
+func ParseTime(s string) (time.Time, error) { + parsed, err := time.ParseInLocation(time.RFC3339Nano, s, time.UTC) if err != nil { return time.Time{}, err } return parsed.UTC(), nil } -// FromTime converts time.Time to UTC and +// FormatTime converts time.Time to UTC and // formats time.Time to a string in time.RFC3339Nano format. -func FromTime(t time.Time) string { +func FormatTime(t time.Time) string { return t.UTC().Format(time.RFC3339Nano) } diff --git a/service/sharddistributor/store/etcd/etcdtypes/time_test.go b/service/sharddistributor/store/etcd/etcdtypes/time_test.go new file mode 100644 index 00000000000..0ea6d4ac903 --- /dev/null +++ b/service/sharddistributor/store/etcd/etcdtypes/time_test.go @@ -0,0 +1,217 @@ +package etcdtypes + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestTimeToTime verifies ToTime returns the original time.Time value. +func TestTimeToTime(t *testing.T) { + now := time.Now() + require.Equal(t, now, Time(now).ToTime()) +} + +func TestTimeToTimePtr_NilInput_ReturnsNil(t *testing.T) { + var input *time.Time + result := ToTimePtr(input) + require.Nil(t, result) +} + +func TestTimeToTimePtr_ValidInput_ReturnsPointer(t *testing.T) { + now := time.Now() + result := ToTimePtr(&now) + require.NotNil(t, result) + require.Equal(t, Time(now), *result) +} + +func TestTimeToTimePtr_ConvertsBackToTimePtr(t *testing.T) { + now := time.Now() + wrapped := Time(now) + result := wrapped.ToTimePtr() + require.NotNil(t, result) + require.Equal(t, now, *result) +} + +func TestTimeMarshalJSON(t *testing.T) { + for name, c := range map[string]struct { + input Time + expectOutput string + expectErr string + }{ + "zero time": { + input: Time(time.Time{}), + expectOutput: `"0001-01-01T00:00:00Z"`, + expectErr: "", + }, + "utc time with nanos": { + input: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + expectOutput: 
`"2025-11-18T12:00:00.123456789Z"`, + expectErr: "", + }, + "non-UTC time": { + input: Time(time.Date(2025, 11, 18, 1, 2, 3, 456789012, time.FixedZone("X", -2*3600))), + expectOutput: `"2025-11-18T03:02:03.456789012Z"`, + expectErr: "", + }, + } { + t.Run(name, func(t *testing.T) { + output, err := c.input.MarshalJSON() + require.Equal(t, c.expectOutput, string(output)) + if c.expectErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), c.expectErr) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestTimeUnmarshalJSON(t *testing.T) { + for name, c := range map[string]struct { + input []byte + expectTime Time + expectErr string + }{ + "zero time": { + input: []byte(`"0001-01-01T00:00:00Z"`), + expectTime: Time(time.Time{}), + expectErr: "", + }, + "valid RFC3339Nano": { + input: []byte(`"2025-11-18T12:00:00.123456789Z"`), + expectTime: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + expectErr: "", + }, + "non-UTC time": { + input: []byte(`"2025-11-17T23:02:03.456789012Z"`), + expectTime: Time(time.Date(2025, 11, 17, 23, 2, 3, 456789012, time.UTC)), + expectErr: "", + }, + "invalid format": { + input: []byte(`"not-a-time"`), + expectTime: Time{}, + expectErr: "cannot parse", + }, + "empty string": { + input: []byte(`""`), + expectTime: Time{}, + expectErr: "cannot parse", + }, + } { + t.Run(name, func(t *testing.T) { + var tim Time + err := json.Unmarshal(c.input, &tim) + require.Equal(t, c.expectTime, tim) + if c.expectErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), c.expectErr) + } else { + require.NoError(t, err) + } + }) + } +} + +// TestParseTimeSuccess checks valid parsing scenarios. 
+func TestParseTimeSuccess(t *testing.T) { + for name, c := range map[string]struct { + input string + expected time.Time + }{ + "with nanos": { + input: "2025-11-18T12:00:00.123456789Z", + expected: time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC), + }, + "without nanos": { + input: "2000-01-01T00:00:00Z", + expected: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + }, + "non-UTC time": { + input: "2025-11-18T01:02:03.456789012-02:00", + expected: time.Date(2025, 11, 18, 3, 2, 3, 456789012, time.UTC), + }, + } { + t.Run(name, func(t *testing.T) { + parsed, err := ParseTime(c.input) + require.NoError(t, err, "unexpected error for input %q", c.input) + assert.Equal(t, c.expected, parsed) + assert.Equal(t, c.expected.UnixNano(), parsed.UnixNano(), "ParseTime mismatch got %v want %v", parsed, c.expected) + assert.Equal(t, time.UTC, parsed.Location(), "ParseTime location mismatch got %v want %v", parsed.Location(), time.UTC) + }) + } +} + +func TestParseTime_Failures(t *testing.T) { + for name, c := range map[string]struct { + input string + }{ + "not-a-time": {input: "not-a-time"}, + "empty string": {input: ""}, + "invalid month": {input: "2025-13-01T00:00:00Z"}, + } { + t.Run(name, func(t *testing.T) { + _, err := ParseTime(c.input) + assert.Error(t, err, "[%s] expected error for input %q", name, c.input) + }) + } +} + +func TestFormatTime(t *testing.T) { + for name, c := range map[string]struct { + input time.Time + expected string + }{ + "non-UTC with nanos": { + input: time.Date(2025, 11, 17, 21, 2, 3, 456789012, time.FixedZone("X", -2*3600)), + expected: "2025-11-17T23:02:03.456789012Z", + }, + "non-UTC without nanos": { + input: time.Date(2000, 1, 1, 0, 0, 0, 0, time.FixedZone("Y", +5*3600)), + expected: "1999-12-31T19:00:00Z", + }, + "zero time": { + input: time.Time{}, + expected: "0001-01-01T00:00:00Z", + }, + } { + t.Run(name, func(t *testing.T) { + str := FormatTime(c.input) + assert.Equal(t, c.expected, str) + }) + } +} + +// TestTime_JSONMarshalling 
ensures Marshal and Unmarshal retains nanosecond precision. +func TestTime_JSONMarshalling(t *testing.T) { + for name, c := range map[string]struct { + input time.Time + }{ + "zero time": { + input: time.Time{}, + }, + "start of 2025": { + input: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC), + }, + "now with nanos": { + input: time.Now().UTC(), + }, + "max nanos": { + input: time.Date(2025, 6, 30, 23, 59, 59, 999999999, time.UTC), + }, + } { + t.Run(name, func(t *testing.T) { + wrapped := Time(c.input) + b, err := json.Marshal(wrapped) + require.NoError(t, err, "marshal error") + + var back Time + require.NoError(t, json.Unmarshal(b, &back), "unmarshal error") + assert.Equal(t, c.input.UTC().UnixNano(), time.Time(back).UnixNano(), "round-trip lost precision: got %v want %v", time.Time(back), c.input.UTC()) + }) + } +} diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index 2961dee51d7..edba090876b 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -129,7 +129,7 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec // Build all operations including metadata ops := []clientv3.Op{ - clientv3.OpPut(heartbeatKey, etcdtypes.FromTime(request.LastHeartbeat)), + clientv3.OpPut(heartbeatKey, etcdtypes.FormatTime(request.LastHeartbeat)), clientv3.OpPut(stateKey, string(jsonState)), clientv3.OpPut(reportedShardsKey, string(reportedShardsData)), } @@ -175,7 +175,7 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, found = true // We found at least one valid key part for the executor. 
switch keyType { case etcdkeys.ExecutorHeartbeatKey: - heartbeatState.LastHeartbeat, err = etcdtypes.ToTime(value) + heartbeatState.LastHeartbeat, err = etcdtypes.ParseTime(value) if err != nil { return nil, nil, fmt.Errorf("parse heartbeat timestamp: %w", err) } @@ -227,7 +227,7 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st switch keyType { case etcdkeys.ExecutorHeartbeatKey: - heartbeat.LastHeartbeat, err = etcdtypes.ToTime(value) + heartbeat.LastHeartbeat, err = etcdtypes.ParseTime(value) if err != nil { return nil, fmt.Errorf("parse heartbeat timestamp: %w", err) } diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go index e4f1d2fa547..490a72da967 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -56,7 +56,7 @@ func TestRecordHeartbeat(t *testing.T) { resp, err := tc.Client.Get(ctx, heartbeatKey) require.NoError(t, err) assert.Equal(t, int64(1), resp.Count, "Heartbeat key should exist") - assert.Equal(t, etcdtypes.FromTime(now), string(resp.Kvs[0].Value)) + assert.Equal(t, etcdtypes.FormatTime(now), string(resp.Kvs[0].Value)) resp, err = tc.Client.Get(ctx, stateKey) require.NoError(t, err) diff --git a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go index e708efefe14..381e8e946b8 100644 --- a/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go +++ b/service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go @@ -42,7 +42,8 @@ func setupExecutorWithShards(t *testing.T, testCluster *testhelper.StoreTestClus // verifyShardOwner checks that a shard has the expected owner and metadata func verifyShardOwner(t *testing.T, cache 
*namespaceShardToExecutor, shardID, expectedExecutorID string, expectedMetadata map[string]string) { owner, err := cache.GetShardOwner(context.Background(), shardID) - assert.NoError(t, err) + require.NoError(t, err) + require.NotNil(t, owner) assert.Equal(t, expectedExecutorID, owner.ExecutorID) for key, expectedValue := range expectedMetadata { assert.Equal(t, expectedValue, owner.Metadata[key]) From 0228e3c99208076cde63ac9c39b59a1b31be1811 Mon Sep 17 00:00:00 2001 From: Seva Kaloshin Date: Tue, 18 Nov 2025 16:27:54 +0100 Subject: [PATCH 5/5] fix tests --- service/sharddistributor/store/etcd/etcdtypes/time.go | 7 +++++-- .../store/etcd/etcdtypes/time_test.go | 11 ++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/service/sharddistributor/store/etcd/etcdtypes/time.go b/service/sharddistributor/store/etcd/etcdtypes/time.go index f77f1bca536..e7985f293b5 100644 --- a/service/sharddistributor/store/etcd/etcdtypes/time.go +++ b/service/sharddistributor/store/etcd/etcdtypes/time.go @@ -22,8 +22,11 @@ func (t Time) ToTime() time.Time { } // ToTimePtr converts Time back to *time.Time. 
-func (t Time) ToTimePtr() *time.Time { - tt := time.Time(t) +func (t *Time) ToTimePtr() *time.Time { + if t == nil { + return nil + } + tt := time.Time(*t) return &tt } diff --git a/service/sharddistributor/store/etcd/etcdtypes/time_test.go b/service/sharddistributor/store/etcd/etcdtypes/time_test.go index 0ea6d4ac903..eb0b1df8ae9 100644 --- a/service/sharddistributor/store/etcd/etcdtypes/time_test.go +++ b/service/sharddistributor/store/etcd/etcdtypes/time_test.go @@ -21,19 +21,16 @@ func TestTimeToTimePtr_NilInput_ReturnsNil(t *testing.T) { require.Nil(t, result) } -func TestTimeToTimePtr_ValidInput_ReturnsPointer(t *testing.T) { +func TestTimeToTimePtr(t *testing.T) { now := time.Now() result := ToTimePtr(&now) require.NotNil(t, result) require.Equal(t, Time(now), *result) } -func TestTimeToTimePtr_ConvertsBackToTimePtr(t *testing.T) { - now := time.Now() - wrapped := Time(now) - result := wrapped.ToTimePtr() - require.NotNil(t, result) - require.Equal(t, now, *result) +func TestTimeToTimePtr_Nil(t *testing.T) { + result := ToTimePtr(nil) + require.Nil(t, result) } func TestTimeMarshalJSON(t *testing.T) {