Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions akka-http-example/src/main/scala/HttpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ object HttpClient {
val connFuture = IO(Http).ask(Http.Connect("127.0.0.1", port)).mapTo[Http.OutgoingConnection]
val connection = await(connFuture)
val request = HttpRequest(HttpMethods.GET, Uri(path))
Flow(List(request -> 'NoContext)).produceTo(connection.processor)
await(Flow(connection.processor).map(_._1).toFuture)
Flow(List(request -> 'NoContext)).produceTo(connection.requestSubscriber)
await(Flow(connection.responsePublisher).map(_._1).toFuture)
}
}
}
3 changes: 2 additions & 1 deletion akka-http-example/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.util.ByteString
import akka.actor.{ActorSystem,ActorRefFactory}
import akka.stream.actor.{ActorPublisher}
import akka.stream.{MaterializerSettings,FlowMaterializer}
import akka.stream.scaladsl2.Source
import akka.http.model._
import com.typesafe.config._
import org.reactivestreams.{Publisher,Subscriber}
Expand Down Expand Up @@ -36,7 +37,7 @@ object MainFunctions {
HttpServer.bindServer(port) {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse (
entity = new Chunked(MediaTypes.`text/plain`, publisher)
entity = new Chunked(MediaTypes.`text/plain`, Source(publisher))
)
case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
}
Expand Down
3 changes: 2 additions & 1 deletion akka-http-example/src/main/scala/StreamClientPublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import scala.async.Async.{async, await}
import akka.util.{ByteString}
import akka.actor.{ActorSystem,ActorLogging,Props}
import akka.stream.{FlowMaterializer,MaterializerSettings}
import akka.stream.scaladsl2.Sink
import akka.stream.actor._
import akka.http.model.HttpEntity
import org.reactivestreams.Publisher
Expand All @@ -23,7 +24,7 @@ object StreamClientPublisher {
val processor = system.actorOf(Props[DataChunkProcessor])
val processorSubscriber = ActorSubscriber[ByteString](processor)
val processorPublisher = ActorPublisher[ChunkStreamPart](processor)
response.entity.dataBytes(materializer).subscribe(processorSubscriber)
response.entity.dataBytes.connect(Sink(processorSubscriber))
processorPublisher
}
Await.result(publisherFuture, 1.seconds)
Expand Down
6 changes: 3 additions & 3 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ object Dependencies {
val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion

val akkaHttp = "com.typesafe.akka" %% "akka-http-core-experimental" % "0.7"
val akkaStream ="com.typesafe.akka" %% "akka-stream-experimental" % "0.7"
val akkaHttp = "com.typesafe.akka" %% "akka-http-core-experimental" % "0.9"
val akkaStream ="com.typesafe.akka" %% "akka-stream-experimental" % "0.9"

val playJson = "com.typesafe.play" %% "play-json" % "2.3.2"
val scalatest = "org.scalatest" %% "scalatest" % "2.2.0" % "test"
Expand Down Expand Up @@ -58,4 +58,4 @@ object AkkaHttpStreamExample extends Build {
.settings ( resolvers ++= Seq(typesafeRepo) )
.settings ( libraryDependencies ++= Dependencies.allDependencies )
.settings ( scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature") )
}
}