Skip to content

Commit 5cb1fd4

Browse files
committed
Add akka-http backend implementation
1 parent 8b18590 commit 5cb1fd4

16 files changed

+848
-2
lines changed

README.md

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
Java backend for `Socket.IO` library (http://socket.io/)
1+
Java/Scala backend for `Socket.IO` library (http://socket.io/)
22

33
Supports `Socket.IO` clients version 1.0+
4-
Requires JSR 356-compatible server (tested with Jetty 9 and Tomcat 8)
4+
5+
There is currently support for JSR 356-compatible servlet containers (tested with Jetty 9 and Tomcat 8) and Akka-HTTP.
56

67
Right now only websocket and XHR polling transports are implemented.
78

@@ -41,3 +42,21 @@ When Jetty server is embedded into your application, but websocket endpoint is e
4142
});
4243
```
4344
See example in [com.codeminders.socketio.sample.jetty.ChatServer](https://github.com/codeminders/socket.io-server-java/blob/master/samples/jetty/src/main/java/com/codeminders/socketio/sample/jetty/ChatServer.java)
45+
46+
## Akka-HTTP usage
47+
48+
The akka-http support revolves around a single exposed class, `SocketIOAkkaHttp`.
49+
50+
```scala
51+
val socketIO = SocketIOAkkaHttp().fold(sys.error, identity)
52+
53+
val routes: Route = {
54+
pathPrefix("socket.io") {
55+
socketIO.route
56+
}
57+
}
58+
59+
Http().bindAndHandle(routes, "localhost", 8080)
60+
```
61+
62+
The socket URL can be changed by moving the `.route` call to another area in your routes hierarchy. To configure limits and timeouts, an optional `SocketIOAkkaHttpSettings` instance can be passed to the `SocketIOAkkaHttp()` constructor.

pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,14 @@
6969
<role>contributor</role>
7070
</roles>
7171
</contributor>
72+
<contributor>
73+
<name>Brian Tarricone</name>
74+
<email>[email protected]</email>
75+
<timezone>-8</timezone>
76+
<roles>
77+
<role>contributor</role>
78+
</roles>
79+
</contributor>
7280
</contributors>
7381

7482
<licenses>
@@ -94,6 +102,7 @@
94102
<module>socket-io</module>
95103
<module>socket-io-servlet</module>
96104
<module>samples</module>
105+
<module>socket-io-akka-http-scala</module>
97106
</modules>
98107

99108
<distributionManagement>

socket-io-akka-http-scala/pom.xml

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<parent>
7+
<artifactId>socketio-parent</artifactId>
8+
<groupId>com.codeminders.socketio</groupId>
9+
<version>2.0.0-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>socket-io-akka-http-scala_${scala.binary.version}</artifactId>
13+
<packaging>jar</packaging>
14+
15+
<properties>
16+
<scala.binary.version>2.12</scala.binary.version>
17+
<scala.version>2.12.8</scala.version>
18+
<akka.version>2.5.19</akka.version>
19+
<akka-http.version>10.1.6</akka-http.version>
20+
</properties>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>com.codeminders.socketio</groupId>
25+
<artifactId>socket-io</artifactId>
26+
<version>2.0.0-SNAPSHOT</version>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>org.scala-lang</groupId>
31+
<artifactId>scala-library</artifactId>
32+
<version>${scala.version}</version>
33+
</dependency>
34+
35+
<dependency>
36+
<groupId>com.typesafe.akka</groupId>
37+
<artifactId>akka-actor_${scala.binary.version}</artifactId>
38+
<version>${akka.version}</version>
39+
</dependency>
40+
<dependency>
41+
<groupId>com.typesafe.akka</groupId>
42+
<artifactId>akka-stream_${scala.binary.version}</artifactId>
43+
<version>${akka.version}</version>
44+
</dependency>
45+
<dependency>
46+
<groupId>com.typesafe.akka</groupId>
47+
<artifactId>akka-http-core_${scala.binary.version}</artifactId>
48+
<version>${akka-http.version}</version>
49+
</dependency>
50+
<dependency>
51+
<groupId>com.typesafe.akka</groupId>
52+
<artifactId>akka-http_${scala.binary.version}</artifactId>
53+
<version>${akka-http.version}</version>
54+
</dependency>
55+
</dependencies>
56+
57+
<build>
58+
<plugins>
59+
<plugin>
60+
<groupId>net.alchim31.maven</groupId>
61+
<artifactId>scala-maven-plugin</artifactId>
62+
<version>3.4.4</version>
63+
<configuration>
64+
<recompileMode>incremental</recompileMode>
65+
<args>
66+
<arg>-deprecation</arg>
67+
<arg>-encoding</arg><arg>utf-8</arg>
68+
<arg>-feature</arg>
69+
<arg>-unchecked</arg>
70+
<arg>-Xlint:infer-any</arg>
71+
<arg>-Xlint:missing-interpolator</arg>
72+
<arg>-Xlint:nullary-unit</arg>
73+
<arg>-Xlint:private-shadow</arg>
74+
<arg>-Xlint:type-parameter-shadow</arg>
75+
<arg>-Xlint:unsound-match</arg>
76+
<arg>-Ypartial-unification</arg>
77+
<arg>-Yno-adapted-args</arg>
78+
<arg>-Ywarn-dead-code</arg>
79+
<arg>-Ywarn-extra-implicit</arg>
80+
<arg>-Ywarn-infer-any</arg>
81+
<arg>-Ywarn-nullary-unit</arg>
82+
<arg>-Ywarn-unused:imports</arg>
83+
<arg>-Ywarn-unused:locals</arg>
84+
<arg>-Ywarn-unused:privates</arg>
85+
</args>
86+
<charset>UTF-8</charset>
87+
</configuration>
88+
<executions>
89+
<execution>
90+
<id>scala-compile-first</id>
91+
<phase>process-resources</phase>
92+
<goals>
93+
<goal>add-source</goal>
94+
<goal>compile</goal>
95+
</goals>
96+
</execution>
97+
<execution>
98+
<id>scala-test-compile</id>
99+
<phase>process-test-resources</phase>
100+
<goals>
101+
<goal>testCompile</goal>
102+
</goals>
103+
</execution>
104+
</executions>
105+
</plugin>
106+
</plugins>
107+
</build>
108+
</project>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
com.codeminders.socket-io.server.akka-http {
2+
# Whether or not to allow all origins via CORS
3+
allow-all-origins = true
4+
# A list of CORS allowed origins (only used if 'allow-all-origins' is false)
5+
allowed-origins = []
6+
# Interval at which to expect application-level pings
7+
ping-interval = 25 seconds
8+
# Time after which a connection will be considered dead if a ping has not been received
9+
ping-timeout = 60 seconds
10+
# Maximum size of binary WebSocket messages.
11+
max-binary-message-size = 8192
12+
# Maximum size of text WebSocket messages.
13+
max-text-message-size = 32768
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.spurint.socketio.server.akkahttp
2+
3+
import akka.http.scaladsl.model.{HttpRequest => AkkaHttpRequest}
4+
import akka.stream.Materializer
5+
import akka.stream.scaladsl.{Keep, StreamConverters}
6+
import com.codeminders.socketio.server.HttpRequest
7+
import java.io.{BufferedReader, InputStream, InputStreamReader}
8+
import java.util.Locale
9+
10+
private[akkahttp] class AkkaHttpRequestWrapper(request: AkkaHttpRequest)(implicit materializer: Materializer) extends HttpRequest {
11+
private lazy val query = request.uri.query()
12+
private lazy val entityInputStream = request.entity.dataBytes.toMat(StreamConverters.asInputStream())(Keep.right).run
13+
private lazy val entityReader = new BufferedReader(new InputStreamReader(entityInputStream))
14+
15+
override def getMethod: String = request.method.value
16+
17+
override def getHeader(name: String): String =
18+
request.headers.find(_.lowercaseName == name.toLowerCase(Locale.US)).map(_.value).orNull
19+
20+
override def getContentType: String = request.entity.contentType.value
21+
22+
override def getParameter(name: String): String = query.get(name).orNull
23+
24+
override def getInputStream: InputStream = entityInputStream
25+
26+
override def getReader: BufferedReader = entityReader
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package org.spurint.socketio.server.akkahttp
2+
3+
import akka.http.scaladsl.model
4+
import akka.http.scaladsl.model._
5+
import akka.http.scaladsl.model.headers.RawHeader
6+
import akka.stream.scaladsl.StreamConverters
7+
import com.codeminders.socketio.server.HttpResponse
8+
import java.io.{OutputStream, PipedInputStream, PipedOutputStream}
9+
import java.nio.charset.StandardCharsets
10+
import java.util.concurrent.ConcurrentHashMap
11+
import scala.collection.JavaConverters._
12+
13+
private[akkahttp] class AkkaHttpResponseWrapper extends HttpResponse with AutoCloseable {
14+
@volatile private var statusCode: StatusCode = StatusCodes.OK
15+
private val headers = new ConcurrentHashMap[String, String]
16+
@volatile private var contentType: Option[String] = None
17+
18+
private lazy val inputStream = new PipedInputStream()
19+
private lazy val outputStream = new PipedOutputStream(inputStream)
20+
private lazy val entitySource = StreamConverters.fromInputStream(() => inputStream)
21+
22+
override def setHeader(name: String, value: String): Unit = headers.put(name, value)
23+
24+
override def setContentType(contentType: String): Unit = this.contentType = Option(contentType)
25+
26+
override def getOutputStream: OutputStream = outputStream
27+
28+
override def sendError(statusCode: Int, message: String): Unit = {
29+
this.statusCode = parseStatusCode(statusCode)
30+
outputStream.write(message.getBytes(StandardCharsets.UTF_8))
31+
close()
32+
}
33+
34+
override def sendError(statusCode: Int): Unit = {
35+
this.statusCode = parseStatusCode(statusCode)
36+
close()
37+
}
38+
39+
override def flushBuffer(): Unit = outputStream.flush()
40+
41+
override def close(): Unit = {
42+
flushBuffer()
43+
outputStream.close()
44+
}
45+
46+
lazy val toAkkaHttpResponse: Either[String, model.HttpResponse] = {
47+
close()
48+
this.contentType.map(s => ContentType.parse(s)).getOrElse(Right(ContentTypes.NoContentType)).map { contentType =>
49+
model.HttpResponse(
50+
status = statusCode,
51+
headers = headers.asScala.map { case (name, value) => RawHeader(name, value) }.to[scala.collection.immutable.Seq],
52+
entity = HttpEntity(contentType, entitySource)
53+
)
54+
}.swap.map { errors => errors.map(_.summary).mkString("; ") }.swap
55+
}
56+
57+
private def parseStatusCode(statusCode: Int): StatusCode = {
58+
StatusCodes.getForKey(statusCode).getOrElse(StatusCodes.custom(statusCode, "", ""))
59+
}
60+
}

0 commit comments

Comments
 (0)