From 32122b74cc8c34cd58c0dc4b55fd73b380d01816 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 22 Jul 2024 10:42:41 +0800 Subject: [PATCH 1/4] setup --- README-zh.md | 4 +- README.md | 4 +- src/.vuepress/sidebar/V1.3.x/en.ts | 1 + src/.vuepress/sidebar/V1.3.x/zh.ts | 1 + src/.vuepress/sidebar_timecho/V1.3.x/en.ts | 1 + src/.vuepress/sidebar_timecho/V1.3.x/zh.ts | 1 + .../latest/User-Manual/Data-Subscription.md | 441 ++++++++++++++++++ 7 files changed, 449 insertions(+), 4 deletions(-) create mode 100644 src/zh/UserGuide/latest/User-Manual/Data-Subscription.md diff --git a/README-zh.md b/README-zh.md index ad3475d3e..f8665e961 100644 --- a/README-zh.md +++ b/README-zh.md @@ -66,8 +66,8 @@ npm run build # 文档格式 -- 所有的md都会被编译成html,REDEME.md编译为index.html,xx.md编译为xx.html -- md内标签必须有开头有结尾,比如\必须有\与之对应,而且是严格对应;如果文档中需要插入标签,比如List\,可以加这个放入代码块中,也可以在**俩个**尖括号前加上\,如\<\\String\\> +- 所有的md都会被编译成html,README.md编译为index.html,xx.md编译为xx.html +- md内标签必须有开头有结尾,比如\必须有\与之对应,而且是严格对应;如果文档中需要插入标签,比如List\,可以加这个放入代码块中,也可以在**俩个**尖括号前加上\,如\\ - 标签不能交叉嵌套,比如\

\

\

