From bbba4ae3a6207cc1e81ddc76e2bc172d154aafa3 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Tue, 21 Jan 2025 22:29:06 +0200 Subject: [PATCH 1/8] Added the getAncestors Database functionality. 1. Made the necessary changes as mentioned by the team. 2. Made the necessary changes to the getAncestors Database functionality. --- .../postgres/runs/V0.3.0.2__get_ancestors.sql | 121 ++++ .../runs/GetAncestorsIntegrationTests.scala | 533 ++++++++++++++++++ 2 files changed, 654 insertions(+) create mode 100644 database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql create mode 100644 database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala diff --git a/database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql new file mode 100644 index 00000000..ba97b2e1 --- /dev/null +++ b/database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql @@ -0,0 +1,121 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE OR REPLACE FUNCTION runs.get_ancestors( + IN i_id_partitioning BIGINT, + IN i_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, + OUT status INTEGER, + OUT status_text TEXT, + OUT ancestorid BIGINT, + OUT partitioning JSONB, + OUT author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +$$ + ------------------------------------------------------------------------------- +-- +-- Function: runs.get_ancestors(3) +-- Returns Ancestors' partition ID for the given id +-- +-- Parameters: +-- i_id_partitioning - id that we asking the Ancestors for +-- i_limit - (optional) maximum number of partitionings to return, default is 5 +-- i_offset - (optional) offset to use for pagination, default is 0 +-- +-- Returns: +-- status - Status code +-- status_text - Status message +-- ancestorid - ID of Ancestor partition +-- partitioning - partitioning data of ancestor +-- author - author of the Ancestor partitioning +-- has_more - Flag indicating if there are more partitionings available + +-- Status codes: +-- 11 - OK +-- 41 - Partitioning not found +-- 42 - Ancestor Partitioning not found +-- +------------------------------------------------------------------------------- +DECLARE + partitionCreateAt TIMESTAMP; + _has_more BOOLEAN; + +BEGIN + -- Check if the partitioning exists + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_id_partitioning; + IF NOT FOUND THEN + status := 41; + status_text := 'Partitioning not found'; + RETURN NEXT; + RETURN; + END IF; + + -- Get the creation timestamp of the partitioning + SELECT created_at + FROM runs.partitionings + WHERE id_partitioning = i_id_partitioning + INTO partitionCreateAt; + + -- Check if there are more partitionings than the limit + SELECT count(*) > i_limit + FROM flows.partitioning_to_flow PTF + WHERE PTF.fk_flow IN ( + SELECT fk_flow + FROM flows.partitioning_to_flow + WHERE fk_partitioning = i_id_partitioning + ) + LIMIT i_limit + 1 OFFSET i_offset + INTO _has_more; + + -- Return the 
ancestors + RETURN QUERY + SELECT + 11 AS status, + 'OK' AS status_text, + P.id_partitioning AS ancestorid, + P.partitioning AS partitioning, + P.created_by AS author, + _has_more AS has_more + FROM + runs.partitionings P + INNER JOIN flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning + INNER JOIN flows.partitioning_to_flow PF2 ON PF2.fk_flow = PF.fk_flow + WHERE + PF2.fk_partitioning = i_id_partitioning + AND + P.created_at < partitionCreateAt + GROUP BY P.id_partitioning + ORDER BY P.id_partitioning, P.created_at DESC + LIMIT i_limit + OFFSET i_offset; + + IF FOUND THEN + status := 11; + status_text := 'OK'; + ELSE + status := 42; + status_text := 'Ancestor Partitioning not found'; + END IF; + RETURN NEXT; + RETURN; + +END; +$$ + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +ALTER FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) TO atum_user; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala new file mode 100644 index 00000000..fcf7cb91 --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala @@ -0,0 +1,533 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.database.runs + +import io.circe.parser.parse +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +import java.time.OffsetDateTime + +class GetAncestorsIntegrationTests extends DBTestSuite { + + private val getAncestorsFn = "runs.get_ancestors" + private val partitioningsTable = "runs.partitionings" + private val createPartitioningFn = "runs.create_partitioning" + + private val createFlowFn = "flows._create_flow" + private val addToParentFlowsFn = "flows._add_to_parent_flows" + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB", "keyC"], + | "keysToValuesMap": { + | "keyA": "valueA", + | "keyB": "valueB", + | "keyC": "valueC" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning1 = parse(partitioning1.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyD", "keyE", "keyF"], + | "keysToValuesMap": { + | "keyD": "valueD", + | "keyE": "valueE", + | "keyF": "valueF" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning2 = parse(partitioning2.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning3 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyG", "keyH", "keyI"], + | "keysToValuesMap": { + | "keyG": "valueG", + | "keyH": "valueH", + | "keyI": "valueI" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning3 = parse(partitioning3.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning4 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyJ", "keyK", "keyL"], + | "keysToValuesMap": { + | "keyJ": "valueJ", + | "keyK": "valueK", + | "keyL": "valueL" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning4 = parse(partitioning4.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val 
partitioning5 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyM", "keyN", "keyO"], + | "keysToValuesMap": { + | "keyM": "valueM", + | "keyN": "valueN", + | "keyO": "valueO" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning5 = parse(partitioning5.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning6 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyP", "keyQ", "keyR"], + | "keysToValuesMap": { + | "keyP": "valueP", + | "keyQ": "valueQ", + | "keyR": "valueR" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning6 = parse(partitioning6.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning7 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyS", "keyT", "keyU"], + | "keysToValuesMap": { + | "keyS": "valueS", + | "keyT": "valueT", + | "keyU": "valueU" + | } + |} + |""".stripMargin + ) + + private val expectedPartitioning7 = parse(partitioning7.value).getOrElse(throw new Exception("Failed to parse JSON")) + + private val partitioning8 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyV", "keyW", "keyX"], + | "keysToValuesMap": { + | "keyV": "valueV", + | "keyW": "valueW", + | "keyX": "valueX" + | } + |} + |""".stripMargin + ) + + var flowIdOfPartitioning1: Long = _ + var flowIdOfPartitioning2: Long = _ + var flowIdOfPartitioning3: Long = _ + + test("Returns Ancestors for a given Partition ID") { + + val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") + val Time2 = OffsetDateTime.parse("1992-08-04T10:00:00Z") + val Time3 = OffsetDateTime.parse("1992-08-05T10:00:00Z") + val Time4 = OffsetDateTime.parse("1992-08-06T10:00:00Z") + val Time5 = OffsetDateTime.parse("1992-08-07T10:00:00Z") + val Time6 = OffsetDateTime.parse("1992-08-08T10:00:00Z") + val Time7 = OffsetDateTime.parse("1992-08-09T10:00:00Z") + val Time8 = OffsetDateTime.parse("1992-08-09T11:00:00Z") + + table(partitioningsTable).insert(add("partitioning", 
partitioning1).add("created_by", "Grandpa").add("created_at", Time1)) + table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Father").add("created_at", Time2)) + table(partitioningsTable).insert(add("partitioning", partitioning3).add("created_by", "Son").add("created_at", Time3)) + table(partitioningsTable).insert(add("partitioning", partitioning4).add("created_by", "Grandson").add("created_at", Time4)) + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "Grandma").add("created_at", Time5)) + table(partitioningsTable).insert(add("partitioning", partitioning6).add("created_by", "Mother").add("created_at", Time6)) + table(partitioningsTable).insert(add("partitioning", partitioning7).add("created_by", "Daughter").add("created_at", Time7)) + table(partitioningsTable).insert(add("partitioning", partitioning8).add("created_by", "Granddaughter").add("created_at", Time8)) + + val partId1: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + val partId2: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning2, "id_partitioning").get.get + + val partId3: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning3, "id_partitioning").get.get + + val partId4: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning4, "id_partitioning").get.get + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + val partId6: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning6, "id_partitioning").get.get + + val partId7: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning7, "id_partitioning").get.get + + val partId8: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning8, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + 
.setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId2) + .setParam("i_by_user", "Father") + .execute { queryResult => + flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId6) + .setParam("i_by_user", "Daughter") + .execute { queryResult => + flowIdOfPartitioning3 = queryResult.next().getLong("id_flow").get + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId1) + .setParam("i_fk_partitioning", partId3) + .setParam("i_by_user", "Son") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId2) + .setParam("i_fk_partitioning", partId4) + .setParam("i_by_user", "Grandson") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId6) + .setParam("i_fk_partitioning", partId7) + .setParam("i_by_user", "GrandDaughter") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId3) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + 
.setParam("i_fk_parent_partitioning", partId4) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "GrandMa") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId5) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId7) + .setParam("i_fk_partitioning", partId8) + .setParam("i_by_user", "Mother") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + //TEST 1 Ancestors Partition + function(getAncestorsFn) + .setParam("i_id_partitioning", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + assert(row.getString("author").contains("Grandpa")) + } + + //TEST multiple Ancestors Partitions + function(getAncestorsFn) + .setParam("i_id_partitioning", partId5) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + 
.getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + } + + //TEST Separate flow for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_id_partitioning", partId7) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get 
+ val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + } + + //TEST ALL flows for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_id_partitioning", partId8) + .setParam("i_limit", 10) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandpa")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Father")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + 
assert(row.getString("author").contains("Son")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Grandson")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId5)) + assert(returnedPartitioningParsed == expectedPartitioning5) + assert(row.getString("author").contains("Grandma")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Mother")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId7)) + assert(returnedPartitioningParsed == expectedPartitioning7) + assert(row.getString("author").contains("Daughter")) + } + } + + test("Change in Parent") { 
+ + val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") + + table("runs.partitionings").insert( + add("partitioning", partitioning1) + .add("created_by", "GrandPa") + .add("created_at", Time1) + ) + + val partId1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "GrandPa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + val partId3 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning3) + .setParam("i_parent_partitioning_id", partId1) + .setParam("i_by_user", "Father") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(12)) + assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) + row.getLong("id_partitioning").get + } + + function(getAncestorsFn) + .setParam("i_id_partitioning", partId3) + .execute { queryResult => + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestorid").contains(partId1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("GrandPa")) + } + } + + test("Child Partitioning not found") { + val nonExistentID = 9999L + + function(getAncestorsFn) + .setParam("i_id_partitioning", nonExistentID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Partitioning not found")) + assert(row.getJsonB("ancestorid").isEmpty) + 
assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } + + test("Ancestor Partitioning not found") { + + table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "NO_Ancestor")) + + val partId5: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId5) + .setParam("i_by_user", "Grandpa") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(getAncestorsFn) + .setParam("i_id_partitioning", partId5) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(42)) + assert(row.getString("status_text").contains("Ancestor Partitioning not found")) + assert(row.getJsonB("ancestorid").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } +} From e479ea3030a7983393d3d6c9ebf8f6cbfec6f330 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Tue, 21 Jan 2025 22:29:06 +0200 Subject: [PATCH 2/8] Added the getAncestors Database functionality. 1. Made the necessary changes as mentioned by the team. 2. Made the necessary changes to the getAncestors Database functionality. 3. 
Made changes as requested --- .../V0.3.0.2__get_partitioning_ancestors.sql | 130 ++++++++ .../runs/GetAncestorsIntegrationTests.scala | 308 +++++++++--------- 2 files changed, 285 insertions(+), 153 deletions(-) create mode 100644 database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql diff --git a/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql new file mode 100644 index 00000000..2377b8dd --- /dev/null +++ b/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql @@ -0,0 +1,130 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION runs.get_partitioning_ancestors( + IN i_id_partitioning BIGINT, + IN i_limit INT DEFAULT 10, + IN i_offset BIGINT DEFAULT 0, + OUT status INTEGER, + OUT status_text TEXT, + OUT ancestor_id BIGINT, + OUT partitioning JSONB, + OUT author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +$$ + ------------------------------------------------------------------------------- +-- +-- Function: runs.get_partitioning_ancestors(3) +-- Returns the ids and partitionings of the ancestors of the given partition id. 
+-- +-- Parameters: +-- i_id_partitioning - id that we're asking the Ancestors for +-- i_limit - (optional) maximum number of partitionings to return, default is 10 +-- i_offset - (optional) offset to use for pagination, default is 0 +-- +-- Returns: +-- status - Status code +-- status_text - Status message +-- ancestor_id - ID of Ancestor partitioning +-- partitioning - partitioning data of ancestor +-- author - author of the Ancestor partitioning +-- has_more - Flag indicating if there are more partitionings available + +-- Status codes: +-- 10 - OK +-- 41 - Partitioning not found +-- +------------------------------------------------------------------------------- +DECLARE + _partitioningCreateAt TIMESTAMP; + _has_more BOOLEAN; + +BEGIN + -- Check if the partitioning exists + PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_id_partitioning; + IF NOT FOUND THEN + status := 41; + status_text := 'Partitioning not found'; + RETURN NEXT; + RETURN; + END IF; + + -- Get the creation timestamp of the partitioning + SELECT created_at + FROM runs.partitionings + WHERE id_partitioning = i_id_partitioning + INTO _partitioningCreateAt; + + -- Check if there are more partitionings than the limit + SELECT count(*) > i_limit + FROM flows.partitioning_to_flow PTF + WHERE PTF.fk_flow IN ( + SELECT fk_flow + FROM flows.partitioning_to_flow + WHERE fk_partitioning = i_id_partitioning + ) + LIMIT i_limit + 1 OFFSET i_offset + INTO _has_more; + + -- Return the ancestors + RETURN QUERY + SELECT + 10 AS status, + 'OK' AS status_text, + P.id_partitioning AS ancestor_id, + P.partitioning AS partitioning, + P.created_by AS author, + _has_more AS has_more +-- FROM +-- flows.partitioning_to_flow PF +-- INNER JOIN flows.flows F ON F.id_flow = PF.fk_flow +-- INNER JOIN runs.partitionings P ON P.id_partitioning = F.fk_primary_partitioning +-- WHERE +-- PF.fk_partitioning = i_id_partitioning AND +-- P.id_partitioning IS DISTINCT FROM i_id_partitioning + FROM + runs.partitionings P + 
INNER JOIN flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning + INNER JOIN flows.partitioning_to_flow PF2 ON PF2.fk_flow = PF.fk_flow + WHERE + PF2.fk_partitioning = i_id_partitioning + AND + PF.created_at <= PF2.created_at + GROUP BY P.id_partitioning , PF.created_at + ORDER BY P.id_partitioning , PF.created_at DESC + LIMIT i_limit + OFFSET i_offset; + + + IF NOT FOUND THEN + status := 10; + status_text := 'OK'; + RETURN NEXT; +-- TODO: Remove comment if not needed +-- ELSE +-- status := 42; +-- status_text := 'Ancestor Partitioning not found'; + END IF; +-- RETURN NEXT; + RETURN; + +END; +$$ + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +ALTER FUNCTION runs.get_partitioning_ancestors(BIGINT, INT, BIGINT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.get_partitioning_ancestors(BIGINT, INT, BIGINT) TO atum_user; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala index fcf7cb91..10b36c1d 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala @@ -24,7 +24,7 @@ import java.time.OffsetDateTime class GetAncestorsIntegrationTests extends DBTestSuite { - private val getAncestorsFn = "runs.get_ancestors" + private val getAncestorsFn = "runs.get_partitioning_ancestors" private val partitioningsTable = "runs.partitionings" private val createPartitioningFn = "runs.create_partitioning" @@ -163,14 +163,14 @@ class GetAncestorsIntegrationTests extends DBTestSuite { test("Returns Ancestors for a given Partition ID") { - val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") - val Time2 = OffsetDateTime.parse("1992-08-04T10:00:00Z") - val Time3 = OffsetDateTime.parse("1992-08-05T10:00:00Z") - val Time4 = OffsetDateTime.parse("1992-08-06T10:00:00Z") - val Time5 = 
OffsetDateTime.parse("1992-08-07T10:00:00Z") - val Time6 = OffsetDateTime.parse("1992-08-08T10:00:00Z") - val Time7 = OffsetDateTime.parse("1992-08-09T10:00:00Z") - val Time8 = OffsetDateTime.parse("1992-08-09T11:00:00Z") + val Time1 = OffsetDateTime.parse("2025-08-03T10:00:00Z") + val Time2 = OffsetDateTime.parse("2025-08-04T10:00:00Z") + val Time3 = OffsetDateTime.parse("2025-08-05T10:00:00Z") + val Time4 = OffsetDateTime.parse("2025-08-06T10:00:00Z") + val Time5 = OffsetDateTime.parse("2025-08-07T10:00:00Z") + val Time6 = OffsetDateTime.parse("2025-08-08T10:00:00Z") + val Time7 = OffsetDateTime.parse("2025-08-09T10:00:00Z") + val Time8 = OffsetDateTime.parse("2025-08-09T11:00:00Z") table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Grandpa").add("created_at", Time1)) table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Father").add("created_at", Time2)) @@ -304,12 +304,13 @@ class GetAncestorsIntegrationTests extends DBTestSuite { val returnedPartitioning = row.getJsonB("partitioning").get val returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) + assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId1)) + assert(row.getLong("ancestor_id").contains(partId1)) assert(returnedPartitioningParsed == expectedPartitioning1) assert(row.getString("author").contains("Grandpa")) assert(row.getString("author").contains("Grandpa")) + assert(!queryResult.hasNext) } //TEST multiple Ancestors Partitions @@ -320,171 +321,172 @@ class GetAncestorsIntegrationTests extends DBTestSuite { var returnedPartitioning = row.getJsonB("partitioning").get var returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) + 
assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId1)) + assert(row.getLong("ancestor_id").contains(partId1)) assert(returnedPartitioningParsed == expectedPartitioning1) assert(row.getString("author").contains("Grandpa")) row = queryResult.next() returnedPartitioning = row.getJsonB("partitioning").get returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) + assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId2)) + assert(row.getLong("ancestor_id").contains(partId2)) assert(returnedPartitioningParsed == expectedPartitioning2) assert(row.getString("author").contains("Father")) row = queryResult.next() returnedPartitioning = row.getJsonB("partitioning").get returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) + assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId3)) + assert(row.getLong("ancestor_id").contains(partId3)) assert(returnedPartitioningParsed == expectedPartitioning3) assert(row.getString("author").contains("Son")) row = queryResult.next() returnedPartitioning = row.getJsonB("partitioning").get returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) + assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId4)) + assert(row.getLong("ancestor_id").contains(partId4)) assert(returnedPartitioningParsed == expectedPartitioning4) assert(row.getString("author").contains("Grandson")) 
+ assert(!queryResult.hasNext) } - - //TEST Separate flow for Ancestors Partitions - function(getAncestorsFn) - .setParam("i_id_partitioning", partId7) - .execute { queryResult => - val row = queryResult.next() - val returnedPartitioning = row.getJsonB("partitioning").get - val returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId6)) - assert(returnedPartitioningParsed == expectedPartitioning6) - assert(row.getString("author").contains("Mother")) - } - - //TEST ALL flows for Ancestors Partitions - function(getAncestorsFn) - .setParam("i_id_partitioning", partId8) - .setParam("i_limit", 10) - .execute { queryResult => - var row = queryResult.next() - var returnedPartitioning = row.getJsonB("partitioning").get - var returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId1)) - assert(returnedPartitioningParsed == expectedPartitioning1) - assert(row.getString("author").contains("Grandpa")) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId2)) - assert(returnedPartitioningParsed == expectedPartitioning2) - assert(row.getString("author").contains("Father")) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse 
returned partitioning")) - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId3)) - assert(returnedPartitioningParsed == expectedPartitioning3) - assert(row.getString("author").contains("Son")) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId4)) - assert(returnedPartitioningParsed == expectedPartitioning4) - assert(row.getString("author").contains("Grandson")) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId5)) - assert(returnedPartitioningParsed == expectedPartitioning5) - assert(row.getString("author").contains("Grandma")) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId6)) - assert(returnedPartitioningParsed == expectedPartitioning6) - assert(row.getString("author").contains("Mother")) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - 
assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId7)) - assert(returnedPartitioningParsed == expectedPartitioning7) - assert(row.getString("author").contains("Daughter")) - } +// +// //TEST Separate flow for Ancestors Partitions +// function(getAncestorsFn) +// .setParam("i_id_partitioning", partId7) +// .execute { queryResult => +// val row = queryResult.next() +// val returnedPartitioning = row.getJsonB("partitioning").get +// val returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId6)) +// assert(returnedPartitioningParsed == expectedPartitioning6) +// assert(row.getString("author").contains("Mother")) +// } +// +// //TEST ALL flows for Ancestors Partitions +// function(getAncestorsFn) +// .setParam("i_id_partitioning", partId8) +// .setParam("i_limit", 10) +// .execute { queryResult => +// var row = queryResult.next() +// var returnedPartitioning = row.getJsonB("partitioning").get +// var returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId1)) +// assert(returnedPartitioningParsed == expectedPartitioning1) +// assert(row.getString("author").contains("Grandpa")) +// row = queryResult.next() +// returnedPartitioning = row.getJsonB("partitioning").get +// returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId2)) +// 
assert(returnedPartitioningParsed == expectedPartitioning2) +// assert(row.getString("author").contains("Father")) +// row = queryResult.next() +// returnedPartitioning = row.getJsonB("partitioning").get +// returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId3)) +// assert(returnedPartitioningParsed == expectedPartitioning3) +// assert(row.getString("author").contains("Son")) +// row = queryResult.next() +// returnedPartitioning = row.getJsonB("partitioning").get +// returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId4)) +// assert(returnedPartitioningParsed == expectedPartitioning4) +// assert(row.getString("author").contains("Grandson")) +// row = queryResult.next() +// returnedPartitioning = row.getJsonB("partitioning").get +// returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId5)) +// assert(returnedPartitioningParsed == expectedPartitioning5) +// assert(row.getString("author").contains("Grandma")) +// row = queryResult.next() +// returnedPartitioning = row.getJsonB("partitioning").get +// returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId6)) +// 
assert(returnedPartitioningParsed == expectedPartitioning6) +// assert(row.getString("author").contains("Mother")) +// row = queryResult.next() +// returnedPartitioning = row.getJsonB("partitioning").get +// returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId7)) +// assert(returnedPartitioningParsed == expectedPartitioning7) +// assert(row.getString("author").contains("Daughter")) +// } } - test("Change in Parent") { - - val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") - - table("runs.partitionings").insert( - add("partitioning", partitioning1) - .add("created_by", "GrandPa") - .add("created_at", Time1) - ) - - val partId1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning1, "id_partitioning").get.get - - function(createFlowFn) - .setParam("i_fk_partitioning", partId1) - .setParam("i_by_user", "GrandPa") - .execute { queryResult => - flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get - } - - val partId3 = function(createPartitioningFn) - .setParam("i_partitioning", partitioning3) - .setParam("i_parent_partitioning_id", partId1) - .setParam("i_by_user", "Father") - .execute { queryResult => - assert(queryResult.hasNext) - val row = queryResult.next() - assert(row.getInt("status").contains(12)) - assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) - row.getLong("id_partitioning").get - } - - function(getAncestorsFn) - .setParam("i_id_partitioning", partId3) - .execute { queryResult => - val row = queryResult.next() - val returnedPartitioning = row.getJsonB("partitioning").get - val returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(11)) - 
assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestorid").contains(partId1)) - assert(returnedPartitioningParsed == expectedPartitioning1) - assert(row.getString("author").contains("GrandPa")) - } - } +// test("Change in Parent") { +// +// val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") +// +// table("runs.partitionings").insert( +// add("partitioning", partitioning1) +// .add("created_by", "GrandPa") +// .add("created_at", Time1) +// ) +// +// val partId1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning1, "id_partitioning").get.get +// +// function(createFlowFn) +// .setParam("i_fk_partitioning", partId1) +// .setParam("i_by_user", "GrandPa") +// .execute { queryResult => +// flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get +// } +// +// val partId3 = function(createPartitioningFn) +// .setParam("i_partitioning", partitioning3) +// .setParam("i_parent_partitioning_id", partId1) +// .setParam("i_by_user", "Father") +// .execute { queryResult => +// assert(queryResult.hasNext) +// val row = queryResult.next() +// assert(row.getInt("status").contains(12)) +// assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) +// row.getLong("id_partitioning").get +// } +// +// function(getAncestorsFn) +// .setParam("i_id_partitioning", partId3) +// .execute { queryResult => +// val row = queryResult.next() +// val returnedPartitioning = row.getJsonB("partitioning").get +// val returnedPartitioningParsed = parse(returnedPartitioning.value) +// .getOrElse(fail("Failed to parse returned partitioning")) +// assert(row.getInt("status").contains(10)) +// assert(row.getString("status_text").contains("OK")) +// assert(row.getLong("ancestor_id").contains(partId1)) +// assert(returnedPartitioningParsed == expectedPartitioning1) +// assert(row.getString("author").contains("GrandPa")) +// } +// } test("Child Partitioning not found") { val nonExistentID = 9999L @@ -496,7 
+498,7 @@ class GetAncestorsIntegrationTests extends DBTestSuite { val row = queryResult.next() assert(row.getInt("status").contains(41)) assert(row.getString("status_text").contains("Partitioning not found")) - assert(row.getJsonB("ancestorid").isEmpty) + assert(row.getJsonB("ancestor_id").isEmpty) assert(row.getJsonB("partitioning").isEmpty) assert(row.getString("author").isEmpty) assert(!queryResult.hasNext) @@ -522,9 +524,9 @@ class GetAncestorsIntegrationTests extends DBTestSuite { .execute { queryResult => assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(42)) - assert(row.getString("status_text").contains("Ancestor Partitioning not found")) - assert(row.getJsonB("ancestorid").isEmpty) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getJsonB("ancestor_id").isEmpty) assert(row.getJsonB("partitioning").isEmpty) assert(row.getString("author").isEmpty) assert(!queryResult.hasNext) From 8694a4ff2c394274d60c82a5f8234c20ba17827a Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Wed, 12 Feb 2025 17:37:10 +0200 Subject: [PATCH 3/8] Added the getAncestors Database functionality. 1. Made the necessary changes as mentioned by the team. 2. Made the necessary changes to the getAncestors Database functionality. 3. Now working completely as intended. 
--- .../postgres/runs/V0.3.0.2__get_ancestors.sql | 121 ------------------ 1 file changed, 121 deletions(-) delete mode 100644 database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql diff --git a/database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql deleted file mode 100644 index ba97b2e1..00000000 --- a/database/src/main/postgres/runs/V0.3.0.2__get_ancestors.sql +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -CREATE OR REPLACE FUNCTION runs.get_ancestors( - IN i_id_partitioning BIGINT, - IN i_limit INT DEFAULT 5, - IN i_offset BIGINT DEFAULT 0, - OUT status INTEGER, - OUT status_text TEXT, - OUT ancestorid BIGINT, - OUT partitioning JSONB, - OUT author TEXT, - OUT has_more BOOLEAN -) RETURNS SETOF record AS -$$ - ------------------------------------------------------------------------------- --- --- Function: runs.get_ancestors(3) --- Returns Ancestors' partition ID for the given id --- --- Parameters: --- i_id_partitioning - id that we asking the Ancestors for --- i_limit - (optional) maximum number of partitionings to return, default is 5 --- i_offset - (optional) offset to use for pagination, default is 0 --- --- Returns: --- status - Status code --- status_text - Status message --- ancestorid - ID of Ancestor partition --- partitioning - partitioning data of ancestor --- author - author of the Ancestor partitioning --- has_more - Flag indicating if there are more partitionings available - --- Status codes: --- 11 - OK --- 41 - Partitioning not found --- 42 - Ancestor Partitioning not found --- -------------------------------------------------------------------------------- -DECLARE - partitionCreateAt TIMESTAMP; - _has_more BOOLEAN; - -BEGIN - -- Check if the partitioning exists - PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_id_partitioning; - IF NOT FOUND THEN - status := 41; - status_text := 'Partitioning not found'; - RETURN NEXT; - RETURN; - END IF; - - -- Get the creation timestamp of the partitioning - SELECT created_at - FROM runs.partitionings - WHERE id_partitioning = i_id_partitioning - INTO partitionCreateAt; - - -- Check if there are more partitionings than the limit - SELECT count(*) > i_limit - FROM flows.partitioning_to_flow PTF - WHERE PTF.fk_flow IN ( - SELECT fk_flow - FROM flows.partitioning_to_flow - WHERE fk_partitioning = i_id_partitioning - ) - LIMIT i_limit + 1 OFFSET i_offset - INTO _has_more; - - -- Return the 
ancestors - RETURN QUERY - SELECT - 11 AS status, - 'OK' AS status_text, - P.id_partitioning AS ancestorid, - P.partitioning AS partitioning, - P.created_by AS author, - _has_more AS has_more - FROM - runs.partitionings P - INNER JOIN flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning - INNER JOIN flows.partitioning_to_flow PF2 ON PF2.fk_flow = PF.fk_flow - WHERE - PF2.fk_partitioning = i_id_partitioning - AND - P.created_at < partitionCreateAt - GROUP BY P.id_partitioning - ORDER BY P.id_partitioning, P.created_at DESC - LIMIT i_limit - OFFSET i_offset; - - IF FOUND THEN - status := 11; - status_text := 'OK'; - ELSE - status := 42; - status_text := 'Ancestor Partitioning not found'; - END IF; - RETURN NEXT; - RETURN; - -END; -$$ - LANGUAGE plpgsql VOLATILE SECURITY DEFINER; - -ALTER FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) OWNER TO atum_owner; -GRANT EXECUTE ON FUNCTION runs.get_ancestors(BIGINT, INT, BIGINT) TO atum_user; From d9687180ee9be77c9623206d9d22f33c0a771996 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Wed, 12 Feb 2025 17:38:43 +0200 Subject: [PATCH 4/8] Added the getAncestors Database functionality. 1. Made the necessary changes as mentioned by the team. 2. Made the necessary changes to the getAncestors Database functionality. 3. Now working completely as intended. 
--- .../V0.3.0.2__get_partitioning_ancestors.sql | 37 +- .../runs/GetAncestorsIntegrationTests.scala | 500 ++++++++---------- 2 files changed, 235 insertions(+), 302 deletions(-) diff --git a/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql index 2377b8dd..d0adb783 100644 --- a/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql +++ b/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql @@ -50,10 +50,10 @@ $$ -- ------------------------------------------------------------------------------- DECLARE - _partitioningCreateAt TIMESTAMP; _has_more BOOLEAN; BEGIN + -- Check if the partitioning exists PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_id_partitioning; IF NOT FOUND THEN @@ -63,12 +63,6 @@ BEGIN RETURN; END IF; - -- Get the creation timestamp of the partitioning - SELECT created_at - FROM runs.partitionings - WHERE id_partitioning = i_id_partitioning - INTO _partitioningCreateAt; - -- Check if there are more partitionings than the limit SELECT count(*) > i_limit FROM flows.partitioning_to_flow PTF @@ -89,37 +83,24 @@ BEGIN P.partitioning AS partitioning, P.created_by AS author, _has_more AS has_more --- FROM --- flows.partitioning_to_flow PF --- INNER JOIN flows.flows F ON F.id_flow = PF.fk_flow --- INNER JOIN runs.partitionings P ON P.id_partitioning = F.fk_primary_partitioning --- WHERE --- PF.fk_partitioning = i_id_partitioning AND --- P.id_partitioning IS DISTINCT FROM i_id_partitioning FROM - runs.partitionings P - INNER JOIN flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning - INNER JOIN flows.partitioning_to_flow PF2 ON PF2.fk_flow = PF.fk_flow + flows.partitioning_to_flow PF + INNER JOIN flows.flows F ON F.id_flow = PF.fk_flow + INNER JOIN runs.partitionings P ON P.id_partitioning = F.fk_primary_partitioning WHERE - PF2.fk_partitioning = i_id_partitioning - AND - PF.created_at <= 
PF2.created_at - GROUP BY P.id_partitioning , PF.created_at - ORDER BY P.id_partitioning , PF.created_at DESC + PF.fk_partitioning = i_id_partitioning AND + P.id_partitioning IS DISTINCT FROM i_id_partitioning + GROUP BY P.id_partitioning + ORDER BY P.id_partitioning LIMIT i_limit OFFSET i_offset; - + --If no ancestors found send an OK status IF NOT FOUND THEN status := 10; status_text := 'OK'; RETURN NEXT; --- TODO: Remove comment if not needed --- ELSE --- status := 42; --- status_text := 'Ancestor Partitioning not found'; END IF; --- RETURN NEXT; RETURN; END; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala index 10b36c1d..3562660d 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala @@ -25,10 +25,8 @@ import java.time.OffsetDateTime class GetAncestorsIntegrationTests extends DBTestSuite { private val getAncestorsFn = "runs.get_partitioning_ancestors" - private val partitioningsTable = "runs.partitionings" - private val createPartitioningFn = "runs.create_partitioning" + private val createPartitioningFn = "runs.create_partitioning_if_not_exists" - private val createFlowFn = "flows._create_flow" private val addToParentFlowsFn = "flows._add_to_parent_flows" private val partitioning1 = JsonBString( @@ -157,148 +155,178 @@ class GetAncestorsIntegrationTests extends DBTestSuite { |""".stripMargin ) - var flowIdOfPartitioning1: Long = _ - var flowIdOfPartitioning2: Long = _ - var flowIdOfPartitioning3: Long = _ - - test("Returns Ancestors for a given Partition ID") { - - val Time1 = OffsetDateTime.parse("2025-08-03T10:00:00Z") - val Time2 = OffsetDateTime.parse("2025-08-04T10:00:00Z") - val Time3 = OffsetDateTime.parse("2025-08-05T10:00:00Z") - val Time4 = 
OffsetDateTime.parse("2025-08-06T10:00:00Z") - val Time5 = OffsetDateTime.parse("2025-08-07T10:00:00Z") - val Time6 = OffsetDateTime.parse("2025-08-08T10:00:00Z") - val Time7 = OffsetDateTime.parse("2025-08-09T10:00:00Z") - val Time8 = OffsetDateTime.parse("2025-08-09T11:00:00Z") - - table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Grandpa").add("created_at", Time1)) - table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Father").add("created_at", Time2)) - table(partitioningsTable).insert(add("partitioning", partitioning3).add("created_by", "Son").add("created_at", Time3)) - table(partitioningsTable).insert(add("partitioning", partitioning4).add("created_by", "Grandson").add("created_at", Time4)) - table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "Grandma").add("created_at", Time5)) - table(partitioningsTable).insert(add("partitioning", partitioning6).add("created_by", "Mother").add("created_at", Time6)) - table(partitioningsTable).insert(add("partitioning", partitioning7).add("created_by", "Daughter").add("created_at", Time7)) - table(partitioningsTable).insert(add("partitioning", partitioning8).add("created_by", "Granddaughter").add("created_at", Time8)) - - val partId1: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning1, "id_partitioning").get.get - - val partId2: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning2, "id_partitioning").get.get - - val partId3: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning3, "id_partitioning").get.get - - val partId4: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning4, "id_partitioning").get.get + //First Failure Test: Child Partition not found + test("Child Partitioning not found") { + val nonExistentID = 9999L - val partId5: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning5, 
"id_partitioning").get.get + function(getAncestorsFn) + .setParam("i_id_partitioning", nonExistentID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Partitioning not found")) + assert(row.getJsonB("ancestor_id").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } - val partId6: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning6, "id_partitioning").get.get + //Second Failure Test: Ancestor Partitioning not found + test("Ancestor Partitioning not found") { - val partId7: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning7, "id_partitioning").get.get + val partitioningID1 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning1) + .setParam("i_by_user", "Grandma") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get + } - val partId8: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning8, "id_partitioning").get.get + function(getAncestorsFn) + .setParam("i_id_partitioning", partitioningID1) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getJsonB("ancestor_id").isEmpty) + assert(row.getJsonB("partitioning").isEmpty) + assert(row.getString("author").isEmpty) + assert(!queryResult.hasNext) + } + } - function(createFlowFn) - .setParam("i_fk_partitioning", partId1) - .setParam("i_by_user", "Grandpa") + // Testing for return of the Ancestors for a given Partition ID + // + // 1(Grandma) 2(Grandpa) + // | | + // 3(Mother) 4(Father) 
6(Daughter) + // \ | | + // 5(Son) 7(Granddaughter) + // | / + // 8(Grandson) + test("Returns Ancestors for a given Partition ID"){ + val partitioningID1 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning1) + .setParam("i_by_user", "Grandma") .execute { queryResult => - flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } - function(createFlowFn) - .setParam("i_fk_partitioning", partId2) - .setParam("i_by_user", "Father") + val partitioningID2 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning2) + .setParam("i_by_user", "Grandpa") .execute { queryResult => - flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } - function(createFlowFn) - .setParam("i_fk_partitioning", partId6) - .setParam("i_by_user", "Daughter") + val partitioningID3 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning3) + .setParam("i_by_user", "Mother") + .setParam("i_parent_partitioning", partitioning1) .execute { queryResult => - flowIdOfPartitioning3 = queryResult.next().getLong("id_flow").get + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } - function(addToParentFlowsFn) - .setParam("i_fk_parent_partitioning", partId1) - .setParam("i_fk_partitioning", partId3) - .setParam("i_by_user", "Son") + val partitioningID4 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning4) + 
.setParam("i_by_user", "Father") + .setParam("i_parent_partitioning", partitioning2) .execute { queryResult => - val result1 = queryResult.next() - assert(result1.getInt("status").get == 11) - assert(result1.getString("status_text").get == "Partitioning added to flows") + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } - function(addToParentFlowsFn) - .setParam("i_fk_parent_partitioning", partId2) - .setParam("i_fk_partitioning", partId4) - .setParam("i_by_user", "Grandson") + val partitioningID5 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning5) + .setParam("i_by_user", "Son") + .setParam("i_parent_partitioning", partitioning3) .execute { queryResult => - val result1 = queryResult.next() - assert(result1.getInt("status").get == 11) - assert(result1.getString("status_text").get == "Partitioning added to flows") + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } function(addToParentFlowsFn) - .setParam("i_fk_parent_partitioning", partId6) - .setParam("i_fk_partitioning", partId7) - .setParam("i_by_user", "GrandDaughter") + .setParam("i_fk_parent_partitioning", partitioningID4) + .setParam("i_fk_partitioning", partitioningID5) + .setParam("i_by_user", "Son") .execute { queryResult => val result1 = queryResult.next() assert(result1.getInt("status").get == 11) assert(result1.getString("status_text").get == "Partitioning added to flows") } - function(addToParentFlowsFn) - .setParam("i_fk_parent_partitioning", partId3) - .setParam("i_fk_partitioning", partId5) - .setParam("i_by_user", "GrandMa") + val partitioningID6 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning6) + .setParam("i_by_user", 
"Daughter") .execute { queryResult => - val result1 = queryResult.next() - assert(result1.getInt("status").get == 11) - assert(result1.getString("status_text").get == "Partitioning added to flows") + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } - function(addToParentFlowsFn) - .setParam("i_fk_parent_partitioning", partId4) - .setParam("i_fk_partitioning", partId5) - .setParam("i_by_user", "GrandMa") + val partitioningID7 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning7) + .setParam("i_by_user", "Granddaughter") + .setParam("i_parent_partitioning", partitioning6) .execute { queryResult => - val result1 = queryResult.next() - assert(result1.getInt("status").get == 11) - assert(result1.getString("status_text").get == "Partitioning added to flows") + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } - function(addToParentFlowsFn) - .setParam("i_fk_parent_partitioning", partId5) - .setParam("i_fk_partitioning", partId8) - .setParam("i_by_user", "Mother") + val partitioningID8 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning8) + .setParam("i_by_user", "Grandson") + .setParam("i_parent_partitioning", partitioning5) .execute { queryResult => - val result1 = queryResult.next() - assert(result1.getInt("status").get == 11) - assert(result1.getString("status_text").get == "Partitioning added to flows") + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Partitioning created")) + row.getLong("id_partitioning").get } function(addToParentFlowsFn) - .setParam("i_fk_parent_partitioning", partId7) - 
.setParam("i_fk_partitioning", partId8) - .setParam("i_by_user", "Mother") + .setParam("i_fk_parent_partitioning", partitioningID7) + .setParam("i_fk_partitioning", partitioningID8) + .setParam("i_by_user", "Grandson") .execute { queryResult => val result1 = queryResult.next() assert(result1.getInt("status").get == 11) assert(result1.getString("status_text").get == "Partitioning added to flows") } - //TEST 1 Ancestors Partition + //Test 1 Ancestor function(getAncestorsFn) - .setParam("i_id_partitioning", partId3) + .setParam("i_id_partitioning", partitioningID3) .execute { queryResult => val row = queryResult.next() val returnedPartitioning = row.getJsonB("partitioning").get @@ -306,16 +334,15 @@ class GetAncestorsIntegrationTests extends DBTestSuite { .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partId1)) + assert(row.getLong("ancestor_id").contains(partitioningID1)) assert(returnedPartitioningParsed == expectedPartitioning1) - assert(row.getString("author").contains("Grandpa")) - assert(row.getString("author").contains("Grandpa")) + assert(row.getString("author").contains("Grandma")) assert(!queryResult.hasNext) } - //TEST multiple Ancestors Partitions + //Test Multiple Ancestors function(getAncestorsFn) - .setParam("i_id_partitioning", partId5) + .setParam("i_id_partitioning", partitioningID5) .execute { queryResult => var row = queryResult.next() var returnedPartitioning = row.getJsonB("partitioning").get @@ -323,212 +350,137 @@ class GetAncestorsIntegrationTests extends DBTestSuite { .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partId1)) + assert(row.getLong("ancestor_id").contains(partitioningID1)) assert(returnedPartitioningParsed == expectedPartitioning1) - 
assert(row.getString("author").contains("Grandpa")) + assert(row.getString("author").contains("Grandma")) + assert(queryResult.hasNext) row = queryResult.next() returnedPartitioning = row.getJsonB("partitioning").get returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partId2)) + assert(row.getLong("ancestor_id").contains(partitioningID2)) assert(returnedPartitioningParsed == expectedPartitioning2) - assert(row.getString("author").contains("Father")) + assert(row.getString("author").contains("Grandpa")) + assert(queryResult.hasNext) row = queryResult.next() returnedPartitioning = row.getJsonB("partitioning").get returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partId3)) + assert(row.getLong("ancestor_id").contains(partitioningID3)) assert(returnedPartitioningParsed == expectedPartitioning3) - assert(row.getString("author").contains("Son")) + assert(row.getString("author").contains("Mother")) row = queryResult.next() returnedPartitioning = row.getJsonB("partitioning").get returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partId4)) + assert(row.getLong("ancestor_id").contains(partitioningID4)) assert(returnedPartitioningParsed == expectedPartitioning4) - assert(row.getString("author").contains("Grandson")) + assert(row.getString("author").contains("Father")) assert(!queryResult.hasNext) } -// -// //TEST Separate flow for Ancestors Partitions -// 
function(getAncestorsFn) -// .setParam("i_id_partitioning", partId7) -// .execute { queryResult => -// val row = queryResult.next() -// val returnedPartitioning = row.getJsonB("partitioning").get -// val returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId6)) -// assert(returnedPartitioningParsed == expectedPartitioning6) -// assert(row.getString("author").contains("Mother")) -// } -// -// //TEST ALL flows for Ancestors Partitions -// function(getAncestorsFn) -// .setParam("i_id_partitioning", partId8) -// .setParam("i_limit", 10) -// .execute { queryResult => -// var row = queryResult.next() -// var returnedPartitioning = row.getJsonB("partitioning").get -// var returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId1)) -// assert(returnedPartitioningParsed == expectedPartitioning1) -// assert(row.getString("author").contains("Grandpa")) -// row = queryResult.next() -// returnedPartitioning = row.getJsonB("partitioning").get -// returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId2)) -// assert(returnedPartitioningParsed == expectedPartitioning2) -// assert(row.getString("author").contains("Father")) -// row = queryResult.next() -// returnedPartitioning = row.getJsonB("partitioning").get -// returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse 
returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId3)) -// assert(returnedPartitioningParsed == expectedPartitioning3) -// assert(row.getString("author").contains("Son")) -// row = queryResult.next() -// returnedPartitioning = row.getJsonB("partitioning").get -// returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId4)) -// assert(returnedPartitioningParsed == expectedPartitioning4) -// assert(row.getString("author").contains("Grandson")) -// row = queryResult.next() -// returnedPartitioning = row.getJsonB("partitioning").get -// returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId5)) -// assert(returnedPartitioningParsed == expectedPartitioning5) -// assert(row.getString("author").contains("Grandma")) -// row = queryResult.next() -// returnedPartitioning = row.getJsonB("partitioning").get -// returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId6)) -// assert(returnedPartitioningParsed == expectedPartitioning6) -// assert(row.getString("author").contains("Mother")) -// row = queryResult.next() -// returnedPartitioning = row.getJsonB("partitioning").get -// returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned 
partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId7)) -// assert(returnedPartitioningParsed == expectedPartitioning7) -// assert(row.getString("author").contains("Daughter")) -// } - } - -// test("Change in Parent") { -// -// val Time1 = OffsetDateTime.parse("1992-08-03T10:00:00Z") -// -// table("runs.partitionings").insert( -// add("partitioning", partitioning1) -// .add("created_by", "GrandPa") -// .add("created_at", Time1) -// ) -// -// val partId1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning1, "id_partitioning").get.get -// -// function(createFlowFn) -// .setParam("i_fk_partitioning", partId1) -// .setParam("i_by_user", "GrandPa") -// .execute { queryResult => -// flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get -// } -// -// val partId3 = function(createPartitioningFn) -// .setParam("i_partitioning", partitioning3) -// .setParam("i_parent_partitioning_id", partId1) -// .setParam("i_by_user", "Father") -// .execute { queryResult => -// assert(queryResult.hasNext) -// val row = queryResult.next() -// assert(row.getInt("status").contains(12)) -// assert(row.getString("status_text").contains("Partitioning created with parent partitioning")) -// row.getLong("id_partitioning").get -// } -// -// function(getAncestorsFn) -// .setParam("i_id_partitioning", partId3) -// .execute { queryResult => -// val row = queryResult.next() -// val returnedPartitioning = row.getJsonB("partitioning").get -// val returnedPartitioningParsed = parse(returnedPartitioning.value) -// .getOrElse(fail("Failed to parse returned partitioning")) -// assert(row.getInt("status").contains(10)) -// assert(row.getString("status_text").contains("OK")) -// assert(row.getLong("ancestor_id").contains(partId1)) -// assert(returnedPartitioningParsed == expectedPartitioning1) -// assert(row.getString("author").contains("GrandPa")) -// } -// 
} - - test("Child Partitioning not found") { - val nonExistentID = 9999L + //TEST Separate flow for Ancestors Partitions function(getAncestorsFn) - .setParam("i_id_partitioning", nonExistentID) + .setParam("i_id_partitioning", partitioningID7) .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(41)) - assert(row.getString("status_text").contains("Partitioning not found")) - assert(row.getJsonB("ancestor_id").isEmpty) - assert(row.getJsonB("partitioning").isEmpty) - assert(row.getString("author").isEmpty) + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partitioningID6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Daughter")) assert(!queryResult.hasNext) } - } - test("Ancestor Partitioning not found") { + //TEST ALL flows for Ancestors Partitions + function(getAncestorsFn) + .setParam("i_id_partitioning", partitioningID8) + .execute { queryResult => + var row = queryResult.next() + var returnedPartitioning = row.getJsonB("partitioning").get + var returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partitioningID1)) + assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getString("author").contains("Grandma")) + assert(queryResult.hasNext) + + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse 
returned partitioning")) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partitioningID2)) + assert(returnedPartitioningParsed == expectedPartitioning2) + assert(row.getString("author").contains("Grandpa")) + assert(queryResult.hasNext) - table(partitioningsTable).insert(add("partitioning", partitioning5).add("created_by", "NO_Ancestor")) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partitioningID3)) + assert(returnedPartitioningParsed == expectedPartitioning3) + assert(row.getString("author").contains("Mother")) + assert(queryResult.hasNext) - val partId5: Long = table(partitioningsTable) - .fieldValue("partitioning", partitioning5, "id_partitioning").get.get + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partitioningID4)) + assert(returnedPartitioningParsed == expectedPartitioning4) + assert(row.getString("author").contains("Father")) + assert(queryResult.hasNext) - function(createFlowFn) - .setParam("i_fk_partitioning", partId5) - .setParam("i_by_user", "Grandpa") - .execute { queryResult => - flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get - } + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned 
partitioning")) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partitioningID5)) + assert(returnedPartitioningParsed == expectedPartitioning5) + assert(row.getString("author").contains("Son")) + assert(queryResult.hasNext) - function(getAncestorsFn) - .setParam("i_id_partitioning", partId5) - .execute { queryResult => + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(partitioningID6)) + assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getString("author").contains("Daughter")) assert(queryResult.hasNext) - val row = queryResult.next() + + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getJsonB("ancestor_id").isEmpty) - assert(row.getJsonB("partitioning").isEmpty) - assert(row.getString("author").isEmpty) + assert(row.getLong("ancestor_id").contains(partitioningID7)) + assert(returnedPartitioningParsed == expectedPartitioning7) + assert(row.getString("author").contains("Granddaughter")) + assert(!queryResult.hasNext) } } From a5c556ce070aa87f3007b12892db924d9220c3f1 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Fri, 14 Feb 2025 17:27:35 +0200 Subject: [PATCH 5/8] Added the getAncestors Database functionality. 1. Made the necessary changes as mentioned by the team. 2. Made the necessary changes to the getAncestors Database functionality. 3. Now working completely as intended. 4. 
Removed unnecessary files --- ...rtitioningAncestorsIntegrationTests.scala} | 162 ++++++------------ 1 file changed, 50 insertions(+), 112 deletions(-) rename database/src/test/scala/za/co/absa/atum/database/runs/{GetAncestorsIntegrationTests.scala => GetPartitioningAncestorsIntegrationTests.scala} (67%) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala similarity index 67% rename from database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala rename to database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala index 3562660d..553b3f26 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetAncestorsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala @@ -20,9 +20,10 @@ import io.circe.parser.parse import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString -import java.time.OffsetDateTime +import scala.collection.mutable +import scala.util.control.Breaks.{break, breakable} -class GetAncestorsIntegrationTests extends DBTestSuite { +class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { private val getAncestorsFn = "runs.get_partitioning_ancestors" private val createPartitioningFn = "runs.create_partitioning_if_not_exists" @@ -324,6 +325,17 @@ class GetAncestorsIntegrationTests extends DBTestSuite { assert(result1.getString("status_text").get == "Partitioning added to flows") } + //Used Linked Hash Map to keep structure and order + val resultsMap = mutable.LinkedHashMap( + "Grandma" -> (partitioningID1, expectedPartitioning1), + "Grandpa" -> (partitioningID2, expectedPartitioning2), + "Mother" -> (partitioningID3, expectedPartitioning3), + "Father" -> (partitioningID4, expectedPartitioning4), + "Son" -> (partitioningID5, 
expectedPartitioning5), + "Daughter" -> (partitioningID6, expectedPartitioning6), + "Granddaughter" -> (partitioningID7, expectedPartitioning7) + ) + //Test 1 Ancestor function(getAncestorsFn) .setParam("i_id_partitioning", partitioningID3) @@ -334,8 +346,8 @@ class GetAncestorsIntegrationTests extends DBTestSuite { .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID1)) - assert(returnedPartitioningParsed == expectedPartitioning1) + assert(row.getLong("ancestor_id").contains(resultsMap("Grandma")._1)) + assert(returnedPartitioningParsed == resultsMap("Grandma")._2) assert(row.getString("author").contains("Grandma")) assert(!queryResult.hasNext) } @@ -348,40 +360,22 @@ class GetAncestorsIntegrationTests extends DBTestSuite { var returnedPartitioning = row.getJsonB("partitioning").get var returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID1)) - assert(returnedPartitioningParsed == expectedPartitioning1) - assert(row.getString("author").contains("Grandma")) - assert(queryResult.hasNext) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID2)) - assert(returnedPartitioningParsed == expectedPartitioning2) - assert(row.getString("author").contains("Grandpa")) - assert(queryResult.hasNext) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - 
returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID3)) - assert(returnedPartitioningParsed == expectedPartitioning3) - assert(row.getString("author").contains("Mother")) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID4)) - assert(returnedPartitioningParsed == expectedPartitioning4) - assert(row.getString("author").contains("Father")) + //Used breakable to be able to break the loop + breakable { + for ((k, v) <- resultsMap) { + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(v._1)) + assert(returnedPartitioningParsed == v._2) + assert(row.getString("author").contains(k)) + if (!queryResult.hasNext) break() + assert(queryResult.hasNext) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + } + } assert(!queryResult.hasNext) } @@ -395,8 +389,8 @@ class GetAncestorsIntegrationTests extends DBTestSuite { .getOrElse(fail("Failed to parse returned partitioning")) assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID6)) - assert(returnedPartitioningParsed == expectedPartitioning6) + assert(row.getLong("ancestor_id").contains(resultsMap("Daughter")._1)) + assert(returnedPartitioningParsed == 
resultsMap("Daughter")._2) assert(row.getString("author").contains("Daughter")) assert(!queryResult.hasNext) } @@ -409,78 +403,22 @@ class GetAncestorsIntegrationTests extends DBTestSuite { var returnedPartitioning = row.getJsonB("partitioning").get var returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID1)) - assert(returnedPartitioningParsed == expectedPartitioning1) - assert(row.getString("author").contains("Grandma")) - assert(queryResult.hasNext) - - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID2)) - assert(returnedPartitioningParsed == expectedPartitioning2) - assert(row.getString("author").contains("Grandpa")) - assert(queryResult.hasNext) - - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID3)) - assert(returnedPartitioningParsed == expectedPartitioning3) - assert(row.getString("author").contains("Mother")) - assert(queryResult.hasNext) - - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - 
assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID4)) - assert(returnedPartitioningParsed == expectedPartitioning4) - assert(row.getString("author").contains("Father")) - assert(queryResult.hasNext) - - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID5)) - assert(returnedPartitioningParsed == expectedPartitioning5) - assert(row.getString("author").contains("Son")) - assert(queryResult.hasNext) - - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID6)) - assert(returnedPartitioningParsed == expectedPartitioning6) - assert(row.getString("author").contains("Daughter")) - assert(queryResult.hasNext) - - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(partitioningID7)) - assert(returnedPartitioningParsed == expectedPartitioning7) - assert(row.getString("author").contains("Granddaughter")) - + //Used breakable to be able to break the loop + breakable { + for ((k, v) <- resultsMap) { + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + 
assert(row.getLong("ancestor_id").contains(v._1)) + assert(returnedPartitioningParsed == v._2) + assert(row.getString("author").contains(k)) + if (!queryResult.hasNext) break() + assert(queryResult.hasNext) + row = queryResult.next() + returnedPartitioning = row.getJsonB("partitioning").get + returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + } + } assert(!queryResult.hasNext) } } From 8961762b50e9592e147cd4885d83e3372a4740f0 Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Tue, 25 Feb 2025 15:29:04 +0200 Subject: [PATCH 6/8] Changes made: Adjustments to the SQL and added in status code 14. Adjustments to the tests, made it more readable and shorter. --- ...V0.3.0.10__get_partitioning_ancestors.sql} | 17 +-- ...artitioningAncestorsIntegrationTests.scala | 118 +++++++----------- 2 files changed, 51 insertions(+), 84 deletions(-) rename database/src/main/postgres/runs/{V0.3.0.2__get_partitioning_ancestors.sql => V0.3.0.10__get_partitioning_ancestors.sql} (89%) diff --git a/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.10__get_partitioning_ancestors.sql similarity index 89% rename from database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql rename to database/src/main/postgres/runs/V0.3.0.10__get_partitioning_ancestors.sql index d0adb783..b4bbfa2f 100644 --- a/database/src/main/postgres/runs/V0.3.0.2__get_partitioning_ancestors.sql +++ b/database/src/main/postgres/runs/V0.3.0.10__get_partitioning_ancestors.sql @@ -46,6 +46,7 @@ $$ -- Status codes: -- 10 - OK +-- 14 - OK (No Ancestors found, therefore no op) -- 41 - Partitioning not found -- ------------------------------------------------------------------------------- @@ -65,12 +66,13 @@ BEGIN -- Check if there are more partitionings than the limit SELECT count(*) > i_limit - FROM flows.partitioning_to_flow PTF - WHERE PTF.fk_flow IN ( - SELECT fk_flow - FROM 
flows.partitioning_to_flow - WHERE fk_partitioning = i_id_partitioning - ) + FROM + flows.partitioning_to_flow PF + INNER JOIN flows.flows F ON F.id_flow = PF.fk_flow + INNER JOIN runs.partitionings P ON P.id_partitioning = F.fk_primary_partitioning + WHERE + PF.fk_partitioning = i_id_partitioning AND + P.id_partitioning IS DISTINCT FROM i_id_partitioning LIMIT i_limit + 1 OFFSET i_offset INTO _has_more; @@ -90,14 +92,13 @@ BEGIN WHERE PF.fk_partitioning = i_id_partitioning AND P.id_partitioning IS DISTINCT FROM i_id_partitioning - GROUP BY P.id_partitioning ORDER BY P.id_partitioning LIMIT i_limit OFFSET i_offset; --If no ancestors found send an OK status IF NOT FOUND THEN - status := 10; + status := 14; status_text := 'OK'; RETURN NEXT; END IF; diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala index 553b3f26..cee9b946 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala @@ -156,52 +156,6 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { |""".stripMargin ) - //First Failure Test: Child Partition not found - test("Child Partitioning not found") { - val nonExistentID = 9999L - - function(getAncestorsFn) - .setParam("i_id_partitioning", nonExistentID) - .execute { queryResult => - assert(queryResult.hasNext) - val row = queryResult.next() - assert(row.getInt("status").contains(41)) - assert(row.getString("status_text").contains("Partitioning not found")) - assert(row.getJsonB("ancestor_id").isEmpty) - assert(row.getJsonB("partitioning").isEmpty) - assert(row.getString("author").isEmpty) - assert(!queryResult.hasNext) - } - } - - //Second Failure Test: Ancestor Partitioning not found - test("Ancestor Partitioning not 
found") { - - val partitioningID1 = function(createPartitioningFn) - .setParam("i_partitioning", partitioning1) - .setParam("i_by_user", "Grandma") - .execute { queryResult => - assert(queryResult.hasNext) - val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) - row.getLong("id_partitioning").get - } - - function(getAncestorsFn) - .setParam("i_id_partitioning", partitioningID1) - .execute { queryResult => - assert(queryResult.hasNext) - val row = queryResult.next() - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getJsonB("ancestor_id").isEmpty) - assert(row.getJsonB("partitioning").isEmpty) - assert(row.getString("author").isEmpty) - assert(!queryResult.hasNext) - } - } - // Testing for return of the Ancestors for a given Partition ID // // 1(Grandma) 2(Grandpa) @@ -212,14 +166,13 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { // | / // 8(Grandson) test("Returns Ancestors for a given Partition ID"){ + + //Data Preparation Step Start ------------------------------------------------------------------------------------ val partitioningID1 = function(createPartitioningFn) .setParam("i_partitioning", partitioning1) .setParam("i_by_user", "Grandma") .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -227,10 +180,7 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_partitioning", partitioning2) .setParam("i_by_user", "Grandpa") .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -239,10 
+189,7 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_by_user", "Mother") .setParam("i_parent_partitioning", partitioning1) .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -251,10 +198,7 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_by_user", "Father") .setParam("i_parent_partitioning", partitioning2) .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -263,10 +207,7 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_by_user", "Son") .setParam("i_parent_partitioning", partitioning3) .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -275,19 +216,14 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_fk_partitioning", partitioningID5) .setParam("i_by_user", "Son") .execute { queryResult => - val result1 = queryResult.next() - assert(result1.getInt("status").get == 11) - assert(result1.getString("status_text").get == "Partitioning added to flows") + queryResult.next() } val partitioningID6 = function(createPartitioningFn) .setParam("i_partitioning", partitioning6) .setParam("i_by_user", "Daughter") .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -296,10 +232,7 @@ class 
GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_by_user", "Granddaughter") .setParam("i_parent_partitioning", partitioning6) .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -308,10 +241,7 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_by_user", "Grandson") .setParam("i_parent_partitioning", partitioning5) .execute { queryResult => - assert(queryResult.hasNext) val row = queryResult.next() - assert(row.getInt("status").contains(11)) - assert(row.getString("status_text").contains("Partitioning created")) row.getLong("id_partitioning").get } @@ -320,9 +250,7 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { .setParam("i_fk_partitioning", partitioningID8) .setParam("i_by_user", "Grandson") .execute { queryResult => - val result1 = queryResult.next() - assert(result1.getInt("status").get == 11) - assert(result1.getString("status_text").get == "Partitioning added to flows") + queryResult.next() } //Used Linked Hash Map to keep structure and order @@ -335,6 +263,7 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { "Daughter" -> (partitioningID6, expectedPartitioning6), "Granddaughter" -> (partitioningID7, expectedPartitioning7) ) + //Data Preparation Step End -------------------------------------------------------------------------------------- //Test 1 Ancestor function(getAncestorsFn) @@ -422,4 +351,41 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { assert(!queryResult.hasNext) } } + + //First Failure Test: Child Partition not found + test("Child Partitioning not found") { + val nonExistentID = 9999L + + function(getAncestorsFn) + .setParam("i_id_partitioning", nonExistentID) + .execute { queryResult => + assert(queryResult.hasNext) + val row = 
queryResult.next() + assert(row.getInt("status").contains(41)) + assert(row.getString("status_text").contains("Partitioning not found")) + assert(!queryResult.hasNext) + } + } + + //Second Failure Test: Ancestor Partitioning not found + test("Ancestor Partitioning not found") { + + val partitioningID1 = function(createPartitioningFn) + .setParam("i_partitioning", partitioning1) + .setParam("i_by_user", "Grandma") + .execute { queryResult => + val row = queryResult.next() + row.getLong("id_partitioning").get + } + + function(getAncestorsFn) + .setParam("i_id_partitioning", partitioningID1) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(14)) + assert(row.getString("status_text").contains("OK")) + assert(!queryResult.hasNext) + } + } } From f83460a3a133a37090e9de4d462f36f0876e0a5f Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Fri, 28 Feb 2025 14:39:11 +0200 Subject: [PATCH 7/8] Changes made: Made amendments to test code mentioned by PR. - Removed Breakable tests - Added a set case and removed the partial test case. 
--- ...artitioningAncestorsIntegrationTests.scala | 90 ++++++++++--------- 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala index cee9b946..03123e75 100644 --- a/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/runs/GetPartitioningAncestorsIntegrationTests.scala @@ -254,7 +254,15 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { } //Used Linked Hash Map to keep structure and order - val resultsMap = mutable.LinkedHashMap( + val resultsMapFor5 = mutable.LinkedHashMap( + "Grandma" -> (partitioningID1, expectedPartitioning1), + "Grandpa" -> (partitioningID2, expectedPartitioning2), + "Mother" -> (partitioningID3, expectedPartitioning3), + "Father" -> (partitioningID4, expectedPartitioning4), + ) + + //Used Linked Hash Map to keep structure and order + val resultsMapFor8 = mutable.LinkedHashMap( "Grandma" -> (partitioningID1, expectedPartitioning1), "Grandpa" -> (partitioningID2, expectedPartitioning2), "Mother" -> (partitioningID3, expectedPartitioning3), @@ -273,10 +281,11 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { val returnedPartitioning = row.getJsonB("partitioning").get val returnedPartitioningParsed = parse(returnedPartitioning.value) .getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(resultsMap("Grandma")._1)) - assert(returnedPartitioningParsed == resultsMap("Grandma")._2) + assert(row.getLong("ancestor_id").contains(partitioningID1)) + assert(returnedPartitioningParsed == expectedPartitioning1) assert(row.getString("author").contains("Grandma")) 
assert(!queryResult.hasNext) } @@ -285,25 +294,22 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { function(getAncestorsFn) .setParam("i_id_partitioning", partitioningID5) .execute { queryResult => - var row = queryResult.next() - var returnedPartitioning = row.getJsonB("partitioning").get - var returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - //Used breakable to be able to break the loop - breakable { - for ((k, v) <- resultsMap) { - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(v._1)) - assert(returnedPartitioningParsed == v._2) - assert(row.getString("author").contains(k)) - if (!queryResult.hasNext) break() - assert(queryResult.hasNext) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - } + for ((k, v) <- resultsMapFor5) { + assert(queryResult.hasNext) + + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + val expectedPartitioningId = v._1 + val expectedPartitioning = v._2 + val expectedAuthor = k + + assert(row.getInt("status").contains(10)) + assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(expectedPartitioningId)) + assert(returnedPartitioningParsed == expectedPartitioning) + assert(row.getString("author").contains(expectedAuthor)) } assert(!queryResult.hasNext) } @@ -316,10 +322,11 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { val returnedPartitioning = row.getJsonB("partitioning").get val returnedPartitioningParsed = parse(returnedPartitioning.value) 
.getOrElse(fail("Failed to parse returned partitioning")) + assert(row.getInt("status").contains(10)) assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(resultsMap("Daughter")._1)) - assert(returnedPartitioningParsed == resultsMap("Daughter")._2) + assert(row.getLong("ancestor_id").contains(partitioningID6)) + assert(returnedPartitioningParsed == expectedPartitioning6) assert(row.getString("author").contains("Daughter")) assert(!queryResult.hasNext) } @@ -328,25 +335,22 @@ class GetPartitioningAncestorsIntegrationTests extends DBTestSuite { function(getAncestorsFn) .setParam("i_id_partitioning", partitioningID8) .execute { queryResult => - var row = queryResult.next() - var returnedPartitioning = row.getJsonB("partitioning").get - var returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - //Used breakable to be able to break the loop - breakable { - for ((k, v) <- resultsMap) { - assert(row.getInt("status").contains(10)) - assert(row.getString("status_text").contains("OK")) - assert(row.getLong("ancestor_id").contains(v._1)) - assert(returnedPartitioningParsed == v._2) - assert(row.getString("author").contains(k)) - if (!queryResult.hasNext) break() - assert(queryResult.hasNext) - row = queryResult.next() - returnedPartitioning = row.getJsonB("partitioning").get - returnedPartitioningParsed = parse(returnedPartitioning.value) - .getOrElse(fail("Failed to parse returned partitioning")) - } + for ((k, v) <- resultsMapFor8) { + assert(queryResult.hasNext) + + val row = queryResult.next() + val returnedPartitioning = row.getJsonB("partitioning").get + val returnedPartitioningParsed = parse(returnedPartitioning.value) + .getOrElse(fail("Failed to parse returned partitioning")) + val expectedPartitioningId = v._1 + val expectedPartitioning = v._2 + val expectedAuthor = k + + assert(row.getInt("status").contains(10)) + 
assert(row.getString("status_text").contains("OK")) + assert(row.getLong("ancestor_id").contains(expectedPartitioningId)) + assert(returnedPartitioningParsed == expectedPartitioning) + assert(row.getString("author").contains(expectedAuthor)) } assert(!queryResult.hasNext) } From 4dcaa86d7f49f0fcf63b3f11a8e90c2544f643fd Mon Sep 17 00:00:00 2001 From: ABLL526 Date: Fri, 7 Mar 2025 16:28:14 +0200 Subject: [PATCH 8/8] Changes Made: - Made some changes to the naming convention on the files. --- ....11__alter_checkpoints.ddl => V0.3.0.2__alter_checkpoints.ddl} | 0 ...ing_ancestors.sql => V0.3.0.3__get_partitioning_ancestors.sql} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename database/src/main/postgres/runs/{V0.3.0.11__alter_checkpoints.ddl => V0.3.0.2__alter_checkpoints.ddl} (100%) rename database/src/main/postgres/runs/{V0.3.0.10__get_partitioning_ancestors.sql => V0.3.0.3__get_partitioning_ancestors.sql} (100%) diff --git a/database/src/main/postgres/runs/V0.3.0.11__alter_checkpoints.ddl b/database/src/main/postgres/runs/V0.3.0.2__alter_checkpoints.ddl similarity index 100% rename from database/src/main/postgres/runs/V0.3.0.11__alter_checkpoints.ddl rename to database/src/main/postgres/runs/V0.3.0.2__alter_checkpoints.ddl diff --git a/database/src/main/postgres/runs/V0.3.0.10__get_partitioning_ancestors.sql b/database/src/main/postgres/runs/V0.3.0.3__get_partitioning_ancestors.sql similarity index 100% rename from database/src/main/postgres/runs/V0.3.0.10__get_partitioning_ancestors.sql rename to database/src/main/postgres/runs/V0.3.0.3__get_partitioning_ancestors.sql