Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,9 @@ private Config() {}

public static boolean rpcThriftCompressionEnable = false;

/** thrift init buffer size, 1KB by default */
public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
/** key of thrift default buffer size */
public static final String DEFAULT_BUFFER_CAPACITY = "thrift_default_buffer_capacity";

public static final String INITIAL_BUFFER_CAPACITY = "initial_buffer_capacity";

/** thrift max frame size (16384000 bytes by default), we change it to 64MB */
public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;

public static final String MAX_FRAME_SIZE = "max_frame_size";
/** key of thrift max frame size */
public static final String THRIFT_FRAME_MAX_SIZE = "thrift_max_frame_size";
}
4 changes: 2 additions & 2 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ public void setClient(TSIService.Iface client) {
}

private void openTransport() throws TTransportException {
RpcTransportFactory.setInitialBufferCapacity(params.getInitialBufferCapacity());
RpcTransportFactory.setMaxLength(params.getMaxFrameSize());
RpcTransportFactory.setDefaultBufferCapacity(params.getThriftDefaultBufferSize());
RpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
transport =
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(params.getHost(), params.getPort(), Config.DEFAULT_CONNECTION_TIMEOUT_MS));
Expand Down
22 changes: 12 additions & 10 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iotdb.jdbc;

import org.apache.iotdb.rpc.RpcUtils;

