Skip to content

Commit 48b5c3c

Browse files
committed
working replication endpoint with unit tests
1 parent 4e46e0e commit 48b5c3c

File tree

3 files changed

+565
-83
lines changed

3 files changed

+565
-83
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,19 @@
4242
import org.apache.hadoop.hbase.replication.ReplicationResult;
4343
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
4444
import org.apache.hadoop.hbase.util.CommonFSUtils;
45+
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4546
import org.apache.hadoop.hbase.wal.FSHLogProvider;
4647
import org.apache.hadoop.hbase.wal.WAL;
4748
import org.apache.yetus.audience.InterfaceAudience;
4849
import org.slf4j.Logger;
4950
import org.slf4j.LoggerFactory;
5051

52+
/**
53+
* ContinuousBackupReplicationEndpoint is responsible for replicating WAL entries to a backup
54+
* storage. It organizes WAL entries by day and periodically flushes the data, ensuring that WAL
55+
* files do not exceed the configured size. The class includes mechanisms for handling the WAL
56+
* files, performing bulk load backups, and ensuring that the replication process is safe.
57+
*/
5158
@InterfaceAudience.Private
5259
public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint {
5360
private static final Logger LOG =
@@ -75,9 +82,9 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
7582
private String peerId;
7683
private ScheduledExecutorService flushExecutor;
7784

78-
private static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);
85+
public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);
7986
public static final String WAL_FILE_PREFIX = "wal_file.";
80-
private static final String DATE_FORMAT = "yyyy-MM-dd";
87+
public static final String DATE_FORMAT = "yyyy-MM-dd";
8188

