Skip to content

Commit 94f5ead

Browse files
author
Alexandros Papadakis
committed
Fixes problems with large snapshots
1 parent f0942d0 commit 94f5ead

File tree

4 files changed

+42
-11
lines changed

4 files changed

+42
-11
lines changed

ratis-proto/src/main/proto/Grpc.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ service RaftServerProtocolService {
4444
returns(stream ratis.common.AppendEntriesReplyProto) {}
4545

4646
rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
47-
returns(ratis.common.InstallSnapshotReplyProto) {}
47+
returns(stream ratis.common.InstallSnapshotReplyProto) {}
4848
}
4949

5050
service AdminProtocolService {

ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,9 @@ private synchronized void waitForCommit() throws InterruptedException {
206206
}
207207

208208
private void reload() throws IOException {
209-
Preconditions.assertTrue(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
209+
LifeCycle.State current = stateMachine.getLifeCycleState();
210+
211+
Preconditions.assertTrue(current == LifeCycle.State.NEW || current == LifeCycle.State.PAUSED);
210212

211213
stateMachine.reinitialize();
212214

ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.File;
2626
import java.io.FileInputStream;
2727
import java.io.IOException;
28+
import java.io.InputStream;
2829
import java.nio.file.Path;
2930
import java.util.Optional;
3031

@@ -65,7 +66,7 @@ public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOE
6566
public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
6667
final long remaining = info.getFileSize() - offset;
6768
final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize;
68-
final ByteString data = ByteString.readFrom(in, chunkLength);
69+
final ByteString data = readChunk(in, chunkLength);
6970

7071
final FileChunkProto proto = FileChunkProto.newBuilder()
7172
.setFilename(relativePath.toString())
@@ -80,6 +81,32 @@ public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException {
8081
return proto;
8182
}
8283

84+
/**
85+
* Blocks until a chunk of the given size can be made from the stream, or EOF is reached. Calls
86+
* read() repeatedly in case the given stream implementation doesn't completely fill the given
87+
* buffer in one read() call.
88+
*
89+
* @return A chunk of the desired size, or else a chunk as large as was available when end of
90+
* stream was reached. Returns null if the given stream had no more data in it.
91+
*/
92+
private static ByteString readChunk(InputStream in, final int chunkSize) throws IOException {
93+
final byte[] buf = new byte[chunkSize];
94+
int bytesRead = 0;
95+
while (bytesRead < chunkSize) {
96+
final int count = in.read(buf, bytesRead, chunkSize - bytesRead);
97+
if (count == -1) {
98+
break;
99+
}
100+
bytesRead += count;
101+
}
102+
103+
if (bytesRead == 0) {
104+
return null;
105+
}
106+
107+
// Always make a copy since InputStream could steal a reference to buf.
108+
return ByteString.copyFrom(buf, 0, bytesRead);
109+
}
83110
@Override
84111
public void close() throws IOException {
85112
in.close();

ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.io.FileOutputStream;
2222
import java.io.IOException;
2323
import java.nio.channels.FileChannel;
24-
import java.util.UUID;
2524

2625
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2726
import org.apache.ratis.io.CorruptedFileException;
@@ -63,22 +62,25 @@ public void installSnapshot(StateMachine stateMachine,
6362
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
6463
final RaftStorageDirectory dir = storage.getStorageDir();
6564

66-
// create a unique temporary directory
67-
final File tmpDir = new File(dir.getTmpDir(), UUID.randomUUID().toString());
65+
// create a unique temporary directory based on the request id
66+
final File tmpDir = new File(dir.getTmpDir(), snapshotChunkRequest.getRequestId());
6867
FileUtils.createDirectories(tmpDir);
6968
tmpDir.deleteOnExit();
7069

71-
LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir);
70+
LOG.info("Installing snapshot:{}, to tmp dir:{}", snapshotChunkRequest.getRequestId(), tmpDir);
7271

7372
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
7473
// and are not lost when whole request cycle is done. Check requestId and requestIndex here
7574

7675
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
76+
LOG.info("Installing chunk :{} with offset{}, to tmp dir:{} for file {}",
77+
chunk.getChunkIndex(), chunk.getOffset(), tmpDir, chunk.getFilename());
7778
SnapshotInfo pi = stateMachine.getLatestSnapshot();
7879
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
7980
throw new IOException("There exists snapshot file "
8081
+ pi.getFiles() + " in " + selfId
81-
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
82+
+ " with endIndex (" + pi.getTermIndex().getIndex()
83+
+ ") >= lastIncludedIndex (" + lastIncludedIndex + ")");
8284
}
8385

8486
String fileName = chunk.getFilename(); // this is relative to the root dir
@@ -132,10 +134,10 @@ public void installSnapshot(StateMachine stateMachine,
132134
}
133135

134136
if (snapshotChunkRequest.getDone()) {
135-
LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
137+
LOG.info("Install snapshot is done, moving files from dir:{} to:{}",
136138
tmpDir, dir.getStateMachineDir());
137-
dir.getStateMachineDir().delete();
138-
tmpDir.renameTo(dir.getStateMachineDir());
139+
FileUtils.moveDirectory(tmpDir.toPath(), dir.getStateMachineDir().toPath());
140+
FileUtils.deleteFully(tmpDir);
139141
}
140142
}
141143
}

0 commit comments

Comments
 (0)