Pull publisher countries through from registry #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged · 1 commit · Mar 27, 2025
31 changes: 31 additions & 0 deletions migrations/versions/b21b5de6ee2d_add_publisher_country_column.py
@@ -0,0 +1,31 @@
"""add publisher_country column

Revision ID: b21b5de6ee2d
Revises: ebb26242c904
Create Date: 2025-03-26 10:08:14.401880

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "b21b5de6ee2d"
down_revision: Union[str, None] = "ebb26242c904"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column("dataset", sa.Column("publisher_country", sa.String(), nullable=True))
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("dataset", "publisher_country")
    # ### end Alembic commands ###
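To try this revision locally, a minimal sketch using Alembic's Python API (the alembic.ini location and database configuration here are assumptions; the CLI equivalent is alembic upgrade head):

# Sketch only: assumes alembic.ini at the repo root points at the pipeline's database.
from alembic import command
from alembic.config import Config

config = Config("alembic.ini")
command.upgrade(config, "b21b5de6ee2d")  # add the publisher_country column
# To revert: command.downgrade(config, "ebb26242c904")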
1 change: 1 addition & 0 deletions oc4ids_datastore_pipeline/database.py
@@ -28,6 +28,7 @@ class Dataset(Base):
     dataset_id: Mapped[str] = mapped_column(String, primary_key=True)
     source_url: Mapped[str] = mapped_column(String)
     publisher_name: Mapped[str] = mapped_column(String)
+    publisher_country: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_title: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_title_short: Mapped[Optional[str]] = mapped_column(String, nullable=True)
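Since publisher_country is nullable, downstream reads should tolerate None. A minimal sketch of querying the new column back (the engine URL is a placeholder, not the pipeline's actual configuration):

# Illustrative query; the connection URL below is an assumption.
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

from oc4ids_datastore_pipeline.database import Dataset

engine = create_engine("postgresql://localhost/oc4ids")  # placeholder URL
with Session(engine) as session:
    for dataset in session.scalars(select(Dataset)):
        # publisher_country is Optional[str], so guard against None
        print(dataset.dataset_id, dataset.publisher_country or "(no country)")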
21 changes: 14 additions & 7 deletions oc4ids_datastore_pipeline/pipeline.py
@@ -106,6 +106,7 @@ def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[s
 def save_dataset_metadata(
     dataset_id: str,
     source_url: str,
+    publisher_country: str,
     json_data: dict[str, Any],
     json_url: Optional[str],
     csv_url: Optional[str],
@@ -122,6 +123,7 @@ def save_dataset_metadata(
             dataset_id=dataset_id,
             source_url=source_url,
             publisher_name=publisher_name,
+            publisher_country=publisher_country,
             license_url=license_url,
             license_title=license_title,
             license_title_short=license_title_short,
@@ -135,9 +137,9 @@ def save_dataset_metadata(
         raise ProcessDatasetError(f"Failed to update metadata for dataset: {e}")


-def process_dataset(dataset_id: str, source_url: str) -> None:
+def process_dataset(dataset_id: str, registry_metadata: dict[str, str]) -> None:
     logger.info(f"Processing dataset {dataset_id}")
-    json_data = download_json(dataset_id, source_url)
+    json_data = download_json(dataset_id, registry_metadata["source_url"])
     validate_json(dataset_id, json_data)
     json_path = write_json_to_file(
         file_name=f"data/{dataset_id}/{dataset_id}.json",
@@ -149,7 +151,8 @@ def process_dataset(dataset_id: str, source_url: str) -> None:
     )
     save_dataset_metadata(
         dataset_id=dataset_id,
-        source_url=source_url,
+        source_url=registry_metadata["source_url"],
+        publisher_country=registry_metadata["country"],
         json_data=json_data,
         json_url=json_public_url,
         csv_url=csv_public_url,
@@ -158,7 +161,7 @@ def process_dataset(dataset_id: str, source_url: str) -> None:
     logger.info(f"Processed dataset {dataset_id}")


-def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
+def process_deleted_datasets(registered_datasets: dict[str, dict[str, str]]) -> None:
     stored_datasets = get_dataset_ids()
     deleted_datasets = stored_datasets - registered_datasets.keys()
     for dataset_id in deleted_datasets:
@@ -171,13 +174,17 @@ def process_registry() -> None:
     registered_datasets = fetch_registered_datasets()
     process_deleted_datasets(registered_datasets)
     errors: list[dict[str, Any]] = []
-    for dataset_id, url in registered_datasets.items():
+    for dataset_id, registry_metadata in registered_datasets.items():
         try:
-            process_dataset(dataset_id, url)
+            process_dataset(dataset_id, registry_metadata)
         except Exception as e:
             logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
             errors.append(
-                {"dataset_id": dataset_id, "source_url": url, "message": str(e)}
+                {
+                    "dataset_id": dataset_id,
+                    "source_url": registry_metadata["source_url"],
+                    "message": str(e),
+                }
             )
     if errors:
         logger.error(
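With this change, each registry entry flows through the pipeline as a small dict rather than a bare URL string. A sketch of the shape process_dataset now expects (the values are placeholders):

# Placeholder values; real entries come from fetch_registered_datasets().
registry_metadata = {
    "source_url": "https://example.org/example_dataset.json",
    "country": "gb",
}
process_dataset("example_dataset", registry_metadata)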
7 changes: 5 additions & 2 deletions oc4ids_datastore_pipeline/registry.py
@@ -9,15 +9,18 @@
 _license_mappings = None


-def fetch_registered_datasets() -> dict[str, str]:
+def fetch_registered_datasets() -> dict[str, dict[str, str]]:
     logger.info("Fetching registered datasets list from registry")
     try:
         url = "https://opendataservices.github.io/oc4ids-registry/datatig/type/dataset/records_api.json"  # noqa: E501
         r = requests.get(url)
         r.raise_for_status()
         json_data = r.json()
         registered_datasets = {
-            key: value["fields"]["url"]["value"]
+            key: {
+                "source_url": value["fields"]["url"]["value"],
+                "country": value["fields"]["country"]["value"],
+            }
             for (key, value) in json_data["records"].items()
         }
         registered_datasets_count = len(registered_datasets)
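For context, the comprehension above assumes a records API response shaped like the following (field names are taken from the code; the values are placeholders). A record missing the country field would raise a KeyError inside the try block:

{
    "records": {
        "example_dataset": {
            "fields": {
                "url": {"value": "https://example.org/example_dataset.json"},
                "country": {"value": "gb"}
            }
        }
    }
}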
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi"
 [project]
 name = "oc4ids-datastore-pipeline"
 description = "OC4IDS Datastore Pipeline"
-version = "0.4.0"
+version = "0.5.0"
 readme = "README.md"
 dependencies = [
     "alembic",
@@ -50,3 +50,4 @@ follow_untyped_imports = true
 [tool.pytest.ini_options]
 log_cli = true
 log_cli_level = "INFO"
+pythonpath = ["."]
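Note: pythonpath = ["."] uses pytest's built-in pythonpath option (available since pytest 7.0), so the tests can import oc4ids_datastore_pipeline from a repository checkout without an editable install.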
10 changes: 7 additions & 3 deletions tests/test_pipeline.py
@@ -132,7 +132,9 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None:
         "oc4ids_datastore_pipeline.pipeline.delete_files_for_dataset"
     )

-    registered_datasets = {"test_dataset": "https://test_dataset.json"}
+    registered_datasets = {
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
+    }
     process_deleted_datasets(registered_datasets)

     patch_delete_dataset.assert_called_once_with("old_dataset")
@@ -146,7 +148,9 @@ def test_process_dataset_raises_failure_exception(mocker: MockerFixture) -> None
     patch_download_json.side_effect = ProcessDatasetError("Download failed: Exception")

     with pytest.raises(ProcessDatasetError) as exc_info:
-        process_dataset("test_dataset", "https://test_dataset.json")
+        process_dataset(
+            "test_dataset", {"source_url": "https://test_dataset.json", "country": "ab"}
+        )

     assert "Download failed: Exception" in str(exc_info.value)

@@ -156,7 +160,7 @@ def test_process_registry_catches_exception(mocker: MockerFixture) -> None:
         "oc4ids_datastore_pipeline.pipeline.fetch_registered_datasets"
     )
     patch_fetch_registered_datasets.return_value = {
-        "test_dataset": "https://test_dataset.json"
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
     }
     mocker.patch("oc4ids_datastore_pipeline.pipeline.process_deleted_datasets")
     patch_process_dataset = mocker.patch(
11 changes: 9 additions & 2 deletions tests/test_registry.py
@@ -14,15 +14,22 @@ def test_fetch_registered_datasets(mocker: MockerFixture) -> None:
     mock_response = MagicMock()
     mock_response.json.return_value = {
         "records": {
-            "test_dataset": {"fields": {"url": {"value": "https://test_dataset.json"}}}
+            "test_dataset": {
+                "fields": {
+                    "url": {"value": "https://test_dataset.json"},
+                    "country": {"value": "ab"},
+                }
+            }
         }
     }
     patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
     patch_get.return_value = mock_response

     result = fetch_registered_datasets()

-    assert result == {"test_dataset": "https://test_dataset.json"}
+    assert result == {
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
+    }


 def test_fetch_registered_datasets_raises_failure_exception(
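Note: patching oc4ids_datastore_pipeline.pipeline.requests.get still works for a function defined in registry.py because both modules import the same requests module object, so replacing its get attribute affects fetch_registered_datasets too.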
Expand Down