From d2c8afc03547b4bba2945b48043e4f95f6299432 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Tue, 10 Sep 2024 16:41:22 +0800 Subject: [PATCH] Support mTLS authentication for MoP (#1414) --- .github/workflows/pr_test.yml | 22 +- .../pulsar/handlers/mqtt/Connection.java | 4 +- .../pulsar/handlers/mqtt/Constants.java | 2 + .../mqtt/MQTTAuthenticationService.java | 44 +++- .../mqtt/MQTTCommonConfiguration.java | 7 + .../pulsar/handlers/mqtt/MQTTService.java | 5 +- .../handlers/mqtt/adapter/AdapterChannel.java | 2 +- .../mqtt/exception/MQTTAuthException.java | 35 +++ .../AuthenticationProviderMTls.java | 1 - .../MQTTProxyProtocolMethodProcessor.java | 34 ++- .../handlers/mqtt/proxy/MQTTProxyService.java | 3 +- ...AbstractCommonProtocolMethodProcessor.java | 13 +- .../MQTTBrokerProtocolMethodProcessor.java | 35 ++- .../handlers/mqtt/utils/MqttMessageUtils.java | 100 +++++++ .../pulsar/handlers/mqtt/utils/MqttUtils.java | 4 + .../pulsar/handlers/mqtt/utils/Paths.java | 32 +++ pom.xml | 2 + .../mqtt3/fusesource/proxy/ProxyTest.java | 4 +- .../mqtt5/hivemq/base/MQTT5ClientUtils.java | 8 + .../mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java | 244 ++++++++++++++++++ tests/src/test/resources/mtls/proxy/ca.cer | 19 ++ tests/src/test/resources/mtls/proxy/ca.crt | 19 ++ tests/src/test/resources/mtls/proxy/ca.key | 28 ++ tests/src/test/resources/mtls/proxy/ca.srl | 1 + .../test/resources/mtls/proxy/ca_config.cnf | 15 ++ .../src/test/resources/mtls/proxy/client.cer | 18 ++ .../src/test/resources/mtls/proxy/client.csr | 15 ++ .../src/test/resources/mtls/proxy/client.key | 28 ++ .../src/test/resources/mtls/proxy/client.p12 | Bin 0 -> 2531 bytes .../resources/mtls/proxy/clientkeystore.jks | Bin 0 -> 2107 bytes .../src/test/resources/mtls/proxy/server.cer | 18 ++ .../src/test/resources/mtls/proxy/server.csr | 15 ++ .../src/test/resources/mtls/proxy/server.key | 28 ++ .../src/test/resources/mtls/proxy/server.p12 | Bin 0 -> 2531 bytes .../resources/mtls/proxy/serverkeystore.jks | Bin 0 -> 2110 bytes .../test/resources/mtls/proxy/truststore.jks | Bin 0 -> 1158 bytes 36 files changed, 764 insertions(+), 41 deletions(-) create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTAuthException.java create mode 100644 mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/Paths.java create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java create mode 100644 tests/src/test/resources/mtls/proxy/ca.cer create mode 100644 tests/src/test/resources/mtls/proxy/ca.crt create mode 100644 tests/src/test/resources/mtls/proxy/ca.key create mode 100644 tests/src/test/resources/mtls/proxy/ca.srl create mode 100644 tests/src/test/resources/mtls/proxy/ca_config.cnf create mode 100644 tests/src/test/resources/mtls/proxy/client.cer create mode 100644 tests/src/test/resources/mtls/proxy/client.csr create mode 100644 tests/src/test/resources/mtls/proxy/client.key create mode 100644 tests/src/test/resources/mtls/proxy/client.p12 create mode 100644 tests/src/test/resources/mtls/proxy/clientkeystore.jks create mode 100644 tests/src/test/resources/mtls/proxy/server.cer create mode 100644 tests/src/test/resources/mtls/proxy/server.csr create mode 100644 tests/src/test/resources/mtls/proxy/server.key create mode 100644 tests/src/test/resources/mtls/proxy/server.p12 create mode 100644 tests/src/test/resources/mtls/proxy/serverkeystore.jks create mode 100644 tests/src/test/resources/mtls/proxy/truststore.jks diff --git a/.github/workflows/pr_test.yml b/.github/workflows/pr_test.yml index 9ce8e86b5..3b74a0051 100644 --- a/.github/workflows/pr_test.yml +++ b/.github/workflows/pr_test.yml @@ -23,17 +23,17 @@ jobs: check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Cache Maven packages - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: ~/.m2 key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Set up JDK 17 - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: 17 @@ -55,7 +55,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 5 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Get All Tests id: list-test @@ -78,17 +78,17 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 20 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Cache Maven packages - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: ~/.m2 key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Set up JDK 17 - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: 17 @@ -125,17 +125,17 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 10 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Cache Maven packages - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: ~/.m2 key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Set up JDK 17 - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: distribution: 'temurin' java-version: 17 @@ -144,7 +144,7 @@ jobs: run: mvn clean install -DskipTests - name: Download jacoco artifact - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v3 with: path: mqtt-impl/target diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java index a2eda7925..4c30d1062 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Connection.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -69,7 +70,8 @@ public class Connection { @Getter private final TopicSubscriptionManager topicSubscriptionManager; @Getter - private final MqttConnectMessage connectMessage; + @Setter + private MqttConnectMessage connectMessage; @Getter private final ClientRestrictions clientRestrictions; @Getter diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java index a0c663936..c5da59cf8 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/Constants.java @@ -24,6 +24,8 @@ public final class Constants { public static final String AUTH_BASIC = "basic"; public static final String AUTH_TOKEN = "token"; + public static final String AUTH_MTLS = "mTls"; + public static final String ATTR_TOPIC_SUBS = "topicSubs"; public static final String MQTT_PROPERTIES = "MQTT_PROPERTIES_%d_"; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java index c474d2103..25dd2d74f 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java @@ -14,14 +14,17 @@ package io.streamnative.pulsar.handlers.mqtt; import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_BASIC; +import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS; import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_TOKEN; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.streamnative.pulsar.handlers.mqtt.identitypool.AuthenticationProviderMTls; import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.naming.AuthenticationException; +import javax.net.ssl.SSLSession; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -29,6 +32,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.service.BrokerService; /** * MQTT authentication service. @@ -42,8 +46,15 @@ public class MQTTAuthenticationService { @Getter private final Map authenticationProviders; - public MQTTAuthenticationService(AuthenticationService authenticationService, List authenticationMethods) { - this.authenticationService = authenticationService; + private final BrokerService brokerService; + + private final boolean mqttProxyMTlsAuthenticationEnabled; + + public MQTTAuthenticationService(BrokerService brokerService, List authenticationMethods, boolean + mqttProxyMTlsAuthenticationEnabled) { + this.brokerService = brokerService; + this.mqttProxyMTlsAuthenticationEnabled = mqttProxyMTlsAuthenticationEnabled; + this.authenticationService = brokerService.getAuthenticationService(); this.authenticationProviders = getAuthenticationProviders(authenticationMethods); } @@ -57,34 +68,49 @@ private Map getAuthenticationProviders(List entry : authenticationProviders.entrySet()) { String authMethod = entry.getKey(); try { - AuthenticationDataSource authData = getAuthData(authMethod, payload); + AuthenticationDataSource authData = getAuthData(authMethod, payload, session); userRole = entry.getValue().authenticate(authData); authenticated = true; authenticationDataSource = authData; @@ -116,12 +142,14 @@ public AuthenticationResult authenticate(String clientIdentifier, return new AuthenticationResult(authenticated, userRole, command); } - public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload) { + public AuthenticationDataSource getAuthData(String authMethod, MqttConnectPayload payload, SSLSession session) { switch (authMethod) { case AUTH_BASIC: return new AuthenticationDataCommand(payload.userName() + ":" + payload.password()); case AUTH_TOKEN: return new AuthenticationDataCommand(payload.password()); + case AUTH_MTLS: + return new AuthenticationDataCommand(null, null, session); default: throw new IllegalArgumentException( String.format("Unsupported authentication method : %s!", authMethod)); diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java index 676dc5a0e..a1a0c7f08 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTCommonConfiguration.java @@ -145,6 +145,13 @@ public class MQTTCommonConfiguration extends ServiceConfiguration { ) private boolean mqttProxyTlsEnabled = false; + @FieldContext( + category = CATEGORY_MQTT_PROXY, + required = false, + doc = "Whether use mTLS authenticate for mTLS connection" + ) + private boolean mqttProxyMTlsAuthenticationEnabled = false; + @FieldContext( category = CATEGORY_MQTT_PROXY, required = false, diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java index 26db52fa4..b47ccf460 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTService.java @@ -96,8 +96,9 @@ public MQTTService(BrokerService brokerService, MQTTServerConfiguration serverCo this.metricsProvider = new MQTTMetricsProvider(metricsCollector); this.pulsarService.addPrometheusRawMetricsProvider(metricsProvider); this.authenticationService = serverConfiguration.isMqttAuthenticationEnabled() - ? new MQTTAuthenticationService(brokerService.getAuthenticationService(), - serverConfiguration.getMqttAuthenticationMethods()) : null; + ? new MQTTAuthenticationService(brokerService, + serverConfiguration.getMqttAuthenticationMethods(), + serverConfiguration.isMqttProxyMTlsAuthenticationEnabled()) : null; this.connectionManager = new MQTTConnectionManager(pulsarService.getAdvertisedAddress()); this.subscriptionManager = new MQTTSubscriptionManager(); if (getServerConfiguration().isMqttProxyEnabled()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java index ccf483d4c..1e43b74af 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/AdapterChannel.java @@ -51,7 +51,7 @@ public CompletableFuture writeAndFlush(final MqttAdapterMessage adapterMsg }); future.exceptionally(ex -> { log.warn("[AdapterChannel][{}] Proxy write to broker {} failed." - + " error message: {}", clientId, broker, ex.getMessage()); + + " adapterMsg message: {}", clientId, broker, adapterMsg, ex); return null; }); return future; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTAuthException.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTAuthException.java new file mode 100644 index 000000000..267536539 --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/exception/MQTTAuthException.java @@ -0,0 +1,35 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.exception; + +/** + * Internal server exception. + */ +public class MQTTAuthException extends Exception { + + public MQTTAuthException() { + } + + public MQTTAuthException(String message) { + super(message); + } + + public MQTTAuthException(String message, Throwable cause) { + super(message, cause); + } + + public MQTTAuthException(Throwable cause) { + super(cause); + } +} diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java index 6cad681cf..2f043aeda 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/AuthenticationProviderMTls.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.mqtt.identitypool; - import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN; import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS; import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index a64df78fa..6dafd3db5 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -13,6 +13,10 @@ */ package io.streamnative.pulsar.handlers.mqtt.proxy; +import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttConnectMessage; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttPublishMessage; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttSubscribeMessage; import com.google.common.collect.Lists; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttConnectMessage; @@ -64,17 +68,19 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; + /** * Proxy inbound handler is the bridge between proxy and MoP. */ @Slf4j public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMethodProcessor { + private final PulsarService pulsarService; + @Getter private Connection connection; private final LookupHandler lookupHandler; private final MQTTProxyConfiguration proxyConfig; - private final PulsarService pulsarService; private final Map> topicBrokers; private final Map adapterChannels; @Getter @@ -86,6 +92,7 @@ public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMeth private final MQTTConnectionManager connectionManager; private final SystemEventService eventService; private final MQTTProxyAdapter proxyAdapter; + private final AtomicBoolean isDisconnected = new AtomicBoolean(false); private final AutoSubscribeHandler autoSubscribeHandler; @@ -95,8 +102,9 @@ public class MQTTProxyProtocolMethodProcessor extends AbstractCommonProtocolMeth public MQTTProxyProtocolMethodProcessor(MQTTProxyService proxyService, ChannelHandlerContext ctx) { super(proxyService.getAuthenticationService(), - proxyService.getProxyConfig().isMqttAuthenticationEnabled(), ctx); - this.pulsarService = proxyService.getPulsarService(); + proxyService.getProxyConfig().isMqttAuthenticationEnabled(), + ctx); + pulsarService = proxyService.getPulsarService(); this.lookupHandler = proxyService.getLookupHandler(); this.proxyConfig = proxyService.getProxyConfig(); this.connectionManager = proxyService.getConnectionManager(); @@ -115,7 +123,7 @@ public MQTTProxyProtocolMethodProcessor(MQTTProxyService proxyService, ChannelHa @Override public void doProcessConnect(MqttAdapterMessage adapter, String userRole, AuthenticationDataSource authData, ClientRestrictions clientRestrictions) { - final MqttConnectMessage msg = (MqttConnectMessage) adapter.getMqttMessage(); + MqttConnectMessage msg = (MqttConnectMessage) adapter.getMqttMessage(); final ServerRestrictions serverRestrictions = ServerRestrictions.builder() .receiveMaximum(proxyConfig.getReceiveMaximum()) .maximumPacketSize(proxyConfig.getMqttMessageMaxLength()) @@ -133,6 +141,12 @@ public void doProcessConnect(MqttAdapterMessage adapter, String userRole, .processor(this) .build(); connection.sendConnAck(); + if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { + MqttConnectMessage connectMessage = createMqttConnectMessage(msg, AUTH_MTLS, userRole); + msg = connectMessage; + connection.setConnectMessage(msg); + } + ConnectEvent connectEvent = ConnectEvent.builder() .clientId(connection.getClientId()) .address(pulsarService.getAdvertisedAddress()) @@ -152,6 +166,10 @@ public void processPublish(MqttAdapterMessage adapter) { proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(), TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain())); adapter.setClientId(connection.getClientId()); + if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { + MqttPublishMessage mqttMessage = createMqttPublishMessage(msg, AUTH_MTLS, connection.getUserRole()); + adapter.setMqttMessage(mqttMessage); + } startPublish() .thenCompose(__ -> writeToBroker(pulsarTopicName, adapter)) .whenComplete((unused, ex) -> { @@ -282,6 +300,10 @@ public void processSubscribe(final MqttAdapterMessage adapter) { log.debug("[Proxy Subscribe] [{}] msg: {}", clientId, msg); } registerTopicListener(adapter); + if (proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { + MqttSubscribeMessage mqttMessage = createMqttSubscribeMessage(msg, AUTH_MTLS, connection.getUserRole()); + adapter.setMqttMessage(mqttMessage); + } doSubscribe(adapter, false) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); @@ -447,8 +469,10 @@ private CompletableFuture connectToBroker(final String topic) { key -> lookupHandler.findBroker(TopicName.get(topic)).thenApply(mqttBroker -> adapterChannels.computeIfAbsent(mqttBroker, key1 -> { AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker); + final MqttConnectMessage connectMessage = connection.getConnectMessage(); + adapterChannel.writeAndFlush(new MqttAdapterMessage(connection.getClientId(), - connection.getConnectMessage())); + connectMessage)); return adapterChannel; }) ) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index bc17954ab..968f2a40b 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -117,7 +117,7 @@ public void start() throws MQTTProxyException { throw new MQTTProxyException(e); } - if (proxyConfig.isMqttProxyTlsEnabled()) { + if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) { ServerBootstrap tlsBootstrap = serverBootstrap.clone(); tlsBootstrap.childHandler(new MQTTProxyChannelInitializer( this, proxyConfig, true, sslContextRefresher)); @@ -148,7 +148,6 @@ public void start() throws MQTTProxyException { throw new MQTTProxyException(e); } } - this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); this.eventService.start(); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java index 678e4c1e3..57f25a99f 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/AbstractCommonProtocolMethodProcessor.java @@ -21,10 +21,12 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; +import io.netty.handler.ssl.SslHandler; import io.streamnative.pulsar.handlers.mqtt.Connection; import io.streamnative.pulsar.handlers.mqtt.MQTTAuthenticationService; import io.streamnative.pulsar.handlers.mqtt.ProtocolMethodProcessor; import io.streamnative.pulsar.handlers.mqtt.adapter.MqttAdapterMessage; +import io.streamnative.pulsar.handlers.mqtt.exception.MQTTAuthException; import io.streamnative.pulsar.handlers.mqtt.exception.restrictions.InvalidReceiveMaximumException; import io.streamnative.pulsar.handlers.mqtt.messages.MqttPropertyUtils; import io.streamnative.pulsar.handlers.mqtt.messages.ack.MqttConnectAck; @@ -33,6 +35,7 @@ import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; import io.streamnative.pulsar.handlers.mqtt.utils.NettyUtils; +import javax.net.ssl.SSLSession; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -123,8 +126,10 @@ public void processConnect(MqttAdapterMessage adapter) { clientId, username); } } else { - MQTTAuthenticationService.AuthenticationResult authResult = authenticationService - .authenticate(connectMessage); + MQTTAuthenticationService.AuthenticationResult authResult; + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + SSLSession session = (sslHandler != null) ? sslHandler.engine().getSession() : null; + authResult = authenticationService.authenticate(adapter.fromProxy(), session, connectMessage); if (authResult.isFailed()) { MqttMessage mqttMessage = MqttConnectAck.errorBuilder().authFail(protocolVersion); log.error("[CONNECT] Invalid or incorrect authentication. CId={}, username={}", clientId, username); @@ -157,6 +162,10 @@ public void processConnect(MqttAdapterMessage adapter) { } } + protected MQTTAuthenticationService.AuthenticationResult mtlsAuth(boolean fromProxy) throws MQTTAuthException { + return MQTTAuthenticationService.AuthenticationResult.FAILED; + } + @Override public void processPubAck(MqttAdapterMessage msg) { if (log.isDebugEnabled()) { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java index df7e5b4b7..3a4bed839 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/MQTTBrokerProtocolMethodProcessor.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.mqtt.support; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createWillMessage; +import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.getMtlsAuthMethodAndData; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.pingResp; import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.topicSubscriptions; import io.netty.channel.ChannelHandlerContext; @@ -74,7 +75,9 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -91,6 +94,7 @@ */ @Slf4j public class MQTTBrokerProtocolMethodProcessor extends AbstractCommonProtocolMethodProcessor { + private final PulsarService pulsarService; private final QosPublishHandlers qosPublishHandlers; private final MQTTServerConfiguration configuration; @@ -104,6 +108,7 @@ public class MQTTBrokerProtocolMethodProcessor extends AbstractCommonProtocolMet private final WillMessageHandler willMessageHandler; private final RetainedMessageHandler retainedMessageHandler; private final AutoSubscribeHandler autoSubscribeHandler; + @Getter private final CompletableFuture inactiveFuture = new CompletableFuture<>(); @@ -180,19 +185,28 @@ public void processPubAck(MqttAdapterMessage adapter) { @Override public void processPublish(MqttAdapterMessage adapter) { if (log.isDebugEnabled()) { - log.debug("[Publish] [{}] msg: {}", connection.getClientId(), adapter); + log.debug("[Publish] [{}] msg: {}", adapter.getClientId(), adapter); } MqttPublishMessage msg = (MqttPublishMessage) adapter.getMqttMessage(); CompletableFuture result; if (!configuration.isMqttAuthorizationEnabled()) { if (log.isDebugEnabled()) { log.debug("[Publish] authorization is disabled, allowing client. CId={}, userRole={}", - connection.getClientId(), connection.getUserRole()); + adapter.getClientId(), connection.getUserRole()); } result = doPublish(adapter); } else { + String userRole = connection.getUserRole(); + AuthenticationDataSource authData = connection.getAuthData(); + if (adapter.fromProxy()) { + final Optional> mtlsAuthMethodAndData = getMtlsAuthMethodAndData(msg); + if (mtlsAuthMethodAndData.isPresent()) { + userRole = mtlsAuthMethodAndData.get().getKey(); + authData = new AuthenticationDataCommand(new String(mtlsAuthMethodAndData.get().getValue())); + } + } result = this.authorizationService.canProduceAsync(TopicName.get(msg.variableHeader().topicName()), - connection.getUserRole(), connection.getAuthData()) + userRole, authData) .thenCompose(authorized -> authorized ? doPublish(adapter) : doUnauthorized(adapter)); } result.thenAccept(__ -> msg.release()) @@ -340,7 +354,7 @@ public boolean connectionEstablished() { public void processSubscribe(MqttAdapterMessage adapter) { MqttSubscribeMessage msg = (MqttSubscribeMessage) adapter.getMqttMessage(); final String clientId = connection.getClientId(); - final String userRole = connection.getUserRole(); + String userRole = connection.getUserRole(); final int packetId = msg.variableHeader().messageId(); if (log.isDebugEnabled()) { log.debug("[Subscribe] [{}] msg: {}", clientId, msg); @@ -351,15 +365,24 @@ public void processSubscribe(MqttAdapterMessage adapter) { } doSubscribe(msg); } else { + AuthenticationDataSource authData = connection.getAuthData(); + if (adapter.fromProxy()) { + final Optional> mtlsAuthMethodAndData = getMtlsAuthMethodAndData(msg); + if (mtlsAuthMethodAndData.isPresent()) { + userRole = mtlsAuthMethodAndData.get().getKey(); + authData = new AuthenticationDataCommand(new String(mtlsAuthMethodAndData.get().getValue())); + } + } List> authorizationFutures = new ArrayList<>(); AtomicBoolean authorizedFlag = new AtomicBoolean(true); for (MqttTopicSubscription topic: msg.payload().topicSubscriptions()) { + String finalUserRole = userRole; authorizationFutures.add(this.authorizationService.canConsumeAsync(TopicName.get(topic.topicName()), - userRole, connection.getAuthData(), userRole).thenAccept((authorized) -> { + userRole, authData, userRole).thenAccept((authorized) -> { if (!authorized) { authorizedFlag.set(false); log.warn("[Subscribe] no authorization to sub topic={}, userRole={}, CId= {}", - topic.topicName(), userRole, clientId); + topic.topicName(), finalUserRole, clientId); } })); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java index 706e1964c..dac7a9f24 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttMessageUtils.java @@ -15,26 +15,33 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; +import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_MTLS; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import io.netty.handler.codec.mqtt.MqttVersion; import io.streamnative.pulsar.handlers.mqtt.support.MessageBuilder; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.tuple.Pair; /** * Mqtt message utils. @@ -182,6 +189,99 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage) return builder.build(); } + public static MqttConnectMessage createMqttConnectMessage(MqttConnectMessage connectMessage, + String authMethod, + String authData) { + final MqttConnectVariableHeader header = connectMessage.variableHeader(); + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value() + , authMethod)); + properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value() + , authData.getBytes())); + MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( + MqttVersion.MQTT_5.protocolName(), MqttVersion.MQTT_5.protocolLevel(), header.hasUserName(), + header.hasPassword(), header.isWillRetain(), header.willQos(), header.isWillFlag(), + header.isCleanSession(), header.keepAliveTimeSeconds(), properties + ); + MqttConnectMessage newConnectMessage = new MqttConnectMessage(connectMessage.fixedHeader(), variableHeader, + connectMessage.payload()); + return newConnectMessage; + } + + public static MqttPublishMessage createMqttPublishMessage(MqttPublishMessage publishMessage, + String authMethod, + String authData) { + final MqttPublishVariableHeader header = publishMessage.variableHeader(); + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value() + , authMethod)); + properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value() + , authData.getBytes())); + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader( + header.topicName(), header.packetId(), properties); + MqttPublishMessage newPublishMessage = new MqttPublishMessage(publishMessage.fixedHeader(), variableHeader, + publishMessage.payload()); + return newPublishMessage; + } + + public static Optional> getMtlsAuthMethodAndData(MqttConnectMessage connectMessage) { + final MqttConnectVariableHeader header = connectMessage.variableHeader(); + MqttProperties properties = header.properties(); + final MqttProperties.MqttProperty property = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()); + if (property != null && property.value() instanceof String + && ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) { + final MqttProperties.MqttProperty data = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); + return Optional.of(Pair.of((String) property.value(), (byte[]) data.value())); + } + return Optional.empty(); + } + + public static Optional> getMtlsAuthMethodAndData(MqttPublishMessage publishMessage) { + final MqttPublishVariableHeader header = publishMessage.variableHeader(); + MqttProperties properties = header.properties(); + final MqttProperties.MqttProperty property = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()); + if (property != null && property.value() instanceof String + && ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) { + final MqttProperties.MqttProperty data = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); + return Optional.of(Pair.of((String) property.value(), (byte[]) data.value())); + } + return Optional.empty(); + } + + public static Optional> getMtlsAuthMethodAndData(MqttSubscribeMessage subscribeMessage) { + final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader(); + MqttProperties properties = header.properties(); + final MqttProperties.MqttProperty property = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value()); + if (property != null && property.value() instanceof String + && ((String) property.value()).equalsIgnoreCase(AUTH_MTLS)) { + final MqttProperties.MqttProperty data = properties.getProperty( + MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value()); + return Optional.of(Pair.of((String) property.value(), (byte[]) data.value())); + } + return Optional.empty(); + } + + public static MqttSubscribeMessage createMqttSubscribeMessage(MqttSubscribeMessage subscribeMessage, + String authMethod, + String authData) { + final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader(); + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD.value() + , authMethod)); + properties.add(new MqttProperties.BinaryProperty(MqttProperties.MqttPropertyType.AUTHENTICATION_DATA.value() + , authData.getBytes())); + MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader( + header.messageId(), properties); + MqttSubscribeMessage newSubscribeMessage = new MqttSubscribeMessage(subscribeMessage.fixedHeader(), + variableHeader, subscribeMessage.payload()); + return newSubscribeMessage; + } + public static MqttMessage createMqttDisconnectMessage() { return MessageBuilder.disconnect().build(); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java index c33529516..b64b851e7 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/MqttUtils.java @@ -42,6 +42,10 @@ public static boolean isMqtt3(int version) { public static boolean isNotMqtt3(int version) { return !isMqtt3(version); } + + public static boolean isMqtt5(int version) { + return version == MqttVersion.MQTT_5.protocolLevel(); + } public static boolean isQosSupported(MqttConnectMessage msg) { return isQosSupported(msg.fixedHeader().qosLevel()); } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/Paths.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/Paths.java new file mode 100644 index 000000000..14d76b29e --- /dev/null +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/Paths.java @@ -0,0 +1,32 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.utils; + +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import javax.validation.constraints.NotNull; +import lombok.experimental.UtilityClass; + +@UtilityClass +public final class Paths { + + public String getUrlEncodedPath(@NotNull String name) { + return URLEncoder.encode(name, StandardCharsets.UTF_8); + } + + public String getUrlDecodedPath(@NotNull String name) { + return URLDecoder.decode(name, StandardCharsets.UTF_8); + } +} diff --git a/pom.xml b/pom.xml index d5041e5b5..c95faa726 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,8 @@ + + io.grpc grpc-all diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java index 636c03c55..37a39d05f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java @@ -96,7 +96,7 @@ public void testBacklogShouldBeZeroWithQos0() throws Exception { Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) }; connection.subscribe(topics); String message = "Hello MQTT"; - int numMessages = 20000; + int numMessages = 200; for (int i = 0; i < numMessages; i++) { connection.publish(topicName, (message + i).getBytes(), QoS.AT_MOST_ONCE, false); } @@ -127,7 +127,7 @@ public void testBacklogShouldBeZeroWithQos1() throws Exception { Topic[] topics = { new Topic(topicName, QoS.AT_LEAST_ONCE) }; connection.subscribe(topics); String message = "Hello MQTT"; - int numMessages = 20000; + int numMessages = 200; for (int i = 0; i < numMessages; i++) { connection.publish(topicName, (message + i).getBytes(), QoS.AT_LEAST_ONCE, false); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java index 8183f4281..7307f152a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/MQTT5ClientUtils.java @@ -36,6 +36,14 @@ public static Mqtt5BlockingClient createMqtt5ProxyClient(int proxyPort) { .buildBlocking(); } + public static Mqtt5BlockingClient createMqtt5ProxyClient(int proxyPort, String x) { + return Mqtt5Client.builder() + .identifier(UUID.randomUUID().toString()) + .serverHost("127.0.0.1") + .serverPort(proxyPort) + .buildBlocking(); + } + public static void publishQos1ARandomMsg(Mqtt5BlockingClient client, String topic) { client.publishWith() .topic(topic) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java new file mode 100644 index 000000000..3bb89651f --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/base/ProxyMtlsTest.java @@ -0,0 +1,244 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.base; + +import static io.streamnative.oidc.broker.common.pojo.Pool.AUTH_TYPE_MTLS; +import static org.mockito.Mockito.spy; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.hivemq.client.mqtt.MqttClientSslConfig; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.datatypes.MqttTopic; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import io.jsonwebtoken.SignatureAlgorithm; +import io.streamnative.oidc.broker.common.OIDCPoolResources; +import io.streamnative.oidc.broker.common.pojo.Pool; +import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration; +import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import java.io.File; +import java.io.FileInputStream; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.util.SecurityUtility; +import org.awaitility.Awaitility; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ProxyMtlsTest extends MQTTTestBase { + + + String path = "./src/test/resources/mtls/proxy/"; + + String token; + + MQTTCommonConfiguration localConfig; + + @Override + protected MQTTCommonConfiguration initConfig() throws Exception { + System.setProperty("jdk.security.allowNonCaAnchor", "true"); + enableTls = true; + MQTTCommonConfiguration mqtt = super.initConfig(); + + mqtt.setSystemTopicEnabled(false); + mqtt.setMqttProxyEnabled(true); + mqtt.setMqttProxyMTlsAuthenticationEnabled(true); + mqtt.setMqttProxyTlsEnabled(true); + mqtt.setMqttTlsRequireTrustedClientCertOnConnect(true); + mqtt.setMqttTlsAllowInsecureConnection(false); + + mqtt.setMqttTlsEnabledWithKeyStore(true); + mqtt.setMqttTlsKeyStoreType("JKS"); + mqtt.setMqttTlsKeyStore(path + "serverkeystore.jks"); + mqtt.setMqttTlsKeyStorePassword("123456"); + mqtt.setMqttTlsTrustStoreType("JKS"); + mqtt.setMqttTlsTrustStore(path + "truststore.jks"); + mqtt.setMqttTlsTrustStorePassword("123456"); + + SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); + token = AuthTokenUtils.createToken(secretKey, "superUser", Optional.empty()); + + mqtt.setAuthenticationEnabled(true); + mqtt.setMqttAuthenticationEnabled(true); + mqtt.setMqttAuthenticationMethods(ImmutableList.of("token")); + mqtt.setSuperUserRoles(ImmutableSet.of("superUser")); + mqtt.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); + + mqtt.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + mqtt.setBrokerClientAuthenticationParameters("token:" + token); + mqtt.setProperties(properties); + + localConfig = mqtt; + + return mqtt; + } + + @Override + public void afterSetup() throws Exception { + AuthenticationToken authToken = new AuthenticationToken(); + authToken.configure("token:" + token); + + pulsarClient = PulsarClient.builder() + .serviceUrl(brokerUrl.toString()) + .authentication(authToken) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + admin = spy(PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl.toString()) + .authentication(authToken) + .build()); + + final PulsarService pulsarService = pulsarServiceList.get(0); + OIDCPoolResources oidcPoolResources = new OIDCPoolResources(pulsarService.getLocalMetadataStore()); + + Pool pool = new Pool("test-pool", AUTH_TYPE_MTLS, "d", "provider-1", "CN=='CLIENT'"); + oidcPoolResources.createPool(pool); + + Awaitility.await().until(() -> oidcPoolResources.getPool("test-pool") != null); + } + + public SSLContext createSSLContext() throws Exception { + File caCertFile = new File(path + "ca.cer"); + Certificate caCert = CertificateFactory + .getInstance("X.509").generateCertificate(new FileInputStream(caCertFile)); + + File clientCertFile = new File(path + "client.cer"); + Certificate clientCert = CertificateFactory + .getInstance("X.509").generateCertificate(new FileInputStream(clientCertFile)); + + PrivateKey privateKey = SecurityUtility.loadPrivateKeyFromPemFile(path + "client.key"); + + final SSLContext sslContext = SecurityUtility.createSslContext(true, + new Certificate[]{caCert}, new Certificate[]{clientCert}, privateKey); + + return sslContext; + } + + @Test + public void testMqtt3() throws Exception { + SSLContext sslContext = createSSLContext(); + MQTT mqtt = createMQTTProxyTlsClient(); + mqtt.setSslContext(sslContext); + + String topicName = "mqtt3"; + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) }; + connection.subscribe(topics); + String message = "Hello MQTT3"; + connection.publish(topicName, message.getBytes(), QoS.AT_MOST_ONCE, false); + Message received = connection.receive(); + Assert.assertEquals(received.getTopic(), topicName); + Assert.assertEquals(new String(received.getPayload()), message); + received.ack(); + connection.disconnect(); + } + + @Test + public void testMqtt5() throws Exception { + + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(new FileInputStream(path + "clientkeystore.jks"), "123456".toCharArray()); + + KeyStore trustStore = KeyStore.getInstance("JKS"); + trustStore.load(new FileInputStream(path + "truststore.jks"), "123456".toCharArray()); + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, "123456".toCharArray()); + + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + final MqttClientSslConfig sslConfig = MqttClientSslConfig.builder().keyManagerFactory(keyManagerFactory) + .trustManagerFactory(trustManagerFactory) + .build(); + + Random random = new Random(); + final Mqtt5BlockingClient client1 = Mqtt5Client.builder() + .identifier("client-1") + .serverHost("localhost") + .sslConfig(sslConfig) + .serverPort(getMqttProxyPortTlsList().get(random.nextInt(getMqttProxyPortTlsList().size()))) + .buildBlocking(); + final Mqtt5BlockingClient client2 = Mqtt5Client.builder() + .identifier("client-2") + .serverHost("localhost") + .sslConfig(sslConfig) + .serverPort(getMqttProxyPortTlsList().get(random.nextInt(getMqttProxyPortTlsList().size()))) + .buildBlocking(); + + String topic1 = "testMqtt5-client-1"; + String topic2 = "testMqtt5-client-2"; + + client1.connect(); + client2.connect(); + client1.subscribeWith().topicFilter(topic1).qos(MqttQos.AT_LEAST_ONCE).send(); + client2.subscribeWith().topicFilter(topic2).qos(MqttQos.AT_LEAST_ONCE).send(); + byte[] msg1 = "client-1-payload".getBytes(); + byte[] msg2 = "client-2-payload".getBytes(); + client1.publishWith() + .topic(topic1) + .qos(MqttQos.AT_LEAST_ONCE) + .payload(msg1) + .send(); + + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5Publish publish = publishes.receive(); + Assert.assertEquals(publish.getTopic(), MqttTopic.of(topic1)); + Assert.assertEquals(publish.getPayloadAsBytes(), msg1); + } + // + client2.publishWith() + .topic(topic2) + .qos(MqttQos.AT_LEAST_ONCE) + .payload(msg2) + .send(); + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client2.publishes(MqttGlobalPublishFilter.ALL)) { + Mqtt5Publish publish = publishes.receive(); + Assert.assertEquals(publish.getTopic(), MqttTopic.of(topic2)); + Assert.assertEquals(publish.getPayloadAsBytes(), msg2); + } + client1.unsubscribeWith().topicFilter(topic1).send(); + client1.disconnect(); + client2.unsubscribeWith().topicFilter(topic1).send(); + client2.disconnect(); + } +} diff --git a/tests/src/test/resources/mtls/proxy/ca.cer b/tests/src/test/resources/mtls/proxy/ca.cer new file mode 100644 index 000000000..0fd518e1f --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.cer @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDEDCCAfigAwIBAgIUMsRb5hhnGL727b9Lbdj/kVI2H1cwDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMB4XDTI0MDgyMzEwNTcxN1oXDTM0MDgyMTEw +NTcxN1owETEPMA0GA1UEAwwGUk9PVENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAoEpINahEpHzitKLvYJYuEGz1lDrdLqxbV39P2Lkxy9o+4nunNvwH +1dT3e0FBEzNBmA1ply0465MXGoTzljgnugeit2hfAfYTYLoDWZnKXfEz6qs1Oe4/ +K9LanD+W30nzoE5huq9jsoxHX6FDDu7driV/U+ffxGzYEteVAHj6EpwdOdLjW2uV +zDlL5V23NR3sSI23xIbIKng6/U6t8SSX70gCM7HG38KK8LzBDcaz10tixVQ0vtIm +g+xkNoDG1re21e6hI6Q62KNQqzygqEQ7AzbW0fe2M8RdviFGe2UW131FqD0DQ3J8 +C2F5aq6bjWPZc7vr/HQJf4n4od1P9cSg5QIDAQABo2AwXjAdBgNVHQ4EFgQUlwCH +1eZpUIPJWoAaFjn2GpUNiWcwHwYDVR0jBBgwFoAUlwCH1eZpUIPJWoAaFjn2GpUN +iWcwDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAYYwDQYJKoZIhvcNAQELBQAD +ggEBAHcjg7XIVgjKwAq79FVDc+l53YDpw0PpGvL6mI29Wrp2cR2gx0A89hyXGfZD +SxxWmQyx2Z/S+bocaJdJrFIikA1XgHD2LQYjLx2D5ONM8uFcsM1KfPcCAc18D0Y4 +o1lAkC4rk/b/Mv9OKGjtvpwHxDpgoXVBR4yFE9h9dvpOvae8pOPWpeY/P2dEVOo/ +1N7bc8K/yhOjdT/umB1x3xdpTX6mO90tzDSjWerCJTUZgADjHui5N8CHc4RH9+1b +xQNPFBIXgHi/UOYD6ekxoAfbnoAjC3BBKwsICnBDz6qX07uSl/5zbXl06CFWnOQr +O9LMX4bSgPDmi437cnBB0np3NvA= +-----END CERTIFICATE----- diff --git a/tests/src/test/resources/mtls/proxy/ca.crt b/tests/src/test/resources/mtls/proxy/ca.crt new file mode 100644 index 000000000..e0203cd22 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDEDCCAfigAwIBAgIUEJ+TKT+jKJrdq1wAsjhIukssB04wDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMB4XDTI0MDgyMzEwNTA1OFoXDTM0MDgyMTEw +NTA1OFowETEPMA0GA1UEAwwGUk9PVENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAoEpINahEpHzitKLvYJYuEGz1lDrdLqxbV39P2Lkxy9o+4nunNvwH +1dT3e0FBEzNBmA1ply0465MXGoTzljgnugeit2hfAfYTYLoDWZnKXfEz6qs1Oe4/ +K9LanD+W30nzoE5huq9jsoxHX6FDDu7driV/U+ffxGzYEteVAHj6EpwdOdLjW2uV +zDlL5V23NR3sSI23xIbIKng6/U6t8SSX70gCM7HG38KK8LzBDcaz10tixVQ0vtIm +g+xkNoDG1re21e6hI6Q62KNQqzygqEQ7AzbW0fe2M8RdviFGe2UW131FqD0DQ3J8 +C2F5aq6bjWPZc7vr/HQJf4n4od1P9cSg5QIDAQABo2AwXjAdBgNVHQ4EFgQUlwCH +1eZpUIPJWoAaFjn2GpUNiWcwHwYDVR0jBBgwFoAUlwCH1eZpUIPJWoAaFjn2GpUN +iWcwDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAYYwDQYJKoZIhvcNAQELBQAD +ggEBACh6PR76yNLMErrEqZcUhJngOQK0yZR9zjcDoAM4YDSx/qHP/rZA9j2HJdVz +oPAf1rU7QGmMojyQ7sj0jtViWtX9QJFtPcCR8vnwONDyC4hOwNAWLx9TeRp6ZZVL +Y8TN9ydo3jTj4ZjG9rPzQBH3+vErr+WtuIK5H5AvQCzRZPv1r78FBdhA6uQtm6EV +WZ81xJPLQirIDl7yWtCkE+325pOW8J48I6wa3EG/30quXjcaGzlPCAQMGsyjPBvC +m4c8dtEYbYEynMdRDhvgd3OI91L6eeGvEMQvOrLJWtB1zDHZ1xleHmNkBV625UTX +h+Ojtlo/0Nlw3mPABR/B++Rdeo8= +-----END CERTIFICATE----- diff --git a/tests/src/test/resources/mtls/proxy/ca.key b/tests/src/test/resources/mtls/proxy/ca.key new file mode 100644 index 000000000..f846ae7c3 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCgSkg1qESkfOK0 +ou9gli4QbPWUOt0urFtXf0/YuTHL2j7ie6c2/AfV1Pd7QUETM0GYDWmXLTjrkxca +hPOWOCe6B6K3aF8B9hNgugNZmcpd8TPqqzU57j8r0tqcP5bfSfOgTmG6r2OyjEdf +oUMO7t2uJX9T59/EbNgS15UAePoSnB050uNba5XMOUvlXbc1HexIjbfEhsgqeDr9 +Tq3xJJfvSAIzscbfworwvMENxrPXS2LFVDS+0iaD7GQ2gMbWt7bV7qEjpDrYo1Cr +PKCoRDsDNtbR97YzxF2+IUZ7ZRbXfUWoPQNDcnwLYXlqrpuNY9lzu+v8dAl/ifih +3U/1xKDlAgMBAAECggEAANcS1NEqdvA+ofK+rXNsD2L60ImUcjOuEOHWcczasXZV +4QsD39pnUhwZJfi8FhUtMVZUqNmwVC/DrbxnqHBn6OY0WKC/6rs288lVzijrsh1b +B85Y65JPm3Ox+KKusEHreHogsgFMVPm+QAaQ2umumMSSi8aJ2jY11bdyjPuVV/ae +YeFxs7C6Mk9uTb9KvESxjfxk0rOznXX5WGZLy1p4F2o1NX8I1ueVneeR7h5fDlzF +Tedl+BSU86JT1AFdbhsZJhzGShLPILACGQX6Rteim+C0sJyykuHQy+vh6/LKnYbW +j+JUkJWP3Hdk8gPyxwJ2foYh7Hw725RlPY5JlGtaYQKBgQDX4yf4UeuGgtNhGvVX +WshUrO37zlKsMu8FWBTyTfudDVcbcojeU8Kv0vW7PlCfyCvXnFE80/ipL/zedHYf +ZVCaZEPA+rOeCdBgVZ3FY59lw1h1DwQqYybwj/IuSzw3AqQ5f9tzxNG7noxdFX1T +BrY4VHbFYWPEMJJc5v2zGgALkQKBgQC+EprVUvZE6wbPdKFmg++fTKnwDybUrDKH +uB/garImkTqXOzPWVUdiw3XpWXvpCCwWxJ8yb9eaO4GfmCmhgKjgql1w2vxjDVNI +/3xZubeAR3WfAmIUgzB76L2Oz9xvzc1t/rnpvGQNihSGOnEogsXZBvRT+raVMPe+ +DHWu9qXOFQKBgAw2Gh2urI7YOZKljrkZNnmrqm5y1jRNUT3RJKYsCQ5yIbo4uUsy +G7IMUb/8n1zaWriAbAvvxYH0Z+5BUikmdu+0uixhQeWvkmzQivMOVobQDOHaLpcj +MqGq0r0Rnl9SM+3YsJYUzPQ63J+rRoJ6v7Xh+THi91yyjqTYoAMQdm4xAoGBAIYb +WmNpRZkaupNlFvvd2xPqY3ydNCiZ1o0rvFH69feAQHazrr9rLBLjFi6ulF63BWSL +Fkff4Z9QnQSdt8HbpUve6E7YM3svy7OVj4c/IdnAkZy/cbRHW84RSK2au02nR2p0 +b3gbE/z5j8GlOnH60t1tqrYWDvz0r9fHssDgBdyBAoGBAKB8sZCKpyKL66weEz+H +LFAqw+Fm3MdXFLx9tRm/0H0jlZXWviViYvbT1A+gZFMDqlX6Ivqxcxy/W0a3BXMn +9QKYMJFvbOjCI4qh+M/eyEofOtEAh/c5x7g1MYPaE08YaxoFyutP8MJszvWjnsuq +NgdFFE0gyRsHGmJdCDQ5dmVi +-----END PRIVATE KEY----- diff --git a/tests/src/test/resources/mtls/proxy/ca.srl b/tests/src/test/resources/mtls/proxy/ca.srl new file mode 100644 index 000000000..e46db4553 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca.srl @@ -0,0 +1 @@ +1EA9E0336372A6164A1F22BFB7C56CD4EA142F9F diff --git a/tests/src/test/resources/mtls/proxy/ca_config.cnf b/tests/src/test/resources/mtls/proxy/ca_config.cnf new file mode 100644 index 000000000..6c1edb524 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/ca_config.cnf @@ -0,0 +1,15 @@ +[ req ] +default_bits = 2048 +prompt = no +default_md = sha256 +distinguished_name = dn +x509_extensions = v3_ca + +[ dn ] +CN = ROOTCA + +[ v3_ca ] +subjectKeyIdentifier = hash +authorityKeyIdentifier = keyid:always,issuer +basicConstraints = critical,CA:true +keyUsage = digitalSignature, cRLSign, keyCertSign diff --git a/tests/src/test/resources/mtls/proxy/client.cer b/tests/src/test/resources/mtls/proxy/client.cer new file mode 100644 index 000000000..36f70ea7d --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/client.cer @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC9DCCAdygAwIBAgIUHqngM2NyphZKHyK/t8Vs1OoUL58wDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGUk9PVENBMCAXDTI0MDgyMzEwNTgzNloYDzIxMjQwNzMw +MTA1ODM2WjARMQ8wDQYDVQQDDAZDTElFTlQwggEiMA0GCSqGSIb3DQEBAQUAA4IB +DwAwggEKAoIBAQC0S6AnsYUBJJP3ZiOqPdqvjW4nbslKvTYFo2tfLnobOakuSKyD +Iisx9FlUHLFMslL3QZ6mp/U8pmZU6YORWo/f6eElLuGtQPSVxqTEZH0v+GY8N8HA +XshvXmh7CSkGFzbKJ9vyAN2rZ/XrgEP3bBSQZxVSloLaNjV+lXzs3JaULxvQL+EJ +ef38RC//qI7qPflaNveIpnaCpG+Rx8QnFDYeyi8BDS2PdeKwLzj4aXaW5qjoBC3v +qk9/XLqxMXTAX8Ty32E7HrgxhG0mA48gYg2T/Ba8RPykNGvM/cncRgC+B0IJJ1jY +XAgdg/C5Xsz7DveWeezWibripMQN0UrkTjcLAgMBAAGjQjBAMB0GA1UdDgQWBBQY +djFGcYoRFXb7lyZkoFqQJnML1DAfBgNVHSMEGDAWgBSXAIfV5mlQg8lagBoWOfYa +lQ2JZzANBgkqhkiG9w0BAQsFAAOCAQEAiATbrV3/uIuIJQbXBssuVOlTcUWXPAHf +bFGz31NgPnb0za2ApoJ+912orSxSAvlG10g+2Z34iW8+bciH8e1DF+cPQKVg0lkm +jaMQO1cUVw6e2aRHagIMvgEwcz+PqVJoLvWWp5sjqnlafu/ZuXjzeyefuFbxD3kI +EAuSp4juklHrjLZbDulUgGuodKnJ/plzRLKUYoKArdmCAulZRmxcKBw6oYjQWo3A +6lLLHgqaV2g2RuAQFP6qCTMGEWXu4F8ZWJC6vV0zEDMh6QKJdNH1RShbsWlfIxsW +TU6Pswt9EyDIo2Wd72n/sAC8pFxvM8tfhFsAOihHB1XBb2YhtNCORQ== +-----END CERTIFICATE----- diff --git a/tests/src/test/resources/mtls/proxy/client.csr b/tests/src/test/resources/mtls/proxy/client.csr new file mode 100644 index 000000000..0be3c4a35 --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/client.csr @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICVjCCAT4CAQAwETEPMA0GA1UEAwwGQ0xJRU5UMIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEAtEugJ7GFASST92Yjqj3ar41uJ27JSr02BaNrXy56Gzmp +LkisgyIrMfRZVByxTLJS90Gepqf1PKZmVOmDkVqP3+nhJS7hrUD0lcakxGR9L/hm +PDfBwF7Ib15oewkpBhc2yifb8gDdq2f164BD92wUkGcVUpaC2jY1fpV87NyWlC8b +0C/hCXn9/EQv/6iO6j35Wjb3iKZ2gqRvkcfEJxQ2HsovAQ0tj3XisC84+Gl2luao +6AQt76pPf1y6sTF0wF/E8t9hOx64MYRtJgOPIGINk/wWvET8pDRrzP3J3EYAvgdC +CSdY2FwIHYPwuV7M+w73lnns1om64qTEDdFK5E43CwIDAQABoAAwDQYJKoZIhvcN +AQELBQADggEBAIoQFcm5h17AMtuZ2hdERwptzzQ8HxOC6Eb3NpfItRHLDKKMSuD9 +WZPixwz/53uF4jlcN+Uc5AONSKognp70pR3Ku9G47cEBb/iYF20OU6nd6a9+X4wr +r9SwkS/FlNb3A1UM0HuO9pXX3Raq3WR04gSWdJ2S7gbGWJp5vJiHbEwbIqb05lYG +sRdkVlJdGRbPmpVJmPib3wMidbm8K8SzLIDyLyTiMl92z7dBZq83tYAKdZ7D0TEb +k4rKu19Wi+EkOVe9qSdzWlgXLoNxct4rxpqbrWUg8DROtWFyQLA4WvjDzRnxFJgU +fhYb2IQXM1GUhNLGAdJOu8iaV2Dpd7fPGSw= +-----END CERTIFICATE REQUEST----- diff --git a/tests/src/test/resources/mtls/proxy/client.key b/tests/src/test/resources/mtls/proxy/client.key new file mode 100644 index 000000000..7d95fe83d --- /dev/null +++ b/tests/src/test/resources/mtls/proxy/client.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0S6AnsYUBJJP3 +ZiOqPdqvjW4nbslKvTYFo2tfLnobOakuSKyDIisx9FlUHLFMslL3QZ6mp/U8pmZU +6YORWo/f6eElLuGtQPSVxqTEZH0v+GY8N8HAXshvXmh7CSkGFzbKJ9vyAN2rZ/Xr +gEP3bBSQZxVSloLaNjV+lXzs3JaULxvQL+EJef38RC//qI7qPflaNveIpnaCpG+R +x8QnFDYeyi8BDS2PdeKwLzj4aXaW5qjoBC3vqk9/XLqxMXTAX8Ty32E7HrgxhG0m +A48gYg2T/Ba8RPykNGvM/cncRgC+B0IJJ1jYXAgdg/C5Xsz7DveWeezWibripMQN +0UrkTjcLAgMBAAECggEAQ8Q9IU5HGMPf3diFRULUhLGbGrU4caAmwv3GqNL2UG9e +2Ke2N9/K7o7SWJwkRBiuuILwl+F/etlskzPmIOcyNs5YsropVw6YIAe2/J5ss3Ah +NTcb2yuFGN8aVEyAH+rvzBIpSI/swbVkqKzgXwo/vHsSd6Vc75n6h2a2uuy6qF1f +C4Jr1XXD4TBFRxzfdxpJOZlEHlDQ7Qj9csrHNrrN3QUtFzIPV8XYqAxqd/aoRkXL +VnmTZSH2fJvQR1XWdEcvLF9okulczUdTw0wUrYJAEh+rzhiGY/At3A1UVtzwqprZ +hVv866sC6PYY8q/tOCkTphBK5efzZ04dhJTNtKWqyQKBgQDYy7IsDFtDOmTQCL2h +sRO7qd+QBRnaIWNy6dcfo1WWGdsfBF8CpEnENBDaUT1v7NHYxkZdDeVeufXX6PDv +m55Yaz3NXC0kHyhH0PpWIhtOjEzCiyocdohiZalTFG+1sbpp7/XIagU+Vgoynotm +Nd0FipskzQmNbno8QWwZz4FxzQKBgQDU5jEe4TsGYmz6pLiphsIqm/ovndqAJ74G +dV4Q3Lm+qzQ79EAa1StYd05eG9EZWdbwkxgToulO3ANTh5PxgXCwmGYSAzGO2RXO ++/kv6hhBFsxgd8Ve+9AfLx1Wd9e2IJoUkhBwr3BRFIv2MhiV8PSBs4i4kidxUIZP +HDJJNUXUNwKBgQCC9cGClFBI8yxU8wLCevqFoZ9YG4y7VPIDR7jY9szLqIDSYsyW +BvI8oIsRpoOrae51uYhly/Aj4cfdjmyFAYeMt/OUazsll+C4SUf/4giG0X/JAVIF +8aB/eBPqCO1WX69RMVBSqaDTQBxW6akhrCYAo/MGLwm3MuaKIacQjGYQfQKBgC9r +OgObvO7WG1nUOIEhz7t31EioyxMCRxPfLl1pHEH4lgDIjUKsuiPRJvZVEcSouvQI +fzNYdMiovmDrcKs43mWm/A0FAxPDDFV2z/C5Hj/ZGRpfcumOArP/ZXRt6vDY4Bi2 +08yVdtnITsg+LjWvXnZJC6m4e+qEOfYC3LxrjisPAoGAUH1jW7oGLXUY/4YJpbRe +RNuRjX7K8ynJ/ztwsTFPM8AIl0lEdwdi1U6/Fo6CT1ls7jBPASOt8KCmYg9g8us5 +SWhMKvRD6ryWFqpOJUDQq3b7N2illdNW/kACi8aI5Ghup56iZT7yq+J+h0jxlj9y +gbL73lKp6ZOHeGKBOqDJLhI= +-----END PRIVATE KEY----- diff --git a/tests/src/test/resources/mtls/proxy/client.p12 b/tests/src/test/resources/mtls/proxy/client.p12 new file mode 100644 index 0000000000000000000000000000000000000000..bffb931b535ede54286564914188716961e2d2a2 GIT binary patch literal 2531 zcmai$X*3&%8ipkzL=&}dwZvAHqRB{&t(GE`GPWYIlv)NgLKU?Y(IRLu6-%p%+Lzke z)Eac5+8SahDoPP95_|3Bnse{z{G1=(`Of>kKfYh@b3P;iG6e*%BMA@^7=lPJP1u0} zH~|F&NCJoeA)MHEkp%Xoe^Nhr{I7gZIGycBorwR;w{t& z;W1=|D;7s}4c?>6_X2@jY)B}W>wi~)5Dqq^BpBkBU<&YJ2LTWuUc-JX-=}L{%hn@d z0f3Aty;uUfo7*2vrCP$tFg$o+5_YF^cc?&(<97u%as#}yk2v6gwltLk)})xs6MKr} zZQ?{I6x-^Iki|b2iqdz__UI0duBVyrC_MGl7~F$K zWi$vGY$N6RJ}=?j#gskI937uNL#FmS7}?7{7prmAJ-wQ%^J97>j|IlmL@TyHjg?xwA&_cXiet@%MHCNPo5zfZ z&9ks7&IzO8;K}m`Gd*bVdrbRZA&FTUW@D5rL#Jc1whY1`68rK1%;$FK@TGp16N=nw z*yqWlTqlPCm7FA+RM$ie$<286#lw=c_4MUT2Rt&54Z~rq*!`e2T3jLNY0+_&7Ftc= zQ)%>@Aj_lsoR`x+=tRSH&9j%Qi3Joho>SLsaM^;C#zEhiJ(BTneYCGpyP__(lG8ki zHMx1+-<2r2EZe#&6`$E?wUm9AY0g*xb!$j#5F78tU%nn3Q1#IjIbEny++ zf%kjhoGP)5LULySHY3yf^LY^=fiMHPDQoq2NaWfMptjb1Fzob}41u;EC%8K$p87=cldgU_Q3N|@F=n=Rq!Jy2YjB8;ll)OO zF$vX}Q3aF<$n2NtsFuQixoN~i%t|*_O5Y4ooRwXS6tku&-3SN!lR(V%55~a_{%#Z>QBnB(Z@wB!MIKXK()o0Y@~5z!838 zhn!3h_|*Ry0p$h&PeRQT^Wwi1Uk<}vJZH~1Q=97TAw3Jr_}AhmnaE4?X=d>ki|7Yb zral?~5kLdlhw_)gHM>Cd%%q_vla^}P_Wav=(4ymq6U_p?IpN}R_eL1>cRP>HCo$S} zhCF3-Yn7jB`Sl40m~F>fhG*mRXnsMxZ2I|YgORCP3w+FbIUJ|sOmehbu5)T8yd*D6 z4u!sg&6a3{lflZEv$ICe*-E^^>ieL42Z6h# z`=-1c_)i4tUUK_wgLerbo!OG zMb-5tJ_x04@=#u9sy@HLE3lNf&wy8rb(bo{)mzU*IAhq< z>3I_4x6V=){MWjQ$da?3sYE%%Xff>0cbedeV90R{9h;e;73%arq*CB9^M<7I>AUK* z_F8-gYld5>7JO4hD)iJ9^X8tp4;XSCX!M0cnH<*@f zn`PmHS$E>6WvBkbM^uJ`Yc6}SL!0*y<<;IJo+!&tyJ%xAvaHU! zupvlkutgyN)CQ^XhGhD$XLP3n!#6Bfzr!G9mqZ_ovY)*rZ9P;_A=E4bQOQFZp{WXL z3x|8*PsRzyJ#Gvv5cP0|~6PQ1HaAc-c5vOkl)5-~jBEWrql=F6kv2qNJ(jJ9}nBBUXI7@7Uw?V8%;KS!Z<1^S&P`1RrGS+C30jB{ zN*E+q%RqOK+YU>0nwn8H0whse=J`sb0TP9T{d|jn01h?~yk!VN`?5wy6|+lBymp$L h>T~YS50VgzgX$&jZZ$CEL0r@Y*9@zR0>gi!_P% literal 0 HcmV?d00001 diff --git a/tests/src/test/resources/mtls/proxy/clientkeystore.jks b/tests/src/test/resources/mtls/proxy/clientkeystore.jks new file mode 100644 index 0000000000000000000000000000000000000000..a5033dc9f1398d5f4688358f9fa468055382d371 GIT binary patch literal 2107 zcmV-B2*mgP?f&fm0006200031000310Wkmo0g-;=;!^+s1pzRE1pP1$1_~<%0R#am z0uccL1pows1nK|En&-Vfe2Ttek-fKVo^60LgCwlDfuc`6D;roGbzmJ~l>v!?!Cum# zYu`Qh<5dq4C>;xEkIi2_SF0R7ChXI3doZc0?e3qSD?C4R*hFVpN$yui<$^ZCfWj}v zZD1*;qDy4MUc@hcwYdm|{L(JXU}Tvd`tTU1VBKV@;~_ZKmyGn-K5eK%(=$%^j)+Sg z)OoB4*wWj)e-|4J`9ul=H}AM@^d7{)#osH9lMBe~_{+l{`u_kz_Fc=o_FKl$vF-1}QCxFKkJg zRF(gMD6%9DUYth+^G~CV+dv6-0{n0TaSK5rB<(6Q!%8jAF&8+G!dMtdyg6~dr*ed! zU~>X@xW~ytQ8y`4_~*1_%Vtow&j)XEKQ!|Ou}o{0Ug7^=CS!8Bb)Va!f5mYZsB?0Zl*4Ki z66`rDfn9g&FeS%4r+(}lY%;AE{12dOlX+t_j*VXY7S@F&hvBE_P}r#c4JAWmpGDva zs%-4>5o$`(q|)iL(BCkKN0Le<=hJr*?_YMujK@KBkfOG+FFWDx%*Ow{L*VrqlHF8- zibhI}tL(D5?4OggI~$T9#+1sVKkwMCt^fT2_i-R__)DQnQ!T75esVaw&dk@$q((oCx3Z{3=U zeUHPkCHHS~II3|jfS{;}MyzYBGHmjlCu}5e)++s5GXLIV0cBcM3xIP&N9sEH76EBkqY)-GUD7IuJvdt~H|rHllqB=~+1- ze8PXPM=kb)0}LQYblgDYE!+IzypLoSR$=$R5iqvu-NVRE1HkQ@>9ropmr4Rki~#IL z^|dIC%;RCz-zs>;NW>3A=vk^`a~u#Qz~H~OeeK5kv%#2zgO&_kg?kc>*PQ$7V3r$r z*xDXv^96BhAXp9P6Dh*!L!g+vtf&z*U1{Hd>{tptB#a|~AU)HD;kvaXxbJGKeXkF( z14Pj_91fP3!zD*$))lpldrU5pDIy{JYuAaDp0BVH7Ns<8vGsx{@dxvh73bXkjJ{(G-aQI?ya}ccWKR*=IPg@%S)xVuiiyjXtL~LXihV) zsoWCWK%RFxm%iQFQ`L;^W0PrGsG}oYD+1r}buyTOGI3+*%IV{YL%s8Hb-F0n;_hF$ zIy?!LC9WfenEj5A`=#)KV7B4pRm?r*J}}*FVmnLwS=kO;nW6mKsy(pZ>8uwv_Y80~ zYl={^C+W|^){ZK|xQY=qI3c+l8n_H)#~w1qb(J?=LjD1-Q_$$;HM&(5z0O6}-bD%a z<;K@hi2X(@A7HhD0qs?YThfjI000311z0XMFgXAK0{Ae30`xF~0oC$WVAB$M}MBdR^xuZ?ae zZpli$HU*<=UoLtZIjJs4tb-yeG4xqf9I;HYQujfgrl<8hre;*>gOOT~-|68cF5#^} z^p(b>#AJOh_+~se!N6X~Z(eA72`L5_Hp(a4@&MhdXZ7oVL-%YHkY^QAmV(+gHGY+R z?A(@=FB{M=;R$*D{6sJRsE+DA`C2yjh^BUeq;HYO#3vLs9?CBP4K0s#;;=6`_-S^Q z=BVfdE$^yNe_Xn;F?7IR#PZ)^J07?(gl#4Rk04?Vll&IEMEs;QYs~%0+(rPt2SN!a zSlC<$9fR<>Ud;Or_m+9=)`_~}q{I!;O5{#A3jzZH00E;yFhDRJ1_M&LNQU!7LaYdIr0pDyPStgC65Ia{CR}P-pq(^E3 z488#{b3c!%QfMyqmZzH|s(D&|@7cL{^LrTqqnmp@`60jlk+s%N`1vS7sTXMnVz`PTOe(;M=Sg#CY4K~PozOacP+PB91iv4DZfU|!DVhp38f4?=N*T<*4Z z;PFH(i}$*(M|3dWZ6)9Y`zvzv1CG^0Q<;SDMdqa3gnBMHBZY&BgzBE(uLd`>FP360 zZ3z_PaZkl-ZnK|5Yv9(8WAYm<0{jXV|E>-b@$3E}|CTDIc|zpAT(-oDikT9dt(&S< z@Qk~XP@W#rZFIm-R{KtzD0%Fv;oXBNlCUuz=g6~XJb58-SdqqwR~D_k7H4@yk?UTa z6)oBD6JFT;YX_R7o8#sF$VaHih~o#B@5GFD``jTww56Xg-TASwGsxvJTaF99${TBt zM7$N|(?HE~=+{$yY%VEBLt^)o*kxuA1tV3%o!|MD)_roe@``xo00aS>?Y+L)HLnjo z)3p4ygpI_Wnmc16nCgZVJ~CaHvbfat&aWct(dI>+AZk$VsSv_^SwM$!Z-|w;bv4=^ zkmV1$LE2iKoaG&A`WDs2mXw>Kwc7F_ED;suLKnLG{su~7hhZ*lx^QWge~3S(`^?2X znqb{yON&}G+s*+On{I4zRk-v>#@TUnkN1HE;O+bNt9RDb*>1IVoSARa>C|V z=;&X5LB9lD+uVzGq36=0D7RGEb#Gdd)XN{aE+-B$6J#-MvdgBFpQf!8&MC533A4HI zjc}y%>?F1$PLyIlG!{tJ}vp*z2X0 z89Sz7+7Spv!3Zfb=!Agq5B06&e@? zoSop~j!9Y#d}uvd?rVEWQzDW380A}G3S}t>W+xLNS$*&c3d6Fd|LX0(AYi=>#r+p8C5_9my5W(EltxKY8o6(yjHfY%4+Q z*+JzTFcv&2(+Dk*L0dE>RQixv1ls;Y-%B-u)!E88m2Sw^-gR#LNZJnLf^K2#{PuCOvlf z7t(jFjvB9;%x+Sa@mfbuSzIW+O1~>P{Q~AHwIN;VCObXuTWOzu;*GU5cfdh_h{fKR zjSSgPqqphAhbMN_uFmfHi~>~_j5}v0Ea^)JC9~Hxg0A~(enV%X-aPVAflDQ0sG@Iff8W^_ zJVr%5#W{IWT52Kou@~JO`)#+uO>;L}%ZEcpv>ptS-S+cT#g&|;+|^nseP%0BQfsw# z!146;w#wICSMiNo^liYGE3j7&Hg?ry)B}zoIF$uIQRn`QnS!T;fyNN zZ=z9Ygx#Jb_=Or^&C7G;bLYfr4OdyEV_#Q&l#c%UM(9J1K7m>g>XWbDZoiXPNvbiZ zaDYx*R}InEc_i1DW{gnZ?qmv4|Eh4{ql6H~cG5u|9)se3hvqCXp@$jI8_|Iqnv$_Y zR!?=F0oD(4?liaBREcqEH(idXHYawA={?^9e`9R!$Lk5jHhh`? zJiP0}Brv54N^v1_=w^=tR?rlzCw<@?8h1)*9p3&FeUcw|AUjBn6{p_J7vv6$f4hPi*Cc7a|tk zpRQm?PS2e^749kS#vF-z(01`OB>PV!7z-@LAd+Y5=7o_0K`MZSvm@}AqaOX0muC!= z^Ubjnk+YH^m!J+;e9hpmDwtp-I6F^!NHUtgYxuBpB^A=VIwtPlI}kYRWg}xSw~CT9 zxH*mHF4T@>D*3R!ZSit#B{nO%(5V^}c7wX+u}xb`onoaS9)5Duoj(y>>Z{lAw-v4o zmgsjzE=kc94mZ@SUL)oa+zPv4HOL+R05Z^@B&7X;AniQfP=_!041w5CUbC?$ugXuV zG~9~V<9NO0t05eK^Z;!`pX5GuH~9h;J6EQ*12pTh(K?GzkMmV1KdU`8-FjQiJH1Ol=Gz|t<#xEeWUWv3`!hDB3llhIu) f^t{x#_@Vvi)6$|0jUX_)^ literal 0 HcmV?d00001 diff --git a/tests/src/test/resources/mtls/proxy/serverkeystore.jks b/tests/src/test/resources/mtls/proxy/serverkeystore.jks new file mode 100644 index 0000000000000000000000000000000000000000..6f9e628d3b89b9a51570da16dbe72041747a2d45 GIT binary patch literal 2110 zcmV-E2*LOM?f&fm0006200031000310Wkmo0g-;|TK50|1pzRE1pP1$1_~<%0R#am z0uccL1pows1nKMpalwLwn?O=BDL^xNG|BX<&4hL1nh!;kT!wCAp{|Dk9jbuQnl#f+1J zB|*E?@*{;Zo!&gI)tOG(M9afw>7M%>WppXNxB0tizR~ceW*^gw z49^kFRmN7xC>Wu$1k}T9pe0#W-s2-+h~`|S$Wx$dd#K|Dr7<$j|Jp1*#a}VJkP}?g zp(noBqpd;N@pts8HgCf34?QFGYU;j?TUYIkwrX@uM!3ZB}@}@HW<~-hP@+d&-)#Cjlm~$seGV8)lH^Jhcmr@Dm0l5ow26HH|_Q{ogH4gy~x90vGwA1qkzLaUxi>v z25P6aYt50tNI#IK9vcHCQ)L}vbYZO%hM@_{A*WsYnX=fRUu-flIJ*s!_BYq9->b&x zLQ#Qtf>oceuV$M=3WAvn76~8I4_KR27V!ae_+;rBJq#mlL!2b;paH!&at9_G(J}7j zj_33m{<6Vkc!Q!vFg*f(&&F6jHfPPzbH9h9m3DPU*Pk~l(r|QbW2EAfVWmZXaixY<2iVc zS|Z7yk?}SC3$ey2OmYlUUN2o_F8Xn0@0jZ%%*%r6N7fqXX<6L?@QejCUwOO?+-W^r zM_I0y9RU3lP~fG}2p+H5+l`zhm=~H-(t7p0Vt^@CK~S?Y2)5~uY3Vu@FhcJ4SV)xn z;!xcQYc4Bj?l18c=-ZCI+wM*}=D++uk7 zB2sePJFb~K_zN(m6L9&5vmcStQT96m{zBYanRTqHlnK8EEAu$O`+eZzQ$K2iygZ*g>njybSJDb|yVE4y7}O zq56grujnfn0gvPI?ie-sE{LomZ(ZUMI3q(_&C)tE?R244PFrE(zuad{VB6gc7=7~j zM_B^mzcpRInlkLusdqQhi?q=;h7hMqau~^CU6eOGAp+DiefmaR?MED@Wu@FM1O`ja ztgC>C+~G%NC|;C~{j+^0RxR=X000311z0XMFgXAK0{bw60{1Y20pFkl0s#U76dtMI zGh=e57D^u?zqiF~)an#3o-hpt2`Yw2hW8Bt0Sg5HFcC2iFbxI+RRjYJ22xK?R6{{9 zAQufXG%z?aGchnVH!?R`7!NWrGBhwZGcYkQH8(OhS}+tb5-<=3162eA3<+#+V_|G) zZ*z1of&n5h4F(A+hDe6@4FLfG1potr0S^E$f&mHwf&l>ljw~!ps5;O>zkx7M&!&bY zhLXj?E|@n&oeoivQSq5-SJal1T+uBbtga5Diq*!Dk5*QIL5Q%^%kPZtCfpxuWvH`{ zeio?jwl%6SU-Q54=RcsmR=^j0aBYiw&TMCc^Z&$3LF4dFQOoF-mrrFC(Nl#Nf_R$8 zZ%c?F(wmyY!Sy`>G~mYf%bK^p+pr~NeRb>g0+GWzqK&xQ zQGA&7Iqa3U?qkwA(K!`j5epnoLlNmvjvdx9n*hEZiUX>`Vn|++B8Jq&sicN$t2392 zQn3&LNQU+thDZTr0|Wso1P}wxK*9T_eSm?{SSAe{n`~y>md*&E!+`gQZ~XB)toxkb5J? zjefnoMHjD2%mY8EG@ch<6O9$ksX?iV-a!3vrys+u9902*BnTHnNZ_`|ZG5Es(iuQy zxv>y$gh6!Hv!hMno^@c{t=()lWuW6u=<~H{ML;tyCJ9FWLUt6MFeJAt{qEz}vetK> z86&s!Z5N1ymh&CZP4?0jV;CwFTydXDdM6{BSErMZ!AyI8WzFL^hN}=w5jz5XcWrj^ zTo?hSLRY?i@1fd8i7tSNsI_Y76glGvU@)9!oZ6u2l=oK$uUy-ss9(?8xwE?UQ2<5- ztQ{xPC2TR=3!Desx_7xG*3r;r%C(U#47im$ylQ??IZz#4F7}&Fo{2_YlDbK&;mPY~c zS%R&v%hzd+*R4UNxjWBXkrxfQhZceAuC9goXk^S|{WG!C=su-tYEx z)~0kcM@iB5VlsVAFk%0T{)VcASc1`8D9xwyQu+rSusbxS3F;|>q_bx`Ll9(+8>@Gaj!T7N^uZr6Z6ltSk=n8P~B&B5B3iOKk)p3Ap7{%1obcuGm2h>KBl!&=~avQR__ZQF`Z z#OA6b%_D2+UUZ$$y<`s%XX_I5BJ8{7e2yqhBAuDfF?dK=;%(SyNzIKO>ID?IEn@1d zc)Qz#8A4`Nd3@nIt;T~L_|%;|;pe=Ms=)3J%h~~;mtC6M+G8u|%9_Z`{ug^^d{-l~A9Z_#zJuGOpZ0kK`xc+4vNt+dgmh_GJEGl4;{7Q`@3FflL< z1_@w>NC9O71OfpC00ba)a%4cwqMyui63--R)KVqhDM4b`j+|Y=cbV}rm+Vjk6acCy YVfGr#=%_Z{n_~4?*xk_CWy5XkBdssI20 literal 0 HcmV?d00001