From 68d51350e0d7dbe4c42a12c2e5dfcdf2b118ccc3 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Tue, 9 Sep 2025 15:43:16 +0530 Subject: [PATCH 1/2] HIVE-29186: Compactor workers should be revoked only if running in metastore in case of HMS instance restart --- .../org/apache/hadoop/hive/ql/txn/compactor/Initiator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 252defbf7448..329fb095eda7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -236,7 +236,10 @@ private TableOptimizer instantiateTableOptimizer(String className) { } private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { - if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname()); + if (!remoteOnly && + MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore")) { + txnHandler.revokeFromLocalWorkers(ServerUtils.hostname()); + } txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS)); } From e07553543dd0c552a3b0866c63ff9fce2fab23d8 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Wed, 10 Sep 2025 12:20:40 +0530 Subject: [PATCH 2/2] Update TestInitiator.recoverFailedLocalWorkers test as per the new logic --- .../hive/ql/txn/compactor/TestInitiator.java | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 636550aeaca8..0edd0ea0d6ba 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -39,7 +39,9 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -82,25 +84,38 @@ public void recoverFailedLocalWorkers() throws Exception { rqst = new CompactionRequest("default", "rflw2", CompactionType.MINOR); txnHandler.compact(rqst); - txnHandler.findNextToCompact(aFindNextCompactRequest(ServerUtils.hostname() + "-193892", WORKER_VERSION)); - txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION)); + CompactionInfo ci1 = txnHandler.findNextToCompact( + aFindNextCompactRequest(ServerUtils.hostname() + "-193892", WORKER_VERSION)); + CompactionInfo ci2 = txnHandler.findNextToCompact( + aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION)); startInitiator(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); Assert.assertEquals(2, compacts.size()); - boolean sawInitiated = false; + int numWorkingCompactions = 0; for (ShowCompactResponseElement c : compacts) { if (c.getState().equals("working")) { - Assert.assertEquals("nosuchhost-193892", c.getWorkerid()); - } else if (c.getState().equals("initiated")) { - sawInitiated = true; + numWorkingCompactions++; } else { Assert.fail("Unexpected state"); } } - Assert.assertTrue(sawInitiated); + Assert.assertEquals(2, numWorkingCompactions); + + // Simulating nosuchhost-193892 worker completes compaction + txnHandler.markCompacted(ci2); + txnHandler.markCleaned(ci2); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(1).getState())); + Assert.assertTrue(TxnHandler.WORKING_RESPONSE.equals(rsp.getCompacts().get(0).getState())); + + txnHandler.revokeTimedoutWorkers(0); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertTrue(TxnHandler.INITIATED_RESPONSE.equals(rsp.getCompacts().get(0).getState())); } @Test