86 changes: 70 additions & 16 deletions lib/api/apiUtils/object/abortMultipartUpload.js
@@ -3,8 +3,7 @@ const async = require('async');
const constants = require('../../../../constants');
const { data } = require('../../../data/wrapper');
const locationConstraintCheck = require('../object/locationConstraintCheck');
const { standardMetadataValidateBucketAndObj } =
require('../../../metadata/metadataUtils');
const metadataUtils = require('../../../metadata/metadataUtils');
const { validateQuotas } = require('../quotas/quotaUtils');
const services = require('../../../services');
const metadata = require('../../../metadata/wrapper');
@@ -30,7 +29,7 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,

async.waterfall([
function checkDestBucketVal(next) {
standardMetadataValidateBucketAndObj(metadataValParams, authzIdentityResult, log,
metadataUtils.standardMetadataValidateBucketAndObj(metadataValParams, authzIdentityResult, log,
(err, destinationBucket, objectMD) => {
if (err) {
log.error('error validating request', { error: err });
@@ -61,7 +60,7 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
});
},
function abortExternalMpu(mpuBucket, mpuOverviewObj, destBucket, objectMD,
next) {
next) {
const location = mpuOverviewObj.controllingLocationConstraint;
const originalIdentityAuthzResults = request.actionImplicitDenies;
// eslint-disable-next-line no-param-reassign
@@ -93,29 +92,82 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
skipDataDelete);
});
},
function deleteObjectMetadata(mpuBucket, storedParts, destBucket, objectMD, skipDataDelete, next) {
if (!objectMD || metadataValMPUparams.uploadId !== objectMD.uploadId) {
// During Abort, we dynamically detect whether the previous CompleteMPU
// call wrongly created object metadata, e.g. by creating an object
// version while some of the parts are missing.
// By passing a null objectMD, we tell the subsequent steps
// to skip the cleanup.
// Another approach, not supported by all backends (in particular not
// by Metadata), would be to honor the uploadId filter in
// standardMetadataValidateBucketAndObj, so that the returned objMD
// already has the matching uploadId.
function findObjectToCleanup(mpuBucket, storedParts, destBucket,
objectMD, skipDataDelete, next) {
if (!objectMD) {
return next(null, mpuBucket, storedParts, destBucket, null, skipDataDelete);
}

// If objectMD exists and has matching uploadId, use it directly
// This handles all non-versioned cases, and some versioned cases.
if (objectMD.uploadId === uploadId) {
return next(null, mpuBucket, storedParts, destBucket, objectMD, skipDataDelete);
}
// In case there has been an error during cleanup after a complete MPU
// (e.g. failure to delete MPU MD in shadow bucket),
// we need to ensure that the MPU metadata is deleted.

// If bucket is not versioned, no need to check versions:
// as the uploadId is not the same, we skip the cleanup.
if (!destBucket.isVersioningEnabled()) {
return next(null, mpuBucket, storedParts, destBucket, null, skipDataDelete);
}

// Otherwise, list all versions to find one with a matching uploadId.
return services.findObjectVersionByUploadId(bucketName, objectKey, uploadId, log, (err, foundVersion) => {
if (err) {
log.warn('error finding object version by uploadId, proceeding without cleanup', {
error: err,
method: 'abortMultipartUpload.findObjectToCleanup',
});
// On error, continue the abort without an objectMD to clean up.
return next(null, mpuBucket, storedParts, destBucket, null, skipDataDelete);
}
return next(null, mpuBucket, storedParts, destBucket, foundVersion, skipDataDelete);
});
},
function deleteObjectMetadata(mpuBucket, storedParts, destBucket, objectMD,
skipDataDelete, next) {
if (!objectMD) {
return next(null, mpuBucket, storedParts, destBucket, objectMD, skipDataDelete);
}

log.debug('Object has existing metadata, deleting them', {
method: 'abortMultipartUpload',
bucketName,
objectKey,
uploadId,
versionId: objectMD.versionId,
});
return metadata.deleteObjectMD(bucketName, objectKey, { versionId: objectMD.versionId }, log, err => {
return metadata.deleteObjectMD(bucketName, objectKey, {
versionId: objectMD.versionId,
}, log, err => {
if (err) {
log.error('error deleting object metadata', { error: err });
// Handle concurrent deletion of this object metadata
if (err.is?.NoSuchKey) {
log.debug('object metadata already deleted or does not exist', {
method: 'abortMultipartUpload',
bucketName,
objectKey,
versionId: objectMD.versionId,
});
} else {
log.error('error deleting object metadata', { error: err });
}
}
return next(err, mpuBucket, storedParts, destBucket, objectMD, skipDataDelete);
// Continue with the operation regardless of deletion success/failure
// The important part is that we tried to clean up
return next(null, mpuBucket, storedParts, destBucket, objectMD, skipDataDelete);
});
},
function deleteData(mpuBucket, storedParts, destBucket, objectMD,
skipDataDelete, next) {
skipDataDelete, next) {
if (skipDataDelete) {
return next(null, mpuBucket, storedParts, destBucket);
}
@@ -126,16 +178,18 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
return next(null, mpuBucket, storedParts, destBucket);
}

if (objectMD && objectMD.location && objectMD.uploadId === metadataValMPUparams.uploadId) {
// Add object data locations if they exist
if (objectMD?.location) {
const existingLocations = new Set(locations.map(loc => loc.key));
const remainingObjectLocations = objectMD.location.filter(loc => !existingLocations.has(loc.key));
const remainingObjectLocations = objectMD.location
    .filter(loc => !existingLocations.has(loc.key));
locations.push(...remainingObjectLocations);
}

return async.eachLimit(locations, 5, (loc, cb) => {
data.delete(loc, log, err => {
if (err) {
log.fatal('delete ObjectPart failed', { err });
log.warn('delete ObjectPart failed', { err });
}
cb();
});
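The waterfall above replaces the single uploadId guard in deleteObjectMetadata with a dedicated findObjectToCleanup step. Below is a minimal, self-contained sketch of that decision logic, not the PR's code: only the branching (missing objectMD, matching uploadId, unversioned bucket, version lookup) is taken from the diff, while chooseObjectToCleanup, lookupVersionByUploadId and the stubbed inputs are hypothetical names used for illustration.

// Sketch only: mirrors the branching of findObjectToCleanup with stubbed inputs.
// lookupVersionByUploadId stands in for services.findObjectVersionByUploadId.
function chooseObjectToCleanup(objectMD, uploadId, destBucket, lookupVersionByUploadId, cb) {
    // 1. No metadata stored for this key: nothing to clean up.
    if (!objectMD) {
        return cb(null, null);
    }
    // 2. The current metadata was created by this very upload: clean it up.
    if (objectMD.uploadId === uploadId) {
        return cb(null, objectMD);
    }
    // 3. Unversioned bucket with a different uploadId: another upload owns
    //    the key, so leave it alone.
    if (!destBucket.isVersioningEnabled()) {
        return cb(null, null);
    }
    // 4. Versioned bucket: the orphaned metadata may live on a non-current
    //    version, so search the versions; on lookup error, skip the cleanup.
    return lookupVersionByUploadId(uploadId, (err, foundVersion) => {
        if (err) {
            return cb(null, null);
        }
        return cb(null, foundVersion || null);
    });
}

// Example run with stubbed data (hypothetical values):
const stubBucket = { isVersioningEnabled: () => true };
const stubLookup = (id, cb) => cb(null, { versionId: 'v2', uploadId: id });
chooseObjectToCleanup({ uploadId: 'other', versionId: 'v1' }, 'abc123',
    stubBucket, stubLookup, (err, target) => {
        console.log(target); // -> { versionId: 'v2', uploadId: 'abc123' }
    });

Whatever this step selects (possibly null) is what the later deleteObjectMetadata and deleteData steps receive; the real code additionally threads mpuBucket, storedParts and skipDataDelete through the waterfall.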
66 changes: 66 additions & 0 deletions lib/services.js
@@ -6,6 +6,7 @@ const { errors, s3middleware } = require('arsenal');
const ObjectMD = require('arsenal').models.ObjectMD;
const BucketInfo = require('arsenal').models.BucketInfo;
const ObjectMDArchive = require('arsenal').models.ObjectMDArchive;
const { versioning } = require('arsenal');
const acl = require('./metadata/acl');
const constants = require('../constants');
const { config } = require('./Config');
@@ -418,6 +419,71 @@ const services = {
});
},

/**
* Finds a specific object version by its uploadId by listing and filtering all versions.
* This is used during MPU abort to clean up potentially orphaned object metadata.
* @param {string} bucketName - The name of the bucket.
* @param {string} objectKey - The key of the object.
* @param {string} uploadId - The uploadId to search for.
* @param {object} log - The logger instance.
* @param {function} cb - The callback, called with (err, foundVersion).
* @returns {undefined}
*/
findObjectVersionByUploadId(bucketName, objectKey, uploadId, log, cb) {
let keyMarker = null;
let versionIdMarker = null;
let foundVersion = null;
let shouldContinue = true;

return async.whilst(
() => shouldContinue && !foundVersion,
callback => {
const listParams = {
listingType: 'DelimiterVersions',
// To only list the specific key, we need to add the versionId separator
prefix: `${objectKey}${versioning.VersioningConstants.VersionId.Separator}`,
maxKeys: 1000,
};

if (keyMarker) {
listParams.keyMarker = keyMarker;
}
if (versionIdMarker) {
listParams.versionIdMarker = versionIdMarker;
}

return this.getObjectListing(bucketName, listParams, log, (err, listResponse) => {
if (err) {
log.error('error listing object versions', { error: err });
return callback(err);
}

// Check each version in current batch for matching uploadId
const matchedVersion = (listResponse.Versions || []).find(version =>
version.key === objectKey &&
version.value &&
version.value.uploadId === uploadId
);

if (matchedVersion) {
foundVersion = matchedVersion.value;
}

// Set up for next iteration if needed
if (listResponse.IsTruncated && !foundVersion) {
keyMarker = listResponse.NextKeyMarker;
versionIdMarker = listResponse.NextVersionIdMarker;
} else {
shouldContinue = false;
}

return callback();
});
},
err => cb(err, err ? null : foundVersion)
);
},

/**
* Gets list of objects ready to be lifecycled
* @param {object} bucketName - bucket in which objectMetadata is stored
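For context, here is a self-contained sketch of the pagination pattern findObjectVersionByUploadId relies on: walk DelimiterVersions pages with keyMarker/versionIdMarker until a version whose metadata carries the wanted uploadId turns up. listVersionsPage below is a stub standing in for the real getObjectListing call; the IsTruncated, NextKeyMarker and NextVersionIdMarker field names follow the diff, everything else is assumed for illustration.

// Sketch only: paginated search for the version created by a given uploadId.
function findVersionByUploadId(listVersionsPage, objectKey, uploadId, cb) {
    const walk = (keyMarker, versionIdMarker) =>
        listVersionsPage({ keyMarker, versionIdMarker, maxKeys: 1000 }, (err, page) => {
            if (err) {
                return cb(err);
            }
            const match = (page.Versions || []).find(v =>
                v.key === objectKey && v.value && v.value.uploadId === uploadId);
            if (match) {
                return cb(null, match.value);
            }
            if (page.IsTruncated) {
                // Resume listing where this page stopped.
                return walk(page.NextKeyMarker, page.NextVersionIdMarker);
            }
            return cb(null, null); // all versions seen, nothing matched
        });
    walk(null, null);
}

// Example with two canned pages (hypothetical data):
const pages = [
    { Versions: [{ key: 'doc', value: { uploadId: 'old' } }],
      IsTruncated: true, NextKeyMarker: 'doc', NextVersionIdMarker: 'v1' },
    { Versions: [{ key: 'doc', value: { uploadId: 'abc123', versionId: 'v2' } }],
      IsTruncated: false },
];
const listVersionsPage = (params, done) => done(null, pages.shift());
findVersionByUploadId(listVersionsPage, 'doc', 'abc123', (err, found) => {
    console.log(found); // -> { uploadId: 'abc123', versionId: 'v2' }
});

The real implementation drives the same loop with async.whilst and scopes the listing to a single key by prefixing it with the versionId separator, which keeps each page from pulling in unrelated keys.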