diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index cec71281..b39c52df 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1 +1 @@
-* @benedeki @lsulak @TebaleloS @Zejnilovic @dk1844 @salamonpavel
+* @benedeki @lsulak @TebaleloS @Zejnilovic @dk1844 @salamonpavel @ABLL526
diff --git a/database/README.md b/database/README.md
index ebdbe080..963a16cd 100644
--- a/database/README.md
+++ b/database/README.md
@@ -32,4 +32,12 @@ to remove them
 or
 sbt flywayBaseline
 ```
 to set the current state as the baseline.
+
+To recreate the local database container from scratch and re-apply all migrations:
+```zsh
+docker kill atum_db
+docker rm atum_db
+docker run --name=atum_db -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=atum_db -p 5432:5432 -d postgres:16
+sbt flywayMigrate
+```
\ No newline at end of file
diff --git a/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql
new file mode 100644
index 00000000..ba97b2e1
--- /dev/null
+++ b/database/src/main/postgres/runs/V0.3.0.1__get_ancestors.sql
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE OR REPLACE FUNCTION runs.get_ancestors(
+    IN i_id_partitioning BIGINT,
+    IN i_limit INT DEFAULT 5,
+    IN i_offset BIGINT DEFAULT 0,
+    OUT status INTEGER,
+    OUT status_text TEXT,
+    OUT ancestorid BIGINT,
+    OUT partitioning JSONB,
+    OUT author TEXT,
+    OUT has_more BOOLEAN
+) RETURNS SETOF record AS
+$$
+-------------------------------------------------------------------------------
+--
+-- Function: runs.get_ancestors(3)
+--      Returns the ancestor partitionings of the given partitioning
+--
+-- Parameters:
+--      i_id_partitioning   - id of the partitioning whose ancestors are requested
+--      i_limit             - (optional) maximum number of partitionings to return, default is 5
+--      i_offset            - (optional) offset to use for pagination, default is 0
+--
+-- Returns:
+--      status              - Status code
+--      status_text         - Status message
+--      ancestorid          - id of the ancestor partitioning
+--      partitioning        - partitioning data of the ancestor
+--      author              - author of the ancestor partitioning
+--      has_more            - flag indicating whether more ancestors are available
+--
+-- Status codes:
+--      11 - OK
+--      41 - Partitioning not found
+--      42 - Ancestor Partitioning not found
+--
+-------------------------------------------------------------------------------
+DECLARE
+    partitionCreatedAt TIMESTAMP;
+    _has_more          BOOLEAN;
+BEGIN
+    -- Check if the partitioning exists
+    PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_id_partitioning;
+    IF NOT FOUND THEN
+        status := 41;
+        status_text := 'Partitioning not found';
+        RETURN NEXT;
+        RETURN;
+    END IF;
+
+    -- Get the creation timestamp of the partitioning
+    SELECT created_at
+    FROM runs.partitionings
+    WHERE id_partitioning = i_id_partitioning
+    INTO partitionCreatedAt;
+
+    -- Check if there are more ancestors than the requested page covers;
+    -- the count mirrors the ancestor query below
+    SELECT count(DISTINCT P.id_partitioning) > i_limit + i_offset
+    FROM runs.partitionings P
+    INNER JOIN flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning
+    WHERE PF.fk_flow IN (
+        SELECT fk_flow
+        FROM flows.partitioning_to_flow
+        WHERE fk_partitioning = i_id_partitioning
+    )
+    AND P.created_at < partitionCreatedAt
+    INTO _has_more;
+
+    -- Return the ancestors
+    RETURN QUERY
+    SELECT
+        11 AS status,
+        'OK' AS status_text,
+        P.id_partitioning AS ancestorid,
+        P.partitioning AS partitioning,
+        P.created_by AS author,
+        _has_more AS has_more
+    FROM
+        runs.partitionings P
+        INNER JOIN flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning
+        INNER JOIN flows.partitioning_to_flow PF2 ON PF2.fk_flow = PF.fk_flow
+    WHERE
+        PF2.fk_partitioning = i_id_partitioning
+        AND P.created_at < partitionCreatedAt
+    GROUP BY P.id_partitioning
+    ORDER BY P.id_partitioning
+    LIMIT i_limit
+    OFFSET i_offset;
+
+    IF NOT FOUND THEN
+        status := 42;
+        status_text := 'Ancestor Partitioning not found';
+        RETURN NEXT;
+    END IF;
+    RETURN;
+END;
+$$
+LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
+
+ALTER FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) OWNER TO atum_owner;
+GRANT EXECUTE ON FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) TO atum_user;
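+
+-- Example invocation (a sketch; the partitioning id 1010 is hypothetical):
+-- SELECT status, status_text, ancestorid, author, has_more
+-- FROM runs.get_ancestors(1010, 10, 0);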
diff --git a/database/src/main/postgres/runs/V0.3.0.2__add_parent.sql b/database/src/main/postgres/runs/V0.3.0.2__add_parent.sql
new file mode 100644
index 00000000..d893bc68
--- /dev/null
+++ b/database/src/main/postgres/runs/V0.3.0.2__add_parent.sql
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE OR REPLACE FUNCTION runs.add_parent(
+    IN i_id_partitioning BIGINT,
+    IN i_id_parent_partitioning BIGINT,
+    IN i_by_user TEXT,
+    IN i_copy_measurements BOOLEAN DEFAULT true,
+    IN i_copy_additional_data BOOLEAN DEFAULT true,
+    OUT status INTEGER,
+    OUT status_text TEXT
+) RETURNS record AS
+$$
+-------------------------------------------------------------------------------
+--
+-- Function: runs.add_parent(5)
+--      Adds a parent partitioning to the given partitioning and optionally
+--      copies the parent's measurement definitions and additional data.
+--
+-- Parameters:
+--      i_id_partitioning          - id of the partitioning to be changed
+--      i_id_parent_partitioning   - id of the new parent of the partitioning
+--      i_by_user                  - user behind the change
+--      i_copy_measurements        - (optional) whether to copy measurement definitions from the parent, default is true
+--      i_copy_additional_data     - (optional) whether to copy additional data from the parent, default is true
+--
+-- Returns:
+--      status             - Status code
+--      status_text        - Status message
+--
+-- Status codes:
+--      11 - Parent Updated
+--      41 - Child Partitioning not found
+--      42 - Parent Partitioning not found
+--
+-------------------------------------------------------------------------------
+DECLARE
+    Additional_Data HSTORE;
+BEGIN
+    PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_id_partitioning;
+    IF NOT FOUND THEN
+        status := 41;
+        status_text := 'Child Partitioning not found';
+        RETURN;
+    END IF;
+
+    PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_id_parent_partitioning;
+    IF NOT FOUND THEN
+        status := 42;
+        status_text := 'Parent Partitioning not found';
+        RETURN;
+    END IF;
+
+    IF i_copy_additional_data THEN
+        SELECT
+            hstore(array_agg(PAD.ad_name), array_agg(PAD.ad_value)) AS Additional_Data
+        FROM
+            runs.get_partitioning_additional_data(i_id_parent_partitioning) AS PAD
+        INTO
+            Additional_Data;
+        PERFORM 1 FROM runs.create_or_update_additional_data(i_id_partitioning, Additional_Data, i_by_user);
+    END IF;
+
+    IF i_copy_measurements THEN
+        INSERT INTO runs.measure_definitions (fk_partitioning, measure_name, measured_columns, created_by)
+        SELECT i_id_partitioning, PMI.measure_name, PMI.measured_columns, i_by_user
+        FROM
+            runs.get_partitioning_measures_by_id(i_id_parent_partitioning) AS PMI;
+    END IF;
+
+    PERFORM 1 FROM flows._add_to_parent_flows(i_id_parent_partitioning, i_id_partitioning, i_by_user);
+    status := 11;
+    status_text := 'Parent Updated';
+    RETURN;
+END;
+$$
+LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
+
+ALTER FUNCTION runs.add_parent(BIGINT, BIGINT, TEXT, BOOLEAN, BOOLEAN) OWNER TO atum_owner;
+GRANT EXECUTE ON FUNCTION runs.add_parent(BIGINT, BIGINT, TEXT, BOOLEAN, BOOLEAN) TO atum_user;
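+
+-- Example invocations (a sketch; the ids 1010 and 1001 are hypothetical):
+-- SELECT status, status_text FROM runs.add_parent(1010, 1001, 'some_user');
+--
+-- The copy flags can be disabled explicitly:
+-- SELECT status, status_text FROM runs.add_parent(1010, 1001, 'some_user', false, false);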
diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/AddParentIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/AddParentIntegrationTests.scala
new file mode 100644
index 00000000..fde42577
--- /dev/null
+++ b/database/src/test/scala/za/co/absa/atum/database/runs/AddParentIntegrationTests.scala
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package za.co.absa.atum.database.runs + +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString +import za.co.absa.balta.classes.setter.CustomDBType + +class AddParentIntegrationTests extends DBTestSuite { + + private val updateParentFn = "runs.add_parent" + private val createPartitioningFn = "runs.create_partitioning" + private val fncGetPartitioningAdditionalData = "runs.get_partitioning_additional_data" + private val fncGetPartitioningMeasuresById = "runs.get_partitioning_measures_by_id" + + private val partitioning = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValuesMap": { + | "key1": "valueX", + | "key2": "valueY", + | "key3": "valueZ", + | "key4": "valueA" + | } + |} + |""".stripMargin + ) + + private val parentPartitioning = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3"], + | "keysToValuesMap": { + | "key1": "valueX", + | "key3": "valueZ" + | } + |} + |""".stripMargin + ) + + private val parentPartitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3"], + | "keysToValuesMap": { + | "key1": "valueW", + | "key3": "valueY" + | } + |} + |""".stripMargin + ) + + test("Child Partitioning not found") { + val nonExistentID = 9999L + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + function(updateParentFn) + .setParam("i_id_partitioning", nonExistentID) + .setParam("i_id_parent_partitioning", parentPartitioningID) + .setParam("i_by_user", "Fantômas") + .setParam("i_copy_measurements", true) + .setParam("i_copy_additional_data", true) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Child Partitioning not found")) + assert(!queryResult.hasNext) + } + + } + + test("Parent Partitioning not found") { + val nonExistentID = 9999L + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partitioningID = function(createPartitioningFn) + .setParam("i_partitioning", partitioning) + .setParam("i_by_user", "Fantômas") + .setParam("i_parent_partitioning_id", parentPartitioningID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + function(updateParentFn) + .setParam("i_id_partitioning", partitioningID) + .setParam("i_id_parent_partitioning", nonExistentID) + .setParam("i_by_user", "Fantômas") + .setParam("i_copy_measurements", true) + .setParam("i_copy_additional_data", true) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(42)) + 
assert(row.getString("status_text").contains("Parent Partitioning not found")) + assert(!queryResult.hasNext) + } + + } + + test("Parent Partitioning Updated no additional data and no measurements") { + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val parentPartitioningID2 = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning2) + .setParam("i_by_user", "Tomas Riddle") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partitioningID = function(createPartitioningFn) + .setParam("i_partitioning", partitioning) + .setParam("i_by_user", "Fantômas") + .setParam("i_parent_partitioning_id", parentPartitioningID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + function(updateParentFn) + .setParam("i_id_partitioning", partitioningID) + .setParam("i_id_parent_partitioning", parentPartitioningID) + .setParam("i_by_user", "Happy Nappy") + .setParam("i_copy_measurements", false) + .setParam("i_copy_additional_data", false) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Parent Updated")) + assert(!queryResult.hasNext) + } + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID2)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2 + ) + } + + test("Parent Partitioning Updated with additional data and no measurements") { + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val parentPartitioningID2 = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning2) + .setParam("i_by_user", "Tomas Riddle") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partitioningID = function(createPartitioningFn) + .setParam("i_partitioning", partitioning) + .setParam("i_by_user", "Fantômas") + .setParam("i_parent_partitioning_id", parentPartitioningID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + 
assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + table("runs.additional_data").insert( + add("fk_partitioning", parentPartitioningID) + .add("created_by", "Joseph") + .add("ad_name", "ad_1") + .add("ad_value", "This is the additional data for Joseph") + ) + + table("runs.additional_data").insert( + add("fk_partitioning", parentPartitioningID) + .add("created_by", "Joseph") + .add("ad_name", "ad_2") + .add("ad_value", "This is the additional data for Joseph") + ) + + function(updateParentFn) + .setParam("i_id_partitioning", partitioningID) + .setParam("i_id_parent_partitioning", parentPartitioningID) + .setParam("i_by_user", "Happy Nappy") + .setParam("i_copy_measurements", false) + .setParam("i_copy_additional_data", true) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Parent Updated")) + assert(!queryResult.hasNext) + } + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID2)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2 + ) + + function(fncGetPartitioningAdditionalData) + .setParam("i_partitioning_id", partitioningID) + .execute { queryResult => + val results = queryResult.next() + assert(results.getInt("status").contains(11)) + assert(results.getString("status_text").contains("OK")) + assert(results.getString("ad_name").contains("ad_1")) + assert(results.getString("ad_value").contains("This is the additional data for Joseph")) + assert(results.getString("ad_author").contains("Happy Nappy")) + + val results2 = queryResult.next() + assert(results2.getInt("status").contains(11)) + assert(results2.getString("status_text").contains("OK")) + assert(results2.getString("ad_name").contains("ad_2")) + assert(results2.getString("ad_value").contains("This is the additional data for Joseph")) + assert(results2.getString("ad_author").contains("Happy Nappy")) + + assert(!queryResult.hasNext) + } + + + } + + test("Parent Partitioning Updated with additional data and with measurements") { + + val parentPartitioningID = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning) + .setParam("i_by_user", "Albert Einstein") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val parentPartitioningID2 = function(createPartitioningFn) + .setParam("i_partitioning", parentPartitioning2) + .setParam("i_by_user", "Tomas Riddle") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } + + val partitioningID = function(createPartitioningFn) + .setParam("i_partitioning", partitioning) + .setParam("i_by_user", "Fantômas") + .setParam("i_parent_partitioning_id", parentPartitioningID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created 
with parent partitioning")) + row.getLong("id_partitioning").get + } + + table("runs.additional_data").insert( + add("fk_partitioning", parentPartitioningID) + .add("created_by", "Joseph") + .add("ad_name", "ad_1") + .add("ad_value", "This is the additional data for Joseph") + ) + + table("runs.measure_definitions").insert( + add("fk_partitioning", parentPartitioningID) + .add("created_by", "Joseph") + .add("measure_name", "measure1") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + ) + + table("runs.measure_definitions").insert( + add("fk_partitioning", parentPartitioningID) + .add("created_by", "Joseph") + .add("measure_name", "measure2") + .add("measured_columns", CustomDBType("""{"col2"}""", "TEXT[]")) + ) + + function(updateParentFn) + .setParam("i_id_partitioning", partitioningID) + .setParam("i_id_parent_partitioning", parentPartitioningID) + .setParam("i_by_user", "Happy Nappy") + .setParam("i_copy_measurements", true) + .setParam("i_copy_additional_data", true) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Parent Updated")) + assert(!queryResult.hasNext) + } + + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID2)) == 1 + ) + assert( + table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2 + ) + + function(fncGetPartitioningAdditionalData) + .setParam("i_partitioning_id", partitioningID) + .execute { queryResult => + val results = queryResult.next() + assert(results.getInt("status").contains(11)) + assert(results.getString("status_text").contains("OK")) + assert(results.getString("ad_name").contains("ad_1")) + assert(results.getString("ad_value").contains("This is the additional data for Joseph")) + assert(results.getString("ad_author").contains("Happy Nappy")) + } + + function(fncGetPartitioningMeasuresById) + .setParam("i_partitioning_id", partitioningID) + .execute { queryResult => + val results = queryResult.next() + assert(results.getInt("status").contains(11)) + assert(results.getString("status_text").contains("OK")) + assert(results.getString("measure_name").contains("measure1")) + assert(results.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col1"))) + + val results2 = queryResult.next() + assert(results2.getInt("status").contains(11)) + assert(results2.getString("status_text").contains("OK")) + assert(results2.getString("measure_name").contains("measure2")) + assert(results2.getArray[String]("measured_columns").map(_.toSeq).contains(Seq("col2"))) + } + } +} diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala new file mode 100644 index 00000000..fcf7cb91 --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala @@ -0,0 +1,533 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.runs + +import io.circe.parser.parse +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +import java.time.OffsetDateTime + +class GetAncestorsIntegrationTests extends DBTestSuite { + + private val getAncestorsFn = "runs.get_ancestors" + private val partitioningsTable = "runs.partitionings" + private val createPartitioningFn = "runs.create_partitioning" + + private val createFlowFn = "flows._create_flow" + private val addToParentFlowsFn = "flows._add_to_parent_flows" + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB", "keyC"], + | "keysToValuesMap": { + | "keyA": "valueA", + | "keyB": "valueB", + | "keyC": "valueC" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning1 = parse(partitioning1.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyD", "keyE", "keyF"], + | "keysToValuesMap": { + | "keyD": "valueD", + | "keyE": "valueE", + | "keyF": "valueF" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning2 = parse(partitioning2.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning3 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyG", "keyH", "keyI"], + | "keysToValuesMap": { + | "keyG": "valueG", + | "keyH": "valueH", + | "keyI": "valueI" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning3 = parse(partitioning3.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning4 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyJ", "keyK", "keyL"], + | "keysToValuesMap": { + | "keyJ": "valueJ", + | "keyK": "valueK", + | "keyL": "valueL" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning4 = parse(partitioning4.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning5 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyM", "keyN", "keyO"], + | "keysToValuesMap": { + | "keyM": "valueM", + | "keyN": "valueN", + | "keyO": "valueO" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning5 = parse(partitioning5.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning6 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyP", "keyQ", "keyR"], + | "keysToValuesMap": { + | "keyP": "valueP", + | "keyQ": "valueQ", + | "keyR": "valueR" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning6 = parse(partitioning6.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning7 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyS", "keyT", "keyU"], + | "keysToValuesMap": { + | "keyS": "valueS", + | "keyT": "valueT", + | "keyU": "valueU" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning7 = parse(partitioning7.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning8 = JsonBString( + """ + |{ + | "version": 1, + | 
"keys": ["keyV", "keyW", "keyX"], + | "keysToValuesMap": { + | "keyV": "valueV", + | "keyW": "valueW", + | "keyX": "valueX" + | } + |} + |""".stripMargin + ) + + var flowIdOfPartitioning1: Long = _ + var flowIdOfPartitioning2: Long = _ + var flowIdOfPartitioning3: Long = _ + + test("Returns Ancestors for a given Partition ID") { + + val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") + val Time2 = OffsetDateTime.parse("1992-08-04T10:00:00Z") + val Time3 = OffsetDateTime.parse("1992-08-05T10:00:00Z") + val Time4 = OffsetDateTime.parse("1992-08-06T10:00:00Z") + val Time5 = OffsetDateTime.parse("1992-08-07T10:00:00Z") + val Time6 = OffsetDateTime.parse("1992-08-08T10:00:00Z") + val Time7 = OffsetDateTime.parse("1992-08-09T10:00:00Z") + val Time8 = OffsetDateTime.parse("1992-08-09T11:00:00Z") + + table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Grandpa").add("created_at", Time1)) + table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Father").add("created_at", Time2)) + table(partitioningsTable).insert(add("partitioning", partitioning3).add("created_by", "Son").add("created_at", Time3)) + table(partitioningsTable).insert(add("partitioning", partitioning4).add("created_by", "Grandson").add("created_at", Time4)) + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "Grandma").add("created_at", Time5)) + table(partitioningsTable).insert(add("partitioning", partitioning6).add("created_by", "Mother").add("created_at", Time6)) + table(partitioningsTable).insert(add("partitioning", partitioning7).add("created_by", "Daughter").add("created_at", Time7)) + table(partitioningsTable).insert(add("partitioning", partitioning8).add("created_by", "Granddaughter").add("created_at", Time8)) + + val partId1: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + val partId2: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning2, "id_partitioning").get.get + + val partId3: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning3, "id_partitioning").get.get + + val partId4: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning4, "id_partitioning").get.get + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + val partId6: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning6, "id_partitioning").get.get + + val partId7: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning7, "id_partitioning").get.get + + val partId8: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning8, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId2) + .setParam("i_by_user", "Father") + .execute { queryResult => + flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId6) + .setParam("i_by_user", "Daughter") + .execute { queryResult => + flowIdOfPartitioning3 = queryResult.next().getLong("id_flow").get + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId1) + .setParam("i_fk_partitioning", partId3) + .setParam("i_by_user", 
"Son") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId2) + .setParam("i_fk_partitioning", partId4) + .setParam("i_by_user", "Grandson") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId6) + .setParam("i_fk_partitioning", partId7) + .setParam("i_by_user", "GrandDaughter") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId3) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId4) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId5) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId7) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + //TEST 1 Ancestors Partition + function(getAncestorsFn) + .setParam("i_id_partitioning", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + assert(row.getString("author").contains("Grandpa")) + } + + //TEST multiple Ancestors Partitions + function(getAncestorsFn) + .setParam("i_id_partitioning", partId5) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == 
expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + } + + //TEST Separate flow for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_id_partitioning", partId7) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + } + + //TEST ALL flows for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_id_partitioning", partId8) + .setParam("i_limit", 10) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + 
assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId5)) + assert(returnedPartitioningParsed == expectedPartitioning5) + assert(row.getString("author").contains("Grandma")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId7)) + assert(returnedPartitioningParsed == expectedPartitioning7) + assert(row.getString("author").contains("Daughter")) + } + } + + test("Change in Parent") { + + val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") + + table("runs.partitionings").insert( + add("partitioning", partitioning1) + .add("created_by", "GrandPa") + .add("created_at", Time1) + ) + + val partId1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "GrandPa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + val partId3 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning3) + .setParam("i_parent_partitioning_id", partId1) + .setParam("i_by_user", "Father") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + function(getAncestorsFn) + .setParam("i_id_partitioning", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + 
assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("GrandPa")) + } + } + + test("Child Partitioning not found") { + val nonExistentID = 9999L + + function(getAncestorsFn) + .setParam("i_id_partitioning", nonExistentID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Partitioning not found")) + assert(row.getJsonB("ancestorid").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } + + test("Ancestor Partitioning not found") { + + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "NO_Ancestor")) + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(getAncestorsFn) + .setParam("i_id_partitioning", partId5) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(42)) + assert(row.getString("status_text").contains("Ancestor Partitioning not found")) + assert(row.getJsonB("ancestorid").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } +} diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/ParentPatchV2DTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/ParentPatchV2DTO.scala new file mode 100644 index 00000000..f89ddb81 --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/dto/ParentPatchV2DTO.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package za.co.absa.atum.model.dto
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+case class ParentPatchV2DTO(
+  parentPartitioningId: Long
+)
+
+object ParentPatchV2DTO {
+  implicit val decoderParentPatchV2DTO: Decoder[ParentPatchV2DTO] = deriveDecoder
+  implicit val encoderParentPatchV2DTO: Encoder[ParentPatchV2DTO] = deriveEncoder
+}
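+
+// Example wire format (a sketch; the id 1001 is hypothetical; encoding via io.circe.syntax.asJson):
+//   ParentPatchV2DTO(parentPartitioningId = 1001L).asJson
+//   yields {"parentPartitioningId": 1001}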
diff --git a/publish.sbt b/publish.sbt
index 486192d5..9bd2dadf 100644
--- a/publish.sbt
+++ b/publish.sbt
@@ -58,6 +58,12 @@ ThisBuild / developers := List(
     email = "tebalelo.sekhula@absa.africa",
     url = url("https://github.com/TebaleloS")
   ),
+  Developer(
+    id = "ABLL526",
+    name = "Liam Leibrandt",
+    email = "liam.leibrandt@absa.africa",
+    url = url("https://github.com/ABLL526")
+  ),
   Developer(
     id = "salamonpavel",
     name = "Pavel Salamon",
diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala
index 435bcbc4..65242ac5 100644
--- a/server/src/main/scala/za/co/absa/atum/server/Main.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala
@@ -66,6 +66,8 @@ object Main extends ZIOAppDefault with Server {
       GetPartitioning.layer,
       GetFlowPartitionings.layer,
       GetPartitioningMainFlow.layer,
+      GetAncestors.layer,
+      //PatchPartitioningParent.layer,
       PostgresDatabaseProvider.layer,
       TransactorProvider.layer,
       AwsSecretsProviderImpl.layer,
diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala
index 17600b7e..95788dad 100644
--- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala
@@ -32,6 +32,12 @@ trait PartitioningController {
     partitioningSubmitDTO: PartitioningSubmitV2DTO
   ): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)]
 
+//  def patchPartitioningParentV2(
+//    partitioningId: Long,
+//    parentPartitioningID: Long,
+//    byUser: String
+//  ): IO[ErrorResponse, SingleSuccessResponse[ParentPatchV2DTO]]
+
   def getPartitioningAdditionalData(
     partitioningId: Long
   ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]]
@@ -60,4 +66,11 @@ trait PartitioningController {
   def getPartitioningMainFlow(
     partitioningId: Long
   ): IO[ErrorResponse, SingleSuccessResponse[FlowDTO]]
+
+  def getAncestors(
+    partitioningId: Long,
+    limit: Option[Int],
+    offset: Option[Long]
+  ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]]
+
 }
diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala
index 20071545..68a80b0e 100644
--- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala
@@ -99,6 +99,18 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
     } yield (response, uri)
   }
 
+//  override def patchPartitioningParentV2(
+//    partitioningId: Long,
+//    parentPartitioningID: Long,
+//    byUser: String
+//  ): IO[ErrorResponse, SingleSuccessResponse[ParentPatchV2DTO]] = {
+//    mapToSingleSuccessResponse(
+//      serviceCall[ParentPatchV2DTO, ParentPatchV2DTO](
+//        partitioningService.patchPartitioningParent(partitioningId, parentPartitioningID, byUser)
+//      )
+//    )
+//  }
+
   override def patchPartitioningAdditionalDataV2(
     partitioningId: Long,
     additionalDataPatchDTO: AdditionalDataPatchDTO
@@ -160,6 +172,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
     )
   }
 
+  override def getAncestors(
+    partitioningId: Long,
+    limit: Option[Int],
+    offset: Option[Long]
+  ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = {
+    mapToPaginatedResponse(
+      limit.getOrElse(10),
+      offset.getOrElse(0L),
+      serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]](
+        partitioningService.getAncestors(partitioningId, limit, offset)
+      )
+    )
+  }
+
 }
 
 object PartitioningControllerImpl {
diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala
new file mode 100644
index 00000000..60670f33
--- /dev/null
+++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestors.scala
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package za.co.absa.atum.server.api.database.runs.functions + +import doobie.implicits.toSqlInterpolator +import io.circe.{DecodingFailure, Json} +import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningWithIdDTO} +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.Runs +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors._ +import za.co.absa.atum.server.model.PartitioningForDB +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio.{Task, URLayer, ZIO, ZLayer} + +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet + +import scala.annotation.tailrec + +class GetAncestors(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[GetAncestorsArgs, Option[ + GetAncestorsResult + ], Task](args => + Seq( + fr"${args.partitioningId}", + fr"${args.limit}", + fr"${args.offset}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ancestorid", "partitioning", "author", "has_more") +} + +object GetAncestors { + case class GetAncestorsArgs(partitioningId: Long, limit: Option[Int], offset: Option[Long]) + case class GetAncestorsResult(ancestorid: Long, partitioningJson: Json, author: String, hasMore: Boolean) + + object GetAncestorsResult { + + @tailrec def resultsToPartitioningWithIdDTOs( + results: Seq[GetAncestorsResult], + acc: Seq[PartitioningWithIdDTO] + ): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = { + if (results.isEmpty) Right(acc) + else { + val head = results.head + val tail = results.tail + val decodingResult = head.partitioningJson.as[PartitioningForDB] + decodingResult match { + case Left(decodingFailure) => Left(decodingFailure) + case Right(partitioningForDB) => + val partitioningDTO = partitioningForDB.keys.map { key => + PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) + } + resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.ancestorid, partitioningDTO, head.author)) + } + } + } + + } + + val layer: URLayer[PostgresDatabaseProvider, GetAncestors] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetAncestors()(Runs, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParent.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParent.scala new file mode 100644 index 00000000..92c91a77 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParent.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.server.api.database.runs.functions
+
+import doobie.implicits.toSqlInterpolator
+import za.co.absa.atum.model.dto.ParentPatchV2DTO
+import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
+import za.co.absa.atum.server.api.database.runs.Runs
+import za.co.absa.atum.server.api.database.runs.functions.PatchPartitioningParent.PatchPartitioningParentArgs
+import za.co.absa.db.fadb.DBSchema
+import za.co.absa.db.fadb.doobie.DoobieEngine
+import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
+import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
+import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
+import zio.{Task, URLayer, ZIO, ZLayer}
+
+class PatchPartitioningParent(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
+    extends DoobieSingleResultFunctionWithStatus[PatchPartitioningParentArgs, Option[ParentPatchV2DTO], Task](args =>
+      Seq(
+        fr"${args.partitioningId}",
+        fr"${args.parentPartitioningID}",
+        fr"${args.byUser}"
+      )
+    )
+    with StandardStatusHandling
+    with ByFirstErrorStatusAggregator {
+
+  override def fieldsToSelect: Seq[String] =
+    super.fieldsToSelect ++ Seq("parent_id")
+}
+
+object PatchPartitioningParent {
+  case class PatchPartitioningParentArgs(partitioningId: Long, parentPartitioningID: Long, byUser: String)
+  val layer: URLayer[PostgresDatabaseProvider, PatchPartitioningParent] = ZLayer {
+    for {
+      dbProvider <- ZIO.service[PostgresDatabaseProvider]
+    } yield new PatchPartitioningParent()(Runs, dbProvider.dbEngine)
+  }
+}
diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala
index 3e590308..37b78d16 100644
--- a/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/api/http/ApiPaths.scala
@@ -40,6 +40,8 @@ object ApiPaths {
     final val Flows = "flows"
     final val Measures = "measures"
     final val MainFlow = "main-flow"
+    final val Ancestors = "ancestors"
+    final val Parents = "parents"
   }
 }
diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala
index 2c078918..ff0bd69f 100644
--- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala
@@ -76,6 +76,18 @@ trait Endpoints extends BaseEndpoints {
       .errorOutVariantPrepend(conflictErrorOneOfVariant)
   }
 
+//  protected val patchPartitioningParentEndpointV2
+//    : PublicEndpoint[(Long, Long, String), ErrorResponse, SingleSuccessResponse[
+//      ParentPatchV2DTO
+//    ], Any] = {
+//    apiV2.patch
+//      .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Parents / path[Long]("parentPartitioningId"))
+//      .in(query[String]("byUser"))
+//      .out(statusCode(StatusCode.Ok))
+//      .out(jsonBody[SingleSuccessResponse[ParentPatchV2DTO]])
+//      .errorOutVariantPrepend(notFoundErrorOneOfVariant)
+//  }
+
   protected val getPartitioningAdditionalDataEndpointV2
     : PublicEndpoint[Long, ErrorResponse, SingleSuccessResponse[AdditionalDataDTO], Any] = {
     apiV2.get
@@ -185,6 +197,23 @@ trait Endpoints extends BaseEndpoints {
       .errorOutVariantPrepend(errorInDataOneOfVariant)
   }
 
+  protected val getAncestorsEndpointV2
+    : PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[
+      PartitioningWithIdDTO
+    ], Any] = {
+    apiV2.get
+      .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Ancestors)
+      .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000)))
+      .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L)))
+      .out(statusCode(StatusCode.Ok))
+      .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]])
+      .errorOutVariantPrepend(notFoundErrorOneOfVariant)
+  }
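+
+  // Example request (a sketch; host, port and the "api/v2" prefix are assumptions):
+  //   GET http://localhost:8080/api/v2/partitionings/1010/ancestors?limit=10&offset=0
+  //   200 OK -> PaginatedResponse[PartitioningWithIdDTO]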
Any] = { + apiV2.get + .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.Ancestors) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index cc13fffa..15b6e0dd 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -24,7 +24,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, ParentPatchV2DTO, PartitioningWithIdDTO} import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} @@ -111,6 +111,26 @@ trait Routes extends Endpoints with ServerOptions { PartitioningController.getFlowPartitionings(flowId, limit, offset) } ), + createServerEndpoint[ + (Long, Option[Int], Option[Long]), + ErrorResponse, + PaginatedResponse[PartitioningWithIdDTO] + ]( + getAncestorsEndpointV2, + { case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getAncestors(partitioningId, limit, offset) + } + ), +// createServerEndpoint[ +// (Long, Long, String), +// ErrorResponse, +// SingleSuccessResponse[ParentPatchV2DTO] +// ]( +// patchPartitioningParentEndpointV2, +// { case (partitioningId: Long, parentPartitioningId: Long, byUser: String) => +// PartitioningController.patchPartitioningParentV2(partitioningId, parentPartitioningId, byUser) +// } +// ), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.succeed(StatusResponse.up)) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes @@ -130,6 +150,8 @@ trait Routes extends Endpoints with ServerOptions { // getPartitioningCheckpointEndpointV2, // getPartitioningMeasuresEndpointV2, getPartitioningEndpointV2, + getAncestorsEndpointV2, + // patchPartitioningParentEndpointV2, // getPartitioningMeasuresEndpointV2, // getFlowPartitioningsEndpointV2, // getPartitioningMainFlowEndpointV2, @@ -149,16 +171,16 @@ trait Routes extends Endpoints with ServerOptions { } private def createServerEndpoint[I, E, O]( - endpoint: PublicEndpoint[I, E, O, Any], - logic: I => ZIO[HttpEnv.Env, E, O] - ): ZServerEndpoint[HttpEnv.Env, Any] = { + endpoint: PublicEndpoint[I, E, O, Any], + logic: I => ZIO[HttpEnv.Env, E, O] + ): ZServerEndpoint[HttpEnv.Env, Any] = { endpoint.zServerLogic(logic).widen[HttpEnv.Env] } protected def allRoutes( - httpMonitoringConfig: HttpMonitoringConfig, - jvmMonitoringConfig: JvmMonitoringConfig - ): HttpRoutes[HttpEnv.F] = { + 
httpMonitoringConfig: HttpMonitoringConfig, + jvmMonitoringConfig: JvmMonitoringConfig + ): HttpRoutes[HttpEnv.F] = { createAllServerRoutes(httpMonitoringConfig) <+> createSwaggerRoutes <+> (if (httpMonitoringConfig.enabled) http4sMetricsRoutes else HttpRoutes.empty[HttpEnv.F]) <+> diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index e38d8bb1..4bc1a015 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -55,5 +55,17 @@ trait PartitioningRepository { offset: Option[Long] ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] + def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] + def getPartitioningMainFlow(partitioningId: Long): IO[DatabaseError, FlowDTO] + +// def patchPartitioningParent( +// partitioningId: Long, +// parentPartitioningID: Long, +// byUser: String +// ): IO[DatabaseError, ParentPatchV2DTO] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 6df3887e..e7f0552f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._ +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors._ import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.database.flows.functions._ @@ -38,7 +39,10 @@ class PartitioningRepositoryImpl( getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById, getPartitioningFn: GetPartitioning, getFlowPartitioningsFn: GetFlowPartitionings, + getAncestorsFn: GetAncestors, + //patchPartitioningParentFn: PatchPartitioningParent, getPartitioningMainFlowFn: GetPartitioningMainFlow + ) extends PartitioningRepository with BaseRepository { @@ -148,6 +152,28 @@ class PartitioningRepositoryImpl( } } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getAncestorsFn(GetAncestorsArgs(partitioningId, limit, offset)), + "getAncestors" + ).map(_.flatten) + .flatMap { partitioningResults => + ZIO + .fromEither(GetAncestorsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + partitionings => { + if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings) + else ResultNoMore(partitionings) + } + ) + } + } + override def getPartitioningMainFlow(partitioningId: Long): IO[DatabaseError, FlowDTO] = { dbSingleResultCallWithStatus( getPartitioningMainFlowFn(partitioningId), @@ -158,6 +184,20 @@ class PartitioningRepositoryImpl( } } +// override def patchPartitioningParent( +// 
partitioningId: Long,
+//    parentPartitioningId: Long,
+//    byUser: String
+//  ): IO[DatabaseError, ParentPatchV2DTO] = {
+//    dbSingleResultCallWithStatus(
+//      patchPartitioningParentFn(PatchPartitioningParentArgs(partitioningId, parentPartitioningId, byUser)),
+//      "patchPartitioningParent"
+//    ).flatMap {
+//      case Some(parentPatchV2DTO) => ZIO.succeed(parentPatchV2DTO)
+//      case None => ZIO.fail(GeneralDatabaseError("Unexpected error."))
+//    }
+//  }
+
 }
 
 object PartitioningRepositoryImpl {
@@ -171,7 +211,10 @@ object PartitioningRepositoryImpl {
       with GetPartitioningMeasuresById
       with GetPartitioning
       with GetFlowPartitionings
+      with GetAncestors
+      //with PatchPartitioningParent
       with GetPartitioningMainFlow,
+
     PartitioningRepository
   ] = ZLayer {
     for {
@@ -185,6 +228,8 @@ object PartitioningRepositoryImpl {
       getPartitioning <- ZIO.service[GetPartitioning]
       getFlowPartitionings <- ZIO.service[GetFlowPartitionings]
       getPartitioningMainFlow <- ZIO.service[GetPartitioningMainFlow]
+      getAncestors <- ZIO.service[GetAncestors]
+      //patchPartitioningParent <- ZIO.service[PatchPartitioningParent]
     } yield new PartitioningRepositoryImpl(
       createPartitioningIfNotExists,
       createPartitioning,
@@ -195,7 +240,10 @@ object PartitioningRepositoryImpl {
       getPartitioningMeasuresById,
       getPartitioning,
       getFlowPartitionings,
+      getAncestors,
+      //patchPartitioningParent,
       getPartitioningMainFlow
+
     )
   }
diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala
index 184f77fc..ec87239a 100644
--- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala
@@ -30,6 +30,12 @@ trait PartitioningService {
     partitioningSubmitDTO: PartitioningSubmitV2DTO
   ): IO[ServiceError, PartitioningWithIdDTO]
 
+//  def patchPartitioningParent(
+//    partitioningId: Long,
+//    parentPartitioningId: Long,
+//    byUser: String
+//  ): IO[ServiceError, ParentPatchV2DTO]
+
   def getPartitioningMeasures(partitioning: PartitioningDTO): IO[ServiceError, Seq[MeasureDTO]]
 
   def getPartitioningAdditionalData(partitioningId: Long): IO[ServiceError, AdditionalDataDTO]
@@ -54,4 +60,10 @@ trait PartitioningService {
   ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]]
 
   def getPartitioningMainFlow(partitioningId: Long): IO[ServiceError, FlowDTO]
+
+  def getAncestors(
+    partitioningId: Long,
+    limit: Option[Int],
+    offset: Option[Long]
+  ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]]
 }
diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala
index cd0a5241..173206b5 100644
--- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala
+++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala
@@ -42,6 +42,17 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
     )
   }
 
+//  override def patchPartitioningParent(
+//    partitioningId: Long,
+//    parentPartitioningId: Long,
+//    byUser: String
+//  ): IO[ServiceError, ParentPatchV2DTO] = {
+//    repositoryCall(
+//      partitioningRepository.patchPartitioningParent(partitioningId, parentPartitioningId, byUser),
+//      "patchPartitioningParent"
+//    )
+//  }
+
   override def getPartitioningMeasures(partitioning: PartitioningDTO): IO[ServiceError, Seq[MeasureDTO]] = {
     repositoryCall(
partitioningRepository.getPartitioningMeasures(partitioning), @@ -104,6 +115,17 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } + override def getAncestors( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] = { + repositoryCall( + partitioningRepository.getAncestors(partitioningId, limit, offset), + "getAncestors" + ) + } + } object PartitioningServiceImpl { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 6d39bf2f..bbcec96b 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -22,6 +22,7 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.{ResultValueType, dto} import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult +import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsResult import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB, MeasureFromDB, PartitioningFromDB} import java.time.ZonedDateTime @@ -90,6 +91,20 @@ trait TestData { hasMore = true ) + protected val getAncestorsResult1: GetAncestorsResult = GetAncestorsResult( + ancestorid = 1L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + + protected val getAncestorsResult2: GetAncestorsResult = GetAncestorsResult( + ancestorid = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = true + ) + // Partitioning with ID DTO protected val partitioningWithIdDTO1: PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, @@ -121,6 +136,15 @@ trait TestData { protected val partitioningSubmitV2DTO3: PartitioningSubmitV2DTO = partitioningSubmitV2DTO1.copy(author = "yetAnotherAuthor") + + protected val parentPatchV2DTO01: ParentPatchV2DTO = ParentPatchV2DTO( + parentPartitioningId = 1111L + ) + + protected val parentPatchV2DTO02: ParentPatchV2DTO = ParentPatchV2DTO( + parentPartitioningId = 2222L + ) + // Flow protected val flowDTO1: FlowDTO = FlowDTO( id = 1L, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index f42da635..b6e330e1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -90,6 +90,20 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.getPartitioningMainFlow(999L)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) +// when(partitioningServiceMock.patchPartitioningParent(111L, 1111L, "Jack")) +// .thenReturn(ZIO.succeed(parentPatchV2DTO01)) +// when(partitioningServiceMock.patchPartitioningParent(222L, 2222L, "Jill")) +// .thenReturn(ZIO.fail(NotFoundServiceError("not found"))) +// when(partitioningServiceMock.patchPartitioningParent(999L, 2222L, "Bean")) +// .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + + when(partitioningServiceMock.getAncestors(1111L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + 
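+ // Sketch of the assumed controller contract (mirroring getFlowPartitionings): the service's
+ // ResultHasMore(data) is expected to surface as
+ // PaginatedResponse(data, Pagination(limit, offset, hasMore = true), requestId),
+ // and ResultNoMore(data) as the same envelope with hasMore = false.
+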
when(partitioningServiceMock.getAncestors(8888L, Some(1), Some(0))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getAncestors(9999L, Some(1), Some(0))) + .thenReturn(ZIO.fail(NotFoundServiceError("Partitioning not found"))) + private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -229,6 +243,44 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with more data available") { + for { + result <- PartitioningController.getAncestors(1111L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = true), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected InternalServerErrorResponse when service call fails with GeneralServiceError") { + assertZIO(PartitioningController.getAncestors(8888L, Some(1), Some(0)).exit)( + failsWithA[InternalServerErrorResponse] + ) + }, + test("Returns expected NotFoundErrorResponse when service call fails with NotFoundServiceError") { + assertZIO(PartitioningController.getAncestors(9999L, Some(1), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } + ), +// suite("PatchParentSuite")( +// test("Returns expected ParentPatchV2DTO") { +// for { +// result <- PartitioningController.patchPartitioningParentV2(111L, 1111L, "Jack") +// expected = SingleSuccessResponse(parentPatchV2DTO01, uuid1) +// actual = result.copy(requestId = uuid1) +// } yield assertTrue(actual == expected) +// }, +// test("Returns expected NotFoundErrorResponse") { +// assertZIO(PartitioningController.patchPartitioningParentV2(222L, 2222L, "Jill").exit)( +// failsWithA[NotFoundErrorResponse] +// ) +// }, +// test("Returns expected InternalServerErrorResponse") { +// assertZIO(PartitioningController.patchPartitioningParentV2(999L, 2222L, "Bean").exit)( +// failsWithA[InternalServerErrorResponse] +// ) +// } +// ), suite("GetPartitioningMainFlowSuite")( test("Returns expected FlowDTO") { for { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala new file mode 100644 index 00000000..dcd380e3 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetAncestorsIntegrationTests.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package za.co.absa.atum.server.api.database.runs.functions
+
+import za.co.absa.atum.server.ConfigProviderTest
+import za.co.absa.atum.server.api.TestTransactorProvider
+import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
+import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs
+import za.co.absa.db.fadb.exceptions.DataNotFoundException
+import za.co.absa.db.fadb.status.FunctionStatus
+import zio.{Scope, ZIO}
+import zio.test.{Spec, TestEnvironment, assertTrue}
+import zio.interop.catz.asyncInstance
+
+object GetAncestorsIntegrationTests extends ConfigProviderTest {
+
+  override def spec: Spec[Unit with TestEnvironment with Scope, Any] = {
+    suite("GetAncestorsIntegrationTests")(
+      test("Fails with DataNotFoundException when the given partitioning id does not exist") {
+        val partitioningID: Long = 1111L
+        for {
+          getAncestors <- ZIO.service[GetAncestors]
+          result <- getAncestors(GetAncestorsArgs(partitioningID, None, None))
+        } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found"))))
+      }
+    )
+  }.provide(
+    GetAncestors.layer,
+    PostgresDatabaseProvider.layer,
+    TestTransactorProvider.layerWithRollback
+  )
+}
diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParentV2IntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParentV2IntegrationTests.scala
new file mode 100644
index 00000000..127bfbfd
--- /dev/null
+++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/PatchPartitioningParentV2IntegrationTests.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.server.api.database.runs.functions
+
+import za.co.absa.atum.server.ConfigProviderTest
+import za.co.absa.atum.server.api.TestTransactorProvider
+import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
+import za.co.absa.atum.server.api.database.runs.functions.PatchPartitioningParent.PatchPartitioningParentArgs
+import za.co.absa.db.fadb.exceptions.DataNotFoundException
+import za.co.absa.db.fadb.status.FunctionStatus
+import zio._
+import zio.interop.catz.asyncInstance
+import zio.test._
+
+object PatchPartitioningParentV2IntegrationTests extends ConfigProviderTest {
+  override def spec: Spec[TestEnvironment with Scope, Any] = {
+    suite("PatchPartitioningParentSuite")(
+      test("Fails with DataNotFoundException when the child partitioning does not exist") {
+        val partitioningId: Long = 999L
+        val parentPartitioningId: Long = 1111L
+        val byUser: String = "Jack"
+        for {
+          patchPartitioningParent <- ZIO.service[PatchPartitioningParent]
+          result <- patchPartitioningParent(PatchPartitioningParentArgs(partitioningId, parentPartitioningId, byUser))
+        } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found"))))
+      }
+    ).provide(
+      PatchPartitioningParent.layer,
+      PostgresDatabaseProvider.layer,
+      TestTransactorProvider.layerWithRollback
+    )
+  }
+}
diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala
new file mode 100644
index 00000000..c909b3dd
--- /dev/null
+++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetAncestorsEndpointV2UnitTests.scala
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.server.api.http
+
+import org.mockito.Mockito.{mock, when}
+import sttp.client3.circe.asJson
+import sttp.client3.testing.SttpBackendStub
+import sttp.client3.{UriContext, basicRequest}
+import sttp.model.StatusCode
+import sttp.tapir.server.stub.TapirStubInterpreter
+import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint}
+import za.co.absa.atum.model.dto.PartitioningWithIdDTO
+import za.co.absa.atum.model.envelopes.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination}
+import za.co.absa.atum.server.api.TestData
+import za.co.absa.atum.server.api.controller.PartitioningController
+import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse
+import zio.test.Assertion.equalTo
+import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO}
+import zio.{Scope, ZIO, ZLayer}
+
+object GetAncestorsEndpointV2UnitTests extends ZIOSpecDefault with Endpoints with TestData {
+
+  private val partitioningControllerMock = mock(classOf[PartitioningController])
+
+  when(partitioningControllerMock.getAncestors(1111L, Some(1), Some(0)))
+    .thenReturn(
+      ZIO.succeed(
+        PaginatedResponse(Seq.empty, Pagination(1, 0, hasMore = true), uuid1)
+      )
+    )
+  when(partitioningControllerMock.getAncestors(8888L, Some(1), Some(0)))
+    .thenReturn(
+      ZIO.fail(
+        NotFoundErrorResponse("Partitioning not found")
+      )
+    )
+  when(partitioningControllerMock.getAncestors(9999L, Some(1), Some(0)))
+    .thenReturn(
+      ZIO.fail(
+        InternalServerErrorResponse("internal server error")
+      )
+    )
+
+  private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock)
+
+  private val getAncestorsServerEndpoint =
+    getAncestorsEndpointV2.zServerLogic({ case (partitioningId: Long, limit: Option[Int], offset: Option[Long]) =>
+      PartitioningController.getAncestors(partitioningId, limit, offset)
+    })
+
+  override def spec: Spec[TestEnvironment with Scope, Any] = {
+    val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController]))
+      .whenServerEndpoint(getAncestorsServerEndpoint)
+      .thenRunLogic()
+      .backend()
+
+    suite("GetAncestorsEndpointSuite")(
+      test("Returns an expected PaginatedResponse") {
+        val request = basicRequest
+          .get(uri"https://test.com/api/v2/partitionings/1111/ancestors?limit=1&offset=0")
+          .response(asJson[PaginatedResponse[PartitioningWithIdDTO]])
+
+        val response = request
+          .send(backendStub)
+
+        val body = response.map(_.body)
+        val statusCode = response.map(_.code)
+
+        assertZIO(body <&> statusCode)(
+          equalTo(
+            Right(PaginatedResponse(Seq.empty[PartitioningWithIdDTO], Pagination(1, 0, hasMore = true), uuid1)),
+            StatusCode.Ok
+          )
+        )
+      },
+      test("Returns a NotFoundErrorResponse") {
+        val request = basicRequest
+          .get(uri"https://test.com/api/v2/partitionings/8888/ancestors?limit=1&offset=0")
+          .response(asJson[NotFoundErrorResponse])
+
+        val response = request
+          .send(backendStub)
+
+        val statusCode = response.map(_.code)
+
+        assertZIO(statusCode)(equalTo(StatusCode.NotFound))
+      },
+      test("Returns an InternalServerErrorResponse") {
+        val request = basicRequest
+          .get(uri"https://test.com/api/v2/partitionings/9999/ancestors?limit=1&offset=0")
+          .response(asJson[InternalServerErrorResponse])
+
+        val response = request
+          .send(backendStub)
+
+        val statusCode = response.map(_.code)
+
+        assertZIO(statusCode)(equalTo(StatusCode.InternalServerError))
+      }
+    )
+
+  }.provide(partitioningControllerMockLayer)
+
+}
diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/PatchPartitioningParentV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/PatchPartitioningParentV2EndpointUnitTests.scala
new file mode 100644
index 00000000..f998ad3f
--- /dev/null
+++ b/server/src/test/scala/za/co/absa/atum/server/api/http/PatchPartitioningParentV2EndpointUnitTests.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.server.api.http
+
+import org.mockito.Mockito.{mock, when}
+import sttp.client3.circe.asJson
+import sttp.client3.testing.SttpBackendStub
+import sttp.model.StatusCode
+import sttp.tapir.server.stub.TapirStubInterpreter
+import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint}
+import za.co.absa.atum.model.dto.ParentPatchV2DTO
+import za.co.absa.atum.model.envelopes.{InternalServerErrorResponse, NotFoundErrorResponse}
+import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
+import za.co.absa.atum.server.api.TestData
+import za.co.absa.atum.server.api.controller.PartitioningController
+import zio.test.Assertion.equalTo
+import sttp.client3.{UriContext, basicRequest}
+import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO}
+import zio.{Scope, ZIO, ZLayer}
+
+object PatchPartitioningParentV2EndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData {
+
+  private val partitioningControllerMock = mock(classOf[PartitioningController])
+
+  when(partitioningControllerMock.patchPartitioningParentV2(111L, 1111L, "Jack"))
+    .thenReturn(
+      ZIO.succeed(
+        SingleSuccessResponse(parentPatchV2DTO01, uuid1)
+      )
+    )
+  when(partitioningControllerMock.patchPartitioningParentV2(222L, 2222L, "Jack"))
+    .thenReturn(
+      ZIO.fail(
+        NotFoundErrorResponse("Child Partitioning not found")
+      )
+    )
+  when(partitioningControllerMock.patchPartitioningParentV2(9999L, 9999L, "Bean"))
+    .thenReturn(
+      ZIO.fail(
+        InternalServerErrorResponse("internal server error")
+      )
+    )
+
+  private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock)
+
+  private val patchPartitioningParentServerEndpoint =
+    patchPartitioningParentEndpointV2.zServerLogic({ case (partitioningId: Long, parentPartitioningId: Long, byUser: String) =>
+      PartitioningController.patchPartitioningParentV2(partitioningId, parentPartitioningId, byUser)
+    })
+
+  override def spec: Spec[TestEnvironment with Scope, Any] = {
+
+    val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController]))
+      .whenServerEndpoint(patchPartitioningParentServerEndpoint)
+      .thenRunLogic()
+      .backend()
+
+    suite("PatchPartitioningParentSuite")(
+      test("Returns an expected ParentPatchV2DTO") {
+        val request = basicRequest
+          .patch(uri"https://test.com/api/v2/partitionings/111/parents/1111?byUser=Jack")
+          .response(asJson[SingleSuccessResponse[ParentPatchV2DTO]])
+
+        val response = request.send(backendStub)
+
+        val body = response.map(_.body)
+        val statusCode = response.map(_.code)
+
+        assertZIO(body <&> statusCode)(
+          equalTo(Right(SingleSuccessResponse(parentPatchV2DTO01, uuid1)), StatusCode.Ok)
+        )
+      },
+      test("Returns expected 404 when partitioning not found for a given id") {
+        val request = basicRequest
+          .patch(uri"https://test.com/api/v2/partitionings/222/parents/2222?byUser=Jack")
+          .response(asJson[SingleSuccessResponse[ParentPatchV2DTO]])
+
+        val response = request
+          .send(backendStub)
+
+        val statusCode = response.map(_.code)
+
+        assertZIO(statusCode)(equalTo(StatusCode.NotFound))
+      }
+    )
+  }.provide(partitioningControllerMockLayer)
+}
diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala
index 69f56513..14aebc15 100644
--- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala
+++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala
@@ -21,6 +21,8 @@ import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, Part
 import za.co.absa.atum.server.api.TestData
 import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings
 import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs
+import za.co.absa.atum.server.api.database.runs.functions.GetAncestors
+import za.co.absa.atum.server.api.database.runs.functions.GetAncestors.GetAncestorsArgs
 import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs
 import za.co.absa.atum.server.api.database.runs.functions._
 import za.co.absa.atum.server.api.exception.DatabaseError
@@ -147,16 +149,30 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData {
   private val getFlowPartitioningsMock = mock(classOf[GetFlowPartitionings])
 
   when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(1L, Some(10), Some(0)))
-  ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1)))))
+  ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1)))))
   when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(2L, Some(10), Some(0)))
   ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult2)))))
   when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(0L, None, None)))
     .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Flow not found"))))
   when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(3L, Some(10), Some(0)))
-  ).thenReturn(ZIO.fail(new Exception("boom!")))
+  ).thenReturn(ZIO.fail(new Exception("boom!")))
 
   private val getFlowPartitioningsMockLayer = ZLayer.succeed(getFlowPartitioningsMock)
 
+  // Get Ancestors By Id Mocks
+  private val getAncestorsMock = mock(classOf[GetAncestors])
+
+  when(getAncestorsMock.apply(GetAncestorsArgs(1L, Some(10), Some(0)))
+  ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult1)))))
+  when(getAncestorsMock.apply(GetAncestorsArgs(1111L, Some(10), Some(0)))
+  ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getAncestorsResult2)))))
+  when(getAncestorsMock.apply(GetAncestorsArgs(9999L, Some(10), Some(0)))
+  ).thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))))
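+  // Note on the assumed fa-db calling convention (same as getFlowPartitionings above): success
+  // comes back as Right(Seq(Row(FunctionStatus(11, "OK"), Some(result)))); business failures as
+  // Left(DataNotFoundException(FunctionStatus(41, ...))); raw defects (ZIO.fail(new Exception))
+  // are expected to surface as GeneralDatabaseError in the repository layer.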
+  when(getAncestorsMock.apply(GetAncestorsArgs(8888L, Some(10), Some(0)))
+  ).thenReturn(ZIO.fail(new Exception("boom!")))
+
+  private val getAncestorsMockLayer = ZLayer.succeed(getAncestorsMock)
+
   // Create Partitioning Mocks
   private val getPartitioningMainFlowMock = mock(classOf[GetPartitioningMainFlow])
@@ -170,6 +186,19 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData {
 
   private val getPartitioningMainFlowMockLayer = ZLayer.succeed(getPartitioningMainFlowMock)
 
+  // Patch Partitioning Parent Mocks
+//  private val patchPartitioningParentMock = mock(classOf[PatchPartitioningParent])
+//
+//  when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(111L, 1111L, "Jack"))).thenReturn(
+//    ZIO.right(Row(FunctionStatus(11, "OK"), Some(parentPatchV2DTO01))))
+//  when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(222L, 2222L, "Jill")))
+//    .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Child Partitioning not found"))))
+//  when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(333L, 3333L, "Bean")))
+//    .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Parent Partitioning not found"))))
+//  when(patchPartitioningParentMock.apply(PatchPartitioningParentArgs(444L, 1111L, "Bean"))).thenReturn(ZIO.fail(GeneralDatabaseError("boom!")))
+//
+//  private val patchPartitioningParentMockLayer = ZLayer.succeed(patchPartitioningParentMock)
+
   override def spec: Spec[TestEnvironment with Scope, Any] = {
 
     suite("PartitioningRepositorySuite")(
@@ -338,6 +367,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData {
         )
       }
     ),
+//      suite("PatchPartitioningParentSuite")(
+//        test("Returns expected parentPatchV2DTO") {
+//          for {
+//            result <- PartitioningRepository.patchPartitioningParent(111L, 1111L, "Jack")
+//          } yield assertTrue(result == parentPatchV2DTO01)
+//        },
+//        test("Returns expected NotFoundDatabaseError when the child partitioning is not found") {
+//          assertZIO(PartitioningRepository.patchPartitioningParent(222L, 2222L, "Jill").exit)(
+//            failsWithA[NotFoundDatabaseError]
+//          )
+//        },
+//        test("Returns expected NotFoundDatabaseError when the parent partitioning is not found") {
+//          assertZIO(PartitioningRepository.patchPartitioningParent(333L, 3333L, "Bean").exit)(
+//            failsWithA[NotFoundDatabaseError]
+//          )
+//        },
+//        test("Returns expected GeneralDatabaseError") {
+//          assertZIO(PartitioningRepository.patchPartitioningParent(444L, 1111L, "Bean").exit)(
+//            failsWithA[GeneralDatabaseError]
+//          )
+//        }
+//      ),
       suite("GetFlowPartitioningsSuite")(
         test("Returns expected ResultNoMore[PartitioningWithIdDTO]") {
           for {
@@ -360,6 +411,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData {
           )
         }
       ),
+      suite("GetAncestorsSuite")(
+        test("Returns expected ResultNoMore[PartitioningWithIdDTO]") {
+          for {
+            result <- PartitioningRepository.getAncestors(1L, Some(10), Some(0))
+          } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1)))
+        },
+        test("Returns expected ResultHasMore[PartitioningWithIdDTO]") {
+          for {
+            result <- PartitioningRepository.getAncestors(1111L, Some(10), Some(0))
+          } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO2)))
+        },
+        test("Returns expected NotFoundDatabaseError") {
+          assertZIO(PartitioningRepository.getAncestors(9999L, Some(10), Some(0)).exit)(
+            failsWithA[NotFoundDatabaseError]
+          )
+        },
+        test("Returns expected GeneralDatabaseError") {
+          assertZIO(PartitioningRepository.getAncestors(8888L, Some(10), Some(0)).exit)(
+            failsWithA[GeneralDatabaseError]
+          )
+        }
+      ),
       suite("GetPartitioningSuite")(
test("GetPartitioning - Returns expected PartitioningWithIdDTO") { for { @@ -389,7 +462,10 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { getPartitioningMockLayer, getPartitioningMeasuresV2MockLayer, getFlowPartitioningsMockLayer, + getAncestorsMockLayer, + //patchPartitioningParentMockLayer, getPartitioningMainFlowMockLayer + ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index 34be8a2e..7dec1faa 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -68,6 +68,15 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningById(8888L)) .thenReturn(ZIO.fail(NotFoundDatabaseError("Partitioning not found"))) + when(partitioningRepositoryMock.getAncestors(1111L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getAncestors(2222L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getAncestors(8888L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getAncestors(9999L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Partitioning not found"))) + when(partitioningRepositoryMock.getPartitioningMeasuresById(1L)) .thenReturn(ZIO.succeed(Seq(measureDTO1, measureDTO2))) when(partitioningRepositoryMock.getPartitioningMeasuresById(2L)) @@ -98,6 +107,13 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningMainFlow(3L)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) +// when(partitioningRepositoryMock.patchPartitioningParent(111L, 1111L, "Jack")) +// .thenReturn(ZIO.succeed(parentPatchV2DTO01)) +// when(partitioningRepositoryMock.patchPartitioningParent(222L, 2222L, "Jill")) +// .thenReturn(ZIO.fail(NotFoundDatabaseError("boom!"))) +// when(partitioningRepositoryMock.patchPartitioningParent(333L, 3333L, "Bean")) +// .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + private val partitioningRepositoryMockLayer = ZLayer.succeed(partitioningRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -209,6 +225,28 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), + suite("GetAncestorsSuite")( + test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getAncestors(1111L, Some(1), Some(1L)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected Right with ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getAncestors(2222L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected GeneralServiceError when database error occurs") { + assertZIO(PartitioningService.getAncestors(8888L, Some(1), Some(1L)).exit)( + failsWithA[GeneralServiceError] + ) + }, + test("Returns expected NotFoundServiceError when child partition does not exist") { + assertZIO(PartitioningService.getAncestors(9999L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundServiceError] + ) + } + ), 
suite("GetPartitioningMeasuresByIdSuite")( test("Returns expected Right with Seq[MeasureDTO]") { for { @@ -262,6 +300,23 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { ) } ), +// suite("PatchPartitioningParent")( +// test("Returns expected Right with ParentPatchV2DTO") { +// for { +// result <- PartitioningService.patchPartitioningParent(111L, 1111L, "Jack") +// } yield assertTrue(result == parentPatchV2DTO01) +// }, +// test("Returns expected ServiceError") { +// assertZIO(PartitioningService.patchPartitioningParent(222L, 2222L, "Jill").exit)( +// failsWithA[NotFoundServiceError] +// ) +// }, +// test("Returns expected ServiceError") { +// assertZIO(PartitioningService.patchPartitioningParent(333L, 3333L, "Bean").exit)( +// failsWithA[GeneralServiceError] +// ) +// } +// ), suite("GetFlowPartitioningsSuite")( test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { for {