Skip to content

Commit

Permalink
[fix](cloud-mow)Fix not release delete bitmap lock when checking txn …
Browse files Browse the repository at this point in the history
…state is visible (#47580)

### What problem does this PR solve?
Fe should check txn state before calculate delete bitmap, if txn state
is visible, just retrurn ok to be, no need to recalculate delete bitmap
again.This check operation should do before get delete bitmap lock,
otherwise delete bitmap lock will not relase until time out.
Issue Number: close #xxx
  • Loading branch information
hust-hhb authored Feb 8, 2025
1 parent e56480e commit acb6bd2
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
tableList.stream().map(Table::getId).collect(Collectors.toList()));
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(dbId, transactionId)) {
return;
}
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
if (lockContext.getBackendToPartitionTablets().isEmpty()) {
Expand All @@ -375,6 +378,40 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
}
}

private boolean checkTransactionStateBeforeCommit(long dbId, long transactionId)
throws TransactionCommitFailedException {
// if this txn has been calculated by previously task but commit rpc is timeout,
// be will send another commit request to fe, so if txn is committed or visible,
// no need to calculate delete bitmap again, just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (transactionState == null) {
throw new TransactionCommitFailedException("txn does not exist: " + transactionId);
}
if (null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] is already aborted. abort reason: " + transactionState.getReason());
} else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ",
transactionId,
transactionState.getTransactionStatus().toString());
return false;
} else if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
return true;
} else {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] status is: " + transactionState.getTransactionStatus().toString());
}
} else {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] status is null ");
}
}

/**
* Post process of commitTxn
* 1. update some stats
Expand Down Expand Up @@ -500,23 +537,6 @@ private void commitTransactionWithoutLock(long dbId, List<Table> tableList, long
}

if (!mowTableList.isEmpty()) {
// may be this txn has been calculated by previously task but commit rpc is timeout,
// and be will send another commit request to fe, so need to check txn status first
// before sending delete bitmap task to be, if txn is committed or visible, no need to
// calculate delete bitmap again, just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (null != transactionState && null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId,
transactionState.getTransactionStatus().toString());
return;
} else {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
}
}
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
Config.calculate_delete_bitmap_task_timeout_seconds);
}
Expand Down Expand Up @@ -1122,6 +1142,9 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
try {
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) {
return true;
}
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
if (lockContext.getBackendToPartitionTablets().isEmpty()) {
Expand Down Expand Up @@ -1246,6 +1269,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
throws UserException {
List<OlapTable> mowTableList = getMowTableList(tableList, null);
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) {
return;
}
}
commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// under the License.

suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
if (!isCloudMode()) {
return
}
GetDebugPoint().clearDebugPointsForAllFEs()

def backendId_to_backendIP = [:]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {
}
}
qt_sql """ select * from ${tableName} order by id"""
def now = System.currentTimeMillis()
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())

}
}
def time_diff = System.currentTimeMillis() - now
assertTrue(time_diff < 10000)
} finally {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
sql "DROP TABLE IF EXISTS ${tableName};"
Expand Down

0 comments on commit acb6bd2

Please sign in to comment.