Skip to content

Commit

Permalink
[fix](cloud-mow)Fix not release delete bitmap lock when checking txn …
Browse files Browse the repository at this point in the history
…state is visible #47580 (#47652)

### What problem does this PR solve?
pick #47580
  • Loading branch information
hust-hhb authored Feb 8, 2025
1 parent a733a56 commit 3759b58
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
tableList.stream().map(Table::getId).collect(Collectors.toList()));
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(dbId, transactionId)) {
return;
}
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
if (lockContext.getBackendToPartitionTablets().isEmpty()) {
Expand All @@ -374,6 +377,40 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
}
}

/**
 * Checks the current state of a transaction to decide whether the commit still has to
 * calculate the delete bitmap.
 *
 * <p>If a previous task already calculated this txn but its commit rpc timed out, be sends
 * another commit request to fe. When the txn is already committed or visible there is nothing
 * left to calculate, so the caller can simply return ok to be.
 *
 * @param dbId          database id used to look up the transaction state
 * @param transactionId id of the transaction being committed
 * @return {@code true} when the status is {@code PREPARE} (delete bitmap must be calculated);
 *         {@code false} when the status is {@code COMMITTED} or {@code VISIBLE}
 * @throws TransactionCommitFailedException if the transaction state is not found, its status
 *         is {@code null} or {@code ABORTED}, or it is in any other status
 */
private boolean checkTransactionStateBeforeCommit(long dbId, long transactionId)
        throws TransactionCommitFailedException {
    TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
            .getTransactionState(dbId, transactionId);
    if (txnState == null) {
        throw new TransactionCommitFailedException("txn does not exist: " + transactionId);
    }

    TransactionStatus status = txnState.getTransactionStatus();
    if (status == null) {
        throw new TransactionCommitFailedException("transaction [" + transactionId
                + "] status is null ");
    }

    if (status == TransactionStatus.ABORTED) {
        throw new TransactionCommitFailedException("transaction [" + transactionId
                + "] is already aborted. abort reason: " + txnState.getReason());
    }

    // A repeated commit request for an already finished txn: skip the calculation.
    boolean alreadyFinished = status == TransactionStatus.COMMITTED
            || status == TransactionStatus.VISIBLE;
    if (alreadyFinished) {
        LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ",
                transactionId,
                status.toString());
        return false;
    }

    if (status == TransactionStatus.PREPARE) {
        LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
                status.toString());
        return true;
    }

    // Every remaining status is rejected.
    throw new TransactionCommitFailedException("transaction [" + transactionId
            + "] status is: " + status.toString());
}

/**
* Post process of commitTxn
* 1. update some stats
Expand Down Expand Up @@ -499,23 +536,6 @@ private void commitTransactionWithoutLock(long dbId, List<Table> tableList, long
}

if (!mowTableList.isEmpty()) {
// This txn may already have been calculated by a previous task whose commit rpc timed out,
// in which case be sends another commit request to fe. So check the txn status first,
// before sending the delete bitmap task to be: if the txn is committed or visible, there is
// no need to calculate the delete bitmap again; just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (null != transactionState && null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId,
transactionState.getTransactionStatus().toString());
return;
} else {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
}
}
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos);
}

Expand Down Expand Up @@ -1162,6 +1182,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
/**
 * Commits the given transaction through {@code commitTransactionWithoutLock}.
 *
 * <p>When {@code getMowTableList} returns a non-empty list, the transaction state is checked
 * first; if that check reports that no further work is needed, this method returns without
 * calling {@code commitTransactionWithoutLock}.
 *
 * @param db            database whose id identifies the transaction's database
 * @param tableList     tables involved in the transaction
 * @param transactionId id of the transaction to commit
 * @param timeoutMillis not read by this method body
 * @throws UserException if the state check or the commit fails
 */
public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
        throws UserException {
    final List<OlapTable> mowTables = getMowTableList(tableList, null);
    // Short-circuit keeps the state check from running when there are no such tables.
    final boolean skipCommit = !mowTables.isEmpty()
            && !checkTransactionStateBeforeCommit(db.getId(), transactionId);
    if (skipCommit) {
        return;
    }
    commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTables, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// under the License.

suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
if (!isCloudMode()) {
return
}
GetDebugPoint().clearDebugPointsForAllFEs()

def backendId_to_backendIP = [:]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {
}
}
qt_sql """ select * from ${tableName} order by id"""
def now = System.currentTimeMillis()
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())

}
}
def time_diff = System.currentTimeMillis() - now
assertTrue(time_diff < 10000)
} finally {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
sql "DROP TABLE IF EXISTS ${tableName};"
Expand Down

0 comments on commit 3759b58

Please sign in to comment.