diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
index 392e27710911..c506d6dc6aed 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -96,12 +96,18 @@ private void registerBulkLoad(ObserverContext
       fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+      Map continuousBackupTableSet = tbl.getContinuousBackupTableSet();
 
-      if (fullyBackedUpTables.contains(tableName)) {
+      // Tables in continuousBackupTableSet do not rely on the BackupSystemTable; their bulkload
+      // operations are instead identified by scanning the WAL backup directory (HBASE-29656)
+      if (
+        fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)
+      ) {
         tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
       } else {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
+          LOG.trace("Table {} has either not gone through a full backup or is "
+            + "part of continuousBackupTableSet - skipping", tableName);
         }
       }
     }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index ce6c4d4dc683..3f31255d60f6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -20,25 +20,16 @@
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
 import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
@@ -47,7 +38,6 @@
 import org.apache.hadoop.hbase.backup.RestoreJob;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -342,8 +332,8 @@ private void reBulkloadFiles(TableName sourceTable, TableName targetTable, long
     RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
 
-    List bulkloadFiles =
-      collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir));
+    List bulkloadFiles = BackupUtils.collectBulkFiles(conn, sourceTable, targetTable,
+      startTime, endTime, new Path(restoreRootDir), new ArrayList<>());
 
     if (bulkloadFiles.isEmpty()) {
       LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
@@ -380,7 +370,7 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
       sourceTable, targetTable, startTime, endTime, walDirPath);
 
     List validDirs =
-      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+      BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
     if (validDirs.isEmpty()) {
       LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
         endTime);
@@ -390,62 +380,6 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
     executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
   }
 
-  private List collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime,
-    long endTime, Path restoreRootDir) throws IOException {
-
-    String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-    Path walDirPath = new Path(walBackupDir);
-    LOG.info(
-      "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}",
-      sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir);
-
-    List validDirs =
-      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
-    if (validDirs.isEmpty()) {
-      LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
-        startTime, endTime);
-      return Collections.emptyList();
-    }
-
-    String walDirsCsv = String.join(",", validDirs);
-
-    return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
-      walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
-  }
-
-  /**
-   * Fetches valid WAL directories based on the given time range.
-   */
-  private List getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
-    long endTime) throws IOException {
-    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
-    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
-
-    List validDirs = new ArrayList<>();
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-
-    for (FileStatus dayDir : dayDirs) {
-      if (!dayDir.isDirectory()) {
-        continue; // Skip files, only process directories
-      }
-
-      String dirName = dayDir.getPath().getName();
-      try {
-        Date dirDate = dateFormat.parse(dirName);
-        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
-        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
-        // Check if this day's WAL files overlap with the required time range
-        if (dirEndTime >= startTime && dirStartTime <= endTime) {
-          validDirs.add(dayDir.getPath().toString());
-        }
-      } catch (ParseException e) {
-        LOG.warn("Skipping invalid directory name: {}", dirName, e);
-      }
-    }
-    return validDirs;
-  }
-
   /**
    * Executes WAL replay using WALPlayer.
   */
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index c2aa0aa17fd1..1bd3621b2945 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -19,27 +19,19 @@
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -86,6 +78,7 @@
 @InterfaceAudience.Private
 public class IncrementalTableBackupClient extends TableBackupClient {
   private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);
+  private static final String BULKLOAD_COLLECTOR_OUTPUT = "bulkload-collector-output";
 
   protected IncrementalTableBackupClient() {
   }
@@ -137,89 +130,89 @@ protected static int getIndex(TableName tbl, List sTableList) {
    * the backup is marked as complete.
    * @param tablesToBackup list of tables to be backed up
    */
-  protected List handleBulkLoad(List tablesToBackup) throws IOException {
+  protected List handleBulkLoad(List tablesToBackup,
+    Map<TableName, List<String>> tablesToWALFileList, Map tablesToPrevBackupTs)
+    throws IOException {
     Map toBulkload = new HashMap<>();
-    List bulkLoads;
-    if (backupInfo.isContinuousBackupEnabled()) {
-      bulkLoads =
-        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
-    } else {
-      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
-    }
+    List bulkLoads = new ArrayList<>();
+
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
     } catch (URISyntaxException use) {
       throw new IOException("Unable to get FileSystem", use);
     }
+
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (BulkLoad bulkLoad : bulkLoads) {
-      TableName srcTable = bulkLoad.getTableName();
-      MergeSplitBulkloadInfo bulkloadInfo =
-        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
-      String regionName = bulkLoad.getRegion();
-      String fam = bulkLoad.getColumnFamily();
-      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+    if (!backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+      for (BulkLoad bulkLoad : bulkLoads) {
+        TableName srcTable = bulkLoad.getTableName();
+        if (!tablesToBackup.contains(srcTable)) {
+          LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+          continue;
+        }
+
+        MergeSplitBulkloadInfo bulkloadInfo =
+          toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+        String regionName = bulkLoad.getRegion();
+        String fam = bulkLoad.getColumnFamily();
+        String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+        Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+        Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+        String srcTableQualifier = srcTable.getQualifierAsString();
+        String srcTableNs = srcTable.getNamespaceAsString();
+        Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+          + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+        if (!tgtFs.mkdirs(tgtFam)) {
+          throw new IOException("couldn't create " + tgtFam);
+        }
 
-      if (!tablesToBackup.contains(srcTable)) {
-        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
-        continue;
-      }
-      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-
-      // For continuous backup: bulkload files are copied from backup directory defined by
-      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
-      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-      if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) {
-        String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp());
-        Path bulkLoadBackupPath =
-          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName);
-        Path bulkLoadDir = new Path(bulkLoadBackupPath,
-          srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString());
-        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
-        Path fullBulkLoadBackupPath =
-          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-        if (backupFs.exists(fullBulkLoadBackupPath)) {
-          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
-          p = fullBulkLoadBackupPath;
-        } else {
-          LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath);
+        Path tgt = new Path(tgtFam, filename);
+        Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+        Path archive = new Path(archiveDir, filename);
+
+        if (fs.exists(p)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
+              srcTableQualifier);
+            LOG.trace("copying {} to {}", p, tgt);
+          }
+          bulkloadInfo.addActiveFile(p.toString());
+        } else if (fs.exists(archive)) {
+          LOG.debug("copying archive {} to {}", archive, tgt);
+          bulkloadInfo.addArchiveFiles(archive.toString());
        }
      }
-      String srcTableQualifier = srcTable.getQualifierAsString();
-      String srcTableNs = srcTable.getNamespaceAsString();
-      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
-        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
-      if (!tgtFs.mkdirs(tgtFam)) {
-        throw new IOException("couldn't create " + tgtFam);
+      for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+        mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+          bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
      }
-      Path tgt = new Path(tgtFam, filename);
+    } else {
+      // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
+      Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
+      for (TableName table : tablesToBackup) {
+        long startTs = tablesToPrevBackupTs.getOrDefault(table, 0L);
+        long endTs = backupInfo.getIncrCommittedWalTs();
+        List walDirs = tablesToWALFileList.getOrDefault(table, new ArrayList<>());
 
-      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-      Path archive = new Path(archiveDir, filename);
+        List bulkloadPaths = BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs,
+          collectorOutput, walDirs);
 
-      if (fs.exists(p)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
-            srcTableQualifier);
-          LOG.trace("copying {} to {}", p, tgt);
+        List bulkLoadFiles =
+          bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList());
+
+        if (bulkLoadFiles.isEmpty()) {
+          LOG.info("No bulk-load files found for table {}", table);
+          continue;
        }
-        bulkloadInfo.addActiveFile(p.toString());
-      } else if (fs.exists(archive)) {
-        LOG.debug("copying archive {} to {}", archive, tgt);
-        bulkloadInfo.addArchiveFiles(archive.toString());
-      }
-    }
-    for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
-      mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
-        bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
+        mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
+      }
    }
-
    return bulkLoads;
  }
@@ -306,11 +299,20 @@ private void updateFileLists(List activeFiles, List archiveFiles
    */
  @Override
  public void execute() throws IOException, ColumnFamilyMismatchException {
+    // tablesToWALFileList and tablesToPrevBackupTs are needed for "continuous" Incremental backup
+    Map<TableName, List<String>> tablesToWALFileList = new HashMap<>();
+    Map tablesToPrevBackupTs = new HashMap<>();
    try {
      Map tablesToFullBackupIds = getFullBackupIds();
      verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);
 
      // case PREPARE_INCREMENTAL:
+      if (backupInfo.isContinuousBackupEnabled()) {
+        // committedWALsTs is needed only for Incremental backups with continuous backup
+        // since these do not depend on log roll ts
+        long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
+        backupInfo.setIncrCommittedWalTs(committedWALsTs);
+      }
      beginBackup(backupManager, backupInfo);
      backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
      // Non-continuous Backup incremental backup is controlled by 'incremental backup table set'
@@ -339,7 +341,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
      BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
      setupRegionLocator();
      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles();
+      convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs);
      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
        backupInfo.getBackupRootDir());
    } catch (Exception e) {
@@ -371,7 +373,8 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
      backupManager.writeBackupStartCode(newStartCode);
    }
 
-    List bulkLoads = handleBulkLoad(backupInfo.getTableNames());
+    List bulkLoads =
+      handleBulkLoad(backupInfo.getTableNames(), tablesToWALFileList, tablesToPrevBackupTs);
 
    // backup complete
    completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);
@@ -425,10 +428,19 @@ protected void deleteBulkLoadDirectory() throws IOException {
    }
  }
 
-  protected void convertWALsToHFiles() throws IOException {
+  protected void convertWALsToHFiles(Map<TableName, List<String>> tablesToWALFileList,
+    Map tablesToPrevBackupTs) throws IOException {
    long previousBackupTs = 0L;
+    long currentBackupTs = 0L;
    if (backupInfo.isContinuousBackupEnabled()) {
+      String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      if (Strings.isNullOrEmpty(walBackupDir)) {
+        throw new IOException(
+          "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      }
+      Path walBackupPath = new Path(walBackupDir);
      Set tableSet = backupInfo.getTables();
+      currentBackupTs = backupInfo.getIncrCommittedWalTs();
      List backupInfos = backupManager.getBackupHistory(true);
      for (TableName table : tableSet) {
        for (BackupInfo backup : backupInfos) {
@@ -442,7 +454,10 @@ protected void convertWALsToHFiles()
          } else {
            previousBackupTs = backup.getIncrCommittedWalTs();
          }
-          walBackupFileList = getBackupLogs(previousBackupTs);
+          walBackupFileList =
+            BackupUtils.getValidWalDirs(conf, walBackupPath, previousBackupTs, currentBackupTs);
+          tablesToWALFileList.put(table, walBackupFileList);
+          tablesToPrevBackupTs.put(table, previousBackupTs);
          walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
            previousBackupTs);
          break;
@@ -469,47 +484,6 @@ protected void convertWALsToHFiles()
    }
  }
 
-  private List getBackupLogs(long startTs) throws IOException {
-    // get log files from backup dir
-    String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-    if (Strings.isNullOrEmpty(walBackupDir)) {
-      throw new IOException(
-        "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
-    }
-    List resultLogFiles = new ArrayList<>();
-    Path walBackupPath = new Path(walBackupDir);
-    FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
-    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-    for (FileStatus dayDir : dayDirs) {
-      if (!dayDir.isDirectory()) {
-        continue; // Skip files, only process directories
-      }
-
-      String dirName = dayDir.getPath().getName();
-      try {
-        Date dirDate = dateFormat.parse(dirName);
-        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
-        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
-        if (dirEndTime >= startTs) {
-          Path dirPath = dayDir.getPath();
-          FileStatus[] logs = backupFs.listStatus(dirPath);
-          for (FileStatus log : logs) {
-            String filepath = log.getPath().toString();
-            LOG.debug("Found WAL file: {}", filepath);
-            resultLogFiles.add(filepath);
-          }
-        }
-      } catch (ParseException e) {
-        LOG.warn("Skipping invalid directory name: " + dirName, e);
-      }
-    }
-    return resultLogFiles;
-  }
-
   protected boolean tableExists(TableName table, Connection conn) throws IOException {
     try (Admin admin = conn.getAdmin()) {
       return admin.tableExists(table);
@@ -533,11 +507,7 @@ protected void walToHFiles(List dirPaths, List tableList, long p
     conf.set(JOB_NAME_CONF_KEY, jobname);
     if (backupInfo.isContinuousBackupEnabled()) {
       conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
-      // committedWALsTs is needed only for Incremental backups with continuous backup
-      // since these do not depend on log roll ts
-      long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
-      backupInfo.setIncrCommittedWalTs(committedWALsTs);
-      conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs));
+      conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs()));
     }
 
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
index b752c7f78e01..cf19d2622216 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
@@ -75,7 +75,7 @@ public class BulkLoadCollectorJob extends Configured implements Tool {
   public BulkLoadCollectorJob() {
   }
 
-  protected BulkLoadCollectorJob(final Configuration c) {
+  public BulkLoadCollectorJob(final Configuration c) {
     super(c);
   }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index bf309104775c..28bbfcf254ae 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hbase.backup.util;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URLDecoder;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -46,6 +50,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
@@ -78,6 +83,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
 
@@ -945,4 +951,71 @@ public static String formatToDateString(long dayInMillis) {
     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
     return dateFormat.format(new Date(dayInMillis));
   }
+
+  /**
+   * Fetches bulkload file paths for the given time range from the WAL backup directory.
+   */
+  public static List collectBulkFiles(Connection conn, TableName sourceTable,
+    TableName targetTable, long startTime, long endTime, Path restoreRootDir, List walDirs)
+    throws IOException {
+
+    if (walDirs.isEmpty()) {
+      String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      if (Strings.isNullOrEmpty(walBackupDir)) {
+        throw new IOException(
+          "WAL backup directory is not configured " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      }
+      Path walDirPath = new Path(walBackupDir);
+      walDirs =
+        BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    }
+
+    if (walDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
+        startTime, endTime);
+      return Collections.emptyList();
+    }
+
+    LOG.info(
+      "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL "
+        + "backup dir: {}, restore root: {}",
+      sourceTable, targetTable, startTime, endTime, walDirs, restoreRootDir);
+    String walDirsCsv = String.join(",", walDirs);
+
+    return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
+      walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
+  }
+
+  /**
+   * Fetches valid WAL directories based on the given time range.
+   */
+  public static List getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+    long endTime) throws IOException {
+    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+    List validDirs = new ArrayList<>();
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+        // Check if this day's WAL files overlap with the required time range
+        if (dirEndTime >= startTime && dirStartTime <= endTime) {
+          validDirs.add(dayDir.getPath().toString());
+        }
+      } catch (ParseException e) {
+        LOG.warn("Skipping invalid directory name: {}", dirName, e);
+      }
+    }
+    return validDirs;
+  }
 }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 159514bd45b1..e32e1b8f920a 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -175,7 +175,7 @@ public void execute() throws IOException {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles();
+      convertWALsToHFiles(new HashMap<>(), new HashMap<>());
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
         backupInfo.getBackupRootDir());
       failStageIf(Stage.stage_2);
@@ -200,7 +200,7 @@ public void execute() throws IOException {
         BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
       backupManager.writeBackupStartCode(newStartCode);
 
-      handleBulkLoad(backupInfo.getTableNames());
+      handleBulkLoad(backupInfo.getTableNames(), new HashMap<>(), new HashMap<>());
       failStageIf(Stage.stage_4);
 
       // backup complete
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 54f3842f463b..72867da95f17 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -163,20 +163,18 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws
     performBulkLoad("bulkPreIncr", methodName, tableName1);
     expectedRowCount += ROWS_IN_BULK_LOAD;
     assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
-    assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+    assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
 
     loadTable(TEST_UTIL.getConnection().getTable(tableName1));
     Thread.sleep(15000);
     performBulkLoad("bulkPostIncr", methodName, tableName1);
-    assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
+    assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
 
     // Incremental backup
     String backup2 =
       backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
     assertTrue(checkSucceeded(backup2));
-
-    // bulkPostIncr Bulkload entry should not be deleted post incremental backup
-    assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+    assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
 
     TEST_UTIL.truncateTable(tableName1);
     // Restore incremental backup
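
Reviewer note: a minimal, self-contained sketch of how the two helpers moved into BackupUtils by this patch (getValidWalDirs and collectBulkFiles) could be driven outside the backup/restore clients. The connection bootstrap, the WAL backup directory value, the table name, the time range, and the List<Path>/List<String> element types (inferred from the Path::toString mapping in IncrementalTableBackupClient) are illustrative assumptions, not part of the change itself.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class BulkFileCollectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Illustrative location of the continuous-backup WAL copies; in a real deployment this
    // key is already set for the cluster running continuous backup.
    conf.set(BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR, "hdfs:///hbase-backup/wals");

    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      TableName table = TableName.valueOf("demo");   // hypothetical table
      long startTs = 0L;                             // e.g. previous backup checkpoint
      long endTs = System.currentTimeMillis();       // e.g. committed-WAL timestamp

      // Day directories under <wal-backup-dir>/WALs whose date overlaps [startTs, endTs].
      List<String> walDirs = BackupUtils.getValidWalDirs(conf,
        new Path(conf.get(BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR)), startTs, endTs);

      // Passing an empty list would make collectBulkFiles resolve the WAL dirs itself from
      // CONF_CONTINUOUS_BACKUP_WAL_DIR; here the pre-computed list is reused instead.
      List<Path> bulkFiles = BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs,
        new Path("/tmp/bulkload-collector-output"), walDirs);

      bulkFiles.forEach(p -> System.out.println("bulk-loaded HFile: " + p));
    }
  }
}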