Skip to content

feat: ref annos now soft delete/update #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 91 additions & 2 deletions dynamicannotationdb/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ def create_table(
with_crud_columns=with_crud_columns,
)

if hasattr(AnnotationModel, "target_id") and reference_table:
reference_table_name = self.db.get_table_sql_metadata(reference_table)
logging.info(
f"{table_name} is targeting reference table: {reference_table_name}"
)

self.create_reference_update_trigger(
table_name, reference_table, AnnotationModel
)

self.db.base.metadata.tables[AnnotationModel.__name__].create(
bind=self.db.engine
)
Expand Down Expand Up @@ -141,6 +151,76 @@ def create_table(
)
return table_name

def create_reference_update_trigger(self, table_name, reference_table, model):
func_name = f"{table_name}_update_reference_id"

column_names = [
col.name
for col in model.__table__.columns
if col.name not in ["id", "target_id"]
]
column_names_str = ", ".join(column_names)

func = DDL(
f"""
CREATE OR REPLACE FUNCTION {func_name}()
RETURNS TRIGGER
AS $func$
DECLARE
new_id INTEGER;
BEGIN
IF EXISTS
(SELECT 1
FROM information_schema.columns
WHERE table_name='{reference_table}'
AND column_name='superceded_id') THEN
IF NEW.superceded_id IS NOT NULL THEN
-- Copy the current row from {table_name} without the id column
INSERT INTO {table_name} ({column_names_str}, target_id)
SELECT {column_names_str}, {table_name}.target_id
FROM {table_name}
WHERE {table_name}.target_id = OLD.id
RETURNING id INTO new_id;

-- Update the new row's target_id to new.superceded_id
UPDATE {table_name}
SET target_id = NEW.superceded_id
WHERE id = new_id;

-- Update the original row's superceded_id and valid column if it exists
UPDATE {table_name}
SET superceded_id = new_id,
valid = (CASE WHEN EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='{table_name}' AND column_name='valid') THEN FALSE ELSE valid END),
deleted = (CASE WHEN EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='{table_name}' AND column_name='deleted') THEN timezone('utc', now()) ELSE deleted END)
WHERE target_id = OLD.id;
END IF;
RETURN NEW;
ELSE
RETURN NULL;
END IF;
END;
$func$ LANGUAGE plpgsql;
""")

trigger = DDL(
f"""
CREATE TRIGGER update_{table_name}_target_id AFTER UPDATE OF superceded_id ON {reference_table}
FOR EACH ROW EXECUTE PROCEDURE {func_name}();
""")

event.listen(
model.__table__,
"after_create",
func.execute_if(dialect="postgresql"),
)

event.listen(
model.__table__,
"after_create",
trigger.execute_if(dialect="postgresql"),
)
return True

def update_table_metadata(
self,
table_name: str,
Expand Down Expand Up @@ -273,7 +353,6 @@ def insert_annotations(self, table_name: str, annotations: List[dict]):

formatted_anno_data = []
for annotation in annotations:

annotation_data, __ = self.schema.split_flattened_schema_data(
schema_type, annotation
)
Expand Down Expand Up @@ -353,7 +432,7 @@ def update_annotation(self, table_name: str, annotation: dict) -> str:
table_name : str
name of targeted table to update annotations
annotation : dict
new data for that annotation, allows for partial updates but
new data for that annotation, allows for partial updates but
requires an 'id' field to target the row

Returns
Expand Down Expand Up @@ -381,6 +460,16 @@ def update_annotation(self, table_name: str, annotation: dict) -> str:
raise f"No result found for {anno_id}. Error: {e}" from e

if old_anno.superceded_id:
# find the latest annotation by finding the annotation with no superceded_id
while old_anno.superceded_id:
old_anno = (
self.db.cached_session.query(AnnotationModel)
.filter(AnnotationModel.id == old_anno.superceded_id)
.one()
)
logging.debug(f"Found superceded annotation: {old_anno.id} {old_anno.superceded_id}")
logging.debug(f"Found latest annotation: {old_anno.id}")

raise UpdateAnnotationError(anno_id, old_anno.superceded_id)

# Merge old data with new changes
Expand Down
30 changes: 22 additions & 8 deletions tests/test_annotation.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
import pytest

from emannotationschemas import type_mapping
from emannotationschemas.schemas.base import ReferenceAnnotation

from dynamicannotationdb.errors import UpdateAnnotationError


def test_create_table(dadb_interface, annotation_metadata):
table_name = annotation_metadata["table_name"]
Expand Down Expand Up @@ -257,12 +258,19 @@ def test_update_reference_annotation(dadb_interface, annotation_metadata):
"id": 1,
"bouton_type": "basmati",
}

with pytest.raises(UpdateAnnotationError) as e:
# UpdateAnnotationError('Annotation with ID 1 has already been superseded by annotation ID 2, update annotation ID 2 instead')
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

# lets try again with the correct id
test_data = {
"id": 2,
"bouton_type": "basmati",
}
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

assert update_map == {1: 2}
assert update_map == {2: 3}
# return values from newly updated row
test_data = dadb_interface.annotation.get_annotations(table_name, [2])
test_data = dadb_interface.annotation.get_annotations(table_name, [3])
assert test_data[0]["bouton_type"] == "basmati"


Expand All @@ -273,19 +281,25 @@ def test_nested_update_reference_annotation(dadb_interface, annotation_metadata)
"tag": "here is a updated tag",
"id": 1,
}
with pytest.raises(UpdateAnnotationError) as e:
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

test_data = {
"tag": "here is a updated tag",
"id": 3,
}
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

assert update_map == {1: 2}
assert update_map == {3: 4}
# return values from newly updated row
test_data = dadb_interface.annotation.get_annotations(table_name, [2])
test_data = dadb_interface.annotation.get_annotations(table_name, [4])
assert test_data[0]["tag"] == "here is a updated tag"


def test_delete_reference_annotation(dadb_interface, annotation_metadata):
table_name = "presynaptic_bouton_types"

ids_to_delete = [2]
ids_to_delete = [1,2]
is_deleted = dadb_interface.annotation.delete_annotation(table_name, ids_to_delete)

assert is_deleted == ids_to_delete
Expand Down
Loading