Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] JWT bearer grant type support #18912

Draft
wants to merge 36 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
defadbe
First pass at refactoring
kirktrue Feb 14, 2025
9f2b079
More refactoring
kirktrue Feb 14, 2025
06119da
Updates
kirktrue Feb 15, 2025
7aceaa5
Update AuthenticateCallbackHandler.java
kirktrue Feb 15, 2025
0d7554b
Update AuthenticateCallbackHandler.java
kirktrue Feb 15, 2025
c410fc3
Update AuthenticateCallbackHandler.java
kirktrue Feb 15, 2025
62d96f7
Update AuthenticateCallbackHandler.java
kirktrue Feb 15, 2025
4102c20
Moving things around more
kirktrue Feb 15, 2025
56ed3a9
Updates
kirktrue Feb 15, 2025
04016ed
More updates
kirktrue Feb 15, 2025
7b04655
More updates
kirktrue Feb 15, 2025
bbebbce
Moved internals back to internals for now
kirktrue Feb 18, 2025
88d187d
Moved more code back to internals
kirktrue Feb 18, 2025
c16eaaf
Fixed refresh tests
kirktrue Feb 19, 2025
3ffbb13
Fixed the remaining broken unit test
kirktrue Feb 19, 2025
8a18ef1
First pass at incorporating Zach's JWT bearer code
kirktrue Feb 19, 2025
9026358
First pass at hooking the JWT bearer retriever into the rest of the code
kirktrue Feb 19, 2025
c58d27e
Reverted FileAccessTokenRetriever name change
kirktrue Feb 19, 2025
4939c8a
Rename to revert to original code
kirktrue Feb 19, 2025
0ff639b
More refactoring
kirktrue Feb 19, 2025
15582d0
Refactoring
kirktrue Feb 19, 2025
58ea79f
Clean up of Javadoc
kirktrue Feb 19, 2025
bb5f1c0
Updated formatting
kirktrue Feb 19, 2025
a88b553
Incorporating jwt-bearer configuration and JAAS options
kirktrue Feb 19, 2025
247a75d
More refactoring
kirktrue Feb 19, 2025
038343a
More refactoring
kirktrue Feb 19, 2025
14c8746
spotlessApply fixups
kirktrue Feb 20, 2025
f0113a1
Fixed out-of-order final static and allowing Jackson annotations
kirktrue Feb 20, 2025
a6db62c
The great refactoring of OAuthCompatibilityTool
kirktrue Feb 20, 2025
a7c31a5
Update AccessTokenRetriever.java
kirktrue Feb 20, 2025
07dfaee
Update ValidatorAccessTokenValidator.java
kirktrue Feb 20, 2025
a65fbc1
Merge branch 'apache:trunk' into KAFKA-18573-add-jwt-bearer-grant-type
kirktrue Feb 21, 2025
df66e1c
Update AccessTokenRetriever.java
kirktrue Feb 21, 2025
cf1abbf
Renamed ValidateException to InvalidJwtException
kirktrue Feb 21, 2025
7f62a08
Minor refactoring of class and method names
kirktrue Feb 21, 2025
9d460e7
Revised structure to support request formatters
kirktrue Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
<allow pkg="javax.crypto" />
</subpackage>
<subpackage name="oauthbearer">
<allow pkg="com.fasterxml.jackson.annotation" />
<allow pkg="com.fasterxml.jackson.databind" />
<allow pkg="org.jose4j" />
</subpackage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.common.config.ConfigDef.Range;

import java.util.List;

