Skip to content

Commit a6ded68

Browse files
committed
HBASE-28996: Implement Custom ReplicationEndpoint to Enable WAL Backup to External Storage
1 parent c556dde commit a6ded68

26 files changed

+1371
-60
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.backup.replication;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
/**
29+
* Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and
30+
* bulk-loaded files within the specified backup root directory.
31+
*/
32+
@InterfaceAudience.Private
33+
public class BackupFileSystemManager {
34+
private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
35+
36+
public static final String WALS_DIR = "WALs";
37+
public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
38+
private final String peerId;
39+
private final FileSystem backupFs;
40+
private final Path backupRootDir;
41+
private final Path walsDir;
42+
private final Path bulkLoadFilesDir;
43+
44+
public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
45+
throws IOException {
46+
this.peerId = peerId;
47+
this.backupRootDir = new Path(backupRootDirStr);
48+
this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
49+
this.walsDir = createDirectory(WALS_DIR);
50+
this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
51+
}
52+
53+
private Path createDirectory(String dirName) throws IOException {
54+
Path dirPath = new Path(backupRootDir, dirName);
55+
backupFs.mkdirs(dirPath);
56+
LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
57+
return dirPath;
58+
}
59+
60+
public Path getWalsDir() {
61+
return walsDir;
62+
}
63+
64+
public Path getBulkLoadFilesDir() {
65+
return bulkLoadFilesDir;
66+
}
67+
68+
public FileSystem getBackupFs() {
69+
return backupFs;
70+
}
71+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.backup.replication;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.hadoop.hbase.Cell;
25+
import org.apache.hadoop.hbase.CellUtil;
26+
import org.apache.hadoop.hbase.TableName;
27+
import org.apache.hadoop.hbase.wal.WAL;
28+
import org.apache.hadoop.hbase.wal.WALEdit;
29+
import org.apache.yetus.audience.InterfaceAudience;
30+
31+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
32+
33+
/**
34+
* Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication.
35+
* <p>
36+
* This utility class extracts and constructs the file paths of bulk-loaded files based on WAL
37+
* entries. It processes bulk load descriptors and their associated store descriptors to generate
38+
* the paths for each bulk-loaded file.
39+
* <p>
40+
* The class is designed for scenarios where replicable bulk load operations need to be parsed and
41+
* their file paths need to be determined programmatically.
42+
* </p>
43+
*/
44+
@InterfaceAudience.Private
45+
public final class BulkLoadProcessor {
46+
private BulkLoadProcessor() {
47+
}
48+
49+
public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) throws IOException {
50+
List<Path> bulkLoadFilePaths = new ArrayList<>();
51+
52+
for (WAL.Entry entry : walEntries) {
53+
WALEdit edit = entry.getEdit();
54+
for (Cell cell : edit.getCells()) {
55+
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
56+
TableName tableName = entry.getKey().getTableName();
57+
String namespace = tableName.getNamespaceAsString();
58+
String table = tableName.getQualifierAsString();
59+
bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
60+
}
61+
}
62+
}
63+
return bulkLoadFilePaths;
64+
}
65+
66+
private static List<Path> processBulkLoadDescriptor(Cell cell, String namespace, String table)
67+
throws IOException {
68+
List<Path> bulkLoadFilePaths = new ArrayList<>();
69+
WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
70+
71+
if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) {
72+
return bulkLoadFilePaths; // Skip if not replicable
73+
}
74+
75+
String regionName = bld.getEncodedRegionName().toStringUtf8();
76+
for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
77+
bulkLoadFilePaths
78+
.addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName));
79+
}
80+
81+
return bulkLoadFilePaths;
82+
}
83+
84+
private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor,
85+
String namespace, String table, String regionName) {
86+
List<Path> paths = new ArrayList<>();
87+
String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
88+
89+
for (String storeFile : storeDescriptor.getStoreFileList()) {
90+
paths.add(new Path(namespace,
91+
new Path(table, new Path(regionName, new Path(columnFamily, storeFile)))));
92+
}
93+
94+
return paths;
95+
}
96+
}

0 commit comments

Comments
 (0)