diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index bf3fbd531bfe..2442e0789a8d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -205,6 +206,14 @@ protected void doStart() { notifyStarted(); } + @Override + public EmptyEntriesPolicy getEmptyEntriesPolicy() { + // Since this endpoint writes to S3 asynchronously, an empty entry batch + // does not guarantee that all previously submitted entries were persisted. + // Hence, avoid committing the WAL position. + return EmptyEntriesPolicy.SUBMIT; + } + @Override public ReplicationResult replicate(ReplicateContext replicateContext) { final List entries = replicateContext.getEntries(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java new file mode 100644 index 000000000000..5a5d8ab754c3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Policy that defines what a replication endpoint should do when the entry batch is empty. The + * replication source uses it to decide how to treat an empty batch: + * <ul><li>{@code COMMIT}: consider the position fully committed and update the WAL position.</li> + * <li>{@code SUBMIT}: treat it as submitted, not committed; do not advance the WAL position.</li> + * </ul> Some endpoints may buffer entries (e.g., in open files on S3) and delay actual + * persistence. In such cases, an empty batch should not result in a WAL position commit. + */ +@InterfaceAudience.Private +public enum EmptyEntriesPolicy { + COMMIT, + SUBMIT +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index fc5c2bf62659..fbb6b6b9ef10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -291,4 +291,22 @@ public int getTimeout() { * @throws IllegalStateException if this service's state isn't FAILED. 
*/ Throwable failureCause(); + + /** + * Defines the behavior when the replication source encounters an empty entry batch. + * <p>
+ * By default, this method returns {@link EmptyEntriesPolicy#COMMIT}, meaning the replication + * source can safely consider the WAL position as committed and move on. + * </p>
+ * <p>
+ * However, certain endpoints like backup or asynchronous S3 writers may delay persistence (e.g., + * writing to temporary files or buffers). In those cases, returning + * {@link EmptyEntriesPolicy#SUBMIT} avoids incorrectly advancing WAL position and risking data + * loss. + * </p>
+ * @return the {@link EmptyEntriesPolicy} to apply for empty entry batches. + */ + default EmptyEntriesPolicy getEmptyEntriesPolicy() { + return EmptyEntriesPolicy.COMMIT; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index ee819faa77b8..f45c8762683a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout; import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -150,13 +152,25 @@ protected void postFinish() { } /** - * Do the shipping logic + * Do the shipping logic. */ - private void shipEdits(WALEntryBatch entryBatch) { + @RestrictedApi( + explanation = "Package-private for test visibility only. 
Do not use outside tests.", + link = "", + allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)") + void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - updateLogPosition(entryBatch, ReplicationResult.COMMITTED); + /* + * Delegate to the endpoint to decide how to treat empty entry batches. In most replication + * flows, receiving an empty entry batch means that everything so far has been successfully + * replicated and committed — so it's safe to mark the WAL position as committed (COMMIT). + * However, some endpoints (e.g., asynchronous S3 backups) may buffer writes and delay actual + * persistence. In such cases, we must avoid committing the WAL position prematurely. + */ + final ReplicationResult result = getReplicationResult(); + updateLogPosition(entryBatch, result); return; } int currentSize = (int) entryBatch.getHeapSize(); @@ -232,6 +246,13 @@ private void shipEdits(WALEntryBatch entryBatch) { } } + private ReplicationResult getReplicationResult() { + EmptyEntriesPolicy policy = source.getReplicationEndpoint().getEmptyEntriesPolicy(); + return (policy == EmptyEntriesPolicy.COMMIT) + ? ReplicationResult.COMMITTED + : ReplicationResult.SUBMITTED; + } + private void cleanUpHFileRefs(WALEdit edit) throws IOException { String peerId = source.getPeerId(); if (peerId.contains("-")) { @@ -256,7 +277,11 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException { } } - private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { + @RestrictedApi( + explanation = "Package-private for test visibility only. 
Do not use outside tests.", + link = "", + allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)") + boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { boolean updated = false; // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file // record on zk, so let's call it. The last wal position maybe zero if end of file is true and diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 37af52eb93b9..25eef51ff681 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -53,11 +53,13 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -492,6 +494,67 @@ public synchronized UUID getPeerUUID() { } + /** + * Custom ReplicationEndpoint that simulates an asynchronous target like S3 or cloud storage. 
In + * this case, empty entry batches should not cause WAL position to be committed immediately. + */ + public static class AsyncReplicationEndpoint extends DoNothingReplicationEndpoint { + @Override + public EmptyEntriesPolicy getEmptyEntriesPolicy() { + return EmptyEntriesPolicy.SUBMIT; + } + } + + /** + * Default synchronous ReplicationEndpoint that treats empty entry batches as a signal to commit + * WAL position, assuming all entries pushed before were safely replicated. + */ + public static class SyncReplicationEndpoint extends DoNothingReplicationEndpoint { + // Inherits default COMMIT behavior + } + + /** + * Verifies that ReplicationSourceShipper commits the WAL position when using a synchronous + * endpoint and the entry batch is empty. + */ + @Test + public void testEmptyBatchCommitsPositionForCommitEndpoint() { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + Mockito.when(source.getReplicationEndpoint()).thenReturn(new SyncReplicationEndpoint()); + + ReplicationSourceShipper shipper = + Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source, null)); + + WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal")); + + shipper.shipEdits(emptyBatch); + + // With default (COMMIT) policy, empty entry batch should advance WAL position + Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.COMMITTED); + } + + /** + * Verifies that ReplicationSourceShipper does NOT commit the WAL position when using an + * asynchronous endpoint and the entry batch is empty. 
+ */ + @Test + public void testEmptyBatchSubmitsPositionForSubmitEndpoint() { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + ReplicationSource source = Mockito.mock(ReplicationSource.class); + Mockito.when(source.getReplicationEndpoint()).thenReturn(new AsyncReplicationEndpoint()); + + ReplicationSourceShipper shipper = + Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source, null)); + + WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal")); + + shipper.shipEdits(emptyBatch); + + // With SUBMIT policy, empty entry batch should NOT advance WAL position + Mockito.verify(shipper).updateLogPosition(emptyBatch, ReplicationResult.SUBMITTED); + } + private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, String endpointName) throws IOException { conf.setInt("replication.source.maxretriesmultiplier", 1);