diff --git a/datahub-web-react/src/app/shared/hooks/useCreateFile.ts b/datahub-web-react/src/app/shared/hooks/useCreateFile.ts index 2c3971c0518ee5..188501bdad2f63 100644 --- a/datahub-web-react/src/app/shared/hooks/useCreateFile.ts +++ b/datahub-web-react/src/app/shared/hooks/useCreateFile.ts @@ -5,6 +5,8 @@ import { PRODUCT_ASSETS_FOLDER } from '@app/shared/constants'; import { useCreateDataHubFileMutation } from '@graphql/app.generated'; import { UploadDownloadScenario } from '@types'; +// keep this consistent with same const in li-utils/src/main/java/com/linkedin/metadata/Constants.java +export const S3_FILE_ID_NAME_SEPARATOR = '__'; const MAX_RETRIES = 3; const RETRY_DELAY_MS = 300; @@ -53,7 +55,7 @@ export default function useCreateFile({ scenario, assetUrn, schemaField }: Props schemaField, scenario, sizeInBytes: file.size, - storageKey: `${PRODUCT_ASSETS_FOLDER}/${fileId}`, + storageKey: `${PRODUCT_ASSETS_FOLDER}/${fileId}${S3_FILE_ID_NAME_SEPARATOR}${file.name}`, contentHash, }, }, diff --git a/datahub-web-react/src/app/shared/hooks/useFileUpload.ts b/datahub-web-react/src/app/shared/hooks/useFileUpload.ts index 3e5994a694162b..283c17745860d9 100644 --- a/datahub-web-react/src/app/shared/hooks/useFileUpload.ts +++ b/datahub-web-react/src/app/shared/hooks/useFileUpload.ts @@ -1,16 +1,13 @@ import { useApolloClient } from '@apollo/client'; import { PRODUCT_ASSETS_FOLDER } from '@app/shared/constants'; -import useCreateFile from '@app/shared/hooks/useCreateFile'; +import useCreateFile, { S3_FILE_ID_NAME_SEPARATOR } from '@app/shared/hooks/useCreateFile'; import { useAppConfig } from '@src/app/useAppConfig'; import { resolveRuntimePath } from '@utils/runtimeBasePath'; import { GetPresignedUploadUrlDocument } from '@graphql/app.generated'; import { UploadDownloadScenario } from '@types'; -// keep this consistent with same const in li-utils/src/main/java/com/linkedin/metadata/Constants.java -const S3_FILE_ID_NAME_SEPARATOR = '__'; - interface Props { scenario: UploadDownloadScenario; assetUrn?: string; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java index 46e4941236b0e2..a237a58c03a2c1 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java @@ -19,6 +19,8 @@ import com.linkedin.container.Container; import com.linkedin.entity.EntityResponse; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.file.BucketStorageLocation; +import com.linkedin.file.DataHubFileInfo; import com.linkedin.metadata.Constants; import com.linkedin.metadata.aspect.EntityAspect; import com.linkedin.metadata.aspect.models.graph.RelatedEntity; @@ -37,6 +39,7 @@ import com.linkedin.metadata.service.UpdateIndicesService; import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.SystemMetadataUtils; +import com.linkedin.metadata.utils.aws.S3Util; import com.linkedin.mxe.MetadataChangeProposal; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; @@ -68,7 +71,35 @@ public DeleteEntityServiceTest() { new EntityServiceImpl(_aspectDao, mock(EventProducer.class), true, preProcessHooks, true); _entityServiceImpl.setUpdateIndicesService(_mockUpdateIndicesService); _deleteEntityService = - new DeleteEntityService(_entityServiceImpl, _graphService, _mockSearchService); + new DeleteEntityService(_entityServiceImpl, _graphService, _mockSearchService, null); + + setupDefaultFileScrollMock(_mockSearchService); + } + + /** + * Setup default mock for file searches to return empty results. This is needed because + * deleteFileReferences is called for every entity deletion. + */ + private void setupDefaultFileScrollMock(EntitySearchService searchService) { + ScrollResult emptyFileScrollResult = new ScrollResult(); + emptyFileScrollResult.setEntities(new SearchEntityArray()); + emptyFileScrollResult.setNumEntities(0); + + // Mock file entity searches to return empty results + // Use lenient() so other, more specific mocks can override this + Mockito.lenient() + .when( + searchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> set != null && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.isNull(), + Mockito.any(), + Mockito.eq("5m"), + Mockito.anyInt())) + .thenReturn(emptyFileScrollResult); } /** @@ -76,6 +107,23 @@ public DeleteEntityServiceTest() { */ @Test public void testDeleteUniqueRefGeneratesValidMCP() { + // Explicitly set up file mock to return empty results (no files to delete) + ScrollResult emptyFileScrollResult = new ScrollResult(); + emptyFileScrollResult.setEntities(new SearchEntityArray()); + emptyFileScrollResult.setNumEntities(0); + Mockito.when( + _mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> set != null && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.any(), + Mockito.any(), + Mockito.eq("5m"), + Mockito.anyInt())) + .thenReturn(emptyFileScrollResult); + final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); final Urn container = UrnUtils.getUrn("urn:li:container:d1006cf3-3ff9-48e3-85cd-26eb23775ab2"); @@ -144,12 +192,30 @@ public void testDeleteUniqueRefGeneratesValidMCP() { @Test public void testDeleteSearchReferences() { EntityService mockEntityService = Mockito.mock(EntityService.class); + EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); DeleteEntityService deleteEntityService = - new DeleteEntityService(mockEntityService, _graphService, _mockSearchService); + new DeleteEntityService(mockEntityService, _graphService, mockSearchService, null); final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); final Urn form = UrnUtils.getUrn("urn:li:form:12345"); + // Mock file entity searches to return empty results (no files to delete) + ScrollResult emptyFileScrollResult = new ScrollResult(); + emptyFileScrollResult.setEntities(new SearchEntityArray()); + emptyFileScrollResult.setNumEntities(0); + Mockito.when( + mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> set != null && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.any(), + Mockito.any(), + Mockito.eq("5m"), + Mockito.anyInt())) + .thenReturn(emptyFileScrollResult); + ScrollResult scrollResult = new ScrollResult(); SearchEntityArray entities = new SearchEntityArray(); SearchEntity searchEntity = new SearchEntity(); @@ -159,9 +225,10 @@ public void testDeleteSearchReferences() { scrollResult.setNumEntities(1); scrollResult.setScrollId("1"); Mockito.when( - _mockSearchService.structuredScroll( + mockSearchService.structuredScroll( Mockito.any(OperationContext.class), - Mockito.any(), + Mockito.argThat( + set -> set == null || !set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), Mockito.eq("*"), Mockito.any(Filter.class), Mockito.eq(null), @@ -173,9 +240,10 @@ public void testDeleteSearchReferences() { ScrollResult scrollResult2 = new ScrollResult(); scrollResult2.setNumEntities(0); Mockito.when( - _mockSearchService.structuredScroll( + mockSearchService.structuredScroll( Mockito.any(OperationContext.class), - Mockito.any(), + Mockito.argThat( + set -> set == null || !set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), Mockito.eq("*"), Mockito.any(Filter.class), Mockito.eq(null), @@ -231,19 +299,38 @@ public void testDeleteSearchReferences() { @Test public void testDeleteNoSearchReferences() { EntityService mockEntityService = Mockito.mock(EntityService.class); + EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); DeleteEntityService deleteEntityService = - new DeleteEntityService(mockEntityService, _graphService, _mockSearchService); + new DeleteEntityService(mockEntityService, _graphService, mockSearchService, null); final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); final Urn form = UrnUtils.getUrn("urn:li:form:12345"); + // Mock file entity searches to return empty results (no files to delete) + ScrollResult emptyFileScrollResult = new ScrollResult(); + emptyFileScrollResult.setEntities(new SearchEntityArray()); + emptyFileScrollResult.setNumEntities(0); + Mockito.when( + mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> set != null && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.any(), + Mockito.any(), + Mockito.eq("5m"), + Mockito.anyInt())) + .thenReturn(emptyFileScrollResult); + ScrollResult scrollResult = new ScrollResult(); scrollResult.setEntities(new SearchEntityArray()); scrollResult.setNumEntities(0); Mockito.when( - _mockSearchService.structuredScroll( + mockSearchService.structuredScroll( Mockito.any(OperationContext.class), - Mockito.any(), + Mockito.argThat( + set -> set == null || !set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), Mockito.eq("*"), Mockito.any(Filter.class), Mockito.eq(null), @@ -286,12 +373,30 @@ public void testDeleteNoSearchReferences() { @Test public void testDeleteSearchReferencesDryRun() { EntityService mockEntityService = Mockito.mock(EntityService.class); + EntitySearchService mockSearchService = Mockito.mock(EntitySearchService.class); DeleteEntityService deleteEntityService = - new DeleteEntityService(mockEntityService, _graphService, _mockSearchService); + new DeleteEntityService(mockEntityService, _graphService, mockSearchService, null); final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); final Urn form = UrnUtils.getUrn("urn:li:form:12345"); + // Mock file entity searches to return empty results (no files to delete) + ScrollResult emptyFileScrollResult = new ScrollResult(); + emptyFileScrollResult.setEntities(new SearchEntityArray()); + emptyFileScrollResult.setNumEntities(0); + Mockito.when( + mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> set != null && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.any(), + Mockito.any(), + Mockito.eq("5m"), + Mockito.anyInt())) + .thenReturn(emptyFileScrollResult); + ScrollResult scrollResult = new ScrollResult(); SearchEntityArray entities = new SearchEntityArray(); SearchEntity searchEntity = new SearchEntity(); @@ -301,9 +406,10 @@ public void testDeleteSearchReferencesDryRun() { scrollResult.setNumEntities(1); scrollResult.setScrollId("1"); Mockito.when( - _mockSearchService.structuredScroll( + mockSearchService.structuredScroll( Mockito.any(OperationContext.class), - Mockito.any(), + Mockito.argThat( + set -> set == null || !set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), Mockito.eq("*"), Mockito.any(Filter.class), Mockito.eq(null), @@ -312,6 +418,21 @@ public void testDeleteSearchReferencesDryRun() { Mockito.eq(1000))) .thenReturn(scrollResult); + ScrollResult scrollResult2 = new ScrollResult(); + scrollResult2.setNumEntities(0); + Mockito.when( + mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> set == null || !set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.eq(null), + Mockito.eq("1"), + Mockito.eq("5m"), + Mockito.eq(1000))) + .thenReturn(scrollResult2); + // no entities with relationships on forms final RelatedEntitiesResult mockRelatedEntities = new RelatedEntitiesResult(0, 0, 0, ImmutableList.of()); @@ -341,4 +462,315 @@ public void testDeleteSearchReferencesDryRun() { assertEquals(1, (int) response.getTotal()); assertTrue(response.getRelatedAspects().isEmpty()); } + + /** Test that file cleanup is triggered when deleting an entity with files */ + @Test + public void testDeleteFileReferences() { + EntityService mockEntityService = Mockito.mock(EntityService.class); + S3Util mockS3Util = Mockito.mock(S3Util.class); + DeleteEntityService deleteEntityService = + new DeleteEntityService(mockEntityService, _graphService, _mockSearchService, mockS3Util); + + final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); + final Urn fileUrn = UrnUtils.getUrn("urn:li:dataHubFile:test-file-id"); + + // Mock file search result + ScrollResult fileScrollResult = new ScrollResult(); + SearchEntityArray fileEntities = new SearchEntityArray(); + SearchEntity fileEntity = new SearchEntity(); + fileEntity.setEntity(fileUrn); + fileEntities.add(fileEntity); + fileScrollResult.setEntities(fileEntities); + fileScrollResult.setNumEntities(1); + fileScrollResult.setScrollId("1"); + + Mockito.when( + _mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> + set != null + && set.size() == 1 + && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.eq(null), + Mockito.eq(null), + Mockito.eq("5m"), + Mockito.eq(1000))) + .thenReturn(fileScrollResult); + + ScrollResult emptyScrollResult = new ScrollResult(); + emptyScrollResult.setNumEntities(0); + Mockito.when( + _mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> + set != null + && set.size() == 1 + && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.eq(null), + Mockito.eq("1"), + Mockito.eq("5m"), + Mockito.eq(1000))) + .thenReturn(emptyScrollResult); + + // Mock file info aspect + DataHubFileInfo fileInfo = new DataHubFileInfo(); + fileInfo.setReferencedByAsset(dataset); + BucketStorageLocation location = new BucketStorageLocation(); + location.setStorageBucket("test-bucket"); + location.setStorageKey("test-key"); + fileInfo.setBucketStorageLocation(location); + + Mockito.when( + mockEntityService.getLatestAspect( + Mockito.any(OperationContext.class), + eq(fileUrn), + eq(Constants.DATAHUB_FILE_INFO_ASPECT_NAME))) + .thenReturn(fileInfo); + + // No other relationships + final RelatedEntitiesResult mockRelatedEntities = + new RelatedEntitiesResult(0, 0, 0, ImmutableList.of()); + Mockito.when( + _graphService.findRelatedEntities( + any(OperationContext.class), + nullable(Set.class), + eq(newFilter("urn", dataset.toString())), + nullable(Set.class), + eq(EMPTY_FILTER), + eq(ImmutableSet.of()), + eq(newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(0), + nullable(Integer.class))) + .thenReturn(mockRelatedEntities); + + final DeleteReferencesResponse response = + deleteEntityService.deleteReferencesTo(opContext, dataset, false); + + // Verify S3 delete was called + Mockito.verify(mockS3Util, Mockito.times(1)).deleteObject("test-bucket", "test-key"); + + // Verify file entity was soft-deleted + Mockito.verify(mockEntityService, Mockito.times(1)) + .ingestProposal( + any(OperationContext.class), + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + eq(true)); + } + + /** Test that file cleanup is skipped when S3Util is not configured */ + @Test + public void testDeleteFileReferencesWithoutS3Util() { + EntityService mockEntityService = Mockito.mock(EntityService.class); + DeleteEntityService deleteEntityService = + new DeleteEntityService(mockEntityService, _graphService, _mockSearchService, null); + + final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); + final Urn fileUrn = UrnUtils.getUrn("urn:li:dataHubFile:test-file-id"); + + // Mock file search result + ScrollResult fileScrollResult = new ScrollResult(); + SearchEntityArray fileEntities = new SearchEntityArray(); + SearchEntity fileEntity = new SearchEntity(); + fileEntity.setEntity(fileUrn); + fileEntities.add(fileEntity); + fileScrollResult.setEntities(fileEntities); + fileScrollResult.setNumEntities(1); + fileScrollResult.setScrollId("1"); + + Mockito.when( + _mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> + set != null + && set.size() == 1 + && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.eq(null), + Mockito.eq(null), + Mockito.eq("5m"), + Mockito.eq(1000))) + .thenReturn(fileScrollResult); + + ScrollResult emptyScrollResult = new ScrollResult(); + emptyScrollResult.setNumEntities(0); + Mockito.when( + _mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> + set != null + && set.size() == 1 + && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.eq(null), + Mockito.eq("1"), + Mockito.eq("5m"), + Mockito.eq(1000))) + .thenReturn(emptyScrollResult); + + // Mock file info aspect + DataHubFileInfo fileInfo = new DataHubFileInfo(); + fileInfo.setReferencedByAsset(dataset); + BucketStorageLocation location = new BucketStorageLocation(); + location.setStorageBucket("test-bucket"); + location.setStorageKey("test-key"); + fileInfo.setBucketStorageLocation(location); + + Mockito.when( + mockEntityService.getLatestAspect( + Mockito.any(OperationContext.class), + eq(fileUrn), + eq(Constants.DATAHUB_FILE_INFO_ASPECT_NAME))) + .thenReturn(fileInfo); + + // No other relationships + final RelatedEntitiesResult mockRelatedEntities = + new RelatedEntitiesResult(0, 0, 0, ImmutableList.of()); + Mockito.when( + _graphService.findRelatedEntities( + any(OperationContext.class), + nullable(Set.class), + eq(newFilter("urn", dataset.toString())), + nullable(Set.class), + eq(EMPTY_FILTER), + eq(ImmutableSet.of()), + eq(newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(0), + nullable(Integer.class))) + .thenReturn(mockRelatedEntities); + + final DeleteReferencesResponse response = + deleteEntityService.deleteReferencesTo(opContext, dataset, false); + + // Verify file entity was still soft-deleted even without S3Util + Mockito.verify(mockEntityService, Mockito.times(1)) + .ingestProposal( + any(OperationContext.class), + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + eq(true)); + } + + /** + * Test that file cleanup continues even if S3 deletion fails. We soft-delete the entity to avoid + * leaving the parent entity in limbo. The tradeoff is accepting a potential orphaned S3 object. + */ + @Test + public void testDeleteFileReferencesWithS3Failure() { + EntityService mockEntityService = Mockito.mock(EntityService.class); + S3Util mockS3Util = Mockito.mock(S3Util.class); + DeleteEntityService deleteEntityService = + new DeleteEntityService(mockEntityService, _graphService, _mockSearchService, mockS3Util); + + final Urn dataset = UrnUtils.toDatasetUrn("snowflake", "test", "DEV"); + final Urn fileUrn = UrnUtils.getUrn("urn:li:dataHubFile:test-file-id"); + + // Mock file search result + ScrollResult fileScrollResult = new ScrollResult(); + SearchEntityArray fileEntities = new SearchEntityArray(); + SearchEntity fileEntity = new SearchEntity(); + fileEntity.setEntity(fileUrn); + fileEntities.add(fileEntity); + fileScrollResult.setEntities(fileEntities); + fileScrollResult.setNumEntities(1); + fileScrollResult.setScrollId("1"); + + Mockito.when( + _mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> + set != null + && set.size() == 1 + && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.eq(null), + Mockito.eq(null), + Mockito.eq("5m"), + Mockito.eq(1000))) + .thenReturn(fileScrollResult); + + ScrollResult emptyScrollResult = new ScrollResult(); + emptyScrollResult.setNumEntities(0); + Mockito.when( + _mockSearchService.structuredScroll( + Mockito.any(OperationContext.class), + Mockito.argThat( + set -> + set != null + && set.size() == 1 + && set.contains(Constants.DATAHUB_FILE_ENTITY_NAME)), + Mockito.eq("*"), + Mockito.any(Filter.class), + Mockito.eq(null), + Mockito.eq("1"), + Mockito.eq("5m"), + Mockito.eq(1000))) + .thenReturn(emptyScrollResult); + + // Mock file info aspect + DataHubFileInfo fileInfo = new DataHubFileInfo(); + fileInfo.setReferencedByAsset(dataset); + BucketStorageLocation location = new BucketStorageLocation(); + location.setStorageBucket("test-bucket"); + location.setStorageKey("test-key"); + fileInfo.setBucketStorageLocation(location); + + Mockito.when( + mockEntityService.getLatestAspect( + Mockito.any(OperationContext.class), + eq(fileUrn), + eq(Constants.DATAHUB_FILE_INFO_ASPECT_NAME))) + .thenReturn(fileInfo); + + // Make S3 deletion throw an exception + Mockito.doThrow(new RuntimeException("S3 error")) + .when(mockS3Util) + .deleteObject("test-bucket", "test-key"); + + // No other relationships + final RelatedEntitiesResult mockRelatedEntities = + new RelatedEntitiesResult(0, 0, 0, ImmutableList.of()); + Mockito.when( + _graphService.findRelatedEntities( + any(OperationContext.class), + nullable(Set.class), + eq(newFilter("urn", dataset.toString())), + nullable(Set.class), + eq(EMPTY_FILTER), + eq(ImmutableSet.of()), + eq(newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(0), + nullable(Integer.class))) + .thenReturn(mockRelatedEntities); + + // Operation should succeed despite S3 failure + final DeleteReferencesResponse response = + deleteEntityService.deleteReferencesTo(opContext, dataset, false); + + // Verify S3 delete was attempted + Mockito.verify(mockS3Util, Mockito.times(1)).deleteObject("test-bucket", "test-key"); + + // Verify file entity was still soft-deleted despite S3 failure to avoid leaving entity in limbo + Mockito.verify(mockEntityService, Mockito.times(1)) + .ingestProposal( + any(OperationContext.class), + Mockito.any(MetadataChangeProposal.class), + Mockito.any(AuditStamp.class), + eq(true)); + + // Verify the response includes the file count + assertEquals(1, (int) response.getTotal()); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityUtilsTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityUtilsTest.java index f210ae94d0d70e..dd304cb048d251 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityUtilsTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityUtilsTest.java @@ -22,6 +22,7 @@ import com.linkedin.metadata.query.filter.Criterion; import com.linkedin.metadata.query.filter.CriterionArray; import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.utils.CriterionUtils; import com.linkedin.schema.SchemaMetadata; import java.util.ArrayList; import java.util.List; @@ -432,4 +433,34 @@ public void testEntityNamesForStructuredPropDeletion() { assertEquals( DeleteEntityUtils.getEntityNamesForStructuredPropertyDeletion(), ImmutableList.of("form")); } + + @Test + public void testGetFilterForFileDeletion() { + Urn deletedEntityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,Test,PROD)"); + + final CriterionArray criterionArray = new CriterionArray(); + criterionArray.add( + CriterionUtils.buildCriterion( + "referencedByAsset", Condition.EQUAL, deletedEntityUrn.toString())); + + Filter expectedFilter = + new Filter() + .setOr( + new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(criterionArray))); + + assertEquals(DeleteEntityUtils.getFilterForFileDeletion(deletedEntityUrn), expectedFilter); + } + + @Test + public void testBuildSoftDeleteProposal() { + Urn fileUrn = UrnUtils.getUrn("urn:li:dataHubFile:test-file-id"); + + var proposal = DeleteEntityUtils.buildSoftDeleteProposal(fileUrn); + + assertEquals(proposal.getEntityUrn(), fileUrn); + assertEquals(proposal.getEntityType(), "dataHubFile"); + assertEquals(proposal.getAspectName(), "status"); + assertEquals(proposal.getChangeType(), com.linkedin.events.metadata.ChangeType.UPSERT); + assertNotNull(proposal.getAspect()); + } } diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index 9a4ec476b46c0b..fc38e52e976aab 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -732,6 +732,7 @@ entities: keyAspect: dataHubFileKey aspects: - dataHubFileInfo + - status - name: dataHubConnection category: internal keyAspect: dataHubConnectionKey diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java index 11eb75009dc090..9546e7bd8e13cd 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/DeleteEntityServiceFactory.java @@ -4,7 +4,9 @@ import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.utils.aws.S3Util; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; @@ -26,9 +28,14 @@ public class DeleteEntityServiceFactory { @Qualifier("entitySearchService") private EntitySearchService _entitySearchService; + @Autowired(required = false) + @Qualifier("s3Util") + @Nullable + private S3Util _s3Util; + @Bean(name = "deleteEntityService") @Nonnull protected DeleteEntityService createDeleteEntityService() { - return new DeleteEntityService(_entityService, _graphService, _entitySearchService); + return new DeleteEntityService(_entityService, _graphService, _entitySearchService, _s3Util); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/s3/S3UtilFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/s3/S3UtilFactory.java index 33b851d03aae3d..f0c9677b6ac6db 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/s3/S3UtilFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/s3/S3UtilFactory.java @@ -34,7 +34,17 @@ protected S3Util getInstance() { return new S3Util(stsClient, roleArn); } else { log.info("Using default S3Util with default credentials"); - S3Client s3Client = S3Client.create(); + var clientBuilder = S3Client.builder(); + + // Configure endpoint URL if provided (for LocalStack or custom S3 endpoints) + String endpointUrl = System.getenv("AWS_ENDPOINT_URL"); + if (endpointUrl != null && !endpointUrl.isEmpty()) { + log.info("Configuring S3Client with custom endpoint: {}", endpointUrl); + clientBuilder.endpointOverride(java.net.URI.create(endpointUrl)); + clientBuilder.forcePathStyle(true); + } + + S3Client s3Client = clientBuilder.build(); return new S3Util(s3Client); } } catch (Exception e) { diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java index 72e63d35ed3883..e7aa6daeec9f8a 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java @@ -16,6 +16,8 @@ import com.linkedin.entity.EntityResponse; import com.linkedin.entity.EnvelopedAspect; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.file.BucketStorageLocation; +import com.linkedin.file.DataHubFileInfo; import com.linkedin.form.FormInfo; import com.linkedin.metadata.Constants; import com.linkedin.metadata.aspect.models.graph.RelatedEntity; @@ -34,6 +36,7 @@ import com.linkedin.metadata.search.ScrollResult; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.aws.S3Util; import com.linkedin.mxe.MetadataChangeProposal; import io.datahubproject.metadata.context.OperationContext; import java.net.URISyntaxException; @@ -61,6 +64,7 @@ public class DeleteEntityService { private final EntityService _entityService; private final GraphService _graphService; private final EntitySearchService _searchService; + private final S3Util _s3Util; private static final Integer ELASTIC_BATCH_DELETE_SLEEP_SEC = 5; private static final Integer BATCH_SIZE = 1000; @@ -81,6 +85,9 @@ public DeleteReferencesResponse deleteReferencesTo( // in CLI final DeleteReferencesResponse result = new DeleteReferencesResponse(); + // Delete file references first (before other references are cleaned up) + int totalFileCount = deleteFileReferences(opContext, urn, dryRun); + // Delete references for entities referencing the deleted urn with searchables. // Only works for Form deletion for now int totalSearchAssetCount = deleteSearchReferences(opContext, urn, dryRun); @@ -110,7 +117,7 @@ public DeleteReferencesResponse deleteReferencesTo( .collect(Collectors.toList()); result.setRelatedAspects(new RelatedAspectArray(relatedAspects)); - result.setTotal(relatedEntities.getTotal() + totalSearchAssetCount); + result.setTotal(relatedEntities.getTotal() + totalSearchAssetCount + totalFileCount); if (dryRun) { return result; @@ -591,6 +598,11 @@ private AssetScrollResult scrollForAssets( "5m", dryRun ? 1 : BATCH_SIZE); // need to pass in 1 for count otherwise get index error if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) { + // Initialize assets to empty list and count to 0 if no results + if (result.assets == null) { + result.assets = new ArrayList<>(); + } + result.totalAssetCount = 0; return result; } result.scrollId = scrollResult.getScrollId(); @@ -710,6 +722,121 @@ private AuditStamp createAuditStamp() { .setTime(System.currentTimeMillis()); } + /** + * Find and delete files from S3 when their referenced entity is being deleted. This method finds + * all DataHub file entities that reference the deleted entity via the referencedByAsset field, + * deletes them from S3, and soft-deletes the file entities. + * + * @param opContext the operation context + * @param deletedUrn the URN of the entity being deleted + * @param dryRun if true, only count files without deleting them + * @return the number of files processed + */ + private int deleteFileReferences( + @Nonnull OperationContext opContext, @Nonnull final Urn deletedUrn, final boolean dryRun) { + + Filter filter = DeleteEntityUtils.getFilterForFileDeletion(deletedUrn); + List entityNames = ImmutableList.of(Constants.DATAHUB_FILE_ENTITY_NAME); + + int totalFileCount = 0; + String scrollId = null; + + do { + AssetScrollResult result = + scrollForAssets( + opContext, new AssetScrollResult(), filter, entityNames, scrollId, dryRun); + + totalFileCount += result.totalAssetCount; + scrollId = dryRun ? null : result.scrollId; + + if (!dryRun) { + result.assets.forEach( + fileUrn -> { + try { + deleteFileAndS3Object(opContext, fileUrn, deletedUrn); + } catch (Exception e) { + log.error( + "Failed to process file deletion for urn: {} referenced by deleted entity: {}", + fileUrn, + deletedUrn, + e); + } + }); + } + } while (scrollId != null); + + if (totalFileCount > 0) { + log.info("Processed {} file(s) referencing deleted entity: {}", totalFileCount, deletedUrn); + } + + return totalFileCount; + } + + /** + * Delete a file from S3 and soft-delete the DataHub file entity. This ensures the file is removed + * from storage and marked as deleted in DataHub for audit purposes. + * + * @param opContext the operation context + * @param fileUrn the URN of the file to delete + * @param deletedEntityUrn the URN of the entity being deleted (for logging) + */ + private void deleteFileAndS3Object( + @Nonnull OperationContext opContext, + @Nonnull final Urn fileUrn, + @Nonnull final Urn deletedEntityUrn) { + + log.info( + "Processing file cleanup for file: {} (referenced by deleted entity: {})", + fileUrn, + deletedEntityUrn); + + try { + // Get file info to retrieve S3 location + RecordTemplate record = + _entityService.getLatestAspect( + opContext, fileUrn, Constants.DATAHUB_FILE_INFO_ASPECT_NAME); + + if (record == null) { + log.warn("Could not retrieve file info for urn: {}, skipping cleanup", fileUrn); + return; + } + + DataHubFileInfo fileInfo = new DataHubFileInfo(record.data()); + + // Delete from S3 if S3Util is available and file has storage location + if (_s3Util != null && fileInfo.hasBucketStorageLocation()) { + BucketStorageLocation location = fileInfo.getBucketStorageLocation(); + String bucket = location.getStorageBucket(); + String key = location.getStorageKey(); + + try { + _s3Util.deleteObject(bucket, key); + log.info( + "Successfully deleted file from S3: bucket={}, key={}, urn={}", bucket, key, fileUrn); + } catch (Exception e) { + log.error( + "Failed to delete file from S3 for urn: {}. Will continue with soft-delete to avoid " + + "leaving entity in inconsistent state. Manual S3 cleanup may be required.", + fileUrn, + e); + } + } else { + log.warn( + "S3Util not configured or file has no storage location, skipping S3 deletion for file: {}", + fileUrn); + } + + // Soft delete the file entity + MetadataChangeProposal softDeleteMcp = DeleteEntityUtils.buildSoftDeleteProposal(fileUrn); + _entityService.ingestProposal(opContext, softDeleteMcp, createAuditStamp(), true); + log.info("Soft-deleted DataHub file entity: {}", fileUrn); + + } catch (Exception e) { + log.error("Failed to process file deletion for urn: {}", fileUrn, e); + throw new RuntimeException("Failed to delete file: " + fileUrn, e); + } + } + @AllArgsConstructor @Data private static class DeleteEntityServiceError { diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityUtils.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityUtils.java index 20dc104e1b436e..4549a5304f915a 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityUtils.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityUtils.java @@ -8,6 +8,7 @@ import com.linkedin.common.FormVerificationAssociation; import com.linkedin.common.FormVerificationAssociationArray; import com.linkedin.common.Forms; +import com.linkedin.common.Status; import com.linkedin.common.urn.Urn; import com.linkedin.data.DataComplex; import com.linkedin.data.DataList; @@ -18,15 +19,18 @@ import com.linkedin.data.schema.RecordDataSchema; import com.linkedin.data.template.RecordTemplate; import com.linkedin.entity.Aspect; +import com.linkedin.events.metadata.ChangeType; import com.linkedin.form.FormInfo; import com.linkedin.form.FormPrompt; import com.linkedin.form.FormPromptArray; +import com.linkedin.metadata.Constants; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterion; import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; import com.linkedin.metadata.query.filter.CriterionArray; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.utils.CriterionUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeProposal; import java.util.List; import java.util.ListIterator; @@ -373,4 +377,41 @@ public static FormInfo removePromptsFromFormInfoAspect( return updatedAspect.get(); } + + /** + * Get filter to find all DataHub file entities that reference a deleted entity via the + * referencedByAsset field. + * + * @param deletedUrn the URN of the entity being deleted + * @return Filter to use in search queries + */ + public static Filter getFilterForFileDeletion(@Nonnull Urn deletedUrn) { + return new Filter() + .setOr( + new ConjunctiveCriterionArray( + new ConjunctiveCriterion() + .setAnd( + new CriterionArray( + buildCriterion( + "referencedByAsset", Condition.EQUAL, deletedUrn.toString()))))); + } + + /** + * Build a MetadataChangeProposal to soft-delete an entity by setting its status to removed. + * + * @param urn the URN of the entity to soft-delete + * @return MetadataChangeProposal for soft deletion + */ + public static MetadataChangeProposal buildSoftDeleteProposal(@Nonnull Urn urn) { + Status status = new Status().setRemoved(true); + + MetadataChangeProposal mcp = new MetadataChangeProposal(); + mcp.setEntityUrn(urn); + mcp.setEntityType(urn.getEntityType()); + mcp.setAspectName(Constants.STATUS_ASPECT_NAME); + mcp.setChangeType(ChangeType.UPSERT); + mcp.setAspect(GenericRecordUtils.serializeAspect(status)); + + return mcp; + } } diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/aws/S3Util.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/aws/S3Util.java index 75e9cd69f8cbda..b5b148c9082948 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/aws/S3Util.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/aws/S3Util.java @@ -6,7 +6,10 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; -import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.presigner.S3Presigner; import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest; import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest; @@ -180,4 +183,35 @@ public String generatePresignedUploadUrl( throw new RuntimeException("Failed to generate presigned upload URL: " + e.getMessage(), e); } } + + /** + * Delete an object from S3 + * + * @param bucket The S3 bucket name + * @param key The S3 object key to delete + */ + public void deleteObject(@Nonnull String bucket, @Nonnull String key) { + try { + DeleteObjectRequest deleteObjectRequest = + DeleteObjectRequest.builder().bucket(bucket).key(key).build(); + + DeleteObjectResponse response = s3Client.deleteObject(deleteObjectRequest); + + if (!response.sdkHttpResponse().isSuccessful()) { + log.error( + "Failed to delete object from S3. Bucket: {}, Key: {}, Response: {}", + bucket, + key, + response); + throw new RuntimeException( + "Failed to delete object from S3. Response: " + + response.sdkHttpResponse().statusCode()); + } + + log.info("Successfully deleted object from S3. Bucket: {}, Key: {}", bucket, key); + } catch (Exception e) { + log.error("Failed to delete object from S3. Bucket: {}, Key: {}", bucket, key, e); + throw new RuntimeException("Failed to delete object from S3: " + e.getMessage(), e); + } + } } diff --git a/metadata-utils/src/test/java/com/linkedin/metadata/utils/aws/S3UtilTest.java b/metadata-utils/src/test/java/com/linkedin/metadata/utils/aws/S3UtilTest.java index fa1b5b03c5ae8e..a6d50d35bbd156 100644 --- a/metadata-utils/src/test/java/com/linkedin/metadata/utils/aws/S3UtilTest.java +++ b/metadata-utils/src/test/java/com/linkedin/metadata/utils/aws/S3UtilTest.java @@ -375,4 +375,76 @@ public void testPresignerCreationWithS3ClientConfiguration() { // This will fail during construction when createPresigner() is called assertThrows(RuntimeException.class, () -> new S3Util(mockS3Client)); } + + @Test + public void testDeleteObjectSuccess() { + S3Util s3Util = new S3Util(mockS3Client, mockS3Presigner); + + String bucket = "test-bucket"; + String key = "test-key"; + + software.amazon.awssdk.services.s3.model.DeleteObjectResponse mockResponse = + mock(software.amazon.awssdk.services.s3.model.DeleteObjectResponse.class); + software.amazon.awssdk.http.SdkHttpResponse mockHttpResponse = + mock(software.amazon.awssdk.http.SdkHttpResponse.class); + + when(mockResponse.sdkHttpResponse()).thenReturn(mockHttpResponse); + when(mockHttpResponse.isSuccessful()).thenReturn(true); + when(mockS3Client.deleteObject( + any(software.amazon.awssdk.services.s3.model.DeleteObjectRequest.class))) + .thenReturn(mockResponse); + + // Should not throw any exception + s3Util.deleteObject(bucket, key); + + verify(mockS3Client) + .deleteObject(any(software.amazon.awssdk.services.s3.model.DeleteObjectRequest.class)); + } + + @Test + public void testDeleteObjectFailure() { + S3Util s3Util = new S3Util(mockS3Client, mockS3Presigner); + + String bucket = "test-bucket"; + String key = "test-key"; + + software.amazon.awssdk.services.s3.model.DeleteObjectResponse mockResponse = + mock(software.amazon.awssdk.services.s3.model.DeleteObjectResponse.class); + software.amazon.awssdk.http.SdkHttpResponse mockHttpResponse = + mock(software.amazon.awssdk.http.SdkHttpResponse.class); + + when(mockResponse.sdkHttpResponse()).thenReturn(mockHttpResponse); + when(mockHttpResponse.isSuccessful()).thenReturn(false); + when(mockHttpResponse.statusCode()).thenReturn(403); + when(mockS3Client.deleteObject( + any(software.amazon.awssdk.services.s3.model.DeleteObjectRequest.class))) + .thenReturn(mockResponse); + + assertThrows(RuntimeException.class, () -> s3Util.deleteObject(bucket, key)); + } + + @Test + public void testDeleteObjectException() { + S3Util s3Util = new S3Util(mockS3Client, mockS3Presigner); + + String bucket = "test-bucket"; + String key = "test-key"; + + when(mockS3Client.deleteObject( + any(software.amazon.awssdk.services.s3.model.DeleteObjectRequest.class))) + .thenThrow(new RuntimeException("S3 service error")); + + assertThrows(RuntimeException.class, () -> s3Util.deleteObject(bucket, key)); + } + + @Test + public void testDeleteObjectWithNullParameters() { + S3Util s3Util = new S3Util(mockS3Client, mockS3Presigner); + + // Test with null bucket + assertThrows(Exception.class, () -> s3Util.deleteObject(null, "key")); + + // Test with null key + assertThrows(Exception.class, () -> s3Util.deleteObject("bucket", null)); + } }