diff --git a/README.md b/README.md index 6ccfcb0..0ea44e8 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,8 @@ -Java backend for `Socket.IO` library (http://socket.io/) +Java/Scala backend for `Socket.IO` library (http://socket.io/) Supports `Socket.IO` clients version 1.0+ -Requires JSR 356-compatible server (tested with Jetty 9 and Tomcat 8) + +There is currently support for JSR 356-compatible servlet containers (tested with Jetty 9 and Tomcat 8) and Akka-HTTP. Right now only websocket and XHR polling transports are implemented. @@ -41,3 +42,21 @@ When Jetty server is embedded into your application, but websocket endpoint is e }); ``` See example in [com.codeminders.socketio.sample.jetty.ChatServer](https://github.com/codeminders/socket.io-server-java/blob/master/samples/jetty/src/main/java/com/codeminders/socketio/sample/jetty/ChatServer.java) + +## Akka-HTTP usage + +The akka-http support revolves around a single exposed class, `SocketIOAkkaHttp`. + +```scala + val socketIO = SocketIOAkkaHttp().fold(sys.error, identity) + + val routes: Route = { + pathPrefix("socket.io") { + socketIO.route + } + } + + Http().bindAndHandle(routes, "localhost", 8080) +``` + +The socket URL can be changed by moving the `.route` call to another area in your routes hierarchy. To configure limits and timeouts, an optional `SocketIOAkkaHttpSettings` instance can be passed to the `SocketIOAkkaHttp()` constructor. diff --git a/pom.xml b/pom.xml index a1daef7..a5af413 100755 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.codeminders.socketio socketio-parent - 1.0.9 + 2.0.0-SNAPSHOT pom Socket.IO Java Server @@ -15,7 +15,7 @@ 2010 - 1.7 + 1.8 UTF-8 @@ -69,6 +69,14 @@ contributor + + Brian Tarricone + brian@tarricone.org + -8 + + contributor + + @@ -92,7 +100,9 @@ socket-io + socket-io-servlet samples + socket-io-akka-http-scala @@ -140,8 +150,6 @@ 2.3.2 UTF-8 - 1.6 - 1.6 ${jdk.version} ${jdk.version} @@ -277,13 +285,24 @@ + + + + javax.servlet + javax.servlet-api + 3.0.1 + provided + + + javax.websocket + javax.websocket-api + 1.1 + provided + + + + - - javax.servlet - javax.servlet-api - 3.0.1 - provided - com.google.guava guava diff --git a/samples/chat/pom.xml b/samples/chat/pom.xml index 94e3450..1db7b58 100644 --- a/samples/chat/pom.xml +++ b/samples/chat/pom.xml @@ -7,7 +7,7 @@ com.codeminders.socketio socketio-sample - 1.0.9 + 2.0.0-SNAPSHOT socketio-sample-chat @@ -19,8 +19,16 @@ com.codeminders.socketio - socket-io - 1.0.9 + socket-io-servlet + 2.0.0-SNAPSHOT + + + javax.servlet + javax.servlet-api + + + javax.websocket + javax.websocket-api diff --git a/samples/chat/src/main/java/com/codeminders/socketio/sample/chat/ChatSocketServlet.java b/samples/chat/src/main/java/com/codeminders/socketio/sample/chat/ChatSocketServlet.java index 250fa4b..0490def 100644 --- a/samples/chat/src/main/java/com/codeminders/socketio/sample/chat/ChatSocketServlet.java +++ b/samples/chat/src/main/java/com/codeminders/socketio/sample/chat/ChatSocketServlet.java @@ -25,7 +25,7 @@ import com.codeminders.socketio.common.DisconnectReason; import com.codeminders.socketio.common.SocketIOException; import com.codeminders.socketio.server.*; -import com.codeminders.socketio.server.transport.websocket.WebsocketIOServlet; +import com.codeminders.socketio.server.servlet.transport.websocket.WebsocketIOServlet; import com.google.common.io.ByteStreams; import javax.servlet.ServletConfig; diff --git a/samples/jetty/pom.xml b/samples/jetty/pom.xml index 89533f2..e14824a 100644 --- a/samples/jetty/pom.xml +++ b/samples/jetty/pom.xml @@ -7,7 +7,7 @@ com.codeminders.socketio socketio-sample - 1.0.9 + 2.0.0-SNAPSHOT socketio-sample-chat-jetty @@ -22,7 +22,7 @@ com.codeminders.socketio socketio-sample-chat - 1.0.9 + 2.0.0-SNAPSHOT classes @@ -43,12 +43,10 @@ javax.servlet javax.servlet-api - 3.1.0 javax.websocket javax.websocket-api - 1.1 diff --git a/samples/jetty/src/main/java/com/codeminders/socketio/sample/jetty/ChatServer.java b/samples/jetty/src/main/java/com/codeminders/socketio/sample/jetty/ChatServer.java index 63cd977..9a00f0f 100644 --- a/samples/jetty/src/main/java/com/codeminders/socketio/sample/jetty/ChatServer.java +++ b/samples/jetty/src/main/java/com/codeminders/socketio/sample/jetty/ChatServer.java @@ -23,7 +23,7 @@ package com.codeminders.socketio.sample.jetty; import com.codeminders.socketio.sample.chat.ChatSocketServlet; -import com.codeminders.socketio.server.transport.websocket.WebsocketTransportConnection; +import com.codeminders.socketio.server.servlet.transport.websocket.WebsocketTransportConnection; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletContextHandler; diff --git a/samples/pom.xml b/samples/pom.xml index e21859f..b9bcd80 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -7,7 +7,7 @@ com.codeminders.socketio socketio-parent - 1.0.9 + 2.0.0-SNAPSHOT socketio-sample diff --git a/samples/tomcat/pom.xml b/samples/tomcat/pom.xml index 3308689..6b81fce 100644 --- a/samples/tomcat/pom.xml +++ b/samples/tomcat/pom.xml @@ -7,7 +7,7 @@ com.codeminders.socketio socketio-sample - 1.0.9 + 2.0.0-SNAPSHOT socketio-sample-chat-tomcat @@ -23,17 +23,15 @@ javax.servlet javax.servlet-api - 3.1.0 javax.websocket javax.websocket-api - 1.1 com.codeminders.socketio socketio-sample-chat - 1.0.9 + 2.0.0-SNAPSHOT classes diff --git a/socket-io-akka-http-scala/pom.xml b/socket-io-akka-http-scala/pom.xml new file mode 100644 index 0000000..5d38647 --- /dev/null +++ b/socket-io-akka-http-scala/pom.xml @@ -0,0 +1,157 @@ + + + 4.0.0 + + + socketio-parent + com.codeminders.socketio + 2.0.0-SNAPSHOT + + + socket-io-akka-http-scala_${scala.binary.version} + jar + + + 2.13 + 2.13.2 + 2.6.5 + 10.1.12 + + + + + scala-2.13 + + true + + + 2.13 + 2.13.2 + + + + scala-2.12 + + 2.12 + 2.12.10 + + + + + net.alchim31.maven + scala-maven-plugin + + incremental + + -Ypartial-unification + -Xlint:unsound-match + -Yno-adapted-args + -Ywarn-infer-any + -Ywarn-nullary-unit + + + + + + + + + + + com.codeminders.socketio + socket-io + 2.0.0-SNAPSHOT + + + + org.scala-lang + scala-library + ${scala.version} + + + + com.typesafe.akka + akka-actor_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-stream_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-http-core_${scala.binary.version} + ${akka-http.version} + + + com.typesafe.akka + akka-http_${scala.binary.version} + ${akka-http.version} + + + + + target-${scala.binary.version} + target-${scala.binary.version}/classes + target-${scala.binary.version}/test-classes + + + net.alchim31.maven + scala-maven-plugin + 4.3.1 + + incremental + + -deprecation + -encodingutf-8 + -feature + -unchecked + -Xlint:infer-any + -Xlint:missing-interpolator + -Xlint:nullary-unit + -Xlint:private-shadow + -Xlint:type-parameter-shadow + -Ywarn-dead-code + -Ywarn-extra-implicit + -Ywarn-unused:imports + -Ywarn-unused:locals + -Ywarn-unused:privates + + UTF-8 + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.spurint.maven.plugins + scala-cross-maven-plugin + 0.2.1 + + + rewrite-pom + + rewrite-pom + + + + + + + diff --git a/socket-io-akka-http-scala/src/main/resources/reference.conf b/socket-io-akka-http-scala/src/main/resources/reference.conf new file mode 100644 index 0000000..f853950 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/resources/reference.conf @@ -0,0 +1,10 @@ +com.codeminders.socket-io.server.akka-http { + # Whether or not to allow all origins via CORS + allow-all-origins = true + # A list of CORS allowed origins (only used if 'allow-all-origins' is false) + allowed-origins = [] + # Interval at which to expect application-level pings + ping-interval = 25 seconds + # Time after which a connection will be considered dead if a ping has not been received + ping-timeout = 60 seconds +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/AkkaHttpRequestWrapper.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/AkkaHttpRequestWrapper.scala new file mode 100644 index 0000000..0dd4c4d --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/AkkaHttpRequestWrapper.scala @@ -0,0 +1,27 @@ +package org.spurint.socketio.server.akkahttp + +import akka.http.scaladsl.model.{HttpRequest => AkkaHttpRequest} +import akka.stream.Materializer +import akka.stream.scaladsl.{Keep, StreamConverters} +import com.codeminders.socketio.server.HttpRequest +import java.io.{BufferedReader, InputStream, InputStreamReader} +import java.util.Locale + +private[akkahttp] class AkkaHttpRequestWrapper(request: AkkaHttpRequest)(implicit materializer: Materializer) extends HttpRequest { + private lazy val query = request.uri.query() + private lazy val entityInputStream = request.entity.dataBytes.toMat(StreamConverters.asInputStream())(Keep.right).run + private lazy val entityReader = new BufferedReader(new InputStreamReader(entityInputStream)) + + override def getMethod: String = request.method.value + + override def getHeader(name: String): String = + request.headers.find(_.lowercaseName == name.toLowerCase(Locale.US)).map(_.value).orNull + + override def getContentType: String = request.entity.contentType.value + + override def getParameter(name: String): String = query.get(name).orNull + + override def getInputStream: InputStream = entityInputStream + + override def getReader: BufferedReader = entityReader +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/AkkaHttpResponseWrapper.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/AkkaHttpResponseWrapper.scala new file mode 100644 index 0000000..1225f38 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/AkkaHttpResponseWrapper.scala @@ -0,0 +1,60 @@ +package org.spurint.socketio.server.akkahttp + +import akka.http.scaladsl.model +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.RawHeader +import akka.stream.scaladsl.StreamConverters +import com.codeminders.socketio.server.HttpResponse +import java.io.{OutputStream, PipedInputStream, PipedOutputStream} +import java.nio.charset.StandardCharsets +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ + +private[akkahttp] class AkkaHttpResponseWrapper extends HttpResponse with AutoCloseable { + @volatile private var statusCode: StatusCode = StatusCodes.OK + private val headers = new ConcurrentHashMap[String, String] + @volatile private var contentType: Option[String] = None + + private lazy val inputStream = new PipedInputStream() + private lazy val outputStream = new PipedOutputStream(inputStream) + private lazy val entitySource = StreamConverters.fromInputStream(() => inputStream) + + override def setHeader(name: String, value: String): Unit = headers.put(name, value) + + override def setContentType(contentType: String): Unit = this.contentType = Option(contentType) + + override def getOutputStream: OutputStream = outputStream + + override def sendError(statusCode: Int, message: String): Unit = { + this.statusCode = parseStatusCode(statusCode) + outputStream.write(message.getBytes(StandardCharsets.UTF_8)) + close() + } + + override def sendError(statusCode: Int): Unit = { + this.statusCode = parseStatusCode(statusCode) + close() + } + + override def flushBuffer(): Unit = outputStream.flush() + + override def close(): Unit = { + flushBuffer() + outputStream.close() + } + + lazy val toAkkaHttpResponse: Either[String, model.HttpResponse] = { + close() + this.contentType.map(s => ContentType.parse(s)).getOrElse(Right(ContentTypes.NoContentType)).map { contentType => + model.HttpResponse( + status = statusCode, + headers = headers.asScala.map({ case (name, value) => RawHeader(name, value) }).toList, + entity = HttpEntity(contentType, entitySource) + ) + }.swap.map { errors => errors.map(_.summary).mkString("; ") }.swap + } + + private def parseStatusCode(statusCode: Int): StatusCode = { + StatusCodes.getForKey(statusCode).getOrElse(StatusCodes.custom(statusCode, "", "")) + } +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/SocketIOAkkaHttp.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/SocketIOAkkaHttp.scala new file mode 100644 index 0000000..3721932 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/SocketIOAkkaHttp.scala @@ -0,0 +1,172 @@ +package org.spurint.socketio.server.akkahttp + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.Connection +import akka.http.scaladsl.model.ws.UpgradeToWebSocket +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import akka.stream.Materializer +import akka.stream.scaladsl.StreamConverters +import com.codeminders.socketio.common.SocketIOException +import com.codeminders.socketio.protocol.SocketIOProtocol +import com.codeminders.socketio.server.{Namespace, SocketIOManager, UnsupportedTransportException} +import org.spurint.socketio.server.akkahttp.transport.{AkkaHttpTransportProvider, AkkaHttpWebSocketConnection, AkkaHttpWebSocketTransport} +import scala.collection.immutable +import scala.concurrent.ExecutionContext +import scala.util.control.NonFatal + +object SocketIOAkkaHttp { + private final val ERROR_HEADERS: immutable.Seq[HttpHeader] = immutable.Seq( + Connection("close") + ) + + /** + * Creates a new [[SocketIOAkkaHttp]] object. + * + * This is the main entry point for embedding a Socket.IO server in an akka-http server. + * + * @param settings an (optional) [[SocketIOAkkaHttpSettings]] instance + * @param ec an [[ExecutionContext]] + * @param system an [[ActorSystem]] + * @param mat a [[Materializer]] + * @return a [[Right]] containing a new [[SocketIOAkkaHttp]] instance, or a [[Left]] of [[String]] containing an error message + */ + def apply(settings: SocketIOAkkaHttpSettings)(implicit ec: ExecutionContext, system: ActorSystem, mat: Materializer): Either[String, SocketIOAkkaHttp] = { + try { + Right(new SocketIOAkkaHttp(settings)) + } catch { + case NonFatal(e) => Left(e.getMessage) + } + } + + /** + * Creates a new [[SocketIOAkkaHttp]] object. + * + * This is the main entry point for embedding a Socket.IO server in an akka-http server. + * + * Default settings are applied. + * + * @param ec an [[ExecutionContext]] + * @param system an [[ActorSystem]] + * @param mat a [[Materializer]] + * @return a [[Right]] containing a new [[SocketIOAkkaHttp]] instance, or a [[Left]] of [[String]] containing an error message + */ + def apply()(implicit ec: ExecutionContext, system: ActorSystem, mat: Materializer): Either[String, SocketIOAkkaHttp] = { + apply(SocketIOAkkaHttpSettings()) + } +} + +class SocketIOAkkaHttp private (settings: SocketIOAkkaHttpSettings)(implicit ec: ExecutionContext, system: ActorSystem, mat: Materializer) { + import SocketIOAkkaHttp._ + + private val logger = Logging.getLogger(system, getClass) + + private val transportProvider = new AkkaHttpTransportProvider(settings) + transportProvider.init() + SocketIOManager.getInstance.setTransportProvider(transportProvider) + of(SocketIOProtocol.DEFAULT_NAMESPACE) + + /** + * Returns or creates a namespace with the specified name. + * + * @param namespace the namespace name + * @return a [[Namespace]] + */ + def of(namespace: String): Namespace = { + Option(SocketIOManager.getInstance.getNamespace(namespace)) + .getOrElse(SocketIOManager.getInstance.createNamespace(namespace)) + } + + /** + * Shuts down this [[SocketIOAkkaHttp]] instance. + */ + def destroy(): Unit = { + SocketIOManager.getInstance.setTransportProvider(null) + transportProvider.destroy() + } + + /** + * The Akka-HTTP [[Route]] that should be added to the appropriate place in your route structure. + * + * For example, you might add to your route structure as so: + * {{{ + * pathPrefix("socket.io") { + * socketIoAkkaHttp.route + * } + * }}} + * + * Note that you should use [[pathPrefix]], as often clients will send trailing slashes, and will + * also send further path components to access non-default namespaces. + */ + val route: Route = { + (get & pathSuffix("socket.io.js")) { + complete(HttpResponse( + status = StatusCodes.OK, + entity = HttpEntity( + MediaTypes.`application/javascript`.withCharset(HttpCharsets.`UTF-8`), + StreamConverters.fromInputStream(() => getClass.getClassLoader.getResourceAsStream("com/codeminders/socketio/socket.io.js")) + ) + )) + } ~ + (get | post | options) { + extractRequest { request => + extractUpgradeToWebSocket { upgrade => + complete(handle(request, Some(upgrade))) + } ~ + complete(handle(request, maybeUpgrade = None)) + } + } + } + + private def handle(request: HttpRequest, maybeUpgrade: Option[UpgradeToWebSocket]): HttpResponse = { + val requestWrapper = new AkkaHttpRequestWrapper(request) + val responseWrapper = new AkkaHttpResponseWrapper() + + try { + val transport = SocketIOManager.getInstance + .getTransportProvider + .getTransport(requestWrapper) + + val connection = Option(transport.handle(requestWrapper, responseWrapper, SocketIOManager.getInstance)) + (connection, maybeUpgrade) match { + case (Some(wsConnection: AkkaHttpWebSocketConnection), Some(upgrade)) => + responseWrapper.toAkkaHttpResponse.map(_.entity.discardBytes()) + upgrade.handleMessagesWithSinkSource(wsConnection.incomingFlow, wsConnection.outgoingFlow) + + case (Some(wsConnection: AkkaHttpWebSocketTransport), None) => + logger.info("Got WebSocket transport but no upgrade") + responseWrapper.toAkkaHttpResponse.map(_.entity.discardBytes()) + wsConnection.abort() + HttpResponse(StatusCodes.BadRequest, headers = ERROR_HEADERS) + + case (Some(conn), None) => + responseWrapper.toAkkaHttpResponse.fold({ err => + logger.warning("Failed to create HTTP response: {}", err) + conn.abort() + HttpResponse(StatusCodes.InternalServerError, headers = ERROR_HEADERS) + }, identity) + + case (Some(conn), Some(_)) => + responseWrapper.toAkkaHttpResponse.map(_.entity.discardBytes()) + conn.abort() + HttpResponse(StatusCodes.BadRequest, headers = ERROR_HEADERS) + + case _ => + responseWrapper.toAkkaHttpResponse.map(_.entity.discardBytes()) + HttpResponse(StatusCodes.InternalServerError, headers = ERROR_HEADERS) + } + } catch { + case e: UnsupportedTransportException => + logger.info("Unsupported socket.io transport: {}", e.getMessage) + HttpResponse(StatusCodes.BadRequest, headers = ERROR_HEADERS) + case e: SocketIOException => + logger.warning("Failed to process socket.io request: {}", e.getMessage) + HttpResponse(StatusCodes.BadRequest, headers = ERROR_HEADERS) + case NonFatal(e) => + logger.error(e, "Unexpected exception processing socket.io request") + HttpResponse(StatusCodes.InternalServerError, headers = ERROR_HEADERS) + } + } +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/SocketIOAkkaHttpSettings.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/SocketIOAkkaHttpSettings.scala new file mode 100644 index 0000000..a671c19 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/SocketIOAkkaHttpSettings.scala @@ -0,0 +1,45 @@ +package org.spurint.socketio.server.akkahttp + +import com.typesafe.config.{Config, ConfigFactory} +import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration + +object SocketIOAkkaHttpSettings { + private val REFERENCE_CONFIG_PATH = "com.codeminders.socket-io.server.akka-http" + + /** + * Constructs a new [[SocketIOAkkaHttpSettings]] instance based on default settings + * @return a new [[SocketIOAkkaHttpSettings]] + */ + def apply(): SocketIOAkkaHttpSettings = apply(ConfigFactory.load().getConfig(REFERENCE_CONFIG_PATH)) + + /** + * Constructs a new [[SocketIOAkkaHttpSettings]] instance based on values provided in the Typesafe Config + * + * @param config a Typesafe Config object + * @return a new [[SocketIOAkkaHttpSettings]] + */ + def apply(config: Config): SocketIOAkkaHttpSettings = { + val configWithFallbacks = config.withFallback(ConfigFactory.defaultReference().getConfig(REFERENCE_CONFIG_PATH)) + SocketIOAkkaHttpSettings( + allowAllOrigins = configWithFallbacks.getBoolean("allow-all-origins"), + allowedOrigins = configWithFallbacks.getStringList("allowed-origins").asScala.toList, + pingInterval = FiniteDuration(configWithFallbacks.getDuration("ping-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS), + pingTimeout = FiniteDuration(configWithFallbacks.getDuration("ping-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS), + ) + } +} + +/** + * Object describing settings for the Akka-HTTP Socket.IO adapter. + * + * @param allowAllOrigins whether or not to allow all origins via CORS + * @param allowedOrigins a list of CORS allowed origins (only used if [[allowAllOrigins]] is false) + * @param pingInterval interval at which to expect application-level pings + * @param pingTimeout time after which a connection will be considered dead if a ping has not been received + */ +case class SocketIOAkkaHttpSettings(allowAllOrigins: Boolean, + allowedOrigins: Seq[String], + pingInterval: FiniteDuration, + pingTimeout: FiniteDuration) diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/TypesafeBasedConfig.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/TypesafeBasedConfig.scala new file mode 100644 index 0000000..03952ee --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/TypesafeBasedConfig.scala @@ -0,0 +1,46 @@ +package org.spurint.socketio.server.akkahttp + +import com.codeminders.socketio.server.Config + +private[akkahttp] class TypesafeBasedConfig(namespace: String, settings: SocketIOAkkaHttpSettings) extends Config { + override def getPingInterval(default: Long): Long = settings.pingInterval.toMillis + + override def getTimeout(default: Long): Long = settings.pingTimeout.toMillis + + override def getBufferSize: Int = Config.DEFAULT_BUFFER_SIZE + + override def getMaxIdle: Int = Config.DEFAULT_MAX_IDLE + + override def getString(key: String): String = key match { + case "allowedOrigins" => settings.allowedOrigins.mkString(",") + case _ => null + } + + override def getString(key: String, default: String): String = key match { + case "allowedOrigins" => settings.allowedOrigins.mkString(",") + case _ => default + } + + override def getInt(key: String, default: Int): Int = key match { + case Config.MAX_TEXT_MESSAGE_SIZE => default + case Config.BUFFER_SIZE => getBufferSize + case Config.MAX_IDLE => getMaxIdle + case _ => default + } + + override def getLong(key: String, default: Long): Long = key match { + case Config.PING_INTERVAL => getPingInterval(default) + case Config.TIMEOUT => getTimeout(default) + case Config.MAX_TEXT_MESSAGE_SIZE => default + case Config.BUFFER_SIZE => getBufferSize + case Config.MAX_IDLE => getMaxIdle + case _ => default + } + + override def getBoolean(key: String, default: Boolean): Boolean = key match { + case "allowAllOrigins" => settings.allowAllOrigins + case _ => default + } + + override def getNamespace: String = namespace +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpPollingTransport.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpPollingTransport.scala new file mode 100644 index 0000000..a25ec04 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpPollingTransport.scala @@ -0,0 +1,42 @@ +package org.spurint.socketio.server.akkahttp.transport + +import akka.http.scaladsl.model.StatusCodes +import com.codeminders.socketio.common.ConnectionState +import com.codeminders.socketio.protocol.EngineIOProtocol +import com.codeminders.socketio.server.{Config, TransportType, _} +import org.spurint.socketio.server.akkahttp.SocketIOAkkaHttpSettings + +private[akkahttp] abstract class AkkaHttpPollingTransport(settings: SocketIOAkkaHttpSettings) extends AkkaHttpTransport(settings) { + override def handle(request: HttpRequest, response: HttpResponse, socketIOManager: SocketIOManager): TransportConnection = { + val connection = getConnection(request, socketIOManager) + val session = connection.getSession + + session.getConnectionState match { + case ConnectionState.CONNECTING => + val upgrades = List( + Option(socketIOManager.getTransportProvider.getTransport(TransportType.WEB_SOCKET)) + .map(_ => TransportType.WEB_SOCKET.toString) + ).flatten + + connection.send( + EngineIOProtocol.createHandshakePacket( + session.getSessionId, + upgrades.toArray, + getConfig.getPingInterval(Config.DEFAULT_PING_INTERVAL), + getConfig.getTimeout(Config.DEFAULT_PING_TIMEOUT) + ) + ) + connection.handle(request, response) // called to send the handshake packet + + session.onConnect(connection) + + case ConnectionState.CONNECTED => + connection.handle(request, response) + + case _ => + response.sendError(StatusCodes.Gone.intValue, "Socket.IO session is closed") + } + + connection + } +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpTransport.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpTransport.scala new file mode 100644 index 0000000..c34792f --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpTransport.scala @@ -0,0 +1,9 @@ +package org.spurint.socketio.server.akkahttp.transport + +import com.codeminders.socketio.server.Config +import com.codeminders.socketio.server.transport.AbstractTransport +import org.spurint.socketio.server.akkahttp.{SocketIOAkkaHttpSettings, TypesafeBasedConfig} + +private[akkahttp] abstract class AkkaHttpTransport(settings: SocketIOAkkaHttpSettings) extends AbstractTransport { + override protected val getConfig: Config = new TypesafeBasedConfig(getType.name, settings) +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpTransportProvider.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpTransportProvider.scala new file mode 100644 index 0000000..97c524b --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpTransportProvider.scala @@ -0,0 +1,19 @@ +package org.spurint.socketio.server.akkahttp.transport + +import akka.stream.Materializer +import com.codeminders.socketio.server.Transport +import com.codeminders.socketio.server.transport.AbstractTransportProvider +import org.spurint.socketio.server.akkahttp.SocketIOAkkaHttpSettings +import scala.concurrent.ExecutionContext + +private[akkahttp] class AkkaHttpTransportProvider(settings: SocketIOAkkaHttpSettings) + (implicit ec: ExecutionContext, + mat: Materializer) + extends AbstractTransportProvider +{ + override protected def createXHRPollingTransport(): Transport = new AkkaHttpXHRPollingTransport(settings) + + override protected def createJSONPPollingTransport(): Transport = null + + override protected def createWebSocketTransport(): Transport = new AkkaHttpWebSocketTransport(settings) +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpWebSocketConnection.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpWebSocketConnection.scala new file mode 100644 index 0000000..8814f08 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpWebSocketConnection.scala @@ -0,0 +1,113 @@ +package org.spurint.socketio.server.akkahttp.transport + +import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} +import akka.http.scaladsl.util.FastFuture +import akka.stream._ +import akka.stream.scaladsl.{Keep, StreamConverters} +import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} +import akka.util.ByteString +import com.codeminders.socketio.common.SocketIOException +import com.codeminders.socketio.server.Transport +import com.codeminders.socketio.server.transport.websocket.AbstractWebsocketTransportConnection +import org.spurint.socketio.server.akkahttp.util.MessageQueue +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success, Try} + +private[akkahttp] class AkkaHttpWebSocketConnection(transport: Transport) + (implicit ec: ExecutionContext, + mat: Materializer) + extends AbstractWebsocketTransportConnection(transport) +{ + private val outgoingQueue = new MessageQueue[Message](128) + + override def abort(): Unit = { + super.abort() + outgoingQueue.close() + } + + override protected def sendString(data: String): Unit = outgoingQueue.offer(TextMessage(data)) + + override protected def sendBinary(data: Array[Byte]): Unit = outgoingQueue.offer(BinaryMessage(ByteString(data))) + + lazy val outgoingFlow: Graph[SourceShape[Message], Any] = new GraphStage[SourceShape[Message]] { + val out = Outlet[Message]("Message.out") + + override def shape: SourceShape[Message] = SourceShape.of(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def preStart(): Unit = sendHandshake() + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + val messageCallback = getAsyncCallback[Message](push(out, _)) + val closeCallback = getAsyncCallback[Unit](_ => completeStage()) + val errorCallback = getAsyncCallback[Throwable] { t => + failStage(t) + abort() + } + + outgoingQueue.take().onComplete { + case Success(message) => messageCallback.invoke(message) + case Failure(MessageQueue.QueueClosed) => closeCallback.invoke(()) + case Failure(t) => errorCallback.invoke(t) + } + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + super.onDownstreamFinish(cause) + outgoingQueue.close() + } + }) + } + } + + lazy val incomingFlow: Graph[SinkShape[Message], Any] = new GraphStage[SinkShape[Message]] { + val in = Inlet[Message]("Message.in") + + override def shape: SinkShape[Message] = SinkShape.of(in) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def preStart(): Unit = pull(in) + + val closedCallback = getAsyncCallback[Unit](_ => completeStage()) + outgoingQueue.closedFuture.onComplete(_ => closedCallback.invoke(())) + + setHandler(in, new InHandler { + override def onPush(): Unit = { + val successCallback = getAsyncCallback[Unit](_ => pull(in)) + val errorCallback = getAsyncCallback[Throwable] { t => + failStage(t) + abort() + } + + (grab(in) match { + case tm: TextMessage => + tm.textStream + .runFold(new StringBuilder)((accum, next) => accum.append(next)) + .map(_.toString) + .map(handleTextFrame) + + case bm: BinaryMessage => + FastFuture(Try( + handleBinaryFrame(bm.dataStream.toMat(StreamConverters.asInputStream())(Keep.right).run()) + )) + }).onComplete { + case Success(wasSuccessful) if wasSuccessful => successCallback.invoke(()) + case Success(_) => errorCallback.invoke(new SocketIOException("Failed to decode incoming message")) + case Failure(t) => errorCallback.invoke(t) + } + } + + override def onUpstreamFinish(): Unit = { + outgoingQueue.close() + super.onUpstreamFinish() + } + + override def onUpstreamFailure(t: Throwable): Unit = { + super.onUpstreamFailure(t) + abort() + } + }) + } + } +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpWebSocketTransport.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpWebSocketTransport.scala new file mode 100644 index 0000000..8800805 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpWebSocketTransport.scala @@ -0,0 +1,35 @@ +package org.spurint.socketio.server.akkahttp.transport +import akka.stream.Materializer +import com.codeminders.socketio.server._ +import org.spurint.socketio.server.akkahttp.SocketIOAkkaHttpSettings +import scala.concurrent.ExecutionContext + +private[akkahttp] class AkkaHttpWebSocketTransport(settings: SocketIOAkkaHttpSettings) + (implicit ec: ExecutionContext, + mat: Materializer) + extends AkkaHttpTransport(settings) +{ + override def init(): Unit = () + + override def destroy(): Unit = () + + override def getType: TransportType = TransportType.WEB_SOCKET + + override def handle(request: HttpRequest, response: HttpResponse, socketIOManager: SocketIOManager): TransportConnection = { + val connection = getConnection(request, socketIOManager) + connection match { + case connection: AkkaHttpWebSocketConnection => + connection.setRequest(request) + case connection => + connection.abort() + } + // the caller will handle starting up the web socket connection and returning an apprpriate response + connection + } + + override def createConnection(): TransportConnection = new AkkaHttpWebSocketConnection(this) + + override def getConnection(request: HttpRequest, sessionManager: SocketIOManager): TransportConnection = { + super.getConnection(request, sessionManager) + } +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpXHRPollingTransport.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpXHRPollingTransport.scala new file mode 100644 index 0000000..890e6aa --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/transport/AkkaHttpXHRPollingTransport.scala @@ -0,0 +1,16 @@ +package org.spurint.socketio.server.akkahttp.transport + +import com.codeminders.socketio.server.transport.XHRTransportConnection +import com.codeminders.socketio.server.{TransportConnection, TransportType} +import org.spurint.socketio.server.akkahttp.SocketIOAkkaHttpSettings + +private[akkahttp] class AkkaHttpXHRPollingTransport(settings: SocketIOAkkaHttpSettings) extends AkkaHttpPollingTransport(settings) { + override def init(): Unit = () + + override def destroy(): Unit = () + + override def getType: TransportType = TransportType.XHR_POLLING + + override def createConnection(): TransportConnection = new XHRTransportConnection(this) + +} diff --git a/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/util/MessageQueue.scala b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/util/MessageQueue.scala new file mode 100644 index 0000000..3e94187 --- /dev/null +++ b/socket-io-akka-http-scala/src/main/scala/org/spurint/socketio/server/akkahttp/util/MessageQueue.scala @@ -0,0 +1,78 @@ +package org.spurint.socketio.server.akkahttp.util + +import java.util.concurrent.ArrayBlockingQueue +import scala.concurrent.{Future, Promise} + +private[akkahttp] object MessageQueue { + case object QueueClosed extends RuntimeException +} + +private[akkahttp] class MessageQueue[T](depth: Int) extends AutoCloseable { + private val futureQueue = new ArrayBlockingQueue[Future[T]](depth) + @volatile private var pendingPromise: Option[Promise[T]] = None + @volatile private var closed: Boolean = false + + private val _closed = Promise[Unit]() + val closedFuture: Future[Unit] = _closed.future + + def offer(elem: T): Boolean = { + synchronized { + if (closed) { + false + } else { + pendingPromise.map { promise => + promise.success(elem) + pendingPromise = None + true + }.getOrElse( + futureQueue.offer(Future.successful(elem)) + ) + } + } + } + + def error(ex: Throwable): Boolean = { + synchronized { + if (closed) { + false + } else { + pendingPromise.map { promise => + promise.failure(ex) + pendingPromise = None + close() + true + }.getOrElse { + futureQueue.offer(Future.failed(ex)) + close() + true + } + } + } + } + + def take(): Future[T] = { + synchronized { + Option(futureQueue.poll()).getOrElse { + if (closed) { + Future.failed(MessageQueue.QueueClosed) + } else { + assert(pendingPromise.isEmpty) + val promise = Promise[T]() + pendingPromise = Some(promise) + promise.future + } + } + } + } + + override def close(): Unit = { + synchronized { + closed = true + pendingPromise.foreach { promise => + promise.failure(MessageQueue.QueueClosed) + pendingPromise = None + } + } + _closed.trySuccess(()) + } +} diff --git a/socket-io-servlet/pom.xml b/socket-io-servlet/pom.xml new file mode 100644 index 0000000..fd1beae --- /dev/null +++ b/socket-io-servlet/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + + socketio-parent + com.codeminders.socketio + 2.0.0-SNAPSHOT + + + socket-io-servlet + jar + + Socket.IO Server Servlet Transport + Servlet container support for Socket.IO + + + + com.codeminders.socketio + socket-io + 2.0.0-SNAPSHOT + + + javax.servlet + javax.servlet-api + + + javax.websocket + javax.websocket-api + + + diff --git a/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/HttpServletRequestWrapper.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/HttpServletRequestWrapper.java new file mode 100644 index 0000000..1df3a93 --- /dev/null +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/HttpServletRequestWrapper.java @@ -0,0 +1,55 @@ +package com.codeminders.socketio.server.servlet; + +import com.codeminders.socketio.server.HttpRequest; + +import javax.servlet.http.HttpServletRequest; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; + +public class HttpServletRequestWrapper implements HttpRequest { + private final HttpServletRequest request; + + public HttpServletRequestWrapper(HttpServletRequest request) + { + this.request = request; + } + + @Override + public String getMethod() + { + return request.getMethod(); + } + + @Override + public String getHeader(String name) + { + return request.getHeader(name); + } + + @Override + public String getContentType() + { + return request.getContentType(); + } + + @Override + public String getParameter(String name) + { + return request.getParameter(name); + } + + @Override + public InputStream getInputStream() + throws IOException + { + return request.getInputStream(); + } + + @Override + public BufferedReader getReader() + throws IOException + { + return request.getReader(); + } +} diff --git a/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/HttpServletResponseWrapper.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/HttpServletResponseWrapper.java new file mode 100644 index 0000000..961bc6a --- /dev/null +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/HttpServletResponseWrapper.java @@ -0,0 +1,56 @@ +package com.codeminders.socketio.server.servlet; + +import com.codeminders.socketio.server.HttpResponse; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.OutputStream; + +public class HttpServletResponseWrapper implements HttpResponse { + private final HttpServletResponse response; + + public HttpServletResponseWrapper(HttpServletResponse response) + { + this.response = response; + } + + @Override + public void setHeader(String name, String value) + { + response.setHeader(name, value); + } + + @Override + public void setContentType(String contentType) + { + response.setContentType(contentType); + } + + @Override + public OutputStream getOutputStream() + throws IOException + { + return response.getOutputStream(); + } + + @Override + public void sendError(int statusCode, String message) + throws IOException + { + response.sendError(statusCode, message); + } + + @Override + public void sendError(int statusCode) + throws IOException + { + response.sendError(statusCode); + } + + @Override + public void flushBuffer() + throws IOException + { + response.flushBuffer(); + } +} diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/ServletBasedConfig.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/ServletBasedConfig.java similarity index 96% rename from socket-io/src/main/java/com/codeminders/socketio/server/ServletBasedConfig.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/ServletBasedConfig.java index df0162b..87a6dfc 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/ServletBasedConfig.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/ServletBasedConfig.java @@ -22,10 +22,11 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server; +package com.codeminders.socketio.server.servlet; + +import com.codeminders.socketio.server.Config; import javax.servlet.ServletConfig; -import java.util.logging.Logger; /** * @author Mathieu Carbou diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/SocketIOServlet.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/SocketIOServlet.java similarity index 88% rename from socket-io/src/main/java/com/codeminders/socketio/server/SocketIOServlet.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/SocketIOServlet.java index 9b6923c..f87f066 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/SocketIOServlet.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/SocketIOServlet.java @@ -23,10 +23,15 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server; +package com.codeminders.socketio.server.servlet; import com.codeminders.socketio.protocol.EngineIOProtocol; import com.codeminders.socketio.protocol.SocketIOProtocol; +import com.codeminders.socketio.server.Namespace; +import com.codeminders.socketio.server.SocketIOManager; +import com.codeminders.socketio.server.SocketIOProtocolException; +import com.codeminders.socketio.server.TransportProvider; +import com.codeminders.socketio.server.UnsupportedTransportException; import com.google.common.io.ByteStreams; import javax.servlet.ServletException; @@ -132,6 +137,9 @@ private void serve(HttpServletRequest request, HttpServletResponse response) { assert (SocketIOManager.getInstance().getTransportProvider() != null); + HttpServletRequestWrapper requestWrapper = new HttpServletRequestWrapper(request); + HttpServletResponseWrapper responseWrapper = new HttpServletResponseWrapper(response); + try { if (LOGGER.isLoggable(Level.FINE)) @@ -142,8 +150,8 @@ private void serve(HttpServletRequest request, HttpServletResponse response) SocketIOManager.getInstance(). getTransportProvider(). - getTransport(request). - handle(request, response, SocketIOManager.getInstance()); + getTransport(requestWrapper). + handle(requestWrapper, responseWrapper, SocketIOManager.getInstance()); } catch (UnsupportedTransportException | SocketIOProtocolException e) { diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractHttpTransport.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractHttpTransport.java similarity index 75% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractHttpTransport.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractHttpTransport.java index e213357..c0401a9 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractHttpTransport.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractHttpTransport.java @@ -22,29 +22,38 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport; +package com.codeminders.socketio.server.servlet.transport; import com.codeminders.socketio.common.ConnectionState; import com.codeminders.socketio.protocol.EngineIOProtocol; -import com.codeminders.socketio.server.*; +import com.codeminders.socketio.server.Config; +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.HttpResponse; +import com.codeminders.socketio.server.Session; +import com.codeminders.socketio.server.SocketIOManager; +import com.codeminders.socketio.server.TransportConnection; +import com.codeminders.socketio.server.TransportType; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; -import javax.servlet.http.HttpServletRequest; +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.ArrayList; import java.util.logging.Level; import java.util.logging.Logger; -public abstract class AbstractHttpTransport extends AbstractTransport +public abstract class AbstractHttpTransport extends AbstractServletTransport { private static final Logger LOGGER = Logger.getLogger(AbstractHttpTransport.class.getName()); + public AbstractHttpTransport(ServletConfig servletConfig, ServletContext servletContext) { + super(servletConfig, servletContext); + } + @Override - public void handle(HttpServletRequest request, - HttpServletResponse response, - SocketIOManager socketIOManager) + public TransportConnection handle(HttpRequest request, + HttpResponse response, + SocketIOManager socketIOManager) throws IOException { if (LOGGER.isLoggable(Level.FINE)) @@ -74,5 +83,7 @@ else if (session.getConnectionState() == ConnectionState.CONNECTED) } else response.sendError(HttpServletResponse.SC_GONE, "Socket.IO session is closed"); + + return connection; } } diff --git a/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractServletTransport.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractServletTransport.java new file mode 100644 index 0000000..4ebfe1b --- /dev/null +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractServletTransport.java @@ -0,0 +1,64 @@ +/** + * The MIT License + * Copyright (c) 2015 Alexander Sova (bird@codeminders.com) + *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + *

+ * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.codeminders.socketio.server.servlet.transport; + +import com.codeminders.socketio.server.Config; +import com.codeminders.socketio.server.servlet.ServletBasedConfig; +import com.codeminders.socketio.server.transport.AbstractTransport; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; + +/** + * @author Alexander Sova (bird@codeminders.com) + * @author Mathieu Carbou + */ +public abstract class AbstractServletTransport extends AbstractTransport +{ + private final ServletConfig servletConfig; + private final ServletContext servletContext; + + private Config config; + + protected AbstractServletTransport(ServletConfig servletConfig, ServletContext servletContext) { + this.servletConfig = servletConfig; + this.servletContext = servletContext; + } + + @Override + public void destroy() + { + } + + @Override + public void init() + { + this.config = new ServletBasedConfig(this.servletConfig, getType().toString()); + } + + @Override + protected Config getConfig() + { + return this.config; + } +} diff --git a/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractServletTransportProvider.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractServletTransportProvider.java new file mode 100644 index 0000000..27bdf8f --- /dev/null +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/AbstractServletTransportProvider.java @@ -0,0 +1,38 @@ +package com.codeminders.socketio.server.servlet.transport; + +import com.codeminders.socketio.server.Transport; +import com.codeminders.socketio.server.transport.AbstractTransportProvider; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; + +/** + * @author Alexander Sova (bird@codeminders.com) + */ +public abstract class AbstractServletTransportProvider extends AbstractTransportProvider { + private final ServletConfig servletConfig; + private final ServletContext servletContext; + + protected AbstractServletTransportProvider(ServletConfig servletConfig, ServletContext servletContext) { + this.servletConfig = servletConfig; + this.servletContext = servletContext; + } + + @Override + protected Transport createXHRPollingTransport() + { + return new XHRPollingTransport(servletConfig, servletContext); + } + + @Override + protected Transport createJSONPPollingTransport() + { + return null; + } + + @Override + protected Transport createWebSocketTransport() + { + return null; + } +} diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/JSONPPollingTransport.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/JSONPPollingTransport.java similarity index 69% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/JSONPPollingTransport.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/JSONPPollingTransport.java index 963cd01..c5bd11a 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/JSONPPollingTransport.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/JSONPPollingTransport.java @@ -23,23 +23,28 @@ * THE SOFTWARE. * */ -package com.codeminders.socketio.server.transport; +package com.codeminders.socketio.server.servlet.transport; import com.codeminders.socketio.protocol.EngineIOProtocol; -import com.codeminders.socketio.server.SocketIOProtocolException; +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.HttpResponse; import com.codeminders.socketio.server.Session; +import com.codeminders.socketio.server.SocketIOProtocolException; import com.codeminders.socketio.server.TransportType; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; import java.io.IOException; +import java.nio.charset.StandardCharsets; public abstract class JSONPPollingTransport extends AbstractHttpTransport { private static final String EIO_PREFIX = "___eio"; private static final String FRAME_ID = JSONPPollingTransport.class.getName() + ".FRAME_ID"; - protected JSONPPollingTransport() { } + protected JSONPPollingTransport(ServletConfig servletConfig, ServletContext servletContext) { + super(servletConfig, servletContext); + } @Override public TransportType getType() @@ -47,25 +52,27 @@ public TransportType getType() return TransportType.JSONP_POLLING; } - public void startSend(Session session, ServletResponse response) throws IOException + public void startSend(Session session, HttpResponse response) throws IOException { response.setContentType("text/javascript; charset=UTF-8"); } - public void writeData(Session session, ServletResponse response, String data) throws IOException + public void writeData(Session session, HttpResponse response, String data) throws IOException { - response.getOutputStream().print(EIO_PREFIX); - response.getOutputStream().print("[" + session.getAttribute(FRAME_ID) + "]('"); - response.getOutputStream().print(data); //TODO: encode data? - response.getOutputStream().print("');"); + StringBuilder sb = new StringBuilder() + .append(EIO_PREFIX) + .append("[").append(session.getAttribute(FRAME_ID)).append("]('") + .append(data) + .append("');"); + response.getOutputStream().write(sb.toString().getBytes(StandardCharsets.UTF_8)); } - public void finishSend(Session session, ServletResponse response) throws IOException + public void finishSend(Session session, HttpResponse response) throws IOException { response.flushBuffer(); } - public void onConnect(Session session, ServletRequest request, ServletResponse response) + public void onConnect(Session session, HttpRequest request, HttpResponse response) throws IOException { try { diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/XHRPollingTransport.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/XHRPollingTransport.java similarity index 81% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/XHRPollingTransport.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/XHRPollingTransport.java index 377e56d..fe87fb4 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/XHRPollingTransport.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/XHRPollingTransport.java @@ -22,13 +22,21 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport; +package com.codeminders.socketio.server.servlet.transport; import com.codeminders.socketio.server.TransportConnection; import com.codeminders.socketio.server.TransportType; +import com.codeminders.socketio.server.transport.XHRTransportConnection; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; public class XHRPollingTransport extends AbstractHttpTransport { + public XHRPollingTransport(ServletConfig servletConfig, ServletContext servletContext) { + super(servletConfig, servletContext); + } + @Override public TransportType getType() { diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/ServletConfigHolder.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/ServletConfigHolder.java similarity index 96% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/ServletConfigHolder.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/ServletConfigHolder.java index c5df9a1..45e5a30 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/ServletConfigHolder.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/ServletConfigHolder.java @@ -20,7 +20,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport.websocket; +package com.codeminders.socketio.server.servlet.transport.websocket; import javax.servlet.ServletConfig; diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/SynchronizedWebsocketIO.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/SynchronizedWebsocketIO.java similarity index 96% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/SynchronizedWebsocketIO.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/SynchronizedWebsocketIO.java index a982234..c69d2f2 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/SynchronizedWebsocketIO.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/SynchronizedWebsocketIO.java @@ -20,7 +20,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport.websocket; +package com.codeminders.socketio.server.servlet.transport.websocket; import java.io.IOException; diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketConfigurator.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketConfigurator.java similarity index 96% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketConfigurator.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketConfigurator.java index fb2412e..18a0eda 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketConfigurator.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketConfigurator.java @@ -20,7 +20,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport.websocket; +package com.codeminders.socketio.server.servlet.transport.websocket; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketIO.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketIO.java similarity index 96% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketIO.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketIO.java index e5e0560..744b5e1 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketIO.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketIO.java @@ -20,7 +20,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport.websocket; +package com.codeminders.socketio.server.servlet.transport.websocket; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketIOServlet.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketIOServlet.java similarity index 73% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketIOServlet.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketIOServlet.java index 25d1f9e..67a246e 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketIOServlet.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketIOServlet.java @@ -20,9 +20,10 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport.websocket; +package com.codeminders.socketio.server.servlet.transport.websocket; -import com.codeminders.socketio.server.SocketIOServlet; +import com.codeminders.socketio.common.SocketIOException; +import com.codeminders.socketio.server.servlet.SocketIOServlet; import com.codeminders.socketio.server.TransportProvider; import javax.servlet.ServletConfig; @@ -38,8 +39,12 @@ public void init(ServletConfig config) throws ServletException { super.init(config); ServletConfigHolder.getInstance().setConfig(config); - TransportProvider transportProvider = new WebsocketTransportProvider(); - transportProvider.init(config, getServletContext()); - setTransportProvider(transportProvider); + try { + TransportProvider transportProvider = new WebsocketTransportProvider(config, getServletContext()); + transportProvider.init(); + setTransportProvider(transportProvider); + } catch (SocketIOException e) { + throw new ServletException("Failed to initialize Websocket transport provider: " + e.getMessage(), e); + } } } diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransport.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransport.java similarity index 76% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransport.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransport.java index 0064823..1bd4085 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransport.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransport.java @@ -23,23 +23,30 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport.websocket; +package com.codeminders.socketio.server.servlet.transport.websocket; +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.HttpResponse; import com.codeminders.socketio.server.SocketIOManager; import com.codeminders.socketio.server.TransportConnection; import com.codeminders.socketio.server.TransportType; -import com.codeminders.socketio.server.transport.AbstractTransport; +import com.codeminders.socketio.server.servlet.transport.AbstractServletTransport; import com.codeminders.socketio.server.transport.AbstractTransportConnection; -import javax.servlet.http.HttpServletRequest; +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.logging.Logger; -public final class WebsocketTransport extends AbstractTransport +public final class WebsocketTransport extends AbstractServletTransport { private static final Logger LOGGER = Logger.getLogger(WebsocketTransport.class.getName()); + public WebsocketTransport(ServletConfig servletConfig, ServletContext servletContext) { + super(servletConfig, servletContext); + } + @Override public TransportType getType() { @@ -47,29 +54,31 @@ public TransportType getType() } @Override - public void handle(HttpServletRequest request, - HttpServletResponse response, - SocketIOManager sessionManager) throws IOException + public TransportConnection handle(HttpRequest request, + HttpResponse response, + SocketIOManager sessionManager) throws IOException { if(!"GET".equals(request.getMethod())) { response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "Only GET method is allowed for websocket transport"); - return; + return null; } if (request.getHeader("Sec-WebSocket-Key") == null) { response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Missing request header 'Sec-WebSocket-Key'"); - return; + return null; } final TransportConnection connection = getConnection(request, sessionManager); // a bit hacky but safe since we know the type of TransportConnection here ((AbstractTransportConnection)connection).setRequest(request); + + return connection; } @Override diff --git a/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransportConnection.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransportConnection.java new file mode 100644 index 0000000..2d345e5 --- /dev/null +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransportConnection.java @@ -0,0 +1,210 @@ +/** + * The MIT License + * Copyright (c) 2015 Alexander Sova (bird@codeminders.com) + *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + *

+ * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.codeminders.socketio.server.servlet.transport.websocket; + +import com.codeminders.socketio.common.SocketIOException; +import com.codeminders.socketio.protocol.EngineIOProtocol; +import com.codeminders.socketio.server.Config; +import com.codeminders.socketio.server.SocketIOManager; +import com.codeminders.socketio.server.Transport; +import com.codeminders.socketio.server.servlet.ServletBasedConfig; +import com.codeminders.socketio.server.transport.websocket.AbstractWebsocketTransportConnection; + +import javax.websocket.CloseReason; +import javax.websocket.EndpointConfig; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.HandshakeRequest; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author Alexander Sova (bird@codeminders.com) + * @author Alex Saveliev (lyolik@codeminders.com) + */ +@ServerEndpoint(value="/socket.io/", configurator = WebsocketConfigurator.class) +public final class WebsocketTransportConnection extends AbstractWebsocketTransportConnection +{ + private static final Logger LOGGER = Logger.getLogger(WebsocketTransportConnection.class.getName()); + + private static Class websocketIOClass = WebsocketIO.class; + + private WebsocketIO websocketIO; + + public WebsocketTransportConnection(Transport transport) + { + super(transport); + } + + /** + * + * @param clazz class responsible for I/O operations + */ + public static void setWebsocketIOClass(Class clazz) { + WebsocketTransportConnection.websocketIOClass = clazz; + } + + @OnOpen + public void onOpen(javax.websocket.Session session, EndpointConfig config) throws Exception + { + setupIO(session); + setupSession(session); + init(new ServletBasedConfig( + ServletConfigHolder.getInstance().getConfig(), + getTransport().getType().toString())); + session.setMaxBinaryMessageBufferSize(getConfig().getBufferSize()); + session.setMaxIdleTimeout(getConfig().getMaxIdle()); + session.setMaxTextMessageBufferSize(getConfig().getInt(Config.MAX_TEXT_MESSAGE_SIZE, 32000)); + sendHandshake(); + } + + private void setupIO(Session session) throws Exception { + websocketIO = websocketIOClass.getConstructor(Session.class).newInstance(session); + } + + @OnClose + public void onClose(javax.websocket.Session session, CloseReason closeReason) + { + handleConnectionClosed(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase()); + } + + @OnMessage + public void onMessage(String text) + { + handleTextFrame(text); + } + + @OnMessage + public void onMessage(InputStream is) + { + handleBinaryFrame(is); + } + + @OnError + public void onError(javax.websocket.Session session, Throwable error) { + if (websocketIO != null) + { + disconnectEndpoint(); + websocketIO = null; + } + } + + @Override + public void abort() + { + super.abort(); + if (websocketIO != null) + { + disconnectEndpoint(); + websocketIO = null; + } + } + + protected void sendString(String data) throws SocketIOException + { + if (LOGGER.isLoggable(Level.FINE)) + LOGGER.log(Level.FINE, "Session[" + getSession().getSessionId() + "]: send text: " + data); + + try + { + websocketIO.sendString(data); + } + catch (IOException e) + { + disconnectEndpoint(); + throw new SocketIOException(e); + } + } + + //TODO: implement streaming. right now it is all in memory. + //TODO: read and send in chunks using sendPartialBytes() + protected void sendBinary(byte[] data) throws SocketIOException + { + if (LOGGER.isLoggable(Level.FINE)) + LOGGER.log(Level.FINE, "Session[" + getSession().getSessionId() + "]: send binary"); + + try + { + websocketIO.sendBinary(data); + } + catch (IOException e) + { + disconnectEndpoint(); + throw new SocketIOException(e); + } + } + + private void disconnectEndpoint() + { + try + { + websocketIO.disconnect(); + } + catch (IOException ex) + { + // ignore + } + } + + /** + * @param session websocket session + * @return session id extracted from handshake request's parameter + */ + private String getSessionId(javax.websocket.Session session) + { + HandshakeRequest handshake = (HandshakeRequest) + session.getUserProperties().get(HandshakeRequest.class.getName()); + if (handshake == null) { + return null; + } + List values = handshake.getParameterMap().get(EngineIOProtocol.SESSION_ID); + if (values == null || values.isEmpty()) { + return null; + } + return values.get(0); + } + + /** + * Initializes socket.io session + * @param session websocket session + */ + private void setupSession(javax.websocket.Session session) + { + String sessionId = getSessionId(session); + com.codeminders.socketio.server.Session sess = null; + if (sessionId != null) { + sess = SocketIOManager.getInstance().getSession(sessionId); + } + if (sess == null) { + sess = SocketIOManager.getInstance().createSession(); + } + setSession(sess); + } +} diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransportProvider.java b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransportProvider.java similarity index 70% rename from socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransportProvider.java rename to socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransportProvider.java index e3004dc..51073b3 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransportProvider.java +++ b/socket-io-servlet/src/main/java/com/codeminders/socketio/server/servlet/transport/websocket/WebsocketTransportProvider.java @@ -20,17 +20,25 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.codeminders.socketio.server.transport.websocket; +package com.codeminders.socketio.server.servlet.transport.websocket; import com.codeminders.socketio.server.Transport; -import com.codeminders.socketio.server.transport.AbstractTransportProvider; +import com.codeminders.socketio.server.servlet.transport.AbstractServletTransportProvider; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; /** * @author Alexander Sova (bird@codeminders.com) */ -public class WebsocketTransportProvider extends AbstractTransportProvider +public class WebsocketTransportProvider extends AbstractServletTransportProvider { - static final Transport websocket = new WebsocketTransport(); + private final Transport websocket; + + public WebsocketTransportProvider(ServletConfig servletConfig, ServletContext servletContext) { + super(servletConfig, servletContext); + this.websocket = new WebsocketTransport(servletConfig, servletContext); + } @Override protected Transport createWebSocketTransport() diff --git a/socket-io/pom.xml b/socket-io/pom.xml index 3b44eab..ed7962d 100644 --- a/socket-io/pom.xml +++ b/socket-io/pom.xml @@ -3,23 +3,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.7 - 1.7 - - - - com.codeminders.socketio socketio-parent - 1.0.9 + 2.0.0-SNAPSHOT socket-io @@ -39,12 +27,6 @@ jackson-databind 2.9.1 - - javax.websocket - javax.websocket-api - 1.1 - provided - junit junit diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/HttpRequest.java b/socket-io/src/main/java/com/codeminders/socketio/server/HttpRequest.java new file mode 100644 index 0000000..6ff798b --- /dev/null +++ b/socket-io/src/main/java/com/codeminders/socketio/server/HttpRequest.java @@ -0,0 +1,14 @@ +package com.codeminders.socketio.server; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; + +public interface HttpRequest { + String getMethod(); + String getHeader(String name); + String getContentType(); + String getParameter(String name); + InputStream getInputStream() throws IOException; + BufferedReader getReader() throws IOException; +} diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/HttpResponse.java b/socket-io/src/main/java/com/codeminders/socketio/server/HttpResponse.java new file mode 100644 index 0000000..fd5c2d2 --- /dev/null +++ b/socket-io/src/main/java/com/codeminders/socketio/server/HttpResponse.java @@ -0,0 +1,13 @@ +package com.codeminders.socketio.server; + +import java.io.IOException; +import java.io.OutputStream; + +public interface HttpResponse { + void setHeader(String name, String value); + void setContentType(String contentType); + OutputStream getOutputStream() throws IOException; + void sendError(int statusCode, String message) throws IOException; + void sendError(int statusCode) throws IOException; + void flushBuffer() throws IOException; +} diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/Session.java b/socket-io/src/main/java/com/codeminders/socketio/server/Session.java index 3f78fb0..c0659d4 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/Session.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/Session.java @@ -25,15 +25,24 @@ */ package com.codeminders.socketio.server; -import com.codeminders.socketio.common.SocketIOException; -import com.codeminders.socketio.protocol.*; import com.codeminders.socketio.common.ConnectionState; import com.codeminders.socketio.common.DisconnectReason; +import com.codeminders.socketio.common.SocketIOException; +import com.codeminders.socketio.protocol.ACKPacket; +import com.codeminders.socketio.protocol.BinaryPacket; +import com.codeminders.socketio.protocol.EngineIOPacket; +import com.codeminders.socketio.protocol.EngineIOProtocol; +import com.codeminders.socketio.protocol.EventPacket; +import com.codeminders.socketio.protocol.SocketIOPacket; +import com.codeminders.socketio.protocol.SocketIOProtocol; -import javax.servlet.http.HttpSession; import java.io.InputStream; -import java.util.*; -import java.util.concurrent.*; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -52,7 +61,6 @@ public class Session implements DisconnectListener private final SocketIOManager socketIOManager; private final String sessionId; - private final HttpSession httpSession; private final Map attributes = new ConcurrentHashMap<>(); private Map sockets = new LinkedHashMap<>(); // namespace, socket @@ -71,13 +79,12 @@ public class Session implements DisconnectListener private int packet_id = 0; // packet id. used for requesting ACK private Map ack_listeners = new LinkedHashMap<>(); // packetid, listener - Session(SocketIOManager socketIOManager, String sessionId, HttpSession httpSession) + Session(SocketIOManager socketIOManager, String sessionId) { assert (socketIOManager != null); this.socketIOManager = socketIOManager; this.sessionId = sessionId; - this.httpSession = httpSession; } public Socket createSocket(String ns) @@ -502,9 +509,4 @@ private void forcePollingCycle() LOGGER.log(Level.WARNING, "Cannot send NOOP packet while upgrading the transport", e); } } - - public HttpSession getHttpSession() - { - return httpSession; - } } diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/Socket.java b/socket-io/src/main/java/com/codeminders/socketio/server/Socket.java index 8702aac..b7b2307 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/Socket.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/Socket.java @@ -25,8 +25,10 @@ import com.codeminders.socketio.common.DisconnectReason; import com.codeminders.socketio.common.SocketIOException; -import javax.servlet.http.HttpServletRequest; -import java.util.*; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; /** * @author Alexander Sova (bird@codeminders.com) @@ -143,7 +145,7 @@ public String getId() /** * @return current HTTP request from underlying connection, null if socket is disconnected */ - public HttpServletRequest getRequest() + public HttpRequest getRequest() { TransportConnection connection = getSession().getConnection(); return connection == null ? null : connection.getRequest(); diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/SocketIOManager.java b/socket-io/src/main/java/com/codeminders/socketio/server/SocketIOManager.java index 9ec4487..b3c27a5 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/SocketIOManager.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/SocketIOManager.java @@ -25,9 +25,12 @@ */ package com.codeminders.socketio.server; -import javax.servlet.http.HttpSession; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; /** * Class to manage Socket.IO sessions and namespaces. @@ -79,24 +82,11 @@ private String generateSessionId() /** * Creates new session * - * @deprecated use {@link SocketIOManager#createSession(HttpSession)} * @return new session */ - @Deprecated public Session createSession() { - return createSession(null); - } - - /** - * Creates new session - * - * @param httpSession The HTTP session of the connecting client - * @return new session - */ - public Session createSession(HttpSession httpSession) - { - Session session = new Session(this, generateSessionId(), httpSession); + Session session = new Session(this, generateSessionId()); sessions.put(session.getSessionId(), session); return session; } diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/Transport.java b/socket-io/src/main/java/com/codeminders/socketio/server/Transport.java index 7ec2c56..00b13bf 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/Transport.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/Transport.java @@ -25,11 +25,8 @@ */ package com.codeminders.socketio.server; -import javax.servlet.ServletConfig; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import com.codeminders.socketio.common.SocketIOException; + import java.io.IOException; public interface Transport @@ -42,26 +39,24 @@ public interface Transport /** * Initializes the transport * - * @param config Servlet config - * @param context Servlet context - * @throws ServletException if init fails + * @throws SocketIOException if init fails */ - void init(ServletConfig config, ServletContext context) - throws ServletException; + void init() + throws SocketIOException; void destroy(); /** * Handles incoming HTTP request * - * @param request object that contains the request the client made of the servlet - * @param response object that contains the response the servlet returns to the client + * @param request object that contains the request the client made + * @param response object that contains the response that will be returned to the client * @param socketIOManager session manager * @throws IOException if an input or output error occurs while the servlet is handling the request */ - void handle(HttpServletRequest request, - HttpServletResponse response, - SocketIOManager socketIOManager) throws IOException; + TransportConnection handle(HttpRequest request, + HttpResponse response, + SocketIOManager socketIOManager) throws IOException; /** * Creates new connection diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/TransportConnection.java b/socket-io/src/main/java/com/codeminders/socketio/server/TransportConnection.java index f93d8a4..a5a6f72 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/TransportConnection.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/TransportConnection.java @@ -29,8 +29,6 @@ import com.codeminders.socketio.protocol.EngineIOPacket; import com.codeminders.socketio.protocol.SocketIOPacket; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import java.io.IOException; /** @@ -44,7 +42,7 @@ public interface TransportConnection Session getSession(); Transport getTransport(); - void handle(HttpServletRequest request, HttpServletResponse response) + void handle(HttpRequest request, HttpResponse response) throws IOException; /** @@ -72,5 +70,5 @@ void handle(HttpServletRequest request, HttpServletResponse response) /** * @return current HTTP request, null if connection is disconnected */ - HttpServletRequest getRequest(); + HttpRequest getRequest(); } diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/TransportProvider.java b/socket-io/src/main/java/com/codeminders/socketio/server/TransportProvider.java index 5068aa2..80cb1cb 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/TransportProvider.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/TransportProvider.java @@ -22,10 +22,8 @@ */ package com.codeminders.socketio.server; -import javax.servlet.ServletConfig; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.ServletRequest; +import com.codeminders.socketio.common.SocketIOException; + import java.util.Collection; /** @@ -38,24 +36,21 @@ public interface TransportProvider { /** * Creates all the transports * - * @param config servlet configuration - * @param context servlet context - * @throws ServletException if init failed + * @throws SocketIOException if init failed */ - void init(ServletConfig config, ServletContext context) - throws ServletException; + void init() throws SocketIOException; void destroy(); /** * Finds appropriate Transport class based on the rules defined at * https://github.com/socketio/engine.io-protocol#transports * - * @param request incoming servlet request + * @param request incoming HTTP request * @return appropriate Transport object * @throws UnsupportedTransportException no transport was found * @throws SocketIOProtocolException invalid request was sent */ - Transport getTransport(ServletRequest request) + Transport getTransport(HttpRequest request) throws UnsupportedTransportException, SocketIOProtocolException; Transport getTransport(TransportType type); diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransport.java b/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransport.java index fb4c67e..7ffb1e8 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransport.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransport.java @@ -1,62 +1,17 @@ -/** - * The MIT License - * Copyright (c) 2015 Alexander Sova (bird@codeminders.com) - *

- * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - *

- * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - *

- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ package com.codeminders.socketio.server.transport; import com.codeminders.socketio.protocol.EngineIOProtocol; -import com.codeminders.socketio.server.*; +import com.codeminders.socketio.server.Config; +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.Session; +import com.codeminders.socketio.server.SocketIOManager; +import com.codeminders.socketio.server.Transport; +import com.codeminders.socketio.server.TransportConnection; -import javax.servlet.ServletConfig; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpSession; +public abstract class AbstractTransport implements Transport { + protected abstract Config getConfig(); -/** - * @author Alexander Sova (bird@codeminders.com) - * @author Mathieu Carbou - */ -public abstract class AbstractTransport implements Transport -{ - private Config config; - - @Override - public void destroy() - { - } - - @Override - public void init(ServletConfig config, ServletContext context) - throws ServletException - { - this.config = new ServletBasedConfig(config, getType().toString()); - } - - protected final Config getConfig() - { - return config; - } - - protected final TransportConnection createConnection(Session session) + private final TransportConnection createConnection(Session session) { TransportConnection connection = createConnection(); connection.setSession(session); @@ -64,7 +19,7 @@ protected final TransportConnection createConnection(Session session) return connection; } - protected TransportConnection getConnection(HttpServletRequest request, SocketIOManager sessionManager) + protected TransportConnection getConnection(HttpRequest request, SocketIOManager sessionManager) { String sessionId = request.getParameter(EngineIOProtocol.SESSION_ID); Session session = null; @@ -73,7 +28,7 @@ protected TransportConnection getConnection(HttpServletRequest request, SocketIO session = sessionManager.getSession(sessionId); if(session == null) - return createConnection(sessionManager.createSession(request.getSession())); + return createConnection(sessionManager.createSession()); TransportConnection activeConnection = session.getConnection(); @@ -89,5 +44,4 @@ public String toString() { return getType().toString(); } - } diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportConnection.java b/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportConnection.java index 0dc861d..a07102b 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportConnection.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportConnection.java @@ -29,9 +29,14 @@ import com.codeminders.socketio.common.SocketIOException; import com.codeminders.socketio.protocol.SocketIOPacket; import com.codeminders.socketio.protocol.SocketIOProtocol; -import com.codeminders.socketio.server.*; +import com.codeminders.socketio.server.ACKListener; +import com.codeminders.socketio.server.Config; +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.Session; +import com.codeminders.socketio.server.SocketIOClosedException; +import com.codeminders.socketio.server.Transport; +import com.codeminders.socketio.server.TransportConnection; -import javax.servlet.http.HttpServletRequest; import java.util.Arrays; import java.util.logging.Logger; @@ -46,7 +51,7 @@ public abstract class AbstractTransportConnection implements TransportConnection private Config config; private Session session; private Transport transport; - private HttpServletRequest request; + private HttpRequest request; public AbstractTransportConnection(Transport transport) { @@ -126,12 +131,12 @@ public void emit(String namespace, String name, Object... args) } @Override - public HttpServletRequest getRequest() + public HttpRequest getRequest() { return request; } - public void setRequest(HttpServletRequest request) + public void setRequest(HttpRequest request) { this.request = request; } diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportProvider.java b/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportProvider.java index d4a53d0..c4b5a16 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportProvider.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/transport/AbstractTransportProvider.java @@ -1,37 +1,34 @@ package com.codeminders.socketio.server.transport; +import com.codeminders.socketio.common.SocketIOException; import com.codeminders.socketio.protocol.EngineIOProtocol; -import com.codeminders.socketio.server.*; - -import javax.servlet.ServletConfig; -import javax.servlet.ServletContext; -import javax.servlet.ServletException; -import javax.servlet.ServletRequest; -import java.util.*; -import java.util.logging.Logger; - -/** - * @author Alexander Sova (bird@codeminders.com) - */ -public abstract class AbstractTransportProvider implements TransportProvider { +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.SocketIOProtocolException; +import com.codeminders.socketio.server.Transport; +import com.codeminders.socketio.server.TransportProvider; +import com.codeminders.socketio.server.TransportType; +import com.codeminders.socketio.server.UnsupportedTransportException; - private static final Logger LOGGER = Logger.getLogger(AbstractTransportProvider.class.getName()); +import java.util.Collection; +import java.util.EnumMap; +import java.util.Map; - protected Map transports = new EnumMap<>(TransportType.class); +public abstract class AbstractTransportProvider implements TransportProvider { + private Map transports = new EnumMap<>(TransportType.class); /** * Creates and initializes all available transports */ @Override - public void init(ServletConfig config, ServletContext context) - throws ServletException + public void init() + throws SocketIOException { addIfNotNull(TransportType.XHR_POLLING, createXHRPollingTransport()); addIfNotNull(TransportType.JSONP_POLLING, createJSONPPollingTransport()); addIfNotNull(TransportType.WEB_SOCKET, createWebSocketTransport()); - for(Transport t : transports.values()) - t.init(config, context); + for (Transport t : transports.values()) + t.init(); } @Override @@ -42,7 +39,7 @@ public void destroy() } @Override - public Transport getTransport(ServletRequest request) + public Transport getTransport(HttpRequest request) throws UnsupportedTransportException, SocketIOProtocolException { String transportName = request.getParameter(EngineIOProtocol.TRANSPORT); @@ -80,20 +77,9 @@ public Collection getTransports() return transports.values(); } - protected Transport createXHRPollingTransport() - { - return new XHRPollingTransport(); - } - - protected Transport createJSONPPollingTransport() - { - return null; - } - - protected Transport createWebSocketTransport() - { - return null; - } + protected abstract Transport createXHRPollingTransport(); + protected abstract Transport createJSONPPollingTransport(); + protected abstract Transport createWebSocketTransport(); private void addIfNotNull(TransportType type, Transport transport) { diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/XHRTransportConnection.java b/socket-io/src/main/java/com/codeminders/socketio/server/transport/XHRTransportConnection.java index 214c156..8c6fb69 100644 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/XHRTransportConnection.java +++ b/socket-io/src/main/java/com/codeminders/socketio/server/transport/XHRTransportConnection.java @@ -5,16 +5,16 @@ import com.codeminders.socketio.protocol.EngineIOPacket; import com.codeminders.socketio.protocol.EngineIOProtocol; import com.codeminders.socketio.protocol.SocketIOPacket; +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.HttpResponse; import com.codeminders.socketio.server.SocketIOProtocolException; import com.codeminders.socketio.server.Transport; import com.google.common.io.CharStreams; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; - +import java.nio.charset.StandardCharsets; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -29,6 +29,8 @@ public class XHRTransportConnection extends AbstractTransportConnection private static final String ALLOWED_ORIGINS = "allowedOrigins"; private static final String ALLOW_ALL_ORIGINS = "allowAllOrigins"; + private static final int SC_METHOD_NOT_ALLOWED = 405; + private static final Logger LOGGER = Logger.getLogger(XHRTransportConnection.class.getName()); private BlockingQueue packets = new LinkedBlockingDeque<>(); @@ -41,7 +43,7 @@ public XHRTransportConnection(Transport transport) } @Override - public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException + public void handle(HttpRequest request, HttpResponse response) throws IOException { if(done) return; @@ -51,7 +53,12 @@ public void handle(HttpServletRequest request, HttpServletResponse response) thr if(getConfig().getBoolean(ALLOW_ALL_ORIGINS, false)) { - response.setHeader("Access-Control-Allow-Origin", request.getHeader("Origin")); + String origin = request.getHeader("Origin"); + if (origin == null) + { + origin = "*"; + } + response.setHeader("Access-Control-Allow-Origin", origin); response.setHeader("Access-Control-Allow-Credentials", "true"); } else @@ -88,7 +95,7 @@ public void handle(HttpServletRequest request, HttpServletResponse response) thr { throw new SocketIOProtocolException("Unsupported request content type for incoming polling request: " + contentType); } - response.getWriter().print("ok"); + response.getOutputStream().write("ok".getBytes(StandardCharsets.UTF_8)); } else if ("GET".equals(request.getMethod())) //outgoing { @@ -115,7 +122,7 @@ else if ("GET".equals(request.getMethod())) //outgoing else if(!"OPTIONS".equals(request.getMethod())) { // OPTIONS is CORS pre-flight request - response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED); + response.sendError(SC_METHOD_NOT_ALLOWED); } } diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/AbstractWebsocketTransportConnection.java b/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/AbstractWebsocketTransportConnection.java new file mode 100644 index 0000000..662c2b0 --- /dev/null +++ b/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/AbstractWebsocketTransportConnection.java @@ -0,0 +1,182 @@ +package com.codeminders.socketio.server.transport.websocket; + +import com.codeminders.socketio.common.ConnectionState; +import com.codeminders.socketio.common.DisconnectReason; +import com.codeminders.socketio.common.SocketIOException; +import com.codeminders.socketio.protocol.BinaryPacket; +import com.codeminders.socketio.protocol.EngineIOPacket; +import com.codeminders.socketio.protocol.EngineIOProtocol; +import com.codeminders.socketio.protocol.SocketIOPacket; +import com.codeminders.socketio.server.Config; +import com.codeminders.socketio.server.HttpRequest; +import com.codeminders.socketio.server.HttpResponse; +import com.codeminders.socketio.server.SocketIOProtocolException; +import com.codeminders.socketio.server.Transport; +import com.codeminders.socketio.server.transport.AbstractTransportConnection; +import com.google.common.io.ByteStreams; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.logging.Level; +import java.util.logging.Logger; + +public abstract class AbstractWebsocketTransportConnection extends AbstractTransportConnection { + private static final Logger LOGGER = Logger.getLogger(AbstractWebsocketTransportConnection.class.getName()); + + private static final int SC_BAD_REQUEST = 400; + + protected AbstractWebsocketTransportConnection(Transport transport) + { + super(transport); + } + + @Override + protected void init() + { + getSession().setTimeout(getConfig().getTimeout(Config.DEFAULT_PING_TIMEOUT)); + + if (LOGGER.isLoggable(Level.FINE)) + LOGGER.fine(getConfig().getNamespace() + " WebSocket configuration:" + + " timeout=" + getSession().getTimeout()); + } + + @Override + public void handle(HttpRequest request, HttpResponse response) throws IOException + { + response.sendError(SC_BAD_REQUEST, "Unexpected request on upgraded WebSocket connection"); + } + + @Override + public void abort() + { + getSession().clearTimeout(); + } + + @Override + public void send(EngineIOPacket packet) throws SocketIOException + { + sendString(EngineIOProtocol.encode(packet)); + } + + @Override + public void send(SocketIOPacket packet) throws SocketIOException + { + send(EngineIOProtocol.createMessagePacket(packet.encode())); + if(packet instanceof BinaryPacket) + { + Collection attachments = ((BinaryPacket) packet).getAttachments(); + for (InputStream is : attachments) + { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + try + { + os.write(EngineIOPacket.Type.MESSAGE.value()); + ByteStreams.copy(is, os); + } + catch (IOException e) + { + if(LOGGER.isLoggable(Level.WARNING)) + LOGGER.log(Level.SEVERE, "Cannot load binary object to send it to the socket", e); + } + sendBinary(os.toByteArray()); + } + } + } + + protected boolean sendHandshake() { + if(getSession().getConnectionState() == ConnectionState.CONNECTING) + { + try + { + send(EngineIOProtocol.createHandshakePacket(getSession().getSessionId(), + new String[]{}, + getConfig().getPingInterval(Config.DEFAULT_PING_INTERVAL), + getConfig().getTimeout(Config.DEFAULT_PING_TIMEOUT))); + + getSession().onConnect(this); + } + catch (SocketIOException e) + { + LOGGER.log(Level.SEVERE, "Cannot connect", e); + getSession().setDisconnectReason(DisconnectReason.CONNECT_FAILED); + return false; + } + } + + return true; + } + + protected boolean handleTextFrame(String text) { + if (LOGGER.isLoggable(Level.FINE)) + LOGGER.fine("Session[" + getSession().getSessionId() + "]: text received: " + text); + + getSession().resetTimeout(); + + try + { + getSession().onPacket(EngineIOProtocol.decode(text), this); + } + catch (SocketIOProtocolException e) + { + if(LOGGER.isLoggable(Level.WARNING)) + LOGGER.log(Level.WARNING, "Invalid packet received", e); + return false; + } + + return true; + } + + protected boolean handleBinaryFrame(InputStream is) { + if (LOGGER.isLoggable(Level.FINE)) + LOGGER.fine("Session[" + getSession().getSessionId() + "]: binary received"); + + getSession().resetTimeout(); + + try + { + getSession().onPacket(EngineIOProtocol.decode(is), this); + } + catch (SocketIOProtocolException e) + { + if(LOGGER.isLoggable(Level.WARNING)) + LOGGER.log(Level.WARNING, "Problem processing binary received", e); + return false; + } + + return true; + } + + protected void handleConnectionClosed(int closeCode, String closeReasonPhrase) { + if(LOGGER.isLoggable(Level.FINE)) + LOGGER.log(Level.FINE, "Session[" + getSession().getSessionId() + "]:" + + " websocket closed. (" + closeCode + "): " + closeReasonPhrase); + + //If close is unexpected then try to guess the reason based on closeCode, otherwise the reason is already set + if(getSession().getConnectionState() != ConnectionState.CLOSING) + getSession().setDisconnectReason(fromCloseCode(closeCode)); + + getSession().setDisconnectMessage(closeReasonPhrase); + getSession().onShutdown(); + } + + /** + * @link https://tools.ietf.org/html/rfc6455#section-11.7 + */ + protected DisconnectReason fromCloseCode(int code) + { + switch (code) { + case 1000: + return DisconnectReason.CLOSED; // Normal Closure + case 1001: + return DisconnectReason.CLIENT_GONE; // Going Away + default: + return DisconnectReason.ERROR; + } + } + + protected abstract void sendString(String data) throws SocketIOException; + + protected abstract void sendBinary(byte[] data) throws SocketIOException; +} diff --git a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransportConnection.java b/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransportConnection.java deleted file mode 100644 index 6e3e37a..0000000 --- a/socket-io/src/main/java/com/codeminders/socketio/server/transport/websocket/WebsocketTransportConnection.java +++ /dev/null @@ -1,346 +0,0 @@ -/** - * The MIT License - * Copyright (c) 2015 Alexander Sova (bird@codeminders.com) - *

- * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - *

- * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - *

- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package com.codeminders.socketio.server.transport.websocket; - -import com.codeminders.socketio.common.ConnectionState; -import com.codeminders.socketio.common.DisconnectReason; -import com.codeminders.socketio.common.SocketIOException; -import com.codeminders.socketio.protocol.BinaryPacket; -import com.codeminders.socketio.protocol.EngineIOPacket; -import com.codeminders.socketio.protocol.EngineIOProtocol; -import com.codeminders.socketio.protocol.SocketIOPacket; -import com.codeminders.socketio.server.*; -import com.codeminders.socketio.server.transport.AbstractTransportConnection; -import com.google.common.io.ByteStreams; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.servlet.http.HttpSession; -import javax.websocket.*; -import javax.websocket.Session; -import javax.websocket.server.HandshakeRequest; -import javax.websocket.server.ServerEndpoint; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * @author Alexander Sova (bird@codeminders.com) - * @author Alex Saveliev (lyolik@codeminders.com) - */ -@ServerEndpoint(value="/socket.io/", configurator = WebsocketConfigurator.class) -public final class WebsocketTransportConnection extends AbstractTransportConnection -{ - private static final Logger LOGGER = Logger.getLogger(WebsocketTransportConnection.class.getName()); - - private static Class websocketIOClass = WebsocketIO.class; - - private WebsocketIO websocketIO; - - public WebsocketTransportConnection() { - super(WebsocketTransportProvider.websocket); - } - - public WebsocketTransportConnection(Transport transport) - { - super(transport); - } - - /** - * - * @param clazz class responsible for I/O operations - */ - public static void setWebsocketIOClass(Class clazz) { - WebsocketTransportConnection.websocketIOClass = clazz; - } - - @Override - protected void init() - { - getSession().setTimeout(getConfig().getTimeout(Config.DEFAULT_PING_TIMEOUT)); - - if (LOGGER.isLoggable(Level.FINE)) - LOGGER.fine(getConfig().getNamespace() + " WebSocket configuration:" + - " timeout=" + getSession().getTimeout()); - } - - @OnOpen - public void onOpen(javax.websocket.Session session, EndpointConfig config) throws Exception - { - setupIO(session); - setupSession(session); - init(new ServletBasedConfig( - ServletConfigHolder.getInstance().getConfig(), - getTransport().getType().toString())); - session.setMaxBinaryMessageBufferSize(getConfig().getBufferSize()); - session.setMaxIdleTimeout(getConfig().getMaxIdle()); - session.setMaxTextMessageBufferSize(getConfig().getInt(Config.MAX_TEXT_MESSAGE_SIZE, 32000)); - - if(getSession().getConnectionState() == ConnectionState.CONNECTING) - { - try - { - send(EngineIOProtocol.createHandshakePacket(getSession().getSessionId(), - new String[]{}, - getConfig().getPingInterval(Config.DEFAULT_PING_INTERVAL), - getConfig().getTimeout(Config.DEFAULT_PING_TIMEOUT))); - - getSession().onConnect(this); - } - catch (SocketIOException e) - { - LOGGER.log(Level.SEVERE, "Cannot connect", e); - getSession().setDisconnectReason(DisconnectReason.CONNECT_FAILED); - abort(); - } - } - } - - private void setupIO(Session session) throws Exception { - websocketIO = websocketIOClass.getConstructor(Session.class).newInstance(session); - } - - @OnClose - public void onClose(javax.websocket.Session session, CloseReason closeReason) - { - if(LOGGER.isLoggable(Level.FINE)) - LOGGER.log(Level.FINE, "Session[" + getSession().getSessionId() + "]:" + - " websocket closed. " + closeReason.toString()); - - //If close is unexpected then try to guess the reason based on closeCode, otherwise the reason is already set - if(getSession().getConnectionState() != ConnectionState.CLOSING) - getSession().setDisconnectReason(fromCloseCode(closeReason.getCloseCode().getCode())); - - getSession().setDisconnectMessage(closeReason.getReasonPhrase()); - getSession().onShutdown(); - } - - @OnMessage - public void onMessage(String text) - { - if (LOGGER.isLoggable(Level.FINE)) - LOGGER.fine("Session[" + getSession().getSessionId() + "]: text received: " + text); - - getSession().resetTimeout(); - - try - { - getSession().onPacket(EngineIOProtocol.decode(text), this); - } - catch (SocketIOProtocolException e) - { - if(LOGGER.isLoggable(Level.WARNING)) - LOGGER.log(Level.WARNING, "Invalid packet received", e); - } - } - - @OnMessage - public void onMessage(InputStream is) - { - if (LOGGER.isLoggable(Level.FINE)) - LOGGER.fine("Session[" + getSession().getSessionId() + "]: binary received"); - - getSession().resetTimeout(); - - try - { - getSession().onPacket(EngineIOProtocol.decode(is), this); - } - catch (SocketIOProtocolException e) - { - if(LOGGER.isLoggable(Level.WARNING)) - LOGGER.log(Level.WARNING, "Problem processing binary received", e); - } - } - - @OnError - public void onError(javax.websocket.Session session, Throwable error) { - // TODO implement - // One reason might be when you are refreshing web page causing connection to be dropped - } - - @Override - public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException - { - response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Unexpected request on upgraded WebSocket connection"); - } - - @Override - public void abort() - { - getSession().clearTimeout(); - if (websocketIO != null) - { - disconnectEndpoint(); - websocketIO = null; - } - } - - @Override - public void send(EngineIOPacket packet) throws SocketIOException - { - sendString(EngineIOProtocol.encode(packet)); - } - - @Override - public void send(SocketIOPacket packet) throws SocketIOException - { - send(EngineIOProtocol.createMessagePacket(packet.encode())); - if(packet instanceof BinaryPacket) - { - Collection attachments = ((BinaryPacket) packet).getAttachments(); - for (InputStream is : attachments) - { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - try - { - os.write(EngineIOPacket.Type.MESSAGE.value()); - ByteStreams.copy(is, os); - } - catch (IOException e) - { - if(LOGGER.isLoggable(Level.WARNING)) - LOGGER.log(Level.SEVERE, "Cannot load binary object to send it to the socket", e); - } - sendBinary(os.toByteArray()); - } - } - } - - protected void sendString(String data) throws SocketIOException - { - if (LOGGER.isLoggable(Level.FINE)) - LOGGER.log(Level.FINE, "Session[" + getSession().getSessionId() + "]: send text: " + data); - - try - { - websocketIO.sendString(data); - } - catch (IOException e) - { - disconnectEndpoint(); - throw new SocketIOException(e); - } - } - - //TODO: implement streaming. right now it is all in memory. - //TODO: read and send in chunks using sendPartialBytes() - protected void sendBinary(byte[] data) throws SocketIOException - { - if (LOGGER.isLoggable(Level.FINE)) - LOGGER.log(Level.FINE, "Session[" + getSession().getSessionId() + "]: send binary"); - - try - { - websocketIO.sendBinary(data); - } - catch (IOException e) - { - disconnectEndpoint(); - throw new SocketIOException(e); - } - } - - private void disconnectEndpoint() - { - try - { - websocketIO.disconnect(); - } - catch (IOException ex) - { - // ignore - } - } - - /** - * @link https://tools.ietf.org/html/rfc6455#section-11.7 - */ - private DisconnectReason fromCloseCode(int code) - { - switch (code) { - case 1000: - return DisconnectReason.CLOSED; // Normal Closure - case 1001: - return DisconnectReason.CLIENT_GONE; // Going Away - default: - return DisconnectReason.ERROR; - } - } - - /** - * @param session websocket session - * @return session id extracted from handshake request's parameter - */ - private String getSessionId(javax.websocket.Session session) - { - HandshakeRequest handshake = (HandshakeRequest) - session.getUserProperties().get(HandshakeRequest.class.getName()); - if (handshake == null) { - return null; - } - List values = handshake.getParameterMap().get(EngineIOProtocol.SESSION_ID); - if (values == null || values.isEmpty()) { - return null; - } - return values.get(0); - } - - private HttpSession getHttpSession(javax.websocket.Session session) - { - HandshakeRequest handshake = (HandshakeRequest) - session.getUserProperties().get(HandshakeRequest.class.getName()); - if (handshake == null) - { - return null; - } - if (!(handshake.getHttpSession() instanceof HttpSession)) - { - return null; - } - return (HttpSession) handshake.getHttpSession(); - } - - /** - * Initializes socket.io session - * @param session - * @throws Exception - */ - private void setupSession(javax.websocket.Session session) throws Exception - { - String sessionId = getSessionId(session); - com.codeminders.socketio.server.Session sess = null; - if (sessionId != null) { - sess = SocketIOManager.getInstance().getSession(sessionId); - } - if (sess == null) { - HttpSession httpSession = getHttpSession(session); - sess = SocketIOManager.getInstance().createSession(httpSession); - } - setSession(sess); - } -}