8289
@Override
8390
public void init(Context context) throws IOException {
@@ -139,6 +146,8 @@ private void flushAndBackupSafely() {
139146
LOG.info("{} Periodic WAL flush triggered", Utils.logPeerId(peerId));
140147
flushWriters();
141148
replicationSource.persistOffsets();
149+
LOG.info("{} Periodic WAL flush and offset persistence completed successfully",
150+
Utils.logPeerId(peerId));
142151
} catch (IOException e) {
143152
LOG.error("{} Error during WAL flush: {}", Utils.logPeerId(peerId), e.getMessage(), e);
144153
} finally {
@@ -150,7 +159,13 @@ private void flushWriters() throws IOException {
150159
for (Map.Entry<Long, FSHLogProvider.Writer> entry : walWriters.entrySet()) {
151160
FSHLogProvider.Writer writer = entry.getValue();
152161
if (writer != null) {
153-
writer.close();
162+
try {
163+
writer.close();
164+
} catch (IOException e) {
165+
LOG.error("{} Failed to close WAL writer for day: {}. Error: {}", Utils.logPeerId(peerId),
166+
entry.getKey(), e.getMessage());
167+
throw e;
168+
}
154169
}
155170
}
156171
walWriters.clear();
@@ -250,14 +265,17 @@ private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
250265
fs.mkdirs(dayDir);
251266

252267
// Generate a unique WAL file name
253-
String walFileName = WAL_FILE_PREFIX + dayInMillis + "." + UUID.randomUUID();
268+
long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
269+
String walFileName = WAL_FILE_PREFIX + currentTime + "." + UUID.randomUUID();
254270
Path walFilePath = new Path(dayDir, walFileName);
255271

256272
// Initialize the WAL writer
257273
FSHLogProvider.Writer writer =
258274
ObjectStoreProtobufWalWriter.class.getDeclaredConstructor().newInstance();
259275
writer.init(fs, walFilePath, conf, true, WALUtil.getWALBlockSize(conf, fs, walFilePath),
260276
StreamSlowMonitor.create(conf, walFileName));
277+
278+
LOG.debug("{} Created WAL writer for day: {}", Utils.logPeerId(peerId), dayDirectoryName);
261279
return writer;
262280
} catch (Exception e) {
263281
throw new UncheckedIOException(
@@ -282,12 +300,15 @@ protected void doStop() {
282300

283301
private void close() {
284302
shutdownFlushExecutor();
303+
lock.lock();
285304
try {
286305
flushWriters();
287306
replicationSource.persistOffsets();
288307
} catch (IOException e) {
289308
LOG.error("{} Failed to Flush Open Wal Writers: {}", Utils.logPeerId(peerId), e.getMessage(),
290309
e);
310+
} finally {
311+
lock.unlock();
291312
}
292313
}
293314

@@ -302,13 +323,14 @@ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
302323
LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file,
303324
destPath);
304325
} catch (IOException e) {
305-
LOG.error("{} Failed to back up bulk load file: {}", Utils.logPeerId(peerId), file, e);
326+
LOG.error("{} Failed to back up bulk load file {}: {}", Utils.logPeerId(peerId), file,
327+
e.getMessage(), e);
306328
throw e;
307329
}
308330
}
309331
}
310332

311-
public Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
333+
private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
312334
FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
313335
Path rootDir = CommonFSUtils.getRootDir(conf);
314336
Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
@@ -344,6 +366,8 @@ private void shutdownFlushExecutor() {
344366
!flushExecutor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
345367
) {
346368
flushExecutor.shutdownNow();
369+
LOG.warn("{} Flush executor did not terminate within timeout, forcing shutdown.",
370+
Utils.logPeerId(peerId));
347371
}
348372
} catch (InterruptedException e) {
349373
Thread.currentThread().interrupt();

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ObjectStoreProtobufWalWriter.java

Lines changed: 22 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -18,55 +18,33 @@
1818
package org.apache.hadoop.hbase.backup.replication;
1919

2020
import java.io.IOException;
21-
import java.io.OutputStream;
2221
import java.util.concurrent.atomic.AtomicLong;
2322
import org.apache.hadoop.fs.FSDataOutputStream;
24-
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
2523
import org.apache.hadoop.fs.FileSystem;
2624
import org.apache.hadoop.fs.Path;
27-
import org.apache.hadoop.hbase.Cell;
28-
import org.apache.hadoop.hbase.ExtendedCell;
2925
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
30-
import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufLogWriter;
26+
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
3127
import org.apache.hadoop.hbase.util.AtomicUtils;
32-
import org.apache.hadoop.hbase.wal.FSHLogProvider;
33-
import org.apache.hadoop.hbase.wal.WAL;
28+
import org.apache.hadoop.hbase.util.CommonFSUtils;
3429
import org.apache.yetus.audience.InterfaceAudience;
35-
import org.slf4j.Logger;
36-
import org.slf4j.LoggerFactory;
37-
38-
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
3930

31+
/**
32+
* A custom implementation of {@link ProtobufLogWriter} that provides support for writing
33+
* protobuf-based WAL (Write-Ahead Log) entries to object store-backed files.
34+
* <p>
35+
* This class overrides the {@link ProtobufLogWriter#sync(boolean)} and
36+
* {@link ProtobufLogWriter#initOutput(FileSystem, Path, boolean, int, short, long, StreamSlowMonitor, boolean)}
37+
* methods to ensure compatibility with object stores, while ignoring specific capability checks
38+
* such as HFLUSH and HSYNC. These checks are often not supported by some object stores, and
39+
* bypassing them ensures smooth operation in such environments.
40+
* </p>
41+
*/
4042
@InterfaceAudience.Private
41-
public class ObjectStoreProtobufWalWriter extends AbstractProtobufLogWriter
42-
implements FSHLogProvider.Writer {
43-
private static final Logger LOG = LoggerFactory.getLogger(ObjectStoreProtobufWalWriter.class);
43+
public class ObjectStoreProtobufWalWriter extends ProtobufLogWriter {
4444

4545
protected FSDataOutputStream output;
4646
private final AtomicLong syncedLength = new AtomicLong(0);
4747

48-
@Override
49-
public void append(WAL.Entry entry) throws IOException {
50-
entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
51-
.writeDelimitedTo(output);
52-
for (Cell cell : entry.getEdit().getCells()) {
53-
// cellEncoder must assume little about the stream, since we write PB and cells in turn.
54-
cellEncoder.write((ExtendedCell) cell);
55-
}
56-
length.set(output.getPos());
57-
}
58-
59-
@Override
60-
public void close() throws IOException {
61-
if (this.output != null) {
62-
if (!trailerWritten) {
63-
writeWALTrailer();
64-
}
65-
this.output.close();
66-
this.output = null;
67-
}
68-
}
69-
7048
@Override
7149
public void sync(boolean forceSync) throws IOException {
7250
FSDataOutputStream fsDataOutputstream = this.output;
@@ -78,49 +56,16 @@ public void sync(boolean forceSync) throws IOException {
7856
}
7957

8058
@Override
81-
public long getSyncedLength() {
82-
return this.syncedLength.get();
83-
}
84-
85-
@Override
86-
protected void initOutput(FileSystem fs, Path path, boolean overridable, int bufferSize,
59+
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
8760
short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
8861
throws IOException {
89-
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overridable)
90-
.bufferSize(bufferSize).replication(replication).blockSize(blockSize);
91-
this.output = builder.build();
92-
}
93-
94-
@Override
95-
protected void closeOutputIfNecessary() {
96-
if (this.output != null) {
97-
try {
98-
this.output.close();
99-
} catch (IOException e) {
100-
LOG.warn("Close output failed", e);
101-
}
62+
try {
63+
super.initOutput(fs, path, overwritable, bufferSize, replication, blockSize, monitor,
64+
noLocalWrite);
65+
} catch (CommonFSUtils.StreamLacksCapabilityException e) {
66+
// Ignore capability check for HFLUSH and HSYNC capabilities
67+
// Some object stores may not support these capabilities, so we bypass the exception handling
68+
// to ensure compatibility with such stores.
10269
}
10370
}
104-
105-
@Override
106-
protected long writeMagicAndWALHeader(byte[] magic, WALProtos.WALHeader header)
107-
throws IOException {
108-
output.write(magic);
109-
header.writeDelimitedTo(output);
110-
return output.getPos();
111-
}
112-
113-
@Override
114-
protected OutputStream getOutputStreamForCellEncoder() {
115-
return this.output;
116-
}
117-
118-
@Override
119-
protected long writeWALTrailerAndMagic(WALProtos.WALTrailer trailer, byte[] magic)
120-
throws IOException {
121-
trailer.writeTo(output);
122-
output.writeInt(trailer.getSerializedSize());
123-
output.write(magic);
124-
return output.getPos();
125-
}
12671
}

0 commit comments

Comments (0)