Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dotnet new trax-server -n MyApp --ConnectionString "Host=db.example.com;Port=543

The template creates an ASP.NET Core project with:

- `AddTrax` configured with `AddEffects` (Postgres, step logging, step progress), `AddMediator`, and `AddScheduler`
- `AddTrax` configured with `AddEffects` (Postgres, junction logging, junction progress), `AddMediator`, and `AddScheduler`
- `AddTraxDashboard` for the control room
- `AddScheduler` with a sample `HelloWorldTrain` departing every 20 seconds
- A `Trains/` directory with an example train, cargo type, interface, and stop
Expand Down Expand Up @@ -51,7 +51,7 @@ The sample includes:
- **HelloWorldTrain** — a simple scheduled train that logs a greeting every 20 seconds
- **ExtractImportTrain** — a multi-stop ETL train with 10 parallel manifests departing every 5 minutes
- **TransformLoadTrain** — a connected departure that runs after extract arrives
- **DataQualityCheckTrain** — a dormant train waiting in the yard, activated from a step when anomalies are detected
- **DataQualityCheckTrain** — a dormant train waiting in the yard, activated from a junction when anomalies are detected
- Journey log cleanup configuration for the HelloWorld train

### Flowthru Spaceflights
Expand All @@ -62,7 +62,7 @@ A data pipeline sample using the [Flowthru](https://github.com/chaoticgoodcomput

| Package | Purpose |
|---------|---------|
| [Trax.Core](https://www.nuget.org/packages/Trax.Core/) | The locomotive — `Train`, steps, railway programming |
| [Trax.Core](https://www.nuget.org/packages/Trax.Core/) | The locomotive — `Train`, junctions, railway programming |
| [Trax.Effect](https://www.nuget.org/packages/Trax.Effect/) | `ServiceTrain` with journey logging and station services |
| [Trax.Mediator](https://www.nuget.org/packages/Trax.Mediator/) | Dispatch station — route cargo to the right train via `TrainBus` |
| [Trax.Scheduler](https://www.nuget.org/packages/Trax.Scheduler/) | Timetables — recurring trains with retries and dead-lettering |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using LanguageExt;
using Trax.Effect.Attributes;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.ChatService.Trains.CreateChatRoom.Steps;
using Trax.Samples.ChatService.Trains.CreateChatRoom.Junctions;

namespace Trax.Samples.ChatService.Trains.CreateChatRoom;

Expand All @@ -13,5 +13,5 @@ public class CreateChatRoomTrain
{
protected override async Task<Either<Exception, CreateChatRoomOutput>> RunInternal(
CreateChatRoomInput input
) => Activate(input).Chain<ValidateInputStep>().Chain<PersistRoomStep>().Resolve();
) => Activate(input).Chain<ValidateInputJunction>().Chain<PersistRoomJunction>().Resolve();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;
using Trax.Samples.ChatService.Data.Entities;

namespace Trax.Samples.ChatService.Trains.CreateChatRoom.Steps;
namespace Trax.Samples.ChatService.Trains.CreateChatRoom.Junctions;

public class PersistRoomStep(ChatDbContext db, ILogger<PersistRoomStep> logger)
: Step<CreateChatRoomInput, CreateChatRoomOutput>
public class PersistRoomJunction(ChatDbContext db, ILogger<PersistRoomJunction> logger)
: Junction<CreateChatRoomInput, CreateChatRoomOutput>
{
public override async Task<CreateChatRoomOutput> Run(CreateChatRoomInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;

namespace Trax.Samples.ChatService.Trains.CreateChatRoom.Steps;
namespace Trax.Samples.ChatService.Trains.CreateChatRoom.Junctions;

public class ValidateInputStep(ILogger<ValidateInputStep> logger)
: Step<CreateChatRoomInput, CreateChatRoomInput>
public class ValidateInputJunction(ILogger<ValidateInputJunction> logger)
: Junction<CreateChatRoomInput, CreateChatRoomInput>
{
public override Task<CreateChatRoomInput> Run(CreateChatRoomInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using LanguageExt;
using Trax.Effect.Attributes;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.ChatService.Trains.GetChatHistory.Steps;
using Trax.Samples.ChatService.Trains.GetChatHistory.Junctions;

namespace Trax.Samples.ChatService.Trains.GetChatHistory;

Expand All @@ -12,5 +12,5 @@ public class GetChatHistoryTrain
{
protected override async Task<Either<Exception, GetChatHistoryOutput>> RunInternal(
GetChatHistoryInput input
) => Activate(input).Chain<FetchMessagesStep>().Resolve();
) => Activate(input).Chain<FetchMessagesJunction>().Resolve();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;

namespace Trax.Samples.ChatService.Trains.GetChatHistory.Steps;
namespace Trax.Samples.ChatService.Trains.GetChatHistory.Junctions;

public class FetchMessagesStep(ChatDbContext db, ILogger<FetchMessagesStep> logger)
: Step<GetChatHistoryInput, GetChatHistoryOutput>
public class FetchMessagesJunction(ChatDbContext db, ILogger<FetchMessagesJunction> logger)
: Junction<GetChatHistoryInput, GetChatHistoryOutput>
{
public override async Task<GetChatHistoryOutput> Run(GetChatHistoryInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using LanguageExt;
using Trax.Effect.Attributes;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.ChatService.Trains.GetChatRooms.Steps;
using Trax.Samples.ChatService.Trains.GetChatRooms.Junctions;

namespace Trax.Samples.ChatService.Trains.GetChatRooms;

Expand All @@ -12,5 +12,5 @@ public class GetChatRoomsTrain
{
protected override async Task<Either<Exception, GetChatRoomsOutput>> RunInternal(
GetChatRoomsInput input
) => Activate(input).Chain<FetchRoomsStep>().Resolve();
) => Activate(input).Chain<FetchRoomsJunction>().Resolve();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;

namespace Trax.Samples.ChatService.Trains.GetChatRooms.Steps;
namespace Trax.Samples.ChatService.Trains.GetChatRooms.Junctions;

public class FetchRoomsStep(ChatDbContext db, ILogger<FetchRoomsStep> logger)
: Step<GetChatRoomsInput, GetChatRoomsOutput>
public class FetchRoomsJunction(ChatDbContext db, ILogger<FetchRoomsJunction> logger)
: Junction<GetChatRoomsInput, GetChatRoomsOutput>
{
public override async Task<GetChatRoomsOutput> Run(GetChatRoomsInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using LanguageExt;
using Trax.Effect.Attributes;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.ChatService.Trains.JoinChatRoom.Steps;
using Trax.Samples.ChatService.Trains.JoinChatRoom.Junctions;

namespace Trax.Samples.ChatService.Trains.JoinChatRoom;

Expand All @@ -13,5 +13,5 @@ public class JoinChatRoomTrain
{
protected override async Task<Either<Exception, JoinChatRoomOutput>> RunInternal(
JoinChatRoomInput input
) => Activate(input).Chain<ValidateJoinStep>().Chain<AddParticipantStep>().Resolve();
) => Activate(input).Chain<ValidateJoinJunction>().Chain<AddParticipantJunction>().Resolve();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;
using Trax.Samples.ChatService.Data.Entities;

namespace Trax.Samples.ChatService.Trains.JoinChatRoom.Steps;
namespace Trax.Samples.ChatService.Trains.JoinChatRoom.Junctions;

public class AddParticipantStep(ChatDbContext db, ILogger<AddParticipantStep> logger)
: Step<JoinChatRoomInput, JoinChatRoomOutput>
public class AddParticipantJunction(ChatDbContext db, ILogger<AddParticipantJunction> logger)
: Junction<JoinChatRoomInput, JoinChatRoomOutput>
{
public override async Task<JoinChatRoomOutput> Run(JoinChatRoomInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;

namespace Trax.Samples.ChatService.Trains.JoinChatRoom.Steps;
namespace Trax.Samples.ChatService.Trains.JoinChatRoom.Junctions;

public class ValidateJoinStep(ChatDbContext db, ILogger<ValidateJoinStep> logger)
: Step<JoinChatRoomInput, JoinChatRoomInput>
public class ValidateJoinJunction(ChatDbContext db, ILogger<ValidateJoinJunction> logger)
: Junction<JoinChatRoomInput, JoinChatRoomInput>
{
public override async Task<JoinChatRoomInput> Run(JoinChatRoomInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using LanguageExt;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;

namespace Trax.Samples.ChatService.Trains.MarkChatAsRead.Steps;
namespace Trax.Samples.ChatService.Trains.MarkChatAsRead.Junctions;

public class UpdateLastReadStep(ChatDbContext db, ILogger<UpdateLastReadStep> logger)
: Step<MarkChatAsReadInput, Unit>
public class UpdateLastReadJunction(ChatDbContext db, ILogger<UpdateLastReadJunction> logger)
: Junction<MarkChatAsReadInput, Unit>
{
public override async Task<Unit> Run(MarkChatAsReadInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using LanguageExt;
using Trax.Effect.Attributes;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.ChatService.Trains.MarkChatAsRead.Steps;
using Trax.Samples.ChatService.Trains.MarkChatAsRead.Junctions;

namespace Trax.Samples.ChatService.Trains.MarkChatAsRead;

[TraxMutation(Description = "Marks a chat room as read for a user")]
public class MarkChatAsReadTrain : ServiceTrain<MarkChatAsReadInput, Unit>, IMarkChatAsReadTrain
{
protected override async Task<Either<Exception, Unit>> RunInternal(MarkChatAsReadInput input) =>
Activate(input).Chain<UpdateLastReadStep>().Resolve();
Activate(input).Chain<UpdateLastReadJunction>().Resolve();
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;
using Trax.Samples.ChatService.Data.Entities;

namespace Trax.Samples.ChatService.Trains.SendMessage.Steps;
namespace Trax.Samples.ChatService.Trains.SendMessage.Junctions;

public class PersistMessageStep(ChatDbContext db, ILogger<PersistMessageStep> logger)
: Step<SendMessageInput, SendMessageOutput>
public class PersistMessageJunction(ChatDbContext db, ILogger<PersistMessageJunction> logger)
: Junction<SendMessageInput, SendMessageOutput>
{
public override async Task<SendMessageOutput> Run(SendMessageInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;
using Trax.Samples.ChatService.Data;

namespace Trax.Samples.ChatService.Trains.SendMessage.Steps;
namespace Trax.Samples.ChatService.Trains.SendMessage.Junctions;

public class ValidateSenderStep(ChatDbContext db, ILogger<ValidateSenderStep> logger)
: Step<SendMessageInput, SendMessageInput>
public class ValidateSenderJunction(ChatDbContext db, ILogger<ValidateSenderJunction> logger)
: Junction<SendMessageInput, SendMessageInput>
{
public override async Task<SendMessageInput> Run(SendMessageInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using LanguageExt;
using Trax.Effect.Attributes;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.ChatService.Trains.SendMessage.Steps;
using Trax.Samples.ChatService.Trains.SendMessage.Junctions;

namespace Trax.Samples.ChatService.Trains.SendMessage;

Expand All @@ -11,5 +11,5 @@ public class SendMessageTrain : ServiceTrain<SendMessageInput, SendMessageOutput
{
protected override async Task<Either<Exception, SendMessageOutput>> RunInternal(
SendMessageInput input
) => Activate(input).Chain<ValidateSenderStep>().Chain<PersistMessageStep>().Resolve();
) => Activate(input).Chain<ValidateSenderJunction>().Chain<PersistMessageJunction>().Resolve();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
using Trax.Effect.Data.Extensions;
using Trax.Effect.Data.Postgres.Extensions;
using Trax.Effect.Extensions;
using Trax.Effect.JunctionProvider.Progress.Extensions;
using Trax.Effect.Provider.Json.Extensions;
using Trax.Effect.Provider.Parameter.Extensions;
using Trax.Effect.StepProvider.Progress.Extensions;
using Trax.Mediator.Extensions;
using Trax.Samples.Flowthru.Spaceflights;
using Trax.Samples.Flowthru.Spaceflights.Trains.DataProcessing;
Expand Down Expand Up @@ -78,7 +78,7 @@
.AddDataContextLogging()
.AddJson()
.SaveTrainParameters()
.AddStepProgress()
.AddJunctionProgress()
)
.AddMediator(typeof(ManifestNames).Assembly)
.AddScheduler(scheduler =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PackageReference Include="Trax.Effect.Data.InMemory" Version="1.*" />
<PackageReference Include="Trax.Effect.Data" Version="1.*" />
<PackageReference Include="Trax.Effect.Provider.Parameter" Version="1.*" />
<PackageReference Include="Trax.Effect.StepProvider.Progress" Version="1.*" />
<PackageReference Include="Trax.Effect.JunctionProvider.Progress" Version="1.*" />
<PackageReference Include="Trax.Dashboard" Version="1.*" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using LanguageExt;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.Flowthru.Spaceflights.Trains.DataProcessing.Steps;
using Trax.Samples.Flowthru.Spaceflights.Trains.DataProcessing.Junctions;

namespace Trax.Samples.Flowthru.Spaceflights.Trains.DataProcessing;

Expand All @@ -14,5 +14,5 @@ public class DataProcessingTrain
{
protected override async Task<Either<Exception, Unit>> RunInternal(
DataProcessingPipelineInput input
) => Activate(input).Chain<ExecuteDataProcessingStep>().Resolve();
) => Activate(input).Chain<ExecuteDataProcessingJunction>().Resolve();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
using Flowthru.Services;
using LanguageExt;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;

namespace Trax.Samples.Flowthru.Spaceflights.Trains.DataProcessing.Steps;
namespace Trax.Samples.Flowthru.Spaceflights.Trains.DataProcessing.Junctions;

/// <summary>
/// Executes the flowthru DataProcessing pipeline.
/// Preprocesses companies.csv, reviews.csv, and shuttles.xlsx into a ModelInputTable.
///
/// Pipeline logic by @Spelkington — https://github.com/chaoticgoodcomputing/flowthru
/// </summary>
public class ExecuteDataProcessingStep(
public class ExecuteDataProcessingJunction(
IFlowthruService flowthruService,
ILogger<ExecuteDataProcessingStep> logger
) : Step<DataProcessingPipelineInput, Unit>
ILogger<ExecuteDataProcessingJunction> logger
) : Junction<DataProcessingPipelineInput, Unit>
{
public override async Task<Unit> Run(DataProcessingPipelineInput input)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using LanguageExt;
using Trax.Effect.Services.ServiceTrain;
using Trax.Samples.Flowthru.Spaceflights.Trains.DataScience.Steps;
using Trax.Samples.Flowthru.Spaceflights.Trains.DataScience.Junctions;

namespace Trax.Samples.Flowthru.Spaceflights.Trains.DataScience;

Expand All @@ -12,5 +12,5 @@ public class DataScienceTrain : ServiceTrain<DataSciencePipelineInput, Unit>, ID
{
protected override async Task<Either<Exception, Unit>> RunInternal(
DataSciencePipelineInput input
) => Activate(input).Chain<ExecuteDataScienceStep>().Resolve();
) => Activate(input).Chain<ExecuteDataScienceJunction>().Resolve();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
using Flowthru.Services;
using LanguageExt;
using Microsoft.Extensions.Logging;
using Trax.Core.Step;
using Trax.Core.Junction;

namespace Trax.Samples.Flowthru.Spaceflights.Trains.DataScience.Steps;
namespace Trax.Samples.Flowthru.Spaceflights.Trains.DataScience.Junctions;

/// <summary>
/// Executes the flowthru DataScience pipeline.
/// Splits data, trains a linear regression model, and evaluates predictions.
///
/// Pipeline logic by @Spelkington — https://github.com/chaoticgoodcomputing/flowthru
/// </summary>
public class ExecuteDataScienceStep(
public class ExecuteDataScienceJunction(
IFlowthruService flowthruService,
ILogger<ExecuteDataScienceStep> logger
) : Step<DataSciencePipelineInput, Unit>
ILogger<ExecuteDataScienceJunction> logger
) : Junction<DataSciencePipelineInput, Unit>
{
public override async Task<Unit> Run(DataSciencePipelineInput input)
{
Expand Down
Loading
Loading