Skip to content

Commit

Permalink
[feat](binlog) Speed binlog gc by locked binlogs (#47547)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Feb 8, 2025
1 parent f6a1d62 commit 6974c0a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2547,7 +2547,7 @@ public class Config extends ConfigBase {
public static int max_binlog_messsage_size = 1024 * 1024 * 1024;

@ConfField(mutable = true, masterOnly = true, description = {
"是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
"是否禁止使用 WITH RESOURCE 语句创建 Catalog。",
"Whether to disable creating catalog with WITH RESOURCE statement."})
public static boolean disallow_create_catalog_with_resource = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ private void addBinlog(TBinlog binlog, Object raw) {
return;
}

LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, type {}, data {}",
binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), binlog.getType(),
binlog.getData());

DBBinlog dbBinlog;
lock.writeLock().lock();
try {
Expand Down Expand Up @@ -596,7 +600,6 @@ public List<BinlogTombstone> gc() {
return tombstones;
}


public void replayGc(BinlogGcInfo binlogGcInfo) {
lock.writeLock().lock();
Map<Long, DBBinlog> gcDbBinlogMap;
Expand Down
45 changes: 39 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -288,7 +289,6 @@ public Pair<TStatus, BinlogLagInfo> getBinlogLag(long tableId, long prevCommitSe

public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) {
TableBinlog tableBinlog = null;

lock.writeLock().lock();
try {
if (tableId < 0) {
Expand Down Expand Up @@ -457,20 +457,43 @@ private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
}

if (lastExpiredBinlog != null) {
dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq());
final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
dummy.setCommitSeq(expiredCommitSeq);

// release expired timestamps by commit seq.
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) {
while (timeIter.hasNext() && timeIter.next().first <= expiredCommitSeq) {
timeIter.remove();
}

gcDroppedResources(lastExpiredBinlog.getCommitSeq());
lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq);
gcDroppedResources(expiredCommitSeq);
}

return lastExpiredBinlog;
}

private Optional<Long> getMinLockedCommitSeq() {
lock.readLock().lock();
try {
Optional<Long> minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo);
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
Optional<Long> tableMinLockedCommitSeq = tableBinlog.getMinLockedCommitSeq();
if (!tableMinLockedCommitSeq.isPresent()) {
continue;
}
if (minLockedCommitSeq.isPresent()) {
minLockedCommitSeq = Optional.of(Math.min(minLockedCommitSeq.get(), tableMinLockedCommitSeq.get()));
} else {
minLockedCommitSeq = tableMinLockedCommitSeq;
}
}
return minLockedCommitSeq;
} finally {
lock.readLock().unlock();
}
}

private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long maxBytes = dbBinlogConfig.getMaxBytes();
Expand All @@ -481,10 +504,12 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);

// step 1: get current tableBinlog info and expiredCommitSeq
Optional<Long> minLockedCommitSeq = getMinLockedCommitSeq();
TBinlog lastExpiredBinlog = null;
List<TableBinlog> tableBinlogs = Lists.newArrayList();
lock.writeLock().lock();
try {
long expiredCommitSeq = -1;
long expiredCommitSeq = -1L;
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
Pair<Long, Long> pair = timeIter.next();
Expand All @@ -494,6 +519,13 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
expiredCommitSeq = pair.first;
}

// Speed up gc by recycling binlogs that are not locked by syncer.
// To keep compatible with the old version, if no binlog is locked here, fallthrough to the
// previous behavior (keep the entire binlogs until it is expired).
if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) {
expiredCommitSeq = minLockedCommitSeq.get() - 1L;
}

final long lastExpiredCommitSeq = expiredCommitSeq;
BinlogComparator checker = (binlog) -> {
// NOTE: TreeSet read size during iterator remove is valid.
Expand All @@ -507,6 +539,7 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
|| maxHistoryNums < allBinlogs.size();
};
lastExpiredBinlog = getLastExpiredBinlog(checker);
tableBinlogs.addAll(tableBinlogMap.values());
} finally {
lock.writeLock().unlock();
}
Expand All @@ -518,7 +551,7 @@ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
// step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db
// tombstone
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
for (TableBinlog tableBinlog : tableBinlogs) {
// step 2.1: gc tableBinlog,and get table tombstone
BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
if (tableTombstone != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -171,6 +172,15 @@ private Pair<TStatus, Long> lockTableBinlog(String jobUniqueId, long lockCommitS
return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
}

public Optional<Long> getMinLockedCommitSeq() {
lock.readLock().lock();
try {
return lockedBinlogs.values().stream().min(Long::compareTo);
} finally {
lock.readLock().unlock();
}
}

private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
Expand Down Expand Up @@ -199,14 +209,15 @@ private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator ch
return null;
}

long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
dummyBinlog.setCommitSeq(expiredCommitSeq);

Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
while (timeIterator.hasNext() && timeIterator.next().first <= expiredCommitSeq) {
timeIterator.remove();
}

lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq);
return Pair.of(tombstoneUpsert, expiredCommitSeq);
}

Expand Down Expand Up @@ -279,6 +290,13 @@ public BinlogTombstone gc() {
}
expiredCommitSeq = entry.first;
}

// find the min locked binlog commit seq, if not exists, use the last binlog commit seq.
Optional<Long> minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo);
if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) {
// Speed up the gc progress by the min locked commit seq.
expiredCommitSeq = minLockedCommitSeq.get() - 1L;
}
}

final long lastExpiredCommitSeq = expiredCommitSeq;
Expand Down

0 comments on commit 6974c0a

Please sign in to comment.