HBASE-29656 Scan WALs to identify bulkload operations for incremental backup #7400
base: HBASE-28957
Conversation
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
lgtm.
Pull Request Overview
This PR implements WAL scanning to identify bulk load operations for continuous incremental backups. Instead of relying on the backup system table to track bulk load hfiles, continuous incremental backups now use BulkLoadCollectorJob to scan backed-up WAL files directly.
Key changes:
- BulkLoadCollectorJob now runs on backed-up WALs instead of source cluster WALs for continuous incremental backups
- The backup system table is no longer used to track bulk loads when continuous backup is enabled
- The replication checkpoint timestamp is captured at backup start for filtering WAL entries
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| TestIncrementalBackupWithContinuous.java | Updated test assertions to verify bulk load rows are not stored in system table for continuous backups |
| TestBackupBase.java | Added empty maps to method signatures for compatibility |
| BulkLoadCollectorJob.java | Changed constructor visibility from protected to public |
| TableBackupClient.java | Added capture of replication checkpoint timestamp at backup start |
| IncrementalTableBackupClient.java | Implemented WAL-based bulk load collection using BulkLoadCollectorJob for continuous backups |
| BackupObserver.java | Modified to skip registering bulk loads in system table when continuous backup is enabled |
```java
Path archive = new Path(archiveDir, filename);
List<Path> bulkloadPaths =
  BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table,
    tablesToPrevBackupTs.get(table), backupInfo.getIncrCommittedWalTs());
```
Copilot AI (Oct 21, 2025)
Potential NullPointerException if tablesToPrevBackupTs.get(table) returns null. The map may not contain an entry for the table if no previous backup exists, which would cause an NPE when the primitive long is expected.
```diff
-       tablesToPrevBackupTs.get(table), backupInfo.getIncrCommittedWalTs());
+       tablesToPrevBackupTs.get(table) != null ? tablesToPrevBackupTs.get(table) : 0L, backupInfo.getIncrCommittedWalTs());
```
```java
// Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
for (TableName table : tablesToBackup) {
  String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
```
Copilot AI (Oct 21, 2025)
Potential NullPointerException if tablesToWALFileList.get(table) returns null. If the table has no WAL files in the map, String.join will throw an NPE when attempting to join null.
```diff
-   String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
+   List<String> walDirs = tablesToWALFileList.get(table);
+   String walDirsCsv = String.join(",", walDirs != null ? walDirs : java.util.Collections.emptyList());
```
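Taken together, the two suggestions above could be folded into the per-table loop roughly as follows. This is a sketch only: variable and method names follow the quoted diff, and it assumes `java.util.List`, `java.util.Collections`, and the surrounding method context are in scope.

```java
// Sketch: per-table loop with both null-safety fixes applied (names follow the diff).
for (TableName table : tablesToBackup) {
  // Guard against tables that have no backed-up WAL files recorded in the map.
  List<String> walDirs = tablesToWALFileList.get(table);
  String walDirsCsv = String.join(",", walDirs != null ? walDirs : Collections.<String>emptyList());

  // Guard against tables with no previous backup timestamp; 0L means "from the beginning".
  Long prevBackupTs = tablesToPrevBackupTs.get(table);
  long startTime = prevBackupTs != null ? prevBackupTs : 0L;

  List<Path> bulkloadPaths = BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv,
    collectorOutput, table, table, startTime, backupInfo.getIncrCommittedWalTs());
  // ... proceed with bulkloadPaths as in the existing code ...
}
```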
Just one minor comment about the change in walToHFiles regarding WALInputFormat.END_TIME_KEY: do we have any existing or new unit tests covering this timestamp change?

```java
conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs()));
```
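For context, the end-time bound simply excludes WAL entries written after the configured timestamp. A minimal illustration of that behavior is below; it is not the actual HBase implementation, and the key string is only assumed to mirror WALInputFormat.END_TIME_KEY.

```java
import org.apache.hadoop.conf.Configuration;

// Illustration only: how an end-time bound like WALInputFormat.END_TIME_KEY is
// typically honored when scanning WAL entries. Key string and default are assumptions.
public class EndTimeBoundSketch {
  static final String END_TIME_KEY = "wal.end.time"; // assumed to mirror WALInputFormat.END_TIME_KEY

  static boolean withinBound(Configuration conf, long entryWriteTime) {
    long endTs = conf.getLong(END_TIME_KEY, Long.MAX_VALUE);
    return entryWriteTime <= endTs; // entries after the committed WAL ts are skipped
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong(END_TIME_KEY, 1_700_000_000_000L); // e.g. backupInfo.getIncrCommittedWalTs()
    System.out.println(withinBound(conf, 1_699_999_999_999L)); // true: included
    System.out.println(withinBound(conf, 1_700_000_000_001L)); // false: filtered out
  }
}
```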
```diff
-     if (fullyBackedUpTables.contains(tableName)) {
+     if (
+       fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)
```
[nit] Before this change, did you see a lot of entries repeatedly registering for the same table? If so, and if this is not only in unit tests, do you think it's a logic error from that trigger?
I have a suggestion. Perhaps we could add a comment stating that for continuous backup, this isn't necessary, as everything will be utilized from the WAL backup location.
Sorry, I didn't understand the question completely. BackupObserver#registerBulkLoad() is called for each bulkload operation and registers it in the backup system table.
Adding the comment should have addressed my concerns, and yeah, !continuousBackupTableSet.containsKey(tableName) means only non-continuous backups need this bulkload registration.
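For illustration, the guard plus the suggested comment could read roughly as follows. Only the condition itself follows the quoted diff; the method shape and collection types are assumptions.

```java
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.TableName;

// Sketch of the discussed guard with the suggested explanatory comment.
public class RegisterBulkLoadGuardSketch {
  static boolean shouldRegisterBulkLoad(TableName tableName, Set<TableName> fullyBackedUpTables,
      Map<TableName, Long> continuousBackupTableSet) {
    // For continuous backup this registration isn't necessary: bulk-loaded files are
    // discovered later from the WAL backup location, so only non-continuous backups
    // record bulk loads in the backup system table.
    return fullyBackedUpTables.contains(tableName)
      && !continuousBackupTableSet.containsKey(tableName);
  }
}
```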
```java
if (bulkLoadFiles.isEmpty()) {
  LOG.info("No bulk-load files found for table {}", table);
} else {
  mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
}
```
[nit] Using continue would align the style with the other loop for !backupInfo.isContinuousBackupEnabled().
```diff
- if (bulkLoadFiles.isEmpty()) {
-   LOG.info("No bulk-load files found for table {}", table);
- } else {
-   mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
- }
+ if (bulkLoadFiles.isEmpty()) {
+   LOG.info("No bulk-load files found for table {}", table);
+   continue;
+ }
+ mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
```
I have one minor nit. LGTM otherwise.
```java
if (!tablesToBackup.contains(srcTable)) {
  LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
  continue;
}
```
nit: Should this be moved to line 162? It looks like some variables are being set, but they could end up unused because of this if block.
I have added a few comments regarding minimizing duplicate code.
```java
// set the start timestamp of the overall backup
long startTs = EnvironmentEdgeManager.currentTime();
backupInfo.setStartTs(startTs);
if (backupInfo.getType() == BackupType.INCREMENTAL && backupInfo.isContinuousBackupEnabled()) {
```
Why was this logic added to TableBackupClient? Wouldn't it be more appropriate to place it in IncrementalTableBackupClient?
```diff
- private List<String> getBackupLogs(long startTs) throws IOException {
+ private List<String> getBackupLogs(long startTs, long endTs) throws IOException {
```
Let's avoid duplicating code. We already have similar functionality for retrieving log files within a time range in org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#getValidWalDirs. Can we use that instead? We could move the file to a common location such as src/main/java/org/apache/hadoop/hbase/backup/util.
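A hypothetical shape for that shared helper, if it were extracted into a util class, is sketched below. The class name, package placement, and the modification-time filter are illustrative assumptions, not the existing getValidWalDirs implementation.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical utility sketch: one place that lists backed-up WAL dirs overlapping [startTs, endTs].
public final class WalDirTimeRangeUtilSketch {
  private WalDirTimeRangeUtilSketch() {
  }

  public static List<String> getWalDirsInRange(Configuration conf, Path walBackupRoot,
      long startTs, long endTs) throws IOException {
    FileSystem fs = walBackupRoot.getFileSystem(conf);
    List<String> result = new ArrayList<>();
    for (FileStatus dir : fs.listStatus(walBackupRoot)) {
      // Crude overlap check for illustration; the real getValidWalDirs logic may differ.
      long mtime = dir.getModificationTime();
      if (dir.isDirectory() && mtime >= startTs && mtime <= endTs) {
        result.add(dir.getPath().toString());
      }
    }
    return result;
  }
}
```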
```java
Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
Path archive = new Path(archiveDir, filename);
List<Path> bulkloadPaths =
  BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table,
```
Rather than calling BulkFilesCollector directly, we can use the org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#collectBulkFiles() method, which serves as a higher-level approach and internally invokes BulkFilesCollector.collectFromWalDirs(). This helps us avoid duplicating code. In both restore and incremental backup scenarios, we need to extract bulkload files by reading WAL files within a given time range, so it makes sense to have a single logic for this. We should consider placing this common logic in a utility class under the util package.
BulkFilesCollector#collectFromWalDirs() is itself a utility function. I have computed the valid WAL directories using BackupUtils#getValidWalDirs() once already in IncrementalTableBackupClient#convertWALsToHFiles(), so here I am reusing that. If I call AbstractPitrRestoreHandler#collectBulkFiles(), it would call BackupUtils#getValidWalDirs() again.
I agree with @ankitsol. This class should not make a call to an abstract class (you would have to make the method public); instead, move more logic to the utility class if you want to share more.
> BulkFilesCollector#collectFromWalDirs() is itself a utility function. I have computed the valid WAL directories using BackupUtils#getValidWalDirs() once already in IncrementalTableBackupClient#convertWALsToHFiles(), so here I am reusing that. If I call AbstractPitrRestoreHandler#collectBulkFiles(), it would call BackupUtils#getValidWalDirs() again.

Consider passing that as a parameter. Adjust the original methods as minimally as possible to accommodate both scenarios.

> This class should not make a call to an abstract class

No, as mentioned earlier, we should move the shared elements to a utility class.
> Consider passing that as a parameter. Adjust the original methods as minimally as possible to accommodate both scenarios.

Please elaborate.
> If I call AbstractPitrRestoreHandler#collectBulkFiles() it would again call BackupUtils#getValidWalDirs()

Instead of calling BackupUtils#getValidWalDirs() inside AbstractPitrRestoreHandler#collectBulkFiles(), take the output of BackupUtils#getValidWalDirs() as a parameter.
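In other words, the shared entry point could accept the precomputed WAL dirs so that both restore and incremental backup pass them in once. The sketch below is hypothetical: only the collectFromWalDirs argument list follows the quoted diff, the wrapper name and throws clause are assumptions, and the BulkFilesCollector import is omitted because its package is not shown here.

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;

// Sketch of the suggested refactor: the shared helper takes already-computed WAL dirs
// instead of re-deriving them. (BulkFilesCollector import omitted; package not shown above.)
public final class BulkFilesCollectorFacadeSketch {
  private BulkFilesCollectorFacadeSketch() {
  }

  public static List<Path> collectBulkFiles(Configuration conf, String walDirsCsv,
      Path collectorOutput, TableName sourceTable, TableName targetTable, long startTime,
      long endTime) throws IOException {
    // Callers (restore and incremental backup) compute walDirsCsv once via
    // BackupUtils#getValidWalDirs (or equivalent) and pass it here.
    return BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, sourceTable,
      targetTable, startTime, endTime);
  }
}
```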
| LOG.info("Called collectFromWalDirs for source table {}, target table {}, startTime {}, endTime" | ||
| + " {}, restoreRootDir {}", sourceTable, targetTable, startTime, endTime, restoreRootDir); | ||
|
|
[nit] Is this a debug message, or should we delete it?
Non-continuous incremental backup uses the backup system table to identify which bulkload hfiles it needs to copy.
With continuous incremental backup, this change uses BulkLoadCollectorJob to identify the bulkload hfiles that need to be copied. BulkLoadCollectorJob runs on the backed-up WALs instead of the source cluster WALs.
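At a high level, the difference between the two paths could be outlined as below. This is an illustrative sketch with stub methods, not the actual backup client code; everything except the names quoted in the review is an assumption.

```java
// Illustrative outline only: system-table-driven copy for non-continuous incremental
// backups vs. scanning backed-up WALs for continuous ones.
public class BulkLoadCollectionOutlineSketch {
  interface BackupContext { // stand-in for BackupInfo; hypothetical
    boolean isContinuousBackupEnabled();
  }

  void collectBulkLoadedFiles(BackupContext backup) {
    if (!backup.isContinuousBackupEnabled()) {
      // Non-continuous: bulk loads were registered by BackupObserver in the backup
      // system table; read the hfile list from there and copy it.
      copyBulkLoadsRecordedInSystemTable();
      return;
    }
    // Continuous: the system table is not used. Scan the backed-up WALs between the
    // previous backup timestamp and the checkpoint captured at backup start
    // (getIncrCommittedWalTs), collecting the bulk-loaded hfiles to copy.
    runBulkLoadCollectorOverBackedUpWals();
  }

  void copyBulkLoadsRecordedInSystemTable() { /* hypothetical stub */ }

  void runBulkLoadCollectorOverBackedUpWals() { /* hypothetical stub */ }
}
```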
JIRA: https://issues.apache.org/jira/browse/HBASE-29656