Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

[![NuGet Version](https://img.shields.io/nuget/v/Trax.Scheduler)](https://www.nuget.org/packages/Trax.Scheduler/)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE)
[![codecov](https://codecov.io/gh/TraxSharp/Trax.Scheduler/branch/main/graph/badge.svg)](https://codecov.io/gh/TraxSharp/Trax.Scheduler)

Timetable management for [Trax](https://www.nuget.org/packages/Trax.Effect/) trains — recurring schedules, automatic retries, dead-letter handling, and dependent departures.

Expand Down
217 changes: 217 additions & 0 deletions tests/Trax.Scheduler.Tests.Integration/Fixtures/SchedulerE2EFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Trax.Effect.Data.InMemory.Extensions;
using Trax.Effect.Data.Postgres.Extensions;
using Trax.Effect.Data.Services.DataContext;
using Trax.Effect.Data.Services.IDataContextFactory;
using Trax.Effect.Extensions;
using Trax.Effect.Provider.Json.Extensions;
using Trax.Effect.Provider.Parameter.Extensions;
using Trax.Mediator.Extensions;
using Trax.Scheduler.Configuration;
using Trax.Scheduler.Extensions;
using Trax.Scheduler.Services.TraxScheduler;
using Trax.Scheduler.Trains.DeadLetterCleanup;
using Trax.Scheduler.Trains.JobDispatcher;
using Trax.Scheduler.Trains.JobRunner;
using Trax.Scheduler.Trains.ManifestManager;

namespace Trax.Scheduler.Tests.Integration.Fixtures;

/// <summary>
/// Per-test scaffold that boots a full Trax scheduler with Postgres + the test-train assembly,
/// lets the test customise the SchedulerConfigurationBuilder, materialises the queued
/// PendingManifests via SchedulerStartupService's seeding path, and exposes the orchestration
/// trains (ManifestManager, JobDispatcher) so tests can drive the polling cycle directly.
/// </summary>
/// <remarks>
/// Distinct from <see cref="TestSetup"/>, which uses a single shared ServiceProvider
/// with a fixed scheduler config (no custom Schedule / Include / ScheduleMany). The E2E
/// fixture builds a fresh provider per test so each test can declare its own scheduling
/// graph and assert on the resulting Manifest / WorkQueue / Metadata rows.
/// </remarks>
public sealed class SchedulerE2EFixture : IAsyncDisposable
{
private readonly ServiceProvider _provider;
private readonly IServiceScope _scope;

public IDataContext DataContext { get; }
public ITraxScheduler Scheduler { get; }
public SchedulerConfiguration Configuration { get; }

private SchedulerE2EFixture(
ServiceProvider provider,
IServiceScope scope,
IDataContext dataContext,
ITraxScheduler scheduler,
SchedulerConfiguration configuration
)
{
_provider = provider;
_scope = scope;
DataContext = dataContext;
Scheduler = scheduler;
Configuration = configuration;
}

/// <summary>
/// Builds a fresh ServiceProvider, applies <paramref name="configureScheduler"/>, cleans
/// the database, and returns a fixture wired up for the test. Disposing the fixture tears
/// down the provider.
/// </summary>
public static async Task<SchedulerE2EFixture> CreateAsync(
Action<SchedulerConfigurationBuilder> configureScheduler
)
{
var configuration = new ConfigurationBuilder()
.SetBasePath(AppContext.BaseDirectory)
.AddJsonFile("appsettings.json", optional: false)
.Build();
var connectionString = configuration.GetRequiredSection("Configuration")[
"DatabaseConnectionString"
]!;

// Each E2E test stands up its own ServiceProvider with its own Npgsql connection pool.
// Pin the pool to a single connection that immediately returns to the pool — without
// this, parallel fixtures briefly hold dozens of connections and trip Postgres's
// max_connections cap (the validation tests in this suite see "53300: too many clients").
if (!connectionString.Contains("Maximum Pool Size", StringComparison.OrdinalIgnoreCase))
connectionString +=
";Pooling=true;Maximum Pool Size=1;Minimum Pool Size=0;Connection Idle Lifetime=1;Connection Pruning Interval=1";

var provider = new ServiceCollection()
.AddLogging(b => b.AddProvider(NullLoggerProvider.Instance))
.AddTrax(trax =>
trax.AddEffects(effects =>
effects.UsePostgres(connectionString).AddJson().SaveTrainParameters()
)
.AddMediator(typeof(AssemblyMarker).Assembly, typeof(JobRunnerTrain).Assembly)
.AddScheduler(scheduler =>
{
scheduler.UseInMemoryWorkers();
configureScheduler(scheduler);
return scheduler;
})
)
.AddScoped<IDataContext>(sp =>
{
var factory = sp.GetRequiredService<IDataContextProviderFactory>();
return (IDataContext)factory.Create();
})
.BuildServiceProvider();

var scope = provider.CreateScope();
var dataContext = scope.ServiceProvider.GetRequiredService<IDataContext>();

await TestSetup.CleanupDatabase(dataContext);

return new SchedulerE2EFixture(
provider,
scope,
dataContext,
scope.ServiceProvider.GetRequiredService<ITraxScheduler>(),
scope.ServiceProvider.GetRequiredService<SchedulerConfiguration>()
);
}

/// <summary>
/// Invokes every queued <c>PendingManifest.ScheduleFunc</c> against the live scheduler.
/// Mirrors what <c>SchedulerStartupService.SeedPendingManifests</c> does at host startup,
/// but in the test thread so failures surface as exceptions rather than logged warnings.
/// </summary>
public async Task MaterializePendingManifestsAsync(CancellationToken ct = default)
{
foreach (var pending in Configuration.PendingManifests.ToList())
await pending.ScheduleFunc(Scheduler, ct);
}

/// <summary>
/// Runs the ManifestManager train end-to-end: loads manifests, processes timeouts,
/// determines which jobs to queue, and writes WorkQueue entries. Use after
/// <see cref="MaterializePendingManifestsAsync"/> to exercise the queueing pipeline.
/// </summary>
public Task RunManifestManagerAsync(CancellationToken ct = default)
{
var train = _scope.ServiceProvider.GetRequiredService<IManifestManagerTrain>();
return train.Run(LanguageExt.Unit.Default, ct);
}

/// <summary>
/// Runs the JobDispatcher train end-to-end: loads queued work, applies capacity limits,
/// and dispatches jobs to the registered run executor.
/// </summary>
public Task RunJobDispatcherAsync(CancellationToken ct = default)
{
var train = _scope.ServiceProvider.GetRequiredService<IJobDispatcherTrain>();
return train.Run(LanguageExt.Unit.Default, ct);
}

/// <summary>
/// Runs the DeadLetterCleanup train end-to-end: deletes resolved dead letters older than
/// the configured retention period.
/// </summary>
public Task RunDeadLetterCleanupAsync(CancellationToken ct = default)
{
var train = _scope.ServiceProvider.GetRequiredService<IDeadLetterCleanupTrain>();
return train.Run(new DeadLetterCleanupRequest(), ct);
}

/// <summary>
/// Builds a fresh provider with the InMemory data provider so the test exercises
/// InMemoryManifestManagerTrain + InMemoryDispatchJobsJunction (the path used by tests
/// and small samples that don't need Postgres). Each call gets its own EF Core
/// InMemory database so no cleanup is needed.
/// </summary>
public static SchedulerE2EFixture CreateInMemory(
Action<SchedulerConfigurationBuilder> configureScheduler
)
{
var provider = new ServiceCollection()
.AddLogging(b => b.AddProvider(NullLoggerProvider.Instance))
.AddTrax(trax =>
trax.AddEffects(effects => effects.UseInMemory().AddJson().SaveTrainParameters())
.AddMediator(typeof(AssemblyMarker).Assembly, typeof(JobRunnerTrain).Assembly)
.AddScheduler(scheduler =>
{
configureScheduler(scheduler);
return scheduler;
})
)
.AddScoped<IDataContext>(sp =>
{
var factory = sp.GetRequiredService<IDataContextProviderFactory>();
return (IDataContext)factory.Create();
})
.BuildServiceProvider();

var scope = provider.CreateScope();
var dataContext = scope.ServiceProvider.GetRequiredService<IDataContext>();

return new SchedulerE2EFixture(
provider,
scope,
dataContext,
scope.ServiceProvider.GetRequiredService<ITraxScheduler>(),
scope.ServiceProvider.GetRequiredService<SchedulerConfiguration>()
);
}

public async ValueTask DisposeAsync()
{
if (DataContext is IAsyncDisposable async)
await async.DisposeAsync();
else if (DataContext is IDisposable sync)
sync.Dispose();

_scope.Dispose();
await _provider.DisposeAsync();

// Aggressively clear Npgsql connection pools so the next test in the suite
// (which builds its own ServiceProvider with its own pool) doesn't hit the
// server-side max_connections cap on the shared trax_scheduler_tests database.
Npgsql.NpgsqlConnection.ClearAllPools();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using FluentAssertions;
using LanguageExt;
using Microsoft.EntityFrameworkCore;
using NUnit.Framework;
using Trax.Scheduler.Tests.Integration.Fakes.Trains;
using Trax.Scheduler.Tests.Integration.Fixtures;
using Every = Trax.Scheduler.Services.Scheduling.Every;

namespace Trax.Scheduler.Tests.Integration.IntegrationTests;

/// <summary>
/// Drives the InMemory polling pipeline (InMemoryManifestManagerTrain →
/// InMemoryDispatchJobsJunction → InMemoryJobSubmitter). The Postgres pipeline tests
/// don't reach this code path because UsePostgres registers the standard
/// ManifestManagerTrain + JobDispatcherTrain instead.
/// </summary>
[TestFixture]
public class InMemoryDispatchTests
{
private const string TrainNameFilter = "SchedulerTestTrain";

[Test]
public async Task ManifestManager_InMemory_DispatchesJobsInline()
{
await using var fx = SchedulerE2EFixture.CreateInMemory(s =>
s.Schedule<ISchedulerTestTrain>(
"inmem-dispatch",
new SchedulerTestInput { Value = "hi" },
Every.Minutes(1)
)
);
await fx.MaterializePendingManifestsAsync();

await fx.RunManifestManagerAsync();

// InMemoryDispatchJobsJunction creates Metadata records inline for the test train
// (the orchestration trains create their own Metadata too — filter by train name).
var metadatas = await fx
.DataContext.Metadatas.AsNoTracking()
.Where(m => m.Name.Contains(TrainNameFilter))
.ToListAsync();
metadatas.Should().NotBeEmpty();
}

[Test]
public async Task ManifestManager_InMemory_DisabledManifest_NoMetadataCreated()
{
await using var fx = SchedulerE2EFixture.CreateInMemory(s =>
s.Schedule<ISchedulerTestTrain>(
"inmem-disabled",
new SchedulerTestInput(),
Every.Minutes(1),
opts => opts.Enabled(false)
)
);
await fx.MaterializePendingManifestsAsync();

await fx.RunManifestManagerAsync();

var metadatas = await fx
.DataContext.Metadatas.AsNoTracking()
.Where(m => m.Name.Contains(TrainNameFilter))
.ToListAsync();
metadatas.Should().BeEmpty();
}

[Test]
public async Task ManifestManager_InMemory_NoManifests_NoOp()
{
await using var fx = SchedulerE2EFixture.CreateInMemory(s => { });

await fx.RunManifestManagerAsync();

var testMetadatas = await fx
.DataContext.Metadatas.AsNoTracking()
.Where(m => m.Name.Contains(TrainNameFilter))
.ToListAsync();
testMetadatas.Should().BeEmpty();
}

[Test]
public async Task ManifestManager_InMemory_MultipleManifests_DispatchesEach()
{
await using var fx = SchedulerE2EFixture.CreateInMemory(s =>
s.Schedule<ISchedulerTestTrain>("inmem-a", new SchedulerTestInput(), Every.Minutes(1))
.Include<ISchedulerTestTrain>("inmem-b", new SchedulerTestInput())
);
await fx.MaterializePendingManifestsAsync();

await fx.RunManifestManagerAsync();

var metadatas = await fx
.DataContext.Metadatas.AsNoTracking()
.Where(m => m.Name.Contains(TrainNameFilter))
.ToListAsync();
// Only the root manifest fires on first cycle; dependent fires after parent succeeds.
metadatas.Should().HaveCountGreaterThan(0);
}
}
Loading
Loading