Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datahub-web-react/src/app/shared/hooks/useCreateFile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { PRODUCT_ASSETS_FOLDER } from '@app/shared/constants';
import { useCreateDataHubFileMutation } from '@graphql/app.generated';
import { UploadDownloadScenario } from '@types';

// Separator placed between the file id and the original file name when the storage key is built.
// keep this consistent with same const in li-utils/src/main/java/com/linkedin/metadata/Constants.java
export const S3_FILE_ID_NAME_SEPARATOR = '__';
const MAX_RETRIES = 3;
const RETRY_DELAY_MS = 300;

Expand Down Expand Up @@ -53,7 +55,7 @@ export default function useCreateFile({ scenario, assetUrn, schemaField }: Props
schemaField,
scenario,
sizeInBytes: file.size,
storageKey: `${PRODUCT_ASSETS_FOLDER}/${fileId}`,
storageKey: `${PRODUCT_ASSETS_FOLDER}/${fileId}${S3_FILE_ID_NAME_SEPARATOR}${file.name}`,
contentHash,
},
},
Expand Down
5 changes: 1 addition & 4 deletions datahub-web-react/src/app/shared/hooks/useFileUpload.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import { useApolloClient } from '@apollo/client';

import { PRODUCT_ASSETS_FOLDER } from '@app/shared/constants';
import useCreateFile from '@app/shared/hooks/useCreateFile';
import useCreateFile, { S3_FILE_ID_NAME_SEPARATOR } from '@app/shared/hooks/useCreateFile';
import { useAppConfig } from '@src/app/useAppConfig';
import { resolveRuntimePath } from '@utils/runtimeBasePath';

import { GetPresignedUploadUrlDocument } from '@graphql/app.generated';
import { UploadDownloadScenario } from '@types';

// keep this consistent with same const in li-utils/src/main/java/com/linkedin/metadata/Constants.java
const S3_FILE_ID_NAME_SEPARATOR = '__';

