Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 22 additions & 49 deletions src/Trax.Dashboard/Components/Dialogs/QueueTrainDialog.razor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@
using System.Text.RegularExpressions;
using Microsoft.AspNetCore.Components;
using Radzen;
using Trax.Effect.Configuration.TraxEffectConfiguration;
using Trax.Effect.Data.Services.IDataContextFactory;
using Trax.Effect.Models.WorkQueue;
using Trax.Effect.Models.WorkQueue.DTOs;
using Trax.Effect.Utils;
using Trax.Mediator.Services.TrainDiscovery;
using Trax.Scheduler.Services.Operations;

namespace Trax.Dashboard.Components.Dialogs;

Expand All @@ -18,7 +14,7 @@ public partial class QueueTrainDialog : IDisposable
private readonly CancellationTokenSource _cts = new();

[Inject]
private IDataContextProviderFactory DataContextFactory { get; set; } = default!;
private IOperationsService OperationsService { get; set; } = default!;

[Inject]
private NavigationManager Navigation { get; set; } = default!;
Expand Down Expand Up @@ -70,48 +66,29 @@ private async Task QueueTrain()

try
{
var input =
_selectedTab == 0
? BuildInputFromForm()
: JsonSerializer.Deserialize(
_jsonInput,
Registration.InputType,
TraxEffectConfiguration.StaticSystemJsonSerializerOptions
);

if (input is null)
// The form-builder tab produces strongly typed values; the JSON tab is already
// a JSON string. Either way we end up with a JSON string to hand the shared
// IOperationsService, which performs the actual deserialization + validation.
string? inputJson = _selectedTab == 0 ? BuildInputJsonFromForm() : _jsonInput;

var result = await OperationsService.QueueTrainAsync(
new QueueTrainInput(
TrainName: Registration.ServiceType.FullName!,
InputJson: inputJson,
Priority: _priority
),
_cts.Token
);

if (!result.Success)
{
_error =
$"Deserialization returned null. Ensure the input matches {Registration.InputTypeName}.";
_error = result.Message;
return;
}

var serializedInput = JsonSerializer.Serialize(
input,
Registration.InputType,
TraxJsonSerializationOptions.ManifestProperties
);

var entry = WorkQueue.Create(
new CreateWorkQueue
{
TrainName = Registration.ServiceType.FullName!,
Input = serializedInput,
InputTypeName = Registration.InputType.FullName,
Priority = _priority,
}
);

using var dataContext = await DataContextFactory.CreateDbContextAsync(_cts.Token);
await dataContext.Track(entry);
await dataContext.SaveChanges(_cts.Token);

DialogService.Close();
Navigation.NavigateTo($"trax/data/work-queue/{entry.Id}");
}
catch (JsonException je)
{
_error = $"Invalid JSON: {je.Message}";
if (result.Id is { } id)
Navigation.NavigateTo($"trax/data/work-queue/{id}");
}
catch (Exception ex)
{
Expand All @@ -123,7 +100,7 @@ private async Task QueueTrain()
}
}

private object? BuildInputFromForm()
private string BuildInputJsonFromForm()
{
var jsonObj = new JsonObject();

Expand All @@ -133,11 +110,7 @@ private async Task QueueTrain()
jsonObj[prop.Name] = ToJsonNode(value, prop.PropertyType);
}

return JsonSerializer.Deserialize(
jsonObj.ToJsonString(),
Registration.InputType,
new JsonSerializerOptions { PropertyNameCaseInsensitive = true }
);
return jsonObj.ToJsonString();
}

