Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions demo/substack-spec-v01.dot

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion internal/attractor/engine/cli_only_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import "strings"
// cliOnlyModelIDs lists models that MUST route through CLI backend regardless
// of provider backend configuration. These models have no API endpoint.
var cliOnlyModelIDs = map[string]bool{
"gpt-5.4-spark": true,
"gpt-5.3-codex-spark": true,
"gpt-5.4-spark": true,
}

// isCLIOnlyModel returns true if the given model ID (with or without provider
Expand Down
2 changes: 1 addition & 1 deletion internal/attractor/engine/cli_only_models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func TestIsCLIOnlyModel(t *testing.T) {
want bool
}{
{"gpt-5.4-spark", true},
{"GPT-5.3-CODEX-SPARK", true}, // case-insensitive
{"GPT-5.3-CODEX-SPARK", true}, // case-insensitive
{"openai/gpt-5.4-spark", true}, // with provider prefix
{"gpt-5.4", false}, // regular codex
{"gpt-5.4", false},
Expand Down
97 changes: 95 additions & 2 deletions internal/attractor/engine/codergen_heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ digraph G {
// TestRunWithConfig_APIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine verifies
// that the stall watchdog still fires when the API agent_loop session is truly
// stalled (no new session events) even though the heartbeat goroutine is running.
// The conditional heartbeat should NOT emit progress when event_count is static.
// Heartbeat events are emitted unconditionally for observability but use
// appendProgressLivenessOnly when no new events are produced, which does not
// reset the stall watchdog timer.
func TestRunWithConfig_APIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine(t *testing.T) {
repo := initTestRepo(t)
logsRoot := t.TempDir()
Expand Down Expand Up @@ -417,7 +419,9 @@ digraph G {
// TestRunWithConfig_CLIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine verifies
// that the stall watchdog still fires when the CLI codergen process is truly
// stalled (no stdout/stderr output) even though the heartbeat goroutine is running.
// The conditional heartbeat should NOT emit progress when file sizes are static.
// Heartbeat events are emitted unconditionally for observability but use
// appendProgressLivenessOnly when no output growth is detected, which does not
// reset the stall watchdog timer.
func TestRunWithConfig_CLIBackend_StallWatchdogFiresDespiteHeartbeatGoroutine(t *testing.T) {
repo := initTestRepo(t)
logsRoot := t.TempDir()
Expand Down Expand Up @@ -472,6 +476,95 @@ digraph G {
t.Logf("stall watchdog fired as expected: %v", err)
}

func TestRunWithConfig_HeartbeatEmitsDuringQuietPeriods(t *testing.T) {
repo := initTestRepo(t)
logsRoot := t.TempDir()

pinned := writePinnedCatalog(t)
cxdbSrv := newCXDBTestServer(t)

// Create a mock codex CLI that produces initial output, then goes quiet,
// then produces more output. The quiet period should still produce heartbeats.
cli := filepath.Join(t.TempDir(), "codex")
if err := os.WriteFile(cli, []byte(`#!/usr/bin/env bash
set -euo pipefail
echo '{"item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"starting"}]}}' >&1
# Quiet period: no output for 3 seconds.
sleep 3
echo '{"item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"done"}]}}' >&1
`), 0o755); err != nil {
t.Fatal(err)
}

t.Setenv("KILROY_CODERGEN_HEARTBEAT_INTERVAL", "1s")
t.Setenv("KILROY_CODEX_IDLE_TIMEOUT", "10s")

cfg := &RunConfigFile{Version: 1}
cfg.Repo.Path = repo
cfg.CXDB.BinaryAddr = cxdbSrv.BinaryAddr()
cfg.CXDB.HTTPBaseURL = cxdbSrv.URL()
cfg.LLM.CLIProfile = "test_shim"
cfg.LLM.Providers = map[string]ProviderConfig{
"openai": {Backend: BackendCLI, Executable: cli},
}
cfg.ModelDB.OpenRouterModelInfoPath = pinned
cfg.ModelDB.OpenRouterModelInfoUpdatePolicy = "pinned"
cfg.Git.RunBranchPrefix = "attractor/run"

dot := []byte(`
digraph G {
graph [goal="test quiet period heartbeats"]
start [shape=Mdiamond]
exit [shape=Msquare]
a [shape=box, llm_provider=openai, llm_model=gpt-5.4, prompt="do something quiet"]
start -> a -> exit
}
`)

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
res, err := RunWithConfig(ctx, dot, cfg, RunOptions{RunID: "quiet-heartbeat-test", LogsRoot: logsRoot, AllowTestShim: true})
if err != nil {
t.Fatalf("RunWithConfig: %v", err)
}

progressPath := filepath.Join(res.LogsRoot, "progress.ndjson")
data, err := os.ReadFile(progressPath)
if err != nil {
t.Fatalf("read progress.ndjson: %v", err)
}

heartbeats := 0
var hasQuietHeartbeat bool
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
var ev map[string]any
if err := json.Unmarshal([]byte(line), &ev); err != nil {
continue
}
if ev["event"] == "stage_heartbeat" && ev["node_id"] == "a" {
heartbeats++
if _, ok := ev["since_last_output_s"]; !ok {
t.Error("heartbeat missing since_last_output_s field")
}
sinceOutput, _ := ev["since_last_output_s"].(float64)
if sinceOutput >= 1 {
hasQuietHeartbeat = true
}
}
}
if heartbeats < 2 {
t.Fatalf("expected at least 2 heartbeat events (some during quiet period), got %d", heartbeats)
}
if !hasQuietHeartbeat {
t.Fatal("expected at least one heartbeat with since_last_output_s >= 1 (quiet period heartbeat)")
}
t.Logf("found %d heartbeat events, quiet period heartbeats present", heartbeats)
}

func TestRunWithConfig_HeartbeatStopsAfterProcessExit(t *testing.T) {
events := runHeartbeatFixture(t)
endIdx := findEventIndex(events, "stage_attempt_end", "a")
Expand Down
52 changes: 35 additions & 17 deletions internal/attractor/engine/codergen_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,21 +328,30 @@ func (r *CodergenRouter) runAPI(ctx context.Context, execCtx *Execution, node *m
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastCount int
lastEventTime := time.Now()
for {
select {
case <-ticker.C:
eventsMu.Lock()
count := len(events)
eventsMu.Unlock()
if count > lastCount {
eventsGrew := count > lastCount
if eventsGrew {
lastCount = count
if execCtx != nil && execCtx.Engine != nil {
execCtx.Engine.appendProgress(map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(apiStart).Seconds()),
"event_count": count,
})
lastEventTime = time.Now()
}
if execCtx != nil && execCtx.Engine != nil {
ev := map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(apiStart).Seconds()),
"event_count": count,
"since_last_output_s": int(time.Since(lastEventTime).Seconds()),
}
if eventsGrew {
execCtx.Engine.appendProgress(ev)
} else {
execCtx.Engine.appendProgressLivenessOnly(ev)
}
}
case <-heartbeatStop:
Expand Down Expand Up @@ -1159,22 +1168,31 @@ func (r *CodergenRouter) runCLI(ctx context.Context, execCtx *Execution, node *m
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastStdoutSz, lastStderrSz int64
lastOutputTime := time.Now()
for {
select {
case <-ticker.C:
stdoutSz, _ := fileSize(stdoutPath)
stderrSz, _ := fileSize(stderrPath)
if stdoutSz > lastStdoutSz || stderrSz > lastStderrSz {
outputGrew := stdoutSz > lastStdoutSz || stderrSz > lastStderrSz
if outputGrew {
lastStdoutSz = stdoutSz
lastStderrSz = stderrSz
if execCtx != nil && execCtx.Engine != nil {
execCtx.Engine.appendProgress(map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(start).Seconds()),
"stdout_bytes": stdoutSz,
"stderr_bytes": stderrSz,
})
lastOutputTime = time.Now()
}
if execCtx != nil && execCtx.Engine != nil {
ev := map[string]any{
"event": "stage_heartbeat",
"node_id": node.ID,
"elapsed_s": int(time.Since(start).Seconds()),
"stdout_bytes": stdoutSz,
"stderr_bytes": stderrSz,
"since_last_output_s": int(time.Since(lastOutputTime).Seconds()),
}
if outputGrew {
execCtx.Engine.appendProgress(ev)
} else {
execCtx.Engine.appendProgressLivenessOnly(ev)
}
}
case <-heartbeatStop:
Expand Down
15 changes: 14 additions & 1 deletion internal/attractor/engine/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ import (
//
// This is best-effort: progress logging must never block or fail a run.
func (e *Engine) appendProgress(ev map[string]any) {
e.appendProgressImpl(ev, true)
}

// appendProgressLivenessOnly writes a progress event to progress.ndjson and
// live.json for observability but does NOT reset the stall watchdog timer.
// Use this for unconditional heartbeat emissions that should not mask true stalls.
func (e *Engine) appendProgressLivenessOnly(ev map[string]any) {
e.appendProgressImpl(ev, false)
}

func (e *Engine) appendProgressImpl(ev map[string]any, updateStallTimer bool) {
if e == nil {
return
}
Expand Down Expand Up @@ -47,7 +58,9 @@ func (e *Engine) appendProgress(ev map[string]any) {

e.progressMu.Lock()
defer e.progressMu.Unlock()
e.lastProgressAt = now
if updateStallTimer {
e.lastProgressAt = now
}

// Append to progress.ndjson.
// Intentionally open/close on each event so writes are immediately flushed
Expand Down