diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java index 8221718b..a0c53302 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java @@ -18,7 +18,7 @@ import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_TOKEN; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; -import io.streamnative.pulsar.handlers.mqtt.identitypool.AuthenticationProviderMTls; +import io.streamnative.pulsar.handlers.mqtt.authentication.AuthenticationProviderMTls; import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils; import java.util.HashMap; import java.util.List; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthRequest.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthRequest.java new file mode 100644 index 00000000..beb7e77c --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthRequest.java @@ -0,0 +1,25 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.authentication; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public final class AuthRequest { + private String subject; + private Map variables; +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthenticationProviderMTls.java similarity index 80% rename from mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java rename to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthenticationProviderMTls.java index 2f043aed..14c77346 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthenticationProviderMTls.java @@ -11,13 +11,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.mqtt.identitypool; - -import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN; -import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS; -import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN; -import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SHA1; -import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SNID; +package io.streamnative.pulsar.handlers.mqtt.authentication; + +import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.DN; +import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SAN; +import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SHA1; +import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SNID; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import io.streamnative.oidc.broker.common.OIDCPoolResources; @@ -43,10 +44,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrBuilder; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -64,6 +67,8 @@ public class AuthenticationProviderMTls implements AuthenticationProvider { @VisibleForTesting private OIDCPoolResources poolResources; + private final ObjectMapper objectMapper = ObjectMapperFactory.create(); + @Getter @VisibleForTesting private final ConcurrentHashMap poolMap = new ConcurrentHashMap<>(); @@ -212,7 +217,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat final X509Certificate certificate = (X509Certificate) certs[0]; // parse DN - Map params; + Map params; try { String subject = certificate.getSubjectX500Principal().getName(); params = parseDN(subject); @@ -228,20 +233,26 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat // parse SHA1 params.put(SHA1, parseSHA1FingerPrint(certificate)); - String principal = matchPool(params); - if (principal.isEmpty()) { + String poolName = matchPool(params); + if (poolName.isEmpty()) { errorCode = ErrorCode.NO_MATCH_POOL; throw new AuthenticationException("No matched identity pool from the client certificate"); } + AuthRequest authRequest = new AuthRequest(poolName, params); + String authRequestJson = objectMapper.writeValueAsString(authRequest); metrics.recordSuccess(); - return principal; + return authRequestJson; } catch (AuthenticationException e) { metrics.recordFailure(errorCode); throw e; + } catch (JsonProcessingException e) { + log.error("Failed to serialize the auth request", e); + metrics.recordFailure(errorCode); + throw new AuthenticationException(e.getMessage()); } } - public String matchPool(Map params) throws AuthenticationException { + public String matchPool(Map params) throws AuthenticationException { List principals = new ArrayList<>(); poolMap.forEach((poolName, compiler) -> { Boolean matched = false; @@ -284,8 +295,8 @@ static String parseSHA1FingerPrint(X509Certificate certificate) { } } - static Map parseDN(String dn) throws InvalidNameException { - Map params = new HashMap<>(); + static Map parseDN(String dn) throws InvalidNameException { + Map params = new HashMap<>(); if (StringUtils.isEmpty(dn)) { return params; } @@ -293,23 +304,29 @@ static Map parseDN(String dn) throws InvalidNameException { LdapName ldapName = new LdapName(dn); for (Rdn rdn : ldapName.getRdns()) { String rdnType = rdn.getType().toUpperCase(); - if (DN_KEYS.contains(rdnType)) { - String value = Rdn.escapeValue(rdn.getValue()); - value = value.replace("\r", "\\0D"); - value = value.replace("\n", "\\0A"); - params.put(rdnType, value); - } + String value = Rdn.escapeValue(rdn.getValue()); + value = value.replace("\r", "\\0D"); + value = value.replace("\n", "\\0A"); + params.put(rdnType, value); } return params; } - static void parseSAN(X509Certificate certificate, @NotNull Map map) { + static void parseSAN(X509Certificate certificate, @NotNull Map map) { try { + // byte[] extensionValue = certificate.getExtensionValue("2.5.29.17"); + // TODO How to get the original extension name Collection> subjectAlternativeNames = certificate.getSubjectAlternativeNames(); if (subjectAlternativeNames != null) { List formattedSANList = subjectAlternativeNames.stream() - .map(list -> getSanName((int) list.get(0)) + ":" + list.get(1)) + .map(list -> { + String sanName = getSanName((int) list.get(0)); + String sanValue = (String) list.get(1); + map.put(sanName, sanValue); + sanName = mapSANNames(sanName, sanValue, map); + return sanName + ":" + sanValue; + }) .collect(Collectors.toList()); String formattedSAN = String.join(",", formattedSANList); map.put(SAN, formattedSAN); @@ -319,10 +336,27 @@ static void parseSAN(X509Certificate certificate, @NotNull Map m } } + static String mapSANNames(String sanName, String sanValue, @NotNull Map map) { + String newSanName = sanName; + // "RFC822NAME:aaa" -> "EMAIL:aaa,DEVICE_ID:aaa,RFC822NAME:aaa" + if (sanName.equals("DNS")) { + StrBuilder strBuilder = new StrBuilder(); + strBuilder.append("EMAIL:").append(sanValue).append(","); + map.put("EMAIL", sanValue); + +// strBuilder.append("DEVICE_ID:").append(sanValue).append(","); +// map.put("DEVICE_ID", sanValue); + + strBuilder.append(sanName); + newSanName = strBuilder.toString(); + } + return newSanName; + } + private static String getSanName(int type) { return switch (type) { case 0 -> "OTHERNAME"; - case 1 -> "EMAIL"; + case 1 -> "RFC822NAME"; case 2 -> "DNS"; case 3 -> "X400"; case 4 -> "DIR"; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/ExpressionCompiler.java similarity index 95% rename from mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java rename to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/ExpressionCompiler.java index 8f6fd6c7..7d989015 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/ExpressionCompiler.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.mqtt.identitypool; +package io.streamnative.pulsar.handlers.mqtt.authentication; import dev.cel.common.CelAbstractSyntaxTree; @@ -73,7 +73,7 @@ private void compile() throws CelValidationException, CelEvaluationException { this.program = runtime.createProgram(ast); } - public Boolean eval(Map mapValue) throws Exception { + public Boolean eval(Map mapValue) throws Exception { final Object eval = program.eval(mapValue); if (eval instanceof Boolean) { return (Boolean) eval; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/package-info.java similarity index 90% rename from mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java rename to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/package-info.java index 34f62426..b16c3fc3 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/package-info.java @@ -11,4 +11,4 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.mqtt.identitypool; \ No newline at end of file +package io.streamnative.pulsar.handlers.mqtt.authentication; \ No newline at end of file diff --git a/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTlsTest.java b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthenticationProviderMTlsTest.java similarity index 92% rename from mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTlsTest.java rename to mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthenticationProviderMTlsTest.java index 133f7e47..bdbe247a 100644 --- a/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTlsTest.java +++ b/mqtt-impl/src/test/java/io/streamnative/pulsar/handlers/mqtt/authentication/AuthenticationProviderMTlsTest.java @@ -11,8 +11,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.mqtt.identitypool; +package io.streamnative.pulsar.handlers.mqtt.authentication; +import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.local.LocalAddress; import io.streamnative.oidc.broker.common.pojo.Pool; import java.io.File; @@ -25,9 +26,11 @@ import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSessionContext; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore; @@ -38,6 +41,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j public class AuthenticationProviderMTlsTest { private static final String SUPER_USER = "superUser"; @@ -45,6 +49,7 @@ public class AuthenticationProviderMTlsTest { private ServiceConfiguration serviceConfiguration; private LocalMemoryMetadataStore metadataStore; + private final ObjectMapper objectMapper = ObjectMapperFactory.create(); @SuppressWarnings("UnstableApiUsage") @BeforeClass @@ -73,7 +78,8 @@ public Object[][] reuseMetadata() { public void testExpression() throws Exception { String dn = FileUtils.readFileToString(new File(getResourcePath("mtls/cel-test.txt")), "UTF-8"); - Map params = AuthenticationProviderMTls.parseDN(dn); + Map params = AuthenticationProviderMTls.parseDN(dn); + params.put("O2", "StreamNative, Inc."); ExpressionCompiler compiler = new ExpressionCompiler("DN.contains(\"CN=streamnative.io\")"); Boolean eval = compiler.eval(params); @@ -121,7 +127,9 @@ public void testAuthenticationProviderMTls(boolean reuseMetadata) throws Excepti SSLSession sslSession = new MockSSLSession(x509Certificates); AuthenticationDataCommand authData = new AuthenticationDataCommand("", LocalAddress.ANY, sslSession); String principal = authenticationProvider.authenticate(authData); - Assert.assertEquals(principal, poolName); + log.info("Principal: {}", principal); + AuthRequest authRequest = objectMapper.readValue(principal, AuthRequest.class); + Assert.assertEquals(authRequest.getSubject(), poolName); authenticationProvider.close(); }