Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ticdc): prevent using the same TiDB cluster as both upstream and downstream #12063

Merged
73 changes: 73 additions & 0 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/pkg/check"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
Expand Down Expand Up @@ -295,6 +296,22 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, up.PDClient, changefeedConfig.SinkURI)
if err != nil {
_ = c.Error(err)
log.Error("same in create", zap.Error(err))
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support creating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
log.Error("same in create", zap.Error(err))
return
}

upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
Expand Down Expand Up @@ -380,6 +397,37 @@ func (h *OpenAPI) ResumeChangefeed(c *gin.Context) {
return
}

cfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

upManager, err := h.capture.GetUpstreamManager()
if err != nil {
_ = c.Error(err)
return
}
up, err := upManager.GetDefaultUpstream()
if err != nil {
_ = c.Error(err)
return
}
// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, up.PDClient, cfInfo.SinkURI)
if err != nil {
_ = c.Error(err)
log.Error("same in create", zap.Error(err))
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support resuming a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
log.Error("same in create", zap.Error(err))
return
}

job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminResume,
Expand Down Expand Up @@ -453,6 +501,31 @@ func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
return
}

upManager, err := h.capture.GetUpstreamManager()
if err != nil {
_ = c.Error(err)
return
}
up, err := upManager.GetDefaultUpstream()
if err != nil {
_ = c.Error(err)
return
}
// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, up.PDClient, newInfo.SinkURI)
if err != nil {
_ = c.Error(err)
log.Error("same in create", zap.Error(err))
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support updating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
log.Error("same in create", zap.Error(err))
return
}

err = owner.UpdateChangefeed(ctx, newInfo)
if err != nil {
_ = c.Error(err)
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
configUpdated = true
newInfo.Config = cfg.ReplicaConfig.ToInternalReplicaConfig()
}
// If the sinkURI is empty, we keep the old sinkURI.
if cfg.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = cfg.SinkURI
Expand Down
69 changes: 67 additions & 2 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/check"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
Expand Down Expand Up @@ -96,6 +97,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
return
}
}

provider := h.capture.StatusProvider()
owner, err := h.capture.GetOwner()
if err != nil {
Expand Down Expand Up @@ -141,6 +143,21 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CertAllowedCN: cfg.CertAllowedCN,
}

// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, pdClient, cfg.SinkURI)
if err != nil {
_ = c.Error(err)
log.Error("same in create", zap.Error(err))
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support creating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
log.Error("same in create", zap.Error(err))
return
}

var etcdCli *clientv3.Client
if len(cfg.PDAddrs) == 0 {
etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
Expand Down Expand Up @@ -499,6 +516,40 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
return
}

// Check whether the upstream and downstream are the different cluster.
// Generate a pd client first.
var pdClient pd.Client
// if PDAddrs is empty, use the default pdClient
if len(updateCfConfig.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
pdClient = up.PDClient
} else {
credential := updateCfConfig.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err = h.helpers.getPDClient(timeoutCtx, updateCfConfig.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
return
}
defer pdClient.Close()
}
notSame, err := check.UpstreamDownstreamNotSame(ctx, pdClient, newCfInfo.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support updating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

log.Info("New ChangeFeed and Upstream Info",
zap.String("changefeedInfo", newCfInfo.String()),
zap.Any("upstreamInfo", newUpInfo))
Expand Down Expand Up @@ -728,7 +779,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
return
}

_, err = h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
cfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -760,11 +811,12 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
defer cancel()
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
return
}
defer pdClient.Close()
}

// If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not.
newCheckpointTs := status.CheckpointTs
if cfg.OverwriteCheckpointTs != 0 {
Expand Down Expand Up @@ -796,6 +848,19 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}
}()

// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, pdClient, cfInfo.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support resuming a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminResume,
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,11 @@ error = '''
external storage api
'''

["CDC:ErrSameUpstreamDownstream"]
error = '''
upstream and downstream are the same, %s
'''

