Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,24 +1512,76 @@ func (a *Agent) EffectiveWorkQuery() string {
if a.PoolName != "" {
target = a.PoolName
}
legacyTarget := legacyWorkflowControlQualifiedName(target)
if legacyTarget == "" {
return `sh -c '` +
// Tier 1: in_progress assigned to any of my identifiers (crash recovery)
`for id in "$GC_SESSION_ID" "$GC_SESSION_NAME" "$GC_ALIAS"; do ` +
`[ -z "$id" ] && continue; ` +
`r=$(bd list --status in_progress --assignee="$id" --json --limit=1 2>/dev/null); ` +
`[ -n "$r" ] && [ "$r" != "[]" ] && printf "%s" "$r" && exit 0; ` +
`done; ` +
// Tier 2: ready assigned to any of my identifiers (pre-assigned)
`for id in "$GC_SESSION_ID" "$GC_SESSION_NAME" "$GC_ALIAS"; do ` +
`[ -z "$id" ] && continue; ` +
`r=$(bd ready --assignee="$id" --json --limit=1 2>/dev/null); ` +
`[ -n "$r" ] && [ "$r" != "[]" ] && printf "%s" "$r" && exit 0; ` +
`done; ` +
// Tier 3: ready unassigned routed to this config (shared routed queue).
// No GC_SESSION_ORIGIN gate here — only control-dispatchers restrict
// demand detection to ephemeral/controller probes (see legacy branch below).
`bd ready --metadata-field gc.routed_to=` + target +
` --unassigned --json --limit=1 2>/dev/null'`
}
return `sh -c '` +
// Tier 1: in_progress assigned to any of my identifiers (crash recovery)
// Tier 1: in_progress assigned to any of my identifiers (crash recovery).
// Built-in control-dispatchers also claim legacy workflow-control names so
// pre-rename workflows keep moving without live metadata rewrites.
`for id in "$GC_SESSION_ID" "$GC_SESSION_NAME" "$GC_ALIAS"; do ` +
`[ -z "$id" ] && continue; ` +
`r=$(bd list --status in_progress --assignee="$id" --json --limit=1 2>/dev/null); ` +
`legacy=""; case "$id" in *control-dispatcher) legacy="${id%control-dispatcher}workflow-control";; esac; ` +
`for cand in "$id" "$legacy"; do ` +
`[ -z "$cand" ] && continue; ` +
`r=$(bd list --status in_progress --assignee="$cand" --json --limit=1 2>/dev/null); ` +
`[ -n "$r" ] && [ "$r" != "[]" ] && printf "%s" "$r" && exit 0; ` +
`done; ` +
`done; ` +
// Tier 2: ready assigned to any of my identifiers (pre-assigned)
`for id in "$GC_SESSION_ID" "$GC_SESSION_NAME" "$GC_ALIAS"; do ` +
`[ -z "$id" ] && continue; ` +
`r=$(bd ready --assignee="$id" --json --limit=1 2>/dev/null); ` +
`legacy=""; case "$id" in *control-dispatcher) legacy="${id%control-dispatcher}workflow-control";; esac; ` +
`for cand in "$id" "$legacy"; do ` +
`[ -z "$cand" ] && continue; ` +
`r=$(bd ready --assignee="$cand" --json --limit=1 2>/dev/null); ` +
`[ -n "$r" ] && [ "$r" != "[]" ] && printf "%s" "$r" && exit 0; ` +
`done; ` +
// Tier 3: ready unassigned routed to this agent (pool queue)
`bd ready --metadata-field gc.routed_to=` + target +
`done; ` +
// Tier 3: ready unassigned routed to this config (shared routed queue),
// then the legacy workflow-control route for pre-rename graphs.
// Demand detection only runs for ephemeral sessions or controller probes.
`case "$GC_SESSION_ORIGIN" in ` +
`ephemeral|"") ;; ` +
`*) exit 0 ;; ` +
`esac; ` +
`r=$(bd ready --metadata-field gc.routed_to=` + target +
` --unassigned --json --limit=1 2>/dev/null); ` +
`[ -n "$r" ] && [ "$r" != "[]" ] && printf "%s" "$r" && exit 0; ` +
`bd ready --metadata-field gc.routed_to=` + legacyTarget +
` --unassigned --json --limit=1 2>/dev/null'`
}

