Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ jobs:
- run: go mod download
- run: go vet ./...
- run: go build ./...
- run: go test -race -coverprofile=coverage.out ./...
# -coverprofile triggers `go: no such tool "covdata"` on Go 1.22 +
# actions/setup-go@v5 for packages without test files. We don't upload
# coverage anywhere, so just run tests with -race.
- run: go test -race ./...

go-lint:
name: golangci-lint
Expand Down
32 changes: 32 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,38 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Phase 6d — RE-server lifecycle (`xpc ghidra`, `xpc ida`)**:
- `xpc ghidra start [--binary] [--port] [--repo]` /
`xpc ghidra stop`: detached-spawn `ghidraSvr.bat` (default
`C:\ghidra\support\ghidraSvr.bat`) on the VM via the
`os.dup2`-to-NUL + `DETACHED_PROCESS` trick; PID saved to
`C:\xpc\ghidra.runlog.pid`. Stop kills `java.exe` whose command line
matches `%ghidra%` via WMIC + taskkill.
- `xpc ida start [--binary] [--port]` / `xpc ida stop`: same lifecycle
pattern targeting `win32_remote.exe` on port 23946 by default; stop
also matches `dbgsrv.exe`.
- Tunnel deliberately decoupled — users run
`xpc tun -L <port>:127.0.0.1:<port>` to expose the server.
- Shared helpers `buildDetachedSpawnPy` and `killByCmdLineMatch` in
`internal/cli/ghidra.go` are reused by `xpc ida` and ready for future
server-lifecycle commands.

- **Phase 6c — `xpc tun` (bidirectional ARCP streams)**:
- Agent: new `tun.connect` tool opens a VM-side TCP socket and pumps
bytes to the host via `stream.chunk(delta_b64)` envelopes.
- Agent: `Connection._dispatch` now also routes client-sourced
`stream.chunk` and `stream.close` envelopes -- looking up the running
job by `job_id` and writing/half-closing its `tun_socket`. Non-tun jobs
drop the chunks silently.
- Host: `xpc tun -L localPort:vmHost:vmPort` cobra command with a
reader goroutine + forwarder goroutine + write mutex. The forwarder
waits on a `jobReady` channel so its first `stream.chunk` doesn't
race ahead of `job.accepted`.
- Real-VM verified: xpctl ping JSON probe round-trips through
`xpc tun -L 19578:127.0.0.1:9578` to the xpctl agent on the VM.
- First xpc command to exercise both directions of ARCP streaming on a
single job; foundation for future `xpc ghidra/ida` (RE-server tunnels).

- **Phase 5b — SSH-driven bootstrap + agent lifecycle**:
- `internal/sshlife` Go package wrapping `golang.org/x/crypto/ssh`:
password-auth Dial, Run (with timeout + exit-status capture), PutFile /
Expand Down
14 changes: 8 additions & 6 deletions TASKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,10 @@ Each subcommand: branch `subcommand/<name>`, write spec + tests + impl + real-VM
- [ ] `bat push-run` (cp + run combo) — `xpc cp` + `xpc bat run` covers this manually for v0.

### 6.8 `xpc tun -L|-R`
- [ ] ARCP-multiplexed tunnels: each forwarded TCP connection = one ARCP stream.
- [ ] `xpc tun -L 8080:localhost:80` (forward host:8080 → VM:80) and reverse.
- [ ] Real-VM: forward to a process on the VM, hit it from the host.
- [x] Agent-side `tun.connect` tool + dispatch routing for client-sourced `stream.chunk` / `stream.close` to the job's VM socket.
- [x] Host-side `xpc tun -L localPort:vmHost:vmPort` with reader/forwarder/cancel goroutines and a write mutex.
- [x] Real-VM: forwarded `127.0.0.1:19578 -> 127.0.0.1:9578` and round-tripped xpctl's length-prefixed-JSON ping; agent log shows `tun.connect [job=...] -> 127.0.0.1:9578`.
- [ ] `xpc tun -R` (reverse forward) — deferred.

