Skip to content

Commit f90b9a0

Browse files
author
Alexandros Papadakis
committed
Fixes problems with large snapshots
1 parent d823e8f commit f90b9a0

File tree

3 files changed

+40
-11
lines changed

3 files changed

+40
-11
lines changed

ratis-proto/src/main/proto/Grpc.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ service RaftServerProtocolService {
4444
returns(stream ratis.common.AppendEntriesReplyProto) {}
4545

4646
rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
47-
returns(ratis.common.InstallSnapshotReplyProto) {}
47+
returns(stream ratis.common.InstallSnapshotReplyProto) {}
4848
}
4949

5050
service AdminProtocolService {

ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.File;
2626
import java.io.FileInputStream;
2727
import java.io.IOException;
28+
import java.io.InputStream;
2829
import java.nio.file.Path;
2930
import java.util.Optional;
3031

@@ -65,7 +66,7 @@ public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOE
6566
public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
6667
final long remaining = info.getFileSize() - offset;
6768
final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize;
68-
final ByteString data = ByteString.readFrom(in, chunkLength);
69+
final ByteString data = readChunk(in, chunkLength);
6970

7071
final FileChunkProto proto = FileChunkProto.newBuilder()
7172
.setFilename(relativePath.toString())
@@ -80,6 +81,32 @@ public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
8081
return proto;
8182
}
8283

84+
/**
85+
* Blocks until a chunk of the given size can be made from the stream, or EOF is reached. Calls
86+
* read() repeatedly in case the given stream implementation doesn't completely fill the given
87+
* buffer in one read() call.
88+
*
89+
* @return A chunk of the desired size, or else a chunk as large as was available when end of
90+
* stream was reached. Returns null if the given stream had no more data in it.
91+
*/
92+
private static ByteString readChunk(InputStream in, final int chunkSize) throws IOException {
93+
final byte[] buf = new byte[chunkSize];
94+
int bytesRead = 0;
95+
while (bytesRead < chunkSize) {
96+
final int count = in.read(buf, bytesRead, chunkSize - bytesRead);
97+
if (count == -1) {
98+
break;
99+
}
100+
bytesRead += count;
101+
}
102+
103+
if (bytesRead == 0) {
104+
return null;
105+
}
106+
107+
// Always make a copy since InputStream could steal a reference to buf.
108+
return ByteString.copyFrom(buf, 0, bytesRead);
109+
}
83110
@Override
84111
public void close() throws IOException {
85112
in.close();

ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.io.FileOutputStream;
2222
import java.io.IOException;
2323
import java.nio.channels.FileChannel;
24-
import java.util.UUID;
2524

2625
import org.apache.ratis.io.CorruptedFileException;
2726
import org.apache.ratis.io.MD5Hash;
@@ -61,22 +60,25 @@ public void installSnapshot(StateMachine stateMachine,
6160
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
6261
final RaftStorageDirectory dir = storage.getStorageDir();
6362

64-
// create a unique temporary directory
65-
final File tmpDir = new File(dir.getTmpDir(), UUID.randomUUID().toString());
63+
// create a unique temporary directory based on the request id
64+
final File tmpDir = new File(dir.getTmpDir(), snapshotChunkRequest.getRequestId());
6665
FileUtils.createDirectories(tmpDir);
6766
tmpDir.deleteOnExit();
6867

69-
LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
68+
LOG.info("Installing snapshot:{}, to tmp dir:{}", snapshotChunkRequest.getRequestId(), tmpDir);
7069

7170
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
7271
// and are not lost when whole request cycle is done. Check requestId and requestIndex here
7372

7473
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
74+
LOG.info("Installing chunk :{} with offset{}, to tmp dir:{} for file {}",
75+
chunk.getChunkIndex(), chunk.getOffset(), tmpDir, chunk.getFilename());
7576
SnapshotInfo pi = stateMachine.getLatestSnapshot();
7677
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
7778
throw new IOException("There exists snapshot file "
7879
+ pi.getFiles() + " in " + selfId
79-
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
80+
+ " with endIndex (" + pi.getTermIndex().getIndex()
81+
+ ") >= lastIncludedIndex (" + lastIncludedIndex + ")");
8082
}
8183

8284
String fileName = chunk.getFilename(); // this is relative to the root dir
@@ -130,10 +132,10 @@ public void installSnapshot(StateMachine stateMachine,
130132
}
131133

132134
if (snapshotChunkRequest.getDone()) {
133-
LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
134-
tmpDir, dir.getStateMachineDir());
135-
dir.getStateMachineDir().delete();
136-
tmpDir.renameTo(dir.getStateMachineDir());
135+
LOG.info("Install snapshot is done, moving files from dir:{} to:{}",
136+
tmpDir, dir.getStateMachineDir());
137+
FileUtils.moveDirectory(tmpDir.toPath(), dir.getStateMachineDir().toPath());
138+
FileUtils.deleteFully(tmpDir);
137139
}
138140
}
139141
}

0 commit comments

Comments
 (0)