diff --git a/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts b/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts index e4a00e0d0..9fe10e08b 100644 --- a/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts +++ b/src/.vuepress/sidebar_timecho/V1.3.3/zh.ts @@ -137,7 +137,12 @@ export const zhSidebar = { prefix: 'API/', // children: 'structure', children: [ - { text: 'Java原生接口', link: 'Programming-Java-Native-API' }, + { text: 'Java原生接口', collapsible: true, + children: [ + { text: 'Java原生API', link: 'Programming-Java-Native-API' }, + { text: '数据订阅API', link: 'Programming-Data-Sync' }, + ], + }, { text: 'Python原生接口', link: 'Programming-Python-Native-API' }, { text: 'C++原生接口', link: 'Programming-Cpp-Native-API' }, { text: 'Go原生接口', link: 'Programming-Go-Native-API' }, diff --git a/src/zh/UserGuide/latest/API/Programming-Data-Sync.md b/src/zh/UserGuide/latest/API/Programming-Data-Sync.md new file mode 100644 index 000000000..76eebf23c --- /dev/null +++ b/src/zh/UserGuide/latest/API/Programming-Data-Sync.md @@ -0,0 +1,247 @@ + + +# 数据订阅API +IoTDB 提供了强大的数据订阅功能,允许用户通过订阅SDK实时获取IoTDB新增的数据。详细的功能定义及介绍:[数据订阅](../../User-Manual/Data-Sync_timecho.md#数据同步) + +## 1 核心步骤 + +1. 创建Topic:创建一个Topic,Topic中包含希望订阅的测点。 +2. 订阅Topic:在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败。同一个 consumer group 下的 consumers 会均分数据。 +3. 消费数据:只有显式订阅了某个 topic,才会收到对应 topic 的数据。 +4. 取消订阅: consumer close 时会退出对应的 consumer group,同时取消现存的所有订阅。 + + +## 2 详细步骤 +本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [全量接口说明](./Programming-Java-Native-API.md#全量接口说明) + + +### 2.1 创建maven项目 +创建一个maven项目,并导入以下依赖(JDK >= 1.8, Maven >= 3.6) + +```xml + + + org.apache.iotdb + iotdb-session + + ${project.version} + + +``` + +### 2.2 代码案例 +#### 2.2.1 Topic操作 +```java +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.model.Topic; + +public class DataConsumerExample { + + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { + try (SubscriptionSession session = new SubscriptionSession("127.0.0.1", 6667)) { + // 1. open session + session.open(); + + // 2. create a topic of all data + Properties sessionConfig = new Properties(); + sessionConfig.put(TopicConstant.PATH_KEY, "root.**"); + + session.createTopic("allData", sessionConfig); + + // 3. show all topics + Set topics = session.getTopics(); + System.out.println(topics); + + // 4. show a specific topic + Optional allData = session.getTopic("allData"); + System.out.println(allData.get()); + } + } +} +``` +#### 2.2.2 数据消费 + +##### 场景-1: 订阅IoTDB中新增的实时数据(大屏或组态展示的场景) + +```java +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType; +import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; +import org.apache.tsfile.read.common.RowRecord; + +public class DataConsumerExample { + + public static void main(String[] args) throws IOException { + + // 5. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed + Properties consumerConfig = new Properties(); + consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); + consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + consumerConfig.put(ConsumerConstant.CONSUME_LISTENER_KEY, TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE); + try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { + pullConsumer.open(); + pullConsumer.subscribe("topic_all"); + while (true) { + List messages = pullConsumer.poll(10000); + for (final SubscriptionMessage message : messages) { + final short messageType = message.getMessageType(); + if (SubscriptionMessageType.isValidatedMessageType(messageType)) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + while (dataSet.hasNext()) { + final RowRecord record = dataSet.next(); + System.out.println(record); + } + } + } + } + } + } + } +} + + +``` +##### 场景-2:订阅新增的 TsFile(定期数据备份的场景) + +前提:需要被消费的topic的格式为TsfileHandler类型,举例:`create topic topic_all_tsfile with ('path'='root.**','format'='TsFileHandler')` + +```java +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; + + +public class DataConsumerExample { + + public static void main(String[] args) throws IOException { + // 1. create a pull consumer, the subscription is automatically cancelled when the logic in the try resources is completed + Properties consumerConfig = new Properties(); + consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); + consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + // 2. Specify the consumption type as the tsfile type + consumerConfig.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + consumerConfig.put(ConsumerConstant.FILE_SAVE_DIR_KEY, "/Users/iotdb/Downloads"); + try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { + pullConsumer.open(); + pullConsumer.subscribe("topic_all_tsfile"); + while (true) { + List messages = pullConsumer.poll(10000); + for (final SubscriptionMessage message : messages) { + message.getTsFileHandler().copyFile("/Users/iotdb/Downloads/1.tsfile"); + } + } + } + } +} +``` + + + + +## 2 全量接口说明 + +### 2.1 参数列表 +可通过Properties参数对象设置消费者相关参数,具体参数如下。 + +#### 2.1.1 SubscriptionConsumer + + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | +| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | +| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | +| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | +| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | +| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | +| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | +| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | + +`SubscriptionPushConsumer` 中的特殊配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | +| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | + +`SubscriptionPullConsumer` 中的特殊配置: + +| 参数 | 是否必填(默认值) | 参数含义 | +| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | +| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | + + +### 函数列表 +#### 数据订阅 +##### SubscriptionPullConsumer + +| **函数名** | **说明** | **参数** | +|-------------------------------------|--------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `open()` | 打开消费者连接,启动消息消费。如果 `autoCommit` 启用,会启动自动提交工作器。 | 无 | +| `close()` | 关闭消费者连接。如果 `autoCommit` 启用,会在关闭前提交所有未提交的消息。 | 无 | +| `poll(final Duration timeout)` | 拉取消息,指定超时时间。 | `timeout` : 拉取的超时时间。 | +| `poll(final long timeoutMs)` | 拉取消息,指定超时时间(毫秒)。 | `timeoutMs` : 超时时间,单位为毫秒。 | +| `poll(final Set topicNames, final Duration timeout)` | 拉取指定主题的消息,指定超时时间。 | `topicNames` : 要拉取的主题集合。`timeout`: 超时时间。 | +| `poll(final Set topicNames, final long timeoutMs)` | 拉取指定主题的消息,指定超时时间(毫秒)。 | `topicNames` : 要拉取的主题集合。`timeoutMs`: 超时时间,单位为毫秒。 | +| `commitSync(final SubscriptionMessage message)` | 同步提交单条消息。 | `message` : 需要提交的消息对象。 | +| `commitSync(final Iterable messages)` | 同步提交多条消息。 | `messages` : 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message)` | 异步提交单条消息。 | `message` : 需要提交的消息对象。 | +| `commitAsync(final Iterable messages)` | 异步提交多条消息。 | `messages` : 需要提交的消息集合。 | +| `commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback)` | 异步提交单条消息并指定回调函数。 | `message` : 需要提交的消息对象。`callback` : 异步提交完成后的回调函数。 | +| `commitAsync(final Iterable messages, final AsyncCommitCallback callback)` | 异步提交多条消息并指定回调函数。 | `messages` : 需要提交的消息集合。`callback` : 异步提交完成后的回调函数。 | + +##### SubscriptionPushConsumer + +| **函数名** | **说明** | **参数** | +|-------------------------------------|----------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `open()` | 打开消费者连接,启动消息消费,提交自动轮询工作器。 | 无 | +| `close()` | 关闭消费者连接,停止消息消费。 | 无 | +| `toString()` | 返回消费者对象的核心配置信息。 | 无 | +| `coreReportMessage()` | 获取消费者核心配置的键值对表示形式。 | 无 | +| `allReportMessage()` | 获取消费者所有配置的键值对表示形式。 | 无 | +| `buildPushConsumer()` | 通过 `Builder` 构建 `SubscriptionPushConsumer` 实例。 | 无 | +| `ackStrategy(final AckStrategy ackStrategy)` | 配置消费者的消息确认策略。 | `ackStrategy`: 指定的消息确认策略。 | +| `consumeListener(final ConsumeListener consumeListener)` | 配置消费者的消息消费逻辑。 | `consumeListener`: 消费者接收消息时的处理逻辑。 | +| `autoPollIntervalMs(final long autoPollIntervalMs)` | 配置自动轮询的时间间隔。 | `autoPollIntervalMs` : 自动轮询的间隔时间,单位为毫秒。 | +| `autoPollTimeoutMs(final long autoPollTimeoutMs)` | 配置自动轮询的超时时间。 | `autoPollTimeoutMs`: 自动轮询的超时时间,单位为毫秒。 | + diff --git a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md index 8c68005d8..3e38fbda7 100644 --- a/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Java-Native-API.md @@ -1,792 +1,495 @@ -# Java 原生接口 +# Session原生API -## 安装 +IoTDB 原生 API 中的 Session 是实现与数据库交互的核心接口,它集成了丰富的方法,支持数据写入、查询以及元数据操作等功能。通过实例化 Session,能够建立与 IoTDB 服务器的连接,在该连接所构建的环境中执行各类数据库操作。Session为非线程安全,不能被多线程同时调用。 -### 依赖 +SessionPool 是 Session 的连接池,推荐使用SessionPool编程。在多线程并发的情形下,SessionPool 能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。 -* JDK >= 1.8 -* Maven >= 3.6 +## 1 步骤概览 +1. 创建连接池实例:初始化一个SessionPool对象,用于管理多个Session实例。 +2. 执行操作:直接从SessionPool中获取Session实例,并执行数据库操作,无需每次都打开和关闭连接。 +3. 关闭连接池资源:在不再需要进行数据库操作时,关闭SessionPool,释放所有相关资源。 -### 在 MAVEN 中使用原生接口 +## 2 详细步骤 +本章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [全量接口说明](./Programming-Java-Native-API.md#) 或 查阅: [源码](https://github.com/apache/iotdb/tree/master/example/session/src/main/java/org/apache/iotdb) + +### 2.1 创建maven项目 +创建一个maven项目,并在pom.xml文件中添加以下依赖(JDK >= 1.8, Maven >= 3.6) ```xml org.apache.iotdb iotdb-session + ${project.version} ``` - -## 语法说明 - - - 对于 IoTDB-SQL 接口:传入的 SQL 参数需要符合 [语法规范](../User-Manual/Syntax-Rule.md#字面值常量) ,并且针对 JAVA 字符串进行反转义,如双引号前需要加反斜杠。(即:经 JAVA 转义之后与命令行执行的 SQL 语句一致。) - - 对于其他接口: - - 经参数传入的路径或路径前缀中的节点: 在 SQL 语句中需要使用反引号(`)进行转义的,此处均需要进行转义。 - - 经参数传入的标识符(如模板名):在 SQL 语句中需要使用反引号(`)进行转义的,均可以不用进行转义。 - - 语法说明相关代码示例可以参考:`example/session/src/main/java/org/apache/iotdb/SyntaxConventionRelatedExample.java` - -## 基本接口说明 - -下面将给出 Session 对应的接口的简要介绍和对应参数: - -### Session管理 - -* 初始化 Session - -``` java -// 全部使用默认配置 -session = new Session.Builder.build(); - -// 指定一个可连接节点 -session = - new Session.Builder() - .host(String host) - .port(int port) - .build(); - -// 指定多个可连接节点 -session = - new Session.Builder() - .nodeUrls(List nodeUrls) - .build(); - -// 其他配置项 -session = - new Session.Builder() - .fetchSize(int fetchSize) - .username(String username) - .password(String password) - .thriftDefaultBufferSize(int thriftDefaultBufferSize) - .thriftMaxFrameSize(int thriftMaxFrameSize) - .enableRedirection(boolean enableRedirection) - .version(Version version) - .build(); -``` - -其中,version 表示客户端使用的 SQL 语义版本,用于升级 0.13 时兼容 0.12 的 SQL 语义,可能取值有:`V_0_12`、`V_0_13`、`V_1_0`等。 - - -* 开启 Session - -``` java -void open() -``` - -* 开启 Session,并决定是否开启 RPC 压缩 - -``` java -void open(boolean enableRPCCompression) -``` - -注意: 客户端的 RPC 压缩开启状态需和服务端一致 - -* 关闭 Session - -``` java -void close() -``` - -* SessionPool - -我们提供了一个针对原生接口的连接池 (`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。 -如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。 - -当一个连接被用完后,他会自动返回池中等待下次被使用; -当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作; -你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。 - -对于查询操作: - -1. 使用 SessionPool 进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`; -2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`; -3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`. -4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名 - -使用示例可以参见 `session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java` - -或 `example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java` - - -### 测点管理接口 - -#### Database 管理 - -* 设置 database - -``` java -void setStorageGroup(String storageGroupId) -``` - -* 删除单个或多个 database - -``` java -void deleteStorageGroup(String storageGroup) -void deleteStorageGroups(List storageGroups) -``` -#### 时间序列管理 - -* 创建单个或多个时间序列 - -``` java -void createTimeseries(String path, TSDataType dataType, - TSEncoding encoding, CompressionType compressor, Map props, - Map tags, Map attributes, String measurementAlias) - -void createMultiTimeseries(List paths, List dataTypes, - List encodings, List compressors, - List> propsList, List> tagsList, - List> attributesList, List measurementAliasList) -``` - -* 创建对齐时间序列 - -``` -void createAlignedTimeseries(String prefixPath, List measurements, - List dataTypes, List encodings, - List compressors, List measurementAliasList); -``` - -注意:目前**暂不支持**使用传感器别名。 - -* 删除一个或多个时间序列 - -``` java -void deleteTimeseries(String path) -void deleteTimeseries(List paths) -``` - -* 检测时间序列是否存在 - -``` java -boolean checkTimeseriesExists(String path) -``` - -#### 元数据模版 - -* 创建元数据模板,可以通过先后创建 Template、MeasurementNode 的对象,描述模板内物理量结构与类型、编码方式、压缩方式等信息,并通过以下接口创建模板 - -``` java -public void createSchemaTemplate(Template template); - -Class Template { - private String name; - private boolean directShareTime; - Map children; - public Template(String name, boolean isShareTime); - - public void addToTemplate(Node node); - public void deleteFromTemplate(String name); - public void setShareTime(boolean shareTime); -} - -Abstract Class Node { - private String name; - public void addChild(Node node); - public void deleteChild(Node node); -} - -Class MeasurementNode extends Node { - TSDataType dataType; - TSEncoding encoding; - CompressionType compressor; - public MeasurementNode(String name, - TSDataType dataType, - TSEncoding encoding, - CompressionType compressor); -} -``` - -通过上述类的实例描述模板时,Template 内应当仅能包含单层的 MeasurementNode,具体可以参见如下示例: - -``` java -MeasurementNode nodeX = new MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); -MeasurementNode nodeY = new MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY); -MeasurementNode nodeSpeed = new MeasurementNode("speed", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY); - -// This is the template we suggest to implement -Template flatTemplate = new Template("flatTemplate"); -template.addToTemplate(nodeX); -template.addToTemplate(nodeY); -template.addToTemplate(nodeSpeed); - -createSchemaTemplate(flatTemplate); -``` - -* 完成模板挂载操作后,可以通过如下的接口在给定的设备上使用模板注册序列,或者也可以直接向相应的设备写入数据以自动使用模板注册序列。 - -``` java -void createTimeseriesUsingSchemaTemplate(List devicePathList) -``` - -* 将名为'templateName'的元数据模板挂载到'prefixPath'路径下,在执行这一步之前,你需要创建名为'templateName'的元数据模板 -* **请注意,我们强烈建议您将模板设置在 database 或 database 下层的节点中,以更好地适配未来版本更新及各模块的协作** - -``` java -void setSchemaTemplate(String templateName, String prefixPath) -``` - -- 将模板挂载到 MTree 上之后,你可以随时查询所有模板的名称、某模板被设置到 MTree 的所有路径、所有正在使用某模板的所有路径,即如下接口: - -``` java -/** @return All template names. */ -public List showAllTemplates(); - -/** @return All paths have been set to designated template. */ -public List showPathsTemplateSetOn(String templateName); - -/** @return All paths are using designated template. */ -public List showPathsTemplateUsingOn(String templateName) -``` - -- 如果你需要删除某一个模板,请确保在进行删除之前,MTree 上已经没有节点被挂载了模板,对于已经被挂载模板的节点,可以用如下接口卸载模板; - - -``` java -void unsetSchemaTemplate(String prefixPath, String templateName); -public void dropSchemaTemplate(String templateName); -``` - -* 请注意,如果一个子树中有多个孩子节点需要使用模板,可以在其共同父母节点上使用 setSchemaTemplate 。而只有在已有数据点插入模板对应的物理量时,模板才会被设置为激活状态,进而被 show timeseries 等查询检测到。 -* 卸载'prefixPath'路径下的名为'templateName'的元数据模板。你需要保证给定的路径'prefixPath'下需要有名为'templateName'的元数据模板。 - -注意:目前不支持从曾经在'prefixPath'路径及其后代节点使用模板插入数据后(即使数据已被删除)卸载模板。 - - -### 数据写入接口 - -推荐使用 insertTablet 帮助提高写入效率 - -* 插入一个 Tablet,Tablet 是一个设备若干行数据块,每一行的列都相同 - * **写入效率高** - * **支持批量写入** - * **支持写入空值**:空值处可以填入任意值,然后通过 BitMap 标记空值 - -``` java -void insertTablet(Tablet tablet) - -public class Tablet { - /** deviceId of this tablet */ - public String prefixPath; - /** the list of measurement schemas for creating the tablet */ - private List schemas; - /** timestamps in this tablet */ - public long[] timestamps; - /** each object is a primitive type array, which represents values of one measurement */ - public Object[] values; - /** each bitmap represents the existence of each value in the current column. */ - public BitMap[] bitMaps; - /** the number of rows to include in this tablet */ - public int rowSize; - /** the maximum number of rows for this tablet */ - private int maxRowNumber; - /** whether this tablet store data of aligned timeseries or not */ - private boolean isAligned; +### 2.2 创建连接池实例 + +```java +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.session.pool.SessionPool; + +public class IoTDBSessionPoolExample { + private static SessionPool sessionPool; + + public static void main(String[] args) { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } } ``` -* 插入多个 Tablet - -``` java -void insertTablets(Map tablets) -``` - -* 插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据。这里的 value 是 Object 类型,相当于提供了一个公用接口,后面可以通过 TSDataType 将 value 强转为原类型 - - 其中,Object 类型与 TSDataType 类型的对应关系如下表所示: - - | TSDataType | Object | - |------------|--------------| - | BOOLEAN | Boolean | - | INT32 | Integer | - | DATE | LocalDate | - | INT64 | Long | - | TIMESTAMP | Long | - | FLOAT | Float | - | DOUBLE | Double | - | TEXT | String, Binary | - | STRING | String, Binary | - | BLOB | Binary | - -``` java -void insertRecord(String prefixPath, long time, List measurements, - List types, List values) -``` - -* 插入多个 Record - -``` java -void insertRecords(List deviceIds, - List times, - List> measurementsList, - List> typesList, - List> valuesList) -``` +### 2.3 执行数据库操作 +#### 2.3.1 数据写入 +在工业场景中,数据写入可分为以下几类:多行数据写入、单设备多行数据写入,下面按不同场景对写入接口进行介绍。 -* 插入同属于一个 device 的多个 Record +##### 多行数据写入接口 +接口说明:支持一次写入多行数据,每一行对应一个设备一个时间戳的多个测点值。 -``` java -void insertRecordsOfOneDevice(String deviceId, List times, - List> measurementsList, List> typesList, - List> valuesList) -``` +接口列表: -#### 带有类型推断的写入 +| 接口名称 | 功能描述 | +|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------| +| `insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多行数据,适用于不同测点独立采集的场景 | -当数据均是 String 类型时,我们可以使用如下接口,根据 value 的值进行类型推断。例如:value 为 "true" ,就可以自动推断为布尔类型。value 为 "3.2" ,就可以自动推断为数值类型。服务器需要做类型推断,可能会有额外耗时,速度较无需类型推断的写入慢 -* 插入一个 Record,一个 Record 是一个设备一个时间戳下多个测点的数据 +代码案例: +```java +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.tsfile.enums.TSDataType; -``` java -void insertRecord(String prefixPath, long time, List measurements, List values) -``` +public class SessionPoolExample { -* 插入多个 Record - -``` java -void insertRecords(List deviceIds, List times, - List> measurementsList, List> valuesList) -``` - -* 插入同属于一个 device 的多个 Record - -``` java -void insertStringRecordsOfOneDevice(String deviceId, List times, - List> measurementsList, List> valuesList) -``` + private static SessionPool sessionPool; -#### 对齐时间序列的写入 + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { -对齐时间序列的写入使用 insertAlignedXXX 接口,其余与上述接口类似: + // 1. init SessionPool + constructSessionPool(); -* insertAlignedRecord -* insertAlignedRecords -* insertAlignedRecordsOfOneDevice -* insertAlignedStringRecordsOfOneDevice -* insertAlignedTablet -* insertAlignedTablets + // 2. execute insert data + insertRecordsExample(); -### 数据删除接口 + // 3. close SessionPool + closeSessionPool(); -* 删除一个或多个时间序列在某个时间点前或这个时间点的数据 - -``` java -void deleteData(String path, long endTime) -void deleteData(List paths, long endTime) -``` - -### 数据查询接口 - -* 时间序列原始数据范围查询: - - 指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。 - -``` java -SessionDataSet executeRawDataQuery(List paths, long startTime, long endTime); -``` - -* 最新点查询: - - 查询最后一条时间戳大于等于某个时间点的数据。 - ``` java - SessionDataSet executeLastDataQuery(List paths, long lastTime); - ``` - - 快速查询单设备下指定序列最新点,支持重定向;如果您确认使用的查询路径是合法的,可将`isLegalPathNodes`置为true以避免路径校验带来的性能损失。 - ``` java - SessionDataSet executeLastDataQueryForOneDevice( - String db, String device, List sensors, boolean isLegalPathNodes); - ``` - -* 聚合查询: - - 支持指定查询时间范围。指定的查询时间范围为左闭右开区间,包含开始时间但不包含结束时间。 - - 支持按照时间区间分段查询。 - -``` java -SessionDataSet executeAggregationQuery(List paths, List aggregations); - -SessionDataSet executeAggregationQuery( - List paths, List aggregations, long startTime, long endTime); - -SessionDataSet executeAggregationQuery( - List paths, - List aggregations, - long startTime, - long endTime, - long interval); - -SessionDataSet executeAggregationQuery( - List paths, - List aggregations, - long startTime, - long endTime, - long interval, - long slidingStep); -``` - -* 直接执行查询语句 - -``` java -SessionDataSet executeQueryStatement(String sql) -``` - -### 数据订阅 - -#### 1 Topic 管理 - -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了 Topic 管理的相关接口。Topic状态变化如下图所示: - -
- -
- -##### 1.1 创建 Topic - -```Java - void createTopicIfNotExists(String topicName, Properties properties) throws Exception; -``` - -示例: - -```Java -try (final SubscriptionSession session = new SubscriptionSession(host, port)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(topicName, config); -} -``` - -##### 1.2 删除 Topic - -```Java -void dropTopicIfExists(String topicName) throws Exception; -``` - -##### 1.3 查看 Topic - -```Java -// 获取所有 topics -Set getTopics() throws Exception; - -// 获取单个 topic -Optional getTopic(String topicName) throws Exception; -``` - -#### 2 查看订阅状态 + } -IoTDB 订阅客户端中的 `SubscriptionSession` 类提供了获取订阅状态的相关接口: + private static void constructSessionPool() { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } -```Java -Set getSubscriptions() throws Exception; -Set getSubscriptions(final String topicName) throws Exception; -``` + public static void insertRecordsExample() throws IoTDBConnectionException, StatementExecutionException { + String deviceId = "root.sg1.d1"; + List measurements = new ArrayList<>(); + measurements.add("s1"); + measurements.add("s2"); + measurements.add("s3"); + List deviceIds = new ArrayList<>(); + List> measurementsList = new ArrayList<>(); + List> valuesList = new ArrayList<>(); + List timestamps = new ArrayList<>(); + List> typesList = new ArrayList<>(); + + for (long time = 0; time < 500; time++) { + List values = new ArrayList<>(); + List types = new ArrayList<>(); + values.add(1L); + values.add(2L); + values.add(3L); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + types.add(TSDataType.INT64); + + deviceIds.add(deviceId); + measurementsList.add(measurements); + valuesList.add(values); + typesList.add(types); + timestamps.add(time); + if (time != 0 && time % 100 == 0) { + try { + sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException | StatementExecutionException e) { + // solve exception + } + deviceIds.clear(); + measurementsList.clear(); + valuesList.clear(); + typesList.clear(); + timestamps.clear(); + } + } -#### 3 创建 Consumer - -在使用 JAVA 原生接口创建 consumer 时,需要指定 consumer 所应用的参数。 - -对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: - -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| host | optional: 127.0.0.1 | `String`: IoTDB 中某 DataNode 的 RPC host | -| port | optional: 6667 | `Integer`: IoTDB 中某 DataNode 的 RPC port | -| node-urls | optional: 127.0.0.1:6667 | `List`: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | -| username | optional: root | `String`: IoTDB 中 DataNode 的用户名 | -| password | optional: root | `String`: IoTDB 中 DataNode 的密码 | -| groupId | optional | `String`: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | -| consumerId | optional | `String`: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | -| heartbeatIntervalMs | optional: 30000 (min: 1000) | `Long`: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | -| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | `Long`: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | -| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | `String`: consumer 订阅出的 TsFile 文件临时存放的目录路径 | -| fileSaveFsync | optional: false | `Boolean`: consumer 订阅 TsFile 的过程中是否主动调用 fsync | - - -##### 3.1 SubscriptionPushConsumer - -以下为 `SubscriptionPushConsumer` 中的特殊配置: -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:`ACKStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)`ACKStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | -| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | -| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | -| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | - -其中,`ConsumerListener` 接口定义如下: - -```Java -@FunctionInterface -interface ConsumeListener { - default ConsumeResult onReceive(Message message) { - return ConsumeResult.SUCCESS; - } -} + try { + sessionPool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException | StatementExecutionException e) { + // solve exception + } + } -enum ConsumeResult { - SUCCESS, - FAILURE, + public static void closeSessionPool(){ + sessionPool.close(); + } } ``` -##### 3.2 SubscriptionPullConsumer +##### 单设备多行数据写入接口 +接口说明:支持一次写入单个设备的多行数据,每一行对应一个时间戳的多个测点值。 -以下为 `SubscriptionPullConsumer` 中的特殊配置: +接口列表: -| 参数 | 是否必填(默认值) | 参数含义 | -| :-------------------------------------------- | :--------------------------------- | :----------------------------------------------------------- | -| autoCommit | optional: true | Boolean: 是否自动提交消费进度如果此参数设置为 false,则需要调用 `commit` 方法来手动提交消费进度 | -| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**仅当 autoCommit 参数为 true 的时候才会生效 | +| 接口名称 | 功能描述 | +|-----------------------------------------------------------------------------------------|----------------------------| +| `insertTablet(Tablet tablet)` | 插入单个设备的多行数据,适用于不同测点独立采集的场景 | -在创建 consumer 后,需要手动调用 consumer 的 open 方法: +代码案例: +```java +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; -```Java -void open() throws Exception; -``` +public class SessionPoolExample { -此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 + private static SessionPool sessionPool; -#### 4 订阅 Topic + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于订阅 Topics: + // 1. init SessionPool + constructSessionPool(); -```Java -// 订阅 topics -void subscribe(String topic) throws Exception; -void subscribe(List topics) throws Exception; -``` + // 2. execute insert data + insertTabletExample(); -- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 -- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 -- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 + // 3. close SessionPool + closeSessionPool(); -#### 5 消费数据 - -无论是 push 模式还是 pull 模式的 consumer: - -- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 -- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics + } -##### 5.1 SubscriptionPushConsumer + private static void constructSessionPool() { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } -SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 + private static void insertTabletExample() throws IoTDBConnectionException, StatementExecutionException { + /* + * A Tablet example: + * device1 + * time s1, s2, s3 + * 1, 1, 1, 1 + * 2, 2, 2, 2 + * 3, 3, 3, 3 + */ + // The schema of measurements of one device + // only measurementId and data type in MeasurementSchema take effects in Tablet + List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + Tablet tablet = new Tablet("root.sg.d1", schemaList, 100); + + // Method 1 to add tablet data + long timestamp = System.currentTimeMillis(); + + Random random = new Random(); + for (long row = 0; row < 100; row++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + for (int s = 0; s < 3; s++) { + long value = random.nextLong(); + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value); + } + if (tablet.rowSize == tablet.getMaxRowNumber()) { + sessionPool.insertTablet(tablet); + tablet.reset(); + } + timestamp++; + } -##### 5.2 SubscriptionPullConsumer + if (tablet.rowSize != 0) { + sessionPool.insertTablet(tablet); + tablet.reset(); + } + } -SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: -```Java -List poll(final Duration timeout) throws Exception; -List poll(final long timeoutMs) throws Exception; -List poll(final Set topicNames, final Duration timeout) throws Exception; -List poll(final Set topicNames, final long timeoutMs) throws Exception; + public static void closeSessionPool(){ + sessionPool.close(); + } +} ``` -在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 +#### 2.3.2 SQL操作 -当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: +SQL操作分为查询和非查询两类操作,对应的接口为`executeQuery`和`executeNonQuery`操作,其区别为前者执行的是具体的查询语句,会返回一个结果集,后者是执行的是增、删、改操作,不返回结果集。 -```Java -void commitSync(final SubscriptionMessage message) throws Exception; -void commitSync(final Iterable messages) throws Exception; +```java +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; -CompletableFuture commitAsync(final SubscriptionMessage message); -CompletableFuture commitAsync(final Iterable messages); -void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); -void commitAsync(final Iterable messages, final AsyncCommitCallback callback); -``` +public class SessionPoolExample { -AsyncCommitCallback 类定义如下: + private static SessionPool sessionPool; -```Java -public interface AsyncCommitCallback { - default void onComplete() { - // Do nothing - } + public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { - default void onFailure(final Throwable e) { - // Do nothing - } -} -``` + // 1. init SessionPool + constructSessionPool(); -#### 6 取消订阅 + // 2. executes a non-query SQL statement, such as a DDL or DML command. + executeQueryExample(); -`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA 原生接口用于取消订阅并关闭 consumer: + // 3. executes a query SQL statement and returns the result set. + executeNonQueryExample(); -```Java -// 取消订阅 topics -void unsubscribe(String topic) throws Exception; -void unsubscribe(List topics) throws Exception; + // 4. close SessionPool + closeSessionPool(); -// 关闭 consumer -void close(); -``` + } -- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 -- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics -- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 -#### 7 代码示例 + private static void executeNonQueryExample() throws IoTDBConnectionException, StatementExecutionException { -##### 7.1 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 + // 1. create a nonAligned time series + sessionPool.executeNonQueryStatement("create timeseries root.test.d1.s1 with dataType = int32"); -```Java -// Create topics -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - final Properties config = new Properties(); - config.put(TopicConstant.PATH_KEY, "root.db.**"); - session.createTopic(TOPIC_1, config); -} + // 2. set ttl + sessionPool.executeNonQueryStatement("set TTL to root.test.** 10000"); -// Subscription: property-style ctor -final Properties config = new Properties(); -config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); -config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); - -final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); -consumer1.open(); -consumer1.subscribe(TOPIC_1); -while (true) { - LockSupport.parkNanos(SLEEP_NS); // wait some time - final List messages = consumer1.poll(POLL_TIMEOUT_MS); - for (final SubscriptionMessage message : messages) { - for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { - System.out.println(dataSet.getColumnNames()); - System.out.println(dataSet.getColumnTypes()); - while (dataSet.hasNext()) { - System.out.println(dataSet.next()); - } + // 3. delete time series + sessionPool.executeNonQueryStatement("delete timeseries root.test.d1.s1"); } - } - // Auto commit -} -// Show topics and subscriptions -try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { - session.open(); - session.getTopics().forEach((System.out::println)); - session.getSubscriptions().forEach((System.out::println)); -} - -consumer1.unsubscribe(TOPIC_1); -consumer1.close(); -``` - -##### 7.2 多 Push Consumer 消费 TsFileHandler 形式的数据 - -```Java -// Create topics -try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { - subscriptionSession.open(); - final Properties config = new Properties(); - config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); - subscriptionSession.createTopic(TOPIC_2, config); -} -final List threads = new ArrayList<>(); -for (int i = 0; i < 8; ++i) { - final int idx = i; - final Thread thread = - new Thread( - () -> { - // Subscription: builder-style ctor - try (final SubscriptionPushConsumer consumer2 = - new SubscriptionPushConsumer.Builder() - .consumerId("c" + idx) - .consumerGroupId("cg2") - .fileSaveDir(System.getProperty("java.io.tmpdir")) - .ackStrategy(AckStrategy.AFTER_CONSUME) - .consumeListener( - message -> { - doSomething(message.getTsFileHandler()); - return ConsumeResult.SUCCESS; - }) - .buildPushConsumer()) { - consumer2.open(); - consumer2.subscribe(TOPIC_2); - // block the consumer main thread - Thread.sleep(Long.MAX_VALUE); - } catch (final IOException | InterruptedException e) { - throw new RuntimeException(e); + private static void executeQueryExample() throws IoTDBConnectionException, StatementExecutionException { + // 1. execute normal query + try(SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select s1 from root.sg1.d1 limit 10")) { + while (wrapper.hasNext()) { + System.out.println(wrapper.next()); } - }); - thread.start(); - threads.add(thread); -} - -for (final Thread thread : threads) { - thread.join(); -} -``` - + } -### 其他功能(直接执行SQL语句) - -``` java -void executeNonQueryStatement(String sql) -``` - -### 写入测试接口 (用于分析网络带宽) - -不实际写入数据,只将数据传输到 server 即返回 - -* 测试 insertRecord - -``` java -void testInsertRecord(String deviceId, long time, List measurements, List values) - -void testInsertRecord(String deviceId, long time, List measurements, - List types, List values) -``` - -* 测试 testInsertRecords - -``` java -void testInsertRecords(List deviceIds, List times, - List> measurementsList, List> valuesList) - -void testInsertRecords(List deviceIds, List times, - List> measurementsList, List> typesList, - List> valuesList) -``` - -* 测试 insertTablet + // 2. execute aggregate query + try(SessionDataSetWrapper wrapper = sessionPool.executeQueryStatement("select count(s1) from root.sg1.d1 group by ([0, 40), 5ms) ")) { + while (wrapper.hasNext()) { + System.out.println(wrapper.next()); + } + } -``` java -void testInsertTablet(Tablet tablet) -``` + } -* 测试 insertTablets + private static void constructSessionPool() { + // Using nodeUrls ensures that when one node goes down, other nodes are automatically connected to retry + List nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(3) + .build(); + } -``` java -void testInsertTablets(Map tablets) + public static void closeSessionPool(){ + sessionPool.close(); + } +} ``` -### 示例代码 - -浏览上述接口的详细信息,请参阅代码 ```session/src/main/java/org/apache/iotdb/session/Session.java``` - -使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java``` - -使用对齐时间序列和元数据模板的示例可以参见 `example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java` \ No newline at end of file +### 3 全量接口说明 + +#### 3.1 参数列表 +Session具有如下的字段,可以通过构造函数或Session.Builder方式设置如下参数 + +| 字段名 | 类型 | 说明 | +|--------------------------------|-------------------------------|----------------------------------------------------------------------| +| `nodeUrls` | `List` | 数据库节点的 URL 列表,支持多节点连接 | +| `username` | `String` | 用户名 | +| `password` | `String` | 密码 | +| `fetchSize` | `int` | 查询结果的默认批量返回大小 | +| `useSSL` | `boolean` | 是否启用 SSL | +| `trustStore` | `String` | 信任库路径 | +| `trustStorePwd` | `String` | 信任库密码 | +| `queryTimeoutInMs` | `long` | 查询的超时时间,单位毫秒 | +| `enableRPCCompression` | `boolean` | 是否启用 RPC 压缩 | +| `connectionTimeoutInMs` | `int` | 连接超时时间,单位毫秒 | +| `zoneId` | `ZoneId` | 会话的时区设置 | +| `thriftDefaultBufferSize` | `int` | Thrift 默认缓冲区大小 | +| `thriftMaxFrameSize` | `int` | Thrift 最大帧大小 | +| `defaultEndPoint` | `TEndPoint` | 默认的数据库端点信息 | +| `defaultSessionConnection` | `SessionConnection` | 默认的会话连接对象 | +| `isClosed` | `boolean` | 当前会话是否已关闭 | +| `enableRedirection` | `boolean` | 是否启用重定向功能 | +| `enableRecordsAutoConvertTablet` | `boolean` | 是否启用记录自动转换为 Tablet 的功能 | +| `deviceIdToEndpoint` | `Map` | 设备 ID 和数据库端点的映射关系 | +| `endPointToSessionConnection` | `Map` | 数据库端点和会话连接的映射关系 | +| `executorService` | `ScheduledExecutorService` | 用于定期更新节点列表的线程池 | +| `availableNodes` | `INodeSupplier` | 可用节点的供应器 | +| `enableQueryRedirection` | `boolean` | 是否启用查询重定向功能 | +| `version` | `Version` | 客户端的版本号,用于与服务端的兼容性判断 | +| `enableAutoFetch` | `boolean` | 是否启用自动获取功能 | +| `maxRetryCount` | `int` | 最大重试次数 | +| `retryIntervalInMs` | `long` | 重试的间隔时间,单位毫秒 | + + + +#### 3.2 接口列表 + +##### 3.2.1 元数据管理 + +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `createDatabase(String database)` | 创建数据库 | `database`: 数据库名称 | +| `deleteDatabase(String database)` | 删除指定数据库 | `database`: 要删除的数据库名称 | +| `deleteDatabases(List databases)` | 批量删除数据库 | `databases`: 要删除的数据库名称列表 | +| `createTimeseries(String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor)` | 创建单个时间序列 | `path`: 时间序列路径,`dataType`: 数据类型,`encoding`: 编码类型,`compressor`: 压缩类型 | +| `createAlignedTimeseries(...)` | 创建对齐时间序列 | 设备ID、测点列表、数据类型列表、编码列表、压缩类型列表 | +| `createMultiTimeseries(...)` | 批量创建时间序列 | 多个路径、数据类型、编码、压缩类型、属性、标签、别名等 | +| `deleteTimeseries(String path)` | 删除时间序列 | `path`: 要删除的时间序列路径 | +| `deleteTimeseries(List paths)` | 批量删除时间序列 | `paths`: 要删除的时间序列路径列表 | +| `setSchemaTemplate(String templateName, String prefixPath)` | 设置模式模板 | `templateName`: 模板名称,`prefixPath`: 应用模板的路径 | +| `createSchemaTemplate(Template template)` | 创建模式模板 | `template`: 模板对象 | +| `dropSchemaTemplate(String templateName)` | 删除模式模板 | `templateName`: 要删除的模板名称 | +| `addAlignedMeasurementsInTemplate(...)` | 添加对齐测点到模板 | 模板名称、测点路径列表、数据类型、编码类型、压缩类型 | +| `addUnalignedMeasurementsInTemplate(...)` | 添加非对齐测点到模板 | 同上 | +| `deleteNodeInTemplate(String templateName, String path)` | 删除模板中的节点 | `templateName`: 模板名称,`path`: 要删除的路径 | +| `countMeasurementsInTemplate(String name)` | 统计模板中测点数量 | `name`: 模板名称 | +| `isMeasurementInTemplate(String templateName, String path)` | 检查模板中是否存在某测点 | `templateName`: 模板名称,`path`: 测点路径 | +| `isPathExistInTemplate(String templateName, String path)` | 检查模板中路径是否存在 | 同上 | +| `showMeasurementsInTemplate(String templateName)` | 显示模板中的测点 | `templateName`: 模板名称 | +| `showMeasurementsInTemplate(String templateName, String pattern)` | 按模式显示模板中的测点 | `templateName`: 模板名称,`pattern`: 匹配模式 | +| `showAllTemplates()` | 显示所有模板 | 无参数 | +| `showPathsTemplateSetOn(String templateName)` | 显示模板应用的路径 | `templateName`: 模板名称 | +| `showPathsTemplateUsingOn(String templateName)` | 显示模板实际使用的路径 | 同上 | +| `unsetSchemaTemplate(String prefixPath, String templateName)` | 取消路径的模板设置 | `prefixPath`: 路径,`templateName`: 模板名称 | + + +##### 3.2.2 数据写入 +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `insertRecord(String deviceId, long time, List measurements, List types, Object... values)` | 插入单条记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`types`: 数据类型列表,`values`: 值列表 | +| `insertRecord(String deviceId, long time, List measurements, List values)` | 插入单条记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`values`: 值列表 | +| `insertRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` | 插入多条记录 | `deviceIds`: 设备ID列表,`times`: 时间戳列表,`measurementsList`: 测点列表列表,`valuesList`: 值列表 | +| `insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多条记录 | 同上,增加 `typesList`: 数据类型列表 | +| `insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入单设备的多条记录 | `deviceId`: 设备ID,`times`: 时间戳列表,`measurementsList`: 测点列表列表,`typesList`: 类型列表,`valuesList`: 值列表 | +| `insertRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList, boolean haveSorted)` | 插入排序后的单设备多条记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList)` | 插入字符串格式的单设备记录 | `deviceId`: 设备ID,`times`: 时间戳列表,`measurementsList`: 测点列表,`valuesList`: 值列表 | +| `insertStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList, boolean haveSorted)` | 插入排序的字符串格式单设备记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertAlignedRecord(String deviceId, long time, List measurements, List types, List values)` | 插入单条对齐记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`types`: 类型列表,`values`: 值列表 | +| `insertAlignedRecord(String deviceId, long time, List measurements, List values)` | 插入字符串格式的单条对齐记录 | `deviceId`: 设备ID,`time`: 时间戳,`measurements`: 测点列表,`values`: 值列表 | +| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> valuesList)` | 插入多条对齐记录 | `deviceIds`: 设备ID列表,`times`: 时间戳列表,`measurementsList`: 测点列表,`valuesList`: 值列表 | +| `insertAlignedRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入多条对齐记录 | 同上,增加 `typesList`: 数据类型列表 | +| `insertAlignedRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList)` | 插入单设备的多条对齐记录 | 同上 | +| `insertAlignedRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> typesList, List> valuesList, boolean haveSorted)` | 插入排序的单设备多条对齐记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertAlignedStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList)` | 插入字符串格式的单设备对齐记录 | `deviceId`: 设备ID,`times`: 时间戳列表,`measurementsList`: 测点列表,`valuesList`: 值列表 | +| `insertAlignedStringRecordsOfOneDevice(String deviceId, List times, List> measurementsList, List> valuesList, boolean haveSorted)` | 插入排序的字符串格式单设备对齐记录 | 同上,增加 `haveSorted`: 数据是否已排序 | +| `insertTablet(Tablet tablet)` | 插入单个Tablet数据 | `tablet`: 要插入的Tablet数据 | +| `insertTablet(Tablet tablet, boolean sorted)` | 插入排序的Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | +| `insertAlignedTablet(Tablet tablet)` | 插入对齐的Tablet数据 | `tablet`: 要插入的Tablet数据 | +| `insertAlignedTablet(Tablet tablet, boolean sorted)` | 插入排序的对齐Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | +| `insertTablets(Map tablets)` | 批量插入多个Tablet数据 | `tablets`: 设备ID到Tablet的映射表 | +| `insertTablets(Map tablets, boolean sorted)` | 批量插入排序的多个Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | +| `insertAlignedTablets(Map tablets)` | 批量插入多个对齐Tablet数据 | `tablets`: 设备ID到Tablet的映射表 | +| `insertAlignedTablets(Map tablets, boolean sorted)` | 批量插入排序的多个对齐Tablet数据 | 同上,增加 `sorted`: 数据是否已排序 | + +##### 3.2.3 数据删除 + +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `deleteTimeseries(String path)` | 删除单个时间序列 | `path`: 时间序列路径 | +| `deleteTimeseries(List paths)` | 批量删除时间序列 | `paths`: 时间序列路径列表 | +| `deleteData(String path, long endTime)` | 删除指定路径的历史数据 | `path`: 路径,`endTime`: 结束时间戳 | +| `deleteData(List paths, long endTime)` | 批量删除路径的历史数据 | `paths`: 路径列表,`endTime`: 结束时间戳 | +| `deleteData(List paths, long startTime, long endTime)` | 删除路径时间范围内的历史数据 | 同上,增加 `startTime`: 起始时间戳 | + + +##### 3.2.4 数据查询 +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `executeQueryStatement(String sql)` | 执行查询语句 | `sql`: 查询SQL语句 | +| `executeQueryStatement(String sql, long timeoutInMs)` | 执行带超时的查询语句 | `sql`: 查询SQL语句,`timeoutInMs`: 查询超时时间(毫秒) | +| `executeRawDataQuery(List paths, long startTime, long endTime)` | 查询指定路径的原始数据 | `paths`: 查询路径列表,`startTime`: 起始时间戳,`endTime`: 结束时间戳 | +| `executeRawDataQuery(List paths, long startTime, long endTime, long timeOut)` | 查询指定路径的原始数据(带超时) | 同上,增加 `timeOut`: 超时时间 | +| `executeLastDataQuery(List paths)` | 查询最新数据 | `paths`: 查询路径列表 | +| `executeLastDataQuery(List paths, long lastTime)` | 查询指定时间的最新数据 | `paths`: 查询路径列表,`lastTime`: 指定的时间戳 | +| `executeLastDataQuery(List paths, long lastTime, long timeOut)` | 查询指定时间的最新数据(带超时) | 同上,增加 `timeOut`: 超时时间 | +| `executeLastDataQueryForOneDevice(String db, String device, List sensors, boolean isLegalPathNodes)` | 查询单个设备的最新数据 | `db`: 数据库名,`device`: 设备名,`sensors`: 传感器列表,`isLegalPathNodes`: 是否合法路径节点 | +| `executeAggregationQuery(List paths, List aggregations)` | 执行聚合查询 | `paths`: 查询路径列表,`aggregations`: 聚合类型列表 | +| `executeAggregationQuery(List paths, List aggregations, long startTime, long endTime)` | 执行带时间范围的聚合查询 | 同上,增加 `startTime`: 起始时间戳,`endTime`: 结束时间戳 | +| `executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval)` | 执行带时间间隔的聚合查询 | 同上,增加 `interval`: 时间间隔 | +| `executeAggregationQuery(List paths, List aggregations, long startTime, long endTime, long interval, long slidingStep)` | 执行滑动窗口聚合查询 | 同上,增加 `slidingStep`: 滑动步长 | +| `fetchAllConnections()` | 获取所有活动连接信息 | 无参数 | + +##### 3.2.5 系统状态与备份 +| 方法名 | 功能描述 | 参数解释 | +|-----------------------------------------------------------------------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------| +| `getBackupConfiguration()` | 获取备份配置信息 | 无参数 | +| `fetchAllConnections()` | 获取所有活动的连接信息 | 无参数 | +| `getSystemStatus()` | 获取系统状态 | 已废弃,默认返回 `SystemStatus.NORMAL` | \ No newline at end of file