["CDC:ErrSchedulerRequestFailed"]
error = '''
scheduler request failed, %s
Expand Down
85 changes: 85 additions & 0 deletions pkg/check/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package check

import (
"context"
"net/url"
"strconv"

"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

// UpstreamDownstreamNotSame checks whether the upstream and downstream are not the same cluster.
//
// The upstream cluster ID is read from upPD; the downstream cluster ID is
// obtained by connecting to the sink described by downSinkURI.
//
// It returns true when the downstream does not report itself as TiDB, or when
// it is TiDB and its cluster ID differs from the upstream one. It returns
// false with a nil error only when both cluster IDs are equal. If the
// downstream cluster ID cannot be determined, it returns false together with
// the traced error.
func UpstreamDownstreamNotSame(ctx context.Context, upPD pd.Client, downSinkURI string) (bool, error) {
	upID := upPD.GetClusterID(ctx)

	downID, isTiDB, err := getClusterIDBySinkURI(ctx, downSinkURI)
	if err != nil {
		// Sink URIs usually embed credentials (user:password@host), so only a
		// redacted form is written to the log.
		safeSinkURI := "<unparsable sink URI>"
		if parsed, parseErr := url.Parse(downSinkURI); parseErr == nil {
			safeSinkURI = parsed.Redacted()
		}
		log.Error("failed to get cluster ID from sink URI",
			zap.String("downSinkURI", safeSinkURI), zap.Error(err))
		return false, cerror.Trace(err)
	}

	// Logged only after the error check so the IDs are real values rather
	// than the placeholders returned alongside an error.
	log.Debug("CheckNotSameUpstreamDownstream",
		zap.Uint64("upID", upID), zap.Uint64("downID", downID), zap.Bool("isTiDB", isTiDB))

	// A non-TiDB downstream is treated as a different cluster without
	// comparing IDs.
	if !isTiDB {
		return true, nil
	}

	return upID != downID, nil
}

// getClusterIDBySinkURI opens a MySQL-protocol connection to the downstream
// described by sinkURI and reads its cluster ID.
//
// Return values:
//   - the cluster ID parsed from the 'cluster_id' row of mysql.tidb;
//   - whether the downstream was detected as TiDB; when it is not, the result
//     is (0, false, nil) and no cluster ID is queried;
//   - an error if the URI cannot be parsed, the DSN cannot be generated, the
//     connection cannot be created, or the cluster ID cannot be read or parsed
//     as an unsigned integer. Every error path returns (0, true, err).
//
// NOTE(review): no scheme filtering is visible here or in the visible callers,
// so every sink URI appears to go through the MySQL connection attempt.
// Confirm how non-MySQL sinks are expected to behave.
func getClusterIDBySinkURI(ctx context.Context, sinkURI string) (uint64, bool, error) {
	// Create a MySQL connection by using the sink URI. The local is named
	// parsedURI so it does not shadow the net/url package.
	parsedURI, err := url.Parse(sinkURI)
	if err != nil {
		return 0, true, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
	}
	dsnStr, err := pmysql.GenerateDSN(ctx, parsedURI, pmysql.NewConfig(), pmysql.CreateMySQLDBConn)
	if err != nil {
		return 0, true, cerror.Trace(err)
	}
	db, err := pmysql.CreateMySQLDBConn(ctx, dsnStr)
	if err != nil {
		return 0, true, cerror.Trace(err)
	}
	defer db.Close()

	// Check whether the downstream is TiDB.
	if !pmysql.CheckIsTiDB(ctx, db) {
		return 0, false, nil
	}

	// Get the cluster ID from the downstream TiDB. QueryRowContext yields only
	// a row, so any query failure is reported by Scan.
	var clusterIDStr string
	err = db.QueryRowContext(ctx,
		"SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'cluster_id'").Scan(&clusterIDStr)
	if err != nil {
		return 0, true, cerror.Trace(err)
	}
	clusterID, err := strconv.ParseUint(clusterIDStr, 10, 64)
	if err != nil {
		return 0, true, cerror.Trace(err)
	}
	return clusterID, true, nil
}
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,10 @@ var (
"internal check failed, %s",
errors.RFCCodeText("CDC:ErrInternalCheckFailed"),
)
ErrSameUpstreamDownstream = errors.Normalize(
"upstream and downstream are the same, %s",
errors.RFCCodeText("CDC:ErrSameUpstreamDownstream"),
)

ErrHandleDDLFailed = errors.Normalize(
"handle ddl failed, query: %s, startTs: %d. "+
Expand Down
Loading