Skip to content

Commit

Permalink
Make a StreamingStacClient to make G is independent from F; finish se…
Browse files Browse the repository at this point in the history
…arch pagination
  • Loading branch information
pomadchin committed May 20, 2021
1 parent 5a4fbe1 commit c4d8a6e
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.azavea.stac4s.jsTypes.TemporalExtent
import eu.timepit.refined.types.numeric.NonNegInt
import io.circe._
import io.circe.refined._
import monocle.Lens
import monocle.macros.GenLens

case class SearchFilters(
bbox: Option[Bbox] = None,
Expand All @@ -21,6 +23,7 @@ case class SearchFilters(
)

object SearchFilters extends ClientCodecs {
implicit val paginationTokenLens: Lens[SearchFilters, Option[PaginationToken]] = GenLens[SearchFilters](_.next)

implicit val searchFiltersDecoder: Decoder[SearchFilters] = { c =>
for {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.azavea.stac4s.api

package object client {
type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters]
type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters]
type StacClient[F[_]] = StacClientF[F, SearchFilters]
type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters]
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import eu.timepit.refined.types.numeric.NonNegInt
import geotrellis.vector.{io => _, _}
import io.circe._
import io.circe.refined._
import monocle.Lens
import monocle.macros.GenLens

case class SearchFilters(
bbox: Option[Bbox] = None,
Expand All @@ -21,6 +23,7 @@ case class SearchFilters(
)

object SearchFilters extends ClientCodecs {
implicit val paginationTokenLens: Lens[SearchFilters, Option[PaginationToken]] = GenLens[SearchFilters](_.next)

implicit val searchFiltersDecoder: Decoder[SearchFilters] = { c =>
for {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.azavea.stac4s.api

package object client {
type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters]
type StacClient[F[_]] = StacClientF[F, SearchFilters]
type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@ import cats.MonadThrow
import cats.syntax.apply._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.option._
import eu.timepit.refined.types.string.NonEmptyString
import fs2.Stream
import io.circe
import io.circe.syntax._
import io.circe.{Encoder, Json}
import io.circe.{Encoder, Error, Json, JsonObject}
import monocle.Lens
import sttp.client3.circe.asJson
import sttp.client3.{ResponseException, SttpBackend, UriContext, basicRequest}
import sttp.model.Uri

case class SttpStacClientF[F[_]: MonadThrow, S: Encoder](
case class SttpStacClientF[F[_]: MonadThrow, S: Lens[*, Option[PaginationToken]]: Encoder](
client: SttpBackend[F, Any],
baseUri: Uri
) extends StreamingStacClientF[F, Stream[F, *], S] {
private val paginationTokenLens = implicitly[Lens[S, Option[PaginationToken]]]

/** Get the next page [[Uri]] from the received [[Json]] body. */
private def getNextLink(body: Either[ResponseException[String, circe.Error], Json]): F[Option[Uri]] =
/** Get the next page [[Uri]] from the retrieved [[Json]] body. */
private def getNextLink(body: Either[ResponseException[String, Error], Json]): F[Option[Uri]] =
body
.flatMap {
_.hcursor
Expand All @@ -34,22 +36,27 @@ case class SttpStacClientF[F[_]: MonadThrow, S: Encoder](

def search: Stream[F, StacItem] = search(None)

def search(filter: S): Stream[F, StacItem] = search(filter.asJson.some)
def search(filter: S): Stream[F, StacItem] = search(filter.some)

private def search(filter: Option[Json]): Stream[F, StacItem] =
private def search(filter: Option[S]): Stream[F, StacItem] = {
val emptyJson = JsonObject.empty.asJson
// the initial filter may contain the paginationToken that is used for the initial query
val initialBody = filter.map(_.asJson).getOrElse(emptyJson)
// the same filter would be used as a body for all pagination requests
val noPaginationBody = filter.map(paginationTokenLens.set(None)(_).asJson).getOrElse(emptyJson)
Stream
.unfoldLoopEval(baseUri.withPath("search")) { link =>
.unfoldLoopEval((baseUri.withPath("search"), initialBody)) { case (link, request) =>
client
.send(filter.fold(basicRequest)(f => basicRequest.body(f.asJson.noSpaces)).post(link).response(asJson[Json]))
.send(basicRequest.body(request.noSpaces).post(link).response(asJson[Json]))
.flatMap { response =>
val body = response.body
val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F]
val nextLink = getNextLink(body)

(items, nextLink).tupled
val body = response.body
val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F]
val next = getNextLink(body).map(_.map(_ -> noPaginationBody))
(items, next).tupled
}
}
.flatMap(Stream.emits)
}

def collections: Stream[F, StacCollection] =
Stream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.azavea.stac4s

import com.azavea.stac4s.testing.TestInstances

import cats.kernel.laws.discipline.SemigroupTests
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.must.Matchers
import org.scalatestplus.scalacheck.Checkers
import org.typelevel.discipline.scalatest.FunSuiteDiscipline

import cats.kernel.laws.discipline.SemigroupTests
import com.azavea.stac4s.testing.TestInstances

class JsFPLawsSpec extends AnyFunSuite with FunSuiteDiscipline with Checkers with Matchers with TestInstances {
checkAll("Semigroup.Bbox", SemigroupTests[Bbox].semigroup)
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.azavea.stac4s

import com.azavea.stac4s.testing.TestInstances

import cats.kernel.laws.discipline.SemigroupTests
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.must.Matchers
import org.scalatestplus.scalacheck.Checkers
import org.typelevel.discipline.scalatest.FunSuiteDiscipline

import cats.kernel.laws.discipline.SemigroupTests
import com.azavea.stac4s.testing.TestInstances

class JvmFPLawsSpec extends AnyFunSuite with FunSuiteDiscipline with Checkers with Matchers with TestInstances {
checkAll("Semigroup.Bbox", SemigroupTests[Bbox].semigroup)
}

0 comments on commit c4d8a6e

Please sign in to comment.