Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](cloud-mow) Fix delete bitmap lock not being released when the txn state check finds the txn is already visible #47580

Merged
merged 2 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
tableList.stream().map(Table::getId).collect(Collectors.toList()));
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(dbId, transactionId)) {
zhannngchen marked this conversation as resolved.
Show resolved Hide resolved
return;
}
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
if (lockContext.getBackendToPartitionTablets().isEmpty()) {
Expand All @@ -375,6 +378,40 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
}
}

/**
 * Looks up the transaction and decides whether the delete bitmap still has to be
 * calculated before the commit proceeds.
 *
 * <p>If a previous task already calculated this txn but its commit rpc timed out, be sends
 * another commit request to fe. When the txn is found to be committed or visible there is
 * nothing left to calculate, and the caller can simply report success back to be.
 *
 * <p>NOTE(review): every status other than ABORTED, COMMITTED, VISIBLE and PREPARE ends up
 * in the final throw. If a 2PC commit can reach this method with a pre-committed txn, that
 * request would be rejected here - TODO confirm against the 2PC caller.
 *
 * @param dbId id of the database passed to the transaction lookup
 * @param transactionId id of the transaction being committed
 * @return {@code true} if the txn status is PREPARE (delete bitmap must be calculated),
 *         {@code false} if the txn status is COMMITTED or VISIBLE (nothing more to do)
 * @throws TransactionCommitFailedException if the txn cannot be found, its status is null,
 *         it is ABORTED, or it is in any other status not listed above
 */
private boolean checkTransactionStateBeforeCommit(long dbId, long transactionId)
        throws TransactionCommitFailedException {
    TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
            .getTransactionState(dbId, transactionId);
    if (txnState == null) {
        throw new TransactionCommitFailedException("txn does not exist: " + transactionId);
    }

    TransactionStatus status = txnState.getTransactionStatus();
    if (status == null) {
        throw new TransactionCommitFailedException("transaction [" + transactionId
                + "] status is null ");
    }

    if (status == TransactionStatus.ABORTED) {
        throw new TransactionCommitFailedException("transaction [" + transactionId
                + "] is already aborted. abort reason: " + txnState.getReason());
    }

    // Already committed or visible: an earlier commit attempt went through, skip the recalculation.
    boolean alreadyDone = status == TransactionStatus.COMMITTED || status == TransactionStatus.VISIBLE;
    if (alreadyDone) {
        LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ",
                transactionId,
                status.toString());
        return false;
    }

    if (status == TransactionStatus.PREPARE) {
        LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
                status.toString());
        return true;
    }

    // Any remaining status is treated as a commit failure.
    throw new TransactionCommitFailedException("transaction [" + transactionId
            + "] status is: " + status.toString());
}

/**
* Post process of commitTxn
* 1. update some stats
Expand Down Expand Up @@ -500,23 +537,6 @@ private void commitTransactionWithoutLock(long dbId, List<Table> tableList, long
}

if (!mowTableList.isEmpty()) {
// This txn may already have been calculated by a previous task whose commit rpc timed out,
// in which case be sends another commit request to fe. So check the txn status first,
// before sending the delete bitmap task to be: if the txn is committed or visible, there is
// no need to calculate the delete bitmap again; just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (null != transactionState && null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId,
transactionState.getTransactionStatus().toString());
return;
} else {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
}
}
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
Config.calculate_delete_bitmap_task_timeout_seconds);
}
Expand Down Expand Up @@ -1122,6 +1142,9 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
try {
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) {
return true;
}
DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext();
getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext);
if (lockContext.getBackendToPartitionTablets().isEmpty()) {
Expand Down Expand Up @@ -1246,6 +1269,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
/**
 * Commits the given transaction through {@code commitTransactionWithoutLock}.
 *
 * <p>When the table list contains mow tables, the transaction state is checked first; if
 * that check reports that no further work is needed, the method returns without committing
 * again.
 *
 * @param db database whose id is used for the state check and the commit
 * @param tableList tables involved in the transaction
 * @param transactionId id of the transaction to commit
 * @param timeoutMillis not read by this method body
 * @throws UserException if the state check or the commit fails
 */
public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
        throws UserException {
    List<OlapTable> mowTables = getMowTableList(tableList, null);
    long dbId = db.getId();
    // The state check only applies when mow tables are involved; a false result means
    // there is nothing left to do for this request.
    boolean needsStateCheck = !mowTables.isEmpty();
    if (needsStateCheck && !checkTransactionStateBeforeCommit(dbId, transactionId)) {
        return;
    }
    commitTransactionWithoutLock(dbId, tableList, transactionId, null, null, true, mowTables, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
// under the License.

suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
if (!isCloudMode()) {
return
}
GetDebugPoint().clearDebugPointsForAllFEs()

def backendId_to_backendIP = [:]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {
}
}
qt_sql """ select * from ${tableName} order by id"""
def now = System.currentTimeMillis()
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())

}
}
def time_diff = System.currentTimeMillis() - now
assertTrue(time_diff < 10000)
} finally {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
sql "DROP TABLE IF EXISTS ${tableName};"
Expand Down
Loading