public class SaslConfigs {

private static final String OAUTHBEARER_NOTE = " Currently applies only to OAUTHBEARER.";
Expand Down Expand Up @@ -129,6 +131,16 @@ public class SaslConfigs {
+ " authentication provider."
+ LOGIN_EXPONENTIAL_BACKOFF_NOTE;











public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "sasl.oauthbearer.scope.claim.name";
public static final String DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "scope";
public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC = "The OAuth claim for the scope is often named \"" + DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME + "\", but this (optional)"
Expand All @@ -141,6 +153,16 @@ public class SaslConfigs {
+ " setting can provide a different name to use for the subject included in the JWT payload's claims if the OAuth/OIDC provider uses a different"
+ " name for that claim.";

public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPE = "sasl.oauthbearer.token.endpoint.grant.type";
public static final String DEFAULT_SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPE = "client_credentials";
public static final List<String> SUPPORTED_SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPES = List.of(
"client_credentials",
"urn:ietf:params:oauth:grant-type:jwt-bearer"
);
public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPE_DOC = "The grant type used when sending the JWT token to the token endpoint. "
+ "This should be set explicitly to determine which token retriever to use. The supported values are "
+ SUPPORTED_SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPES;

public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url";
public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC = "The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer's token"
+ " endpoint URL to which requests will be made to login based on the configuration in " + SASL_JAAS_CONFIG + ". If the URL is file-based, it"
Expand Down Expand Up @@ -217,6 +239,7 @@ public static void addClientSaslSupport(ConfigDef config) {
.define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS, ConfigDef.Type.LONG, DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS, ConfigDef.Importance.LOW, SASL_LOGIN_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, ConfigDef.Type.STRING, DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, ConfigDef.Type.STRING, DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPE, ConfigDef.Type.STRING, DEFAULT_SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPE, ConfigDef.Importance.MEDIUM, SASL_OAUTHBEARER_TOKEN_ENDPOINT_GRANT_TYPE_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.security.oauthbearer;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenRetriever;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenRetrieverFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidatorFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.DefaultAccessTokenRetriever;
import org.apache.kafka.common.security.oauthbearer.internals.secured.DefaultAccessTokenValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.InvalidJwtException;
import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
import org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
Expand All @@ -44,15 +47,20 @@
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.sasl.SaslException;

import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;

/**
* <p>
* <code>OAuthBearerLoginCallbackHandler</code> is an {@link AuthenticateCallbackHandler} that
* accepts {@link OAuthBearerTokenCallback} and {@link SaslExtensionsCallback} callbacks to
* perform the steps to request a JWT from an OAuth/OIDC provider using the
* <code>clientcredentials</code>. This grant type is commonly used for non-interactive
* "service accounts" where there is no user available to interactively supply credentials.
* perform the steps to request a JWT from an OAuth/OIDC provider. The OAuth grant types that
* are supported include:
*
* <ul>
* <li>client_credentials</li>
* <li>jwt-bearer</li>
* </ul>
*
* These grant types are commonly used for non-interactive "service accounts" where there is
* no user available to interactively supply credentials.
* </p>
*
* <p>
Expand Down Expand Up @@ -91,24 +99,40 @@
* </p>
*
* <p>
* The Kafka configuration must also include JAAS configuration which includes the following
* OAuth-specific options:
* The Kafka configuration must also include JAAS configuration which includes OAuth-specific options.
* For <code>client_credentials</code>, use:
*
* <ul>
* <li><code>clientId</code>OAuth client ID (required)</li>
* <li><code>clientSecret</code>OAuth client secret (required)</li>
* <li><code>scope</code>OAuth scope (optional)</li>
* </ul>
*
* For the <code>jwt-bearer</code> grant type, use:
*
* <ul>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* <li><code>XXXXXXXXXXXX</code>XXXXXXXXXXXX</li>
* </ul>
* </p>
*
* <p>
* The JAAS configuration can also include any SSL options that are needed. The configuration
* options are the same as those specified by the configuration in
* {@link org.apache.kafka.common.config.SslConfigs#addClientSslSupport(ConfigDef)}.
* {@link SslConfigs#addClientSslSupport(ConfigDef)}.
* </p>
*
* <p>
* Here's an example of the JAAS configuration for a Kafka client:
* Here's an example of the JAAS configuration for a Kafka client using the
* <code>client_credentials</code> grant type:
*
* <code>
* sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
Expand All @@ -120,15 +144,14 @@
* </p>
*
* <p>
* The configuration option
* {@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}
* The configuration option {@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}
* is also required in order for the client to contact the OAuth/OIDC provider. For example:
*
* <code>
* sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
* </code>
*
* Please see the OAuth/OIDC providers documentation for the token endpoint URL.
* Please see the OAuth/OIDC provider's documentation for the token endpoint URL.
* </p>
*
* <p>
Expand All @@ -148,81 +171,55 @@
* </ul>
* </p>
*/

public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler {
public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler, Closeable {

private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);

public static final String CLIENT_ID_CONFIG = "clientId";
public static final String CLIENT_SECRET_CONFIG = "clientSecret";
public static final String SCOPE_CONFIG = "scope";

public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " +
"client ID to uniquely identify the service account to use for authentication for " +
"this client. The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " +
"value and is provided to the OAuth provider using the OAuth " +
"clientcredentials grant type.";

public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " +
"client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " +
"account and identifies the service account to use for authentication for " +
"this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " +
"value and is provided to the OAuth provider using the OAuth " +
"clientcredentials grant type.";

public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " +
"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " +
"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " +
"include with the login request.";

private static final String EXTENSION_PREFIX = "extension_";

private Map<String, Object> moduleOptions;

private AccessTokenRetriever accessTokenRetriever;

private AccessTokenValidator accessTokenValidator;

private boolean isInitialized = false;

protected AccessTokenRetriever accessTokenRetriever;

@Override
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, saslMechanism, moduleOptions);
AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism);
init(accessTokenRetriever, accessTokenValidator);
}

public void init(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator accessTokenValidator) {
this.accessTokenRetriever = accessTokenRetriever;
this.accessTokenValidator = accessTokenValidator;

try {
this.accessTokenRetriever.init();
} catch (IOException e) {
throw new KafkaException("The OAuth login configuration encountered an error when initializing the AccessTokenRetriever", e);
configure(
new DefaultAccessTokenRetriever(),
new DefaultAccessTokenValidator(),
configs,
saslMechanism,
jaasConfigEntries
);
} catch (Throwable t) {
throw new KafkaException("The OAuth login configuration encountered an error during initialization", t);
}

isInitialized = true;
}

/*
* Package-visible for testing.
*/
void configure(AccessTokenRetriever accessTokenRetriever,
AccessTokenValidator accessTokenValidator,
Map<String, ?> configs,
String saslMechanism,
List<AppConfigurationEntry> jaasConfigEntries) {
this.accessTokenRetriever = accessTokenRetriever;
this.accessTokenValidator = accessTokenValidator;

this.accessTokenRetriever.configure(configs, saslMechanism, jaasConfigEntries);
this.accessTokenValidator.configure(configs, saslMechanism, jaasConfigEntries);

AccessTokenRetriever getAccessTokenRetriever() {
return accessTokenRetriever;
this.isInitialized = true;
}

@Override
public void close() {
if (accessTokenRetriever != null) {
try {
this.accessTokenRetriever.close();
} catch (IOException e) {
log.warn("The OAuth login configuration encountered an error when closing the AccessTokenRetriever", e);
}
}
Utils.closeQuietly(accessTokenRetriever, "accessTokenRetriever");
Utils.closeQuietly(accessTokenValidator, "accessTokenValidator");
}

@Override
Expand All @@ -240,20 +237,20 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
}
}

private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
protected void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
checkInitialized();
String accessToken = accessTokenRetriever.retrieve();

try {
OAuthBearerToken token = accessTokenValidator.validate(accessToken);
callback.token(token);
} catch (ValidateException e) {
} catch (InvalidJwtException e) {
log.warn(e.getMessage(), e);
callback.error("invalid_token", e.getMessage(), null);
}
}

private void handleExtensionsCallback(SaslExtensionsCallback callback) {
protected void handleExtensionsCallback(SaslExtensionsCallback callback) {
checkInitialized();

Map<String, String> extensions = new HashMap<>();
Expand Down Expand Up @@ -286,9 +283,8 @@ private void handleExtensionsCallback(SaslExtensionsCallback callback) {
callback.extensions(saslExtensions);
}

private void checkInitialized() {
protected void checkInitialized() {
if (!isInitialized)
throw new IllegalStateException(String.format("To use %s, first call the configure or init method", getClass().getSimpleName()));
throw new IllegalStateException(String.format("To use %s, first call the configure method", getClass().getSimpleName()));
}

}
Loading