Skip to content

Commit

Permalink
Add watch loop for keyspace group primaries
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Jun 15, 2023
1 parent 669a266 commit 40b75d9
Showing 1 changed file with 64 additions and 59 deletions.
123 changes: 64 additions & 59 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the
// keyspace group membership path.
// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id
// in the keyspace group membership path.
compiledKGMembershipIDRegexp *regexp.Regexp
// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
Expand All @@ -250,6 +250,11 @@ type KeyspaceGroupManager struct {
serviceRegistryMap map[string]string
// tsoNodesWatcher is the watcher for the registered tso servers.
tsoNodesWatcher *etcdutil.LoopWatcher

// primaries stores the mapping from the keyspace group ID to the primary address.
primaries sync.Map // store as map[uint32]string
// kgPrimaryWatcher is the watcher for the primaries of all keyspace groups.
kgPrimaryWatcher *etcdutil.LoopWatcher
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -300,16 +305,16 @@ func NewKeyspaceGroupManager(

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize() error {
if err := kgm.InitializeGroupWatchLoop(); err != nil {
log.Error("failed to initialize group watch loop", zap.Error(err))
kgm.Close() // Close the manager to clean up the loaded keyspace groups.
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}
if err := kgm.InitializeTSOServerWatchLoop(); err != nil {
log.Error("failed to initialize tso server watch loop", zap.Error(err))
kgm.Close() // Close the manager to clean up the allocated resources.
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)

Check warning on line 311 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L309-L311

Added lines #L309 - L311 were not covered by tests
}
if err := kgm.InitializeGroupWatchLoop(); err != nil {
log.Error("failed to initialize group watch loop", zap.Error(err))
kgm.Close() // Close the manager to clean up the loaded keyspace groups.
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}
return nil
}

Expand All @@ -329,6 +334,58 @@ func (kgm *KeyspaceGroupManager) Close() {
log.Info("keyspace group manager closed")
}

// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the
// registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
tsoServiceKey := discovery.TSOPath(kgm.clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err

Check warning on line 350 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L348-L350

Added lines #L348 - L350 were not covered by tests
}
kgm.tsoNodes.Store(s.ServiceAddr, struct{}{})
kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok {
delete(kgm.serviceRegistryMap, key)
kgm.tsoNodes.Delete(serviceAddr)
return nil
}
return perrors.Errorf("failed to find the service address for key %s", key)

Check warning on line 363 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L363

Added line #L363 was not covered by tests
}

kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher(
kgm.ctx,
&kgm.wg,
kgm.etcdClient,
"tso-nodes-watcher",
tsoServiceKey,
putFn,
deleteFn,
func() error { return nil },
clientv3.WithRange(tsoServiceEndKey),
)

kgm.wg.Add(1)
go kgm.tsoNodesWatcher.StartWatchLoop()

if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
log.Error("failed to load the registered tso servers", errs.ZapError(err))
return err

Check warning on line 383 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L382-L383

Added lines #L382 - L383 were not covered by tests
}

return nil
}

// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group
// membership/distribution metadata.
// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group}
Expand Down Expand Up @@ -410,58 +467,6 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
return nil
}

// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the
// registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discover.ServiceRegistryEntry
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
tsoServiceKey := discovery.TSOPath(kgm.clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
kgm.tsoNodes.Store(s.ServiceAddr, struct{}{})
kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok {
delete(kgm.serviceRegistryMap, key)
kgm.tsoNodes.Delete(serviceAddr)
return nil
}
return perrors.Errorf("failed to find the service address for key %s", key)
}

kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher(
kgm.ctx,
&kgm.wg,
kgm.etcdClient,
"tso-nodes-watcher",
tsoServiceKey,
putFn,
deleteFn,
func() error { return nil },
clientv3.WithRange(tsoServiceEndKey),
)

kgm.wg.Add(1)
go kgm.tsoNodesWatcher.StartWatchLoop()

if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
log.Error("failed to load the registered tso servers", errs.ZapError(err))
return err
}

return nil
}

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
for _, member := range group.Members {
if member.Address == kgm.tsoServiceID.ServiceAddr {
Expand Down

0 comments on commit 40b75d9

Please sign in to comment.