### 6.9 `xpc py`
- [ ] `py run`, `py repl`, `py pip`, `py local` (run local file with client injected).
Expand Down Expand Up @@ -355,9 +356,10 @@ Each subcommand: branch `subcommand/<name>`, write spec + tests + impl + real-VM
- [ ] Real-VM: trace a tiny program, pull the result, verify entries.

### 6.14 `xpc ghidra` / `xpc ida`
- [ ] `ghidra start|stop` — ghidra_server lifecycle + tunnel.
- [ ] `ida start|stop` — IDA remote-debug stub lifecycle + tunnel.
- [ ] Real-VM: start ghidra_server, connect from local Ghidra over the tunnel.
- [x] `xpc ghidra start [--binary] [--port] [--repo]` / `xpc ghidra stop`. Detached spawn via `os.dup2`-to-NUL + `DETACHED_PROCESS`; PID saved to `C:\xpc\ghidra.runlog.pid`. Stop matches `java.exe` with `%ghidra%` in cmdline.
- [x] `xpc ida start [--binary] [--port]` / `xpc ida stop`. Same lifecycle pattern; defaults target `C:\IDA\dbgsrv\win32_remote.exe` on port 23946. Stop matches both `win32_remote.exe` and `dbgsrv.exe`.
- [x] Tunnel decoupled: users run `xpc tun -L <port>:127.0.0.1:<port>` separately.
- [ ] Live verification waits until Ghidra / IDA are installed on the VM.

### Filesystem extras (preserved from xpctl, renamed)

Expand Down
125 changes: 125 additions & 0 deletions agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import absolute_import, print_function

import argparse
import base64
import binascii
import logging
import logging.handlers
Expand Down Expand Up @@ -251,9 +252,86 @@ def tool_agent_info(arguments, ctx, job):
}


def tool_tun_connect(arguments, ctx, job):
"""tun.connect -- open a TCP socket on the VM and bidirectionally pump bytes.

Arguments:
host (str, required) -- target hostname or IP reachable from the VM
port (int, required) -- target port

Behavior:
* Opens a TCP socket to (host, port).
* Stores the socket on the job so the connection's stream.chunk handler
(see Connection._handle_stream_chunk) can write client-sourced bytes
into the VM-side socket.
* Reads from the VM-side socket and emits stream.chunk envelopes
(delta_b64) on a fresh upstream stream until EOF or cancel.
* On exit, closes the VM-side socket and emits stream.close.
"""
target_host = arguments.get("host", "")
target_port = arguments.get("port", 0)
if not target_host:
raise ToolError("INVALID_ARGS", "host required", retryable=False)
try:
target_port = int(target_port)
except (TypeError, ValueError):
raise ToolError("INVALID_ARGS", "port must be an integer", retryable=False)
if target_port <= 0 or target_port > 65535:
raise ToolError("INVALID_ARGS", "port out of range", retryable=False)

log.info("tun.connect [job=%s] -> %s:%d", job.job_id, target_host, target_port)

try:
sock = socket.create_connection((target_host, target_port), timeout=10)
except (OSError, socket.error) as exc:
raise ToolError("TUN_CONNECT_FAILED", str(exc), retryable=False)
sock.settimeout(0.5) # short timeout so we can poll job.cancel between reads

# Expose the socket so Connection._handle_stream_chunk routes inbound
# bytes to it.
job.tun_socket = sock

upstream_id = arcp.new_id(arcp.PREFIX_STREAM)
ctx.emit(arcp.TYPE_STREAM_OPEN,
{"content_type": "application/octet-stream", "channel": "upstream"},
stream_id=upstream_id)

