diff --git a/backend/build.gradle b/backend/build.gradle index 5e4734bd06..6399791398 100644 --- a/backend/build.gradle +++ b/backend/build.gradle @@ -40,6 +40,7 @@ repositories { dependencies { implementation "org.springframework.boot:spring-boot-starter-web" + implementation "org.springframework.boot:spring-boot-starter-aop" implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation "com.fasterxml.jackson.module:jackson-module-kotlin" implementation "org.jetbrains.kotlin:kotlin-reflect" diff --git a/backend/gradle.lockfile b/backend/gradle.lockfile index 8837136876..1eedf1cb04 100644 --- a/backend/gradle.lockfile +++ b/backend/gradle.lockfile @@ -135,6 +135,7 @@ org.apache.tomcat.embed:tomcat-embed-core:10.1.48=compileClasspath,productionRun org.apache.tomcat.embed:tomcat-embed-el:10.1.48=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.apache.tomcat.embed:tomcat-embed-websocket:10.1.48=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath +org.aspectj:aspectjweaver:1.9.24=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.assertj:assertj-core:3.27.6=testCompileClasspath,testRuntimeClasspath org.awaitility:awaitility:4.3.0=testCompileClasspath,testRuntimeClasspath org.bouncycastle:bcpg-jdk18on:1.80=kotlinBouncyCastleConfiguration @@ -254,6 +255,7 @@ org.springframework.boot:spring-boot-actuator-autoconfigure:3.5.7=compileClasspa org.springframework.boot:spring-boot-actuator:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.springframework.boot:spring-boot-autoconfigure:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.springframework.boot:spring-boot-starter-actuator:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework.boot:spring-boot-starter-aop:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.springframework.boot:spring-boot-starter-jdbc:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.springframework.boot:spring-boot-starter-json:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.springframework.boot:spring-boot-starter-logging:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath diff --git a/backend/src/main/kotlin/org/loculus/backend/service/scheduler/TaskLock.kt b/backend/src/main/kotlin/org/loculus/backend/service/scheduler/TaskLock.kt new file mode 100644 index 0000000000..c62bea31e3 --- /dev/null +++ b/backend/src/main/kotlin/org/loculus/backend/service/scheduler/TaskLock.kt @@ -0,0 +1,7 @@ +package org.loculus.backend.service.scheduler + +import java.util.concurrent.TimeUnit + +@Target(AnnotationTarget.FUNCTION) +@Retention(AnnotationRetention.RUNTIME) +annotation class TaskLock(val name: String, val intervalString: String, val timeUnit: TimeUnit = TimeUnit.SECONDS) diff --git a/backend/src/main/kotlin/org/loculus/backend/service/scheduler/TaskLockAspect.kt b/backend/src/main/kotlin/org/loculus/backend/service/scheduler/TaskLockAspect.kt new file mode 100644 index 0000000000..8e2287ac94 --- /dev/null +++ b/backend/src/main/kotlin/org/loculus/backend/service/scheduler/TaskLockAspect.kt @@ -0,0 +1,40 @@ +package org.loculus.backend.service.scheduler + +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.Around +import org.aspectj.lang.annotation.Aspect +import org.springframework.context.EmbeddedValueResolverAware +import org.springframework.stereotype.Component +import org.springframework.util.StringValueResolver + +@Aspect +@Component +class TaskLockAspect(private val taskLockService: TaskLockService) : EmbeddedValueResolverAware { + private lateinit var embeddedValueResolver: StringValueResolver + + override fun setEmbeddedValueResolver(resolver: StringValueResolver) { + embeddedValueResolver = resolver + } + + @Around(value = "@annotation(taskLock)", argNames = "joinPoint,taskLock") + fun lockTask(joinPoint: ProceedingJoinPoint, taskLock: TaskLock): Any? { + val intervalSeconds = taskLock.timeUnit.toSeconds(resolveInterval(taskLock)) + if (!taskLockService.acquireLock(taskLock.name, frequencyIntervalSeconds = intervalSeconds)) return null + + try { + return joinPoint.proceed() + } finally { + taskLockService.releaseLock(taskLock.name, frequencyIntervalSeconds = intervalSeconds) + } + } + + private fun resolveInterval(taskLock: TaskLock): Long { + val resolvedInterval = embeddedValueResolver.resolveStringValue(taskLock.intervalString) + ?: throw IllegalArgumentException("Could not resolve lock interval for task '${taskLock.name}'") + + return resolvedInterval.toLongOrNull() + ?: throw IllegalArgumentException( + "Lock interval for task '${taskLock.name}' must resolve to a whole number, but was '$resolvedInterval'", + ) + } +} diff --git a/backend/src/main/kotlin/org/loculus/backend/service/seqsetcitations/SeqSetCrossRefCitationsTask.kt b/backend/src/main/kotlin/org/loculus/backend/service/seqsetcitations/SeqSetCrossRefCitationsTask.kt index 747989359d..3f0bd21e62 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/seqsetcitations/SeqSetCrossRefCitationsTask.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/seqsetcitations/SeqSetCrossRefCitationsTask.kt @@ -4,8 +4,7 @@ import org.loculus.backend.api.SeqSetCitationSource import org.loculus.backend.config.BackendSpringProperty import org.loculus.backend.config.ENABLE_SEQSETS_TRUE_VALUE import org.loculus.backend.service.crossref.CrossRefService -import org.loculus.backend.service.scheduler.TaskLockService -import org.springframework.beans.factory.annotation.Value +import org.loculus.backend.service.scheduler.TaskLock import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component @@ -39,9 +38,6 @@ internal fun mergeCitationSources(citationSources: List): class SeqSetCrossRefCitationsTask( private val crossRefService: CrossRefService, private val seqSetCitationsDatabaseService: SeqSetCitationsDatabaseService, - private val taskLockService: TaskLockService, - @Value("\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}") - private val runEveryMinutes: Long, ) { /** * Runs every six hours, with an initial delay of one minute. @@ -56,59 +52,50 @@ class SeqSetCrossRefCitationsTask( fixedRateString = "\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}", timeUnit = java.util.concurrent.TimeUnit.MINUTES, ) + @TaskLock( + name = SEQ_SET_CITATIONS_TASK_NAME, + intervalString = "\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}", + timeUnit = TimeUnit.MINUTES, + ) fun task() { - if (!taskLockService.acquireLock( - SEQ_SET_CITATIONS_TASK_NAME, - frequencyIntervalSeconds = TimeUnit.MINUTES.toSeconds(runEveryMinutes), - ) - ) { + log.info { "Updating SeqSet CrossRef citations..." } + if (!crossRefService.isActive) { + log.info { "CrossRef service is not active, skipping SeqSet citation update." } return } - log.info { "Updating SeqSet CrossRef citations..." } - try { - if (!crossRefService.isActive) { - log.info { "CrossRef service is not active, skipping SeqSet citation update." } - return - } - val doiPrefix = crossRefService.doiPrefix - if (doiPrefix.isNullOrBlank()) { - log.info { "CrossRef service has no DOI prefix, skipping SeqSet citation update." } - return - } + val doiPrefix = crossRefService.doiPrefix + if (doiPrefix.isNullOrBlank()) { + log.info { "CrossRef service has no DOI prefix, skipping SeqSet citation update." } + return + } - log.info { "Fetching CrossRef citations for DOI prefix: $doiPrefix" } - val citedByResult = crossRefService.getCrossRefCitedBy(doiPrefix) - if (citedByResult.validationErrors.isNotEmpty()) { + log.info { "Fetching CrossRef citations for DOI prefix: $doiPrefix" } + val citedByResult = crossRefService.getCrossRefCitedBy(doiPrefix) + if (citedByResult.validationErrors.isNotEmpty()) { + log.warn { + "Skipped ${citedByResult.validationErrors.size} CrossRef citation(s) due to validation errors." + } + citedByResult.validationErrors.forEach { error -> log.warn { - "Skipped ${citedByResult.validationErrors.size} CrossRef citation(s) due to validation errors." - } - citedByResult.validationErrors.forEach { error -> - log.warn { - "Validation error: ${error.reason}" - } + "Validation error: ${error.reason}" } } - val citationSources = mergeCitationSources(citedByResult.sources) - val seqSetDOIs = citationSources.flatMap { it.seqSetDOIs }.toSet() - log.info { - "Fetched ${citationSources.size} citation source(s) from CrossRef covering ${seqSetDOIs.size} SeqSet DOI(s)." - } - if (citationSources.isEmpty()) return + } + val citationSources = mergeCitationSources(citedByResult.sources) + val seqSetDOIs = citationSources.flatMap { it.seqSetDOIs }.toSet() + log.info { + "Fetched ${citationSources.size} citation source(s) from CrossRef covering ${seqSetDOIs.size} SeqSet DOI(s)." + } + if (citationSources.isEmpty()) return - val updateResult = seqSetCitationsDatabaseService.updateCitationSourcesFromCrossRef(citationSources) - if (updateResult.updatedCitationSourceDOIs.isNotEmpty()) { - log.info { "Updated ${updateResult.updatedCitationSourceDOIs.size} citation source(s)." } - } - val skippedCitationSources = citationSources.size - updateResult.updatedCitationSourceDOIs.size - if (skippedCitationSources > 0) { - log.warn { "Skipped $skippedCitationSources citation source(s) with no matching SeqSet." } - } - } finally { - taskLockService.releaseLock( - SEQ_SET_CITATIONS_TASK_NAME, - frequencyIntervalSeconds = TimeUnit.MINUTES.toSeconds(runEveryMinutes), - ) + val updateResult = seqSetCitationsDatabaseService.updateCitationSourcesFromCrossRef(citationSources) + if (updateResult.updatedCitationSourceDOIs.isNotEmpty()) { + log.info { "Updated ${updateResult.updatedCitationSourceDOIs.size} citation source(s)." } + } + val skippedCitationSources = citationSources.size - updateResult.updatedCitationSourceDOIs.size + if (skippedCitationSources > 0) { + log.warn { "Skipped $skippedCitationSources citation source(s) with no matching SeqSet." } } } } diff --git a/backend/src/main/kotlin/org/loculus/backend/service/submission/CleanUpStaleSequencesInProcessingTask.kt b/backend/src/main/kotlin/org/loculus/backend/service/submission/CleanUpStaleSequencesInProcessingTask.kt index 7c62b28a39..43e8474eb5 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/submission/CleanUpStaleSequencesInProcessingTask.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/submission/CleanUpStaleSequencesInProcessingTask.kt @@ -1,7 +1,7 @@ package org.loculus.backend.service.submission import org.loculus.backend.config.BackendSpringProperty -import org.loculus.backend.service.scheduler.TaskLockService +import org.loculus.backend.service.scheduler.TaskLock import org.springframework.beans.factory.annotation.Value import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component @@ -14,27 +14,15 @@ const val CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME = "clean-up-stale-seq @Component class CleanUpStaleSequencesInProcessingTask( private val submissionDatabaseService: SubmissionDatabaseService, - private val taskLockService: TaskLockService, @Value("\${${BackendSpringProperty.STALE_AFTER_SECONDS}}") private val timeToStaleInSeconds: Long, - @Value("\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}") private val runEverySeconds: Long, ) { @Scheduled(fixedRateString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}", timeUnit = TimeUnit.SECONDS) + @TaskLock( + name = CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME, + intervalString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}", + ) fun task() { - if (!taskLockService.acquireLock( - CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME, - frequencyIntervalSeconds = runEverySeconds, - ) - ) { - return - } - try { - log.info { "Cleaning up stale sequences in processing, timeToStaleInSeconds: $timeToStaleInSeconds" } - submissionDatabaseService.cleanUpStaleSequencesInProcessing(timeToStaleInSeconds) - } finally { - taskLockService.releaseLock( - CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME, - frequencyIntervalSeconds = runEverySeconds, - ) - } + log.info { "Cleaning up stale sequences in processing, timeToStaleInSeconds: $timeToStaleInSeconds" } + submissionDatabaseService.cleanUpStaleSequencesInProcessing(timeToStaleInSeconds) } } diff --git a/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt b/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt index b53b795cfe..d3953ba565 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt @@ -1,8 +1,7 @@ package org.loculus.backend.service.submission import org.loculus.backend.config.BackendSpringProperty -import org.loculus.backend.service.scheduler.TaskLockService -import org.springframework.beans.factory.annotation.Value +import org.loculus.backend.service.scheduler.TaskLock import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component import java.util.concurrent.TimeUnit @@ -12,13 +11,7 @@ private val log = mu.KotlinLogging.logger {} const val USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME = "use-newer-processing-pipeline-version" @Component -class UseNewerProcessingPipelineVersionTask( - private val submissionDatabaseService: SubmissionDatabaseService, - private val taskLockService: TaskLockService, - @Value( - "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}", - ) private val lockIntervalSeconds: Long, -) { +class UseNewerProcessingPipelineVersionTask(private val submissionDatabaseService: SubmissionDatabaseService) { // Initial delay to avoid hammering the database on backend startup @Scheduled( @@ -28,35 +21,25 @@ class UseNewerProcessingPipelineVersionTask( fixedDelayString = "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}", timeUnit = TimeUnit.SECONDS, ) + @TaskLock( + name = USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME, + intervalString = "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}", + ) fun task() { - if (!taskLockService.acquireLock( - USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME, - frequencyIntervalSeconds = lockIntervalSeconds, - ) - ) { - return - } - try { - log.info { "Checking for newer preprocessing pipeline versions" } - val newVersions = submissionDatabaseService.useNewerProcessingPipelineIfPossible() + log.info { "Checking for newer preprocessing pipeline versions" } + val newVersions = submissionDatabaseService.useNewerProcessingPipelineIfPossible() - newVersions.forEach { (organism, latestVersion) -> - if (latestVersion != null) { - submissionDatabaseService.cleanUpOutdatedPreprocessingData(organism, latestVersion - 1) - } + newVersions.forEach { (organism, latestVersion) -> + if (latestVersion != null) { + submissionDatabaseService.cleanUpOutdatedPreprocessingData(organism, latestVersion - 1) } + } - val upgradedOrganisms = newVersions.filterValues { it != null } - if (upgradedOrganisms.isNotEmpty()) { - log.info { "Completed pipeline version upgrade check: upgraded ${upgradedOrganisms.size} organism(s)" } - } else { - log.debug { "Completed pipeline version upgrade check: no upgrades needed" } - } - } finally { - taskLockService.releaseLock( - USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME, - frequencyIntervalSeconds = lockIntervalSeconds, - ) + val upgradedOrganisms = newVersions.filterValues { it != null } + if (upgradedOrganisms.isNotEmpty()) { + log.info { "Completed pipeline version upgrade check: upgraded ${upgradedOrganisms.size} organism(s)" } + } else { + log.debug { "Completed pipeline version upgrade check: no upgrades needed" } } } } diff --git a/backend/src/test/kotlin/org/loculus/backend/service/scheduler/TaskLockAspectTest.kt b/backend/src/test/kotlin/org/loculus/backend/service/scheduler/TaskLockAspectTest.kt new file mode 100644 index 0000000000..6000ff96a1 --- /dev/null +++ b/backend/src/test/kotlin/org/loculus/backend/service/scheduler/TaskLockAspectTest.kt @@ -0,0 +1,72 @@ +package org.loculus.backend.service.scheduler + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.aspectj.lang.ProceedingJoinPoint +import org.hamcrest.MatcherAssert.assertThat +import org.hamcrest.Matchers.`is` +import org.hamcrest.Matchers.nullValue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.util.concurrent.TimeUnit + +class TaskLockAspectTest { + private val taskLockService = mockk(relaxed = true) + private val joinPoint = mockk() + private val aspect = TaskLockAspect(taskLockService).also { + it.setEmbeddedValueResolver { value -> if (value == "\${task.interval}") "10" else value } + } + + @Test + fun `WHEN lock cannot be acquired THEN task body is skipped`() { + every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 10) } returns false + + val result = aspect.lockTask(joinPoint, taskLock()) + + assertThat(result, `is`(nullValue())) + verify(exactly = 0) { joinPoint.proceed() } + verify(exactly = 0) { taskLockService.releaseLock(any(), any()) } + } + + @Test + fun `WHEN lock is acquired THEN task body is run and lock is released`() { + every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 10) } returns true + every { joinPoint.proceed() } returns "done" + + val result = aspect.lockTask(joinPoint, taskLock()) + + assertThat(result, `is`("done")) + verify { joinPoint.proceed() } + verify { taskLockService.releaseLock("test-task", frequencyIntervalSeconds = 10) } + } + + @Test + fun `WHEN task body throws THEN lock is still released`() { + every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 10) } returns true + every { joinPoint.proceed() } throws IllegalStateException("boom") + + assertThrows { + aspect.lockTask(joinPoint, taskLock()) + } + + verify { taskLockService.releaseLock("test-task", frequencyIntervalSeconds = 10) } + } + + @Test + fun `WHEN lock interval uses minutes THEN interval is converted to seconds`() { + every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 600) } returns true + every { joinPoint.proceed() } returns Unit + + aspect.lockTask(joinPoint, taskLock(timeUnit = TimeUnit.MINUTES)) + + verify { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 600) } + verify { taskLockService.releaseLock("test-task", frequencyIntervalSeconds = 600) } + } + + private fun taskLock(timeUnit: TimeUnit = TimeUnit.SECONDS) = TaskLock( + name = "test-task", + intervalString = "\${task.interval}", + timeUnit = timeUnit, + ) +}