Skip to content

Commit

Permalink
HBASE-28836 Parallize the file archival to improve the split times
Browse files Browse the repository at this point in the history
Add a config to limit thread per region
  • Loading branch information
mnpoonia committed Jan 24, 2025
1 parent bba911a commit 63f84cc
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -103,7 +101,7 @@ public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)
throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
archiveRegion(fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),
FSUtils.getRegionDirFromRootDir(rootDir, info));
}

Expand All @@ -119,8 +117,8 @@ public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo i
* operations could not complete.
* @throws IOException if the request cannot be completed
*/
public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
throws IOException {
public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir,
Path tableDir, Path regionDir) throws IOException {
// otherwise, we archive the files
// make sure we can archive
if (tableDir == null || regionDir == null) {
Expand Down Expand Up @@ -163,8 +161,8 @@ public boolean accept(Path file) {
// convert the files in the region to a File
Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
LOG.debug("Archiving " + toArchive);
List<File> failedArchive =
resolveAndArchive(fs, regionArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,
EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:" + regionDir.getName() + " into "
Expand Down Expand Up @@ -192,7 +190,7 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
for (Path regionDir : regionDirList) {
Future<Void> future = getArchiveExecutor(conf).submit(() -> {
archiveRegion(fs, rootDir, tableDir, regionDir);
archiveRegion(conf, fs, rootDir, tableDir, regionDir);
return null;
});
futures.add(future);
Expand All @@ -211,8 +209,8 @@ public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDi
private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configuration conf) {
if (archiveExecutor == null) {
int maxThreads = conf.getInt("hbase.hfilearchiver.thread.pool.max", 8);
archiveExecutor =
Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, getThreadFactory());
archiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
getThreadFactory("HFileArchiver"));

// Shutdown this ThreadPool in a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> archiveExecutor.shutdown()));
Expand All @@ -224,13 +222,13 @@ private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configur
// The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for
// new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related
// issues in some tests.
private static ThreadFactory getThreadFactory() {
private static ThreadFactory getThreadFactory(String archiverName) {
return new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
final String name = archiverName + "-" + threadNumber.getAndIncrement();
Thread t = new Thread(r, name);
t.setDaemon(true);
return t;
Expand Down Expand Up @@ -279,7 +277,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R

// do the actual archive
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
"Failed to archive/delete all the files for region:"
Expand All @@ -302,7 +300,7 @@ public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, R
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);
}

/**
Expand Down Expand Up @@ -333,11 +331,11 @@ public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, Regi
"Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme());
}
path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
archive(fs, regionInfo, family, replayedEdits, path);
archive(conf, fs, regionInfo, family, replayedEdits, path);
}

private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
private static void archive(Configuration conf, FileSystem fs, RegionInfo regionInfo,
byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
// sometimes in testing, we don't have rss, so we need to check for that
if (fs == null) {
LOG.warn(
Expand Down Expand Up @@ -371,8 +369,8 @@ private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
compactedFiles.stream().map(getStorePath).collect(Collectors.toList());

// do the actual archive
List<File> failedArchive =
resolveAndArchive(fs, storeArchiveDir, storeFiles, EnvironmentEdgeManager.currentTime());
List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,
EnvironmentEdgeManager.currentTime());

if (!failedArchive.isEmpty()) {
throw new FailedArchiveException(
Expand Down Expand Up @@ -425,8 +423,8 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
* @return the list of failed to archive files.
* @throws IOException if an unexpected file operation exception occurred
*/
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
private static List<File> resolveAndArchive(Configuration conf, FileSystem fs,
Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
// Early exit if no files to archive
if (toArchive.isEmpty()) {
LOG.trace("No files to archive, returning an empty list.");
Expand All @@ -448,12 +446,12 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
if (file.isFile()) {
filesOnly.add(file);
} else {
handleDirectory(fs, baseArchiveDir, failures, file, start);
handleDirectory(conf, fs, baseArchiveDir, failures, file, start);
}
}

// Archive files concurrently
archiveFilesConcurrently(baseArchiveDir, filesOnly, failures, startTime);
archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime);

return new ArrayList<>(failures); // Convert to a List for the return value
}
Expand All @@ -466,32 +464,33 @@ private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchive
LOG.trace("Archive directory ready: {}", baseArchiveDir);
}

private static void handleDirectory(FileSystem fs, Path baseArchiveDir, Queue<File> failures,
File directory, long start) {
private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir,
Queue<File> failures, File directory, long start) {
LOG.trace("Processing directory: {}, archiving its children.", directory);
Path subArchiveDir = new Path(baseArchiveDir, directory.getName());

try {
Collection<File> children = directory.getChildren();
failures.addAll(resolveAndArchive(fs, subArchiveDir, children, start));
failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start));
} catch (IOException e) {
LOG.warn("Failed to archive directory: {}", directory, e);
failures.add(directory);
}
}

