-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathChatWebSocketHandler.java
72 lines (63 loc) · 3.25 KB
/
ChatWebSocketHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.github.rawsanj.handler;
import com.github.rawsanj.messaging.RedisChatMessagePublisher;
import com.github.rawsanj.model.ChatMessage;
import com.github.rawsanj.util.ObjectStringConverter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {
private final Sinks.Many<ChatMessage> chatMessageSink;
private final Flux<ChatMessage> chatMessageFluxSink;
private final RedisChatMessagePublisher redisChatMessagePublisher;
private final RedisAtomicLong activeUserCounter;
private final ObjectStringConverter objectStringConverter;
public ChatWebSocketHandler(Sinks.Many<ChatMessage> chatMessageSink, RedisChatMessagePublisher redisChatMessagePublisher,
RedisAtomicLong activeUserCounter, ObjectStringConverter objectStringConverter) {
this.chatMessageSink = chatMessageSink;
this.chatMessageFluxSink = chatMessageSink.asFlux();
this.redisChatMessagePublisher = redisChatMessagePublisher;
this.activeUserCounter = activeUserCounter;
this.objectStringConverter = objectStringConverter;
}
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<WebSocketMessage> sendMessageFlux = chatMessageFluxSink.flatMap(objectStringConverter::objectToString)
.map(webSocketSession::textMessage)
.doOnError(throwable -> log.error("Error Occurred while sending message to WebSocket.", throwable));
Mono<Void> outputMessage = webSocketSession.send(sendMessageFlux);
Mono<Void> inputMessage = webSocketSession.receive()
.flatMap(webSocketMessage -> redisChatMessagePublisher.publishChatMessage(webSocketMessage.getPayloadAsText()))
.doOnSubscribe(subscription -> {
Mono.fromRunnable(() -> {
long activeUserCount = activeUserCounter.incrementAndGet();
log.info("User '{}' Connected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
chatMessageSink.tryEmitNext(new ChatMessage(0, "CONNECTED", "CONNECTED", activeUserCount));
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
})
.doOnError(throwable -> log.error("Error Occurred while sending message to Redis.", throwable))
.doFinally(signalType -> {
Mono.fromRunnable(() -> {
long activeUserCount = activeUserCounter.decrementAndGet();
log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount));
}).subscribeOn(Schedulers.boundedElastic()).subscribe();
})
.then();
return Mono.zip(inputMessage, outputMessage).then();
}
public Mono<Sinks.EmitResult> sendMessage(ChatMessage chatMessage) {
return Mono.fromSupplier(() -> chatMessageSink.tryEmitNext(chatMessage))
.doOnSuccess(emitResult -> {
if (emitResult.isFailure()) {
log.error("Failed to send message with id: {}", chatMessage.getId());
}
});
}
}