Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-3155. Improved ozone client flush implementation to make it faster. #716

Merged
merged 6 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ public class BlockOutputStream extends OutputStream {
private final int bytesPerChecksum;
private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
private final int streamBufferSize;
private final long streamBufferFlushSize;
private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
Expand Down Expand Up @@ -131,10 +133,10 @@ public class BlockOutputStream extends OutputStream {
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
long streamBufferFlushSize, long streamBufferMaxSize,
int streamBufferSize, long streamBufferFlushSize,
boolean streamBufferFlushDelay, long streamBufferMaxSize,
BufferPool bufferPool, ChecksumType checksumType,
int bytesPerChecksum)
throws IOException {
int bytesPerChecksum) throws IOException {
this.blockID = new AtomicReference<>(blockID);
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
Expand All @@ -143,8 +145,10 @@ public BlockOutputStream(BlockID blockID,
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.streamBufferSize = streamBufferSize;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.streamBufferFlushDelay = streamBufferFlushDelay;
this.bufferPool = bufferPool;
this.bytesPerChecksum = bytesPerChecksum;

Expand Down Expand Up @@ -434,7 +438,9 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
@Override
public void flush() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0) {
&& bufferPool != null && bufferPool.getSize() > 0
&& (!streamBufferFlushDelay ||
writtenDataLength - totalDataFlushedLength >= streamBufferSize)) {
try {
handleFlush(false);
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -447,7 +453,6 @@ public void flush() throws IOException {
}
}


private void writeChunk(ChunkBuffer buffer)
throws IOException {
// This data in the buffer will be pushed to datanode and a reference will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ public final class OzoneConfigKeys {
public static final TimeDuration OZONE_CLIENT_RETRY_INTERVAL_DEFAULT =
TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);

/**
* If this value is true, when the client calls the flush() method,
* we will checks whether the data in the buffer is greater than
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's change "we will checks" to "it checks"

* OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT. If greater than,
* send the data in the buffer to the datanode.
* */
public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY =
"ozone.client.stream.buffer.flush.delay";
public static final boolean OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
false;

// This defines the overall connection limit for the connection pool used in
// RestClient.
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
Expand Down
9 changes: 9 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,15 @@
<description>Connection timeout for Ozone client in milliseconds.
</description>
</property>
<property>
<name>ozone.client.stream.buffer.flush.delay</name>
<value>false</value>
<tag>OZONE, CLIENT</tag>
<description>If set true, when call flush(),we'll determine whether the
data in the current buffer is greater than ozone.client.stream.buffer.size.
if greater than it we'll sent buffer to the datanode.
</description>
</property>
<property>
<name>ozone.client.stream.buffer.size</name>
<value>4MB</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
private long currentPosition;
private Token<OzoneBlockTokenIdentifier> token;

private final int streamBufferSize;
private final long streamBufferFlushSize;
private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final long watchTimeout;
private BufferPool bufferPool;
Expand All @@ -64,7 +66,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
Pipeline pipeline, String requestId, int chunkSize,
long length, long streamBufferFlushSize, long streamBufferMaxSize,
long length, int streamBufferSize, long streamBufferFlushSize,
boolean streamBufferFlushDelay, long streamBufferMaxSize,
long watchTimeout, BufferPool bufferPool,
ChecksumType checksumType, int bytesPerChecksum,
Token<OzoneBlockTokenIdentifier> token) {
Expand All @@ -77,7 +80,9 @@ private BlockOutputStreamEntry(BlockID blockID, String key,
this.token = token;
this.length = length;
this.currentPosition = 0;
this.streamBufferSize = streamBufferSize;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferFlushDelay = streamBufferFlushDelay;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.bufferPool = bufferPool;
Expand Down Expand Up @@ -110,9 +115,9 @@ private void checkStream() throws IOException {
}
this.outputStream =
new BlockOutputStream(blockID, xceiverClientManager,
pipeline, streamBufferFlushSize,
streamBufferMaxSize, bufferPool, checksumType,
bytesPerChecksum);
pipeline, streamBufferSize, streamBufferFlushSize,
streamBufferFlushDelay, streamBufferMaxSize, bufferPool,
checksumType, bytesPerChecksum);
}
}

Expand Down Expand Up @@ -215,7 +220,9 @@ public static class Builder {
private String requestId;
private int chunkSize;
private long length;
private int streamBufferSize;
private long streamBufferFlushSize;
private boolean streamBufferFlushDelay;
private long streamBufferMaxSize;
private long watchTimeout;
private BufferPool bufferPool;
Expand Down Expand Up @@ -269,11 +276,21 @@ public Builder setLength(long len) {
return this;
}

public Builder setStreamBufferSize(int bufferSize) {
this.streamBufferSize = bufferSize;
return this;
}

public Builder setStreamBufferFlushSize(long bufferFlushSize) {
this.streamBufferFlushSize = bufferFlushSize;
return this;
}

public Builder setStreamBufferFlushDelay(boolean bufferFlushDelay) {
this.streamBufferFlushDelay = bufferFlushDelay;
return this;
}

public Builder setStreamBufferMaxSize(long bufferMaxSize) {
this.streamBufferMaxSize = bufferMaxSize;
return this;
Expand All @@ -297,7 +314,8 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, pipeline, requestId, chunkSize,
length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
length, streamBufferSize, streamBufferFlushSize,
streamBufferFlushDelay, streamBufferMaxSize, watchTimeout,
bufferPool, checksumType, bytesPerChecksum, token);
}
}
Expand Down Expand Up @@ -331,10 +349,18 @@ public long getCurrentPosition() {
return currentPosition;
}

