@@ -96,12 +96,18 @@ private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnviron
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
Map<TableName, Long> continuousBackupTableSet = tbl.getContinuousBackupTableSet();

if (fullyBackedUpTables.contains(tableName)) {
// Tables in continuousBackupTableSet do not rely on the BackupSystemTable; instead, the
// WAL backup directory is scanned to identify bulkload operations (HBASE-29656)
if (
fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)
Contributor:
[nit] Do you see a lot of entries before this change that keep registering the same table? If so, and if this is not only in unit tests, do you think it's a logic error from that trigger?

Contributor:
I have a suggestion. Perhaps we could add a comment stating that for continuous backup, this isn't necessary, as everything will be utilized from the WAL backup location.

Author (@ankitsol, Oct 22, 2025):
Sorry, I didn't understand the question completely. BackupObserver#registerBulkLoad() is called for each bulkload operation and registers it in the backup system table.

Contributor:
Adding the comment should have addressed my concerns, and yes, !continuousBackupTableSet.containsKey(tableName) means only non-continuous-backup tables need this bulkload registration.

) {
tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
LOG.trace("Table {} has either not gone through full backup or is "
+ "part of continuousBackupTableSet - skipping", tableName);
}
}
}
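
The reviewers' point above is that tables under continuous backup can skip this system-table bookkeeping because restore later finds their bulk-loaded HFiles by scanning the WAL backup directory (HBASE-29656). Below is a minimal, non-authoritative sketch of that discovery path; the class name is hypothetical and the BulkFilesCollector.collectFromWalDirs signature is assumed from the call site in the removed collectBulkFiles method further down in this diff.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
import org.apache.hadoop.hbase.client.Connection;

/**
 * Sketch only: locating bulk-loaded HFiles for a continuous-backup table by scanning
 * WAL backup day directories, rather than reading entries registered in the backup
 * system table. The collectFromWalDirs signature is taken from the call site in the
 * removed collectBulkFiles method shown later in this diff.
 */
final class ContinuousBackupBulkFileLookupSketch {

  static List<Path> findBulkFiles(Connection conn, List<String> validWalDayDirs,
      Path restoreRootDir, TableName sourceTable, TableName targetTable, long startTime,
      long endTime) throws IOException {
    // Day directories overlapping the restore range, joined into a CSV string.
    String walDirsCsv = String.join(",", validWalDayDirs);
    // Scan the WAL backup directories and resolve bulk-load files under restoreRootDir.
    return BulkFilesCollector.collectFromWalDirs(
      HBaseConfiguration.create(conn.getConfiguration()), walDirsCsv, restoreRootDir,
      sourceTable, targetTable, startTime, endTime);
  }
}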
@@ -20,25 +20,16 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -47,7 +38,6 @@
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -342,8 +332,8 @@ private void reBulkloadFiles(TableName sourceTable, TableName targetTable, long

RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);

List<Path> bulkloadFiles =
collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir));
List<Path> bulkloadFiles = BackupUtils.collectBulkFiles(conn, sourceTable, targetTable,
startTime, endTime, new Path(restoreRootDir), new ArrayList<String>());

if (bulkloadFiles.isEmpty()) {
LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
@@ -380,7 +370,7 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
sourceTable, targetTable, startTime, endTime, walDirPath);

List<String> validDirs =
getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
if (validDirs.isEmpty()) {
LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
endTime);
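
Both restore paths above now delegate to shared helpers on BackupUtils instead of the private methods removed below. A short hedged sketch of the two calls, with the wrapper class name hypothetical and the helper signatures inferred only from these call sites; the trailing List<String> argument to collectBulkFiles is passed empty here and its role is not shown in this diff.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;

/** Sketch only: the shared BackupUtils helpers as the call sites above invoke them. */
final class SharedBackupUtilsCallsSketch {

  /** As used by reBulkloadFiles above: bulk-load files for the restore time range. */
  static List<Path> bulkFiles(Connection conn, TableName sourceTable, TableName targetTable,
      long startTime, long endTime, String restoreRootDir) throws IOException {
    return BackupUtils.collectBulkFiles(conn, sourceTable, targetTable, startTime, endTime,
        new Path(restoreRootDir), new ArrayList<String>());
  }

  /** As used by replayWal above: WAL backup day directories overlapping the range. */
  static List<String> walDayDirs(Connection conn, Path walDirPath, long startTime, long endTime)
      throws IOException {
    return BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
  }
}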
@@ -390,62 +380,6 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
}

private List<Path> collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime,
long endTime, Path restoreRootDir) throws IOException {

String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
Path walDirPath = new Path(walBackupDir);
LOG.info(
"Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}",
sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir);

List<String> validDirs =
getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
if (validDirs.isEmpty()) {
LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
startTime, endTime);
return Collections.emptyList();
}

String walDirsCsv = String.join(",", validDirs);

return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
}

/**
* Fetches valid WAL directories based on the given time range.
*/
private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
long endTime) throws IOException {
FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));

List<String> validDirs = new ArrayList<>();
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

for (FileStatus dayDir : dayDirs) {
if (!dayDir.isDirectory()) {
continue; // Skip files, only process directories
}

String dirName = dayDir.getPath().getName();
try {
Date dirDate = dateFormat.parse(dirName);
long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)

// Check if this day's WAL files overlap with the required time range
if (dirEndTime >= startTime && dirStartTime <= endTime) {
validDirs.add(dayDir.getPath().toString());
}
} catch (ParseException e) {
LOG.warn("Skipping invalid directory name: {}", dirName, e);
}
}
return validDirs;
}
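
The removed getValidWalDirs above (now reached via BackupUtils at the call site earlier in this diff) keeps a WAL backup day directory whenever its whole-day window overlaps the requested range. A small worked example of that overlap rule, using hypothetical timestamps and a locally defined day constant:

// Worked example (hypothetical values) of the day-window overlap rule used above:
// a day directory is kept when [dirStartTime, dirEndTime] intersects [startTime, endTime].
public class WalDayOverlapExample {
  static final long ONE_DAY_IN_MILLISECONDS = 24L * 60 * 60 * 1000;

  public static void main(String[] args) {
    long dirStartTime = 1_696_896_000_000L;                       // 2023-10-10 00:00:00 UTC
    long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // 2023-10-10 23:59:59.999 UTC
    long startTime = dirEndTime - 1_000L;  // restore range starts one second before the day ends
    long endTime = startTime + 3_600_000L; // and extends an hour into the next day
    boolean keep = dirEndTime >= startTime && dirStartTime <= endTime;
    System.out.println(keep); // true: even a one-second overlap keeps the directory
  }
}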

/**
* Executes WAL replay using WALPlayer.
*/