Skip to content

Commit 5331a75

Browse files
feat: adds support for integration with Pub/Sub emulator
1 parent 4749491 commit 5331a75

File tree

5 files changed

+73
-28
lines changed

5 files changed

+73
-28
lines changed

README.md

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -191,27 +191,28 @@ configurations:
191191

192192
#### Sink Connector
193193

194-
| Config | Value Range | Default | Description |
195-
|----------------------------|-------------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
196-
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
197-
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
198-
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
199-
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
200-
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
201-
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
202-
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
203-
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
204-
| maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
205-
| maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
206-
| maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shutdown when stopping task in Kafka Connect. |
207-
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used. |
208-
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
209-
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
210-
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
211-
| orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
212-
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
213-
| enableCompression | Boolean | false | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
214-
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress.
194+
| Config | Value Range | Default | Description |
195+
|-----------------------------|-------------------------------|-----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
196+
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
197+
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
198+
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
199+
| cps.host | String | "pubsub" | Set it to `emulator` to use Pub/Sub emulator. |
200+
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
201+
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. |
202+
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
203+
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
204+
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
205+
| maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
206+
| maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
207+
| maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shutdown when stopping task in Kafka Connect. |
208+
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used. |
209+
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
210+
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
211+
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
212+
| orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. Note that using "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
213+
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
214+
| enableCompression | Boolean | false | When true, enable [publish-side compression](https://cloud.google.com/pubsub/docs/publisher#compressing) in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
215+
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress.
215216

216217
### Pub/Sub Lite connector configs
217218

src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public class ConnectorUtils {
2929
public static final String CPS_TOPIC_CONFIG = "cps.topic";
3030
public static final String CPS_ENDPOINT = "cps.endpoint";
3131
public static final String CPS_DEFAULT_ENDPOINT = "pubsub.googleapis.com:443";
32+
public static final String CPS_HOST = "cps.host";
33+
public static final String CPS_DEFAULT_HOST = "pubsub";
3234
public static final String CPS_MESSAGE_KEY_ATTRIBUTE = "key";
3335
public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
3436
public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,13 @@ public ConfigDef config() {
273273
Type.STRING,
274274
ConnectorUtils.CPS_DEFAULT_ENDPOINT,
275275
Importance.LOW,
276-
"The Pub/Sub endpoint to use.");
276+
"The Pub/Sub endpoint to use.")
277+
.define(
278+
ConnectorUtils.CPS_HOST,
279+
Type.STRING,
280+
ConnectorUtils.CPS_DEFAULT_HOST,
281+
Importance.LOW,
282+
"The Pub/Sub cps host to use.");
277283
}
278284

279285
@Override

0 commit comments

Comments
 (0)