public int getStreamBufferSize() {
return streamBufferSize;
}

public long getStreamBufferFlushSize() {
return streamBufferFlushSize;
}

public boolean getStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}

public long getStreamBufferMaxSize() {
return streamBufferMaxSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class BlockOutputStreamEntryPool {
private final String requestID;
private final int streamBufferSize;
private final long streamBufferFlushSize;
private final boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final long watchTimeout;
private final long blockSize;
Expand All @@ -75,7 +76,8 @@ public class BlockOutputStreamEntryPool {
public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
HddsProtos.ReplicationType type,
int bufferSize, long bufferFlushSize, long bufferMaxSize,
int bufferSize, long bufferFlushSize,
boolean bufferFlushDelay, long bufferMaxSize,
long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
int bytesPerChecksum, String uploadID, int partNumber,
boolean isMultipart, OmKeyInfo info,
Expand All @@ -93,6 +95,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
this.requestID = requestId;
this.streamBufferSize = bufferSize;
this.streamBufferFlushSize = bufferFlushSize;
this.streamBufferFlushDelay = bufferFlushDelay;
this.streamBufferMaxSize = bufferMaxSize;
this.blockSize = size;
this.watchTimeout = watchTimeout;
Expand Down Expand Up @@ -137,6 +140,7 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
requestID = null;
streamBufferSize = 0;
streamBufferFlushSize = 0;
streamBufferFlushDelay = false;
streamBufferMaxSize = 0;
bufferPool = new BufferPool(chunkSize, 1);
watchTimeout = 0;
Expand Down Expand Up @@ -188,7 +192,9 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
.setRequestId(requestID)
.setChunkSize(chunkSize)
.setLength(subKeyInfo.getLength())
.setStreamBufferSize(streamBufferSize)
.setStreamBufferFlushSize(streamBufferFlushSize)
.setStreamBufferFlushDelay(streamBufferFlushDelay)
.setStreamBufferMaxSize(streamBufferMaxSize)
.setWatchTimeout(watchTimeout)
.setbufferPool(bufferPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,16 @@ public KeyOutputStream(OpenKeySession handler,
XceiverClientManager xceiverClientManager,
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
int bufferSize, long bufferFlushSize, long bufferMaxSize,
long size, long watchTimeout,
int bufferSize, long bufferFlushSize, boolean isBufferFlushDelay,
long bufferMaxSize, long size, long watchTimeout,
ChecksumType checksumType, int bytesPerChecksum,
String uploadID, int partNumber, boolean isMultipart,
int maxRetryCount, long retryInterval) {
OmKeyInfo info = handler.getKeyInfo();
blockOutputStreamEntryPool =
new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
type, bufferSize, bufferFlushSize, bufferMaxSize, size,
type, bufferSize, bufferFlushSize, isBufferFlushDelay,
bufferMaxSize, size,
watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber,
isMultipart, info, xceiverClientManager, handler.getId());
// Retrieve the file encryption key info, null if file is not in
Expand Down Expand Up @@ -542,6 +543,7 @@ public static class Builder {
private ReplicationFactor factor;
private int streamBufferSize;
private long streamBufferFlushSize;
private boolean streamBufferFlushDelay;
private long streamBufferMaxSize;
private long blockSize;
private long watchTimeout;
Expand Down Expand Up @@ -608,6 +610,11 @@ public Builder setStreamBufferFlushSize(long size) {
return this;
}

public Builder setStreamBufferFlushDelay(boolean isDelay) {
this.streamBufferFlushDelay = isDelay;
return this;
}

public Builder setStreamBufferMaxSize(long size) {
this.streamBufferMaxSize = size;
return this;
Expand Down Expand Up @@ -646,7 +653,8 @@ public Builder setRetryInterval(long retryIntervalInMS) {
public KeyOutputStream build() {
return new KeyOutputStream(openHandler, xceiverManager, omClient,
chunkSize, requestID, factor, type,
streamBufferSize, streamBufferFlushSize, streamBufferMaxSize,
streamBufferSize, streamBufferFlushSize, streamBufferFlushDelay,
streamBufferMaxSize,
blockSize, watchTimeout, checksumType,
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
maxRetryCount, retryInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class RpcClient implements ClientProtocol {
private final ACLType groupRights;
private final int streamBufferSize;
private final long streamBufferFlushSize;
private boolean streamBufferFlushDelay;
private final long streamBufferMaxSize;
private final long blockSize;
private final ClientId clientId = ClientId.randomId();
Expand Down Expand Up @@ -185,6 +186,9 @@ public RpcClient(Configuration conf, String omServiceId) throws IOException {
.getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT,
StorageUnit.BYTES);
streamBufferFlushDelay = conf.getBoolean(
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY,
OzoneConfigKeys.OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
streamBufferMaxSize = (long) conf
.getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT,
Expand Down Expand Up @@ -1192,6 +1196,7 @@ private OzoneOutputStream createOutputStream(OpenKeySession openKey,
.setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
.setStreamBufferSize(streamBufferSize)
.setStreamBufferFlushSize(streamBufferFlushSize)
.setStreamBufferFlushDelay(streamBufferFlushDelay)
.setStreamBufferMaxSize(streamBufferMaxSize)
.setBlockSize(blockSize)
.setChecksumType(checksumType)
Expand Down