From 8dff236134eaab95b869708c60dbd9283ee70634 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Wed, 19 May 2021 21:33:56 -0400 Subject: [PATCH 1/5] StacClient supports pagination and has fs2.Streams in the interface --- CHANGELOG.md | 1 + build.sbt | 30 ++---- .../stac4s/api/client/SttpStacClient.scala | 4 +- .../stac4s/api/client/SttpStacClient.scala | 4 +- .../stac4s/api/client/StacClientF.scala | 9 +- .../stac4s/api/client/SttpStacClientF.scala | 102 ++++++++++++------ .../api/client/SttpEitherInstances.scala | 33 ++++++ .../api/client/SttpStacClientFSpec.scala | 13 ++- project/Versions.scala | 29 +++-- 9 files changed, 143 insertions(+), 82 deletions(-) create mode 100644 modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpEitherInstances.scala diff --git a/CHANGELOG.md b/CHANGELOG.md index bc81555f..d3dad47d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] ### Added - Add Scala 2.13 cross compilation [#310](https://github.com/azavea/stac4s/pull/310) +- STAC Client pagination support [#327](https://github.com/azavea/stac4s/pull/327) ### Changed - Make StacClient type alias a trait [#325](https://github.com/azavea/stac4s/pull/325) diff --git a/build.sbt b/build.sbt index 31490734..a86c6c8e 100644 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,4 @@ import xerial.sbt.Sonatype._ -import Dependencies._ lazy val commonSettings = Seq( // We are overriding the default behavior of sbt-git which, by default, only @@ -104,22 +103,20 @@ lazy val credentialSettings = Seq( val jvmGeometryDependencies = Def.setting { Seq( - "org.locationtech.jts" % "jts-core" % Versions.Jts, - geotrellis("vector").value + "org.locationtech.jts" % "jts-core" % Versions.Jts, + "org.locationtech.geotrellis" %% "geotrellis-vector" % Versions.GeoTrellis.value ) } val coreDependenciesJVM = Def.setting { - Seq( - "org.threeten" % "threeten-extra" % Versions.ThreeTenExtra - ) ++ jvmGeometryDependencies.value + Seq("org.threeten" % "threeten-extra" % Versions.ThreeTenExtra) ++ jvmGeometryDependencies.value } val testingDependenciesJVM = Def.setting { Seq( - geotrellis("vector").value, - "org.locationtech.jts" % "jts-core" % Versions.Jts, - "org.threeten" % "threeten-extra" % Versions.ThreeTenExtra + "org.locationtech.geotrellis" %% "geotrellis-vector" % Versions.GeoTrellis.value, + "org.locationtech.jts" % "jts-core" % Versions.Jts, + "org.threeten" % "threeten-extra" % Versions.ThreeTenExtra ) } @@ -131,7 +128,7 @@ val testRunnerDependenciesJVM = Seq( lazy val root = project .in(file(".")) - .settings(moduleName := "root") + .settings(name := "stac4s") .settings(commonSettings) .settings(publishSettings) .settings(noPublishSettings) @@ -182,11 +179,7 @@ lazy val testing = crossProject(JSPlatform, JVMPlatform) ) ) .jvmSettings(libraryDependencies ++= testingDependenciesJVM.value) - .jsSettings( - libraryDependencies ++= Seq( - "io.github.cquiroz" %%% "scala-java-time" % "2.3.0" % Test - ) - ) + .jsSettings(libraryDependencies += "io.github.cquiroz" %%% "scala-java-time" % "2.3.0" % Test) lazy val testingJVM = testing.jvm lazy val testingJS = testing.js @@ -203,11 +196,7 @@ lazy val coreTest = crossProject(JSPlatform, JVMPlatform) "org.scalatestplus" %%% "scalacheck-1-14" % Versions.ScalatestPlusScalacheck % Test ) ) - .jsSettings( - libraryDependencies ++= Seq( - "io.github.cquiroz" %%% "scala-java-time" % "2.3.0" % Test - ) - ) + .jsSettings(libraryDependencies += "io.github.cquiroz" %%% "scala-java-time" % "2.3.0" % Test) lazy val coreTestJVM = coreTest.jvm lazy val coreTestJS = coreTest.js @@ -232,6 +221,7 @@ lazy val client = crossProject(JSPlatform, JVMPlatform) "com.softwaremill.sttp.client3" %%% "json-common" % Versions.Sttp, "com.softwaremill.sttp.model" %%% "core" % Versions.SttpModel, "com.softwaremill.sttp.shared" %%% "core" % Versions.SttpShared, + "co.fs2" %%% "fs2-core" % Versions.Fs2, "org.scalatest" %%% "scalatest" % Versions.Scalatest % Test ) ) diff --git a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala index abc534b1..ef88cb7f 100644 --- a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala +++ b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala @@ -1,11 +1,11 @@ package com.azavea.stac4s.api.client -import cats.MonadError +import cats.MonadThrow import sttp.client3.SttpBackend import sttp.model.Uri object SttpStacClient { - def apply[F[_]: MonadError[*[_], Throwable]](client: SttpBackend[F, Any], baseUri: Uri): SttpStacClient[F] = + def apply[F[_]: MonadThrow](client: SttpBackend[F, Any], baseUri: Uri): SttpStacClient[F] = SttpStacClientF[F, SearchFilters](client, baseUri) } diff --git a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala index abc534b1..ef88cb7f 100644 --- a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala +++ b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SttpStacClient.scala @@ -1,11 +1,11 @@ package com.azavea.stac4s.api.client -import cats.MonadError +import cats.MonadThrow import sttp.client3.SttpBackend import sttp.model.Uri object SttpStacClient { - def apply[F[_]: MonadError[*[_], Throwable]](client: SttpBackend[F, Any], baseUri: Uri): SttpStacClient[F] = + def apply[F[_]: MonadThrow](client: SttpBackend[F, Any], baseUri: Uri): SttpStacClient[F] = SttpStacClientF[F, SearchFilters](client, baseUri) } diff --git a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala index a5d038bc..c148e41f 100644 --- a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala +++ b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala @@ -3,13 +3,14 @@ package com.azavea.stac4s.api.client import com.azavea.stac4s._ import eu.timepit.refined.types.string.NonEmptyString +import fs2.Stream trait StacClientF[F[_], S] { - def search: F[List[StacItem]] - def search(filter: S): F[List[StacItem]] - def collections: F[List[StacCollection]] + def search: Stream[F, StacItem] + def search(filter: S): Stream[F, StacItem] + def collections: Stream[F, StacCollection] def collection(collectionId: NonEmptyString): F[StacCollection] - def items(collectionId: NonEmptyString): F[List[StacItem]] + def items(collectionId: NonEmptyString): Stream[F, StacItem] def item(collectionId: NonEmptyString, itemId: NonEmptyString): F[StacItem] def itemCreate(collectionId: NonEmptyString, item: StacItem): F[StacItem] def collectionCreate(collection: StacCollection): F[StacCollection] diff --git a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala index 64073420..b5328af0 100644 --- a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala +++ b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala @@ -1,42 +1,70 @@ package com.azavea.stac4s.api.client -import com.azavea.stac4s.{StacCollection, StacItem} +import com.azavea.stac4s.{StacCollection, StacItem, StacLink, StacLinkType} -import cats.MonadError +import cats.MonadThrow +import cats.syntax.apply._ +import cats.syntax.either._ import cats.syntax.flatMap._ -import cats.syntax.functor._ import cats.syntax.option._ import eu.timepit.refined.types.string.NonEmptyString +import fs2.Stream +import io.circe import io.circe.syntax._ import io.circe.{Encoder, Json} import sttp.client3.circe.asJson -import sttp.client3.{SttpBackend, basicRequest} +import sttp.client3.{ResponseException, SttpBackend, UriContext, basicRequest} import sttp.model.Uri -case class SttpStacClientF[F[_]: MonadError[*[_], Throwable], S: Encoder]( +case class SttpStacClientF[F[_]: MonadThrow, S: Encoder]( client: SttpBackend[F, Any], baseUri: Uri ) extends StacClientF[F, S] { - def search: F[List[StacItem]] = search(None) - def search(filter: S): F[List[StacItem]] = search(filter.asJson.some) + /** Get the next page [[Uri]] from the received [[Json]] body. */ + private def getNextLink(body: Either[ResponseException[String, circe.Error], Json]): F[Option[Uri]] = + body + .flatMap { + _.hcursor + .downField("links") + .as[Option[List[StacLink]]] + .map(_.flatMap(_.collectFirst { case l if l.rel == StacLinkType.Next => uri"${l.href}" })) + } + .liftTo[F] - private def search(filter: Option[Json]): F[List[StacItem]] = - client - .send { - filter - .fold(basicRequest)(f => basicRequest.body(f.asJson.noSpaces)) - .post(baseUri.withPath("search")) - .response(asJson[Json]) + def search: Stream[F, StacItem] = search(None) + + def search(filter: S): Stream[F, StacItem] = search(filter.asJson.some) + + private def search(filter: Option[Json]): Stream[F, StacItem] = + Stream + .unfoldLoopEval(baseUri.withPath("search")) { link => + client + .send(filter.fold(basicRequest)(f => basicRequest.body(f.asJson.noSpaces)).post(link).response(asJson[Json])) + .flatMap { response => + val body = response.body + val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F] + val nextLink = getNextLink(body) + + (items, nextLink).tupled + } } - .map(_.body.flatMap(_.hcursor.downField("features").as[List[StacItem]])) - .flatMap(MonadError[F, Throwable].fromEither) + .flatMap(Stream.emits) - def collections: F[List[StacCollection]] = - client - .send(basicRequest.get(baseUri.withPath("collections")).response(asJson[Json])) - .map(_.body.flatMap(_.hcursor.downField("collections").as[List[StacCollection]])) - .flatMap(MonadError[F, Throwable].fromEither) + def collections: Stream[F, StacCollection] = + Stream + .unfoldLoopEval(baseUri.withPath("collections")) { link => + client + .send(basicRequest.get(link).response(asJson[Json])) + .flatMap { response => + val body = response.body + val items = body.flatMap(_.hcursor.downField("collections").as[List[StacCollection]]).liftTo[F] + val nextLink = getNextLink(body) + + (items, nextLink).tupled + } + } + .flatMap(Stream.emits) def collection(collectionId: NonEmptyString): F[StacCollection] = client @@ -45,14 +73,23 @@ case class SttpStacClientF[F[_]: MonadError[*[_], Throwable], S: Encoder]( .get(baseUri.withPath("collections", collectionId.value)) .response(asJson[StacCollection]) ) - .map(_.body) - .flatMap(MonadError[F, Throwable].fromEither) + .flatMap(_.body.liftTo[F]) - def items(collectionId: NonEmptyString): F[List[StacItem]] = - client - .send(basicRequest.get(baseUri.withPath("collections", collectionId.value, "items")).response(asJson[Json])) - .map(_.body.flatMap(_.hcursor.downField("features").as[List[StacItem]])) - .flatMap(MonadError[F, Throwable].fromEither) + def items(collectionId: NonEmptyString): Stream[F, StacItem] = { + Stream + .unfoldLoopEval(baseUri.withPath("collections", collectionId.value, "items")) { link => + client + .send(basicRequest.get(link).response(asJson[Json])) + .flatMap { response => + val body = response.body + val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F] + val nextLink = getNextLink(body) + + (items, nextLink).tupled + } + } + .flatMap(Stream.emits) + } def item(collectionId: NonEmptyString, itemId: NonEmptyString): F[StacItem] = client @@ -61,8 +98,7 @@ case class SttpStacClientF[F[_]: MonadError[*[_], Throwable], S: Encoder]( .get(baseUri.withPath("collections", collectionId.value, "items", itemId.value)) .response(asJson[StacItem]) ) - .map(_.body) - .flatMap(MonadError[F, Throwable].fromEither) + .flatMap(_.body.liftTo[F]) def itemCreate(collectionId: NonEmptyString, item: StacItem): F[StacItem] = client @@ -72,8 +108,7 @@ case class SttpStacClientF[F[_]: MonadError[*[_], Throwable], S: Encoder]( .body(item.asJson.noSpaces) .response(asJson[StacItem]) ) - .map(_.body) - .flatMap(MonadError[F, Throwable].fromEither) + .flatMap(_.body.liftTo[F]) def collectionCreate(collection: StacCollection): F[StacCollection] = client @@ -83,6 +118,5 @@ case class SttpStacClientF[F[_]: MonadError[*[_], Throwable], S: Encoder]( .body(collection.asJson.noSpaces) .response(asJson[StacCollection]) ) - .map(_.body) - .flatMap(MonadError[F, Throwable].fromEither) + .flatMap(_.body.liftTo[F]) } diff --git a/modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpEitherInstances.scala b/modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpEitherInstances.scala new file mode 100644 index 00000000..d4c76d47 --- /dev/null +++ b/modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpEitherInstances.scala @@ -0,0 +1,33 @@ +package com.azavea.stac4s.api.client + +import cats.effect.{ExitCase, Sync} + +trait SttpEitherInstances { + + /** [[Sync]] instance defined for Either[Throwable, *]. + * It is required (sadly) to derive [[fs2.Stream.Compiler]] which is necessary for the [[fs2.Stream.compile]] function. + */ + implicit val eitherSync: Sync[Either[Throwable, *]] = new Sync[Either[Throwable, *]] { + lazy val me = cats.instances.either.catsStdInstancesForEither[Throwable] + + def suspend[A](thunk: => Either[Throwable, A]): Either[Throwable, A] = thunk + + def bracketCase[A, B](acquire: Either[Throwable, A])(use: A => Either[Throwable, B])( + release: (A, ExitCase[Throwable]) => Either[Throwable, Unit] + ): Either[Throwable, B] = + flatMap(acquire)(use) + + def flatMap[A, B](fa: Either[Throwable, A])(f: A => Either[Throwable, B]): Either[Throwable, B] = + fa.flatMap(f) + + def tailRecM[A, B](a: A)(f: A => Either[Throwable, Either[A, B]]): Either[Throwable, B] = + me.tailRecM(a)(f) + + def raiseError[A](e: Throwable): Either[Throwable, A] = me.raiseError(e) + + def handleErrorWith[A](fa: Either[Throwable, A])(f: Throwable => Either[Throwable, A]): Either[Throwable, A] = + me.handleErrorWith(fa)(f) + + def pure[A](x: A): Either[Throwable, A] = me.pure(x) + } +} diff --git a/modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpStacClientFSpec.scala b/modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpStacClientFSpec.scala index 4d54505c..96358144 100644 --- a/modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpStacClientFSpec.scala +++ b/modules/client/shared/src/test/scala/com/azavea/stac4s/api/client/SttpStacClientFSpec.scala @@ -16,7 +16,7 @@ import sttp.client3.testing.SttpBackendStub import sttp.model.Method import sttp.monad.EitherMonad -trait SttpStacClientFSpec[S] extends AnyFunSpec with Matchers with BeforeAndAfterAll { +trait SttpStacClientFSpec[S] extends AnyFunSpec with Matchers with BeforeAndAfterAll with SttpEitherInstances { def arbCollectionShort: Arbitrary[StacCollection] def arbItemCollectionShort: Arbitrary[ItemCollection] @@ -24,6 +24,7 @@ trait SttpStacClientFSpec[S] extends AnyFunSpec with Matchers with BeforeAndAfte def client: SttpStacClientF[Either[Throwable, *], S] + /** We use the default synchronous Either backend to use the same tests set for the Scala JS backend. */ lazy val backend = SttpBackendStub(EitherMonad) .whenRequestMatches(_.uri.path == Seq("search")) @@ -86,31 +87,37 @@ trait SttpStacClientFSpec[S] extends AnyFunSpec with Matchers with BeforeAndAfte describe("SttpStacClientSpec") { it("search") { - client.search + client.search.compile.toList .valueOr(throw _) } it("collections") { - client.collections + client.collections.compile.toList .valueOr(throw _) + .map(_.id should not be empty) } it("collection") { client .collection(NonEmptyString.unsafeFrom("collection_id")) .valueOr(throw _) + .id should not be empty } it("items") { client .items(NonEmptyString.unsafeFrom("collection_id")) + .compile + .toList .valueOr(throw _) + .map(_.id should not be empty) } it("item") { client .item(NonEmptyString.unsafeFrom("collection_id"), NonEmptyString.unsafeFrom("item_id")) .valueOr(throw _) + .id should not be empty } it("itemCreate") { diff --git a/project/Versions.scala b/project/Versions.scala index eb6d431f..0db5c087 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -2,10 +2,19 @@ import sbt._ import sbt.Keys._ object Versions { - val Cats = "2.6.0" + + private def ver(for212: String, for213: String) = Def.setting { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, 12)) => for212 + case Some((2, 13)) => for213 + case _ => sys.error("not good") + } + } + + val Cats = "2.6.1" val Circe = "0.13.0" val Enumeratum = "1.6.1" - val GeoTrellis = "3.6.0" + val GeoTrellis = Def.setting(ver("3.6.0", "3.6.1-SNAPSHOT").value) val Jts = "1.16.1" val Monocle = "2.1.0" val Refined = "0.9.24" @@ -19,20 +28,6 @@ object Versions { val Sttp = "3.2.3" val SttpModel = "1.4.4" val SttpShared = "1.2.2" + val Fs2 = "2.5.6" val ThreeTenExtra = "1.6.0" } - -object Dependencies { - - private def ver(for212: String, for213: String) = Def.setting { - CrossVersion.partialVersion(scalaVersion.value) match { - case Some((2, 12)) => for212 - case Some((2, 13)) => for213 - case _ => sys.error("not good") - } - } - - def geotrellis(module: String) = Def.setting { - "org.locationtech.geotrellis" %% s"geotrellis-$module" % ver(Versions.GeoTrellis, "3.6.1-SNAPSHOT").value - } -} From ee4e0d85eebf38294355c4bd29cfc8e194720fc8 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Wed, 19 May 2021 23:33:28 -0400 Subject: [PATCH 2/5] Cleanup PaginationToken companion object --- .../stac4s/api/client/PaginationToken.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/PaginationToken.scala b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/PaginationToken.scala index a91c04f1..44e8c476 100644 --- a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/PaginationToken.scala +++ b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/PaginationToken.scala @@ -2,12 +2,11 @@ package com.azavea.stac4s.api.client import cats.syntax.either._ import eu.timepit.refined.types.numeric.PosInt -import io.circe import io.circe.generic.semiauto._ import io.circe.parser.parse import io.circe.refined._ import io.circe.syntax._ -import io.circe.{Decoder, Encoder} +import io.circe.{Decoder, Encoder, Error} import java.time.Instant import java.util.Base64 @@ -18,17 +17,17 @@ final case class PaginationToken(timestampAtLeast: Instant, serialIdGreaterThan: * https://github.com/azavea/franklin/blob/f5be8ddf48661c5bc43cbd22cb7277e961641803/application/src/main/scala/com/azavea/franklin/api/schemas/package.scala#L84-L85 */ object PaginationToken { - val b64Encoder = Base64.getEncoder - val b64Decoder = Base64.getDecoder - val defaultDecoder: Decoder[PaginationToken] = deriveDecoder val defaultEncoder: Encoder[PaginationToken] = deriveEncoder + val b64Encoder: Base64.Encoder = Base64.getEncoder + val b64Decoder: Base64.Decoder = Base64.getDecoder + def encPaginationToken(token: PaginationToken): String = b64Encoder.encodeToString( token.asJson(defaultEncoder).noSpaces.getBytes ) - def decPaginationToken(encoded: String): Either[circe.Error, PaginationToken] = { + def decPaginationTokenEither(encoded: String): Either[Error, PaginationToken] = { val jsonString = new String(b64Decoder.decode(encoded)) for { js <- parse(jsonString) @@ -36,8 +35,8 @@ object PaginationToken { } yield decoded } - implicit val dec: Decoder[PaginationToken] = - Decoder.decodeString.emap(str => decPaginationToken(str).leftMap(_.getMessage)) + implicit val paginationTokenDecoder: Decoder[PaginationToken] = + Decoder.decodeString.emap(str => decPaginationTokenEither(str).leftMap(_.getMessage)) - implicit val enc: Encoder[PaginationToken] = { encPaginationToken(_).asJson } + implicit val paginationTokenEncoder: Encoder[PaginationToken] = { encPaginationToken(_).asJson } } From 95b382f2812fdf6cf2f021440003c4c1677b734d Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Thu, 20 May 2021 12:45:29 -0400 Subject: [PATCH 3/5] Split StacClient and StreamingStacClient --- .../com/azavea/stac4s/api/client/StacClient.scala | 3 ++- .../com/azavea/stac4s/api/client/StacClient.scala | 3 ++- .../com/azavea/stac4s/api/client/StacClientF.scala | 12 +++++++----- .../azavea/stac4s/api/client/SttpStacClientF.scala | 2 +- project/Versions.scala | 6 +++--- 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala index c4b1a724..c863b450 100644 --- a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala +++ b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala @@ -1,3 +1,4 @@ package com.azavea.stac4s.api.client -trait StacClient[F[_]] extends StacClientF[F, SearchFilters] +trait StacClient[F[_]] extends StacClientF[F, SearchFilters] +trait StreamingStacClient[F[_], G[_]] extends StreamingStacClientF[F, G, SearchFilters] diff --git a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala index c4b1a724..c863b450 100644 --- a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala +++ b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala @@ -1,3 +1,4 @@ package com.azavea.stac4s.api.client -trait StacClient[F[_]] extends StacClientF[F, SearchFilters] +trait StacClient[F[_]] extends StacClientF[F, SearchFilters] +trait StreamingStacClient[F[_], G[_]] extends StreamingStacClientF[F, G, SearchFilters] diff --git a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala index c148e41f..3967d84d 100644 --- a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala +++ b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/StacClientF.scala @@ -3,15 +3,17 @@ package com.azavea.stac4s.api.client import com.azavea.stac4s._ import eu.timepit.refined.types.string.NonEmptyString -import fs2.Stream trait StacClientF[F[_], S] { - def search: Stream[F, StacItem] - def search(filter: S): Stream[F, StacItem] - def collections: Stream[F, StacCollection] def collection(collectionId: NonEmptyString): F[StacCollection] - def items(collectionId: NonEmptyString): Stream[F, StacItem] def item(collectionId: NonEmptyString, itemId: NonEmptyString): F[StacItem] def itemCreate(collectionId: NonEmptyString, item: StacItem): F[StacItem] def collectionCreate(collection: StacCollection): F[StacCollection] } + +trait StreamingStacClientF[F[_], G[_], S] extends StacClientF[F, S] { + def search: G[StacItem] + def search(filter: S): G[StacItem] + def collections: G[StacCollection] + def items(collectionId: NonEmptyString): G[StacItem] +} diff --git a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala index b5328af0..12948197 100644 --- a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala +++ b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala @@ -19,7 +19,7 @@ import sttp.model.Uri case class SttpStacClientF[F[_]: MonadThrow, S: Encoder]( client: SttpBackend[F, Any], baseUri: Uri -) extends StacClientF[F, S] { +) extends StreamingStacClientF[F, Stream[F, *], S] { /** Get the next page [[Uri]] from the received [[Json]] body. */ private def getNextLink(body: Either[ResponseException[String, circe.Error], Json]): F[Option[Uri]] = diff --git a/project/Versions.scala b/project/Versions.scala index 0db5c087..2f843c69 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -25,9 +25,9 @@ object Versions { val Scapegoat = "1.4.8" val Shapeless = "2.3.7" val SpdxChecker = "1.0.0" - val Sttp = "3.2.3" - val SttpModel = "1.4.4" - val SttpShared = "1.2.2" + val Sttp = "3.3.4" + val SttpModel = "1.4.7" + val SttpShared = "1.2.5" val Fs2 = "2.5.6" val ThreeTenExtra = "1.6.0" } From 5648aea82478d728cfee6c196298ee371842378d Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Thu, 20 May 2021 16:46:44 -0400 Subject: [PATCH 4/5] Make a StreamingStacClient G independent from F; finish search pagination --- build.sbt | 2 ++ .../stac4s/api/client/SearchFilters.scala | 3 ++ .../azavea/stac4s/api/client/StacClient.scala | 4 --- .../azavea/stac4s/api/client/package.scala | 5 ++- .../stac4s/api/client/SearchFilters.scala | 3 ++ .../azavea/stac4s/api/client/StacClient.scala | 4 --- .../com/azavea/stac4s/api/client/client.scala | 5 --- .../azavea/stac4s/api/client/package.scala | 8 +++++ .../stac4s/api/client/SttpStacClientF.scala | 35 +++++++++++-------- .../com/azavea/stac4s/JsFPLawsSpec.scala | 6 ++-- .../com/azavea/stac4s/JvmFPLawsSpec.scala | 6 ++-- 11 files changed, 47 insertions(+), 34 deletions(-) delete mode 100644 modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala delete mode 100644 modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala delete mode 100644 modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/client.scala create mode 100644 modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala diff --git a/build.sbt b/build.sbt index a86c6c8e..75e85a28 100644 --- a/build.sbt +++ b/build.sbt @@ -209,6 +209,8 @@ lazy val client = crossProject(JSPlatform, JVMPlatform) .settings(publishSettings) .settings( libraryDependencies ++= Seq( + "com.github.julien-truffaut" %%% "monocle-core" % Versions.Monocle, + "com.github.julien-truffaut" %%% "monocle-macro" % Versions.Monocle, "io.circe" %%% "circe-core" % Versions.Circe, "io.circe" %%% "circe-generic" % Versions.Circe, "io.circe" %%% "circe-refined" % Versions.Circe, diff --git a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala index 645182a6..34185168 100644 --- a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala +++ b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala @@ -8,6 +8,8 @@ import com.azavea.stac4s.jsTypes.TemporalExtent import eu.timepit.refined.types.numeric.NonNegInt import io.circe._ import io.circe.refined._ +import monocle.Lens +import monocle.macros.GenLens case class SearchFilters( bbox: Option[Bbox] = None, @@ -21,6 +23,7 @@ case class SearchFilters( ) object SearchFilters extends ClientCodecs { + implicit val paginationTokenLens: Lens[SearchFilters, Option[PaginationToken]] = GenLens[SearchFilters](_.next) implicit val searchFiltersDecoder: Decoder[SearchFilters] = { c => for { diff --git a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala deleted file mode 100644 index c863b450..00000000 --- a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala +++ /dev/null @@ -1,4 +0,0 @@ -package com.azavea.stac4s.api.client - -trait StacClient[F[_]] extends StacClientF[F, SearchFilters] -trait StreamingStacClient[F[_], G[_]] extends StreamingStacClientF[F, G, SearchFilters] diff --git a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala index d8f875d3..d5fa668b 100644 --- a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala +++ b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala @@ -1,5 +1,8 @@ package com.azavea.stac4s.api package object client { - type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters] + type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters] + type StacClient[F[_]] = StacClientF[F, SearchFilters] + type StreamingClient[F[_]] = StreamingStacClientF[F, fs2.Stream[F, *], SearchFilters] + type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters] } diff --git a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala index 18ad346b..c43382df 100644 --- a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala +++ b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/SearchFilters.scala @@ -8,6 +8,8 @@ import eu.timepit.refined.types.numeric.NonNegInt import geotrellis.vector.{io => _, _} import io.circe._ import io.circe.refined._ +import monocle.Lens +import monocle.macros.GenLens case class SearchFilters( bbox: Option[Bbox] = None, @@ -21,6 +23,7 @@ case class SearchFilters( ) object SearchFilters extends ClientCodecs { + implicit val paginationTokenLens: Lens[SearchFilters, Option[PaginationToken]] = GenLens[SearchFilters](_.next) implicit val searchFiltersDecoder: Decoder[SearchFilters] = { c => for { diff --git a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala deleted file mode 100644 index c863b450..00000000 --- a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/StacClient.scala +++ /dev/null @@ -1,4 +0,0 @@ -package com.azavea.stac4s.api.client - -trait StacClient[F[_]] extends StacClientF[F, SearchFilters] -trait StreamingStacClient[F[_], G[_]] extends StreamingStacClientF[F, G, SearchFilters] diff --git a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/client.scala b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/client.scala deleted file mode 100644 index d8f875d3..00000000 --- a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/client.scala +++ /dev/null @@ -1,5 +0,0 @@ -package com.azavea.stac4s.api - -package object client { - type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters] -} diff --git a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala new file mode 100644 index 00000000..d5fa668b --- /dev/null +++ b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala @@ -0,0 +1,8 @@ +package com.azavea.stac4s.api + +package object client { + type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters] + type StacClient[F[_]] = StacClientF[F, SearchFilters] + type StreamingClient[F[_]] = StreamingStacClientF[F, fs2.Stream[F, *], SearchFilters] + type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters] +} diff --git a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala index 12948197..f456d301 100644 --- a/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala +++ b/modules/client/shared/src/main/scala/com/azavea/stac4s/api/client/SttpStacClientF.scala @@ -6,23 +6,25 @@ import cats.MonadThrow import cats.syntax.apply._ import cats.syntax.either._ import cats.syntax.flatMap._ +import cats.syntax.functor._ import cats.syntax.option._ import eu.timepit.refined.types.string.NonEmptyString import fs2.Stream -import io.circe import io.circe.syntax._ -import io.circe.{Encoder, Json} +import io.circe.{Encoder, Error, Json, JsonObject} +import monocle.Lens import sttp.client3.circe.asJson import sttp.client3.{ResponseException, SttpBackend, UriContext, basicRequest} import sttp.model.Uri -case class SttpStacClientF[F[_]: MonadThrow, S: Encoder]( +case class SttpStacClientF[F[_]: MonadThrow, S: Lens[*, Option[PaginationToken]]: Encoder]( client: SttpBackend[F, Any], baseUri: Uri ) extends StreamingStacClientF[F, Stream[F, *], S] { + private val paginationTokenLens = implicitly[Lens[S, Option[PaginationToken]]] - /** Get the next page [[Uri]] from the received [[Json]] body. */ - private def getNextLink(body: Either[ResponseException[String, circe.Error], Json]): F[Option[Uri]] = + /** Get the next page [[Uri]] from the retrieved [[Json]] body. */ + private def getNextLink(body: Either[ResponseException[String, Error], Json]): F[Option[Uri]] = body .flatMap { _.hcursor @@ -34,22 +36,27 @@ case class SttpStacClientF[F[_]: MonadThrow, S: Encoder]( def search: Stream[F, StacItem] = search(None) - def search(filter: S): Stream[F, StacItem] = search(filter.asJson.some) + def search(filter: S): Stream[F, StacItem] = search(filter.some) - private def search(filter: Option[Json]): Stream[F, StacItem] = + private def search(filter: Option[S]): Stream[F, StacItem] = { + val emptyJson = JsonObject.empty.asJson + // the initial filter may contain the paginationToken that is used for the initial query + val initialBody = filter.map(_.asJson).getOrElse(emptyJson) + // the same filter would be used as a body for all pagination requests + val noPaginationBody = filter.map(paginationTokenLens.set(None)(_).asJson).getOrElse(emptyJson) Stream - .unfoldLoopEval(baseUri.withPath("search")) { link => + .unfoldLoopEval((baseUri.withPath("search"), initialBody)) { case (link, request) => client - .send(filter.fold(basicRequest)(f => basicRequest.body(f.asJson.noSpaces)).post(link).response(asJson[Json])) + .send(basicRequest.body(request.noSpaces).post(link).response(asJson[Json])) .flatMap { response => - val body = response.body - val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F] - val nextLink = getNextLink(body) - - (items, nextLink).tupled + val body = response.body + val items = body.flatMap(_.hcursor.downField("features").as[List[StacItem]]).liftTo[F] + val next = getNextLink(body).map(_.map(_ -> noPaginationBody)) + (items, next).tupled } } .flatMap(Stream.emits) + } def collections: Stream[F, StacCollection] = Stream diff --git a/modules/core-test/js/src/test/scala/com/azavea/stac4s/JsFPLawsSpec.scala b/modules/core-test/js/src/test/scala/com/azavea/stac4s/JsFPLawsSpec.scala index edf328c9..bd0d4651 100644 --- a/modules/core-test/js/src/test/scala/com/azavea/stac4s/JsFPLawsSpec.scala +++ b/modules/core-test/js/src/test/scala/com/azavea/stac4s/JsFPLawsSpec.scala @@ -1,13 +1,13 @@ package com.azavea.stac4s +import com.azavea.stac4s.testing.TestInstances + +import cats.kernel.laws.discipline.SemigroupTests import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.must.Matchers import org.scalatestplus.scalacheck.Checkers import org.typelevel.discipline.scalatest.FunSuiteDiscipline -import cats.kernel.laws.discipline.SemigroupTests -import com.azavea.stac4s.testing.TestInstances - class JsFPLawsSpec extends AnyFunSuite with FunSuiteDiscipline with Checkers with Matchers with TestInstances { checkAll("Semigroup.Bbox", SemigroupTests[Bbox].semigroup) } diff --git a/modules/core-test/jvm/src/test/scala/com/azavea/stac4s/JvmFPLawsSpec.scala b/modules/core-test/jvm/src/test/scala/com/azavea/stac4s/JvmFPLawsSpec.scala index 26b03c65..243dbc8c 100644 --- a/modules/core-test/jvm/src/test/scala/com/azavea/stac4s/JvmFPLawsSpec.scala +++ b/modules/core-test/jvm/src/test/scala/com/azavea/stac4s/JvmFPLawsSpec.scala @@ -1,13 +1,13 @@ package com.azavea.stac4s +import com.azavea.stac4s.testing.TestInstances + +import cats.kernel.laws.discipline.SemigroupTests import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.must.Matchers import org.scalatestplus.scalacheck.Checkers import org.typelevel.discipline.scalatest.FunSuiteDiscipline -import cats.kernel.laws.discipline.SemigroupTests -import com.azavea.stac4s.testing.TestInstances - class JvmFPLawsSpec extends AnyFunSuite with FunSuiteDiscipline with Checkers with Matchers with TestInstances { checkAll("Semigroup.Bbox", SemigroupTests[Bbox].semigroup) } From e85ee2fd9cfc3d878530f8093f5951c47de66919 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Fri, 21 May 2021 10:54:20 -0400 Subject: [PATCH 5/5] Rename type alias --- .../src/main/scala/com/azavea/stac4s/api/client/package.scala | 2 +- .../src/main/scala/com/azavea/stac4s/api/client/package.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala index d5fa668b..71a6050f 100644 --- a/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala +++ b/modules/client/js/src/main/scala/com/azavea/stac4s/api/client/package.scala @@ -3,6 +3,6 @@ package com.azavea.stac4s.api package object client { type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters] type StacClient[F[_]] = StacClientF[F, SearchFilters] - type StreamingClient[F[_]] = StreamingStacClientF[F, fs2.Stream[F, *], SearchFilters] + type StreamingStacClientFS2[F[_]] = StreamingStacClientF[F, fs2.Stream[F, *], SearchFilters] type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters] } diff --git a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala index d5fa668b..71a6050f 100644 --- a/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala +++ b/modules/client/jvm/src/main/scala/com/azavea/stac4s/api/client/package.scala @@ -3,6 +3,6 @@ package com.azavea.stac4s.api package object client { type SttpStacClient[F[_]] = SttpStacClientF[F, SearchFilters] type StacClient[F[_]] = StacClientF[F, SearchFilters] - type StreamingClient[F[_]] = StreamingStacClientF[F, fs2.Stream[F, *], SearchFilters] + type StreamingStacClientFS2[F[_]] = StreamingStacClientF[F, fs2.Stream[F, *], SearchFilters] type StreamingStacClient[F[_], G[_]] = StreamingStacClientF[F, G, SearchFilters] }