Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Trax.Samples.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
<Project Path="samples/DataPipeline/Trax.Samples.Flowthru.Spaceflights/Trax.Samples.Flowthru.Spaceflights.csproj" />
<Project Path="samples/DataPipeline/Trax.Samples.Flowthru.Spaceflights.Scheduler/Trax.Samples.Flowthru.Spaceflights.Scheduler.csproj" />
</Folder>
<Folder Name="/samples/EphemeralWorkers/">
<Project Path="samples/EphemeralWorkers/Trax.Samples.ContentShield/Trax.Samples.ContentShield.csproj" />
<Project Path="samples/EphemeralWorkers/Trax.Samples.ContentShield.Api/Trax.Samples.ContentShield.Api.csproj" />
<Project Path="samples/EphemeralWorkers/Trax.Samples.ContentShield.Runner/Trax.Samples.ContentShield.Runner.csproj" />
</Folder>
<Folder Name="/templates/">
<Project Path="templates/Trax.Samples.Templates.csproj" />
</Folder>
Expand Down
13 changes: 13 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,16 @@ services:
- default
volumes:
- ./init-databases.sh:/docker-entrypoint-initdb.d/init-databases.sh

rabbitmq:
container_name: trax_rabbitmq
image: rabbitmq:4-management
restart: always
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
ports:
- "5672:5672"
- "15672:15672"
networks:
- default
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
using Trax.Scheduler.Configuration;
using Trax.Scheduler.Extensions;
using Trax.Scheduler.Services.Scheduling;
using Trax.Scheduler.Trains.ManifestManager;

var builder = WebApplication.CreateBuilder(args);

