diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index d6ecc1c03de5..b7f2f8f70eac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -29,8 +29,6 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -103,7 +101,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
   public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
     throws IOException {
     Path rootDir = CommonFSUtils.getRootDir(conf);
-    archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
+    archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
       FSUtils.getRegionDirFromRootDir(rootDir, info));
   }
 
@@ -119,8 +117,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i
    * operations could not complete.
    * @throws IOException if the request cannot be completed
    */
-  public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
-    throws IOException {
+  public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir,
+    Path tableDir, Path regionDir) throws IOException {
     // otherwise, we archive the files
     // make sure we can archive
     if (tableDir == null || regionDir == null) {
@@ -163,8 +161,8 @@ public boolean accept(Path file) {
     // convert the files in the region to a File
     Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
     LOG.debug("Archiving " + toArchive);
-    List<File> failedArchive =
-      resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
+    List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,
+      EnvironmentEdgeManager.currentTime());
     if (!failedArchive.isEmpty()) {
       throw new FailedArchiveException(
         "Failed to archive/delete all the files for region:" + regionDir.getName() + " into "
@@ -192,7 +190,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
     List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
     for (Path regionDir : regionDirList) {
       Future<Void> future = getArchiveExecutor(conf).submit(() -> {
-        archiveRegion(fs, rootDir, tableDir, regionDir);
+        archiveRegion(conf, fs, rootDir, tableDir, regionDir);
         return null;
       });
       futures.add(future);
@@ -211,8 +209,8 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
   private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configuration conf) {
     if (archiveExecutor == null) {
       int maxThreads = conf.getInt("hbase.hfilearchiver.thread.pool.max", 8);
-      archiveExecutor =
-        Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, getThreadFactory());
+      archiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+        getThreadFactory("HFileArchiver"));
       // Shutdown this ThreadPool in a shutdown hook
       Runtime.getRuntime().addShutdownHook(new Thread(() -> archiveExecutor.shutdown()));
@@ -224,13 +222,13 @@ private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configur
   // The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for
   // new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related
   // issues in some tests.
-  private static ThreadFactory getThreadFactory() {
+  private static ThreadFactory getThreadFactory(String archiverName) {
     return new ThreadFactory() {
       final AtomicInteger threadNumber = new AtomicInteger(1);
 
       @Override
       public Thread newThread(Runnable r) {
-        final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
+        final String name = archiverName + "-" + threadNumber.getAndIncrement();
         Thread t = new Thread(r, name);
         t.setDaemon(true);
         return t;
@@ -279,7 +277,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R
     // do the actual archive
     List<File> failedArchive =
-      resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
+      resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
     if (!failedArchive.isEmpty()) {
       throw new FailedArchiveException(
         "Failed to archive/delete all the files for region:"
@@ -302,7 +300,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R
   public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
     Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {
     Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
-    archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
+    archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);
   }
 
   /**
@@ -333,11 +331,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi
         "Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme());
     }
     path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
-    archive(fs, regionInfo, family, replayedEdits, path);
+    archive(conf, fs, regionInfo, family, replayedEdits, path);
   }
 
-  private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
-    Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
+  private static void archive(Configuration conf, FileSystem fs, RegionInfo regionInfo,
+    byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
     // sometimes in testing, we don't have rss, so we need to check for that
     if (fs == null) {
       LOG.warn(
@@ -371,8 +369,8 @@ private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
       compactedFiles.stream().map(getStorePath).collect(Collectors.toList());
 
     // do the actual archive
-    List<File> failedArchive =
-      resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());
+    List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,
+      EnvironmentEdgeManager.currentTime());
 
     if (!failedArchive.isEmpty()) {
       throw new FailedArchiveException(
@@ -425,8 +423,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
    * @return the list of failed to archive files.
    * @throws IOException if an unexpected file operation exception occurred
    */
-  private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
-    Collection<File> toArchive, long start) throws IOException {
+  private static List<File> resolveAndArchive(Configuration conf, FileSystem fs,
+    Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
     // Early exit if no files to archive
     if (toArchive.isEmpty()) {
       LOG.trace("No files to archive, returning an empty list.");
@@ -448,12 +446,12 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
       if (file.isFile()) {
         filesOnly.add(file);
       } else {
-        handleDirectory(fs, baseArchiveDir, failures, file, start);
+        handleDirectory(conf, fs, baseArchiveDir, failures, file, start);
       }
     }
 
     // Archive files concurrently
-    archiveFilesConcurrently(baseArchiveDir, filesOnly, failures, startTime);
+    archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime);
 
     return new ArrayList<>(failures); // Convert to a List for the return value
   }
@@ -466,32 +464,33 @@ private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchive
     LOG.trace("Archive directory ready: {}", baseArchiveDir);
   }
 
-  private static void handleDirectory(FileSystem fs, Path baseArchiveDir, Queue<File> failures,
-    File directory, long start) {
+  private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir,
+    Queue<File> failures, File directory, long start) {
     LOG.trace("Processing directory: {}, archiving its children.", directory);
     Path subArchiveDir = new Path(baseArchiveDir, directory.getName());
     try {
       Collection<File> children = directory.getChildren();
-      failures.addAll(resolveAndArchive(fs, subArchiveDir, children, start));
+      failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start));
     } catch (IOException e) {
       LOG.warn("Failed to archive directory: {}", directory, e);
       failures.add(directory);
     }
   }
 
-  private static void archiveFilesConcurrently(Path baseArchiveDir, List<File> files,
-    Queue<File> failures, String startTime) {
+  private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir,
+    List<File> files, Queue<File> failures, String startTime) {
     LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir);
-
-    ExecutorService executorService = Executors.newCachedThreadPool();
+    Map<File, Future<Boolean>> futureMap = new HashMap<>();
+    // Submit file archiving tasks
+    // default is 16, which matches the default value of hbase.hstore.blockingStoreFiles
+    int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16);
+    ThreadPoolExecutor hfilesArchiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L,
+      TimeUnit.SECONDS, getThreadFactory("HFileArchiverPerRegion-"));
     try {
-      Map<File, Future<Boolean>> futureMap = new HashMap<>();
-
-      // Submit file archiving tasks
       for (File file : files) {
-        Future<Boolean> future =
-          executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
+        Future<Boolean> future = hfilesArchiveExecutor
+          .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
         futureMap.put(file, future);
       }
 
@@ -513,7 +512,7 @@ private static void archiveFilesConcurrently(Path baseArchiveDir, List<File> fil
         }
       }
     } finally {
-      executorService.shutdown();
+      hfilesArchiveExecutor.shutdown();
     }
   }
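
The substantive change above is in `archiveFilesConcurrently`: instead of creating an unbounded `Executors.newCachedThreadPool()` on every call, the archiver now draws from a bounded cached pool sized by the new `hbase.hfilearchiver.per.region.thread.pool.max` setting (default 16, matching the `hbase.hstore.blockingStoreFiles` default), which is why `Configuration` is threaded through `archiveRegion`, `resolveAndArchive`, `handleDirectory`, and `archiveFilesConcurrently`. Below is a minimal, self-contained sketch of that pool shape, assuming `Threads.getBoundedCachedThreadPool` builds a fixed-size executor whose idle core threads time out; the class and helper names are illustrative, not part of the patch:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedArchiverPoolSketch {
  // Assumed to mirror Threads.getBoundedCachedThreadPool: core == max, so the
  // thread count is capped, and idle threads are reclaimed after keepAlive.
  static ThreadPoolExecutor boundedCachedPool(int maxThreads, long keepAlive, TimeUnit unit,
    ThreadFactory factory) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreads, maxThreads, keepAlive, unit,
      new LinkedBlockingQueue<>(), factory);
    pool.allowCoreThreadTimeOut(true); // behave like a cached pool, but bounded
    return pool;
  }

  // Same shape as the patch's getThreadFactory(String): named daemon threads,
  // deliberately not pinned to a fixed ThreadGroup.
  static ThreadFactory namedDaemonFactory(String prefix) {
    AtomicInteger n = new AtomicInteger(1);
    return r -> {
      Thread t = new Thread(r, prefix + "-" + n.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
  }

  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor pool =
      boundedCachedPool(16, 30L, TimeUnit.SECONDS, namedDaemonFactory("HFileArchiverPerRegion"));
    pool.submit(() -> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

Setting core == max with `allowCoreThreadTimeOut(true)` keeps the "cached" behavior (threads die after 30 seconds idle) while capping how many archive threads a single pool can spawn; excess submissions queue instead of creating threads.
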
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
index 8c2f1067c952..fc259433784c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -289,6 +290,7 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
     final List<RegionInfo> regions, final boolean archive) throws IOException {
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final FileSystem fs = mfs.getFileSystem();
+    final Configuration conf = env.getMasterConfiguration();
     final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), tableName);
 
@@ -307,8 +309,7 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
         }
       }
     }
-    HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(), tableDir,
-      regionDirList);
+    HFileArchiver.archiveRegions(conf, fs, mfs.getRootDir(), tableDir, regionDirList);
     if (!regionDirList.isEmpty()) {
       LOG.debug("Archived {} regions", tableName);
     }
@@ -319,7 +320,7 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
       CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
     Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
     if (fs.exists(regionDir)) {
-      HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
+      HFileArchiver.archiveRegion(conf, fs, mfs.getRootDir(), mobTableDir, regionDir);
     }
 
     // Delete table directory from FS
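
From `DeleteTableProcedure`'s perspective the change is mechanical: `env.getMasterConfiguration()` is hoisted into a local `conf` and passed to both `archiveRegions` and the MOB-region `archiveRegion` call, so the new pool-size knob travels with it. A small sketch of reading and overriding that knob with a plain Hadoop `Configuration` (assumes hadoop-common on the classpath; the class name is hypothetical, the property name and default come from the patch):

```java
import org.apache.hadoop.conf.Configuration;

public class ArchiverConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Operators can lower (or raise) per-region archiver parallelism;
    // the patch falls back to 16 when the property is unset.
    conf.setInt("hbase.hfilearchiver.per.region.thread.pool.max", 8);
    int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16);
    System.out.println("per-region archiver threads: " + maxThreads);
  }
}
```
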
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index e4ccedda0c24..827051b9b3e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -980,7 +980,7 @@ public static void deleteRegionFromFileSystem(final Configuration conf, final Fi
     // Archive region
     Path rootDir = CommonFSUtils.getRootDir(conf);
-    HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
+    HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);
 
     // Delete empty region dir
     if (!fs.delete(regionDir, true)) {
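
`deleteRegionFromFileSystem` already had a `Configuration` in scope, so adapting to the widened signature is a one-line change. Any external caller of the old four-argument `archiveRegion(FileSystem, Path, Path, Path)` now has to supply a `Configuration` first; a hypothetical caller sketched against the new public signature (the wrapper class and method are illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.util.CommonFSUtils;

// Hypothetical caller: Configuration now leads the parameter list so that
// archiveRegion can size its per-region archive thread pool.
public class ArchiveRegionCallerSketch {
  static boolean archive(Configuration conf, FileSystem fs, Path tableDir, Path regionDir)
    throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    return HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);
  }
}
```
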
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index e087b8e723e2..3ff3255995b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -659,7 +659,8 @@ public void testCleaningRace() throws Exception {
     try {
       // Try to archive the file
-      HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir);
+      HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(),
+        sourceRegionDir);
 
       // The archiver succeded, the file is no longer in the original location
       // but it's in the archive location.
@@ -690,12 +691,14 @@ public void testCleaningRace() throws Exception {
   public void testArchiveRegionTableAndRegionDirsNull() throws IOException {
     Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
     FileSystem fileSystem = UTIL.getTestFileSystem();
+    Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
     // Try to archive the file but with null regionDir, can't delete sourceFile
-    assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null));
+    assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null));
   }
 
   @Test
   public void testArchiveRegionWithTableDirNull() throws IOException {
+    Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
     Path regionDir = new Path(
       CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())), "xyzabc");
     Path familyDir = new Path(regionDir, "rd");
@@ -707,12 +710,13 @@ public void testArchiveRegionWithTableDirNull() throws IOException {
     Path sourceRegionDir = new Path(rootDir, regionDir);
     fileSystem.mkdirs(sourceRegionDir);
     // Try to archive the file
-    assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir));
+    assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir));
     assertFalse(fileSystem.exists(sourceRegionDir));
   }
 
   @Test
   public void testArchiveRegionWithRegionDirNull() throws IOException {
+    Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
     Path regionDir =
       new Path(CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())),
         "elgn4nf");
@@ -726,7 +730,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
     fileSystem.mkdirs(sourceRegionDir);
     // Try to archive the file but with null regionDir, can't delete sourceFile
     assertFalse(
-      HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null));
+      HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null));
     assertTrue(fileSystem.exists(sourceRegionDir));
     fileSystem.delete(sourceRegionDir, true);
   }
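
One operability side effect worth noting: because `getThreadFactory` now takes a prefix, thread dumps can distinguish the shared region-archive pool (threads prefixed `HFileArchiver`) from the per-region file-archive pools (threads prefixed `HFileArchiverPerRegion-`). A standalone snippet, separate from the patch, showing the naming pattern such a factory produces (exact names here are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadNameSketch {
  public static void main(String[] args) throws Exception {
    AtomicInteger n = new AtomicInteger(1);
    // Prefix + counter, daemon threads: the same recipe the patched factory uses.
    ThreadFactory f = r -> {
      Thread t = new Thread(r, "HFileArchiverPerRegion-" + n.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
    ExecutorService pool = Executors.newFixedThreadPool(2, f);
    Future<String> name = pool.submit(() -> Thread.currentThread().getName());
    System.out.println(name.get()); // e.g. HFileArchiverPerRegion-1
    pool.shutdown();
  }
}
```
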