Skip to content

Commit 2aa7a18

Browse files
fix: Add alternative property to directly create ServiceAccount (#400)
* fix: Add alternative property to directly create ServiceAccount * chore: remove duplicated key definition * chore: lint * chore: rename sa property; add config to source-connector * chore: Add comments about both GCP Credential properties being set * chore: do not check for both SA json and SA file to be set * chore: improve option documentation --------- Co-authored-by: Diego Alonso Marquez Palacios <[email protected]>
1 parent 7534e69 commit 2aa7a18

File tree

7 files changed

+116
-12
lines changed

7 files changed

+116
-12
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,10 @@ configurations:
179179
| kafka.key.attribute | String | null | The Pub/Sub message attribute to use as a key for messages published to Kafka. If set to "orderingKey", use the message's ordering key. |
180180
| kafka.partition.count | Integer | 1 | The number of Kafka partitions for the Kafka topic in which messages will be published to. NOTE: this parameter is ignored if partition scheme is "kafka_partitioner". |
181181
| kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner, ordering_key | round_robin | The scheme for assigning a message to a partition in Kafka. The scheme "round_robin" assigns partitions in a round robin fashion, while the schemes "hash_key" and "hash_value" find the partition by hashing the message key and message value respectively. "kafka_partitioner" scheme delegates partitioning logic to Kafka producer, which by default detects number of partitions automatically and performs either murmur hash based partition mapping or round robin depending on whether message key is provided or not. "ordering_key" uses the hash code of a message's ordering key. If no ordering key is present, uses "round_robin". |
182-
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used.If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
183-
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
182+
| gcp.credentials.file.path | String | Optional | Use the `gcp.sa.credentials.file.path ` property instead due to a potential security risk. See https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. This method does not validate the credential configuration. The security risk occurs when a credential configuration is accepted from a source that is not under your control and used without validation on your side. |
183+
| gcp.credentials.json | String | Optional | Use the `gcp.sa.credentials.json` property property instead due to a potential security risk. See https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. This method does not validate the credential configuration. The security risk occurs when a credential configuration is accepted from a source that is not under your control and used without validation on your side. |
184+
| gcp.sa.credentials.file.path | String | Optional | The filepath, which stores a GCP Service Account credentials. If not defined, GOOGLE_APPLICATION_CREDENTIALS env is used. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
185+
| gcp.sa.credentials.json | String | Optional | GCP Service Account credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
184186
| kafka.record.headers | Boolean | false | Use Kafka record headers to store Pub/Sub message attributes. |
185187
| cps.streamingPull.enabled | Boolean | false | Whether to use streaming pull for the connector to connect to Pub/Sub. If provided, cps.maxBatchSize is ignored. |
186188
| cps.streamingPull.flowControlMessages | Long | 1,000 | The maximum number of outstanding messages per task when using streaming pull. |

src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.api.gax.core.CredentialsProvider;
1919
import com.google.auth.Credentials;
2020
import com.google.auth.oauth2.GoogleCredentials;
21+
import com.google.auth.oauth2.ServiceAccountCredentials;
2122
import java.io.ByteArrayInputStream;
2223
import java.io.FileInputStream;
2324
import java.io.IOException;
@@ -36,6 +37,18 @@ private ConnectorCredentialsProvider(CredentialsProvider impl) {
3637
}
3738

3839
public static ConnectorCredentialsProvider fromConfig(Map<String, Object> config) {
40+
// If both the `GCP_SA_CREDENTIALS_*` and `GCP_CREDENTIALS_FILE_*` properties are set,
41+
// give preference to the `GCP_SA_CREDENTIALS_*` variants.
42+
String credentialsSAPath =
43+
config.get(ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG).toString();
44+
String credentialsSAJson = config.get(ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG).toString();
45+
46+
if (!credentialsSAPath.isEmpty()) {
47+
return ConnectorCredentialsProvider.getServiceAccountFromFile(credentialsSAPath);
48+
} else if (!credentialsSAJson.isEmpty()) {
49+
return ConnectorCredentialsProvider.getServiceAccountFromJson(credentialsSAJson);
50+
}
51+
3952
String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString();
4053
String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString();
4154
if (!credentialsPath.isEmpty()) {
@@ -49,18 +62,49 @@ public static ConnectorCredentialsProvider fromConfig(Map<String, Object> config
4962
return ConnectorCredentialsProvider.fromFile(credentialsPath);
5063
} else if (!credentialsJson.isEmpty()) {
5164
return ConnectorCredentialsProvider.fromJson(credentialsJson);
52-
} else {
53-
return ConnectorCredentialsProvider.fromDefault();
5465
}
66+
67+
return ConnectorCredentialsProvider.fromDefault();
68+
}
69+
70+
public static ConnectorCredentialsProvider getServiceAccountFromFile(String credentialsSAPath) {
71+
return new ConnectorCredentialsProvider(
72+
() ->
73+
ServiceAccountCredentials.fromStream(new FileInputStream(credentialsSAPath))
74+
.createScoped(GCP_SCOPE));
75+
}
76+
77+
public static ConnectorCredentialsProvider getServiceAccountFromJson(String credentialsSAJson) {
78+
return new ConnectorCredentialsProvider(
79+
() ->
80+
ServiceAccountCredentials.fromStream(
81+
new ByteArrayInputStream(credentialsSAJson.getBytes()))
82+
.createScoped(GCP_SCOPE));
5583
}
5684

85+
/**
86+
* Prefer {@link #getServiceAccountFromFile(String)} instead due to a potential security risk. See
87+
* {@see <a
88+
* href="https://cloud.google.com/docs/authentication/external/externally-sourced-credentials">documentation</a>}
89+
* for more details. This method does not validate the credential configuration. The security risk
90+
* occurs when a credential configuration is accepted from a source that is not under your control
91+
* and used without validation on your side.
92+
*/
5793
public static ConnectorCredentialsProvider fromFile(String credentialPath) {
5894
return new ConnectorCredentialsProvider(
5995
() ->
6096
GoogleCredentials.fromStream(new FileInputStream(credentialPath))
6197
.createScoped(GCP_SCOPE));
6298
}
6399

100+
/**
101+
* Prefer {@link #getServiceAccountFromJson(String)} instead due to a potential security risk. See
102+
* {@see <a
103+
* href="https://cloud.google.com/docs/authentication/external/externally-sourced-credentials">documentation</a>}
104+
* for more details. This method does not validate the credential configuration. The security risk
105+
* occurs when a credential configuration is accepted from a source that is not under your control
106+
* and used without validation on your side.
107+
*/
64108
public static ConnectorCredentialsProvider fromJson(String credentialsJson) {
65109
return new ConnectorCredentialsProvider(
66110
() ->

src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class ConnectorUtils {
3333
public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
3434
public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";
3535
public static final String GCP_CREDENTIALS_JSON_CONFIG = "gcp.credentials.json";
36+
public static final String GCP_SA_CREDENTIALS_FILE_PATH_CONFIG = "gcp.sa.credentials.file.path";
37+
public static final String GCP_SA_CREDENTIALS_JSON_CONFIG = "gcp.sa.credentials.json";
3638
public static final String KAFKA_MESSAGE_CPS_BODY_FIELD = "message";
3739
public static final String KAFKA_TOPIC_ATTRIBUTE = "kafka.topic";
3840
public static final String KAFKA_PARTITION_ATTRIBUTE = "kafka.partition";

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,27 @@ public ConfigDef config() {
239239
Type.STRING,
240240
"",
241241
Importance.HIGH,
242-
"The path to the GCP credentials file")
242+
"Due to a potential security vulnerability, prefer "
243+
+ ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG)
244+
.define(
245+
ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG,
246+
Type.STRING,
247+
"",
248+
Importance.HIGH,
249+
"Path to the GCP Service Account (SA) JSON key file.")
243250
.define(
244251
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
245252
Type.STRING,
246253
"",
247254
Importance.HIGH,
248-
"GCP JSON credentials")
255+
"Due to a potential security vulnerability, prefer "
256+
+ ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG)
257+
.define(
258+
ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG,
259+
Type.STRING,
260+
"",
261+
Importance.HIGH,
262+
"The literal JSON content of the GCP Service Account (SA) key, passed as a string.")
249263
.define(
250264
ORDERING_KEY_SOURCE,
251265
Type.STRING,

src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,27 @@ public ConfigDef config() {
258258
Type.STRING,
259259
"",
260260
Importance.HIGH,
261-
"The path to the GCP credentials file")
261+
"Due to a potential security vulnerability, prefer "
262+
+ ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG)
263+
.define(
264+
ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG,
265+
Type.STRING,
266+
"",
267+
Importance.HIGH,
268+
"Path to the GCP Service Account (SA) JSON key file.")
262269
.define(
263270
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
264271
Type.STRING,
265272
"",
266273
Importance.HIGH,
267-
"GCP JSON credentials")
274+
"Due to a potential security vulnerability, prefer "
275+
+ ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG)
276+
.define(
277+
ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG,
278+
Type.STRING,
279+
"",
280+
Importance.HIGH,
281+
"The literal JSON content of the GCP Service Account (SA) key, passed as a string.")
268282
.define(
269283
USE_KAFKA_HEADERS,
270284
Type.BOOLEAN,

src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,26 @@ static ConfigDef config() {
5656
ConfigDef.Type.STRING,
5757
"",
5858
Importance.HIGH,
59-
"The path to the GCP credentials file")
59+
"Due to a potential security vulnerability, prefer "
60+
+ ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG)
6061
.define(
6162
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
6263
ConfigDef.Type.STRING,
6364
"",
6465
Importance.HIGH,
65-
"GCP JSON credentials");
66+
"Due to a potential security vulnerability, prefer "
67+
+ ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG)
68+
.define(
69+
ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG,
70+
ConfigDef.Type.STRING,
71+
"",
72+
Importance.HIGH,
73+
"Path to the GCP Service Account (SA) JSON key file.")
74+
.define(
75+
ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG,
76+
ConfigDef.Type.STRING,
77+
"",
78+
Importance.HIGH,
79+
"The literal JSON content of the GCP Service Account (SA) key, passed as a string.");
6680
}
6781
}

src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,26 @@ static ConfigDef config() {
7070
ConfigDef.Type.STRING,
7171
"",
7272
Importance.HIGH,
73-
"The path to the GCP credentials file")
73+
"Due to a potential security vulnerability, prefer "
74+
+ ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG)
7475
.define(
7576
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
7677
ConfigDef.Type.STRING,
7778
"",
7879
Importance.HIGH,
79-
"GCP JSON credentials");
80+
"Due to a potential security vulnerability, prefer "
81+
+ ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG)
82+
.define(
83+
ConnectorUtils.GCP_SA_CREDENTIALS_FILE_PATH_CONFIG,
84+
ConfigDef.Type.STRING,
85+
"",
86+
Importance.HIGH,
87+
"Path to the GCP Service Account (SA) JSON key file.")
88+
.define(
89+
ConnectorUtils.GCP_SA_CREDENTIALS_JSON_CONFIG,
90+
ConfigDef.Type.STRING,
91+
"",
92+
Importance.HIGH,
93+
"The literal JSON content of the GCP Service Account (SA) key, passed as a string.");
8094
}
8195
}

0 commit comments

Comments
 (0)