Skip to content

Commit 208bb1f

Browse files
authored
MINOR: Add integration test for plugin aliases (#16621)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent f09ead1 commit 208bb1f

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java

+56
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.apache.kafka.connect.sink.SinkTask;
3838
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
3939
import org.apache.kafka.connect.storage.StringConverter;
40+
import org.apache.kafka.connect.transforms.Filter;
41+
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
4042
import org.apache.kafka.connect.util.ConnectorTaskId;
4143
import org.apache.kafka.connect.util.SinkUtils;
4244
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -80,9 +82,12 @@
8082
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
8183
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
8284
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
85+
import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
8386
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
87+
import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
8488
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
8589
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG;
90+
import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
8691
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
8792
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
8893
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
@@ -1375,6 +1380,57 @@ public void testRuntimePropertyReconfiguration() throws Exception {
13751380
);
13761381
}
13771382

1383+
@Test
1384+
public void testPluginAliases() throws Exception {
1385+
connect = connectBuilder.build();
1386+
// start the clusters
1387+
connect.start();
1388+
1389+
// Create a topic; not strictly necessary but prevents log spam when we start a source connector later
1390+
final String topic = "kafka17150";
1391+
connect.kafka().createTopic(topic, 1);
1392+
1393+
Map<String, String> baseConnectorConfig = new HashMap<>();
1394+
// General connector properties
1395+
baseConnectorConfig.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS));
1396+
// Aliased converter classes
1397+
baseConnectorConfig.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getSimpleName());
1398+
baseConnectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getSimpleName());
1399+
baseConnectorConfig.put(HEADER_CONVERTER_CLASS_CONFIG, StringConverter.class.getSimpleName());
1400+
// Aliased SMT and predicate classes
1401+
baseConnectorConfig.put(TRANSFORMS_CONFIG, "filter");
1402+
baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
1403+
baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.predicate", "tombstone");
1404+
baseConnectorConfig.put(PREDICATES_CONFIG, "tombstone");
1405+
baseConnectorConfig.put(PREDICATES_CONFIG + ".tombstone.type", RecordIsTombstone.class.getSimpleName());
1406+
1407+
// Test a source connector
1408+
final String sourceConnectorName = "plugins-alias-test-source";
1409+
Map<String, String> sourceConnectorConfig = new HashMap<>(baseConnectorConfig);
1410+
// Aliased source connector class
1411+
sourceConnectorConfig.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
1412+
// Connector-specific properties
1413+
sourceConnectorConfig.put(TOPIC_CONFIG, topic);
1414+
sourceConnectorConfig.put("throughput", "10");
1415+
sourceConnectorConfig.put("messages.per.poll", String.valueOf(MESSAGES_PER_POLL));
1416+
// Create the connector and ensure it and its tasks can start
1417+
connect.configureConnector(sourceConnectorName, sourceConnectorConfig);
1418+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sourceConnectorName, NUM_TASKS, "Connector and tasks did not start in time");
1419+
connect.deleteConnector(sourceConnectorName);
1420+
1421+
// Test a sink connector
1422+
final String sinkConnectorName = "plugins-alias-test-sink";
1423+
Map<String, String> sinkConnectorConfig = new HashMap<>(baseConnectorConfig);
1424+
// Aliased sink connector class
1425+
sinkConnectorConfig.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
1426+
// Connector-specific properties
1427+
sinkConnectorConfig.put(TOPICS_CONFIG, topic);
1428+
// Create the connector and ensure it and its tasks can start
1429+
connect.configureConnector(sinkConnectorName, sinkConnectorConfig);
1430+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sinkConnectorName, NUM_TASKS, "Connector and tasks did not start in time");
1431+
connect.deleteConnector(sinkConnectorName);
1432+
}
1433+
13781434
private Map<String, String> defaultSourceConnectorProps(String topic) {
13791435
// setup props for the source connector
13801436
Map<String, String> props = new HashMap<>();

0 commit comments

Comments
 (0)