diff --git a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/LocalWorkerServiceTests.cs b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/LocalWorkerServiceTests.cs index 39c260a..3bdb85f 100644 --- a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/LocalWorkerServiceTests.cs +++ b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/LocalWorkerServiceTests.cs @@ -76,9 +76,9 @@ public async Task Worker_ClaimsAndExecutes_AvailableJob() Scope.ServiceProvider.GetRequiredService() ); - // Start the worker and give it time to process + // Start the worker and wait for it to process the job var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(2000); // Allow enough time for claim + execute + cleanup + var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout); cts.Cancel(); try @@ -91,6 +91,7 @@ public async Task Worker_ClaimsAndExecutes_AvailableJob() } // Assert - The job should have been executed and deleted + executed.Should().BeTrue("job should be deleted after execution"); DataContext.Reset(); var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId); remainingJob.Should().BeNull("job should be deleted after execution"); @@ -140,7 +141,14 @@ public async Task Worker_ExecutesTrain_UpdatesMetadata() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(2000); + var advanced = await WaitUntilAsync( + async () => + { + var m = await DataContext.Metadatas.FirstOrDefaultAsync(x => x.Id == metadata.Id); + return m is not null && m.TrainState != TrainState.Pending; + }, + WorkerCompletionTimeout + ); cts.Cancel(); try @@ -150,6 +158,7 @@ public async Task Worker_ExecutesTrain_UpdatesMetadata() catch (OperationCanceledException) { } // Assert - Metadata should be updated by the train execution + advanced.Should().BeTrue("the train should have advanced past Pending"); DataContext.Reset(); var updatedMetadata = await DataContext.Metadatas.FirstOrDefaultAsync(m => 
m.Id == metadata.Id @@ -192,7 +201,7 @@ public async Task Worker_DeletesJob_AfterSuccessfulExecution() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(2000); + var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout); cts.Cancel(); try @@ -202,6 +211,7 @@ public async Task Worker_DeletesJob_AfterSuccessfulExecution() catch (OperationCanceledException) { } // Assert + executed.Should().BeTrue("job should be deleted after successful execution"); DataContext.Reset(); var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId); remainingJob.Should().BeNull("job should be deleted after successful execution"); @@ -238,7 +248,7 @@ public async Task Worker_DeletesJob_AfterFailedExecution() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(2000); + var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout); cts.Cancel(); try @@ -248,6 +258,7 @@ public async Task Worker_DeletesJob_AfterFailedExecution() catch (OperationCanceledException) { } // Assert - Job should be deleted even on failure (matches AutoDeleteOnSuccessFilter behavior) + executed.Should().BeTrue("job should be deleted even after failed execution"); DataContext.Reset(); var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId); remainingJob.Should().BeNull("job should be deleted even after failed execution"); @@ -331,7 +342,7 @@ public async Task Worker_ReclainsStaleJob_AfterVisibilityTimeout() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(2000); + var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout); cts.Cancel(); try @@ -341,6 +352,7 @@ public async Task Worker_ReclainsStaleJob_AfterVisibilityTimeout() catch (OperationCanceledException) { } // Assert - The stale job should have been reclaimed and executed (then deleted) + executed.Should().BeTrue("stale job should be reclaimed and executed"); DataContext.Reset(); var 
remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId); remainingJob.Should().BeNull("stale job should be reclaimed and executed"); @@ -432,7 +444,7 @@ public async Task MultipleWorkers_ProcessMultipleJobs_NoDuplicates() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(3000); // Allow time for all jobs to be processed + var drained = await WaitForJobCount(0, WorkerCompletionTimeout); cts.Cancel(); try @@ -442,6 +454,7 @@ public async Task MultipleWorkers_ProcessMultipleJobs_NoDuplicates() catch (OperationCanceledException) { } // Assert - All jobs should have been processed and deleted + drained.Should().BeTrue("all jobs should be processed and deleted"); DataContext.Reset(); var remainingJobs = await DataContext.BackgroundJobs.CountAsync(); remainingJobs.Should().Be(0, "all jobs should be processed and deleted"); @@ -502,20 +515,48 @@ public async Task Worker_BatchSize1_ClaimsOneJobPerRound() remaining.Should().Be(0, "all jobs should be processed with BatchSize=1"); } - private async Task<bool> WaitForJobCount(int expected, TimeSpan timeout) + /// <summary> + /// Polls <paramref name="predicate"/> every 50ms until it returns true or + /// <paramref name="timeout"/> elapses. Resets the data context before each + /// evaluation so EF tracking does not return stale results. Returns true if + /// the predicate succeeded, false if the timeout fired. + /// </summary> + /// <remarks> + /// Use <see cref="WaitUntilAsync"/> instead of fixed Task.Delays when waiting for the worker to + /// process queued jobs. CI scheduling can stretch each train's effect-runner + /// pass well past historical local timings; fixed sleeps then race the + /// assertion. A poll-on-condition synchronises on the actual completion + /// signal and finishes as soon as it appears, with the timeout serving only + /// as a safety ceiling. 
+ /// </remarks> + private async Task<bool> WaitUntilAsync(Func<Task<bool>> predicate, TimeSpan timeout) { var deadline = DateTime.UtcNow + timeout; while (DateTime.UtcNow < deadline) { DataContext.Reset(); - var count = await DataContext.BackgroundJobs.CountAsync(); - if (count == expected) + if (await predicate()) return true; await Task.Delay(50); } return false; } + private Task<bool> WaitForJobCount(int expected, TimeSpan timeout) => + WaitUntilAsync( + async () => await DataContext.BackgroundJobs.CountAsync() == expected, + timeout + ); + + private Task<bool> WaitForJobAbsent(long jobId, TimeSpan timeout) => + WaitUntilAsync( + async () => + await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId) is null, + timeout + ); + + private static readonly TimeSpan WorkerCompletionTimeout = TimeSpan.FromSeconds(15); + [Test] public async Task Worker_BatchSize5_ClaimsMultipleJobsPerRound() { @@ -547,7 +588,7 @@ public async Task Worker_BatchSize5_ClaimsMultipleJobsPerRound() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(3000); + var drained = await WaitForJobCount(0, WorkerCompletionTimeout); cts.Cancel(); try @@ -557,6 +598,7 @@ public async Task Worker_BatchSize5_ClaimsMultipleJobsPerRound() catch (OperationCanceledException) { } // Assert - All 10 jobs should have been processed + drained.Should().BeTrue("all jobs should be processed with BatchSize=5"); DataContext.Reset(); var remaining = await DataContext.BackgroundJobs.CountAsync(); remaining.Should().Be(0, "all jobs should be processed with BatchSize=5"); @@ -593,7 +635,7 @@ public async Task Worker_BatchSize_LargerThanAvailable_ClaimsAllAvailable() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(2000); + var drained = await WaitForJobCount(0, WorkerCompletionTimeout); cts.Cancel(); try @@ -603,6 +645,9 @@ public async Task Worker_BatchSize_LargerThanAvailable_ClaimsAllAvailable() catch (OperationCanceledException) { } // Assert - All 3 available jobs should be processed + 
drained + .Should() + .BeTrue("all available jobs should be claimed even when BatchSize > available"); DataContext.Reset(); var remaining = await DataContext.BackgroundJobs.CountAsync(); remaining @@ -659,7 +704,7 @@ public async Task Worker_WithInputJob_DeserializesAndPassesToTrain() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(2000); + var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout); cts.Cancel(); try @@ -669,6 +714,7 @@ public async Task Worker_WithInputJob_DeserializesAndPassesToTrain() catch (OperationCanceledException) { } // Assert - Job should be executed and deleted + executed.Should().BeTrue("job with input should be executed and deleted"); DataContext.Reset(); var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId); remainingJob.Should().BeNull("job with input should be executed and deleted"); @@ -757,7 +803,7 @@ public async Task Worker_ClaimsHighPriorityJobs_BeforeLowPriority() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(3000); + var drained = await WaitForJobCount(0, WorkerCompletionTimeout); cts.Cancel(); try @@ -767,6 +813,7 @@ public async Task Worker_ClaimsHighPriorityJobs_BeforeLowPriority() catch (OperationCanceledException) { } // Assert - All jobs processed; high-priority should complete first (sequential in batch) + drained.Should().BeTrue("all jobs should be processed"); DataContext.Reset(); var remaining = await DataContext.BackgroundJobs.CountAsync(); remaining.Should().Be(0, "all jobs should be processed"); @@ -871,7 +918,7 @@ public async Task Worker_ClaimsFIFO_WithinSamePriority() ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(3000); + var drained = await WaitForJobCount(0, WorkerCompletionTimeout); cts.Cancel(); try @@ -881,6 +928,7 @@ public async Task Worker_ClaimsFIFO_WithinSamePriority() catch (OperationCanceledException) { } // Assert - FIFO within same priority: job1 completes 
before job2 before job3 + drained.Should().BeTrue("all jobs should be processed"); DataContext.Reset(); var metadatas = await DataContext .Metadatas.Where(m => m.Id == meta1.Id || m.Id == meta2.Id || m.Id == meta3.Id) @@ -976,7 +1024,7 @@ public async Task Worker_MixedPriorities_HighPriorityProcessedFirst_EvenIfCreate ); var workerTask = workerService.StartAsync(cts.Token); - await Task.Delay(3000); + var drained = await WaitForJobCount(0, WorkerCompletionTimeout); cts.Cancel(); try @@ -986,6 +1034,7 @@ public async Task Worker_MixedPriorities_HighPriorityProcessedFirst_EvenIfCreate catch (OperationCanceledException) { } // Assert - High priority job should complete first despite being created later + drained.Should().BeTrue("all jobs should be processed"); DataContext.Reset(); var highMeta = await DataContext.Metadatas.FirstAsync(m => m.Id == metaHigh.Id); var lowMeta = await DataContext.Metadatas.FirstAsync(m => m.Id == metaLow.Id);