Skip to content

Commit ca09c63

Browse files
committed
Integrate shard distributor client in matching client
We now pass the shard distributor client from the resource, where it is fully initialized, instead of from the bean, where the retry handler is still missing.
1 parent be54846 commit ca09c63

File tree

9 files changed

+127
-47
lines changed

9 files changed

+127
-47
lines changed

client/clientBean.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type (
4343
Bean interface {
4444
GetHistoryClient() history.Client
4545
GetHistoryPeers() history.PeerResolver
46-
GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
46+
GetMatchingClient(domainIDToName DomainIDToNameFunc, shardDistributorClient sharddistributor.Client) (matching.Client, error)
4747
GetFrontendClient() frontend.Client
4848
GetShardDistributorClient() sharddistributor.Client
4949
GetRemoteAdminClient(cluster string) admin.Client
@@ -123,11 +123,11 @@ func (h *clientBeanImpl) GetHistoryPeers() history.PeerResolver {
123123
return h.historyPeers
124124
}
125125

126-
func (h *clientBeanImpl) GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
126+
func (h *clientBeanImpl) GetMatchingClient(domainIDToName DomainIDToNameFunc, shardDistributorClient sharddistributor.Client) (matching.Client, error) {
127127
if client := h.matchingClient.Load(); client != nil {
128128
return client.(matching.Client), nil
129129
}
130-
return h.lazyInitMatchingClient(domainIDToName)
130+
return h.lazyInitMatchingClient(domainIDToName, shardDistributorClient)
131131
}
132132

133133
func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
@@ -170,13 +170,13 @@ func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) frontend.Client
170170
return client
171171
}
172172

173-
func (h *clientBeanImpl) lazyInitMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
173+
func (h *clientBeanImpl) lazyInitMatchingClient(domainIDToName DomainIDToNameFunc, shardDistributorClient sharddistributor.Client) (matching.Client, error) {
174174
h.Lock()
175175
defer h.Unlock()
176176
if cached := h.matchingClient.Load(); cached != nil {
177177
return cached.(matching.Client), nil
178178
}
179-
client, err := h.factory.NewMatchingClient(domainIDToName)
179+
client, err := h.factory.NewMatchingClient(domainIDToName, shardDistributorClient)
180180
if err != nil {
181181
return nil, err
182182
}

client/clientBean_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/clientfactory.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ type (
5757
// Factory can be used to create RPC clients for cadence services
5858
Factory interface {
5959
NewHistoryClient() (history.Client, history.PeerResolver, error)
60-
NewMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
60+
NewMatchingClient(domainIDToName DomainIDToNameFunc, shardDistributorClient sharddistributor.Client) (matching.Client, error)
6161

6262
NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, history.PeerResolver, error)
63-
NewMatchingClientWithTimeout(domainIDToName DomainIDToNameFunc, timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error)
63+
NewMatchingClientWithTimeout(domainIDToName DomainIDToNameFunc, timeout time.Duration, longPollTimeout time.Duration, shardDistributorClient sharddistributor.Client) (matching.Client, error)
6464

6565
NewAdminClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, largeTimeout time.Duration) (admin.Client, error)
6666
NewFrontendClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)
@@ -108,8 +108,8 @@ func (cf *rpcClientFactory) NewHistoryClient() (history.Client, history.PeerReso
108108
return cf.NewHistoryClientWithTimeout(timeoutwrapper.HistoryDefaultTimeout)
109109
}
110110

111-
func (cf *rpcClientFactory) NewMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
112-
return cf.NewMatchingClientWithTimeout(domainIDToName, timeoutwrapper.MatchingDefaultTimeout, timeoutwrapper.MatchingDefaultLongPollTimeout)
111+
func (cf *rpcClientFactory) NewMatchingClient(domainIDToName DomainIDToNameFunc, shardDistributorClient sharddistributor.Client) (matching.Client, error) {
112+
return cf.NewMatchingClientWithTimeout(domainIDToName, timeoutwrapper.MatchingDefaultTimeout, timeoutwrapper.MatchingDefaultLongPollTimeout, shardDistributorClient)
113113
}
114114