public class IoTDBConnectionParams {

private String host = Config.IOTDB_DEFAULT_HOST;
Expand All @@ -27,8 +29,8 @@ public class IoTDBConnectionParams {
private String username = Config.DEFAULT_USER;
private String password = Config.DEFAULT_PASSWORD;

private int initialBufferCapacity = Config.DEFAULT_INITIAL_BUFFER_CAPACITY;
private int maxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE;
private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
private int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;

public IoTDBConnectionParams(String url) {
this.jdbcUriString = url;
Expand Down Expand Up @@ -82,19 +84,19 @@ public void setPassword(String password) {
this.password = password;
}

public int getInitialBufferCapacity() {
return initialBufferCapacity;
public int getThriftDefaultBufferSize() {
return thriftDefaultBufferSize;
}

public void setInitialBufferCapacity(int initialBufferCapacity) {
this.initialBufferCapacity = initialBufferCapacity;
public void setThriftDefaultBufferSize(int thriftDefaultBufferSize) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
}

public int getMaxFrameSize() {
return maxFrameSize;
public int getThriftMaxFrameSize() {
return thriftMaxFrameSize;
}

public void setMaxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
public void setThriftMaxFrameSize(int thriftMaxFrameSize) {
this.thriftMaxFrameSize = thriftMaxFrameSize;
}
}
11 changes: 6 additions & 5 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ static IoTDBConnectionParams parseUrl(String url, Properties info) throws IoTDBU
if (info.containsKey(Config.AUTH_PASSWORD)) {
params.setPassword(info.getProperty(Config.AUTH_PASSWORD));
}
if (info.containsKey(Config.INITIAL_BUFFER_CAPACITY)) {
params.setInitialBufferCapacity(
Integer.parseInt(info.getProperty(Config.INITIAL_BUFFER_CAPACITY)));
if (info.containsKey(Config.DEFAULT_BUFFER_CAPACITY)) {
params.setThriftDefaultBufferSize(
Integer.parseInt(info.getProperty(Config.DEFAULT_BUFFER_CAPACITY)));
}
if (info.containsKey(Config.MAX_FRAME_SIZE)) {
params.setMaxFrameSize(Integer.parseInt(info.getProperty(Config.MAX_FRAME_SIZE)));
if (info.containsKey(Config.THRIFT_FRAME_MAX_SIZE)) {
params.setThriftMaxFrameSize(
Integer.parseInt(info.getProperty(Config.THRIFT_FRAME_MAX_SIZE)));
}

return params;
Expand Down
15 changes: 7 additions & 8 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand Down Expand Up @@ -616,11 +617,9 @@ public class IoTDBConfig {
// time in nanosecond precision when starting up
private long startUpNanosecond = System.nanoTime();

/** thrift max frame size, the default is 15MB, we change it to 64MB */
private int thriftMaxFrameSize = 67108864;
private int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;

/** thrift init buffer size, the default is 1KB. */
private int thriftInitBufferSize = 1024;
private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;

/** time interval in minute for calculating query frequency */
private int frequencyIntervalInMinute = 1;
Expand Down Expand Up @@ -1945,12 +1944,12 @@ public void setThriftMaxFrameSize(int thriftMaxFrameSize) {
this.thriftMaxFrameSize = thriftMaxFrameSize;
}

public int getThriftInitBufferSize() {
return thriftInitBufferSize;
public int getThriftDefaultBufferSize() {
return thriftDefaultBufferSize;
}

public void setThriftInitBufferSize(int thriftInitBufferSize) {
this.thriftInitBufferSize = thriftInitBufferSize;
public void setThriftDefaultBufferSize(int thriftDefaultBufferSize) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
}

public int getMaxQueryDeduplicatedPathNum() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,10 +625,10 @@ private void loadProps() {
conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2);
}

conf.setThriftInitBufferSize(
conf.setThriftDefaultBufferSize(
Integer.parseInt(
properties.getProperty(
"thrift_init_buffer_size", String.valueOf(conf.getThriftInitBufferSize()))));
"thrift_init_buffer_size", String.valueOf(conf.getThriftDefaultBufferSize()))));

conf.setFrequencyIntervalInMinute(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ private void checkRecovery() throws IOException {

@Override
public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
RpcTransportFactory.setInitialBufferCapacity(ioTDBConfig.getThriftInitBufferSize());
RpcTransportFactory.setMaxLength(ioTDBConfig.getThriftMaxFrameSize());
RpcTransportFactory.setDefaultBufferCapacity(ioTDBConfig.getThriftDefaultBufferSize());
RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
transport =
RpcTransportFactory.INSTANCE.getTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
TProtocol protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public class RpcTransportFactory extends TTransportFactory {
public static boolean USE_SNAPPY = false;
public static final RpcTransportFactory INSTANCE;

private static int initialBufferCapacity = RpcUtils.DEFAULT_BUF_CAPACITY;
private static int maxLength = RpcUtils.DEFAULT_MAX_LENGTH;
private static int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
private static int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;

static {
INSTANCE =
USE_SNAPPY
? new RpcTransportFactory(new TimeoutChangeableTSnappyFramedTransport.Factory())
: new RpcTransportFactory(new Factory(initialBufferCapacity, maxLength));
: new RpcTransportFactory(new Factory(thriftDefaultBufferSize, thriftMaxFrameSize));
}

private TTransportFactory inner;
Expand All @@ -60,11 +60,11 @@ public static void setUseSnappy(boolean useSnappy) {
USE_SNAPPY = useSnappy;
}

public static void setInitialBufferCapacity(int initialBufferCapacity) {
RpcTransportFactory.initialBufferCapacity = initialBufferCapacity;
public static void setDefaultBufferCapacity(int thriftDefaultBufferSize) {
RpcTransportFactory.thriftDefaultBufferSize = thriftDefaultBufferSize;
}

public static void setMaxLength(int maxLength) {
RpcTransportFactory.maxLength = maxLength;
public static void setThriftMaxFrameSize(int thriftMaxFrameSize) {
RpcTransportFactory.thriftMaxFrameSize = thriftMaxFrameSize;
}
}
10 changes: 4 additions & 6 deletions service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@

public class RpcUtils {

/** How big should the default read and write buffers be? */
public static final int DEFAULT_BUF_CAPACITY = 64 * 1024;
/** How big is the largest allowable frame? Defaults to 16MB. */
public static final int DEFAULT_MAX_LENGTH = 16384000;
/** How big should the default read and write buffers be? Defaults to 64KB */
public static final int THRIFT_DEFAULT_BUF_CAPACITY = 64 * 1024;
/**
* It is used to prevent the size of the parsing package from being too large and allocating the
* buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
* reading. The default value is 512MB
* reading. Thrift max frame size (16384000 bytes by default), we change it to 512MB.
*/
public static final int FRAME_HARD_MAX_LENGTH = 536870912;
public static final int THRIFT_FRAME_MAX_SIZE = 536870912;

/**
* if resizeIfNecessary is called continuously with a small size for more than
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
private AutoScalingBufferReadTransport readCompressBuffer;

protected TCompressedElasticFramedTransport(
TTransport underlying, int initialBufferCapacity, int softMaxLength) {
super(underlying, initialBufferCapacity, softMaxLength);
writeCompressBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
readCompressBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
TTransport underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) {
super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize);
writeCompressBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
readCompressBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
}

@Override
Expand Down Expand Up @@ -80,8 +80,8 @@ public void flush() throws TTransportException {
}

writeBuffer.reset();
if (softMaxLength < length) {
writeBuffer.resizeIfNecessary(softMaxLength);
if (thriftDefaultBufferSize < length) {
writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
}
underlying.flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,55 +23,56 @@
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;

import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_BUF_CAPACITY;
import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_MAX_LENGTH;

public class TElasticFramedTransport extends TTransport {

public static class Factory extends TTransportFactory {

protected final int initialCapacity;
protected final int softMaxLength;
/**
* It is used to prevent the size of the parsing package from being too large and allocating the
* buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
* reading.
*/
protected final int thriftMaxFrameSize;

/**
* The capacity of the underlying buffer is allowed to exceed thriftDefaultBufferSize, but if
* adjacent requests all have sizes smaller than thriftDefaultBufferSize, the underlying buffer
* will be shrunk beneath thriftDefaultBufferSize. The shrinking is limited at most once per
* minute to reduce overhead when thriftDefaultBufferSize is set unreasonably or the workload
* naturally contains both ver large and very small requests.
*/
protected final int thriftDefaultBufferSize;

public Factory() {
this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
}

public Factory(int initialCapacity) {
this(initialCapacity, DEFAULT_MAX_LENGTH);
this(RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE);
}

public Factory(int initialCapacity, int softMaxLength) {
this.initialCapacity = initialCapacity;
this.softMaxLength = softMaxLength;
public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
}

@Override
public TTransport getTransport(TTransport trans) {
return new TElasticFramedTransport(trans, initialCapacity, softMaxLength);
return new TElasticFramedTransport(trans, thriftDefaultBufferSize, thriftMaxFrameSize);
}
}

public TElasticFramedTransport(TTransport underlying) {
this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE);
}

public TElasticFramedTransport(
TTransport underlying, int initialBufferCapacity, int softMaxLength) {
TTransport underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize) {
this.underlying = underlying;
this.softMaxLength = softMaxLength;
readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
}

/**
* The capacity of the underlying buffer is allowed to exceed maxSoftLength, but if adjacent
* requests all have sizes smaller than maxSoftLength, the underlying buffer will be shrunk
* beneath maxSoftLength. The shrinking is limited at most once per minute to reduce overhead when
* maxSoftLength is set unreasonably or the workload naturally contains both ver large and very
* small requests.
*/
protected final int softMaxLength;
protected final int thriftDefaultBufferSize;
protected final int thriftMaxFrameSize;

protected final TTransport underlying;
protected AutoScalingBufferReadTransport readBuffer;
Expand Down Expand Up @@ -115,19 +116,11 @@ protected void readFrame() throws TTransportException {
TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
}

if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
if (size > thriftMaxFrameSize) {
close();
throw new TTransportException(
TTransportException.CORRUPTED_DATA,
"Frame size ("
+ size
+ ") larger than protect max length ("
+ RpcUtils.FRAME_HARD_MAX_LENGTH
+ ")!");
}

if (size < softMaxLength) {
readBuffer.resizeIfNecessary(softMaxLength);
"Frame size (" + size + ") larger than protect max size (" + thriftMaxFrameSize + ")!");
}
readBuffer.fill(underlying, size);
}
Expand All @@ -139,8 +132,8 @@ public void flush() throws TTransportException {
underlying.write(i32buf, 0, 4);
underlying.write(writeBuffer.getBuffer(), 0, length);
writeBuffer.reset();
if (length > softMaxLength) {
writeBuffer.resizeIfNecessary(softMaxLength);
if (length > thriftDefaultBufferSize) {
writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
}
underlying.flush();
}
Expand Down
Loading