Skip to content

Commit 318af49

Browse files
committed
[#383] Remove the verifySubscription() function when starting CPS source connector
1 parent 4749491 commit 318af49

File tree

2 files changed

+0
-69
lines changed

2 files changed

+0
-69
lines changed

src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,8 @@
1515
*/
1616
package com.google.pubsub.kafka.source;
1717

18-
import com.google.api.gax.core.CredentialsProvider;
19-
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
20-
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
21-
import com.google.common.annotations.VisibleForTesting;
22-
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
2318
import com.google.pubsub.kafka.common.ConnectorUtils;
2419
import com.google.pubsub.kafka.common.Version;
25-
import com.google.pubsub.v1.GetSubscriptionRequest;
2620
import java.util.ArrayList;
2721
import java.util.Arrays;
2822
import java.util.HashMap;
@@ -33,7 +27,6 @@
3327
import org.apache.kafka.common.config.ConfigDef.Type;
3428
import org.apache.kafka.common.config.ConfigException;
3529
import org.apache.kafka.connect.connector.Task;
36-
import org.apache.kafka.connect.errors.ConnectException;
3730
import org.apache.kafka.connect.source.SourceConnector;
3831
import org.slf4j.Logger;
3932
import org.slf4j.LoggerFactory;
@@ -135,15 +128,6 @@ public String version() {
135128

136129
@Override
137130
public void start(Map<String, String> props) {
138-
// Do a validation of configs here too so that we do not pass null objects to
139-
// verifySubscription().
140-
Map<String, Object> validated = config().parse(props);
141-
String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
142-
String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString();
143-
ConnectorCredentialsProvider credentialsProvider =
144-
ConnectorCredentialsProvider.fromConfig(validated);
145-
146-
verifySubscription(cpsProject, cpsSubscription, credentialsProvider);
147131
this.props = props;
148132
log.info("Started the CloudPubSubSourceConnector");
149133
}
@@ -286,36 +270,6 @@ public ConfigDef config() {
286270
"The Pub/Sub endpoint to use.");
287271
}
288272

289-
/**
290-
* Check whether the user provided Cloud Pub/Sub subscription name specified by {@link
291-
* #CPS_SUBSCRIPTION_CONFIG} exists or not.
292-
*/
293-
@VisibleForTesting
294-
public void verifySubscription(
295-
String cpsProject, String cpsSubscription, CredentialsProvider credentialsProvider) {
296-
try {
297-
SubscriberStubSettings subscriberStubSettings =
298-
SubscriberStubSettings.newBuilder()
299-
.setTransportChannelProvider(
300-
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
301-
.setMaxInboundMessageSize(20 << 20) // 20MB
302-
.build())
303-
.setCredentialsProvider(credentialsProvider)
304-
.build();
305-
GrpcSubscriberStub stub = GrpcSubscriberStub.create(subscriberStubSettings);
306-
GetSubscriptionRequest request =
307-
GetSubscriptionRequest.newBuilder()
308-
.setSubscription(
309-
String.format(
310-
ConnectorUtils.CPS_SUBSCRIPTION_FORMAT, cpsProject, cpsSubscription))
311-
.build();
312-
stub.getSubscriptionCallable().call(request);
313-
} catch (Exception e) {
314-
throw new ConnectException(
315-
"Error verifying the subscription " + cpsSubscription + " for project " + cpsProject, e);
316-
}
317-
}
318-
319273
@Override
320274
public void stop() {}
321275
}

src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnectorTest.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,12 @@
1616
package com.google.pubsub.kafka.source;
1717

1818
import static org.junit.Assert.assertEquals;
19-
import static org.mockito.ArgumentMatchers.any;
20-
import static org.mockito.ArgumentMatchers.anyString;
21-
import static org.mockito.Mockito.doNothing;
22-
import static org.mockito.Mockito.doThrow;
2319
import static org.mockito.Mockito.spy;
2420

25-
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
2621
import com.google.pubsub.kafka.common.ConnectorUtils;
2722
import java.util.HashMap;
2823
import java.util.List;
2924
import java.util.Map;
30-
import org.apache.kafka.common.config.ConfigException;
31-
import org.apache.kafka.connect.errors.ConnectException;
3225
import org.junit.Before;
3326
import org.junit.Test;
3427

@@ -52,24 +45,8 @@ public void setup() {
5245
props.put(CloudPubSubSourceConnector.KAFKA_TOPIC_CONFIG, KAFKA_TOPIC);
5346
}
5447

55-
@Test(expected = ConnectException.class)
56-
public void testStartWhenSubscriptionNonexistant() {
57-
doThrow(new ConnectException(""))
58-
.when(connector)
59-
.verifySubscription(anyString(), anyString(), any(ConnectorCredentialsProvider.class));
60-
connector.start(props);
61-
}
62-
63-
@Test(expected = ConfigException.class)
64-
public void testStartWhenRequiredConfigMissing() {
65-
connector.start(new HashMap<String, String>());
66-
}
67-
6848
@Test
6949
public void testTaskConfigs() {
70-
doNothing()
71-
.when(connector)
72-
.verifySubscription(anyString(), anyString(), any(ConnectorCredentialsProvider.class));
7350
connector.start(props);
7451
List<Map<String, String>> taskConfigs = connector.taskConfigs(NUM_TASKS);
7552
assertEquals(taskConfigs.size(), NUM_TASKS);

0 commit comments

Comments
 (0)