From 1c3ddb3e0a3dc85a83fe3f4fa344104d569e9fdf Mon Sep 17 00:00:00 2001
From: ysc <2725843507@qq.com>
Date: Mon, 3 Mar 2025 19:26:11 +0800
Subject: [PATCH] Refactoring the three phases of data collection tasks using
disruptor
---
.gitignore | 5 +-
iotdb-collector/collector-core/pom.xml | 4 +
.../apache/iotdb/collector/Application.java | 2 +
.../iotdb/collector/agent/CollectorAgent.java | 50 ----
.../CollectorProcessorTaskExecutor.java | 68 -----
.../executor/CollectorSinkTaskExecutor.java | 67 -----
.../executor/CollectorSourceTaskExecutor.java | 67 -----
.../agent/executor/CollectorTaskExecutor.java | 64 -----
.../executor/CollectorTaskExecutorAgent.java | 53 ----
.../plugin/CollectorPluginConstructor.java | 77 ------
.../agent/task/CollectorProcessorTask.java | 88 ------
.../agent/task/CollectorSinkTask.java | 87 ------
.../agent/task/CollectorSourceTask.java | 69 -----
.../agent/task/CollectorTaskAgent.java | 117 --------
.../{ => ping}/impl/PingApiServiceImpl.java | 6 +-
.../api/v1/impl/AdminApiServiceImpl.java | 98 -------
.../v1/plugin/impl/PluginApiServiceImpl.java | 64 +++++
...ginApiServiceRequestValidationHandler.java | 22 ++
.../api/v1/task/impl/TaskApiServiceImpl.java | 77 ++++++
...askApiServiceRequestValidationHandler.java | 47 ++++
.../collector/config/ApiServiceOptions.java | 2 +-
.../iotdb/collector/config/Options.java | 9 +
.../collector/config/TaskRuntimeOptions.java | 63 +++++
...ollectorPlugin.java => BuiltinPlugin.java} | 24 +-
.../plugin/api/CollectorPullSource.java | 22 ++
.../api/CollectorPushSource.java} | 24 +-
.../collector/plugin/api/CollectorSource.java | 24 ++
.../collector/plugin/api/RuntimeConfig.java | 63 +++++
.../collector/plugin/api/StoppablePlugin.java | 24 ++
.../source => }/event/SourceEvent.java | 2 +-
.../processor/DoNothingProcessor.java | 15 +-
.../{builtin => }/sink/SessionSink.java | 16 +-
.../HttpPullSource.java} | 55 ++--
.../plugin/source/HttpPushSource.java | 101 +++++++
.../runtime/plugin/PluginFactory.java | 53 ++++
.../plugin/PluginRuntime.java} | 24 +-
.../collector/runtime/task/TaskRuntime.java | 138 ++++++++++
.../datastructure/TaskEventCollector.java} | 23 +-
.../task/datastructure/TaskEventConsumer.java | 63 +++++
.../TaskEventConsumerController.java} | 28 +-
.../datastructure/TaskEventContainer.java | 36 +++
.../runtime/task/def/ProcessorTask.java | 114 ++++++++
.../runtime/task/def/PullSourceTask.java | 80 ++++++
.../runtime/task/def/PushSourceTask.java | 72 +++++
.../collector/runtime/task/def/SinkTask.java | 105 ++++++++
.../runtime/task/def/SourceTask.java | 130 +++++++++
.../collector/runtime/task/def/Task.java | 69 +++++
.../runtime/task/def/TaskComponent.java | 29 ++
.../runtime/task/def/TaskRepository.java | 83 ++++++
.../DisruptorTaskExceptionHandler.java | 43 +++
.../iotdb/collector/service/ApiService.java | 7 +-
.../collector/service/RuntimeService.java | 54 ++++
.../src/main/resources/application.properties | 38 ++-
.../src/main/resources/logback.xml | 44 +++
iotdb-collector/collector-openapi/pom.xml | 51 +++-
.../openapi3/iotdb_collector_rest_v1.yaml | 253 ------------------
.../src/main/openapi3/{ => ping}/ping.yaml | 4 +-
.../src/main/openapi3/v1/plugin.yaml | 156 +++++++++++
.../src/main/openapi3/v1/task.yaml | 154 +++++++++++
pom.xml | 5 +
60 files changed, 2153 insertions(+), 1279 deletions(-)
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/{ => ping}/impl/PingApiServiceImpl.java (88%)
delete mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceRequestValidationHandler.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/{BuiltinCollectorPlugin.java => BuiltinPlugin.java} (72%)
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/{agent/plugin/CollectorPluginAgent.java => plugin/api/CollectorPushSource.java} (58%)
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorSource.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/{builtin/source => }/event/SourceEvent.java (95%)
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/{builtin => }/processor/DoNothingProcessor.java (74%)
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/{builtin => }/sink/SessionSink.java (77%)
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/{builtin/source/HttpSource.java => source/HttpPullSource.java} (51%)
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPushSource.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginFactory.java
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/{agent/task/CollectorTask.java => runtime/plugin/PluginRuntime.java} (69%)
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/{agent/collect/CollectorEventCollector.java => runtime/task/datastructure/TaskEventCollector.java} (64%)
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumer.java
rename iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/{api/v1/handler/RequestValidationHandler.java => runtime/task/datastructure/TaskEventConsumerController.java} (57%)
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventContainer.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/ProcessorTask.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PullSourceTask.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PushSourceTask.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SinkTask.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SourceTask.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/Task.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskComponent.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskRepository.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/exception/DisruptorTaskExceptionHandler.java
create mode 100644 iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java
create mode 100644 iotdb-collector/collector-core/src/main/resources/logback.xml
delete mode 100644 iotdb-collector/collector-openapi/src/main/openapi3/iotdb_collector_rest_v1.yaml
rename iotdb-collector/collector-openapi/src/main/openapi3/{ => ping}/ping.yaml (96%)
create mode 100644 iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
create mode 100644 iotdb-collector/collector-openapi/src/main/openapi3/v1/task.yaml
diff --git a/.gitignore b/.gitignore
index e8ef7a9..dced3d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,7 @@ target/
# IntelliJ relaged
.idea/
-*.iml
\ No newline at end of file
+*.iml
+
+# log
+logs/
\ No newline at end of file
diff --git a/iotdb-collector/collector-core/pom.xml b/iotdb-collector/collector-core/pom.xml
index ba372d0..f6ed16d 100644
--- a/iotdb-collector/collector-core/pom.xml
+++ b/iotdb-collector/collector-core/pom.xml
@@ -99,6 +99,10 @@
org.glassfish.jersey.injectjersey-hk2
+
+ com.lmax
+ disruptor
+
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
index 6cf6456..96ecdaf 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.collector.config.Configuration;
import org.apache.iotdb.collector.service.ApiService;
import org.apache.iotdb.collector.service.IService;
+import org.apache.iotdb.collector.service.RuntimeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@ public class Application {
private final LinkedList services = new LinkedList<>();
private Application() {
+ services.add(new RuntimeService());
services.add(new ApiService());
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
deleted file mode 100644
index bca4106..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent;
-
-import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
-import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
-import org.apache.iotdb.collector.agent.task.CollectorTaskAgent;
-
-public class CollectorAgent {
-
- private final CollectorTaskAgent collectorTaskAgent = CollectorTaskAgent.instance();
- private final CollectorTaskExecutorAgent collectorTaskExecutorAgent =
- CollectorTaskExecutorAgent.instance();
- private final CollectorPluginAgent collectorPluginAgent = CollectorPluginAgent.instance();
-
- private CollectorAgent() {}
-
- public static CollectorTaskAgent task() {
- return CollectorAgentHolder.INSTANCE.collectorTaskAgent;
- }
-
- public static CollectorTaskExecutorAgent executor() {
- return CollectorAgentHolder.INSTANCE.collectorTaskExecutorAgent;
- }
-
- public static CollectorPluginAgent plugin() {
- return CollectorAgentHolder.INSTANCE.collectorPluginAgent;
- }
-
- private static class CollectorAgentHolder {
- private static final CollectorAgent INSTANCE = new CollectorAgent();
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
deleted file mode 100644
index 385fd60..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.executor;
-
-import org.apache.iotdb.collector.agent.task.CollectorTask;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
-public class CollectorProcessorTaskExecutor extends CollectorTaskExecutor {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(CollectorProcessorTaskExecutor.class);
-
- private static final Map PROCESSOR_EXECUTOR = new ConcurrentHashMap<>();
- private static final Map PROCESSOR_TASK_MAP = new ConcurrentHashMap<>();
-
- public boolean validateIfAbsent(final String taskId) {
- return !PROCESSOR_EXECUTOR.containsKey(taskId) && !PROCESSOR_TASK_MAP.containsKey(taskId);
- }
-
- @Override
- public Optional getExecutor(final String taskId) {
- return Optional.of(
- IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-processor-executor-" + taskId));
- }
-
- @Override
- public void recordExecution(
- final CollectorTask collectorTask, final ExecutorService executorService) {
- final String taskId = collectorTask.getTaskId();
- PROCESSOR_EXECUTOR.putIfAbsent(taskId, executorService);
- PROCESSOR_TASK_MAP.putIfAbsent(taskId, collectorTask);
-
- LOGGER.info("register collector processor task {}", taskId);
- }
-
- @Override
- public void eraseExecution(final String taskId) {
- PROCESSOR_TASK_MAP.remove(taskId).stop();
- PROCESSOR_EXECUTOR.remove(taskId).shutdownNow();
-
- LOGGER.info("deregister collector processor task {}", taskId);
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
deleted file mode 100644
index a490e61..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.executor;
-
-import org.apache.iotdb.collector.agent.task.CollectorTask;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
-public class CollectorSinkTaskExecutor extends CollectorTaskExecutor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSinkTaskExecutor.class);
-
- private static final Map SINK_EXECUTOR = new ConcurrentHashMap<>();
- private static final Map SINK_TASK_MAP = new ConcurrentHashMap<>();
-
- public boolean validateIfAbsent(final String taskId) {
- return !SINK_EXECUTOR.containsKey(taskId) && !SINK_TASK_MAP.containsKey(taskId);
- }
-
- @Override
- public Optional getExecutor(final String taskId) {
- return Optional.of(
- IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-sink-executor-" + taskId));
- }
-
- @Override
- public void recordExecution(
- final CollectorTask collectorTask, final ExecutorService executorService) {
- final String taskId = collectorTask.getTaskId();
- SINK_EXECUTOR.putIfAbsent(taskId, executorService);
- SINK_TASK_MAP.putIfAbsent(taskId, collectorTask);
-
- LOGGER.info("register collector sink task {}", taskId);
- }
-
- @Override
- public void eraseExecution(final String taskId) {
- SINK_TASK_MAP.remove(taskId).stop();
- SINK_EXECUTOR.remove(taskId).shutdownNow();
-
- LOGGER.info("deregister collector sink task {}", taskId);
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
deleted file mode 100644
index fde8236..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.executor;
-
-import org.apache.iotdb.collector.agent.task.CollectorTask;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-
-public class CollectorSourceTaskExecutor extends CollectorTaskExecutor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSourceTaskExecutor.class);
-
- private static final Map SOURCE_EXECUTOR = new ConcurrentHashMap<>();
- private static final Map SOURCE_TASK_MAP = new ConcurrentHashMap<>();
-
- public boolean validateIfAbsent(final String taskId) {
- return !SOURCE_EXECUTOR.containsKey(taskId) && !SOURCE_TASK_MAP.containsKey(taskId);
- }
-
- @Override
- public Optional getExecutor(final String taskId) {
- return Optional.of(
- IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-source-executor-" + taskId));
- }
-
- @Override
- public void recordExecution(
- final CollectorTask collectorTask, final ExecutorService executorService) {
- final String taskId = collectorTask.getTaskId();
- SOURCE_EXECUTOR.put(taskId, executorService);
- SOURCE_TASK_MAP.putIfAbsent(taskId, collectorTask);
-
- LOGGER.info("register collector source task {}", taskId);
- }
-
- @Override
- public void eraseExecution(String taskId) {
- SOURCE_TASK_MAP.remove(taskId).stop();
- SOURCE_EXECUTOR.remove(taskId).shutdownNow();
-
- LOGGER.info("deregister collector source task {}", taskId);
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
deleted file mode 100644
index 8100766..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.executor;
-
-import org.apache.iotdb.collector.agent.task.CollectorTask;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-
-public abstract class CollectorTaskExecutor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorTaskExecutor.class);
-
- public void register(final CollectorTask collectorTask) {
- if (validateIfAbsent(collectorTask.getTaskId())) {
- getExecutor(collectorTask.getTaskId())
- .ifPresent(
- executor -> {
- executor.submit(collectorTask);
- LOGGER.info("register success {}", collectorTask.getTaskId());
- recordExecution(collectorTask, executor);
- });
- } else {
- LOGGER.warn("task {} has existed", collectorTask.getTaskId());
- }
- }
-
- public abstract boolean validateIfAbsent(final String taskId);
-
- public abstract Optional getExecutor(final String taskId);
-
- public abstract void recordExecution(
- final CollectorTask collectorTask, final ExecutorService executorService);
-
- public void deregister(final String taskId) {
- if (!validateIfAbsent(taskId)) {
- eraseExecution(taskId);
- } else {
- LOGGER.warn("task {} has not existed", taskId);
- }
- }
-
- public abstract void eraseExecution(final String taskId);
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
deleted file mode 100644
index 5adcace..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.executor;
-
-public class CollectorTaskExecutorAgent {
-
- private final CollectorSourceTaskExecutor sourceTaskExecutor;
- private final CollectorProcessorTaskExecutor processorTaskExecutor;
- private final CollectorSinkTaskExecutor sinkTaskExecutor;
-
- private CollectorTaskExecutorAgent() {
- sourceTaskExecutor = new CollectorSourceTaskExecutor();
- processorTaskExecutor = new CollectorProcessorTaskExecutor();
- sinkTaskExecutor = new CollectorSinkTaskExecutor();
- }
-
- public CollectorSourceTaskExecutor getSourceTaskExecutor() {
- return CollectorTaskExecutorAgentHolder.INSTANCE.sourceTaskExecutor;
- }
-
- public CollectorProcessorTaskExecutor getProcessorTaskExecutor() {
- return CollectorTaskExecutorAgentHolder.INSTANCE.processorTaskExecutor;
- }
-
- public CollectorSinkTaskExecutor getSinkTaskExecutor() {
- return CollectorTaskExecutorAgentHolder.INSTANCE.sinkTaskExecutor;
- }
-
- public static CollectorTaskExecutorAgent instance() {
- return CollectorTaskExecutorAgentHolder.INSTANCE;
- }
-
- private static class CollectorTaskExecutorAgentHolder {
- private static final CollectorTaskExecutorAgent INSTANCE = new CollectorTaskExecutorAgent();
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
deleted file mode 100644
index cce94f6..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.plugin;
-
-import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
-import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor;
-import org.apache.iotdb.collector.plugin.builtin.sink.SessionSink;
-import org.apache.iotdb.collector.plugin.builtin.source.HttpSource;
-import org.apache.iotdb.pipe.api.PipePlugin;
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.PipeSink;
-import org.apache.iotdb.pipe.api.PipeSource;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Supplier;
-
-public class CollectorPluginConstructor {
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorPluginConstructor.class);
-
- protected final Map> pluginConstructors = new HashMap<>();
-
- private CollectorPluginConstructor() {
- initConstructors();
- }
-
- private void initConstructors() {
- pluginConstructors.put(
- BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName(), HttpSource::new);
- pluginConstructors.put(
- BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName(),
- DoNothingProcessor::new);
- pluginConstructors.put(
- BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName(), SessionSink::new);
- LOGGER.info("builtin plugin has been initialized");
- }
-
- public PipeSource getSource(final String pluginName) {
- return (PipeSource) pluginConstructors.get(pluginName).get();
- }
-
- public PipeProcessor getProcessor(final String pluginName) {
- return (PipeProcessor) pluginConstructors.get(pluginName).get();
- }
-
- public PipeSink getSink(final String pluginName) {
- return (PipeSink) pluginConstructors.get(pluginName).get();
- }
-
- public static CollectorPluginConstructor instance() {
- return CollectorPluginConstructorHolder.INSTANCE;
- }
-
- private static class CollectorPluginConstructorHolder {
- private static final CollectorPluginConstructor INSTANCE = new CollectorPluginConstructor();
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
deleted file mode 100644
index 8d1acfc..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.task;
-
-import org.apache.iotdb.collector.agent.collect.CollectorEventCollector;
-import org.apache.iotdb.commons.pipe.task.EventSupplier;
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.event.Event;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-public class CollectorProcessorTask extends CollectorTask {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorProcessorTask.class);
-
- private final Map processorAttribute;
- private final PipeProcessor pipeProcessor;
- private final EventSupplier eventSupplier;
- private final BlockingQueue pendingQueue;
- private final CollectorEventCollector collectorEventCollector;
- private boolean isStarted = true;
-
- public CollectorProcessorTask(
- final String taskId,
- final Map processorAttribute,
- final PipeProcessor pipeProcessor,
- final EventSupplier eventSupplier,
- final BlockingQueue pendingQueue) {
- super(taskId);
- this.processorAttribute = processorAttribute;
- this.pipeProcessor = pipeProcessor;
- this.eventSupplier = eventSupplier;
- this.pendingQueue = pendingQueue;
- this.collectorEventCollector = new CollectorEventCollector(pendingQueue);
- }
-
- @Override
- public void runMayThrow() {
- while (isStarted) {
- try {
- pipeProcessor.process(eventSupplier.supply(), collectorEventCollector);
- } catch (final Exception e) {
- LOGGER.warn("error occur while processing event because {}", e.getMessage());
- }
- }
- }
-
- public Map getProcessorAttribute() {
- return processorAttribute;
- }
-
- public PipeProcessor getPipeProcessor() {
- return pipeProcessor;
- }
-
- public EventSupplier getEventSupplier() {
- return eventSupplier;
- }
-
- public BlockingQueue getPendingQueue() {
- return pendingQueue;
- }
-
- public void stop() {
- isStarted = false;
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
deleted file mode 100644
index 77e65b4..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.task;
-
-import org.apache.iotdb.pipe.api.PipeSink;
-import org.apache.iotdb.pipe.api.event.Event;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-public class CollectorSinkTask extends CollectorTask {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSinkTask.class);
-
- private final Map sinkAttribute;
- private final PipeSink pipeSink;
- private final BlockingQueue pendingQueue;
- private boolean isStarted = true;
-
- public CollectorSinkTask(
- final String taskId,
- final Map sinkAttribute,
- final PipeSink pipeSink,
- final BlockingQueue pendingQueue) {
- super(taskId);
- this.sinkAttribute = sinkAttribute;
- this.pipeSink = pipeSink;
- this.pendingQueue = pendingQueue;
- }
-
- @Override
- public void runMayThrow() {
- try {
- pipeSink.handshake();
- } catch (final Exception e) {
- LOGGER.warn("handshake fail because {}", e.getMessage());
- }
- isStarted = true;
- while (isStarted) {
- try {
- final Event event = pendingQueue.take();
- pipeSink.transfer(event);
- LOGGER.info("transfer event {} success, remain number is {}", event, pendingQueue.size());
- } catch (final InterruptedException e) {
- LOGGER.warn("interrupted while waiting for take a event");
- } catch (final Exception e) {
- LOGGER.warn("error occur while transfer event to endpoint");
- }
- }
- }
-
- public Map getSinkAttribute() {
- return sinkAttribute;
- }
-
- public PipeSink getPipeSink() {
- return pipeSink;
- }
-
- public void stop() {
- isStarted = false;
- }
-
- public BlockingQueue getPendingQueue() {
- return pendingQueue;
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
deleted file mode 100644
index 26269c3..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.task;
-
-import org.apache.iotdb.commons.pipe.task.EventSupplier;
-import org.apache.iotdb.pipe.api.PipeSource;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class CollectorSourceTask extends CollectorTask {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSourceTask.class);
-
- private final Map sourceAttribute;
- private final PipeSource pipeSource;
-
- public CollectorSourceTask(
- final String taskId, final Map sourceAttribute, final PipeSource pipeSource) {
- super(taskId);
- this.sourceAttribute = sourceAttribute;
- this.pipeSource = pipeSource;
- }
-
- @Override
- public void runMayThrow() throws Throwable {
- pipeSource.start();
- }
-
- public Map getSourceAttribute() {
- return sourceAttribute;
- }
-
- public PipeSource getPipeSource() {
- return pipeSource;
- }
-
- public EventSupplier getEventSupplier() {
- return pipeSource::supply;
- }
-
- @Override
- public void stop() {
- try {
- pipeSource.close();
- } catch (final Exception e) {
- LOGGER.warn("failed to close pipe source {}", pipeSource, e);
- }
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
deleted file mode 100644
index 9cd787d..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.agent.task;
-
-import org.apache.iotdb.collector.agent.executor.CollectorProcessorTaskExecutor;
-import org.apache.iotdb.collector.agent.executor.CollectorSinkTaskExecutor;
-import org.apache.iotdb.collector.agent.executor.CollectorSourceTaskExecutor;
-import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
-import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
-import org.apache.iotdb.collector.agent.plugin.CollectorPluginConstructor;
-import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class CollectorTaskAgent {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorTaskAgent.class);
-
- private static final CollectorPluginConstructor CONSTRUCTOR =
- CollectorPluginAgent.instance().constructor();
- private static final CollectorSourceTaskExecutor SOURCE_TASK_EXECUTOR =
- CollectorTaskExecutorAgent.instance().getSourceTaskExecutor();
- private static final CollectorProcessorTaskExecutor PROCESSOR_TASK_EXECUTOR =
- CollectorTaskExecutorAgent.instance().getProcessorTaskExecutor();
- private static final CollectorSinkTaskExecutor SINK_TASK_EXECUTOR =
- CollectorTaskExecutorAgent.instance().getSinkTaskExecutor();
-
- private CollectorTaskAgent() {}
-
- public boolean createCollectorTask(
- final Map sourceAttribute,
- final Map processorAttribute,
- final Map sinkAttribute,
- final String taskId) {
- try {
- final CollectorSourceTask collectorSourceTask =
- new CollectorSourceTask(
- taskId,
- sourceAttribute,
- CONSTRUCTOR.getSource(
- sourceAttribute.getOrDefault(
- "source-plugin",
- BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName())));
- SOURCE_TASK_EXECUTOR.register(collectorSourceTask);
-
- final CollectorProcessorTask collectorProcessorTask =
- new CollectorProcessorTask(
- taskId,
- processorAttribute,
- CONSTRUCTOR.getProcessor(
- processorAttribute.getOrDefault(
- "processor-plugin",
- BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName())),
- collectorSourceTask.getEventSupplier(),
- new LinkedBlockingQueue<>());
- PROCESSOR_TASK_EXECUTOR.register(collectorProcessorTask);
-
- final CollectorSinkTask collectorSinkTask =
- new CollectorSinkTask(
- taskId,
- sinkAttribute,
- CONSTRUCTOR.getSink(
- sinkAttribute.getOrDefault(
- "sink-plugin",
- BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName())),
- collectorProcessorTask.getPendingQueue());
- SINK_TASK_EXECUTOR.register(collectorSinkTask);
- } catch (final Exception e) {
- LOGGER.warn("create collector task error", e);
- return false;
- }
-
- return true;
- }
-
- public boolean stopCollectorTask(final String taskId) {
- try {
- SOURCE_TASK_EXECUTOR.deregister(taskId);
- PROCESSOR_TASK_EXECUTOR.deregister(taskId);
- SINK_TASK_EXECUTOR.deregister(taskId);
- } catch (final Exception e) {
- LOGGER.warn("stop collector task error", e);
- return false;
- }
-
- return true;
- }
-
- public static CollectorTaskAgent instance() {
- return CollectorTaskAgentHolder.INSTANCE;
- }
-
- private static class CollectorTaskAgentHolder {
- private static final CollectorTaskAgent INSTANCE = new CollectorTaskAgent();
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/ping/impl/PingApiServiceImpl.java
similarity index 88%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/ping/impl/PingApiServiceImpl.java
index 7d73e38..13b49ca 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/ping/impl/PingApiServiceImpl.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.iotdb.collector.api.impl;
+package org.apache.iotdb.collector.api.ping.impl;
-import org.apache.iotdb.collector.api.PingApiService;
-import org.apache.iotdb.collector.api.v1.model.ExecutionStatus;
+import org.apache.iotdb.collector.api.ping.PingApiService;
+import org.apache.iotdb.collector.api.ping.model.ExecutionStatus;
import org.apache.iotdb.rpc.TSStatusCode;
import javax.ws.rs.core.Response;
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
deleted file mode 100644
index 07d1f3e..0000000
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.collector.api.v1.impl;
-
-import org.apache.iotdb.collector.agent.CollectorAgent;
-import org.apache.iotdb.collector.api.v1.AdminApiService;
-import org.apache.iotdb.collector.api.v1.NotFoundException;
-import org.apache.iotdb.collector.api.v1.handler.RequestValidationHandler;
-import org.apache.iotdb.collector.api.v1.model.AlterPipeRequest;
-import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
-import org.apache.iotdb.collector.api.v1.model.DropPipeRequest;
-import org.apache.iotdb.collector.api.v1.model.StartPipeRequest;
-import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.SecurityContext;
-
-public class AdminApiServiceImpl extends AdminApiService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AdminApiServiceImpl.class);
-
- @Override
- public Response alterPipe(
- final AlterPipeRequest alterPipeRequest, final SecurityContext securityContext)
- throws NotFoundException {
- return Response.ok("alterPipe").build();
- }
-
- @Override
- public Response createPipe(
- final CreatePipeRequest createPipeRequest, final SecurityContext securityContext)
- throws NotFoundException {
- RequestValidationHandler.validateCreateRequest(createPipeRequest);
-
- final boolean createdResult =
- CollectorAgent.task()
- .createCollectorTask(
- createPipeRequest.getSourceAttribute(),
- createPipeRequest.getProcessorAttribute(),
- createPipeRequest.getSinkAttribute(),
- createPipeRequest.getTaskId());
- if (createdResult) {
- LOGGER.info("Create task successful");
- return Response.status(Response.Status.OK).entity("create task success").build();
- }
- LOGGER.warn("Create task failed");
- return Response.status(Response.Status.BAD_REQUEST).entity("create task fail").build();
- }
-
- @Override
- public Response dropPipe(
- final DropPipeRequest dropPipeRequest, final SecurityContext securityContext)
- throws NotFoundException {
- return Response.ok("dropPipe").build();
- }
-
- @Override
- public Response startPipe(
- final StartPipeRequest startPipeRequest, final SecurityContext securityContext)
- throws NotFoundException {
- return Response.ok("startPipe").build();
- }
-
- @Override
- public Response stopPipe(
- final StopPipeRequest stopPipeRequest, final SecurityContext securityContext)
- throws NotFoundException {
- RequestValidationHandler.validateStopRequest(stopPipeRequest);
-
- final boolean stopResult = CollectorAgent.task().stopCollectorTask(stopPipeRequest.getTaskId());
- if (stopResult) {
- LOGGER.info("Stop task: {} successful", stopPipeRequest.getTaskId());
- return Response.ok().entity("stop task: " + stopPipeRequest.getTaskId() + " success").build();
- }
- LOGGER.warn("Stop task: {} failed", stopPipeRequest.getTaskId());
- return Response.status(Response.Status.BAD_REQUEST).entity("stop task fail").build();
- }
-}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
new file mode 100644
index 0000000..61daa03
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.plugin.impl;
+
+import org.apache.iotdb.collector.api.v1.plugin.PluginApiService;
+import org.apache.iotdb.collector.api.v1.plugin.model.AlterPluginRequest;
+import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
+import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
+import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest;
+import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest;
+import org.apache.iotdb.collector.service.RuntimeService;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class PluginApiServiceImpl extends PluginApiService {
+
+ @Override
+ public Response createPlugin(
+ final CreatePluginRequest createPluginRequest, final SecurityContext securityContext) {
+ return Response.ok("create plugin").entity(RuntimeService.plugin().createPlugin()).build();
+ }
+
+ @Override
+ public Response alterPlugin(
+ final AlterPluginRequest alterPluginRequest, final SecurityContext securityContext) {
+ return Response.ok("alter plugin").entity(RuntimeService.plugin().alterPlugin()).build();
+ }
+
+ @Override
+ public Response startPlugin(
+ final StartPluginRequest startPluginRequest, final SecurityContext securityContext) {
+ return Response.ok("start plugin").entity(RuntimeService.plugin().startPlugin()).build();
+ }
+
+ @Override
+ public Response stopPlugin(
+ final StopPluginRequest stopPluginRequest, final SecurityContext securityContext) {
+ return Response.ok("stop plugin").entity(RuntimeService.plugin().stopPlugin()).build();
+ }
+
+ @Override
+ public Response dropPlugin(
+ final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) {
+ return Response.ok("drop plugin").entity(RuntimeService.plugin().dropPlugin()).build();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
new file mode 100644
index 0000000..36adce3
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.plugin.impl;
+
+public class PluginApiServiceRequestValidationHandler {}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
new file mode 100644
index 0000000..94a0aca
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.task.impl;
+
+import org.apache.iotdb.collector.api.v1.task.TaskApiService;
+import org.apache.iotdb.collector.api.v1.task.model.AlterTaskRequest;
+import org.apache.iotdb.collector.api.v1.task.model.CreateTaskRequest;
+import org.apache.iotdb.collector.api.v1.task.model.DropTaskRequest;
+import org.apache.iotdb.collector.api.v1.task.model.StartTaskRequest;
+import org.apache.iotdb.collector.api.v1.task.model.StopTaskRequest;
+import org.apache.iotdb.collector.service.RuntimeService;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class TaskApiServiceImpl extends TaskApiService {
+
+ @Override
+ public Response createTask(
+ final CreateTaskRequest createTaskRequest, final SecurityContext securityContext) {
+ TaskApiServiceRequestValidationHandler.validateCreateRequest(createTaskRequest);
+
+ return RuntimeService.task()
+ .createTask(
+ createTaskRequest.getTaskId(),
+ createTaskRequest.getSourceAttribute(),
+ createTaskRequest.getProcessorAttribute(),
+ createTaskRequest.getSinkAttribute());
+ }
+
+ @Override
+ public Response alterTask(
+ final AlterTaskRequest alterTaskRequest, final SecurityContext securityContext) {
+ return Response.ok("alter task").build();
+ }
+
+ @Override
+ public Response startTask(
+ final StartTaskRequest startTaskRequest, final SecurityContext securityContext) {
+ TaskApiServiceRequestValidationHandler.validateStartRequest(startTaskRequest);
+
+ return RuntimeService.task().startTask(startTaskRequest.getTaskId());
+ }
+
+ @Override
+ public Response stopTask(
+ final StopTaskRequest stopTaskRequest, final SecurityContext securityContext) {
+ TaskApiServiceRequestValidationHandler.validateStopRequest(stopTaskRequest);
+
+ return RuntimeService.task().stopTask(stopTaskRequest.getTaskId());
+ }
+
+ @Override
+ public Response dropTask(
+ final DropTaskRequest dropTaskRequest, final SecurityContext securityContext) {
+ TaskApiServiceRequestValidationHandler.validateDropRequest(dropTaskRequest);
+
+ return RuntimeService.task().dropTask(dropTaskRequest.getTaskId());
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceRequestValidationHandler.java
new file mode 100644
index 0000000..7397f0c
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceRequestValidationHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.task.impl;
+
+import org.apache.iotdb.collector.api.v1.task.model.CreateTaskRequest;
+import org.apache.iotdb.collector.api.v1.task.model.DropTaskRequest;
+import org.apache.iotdb.collector.api.v1.task.model.StartTaskRequest;
+import org.apache.iotdb.collector.api.v1.task.model.StopTaskRequest;
+
+import java.util.Objects;
+
+public class TaskApiServiceRequestValidationHandler {
+ private TaskApiServiceRequestValidationHandler() {}
+
+ public static void validateCreateRequest(final CreateTaskRequest createTaskRequest) {
+ Objects.requireNonNull(createTaskRequest.getTaskId(), "taskId cannot be null");
+ }
+
+ public static void validateStopRequest(final StopTaskRequest stopTaskRequest) {
+ Objects.requireNonNull(stopTaskRequest.getTaskId(), "taskId cannot be null");
+ }
+
+ public static void validateStartRequest(final StartTaskRequest startTaskRequest) {
+ Objects.requireNonNull(startTaskRequest.getTaskId(), "taskId cannot be null");
+ }
+
+ public static void validateDropRequest(final DropTaskRequest dropTaskRequest) {
+ Objects.requireNonNull(dropTaskRequest.getTaskId(), "taskId cannot be null");
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
index 2e3ee55..8a66cbb 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
@@ -24,7 +24,7 @@ public class ApiServiceOptions extends Options {
public static final Option PORT =
new Option("api_service_port", 17070) {
@Override
- public void setValue(String valueString) {
+ public void setValue(final String valueString) {
value = Integer.parseInt(valueString);
}
};
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
index 8ad2887..9360248 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
@@ -32,6 +32,15 @@ public class Options {
private static final Map> OPTIONS = new ConcurrentHashMap<>();
+ static {
+ try {
+ Class.forName("org.apache.iotdb.collector.config.ApiServiceOptions");
+ Class.forName("org.apache.iotdb.collector.config.TaskRuntimeOptions");
+ } catch (final ClassNotFoundException e) {
+ throw new RuntimeException("Failed to load ApiServiceOptions", e);
+ }
+ }
+
public abstract static class Option {
private final String key;
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java
new file mode 100644
index 0000000..70efcc9
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/TaskRuntimeOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+public class TaskRuntimeOptions extends Options {
+
+ public static final Option TASK_SOURCE_PARALLELISM_NUM =
+ new Option("task_source_parallelism_num", 4) {
+ @Override
+ public void setValue(final String valueString) {
+ value = Integer.parseInt(valueString);
+ }
+ };
+
+ public static final Option TASK_PROCESS_PARALLELISM_NUM =
+ new Option("task_process_parallelism_num", 4) {
+ @Override
+ public void setValue(final String valueString) {
+ value = Integer.parseInt(valueString);
+ }
+ };
+
+ public static final Option TASK_SINK_PARALLELISM_NUM =
+ new Option("task_sink_parallelism_num", 4) {
+ @Override
+ public void setValue(final String valueString) {
+ value = Integer.parseInt(valueString);
+ }
+ };
+
+ public static final Option TASK_PROCESSOR_RING_BUFFER_SIZE =
+ new Option("task_processor_ring_buffer_size", 1024) {
+ @Override
+ public void setValue(String valueString) {
+ value = Integer.parseInt(valueString);
+ }
+ };
+
+ public static final Option TASK_SINK_RING_BUFFER_SIZE =
+ new Option("task_sink_ring_buffer_size", 1024) {
+ @Override
+ public void setValue(String valueString) {
+ value = Integer.parseInt(valueString);
+ }
+ };
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinPlugin.java
similarity index 72%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinPlugin.java
index 2d676e5..683a2c4 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinCollectorPlugin.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinPlugin.java
@@ -19,19 +19,23 @@
package org.apache.iotdb.collector.plugin;
-import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor;
-import org.apache.iotdb.collector.plugin.builtin.sink.SessionSink;
-import org.apache.iotdb.collector.plugin.builtin.source.HttpSource;
+import org.apache.iotdb.collector.plugin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.plugin.sink.SessionSink;
+import org.apache.iotdb.collector.plugin.source.HttpPullSource;
+import org.apache.iotdb.collector.plugin.source.HttpPushSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-public enum BuiltinCollectorPlugin {
+public enum BuiltinPlugin {
- // Sources
- HTTP_SOURCE("http-source", HttpSource.class),
+ // PushSources
+ HTTP_PUSH_SOURCE("http-push-source", HttpPushSource.class),
+
+ // PullSources
+ HTTP_PULL_SOURCE("http-pull-source", HttpPullSource.class),
// Processors
DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
@@ -43,7 +47,7 @@ public enum BuiltinCollectorPlugin {
private final Class> collectorPluginClass;
private final String className;
- BuiltinCollectorPlugin(String collectorPluginName, Class> collectorPluginClass) {
+ BuiltinPlugin(final String collectorPluginName, final Class> collectorPluginClass) {
this.collectorPluginName = collectorPluginName;
this.collectorPluginClass = collectorPluginClass;
this.className = collectorPluginClass.getName();
@@ -65,8 +69,10 @@ public String getClassName() {
Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
- // Sources
- HTTP_SOURCE.getCollectorPluginName().toUpperCase(),
+ // PushSources
+ HTTP_PUSH_SOURCE.getCollectorPluginName().toUpperCase(),
+ // PullSources
+ HTTP_PULL_SOURCE.getCollectorPluginName().toUpperCase(),
// Processors
DO_NOTHING_PROCESSOR.getCollectorPluginName().toUpperCase(),
// Sinks
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java
new file mode 100644
index 0000000..097111a
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.api;
+
+public abstract class CollectorPullSource implements CollectorSource {}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPushSource.java
similarity index 58%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPushSource.java
index 33ee5ca..622ed52 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPushSource.java
@@ -17,23 +17,25 @@
* under the License.
*/
-package org.apache.iotdb.collector.agent.plugin;
+package org.apache.iotdb.collector.plugin.api;
-public class CollectorPluginAgent {
- private final CollectorPluginConstructor collectorPluginConstructor =
- CollectorPluginConstructor.instance();
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
- private CollectorPluginAgent() {}
+public abstract class CollectorPushSource implements CollectorSource {
- public CollectorPluginConstructor constructor() {
- return CollectorPluginAgentHolder.INSTANCE.collectorPluginConstructor;
+ protected final EventCollector collector;
+
+ public CollectorPushSource(final EventCollector collector) {
+ this.collector = collector;
}
- public static CollectorPluginAgent instance() {
- return new CollectorPluginAgent();
+ @Override
+ public final Event supply() {
+ throw new UnsupportedOperationException();
}
- private static class CollectorPluginAgentHolder {
- private static final CollectorPluginAgent INSTANCE = new CollectorPluginAgent();
+ public final void supply(final Event event) throws Exception {
+ collector.collect(event);
}
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorSource.java
new file mode 100644
index 0000000..2e483e1
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorSource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.api;
+
+import org.apache.iotdb.pipe.api.PipeSource;
+
+public interface CollectorSource extends StoppablePlugin, PipeSource {}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java
new file mode 100644
index 0000000..1da84bc
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.api;
+
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
+
+public class RuntimeConfig implements PipeExtractorRuntimeConfiguration {
+
+ public static class RuntimeEnvironment implements PipeRuntimeEnvironment {
+
+ public int getParallelism() {
+ return 0;
+ }
+
+ public int getParallelismIndex() {
+ return 0;
+ }
+
+ @Override
+ public String getPipeName() {
+ return "";
+ }
+
+ @Override
+ public long getCreationTime() {
+ return 0;
+ }
+ }
+
+ @Override
+ public PipeRuntimeEnvironment getRuntimeEnvironment() {
+ return new PipeRuntimeEnvironment() {
+
+ @Override
+ public String getPipeName() {
+ return "";
+ }
+
+ @Override
+ public long getCreationTime() {
+ return 0;
+ }
+ };
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java
new file mode 100644
index 0000000..8a97564
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.api;
+
+public interface StoppablePlugin {
+ void stop() throws Exception;
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/event/SourceEvent.java
similarity index 95%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/event/SourceEvent.java
index 813e25e..0c097ce 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/event/SourceEvent.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/event/SourceEvent.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.collector.plugin.builtin.source.event;
+package org.apache.iotdb.collector.plugin.event;
import org.apache.iotdb.pipe.api.event.Event;
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/processor/DoNothingProcessor.java
similarity index 74%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/processor/DoNothingProcessor.java
index 436fc20..2bf2ee9 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/processor/DoNothingProcessor.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.collector.plugin.builtin.processor;
+package org.apache.iotdb.collector.plugin.processor;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -35,24 +35,27 @@ public class DoNothingProcessor implements PipeProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class);
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
+ public void validate(final PipeParameterValidator validator) throws Exception {
// Do Nothing
}
@Override
- public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
throws Exception {
// Do Nothing
}
@Override
- public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+ public void process(
+ final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector)
throws Exception {
- // Do Nothing
+ LOGGER.info("DoNothingProcessor process tabletInsertionEvent: {}", tabletInsertionEvent);
+ eventCollector.collect(tabletInsertionEvent);
}
@Override
- public void process(Event event, EventCollector eventCollector) throws Exception {
+ public void process(final Event event, final EventCollector eventCollector) throws Exception {
LOGGER.info("DoNothingProcessor process event: {}", event);
eventCollector.collect(event);
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/sink/SessionSink.java
similarity index 77%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/sink/SessionSink.java
index e2a5260..64966db 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/SessionSink.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/sink/SessionSink.java
@@ -17,9 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.collector.plugin.builtin.sink;
+package org.apache.iotdb.collector.plugin.sink;
-import org.apache.iotdb.collector.plugin.builtin.source.event.SourceEvent;
+import org.apache.iotdb.collector.plugin.event.SourceEvent;
import org.apache.iotdb.pipe.api.PipeSink;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration;
@@ -36,14 +36,16 @@ public class SessionSink implements PipeSink {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionSink.class);
@Override
- public void validate(PipeParameterValidator validator) throws Exception {}
+ public void validate(final PipeParameterValidator validator) throws Exception {}
@Override
- public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
throws Exception {}
@Override
- public void customize(PipeParameters parameters, PipeSinkRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration)
throws Exception {}
@Override
@@ -55,10 +57,10 @@ public void handshake() throws Exception {
public void heartbeat() throws Exception {}
@Override
- public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {}
+ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {}
@Override
- public void transfer(Event event) throws Exception {
+ public void transfer(final Event event) throws Exception {
final SourceEvent sourceEvent = (SourceEvent) event;
LOGGER.info("SessionSink transfer successfully {}", sourceEvent);
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPullSource.java
similarity index 51%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPullSource.java
index 65ba79b..83881a3 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpSource.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPullSource.java
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.collector.plugin.builtin.source;
+package org.apache.iotdb.collector.plugin.source;
-import org.apache.iotdb.collector.plugin.builtin.source.event.SourceEvent;
-import org.apache.iotdb.pipe.api.PipeSource;
+import org.apache.iotdb.collector.plugin.api.CollectorPullSource;
+import org.apache.iotdb.collector.plugin.event.SourceEvent;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -31,49 +31,40 @@
import org.slf4j.LoggerFactory;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
-public class HttpSource implements PipeSource {
+public class HttpPullSource extends CollectorPullSource {
- private static final Logger LOGGER = LoggerFactory.getLogger(HttpSource.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HttpPullSource.class);
- private static final BlockingQueue queue = new LinkedBlockingQueue<>();
- private boolean isStarted = true;
+ @Override
+ public void validate(PipeParameterValidator pipeParameterValidator) {}
@Override
- public void validate(PipeParameterValidator validator) throws Exception {}
+ public void customize(
+ PipeParameters pipeParameters,
+ PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) {}
@Override
- public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
- throws Exception {}
+ public void customize(
+ PipeParameters pipeParameters,
+ PipeSourceRuntimeConfiguration pipeSourceRuntimeConfiguration) {}
@Override
- public void customize(PipeParameters parameters, PipeSourceRuntimeConfiguration configuration)
- throws Exception {}
+ public void start() throws Exception {}
@Override
- public void start() {
- isStarted = true;
- while (isStarted) {
- Event event = new SourceEvent(String.valueOf(new Random().nextInt(1000)));
- try {
- queue.put(event);
- Thread.sleep(1000);
- LOGGER.info("event: {} created success", event);
- } catch (final InterruptedException e) {
- LOGGER.warn("failed to create event because {}", e.getMessage());
- }
- }
+ public Event supply() {
+ final Event event = new SourceEvent(String.valueOf(new Random().nextInt(1000)));
+ LOGGER.info("event: {} created success", event);
+ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
+ return event;
}
@Override
- public Event supply() throws Exception {
- return queue.take();
- }
+ public void stop() throws Exception {}
@Override
- public void close() throws Exception {
- isStarted = false;
- }
+ public void close() {}
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPushSource.java
new file mode 100644
index 0000000..96d03d0
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPushSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.source;
+
+import org.apache.iotdb.collector.plugin.api.CollectorPushSource;
+import org.apache.iotdb.collector.plugin.event.SourceEvent;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class HttpPushSource extends CollectorPushSource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HttpPushSource.class);
+
+ private volatile boolean isStarted = true;
+ private Thread workerThread;
+
+ public HttpPushSource(final EventCollector collector) {
+ super(collector);
+ }
+
+ @Override
+ public void validate(final PipeParameterValidator validator) {}
+
+ @Override
+ public void customize(
+ final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration) {}
+
+ @Override
+ public void customize(
+ final PipeParameters parameters, final PipeSourceRuntimeConfiguration configuration) {}
+
+ @Override
+ public void start() {
+ if (workerThread == null || !workerThread.isAlive()) {
+ isStarted = true;
+ workerThread = new Thread(this::doWork);
+ workerThread.start();
+ }
+ }
+
+ private void doWork() {
+ try {
+ while (isStarted && !Thread.currentThread().isInterrupted()) {
+ final Event event = new SourceEvent(String.valueOf(new Random().nextInt(1000)));
+ LOGGER.info("event: {} created success", event);
+ supply(event);
+ TimeUnit.SECONDS.sleep(2);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOGGER.error("Error in push source", e);
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ this.isStarted = false;
+ }
+
+ @Override
+ public void close() {
+ isStarted = false;
+ if (workerThread != null) {
+ workerThread.interrupt();
+ try {
+ workerThread.join(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ workerThread = null;
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginFactory.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginFactory.java
new file mode 100644
index 0000000..89bdca4
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin;
+
+import org.apache.iotdb.collector.plugin.BuiltinPlugin;
+import org.apache.iotdb.collector.plugin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.plugin.sink.SessionSink;
+import org.apache.iotdb.collector.plugin.source.HttpPullSource;
+import org.apache.iotdb.pipe.api.PipePlugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class PluginFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PluginFactory.class);
+
+ protected final Map> pluginConstructors = new HashMap<>();
+
+ public PluginFactory() {
+ initFactory();
+ }
+
+ private void initFactory() {
+ pluginConstructors.put(
+ BuiltinPlugin.HTTP_PULL_SOURCE.getCollectorPluginName(), HttpPullSource::new);
+ pluginConstructors.put(
+ BuiltinPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName(), DoNothingProcessor::new);
+ pluginConstructors.put(
+ BuiltinPlugin.IOTDB_SESSION_SINK.getCollectorPluginName(), SessionSink::new);
+ LOGGER.info("builtin plugin has been initialized");
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
similarity index 69%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
index 4f13ff9..52f49e1 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
@@ -17,21 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.collector.agent.task;
+package org.apache.iotdb.collector.runtime.plugin;
-import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+public class PluginRuntime {
-public abstract class CollectorTask extends WrappedRunnable {
+ public boolean createPlugin() {
+ return true;
+ }
- protected final String taskId;
+ public boolean alterPlugin() {
+ return true;
+ }
- protected CollectorTask(final String taskId) {
- this.taskId = taskId;
+ public boolean startPlugin() {
+ return true;
}
- public String getTaskId() {
- return taskId;
+ public boolean stopPlugin() {
+ return true;
}
- public abstract void stop();
+ public boolean dropPlugin() {
+ return true;
+ }
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
new file mode 100644
index 0000000..3754f5f
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task;
+
+import org.apache.iotdb.collector.runtime.task.def.ProcessorTask;
+import org.apache.iotdb.collector.runtime.task.def.PushSourceTask;
+import org.apache.iotdb.collector.runtime.task.def.SinkTask;
+import org.apache.iotdb.collector.runtime.task.def.SourceTask;
+import org.apache.iotdb.collector.runtime.task.def.TaskRepository;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TaskRuntime {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskRuntime.class);
+
+ private static final Map TASK_REPOSITORY_MAP = new ConcurrentHashMap<>();
+
+ public Response createTask(
+ final String taskId,
+ final Map sourceAttribute,
+ final Map processorAttribute,
+ final Map sinkAttribute) {
+ try {
+ if (validateTaskIsExist(taskId)) {
+ return Response.status(Response.Status.CONFLICT)
+ .entity(String.format("task %s has existed", taskId))
+ .build();
+ }
+
+ final SinkTask sinkTask = new SinkTask(sinkAttribute);
+ final ProcessorTask processorTask = new ProcessorTask(processorAttribute, sinkTask);
+ final SourceTask sourceTask = new PushSourceTask(taskId, sourceAttribute, processorTask);
+ final TaskRepository taskRepository = new TaskRepository(sourceTask, processorTask, sinkTask);
+
+ TASK_REPOSITORY_MAP.put(taskId, taskRepository);
+ taskRepository.create();
+
+ LOGGER.info("Successfully created task {}", taskId);
+ return Response.status(Response.Status.CREATED)
+ .entity(String.format("Successfully created task %s", taskId))
+ .build();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to create task", e);
+ return Response.serverError()
+ .entity(String.format("Failed to create task %s, because %s", taskId, e.getMessage()))
+ .build();
+ }
+ }
+
+ public boolean alterTask() {
+ return true;
+ }
+
+ public Response startTask(final String taskId) {
+ if (!validateTaskIsExist(taskId)) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(String.format("task %s not found", taskId))
+ .build();
+ }
+
+ TASK_REPOSITORY_MAP.get(taskId).start();
+ LOGGER.info("Task {} started successfully", taskId);
+ return Response.status(Response.Status.OK)
+ .entity(String.format("task %s start successfully", taskId))
+ .build();
+ }
+
+ public Response stopTask(final String taskId) {
+ if (!validateTaskIsExist(taskId)) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(String.format("task %s not found", taskId))
+ .build();
+ }
+
+ try {
+ final TaskRepository taskRepository = TASK_REPOSITORY_MAP.get(taskId);
+ if (taskRepository != null) {
+ taskRepository.stop();
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to stop task", e);
+ return Response.serverError()
+ .entity(String.format("Failed to stop task %s, because %s", taskId, e.getMessage()))
+ .build();
+ }
+
+ LOGGER.info("Task {} stopped successfully", taskId);
+ return Response.status(Response.Status.OK)
+ .entity(String.format("task %s stop successfully", taskId))
+ .build();
+ }
+
+ public Response dropTask(final String taskId) {
+ if (!validateTaskIsExist(taskId)) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(String.format("task %s not found", taskId))
+ .build();
+ }
+
+ TASK_REPOSITORY_MAP.remove(taskId).drop();
+ LOGGER.info("Task {} dropped successfully", taskId);
+ return Response.status(Response.Status.OK)
+ .entity(String.format("task %s drop successfully", taskId))
+ .build();
+ }
+
+ private boolean validateTaskIsExist(final String taskId) {
+ if (TASK_REPOSITORY_MAP.containsKey(taskId)) {
+ return true;
+ }
+ LOGGER.warn("Task {} not found", taskId);
+ return false;
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventCollector.java
similarity index 64%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventCollector.java
index 056685e..43886a5 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventCollector.java
@@ -17,32 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.collector.agent.collect;
+package org.apache.iotdb.collector.runtime.task.datastructure;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
+import com.lmax.disruptor.RingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.BlockingQueue;
+public class TaskEventCollector implements EventCollector {
-public class CollectorEventCollector implements EventCollector {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskEventCollector.class);
+ private final RingBuffer ringBuffer;
- private static final Logger LOGGER = LoggerFactory.getLogger(CollectorEventCollector.class);
-
- private final BlockingQueue pendingQueue;
-
- public CollectorEventCollector(final BlockingQueue pendingQueue) {
- this.pendingQueue = pendingQueue;
+ public TaskEventCollector(final RingBuffer ringBuffer) {
+ this.ringBuffer = ringBuffer;
}
@Override
public void collect(final Event event) {
- try {
- pendingQueue.put(event);
- } catch (final InterruptedException e) {
- LOGGER.warn("collect event failed because {}", e.getMessage(), e);
- }
+ ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event);
+ LOGGER.info("successfully publish event {}", event);
}
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumer.java
new file mode 100644
index 0000000..7f94487
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.datastructure;
+
+import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.PipeSink;
+
+import com.lmax.disruptor.WorkHandler;
+
+public class TaskEventConsumer implements WorkHandler {
+
+ private final PipePlugin plugin;
+ private final TaskEventCollector collector;
+ private final TaskEventConsumerController consumerController;
+
+ public TaskEventConsumer(
+ final PipePlugin plugin,
+ final TaskEventCollector collector,
+ final TaskEventConsumerController consumerController) {
+ this.plugin = plugin;
+ this.collector = collector;
+ this.consumerController = consumerController;
+ }
+
+ public TaskEventConsumer(
+ final PipePlugin plugin, final TaskEventConsumerController consumerController) {
+ this(plugin, null, consumerController);
+ }
+
+ @Override
+ public void onEvent(final TaskEventContainer taskEventContainer) throws Exception {
+ if (!consumerController.shouldRun()) {
+ return;
+ }
+ if (plugin instanceof PipeProcessor) {
+ ((PipeProcessor) plugin).process(taskEventContainer.getEvent(), this.collector);
+ } else if (plugin instanceof PipeSink) {
+ ((PipeSink) plugin).transfer(taskEventContainer.getEvent());
+ }
+ }
+
+ public TaskEventConsumerController getConsumerController() {
+ return consumerController;
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumerController.java
similarity index 57%
rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumerController.java
index 9456c2f..7506a09 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumerController.java
@@ -17,21 +17,29 @@
* under the License.
*/
-package org.apache.iotdb.collector.api.v1.handler;
+package org.apache.iotdb.collector.runtime.task.datastructure;
-import org.apache.iotdb.collector.api.v1.model.CreatePipeRequest;
-import org.apache.iotdb.collector.api.v1.model.StopPipeRequest;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
-import java.util.Objects;
+public class TaskEventConsumerController {
-public class RequestValidationHandler {
- private RequestValidationHandler() {}
+ private final AtomicBoolean running = new AtomicBoolean(true);
- public static void validateCreateRequest(final CreatePipeRequest createPipeRequest) {
- Objects.requireNonNull(createPipeRequest.getTaskId(), "taskId cannot be null");
+ private static final long PARK_NANOS = 100_000_000L;
+
+ public void pause() {
+ running.set(false);
+ }
+
+ public void resume() {
+ running.set(true);
}
- public static void validateStopRequest(final StopPipeRequest stopPipeRequest) {
- Objects.requireNonNull(stopPipeRequest.getTaskId(), "taskId cannot be null");
+ public boolean shouldRun() {
+ while (!running.get()) {
+ LockSupport.parkNanos(PARK_NANOS);
+ }
+ return running.get();
}
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventContainer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventContainer.java
new file mode 100644
index 0000000..8ac42fd
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventContainer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.datastructure;
+
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class TaskEventContainer implements Event {
+ private Event event;
+
+ public TaskEventContainer() {}
+
+ public Event getEvent() {
+ return event;
+ }
+
+ public void setEvent(final Event event) {
+ this.event = event;
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/ProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/ProcessorTask.java
new file mode 100644
index 0000000..c498280
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/ProcessorTask.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.def;
+
+import org.apache.iotdb.collector.config.TaskRuntimeOptions;
+import org.apache.iotdb.collector.plugin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventCollector;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumer;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventContainer;
+import org.apache.iotdb.collector.runtime.task.exception.DisruptorTaskExceptionHandler;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+public class ProcessorTask extends Task implements TaskComponent {
+
+ private Disruptor processorDisruptor;
+ private TaskEventConsumer[] eventConsumers;
+
+ private final PipeParameters parameters;
+ private final int processParallelismNum;
+ private final TaskEventCollector collector;
+
+ public ProcessorTask(final Map processorAttributes, final SinkTask sinkTask) {
+ this.parameters = new PipeParameters(processorAttributes);
+ this.processParallelismNum =
+ this.parameters.getIntOrDefault(
+ TaskRuntimeOptions.TASK_PROCESS_PARALLELISM_NUM.key(),
+ TaskRuntimeOptions.TASK_PROCESS_PARALLELISM_NUM.value());
+ this.collector = new TaskEventCollector(sinkTask.getSinkRingBuffer());
+
+ this.initProcessorDisruptor();
+ }
+
+ @Override
+ public void create() {
+ if (this.processorDisruptor == null) {
+ this.initProcessorDisruptor();
+ }
+
+ this.eventConsumers =
+ this.getConsumer(
+ this.createInstance(DoNothingProcessor.class), processParallelismNum, collector);
+
+ this.processorDisruptor.setDefaultExceptionHandler(new DisruptorTaskExceptionHandler());
+ this.processorDisruptor.handleEventsWithWorkerPool(this.eventConsumers);
+ this.processorDisruptor.start();
+ }
+
+ private void initProcessorDisruptor() {
+ this.processorDisruptor =
+ new Disruptor<>(
+ TaskEventContainer::new,
+ this.parameters.getIntOrDefault(
+ TaskRuntimeOptions.TASK_PROCESSOR_RING_BUFFER_SIZE.key(),
+ TaskRuntimeOptions.TASK_PROCESSOR_RING_BUFFER_SIZE.value()),
+ DaemonThreadFactory.INSTANCE,
+ ProducerType.MULTI,
+ new BlockingWaitStrategy());
+ }
+
+ @Override
+ public void start() throws Exception {
+ for (final TaskEventConsumer consumer : this.eventConsumers) {
+ consumer.getConsumerController().resume();
+ }
+ }
+
+ @Override
+ public void stop() {
+ for (final TaskEventConsumer consumer : this.eventConsumers) {
+ consumer.getConsumerController().pause();
+ }
+ }
+
+ @Override
+ public void drop() {
+ if (this.processorDisruptor != null) {
+ this.processorDisruptor.shutdown();
+ this.processorDisruptor = null;
+ }
+ }
+
+ public Optional> getProcessorRingBuffer() {
+ if (this.processorDisruptor != null) {
+ return Optional.of(this.processorDisruptor.getRingBuffer());
+ }
+ return Optional.empty();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PullSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PullSourceTask.java
new file mode 100644
index 0000000..0e0c19c
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PullSourceTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.def;
+
+import org.apache.iotdb.collector.plugin.source.HttpPullSource;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+public class PullSourceTask extends SourceTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PullSourceTask.class);
+
+ public PullSourceTask(
+ final String taskId,
+ final Map sourceParams,
+ final ProcessorTask processorTask) {
+ super(taskId, sourceParams, processorTask);
+ }
+
+ @Override
+ public void create() {
+ createSourceTask();
+ for (int i = 0; i < sourceParallelismNum; i++) {
+ // use sourceAttribute later
+ try (final HttpPullSource source = createInstance(HttpPullSource.class)) {
+ addSourceTask(source);
+ SOURCE_EXECUTOR_SERVICE
+ .get(taskId)
+ .submit(
+ () -> {
+ try {
+ source.start();
+ while (SOURCE_TASK_STATUS.containsKey(taskId)) {
+ if (SOURCE_TASK_STATUS.get(taskId) == TaskState.Stopped) {
+ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+ continue;
+ }
+
+ final Event event = source.supply();
+ processorTask
+ .getProcessorRingBuffer()
+ .ifPresent(
+ ringBuffer ->
+ ringBuffer.publishEvent(
+ ((container, sequence, o) -> container.setEvent(event)),
+ event));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (final Exception e) {
+ LOGGER.warn("Pull source task failed", e);
+ }
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PushSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PushSourceTask.java
new file mode 100644
index 0000000..176f671
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PushSourceTask.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.def;
+
+import org.apache.iotdb.collector.plugin.source.HttpPushSource;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventCollector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class PushSourceTask extends SourceTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PushSourceTask.class);
+
+ private final TaskEventCollector collector;
+
+ public PushSourceTask(
+ final String taskId,
+ final Map sourceParams,
+ final ProcessorTask processorTask) {
+ super(taskId, sourceParams, processorTask);
+
+ this.collector =
+ new TaskEventCollector(
+ processorTask.getProcessorRingBuffer().isPresent()
+ ? processorTask.getProcessorRingBuffer().get()
+ : null);
+ }
+
+ @Override
+ public void create() {
+ createSourceTask();
+ for (int i = 0; i < sourceParallelismNum; i++) {
+ // use sourceAttribute later
+ try (final HttpPushSource source = new HttpPushSource(collector)) {
+ addSourceTask(source);
+ SOURCE_EXECUTOR_SERVICE
+ .get(taskId)
+ .submit(
+ () -> {
+ try {
+ source.start();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to start push source", e);
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.warn("failed to create instance of HttpSource", e);
+ }
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SinkTask.java
new file mode 100644
index 0000000..e04b230
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SinkTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.def;
+
+import org.apache.iotdb.collector.config.TaskRuntimeOptions;
+import org.apache.iotdb.collector.plugin.sink.SessionSink;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumer;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventContainer;
+import org.apache.iotdb.collector.runtime.task.exception.DisruptorTaskExceptionHandler;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.Map;
+
+public class SinkTask extends Task implements TaskComponent {
+
+ private Disruptor sinkDisruptor;
+ private TaskEventConsumer[] eventConsumers;
+ private final PipeParameters parameters;
+ private final int sinkParallelismNum;
+
+ public SinkTask(final Map processorAttributes) {
+ this.parameters = new PipeParameters(processorAttributes);
+ this.sinkParallelismNum =
+ this.parameters.getIntOrDefault(
+ TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM.key(),
+ TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM.value());
+
+ this.initSinkDisruptor();
+ }
+
+ @Override
+ public void create() {
+ if (this.sinkDisruptor == null) {
+ this.initSinkDisruptor();
+ }
+
+ this.eventConsumers =
+ this.getConsumer(this.createInstance(SessionSink.class), sinkParallelismNum, null);
+
+ this.sinkDisruptor.setDefaultExceptionHandler(new DisruptorTaskExceptionHandler());
+ this.sinkDisruptor.handleEventsWithWorkerPool(this.eventConsumers);
+ this.sinkDisruptor.start();
+ }
+
+ private void initSinkDisruptor() {
+ this.sinkDisruptor =
+ new Disruptor<>(
+ TaskEventContainer::new,
+ this.parameters.getIntOrDefault(
+ TaskRuntimeOptions.TASK_SINK_RING_BUFFER_SIZE.key(),
+ TaskRuntimeOptions.TASK_SINK_RING_BUFFER_SIZE.value()),
+ DaemonThreadFactory.INSTANCE,
+ ProducerType.MULTI,
+ new BlockingWaitStrategy());
+ }
+
+ @Override
+ public void start() throws Exception {
+ for (final TaskEventConsumer consumer : this.eventConsumers) {
+ consumer.getConsumerController().resume();
+ }
+ }
+
+ @Override
+ public void stop() {
+ for (final TaskEventConsumer consumer : this.eventConsumers) {
+ consumer.getConsumerController().pause();
+ }
+ }
+
+ @Override
+ public void drop() {
+ if (this.sinkDisruptor != null) {
+ this.sinkDisruptor.shutdown();
+ this.sinkDisruptor = null;
+ }
+ }
+
+ public RingBuffer getSinkRingBuffer() {
+ return this.sinkDisruptor.getRingBuffer();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SourceTask.java
new file mode 100644
index 0000000..2a822ca
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SourceTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.def;
+
+import org.apache.iotdb.collector.config.TaskRuntimeOptions;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
+import org.apache.iotdb.pipe.api.PipeSource;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class SourceTask extends Task implements TaskComponent {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SourceTask.class);
+
+ // Source needs the processor's RingBuffer to publish event.
+ protected final ProcessorTask processorTask;
+ protected final int sourceParallelismNum;
+ protected final String taskId;
+ // Store the status of the tasks, Running or Stopped
+ protected static final Map SOURCE_TASK_STATUS = new ConcurrentHashMap<>();
+ protected static final Map SOURCE_EXECUTOR_SERVICE =
+ new ConcurrentHashMap<>();
+ // Source tasks list
+ protected static final Map> SOURCE_TASK = new ConcurrentHashMap<>();
+
+ public SourceTask(
+ final String taskId,
+ final Map sourceParams,
+ final ProcessorTask processorTask) {
+ this.taskId = taskId;
+ final PipeParameters params = new PipeParameters(sourceParams);
+ this.sourceParallelismNum =
+ params.getIntOrDefault(
+ TaskRuntimeOptions.TASK_SOURCE_PARALLELISM_NUM.key(),
+ TaskRuntimeOptions.TASK_SOURCE_PARALLELISM_NUM.value());
+ this.processorTask = processorTask;
+ }
+
+ @Override
+ public void start() throws Exception {
+ SOURCE_TASK_STATUS.computeIfPresent(taskId, (taskId, status) -> TaskState.Running);
+ SOURCE_TASK
+ .get(taskId)
+ .forEach(
+ source -> {
+ try {
+ source.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void stop() {
+ SOURCE_TASK_STATUS.computeIfPresent(taskId, (taskId, status) -> TaskState.Stopped);
+ SOURCE_TASK
+ .get(taskId)
+ .forEach(
+ source -> {
+ try {
+ source.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close source", e);
+ }
+ });
+ }
+
+ @Override
+ public void drop() {
+ stop();
+ SOURCE_TASK.remove(taskId);
+ SOURCE_TASK_STATUS.remove(taskId);
+ SOURCE_EXECUTOR_SERVICE.remove(taskId).shutdown();
+ }
+
+ protected void addSourceTask(final PipeSource source) {
+ if (!SOURCE_TASK.containsKey(taskId)) {
+ SOURCE_TASK.put(taskId, new CopyOnWriteArrayList<>());
+ }
+ SOURCE_TASK.get(taskId).add(source);
+ }
+
+ protected void createSourceTask() {
+ SOURCE_EXECUTOR_SERVICE.putIfAbsent(
+ taskId,
+ new WrappedThreadPoolExecutor(
+ sourceParallelismNum,
+ sourceParallelismNum,
+ 0L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new IoTThreadFactory("source-executor"),
+ "source-executor"));
+ SOURCE_TASK_STATUS.putIfAbsent(taskId, TaskState.Running);
+ }
+
+ protected enum TaskState {
+ Running,
+ Stopped
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/Task.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/Task.java
new file mode 100644
index 0000000..8468795
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/Task.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.def;
+
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventCollector;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumer;
+import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumerController;
+import org.apache.iotdb.pipe.api.PipePlugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+public abstract class Task {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Task.class);
+
+ protected TaskEventConsumer[] getConsumer(
+ final PipePlugin plugin, final int consumerNum, final TaskEventCollector collector) {
+ return Stream.generate(() -> createConsumer(plugin, collector))
+ .limit(consumerNum)
+ .toArray(TaskEventConsumer[]::new);
+ }
+
+ private TaskEventConsumer createConsumer(
+ final PipePlugin plugin, final TaskEventCollector collector) {
+ return Objects.nonNull(collector)
+ ? new TaskEventConsumer(plugin, collector, new TaskEventConsumerController())
+ : new TaskEventConsumer(plugin, new TaskEventConsumerController());
+ }
+
+ protected T createInstance(final Class clazz) {
+ try {
+ final Constructor constructor = clazz.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ return constructor.newInstance();
+ } catch (final NoSuchMethodException e) {
+ LOGGER.warn("class {} is abstract class.", clazz, e);
+ } catch (final IllegalAccessException e) {
+ LOGGER.warn("failed to visit class {} constructor method.", clazz, e);
+ } catch (final InstantiationException e) {
+ LOGGER.warn("failed to instantiate class {}.", clazz, e);
+ } catch (final InvocationTargetException e) {
+ LOGGER.warn("the constructor threw an exception.", e);
+ }
+ throw new RuntimeException();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskComponent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskComponent.java
new file mode 100644
index 0000000..729bc6d
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskComponent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.collector.runtime.task.def;
+
+public interface TaskComponent {
+ void create() throws Exception;
+
+ void start() throws Exception;
+
+ void stop();
+
+ void drop();
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskRepository.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskRepository.java
new file mode 100644
index 0000000..c5a1626
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskRepository.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.def;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskRepository {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskRepository.class);
+
+ private final TaskComponent sourceComponent;
+ private final TaskComponent processorComponent;
+ private final TaskComponent sinkComponent;
+
+ public TaskRepository(
+ final TaskComponent sourceComponent,
+ final TaskComponent processorComponent,
+ final TaskComponent sinkComponent) {
+ this.sourceComponent = sourceComponent;
+ this.processorComponent = processorComponent;
+ this.sinkComponent = sinkComponent;
+ }
+
+ // Disruptor consumers must be started before producers
+ public void create() {
+ try {
+ sinkComponent.create();
+ processorComponent.create();
+ sourceComponent.create();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to create task", e);
+ }
+ }
+
+ // Disruptor consumers must be started before producers
+ public void start() {
+ try {
+ sinkComponent.start();
+ processorComponent.start();
+ sourceComponent.start();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to start task", e);
+ }
+ }
+
+ public void stop() {
+ try {
+ sourceComponent.stop();
+ processorComponent.stop();
+ sinkComponent.stop();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to stop task", e);
+ }
+ }
+
+ public void drop() {
+ try {
+ sourceComponent.drop();
+ processorComponent.drop();
+ sinkComponent.drop();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to drop task", e);
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/exception/DisruptorTaskExceptionHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/exception/DisruptorTaskExceptionHandler.java
new file mode 100644
index 0000000..8716d84
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/exception/DisruptorTaskExceptionHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task.exception;
+
+import com.lmax.disruptor.ExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DisruptorTaskExceptionHandler implements ExceptionHandler