Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#246: Implement basics of ParitioningReader #323

Merged
merged 63 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
b97b603
#244: Create the Info module
benedeki Aug 19, 2024
e623974
* fixed License headers
benedeki Aug 19, 2024
5e4eadb
* renamed to _Reader_
benedeki Aug 24, 2024
2e1e2ea
* README.md update
benedeki Aug 25, 2024
738c904
* fix
benedeki Aug 25, 2024
df8c9bd
* JaCoCO action update
benedeki Aug 26, 2024
5affd82
* added dummy code for testing coverage
benedeki Aug 26, 2024
0f1e121
* erroneous class renamed
benedeki Aug 26, 2024
d773a93
* Deleted wrong files
benedeki Aug 26, 2024
0776f9c
#245 Add the ability to query REST endpoints from Reader module
benedeki Sep 10, 2024
38fde1c
* Work still in progress
benedeki Sep 23, 2024
1ac2233
* the first working commit
benedeki Nov 1, 2024
e6dcb52
* Removed temporary notes
benedeki Nov 1, 2024
6968b02
* introduced `MonadError` into the `GenericServerConnection`
benedeki Nov 1, 2024
b9bacef
* Fixed UTs
benedeki Nov 1, 2024
bbb1e7f
* trying to get rid of Java 11 dependency
benedeki Nov 4, 2024
33e6628
* Downgraded sttpClient
benedeki Nov 4, 2024
f7ced56
* further downgrade
benedeki Nov 5, 2024
ca2116b
* Removed exceptions
benedeki Nov 6, 2024
e5e6f63
* commented out parts of README.md which are not yet part of the code
benedeki Nov 6, 2024
fe07272
- major rework
benedeki Nov 17, 2024
7656f6f
* doc fix
benedeki Nov 17, 2024
eb9a678
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 17, 2024
7641c07
* disabled failing test
benedeki Nov 17, 2024
bc82a5b
* adjustments
benedeki Nov 18, 2024
0e7675e
- further cleaning
benedeki Nov 18, 2024
432716a
* tests progress
benedeki Nov 21, 2024
11b0a16
* several UTs added
benedeki Nov 22, 2024
2c3f145
Merge branch 'master' into feature/245-add-the-ability-to-query-rest-…
benedeki Nov 22, 2024
e07dffb
* last improvements before PR ready
benedeki Nov 24, 2024
3955a50
* description to class `ServerConfig`
benedeki Nov 25, 2024
b287a66
* removed empty line
benedeki Nov 25, 2024
d04d23b
* addressed PR comments
benedeki Nov 27, 2024
c344249
* just better implementation
benedeki Nov 30, 2024
c0b0988
#247: Implement basics of FlowReader
benedeki Dec 4, 2024
b53ba99
Merge remote-tracking branch 'remotes/origin/master' into feature/247…
benedeki Dec 4, 2024
55d60e1
* major progress
benedeki Dec 9, 2024
5dfe5c5
* Further progress
benedeki Dec 9, 2024
67ffe07
* Flow reader methods to read checkpoints
benedeki Dec 11, 2024
09e2ed8
* small fixes
benedeki Dec 11, 2024
e7ff732
* License year
benedeki Dec 11, 2024
e63a2e4
* Finished implementation
benedeki Feb 2, 2025
1488d1f
#247: Paging support in Reader module
benedeki Feb 4, 2025
96ffa33
* some omitted files
benedeki Feb 4, 2025
8b69e3e
Merge branch 'master' into feature/247-paging-in-reader
benedeki Feb 4, 2025
04b56bd
* filenames check exclusions fix
benedeki Feb 4, 2025
698b765
Merge branch 'feature/247-paging-in-reader' of https://github.com/Abs…
benedeki Feb 4, 2025
855a333
* further fix
benedeki Feb 4, 2025
be90711
* fix trial 3
benedeki Feb 4, 2025
afd64a6
* Licenses fix
benedeki Feb 4, 2025
f70143e
Merge branch 'feature/247-paging-in-reader' into feature/247-implemen…
benedeki Feb 5, 2025
f803ea3
* uncommented endpoint for Swagger documentation creation
benedeki Feb 5, 2025
1a56f3d
* complete refactoring
benedeki Feb 21, 2025
bbed1f0
* added adr
benedeki Feb 21, 2025
15dd12a
* removing unnecessary changes
benedeki Feb 21, 2025
c630fa0
* removed unnecessary changes II
benedeki Feb 21, 2025
f4ce4d3
* Typo fixes
benedeki Feb 21, 2025
56cfa09
* addressed more PR comments
benedeki Feb 21, 2025
205952e
* Further comments address
benedeki Feb 25, 2025
172ed9c
* `PartitioningReader` methods to retrieve checkpoints and additional…
benedeki Feb 28, 2025
d7c123b
Merge branch 'master' into feature/246-implement-basics-of-paritionin…
benedeki Mar 1, 2025
14a26fa
* documentation typo fixed
benedeki Mar 3, 2025
16cb44e
Merge branch 'feature/246-implement-basics-of-paritioningreader' of h…
benedeki Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class AtumContext private[agent] (
val retrievedAD = agent.updateAdditionalData(this.atumPartitions, currAdditionalDataSubmit)

// Could be different from the one that was submitted. Replacing, just to have the most fresh copy possible.
this.additionalData = retrievedAD.data.map{ case (k, v) => (k, v.flatMap(_.value)) }
this.additionalData = retrievedAD.data.map{ case (k, v) => (k, v.map(_.value)) }
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class CapturingDispatcher(config: Config) extends Dispatcher(config) {
): AdditionalDataDTO = {
val result = AdditionalDataDTO(
additionalDataPatchDTO.data.map { case (key, value) =>
key -> Some(AdditionalDataItemDTO(Some(value), additionalDataPatchDTO.byUser))
key -> Some(AdditionalDataItemDTO(value, additionalDataPatchDTO.byUser))
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ConsoleDispatcher(config: Config) extends Dispatcher(config) with Logging

AdditionalDataDTO(
additionalDataPatchDTO.data.map { case (key, value) =>
key -> Some(AdditionalDataItemDTO(Some(value), additionalDataPatchDTO.byUser))
key -> Some(AdditionalDataItemDTO(value, additionalDataPatchDTO.byUser))
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import sttp.client3._
import sttp.model.Uri
import sttp.client3.okhttp.OkHttpSyncBackend
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.ApiPaths
import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.utils.JsonSyntaxExtensions._
Expand All @@ -31,19 +32,17 @@ import za.co.absa.atum.model.utils.JsonSyntaxExtensions._
class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
import HttpDispatcher._

val serverUrl: String = config.getString(UrlKey)
private val serverUrl: String = config.getString(UrlKey)

private val apiV1 = "/api/v1"
private val apiV2 = "/api/v2"
private val apiV1 = s"/${ApiPaths.Api}/${ApiPaths.V1}"
private val apiV2 = s"/${ApiPaths.Api}/${ApiPaths.V2}"

private val partitioningsPath = "partitionings"
private val createPartitioningEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/${ApiPaths.V1Paths.CreatePartitioning}")
private val createCheckpointEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/${ApiPaths.V1Paths.CreateCheckpoint}")

private val createPartitioningEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/createPartitioning")
private val createCheckpointEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/createCheckpoint")

private val getPartitioningIdEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath")
private val getPartitioningIdEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}")
private def createAdditionalDataEndpoint(partitioningId: Long): Uri =
Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath/$partitioningId/additional-data")
Uri.unsafeParse(s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/$partitioningId/${ApiPaths.V2Paths.AdditionalData}")

private val commonAtumRequest = basicRequest
.header("Content-Type", "application/json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}

case class AdditionalDataItemDTO(
value: Option[String],
value: String,
author: String
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class SerializationUtilsUnitTests extends AnyFlatSpecLike {
"asJsonString" should "serialize AdditionalDataDTO into json string" in {
val additionalDataDTO = AdditionalDataDTO(
Map[String, Option[AdditionalDataItemDTO]](
"key1" -> Some(AdditionalDataItemDTO(Some("val1"), "testAuthor")),
"key2" -> Some(AdditionalDataItemDTO(Some("val2"), "testAuthor")),
"key1" -> Some(AdditionalDataItemDTO("val1", "testAuthor")),
"key2" -> Some(AdditionalDataItemDTO("val2", "testAuthor")),
"key3" -> None
)
)
Expand All @@ -55,8 +55,8 @@ class SerializationUtilsUnitTests extends AnyFlatSpecLike {

val expectedAdditionalDataDTO = AdditionalDataDTO(
Map[String, Option[AdditionalDataItemDTO]](
"key1" -> Some(AdditionalDataItemDTO(Some("val1"), "testAuthor")),
"key2" -> Some(AdditionalDataItemDTO(Some("val2"), "testAuthor"))
"key1" -> Some(AdditionalDataItemDTO("val1", "testAuthor")),
"key2" -> Some(AdditionalDataItemDTO("val2", "testAuthor"))
)
)
val actualAdditionalDataDTO = additionalDataDTOJson.as[AdditionalDataDTO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,86 @@ package za.co.absa.atum.reader

import sttp.client3.SttpBackend
import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.ApiPaths.{Api, V2, V2Paths}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, CheckpointV2DTO}
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.reader.core.RequestResult.RequestResult
import za.co.absa.atum.reader.core.{PartitioningIdProvider, Reader}
import za.co.absa.atum.reader.requests.QueryParamNames
import za.co.absa.atum.reader.server.ServerConfig

/**
*
* @param partitioning - the Atum partitions to read the information from
* @param serverConfig - the Atum server configuration
* @param backend - sttp backend, that will be executing the requests
* @param ev - using evidence based approach to ensure that the type F is a MonadError instead of using context
* bounds, as it make the imports easier to follow
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
*/
case class PartitioningReader[F[_]](partitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
case class PartitioningReader[F[_]: MonadError](partitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any])
extends Reader[F] with PartitioningIdProvider[F]{
def foo(): String = {
// just to have some testable content
"bar"

/**
* Function to retrieve a page of checkpoints belonging to the partitioning.
* The checkpoints are ordered by their creation order.
*
* @param pageSize - the size of the page (record count) to be returned
* @param offset - offset of the page (starting position)
* @return - a page of checkpoints
*/
def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointV2DTO]]] = {
for {
partitioningIdOrError <- partitioningId(partitioning)
checkpointsOrError <- mapRequestResultF(partitioningIdOrError, queryCheckpoints(_, None, pageSize, offset))
} yield checkpointsOrError
}

/**
* Function to retrieve a page of checkpoints of the given name belonging to the partitioning. (While the usual logic
* would suggest, there would be only one checkpoint of a name, nothing prevents to have checkpoints of the same name;
* also during reprocessing the checkpoints might multiply.)
* The checkpoints are ordered by their creation order.
*
* @param checkpointName - the name to filter with
* @param pageSize - the size of the page (record count) to be returned
* @param offset - offset of the page (starting position)
* @return - a page of checkpoints
*/
def getCheckpointsOfNamePage(checkpointName: String, pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointV2DTO]]] = {
for {
partitioningIdOrError <- partitioningId(partitioning)
checkpointsOrError <- mapRequestResultF(partitioningIdOrError, queryCheckpoints(_, Some(checkpointName), pageSize, offset))
} yield checkpointsOrError
}

/**
* Returns the additional data associated with the partitioning
*
* @return - the additional data as they were save for the partitioning
*/
def getAdditionalData: F[RequestResult[AdditionalDataDTO]] = {
for{
partitioningIdOrError <- partitioningId(partitioning)
additionalDataOrError <- mapRequestResultF(partitioningIdOrError, queryAdditionalData)
} yield additionalDataOrError.map(_.data)
}

private def queryCheckpoints(partitioningId: Long,
checkpointName: Option[String],
limit: Int,
offset: Long): F[RequestResult[PaginatedResponse[CheckpointV2DTO]]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$partitioningId/${V2Paths.Checkpoints}"
val params = Map(
QueryParamNames.Limit -> limit.toString,
QueryParamNames.Offset -> offset.toString
) ++ checkpointName.map(QueryParamNames.CheckpointName -> _)
getQuery(endpoint, params)
}

private def queryAdditionalData(partitioningId: Long): F[RequestResult[SingleSuccessResponse[AdditionalDataDTO]]] = {
val endpoint = s"$Api/$V2/${V2Paths.Partitionings}/$partitioningId/${V2Paths.AdditionalData}"
getQuery(endpoint)
}
}
Loading
Loading