private static JsonNode? ToJsonNode(object? value, Type targetType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Trax.Effect.Models.Manifest;
using Trax.Effect.Models.ManifestGroup;
using Trax.Effect.Models.Metadata;
using Trax.Scheduler.Services.Operations;
using Trax.Scheduler.Services.TraxScheduler;
using static Trax.Dashboard.Utilities.DashboardFormatters;

Expand All @@ -30,6 +31,9 @@ public partial class ManifestGroupDetailPage
[Inject]
private ITraxScheduler TraxScheduler { get; set; } = default!;

[Inject]
private IOperationsService OperationsService { get; set; } = default!;

[Inject]
private IServiceProvider ServiceProvider { get; set; } = default!;

Expand Down Expand Up @@ -207,93 +211,37 @@ private async Task LoadDependencyGraph(
CancellationToken cancellationToken
)
{
var currentManifestIdsQuery = context
.Manifests.Where(m => m.ManifestGroupId == ManifestGroupId)
.Select(m => m.Id);

if (!await currentManifestIdsQuery.AnyAsync(cancellationToken))
{
_dagLayout = null;
return;
}
// Source the graph from the shared OperationsService so the dashboard's DAG and
// the GraphQL `operations.manifestGroups.graph` query produce identical results.
// The IDataContext parameter is retained for signature compatibility but the
// service opens its own context.
_ = context;

var graph = await OperationsService.GetManifestGroupDependencyGraphAsync(
ManifestGroupId,
cancellationToken
);

// Upstream: groups containing manifests that our manifests depend on
var upstreamGroupIds = await context
.Manifests.AsNoTracking()
.Where(m => m.ManifestGroupId == ManifestGroupId && m.DependsOnManifestId != null)
.Join(
context.Manifests.AsNoTracking(),
dependent => dependent.DependsOnManifestId,
parent => (long?)parent.Id,
(dependent, parent) => parent.ManifestGroupId
)
.Where(parentGroupId => parentGroupId != ManifestGroupId)
.Distinct()
.ToListAsync(cancellationToken);

// Downstream: groups containing manifests that depend on our manifests
var downstreamGroupIds = await context
.Manifests.AsNoTracking()
.Where(m =>
m.DependsOnManifestId != null
&& currentManifestIdsQuery.Contains(m.DependsOnManifestId.Value)
&& m.ManifestGroupId != ManifestGroupId
)
.Select(m => m.ManifestGroupId)
.Distinct()
.ToListAsync(cancellationToken);

var neighborGroupIds = upstreamGroupIds.Union(downstreamGroupIds).ToHashSet();

if (neighborGroupIds.Count == 0)
// Single-node graphs (focal group only, no cross-group dependencies) collapse to
// null here so the UI hides the DAG section entirely, matching the previous
// dashboard behaviour where an isolated group rendered nothing.
if (graph is null || graph.Edges.Count == 0)
{
_dagLayout = null;
return;
}

var allRelevantGroupIds = neighborGroupIds.Append(ManifestGroupId).ToList();

var neighborGroups = await context
.ManifestGroups.AsNoTracking()
.Where(g => allRelevantGroupIds.Contains(g.Id))
.Select(g => new { g.Id, g.Name })
.ToListAsync(cancellationToken);

var dagNodes = neighborGroups
.Select(g => new DagNode
var dagNodes = graph
.Nodes.Select(n => new DagNode
{
Id = g.Id,
Label = g.Name,
IsHighlighted = g.Id == ManifestGroupId,
Id = n.Id,
Label = n.Name,
IsHighlighted = n.IsHighlighted,
})
.ToList();

// Edges between all relevant groups
var crossGroupEdges = await context
.Manifests.AsNoTracking()
.Where(m =>
m.DependsOnManifestId != null && allRelevantGroupIds.Contains(m.ManifestGroupId)
)
.Join(
context.Manifests.AsNoTracking(),
dependent => dependent.DependsOnManifestId,
parent => (long?)parent.Id,
(dependent, parent) =>
new
{
ParentGroupId = parent.ManifestGroupId,
DependentGroupId = dependent.ManifestGroupId,
}
)
.Where(e =>
e.ParentGroupId != e.DependentGroupId
&& allRelevantGroupIds.Contains(e.ParentGroupId)
)
.Distinct()
.ToListAsync(cancellationToken);

var dagEdges = crossGroupEdges
.Select(e => new DagEdge { FromId = e.ParentGroupId, ToId = e.DependentGroupId })
var dagEdges = graph
.Edges.Select(e => new DagEdge { FromId = e.FromId, ToId = e.ToId })
.ToList();

_dagLayout = DagLayoutEngine.ComputeLayout(dagNodes, dagEdges);
Expand All @@ -318,27 +266,46 @@ private async Task SaveSettings()

try
{
using var context = await DataContextFactory.CreateDbContextAsync(DisposalToken);

var entity = await context.ManifestGroups.FindAsync(_group.Id);
if (entity is null)
return;
// Translate the dirty in-memory edits into a patch input for the shared
// service. MaxActiveJobs needs the explicit Clear flag because int? can't
// distinguish "unset" from "set to null" in the patch record.
var maxActiveJobsChanged = _group.MaxActiveJobs != _savedMaxActiveJobs;
var input = new UpdateManifestGroupInput(
MaxActiveJobs: maxActiveJobsChanged ? _group.MaxActiveJobs : null,
ClearMaxActiveJobs: maxActiveJobsChanged && _group.MaxActiveJobs is null,
Priority: _group.Priority != _savedPriority ? _group.Priority : null,
IsEnabled: _group.IsEnabled != _savedIsEnabled ? _group.IsEnabled : null
);

entity.MaxActiveJobs = _group.MaxActiveJobs;
entity.Priority = _group.Priority;
entity.IsEnabled = _group.IsEnabled;
entity.UpdatedAt = DateTime.UtcNow;
var result = await OperationsService.UpdateManifestGroupAsync(
_group.Id,
input,
DisposalToken
);

await context.SaveChanges(DisposalToken);
if (!result.Success)
{
NotificationService.Notify(
new NotificationMessage
{
Severity = NotificationSeverity.Error,
Summary = "Save Failed",
Detail = result.Message ?? "Update failed.",
Duration = 6000,
}
);
return;
}

SnapshotSettings();
// Reload to pick up the bumped UpdatedAt and confirm persistence.
await LoadDataAsync(DisposalToken);

NotificationService.Notify(
new NotificationMessage
{
Severity = NotificationSeverity.Success,
Summary = "Settings Saved",
Detail = $"Group \"{_group.Name}\" settings updated.",
Detail = $"Group \"{_group?.Name}\" settings updated.",
Duration = 4000,
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
using Radzen;
using Trax.Dashboard.Components.Shared;
using Trax.Effect.Data.Services.IDataContextFactory;
using Trax.Effect.Enums;
using Trax.Effect.Models.WorkQueue;
using Trax.Scheduler.Services.Operations;
using static Trax.Dashboard.Utilities.DashboardFormatters;

namespace Trax.Dashboard.Components.Pages.Data;
Expand All @@ -14,6 +14,9 @@ public partial class WorkQueueDetailPage
[Inject]
private IDataContextProviderFactory DataContextFactory { get; set; } = default!;

[Inject]
private IOperationsService OperationsService { get; set; } = default!;

[Inject]
private NavigationManager Navigation { get; set; } = default!;

Expand Down Expand Up @@ -47,30 +50,24 @@ private async Task CancelEntry()

try
{
using var context = await DataContextFactory.CreateDbContextAsync(DisposalToken);
var entry = await context.WorkQueues.FirstOrDefaultAsync(q => q.Id == WorkQueueId);

if (entry is null)
{
_error = "Work queue entry not found.";
return;
}
var result = await OperationsService.CancelWorkQueueEntryAsync(
WorkQueueId,
DisposalToken
);

if (entry.Status != WorkQueueStatus.Queued)
if (!result.Success)
{
_error = $"Cannot cancel entry with status '{entry.Status}'.";
_error = result.Message;
return;
}

entry.Status = WorkQueueStatus.Cancelled;
await context.SaveChanges(DisposalToken);

_entry = entry;
// Reload so the UI reflects the new status without a full page navigation.
await LoadDataAsync(DisposalToken);

NotificationService.Notify(
NotificationSeverity.Success,
"Entry Cancelled",
$"Work queue entry {entry.Id} has been cancelled.",
$"Work queue entry {WorkQueueId} has been cancelled.",
duration: 4000
);
}
Expand Down
Loading
Loading