diff --git a/include/ghostty.h b/include/ghostty.h index 65b1cdc5a4..2a017b3535 100644 --- a/include/ghostty.h +++ b/include/ghostty.h @@ -460,6 +460,9 @@ typedef struct { ghostty_surface_io_mode_e io_mode; ghostty_io_write_cb io_write_cb; void* io_write_userdata; + const char* zmx_session; + bool zmx_create; + bool zmx_mode; } ghostty_surface_config_s; typedef struct { diff --git a/src/Surface.zig b/src/Surface.zig index 50e55e722a..132d584194 100644 --- a/src/Surface.zig +++ b/src/Surface.zig @@ -629,40 +629,46 @@ pub fn init( // This separate block ({}) is important because our errdefers must // be scoped here to be valid. { - var env = rt_surface.defaultTermioEnv() catch |err| env: { - // If an error occurs, we don't want to block surface startup. - log.warn("error getting env map for surface err={}", .{err}); - break :env internal_os.getEnvMap(alloc) catch - std.process.EnvMap.init(alloc); - }; - errdefer env.deinit(); - - // don't leak GHOSTTY_LOG to any subprocesses - env.remove("GHOSTTY_LOG"); + // Determine the IO backend: zmx > manual > exec + // Try zmx first (if configured and binary is available) + var zmx_backend: ?termio.Zmx = null; + if (config.@"zmx-session") |session| { + zmx_backend = termio.Zmx.init(alloc, .{ + .session_name = session, + .create_if_missing = config.@"zmx-create", + .working_directory = config.@"working-directory", + }) catch |err| switch (err) { + error.ZmxNotFound => blk: { + log.warn("zmx binary not found, falling back to exec", .{}); + break :blk null; + }, + else => return err, + }; + } - // Initialize our IO backend - if (use_manual_io) { + var io_backend: termio.Backend = if (zmx_backend) |zmx| + .{ .zmx = zmx } + else if (use_manual_io) manual_backend: { var io_manual = try termio.Manual.init(alloc, .{ .write_cb = manual_write_cb, .write_userdata = manual_write_userdata, }); - errdefer io_manual.deinit(); - - var io_mailbox = try termio.Mailbox.initSPSC(alloc); - errdefer io_mailbox.deinit(alloc); - - try 
termio.Termio.init(&self.io, alloc, .{ - .size = size, - .full_config = config, - .config = try termio.Termio.DerivedConfig.init(alloc, config), - .backend = .{ .manual = io_manual }, - .mailbox = io_mailbox, - .renderer_state = &self.renderer_state, - .renderer_wakeup = render_thread.wakeup, - .renderer_mailbox = render_thread.mailbox, - .surface_mailbox = .{ .surface = self, .app = app_mailbox }, - }); - } else { + _ = &io_manual; + break :manual_backend .{ .manual = io_manual }; + } else exec_backend: { + var env = rt_surface.defaultTermioEnv() catch |err| env: { + log.warn("error getting env map for surface err={}", .{err}); + break :env internal_os.getEnvMap(alloc) catch + std.process.EnvMap.init(alloc); + }; + errdefer env.deinit(); + + env.remove("GHOSTTY_LOG"); + + // Don't leak parent zmx session into child terminals. + // Each terminal gets its own zmx session via the zmx backend config. + env.remove("ZMX_SESSION"); + var io_exec = try termio.Exec.init(alloc, .{ .command = command, .env = env, @@ -676,24 +682,28 @@ pub fn init( .rt_pre_exec_info = .init(config), .rt_post_fork_info = .init(config), }); - errdefer io_exec.deinit(); - - // Initialize our IO mailbox - var io_mailbox = try termio.Mailbox.initSPSC(alloc); - errdefer io_mailbox.deinit(alloc); - - try termio.Termio.init(&self.io, alloc, .{ - .size = size, - .full_config = config, - .config = try termio.Termio.DerivedConfig.init(alloc, config), - .backend = .{ .exec = io_exec }, - .mailbox = io_mailbox, - .renderer_state = &self.renderer_state, - .renderer_wakeup = render_thread.wakeup, - .renderer_mailbox = render_thread.mailbox, - .surface_mailbox = .{ .surface = self, .app = app_mailbox }, - }); - } + _ = &io_exec; + break :exec_backend .{ .exec = io_exec }; + }; + errdefer io_backend.deinit(); + + var io_mailbox = try termio.Mailbox.initSPSC(alloc); + errdefer io_mailbox.deinit(alloc); + + var io_config = try termio.Termio.DerivedConfig.init(alloc, config); + errdefer io_config.deinit(); + + 
try termio.Termio.init(&self.io, alloc, .{ + .size = size, + .full_config = config, + .config = io_config, + .backend = io_backend, + .mailbox = io_mailbox, + .renderer_state = &self.renderer_state, + .renderer_wakeup = render_thread.wakeup, + .renderer_mailbox = render_thread.mailbox, + .surface_mailbox = .{ .surface = self, .app = app_mailbox }, + }); } // Outside the block, IO has now taken ownership of our temporary state // so we can just defer this and not the subcomponents. @@ -1092,6 +1102,7 @@ pub fn handleMessage(self: *Surface, msg: Message) !void { .close => self.close(), .child_exited => |v| self.childExited(v), + .child_disconnected => |v| self.childDisconnected(v), .desktop_notification => |notification| { if (!self.config.desktop_notifications) { @@ -1311,6 +1322,35 @@ fn childExited(self: *Surface, info: apprt.surface.Message.ChildExited) void { self.close(); } +fn childDisconnected(self: *Surface, info: apprt.surface.Message.ChildExited) void { + self.child_exited = true; + + log.warn("persistent backend disconnected unexpectedly", .{}); + + if (self.rt_app.performAction( + .{ .surface = self }, + .show_child_exited, + info, + ) catch |err| gui: { + log.err("error trying to show native child disconnected GUI err={}", .{err}); + break :gui false; + }) return; + + switch (self.io.backend) { + .zmx => |*zmx| { + self.renderer_state.mutex.lock(); + defer self.renderer_state.mutex.unlock(); + const t: *terminal.Terminal = self.renderer_state.terminal; + zmx.childExitedAbnormally(self.alloc, t, info.exit_code, info.runtime_ms) catch |err| { + log.err("error handling zmx backend disconnect err={}", .{err}); + }; + }, + else => self.childExitedAbnormally(info) catch |err| { + log.err("error handling backend disconnect err={}", .{err}); + }, + } +} + /// Called when the child process exited abnormally. 
fn childExitedAbnormally( self: *Surface, @@ -1324,6 +1364,7 @@ fn childExitedAbnormally( const command = switch (self.io.backend) { .exec => |*exec| try std.mem.join(alloc, " ", exec.subprocess.args), .manual => "manual backend", + .zmx => |*zmx| try std.fmt.allocPrint(alloc, "zmx session: {s}", .{zmx.session_name}), }; const runtime_str = try std.fmt.allocPrint(alloc, "{d} ms", .{info.runtime_ms}); diff --git a/src/apprt/embedded.zig b/src/apprt/embedded.zig index 76e65abf20..292110adca 100644 --- a/src/apprt/embedded.zig +++ b/src/apprt/embedded.zig @@ -478,6 +478,18 @@ pub const Surface = struct { /// Userdata passed to io_write_cb. io_write_userdata: ?*anyopaque = null, + + /// zmx session name. When non-null, the surface connects to a zmx + /// daemon session instead of spawning a new shell process. + zmx_session: ?[*:0]const u8 = null, + + /// Whether to create the zmx session if it doesn't exist. + zmx_create: bool = true, + + /// Whether this surface should use zmx mode. When true and + /// zmx_session is null, a session name is auto-generated. + /// This is how zmx mode propagates across splits/tabs. + zmx_mode: bool = false, }; pub fn init(self: *Surface, app: *App, opts: Options) !void { @@ -586,6 +598,29 @@ pub const Surface = struct { config.@"wait-after-command" = true; } + // zmx mode: explicit session name takes priority, then zmx_mode flag. + // An explicit empty string clears any inherited session before deciding + // whether to auto-generate one from zmx_mode. 
+ const zmx_session = if (opts.zmx_session) |c_session| blk: { + const session = std.mem.sliceTo(c_session, 0); + break :blk if (session.len > 0) session else null; + } else null; + if (opts.zmx_session != null and zmx_session == null) { + config.@"zmx-session" = null; + } + + if (zmx_session) |session| { + config.@"zmx-session" = session; + config.@"zmx-create" = opts.zmx_create; + } else if (opts.zmx_mode) { + // Inherited zmx mode — auto-generate a unique session name + const zmx_alloc = config.arenaAlloc(); + const uuid = std.crypto.random.int(u128); + const session = try std.fmt.allocPrint(zmx_alloc, "cmux-{x}", .{uuid}); + config.@"zmx-session" = session; + config.@"zmx-create" = true; + } + // Initialize our surface right away. We're given a view that is // ready to use. try self.core_surface.init( @@ -969,6 +1004,11 @@ pub const Surface = struct { break :wd self.app.core_app.alloc.dupeZ(u8, cwd) catch null; }; + // Inherit zmx mode: if this surface uses zmx, new surfaces should too. + // Each new surface gets its own fresh zmx session (name auto-generated + // in Surface.init when zmx_mode=true and zmx_session=null). + const zmx_mode: bool = self.core_surface.io.backend == .zmx; + return .{ .font_size = font_size, .working_directory = working_directory, @@ -976,6 +1016,7 @@ pub const Surface = struct { .io_mode = self.io_mode, .io_write_cb = self.io_write_cb, .io_write_userdata = self.io_write_userdata, + .zmx_mode = zmx_mode, }; } diff --git a/src/apprt/surface.zig b/src/apprt/surface.zig index 5c25281c8d..daf297b2d0 100644 --- a/src/apprt/surface.zig +++ b/src/apprt/surface.zig @@ -51,6 +51,10 @@ pub const Message = union(enum) { /// command are given in the `ChildExited` struct. child_exited: ChildExited, + /// A persistent backend disconnected unexpectedly. This is always treated + /// as an abnormal failure regardless of runtime. + child_disconnected: ChildExited, + /// Show a desktop notification. 
desktop_notification: struct { /// Desktop notification title. diff --git a/src/config/Config.zig b/src/config/Config.zig index 29a45786fc..ceed84ca8d 100644 --- a/src/config/Config.zig +++ b/src/config/Config.zig @@ -1534,6 +1534,16 @@ class: ?[:0]const u8 = null, /// * `inherit` - The working directory of the launching process. @"working-directory": ?[]const u8 = null, +/// Connect to a zmx daemon session instead of spawning a new shell process. +/// When set, the surface connects to the named zmx session over a Unix +/// domain socket. The zmx daemon owns the PTY and persists independently +/// of the surface, enabling session persistence across restarts. +@"zmx-session": ?[]const u8 = null, + +/// Whether to create the zmx session if it doesn't already exist. +/// Only applies when `zmx-session` is set. Default: true. +@"zmx-create": bool = true, + /// Key bindings. The format is `trigger=action`. Duplicate triggers will /// overwrite previously set values. The list of actions is available in /// the documentation or using the `ghostty +list-actions` command. 
@@ -10597,6 +10607,40 @@ test "compatibility: gtk-single-instance desktop" { } } +test "parse zmx config defaults and override" { + const testing = std.testing; + const alloc = testing.allocator; + + { + var cfg = try Config.default(alloc); + defer cfg.deinit(); + + var it: TestIterator = .{ .data = &.{ + "--zmx-session=session-1", + } }; + try cfg.loadIter(alloc, &it); + try cfg.finalize(); + + try testing.expectEqualStrings("session-1", cfg.@"zmx-session".?); + try testing.expect(cfg.@"zmx-create"); + } + + { + var cfg = try Config.default(alloc); + defer cfg.deinit(); + + var it: TestIterator = .{ .data = &.{ + "--zmx-session=session-1", + "--zmx-create=false", + } }; + try cfg.loadIter(alloc, &it); + try cfg.finalize(); + + try testing.expectEqualStrings("session-1", cfg.@"zmx-session".?); + try testing.expect(!cfg.@"zmx-create"); + } +} + test "compatibility: removed cursor-invert-fg-bg" { const testing = std.testing; const alloc = testing.allocator; diff --git a/src/termio.zig b/src/termio.zig index 471ca7b4cb..9c588ba638 100644 --- a/src/termio.zig +++ b/src/termio.zig @@ -27,6 +27,7 @@ pub const Exec = @import("termio/Exec.zig"); pub const Manual = manual.Manual; pub const ManualConfig = manual.Config; pub const ManualThreadData = manual.ThreadData; +pub const Zmx = @import("termio/Zmx.zig"); pub const Options = @import("termio/Options.zig"); pub const Termio = @import("termio/Termio.zig"); pub const Thread = @import("termio/Thread.zig"); diff --git a/src/termio/Zmx.zig b/src/termio/Zmx.zig new file mode 100644 index 0000000000..d39f2755d5 --- /dev/null +++ b/src/termio/Zmx.zig @@ -0,0 +1,912 @@ +//! Zmx implements a termio backend that connects to a zmx daemon session +//! over a Unix domain socket instead of spawning a direct PTY subprocess. +//! The zmx daemon owns the PTY and persists independently of the surface, +//! enabling session persistence across restarts. 
+const Zmx = @This(); + +const std = @import("std"); +const builtin = @import("builtin"); +const Allocator = std.mem.Allocator; +const posix = std.posix; +const xev = @import("../global.zig").xev; +const fastmem = @import("../fastmem.zig"); +const internal_os = @import("../os/main.zig"); +const renderer = @import("../renderer.zig"); +const terminal = @import("../terminal/main.zig"); +const termio = @import("../termio.zig"); +const SegmentedPool = @import("../datastruct/main.zig").SegmentedPool; + +const log = std.log.scoped(.io_zmx); + +// ────────────────────────────────────────────────────────────────────── +// IPC protocol — matches zmx ipc.zig exactly +// ────────────────────────────────────────────────────────────────────── + +pub const IpcTag = enum(u8) { + Input = 0, + Output = 1, + Resize = 2, + Detach = 3, + DetachAll = 4, + Kill = 5, + Info = 6, + Init = 7, + History = 8, + Run = 9, + Ack = 10, +}; + +pub const IpcHeader = packed struct { + tag: IpcTag, + len: u32, +}; + +pub const IpcResize = packed struct { + rows: u16, + cols: u16, +}; + +const DisconnectMetadata = struct { + exit_code: u32, + runtime_ms: u64, +}; + +const MAX_IPC_PAYLOAD = 4 * 1024 * 1024; +const INVALID_FD: posix.fd_t = -1; + +fn ipcSendSync(fd: posix.fd_t, tag: IpcTag, data: []const u8) !void { + const header = IpcHeader{ + .tag = tag, + .len = @intCast(data.len), + }; + const header_bytes = std.mem.asBytes(&header); + try writeAllSync(fd, header_bytes); + if (data.len > 0) { + try writeAllSync(fd, data); + } +} + +fn writeAllSync(fd: posix.fd_t, data: []const u8) !void { + var index: usize = 0; + while (index < data.len) { + const n = posix.write(fd, data[index..]) catch |err| switch (err) { + error.WouldBlock => { + var pollfd = [1]posix.pollfd{.{ + .fd = fd, + .events = posix.POLL.OUT, + .revents = 0, + }}; + _ = try posix.poll(&pollfd, -1); + continue; + }, + error.Interrupted => continue, + else => return err, + }; + if (n == 0) return error.DiskQuota; + index += n; + } +} + 
+const IpcSocketMsg = struct { + header: IpcHeader, + payload: []const u8, +}; + +const SocketBuffer = struct { + buf: std.ArrayListUnmanaged(u8), + alloc: Allocator, + head: usize, + + fn init(alloc: Allocator) !SocketBuffer { + var buf = std.ArrayListUnmanaged(u8){}; + try buf.ensureTotalCapacity(alloc, 4096); + return .{ + .buf = buf, + .alloc = alloc, + .head = 0, + }; + } + + fn deinit(self: *SocketBuffer) void { + self.buf.deinit(self.alloc); + } + + /// Read from fd into buffer. Returns bytes read (0 = EOF). + fn read(self: *SocketBuffer, fd: posix.fd_t) !usize { + // Compact: shift unprocessed data to front + if (self.head > 0) { + const remaining = self.buf.items.len - self.head; + if (remaining > 0) { + std.mem.copyForwards(u8, self.buf.items[0..remaining], self.buf.items[self.head..]); + self.buf.items.len = remaining; + } else { + self.buf.clearRetainingCapacity(); + } + self.head = 0; + } + + var tmp: [4096]u8 = undefined; + const n = try posix.read(fd, &tmp); + if (n > 0) { + try self.buf.appendSlice(self.alloc, tmp[0..n]); + } + return n; + } + + /// Returns next complete IPC message or null. 
+ fn next(self: *SocketBuffer) error{InvalidIpcFrame}!?IpcSocketMsg { + const available = self.buf.items[self.head..]; + const hdr_size = @sizeOf(IpcHeader); + if (available.len < hdr_size) return null; + + const hdr = std.mem.bytesToValue(IpcHeader, available[0..hdr_size]); + const payload_len: usize = @intCast(hdr.len); + if (payload_len > MAX_IPC_PAYLOAD) return error.InvalidIpcFrame; + + const total = hdr_size + payload_len; + if (available.len < total) return null; + + const pay = available[hdr_size..total]; + self.head += total; + return .{ .header = hdr, .payload = pay }; + } +}; + +// ────────────────────────────────────────────────────────────────────── +// Zmx backend state +// ────────────────────────────────────────────────────────────────────── + +session_name: [:0]const u8, +socket_dir: []const u8, +create_if_missing: bool, +working_directory: ?[]const u8, +grid_size: renderer.GridSize = .{}, +screen_size: renderer.ScreenSize = .{ .width = 0, .height = 0 }, +socket_fd: ?posix.fd_t = null, +alloc: Allocator, +loop: ?*xev.Loop = null, +thread_data: ?*ThreadData = null, +arena: std.heap.ArenaAllocator, + +pub const Config = struct { + session_name: []const u8, + create_if_missing: bool = true, + working_directory: ?[]const u8 = null, +}; + +/// Initialize zmx backend state. Does NOT connect — connection happens +/// in threadEnter on the IO thread. 
+pub fn init( + alloc: Allocator, + cfg: Config, +) !Zmx { + var arena = std.heap.ArenaAllocator.init(alloc); + errdefer arena.deinit(); + const arena_alloc = arena.allocator(); + + const session_name = try arena_alloc.dupeZ(u8, cfg.session_name); + + // Resolve socket directory: $ZMX_DIR > $XDG_RUNTIME_DIR/zmx > $TMPDIR/zmx-{uid} + const socket_dir = try resolveSocketDir(arena_alloc); + + // Validate socket path length + const full_path = try std.fmt.allocPrint(arena_alloc, "{s}/{s}", .{ socket_dir, session_name }); + _ = std.net.Address.initUnix(full_path) catch { + return error.SocketPathTooLong; + }; + + // Only session creation requires a local zmx binary. Attach-only mode can + // still connect to an already-running daemon session without it. + if (cfg.create_if_missing and !findZmxBinary()) { + return error.ZmxNotFound; + } + + const working_directory = if (cfg.working_directory) |wd| + try arena_alloc.dupe(u8, wd) + else + null; + + return .{ + .session_name = session_name, + .socket_dir = socket_dir, + .create_if_missing = cfg.create_if_missing, + .working_directory = working_directory, + .alloc = alloc, + .arena = arena, + }; +} + +pub fn deinit(self: *Zmx) void { + if (self.socket_fd) |fd| posix.close(fd); + self.arena.deinit(); +} + +pub fn initTerminal(self: *Zmx, term: *terminal.Terminal) void { + if (self.working_directory) |wd| term.setPwd(wd) catch |err| { + log.warn("error setting initial pwd err={}", .{err}); + }; + + self.resize(.{ + .columns = term.cols, + .rows = term.rows, + }, .{ + .width = term.width_px, + .height = term.height_px, + }) catch unreachable; +} + +pub fn threadEnter( + self: *Zmx, + alloc: Allocator, + io: *termio.Termio, + td: *termio.Termio.ThreadData, +) !void { + const start = try std.time.Instant.now(); + + const socket_path = try std.fmt.allocPrint( + self.arena.allocator(), + "{s}/{s}", + .{ self.socket_dir, self.session_name }, + ); + + // Session creation / readiness probing if needed. 
+ const had_ready_socket = socketReady(socket_path); + if (self.create_if_missing and !had_ready_socket) { + try self.createSession(socket_path); + } else if (!had_ready_socket) { + try waitForSocketReady(socket_path, 20, 50); + } + + // Connect to Unix domain socket + const sock = try posix.socket( + posix.AF.UNIX, + posix.SOCK.STREAM | posix.SOCK.CLOEXEC, + 0, + ); + errdefer posix.close(sock); + + const addr = try std.net.Address.initUnix(socket_path); + try posix.connect(sock, &addr.any, addr.getOsSockLen()); + + // Send Init with terminal dimensions + const init_resize = IpcResize{ + .rows = @intCast(self.grid_size.rows), + .cols = @intCast(self.grid_size.columns), + }; + try ipcSendSync(sock, .Init, std.mem.asBytes(&init_resize)); + + // Send Resize immediately after Init (zmx 0.3.0 requirement) + try ipcSendSync(sock, .Resize, std.mem.asBytes(&init_resize)); + + // Create quit pipe for read thread signaling + const pipe = try internal_os.pipe(); + errdefer posix.close(pipe[0]); + errdefer posix.close(pipe[1]); + + // Setup write stream on socket fd + var stream = xev.Stream.initFd(sock); + errdefer stream.deinit(); + + // Allocate shutdown flag (heap-allocated for stable pointer to read thread) + const shutting_down = try alloc.create(std.atomic.Value(bool)); + shutting_down.* = std.atomic.Value(bool).init(false); + errdefer alloc.destroy(shutting_down); + + // Spawn read thread + const read_thread = try std.Thread.spawn( + .{}, + ReadThread.threadMain, + .{ sock, io, pipe[0], shutting_down, start }, + ); + read_thread.setName("zmx-reader") catch {}; + + // Set ThreadData — ownership transfers here, cancel errdefers above + td.backend = .{ .zmx = .{ + .start = start, + .write_stream = stream, + .read_thread = read_thread, + .read_thread_pipe = pipe[1], + .socket_fd = sock, + .shutting_down = shutting_down, + } }; + self.socket_fd = sock; + self.loop = td.loop; + self.thread_data = &td.backend.zmx; +} + +pub fn threadExit(self: *Zmx, td: 
*termio.Termio.ThreadData) void { + const zmx_td = &td.backend.zmx; + + // Signal read thread that upcoming EOF from Detach is expected + zmx_td.shutting_down.store(true, .release); + + // The xev loop is already stopped here, so Detach must be written + // synchronously instead of via the queued writer path. + if (zmx_td.socket_fd != INVALID_FD) { + ipcSendSync(zmx_td.socket_fd, .Detach, &.{}) catch |err| { + log.warn("error sending detach err={}", .{err}); + }; + } + + // Signal and join read thread + _ = posix.write(zmx_td.read_thread_pipe, "x") catch |err| switch (err) { + error.BrokenPipe => {}, + else => log.warn("error writing to read thread quit pipe err={}", .{err}), + }; + zmx_td.read_thread.join(); + if (zmx_td.socket_fd != INVALID_FD) { + posix.close(zmx_td.socket_fd); + zmx_td.socket_fd = INVALID_FD; + } + self.socket_fd = null; + self.loop = null; + self.thread_data = null; +} + +pub fn focusGained( + self: *Zmx, + td: *termio.Termio.ThreadData, + focused: bool, +) !void { + // No-op: zmx doesn't own the PTY, no termios state to poll + _ = self; + _ = td; + _ = focused; +} + +pub fn resize( + self: *Zmx, + grid_size: renderer.GridSize, + screen_size: renderer.ScreenSize, +) !void { + self.grid_size = grid_size; + self.screen_size = screen_size; + + if (self.socket_fd != null) { + const resize_msg = IpcResize{ + .rows = @intCast(grid_size.rows), + .cols = @intCast(grid_size.columns), + }; + self.queueControlFrame(.Resize, std.mem.asBytes(&resize_msg)) catch |err| { + log.warn("error sending resize err={}", .{err}); + }; + } +} + +fn queueControlFrame(self: *Zmx, tag: IpcTag, data: []const u8) !void { + const zmx_td = self.thread_data orelse return error.NotOpenForWriting; + const loop = self.loop orelse return error.NotOpenForWriting; + const req = try zmx_td.write_req_pool.getGrow(self.alloc); + const buf = try zmx_td.write_buf_pool.getGrow(self.alloc); + const hdr_size = @sizeOf(IpcHeader); + const total_len = hdr_size + data.len; + if (total_len > 
buf.len) return error.MessageTooBig; + + const header = IpcHeader{ + .tag = tag, + .len = @intCast(data.len), + }; + @memcpy(buf[0..hdr_size], std.mem.asBytes(&header)); + @memcpy(buf[hdr_size..total_len], data); + + zmx_td.write_stream.queueWrite( + loop, + &zmx_td.write_queue, + req, + .{ .slice = buf[0..total_len] }, + termio.Zmx.ThreadData, + zmx_td, + ttyWrite, + ); +} + +pub fn queueWrite( + self: *Zmx, + alloc: Allocator, + td: *termio.Termio.ThreadData, + data: []const u8, + linefeed: bool, +) !void { + _ = self; + const zmx_td = &td.backend.zmx; + + // Chunk data through write pool, wrapping each chunk in IPC Input message + var i: usize = 0; + while (i < data.len) { + const req = try zmx_td.write_req_pool.getGrow(alloc); + const buf = try zmx_td.write_buf_pool.getGrow(alloc); + + // Reserve space for IPC header at the beginning of each buffer + const hdr_size = @sizeOf(IpcHeader); + const payload_buf = buf[hdr_size..]; + + const payload_len: usize = payload_len: { + const max = @min(data.len, i + payload_buf.len); + + if (!linefeed) { + fastmem.copy(u8, payload_buf, data[i..max]); + const len = max - i; + i = max; + break :payload_len len; + } + + // Slow path: replace \r with \r\n + var buf_i: usize = 0; + while (i < data.len and buf_i < payload_buf.len - 1) { + const ch = data[i]; + i += 1; + + if (ch != '\r') { + payload_buf[buf_i] = ch; + buf_i += 1; + continue; + } + + payload_buf[buf_i] = '\r'; + payload_buf[buf_i + 1] = '\n'; + buf_i += 2; + } + + break :payload_len buf_i; + }; + + // Write IPC header into the reserved space + const header = IpcHeader{ + .tag = .Input, + .len = @intCast(payload_len), + }; + const header_bytes = std.mem.asBytes(&header); + @memcpy(buf[0..hdr_size], header_bytes); + + const total_len = hdr_size + payload_len; + + zmx_td.write_stream.queueWrite( + td.loop, + &zmx_td.write_queue, + req, + .{ .slice = buf[0..total_len] }, + termio.Zmx.ThreadData, + zmx_td, + ttyWrite, + ); + } +} + +fn ttyWrite( + td_: ?*ThreadData, + 
_: *xev.Loop, + _: *xev.Completion, + _: xev.Stream, + _: xev.WriteBuffer, + r: xev.WriteError!usize, +) xev.CallbackAction { + const td = td_.?; + td.write_req_pool.put(); + td.write_buf_pool.put(); + + _ = r catch |err| { + log.err("write error: {}", .{err}); + return .disarm; + }; + + return .disarm; +} + +pub fn childExitedAbnormally( + self: *Zmx, + gpa: Allocator, + t: *terminal.Terminal, + exit_code: u32, + runtime_ms: u64, +) !void { + _ = exit_code; + _ = runtime_ms; + + var arena = std.heap.ArenaAllocator.init(gpa); + defer arena.deinit(); + const alloc = arena.allocator(); + + const session_info = try std.fmt.allocPrint(alloc, "zmx session: {s}", .{self.session_name}); + + // Move cursor to column 0 + t.carriageReturn(); + try t.setAttribute(.{ .unset = {} }); + + // If there is content, add a separator + const viewport_str = try t.plainString(alloc); + if (viewport_str.len > 0) { + try t.linefeed(); + for (0..t.cols) |_| try t.print(0x2501); + t.carriageReturn(); + try t.linefeed(); + try t.linefeed(); + } + + // Output error message + try t.setAttribute(.{ .@"8_fg" = .bright_red }); + try t.setAttribute(.{ .bold = {} }); + try t.printString("zmx session disconnected unexpectedly:"); + try t.setAttribute(.{ .unset = {} }); + + t.carriageReturn(); + try t.linefeed(); + try t.linefeed(); + try t.printString(session_info); + try t.setAttribute(.{ .unset = {} }); + + t.carriageReturn(); + try t.linefeed(); + try t.linefeed(); + try t.printString("The zmx daemon session may have exited or the socket was removed."); + try t.setAttribute(.{ .unset = {} }); +} + +// ────────────────────────────────────────────────────────────────────── +// ThreadData +// ────────────────────────────────────────────────────────────────────── + +pub const ThreadData = struct { + const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5); + + start: std.time.Instant, + write_stream: xev.Stream, + write_req_pool: SegmentedPool(xev.WriteRequest, WRITE_REQ_PREALLOC) = .{}, + 
write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{}, + write_queue: xev.WriteQueue = .{}, + read_thread: std.Thread, + read_thread_pipe: posix.fd_t, + socket_fd: posix.fd_t, + + /// Heap-allocated so the read thread has a stable pointer independent + /// of ThreadData. Set to true by threadExit before sending Detach. + /// The read thread checks this on EOF to suppress .child_exited for + /// planned detach. + shutting_down: *std.atomic.Value(bool), + + pub fn deinit(self: *ThreadData, alloc: Allocator) void { + posix.close(self.read_thread_pipe); + if (self.socket_fd != INVALID_FD) { + posix.close(self.socket_fd); + self.socket_fd = INVALID_FD; + } + self.write_req_pool.deinit(alloc); + self.write_buf_pool.deinit(alloc); + self.write_stream.deinit(); + alloc.destroy(self.shutting_down); + } +}; + +// ────────────────────────────────────────────────────────────────────── +// ReadThread +// ────────────────────────────────────────────────────────────────────── + +const ReadThread = struct { + fn threadMain( + socket_fd: posix.fd_t, + io: *termio.Termio, + quit: posix.fd_t, + shutting_down: *std.atomic.Value(bool), + start: std.time.Instant, + ) void { + defer posix.close(quit); + + if (builtin.os.tag.isDarwin()) { + internal_os.macos.pthread_setname_np(&"zmx-reader".*); + } + + // Set socket to non-blocking for tight read loop + if (posix.fcntl(socket_fd, posix.F.GETFL, 0)) |flags| { + _ = posix.fcntl( + socket_fd, + posix.F.SETFL, + flags | @as(u32, @bitCast(posix.O{ .NONBLOCK = true })), + ) catch |err| { + log.warn("zmx read thread failed to set non-blocking err={}", .{err}); + }; + } else |err| { + log.warn("zmx read thread failed to get flags err={}", .{err}); + } + + // Poll both socket and quit pipe + var pollfds: [2]posix.pollfd = .{ + .{ .fd = socket_fd, .events = posix.POLL.IN, .revents = undefined }, + .{ .fd = quit, .events = posix.POLL.IN, .revents = undefined }, + }; + + var sock_buf = SocketBuffer.init(std.heap.c_allocator) catch { + 
log.err("zmx read thread failed to allocate socket buffer", .{}); + return; + }; + defer sock_buf.deinit(); + + while (true) { + // Tight read loop — read and dispatch as many messages as possible + while (true) { + const n = sock_buf.read(socket_fd) catch |err| { + switch (err) { + error.WouldBlock => break, + error.ConnectionResetByPeer, + error.NotOpenForReading, + => { + handleDisconnect(io, shutting_down, start); + return; + }, + else => { + log.err("zmx read error err={}", .{err}); + handleDisconnect(io, shutting_down, start); + return; + }, + } + }; + + // EOF — socket closed + if (n == 0) { + handleDisconnect(io, shutting_down, start); + return; + } + + // Dispatch all complete messages + while (true) { + const next_msg = sock_buf.next() catch |err| { + log.err("zmx invalid frame err={}", .{err}); + handleDisconnect(io, shutting_down, start); + return; + }; + const msg = next_msg orelse break; + switch (msg.header.tag) { + .Output => { + @call(.always_inline, termio.Termio.processOutput, .{ io, msg.payload }); + }, + .Ack => { + log.debug("zmx ack received", .{}); + }, + else => { + log.debug("zmx unexpected tag={}", .{msg.header.tag}); + }, + } + } + } + + // Wait for data + _ = posix.poll(&pollfds, -1) catch |err| { + log.warn("zmx poll failed, exiting read thread err={}", .{err}); + return; + }; + + // Check quit signal + if (pollfds[1].revents & posix.POLL.IN != 0) { + log.info("zmx read thread got quit signal", .{}); + return; + } + + // Check socket HUP + if (pollfds[0].revents & posix.POLL.HUP != 0) { + handleDisconnect(io, shutting_down, start); + return; + } + } + } + + fn handleDisconnect( + io: *termio.Termio, + shutting_down: *std.atomic.Value(bool), + start: std.time.Instant, + ) void { + if (shutting_down.load(.acquire)) { + // Planned detach — just exit quietly + log.info("zmx read thread: planned detach, exiting", .{}); + return; + } + + // Unexpected disconnect — notify surface + log.warn("zmx session disconnected unexpectedly", .{}); + 
const meta = disconnectMetadata(start); + _ = io.surface_mailbox.push(.{ + .child_disconnected = .{ + .exit_code = meta.exit_code, + .runtime_ms = meta.runtime_ms, + }, + }, .{ .forever = {} }); + } +}; + +// ────────────────────────────────────────────────────────────────────── +// Helpers +// ────────────────────────────────────────────────────────────────────── + +fn resolveSocketDir(alloc: Allocator) ![]const u8 { + return resolveSocketDirWithEnv( + alloc, + std.posix.getenv("ZMX_DIR"), + std.posix.getenv("XDG_RUNTIME_DIR"), + std.posix.getenv("TMPDIR"), + ); +} + +fn resolveSocketDirWithEnv( + alloc: Allocator, + zmx_dir: ?[]const u8, + xdg_runtime_dir: ?[]const u8, + tmpdir: ?[]const u8, +) ![]const u8 { + // Priority: $ZMX_DIR > $XDG_RUNTIME_DIR/zmx > $TMPDIR/zmx-{uid} + if (zmx_dir) |dir| return try alloc.dupe(u8, dir); + if (xdg_runtime_dir) |dir| return try std.fmt.allocPrint(alloc, "{s}/zmx", .{dir}); + return try std.fmt.allocPrint(alloc, "{s}/zmx-{d}", .{ tmpdir orelse "/tmp", std.c.getuid() }); +} + +fn findZmxBinary() bool { + const path_env = std.posix.getenv("PATH") orelse return false; + var it = std.mem.tokenizeScalar(u8, path_env, ':'); + while (it.next()) |dir| { + var buf: [std.fs.max_path_bytes]u8 = undefined; + const full = std.fmt.bufPrint(&buf, "{s}/zmx", .{dir}) catch continue; + std.fs.accessAbsolute(full, .{}) catch continue; + return true; + } + return false; +} + +fn socketReady(socket_path: []const u8) bool { + const sock = posix.socket( + posix.AF.UNIX, + posix.SOCK.STREAM | posix.SOCK.CLOEXEC, + 0, + ) catch return false; + defer posix.close(sock); + + const addr = std.net.Address.initUnix(socket_path) catch return false; + posix.connect(sock, &addr.any, addr.getOsSockLen()) catch return false; + return true; +} + +fn waitForSocketReady(socket_path: []const u8, max_attempts: usize, sleep_ms: u64) !void { + for (0..max_attempts) |_| { + if (socketReady(socket_path)) return; + std.Thread.sleep(sleep_ms * std.time.ns_per_ms); + } + 
return error.ZmxSessionTimeout; +} + +fn createSession(self: *Zmx, socket_path: []const u8) !void { + // Spawn `zmx run {session_name}` + var argv = [_]?[*:0]const u8{ "zmx", "run", self.session_name.ptr, null }; + const pid = try std.posix.fork(); + if (pid == 0) { + // Child: set working directory if provided + if (self.working_directory) |wd| { + std.posix.chdir(wd) catch {}; + } + // Exec zmx + std.posix.execvpeZ( + "zmx", + @ptrCast(&argv), + @ptrCast(std.c.environ), + ) catch {}; + std.posix.exit(1); + } + + var reaped = false; + errdefer if (!reaped) { + // If we fail in this function, ensure we don't leave a zombie behind. + _ = posix.waitpid(pid, 0); + }; + + // Parent: wait for socket readiness (100ms intervals, 5s timeout) + const max_attempts: usize = 50; + for (0..max_attempts) |_| { + std.Thread.sleep(100 * std.time.ns_per_ms); + + // Reap child if it has exited so we don't create zombies. + if (!reaped) { + const res = posix.waitpid(pid, std.c.W.NOHANG); + if (res.pid != 0) reaped = true; + } + + if (socketReady(socket_path)) { + if (!reaped) { + const res = posix.waitpid(pid, std.c.W.NOHANG); + if (res.pid != 0) reaped = true; + } + return; + } + } + + if (!reaped) { + // Timed out waiting for a socket: terminate/reap launcher process. 
+ posix.kill(pid, posix.SIG.TERM) catch {}; + _ = posix.waitpid(pid, 0); + reaped = true; + } + + return error.ZmxSessionTimeout; +} + +fn disconnectMetadata(start: std.time.Instant) DisconnectMetadata { + const runtime_ms: u64 = runtime: { + const end = std.time.Instant.now() catch break :runtime 0; + break :runtime end.since(start) / std.time.ns_per_ms; + }; + + return .{ + .exit_code = 1, + .runtime_ms = runtime_ms, + }; +} + +// ────────────────────────────────────────────────────────────────────── +// Tests +// ────────────────────────────────────────────────────────────────────── + +test "IPC header serialization round-trip" { + const header = IpcHeader{ .tag = .Input, .len = 42 }; + const bytes = std.mem.asBytes(&header); + const decoded = std.mem.bytesToValue(IpcHeader, bytes); + try std.testing.expectEqual(header.tag, decoded.tag); + try std.testing.expectEqual(header.len, decoded.len); +} + +test "IPC header size is 8 bytes packed" { + // Zig 0.15 rounds this packed struct up to 8 bytes. 
+    try std.testing.expectEqual(@as(usize, 8), @sizeOf(IpcHeader));
+}
+
+test "SocketBuffer rejects oversized IPC payloads" {
+    const testing = std.testing;
+    var sock_buf = try SocketBuffer.init(testing.allocator);
+    defer sock_buf.deinit();
+
+    const header = IpcHeader{
+        .tag = .Output,
+        .len = MAX_IPC_PAYLOAD + 1,
+    };
+    try sock_buf.buf.appendSlice(testing.allocator, std.mem.asBytes(&header));
+
+    try testing.expectError(error.InvalidIpcFrame, sock_buf.next());
+}
+
+test "socket path resolution with ZMX_DIR" {
+    const alloc = std.testing.allocator;
+    const dir = try resolveSocketDirWithEnv(alloc, "/tmp/custom-zmx", null, null);
+    defer alloc.free(dir);
+    try std.testing.expectEqualStrings("/tmp/custom-zmx", dir);
+}
+
+test "socket path length validation" {
+    const alloc = std.testing.allocator;
+
+    // A reasonable path should succeed. (An error union cannot be compared
+    // with != against an error value; unwrap it instead.)
+    const short_path = try std.fmt.allocPrint(alloc, "/tmp/zmx/test-session", .{});
+    defer alloc.free(short_path);
+    _ = try std.net.Address.initUnix(short_path);
+}
+
+test "SocketBuffer accumulation and framing" {
+    const alloc = std.testing.allocator;
+    var sock_buf = try SocketBuffer.init(alloc);
+    defer sock_buf.deinit();
+
+    // Manually inject a complete message into the buffer
+    const header = IpcHeader{ .tag = .Output, .len = 5 };
+    try sock_buf.buf.appendSlice(alloc, std.mem.asBytes(&header));
+    try sock_buf.buf.appendSlice(alloc, "hello");
+
+    // Should yield exactly one message. next() returns an error union,
+    // so it must be unwrapped with `try` before the optional checks.
+    const msg = try sock_buf.next();
+    try std.testing.expect(msg != null);
+    try std.testing.expectEqual(IpcTag.Output, msg.?.header.tag);
+    try std.testing.expectEqualStrings("hello", msg.?.payload);
+
+    // No more messages
+    try std.testing.expect((try sock_buf.next()) == null);
+}
+
+test "disconnect metadata uses abnormal exit code and runtime" {
+    const start = try std.time.Instant.now();
+    std.Thread.sleep(2 * std.time.ns_per_ms);
+
+    const meta = disconnectMetadata(start);
+    try
std.testing.expectEqual(@as(u32, 1), meta.exit_code); + try std.testing.expect(meta.runtime_ms >= 1); +} diff --git a/src/termio/backend.zig b/src/termio/backend.zig index e3a7927a7b..92b39b3bce 100644 --- a/src/termio/backend.zig +++ b/src/termio/backend.zig @@ -10,7 +10,7 @@ const termio = @import("../termio.zig"); const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5); /// The kinds of backends. -pub const Kind = enum { exec, manual }; +pub const Kind = enum { exec, manual, zmx }; /// Configuration for the various backend types. pub const Config = union(Kind) { @@ -18,6 +18,8 @@ pub const Config = union(Kind) { exec: termio.Exec.Config, /// Manual uses callbacks for writing and accepts output via processOutput. manual: termio.ManualConfig, + /// Zmx connects to a zmx daemon session over a Unix socket. + zmx: termio.Zmx.Config, }; /// Backend implementations. A backend is responsible for owning the pty @@ -25,11 +27,13 @@ pub const Config = union(Kind) { pub const Backend = union(Kind) { exec: termio.Exec, manual: termio.Manual, + zmx: termio.Zmx, pub fn deinit(self: *Backend) void { switch (self.*) { .exec => |*exec| exec.deinit(), .manual => |*manual| manual.deinit(), + .zmx => |*zmx| zmx.deinit(), } } @@ -37,6 +41,7 @@ pub const Backend = union(Kind) { switch (self.*) { .exec => |*exec| exec.initTerminal(t), .manual => |*manual| manual.initTerminal(t), + .zmx => |*zmx| zmx.initTerminal(t), } } @@ -49,6 +54,7 @@ pub const Backend = union(Kind) { switch (self.*) { .exec => |*exec| try exec.threadEnter(alloc, io, td), .manual => |*manual| try manual.threadEnter(alloc, io, td), + .zmx => |*zmx| try zmx.threadEnter(alloc, io, td), } } @@ -56,6 +62,7 @@ pub const Backend = union(Kind) { switch (self.*) { .exec => |*exec| exec.threadExit(td), .manual => |*manual| manual.threadExit(td), + .zmx => |*zmx| zmx.threadExit(td), } } @@ -67,6 +74,7 @@ pub const Backend = union(Kind) { switch (self.*) { .exec => |*exec| try exec.focusGained(td, focused), .manual => |*manual| 
try manual.focusGained(td, focused), + .zmx => |*zmx| try zmx.focusGained(td, focused), } } @@ -78,6 +86,7 @@ pub const Backend = union(Kind) { switch (self.*) { .exec => |*exec| try exec.resize(grid_size, screen_size), .manual => |*manual| try manual.resize(grid_size, screen_size), + .zmx => |*zmx| try zmx.resize(grid_size, screen_size), } } @@ -91,6 +100,7 @@ pub const Backend = union(Kind) { switch (self.*) { .exec => |*exec| try exec.queueWrite(alloc, td, data, linefeed), .manual => |*manual| try manual.queueWrite(alloc, td, data, linefeed), + .zmx => |*zmx| try zmx.queueWrite(alloc, td, data, linefeed), } } @@ -114,6 +124,12 @@ pub const Backend = union(Kind) { exit_code, runtime_ms, ), + .zmx => |*zmx| try zmx.childExitedAbnormally( + gpa, + t, + exit_code, + runtime_ms, + ), } } }; @@ -122,11 +138,13 @@ pub const Backend = union(Kind) { pub const ThreadData = union(Kind) { exec: termio.Exec.ThreadData, manual: termio.ManualThreadData, + zmx: termio.Zmx.ThreadData, pub fn deinit(self: *ThreadData, alloc: Allocator) void { switch (self.*) { .exec => |*exec| exec.deinit(alloc), .manual => |*manual| manual.deinit(alloc), + .zmx => |*zmx| zmx.deinit(alloc), } }