Skip to content

Commit 782edea

Browse files
committed
fix
1 parent cbcf673 commit 782edea

File tree

18 files changed

+95
-105
lines changed

18 files changed

+95
-105
lines changed

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void sendHandshake() {
164164
"Handshake error with target server ip: %s, port: %s, because: %s.",
165165
client.getIpAddress(), client.getPort(), resp.getStatus()));
166166
} else {
167-
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
167+
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
168168
IOT_PRINTER.println(
169169
String.format(
170170
"Handshake success. Target server ip: %s, port: %s",
@@ -301,7 +301,7 @@ private void initClient() {
301301
new ThriftClientProperty.Builder()
302302
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
303303
.setRpcThriftCompressionEnabled(
304-
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
304+
PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled())
305305
.build(),
306306
getEndPoint().getIp(),
307307
getEndPoint().getPort(),

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigNodePluginAgent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public PipeConfigNodePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
3232
}
3333

3434
@Override
35-
protected PipeSourceConstructor createPipeExtractorConstructor(
35+
protected PipeSourceConstructor createPipeSourceConstructor(
3636
PipePluginMetaKeeper pipePluginMetaKeeper) {
3737
return new PipeConfigRegionSourceConstructor();
3838
}
@@ -44,7 +44,7 @@ protected PipeProcessorConstructor createPipeProcessorConstructor(
4444
}
4545

4646
@Override
47-
protected PipeSinkConstructor createPipeConnectorConstructor(
47+
protected PipeSinkConstructor createPipeSinkConstructor(
4848
PipePluginMetaKeeper pipePluginMetaKeeper) {
4949
return new PipeConfigRegionSinkConstructor();
5050
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionPluginAgent.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMetaKeeper;
2828
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
2929
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
30+
import org.apache.iotdb.pipe.api.PipeConnector;
3031
import org.apache.iotdb.pipe.api.PipeExtractor;
3132
import org.apache.iotdb.pipe.api.PipeProcessor;
32-
import org.apache.iotdb.pipe.api.PipeSink;
3333
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
3434
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
3535

@@ -42,7 +42,7 @@ public PipeDataRegionPluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeep
4242
}
4343

4444
@Override
45-
protected PipeSourceConstructor createPipeExtractorConstructor(
45+
protected PipeSourceConstructor createPipeSourceConstructor(
4646
PipePluginMetaKeeper pipePluginMetaKeeper) {
4747
return new PipeDataRegionSourceConstructor((DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
4848
}
@@ -67,9 +67,9 @@ public void validate(
6767
Map<String, String> processorAttributes,
6868
Map<String, String> sinkAttributes)
6969
throws Exception {
70-
PipeExtractor temporaryExtractor = validateExtractor(sourceAttributes);
70+
PipeExtractor temporaryExtractor = validateSource(sourceAttributes);
7171
PipeProcessor temporaryProcessor = validateProcessor(processorAttributes);
72-
PipeSink temporarySink = validateSink(pipeName, sinkAttributes);
72+
PipeConnector temporarySink = validateSink(pipeName, sinkAttributes);
7373

7474
// validate visibility
7575
// TODO: validate visibility for schema region and config region

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionPluginAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public PipeSchemaRegionPluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
3232
}
3333

3434
@Override
35-
protected PipeSourceConstructor createPipeExtractorConstructor(
35+
protected PipeSourceConstructor createPipeSourceConstructor(
3636
PipePluginMetaKeeper pipePluginMetaKeeper) {
3737
return new PipeSchemaRegionSourceConstructor();
3838
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
3737
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
3838
import org.apache.iotdb.metrics.type.Histogram;
39-
import org.apache.iotdb.pipe.api.PipeSink;
39+
import org.apache.iotdb.pipe.api.PipeConnector;
4040
import org.apache.iotdb.pipe.api.event.Event;
4141
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4242
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -72,7 +72,7 @@ public PipeSinkSubtask(
7272
final String attributeSortedString,
7373
final int sinkIndex,
7474
final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
75-
final PipeSink outputPipeSink) {
75+
final PipeConnector outputPipeSink) {
7676
super(taskID, creationTime, outputPipeSink);
7777
this.attributeSortedString = attributeSortedString;
7878
this.sinkIndex = sinkIndex;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
3535
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
3636
import org.apache.iotdb.db.storageengine.StorageEngine;
37-
import org.apache.iotdb.pipe.api.PipeSink;
37+
import org.apache.iotdb.pipe.api.PipeConnector;
3838
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
3939
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
4040
import org.apache.iotdb.pipe.api.event.Event;
@@ -128,7 +128,7 @@ public synchronized String register(
128128
}
129129

130130
for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
131-
final PipeSink pipeSink =
131+
final PipeConnector pipeSink =
132132
isDataRegionSink
133133
? PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
134134
: PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeSinkParameters);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2626
import org.apache.iotdb.db.storageengine.StorageEngine;
2727
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
28+
import org.apache.iotdb.pipe.api.PipeConnector;
2829
import org.apache.iotdb.pipe.api.annotation.TableModel;
2930
import org.apache.iotdb.pipe.api.annotation.TreeModel;
3031
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java renamed to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSinkServer.java

File renamed without changes.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ public SubscriptionTaskSinkStage(
4141

4242
@Override
4343
protected void registerSubtask() {
44-
this.connectorSubtaskId =
44+
this.sinkSubtaskId =
4545
SubscriptionSinkSubtaskManager.instance()
4646
.register(
4747
executor.get(),
48-
pipeConnectorParameters,
48+
pipeSinkParameters,
4949
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, regionId));
5050
}
5151

@@ -56,22 +56,21 @@ public void createSubtask() throws PipeException {
5656

5757
@Override
5858
public void startSubtask() throws PipeException {
59-
SubscriptionSinkSubtaskManager.instance().start(connectorSubtaskId);
59+
SubscriptionSinkSubtaskManager.instance().start(sinkSubtaskId);
6060
}
6161

6262
@Override
6363
public void stopSubtask() throws PipeException {
64-
SubscriptionSinkSubtaskManager.instance().stop(connectorSubtaskId);
64+
SubscriptionSinkSubtaskManager.instance().stop(sinkSubtaskId);
6565
}
6666

6767
@Override
6868
public void dropSubtask() throws PipeException {
6969
SubscriptionSinkSubtaskManager.instance()
70-
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
70+
.deregister(pipeName, creationTime, regionId, sinkSubtaskId);
7171
}
7272

7373
public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
74-
return SubscriptionSinkSubtaskManager.instance()
75-
.getPipeConnectorPendingQueue(connectorSubtaskId);
74+
return SubscriptionSinkSubtaskManager.instance().getPipeConnectorPendingQueue(sinkSubtaskId);
7675
}
7776
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtask.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,13 @@ public SubscriptionSinkSubtask(
4141
final String taskID,
4242
final long creationTime,
4343
final String attributeSortedString,
44-
final int connectorIndex,
44+
final int sinkIndex,
4545
final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
46-
final PipeConnector outputPipeConnector,
46+
final PipeConnector outputPipeSink,
4747
final String topicName,
4848
final String consumerGroupId) {
4949
super(
50-
taskID,
51-
creationTime,
52-
attributeSortedString,
53-
connectorIndex,
54-
inputPendingQueue,
55-
outputPipeConnector);
50+
taskID, creationTime, attributeSortedString, sinkIndex, inputPendingQueue, outputPipeSink);
5651
this.topicName = topicName;
5752
this.consumerGroupId = consumerGroupId;
5853
}

0 commit comments

Comments
 (0)