From c4bf0119c22525d63f7646538b0f4927b3698c34 Mon Sep 17 00:00:00 2001
From: Theaux Masquelier <43664045+Theauxm@users.noreply.github.com>
Date: Wed, 1 Apr 2026 10:23:29 -0600
Subject: [PATCH] fix: improve scheduler startup resilience on low-resource
Postgres
Orphan manifest pruning timed out on a 2 vCPU Postgres instance because
EF Core inlined 5000+ expected external IDs as NOT IN(...) string
parameters, overwhelming the query planner. The fix loads all manifest
ID pairs with a lightweight SELECT and computes the orphan set in C#,
then deletes in batches of 500 by integer PK.
Also batches RecoverStuckJobs via ExecuteUpdateAsync (same pattern as
the reap junctions) and adds MaxWorkQueueEntriesPerCycle (default 200)
to cap work queue creation per manifest manager cycle.
---
.../Configuration/SchedulerConfiguration.cs | 14 ++
.../SchedulerConfigurationBuilder.Settings.cs | 18 ++
.../SchedulerStartupService.cs | 140 +++++++----
.../Services/TraxScheduler/TraxScheduler.cs | 18 +-
.../CreateWorkQueueEntriesJunction.cs | 11 +
.../ManifestManagerTrainTests.cs | 78 ++++++
.../OrphanManifestPruningTests.cs | 89 +++++++
.../IntegrationTests/RecoverStuckJobsTests.cs | 40 +++
.../IntegrationTests/QueryPerformanceTests.cs | 230 ++++++++++++++++--
9 files changed, 564 insertions(+), 74 deletions(-)
diff --git a/src/Trax.Scheduler/Configuration/SchedulerConfiguration.cs b/src/Trax.Scheduler/Configuration/SchedulerConfiguration.cs
index 03d58f3..d24dca6 100644
--- a/src/Trax.Scheduler/Configuration/SchedulerConfiguration.cs
+++ b/src/Trax.Scheduler/Configuration/SchedulerConfiguration.cs
@@ -241,6 +241,20 @@ public class SchedulerConfiguration
///
public int? MaxQueuedJobsPerCycle { get; set; } = 100;
+ ///
+ /// The maximum number of work queue entries created per ManifestManager polling cycle.
+ ///
+ ///
+ /// Controls how many entries
+ /// creates per cycle. When more manifests are due than this limit, excess manifests are
+ /// deferred to the next polling cycle (default: 5 seconds). This prevents a burst of DB
+ /// writes from saturating low-resource database instances, particularly after extended
+ /// downtime when many manifests become due simultaneously via the FireOnceNow
+ /// misfire policy.
+ /// Set to null to disable the limit (unlimited — all due manifests are enqueued per cycle).
+ ///
+ public int? MaxWorkQueueEntriesPerCycle { get; set; } = 200;
+
///
/// Whether to automatically prune manifests from the database that are no longer
/// defined in the startup configuration.
diff --git a/src/Trax.Scheduler/Configuration/SchedulerConfigurationBuilder/SchedulerConfigurationBuilder.Settings.cs b/src/Trax.Scheduler/Configuration/SchedulerConfigurationBuilder/SchedulerConfigurationBuilder.Settings.cs
index 7e2f3c4..73bf3e3 100644
--- a/src/Trax.Scheduler/Configuration/SchedulerConfigurationBuilder/SchedulerConfigurationBuilder.Settings.cs
+++ b/src/Trax.Scheduler/Configuration/SchedulerConfigurationBuilder/SchedulerConfigurationBuilder.Settings.cs
@@ -109,6 +109,24 @@ public SchedulerConfigurationBuilder MaxQueuedJobsPerCycle(int? limit)
return this;
}
+ ///
+ /// Sets the maximum number of work queue entries created per ManifestManager polling cycle.
+ ///
+ /// The maximum entries to create (default: 200, null = unlimited, minimum: 1)
+ /// The builder for method chaining
+ ///
+ /// Prevents a burst of DB writes after extended downtime when many manifests become due
+ /// simultaneously. Excess manifests are deferred to the next cycle (default: 5 seconds).
+ /// Set to null to disable the limit.
+ ///
+ public SchedulerConfigurationBuilder MaxWorkQueueEntriesPerCycle(int? limit)
+ {
+ _configuration.MaxWorkQueueEntriesPerCycle = limit.HasValue
+ ? Math.Max(1, limit.Value)
+ : null;
+ return this;
+ }
+
///
/// Excludes a train type from the MaxActiveJobs count.
///
diff --git a/src/Trax.Scheduler/Services/SchedulerStartupService/SchedulerStartupService.cs b/src/Trax.Scheduler/Services/SchedulerStartupService/SchedulerStartupService.cs
index 75f5c24..07fe576 100644
--- a/src/Trax.Scheduler/Services/SchedulerStartupService/SchedulerStartupService.cs
+++ b/src/Trax.Scheduler/Services/SchedulerStartupService/SchedulerStartupService.cs
@@ -42,36 +42,54 @@ private async Task RecoverStuckJobs(CancellationToken cancellationToken)
using var scope = serviceProvider.CreateScope();
var dataContext = scope.ServiceProvider.GetRequiredService();
- var stuckJobs = await dataContext
- .Metadatas.Where(m =>
- m.TrainState == TrainState.InProgress && m.StartTime < serverStartTime
- )
- .ToListAsync(cancellationToken);
+ var totalRecovered = 0;
- if (stuckJobs.Count == 0)
+ while (true)
{
- logger.LogInformation(
- "RecoverStuckJobs: no in-progress jobs found from before server start"
- );
- return;
- }
+ var stuckIds = await dataContext
+ .Metadatas.Where(m =>
+ m.TrainState == TrainState.InProgress && m.StartTime < serverStartTime
+ )
+ .OrderBy(m => m.Id)
+ .Select(m => m.Id)
+ .Take(PruneBatchSize)
+ .ToListAsync(cancellationToken);
- foreach (var metadata in stuckJobs)
- {
- metadata.TrainState = TrainState.Failed;
- metadata.EndTime = DateTime.UtcNow;
- metadata.AddException(
- new InvalidOperationException("Server restarted while job was in progress")
- );
- }
+ if (stuckIds.Count == 0)
+ break;
- await dataContext.SaveChanges(cancellationToken);
+ var now = DateTime.UtcNow;
- logger.LogWarning(
- "RecoverStuckJobs: failed {Count} stuck in-progress job(s) from before server start at {ServerStartTime}",
- stuckJobs.Count,
- serverStartTime
- );
+ await dataContext
+ .Metadatas.Where(m =>
+ stuckIds.Contains(m.Id) && m.TrainState == TrainState.InProgress
+ )
+ .ExecuteUpdateAsync(
+ s =>
+ s.SetProperty(m => m.TrainState, TrainState.Failed)
+ .SetProperty(m => m.EndTime, now)
+ .SetProperty(
+ m => m.FailureReason,
+ "Server restarted while job was in progress"
+ )
+ .SetProperty(m => m.FailureException, "ServerRestart")
+ .SetProperty(m => m.FailureJunction, nameof(SchedulerStartupService)),
+ cancellationToken
+ );
+
+ totalRecovered += stuckIds.Count;
+ }
+
+ if (totalRecovered > 0)
+ logger.LogWarning(
+ "RecoverStuckJobs: failed {Count} stuck in-progress job(s) from before server start at {ServerStartTime}",
+ totalRecovered,
+ serverStartTime
+ );
+ else
+ logger.LogInformation(
+ "RecoverStuckJobs: no in-progress jobs found from before server start"
+ );
}
private async Task SeedPendingManifests(CancellationToken cancellationToken)
@@ -207,27 +225,54 @@ private async Task PruneOrphanedManifestsAsync(
CancellationToken cancellationToken
)
{
- var totalPruned = 0;
+ // --- Server compute: load lightweight ID pairs, compute orphan set in C# ---
+ //
+ // Why not filter in the database?
+ // EF Core translates HashSet.Contains() into a SQL NOT IN(...) with every element
+ // as a literal parameter. With 5000+ expected IDs, this generates a massive SQL
+ // statement that can exceed Postgres's command timeout just in query planning on
+ // low-resource instances (2 vCPUs).
+ //
+ // Instead, we fetch all (id, external_id) pairs — a lightweight projection that
+ // transfers ~300KB even at 10K manifests — and compute the set difference in C#
+ // where it's a trivial O(n) HashSet lookup. The database only sees simple queries
+ // with small IN(...) clauses during the batched deletes.
+ var allManifests = await dataContext
+ .Manifests.Select(m => new { m.Id, m.ExternalId })
+ .ToListAsync(cancellationToken);
- while (true)
+ var orphanedManifestIds = allManifests
+ .Where(m => !expectedExternalIds.Contains(m.ExternalId))
+ .Select(m => m.Id)
+ .ToList();
+
+ if (orphanedManifestIds.Count == 0)
{
- // Fetch the next batch of orphaned manifest IDs
- var orphanedManifestIds = await dataContext
- .Manifests.Where(m => !expectedExternalIds.Contains(m.ExternalId))
- .OrderBy(m => m.Id)
- .Select(m => m.Id)
- .Take(PruneBatchSize)
- .ToListAsync(cancellationToken);
+ logger.LogDebug("No orphaned manifests found");
+ return;
+ }
- if (orphanedManifestIds.Count == 0)
- break;
+ logger.LogInformation(
+ "Found {OrphanCount} orphaned manifest(s) to prune (of {TotalCount} total)",
+ orphanedManifestIds.Count,
+ allManifests.Count
+ );
+
+ // --- Database compute: delete orphans in batches by integer PK ---
+ //
+ // Each batch generates WHERE id IN (1, 2, ..., 500) — 500 integer PKs is a
+ // trivial query plan for Postgres regardless of instance size.
+ var totalPruned = 0;
+
+ foreach (var batch in orphanedManifestIds.Chunk(PruneBatchSize))
+ {
+ var batchIds = batch.ToList();
// Clear self-referencing FK (DependsOnManifestId) for any manifest pointing to
// an orphan in this batch. Handles both orphan→orphan and kept→orphan references.
await dataContext
.Manifests.Where(m =>
- m.DependsOnManifestId.HasValue
- && orphanedManifestIds.Contains(m.DependsOnManifestId.Value)
+ m.DependsOnManifestId.HasValue && batchIds.Contains(m.DependsOnManifestId.Value)
)
.ExecuteUpdateAsync(
s => s.SetProperty(m => m.DependsOnManifestId, (long?)null),
@@ -237,22 +282,22 @@ await dataContext
// Delete in FK-dependency order: WorkQueues → DeadLetters → Metadata → Manifests
await dataContext
.WorkQueues.Where(w =>
- w.ManifestId.HasValue && orphanedManifestIds.Contains(w.ManifestId.Value)
+ w.ManifestId.HasValue && batchIds.Contains(w.ManifestId.Value)
)
.ExecuteDeleteAsync(cancellationToken);
await dataContext
- .DeadLetters.Where(d => orphanedManifestIds.Contains(d.ManifestId))
+ .DeadLetters.Where(d => batchIds.Contains(d.ManifestId))
.ExecuteDeleteAsync(cancellationToken);
await dataContext
.Metadatas.Where(m =>
- m.ManifestId.HasValue && orphanedManifestIds.Contains(m.ManifestId.Value)
+ m.ManifestId.HasValue && batchIds.Contains(m.ManifestId.Value)
)
.ExecuteDeleteAsync(cancellationToken);
var pruned = await dataContext
- .Manifests.Where(m => orphanedManifestIds.Contains(m.Id))
+ .Manifests.Where(m => batchIds.Contains(m.Id))
.ExecuteDeleteAsync(cancellationToken);
totalPruned += pruned;
@@ -264,12 +309,9 @@ await dataContext
);
}
- if (totalPruned > 0)
- logger.LogInformation(
- "Finished pruning {Count} orphaned manifest(s) from the database",
- totalPruned
- );
- else
- logger.LogDebug("No orphaned manifests found");
+ logger.LogInformation(
+ "Finished pruning {Count} orphaned manifest(s) from the database",
+ totalPruned
+ );
}
}
diff --git a/src/Trax.Scheduler/Services/TraxScheduler/TraxScheduler.cs b/src/Trax.Scheduler/Services/TraxScheduler/TraxScheduler.cs
index 8fcfe9a..d2b1aa5 100644
--- a/src/Trax.Scheduler/Services/TraxScheduler/TraxScheduler.cs
+++ b/src/Trax.Scheduler/Services/TraxScheduler/TraxScheduler.cs
@@ -1127,17 +1127,23 @@ private async Task PruneStaleManifestsAsync(
CancellationToken ct
)
{
- var staleManifestIds = await context
- .Manifests.Where(m =>
- m.ExternalId.StartsWith(prunePrefix) && !keepExternalIds.Contains(m.ExternalId)
- )
- .Select(m => m.Id)
+ // Server compute: load prefixed manifest IDs, filter stale ones in C#.
+ // Avoids a NOT IN(...) clause with many string parameters that can timeout
+ // on low-resource Postgres instances during query planning.
+ var prefixedManifests = await context
+ .Manifests.Where(m => m.ExternalId.StartsWith(prunePrefix))
+ .Select(m => new { m.Id, m.ExternalId })
.ToListAsync(ct);
+ var staleManifestIds = prefixedManifests
+ .Where(m => !keepExternalIds.Contains(m.ExternalId))
+ .Select(m => m.Id)
+ .ToList();
+
if (staleManifestIds.Count == 0)
return;
- // Delete in FK-dependency order: work_queue → dead_letters → metadata → manifests
+ // Database compute: delete by integer PK — small IN(...) clause per query.
await context
.WorkQueues.Where(w =>
w.ManifestId.HasValue && staleManifestIds.Contains(w.ManifestId.Value)
diff --git a/src/Trax.Scheduler/Trains/ManifestManager/Junctions/CreateWorkQueueEntriesJunction.cs b/src/Trax.Scheduler/Trains/ManifestManager/Junctions/CreateWorkQueueEntriesJunction.cs
index 6aa827d..4749096 100644
--- a/src/Trax.Scheduler/Trains/ManifestManager/Junctions/CreateWorkQueueEntriesJunction.cs
+++ b/src/Trax.Scheduler/Trains/ManifestManager/Junctions/CreateWorkQueueEntriesJunction.cs
@@ -33,6 +33,17 @@ public override async Task Run(List views)
views.Count
);
+ var limit = schedulerConfiguration.MaxWorkQueueEntriesPerCycle;
+ if (limit.HasValue && views.Count > limit.Value)
+ {
+ logger.LogInformation(
+ "Limiting to {Limit} of {Total} due manifests (excess deferred to next cycle)",
+ limit.Value,
+ views.Count
+ );
+ views = views.Take(limit.Value).ToList();
+ }
+
foreach (var view in views)
{
try
diff --git a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/ManifestManagerTrainTests.cs b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/ManifestManagerTrainTests.cs
index 63665e9..e0b4942 100644
--- a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/ManifestManagerTrainTests.cs
+++ b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/ManifestManagerTrainTests.cs
@@ -12,6 +12,7 @@
using Trax.Effect.Models.Metadata.DTOs;
using Trax.Effect.Models.WorkQueue;
using Trax.Effect.Models.WorkQueue.DTOs;
+using Trax.Scheduler.Configuration;
using Trax.Scheduler.Tests.Integration.Fakes.Trains;
using Trax.Scheduler.Tests.Integration.Fixtures;
using Trax.Scheduler.Trains.ManifestManager;
@@ -435,6 +436,83 @@ public async Task Run_WhenManifestIsQueued_CreatesWorkQueueWithCorrectTrainName(
#endregion
+ #region MaxWorkQueueEntriesPerCycle Tests
+
+ [Test]
+ public async Task Run_WithMaxWorkQueueEntriesPerCycle_LimitsEntriesCreated()
+ {
+ // Arrange — seed 10 overdue interval manifests, limit creation to 3 per cycle
+ for (var i = 0; i < 10; i++)
+ await CreateAndSaveManifest(
+ scheduleType: ScheduleType.Interval,
+ intervalSeconds: 60,
+ inputValue: $"Limited_{i}"
+ );
+
+ var config = Scope.ServiceProvider.GetRequiredService();
+ var originalLimit = config.MaxWorkQueueEntriesPerCycle;
+ config.MaxWorkQueueEntriesPerCycle = 3;
+
+ try
+ {
+ // Act
+ await _train.Run(Unit.Default);
+
+ // Assert
+ DataContext.Reset();
+ var entryCount = await DataContext
+ .WorkQueues.Where(q => q.Status == WorkQueueStatus.Queued)
+ .CountAsync();
+
+ entryCount
+ .Should()
+ .Be(
+ 3,
+ "only MaxWorkQueueEntriesPerCycle entries should be created, excess deferred"
+ );
+ }
+ finally
+ {
+ config.MaxWorkQueueEntriesPerCycle = originalLimit;
+ }
+ }
+
+ [Test]
+ public async Task Run_WithMaxWorkQueueEntriesPerCycleNull_CreatesAllEntries()
+ {
+ // Arrange — seed 10 overdue interval manifests with no limit
+ for (var i = 0; i < 10; i++)
+ await CreateAndSaveManifest(
+ scheduleType: ScheduleType.Interval,
+ intervalSeconds: 60,
+ inputValue: $"Unlimited_{i}"
+ );
+
+ var config = Scope.ServiceProvider.GetRequiredService();
+ var originalLimit = config.MaxWorkQueueEntriesPerCycle;
+ config.MaxWorkQueueEntriesPerCycle = null;
+
+ try
+ {
+ // Act
+ await _train.Run(Unit.Default);
+
+ // Assert
+ DataContext.Reset();
+ var entryCount = await DataContext
+ .WorkQueues.Where(q => q.Status == WorkQueueStatus.Queued)
+ .CountAsync();
+
+ entryCount.Should().Be(10, "null limit means all due manifests get entries");
+ }
+ finally
+ {
+ config.MaxWorkQueueEntriesPerCycle = originalLimit;
+ }
+ }
+
+ #endregion
+
#region Full Train Integration Tests
[Test]
diff --git a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/OrphanManifestPruningTests.cs b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/OrphanManifestPruningTests.cs
index e5f177f..23ab9bf 100644
--- a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/OrphanManifestPruningTests.cs
+++ b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/OrphanManifestPruningTests.cs
@@ -357,6 +357,95 @@ public async Task StartAsync_WithMoreOrphansThanBatchSize_PrunesAllOrphans()
remaining.Should().Contain("kept-manifest");
}
+ [Test]
+ public async Task StartAsync_WithLargeExpectedIdSet_PrunesOnlyOrphans()
+ {
+ // Arrange: Simulate a production scenario with many expected manifests and a few orphans.
+ // The previous implementation generated NOT IN('id1', ..., 'id100') which could timeout
+ // with large expected sets. The new implementation loads all IDs and filters in C#.
+ var expectedCount = 100;
+ var orphanCount = 5;
+
+ var expectedIds = new List();
+ for (var i = 0; i < expectedCount; i++)
+ {
+ var externalId = $"expected-{i}";
+ await CreateAndSaveManifestWithExternalId(externalId);
+ expectedIds.Add(externalId);
+ }
+
+ for (var i = 0; i < orphanCount; i++)
+ await CreateAndSaveManifestWithExternalId($"orphan-{i}");
+
+ var configuration = CreateConfiguration(
+ expectedExternalIds: expectedIds,
+ pruneOrphanedManifests: true
+ );
+
+ var startupService = CreateStartupService(configuration);
+
+ // Act
+ await startupService.StartAsync(CancellationToken.None);
+
+ // Assert
+ DataContext.Reset();
+ var remaining = await DataContext.Manifests.Select(m => m.ExternalId).ToListAsync();
+
+ remaining.Should().HaveCount(expectedCount);
+ remaining.Should().NotContain(m => m.StartsWith("orphan-"));
+ }
+
+ [Test]
+ public async Task StartAsync_WithLargeExpectedIdSetAndOrphanData_CascadeDeletesRelatedData()
+ {
+ // Arrange: Large expected set + orphans with related data to verify cascade works
+ // with the server-side filtering approach.
+ var expectedIds = new List();
+ for (var i = 0; i < 50; i++)
+ {
+ var externalId = $"expected-{i}";
+ await CreateAndSaveManifestWithExternalId(externalId);
+ expectedIds.Add(externalId);
+ }
+
+ var orphan = await CreateAndSaveManifestWithExternalId("orphan-with-data");
+ await CreateAndSaveWorkQueueEntry(orphan);
+ await CreateAndSaveDeadLetter(orphan);
+ await CreateAndSaveMetadata(orphan, TrainState.Completed);
+
+ var configuration = CreateConfiguration(
+ expectedExternalIds: expectedIds,
+ pruneOrphanedManifests: true
+ );
+
+ var startupService = CreateStartupService(configuration);
+
+ // Act
+ await startupService.StartAsync(CancellationToken.None);
+
+ // Assert
+ DataContext.Reset();
+
+ var remaining = await DataContext.Manifests.Select(m => m.ExternalId).ToListAsync();
+ remaining.Should().HaveCount(50);
+ remaining.Should().NotContain("orphan-with-data");
+
+ var orphanWorkQueues = await DataContext
+ .WorkQueues.Where(w => w.ManifestId == orphan.Id)
+ .CountAsync();
+ orphanWorkQueues.Should().Be(0);
+
+ var orphanDeadLetters = await DataContext
+ .DeadLetters.Where(d => d.ManifestId == orphan.Id)
+ .CountAsync();
+ orphanDeadLetters.Should().Be(0);
+
+ var orphanMetadata = await DataContext
+ .Metadatas.Where(m => m.ManifestId == orphan.Id)
+ .CountAsync();
+ orphanMetadata.Should().Be(0);
+ }
+
#endregion
#region Helper Methods
diff --git a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/RecoverStuckJobsTests.cs b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/RecoverStuckJobsTests.cs
index 94abba4..b325c61 100644
--- a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/RecoverStuckJobsTests.cs
+++ b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/RecoverStuckJobsTests.cs
@@ -47,6 +47,10 @@ public async Task StartAsync_WithStuckInProgressJobs_MarksThemAsFailed()
updated.TrainState.Should().Be(TrainState.Failed);
updated.EndTime.Should().NotBeNull();
updated.FailureReason.Should().Contain("Server restarted while job was in progress");
+ updated.FailureException.Should().Be("ServerRestart");
+ updated
+ .FailureJunction.Should()
+ .Be(nameof(SchedulerStartupService), "recovery sets FailureJunction to its source");
}
[Test]
@@ -170,6 +174,42 @@ await CreateAndSaveMetadata(
await act.Should().NotThrowAsync();
}
+ [Test]
+ public async Task StartAsync_WithMoreStuckJobsThanBatchSize_RecoversAllInBatches()
+ {
+ // Arrange — Create more stuck jobs than PruneBatchSize to verify the batching
+ // loop processes all batches until complete.
+ var count = SchedulerStartupService.PruneBatchSize + 10;
+ var metadataIds = new List();
+
+ for (var i = 0; i < count; i++)
+ {
+ var manifest = await CreateAndSaveManifest(inputValue: $"Stuck_{i}");
+ var metadata = await CreateAndSaveMetadata(
+ manifest,
+ TrainState.InProgress,
+ startTime: DateTime.UtcNow.AddMinutes(-10)
+ );
+ metadataIds.Add(metadata.Id);
+ }
+
+ var configuration = CreateRecoveryConfiguration(recoverStuckJobs: true);
+ var startupService = CreateStartupService(configuration);
+
+ // Act
+ await startupService.StartAsync(CancellationToken.None);
+
+ // Assert — all stuck jobs should be recovered
+ DataContext.Reset();
+ var failedCount = await DataContext
+ .Metadatas.Where(m => metadataIds.Contains(m.Id) && m.TrainState == TrainState.Failed)
+ .CountAsync();
+
+ failedCount
+ .Should()
+ .Be(count, "all stuck jobs should be recovered across multiple batches");
+ }
+
#endregion
#region Helper Methods
diff --git a/tests/Trax.Scheduler.Tests.Stress/IntegrationTests/QueryPerformanceTests.cs b/tests/Trax.Scheduler.Tests.Stress/IntegrationTests/QueryPerformanceTests.cs
index 26df453..0a4f478 100644
--- a/tests/Trax.Scheduler.Tests.Stress/IntegrationTests/QueryPerformanceTests.cs
+++ b/tests/Trax.Scheduler.Tests.Stress/IntegrationTests/QueryPerformanceTests.cs
@@ -852,34 +852,43 @@ FOR UPDATE SKIP LOCKED
#region Orphan Manifest Pruning at Scale
[Test]
- public async Task PruneOrphanedManifests_Batched_AtStartupScale_CompletesWithinTimeout()
+ public async Task PruneOrphanedManifests_ServerSideFilter_CompletesWithinTimeout()
{
- // Simulates SchedulerStartupService.PruneOrphanedManifestsAsync with batched deletes.
- // With 500 manifests and batch size 500, this exercises at least one full batch.
+ // Simulates SchedulerStartupService.PruneOrphanedManifestsAsync:
+ // 1. Server compute: load all (id, external_id) pairs, compute orphan set in C#
+ // 2. Database compute: delete orphans in batches by integer PK
+ //
+ // This avoids the NOT IN(...) clause with many string parameters that caused
+ // command timeouts on low-resource Postgres instances.
var expectedIds = _manifests.Take(50).Select(m => m.ExternalId).ToHashSet();
var batchSize = SchedulerStartupService.PruneBatchSize;
var elapsed = await AssertCompletesWithin(
async () =>
{
+ // Server compute: lightweight projection, filter in C#
+ var allManifests = await DataContext
+ .Manifests.Select(m => new { m.Id, m.ExternalId })
+ .ToListAsync();
+
+ var orphanedIds = allManifests
+ .Where(m => !expectedIds.Contains(m.ExternalId))
+ .Select(m => m.Id)
+ .ToList();
+
+ orphanedIds.Should().HaveCount(ManifestCount - 50);
+
+ // Database compute: delete in batches by integer PK
var totalPruned = 0;
- while (true)
+ foreach (var batch in orphanedIds.Chunk(batchSize))
{
- var orphanedIds = await DataContext
- .Manifests.Where(m => !expectedIds.Contains(m.ExternalId))
- .OrderBy(m => m.Id)
- .Select(m => m.Id)
- .Take(batchSize)
- .ToListAsync();
-
- if (orphanedIds.Count == 0)
- break;
+ var batchIds = batch.ToList();
await DataContext
.Manifests.Where(m =>
m.DependsOnManifestId.HasValue
- && orphanedIds.Contains(m.DependsOnManifestId.Value)
+ && batchIds.Contains(m.DependsOnManifestId.Value)
)
.ExecuteUpdateAsync(s =>
s.SetProperty(m => m.DependsOnManifestId, (long?)null)
@@ -887,22 +896,22 @@ await DataContext
await DataContext
.WorkQueues.Where(w =>
- w.ManifestId.HasValue && orphanedIds.Contains(w.ManifestId.Value)
+ w.ManifestId.HasValue && batchIds.Contains(w.ManifestId.Value)
)
.ExecuteDeleteAsync();
await DataContext
- .DeadLetters.Where(d => orphanedIds.Contains(d.ManifestId))
+ .DeadLetters.Where(d => batchIds.Contains(d.ManifestId))
.ExecuteDeleteAsync();
await DataContext
.Metadatas.Where(m =>
- m.ManifestId.HasValue && orphanedIds.Contains(m.ManifestId.Value)
+ m.ManifestId.HasValue && batchIds.Contains(m.ManifestId.Value)
)
.ExecuteDeleteAsync();
totalPruned += await DataContext
- .Manifests.Where(m => orphanedIds.Contains(m.Id))
+ .Manifests.Where(m => batchIds.Contains(m.Id))
.ExecuteDeleteAsync();
}
@@ -912,7 +921,190 @@ await DataContext
);
TestContext.Out.WriteLine(
- $"PruneOrphanedManifests batched ({ManifestCount - 50} pruned, batch size {batchSize}): {elapsed.TotalMilliseconds:F0}ms"
+ $"PruneOrphanedManifests server-side filter ({ManifestCount - 50} pruned, batch size {batchSize}): {elapsed.TotalMilliseconds:F0}ms"
+ );
+ }
+
+ [Test]
+ public async Task PruneOrphanedManifests_LargeExpectedSet_CompletesWithinTimeout()
+ {
+ // Simulates the SuiteMirror production scenario: 450 expected manifests (large
+ // expected set), 50 orphans to prune. The previous NOT IN('id1', ..., 'id450')
+ // approach would generate a massive SQL statement. The server-side filter loads
+ // all 500 ID pairs (~15KB) and computes the diff in C# with O(n) HashSet lookups.
+ var expectedIds = _manifests.Take(ManifestCount - 50).Select(m => m.ExternalId).ToHashSet();
+ var batchSize = SchedulerStartupService.PruneBatchSize;
+
+ var elapsed = await AssertCompletesWithin(
+ async () =>
+ {
+ var allManifests = await DataContext
+ .Manifests.Select(m => new { m.Id, m.ExternalId })
+ .ToListAsync();
+
+ var orphanedIds = allManifests
+ .Where(m => !expectedIds.Contains(m.ExternalId))
+ .Select(m => m.Id)
+ .ToList();
+
+ orphanedIds.Should().HaveCount(50);
+
+ var totalPruned = 0;
+
+ foreach (var batch in orphanedIds.Chunk(batchSize))
+ {
+ var batchIds = batch.ToList();
+
+ await DataContext
+ .Manifests.Where(m =>
+ m.DependsOnManifestId.HasValue
+ && batchIds.Contains(m.DependsOnManifestId.Value)
+ )
+ .ExecuteUpdateAsync(s =>
+ s.SetProperty(m => m.DependsOnManifestId, (long?)null)
+ );
+
+ await DataContext
+ .WorkQueues.Where(w =>
+ w.ManifestId.HasValue && batchIds.Contains(w.ManifestId.Value)
+ )
+ .ExecuteDeleteAsync();
+
+ await DataContext
+ .DeadLetters.Where(d => batchIds.Contains(d.ManifestId))
+ .ExecuteDeleteAsync();
+
+ await DataContext
+ .Metadatas.Where(m =>
+ m.ManifestId.HasValue && batchIds.Contains(m.ManifestId.Value)
+ )
+ .ExecuteDeleteAsync();
+
+ totalPruned += await DataContext
+ .Manifests.Where(m => batchIds.Contains(m.Id))
+ .ExecuteDeleteAsync();
+ }
+
+ totalPruned.Should().Be(50);
+ },
+ TimeSpan.FromSeconds(60)
+ );
+
+ TestContext.Out.WriteLine(
+ $"PruneOrphanedManifests large expected set ({ManifestCount - 50} expected, 50 pruned): {elapsed.TotalMilliseconds:F0}ms"
+ );
+ }
+
+ [Test]
+ public async Task PruneOrphanedManifests_5KOrphans_CompletesWithinTimeout()
+ {
+ // Simulates the SuiteMirror production scenario at full scale: 5000 orphaned
+ // manifests to prune with 500 expected manifests to keep. This is the exact
+ // scenario that caused command timeouts — the old NOT IN(...) approach would
+ // inline 500 string parameters into the query, which Postgres struggled to plan
+ // on a 2 vCPU instance. The server-side filter loads all 5500 ID pairs (~165KB)
+ // and computes the diff in C#, then deletes in 10 batches of 500 integer PKs.
+ const int orphanCount = 5000;
+
+ // Seed 5000 additional orphan manifests (baseline 500 are the "expected" set)
+ var orphanGroup = await SeedManifestGroup("orphan-5k-group");
+ var orphans = new List(orphanCount);
+ for (var i = 0; i < orphanCount; i++)
+ {
+ var manifest = Manifest.Create(
+ new CreateManifest
+ {
+ Name = typeof(StressTestTrain),
+ IsEnabled = true,
+ ScheduleType = ScheduleType.Interval,
+ IntervalSeconds = 60,
+ MaxRetries = 3,
+ Properties = new StressTestInput { Value = $"orphan-{i}" },
+ }
+ );
+ manifest.ExternalId = $"orphan-{i}";
+ manifest.ManifestGroupId = orphanGroup.Id;
+ await DataContext.Track(manifest);
+ orphans.Add(manifest);
+ }
+ await DataContext.SaveChanges(CancellationToken.None);
+ DataContext.Reset();
+
+ TestContext.Out.WriteLine(
+ $"Seeded {orphanCount} orphan manifests (total: {ManifestCount + orphanCount})"
+ );
+
+ // The 500 baseline manifests (stress-0..stress-499) are the expected set
+ var expectedIds = _manifests.Select(m => m.ExternalId).ToHashSet();
+ var batchSize = SchedulerStartupService.PruneBatchSize;
+
+ var elapsed = await AssertCompletesWithin(
+ async () =>
+ {
+ // Server compute: load all (id, external_id) pairs, filter in C#
+ var allManifests = await DataContext
+ .Manifests.Select(m => new { m.Id, m.ExternalId })
+ .ToListAsync();
+
+ allManifests.Should().HaveCount(ManifestCount + orphanCount);
+
+ var orphanedIds = allManifests
+ .Where(m => !expectedIds.Contains(m.ExternalId))
+ .Select(m => m.Id)
+ .ToList();
+
+ orphanedIds.Should().HaveCount(orphanCount);
+
+ // Database compute: delete in batches of 500 by integer PK
+ var totalPruned = 0;
+
+ foreach (var batch in orphanedIds.Chunk(batchSize))
+ {
+ var batchIds = batch.ToList();
+
+ await DataContext
+ .Manifests.Where(m =>
+ m.DependsOnManifestId.HasValue
+ && batchIds.Contains(m.DependsOnManifestId.Value)
+ )
+ .ExecuteUpdateAsync(s =>
+ s.SetProperty(m => m.DependsOnManifestId, (long?)null)
+ );
+
+ await DataContext
+ .WorkQueues.Where(w =>
+ w.ManifestId.HasValue && batchIds.Contains(w.ManifestId.Value)
+ )
+ .ExecuteDeleteAsync();
+
+ await DataContext
+ .DeadLetters.Where(d => batchIds.Contains(d.ManifestId))
+ .ExecuteDeleteAsync();
+
+ await DataContext
+ .Metadatas.Where(m =>
+ m.ManifestId.HasValue && batchIds.Contains(m.ManifestId.Value)
+ )
+ .ExecuteDeleteAsync();
+
+ totalPruned += await DataContext
+ .Manifests.Where(m => batchIds.Contains(m.Id))
+ .ExecuteDeleteAsync();
+ }
+
+ totalPruned.Should().Be(orphanCount);
+
+ // Verify expected manifests are untouched
+ var remainingCount = await DataContext.Manifests.CountAsync();
+ remainingCount.Should().Be(ManifestCount);
+ },
+ TimeSpan.FromSeconds(120)
+ );
+
+ TestContext.Out.WriteLine(
+ $"PruneOrphanedManifests 5K orphans ({orphanCount} pruned, {ManifestCount} kept, "
+ + $"batch size {batchSize}, {(orphanCount + batchSize - 1) / batchSize} batches): "
+ + $"{elapsed.TotalMilliseconds:F0}ms"
);
}