Skip to content

Commit

Permalink
client/router: implement the query region gRPC client (#8939)
Browse files Browse the repository at this point in the history
ref #8690

Implement the router stream update logic.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] authored Jan 27, 2025
1 parent 2a1f2dc commit debceaf
Show file tree
Hide file tree
Showing 8 changed files with 670 additions and 26 deletions.
46 changes: 26 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,11 @@ var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
type serviceModeKeeper struct {
// RMutex here is for the future usage that there might be multiple goroutines
// triggering service mode switching concurrently.
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient *tso.Cli
tsoSvcDiscovery sd.ServiceDiscovery
routerClient *router.Cli
}

func (k *serviceModeKeeper) close() {
Expand Down Expand Up @@ -570,21 +569,16 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *router.Region {
if res.Region == nil {
return nil
}
// EnableRouterClient enables the router client.
// This is only for test currently.
func (c *client) EnableRouterClient() {
c.inner.initRouterClient()
}

r := &router.Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
Buckets: res.Buckets,
}
for _, s := range res.DownPeers {
r.DownPeers = append(r.DownPeers, s.Peer)
}
return r
func (c *client) getRouterClient() *router.Cli {
c.inner.RLock()
defer c.inner.RUnlock()
return c.inner.routerClient
}

// GetRegionFromMember implements the RPCClient interface.
Expand Down Expand Up @@ -623,7 +617,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetRegion implements the RPCClient interface.
Expand All @@ -637,6 +631,10 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

if routerClient := c.getRouterClient(); routerClient != nil {
return routerClient.GetRegion(ctx, key, opts...)
}

options := &opt.GetRegionOp{}
for _, opt := range opts {
opt(options)
Expand All @@ -663,7 +661,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
if err = c.respForErr(metrics.CmdFailedDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetPrevRegion implements the RPCClient interface.
Expand All @@ -677,6 +675,10 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

if routerClient := c.getRouterClient(); routerClient != nil {
return routerClient.GetPrevRegion(ctx, key, opts...)
}

options := &opt.GetRegionOp{}
for _, opt := range opts {
opt(options)
Expand All @@ -703,7 +705,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if err = c.respForErr(metrics.CmdFailedDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetRegionByID implements the RPCClient interface.
Expand All @@ -717,6 +719,10 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

if routerClient := c.getRouterClient(); routerClient != nil {
return routerClient.GetRegionByID(ctx, regionID, opts...)
}

options := &opt.GetRegionOp{}
for _, opt := range opts {
opt(options)
Expand Down Expand Up @@ -744,7 +750,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if err = c.respForErr(metrics.CmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// ScanRegions implements the RPCClient interface.
Expand Down
Loading

0 comments on commit debceaf

Please sign in to comment.