diff --git a/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id b/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id new file mode 100644 index 0000000..eb1bfab --- /dev/null +++ b/.mvn/.gradle-enterprise/gradle-enterprise-workspace-id @@ -0,0 +1 @@ +kl7k3ragjregzk2gtxfrzupvie \ No newline at end of file diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java index 61daa03..b64f017 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java @@ -25,7 +25,6 @@ import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest; import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest; import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest; -import org.apache.iotdb.collector.service.RuntimeService; import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; @@ -35,30 +34,30 @@ public class PluginApiServiceImpl extends PluginApiService { @Override public Response createPlugin( final CreatePluginRequest createPluginRequest, final SecurityContext securityContext) { - return Response.ok("create plugin").entity(RuntimeService.plugin().createPlugin()).build(); + return Response.ok("create plugin").build(); } @Override public Response alterPlugin( final AlterPluginRequest alterPluginRequest, final SecurityContext securityContext) { - return Response.ok("alter plugin").entity(RuntimeService.plugin().alterPlugin()).build(); + return Response.ok("alter plugin").build(); } @Override public Response startPlugin( final StartPluginRequest startPluginRequest, final SecurityContext securityContext) { - return Response.ok("start plugin").entity(RuntimeService.plugin().startPlugin()).build(); + return Response.ok("start plugin").build(); } @Override public Response stopPlugin( final StopPluginRequest stopPluginRequest, final SecurityContext securityContext) { - return Response.ok("stop plugin").entity(RuntimeService.plugin().stopPlugin()).build(); + return Response.ok("stop plugin").build(); } @Override public Response dropPlugin( final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) { - return Response.ok("drop plugin").entity(RuntimeService.plugin().dropPlugin()).build(); + return Response.ok("drop plugin").build(); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java index 94a0aca..053b618 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java @@ -37,12 +37,15 @@ public Response createTask( final CreateTaskRequest createTaskRequest, final SecurityContext securityContext) { TaskApiServiceRequestValidationHandler.validateCreateRequest(createTaskRequest); - return RuntimeService.task() - .createTask( - createTaskRequest.getTaskId(), - createTaskRequest.getSourceAttribute(), - createTaskRequest.getProcessorAttribute(), - createTaskRequest.getSinkAttribute()); + return RuntimeService.task().isPresent() + ? RuntimeService.task() + .get() + .createTask( + createTaskRequest.getTaskId(), + createTaskRequest.getSourceAttribute(), + createTaskRequest.getProcessorAttribute(), + createTaskRequest.getSinkAttribute()) + : Response.serverError().entity("Task runtime is down").build(); } @Override @@ -56,7 +59,9 @@ public Response startTask( final StartTaskRequest startTaskRequest, final SecurityContext securityContext) { TaskApiServiceRequestValidationHandler.validateStartRequest(startTaskRequest); - return RuntimeService.task().startTask(startTaskRequest.getTaskId()); + return RuntimeService.task().isPresent() + ? RuntimeService.task().get().startTask(startTaskRequest.getTaskId()) + : Response.serverError().entity("Task runtime is down").build(); } @Override @@ -64,7 +69,9 @@ public Response stopTask( final StopTaskRequest stopTaskRequest, final SecurityContext securityContext) { TaskApiServiceRequestValidationHandler.validateStopRequest(stopTaskRequest); - return RuntimeService.task().stopTask(stopTaskRequest.getTaskId()); + return RuntimeService.task().isPresent() + ? RuntimeService.task().get().stopTask(stopTaskRequest.getTaskId()) + : Response.serverError().entity("Task runtime is down").build(); } @Override @@ -72,6 +79,8 @@ public Response dropTask( final DropTaskRequest dropTaskRequest, final SecurityContext securityContext) { TaskApiServiceRequestValidationHandler.validateDropRequest(dropTaskRequest); - return RuntimeService.task().dropTask(dropTaskRequest.getTaskId()); + return RuntimeService.task().isPresent() + ? RuntimeService.task().get().dropTask(dropTaskRequest.getTaskId()) + : Response.serverError().entity("Task runtime is down").build(); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java index 9360248..e067086 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java @@ -34,10 +34,10 @@ public class Options { static { try { - Class.forName("org.apache.iotdb.collector.config.ApiServiceOptions"); - Class.forName("org.apache.iotdb.collector.config.TaskRuntimeOptions"); + Class.forName(ApiServiceOptions.class.getName()); + Class.forName(TaskRuntimeOptions.class.getName()); } catch (final ClassNotFoundException e) { - throw new RuntimeException("Failed to load ApiServiceOptions", e); + throw new RuntimeException("Failed to load options", e); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java deleted file mode 100644 index 097111a..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPullSource.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.plugin.api; - -public abstract class CollectorPullSource implements CollectorSource {} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PullSource.java similarity index 92% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorSource.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PullSource.java index 2e483e1..e15a493 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PullSource.java @@ -21,4 +21,4 @@ import org.apache.iotdb.pipe.api.PipeSource; -public interface CollectorSource extends StoppablePlugin, PipeSource {} +public abstract class PullSource implements PipeSource {} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PushSource.java similarity index 82% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPushSource.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PushSource.java index 622ed52..e8eebf4 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/CollectorPushSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/PushSource.java @@ -19,14 +19,19 @@ package org.apache.iotdb.collector.plugin.api; +import org.apache.iotdb.pipe.api.PipeSource; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; -public abstract class CollectorPushSource implements CollectorSource { +public abstract class PushSource implements PipeSource { - protected final EventCollector collector; + protected EventCollector collector; - public CollectorPushSource(final EventCollector collector) { + public PushSource() { + this.collector = null; + } + + public final void setCollector(final EventCollector collector) { this.collector = collector; } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java deleted file mode 100644 index 8a97564..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/StoppablePlugin.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.plugin.api; - -public interface StoppablePlugin { - void stop() throws Exception; -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorProcessorRuntimeConfiguration.java similarity index 56% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorProcessorRuntimeConfiguration.java index 1da84bc..c135801 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/RuntimeConfig.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorProcessorRuntimeConfiguration.java @@ -17,47 +17,26 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.api; +package org.apache.iotdb.collector.plugin.api.customizer; -import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; -public class RuntimeConfig implements PipeExtractorRuntimeConfiguration { +public class CollectorProcessorRuntimeConfiguration implements PipeProcessorRuntimeConfiguration { - public static class RuntimeEnvironment implements PipeRuntimeEnvironment { + private final CollectorRuntimeEnvironment runtimeEnvironment; - public int getParallelism() { - return 0; - } - - public int getParallelismIndex() { - return 0; - } - - @Override - public String getPipeName() { - return ""; - } - - @Override - public long getCreationTime() { - return 0; - } + public CollectorProcessorRuntimeConfiguration( + final String pipeName, + final long creationTime, + final int parallelism, + final int instanceIndex) { + runtimeEnvironment = + new CollectorRuntimeEnvironment(pipeName, creationTime, parallelism, instanceIndex); } @Override public PipeRuntimeEnvironment getRuntimeEnvironment() { - return new PipeRuntimeEnvironment() { - - @Override - public String getPipeName() { - return ""; - } - - @Override - public long getCreationTime() { - return 0; - } - }; + return runtimeEnvironment; } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorRuntimeEnvironment.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorRuntimeEnvironment.java new file mode 100644 index 0000000..8ace81c --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorRuntimeEnvironment.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.api.customizer; + +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; + +public class CollectorRuntimeEnvironment implements PipeRuntimeEnvironment { + + private final String pipeName; + private final long creationTime; + private final int parallelism; + private final int instanceIndex; + + public CollectorRuntimeEnvironment( + final String pipeName, + final long creationTime, + final int parallelism, + final int instanceIndex) { + this.pipeName = pipeName; + this.creationTime = creationTime; + this.parallelism = parallelism; + this.instanceIndex = instanceIndex; + } + + @Override + public String getPipeName() { + return pipeName; + } + + @Override + public long getCreationTime() { + return creationTime; + } + + public int getParallelism() { + return parallelism; + } + + public int getInstanceIndex() { + return instanceIndex; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorSinkRuntimeConfiguration.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorSinkRuntimeConfiguration.java new file mode 100644 index 0000000..b709291 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorSinkRuntimeConfiguration.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.api.customizer; + +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration; + +public class CollectorSinkRuntimeConfiguration implements PipeSinkRuntimeConfiguration { + + private final CollectorRuntimeEnvironment runtimeEnvironment; + + public CollectorSinkRuntimeConfiguration( + final String pipeName, + final long creationTime, + final int parallelism, + final int instanceIndex) { + runtimeEnvironment = + new CollectorRuntimeEnvironment(pipeName, creationTime, parallelism, instanceIndex); + } + + @Override + public PipeRuntimeEnvironment getRuntimeEnvironment() { + return runtimeEnvironment; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorSourceRuntimeConfiguration.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorSourceRuntimeConfiguration.java new file mode 100644 index 0000000..93970f9 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorSourceRuntimeConfiguration.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.api.customizer; + +import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; + +public class CollectorSourceRuntimeConfiguration implements PipeSourceRuntimeConfiguration { + + private final CollectorRuntimeEnvironment runtimeEnvironment; + + public CollectorSourceRuntimeConfiguration( + final String pipeName, + final long creationTime, + final int parallelism, + final int instanceIndex) { + runtimeEnvironment = + new CollectorRuntimeEnvironment(pipeName, creationTime, parallelism, instanceIndex); + } + + @Override + public PipeRuntimeEnvironment getRuntimeEnvironment() { + return runtimeEnvironment; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskComponent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/event/CollectorEvent.java similarity index 73% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskComponent.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/event/CollectorEvent.java index 729bc6d..7fa251e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskComponent.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/event/CollectorEvent.java @@ -16,14 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.collector.runtime.task.def; -public interface TaskComponent { - void create() throws Exception; +package org.apache.iotdb.collector.plugin.api.event; - void start() throws Exception; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; - void stop(); +public abstract class CollectorEvent implements Event { - void drop(); + public abstract TabletInsertionEvent toTabletInsertionEvent(); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/event/SourceEvent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/event/DemoEvent.java similarity index 71% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/event/SourceEvent.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/event/DemoEvent.java index 0c097ce..451ee88 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/event/SourceEvent.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/event/DemoEvent.java @@ -17,29 +17,24 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.event; +package org.apache.iotdb.collector.plugin.api.event; import org.apache.iotdb.pipe.api.event.Event; -public class SourceEvent implements Event { - private String name; +public class DemoEvent implements Event { - public SourceEvent() {} + private final String value; - public SourceEvent(final String name) { - this.name = name; + public DemoEvent(final String value) { + this.value = value; } - public String getName() { - return name; - } - - public void setName(final String name) { - this.name = name; + public String getValue() { + return value; } @Override public String toString() { - return "SourceEvent [name=" + name + "]"; + return "DemoEvent [value = " + value + "]"; } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinPlugin.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java similarity index 57% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinPlugin.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java index 683a2c4..203570c 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/BuiltinPlugin.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/BuiltinPlugin.java @@ -17,31 +17,26 @@ * under the License. */ -package org.apache.iotdb.collector.plugin; +package org.apache.iotdb.collector.plugin.builtin; -import org.apache.iotdb.collector.plugin.processor.DoNothingProcessor; -import org.apache.iotdb.collector.plugin.sink.SessionSink; -import org.apache.iotdb.collector.plugin.source.HttpPullSource; -import org.apache.iotdb.collector.plugin.source.HttpPushSource; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor; +import org.apache.iotdb.collector.plugin.builtin.sink.DemoSink; +import org.apache.iotdb.collector.plugin.builtin.source.HttpPullSource; +import org.apache.iotdb.collector.plugin.builtin.source.HttpPushSource; public enum BuiltinPlugin { - // PushSources + // Push Sources HTTP_PUSH_SOURCE("http-push-source", HttpPushSource.class), - // PullSources + // Pull Sources HTTP_PULL_SOURCE("http-pull-source", HttpPullSource.class), // Processors DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class), // Sinks - IOTDB_SESSION_SINK("iotdb-session-sink", SessionSink.class); + IOTDB_THRIFT_SINK("iotdb-thrift-sink", DemoSink.class); private final String collectorPluginName; private final Class collectorPluginClass; @@ -53,28 +48,15 @@ public enum BuiltinPlugin { this.className = collectorPluginClass.getName(); } - public String getCollectorPluginName() { + public String getPluginName() { return collectorPluginName; } - public Class getCollectorPluginClass() { + public Class getPluginClass() { return collectorPluginClass; } public String getClassName() { return className; } - - public static final Set SHOW_COLLECTOR_PLUGINS_BLACKLIST = - Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList( - // PushSources - HTTP_PUSH_SOURCE.getCollectorPluginName().toUpperCase(), - // PullSources - HTTP_PULL_SOURCE.getCollectorPluginName().toUpperCase(), - // Processors - DO_NOTHING_PROCESSOR.getCollectorPluginName().toUpperCase(), - // Sinks - IOTDB_SESSION_SINK.getCollectorPluginName().toUpperCase()))); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/processor/DoNothingProcessor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java similarity index 63% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/processor/DoNothingProcessor.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java index 2bf2ee9..163bf0a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/processor/DoNothingProcessor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/processor/DoNothingProcessor.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.processor; +package org.apache.iotdb.collector.plugin.builtin.processor; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; @@ -26,42 +26,42 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; public class DoNothingProcessor implements PipeProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class); - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - // Do Nothing + public void validate(PipeParameterValidator validator) { + // do nothing } @Override public void customize( - final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration) - throws Exception { - // Do Nothing + PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) { + // do nothing } @Override - public void process( - final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector) - throws Exception { - LOGGER.info("DoNothingProcessor process tabletInsertionEvent: {}", tabletInsertionEvent); + public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) + throws IOException { eventCollector.collect(tabletInsertionEvent); } @Override - public void process(final Event event, final EventCollector eventCollector) throws Exception { - LOGGER.info("DoNothingProcessor process event: {}", event); + public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) + throws IOException { + eventCollector.collect(tsFileInsertionEvent); + } + + @Override + public void process(Event event, EventCollector eventCollector) throws IOException { eventCollector.collect(event); } @Override - public void close() throws Exception { - // Do Nothing + public void close() { + // do nothing } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/sink/SessionSink.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/DemoSink.java similarity index 56% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/sink/SessionSink.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/DemoSink.java index 64966db..8373b86 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/sink/SessionSink.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/DemoSink.java @@ -17,9 +17,8 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.sink; +package org.apache.iotdb.collector.plugin.builtin.sink; -import org.apache.iotdb.collector.plugin.event.SourceEvent; import org.apache.iotdb.pipe.api.PipeSink; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration; @@ -27,44 +26,60 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SessionSink implements PipeSink { +public class DemoSink implements PipeSink { - private static final Logger LOGGER = LoggerFactory.getLogger(SessionSink.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DemoSink.class); @Override - public void validate(final PipeParameterValidator validator) throws Exception {} + public void validate(final PipeParameterValidator validator) { + LOGGER.info("DemoSink validate successfully"); + } + // TODO: SHIT.. @Override - public void customize( - final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) - throws Exception {} + public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) + throws Exception { + LOGGER.info("DemoSink customize successfully"); + } @Override public void customize( - final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration) - throws Exception {} + final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration) { + LOGGER.info("DemoSink customize successfully"); + } + + @Override + public void handshake() { + LOGGER.info("DemoSink handshake successfully"); + } @Override - public void handshake() throws Exception { - LOGGER.info("SessionSink handshake successfully"); + public void heartbeat() { + LOGGER.info("DemoSink heartbeat successfully"); } @Override - public void heartbeat() throws Exception {} + public void transfer(final TabletInsertionEvent tabletInsertionEvent) { + LOGGER.info("DemoSink transfer TabletInsertionEvent successfully"); + } @Override - public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {} + public void transfer(TsFileInsertionEvent tsFileInsertionEvent) { + LOGGER.info("DemoSink transfer TsFileInsertionEvent successfully"); + } @Override - public void transfer(final Event event) throws Exception { - final SourceEvent sourceEvent = (SourceEvent) event; - LOGGER.info("SessionSink transfer successfully {}", sourceEvent); + public void transfer(final Event event) { + LOGGER.info("DemoSink transfer successfully {}", event); } @Override - public void close() throws Exception {} + public void close() throws Exception { + LOGGER.info("DemoSink close successfully"); + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPullSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java similarity index 80% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPullSource.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java index 83881a3..c8bfd93 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPullSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java @@ -17,10 +17,10 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.source; +package org.apache.iotdb.collector.plugin.builtin.source; -import org.apache.iotdb.collector.plugin.api.CollectorPullSource; -import org.apache.iotdb.collector.plugin.event.SourceEvent; +import org.apache.iotdb.collector.plugin.api.PullSource; +import org.apache.iotdb.collector.plugin.api.event.DemoEvent; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; -public class HttpPullSource extends CollectorPullSource { +public class HttpPullSource extends PullSource { private static final Logger LOGGER = LoggerFactory.getLogger(HttpPullSource.class); @@ -56,15 +56,12 @@ public void start() throws Exception {} @Override public Event supply() { - final Event event = new SourceEvent(String.valueOf(new Random().nextInt(1000))); - LOGGER.info("event: {} created success", event); - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)); + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(100)); + final Event event = new DemoEvent(String.valueOf(new Random().nextInt(1000))); + LOGGER.info("{} created successfully ...", event); return event; } - @Override - public void stop() throws Exception {} - @Override public void close() {} } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java similarity index 80% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPushSource.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java index 96d03d0..233335c 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/source/HttpPushSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java @@ -17,11 +17,10 @@ * under the License. */ -package org.apache.iotdb.collector.plugin.source; +package org.apache.iotdb.collector.plugin.builtin.source; -import org.apache.iotdb.collector.plugin.api.CollectorPushSource; -import org.apache.iotdb.collector.plugin.event.SourceEvent; -import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.collector.plugin.api.PushSource; +import org.apache.iotdb.collector.plugin.api.event.DemoEvent; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.configuration.PipeSourceRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -34,17 +33,13 @@ import java.util.Random; import java.util.concurrent.TimeUnit; -public class HttpPushSource extends CollectorPushSource { +public class HttpPushSource extends PushSource { private static final Logger LOGGER = LoggerFactory.getLogger(HttpPushSource.class); private volatile boolean isStarted = true; private Thread workerThread; - public HttpPushSource(final EventCollector collector) { - super(collector); - } - @Override public void validate(final PipeParameterValidator validator) {} @@ -68,10 +63,10 @@ public void start() { private void doWork() { try { while (isStarted && !Thread.currentThread().isInterrupted()) { - final Event event = new SourceEvent(String.valueOf(new Random().nextInt(1000))); - LOGGER.info("event: {} created success", event); + final Event event = new DemoEvent(String.valueOf(new Random().nextInt(1000))); + LOGGER.info("{} created successfully ...", event); supply(event); - TimeUnit.SECONDS.sleep(2); + TimeUnit.SECONDS.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -80,11 +75,6 @@ private void doWork() { } } - @Override - public void stop() throws Exception { - this.isStarted = false; - } - @Override public void close() { isStarted = false; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginFactory.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginFactory.java deleted file mode 100644 index 89bdca4..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.plugin; - -import org.apache.iotdb.collector.plugin.BuiltinPlugin; -import org.apache.iotdb.collector.plugin.processor.DoNothingProcessor; -import org.apache.iotdb.collector.plugin.sink.SessionSink; -import org.apache.iotdb.collector.plugin.source.HttpPullSource; -import org.apache.iotdb.pipe.api.PipePlugin; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; - -public class PluginFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(PluginFactory.class); - - protected final Map> pluginConstructors = new HashMap<>(); - - public PluginFactory() { - initFactory(); - } - - private void initFactory() { - pluginConstructors.put( - BuiltinPlugin.HTTP_PULL_SOURCE.getCollectorPluginName(), HttpPullSource::new); - pluginConstructors.put( - BuiltinPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName(), DoNothingProcessor::new); - pluginConstructors.put( - BuiltinPlugin.IOTDB_SESSION_SINK.getCollectorPluginName(), SessionSink::new); - LOGGER.info("builtin plugin has been initialized"); - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java index 52f49e1..adbad7d 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java @@ -19,7 +19,52 @@ package org.apache.iotdb.collector.runtime.plugin; -public class PluginRuntime { +import org.apache.iotdb.collector.runtime.plugin.constructor.ProcessorConstructor; +import org.apache.iotdb.collector.runtime.plugin.constructor.SinkConstructor; +import org.apache.iotdb.collector.runtime.plugin.constructor.SourceConstructor; +import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.PipeSink; +import org.apache.iotdb.pipe.api.PipeSource; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +public class PluginRuntime implements AutoCloseable { + + private final PluginMetaKeeper metaKeeper; + private final SourceConstructor sourceConstructor; + private final ProcessorConstructor processorConstructor; + private final SinkConstructor sinkConstructor; + + public PluginRuntime() { + this.metaKeeper = new PluginMetaKeeper(); + this.sourceConstructor = new SourceConstructor(metaKeeper); + this.processorConstructor = new ProcessorConstructor(metaKeeper); + this.sinkConstructor = new SinkConstructor(metaKeeper); + } + + public PipeSource constructSource(final PipeParameters sourceParameters) { + return sourceConstructor.reflectPlugin(sourceParameters); + } + + public boolean isPullSource(final PipeParameters sourceParameters) throws Exception { + try (final PipeSource source = constructSource(sourceParameters)) { + return sourceConstructor.isPullSource(source); + } + } + + public boolean isPushSource(final PipeParameters sourceParameters) throws Exception { + try (final PipeSource source = constructSource(sourceParameters)) { + return sourceConstructor.isPushSource(source); + } + } + + public PipeProcessor constructProcessor(final PipeParameters processorParameters) { + return processorConstructor.reflectPlugin(processorParameters); + } + + public PipeSink constructSink(final PipeParameters sinkParameters) { + return sinkConstructor.reflectPlugin(sinkParameters); + } public boolean createPlugin() { return true; @@ -40,4 +85,7 @@ public boolean stopPlugin() { public boolean dropPlugin() { return true; } + + @Override + public void close() throws Exception {} } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java new file mode 100644 index 0000000..c64ef39 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.constructor; + +import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta; +import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; +import org.apache.iotdb.pipe.api.PipePlugin; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +public abstract class PluginConstructor { + + private static final Logger LOGGER = LoggerFactory.getLogger(PluginConstructor.class); + + private final PluginMetaKeeper pluginMetaKeeper; + + protected final Map> pluginConstructors = new HashMap<>(); + + protected PluginConstructor(PluginMetaKeeper pluginMetaKeeper) { + this.pluginMetaKeeper = pluginMetaKeeper; + initConstructors(); + } + + // New plugins shall be put here + protected abstract void initConstructors(); + + public abstract PipePlugin reflectPlugin(PipeParameters pipeParameters); + + public PipePlugin reflectPluginByKey(String pluginKey) { + return pluginConstructors.getOrDefault(pluginKey, () -> reflect(pluginKey)).get(); + } + + private PipePlugin reflect(String pluginName) { + if (pluginMetaKeeper == null) { + throw new PipeException( + "Failed to reflect PipePlugin instance, because PluginMetaKeeper is null."); + } + + if (pluginName == null) { + throw new PipeException( + "Failed to reflect PipePlugin instance, because plugin name is null."); + } + + final PluginMeta information = pluginMetaKeeper.getPipePluginMeta(pluginName); + if (information == null) { + String errorMessage = + String.format( + "Failed to reflect PipePlugin instance, because " + + "PipePlugin %s has not been registered.", + pluginName.toUpperCase()); + LOGGER.warn(errorMessage); + throw new PipeException(errorMessage); + } + + try { + final Class pluginClass = + information.isBuiltin() + ? pluginMetaKeeper.getBuiltinPluginClass(information.getPluginName()) + : Class.forName(information.getClassName()); // TODO + return (PipePlugin) pluginClass.getDeclaredConstructor().newInstance(); + } catch (InstantiationException + | InvocationTargetException + | NoSuchMethodException + | IllegalAccessException + | ClassNotFoundException e) { + String errorMessage = + String.format( + "Failed to reflect PipePlugin %s(%s) instance, because %s", + pluginName, information.getClassName(), e); + LOGGER.warn(errorMessage, e); + throw new PipeException(errorMessage); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/ProcessorConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/ProcessorConstructor.java new file mode 100644 index 0000000..43620b3 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/ProcessorConstructor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.constructor; + +import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin; +import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor; +import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +public class ProcessorConstructor extends PluginConstructor { + + public ProcessorConstructor(PluginMetaKeeper pluginMetaKeeper) { + super(pluginMetaKeeper); + } + + @Override + protected void initConstructors() { + pluginConstructors.put( + BuiltinPlugin.DO_NOTHING_PROCESSOR.getPluginName(), DoNothingProcessor::new); + } + + @Override + public final PipeProcessor reflectPlugin(PipeParameters processorParameters) { + return (PipeProcessor) + reflectPluginByKey( + processorParameters + .getStringOrDefault("processor", BuiltinPlugin.DO_NOTHING_PROCESSOR.getPluginName()) + .toLowerCase()); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java new file mode 100644 index 0000000..066cf08 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.constructor; + +import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin; +import org.apache.iotdb.collector.plugin.builtin.sink.DemoSink; +import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; +import org.apache.iotdb.pipe.api.PipeSink; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +public class SinkConstructor extends PluginConstructor { + + public SinkConstructor(PluginMetaKeeper pluginMetaKeeper) { + super(pluginMetaKeeper); + } + + @Override + protected void initConstructors() { + pluginConstructors.put(BuiltinPlugin.IOTDB_THRIFT_SINK.getPluginName(), DemoSink::new); + } + + @Override + public final PipeSink reflectPlugin(PipeParameters sinkParameters) { + if (sinkParameters.hasAttribute("sink")) { + throw new IllegalArgumentException("sink attribute is required"); + } + + return (PipeSink) + reflectPluginByKey( + sinkParameters + .getStringOrDefault("sink", BuiltinPlugin.IOTDB_THRIFT_SINK.getPluginName()) + .toLowerCase()); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java new file mode 100644 index 0000000..f4c2cd0 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.constructor; + +import org.apache.iotdb.collector.plugin.api.PullSource; +import org.apache.iotdb.collector.plugin.api.PushSource; +import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin; +import org.apache.iotdb.collector.plugin.builtin.source.HttpPullSource; +import org.apache.iotdb.collector.plugin.builtin.source.HttpPushSource; +import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; +import org.apache.iotdb.pipe.api.PipeSource; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +public class SourceConstructor extends PluginConstructor { + + public SourceConstructor(PluginMetaKeeper pluginMetaKeeper) { + super(pluginMetaKeeper); + } + + @Override + protected void initConstructors() { + pluginConstructors.put(BuiltinPlugin.HTTP_PULL_SOURCE.getPluginName(), HttpPullSource::new); + pluginConstructors.put(BuiltinPlugin.HTTP_PUSH_SOURCE.getPluginName(), HttpPushSource::new); + } + + @Override + public final PipeSource reflectPlugin(PipeParameters sourceParameters) { + if (sourceParameters.hasAttribute("source")) { + throw new IllegalArgumentException("source attribute is required"); + } + + return (PipeSource) reflectPluginByKey(sourceParameters.getString("source").toLowerCase()); + } + + public boolean isPullSource(PipeSource source) { + return source instanceof PullSource; + } + + public boolean isPushSource(PipeSource source) { + return source instanceof PushSource; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMeta.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMeta.java new file mode 100644 index 0000000..2c9b6cc --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMeta.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.meta; + +import java.util.Objects; + +public class PluginMeta { + + private final String pluginName; + private final String className; + + // jarName and jarMD5 are used to identify the jar file. + // they could be null if the plugin is built-in. they should be both null or both not null. + private final boolean isBuiltin; + private final String jarName; + private final String jarMD5; + + public PluginMeta( + String pluginName, String className, boolean isBuiltin, String jarName, String jarMD5) { + this.pluginName = Objects.requireNonNull(pluginName).toUpperCase(); + this.className = Objects.requireNonNull(className); + + this.isBuiltin = isBuiltin; + if (isBuiltin) { + this.jarName = jarName; + this.jarMD5 = jarMD5; + } else { + this.jarName = Objects.requireNonNull(jarName); + this.jarMD5 = Objects.requireNonNull(jarMD5); + } + } + + public PluginMeta(String pluginName, String className) { + this.pluginName = Objects.requireNonNull(pluginName).toUpperCase(); + this.className = Objects.requireNonNull(className); + + this.isBuiltin = true; + this.jarName = null; + this.jarMD5 = null; + } + + public boolean isBuiltin() { + return isBuiltin; + } + + public String getPluginName() { + return pluginName; + } + + public String getClassName() { + return className; + } + + public String getJarName() { + return jarName; + } + + public String getJarMD5() { + return jarMD5; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + PluginMeta that = (PluginMeta) obj; + return pluginName.equals(that.pluginName) + && className.equals(that.className) + && isBuiltin == that.isBuiltin + && Objects.equals(jarName, that.jarName) + && Objects.equals(jarMD5, that.jarMD5); + } + + @Override + public int hashCode() { + return pluginName.hashCode(); + } + + @Override + public String toString() { + return "PluginMeta{" + + "pluginName='" + + pluginName + + '\'' + + ", className='" + + className + + '\'' + + ", isBuiltin=" + + isBuiltin + + ", jarName='" + + jarName + + '\'' + + ", jarMD5='" + + jarMD5 + + '\'' + + '}'; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java new file mode 100644 index 0000000..4d9ddfd --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.meta; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +public class PluginMetaKeeper { + + protected final Map pipePluginNameToMetaMap = new ConcurrentHashMap<>(); + protected final Map> builtinPipePluginNameToClassMap = new ConcurrentHashMap<>(); + + public PluginMetaKeeper() { + loadBuiltinPlugins(); + } + + private void loadBuiltinPlugins() { + // for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) { + // final String pipePluginName = builtinPipePlugin.getPipePluginName(); + // final Class pipePluginClass = builtinPipePlugin.getPipePluginClass(); + // final String className = builtinPipePlugin.getClassName(); + // + // addPipePluginMeta(pipePluginName, new PluginMeta(pipePluginName, className)); + // addBuiltinPluginClass(pipePluginName, pipePluginClass); + // addPipePluginVisibility( + // pipePluginName, VisibilityUtils.calculateFromPluginClass(pipePluginClass)); + // } + } + + public void addPipePluginMeta(String pluginName, PluginMeta pluginMeta) { + pipePluginNameToMetaMap.put(pluginName.toUpperCase(), pluginMeta); + } + + public void removePipePluginMeta(String pluginName) { + pipePluginNameToMetaMap.remove(pluginName.toUpperCase()); + } + + public PluginMeta getPipePluginMeta(String pluginName) { + return pipePluginNameToMetaMap.get(pluginName.toUpperCase()); + } + + public Iterable getAllPipePluginMeta() { + return pipePluginNameToMetaMap.values(); + } + + public boolean containsPipePlugin(String pluginName) { + return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase()); + } + + private void addBuiltinPluginClass(String pluginName, Class builtinPipePluginClass) { + builtinPipePluginNameToClassMap.put(pluginName.toUpperCase(), builtinPipePluginClass); + } + + public Class getBuiltinPluginClass(String pluginName) { + return builtinPipePluginNameToClassMap.get(pluginName.toUpperCase()); + } + + public String getPluginNameByJarName(String jarName) { + for (Map.Entry entry : pipePluginNameToMetaMap.entrySet()) { + if (jarName.equals(entry.getValue().getJarName())) { + return entry.getKey(); + } + } + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PluginMetaKeeper that = (PluginMetaKeeper) o; + return pipePluginNameToMetaMap.equals(that.pipePluginNameToMetaMap); + } + + @Override + public int hashCode() { + return Objects.hash(pipePluginNameToMetaMap); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java new file mode 100644 index 0000000..ac270ce --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task; + +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +public abstract class Task { + + protected final String taskId; + protected final PipeParameters parameters; + + protected final int parallelism; + + private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L; + protected final AtomicBoolean isRunning = new AtomicBoolean(false); + protected final AtomicBoolean isDropped = new AtomicBoolean(false); + + protected Task( + final String taskId, + final Map attributes, + final String parallelismKey, + final int parallelismValue) { + this.taskId = taskId; + this.parameters = new PipeParameters(attributes); + + this.parallelism = parameters.getIntOrDefault(parallelismKey, parallelismValue); + } + + public void resume() { + isRunning.set(true); + } + + public void pause() { + isRunning.set(false); + } + + protected void waitUntilRunningOrDropped() { + while (!isRunning.get() && !isDropped.get()) { + LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS); + } + } + + public final synchronized void create() throws Exception { + resume(); + createInternal(); + } + + public abstract void createInternal() throws Exception; + + public final synchronized void start() throws Exception { + resume(); + startInternal(); + } + + public abstract void startInternal() throws Exception; + + public final synchronized void stop() throws Exception { + pause(); + stopInternal(); + } + + public abstract void stopInternal() throws Exception; + + public final synchronized void drop() throws Exception { + pause(); + isDropped.set(true); + dropInternal(); + } + + public abstract void dropInternal() throws Exception; +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskCombiner.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskCombiner.java new file mode 100644 index 0000000..c835bc9 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskCombiner.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskCombiner { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskCombiner.class); + + private final Task source; + private final Task processor; + private final Task sink; + + public TaskCombiner(final Task source, final Task processor, final Task sink) { + this.source = source; + this.processor = processor; + this.sink = sink; + } + + public void create() throws Exception { + sink.create(); + processor.create(); + source.create(); + } + + public void start() throws Exception { + sink.start(); + processor.start(); + source.start(); + } + + public void stop() throws Exception { + source.stop(); + processor.stop(); + sink.stop(); + } + + public void drop() throws Exception { + stop(); + + source.drop(); + processor.drop(); + sink.drop(); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java index 3754f5f..ec3a4f1 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java @@ -19,11 +19,9 @@ package org.apache.iotdb.collector.runtime.task; -import org.apache.iotdb.collector.runtime.task.def.ProcessorTask; -import org.apache.iotdb.collector.runtime.task.def.PushSourceTask; -import org.apache.iotdb.collector.runtime.task.def.SinkTask; -import org.apache.iotdb.collector.runtime.task.def.SourceTask; -import org.apache.iotdb.collector.runtime.task.def.TaskRepository; +import org.apache.iotdb.collector.runtime.task.processor.ProcessorTask; +import org.apache.iotdb.collector.runtime.task.sink.SinkTask; +import org.apache.iotdb.collector.runtime.task.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,106 +31,119 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class TaskRuntime { +public class TaskRuntime implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TaskRuntime.class); - private static final Map TASK_REPOSITORY_MAP = new ConcurrentHashMap<>(); + private final Map tasks = new ConcurrentHashMap<>(); - public Response createTask( + public synchronized Response createTask( final String taskId, final Map sourceAttribute, final Map processorAttribute, final Map sinkAttribute) { try { - if (validateTaskIsExist(taskId)) { + if (tasks.containsKey(taskId)) { return Response.status(Response.Status.CONFLICT) .entity(String.format("task %s has existed", taskId)) .build(); } - final SinkTask sinkTask = new SinkTask(sinkAttribute); - final ProcessorTask processorTask = new ProcessorTask(processorAttribute, sinkTask); - final SourceTask sourceTask = new PushSourceTask(taskId, sourceAttribute, processorTask); - final TaskRepository taskRepository = new TaskRepository(sourceTask, processorTask, sinkTask); + final SinkTask sinkTask = new SinkTask(taskId, sinkAttribute); + final ProcessorTask processorTask = + new ProcessorTask(taskId, processorAttribute, sinkTask.makeProducer()); + final SourceTask sourceTask = + SourceTask.construct(taskId, sourceAttribute, processorTask.makeProducer()); - TASK_REPOSITORY_MAP.put(taskId, taskRepository); - taskRepository.create(); + final TaskCombiner taskCombiner = new TaskCombiner(sourceTask, processorTask, sinkTask); + tasks.put(taskId, taskCombiner); + taskCombiner.create(); LOGGER.info("Successfully created task {}", taskId); return Response.status(Response.Status.CREATED) .entity(String.format("Successfully created task %s", taskId)) .build(); } catch (final Exception e) { - LOGGER.warn("Failed to create task", e); + LOGGER.warn("Failed to create task {} because {}", taskId, e.getMessage(), e); return Response.serverError() .entity(String.format("Failed to create task %s, because %s", taskId, e.getMessage())) .build(); } } - public boolean alterTask() { - return true; - } - - public Response startTask(final String taskId) { - if (!validateTaskIsExist(taskId)) { + public synchronized Response startTask(final String taskId) { + if (!tasks.containsKey(taskId)) { return Response.status(Response.Status.NOT_FOUND) .entity(String.format("task %s not found", taskId)) .build(); } - TASK_REPOSITORY_MAP.get(taskId).start(); - LOGGER.info("Task {} started successfully", taskId); - return Response.status(Response.Status.OK) - .entity(String.format("task %s start successfully", taskId)) - .build(); + try { + tasks.get(taskId).start(); + + LOGGER.info("Task {} start successfully", taskId); + return Response.status(Response.Status.OK) + .entity(String.format("task %s start successfully", taskId)) + .build(); + } catch (Exception e) { + LOGGER.warn("Failed to start task {} because {}", taskId, e.getMessage(), e); + return Response.serverError() + .entity(String.format("Failed to start task %s, because %s", taskId, e.getMessage())) + .build(); + } } - public Response stopTask(final String taskId) { - if (!validateTaskIsExist(taskId)) { + public synchronized Response stopTask(final String taskId) { + if (!tasks.containsKey(taskId)) { return Response.status(Response.Status.NOT_FOUND) .entity(String.format("task %s not found", taskId)) .build(); } try { - final TaskRepository taskRepository = TASK_REPOSITORY_MAP.get(taskId); - if (taskRepository != null) { - taskRepository.stop(); - } + tasks.get(taskId).stop(); + + LOGGER.info("Task {} stop successfully", taskId); + return Response.status(Response.Status.OK) + .entity(String.format("task %s stop successfully", taskId)) + .build(); } catch (final Exception e) { - LOGGER.warn("Failed to stop task", e); + LOGGER.warn("Failed to stop task {} because {}", taskId, e.getMessage(), e); return Response.serverError() .entity(String.format("Failed to stop task %s, because %s", taskId, e.getMessage())) .build(); } - - LOGGER.info("Task {} stopped successfully", taskId); - return Response.status(Response.Status.OK) - .entity(String.format("task %s stop successfully", taskId)) - .build(); } - public Response dropTask(final String taskId) { - if (!validateTaskIsExist(taskId)) { + public synchronized Response dropTask(final String taskId) { + if (!tasks.containsKey(taskId)) { return Response.status(Response.Status.NOT_FOUND) .entity(String.format("task %s not found", taskId)) .build(); } - TASK_REPOSITORY_MAP.remove(taskId).drop(); - LOGGER.info("Task {} dropped successfully", taskId); - return Response.status(Response.Status.OK) - .entity(String.format("task %s drop successfully", taskId)) - .build(); + try { + tasks.remove(taskId).drop(); + + LOGGER.info("Task {} drop successfully", taskId); + return Response.status(Response.Status.OK) + .entity(String.format("task %s drop successfully", taskId)) + .build(); + } catch (final Exception e) { + LOGGER.warn("Failed to drop task {} because {}", taskId, e.getMessage(), e); + return Response.serverError() + .entity(String.format("Failed to drop task %s, because %s", taskId, e.getMessage())) + .build(); + } } - private boolean validateTaskIsExist(final String taskId) { - if (TASK_REPOSITORY_MAP.containsKey(taskId)) { - return true; + @Override + public synchronized void close() throws Exception { + final long currentTime = System.currentTimeMillis(); + for (final TaskCombiner taskCombiner : tasks.values()) { + taskCombiner.drop(); } - LOGGER.warn("Task {} not found", taskId); - return false; + tasks.clear(); + LOGGER.info("Task runtime closed in {}ms", System.currentTimeMillis() - currentTime); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumer.java deleted file mode 100644 index 7f94487..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.datastructure; - -import org.apache.iotdb.pipe.api.PipePlugin; -import org.apache.iotdb.pipe.api.PipeProcessor; -import org.apache.iotdb.pipe.api.PipeSink; - -import com.lmax.disruptor.WorkHandler; - -public class TaskEventConsumer implements WorkHandler { - - private final PipePlugin plugin; - private final TaskEventCollector collector; - private final TaskEventConsumerController consumerController; - - public TaskEventConsumer( - final PipePlugin plugin, - final TaskEventCollector collector, - final TaskEventConsumerController consumerController) { - this.plugin = plugin; - this.collector = collector; - this.consumerController = consumerController; - } - - public TaskEventConsumer( - final PipePlugin plugin, final TaskEventConsumerController consumerController) { - this(plugin, null, consumerController); - } - - @Override - public void onEvent(final TaskEventContainer taskEventContainer) throws Exception { - if (!consumerController.shouldRun()) { - return; - } - if (plugin instanceof PipeProcessor) { - ((PipeProcessor) plugin).process(taskEventContainer.getEvent(), this.collector); - } else if (plugin instanceof PipeSink) { - ((PipeSink) plugin).transfer(taskEventContainer.getEvent()); - } - } - - public TaskEventConsumerController getConsumerController() { - return consumerController; - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/ProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/ProcessorTask.java deleted file mode 100644 index c498280..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/ProcessorTask.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.def; - -import org.apache.iotdb.collector.config.TaskRuntimeOptions; -import org.apache.iotdb.collector.plugin.processor.DoNothingProcessor; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventCollector; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumer; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventContainer; -import org.apache.iotdb.collector.runtime.task.exception.DisruptorTaskExceptionHandler; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; - -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import com.lmax.disruptor.util.DaemonThreadFactory; - -import java.util.Map; -import java.util.Optional; - -public class ProcessorTask extends Task implements TaskComponent { - - private Disruptor processorDisruptor; - private TaskEventConsumer[] eventConsumers; - - private final PipeParameters parameters; - private final int processParallelismNum; - private final TaskEventCollector collector; - - public ProcessorTask(final Map processorAttributes, final SinkTask sinkTask) { - this.parameters = new PipeParameters(processorAttributes); - this.processParallelismNum = - this.parameters.getIntOrDefault( - TaskRuntimeOptions.TASK_PROCESS_PARALLELISM_NUM.key(), - TaskRuntimeOptions.TASK_PROCESS_PARALLELISM_NUM.value()); - this.collector = new TaskEventCollector(sinkTask.getSinkRingBuffer()); - - this.initProcessorDisruptor(); - } - - @Override - public void create() { - if (this.processorDisruptor == null) { - this.initProcessorDisruptor(); - } - - this.eventConsumers = - this.getConsumer( - this.createInstance(DoNothingProcessor.class), processParallelismNum, collector); - - this.processorDisruptor.setDefaultExceptionHandler(new DisruptorTaskExceptionHandler()); - this.processorDisruptor.handleEventsWithWorkerPool(this.eventConsumers); - this.processorDisruptor.start(); - } - - private void initProcessorDisruptor() { - this.processorDisruptor = - new Disruptor<>( - TaskEventContainer::new, - this.parameters.getIntOrDefault( - TaskRuntimeOptions.TASK_PROCESSOR_RING_BUFFER_SIZE.key(), - TaskRuntimeOptions.TASK_PROCESSOR_RING_BUFFER_SIZE.value()), - DaemonThreadFactory.INSTANCE, - ProducerType.MULTI, - new BlockingWaitStrategy()); - } - - @Override - public void start() throws Exception { - for (final TaskEventConsumer consumer : this.eventConsumers) { - consumer.getConsumerController().resume(); - } - } - - @Override - public void stop() { - for (final TaskEventConsumer consumer : this.eventConsumers) { - consumer.getConsumerController().pause(); - } - } - - @Override - public void drop() { - if (this.processorDisruptor != null) { - this.processorDisruptor.shutdown(); - this.processorDisruptor = null; - } - } - - public Optional> getProcessorRingBuffer() { - if (this.processorDisruptor != null) { - return Optional.of(this.processorDisruptor.getRingBuffer()); - } - return Optional.empty(); - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PullSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PullSourceTask.java deleted file mode 100644 index 0e0c19c..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PullSourceTask.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.def; - -import org.apache.iotdb.collector.plugin.source.HttpPullSource; -import org.apache.iotdb.pipe.api.event.Event; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; - -public class PullSourceTask extends SourceTask { - - private static final Logger LOGGER = LoggerFactory.getLogger(PullSourceTask.class); - - public PullSourceTask( - final String taskId, - final Map sourceParams, - final ProcessorTask processorTask) { - super(taskId, sourceParams, processorTask); - } - - @Override - public void create() { - createSourceTask(); - for (int i = 0; i < sourceParallelismNum; i++) { - // use sourceAttribute later - try (final HttpPullSource source = createInstance(HttpPullSource.class)) { - addSourceTask(source); - SOURCE_EXECUTOR_SERVICE - .get(taskId) - .submit( - () -> { - try { - source.start(); - while (SOURCE_TASK_STATUS.containsKey(taskId)) { - if (SOURCE_TASK_STATUS.get(taskId) == TaskState.Stopped) { - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); - continue; - } - - final Event event = source.supply(); - processorTask - .getProcessorRingBuffer() - .ifPresent( - ringBuffer -> - ringBuffer.publishEvent( - ((container, sequence, o) -> container.setEvent(event)), - event)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } catch (final Exception e) { - LOGGER.warn("Pull source task failed", e); - } - } - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PushSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PushSourceTask.java deleted file mode 100644 index 176f671..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/PushSourceTask.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.def; - -import org.apache.iotdb.collector.plugin.source.HttpPushSource; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventCollector; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class PushSourceTask extends SourceTask { - - private static final Logger LOGGER = LoggerFactory.getLogger(PushSourceTask.class); - - private final TaskEventCollector collector; - - public PushSourceTask( - final String taskId, - final Map sourceParams, - final ProcessorTask processorTask) { - super(taskId, sourceParams, processorTask); - - this.collector = - new TaskEventCollector( - processorTask.getProcessorRingBuffer().isPresent() - ? processorTask.getProcessorRingBuffer().get() - : null); - } - - @Override - public void create() { - createSourceTask(); - for (int i = 0; i < sourceParallelismNum; i++) { - // use sourceAttribute later - try (final HttpPushSource source = new HttpPushSource(collector)) { - addSourceTask(source); - SOURCE_EXECUTOR_SERVICE - .get(taskId) - .submit( - () -> { - try { - source.start(); - } catch (final Exception e) { - LOGGER.warn("Failed to start push source", e); - throw new RuntimeException(e); - } - }); - } catch (Exception e) { - LOGGER.warn("failed to create instance of HttpSource", e); - } - } - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SinkTask.java deleted file mode 100644 index e04b230..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SinkTask.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.def; - -import org.apache.iotdb.collector.config.TaskRuntimeOptions; -import org.apache.iotdb.collector.plugin.sink.SessionSink; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumer; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventContainer; -import org.apache.iotdb.collector.runtime.task.exception.DisruptorTaskExceptionHandler; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; - -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import com.lmax.disruptor.util.DaemonThreadFactory; - -import java.util.Map; - -public class SinkTask extends Task implements TaskComponent { - - private Disruptor sinkDisruptor; - private TaskEventConsumer[] eventConsumers; - private final PipeParameters parameters; - private final int sinkParallelismNum; - - public SinkTask(final Map processorAttributes) { - this.parameters = new PipeParameters(processorAttributes); - this.sinkParallelismNum = - this.parameters.getIntOrDefault( - TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM.key(), - TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM.value()); - - this.initSinkDisruptor(); - } - - @Override - public void create() { - if (this.sinkDisruptor == null) { - this.initSinkDisruptor(); - } - - this.eventConsumers = - this.getConsumer(this.createInstance(SessionSink.class), sinkParallelismNum, null); - - this.sinkDisruptor.setDefaultExceptionHandler(new DisruptorTaskExceptionHandler()); - this.sinkDisruptor.handleEventsWithWorkerPool(this.eventConsumers); - this.sinkDisruptor.start(); - } - - private void initSinkDisruptor() { - this.sinkDisruptor = - new Disruptor<>( - TaskEventContainer::new, - this.parameters.getIntOrDefault( - TaskRuntimeOptions.TASK_SINK_RING_BUFFER_SIZE.key(), - TaskRuntimeOptions.TASK_SINK_RING_BUFFER_SIZE.value()), - DaemonThreadFactory.INSTANCE, - ProducerType.MULTI, - new BlockingWaitStrategy()); - } - - @Override - public void start() throws Exception { - for (final TaskEventConsumer consumer : this.eventConsumers) { - consumer.getConsumerController().resume(); - } - } - - @Override - public void stop() { - for (final TaskEventConsumer consumer : this.eventConsumers) { - consumer.getConsumerController().pause(); - } - } - - @Override - public void drop() { - if (this.sinkDisruptor != null) { - this.sinkDisruptor.shutdown(); - this.sinkDisruptor = null; - } - } - - public RingBuffer getSinkRingBuffer() { - return this.sinkDisruptor.getRingBuffer(); - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SourceTask.java deleted file mode 100644 index 2a822ca..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/SourceTask.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.def; - -import org.apache.iotdb.collector.config.TaskRuntimeOptions; -import org.apache.iotdb.commons.concurrent.IoTThreadFactory; -import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; -import org.apache.iotdb.pipe.api.PipeSource; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -public abstract class SourceTask extends Task implements TaskComponent { - - private static final Logger LOGGER = LoggerFactory.getLogger(SourceTask.class); - - // Source needs the processor's RingBuffer to publish event. - protected final ProcessorTask processorTask; - protected final int sourceParallelismNum; - protected final String taskId; - // Store the status of the tasks, Running or Stopped - protected static final Map SOURCE_TASK_STATUS = new ConcurrentHashMap<>(); - protected static final Map SOURCE_EXECUTOR_SERVICE = - new ConcurrentHashMap<>(); - // Source tasks list - protected static final Map> SOURCE_TASK = new ConcurrentHashMap<>(); - - public SourceTask( - final String taskId, - final Map sourceParams, - final ProcessorTask processorTask) { - this.taskId = taskId; - final PipeParameters params = new PipeParameters(sourceParams); - this.sourceParallelismNum = - params.getIntOrDefault( - TaskRuntimeOptions.TASK_SOURCE_PARALLELISM_NUM.key(), - TaskRuntimeOptions.TASK_SOURCE_PARALLELISM_NUM.value()); - this.processorTask = processorTask; - } - - @Override - public void start() throws Exception { - SOURCE_TASK_STATUS.computeIfPresent(taskId, (taskId, status) -> TaskState.Running); - SOURCE_TASK - .get(taskId) - .forEach( - source -> { - try { - source.start(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Override - public synchronized void stop() { - SOURCE_TASK_STATUS.computeIfPresent(taskId, (taskId, status) -> TaskState.Stopped); - SOURCE_TASK - .get(taskId) - .forEach( - source -> { - try { - source.close(); - } catch (Exception e) { - LOGGER.warn("Failed to close source", e); - } - }); - } - - @Override - public void drop() { - stop(); - SOURCE_TASK.remove(taskId); - SOURCE_TASK_STATUS.remove(taskId); - SOURCE_EXECUTOR_SERVICE.remove(taskId).shutdown(); - } - - protected void addSourceTask(final PipeSource source) { - if (!SOURCE_TASK.containsKey(taskId)) { - SOURCE_TASK.put(taskId, new CopyOnWriteArrayList<>()); - } - SOURCE_TASK.get(taskId).add(source); - } - - protected void createSourceTask() { - SOURCE_EXECUTOR_SERVICE.putIfAbsent( - taskId, - new WrappedThreadPoolExecutor( - sourceParallelismNum, - sourceParallelismNum, - 0L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new IoTThreadFactory("source-executor"), - "source-executor")); - SOURCE_TASK_STATUS.putIfAbsent(taskId, TaskState.Running); - } - - protected enum TaskState { - Running, - Stopped - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/Task.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/Task.java deleted file mode 100644 index 8468795..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/Task.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.def; - -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventCollector; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumer; -import org.apache.iotdb.collector.runtime.task.datastructure.TaskEventConsumerController; -import org.apache.iotdb.pipe.api.PipePlugin; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Objects; -import java.util.stream.Stream; - -public abstract class Task { - - private static final Logger LOGGER = LoggerFactory.getLogger(Task.class); - - protected TaskEventConsumer[] getConsumer( - final PipePlugin plugin, final int consumerNum, final TaskEventCollector collector) { - return Stream.generate(() -> createConsumer(plugin, collector)) - .limit(consumerNum) - .toArray(TaskEventConsumer[]::new); - } - - private TaskEventConsumer createConsumer( - final PipePlugin plugin, final TaskEventCollector collector) { - return Objects.nonNull(collector) - ? new TaskEventConsumer(plugin, collector, new TaskEventConsumerController()) - : new TaskEventConsumer(plugin, new TaskEventConsumerController()); - } - - protected T createInstance(final Class clazz) { - try { - final Constructor constructor = clazz.getDeclaredConstructor(); - constructor.setAccessible(true); - return constructor.newInstance(); - } catch (final NoSuchMethodException e) { - LOGGER.warn("class {} is abstract class.", clazz, e); - } catch (final IllegalAccessException e) { - LOGGER.warn("failed to visit class {} constructor method.", clazz, e); - } catch (final InstantiationException e) { - LOGGER.warn("failed to instantiate class {}.", clazz, e); - } catch (final InvocationTargetException e) { - LOGGER.warn("the constructor threw an exception.", e); - } - throw new RuntimeException(); - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskRepository.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskRepository.java deleted file mode 100644 index c5a1626..0000000 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/def/TaskRepository.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.collector.runtime.task.def; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TaskRepository { - - private static final Logger LOGGER = LoggerFactory.getLogger(TaskRepository.class); - - private final TaskComponent sourceComponent; - private final TaskComponent processorComponent; - private final TaskComponent sinkComponent; - - public TaskRepository( - final TaskComponent sourceComponent, - final TaskComponent processorComponent, - final TaskComponent sinkComponent) { - this.sourceComponent = sourceComponent; - this.processorComponent = processorComponent; - this.sinkComponent = sinkComponent; - } - - // Disruptor consumers must be started before producers - public void create() { - try { - sinkComponent.create(); - processorComponent.create(); - sourceComponent.create(); - } catch (final Exception e) { - LOGGER.warn("Failed to create task", e); - } - } - - // Disruptor consumers must be started before producers - public void start() { - try { - sinkComponent.start(); - processorComponent.start(); - sourceComponent.start(); - } catch (final Exception e) { - LOGGER.warn("Failed to start task", e); - } - } - - public void stop() { - try { - sourceComponent.stop(); - processorComponent.stop(); - sinkComponent.stop(); - } catch (final Exception e) { - LOGGER.warn("Failed to stop task", e); - } - } - - public void drop() { - try { - sourceComponent.drop(); - processorComponent.drop(); - sinkComponent.drop(); - } catch (final Exception e) { - LOGGER.warn("Failed to drop task", e); - } - } -} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventCollector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/EventCollector.java similarity index 67% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventCollector.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/EventCollector.java index 43886a5..a96b61a 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventCollector.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/EventCollector.java @@ -17,27 +17,25 @@ * under the License. */ -package org.apache.iotdb.collector.runtime.task.datastructure; +package org.apache.iotdb.collector.runtime.task.event; -import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.event.Event; import com.lmax.disruptor.RingBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class TaskEventCollector implements EventCollector { +import javax.annotation.concurrent.ThreadSafe; - private static final Logger LOGGER = LoggerFactory.getLogger(TaskEventCollector.class); - private final RingBuffer ringBuffer; +@ThreadSafe +public class EventCollector implements org.apache.iotdb.pipe.api.collector.EventCollector { - public TaskEventCollector(final RingBuffer ringBuffer) { + private final RingBuffer ringBuffer; + + public EventCollector(final RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } @Override public void collect(final Event event) { ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event); - LOGGER.info("successfully publish event {}", event); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventContainer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/EventContainer.java similarity index 87% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventContainer.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/EventContainer.java index 8ac42fd..99817e7 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventContainer.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/event/EventContainer.java @@ -17,14 +17,13 @@ * under the License. */ -package org.apache.iotdb.collector.runtime.task.datastructure; +package org.apache.iotdb.collector.runtime.task.event; import org.apache.iotdb.pipe.api.event.Event; -public class TaskEventContainer implements Event { - private Event event; +public class EventContainer implements Event { - public TaskEventContainer() {} + private Event event; public Event getEvent() { return event; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java new file mode 100644 index 0000000..c153506 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.processor; + +import org.apache.iotdb.collector.runtime.task.event.EventContainer; +import org.apache.iotdb.pipe.api.PipeProcessor; +import org.apache.iotdb.pipe.api.collector.EventCollector; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +import com.lmax.disruptor.WorkHandler; + +class ProcessorConsumer implements WorkHandler { + + private final PipeProcessor processor; + private final EventCollector eventCollector; + + ProcessorConsumer(final PipeProcessor processor, final EventCollector eventCollector) { + this.processor = processor; + this.eventCollector = eventCollector; + } + + PipeProcessor consumer() { + return processor; + } + + @Override + public void onEvent(final EventContainer eventContainer) throws Exception { + // TODO: retry strategy + final Event event = eventContainer.getEvent(); + if (event instanceof TabletInsertionEvent) { + processor.process((TabletInsertionEvent) event, eventCollector); + } else if (event instanceof TsFileInsertionEvent) { + processor.process((TsFileInsertionEvent) event, eventCollector); + } else if (event != null) { + processor.process(event, eventCollector); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorExceptionHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorExceptionHandler.java new file mode 100644 index 0000000..277cdb0 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorExceptionHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.processor; + +import org.apache.iotdb.collector.runtime.task.event.EventContainer; + +import com.lmax.disruptor.ExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ProcessorExceptionHandler implements ExceptionHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorExceptionHandler.class); + + @Override + public void handleEventException(Throwable ex, long sequence, EventContainer event) { + // TODO: retry strategy + LOGGER.warn("Failed to handle event", ex); + } + + @Override + public void handleOnStartException(Throwable ex) { + LOGGER.warn("Failed to start processor disruptor", ex); + } + + @Override + public void handleOnShutdownException(Throwable ex) { + LOGGER.warn("Failed to shutdown processor disruptor", ex); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java new file mode 100644 index 0000000..e671ea0 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.processor; + +import org.apache.iotdb.collector.plugin.api.customizer.CollectorProcessorRuntimeConfiguration; +import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.task.Task; +import org.apache.iotdb.collector.runtime.task.event.EventCollector; +import org.apache.iotdb.collector.runtime.task.event.EventContainer; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.commons.concurrent.IoTThreadFactory; +import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_PROCESSOR_RING_BUFFER_SIZE; +import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_PROCESS_PARALLELISM_NUM; + +public class ProcessorTask extends Task { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorTask.class); + + private static final Map REGISTERED_EXECUTOR_SERVICES = + new ConcurrentHashMap<>(); + + private final Disruptor disruptor; + private final EventCollector sinkProducer; + private ProcessorConsumer[] processorConsumers; + + public ProcessorTask( + final String taskId, + final Map attributes, + final EventCollector sinkProducer) { + super( + taskId, + attributes, + TASK_PROCESS_PARALLELISM_NUM.key(), + TASK_PROCESS_PARALLELISM_NUM.value()); + + REGISTERED_EXECUTOR_SERVICES.putIfAbsent( + taskId, + new WrappedThreadPoolExecutor( + parallelism, + parallelism, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(parallelism), + new IoTThreadFactory(taskId), // TODO: thread name + taskId)); + + disruptor = + new Disruptor<>( + EventContainer::new, + TASK_PROCESSOR_RING_BUFFER_SIZE.value(), + REGISTERED_EXECUTOR_SERVICES.get(taskId), + ProducerType.MULTI, + new BlockingWaitStrategy()); + this.sinkProducer = sinkProducer; + } + + @Override + public void createInternal() throws Exception { + final PluginRuntime pluginRuntime = + RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null; + if (pluginRuntime == null) { + throw new IllegalStateException("Plugin runtime is down"); + } + + final long creationTime = System.currentTimeMillis(); + processorConsumers = new ProcessorConsumer[parallelism]; + for (int i = 0; i < parallelism; i++) { + processorConsumers[i] = + new ProcessorConsumer(pluginRuntime.constructProcessor(parameters), sinkProducer); + try { + processorConsumers[i].consumer().validate(new PipeParameterValidator(parameters)); + processorConsumers[i] + .consumer() + .customize( + parameters, + new CollectorProcessorRuntimeConfiguration(taskId, creationTime, parallelism, i)); + } catch (final Exception e) { + try { + processorConsumers[i].consumer().close(); + } catch (final Exception ex) { + LOGGER.warn("Failed to close sink on creation failure", ex); + } + throw e; + } + } + disruptor.handleEventsWithWorkerPool(processorConsumers); + + disruptor.setDefaultExceptionHandler(new ProcessorExceptionHandler()); + } + + @Override + public void startInternal() { + disruptor.start(); + } + + @Override + public void stopInternal() { + disruptor.halt(); + } + + @Override + public void dropInternal() { + if (processorConsumers != null) { + for (int i = 0; i < parallelism; i++) { + try { + processorConsumers[i].consumer().close(); + } catch (final Exception e) { + LOGGER.warn("Failed to close sink", e); + } + } + } + + disruptor.shutdown(); + + final ExecutorService executorService = REGISTERED_EXECUTOR_SERVICES.remove(taskId); + if (executorService != null) { + executorService.shutdown(); + } + } + + public EventCollector makeProducer() { + return new EventCollector(disruptor.getRingBuffer()); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java new file mode 100644 index 0000000..2fe13f0 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.sink; + +import org.apache.iotdb.collector.runtime.task.event.EventContainer; +import org.apache.iotdb.pipe.api.PipeSink; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; + +import com.lmax.disruptor.WorkHandler; + +class SinkConsumer implements WorkHandler { + + private final PipeSink sink; + + SinkConsumer(final PipeSink sink) { + this.sink = sink; + } + + PipeSink consumer() { + return sink; + } + + @Override + public void onEvent(EventContainer eventContainer) throws Exception { + // TODO: retry strategy + final Event event = eventContainer.getEvent(); + if (event instanceof TabletInsertionEvent) { + sink.transfer((TabletInsertionEvent) event); + } else if (event instanceof TsFileInsertionEvent) { + sink.transfer((TsFileInsertionEvent) event); + } else if (event != null) { + sink.transfer(event); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/exception/DisruptorTaskExceptionHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkExceptionHandler.java similarity index 68% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/exception/DisruptorTaskExceptionHandler.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkExceptionHandler.java index 8716d84..d567337 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/exception/DisruptorTaskExceptionHandler.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkExceptionHandler.java @@ -17,27 +17,32 @@ * under the License. */ -package org.apache.iotdb.collector.runtime.task.exception; +package org.apache.iotdb.collector.runtime.task.sink; + +import org.apache.iotdb.collector.runtime.task.event.EventContainer; import com.lmax.disruptor.ExceptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DisruptorTaskExceptionHandler implements ExceptionHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorTaskExceptionHandler.class); +class SinkExceptionHandler implements ExceptionHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(SinkExceptionHandler.class); @Override - public void handleEventException(Throwable ex, long sequence, Object event) { - LOGGER.error("Event processing failed [seq={}, event={}]", sequence, event, ex); + public void handleEventException(Throwable ex, long sequence, EventContainer event) { + // TODO: heartbeat + // TODO: retry strategy + LOGGER.warn("Failed to handle event", ex); } @Override public void handleOnStartException(Throwable ex) { - LOGGER.error("Failed to start disruptor", ex); + LOGGER.warn("Failed to start sink disruptor", ex); } @Override public void handleOnShutdownException(Throwable ex) { - LOGGER.error("Failed to shutdown disruptor", ex); + LOGGER.warn("Failed to shutdown sink disruptor", ex); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java new file mode 100644 index 0000000..e3e66db --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.sink; + +import org.apache.iotdb.collector.plugin.api.customizer.CollectorSinkRuntimeConfiguration; +import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.task.Task; +import org.apache.iotdb.collector.runtime.task.event.EventCollector; +import org.apache.iotdb.collector.runtime.task.event.EventContainer; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.commons.concurrent.IoTThreadFactory; +import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM; +import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SINK_RING_BUFFER_SIZE; + +public class SinkTask extends Task { + + private static final Logger LOGGER = LoggerFactory.getLogger(SinkTask.class); + + private static final Map REGISTERED_EXECUTOR_SERVICES = + new ConcurrentHashMap<>(); + + private final Disruptor disruptor; + private SinkConsumer[] consumers; + + public SinkTask(final String taskId, final Map attributes) { + super(taskId, attributes, TASK_SINK_PARALLELISM_NUM.key(), TASK_SINK_PARALLELISM_NUM.value()); + + REGISTERED_EXECUTOR_SERVICES.putIfAbsent( + taskId, + new WrappedThreadPoolExecutor( + parallelism, + parallelism, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(parallelism), + new IoTThreadFactory(taskId), // TODO: thread name + taskId)); + + disruptor = + new Disruptor<>( + EventContainer::new, + TASK_SINK_RING_BUFFER_SIZE.value(), + REGISTERED_EXECUTOR_SERVICES.get(taskId), + ProducerType.MULTI, + new BlockingWaitStrategy()); + } + + @Override + public void createInternal() throws Exception { + final PluginRuntime pluginRuntime = + RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null; + if (pluginRuntime == null) { + throw new IllegalStateException("Plugin runtime is down"); + } + + final long creationTime = System.currentTimeMillis(); + consumers = new SinkConsumer[parallelism]; + for (int i = 0; i < parallelism; i++) { + consumers[i] = new SinkConsumer(pluginRuntime.constructSink(parameters)); + try { + consumers[i].consumer().validate(new PipeParameterValidator(parameters)); + consumers[i] + .consumer() + .customize( + parameters, + new CollectorSinkRuntimeConfiguration(taskId, creationTime, parallelism, i)); + consumers[i].consumer().handshake(); + } catch (final Exception e) { + try { + consumers[i].consumer().close(); + } catch (final Exception ex) { + LOGGER.warn("Failed to close sink on creation failure", ex); + } + throw e; + } + } + disruptor.handleEventsWithWorkerPool(consumers); + + disruptor.setDefaultExceptionHandler(new SinkExceptionHandler()); + } + + @Override + public void startInternal() { + disruptor.start(); + } + + @Override + public void stopInternal() { + disruptor.halt(); + } + + @Override + public void dropInternal() { + if (consumers != null) { + for (int i = 0; i < parallelism; i++) { + try { + consumers[i].consumer().close(); + } catch (final Exception e) { + LOGGER.warn("Failed to close sink", e); + } + } + } + + disruptor.shutdown(); + + final ExecutorService executorService = REGISTERED_EXECUTOR_SERVICES.remove(taskId); + if (executorService != null) { + executorService.shutdown(); + } + } + + public EventCollector makeProducer() { + return new EventCollector(disruptor.getRingBuffer()); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java new file mode 100644 index 0000000..ca0c8c4 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.source; + +import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.task.Task; +import org.apache.iotdb.collector.runtime.task.event.EventCollector; +import org.apache.iotdb.collector.runtime.task.source.pull.PullSourceTask; +import org.apache.iotdb.collector.runtime.task.source.push.PushSourceTask; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import java.util.Map; + +import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SOURCE_PARALLELISM_NUM; + +public abstract class SourceTask extends Task { + + protected final EventCollector processorProducer; + + protected SourceTask( + final String taskId, + final Map attributes, + final EventCollector processorProducer) { + super( + taskId, attributes, TASK_SOURCE_PARALLELISM_NUM.key(), TASK_SOURCE_PARALLELISM_NUM.value()); + this.processorProducer = processorProducer; + } + + public static SourceTask construct( + final String taskId, + final Map attributes, + final EventCollector processorProducer) + throws Exception { + final PluginRuntime pluginRuntime = + RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null; + if (pluginRuntime == null) { + throw new IllegalStateException("Plugin runtime is down"); + } + + final PipeParameters parameters = new PipeParameters(attributes); + if (pluginRuntime.isPullSource(parameters)) { + return new PullSourceTask(taskId, attributes, processorProducer); + } + if (pluginRuntime.isPushSource(parameters)) { + return new PushSourceTask(taskId, attributes, processorProducer); + } + throw new IllegalArgumentException("Unsupported source type"); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumerController.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceConsumer.java similarity index 56% rename from iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumerController.java rename to iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceConsumer.java index 7506a09..85baedf 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/datastructure/TaskEventConsumerController.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceConsumer.java @@ -17,29 +17,27 @@ * under the License. */ -package org.apache.iotdb.collector.runtime.task.datastructure; +package org.apache.iotdb.collector.runtime.task.source.pull; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; +import org.apache.iotdb.collector.plugin.api.PullSource; +import org.apache.iotdb.pipe.api.collector.EventCollector; -public class TaskEventConsumerController { +class PullSourceConsumer { - private final AtomicBoolean running = new AtomicBoolean(true); + private final PullSource pullSource; + private final EventCollector eventCollector; - private static final long PARK_NANOS = 100_000_000L; - - public void pause() { - running.set(false); + PullSourceConsumer(final PullSource pullSource, final EventCollector eventCollector) { + this.pullSource = pullSource; + this.eventCollector = eventCollector; } - public void resume() { - running.set(true); + PullSource consumer() { + return pullSource; } - public boolean shouldRun() { - while (!running.get()) { - LockSupport.parkNanos(PARK_NANOS); - } - return running.get(); + public void onScheduler() throws Exception { + // TODO: scheduler strategy + eventCollector.collect(pullSource.supply()); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java new file mode 100644 index 0000000..b47054a --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.source.pull; + +import org.apache.iotdb.collector.plugin.api.PullSource; +import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration; +import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.task.event.EventCollector; +import org.apache.iotdb.collector.runtime.task.source.SourceTask; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.commons.concurrent.IoTThreadFactory; +import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class PullSourceTask extends SourceTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(PullSourceTask.class); + + private static final Map REGISTERED_EXECUTOR_SERVICES = + new ConcurrentHashMap<>(); + + private PullSourceConsumer[] consumers; + + public PullSourceTask( + final String taskId, + final Map attributes, + final EventCollector processorProducer) { + super(taskId, attributes, processorProducer); + } + + @Override + public void createInternal() throws Exception { + final PluginRuntime pluginRuntime = + RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null; + if (pluginRuntime == null) { + throw new IllegalStateException("Plugin runtime is down"); + } + + REGISTERED_EXECUTOR_SERVICES.putIfAbsent( + taskId, + new WrappedThreadPoolExecutor( + parallelism, + parallelism, + 0L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(parallelism), + new IoTThreadFactory(taskId), // TODO: thread name + taskId)); + + final long creationTime = System.currentTimeMillis(); + consumers = new PullSourceConsumer[parallelism]; + for (int i = 0; i < parallelism; i++) { + consumers[i] = + new PullSourceConsumer( + (PullSource) pluginRuntime.constructSource(parameters), processorProducer); + try { + consumers[i].consumer().validate(new PipeParameterValidator(parameters)); + consumers[i] + .consumer() + .customize( + parameters, + new CollectorSourceRuntimeConfiguration(taskId, creationTime, parallelism, i)); + consumers[i].consumer().start(); + } catch (final Exception e) { + try { + consumers[i].consumer().close(); + } catch (final Exception ex) { + LOGGER.warn("Failed to close source on creation failure", ex); + throw e; + } + } + + int finalI = i; + REGISTERED_EXECUTOR_SERVICES + .get(taskId) + .submit( + () -> { + while (!isDropped.get()) { + try { + consumers[finalI].onScheduler(); + } catch (final Exception e) { + LOGGER.warn("Failed to pull source", e); + } + + waitUntilRunningOrDropped(); + } + }); + } + } + + @Override + public void startInternal() { + // do nothing + } + + @Override + public void stopInternal() { + // do nothing + } + + @Override + public void dropInternal() { + if (consumers != null) { + for (int i = 0; i < parallelism; i++) { + try { + consumers[i].consumer().close(); + } catch (final Exception e) { + LOGGER.warn("Failed to close source", e); + } + } + } + + final ExecutorService executorService = REGISTERED_EXECUTOR_SERVICES.remove(taskId); + if (executorService != null) { + executorService.shutdown(); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java new file mode 100644 index 0000000..938f29f --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task.source.push; + +import org.apache.iotdb.collector.plugin.api.PushSource; +import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration; +import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.task.event.EventCollector; +import org.apache.iotdb.collector.runtime.task.source.SourceTask; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class PushSourceTask extends SourceTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(PushSourceTask.class); + + private PushSource[] pushSources; + + public PushSourceTask( + final String taskId, + final Map sourceParams, + final EventCollector processorProducer) { + super(taskId, sourceParams, processorProducer); + } + + @Override + public void createInternal() throws Exception { + final PluginRuntime pluginRuntime = + RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null; + if (pluginRuntime == null) { + throw new IllegalStateException("Plugin runtime is down"); + } + + final long creationTime = System.currentTimeMillis(); + pushSources = new PushSource[parallelism]; + for (int i = 0; i < parallelism; i++) { + pushSources[i] = (PushSource) pluginRuntime.constructSource(parameters); + pushSources[i].setCollector(processorProducer); + + try { + pushSources[i].validate(new PipeParameterValidator(parameters)); + pushSources[i].customize( + parameters, + new CollectorSourceRuntimeConfiguration(taskId, creationTime, parallelism, i)); + pushSources[i].start(); + } catch (final Exception e) { + try { + pushSources[i].close(); + } catch (final Exception ex) { + LOGGER.warn("Failed to close source on creation failure", ex); + throw e; + } + } + } + } + + @Override + public void startInternal() { + // do nothing + } + + @Override + public void stopInternal() { + // do nothing + } + + @Override + public void dropInternal() { + if (pushSources != null) { + for (int i = 0; i < parallelism; i++) { + try { + pushSources[i].close(); + } catch (final Exception e) { + LOGGER.warn("Failed to close source", e); + } + } + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java index 5d15016..001df2d 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/RuntimeService.java @@ -22,29 +22,57 @@ import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; import org.apache.iotdb.collector.runtime.task.TaskRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + public class RuntimeService implements IService { - private static TaskRuntime task; - private static PluginRuntime plugin; + private static final Logger LOGGER = LoggerFactory.getLogger(RuntimeService.class); + + private static final AtomicReference TASK = new AtomicReference<>(); + private static final AtomicReference PLUGIN = new AtomicReference<>(); @Override - public void start() { - plugin = new PluginRuntime(); - task = new TaskRuntime(); + public synchronized void start() { + TASK.set(new TaskRuntime()); + PLUGIN.set(new PluginRuntime()); } - public static TaskRuntime task() { - return task; + public static Optional task() { + return Optional.of(TASK.get()); } - public static PluginRuntime plugin() { - return plugin; + public static Optional plugin() { + return Optional.of(PLUGIN.get()); } @Override - public void stop() { - task = null; - plugin = null; + public synchronized void stop() { + task() + .ifPresent( + taskRuntime -> { + try { + taskRuntime.close(); + } catch (final Exception e) { + LOGGER.warn("[RuntimeService] Failed to close task runtime: {}", e.getMessage(), e); + } + }); + TASK.set(null); + + plugin() + .ifPresent( + pluginRuntime -> { + try { + pluginRuntime.close(); + } catch (final Exception e) { + LOGGER.warn( + "[RuntimeService] Failed to close plugin runtime: {}", e.getMessage(), e); + } + }); + PLUGIN.set(null); } @Override diff --git a/iotdb-collector/collector-core/src/main/resources/application.properties b/iotdb-collector/collector-core/src/main/resources/application.properties index 3c69299..244ed55 100644 --- a/iotdb-collector/collector-core/src/main/resources/application.properties +++ b/iotdb-collector/collector-core/src/main/resources/application.properties @@ -21,36 +21,36 @@ ### API Service Configuration #################### -# jetty service running port -# effectiveMode: first_start -# Datatype: int +# The port on which the Jetty service runs. +# Effective mode: on every start +# Data type: int api_service_port=17070 #################### ### Task Runtime Configuration #################### -# the number of concurrent threads that the source task runs -# effectiveMode: first_start -# Datatype: int +# The number of concurrent threads for the source task. +# Effective mode: on every start +# Data type: int task_source_parallelism_num=4 -# the number of concurrent threads that the process task runs -# effectiveMode: first_start -# Datatype: int +# The number of concurrent threads for the process task. +# Effective mode: on every start +# Data type: int task_process_parallelism_num=4 -# the number of concurrent threads that the sink task runs -# effectiveMode: first_start -# Datatype: int +# The number of concurrent threads for the sink task. +# Effective mode: on every start +# Data type: int task_sink_parallelism_num=4 -# the disruptor ring buffer size of the processor task -# effectiveMode: first_start -# Datatype: int +# The ring buffer size for the processor task. +# Effective mode: on every start +# Data type: int task_processor_ring_buffer_size=1024 -# the disruptor ring buffer size of the sink task -# effectiveMode: first_start -# Datatype: int -task_sink_ring_buffer_size=1024 \ No newline at end of file +# The ring buffer size for the sink task. +# Effective mode: on every start +# Data type: int +task_sink_ring_buffer_size=1024