// legacyWorkflowControlQualifiedName maps a control-dispatcher target to the
// "workflow-control" name it carried before the rename.
//
// Surrounding whitespace in target is ignored. A bare ControlDispatcherAgentName
// maps to "workflow-control"; a qualified "<prefix>/<ControlDispatcherAgentName>"
// maps to "<prefix>/workflow-control". Any other target yields the empty
// string, which callers can treat as "no legacy alias".
func legacyWorkflowControlQualifiedName(target string) string {
	const (
		legacyName      = "workflow-control"
		qualifiedSuffix = "/" + ControlDispatcherAgentName
	)

	name := strings.TrimSpace(target)
	switch {
	case name == ControlDispatcherAgentName:
		return legacyName
	case strings.HasSuffix(name, qualifiedSuffix):
		// Keep the qualifying prefix and swap only the trailing agent name.
		return name[:len(name)-len(qualifiedSuffix)] + "/" + legacyName
	default:
		return ""
	}
}

// EffectiveSlingQuery returns the sling query command template for this agent.
// The template uses {} as a placeholder for the bead ID.
// If SlingQuery is set, returns it as-is. Otherwise returns the default:
Expand Down
85 changes: 85 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
Expand Down Expand Up @@ -1341,6 +1342,69 @@ func TestEffectiveWorkQueryPoolNoPoolName(t *testing.T) {
}
}

// TestEffectiveWorkQueryControlDispatcherIncludesLegacyWorkflowControlRoute
// checks the generated work query text for a control-dispatcher agent: it must
// mention the current routed queue, the legacy workflow-control routed queue,
// and the shell expression that rewrites a control-dispatcher assignee into
// its legacy workflow-control name.
func TestEffectiveWorkQueryControlDispatcherIncludesLegacyWorkflowControlRoute(t *testing.T) {
	a := Agent{Name: ControlDispatcherAgentName, Dir: "gascity"}
	got := a.EffectiveWorkQuery()
	if !strings.Contains(got, "gc.routed_to=gascity/control-dispatcher") {
		t.Fatalf("EffectiveWorkQuery() missing current control-dispatcher route: %q", got)
	}
	if !strings.Contains(got, "gc.routed_to=gascity/workflow-control") {
		t.Fatalf("EffectiveWorkQuery() missing legacy workflow-control route: %q", got)
	}
	// A bare "workflow-control" match would already be satisfied by the
	// routed_to check above, so look for the assignee rewrite itself.
	if !strings.Contains(got, `${id%control-dispatcher}workflow-control`) {
		t.Fatalf("EffectiveWorkQuery() missing legacy assignee alias handling: %q", got)
	}
}

// TestEffectiveWorkQueryControlDispatcherClaimsLegacyAssignedWork runs the
// generated work query against a fake bd and expects it to surface ready work
// that is assigned to the legacy workflow-control identifiers.
func TestEffectiveWorkQueryControlDispatcherClaimsLegacyAssignedWork(t *testing.T) {
	// Fake bd: every in_progress lookup (current and legacy names) is empty,
	// only "ready" lookups for the legacy assignee names return a bead, and
	// anything else is empty as well.
	const fakeBD = `#!/bin/sh
set -eu
case "$*" in
"list --status in_progress --assignee=gascity--control-dispatcher --json --limit=1"|\
"list --status in_progress --assignee=gascity/control-dispatcher --json --limit=1"|\
"list --status in_progress --assignee=gascity--workflow-control --json --limit=1"|\
"list --status in_progress --assignee=gascity/workflow-control --json --limit=1")
printf '[]'
;;
"ready --assignee=gascity--workflow-control --json --limit=1"|\
"ready --assignee=gascity/workflow-control --json --limit=1")
printf '[{"id":"ga-legacy-ready"}]'
;;
*)
printf '[]'
;;
esac
`
	const want = `[{"id":"ga-legacy-ready"}]`

	sessionEnv := map[string]string{
		"GC_SESSION_NAME": "gascity--control-dispatcher",
		"GC_ALIAS":        "gascity/control-dispatcher",
	}
	dispatcher := Agent{Name: ControlDispatcherAgentName, Dir: "gascity"}

	got := strings.TrimSpace(runEffectiveWorkQuery(t, dispatcher, sessionEnv, fakeBD))
	if got == want {
		return
	}
	t.Fatalf("legacy assigned work query output = %q, want %q", got, want)
}

