diff --git a/dcpy/connectors/edm/__init__.py b/dcpy/connectors/edm/__init__.py index e69de29bb2..50a8eab0bc 100644 --- a/dcpy/connectors/edm/__init__.py +++ b/dcpy/connectors/edm/__init__.py @@ -0,0 +1 @@ +# EDM connectors package diff --git a/dcpy/connectors/edm/builds.py b/dcpy/connectors/edm/builds.py deleted file mode 100644 index ba42cee925..0000000000 --- a/dcpy/connectors/edm/builds.py +++ /dev/null @@ -1,196 +0,0 @@ -from dataclasses import asdict -from datetime import datetime -from pathlib import Path - -import pytz - -from dcpy.configuration import ( - BUILD_NAME, - CI, - PUBLISHING_BUCKET, - PUBLISHING_BUCKET_ROOT_FOLDER, -) -from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType -from dcpy.connectors.registry import VersionedConnector -from dcpy.models.connectors.edm.publishing import ( - BuildKey, -) -from dcpy.utils import git, s3 -from dcpy.utils.logging import logger - - -def _bucket() -> str: - assert PUBLISHING_BUCKET, ( - "'PUBLISHING_BUCKET' must be defined to use edm.recipes connector" - ) - return PUBLISHING_BUCKET - - -_TEMP_PUBLISHING_FILE_SUFFIXES = { - ".zip", - ".parquet", - ".csv", - ".pdf", - ".xlsx", - ".json", - ".text", -} - - -def get_builds(product: str) -> list[str]: - """Get all build versions for a product.""" - return sorted(s3.get_subfolders(_bucket(), f"{product}/build/"), reverse=True) - - -class BuildsConnector(VersionedConnector, arbitrary_types_allowed=True): - conn_type: str = "edm.publishing.builds" - _storage: PathedStorageConnector | None = None - - def __init__(self, storage: PathedStorageConnector | None = None, **kwargs): - """Initialize BuildsConnector with optional storage.""" - super().__init__(**kwargs) - if storage is not None: - self._storage = storage - - @property - def storage(self) -> PathedStorageConnector: - """Lazy-loaded storage connector. Only initializes when first accessed.""" - if self._storage is None: - self._storage = PathedStorageConnector.from_storage_kwargs( - conn_type="edm.publishing.builds", - storage_backend=StorageType.S3, - s3_bucket=_bucket(), - root_folder=PUBLISHING_BUCKET_ROOT_FOLDER, - _validate_root_path=False, - ) - return self._storage - - @staticmethod - def create() -> "BuildsConnector": - """Create a BuildsConnector with lazy-loaded S3 storage.""" - return BuildsConnector() - - def _generate_metadata(self) -> dict[str, str]: - """Generates "standard" s3 metadata for our files""" - metadata = { - "date-created": datetime.now(pytz.timezone("America/New_York")).isoformat() - } - metadata["commit"] = git.commit_hash() - if CI: - metadata["run-url"] = git.action_url() - return metadata - - def _upload_build( - self, - build_dir: Path, - product: str, - *, - acl: s3.ACL | None = None, - build_name: str | None = None, - # max_files: int = s3.MAX_FILE_COUNT, # TODO - ) -> BuildKey: - """ - Uploads a product build to an S3 bucket using cloudpathlib. - - This function handles uploading a local output folder to a specified - location in an S3 bucket. The path, product, and build name must be - provided, along with an optional ACL (Access Control List) to control - file access in S3. - - Raises: - FileNotFoundError: If the provided output_path does not exist. - ValueError: If the build name is not provided and cannot be found in the environment variables. - """ - if not build_dir.exists(): - raise FileNotFoundError(f"Path {build_dir} does not exist") - build_name = build_name or BUILD_NAME - if not build_name: - raise ValueError( - f"Build name supplied via CLI or the env var 'BUILD_NAME' cannot be '{build_name}'." - ) - build_key = BuildKey(product, build_name) - - logger.info(f'Uploading {build_dir} to {build_key.path} with ACL "{acl}"') - self.storage.push( - key=build_key.path, - filepath=str(build_dir), - acl=str(acl), - metadata=self._generate_metadata(), - ) - - return build_key - - def push_versioned(self, key: str, version: str, **kwargs) -> dict: - # For builds, the "version" is the build name/ID - connector_args = kwargs["connector_args"] - acl = ( - s3.string_as_acl(connector_args["acl"]) - if connector_args.get("acl") - else None - ) - - logger.info(f"Pushing build for product: {key}, build: {version}") - result = self._upload_build( - build_dir=kwargs["build_path"], - product=key, - acl=acl, - build_name=version, - ) - return asdict(result) - - def _pull( - self, - key: str, - version: str, - destination_path: Path, - *, - filepath: str = "", - **kwargs, - ) -> dict: - build_key = BuildKey(key, version) - - source_key = f"{build_key.path}/{filepath}" - - if not self.storage.exists(source_key): - raise FileNotFoundError(f"File {source_key} not found") - - # Determine output path - is_file_path = destination_path.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES - output_filepath = ( - destination_path / Path(filepath).name - if not is_file_path - else destination_path - ) - - logger.info( - f"Downloading {build_key}, {filepath}, {source_key} -> {output_filepath}" - ) - - self.storage.pull(key=source_key, destination_path=output_filepath) - return {"path": output_filepath} - - def pull_versioned( - self, key: str, version: str, destination_path: Path, **kwargs - ) -> dict: - return self._pull(key, version, destination_path, **kwargs) - - def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]: - """List all build versions (build names) for a product.""" - build_folder_key = f"{key}/build" - if not self.storage.exists(build_folder_key): - return [] - - return sorted(self.storage.get_subfolders(build_folder_key), reverse=sort_desc) - - def get_latest_version(self, key: str, **kwargs) -> str: - """Builds don't have a meaningful 'latest' version concept.""" - raise NotImplementedError( - "Builds don't have a meaningful 'latest' version. Use list_versions() to see available builds." - ) - - def version_exists(self, key: str, version: str, **kwargs) -> bool: - """Check if a specific build exists.""" - return version in self.list_versions(key) - - def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path: - return Path("edm") / "builds" / "datasets" / key / version diff --git a/dcpy/connectors/edm/connectors.py b/dcpy/connectors/edm/connectors.py new file mode 100644 index 0000000000..deb0e87a64 --- /dev/null +++ b/dcpy/connectors/edm/connectors.py @@ -0,0 +1,474 @@ +"""Unified EDM connectors for publishing workflows. + +This module provides connectors for different stages of the EDM publishing pipeline: +- DraftsConnector: For draft versions with version.revision structure +- BuildsConnector: For build artifacts +- PublishedConnector: For published versions +- PlanConnector: For planned recipes +""" + +from dataclasses import asdict, dataclass +from datetime import datetime +from pathlib import Path +from typing import Callable + +import pytz + +from dcpy.configuration import ( + BUILD_NAME, + CI, + PUBLISHING_BUCKET, + PUBLISHING_BUCKET_ROOT_FOLDER, +) +from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType +from dcpy.connectors.registry import VersionedConnector +from dcpy.models.connectors.edm.publishing import ( + BuildKey, + DraftKey, + PlanKey, + ProductKey, + PublishKey, +) +from dcpy.utils import git, s3 +from dcpy.utils.logging import logger + +_TEMP_PUBLISHING_FILE_SUFFIXES = { + ".zip", + ".parquet", + ".csv", + ".pdf", + ".xlsx", + ".json", + ".text", +} + + +def _bucket() -> str: + assert PUBLISHING_BUCKET, ( + "'PUBLISHING_BUCKET' must be defined to use edm connectors" + ) + return PUBLISHING_BUCKET + + +@dataclass +class EdmConnectorConfig: + """Configuration for EDM connector behavior.""" + + conn_type: str + folder_name: str # e.g., "draft", "build", "publish", "plan" + key_factory: Callable[ + [str, str, dict], ProductKey + ] # (product, version, kwargs) -> ProductKey + version_parser: Callable[[str], dict] | None = ( + None # version string -> dict of parsed parts + ) + list_versions_impl: Callable[["EdmConnector", str, dict], list[str]] | None = None + supports_latest: bool = True + metadata_generator: Callable[[], dict] | None = None + + +class EdmConnector(VersionedConnector, arbitrary_types_allowed=True): + """Unified connector for all EDM publishing workflows.""" + + conn_type: str + _storage: PathedStorageConnector | None = None + _config: EdmConnectorConfig + + def __init__( + self, + config: EdmConnectorConfig, + storage: PathedStorageConnector | None = None, + **kwargs, + ): + # Pass conn_type to Pydantic initialization + super().__init__(conn_type=config.conn_type, **kwargs) + self._config = config + if storage is not None: + self._storage = storage + + @property + def storage(self) -> PathedStorageConnector: + if self._storage is None: + self._storage = PathedStorageConnector.from_storage_kwargs( + conn_type=self.conn_type, + storage_backend=StorageType.S3, + s3_bucket=_bucket(), + root_folder=PUBLISHING_BUCKET_ROOT_FOLDER, + _validate_root_path=False, + ) + return self._storage + + def _parse_version(self, version: str) -> tuple[str, str]: + """Parse version string using the configured version_parser. + + Returns a tuple (version, revision) for backwards compatibility with tests. + """ + if self._config.version_parser: + parsed = self._config.version_parser(version) + # Return tuple for backwards compatibility + return parsed.get("version", ""), parsed.get("revision", "") + return ("", "") + + def _download_file( + self, product_key: ProductKey, filepath: str, output_dir: Path | None = None + ) -> Path: + output_dir = output_dir or Path(".") + is_file_path = output_dir.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES + output_filepath = ( + output_dir / Path(filepath).name if not is_file_path else output_dir + ) + logger.info(f"Downloading {product_key}, {filepath} -> {output_filepath}") + + source_key = f"{product_key.path}/{filepath}" + if not self.storage.exists(source_key): + raise FileNotFoundError(f"File {source_key} not found") + + self.storage.pull(key=source_key, destination_path=output_filepath) + return output_filepath + + def pull_versioned( + self, key: str, version: str, destination_path: Path, **kwargs + ) -> dict: + # Parse version if needed + version_parts = {} + if self._config.version_parser: + version_parts = self._config.version_parser(version) + + # Create the product key + product_key = self._config.key_factory(key, version, version_parts) + + # Handle filepath/dataset parameters + filepath = kwargs.get("filepath", kwargs.get("source_path", "")) + dataset = kwargs.get("dataset") + path_prefix = f"{dataset}/" if dataset else "" + full_filepath = f"{path_prefix}{filepath}" + + pulled_path = self._download_file(product_key, full_filepath, destination_path) + return {"path": pulled_path} + + def push_versioned(self, key: str, version: str, **kwargs) -> dict: + source_path = kwargs.get("source_path") + if not source_path: + raise ValueError("source_path is required for push_versioned") + + # Parse version if needed + version_parts = {} + if self._config.version_parser: + version_parts = self._config.version_parser(version) + + # Create the product key + product_key = self._config.key_factory(key, version, version_parts) + + # Build target key + target_path = kwargs.get("target_path", "") + full_target_key = ( + f"{product_key.path}/{target_path}" if target_path else product_key.path + ) + + logger.info( + f"Pushing to {self._config.folder_name}: {source_path} -> {full_target_key}" + ) + + # Add metadata if available + push_kwargs = { + k: v for k, v in kwargs.items() if k not in ["source_path", "target_path"] + } + if self._config.metadata_generator and "metadata" not in push_kwargs: + push_kwargs["metadata"] = self._config.metadata_generator() + + result = self.storage.push( + key=full_target_key, filepath=source_path, **push_kwargs + ) + return result + + def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]: + if self._config.list_versions_impl: + versions = self._config.list_versions_impl(self, key, kwargs) + else: + # Default implementation + folder_key = f"{key}/{self._config.folder_name}" + if not self.storage.exists(folder_key): + return [] + versions = self.storage.get_subfolders(folder_key) + # Filter out 'latest' for published + if "exclude_latest" in kwargs and kwargs["exclude_latest"]: + versions = [v for v in versions if v != "latest"] + + return sorted(versions, reverse=sort_desc) + + def get_latest_version(self, key: str, **kwargs) -> str: + if not self._config.supports_latest: + raise NotImplementedError( + "Builds don't have a meaningful 'latest' version. Use list_versions() to see available builds." + ) + return self.list_versions(key)[0] + + def version_exists(self, key: str, version: str, **kwargs) -> bool: + return version in self.list_versions(key) + + def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path: + # This could also be configurable if needed + return Path("edm") / self._config.folder_name / "datasets" / key / version + + +# Version parsers +def _parse_draft_version(version: str) -> dict: + """Parse version in format 'version.revision' into dict.""" + if "." not in version: + raise ValueError(f"Version '{version}' should be in format 'version.revision'") + parts = version.rsplit(".", 1) + return {"version": parts[0], "revision": parts[1]} + + +# Key factories +def _create_draft_key(product: str, version: str, parsed: dict) -> DraftKey: + return DraftKey(product, version=parsed["version"], revision=parsed["revision"]) + + +def _create_build_key(product: str, version: str, parsed: dict) -> BuildKey: + return BuildKey(product, build=version) + + +def _create_publish_key(product: str, version: str, parsed: dict) -> PublishKey: + return PublishKey(product, version=version) + + +def _create_plan_key(product: str, version: str, parsed: dict) -> PlanKey: + return PlanKey(product, version=version) + + +# List versions implementations +def _list_draft_versions( + connector: "EdmConnector", key: str, kwargs: dict +) -> list[str]: + """List draft versions in version.revision format.""" + draft_folder_key = f"{key}/draft" + if not connector.storage.exists(draft_folder_key): + return [] + + versions = [] + version_folders = connector.storage.get_subfolders(draft_folder_key) + for version_name in version_folders: + revision_folder_key = f"{key}/draft/{version_name}" + revision_folders = connector.storage.get_subfolders(revision_folder_key) + for revision_name in revision_folders: + versions.append(f"{version_name}.{revision_name}") + + return versions + + +# Metadata generators +def _generate_build_metadata() -> dict: + """Generate standard S3 metadata for builds.""" + metadata = { + "date-created": datetime.now(pytz.timezone("America/New_York")).isoformat(), + "commit": git.commit_hash(), + } + if CI: + metadata["run-url"] = git.action_url() + return metadata + + +# Factory functions +def create_drafts_connector( + storage: PathedStorageConnector | None = None, +) -> EdmConnector: + config = EdmConnectorConfig( + conn_type="edm.publishing.drafts", + folder_name="draft", + key_factory=_create_draft_key, + version_parser=_parse_draft_version, + list_versions_impl=_list_draft_versions, + ) + return EdmConnector(config=config, storage=storage) + + +def create_builds_connector( + storage: PathedStorageConnector | None = None, +) -> "_BuildsConnector": + config = EdmConnectorConfig( + conn_type="edm.publishing.builds", + folder_name="build", + key_factory=_create_build_key, + supports_latest=False, + metadata_generator=_generate_build_metadata, + ) + # Create instance using object.__new__ to bypass custom __new__ logic + connector = object.__new__(_BuildsConnector) + EdmConnector.__init__(connector, config=config, storage=storage) + return connector + + +def create_published_connector( + storage: PathedStorageConnector | None = None, +) -> EdmConnector: + config = EdmConnectorConfig( + conn_type="edm.publishing.published", + folder_name="publish", + key_factory=_create_publish_key, + ) + return EdmConnector(config=config, storage=storage) + + +def create_plan_connector( + storage: PathedStorageConnector | None = None, +) -> EdmConnector: + config = EdmConnectorConfig( + conn_type="edm.publishing.plan", + folder_name="plan", + key_factory=_create_plan_key, + ) + return EdmConnector(config=config, storage=storage) + + +# Connector wrapper classes for backwards compatibility +class DraftsConnector: + """Connector for EDM draft publishing workflows. + + This is a compatibility wrapper around the unified EdmConnector. + """ + + def __new__( + cls, storage: PathedStorageConnector | None = None, **kwargs + ) -> EdmConnector: + """Create a DraftsConnector instance.""" + return create_drafts_connector(storage=storage) + + @staticmethod + def create(storage: PathedStorageConnector | None = None) -> EdmConnector: + """Create a DraftsConnector with lazy-loaded S3 storage.""" + return create_drafts_connector(storage=storage) + + +class _BuildsConnector(EdmConnector): + """Internal BuildsConnector implementation. + + Extends the unified EdmConnector with build-specific push logic. + """ + + def _upload_build( + self, + build_dir: Path, + product: str, + *, + acl: s3.ACL | None = None, + build_name: str | None = None, + ) -> BuildKey: + """ + Uploads a product build to an S3 bucket using cloudpathlib. + + This function handles uploading a local output folder to a specified + location in an S3 bucket. The path, product, and build name must be + provided, along with an optional ACL (Access Control List) to control + file access in S3. + + Raises: + FileNotFoundError: If the provided output_path does not exist. + ValueError: If the build name is not provided and cannot be found in the environment variables. + """ + if not build_dir.exists(): + raise FileNotFoundError(f"Path {build_dir} does not exist") + build_name = build_name or BUILD_NAME + if not build_name: + raise ValueError( + f"Build name supplied via CLI or the env var 'BUILD_NAME' cannot be '{build_name}'." + ) + build_key = BuildKey(product, build_name) + + logger.info(f'Uploading {build_dir} to {build_key.path} with ACL "{acl}"') + + # Generate metadata using the config's metadata_generator + metadata = ( + self._config.metadata_generator() if self._config.metadata_generator else {} + ) + + self.storage.push( + key=build_key.path, + filepath=str(build_dir), + acl=str(acl), + metadata=metadata, + ) + + return build_key + + def push_versioned(self, key: str, version: str, **kwargs) -> dict: + # For builds, the "version" is the build name/ID + connector_args = kwargs["connector_args"] + acl = ( + s3.string_as_acl(connector_args["acl"]) + if connector_args.get("acl") + else None + ) + + logger.info(f"Pushing build for product: {key}, build: {version}") + result = self._upload_build( + build_dir=kwargs["build_path"], + product=key, + acl=acl, + build_name=version, + ) + return asdict(result) + + def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path: + # Builds use "builds" (plural) in the path + return Path("edm") / "builds" / "datasets" / key / version + + +class BuildsConnector: + """Connector for EDM build publishing workflows. + + This is a compatibility wrapper around the unified EdmConnector. + """ + + def __new__( + cls, storage: PathedStorageConnector | None = None, **kwargs + ) -> "_BuildsConnector": + """Create a BuildsConnector instance.""" + return create_builds_connector(storage=storage) + + @staticmethod + def create(storage: PathedStorageConnector | None = None) -> "_BuildsConnector": + """Create a BuildsConnector with lazy-loaded S3 storage.""" + return create_builds_connector(storage=storage) + + +class PublishedConnector: + """Connector for EDM published workflows. + + This is a compatibility wrapper around the unified EdmConnector. + """ + + def __new__( + cls, storage: PathedStorageConnector | None = None, **kwargs + ) -> EdmConnector: + """Create a PublishedConnector instance.""" + return create_published_connector(storage=storage) + + @staticmethod + def create(storage: PathedStorageConnector | None = None) -> EdmConnector: + """Create a PublishedConnector with lazy-loaded S3 storage.""" + return create_published_connector(storage=storage) + + +class PlanConnector: + """Connector for EDM plan workflows. + + This is a compatibility wrapper around the unified EdmConnector. + """ + + def __new__( + cls, storage: PathedStorageConnector | None = None, **kwargs + ) -> EdmConnector: + """Create a PlanConnector instance.""" + return create_plan_connector(storage=storage) + + @staticmethod + def create(storage: PathedStorageConnector | None = None) -> EdmConnector: + """Create a PlanConnector with lazy-loaded S3 storage.""" + return create_plan_connector(storage=storage) + + +# Helper function for external use +def get_builds(product: str) -> list[str]: + """Get all build versions for a product.""" + return sorted(s3.get_subfolders(_bucket(), f"{product}/build/"), reverse=True) diff --git a/dcpy/connectors/edm/drafts.py b/dcpy/connectors/edm/drafts.py deleted file mode 100644 index 459dd35591..0000000000 --- a/dcpy/connectors/edm/drafts.py +++ /dev/null @@ -1,217 +0,0 @@ -from pathlib import Path - -from dcpy.configuration import ( - PUBLISHING_BUCKET, - PUBLISHING_BUCKET_ROOT_FOLDER, -) -from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType -from dcpy.connectors.registry import VersionedConnector -from dcpy.models.connectors.edm.publishing import ( - DraftKey, - ProductKey, -) -from dcpy.utils.logging import logger - -_TEMP_PUBLISHING_FILE_SUFFIXES = { - ".zip", - ".parquet", - ".csv", - ".pdf", - ".xlsx", - ".json", - ".text", -} - - -def _bucket() -> str: - assert PUBLISHING_BUCKET, ( - "'PUBLISHING_BUCKET' must be defined to use edm.recipes connector" - ) - return PUBLISHING_BUCKET - - -class DraftsConnector(VersionedConnector, arbitrary_types_allowed=True): - conn_type: str = "edm.publishing.drafts" - _storage: PathedStorageConnector | None = None - - def __init__(self, storage: PathedStorageConnector | None = None, **kwargs): - """Initialize DraftsConnector with optional storage.""" - super().__init__(**kwargs) - if storage is not None: - self._storage = storage - - @property - def storage(self) -> PathedStorageConnector: - """Lazy-loaded storage connector. Only initializes when first accessed.""" - if self._storage is None: - self._storage = PathedStorageConnector.from_storage_kwargs( - conn_type="edm.publishing.drafts", - storage_backend=StorageType.S3, - s3_bucket=_bucket(), - root_folder=PUBLISHING_BUCKET_ROOT_FOLDER, - _validate_root_path=False, - ) - return self._storage - - @staticmethod - def create() -> "DraftsConnector": - """Create a DraftsConnector with lazy-loaded S3 storage.""" - return DraftsConnector() - - def _parse_version(self, version: str) -> tuple[str, str]: - """Parse version in format 'version.revision' into (version, revision).""" - if "." not in version: - raise ValueError( - f"Version '{version}' should be in format 'version.revision'" - ) - parts = version.rsplit(".", 1) # Split on last dot - return parts[0], parts[1] - - def _get_draft_versions(self, key: str) -> list[str]: - """Get all draft revisions for a specific product. Key should be product name.""" - product = key - draft_folder_key = f"{product}/draft" - if not self.storage.exists(draft_folder_key): - return [] - - # Get all version folders - version_folders = self.storage.get_subfolders(draft_folder_key) - - # Get all version.revision combinations - versions = [] - for version_name in version_folders: - revision_folder_key = f"{product}/draft/{version_name}" - revision_folders = self.storage.get_subfolders(revision_folder_key) - for revision_name in revision_folders: - versions.append(f"{version_name}.{revision_name}") - return sorted(versions, reverse=True) - - def _download_file( - self, product_key: ProductKey, filepath: str, output_dir: Path | None = None - ) -> Path: - """Download a file from storage using PathedStorageConnector.""" - output_dir = output_dir or Path(".") - is_file_path = output_dir.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES - output_filepath = ( - output_dir / Path(filepath).name if not is_file_path else output_dir - ) - logger.info(f"Downloading {product_key}, {filepath} -> {output_filepath}") - - source_key = f"{product_key.path}/{filepath}" - if not self.storage.exists(source_key): - raise FileNotFoundError(f"File {source_key} not found") - - # Use PathedStorageConnector's pull method - self.storage.pull(key=source_key, destination_path=output_filepath) - return output_filepath - - def _pull( - self, - key: str, - version: str, - destination_path: Path, - *, - revision: str, - source_path: str = "", - dataset: str | None = None, - **kwargs, - ) -> dict: - """Pulls draft to destination path. - - `source_path` can be either a file or a directory. When it is a directory - the contents of that directory will be copied recursively to destination_path - """ - # key is product name, version is 'version.revision' format - product = key - - draft_key = DraftKey(product, version=version, revision=revision) - - path_prefix = dataset + "/" if dataset else "" - file_path = f"{path_prefix}{source_path}" - logger.info( - f"Pulling Draft for {draft_key}, path={file_path}, to={destination_path}" - ) - pulled_path = self._download_file( - draft_key, file_path, output_dir=destination_path - ) - return {"path": pulled_path} - - def pull_versioned( - self, key: str, version: str, destination_path: Path, **kwargs - ) -> dict: - version_parsed, revision_parsed = self._parse_version(version) - # Remove revision from kwargs to avoid duplicate parameter - kwargs.pop("revision", None) - return self._pull( - key, - version=version_parsed, - destination_path=destination_path, - revision=revision_parsed, - **kwargs, - ) - - def push_versioned(self, key: str, version: str, **kwargs) -> dict: - """Push data to drafts folder with version.revision structure. - - Args: - key: Product name - version: Version in format 'version.revision' - **kwargs: Additional arguments including: - - source_path: Local path to push from - - acl: Access control level - - target_path: Optional specific target path within version folder - """ - product = key - draft_version, revision = self._parse_version(version) - - source_path = kwargs.get("source_path") - if not source_path: - raise ValueError("source_path is required for push_versioned") - - # Build the draft path: {product}/draft/{version}/{revision}/ - draft_folder_key = f"{product}/draft/{draft_version}/{revision}" - - # If target_path specified, use it; otherwise use the full folder - target_path = kwargs.get("target_path", "") - full_target_key = ( - f"{draft_folder_key}/{target_path}" if target_path else draft_folder_key - ) - - logger.info(f"Pushing to draft: {source_path} -> {full_target_key}") - - # Use PathedStorageConnector's push method - result = self.storage.push( - key=full_target_key, - filepath=source_path, - **{ - k: v - for k, v in kwargs.items() - if k not in ["source_path", "target_path"] - }, - ) - - return result - - def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]: - logger.info(f"Listing versions for {key}") - return self._get_draft_versions(key) - - def get_latest_version(self, key: str, **kwargs) -> str: - return self.list_versions(key)[0] - - def version_exists(self, key: str, version: str, **kwargs) -> bool: - return version in self.list_versions(key) - - def data_local_sub_path( - self, key: str, *, version: str, revision: str, **kwargs - ) -> Path: - product = key - draft_version, draft_revision = self._parse_version(version) - return ( - Path("edm") - / "publishing" - / "datasets" - / product - / draft_version - / draft_revision - ) diff --git a/dcpy/connectors/edm/published.py b/dcpy/connectors/edm/published.py deleted file mode 100644 index bd85871756..0000000000 --- a/dcpy/connectors/edm/published.py +++ /dev/null @@ -1,174 +0,0 @@ -from pathlib import Path - -from dcpy.configuration import PUBLISHING_BUCKET, PUBLISHING_BUCKET_ROOT_FOLDER -from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType -from dcpy.connectors.registry import VersionedConnector -from dcpy.models.connectors.edm.publishing import PublishKey -from dcpy.utils.logging import logger - - -def _bucket() -> str: - assert PUBLISHING_BUCKET, ( - "'PUBLISHING_BUCKET' must be defined to use edm.published connector" - ) - return PUBLISHING_BUCKET - - -# This is a (hopefully) temporary hack while we think about -# distinguishing filepaths vs directories in the connector interface. -_TEMP_PUBLISHING_FILE_SUFFIXES = { - ".zip", - ".parquet", - ".csv", - ".pdf", - ".xlsx", - ".json", - ".text", -} - - -class PublishedConnector(VersionedConnector, arbitrary_types_allowed=True): - conn_type: str = "edm.publishing.published" - _storage: PathedStorageConnector | None = None - - def __init__(self, storage: PathedStorageConnector | None = None, **kwargs): - """Initialize PublishedConnector with optional storage.""" - super().__init__(**kwargs) - if storage is not None: - self._storage = storage - - @property - def storage(self) -> PathedStorageConnector: - """Lazy-loaded storage connector. Only initializes when first accessed.""" - if self._storage is None: - self._storage = PathedStorageConnector.from_storage_kwargs( - conn_type="edm.publishing.published", - storage_backend=StorageType.S3, - s3_bucket=_bucket(), - root_folder=PUBLISHING_BUCKET_ROOT_FOLDER, - _validate_root_path=False, - ) - return self._storage - - @staticmethod - def create() -> "PublishedConnector": - """Create a PublishedConnector with lazy-loaded S3 storage.""" - return PublishedConnector() - - def _download_file( - self, product_key, filepath: str, output_dir: Path | None = None - ) -> Path: - """Download a file from storage using cloudpathlib.""" - output_dir = output_dir or Path(".") - is_file_path = output_dir.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES - output_filepath = ( - output_dir / Path(filepath).name if not is_file_path else output_dir - ) - logger.info(f"Downloading {product_key}, {filepath} -> {output_filepath}") - - source_key = f"{product_key.path}/{filepath}" - if not self.storage.exists(source_key): - raise FileNotFoundError(f"File {source_key} not found") - - # Use PathedStorageConnector's pull method - self.storage.pull(key=source_key, destination_path=output_filepath) - return output_filepath - - def _get_published_versions( - self, key: str, exclude_latest: bool = True - ) -> list[str]: - """Get all published versions for a product using PathedStorageConnector.""" - product = key - publish_folder_key = f"{product}/publish" - if not self.storage.exists(publish_folder_key): - return [] - - # Get all version folders - versions = self.storage.get_subfolders(publish_folder_key) - - # Filter out 'latest' if requested - if exclude_latest: - versions = [v for v in versions if v != "latest"] - - return sorted(versions, reverse=True) - - def _pull( - self, - key: str, - version: str, - destination_path: Path, - *, - filepath: str = "", - dataset: str | None = None, - **kwargs, - ) -> dict: - pub_key = PublishKey(key, version) - - s3_path = dataset + "/" if dataset else "" - full_filepath = s3_path + filepath - - pulled_path = self._download_file( - pub_key, - full_filepath, - output_dir=destination_path, - ) - return {"path": pulled_path} - - def pull_versioned( - self, key: str, version: str, destination_path: Path, **kwargs - ) -> dict: - return self._pull(key, version, destination_path, **kwargs) - - def push_versioned(self, key: str, version: str, **kwargs) -> dict: - """Push data to published folder with version structure. - - Args: - key: Product name - version: Version string (e.g., '1.0', '1.0.1', 'latest') - **kwargs: Additional arguments including: - - source_path: Local path to push from - - acl: Access control level - - target_path: Optional specific target path within version folder - """ - product = key - - source_path = kwargs.get("source_path") - if not source_path: - raise ValueError("source_path is required for push_versioned") - - # Build the published path: {product}/publish/{version}/ - publish_folder_key = f"{product}/publish/{version}" - - # If target_path specified, use it; otherwise use the full folder - target_path = kwargs.get("target_path", "") - full_target_key = ( - f"{publish_folder_key}/{target_path}" if target_path else publish_folder_key - ) - - logger.info(f"Pushing to published: {source_path} -> {full_target_key}") - - # Use PathedStorageConnector's push method - result = self.storage.push( - key=full_target_key, - filepath=source_path, - **{ - k: v - for k, v in kwargs.items() - if k not in ["source_path", "target_path"] - }, - ) - - return result - - def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]: - versions = self._get_published_versions(key, **kwargs) - return sorted(versions, reverse=sort_desc) - - def get_latest_version(self, key: str, **kwargs) -> str: - return self.list_versions(key)[0] - - def version_exists(self, key: str, version: str, **kwargs) -> bool: - return version in self.list_versions(key) - - def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path: # type: ignore[override] - return Path("edm") / "publishing" / "datasets" / key / version diff --git a/dcpy/lifecycle/connector_registry.py b/dcpy/lifecycle/connector_registry.py index 2968978697..7c5a65e27d 100644 --- a/dcpy/lifecycle/connector_registry.py +++ b/dcpy/lifecycle/connector_registry.py @@ -6,8 +6,13 @@ SFTP_USER, ) from dcpy.connectors import filesystem, ingest_datastore, s3, sftp, web -from dcpy.connectors.edm import builds, drafts, gis, published +from dcpy.connectors.edm import gis from dcpy.connectors.edm.bytes import BytesConnector +from dcpy.connectors.edm.connectors import ( + BuildsConnector, + DraftsConnector, + PublishedConnector, +) from dcpy.connectors.edm.open_data_nyc import OpenDataConnector from dcpy.connectors.esri.arcgis_feature_service import ArcGISFeatureServiceConnector from dcpy.connectors.hybrid_pathed_storage import ( @@ -61,8 +66,8 @@ def _set_default_connectors(): conns = [ recipes_datasets, recipes_raw, - drafts.DraftsConnector.create(), - published.PublishedConnector.create(), + DraftsConnector.create(), + PublishedConnector.create(), BytesConnector(), gis.GisDatasetsConnector.create(), SocrataConnector(), @@ -76,7 +81,7 @@ def _set_default_connectors(): [web.WebConnector(), "api"], [filesystem.Connector(), "local_file"], [s3.S3Connector(), "s3"], - [builds.BuildsConnector.create(), "edm.publishing.builds"], + [BuildsConnector.create(), "edm.publishing.builds"], [ sftp.SFTPConnector( hostname=SFTP_HOST, diff --git a/dcpy/models/connectors/edm/publishing.py b/dcpy/models/connectors/edm/publishing.py index 3ffbfa58b6..a5a0a6a86a 100644 --- a/dcpy/models/connectors/edm/publishing.py +++ b/dcpy/models/connectors/edm/publishing.py @@ -49,3 +49,16 @@ def __str__(self): @property def path(self) -> str: return f"{self.product}/build/{self.build}" + + +@dataclass +class PlanKey(ProductKey): + product: str + version: str + + def __str__(self): + return f"Plan: {self.product} - {self.version}" + + @property + def path(self) -> str: + return f"{self.product}/plan/{self.version}" diff --git a/dcpy/test_integration/connectors/edm/test_builds_conn.py b/dcpy/test_integration/connectors/edm/test_builds_conn.py index 5e8919f835..7baf5314fa 100644 --- a/dcpy/test_integration/connectors/edm/test_builds_conn.py +++ b/dcpy/test_integration/connectors/edm/test_builds_conn.py @@ -4,7 +4,7 @@ import pytest -from dcpy.connectors.edm.builds import BuildsConnector +from dcpy.connectors.edm.connectors import BuildsConnector from dcpy.connectors.hybrid_pathed_storage import ( PathedStorageConnector, StorageType, diff --git a/dcpy/test_integration/connectors/edm/test_drafts.py b/dcpy/test_integration/connectors/edm/test_drafts.py index 2c8f320e0f..352094fd2f 100644 --- a/dcpy/test_integration/connectors/edm/test_drafts.py +++ b/dcpy/test_integration/connectors/edm/test_drafts.py @@ -4,7 +4,7 @@ import pytest -from dcpy.connectors.edm.drafts import DraftsConnector +from dcpy.connectors.edm.connectors import DraftsConnector from dcpy.connectors.hybrid_pathed_storage import ( PathedStorageConnector, StorageType, diff --git a/dcpy/test_integration/connectors/edm/test_published.py b/dcpy/test_integration/connectors/edm/test_published.py index c3767be33d..66348f0a95 100644 --- a/dcpy/test_integration/connectors/edm/test_published.py +++ b/dcpy/test_integration/connectors/edm/test_published.py @@ -4,7 +4,7 @@ import pytest -from dcpy.connectors.edm.published import PublishedConnector +from dcpy.connectors.edm.connectors import PublishedConnector from dcpy.connectors.hybrid_pathed_storage import ( PathedStorageConnector, StorageType, diff --git a/dcpy/test_integration/lifecycle/builds/conftest.py b/dcpy/test_integration/lifecycle/builds/conftest.py index 88e62b333f..5b91ab0c06 100644 --- a/dcpy/test_integration/lifecycle/builds/conftest.py +++ b/dcpy/test_integration/lifecycle/builds/conftest.py @@ -1,8 +1,10 @@ import pytest -from dcpy.connectors.edm.builds import BuildsConnector -from dcpy.connectors.edm.drafts import DraftsConnector -from dcpy.connectors.edm.published import PublishedConnector +from dcpy.connectors.edm.connectors import ( + BuildsConnector, + DraftsConnector, + PublishedConnector, +) from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType from dcpy.lifecycle import connector_registry from dcpy.lifecycle.connector_registry import connectors as lifecycle_connectors