interface Props {
scenario: UploadDownloadScenario;
assetUrn?: string;
Expand Down Expand Up @@ -59,7 +56,7 @@
const uuidFromFileId = fileId.split(S3_FILE_ID_NAME_SEPARATOR)[0];
await createFile(uuidFromFileId, file);
} catch (error) {
throw new Error(`Failed to upload file: ${error}`);

Check failure on line 59 in datahub-web-react/src/app/shared/hooks/useFileUpload.ts

View workflow job for this annotation

GitHub Actions / build (frontend, UTC)

src/app/shared/hooks/__tests__/useFileUpload.test.tsx > useFileUpload > should work without assetUrn when not provided

Error: Failed to upload file: Error: [vitest] No "S3_FILE_ID_NAME_SEPARATOR" export is defined on the "@app/shared/hooks/useCreateFile" mock. Did you forget to return it from "vi.mock"? If you need to partially mock a module, you can use "importOriginal" helper inside: Ignored nodes: comments, script, style <html> <head /> <body /> </html> ❯ Object.uploadFile src/app/shared/hooks/useFileUpload.ts:59:19 ❯ src/app/shared/hooks/__tests__/useFileUpload.test.tsx:359:25

Check failure on line 59 in datahub-web-react/src/app/shared/hooks/useFileUpload.ts

View workflow job for this annotation

GitHub Actions / build (frontend, UTC)

src/app/shared/hooks/__tests__/useFileUpload.test.tsx > useFileUpload > should handle different file types correctly

Error: Failed to upload file: Error: [vitest] No "S3_FILE_ID_NAME_SEPARATOR" export is defined on the "@app/shared/hooks/useCreateFile" mock. Did you forget to return it from "vi.mock"? If you need to partially mock a module, you can use "importOriginal" helper inside: Ignored nodes: comments, script, style <html> <head /> <body /> </html> ❯ Object.uploadFile src/app/shared/hooks/useFileUpload.ts:59:19 ❯ src/app/shared/hooks/__tests__/useFileUpload.test.tsx:287:25

Check failure on line 59 in datahub-web-react/src/app/shared/hooks/useFileUpload.ts

View workflow job for this annotation

GitHub Actions / build (frontend, UTC)

src/app/shared/hooks/__tests__/useFileUpload.test.tsx > useFileUpload > should successfully upload a file and return the file URL

Error: Failed to upload file: Error: [vitest] No "S3_FILE_ID_NAME_SEPARATOR" export is defined on the "@app/shared/hooks/useCreateFile" mock. Did you forget to return it from "vi.mock"? If you need to partially mock a module, you can use "importOriginal" helper inside: Ignored nodes: comments, script, style <html> <head /> <body /> </html> ❯ Object.uploadFile src/app/shared/hooks/useFileUpload.ts:59:19 ❯ src/app/shared/hooks/__tests__/useFileUpload.test.tsx:109:25

Check failure on line 59 in datahub-web-react/src/app/shared/hooks/useFileUpload.ts

View workflow job for this annotation

GitHub Actions / build (frontend, America/New_York)

src/app/shared/hooks/__tests__/useFileUpload.test.tsx > useFileUpload > should work without assetUrn when not provided

Error: Failed to upload file: Error: [vitest] No "S3_FILE_ID_NAME_SEPARATOR" export is defined on the "@app/shared/hooks/useCreateFile" mock. Did you forget to return it from "vi.mock"? If you need to partially mock a module, you can use "importOriginal" helper inside: Ignored nodes: comments, script, style <html> <head /> <body /> </html> ❯ Object.uploadFile src/app/shared/hooks/useFileUpload.ts:59:19 ❯ src/app/shared/hooks/__tests__/useFileUpload.test.tsx:359:25

Check failure on line 59 in datahub-web-react/src/app/shared/hooks/useFileUpload.ts

View workflow job for this annotation

GitHub Actions / build (frontend, America/New_York)

src/app/shared/hooks/__tests__/useFileUpload.test.tsx > useFileUpload > should handle different file types correctly

Error: Failed to upload file: Error: [vitest] No "S3_FILE_ID_NAME_SEPARATOR" export is defined on the "@app/shared/hooks/useCreateFile" mock. Did you forget to return it from "vi.mock"? If you need to partially mock a module, you can use "importOriginal" helper inside: Ignored nodes: comments, script, style <html> <head /> <body /> </html> ❯ Object.uploadFile src/app/shared/hooks/useFileUpload.ts:59:19 ❯ src/app/shared/hooks/__tests__/useFileUpload.test.tsx:287:25

Check failure on line 59 in datahub-web-react/src/app/shared/hooks/useFileUpload.ts

View workflow job for this annotation

GitHub Actions / build (frontend, America/New_York)

src/app/shared/hooks/__tests__/useFileUpload.test.tsx > useFileUpload > should successfully upload a file and return the file URL

Error: Failed to upload file: Error: [vitest] No "S3_FILE_ID_NAME_SEPARATOR" export is defined on the "@app/shared/hooks/useCreateFile" mock. Did you forget to return it from "vi.mock"? If you need to partially mock a module, you can use "importOriginal" helper inside: Ignored nodes: comments, script, style <html> <head /> <body /> </html> ❯ Object.uploadFile src/app/shared/hooks/useFileUpload.ts:59:19 ❯ src/app/shared/hooks/__tests__/useFileUpload.test.tsx:109:25
}

return resolveRuntimePath(`/openapi/v1/files/${PRODUCT_ASSETS_FOLDER}/${fileId}`);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.utils.CriterionUtils;
import com.linkedin.schema.SchemaMetadata;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -432,4 +433,34 @@ public void testEntityNamesForStructuredPropDeletion() {
assertEquals(
DeleteEntityUtils.getEntityNamesForStructuredPropertyDeletion(), ImmutableList.of("form"));
}

/**
 * Verifies that the file-deletion filter is a single OR clause holding one EQUAL criterion on
 * {@code referencedByAsset} whose value is the deleted entity's URN string.
 */
@Test
public void testGetFilterForFileDeletion() {
  final Urn deletedEntityUrn =
      UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,Test,PROD)");

  // Build the expected filter bottom-up: criterion -> AND clause -> OR array.
  final CriterionArray expectedCriteria = new CriterionArray();
  expectedCriteria.add(
      CriterionUtils.buildCriterion(
          "referencedByAsset", Condition.EQUAL, deletedEntityUrn.toString()));
  final ConjunctiveCriterion expectedClause = new ConjunctiveCriterion().setAnd(expectedCriteria);
  final Filter expectedFilter = new Filter().setOr(new ConjunctiveCriterionArray(expectedClause));

  final Filter actualFilter = DeleteEntityUtils.getFilterForFileDeletion(deletedEntityUrn);

  assertEquals(actualFilter, expectedFilter);
}