// TestEffectiveWorkQueryControlDispatcherClaimsLegacyUnassignedRoute runs the
// generated work query with no session identifiers set and expects it to fall
// through to unassigned work routed to the legacy workflow-control name.
func TestEffectiveWorkQueryControlDispatcherClaimsLegacyUnassignedRoute(t *testing.T) {
	// Fake bd: the current control-dispatcher route is empty, the legacy
	// workflow-control route holds one bead, and anything else is empty.
	const fakeBD = `#!/bin/sh
set -eu
case "$*" in
"ready --metadata-field gc.routed_to=gascity/control-dispatcher --unassigned --json --limit=1")
printf '[]'
;;
"ready --metadata-field gc.routed_to=gascity/workflow-control --unassigned --json --limit=1")
printf '[{"id":"ga-legacy-route"}]'
;;
*)
printf '[]'
;;
esac
`
	const want = `[{"id":"ga-legacy-route"}]`

	dispatcher := Agent{Name: ControlDispatcherAgentName, Dir: "gascity"}

	got := strings.TrimSpace(runEffectiveWorkQuery(t, dispatcher, nil, fakeBD))
	if got == want {
		return
	}
	t.Fatalf("legacy routed work query output = %q, want %q", got, want)
}

func TestEffectiveSlingQueryPoolNameOverride(t *testing.T) {
a := Agent{
Name: "dog-1",
Expand Down Expand Up @@ -3050,6 +3114,27 @@ func TestEffectiveMethodsQualifyConsistently(t *testing.T) {
}
}

// runEffectiveWorkQuery executes a.EffectiveWorkQuery() via "sh -c" with a
// fake bd executable on PATH and returns whatever the query wrote to stdout.
//
// bdScript is written to a file named "bd" in a per-test temp directory that
// is prepended to PATH, so the query resolves bd to the fake. The child
// process environment contains only that PATH plus the entries of env (which
// may be nil). Any failure to write the script or run the query fails the
// test immediately; the failure message includes the child's stderr.
func runEffectiveWorkQuery(t *testing.T, a Agent, env map[string]string, bdScript string) string {
	t.Helper()

	tmp := t.TempDir()
	bdPath := filepath.Join(tmp, "bd")
	if err := os.WriteFile(bdPath, []byte(bdScript), 0o755); err != nil {
		t.Fatalf("write fake bd: %v", err)
	}

	cmd := exec.Command("sh", "-c", a.EffectiveWorkQuery())
	cmd.Env = make([]string, 0, len(env)+1)
	// Use the platform list separator rather than a hard-coded ":".
	cmd.Env = append(cmd.Env, "PATH="+tmp+string(os.PathListSeparator)+os.Getenv("PATH"))
	for k, v := range env {
		cmd.Env = append(cmd.Env, k+"="+v)
	}

	// Capture stderr so a failing query can be diagnosed from the test log.
	var stderr strings.Builder
	cmd.Stderr = &stderr

	out, err := cmd.Output()
	if err != nil {
		t.Fatalf("run work query: %v (stderr: %q)", err, stderr.String())
	}
	return string(out)
}

// TestEffectiveMethodsAgentRouting verifies that all agents use
// gc.routed_to=<qualified-name> metadata routing.
func TestEffectiveMethodsAgentRouting(t *testing.T) {
Expand Down
41 changes: 36 additions & 5 deletions internal/dispatch/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ func processRetryControl(store beads.Store, bead beads.Bead, opts ProcessOptions
if err := setOutcomeAndClose(store, bead.ID, "pass"); err != nil {
return ControlResult{}, fmt.Errorf("%s: closing passed: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "pass"}, nil
scopeResult, err := reconcileClosedScopeMember(store, bead.ID)
if err != nil {
return ControlResult{}, fmt.Errorf("%s: reconciling enclosing scope: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "pass", Skipped: scopeResult.Skipped}, nil

case "hard":
if err := store.SetMetadataBatch(bead.ID, map[string]string{
Expand All @@ -76,11 +80,24 @@ func processRetryControl(store beads.Store, bead beads.Bead, opts ProcessOptions
if err := setOutcomeAndClose(store, bead.ID, "fail"); err != nil {
return ControlResult{}, fmt.Errorf("%s: closing hard-failed: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "hard-fail"}, nil
scopeResult, err := reconcileClosedScopeMember(store, bead.ID)
if err != nil {
return ControlResult{}, fmt.Errorf("%s: reconciling enclosing scope: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "hard-fail", Skipped: scopeResult.Skipped}, nil

case "transient":
if attemptNum >= maxAttempts {
return handleRetryExhaustion(store, bead.ID, attemptNum, result.Reason, onExhausted)
exhaustedResult, err := handleRetryExhaustion(store, bead.ID, attemptNum, result.Reason, onExhausted)
if err != nil {
return ControlResult{}, err
}
scopeResult, err := reconcileClosedScopeMember(store, bead.ID)
if err != nil {
return ControlResult{}, fmt.Errorf("%s: reconciling enclosing scope: %w", bead.ID, err)
}
exhaustedResult.Skipped += scopeResult.Skipped
return exhaustedResult, nil
}

// Spawn next attempt.
Expand All @@ -92,6 +109,9 @@ func processRetryControl(store beads.Store, bead beads.Bead, opts ProcessOptions
"gc.final_disposition": "controller_error",
})
_ = setOutcomeAndClose(store, bead.ID, "fail")
// Reconcile any enclosing scope so a controller_error terminal
// closure does not leave the scope body stalled.
_, _ = reconcileClosedScopeMember(store, bead.ID)
return ControlResult{}, fmt.Errorf("%s: spawning attempt %d: %w", bead.ID, nextAttempt, err)
}

Expand Down Expand Up @@ -156,7 +176,11 @@ func processRalphControl(store beads.Store, bead beads.Bead, opts ProcessOptions
if err := setOutcomeAndClose(store, bead.ID, "pass"); err != nil {
return ControlResult{}, fmt.Errorf("%s: closing passed: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "pass"}, nil
scopeResult, err := reconcileClosedScopeMember(store, bead.ID)
if err != nil {
return ControlResult{}, fmt.Errorf("%s: reconciling enclosing scope: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "pass", Skipped: scopeResult.Skipped}, nil
}

if iterationNum >= maxAttempts {
Expand All @@ -169,7 +193,11 @@ func processRalphControl(store beads.Store, bead beads.Bead, opts ProcessOptions
if err := setOutcomeAndClose(store, bead.ID, "fail"); err != nil {
return ControlResult{}, fmt.Errorf("%s: closing exhausted: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "fail"}, nil
scopeResult, err := reconcileClosedScopeMember(store, bead.ID)
if err != nil {
return ControlResult{}, fmt.Errorf("%s: reconciling enclosing scope: %w", bead.ID, err)
}
return ControlResult{Processed: true, Action: "fail", Skipped: scopeResult.Skipped}, nil
}

// Spawn next iteration.
Expand All @@ -180,6 +208,9 @@ func processRalphControl(store beads.Store, bead beads.Bead, opts ProcessOptions
"gc.final_disposition": "controller_error",
})
_ = setOutcomeAndClose(store, bead.ID, "fail")
// Reconcile any enclosing scope so a controller_error terminal
// closure does not leave the scope body stalled.
_, _ = reconcileClosedScopeMember(store, bead.ID)
return ControlResult{}, fmt.Errorf("%s: spawning iteration %d: %w", bead.ID, nextIteration, err)
}

Expand Down
Loading
Loading