Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ repositories {

dependencies {
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-aop"
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
implementation "org.jetbrains.kotlin:kotlin-reflect"
Expand Down
21 changes: 21 additions & 0 deletions backend/docs/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,19 @@ CREATE TABLE public.table_update_tracker (

ALTER TABLE public.table_update_tracker OWNER TO postgres;

--
-- Name: task_lock; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.task_lock (
task_name text NOT NULL,
started_at timestamp with time zone DEFAULT now() NOT NULL,
locked_until timestamp with time zone NOT NULL
);


ALTER TABLE public.task_lock OWNER TO postgres;

--
-- Name: user_groups_table; Type: TABLE; Schema: public; Owner: postgres
--
Expand Down Expand Up @@ -912,6 +925,14 @@ ALTER TABLE ONLY public.table_update_tracker
ADD CONSTRAINT table_update_tracker_unique UNIQUE NULLS NOT DISTINCT (table_name, organism, pipeline_version);


--
-- Name: task_lock task_lock_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--

ALTER TABLE ONLY public.task_lock
ADD CONSTRAINT task_lock_pkey PRIMARY KEY (task_name);


--
-- Name: user_groups_table user_groups_table_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--
Expand Down
2 changes: 2 additions & 0 deletions backend/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ org.apache.tomcat.embed:tomcat-embed-core:10.1.48=compileClasspath,productionRun
org.apache.tomcat.embed:tomcat-embed-el:10.1.48=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.apache.tomcat.embed:tomcat-embed-websocket:10.1.48=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath
org.aspectj:aspectjweaver:1.9.24=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.assertj:assertj-core:3.27.6=testCompileClasspath,testRuntimeClasspath
org.awaitility:awaitility:4.3.0=testCompileClasspath,testRuntimeClasspath
org.bouncycastle:bcpg-jdk18on:1.80=kotlinBouncyCastleConfiguration
Expand Down Expand Up @@ -254,6 +255,7 @@ org.springframework.boot:spring-boot-actuator-autoconfigure:3.5.7=compileClasspa
org.springframework.boot:spring-boot-actuator:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-autoconfigure:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-actuator:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-aop:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-jdbc:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-json:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
org.springframework.boot:spring-boot-starter-logging:3.5.7=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ object BackendSpringProperty {
const val STREAM_BATCH_SIZE = "loculus.stream.batch-size"
const val DEBUG_MODE = "loculus.debug-mode"
const val ENABLE_SEQSETS = "loculus.enable-seqsets"
const val SEQSET_CITATIONS_RUN_EVERY_MINUTES = "loculus.seqset-citations.run-every-minutes"
const val CLEAN_UP_AUX_TABLE_RUN_EVERY_HOURS = "loculus.maintenance.clean-up-aux-table.run-every-hours"

const val S3_ENABLED = "loculus.s3.enabled"
const val S3_BUCKET_ENDPOINT = "loculus.s3.bucket.endpoint"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.loculus.backend.service.scheduler

import java.util.concurrent.TimeUnit

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TaskLock(val name: String, val intervalString: String, val timeUnit: TimeUnit = TimeUnit.SECONDS)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.loculus.backend.service.scheduler

import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.Around
import org.aspectj.lang.annotation.Aspect
import org.springframework.context.EmbeddedValueResolverAware
import org.springframework.stereotype.Component
import org.springframework.util.StringValueResolver

@Aspect
@Component
class TaskLockAspect(private val taskLockService: TaskLockService) : EmbeddedValueResolverAware {
private lateinit var embeddedValueResolver: StringValueResolver

override fun setEmbeddedValueResolver(resolver: StringValueResolver) {
embeddedValueResolver = resolver
}

@Around(value = "@annotation(taskLock)", argNames = "joinPoint,taskLock")
fun lockTask(joinPoint: ProceedingJoinPoint, taskLock: TaskLock): Any? {
val intervalSeconds = taskLock.timeUnit.toSeconds(resolveInterval(taskLock))
if (!taskLockService.acquireLock(taskLock.name, frequencyIntervalSeconds = intervalSeconds)) return null

try {
return joinPoint.proceed()
} finally {
taskLockService.releaseLock(taskLock.name, frequencyIntervalSeconds = intervalSeconds)
}
}

private fun resolveInterval(taskLock: TaskLock): Long {
val resolvedInterval = embeddedValueResolver.resolveStringValue(taskLock.intervalString)
?: throw IllegalArgumentException("Could not resolve lock interval for task '${taskLock.name}'")

return resolvedInterval.toLongOrNull()
?: throw IllegalArgumentException(
"Lock interval for task '${taskLock.name}' must resolve to a whole number, but was '$resolvedInterval'",
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.loculus.backend.service.scheduler

import mu.KotlinLogging
import org.jetbrains.exposed.sql.LongColumnType
import org.jetbrains.exposed.sql.TextColumnType
import org.jetbrains.exposed.sql.statements.StatementType
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service

private val log = KotlinLogging.logger {}

const val TASK_LOCK_TABLE_NAME = "task_lock"

/**
* Lock can be acquired when current time > `locked_until` (or no row for this task exists)
* When acquiring, `locked_until` is set to `currentTime + maxLockFactor * frequencyIntervalSeconds`
* When releasing, `locked_until` is reduced to `started_at + minLockFactor * frequencyIntervalSeconds`
* maxLockFactor prevents concurrent execution even if tasks run longer than the interval
* minLockFactor prevents multiple backends from starting new tasks before the interval is almost over
*/
@Service
class TaskLockService(
Comment thread
anna-parker marked this conversation as resolved.
@Value("\${loculus.task-lock.min-lock-factor:0.9}") private val minLockFactor: Double,
@Value("\${loculus.task-lock.max-lock-factor:5.0}") private val maxLockFactor: Double,
) {

/**
* Attempts to acquire a lock for the given task.
*
* If the task dies or is terminated, the lock will be released after [maxDuration] seconds.
*
* @param taskName unique name identifying the task.
* @param maxDuration maximum duration for which to hold the lock in seconds.
* @return true if the lock was acquired, false if another instance holds it.
*/
fun acquireLock(taskName: String, frequencyIntervalSeconds: Long): Boolean = transaction {
val maxDuration = (frequencyIntervalSeconds * maxLockFactor).toLong()
val acquired = exec(
"""
WITH lock_attempt AS (
INSERT INTO task_lock (task_name, started_at, locked_until)
VALUES (?, NOW(), NOW() + (? * interval '1 second'))
ON CONFLICT (task_name) DO UPDATE
SET started_at = NOW(), locked_until = NOW() + (? * interval '1 second')
WHERE task_lock.locked_until <= NOW()
RETURNING task_name
)
SELECT COUNT(*) FROM lock_attempt
Comment thread
anna-parker marked this conversation as resolved.
""".trimIndent(),
args = listOf(
TextColumnType() to taskName,
LongColumnType() to maxDuration,
LongColumnType() to maxDuration,
),
// The CTE starts with INSERT, so Exposed would default to StatementType.INSERT and
// not return a ResultSet. Overriding to SELECT lets us read the outer COUNT(*).
explicitStatementType = StatementType.SELECT,
) { rs ->
rs.next() && rs.getLong(1) > 0L
} ?: false

if (!acquired) {
log.debug {
"Task '$taskName' skipped: another replica acquired the lock within the last ${maxDuration}s"
}
}
acquired
}

/**
* Attempts to "release" a lock for the given task, this is only possible if the
* lock is considered expired based on the [minLockFactor].
* If locked_until is still in the future update locked_until to the minimum duration, otherwise do nothing.
*
* @param taskName unique name identifying the task.
*/
fun releaseLock(taskName: String, frequencyIntervalSeconds: Long) = transaction {
// The effective lock duration is shortened by [minLockFactor] to prevent tasks
// from being blocked after their scheduled interval due to minor clock skew,
// execution delays, or lock acquisition latency.
val minDuration = (frequencyIntervalSeconds * minLockFactor).toLong()

val updated = exec(
"""
UPDATE task_lock
SET locked_until = started_at + (? * interval '1 second')
WHERE task_name = ?
AND locked_until > NOW()
RETURNING task_name
""".trimIndent(),
args = listOf(
LongColumnType() to minDuration,
TextColumnType() to taskName,
),
explicitStatementType = StatementType.SELECT,
) { rs ->
rs.next()
} ?: false

if (updated) {
log.debug {
"Task '$taskName' lock: 'locked_until' shortened to minimum duration (${minDuration}s)"
}
} else {
log.debug {
"Task '$taskName' lock: not shortened because 'locked_until' has already elapsed"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import org.loculus.backend.api.SeqSetCitationSource
import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.config.ENABLE_SEQSETS_TRUE_VALUE
import org.loculus.backend.service.crossref.CrossRefService
import org.loculus.backend.service.scheduler.TaskLock
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

private val log = mu.KotlinLogging.logger {}

const val SEQ_SET_CITATIONS_TASK_NAME = "seq-set-cross-ref-citations"

internal fun mergeCitationSources(citationSources: List<SeqSetCitationSource>): Set<SeqSetCitationSource> {
val mergedSources = mutableMapOf<String, SeqSetCitationSource>()

Expand Down Expand Up @@ -45,12 +49,16 @@ class SeqSetCrossRefCitationsTask(
*/
@Scheduled(
initialDelay = 1,
fixedDelay = 360,
fixedRateString = "\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}",
timeUnit = java.util.concurrent.TimeUnit.MINUTES,
)
@TaskLock(
name = SEQ_SET_CITATIONS_TASK_NAME,
intervalString = "\${${BackendSpringProperty.SEQSET_CITATIONS_RUN_EVERY_MINUTES}}",
timeUnit = TimeUnit.MINUTES,
)
fun task() {
log.info { "Updating SeqSet CrossRef citations..." }

if (!crossRefService.isActive) {
log.info { "CrossRef service is not active, skipping SeqSet citation update." }
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,62 @@ package org.loculus.backend.service.maintenance
import kotlinx.datetime.DateTimeUnit
import kotlinx.datetime.minus
import kotlinx.datetime.toLocalDateTime
import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.log.AuditLogger
import org.loculus.backend.service.scheduler.TaskLockService
import org.loculus.backend.service.submission.UploadDatabaseService
import org.loculus.backend.utils.DateProvider
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

private val log = mu.KotlinLogging.logger {}

const val CLEAN_UP_AUX_TABLE_TASK_NAME = "clean-up-aux-table"

@Component
class CleanUpAuxTableTask(
private val uploadDatabaseService: UploadDatabaseService,
private val taskLockService: TaskLockService,
private val dateProvider: DateProvider,
private val auditLogger: AuditLogger,
@Value("\${${BackendSpringProperty.CLEAN_UP_AUX_TABLE_RUN_EVERY_HOURS}}")
private val runEveryHours: Long,
) {

/**
* Runs every hour and deletes auxTable entries older than 24 hours.
* Scheduled to poll hourly; the task lock limits actual execution to at most once per [runEveryHours].
* Deletes auxTable entries older than 24 hours.
*/
@Scheduled(fixedDelay = 1, timeUnit = java.util.concurrent.TimeUnit.HOURS)
fun task() {
val hourCutoff = 24L
val now = dateProvider.getCurrentInstant()
val thresholdInstant = now.minus(
hourCutoff,
DateTimeUnit.HOUR,
DateProvider.timeZone,
).toLocalDateTime(DateProvider.timeZone)
val deletedCount = uploadDatabaseService.deleteAuxTableEntriesOlderThan(thresholdInstant)
if (!taskLockService.acquireLock(
CLEAN_UP_AUX_TABLE_TASK_NAME,
frequencyIntervalSeconds = TimeUnit.HOURS.toSeconds(runEveryHours),
)
) {
return
}
try {
val hourCutoff = 24L
val now = dateProvider.getCurrentInstant()
val thresholdInstant = now.minus(
hourCutoff,
DateTimeUnit.HOUR,
DateProvider.timeZone,
).toLocalDateTime(DateProvider.timeZone)
val deletedCount = uploadDatabaseService.deleteAuxTableEntriesOlderThan(thresholdInstant)

if (deletedCount > 0) {
log.info { "Deleted $deletedCount auxTable entries older than $hourCutoff" }
auditLogger.log("CLEANUP", "Deleted $deletedCount auxTable entries older than $hourCutoff hours.")
if (deletedCount > 0) {
log.info { "Deleted $deletedCount auxTable entries older than $hourCutoff" }
auditLogger.log("CLEANUP", "Deleted $deletedCount auxTable entries older than $hourCutoff hours.")
}
} finally {
taskLockService.releaseLock(
CLEAN_UP_AUX_TABLE_TASK_NAME,
frequencyIntervalSeconds = TimeUnit.HOURS.toSeconds(runEveryHours),
)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package org.loculus.backend.service.submission

import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.service.scheduler.TaskLock
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

private val log = mu.KotlinLogging.logger {}

const val CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME = "clean-up-stale-sequences-in-processing"

@Component
class CleanUpStaleSequencesInProcessingTask(
private val submissionDatabaseService: SubmissionDatabaseService,
@Value("\${${BackendSpringProperty.STALE_AFTER_SECONDS}}") private val timeToStaleInSeconds: Long,
) {
@Scheduled(fixedRateString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}", timeUnit = TimeUnit.SECONDS)
@TaskLock(
name = CLEAN_UP_STALE_SEQUENCES_IN_PROCESSING_TASK_NAME,
intervalString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}",
)
fun task() {
log.info { "Cleaning up stale sequences in processing, timeToStaleInSeconds: $timeToStaleInSeconds" }
submissionDatabaseService.cleanUpStaleSequencesInProcessing(timeToStaleInSeconds)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.loculus.backend.service.submission

import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.service.scheduler.TaskLock
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

private val log = mu.KotlinLogging.logger {}

const val USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME = "use-newer-processing-pipeline-version"

@Component
class UseNewerProcessingPipelineVersionTask(private val submissionDatabaseService: SubmissionDatabaseService) {

Expand All @@ -18,6 +21,10 @@ class UseNewerProcessingPipelineVersionTask(private val submissionDatabaseServic
fixedDelayString = "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}",
timeUnit = TimeUnit.SECONDS,
)
@TaskLock(
name = USE_NEWER_PROCESSING_PIPELINE_VERSION_TASK_NAME,
intervalString = "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}",
)
fun task() {
log.info { "Checking for newer preprocessing pipeline versions" }
val newVersions = submissionDatabaseService.useNewerProcessingPipelineIfPossible()
Expand Down
Loading
Loading