\
这是不允许的 - 文章的一级标题就是该文档对应sidebar的标题,所以文档最大的章节标题就不需要了 diff --git a/README.md b/README.md index e1102f955..c5d290865 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,8 @@ If you have installed and the error still occurs, then `sudo xcode-select --rese # Document format -- All mds will be compiled into html, REDEME.md will be compiled into index.html, and xx.md will be compiled into xx.html -- The tags in .md must have a beginning and an end. For example, \ must have a \ corresponding to it, and it is strictly corresponding; if you need to insert tags in the document, such as List \, you can add this into the code block, You can also add \ before the two angle brackets, such as \<\\String\\> +- All mds will be compiled into html, README.md will be compiled into index.html, and xx.md will be compiled into xx.html +- The tags in .md must have a beginning and an end. For example, \ must have a \ corresponding to it, and it is strictly corresponding; if you need to insert tags in the document, such as List \, you can add this into the code block, You can also add \ before the two angle brackets, such as \\\ - Tags cannot be cross-nested, such as \

\

\

\
This is not allowed - The first-level title of the article is the title of the sidebar of the document, so the largest chapter title of the document is not needed diff --git a/src/.vuepress/sidebar/V1.3.x/en.ts b/src/.vuepress/sidebar/V1.3.x/en.ts index 14d53e2cc..c28a8c67c 100644 --- a/src/.vuepress/sidebar/V1.3.x/en.ts +++ b/src/.vuepress/sidebar/V1.3.x/en.ts @@ -89,6 +89,7 @@ export const enSidebar = { { text: 'Operator and Expression', link: 'Operator-and-Expression' }, { text: 'Streaming', link: 'Streaming' }, { text: 'Data Sync', link: 'Data-Sync' }, + { text: 'Data Subscription', link: 'Data-Subscription' }, { text: 'Database Programming', link: 'Database-Programming' }, { text: 'Authority Management', link: 'Authority-Management' }, { text: 'Maintennance', link: 'Maintennance' }, diff --git a/src/.vuepress/sidebar/V1.3.x/zh.ts b/src/.vuepress/sidebar/V1.3.x/zh.ts index 3ac77ae62..4e8edcbbd 100644 --- a/src/.vuepress/sidebar/V1.3.x/zh.ts +++ b/src/.vuepress/sidebar/V1.3.x/zh.ts @@ -89,6 +89,7 @@ export const zhSidebar = { { text: '运算符和表达式', link: 'Operator-and-Expression' }, { text: '流处理', link: 'Streaming' }, { text: '数据同步', link: 'Data-Sync' }, + { text: '数据订阅', link: 'Data-Subscription' }, { text: '数据库编程', link: 'Database-Programming' }, { text: '权限管理', link: 'Authority-Management' }, { text: '运维语句', link: 'Maintennance' }, diff --git a/src/.vuepress/sidebar_timecho/V1.3.x/en.ts b/src/.vuepress/sidebar_timecho/V1.3.x/en.ts index ab32a7009..4ad1f5eda 100644 --- a/src/.vuepress/sidebar_timecho/V1.3.x/en.ts +++ b/src/.vuepress/sidebar_timecho/V1.3.x/en.ts @@ -92,6 +92,7 @@ export const enSidebar = { { text: 'Operator and Expression', link: 'Operator-and-Expression' }, { text: 'Streaming', link: 'Streaming_timecho' }, { text: 'Data Sync', link: 'Data-Sync_timecho' }, + { text: 'Data Subscription', link: 'Data-Subscription' }, { text: 'Tiered Storage', link: 'Tiered-Storage_timecho' }, { text: 'View', link: 'IoTDB-View_timecho' }, { text: 'AINode', link: 
'AINode_timecho' }, diff --git a/src/.vuepress/sidebar_timecho/V1.3.x/zh.ts b/src/.vuepress/sidebar_timecho/V1.3.x/zh.ts index 0ee45f6e8..65a3ac3e1 100644 --- a/src/.vuepress/sidebar_timecho/V1.3.x/zh.ts +++ b/src/.vuepress/sidebar_timecho/V1.3.x/zh.ts @@ -92,6 +92,7 @@ export const zhSidebar = { { text: '运算符和表达式', link: 'Operator-and-Expression' }, { text: '流处理', link: 'Streaming_timecho' }, { text: '数据同步', link: 'Data-Sync_timecho' }, + { text: '数据订阅', link: 'Data-Subscription' }, { text: '多级存储', link: 'Tiered-Storage_timecho' }, { text: '视图', link: 'IoTDB-View_timecho' }, { text: 'AINode', link: 'AINode_timecho' }, diff --git a/src/zh/UserGuide/latest/User-Manual/Data-Subscription.md b/src/zh/UserGuide/latest/User-Manual/Data-Subscription.md new file mode 100644 index 000000000..6dc11fed1 --- /dev/null +++ b/src/zh/UserGuide/latest/User-Manual/Data-Subscription.md @@ -0,0 +1,441 @@ + + +# 功能介绍 + +## 应用场景 + +IoTDB 数据订阅模块(**下称 IoTDB 订阅客户端**)虽然参考了部分消息队列产品如 Kafka 的功能定义,**但是 IoTDB 订阅客户端并不是为了替换这些消息队列产品,而是提供区别于数据查询的一种新的数据消费方式**。在下面应用场景中,使用 IoTDB 订阅客户端消费数据会有显著的优势: + +1. **替换大量数据的轮询查询**:避免查询频率高、测点多时,轮询查询对原有系统性能的较大影响;避免查询范围不好确定问题,可保证下游拿到准确全量的数据 +2. **方便下游进行系统集成**:更方便对接 Flink、Spark、Kafka / DataX、Camel / MySQL、PG 等系统组件。不需要为每一个大数据组件,单独定制开发 IoTDB 的变更捕获等逻辑,可简化集成组件设计,方便用户 +3. …… + +## 主要概念 + +**IoTDB 订阅客户端包含 3 个核心概念:Topic、Consumer、Consumer Group** + +1. **Topic(主题): Topic** 是 IoTDB 订阅客户端用于分类数据的逻辑概念,可以看作是数据的发布通道。生产者将数据发布到特定的主题,而消费者则订阅这些主题以接收相关数据。 +2. **Consumer(消费者): Consumer** 是 IoTDB 订阅客户端中的应用程序或服务,负责接收和处理发布到特定 **Topic** 的数据。**Consumer** 从队列中获取数据并进行相应的处理。在 IoTDB 订阅客户端中提供了两种类型的 **Consumers**: + 1. 一种是 `SubscriptionPullConsumer`,对应的是消息队列中的 pull 消费模式,用户代码需要主动调用数据获取逻辑 + 2. 一种是 `SubscriptionPushConsumer`,对应的是消息队列中的 push 消费模式,用户代码由新到达的数据事件触发 +3. **Consumer Group(消费者组): Consumer Group** 是一组 **Consumers** 的集合,当同一个 **Consumer Group** 中不同的 **Consumers 订阅了相同的 Topic 时,这些 Consumers** 便共享该 Topic 下数据的处理负载,该 Topic 下的每条数据只能被组内的一个 **Consumer** 处理,确保数据不会被重复处理(即一个消费者组中的所有消费者共享消费进度) + 1. 
一个 consumer group 中的 consumers 可以有任意多个,允许 consumers 的动态增删 + 2. 一个 topic 不需要被一个 consumer group 中的所有 consumer 订阅,所有的 consumer 都可以按需订阅指定的 topics,即可以订阅多个 topics,可以动态的新增订阅或取消订阅 topics。一个 consumer 只有显式订阅了某个 topic,才会收到对应 topic 的数据 + 3. 允许 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 加入同一个 consumer group + 4. 不允许一个 consumer 同时加入多个不同的 consumer groups + +TODO: PIC + +# 语法说明 + +## Topic 管理 + +### 概念 + +- **基本概念**:Topic 是 IoTDB 订阅客户端中用于分类数据的逻辑概念,可以看作是数据的发布通道。生产者将数据发布到特定的主题,而消费者则订阅这些主题以接收相关数据。不同于 Kafka,在 IoTDB 订阅客户端中主题描述了订阅数据的**序列特征,时间特征,呈现形式,及可选的自定义处理逻辑** +- **表示方法:Topic 由下述几个部分定义** + - **Path**:表示一组需要订阅的时间序列集合 + - **Time**:表示需要订阅的时间序列的时间范围(event time) + - **Processor**:表示对原始订阅数据应用的自定义处理逻辑,可以通过类似 pipe processor 插件的方式指定 + - **Format**:表示从该主题订阅出的数据呈现形式,目前支持下述两种数据形式: + - `SessionDataSetsHandler`:使用 `SubscriptionSessionDataSetsHandler` 获取从该主题订阅出的数据,消费者可以按行消费每条数据 + - `TsFileHandler`:使用 `SubscriptionTsFileHandler` 获取从该主题订阅出的数据,消费者可以直接订阅到存储相应数据的 TsFile +- **生命周期管理** + - Topic 只有被创建了,才能被订阅 + - Topic 在没有被订阅的情况下,才能被删除,Topic 被删除时,其相关的消费进度都会被清理 + +### 管理方式 1: SQL + +1. 
创建 Topic + +```SQL +CREATE TOPIC +WITH ( + [ = ,], +); +``` + +对于 parameter 和 value,定义如下: + +| **parameter** | **required or optional with default value** | **description** | +| :---------------- | :------------------------------------------ | :----------------------------------------------------------- | +| **path** | optional: `root.**` | topic 对应时间序列的路径 path | +| **start-time** | optional: `MIN_VALUE` | topic 对应订阅数据时间序列的开始时间(event time)可以为 ISO 格式,例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00;也可以为 long 值,含义为裸时间戳,单位与数据库时间戳精度一致。支持特殊 value **`now`**,含义为 topic 的创建时间,当 start-time 为 `now` 且 end-time 为 `MAX_VALUE` 时表示只订阅实时数据 | +| **end-time** | optional: `MAX_VALUE` | topic 对应订阅数据时间序列的结束时间(event time)可以为 ISO 格式,例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00;也可以为 long 值,含义为裸时间戳,单位与数据库时间戳精度一致。支持特殊 value **`now`**,含义为 topic 的创建时间,当 end-time 为 `now` 且 start-time 为 `MIN_VALUE` 时表示只订阅历史数据 | +| **processor** | optional: `do-nothing-processor` | processor 插件名及其插件配置 | +| **format** | optional: `SessionDataSetsHandler` | topic 对应的数据呈现形式,有两个选项:
1. `SessionDataSetsHandler`
2. `TsFileHandler` | +| **mode (v1.3.3.2 新增)** | optional: `live` | topic 对应的订阅模式:
1. `live`:订阅该主题时,订阅的数据集模式为动态数据集,即可以不断消费到最新的数据
2. `snapshot`:consumer 订阅该主题时,订阅的数据集模式为静态数据集,即 consumer group 订阅该主题的时刻(不是创建主题的时刻)数据的 snapshot;形成订阅后的静态数据集不支持 TTL | +| **loose-range (v1.3.3.2 新增)** | optional: `""` | String: 是否严格按照 path 和 time range 来筛选该 topic 对应的数据,例如:
1. `""`:严格按照 path 和 time range 来筛选该 topic 对应的数据
2. `"time"`:不严格按照 time range 来筛选该 topic 对应的数据(粗筛);严格按照 path 来筛选该 topic 对应的数据
3. `"path"`:不严格按照 path 来筛选该 topic 对应的数据(粗筛);严格按照 time range 来筛选该 topic 对应的数据
4. `"time, path"` / `"path, time"` / `"all"`:不严格按照 path 和 time range 来筛选该 topic 对应的数据(粗筛) | + +示例: + +```SQL +-- 全量订阅 +CREATE TOPIC root_all; + +-- 自定义订阅 +CREATE TOPIC sg_timerange +WITH ( + 'path' = 'root.sg.**', + 'start-time' = '2023-01-01', + 'end-time' = '2023-12-31', +); +``` + +2. 删除 Topic + +```SQL +DROP TOPIC ; +``` + +3. 获取 Topic + +```SQL +SHOW TOPICS; +SHOW TOPIC ; +``` + +### 管理方式 2: JAVA SDK + +IoTDB 订阅客户端中的 SubscriptionSession 类提供了 Topic 管理的相关接口: + +1. 创建 Topic + +```Java +Topic createTopic(String topicName, Properties properties) throws Exception; +``` + +示例: + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(topicName, config); +} +``` + +2. 删除 Topic + +```Java +Topic dropTopic(String topicName) throws Exception; +``` + +3. 获取 Topic + +```Java +// 获取所有 topics +Set getTopics() throws Exception; + +// 获取单个 topic +Optional getTopic(String topicName) throws Exception; +``` + +## Consumer 操作 + +**在 IoTDB 订阅客户端中只能通过 JAVA SDK 的方式进行 consumer 相关的操作。** + +**本节介绍创建 consumer,订阅 topic,消费数据和取消订阅 4 种操作。** + +### 创建 Consumer + +在使用 JAVA SDK 创建 consumer 时,需要指定 consumer 所应用的参数。 + +对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: + +| key | **required or optional with default** | description | +| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | String: IoTDB 中某 DataNode 的 RPC host | +| port | optional: 6667 | Integer: IoTDB 中某 DataNode 的 RPC port | +| node-urls | optional: 127.0.0.1:6667 | List\: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | +| username | optional: root | String: IoTDB 中 DataNode 的用户名 | +| password | optional: root | String: 
IoTDB 中 DataNode 的密码 | +| groupId | optional | String: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | +| consumerId | optional | String: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | Long: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | Long: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | String: consumer 订阅出的 TsFile 文件临时存放的目录路径 | +| fileSaveFsync | optional: false | Boolean: consumer 订阅 TsFile 的过程中是否主动调用 fsync | + +#### SubscriptionPushConsumer + +以下为 `SubscriptionPushConsumer` 中的特殊配置: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:
1. `AckStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)
2. `AckStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | +| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | + +其中,`ConsumeListener` 接口定义如下: + +```Java +@FunctionalInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +#### SubscriptionPullConsumer + +以下为 `SubscriptionPullConsumer` 中的特殊配置: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: 是否自动提交消费进度
如果此参数设置为 false,则需要调用 `commitSync` 或 `commitAsync` 方法来手动提交消费进度 | +| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**
仅当 autoCommit 参数为 true 的时候才会生效 | + +在创建 consumer 后,需要手动调用 consumer 的 open 方法: + +```Java +void open() throws Exception; +``` + +此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 + +### 订阅 Topic + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA SDK 用于订阅 Topics: + +```Java +// 订阅 topics +void subscribe(String topic) throws Exception; +void subscribe(List<\String\> topics) throws Exception; +``` + +- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 +- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 +- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 + +### 消费数据 + +**无论是 push 模式还是 pull 模式的 consumer:** + +- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 +- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics + +#### SubscriptionPushConsumer + +SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 + +#### SubscriptionPullConsumer + +SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: + +```Java +List poll(final Duration timeout) throws Exception; +List poll(final long timeoutMs) throws Exception; +List poll(final Set topicNames, final Duration timeout) throws Exception; +List poll(final Set topicNames, final long timeoutMs) throws Exception; +``` + +在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 + +当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable messages) throws Exception; + +CompletableFuture commitAsync(final SubscriptionMessage message); +CompletableFuture commitAsync(final Iterable messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable messages, final 
AsyncCommitCallback callback); +``` + +AsyncCommitCallback 类定义如下: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +### 取消订阅 + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA SDK 用于取消订阅并关闭 consumer: + +```Java +// 取消订阅 topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List\ topics) throws Exception; + +// 关闭 consumer +void close(); +``` + +- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 +- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics +- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 + +## 订阅状态获取 + +### 获取方式 1: SQL + +用户可以使用 SQL 查询 IoTDB 订阅客户端中目前存在的订阅关系: + +```SQL +-- 查询所有的 topics 与 consumer group 的订阅关系 +SHOW SUBSCRIPTIONS +-- 查询某个 topic 下所有的 subscriptions +SHOW SUBSCRIPTIONS ON +``` + +结果集: + +```SQL +[TopicName|ConsumerGroupName|SubscribedConsumers] +``` + +- TopicName:主题 ID +- ConsumerGroupName:用户代码中指定的消费者组 ID +- SubscribedConsumers:该消费者组中订阅了该主题的所有客户端 ID + +### 获取方式 2: JAVA SDK + +IoTDB 订阅客户端中的 SubscriptionSession 类提供了获取订阅状态的相关接口: + +```Java +Set getSubscriptions() throws Exception; +Set getSubscriptions(final String topicName) throws Exception; +``` + +# 代码示例 (JAVA SDK) + +## 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + 
LockSupport.parkNanos(SLEEP_NS); // wait some time + final List messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +## 多 Push Consumer 消费 TsFileHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + +# FAQ + +## IoTDB 数据订阅与 Kafka 的区别是什么? 
+ +### 消费有序性 + +- **Kafka 保证消息在单个 partition 内是有序的**,当某个 topic 仅对应一个 partition 且只有一个 consumer 订阅了这个 topic,即可保证该 consumer(单线程) 消费该 topic 数据的顺序即为数据写入的顺序。 +- IoTDB 订阅客户端**不保证** consumer 消费数据的顺序即为数据写入的顺序,但会尽量反映数据写入的顺序。 + +### 消息送达语义 + +- Kafka 可以通过配置实现 Producer 和 Consumer 的 Exactly once 语义。 +- IoTDB 订阅客户端目前无法提供 Consumer 的 Exactly once 语义。 \ No newline at end of file From d5486eba4401a85baf5d881a1f597f682d6cd0a2 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 22 Jul 2024 10:45:43 +0800 Subject: [PATCH 2/4] cp --- .../Master/User-Manual/Data-Subscription.md | 441 ++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 src/zh/UserGuide/Master/User-Manual/Data-Subscription.md diff --git a/src/zh/UserGuide/Master/User-Manual/Data-Subscription.md b/src/zh/UserGuide/Master/User-Manual/Data-Subscription.md new file mode 100644 index 000000000..6dc11fed1 --- /dev/null +++ b/src/zh/UserGuide/Master/User-Manual/Data-Subscription.md @@ -0,0 +1,441 @@ + + +# 功能介绍 + +## 应用场景 + +IoTDB 数据订阅模块(**下称 IoTDB 订阅客户端**)虽然参考了部分消息队列产品如 Kafka 的功能定义,**但是 IoTDB 订阅客户端并不是为了替换这些消息队列产品,而是提供区别于数据查询的一种新的数据消费方式**。在下面应用场景中,使用 IoTDB 订阅客户端消费数据会有显著的优势: + +1. **替换大量数据的轮询查询**:避免查询频率高、测点多时,轮询查询对原有系统性能的较大影响;避免查询范围不好确定问题,可保证下游拿到准确全量的数据 +2. **方便下游进行系统集成**:更方便对接 Flink、Spark、Kafka / DataX、Camel / MySQL、PG 等系统组件。不需要为每一个大数据组件,单独定制开发 IoTDB 的变更捕获等逻辑,可简化集成组件设计,方便用户 +3. …… + +## 主要概念 + +**IoTDB 订阅客户端包含 3 个核心概念:Topic、Consumer、Consumer Group** + +1. **Topic(主题): Topic** 是 IoTDB 订阅客户端用于分类数据的逻辑概念,可以看作是数据的发布通道。生产者将数据发布到特定的主题,而消费者则订阅这些主题以接收相关数据。 +2. **Consumer(消费者): Consumer** 是 IoTDB 订阅客户端中的应用程序或服务,负责接收和处理发布到特定 **Topic** 的数据。**Consumer** 从队列中获取数据并进行相应的处理。在 IoTDB 订阅客户端中提供了两种类型的 **Consumers**: + 1. 一种是 `SubscriptionPullConsumer`,对应的是消息队列中的 pull 消费模式,用户代码需要主动调用数据获取逻辑 + 2. 一种是 `SubscriptionPushConsumer`,对应的是消息队列中的 push 消费模式,用户代码由新到达的数据事件触发 +3. 
**Consumer Group(消费者组): Consumer Group** 是一组 **Consumers** 的集合,当同一个 **Consumer Group** 中不同的 **Consumers 订阅了相同的 Topic 时,这些 Consumers** 便共享该 Topic 下数据的处理负载,该 Topic 下的每条数据只能被组内的一个 **Consumer** 处理,确保数据不会被重复处理(即一个消费者组中的所有消费者共享消费进度) + 1. 一个 consumer group 中的 consumers 可以有任意多个,允许 consumers 的动态增删 + 2. 一个 topic 不需要被一个 consumer group 中的所有 consumer 订阅,所有的 consumer 都可以按需订阅指定的 topics,即可以订阅多个 topics,可以动态的新增订阅或取消订阅 topics。一个 consumer 只有显式订阅了某个 topic,才会收到对应 topic 的数据 + 3. 允许 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 加入同一个 consumer group + 4. 不允许一个 consumer 同时加入多个不同的 consumer groups + +TODO: PIC + +# 语法说明 + +## Topic 管理 + +### 概念 + +- **基本概念**:Topic 是 IoTDB 订阅客户端中用于分类数据的逻辑概念,可以看作是数据的发布通道。生产者将数据发布到特定的主题,而消费者则订阅这些主题以接收相关数据。不同于 Kafka,在 IoTDB 订阅客户端中主题描述了订阅数据的**序列特征,时间特征,呈现形式,及可选的自定义处理逻辑** +- **表示方法:Topic 由下述几个部分定义** + - **Path**:表示一组需要订阅的时间序列集合 + - **Time**:表示需要订阅的时间序列的时间范围(event time) + - **Processor**:表示对原始订阅数据应用的自定义处理逻辑,可以通过类似 pipe processor 插件的方式指定 + - **Format**:表示从该主题订阅出的数据呈现形式,目前支持下述两种数据形式: + - `SessionDataSetsHandler`:使用 `SubscriptionSessionDataSetsHandler` 获取从该主题订阅出的数据,消费者可以按行消费每条数据 + - `TsFileHandler`:使用 `SubscriptionTsFileHandler` 获取从该主题订阅出的数据,消费者可以直接订阅到存储相应数据的 TsFile +- **生命周期管理** + - Topic 只有被创建了,才能被订阅 + - Topic 在没有被订阅的情况下,才能被删除,Topic 被删除时,其相关的消费进度都会被清理 + +### 管理方式 1: SQL + +1. 
创建 Topic + +```SQL +CREATE TOPIC +WITH ( + [ = ,], +); +``` + +对于 parameter 和 value,定义如下: + +| **parameter** | **required or optional with default value** | **description** | +| :---------------- | :------------------------------------------ | :----------------------------------------------------------- | +| **path** | optional: `root.**` | topic 对应时间序列的路径 path | +| **start-time** | optional: `MIN_VALUE` | topic 对应订阅数据时间序列的开始时间(event time)可以为 ISO 格式,例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00;也可以为 long 值,含义为裸时间戳,单位与数据库时间戳精度一致。支持特殊 value **`now`**,含义为 topic 的创建时间,当 start-time 为 `now` 且 end-time 为 `MAX_VALUE` 时表示只订阅实时数据 | +| **end-time** | optional: `MAX_VALUE` | topic 对应订阅数据时间序列的结束时间(event time)可以为 ISO 格式,例如 2011-12-03T10:15:30 或 2011-12-03T10:15:30+01:00;也可以为 long 值,含义为裸时间戳,单位与数据库时间戳精度一致。支持特殊 value **`now`**,含义为 topic 的创建时间,当 end-time 为 `now` 且 start-time 为 `MIN_VALUE` 时表示只订阅历史数据 | +| **processor** | optional: `do-nothing-processor` | processor 插件名及其插件配置 | +| **format** | optional: `SessionDataSetsHandler` | topic 对应的数据呈现形式,有两个选项:
1. `SessionDataSetsHandler`
2. `TsFileHandler` | +| **mode (v1.3.3.2 新增)** | optional: `live` | topic 对应的订阅模式:
1. `live`:订阅该主题时,订阅的数据集模式为动态数据集,即可以不断消费到最新的数据
2. `snapshot`:consumer 订阅该主题时,订阅的数据集模式为静态数据集,即 consumer group 订阅该主题的时刻(不是创建主题的时刻)数据的 snapshot;形成订阅后的静态数据集不支持 TTL | +| **loose-range (v1.3.3.2 新增)** | optional: `""` | String: 是否严格按照 path 和 time range 来筛选该 topic 对应的数据,例如:
1. `""`:严格按照 path 和 time range 来筛选该 topic 对应的数据
2. `"time"`:不严格按照 time range 来筛选该 topic 对应的数据(粗筛);严格按照 path 来筛选该 topic 对应的数据
3. `"path"`:不严格按照 path 来筛选该 topic 对应的数据(粗筛);严格按照 time range 来筛选该 topic 对应的数据
4. `"time, path"` / `"path, time"` / `"all"`:不严格按照 path 和 time range 来筛选该 topic 对应的数据(粗筛) | + +示例: + +```SQL +-- 全量订阅 +CREATE TOPIC root_all; + +-- 自定义订阅 +CREATE TOPIC sg_timerange +WITH ( + 'path' = 'root.sg.**', + 'start-time' = '2023-01-01', + 'end-time' = '2023-12-31', +); +``` + +2. 删除 Topic + +```SQL +DROP TOPIC ; +``` + +3. 获取 Topic + +```SQL +SHOW TOPICS; +SHOW TOPIC ; +``` + +### 管理方式 2: JAVA SDK + +IoTDB 订阅客户端中的 SubscriptionSession 类提供了 Topic 管理的相关接口: + +1. 创建 Topic + +```Java +Topic createTopic(String topicName, Properties properties) throws Exception; +``` + +示例: + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(topicName, config); +} +``` + +2. 删除 Topic + +```Java +Topic dropTopic(String topicName) throws Exception; +``` + +3. 获取 Topic + +```Java +// 获取所有 topics +Set getTopics() throws Exception; + +// 获取单个 topic +Optional getTopic(String topicName) throws Exception; +``` + +## Consumer 操作 + +**在 IoTDB 订阅客户端中只能通过 JAVA SDK 的方式进行 consumer 相关的操作。** + +**本节介绍创建 consumer,订阅 topic,消费数据和取消订阅 4 种操作。** + +### 创建 Consumer + +在使用 JAVA SDK 创建 consumer 时,需要指定 consumer 所应用的参数。 + +对于 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 而言,有以下公共配置: + +| key | **required or optional with default** | description | +| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | String: IoTDB 中某 DataNode 的 RPC host | +| port | optional: 6667 | Integer: IoTDB 中某 DataNode 的 RPC port | +| node-urls | optional: 127.0.0.1:6667 | List\: IoTDB 中所有 DataNode 的 RPC 地址,可以是多个;host:port 和 node-urls 选填一个即可。当 host:port 和 node-urls 都填写了,则取 host:port 和 node-urls 的**并集**构成新的 node-urls 应用 | +| username | optional: root | String: IoTDB 中 DataNode 的用户名 | +| password | optional: root | String: 
IoTDB 中 DataNode 的密码 | +| groupId | optional | String: consumer group id,若未指定则随机分配(新的 consumer group),保证不同的 consumer group 对应的 consumer group id 均不相同 | +| consumerId | optional | String: consumer client id,若未指定则随机分配,保证同一个 consumer group 中每一个 consumer client id 均不相同 | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | Long: consumer 向 IoTDB DataNode 定期发送心跳请求的间隔 | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | Long: consumer 探测 IoTDB 集群节点扩缩容情况调整订阅连接的间隔 | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | String: consumer 订阅出的 TsFile 文件临时存放的目录路径 | +| fileSaveFsync | optional: false | Boolean: consumer 订阅 TsFile 的过程中是否主动调用 fsync | + +#### SubscriptionPushConsumer + +以下为 `SubscriptionPushConsumer` 中的特殊配置: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| ackStrategy | optional: `ACKStrategy.AFTER_CONSUME` | 消费进度的确认机制包含以下选项:
1. `AckStrategy.BEFORE_CONSUME`(当 consumer 收到数据时立刻提交消费进度,`onReceive` 前)
2. `AckStrategy.AFTER_CONSUME`(当 consumer 消费完数据再去提交消费进度,`onReceive` 后) | +| consumeListener | optional | 消费数据的回调函数,需实现 `ConsumeListener` 接口,定义消费 `SessionDataSetsHandler` 和 `TsFileHandler` 形式数据的处理逻辑 | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: consumer 自动拉取数据的时间间隔,单位为**毫秒** | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: consumer 每次拉取数据的超时时间,单位为**毫秒** | + +其中,`ConsumeListener` 接口定义如下: + +```Java +@FunctionalInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +#### SubscriptionPullConsumer + +以下为 `SubscriptionPullConsumer` 中的特殊配置: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: 是否自动提交消费进度
如果此参数设置为 false,则需要调用 `commitSync` 或 `commitAsync` 方法来手动提交消费进度 | +| autoCommitInterval | optional: 5000 (min: 500) | Long: 自动提交消费进度的时间间隔,单位为**毫秒**
仅当 autoCommit 参数为 true 的时候才会生效 | + +在创建 consumer 后,需要手动调用 consumer 的 open 方法: + +```Java +void open() throws Exception; +``` + +此时,IoTDB 订阅客户端才会校验 consumer 的配置正确性,在校验成功后 consumer 就会加入对应的 consumer group。也就是说,在打开 consumer 后,才可以使用返回的 consumer 对象进行订阅 Topic,消费数据等操作。 + +### 订阅 Topic + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA SDK 用于订阅 Topics: + +```Java +// 订阅 topics +void subscribe(String topic) throws Exception; +void subscribe(List<\String\> topics) throws Exception; +``` + +- 在 consumer 订阅 topic 前,topic 必须已经被创建,否则订阅会失败 +- 一个 consumer 在已经订阅了某个 topic 的情况下再次订阅这个 topic,不会报错 +- 如果该 consumer 所在的 consumer group 中已经有 consumers 订阅了相同的 topics,那么该 consumer 将会复用对应的消费进度 + +### 消费数据 + +**无论是 push 模式还是 pull 模式的 consumer:** + +- 只有显式订阅了某个 topic,才会收到对应 topic 的数据 +- 若在创建后没有订阅任何 topics,此时该 consumer 无法消费到任何数据,即使该 consumer 所在的 consumer group 中其它的 consumers 订阅了一些 topics + +#### SubscriptionPushConsumer + +SubscriptionPushConsumer 在订阅 topics 后,无需手动拉取数据,其消费数据的逻辑在创建 SubscriptionPushConsumer 指定的 `consumeListener` 配置中。 + +#### SubscriptionPullConsumer + +SubscriptionPullConsumer 在订阅 topics 后,需要主动调用 `poll` 方法拉取数据: + +```Java +List poll(final Duration timeout) throws Exception; +List poll(final long timeoutMs) throws Exception; +List poll(final Set topicNames, final Duration timeout) throws Exception; +List poll(final Set topicNames, final long timeoutMs) throws Exception; +``` + +在 poll 方法中可以指定需要拉取的 topic 名称(如果不指定则默认拉取该 consumer 已订阅的所有 topics)和超时时间。 + +当 SubscriptionPullConsumer 配置 autoCommit 参数为 false 时,需要手动调用 commitSync 和 commitAsync 方法同步或异步提交某批数据的消费进度: + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable messages) throws Exception; + +CompletableFuture commitAsync(final SubscriptionMessage message); +CompletableFuture commitAsync(final Iterable messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable messages, final 
AsyncCommitCallback callback); +``` + +AsyncCommitCallback 类定义如下: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +### 取消订阅 + +`SubscriptionPushConsumer` 和 `SubscriptionPullConsumer` 提供了下述 JAVA SDK 用于取消订阅并关闭 consumer: + +```Java +// 取消订阅 topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List\ topics) throws Exception; + +// 关闭 consumer +void close(); +``` + +- 在 topic 存在的情况下,如果一个 consumer 在没有订阅了某个 topic 的情况下取消订阅某个 topic,不会报错 +- consumer close 时会退出对应的 consumer group,同时自动 unsubscribe 该 consumer 现存订阅的所有 topics +- consumer 在 close 后生命周期即结束,无法再重新 open 订阅并消费数据 + +## 订阅状态获取 + +### 获取方式 1: SQL + +用户可以使用 SQL 查询 IoTDB 订阅客户端中目前存在的订阅关系: + +```SQL +-- 查询所有的 topics 与 consumer group 的订阅关系 +SHOW SUBSCRIPTIONS +-- 查询某个 topic 下所有的 subscriptions +SHOW SUBSCRIPTIONS ON +``` + +结果集: + +```SQL +[TopicName|ConsumerGroupName|SubscribedConsumers] +``` + +- TopicName:主题 ID +- ConsumerGroupName:用户代码中指定的消费者组 ID +- SubscribedConsumers:该消费者组中订阅了该主题的所有客户端 ID + +### 获取方式 2: JAVA SDK + +IoTDB 订阅客户端中的 SubscriptionSession 类提供了获取订阅状态的相关接口: + +```Java +Set getSubscriptions() throws Exception; +Set getSubscriptions(final String topicName) throws Exception; +``` + +# 代码示例 (JAVA SDK) + +## 单 Pull Consumer 消费 SessionDataSetsHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + 
LockSupport.parkNanos(SLEEP_NS); // wait some time + final List messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +## 多 Push Consumer 消费 TsFileHandler 形式的数据 + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + +# FAQ + +## IoTDB 数据订阅与 Kafka 的区别是什么? 
+ +### 消费有序性 + +- **Kafka 保证消息在单个 partition 内是有序的**,当某个 topic 仅对应一个 partition 且只有一个 consumer 订阅了这个 topic,即可保证该 consumer(单线程) 消费该 topic 数据的顺序即为数据写入的顺序。 +- IoTDB 订阅客户端**不保证** consumer 消费数据的顺序即为数据写入的顺序,但会尽量反映数据写入的顺序。 + +### 消息送达语义 + +- Kafka 可以通过配置实现 Producer 和 Consumer 的 Exactly once 语义。 +- IoTDB 订阅客户端目前无法提供 Consumer 的 Exactly once 语义。 \ No newline at end of file From 56e520d6ad72e402df7267a77992e83ea747621c Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 22 Jul 2024 11:05:15 +0800 Subject: [PATCH 3/4] add EN trans --- .../Master/User-Manual/Data-Subscription.md | 442 ++++++++++++++++++ .../latest/User-Manual/Data-Subscription.md | 442 ++++++++++++++++++ 2 files changed, 884 insertions(+) create mode 100644 src/UserGuide/Master/User-Manual/Data-Subscription.md create mode 100644 src/UserGuide/latest/User-Manual/Data-Subscription.md diff --git a/src/UserGuide/Master/User-Manual/Data-Subscription.md b/src/UserGuide/Master/User-Manual/Data-Subscription.md new file mode 100644 index 000000000..b3f555d9c --- /dev/null +++ b/src/UserGuide/Master/User-Manual/Data-Subscription.md @@ -0,0 +1,442 @@ + + +# Feature Introduction + +## Application Scenarios + +Although the IoTDB data subscription module (**hereinafter referred to as the IoTDB subscription client**) refers to some functions of message queue products such as Kafka, **the IoTDB subscription client is not intended to replace these message queue products but to provide a new way of data consumption different from data query**. In the following application scenarios, using the IoTDB subscription client to consume data will have significant advantages: + +1. **Replace polling queries for a large amount of data**: Avoid the significant impact on the performance of the original system when the query frequency is high and there are many measurement points; avoid the problem of uncertain query scope, and ensure that downstream obtains accurate and complete data. +2. 
**Facilitate downstream system integration**: Easier to connect with Flink, Spark, Kafka / DataX, Camel / MySQL, PG, and other system components. There is no need to customize the development of IoTDB's change capture logic for each big data component separately, which can simplify the design of integration components and facilitate users. +3. …… + +## Key Concepts + +**The IoTDB subscription client includes 3 core concepts: Topic, Consumer, and Consumer Group** + +1. **Topic**: Topic is a logical concept used by the IoTDB subscription client to classify data and can be regarded as a data publishing channel. Producers publish data to specific topics, and consumers subscribe to these topics to receive relevant data. +2. **Consumer**: Consumer is an application or service in the IoTDB subscription client responsible for receiving and processing data published to a specific **Topic**. **Consumers** fetch data from the queue and process it accordingly. The IoTDB subscription client provides two types of **Consumers**: + 1. `SubscriptionPullConsumer`, corresponding to the pull consumption mode in message queues, where user code needs to actively call data fetching logic. + 2. `SubscriptionPushConsumer`, corresponding to the push consumption mode in message queues, where user code is triggered by new data arrival events. +3. **Consumer Group**: A Consumer Group is a collection of **Consumers**. When different **Consumers** in the same **Consumer Group** subscribe to the same **Topic**, these **Consumers** share the processing load of data under that **Topic**. Each piece of data under that **Topic** can only be processed by one **Consumer** within the group, ensuring that data will not be processed repeatedly (i.e., all consumers in a consumer group share the consumption progress). + 1. There can be any number of consumers in a consumer group, allowing for dynamic addition and removal of consumers. + 2. 
A topic does not need to be subscribed to by all consumers in a consumer group; all consumers can subscribe to specified topics as needed, meaning they can subscribe to multiple topics and dynamically add or cancel topic subscriptions. A consumer will only receive data from a topic if it has explicitly subscribed to that topic. + 3. Both `SubscriptionPullConsumer` and `SubscriptionPushConsumer` are allowed to join the same consumer group. + 4. A consumer is not allowed to join multiple different consumer groups simultaneously. + +TODO: PIC + +# Syntax Description + +## Topic Management + +### Concept + +- **Basic Concept**: Topic is a logical concept used by the IoTDB subscription client to classify data and can be regarded as a data publishing channel. Producers publish data to specific topics, and consumers subscribe to these topics to receive relevant data. Unlike Kafka, in the IoTDB subscription client, a topic describes the **sequence characteristics, time characteristics, presentation form, and optional custom processing logic** of the subscribed data. +- **Representation Method**: A topic is defined by the following parts: + - **Path**: Indicates a set of time series collections to be subscribed to. + - **Time**: Indicates the time range of the time series to be subscribed to (event time). + - **Processor**: Indicates the custom processing logic applied to the original subscription data, which can be specified through a plugin similar to a pipe processor. + - **Format**: Indicates the presentation form of the data subscribed from this topic. Currently, two data forms are supported: + - `SessionDataSetsHandler`: Use `SubscriptionSessionDataSetsHandler` to get data subscribed from this topic, allowing consumers to consume each piece of data row by row. + - `TsFileHandler`: Use `SubscriptionTsFileHandler` to get data subscribed from this topic, allowing consumers to directly subscribe to TsFiles storing the corresponding data. 
+- **Lifecycle Management** + - A topic can only be subscribed to after it has been created. + - A topic can only be deleted when it is not being subscribed to, and its related consumption progress will be cleared upon deletion. + +### Management Method 1: SQL + +1. Create Topic + +```SQL +CREATE TOPIC +WITH ( + [ = ,], +); +``` + +For the parameter and value, the definitions are as follows: + +| **parameter** | **required or optional with default value** | **description** | +| :---------------- | :------------------------------------------ | :----------------------------------------------------------- | +| **path** | optional: `root.**` | The path of the time series corresponding to the topic. | +| **start-time** | optional: `MIN_VALUE` | The start time of the subscribed data time series of the topic (event time), which can be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00; or a long value, meaning a raw timestamp, with units consistent with the database timestamp precision. Supports the special value **`now`**, meaning the creation time of the topic. When `start-time` is `now` and `end-time` is `MAX_VALUE`, it indicates subscribing only to real-time data. | +| **end-time** | optional: `MAX_VALUE` | The end time of the subscribed data time series of the topic (event time), which can be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00; or a long value, meaning a raw timestamp, with units consistent with the database timestamp precision. Supports the special value **`now`**, meaning the creation time of the topic. When `end-time` is `now` and `start-time` is `MIN_VALUE`, it indicates subscribing only to historical data. | +| **processor** | optional: `do-nothing-processor` | The processor plugin name and its plugin configuration. | +| **format** | optional: `SessionDataSetsHandler` | The data presentation format corresponding to the topic, with two options:
1. `SessionDataSetsHandler`
2. `TsFileHandler` | +| **mode (new in v1.3.3.2)** | optional: `live` | The subscription mode corresponding to the topic:
1. `live`: When subscribing to this topic, the data set mode is a dynamic data set, meaning it can continuously consume the latest data.
2. `snapshot`: When the consumer subscribes to this topic, the data set mode is a static data set, meaning the snapshot of the data at the time the consumer group subscribes to this topic (not the creation time of the topic). The static data set formed after subscription does not support TTL. | +| **loose-range (new in v1.3.3.2)** | optional: `""` | String: Whether to strictly filter the data corresponding to the topic according to the path and time range, for example:
1. `""`: Strictly filter the data corresponding to the topic according to the path and time range.
2. `"time"`: Do not strictly filter the data corresponding to the topic according to the time range (coarse filter); strictly filter the data corresponding to the topic according to the path.
3. `"path"`: Do not strictly filter the data corresponding to the topic according to the path (coarse filter); strictly filter the data corresponding to the topic according to the time range.
4. `"time, path"` / `"path, time"` / `"all"`: Do not strictly filter the data corresponding to the topic according to the path and time range (coarse filter). | + +Example: + +```SQL +-- Full subscription +CREATE TOPIC root_all; + +-- Custom subscription +CREATE TOPIC sg_timerange +WITH ( + 'path' = 'root.sg.**', + 'start-time' = '2023-01-01', + 'end-time' = '2023-12-31', +); +``` + +2. Delete Topic + +```SQL +DROP TOPIC ; +``` + +3. Get Topic + +```SQL +SHOW TOPICS; +SHOW TOPIC ; +``` + +### Management Method 2: JAVA SDK + +The SubscriptionSession class in the IoTDB subscription client provides related interfaces for Topic management: + +1. Create Topic + +```Java +Topic createTopic(String topicName, Properties properties) throws Exception; +``` + +Example: + + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(topicName, config); +} +``` + +2. Delete Topic + +```Java +Topic dropTopic(String topicName) throws Exception; +``` + +3. Get Topic + +```Java +// Get all topics +Set getTopics() throws Exception; + +// Get a single topic +Optional getTopic(String topicName) throws Exception; +``` + +## Consumer Operations + +**In the IoTDB subscription client, consumer-related operations can only be performed using the JAVA SDK.** + +**This section introduces four operations: creating a consumer, subscribing to a topic, consuming data, and unsubscribing.** + +### Creating a Consumer + +When using the JAVA SDK to create a consumer, you need to specify the parameters applied to the consumer. 
+ +For `SubscriptionPullConsumer` and `SubscriptionPushConsumer`, the following common configurations are available: + +| key | **required or optional with default** | description | +| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | String: RPC host of a DataNode in IoTDB | +| port | optional: 6667 | Integer: RPC port of a DataNode in IoTDB | +| node-urls | optional: 127.0.0.1:6667 | List\: RPC addresses of all DataNodes in IoTDB; can be multiple; either host:port or node-urls can be specified. If both are specified, the union of host:port and node-urls will be used as the new node-urls | +| username | optional: root | String: Username of the DataNode in IoTDB | +| password | optional: root | String: Password of the DataNode in IoTDB | +| groupId | optional | String: Consumer group ID. If not specified, a new consumer group will be randomly assigned, ensuring different consumer groups have unique IDs | +| consumerId | optional | String: Consumer client ID. 
If not specified, a unique ID will be randomly assigned within the same consumer group | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | Long: Interval for the consumer to send heartbeat requests to the IoTDB DataNode | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | Long: Interval for the consumer to detect cluster node expansion or reduction and adjust subscription connections | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | String: Directory path where the consumer temporarily stores the subscribed TsFile files | +| fileSaveFsync | optional: false | Boolean: Whether the consumer actively calls fsync during the TsFile subscription process | + +#### SubscriptionPushConsumer + +The following are special configurations for `SubscriptionPushConsumer`: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| ackStrategy | optional: `AckStrategy.AFTER_CONSUME` | Acknowledgment strategy for consumption progress includes the following options:
1. `AckStrategy.BEFORE_CONSUME` (Submit consumption progress immediately when the consumer receives data, before `onReceive`)
2. `AckStrategy.AFTER_CONSUME` (Submit consumption progress after the consumer finishes consuming data, after `onReceive`) | +| consumeListener | optional | Callback function for consuming data, must implement the `ConsumeListener` interface, defining the logic for handling data in `SessionDataSetsHandler` and `TsFileHandler` formats | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: Time interval for the consumer to automatically pull data, in milliseconds | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: Timeout for the consumer to pull data each time, in milliseconds | + +The `ConsumeListener` interface is defined as follows: + +```Java +@FunctionalInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +#### SubscriptionPullConsumer + +The following are the special configurations for `SubscriptionPullConsumer`: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: Whether to automatically commit consumption progress
If this parameter is set to false, the `commitSync` or `commitAsync` method needs to be called to manually commit the consumption progress | +| autoCommitInterval | optional: 5000 (min: 500) | Long: The interval for automatically committing consumption progress, in **milliseconds**
This is only effective when the autoCommit parameter is set to true | + +After creating the consumer, you need to manually call the open method of the consumer: + +```Java +void open() throws Exception; +``` + +At this point, the IoTDB subscription client will verify the correctness of the consumer's configuration. Once verified successfully, the consumer will join the corresponding consumer group. This means that after opening the consumer, you can use the returned consumer object to subscribe to Topics, consume data, and perform other operations. + +### Subscribing to Topics + +`SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA SDK for subscribing to Topics: + +```Java +// Subscribe to topics +void subscribe(String topic) throws Exception; +void subscribe(List<\String\> topics) throws Exception; +``` + +- Before a consumer subscribes to a topic, the topic must have been created; otherwise, the subscription will fail. +- If a consumer subscribes to a topic that it has already subscribed to, no error will be thrown. +- If other consumers in the same consumer group have already subscribed to the same topics, the consumer will reuse the corresponding consumption progress. + +### Consuming Data + +**For both push mode and pull mode consumers:** + +- A consumer will only receive data for a topic if it has explicitly subscribed to that topic. +- If a consumer has not subscribed to any topics after being created, it will not be able to consume any data, even if other consumers in the same consumer group have subscribed to some topics. + +#### SubscriptionPushConsumer + +After subscribing to topics, a `SubscriptionPushConsumer` does not need to manually pull data; the logic for consuming data is specified in the consumeListener configuration when creating the `SubscriptionPushConsumer`. 
+ +#### SubscriptionPullConsumer + +After subscribing to topics, a `SubscriptionPullConsumer` needs to actively call the `poll` method to pull data: + +```Java +List poll(final Duration timeout) throws Exception; +List poll(final long timeoutMs) throws Exception; +List poll(final Set topicNames, final Duration timeout) throws Exception; +List poll(final Set topicNames, final long timeoutMs) throws Exception; +``` + +In the `poll` method, you can specify the topic names to pull (if not specified, it will default to pulling all the topics that the consumer has subscribed to) and the timeout duration. + +When the SubscriptionPullConsumer is configured with the autoCommit parameter set to false, the `commitSync` and `commitAsync` methods need to be called manually to synchronously or asynchronously commit the consumption progress of a batch of data: + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable messages) throws Exception; + +CompletableFuture commitAsync(final SubscriptionMessage message); +CompletableFuture commitAsync(final Iterable messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable messages, final AsyncCommitCallback callback); +``` + +The `AsyncCommitCallback` class is defined as follows: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +### Unsubscribing + +`SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA SDK for unsubscribing and closing consumers: + +```Java +// Unsubscribe from topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List\ topics) throws Exception; + +// Close the consumer +void close(); +``` + +- If a consumer unsubscribes from a topic that it has not subscribed to, no error will be thrown. 
+- When a consumer closes, it will exit the corresponding consumer group and automatically unsubscribe from all the topics it has subscribed to. +- After the consumer is closed, its lifecycle ends, and it cannot be reopened to subscribe and consume data again. + +## Subscription Status Retrieval + +### Method 1: SQL + +Users can use SQL to query the existing subscription relationships in the IoTDB subscription client: + +```SQL +-- Query all topics and the subscription relationships of consumer groups +SHOW SUBSCRIPTIONS +-- Query all subscriptions under a specific topic +SHOW SUBSCRIPTIONS ON +``` + +Result set: + +```SQL +[TopicName|ConsumerGroupName|SubscribedConsumers] +``` + +- TopicName: Topic ID +- ConsumerGroupName: Consumer group ID specified in user code +- SubscribedConsumers: All client IDs in the consumer group that have subscribed to the topic + +### Method 2: JAVA SDK + +The `SubscriptionSession` class in the IoTDB subscription client provides related interfaces for retrieving subscription status: + +```Java +Set getSubscriptions() throws Exception; +Set getSubscriptions(final String topicName) throws Exception; +``` + +# Code Examples (JAVA SDK) + +## Single Pull Consumer Consuming Data in SessionDataSetsHandler Format + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + LockSupport.parkNanos(SLEEP_NS); // wait some time + final List messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final 
SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +## Multiple Push Consumers Consuming Data in TsFileHandler Format + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + +# FAQ + +## What is the difference between IoTDB data subscription and Kafka? 
+ +### Consumption Orderliness + +- **Kafka guarantees message order within a single partition**. When a topic corresponds to only one partition and only one consumer subscribes to this topic, the consumer (single-threaded) can ensure that the order of consuming the topic's data is the same as the order in which the data was written. +- The IoTDB subscription client **does not guarantee** that the order in which the consumer consumes data is the same as the order in which the data was written, but it will try to reflect the order of data writing. + +### Message Delivery Semantics + +- Kafka can achieve Exactly once semantics for Producer and Consumer through configuration. +- The IoTDB subscription client currently cannot provide Exactly once semantics for Consumer. diff --git a/src/UserGuide/latest/User-Manual/Data-Subscription.md b/src/UserGuide/latest/User-Manual/Data-Subscription.md new file mode 100644 index 000000000..b3f555d9c --- /dev/null +++ b/src/UserGuide/latest/User-Manual/Data-Subscription.md @@ -0,0 +1,442 @@ + + +# Feature Introduction + +## Application Scenarios + +Although the IoTDB data subscription module (**hereinafter referred to as the IoTDB subscription client**) refers to some functions of message queue products such as Kafka, **the IoTDB subscription client is not intended to replace these message queue products but to provide a new way of data consumption different from data query**. In the following application scenarios, using the IoTDB subscription client to consume data will have significant advantages: + +1. **Replace polling queries for a large amount of data**: Avoid the significant impact on the performance of the original system when the query frequency is high and there are many measurement points; avoid the problem of uncertain query scope, and ensure that downstream obtains accurate and complete data. +2. 
**Facilitate downstream system integration**: Easier to connect with Flink, Spark, Kafka / DataX, Camel / MySQL, PG, and other system components. There is no need to customize the development of IoTDB's change capture logic for each big data component separately, which can simplify the design of integration components and facilitate users. +3. …… + +## Key Concepts + +**The IoTDB subscription client includes 3 core concepts: Topic, Consumer, and Consumer Group** + +1. **Topic**: Topic is a logical concept used by the IoTDB subscription client to classify data and can be regarded as a data publishing channel. Producers publish data to specific topics, and consumers subscribe to these topics to receive relevant data. +2. **Consumer**: Consumer is an application or service in the IoTDB subscription client responsible for receiving and processing data published to a specific **Topic**. **Consumers** fetch data from the queue and process it accordingly. The IoTDB subscription client provides two types of **Consumers**: + 1. `SubscriptionPullConsumer`, corresponding to the pull consumption mode in message queues, where user code needs to actively call data fetching logic. + 2. `SubscriptionPushConsumer`, corresponding to the push consumption mode in message queues, where user code is triggered by new data arrival events. +3. **Consumer Group**: A Consumer Group is a collection of **Consumers**. When different **Consumers** in the same **Consumer Group** subscribe to the same **Topic**, these **Consumers** share the processing load of data under that **Topic**. Each piece of data under that **Topic** can only be processed by one **Consumer** within the group, ensuring that data will not be processed repeatedly (i.e., all consumers in a consumer group share the consumption progress). + 1. There can be any number of consumers in a consumer group, allowing for dynamic addition and removal of consumers. + 2. 
A topic does not need to be subscribed to by all consumers in a consumer group; all consumers can subscribe to specified topics as needed, meaning they can subscribe to multiple topics and dynamically add or cancel topic subscriptions. A consumer will only receive data from a topic if it has explicitly subscribed to that topic. + 3. Both `SubscriptionPullConsumer` and `SubscriptionPushConsumer` are allowed to join the same consumer group. + 4. A consumer is not allowed to join multiple different consumer groups simultaneously. + +TODO: PIC + +# Syntax Description + +## Topic Management + +### Concept + +- **Basic Concept**: Topic is a logical concept used by the IoTDB subscription client to classify data and can be regarded as a data publishing channel. Producers publish data to specific topics, and consumers subscribe to these topics to receive relevant data. Unlike Kafka, in the IoTDB subscription client, a topic describes the **sequence characteristics, time characteristics, presentation form, and optional custom processing logic** of the subscribed data. +- **Representation Method**: A topic is defined by the following parts: + - **Path**: Indicates a set of time series collections to be subscribed to. + - **Time**: Indicates the time range of the time series to be subscribed to (event time). + - **Processor**: Indicates the custom processing logic applied to the original subscription data, which can be specified through a plugin similar to a pipe processor. + - **Format**: Indicates the presentation form of the data subscribed from this topic. Currently, two data forms are supported: + - `SessionDataSetsHandler`: Use `SubscriptionSessionDataSetsHandler` to get data subscribed from this topic, allowing consumers to consume each piece of data row by row. + - `TsFileHandler`: Use `SubscriptionTsFileHandler` to get data subscribed from this topic, allowing consumers to directly subscribe to TsFiles storing the corresponding data. 
+- **Lifecycle Management** + - A topic can only be subscribed to after it has been created. + - A topic can only be deleted when it is not being subscribed to, and its related consumption progress will be cleared upon deletion. + +### Management Method 1: SQL + +1. Create Topic + +```SQL +CREATE TOPIC +WITH ( + [ = ,], +); +``` + +For the parameter and value, the definitions are as follows: + +| **parameter** | **required or optional with default value** | **description** | +| :---------------- | :------------------------------------------ | :----------------------------------------------------------- | +| **path** | optional: `root.**` | The path of the time series corresponding to the topic. | +| **start-time** | optional: `MIN_VALUE` | The start time of the subscribed data time series of the topic (event time), which can be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00; or a long value, meaning a raw timestamp, with units consistent with the database timestamp precision. Supports the special value **`now`**, meaning the creation time of the topic. When `start-time` is `now` and `end-time` is `MAX_VALUE`, it indicates subscribing only to real-time data. | +| **end-time** | optional: `MAX_VALUE` | The end time of the subscribed data time series of the topic (event time), which can be in ISO format, such as 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00; or a long value, meaning a raw timestamp, with units consistent with the database timestamp precision. Supports the special value **`now`**, meaning the creation time of the topic. When `end-time` is `now` and `start-time` is `MIN_VALUE`, it indicates subscribing only to historical data. | +| **processor** | optional: `do-nothing-processor` | The processor plugin name and its plugin configuration. | +| **format** | optional: `SessionDataSetsHandler` | The data presentation format corresponding to the topic, with two options:
1. `SessionDataSetsHandler`
2. `TsFileHandler` | +| **mode (new in v1.3.3.2)** | optional: `live` | The subscription mode corresponding to the topic:
1. `live`: When subscribing to this topic, the data set mode is a dynamic data set, meaning it can continuously consume the latest data.
2. `snapshot`: When the consumer subscribes to this topic, the data set mode is a static data set, meaning the snapshot of the data at the time the consumer group subscribes to this topic (not the creation time of the topic). The static data set formed after subscription does not support TTL. | +| **loose-range (new in v1.3.3.2)** | optional: `""` | String: Whether to strictly filter the data corresponding to the topic according to the path and time range, for example:
1. `""`: Strictly filter the data corresponding to the topic according to the path and time range.
2. `"time"`: Do not strictly filter the data corresponding to the topic according to the time range (coarse filter); strictly filter the data corresponding to the topic according to the path.
3. `"path"`: Do not strictly filter the data corresponding to the topic according to the path (coarse filter); strictly filter the data corresponding to the topic according to the time range.
4. `"time, path"` / `"path, time"` / `"all"`: Do not strictly filter the data corresponding to the topic according to the path and time range (coarse filter). | + +Example: + +```SQL +-- Full subscription +CREATE TOPIC root_all; + +-- Custom subscription +CREATE TOPIC sg_timerange +WITH ( + 'path' = 'root.sg.**', + 'start-time' = '2023-01-01', + 'end-time' = '2023-12-31', +); +``` + +2. Delete Topic + +```SQL +DROP TOPIC ; +``` + +3. Get Topic + +```SQL +SHOW TOPICS; +SHOW TOPIC ; +``` + +### Management Method 2: JAVA SDK + +The SubscriptionSession class in the IoTDB subscription client provides related interfaces for Topic management: + +1. Create Topic + +```Java +Topic createTopic(String topicName, Properties properties) throws Exception; +``` + +Example: + + +```Java +try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(topicName, config); +} +``` + +2. Delete Topic + +```Java +Topic dropTopic(String topicName) throws Exception; +``` + +3. Get Topic + +```Java +// Get all topics +Set getTopics() throws Exception; + +// Get a single topic +Optional getTopic(String topicName) throws Exception; +``` + +## Consumer Operations + +**In the IoTDB subscription client, consumer-related operations can only be performed using the JAVA SDK.** + +**This section introduces four operations: creating a consumer, subscribing to a topic, consuming data, and unsubscribing.** + +### Creating a Consumer + +When using the JAVA SDK to create a consumer, you need to specify the parameters applied to the consumer. 
+ +For `SubscriptionPullConsumer` and `SubscriptionPushConsumer`, the following common configurations are available: + +| key | **required or optional with default** | description | +| :---------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | +| host | optional: 127.0.0.1 | String: RPC host of a DataNode in IoTDB | +| port | optional: 6667 | Integer: RPC port of a DataNode in IoTDB | +| node-urls | optional: 127.0.0.1:6667 | List\: RPC addresses of all DataNodes in IoTDB; can be multiple; either host:port or node-urls can be specified. If both are specified, the union of host:port and node-urls will be used as the new node-urls | +| username | optional: root | String: Username of the DataNode in IoTDB | +| password | optional: root | String: Password of the DataNode in IoTDB | +| groupId | optional | String: Consumer group ID. If not specified, a new consumer group will be randomly assigned, ensuring different consumer groups have unique IDs | +| consumerId | optional | String: Consumer client ID. 
If not specified, a unique ID will be randomly assigned within the same consumer group | +| heartbeatIntervalMs | optional: 30000 (min: 1000) | Long: Interval for the consumer to send heartbeat requests to the IoTDB DataNode | +| endpointsSyncIntervalMs | optional: 120000 (min: 5000) | Long: Interval for the consumer to detect cluster node expansion or reduction and adjust subscription connections | +| fileSaveDir | optional: Paths.get(System.getProperty("user.dir"), "iotdb-subscription").toString() | String: Directory path where the consumer temporarily stores the subscribed TsFile files | +| fileSaveFsync | optional: false | Boolean: Whether the consumer actively calls fsync during the TsFile subscription process | + +#### SubscriptionPushConsumer + +The following are special configurations for `SubscriptionPushConsumer`: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| ackStrategy | optional: `AckStrategy.AFTER_CONSUME` | Acknowledgment strategy for consumption progress includes the following options:
1. `AckStrategy.BEFORE_CONSUME` (Submit consumption progress immediately when the consumer receives data, before `onReceive`)
2. `AckStrategy.AFTER_CONSUME` (Submit consumption progress after the consumer finishes consuming data, after `onReceive`) | +| consumeListener | optional | Callback function for consuming data, must implement the `ConsumeListener` interface, defining the logic for handling data in `SessionDataSetsHandler` and `TsFileHandler` formats | +| autoPollIntervalMs | optional: 5000 (min: 500) | Long: Time interval for the consumer to automatically pull data, in milliseconds | +| autoPollTimeoutMs | optional: 10000 (min: 1000) | Long: Timeout for the consumer to pull data each time, in milliseconds | + +The `ConsumeListener` interface is defined as follows: + +```Java +@FunctionalInterface +interface ConsumeListener { + default ConsumeResult onReceive(Message message) { + return ConsumeResult.SUCCESS; + } +} + +enum ConsumeResult { + SUCCESS, + FAILURE, +} +``` + +#### SubscriptionPullConsumer + +The following are the special configurations for `SubscriptionPullConsumer`: + +| key | **required or optional with default** | description | +| :----------------- | :------------------------------------ | :----------------------------------------------------------- | +| autoCommit | optional: true | Boolean: Whether to automatically commit consumption progress
If this parameter is set to false, the `commitSync` or `commitAsync` method needs to be called to manually commit the consumption progress | +| autoCommitInterval | optional: 5000 (min: 500) | Long: The interval for automatically committing consumption progress, in **milliseconds**
This is only effective when the autoCommit parameter is set to true | + +After creating the consumer, you need to manually call the open method of the consumer: + +```Java +void open() throws Exception; +``` + +At this point, the IoTDB subscription client will verify the correctness of the consumer's configuration. Once verified successfully, the consumer will join the corresponding consumer group. This means that after opening the consumer, you can use the consumer object to subscribe to Topics, consume data, and perform other operations. + +### Subscribing to Topics + +`SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA SDK for subscribing to Topics: + +```Java +// Subscribe to topics +void subscribe(String topic) throws Exception; +void subscribe(List<String> topics) throws Exception; +``` + +- Before a consumer subscribes to a topic, the topic must have been created; otherwise, the subscription will fail. +- If a consumer subscribes to a topic that it has already subscribed to, no error will be thrown. +- If other consumers in the same consumer group have already subscribed to the same topics, the consumer will reuse the corresponding consumption progress. + +### Consuming Data + +**For both push mode and pull mode consumers:** + +- A consumer will only receive data for a topic if it has explicitly subscribed to that topic. +- If a consumer has not subscribed to any topics after being created, it will not be able to consume any data, even if other consumers in the same consumer group have subscribed to some topics. + +#### SubscriptionPushConsumer + +After subscribing to topics, a `SubscriptionPushConsumer` does not need to manually pull data; the logic for consuming data is specified in the consumeListener configuration when creating the `SubscriptionPushConsumer`. 
+ +#### SubscriptionPullConsumer + +After subscribing to topics, a `SubscriptionPullConsumer` needs to actively call the `poll` method to pull data: + +```Java +List<SubscriptionMessage> poll(final Duration timeout) throws Exception; +List<SubscriptionMessage> poll(final long timeoutMs) throws Exception; +List<SubscriptionMessage> poll(final Set<String> topicNames, final Duration timeout) throws Exception; +List<SubscriptionMessage> poll(final Set<String> topicNames, final long timeoutMs) throws Exception; +``` + +In the `poll` method, you can specify the topic names to pull (if not specified, it will default to pulling all the topics that the consumer has subscribed to) and the timeout duration. + +When the `SubscriptionPullConsumer` is configured with the autoCommit parameter set to false, the `commitSync` and `commitAsync` methods need to be called manually to synchronously or asynchronously commit the consumption progress of a batch of data: + +```Java +void commitSync(final SubscriptionMessage message) throws Exception; +void commitSync(final Iterable<SubscriptionMessage> messages) throws Exception; + +CompletableFuture<Void> commitAsync(final SubscriptionMessage message); +CompletableFuture<Void> commitAsync(final Iterable<SubscriptionMessage> messages); +void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback); +void commitAsync(final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback callback); +``` + +The `AsyncCommitCallback` interface is defined as follows: + +```Java +public interface AsyncCommitCallback { + default void onComplete() { + // Do nothing + } + + default void onFailure(final Throwable e) { + // Do nothing + } +} +``` + +### Unsubscribing + +`SubscriptionPushConsumer` and `SubscriptionPullConsumer` provide the following JAVA SDK for unsubscribing and closing consumers: + +```Java +// Unsubscribe from topics +void unsubscribe(String topic) throws Exception; +void unsubscribe(List<String> topics) throws Exception; + +// Close the consumer +void close(); +``` + +- If a consumer unsubscribes from a topic that it has not subscribed to, no error will be thrown. 
+- When a consumer closes, it will exit the corresponding consumer group and automatically unsubscribe from all the topics it has subscribed to. +- After the consumer is closed, its lifecycle ends, and it cannot be reopened to subscribe and consume data again. + +## Subscription Status Retrieval + +### Method 1: SQL + +Users can use SQL to query the existing subscription relationships in the IoTDB subscription client: + +```SQL +-- Query all topics and the subscription relationships of consumer groups +SHOW SUBSCRIPTIONS +-- Query all subscriptions under a specific topic +SHOW SUBSCRIPTIONS ON <topicName> +``` + +Result set: + +```SQL +[TopicName|ConsumerGroupName|SubscribedConsumers] +``` + +- TopicName: Topic ID +- ConsumerGroupName: Consumer group ID specified in user code +- SubscribedConsumers: All client IDs in the consumer group that have subscribed to the topic + +### Method 2: JAVA SDK + +The `SubscriptionSession` class in the IoTDB subscription client provides related interfaces for retrieving subscription status: + +```Java +Set<Subscription> getSubscriptions() throws Exception; +Set<Subscription> getSubscriptions(final String topicName) throws Exception; +``` + +# Code Examples (JAVA SDK) + +## Single Pull Consumer Consuming Data in SessionDataSetsHandler Format + +```Java +// Create topics +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATH_KEY, "root.sg.**"); + session.createTopic(TOPIC_1, config); +} + +// Subscription: property-style ctor +final Properties config = new Properties(); +config.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); +config.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); + +final SubscriptionPullConsumer consumer1 = new SubscriptionPullConsumer(config); +consumer1.open(); +consumer1.subscribe(TOPIC_1); +while (true) { + LockSupport.parkNanos(SLEEP_NS); // wait some time + final List<SubscriptionMessage> messages = consumer1.poll(POLL_TIMEOUT_MS); + for (final 
SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + } + // Auto commit +} + +// Show topics and subscriptions +try (final SubscriptionSession session = new SubscriptionSession(HOST, PORT)) { + session.open(); + session.getTopics().forEach((System.out::println)); + session.getSubscriptions().forEach((System.out::println)); +} + +consumer1.unsubscribe(TOPIC_1); +consumer1.close(); +``` + +## Multiple Push Consumers Consuming Data in TsFileHandler Format + +```Java +// Create topics +try (final SubscriptionSession subscriptionSession = new SubscriptionSession(HOST, PORT)) { + subscriptionSession.open(); + final Properties config = new Properties(); + config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + subscriptionSession.createTopic(TOPIC_2, config); +} + +final List threads = new ArrayList<>(); +for (int i = 0; i < 8; ++i) { + final int idx = i; + final Thread thread = + new Thread( + () -> { + // Subscription: builder-style ctor + try (final SubscriptionPushConsumer consumer2 = + new SubscriptionPushConsumer.Builder() + .consumerId("c" + idx) + .consumerGroupId("cg2") + .fileSaveDir(System.getProperty("java.io.tmpdir")) + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + doSomething(message.getTsFileHandler()); + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + consumer2.open(); + consumer2.subscribe(TOPIC_2); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + threads.add(thread); +} + +for (final Thread thread : threads) { + thread.join(); +} +``` + +# FAQ + +## What is the difference between IoTDB data subscription and Kafka? 
+ +### Consumption Orderliness + +- **Kafka guarantees message order within a single partition**. When a topic corresponds to only one partition and only one consumer subscribes to this topic, the consumer (single-threaded) can ensure that the order of consuming the topic's data is the same as the order in which the data was written. +- The IoTDB subscription client **does not guarantee** that the order in which the consumer consumes data is the same as the order in which the data was written, but it will try to reflect the order of data writing. + +### Message Delivery Semantics + +- Kafka can achieve Exactly once semantics for Producer and Consumer through configuration. +- The IoTDB subscription client currently cannot provide Exactly once semantics for Consumer. From f81a0f53a28ea543e5c638b55f907881ebe90313 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 22 Jul 2024 13:00:38 +0800 Subject: [PATCH 4/4] add pics --- README.md | 2 +- src/UserGuide/Master/User-Manual/Data-Subscription.md | 2 +- src/UserGuide/latest/User-Manual/Data-Subscription.md | 2 +- src/zh/UserGuide/Master/User-Manual/Data-Subscription.md | 2 +- src/zh/UserGuide/latest/User-Manual/Data-Subscription.md | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c5d290865..a2cb5d8bb 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ If you have installed and the error still occurs, then `sudo xcode-select --rese # Document format - All mds will be compiled into html, README.md will be compiled into index.html, and xx.md will be compiled into xx.html -- The tags in .md must have a beginning and an end. For example, \ must have a \ corresponding to it, and it is strictly corresponding; if you need to insert tags in the document, such as List \, you can add this into the code block, You can also add \ before the two angle brackets, such as \\\ +- The tags in .md must have a beginning and an end. 
For example, \ must have a \ corresponding to it, and it is strictly corresponding; if you need to insert tags in the document, such as List \, you can add this into the code block, You can also add \ before the two angle brackets, such as \\ - Tags cannot be cross-nested, such as \

