Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
Expand Down Expand Up @@ -84,7 +86,7 @@ public final void updateProperties(@Nullable String filePath) throws IOException
return;
}
try (InputStream confInput = Files.newInputStream(Paths.get(filePath))) {
properties.load(confInput);
properties.load(new InputStreamReader(confInput, StandardCharsets.UTF_8));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;

Expand Down Expand Up @@ -105,7 +107,7 @@ private void loadProps() {
try (InputStream inputStream = url.openStream()) {

LOGGER.info("Start to read config file {}", url);
commonProperties.load(inputStream);
commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));

} catch (FileNotFoundException e) {
LOGGER.error("Fail to find config file {}, reject ConfigNode startup.", url, e);
Expand All @@ -128,7 +130,7 @@ private void loadProps() {
try (InputStream inputStream = url.openStream()) {
LOGGER.info("start reading ConfigNode conf file: {}", url);
Properties properties = new Properties();
properties.load(inputStream);
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
commonProperties.putAll(properties);
loadProperties(commonProperties);
} catch (IOException | BadNodeUrlException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -64,7 +66,7 @@ public TConfigNodeLocation removeCheck(String args) {
return nodeLocation;
}
try (FileInputStream inputStream = new FileInputStream(systemPropertiesFile)) {
systemProperties.load(inputStream);
systemProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
if (isNumeric(args)) {
int id = Integer.parseInt(args);
nodeLocation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -414,7 +417,7 @@ private static synchronized Properties getSystemProperties() throws IOException

Properties systemProperties = new Properties();
try (FileInputStream inputStream = new FileInputStream(systemPropertiesFile)) {
systemProperties.load(inputStream);
systemProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
}
return systemProperties;
}
Expand All @@ -423,7 +426,7 @@ private static synchronized void storeSystemProperties(Properties systemProperti
throws IOException {
try (FileOutputStream fileOutputStream = new FileOutputStream(systemPropertiesFile)) {
systemProperties.store(
fileOutputStream,
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8),
" THIS FILE IS AUTOMATICALLY GENERATED. PLEASE DO NOT MODIFY THIS FILE !!!");
fileOutputStream.getFD().sync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -163,7 +165,7 @@ private void loadProps() {
if (url != null) {
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to read config file {}", url);
commonProperties.load(inputStream);
commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
} catch (FileNotFoundException e) {
LOGGER.error("Fail to find config file {}, reject DataNode startup.", url, e);
System.exit(-1);
Expand All @@ -184,7 +186,7 @@ private void loadProps() {
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to read config file {}", url);
Properties properties = new Properties();
properties.load(inputStream);
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
commonProperties.putAll(properties);
loadProperties(commonProperties);
} catch (FileNotFoundException e) {
Expand Down Expand Up @@ -1640,7 +1642,7 @@ public void loadHotModifiedProps() throws QueryProcessException {
Properties commonProperties = new Properties();
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to reload config file {}", url);
commonProperties.load(inputStream);
commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
} catch (Exception e) {
LOGGER.warn("Fail to reload config file {}", url, e);
throw new QueryProcessException(
Expand All @@ -1655,7 +1657,7 @@ public void loadHotModifiedProps() throws QueryProcessException {
try (InputStream inputStream = url.openStream()) {
LOGGER.info("Start to reload config file {}", url);
Properties properties = new Properties();
properties.load(inputStream);
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
commonProperties.putAll(properties);
loadHotModifiedProps(commonProperties);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -176,7 +178,8 @@ public boolean checkIsFirstStart() throws IOException {
// write properties to system.properties
try (FileOutputStream outputStream = new FileOutputStream(propertiesFile)) {
systemProperties.forEach((k, v) -> properties.setProperty(k, v.get()));
properties.store(outputStream, SYSTEM_PROPERTIES_STRING);
properties.store(
new OutputStreamWriter(outputStream, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING);
}
isFirstStart = true;
return true;
Expand Down Expand Up @@ -266,7 +269,8 @@ public void checkSystemConfig() throws ConfigurationException, IOException {
// overwrite system.properties when first start
try (FileOutputStream outputStream = new FileOutputStream(propertiesFile)) {
systemProperties.forEach((k, v) -> properties.setProperty(k, v.get()));
properties.store(outputStream, SYSTEM_PROPERTIES_STRING);
properties.store(
new OutputStreamWriter(outputStream, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING);
}
if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
&& config.getWalMode().equals(WALMode.DISABLE)) {
Expand Down Expand Up @@ -309,7 +313,8 @@ private void upgradePropertiesFileFromBrokenFile() throws IOException {
});
properties.setProperty(IOTDB_VERSION_STRING, IoTDBConstant.VERSION);
properties.setProperty(COMMIT_ID_STRING, IoTDBConstant.BUILD_INFO);
properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
properties.store(
new OutputStreamWriter(tmpFOS, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING);
// upgrade finished, delete old system.properties file
if (propertiesFile.exists()) {
Files.delete(propertiesFile.toPath());
Expand Down Expand Up @@ -380,7 +385,8 @@ public void serializeClusterNameAndDataNodeId(String clusterName, int dataNodeId
try {
properties.setProperty(IoTDBConstant.CLUSTER_NAME, clusterName);
properties.setProperty(DATA_NODE_ID, String.valueOf(dataNodeId));
properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
properties.store(
new OutputStreamWriter(tmpFOS, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING);
// serialize finished, delete old system.properties file
if (propertiesFile.exists()) {
Files.delete(propertiesFile.toPath());
Expand Down Expand Up @@ -417,7 +423,8 @@ public void serializeMutableSystemPropertiesIfNecessary() throws IOException {
if (needsSerialize) {
try (FileOutputStream outputStream = new FileOutputStream(propertiesFile)) {
systemProperties.forEach((k, v) -> properties.setProperty(k, v.get()));
properties.store(outputStream, SYSTEM_PROPERTIES_STRING);
properties.store(
new OutputStreamWriter(outputStream, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING);
}
}
long endTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class IoTDBRestServiceDescriptor {
Expand Down Expand Up @@ -64,7 +66,7 @@ private Properties loadProps(String configName) {
try (InputStream inputStream = url.openStream()) {
logger.info("Start to read config file {}", url);
Properties properties = new Properties();
properties.load(inputStream);
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
return properties;
} catch (FileNotFoundException e) {
logger.warn("REST service fail to find config file {}", url, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.BytesUtils;
Expand Down Expand Up @@ -135,7 +136,9 @@ private void receive() throws IOException {
req);
fail();
}
} catch (Exception e) {
} catch (final PipeConnectionException e) {
LOGGER.info("Socket closed when listening to data. Because: {}", e.getMessage());
} catch (final Exception e) {
LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
fail();
}
Expand Down Expand Up @@ -167,7 +170,7 @@ private boolean checkSum(byte[] bytes) {
private byte[] readData(InputStream inputStream) throws IOException {
final int length = readLength(inputStream);

if (length == 0) {
if (length <= 0) {
// Will fail() after checkSum()
return new byte[0];
}
Expand Down Expand Up @@ -207,18 +210,48 @@ private int readLength(InputStream inputStream) throws IOException {
: 0;
}

private void readTillFull(InputStream inputStream, byte[] readBuffer) throws IOException {
/**
* Read to the buffer until it is full.
*
* @param inputStream the input socket stream
* @param readBuffer the buffer to read into
* @throws IOException if any IOException occurs
* @throws PipeConnectionException if the socket is closed during listening
*/
private void readTillFull(final InputStream inputStream, final byte[] readBuffer)
throws IOException, PipeConnectionException {
int alreadyReadBytes = 0;
while (alreadyReadBytes < readBuffer.length) {
alreadyReadBytes +=
final int readBytes =
inputStream.read(readBuffer, alreadyReadBytes, readBuffer.length - alreadyReadBytes);
// In socket input stream readBytes == -1 indicates EOF, namely the
// socket is closed
if (readBytes == -1) {
throw new PipeConnectionException("Socket closed when executing readTillFull.");
}
alreadyReadBytes += readBytes;
}
}

private void skipTillEnough(InputStream inputStream, long length) throws IOException {
int currentSkippedBytes = 0;
/**
* Skip given number of bytes of the buffer until enough bytes is skipped.
*
* @param inputStream the input socket stream
* @param length the length to skip
* @throws IOException if any IOException occurs
* @throws PipeConnectionException if the socket is closed during skipping
*/
private void skipTillEnough(final InputStream inputStream, final long length)
throws IOException, PipeConnectionException {
long currentSkippedBytes = 0;
while (currentSkippedBytes < length) {
currentSkippedBytes += (int) inputStream.skip(length - currentSkippedBytes);
final long skippedBytes = inputStream.skip(length - currentSkippedBytes);
// In socket input stream skippedBytes == 0 indicates EOF, namely the
// socket is closed
if (skippedBytes == 0) {
throw new PipeConnectionException("Socket closed when executing skipTillEnough.");
}
currentSkippedBytes += skippedBytes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -124,12 +127,12 @@ public boolean updateConfigNodeList(List<TEndPoint> latestConfigNodes) {
private void storeConfigNode() throws IOException {
Properties properties = new Properties();
try (FileInputStream inputStream = new FileInputStream(propertiesFile)) {
properties.load(inputStream);
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
}
properties.setProperty(
CONFIG_NODE_LIST, NodeUrlUtils.convertTEndPointUrls(new ArrayList<>(onlineConfigNodes)));
try (FileOutputStream fileOutputStream = new FileOutputStream(propertiesFileTmp)) {
properties.store(fileOutputStream, "");
properties.store(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8), "");
}
updatePropertiesFile();
}
Expand All @@ -151,7 +154,7 @@ public void loadConfigNodeList() {
configNodeInfoReadWriteLock.writeLock().lock();
Properties properties = new Properties();
try (FileInputStream inputStream = new FileInputStream(propertiesFile)) {
properties.load(inputStream);
properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ public class CommonConfig {
private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 1024 * 1024; // 2MB
private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
private long pipeListeningQueueTransferSnapshotThreshold = 1000;

private int subscriptionSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
Expand Down Expand Up @@ -951,6 +952,15 @@ public void setPipeLeaderCacheMemoryUsagePercentage(float pipeLeaderCacheMemoryU
this.pipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage;
}

public long getPipeListeningQueueTransferSnapshotThreshold() {
return pipeListeningQueueTransferSnapshotThreshold;
}

public void setPipeListeningQueueTransferSnapshotThreshold(
long pipeListeningQueueTransferSnapshotThreshold) {
this.pipeListeningQueueTransferSnapshotThreshold = pipeListeningQueueTransferSnapshotThreshold;
}

public int getSubscriptionSubtaskExecutorMaxThreadNum() {
return subscriptionSubtaskExecutorMaxThreadNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ private void loadPipeProps(Properties properties) {
properties.getProperty(
"pipe_leader_cache_memory_usage_percentage",
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
config.setPipeListeningQueueTransferSnapshotThreshold(
Long.parseLong(
properties.getProperty(
"pipe_listening_queue_transfer_snapshot_threshold",
String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
}

private void loadSubscriptionProps(Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
package org.apache.iotdb.commons.conf;

import java.io.File;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
Expand All @@ -34,7 +37,10 @@ private IoTDBConstant() {}
Properties prop = new Properties();
String finalBuildInfo = "UNKNOWN";
try {
prop.load(IoTDBConstant.class.getResourceAsStream("/git.properties"));
prop.load(
new InputStreamReader(
Objects.requireNonNull(IoTDBConstant.class.getResourceAsStream("/git.properties")),
StandardCharsets.UTF_8));
finalBuildInfo = prop.getProperty("git.commit.id.abbrev", "UNKNOWN");
String isDirty = prop.getProperty("git.dirty", "false");
if (isDirty.equalsIgnoreCase("true")) {
Expand Down
Loading