Skip to content

Commit 323f8c9

Browse files
github-actions[bot]w41terhello-stephen
authored
branch-3.0: [feat](binlog) Support getting binlogs in batch #47557 (#47638)
Cherry-picked from #47557 Co-authored-by: walter <[email protected]> Co-authored-by: Dongyang Li <[email protected]>
1 parent a413a16 commit 323f8c9

File tree

6 files changed

+35
-14
lines changed

6 files changed

+35
-14
lines changed

fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,15 @@ public void addRecoverTableRecord(RecoverInfo info, long commitSeq) {
475475

476476
// get binlog by dbId, return first binlog.version > version
477477
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
478+
Pair<TStatus, List<TBinlog>> result = getBinlog(dbId, tableId, prevCommitSeq, 1);
479+
if (result.second != null && result.second.size() > 0) {
480+
return Pair.of(result.first, result.second.get(0));
481+
}
482+
return Pair.of(result.first, null);
483+
}
484+
485+
// get binlogs by dbId, return the first N binlogs, which first binlog.version > prevCommitSeq
486+
public Pair<TStatus, List<TBinlog>> getBinlog(long dbId, long tableId, long prevCommitSeq, long numAcquired) {
478487
TStatus status = new TStatus(TStatusCode.OK);
479488
lock.readLock().lock();
480489
try {
@@ -485,7 +494,7 @@ public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommit
485494
return Pair.of(status, null);
486495
}
487496

488-
return dbBinlog.getBinlog(tableId, prevCommitSeq);
497+
return dbBinlog.getBinlog(tableId, prevCommitSeq, numAcquired);
489498
} finally {
490499
lock.readLock().unlock();
491500
}

fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,23 @@
2323
import org.apache.doris.thrift.TStatus;
2424
import org.apache.doris.thrift.TStatusCode;
2525

26+
import java.util.ArrayList;
27+
import java.util.List;
2628
import java.util.TreeSet;
29+
import java.util.stream.Collectors;
2730

2831
public class BinlogUtils {
29-
public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long prevCommitSeq) {
32+
public static Pair<TStatus, List<TBinlog>> getBinlog(
33+
TreeSet<TBinlog> binlogs, long prevCommitSeq, long numAcquired) {
3034
TStatus status = new TStatus(TStatusCode.OK);
3135
TBinlog firstBinlog = binlogs.first();
3236

3337
// all commitSeq > commitSeq
3438
if (firstBinlog.getCommitSeq() > prevCommitSeq) {
3539
status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ);
36-
return Pair.of(status, firstBinlog);
40+
List<TBinlog> array = new ArrayList<>();
41+
array.add(firstBinlog);
42+
return Pair.of(status, array);
3743
}
3844

3945
// find first binlog whose commitSeq > commitSeq
@@ -46,7 +52,12 @@ public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long pr
4652
status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ);
4753
return Pair.of(status, null);
4854
} else {
49-
return Pair.of(status, binlog);
55+
numAcquired = Math.min(Math.max(numAcquired, 1), 255);
56+
List<TBinlog> obtain = binlogs.tailSet(binlog)
57+
.stream()
58+
.limit(numAcquired)
59+
.collect(Collectors.toList());
60+
return Pair.of(status, obtain);
5061
}
5162
}
5263

fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ public long getDbId() {
205205
return dbId;
206206
}
207207

208-
public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
208+
public Pair<TStatus, List<TBinlog>> getBinlog(long tableId, long prevCommitSeq, long numAcquired) {
209209
TStatus status = new TStatus(TStatusCode.OK);
210210
lock.readLock().lock();
211211
try {
@@ -216,10 +216,10 @@ public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
216216
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
217217
return Pair.of(status, null);
218218
}
219-
return tableBinlog.getBinlog(prevCommitSeq);
219+
return tableBinlog.getBinlog(prevCommitSeq, numAcquired);
220220
}
221221

222-
return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq);
222+
return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq, numAcquired);
223223
} finally {
224224
lock.readLock().unlock();
225225
}

fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ private void addBinlogWithoutCheck(TBinlog binlog) {
106106
}
107107
}
108108

109-
public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
109+
public Pair<TStatus, List<TBinlog>> getBinlog(long prevCommitSeq, long numAcquired) {
110110
lock.readLock().lock();
111111
try {
112-
return BinlogUtils.getBinlog(binlogs, prevCommitSeq);
112+
return BinlogUtils.getBinlog(binlogs, prevCommitSeq, numAcquired);
113113
} finally {
114114
lock.readLock().unlock();
115115
}

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -2869,7 +2869,9 @@ private TGetBinlogResult getBinlogImpl(TGetBinlogRequest request, String clientI
28692869
TGetBinlogResult result = new TGetBinlogResult();
28702870
result.setStatus(new TStatus(TStatusCode.OK));
28712871
long prevCommitSeq = request.getPrevCommitSeq();
2872-
Pair<TStatus, TBinlog> statusBinlogPair = env.getBinlogManager().getBinlog(dbId, tableId, prevCommitSeq);
2872+
long numAcquired = request.getNumAcquired();
2873+
Pair<TStatus, List<TBinlog>> statusBinlogPair = env.getBinlogManager()
2874+
.getBinlog(dbId, tableId, prevCommitSeq, numAcquired);
28732875
TStatus status = statusBinlogPair.first;
28742876
if (status != null && status.getStatusCode() != TStatusCode.OK) {
28752877
result.setStatus(status);
@@ -2878,10 +2880,8 @@ private TGetBinlogResult getBinlogImpl(TGetBinlogRequest request, String clientI
28782880
return result;
28792881
}
28802882
}
2881-
TBinlog binlog = statusBinlogPair.second;
2882-
if (binlog != null) {
2883-
List<TBinlog> binlogs = Lists.newArrayList();
2884-
binlogs.add(binlog);
2883+
List<TBinlog> binlogs = statusBinlogPair.second;
2884+
if (binlogs != null) {
28852885
result.setBinlogs(binlogs);
28862886
}
28872887
return result;

gensrc/thrift/FrontendService.thrift

+1
Original file line numberDiff line numberDiff line change
@@ -1172,6 +1172,7 @@ struct TGetBinlogRequest {
11721172
7: optional string user_ip
11731173
8: optional string token
11741174
9: optional i64 prev_commit_seq
1175+
10: optional i64 num_acquired // the max num of binlogs in a batch
11751176
}
11761177

11771178
enum TBinlogType {

0 commit comments

Comments
 (0)