diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/CommonWebSocketServlet.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/CommonWebSocketServlet.java index 81531f07c81..e03216d5bb9 100644 --- a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/CommonWebSocketServlet.java +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/CommonWebSocketServlet.java @@ -15,8 +15,8 @@ import java.io.IOException; import java.io.Serial; import java.util.Base64; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -78,7 +78,7 @@ public class CommonWebSocketServlet extends WebSocketServlet { public static final String DEFAULT_ADAPTER_ID = EventWebSocketAdapter.ADAPTER_ID; - private final Map connectionHandlers = new HashMap<>(); + private final Map connectionHandlers = new ConcurrentHashMap<>(); private final AuthFilter authFilter; @SuppressWarnings("unused") diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/event/EventWebSocket.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/event/EventWebSocket.java index ae5a91a3b9c..9bac95f5679 100644 --- a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/event/EventWebSocket.java +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/event/EventWebSocket.java @@ -61,13 +61,25 @@ public class EventWebSocket { private final EventPublisher eventPublisher; private final ItemEventUtility itemEventUtility; + // All access must be guarded by "this" private @Nullable Session session; + + // All access must be guarded by "this" private @Nullable RemoteEndpoint remoteEndpoint; + + // All access must be guarded by "this" private String remoteIdentifier = ""; + // All access must be guarded by "this" private List typeFilter = List.of(); + + // All access must be guarded by "this" private List sourceFilter = List.of(); + + // All access must be guarded by "this" private @Nullable TopicEventFilter topicIncludeFilter = null; + + // All access must be guarded by "this" private @Nullable TopicEventFilter topicExcludeFilter = null; public EventWebSocket(Gson gson, EventWebSocketAdapter wsAdapter, ItemEventUtility itemEventUtility, @@ -81,23 +93,32 @@ public EventWebSocket(Gson gson, EventWebSocketAdapter wsAdapter, ItemEventUtili @OnWebSocketClose public void onClose(int statusCode, String reason) { this.wsAdapter.unregisterListener(this); - remoteIdentifier = ""; - this.session = null; - this.remoteEndpoint = null; + synchronized (this) { + remoteIdentifier = ""; + this.session = null; + this.remoteEndpoint = null; + } } @OnWebSocketConnect public void onConnect(Session session) { - this.session = session; RemoteEndpoint remoteEndpoint = session.getRemote(); - this.remoteEndpoint = remoteEndpoint; - this.remoteIdentifier = remoteEndpoint.getInetSocketAddress().toString(); + synchronized (this) { + this.session = session; + this.remoteEndpoint = remoteEndpoint; + this.remoteIdentifier = remoteEndpoint.getInetSocketAddress().toString(); + } this.wsAdapter.registerListener(this); } @OnWebSocketMessage public void onText(String message) { - RemoteEndpoint remoteEndpoint = this.remoteEndpoint; + RemoteEndpoint remoteEndpoint; + Session session; + synchronized (this) { + remoteEndpoint = this.remoteEndpoint; + session = this.session; + } if (session == null || remoteEndpoint == null) { // no connection or no remote endpoint , do nothing this is possible due to async behavior return; @@ -141,17 +162,25 @@ public void onText(String message) { responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "heartbeat", "PONG", null, eventDTO.eventId); } else if ((WEBSOCKET_TOPIC_PREFIX + "filter/type").equals(eventDTO.topic)) { - typeFilter = Objects.requireNonNullElse(gson.fromJson(eventDTO.payload, STRING_LIST_TYPE), - List.of()); - logger.debug("Setting type filter for connection to {}: {}", - remoteEndpoint.getInetSocketAddress(), typeFilter); + synchronized (this) { + typeFilter = Objects.requireNonNullElse( + gson.fromJson(eventDTO.payload, STRING_LIST_TYPE), List.of()); + if (logger.isDebugEnabled()) { + logger.debug("Setting type filter for connection to {}: {}", + remoteEndpoint.getInetSocketAddress(), typeFilter); + } + } responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/type", eventDTO.payload, null, eventDTO.eventId); } else if ((WEBSOCKET_TOPIC_PREFIX + "filter/source").equals(eventDTO.topic)) { - sourceFilter = Objects.requireNonNullElse(gson.fromJson(eventDTO.payload, STRING_LIST_TYPE), - List.of()); - logger.debug("Setting source filter for connection to {}: {}", - remoteEndpoint.getInetSocketAddress(), typeFilter); + synchronized (this) { + sourceFilter = Objects.requireNonNullElse( + gson.fromJson(eventDTO.payload, STRING_LIST_TYPE), List.of()); + if (logger.isDebugEnabled()) { + logger.debug("Setting source filter for connection to {}: {}", + remoteEndpoint.getInetSocketAddress(), sourceFilter); + } + } responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/source", eventDTO.payload, null, eventDTO.eventId); } else if ((WEBSOCKET_TOPIC_PREFIX + "filter/topic").equals(eventDTO.topic)) { @@ -170,14 +199,20 @@ public void onText(String message) { includeTopics = includeTopics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList(); excludeTopics = excludeTopics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList(); // create topic filter if topic list not empty - if (!includeTopics.isEmpty()) { - topicIncludeFilter = new TopicEventFilter(includeTopics); - } - if (!excludeTopics.isEmpty()) { - topicExcludeFilter = new TopicEventFilter(excludeTopics); + if (!includeTopics.isEmpty() || !excludeTopics.isEmpty()) { + synchronized (this) { + if (!includeTopics.isEmpty()) { + topicIncludeFilter = new TopicEventFilter(includeTopics); + } + if (!excludeTopics.isEmpty()) { + topicExcludeFilter = new TopicEventFilter(excludeTopics); + } + } + if (logger.isDebugEnabled()) { + logger.debug("Setting topic filter for connection to {}: {}", + remoteEndpoint.getInetSocketAddress(), topics); + } } - logger.debug("Setting topic filter for connection to {}: {}", - remoteEndpoint.getInetSocketAddress(), topics); responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic", eventDTO.payload, null, eventDTO.eventId); } else { @@ -207,13 +242,17 @@ public void onText(String message) { try { sendMessage(gson.toJson(responseEvent)); } catch (IOException e) { - logger.debug("Failed to send WebSocketResponseEvent event {} to {}: {}", responseEvent, remoteIdentifier, - e.getMessage()); + if (logger.isDebugEnabled()) { + synchronized (this) { + logger.debug("Failed to send WebSocketResponseEvent event {} to {}: {}", responseEvent, + remoteIdentifier, e.getMessage()); + } + } } } @OnWebSocketError - public void onError(Session session, Throwable error) { + public void onError(@Nullable Session session, @Nullable Throwable error) { if (session != null) { session.close(); } @@ -224,6 +263,18 @@ public void onError(Session session, Throwable error) { } public void processEvent(Event event) { + List typeFilter; + List sourceFilter; + TopicEventFilter topicIncludeFilter; + TopicEventFilter topicExcludeFilter; + String remoteIdentifier; + synchronized (this) { + typeFilter = this.typeFilter; + sourceFilter = this.sourceFilter; + topicIncludeFilter = this.topicIncludeFilter; + topicExcludeFilter = this.topicExcludeFilter; + remoteIdentifier = this.remoteIdentifier; + } try { String source = event.getSource(); if ((source == null || !sourceFilter.contains(event.getSource())) @@ -237,8 +288,11 @@ public void processEvent(Event event) { } } - private synchronized void sendMessage(String message) throws IOException { - RemoteEndpoint remoteEndpoint = this.remoteEndpoint; + private void sendMessage(String message) throws IOException { + RemoteEndpoint remoteEndpoint; + synchronized (this) { + remoteEndpoint = this.remoteEndpoint; + } if (remoteEndpoint == null) { logger.warn("Could not determine remote endpoint, failed to send '{}'.", message); return; diff --git a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java index a0800403924..ba6c2b51bfa 100644 --- a/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java +++ b/bundles/org.openhab.core.io.websocket/src/main/java/org/openhab/core/io/websocket/log/LogWebSocket.java @@ -15,12 +15,15 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -38,7 +41,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.openhab.core.common.ThreadPoolManager; +import org.openhab.core.common.ThreadFactoryBuilder; import org.osgi.service.log.LogEntry; import org.osgi.service.log.LogListener; import org.slf4j.Logger; @@ -77,42 +80,68 @@ public class LogWebSocket implements LogListener { private final LogWebSocketAdapter wsAdapter; private final Gson gson; + // All access must be guarded by "this" private @Nullable Session session; + + // All access must be guarded by "this" private @Nullable RemoteEndpoint remoteEndpoint; - private final ScheduledExecutorService scheduledExecutorService; + // All access must be guarded by "this" + private @Nullable ScheduledExecutorService scheduledExecutorService; + + // All access must be guarded by "this" private @Nullable ScheduledFuture commitScheduledFuture; + // All access must be guarded by "this" private long lastSentTime = 0; + + // All access must be guarded by "this" private List deferredLogs = new ArrayList<>(); + // All access must be guarded by "this" private boolean enabled = false; + + // All access must be guarded by "this" private long lastSequence = FIRST_SEQUENCE; - private List loggerPatterns = List.of(); + private final List loggerPatterns = new CopyOnWriteArrayList<>(); public LogWebSocket(Gson gson, LogWebSocketAdapter wsAdapter) { this.wsAdapter = wsAdapter; this.gson = gson; - - scheduledExecutorService = ThreadPoolManager.getScheduledPool("LogWebSocket"); } @OnWebSocketClose public void onClose(int statusCode, String reason) { - if (enabled) { - this.wsAdapter.unregisterListener(this); - } stopDeferredScheduledFuture(); - this.session = null; - this.remoteEndpoint = null; + synchronized (this) { + if (enabled) { + this.wsAdapter.unregisterListener(this); + enabled = false; + } + this.session = null; + this.remoteEndpoint = null; + this.deferredLogs.clear(); + if (this.scheduledExecutorService != null) { + this.scheduledExecutorService.shutdownNow(); + } + this.scheduledExecutorService = null; + } } @OnWebSocketConnect - public void onConnect(Session session) { + public synchronized void onConnect(Session session) { this.session = session; - RemoteEndpoint remoteEndpoint = session.getRemote(); - this.remoteEndpoint = remoteEndpoint; + this.remoteEndpoint = session.getRemote(); + if (this.scheduledExecutorService != null) { + this.scheduledExecutorService.shutdownNow(); + } + InetSocketAddress isa = session.getRemoteAddress(); + String name = isa == null ? "websocket-logger" + : "websocket-logger-" + isa.getHostString() + ':' + isa.getPort(); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder.create() + .withNamePrefix("OH").withName(name).withUncaughtExceptionHandler((t, e) -> { + }).build()); } @OnWebSocketMessage @@ -122,17 +151,22 @@ public void onText(String message) { return; } - // Defer sending live logs while we process the history - lastSentTime = Long.MAX_VALUE; - stopDeferredScheduledFuture(); + Session session; + RemoteEndpoint remoteEndpoint; + synchronized (this) { + // Defer sending live logs while we process the history + lastSentTime = Long.MAX_VALUE; + stopDeferredScheduledFuture(); + + // Enable log messages + if (!enabled) { + this.wsAdapter.registerListener(this); + enabled = true; + } - // Enable log messages - if (!enabled) { - this.wsAdapter.registerListener(this); - enabled = true; + session = this.session; + remoteEndpoint = this.remoteEndpoint; } - - RemoteEndpoint remoteEndpoint = this.remoteEndpoint; if (session == null || remoteEndpoint == null) { // no connection or no remote endpoint , do nothing this is possible due to async behavior return; @@ -146,27 +180,37 @@ public void onText(String message) { return; } - loggerPatterns = logFilterDto.loggerNames == null ? List.of() - : logFilterDto.loggerNames.stream().map(Pattern::compile).toList(); + if (!loggerPatterns.isEmpty()) { + loggerPatterns.clear(); + } + List loggerNames; + if (logFilterDto != null && (loggerNames = logFilterDto.loggerNames) != null) { + List filters = loggerNames.stream().map(Pattern::compile).toList(); + if (!filters.isEmpty()) { + loggerPatterns.addAll(filters); + } + } Long timeStart; Long timeStop; - if (logFilterDto.timeStart != null) { + if (logFilterDto != null && logFilterDto.timeStart != null) { timeStart = logFilterDto.timeStart; } else { timeStart = Long.MIN_VALUE; } - if (logFilterDto.timeStop != null) { + if (logFilterDto != null && logFilterDto.timeStop != null) { timeStop = logFilterDto.timeStop; } else { timeStop = Long.MAX_VALUE; } Long sequenceStart; - if (logFilterDto.sequenceStart != null) { + if (logFilterDto != null && logFilterDto.sequenceStart != null) { sequenceStart = logFilterDto.sequenceStart; } else { - sequenceStart = lastSequence; + synchronized (this) { + sequenceStart = lastSequence; + } } List logs = new ArrayList<>(); @@ -175,7 +219,9 @@ public void onText(String message) { } if (logs.isEmpty()) { - lastSentTime = 0; + synchronized (this) { + lastSentTime = 0; + } return; } @@ -191,15 +237,17 @@ public void onText(String message) { List dtoList = filteredEvents.stream().map(this::map).collect(Collectors.toList()); Collections.sort(dtoList); + long sentTime; try { sendMessage(gson.toJson(dtoList)); + sentTime = System.currentTimeMillis(); } catch (IOException e) { + sentTime = Long.MIN_VALUE; } - lastSentTime = System.currentTimeMillis(); // Remove any duplicates from the live log buffer long newestSequence = logs.getFirst().getSequence(); - synchronized (deferredLogs) { + synchronized (this) { Iterator iterator = deferredLogs.iterator(); while (iterator.hasNext()) { LogDTO value = iterator.next(); @@ -208,13 +256,16 @@ public void onText(String message) { } } } + synchronized (this) { + lastSentTime = sentTime == Long.MIN_VALUE ? 0L : sentTime; + } // Continue with live logs... flush(); } @OnWebSocketError - public void onError(Session session, @Nullable Throwable error) { + public void onError(@Nullable Session session, @Nullable Throwable error) { if (session != null) { session.close(); } @@ -224,14 +275,21 @@ public void onError(Session session, @Nullable Throwable error) { onClose(StatusCode.NO_CODE, message); } - private synchronized void sendMessage(String message) throws IOException { - RemoteEndpoint remoteEndpoint = this.remoteEndpoint; + private void sendMessage(String message) throws IOException { + RemoteEndpoint remoteEndpoint; + synchronized (this) { + remoteEndpoint = this.remoteEndpoint; + } if (remoteEndpoint == null) { return; } remoteEndpoint.sendString(message); } + /** + * @implNote Under no circumstances must this method result in something being logged, since that + * causes an "endless circle". + */ @Override public void logged(@NonNullByDefault({}) LogEntry logEntry) { if (!loggerPatterns.isEmpty() && loggerPatterns.stream().noneMatch(logPatternMatch(logEntry))) { @@ -239,25 +297,32 @@ public void logged(@NonNullByDefault({}) LogEntry logEntry) { } LogDTO logDTO = map(logEntry); - lastSequence = logEntry.getSequence(); - - // If the last message sent was less than SEND_PERIOD ago, then we just buffer - if (lastSentTime > System.currentTimeMillis() - SEND_PERIOD) { - // Start the timer if this is the first deferred log - synchronized (deferredLogs) { - if (deferredLogs.isEmpty()) { - commitScheduledFuture = scheduledExecutorService.schedule(this::flush, + boolean bufferEmpty; + ScheduledExecutorService executor; + synchronized (this) { + if ((executor = scheduledExecutorService) == null) { + return; + } + lastSequence = logEntry.getSequence(); + + // If the buffer isn't empty or the last message was sent less than SEND_PERIOD ago, then we just buffer + if (!(bufferEmpty = deferredLogs.isEmpty()) || lastSentTime > System.currentTimeMillis() - SEND_PERIOD) { + if (bufferEmpty) { + stopDeferredScheduledFuture(); + commitScheduledFuture = executor.schedule(this::flush, lastSentTime + SEND_PERIOD - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } deferredLogs.add(logDTO); - } - } else { - lastSentTime = System.currentTimeMillis(); - try { - sendMessage(gson.toJson(logDTO)); - } catch (IOException e) { - // Fail silently! + } else { + lastSentTime = System.currentTimeMillis(); + executor.submit(() -> { + try { + sendMessage(gson.toJson(logDTO)); + } catch (IOException e) { + // Fail silently! + } + }); } } } @@ -283,25 +348,33 @@ private LogDTO map(LogEntry logEntry) { private void stopDeferredScheduledFuture() { // Stop any existing scheduled commit - ScheduledFuture commitScheduledFuture = this.commitScheduledFuture; - if (commitScheduledFuture != null) { + ScheduledFuture commitScheduledFuture; + synchronized (this) { + commitScheduledFuture = this.commitScheduledFuture; + this.commitScheduledFuture = null; + } + if (commitScheduledFuture != null && !commitScheduledFuture.isDone()) { commitScheduledFuture.cancel(false); - commitScheduledFuture = null; } } - private synchronized void flush() { + private void flush() { stopDeferredScheduledFuture(); - synchronized (deferredLogs) { - if (!deferredLogs.isEmpty()) { - try { - sendMessage(gson.toJson(deferredLogs)); - } catch (IOException e) { - } - + List logs; + synchronized (this) { + if (deferredLogs.isEmpty()) { + logs = null; + } else { + logs = List.copyOf(deferredLogs); deferredLogs.clear(); } } + if (logs != null) { + try { + sendMessage(gson.toJson(logs)); + } catch (IOException e) { + } + } } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/events/TopicEventFilter.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/events/TopicEventFilter.java index e28c0eb71dc..2fde5323866 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/events/TopicEventFilter.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/events/TopicEventFilter.java @@ -13,15 +13,17 @@ package org.openhab.core.events; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.regex.Pattern; import org.eclipse.jdt.annotation.NonNullByDefault; -import org.eclipse.jdt.annotation.Nullable; /** * The {@link TopicEventFilter} is a default openHAB {@link EventFilter} implementation that ensures filtering * of events based on a single event topic or multiple event topics. + *

+ * Thread-safe. * * @author Stefan Bußweiler - Initial contribution * @author Florian Hotze - Add support for filtering of events by multiple event topics @@ -29,8 +31,7 @@ @NonNullByDefault public class TopicEventFilter implements EventFilter { - private final @Nullable Pattern topicRegex; - private final List topicsRegexes = new ArrayList<>(); + private final List topicsRegexes; /** * Constructs a new topic event filter. @@ -40,7 +41,7 @@ public class TopicEventFilter implements EventFilter { * Regex */ public TopicEventFilter(String topicRegex) { - this.topicRegex = Pattern.compile(topicRegex); + this.topicsRegexes = List.of(Pattern.compile(topicRegex)); } /** @@ -51,20 +52,15 @@ public TopicEventFilter(String topicRegex) { * Regex */ public TopicEventFilter(List topicsRegexes) { - this.topicRegex = null; + List tmpTopicsRegexes = new ArrayList<>(); for (String topicRegex : topicsRegexes) { - this.topicsRegexes.add(Pattern.compile(topicRegex)); + tmpTopicsRegexes.add(Pattern.compile(topicRegex)); } + this.topicsRegexes = Collections.unmodifiableList(tmpTopicsRegexes); } @Override public boolean apply(Event event) { - String topic = event.getTopic(); - Pattern topicRegex = this.topicRegex; - if (topicRegex != null) { - return topicRegex.matcher(topic).matches(); - } else { - return topicsRegexes.stream().anyMatch(p -> p.matcher(topic).matches()); - } + return topicsRegexes.stream().anyMatch(p -> p.matcher(event.getTopic()).matches()); } }