Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ticdc): prevent using the same TiDB cluster as both upstream and downstream #12063

Merged
67 changes: 67 additions & 0 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/pkg/check"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
Expand Down Expand Up @@ -295,6 +296,20 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, up.PDClient, changefeedConfig.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support creating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
Expand Down Expand Up @@ -380,6 +395,35 @@ func (h *OpenAPI) ResumeChangefeed(c *gin.Context) {
return
}

cfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

upManager, err := h.capture.GetUpstreamManager()
if err != nil {
_ = c.Error(err)
return
}
up, err := upManager.GetDefaultUpstream()
if err != nil {
_ = c.Error(err)
return
}
// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, up.PDClient, cfInfo.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support resuming a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminResume,
Expand Down Expand Up @@ -453,6 +497,29 @@ func (h *OpenAPI) UpdateChangefeed(c *gin.Context) {
return
}

upManager, err := h.capture.GetUpstreamManager()
if err != nil {
_ = c.Error(err)
return
}
up, err := upManager.GetDefaultUpstream()
if err != nil {
_ = c.Error(err)
return
}
// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, up.PDClient, newInfo.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support updating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

err = owner.UpdateChangefeed(ctx, newInfo)
if err != nil {
_ = c.Error(err)
Expand Down
14 changes: 14 additions & 0 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import (
"github.com/pingcap/tiflow/cdc/owner"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/check"
cerror "github.com/pingcap/tiflow/pkg/errors"
mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -383,6 +386,17 @@ func TestResumeChangefeed(t *testing.T) {
router := newRouterWithoutStatusProvider(cp)
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedStatusForAPI{CheckpointTs: 1}, nil)
statusProvider.EXPECT().GetChangeFeedInfo(gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{SinkURI: "mock"}, nil).AnyTimes()
pdClient := &gc.MockPDClient{}
cp.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(pdClient), nil).AnyTimes()

// Mock UpstreamDownstreamNotSame check
oldGetClusterID := check.GetClusterIDBySinkURIFn
defer func() { check.GetClusterIDBySinkURIFn = oldGetClusterID }()
check.GetClusterIDBySinkURIFn = func(_ context.Context, _ string) (uint64, bool, error) {
return 0, false, nil
}

// test resume changefeed succeeded
mo.EXPECT().
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
configUpdated = true
newInfo.Config = cfg.ReplicaConfig.ToInternalReplicaConfig()
}
// If the sinkURI is empty, we keep the old sinkURI.
if cfg.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = cfg.SinkURI
Expand Down
52 changes: 50 additions & 2 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/check"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
Expand Down Expand Up @@ -96,6 +97,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
return
}
}

provider := h.capture.StatusProvider()
owner, err := h.capture.GetOwner()
if err != nil {
Expand Down Expand Up @@ -141,6 +143,19 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CertAllowedCN: cfg.CertAllowedCN,
}

// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, pdClient, cfg.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support creating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

var etcdCli *clientv3.Client
if len(cfg.PDAddrs) == 0 {
etcdCli = h.capture.GetEtcdClient().GetEtcdClient().Unwrap()
Expand Down Expand Up @@ -473,6 +488,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
return
}

var pdClient pd.Client
var storage tidbkv.Storage
// if PDAddrs is not empty, use it to create a new kvstore
// Note: upManager is nil in some unit test cases
Expand All @@ -483,13 +499,19 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
if err != nil {
_ = c.Error(errors.Trace(err))
}
pdClient, err = h.helpers.getPDClient(ctx, pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
return
}
} else { // get the upstream of the changefeed to get the kvstore
up, ok := upManager.Get(oldCfInfo.UpstreamID)
if !ok {
_ = c.Error(errors.New(fmt.Sprintf("upstream %d not found", oldCfInfo.UpstreamID)))
return
}
storage = up.KVStorage
pdClient = up.PDClient
}

newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx,
Expand All @@ -499,6 +521,18 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
return
}

notSame, err := check.UpstreamDownstreamNotSame(ctx, pdClient, newCfInfo.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support updating a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

log.Info("New ChangeFeed and Upstream Info",
zap.String("changefeedInfo", newCfInfo.String()),
zap.Any("upstreamInfo", newUpInfo))
Expand Down Expand Up @@ -728,7 +762,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
return
}

