@@ -10,8 +10,6 @@ import (
1010 "fmt"
1111 "time"
1212
13- types2 "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/types"
14-
1513 clientv3 "go.etcd.io/etcd/client/v3"
1614 "go.uber.org/fx"
1715
@@ -22,6 +20,7 @@ import (
2220 "github.com/uber/cadence/service/sharddistributor/config"
2321 "github.com/uber/cadence/service/sharddistributor/store"
2422 "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
23+ "github.com/uber/cadence/service/sharddistributor/store/etcd/etcdtypes"
2524 "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/common"
2625 "github.com/uber/cadence/service/sharddistributor/store/etcd/executorstore/shardcache"
2726)
@@ -43,8 +42,8 @@ type executorStoreImpl struct {
4342type shardStatisticsUpdate struct {
4443 key string
4544 shardID string
46- stats types2 .ShardStatistics
47- desiredLastMove types2 .Time // intended LastMoveTime for this update
45+ stats etcdtypes .ShardStatistics
46+ desiredLastMove etcdtypes .Time // intended LastMoveTime for this update
4847}
4948
5049// ExecutorStoreParams defines the dependencies for the etcd store, for use with fx.
@@ -139,7 +138,7 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
139138
140139 // Build all operations including metadata
141140 ops := []clientv3.Op {
142- clientv3 .OpPut (heartbeatETCDKey , types2 .FromTime (request .LastHeartbeat )),
141+ clientv3 .OpPut (heartbeatETCDKey , etcdtypes .FromTime (request .LastHeartbeat )),
143142 clientv3 .OpPut (stateETCDKey , string (jsonState )),
144143 clientv3 .OpPut (reportedShardsETCDKey , string (reportedShardsData )),
145144 }
@@ -174,7 +173,7 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string,
174173 }
175174
176175 heartbeatState := & store.HeartbeatState {}
177- assignedState := & types2 .AssignedState {}
176+ assignedState := & etcdtypes .AssignedState {}
178177 found := false
179178
180179 for _ , kv := range resp .Kvs {
@@ -188,7 +187,7 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string,
188187 found = true // We found at least one valid key part for the executor.
189188 switch keyType {
190189 case etcdkeys .ExecutorHeartbeatKey :
191- heartbeatState .LastHeartbeat , err = types2 .ToTime (value )
190+ heartbeatState .LastHeartbeat , err = etcdtypes .ToTime (value )
192191 if err != nil {
193192 return nil , nil , fmt .Errorf ("parse heartbeat timestamp: %w" , err )
194193 }
@@ -240,7 +239,7 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
240239
241240 switch keyType {
242241 case etcdkeys .ExecutorHeartbeatKey :
243- heartbeat .LastHeartbeat , err = types2 .ToTime (value )
242+ heartbeat .LastHeartbeat , err = etcdtypes .ToTime (value )
244243 if err != nil {
245244 return nil , fmt .Errorf ("parse heartbeat timestamp: %w" , err )
246245 }
@@ -253,7 +252,7 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
253252 return nil , fmt .Errorf ("parse reported shards: %w" , err )
254253 }
255254 case etcdkeys .ExecutorAssignedStateKey :
256- var assignedRaw types2 .AssignedState
255+ var assignedRaw etcdtypes .AssignedState
257256 if err := common .DecompressAndUnmarshal (kv .Value , & assignedRaw ); err != nil {
258257 return nil , fmt .Errorf ("parse assigned shards: %w, %s" , err , value )
259258 }
@@ -278,7 +277,7 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
278277 if shardKeyType != etcdkeys .ShardStatisticsKey {
279278 continue
280279 }
281- var shardStatistic types2 .ShardStatistics
280+ var shardStatistic etcdtypes .ShardStatistics
282281 if err := common .DecompressAndUnmarshal (kv .Value , & shardStatistic ); err != nil {
283282 continue
284283 }
@@ -355,7 +354,7 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
355354 if err != nil {
356355 return fmt .Errorf ("build executor assigned state key: %w" , err )
357356 }
358- value , err := json .Marshal (types2 .FromAssignedState (& state ))
357+ value , err := json .Marshal (etcdtypes .FromAssignedState (& state ))
359358 if err != nil {
360359 return fmt .Errorf ("marshal assigned shards for executor %s: %w" , executorID , err )
361360 }
@@ -438,8 +437,8 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
438437 return fmt .Errorf ("get executor assigned state: %w" , err )
439438 }
440439
441- var state types2 .AssignedState
442- var shardStats types2 .ShardStatistics
440+ var state etcdtypes .AssignedState
441+ var shardStats etcdtypes .ShardStatistics
443442 modRevision := int64 (0 ) // A revision of 0 means the key doesn't exist yet.
444443
445444 if len (resp .Kvs ) > 0 {
@@ -469,12 +468,12 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
469468 // Statistics already exist, update the last move time.
470469 // This can happen if the shard was previously assigned to an executor, and a lookup happens after the executor is deleted,
471470 // AssignShard is then called to assign the shard to a new executor.
472- shardStats .LastMoveTime = types2 .Time (now )
471+ shardStats .LastMoveTime = etcdtypes .Time (now )
473472 } else {
474473 // Statistics don't exist, initialize them.
475474 shardStats .SmoothedLoad = 0
476- shardStats .LastUpdateTime = types2 .Time (now )
477- shardStats .LastMoveTime = types2 .Time (now )
475+ shardStats .LastUpdateTime = etcdtypes .Time (now )
476+ shardStats .LastMoveTime = etcdtypes .Time (now )
478477 }
479478
480479 // 2. Get the executor state.
@@ -677,22 +676,22 @@ func (s *executorStoreImpl) prepareShardStatisticsUpdates(ctx context.Context, n
677676 return nil , fmt .Errorf ("get shard statistics: %w" , err )
678677 }
679678
680- stats := types2 .ShardStatistics {}
679+ stats := etcdtypes .ShardStatistics {}
681680
682681 if len (statsResp .Kvs ) > 0 {
683682 if err := common .DecompressAndUnmarshal (statsResp .Kvs [0 ].Value , & stats ); err != nil {
684683 return nil , fmt .Errorf ("parse shard statistics: %w" , err )
685684 }
686685 } else {
687686 stats .SmoothedLoad = 0
688- stats .LastUpdateTime = types2 .Time (now )
687+ stats .LastUpdateTime = etcdtypes .Time (now )
689688 }
690689
691690 updates = append (updates , shardStatisticsUpdate {
692691 key : shardStatisticsKey ,
693692 shardID : shardID ,
694693 stats : stats ,
695- desiredLastMove : types2 .Time (now ),
694+ desiredLastMove : etcdtypes .Time (now ),
696695 })
697696 }
698697 }
0 commit comments