Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public async Task Worker_ClaimsAndExecutes_AvailableJob()
Scope.ServiceProvider.GetRequiredService<ISqlDialect>()
);

// Start the worker and give it time to process
// Start the worker and wait for it to process the job
var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(2000); // Allow enough time for claim + execute + cleanup
var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -91,6 +91,7 @@ public async Task Worker_ClaimsAndExecutes_AvailableJob()
}

// Assert - The job should have been executed and deleted
executed.Should().BeTrue("job should be deleted after execution");
DataContext.Reset();
var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId);
remainingJob.Should().BeNull("job should be deleted after execution");
Expand Down Expand Up @@ -140,7 +141,14 @@ public async Task Worker_ExecutesTrain_UpdatesMetadata()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(2000);
var advanced = await WaitUntilAsync(
async () =>
{
var m = await DataContext.Metadatas.FirstOrDefaultAsync(x => x.Id == metadata.Id);
return m is not null && m.TrainState != TrainState.Pending;
},
WorkerCompletionTimeout
);
cts.Cancel();

try
Expand All @@ -150,6 +158,7 @@ public async Task Worker_ExecutesTrain_UpdatesMetadata()
catch (OperationCanceledException) { }

// Assert - Metadata should be updated by the train execution
advanced.Should().BeTrue("the train should have advanced past Pending");
DataContext.Reset();
var updatedMetadata = await DataContext.Metadatas.FirstOrDefaultAsync(m =>
m.Id == metadata.Id
Expand Down Expand Up @@ -192,7 +201,7 @@ public async Task Worker_DeletesJob_AfterSuccessfulExecution()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(2000);
var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -202,6 +211,7 @@ public async Task Worker_DeletesJob_AfterSuccessfulExecution()
catch (OperationCanceledException) { }

// Assert
executed.Should().BeTrue("job should be deleted after successful execution");
DataContext.Reset();
var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId);
remainingJob.Should().BeNull("job should be deleted after successful execution");
Expand Down Expand Up @@ -238,7 +248,7 @@ public async Task Worker_DeletesJob_AfterFailedExecution()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(2000);
var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -248,6 +258,7 @@ public async Task Worker_DeletesJob_AfterFailedExecution()
catch (OperationCanceledException) { }

// Assert - Job should be deleted even on failure (matches AutoDeleteOnSuccessFilter behavior)
executed.Should().BeTrue("job should be deleted even after failed execution");
DataContext.Reset();
var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId);
remainingJob.Should().BeNull("job should be deleted even after failed execution");
Expand Down Expand Up @@ -331,7 +342,7 @@ public async Task Worker_ReclainsStaleJob_AfterVisibilityTimeout()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(2000);
var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -341,6 +352,7 @@ public async Task Worker_ReclainsStaleJob_AfterVisibilityTimeout()
catch (OperationCanceledException) { }

// Assert - The stale job should have been reclaimed and executed (then deleted)
executed.Should().BeTrue("stale job should be reclaimed and executed");
DataContext.Reset();
var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId);
remainingJob.Should().BeNull("stale job should be reclaimed and executed");
Expand Down Expand Up @@ -432,7 +444,7 @@ public async Task MultipleWorkers_ProcessMultipleJobs_NoDuplicates()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(3000); // Allow time for all jobs to be processed
var drained = await WaitForJobCount(0, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -442,6 +454,7 @@ public async Task MultipleWorkers_ProcessMultipleJobs_NoDuplicates()
catch (OperationCanceledException) { }

// Assert - All jobs should have been processed and deleted
drained.Should().BeTrue("all jobs should be processed and deleted");
DataContext.Reset();
var remainingJobs = await DataContext.BackgroundJobs.CountAsync();
remainingJobs.Should().Be(0, "all jobs should be processed and deleted");
Expand Down Expand Up @@ -502,20 +515,48 @@ public async Task Worker_BatchSize1_ClaimsOneJobPerRound()
remaining.Should().Be(0, "all jobs should be processed with BatchSize=1");
}

/// <summary>
/// Polls <paramref name="predicate"/> every 50ms until it returns true or
/// <paramref name="timeout"/> elapses. Resets the data context before each
/// evaluation so EF change tracking does not return stale results. Returns
/// true if the predicate succeeded, false if the timeout fired.
/// </summary>
/// <remarks>
/// Use instead of fixed <c>Task.Delay</c>s when waiting for the worker to
/// process queued jobs. CI scheduling can stretch each train's effect-runner
/// pass well past historical local timings; fixed sleeps then race the
/// assertion. A poll-on-condition synchronises on the actual completion
/// signal and finishes as soon as it appears, with the timeout serving only
/// as a safety ceiling.
/// </remarks>
private async Task<bool> WaitUntilAsync(Func<Task<bool>> predicate, TimeSpan timeout)
{
    // Stopwatch is monotonic; a wall-clock deadline (DateTime.UtcNow) can
    // silently shrink or stretch the timeout if the system clock is adjusted
    // mid-test.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    while (stopwatch.Elapsed < timeout)
    {
        // Drop cached entities so each probe queries the database rather
        // than the EF change tracker.
        DataContext.Reset();
        if (await predicate())
            return true;
        await Task.Delay(50);
    }
    return false;
}