/**
 * Verifies the fields of the soft-delete proposal built for a file URN: target URN, entity type,
 * aspect name, change type, and that an aspect payload is present.
 */
@Test
public void testBuildSoftDeleteProposal() {
  final Urn fileUrn = UrnUtils.getUrn("urn:li:dataHubFile:test-file-id");
  final var expectedChangeType = com.linkedin.events.metadata.ChangeType.UPSERT;

  final var softDelete = DeleteEntityUtils.buildSoftDeleteProposal(fileUrn);

  // The proposal must target the given URN and upsert its status aspect.
  assertEquals(softDelete.getEntityUrn(), fileUrn);
  assertEquals(softDelete.getEntityType(), "dataHubFile");
  assertEquals(softDelete.getAspectName(), "status");
  assertEquals(softDelete.getChangeType(), expectedChangeType);
  assertNotNull(softDelete.getAspect());
}
}
1 change: 1 addition & 0 deletions metadata-models/src/main/resources/entity-registry.yml
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ entities:
keyAspect: dataHubFileKey
aspects:
- dataHubFileInfo
- status
- name: dataHubConnection
category: internal
keyAspect: dataHubConnectionKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.utils.aws.S3Util;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
Expand All @@ -26,9 +28,14 @@ public class DeleteEntityServiceFactory {
@Qualifier("entitySearchService")
private EntitySearchService _entitySearchService;

@Autowired(required = false)
@Qualifier("s3Util")
@Nullable
private S3Util _s3Util;

/**
 * Creates the {@code deleteEntityService} bean from the injected entity, graph and search
 * services plus the optional S3 utility.
 *
 * <p>{@code _s3Util} is autowired with {@code required = false} and marked {@code @Nullable}, so
 * it may be {@code null} here; it is handed to the service unchanged.
 *
 * @return the configured {@link DeleteEntityService}
 */
@Bean(name = "deleteEntityService")
@Nonnull
protected DeleteEntityService createDeleteEntityService() {
  // Exactly one return: a second return after this one would be unreachable, and the
  // three-argument form would drop the injected _s3Util.
  return new DeleteEntityService(_entityService, _graphService, _entitySearchService, _s3Util);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,17 @@ protected S3Util getInstance() {
return new S3Util(stsClient, roleArn);
} else {
log.info("Using default S3Util with default credentials");
S3Client s3Client = S3Client.create();
var clientBuilder = S3Client.builder();

// Configure endpoint URL if provided (for LocalStack or custom S3 endpoints)
String endpointUrl = System.getenv("AWS_ENDPOINT_URL");
if (endpointUrl != null && !endpointUrl.isEmpty()) {
log.info("Configuring S3Client with custom endpoint: {}", endpointUrl);
clientBuilder.endpointOverride(java.net.URI.create(endpointUrl));
clientBuilder.forcePathStyle(true);
}

S3Client s3Client = clientBuilder.build();
return new S3Util(s3Client);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.file.BucketStorageLocation;
import com.linkedin.file.DataHubFileInfo;
import com.linkedin.form.FormInfo;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.models.graph.RelatedEntity;
Expand All @@ -34,6 +36,7 @@
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.aws.S3Util;
import com.linkedin.mxe.MetadataChangeProposal;
import io.datahubproject.metadata.context.OperationContext;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -61,6 +64,7 @@ public class DeleteEntityService {
private final EntityService<?> _entityService;
private final GraphService _graphService;
private final EntitySearchService _searchService;
private final S3Util _s3Util;

private static final Integer ELASTIC_BATCH_DELETE_SLEEP_SEC = 5;
private static final Integer BATCH_SIZE = 1000;
Expand All @@ -81,6 +85,9 @@ public DeleteReferencesResponse deleteReferencesTo(
// in CLI
final DeleteReferencesResponse result = new DeleteReferencesResponse();

// Delete file references first (before other references are cleaned up)
int totalFileCount = deleteFileReferences(opContext, urn, dryRun);

// Delete references for entities referencing the deleted urn with searchables.
// Only works for Form deletion for now
int totalSearchAssetCount = deleteSearchReferences(opContext, urn, dryRun);
Expand Down Expand Up @@ -110,7 +117,7 @@ public DeleteReferencesResponse deleteReferencesTo(
.collect(Collectors.toList());

result.setRelatedAspects(new RelatedAspectArray(relatedAspects));
result.setTotal(relatedEntities.getTotal() + totalSearchAssetCount);
result.setTotal(relatedEntities.getTotal() + totalSearchAssetCount + totalFileCount);

if (dryRun) {
return result;
Expand Down Expand Up @@ -591,6 +598,11 @@ private AssetScrollResult scrollForAssets(
"5m",
dryRun ? 1 : BATCH_SIZE); // need to pass in 1 for count otherwise get index error
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
// Initialize assets to empty list and count to 0 if no results
if (result.assets == null) {
result.assets = new ArrayList<>();
}
result.totalAssetCount = 0;
return result;
}
result.scrollId = scrollResult.getScrollId();
Expand Down Expand Up @@ -710,6 +722,121 @@ private AuditStamp createAuditStamp() {
.setTime(System.currentTimeMillis());
}

/**
 * Scrolls over every DataHub file entity whose {@code referencedByAsset} field points at the
 * entity being deleted and, unless this is a dry run, hands each one to
 * {@link #deleteFileAndS3Object} for cleanup.
 *
 * <p>A failure on one file is logged and does not stop the remaining files from being processed.
 * In dry-run mode a single scroll request is issued and nothing is deleted.
 *
 * @param opContext the operation context
 * @param deletedUrn the URN of the entity being deleted
 * @param dryRun if true, only count files without deleting them
 * @return the sum of the asset counts reported by each scroll page
 */
private int deleteFileReferences(
    @Nonnull OperationContext opContext, @Nonnull final Urn deletedUrn, final boolean dryRun) {

  final Filter fileFilter = DeleteEntityUtils.getFilterForFileDeletion(deletedUrn);
  final List<String> fileEntityNames = ImmutableList.of(Constants.DATAHUB_FILE_ENTITY_NAME);

  int processedCount = 0;
  String nextScrollId = null;

  do {
    final AssetScrollResult page =
        scrollForAssets(
            opContext, new AssetScrollResult(), fileFilter, fileEntityNames, nextScrollId, dryRun);

    processedCount += page.totalAssetCount;

    if (dryRun) {
      // A dry run only needs the count from the first page; never continue scrolling.
      nextScrollId = null;
    } else {
      nextScrollId = page.scrollId;
      for (final Urn fileUrn : page.assets) {
        try {
          deleteFileAndS3Object(opContext, fileUrn, deletedUrn);
        } catch (Exception e) {
          // Best effort: keep going so one bad file does not block the rest.
          log.error(
              "Failed to process file deletion for urn: {} referenced by deleted entity: {}",
              fileUrn,
              deletedUrn,
              e);
        }
      }
    }
  } while (nextScrollId != null);

  if (processedCount > 0) {
    log.info("Processed {} file(s) referencing deleted entity: {}", processedCount, deletedUrn);
  }

  return processedCount;
}

/**
 * Removes the stored object behind a DataHub file entity and then soft-deletes the entity.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Load the file-info aspect; if it is missing, log a warning and return without doing
 *       anything else.
 *   <li>If an S3Util is available and the file info carries a bucket storage location, delete the
 *       object. A failure here is logged and does not prevent the soft delete.
 *   <li>Ingest a soft-delete proposal for the file entity.
 * </ol>
 *
 * @param opContext the operation context
 * @param fileUrn the URN of the file to delete
 * @param deletedEntityUrn the URN of the entity being deleted (for logging)
 * @throws RuntimeException wrapping any exception raised outside the S3 deletion step
 */
private void deleteFileAndS3Object(
    @Nonnull OperationContext opContext,
    @Nonnull final Urn fileUrn,
    @Nonnull final Urn deletedEntityUrn) {

  log.info(
      "Processing file cleanup for file: {} (referenced by deleted entity: {})",
      fileUrn,
      deletedEntityUrn);

  try {
    final RecordTemplate fileInfoRecord =
        _entityService.getLatestAspect(
            opContext, fileUrn, Constants.DATAHUB_FILE_INFO_ASPECT_NAME);

    // Without the file info there is no storage location to act on.
    if (fileInfoRecord == null) {
      log.warn("Could not retrieve file info for urn: {}, skipping cleanup", fileUrn);
      return;
    }

    final DataHubFileInfo fileInfo = new DataHubFileInfo(fileInfoRecord.data());
    final boolean canDeleteFromS3 = _s3Util != null && fileInfo.hasBucketStorageLocation();

    if (!canDeleteFromS3) {
      log.warn(
          "S3Util not configured or file has no storage location, skipping S3 deletion for file: {}",
          fileUrn);
    } else {
      final BucketStorageLocation storageLocation = fileInfo.getBucketStorageLocation();
      final String bucket = storageLocation.getStorageBucket();
      final String key = storageLocation.getStorageKey();

      try {
        _s3Util.deleteObject(bucket, key);
        log.info(
            "Successfully deleted file from S3: bucket={}, key={}, urn={}", bucket, key, fileUrn);
      } catch (Exception s3Failure) {
        // Deliberately swallowed: the entity is still soft-deleted below.
        log.error(
            "Failed to delete file from S3 for urn: {}. Will continue with soft-delete to avoid "
                + "leaving entity in inconsistent state. Manual S3 cleanup may be required.",
            fileUrn,
            s3Failure);
      }
    }

    // Mark the file entity as removed regardless of the S3 outcome.
    _entityService.ingestProposal(
        opContext, DeleteEntityUtils.buildSoftDeleteProposal(fileUrn), createAuditStamp(), true);
    log.info("Soft-deleted DataHub file entity: {}", fileUrn);

  } catch (Exception e) {
    log.error("Failed to process file deletion for urn: {}", fileUrn, e);
    throw new RuntimeException("Failed to delete file: " + fileUrn, e);
  }
}

@AllArgsConstructor
@Data
private static class DeleteEntityServiceError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.common.FormVerificationAssociation;
import com.linkedin.common.FormVerificationAssociationArray;
import com.linkedin.common.Forms;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataComplex;
import com.linkedin.data.DataList;
Expand All @@ -18,15 +19,18 @@
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.form.FormInfo;
import com.linkedin.form.FormPrompt;
import com.linkedin.form.FormPromptArray;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.utils.CriterionUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import java.util.List;
import java.util.ListIterator;
Expand Down Expand Up @@ -373,4 +377,41 @@ public static FormInfo removePromptsFromFormInfoAspect(

return updatedAspect.get();
}

/**
 * Builds the search filter that matches DataHub file entities whose {@code referencedByAsset}
 * field equals the URN of the entity being deleted.
 *
 * @param deletedUrn the URN of the entity being deleted
 * @return a filter with one OR clause containing a single EQUAL criterion
 */
public static Filter getFilterForFileDeletion(@Nonnull Urn deletedUrn) {
  final CriterionArray referencedByDeletedUrn =
      new CriterionArray(
          buildCriterion("referencedByAsset", Condition.EQUAL, deletedUrn.toString()));
  final ConjunctiveCriterion onlyClause =
      new ConjunctiveCriterion().setAnd(referencedByDeletedUrn);

  return new Filter().setOr(new ConjunctiveCriterionArray(onlyClause));
}

/**
 * Creates an UPSERT proposal for the status aspect of the given entity with {@code removed} set
 * to true, i.e. a soft delete.
 *
 * @param urn the URN of the entity to soft-delete; its entity type is taken from the URN itself
 * @return MetadataChangeProposal for soft deletion
 */
public static MetadataChangeProposal buildSoftDeleteProposal(@Nonnull Urn urn) {
  final Status removedStatus = new Status().setRemoved(true);

  return new MetadataChangeProposal()
      .setEntityUrn(urn)
      .setEntityType(urn.getEntityType())
      .setAspectName(Constants.STATUS_ASPECT_NAME)
      .setChangeType(ChangeType.UPSERT)
      .setAspect(GenericRecordUtils.serializeAspect(removedStatus));
}
}
Loading
Loading