diff --git a/internal/attractor/engine/codergen_router.go b/internal/attractor/engine/codergen_router.go index 929ec4ff..34412256 100644 --- a/internal/attractor/engine/codergen_router.go +++ b/internal/attractor/engine/codergen_router.go @@ -81,12 +81,14 @@ func (r *CodergenRouter) Run(ctx context.Context, exec *Execution, node *model.N if modelID == "" { return "", nil, fmt.Errorf("missing llm_model on node %s", node.ID) } + selectionSource := "graph_attrs" if exec != nil && exec.Engine != nil { if forcedModelID, forced := forceModelForProvider(exec.Engine.Options.ForceModels, prov); forced { if !strings.EqualFold(modelID, forcedModelID) { warnEngine(exec, fmt.Sprintf("force-model override applied: node=%s provider=%s model=%s (was %s)", node.ID, prov, forcedModelID, modelID)) } modelID = forcedModelID + selectionSource = "force_model" } } backend := r.backendForProvider(prov) @@ -99,6 +101,20 @@ func (r *CodergenRouter) Run(ctx context.Context, exec *Execution, node *model.N if isCLIOnlyModel(modelID) && backend != BackendCLI { warnEngine(exec, fmt.Sprintf("cli-only model override: node=%s model=%s backend=%s->cli", node.ID, modelID, backend)) backend = BackendCLI + if selectionSource == "graph_attrs" { + selectionSource = "cli_only_override" + } + } + + if exec != nil && exec.Engine != nil { + exec.Engine.appendProgress(map[string]any{ + "event": "provider_selected", + "node_id": node.ID, + "provider": prov, + "model": modelID, + "backend": string(backend), + "source": selectionSource, + }) } switch backend { diff --git a/internal/attractor/engine/decision_logging_test.go b/internal/attractor/engine/decision_logging_test.go new file mode 100644 index 00000000..f5e56fd6 --- /dev/null +++ b/internal/attractor/engine/decision_logging_test.go @@ -0,0 +1,214 @@ +// Tests for decision logging: edge condition evaluation, edge selection, +// retry decisions, and provider selection progress events. + +package engine + +import ( + "context" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/danshapiro/kilroy/internal/attractor/runtime" +) + +func TestDecisionLogging_ConditionalRoute(t *testing.T) { + t.Setenv("XDG_STATE_HOME", t.TempDir()) + repo := initTestRepo(t) + logsRoot := t.TempDir() + pinned := writePinnedCatalog(t) + + // Graph with conditional routing: process succeeds, routing through check + // which routes to path_a (outcome=success) and NOT path_b (outcome=fail). + // The unconditional fallback edge satisfies graph validation. + dot := []byte(`digraph conditional_decision_log { + graph [goal="Test decision logging for conditional routing", default_max_retry=0] + start [shape=Mdiamond] + process [shape=parallelogram, tool_command="echo processed"] + path_a [shape=parallelogram, tool_command="echo took_path_a"] + path_b [shape=parallelogram, tool_command="echo took_path_b"] + done [shape=Msquare] + start -> process + process -> path_a [condition="outcome=success"] + process -> path_b [condition="outcome=fail"] + process -> done + path_a -> done + path_b -> done +}`) + cfg := minimalToolGraphConfig(repo, pinned) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + res, err := RunWithConfig(ctx, dot, cfg, RunOptions{ + RunID: "conditional-decision-log", + LogsRoot: logsRoot, + DisableCXDB: true, + }) + if err != nil { + t.Fatalf("RunWithConfig: %v", err) + } + if res.FinalStatus != runtime.FinalSuccess { + t.Fatalf("expected success, got %q", res.FinalStatus) + } + + progressPath := filepath.Join(logsRoot, "progress.ndjson") + events := mustReadProgressEventsFile(t, progressPath) + + // Verify edge_condition_evaluated events exist for process node. + var condEvents []map[string]any + for _, ev := range events { + if anyToString(ev["event"]) == "edge_condition_evaluated" && + anyToString(ev["node_id"]) == "process" { + condEvents = append(condEvents, ev) + } + } + if len(condEvents) < 2 { + t.Fatalf("expected at least 2 edge_condition_evaluated events for process, got %d", len(condEvents)) + } + + // Verify one matched (path_a, outcome=success) and one did not (path_b, outcome=fail). + var matchedCount, unmatchedCount int + for _, ev := range condEvents { + matched, ok := ev["matched"].(bool) + if !ok { + t.Fatalf("edge_condition_evaluated event missing matched field: %v", ev) + } + edgeTo := anyToString(ev["edge_to"]) + condition := anyToString(ev["condition"]) + if matched { + matchedCount++ + if edgeTo != "path_a" { + t.Errorf("matched edge_to: got %q want %q", edgeTo, "path_a") + } + if !strings.Contains(condition, "outcome=success") { + t.Errorf("matched condition: got %q want to contain %q", condition, "outcome=success") + } + } else { + unmatchedCount++ + if edgeTo != "path_b" { + t.Errorf("unmatched edge_to: got %q want %q", edgeTo, "path_b") + } + } + } + if matchedCount != 1 { + t.Errorf("expected 1 matched condition, got %d", matchedCount) + } + if unmatchedCount != 1 { + t.Errorf("expected 1 unmatched condition, got %d", unmatchedCount) + } + + // Verify edge_selected event has selection_method=condition_match. + var edgeSelectedEvents []map[string]any + for _, ev := range events { + if anyToString(ev["event"]) == "edge_selected" && + anyToString(ev["from_node"]) == "process" { + edgeSelectedEvents = append(edgeSelectedEvents, ev) + } + } + if len(edgeSelectedEvents) == 0 { + t.Fatalf("expected edge_selected event for process node, got none") + } + sel := edgeSelectedEvents[0] + if anyToString(sel["to_node"]) != "path_a" { + t.Errorf("edge_selected to_node: got %q want %q", anyToString(sel["to_node"]), "path_a") + } + if anyToString(sel["selection_method"]) != "condition_match" { + t.Errorf("edge_selected selection_method: got %q want %q", anyToString(sel["selection_method"]), "condition_match") + } +} + +func TestDecisionLogging_HillClimber(t *testing.T) { + t.Setenv("XDG_STATE_HOME", t.TempDir()) + repo := initTestRepo(t) + logsRoot := t.TempDir() + pinned := writePinnedCatalog(t) + + // Hill-climber: verify fails until attempt 3, routing back to implement each time. + // The unconditional fallback verify->implement satisfies validation and acts as the + // default route when no condition matches. + dot := []byte(`digraph hill_climber_decision_log { + graph [goal="Test decision logging for hill-climber", default_max_retry=0] + start [shape=Mdiamond] + implement [ + shape=parallelogram, + tool_command="count=$(cat attempt_counter 2>/dev/null || echo 0); count=$((count + 1)); echo $count > attempt_counter; echo implemented_attempt_$count" + ] + verify [ + shape=parallelogram, + tool_command="count=$(cat attempt_counter); if [ $count -ge 3 ]; then echo 'all checks pass'; exit 0; else echo 'checks failed'; exit 1; fi" + ] + done [shape=Msquare] + start -> implement + implement -> verify + verify -> done [condition="outcome=success"] + verify -> implement [condition="outcome=fail"] + verify -> implement +}`) + cfg := minimalToolGraphConfig(repo, pinned) + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + res, err := RunWithConfig(ctx, dot, cfg, RunOptions{ + RunID: "hill-climber-decision-log", + LogsRoot: logsRoot, + DisableCXDB: true, + }) + if err != nil { + t.Fatalf("RunWithConfig: %v", err) + } + if res.FinalStatus != runtime.FinalSuccess { + t.Fatalf("expected success, got %q", res.FinalStatus) + } + + progressPath := filepath.Join(logsRoot, "progress.ndjson") + events := mustReadProgressEventsFile(t, progressPath) + + // Collect edge_condition_evaluated events from verify node. + var verifyCondEvents []map[string]any + for _, ev := range events { + if anyToString(ev["event"]) == "edge_condition_evaluated" && + anyToString(ev["node_id"]) == "verify" { + verifyCondEvents = append(verifyCondEvents, ev) + } + } + // verify runs 3 times: 2 failures routing to implement, 1 success routing to done. + // Each time, both conditions are evaluated (outcome=success, outcome=fail). + if len(verifyCondEvents) < 6 { + t.Fatalf("expected at least 6 edge_condition_evaluated events for verify, got %d", len(verifyCondEvents)) + } + + // Verify the fail->implement edge is taken on early iterations. + var failToImplement int + var successToDone int + for _, ev := range events { + if anyToString(ev["event"]) == "edge_selected" && + anyToString(ev["from_node"]) == "verify" { + to := anyToString(ev["to_node"]) + switch to { + case "implement": + failToImplement++ + case "done": + successToDone++ + } + } + } + if failToImplement < 2 { + t.Errorf("expected at least 2 edge_selected verify->implement, got %d", failToImplement) + } + if successToDone != 1 { + t.Errorf("expected 1 edge_selected verify->done, got %d", successToDone) + } + + // Verify retry_decision events appear for verify node failures. + var retryDecisions []map[string]any + for _, ev := range events { + if anyToString(ev["event"]) == "retry_decision" && + anyToString(ev["node_id"]) == "verify" { + retryDecisions = append(retryDecisions, ev) + } + } + if len(retryDecisions) < 2 { + t.Errorf("expected at least 2 retry_decision events for verify, got %d", len(retryDecisions)) + } +} diff --git a/internal/attractor/engine/engine.go b/internal/attractor/engine/engine.go index 66e3ebfd..66b2bb6c 100644 --- a/internal/attractor/engine/engine.go +++ b/internal/attractor/engine/engine.go @@ -744,7 +744,7 @@ func (e *Engine) runLoop(ctx context.Context, current string, completed []string } // Resolve next hop with fan-in failure policy. - nextHop, err := resolveNextHop(e.Graph, node.ID, out, e.Context, failureClass) + nextHop, err := resolveNextHop(e.Graph, node.ID, out, e.Context, failureClass, e.appendProgress) if err != nil { return nil, err } @@ -806,12 +806,15 @@ func (e *Engine) runLoop(ctx context.Context, current string, completed []string } next := nextHop.Edge e.appendProgress(map[string]any{ - "event": "edge_selected", - "from_node": node.ID, - "to_node": next.To, - "label": next.Label(), - "condition": next.Condition(), - "hop_source": string(nextHop.Source), + "event": "edge_selected", + "from_node": node.ID, + "to_node": next.To, + "label": next.Label(), + "condition": next.Condition(), + "hop_source": string(nextHop.Source), + "selection_method": nextHop.SelectionMeta.Method, + "candidates_evaluated": nextHop.SelectionMeta.CandidatesEvaluated, + "conditions_matched": nextHop.SelectionMeta.ConditionsMatched, }) // loop_restart (attractor-spec §3.2 Step 7): terminate current run, re-launch @@ -1289,6 +1292,22 @@ func (e *Engine) executeWithRetry(ctx context.Context, node *model.Node, retries if canRetry { willRetry = true } + reason := out.FailureReason + if willRetry { + reason = fmt.Sprintf("%s, attempts remaining", out.FailureReason) + } else if attempt >= maxAttempts { + reason = fmt.Sprintf("%s, max retries exhausted", out.FailureReason) + } else { + reason = fmt.Sprintf("%s, failure_class=%s not retryable", out.FailureReason, failureClass) + } + e.appendProgress(map[string]any{ + "event": "retry_decision", + "node_id": node.ID, + "attempt": attempt, + "max_retries": maxRetries, + "will_retry": willRetry, + "reason": reason, + }) // Spec §9.6: emit StageFailed CXDB event. e.cxdbStageFailed(ctx, node, out.FailureReason, willRetry, attempt) if canRetry { @@ -1987,15 +2006,31 @@ func findStartNodeID(g *model.Graph) string { } // selectNextEdge implements attractor-spec edge selection with deterministic tie-breaks (metaspec). -func selectNextEdge(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context) (*model.Edge, error) { - edges, err := selectAllEligibleEdges(g, from, out, ctx) +func selectNextEdge(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context, progress ...ProgressFunc) (*model.Edge, error) { + edge, _, err := selectNextEdgeWithMeta(g, from, out, ctx, progress...) + return edge, err +} + +func selectNextEdgeWithMeta(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context, progress ...ProgressFunc) (*model.Edge, edgeSelectionMeta, error) { + edges, meta, err := selectAllEligibleEdgesWithMeta(g, from, out, ctx, progress...) if err != nil { - return nil, err + return nil, meta, err } if len(edges) == 0 { - return nil, nil + return nil, meta, nil } - return bestEdge(edges), nil + // Refine method: if weight selected from multiple candidates, check for tiebreak. + if meta.Method == "weight" && len(edges) > 1 { + best := bestEdge(edges) + // Check if the winner was determined by lexical tiebreak. + wi := parseInt(edges[0].Attr("weight", "0"), 0) + wj := parseInt(edges[1].Attr("weight", "0"), 0) + if wi == wj { + meta.Method = "lexical_tiebreak" + } + return best, meta, nil + } + return bestEdge(edges), meta, nil } // hasMatchingOutgoingCondition returns true if any outgoing edge from the given @@ -2026,14 +2061,26 @@ func hasMatchingOutgoingCondition(g *model.Graph, nodeID string, out runtime.Out return false } +// edgeSelectionMeta captures how edge selection resolved for decision logging. +type edgeSelectionMeta struct { + Method string // condition_match, preferred_label, suggested_next_ids, weight, only_edge, fallback + CandidatesEvaluated int + ConditionsMatched int +} + // selectAllEligibleEdges returns all edges that are eligible for traversal from the given node. // When multiple edges are returned, the caller should treat this as an implicit fan-out. // Preferred-label and suggested-next-ID narrowing still apply — if they narrow to a single edge, // only that edge is returned (no fan-out). -func selectAllEligibleEdges(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context) ([]*model.Edge, error) { +func selectAllEligibleEdges(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context, progress ...ProgressFunc) ([]*model.Edge, error) { + edges, _, err := selectAllEligibleEdgesWithMeta(g, from, out, ctx, progress...) + return edges, err +} + +func selectAllEligibleEdgesWithMeta(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context, progress ...ProgressFunc) ([]*model.Edge, edgeSelectionMeta, error) { rawEdges := g.Outgoing(from) if len(rawEdges) == 0 { - return nil, nil + return nil, edgeSelectionMeta{}, nil } // Filter nil edges once for use in all subsequent steps. @@ -2044,26 +2091,47 @@ func selectAllEligibleEdges(g *model.Graph, from string, out runtime.Outcome, ct } } if len(edges) == 0 { - return nil, nil + return nil, edgeSelectionMeta{}, nil + } + + meta := edgeSelectionMeta{CandidatesEvaluated: len(edges)} + + // Resolve optional progress callback. + var emit ProgressFunc + if len(progress) > 0 && progress[0] != nil { + emit = progress[0] } // Step 1: Eligible conditional edges. var condMatched []*model.Edge + conditionsEvaluated := 0 for _, e := range edges { c := strings.TrimSpace(e.Condition()) if c == "" { continue } + conditionsEvaluated++ ok, err := cond.Evaluate(c, out, ctx) if err != nil { - return nil, err + return nil, edgeSelectionMeta{}, err + } + if emit != nil { + emit(map[string]any{ + "event": "edge_condition_evaluated", + "node_id": from, + "edge_to": e.To, + "condition": c, + "matched": ok, + }) } if ok { condMatched = append(condMatched, e) } } if len(condMatched) > 0 { - return condMatched, nil + meta.Method = "condition_match" + meta.ConditionsMatched = len(condMatched) + return condMatched, meta, nil } // Step 2: Preferred label match narrows to one. @@ -2075,7 +2143,8 @@ func selectAllEligibleEdges(g *model.Graph, from string, out runtime.Outcome, ct sort.SliceStable(sorted, func(i, j int) bool { return sorted[i].Order < sorted[j].Order }) for _, e := range sorted { if normalizeLabel(e.Label()) == want { - return []*model.Edge{e}, nil + meta.Method = "preferred_label" + return []*model.Edge{e}, meta, nil } } } @@ -2089,7 +2158,8 @@ func selectAllEligibleEdges(g *model.Graph, from string, out runtime.Outcome, ct for _, suggested := range out.SuggestedNextIDs { for _, e := range sorted { if e.To == suggested { - return []*model.Edge{e}, nil + meta.Method = "suggested_next_ids" + return []*model.Edge{e}, meta, nil } } } @@ -2103,7 +2173,12 @@ func selectAllEligibleEdges(g *model.Graph, from string, out runtime.Outcome, ct } } if len(uncond) > 0 { - return uncond, nil + if len(uncond) == 1 { + meta.Method = "only_edge" + } else { + meta.Method = "weight" + } + return uncond, meta, nil } // Fallback: any edge (spec §3.3). All edges have conditions and none @@ -2116,7 +2191,8 @@ func selectAllEligibleEdges(g *model.Graph, from string, out runtime.Outcome, ct // all_conditional_edges ERROR promotion). fmt.Fprintf(os.Stderr, `{"event":"step5_all_conditional_fallback","node":%q,"edges_considered":%d,"outcome":%q}`+"\n", from, len(edges), string(out.Status)) - return edges, nil + meta.Method = "fallback" + return edges, meta, nil } func bestEdge(edges []*model.Edge) *model.Edge { diff --git a/internal/attractor/engine/next_hop.go b/internal/attractor/engine/next_hop.go index 8efc87d3..e3ec9010 100644 --- a/internal/attractor/engine/next_hop.go +++ b/internal/attractor/engine/next_hop.go @@ -8,6 +8,10 @@ import ( "github.com/danshapiro/kilroy/internal/attractor/runtime" ) +// ProgressFunc is an optional callback for emitting structured progress events. +// Routing functions accept this to log decisions without depending on the engine. +type ProgressFunc func(map[string]any) + type nextHopSource string const ( @@ -20,9 +24,10 @@ type resolvedNextHop struct { Edge *model.Edge Source nextHopSource RetryTargetSource string + SelectionMeta edgeSelectionMeta } -func resolveNextHop(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context, failureClass string) (*resolvedNextHop, error) { +func resolveNextHop(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context, failureClass string, progress ProgressFunc) (*resolvedNextHop, error) { if g == nil { return nil, nil } @@ -32,14 +37,15 @@ func resolveNextHop(g *model.Graph, from string, out runtime.Outcome, ctx *runti } if isFanInFailureLike(g, from, out.Status) { - conditional, err := selectMatchingConditionalEdge(g, from, out, ctx) + conditional, condMeta, err := selectMatchingConditionalEdge(g, from, out, ctx, progress) if err != nil { return nil, err } if conditional != nil { return &resolvedNextHop{ - Edge: conditional, - Source: nextHopSourceConditional, + Edge: conditional, + Source: nextHopSourceConditional, + SelectionMeta: condMeta, }, nil } @@ -66,7 +72,7 @@ func resolveNextHop(g *model.Graph, from string, out runtime.Outcome, ctx *runti return nil, nil } - next, err := selectNextEdge(g, from, out, ctx) + next, meta, err := selectNextEdgeWithMeta(g, from, out, ctx, progress) if err != nil { return nil, err } @@ -74,16 +80,19 @@ func resolveNextHop(g *model.Graph, from string, out runtime.Outcome, ctx *runti return nil, nil } return &resolvedNextHop{ - Edge: next, - Source: nextHopSourceEdgeSelection, + Edge: next, + Source: nextHopSourceEdgeSelection, + SelectionMeta: meta, }, nil } -func selectMatchingConditionalEdge(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context) (*model.Edge, error) { +func selectMatchingConditionalEdge(g *model.Graph, from string, out runtime.Outcome, ctx *runtime.Context, progress ProgressFunc) (*model.Edge, edgeSelectionMeta, error) { edges := g.Outgoing(from) + meta := edgeSelectionMeta{} if len(edges) == 0 { - return nil, nil + return nil, meta, nil } + meta.CandidatesEvaluated = len(edges) var condMatched []*model.Edge for _, e := range edges { if e == nil { @@ -95,16 +104,27 @@ func selectMatchingConditionalEdge(g *model.Graph, from string, out runtime.Outc } ok, err := cond.Evaluate(c, out, ctx) if err != nil { - return nil, err + return nil, meta, err + } + if progress != nil { + progress(map[string]any{ + "event": "edge_condition_evaluated", + "node_id": from, + "edge_to": e.To, + "condition": c, + "matched": ok, + }) } if ok { condMatched = append(condMatched, e) } } if len(condMatched) == 0 { - return nil, nil + return nil, meta, nil } - return bestEdge(condMatched), nil + meta.Method = "condition_match" + meta.ConditionsMatched = len(condMatched) + return bestEdge(condMatched), meta, nil } func resolveRetryTargetWithSource(g *model.Graph, nodeID string) (target string, source string) { diff --git a/internal/attractor/engine/next_hop_test.go b/internal/attractor/engine/next_hop_test.go index e3817184..4683310d 100644 --- a/internal/attractor/engine/next_hop_test.go +++ b/internal/attractor/engine/next_hop_test.go @@ -23,7 +23,7 @@ digraph G { hop, err := resolveNextHop(g, "join", runtime.Outcome{ Status: runtime.StatusFail, FailureReason: "all parallel branches failed", - }, runtime.NewContext(), "") + }, runtime.NewContext(), "", nil) if err != nil { t.Fatalf("resolveNextHop: %v", err) } @@ -50,7 +50,7 @@ digraph G { hop, err := resolveNextHop(g, "join", runtime.Outcome{ Status: runtime.StatusFail, FailureReason: "all parallel branches failed", - }, runtime.NewContext(), failureClassTransientInfra) + }, runtime.NewContext(), failureClassTransientInfra, nil) if err != nil { t.Fatalf("resolveNextHop: %v", err) } @@ -86,7 +86,7 @@ digraph G { hop, err := resolveNextHop(g, "join", runtime.Outcome{ Status: runtime.StatusFail, FailureReason: "all parallel branches failed", - }, runtime.NewContext(), "") + }, runtime.NewContext(), "", nil) if err != nil { t.Fatalf("resolveNextHop: %v", err) } @@ -119,7 +119,7 @@ digraph G { hop, err := resolveNextHop(g, "join", runtime.Outcome{ Status: runtime.StatusFail, FailureReason: "all parallel branches failed", - }, runtime.NewContext(), failureClassDeterministic) + }, runtime.NewContext(), failureClassDeterministic, nil) if err != nil { t.Fatalf("resolveNextHop: %v", err) } @@ -146,7 +146,7 @@ digraph G { hop, err := resolveNextHop(g, "join", runtime.Outcome{ Status: runtime.StatusFail, FailureReason: "upstream timeout", - }, runtime.NewContext(), failureClassTransientInfra) + }, runtime.NewContext(), failureClassTransientInfra, nil) if err != nil { t.Fatalf("resolveNextHop: %v", err) } @@ -287,7 +287,7 @@ digraph G { hop, err := resolveNextHop(g, "join", runtime.Outcome{ Status: runtime.StatusFail, FailureReason: "all parallel branches failed", - }, runtime.NewContext(), failureClassDeterministic) + }, runtime.NewContext(), failureClassDeterministic, nil) if err != nil { t.Fatalf("resolveNextHop: %v", err) } @@ -317,7 +317,7 @@ digraph G { if err != nil { t.Fatalf("selectNextEdge: %v", err) } - got, err := resolveNextHop(g, "a", out, ctx, "") + got, err := resolveNextHop(g, "a", out, ctx, "", nil) if err != nil { t.Fatalf("resolveNextHop: %v", err) } diff --git a/internal/attractor/engine/resume.go b/internal/attractor/engine/resume.go index eed7708f..3bcc003f 100644 --- a/internal/attractor/engine/resume.go +++ b/internal/attractor/engine/resume.go @@ -366,7 +366,7 @@ func resumeFromLogsRoot(ctx context.Context, logsRoot string, ov ResumeOverrides } // Implicit fan-out: mirror forward-path logic for multi-edge convergence. - allEdges, edgeErr := selectAllEligibleEdges(eng.Graph, lastNodeID, lastOutcome, eng.Context) + allEdges, edgeErr := selectAllEligibleEdges(eng.Graph, lastNodeID, lastOutcome, eng.Context, eng.appendProgress) if edgeErr != nil { return nil, edgeErr } @@ -414,7 +414,7 @@ func resumeFromLogsRoot(ctx context.Context, logsRoot string, ov ResumeOverrides } } - nextHop, err := resolveNextHop(eng.Graph, lastNodeID, lastOutcome, eng.Context, classifyFailureClass(lastOutcome)) + nextHop, err := resolveNextHop(eng.Graph, lastNodeID, lastOutcome, eng.Context, classifyFailureClass(lastOutcome), eng.appendProgress) if err != nil { return nil, err } diff --git a/internal/attractor/engine/subgraph.go b/internal/attractor/engine/subgraph.go index 007117dd..452d512d 100644 --- a/internal/attractor/engine/subgraph.go +++ b/internal/attractor/engine/subgraph.go @@ -181,7 +181,7 @@ func runSubgraphUntil(ctx context.Context, eng *Engine, startNodeID, stopNodeID return canceledReturn(node.ID, lastOutcome, err) } - next, err := selectNextEdge(eng.Graph, node.ID, out, eng.Context) + next, err := selectNextEdge(eng.Graph, node.ID, out, eng.Context, eng.appendProgress) if err != nil { return parallelBranchResult{}, err }