Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport VReplication: Support reversing read-only traffic in vtctldclient (#16920) #144

Open
wants to merge 9 commits into
base: release-19.0-github
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro

func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
t.Helper()
rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
require.NoError(t, err)
count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery)
assert.Equalf(t, count0+1, count1, "query %q did not execute in target;\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\n", query, matchQuery, body0, body1)
require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n",
query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr)
}

func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
Expand Down
9 changes: 8 additions & 1 deletion go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ import (
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultRdonly = 0
defaultReplicas = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

Expand Down
31 changes: 31 additions & 0 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,3 +947,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
"--sql", sql, keyspace)
require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err))
}

func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) {
tabletType = strings.ToLower(strings.TrimSpace(tabletType))
rr := getRoutingRules(t)
// We set matched = true by default because it is possible, if --no-routing-rules is set while creating
// a workflow, that the routing rules are empty when the workflow starts.
// We set it to false below when the rule is found, but before matching the routed keyspace.
matched := true
for _, r := range rr.GetRules() {
fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table)
if tabletType != "" && tabletType != "primary" {
fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType)
}
if r.FromTable == fromRule {
// We found the rule, so we can set matched to false here and check for the routed keyspace below.
matched = false
require.NotEmpty(t, r.ToTables)
toTable := r.ToTables[0]
// The ToTables value is of the form "routedKeyspace.table".
routedKeyspace, routedTable, ok := strings.Cut(toTable, ".")
require.True(t, ok)
require.Equal(t, table, routedTable)
if routedKeyspace == toKeyspace {
// We found the rule, the table and keyspace matches, so our search is done.
matched = true
break
}
}
}
require.Truef(t, matched, "routing rule for %s.%s from %s to %s not found", fromKeyspace, table, tabletType, toKeyspace)
}
220 changes: 220 additions & 0 deletions go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ import (
"golang.org/x/exp/maps"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -57,7 +61,19 @@ func TestVtctldclientCLI(t *testing.T) {
targetKeyspaceName := "customer"
var mt iMoveTables
workflowName := "wf1"

sourceReplicaTab = vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-101"].Vttablet
require.NotNil(t, sourceReplicaTab)
sourceTab = vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet
require.NotNil(t, sourceTab)

targetTabs := setupMinimalCustomerKeyspace(t)
targetTab1 = targetTabs["-80"]
require.NotNil(t, targetTab1)
targetTab2 = targetTabs["80-"]
require.NotNil(t, targetTab2)
targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-201"].Vttablet
require.NotNil(t, targetReplicaTab1)

t.Run("MoveTablesCreateFlags1", func(t *testing.T) {
testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
Expand All @@ -79,6 +95,19 @@ func TestVtctldclientCLI(t *testing.T) {
"-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet,
"40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet,
}

sourceReplicaTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-201"].Vttablet
require.NotNil(t, sourceReplicaTab)
sourceTab = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-80"].Tablets["zone1-200"].Vttablet
require.NotNil(t, sourceTab)

targetTab1 = tablets["-40"]
require.NotNil(t, targetTab1)
targetTab2 = tablets["40-80"]
require.NotNil(t, targetTab2)
targetReplicaTab1 = vc.Cells["zone1"].Keyspaces[targetKeyspaceName].Shards["-40"].Tablets["zone1-401"].Vttablet
require.NotNil(t, targetReplicaTab1)

splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets)
})
}
Expand Down Expand Up @@ -121,6 +150,7 @@ func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsRespons
// Validates some of the flags created from the previous test.
func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) {
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
wf := (*mt).(iWorkflow)
(*mt).Start() // Need to start because we set auto-start to false.
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
confirmNoRoutingRules(t)
Expand All @@ -140,7 +170,85 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK
for _, tab := range targetTabs {
catchup(t, tab, workflowName, "MoveTables")
}

(*mt).SwitchReads()
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

(*mt).ReverseReads()
validateReadsRouteToSource(t, "replica")
validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateTableRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).ReverseReadsAndWrites()
validateReadsRouteToSource(t, "replica")
validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToSource(t)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateTableRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).ReverseReads()
validateReadsRouteToSource(t, "replica")
validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToTarget(t)
validateTableRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)

