Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ dependencies {
implementation "org.apache.commons:commons-csv:1.14.1"
implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.14"
implementation "org.flywaydb:flyway-database-postgresql" // Version managed by Spring Boot to ensure compatibility

// ShedLock ensures scheduled tasks run on only one backend replica at a time (distributed lock held in Postgres)
implementation "net.javacrumbs.shedlock:shedlock-spring:7.7.0"
implementation "net.javacrumbs.shedlock:shedlock-provider-jdbc-template:7.7.0"

implementation platform("org.jetbrains.exposed:exposed-bom:0.61.0")
implementation "org.jetbrains.exposed:exposed-spring-boot-starter"
implementation "org.jetbrains.exposed:exposed-jdbc"
Expand Down
22 changes: 22 additions & 0 deletions backend/docs/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,20 @@ CREATE TABLE public.sequence_upload_aux_table (

ALTER TABLE public.sequence_upload_aux_table OWNER TO postgres;

--
-- Name: shedlock; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.shedlock (
name character varying(64) NOT NULL,
lock_until timestamp with time zone NOT NULL,
locked_at timestamp with time zone NOT NULL,
locked_by character varying(255) NOT NULL
);


ALTER TABLE public.shedlock OWNER TO postgres;

--
-- Name: table_update_tracker; Type: TABLE; Schema: public; Owner: postgres
--
Expand Down Expand Up @@ -904,6 +918,14 @@ ALTER TABLE ONLY public.sequence_upload_aux_table
ADD CONSTRAINT sequence_upload_aux_table_pkey PRIMARY KEY (upload_id, fasta_id);


--
-- Name: shedlock shedlock_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--

ALTER TABLE ONLY public.shedlock
ADD CONSTRAINT shedlock_pkey PRIMARY KEY (name);


--
-- Name: table_update_tracker table_update_tracker_unique; Type: CONSTRAINT; Schema: public; Owner: postgres
--
Expand Down
4 changes: 4 additions & 0 deletions backend/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ junit:junit:4.13.2=testRuntimeClasspath
net.bytebuddy:byte-buddy-agent:1.17.8=testCompileClasspath,testRuntimeClasspath
net.bytebuddy:byte-buddy:1.17.8=testCompileClasspath,testRuntimeClasspath
net.java.dev.jna:jna:5.18.1=testCompileClasspath,testRuntimeClasspath
net.javacrumbs.shedlock:shedlock-core:7.7.0=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
net.javacrumbs.shedlock:shedlock-provider-jdbc-template:7.7.0=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
net.javacrumbs.shedlock:shedlock-spring:7.7.0=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
net.javacrumbs.shedlock:shedlock-sql-support:7.7.0=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
net.minidev:accessors-smart:2.5.2=testCompileClasspath,testRuntimeClasspath
net.minidev:json-smart:2.5.2=testCompileClasspath,testRuntimeClasspath
org.apache.commons:commons-compress:1.28.0=compileClasspath,productionRuntimeClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import com.fasterxml.jackson.module.kotlin.readValue
import io.swagger.v3.oas.models.headers.Header
import io.swagger.v3.oas.models.media.StringSchema
import io.swagger.v3.oas.models.parameters.HeaderParameter
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock
import org.flywaydb.core.Flyway
import org.jetbrains.exposed.spring.autoconfigure.ExposedAutoConfiguration
import org.jetbrains.exposed.sql.Database
Expand All @@ -26,6 +29,7 @@ import org.springframework.boot.context.properties.ConfigurationPropertiesScan
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Profile
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Component
import org.springframework.web.filter.CommonsRequestLoggingFilter
Expand Down Expand Up @@ -58,13 +62,29 @@ private val logger = mu.KotlinLogging.logger {}

@Configuration
@EnableScheduling
// Ensures scheduled tasks run on only one replica at a time. `defaultLockAtMostFor` is a safety
// net: if a replica dies while holding a lock, the lock is released after this duration so another
// replica can take over. Individual tasks override it via `@SchedulerLock(lockAtMostForString = ...)`.
@EnableSchedulerLock(defaultLockAtMostFor = "PT30M")
@ImportAutoConfiguration(
value = [ExposedAutoConfiguration::class],
exclude = [DataSourceTransactionManagerAutoConfiguration::class],
)
@ConfigurationPropertiesScan("org.loculus.backend")
class BackendSpringConfig {

/**
* Backs ShedLock with the existing Postgres datasource. `usingDbTime()` makes ShedLock use the
* database clock for lock timing, which avoids problems caused by clock drift between replicas.
*/
@Bean
fun lockProvider(dataSource: DataSource): LockProvider = JdbcTemplateLockProvider(
JdbcTemplateLockProvider.Configuration.builder()
.withJdbcTemplate(JdbcTemplate(dataSource))
.usingDbTime()
.build(),
)

@Bean
fun logFilter(): CommonsRequestLoggingFilter {
val filter = CommonsRequestLoggingFilter()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.loculus.backend.service.seqsetcitations

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
import org.loculus.backend.api.SeqSetCitationSource
import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.config.ENABLE_SEQSETS_TRUE_VALUE
Expand Down Expand Up @@ -36,7 +37,7 @@ class SeqSetCrossRefCitationsTask(
private val seqSetCitationsDatabaseService: SeqSetCitationsDatabaseService,
) {
/**
* Runs every six hours, with an initial delay of one minute.
* Runs at most once every six hours (enforced by the ShedLock `lockAtLeastFor`), with an initial delay of one minute.
*
* The task checks that the CrossRef service is active and a DOI prefix is configured for the Loculus instance.
* If configured, it retrieves all CrossRef forward links (citations) which begin with the instance's DOI prefix.
Expand All @@ -45,9 +46,14 @@ class SeqSetCrossRefCitationsTask(
*/
@Scheduled(
initialDelay = 1,
fixedDelay = 360,
fixedDelay = 5,
timeUnit = java.util.concurrent.TimeUnit.MINUTES,
)
@SchedulerLock(
name = "seqSetCrossRefCitations",
lockAtLeastFor = "\${loculus.locks.seqSetCrossRefCitations.atLeast:PT6H}",
lockAtMostFor = "PT6H",
)
fun task() {
log.info { "Updating SeqSet CrossRef citations..." }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.loculus.backend.service.maintenance
import kotlinx.datetime.DateTimeUnit
import kotlinx.datetime.minus
import kotlinx.datetime.toLocalDateTime
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
import org.loculus.backend.log.AuditLogger
import org.loculus.backend.service.submission.UploadDatabaseService
import org.loculus.backend.utils.DateProvider
Expand All @@ -19,9 +20,16 @@ class CleanUpAuxTableTask(
) {

/**
* Runs every hour and deletes auxTable entries older than 24 hours.
* Deletes auxTable entries older than 24 hours. Effectively runs once per `lockAtLeastFor` (1h)
* regardless of replica count; `lockAtMostFor` is larger so an occasional long run keeps the lock
* rather than allowing a parallel run, while still releasing if a replica dies mid-task.
*/
@Scheduled(fixedDelay = 1, timeUnit = java.util.concurrent.TimeUnit.HOURS)
@Scheduled(fixedDelay = 5, timeUnit = java.util.concurrent.TimeUnit.MINUTES)
@SchedulerLock(
name = "cleanUpAuxTable",
lockAtLeastFor = "\${loculus.locks.cleanUpAuxTable.atLeast:PT1H}",
lockAtMostFor = "PT6H",
)
fun task() {
val hourCutoff = 24L
val now = dateProvider.getCurrentInstant()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.loculus.backend.service.submission

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
import org.loculus.backend.config.BackendSpringProperty
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.Scheduled
Expand All @@ -13,7 +14,19 @@ class CleanUpStaleSequencesInProcessingTask(
private val submissionDatabaseService: SubmissionDatabaseService,
@Value("\${${BackendSpringProperty.STALE_AFTER_SECONDS}}") private val timeToStaleInSeconds: Long,
) {
@Scheduled(fixedRateString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}", timeUnit = TimeUnit.SECONDS)
// `fixedDelay`, not `fixedRate`: scheduling from completion keeps each poll past the lock expiry.
// With `fixedRate` the poll grid would sit on the `lockAtLeastFor` boundary and skip ticks on jitter.
@Scheduled(
fixedDelayString = "\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}",
timeUnit = TimeUnit.SECONDS,
)
@SchedulerLock(
name = "cleanUpStaleSequencesInProcessing",
// Defaults to the configured run-every interval so it is honored, not overridden (tests use PT0S).
lockAtLeastFor = "\${loculus.locks.cleanUpStaleSequencesInProcessing.atLeast:" +
"PT\${${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}}S}",
lockAtMostFor = "PT5M",
)
fun task() {
log.info { "Cleaning up stale sequences in processing, timeToStaleInSeconds: $timeToStaleInSeconds" }
submissionDatabaseService.cleanUpStaleSequencesInProcessing(timeToStaleInSeconds)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.loculus.backend.service.submission

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock
import org.loculus.backend.config.BackendSpringProperty
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
Expand All @@ -18,6 +19,15 @@ class UseNewerProcessingPipelineVersionTask(private val submissionDatabaseServic
fixedDelayString = "\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}",
timeUnit = TimeUnit.SECONDS,
)
@SchedulerLock(
name = "useNewerProcessingPipelineVersion",
// Defaults to the configured check interval so an operator who tunes `interval-seconds` is
// honored, not overridden. The Helm chart sets `atMost` to 5x the interval; PT1M is the
// non-Helm fallback. Both overridable via `loculus.locks.*` (tests use PT0S).
lockAtLeastFor = "\${loculus.locks.useNewerProcessingPipelineVersion.atLeast:" +
"PT\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}S}",
lockAtMostFor = "\${loculus.locks.useNewerProcessingPipelineVersion.atMost:PT1M}",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hard coding is problematic when the task starts taking longer on big instances

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh this is actually configurable but bypassing our explicit config variables

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah agree it could be confusing to have this set in two ways. Would be happy to discard one

)
fun task() {
log.info { "Checking for newer preprocessing pipeline versions" }
val newVersions = submissionDatabaseService.useNewerProcessingPipelineIfPossible()
Expand Down
10 changes: 10 additions & 0 deletions backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ management.health.readinessState.enabled=true
loculus.cleanup.task.reset-stale-in-processing-after-seconds=60
loculus.cleanup.task.run-every-seconds=60
loculus.pipeline-version-upgrade-check.interval-seconds=10

# ShedLock minimum lock durations (ISO-8601): each scheduled task holds its lock for at least this
# long, which is what makes the task run once per interval regardless of the number of replicas.
# Defaults live on each @SchedulerLock annotation (lockAtLeastFor); the two interval-driven tasks
# default to their configured interval so it is honored rather than overridden. Set the values below
# only to override, and keep each <= the task's lockAtMostFor.
# loculus.locks.cleanUpStaleSequencesInProcessing.atLeast (defaults to run-every-seconds)
# loculus.locks.useNewerProcessingPipelineVersion.atLeast (defaults to interval-seconds; Helm also sets .atMost to 5x)
# loculus.locks.cleanUpAuxTable.atLeast=PT1H
# loculus.locks.seqSetCrossRefCitations.atLeast=PT6H
loculus.stream.batch-size=1000
loculus.debug-mode=false

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Table used by ShedLock to coordinate scheduled tasks across backend replicas.
-- Only one replica can hold the lock for a given task name at a time, so scheduled
-- tasks run once per interval regardless of how many replicas are deployed.
-- Schema as required by net.javacrumbs.shedlock:shedlock-provider-jdbc-template.
CREATE TABLE shedlock (
name VARCHAR(64) NOT NULL,
lock_until TIMESTAMP WITH TIME ZONE NOT NULL,
locked_at TIMESTAMP WITH TIME ZONE NOT NULL,
locked_by VARCHAR(255) NOT NULL,
PRIMARY KEY (name)
);
Comment thread
anna-parker marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.loculus.backend.service.submission

import net.javacrumbs.shedlock.core.LockConfiguration
import net.javacrumbs.shedlock.core.LockProvider
import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.MatcherAssert.assertThat
import org.junit.jupiter.api.Test
import org.loculus.backend.config.BackendSpringProperty
import org.loculus.backend.controller.EndpointTest
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.jdbc.core.JdbcTemplate
import java.time.Duration
import java.time.Instant

@EndpointTest(
properties = [
"${BackendSpringProperty.STALE_AFTER_SECONDS}=0",
"${BackendSpringProperty.CLEAN_UP_RUN_EVERY_SECONDS}=3600",
],
)
class ShedLockIntegrationTest(
@Autowired val jdbcTemplate: JdbcTemplate,
@Autowired val lockProvider: LockProvider,
@Autowired val cleanUpStaleSequencesInProcessingTask: CleanUpStaleSequencesInProcessingTask,
) {
@Test
fun `WHEN a scheduled task is invoked through the Spring proxy THEN a shedlock row is created`() {
cleanUpStaleSequencesInProcessingTask.task()

val count = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM shedlock WHERE name = ?",
Int::class.java,
"cleanUpStaleSequencesInProcessing",
)
assertThat(count, `is`(1))
}

@Test
fun `WHEN a lock is released within lockAtLeastFor THEN it cannot be re-acquired yet`() {
// A unique lock name no scheduled task uses, so the background scheduler can't interfere.
val lockName = "shedLockIntegrationTestLock"
val lockAtMostFor = Duration.ofMinutes(5)
val lockAtLeastFor = Duration.ofMinutes(1)

val firstLock = lockProvider.lock(
LockConfiguration(Instant.now(), lockName, lockAtMostFor, lockAtLeastFor),
)
assertThat("the lock should be acquired when free", firstLock.isPresent, `is`(true))

// Release immediately - because lockAtLeastFor has not elapsed, the lock stays held.
firstLock.get().unlock()

val secondLock = lockProvider.lock(
LockConfiguration(Instant.now(), lockName, lockAtMostFor, lockAtLeastFor),
)
assertThat(
"re-acquisition within lockAtLeastFor should be refused",
secondLock.isPresent,
`is`(false),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.loculus.backend.service.submission

import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.MatcherAssert.assertThat
import org.junit.jupiter.api.Test
import org.loculus.backend.config.BackendSpringProperty
import org.springframework.core.env.MapPropertySource
import org.springframework.core.env.StandardEnvironment

/**
* Verifies the placeholder strings compiled into the `@SchedulerLock` annotations resolve as
* intended. The DB-backed [ShedLockIntegrationTest] always runs with `atLeast` overridden to PT0S,
* so it cannot exercise the production default; this lightweight test covers that path.
*/
class ShedLockPropertyResolutionTest {

// Must mirror the lockAtLeastFor string in UseNewerProcessingPipelineVersionTask exactly.
private val pipelineLockAtLeast =
"\${loculus.locks.useNewerProcessingPipelineVersion.atLeast:" +
"PT\${${BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS}}S}"

private fun resolve(expression: String, props: Map<String, Any>): String {
val env = StandardEnvironment()
env.propertySources.addFirst(MapPropertySource("test", props))
return env.resolvePlaceholders(expression)
}

@Test
fun `WHEN no lock override is set THEN lockAtLeastFor falls back to the configured interval`() {
val resolved = resolve(
pipelineLockAtLeast,
mapOf(BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS to "3"),
)
assertThat(resolved, `is`("PT3S"))
}

@Test
fun `WHEN a lock override is set THEN it takes precedence over the interval default`() {
val resolved = resolve(
pipelineLockAtLeast,
mapOf(
BackendSpringProperty.PIPELINE_VERSION_UPGRADE_CHECK_INTERVAL_SECONDS to "3",
"loculus.locks.useNewerProcessingPipelineVersion.atLeast" to "PT0S",
),
)
assertThat(resolved, `is`("PT0S"))
}
}
8 changes: 8 additions & 0 deletions backend/src/test/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,11 @@ keycloak.client=dummy-cli
keycloak.url=dummy:420

spring.security.oauth2.resourceserver.jwt.jwk-set-uri=http://some.value

# Disable the ShedLock minimum lock duration in tests so scheduled tasks can be invoked
# repeatedly within a single test (and across tests sharing the test-container database)
# without being skipped. ShedLockIntegrationTest exercises lockAtLeastFor directly via LockProvider.
loculus.locks.cleanUpStaleSequencesInProcessing.atLeast=PT0S
loculus.locks.useNewerProcessingPipelineVersion.atLeast=PT0S
loculus.locks.cleanUpAuxTable.atLeast=PT0S
loculus.locks.seqSetCrossRefCitations.atLeast=PT0S
4 changes: 4 additions & 0 deletions kubernetes/loculus/templates/loculus-backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ spec:
- "--spring.security.oauth2.resourceserver.jwt.jwk-set-uri=http://loculus-keycloak-service:8083/realms/loculus/protocol/openid-connect/certs"
- "--loculus.cleanup.task.reset-stale-in-processing-after-seconds={{- .Values.preprocessingTimeout | default 120 }}"
- "--loculus.pipeline-version-upgrade-check.interval-seconds={{- .Values.pipelineVersionUpgradeCheckIntervalSeconds | default 10 }}"
# ShedLock lock bounds for the pipeline-version-upgrade check, derived from the configured
# interval so the lock never overrides it: hold for the interval, expire after 5x it.
- "--loculus.locks.useNewerProcessingPipelineVersion.atLeast=PT{{ .Values.pipelineVersionUpgradeCheckIntervalSeconds | default 10 | int }}S"
- "--loculus.locks.useNewerProcessingPipelineVersion.atMost=PT{{ mul (.Values.pipelineVersionUpgradeCheckIntervalSeconds | default 10 | int) 5 }}S"
- "--loculus.s3.enabled=$(S3_ENABLED)"
{{- if $.Values.s3.enabled }}
- "--loculus.s3.bucket.endpoint=$(S3_BUCKET_ENDPOINT)"
Expand Down
Loading