Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Trax.Scheduler/Configuration/SchedulerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,20 @@ public class SchedulerConfiguration
/// </remarks>
public int? MaxQueuedJobsPerCycle { get; set; } = 100;

/// <summary>
/// The maximum number of work queue entries created per ManifestManager polling cycle.
/// Defaults to 200; <c>null</c> disables the limit.
/// </summary>
/// <remarks>
/// <para>
/// Controls how many entries <see cref="Trains.ManifestManager.Junctions.CreateWorkQueueEntriesJunction"/>
/// creates per cycle. When more manifests are due than this limit, excess manifests are
/// deferred to the next polling cycle (default: 5 seconds). This prevents a burst of DB
/// writes from saturating low-resource database instances, particularly after extended
/// downtime when many manifests become due simultaneously via the <c>FireOnceNow</c>
/// misfire policy.
/// </para>
/// <para>
/// Set to null to disable the limit (unlimited — all due manifests are enqueued per cycle).
/// </para>
/// <para>
/// NOTE(review): this setter stores whatever value it is given. The builder method of the
/// same name raises values below 1 to 1, but direct assignment is not clamped, so a value
/// of 0 or less presumably prevents any entries from being created — confirm whether
/// direct assignment should be validated as well.
/// </para>
/// </remarks>
public int? MaxWorkQueueEntriesPerCycle { get; set; } = 200;

/// <summary>
/// Whether to automatically prune manifests from the database that are no longer
/// defined in the startup configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,24 @@ public SchedulerConfigurationBuilder MaxQueuedJobsPerCycle(int? limit)
return this;
}

/// <summary>
/// Caps how many work queue entries the ManifestManager may create in a single polling cycle.
/// </summary>
/// <param name="limit">
/// Upper bound on entries created per cycle (default: 200). Values below 1 are raised to 1;
/// <c>null</c> removes the cap entirely.
/// </param>
/// <returns>This builder, so that further configuration calls can be chained.</returns>
/// <remarks>
/// Guards against a burst of DB writes after extended downtime, when many manifests become
/// due at the same moment. Manifests beyond the cap are picked up on the next cycle
/// (default: 5 seconds). Pass null to disable the limit.
/// </remarks>
public SchedulerConfigurationBuilder MaxWorkQueueEntriesPerCycle(int? limit)
{
    if (limit is int requested)
    {
        // A cap below 1 is raised to the documented minimum of 1.
        _configuration.MaxWorkQueueEntriesPerCycle = Math.Max(1, requested);
    }
    else
    {
        // null means "no limit".
        _configuration.MaxWorkQueueEntriesPerCycle = null;
    }

    return this;
}

/// <summary>
/// Excludes a train type from the MaxActiveJobs count.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,54 @@ private async Task RecoverStuckJobs(CancellationToken cancellationToken)
using var scope = serviceProvider.CreateScope();
var dataContext = scope.ServiceProvider.GetRequiredService<IDataContext>();

var stuckJobs = await dataContext
.Metadatas.Where(m =>
m.TrainState == TrainState.InProgress && m.StartTime < serverStartTime
)
.ToListAsync(cancellationToken);
var totalRecovered = 0;

if (stuckJobs.Count == 0)
while (true)
{
logger.LogInformation(
"RecoverStuckJobs: no in-progress jobs found from before server start"
);
return;
}
var stuckIds = await dataContext
.Metadatas.Where(m =>
m.TrainState == TrainState.InProgress && m.StartTime < serverStartTime
)
.OrderBy(m => m.Id)
.Select(m => m.Id)
.Take(PruneBatchSize)
.ToListAsync(cancellationToken);

foreach (var metadata in stuckJobs)
{
metadata.TrainState = TrainState.Failed;
metadata.EndTime = DateTime.UtcNow;
metadata.AddException(
new InvalidOperationException("Server restarted while job was in progress")
);
}
if (stuckIds.Count == 0)
break;

await dataContext.SaveChanges(cancellationToken);
var now = DateTime.UtcNow;

logger.LogWarning(
"RecoverStuckJobs: failed {Count} stuck in-progress job(s) from before server start at {ServerStartTime}",
stuckJobs.Count,
serverStartTime
);
await dataContext
.Metadatas.Where(m =>
stuckIds.Contains(m.Id) && m.TrainState == TrainState.InProgress
)
.ExecuteUpdateAsync(
s =>
s.SetProperty(m => m.TrainState, TrainState.Failed)
.SetProperty(m => m.EndTime, now)
.SetProperty(
m => m.FailureReason,
"Server restarted while job was in progress"
)
.SetProperty(m => m.FailureException, "ServerRestart")
.SetProperty(m => m.FailureJunction, nameof(SchedulerStartupService)),
cancellationToken
);

totalRecovered += stuckIds.Count;
}

if (totalRecovered > 0)
logger.LogWarning(
"RecoverStuckJobs: failed {Count} stuck in-progress job(s) from before server start at {ServerStartTime}",
totalRecovered,
serverStartTime
);
else
logger.LogInformation(
"RecoverStuckJobs: no in-progress jobs found from before server start"
);
}

private async Task SeedPendingManifests(CancellationToken cancellationToken)
Expand Down Expand Up @@ -207,27 +225,54 @@ private async Task PruneOrphanedManifestsAsync(
CancellationToken cancellationToken
)
{
var totalPruned = 0;
// --- Server compute: load lightweight ID pairs, compute orphan set in C# ---
//
// Why not filter in the database?
// EF Core translates HashSet.Contains() into a SQL NOT IN(...) with every element
// as a literal parameter. With 5000+ expected IDs, this generates a massive SQL
// statement that can exceed Postgres's command timeout just in query planning on
// low-resource instances (2 vCPUs).
//
// Instead, we fetch all (id, external_id) pairs — a lightweight projection that
// transfers ~300KB even at 10K manifests — and compute the set difference in C#
// where it's a trivial O(n) HashSet lookup. The database only sees simple queries
// with small IN(...) clauses during the batched deletes.
var allManifests = await dataContext
.Manifests.Select(m => new { m.Id, m.ExternalId })
.ToListAsync(cancellationToken);

while (true)
var orphanedManifestIds = allManifests
.Where(m => !expectedExternalIds.Contains(m.ExternalId))
.Select(m => m.Id)
.ToList();

if (orphanedManifestIds.Count == 0)
{
// Fetch the next batch of orphaned manifest IDs
var orphanedManifestIds = await dataContext
.Manifests.Where(m => !expectedExternalIds.Contains(m.ExternalId))
.OrderBy(m => m.Id)
.Select(m => m.Id)
.Take(PruneBatchSize)
.ToListAsync(cancellationToken);
logger.LogDebug("No orphaned manifests found");
return;
}

if (orphanedManifestIds.Count == 0)
break;
logger.LogInformation(
"Found {OrphanCount} orphaned manifest(s) to prune (of {TotalCount} total)",
orphanedManifestIds.Count,
allManifests.Count
);

// --- Database compute: delete orphans in batches by integer PK ---
//
// Each batch generates WHERE id IN (1, 2, ..., 500) — 500 integer PKs is a
// trivial query plan for Postgres regardless of instance size.
var totalPruned = 0;

foreach (var batch in orphanedManifestIds.Chunk(PruneBatchSize))
{
var batchIds = batch.ToList();

// Clear self-referencing FK (DependsOnManifestId) for any manifest pointing to
// an orphan in this batch. Handles both orphan→orphan and kept→orphan references.
await dataContext
.Manifests.Where(m =>
m.DependsOnManifestId.HasValue
&& orphanedManifestIds.Contains(m.DependsOnManifestId.Value)
m.DependsOnManifestId.HasValue && batchIds.Contains(m.DependsOnManifestId.Value)
)
.ExecuteUpdateAsync(
s => s.SetProperty(m => m.DependsOnManifestId, (long?)null),
Expand All @@ -237,22 +282,22 @@ await dataContext
// Delete in FK-dependency order: WorkQueues → DeadLetters → Metadata → Manifests
await dataContext
.WorkQueues.Where(w =>
w.ManifestId.HasValue && orphanedManifestIds.Contains(w.ManifestId.Value)
w.ManifestId.HasValue && batchIds.Contains(w.ManifestId.Value)
)
.ExecuteDeleteAsync(cancellationToken);

await dataContext
.DeadLetters.Where(d => orphanedManifestIds.Contains(d.ManifestId))
.DeadLetters.Where(d => batchIds.Contains(d.ManifestId))
.ExecuteDeleteAsync(cancellationToken);

await dataContext
.Metadatas.Where(m =>
m.ManifestId.HasValue && orphanedManifestIds.Contains(m.ManifestId.Value)
m.ManifestId.HasValue && batchIds.Contains(m.ManifestId.Value)
)
.ExecuteDeleteAsync(cancellationToken);

var pruned = await dataContext
.Manifests.Where(m => orphanedManifestIds.Contains(m.Id))
.Manifests.Where(m => batchIds.Contains(m.Id))
.ExecuteDeleteAsync(cancellationToken);

totalPruned += pruned;
Expand All @@ -264,12 +309,9 @@ await dataContext
);
}

if (totalPruned > 0)
logger.LogInformation(
"Finished pruning {Count} orphaned manifest(s) from the database",
totalPruned
);
else
logger.LogDebug("No orphaned manifests found");
logger.LogInformation(
"Finished pruning {Count} orphaned manifest(s) from the database",
totalPruned
);
}
}
18 changes: 12 additions & 6 deletions src/Trax.Scheduler/Services/TraxScheduler/TraxScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,17 +1127,23 @@ private async Task PruneStaleManifestsAsync(
CancellationToken ct
)
{
var staleManifestIds = await context
.Manifests.Where(m =>
m.ExternalId.StartsWith(prunePrefix) && !keepExternalIds.Contains(m.ExternalId)
)
.Select(m => m.Id)
// Server compute: load prefixed manifest IDs, filter stale ones in C#.
// Avoids a NOT IN(...) clause with many string parameters that can timeout
// on low-resource Postgres instances during query planning.
var prefixedManifests = await context
.Manifests.Where(m => m.ExternalId.StartsWith(prunePrefix))
.Select(m => new { m.Id, m.ExternalId })
.ToListAsync(ct);

var staleManifestIds = prefixedManifests
.Where(m => !keepExternalIds.Contains(m.ExternalId))
.Select(m => m.Id)
.ToList();

if (staleManifestIds.Count == 0)
return;

// Delete in FK-dependency order: work_queue → dead_letters → metadata → manifests
// Database compute: delete by integer PK — small IN(...) clause per query.
await context
.WorkQueues.Where(w =>
w.ManifestId.HasValue && staleManifestIds.Contains(w.ManifestId.Value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ public override async Task<Unit> Run(List<ManifestDispatchView> views)
views.Count
);

var limit = schedulerConfiguration.MaxWorkQueueEntriesPerCycle;
if (limit.HasValue && views.Count > limit.Value)
{
logger.LogInformation(
"Limiting to {Limit} of {Total} due manifests (excess deferred to next cycle)",
limit.Value,
views.Count
);
views = views.Take(limit.Value).ToList();
}

foreach (var view in views)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Trax.Effect.Models.Metadata.DTOs;
using Trax.Effect.Models.WorkQueue;
using Trax.Effect.Models.WorkQueue.DTOs;
using Trax.Scheduler.Configuration;
using Trax.Scheduler.Tests.Integration.Fakes.Trains;
using Trax.Scheduler.Tests.Integration.Fixtures;
using Trax.Scheduler.Trains.ManifestManager;
Expand Down Expand Up @@ -435,6 +436,83 @@ public async Task Run_WhenManifestIsQueued_CreatesWorkQueueWithCorrectTrainName(

#endregion

#region MaxWorkQueueEntriesPerCycle Tests

[Test]
public async Task Run_WithMaxWorkQueueEntriesPerCycle_LimitsEntriesCreated()
{
    // Arrange: ten overdue interval manifests, with the per-cycle cap lowered to three.
    const int manifestCount = 10;
    const int perCycleLimit = 3;

    foreach (var index in Enumerable.Range(0, manifestCount))
    {
        await CreateAndSaveManifest(
            scheduleType: ScheduleType.Interval,
            intervalSeconds: 60,
            inputValue: $"Limited_{index}"
        );
    }

    // The configuration object is resolved from the container, so the previous value is
    // remembered here and put back in the finally block once the test is done.
    var schedulerConfig = Scope.ServiceProvider.GetRequiredService<SchedulerConfiguration>();
    var previousLimit = schedulerConfig.MaxWorkQueueEntriesPerCycle;
    schedulerConfig.MaxWorkQueueEntriesPerCycle = perCycleLimit;

    try
    {
        // Act
        await _train.Run(Unit.Default);

        // Assert: only the capped number of queued entries exists after one cycle.
        DataContext.Reset();
        var queuedEntries = DataContext.WorkQueues.Where(q =>
            q.Status == WorkQueueStatus.Queued
        );
        var createdCount = await queuedEntries.CountAsync();

        createdCount
            .Should()
            .Be(
                perCycleLimit,
                "only MaxWorkQueueEntriesPerCycle entries should be created, excess deferred"
            );
    }
    finally
    {
        schedulerConfig.MaxWorkQueueEntriesPerCycle = previousLimit;
    }
}

[Test]
public async Task Run_WithMaxWorkQueueEntriesPerCycleNull_CreatesAllEntries()
{
    // Arrange: ten overdue interval manifests, with the per-cycle cap switched off.
    const int manifestCount = 10;

    foreach (var index in Enumerable.Range(0, manifestCount))
    {
        await CreateAndSaveManifest(
            scheduleType: ScheduleType.Interval,
            intervalSeconds: 60,
            inputValue: $"Unlimited_{index}"
        );
    }

    // The configuration object is resolved from the container, so the previous value is
    // remembered here and put back in the finally block once the test is done.
    var schedulerConfig = Scope.ServiceProvider.GetRequiredService<SchedulerConfiguration>();
    var previousLimit = schedulerConfig.MaxWorkQueueEntriesPerCycle;
    schedulerConfig.MaxWorkQueueEntriesPerCycle = null;

    try
    {
        // Act
        await _train.Run(Unit.Default);

        // Assert: every seeded manifest received a queued entry in one cycle.
        DataContext.Reset();
        var queuedEntries = DataContext.WorkQueues.Where(q =>
            q.Status == WorkQueueStatus.Queued
        );
        var createdCount = await queuedEntries.CountAsync();

        createdCount
            .Should()
            .Be(manifestCount, "null limit means all due manifests get entries");
    }
    finally
    {
        schedulerConfig.MaxWorkQueueEntriesPerCycle = previousLimit;
    }
}

#endregion

#region Full Train Integration Tests

[Test]
Expand Down
Loading
Loading