Skip to content

Commit de6b65d

Browse files
committed
Add keep same disk when loading snapshot
1 parent 7f86ba6 commit de6b65d

File tree

6 files changed

+199
-55
lines changed

6 files changed

+199
-55
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,6 +1185,8 @@ public class IoTDBConfig {
11851185

11861186
private boolean includeNullValueInWriteThroughputMetric = false;
11871187

1188+
private boolean keepSameDiskWhenLoadingSnapshot = false;
1189+
11881190
private ConcurrentHashMap<String, EncryptParameter> tsFileDBToEncryptMap =
11891191
new ConcurrentHashMap<>(
11901192
Collections.singletonMap("root.__audit", new EncryptParameter("UNENCRYPTED", null)));
@@ -4257,6 +4259,14 @@ public void setPasswordLockTimeMinutes(int passwordLockTimeMinutes) {
42574259
this.passwordLockTimeMinutes = passwordLockTimeMinutes;
42584260
}
42594261

4262+
/**
 * Returns the {@code keepSameDiskWhenLoadingSnapshot} flag.
 *
 * <p>The field is initialized to {@code false}; IoTDBDescriptor populates it from the
 * {@code keep_same_disk_when_loading_snapshot} property.
 *
 * @return the current value of the flag
 */
public boolean isKeepSameDiskWhenLoadingSnapshot() {
4263+
return keepSameDiskWhenLoadingSnapshot;
4264+
}
4265+
4266+
/**
 * Sets the {@code keepSameDiskWhenLoadingSnapshot} flag.
 *
 * @param keepSameDiskWhenLoadingSnapshot the new value of the flag
 */
public void setKeepSameDiskWhenLoadingSnapshot(boolean keepSameDiskWhenLoadingSnapshot) {
4267+
this.keepSameDiskWhenLoadingSnapshot = keepSameDiskWhenLoadingSnapshot;
4268+
}
4269+
42604270
public ConcurrentHashMap<String, EncryptParameter> getTSFileDBToEncryptMap() {
42614271
return tsFileDBToEncryptMap;
42624272
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,12 @@ private void loadIoTConsensusProps(TrimProperties properties) throws IOException
11731173
"region_migration_speed_limit_bytes_per_second",
11741174
ConfigurationFileUtils.getConfigurationDefaultValue(
11751175
"region_migration_speed_limit_bytes_per_second"))));
1176+
conf.setKeepSameDiskWhenLoadingSnapshot(
1177+
Boolean.parseBoolean(
1178+
properties.getProperty(
1179+
"keep_same_disk_when_loading_snapshot",
1180+
ConfigurationFileUtils.getConfigurationDefaultValue(
1181+
"keep_same_disk_when_loading_snapshot"))));
11761182
}
11771183

11781184
private void loadIoTConsensusV2Props(TrimProperties properties) throws IOException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -314,56 +314,69 @@ private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir)
314314
}
315315
}
316316

317+
/**
 * Places a single snapshot file at {@code finalDir/targetSuffix/<file name>}, first trying a
 * hard link and falling back to a copy when the link cannot be created.
 *
 * <p>On success (link or copy) the mapping {@code fileKey -> finalDir} is stored in
 * {@code fileTarget}.
 *
 * @param targetSuffix path segment inserted between {@code finalDir} and the file name
 * @param file the snapshot file to link or copy
 * @param fileTarget map that receives {@code fileKey -> finalDir} after the file is placed
 * @param fileKey key under which {@code finalDir} is recorded
 * @param finalDir root directory the file is placed under
 * @return the newly created target file
 * @throws IOException if the parent directory cannot be created or the fallback copy fails
 */