closed_reason = "vm_eof"
try:
while not job.cancel.is_set():
try:
chunk = sock.recv(8192)
except socket.timeout:
continue
except (OSError, socket.error) as exc:
closed_reason = "vm_error: {0}".format(exc)
break
if not chunk:
closed_reason = "vm_eof"
break
ctx.emit(arcp.TYPE_STREAM_CHUNK,
{"delta_b64": base64.b64encode(chunk).decode("ascii")},
stream_id=upstream_id)
finally:
try:
sock.close()
except Exception:
pass
try:
del job.tun_socket
except AttributeError:
pass
ctx.emit(arcp.TYPE_STREAM_CLOSE,
{"reason": closed_reason},
stream_id=upstream_id)

return {"closed": True, "reason": closed_reason}


TOOLS = {
"exec": tool_exec,
"agent.info": tool_agent_info,
"tun.connect": tool_tun_connect,
}


Expand Down Expand Up @@ -330,13 +408,60 @@ def _dispatch(self, envelope):
self._handle_tool_invoke(envelope)
elif ty == arcp.TYPE_CANCEL:
self._handle_cancel(envelope)
elif ty == arcp.TYPE_STREAM_CHUNK:
self._handle_stream_chunk(envelope)
elif ty == arcp.TYPE_STREAM_CLOSE:
self._handle_stream_close(envelope)
else:
self._send_nack(envelope, "unsupported_type",
"type {0!r} not supported".format(ty))
except Exception as exc:
log.exception("dispatch error for type=%s", ty)
self._send_nack(envelope, "invalid_envelope", str(exc))

def _handle_stream_chunk(self, envelope):
"""Route a client-sourced stream.chunk to the matching tun socket.

v0 only honors stream.chunk for jobs that own a tun_socket (i.e.
tun.connect). For exec, stream.chunks flow agent->host only.
"""
job_id = envelope.get("job_id", "")
with self.jobs_lock:
job = self.jobs.get(job_id)
if job is None:
return
sock = getattr(job, "tun_socket", None)
if sock is None:
return
delta_b64 = (envelope.get("payload") or {}).get("delta_b64", "")
if not delta_b64:
return
try:
sock.sendall(base64.b64decode(delta_b64))
except (OSError, socket.error) as exc:
log.debug("tun_socket sendall failed: %s", exc)
try:
sock.close()
except Exception:
pass

def _handle_stream_close(self, envelope):
"""Client signalled end-of-input on its downstream stream; close the
VM-side write half so the upstream pump in tool_tun_connect notices
and exits."""
job_id = envelope.get("job_id", "")
with self.jobs_lock:
job = self.jobs.get(job_id)
if job is None:
return
sock = getattr(job, "tun_socket", None)
if sock is None:
return
try:
sock.shutdown(socket.SHUT_WR)
except Exception:
pass

# ---- handlers ---------------------------------------------------------

def _handle_session_open(self, envelope):
Expand Down
109 changes: 109 additions & 0 deletions docs/sessions/phase-6c-tun.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Phase 6c — `xpc tun` (ARCP bidirectional streams)

**Date:** 2026-05-09
**Branch:** `phase-5/host-cli` (continuing local; cumulative push pending)

---

## What landed

`xpc tun -L localPort:vmHost:vmPort` opens a TCP listener on the host. Each
accepted connection invokes the agent's `tun.connect` tool and bidirectionally
pipes bytes through ARCP `stream.chunk` envelopes (`delta_b64` binary frames).
This is the first command to exercise both directions of streaming on a
single ARCP job.

## Agent-side changes

* New tool `tun.connect`:
* Opens a VM-side TCP socket to `arguments.host:port`.
* Stores the socket on the running `Job` so the connection's stream.chunk
handler can write client-sourced bytes into it.
* Pumps VM-side recv → host via `stream.chunk(delta_b64)` envelopes on a
fresh upstream stream id.
* On EOF / error / cancel, closes the socket and emits `stream.close`.

