Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 16cb44e

Browse files
committedMar 3, 2025··
Merge branch 'feature/246-implement-basics-of-paritioningreader' of https://github.com/AbsaOSS/atum-service into feature/246-implement-basics-of-paritioningreader
2 parents 14a26fa + d7c123b commit 16cb44e

File tree

5 files changed

+34
-13
lines changed

5 files changed

+34
-13
lines changed
 

‎.github/workflows/jacoco_report.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ on:
2424
env:
2525
scalaLong: 2.13.11
2626
scalaShort: "2.13"
27-
coverage-overall: 58.0
27+
coverage-overall: 56.0
2828
coverage-changed-files: 80.0
2929
check-overall-coverages: true
3030

‎build.sbt

+1-2
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,7 @@ lazy val reader = (projectMatrix in file("reader"))
136136
.settings(
137137
Setup.commonSettings ++ Seq(
138138
name := "atum-reader",
139-
javacOptions ++= Setup.clientJavacOptions,
140-
publish / skip := true // module is not yet finished, so we don't want to publish it
139+
javacOptions ++= Setup.clientJavacOptions
141140
): _*
142141
)
143142
.addScalaCrossBuild(Setup.clientSupportedScalaVersions, Dependencies.readerDependencies)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2021 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
UPDATE runs.checkpoints
17+
SET measured_by_atum_agent = FALSE
18+
WHERE measured_by_atum_agent IS NULL;
19+
20+
ALTER TABLE runs.checkpoints
21+
ALTER COLUMN measured_by_atum_agent SET DEFAULT FALSE,
22+
ALTER COLUMN measured_by_atum_agent SET NOT NULL;

‎reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala

+8-8
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ import za.co.absa.atum.reader.server.ServerConfig
3636
* @param backend - sttp backend, that will be executing the requests
3737
* @tparam F - the effect type (e.g. Future, IO, Task, etc.)
3838
*/
39-
class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)
40-
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any])
39+
case class FlowReader[F[_]: MonadError](mainFlowPartitioning: AtumPartitions)
40+
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any])
4141
extends Reader[F] with PartitioningIdProvider[F] {
4242

4343
/**
@@ -50,8 +50,8 @@ class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)
5050
*/
5151
def getCheckpointsPage(pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
5252
for {
53-
mainPartitioningIdOrErrror <- partitioningId(mainFlowPartitioning)
54-
flowIdOrError <- mapRequestResultF(mainPartitioningIdOrErrror, queryFlowId)
53+
mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning)
54+
flowIdOrError <- mapRequestResultF(mainPartitioningIdOrError, queryFlowId)
5555
checkpointsOrError <- mapRequestResultF(flowIdOrError, queryCheckpoints(_, None, pageSize, offset))
5656
} yield checkpointsOrError
5757
}
@@ -67,10 +67,10 @@ class FlowReader[F[_]: MonadError](val mainFlowPartitioning: AtumPartitions)
6767
*/
6868
def getCheckpointsOfNamePage(checkpointName: String, pageSize: Int = 10, offset: Long = 0): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
6969
for {
70-
mainPartitioningId <- partitioningId(mainFlowPartitioning)
71-
flowId <- mapRequestResultF(mainPartitioningId, queryFlowId)
72-
checkpoints <- mapRequestResultF(flowId, queryCheckpoints(_, Some(checkpointName), pageSize, offset))
73-
} yield checkpoints
70+
mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning)
71+
flowIdOrError <- mapRequestResultF(mainPartitioningIdOrError, queryFlowId)
72+
checkpointsOrError <- mapRequestResultF(flowIdOrError, queryCheckpoints(_, Some(checkpointName), pageSize, offset))
73+
} yield checkpointsOrError
7474
}
7575

7676
private def queryFlowId(mainPartitioningId: Long): F[RequestResult[Long]] = {

‎reader/src/test/scala/za/co/absa/atum/reader/FlowReaderUnitTests.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class FlowReaderUnitTests extends AnyFunSuiteLike {
117117
requestId = UUID.fromString("29ce91a7-b668-41d2-a160-26402551fb0b")
118118
)
119119

120-
val reader = new FlowReader(atumPartitions)
120+
val reader = FlowReader(atumPartitions)
121121
val result = reader.getCheckpointsPage()
122122
assert(result == Right(expectedData))
123123
}
@@ -190,7 +190,7 @@ class FlowReaderUnitTests extends AnyFunSuiteLike {
190190
requestId = UUID.fromString("29ce91a7-b668-41d2-a160-26402551fb0b")
191191
)
192192

193-
val reader = new FlowReader(atumPartitions)
193+
val reader = FlowReader(atumPartitions)
194194
val result = reader.getCheckpointsOfNamePage("Test checkpoints 1")
195195
assert(result == Right(expectedData))
196196
}

0 commit comments

Comments
 (0)
Please sign in to comment.