Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28836 Parallelize the file archival to improve the split times #6615

Open
wants to merge 1 commit into
base: branch-2.5
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -421,50 +427,94 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
*/
// NOTE(review): this span appears to come from a rendered diff whose +/- markers were dropped,
// so lines of the previous sequential implementation seem to be interleaved with the new
// concurrent one. Confirm against the actual patch before relying on any single line below.
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
// short circuit if no files to move
// Early exit if no files to archive: nothing is created on the file system in this case and
// the caller receives Collections.emptyList().
if (toArchive.isEmpty()) {
LOG.trace("No files to archive, returning an empty list.");
return Collections.emptyList();
}

// NOTE(review): the two trace calls below look like the old/new wording of the same message;
// presumably only the second one survives in the patched file - TODO confirm.
LOG.trace("Moving files to the archive directory {}", baseArchiveDir);
LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);

// make sure the archive directory exists
// NOTE(review): this inline exists/mkdirs check duplicates what ensureArchiveDirectoryExists
// does a few lines further down; it looks like the pre-change version of that call.
if (!fs.exists(baseArchiveDir)) {
if (!fs.mkdirs(baseArchiveDir)) {
throw new IOException("Failed to create the archive directory:" + baseArchiveDir
+ ", quitting archive attempt.");
}
LOG.trace("Created archive directory {}", baseArchiveDir);
}
// Ensure the archive directory exists (throws IOException when it cannot be created)
ensureArchiveDirectoryExists(fs, baseArchiveDir);

// NOTE(review): 'failures' is declared twice in this span (ArrayList here, then
// ConcurrentLinkedQueue below); both declarations cannot coexist in one scope, so one of
// them is presumably a removed diff line.
List<File> failures = new ArrayList<>();
// Thread-safe collection for storing failures
Queue<File> failures = new ConcurrentLinkedQueue<>();
// The start timestamp is passed to the per-file archive calls in its string form.
String startTime = Long.toString(start);

// Separate files and directories for processing: plain files are collected into filesOnly,
// directories are handed to handleDirectory immediately.
List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
// if it's a file archive it
// NOTE(review): the try / trace / resolveAndArchiveFile lines that follow have no matching
// closing braces in this span and look like the previous sequential loop body - TODO confirm.
try {
LOG.trace("Archiving {}", file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
if (file.isFile()) {
filesOnly.add(file);
} else {
// Directories are processed right here, before any of the collected files is archived.
handleDirectory(fs, baseArchiveDir, failures, file, start);
}
}

// Archive files concurrently; entries that could not be archived are added to 'failures'.
archiveFilesConcurrently(baseArchiveDir, filesOnly, failures, startTime);

return new ArrayList<>(failures); // Convert to a List for the return value
}

/**
 * Guarantees that the archive directory is present before any file is moved into it.
 * <p>
 * {@code fs.mkdirs} is only invoked when {@code fs.exists} reports the directory as missing.
 * @param fs             file system that is queried for, and asked to create, the directory
 * @param baseArchiveDir directory that has to exist once this method returns normally
 * @throws IOException if the directory is missing and {@code mkdirs} reports failure, or if one
 *                     of the file system calls itself throws
 */
private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)
  throws IOException {
  boolean alreadyPresent = fs.exists(baseArchiveDir);
  if (!alreadyPresent) {
    // Only attempt creation when the directory was not found.
    boolean created = fs.mkdirs(baseArchiveDir);
    if (!created) {
      throw new IOException("Failed to create the archive directory: " + baseArchiveDir);
    }
  }
  LOG.trace("Archive directory ready: {}", baseArchiveDir);
}

/**
 * Archives the contents of a single directory entry by delegating back to
 * {@code resolveAndArchive} with a sub-directory of the archive as the target.
 * <p>
 * The target is {@code baseArchiveDir} extended by the directory's own name. Failures reported
 * for the children are appended to {@code failures}; if listing or archiving the children
 * throws an {@link IOException}, the directory itself is recorded as the failure instead.
 * @param fs             file system passed through to {@code resolveAndArchive}
 * @param baseArchiveDir archive directory of the parent level
 * @param failures       collector that receives every entry that could not be archived
 * @param directory      the directory whose children are to be archived
 * @param start          start timestamp passed through to {@code resolveAndArchive}
 */
private static void handleDirectory(FileSystem fs, Path baseArchiveDir, Queue<File> failures,
  File directory, long start) {
  LOG.trace("Processing directory: {}, archiving its children.", directory);
  final Path nestedArchiveDir = new Path(baseArchiveDir, directory.getName());

  final List<File> nestedFailures;
  try {
    Collection<File> entries = directory.getChildren();
    nestedFailures = resolveAndArchive(fs, nestedArchiveDir, entries, start);
  } catch (IOException e) {
    // The whole directory counts as failed when its children cannot be processed.
    LOG.warn("Failed to archive directory: {}", directory, e);
    failures.add(directory);
    return;
  }
  failures.addAll(nestedFailures);
}

// Submits one resolveAndArchiveFile task per entry of 'files' to a freshly created executor,
// waits for every task, and adds each file whose task returned false, was interrupted, or
// threw, to 'failures'.
// NOTE(review): this span appears to come from a rendered diff without +/- markers; the
// "} else {" branch, the IOException catch and the "return failures;" further down look like
// leftovers of the previous sequential loop (they reference 'fs' and 'start', which are not
// parameters of this void method) - TODO confirm against the actual patch.
private static void archiveFilesConcurrently(Path baseArchiveDir, List<File> files,
Queue<File> failures, String startTime) {
LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir);

// NOTE(review): a new cached thread pool is created on every call and it has no upper bound
// on its thread count, so a large 'files' list can start roughly one thread per file.
// Consider a bounded (ideally shared/configurable) pool instead.
ExecutorService executorService = Executors.newCachedThreadPool();
try {
// NOTE(review): keyed by File in a HashMap, so results are processed in hash order rather
// than submission order, and two entries that are equal per File.equals/hashCode would
// collapse into one - verify how File defines equality.
Map<File, Future<Boolean>> futureMap = new HashMap<>();

// Submit file archiving tasks
for (File file : files) {
Future<Boolean> future =
executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futureMap.put(file, future);
}

// Process results of each task; get() blocks until the corresponding task has finished.
for (Map.Entry<File, Future<Boolean>> entry : futureMap.entrySet()) {
File file = entry.getKey();
try {
if (!entry.getValue().get()) {
LOG.warn("Failed to archive file: {} into directory: {}", file, baseArchiveDir);
failures.add(file);
}
} else {
// otherwise it's a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
} catch (InterruptedException e) {
LOG.error("Archiving interrupted for file: {}", file, e);
Thread.currentThread().interrupt(); // Restore interrupt status
// NOTE(review): the loop keeps going after an interrupt; with the flag restored, later
// get() calls on unfinished tasks will presumably throw InterruptedException right away,
// marking those files as failed although their tasks may still complete - TODO confirm
// that this is the intended behaviour.
failures.add(file);
} catch (ExecutionException e) {
// The task itself threw; the file is treated as not archived.
LOG.error("Archiving failed for file: {}", file, e);
failures.add(file);
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
} finally {
// shutdown() only stops the pool from accepting new tasks; it does not wait for or cancel
// tasks that are still running.
executorService.shutdown();
}
return failures;
}

/**
Expand Down