feat(daemon): add daemon mode with session wait/attach#528
feat(daemon): add daemon mode with session wait/attach#528RyderFreeman4Logos merged 3 commits intomainfrom
Conversation
Implement `csa run --daemon` to detach as a background process using setsid + spool file I/O. Add `csa session wait/attach` for polling and real-time tailing of daemon sessions. Session ID is pre-assigned by the parent and propagated via CSA_DAEMON_SESSION_ID env var so the pipeline reuses the same directory as the spool files. Key changes: - DaemonSpawner in csa-process with setsid, stdout/stderr spool files - --daemon (opt-in) and --daemon-child (internal) flags on `csa run` - `csa session wait --session <ID>` polls result.toml - `csa session attach --session <ID>` tails stdout.log in real time - RPJ directive CSA:SESSION_STARTED with id/pid/dir/wait_cmd/attach_cmd - Extract session_dispatch.rs, run_cmd_daemon.rs, session_cmds_daemon.rs from main.rs/session_cmds.rs to stay under 800-line monolith limit
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f6bce994e8
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| cmd.env(k, v); | ||
| } | ||
|
|
||
| cmd.stdin(Stdio::null()); |
There was a problem hiding this comment.
Preserve stdin when daemonizing piped prompts
Redirecting daemon child stdin to /dev/null breaks the documented stdin prompt path (read_prompt reads stdin when positional prompt is omitted). In csa run --daemon, a piped prompt like echo "task" | csa run --daemon ... reaches a child with empty stdin, so the run fails with an empty-prompt error before producing result.toml, and session wait can then block indefinitely unless a timeout is set.
Useful? React with 👍 / 👎.
crates/csa-session/src/manager.rs
Outdated
| let session_id = std::env::var("CSA_DAEMON_SESSION_ID") | ||
| .ok() | ||
| .filter(|s| !s.is_empty()) | ||
| .unwrap_or_else(new_session_id); |
There was a problem hiding this comment.
Validate daemon session ID before using it as a path
create_session_in now accepts CSA_DAEMON_SESSION_ID verbatim for all session creation paths, but this value is not validated as a ULID before being used to construct session_dir. If that env var is set externally (or via the hidden --session-id path), malformed values can create invalid/nonstandard session IDs and path traversal segments (because the value is joined directly), which can corrupt session storage and break commands that assume ULID IDs.
Useful? React with 👍 / 👎.
| if result_path.exists() { | ||
| loop { | ||
| let n = stdout_file.read(&mut buf)?; | ||
| if n == 0 { | ||
| break; | ||
| } | ||
| std::io::stdout().write_all(&buf[..n])?; | ||
| } | ||
| std::io::stdout().flush()?; | ||
| return Ok(0); |
There was a problem hiding this comment.
Drain stderr to EOF before returning from attach
When session attach --stderr sees result.toml, it drains stdout.log but returns without draining stderr.log. This truncates stderr output whenever unread bytes remain at completion (e.g., attaching after completion or large stderr bursts), so users do not see the full daemon output even though --stderr was requested.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Code Review
This pull request implements daemonization for the run command, enabling background execution and management via new session wait and attach subcommands. The implementation includes a low-level daemon spawning utility and a refactored session command dispatcher. Review feedback highlights a potential memory issue when reading log files, the need to propagate actual session exit codes to the caller, and fragility in the argument forwarding logic that could lead to lost global flags.
| if result_path.exists() { | ||
| let stdout_log = session_dir.join("stdout.log"); | ||
| if stdout_log.is_file() { | ||
| let content = fs::read_to_string(&stdout_log)?; | ||
| if !content.is_empty() { | ||
| print!("{content}"); | ||
| } | ||
| } | ||
| return Ok(0); | ||
| } |
There was a problem hiding this comment.
Reading the entire stdout.log into memory using fs::read_to_string is dangerous and can lead to an Out-Of-Memory (OOM) crash if the daemon process generates a large amount of output. It is better to stream the file content directly to standard output. Additionally, the command should return the actual exit code of the session from result.toml rather than always returning 0 on success.
| if result_path.exists() { | |
| let stdout_log = session_dir.join("stdout.log"); | |
| if stdout_log.is_file() { | |
| let content = fs::read_to_string(&stdout_log)?; | |
| if !content.is_empty() { | |
| print!("{content}"); | |
| } | |
| } | |
| return Ok(0); | |
| } | |
| if result_path.exists() { | |
| let stdout_log = session_dir.join("stdout.log"); | |
| if stdout_log.is_file() { | |
| let mut file = fs::File::open(&stdout_log)?; | |
| std::io::copy(&mut file, &mut std::io::stdout())?; | |
| } | |
| let exit_code = csa_session::load_result(&project_root, &resolved.session_id)? | |
| .map(|r| r.exit_code) | |
| .unwrap_or(0); | |
| return Ok(exit_code); | |
| } |
| let run_pos = all_args.iter().position(|a| a == "run").unwrap_or(1); | ||
| let forwarded_args: Vec<String> = all_args.iter().skip(run_pos + 1).cloned().collect(); |
There was a problem hiding this comment.
The current logic for collecting forwarded arguments is fragile and results in the loss of global flags (e.g., --format, --config) that appear before the run subcommand. This happens because run_pos only captures the position of the run verb and skips everything before it. Additionally, if the user provides a command like csa run run ..., this will find the first run and potentially skip the second one if it was intended as a positional argument.
| if result_path.exists() { | ||
| loop { | ||
| let n = stdout_file.read(&mut buf)?; | ||
| if n == 0 { | ||
| break; | ||
| } | ||
| std::io::stdout().write_all(&buf[..n])?; | ||
| } | ||
| std::io::stdout().flush()?; | ||
| return Ok(0); | ||
| } |
There was a problem hiding this comment.
The attach command should return the actual exit code of the session from result.toml when the session completes, rather than always returning 0. This allows callers to determine if the background task succeeded or failed.
if result_path.exists() {
loop {
let n = stdout_file.read(&mut buf)?;
if n == 0 {
break;
}
std::io::stdout().write_all(&buf[..n])?;
}
std::io::stdout().flush()?;
let exit_code = csa_session::load_result(&project_root, &resolved.session_id)?
.map(|r| r.exit_code)
.unwrap_or(0);
return Ok(exit_code);
}
Bot Finding ArbitrationG2 [MEDIUM] run_cmd_daemon.rs:29 — Fragile arg forwardingClassification: Accepted limitation (not false positive, but low risk for v1). The arg forwarding uses C1 [P1] daemon.rs:62 — Preserve stdin for piped promptsClassification: False positive. The |
- G1: stream stdout.log via io::copy instead of read_to_string (OOM risk) - G3: attach returns exit code from result.toml instead of always 0 - C2: validate CSA_DAEMON_SESSION_ID as ULID before use - C3: drain stderr to EOF before returning from attach
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request adds daemonization support to the csa run command, enabling background execution and monitoring through new wait and attach subcommands. The implementation introduces a daemon module for process detaching and refactors session command dispatching. Review feedback identifies several critical improvements, including handling daemon crashes in polling loops, resolving race conditions when tailing logs, and adding necessary platform-specific guards for Unix-only code. Concerns were also raised regarding thread-safe environment variable management and the preservation of global CLI flags when spawning the daemon process.
| loop { | ||
| if result_path.exists() { | ||
| // Stream stdout.log to avoid OOM on large daemon output. | ||
| let stdout_log = session_dir.join("stdout.log"); | ||
| if stdout_log.is_file() { | ||
| let mut f = std::fs::File::open(&stdout_log)?; | ||
| std::io::copy(&mut f, &mut std::io::stdout().lock())?; | ||
| } | ||
| return Ok(0); | ||
| } | ||
|
|
||
| if timeout_secs > 0 && start.elapsed().as_secs() >= timeout_secs { | ||
| eprintln!( | ||
| "Timeout: session {} did not complete within {}s", | ||
| resolved.session_id, timeout_secs | ||
| ); | ||
| return Ok(124); | ||
| } | ||
|
|
||
| std::thread::sleep(poll_interval); | ||
| } |
There was a problem hiding this comment.
The polling loop in handle_session_wait only exits when result.toml appears or a timeout occurs. If the daemon process crashes or is killed (e.g., by the OOM killer) before it can write the result file, this command will hang indefinitely (or until the timeout). It is recommended to check if the daemon process is still alive using csa_process::ToolLiveness::is_alive(&session_dir) within the loop.
loop {
if result_path.exists() {
// Stream stdout.log to avoid OOM on large daemon output.
let stdout_log = session_dir.join("stdout.log");
if stdout_log.is_file() {
let mut f = std::fs::File::open(&stdout_log)?;
std::io::copy(&mut f, &mut std::io::stdout().lock())?;
}
return Ok(0);
}
if timeout_secs > 0 && start.elapsed().as_secs() >= timeout_secs {
eprintln!(
"Timeout: session {} did not complete within {}s",
resolved.session_id, timeout_secs
);
return Ok(124);
}
if !csa_process::ToolLiveness::is_alive(&session_dir) {
// Final check for result.toml to avoid race condition
if !result_path.exists() {
anyhow::bail!("Daemon process for session {} died unexpectedly", resolved.session_id);
}
}
std::thread::sleep(poll_interval);
}| loop { | ||
| let mut any_output = false; | ||
|
|
||
| let n = stdout_file.read(&mut buf)?; | ||
| if n > 0 { | ||
| std::io::stdout().write_all(&buf[..n])?; | ||
| std::io::stdout().flush()?; | ||
| any_output = true; | ||
| } | ||
|
|
||
| if let Some(ref mut f) = stderr_file { | ||
| let n = f.read(&mut buf)?; | ||
| if n > 0 { | ||
| std::io::stderr().write_all(&buf[..n])?; | ||
| std::io::stderr().flush()?; | ||
| any_output = true; | ||
| } | ||
| } | ||
|
|
||
| if result_path.exists() { | ||
| // Drain remaining stdout. | ||
| loop { | ||
| let n = stdout_file.read(&mut buf)?; | ||
| if n == 0 { | ||
| break; | ||
| } | ||
| std::io::stdout().write_all(&buf[..n])?; | ||
| } | ||
| std::io::stdout().flush()?; | ||
| // Drain remaining stderr. | ||
| if let Some(ref mut f) = stderr_file { | ||
| loop { | ||
| let n = f.read(&mut buf)?; | ||
| if n == 0 { | ||
| break; | ||
| } | ||
| std::io::stderr().write_all(&buf[..n])?; | ||
| } | ||
| std::io::stderr().flush()?; | ||
| } | ||
| // Return the session's exit code from result.toml. | ||
| let exit_code = fs::read_to_string(&result_path) | ||
| .ok() | ||
| .and_then(|s| toml::from_str::<csa_session::result::SessionResult>(&s).ok()) | ||
| .map(|r| r.exit_code) | ||
| .unwrap_or(0); | ||
| return Ok(exit_code); | ||
| } | ||
|
|
||
| if !any_output { | ||
| std::thread::sleep(poll_interval); | ||
| } | ||
| } |
There was a problem hiding this comment.
Similar to handle_session_wait, the Attach loop can hang indefinitely if the daemon process terminates without creating a result file. Additionally, the stderr file should be opened dynamically if it appears after the loop has started.
loop {
let mut any_output = false;
if show_stderr && stderr_file.is_none() && stderr_path.exists() {
stderr_file = Some(std::fs::File::open(&stderr_path)?);
}
let n = stdout_file.read(&mut buf)?;
if n > 0 {
std::io::stdout().write_all(&buf[..n])?;
std::io::stdout().flush()?;
any_output = true;
}
if let Some(ref mut f) = stderr_file {
let n = f.read(&mut buf)?;
if n > 0 {
std::io::stderr().write_all(&buf[..n])?;
std::io::stderr().flush()?;
any_output = true;
}
}
if result_path.exists() {
// Drain remaining stdout.
loop {
let n = stdout_file.read(&mut buf)?;
if n == 0 {
break;
}
std::io::stdout().write_all(&buf[..n])?;
}
std::io::stdout().flush()?;
// Drain remaining stderr.
if let Some(ref mut f) = stderr_file {
loop {
let n = f.read(&mut buf)?;
if n == 0 {
break;
}
std::io::stderr().write_all(&buf[..n])?;
}
std::io::stderr().flush()?;
}
// Return the session's exit code from result.toml.
let exit_code = fs::read_to_string(&result_path)
.ok()
.and_then(|s| toml::from_str::<csa_session::result::SessionResult>(&s).ok())
.map(|r| r.exit_code)
.unwrap_or(0);
return Ok(exit_code);
}
if !any_output {
if !csa_process::ToolLiveness::is_alive(&session_dir) && !result_path.exists() {
anyhow::bail!("Daemon process for session {} died unexpectedly", resolved.session_id);
}
std::thread::sleep(poll_interval);
}
}
crates/cli-sub-agent/src/main.rs
Outdated
| // SAFETY: daemon child is a single-threaded process at this point. | ||
| unsafe { std::env::set_var("CSA_DAEMON_SESSION_ID", sid) }; |
There was a problem hiding this comment.
The SAFETY comment is inaccurate. Since this code is running inside an async function managed by #[tokio::main], the tokio runtime has already started its worker threads, making the process multi-threaded. Calling std::env::set_var in a multi-threaded environment is technically undefined behavior in Rust 1.80+. While unlikely to cause issues here as it happens early in the execution, it would be safer to parse the CLI and set the environment variable in a synchronous main before starting the tokio runtime.
| let all_args: Vec<String> = std::env::args().collect(); | ||
| let run_pos = all_args.iter().position(|a| a == "run").unwrap_or(1); | ||
| let forwarded_args: Vec<String> = all_args.iter().skip(run_pos + 1).cloned().collect(); |
There was a problem hiding this comment.
This logic for collecting forwarded arguments only captures arguments appearing after the run subcommand. Any global flags passed before the subcommand (e.g., csa --format json run --daemon) will be lost in the daemonized child process. You should consider capturing and forwarding global flags as well to ensure the daemon child respects the user's configuration.
| let mut stderr_file = if show_stderr && stderr_path.exists() { | ||
| Some(std::fs::File::open(&stderr_path)?) | ||
| } else { | ||
| None | ||
| }; |
There was a problem hiding this comment.
If stderr.log does not exist at the exact moment attach starts (which is common as the daemon child may still be initializing), stderr_file will be None for the duration of the command. This means csa session attach --stderr will fail to show any stderr output if the file is created even a few milliseconds late. The logic to open the stderr file should be moved inside the loop.
let mut stderr_file = None;| @@ -0,0 +1,202 @@ | |||
| //! Daemon spawning: detach a child process with setsid + redirected I/O. | |||
There was a problem hiding this comment.
This file uses Unix-specific extensions (std::os::unix::process::CommandExt) and POSIX APIs (libc::setsid). It will fail to compile on non-Unix platforms such as Windows. Please add a #[cfg(unix)] guard to the file or the module declaration.
| //! Daemon spawning: detach a child process with setsid + redirected I/O. | |
| #[cfg(unix)] | |
| //! Daemon spawning: detach a child process with setsid + redirected I/O. |
| use tool_liveness::record_spool_bytes_written; | ||
| pub use tool_liveness::{DEFAULT_LIVENESS_DEAD_SECS, ToolLiveness}; | ||
|
|
||
| pub mod daemon; |
Daemon fixes: - G4/G5: PID liveness check prevents wait/attach from hanging on dead daemons - G6: Corrected SAFETY comment for set_var (pre-tokio thread spawn) - G8: Lazy-open stderr in attach (handles late stderr.log creation) - G9/G10: #[cfg(unix)] guard on daemon module pr-bot improvements: - Changed bot quiet-wait from 5 minutes to 250 seconds (KV cache alignment) - Added review gate (csa review --range main...HEAD) after fix commit in Step 8 - Updated PATTERN.md and workflow.toml in sync (rule 027)
|
/gemini review |
Round 3 Bot Comment Staleness AnalysisAll HIGH/MEDIUM findings in this round are stale repeats of issues already fixed in commit
Bot reviews are against the full PR diff, not the latest commit. All actionable findings have been addressed. |
There was a problem hiding this comment.
Code Review
This pull request implements daemonization for the csa run command, enabling background execution of agent sessions, and includes a version bump to 0.1.194. It introduces session wait and session attach subcommands for monitoring background processes, along with low-level utilities for process detachment and I/O redirection. Feedback identifies critical issues including unsafe environment variable modification in a multi-threaded context, lack of log rotation support during attachment, fragile PID parsing from log files, and a race condition in PID detection. It is also suggested that the wait command return the actual session exit code for better consistency.
| if let Some(ref sid) = session_id { | ||
| // SAFETY: This runs in the daemon child before tokio spawns worker | ||
| // threads (we are still in the synchronous dispatch path of main). | ||
| unsafe { std::env::set_var("CSA_DAEMON_SESSION_ID", sid) }; |
There was a problem hiding this comment.
Using std::env::set_var in a multi-threaded environment is unsafe and can lead to data races or undefined behavior. Since this code runs within a #[tokio::main] async context, the runtime is already initialized and potentially multi-threaded. It is highly recommended to pass the session ID explicitly through the pipeline configuration or function arguments instead of relying on a global environment variable.
| loop { | ||
| let mut any_output = false; | ||
|
|
||
| let n = stdout_file.read(&mut buf)?; | ||
| if n > 0 { | ||
| std::io::stdout().write_all(&buf[..n])?; | ||
| std::io::stdout().flush()?; | ||
| any_output = true; | ||
| } | ||
|
|
||
| // Lazy-open stderr if it appeared after we started. | ||
| if show_stderr && stderr_file.is_none() && stderr_path.exists() { | ||
| stderr_file = std::fs::File::open(&stderr_path).ok(); | ||
| } | ||
| if let Some(ref mut f) = stderr_file { | ||
| let n = f.read(&mut buf)?; | ||
| if n > 0 { | ||
| std::io::stderr().write_all(&buf[..n])?; | ||
| std::io::stderr().flush()?; | ||
| any_output = true; | ||
| } | ||
| } | ||
|
|
||
| if result_path.exists() { | ||
| // Drain remaining stdout. | ||
| loop { | ||
| let n = stdout_file.read(&mut buf)?; | ||
| if n == 0 { | ||
| break; | ||
| } | ||
| std::io::stdout().write_all(&buf[..n])?; | ||
| } | ||
| std::io::stdout().flush()?; | ||
| // Drain remaining stderr. | ||
| if let Some(ref mut f) = stderr_file { | ||
| loop { | ||
| let n = f.read(&mut buf)?; | ||
| if n == 0 { | ||
| break; | ||
| } | ||
| std::io::stderr().write_all(&buf[..n])?; | ||
| } | ||
| std::io::stderr().flush()?; | ||
| } | ||
| // Return the session's exit code from result.toml. | ||
| let exit_code = fs::read_to_string(&result_path) | ||
| .ok() | ||
| .and_then(|s| toml::from_str::<csa_session::result::SessionResult>(&s).ok()) | ||
| .map(|r| r.exit_code) | ||
| .unwrap_or(0); | ||
| return Ok(exit_code); | ||
| } | ||
|
|
||
| // Detect dead daemon: PID gone but no result.toml. | ||
| if let Some(pid) = daemon_pid | ||
| && !is_pid_alive(pid) | ||
| { | ||
| eprintln!( | ||
| "Daemon process {} exited without producing result.toml", | ||
| pid, | ||
| ); | ||
| return Ok(1); | ||
| } | ||
|
|
||
| if !any_output { | ||
| std::thread::sleep(poll_interval); | ||
| } | ||
| } |
There was a problem hiding this comment.
The attach logic tails stdout.log using a single file handle. If the daemon process uses SpoolRotator (which appears to support rotation/truncation based on csa-process/src/lib.rs), the current file handle will stop receiving new data once rotation occurs. The tailing logic should detect when the file has been rotated and reopen the new log file to continue streaming output.
| if let Some(pid_start) = content.find("pid=") { | ||
| let rest = &content[pid_start + 4..]; | ||
| let pid_str: String = rest.chars().take_while(|c| c.is_ascii_digit()).collect(); |
There was a problem hiding this comment.
|
|
||
| let start = std::time::Instant::now(); | ||
| let poll_interval = std::time::Duration::from_secs(1); | ||
| let daemon_pid = read_daemon_pid(&session_dir); |
There was a problem hiding this comment.
The daemon PID is only read once before the polling loop starts. If the daemon process takes a moment to start and write the CSA:SESSION_STARTED directive to stderr.log, read_daemon_pid will return None, and the loop will never be able to detect if the daemon process has crashed (line 67). Consider retrying read_daemon_pid inside the loop if it initially returns None.
| let mut f = std::fs::File::open(&stdout_log)?; | ||
| std::io::copy(&mut f, &mut std::io::stdout().lock())?; | ||
| } | ||
| return Ok(0); |
There was a problem hiding this comment.
The handle_session_wait function returns a hardcoded Ok(0) upon successful completion. To maintain consistency with handle_session_attach and provide more useful information to the caller, it should read and return the actual exit code from result.toml.
let exit_code = fs::read_to_string(&result_path)
.ok()
.and_then(|s| toml::from_str::<csa_session::result::SessionResult>(&s).ok())
.map(|r| r.exit_code)
.unwrap_or(0);
return Ok(exit_code);
Summary
csa run --daemonto detach as background daemon (setsid + spool files)csa session wait --session <ID>for polling result.tomlcsa session attach --session <ID>for real-time stdout tailingCSA:SESSION_STARTEDwith id/pid/dir/wait_cmd/attach_cmdCSA_DAEMON_SESSION_IDenv var for directory reuseDesign Decisions
--daemonflag required (not default) to avoid breaking existing skills/patterns before migration (chore(hooks): migrate to git native + CSA prompt guards #155)Test plan
cargo test --package csa-process --lib daemon(2 tests: spool files + detachment)just pre-commitpasses (fmt, clippy, test, monolith check)csa review --diffPASS (claude-code)csa review --range main...HEADPASS (cumulative)csa run --daemon "echo hello"→ prints session ID,csa session waitreturns output🤖 Generated with Claude Code