From a7f26889cd8a2a5f0027d087a0eb0891181ffa2d Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Mon, 9 Sep 2024 10:19:34 +0800 Subject: [PATCH] Implement AuthenticationProviderMTls (#1441) --- .../oidc/broker/common/OIDCConstants.java | 25 ++ .../oidc/broker/common/OIDCPoolResources.java | 98 ++++++ .../oidc/broker/common/package-info.java | 15 + .../oidc/broker/common/pojo/Pool.java | 49 +++ .../oidc/broker/common/utils/Paths.java | 32 ++ .../AuthenticationProviderMTls.java | 322 ++++++++++++++++++ .../mqtt/identitypool/ExpressionCompiler.java | 83 +++++ .../mqtt/identitypool/package-info.java | 14 + .../AuthenticationProviderMTlsTest.java | 244 +++++++++++++ .../src/test/resources/mtls/cel-test.txt | 1 + .../src/test/resources/mtls/client-cert.pem | 72 ++++ mqtt-impl/src/test/resources/mtls/client.cer | 18 + pom.xml | 9 + 13 files changed, 982 insertions(+) create mode 100644 mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCConstants.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCPoolResources.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/package-info.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/pojo/Pool.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/utils/Paths.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java create mode 100644 mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTlsTest.java create mode 100644 mqtt-impl/src/test/resources/mtls/cel-test.txt create mode 100644 mqtt-impl/src/test/resources/mtls/client-cert.pem create mode 100644 mqtt-impl/src/test/resources/mtls/client.cer diff --git a/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCConstants.java b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCConstants.java new file mode 100644 index 000000000..18280f727 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCConstants.java @@ -0,0 +1,25 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oidc.broker.common; + +/** + * Constant values related to Apache Pulsar broker OIDC options. + */ +public final class OIDCConstants { + + /** + * Timeout value, in seconds, for metadata resource synchronization operations. + */ + public static final int RESOURCE_SYNC_OPERATION_TIMEOUT_SEC = 30; +} diff --git a/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCPoolResources.java b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCPoolResources.java new file mode 100644 index 000000000..43355d725 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/OIDCPoolResources.java @@ -0,0 +1,98 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oidc.broker.common; + +import static io.streamnative.oidc.broker.common.OIDCConstants.RESOURCE_SYNC_OPERATION_TIMEOUT_SEC; +import com.fasterxml.jackson.core.type.TypeReference; +import io.streamnative.oidc.broker.common.pojo.Pool; +import io.streamnative.oidc.broker.common.utils.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import javax.validation.constraints.NotNull; +import org.apache.pulsar.broker.resources.BaseResources; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreException; + + +@SuppressWarnings("UnstableApiUsage") +public final class OIDCPoolResources extends BaseResources { + private static final String BASE_PATH = "/sn-oidc/pools"; + + public OIDCPoolResources(@NotNull MetadataStore metadataStore) { + super(metadataStore, new TypeReference<>() { }, RESOURCE_SYNC_OPERATION_TIMEOUT_SEC); + } + + public @NotNull Optional getPool(@NotNull String poolName) throws MetadataStoreException { + return get(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName))); + } + + public @NotNull CompletableFuture> getPoolAsync(@NotNull String poolName) { + return getAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName))); + } + + public void createPool(@NotNull Pool pool) throws MetadataStoreException { + create(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), pool); + } + + public @NotNull CompletableFuture createPoolAsync(@NotNull Pool pool) { + return createAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), pool); + } + + public @NotNull CompletableFuture existsAsync(@NotNull String poolName) { + return super.existsAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName))); + } + + public void deletePool(@NotNull String poolName) throws MetadataStoreException { + super.delete(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName))); + } + + public @NotNull CompletableFuture deletePoolAsync(@NotNull String poolName) { + return super.deleteIfExistsAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(poolName))); + } + + public @NotNull CompletableFuture updatePoolAsync(@NotNull Pool pool) { + return super.setAsync(joinPath(BASE_PATH, Paths.getUrlEncodedPath(pool.name())), __ -> pool); + } + + public @NotNull CompletableFuture> listPoolNamesAsync() { + return super.getChildrenAsync(joinPath(BASE_PATH)); + } + + public @NotNull CompletableFuture> listPoolsAsync() { + return super.getChildrenAsync(joinPath(BASE_PATH)) + .thenCompose(poolNames -> { + List>> pools = new ArrayList<>(); + for (String name : poolNames) { + pools.add(getAsync(joinPath(BASE_PATH, name))); + } + return FutureUtil.waitForAll(pools) + .thenApply(__ -> pools.stream().map(f -> f.join()) + .filter(f -> f.isPresent()) + .map(f -> f.get()) + .collect(Collectors.toList())); + }); + } + + public static boolean pathIsFromPool(String path) { + return path.startsWith(BASE_PATH + "/"); + } + + public static String poolFromPath(String path) { + return path.substring(BASE_PATH.length() + 1); + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/package-info.java b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/package-info.java new file mode 100644 index 000000000..a633c9ac5 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/package-info.java @@ -0,0 +1,15 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Copy from sn-pulsar-plugins, only used to compile stage +package io.streamnative.oidc.broker.common; \ No newline at end of file diff --git a/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/pojo/Pool.java b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/pojo/Pool.java new file mode 100644 index 000000000..f51246dec --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/pojo/Pool.java @@ -0,0 +1,49 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oidc.broker.common.pojo; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; +import javax.validation.constraints.NotNull; + +public record Pool(@JsonProperty(value = "name", required = true) @NotNull String name, + @JsonProperty(value = "auth_type", defaultValue = AUTH_TYPE_TOKEN) @NotNull String authType, + @JsonProperty(value = "description", required = true) @NotNull String description, + @JsonProperty(value = "provider_name") @NotNull String providerName, + @JsonProperty(value = "expression", required = true) @NotNull String expression) { + + public static final String AUTH_TYPE_TOKEN = "token"; + public static final String AUTH_TYPE_MTLS = "mtls"; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Pool pool = (Pool) o; + return Objects.equals(name, pool.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + public String authType() { + return (authType == null || authType.isEmpty()) ? AUTH_TYPE_TOKEN : authType; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/utils/Paths.java b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/utils/Paths.java new file mode 100644 index 000000000..79ca57741 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/oidc/broker/common/utils/Paths.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oidc.broker.common.utils; + +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import javax.validation.constraints.NotNull; +import lombok.experimental.UtilityClass; + +@UtilityClass +public final class Paths { + + public String getUrlEncodedPath(@NotNull String name) { + return URLEncoder.encode(name, StandardCharsets.UTF_8); + } + + public String getUrlDecodedPath(@NotNull String name) { + return URLDecoder.decode(name, StandardCharsets.UTF_8); + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java new file mode 100644 index 000000000..906f9d8d6 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java @@ -0,0 +1,322 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.identitypool; + + +import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN; +import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS; +import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN; +import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SHA1; +import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SNID; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import io.streamnative.oidc.broker.common.OIDCPoolResources; +import io.streamnative.oidc.broker.common.pojo.Pool; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.naming.AuthenticationException; +import javax.naming.InvalidNameException; +import javax.naming.ldap.LdapName; +import javax.naming.ldap.Rdn; +import javax.validation.constraints.NotNull; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; + +@Slf4j +public class AuthenticationProviderMTls implements AuthenticationProvider { + static final char PRINCIPAL_JOINER = (char) 27; + static final String AUTH_METHOD_MTLS = "mtls"; + + private MetadataStore metadataStore; + @Getter + @VisibleForTesting + private OIDCPoolResources poolResources; + + @Getter + @VisibleForTesting + private final ConcurrentHashMap poolMap = new ConcurrentHashMap<>(); + private boolean needCloseMetaData = false; + + private enum ErrorCode { + UNKNOWN, + INVALID_CERTS, + INVALID_DN, + INVALID_SAN, + NO_MATCH_POOL; + + ErrorCode() { + } + } + + @Override + public void initialize(ServiceConfiguration config) throws IOException { + this.metadataStore = createLocalMetadataStore(config); + this.needCloseMetaData = true; + this.metadataStore.registerListener(this::handleMetadataChanges); + this.poolResources = new OIDCPoolResources(metadataStore); + this.loadAsync(); + } + + public void initialize(MetadataStore metadataStore) { + this.metadataStore = metadataStore; + this.metadataStore.registerListener(this::handleMetadataChanges); + this.poolResources = new OIDCPoolResources(metadataStore); + this.loadAsync(); + } + + public MetadataStoreExtended createLocalMetadataStore(ServiceConfiguration config) throws + MetadataStoreException { + return MetadataStoreExtended.create(config.getMetadataStoreUrl(), + MetadataStoreConfig.builder() + .sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis()) + .allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations()) + .configFilePath(config.getMetadataStoreConfigPath()) + .batchingEnabled(config.isMetadataStoreBatchingEnabled()) + .batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis()) + .batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations()) + .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb()) + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) + .build()); + } + + private CompletableFuture loadAsync() { + return loadPoolsAsync().thenAccept(__ -> log.info("loaded streamnative mtls authentication identitypools")); + } + + private CompletableFuture loadPoolsAsync() { + return doLoadPools(poolResources.listPoolsAsync()); + } + + private CompletableFuture doLoadPools(CompletableFuture> future) { + return future.thenApply(pools -> { + pools.stream().forEach(p -> { + if (p.authType().equals(AUTH_METHOD_MTLS)) { + try { + poolMap.put(p.name(), new ExpressionCompiler(p.expression())); + } catch (Exception e) { + log.error("Failed to compile expression for pool: {}, expression: {}", + p.name(), p.expression(), e); + } + } + }); + + return poolMap.keySet(); + }).thenAccept(poolsNames -> log.info("refreshed mTls identity pools, pools: {}", poolsNames)) + .exceptionally(ex -> { + log.error("load pool error", ex); + return null; + }); + } + + private CompletableFuture loadPoolAsync(String pool) { + final CompletableFuture> pools = poolResources.getPoolAsync(pool).thenCompose(opt -> { + if (opt.isEmpty()) { + return CompletableFuture.completedFuture(new ArrayList<>()); + } + return CompletableFuture.completedFuture(List.of(opt.get())); + }); + return doLoadPools(pools); + } + + private void handleMetadataChanges(Notification n) { + if (OIDCPoolResources.pathIsFromPool(n.getPath())) { + log.info("pool-handleMetadataChanges : {}", n); + handlePoolAsync(n); + } + } + + private void handlePoolAsync(Notification n) { + String pool = OIDCPoolResources.poolFromPath(n.getPath()); + if (NotificationType.Created == n.getType() || NotificationType.Modified == n.getType()) { + loadPoolAsync(pool); + } else if (NotificationType.Deleted == n.getType()) { + deletePool(pool); + } + } + + private void deletePool(String pool) { + poolMap.remove(pool); + } + + @Override + public String getAuthMethodName() { + return AUTH_METHOD_MTLS; + } + + @Override + public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + ErrorCode errorCode = ErrorCode.UNKNOWN; + + try { + if (!authData.hasDataFromTls()) { + errorCode = ErrorCode.INVALID_CERTS; + throw new AuthenticationException("Failed to get TLS certificates from client"); + } + + Certificate[] certs = authData.getTlsCertificates(); + if (null == certs || certs.length == 0) { + errorCode = ErrorCode.INVALID_CERTS; + throw new AuthenticationException("Failed to get TLS certificates from client"); + } + + // Parse CEL params form client certificate, refer to: + // https://docs.confluent.io/cloud/current/security/authenticate/ + // workload-identities/identity-providers/mtls/cel-filters.html + final X509Certificate certificate = (X509Certificate) certs[0]; + + // parse DN + Map params; + try { + String subject = certificate.getSubjectX500Principal().getName(); + params = parseDN(subject); + } catch (Exception e) { + errorCode = ErrorCode.INVALID_DN; + throw new AuthenticationException("Failed to parse the DN from the client certificate"); + } + + // parse SAN + parseSAN(certificate, params); + // get SNID + params.put(SNID, certificate.getSerialNumber().toString(16).toUpperCase()); + // parse SHA1 + params.put(SHA1, parseSHA1FingerPrint(certificate)); + + String principal = matchPool(params); + if (principal.isEmpty()) { + errorCode = ErrorCode.NO_MATCH_POOL; + throw new AuthenticationException("No matched identity pool from the client certificate"); + } + AuthenticationMetrics.authenticateSuccess(this.getClass().getSimpleName(), this.getAuthMethodName()); + return principal; + } catch (AuthenticationException e) { + this.incrementFailureMetric(errorCode); + throw e; + } + } + + public String matchPool(Map params) throws AuthenticationException { + List principals = new ArrayList<>(); + poolMap.forEach((poolName, compiler) -> { + Boolean matched = false; + try { + matched = compiler.eval(params); + } catch (Exception e) { + log.warn("Failed to evaluate expression, eval : {} value : {}", compiler.getExpression(), params, e); + } + if (matched) { + principals.add(poolName); + } + }); + return Joiner.on(PRINCIPAL_JOINER).join(principals); + } + + + @Override + public void close() throws IOException { + // no op + if (needCloseMetaData) { + try { + metadataStore.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + static String parseSHA1FingerPrint(X509Certificate certificate) { + try { + byte[] certBytes = certificate.getEncoded(); + + MessageDigest md = MessageDigest.getInstance("SHA-1"); + byte[] sha1hash = md.digest(certBytes); + + return Hex.encodeHexString(sha1hash, false); + } catch (Exception e) { + log.error("Failed to parse the SHA-1 fingerprint from the client certificate", e); + return ""; + } + } + + static Map parseDN(String dn) throws InvalidNameException { + Map params = new HashMap<>(); + if (StringUtils.isEmpty(dn)) { + return params; + } + params.put(DN, dn); + LdapName ldapName = new LdapName(dn); + for (Rdn rdn : ldapName.getRdns()) { + String rdnType = rdn.getType().toUpperCase(); + if (DN_KEYS.contains(rdnType)) { + String value = Rdn.escapeValue(rdn.getValue()); + value = value.replace("\r", "\\0D"); + value = value.replace("\n", "\\0A"); + params.put(rdnType, value); + } + } + + return params; + } + + static void parseSAN(X509Certificate certificate, @NotNull Map map) { + try { + Collection> subjectAlternativeNames = certificate.getSubjectAlternativeNames(); + if (subjectAlternativeNames != null) { + List formattedSANList = subjectAlternativeNames.stream() + .map(list -> getSanName((int) list.get(0)) + ":" + list.get(1)) + .collect(Collectors.toList()); + String formattedSAN = String.join(",", formattedSANList); + map.put(SAN, formattedSAN); + } + } catch (Exception e) { + log.error("Failed to parse the SAN from the client certificate, skip parse SAN", e); + } + } + + private static String getSanName(int type) { + return switch (type) { + case 0 -> "OTHERNAME"; + case 1 -> "EMAIL"; + case 2 -> "DNS"; + case 3 -> "X400"; + case 4 -> "DIR"; + case 5 -> "EDIPARTY"; + case 6 -> "URI"; + case 7 -> "IP"; + case 8 -> "RID"; + default -> "OTHERNAME"; + }; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java new file mode 100644 index 000000000..8f6fd6c7f --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java @@ -0,0 +1,83 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.identitypool; + + +import dev.cel.common.CelAbstractSyntaxTree; +import dev.cel.common.CelValidationException; +import dev.cel.common.types.SimpleType; +import dev.cel.compiler.CelCompiler; +import dev.cel.compiler.CelCompilerBuilder; +import dev.cel.compiler.CelCompilerFactory; +import dev.cel.parser.CelStandardMacro; +import dev.cel.runtime.CelEvaluationException; +import dev.cel.runtime.CelRuntime; +import dev.cel.runtime.CelRuntimeFactory; +import java.util.Map; +import java.util.Set; +import lombok.Getter; + +public class ExpressionCompiler { + + static final Set DN_KEYS = Set.of("DC", "CN", "OU", "O", "L", "ST", "C", "UID"); + static final String DN = "DN"; + static final String SAN = "SAN"; + static final String SNID = "SNID"; + static final String SHA1 = "SHA1"; + static final CelCompiler COMPILER; + + static { + CelCompilerBuilder celCompilerBuilder = CelCompilerFactory.standardCelCompilerBuilder() + .setStandardMacros(CelStandardMacro.STANDARD_MACROS); + celCompilerBuilder.addVar(DN, SimpleType.STRING); + for (String key : DN_KEYS) { + celCompilerBuilder.addVar(key, SimpleType.STRING); + } + celCompilerBuilder.addVar(SAN, SimpleType.STRING); + celCompilerBuilder.addVar(SNID, SimpleType.STRING); + celCompilerBuilder.addVar(SHA1, SimpleType.STRING); + COMPILER = celCompilerBuilder.build(); + } + + final CelRuntime runtime = CelRuntimeFactory.standardCelRuntimeBuilder().build(); + + @Getter + private final String expression; + + private CelAbstractSyntaxTree ast; + + private CelRuntime.Program program; + + public ExpressionCompiler(String expression) { + this.expression = expression; + try { + this.compile(); + } catch (CelValidationException | CelEvaluationException ex) { + throw new IllegalArgumentException(ex); + } + } + + private void compile() throws CelValidationException, CelEvaluationException { + this.ast = COMPILER.compile(expression).getAst(); + this.program = runtime.createProgram(ast); + } + + public Boolean eval(Map mapValue) throws Exception { + final Object eval = program.eval(mapValue); + if (eval instanceof Boolean) { + return (Boolean) eval; + } + return false; + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java new file mode 100644 index 000000000..34f624264 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java @@ -0,0 +1,14 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.identitypool; \ No newline at end of file diff --git a/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTlsTest.java b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTlsTest.java new file mode 100644 index 000000000..133f7e47d --- /dev/null +++ b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTlsTest.java @@ -0,0 +1,244 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.identitypool; + +import io.netty.channel.local.LocalAddress; +import io.streamnative.oidc.broker.common.pojo.Pool; +import java.io.File; +import java.net.URL; +import java.security.Principal; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.Map; +import java.util.Set; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSessionContext; +import org.apache.commons.io.FileUtils; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; +import org.apache.pulsar.common.util.SecurityUtility; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class AuthenticationProviderMTlsTest { + + private static final String SUPER_USER = "superUser"; + private static final String CLUSTER = "mtls-test"; + + private ServiceConfiguration serviceConfiguration; + private LocalMemoryMetadataStore metadataStore; + + @SuppressWarnings("UnstableApiUsage") + @BeforeClass + public void setup() throws Exception { + this.metadataStore = new LocalMemoryMetadataStore("memory:local", MetadataStoreConfig.builder().build()); + this.serviceConfiguration = new ServiceConfiguration(); + this.serviceConfiguration.setClusterName(CLUSTER); + this.serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER)); + this.serviceConfiguration.setMetadataStoreUrl("memory:local"); + } + + @AfterClass + public void cleanup() throws Exception { + this.metadataStore.close(); + } + + @DataProvider + public Object[][] reuseMetadata() { + return new Object[][] { + {true}, + {false} + }; + } + + @Test + public void testExpression() throws Exception { + String dn = FileUtils.readFileToString(new File(getResourcePath("mtls/cel-test.txt")), "UTF-8"); + + Map params = AuthenticationProviderMTls.parseDN(dn); + + ExpressionCompiler compiler = new ExpressionCompiler("DN.contains(\"CN=streamnative.io\")"); + Boolean eval = compiler.eval(params); + Assert.assertTrue(eval); + + compiler = new ExpressionCompiler("O.contains(\"StreamNative\")"); + eval = compiler.eval(params); + Assert.assertTrue(eval); + + compiler = new ExpressionCompiler("O==r\"StreamNative\\, Inc.\""); + eval = compiler.eval(params); + Assert.assertTrue(eval); + + System.out.println(params.get("ST")); + compiler = new ExpressionCompiler("ST==r\"California\\0DAfter\""); + eval = compiler.eval(params); + Assert.assertTrue(eval); + } + + @Test(dataProvider = "reuseMetadata") + public void testAuthenticationProviderMTls(boolean reuseMetadata) throws Exception { + AuthenticationProviderMTls authenticationProvider = new AuthenticationProviderMTls(); + if (reuseMetadata) { + authenticationProvider.initialize(this.serviceConfiguration); + } else { + authenticationProvider.initialize(this.metadataStore); + } + + String poolName = "test-pool"; + + String sha1 = "c6deb54faffe854191ccb33ae44b9471c3a849a8".toUpperCase(); + String serialNumber = "61E61B07906A4FF7CD46B9591D3E1C390DF25E01"; + + Pool pool = new Pool(poolName, Pool.AUTH_TYPE_MTLS, "this a test mtls type pool", null, + "DN.contains(\"C=US\") && OU=='Apache Pulsar' && SAN.contains(\"IP:127.0.0.1\") && SHA1=='" + sha1 + "' && " + + "SNID=='" + serialNumber + "'"); + + authenticationProvider.getPoolResources().createPool(pool); + + Awaitility.await().until(() -> authenticationProvider.getPoolMap().size() == 1); + + X509Certificate[] x509Certificates = + SecurityUtility.loadCertificatesFromPemFile(getResourcePath("mtls/client-cert.pem")); + + SSLSession sslSession = new MockSSLSession(x509Certificates); + AuthenticationDataCommand authData = new AuthenticationDataCommand("", LocalAddress.ANY, sslSession); + String principal = authenticationProvider.authenticate(authData); + Assert.assertEquals(principal, poolName); + authenticationProvider.close(); + } + + private String getResourcePath(String path) { + // get resource directory path + URL resource = this.getClass().getClassLoader().getResource(path); + if (resource == null) { + throw new RuntimeException("Resource not found: " + path); + } + return resource.getPath(); + } + + static class MockSSLSession implements SSLSession { + private final Certificate[] peerCertificates; + + public MockSSLSession(Certificate[] peerCertificates) { + this.peerCertificates = peerCertificates; + } + + @Override + public byte[] getId() { + return new byte[0]; + } + + @Override + public SSLSessionContext getSessionContext() { + return null; + } + + @Override + public long getCreationTime() { + return 0; + } + + @Override + public long getLastAccessedTime() { + return 0; + } + + @Override + public void invalidate() { + + } + + @Override + public boolean isValid() { + return false; + } + + @Override + public void putValue(String name, Object value) { + + } + + @Override + public Object getValue(String name) { + return null; + } + + @Override + public void removeValue(String name) { + + } + + @Override + public String[] getValueNames() { + return new String[0]; + } + + @Override + public Certificate[] getPeerCertificates() throws SSLPeerUnverifiedException { + return peerCertificates; + } + + @Override + public Certificate[] getLocalCertificates() { + return new Certificate[0]; + } + + @Override + public Principal getPeerPrincipal() throws SSLPeerUnverifiedException { + return null; + } + + @Override + public Principal getLocalPrincipal() { + return null; + } + + @Override + public String getCipherSuite() { + return ""; + } + + @Override + public String getProtocol() { + return ""; + } + + @Override + public String getPeerHost() { + return ""; + } + + @Override + public int getPeerPort() { + return 0; + } + + @Override + public int getPacketBufferSize() { + return 0; + } + + @Override + public int getApplicationBufferSize() { + return 0; + } + } +} diff --git a/mqtt-impl/src/test/resources/mtls/cel-test.txt b/mqtt-impl/src/test/resources/mtls/cel-test.txt new file mode 100644 index 000000000..107d2f1a3 --- /dev/null +++ b/mqtt-impl/src/test/resources/mtls/cel-test.txt @@ -0,0 +1 @@ +CN=streamnative.io,OU=Pulsar\, Cloud,O=StreamNative\, Inc.,L=Mountain View\, 899 W Evelyn Ave,ST=California\0DAfter,C=US \ No newline at end of file diff --git a/mqtt-impl/src/test/resources/mtls/client-cert.pem b/mqtt-impl/src/test/resources/mtls/client-cert.pem new file mode 100644 index 000000000..192d68624 --- /dev/null +++ b/mqtt-impl/src/test/resources/mtls/client-cert.pem @@ -0,0 +1,72 @@ +Certificate: + Data: + Version: 3 (0x2) + Serial Number: + 61:e6:1b:07:90:6a:4f:f7:cd:46:b9:59:1d:3e:1c:39:0d:f2:5e:01 + Signature Algorithm: sha256WithRSAEncryption + Issuer: CN = CARoot + Validity + Not Before: May 30 13:38:24 2022 GMT + Not After : May 27 13:38:24 2032 GMT + Subject: C = US, ST = CA, O = Apache, OU = Apache Pulsar, CN = superUser + Subject Public Key Info: + Public Key Algorithm: rsaEncryption + RSA Public-Key: (2048 bit) + Modulus: + 00:cd:43:7d:98:40:f9:b0:5b:bc:ae:db:c0:0b:ad: + 26:90:96:e0:62:38:ed:68:b1:70:46:3b:de:44:f9: + 14:51:86:10:eb:ca:90:e7:88:e8:f9:91:85:e0:dd: + b5:b4:14:b9:78:e3:86:d5:54:6d:68:ec:14:92:b4: + f8:22:5b:05:3d:ed:31:25:65:08:05:84:ca:e6:0c: + 21:12:58:32:c7:1a:60:a3:4f:d2:4a:9e:28:19:7c: + 45:84:00:8c:89:dc:de:8a:e5:4f:88:91:cc:a4:f1: + 81:45:4c:7d:c2:ff:e2:c1:89:c6:12:73:95:e2:36: + bd:db:ae:8b:5a:68:6a:90:51:de:2b:88:5f:aa:67: + f4:a8:e3:63:dc:be:19:82:cc:9d:7f:e6:8d:fb:82: + be:22:01:3d:56:13:3b:5b:04:b4:e8:c5:18:e6:2e: + 0d:fa:ba:4a:8d:e8:c6:5a:a1:51:9a:4a:62:d7:af: + dd:b4:fc:e2:d5:cd:ae:99:6c:5c:61:56:0b:d7:0c: + 1a:77:5c:f5:3a:6a:54:b5:9e:33:ac:a9:75:28:9a: + 76:af:d0:7a:57:00:1b:91:13:31:fd:42:88:21:47: + 05:10:01:2f:59:bb:c7:3a:d9:e1:58:4c:1b:6c:71: + b6:98:ef:dd:03:82:58:a3:32:dc:90:a1:b6:a6:1e: + e1:0b + Exponent: 65537 (0x10001) + X509v3 extensions: + X509v3 Subject Alternative Name: + DNS:localhost, IP Address:127.0.0.1 + Signature Algorithm: sha256WithRSAEncryption + 96:c2:23:2d:46:d0:3d:23:0e:ab:3d:b6:1e:31:96:00:eb:ae: + 17:ac:6e:c0:d4:1a:8d:0f:36:63:27:02:49:4e:24:cf:d3:80: + 88:3a:4f:d0:f1:e5:1c:df:2d:8a:ab:ae:8d:48:77:a0:d0:dc: + d5:80:1c:a1:3d:0d:49:64:bf:cb:39:84:c9:f3:5d:e0:2d:ba: + a0:f2:ac:03:85:44:a1:97:6b:0b:de:ed:a7:49:19:46:b2:18: + 49:21:62:43:52:36:6f:47:6c:21:6b:5e:41:85:28:71:6c:22: + 27:35:76:82:ed:ac:ad:d7:fa:9d:4c:7d:6f:44:7e:06:dd:8a: + 11:32:0c:d9:d0:f6:63:2a:40:ae:0d:5a:df:9e:d7:91:8a:db: + 2d:95:f3:19:f0:8f:1e:34:e3:b2:31:67:38:74:fd:3f:e6:49: + 5e:53:eb:88:ae:b1:45:71:0e:67:97:3c:99:4e:c7:ea:1e:02: + 67:b4:54:ef:4f:10:55:4a:70:c0:eb:41:e4:50:d4:48:5e:70: + c5:0f:79:f2:06:3d:35:ea:ce:5d:13:8e:14:65:fc:98:21:16: + 2d:5d:6d:f8:e0:6b:c7:c6:e4:8a:ca:c9:38:1f:93:27:86:28: + ef:96:e7:ad:6c:4a:9e:10:78:48:00:f4:4a:43:dc:87:1d:e3: + d3:39:53:68 +-----BEGIN CERTIFICATE----- +MIIDFDCCAfygAwIBAgIUYeYbB5BqT/fNRrlZHT4cOQ3yXgEwDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGQ0FSb290MB4XDTIyMDUzMDEzMzgyNFoXDTMyMDUyNzEz +MzgyNFowVzELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMQ8wDQYDVQQKEwZBcGFj +aGUxFjAUBgNVBAsTDUFwYWNoZSBQdWxzYXIxEjAQBgNVBAMTCXN1cGVyVXNlcjCC +ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAM1DfZhA+bBbvK7bwAutJpCW +4GI47WixcEY73kT5FFGGEOvKkOeI6PmRheDdtbQUuXjjhtVUbWjsFJK0+CJbBT3t +MSVlCAWEyuYMIRJYMscaYKNP0kqeKBl8RYQAjInc3orlT4iRzKTxgUVMfcL/4sGJ +xhJzleI2vduui1poapBR3iuIX6pn9KjjY9y+GYLMnX/mjfuCviIBPVYTO1sEtOjF +GOYuDfq6So3oxlqhUZpKYtev3bT84tXNrplsXGFWC9cMGndc9TpqVLWeM6ypdSia +dq/QelcAG5ETMf1CiCFHBRABL1m7xzrZ4VhMG2xxtpjv3QOCWKMy3JChtqYe4QsC +AwEAAaMeMBwwGgYDVR0RBBMwEYIJbG9jYWxob3N0hwR/AAABMA0GCSqGSIb3DQEB +CwUAA4IBAQCWwiMtRtA9Iw6rPbYeMZYA664XrG7A1BqNDzZjJwJJTiTP04CIOk/Q +8eUc3y2Kq66NSHeg0NzVgByhPQ1JZL/LOYTJ813gLbqg8qwDhUShl2sL3u2nSRlG +shhJIWJDUjZvR2wha15BhShxbCInNXaC7ayt1/qdTH1vRH4G3YoRMgzZ0PZjKkCu +DVrfnteRitstlfMZ8I8eNOOyMWc4dP0/5kleU+uIrrFFcQ5nlzyZTsfqHgJntFTv +TxBVSnDA60HkUNRIXnDFD3nyBj016s5dE44UZfyYIRYtXW344GvHxuSKysk4H5Mn +hijvluetbEqeEHhIAPRKQ9yHHePTOVNo +-----END CERTIFICATE----- diff --git a/mqtt-impl/src/test/resources/mtls/client.cer b/mqtt-impl/src/test/resources/mtls/client.cer new file mode 100644 index 000000000..36f70ea7d --- /dev/null +++ b/mqtt-impl/src/test/resources/mtls/client.cer @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC9DCCAdygAwIBAgIUHqngM2NyphZKHyK/t8Vs1OoUL58wDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMCAXDTI0MDgyMzEwNTgzNloYDzIxMjQwNzMw +MTA1ODM2WjARMQ8wDQYDVQQDDAZDTElFTlQwggEiMA0GCSqGSIb3DQEBAQUAA4IB +DwAwggEKAoIBAQC0S6AnsYUBJJP3ZiOqPdqvjW4nbslKvTYFo2tfLnobOakuSKyD +Iisx9FlUHLFMslL3QZ6mp/U8pmZU6YORWo/f6eElLuGtQPSVxqTEZH0v+GY8N8HA +XshvXmh7CSkGFzbKJ9vyAN2rZ/XrgEP3bBSQZxVSloLaNjV+lXzs3JaULxvQL+EJ +ef38RC//qI7qPflaNveIpnaCpG+Rx8QnFDYeyi8BDS2PdeKwLzj4aXaW5qjoBC3v +qk9/XLqxMXTAX8Ty32E7HrgxhG0mA48gYg2T/Ba8RPykNGvM/cncRgC+B0IJJ1jY +XAgdg/C5Xsz7DveWeezWibripMQN0UrkTjcLAgMBAAGjQjBAMB0GA1UdDgQWBBQY +djFGcYoRFXb7lyZkoFqQJnML1DAfBgNVHSMEGDAWgBSXAIfV5mlQg8lagBoWOfYa +lQ2JZzANBgkqhkiG9w0BAQsFAAOCAQEAiATbrV3/uIuIJQbXBssuVOlTcUWXPAHf +bFGz31NgPnb0za2ApoJ+912orSxSAvlG10g+2Z34iW8+bciH8e1DF+cPQKVg0lkm +jaMQO1cUVw6e2aRHagIMvgEwcz+PqVJoLvWWp5sjqnlafu/ZuXjzeyefuFbxD3kI +EAuSp4juklHrjLZbDulUgGuodKnJ/plzRLKUYoKArdmCAulZRmxcKBw6oYjQWo3A +6lLLHgqaV2g2RuAQFP6qCTMGEWXu4F8ZWJC6vV0zEDMh6QKJdNH1RShbsWlfIxsW +TU6Pswt9EyDIo2Wd72n/sAC8pFxvM8tfhFsAOihHB1XBb2YhtNCORQ== +-----END CERTIFICATE----- diff --git a/pom.xml b/pom.xml index fa3a580bd..35e126a97 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,13 @@ ${mockito.version} test + + dev.cel + cel + 0.5.2 + + provided + @@ -180,6 +187,7 @@ true false true + **/oidc/broker/common/** @@ -264,6 +272,7 @@ **/.github/** **/.ci/** **/mqtt-perf/** + **/resources/mtls/** JAVADOC_STYLE