Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import java.io.IOException;
import java.io.Serial;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -78,7 +78,7 @@ public class CommonWebSocketServlet extends WebSocketServlet {

public static final String DEFAULT_ADAPTER_ID = EventWebSocketAdapter.ADAPTER_ID;

private final Map<String, WebSocketAdapter> connectionHandlers = new HashMap<>();
private final Map<String, WebSocketAdapter> connectionHandlers = new ConcurrentHashMap<>();
private final AuthFilter authFilter;

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,25 @@ public class EventWebSocket {
private final EventPublisher eventPublisher;
private final ItemEventUtility itemEventUtility;

// All access must be guarded by "this"
private @Nullable Session session;

// All access must be guarded by "this"
private @Nullable RemoteEndpoint remoteEndpoint;

// All access must be guarded by "this"
private String remoteIdentifier = "<unknown>";

// All access must be guarded by "this"
private List<String> typeFilter = List.of();

// All access must be guarded by "this"
private List<String> sourceFilter = List.of();

// All access must be guarded by "this"
private @Nullable TopicEventFilter topicIncludeFilter = null;

// All access must be guarded by "this"
private @Nullable TopicEventFilter topicExcludeFilter = null;

public EventWebSocket(Gson gson, EventWebSocketAdapter wsAdapter, ItemEventUtility itemEventUtility,
Expand All @@ -81,23 +93,32 @@ public EventWebSocket(Gson gson, EventWebSocketAdapter wsAdapter, ItemEventUtili
@OnWebSocketClose
public void onClose(int statusCode, String reason) {
this.wsAdapter.unregisterListener(this);
remoteIdentifier = "<unknown>";
this.session = null;
this.remoteEndpoint = null;
synchronized (this) {
remoteIdentifier = "<unknown>";
this.session = null;
this.remoteEndpoint = null;
}
}

@OnWebSocketConnect
public void onConnect(Session session) {
this.session = session;
RemoteEndpoint remoteEndpoint = session.getRemote();
this.remoteEndpoint = remoteEndpoint;
this.remoteIdentifier = remoteEndpoint.getInetSocketAddress().toString();
synchronized (this) {
this.session = session;
this.remoteEndpoint = remoteEndpoint;
this.remoteIdentifier = remoteEndpoint.getInetSocketAddress().toString();
}
this.wsAdapter.registerListener(this);
}

@OnWebSocketMessage
public void onText(String message) {
RemoteEndpoint remoteEndpoint = this.remoteEndpoint;
RemoteEndpoint remoteEndpoint;
Session session;
synchronized (this) {
remoteEndpoint = this.remoteEndpoint;
session = this.session;
}
if (session == null || remoteEndpoint == null) {
// no connection or no remote endpoint , do nothing this is possible due to async behavior
return;
Expand Down Expand Up @@ -141,17 +162,25 @@ public void onText(String message) {
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "heartbeat",
"PONG", null, eventDTO.eventId);
} else if ((WEBSOCKET_TOPIC_PREFIX + "filter/type").equals(eventDTO.topic)) {
typeFilter = Objects.requireNonNullElse(gson.fromJson(eventDTO.payload, STRING_LIST_TYPE),
List.of());
logger.debug("Setting type filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), typeFilter);
synchronized (this) {
typeFilter = Objects.requireNonNullElse(
gson.fromJson(eventDTO.payload, STRING_LIST_TYPE), List.of());
if (logger.isDebugEnabled()) {
logger.debug("Setting type filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), typeFilter);
}
}
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/type",
eventDTO.payload, null, eventDTO.eventId);
} else if ((WEBSOCKET_TOPIC_PREFIX + "filter/source").equals(eventDTO.topic)) {
sourceFilter = Objects.requireNonNullElse(gson.fromJson(eventDTO.payload, STRING_LIST_TYPE),
List.of());
logger.debug("Setting source filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), typeFilter);
synchronized (this) {
sourceFilter = Objects.requireNonNullElse(
gson.fromJson(eventDTO.payload, STRING_LIST_TYPE), List.of());
if (logger.isDebugEnabled()) {
logger.debug("Setting source filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), sourceFilter);
}
}
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/source",
eventDTO.payload, null, eventDTO.eventId);
} else if ((WEBSOCKET_TOPIC_PREFIX + "filter/topic").equals(eventDTO.topic)) {
Expand All @@ -170,14 +199,20 @@ public void onText(String message) {
includeTopics = includeTopics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList();
excludeTopics = excludeTopics.stream().map(t -> t.trim().replace("*", ".*") + "$").toList();
// create topic filter if topic list not empty
if (!includeTopics.isEmpty()) {
topicIncludeFilter = new TopicEventFilter(includeTopics);
}
if (!excludeTopics.isEmpty()) {
topicExcludeFilter = new TopicEventFilter(excludeTopics);
if (!includeTopics.isEmpty() || !excludeTopics.isEmpty()) {
synchronized (this) {
if (!includeTopics.isEmpty()) {
topicIncludeFilter = new TopicEventFilter(includeTopics);
}
if (!excludeTopics.isEmpty()) {
topicExcludeFilter = new TopicEventFilter(excludeTopics);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Setting topic filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), topics);
}
}
logger.debug("Setting topic filter for connection to {}: {}",
remoteEndpoint.getInetSocketAddress(), topics);
responseEvent = new EventDTO(WEBSOCKET_EVENT_TYPE, WEBSOCKET_TOPIC_PREFIX + "filter/topic",
eventDTO.payload, null, eventDTO.eventId);
} else {
Expand Down Expand Up @@ -207,13 +242,17 @@ public void onText(String message) {
try {
sendMessage(gson.toJson(responseEvent));
} catch (IOException e) {
logger.debug("Failed to send WebSocketResponseEvent event {} to {}: {}", responseEvent, remoteIdentifier,
e.getMessage());
if (logger.isDebugEnabled()) {
synchronized (this) {
logger.debug("Failed to send WebSocketResponseEvent event {} to {}: {}", responseEvent,
remoteIdentifier, e.getMessage());
}
}
}
}

@OnWebSocketError
public void onError(Session session, Throwable error) {
public void onError(@Nullable Session session, @Nullable Throwable error) {
if (session != null) {
session.close();
}
Expand All @@ -224,6 +263,18 @@ public void onError(Session session, Throwable error) {
}

public void processEvent(Event event) {
List<String> typeFilter;
List<String> sourceFilter;
TopicEventFilter topicIncludeFilter;
TopicEventFilter topicExcludeFilter;
String remoteIdentifier;
synchronized (this) {
typeFilter = this.typeFilter;
sourceFilter = this.sourceFilter;
topicIncludeFilter = this.topicIncludeFilter;
topicExcludeFilter = this.topicExcludeFilter;
remoteIdentifier = this.remoteIdentifier;
}
try {
String source = event.getSource();
if ((source == null || !sourceFilter.contains(event.getSource()))
Expand All @@ -237,8 +288,11 @@ public void processEvent(Event event) {
}
}

private synchronized void sendMessage(String message) throws IOException {
RemoteEndpoint remoteEndpoint = this.remoteEndpoint;
private void sendMessage(String message) throws IOException {
RemoteEndpoint remoteEndpoint;
synchronized (this) {
remoteEndpoint = this.remoteEndpoint;
}
if (remoteEndpoint == null) {
logger.warn("Could not determine remote endpoint, failed to send '{}'.", message);
return;
Expand Down
Loading