From 93894bf5343e02d0c2601cd182749722ce85f248 Mon Sep 17 00:00:00 2001 From: Aroooba Date: Sat, 15 Jul 2023 03:26:00 +0900 Subject: [PATCH 1/2] Move blocking publishChatMessage operation to boundedElastic thread --- src/main/java/com/github/rawsanj/handler/WebHttpHandler.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/github/rawsanj/handler/WebHttpHandler.java b/src/main/java/com/github/rawsanj/handler/WebHttpHandler.java index a02fde4..f00a826 100644 --- a/src/main/java/com/github/rawsanj/handler/WebHttpHandler.java +++ b/src/main/java/com/github/rawsanj/handler/WebHttpHandler.java @@ -9,6 +9,7 @@ import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.scheduler.Schedulers; import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RequestPredicates.POST; @@ -23,7 +24,9 @@ public RouterFunction htmlRouter(@Value("classpath:/static/index return route(GET("/"), request -> ok().contentType(MediaType.TEXT_HTML).bodyValue(html)) .andRoute(POST("/message"), request -> request.bodyToMono(Message.class) .flatMap(message -> redisChatMessagePublisher.publishChatMessage(message.getMessage())) - .flatMap(aLong -> ServerResponse.ok().bodyValue(new Message("Message Sent Successfully!.")))); + .flatMap(aLong -> ServerResponse.ok().bodyValue(new Message("Message Sent Successfully!."))) + .subscribeOn(Schedulers.boundedElastic())); + } } From 371c6a98951029e67f7ba8b8ea612d18fa102251 Mon Sep 17 00:00:00 2001 From: Aroooba Date: Sat, 15 Jul 2023 03:24:31 +0900 Subject: [PATCH 2/2] Update ChatWebSocketHandler to move blocking operations to boundedElastic threads --- .../rawsanj/handler/ChatWebSocketHandler.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java b/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java index ebdc694..c2ab28e 100644 --- a/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java +++ b/src/main/java/com/github/rawsanj/handler/ChatWebSocketHandler.java @@ -11,6 +11,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; @Slf4j public class ChatWebSocketHandler implements WebSocketHandler { @@ -40,15 +41,19 @@ public Mono handle(WebSocketSession webSocketSession) { Mono inputMessage = webSocketSession.receive() .flatMap(webSocketMessage -> redisChatMessagePublisher.publishChatMessage(webSocketMessage.getPayloadAsText())) .doOnSubscribe(subscription -> { - long activeUserCount = activeUserCounter.incrementAndGet(); - log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); - chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount)); + Mono.fromRunnable(() -> { + long activeUserCount = activeUserCounter.incrementAndGet(); + log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); + chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount)); + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); }) .doOnError(throwable -> log.error("Error Occurred while sending message to Redis.", throwable)) .doFinally(signalType -> { - long activeUserCount = activeUserCounter.decrementAndGet(); - log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); - chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount)); + Mono.fromRunnable(() -> { + long activeUserCount = activeUserCounter.decrementAndGet(); + log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); + chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount)); + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); }) .then();