Skip to content

Commit

Permalink
Make a StreamingStacClient to make G is independent from F; finish se…
Browse files Browse the repository at this point in the history
…arch pagination
  • Loading branch information
pomadchin committed May 20, 2021
1 parent 5a4fbe1 commit ccbd9ed
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 34 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ lazy val client = crossProject(JSPlatform, JVMPlatform)
.settings(publishSettings)
.settings(
libraryDependencies ++= Seq(
"com.github.julien-truffaut" %%% "monocle-core" % Versions.Monocle,
"com.github.julien-truffaut" %%% "monocle-macro" % Versions.Monocle,
"io.circe" %%% "circe-core" % Versions.Circe,
"io.circe" %%% "circe-generic" % Versions.Circe,
"io.circe" %%% "circe-refined" % Versions.Circe,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.azavea.stac4s.jsTypes.TemporalExtent
import eu.timepit.refined.types.numeric.NonNegInt
import io.circe._
import io.circe.refined._
import monocle.Lens
import monocle.macros.GenLens

case class SearchFilters(
bbox: Option[Bbox] = None,
Expand All @@ -21,6 +23,7 @@ case class SearchFilters(
)

object SearchFilters extends ClientCodecs {
implicit val paginationTokenLens: Lens[SearchFilters, Option[PaginationToken]] = GenLens[SearchFilters](_.next)

implicit val searchFiltersDecoder: Decoder[SearchFilters] = { c =>
for {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.azavea.stac4s.api

package object client {
type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters]
type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters]
type StacClient[F[_]] = StacClientF[F, SearchFilters]
type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters]
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import eu.timepit.refined.types.numeric.NonNegInt
import geotrellis.vector.{io => _, _}
import io.circe._
import io.circe.refined._
import monocle.Lens
import monocle.macros.GenLens

case class SearchFilters(
bbox: Option[Bbox] = None,
Expand All @@ -21,6 +23,7 @@ case class SearchFilters(
)

object SearchFilters extends ClientCodecs {
implicit val paginationTokenLens: Lens[SearchFilters, Option[PaginationToken]] = GenLens[SearchFilters](_.next)

implicit val searchFiltersDecoder: Decoder[SearchFilters] = { c =>
for {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.azavea.stac4s.api

package object client {
type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters]
type StacClient[F[_]] = StacClientF[F, SearchFilters]
type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@ import cats.MonadThrow
import cats.syntax.apply._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.option._
import eu.timepit.refined.types.string.NonEmptyString
import fs2.Stream
import io.circe
import io.circe.syntax._
import io.circe.{Encoder, Json}
import io.circe.{Encoder, Error, Json, JsonObject}
import monocle.Lens
import sttp.client3.circe.asJson
import sttp.client3.{ResponseException, SttpBackend, UriContext, basicRequest}
import sttp.model.Uri

case class SttpStacClientF[F[_]: MonadThrow, S: Encoder](
case class SttpStacClientF[F[_]: MonadThrow, S: Lens[*, Option[PaginationToken]]: Encoder](
client: SttpBackend[F, Any],
baseUri: Uri
) extends StreamingStacClientF[F, Stream[F, *], S] {
private val paginationTokenLens = implicitly[Lens[S, Option[PaginationToken]]]

/** Get the next page [[Uri]] from the received [[Json]] body. */
private def getNextLink(body: Either[ResponseException[String, circe.Error], Json]): F[Option[Uri]] =
/** Get the next page [[Uri]] from the retrieved [[Json]] body. */
private def getNextLink(body: Either[ResponseException[String, Error], Json]): F[Option[Uri]] =
body
.flatMap {
_.hcursor
Expand All @@ -34,22 +36,27 @@ case class SttpStacClientF[F[_]: MonadThrow, S: Encoder](

def search: Stream[F, StacItem] = search(None)

def search(filter: S): Stream[F, StacItem] = search(filter.asJson.some)
def search(filter: S): Stream[F, StacItem] = search(filter.some)

private def search(filter: Option[Json]): Stream[F, StacItem] =
private def search(filter: Option[S]): Stream[F, StacItem] = {
val emptyJson = JsonObject.empty.asJson
// the initial filter may contain the paginationToken that is used for the initial query
val initialBody = filter.map(_.asJson).getOrElse(emptyJson)
// the same filter would be used as a body for all pagination requests
val noPaginationBody = filter.map(paginationTokenLens.set(None)(_).asJson).getOrElse(emptyJson)
Stream
.unfoldLoopEval(baseUri.withPath("search")) { link =>
.unfoldLoopEval((baseUri.withPath("search"), initialBody)) { case (link, request) =>
client
.send(filter.fold(basicRequest)(f => basicRequest.body(f.asJson.noSpaces)).post(link).response(asJson[Json]))
.send(basicRequest.body(request.noSpaces).post(link).response(asJson[Json]))
.flatMap { response =>
val body = response.body
val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F]
val nextLink = getNextLink(body)

(items, nextLink).tupled
val body = response.body
val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F]
val next = getNextLink(body).map(_.map(_ -> noPaginationBody))
(items, next).tupled
}
}
.flatMap(Stream.emits)
}

def collections: Stream[F, StacCollection] =
Stream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.azavea.stac4s

import com.azavea.stac4s.testing.TestInstances

import cats.kernel.laws.discipline.SemigroupTests
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.must.Matchers
import org.scalatestplus.scalacheck.Checkers
import org.typelevel.discipline.scalatest.FunSuiteDiscipline

import cats.kernel.laws.discipline.SemigroupTests
import com.azavea.stac4s.testing.TestInstances

class JsFPLawsSpec extends AnyFunSuite with FunSuiteDiscipline with Checkers with Matchers with TestInstances {
checkAll("Semigroup.Bbox", SemigroupTests[Bbox].semigroup)
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.azavea.stac4s

import com.azavea.stac4s.testing.TestInstances

import cats.kernel.laws.discipline.SemigroupTests
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.must.Matchers
import org.scalatestplus.scalacheck.Checkers
import org.typelevel.discipline.scalatest.FunSuiteDiscipline

import cats.kernel.laws.discipline.SemigroupTests
import com.azavea.stac4s.testing.TestInstances

class JvmFPLawsSpec extends AnyFunSuite with FunSuiteDiscipline with Checkers with Matchers with TestInstances {
checkAll("Semigroup.Bbox", SemigroupTests[Bbox].semigroup)
}

0 comments on commit ccbd9ed

Please sign in to comment.