From d0305e752ffda1ce9be79f92ccbad238ea9a1ee9 Mon Sep 17 00:00:00 2001 From: Tamer Mohammed Abdul-Radi Date: Sat, 11 Oct 2014 23:45:53 +0200 Subject: [PATCH] Updates akka stream/http to version 0.9 --- akka-http-example/src/main/scala/HttpClient.scala | 4 ++-- akka-http-example/src/main/scala/Main.scala | 3 ++- .../src/main/scala/StreamClientPublisher.scala | 3 ++- project/Build.scala | 6 +++--- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/akka-http-example/src/main/scala/HttpClient.scala b/akka-http-example/src/main/scala/HttpClient.scala index 63fee60..fb58657 100644 --- a/akka-http-example/src/main/scala/HttpClient.scala +++ b/akka-http-example/src/main/scala/HttpClient.scala @@ -25,8 +25,8 @@ object HttpClient { val connFuture = IO(Http).ask(Http.Connect("127.0.0.1", port)).mapTo[Http.OutgoingConnection] val connection = await(connFuture) val request = HttpRequest(HttpMethods.GET, Uri(path)) - Flow(List(request -> 'NoContext)).produceTo(connection.processor) - await(Flow(connection.processor).map(_._1).toFuture) + Flow(List(request -> 'NoContext)).produceTo(connection.requestSubscriber) + await(Flow(connection.responsePublisher).map(_._1).toFuture) } } } \ No newline at end of file diff --git a/akka-http-example/src/main/scala/Main.scala b/akka-http-example/src/main/scala/Main.scala index 75616e4..ece8c86 100644 --- a/akka-http-example/src/main/scala/Main.scala +++ b/akka-http-example/src/main/scala/Main.scala @@ -5,6 +5,7 @@ import akka.util.ByteString import akka.actor.{ActorSystem,ActorRefFactory} import akka.stream.actor.{ActorPublisher} import akka.stream.{MaterializerSettings,FlowMaterializer} +import akka.stream.scaladsl2.Source import akka.http.model._ import com.typesafe.config._ import org.reactivestreams.{Publisher,Subscriber} @@ -36,7 +37,7 @@ object MainFunctions { HttpServer.bindServer(port) { case HttpRequest(GET, Uri.Path("/"), _, _, _) => HttpResponse ( - entity = new Chunked(MediaTypes.`text/plain`, publisher) + entity = new Chunked(MediaTypes.`text/plain`, Source(publisher)) ) case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!") } diff --git a/akka-http-example/src/main/scala/StreamClientPublisher.scala b/akka-http-example/src/main/scala/StreamClientPublisher.scala index 6370ae1..f33bb0a 100644 --- a/akka-http-example/src/main/scala/StreamClientPublisher.scala +++ b/akka-http-example/src/main/scala/StreamClientPublisher.scala @@ -8,6 +8,7 @@ import scala.async.Async.{async, await} import akka.util.{ByteString} import akka.actor.{ActorSystem,ActorLogging,Props} import akka.stream.{FlowMaterializer,MaterializerSettings} +import akka.stream.scaladsl2.Sink import akka.stream.actor._ import akka.http.model.HttpEntity import org.reactivestreams.Publisher @@ -23,7 +24,7 @@ object StreamClientPublisher { val processor = system.actorOf(Props[DataChunkProcessor]) val processorSubscriber = ActorSubscriber[ByteString](processor) val processorPublisher = ActorPublisher[ChunkStreamPart](processor) - response.entity.dataBytes(materializer).subscribe(processorSubscriber) + response.entity.dataBytes.connect(Sink(processorSubscriber)) processorPublisher } Await.result(publisherFuture, 1.seconds) diff --git a/project/Build.scala b/project/Build.scala index 3ede029..0c39c7e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -25,8 +25,8 @@ object Dependencies { val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion - val akkaHttp = "com.typesafe.akka" %% "akka-http-core-experimental" % "0.7" - val akkaStream ="com.typesafe.akka" %% "akka-stream-experimental" % "0.7" + val akkaHttp = "com.typesafe.akka" %% "akka-http-core-experimental" % "0.9" + val akkaStream ="com.typesafe.akka" %% "akka-stream-experimental" % "0.9" val playJson = "com.typesafe.play" %% "play-json" % "2.3.2" val scalatest = "org.scalatest" %% "scalatest" % "2.2.0" % "test" @@ -58,4 +58,4 @@ object AkkaHttpStreamExample extends Build { .settings ( resolvers ++= Seq(typesafeRepo) ) .settings ( libraryDependencies ++= Dependencies.allDependencies ) .settings ( scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature") ) -} \ No newline at end of file +}