private File createLinksFromSnapshotToSourceDir(
318+
String targetSuffix,
319+
File file,
320+
Map<String, String> fileTarget,
321+
String fileKey,
322+
String finalDir)
323+
throws IOException {
324+
File targetFile =
325+
new File(finalDir + File.separator + targetSuffix + File.separator + file.getName());
326+
327+
try {
328+
// Make sure the destination directory exists before linking or copying into it.
if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
329+
throw new IOException(
330+
String.format(
331+
"Cannot create directory %s", targetFile.getParentFile().getAbsolutePath()));
332+
}
333+
334+
// First attempt: hard link. A failure here is not fatal; execution continues with the copy.
try {
335+
Files.createLink(targetFile.toPath(), file.toPath());
336+
LOGGER.debug("Created hard link from {} to {}", file, targetFile);
337+
fileTarget.put(fileKey, finalDir);
338+
return targetFile;
339+
} catch (IOException e) {
340+
// NOTE(review): the caught exception is not passed to the logger, so the reason the link
// failed is lost.
LOGGER.info("Cannot create link from {} to {}, fallback to copy", file, targetFile);
341+
}
342+
343+
// Fallback: copy the file. No copy options are passed to Files.copy.
Files.copy(file.toPath(), targetFile.toPath());
344+
fileTarget.put(fileKey, finalDir);
345+
return targetFile;
346+
} catch (Exception e) {
347+
// Log with the stack trace, then rethrow the same exception to the caller.
LOGGER.warn(
348+
"Failed to process file {} in dir {}: {}", file.getName(), finalDir, e.getMessage(), e);
349+
throw e;
350+
}
351+
}
352+
317353
private void createLinksFromSnapshotToSourceDir(
318-
String targetSuffix, File[] files, FolderManager folderManager)
319-
throws DiskSpaceInsufficientException, IOException {
354+
String targetSuffix, File[] files, FolderManager folderManager) throws IOException {
320355
Map<String, String> fileTarget = new HashMap<>();
321356
for (File file : files) {
322357
String fileKey = file.getName().split("\\.")[0];
323358
String dataDir = fileTarget.get(fileKey);
324359

360+
if (dataDir != null) {
361+
createLinksFromSnapshotToSourceDir(targetSuffix, file, fileTarget, fileKey, dataDir);
362+
continue;
363+
}
364+
325365
try {
326-
folderManager.getNextWithRetry(
327-
currentDataDir -> {
328-
String effectiveDir = (dataDir != null) ? dataDir : currentDataDir;
329-
File targetFile =
330-
new File(
331-
effectiveDir
332-
+ File.separator
333-
+ targetSuffix
334-
+ File.separator
335-
+ file.getName());
336-
337-
try {
338-
if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
339-
throw new IOException(
340-
String.format(
341-
"Cannot create directory %s",
342-
targetFile.getParentFile().getAbsolutePath()));
343-
}
344-
345-
try {
346-
Files.createLink(targetFile.toPath(), file.toPath());
347-
LOGGER.debug("Created hard link from {} to {}", file, targetFile);
348-
return targetFile;
349-
} catch (IOException e) {
350-
LOGGER.info(
351-
"Cannot create link from {} to {}, fallback to copy", file, targetFile);
352-
}
353-
354-
Files.copy(file.toPath(), targetFile.toPath());
355-
fileTarget.put(fileKey, effectiveDir);
356-
return targetFile;
357-
} catch (Exception e) {
358-
LOGGER.warn(
359-
"Failed to process file {} in dir {}: {}",
360-
file.getName(),
361-
effectiveDir,
362-
e.getMessage(),
363-
e);
364-
throw e;
365-
}
366-
});
366+
String firstFolderOfSameDisk =
367+
IoTDBDescriptor.getInstance().getConfig().isKeepSameDiskWhenLoadingSnapshot()
368+
? folderManager.getFirstFolderOfSameDisk(file.getAbsolutePath())
369+
: null;
370+
371+
if (firstFolderOfSameDisk != null) {
372+
createLinksFromSnapshotToSourceDir(
373+
targetSuffix, file, fileTarget, fileKey, firstFolderOfSameDisk);
374+
} else {
375+
folderManager.getNextWithRetry(
376+
currentDataDir ->
377+
createLinksFromSnapshotToSourceDir(
378+
targetSuffix, file, fileTarget, fileKey, currentDataDir));
379+
}
367380
} catch (Exception e) {
368381
throw new IOException(
369382
String.format(

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,21 @@
3838
import org.junit.After;
3939
import org.junit.Assert;
4040
import org.junit.Before;
41+
import org.junit.Ignore;
4142
import org.junit.Test;
4243
import org.mockito.Mockito;
4344

4445
import java.io.File;
4546
import java.io.IOException;
47+
import java.nio.file.FileStore;
48+
import java.nio.file.Files;
49+
import java.nio.file.Path;
4650
import java.util.ArrayList;
4751
import java.util.List;
4852

53+
import static org.apache.iotdb.consensus.iot.IoTConsensusServerImpl.SNAPSHOT_DIR_NAME;
4954
import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
55+
import static org.junit.Assert.assertEquals;
5056

5157
public class IoTDBSnapshotTest {
5258
private String[][] testDataDirs =
@@ -65,11 +71,12 @@ public void tearDown() throws IOException, StorageEngineException {
6571
FileUtils.recursivelyDeleteFolder("target" + File.separator + "tmp");
6672
}
6773

68-
private List<TsFileResource> writeTsFiles() throws IOException, WriteProcessException {
74+
private List<TsFileResource> writeTsFiles(String[] dataDirs)
75+
throws IOException, WriteProcessException {
6976
List<TsFileResource> resources = new ArrayList<>();
7077
for (int i = 0; i < 100; i++) {
7178
String filePath =
72-
testDataDirs[0][i % 3]
79+
dataDirs[i % dataDirs.length]
7380
+ File.separator
7481
+ "sequence"
7582
+ File.separator
@@ -108,7 +115,7 @@ public void testCreateSnapshot()
108115
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
109116
TierManager.getInstance().resetFolders();
110117
try {
111-
List<TsFileResource> resources = writeTsFiles();
118+
List<TsFileResource> resources = writeTsFiles(testDataDirs[0]);
112119
DataRegion region = new DataRegion(testSgName, "0");
113120
region.getTsFileManager().addAll(resources, true);
114121
File snapshotDir = new File("target" + File.separator + "snapshot");
@@ -117,12 +124,12 @@ public void testCreateSnapshot()
117124
new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true);
118125
File[] files =
119126
snapshotDir.listFiles((dir, name) -> name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME));
120-
Assert.assertEquals(1, files.length);
127+
assertEquals(1, files.length);
121128
SnapshotLogAnalyzer analyzer = new SnapshotLogAnalyzer(files[0]);
122129
Assert.assertTrue(analyzer.isSnapshotComplete());
123130
int cnt = analyzer.getTotalFileCountInSnapshot();
124131
analyzer.close();
125-
Assert.assertEquals(200, cnt);
132+
assertEquals(200, cnt);
126133
for (TsFileResource resource : resources) {
127134
Assert.assertTrue(resource.tryWriteLock());
128135
}
@@ -142,7 +149,7 @@ public void testCreateSnapshotWithUnclosedTsFile()
142149
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
143150
TierManager.getInstance().resetFolders();
144151
try {
145-
List<TsFileResource> resources = writeTsFiles();
152+
List<TsFileResource> resources = writeTsFiles(testDataDirs[0]);
146153
resources.subList(50, 100).forEach(x -> x.setStatusForTest(TsFileResourceStatus.UNCLOSED));
147154
DataRegion region = new DataRegion(testSgName, "0");
148155
region.setAllowCompaction(false);
@@ -153,13 +160,13 @@ public void testCreateSnapshotWithUnclosedTsFile()
153160
new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true);
154161
File[] files =
155162
snapshotDir.listFiles((dir, name) -> name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME));
156-
Assert.assertEquals(1, files.length);
163+
assertEquals(1, files.length);
157164
SnapshotLogAnalyzer analyzer = new SnapshotLogAnalyzer(files[0]);
158165
int cnt = 0;
159166
Assert.assertTrue(analyzer.isSnapshotComplete());
160167
cnt = analyzer.getTotalFileCountInSnapshot();
161168
analyzer.close();
162-
Assert.assertEquals(100, cnt);
169+
assertEquals(100, cnt);
163170
for (TsFileResource resource : resources) {
164171
Assert.assertTrue(resource.tryWriteLock());
165172
}
@@ -179,7 +186,7 @@ public void testLoadSnapshot()
179186
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
180187
TierManager.getInstance().resetFolders();
181188
try {
182-
List<TsFileResource> resources = writeTsFiles();
189+
List<TsFileResource> resources = writeTsFiles(testDataDirs[0]);
183190
DataRegion region = new DataRegion(testSgName, "0");
184191
CompressionRatio.getInstance().updateRatio(100, 100, "0");
185192
region.getTsFileManager().addAll(resources, true);
@@ -195,8 +202,8 @@ public void testLoadSnapshot()
195202
.loadSnapshotForStateMachine();
196203
Assert.assertNotNull(dataRegion);
197204
List<TsFileResource> resource = dataRegion.getTsFileManager().getTsFileList(true);
198-
Assert.assertEquals(100, resource.size());
199-
Assert.assertEquals(
205+
assertEquals(100, resource.size());
206+
assertEquals(
200207
new Pair<>(100L, 100L),
201208
CompressionRatio.getInstance().getDataRegionRatioMap().get("0"));
202209
} finally {
@@ -208,6 +215,86 @@ public void testLoadSnapshot()
208215
}
209216
}
210217

218+
/**
 * Manual test (ignored by default): with {@code keepSameDiskWhenLoadingSnapshot} enabled, loads
 * a snapshot stored under {@code D://snapshot_store//} into a tier whose data dirs are
 * {@code C://snapshot_test} and {@code D://snapshot_test}, then asserts that every loaded TsFile
 * reports the same {@link FileStore} as the snapshot directory.
 *
 * <p>NOTE(review): the flag set on the first line is never restored, so it stays {@code true}
 * for any test that runs afterwards in the same JVM.
 */
@Ignore("Need manual execution to specify different disks")
219+
@Test
220+
public void testLoadSnapshotNoHardLink()
221+
throws IOException, WriteProcessException, DirectoryNotLegalException {
222+
IoTDBDescriptor.getInstance().getConfig().setKeepSameDiskWhenLoadingSnapshot(true);
223+
// Initialize the data dirs and the snapshot dir, removing leftovers from earlier runs.
224+
String[][] dataDirsForDB = new String[][] {{"C://snapshot_test", "D://snapshot_test"}};
225+
File snapshotDir = new File("D://snapshot_store//");
226+
if (snapshotDir.exists()) {
227+
FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
228+
}
229+
for (String[] dirs : dataDirsForDB) {
230+
for (String dir : dirs) {
231+
if (new File(dir).exists()) {
232+
FileUtils.recursivelyDeleteFolder(dir);
233+
}
234+
}
235+
}
236+
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(dataDirsForDB);
237+
TierManager.getInstance().resetFolders();
238+
239+
// Prepare the TsFiles; writeTsFiles spreads them across both data dirs.
240+
List<TsFileResource> resources = writeTsFiles(dataDirsForDB[0]);
241+
DataRegion region = new DataRegion(testSgName, "0");
242+
region.getTsFileManager().addAll(resources, true);
243+
244+
// Take a snapshot into a directory that lives on a single disk.
245+
Assert.assertTrue(snapshotDir.exists() || snapshotDir.mkdirs());
246+
try {
247+
Assert.assertTrue(
248+
new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
249+
File[] files =
250+
snapshotDir.listFiles((dir, name) -> name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME));
251+
// Delete the snapshot log so the loader takes its "without log" path.
252+
if (files != null && files.length > 0) {
253+
files[0].delete();
254+
}
255+
// Copy the files from each data dir's internal snapshot dir into the snapshot store
// (simulates a snapshot transfer).
256+
for (String dir : dataDirsForDB[0]) {
257+
File internalSnapshotDir = new File(dir, SNAPSHOT_DIR_NAME);
258+
if (internalSnapshotDir.exists()) {
259+
for (File file : FileUtils.listFilesRecursively(internalSnapshotDir, f -> true)) {
260+
if (file.isFile()) {
261+
String absolutePath = file.getAbsolutePath();
262+
// Keep the part of the path that follows "snapshot_store" as the relative suffix.
// NOTE(review): indexOf returns -1 when the marker is absent; this code does not check.
int snapshotIdIndex = absolutePath.indexOf("snapshot_store");
263+
int suffixIndex = snapshotIdIndex + "snapshot_store".length();
264+
String suffix = absolutePath.substring(suffixIndex);
265+
File snapshotFile = new File(snapshotDir, suffix);
266+
FileUtils.copyFile(file, snapshotFile);
267+
}
268+
}
269+
}
270+
}
271+
272+
// Load the snapshot.
273+
DataRegion dataRegion =
274+
new SnapshotLoader(snapshotDir.getAbsolutePath(), testSgName, "0")
275+
.loadSnapshotForStateMachine();
276+
Assert.assertNotNull(dataRegion);
277+
resources = dataRegion.getTsFileManager().getTsFileList(true);
278+
assertEquals(100, resources.size());
279+
280+
// Every loaded TsFile must be on the same file store as the snapshot dir.
281+
Path snapshotDirPath = snapshotDir.toPath();
282+
FileStore snapshotFileStore = Files.getFileStore(snapshotDirPath);
283+
for (TsFileResource tsFileResource : resources) {
284+
Path tsfilePath = tsFileResource.getTsFile().toPath();
285+
FileStore tsFileFileStore = Files.getFileStore(tsfilePath);
286+
assertEquals(snapshotFileStore, tsFileFileStore);
287+
}
288+
} finally {
289+
// Clean up the snapshot dir and both data dirs regardless of the test outcome.
FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
290+
for (String[] dirs : dataDirsForDB) {
291+
for (String dir : dirs) {
292+
FileUtils.recursivelyDeleteFolder(dir);
293+
}
294+
}
295+
}
296+
}
297+
211298
@Test
212299
public void testGetSnapshotFile() throws IOException {
213300
File tsFile =
@@ -228,7 +315,7 @@ public void testGetSnapshotFile() throws IOException {
228315
Mockito.when(region.getDataRegionIdString()).thenReturn("0");
229316
File snapshotFile =
230317
new SnapshotTaker(region).getSnapshotFilePathForTsFile(tsFile, "test-snapshotId");
231-
Assert.assertEquals(
318+
assertEquals(
232319
new File(
233320
IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs()[0]
234321
+ File.separator

iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1607,6 +1607,12 @@ data_region_iot_max_memory_ratio_for_queue = 0.6
16071607
# Datatype: long
16081608
region_migration_speed_limit_bytes_per_second = 50331648
16091609

1610+
# When loading a snapshot, try to keep TsFiles on the same disk as the snapshot dir.
1611+
# This may reduce file copies, but it may also worsen load balancing across disks.
1612+
# effectiveMode: hot_reload
1613+
# Datatype: boolean
1614+
keep_same_disk_when_loading_snapshot=false
1615+
16101616
####################
16111617
### Blob Allocator Configuration
16121618
####################

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

35+
import java.io.IOException;
36+
import java.nio.file.FileStore;
37+
import java.nio.file.Files;
38+
import java.nio.file.Path;
39+
import java.nio.file.Paths;
3540
import java.util.HashMap;
3641
import java.util.List;
3742
import java.util.Map;
@@ -146,4 +151,21 @@ public <T, E extends Exception> T getNextWithRetry(ThrowingFunction<String, T, E
146151
public List<String> getFolders() {
147152
return folders;
148153
}
154+
155+
/**
 * Finds the first managed folder that is on the same {@link FileStore} as the given path.
 *
 * <p>Folders are checked in the iteration order of {@code folders}; the first one whose file
 * store equals that of {@code pathStr} is returned.
 *
 * @param pathStr path whose file store is used as the reference
 * @return the first matching folder, or {@code null} if none matches or an {@link IOException}
 *     occurs while resolving a file store
 */
public String getFirstFolderOfSameDisk(String pathStr) {
156+
Path path = Paths.get(pathStr);
157+
try {
158+
FileStore fileStore = Files.getFileStore(path);
159+
for (String folder : folders) {
160+
Path folderPath = Paths.get(folder);
161+
// NOTE(review): an IOException for any single folder leaves the loop through the outer
// catch, so later folders are never checked and the method returns null.
FileStore folderFileStore = Files.getFileStore(folderPath);
162+
if (folderFileStore.equals(fileStore)) {
163+
return folder;
164+
}
165+
}
166+
} catch (IOException e) {
167+
// NOTE(review): the message always names pathStr, even when the failing lookup was for one
// of the folders.
logger.warn("Failed to read file store path '" + pathStr + "'", e);
168+
}
169+
return null;
170+
}
149171
}

0 commit comments

Comments
 (0)