Skip to content

Commit 65b7740

Browse files
committed
fix
1 parent 1ed399e commit 65b7740

File tree

5 files changed

+79
-23
lines changed

5 files changed

+79
-23
lines changed

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ public class Options {
3434

3535
static {
3636
try {
37-
Class.forName("org.apache.iotdb.collector.config.ApiServiceOptions");
38-
Class.forName("org.apache.iotdb.collector.config.TaskRuntimeOptions");
37+
Class.forName(ApiServiceOptions.class.getName());
38+
Class.forName(TaskRuntimeOptions.class.getName());
3939
} catch (final ClassNotFoundException e) {
40-
throw new RuntimeException("Failed to load ApiServiceOptions", e);
40+
throw new RuntimeException("Failed to load options", e);
4141
}
4242
}
4343

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,14 @@
1919

2020
package org.apache.iotdb.collector.runtime.task;
2121

22-
import org.apache.iotdb.collector.config.TaskRuntimeOptions;
2322
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
2423

25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
27-
2824
import java.util.Map;
2925
import java.util.concurrent.atomic.AtomicBoolean;
3026
import java.util.concurrent.locks.LockSupport;
3127

3228
public abstract class Task {
3329

34-
private static final Logger LOGGER = LoggerFactory.getLogger(Task.class);
35-
3630
protected final String taskId;
3731
protected final PipeParameters parameters;
3832

@@ -42,14 +36,15 @@ public abstract class Task {
4236
protected final AtomicBoolean isRunning = new AtomicBoolean(false);
4337
protected final AtomicBoolean isDropped = new AtomicBoolean(false);
4438

45-
protected Task(final String taskId, final Map<String, String> attributes) {
39+
protected Task(
40+
final String taskId,
41+
final Map<String, String> attributes,
42+
final String parallelismKey,
43+
final int parallelismValue) {
4644
this.taskId = taskId;
4745
this.parameters = new PipeParameters(attributes);
4846

49-
this.parallelism =
50-
parameters.getIntOrDefault(
51-
TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM.key(),
52-
TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM.value());
47+
this.parallelism = parameters.getIntOrDefault(parallelismKey, parallelismValue);
5348
}
5449

5550
public void resume() {

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,32 @@
2525
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
2626
import org.apache.iotdb.collector.runtime.task.event.EventContainer;
2727
import org.apache.iotdb.collector.service.RuntimeService;
28+
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
29+
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
2830
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
2931

3032
import com.lmax.disruptor.BlockingWaitStrategy;
3133
import com.lmax.disruptor.dsl.Disruptor;
3234
import com.lmax.disruptor.dsl.ProducerType;
33-
import com.lmax.disruptor.util.DaemonThreadFactory;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

3738
import java.util.Map;
39+
import java.util.concurrent.ConcurrentHashMap;
40+
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.TimeUnit;
43+
44+
import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_PROCESSOR_RING_BUFFER_SIZE;
45+
import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_PROCESS_PARALLELISM_NUM;
3846

3947
public class ProcessorTask extends Task {
4048

4149
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorTask.class);
4250

51+
private static final Map<String, ExecutorService> REGISTERED_EXECUTOR_SERVICES =
52+
new ConcurrentHashMap<>();
53+
4354
private final Disruptor<EventContainer> disruptor;
4455
private final EventCollector sinkProducer;
4556
private ProcessorConsumer[] processorConsumers;
@@ -48,13 +59,28 @@ public ProcessorTask(
4859
final String taskId,
4960
final Map<String, String> attributes,
5061
final EventCollector sinkProducer) {
51-
super(taskId, attributes);
62+
super(
63+
taskId,
64+
attributes,
65+
TASK_PROCESS_PARALLELISM_NUM.key(),
66+
TASK_PROCESS_PARALLELISM_NUM.value());
67+
68+
REGISTERED_EXECUTOR_SERVICES.putIfAbsent(
69+
taskId,
70+
new WrappedThreadPoolExecutor(
71+
parallelism,
72+
parallelism,
73+
0L,
74+
TimeUnit.SECONDS,
75+
new LinkedBlockingQueue<>(parallelism),
76+
new IoTThreadFactory(taskId), // TODO: thread name
77+
taskId));
5278

5379
disruptor =
5480
new Disruptor<>(
5581
EventContainer::new,
56-
parallelism, // fault usage
57-
DaemonThreadFactory.INSTANCE, // fault usage
82+
TASK_PROCESSOR_RING_BUFFER_SIZE.value(),
83+
REGISTERED_EXECUTOR_SERVICES.get(taskId),
5884
ProducerType.MULTI,
5985
new BlockingWaitStrategy());
6086
this.sinkProducer = sinkProducer;
@@ -117,6 +143,11 @@ public void dropInternal() {
117143
}
118144

119145
disruptor.shutdown();
146+
147+
final ExecutorService executorService = REGISTERED_EXECUTOR_SERVICES.remove(taskId);
148+
if (executorService != null) {
149+
executorService.shutdown();
150+
}
120151
}
121152

122153
public EventCollector makeProducer() {

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,54 @@
2525
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
2626
import org.apache.iotdb.collector.runtime.task.event.EventContainer;
2727
import org.apache.iotdb.collector.service.RuntimeService;
28+
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
29+
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
2830
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
2931

3032
import com.lmax.disruptor.BlockingWaitStrategy;
3133
import com.lmax.disruptor.dsl.Disruptor;
3234
import com.lmax.disruptor.dsl.ProducerType;
33-
import com.lmax.disruptor.util.DaemonThreadFactory;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

3738
import java.util.Map;
39+
import java.util.concurrent.ConcurrentHashMap;
40+
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.TimeUnit;
43+
44+
import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SINK_PARALLELISM_NUM;
45+
import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SINK_RING_BUFFER_SIZE;
3846

3947
public class SinkTask extends Task {
4048

4149
private static final Logger LOGGER = LoggerFactory.getLogger(SinkTask.class);
4250

51+
private static final Map<String, ExecutorService> REGISTERED_EXECUTOR_SERVICES =
52+
new ConcurrentHashMap<>();
53+
4354
private final Disruptor<EventContainer> disruptor;
4455
private SinkConsumer[] consumers;
4556

4657
public SinkTask(final String taskId, final Map<String, String> attributes) {
47-
super(taskId, attributes);
58+
super(taskId, attributes, TASK_SINK_PARALLELISM_NUM.key(), TASK_SINK_PARALLELISM_NUM.value());
59+
60+
REGISTERED_EXECUTOR_SERVICES.putIfAbsent(
61+
taskId,
62+
new WrappedThreadPoolExecutor(
63+
parallelism,
64+
parallelism,
65+
0L,
66+
TimeUnit.SECONDS,
67+
new LinkedBlockingQueue<>(parallelism),
68+
new IoTThreadFactory(taskId), // TODO: thread name
69+
taskId));
4870

4971
disruptor =
5072
new Disruptor<>(
5173
EventContainer::new,
52-
parallelism, // fault usage
53-
DaemonThreadFactory.INSTANCE, // fault usage
74+
TASK_SINK_RING_BUFFER_SIZE.value(),
75+
REGISTERED_EXECUTOR_SERVICES.get(taskId),
5476
ProducerType.MULTI,
5577
new BlockingWaitStrategy());
5678
}
@@ -112,6 +134,11 @@ public void dropInternal() {
112134
}
113135

114136
disruptor.shutdown();
137+
138+
final ExecutorService executorService = REGISTERED_EXECUTOR_SERVICES.remove(taskId);
139+
if (executorService != null) {
140+
executorService.shutdown();
141+
}
115142
}
116143

117144
public EventCollector makeProducer() {

iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
import java.util.Map;
3131

32+
import static org.apache.iotdb.collector.config.TaskRuntimeOptions.TASK_SOURCE_PARALLELISM_NUM;
33+
3234
public abstract class SourceTask extends Task {
3335

3436
protected final EventCollector processorProducer;
@@ -37,7 +39,8 @@ protected SourceTask(
3739
final String taskId,
3840
final Map<String, String> attributes,
3941
final EventCollector processorProducer) {
40-
super(taskId, attributes);
42+
super(
43+
taskId, attributes, TASK_SOURCE_PARALLELISM_NUM.key(), TASK_SOURCE_PARALLELISM_NUM.value());
4144
this.processorProducer = processorProducer;
4245
}
4346

0 commit comments

Comments
 (0)