Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdclient: Add caller info to pd client #1516

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions internal/locate/pd_codec.go
Original file line number Diff line number Diff line change
@@ -44,10 +44,13 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
)

var _ pd.Client = &CodecPDClient{}

const componentName = "codec-pd-client"

// CodecPDClient wraps a PD Client to decode the encoded keys in region meta.
type CodecPDClient struct {
pd.Client
@@ -57,7 +60,7 @@ type CodecPDClient struct {
// NewCodecPDClient creates a CodecPDClient in API v1.
func NewCodecPDClient(mode apicodec.Mode, client pd.Client) *CodecPDClient {
codec := apicodec.NewCodecV1(mode)
return &CodecPDClient{client, codec}
return &CodecPDClient{client.WithCallerComponent(componentName), codec}
}

// NewCodecPDClientWithKeyspace creates a CodecPDClient in API v2 with keyspace name.
@@ -71,7 +74,7 @@ func NewCodecPDClientWithKeyspace(mode apicodec.Mode, client pd.Client, keyspace
return nil, err
}

return &CodecPDClient{client, codec}, nil
return &CodecPDClient{client.WithCallerComponent(componentName), codec}, nil
}

// GetKeyspaceID attempts to retrieve keyspace ID corresponding to the given keyspace name from PD.
@@ -202,3 +205,8 @@ func (c *CodecPDClient) decodeRegionKeyInPlace(r *router.Region) error {
}
return err
}

// WithCallerComponent returns a new PD client with the specified caller component.
func (c *CodecPDClient) WithCallerComponent(component caller.Component) pd.Client {
return &CodecPDClient{c.Client.WithCallerComponent(component), c.codec}
}
2 changes: 1 addition & 1 deletion internal/locate/region_cache.go
Original file line number Diff line number Diff line change
@@ -687,7 +687,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
}

c := &RegionCache{
pdClient: pdClient,
pdClient: pdClient.WithCallerComponent("region-cache"),
requestHealthFeedbackCallback: options.requestHealthFeedbackCallback,
}

2 changes: 1 addition & 1 deletion internal/locate/store_cache.go
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ type storeCache interface {
}

func newStoreCache(pdClient pd.Client) *storeCacheImpl {
c := &storeCacheImpl{pdClient: pdClient}
c := &storeCacheImpl{pdClient: pdClient.WithCallerComponent("store-cache")}
c.notifyCheckCh = make(chan struct{}, 1)
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
2 changes: 1 addition & 1 deletion oracle/oracles/pd.go
Original file line number Diff line number Diff line change
@@ -179,7 +179,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e
}

o := &pdOracle{
c: pdClient,
c: pdClient.WithCallerComponent("oracle"),
quit: make(chan struct{}),
lastTSUpdateInterval: atomic.Int64{},
}
9 changes: 9 additions & 0 deletions oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/caller"
)

func TestPDOracle_UntilExpired(t *testing.T) {
@@ -87,6 +88,10 @@ func (c *MockPdClient) GetTS(ctx context.Context) (int64, int64, error) {
return 0, c.logicalTimestamp.Add(1), nil
}

func (c *MockPdClient) WithCallerComponent(component caller.Component) pd.Client {
return c
}

func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) {
pdClient := MockPdClient{}
o := NewPdOracleWithClient(&pdClient)
@@ -411,6 +416,10 @@ func (c *MockPDClientWithPause) Resume() {
c.mu.Unlock()
}

func (c *MockPDClientWithPause) WithCallerComponent(component caller.Component) pd.Client {
return c
}

func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) {
pdClient := &MockPDClientWithPause{}
o, err := NewPdOracle(pdClient, &PDOracleOptions{
5 changes: 3 additions & 2 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@ const (
rawBatchPutSize = 16 * 1024
// rawBatchPairCount is the maximum limit for rawkv each batch get/delete request.
rawBatchPairCount = 512
componentName = caller.Component("rawkv-client-go")
)

type rawOptions struct {
@@ -204,7 +205,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt)
}

// Use an unwrapped PDClient to obtain keyspace meta.
pdCli, err := pd.NewClientWithContext(ctx, caller.Component("rawkv-client-go"), pdAddrs, pd.SecurityOption{
pdCli, err := pd.NewClientWithContext(ctx, componentName, pdAddrs, pd.SecurityOption{
CAPath: opt.security.ClusterSSLCA,
CertPath: opt.security.ClusterSSLCert,
KeyPath: opt.security.ClusterSSLKey,
@@ -240,7 +241,7 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt)
apiVersion: opt.apiVersion,
clusterID: pdCli.GetClusterID(ctx),
regionCache: locate.NewRegionCache(pdCli),
pdClient: pdCli,
pdClient: pdCli.WithCallerComponent(componentName),
rpcClient: rpcCli,
}, nil
}
4 changes: 2 additions & 2 deletions tikv/kv.go
Original file line number Diff line number Diff line change
@@ -285,7 +285,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
clusterID: pdClient.GetClusterID(context.TODO()),
uuid: uuid,
oracle: o,
pdClient: pdClient,
pdClient: pdClient.WithCallerComponent("kv-store"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about defining and describing one enum not using string?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about defining them in pd repo? If you agree, I can open a pr to do it.

regionCache: regionCache,
kv: spkv,
safePoint: 0,
@@ -916,7 +916,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...opt.C
if err != nil {
return nil, errors.WithStack(err)
}
pdCli = util.InterceptedPDClient{Client: pdCli}
pdCli = util.NewInterceptedPDClient(pdCli)
uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO()))

tlsConfig, err := security.ToTLSConfig()
2 changes: 1 addition & 1 deletion txnkv/client.go
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) {
return nil, errors.WithStack(err)
}

pdClient = util.InterceptedPDClient{Client: pdClient}
pdClient = util.NewInterceptedPDClient(pdClient)

// Construct codec from options.
var codecCli *tikv.CodecPDClient
10 changes: 10 additions & 0 deletions util/pd_interceptor.go
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ import (
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
)

var (
@@ -64,6 +65,10 @@ type InterceptedPDClient struct {
pd.Client
}

func NewInterceptedPDClient(client pd.Client) *InterceptedPDClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs comment for public.

return &InterceptedPDClient{client.WithCallerComponent("intercepted-pd-client")}
}

// interceptedTsFuture is a PD's wrapper future to record stmt detail.
type interceptedTsFuture struct {
tso.TSFuture
@@ -137,3 +142,8 @@ func (m InterceptedPDClient) GetStore(ctx context.Context, storeID uint64) (*met
recordPDWaitTime(ctx, start)
return s, err
}

// WithCallerComponent implements pd.Client#WithCallerComponent.
func (m InterceptedPDClient) WithCallerComponent(component caller.Component) pd.Client {
return NewInterceptedPDClient(m.Client.WithCallerComponent(component))
}