Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
339845b
engine/runtime: scrub inherited stage contract env
vadimcomanescu Mar 1, 2026
60dcc72
engine: harden fallback status ingestion diagnostics
vadimcomanescu Mar 4, 2026
8bd2752
engine/runtime: harden run ownership locking
vadimcomanescu Mar 1, 2026
b671b6b
engine: fix fallback retry latency and payload classification
vadimcomanescu Mar 4, 2026
c194cfd
fix(engine): preserve unreadable run ownership locks
vadimcomanescu Mar 4, 2026
ae6331a
attractor: emit AssistantMessage turns for API session events
brianfeucht Mar 5, 2026
44549fc
attractor: add CXDB regression test for API assistant turns
brianfeucht Mar 5, 2026
db0b237
feat(cli): add --validate alias for --preflight in attractor run
mvanhorn Mar 10, 2026
a761b4d
fix(engine): emit heartbeat on interval regardless of output growth
mvanhorn Mar 10, 2026
9ef9f30
fix(engine): tighten input reference scanning to reduce false matches
mvanhorn Mar 10, 2026
d4fc02b
fix(engine): add degraded_success status and verification reporting
mvanhorn Mar 10, 2026
fb87530
style: run gofmt on cli_only_models_test.go
mvanhorn Mar 10, 2026
f4f7727
style: run gofmt on cli_only_models_test.go
mvanhorn Mar 10, 2026
2910d01
style: run gofmt on cli_only_models_test.go
mvanhorn Mar 10, 2026
24a181b
style: run gofmt on cli_only_models_test.go
mvanhorn Mar 10, 2026
e985dd0
fix(ci): add gpt-5.3-codex-spark to CLI-only models and scope demo .a…
mvanhorn Mar 10, 2026
30d884f
fix(ci): add gpt-5.3-codex-spark to CLI-only models and scope demo .a…
mvanhorn Mar 10, 2026
8f65fba
fix(ci): add gpt-5.3-codex-spark to CLI-only models and scope demo .a…
mvanhorn Mar 10, 2026
42c5cb9
fix(ci): add gpt-5.3-codex-spark to CLI-only models and scope demo .a…
mvanhorn Mar 10, 2026
0c84151
fix(ci): gofmt cli_only_models_test and run-scope demo dot paths
leegonzales Mar 18, 2026
e40947d
Merge remote-tracking branch 'leegonzales/fix/ci-green-main-gofmt-and…
mattleaverton Mar 31, 2026
4e49425
Merge remote-tracking branch 'brianfeucht/fix/cxdb-assistant-message-…
mattleaverton Mar 31, 2026
20867d6
Merge remote-tracking branch 'mvanhorn/osc/47-validate-preflight-mode…
mattleaverton Mar 31, 2026
effc576
Merge remote-tracking branch 'mvanhorn/osc/48-input-materialization-o…
mattleaverton Mar 31, 2026
6591e28
Merge remote-tracking branch 'mvanhorn/osc/51-heartbeat-output-gated'…
mattleaverton Mar 31, 2026
e7eabaf
Merge remote-tracking branch 'mvanhorn/osc/50-verification-false-gree…
mattleaverton Mar 31, 2026
6cdc6ce
Merge remote-tracking branch 'vadimcomanescu/upstream/fix/status-inge…
mattleaverton Mar 31, 2026
c4ee10b
Merge remote-tracking branch 'vadimcomanescu/upstream/fix/runtime-run…
mattleaverton Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/kilroy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func loadEnvFile(args []string) []string {
func usage() {
fmt.Fprintln(os.Stderr, "usage:")
fmt.Fprintln(os.Stderr, " kilroy --version")
fmt.Fprintln(os.Stderr, " kilroy [--env-file <path>] attractor run [--detach] [--preflight|--test-run] [--allow-test-shim] [--confirm-stale-build] [--no-cxdb] [--force-model <provider=model>] --graph <file.dot> --config <run.yaml> [--run-id <id>] [--logs-root <dir>]")
fmt.Fprintln(os.Stderr, " kilroy [--env-file <path>] attractor run [--detach] [--validate|--preflight|--test-run] [--allow-test-shim] [--confirm-stale-build] [--no-cxdb] [--force-model <provider=model>] --graph <file.dot> --config <run.yaml> [--run-id <id>] [--logs-root <dir>]")
fmt.Fprintln(os.Stderr, " kilroy attractor resume --logs-root <dir>")
fmt.Fprintln(os.Stderr, " kilroy attractor resume --cxdb <http_base_url> --context-id <id>")
fmt.Fprintln(os.Stderr, " kilroy attractor resume --run-branch <attractor/run/...> [--repo <path>]")
Expand Down Expand Up @@ -175,7 +175,7 @@ func attractorRun(args []string) {
switch args[i] {
case "--detach":
detach = true
case "--preflight":
case "--preflight", "--validate":
preflightOnly = true
case "--test-run":
preflightOnly = true
Expand Down Expand Up @@ -233,7 +233,7 @@ func attractorRun(args []string) {
os.Exit(1)
}
if preflightOnly && detach {
fmt.Fprintln(os.Stderr, "--preflight/--test-run cannot be combined with --detach")
fmt.Fprintln(os.Stderr, "--validate/--preflight/--test-run cannot be combined with --detach")
os.Exit(1)
}
if err := ensureFreshKilroyBuild(confirmStaleBuild); err != nil {
Expand Down
30 changes: 29 additions & 1 deletion cmd/kilroy/main_exit_codes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,31 @@ digraph G {
}
}

func TestAttractorRun_ValidateAlias_Accepted(t *testing.T) {
bin := buildKilroyBinary(t)
repo := initTestRepo(t)
catalog := writePinnedCatalog(t)
cfg := writeRunConfig(t, repo, "http://127.0.0.1:9", "127.0.0.1:9", catalog)

graph := filepath.Join(t.TempDir(), "validate.dot")
_ = os.WriteFile(graph, []byte(`
digraph G {
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}
`), 0o644)

logsRoot := filepath.Join(t.TempDir(), "logs")
code, out := runKilroy(t, bin, "attractor", "run", "--graph", graph, "--config", cfg, "--run-id", "validate-flag", "--logs-root", logsRoot, "--no-cxdb", "--validate")
if code != 0 {
t.Fatalf("exit code: got %d want 0\n%s", code, out)
}
if !strings.Contains(out, "preflight=true") {
t.Fatalf("expected preflight marker, got:\n%s", out)
}
}

func TestAttractorRun_PreflightRejectsDetach(t *testing.T) {
bin := buildKilroyBinary(t)
repo := initTestRepo(t)
Expand All @@ -650,7 +675,7 @@ digraph G {
if code != 1 {
t.Fatalf("exit code: got %d want 1\n%s", code, out)
}
if !strings.Contains(out, "--preflight/--test-run cannot be combined with --detach") {
if !strings.Contains(out, "--validate/--preflight/--test-run cannot be combined with --detach") {
t.Fatalf("expected incompatible flag error, got:\n%s", out)
}
}
Expand Down Expand Up @@ -782,6 +807,9 @@ func TestUsage_IncludesPreflightFlags(t *testing.T) {
if code != 1 {
t.Fatalf("exit code: got %d want 1\n%s", code, out)
}
if !strings.Contains(out, "--validate") {
t.Fatalf("usage should include --validate; output:\n%s", out)
}
if !strings.Contains(out, "--preflight") {
t.Fatalf("usage should include --preflight; output:\n%s", out)
}
Expand Down
22 changes: 11 additions & 11 deletions demo/substack-spec-v01.dot

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion internal/attractor/engine/cli_only_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import "strings"
// cliOnlyModelIDs lists models that MUST route through CLI backend regardless
// of provider backend configuration. These models have no API endpoint.
var cliOnlyModelIDs = map[string]bool{
"gpt-5.4-spark": true,
"gpt-5.3-codex-spark": true,
"gpt-5.4-spark": true,
}

// isCLIOnlyModel returns true if the given model ID (with or without provider
Expand Down
2 changes: 1 addition & 1 deletion internal/attractor/engine/cli_only_models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func TestIsCLIOnlyModel(t *testing.T) {
want bool
}{
{"gpt-5.4-spark", true},
{"GPT-5.4-SPARK", true}, // case-insensitive
{"GPT-5.4-SPARK", true}, // case-insensitive
{"openai/gpt-5.4-spark", true}, // with provider prefix
{"gpt-5.4", false}, // regular codex
{"gpt-5.4", false},
Expand Down
97 changes: 95 additions & 2 deletions internal/attractor/engine/codergen_heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ digraph G {
// TestRunWithConfig_APIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine verifies
// that the stall watchdog still fires when the API agent_loop session is truly
// stalled (no new session events) even though the heartbeat goroutine is running.
// The conditional heartbeat should NOT emit progress when event_count is static.
// Heartbeat events are emitted unconditionally for observability but use
// appendProgressLivenessOnly when no new events are produced, which does not
// reset the stall watchdog timer.
func TestRunWithConfig_APIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine(t *testing.T) {
repo := initTestRepo(t)
logsRoot := t.TempDir()
Expand Down Expand Up @@ -417,7 +419,9 @@ digraph G {
// TestRunWithConfig_CLIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine verifies
// that the stall watchdog still fires when the CLI codergen process is truly
// stalled (no stdout/stderr output) even though the heartbeat goroutine is running.
// The conditional heartbeat should NOT emit progress when file sizes are static.
// Heartbeat events are emitted unconditionally for observability but use
// appendProgressLivenessOnly when no output growth is detected, which does not
// reset the stall watchdog timer.
func TestRunWithConfig_CLIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine(t *testing.T) {
repo := initTestRepo(t)
logsRoot := t.TempDir()
Expand Down Expand Up @@ -472,6 +476,95 @@ digraph G {
t.Logf("stall watchdog fired as expected: %v", err)
}

func TestRunWithConfig_HeartbeatEmitsDuringQuietPeriods(t *testing.T) {
repo := initTestRepo(t)
logsRoot := t.TempDir()

pinned := writePinnedCatalog(t)
cxdbSrv := newCXDBTestServer(t)

// Create a mock codex CLI that produces initial output, then goes quiet,
// then produces more output. The quiet period should still produce heartbeats.
cli := filepath.Join(t.TempDir(), "codex")
if err := os.WriteFile(cli, []byte(`#!/usr/bin/env bash
set -euo pipefail
echo '{"item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"starting"}]}}' >&1
# Quiet period: no output for 3 seconds.
sleep 3
echo '{"item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"done"}]}}' >&1
`), 0o755); err != nil {
t.Fatal(err)
}

t.Setenv("KILROY_CODERGEN_HEARTBEAT_INTERVAL", "1s")
t.Setenv("KILROY_CODEX_IDLE_TIMEOUT", "10s")

cfg := &RunConfigFile{Version: 1}
cfg.Repo.Path = repo
cfg.CXDB.BinaryAddr = cxdbSrv.BinaryAddr()
cfg.CXDB.HTTPBaseURL = cxdbSrv.URL()
cfg.LLM.CLIProfile = "test_shim"
cfg.LLM.Providers = map[string]ProviderConfig{
"openai": {Backend: BackendCLI, Executable: cli},
}
cfg.ModelDB.OpenRouterModelInfoPath = pinned
cfg.ModelDB.OpenRouterModelInfoUpdatePolicy = "pinned"
cfg.Git.RunBranchPrefix = "attractor/run"

dot := []byte(`
digraph G {
graph [goal="test quiet period heartbeats"]
start [shape=Mdiamond]
exit [shape=Msquare]
a [shape=box, llm_provider=openai, llm_model=gpt-5.4, prompt="do something quiet"]
start -> a -> exit
}
`)

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
res, err := RunWithConfig(ctx, dot, cfg, RunOptions{RunID: "quiet-heartbeat-test", LogsRoot: logsRoot, AllowTestShim: true})
if err != nil {
t.Fatalf("RunWithConfig: %v", err)
}

progressPath := filepath.Join(res.LogsRoot, "progress.ndjson")
data, err := os.ReadFile(progressPath)
if err != nil {
t.Fatalf("read progress.ndjson: %v", err)
}

heartbeats := 0
var hasQuietHeartbeat bool
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
var ev map[string]any
if err := json.Unmarshal([]byte(line), &ev); err != nil {
continue
}
if ev["event"] == "stage_heartbeat" && ev["node_id"] == "a" {
heartbeats++
if _, ok := ev["since_last_output_s"]; !ok {
t.Error("heartbeat missing since_last_output_s field")
}
sinceOutput, _ := ev["since_last_output_s"].(float64)
if sinceOutput >= 1 {
hasQuietHeartbeat = true
}
}
}
if heartbeats < 2 {
t.Fatalf("expected at least 2 heartbeat events (some during quiet period), got %d", heartbeats)
}
if !hasQuietHeartbeat {
t.Fatal("expected at least one heartbeat with since_last_output_s >= 1 (quiet period heartbeat)")
}
t.Logf("found %d heartbeat events, quiet period heartbeats present", heartbeats)
}

func TestRunWithConfig_HeartbeatStopsAfterProcessExit(t *testing.T) {
events := runHeartbeatFixture(t)
endIdx := findEventIndex(events, "stage_attempt_end", "a")
Expand Down
70 changes: 53 additions & 17 deletions internal/attractor/engine/codergen_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,21 +328,30 @@ func (r *CodergenRouter) runAPI(ctx context.Context, execCtx *Execution, node *m
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastCount int
lastEventTime := time.Now()
for {
select {
case <-ticker.C:
eventsMu.Lock()
count := len(events)
eventsMu.Unlock()
if count > lastCount {
eventsGrew := count > lastCount
if eventsGrew {
lastCount = count
if execCtx != nil && execCtx.Engine != nil {
execCtx.Engine.appendProgress(map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(apiStart).Seconds()),
"event_count": count,
})
lastEventTime = time.Now()
}
if execCtx != nil && execCtx.Engine != nil {
ev := map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(apiStart).Seconds()),
"event_count": count,
"since_last_output_s": int(time.Since(lastEventTime).Seconds()),
}
if eventsGrew {
execCtx.Engine.appendProgress(ev)
} else {
execCtx.Engine.appendProgressLivenessOnly(ev)
}
}
case <-heartbeatStop:
Expand Down Expand Up @@ -1159,22 +1168,31 @@ func (r *CodergenRouter) runCLI(ctx context.Context, execCtx *Execution, node *m
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastStdoutSz, lastStderrSz int64
lastOutputTime := time.Now()
for {
select {
case <-ticker.C:
stdoutSz, _ := fileSize(stdoutPath)
stderrSz, _ := fileSize(stderrPath)
if stdoutSz > lastStdoutSz || stderrSz > lastStderrSz {
outputGrew := stdoutSz > lastStdoutSz || stderrSz > lastStderrSz
if outputGrew {
lastStdoutSz = stdoutSz
lastStderrSz = stderrSz
if execCtx != nil && execCtx.Engine != nil {
execCtx.Engine.appendProgress(map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(start).Seconds()),
"stdout_bytes": stdoutSz,
"stderr_bytes": stderrSz,
})
lastOutputTime = time.Now()
}
if execCtx != nil && execCtx.Engine != nil {
ev := map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(start).Seconds()),
"stdout_bytes": stdoutSz,
"stderr_bytes": stderrSz,
"since_last_output_s": int(time.Since(lastOutputTime).Seconds()),
}
if outputGrew {
execCtx.Engine.appendProgress(ev)
} else {
execCtx.Engine.appendProgressLivenessOnly(ev)
}
}
case <-heartbeatStop:
Expand Down Expand Up @@ -1845,6 +1863,24 @@ func emitCXDBToolTurns(ctx context.Context, eng *Engine, nodeID string, ev agent
}
runID := eng.Options.RunID
switch ev.Kind {
case agent.EventAssistantTextEnd:
text := strings.TrimSpace(fmt.Sprint(ev.Data["text"]))
// Keep a queryable assistant turn even when the model turn is tool-only.
if text == "" {
text = "[tool_use]"
}
if _, _, err := eng.CXDB.Append(ctx, "com.kilroy.attractor.AssistantMessage", 1, map[string]any{
"run_id": runID,
"node_id": nodeID,
"text": truncate(text, 8_000),
"model": "",
"input_tokens": uint64(0),
"output_tokens": uint64(0),
"tool_use_count": uint32(0),
"timestamp_ms": nowMS(),
}); err != nil {
eng.Warn(fmt.Sprintf("cxdb append AssistantMessage failed (node=%s): %v", nodeID, err))
}
case agent.EventToolCallStart:
toolName := strings.TrimSpace(fmt.Sprint(ev.Data["tool_name"]))
callID := strings.TrimSpace(fmt.Sprint(ev.Data["call_id"]))
Expand Down
Loading
Loading