
Commit 54c4178
Update isolationLoadbalancer to use isolation group <-> partition assignment
Rather than arbitrarily assigning isolation groups to partitions, use the assignment stored in the database and cached in the client. Refactor PartitionConfigProvider to expose the full partition configuration.
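
For reference, the partition configuration that the refactored PartitionConfigProvider exposes looks roughly like the sketch below. The field names follow the diffs in this commit, but the container type name is a placeholder and the real definitions live in common/types and client/matching, so treat this as an illustration rather than the committed code.

// Rough sketch (not the committed definitions) of the configuration shape
// implied by this commit. Partition IDs map to a TaskListPartition whose
// IsolationGroups list is the stored isolation group <-> partition assignment;
// an empty list means the partition accepts tasks from every isolation group.
package sketch

type TaskListPartition struct {
	IsolationGroups []string
}

type TaskListPartitionConfig struct {
	ReadPartitions  map[int]*TaskListPartition
	WritePartitions map[int]*TaskListPartition
}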
1 parent b1add26 commit 54c4178

6 files changed: +393 −249 lines

client/clientfactory.go (+1 −1)
@@ -164,7 +164,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
 	defaultLoadBalancer := matching.NewLoadBalancer(partitionConfigProvider)
 	roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(partitionConfigProvider)
 	weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, partitionConfigProvider, cf.logger)
-	igLoadBalancer := matching.NewIsolationLoadBalancer(weightedLoadBalancer, partitionConfigProvider, cf.allIsolationGroups)
+	igLoadBalancer := matching.NewIsolationLoadBalancer(weightedLoadBalancer, partitionConfigProvider, domainIDToName, cf.dynConfig)
 	loadBalancers := map[string]matching.LoadBalancer{
 		"random":      defaultLoadBalancer,
 		"round-robin": roundRobinLoadBalancer,

client/matching/isolation_loadbalancer.go (+53 −70)
@@ -24,124 +24,107 @@ package matching
 
 import (
 	"math/rand"
-	"slices"
-
-	"golang.org/x/exp/maps"
 
+	"github.com/uber/cadence/common/dynamicconfig"
 	"github.com/uber/cadence/common/partition"
 	"github.com/uber/cadence/common/types"
 )
 
 type isolationLoadBalancer struct {
-	provider           PartitionConfigProvider
-	fallback           LoadBalancer
-	allIsolationGroups func() []string
+	provider         PartitionConfigProvider
+	fallback         LoadBalancer
+	domainIDToName   func(string) (string, error)
+	isolationEnabled func(string) bool
 }
 
-func NewIsolationLoadBalancer(fallback LoadBalancer, provider PartitionConfigProvider, allIsolationGroups func() []string) LoadBalancer {
+func NewIsolationLoadBalancer(fallback LoadBalancer, provider PartitionConfigProvider, domainIDToName func(string) (string, error), config *dynamicconfig.Collection) LoadBalancer {
+	isolationEnabled := config.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation)
 	return &isolationLoadBalancer{
-		provider:           provider,
-		fallback:           fallback,
-		allIsolationGroups: allIsolationGroups,
+		provider:         provider,
+		fallback:         fallback,
+		domainIDToName:   domainIDToName,
+		isolationEnabled: isolationEnabled,
 	}
 }
 
 func (i *isolationLoadBalancer) PickWritePartition(taskListType int, req WriteRequest) string {
 	taskList := *req.GetTaskList()
-	nPartitions := i.provider.GetNumberOfWritePartitions(req.GetDomainUUID(), taskList, taskListType)
-	taskListName := req.GetTaskList().Name
 
-	if nPartitions <= 1 {
-		return taskListName
+	domainName, err := i.domainIDToName(req.GetDomainUUID())
+	if err != nil || !i.isolationEnabled(domainName) {
+		return i.fallback.PickWritePartition(taskListType, req)
 	}
 
 	taskGroup, ok := req.GetPartitionConfig()[partition.IsolationGroupKey]
-	if !ok {
+	if !ok || taskGroup == "" {
 		return i.fallback.PickWritePartition(taskListType, req)
 	}
 
-	partitions, ok := i.getPartitionsForGroup(taskGroup, nPartitions)
-	if !ok {
+	config := i.provider.GetPartitionConfig(req.GetDomainUUID(), taskList, taskListType)
+
+	partitions := getPartitionsForGroup(taskGroup, config.WritePartitions)
+	if len(partitions) == 0 {
 		return i.fallback.PickWritePartition(taskListType, req)
 	}
 
-	p := i.pickBetween(partitions)
+	p := pickBetween(partitions)
 
 	return getPartitionTaskListName(taskList.GetName(), p)
 }
 
 func (i *isolationLoadBalancer) PickReadPartition(taskListType int, req ReadRequest, isolationGroup string) string {
 	taskList := *req.GetTaskList()
-	nRead := i.provider.GetNumberOfReadPartitions(req.GetDomainUUID(), taskList, taskListType)
-	taskListName := taskList.Name
-
-	if nRead <= 1 {
-		return taskListName
-	}
 
-	partitions, ok := i.getPartitionsForGroup(isolationGroup, nRead)
-	if !ok {
+	domainName, err := i.domainIDToName(req.GetDomainUUID())
	if err != nil || !i.isolationEnabled(domainName) || isolationGroup == "" {
 		return i.fallback.PickReadPartition(taskListType, req, isolationGroup)
 	}
 
-	// Scaling down, we need to consider both sets of partitions
-	if numWrite := i.provider.GetNumberOfWritePartitions(req.GetDomainUUID(), taskList, taskListType); numWrite != nRead {
-		writePartitions, ok := i.getPartitionsForGroup(isolationGroup, numWrite)
-		if ok {
-			for p := range writePartitions {
-				partitions[p] = struct{}{}
-			}
-		}
+	config := i.provider.GetPartitionConfig(req.GetDomainUUID(), taskList, taskListType)
+
+	partitions := getPartitionsForGroup(isolationGroup, config.ReadPartitions)
+	if len(partitions) == 0 {
+		return i.fallback.PickReadPartition(taskListType, req, isolationGroup)
 	}
 
-	p := i.pickBetween(partitions)
+	p := pickBetween(partitions)
 
 	return getPartitionTaskListName(taskList.GetName(), p)
 }
 
 func (i *isolationLoadBalancer) UpdateWeight(taskListType int, req ReadRequest, partition string, info *types.LoadBalancerHints) {
+	i.fallback.UpdateWeight(taskListType, req, partition, info)
 }
 
-func (i *isolationLoadBalancer) getPartitionsForGroup(taskGroup string, partitionCount int) (map[int]any, bool) {
+func getPartitionsForGroup(taskGroup string, partitions map[int]*types.TaskListPartition) []int {
 	if taskGroup == "" {
-		return nil, false
+		return nil
 	}
-	isolationGroups := slices.Clone(i.allIsolationGroups())
-	slices.Sort(isolationGroups)
-	index := slices.Index(isolationGroups, taskGroup)
-	if index == -1 {
-		return nil, false
-	}
-	partitions := make(map[int]any, 1)
-	// 3 groups [a, b, c] and 4 partitions gives us a mapping like this:
-	// 0, 3: a
-	// 1: b
-	// 2: c
-	// 4 groups [a, b, c, d] and 10 partitions gives us a mapping like this:
-	// 0, 4, 8: a
-	// 1, 5, 9: b
-	// 2, 6: c
-	// 3, 7: d
-	if len(isolationGroups) <= partitionCount {
-		for j := index; j < partitionCount; j += len(isolationGroups) {
-			partitions[j] = struct{}{}
+
+	var res []int
+	for id, p := range partitions {
+		if partitionAcceptsGroup(p, taskGroup) {
+			res = append(res, id)
 		}
-	// 4 groups [a,b,c,d] and 3 partitions gives us a mapping like this:
-	// 0: a, d
-	// 1: b
-	// 2: c
-	} else {
-		partitions[index%partitionCount] = struct{}{}
 	}
-	if len(partitions) == 0 {
-		return nil, false
-	}
-	return partitions, true
+	return res
 }
 
-func (i *isolationLoadBalancer) pickBetween(partitions map[int]any) int {
+func pickBetween(partitions []int) int {
 	// Could alternatively use backlog weights to make a smarter choice
-	total := len(partitions)
-	picked := rand.Intn(total)
-	return maps.Keys(partitions)[picked]
+	picked := rand.Intn(len(partitions))
+	return partitions[picked]
+}
+
+func partitionAcceptsGroup(partition *types.TaskListPartition, taskGroup string) bool {
+	// Accepts all groups
+	if len(partition.IsolationGroups) == 0 {
+		return true
+	}
+	for _, ig := range partition.IsolationGroups {
+		if ig == taskGroup {
+			return true
+		}
+	}
+	return false
 }
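
To illustrate the new behaviour, here is a test-style sketch (not part of this commit) of how getPartitionsForGroup narrows the candidate partitions to those whose stored assignment accepts the task's isolation group. It assumes types.TaskListPartition exposes IsolationGroups as a []string, as the diff suggests, and uses testify for assertions.

package matching

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/uber/cadence/common/types"
)

// Hypothetical example, not part of this commit: illustrates how the stored
// isolation group <-> partition assignment drives partition selection.
func TestGetPartitionsForGroupSketch(t *testing.T) {
	partitions := map[int]*types.TaskListPartition{
		0: {},                                  // no IsolationGroups: accepts every group
		1: {IsolationGroups: []string{"dca1"}}, // dedicated to "dca1"
		2: {IsolationGroups: []string{"phx1"}}, // dedicated to "phx1"
	}

	// Tasks from "dca1" may go to the unrestricted partition 0 or its dedicated partition 1.
	assert.ElementsMatch(t, []int{0, 1}, getPartitionsForGroup("dca1", partitions))

	// An unrecognised group still lands on partitions that accept every group.
	assert.ElementsMatch(t, []int{0}, getPartitionsForGroup("sjc1", partitions))

	// An empty group yields nothing, so the caller falls back to the wrapped balancer.
	assert.Nil(t, getPartitionsForGroup("", partitions))
}

PickWritePartition and PickReadPartition then choose uniformly at random among the returned partition IDs via pickBetween, and fall back to the wrapped weighted balancer when the candidate list is empty, no isolation group is set, or isolation is disabled for the domain.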
