diff --git a/.github/workflows/nuget_release.yml b/.github/workflows/nuget_release.yml
index 25fc8e9..ee37d87 100644
--- a/.github/workflows/nuget_release.yml
+++ b/.github/workflows/nuget_release.yml
@@ -36,6 +36,18 @@ jobs:
--health-interval=10s
--health-timeout=5s
--health-retries=5
+ rabbitmq:
+ image: rabbitmq:3
+ env:
+ RABBITMQ_DEFAULT_USER: guest
+ RABBITMQ_DEFAULT_PASS: guest
+ ports:
+ - 5672:5672
+ options: >-
+ --health-cmd="rabbitmq-diagnostics -q ping"
+ --health-interval=10s
+ --health-timeout=5s
+ --health-retries=10
steps:
- name: Generate token
id: app-token
diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index 16dd894..9cfec1d 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -25,6 +25,18 @@ jobs:
--health-interval=10s
--health-timeout=5s
--health-retries=5
+ rabbitmq:
+ image: rabbitmq:3
+ env:
+ RABBITMQ_DEFAULT_USER: guest
+ RABBITMQ_DEFAULT_PASS: guest
+ ports:
+ - 5672:5672
+ options: >-
+ --health-cmd="rabbitmq-diagnostics -q ping"
+ --health-interval=10s
+ --health-timeout=5s
+ --health-retries=10
steps:
- uses: actions/checkout@v4
- uses: actions/setup-dotnet@v4
diff --git a/tests/Trax.Effect.Tests.Broadcaster/UnitTests/RabbitMqBroadcasterIntegrationTests.cs b/tests/Trax.Effect.Tests.Broadcaster/UnitTests/RabbitMqBroadcasterIntegrationTests.cs
new file mode 100644
index 0000000..5aba8b7
--- /dev/null
+++ b/tests/Trax.Effect.Tests.Broadcaster/UnitTests/RabbitMqBroadcasterIntegrationTests.cs
@@ -0,0 +1,150 @@
+using FluentAssertions;
+using Microsoft.Extensions.Logging.Abstractions;
+using NUnit.Framework;
+using Trax.Effect.Broadcaster.RabbitMQ;
+using Trax.Effect.Services.TrainEventBroadcaster;
+
+namespace Trax.Effect.Tests.Broadcaster.UnitTests;
+
+///
+/// Integration tests that exercise and
+/// against a real RabbitMQ broker.
+/// CI provisions a rabbitmq:3 service container; locally a developer-mode broker
+/// listens on amqp://guest:guest@localhost:5672/.
+///
+[TestFixture]
+public class RabbitMqBroadcasterIntegrationTests
+{
+ private const string AmqpUri = "amqp://guest:guest@localhost:5672/";
+
+ private static RabbitMqBroadcasterOptions Options(string suffix) =>
+ new()
+ {
+ ConnectionString = AmqpUri,
+ ExchangeName = $"trax.test.{suffix}.{Guid.NewGuid():N}",
+ };
+
+ private static TrainLifecycleEventMessage SampleMessage(string trainName) =>
+ new(
+ MetadataId: 1,
+ ExternalId: "ext-1",
+ TrainName: trainName,
+ TrainState: "InProgress",
+ Timestamp: DateTime.UtcNow,
+ FailureJunction: null,
+ FailureReason: null,
+ EventType: "Started",
+ Executor: null,
+ Output: null
+ );
+
+ [Test]
+ public async Task PublishAsync_DeliversMessage_ToReceiver()
+ {
+ var opts = Options("publish");
+ await using var broadcaster = new RabbitMqTrainEventBroadcaster(
+ opts,
+ NullLogger.Instance
+ );
+ await using var receiver = new RabbitMqTrainEventReceiver(
+ opts,
+ NullLogger.Instance
+ );
+
+ var received = new TaskCompletionSource();
+ await receiver.StartAsync(
+ (msg, _) =>
+ {
+ received.TrySetResult(msg);
+ return Task.CompletedTask;
+ },
+ CancellationToken.None
+ );
+
+ await broadcaster.PublishAsync(SampleMessage("RoundTrip.Train"), CancellationToken.None);
+
+ var awaited = await Task.WhenAny(received.Task, Task.Delay(TimeSpan.FromSeconds(10)));
+ awaited.Should().Be(received.Task);
+ received.Task.Result.TrainName.Should().Be("RoundTrip.Train");
+
+ await receiver.StopAsync(CancellationToken.None);
+ }
+
+ [Test]
+ public async Task PublishAsync_TwiceOnSameBroadcaster_ReusesChannel()
+ {
+ var opts = Options("reuse");
+ await using var broadcaster = new RabbitMqTrainEventBroadcaster(
+ opts,
+ NullLogger.Instance
+ );
+
+ // First publish opens the connection + declares the exchange. The second
+ // publish should hit the already-open channel and skip the exchange-declare.
+ await broadcaster.PublishAsync(SampleMessage("First"), CancellationToken.None);
+ await broadcaster.PublishAsync(SampleMessage("Second"), CancellationToken.None);
+ }
+
+ [Test]
+ public async Task Receiver_HandlerThrows_NacksAndKeepsConsuming()
+ {
+ var opts = Options("handler-throws");
+ await using var broadcaster = new RabbitMqTrainEventBroadcaster(
+ opts,
+ NullLogger.Instance
+ );
+ await using var receiver = new RabbitMqTrainEventReceiver(
+ opts,
+ NullLogger.Instance
+ );
+
+ var calls = 0;
+ var second = new TaskCompletionSource();
+ await receiver.StartAsync(
+ (msg, _) =>
+ {
+ calls++;
+ if (calls == 1)
+ throw new InvalidOperationException("handler down");
+ second.TrySetResult();
+ return Task.CompletedTask;
+ },
+ CancellationToken.None
+ );
+
+ await broadcaster.PublishAsync(SampleMessage("first"), CancellationToken.None);
+ await broadcaster.PublishAsync(SampleMessage("second"), CancellationToken.None);
+
+ var awaited = await Task.WhenAny(second.Task, Task.Delay(TimeSpan.FromSeconds(10)));
+ awaited.Should().Be(second.Task);
+ calls.Should().BeGreaterThanOrEqualTo(2);
+
+ await receiver.StopAsync(CancellationToken.None);
+ }
+
+ [Test]
+ public async Task Receiver_StopAsync_ThenDisposeAsync_DoesNotThrow()
+ {
+ var opts = Options("stop-dispose");
+ var receiver = new RabbitMqTrainEventReceiver(
+ opts,
+ NullLogger.Instance
+ );
+ await receiver.StartAsync((_, _) => Task.CompletedTask, CancellationToken.None);
+
+ await receiver.StopAsync(CancellationToken.None);
+ await receiver.DisposeAsync();
+ }
+
+ [Test]
+ public async Task Broadcaster_DisposeAsync_WithNoPublishes_DoesNotThrow()
+ {
+ var opts = Options("nop-dispose");
+ var broadcaster = new RabbitMqTrainEventBroadcaster(
+ opts,
+ NullLogger.Instance
+ );
+
+ await broadcaster.DisposeAsync();
+ }
+}
diff --git a/tests/Trax.Effect.Tests.Integration/UnitTests/Services/DataContextLoggingProviderTests.cs b/tests/Trax.Effect.Tests.Integration/UnitTests/Services/DataContextLoggingProviderTests.cs
new file mode 100644
index 0000000..68bdf6e
--- /dev/null
+++ b/tests/Trax.Effect.Tests.Integration/UnitTests/Services/DataContextLoggingProviderTests.cs
@@ -0,0 +1,222 @@
+using System.Text.RegularExpressions;
+using System.Threading.Channels;
+using FluentAssertions;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using NSubstitute;
+using NUnit.Framework;
+using Trax.Effect.Data.InMemory.Extensions;
+using Trax.Effect.Data.Services.DataContext;
+using Trax.Effect.Data.Services.DataContextLoggingProvider;
+using Trax.Effect.Data.Services.IDataContextFactory;
+using Trax.Effect.Extensions;
+
+namespace Trax.Effect.Tests.Integration.UnitTests.Services;
+
+[TestFixture]
+public class DataContextLoggingProviderTests
+{
+ #region DataContextLogger — direct tests
+
+ private static DataContextLogger BuildLogger(
+ out Channel channel,
+ string categoryName = "MyApp.Foo",
+ LogLevel minLevel = LogLevel.Information,
+ HashSet? exact = null,
+ List? wildcards = null
+ )
+ {
+ channel = Channel.CreateUnbounded();
+ return new DataContextLogger(
+ channel.Writer,
+ categoryName,
+ minLevel,
+ exact ?? [],
+ wildcards ?? []
+ );
+ }
+
+ [Test]
+ public void Log_AboveMinimum_WritesToChannel()
+ {
+ var logger = BuildLogger(out var channel);
+
+ logger.Log(LogLevel.Warning, new EventId(7, "evt"), "msg", null, (_, _) => "rendered");
+
+ channel.Reader.TryRead(out var written).Should().BeTrue();
+ written!.Level.Should().Be(LogLevel.Warning);
+ written.Message.Should().Be("rendered");
+ written.Category.Should().Be("MyApp.Foo");
+ written.EventId.Should().Be(7);
+ }
+
+ [Test]
+ public void Log_BelowMinimum_SkipsWrite()
+ {
+ var logger = BuildLogger(out var channel, minLevel: LogLevel.Warning);
+
+ logger.Log(LogLevel.Debug, default, "x", null, (_, _) => "x");
+
+ channel.Reader.TryRead(out _).Should().BeFalse();
+ }
+
+ [Test]
+ public void Log_EFCoreDatabaseCommandCategory_AlwaysSkipped()
+ {
+ // Hardcoded short-circuit to avoid persisting EF's own SQL traces (would
+ // cause infinite log recursion when the logger's flush loop runs SaveChanges).
+ var logger = BuildLogger(
+ out var channel,
+ categoryName: "Microsoft.EntityFrameworkCore.Database.Command"
+ );
+
+ logger.Log(LogLevel.Critical, default, "x", null, (_, _) => "x");
+
+ channel.Reader.TryRead(out _).Should().BeFalse();
+ }
+
+ [Test]
+ public void Log_ExactBlacklisted_Skipped()
+ {
+ var logger = BuildLogger(
+ out var channel,
+ categoryName: "Noisy.Thing",
+ exact: ["Noisy.Thing"]
+ );
+
+ logger.Log(LogLevel.Information, default, "m", null, (_, _) => "m");
+
+ channel.Reader.TryRead(out _).Should().BeFalse();
+ }
+
+ [Test]
+ public void Log_WildcardBlacklisted_Skipped()
+ {
+ var pattern = new Regex(@"^Microsoft\..*$", RegexOptions.Compiled);
+ var logger = BuildLogger(
+ out var channel,
+ categoryName: "Microsoft.SomethingNoisy",
+ wildcards: [pattern]
+ );
+
+ logger.Log(LogLevel.Information, default, "m", null, (_, _) => "m");
+
+ channel.Reader.TryRead(out _).Should().BeFalse();
+ }
+
+ [Test]
+ public void IsEnabled_ChecksMinimumLevel()
+ {
+ var logger = BuildLogger(out _, minLevel: LogLevel.Warning);
+
+ logger.IsEnabled(LogLevel.Trace).Should().BeFalse();
+ logger.IsEnabled(LogLevel.Information).Should().BeFalse();
+ logger.IsEnabled(LogLevel.Warning).Should().BeTrue();
+ logger.IsEnabled(LogLevel.Error).Should().BeTrue();
+ }
+
+ [Test]
+ public void BeginScope_ReturnsNull()
+ {
+ var logger = BuildLogger(out _);
+
+ logger.BeginScope("any").Should().BeNull();
+ }
+
+ #endregion
+
+ #region DataContextLoggingProvider — via InMemory factory
+
+ private sealed class FakeConfig : IDataContextLoggingProviderConfiguration
+ {
+ public LogLevel MinimumLogLevel { get; init; } = LogLevel.Information;
+ public List Blacklist { get; init; } = [];
+ }
+
+ private static (DataContextLoggingProvider provider, IDataContext context) BuildProvider(
+ IDataContextLoggingProviderConfiguration config
+ )
+ {
+ var services = new ServiceCollection();
+ services.AddLogging();
+ services.AddTrax(trax => trax.AddEffects(effects => effects.UseInMemory()));
+ var sp = services.BuildServiceProvider();
+ var factory = sp.GetRequiredService();
+ var provider = new DataContextLoggingProvider(factory, config);
+ return (provider, (IDataContext)factory.Create());
+ }
+
+ [Test]
+ public void CreateLogger_ReturnsConfiguredLogger_HonoringMinimumLevel()
+ {
+ var (provider, _) = BuildProvider(new FakeConfig { MinimumLogLevel = LogLevel.Error });
+
+ var logger = provider.CreateLogger("Foo.Bar");
+
+ logger.IsEnabled(LogLevel.Information).Should().BeFalse();
+ logger.IsEnabled(LogLevel.Error).Should().BeTrue();
+ provider.Dispose();
+ }
+
+ [Test]
+ public void Constructor_BlacklistWithWildcards_BuildsRegexAndExactSets()
+ {
+ // Pattern with '*' becomes a regex; literal pattern goes to the exact set.
+ var (provider, _) = BuildProvider(
+ new FakeConfig { Blacklist = ["LiteralCategory", "EntityFramework.*"] }
+ );
+
+ // Both the literal and wildcard categories should be filtered out.
+ var literalLogger = provider.CreateLogger("LiteralCategory");
+ var wildcardLogger = provider.CreateLogger("EntityFramework.Internal.Stuff");
+ var passLogger = provider.CreateLogger("Allowed.Category");
+
+ literalLogger.Log(LogLevel.Warning, default, "x", null, (_, _) => "x");
+ wildcardLogger.Log(LogLevel.Warning, default, "x", null, (_, _) => "x");
+ passLogger.Log(LogLevel.Warning, default, "x", null, (_, _) => "x");
+
+ // The provider's flush loop will eventually persist the un-filtered log.
+ // We dispose to force the drain rather than wait for the 1-second timer tick.
+ provider.Dispose();
+ }
+
+ [Test]
+ public async Task FlushLoop_BatchesLogsToDatabase()
+ {
+ var (provider, context) = BuildProvider(
+ new FakeConfig { MinimumLogLevel = LogLevel.Trace }
+ );
+
+ var logger = provider.CreateLogger("TestCategory");
+ for (var i = 0; i < 5; i++)
+ logger.Log(LogLevel.Information, default, i, null, (s, _) => $"msg {s}");
+
+ // Wait for the 1-second flush timer to tick once.
+ await Task.Delay(1500);
+ provider.Dispose();
+
+ context.Reset();
+ var logs = await context
+ .Logs.AsNoTracking()
+ .Where(l => l.Category == "TestCategory")
+ .ToListAsync();
+ logs.Should().HaveCountGreaterThanOrEqualTo(1);
+ }
+
+ [Test]
+ public void Dispose_CalledTwice_DoesNotThrow()
+ {
+ var (provider, _) = BuildProvider(new FakeConfig());
+
+ Action act = () =>
+ {
+ provider.Dispose();
+ // Second Dispose should be a no-op (cts already cancelled, channel already completed).
+ };
+
+ act.Should().NotThrow();
+ }
+
+ #endregion
+}