From 27c138975d1012f8bb5e9c0e3216e2727a33db47 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Fri, 7 Feb 2025 13:03:33 +0800 Subject: [PATCH 1/2] [fix](cloud-mow)Fix no release delete bitmap lock when checking txn state is visible --- .../CloudGlobalTransactionMgr.java | 54 +++++++++++++------ ...ud_mow_stream_load_with_commit_fail.groovy | 3 ++ ..._cloud_mow_stream_load_with_timeout.groovy | 20 +++++++ 3 files changed, 60 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index f83deb3c54a262..abc3bf7f985ca7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -355,6 +355,9 @@ public void commitTransaction(long dbId, List tableList, long transaction tableList.stream().map(Table::getId).collect(Collectors.toList())); Map> backendToPartitionInfos = null; if (!mowTableList.isEmpty()) { + if (!checkTransactionStateBeforeCommit(dbId, transactionId)) { + return; + } DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext(); getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext); if (lockContext.getBackendToPartitionTablets().isEmpty()) { @@ -375,6 +378,40 @@ public void commitTransaction(long dbId, List
tableList, long transaction } } + private boolean checkTransactionStateBeforeCommit(long dbId, long transactionId) + throws TransactionCommitFailedException { + // if this txn has been calculated by previously task but commit rpc is timeout, + // be will send another commit request to fe, so if txn is committed or visible, + // no need to calculate delete bitmap again, just return ok to be to finish this commit. + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, transactionId); + if (transactionState == null) { + throw new TransactionCommitFailedException("txn does not exist: " + transactionId); + } + if (null != transactionState.getTransactionStatus()) { + if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { + throw new TransactionCommitFailedException("transaction [" + transactionId + + "] is already aborted. abort reason: " + transactionState.getReason()); + } else if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED + || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", + transactionId, + transactionState.getTransactionStatus().toString()); + return false; + } else if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) { + LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId, + transactionState.getTransactionStatus().toString()); + return true; + } else { + throw new TransactionCommitFailedException("transaction [" + transactionId + + "] status is: " + transactionState.getTransactionStatus().toString()); + } + } else { + throw new TransactionCommitFailedException("transaction [" + transactionId + + "] status is null "); + } + } + /** * Post process of commitTxn * 1. update some stats @@ -500,23 +537,6 @@ private void commitTransactionWithoutLock(long dbId, List
tableList, long } if (!mowTableList.isEmpty()) { - // may be this txn has been calculated by previously task but commit rpc is timeout, - // and be will send another commit request to fe, so need to check txn status first - // before sending delete bitmap task to be, if txn is committed or visible, no need to - // calculate delete bitmap again, just return ok to be to finish this commit. - TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(dbId, transactionId); - if (null != transactionState && null != transactionState.getTransactionStatus()) { - if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED - || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId, - transactionState.getTransactionStatus().toString()); - return; - } else { - LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId, - transactionState.getTransactionStatus().toString()); - } - } sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, Config.calculate_delete_bitmap_task_timeout_seconds); } diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy index fa71c3644f2027..0ab20324d7280f 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy @@ -16,6 +16,9 @@ // under the License. suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") { + if (!isCloudMode()) { + return + } GetDebugPoint().clearDebugPointsForAllFEs() def backendId_to_backendIP = [:] diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy index 7176aec702f411..8ffc04dd735b95 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy @@ -180,6 +180,26 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") { } } qt_sql """ select * from ${tableName} order by id""" + def now = System.currentTimeMillis() + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout") + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + + } + } + def time_diff = System.currentTimeMillis() - now + assertTrue(time_diff < 10000) } finally { GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout") sql "DROP TABLE IF EXISTS ${tableName};" From 37cffc517635987077a650c58d2425a259be672c Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Fri, 7 Feb 2025 14:49:41 +0800 Subject: [PATCH 2/2] edit --- .../cloud/transaction/CloudGlobalTransactionMgr.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index abc3bf7f985ca7..f7f97594ff5289 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -1142,6 +1142,9 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, try { Map> backendToPartitionInfos = null; if (!mowTableList.isEmpty()) { + if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) { + return true; + } DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext(); getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext); if (lockContext.getBackendToPartitionTablets().isEmpty()) { @@ -1266,6 +1269,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, public void commitTransaction2PC(Database db, List
tableList, long transactionId, long timeoutMillis) throws UserException { List mowTableList = getMowTableList(tableList, null); + if (!mowTableList.isEmpty()) { + if (!checkTransactionStateBeforeCommit(db.getId(), transactionId)) { + return; + } + } commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null); }