diff --git a/.circleci/config.yml b/.circleci/config.yml index 27e83b23b..7805d02e0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -118,19 +118,23 @@ jobs: resource_class: coasys/marvin steps: - setup_integration_test_environment - - run: - name: Kill any orphaned executors from previous runs - command: | - # Self-hosted runners reuse workdirs; previous job may have left - # an executor alive (exec() shell-wrap means kill() only kills - # the shell, not the executor grandchild). Clear ports before test. - for port in 15700 15701 15702; do - lsof -ti:$port | xargs -r kill -9 2>/dev/null || true - done - run: name: Run integration tests command: cd ./tests/js && pnpm run test-main no_output_timeout: 30m + - run: + name: Collect logs on failure + when: on_fail + command: | + mkdir -p /tmp/test-artifacts + cp -r tests/js/tst-tmp/agents/*/ad4m/logs /tmp/test-artifacts/ 2>/dev/null || true + cp -r tests/js/tst-tmp/agents/*/ad4m/holochain /tmp/test-artifacts/ 2>/dev/null || true + # Capture port state for debugging + lsof -i -P -n 2>/dev/null | grep LISTEN > /tmp/test-artifacts/listening-ports.txt || true + - store_artifacts: + when: on_fail + path: /tmp/test-artifacts + destination: test-logs integration-tests-multi-user-simple: machine: true @@ -157,14 +161,6 @@ jobs: resource_class: coasys/marvin steps: - setup_integration_test_environment - - run: - name: Kill any orphaned executors from previous runs - command: | - # MCP tests use ports 16000-16002 (mcp-http) and 16010-16012 (mcp-auth) - # plus port 3001 (MCP HTTP server). Clear all before starting. 
- for port in 16000 16001 16002 16010 16011 16012 3001; do - lsof -ti:$port | xargs -r kill -9 2>/dev/null || true - done - run: name: Run MCP integration tests command: cd ./tests/js && pnpm run test-mcp @@ -175,12 +171,6 @@ jobs: resource_class: coasys/marvin steps: - setup_integration_test_environment - - run: - name: Kill any orphaned executors from previous runs - command: | - for port in 15700 15701 15702; do - lsof -ti:$port | xargs -r kill -9 2>/dev/null || true - done - run: name: Run Ad4mModel unit + integration tests command: cd ./tests/js && pnpm run test-model diff --git a/cli/src/ad4m_executor.rs b/cli/src/ad4m_executor.rs index d6a6bdc35..2bdf1d979 100644 --- a/cli/src/ad4m_executor.rs +++ b/cli/src/ad4m_executor.rs @@ -136,6 +136,10 @@ enum Domain { enable_mcp: Option, #[arg(long, action)] mcp_port: Option, + /// Write the executor PID to this file on startup (removed on clean shutdown). + /// Useful for test harnesses that need targeted process cleanup. + #[arg(long)] + pid_file: Option, }, RunLocalHcServices {}, } @@ -189,8 +193,17 @@ async fn main() -> Result<()> { enable_multi_user, enable_mcp, mcp_port, + pid_file, } = args.domain { + // Set PID file path as env var so the executor can write/clean it up + if let Some(ref pf) = pid_file { + // SAFETY: set_var is safe here because we're in single-threaded init before spawning. + #[allow(deprecated)] + unsafe { + std::env::set_var("AD4M_PID_FILE", pf); + } + } let tls = if tls_cert_file.is_some() && tls_key_file.is_some() { Some(TlsConfig { cert_file_path: tls_cert_file.unwrap(), diff --git a/cli/src/main.rs b/cli/src/main.rs index 0192863cc..a7a83496e 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -163,6 +163,9 @@ enum Domain { enable_mcp: Option, #[arg(long, action)] mcp_port: Option, + /// Write the executor PID to this file on startup (removed on clean shutdown). 
+ #[arg(long)] + pid_file: Option, }, RunLocalHcServices {}, Eve { @@ -243,8 +246,14 @@ async fn main() -> Result<()> { enable_multi_user, enable_mcp, mcp_port, + pid_file, } = args.domain { + // Set PID file path as env var so the executor can write/clean it up + if let Some(ref pf) = pid_file { + #[allow(deprecated)] + unsafe { std::env::set_var("AD4M_PID_FILE", pf); } + } let _ = tokio::spawn(async move { rust_executor::run(Ad4mConfig { app_data_path, @@ -269,6 +278,8 @@ async fn main() -> Result<()> { auto_permit_cap_requests: None, tls: None, log_holochain_metrics: None, + hc_relay_url: None, + smtp_config: None, }).await }).await; diff --git a/rust-executor/src/globals.rs b/rust-executor/src/globals.rs index 6c2be49e0..5c0d5081f 100644 --- a/rust-executor/src/globals.rs +++ b/rust-executor/src/globals.rs @@ -1,10 +1,17 @@ use lazy_static::lazy_static; +use std::sync::Mutex; +use tokio::sync::oneshot; lazy_static! { /// The current version of AD4M pub static ref AD4M_VERSION: String = String::from("0.12.0-rc1-dev.2"); } +/// Global shutdown signal sender. Used by `runtime_quit` GQL mutation and signal handlers +/// to trigger a graceful shutdown of the executor. +/// Wrapped in Mutex> so we can take() the sender from a shared static reference. +pub static SHUTDOWN_TX: Mutex>> = Mutex::new(None); + /// Struct representing oldest supported version and indicator if state should be cleared if update is required pub struct OldestVersion { pub version: String, diff --git a/rust-executor/src/graphql/graphql_types.rs b/rust-executor/src/graphql/graphql_types.rs index 1999585cf..66b799cc4 100644 --- a/rust-executor/src/graphql/graphql_types.rs +++ b/rust-executor/src/graphql/graphql_types.rs @@ -633,6 +633,22 @@ pub struct RuntimeInfo { pub is_unlocked: bool, } +/// Readiness status returned by the `runtimeReadiness` query. +/// Each field indicates whether a subsystem has completed initialization. +/// Test harnesses should poll this instead of using `sleep()`. 
+#[derive(GraphQLObject, Default, Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReadinessStatus { + /// GraphQL server is accepting requests (always true if you can call this query) + pub gql_ready: bool, + /// Holochain conductor is running and connected + pub holochain_ready: bool, + /// Agent has been generated/unlocked + pub agent_initialized: bool, + /// Languages have been loaded into the language controller + pub languages_loaded: bool, +} + #[derive(GraphQLObject, Default, Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct SentMessage { diff --git a/rust-executor/src/graphql/mutation_resolvers.rs b/rust-executor/src/graphql/mutation_resolvers.rs index 8a552ea59..9a355edc6 100644 --- a/rust-executor/src/graphql/mutation_resolvers.rs +++ b/rust-executor/src/graphql/mutation_resolvers.rs @@ -2413,7 +2413,17 @@ impl Mutation { async fn runtime_quit(&self, context: &RequestContext) -> FieldResult { check_capability(&context.capabilities, &RUNTIME_QUIT_CAPABILITY)?; - std::process::exit(0); + // Trigger graceful shutdown via the global shutdown channel. + // The main loop will shut down Holochain conductor, flush state, and exit cleanly. + // Falls back to process::exit(0) if the channel was already consumed or not set. + if let Some(tx) = crate::globals::SHUTDOWN_TX.lock().unwrap().take() { + log::info!("runtime_quit: sending graceful shutdown signal"); + let _ = tx.send(()); + Ok(true) + } else { + log::warn!("runtime_quit: shutdown channel unavailable, falling back to process::exit"); + std::process::exit(0); + } } async fn runtime_remove_friends( diff --git a/rust-executor/src/graphql/query_resolvers.rs b/rust-executor/src/graphql/query_resolvers.rs index ba8997cae..dcebca7d0 100644 --- a/rust-executor/src/graphql/query_resolvers.rs +++ b/rust-executor/src/graphql/query_resolvers.rs @@ -806,6 +806,27 @@ impl Query { }) } + /// Returns the readiness status of executor subsystems. 
+ /// Test harnesses should poll this query instead of using `sleep()`. + /// No capability check — readiness is safe to expose publicly. + async fn runtime_readiness(&self, _context: &RequestContext) -> FieldResult { + let holochain_ready = crate::holochain_service::maybe_get_holochain_service() + .await + .is_some(); + + let (agent_initialized, languages_loaded) = + AgentService::with_global_instance(|agent_service| { + (agent_service.is_initialized(), agent_service.is_unlocked()) + }); + + Ok(ReadinessStatus { + gql_ready: true, // If this query returns, GQL is ready + holochain_ready, + agent_initialized, + languages_loaded, // Currently maps to agent unlocked (languages load during unlock) + }) + } + async fn runtime_known_link_language_templates( &self, context: &RequestContext, diff --git a/rust-executor/src/lib.rs b/rust-executor/src/lib.rs index 016d1926c..afc610303 100644 --- a/rust-executor/src/lib.rs +++ b/rust-executor/src/lib.rs @@ -34,6 +34,7 @@ pub mod types; use std::thread::JoinHandle; use log::{error, info, warn}; +use tokio::sync::oneshot; use crate::{ agent::AgentService, ai_service::AIService, dapp_server::serve_dapp, db::Ad4mDb, @@ -190,11 +191,79 @@ pub async fn run(mut config: Ad4mConfig) -> JoinHandle<()> { } } + // Set up graceful shutdown channel. + // The sender is stored globally so runtime_quit and signal handlers can trigger shutdown. + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + { + let mut guard = crate::globals::SHUTDOWN_TX.lock().unwrap(); + *guard = Some(shutdown_tx); + } + + // Spawn a task that listens for OS signals (SIGTERM/SIGINT) and triggers shutdown. + // This replaces the old ctrlc handler in the CLI binaries with an in-executor handler + // that allows graceful cleanup of Holochain conductor and databases. 
+ #[cfg(unix)] + { + tokio::spawn(async { + use tokio::signal; + let ctrl_c = signal::ctrl_c(); + let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install SIGTERM handler"); + + tokio::select! { + _ = ctrl_c => info!("Received SIGINT, initiating graceful shutdown..."), + _ = sigterm.recv() => info!("Received SIGTERM, initiating graceful shutdown..."), + } + + // Trigger shutdown via the global channel + if let Some(tx) = crate::globals::SHUTDOWN_TX.lock().unwrap().take() { + let _ = tx.send(()); + } + }); + } + + // Spawn the shutdown handler that waits for the signal and cleans up + tokio::spawn(async move { + if shutdown_rx.await.is_ok() { + info!("Shutdown signal received, cleaning up..."); + + // 1. Shut down Holochain conductor gracefully + if let Some(holochain_service) = holochain_service::maybe_get_holochain_service().await + { + info!("Shutting down Holochain conductor..."); + match holochain_service.shutdown().await { + Ok(()) => info!("Holochain conductor shut down cleanly"), + Err(e) => warn!("Error shutting down Holochain conductor: {}", e), + } + } + + // 2. Write PID file removal if it exists + if let Ok(pid_file) = std::env::var("AD4M_PID_FILE") { + let _ = std::fs::remove_file(&pid_file); + info!("Removed PID file: {}", pid_file); + } + + info!("Graceful shutdown complete, exiting."); + std::process::exit(0); + } + }); + // Initialize logging for CLI (stdout) // Respects RUST_LOG environment variable if set crate::logging::init_cli_logging(None); config.prepare(); + // Write PID file if requested via environment variable or config. + // Test harnesses can set AD4M_PID_FILE to get a reliable PID for targeted cleanup. 
+ if let Ok(pid_file) = std::env::var("AD4M_PID_FILE") { + let pid = std::process::id(); + if let Err(e) = std::fs::write(&pid_file, pid.to_string()) { + warn!("Failed to write PID file {}: {}", pid_file, e); + } else { + info!("Wrote PID {} to {}", pid, pid_file); + } + } + // Store config globally so services (e.g. agent mutation resolvers) can access it crate::config::set_global_config(config.clone()); diff --git a/tests/integration.bats b/tests/integration.bats index bfb0923a9..98fbe00b1 100644 --- a/tests/integration.bats +++ b/tests/integration.bats @@ -7,6 +7,8 @@ setup_file() { echo "done." >&3 echo "Starting agent 1..." >&3 ./target/release/ad4m run --app-data-path ${current_dir}/tests/ad4m1 --gql-port 4000 & + AD4M_PID=$! + export AD4M_PID sleep 5 echo "done." >&3 @@ -31,7 +33,19 @@ setup_file() { } teardown_file() { - killall ad4m + # Graceful shutdown: SIGTERM first, then escalate to SIGKILL if needed. + # Never use `killall ad4m` — it kills ALL ad4m processes on the machine, + # including other CI jobs and dev instances. 
+ if [ -n "$AD4M_PID" ]; then + kill -TERM "$AD4M_PID" 2>/dev/null || true + for i in $(seq 1 10); do + kill -0 "$AD4M_PID" 2>/dev/null || break + sleep 1 + done + kill -9 "$AD4M_PID" 2>/dev/null || true + fi + # Port-based fallback in case PID tracking missed something + lsof -ti:4000 | xargs -r kill -9 2>/dev/null || true } setup() { diff --git a/tests/js/tests/multi-user-simple.test.ts b/tests/js/tests/multi-user-simple.test.ts index a113dc728..8971bf6d4 100644 --- a/tests/js/tests/multi-user-simple.test.ts +++ b/tests/js/tests/multi-user-simple.test.ts @@ -4,7 +4,7 @@ import fs from "fs-extra"; import { fileURLToPath } from 'url'; import * as chai from "chai"; import chaiAsPromised from "chai-as-promised"; -import { apolloClient, sleep, startExecutor, runHcLocalServices } from "../utils/utils"; +import { apolloClient, sleep, startExecutor, runHcLocalServices, gracefulShutdown } from "../utils/utils"; import { ChildProcess } from 'node:child_process'; import fetch from 'node-fetch' import { LinkQuery } from "@coasys/ad4m"; @@ -60,20 +60,8 @@ describe("Multi-User Simple integration tests", () => { }) after(async () => { - if (executorProcess) { - while (!executorProcess?.killed) { - let status = executorProcess?.kill(); - console.log("killed executor with", status); - await sleep(500); - } - } - if (localServicesProcess) { - while (!localServicesProcess?.killed) { - let status = localServicesProcess?.kill(); - console.log("killed local services with", status); - await sleep(500); - } - } + await gracefulShutdown(executorProcess, "executor"); + await gracefulShutdown(localServicesProcess, "local services"); }) describe("Multi-User Configuration", () => { @@ -1778,13 +1766,7 @@ describe("Multi-User Simple integration tests", () => { after(async function() { this.timeout(20000); - if (node2ExecutorProcess) { - while (!node2ExecutorProcess?.killed) { - let status = node2ExecutorProcess?.kill(); - console.log("killed node 2 executor with", status); - await sleep(500); 
- } - } + await gracefulShutdown(node2ExecutorProcess, "node 2 executor"); }); it("should return all DIDs in 'others()' for each user", async function() { @@ -2861,13 +2843,7 @@ describe("Multi-User Simple integration tests", () => { after(async function() { this.timeout(20000); - if (node3ExecutorProcess) { - while (!node3ExecutorProcess?.killed) { - let status = node3ExecutorProcess?.kill(); - console.log("killed node 3 executor with", status); - await sleep(500); - } - } + await gracefulShutdown(node3ExecutorProcess, "node 3 executor"); }); it("should route signals between remote main agent and local managed user", async function() { diff --git a/tests/js/tests/simple.test.ts b/tests/js/tests/simple.test.ts index af4208a7b..6365a6d00 100644 --- a/tests/js/tests/simple.test.ts +++ b/tests/js/tests/simple.test.ts @@ -1,7 +1,7 @@ import { expect } from "chai"; import { ChildProcess } from 'node:child_process'; import { Ad4mClient } from "@coasys/ad4m"; -import { startExecutor, apolloClient, sleep } from "../utils/utils"; +import { startExecutor, apolloClient, sleep, gracefulShutdown } from "../utils/utils"; import path from "path"; import fetch from 'node-fetch' import { fileURLToPath } from 'url'; @@ -35,13 +35,7 @@ describe("Integration", () => { }) after(async () => { - if (executorProcess) { - while (!executorProcess?.killed) { - let status = executorProcess?.kill(); - console.log("killed executor with", status); - await sleep(500); - } - } + await gracefulShutdown(executorProcess, "executor"); }) it("should get agent status", async () => { diff --git a/tests/js/utils/testCluster.ts b/tests/js/utils/testCluster.ts new file mode 100644 index 000000000..8c9580fda --- /dev/null +++ b/tests/js/utils/testCluster.ts @@ -0,0 +1,130 @@ +/** + * TestCluster — utility for managing multiple AD4M executor instances in tests. 
/**
 * TestCluster — utility for managing multiple AD4M executor instances in tests.
 *
 * Handles startup, GQL reachability polling, and graceful shutdown of N executors,
 * replacing ad-hoc ChildProcess management scattered across test files.
 *
 * Usage:
 *   const cluster = new TestCluster();
 *   const node1 = await cluster.addNode({ gqlPort: 15000, hcAdminPort: 15100, ... });
 *   const node2 = await cluster.addNode({ gqlPort: 15200, hcAdminPort: 15300, ... });
 *   // ... run tests ...
 *   await cluster.shutdown();
 */

import { ChildProcess } from "node:child_process";
import { Ad4mClient } from "@coasys/ad4m";
import { apolloClient, startExecutor, gracefulShutdown, sleep } from "./utils";

/** Startup parameters for one executor node. */
export interface NodeConfig {
    gqlPort: number;
    hcAdminPort: number;
    hcAppPort: number;
    dataPath: string;
    /** Passed to startExecutor; an empty string is used when omitted. */
    seedPath?: string;
    /** Used both for startExecutor and for the GQL client; empty string when omitted. */
    adminCredential?: string;
    /**
     * Intended to enable multi-user mode.
     * NOTE(review): addNode() does not currently forward this flag to startExecutor.
     */
    multiUser?: boolean;
}

/** Handle to a started executor: its config, child process and a connected client. */
export interface ClusterNode {
    config: NodeConfig;
    process: ChildProcess;
    client: Ad4mClient;
    gqlPort: number;
}

export class TestCluster {
    private nodes: ClusterNode[] = [];

    /**
     * Start a new executor node and wait until its GQL endpoint is accepting connections.
     * Returns the node handle with process and client references.
     *
     * If the endpoint never becomes reachable, the spawned executor is shut down
     * before the error is rethrown, so a failed startup does not leak a process.
     */
    async addNode(config: NodeConfig): Promise<ClusterNode> {
        const adminCredential = config.adminCredential || "";

        const executorProcess = await startExecutor(
            config.dataPath,
            config.seedPath || "",
            config.gqlPort,
            config.hcAdminPort,
            config.hcAppPort,
            false, // languageLanguageOnly
            adminCredential,
        );

        let client: Ad4mClient;
        try {
            // Wait for GQL to be reachable
            client = await this.waitForGql(config.gqlPort, adminCredential);
        } catch (e) {
            // The process is not tracked in this.nodes yet, so shutdown() would never
            // reach it. Stop it here instead of leaving it bound to its ports.
            await gracefulShutdown(executorProcess, `unreachable node on port ${config.gqlPort}`);
            throw e;
        }

        const node: ClusterNode = {
            config,
            process: executorProcess,
            client,
            gqlPort: config.gqlPort,
        };

        this.nodes.push(node);
        return node;
    }

    /**
     * Poll the GQL endpoint until `runtime.info()` succeeds, retrying once per second.
     * Throws with the last connection error once `timeoutMs` has elapsed.
     */
    private async waitForGql(port: number, adminCredential: string, timeoutMs: number = 60000): Promise<Ad4mClient> {
        const start = Date.now();
        let lastError: Error | null = null;

        while (Date.now() - start < timeoutMs) {
            try {
                const client = new Ad4mClient(apolloClient(port, adminCredential));
                // Try a simple query to verify connectivity
                await client.runtime.info();
                return client;
            } catch (e: any) {
                lastError = e;
                await sleep(1000);
            }
        }

        throw new Error(`GQL endpoint on port ${port} not ready after ${timeoutMs}ms: ${lastError?.message}`);
    }

    /**
     * Poll `runtime.info()` every two seconds until the node reports both
     * `isInitialized` and `isUnlocked`. Throws once `timeoutMs` has elapsed.
     */
    async waitForReadiness(node: ClusterNode, timeoutMs: number = 120000): Promise<void> {
        const start = Date.now();
        let lastError: Error | null = null;

        while (Date.now() - start < timeoutMs) {
            try {
                const info = await node.client.runtime.info();
                if (info.isInitialized && info.isUnlocked) {
                    return;
                }
                lastError = null;
            } catch (e: any) {
                // Not ready yet; keep the error so a timeout can report why.
                lastError = e;
            }
            await sleep(2000);
        }

        const reason = lastError ? `: ${lastError.message}` : "";
        throw new Error(`Node on port ${node.gqlPort} not fully ready after ${timeoutMs}ms${reason}`);
    }

    /**
     * Gracefully shut down all nodes. Shutdowns are initiated in reverse start order
     * (last started → first signalled) and then awaited together.
     * The node list is cleared even if one of the shutdowns rejects.
     */
    async shutdown(): Promise<void> {
        const total = this.nodes.length;
        const shutdowns = [...this.nodes].reverse().map(async (node, i) => {
            // `total - i` is the node's 1-based position in start order.
            await gracefulShutdown(node.process, `cluster node ${total - i}`);
        });
        try {
            await Promise.all(shutdowns);
        } finally {
            this.nodes = [];
        }
    }

    /**
     * Get all nodes currently tracked by the cluster.
     */
    getNodes(): ReadonlyArray<ClusterNode> {
        return this.nodes;
    }
}
/**
 * Kill any process listening on the given ports.
 * Uses SIGTERM → wait → SIGKILL escalation for graceful shutdown.
 * Use this in after() hooks as a safety net.
 *
 * Best effort: failures of the underlying `lsof | xargs kill` pipeline are ignored.
 *
 * @param ports - TCP ports whose listeners should be terminated
 */
export function killByPorts(ports: number[]): void {
    // Nothing to signal, so skip the fixed grace period as well.
    if (ports.length === 0) return;

    const signalListeners = (signalFlag: string): void => {
        for (const port of ports) {
            try {
                execSync(`lsof -ti:${port} | xargs -r kill ${signalFlag}`, { stdio: 'ignore' });
            } catch (e) {
                // Pipeline failed (e.g. nothing to signal) — fine, this is a safety net.
            }
        }
    };

    // First try SIGTERM for graceful shutdown
    signalListeners('-TERM');
    // Give processes a moment to shut down gracefully (blocking: this function is synchronous)
    try { execSync('sleep 2', { stdio: 'ignore' }); } catch (e) { /* ignore */ }
    // SIGKILL anything still lingering
    signalListeners('-9');
}

/**
 * Resolve `true` once `proc` has exited, or `false` if it is still running after `timeoutMs`.
 * Exit is detected via `exitCode`/`signalCode` and the 'exit' event, not `proc.killed`.
 */
function waitForExit(proc: ChildProcess, timeoutMs: number): Promise<boolean> {
    if (proc.exitCode !== null || proc.signalCode !== null) {
        return Promise.resolve(true);
    }
    return new Promise<boolean>((resolve) => {
        const onExit = (): void => {
            clearTimeout(timer);
            resolve(true);
        };
        const timer = setTimeout(() => {
            proc.off('exit', onExit);
            resolve(false);
        }, timeoutMs);
        proc.once('exit', onExit);
    });
}

/**
 * Gracefully shut down a ChildProcess using SIGTERM → wait → SIGKILL escalation.
 * Replaces the common pattern of `while (!process.killed) { process.kill(); await sleep(500); }`
 * which sends repeated SIGTERM signals unnecessarily.
 *
 * `proc.killed` only records that a signal was delivered by `kill()`; it does not
 * mean the process has terminated. Termination is therefore detected through
 * `exitCode`/`signalCode` and the 'exit' event, so the wait and the SIGKILL
 * escalation actually take effect.
 *
 * @param proc - The ChildProcess to shut down (null/undefined is a no-op)
 * @param label - Label for logging
 * @param timeoutMs - How long to wait for graceful shutdown before SIGKILL (default: 10s)
 */
export async function gracefulShutdown(proc: ChildProcess | null | undefined, label: string = "process", timeoutMs: number = 10000): Promise<void> {
    if (!proc) return;
    // Already terminated — nothing to do.
    if (proc.exitCode !== null || proc.signalCode !== null) return;
    // No PID means the process never spawned; there is nothing to signal or wait for.
    if (proc.pid === undefined) return;

    console.log(`Sending SIGTERM to ${label} (PID ${proc.pid})...`);
    proc.kill('SIGTERM');

    // Wait for process to exit gracefully
    let exited = await waitForExit(proc, timeoutMs);

    if (!exited) {
        console.log(`${label} did not exit after ${timeoutMs}ms, sending SIGKILL...`);
        proc.kill('SIGKILL');
        // SIGKILL cannot be ignored, but the exit still has to be observed; bound the wait.
        exited = await waitForExit(proc, 5000);
    }

    console.log(`${label} shut down (exited=${exited}, exitCode=${proc.exitCode}, signal=${proc.signalCode})`);
}