Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ repositories {

dependencies {
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-aop"
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
implementation "org.jetbrains.kotlin:kotlin-reflect"
Expand Down
2 changes: 2 additions & 0 deletions backend/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ org.apache.tomcat.embed:tomcat-embed-core:10.1.48=compileClasspath,productionRun
org.apache.tomcat.embed:tomcat-embed-el:10.1.48=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.apache.tomcat.embed:tomcat-embed-websocket:10.1.48=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath
org.aspectj:aspectjweaver:1.9.24=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.assertj:assertj-core:3.27.6=testCompileClasspath,testRuntimeClasspath
org.awaitility:awaitility:4.3.0=testCompileClasspath,testRuntimeClasspath
org.bouncycastle:bcpg-jdk18on:1.80=kotlinBouncyCastleConfiguration
Expand Down Expand Up @@ -254,6 +255,7 @@ org.springframework.boot:spring-boot-actuator-autoconfigure:3.5.7=compileClasspa
org.springframework.boot:spring-boot-actuator:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-autoconfigure:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-actuator:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-aop:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-jdbc:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-json:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-logging:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.loculus.backend.service.scheduler

import java.util.concurrent.TimeUnit

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TaskLock(val name: String, val intervalString: String, val timeUnit: TimeUnit = TimeUnit.SECONDS)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.loculus.backend.service.scheduler

import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.springframework.context.EmbeddedValueResolverAware
import org.springframework.stereotype.Component
import org.springframework.util.StringValueResolver

@Aspect
@Component
class TaskLockAspect(private val taskLockService: TaskLockService) : EmbeddedValueResolverAware {
private lateinit var embeddedValueResolver: StringValueResolver
Comment on lines +12 to +13

override fun setEmbeddedValueResolver(resolver: StringValueResolver) {
embeddedValueResolver = resolver
}

@Around(value = "@annotation(taskLock)", argNames = "joinPoint,taskLock")
fun lockTask(joinPoint: ProceedingJoinPoint, taskLock: TaskLock): Any? {
val intervalSeconds = taskLock.timeUnit.toSeconds(resolveInterval(taskLock))
if (!taskLockService.acquireLock(taskLock.name, frequencyIntervalSeconds = intervalSeconds)) return null

Comment on lines +20 to +23
try {
return joinPoint.proceed()
} finally {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intervalSeconds is computed before acquireLock is called, so if resolveInterval throws (unresolvable or non-numeric property), the lock is never acquired and there's nothing to release — correct. Just noting this ordering is load-bearing.

taskLockService.releaseLock(taskLock.name, frequencyIntervalSeconds = intervalSeconds)
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spring AOP proxies only intercept calls coming through the proxy (i.e., from external callers). If task() were ever called from within the same bean, the lock annotation would be silently ignored. This is the standard Spring AOP limitation — worth a brief comment since it's non-obvious that @TaskLock won't fire on self-calls.

private fun resolveInterval(taskLock: TaskLock): Long {
val resolvedInterval = embeddedValueResolver.resolveStringValue(taskLock.intervalString)
?: throw IllegalArgumentException("Could not resolve lock interval for task '${taskLock.name}'")

return resolvedInterval.toLongOrNull()
?: throw IllegalArgumentException(
"Lock interval for task '${taskLock.name}' must resolve to a whole number, but was '$resolvedInterval'",
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import org.loculus.backend.api.SeqSetCitationSource
import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.config.ENABLE_SEQSETS_TRUE_VALUE
import org.loculus.backend.service.crossref.CrossRefService
import org.loculus.backend.service.scheduler.TaskLockService
import org.springframework.beans.factory.annotation.Value
import org.loculus.backend.service.scheduler.TaskLock
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
Expand Down Expand Up @@ -39,9 +38,6 @@ internal fun mergeCitationSources(citationSources: List<SeqSetCitationSource>):
class SeqSetCrossRefCitationsTask(
private val crossRefService: CrossRefService,
private val seqSetCitationsDatabaseService: SeqSetCitationsDatabaseService,
private val taskLockService: TaskLockService,
@Value("\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}")
private val runEveryMinutes: Long,
) {
/**
* Runs every six hours, with an initial delay of one minute.
Expand All @@ -56,59 +52,50 @@ class SeqSetCrossRefCitationsTask(
fixedRateString = "\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}",
timeUnit = java.util.concurrent.TimeUnit.MINUTES,
)
@TaskLock(
name = SEQ_SET_CITATIONS_TASK_NAME,
intervalString = "\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}",
timeUnit = TimeUnit.MINUTES,
)
fun task() {
if (!taskLockService.acquireLock(
SEQ_SET_CITATIONS_TASK_NAME,
frequencyIntervalSeconds = TimeUnit.MINUTES.toSeconds(runEveryMinutes),
)
) {
log.info { "Updating SeqSet CrossRef citations..." }
if (!crossRefService.isActive) {
log.info { "CrossRef service is not active, skipping SeqSet citation update." }
return
}
log.info { "Updating SeqSet CrossRef citations..." }
try {
if (!crossRefService.isActive) {
log.info { "CrossRef service is not active, skipping SeqSet citation update." }
return
}

val doiPrefix = crossRefService.doiPrefix
if (doiPrefix.isNullOrBlank()) {
log.info { "CrossRef service has no DOI prefix, skipping SeqSet citation update." }
return
}
val doiPrefix = crossRefService.doiPrefix
if (doiPrefix.isNullOrBlank()) {
log.info { "CrossRef service has no DOI prefix, skipping SeqSet citation update." }
return
}

log.info { "Fetching CrossRef citations for DOI prefix: $doiPrefix" }
val citedByResult = crossRefService.getCrossRefCitedBy(doiPrefix)
if (citedByResult.validationErrors.isNotEmpty()) {
log.info { "Fetching CrossRef citations for DOI prefix: $doiPrefix" }
val citedByResult = crossRefService.getCrossRefCitedBy(doiPrefix)
if (citedByResult.validationErrors.isNotEmpty()) {
log.warn {
"Skipped ${citedByResult.validationErrors.size} CrossRef citation(s) due to validation errors."
}
citedByResult.validationErrors.forEach { error ->
log.warn {
"Skipped ${citedByResult.validationErrors.size} CrossRef citation(s) due to validation errors."
}
citedByResult.validationErrors.forEach { error ->
log.warn {
"Validation error: ${error.reason}"
}
"Validation error: ${error.reason}"
}
}
val citationSources = mergeCitationSources(citedByResult.sources)
val seqSetDOIs = citationSources.flatMap { it.seqSetDOIs }.toSet()
log.info {
"Fetched ${citationSources.size} citation source(s) from CrossRef covering ${seqSetDOIs.size} SeqSet DOI(s)."
}
if (citationSources.isEmpty()) return
}
val citationSources = mergeCitationSources(citedByResult.sources)
val seqSetDOIs = citationSources.flatMap { it.seqSetDOIs }.toSet()
log.info {
"Fetched ${citationSources.size} citation source(s) from CrossRef covering ${seqSetDOIs.size} SeqSet DOI(s)."
}
if (citationSources.isEmpty()) return

val updateResult = seqSetCitationsDatabaseService.updateCitationSourcesFromCrossRef(citationSources)
if (updateResult.updatedCitationSourceDOIs.isNotEmpty()) {
log.info { "Updated ${updateResult.updatedCitationSourceDOIs.size} citation source(s)." }
}
val skippedCitationSources = citationSources.size - updateResult.updatedCitationSourceDOIs.size
if (skippedCitationSources > 0) {
log.warn { "Skipped $skippedCitationSources citation source(s) with no matching SeqSet." }
}
} finally {
taskLockService.releaseLock(
SEQ_SET_CITATIONS_TASK_NAME,
frequencyIntervalSeconds = TimeUnit.MINUTES.toSeconds(runEveryMinutes),
)
val updateResult = seqSetCitationsDatabaseService.updateCitationSourcesFromCrossRef(citationSources)
if (updateResult.updatedCitationSourceDOIs.isNotEmpty()) {
log.info { "Updated ${updateResult.updatedCitationSourceDOIs.size} citation source(s)." }
}
val skippedCitationSources = citationSources.size - updateResult.updatedCitationSourceDOIs.size
if (skippedCitationSources > 0) {
log.warn { "Skipped $skippedCitationSources citation source(s) with no matching SeqSet." }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.loculus.backend.service.submission

import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.service.scheduler.TaskLockService
import org.loculus.backend.service.scheduler.TaskLock
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
Expand All @@ -14,27 +14,15 @@ const val CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME = "clean-up-stale-seq
@Component
class CleanUpStaleSequencesInProcessingTask(
private val submissionDatabaseService: SubmissionDatabaseService,
private val taskLockService: TaskLockService,
@Value("\${${BackendSpringProperty.STALE_AFTER_SECONDS}}") private val timeToStaleInSeconds: Long,
@Value("\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}") private val runEverySeconds: Long,
) {
@Scheduled(fixedRateString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}", timeUnit = TimeUnit.SECONDS)
@TaskLock(
name = CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interval property is specified twice — once in @Scheduled(fixedRateString = ...) and again in @TaskLock(intervalString = ...). They must stay in sync; a silent mismatch would cause the lock to be held for the wrong duration without any error. A comment here that both refer to the same property would help, or document this in the @TaskLock KDoc.

intervalString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}",
)
fun task() {
if (!taskLockService.acquireLock(
CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME,
frequencyIntervalSeconds = runEverySeconds,
)
) {
return
}
try {
log.info { "Cleaning up stale sequences in processing, timeToStaleInSeconds: $timeToStaleInSeconds" }
submissionDatabaseService.cleanUpStaleSequencesInProcessing(timeToStaleInSeconds)
} finally {
taskLockService.releaseLock(
CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME,
frequencyIntervalSeconds = runEverySeconds,
)
}
log.info { "Cleaning up stale sequences in processing, timeToStaleInSeconds: $timeToStaleInSeconds" }
submissionDatabaseService.cleanUpStaleSequencesInProcessing(timeToStaleInSeconds)
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.loculus.backend.service.submission

import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.service.scheduler.TaskLockService
import org.springframework.beans.factory.annotation.Value
import org.loculus.backend.service.scheduler.TaskLock
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit
Expand All @@ -12,13 +11,7 @@ private val log = mu.KotlinLogging.logger {}
const val USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME = "use-newer-processing-pipeline-version"

@Component
class UseNewerProcessingPipelineVersionTask(
private val submissionDatabaseService: SubmissionDatabaseService,
private val taskLockService: TaskLockService,
@Value(
"\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}",
) private val lockIntervalSeconds: Long,
) {
class UseNewerProcessingPipelineVersionTask(private val submissionDatabaseService: SubmissionDatabaseService) {

// Initial delay to avoid hammering the database on backend startup
@Scheduled(
Expand All @@ -28,35 +21,25 @@ class UseNewerProcessingPipelineVersionTask(
fixedDelayString = "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}",
timeUnit = TimeUnit.SECONDS,
)
@TaskLock(
name = USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME,
intervalString = "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}",
)
fun task() {
if (!taskLockService.acquireLock(
USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME,
frequencyIntervalSeconds = lockIntervalSeconds,
)
) {
return
}
try {
log.info { "Checking for newer preprocessing pipeline versions" }
val newVersions = submissionDatabaseService.useNewerProcessingPipelineIfPossible()
log.info { "Checking for newer preprocessing pipeline versions" }
val newVersions = submissionDatabaseService.useNewerProcessingPipelineIfPossible()

newVersions.forEach { (organism, latestVersion) ->
if (latestVersion != null) {
submissionDatabaseService.cleanUpOutdatedPreprocessingData(organism, latestVersion - 1)
}
newVersions.forEach { (organism, latestVersion) ->
if (latestVersion != null) {
submissionDatabaseService.cleanUpOutdatedPreprocessingData(organism, latestVersion - 1)
}
}

val upgradedOrganisms = newVersions.filterValues { it != null }
if (upgradedOrganisms.isNotEmpty()) {
log.info { "Completed pipeline version upgrade check: upgraded ${upgradedOrganisms.size} organism(s)" }
} else {
log.debug { "Completed pipeline version upgrade check: no upgrades needed" }
}
} finally {
taskLockService.releaseLock(
USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME,
frequencyIntervalSeconds = lockIntervalSeconds,
)
val upgradedOrganisms = newVersions.filterValues { it != null }
if (upgradedOrganisms.isNotEmpty()) {
log.info { "Completed pipeline version upgrade check: upgraded ${upgradedOrganisms.size} organism(s)" }
} else {
log.debug { "Completed pipeline version upgrade check: no upgrades needed" }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.loculus.backend.service.scheduler

import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.aspectj.lang.ProceedingJoinPoint
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.`is`
import org.hamcrest.Matchers.nullValue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.util.concurrent.TimeUnit

class TaskLockAspectTest {
private val taskLockService = mockk<TaskLockService>(relaxed = true)
private val joinPoint = mockk<ProceedingJoinPoint>()
private val aspect = TaskLockAspect(taskLockService).also {
it.setEmbeddedValueResolver { value -> if (value == "\${task.interval}") "10" else value }
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing tests for the two error paths in resolveInterval:

  1. Property placeholder that resolves to null (should throw IllegalArgumentException)
  2. Property that resolves to a non-numeric string like "10s" (should throw IllegalArgumentException)

These paths are in the implementation but have no coverage. Fix this →


@Test
fun `WHEN lock cannot be acquired THEN task body is skipped`() {
every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 10) } returns false

val result = aspect.lockTask(joinPoint, taskLock())

assertThat(result, `is`(nullValue()))
verify(exactly = 0) { joinPoint.proceed() }
verify(exactly = 0) { taskLockService.releaseLock(any(), any()) }
}

@Test
fun `WHEN lock is acquired THEN task body is run and lock is released`() {
every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 10) } returns true
every { joinPoint.proceed() } returns "done"

val result = aspect.lockTask(joinPoint, taskLock())

assertThat(result, `is`("done"))
verify { joinPoint.proceed() }
verify { taskLockService.releaseLock("test-task", frequencyIntervalSeconds = 10) }
}

@Test
fun `WHEN task body throws THEN lock is still released`() {
every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 10) } returns true
every { joinPoint.proceed() } throws IllegalStateException("boom")

assertThrows<IllegalStateException> {
aspect.lockTask(joinPoint, taskLock())
}

verify { taskLockService.releaseLock("test-task", frequencyIntervalSeconds = 10) }
}

@Test
fun `WHEN lock interval uses minutes THEN interval is converted to seconds`() {
every { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 600) } returns true
every { joinPoint.proceed() } returns Unit

aspect.lockTask(joinPoint, taskLock(timeUnit = TimeUnit.MINUTES))

verify { taskLockService.acquireLock("test-task", frequencyIntervalSeconds = 600) }
verify { taskLockService.releaseLock("test-task", frequencyIntervalSeconds = 600) }
}

private fun taskLock(timeUnit: TimeUnit = TimeUnit.SECONDS) = TaskLock(
name = "test-task",
intervalString = "\${task.interval}",
timeUnit = timeUnit,
)
}
Loading