Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 85 additions & 49 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ func (this InstancesByCountReplicas) Less(i, j int) bool {
return len(this[i].Replicas) < len(this[j].Replicas)
}

// InstancesByDc implements sort.Interface over a slice of *Instance.
//
// Ordering rules, applied in this order:
//  1. Instances are ordered by their DataCenter string.
//  2. Inside one data center, instances are ordered by the number of entries
//     in Replicas.
//  3. Inside one data center, when both instances have no replicas at all,
//     they are ordered by ReplicationLagSeconds.Int64. Instances that share
//     the same non-zero replica count compare as equal.
//
// In short:
//
//	DC1 < DC2
//	if DC1 == DC2                     => len(Replicas1) < len(Replicas2)
//	if both replica counts are zero   => replicationLag1 < replicationLag2
type InstancesByDc [](*Instance)

// Len reports how many instances the collection holds.
func (this InstancesByDc) Len() int {
	return len(this)
}

// Swap exchanges the instances stored at positions i and j.
func (this InstancesByDc) Swap(i, j int) {
	this[j], this[i] = this[i], this[j]
}

// Less reports whether the instance at position i orders before the one at
// position j according to the rules described on InstancesByDc.
func (this InstancesByDc) Less(i, j int) bool {
	left, right := this[i], this[j]

	// Different data centers: the data center name alone decides.
	if left.DataCenter != right.DataCenter {
		return left.DataCenter < right.DataCenter
	}

	// Same data center: as soon as either side has replicas, the replica
	// count decides.
	leftReplicas, rightReplicas := len(left.Replicas), len(right.Replicas)
	if leftReplicas != 0 || rightReplicas != 0 {
		return leftReplicas < rightReplicas
	}

	// Two instances without replicas: fall back to replication lag.
	return left.ReplicationLagSeconds.Int64 < right.ReplicationLagSeconds.Int64
}

// Constant strings for Group Replication information
// See https://dev.mysql.com/doc/refman/8.0/en/replication-group-members-table.html for additional information.
const (
Expand Down Expand Up @@ -1665,62 +1689,91 @@ func filterOSCInstances(instances [](*Instance)) [](*Instance) {
return result
}

// getTwoBusiestPerDC picks at most two instances from every data center.
//
// The given slice is sorted in place in descending InstancesByDc order, so
// within each data center the instances with the most replicas come first
// and, among instances without replicas, the most lagging ones come first.
// The first two instances of every data center run are returned, in that
// sorted order. The result is never nil.
func getTwoBusiestPerDC(all [](*Instance)) [](*Instance) {
	const perDCLimit = 2

	// Descending order groups each data center into one contiguous run with
	// its busiest members at the front of the run.
	sort.Sort(sort.Reverse(InstancesByDc(all)))

	picked := [](*Instance){}
	takenFromDC := 0
	for idx, candidate := range all {
		// A change of data center relative to the previous element marks
		// the start of a new run, so the per-DC counter starts over.
		if idx == 0 || all[idx-1].DataCenter != candidate.DataCenter {
			takenFromDC = 0
		}
		if takenFromDC < perDCLimit {
			picked = append(picked, candidate)
			takenFromDC++
		}
	}
	return picked
}

// GetClusterOSCReplicas returns a heuristic list of replicas which are fit as control replicas for an OSC operation.
// These would be intermediate masters
func GetClusterOSCReplicas(clusterName string) ([](*Instance), error) {
intermediateMasters := [](*Instance){}
result := [](*Instance){}
var err error
if strings.Index(clusterName, "'") >= 0 {
if strings.Contains(clusterName, "'") {
return [](*Instance){}, log.Errorf("Invalid cluster name: %s", clusterName)
}

result := [](*Instance){}
// Stage 1: 1st tier servers.
// We get up to two 1st tier servers from each DC in the following order:
// 1. Busiest IMs
// 2. Most lagging leaf nodes
// Examples:
// 1. If there are N > 1 IMs in the DC, we will use 2 busiest ones
// (having the highest number of replicas)
// 2. If there is only 1 IM in the DC, but there are some leaf nodes,
// we will use IM + most lagging leaf node
// 3. If there are no IMs in the DC, but there are leaf nodes, we will use
// up to two most lagging leaf nodes
//
// So this stage will collect at most 2 servers per DC
{
// Pick up to two busiest IMs
condition := `
replication_depth = 1
and num_slave_hosts > 0
and cluster_name = ?
`
intermediateMasters, err = readInstancesByCondition(condition, sqlutils.Args(clusterName), "")
firstTierServers, err := readInstancesByCondition(condition, sqlutils.Args(clusterName), "")
if err != nil {
return result, err
}
sort.Sort(sort.Reverse(InstancesByCountReplicas(intermediateMasters)))
intermediateMasters = filterOSCInstances(intermediateMasters)
intermediateMasters = intermediateMasters[0:math.MinInt(2, len(intermediateMasters))]
result = append(result, intermediateMasters...)

firstTierServers = filterOSCInstances(firstTierServers)
result = append(result, getTwoBusiestPerDC(firstTierServers)...)
}

// Stage 2: 2nd tier servers
// Examine all selected 1st tier servers, and if they are IMs, get at most
// two of their busiest replicas (2nd tier servers).
// So this stage will collect at most 2 replicas per IM. If we collected 2 IMs
// per DC in the 1st stage, here we will get 4 servers per DC
{
// Get 2 replicas of found IMs, if possible
if len(intermediateMasters) == 1 {
// Pick 2 replicas for this IM
replicas, err := ReadReplicaInstances(&(intermediateMasters[0].Key))
// Get at most 2 replicas of found IMs
for _, im := range result {
if len(im.Replicas) == 0 {
// this is 1st tier leaf
continue
}
replicas, err := ReadReplicaInstances(&im.Key)
if err != nil {
return result, err
}
sort.Sort(sort.Reverse(InstancesByCountReplicas(replicas)))
replicas = filterOSCInstances(replicas)
replicas = replicas[0:math.MinInt(2, len(replicas))]
result = append(result, replicas...)

}
if len(intermediateMasters) == 2 {
// Pick one replica from each IM (should be possible)
for _, im := range intermediateMasters {
replicas, err := ReadReplicaInstances(&im.Key)
if err != nil {
return result, err
}
sort.Sort(sort.Reverse(InstancesByCountReplicas(replicas)))
replicas = filterOSCInstances(replicas)
if len(replicas) > 0 {
result = append(result, replicas[0])
}
}
}
}

// Stage 3: 3rd tier servers
// Get 2 busiest 3rd tier replicas per DC
{
// Get 2 3rd tier replicas, if possible
condition := `
replication_depth = 3
and cluster_name = ?
Expand All @@ -1729,25 +1782,8 @@ func GetClusterOSCReplicas(clusterName string) ([](*Instance), error) {
if err != nil {
return result, err
}
sort.Sort(sort.Reverse(InstancesByCountReplicas(replicas)))
replicas = filterOSCInstances(replicas)
replicas = replicas[0:math.MinInt(2, len(replicas))]
result = append(result, replicas...)
}
{
// Get 2 1st tier leaf replicas, if possible
condition := `
replication_depth = 1
and num_slave_hosts = 0
and cluster_name = ?
`
replicas, err := readInstancesByCondition(condition, sqlutils.Args(clusterName), "")
if err != nil {
return result, err
}
replicas = filterOSCInstances(replicas)
replicas = replicas[0:math.MinInt(2, len(replicas))]
result = append(result, replicas...)
result = append(result, getTwoBusiestPerDC(replicas)...)
}

return result, nil
Expand Down
Loading