diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java index d96bf6c1..5e10edec 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTCommonConfiguration.java @@ -17,6 +17,7 @@ import com.google.common.collect.Sets; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; import lombok.Getter; import lombok.Setter; @@ -376,6 +377,12 @@ public class MQTTCommonConfiguration extends ServiceConfiguration { ) private boolean systemEventEnabled = true; + @FieldContext( + category = CATEGORY_MQTT_PROXY, + doc = "Enable web http service." + ) + private Optional mopWebServicePort = Optional.of(5680); + public long getMqttTlsCertRefreshCheckDurationSec() { if (mqttTlsCertRefreshCheckDurationSec != 300) { return mqttTlsCertRefreshCheckDurationSec; diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java index ef901c80..dfe71ea6 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java @@ -14,12 +14,15 @@ package io.streamnative.pulsar.handlers.mqtt.common; import static io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventType.CONNECT; +import static io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventType.DISCONNECT; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.concurrent.DefaultThreadFactory; import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.ConnectEvent; import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventListener; import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.MqttEvent; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -33,7 +36,9 @@ @Slf4j public class MQTTConnectionManager { - private final ConcurrentMap connections; + private final ConcurrentMap localConnections; + + private final ConcurrentMap eventConnections; @Getter private static final HashedWheelTimer sessionExpireInterval = @@ -41,18 +46,23 @@ public class MQTTConnectionManager { new DefaultThreadFactory("session-expire-interval"), 1, TimeUnit.SECONDS); @Getter - private final EventListener eventListener; + private final EventListener connectListener; + + @Getter + private final EventListener disconnectListener; private final String advertisedAddress; public MQTTConnectionManager(String advertisedAddress) { this.advertisedAddress = advertisedAddress; - this.connections = new ConcurrentHashMap<>(2048); - this.eventListener = new ConnectEventListener(); + this.localConnections = new ConcurrentHashMap<>(2048); + this.eventConnections = new ConcurrentHashMap<>(2048); + this.connectListener = new ConnectEventListener(); + this.disconnectListener = new DisconnectEventListener(); } public void addConnection(Connection connection) { - Connection existing = connections.put(connection.getClientId(), connection); + Connection existing = localConnections.put(connection.getClientId(), connection); if (existing != null) { log.warn("The clientId is existed. Close existing connection. CId={}", existing.getClientId()); existing.disconnect(); @@ -68,7 +78,7 @@ public void addConnection(Connection connection) { */ public void newSessionExpireInterval(Consumer task, String clientId, int interval) { sessionExpireInterval.newTimeout(timeout -> { - Connection connection = connections.get(clientId); + Connection connection = localConnections.get(clientId); if (connection != null && connection.getState() != Connection.ConnectionState.DISCONNECTED) { return; @@ -80,16 +90,28 @@ public void newSessionExpireInterval(Consumer task, String clientId, in // Must use connections.remove(key, value). public void removeConnection(Connection connection) { if (connection != null) { - connections.remove(connection.getClientId(), connection); + localConnections.remove(connection.getClientId(), connection); } } public Connection getConnection(String clientId) { - return connections.get(clientId); + return localConnections.get(clientId); + } + + public Collection getLocalConnections() { + return this.localConnections.values(); + } + + public Collection getAllConnections() { + Collection connections = new ArrayList<>(this.localConnections.values().size() + + this.eventConnections.values().size()); + connections.addAll(this.localConnections.values()); + connections.addAll(eventConnections.values()); + return connections; } public void close() { - connections.values().forEach(connection -> connection.getChannel().close()); + localConnections.values().forEach(connection -> connection.getChannel().close()); } class ConnectEventListener implements EventListener { @@ -103,9 +125,25 @@ public void onChange(MqttEvent event) { if (connection != null) { log.warn("[ConnectEvent] close existing connection : {}", connection); connection.disconnect(); + } else { + eventConnections.put(connectEvent.getClientId(), connection); } } } } } + + //TODO + class DisconnectEventListener implements EventListener { + + @Override + public void onChange(MqttEvent event) { + if (event.getEventType() == DISCONNECT) { + ConnectEvent connectEvent = (ConnectEvent) event.getSourceEvent(); + if (!connectEvent.getAddress().equals(advertisedAddress)) { + eventConnections.remove(connectEvent.getClientId()); + } + } + } + } } diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/EventType.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/EventType.java index 92d3f940..e5b27350 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/EventType.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/EventType.java @@ -15,6 +15,7 @@ public enum EventType { CONNECT, + DISCONNECT, LAST_WILL_MESSAGE, RETAINED_MESSAGE, ADD_PSK_IDENTITY; diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java index 7651a046..402f2bbf 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyService.java @@ -35,6 +35,7 @@ import io.streamnative.pulsar.handlers.mqtt.proxy.handler.LookupHandler; import io.streamnative.pulsar.handlers.mqtt.proxy.handler.PulsarServiceLookupHandler; import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException; +import io.streamnative.pulsar.handlers.mqtt.proxy.web.WebService; import java.io.Closeable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -74,6 +75,7 @@ public class MQTTProxyService implements Closeable { private Channel listenChannelTlsPsk; private final EventLoopGroup acceptorGroup; private final EventLoopGroup workerGroup; + private final WebService webService; private DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("mqtt-redirect-acceptor"); private DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("mqtt-redirect-io"); @@ -101,7 +103,8 @@ public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration prox this.eventService = proxyConfig.isSystemEventEnabled() ? new SystemTopicBasedSystemEventService(pulsarService) : new DisabledSystemEventService(); - this.eventService.addListener(connectionManager.getEventListener()); + this.eventService.addListener(connectionManager.getConnectListener()); + this.eventService.addListener(connectionManager.getDisconnectListener()); this.eventService.addListener(new RetainedMessageHandler(eventService).getEventListener()); this.acceptorGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getMqttProxyNumAcceptorThreads(), false, acceptorThreadFactory); @@ -112,6 +115,7 @@ public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration prox this.proxyAdapter = new MQTTProxyAdapter(this); this.sslContextRefresher = Executors.newSingleThreadScheduledExecutor( new DefaultThreadFactory("mop-proxy-ssl-context-refresher")); + this.webService = new WebService(this); } private void configValid(MQTTProxyConfiguration proxyConfig) { @@ -168,6 +172,7 @@ public void start() throws MQTTProxyException { } this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig); this.eventService.start(); + this.webService.start(); } public void start0() throws MQTTProxyException { diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java index d7f138d3..ed68c595 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java @@ -280,7 +280,11 @@ public void processDisconnect(final MqttAdapterMessage msg) { @Override public void processConnectionLost() { if (log.isDebugEnabled()) { - log.debug("[Proxy Connection Lost] [{}] ", connection.getClientId()); + String clientId = "unknown"; + if (connection != null) { + clientId = connection.getClientId(); + } + log.debug("[Proxy Connection Lost] [{}] ", clientId); } autoSubscribeHandler.close(); if (connection != null) { diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java new file mode 100644 index 00000000..70a3e3c1 --- /dev/null +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java @@ -0,0 +1,270 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.proxy.web; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.jetty.JettyStatisticsCollector; +import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; +import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; +import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; +import lombok.Getter; +import org.apache.pulsar.broker.web.DynamicSkipUnknownPropertyHandler; +import org.apache.pulsar.broker.web.GzipHandlerUtil; +import org.apache.pulsar.broker.web.JettyRequestLogFactory; +import org.apache.pulsar.broker.web.JsonMapperProvider; +import org.apache.pulsar.broker.web.UnrecognizedPropertyExceptionMapper; +import org.apache.pulsar.broker.web.WebExecutorThreadPool; +import org.apache.pulsar.common.util.PulsarSslFactory; +import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.ConnectionLimit; +import org.eclipse.jetty.server.ForwardedRequestCustomizer; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.ProxyConnectionFactory; +import org.eclipse.jetty.server.RequestLog; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.server.handler.RequestLogHandler; +import org.eclipse.jetty.server.handler.ResourceHandler; +import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.resource.Resource; +import org.glassfish.jersey.media.multipart.MultiPartFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Web Service embedded into MoP Proxy. + */ +public class WebService implements AutoCloseable { + + private static final String MATCH_ALL = "/*"; + + public static final String ATTRIBUTE_PROXY_NAME = "mop-proxy"; + public static final String HANDLER_CACHE_CONTROL = "max-age=3600"; + + private final Server server; + private final List handlers; + + private final WebExecutorThreadPool webServiceExecutor; + + private final ServerConnector httpConnector; + private JettyStatisticsCollector jettyStatisticsCollector; + private PulsarSslFactory sslFactory; + private ScheduledFuture sslContextRefreshTask; + private final MQTTCommonConfiguration config; + private final MQTTProxyService proxyService; + + @Getter + private static final DynamicSkipUnknownPropertyHandler sharedUnknownPropertyHandler = + new DynamicSkipUnknownPropertyHandler(); + + public void updateHttpRequestsFailOnUnknownPropertiesEnabled(boolean httpRequestsFailOnUnknownPropertiesEnabled){ + sharedUnknownPropertyHandler + .setSkipUnknownProperty(!httpRequestsFailOnUnknownPropertiesEnabled); + } + + public WebService(MQTTProxyService proxyService) { + this.handlers = new ArrayList<>(); + this.config = proxyService.getProxyConfig(); + this.proxyService = proxyService; + this.webServiceExecutor = new WebExecutorThreadPool( + config.getNumHttpServerThreads(), + "mop-web", + config.getHttpServerThreadPoolQueueSize()); + this.server = new Server(webServiceExecutor); + if (config.getMaxHttpServerConnections() > 0) { + server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server)); + } + List connectors = new ArrayList<>(); + + Optional port = config.getMopWebServicePort(); + HttpConfiguration httpConfig = new HttpConfiguration(); + if (config.isWebServiceTrustXForwardedFor()) { + httpConfig.addCustomizer(new ForwardedRequestCustomizer()); + } + httpConfig.setRequestHeaderSize(config.getHttpMaxRequestHeaderSize()); + HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig); + if (port.isPresent()) { + List connectionFactories = new ArrayList<>(); + if (config.isWebServiceHaProxyProtocolEnabled()) { + connectionFactories.add(new ProxyConnectionFactory()); + } + connectionFactories.add(httpConnectionFactory); + httpConnector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0])); + httpConnector.setPort(port.get()); + httpConnector.setHost(config.getBindAddress()); + connectors.add(httpConnector); + } else { + httpConnector = null; + } + + // Limit number of concurrent HTTP connections to avoid getting out of file descriptors + connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); + server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); + + // Whether to reject requests with unknown attributes. + sharedUnknownPropertyHandler.setSkipUnknownProperty(!config.isHttpRequestsFailOnUnknownPropertiesEnabled()); + + addWebServerHandlers(); + } + + public void addRestResources(String basePath, boolean requiresAuthentication, Map attributeMap, + boolean useSharedJsonMapperProvider, String... javaPackages) { + ResourceConfig config = new ResourceConfig(); + for (String javaPackage : javaPackages) { + config.packages(false, javaPackage); + } + addResourceServlet(basePath, requiresAuthentication, attributeMap, config, useSharedJsonMapperProvider); + } + + public void addRestResource(String basePath, boolean requiresAuthentication, Map attributeMap, + boolean useSharedJsonMapperProvider, Class... resourceClasses) { + ResourceConfig config = new ResourceConfig(); + for (Class resourceClass : resourceClasses) { + config.register(resourceClass); + } + addResourceServlet(basePath, requiresAuthentication, attributeMap, config, useSharedJsonMapperProvider); + } + + private void addResourceServlet(String basePath, boolean requiresAuthentication, Map attributeMap, + ResourceConfig config, boolean useSharedJsonMapperProvider) { + if (useSharedJsonMapperProvider){ + JsonMapperProvider jsonMapperProvider = new JsonMapperProvider(sharedUnknownPropertyHandler); + config.register(jsonMapperProvider); + config.register(UnrecognizedPropertyExceptionMapper.class); + } else { + config.register(JsonMapperProvider.class); + } + config.register(MultiPartFeature.class); + ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); + servletHolder.setAsyncSupported(true); + addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); + } + + public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, + Map attributeMap) { + ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); + // Notice: each context path should be unique, but there's nothing here to verify that + servletContextHandler.setContextPath(path); + servletContextHandler.addServlet(servletHolder, MATCH_ALL); + if (attributeMap != null) { + attributeMap.forEach(servletContextHandler::setAttribute); + } + handlers.add(servletContextHandler); + } + + public void addStaticResources(String basePath, String resourcePath) { + ContextHandler capHandler = new ContextHandler(); + capHandler.setContextPath(basePath); + ResourceHandler resHandler = new ResourceHandler(); + resHandler.setBaseResource(Resource.newClassPathResource(resourcePath)); + resHandler.setEtags(true); + resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL); + capHandler.setHandler(resHandler); + handlers.add(capHandler); + } + + public void start() throws MQTTProxyException { + try { + RequestLogHandler requestLogHandler = new RequestLogHandler(); + RequestLog requestLogger = JettyRequestLogFactory.createRequestLogger(false, server); + requestLogHandler.setRequestLog(requestLogger); + handlers.add(0, new ContextHandlerCollection()); + handlers.add(requestLogHandler); + + ContextHandlerCollection contexts = new ContextHandlerCollection(); + contexts.setHandlers(handlers.toArray(new Handler[handlers.size()])); + + Handler handlerForContexts = GzipHandlerUtil.wrapWithGzipHandler(contexts, + config.getHttpServerGzipCompressionExcludedPaths()); + HandlerCollection handlerCollection = new HandlerCollection(); + handlerCollection.setHandlers(new Handler[] {handlerForContexts, new DefaultHandler(), requestLogHandler}); + + // Metrics handler + StatisticsHandler stats = new StatisticsHandler(); + stats.setHandler(handlerCollection); + try { + jettyStatisticsCollector = new JettyStatisticsCollector(stats); + jettyStatisticsCollector.register(); + } catch (IllegalArgumentException e) { + // Already registered. Eg: in unit tests + } + + server.setHandler(stats); + server.start(); + + if (httpConnector != null) { + log.info("MoP HTTP Service started at http://{}:{}", httpConnector.getHost(), + httpConnector.getLocalPort()); + } else { + log.info("MoP HTTP Service disabled"); + } + + } catch (Exception e) { + throw new MQTTProxyException(e); + } + } + + private void addWebServerHandlers() { + Map attributeMap = new HashMap<>(); + attributeMap.put(ATTRIBUTE_PROXY_NAME, proxyService); + + addRestResources("/admin", + true, attributeMap, false, + "io.streamnative.pulsar.handlers.mqtt.proxy.web.admin"); + } + + @Override + public void close() throws MQTTProxyException { + try { + server.stop(); + // unregister statistics from Prometheus client's default CollectorRegistry singleton + // to prevent memory leaks in tests + if (jettyStatisticsCollector != null) { + try { + CollectorRegistry.defaultRegistry.unregister(jettyStatisticsCollector); + } catch (Exception e) { + // ignore any exception happening in unregister + // exception will be thrown for 2. instance of WebService in tests since + // the register supports a single JettyStatisticsCollector + } + jettyStatisticsCollector = null; + } + webServiceExecutor.join(); + if (this.sslContextRefreshTask != null) { + this.sslContextRefreshTask.cancel(true); + } + log.info("Web service closed"); + } catch (Exception e) { + throw new MQTTProxyException(e); + } + } + + private static final Logger log = LoggerFactory.getLogger(WebService.class); +} diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java new file mode 100644 index 00000000..a1fb9f5c --- /dev/null +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java @@ -0,0 +1,59 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.proxy.web.admin; + +import io.streamnative.pulsar.handlers.mqtt.common.Connection; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.MediaType; +import org.apache.pulsar.broker.web.RestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +@Path("/devices") +@Produces(MediaType.APPLICATION_JSON) +@Api(value = "/devices", tags = "devices") +public class Devices extends WebResource { + + @GET + @Path("/list") + @ApiOperation(value = "List of connected devices.", + response = String.class, responseContainer = "List") + @ApiResponses(value = { + @ApiResponse(code = 500, message = "Internal server error")}) + public void getList(@Suspended final AsyncResponse asyncResponse) { + try { + final Collection allConnections = service().getConnectionManager().getAllConnections(); + asyncResponse.resume(allConnections.stream().map(e -> + e.getClientId()).collect(Collectors.toList())); + } catch (Exception e) { + log.error("[{}] Failed to list devices {}", clientAppId(), e); + asyncResponse.resume(new RestException(e)); + } + + } + + private static final Logger log = LoggerFactory.getLogger(Devices.class); +} diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java new file mode 100644 index 00000000..5dd43918 --- /dev/null +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/WebResource.java @@ -0,0 +1,149 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.proxy.web.admin; + +import static java.util.concurrent.TimeUnit.SECONDS; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; +import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; +import io.streamnative.pulsar.handlers.mqtt.proxy.web.WebService; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriInfo; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.broker.web.PulsarWebResource; +import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.PulsarServiceNameResolver; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for Web resources in Pulsar. It provides basic authorization functions. + */ +public abstract class WebResource { + + private static final Logger log = LoggerFactory.getLogger(PulsarWebResource.class); + + private static final LoadingCache SERVICE_NAME_RESOLVER_CACHE = + Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build( + new CacheLoader<>() { + @Override + public @Nullable PulsarServiceNameResolver load(@NonNull String serviceUrl) throws Exception { + PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver(); + serviceNameResolver.updateServiceUrl(serviceUrl); + return serviceNameResolver; + } + }); + + static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal"; + + @Context + protected ServletContext servletContext; + + @Context + protected HttpServletRequest httpRequest; + + @Context + protected UriInfo uri; + + private MQTTProxyService service; + + protected MQTTProxyService service() { + if (service == null) { + service = (MQTTProxyService) servletContext.getAttribute(WebService.ATTRIBUTE_PROXY_NAME); + } + return service; + } + + protected MQTTCommonConfiguration config() { + return service().getProxyConfig(); + } + + /** + * Gets a caller id (IP + role). + * + * @return the web service caller identification + */ + public String clientAppId() { + return (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName); + } + + public String originalPrincipal() { + return httpRequest.getHeader(ORIGINAL_PRINCIPAL_HEADER); + } + + public AuthenticationDataSource clientAuthData() { + return (AuthenticationDataSource) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName); + } + + public boolean isRequestHttps() { + return "https".equalsIgnoreCase(httpRequest.getScheme()); + } + + public static boolean isClientAuthenticated(String appId) { + return appId != null; + } + + + public T sync(Supplier> supplier) { + try { + return supplier.get().get(config().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (ExecutionException | TimeoutException ex) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof WebApplicationException) { + throw (WebApplicationException) realCause; + } else { + throw new RestException(realCause); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RestException(ex); + } + } + + protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) { + Throwable realCause = FutureUtil.unwrapCompletionException(exception); + if (realCause instanceof WebApplicationException) { + asyncResponse.resume(realCause); + } else if (realCause instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(Status.CONFLICT, realCause)); + } else if (realCause instanceof MetadataStoreException.NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, realCause)); + } else if (realCause instanceof MetadataStoreException.BadVersionException) { + asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); + } else if (realCause instanceof PulsarAdminException) { + asyncResponse.resume(new RestException(((PulsarAdminException) realCause))); + } else { + asyncResponse.resume(new RestException(realCause)); + } + } +} diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/package-info.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/package-info.java new file mode 100644 index 00000000..938733ee --- /dev/null +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package info. + */ +package io.streamnative.pulsar.handlers.mqtt.proxy.web.admin; diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/package-info.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/package-info.java new file mode 100644 index 00000000..96117fd8 --- /dev/null +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package info. + */ +package io.streamnative.pulsar.handlers.mqtt.proxy.web; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java index 05a10971..80fd1f21 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java @@ -116,6 +116,8 @@ public abstract class MQTTProtocolHandlerTestBase { @Getter protected List mqttProxyPortList = new ArrayList<>(); @Getter + protected List mqttProxyHttpPortList = new ArrayList<>(); + @Getter protected List mqttProxyPortTlsList = new ArrayList<>(); @Getter protected List mqttProxyPortTlsPskList = new ArrayList<>(); @@ -288,6 +290,7 @@ protected void stopBroker() throws Exception { mqttBrokerPortTlsList.clear(); mqttBrokerPortTlsPskList.clear(); mqttProxyPortList.clear(); + mqttProxyHttpPortList.clear(); mqttProxyPortTlsList.clear(); mqttProxyPortTlsPskList.clear(); } @@ -304,6 +307,7 @@ public void stopBroker(int brokerIndex) throws Exception { mqttProxyPortList.remove(brokerIndex); mqttProxyPortTlsList.remove(brokerIndex); mqttProxyPortTlsPskList.remove(brokerIndex); + mqttProxyHttpPortList.remove(brokerIndex); } protected void startBroker() throws Exception { @@ -349,10 +353,13 @@ protected void startBroker(MQTTCommonConfiguration conf) throws Exception { int mqttProxyPort = -1; int mqttProxyTlsPort = -1; int mqttProxyTlsPskPort = -1; + int mqttHttpPort = -1; if (conf.isMqttProxyEnabled()) { mqttProxyPort = PortManager.nextFreePort(); + mqttHttpPort = PortManager.nextFreePort(); conf.setMqttProxyPort(mqttProxyPort); mqttProxyPortList.add(mqttProxyPort); + mqttProxyHttpPortList.add(mqttHttpPort); if (conf.isMqttProxyTlsEnabled()) { mqttProxyTlsPort = PortManager.nextFreePort(); conf.setMqttProxyTlsPort(mqttProxyTlsPort); @@ -369,6 +376,7 @@ protected void startBroker(MQTTCommonConfiguration conf) throws Exception { conf.setBrokerServicePortTls(Optional.of(brokerPortTls)); conf.setWebServicePort(Optional.of(brokerWebServicePort)); conf.setWebServicePortTls(Optional.of(brokerWebServicePortTls)); + conf.setMopWebServicePort(Optional.of(mqttHttpPort)); String listener = "mqtt://127.0.0.1:" + mqttBrokerPort; String tlsListener = null; String tlsPskListener = null; @@ -384,10 +392,11 @@ protected void startBroker(MQTTCommonConfiguration conf) throws Exception { log.info("Start broker info, brokerPort: {}, brokerPortTls : {}, " + "brokerWebServicePort : {} , brokerWebServicePortTls : {}, " + "mqttBrokerPort: {}, mqttBrokerTlsPort: {}, mqttBrokerTlsPskPort: {}, " - + "mqttProxyPort: {}, mqttProxyTlsPort: {}, mqttProxyTlsPskPort: {}", + + "mqttProxyPort: {}, mqttProxyTlsPort: {}, mqttProxyTlsPskPort: {}, " + + "mqttHttpPort: {}", brokerPort, brokerPortTls, brokerWebServicePort, brokerWebServicePortTls, mqttBrokerPort, mqttBrokerTlsPort, mqttBrokerTlsPskPort, - mqttProxyPort, mqttProxyTlsPort, mqttProxyTlsPskPort); + mqttProxyPort, mqttProxyTlsPort, mqttProxyTlsPskPort, mqttHttpPort); ConfigurationUtils.extractFieldToProperties(conf); setTLSConf(conf); this.pulsarServiceList.add(doStartBroker(conf)); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java index 68ca0061..4c2215ec 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTTestBase.java @@ -35,7 +35,7 @@ public class MQTTTestBase extends MQTTProtocolHandlerTestBase { public static final int TIMEOUT = 80 * 1000; - private final Random random = new Random(); + protected final Random random = new Random(); @DataProvider(name = "batchEnabled") public Object[][] batchEnabled() { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java new file mode 100644 index 00000000..ebb9db1f --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyHttpTest.java @@ -0,0 +1,80 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamnative.pulsar.handlers.mqtt.mqtt3.fusesource.proxy; + +import com.google.gson.Gson; +import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.testng.Assert; +import org.testng.annotations.Test; +/** + * Integration tests for MQTT protocol handler with proxy. + */ +@Slf4j +public class ProxyHttpTest extends MQTTTestBase { + + @Override + protected MQTTCommonConfiguration initConfig() throws Exception { + MQTTCommonConfiguration mqtt = super.initConfig(); + mqtt.setMqttProxyEnabled(true); + return mqtt; + } + + @Test + public void testGetDeviceList() throws Exception { + int index = random.nextInt(mqttProxyPortList.size()); + List mqttProxyPortList = getMqttProxyPortList(); + List mqttProxyHttpPortList = getMqttProxyHttpPortList(); + MQTT mqttProducer = new MQTT(); + int port = mqttProxyPortList.get(index); + String clientId = "device-list-client"; + mqttProducer.setHost("127.0.0.1", port); + mqttProducer.setClientId(clientId); + BlockingConnection producer = mqttProducer.blockingConnection(); + producer.connect(); + producer.publish("testHttp", "Hello MQTT".getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false); + Thread.sleep(4000); + HttpClient httpClient = HttpClientBuilder.create().build(); + final String mopEndPoint = "http://localhost:" + mqttProxyHttpPortList.get(index) + "/admin/devices/list"; + HttpResponse response = httpClient.execute(new HttpGet(mopEndPoint)); + InputStream inputStream = response.getEntity().getContent(); + InputStreamReader isReader = new InputStreamReader(inputStream); + BufferedReader reader = new BufferedReader(isReader); + StringBuffer buffer = new StringBuffer(); + String str; + while ((str = reader.readLine()) != null){ + buffer.append(str); + } + String ret = buffer.toString(); + ArrayList deviceList = new Gson().fromJson(ret, ArrayList.class); + Assert.assertEquals(deviceList.size(), 1); + Assert.assertTrue(deviceList.contains(clientId)); + } + +}