Skip to content

Commit e8e4577

Browse files
authored
add a management command backfill_certificate_page_revision (#3115)
1 parent 701dd09 commit e8e4577

File tree

1 file changed

+153
-0
lines changed

1 file changed

+153
-0
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import csv
2+
import json
3+
from pathlib import Path
4+
5+
from django.core.management.base import BaseCommand
6+
from wagtail.blocks import StreamValue
7+
8+
from cms.models import CertificatePage, SignatoryPage
9+
10+
11+
class Command(BaseCommand):
12+
"""Django management command to Wagtail CertificatePage revisions using a CSV file"""
13+
14+
help = "Create backfilled revisions for CertificatePage objects using CSV input"
15+
16+
def add_arguments(self, parser):
17+
"""Parses command line arguments."""
18+
parser.add_argument(
19+
"--csv-file",
20+
type=str,
21+
help="Path to the CSV file containing signatory and certificate page information",
22+
required=True,
23+
)
24+
25+
def handle(self, *args, **options): # noqa: ARG002
26+
"""Handles the command execution."""
27+
csv_file_path = options["csv_file"]
28+
29+
try:
30+
with Path(csv_file_path).open("r", newline="", encoding="utf-8") as csvfile:
31+
reader = csv.DictReader(csvfile)
32+
for row_num, row in enumerate(reader, start=2): # Start at 2 for header
33+
self.process_certificate_page_revision(row, row_num)
34+
35+
except FileNotFoundError:
36+
self.stdout.write(self.style.ERROR(f"CSV file not found: {csv_file_path}"))
37+
38+
def revision_has_same_signatories(self, certificate_revision, signatory_blocks):
39+
page_obj = certificate_revision.as_object()
40+
rev_signatory_ids = [child.value.id for child in page_obj.signatories]
41+
new_signatory_ids = [sp.id for _, sp in signatory_blocks]
42+
return rev_signatory_ids == new_signatory_ids
43+
44+
def process_certificate_page_revision(self, row, row_num):
45+
certificate_page_id = row.get("certificate_page_id", "").strip()
46+
signatories_json = row.get("signatory_names", "").strip()
47+
48+
if not certificate_page_id:
49+
self.stderr.write(
50+
self.style.ERROR(f"Row {row_num}: Missing certificate_page_id")
51+
)
52+
return
53+
54+
if not signatories_json:
55+
self.stderr.write(
56+
self.style.ERROR(f"Row {row_num}: Missing signatory_names JSON array")
57+
)
58+
return
59+
60+
try:
61+
# Example: a list of pairs, e.g. [["Name1", "Name2"], ["Name3", "Name4"]]
62+
signatory_pairs = json.loads(signatories_json)
63+
except Exception: # noqa: BLE001
64+
self.stderr.write(
65+
self.style.ERROR(
66+
f"Row {row_num}: signatory_names is not valid JSON: {signatories_json}"
67+
)
68+
)
69+
return
70+
71+
# Load the CertificatePage
72+
certificate_page = CertificatePage.objects.get(id=certificate_page_id).specific
73+
74+
# Copy the original live revision to restore later
75+
original_live_revision = certificate_page.get_latest_revision()
76+
77+
backfill_created = False
78+
for index, signatory_names in enumerate(signatory_pairs, start=1):
79+
if not isinstance(signatory_names, list) or not signatory_names:
80+
self.stderr.write(
81+
self.style.ERROR(
82+
f"Row {row_num}: Pair #{index} must be a non-empty list of names: {signatory_names}"
83+
)
84+
)
85+
continue
86+
87+
signatory_pages = list(
88+
SignatoryPage.objects.filter(name__in=signatory_names)
89+
)
90+
found_names = {s.name for s in signatory_pages}
91+
missing = [n for n in signatory_names if n not in found_names]
92+
93+
if missing:
94+
self.stderr.write(
95+
self.style.WARNING(
96+
f"Row {row_num} Pair {index}: Missing SignatoryPage(s): {missing}"
97+
)
98+
)
99+
continue
100+
101+
signatory_blocks = [
102+
("signatory", signatory_page) for signatory_page in signatory_pages
103+
]
104+
105+
backfill_signatories = StreamValue(
106+
certificate_page.signatories.stream_block,
107+
signatory_blocks,
108+
is_lazy=False,
109+
)
110+
111+
if any(
112+
self.revision_has_same_signatories(revision, signatory_blocks)
113+
for revision in certificate_page.revisions.all()
114+
):
115+
self.stdout.write(
116+
self.style.WARNING(
117+
f"Row {row_num} Pair {index}: Identical revision already exists"
118+
)
119+
)
120+
continue
121+
122+
# Apply new signatories
123+
certificate_page.signatories = backfill_signatories
124+
125+
# create the backfilled historical revision
126+
backfill_revision = certificate_page.save_revision(
127+
changed=True,
128+
log_action="wagtail.edit",
129+
)
130+
backfill_revision.publish()
131+
132+
backfill_created = True
133+
self.stdout.write(
134+
self.style.SUCCESS(
135+
f"Row {row_num} Pair {index}: Created revision {backfill_revision.id} for CertificatePage {certificate_page_id}"
136+
)
137+
)
138+
139+
if backfill_created:
140+
# Restore original live revision
141+
restored_page = original_live_revision.as_object()
142+
restored_page.pk = certificate_page.pk
143+
restored_revision = restored_page.save_revision(
144+
changed=False,
145+
log_action="wagtail.revert",
146+
)
147+
restored_revision.publish()
148+
149+
self.stdout.write(
150+
self.style.SUCCESS(
151+
f"Row {row_num}: Restored original live revision {restored_revision.id} for CertificatePage {certificate_page_id}"
152+
)
153+
)

0 commit comments

Comments
 (0)