2020import static org .apache .hadoop .hbase .backup .BackupRestoreConstants .CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS ;
2121import static org .apache .hadoop .hbase .backup .BackupRestoreConstants .CONF_CONTINUOUS_BACKUP_WAL_DIR ;
2222import static org .apache .hadoop .hbase .backup .BackupRestoreConstants .DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS ;
23- import static org .apache .hadoop .hbase .backup .replication .BackupFileSystemManager .WALS_DIR ;
2423import static org .apache .hadoop .hbase .backup .replication .ContinuousBackupReplicationEndpoint .ONE_DAY_IN_MILLISECONDS ;
24+ import static org .apache .hadoop .hbase .backup .util .BackupFileSystemManager .WALS_DIR ;
2525import static org .apache .hadoop .hbase .backup .util .BackupUtils .DATE_FORMAT ;
2626import static org .apache .hadoop .hbase .mapreduce .WALPlayer .IGNORE_EMPTY_FILES ;
2727
3030import java .text .SimpleDateFormat ;
3131import java .util .ArrayList ;
3232import java .util .Arrays ;
33+ import java .util .Collections ;
3334import java .util .Date ;
3435import java .util .List ;
3536import java .util .Map ;
4142import org .apache .hadoop .fs .Path ;
4243import org .apache .hadoop .hbase .HBaseConfiguration ;
4344import org .apache .hadoop .hbase .TableName ;
45+ import org .apache .hadoop .hbase .backup .BackupRestoreFactory ;
4446import org .apache .hadoop .hbase .backup .PointInTimeRestoreRequest ;
47+ import org .apache .hadoop .hbase .backup .RestoreJob ;
4548import org .apache .hadoop .hbase .backup .RestoreRequest ;
4649import org .apache .hadoop .hbase .backup .util .BackupUtils ;
50+ import org .apache .hadoop .hbase .backup .util .BulkFilesCollector ;
4751import org .apache .hadoop .hbase .client .Connection ;
4852import org .apache .hadoop .hbase .mapreduce .WALInputFormat ;
4953import org .apache .hadoop .hbase .mapreduce .WALPlayer ;
@@ -305,6 +309,63 @@ private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTa
305309
306310 backupAdmin .restore (restoreRequest );
307311 replayWal (sourceTable , targetTable , backupMetadata .getStartTs (), endTime );
312+
313+ reBulkloadFiles (sourceTable , targetTable , backupMetadata .getStartTs (), endTime ,
314+ request .isKeepOriginalSplits (), request .getRestoreRootDir ());
315+ }
316+
317+ /**
318+ * Re-applies/re-bulkloads store files discovered from WALs into the target table.
319+ * <p>
320+ * <b>Note:</b> this method re-uses the same {@link RestoreJob} MapReduce job that we originally
321+ * implemented for performing full and incremental backup restores. The MR job (obtained via
322+ * {@link BackupRestoreFactory#getRestoreJob(Configuration)}) is used here to perform an HFile
323+ * bulk-load of the discovered store files into {@code targetTable}.
324+ * @param sourceTable source table name (used for locating bulk files and logging)
325+ * @param targetTable destination table to bulk-load the HFiles into
326+ * @param startTime start of WAL range (ms)
327+ * @param endTime end of WAL range (ms)
328+ * @param keepOriginalSplits pass-through flag to control whether original region splits are
329+ * preserved
330+ * @param restoreRootDir local/DFS path under which temporary and output dirs are created
331+ * @throws IOException on IO or job failure
332+ */
333+ private void reBulkloadFiles (TableName sourceTable , TableName targetTable , long startTime ,
334+ long endTime , boolean keepOriginalSplits , String restoreRootDir ) throws IOException {
335+
336+ Configuration conf = HBaseConfiguration .create (conn .getConfiguration ());
337+ conf .setBoolean (RestoreJob .KEEP_ORIGINAL_SPLITS_KEY , keepOriginalSplits );
338+
339+ String walBackupDir = conn .getConfiguration ().get (CONF_CONTINUOUS_BACKUP_WAL_DIR );
340+ Path walDirPath = new Path (walBackupDir );
341+ conf .set (RestoreJob .BACKUP_ROOT_PATH_KEY , walDirPath .toString ());
342+
343+ RestoreJob restoreService = BackupRestoreFactory .getRestoreJob (conf );
344+
345+ List <Path > bulkloadFiles =
346+ collectBulkFiles (sourceTable , targetTable , startTime , endTime , new Path (restoreRootDir ));
347+
348+ if (bulkloadFiles .isEmpty ()) {
349+ LOG .info ("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore." ,
350+ sourceTable , startTime , endTime );
351+ return ;
352+ }
353+
354+ Path [] pathsArray = bulkloadFiles .toArray (new Path [0 ]);
355+
356+ try {
357+ // Use the existing RestoreJob MR job (the same MapReduce job used for full/incremental
358+ // restores)
359+ // to perform the HFile bulk-load of the discovered store files into `targetTable`.
360+ restoreService .run (pathsArray , new TableName [] { sourceTable }, new Path (restoreRootDir ),
361+ new TableName [] { targetTable }, false );
362+ LOG .info ("Re-bulkload completed for {}" , targetTable );
363+ } catch (Exception e ) {
364+ String errorMessage =
365+ String .format ("Re-bulkload failed for %s: %s" , targetTable , e .getMessage ());
366+ LOG .error (errorMessage , e );
367+ throw new IOException (errorMessage , e );
368+ }
308369 }
309370
310371 /**
@@ -329,6 +390,29 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
329390 executeWalReplay (validDirs , sourceTable , targetTable , startTime , endTime );
330391 }
331392
393+ private List <Path > collectBulkFiles (TableName sourceTable , TableName targetTable , long startTime ,
394+ long endTime , Path restoreRootDir ) throws IOException {
395+
396+ String walBackupDir = conn .getConfiguration ().get (CONF_CONTINUOUS_BACKUP_WAL_DIR );
397+ Path walDirPath = new Path (walBackupDir );
398+ LOG .info (
399+ "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}" ,
400+ sourceTable , targetTable , startTime , endTime , walDirPath , restoreRootDir );
401+
402+ List <String > validDirs =
403+ getValidWalDirs (conn .getConfiguration (), walDirPath , startTime , endTime );
404+ if (validDirs .isEmpty ()) {
405+ LOG .warn ("No valid WAL directories found for range {} - {}. Skipping bulk-file collection." ,
406+ startTime , endTime );
407+ return Collections .emptyList ();
408+ }
409+
410+ String walDirsCsv = String .join ("," , validDirs );
411+
412+ return BulkFilesCollector .collectFromWalDirs (HBaseConfiguration .create (conn .getConfiguration ()),
413+ walDirsCsv , restoreRootDir , sourceTable , targetTable , startTime , endTime );
414+ }
415+
332416 /**
333417 * Fetches valid WAL directories based on the given time range.
334418 */
@@ -356,7 +440,7 @@ private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long
356440 validDirs .add (dayDir .getPath ().toString ());
357441 }
358442 } catch (ParseException e ) {
359- LOG .warn ("Skipping invalid directory name: " + dirName , e );
443+ LOG .warn ("Skipping invalid directory name: {}" , dirName , e );
360444 }
361445 }
362446 return validDirs ;
0 commit comments