115115
func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, history.PeerResolver, error) {
@@ -147,6 +147,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
147147
domainIDToName DomainIDToNameFunc,
148148
timeout time.Duration,
149149
longPollTimeout time.Duration,
150+
shardDistributorClient sharddistributor.Client,
150151
) (matching.Client, error) {
151152
var rawClient matching.Client
152153
var namedPort = membership.PortTchannel
@@ -174,8 +175,11 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
174175
client := matching.NewClient(
175176
rawClient,
176177
peerResolver,
178+
shardDistributorClient,
179+
cf.dynConfig.GetStringProperty(dynamicconfig.MatchingShardDistributionMode),
177180
matching.NewMultiLoadBalancer(defaultLoadBalancer, loadBalancers, domainIDToName, cf.dynConfig, cf.logger),
178181
partitionConfigProvider,
182+
cf.logger,
179183
)
180184
client = timeoutwrapper.NewMatchingClient(client, longPollTimeout, timeout)
181185
if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.MatchingErrorInjectionRate)(); errorRate != 0 {

client/matching/client.go

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,51 @@ package matching
2222

2323
import (
2424
"context"
25+
"fmt"
2526

2627
"go.uber.org/yarpc"
2728

29+
"github.com/uber/cadence/client/sharddistributor"
30+
"github.com/uber/cadence/common"
31+
"github.com/uber/cadence/common/dynamicconfig"
2832
"github.com/uber/cadence/common/future"
33+
"github.com/uber/cadence/common/log"
34+
"github.com/uber/cadence/common/log/tag"
2935
"github.com/uber/cadence/common/persistence"
3036
"github.com/uber/cadence/common/types"
37+
"github.com/uber/cadence/service/sharddistributor/constants"
3138
)
3239

3340
var _ Client = (*clientImpl)(nil)
3441

3542
type clientImpl struct {
36-
client Client
37-
peerResolver PeerResolver
38-
loadBalancer LoadBalancer
39-
provider PartitionConfigProvider
43+
client Client
44+
peerResolver PeerResolver
45+
shardDistributorClient sharddistributor.Client
46+
shardDistributionMode dynamicconfig.StringPropertyFn
47+
loadBalancer LoadBalancer
48+
provider PartitionConfigProvider
49+
logger log.Logger
4050
}
4151

4252
// NewClient creates a new history service TChannel client
4353
func NewClient(
4454
client Client,
4555
peerResolver PeerResolver,
56+
shardDistributorClient sharddistributor.Client,
57+
hashDistributionMode dynamicconfig.StringPropertyFn,
4658
lb LoadBalancer,
4759
provider PartitionConfigProvider,
60+
logger log.Logger,
4861
) Client {
4962
return &clientImpl{
50-
client: client,
51-
peerResolver: peerResolver,
52-
loadBalancer: lb,
53-
provider: provider,
63+
client: client,
64+
peerResolver: peerResolver,
65+
shardDistributorClient: shardDistributorClient,
66+
shardDistributionMode: hashDistributionMode,
67+
loadBalancer: lb,
68+
provider: provider,
69+
logger: logger,
5470
}
5571
}
5672

@@ -65,7 +81,7 @@ func (c *clientImpl) AddActivityTask(
6581
)
6682
originalTaskListName := request.TaskList.GetName()
6783
request.TaskList.Name = partition
68-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
84+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
6985
if err != nil {
7086
return nil, err
7187
}
@@ -94,7 +110,7 @@ func (c *clientImpl) AddDecisionTask(
94110
)
95111
originalTaskListName := request.TaskList.GetName()
96112
request.TaskList.Name = partition
97-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
113+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
98114
if err != nil {
99115
return nil, err
100116
}
@@ -124,7 +140,7 @@ func (c *clientImpl) PollForActivityTask(
124140
)
125141
originalTaskListName := request.PollRequest.GetTaskList().GetName()
126142
request.PollRequest.TaskList.Name = partition
127-
peer, err := c.peerResolver.FromTaskList(request.PollRequest.TaskList.GetName())
143+
peer, err := c.getShardOwner(ctx, request.PollRequest.TaskList.GetName())
128144
if err != nil {
129145
return nil, err
130146
}
@@ -162,7 +178,7 @@ func (c *clientImpl) PollForDecisionTask(
162178
)
163179
originalTaskListName := request.PollRequest.GetTaskList().GetName()
164180
request.PollRequest.TaskList.Name = partition
165-
peer, err := c.peerResolver.FromTaskList(request.PollRequest.TaskList.GetName())
181+
peer, err := c.getShardOwner(ctx, request.PollRequest.TaskList.GetName())
166182
if err != nil {
167183
return nil, err
168184
}
@@ -199,7 +215,7 @@ func (c *clientImpl) QueryWorkflow(
199215
"",
200216
)
201217
request.TaskList.Name = partition
202-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
218+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
203219
if err != nil {
204220
return nil, err
205221
}
@@ -211,7 +227,7 @@ func (c *clientImpl) RespondQueryTaskCompleted(
211227
request *types.MatchingRespondQueryTaskCompletedRequest,
212228
opts ...yarpc.CallOption,
213229
) error {
214-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
230+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
215231
if err != nil {
216232
return err
217233
}
@@ -223,7 +239,7 @@ func (c *clientImpl) CancelOutstandingPoll(
223239
request *types.CancelOutstandingPollRequest,
224240
opts ...yarpc.CallOption,
225241
) error {
226-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
242+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
227243
if err != nil {
228244
return err
229245
}
@@ -235,7 +251,7 @@ func (c *clientImpl) DescribeTaskList(
235251
request *types.MatchingDescribeTaskListRequest,
236252
opts ...yarpc.CallOption,
237253
) (*types.DescribeTaskListResponse, error) {
238-
peer, err := c.peerResolver.FromTaskList(request.DescRequest.TaskList.GetName())
254+
peer, err := c.getShardOwner(ctx, request.DescRequest.TaskList.GetName())
239255
if err != nil {
240256
return nil, err
241257
}
@@ -247,7 +263,7 @@ func (c *clientImpl) ListTaskListPartitions(
247263
request *types.MatchingListTaskListPartitionsRequest,
248264
opts ...yarpc.CallOption,
249265
) (*types.ListTaskListPartitionsResponse, error) {
250-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
266+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
251267
if err != nil {
252268
return nil, err
253269
}
@@ -305,7 +321,7 @@ func (c *clientImpl) UpdateTaskListPartitionConfig(
305321
request *types.MatchingUpdateTaskListPartitionConfigRequest,
306322
opts ...yarpc.CallOption,
307323
) (*types.MatchingUpdateTaskListPartitionConfigResponse, error) {
308-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
324+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
309325
if err != nil {
310326
return nil, err
311327
}
@@ -317,9 +333,39 @@ func (c *clientImpl) RefreshTaskListPartitionConfig(
317333
request *types.MatchingRefreshTaskListPartitionConfigRequest,
318334
opts ...yarpc.CallOption,
319335
) (*types.MatchingRefreshTaskListPartitionConfigResponse, error) {
320-
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
336+
peer, err := c.getShardOwner(ctx, request.TaskList.GetName())
321337
if err != nil {
322338
return nil, err
323339
}
324340
return c.client.RefreshTaskListPartitionConfig(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
325341
}
342+
343+
func (c *clientImpl) getShardOwner(ctx context.Context, taskListName string) (string, error) {
344+
sharddistributorMode := c.shardDistributionMode()
345+
if sharddistributorMode == common.ShardModeShardDistributor && c.shardDistributorClient != nil {
346+
request := &types.GetShardOwnerRequest{
347+
ShardKey: taskListName,
348+
Namespace: constants.MatchingNamespace,
349+
}
350+
351+
resp, err := c.shardDistributorClient.GetShardOwner(ctx, request)
352+
if err != nil {
353+
return "", fmt.Errorf("find shard in shard distributor: %w", err)
354+
}
355+
356+
return resp.Owner, nil
357+
}
358+
359+
if sharddistributorMode == common.ShardModeShardDistributor && c.shardDistributorClient == nil {
360+
c.logger.Warn("ShardDistributor mode enabled, but shard distributor is not available, falling back to hash-ring")
361+
} else if c.shardDistributionMode() != common.ShardModeHashRing {
362+
c.logger.Warn("Unknown hash distribution mode, falling back to hash-ring", tag.Mode(c.shardDistributionMode()))
363+
}
364+
365+
owner, err := c.peerResolver.FromTaskList(taskListName)
366+
if err != nil {
367+
return "", fmt.Errorf("find shard in hash ring: %w", err)
368+
}
369+
370+
return owner, nil
371+
}

client/matching/client_test.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import (
3030
"go.uber.org/mock/gomock"
3131
"go.uber.org/yarpc"
3232

33+
"github.com/uber/cadence/client/sharddistributor"
34+
"github.com/uber/cadence/common"
35+
"github.com/uber/cadence/common/dynamicconfig"
36+
"github.com/uber/cadence/common/log"
3337
"github.com/uber/cadence/common/persistence"
3438
"github.com/uber/cadence/common/types"
3539
)
@@ -47,8 +51,11 @@ func TestNewClient(t *testing.T) {
4751
peerResolver := NewMockPeerResolver(ctrl)
4852
loadbalancer := NewMockLoadBalancer(ctrl)
4953
provider := NewMockPartitionConfigProvider(ctrl)
54+
shardDistributor := sharddistributor.NewMockClient(ctrl)
55+
shardDistributorMode := dynamicconfig.StringPropertyFn(func(opts ...dynamicconfig.FilterOption) string { return common.ShardModeHashRing })
56+
logger := log.NewNoop()
5057

51-
c := NewClient(client, peerResolver, loadbalancer, provider)
58+
c := NewClient(client, peerResolver, shardDistributor, shardDistributorMode, loadbalancer, provider, logger)
5259
assert.NotNil(t, c)
5360
}
5461

@@ -133,7 +140,11 @@ func TestClient_withoutResponse(t *testing.T) {
133140
loadbalancerMock := NewMockLoadBalancer(ctrl)
134141
providerMock := NewMockPartitionConfigProvider(ctrl)
135142
tt.mock(peerResolverMock, loadbalancerMock, client)
136-
c := NewClient(client, peerResolverMock, loadbalancerMock, providerMock)
143+
shardDistributor := sharddistributor.NewMockClient(ctrl)
144+
shardDistributorMode := dynamicconfig.StringPropertyFn(func(opts ...dynamicconfig.FilterOption) string { return common.ShardModeHashRing })
145+
logger := log.NewNoop()
146+
147+
c := NewClient(client, peerResolverMock, shardDistributor, shardDistributorMode, loadbalancerMock, providerMock, logger)
137148

138149
err := tt.op(c)
139150
if tt.wantError {
@@ -515,7 +526,11 @@ func TestClient_withResponse(t *testing.T) {
515526
loadbalancerMock := NewMockLoadBalancer(ctrl)
516527
providerMock := NewMockPartitionConfigProvider(ctrl)
517528
tt.mock(peerResolverMock, loadbalancerMock, client, providerMock)
518-
c := NewClient(client, peerResolverMock, loadbalancerMock, providerMock)
529+
shardDistributor := sharddistributor.NewMockClient(ctrl)
530+
shardDistributorMode := dynamicconfig.StringPropertyFn(func(opts ...dynamicconfig.FilterOption) string { return common.ShardModeHashRing })
531+
logger := log.NewNoop()
532+
533+
c := NewClient(client, peerResolverMock, shardDistributor, shardDistributorMode, loadbalancerMock, providerMock, logger)
519534

520535
res, err := tt.op(c)
521536
if tt.wantError {

common/dynamicconfig/constants.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2335,6 +2335,16 @@ const (
23352335

23362336
TasklistLoadBalancerStrategy
23372337

2338+
// MatchingShardDistributionMode is the mode of shard distribution for matching, we currently have two modes:
2339+
//
2340+
// - "hash-ring" means that the shards are distributed using a consistent hash ring, in particular using the ringpop library
2341+
// - "shard-distributor" means that the shards are distributed using the _highly experimental_ shard distributor service
2342+
//
2343+
// KeyName: matching.shardDistributionMode
2344+
// Value type: string enum: "hash-ring" or "shard-distributor"
2345+
// Default value: "hash-ring"
2346+
MatchingShardDistributionMode
2347+
23382348
// LastStringKey must be the last one in this const group
23392349
LastStringKey
23402350
)
@@ -4624,6 +4634,11 @@ var StringKeys = map[StringKey]DynamicString{
46244634
DefaultValue: "random", // other options: "round-robin"
46254635
Filters: []Filter{DomainName, TaskListName, TaskType},
46264636
},
4637+
MatchingShardDistributionMode: {
4638+
KeyName: "matching.shardDistributionMode",
4639+
Description: "MatchingShardDistributionMode defines which shard distribution mode should be used",
4640+
DefaultValue: "hash-ring",
4641+
},
46274642
}
46284643

46294644
var DurationKeys = map[DurationKey]DynamicDuration{

0 commit comments

Comments (0)