Skip to content

Commit fb8f572

Browse files
committed
feat: adds rudimentary script to import seantis risk excel.
1 parent 4af0879 commit fb8f572

File tree

3 files changed

+215
-0
lines changed

3 files changed

+215
-0
lines changed

setup.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ fanstatic.libraries =
6868
console_scripts =
6969
add_user = riskmatrix.scripts.add_user:main
7070
upgrade = riskmatrix.scripts.upgrade:main
71+
import-seantis-excel = riskmatrix.scripts.seantis_import_risk_excel:main
7172

7273
[flake8]
7374
extend-select = B901,B903,B904,B908,TC2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
"""
2+
Import risk excel 🕸️ into RiskMatrix ✨
3+
4+
This script is specific to our sitation at seantis. The script is included
5+
anyway, you might adjust it to import the excel at your organization too.
6+
"""
7+
import argparse
8+
import sys
9+
from datetime import datetime
10+
from typing import TYPE_CHECKING
11+
from typing import Iterator
12+
13+
try:
14+
from openpyxl import load_workbook
15+
except ImportError:
16+
print("Excel import requires openpyxl library. Install with:\n")
17+
print("$ pip install openpyxl")
18+
print()
19+
sys.exit(1)
20+
21+
import sqlalchemy
22+
from pyramid.paster import bootstrap
23+
from pyramid.paster import get_appsettings
24+
from sqlalchemy import select
25+
26+
from riskmatrix.models import Organization
27+
from riskmatrix.models import Risk
28+
from riskmatrix.models import RiskAssessment
29+
from riskmatrix.models import RiskCatalog
30+
from riskmatrix.models.asset import Asset
31+
from riskmatrix.orm import Base
32+
from riskmatrix.orm import get_engine
33+
from riskmatrix.scripts.util import select_existing_organization
34+
35+
if TYPE_CHECKING:
36+
37+
from typing import TypedDict
38+
39+
from sqlalchemy.orm import Session
40+
41+
class RiskDetails(TypedDict):
42+
""" A risk extracted from the excel. """
43+
name: str
44+
category: str
45+
asset_name: str
46+
desc: str
47+
likelihood: int
48+
impact: int
49+
50+
51+
def parse_args(argv: list[str]) -> argparse.Namespace:
52+
parser = argparse.ArgumentParser()
53+
parser.add_argument(
54+
'config_uri',
55+
help='Configuration file, e.g., development.ini',
56+
)
57+
parser.add_argument(
58+
'catalog',
59+
help='Risk catalog excel file, e.g., catalog.xlsx',
60+
)
61+
return parser.parse_args(argv[1:])
62+
63+
64+
def get_or_create_asset(
65+
asset_name: str,
66+
organization: Organization,
67+
session: 'Session'
68+
) -> Asset:
69+
70+
q = select(Asset).where(
71+
Asset.organization_id == organization.id,
72+
Asset.name == asset_name
73+
)
74+
75+
asset = session.scalars(q).one_or_none()
76+
77+
if asset:
78+
return asset
79+
80+
asset = Asset(asset_name, organization)
81+
asset.organization_id = organization.id
82+
session.add(asset)
83+
return asset
84+
85+
86+
def populate_catalog(
87+
catalog: RiskCatalog,
88+
risks: 'Iterator[RiskDetails]',
89+
session: 'Session'
90+
) -> None:
91+
92+
for risk_details in risks:
93+
asset = get_or_create_asset(
94+
risk_details['asset_name'], catalog.organization, session
95+
)
96+
97+
risk = Risk(
98+
name=risk_details['name'],
99+
catalog=catalog,
100+
description=risk_details['desc'],
101+
category=risk_details['category']
102+
)
103+
104+
assessment = RiskAssessment(risk=risk, asset=asset)
105+
assessment.likelihood = risk_details['likelihood']
106+
assessment.impact = risk_details['impact']
107+
session.add(assessment)
108+
109+
110+
def risks_from_excel(
111+
excel_file: str,
112+
sheet_name: str = 'Risikokatalog'
113+
) -> 'Iterator[RiskDetails]':
114+
"""
115+
Load risks from excel.
116+
"""
117+
workbook = load_workbook(excel_file, read_only=True)
118+
119+
sheet = workbook[sheet_name]
120+
121+
# Rows are vertically grouped into sections by a category. A section begins
122+
# with a row that contains the category name but is otherwise empty.
123+
current_category = None
124+
125+
# Header row sometimes spans over two rows (combined), sometimes only one.
126+
# Anyway, actual riks rows will start after row #2.
127+
start_after_row = 2
128+
129+
iterator = sheet.iter_rows( # type: ignore[union-attr,misc]
130+
values_only=True,
131+
min_row=start_after_row
132+
)
133+
134+
for row in iterator:
135+
nr = row[0]
136+
name = row[1]
137+
138+
is_empty_row = not (nr or name)
139+
is_category_row = not nr and name
140+
141+
if is_empty_row:
142+
continue
143+
elif is_category_row:
144+
current_category = name
145+
continue
146+
147+
yield {
148+
'name': str(name),
149+
'category': str(current_category),
150+
'asset_name': str(row[2]),
151+
'desc': str(row[3]),
152+
'likelihood': int(str(row[7])),
153+
'impact': int(str(row[8]))
154+
}
155+
156+
# readonly mode forces us to manually close the workbook, see also:
157+
# https://openpyxl.readthedocs.io/en/stable/optimized.html#read-only-mode
158+
workbook.close()
159+
160+
161+
def main(argv: list[str] = sys.argv) -> None:
162+
args = parse_args(argv)
163+
164+
with bootstrap(args.config_uri) as env:
165+
settings = get_appsettings(args.config_uri)
166+
167+
engine = get_engine(settings)
168+
Base.metadata.create_all(engine)
169+
170+
with env['request'].tm:
171+
dbsession = env['request'].dbsession
172+
173+
print('Organization to attach risk catalog to')
174+
175+
org = select_existing_organization(dbsession)
176+
177+
if not org:
178+
return
179+
180+
today = datetime.today().strftime('%Y-%m-%d')
181+
182+
catalog = RiskCatalog(
183+
'seantis risk register',
184+
organization=org,
185+
description=f'Imported from risk excel on {today}.'
186+
)
187+
188+
catalog.organization_id = org.id
189+
190+
try:
191+
populate_catalog(
192+
catalog,
193+
risks_from_excel(args.catalog),
194+
dbsession
195+
)
196+
except sqlalchemy.exc.IntegrityError:
197+
# TODO: Risks and assets (and therefore also assessments) are
198+
# unique per organization, not catalog. Adding a risk from the
199+
# excel that is already present in this organization will fail.
200+
print(
201+
'Organization already contains some risks from the Excel. '
202+
'Abort!'
203+
)
204+
dbsession.rollback()
205+
else:
206+
print(
207+
f'Successfully populated risk catalog "{catalog.name}" '
208+
'from risk register excel.'
209+
)
210+
211+
212+
if __name__ == '__main__':
213+
main(sys.argv)

test_requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,6 @@ types-setuptools==69.0.0.0
3232
types-translationstring==1.4.0.1
3333
types-WebOb==1.8.0.5
3434
types-WTForms==3.1.0.2
35+
types-openpyxl==3.1.0.20240408
3536
virtualenv==20.24.4
3637
WebTest==3.0.0

0 commit comments

Comments
 (0)