\

\

\
This is not allowed - The first-level title of the article is the title of the sidebar of the document, so the largest chapter title of the document is not needed diff --git a/src/UserGuide/Master/User-Manual/Data-Subscription.md b/src/UserGuide/Master/User-Manual/Data-Subscription.md index b3f555d9c..2a9d91c67 100644 --- a/src/UserGuide/Master/User-Manual/Data-Subscription.md +++ b/src/UserGuide/Master/User-Manual/Data-Subscription.md @@ -43,7 +43,7 @@ Although the IoTDB data subscription module (**hereinafter referred to as the Io 3. Both `SubscriptionPullConsumer` and `SubscriptionPushConsumer` are allowed to join the same consumer group. 4. A consumer is not allowed to join multiple different consumer groups simultaneously. -TODO: PIC +![subscription-example](https://github.com/user-attachments/assets/80bfe5bd-6b0c-4836-a14b-d811ccf54ae2) # Syntax Description diff --git a/src/UserGuide/latest/User-Manual/Data-Subscription.md b/src/UserGuide/latest/User-Manual/Data-Subscription.md index b3f555d9c..2a9d91c67 100644 --- a/src/UserGuide/latest/User-Manual/Data-Subscription.md +++ b/src/UserGuide/latest/User-Manual/Data-Subscription.md @@ -43,7 +43,7 @@ Although the IoTDB data subscription module (**hereinafter referred to as the Io 3. Both `SubscriptionPullConsumer` and `SubscriptionPushConsumer` are allowed to join the same consumer group. 4. A consumer is not allowed to join multiple different consumer groups simultaneously. -TODO: PIC +![subscription-example](https://github.com/user-attachments/assets/80bfe5bd-6b0c-4836-a14b-d811ccf54ae2) # Syntax Description diff --git a/src/zh/UserGuide/Master/User-Manual/Data-Subscription.md b/src/zh/UserGuide/Master/User-Manual/Data-Subscription.md index 6dc11fed1..af4e360c6 100644 --- a/src/zh/UserGuide/Master/User-Manual/Data-Subscription.md +++ b/src/zh/UserGuide/Master/User-Manual/Data-Subscription.md @@ -43,7 +43,7 @@ IoTDB 数据订阅模块(**下称 IoTDB 订阅客户端**)虽然参考了部 3. 
允许 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 加入同一个 consumer group 4. 不允许一个 consumer 同时加入多个不同的 consumer groups -TODO: PIC +![subscription-example](https://github.com/user-attachments/assets/80bfe5bd-6b0c-4836-a14b-d811ccf54ae2) # 语法说明 diff --git a/src/zh/UserGuide/latest/User-Manual/Data-Subscription.md b/src/zh/UserGuide/latest/User-Manual/Data-Subscription.md index 6dc11fed1..af4e360c6 100644 --- a/src/zh/UserGuide/latest/User-Manual/Data-Subscription.md +++ b/src/zh/UserGuide/latest/User-Manual/Data-Subscription.md @@ -43,7 +43,7 @@ IoTDB 数据订阅模块(**下称 IoTDB 订阅客户端**)虽然参考了部 3. 允许 `SubscriptionPullConsumer` 和 `SubscriptionPushConsumer` 加入同一个 consumer group 4. 不允许一个 consumer 同时加入多个不同的 consumer groups -TODO: PIC +![subscription-example](https://github.com/user-attachments/assets/80bfe5bd-6b0c-4836-a14b-d811ccf54ae2) # 语法说明