Commit d5cbc73

refactor: add custom exception class
1 parent 17b05e6 commit d5cbc73

2 files changed: +113 -52 lines changed


oc4ids_datastore_pipeline/pipeline.py

Lines changed: 66 additions & 41 deletions
@@ -24,6 +24,17 @@
 logger = logging.getLogger(__name__)
 
 
+class ProcessDatasetError(Exception):
+    def __init__(self, message: str):
+        super().__init__(message)
+
+
+class ValidationError(ProcessDatasetError):
+    def __init__(self, errors_count: int, errors: list[str]):
+        message = f"Dataset has {errors_count} validation errors: {str(errors)}"
+        super().__init__(message)
+
+
 def download_json(url: str) -> Any:
     logger.info(f"Downloading json from {url}")
     try:
@@ -33,19 +44,23 @@ def download_json(url: str) -> Any:
         logger.info(f"Downloaded {url} ({response_size} bytes)")
         return r.json()
     except Exception as e:
-        raise Exception("Download failed", e)
+        raise ProcessDatasetError(f"Download failed: {str(e)}")
 
 
 def validate_json(dataset_id: str, json_data: dict[str, Any]) -> None:
     logger.info(f"Validating dataset {dataset_id}")
     try:
         validation_result = oc4ids_json_output(json_data=json_data)
         validation_errors_count = validation_result["validation_errors_count"]
+        validation_errors = validation_result["validation_errors"]
         if validation_errors_count > 0:
-            raise Exception(f"Dataset has {validation_errors_count} validation errors")
+            raise ValidationError(
+                errors_count=validation_errors_count,
+                errors=validation_errors,
+            )
         logger.info(f"Dataset {dataset_id} is valid")
     except Exception as e:
-        raise Exception("Validation failed", e)
+        raise ProcessDatasetError(f"Validation failed: {str(e)}")
 
 
 def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
@@ -57,7 +72,7 @@ def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
         logger.info(f"Finished writing to {file_name}")
         return file_name
     except Exception as e:
-        raise Exception("Error while writing to JSON file", e)
+        raise ProcessDatasetError(f"Error writing dataset to file: {e}")
 
 
 def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
@@ -76,7 +91,7 @@ def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[s
         logger.info(f"Transformed to XLSX at {xlsx_path}")
         return csv_path, xlsx_path
     except Exception as e:
-        logger.warning(f"Failed to transform JSON to CSV and XLSX with error {e}")
+        logger.warning(f"Failed to transform JSON to CSV and XLSX: {e}")
         return None, None
 
 
@@ -89,46 +104,47 @@ def save_dataset_metadata(
     xlsx_url: Optional[str],
 ) -> None:
     logger.info(f"Saving metadata for dataset {dataset_id}")
-    publisher_name = json_data.get("publisher", {}).get("name", "")
-    license_url = json_data.get("license", None)
-    license_name = get_license_name_from_url(license_url) if license_url else None
-    dataset = Dataset(
-        dataset_id=dataset_id,
-        source_url=source_url,
-        publisher_name=publisher_name,
-        license_url=license_url,
-        license_name=license_name,
-        json_url=json_url,
-        csv_url=csv_url,
-        xlsx_url=xlsx_url,
-        updated_at=datetime.datetime.now(datetime.UTC),
-    )
-    save_dataset(dataset)
-
-
-def process_dataset(dataset_id: str, source_url: str) -> None:
-    logger.info(f"Processing dataset {dataset_id}")
     try:
-        json_data = download_json(source_url)
-        validate_json(dataset_id, json_data)
-        json_path = write_json_to_file(
-            f"data/{dataset_id}/{dataset_id}.json", json_data
-        )
-        csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
-        json_public_url, csv_public_url, xlsx_public_url = upload_files(
-            dataset_id, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
-        )
-        save_dataset_metadata(
+        publisher_name = json_data.get("publisher", {}).get("name", "")
+        license_url = json_data.get("license", None)
+        license_name = get_license_name_from_url(license_url) if license_url else None
+        dataset = Dataset(
             dataset_id=dataset_id,
             source_url=source_url,
-            json_data=json_data,
-            json_url=json_public_url,
-            csv_url=csv_public_url,
-            xlsx_url=xlsx_public_url,
+            publisher_name=publisher_name,
+            license_url=license_url,
+            license_name=license_name,
+            json_url=json_url,
+            csv_url=csv_url,
+            xlsx_url=xlsx_url,
+            updated_at=datetime.datetime.now(datetime.UTC),
         )
-        logger.info(f"Processed dataset {dataset_id}")
+        save_dataset(dataset)
     except Exception as e:
-        logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
+        raise ProcessDatasetError(f"Failed to update metadata for dataset: {e}")
+
+
+def process_dataset(dataset_id: str, source_url: str) -> None:
+    logger.info(f"Processing dataset {dataset_id}")
+    json_data = download_json(source_url)
+    validate_json(dataset_id, json_data)
+    json_path = write_json_to_file(
+        file_name=f"data/{dataset_id}/{dataset_id}.json",
+        json_data=json_data,
+    )
+    csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        dataset_id, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+    )
+    save_dataset_metadata(
+        dataset_id=dataset_id,
+        source_url=source_url,
+        json_data=json_data,
+        json_url=json_public_url,
+        csv_url=csv_public_url,
+        xlsx_url=xlsx_public_url,
+    )
+    logger.info(f"Processed dataset {dataset_id}")
 
 
 def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
@@ -143,8 +159,17 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
 def process_registry() -> None:
     registered_datasets = fetch_registered_datasets()
     process_deleted_datasets(registered_datasets)
+    errors: list[dict[str, Any]] = []
     for dataset_id, url in registered_datasets.items():
-        process_dataset(dataset_id, url)
+        try:
+            process_dataset(dataset_id, url)
+        except Exception as e:
+            logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
+            errors.append({"dataset": dataset_id, "source_url": url, "errors": str(e)})
+    if errors:
+        logger.error(
+            f"Errors while processing registry: {json.dumps(errors, indent=4)}"
+        )
     logger.info("Finished processing all datasets")
 
 
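The net effect of the pipeline changes: failures in download_json, validate_json, write_json_to_file, and save_dataset_metadata are all raised as ProcessDatasetError, with ValidationError as a subclass carrying the validation details. A minimal, self-contained sketch of why one except clause handles both (classes copied from the diff above; the raise/except at the bottom is illustrative, not part of the commit):

class ProcessDatasetError(Exception):
    def __init__(self, message: str):
        super().__init__(message)


class ValidationError(ProcessDatasetError):
    def __init__(self, errors_count: int, errors: list[str]):
        message = f"Dataset has {errors_count} validation errors: {str(errors)}"
        super().__init__(message)


try:
    # A failed validation raises the subclass...
    raise ValidationError(errors_count=2, errors=["Non-unique id values"])
except ProcessDatasetError as e:
    # ...and a handler for the base class still catches it.
    print(e)  # Dataset has 2 validation errors: ['Non-unique id values']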

tests/test_pipeline.py

Lines changed: 47 additions & 11 deletions
@@ -6,9 +6,11 @@
 from pytest_mock import MockerFixture
 
 from oc4ids_datastore_pipeline.pipeline import (
+    ProcessDatasetError,
     download_json,
     process_dataset,
     process_deleted_datasets,
+    process_registry,
     transform_to_csv_and_xlsx,
     validate_json,
     write_json_to_file,
@@ -19,7 +21,7 @@ def test_download_json_raises_failure_exception(mocker: MockerFixture) -> None:
     patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
     patch_get.side_effect = Exception("Mocked exception")
 
-    with pytest.raises(Exception) as exc_info:
+    with pytest.raises(ProcessDatasetError) as exc_info:
         download_json(url="https://test_dataset.json")
 
     assert "Download failed" in str(exc_info.value)
@@ -32,7 +34,7 @@ def test_validate_json_raises_failure_exception(mocker: MockerFixture) -> None:
     )
     patch_oc4ids_json_output.side_effect = Exception("Mocked exception")
 
-    with pytest.raises(Exception) as exc_info:
+    with pytest.raises(ProcessDatasetError) as exc_info:
         validate_json(dataset_id="test_dataset", json_data={})
 
     assert "Validation failed" in str(exc_info.value)
@@ -45,13 +47,28 @@ def test_validate_json_raises_validation_errors_exception(
     patch_oc4ids_json_output = mocker.patch(
         "oc4ids_datastore_pipeline.pipeline.oc4ids_json_output"
     )
-    patch_oc4ids_json_output.return_value = {"validation_errors_count": 2}
-
-    with pytest.raises(Exception) as exc_info:
+    patch_oc4ids_json_output.return_value = {
+        "validation_errors_count": 2,
+        "validation_errors": [
+            [
+                '{"message": "Non-unique id values"}',
+                [
+                    {
+                        "path": "projects/22/parties",
+                        "value": "test_value",
+                    },
+                    {"path": "projects/30/parties", "value": "test_value"},
+                ],
+            ]
+        ],
+    }
+
+    with pytest.raises(ProcessDatasetError) as exc_info:
         validate_json(dataset_id="test_dataset", json_data={})
 
     assert "Validation failed" in str(exc_info.value)
     assert "Dataset has 2 validation errors" in str(exc_info.value)
+    assert "Non-unique id values" in str(exc_info.value)
 
 
 def test_write_json_to_file_writes_in_correct_format() -> None:
@@ -73,13 +90,13 @@ def test_write_json_to_file_raises_failure_exception(mocker: MockerFixture) -> N
     patch_json_dump = mocker.patch("oc4ids_datastore_pipeline.pipeline.json.dump")
     patch_json_dump.side_effect = Exception("Mocked exception")
 
-    with pytest.raises(Exception) as exc_info:
+    with pytest.raises(ProcessDatasetError) as exc_info:
         with tempfile.TemporaryDirectory() as dir:
             file_name = os.path.join(dir, "test_dataset.json")
             write_json_to_file(file_name=file_name, json_data={"key": "value"})
 
-    assert "Error while writing to JSON file" in str(exc_info.value)
-    assert "Mocked exception" in str(exc_info.value)
+    assert "Error writing dataset to file" in str(exc_info.value)
+    assert "Mocked exception" in str(exc_info.value)
 
 
 def test_transform_to_csv_and_xlsx_returns_correct_paths(mocker: MockerFixture) -> None:
@@ -122,10 +139,29 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None:
     patch_delete_files_for_dataset.assert_called_once_with("old_dataset")
 
 
-def test_process_dataset_catches_exception(mocker: MockerFixture) -> None:
+def test_process_dataset_raises_failure_exception(mocker: MockerFixture) -> None:
     patch_download_json = mocker.patch(
         "oc4ids_datastore_pipeline.pipeline.download_json"
     )
-    patch_download_json.side_effect = Exception("Download failed")
+    patch_download_json.side_effect = ProcessDatasetError("Download failed: Exception")
+
+    with pytest.raises(ProcessDatasetError) as exc_info:
+        process_dataset("test_dataset", "https://test_dataset.json")
+
+    assert "Download failed: Exception" in str(exc_info.value)
+
+
+def test_process_registry_catches_exception(mocker: MockerFixture) -> None:
+    patch_fetch_registered_datasets = mocker.patch(
+        "oc4ids_datastore_pipeline.pipeline.fetch_registered_datasets"
+    )
+    patch_fetch_registered_datasets.return_value = {
+        "test_dataset": "https://test_dataset.json"
+    }
+    mocker.patch("oc4ids_datastore_pipeline.pipeline.process_deleted_datasets")
+    patch_process_dataset = mocker.patch(
+        "oc4ids_datastore_pipeline.pipeline.process_dataset"
+    )
+    patch_process_dataset.side_effect = Exception("Mocked exception")
 
-    process_dataset("test_dataset", "https://test_dataset.json")
+    process_registry()
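One note on the process_registry behaviour the last test drives: per-dataset failures are collected and logged in a single summary via json.dumps(errors, indent=4). A hypothetical illustration of that summary (the dict shape matches what process_registry appends; the values here are invented, not from a real run):

import json

# Shaped like the entries process_registry collects; values are illustrative.
errors = [
    {
        "dataset": "test_dataset",
        "source_url": "https://test_dataset.json",
        "errors": "Download failed: Exception",
    }
]
print(f"Errors while processing registry: {json.dumps(errors, indent=4)}")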
