feat(backend): S3 garbage collection #6543
base: main
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,10 +1,15 @@ | ||||||
| package org.loculus.backend.service.files | ||||||
|
|
||||||
| import kotlinx.datetime.LocalDateTime | ||||||
| import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq | ||||||
| import org.jetbrains.exposed.sql.and | ||||||
| import org.jetbrains.exposed.sql.deleteWhere | ||||||
| import org.jetbrains.exposed.sql.insert | ||||||
| import org.jetbrains.exposed.sql.kotlin.datetime.KotlinLocalDateTimeColumnType | ||||||
| import org.jetbrains.exposed.sql.not | ||||||
| import org.jetbrains.exposed.sql.statements.StatementType | ||||||
| import org.jetbrains.exposed.sql.transactions.transaction | ||||||
| import org.jetbrains.exposed.sql.update | ||||||
| import org.loculus.backend.utils.DatabaseConstants | ||||||
| import org.loculus.backend.utils.DateProvider | ||||||
| import org.loculus.backend.utils.chunkedForDatabase | ||||||
| import org.springframework.stereotype.Service | ||||||
|
|
@@ -26,6 +31,10 @@ class FilesDatabaseService(private val dateProvider: DateProvider) { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| fun deleteFileEntry(fileId: UUID) { | ||||||
| FilesTable.deleteWhere { FilesTable.idColumn eq fileId } | ||||||
| } | ||||||
|
|
||||||
| fun getGroupIds(fileIds: Set<FileId>): Map<FileId, Int> = fileIds.chunkedForDatabase({ chunk -> | ||||||
| FilesTable.select(FilesTable.idColumn, FilesTable.groupIdColumn) | ||||||
| .where { FilesTable.idColumn inList chunk } | ||||||
|
|
@@ -57,6 +66,48 @@ class FilesDatabaseService(private val dateProvider: DateProvider) { | |||||
| chunk.filterNot { it in existingIds } | ||||||
| }, 1).toSet() | ||||||
|
|
||||||
| fun getOrphanedFileIds(threshold: LocalDateTime): Set<FileId> { | ||||||
|
Contributor
We ran this on staging to check performance (we added one entry to the files table so the query actually ran). Scanning every entry is definitely not the greatest, so we might need to refactor in the future, but this should work for now :-)
Contributor
@corneliusroemer and I also discussed one potential race condition. If a user uploads a file, waits for about a month (or however long the grace period is), and then submits data with the file at the exact moment this transaction runs, the fileId will end up in the list of files to delete but the submission will not be rejected.
Contributor
We don't foresee this happening, but it might be good to somehow flag files as "marked for deletion" after running this job, stop submissions that use flagged files, and then rerun this transaction before deleting. |
||||||
| val sql = """ | ||||||
| -- check for files for which an upload was requested > threshold days ago | ||||||
|
Contributor
Note: we're reading through the whole database to find referenced files. If this becomes a performance problem or is otherwise problematic, we might want to consider normalizing files with proper foreign keys etc. to allow the full power of SQL. |
||||||
| -- but are not referenced by a submission. For this, check the submitted_data | ||||||
| -- and processed_data jsonb objects (but not archive_of_submitted_data) | ||||||
|
anna-parker marked this conversation as resolved.
Contributor
Let's check that the file is also not in the archive, for consistency.
Contributor
Author
Alright, I'll make it scan the archive as well, although I expect it will make the query take even longer, unfortunately. The more I think about it, the more I become convinced we should take files out of the jsonb and add join tables from |
||||||
| WITH referenced AS ( | ||||||
|
anna-parker marked this conversation as resolved.
|
||||||
| -- fetch ids for files uploaded by users and referenced in submissions | ||||||
| SELECT (fil->>'fileId')::uuid AS file_id | ||||||
| FROM sequence_entries, | ||||||
| LATERAL jsonb_each(COALESCE(NULLIF(submitted_data->'files', 'null'::jsonb),'{}'::jsonb)) AS cat(k,v), | ||||||
| LATERAL jsonb_array_elements(cat.v) AS fil | ||||||
| UNION | ||||||
| -- also need to check processed_data since preprocessing | ||||||
| -- can create files that are never referenced in submissions | ||||||
| SELECT (fil->>'fileId')::uuid AS file_id | ||||||
| FROM sequence_entries_preprocessed_data sepd | ||||||
| JOIN sequence_entries se | ||||||
| ON se.accession = sepd.accession | ||||||
| AND se.version = sepd.version, | ||||||
| LATERAL jsonb_each(COALESCE(NULLIF(sepd.processed_data->'files', 'null'::jsonb),'{}'::jsonb)) AS cat(k,v), | ||||||
| LATERAL jsonb_array_elements(cat.v) AS fil | ||||||
| ) | ||||||
| SELECT f.id FROM files f | ||||||
| LEFT JOIN referenced r ON r.file_id = f.id | ||||||
| WHERE r.file_id IS NULL | ||||||
| AND f.upload_requested_at < ?; | ||||||
| """.trimIndent() | ||||||
| return transaction { | ||||||
| exec( | ||||||
| sql, | ||||||
| listOf(KotlinLocalDateTimeColumnType() to threshold), | ||||||
|
Contributor
hmm...
Suggested change
|
||||||
| explicitStatementType = StatementType.SELECT, | ||||||
| ) { rs -> | ||||||
| buildSet<FileId> { | ||||||
| while (rs.next()) { | ||||||
| add(rs.getObject("id", UUID::class.java)) | ||||||
| } | ||||||
| } | ||||||
| } ?: emptySet() | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Return the subset of file IDs for which the file size hasn't been checked yet or | ||||||
| * no file has been uploaded yet (and therefore there's no file size). | ||||||
|
|
||||||
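The "marked for deletion" idea raised in the review thread on `getOrphanedFileIds` can be sketched as a two-phase mark and sweep. This is only an in-memory illustration under stated assumptions: `InMemoryFileStore`, `markOrphans`, `submitReferencing`, and `sweep` are hypothetical names that do not exist in this PR, and a real version would keep the flag in the `files` table and run each phase in its own transaction.

```kotlin
import java.util.UUID

// Hypothetical in-memory stand-in for the files table and its references.
// None of these names come from the PR.
class InMemoryFileStore {
    private val files = mutableSetOf<UUID>()
    private val referenced = mutableSetOf<UUID>()
    private val markedForDeletion = mutableSetOf<UUID>()

    fun addFile(id: UUID) {
        files += id
    }

    // Phase 1: flag every file that is currently unreferenced.
    fun markOrphans(): Set<UUID> {
        val orphans = files - referenced
        markedForDeletion += orphans
        return orphans
    }

    // A submission that references a flagged file is rejected,
    // so it cannot race with the sweep.
    fun submitReferencing(id: UUID): Boolean {
        if (id in markedForDeletion) return false
        referenced += id
        return true
    }

    // Phase 2: re-check references and delete only files that are
    // still flagged and still unreferenced; all flags are then cleared.
    fun sweep(): Set<UUID> {
        val toDelete = markedForDeletion - referenced
        files -= toDelete
        markedForDeletion.clear()
        return toDelete
    }
}
```

The re-check in `sweep` is what covers a reference that lands between the orphan query and the flag being written; whether the extra pass is worth its cost is a judgment call for the maintainers.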
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest | |
| import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload | ||
| import software.amazon.awssdk.services.s3.model.CompletedPart | ||
| import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest | ||
| import software.amazon.awssdk.services.s3.model.DeleteObjectRequest | ||
| import software.amazon.awssdk.services.s3.model.GetObjectRequest | ||
| import software.amazon.awssdk.services.s3.model.HeadObjectRequest | ||
| import software.amazon.awssdk.services.s3.model.PutObjectRequest | ||
|
|
@@ -176,6 +177,18 @@ class S3Service(private val s3Config: S3Config) { | |
| Unit | ||
| } | ||
|
|
||
| fun deleteFile(fileId: FileId) = s3ErrorMapping { | ||
| val config = getS3BucketConfig() | ||
| s3Client.deleteObject( | ||
| DeleteObjectRequest | ||
| .builder() | ||
| .bucket(config.bucket) | ||
| .key(getFileIdPath(fileId)) | ||
| .build(), | ||
| ) | ||
| Unit | ||
| } | ||
|
|
||
| private fun assertIsEnabled() { | ||
| if (!s3Config.enabled) { | ||
| throw IllegalStateException("S3 is not enabled") | ||
|
|
@@ -258,6 +271,11 @@ fun <T> s3ErrorMapping(block: () -> T): T { | |
| "by part number.", | ||
| ) | ||
|
|
||
| "NoSuchKey" -> UnprocessableEntityException( | ||
|
Contributor
This reminds me that there are no tests for non-happy-path cases, such as trying to delete something that doesn't exist. |
||
| "NoSuchKey: The referenced file does not exist in storage. Uploaded files that are not " + | ||
| "referenced by a submission for too long are cleaned up automatically.", | ||
| ) | ||
|
|
||
| else -> RuntimeException("Unexpected S3 error: ${e.awsErrorDetails().errorCode()}") | ||
| } | ||
| } | ||
|
|
||
|
Contributor
Any reason this is under |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| package org.loculus.backend.service.submission | ||
|
|
||
| import kotlinx.datetime.DateTimeUnit | ||
| import kotlinx.datetime.minus | ||
| import kotlinx.datetime.toLocalDateTime | ||
| import org.loculus.backend.config.BackendSpringProperty | ||
| import org.loculus.backend.log.AuditLogger | ||
| import org.loculus.backend.service.files.FilesDatabaseService | ||
| import org.loculus.backend.service.files.S3Service | ||
| import org.loculus.backend.utils.DateProvider | ||
| import org.springframework.beans.factory.annotation.Value | ||
| import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty | ||
| import org.springframework.scheduling.annotation.Scheduled | ||
| import org.springframework.stereotype.Component | ||
| import java.util.UUID | ||
| import java.util.concurrent.TimeUnit | ||
| import kotlin.math.max | ||
|
|
||
| private val log = mu.KotlinLogging.logger {} | ||
|
|
||
| @Component | ||
| @ConditionalOnProperty("loculus.s3.enabled", havingValue = "true") | ||
| class S3GarbageCollectionTask( | ||
| private val filesDatabaseService: FilesDatabaseService, | ||
| private val s3Service: S3Service, | ||
| private val dateProvider: DateProvider, | ||
| private val auditLogger: AuditLogger, | ||
| @Value("\${${BackendSpringProperty.S3_GC_GRACE_PERIOD_MINUTES}}") private val gracePeriod: Int, | ||
| @Value("\${${BackendSpringProperty.S3_GC_ENABLED}}") private val deleteOrphans: Boolean, | ||
| ) { | ||
|
|
||
| /** | ||
| * Deletes S3 objects older than `loculus.s3.gc-grace-period-minutes` that are | ||
| * not referenced in submitted_data or processed_data | ||
| */ | ||
| @Scheduled( | ||
| initialDelayString = "\${${BackendSpringProperty.S3_GC_INITIAL_DELAY_MINUTES}}", | ||
| fixedDelay = 60 * 24, | ||
| timeUnit = TimeUnit.MINUTES, | ||
| ) | ||
| fun task() { | ||
| // `gracePeriod` must be at least 1 or files produced by preprocessing may be | ||
| // garbage collected before they're attached to sequence entries | ||
| val gracePeriod = max(gracePeriod, 1) | ||
| log.info { | ||
| "Running S3 garbage collection task to clean up orphan files at least $gracePeriod " + | ||
| "minutes old (garbageCollectionEnabled = $deleteOrphans)" | ||
| } | ||
|
|
||
| val threshold = dateProvider.getCurrentInstant() | ||
| .minus(gracePeriod, DateTimeUnit.MINUTE, DateProvider.timeZone) | ||
| .toLocalDateTime(DateProvider.timeZone) | ||
|
Comment on lines +50 to +52
Contributor
Isn't this the same function
Contributor
Almost: Can probably be unified
Contributor
We might be using DateProvider for a reason, so maybe it shouldn't be replaced by Clock.System.now() in tests. |
||
| val orphans = filesDatabaseService.getOrphanedFileIds(threshold) | ||
|
Contributor
I'm not 100% certain, but this whole task looks like it has no transaction isolation at all. Considering the 2 backends racing, this is probably not a good idea. |
||
|
|
||
| if (!deleteOrphans) { | ||
| log.info { "S3 garbage collection task would have deleted ${orphans.size} files: $orphans" } | ||
| return | ||
| } | ||
|
|
||
| var deleteFailures = 0 | ||
| orphans.forEach { fileId: UUID -> | ||
| try { | ||
| s3Service.deleteFile(fileId) | ||
| filesDatabaseService.deleteFileEntry(fileId) | ||
|
Comment on lines +63 to +64
Contributor
Transactionally, this might be some of the most critical code here. Should we really wrap it in a single try/catch? There is no retry and no recovery if deletion fails at step 1, or at step 2. Off the top of my head, for example, we might want to handle an S3 deletion failure by setting a flag in our DB. Vice versa, if S3 deletion succeeds but DB deletion then fails, what do we do? Might future deletion attempts fail every time because the file is no longer there, leaving us in an unrecoverable state?
Contributor
An actual failure mode that can happen: we have 2 backends in production, and they might happen to run at the same time, racing to delete. We might want to add a lock for this, maybe a table that tracks the last cleanup time and whether a cleanup is in progress, so we don't clean up twice at the same time.
Contributor
Would be good to check this out... ChatGPT tells me the S3 deletion should not raise an error if the file is missing, which means we would still try to remove the file from the DB: But then I'm not sure why we have code to handle
Contributor
Update:
Contributor
Author
Yes, trying to delete a deleted file should be fine as far as I'm aware. The |
||
| } catch (e: Exception) { | ||
| log.warn("Failed to delete $fileId", e) | ||
|
Contributor
Thinking about it, at the very least we definitely want different error messages for failing to delete a file from S3 and from the Postgres DB. |
||
| deleteFailures++ | ||
| } | ||
|
maverbiest marked this conversation as resolved.
|
||
| } | ||
|
|
||
| if (orphans.isNotEmpty()) { | ||
| log.info { | ||
| "S3 garbage collection task deleted ${orphans.size - deleteFailures} orphan(s) not referenced by a " + | ||
| "submission after $gracePeriod minutes" | ||
| } | ||
| auditLogger | ||
| .log( | ||
| "CLEANUP", | ||
| "S3 garbage collection task deleted ${orphans.size - deleteFailures} orphan(s) " + | ||
| "not referenced by a submission after $gracePeriod minutes", | ||
| ) | ||
|
|
||
| if (deleteFailures > 0) { | ||
| log.warn { | ||
| "S3 garbage collection task unsuccessfully attempted to delete $deleteFailures orphan file(s)" | ||
| } | ||
| } | ||
| } else { | ||
| log.info { "S3 garbage collection task identified no orphan files on S3" } | ||
| } | ||
| } | ||
| } | ||
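One way to act on the review comments about the single try/catch is to handle the two deletion steps separately and report which one failed. The following is a minimal sketch, not the PR's implementation: `ObjectStore`, `FileIndex`, `DeleteOutcome`, `deleteOrphan`, and `describe` are hypothetical names standing in for `S3Service`, `FilesDatabaseService`, and the logging in `task()`. It assumes, as suggested in the thread, that deleting an already deleted S3 object does not raise an error, so a leftover database row can simply be retried on the next run.

```kotlin
import java.util.UUID

// Hypothetical abstractions standing in for S3Service and FilesDatabaseService.
fun interface ObjectStore {
    fun delete(id: UUID)
}

fun interface FileIndex {
    fun deleteEntry(id: UUID)
}

sealed interface DeleteOutcome {
    object Deleted : DeleteOutcome
    data class StorageFailed(val cause: Exception) : DeleteOutcome
    data class DatabaseFailed(val cause: Exception) : DeleteOutcome
}

fun deleteOrphan(id: UUID, store: ObjectStore, index: FileIndex): DeleteOutcome {
    try {
        store.delete(id)
    } catch (e: Exception) {
        // Nothing was removed: the database row stays, so the next run retries.
        return DeleteOutcome.StorageFailed(e)
    }
    return try {
        index.deleteEntry(id)
        DeleteOutcome.Deleted
    } catch (e: Exception) {
        // The object is gone but the row remains; a later run retries both
        // steps, relying on the assumption that re-deleting the object is harmless.
        DeleteOutcome.DatabaseFailed(e)
    }
}

// Distinct messages for the two failure modes.
fun describe(id: UUID, outcome: DeleteOutcome): String = when (outcome) {
    DeleteOutcome.Deleted -> "Deleted $id"
    is DeleteOutcome.StorageFailed ->
        "Failed to delete $id from S3: ${outcome.cause.message}"
    is DeleteOutcome.DatabaseFailed ->
        "Deleted $id from S3 but failed to delete its database entry: ${outcome.cause.message}"
}
```

Returning an outcome instead of only incrementing a counter would also let the task count S3 and database failures separately. It does not address the concern about two backends running the task concurrently, which would still need a lock or similar coordination.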
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| CREATE INDEX IF NOT EXISTS files_upload_requested_at_idx ON files (upload_requested_at); | ||
|
Contributor
The real index we'd need is one on the fileIds referenced by se and sepd. This one doesn't do any harm but is not very helpful either.
Contributor
We do select based on upload_requested_at, so it should help some :-) |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| package org.loculus.backend.service | ||
|
|
||
| import kotlinx.datetime.DateTimeUnit | ||
| import kotlinx.datetime.LocalDateTime | ||
| import kotlinx.datetime.minus | ||
| import kotlinx.datetime.toLocalDateTime | ||
| import org.jetbrains.exposed.sql.insert | ||
| import org.jetbrains.exposed.sql.transactions.transaction | ||
| import org.loculus.backend.service.files.FilesTable | ||
| import org.loculus.backend.utils.DateProvider | ||
| import java.util.UUID | ||
| import kotlin.time.Clock | ||
|
|
||
| fun insertFile(id: UUID, groupId: Int, requestedAt: LocalDateTime, uploader: String = "testuser") = transaction { | ||
| FilesTable.insert { | ||
| it[idColumn] = id | ||
| it[uploadRequestedAtColumn] = requestedAt | ||
| it[uploaderColumn] = uploader | ||
| it[groupIdColumn] = groupId | ||
| it[multipartCompleted] = true | ||
| } | ||
| } | ||
|
|
||
| fun daysAgo(days: Long): LocalDateTime = Clock.System.now() | ||
| .minus(days, DateTimeUnit.DAY, DateProvider.timeZone) | ||
| .toLocalDateTime(DateProvider.timeZone) |