forked from janiskomuls/scala-bootcamp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWebSocket.scala
142 lines (117 loc) · 5.9 KB
/
WebSocket.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.evolutiongaming.bootcamp.http
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.syntax.all._
import fs2.{Pipe, Stream}
import fs2.concurrent.{Queue, Topic}
import org.http4s._
import org.http4s.client.jdkhttpclient.{JdkWSClient, WSConnectionHighLevel, WSFrame, WSRequest}
import org.http4s.dsl.io._
import org.http4s.implicits._
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.server.websocket.WebSocketBuilder
import org.http4s.websocket.WebSocketFrame
import java.net.http.HttpClient
import scala.concurrent.ExecutionContext
import cats.effect.Clock
import java.time.Instant
object WebSocketIntroduction {
// WEBSOCKET
// One of the main limitations of HTTP is its request-response model. The server can only send data to the
// client, when the client requests it to. Unlike HTTP, WebSocket provides full-duplex communication. That
// means the client and the server can send data to each other in both directions at the same time.
// WebSocket is distinct from HTTP. However, both protocols use TCP (Transmission Control Protocol) as their
// transport. In addition, WebSocket utilizes the same ports as HTTP by default (443 and 80) and uses HTTP
// `Upgrade` header during its handshake. It means that, to establish a WebSocket connection, the client and
// the server establish an HTTP connection first. Then the client proposes an upgrade to WebSocket. If the
// server accepts, a new WebSocket connection is established.
// WebSocket communication consists of frames (data fragments), which can be sent both by the client and
// the server. Frames can be of several types:
// * text frames contain text data;
// * binary frames contain binary data;
// * ping/pong frames check the connection (when sent from the server, the client responds automatically);
// * other service frames: connection close frame, etc.
// Developers usually directly work with text and binary frames only. In contrary to HTTP, WebSocket does
// not enforce any specific message format, so frames can contain any text or binary data.
}
object WebSocketServer extends IOApp {
// Let's build a WebSocket server using Http4s.
private val echoRoute = HttpRoutes.of[IO] {
// websocat "ws://localhost:9002/echo"
case GET -> Root / "echo" =>
// Pipe is a stream transformation function of type `Stream[F, I] => Stream[F, O]`. In this case
// `I == O == WebSocketFrame`. So the pipe transforms incoming WebSocket messages from the client to
// outgoing WebSocket messages to send to the client.
val echoPipe: Pipe[IO, WebSocketFrame, WebSocketFrame] =
_.collect {
case WebSocketFrame.Text(message, _) => WebSocketFrame.Text(message)
}
for {
// Unbounded queue to store WebSocket messages from the client, which are pending to be processed.
// For production use bounded queue seems a better choice. Unbounded queue may result in out of
// memory error, if the client is sending messages quicker than the server can process them.
queue <- Queue.unbounded[IO, WebSocketFrame]
response <- WebSocketBuilder[IO].build(
// Sink, where the incoming WebSocket messages from the client are pushed to.
receive = queue.enqueue,
// Outgoing stream of WebSocket messages to send to the client.
send = queue.dequeue.through(echoPipe),
)
} yield response
// Exercise 1. Send current time to user when he asks it.
// Note: getting current time is a side effect.
// Exercise 2. Notify user periodically how long he is connected.
// Tip: you can merge streams via `merge` operator.
}
// Topics provide an implementation of the publish-subscribe pattern with an arbitrary number of
// publishers and an arbitrary number of subscribers.
private def chatRoute(chatTopic: Topic[IO, String]) = HttpRoutes.of[IO] {
// websocat "ws://localhost:9002/chat"
case GET -> Root / "chat" =>
WebSocketBuilder[IO].build(
// Sink, where the incoming WebSocket messages from the client are pushed to.
receive = chatTopic.publish.compose[Stream[IO, WebSocketFrame]](_.collect {
case WebSocketFrame.Text(message, _) => message
}),
// Outgoing stream of WebSocket messages to send to the client.
send = chatTopic.subscribe(10).map(WebSocketFrame.Text(_)),
)
// Exercise 3. Use first message from a user as his username and prepend it to each his message.
// Tip: to do this you will likely need fs2.Pull.
}
private def httpApp(chatTopic: Topic[IO, String]) = {
echoRoute <+> chatRoute(chatTopic)
}.orNotFound
override def run(args: List[String]): IO[ExitCode] =
for {
chatTopic <- Topic[IO, String]("Hello!")
_ <- BlazeServerBuilder[IO](ExecutionContext.global)
.bindHttp(port = 9002, host = "localhost")
.withHttpApp(httpApp(chatTopic))
.serve
.compile
.drain
} yield ExitCode.Success
}
// Regrettably, Http4s does not yet provide a WebSocket client (contributions are welcome!):
// https://github.com/http4s/http4s/issues/330
// But there is an Http4s wrapper for builtin JDK HTTP client.
object WebSocketClient extends IOApp {
private val uri = uri"ws://localhost:9002/echo"
private def printLine(string: String = ""): IO[Unit] = IO(println(string))
override def run(args: List[String]): IO[ExitCode] = {
val clientResource = Resource.eval(IO(HttpClient.newHttpClient()))
.flatMap(JdkWSClient[IO](_).connectHighLevel(WSRequest(uri)))
clientResource.use { client =>
for {
_ <- client.send(WSFrame.Text("hello"))
_ <- client.receiveStream.collectFirst {
case WSFrame.Text(s, _) => s
}.compile.string >>= printLine
} yield ExitCode.Success
}
}
}
// Attributions and useful links:
// https://en.wikipedia.org/wiki/WebSocket
// https://javascript.info/websocket
// https://hpbn.co/websocket/