private static void archiveFilesConcurrently(Path baseArchiveDir, List<File> files,
Queue<File> failures, String startTime) {
private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir,
List<File> files, Queue<File> failures, String startTime) {
LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir);

ExecutorService executorService = Executors.newCachedThreadPool();
Map<File, Future<Boolean>> futureMap = new HashMap<>();
// Submit file archiving tasks
// default is 16 which comes equal hbase.hstore.blockingStoreFiles default value
int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16);
ThreadPoolExecutor hfilesArchiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L,
TimeUnit.SECONDS, getThreadFactory("HFileArchiverPerRegion-"));
try {
Map<File, Future<Boolean>> futureMap = new HashMap<>();

// Submit file archiving tasks
for (File file : files) {
Future<Boolean> future =
executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
Future<Boolean> future = hfilesArchiveExecutor
.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futureMap.put(file, future);
}

Expand All @@ -513,7 +512,7 @@ private static void archiveFilesConcurrently(Path baseArchiveDir, List<File> fil
}
}
} finally {
executorService.shutdown();
hfilesArchiveExecutor.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaTableAccessor;
Expand Down Expand Up @@ -289,6 +290,7 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
final List<RegionInfo> regions, final boolean archive) throws IOException {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final FileSystem fs = mfs.getFileSystem();
final Configuration conf = env.getMasterConfiguration();

final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), tableName);

Expand All @@ -307,8 +309,7 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
}
}
}
HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(), tableDir,
regionDirList);
HFileArchiver.archiveRegions(conf, fs, mfs.getRootDir(), tableDir, regionDirList);
if (!regionDirList.isEmpty()) {
LOG.debug("Archived {} regions", tableName);
}
Expand All @@ -319,7 +320,7 @@ protected static void deleteFromFs(final MasterProcedureEnv env, final TableName
CommonFSUtils.getTableDir(new Path(mfs.getRootDir(), MobConstants.MOB_DIR_NAME), tableName);
Path regionDir = new Path(mobTableDir, MobUtils.getMobRegionInfo(tableName).getEncodedName());
if (fs.exists(regionDir)) {
HFileArchiver.archiveRegion(fs, mfs.getRootDir(), mobTableDir, regionDir);
HFileArchiver.archiveRegion(conf, fs, mfs.getRootDir(), mobTableDir, regionDir);
}

// Delete table directory from FS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ public static void deleteRegionFromFileSystem(final Configuration conf, final Fi

// Archive region
Path rootDir = CommonFSUtils.getRootDir(conf);
HFileArchiver.archiveRegion(fs, rootDir, tableDir, regionDir);
HFileArchiver.archiveRegion(conf, fs, rootDir, tableDir, regionDir);

// Delete empty region dir
if (!fs.delete(regionDir, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ public void testCleaningRace() throws Exception {

try {
// Try to archive the file
HFileArchiver.archiveRegion(fs, rootDir, sourceRegionDir.getParent(), sourceRegionDir);
HFileArchiver.archiveRegion(conf, fs, rootDir, sourceRegionDir.getParent(),
sourceRegionDir);

// The archiver succeded, the file is no longer in the original location
// but it's in the archive location.
Expand Down Expand Up @@ -690,12 +691,14 @@ public void testCleaningRace() throws Exception {
public void testArchiveRegionTableAndRegionDirsNull() throws IOException {
Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace");
FileSystem fileSystem = UTIL.getTestFileSystem();
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, null));
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, null));
}

@Test
public void testArchiveRegionWithTableDirNull() throws IOException {
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path regionDir = new Path(
CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())), "xyzabc");
Path familyDir = new Path(regionDir, "rd");
Expand All @@ -707,12 +710,13 @@ public void testArchiveRegionWithTableDirNull() throws IOException {
Path sourceRegionDir = new Path(rootDir, regionDir);
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file
assertFalse(HFileArchiver.archiveRegion(fileSystem, rootDir, null, sourceRegionDir));
assertFalse(HFileArchiver.archiveRegion(conf, fileSystem, rootDir, null, sourceRegionDir));
assertFalse(fileSystem.exists(sourceRegionDir));
}

@Test
public void testArchiveRegionWithRegionDirNull() throws IOException {
Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration();
Path regionDir =
new Path(CommonFSUtils.getTableDir(new Path("./"), TableName.valueOf(name.getMethodName())),
"elgn4nf");
Expand All @@ -726,7 +730,7 @@ public void testArchiveRegionWithRegionDirNull() throws IOException {
fileSystem.mkdirs(sourceRegionDir);
// Try to archive the file but with null regionDir, can't delete sourceFile
assertFalse(
HFileArchiver.archiveRegion(fileSystem, rootDir, sourceRegionDir.getParent(), null));
HFileArchiver.archiveRegion(conf, fileSystem, rootDir, sourceRegionDir.getParent(), null));
assertTrue(fileSystem.exists(sourceRegionDir));
fileSystem.delete(sourceRegionDir, true);
}
Expand Down

0 comments on commit 63f84cc

Please sign in to comment.