
Commit 8d9fa03

feat: pull publisher countries through from registry
1 parent: 39bbffb

File tree: 7 files changed (+69, -15 lines)
New Alembic migration (revision b21b5de6ee2d): add publisher_country column

@@ -0,0 +1,31 @@
+"""add publisher_country column
+
+Revision ID: b21b5de6ee2d
+Revises: ebb26242c904
+Create Date: 2025-03-26 10:08:14.401880
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "b21b5de6ee2d"
+down_revision: Union[str, None] = "ebb26242c904"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column("dataset", sa.Column("publisher_country", sa.String(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column("dataset", "publisher_country")
+    # ### end Alembic commands ###
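The new column only appears once the migration is applied. A minimal sketch, assuming the project's Alembic configuration lives in alembic.ini at the repository root (the CLI equivalent would be `alembic upgrade head`):

from alembic import command
from alembic.config import Config

# Apply all pending migrations, including b21b5de6ee2d, to the configured database.
command.upgrade(Config("alembic.ini"), "head")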

oc4ids_datastore_pipeline/database.py (+1)
@@ -28,6 +28,7 @@ class Dataset(Base):
     dataset_id: Mapped[str] = mapped_column(String, primary_key=True)
     source_url: Mapped[str] = mapped_column(String)
     publisher_name: Mapped[str] = mapped_column(String)
+    publisher_country: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_title: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_title_short: Mapped[Optional[str]] = mapped_column(String, nullable=True)
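As a rough usage sketch (the connection URL below is a placeholder, not the pipeline's real configuration), the new column reads back through the ORM like any other mapped attribute:

from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

from oc4ids_datastore_pipeline.database import Dataset

# Placeholder connection URL for illustration only.
engine = create_engine("postgresql+psycopg2://localhost/oc4ids_datastore")

with Session(engine) as session:
    for dataset in session.scalars(select(Dataset)):
        # publisher_country is Optional[str], so it may be None for rows
        # stored before this migration ran.
        print(dataset.dataset_id, dataset.publisher_country)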

oc4ids_datastore_pipeline/pipeline.py (+14, -7)
@@ -106,6 +106,7 @@ def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[s
 def save_dataset_metadata(
     dataset_id: str,
     source_url: str,
+    publisher_country: str,
     json_data: dict[str, Any],
     json_url: Optional[str],
     csv_url: Optional[str],
@@ -122,6 +123,7 @@ def save_dataset_metadata(
             dataset_id=dataset_id,
             source_url=source_url,
             publisher_name=publisher_name,
+            publisher_country=publisher_country,
             license_url=license_url,
             license_title=license_title,
             license_title_short=license_title_short,
@@ -135,9 +137,9 @@
         raise ProcessDatasetError(f"Failed to update metadata for dataset: {e}")


-def process_dataset(dataset_id: str, source_url: str) -> None:
+def process_dataset(dataset_id: str, registry_metadata: dict[str, str]) -> None:
     logger.info(f"Processing dataset {dataset_id}")
-    json_data = download_json(dataset_id, source_url)
+    json_data = download_json(dataset_id, registry_metadata["source_url"])
     validate_json(dataset_id, json_data)
     json_path = write_json_to_file(
         file_name=f"data/{dataset_id}/{dataset_id}.json",
@@ -149,7 +151,8 @@ def process_dataset(dataset_id: str, source_url: str) -> None:
     )
     save_dataset_metadata(
         dataset_id=dataset_id,
-        source_url=source_url,
+        source_url=registry_metadata["source_url"],
+        publisher_country=registry_metadata["country"],
         json_data=json_data,
         json_url=json_public_url,
         csv_url=csv_public_url,
@@ -158,7 +161,7 @@ def process_dataset(dataset_id: str, source_url: str) -> None:
     logger.info(f"Processed dataset {dataset_id}")


-def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
+def process_deleted_datasets(registered_datasets: dict[str, dict[str, str]]) -> None:
     stored_datasets = get_dataset_ids()
     deleted_datasets = stored_datasets - registered_datasets.keys()
     for dataset_id in deleted_datasets:
@@ -171,13 +174,17 @@ def process_registry() -> None:
     registered_datasets = fetch_registered_datasets()
     process_deleted_datasets(registered_datasets)
     errors: list[dict[str, Any]] = []
-    for dataset_id, url in registered_datasets.items():
+    for dataset_id, registry_metadata in registered_datasets.items():
         try:
-            process_dataset(dataset_id, url)
+            process_dataset(dataset_id, registry_metadata)
         except Exception as e:
             logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
             errors.append(
-                {"dataset_id": dataset_id, "source_url": url, "message": str(e)}
+                {
+                    "dataset_id": dataset_id,
+                    "source_url": registry_metadata["source_url"],
+                    "message": str(e),
+                }
             )
     if errors:
         logger.error(
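For illustration, a minimal sketch (the dataset id and URL are made up) of the per-dataset mapping that process_dataset now takes in place of a bare source URL:

from oc4ids_datastore_pipeline.pipeline import process_dataset

# Hypothetical registry entry; real values come from fetch_registered_datasets().
registry_metadata = {
    "source_url": "https://example.org/oc4ids.json",
    "country": "gb",
}

process_dataset("example_dataset", registry_metadata)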

oc4ids_datastore_pipeline/registry.py (+5, -2)
@@ -9,15 +9,18 @@
 _license_mappings = None


-def fetch_registered_datasets() -> dict[str, str]:
+def fetch_registered_datasets() -> dict[str, dict[str, str]]:
     logger.info("Fetching registered datasets list from registry")
     try:
         url = "https://opendataservices.github.io/oc4ids-registry/datatig/type/dataset/records_api.json"  # noqa: E501
         r = requests.get(url)
         r.raise_for_status()
         json_data = r.json()
         registered_datasets = {
-            key: value["fields"]["url"]["value"]
+            key: {
+                "source_url": value["fields"]["url"]["value"],
+                "country": value["fields"]["country"]["value"],
+            }
             for (key, value) in json_data["records"].items()
         }
         registered_datasets_count = len(registered_datasets)
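For reference, a small sketch (record id and values are illustrative) of how one registry record from records_api.json is flattened into the returned entry:

# Shape of one record in the registry's records_api.json (values are illustrative).
record = {
    "fields": {
        "url": {"value": "https://example.org/oc4ids.json"},
        "country": {"value": "gb"},
    }
}

# fetch_registered_datasets() now maps each record to this flat structure.
entry = {
    "source_url": record["fields"]["url"]["value"],
    "country": record["fields"]["country"]["value"],
}
assert entry == {"source_url": "https://example.org/oc4ids.json", "country": "gb"}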

pyproject.toml (+2, -1)
@@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi"
 [project]
 name = "oc4ids-datastore-pipeline"
 description = "OC4IDS Datastore Pipeline"
-version = "0.4.0"
+version = "0.5.0"
 readme = "README.md"
 dependencies = [
     "alembic",
@@ -50,3 +50,4 @@ follow_untyped_imports = true
 [tool.pytest.ini_options]
 log_cli = true
 log_cli_level = "INFO"
+pythonpath = ["."]
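Note: pytest's pythonpath option prepends the listed directories to sys.path for the test run; adding "." here presumably lets the tests import modules from the repository root that are not part of the installed package.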

tests/test_pipeline.py (+7, -3)
@@ -132,7 +132,9 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None:
         "oc4ids_datastore_pipeline.pipeline.delete_files_for_dataset"
     )

-    registered_datasets = {"test_dataset": "https://test_dataset.json"}
+    registered_datasets = {
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
+    }
     process_deleted_datasets(registered_datasets)

     patch_delete_dataset.assert_called_once_with("old_dataset")
@@ -146,7 +148,9 @@ def test_process_dataset_raises_failure_exception(mocker: MockerFixture) -> None
     patch_download_json.side_effect = ProcessDatasetError("Download failed: Exception")

     with pytest.raises(ProcessDatasetError) as exc_info:
-        process_dataset("test_dataset", "https://test_dataset.json")
+        process_dataset(
+            "test_dataset", {"source_url": "https://test_dataset.json", "country": "ab"}
+        )

     assert "Download failed: Exception" in str(exc_info.value)

@@ -156,7 +160,7 @@ def test_process_registry_catches_exception(mocker: MockerFixture) -> None:
         "oc4ids_datastore_pipeline.pipeline.fetch_registered_datasets"
     )
     patch_fetch_registered_datasets.return_value = {
-        "test_dataset": "https://test_dataset.json"
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
     }
     mocker.patch("oc4ids_datastore_pipeline.pipeline.process_deleted_datasets")
     patch_process_dataset = mocker.patch(

tests/test_registry.py (+9, -2)
@@ -14,15 +14,22 @@ def test_fetch_registered_datasets(mocker: MockerFixture) -> None:
     mock_response = MagicMock()
     mock_response.json.return_value = {
         "records": {
-            "test_dataset": {"fields": {"url": {"value": "https://test_dataset.json"}}}
+            "test_dataset": {
+                "fields": {
+                    "url": {"value": "https://test_dataset.json"},
+                    "country": {"value": "ab"},
+                }
+            }
         }
     }
     patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
     patch_get.return_value = mock_response

     result = fetch_registered_datasets()

-    assert result == {"test_dataset": "https://test_dataset.json"}
+    assert result == {
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
+    }


 def test_fetch_registered_datasets_raises_failure_exception(