/// <summary>
/// Polls until the background-job table holds exactly
/// <paramref name="expected"/> rows, or <paramref name="timeout"/> elapses.
/// Returns true if the count was reached, false on timeout.
/// </summary>
private Task<bool> WaitForJobCount(int expected, TimeSpan timeout) =>
    WaitUntilAsync(
        async () =>
        {
            var jobCount = await DataContext.BackgroundJobs.CountAsync();
            return jobCount == expected;
        },
        timeout
    );

/// <summary>
/// Polls until no background job with id <paramref name="jobId"/> remains
/// in the table, or <paramref name="timeout"/> elapses. Returns true if the
/// job disappeared, false on timeout.
/// </summary>
private Task<bool> WaitForJobAbsent(long jobId, TimeSpan timeout) =>
    WaitUntilAsync(
        async () =>
        {
            var job = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j =>
                j.Id == jobId
            );
            return job is null;
        },
        timeout
    );

private static readonly TimeSpan WorkerCompletionTimeout = TimeSpan.FromSeconds(15);

[Test]
public async Task Worker_BatchSize5_ClaimsMultipleJobsPerRound()
{
Expand Down Expand Up @@ -547,7 +588,7 @@ public async Task Worker_BatchSize5_ClaimsMultipleJobsPerRound()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(3000);
var drained = await WaitForJobCount(0, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -557,6 +598,7 @@ public async Task Worker_BatchSize5_ClaimsMultipleJobsPerRound()
catch (OperationCanceledException) { }

// Assert - All 10 jobs should have been processed
drained.Should().BeTrue("all jobs should be processed with BatchSize=5");
DataContext.Reset();
var remaining = await DataContext.BackgroundJobs.CountAsync();
remaining.Should().Be(0, "all jobs should be processed with BatchSize=5");
Expand Down Expand Up @@ -593,7 +635,7 @@ public async Task Worker_BatchSize_LargerThanAvailable_ClaimsAllAvailable()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(2000);
var drained = await WaitForJobCount(0, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -603,6 +645,9 @@ public async Task Worker_BatchSize_LargerThanAvailable_ClaimsAllAvailable()
catch (OperationCanceledException) { }

// Assert - All 3 available jobs should be processed
drained
.Should()
.BeTrue("all available jobs should be claimed even when BatchSize > available");
DataContext.Reset();
var remaining = await DataContext.BackgroundJobs.CountAsync();
remaining
Expand Down Expand Up @@ -659,7 +704,7 @@ public async Task Worker_WithInputJob_DeserializesAndPassesToTrain()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(2000);
var executed = await WaitForJobAbsent(jobId, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -669,6 +714,7 @@ public async Task Worker_WithInputJob_DeserializesAndPassesToTrain()
catch (OperationCanceledException) { }

// Assert - Job should be executed and deleted
executed.Should().BeTrue("job with input should be executed and deleted");
DataContext.Reset();
var remainingJob = await DataContext.BackgroundJobs.FirstOrDefaultAsync(j => j.Id == jobId);
remainingJob.Should().BeNull("job with input should be executed and deleted");
Expand Down Expand Up @@ -757,7 +803,7 @@ public async Task Worker_ClaimsHighPriorityJobs_BeforeLowPriority()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(3000);
var drained = await WaitForJobCount(0, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -767,6 +813,7 @@ public async Task Worker_ClaimsHighPriorityJobs_BeforeLowPriority()
catch (OperationCanceledException) { }

// Assert - All jobs processed; high-priority should complete first (sequential in batch)
drained.Should().BeTrue("all jobs should be processed");
DataContext.Reset();
var remaining = await DataContext.BackgroundJobs.CountAsync();
remaining.Should().Be(0, "all jobs should be processed");
Expand Down Expand Up @@ -871,7 +918,7 @@ public async Task Worker_ClaimsFIFO_WithinSamePriority()
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(3000);
var drained = await WaitForJobCount(0, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -881,6 +928,7 @@ public async Task Worker_ClaimsFIFO_WithinSamePriority()
catch (OperationCanceledException) { }

// Assert - FIFO within same priority: job1 completes before job2 before job3
drained.Should().BeTrue("all jobs should be processed");
DataContext.Reset();
var metadatas = await DataContext
.Metadatas.Where(m => m.Id == meta1.Id || m.Id == meta2.Id || m.Id == meta3.Id)
Expand Down Expand Up @@ -976,7 +1024,7 @@ public async Task Worker_MixedPriorities_HighPriorityProcessedFirst_EvenIfCreate
);

var workerTask = workerService.StartAsync(cts.Token);
await Task.Delay(3000);
var drained = await WaitForJobCount(0, WorkerCompletionTimeout);
cts.Cancel();

try
Expand All @@ -986,6 +1034,7 @@ public async Task Worker_MixedPriorities_HighPriorityProcessedFirst_EvenIfCreate
catch (OperationCanceledException) { }

// Assert - High priority job should complete first despite being created later
drained.Should().BeTrue("all jobs should be processed");
DataContext.Reset();
var highMeta = await DataContext.Metadatas.FirstAsync(m => m.Id == metaHigh.Id);
var lowMeta = await DataContext.Metadatas.FirstAsync(m => m.Id == metaLow.Id);
Expand Down
Loading