* `Connection._dispatch` now also routes `stream.chunk` and `stream.close`
envelopes from the client. They look up the matching job and:
* `stream.chunk` -> `job.tun_socket.sendall(base64-decoded delta_b64)`
* `stream.close` -> `job.tun_socket.shutdown(SHUT_WR)` so the upstream pump
notices and exits.

For non-tun jobs the new dispatch branches are no-ops (a job without a
`tun_socket` attribute simply drops the chunk silently).

## Host-side `internal/cli/tun.go`

* Listener on `127.0.0.1:<localPort>`.
* For each accepted local conn, opens an ARCP session, sends
`tool.invoke{tool: "tun.connect", arguments: {host, port}}`, then runs:
* **reader** goroutine: decodes envelopes, routes `stream.chunk` (decode
`delta_b64`, write to local conn), `stream.close` (half-close local),
terminal types (close everything).
* **forwarder** goroutine: waits for `job_id` via a `jobReady` channel,
then loops `local.Read(buf)` and emits `stream.chunk` envelopes on a
fresh downstream stream id. On EOF, emits `stream.close`.
* A single `writeMu` mutex serialises envelope writes so concurrent
goroutines (forwarder + cancel sender) don't interleave bytes on the TLS
conn.

A subtle race surfaced during real-VM testing: the forwarder must wait for
`job.accepted` to populate `job_id` before it sends the first `stream.chunk`.
Without that gate the agent silently drops the chunk because the lookup by
empty `job_id` finds no job. Fixed with a one-shot `jobReady` channel.

## Real-VM verification

```text
$ ./bin/xpc bootstrap # redeploy agent so the VM gets tun.connect
... bootstrap complete ...

$ ./bin/xpc tun -L 19578:127.0.0.1:9578 &
xpc tun: 127.0.0.1:19578 -> 127.0.0.1:9578 (Ctrl-C to stop)

$ python3 - <<'PY'
import json, socket, struct
s = socket.create_connection(("127.0.0.1", 19578), timeout=10)
msg = {"id":"tun-test","type":"request","action":"ping","params":{}}
p = json.dumps(msg).encode()
s.sendall(struct.pack("!I", len(p)) + p)
hdr = s.recv(4)
n = struct.unpack("!I", hdr)[0]
buf = b""
while len(buf) < n:
buf += s.recv(n - len(buf))
print("xpctl response:", json.loads(buf))
PY

xpctl response: {'status': 'ok', 'type': 'response', 'data':
{'uptime': 72791.625, 'pong': True},
'error': None, 'id': 'tun-test'}
```

The probe sent xpctl's length-prefixed-JSON ping through the tunnel and
received the canonical xpctl pong (uptime 72791 s = ~20 hours). Bytes round-
tripped both ways through the agent's ARCP streams.

Agent log shows the dispatch:
```
2026-05-09 12:40:07,484 [INFO] xpc.agent: tun.connect [job=job_ryp...] -> 127.0.0.1:9578
2026-05-09 12:40:12,468 [INFO] xpc.agent: connection end: ('172.16.20.125', 50441)
```

## Phase 6c exit gate: PASSED

- [x] Agent-side `tun.connect` tool with bidirectional stream routing.
- [x] Host-side `xpc tun -L` cobra command with reader/forwarder/cancel pumps.
- [x] Existing test suite (42 Python + Go suites) still green after agent dispatch additions.
- [x] Real-VM verification: xpctl ping over the tunnel.
- [x] Session log captured (this file).

## Still deferred

* `xpc tun -R` (remote-to-local reverse forwarding).
* `xpc dbg attach|run|server` (long-running debugger sessions).
* `xpc trace start|stop|pull` (procmon wrapper).
* `xpc ghidra start/stop`, `xpc ida start/stop` (server lifecycle + tunnel,
now buildable on top of `xpc tun`).
* `xpc snap` (Proxmox API; user input still pending).
* `xpc daemon` (host-side multiplex; latency optimization).
Loading
Loading