Skip to content

Commit a071642

Browse files
committed
[Enhancement] add support for restore to ccr
1 parent f7b77e5 commit a071642

File tree

6 files changed

+110
-6
lines changed

6 files changed

+110
-6
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.backup;
19+
20+
import org.apache.doris.persist.gson.GsonUtils;
21+
22+
import com.google.common.collect.Maps;
23+
import com.google.gson.annotations.SerializedName;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.stream.Collectors;
28+
29+
public class RestoreBinlogInfo {
30+
// currently we are sending only DB and table info.
31+
// partitions level restore not possible since, there can be
32+
// race condition when two partition recover and ccr-syncer try to sync it.
33+
@SerializedName(value = "dbId")
34+
private long dbId;
35+
@SerializedName(value = "dbName")
36+
private String dbName;
37+
@SerializedName(value = "tableInfo")
38+
// map of tableId and TableName.
39+
private Map<Long, String> tableInfo = Maps.newHashMap();
40+
41+
/*
42+
* constuctor
43+
*/
44+
public RestoreBinlogInfo(long dbId, String dbName) {
45+
this.dbId = dbId;
46+
this.dbName = dbName;
47+
}
48+
49+
public void addTableInfo(long tableId, String tableName) {
50+
tableInfo.put(tableId, tableName);
51+
}
52+
53+
public long getDbId() {
54+
return dbId;
55+
}
56+
57+
public List<Long> getTableIdList() {
58+
return tableInfo.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
59+
}
60+
61+
public String toJson() {
62+
return GsonUtils.GSON.toJson(this);
63+
}
64+
65+
public static RestoreBinlogInfo fromJson(String json) {
66+
return GsonUtils.GSON.fromJson(json, RestoreBinlogInfo.class);
67+
}
68+
}

fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,9 @@ public enum RestoreJobState {
224224
@SerializedName("prop")
225225
private Map<String, String> properties = Maps.newHashMap();
226226

227+
@SerializedName("resbinlog")
228+
private RestoreBinlogInfo restoreBinlogInfo = null; // only set in FinishedCase.
229+
227230
private MarkedCountDownLatch<Long, Long> createReplicaTasksLatch = null;
228231

229232
public RestoreJob() {
@@ -424,6 +427,10 @@ public boolean isFinished() {
424427
return state == RestoreJobState.FINISHED;
425428
}
426429

430+
public RestoreBinlogInfo getRestoreBinlogInfo() {
431+
return restoreBinlogInfo;
432+
}
433+
427434
@Override
428435
public synchronized Status updateRepo(Repository repo) {
429436
this.repo = repo;
@@ -2097,6 +2104,7 @@ private Status allTabletCommitted(boolean isReplay) {
20972104
return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
20982105
}
20992106

2107+
restoreBinlogInfo = new RestoreBinlogInfo(dbId, db.getName());
21002108
// replace the origin tables in atomic.
21012109
if (isAtomicRestore) {
21022110
Status st = atomicReplaceOlapTables(db, isReplay);
@@ -2113,6 +2121,9 @@ private Status allTabletCommitted(boolean isReplay) {
21132121
if (tbl == null) {
21142122
continue;
21152123
}
2124+
//just restore existing table, then version will change.
2125+
// so we need to write bin log for those tables also.
2126+
restoreBinlogInfo.addTableInfo(tbl.getId(), tbl.getName());
21162127
OlapTable olapTbl = (OlapTable) tbl;
21172128
if (!tbl.writeLockIfExist()) {
21182129
continue;
@@ -2158,6 +2169,7 @@ private Status allTabletCommitted(boolean isReplay) {
21582169
}
21592170

21602171
if (!isReplay) {
2172+
restoredTbls.stream().forEach(tbl -> restoreBinlogInfo.addTableInfo(tbl.getId(), tbl.getName()));
21612173
restoredPartitions.clear();
21622174
restoredTbls.clear();
21632175
restoredResources.clear();
@@ -2173,9 +2185,9 @@ private Status allTabletCommitted(boolean isReplay) {
21732185
state = RestoreJobState.FINISHED;
21742186

21752187
env.getEditLog().logRestoreJob(this);
2176-
21772188
// Only send release snapshot tasks after the job is finished.
21782189
releaseSnapshots(savedSnapshotInfos);
2190+
restoreBinlogInfo = null;
21792191
}
21802192

21812193
LOG.info("job is finished. is replay: {}. {}", isReplay, this);

fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.alter.AlterJobV2;
2121
import org.apache.doris.alter.IndexChangeJob;
22+
import org.apache.doris.backup.RestoreBinlogInfo;
2223
import org.apache.doris.catalog.Database;
2324
import org.apache.doris.catalog.Env;
2425
import org.apache.doris.common.Config;
@@ -355,6 +356,17 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
355356
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
356357
}
357358

359+
public void addRestoreInfo(RestoreBinlogInfo info, long commitSeq) {
360+
long dbId = info.getDbId();
361+
List<Long> tableIds = info.getTableIdList();
362+
long timestamp = -1;
363+
TBinlogType type = TBinlogType.RESTORE_INFO;
364+
String data = info.toJson();
365+
366+
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
367+
}
368+
369+
358370
public void addTableRename(TableInfo info, long commitSeq) {
359371
long dbId = info.getDbId();
360372
List<Long> tableIds = Lists.newArrayList();

fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.analysis.UserIdentity;
2424
import org.apache.doris.backup.BackupJob;
2525
import org.apache.doris.backup.Repository;
26+
import org.apache.doris.backup.RestoreBinlogInfo;
2627
import org.apache.doris.backup.RestoreJob;
2728
import org.apache.doris.binlog.AddPartitionRecord;
2829
import org.apache.doris.binlog.CreateTableRecord;
@@ -1726,7 +1727,13 @@ public void logAlterRepository(Repository repo) {
17261727
}
17271728

17281729
public void logRestoreJob(RestoreJob job) {
1729-
logEdit(OperationType.OP_RESTORE_JOB, job);
1730+
long logId = logEdit(OperationType.OP_RESTORE_JOB, job);
1731+
// write bin log only if restore job the finished.
1732+
RestoreBinlogInfo restoreBinlogInfo = job.getRestoreBinlogInfo();
1733+
if ((job.isFinished()) && (restoreBinlogInfo != null)) {
1734+
LOG.info("log restore info, logId:{}, infos: {}", logId, restoreBinlogInfo.toJson());
1735+
Env.getCurrentEnv().getBinlogManager().addRestoreInfo(restoreBinlogInfo, logId);
1736+
}
17301737
}
17311738

17321739
public void logUpdateUserProperty(UserPropertyInfo propertyInfo) {

gensrc/thrift/FrontendService.thrift

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,7 @@ enum TBinlogType {
10281028
RENAME_PARTITION = 22,
10291029
DROP_ROLLUP = 23,
10301030
RECOVER_INFO = 24,
1031-
1031+
RESTORE_INFO = 25,
10321032
// Keep some IDs for allocation so that when new binlog types are added in the
10331033
// future, the changes can be picked back to the old versions without breaking
10341034
// compatibility.
@@ -1044,8 +1044,7 @@ enum TBinlogType {
10441044
// MODIFY_XXX = 17,
10451045
// MIN_UNKNOWN = 18,
10461046
// UNKNOWN_3 = 19,
1047-
MIN_UNKNOWN = 25,
1048-
UNKNOWN_10 = 26,
1047+
MIN_UNKNOWN = 26,
10491048
UNKNOWN_11 = 27,
10501049
UNKNOWN_12 = 28,
10511050
UNKNOWN_13 = 29,

regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -947,4 +947,10 @@ class Syncer {
947947
)
948948
"""
949949
}
950-
}
950+
951+
void disableDbBinlog() {
952+
suite.sql """
953+
ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false")
954+
"""
955+
}
956+
}

0 commit comments

Comments
 (0)