diff --git a/metadata-ingestion/src/datahub/cli/iceberg_cli.py b/metadata-ingestion/src/datahub/cli/iceberg_cli.py index f3552f34f2b5bd..f26edf444b7810 100644 --- a/metadata-ingestion/src/datahub/cli/iceberg_cli.py +++ b/metadata-ingestion/src/datahub/cli/iceberg_cli.py @@ -14,6 +14,7 @@ from datahub.configuration.common import GraphError from datahub.ingestion.graph.client import DataHubGraph, get_default_graph from datahub.metadata.schema_classes import SystemMetadataClass +from datahub.telemetry import telemetry logger = logging.getLogger(__name__) @@ -161,6 +162,7 @@ def validate_warehouse(data_root: str) -> None: type=int, help=f"Expiration duration for temporary credentials used for role. Defaults to {DEFAULT_CREDS_EXPIRY_DURATION_SECONDS} seconds if unspecified", ) +@telemetry.with_telemetry(capture_kwargs=["duration_seconds"]) def create( warehouse: str, description: Optional[str], @@ -313,6 +315,7 @@ def create( type=int, help=f"Expiration duration for temporary credentials used for role. Defaults to {DEFAULT_CREDS_EXPIRY_DURATION_SECONDS} seconds if unspecified", ) +@telemetry.with_telemetry(capture_kwargs=["duration_seconds"]) def update( warehouse: str, data_root: str, @@ -398,6 +401,7 @@ def update( @iceberg.command() +@telemetry.with_telemetry() def list() -> None: """ List iceberg warehouses @@ -413,6 +417,7 @@ def list() -> None: @click.option( "-w", "--warehouse", required=True, type=str, help="The name of the warehouse" ) +@telemetry.with_telemetry() def get(warehouse: str) -> None: """Fetches the details of the specified iceberg warehouse""" client = get_default_graph() @@ -442,6 +447,7 @@ def get(warehouse: str) -> None: is_flag=True, help="force the delete if set without confirmation", ) +@telemetry.with_telemetry(capture_kwargs=["dry_run", "force"]) def delete(warehouse: str, dry_run: bool, force: bool) -> None: """ Delete warehouse @@ -470,11 +476,19 @@ def delete(warehouse: str, dry_run: bool, force: bool) -> None: # Do we really need this double-check? if "__typename" in entity and "urn" in entity: if entity["__typename"] in ["Container", "Dataset"]: + # add the Platform Resource URN to also be deleted for each dataset. + # This is not user visible, so no need to show a name to the user and include it in the count. Each + # instance corresponds to a dataset whose name is shown. + if entity["__typename"] == "Dataset": + resource_urn = platform_resource_urn( + entity["properties"]["qualifiedName"] + ) + urns_to_delete.append(resource_urn) + urns_to_delete.append(entity["urn"]) resource_names_to_be_deleted.append( entity.get("name", entity.get("urn")) ) - # TODO: PlatformResource associated with datasets need to be deleted. if dry_run: click.echo( @@ -485,18 +499,21 @@ def delete(warehouse: str, dry_run: bool, force: bool) -> None: else: if not force: click.confirm( - f"This will delete {warehouse} warehouse, credentials, and {len(urns_to_delete)} datasets and namespaces from DataHub. Do you want to continue?", + f"This will delete {warehouse} warehouse, credentials, and {len(resource_names_to_be_deleted)} datasets and namespaces from DataHub. Do you want to continue?", abort=True, ) - client.hard_delete_entity(urn) - client.hard_delete_entity(warehouse_aspect.clientId) - client.hard_delete_entity(warehouse_aspect.clientSecret) + # Delete the resources in the warehouse first, so that in case it is interrupted, the warehouse itself is + # still available to enumerate the resources in it that are not yet deleted. for urn_to_delete in urns_to_delete: client.hard_delete_entity(urn_to_delete) + client.hard_delete_entity(urn) + client.hard_delete_entity(warehouse_aspect.clientId) + client.hard_delete_entity(warehouse_aspect.clientSecret) + click.echo( - f"✅ Successfully deleted iceberg warehouse {warehouse} and associated credentials, {len(urns_to_delete)} datasets and namespaces" + f"✅ Successfully deleted iceberg warehouse {warehouse} and associated credentials, {len(resource_names_to_be_deleted)} datasets and namespaces" ) @@ -504,6 +521,10 @@ def iceberg_data_platform_instance_urn(warehouse: str) -> str: return f"urn:li:dataPlatformInstance:({iceberg_data_platform()},{warehouse})" +def platform_resource_urn(dataset_name: str) -> str: + return f"urn:li:platformResource:iceberg.{dataset_name}" + + def iceberg_data_platform() -> str: return "urn:li:dataPlatform:iceberg" @@ -677,6 +698,9 @@ def get_related_entities_for_platform_instance( ... on Dataset { urn name + properties{ + qualifiedName + } } } }