Expand Down Expand Up @@ -81,7 +80,7 @@
.SaveTrainParameters()
.AddStepProgress()
)
.AddMediator(typeof(ManifestNames).Assembly, typeof(ManifestManagerTrain).Assembly)
.AddMediator(typeof(ManifestNames).Assembly)
.AddScheduler(scheduler =>
{
scheduler
Expand Down
29 changes: 16 additions & 13 deletions samples/DistributedWorkers/Trax.Samples.EnergyHub.Hub/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
using Trax.Api.Extensions;
using Trax.Api.GraphQL.Extensions;
using Trax.Dashboard.Extensions;
using Trax.Effect.Broadcaster.RabbitMQ.Extensions;
using Trax.Effect.Data.Extensions;
using Trax.Effect.Data.Postgres.Extensions;
using Trax.Effect.Extensions;
Expand All @@ -67,14 +68,17 @@
using Trax.Scheduler.Configuration;
using Trax.Scheduler.Extensions;
using Trax.Scheduler.Services.Scheduling;
using Trax.Scheduler.Trains.ManifestManager;

var builder = WebApplication.CreateBuilder(args);

var connectionString =
builder.Configuration.GetConnectionString("TraxDatabase")
?? throw new InvalidOperationException("Connection string 'TraxDatabase' not found.");

var rabbitMqConnectionString =
builder.Configuration.GetConnectionString("RabbitMQ")
?? throw new InvalidOperationException("Connection string 'RabbitMQ' not found.");

builder.Services.AddLogging(logging => logging.AddConsole());

builder.Services.AddTrax(trax =>
Expand All @@ -85,24 +89,23 @@
.AddJson()
.SaveTrainParameters()
.AddStepProgress()
.UseBroadcaster(b => b.UseRabbitMq(rabbitMqConnectionString))
)
.AddMediator(typeof(ManifestNames).Assembly, typeof(ManifestManagerTrain).Assembly)
.AddMediator(typeof(ManifestNames).Assembly)
.AddScheduler(scheduler =>
{
// ── Key: scheduling only, no local execution ──────────────────
// PostgresJobSubmitter is the default — omit UseLocalWorkers() to schedule without executing locally
// Jobs accumulate until the Worker process picks them up.
scheduler
.AddMetadataCleanup(cleanup =>
{
cleanup.AddTrainType<IMonitorSolarProductionTrain>();
cleanup.AddTrainType<IManageBatteryStorageTrain>();
cleanup.AddTrainType<IProcessChargingSessionTrain>();
cleanup.AddTrainType<IOptimizeMicrogridTrain>();
cleanup.AddTrainType<ITradeGridEnergyTrain>();
cleanup.AddTrainType<IGenerateSustainabilityReportTrain>();
})
.JobDispatcherPollingInterval(TimeSpan.FromSeconds(2));
scheduler.AddMetadataCleanup(cleanup =>
{
cleanup.AddTrainType<IMonitorSolarProductionTrain>();
cleanup.AddTrainType<IManageBatteryStorageTrain>();
cleanup.AddTrainType<IProcessChargingSessionTrain>();
cleanup.AddTrainType<IOptimizeMicrogridTrain>();
cleanup.AddTrainType<ITradeGridEnergyTrain>();
cleanup.AddTrainType<IGenerateSustainabilityReportTrain>();
});

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 1. INTERVAL + DEPENDENCY CHAIN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@
<PackageReference Include="Trax.Dashboard" Version="1.*" />
<PackageReference Include="Trax.Api" Version="1.*" />
<PackageReference Include="Trax.Api.GraphQL" Version="1.*" />
<PackageReference Include="Trax.Effect.Broadcaster.RabbitMQ" Version="1.*" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"ConnectionStrings": {
"TraxDatabase": "Host=localhost;Port=5432;Database=trax;Username=trax;Password=trax123"
"TraxDatabase": "Host=localhost;Port=5432;Database=trax;Username=trax;Password=trax123",
"RabbitMQ": "amqp://guest:guest@localhost:5672"
},
"Kestrel": {
"Endpoints": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
// so multiple worker instances can run safely without duplicate execution.
// ─────────────────────────────────────────────────────────────────────────────

using Trax.Effect.Broadcaster.RabbitMQ.Extensions;
using Trax.Effect.Data.Postgres.Extensions;
using Trax.Effect.Extensions;
using Trax.Effect.Provider.Json.Extensions;
Expand All @@ -31,24 +32,34 @@
using Trax.Mediator.Extensions;
using Trax.Samples.EnergyHub;
using Trax.Scheduler.Extensions;
using Trax.Scheduler.Trains.ManifestManager;

var builder = WebApplication.CreateBuilder(args);

var connectionString =
builder.Configuration.GetConnectionString("TraxDatabase")
?? throw new InvalidOperationException("Connection string 'TraxDatabase' not found.");

var rabbitMqConnectionString =
builder.Configuration.GetConnectionString("RabbitMQ")
?? throw new InvalidOperationException("Connection string 'RabbitMQ' not found.");

builder.Services.AddLogging(logging => logging.AddConsole());

// ── Register Trax Effect + Mediator (trains, bus, discovery, execution) ──
// The worker must reference the same train assemblies as the scheduler so it
// can resolve and execute any train type that gets dispatched.
// UseBroadcaster() publishes lifecycle events to RabbitMQ so the Hub's
// GraphQL subscriptions are notified when queued trains complete.
builder.Services.AddTrax(trax =>
trax.AddEffects(effects =>
effects.UsePostgres(connectionString).AddJson().SaveTrainParameters().AddStepProgress()
effects
.UsePostgres(connectionString)
.AddJson()
.SaveTrainParameters()
.AddStepProgress()
.UseBroadcaster(b => b.UseRabbitMq(rabbitMqConnectionString))
)
.AddMediator(typeof(ManifestNames).Assembly, typeof(ManifestManagerTrain).Assembly)
.AddMediator(typeof(ManifestNames).Assembly)
);

// ── Register standalone worker ───────────────────────────────────────────
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
<PackageReference Include="Trax.Effect.Provider.Json" Version="1.*" />
<PackageReference Include="Trax.Effect.Provider.Parameter" Version="1.*" />
<PackageReference Include="Trax.Effect.StepProvider.Progress" Version="1.*" />
<PackageReference Include="Trax.Effect.Broadcaster.RabbitMQ" Version="1.*" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"ConnectionStrings": {
"TraxDatabase": "Host=localhost;Port=5432;Database=trax;Username=trax;Password=trax123"
"TraxDatabase": "Host=localhost;Port=5432;Database=trax;Username=trax;Password=trax123",
"RabbitMQ": "amqp://guest:guest@localhost:5672"
},
"Kestrel": {
"Endpoints": {
Expand Down
121 changes: 121 additions & 0 deletions samples/EphemeralWorkers/Trax.Samples.ContentShield.Api/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// ─────────────────────────────────────────────────────────────────────────────
// ContentShield — API (GraphQL + Dashboard, ephemeral HTTP dispatch)
//
// A single process that serves the GraphQL API and hosts the Trax dashboard.
// There are NO scheduled jobs — all work is triggered by GraphQL mutations.
// Queued mutations are dispatched via HTTP to the ephemeral Runner process
// using UseRemoteWorkers(). The Runner simulates a Lambda/serverless function.
//
// This demonstrates the Ephemeral Workers pattern:
// - Query trains (LookupModerationResult) run synchronously on this process
// - Queued trains (ReviewContent, SendViolationNotice, GenerateModerationReport)
// are POSTed to the Runner via HTTP — no background_job table, no DB polling
// - Run mutations (GenerateModerationReport) are also offloaded to the Runner
// via UseRemoteRun() — the API blocks until the Runner returns the output
//
// GraphQL schema (auto-generated from train attributes):
// Queries: lookupModerationResult — [TraxQuery]
// Mutations: queueReviewContent — [TraxMutation(Queue)]
// queueSendViolationNotice — [TraxMutation(Queue)]
// runGenerateModerationReport — [TraxMutation(RunAndQueue)]
// queueGenerateModerationReport — [TraxMutation(RunAndQueue)]
// Subscriptions: onTrainStarted, onTrainCompleted, onTrainFailed
//
// Prerequisites:
// 1. Start Postgres: cd Trax.Samples && docker compose up -d
// 2. Pack local: ./pack-local.sh
// 3. Start runner: dotnet run --project samples/EphemeralWorkers/Trax.Samples.ContentShield.Runner
// 4. Start API: dotnet run --project samples/EphemeralWorkers/Trax.Samples.ContentShield.Api
//
// Endpoints:
// Dashboard: http://localhost:5204/trax
// GraphQL IDE: http://localhost:5204/trax/graphql (Banana Cake Pop)
//
// Try it:
// # Look up a moderation result (runs on API)
// curl -X POST http://localhost:5204/trax/graphql \
// -H "Content-Type: application/json" \
// -d '{"query":"{ discover { lookupModerationResult(input: {contentId: \"test-001\"}) { contentId moderationStatus classification threatScore } } }"}'
//
// # Queue a content review (dispatched to Runner via HTTP)
// curl -X POST http://localhost:5204/trax/graphql \
// -H "Content-Type: application/json" \
// -d '{"query":"mutation { dispatch { queueReviewContent(input: {contentId: \"test-002\", contentType: \"video\", contentBody: \"suspicious video content\"}) { workQueueId externalId } } }"}'
//
// # Generate a moderation report (offloaded to Runner via UseRemoteRun, blocks until done)
// curl -X POST http://localhost:5204/trax/graphql \
// -H "Content-Type: application/json" \
// -d '{"query":"mutation { dispatch { runGenerateModerationReport(input: {reportPeriod: \"Daily\"}) { totalReviewed totalFlagged topViolationTypes falsePositiveRate } } }"}'
// ─────────────────────────────────────────────────────────────────────────────

using Trax.Api.Extensions;
using Trax.Api.GraphQL.Extensions;
using Trax.Dashboard.Extensions;
using Trax.Effect.Broadcaster.RabbitMQ.Extensions;
using Trax.Effect.Data.Extensions;
using Trax.Effect.Data.Postgres.Extensions;
using Trax.Effect.Extensions;
using Trax.Effect.Provider.Json.Extensions;
using Trax.Effect.Provider.Parameter.Extensions;
using Trax.Effect.StepProvider.Progress.Extensions;
using Trax.Mediator.Extensions;
using Trax.Samples.ContentShield.Trains.ContentReview.ReviewContent;
using Trax.Scheduler.Extensions;

var builder = WebApplication.CreateBuilder(args);

var connectionString =
builder.Configuration.GetConnectionString("TraxDatabase")
?? throw new InvalidOperationException("Connection string 'TraxDatabase' not found.");

var rabbitMqConnectionString =
builder.Configuration.GetConnectionString("RabbitMQ")
?? throw new InvalidOperationException("Connection string 'RabbitMQ' not found.");

builder.Services.AddLogging(logging => logging.AddConsole());

builder.Services.AddTrax(trax =>
trax.AddEffects(effects =>
effects
.UsePostgres(connectionString)
.AddDataContextLogging()
.AddJson()
.SaveTrainParameters()
.AddStepProgress()
.UseBroadcaster(b => b.UseRabbitMq(rabbitMqConnectionString))
)
.AddMediator(typeof(ReviewContentTrain).Assembly)
.AddScheduler(scheduler =>
{
// ── Ephemeral dispatch only — no scheduled jobs ──────────────────
// UseRemoteWorkers replaces the default PostgresJobSubmitter with
// HttpJobSubmitter. When a GraphQL queue* mutation is called, the
// JobDispatcher POSTs the job directly to the Runner via HTTP.
// No cron schedules, no intervals, no manifests — purely on-demand.
scheduler.UseRemoteWorkers(remote =>
remote.BaseUrl = "http://localhost:5205/trax/execute"
);

// ── Remote run offloading ────────────────────────────────────────
// UseRemoteRun replaces the default LocalRunExecutor with
// HttpRunExecutor. When a GraphQL run* mutation is called, the
// request is POSTed to the Runner and blocks until the train
// completes. Without this, runs execute in-process on this API.
scheduler.UseRemoteRun(remote => remote.BaseUrl = "http://localhost:5205/trax/run");
})
);

// ── Register GraphQL API ────────────────────────────────────────────────
// Trains annotated with [TraxQuery] or [TraxMutation] get typed GraphQL
// fields auto-generated. [TraxBroadcast] trains emit subscription events.
builder.AddTraxDashboard();
builder.Services.AddTraxGraphQL();
builder.Services.AddHealthChecks().AddTraxHealthCheck();

var app = builder.Build();

app.UseTraxDashboard();
app.UseTraxGraphQL();
app.MapHealthChecks("/trax/health");

app.Run();
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<AllowMissingPrunePackageData>true</AllowMissingPrunePackageData>
<RequiresAspNetWebAssets>true</RequiresAspNetWebAssets>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Trax.Samples.ContentShield\Trax.Samples.ContentShield.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Trax.Effect.Provider.Json" Version="1.*" />
<PackageReference Include="Trax.Effect.Provider.Parameter" Version="1.*" />
<PackageReference Include="Trax.Effect.StepProvider.Progress" Version="1.*" />
<PackageReference Include="Trax.Dashboard" Version="1.*" />
<PackageReference Include="Trax.Api" Version="1.*" />
<PackageReference Include="Trax.Api.GraphQL" Version="1.*" />
<PackageReference Include="Trax.Effect.Broadcaster.RabbitMQ" Version="1.*" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"ConnectionStrings": {
"TraxDatabase": "Host=localhost;Port=5432;Database=trax;Username=trax;Password=trax123",
"RabbitMQ": "amqp://guest:guest@localhost:5672"
},
"Kestrel": {
"Endpoints": {
"Http": {
"Url": "http://localhost:5204"
}
}
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
Loading