(*mt).ReverseWrites()
validateReadsRouteToSource(t, "replica")
validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToSource(t)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateTableRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).ReverseWrites()
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToSource(t)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched)

(*mt).ReverseReads()
validateReadsRouteToSource(t, "replica")
validateTableRoutingRule(t, "customer", "replica", targetKs, sourceKs)
validateWritesRouteToSource(t)
validateTableRoutingRule(t, "customer", "", targetKs, sourceKs)
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

// Confirm that everything is still in sync after our switch fest.
vdiff(t, targetKeyspace, workflowName, "zone1", false, true, nil)

(*mt).SwitchReadsAndWrites()
validateReadsRouteToTarget(t, "replica")
validateTableRoutingRule(t, "customer", "replica", sourceKs, targetKs)
validateWritesRouteToTarget(t)
validateTableRoutingRule(t, "customer", "", sourceKs, targetKs)
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

(*mt).Complete()
confirmRoutingRulesExist(t)
// Confirm that --keep-data was honored.
Expand Down Expand Up @@ -212,6 +320,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
}, workflowFlavorVtctld)

ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName)
wf := rs.(iWorkflow)
rs.Create()
validateReshardResponse(rs)
workflowResponse := getWorkflow(keyspace, workflowName)
Expand All @@ -238,17 +347,123 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
}
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)

shardReadsRouteToSource := func() {
require.True(t, getShardRoute(t, keyspace, "-80", "replica"))
}

shardReadsRouteToTarget := func() {
require.True(t, getShardRoute(t, keyspace, "-40", "replica"))
}

shardWritesRouteToSource := func() {
require.True(t, getShardRoute(t, keyspace, "-80", "primary"))
}

shardWritesRouteToTarget := func() {
require.True(t, getShardRoute(t, keyspace, "-40", "primary"))
}

rs.SwitchReadsAndWrites()
waitForLowLag(t, keyspace, workflowName+"_reverse")
vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil)
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseReadsAndWrites()
waitForLowLag(t, keyspace, workflowName)
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReads()
shardReadsRouteToTarget()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

rs.ReverseReads()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseReadsAndWrites()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseReads()
shardReadsRouteToSource()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)

rs.ReverseWrites()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.ReverseWrites()
shardReadsRouteToTarget()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateReadsSwitched)

rs.ReverseReads()
shardReadsRouteToSource()
shardWritesRouteToSource()
confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

// Confirm that everything is still in sync after our switch fest.
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)

rs.SwitchReadsAndWrites()
shardReadsRouteToTarget()
shardWritesRouteToTarget()
confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)

rs.Complete()
}

func getSrvKeyspace(t *testing.T, keyspace string) *topodatapb.SrvKeyspace {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSrvKeyspaces", keyspace, "zone1")
require.NoError(t, err)
var srvKeyspaces map[string]*topodatapb.SrvKeyspace
err = json2.Unmarshal([]byte(output), &srvKeyspaces)
require.NoError(t, err)
require.Equal(t, 1, len(srvKeyspaces))
return srvKeyspaces["zone1"]
}

func getShardRoute(t *testing.T, keyspace, shard string, tabletType string) bool {
srvKeyspace := getSrvKeyspace(t, keyspace)
for _, partition := range srvKeyspace.Partitions {
tt, err := topoproto.ParseTabletType(tabletType)
require.NoError(t, err)
if partition.ServedType == tt {
for _, shardReference := range partition.ShardReferences {
if shardReference.Name == shard {
return true
}
}
}
}
return false
}

func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse {
(*rs).Show()
reshardOutput := (*rs).GetLastOutput()
Expand Down Expand Up @@ -389,3 +604,8 @@ func validateMoveTablesWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow)
require.Equal(t, binlogdatapb.OnDDLAction_STOP, bls.OnDdl)
require.True(t, bls.StopAfterCopy)
}

func confirmStates(t *testing.T, workflow *iWorkflow, startState, endState string) {
require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Start State: %s", startState))
require.Contains(t, (*workflow).GetLastOutput(), fmt.Sprintf("Current State: %s", endState))
}
Loading
Loading