Skip to content

Commit

Permalink
fix(iceberg): delete associated platform resources when deleting warehouse (#12564)
Browse files Browse the repository at this point in the history
  • Loading branch information
chakru-r authored Feb 10, 2025
1 parent 6a35cd6 commit 704526b
Showing 1 changed file with 30 additions and 6 deletions.
36 changes: 30 additions & 6 deletions metadata-ingestion/src/datahub/cli/iceberg_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from datahub.configuration.common import GraphError
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.metadata.schema_classes import SystemMetadataClass
from datahub.telemetry import telemetry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -161,6 +162,7 @@ def validate_warehouse(data_root: str) -> None:
type=int,
help=f"Expiration duration for temporary credentials used for role. Defaults to {DEFAULT_CREDS_EXPIRY_DURATION_SECONDS} seconds if unspecified",
)
@telemetry.with_telemetry(capture_kwargs=["duration_seconds"])
def create(
warehouse: str,
description: Optional[str],
Expand Down Expand Up @@ -313,6 +315,7 @@ def create(
type=int,
help=f"Expiration duration for temporary credentials used for role. Defaults to {DEFAULT_CREDS_EXPIRY_DURATION_SECONDS} seconds if unspecified",
)
@telemetry.with_telemetry(capture_kwargs=["duration_seconds"])
def update(
warehouse: str,
data_root: str,
Expand Down Expand Up @@ -398,6 +401,7 @@ def update(


@iceberg.command()
@telemetry.with_telemetry()
def list() -> None:
"""
List iceberg warehouses
Expand All @@ -413,6 +417,7 @@ def list() -> None:
@click.option(
"-w", "--warehouse", required=True, type=str, help="The name of the warehouse"
)
@telemetry.with_telemetry()
def get(warehouse: str) -> None:
"""Fetches the details of the specified iceberg warehouse"""
client = get_default_graph()
Expand Down Expand Up @@ -442,6 +447,7 @@ def get(warehouse: str) -> None:
is_flag=True,
help="force the delete if set without confirmation",
)
@telemetry.with_telemetry(capture_kwargs=["dry_run", "force"])
def delete(warehouse: str, dry_run: bool, force: bool) -> None:
"""
Delete warehouse
Expand Down Expand Up @@ -470,11 +476,19 @@ def delete(warehouse: str, dry_run: bool, force: bool) -> None:
# Do we really need this double-check?
if "__typename" in entity and "urn" in entity:
if entity["__typename"] in ["Container", "Dataset"]:
# Also queue the Platform Resource URN of each dataset for deletion.
# It is not user-visible, so there is no need to show its name to the user or to include it in the count;
# each instance corresponds to a dataset whose name is already shown.
if entity["__typename"] == "Dataset":
resource_urn = platform_resource_urn(
entity["properties"]["qualifiedName"]
)
urns_to_delete.append(resource_urn)

urns_to_delete.append(entity["urn"])
resource_names_to_be_deleted.append(
entity.get("name", entity.get("urn"))
)
# TODO: PlatformResource associated with datasets need to be deleted.

if dry_run:
click.echo(
Expand All @@ -485,25 +499,32 @@ def delete(warehouse: str, dry_run: bool, force: bool) -> None:
else:
if not force:
click.confirm(
f"This will delete {warehouse} warehouse, credentials, and {len(urns_to_delete)} datasets and namespaces from DataHub. Do you want to continue?",
f"This will delete {warehouse} warehouse, credentials, and {len(resource_names_to_be_deleted)} datasets and namespaces from DataHub. Do you want to continue?",
abort=True,
)
client.hard_delete_entity(urn)
client.hard_delete_entity(warehouse_aspect.clientId)
client.hard_delete_entity(warehouse_aspect.clientSecret)

# Delete the resources in the warehouse first, so that in case it is interrupted, the warehouse itself is
# still available to enumerate the resources in it that are not yet deleted.
for urn_to_delete in urns_to_delete:
client.hard_delete_entity(urn_to_delete)

client.hard_delete_entity(urn)
client.hard_delete_entity(warehouse_aspect.clientId)
client.hard_delete_entity(warehouse_aspect.clientSecret)

click.echo(
f"✅ Successfully deleted iceberg warehouse {warehouse} and associated credentials, {len(urns_to_delete)} datasets and namespaces"
f"✅ Successfully deleted iceberg warehouse {warehouse} and associated credentials, {len(resource_names_to_be_deleted)} datasets and namespaces"
)


def iceberg_data_platform_instance_urn(warehouse: str) -> str:
    """Return the dataPlatformInstance URN string for ``warehouse``.

    The URN has the form ``urn:li:dataPlatformInstance:(<platform>,<warehouse>)``
    where ``<platform>`` is whatever ``iceberg_data_platform()`` returns.
    """
    platform_urn = iceberg_data_platform()
    return f"urn:li:dataPlatformInstance:({platform_urn},{warehouse})"


def platform_resource_urn(dataset_name: str) -> str:
    """Return the platformResource URN string for an iceberg dataset name.

    The resource id is ``dataset_name`` prefixed with ``iceberg.``; the
    ``delete`` command passes each dataset's ``qualifiedName`` here.
    """
    resource_id = f"iceberg.{dataset_name}"
    return f"urn:li:platformResource:{resource_id}"


def iceberg_data_platform() -> str:
    """Return the URN string identifying the iceberg data platform."""
    platform_name = "iceberg"
    return f"urn:li:dataPlatform:{platform_name}"

Expand Down Expand Up @@ -677,6 +698,9 @@ def get_related_entities_for_platform_instance(
... on Dataset {
urn
name
properties{
qualifiedName
}
}
}
}
Expand Down

0 comments on commit 704526b

Please sign in to comment.