_, err = h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
cfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -760,11 +794,12 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
defer cancel()
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
_ = c.Error(cerror.WrapError(cerror.ErrAPIGetPDClientFailed, err))
return
}
defer pdClient.Close()
}

// If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not.
newCheckpointTs := status.CheckpointTs
if cfg.OverwriteCheckpointTs != 0 {
Expand Down Expand Up @@ -796,6 +831,19 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}
}()

// Check whether the upstream and downstream are the different cluster.
notSame, err := check.UpstreamDownstreamNotSame(ctx, pdClient, cfInfo.SinkURI)
if err != nil {
_ = c.Error(err)
return
}
if !notSame {
_ = c.Error(cerror.ErrSameUpstreamDownstream.GenWithStack(
"TiCDC does not support resuming a changefeed with the same TiDB cluster " +
"as both the source and the target for the changefeed."))
return
}

job := model.AdminJob{
CfID: changefeedID,
Type: model.AdminResume,
Expand Down
24 changes: 24 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
"github.com/pingcap/tiflow/pkg/check"
"github.com/pingcap/tiflow/pkg/config"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
Expand Down Expand Up @@ -77,6 +78,13 @@ func TestCreateChangefeed(t *testing.T) {
cp.EXPECT().GetOwner().Return(mo, nil).AnyTimes()
cp.EXPECT().StatusProvider().Return(provider).AnyTimes()

// Mock UpstreamDownstreamNotSame check
oldGetClusterID := check.GetClusterIDBySinkURIFn
defer func() { check.GetClusterIDBySinkURIFn = oldGetClusterID }()
check.GetClusterIDBySinkURIFn = func(_ context.Context, _ string) (uint64, bool, error) {
return 0, false, nil
}

// case 1: json format mismatches with the spec.
errConfig := struct {
ID string `json:"changefeed_id"`
Expand Down Expand Up @@ -335,6 +343,8 @@ func TestUpdateChangefeed(t *testing.T) {
t.Parallel()
update := testCase{url: "/api/v2/changefeeds/%s", method: "PUT"}
helpers := NewMockAPIV2Helpers(gomock.NewController(t))
helpers.EXPECT().getPDClient(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&mockPDClient{}, nil).AnyTimes()
mockOwner := mock_owner.NewMockOwner(gomock.NewController(t))
mockCapture := mock_capture.NewMockCapture(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(mockCapture, helpers)
Expand All @@ -346,6 +356,13 @@ func TestUpdateChangefeed(t *testing.T) {
mockCapture.EXPECT().IsOwner().Return(true).AnyTimes()
mockCapture.EXPECT().GetOwner().Return(mockOwner, nil).AnyTimes()

// Mock UpstreamDownstreamNotSame check
oldGetClusterID := check.GetClusterIDBySinkURIFn
defer func() { check.GetClusterIDBySinkURIFn = oldGetClusterID }()
check.GetClusterIDBySinkURIFn = func(_ context.Context, _ string) (uint64, bool, error) {
return 0, false, nil
}

// case 1 invalid id
invalidID := "Invalid_#"
w := httptest.NewRecorder()
Expand Down Expand Up @@ -752,6 +769,13 @@ func TestResumeChangefeed(t *testing.T) {
close(done)
}).AnyTimes()

// Mock UpstreamDownstreamNotSame check
oldGetClusterID := check.GetClusterIDBySinkURIFn
defer func() { check.GetClusterIDBySinkURIFn = oldGetClusterID }()
check.GetClusterIDBySinkURIFn = func(_ context.Context, _ string) (uint64, bool, error) {
return 0, false, nil
}

// case 1: invalid changefeed id
w := httptest.NewRecorder()
invalidID := "@^Invalid"
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,11 @@ error = '''
external storage api
'''

["CDC:ErrSameUpstreamDownstream"]
error = '''
upstream and downstream are the same, %s
'''

["CDC:ErrSchedulerRequestFailed"]
error = '''
scheduler request failed, %s
Expand Down
Loading
Loading