Skip to content

Connector cps.endpoint config does not target emulator #290

@Axelcouty

Description

@Axelcouty

Hi,

I'm trying to use this connector locally against the pubsub emulator. google-cloud-cli:454.0.0-emulators

I am able to create a topic and a subscription with java client andhile I could make the connector work with existing topic & subscription for my true, existing gcloud project I'm not able to do so with the emulator.

Both the emulator & kafka-connect are started as part of a docker-compose:

  • kafka-connect
    • version: 6.2.1
    • connector version: 1.2.0
    • notable environment variables
      • PUBSUB_EMULATOR_HOST: pubsub:8085
      • CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB: http(s):pubsub:8085 (I tried several)
  • Emulator
    • version: 454.0.0-emulators
    • stat cmd: gcloud beta emulators pubsub start --host-port 0.0.0.0:8085

I tried with this source connector example:

{
  "cps.subscription": "vmchIIQhNG",
  "metadata.publish": "false",
  "name": "test-connector-5800cc7b-a616-4bec-a7b5-b4d5128dbc3b",
  "kafka.partition.count": "1",
  "cps.endpoint": "pubsub:8085",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "cps.project": "my-project",
  "kafka.topic": "test-363210ef-c078-462b-8c46-bc9213a03e2e",
  "kafka.record.headers": "true",
  "headers.publish": "true",
  "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
}

When I have a look at the connector's state I see the following exception raised:

rg.apache.kafka.connect.errors.ConnectException: Error verifying the subscription vmchIIQhNG for project eixample-dev
	at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:314)
	at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:146)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
	at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
	at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=vmchIIQhNG).
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	... 3 more
	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
		at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:312)
		at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:146)
		at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
		at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
		at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
		at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
		at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
		at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
		at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
		... 3 more
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=vmchIIQhNG).
	at io.grpc.Status.asRuntimeException(Status.java:539)
	... 17 more

It looks to me I'm not able to understand what are the required configuration to be able to hit the local emulator from kafka connect.


Additional infos :

My use case is simply to be able to write tests locally when working on SMT development.

Docker compose, pubsub part:

  pubsub:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:454.0.0-emulators
    command: 
      - bash
      - -c
      - "gcloud beta emulators pubsub start --host-port 0.0.0.0:8085"
    ports:
      - "8085:8085"

Thanks for your help.

Metadata

Metadata

Assignees

Labels

type: feature request‘Nice-to-have’ improvement, new feature or different behavior or design.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions