diff --git a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/DataContextExtensionsCoverageTests.cs b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/DataContextExtensionsCoverageTests.cs new file mode 100644 index 0000000..e2d2e49 --- /dev/null +++ b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/DataContextExtensionsCoverageTests.cs @@ -0,0 +1,330 @@ +using FluentAssertions; +using Microsoft.EntityFrameworkCore; +using Trax.Effect.Enums; +using Trax.Scheduler.Configuration; +using Trax.Scheduler.Extensions; +using Trax.Scheduler.Tests.Integration.Fakes.Trains; +using Trax.Scheduler.Tests.Integration.Fixtures; +using Schedule = Trax.Scheduler.Services.Scheduling.Schedule; + +namespace Trax.Scheduler.Tests.Integration.IntegrationTests; + +/// +/// Direct coverage for the existing-row update branches and validation paths in +/// . The happy create paths are exercised by the +/// scheduler E2E tests; this fixture targets the second-call upsert paths plus the +/// variance / misfire validation that the scheduler API funnels through. +/// +[TestFixture] +public class DataContextExtensionsCoverageTests : TestSetup +{ + private static readonly Schedule IntervalSchedule = Schedule.FromInterval( + TimeSpan.FromMinutes(5) + ); + + #region UpsertManifestAsync — existing manifest update branch + + [Test] + public async Task UpsertManifestAsync_ExistingManifest_UpdatesAllFields() + { + var externalId = $"upsert-existing-{Guid.NewGuid():N}"; + + await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + externalId, + new SchedulerTestInput { Value = "first" }, + IntervalSchedule, + new ManifestOptions { Priority = 1, MaxRetries = 1 }, + groupId: "g1", + groupPriority: 1 + ); + await DataContext.SaveChanges(CancellationToken.None); + DataContext.Reset(); + + var updated = await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + externalId, + new SchedulerTestInput { Value = "second" }, + Schedule.FromCron("0 0 * * *"), + new ManifestOptions + { + Priority = 9, + MaxRetries = 7, + Timeout = TimeSpan.FromMinutes(2), + MisfirePolicy = MisfirePolicy.FireOnceNow, + MisfireThreshold = TimeSpan.FromMinutes(3), + }, + groupId: "g1", + groupPriority: 1 + ); + await DataContext.SaveChanges(CancellationToken.None); + + updated.ScheduleType.Should().Be(ScheduleType.Cron); + updated.CronExpression.Should().Be("0 0 * * *"); + updated.Priority.Should().Be(9); + updated.MaxRetries.Should().Be(7); + updated.TimeoutSeconds.Should().Be(120); + updated.MisfirePolicy.Should().Be(MisfirePolicy.FireOnceNow); + updated.MisfireThresholdSeconds.Should().Be(180); + } + + #endregion + + #region UpsertDependentManifestAsync — both branches + + [Test] + public async Task UpsertDependentManifestAsync_NewThenExisting_UpdatesScheduleAndDependency() + { + var parentId = $"dep-parent-{Guid.NewGuid():N}"; + var parent = await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + parentId, + new SchedulerTestInput(), + IntervalSchedule, + new ManifestOptions(), + groupId: "dep-group", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + + var childId = $"dep-child-{Guid.NewGuid():N}"; + + // First call: create branch + var created = await DataContext.UpsertDependentManifestAsync( + typeof(SchedulerTestTrain), + childId, + new SchedulerTestInput { Value = "v1" }, + parent.Id, + new ManifestOptions { IsDormant = false, Priority = 2 }, + groupId: "dep-group", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + DataContext.Reset(); + + // Second call: existing-branch update with IsDormant flipped on + var updated = await DataContext.UpsertDependentManifestAsync( + typeof(SchedulerTestTrain), + childId, + new SchedulerTestInput { Value = "v2" }, + parent.Id, + new ManifestOptions + { + IsDormant = true, + Priority = 5, + MaxRetries = 4, + Timeout = TimeSpan.FromSeconds(30), + MisfirePolicy = MisfirePolicy.DoNothing, + }, + groupId: "dep-group", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + + created.ScheduleType.Should().Be(ScheduleType.Dependent); + updated.Id.Should().Be(created.Id); + updated.ScheduleType.Should().Be(ScheduleType.DormantDependent); + updated.DependsOnManifestId.Should().Be(parent.Id); + updated.Priority.Should().Be(5); + updated.MaxRetries.Should().Be(4); + updated.TimeoutSeconds.Should().Be(30); + updated.CronExpression.Should().BeNull(); + updated.IntervalSeconds.Should().BeNull(); + updated.MisfirePolicy.Should().Be(MisfirePolicy.DoNothing); + } + + #endregion + + #region UpsertOnceManifestAsync — both branches + + [Test] + public async Task UpsertOnceManifestAsync_NewThenExisting_UpdatesScheduledAt() + { + var externalId = $"once-{Guid.NewGuid():N}"; + var firstAt = DateTime.UtcNow.AddHours(1); + + var created = await DataContext.UpsertOnceManifestAsync( + typeof(SchedulerTestTrain), + externalId, + new SchedulerTestInput { Value = "first" }, + firstAt, + new ManifestOptions { Priority = 1 }, + groupId: "once-group", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + DataContext.Reset(); + + var secondAt = DateTime.UtcNow.AddHours(3); + var updated = await DataContext.UpsertOnceManifestAsync( + typeof(SchedulerTestTrain), + externalId, + new SchedulerTestInput { Value = "second" }, + secondAt, + new ManifestOptions + { + Priority = 8, + MaxRetries = 2, + Timeout = TimeSpan.FromMinutes(10), + MisfirePolicy = MisfirePolicy.FireOnceNow, + MisfireThreshold = TimeSpan.FromMinutes(1), + }, + groupId: "once-group", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + + created.ScheduleType.Should().Be(ScheduleType.Once); + updated.Id.Should().Be(created.Id); + updated.ScheduleType.Should().Be(ScheduleType.Once); + updated.ScheduledAt.Should().BeCloseTo(secondAt, TimeSpan.FromSeconds(1)); + updated.Priority.Should().Be(8); + updated.MaxRetries.Should().Be(2); + updated.TimeoutSeconds.Should().Be(600); + updated.CronExpression.Should().BeNull(); + updated.IntervalSeconds.Should().BeNull(); + updated.MisfirePolicy.Should().Be(MisfirePolicy.FireOnceNow); + updated.MisfireThresholdSeconds.Should().Be(60); + } + + #endregion + + #region Variance validation + + [Test] + public async Task UpsertManifestAsync_WithNegativeVariance_Throws() + { + var schedule = IntervalSchedule with { Variance = TimeSpan.FromSeconds(-5) }; + + var act = async () => + { + await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + $"neg-var-{Guid.NewGuid():N}", + new SchedulerTestInput(), + schedule, + new ManifestOptions(), + groupId: "var-g", + groupPriority: 0 + ); + }; + + await act.Should().ThrowAsync().WithMessage("*non-negative*"); + } + + [Test] + public async Task UpsertManifestAsync_WithVarianceOnUnsupportedScheduleType_Throws() + { + var act = async () => + { + await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + $"bad-var-{Guid.NewGuid():N}", + new SchedulerTestInput(), + Schedule.FromInterval(TimeSpan.FromMinutes(1)) with + { + Type = ScheduleType.OnDemand, + Variance = TimeSpan.FromSeconds(10), + }, + new ManifestOptions(), + groupId: "var-g2", + groupPriority: 0 + ); + }; + + await act.Should() + .ThrowAsync() + .WithMessage("*Interval and Cron*"); + } + + [Test] + public async Task UpsertManifestAsync_WithValidVariance_PersistsVarianceSeconds() + { + var externalId = $"var-{Guid.NewGuid():N}"; + var schedule = IntervalSchedule with { Variance = TimeSpan.FromSeconds(45) }; + + var manifest = await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + externalId, + new SchedulerTestInput(), + schedule, + new ManifestOptions(), + groupId: "var-g3", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + + manifest.VarianceSeconds.Should().Be(45); + } + + [Test] + public async Task UpsertManifestAsync_WithoutVariance_ClearsVarianceSeconds() + { + // First write with variance set, then upsert without it to hit the null-clearing branch. + var externalId = $"clear-var-{Guid.NewGuid():N}"; + + await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + externalId, + new SchedulerTestInput(), + IntervalSchedule with + { + Variance = TimeSpan.FromSeconds(20), + }, + new ManifestOptions(), + groupId: "var-g4", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + DataContext.Reset(); + + var updated = await DataContext.UpsertManifestAsync( + typeof(SchedulerTestTrain), + externalId, + new SchedulerTestInput(), + IntervalSchedule, + new ManifestOptions(), + groupId: "var-g4", + groupPriority: 0 + ); + await DataContext.SaveChanges(CancellationToken.None); + + updated.VarianceSeconds.Should().BeNull(); + } + + #endregion + + #region EnsureManifestGroupAsync — existing-row branch + + [Test] + public async Task EnsureManifestGroupAsync_ExistingGroup_UpdatesProperties() + { + var name = $"reuse-{Guid.NewGuid():N}"; + + var firstId = await DataContext.EnsureManifestGroupAsync( + name, + priority: 1, + maxActiveJobs: 5, + isEnabled: true + ); + await DataContext.SaveChanges(CancellationToken.None); + + var secondId = await DataContext.EnsureManifestGroupAsync( + name, + priority: 9, + maxActiveJobs: 10, + isEnabled: false + ); + await DataContext.SaveChanges(CancellationToken.None); + + secondId.Should().Be(firstId); + var reloaded = await DataContext + .ManifestGroups.AsNoTracking() + .FirstAsync(g => g.Id == firstId); + reloaded.Priority.Should().Be(9); + reloaded.MaxActiveJobs.Should().Be(10); + reloaded.IsEnabled.Should().BeFalse(); + } + + #endregion +} diff --git a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/DispatchAndLoadCoverageTests.cs b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/DispatchAndLoadCoverageTests.cs new file mode 100644 index 0000000..401068e --- /dev/null +++ b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/DispatchAndLoadCoverageTests.cs @@ -0,0 +1,118 @@ +using FluentAssertions; +using LanguageExt; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Trax.Effect.Enums; +using Trax.Effect.Models.DeadLetter; +using Trax.Effect.Models.DeadLetter.DTOs; +using Trax.Effect.Models.Manifest; +using Trax.Effect.Models.Manifest.DTOs; +using Trax.Effect.Models.WorkQueue; +using Trax.Effect.Models.WorkQueue.DTOs; +using Trax.Scheduler.Tests.Integration.Fakes.Trains; +using Trax.Scheduler.Tests.Integration.Fixtures; +using Trax.Scheduler.Trains.JobDispatcher; +using Every = Trax.Scheduler.Services.Scheduling.Every; + +namespace Trax.Scheduler.Tests.Integration.IntegrationTests; + +/// +/// Targets uncovered branches in DispatchJobsJunction (dead-letter retry-link, null-input) +/// and LoadQueuedJobsJunction (LoadAllQueued path, used when MaxQueuedJobsPerCycle is null). +/// +[TestFixture] +public class DispatchAndLoadCoverageTests +{ + #region DispatchJobsJunction — dead-letter retry-metadata link + + [Test] + public async Task Dispatch_WhenWorkQueueHasDeadLetterId_LinksRetryMetadataOnDeadLetter() + { + await using var fx = await SchedulerE2EFixture.CreateAsync(s => + s.Schedule( + "dl-retry", + new SchedulerTestInput { Value = "x" }, + Every.Minutes(5) + ) + ); + await fx.MaterializePendingManifestsAsync(); + + var manifest = await fx + .DataContext.Manifests.Include(m => m.ManifestGroup) + .FirstAsync(m => m.ExternalId == "dl-retry"); + + var deadLetter = DeadLetter.Create( + new CreateDeadLetter + { + Manifest = manifest, + Reason = "test", + RetryCount = 1, + } + ); + await fx.DataContext.Track(deadLetter); + await fx.DataContext.SaveChanges(default); + fx.DataContext.Reset(); + + // Requeue creates a WorkQueue with DeadLetterId set + var requeueResult = await fx.Scheduler.RequeueDeadLetterAsync(deadLetter.Id); + requeueResult.Success.Should().BeTrue(); + + // Dispatch — this exercises the LinkRetryMetadata branch + await fx.RunJobDispatcherAsync(); + + var reloadedDl = await fx + .DataContext.DeadLetters.AsNoTracking() + .FirstAsync(d => d.Id == deadLetter.Id); + reloadedDl + .RetryMetadataId.Should() + .NotBeNull("dispatched retry must be linked to dead letter"); + + var retryMetadata = await fx + .DataContext.Metadatas.AsNoTracking() + .FirstAsync(m => m.Id == reloadedDl.RetryMetadataId); + retryMetadata.ManifestId.Should().Be(manifest.Id); + } + + #endregion + + #region LoadQueuedJobsJunction — LoadAllQueued path (no per-cycle limit) + + [Test] + public async Task Dispatch_WhenMaxQueuedJobsPerCycleIsNull_LoadsAllQueuedEntries() + { + await using var fx = await SchedulerE2EFixture.CreateAsync(s => + { + s.MaxQueuedJobsPerCycle(null); + s.Schedule( + "load-all-1", + new SchedulerTestInput(), + Every.Minutes(5) + ); + s.Schedule( + "load-all-2", + new SchedulerTestInput(), + Every.Minutes(5) + ); + }); + await fx.MaterializePendingManifestsAsync(); + + // Force two queued entries (one per manifest) by triggering them directly. + await fx.Scheduler.TriggerAsync("load-all-1"); + await fx.Scheduler.TriggerAsync("load-all-2"); + + await fx.RunJobDispatcherAsync(); + + var dispatched = await fx + .DataContext.WorkQueues.AsNoTracking() + .Where(w => w.Status == WorkQueueStatus.Dispatched) + .ToListAsync(); + dispatched + .Count.Should() + .BeGreaterThanOrEqualTo( + 2, + "all queued entries must be dispatched in the no-limit path" + ); + } + + #endregion +} diff --git a/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/TraxSchedulerUntypedAndEdgeTests.cs b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/TraxSchedulerUntypedAndEdgeTests.cs new file mode 100644 index 0000000..fe03000 --- /dev/null +++ b/tests/Trax.Scheduler.Tests.Integration/IntegrationTests/TraxSchedulerUntypedAndEdgeTests.cs @@ -0,0 +1,425 @@ +using FluentAssertions; +using LanguageExt; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Trax.Effect.Enums; +using Trax.Effect.Models.Manifest; +using Trax.Effect.Models.Manifest.DTOs; +using Trax.Effect.Models.WorkQueue; +using Trax.Scheduler.Configuration; +using Trax.Scheduler.Services.TraxScheduler; +using Trax.Scheduler.Tests.Integration.Fakes.Trains; +using Trax.Scheduler.Tests.Integration.Fixtures; +using Schedule = Trax.Scheduler.Services.Scheduling.Schedule; + +namespace Trax.Scheduler.Tests.Integration.IntegrationTests; + +/// +/// Coverage for the TraxScheduler internal Untyped overloads (used by InferredScheduling), +/// the empty-source short-circuits, exception/rollback paths, group operations, and +/// PruneSafeAsync's catch-block. +/// +[TestFixture] +public class TraxSchedulerUntypedAndEdgeTests : TestSetup +{ + private TraxScheduler _scheduler = null!; + + public override async Task TestSetUp() + { + await base.TestSetUp(); + _scheduler = (TraxScheduler)Scope.ServiceProvider.GetRequiredService(); + } + + private static readonly Schedule Interval = Schedule.FromInterval(TimeSpan.FromMinutes(5)); + + #region Untyped overloads + + [Test] + public async Task ScheduleAsyncUntyped_PersistsManifestWithInterfaceFullName() + { + var externalId = $"untyped-{Guid.NewGuid():N}"; + + var manifest = await _scheduler.ScheduleAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + externalId, + new SchedulerTestInput { Value = "x" }, + Interval + ); + + manifest.ExternalId.Should().Be(externalId); + manifest.Name.Should().Be(typeof(SchedulerTestTrain).FullName); + manifest.ScheduleType.Should().Be(ScheduleType.Interval); + } + + [Test] + public async Task ScheduleOnceAsyncUntyped_PersistsManifestWithScheduledAt() + { + var externalId = $"untyped-once-{Guid.NewGuid():N}"; + + var manifest = await _scheduler.ScheduleOnceAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + externalId, + new SchedulerTestInput(), + TimeSpan.FromMinutes(30) + ); + + manifest.ScheduleType.Should().Be(ScheduleType.Once); + manifest.ScheduledAt.Should().NotBeNull(); + manifest.ScheduledAt!.Value.Should().BeAfter(DateTime.UtcNow.AddMinutes(25)); + } + + [Test] + public async Task ScheduleDependentAsyncUntyped_NewParent_PersistsDependent() + { + var parentId = $"u-parent-{Guid.NewGuid():N}"; + await _scheduler.ScheduleAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + parentId, + new SchedulerTestInput(), + Interval + ); + + var childId = $"u-child-{Guid.NewGuid():N}"; + var dep = await _scheduler.ScheduleDependentAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + childId, + new SchedulerTestInput(), + parentId + ); + + dep.ScheduleType.Should().Be(ScheduleType.Dependent); + dep.DependsOnManifestId.Should().NotBeNull(); + } + + [Test] + public async Task ScheduleDependentAsyncUntyped_MissingParent_Throws() + { + var act = async () => + await _scheduler.ScheduleDependentAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + "child", + new SchedulerTestInput(), + "missing-parent" + ); + + await act.Should().ThrowAsync().WithMessage("*not found*"); + } + + [Test] + public async Task ScheduleManyAsyncUntyped_EmptySources_ReturnsEmptyWithoutTransaction() + { + var result = await _scheduler.ScheduleManyAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + sources: Array.Empty(), + map: s => (s, new SchedulerTestInput { Value = s }), + schedule: Interval + ); + + result.Should().BeEmpty(); + } + + [Test] + public async Task ScheduleManyAsyncUntyped_WithSources_PersistsAllAndCommits() + { + var prefix = $"u-many-{Guid.NewGuid():N}"; + var sources = new[] { "a", "b", "c" }; + + var result = await _scheduler.ScheduleManyAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + sources: sources, + map: s => ($"{prefix}-{s}", new SchedulerTestInput { Value = s }), + schedule: Interval, + options: o => o.PrunePrefix(prefix) + ); + + result.Should().HaveCount(3); + DataContext.Reset(); + var stored = await DataContext + .Manifests.AsNoTracking() + .Where(m => m.ExternalId.StartsWith(prefix)) + .ToListAsync(); + stored.Should().HaveCount(3); + } + + [Test] + public async Task ScheduleManyDependentAsyncUntyped_EmptySources_ReturnsEmpty() + { + var result = await _scheduler.ScheduleManyDependentAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + sources: Array.Empty(), + map: s => (s, new SchedulerTestInput()), + dependsOn: _ => "n/a" + ); + + result.Should().BeEmpty(); + } + + [Test] + public async Task ScheduleManyDependentAsyncUntyped_MissingParent_RollsBack() + { + var prefix = $"u-many-dep-{Guid.NewGuid():N}"; + + var act = async () => + await _scheduler.ScheduleManyDependentAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + sources: new[] { "a", "b" }, + map: s => ($"{prefix}-{s}", new SchedulerTestInput()), + dependsOn: _ => "missing-parent-id" + ); + + await act.Should().ThrowAsync().WithMessage("*not found*"); + + DataContext.Reset(); + var leaked = await DataContext + .Manifests.AsNoTracking() + .CountAsync(m => m.ExternalId.StartsWith(prefix)); + leaked.Should().Be(0, "the transaction must roll back when a parent is missing"); + } + + [Test] + public async Task ScheduleManyDependentAsyncUntyped_WithSources_PersistsDependents() + { + var prefix = $"u-many-dep-ok-{Guid.NewGuid():N}"; + var parentId = $"{prefix}-parent"; + + await _scheduler.ScheduleAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + parentId, + new SchedulerTestInput(), + Interval + ); + + var result = await _scheduler.ScheduleManyDependentAsyncUntyped( + typeof(SchedulerTestTrain), + typeof(SchedulerTestInput), + sources: new[] { "x", "y" }, + map: s => ($"{prefix}-{s}", new SchedulerTestInput { Value = s }), + dependsOn: _ => parentId, + options: o => o.PrunePrefix(prefix) + ); + + result.Should().HaveCount(2); + result.Should().AllSatisfy(m => m.ScheduleType.Should().Be(ScheduleType.Dependent)); + } + + #endregion + + #region Public ScheduleManyAsync — empty + rollback + + [Test] + public async Task ScheduleManyAsync_EmptySources_ReturnsEmpty() + { + var result = await _scheduler.ScheduleManyAsync< + ISchedulerTestTrain, + SchedulerTestInput, + Unit, + string + >( + sources: Array.Empty(), + map: s => (s, new SchedulerTestInput { Value = s }), + schedule: Interval + ); + + result.Should().BeEmpty(); + } + + [Test] + public async Task ScheduleManyDependentAsync_EmptySources_ReturnsEmpty() + { + var result = await _scheduler.ScheduleManyDependentAsync< + ISchedulerTestTrain, + SchedulerTestInput, + Unit, + string + >( + sources: Array.Empty(), + map: s => (s, new SchedulerTestInput()), + dependsOn: _ => "x" + ); + + result.Should().BeEmpty(); + } + + [Test] + public async Task ScheduleManyDependentAsync_MissingParent_RollsBack() + { + var prefix = $"typed-many-dep-{Guid.NewGuid():N}"; + + var act = async () => + await _scheduler.ScheduleManyDependentAsync< + ISchedulerTestTrain, + SchedulerTestInput, + Unit, + string + >( + sources: new[] { "a" }, + map: s => ($"{prefix}-{s}", new SchedulerTestInput()), + dependsOn: _ => "missing-typed-parent" + ); + + await act.Should().ThrowAsync(); + + DataContext.Reset(); + (await DataContext.Manifests.CountAsync(m => m.ExternalId.StartsWith(prefix))) + .Should() + .Be(0); + } + + [Test] + public async Task ScheduleDependentAsync_MissingParent_Throws() + { + var act = async () => + await _scheduler.ScheduleDependentAsync( + "typed-child", + new SchedulerTestInput(), + "missing-typed-parent" + ); + + await act.Should().ThrowAsync().WithMessage("*not found*"); + } + + [Test] + public async Task ScheduleManyAsync_WithBadVariance_RollsBackTransaction() + { + // Variance < 0 throws inside UpsertManifestAsync; the catch path rolls back the tx. + var prefix = $"typed-many-rb-{Guid.NewGuid():N}"; + + var act = async () => + await _scheduler.ScheduleManyAsync< + ISchedulerTestTrain, + SchedulerTestInput, + Unit, + string + >( + sources: new[] { "a", "b" }, + map: s => ($"{prefix}-{s}", new SchedulerTestInput()), + schedule: Interval with + { + Variance = TimeSpan.FromSeconds(-1), + } + ); + + await act.Should().ThrowAsync(); + + DataContext.Reset(); + (await DataContext.Manifests.CountAsync(m => m.ExternalId.StartsWith(prefix))) + .Should() + .Be(0); + } + + #endregion + + #region TriggerGroupAsync — empty short-circuit + + [Test] + public async Task TriggerGroupAsync_NoMatchingManifests_ReturnsZero() + { + var count = await _scheduler.TriggerGroupAsync(groupId: -9999); + count.Should().Be(0); + } + + [Test] + public async Task TriggerGroupAsync_DependentManifestsExcluded_OnlyTriggersScheduled() + { + var group = await TestSetup.CreateAndSaveManifestGroup( + DataContext, + name: $"trig-group-{Guid.NewGuid():N}" + ); + + // Independent (will be triggered) + var indep = Manifest.Create( + new CreateManifest + { + Name = typeof(SchedulerTestTrain), + IsEnabled = true, + ScheduleType = ScheduleType.Interval, + IntervalSeconds = 60, + Properties = new SchedulerTestInput { Value = "indep" }, + } + ); + indep.ExternalId = $"indep-{Guid.NewGuid():N}"; + indep.ManifestGroupId = group.Id; + + // Dependent (must be skipped) + var dep = Manifest.Create( + new CreateManifest + { + Name = typeof(SchedulerTestTrain), + IsEnabled = true, + ScheduleType = ScheduleType.Dependent, + Properties = new SchedulerTestInput { Value = "dep" }, + } + ); + dep.ExternalId = $"dep-{Guid.NewGuid():N}"; + dep.ManifestGroupId = group.Id; + + await DataContext.Track(indep); + await DataContext.Track(dep); + await DataContext.SaveChanges(CancellationToken.None); + DataContext.Reset(); + + var triggered = await _scheduler.TriggerGroupAsync(group.Id); + triggered.Should().Be(1); + + var queued = await DataContext + .WorkQueues.AsNoTracking() + .Where(w => w.ManifestId == indep.Id || w.ManifestId == dep.Id) + .ToListAsync(); + queued.Should().HaveCount(1); + queued[0].ManifestId.Should().Be(indep.Id); + } + + #endregion + + #region CancelAsync / CancelGroupAsync — no in-progress short-circuit + + [Test] + public async Task CancelAsync_NoInProgress_ReturnsZero() + { + var group = await TestSetup.CreateAndSaveManifestGroup( + DataContext, + name: $"cancel-{Guid.NewGuid():N}" + ); + var manifest = Manifest.Create( + new CreateManifest + { + Name = typeof(SchedulerTestTrain), + IsEnabled = true, + ScheduleType = ScheduleType.Interval, + IntervalSeconds = 60, + Properties = new SchedulerTestInput(), + } + ); + manifest.ExternalId = $"cancel-noip-{Guid.NewGuid():N}"; + manifest.ManifestGroupId = group.Id; + await DataContext.Track(manifest); + await DataContext.SaveChanges(CancellationToken.None); + DataContext.Reset(); + + var cancelled = await _scheduler.CancelAsync(manifest.ExternalId); + cancelled.Should().Be(0); + } + + [Test] + public async Task CancelGroupAsync_NoInProgress_ReturnsZero() + { + var group = await TestSetup.CreateAndSaveManifestGroup( + DataContext, + name: $"cg-{Guid.NewGuid():N}" + ); + var cancelled = await _scheduler.CancelGroupAsync(group.Id); + cancelled.Should().Be(0); + } + + #endregion +} diff --git a/tests/Trax.Scheduler.Tests.Integration/UnitTests/ManifestManagerPollingServiceTests.cs b/tests/Trax.Scheduler.Tests.Integration/UnitTests/ManifestManagerPollingServiceTests.cs new file mode 100644 index 0000000..0b34812 --- /dev/null +++ b/tests/Trax.Scheduler.Tests.Integration/UnitTests/ManifestManagerPollingServiceTests.cs @@ -0,0 +1,136 @@ +using FluentAssertions; +using LanguageExt; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using Trax.Scheduler.Configuration; +using Trax.Scheduler.Services.ManifestManagerPollingService; +using Trax.Scheduler.Trains.ManifestManager; + +namespace Trax.Scheduler.Tests.Integration.UnitTests; + +/// +/// Deterministic coverage for ManifestManagerPollingService. Drives the InMemory branch +/// (HasDatabaseProvider = false) and the disabled short-circuit. Postgres advisory-lock +/// branch is exercised by SchedulerPollingCycleTests via the live train. +/// +[TestFixture] +public class ManifestManagerPollingServiceTests +{ + private static readonly TimeSpan SyncTimeout = TimeSpan.FromSeconds(5); + + private static IServiceProvider Provide(IManifestManagerTrain train) + { + var sp = Substitute.For(); + var scope = Substitute.For(); + var scopeFactory = Substitute.For(); + var scopedSp = Substitute.For(); + + scope.ServiceProvider.Returns(scopedSp); + scopeFactory.CreateScope().Returns(scope); + sp.GetService(typeof(IServiceScopeFactory)).Returns(scopeFactory); + scopedSp.GetService(typeof(IManifestManagerTrain)).Returns(train); + return sp; + } + + [Test] + public async Task ExecuteAsync_InMemoryProvider_RunsTrainAndContinuesLoop() + { + var trainHit = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var train = Substitute.For(); + train + .When(t => t.Run(Arg.Any(), Arg.Any())) + .Do(_ => trainHit.TrySetResult()); + + var config = new SchedulerConfiguration + { + ManifestManagerPollingInterval = TimeSpan.FromMinutes(5), + ManifestManagerEnabled = true, + HasDatabaseProvider = false, + }; + + var service = new ManifestManagerPollingService( + Provide(train), + config, + NullLogger.Instance, + sqlDialect: null + ); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + + var winner = await Task.WhenAny(trainHit.Task, Task.Delay(SyncTimeout)); + winner.Should().Be(trainHit.Task, "train should have been invoked on the first cycle"); + + cts.Cancel(); + await service.StopAsync(CancellationToken.None); + + await train.Received().Run(Arg.Any(), Arg.Any()); + } + + [Test] + public async Task ExecuteAsync_TrainThrows_CatchBlockSwallowsAndKeepsLoopAlive() + { + var trainHit = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var train = Substitute.For(); + train + .When(t => t.Run(Arg.Any(), Arg.Any())) + .Do(_ => + { + trainHit.TrySetResult(); + throw new InvalidOperationException("manifest-manager-fail"); + }); + + var config = new SchedulerConfiguration + { + ManifestManagerPollingInterval = TimeSpan.FromMinutes(5), + ManifestManagerEnabled = true, + HasDatabaseProvider = false, + }; + + var service = new ManifestManagerPollingService( + Provide(train), + config, + NullLogger.Instance, + sqlDialect: null + ); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + + var winner = await Task.WhenAny(trainHit.Task, Task.Delay(SyncTimeout)); + winner.Should().Be(trainHit.Task); + + cts.Cancel(); + var stop = async () => await service.StopAsync(CancellationToken.None); + await stop.Should().NotThrowAsync("the catch block must swallow the train exception"); + } + + [Test] + public async Task ExecuteAsync_Disabled_TrainNeverInvoked() + { + var train = Substitute.For(); + var config = new SchedulerConfiguration + { + ManifestManagerPollingInterval = TimeSpan.FromMinutes(5), + ManifestManagerEnabled = false, + HasDatabaseProvider = false, + }; + + var service = new ManifestManagerPollingService( + Provide(train), + config, + NullLogger.Instance, + sqlDialect: null + ); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + // Disabled branch is a no-op; tight wait is sufficient to confirm nothing fires. + await Task.Delay(150); + cts.Cancel(); + await service.StopAsync(CancellationToken.None); + + await train.DidNotReceive().Run(Arg.Any(), Arg.Any()); + } +}