From 832aae2c33904caf51b28ad9a9e96dc6cce61976 Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Mon, 23 Jun 2025 15:15:49 -0400
Subject: [PATCH] Prevent observer from returning partial block data for
 erasure coded files

Both the safe mode check in getBlockLocations and
checkBlockLocationsWhenObserver treated a single reported location per
block as sufficient. For a striped (erasure coded) block, one location
is not enough to read the data: a client needs at least as many
locations as the policy has data units, unless the file is smaller than
a full stripe of cells. Compute the minimum number of locations from
the file's erasure coding policy and block size in both checks.
---
 .../hdfs/server/namenode/FSNamesystem.java |  26 ++-
 .../server/namenode/ha/TestObserverNode.java | 187 +++++++++++++-----
 2 files changed, 162 insertions(+), 51 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f359d86df7b2a..776860b840721 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2227,11 +2227,20 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
         dir, pc, srcArg, offset, length, true);
     inode = res.getIIp().getLastINode();
     if (isInSafeMode()) {
+      int minBlocks = 1;
+
+      ErasureCodingPolicy ecPolicy = res.blocks.getErasureCodingPolicy();
       for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
+        if (ecPolicy != null) {
+          // If the file is erasure coded, we need at least the number of data units of
+          // blocks available, unless the file is smaller than a full stripe of cells.
+          long numCells = (b.getBlockSize() - 1) / (long)ecPolicy.getCellSize() + 1;
+          minBlocks = (int)Math.min((long)ecPolicy.getNumDataUnits(), numCells);
+        }
         // if safemode & no block locations yet then throw safemodeException
-        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
+        if ((b.getLocations() == null) || (b.getLocations().length < minBlocks)) {
           SafeModeException se = newSafemodeException(
-              "Zero blocklocations for " + srcArg);
+              "Not enough blocklocations for " + srcArg);
           if (haEnabled && haContext != null &&
               (haContext.getState().getServiceState() == ACTIVE ||
               haContext.getState().getServiceState() == OBSERVER)) {
@@ -9207,9 +9216,18 @@ private void checkBlockLocationsWhenObserver(LocatedBlocks blocks, String src)
     }
     List<LocatedBlock> locatedBlockList = blocks.getLocatedBlocks();
     if (locatedBlockList != null) {
+      int minBlocks = 1;
+
+      ErasureCodingPolicy ecPolicy = blocks.getErasureCodingPolicy();
       for (LocatedBlock b : locatedBlockList) {
-        if (b.getLocations() == null || b.getLocations().length == 0) {
-          throw new ObserverRetryOnActiveException("Zero blocklocations for " + src);
+        if (ecPolicy != null) {
+          // If the file is erasure coded, we need at least the number of data units of
+          // blocks available, unless the file is smaller than a full stripe of cells.
+          long numCells = (b.getBlockSize() - 1) / (long)ecPolicy.getCellSize() + 1;
+          minBlocks = (int)Math.min((long)ecPolicy.getNumDataUnits(), numCells);
+        }
+        if (b.getLocations() == null || b.getLocations().length < minBlocks) {
+          throw new ObserverRetryOnActiveException("Not enough blocklocations for " + src);
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index 0111f273e71de..fbe1935e225fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -21,10 +21,10 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
 import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -69,17 +70,18 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,7 +100,7 @@ public class TestObserverNode {

   private final Path testPath= new Path("/TestObserverNode");

-  @BeforeClass
+  @BeforeAll
   public static void startUpCluster() throws Exception {
     conf = new Configuration();
     conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
@@ -110,23 +112,23 @@ public static void startUpCluster() throws Exception {
     dfsCluster = qjmhaCluster.getDfsCluster();
   }

-  @Before
+  @BeforeEach
   public void setUp() throws Exception {
     setObserverRead(true);
   }

-  @After
+  @AfterEach
   public void cleanUp() throws IOException {
     dfs.delete(testPath, true);
-    assertEquals("NN[0] should be active", HAServiceState.ACTIVE,
-        getServiceState(dfsCluster.getNameNode(0)));
-    assertEquals("NN[1] should be standby", HAServiceState.STANDBY,
-        getServiceState(dfsCluster.getNameNode(1)));
-    assertEquals("NN[2] should be observer", HAServiceState.OBSERVER,
-        getServiceState(dfsCluster.getNameNode(2)));
+    assertEquals(HAServiceState.ACTIVE, getServiceState(dfsCluster.getNameNode(0)),
+        "NN[0] should be active");
+    assertEquals(HAServiceState.STANDBY, getServiceState(dfsCluster.getNameNode(1)),
+        "NN[1] should be standby");
+    assertEquals(HAServiceState.OBSERVER, getServiceState(dfsCluster.getNameNode(2)),
+        "NN[2] should be observer");
   }

-  @AfterClass
+  @AfterAll
   public static void shutDownCluster() throws IOException {
     if (qjmhaCluster != null) {
       qjmhaCluster.shutdown();
     }
@@ -228,8 +230,8 @@ public void testConfigStartup() throws Exception {
     }

     // Confirm that the namenode at nnIdx is standby
-    assertTrue("The NameNode is observer despite being transitioned to standby",
-        dfsCluster.getNameNode(nnIdx).isStandbyState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isStandbyState(),
+        "The NameNode is observer despite being transitioned to standby");

     // Restart the NameNode with observer startup option as false
     dfsCluster.getConfiguration(nnIdx)
@@ -238,9 +240,9 @@ public void testConfigStartup() throws Exception {

     // Verify that the NameNode is not in Observer state
     dfsCluster.waitNameNodeUp(nnIdx);
-    assertTrue("The NameNode started as Observer despite "
-        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being false",
-        dfsCluster.getNameNode(nnIdx).isStandbyState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isStandbyState(),
+        "The NameNode started as Observer despite "
+        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being false");

     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
@@ -260,9 +262,9 @@ public void testConfigStartup() throws Exception {

     // Check that the NameNode is in Observer state
     dfsCluster.waitNameNodeUp(nnIdx);
-    assertTrue("The NameNode did not start as Observer despite "
-        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being true",
-        dfsCluster.getNameNode(nnIdx).isObserverState());
+    assertTrue(dfsCluster.getNameNode(nnIdx).isObserverState(),
+        "The NameNode did not start as Observer despite "
+        + DFS_NAMENODE_OBSERVER_ENABLED_KEY + " being true");

     dfs.mkdir(testPath2, FsPermission.getDefault());
     assertSentTo(0);
@@ -437,6 +439,43 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
     dfs.open(testPath).close();
     assertSentTo(0);

+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Small file should succeed with just the one block
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Large file should failover to the active
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
     Mockito.reset(bmSpy);

     // Remove safe mode on observer, request should still go to it.
@@ -471,7 +510,62 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
         anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
         Mockito.any(), Mockito.any());

-    dfs.open(testPath);
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(0);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(0);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(0);
+
+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // The small file should succeed on the observer, while the large file should not
+
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(2);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(2);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      DatanodeInfo datanodeInfo = new DatanodeInfo.DatanodeInfoBuilder().build();
+      LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[] {datanodeInfo});
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    dfs.open(testPath).close();
     assertSentTo(0);

     dfs.getClient().listPaths("/", new byte[0], true);
@@ -501,7 +595,7 @@ public void testFsckWithObserver() throws Exception {
   /**
    * Test that, if a write happens to go to Observer,
    * Observer would throw {@link ObserverRetryOnActiveException},
-   * to inform client to retry on Active
+   * to inform client to retry on Active.
    *
    * @throws Exception
    */
@@ -563,16 +657,15 @@ public void testStickyActive() throws Exception {
     dfsCluster.rollEditLogAndTail(0);
     // No Observers present, should still go to Active
     dfsCluster.transitionToStandby(2);
-    assertEquals("NN[2] should be standby", HAServiceState.STANDBY,
-        getServiceState(dfsCluster.getNameNode(2)));
+    assertEquals(HAServiceState.STANDBY, getServiceState(dfsCluster.getNameNode(2)),
+        "NN[2] should be standby");
     newFs.open(testFile).close();
     assertSentTo(0);
     // Restore Observer
     int newObserver = 1;
     dfsCluster.transitionToObserver(newObserver);
-    assertEquals("NN[" + newObserver + "] should be observer",
-        HAServiceState.OBSERVER,
-        getServiceState(dfsCluster.getNameNode(newObserver)));
+    assertEquals(HAServiceState.OBSERVER, getServiceState(dfsCluster.getNameNode(newObserver)),
+        "NN[" + newObserver + "] should be observer");
     long startTime = Time.monotonicNow();
     try {
       while(Time.monotonicNow() - startTime <= 5000) {
@@ -661,19 +754,19 @@ public void testMkdirsRaceWithObserverRead() throws Exception {
         LOG.warn("MkDirRunner thread failed", e.getCause());
       }
     }
-    assertTrue("Not all threads finished", finished);
+    assertTrue(finished, "Not all threads finished");
     threadPool.shutdown();

-    assertEquals("Active and Observer stateIds don't match",
-        dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
-        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
+    assertEquals(dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
+        dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId(),
+        "Active and Observer stateIds don't match");
     for (int i = 0; i < numThreads; i++) {
-      assertTrue("Client #" + i
+      assertTrue(clientStates[i].lastSeenStateId >= activStateId &&
+          clientStates[i].fnfe == null,
+          "Client #" + i
           + " lastSeenStateId=" + clientStates[i].lastSeenStateId
           + " activStateId=" + activStateId
-          + "\n" + clientStates[i].fnfe,
-          clientStates[i].lastSeenStateId >= activStateId &&
-          clientStates[i].fnfe == null);
+          + "\n" + clientStates[i].fnfe);
     }

     // Restore edit log
@@ -707,7 +800,7 @@ public void run() {
       FileStatus stat = fs.getFileStatus(DIR_PATH);
       assertSentTo(fs, 2);

-      assertTrue("Should be a directory", stat.isDirectory());
+      assertTrue(stat.isDirectory(), "Should be a directory");
     } catch (FileNotFoundException ioe) {
       clientState.fnfe = ioe;
     } catch (Exception e) {
@@ -752,13 +845,13 @@ public void testSimpleReadEmptyDirOrFile() throws IOException {

   private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
       throws IOException {
-    assertTrue("Request was not sent to the expected namenode " + nnIdx,
-        HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
+    assertTrue(HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx),
+        "Request was not sent to the expected namenode " + nnIdx);
   }

   private void assertSentTo(int nnIdx) throws IOException {
-    assertTrue("Request was not sent to the expected namenode " + nnIdx,
-        HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
+    assertTrue(HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx),
+        "Request was not sent to the expected namenode " + nnIdx);
   }

   private static void setObserverRead(boolean flag) throws Exception {
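
Note for reviewers: the minimum-locations rule that both FSNamesystem hunks implement can be checked in isolation. The sketch below is illustrative only and not part of the patch; the class and method names are invented, but the arithmetic mirrors the patched code and the RS(3,2), 1024-byte-cell shape the tests fake.

// Illustrative sketch only -- not part of the patch. Mirrors the
// minBlocks computation added to FSNamesystem; names are invented.
public class MinBlockLocationsExample {
  /** Minimum locations needed to read one striped block. */
  static int minLocations(long blockSize, int cellSize, int numDataUnits) {
    // Ceiling division: how many cells the block actually spans.
    long numCells = (blockSize - 1) / (long) cellSize + 1;
    // Never more than the policy's data units; a file smaller than a
    // full stripe occupies fewer data units, so fewer locations suffice.
    return (int) Math.min((long) numDataUnits, numCells);
  }

  public static void main(String[] args) {
    // 1-byte file under RS(3,2) with 1 KiB cells: one cell, one location.
    System.out.println(minLocations(1L, 1024, 3));         // prints 1
    // 3 KiB file: a full stripe, so all three data units are required.
    System.out.println(minLocations(3L * 1024, 1024, 3));  // prints 3
  }
}

Under these numbers, the tests' small-file fake (block size 1) passes with its single reported location, while the large-file fake (block size 3072) requires three, which is why the observer rejects it and the client retries on the active.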