
Commit d936f9a · committed Mar 10, 2025
Update isolationLoadbalancer to use isolation group <-> partition assignment
Rather than arbitrarily assigning isolation groups to partitions, use the assignment stored in the database and cached in the client. Refactor PartitionConfigProvider to expose the full partition configuration.
1 parent: b1add26

6 files changed: +379 −228 lines
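To make the commit message concrete: under the new model each partition carries the list of isolation groups assigned to it, and a task's group selects among the partitions that list it (a partition that lists no groups acts as a wildcard). The following self-contained Go sketch is illustrative only, not part of this commit; the TaskListPartition shape and the matching rule mirror getPartitionsForGroup in the diff below, while partitionsForGroup and the sample group names are hypothetical.

package main

import "fmt"

// TaskListPartition mirrors types.TaskListPartition from the diff: a
// partition plus the isolation groups assigned to it (empty = wildcard).
type TaskListPartition struct {
	IsolationGroups []string
}

// partitionsForGroup returns the IDs of the partitions a task in taskGroup
// may use, following the assignment-based rule this commit introduces.
func partitionsForGroup(taskGroup string, partitions map[int]*TaskListPartition) map[int]struct{} {
	res := make(map[int]struct{})
	for id, p := range partitions {
		if len(p.IsolationGroups) == 0 { // wildcard partition: accepts any group
			res[id] = struct{}{}
			continue
		}
		for _, ig := range p.IsolationGroups {
			if ig == taskGroup {
				res[id] = struct{}{}
				break
			}
		}
	}
	return res
}

func main() {
	writePartitions := map[int]*TaskListPartition{
		0: {IsolationGroups: []string{"dca1"}},
		1: {IsolationGroups: []string{"phx1"}},
		2: {}, // wildcard
	}
	fmt.Println(partitionsForGroup("dca1", writePartitions)) // map[0:{} 2:{}]
}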

‎client/clientfactory.go
+1 −1

@@ -164,7 +164,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
 	defaultLoadBalancer := matching.NewLoadBalancer(partitionConfigProvider)
 	roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(partitionConfigProvider)
 	weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, partitionConfigProvider, cf.logger)
-	igLoadBalancer := matching.NewIsolationLoadBalancer(weightedLoadBalancer, partitionConfigProvider, cf.allIsolationGroups)
+	igLoadBalancer := matching.NewIsolationLoadBalancer(weightedLoadBalancer, partitionConfigProvider, domainIDToName, cf.dynConfig)
 	loadBalancers := map[string]matching.LoadBalancer{
 		"random":      defaultLoadBalancer,
 		"round-robin": roundRobinLoadBalancer,
‎client/matching/isolation_loadbalancer.go
+40 −57

@@ -24,43 +24,47 @@ package matching
 
 import (
 	"math/rand"
-	"slices"
 
 	"golang.org/x/exp/maps"
 
+	"github.com/uber/cadence/common/dynamicconfig"
 	"github.com/uber/cadence/common/partition"
 	"github.com/uber/cadence/common/types"
 )
 
 type isolationLoadBalancer struct {
-	provider           PartitionConfigProvider
-	fallback           LoadBalancer
-	allIsolationGroups func() []string
+	provider         PartitionConfigProvider
+	fallback         LoadBalancer
+	domainIDToName   func(string) (string, error)
+	isolationEnabled func(string) bool
 }
 
-func NewIsolationLoadBalancer(fallback LoadBalancer, provider PartitionConfigProvider, allIsolationGroups func() []string) LoadBalancer {
+func NewIsolationLoadBalancer(fallback LoadBalancer, provider PartitionConfigProvider, domainIDToName func(string) (string, error), config *dynamicconfig.Collection) LoadBalancer {
+	isolationEnabled := config.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation)
 	return &isolationLoadBalancer{
-		provider:           provider,
-		fallback:           fallback,
-		allIsolationGroups: allIsolationGroups,
+		provider:         provider,
+		fallback:         fallback,
+		domainIDToName:   domainIDToName,
+		isolationEnabled: isolationEnabled,
 	}
}
 
 func (i *isolationLoadBalancer) PickWritePartition(taskListType int, req WriteRequest) string {
 	taskList := *req.GetTaskList()
-	nPartitions := i.provider.GetNumberOfWritePartitions(req.GetDomainUUID(), taskList, taskListType)
-	taskListName := req.GetTaskList().Name
 
-	if nPartitions <= 1 {
-		return taskListName
+	domainName, err := i.domainIDToName(req.GetDomainUUID())
+	if err != nil || !i.isolationEnabled(domainName) {
+		return i.fallback.PickWritePartition(taskListType, req)
 	}
 
 	taskGroup, ok := req.GetPartitionConfig()[partition.IsolationGroupKey]
-	if !ok {
+	if !ok || taskGroup == "" {
 		return i.fallback.PickWritePartition(taskListType, req)
 	}
 
-	partitions, ok := i.getPartitionsForGroup(taskGroup, nPartitions)
+	config := i.provider.GetPartitionConfig(req.GetDomainUUID(), taskList, taskListType)
+
+	partitions, ok := i.getPartitionsForGroup(taskGroup, config.WritePartitions)
 	if !ok {
 		return i.fallback.PickWritePartition(taskListType, req)
 	}
@@ -72,71 +76,50 @@ func (i *isolationLoadBalancer) PickWritePartition(taskListType int, req WriteRe
 
 func (i *isolationLoadBalancer) PickReadPartition(taskListType int, req ReadRequest, isolationGroup string) string {
 	taskList := *req.GetTaskList()
-	nRead := i.provider.GetNumberOfReadPartitions(req.GetDomainUUID(), taskList, taskListType)
-	taskListName := taskList.Name
 
-	if nRead <= 1 {
-		return taskListName
+	domainName, err := i.domainIDToName(req.GetDomainUUID())
+	if err != nil || !i.isolationEnabled(domainName) {
+		return i.fallback.PickReadPartition(taskListType, req, isolationGroup)
 	}
 
-	partitions, ok := i.getPartitionsForGroup(isolationGroup, nRead)
+	config := i.provider.GetPartitionConfig(req.GetDomainUUID(), taskList, taskListType)
+
+	partitions, ok := i.getPartitionsForGroup(isolationGroup, config.ReadPartitions)
 	if !ok {
 		return i.fallback.PickReadPartition(taskListType, req, isolationGroup)
 	}
 
-	// Scaling down, we need to consider both sets of partitions
-	if numWrite := i.provider.GetNumberOfWritePartitions(req.GetDomainUUID(), taskList, taskListType); numWrite != nRead {
-		writePartitions, ok := i.getPartitionsForGroup(isolationGroup, numWrite)
-		if ok {
-			for p := range writePartitions {
-				partitions[p] = struct{}{}
-			}
-		}
-	}
-
 	p := i.pickBetween(partitions)
 
 	return getPartitionTaskListName(taskList.GetName(), p)
 }
 
 func (i *isolationLoadBalancer) UpdateWeight(taskListType int, req ReadRequest, partition string, info *types.LoadBalancerHints) {
+	i.fallback.UpdateWeight(taskListType, req, partition, info)
 }
 
-func (i *isolationLoadBalancer) getPartitionsForGroup(taskGroup string, partitionCount int) (map[int]any, bool) {
+func (i *isolationLoadBalancer) getPartitionsForGroup(taskGroup string, partitions map[int]*types.TaskListPartition) (map[int]any, bool) {
 	if taskGroup == "" {
 		return nil, false
 	}
-	isolationGroups := slices.Clone(i.allIsolationGroups())
-	slices.Sort(isolationGroups)
-	index := slices.Index(isolationGroups, taskGroup)
-	if index == -1 {
-		return nil, false
-	}
-	partitions := make(map[int]any, 1)
-	// 3 groups [a, b, c] and 4 partitions gives us a mapping like this:
-	// 0, 3: a
-	// 1: b
-	// 2: c
-	// 4 groups [a, b, c, d] and 10 partitions gives us a mapping like this:
-	// 0, 4, 8: a
-	// 1, 5, 9: b
-	// 2, 6: c
-	// 3, 7: d
-	if len(isolationGroups) <= partitionCount {
-		for j := index; j < partitionCount; j += len(isolationGroups) {
-			partitions[j] = struct{}{}
+
+	res := make(map[int]any)
+	for id, p := range partitions {
+		if len(p.IsolationGroups) == 0 {
+			res[id] = struct{}{}
+			continue
+		}
+		for _, ig := range p.IsolationGroups {
+			if ig == taskGroup {
+				res[id] = struct{}{}
+				break
+			}
 		}
-	// 4 groups [a,b,c,d] and 3 partitions gives us a mapping like this:
-	// 0: a, d
-	// 1: b
-	// 2: c
-	} else {
-		partitions[index%partitionCount] = struct{}{}
 	}
-	if len(partitions) == 0 {
+	if len(res) == 0 {
 		return nil, false
 	}
-	return partitions, true
+	return res, true
 }
 
 func (i *isolationLoadBalancer) pickBetween(partitions map[int]any) int {

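Worth noting from the diff above: isolation is now gated per domain. The constructor resolves the EnableTasklistIsolation dynamic-config flag into a closure, and both pick paths consult it (falling back on any domain-resolution error) before doing isolation work. A minimal standalone sketch of that guard, with hypothetical names, assuming only the two dependencies the commit injects:

package main

import "fmt"

// gate holds the two dependencies NewIsolationLoadBalancer now receives:
// a domainID -> domainName resolver and a per-domain feature flag.
type gate struct {
	domainIDToName   func(string) (string, error)
	isolationEnabled func(domainName string) bool
}

// shouldIsolate reproduces the new guard: a resolver error or a disabled
// flag means the request goes straight to the fallback balancer.
func (g *gate) shouldIsolate(domainID string) bool {
	name, err := g.domainIDToName(domainID)
	return err == nil && g.isolationEnabled(name)
}

func main() {
	g := &gate{
		domainIDToName:   func(id string) (string, error) { return "my-domain", nil },
		isolationEnabled: func(name string) bool { return name == "my-domain" },
	}
	fmt.Println(g.shouldIsolate("some-domain-uuid")) // true
}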
‎client/matching/isolation_loadbalancer_test.go
+214 −132

@@ -35,53 +35,93 @@ import (
 func TestIsolationPickWritePartition(t *testing.T) {
 	tl := "tl"
 	cases := []struct {
-		name            string
-		group           string
-		isolationGroups []string
-		numWrite        int
-		shouldFallback  bool
-		allowed         []string
+		name             string
+		group            string
+		config           *types.TaskListPartitionConfig
+		disableIsolation bool
+		shouldFallback   bool
+		allowed          []string
 	}{
 		{
-			name:            "single partition",
-			group:           "a",
-			numWrite:        1,
-			isolationGroups: []string{"a"},
-			allowed:         []string{tl},
+			name:  "single partition",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				WritePartitions: map[int]*types.TaskListPartition{
+					0: {},
+				},
+			},
+			allowed: []string{tl},
+		},
+		{
+			name:  "single partition - allowed",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				WritePartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+				},
+			},
+			allowed: []string{tl},
+		},
+		{
+			name:  "single partition - not allowed",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				WritePartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"b"}},
+				},
+			},
+			shouldFallback: true,
+			allowed:        []string{"fallback"},
 		},
 		{
-			name:            "multiple partitions - single option",
-			group:           "b",
-			numWrite:        2,
-			isolationGroups: []string{"a", "b"},
-			allowed:         []string{getPartitionTaskListName(tl, 1)},
+			name:  "single partition - isolation disabled",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				WritePartitions: map[int]*types.TaskListPartition{
+					0: {},
+				},
+			},
+			disableIsolation: true,
+			shouldFallback:   true,
+			allowed:          []string{"fallback"},
 		},
 		{
-			name:            "multiple partitions - multiple options",
-			group:           "a",
-			numWrite:        2,
-			isolationGroups: []string{"a"},
-			allowed:         []string{tl, getPartitionTaskListName(tl, 1)},
+			name:  "multiple partitions - single option",
+			group: "b",
+			config: &types.TaskListPartitionConfig{
+				WritePartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+					1: {[]string{"b"}},
+				},
+			},
+			allowed: []string{getPartitionTaskListName(tl, 1)},
 		},
 		{
-			name:            "fallback - no group",
-			numWrite:        2,
-			isolationGroups: []string{"a"},
-			shouldFallback:  true,
-			allowed:         []string{"fallback"},
+			name:  "multiple partitions - multiple options",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				WritePartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+					1: {[]string{"a"}},
+				},
+			},
+			allowed: []string{tl, getPartitionTaskListName(tl, 1)},
 		},
 		{
-			name:            "fallback - no groups",
-			group:           "a",
-			numWrite:        2,
-			isolationGroups: []string{""},
-			shouldFallback:  true,
-			allowed:         []string{"fallback"},
+			name: "fallback - no group",
+			config: &types.TaskListPartitionConfig{
+				WritePartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+					1: {[]string{"b"}},
+				},
+			},
+			shouldFallback: true,
+			allowed:        []string{"fallback"},
 		},
 	}
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			lb, fallback := createWithMocks(t, tc.isolationGroups, tc.numWrite, tc.numWrite)
+			lb, fallback := createWithMocks(t, !tc.disableIsolation, tc.config)
 			req := &types.AddDecisionTaskRequest{
 				DomainUUID: "domainId",
 				TaskList: &types.TaskList{
@@ -106,69 +146,93 @@ func TestIsolationPickWritePartition(t *testing.T) {
 func TestIsolationPickReadPartition(t *testing.T) {
 	tl := "tl"
 	cases := []struct {
-		name            string
-		group           string
-		isolationGroups []string
-		numRead         int
-		numWrite        int
-		shouldFallback  bool
-		allowed         []string
+		name             string
+		group            string
+		config           *types.TaskListPartitionConfig
+		disableIsolation bool
+		shouldFallback   bool
+		allowed          []string
 	}{
 		{
-			name:            "single partition",
-			group:           "a",
-			numRead:         1,
-			numWrite:        1,
-			isolationGroups: []string{"a"},
-			allowed:         []string{tl},
+			name:  "single partition",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				ReadPartitions: map[int]*types.TaskListPartition{
+					0: {},
+				},
+			},
+			allowed: []string{tl},
+		},
+		{
+			name:  "single partition - allowed",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				ReadPartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+				},
+			},
+			allowed: []string{tl},
 		},
 		{
-			name:            "multiple partitions - single option",
-			group:           "b",
-			numRead:         2,
-			numWrite:        2,
-			isolationGroups: []string{"a", "b"},
-			allowed:         []string{getPartitionTaskListName(tl, 1)},
+			name:  "single partition - not allowed",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				ReadPartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"b"}},
+				},
+			},
+			shouldFallback: true,
+			allowed:        []string{"fallback"},
 		},
 		{
-			name:            "multiple partitions - multiple options",
-			group:           "a",
-			numRead:         2,
-			numWrite:        2,
-			isolationGroups: []string{"a"},
-			allowed:         []string{tl, getPartitionTaskListName(tl, 1)},
+			name:  "single partition - isolation disabled",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				ReadPartitions: map[int]*types.TaskListPartition{
+					0: {},
+				},
+			},
+			disableIsolation: true,
+			shouldFallback:   true,
+			allowed:          []string{"fallback"},
 		},
 		{
-			name:            "scaling - multiple options",
-			group:           "d",
-			numRead:         4,
-			numWrite:        3,
-			isolationGroups: []string{"a", "b", "c", "d"},
-			// numRead = 4 means tasks for d could be in the last partition (idx=3)
-			// numWrite = 3 means new tasks for d are being written to the root (idx=0)
-			allowed: []string{tl, getPartitionTaskListName(tl, 3)},
+			name:  "multiple partitions - single option",
+			group: "b",
+			config: &types.TaskListPartitionConfig{
+				ReadPartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+					1: {[]string{"b"}},
+				},
+			},
+			allowed: []string{getPartitionTaskListName(tl, 1)},
 		},
 		{
-			name:            "fallback - no group",
-			numRead:         2,
-			numWrite:        2,
-			isolationGroups: []string{"a"},
-			shouldFallback:  true,
-			allowed:         []string{"fallback"},
+			name:  "multiple partitions - multiple options",
+			group: "a",
+			config: &types.TaskListPartitionConfig{
+				ReadPartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+					1: {[]string{"a"}},
+				},
+			},
+			allowed: []string{tl, getPartitionTaskListName(tl, 1)},
 		},
 		{
-			name:            "fallback - no groups",
-			group:           "a",
-			numRead:         2,
-			numWrite:        2,
-			isolationGroups: []string{""},
-			shouldFallback:  true,
-			allowed:         []string{"fallback"},
+			name: "fallback - no group",
+			config: &types.TaskListPartitionConfig{
+				ReadPartitions: map[int]*types.TaskListPartition{
+					0: {[]string{"a"}},
+					1: {[]string{"b"}},
+				},
+			},
+			shouldFallback: true,
+			allowed:        []string{"fallback"},
 		},
 	}
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			lb, fallback := createWithMocks(t, tc.isolationGroups, tc.numWrite, tc.numRead)
+			lb, fallback := createWithMocks(t, !tc.disableIsolation, tc.config)
 			req := &types.MatchingQueryWorkflowRequest{
 				DomainUUID: "domainId",
 				TaskList: &types.TaskList{
@@ -187,69 +251,85 @@ func TestIsolationPickReadPartition(t *testing.T) {
 
 func TestIsolationGetPartitionsForGroup(t *testing.T) {
 	cases := []struct {
-		name            string
-		group           string
-		isolationGroups []string
-		partitions      int
-		expected        []int
+		name       string
+		group      string
+		partitions map[int]*types.TaskListPartition
+		expected   []int
 	}{
 		{
-			name:            "single partition",
-			group:           "a",
-			isolationGroups: []string{"a", "b", "c"},
-			partitions:      1,
-			expected:        []int{0},
+			name:  "single partition",
+			group: "a",
+			partitions: map[int]*types.TaskListPartition{
+				0: {[]string{"a", "b", "c"}},
+			},
+			expected: []int{0},
 		},
 		{
-			name:            "partitions less than groups",
-			group:           "b",
-			isolationGroups: []string{"a", "b", "c"},
-			partitions:      2,
-			expected:        []int{1},
+			name:  "single partition - wildcard",
+			group: "a",
+			partitions: map[int]*types.TaskListPartition{
+				0: {},
+			},
+			expected: []int{0},
 		},
 		{
-			name:            "partitions equals groups",
-			group:           "c",
-			isolationGroups: []string{"a", "b", "c"},
-			partitions:      3,
-			expected:        []int{2},
+			name:  "single partition - no options",
+			group: "a",
+			partitions: map[int]*types.TaskListPartition{
+				0: {[]string{"b"}},
+			},
+			expected: nil,
 		},
 		{
-			name:            "partitions greater than groups",
-			group:           "c",
-			isolationGroups: []string{"a", "b", "c"},
-			partitions:      4,
-			expected:        []int{2},
+			name:  "multiple partitions - single option",
+			group: "b",
+			partitions: map[int]*types.TaskListPartition{
+				0: {[]string{"a", "c"}},
+				1: {[]string{"b"}},
+			},
+			expected: []int{1},
 		},
 		{
-			name:            "partitions greater than groups - multiple assigned",
-			group:           "a",
-			isolationGroups: []string{"a", "b", "c"},
-			partitions:      4,
-			expected:        []int{0, 3},
+			name:  "multiple partitions - multiple options",
+			group: "b",
+			partitions: map[int]*types.TaskListPartition{
+				0: {[]string{"a", "b", "c"}},
+				1: {[]string{"b"}},
+				2: {[]string{"d"}},
+			},
+			expected: []int{0, 1},
 		},
 		{
-			name:            "not ok - no isolation group",
-			group:           "",
-			isolationGroups: []string{"a"},
-			partitions:      4,
+			name:  "multiple partitions - multiple options with wildcard",
+			group: "b",
+			partitions: map[int]*types.TaskListPartition{
+				0: {[]string{"a", "c"}},
+				1: {[]string{"b"}},
+				2: {},
+			},
+			expected: []int{1, 2},
 		},
 		{
-			name:            "not ok - no isolation groups",
-			group:           "a",
-			isolationGroups: []string{},
-			partitions:      4,
+			name:  "multiple partitions - no options",
+			group: "d",
			partitions: map[int]*types.TaskListPartition{
+				0: {[]string{"a", "c"}},
+				1: {[]string{"b"}},
+				2: {[]string{"c"}},
+			},
+			expected: nil,
 		},
 		{
-			name:            "not ok - unknown isolation group",
-			group:           "d",
-			isolationGroups: []string{"a", "b", "c"},
-			partitions:      4,
+			name: "no group",
+			partitions: map[int]*types.TaskListPartition{
+				0: {[]string{"a", "b", "c"}},
+			},
+			expected: nil,
 		},
 	}
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			lb, _ := createWithMocks(t, tc.isolationGroups, tc.partitions, tc.partitions)
+			lb, _ := createWithMocks(t, true, nil)
 			actual, ok := lb.getPartitionsForGroup(tc.group, tc.partitions)
 			if tc.expected == nil {
 				assert.Nil(t, actual)
@@ -266,18 +346,20 @@ func TestIsolationGetPartitionsForGroup(t *testing.T) {
 	}
 }
 
-func createWithMocks(t *testing.T, isolationGroups []string, writePartitions, readPartitions int) (*isolationLoadBalancer, *MockLoadBalancer) {
+func createWithMocks(t *testing.T, isolationEnabled bool, config *types.TaskListPartitionConfig) (*isolationLoadBalancer, *MockLoadBalancer) {
 	ctrl := gomock.NewController(t)
 	fallback := NewMockLoadBalancer(ctrl)
 	cfg := NewMockPartitionConfigProvider(ctrl)
-	cfg.EXPECT().GetNumberOfWritePartitions(gomock.Any(), gomock.Any(), gomock.Any()).Return(writePartitions).AnyTimes()
-	cfg.EXPECT().GetNumberOfReadPartitions(gomock.Any(), gomock.Any(), gomock.Any()).Return(readPartitions).AnyTimes()
-	allIsolationGroups := func() []string {
-		return isolationGroups
-	}
+	cfg.EXPECT().GetPartitionConfig(gomock.Any(), gomock.Any(), gomock.Any()).Return(config).AnyTimes()
+
 	return &isolationLoadBalancer{
-		provider:           cfg,
-		fallback:           fallback,
-		allIsolationGroups: allIsolationGroups,
+		provider: cfg,
+		fallback: fallback,
+		domainIDToName: func(s string) (string, error) {
+			return s, nil
+		},
+		isolationEnabled: func(s string) bool {
+			return isolationEnabled
+		},
 	}, fallback
 }

‎client/matching/partition_config_provider.go
+50 −38

@@ -45,6 +45,8 @@ type (
 	GetNumberOfReadPartitions(domainID string, taskList types.TaskList, taskListType int) int
 	// GetNumberOfWritePartitions returns the number of write partitions
 	GetNumberOfWritePartitions(domainID string, taskList types.TaskList, taskListType int) int
+	// GetPartitionConfig returns the cached partition configuration
+	GetPartitionConfig(domainID string, taskList types.TaskList, taskListType int) *types.TaskListPartitionConfig
 	// UpdatePartitionConfig updates the partition configuration for a task list
 	UpdatePartitionConfig(domainID string, taskList types.TaskList, taskListType int, config *types.TaskListPartitionConfig)
 }
@@ -65,6 +67,8 @@ type (
 	}
 )
 
+var singlePartitionConfig = createDefaultConfig(1, 1)
+
 func (c *syncedTaskListPartitionConfig) updateConfig(newConfig types.TaskListPartitionConfig) bool {
 	c.Lock()
 	defer c.Unlock()
@@ -99,61 +103,53 @@ func NewPartitionConfigProvider(
 }
 
 func (p *partitionConfigProviderImpl) GetNumberOfReadPartitions(domainID string, taskList types.TaskList, taskListType int) int {
-	domainName, err := p.domainIDToName(domainID)
-	if err != nil {
-		return 1
-	}
-	if !p.enableReadFromCache(domainName, taskList.GetName(), taskListType) {
-		return p.nReadPartitions(domainName, taskList.GetName(), taskListType)
-	}
-	c := p.getPartitionConfig(domainID, taskList, taskListType)
-	if c == nil {
-		return 1
-	}
-	c.RLock()
-	v := c.Version
-	w := len(c.WritePartitions)
-	r := len(c.ReadPartitions)
-	c.RUnlock()
-	scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
-	scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
-	scope.UpdateGauge(metrics.TaskListPartitionConfigNumWriteGauge, float64(w))
-	scope.UpdateGauge(metrics.TaskListPartitionConfigVersionGauge, float64(v))
-	return int(r)
+	config := p.GetPartitionConfig(domainID, taskList, taskListType)
+	return len(config.ReadPartitions)
 }
 
 func (p *partitionConfigProviderImpl) GetNumberOfWritePartitions(domainID string, taskList types.TaskList, taskListType int) int {
+	config := p.GetPartitionConfig(domainID, taskList, taskListType)
+	v := config.Version
+	w := len(config.WritePartitions)
+	r := len(config.ReadPartitions)
+	if w > r {
+		p.logger.Warn("Number of write partitions exceeds number of read partitions, using number of read partitions", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.GetName()), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", r), tag.Dynamic("write-partition", w), tag.Dynamic("config-version", v))
+		return r
+	}
+	return w
+}
+
+func (p *partitionConfigProviderImpl) GetPartitionConfig(domainID string, taskList types.TaskList, taskListType int) *types.TaskListPartitionConfig {
 	domainName, err := p.domainIDToName(domainID)
 	if err != nil {
-		return 1
+		return createDefaultConfig(1, 1)
 	}
 	if !p.enableReadFromCache(domainName, taskList.GetName(), taskListType) {
-		nPartitions := p.nWritePartitions(domainName, taskList.GetName(), taskListType)
+		nWrite := p.nWritePartitions(domainName, taskList.GetName(), taskListType)
+		nRead := p.nReadPartitions(domainName, taskList.GetName(), taskListType)
 		// checks to make sure number of writes never exceeds number of reads
-		if nRead := p.nReadPartitions(domainName, taskList.GetName(), taskListType); nPartitions > nRead {
-			p.logger.Warn("Number of write partitions exceeds number of read partitions, using number of read partitions", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.GetName()), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", nRead), tag.Dynamic("write-partition", nPartitions))
-			nPartitions = nRead
+		if nWrite > nRead {
+			p.logger.Warn("Number of write partitions exceeds number of read partitions, using number of read partitions", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.GetName()), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", nRead), tag.Dynamic("write-partition", nWrite))
			nWrite = nRead
 		}
-		return nPartitions
+		return createDefaultConfig(nRead, nWrite)
 	}
-	c := p.getPartitionConfig(domainID, taskList, taskListType)
+	c := p.getCachedPartitionConfig(domainID, taskList, taskListType)
 	if c == nil {
-		return 1
+		return singlePartitionConfig
 	}
 	c.RLock()
-	v := c.Version
-	w := len(c.WritePartitions)
-	r := len(c.ReadPartitions)
+	config := c.TaskListPartitionConfig
 	c.RUnlock()
+	v := config.Version
+	w := len(config.WritePartitions)
+	r := len(config.ReadPartitions)
 	scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
 	scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
 	scope.UpdateGauge(metrics.TaskListPartitionConfigNumWriteGauge, float64(w))
 	scope.UpdateGauge(metrics.TaskListPartitionConfigVersionGauge, float64(v))
-	if w > r {
-		p.logger.Warn("Number of write partitions exceeds number of read partitions, using number of read partitions", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.GetName()), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", r), tag.Dynamic("write-partition", w), tag.Dynamic("config-version", v))
-		return int(r)
-	}
-	return int(w)
+
+	return &config
 }
 
 func (p *partitionConfigProviderImpl) UpdatePartitionConfig(domainID string, taskList types.TaskList, taskListType int, config *types.TaskListPartitionConfig) {
@@ -186,7 +182,7 @@ func (p *partitionConfigProviderImpl) UpdatePartitionConfig(domainID string, tas
 	}
 }
 
-func (p *partitionConfigProviderImpl) getPartitionConfig(domainID string, taskList types.TaskList, taskListType int) *syncedTaskListPartitionConfig {
+func (p *partitionConfigProviderImpl) getCachedPartitionConfig(domainID string, taskList types.TaskList, taskListType int) *syncedTaskListPartitionConfig {
 	if taskList.GetKind() != types.TaskListKindNormal {
 		return nil
 	}
@@ -217,3 +213,19 @@ func getTaskListTypeTag(taskListType int) metrics.Tag {
 		return metrics.TaskListTypeTag("")
 	}
 }
+
+func createDefaultConfig(nRead, nWrite int) *types.TaskListPartitionConfig {
+	read := make(map[int]*types.TaskListPartition, nRead)
+	for i := 0; i < nRead; i++ {
+		read[i] = &types.TaskListPartition{}
+	}
+	write := make(map[int]*types.TaskListPartition, nWrite)
+	for i := 0; i < nWrite; i++ {
+		write[i] = &types.TaskListPartition{}
+	}
+	return &types.TaskListPartitionConfig{
+		Version:         0,
+		ReadPartitions:  read,
+		WritePartitions: write,
+	}
+}

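For reference, the default-config path is easy to exercise standalone. This sketch mirrors createDefaultConfig from the diff above with simplified local types (illustrative only, not the commit's code): a default config is just n wildcard partitions on each side, which is why GetNumberOfReadPartitions can now simply return len(config.ReadPartitions).

package main

import "fmt"

type TaskListPartition struct{ IsolationGroups []string }

type TaskListPartitionConfig struct {
	Version         int64
	ReadPartitions  map[int]*TaskListPartition
	WritePartitions map[int]*TaskListPartition
}

// createDefaultConfig mirrors the helper added in this commit: nRead/nWrite
// partitions with no isolation-group restrictions (all wildcards).
func createDefaultConfig(nRead, nWrite int) *TaskListPartitionConfig {
	read := make(map[int]*TaskListPartition, nRead)
	for i := 0; i < nRead; i++ {
		read[i] = &TaskListPartition{}
	}
	write := make(map[int]*TaskListPartition, nWrite)
	for i := 0; i < nWrite; i++ {
		write[i] = &TaskListPartition{}
	}
	return &TaskListPartitionConfig{ReadPartitions: read, WritePartitions: write}
}

func main() {
	cfg := createDefaultConfig(3, 2)
	fmt.Println(len(cfg.ReadPartitions), len(cfg.WritePartitions)) // 3 2
}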
‎client/matching/partition_config_provider_mock.go
+14

Generated file; diff not rendered.

‎client/matching/partition_config_provider_test.go
+60

@@ -212,6 +212,66 @@ func TestGetNumberOfWritePartitions(t *testing.T) {
 	}
 }
 
+func TestGetPartitionConfig(t *testing.T) {
+	testCases := []struct {
+		name                string
+		taskListKind        types.TaskListKind
+		enableReadFromCache bool
+		cachedConfigExists  bool
+		expected            *types.TaskListPartitionConfig
+	}{
+		{
+			name:                "get from dynamic config",
+			taskListKind:        types.TaskListKindNormal,
+			enableReadFromCache: false,
+			expected:            createDefaultConfig(3, 3),
+		},
+		{
+			name:                "get from cache",
+			taskListKind:        types.TaskListKindNormal,
+			enableReadFromCache: true,
+			cachedConfigExists:  true,
+			// It's wrong but it's the value we've got
+			expected: createDefaultConfig(2, 5),
+		},
+		{
+			name:                "get from cache for sticky tasklist",
+			taskListKind:        types.TaskListKindSticky,
+			enableReadFromCache: true,
+			expected:            createDefaultConfig(1, 1),
+		},
+		{
+			name:                "cache config missing, fallback to default",
+			taskListKind:        types.TaskListKindNormal,
+			enableReadFromCache: true,
+			cachedConfigExists:  false,
+			expected:            createDefaultConfig(1, 1),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			partitionProvider, mockCache := setUpMocksForPartitionConfigProvider(t, tc.enableReadFromCache)
+
+			if tc.enableReadFromCache && tc.taskListKind == types.TaskListKindNormal {
+				if tc.cachedConfigExists {
+					mockCache.EXPECT().Get(gomock.Any()).Return(&syncedTaskListPartitionConfig{
+						TaskListPartitionConfig: types.TaskListPartitionConfig{ReadPartitions: partitions(2), WritePartitions: partitions(5)},
+					}).Times(1)
+				} else {
+					mockCache.EXPECT().Get(gomock.Any()).Return(nil).Times(1)
+				}
+			}
+			kind := tc.taskListKind
+			taskList := types.TaskList{Name: "test-task-list", Kind: &kind}
+			config := partitionProvider.GetPartitionConfig("test-domain-id", taskList, 0)
+
+			// Validate result
+			assert.Equal(t, tc.expected, config)
+		})
+	}
+}
+
 func TestUpdatePartitionConfig(t *testing.T) {
 	testCases := []struct {
 		name string
