From cd9a5cc0e8c82a809edae3c98880c4e43c1ac41e Mon Sep 17 00:00:00 2001 From: irving ou Date: Tue, 24 Feb 2026 12:53:04 -0500 Subject: [PATCH 1/2] chore(release): bump ironposh-web to v0.4.3 - Split session_serial.rs into session_serial/{mod,core}.rs for maintainability - Apply cargo fmt formatting fixes across ironposh-async - Bump ironposh-web version for NPM release --- crates/ironposh-async/src/connection.rs | 24 +- crates/ironposh-async/src/session.rs | 4 +- crates/ironposh-async/src/session_serial.rs | 570 ------------------ .../ironposh-async/src/session_serial/core.rs | 422 +++++++++++++ .../ironposh-async/src/session_serial/mod.rs | 204 +++++++ crates/ironposh-web/Cargo.toml | 2 +- 6 files changed, 646 insertions(+), 580 deletions(-) delete mode 100644 crates/ironposh-async/src/session_serial.rs create mode 100644 crates/ironposh-async/src/session_serial/core.rs create mode 100644 crates/ironposh-async/src/session_serial/mod.rs diff --git a/crates/ironposh-async/src/connection.rs b/crates/ironposh-async/src/connection.rs index 2a6ef55..41761ca 100644 --- a/crates/ironposh-async/src/connection.rs +++ b/crates/ironposh-async/src/connection.rs @@ -35,8 +35,7 @@ async fn run_handshake( let step_result = match step_result { Ok(result) => result, Err(e) => { - let _ = - session_event_tx.unbounded_send(crate::SessionEvent::Error(e.to_string())); + let _ = session_event_tx.unbounded_send(crate::SessionEvent::Error(e.to_string())); return Err(e); } }; @@ -71,9 +70,10 @@ fn build_pipeline_multiplexer( mut pipeline_input_rx: mpsc::Receiver, span_prefix: &'static str, ) -> impl std::future::Future> { - let pipeline_map = Arc::new(futures::lock::Mutex::new( - std::collections::HashMap::>::new(), - )); + let pipeline_map = Arc::new(futures::lock::Mutex::new(std::collections::HashMap::< + uuid::Uuid, + mpsc::Sender, + >::new())); let pipeline_map_clone = Arc::clone(&pipeline_map); @@ -151,7 +151,11 @@ fn build_pipeline_multiplexer( Ok::<(), anyhow::Error>(()) } - 
.instrument(span!(Level::INFO, "PipelineInputLoop", prefix = user_span_name)); + .instrument(span!( + Level::INFO, + "PipelineInputLoop", + prefix = user_span_name + )); let (x, y) = join!(from_server, from_user); x.and(y) @@ -222,8 +226,12 @@ where .instrument(info_span!("MainTask")); let (pipeline_input_tx, pipeline_input_rx) = mpsc::channel(100); - let multiplex_pipeline_task = - build_pipeline_multiplexer(user_input_tx, server_output_rx, pipeline_input_rx, "Parallel"); + let multiplex_pipeline_task = build_pipeline_multiplexer( + user_input_tx, + server_output_rx, + pipeline_input_rx, + "Parallel", + ); let joined_task = async move { let res = join!(active_session_task, multiplex_pipeline_task); diff --git a/crates/ironposh-async/src/session.rs b/crates/ironposh-async/src/session.rs index 3db7514..542fb61 100644 --- a/crates/ironposh-async/src/session.rs +++ b/crates/ironposh-async/src/session.rs @@ -273,7 +273,9 @@ async fn process_session_outputs( | ActiveSessionOutput::SendAndThenReceive { .. } | ActiveSessionOutput::PendingReceive { .. 
} => { // This should be handled at the caller level - warn!("SendBack/SendAndThenReceive/PendingReceive should not reach process_session_outputs"); + warn!( + "SendBack/SendAndThenReceive/PendingReceive should not reach process_session_outputs" + ); } ActiveSessionOutput::SendBackError(e) => { error!(target: "session", error = %e, "session step failed"); diff --git a/crates/ironposh-async/src/session_serial.rs b/crates/ironposh-async/src/session_serial.rs deleted file mode 100644 index 7fe0f86..0000000 --- a/crates/ironposh-async/src/session_serial.rs +++ /dev/null @@ -1,570 +0,0 @@ -use std::collections::VecDeque; - -use anyhow::Context; -use futures::channel::mpsc; -use futures::future::Either; -use futures::{FutureExt, SinkExt, StreamExt}; -use ironposh_client_core::connector::active_session::{ActiveSession, UserEvent}; -use ironposh_client_core::connector::{ActiveSessionOutput, UserOperation, conntion_pool::TrySend}; -use ironposh_client_core::host::HostCall; -use ironposh_client_core::runspace_pool::DesiredStream; -use tracing::{debug, error, info, instrument, trace}; - -use crate::{HostResponse, HttpClient}; - -/// Console diagnostic logging for WASM debugging. Uses web_sys::console::error_1 -/// when the wasm-diag feature is enabled (shows in Playwright's [browser:error]). -#[cfg(all(feature = "wasm-diag", target_arch = "wasm32"))] -macro_rules! diag { - ($($arg:tt)*) => { - web_sys::console::error_1(&format!($($arg)*).into()); - }; -} -#[cfg(not(all(feature = "wasm-diag", target_arch = "wasm32")))] -macro_rules! diag { - ($($arg:tt)*) => {}; -} - -/// Serial active session loop using a flat `select!`-based event loop. -/// -/// Unlike the old phased design (Phase 1→2→3→4), this loop uses `futures::select!` -/// with three event sources: in-flight HTTP response, user input, and host-call response. 
-/// -/// **Core invariant:** At most one HTTP request is in flight at any time, and -/// `accept_client_operation` is only called when the connection is idle (no HTTP in-flight). -/// -/// While an HTTP request is in-flight, user operations and host-call responses are -/// **buffered** (not processed) because `accept_client_operation` may call -/// `ConnectionPool::send()` which would allocate a second connection — violating -/// the single-connection constraint enforced by the Devolutions Gateway. -/// -/// **Key difference from the old design:** `enqueue_output()` never sends HTTP requests -/// inline. It only pushes work items into queues. The promotion logic at the top of each -/// outer loop iteration decides what to send next. -/// -/// **HostCall-aware promotion:** Receives are NOT sent while a HostCall is pending -/// (`host_call_active || !pending_host_calls.is_empty()`). This avoids unnecessary -/// timeout round-trips: the server blocks Receives while waiting for our HostCall response, -/// so sending a Receive before submitting the HostCall response would always time out. -#[expect(clippy::too_many_lines)] -#[instrument(skip_all)] -pub async fn start_serial_session_loop( - first_receive: TrySend, - mut active_session: ActiveSession, - client: impl HttpClient, - mut user_input_rx: mpsc::Receiver, - mut user_output_tx: mpsc::Sender, - host_call_tx: mpsc::UnboundedSender, - mut host_resp_rx: mpsc::UnboundedReceiver, -) -> anyhow::Result<()> { - // Pending HTTP requests to send (Command, HostResponse, Signal, etc.) - // Send operations always take priority over Receive polling. - let mut work_queue: VecDeque = VecDeque::new(); - - // Accumulated Receive streams. Merged from PendingReceive and SendAndThenReceive. - // Only built into an actual TrySend (via `fire_receive`) when there's nothing in - // `work_queue`, no HTTP request in flight, and no pending HostCalls. 
- let mut deferred_streams: Vec = Vec::new(); - - // Set when a `SendAndThenReceive` contributes runspace-pool-only streams. - // This means the server is expected to respond (e.g., PSRP key exchange - // waiting for EncryptedSessionKey), so we MUST issue a Receive even if no - // pipeline streams are present. Cleared after the Receive is promoted. - let mut runspace_receive_demanded = false; - - // HostCalls waiting to be dispatched to the JS/consumer side. - let mut pending_host_calls: VecDeque = VecDeque::new(); - - // Whether we're currently waiting for a host-call response from JS. - let mut host_call_active: bool = false; - - // User operations buffered during HTTP wait. Processed when connection becomes idle. - let mut pending_user_ops: VecDeque = VecDeque::new(); - - // Seed the work queue with the first Receive from the connector handshake. - work_queue.push_back(first_receive); - - info!("Starting serial session loop (flat event loop, single-connection mode)"); - diag!("DIAG serial loop: started (flat event loop)"); - - loop { - // === Process ONE buffered user op — ONLY when work_queue is empty === - // - // When work_queue is non-empty, at least one TrySend has already called - // ConnectionPool::send() internally, moving the connection to Pending. - // Calling accept_client_operation now would trigger another send() on - // the Pending connection → allocating a second connection and breaking - // the single-connection invariant. - // - // We must drain work_queue first (each item does an HTTP round-trip that - // returns the connection to Idle) before processing any buffered user op. - // - // Only one op per iteration: accept_client_operation may call send() - // itself, putting the connection back into Pending. The resulting - // work_queue item will be sent on the next promotion, returning the - // connection to Idle so the next buffered op can be processed. 
- if work_queue.is_empty() - && let Some(op) = pending_user_ops.pop_front() - { - diag!("DIAG process buffered: {}", op.operation_type()); - debug!(target: "serial", operation = op.operation_type(), "processing buffered user operation"); - let output = active_session - .accept_client_operation(op) - .context("Failed to accept buffered user operation")?; - enqueue_output( - output, - &mut work_queue, - &mut deferred_streams, - &mut pending_host_calls, - &mut user_output_tx, - &mut runspace_receive_demanded, - ) - .await?; - } - - // Dispatch any queued host calls (channel send only, no HTTP). - try_dispatch_next_host_call(&mut pending_host_calls, &mut host_call_active, &host_call_tx)?; - - // === PROMOTION: pick the next thing to send === - let http_future = if let Some(req) = work_queue.pop_front() { - diag!( - "DIAG promote: sending from work_queue ({} remaining)", - work_queue.len() - ); - trace!(target: "serial", remaining_work = work_queue.len(), "promoting work_queue item"); - Some(client.send_request(req).fuse()) - } else if !deferred_streams.is_empty() - && !host_call_active - && pending_host_calls.is_empty() - { - // Only build Receives when all HostCalls are resolved. The server blocks - // Receives while waiting for our HostCall response, so sending one early - // would always time out (adding an unnecessary OperationTimeout delay). - // - // WinRM Receive schema constraint: only ONE element is - // allowed per Receive request. We pick a single stream — preferring pipeline- - // specific streams (with CommandId) over runspace pool streams (without). - // - // When pipeline streams exist, DROP runspace pool streams entirely. In serial - // mode, a runspace pool Receive with no pending data would block for the full - // OperationTimeout (~15-20s), starving pipeline Receives and HostCall delivery. - // The parallel loop avoids this by running them concurrently; we can't do that. 
- // - // When ONLY runspace pool streams remain (no pipeline), skip promotion entirely. - // A runspace pool Receive when idle would block for OperationTimeout with no - // useful data, freezing all user operations (including the first command after - // connection). Pipeline-triggered Receives already include runspace pool data, - // so nothing is lost. - let has_pipeline = deferred_streams.iter().any(|s| s.command_id().is_some()); - if has_pipeline { - deferred_streams.retain(|s| s.command_id().is_some()); - let stream = deferred_streams.remove(0); - - diag!( - "DIAG promote: Receive for pipeline stream ({} deferred remaining)", - deferred_streams.len() - ); - trace!( - target: "serial", - ?stream, - deferred_remaining = deferred_streams.len(), - "promoting pipeline stream to Receive" - ); - let receive = active_session - .fire_receive(vec![stream]) - .context("Failed to build Receive from deferred stream")?; - Some(client.send_request(receive).fuse()) - } else if runspace_receive_demanded { - // A SendAndThenReceive requested runspace-pool-only streams — the server - // is expected to respond (e.g., PSRP key exchange EncryptedSessionKey). - // We MUST issue this Receive even though no pipeline streams exist. - runspace_receive_demanded = false; - let stream = deferred_streams.remove(0); - - diag!("DIAG promote: demanded runspace-pool Receive (key exchange / server response expected)"); - trace!( - target: "serial", - ?stream, - deferred_remaining = deferred_streams.len(), - "promoting demanded runspace-pool stream to Receive" - ); - let receive = active_session - .fire_receive(vec![stream]) - .context("Failed to build Receive from demanded runspace-pool stream")?; - Some(client.send_request(receive).fuse()) - } else { - // Only runspace pool streams, not demanded — don't promote, fall through - // to idle. A speculative runspace pool Receive when idle blocks for the - // full OperationTimeout (~15-20s) with no useful data, freezing all user - // operations. 
Pipeline-triggered Receives already include runspace pool - // data, so nothing is lost. - diag!("DIAG promote: skipping speculative runspace-pool-only Receive (would block for OperationTimeout)"); - trace!(target: "serial", "skipping speculative runspace-pool-only Receive, falling through to idle"); - None - } - } else { - None - }; - - if let Some(http_future) = http_future { - // Pin the HTTP future — it must survive across inner loop iterations. - futures::pin_mut!(http_future); - - // Inner loop: keep selecting until the HTTP response arrives. - // User ops and host-call responses are BUFFERED (not processed) - // because accept_client_operation needs an idle connection. - loop { - let mut host_guard = if host_call_active { - Either::Left(host_resp_rx.next()) - } else { - Either::Right(futures::future::pending::>()) - }; - - futures::select! { - resp = http_future => { - let resp = resp.context("Serial HTTP request failed")?; - diag!("DIAG select: HTTP response received"); - trace!(target: "serial", "HTTP response received"); - - let outputs = match active_session.accept_server_response(resp) { - Ok(outputs) => outputs, - Err(e) => { - diag!("DIAG ERROR: accept_server_response failed: {:#}", e); - return Err(e).context("Failed to accept server response"); - } - }; - - let output_types: Vec<&str> = - outputs.iter().map(output_type_name).collect(); - diag!( - "DIAG select: {} outputs: {:?}", - outputs.len(), - output_types - ); - trace!( - target: "serial", - output_count = outputs.len(), - ?output_types, - "processing server response outputs" - ); - - for output in outputs { - enqueue_output( - output, - &mut work_queue, - &mut deferred_streams, - &mut pending_host_calls, - &mut user_output_tx, - &mut runspace_receive_demanded, - ) - .await?; - } - - // Drain any user ops that arrived while HTTP was in flight. 
- loop { - match user_input_rx.try_next() { - Ok(Some(op)) => { - diag!("DIAG drain: collected {}", op.operation_type()); - pending_user_ops.push_back(op); - } - Ok(None) => { - info!("User input channel closed during drain"); - return Ok(()); - } - Err(_) => break, - } - } - - // Break inner loop — HTTP done, go back to top for - // buffered op processing and next promotion. - break; - } - - op = user_input_rx.next() => { - if let Some(op) = op { - diag!( - "DIAG select: buffering user op {} (HTTP in flight)", - op.operation_type() - ); - debug!( - target: "serial", - operation = op.operation_type(), - "buffering user operation (HTTP in flight)" - ); - pending_user_ops.push_back(op); - // Continue inner loop — HTTP future still alive. - } else { - info!("User input channel closed, ending serial session loop"); - return Ok(()); - } - } - - hr = host_guard => { - if let Some(hr) = hr { - diag!( - "DIAG select: buffering host response call_id={} (HTTP in flight)", - hr.call_id - ); - debug!( - target: "serial", - call_id = hr.call_id, - "buffering host-call response (HTTP in flight)" - ); - // Convert to UserOperation and buffer for later processing. - pending_user_ops.push_back(UserOperation::SubmitHostResponse { - call_id: hr.call_id, - scope: hr.scope, - submission: hr.submission, - }); - host_call_active = false; - // Can dispatch next HostCall to JS (just a channel send, no HTTP). - try_dispatch_next_host_call( - &mut pending_host_calls, - &mut host_call_active, - &host_call_tx, - )?; - // Continue inner loop — HTTP future still alive. - } else { - return Err(anyhow::anyhow!("Host-response channel closed")); - } - } - } - } - } else { - // No HTTP in flight and nothing to promote. Idle — wait for user op or host response. 
- trace!(target: "serial", "idle: no pending work, waiting for user input or host response"); - diag!("DIAG idle: waiting for user input or host response"); - - let mut host_guard = if host_call_active { - Either::Left(host_resp_rx.next()) - } else { - Either::Right(futures::future::pending::>()) - }; - - futures::select! { - op = user_input_rx.next() => { - if let Some(op) = op { - diag!("DIAG idle: user op {}", op.operation_type()); - debug!( - target: "serial", - operation = op.operation_type(), - "user operation received while idle" - ); - // Connection is idle — safe to process directly. - let output = active_session - .accept_client_operation(op) - .context("Failed to accept user operation (idle)")?; - enqueue_output( - output, - &mut work_queue, - &mut deferred_streams, - &mut pending_host_calls, - &mut user_output_tx, - &mut runspace_receive_demanded, - ) - .await?; - try_dispatch_next_host_call( - &mut pending_host_calls, - &mut host_call_active, - &host_call_tx, - )?; - } else { - info!("User input channel closed (idle), ending serial session loop"); - return Ok(()); - } - } - - hr = host_guard => { - if let Some(hr) = hr { - diag!( - "DIAG idle: host response received call_id={}", - hr.call_id - ); - debug!( - target: "serial", - call_id = hr.call_id, - "host-call response received while idle" - ); - host_call_active = false; - - // Connection is idle — safe to process directly. 
- let output = active_session - .accept_client_operation(UserOperation::SubmitHostResponse { - call_id: hr.call_id, - scope: hr.scope, - submission: hr.submission, - }) - .context("Failed to submit host response (idle)")?; - - enqueue_output( - output, - &mut work_queue, - &mut deferred_streams, - &mut pending_host_calls, - &mut user_output_tx, - &mut runspace_receive_demanded, - ) - .await?; - - try_dispatch_next_host_call( - &mut pending_host_calls, - &mut host_call_active, - &host_call_tx, - )?; - } else { - return Err(anyhow::anyhow!("Host-response channel closed (idle)")); - } - } - } - } - } -} - -/// Enqueue an `ActiveSessionOutput` into the appropriate queue. **Never sends HTTP inline.** -/// -/// This is the key difference from the old `process_serial_output`: all network work is -/// pushed to `work_queue` or `deferred_streams` for the promotion logic to pick up on the -/// next outer loop iteration. -/// -/// **Important:** This function does NOT dispatch HostCalls. The caller must call -/// `try_dispatch_next_host_call()` after processing all outputs to dispatch queued HostCalls. 
-async fn enqueue_output( - output: ActiveSessionOutput, - work_queue: &mut VecDeque, - deferred_streams: &mut Vec, - pending_host_calls: &mut VecDeque, - user_output_tx: &mut mpsc::Sender, - runspace_receive_demanded: &mut bool, -) -> anyhow::Result<()> { - match output { - ActiveSessionOutput::SendBack(reqs) => { - trace!(target: "serial", request_count = reqs.len(), "enqueue: SendBack → work_queue"); - diag!("DIAG enqueue: SendBack({}) → work_queue", reqs.len()); - for req in reqs { - work_queue.push_back(req); - } - } - ActiveSessionOutput::SendAndThenReceive { - send_request, - then_receive_streams, - } => { - trace!(target: "serial", "enqueue: SendAndThenReceive → work_queue + deferred_streams"); - diag!( - "DIAG enqueue: SendAndThenReceive → work_queue + {} deferred streams", - then_receive_streams.len() - ); - // If the follow-up streams are runspace-pool-only (no pipeline CommandId), - // mark them as demanded — the server is expected to respond (e.g., PSRP - // key exchange EncryptedSessionKey). We must NOT skip this Receive. 
- let rp_only = !then_receive_streams.is_empty() - && then_receive_streams - .iter() - .all(|s| s.command_id().is_none()); - if rp_only { - diag!("DIAG enqueue: runspace-pool Receive demanded (SendAndThenReceive)"); - trace!(target: "serial", "marking runspace-pool Receive as demanded (SendAndThenReceive)"); - *runspace_receive_demanded = true; - } - work_queue.push_back(send_request); - merge_deferred_streams(deferred_streams, then_receive_streams); - } - ActiveSessionOutput::PendingReceive { desired_streams } => { - trace!(target: "serial", streams = ?desired_streams, "enqueue: PendingReceive → deferred_streams"); - diag!( - "DIAG enqueue: PendingReceive({}) → deferred_streams", - desired_streams.len() - ); - merge_deferred_streams(deferred_streams, desired_streams); - } - ActiveSessionOutput::HostCall(hc) => { - diag!( - "DIAG enqueue: HostCall method={} call_id={}", - hc.method_name(), - hc.call_id() - ); - info!(target: "serial", method = %hc.method_name(), call_id = hc.call_id(), "enqueue: HostCall → pending_host_calls"); - pending_host_calls.push_back(hc); - } - ActiveSessionOutput::UserEvent(event) => { - diag!("DIAG enqueue: UserEvent sending..."); - trace!(target: "serial", event = ?event, "enqueue: UserEvent → user_output_tx"); - if user_output_tx.send(event).await.is_err() { - return Err(anyhow::anyhow!("User output channel disconnected")); - } - diag!("DIAG enqueue: UserEvent sent OK"); - } - ActiveSessionOutput::SendBackError(e) => { - error!(target: "serial", error = %e, "enqueue: SendBackError"); - return Err(anyhow::anyhow!("Session step failed: {e}")); - } - ActiveSessionOutput::OperationSuccess => { - trace!(target: "serial", "enqueue: OperationSuccess (no-op)"); - } - ActiveSessionOutput::Ignore => { - // No-op - } - } - Ok(()) -} - -/// Merge new desired streams into the deferred list, avoiding duplicates. 
-fn merge_deferred_streams(existing: &mut Vec, new_streams: Vec) { - let before = existing.len(); - for s in new_streams { - if !existing.contains(&s) { - existing.push(s); - } - } - if existing.len() != before { - diag!( - "DIAG merge_deferred: {} → {} streams", - before, - existing.len() - ); - } -} - -/// Dispatch the next queued HostCall to the consumer, if none is currently active. -fn try_dispatch_next_host_call( - pending_host_calls: &mut VecDeque, - host_call_active: &mut bool, - host_call_tx: &mpsc::UnboundedSender, -) -> anyhow::Result<()> { - if *host_call_active { - return Ok(()); - } - if let Some(hc) = pending_host_calls.pop_front() { - diag!( - "DIAG dispatch: HostCall method={} call_id={} ({} remaining)", - hc.method_name(), - hc.call_id(), - pending_host_calls.len() - ); - info!( - target: "serial", - method = %hc.method_name(), - call_id = hc.call_id(), - remaining = pending_host_calls.len(), - "dispatching HostCall to consumer" - ); - if host_call_tx.unbounded_send(hc).is_err() { - return Err(anyhow::anyhow!("Host-call channel closed")); - } - *host_call_active = true; - } - Ok(()) -} - -/// Helper: get a short name for an `ActiveSessionOutput` variant (for diagnostics). -fn output_type_name(o: &ActiveSessionOutput) -> &'static str { - match o { - ActiveSessionOutput::SendBack(_) => "SendBack", - ActiveSessionOutput::SendAndThenReceive { .. } => "SendAndThenReceive", - ActiveSessionOutput::UserEvent(_) => "UserEvent", - ActiveSessionOutput::HostCall(_) => "HostCall", - ActiveSessionOutput::PendingReceive { .. } => "PendingReceive", - ActiveSessionOutput::OperationSuccess => "OperationSuccess", - ActiveSessionOutput::Ignore => "Ignore", - ActiveSessionOutput::SendBackError(_) => "SendBackError", - } -} diff --git a/crates/ironposh-async/src/session_serial/core.rs b/crates/ironposh-async/src/session_serial/core.rs new file mode 100644 index 0000000..0114d5f --- /dev/null +++ b/crates/ironposh-async/src/session_serial/core.rs @@ -0,0 +1,422 @@ +//! 
Protocol decision core for the serial session loop. +//! +//! Pure synchronous state machine. All queue management, promotion logic, and +//! protocol decisions live here. **No async, no channels** — fully unit-testable. +//! +//! The event loop ([`super::start_serial_session_loop`]) is a thin async shell +//! that shuttles data between I/O channels and this core. + +use std::collections::VecDeque; + +use anyhow::Context; +use ironposh_client_core::connector::active_session::{ActiveSession, UserEvent}; +use ironposh_client_core::connector::{ActiveSessionOutput, UserOperation, conntion_pool::TrySend}; +use ironposh_client_core::host::HostCall; +use ironposh_client_core::runspace_pool::DesiredStream; +use tracing::{debug, error, info, trace}; + +use super::diag; +use crate::HostResponse; + +/// Protocol decision core for the serial session loop. +/// +/// Owns all internal queues and the [`ActiveSession`] state machine. Every +/// method is synchronous — the async event loop only needs to call these +/// methods and dispatch the resulting effects. +pub(super) struct SessionCore { + active_session: ActiveSession, + + /// Pending HTTP requests (Command, HostResponse, Signal, etc.). + /// **Always promoted before Receives.** + work_queue: VecDeque, + + /// Demanded Receive streams — from `SendAndThenReceive`. The server is + /// expected to have data ready after processing our preceding Send (e.g., + /// PSRP key exchange `EncryptedSessionKey`, host-response acknowledgement). + /// **Always promoted** regardless of pipeline vs runspace-pool. + demanded_streams: VecDeque, + + /// Speculative Receive streams — from `PendingReceive`. Pipeline streams + /// are promoted; runspace-pool-only streams are **skipped** (would block for + /// `OperationTimeout` with no useful data). Pipeline Receives already + /// include runspace pool data, so nothing is lost. + speculative_streams: Vec, + + /// HostCalls waiting to be dispatched to the JS/consumer side. 
+ pending_host_calls: VecDeque, + + /// Whether we're currently waiting for a host-call response from JS. + host_call_active: bool, + + /// User operations buffered while an HTTP request is in flight. + pending_user_ops: VecDeque, + + /// User events accumulated during processing, to be drained by the event loop. + pending_user_events: Vec, +} + +impl SessionCore { + pub(super) fn new(first_receive: TrySend, active_session: ActiveSession) -> Self { + let mut work_queue = VecDeque::new(); + work_queue.push_back(first_receive); + Self { + active_session, + work_queue, + demanded_streams: VecDeque::new(), + speculative_streams: Vec::new(), + pending_host_calls: VecDeque::new(), + host_call_active: false, + pending_user_ops: VecDeque::new(), + pending_user_events: Vec::new(), + } + } + + // ── Buffered user ops ───────────────────────────────────────────────── + + /// Process ONE buffered user operation, if the connection is idle + /// (`work_queue` is empty). Only one per call because + /// `accept_client_operation` may itself call `send()`, moving the + /// connection back to Pending. + pub(super) fn process_one_buffered_op(&mut self) -> anyhow::Result<()> { + if self.work_queue.is_empty() + && let Some(op) = self.pending_user_ops.pop_front() + { + diag!("DIAG process buffered: {}", op.operation_type()); + debug!(target: "serial", operation = op.operation_type(), "processing buffered user operation"); + let output = self + .active_session + .accept_client_operation(op) + .context("Failed to accept buffered user operation")?; + self.route_output(output)?; + } + Ok(()) + } + + // ── Promotion ──────────────────────────────────────────────────────── + + /// Pick the next HTTP request to send. Returns `None` when idle. + /// + /// Promotion priority: + /// 1. `work_queue` (Send operations) + /// 2. `demanded_streams` (server will respond — key exchange, etc.) + /// 3. pipeline streams from `speculative_streams` + /// 4. 
runspace-pool-only speculative → skip (returns `None`) + pub(super) fn promote_next_request(&mut self) -> anyhow::Result> { + if let Some(req) = self.work_queue.pop_front() { + diag!( + "DIAG promote: sending from work_queue ({} remaining)", + self.work_queue.len() + ); + trace!(target: "serial", remaining_work = self.work_queue.len(), "promoting work_queue item"); + return Ok(Some(req)); + } + + // Only build Receives when all HostCalls are resolved. The server blocks + // Receives while waiting for our HostCall response, so sending one early + // would always time out (adding an unnecessary OperationTimeout delay). + if self.host_call_active || !self.pending_host_calls.is_empty() { + return Ok(None); + } + + // Priority 1: demanded streams (from SendAndThenReceive). + if let Some(stream) = self.demanded_streams.pop_front() { + diag!( + "DIAG promote: demanded Receive ({} demanded remaining, {} speculative)", + self.demanded_streams.len(), + self.speculative_streams.len() + ); + trace!( + target: "serial", + ?stream, + demanded_remaining = self.demanded_streams.len(), + speculative_remaining = self.speculative_streams.len(), + "promoting demanded stream to Receive" + ); + let receive = self + .active_session + .fire_receive(vec![stream]) + .context("Failed to build Receive from demanded stream")?; + return Ok(Some(receive)); + } + + // Priority 2: pipeline streams from speculative. 
+ let has_pipeline = self + .speculative_streams + .iter() + .any(|s| s.command_id().is_some()); + if has_pipeline { + self.speculative_streams + .retain(|s| s.command_id().is_some()); + let stream = self.speculative_streams.remove(0); + + diag!( + "DIAG promote: Receive for pipeline stream ({} speculative remaining)", + self.speculative_streams.len() + ); + trace!( + target: "serial", + ?stream, + speculative_remaining = self.speculative_streams.len(), + "promoting pipeline stream to Receive" + ); + let receive = self + .active_session + .fire_receive(vec![stream]) + .context("Failed to build Receive from speculative stream")?; + return Ok(Some(receive)); + } + + // Only speculative runspace-pool streams — skip to avoid blocking + // for OperationTimeout with no useful data. + if !self.speculative_streams.is_empty() { + diag!( + "DIAG promote: skipping speculative runspace-pool-only Receive (would block for OperationTimeout)" + ); + trace!(target: "serial", "skipping speculative runspace-pool-only Receive, falling through to idle"); + } + + Ok(None) + } + + // ── Server response ────────────────────────────────────────────────── + + /// Process an HTTP response from the server. 
+ pub(super) fn accept_response( + &mut self, + resp: crate::HttpResponseTargeted, + ) -> anyhow::Result<()> { + let outputs = match self.active_session.accept_server_response(resp) { + Ok(outputs) => outputs, + Err(e) => { + diag!("DIAG ERROR: accept_server_response failed: {:#}", e); + return Err(e).context("Failed to accept server response"); + } + }; + + let output_types: Vec<&str> = outputs.iter().map(output_type_name).collect(); + diag!("DIAG select: {} outputs: {:?}", outputs.len(), output_types); + trace!( + target: "serial", + output_count = outputs.len(), + ?output_types, + "processing server response outputs" + ); + + for output in outputs { + self.route_output(output)?; + } + Ok(()) + } + + // ── Idle processing (connection idle — safe to call accept_client_operation) + + /// Process a user operation when the connection is idle. + pub(super) fn accept_user_op(&mut self, op: UserOperation) -> anyhow::Result<()> { + diag!("DIAG idle: user op {}", op.operation_type()); + debug!( + target: "serial", + operation = op.operation_type(), + "user operation received while idle" + ); + let output = self + .active_session + .accept_client_operation(op) + .context("Failed to accept user operation (idle)")?; + self.route_output(output) + } + + /// Process a host-call response when the connection is idle. 
+ pub(super) fn accept_host_response(&mut self, hr: HostResponse) -> anyhow::Result<()> { + diag!("DIAG idle: host response received call_id={}", hr.call_id); + debug!( + target: "serial", + call_id = hr.call_id, + "host-call response received while idle" + ); + self.host_call_active = false; + let output = self + .active_session + .accept_client_operation(UserOperation::SubmitHostResponse { + call_id: hr.call_id, + scope: hr.scope, + submission: hr.submission, + }) + .context("Failed to submit host response (idle)")?; + self.route_output(output) + } + + // ── Buffering (HTTP in-flight — cannot call accept_client_operation) ─ + + /// Buffer a user operation during HTTP in-flight. + pub(super) fn buffer_user_op(&mut self, op: UserOperation) { + diag!( + "DIAG select: buffering user op {} (HTTP in flight)", + op.operation_type() + ); + debug!( + target: "serial", + operation = op.operation_type(), + "buffering user operation (HTTP in flight)" + ); + self.pending_user_ops.push_back(op); + } + + /// Buffer a host-call response during HTTP in-flight. + /// + /// Converts to [`UserOperation::SubmitHostResponse`] for later processing, + /// and marks `host_call_active = false` so the next HostCall can be dispatched. + pub(super) fn buffer_host_response(&mut self, hr: HostResponse) { + diag!( + "DIAG select: buffering host response call_id={} (HTTP in flight)", + hr.call_id + ); + debug!( + target: "serial", + call_id = hr.call_id, + "buffering host-call response (HTTP in flight)" + ); + self.pending_user_ops + .push_back(UserOperation::SubmitHostResponse { + call_id: hr.call_id, + scope: hr.scope, + submission: hr.submission, + }); + self.host_call_active = false; + } + + // ── Effect draining ────────────────────────────────────────────────── + + /// Pop the next HostCall to dispatch, if none is currently active. 
+    pub(super) fn poll_host_call(&mut self) -> Option<HostCall> {
+        if self.host_call_active {
+            return None;
+        }
+        let hc = self.pending_host_calls.pop_front()?;
+        diag!(
+            "DIAG dispatch: HostCall method={} call_id={} ({} remaining)",
+            hc.method_name(),
+            hc.call_id(),
+            self.pending_host_calls.len()
+        );
+        info!(
+            target: "serial",
+            method = %hc.method_name(),
+            call_id = hc.call_id(),
+            remaining = self.pending_host_calls.len(),
+            "dispatching HostCall to consumer"
+        );
+        self.host_call_active = true;
+        Some(hc)
+    }
+
+    /// Drain accumulated user events.
+    pub(super) fn drain_user_events(&mut self) -> Vec<UserEvent> {
+        std::mem::take(&mut self.pending_user_events)
+    }
+
+    /// Whether a HostCall is currently active (event loop uses this for `select!` guard).
+    pub(super) fn is_host_call_active(&self) -> bool {
+        self.host_call_active
+    }
+
+    // ── Internal routing ─────────────────────────────────────────────────
+
+    /// Route an [`ActiveSessionOutput`] to the appropriate internal queue.
+    /// **Never sends HTTP.** Never touches channels.
+    fn route_output(&mut self, output: ActiveSessionOutput) -> anyhow::Result<()> {
+        match output {
+            ActiveSessionOutput::SendBack(reqs) => {
+                trace!(target: "serial", request_count = reqs.len(), "enqueue: SendBack → work_queue");
+                diag!("DIAG enqueue: SendBack({}) → work_queue", reqs.len());
+                for req in reqs {
+                    self.work_queue.push_back(req);
+                }
+            }
+            ActiveSessionOutput::SendAndThenReceive {
+                send_request,
+                then_receive_streams,
+            } => {
+                // "I just sent something, the server WILL respond."
+                // Follow-up streams go to demanded_streams (always promoted).
+ trace!( + target: "serial", + stream_count = then_receive_streams.len(), + "enqueue: SendAndThenReceive → work_queue + demanded_streams" + ); + diag!( + "DIAG enqueue: SendAndThenReceive → work_queue + {} demanded streams", + then_receive_streams.len() + ); + self.work_queue.push_back(send_request); + for s in then_receive_streams { + self.demanded_streams.push_back(s); + } + } + ActiveSessionOutput::PendingReceive { desired_streams } => { + // Speculative: "you should eventually poll these." + trace!(target: "serial", streams = ?desired_streams, "enqueue: PendingReceive → speculative_streams"); + diag!( + "DIAG enqueue: PendingReceive({}) → speculative_streams", + desired_streams.len() + ); + merge_speculative_streams(&mut self.speculative_streams, desired_streams); + } + ActiveSessionOutput::HostCall(hc) => { + diag!( + "DIAG enqueue: HostCall method={} call_id={}", + hc.method_name(), + hc.call_id() + ); + info!(target: "serial", method = %hc.method_name(), call_id = hc.call_id(), "enqueue: HostCall → pending_host_calls"); + self.pending_host_calls.push_back(hc); + } + ActiveSessionOutput::UserEvent(event) => { + diag!("DIAG enqueue: UserEvent queued"); + trace!(target: "serial", event = ?event, "enqueue: UserEvent → pending_user_events"); + self.pending_user_events.push(event); + } + ActiveSessionOutput::SendBackError(e) => { + error!(target: "serial", error = %e, "enqueue: SendBackError"); + return Err(anyhow::anyhow!("Session step failed: {e}")); + } + ActiveSessionOutput::OperationSuccess => { + trace!(target: "serial", "enqueue: OperationSuccess (no-op)"); + } + ActiveSessionOutput::Ignore => {} + } + Ok(()) + } +} + +// ── Helpers ────────────────────────────────────────────────────────────────── + +/// Merge new desired streams into the speculative list, avoiding duplicates. 
+fn merge_speculative_streams(existing: &mut Vec<DesiredStream>, new_streams: Vec<DesiredStream>) {
+    let before = existing.len();
+    for s in new_streams {
+        if !existing.contains(&s) {
+            existing.push(s);
+        }
+    }
+    if existing.len() != before {
+        diag!(
+            "DIAG merge_speculative: {} → {} streams",
+            before,
+            existing.len()
+        );
+    }
+}
+
+/// Short name for an [`ActiveSessionOutput`] variant (for diagnostics).
+fn output_type_name(o: &ActiveSessionOutput) -> &'static str {
+    match o {
+        ActiveSessionOutput::SendBack(_) => "SendBack",
+        ActiveSessionOutput::SendAndThenReceive { .. } => "SendAndThenReceive",
+        ActiveSessionOutput::UserEvent(_) => "UserEvent",
+        ActiveSessionOutput::HostCall(_) => "HostCall",
+        ActiveSessionOutput::PendingReceive { .. } => "PendingReceive",
+        ActiveSessionOutput::OperationSuccess => "OperationSuccess",
+        ActiveSessionOutput::Ignore => "Ignore",
+        ActiveSessionOutput::SendBackError(_) => "SendBackError",
+    }
+}
diff --git a/crates/ironposh-async/src/session_serial/mod.rs b/crates/ironposh-async/src/session_serial/mod.rs
new file mode 100644
index 0000000..d7dcdcc
--- /dev/null
+++ b/crates/ironposh-async/src/session_serial/mod.rs
@@ -0,0 +1,204 @@
+//! Serial session loop — single-connection mode for Devolutions Gateway.
+//!
+//! Split into two layers:
+//! - [`core::SessionCore`] — pure synchronous protocol decisions (queues, promotion, routing)
+//! - [`start_serial_session_loop`] — thin async I/O shell (HTTP, channels, `select!`)
+
+mod core;
+
+use anyhow::Context;
+use futures::channel::mpsc;
+use futures::future::Either;
+use futures::{FutureExt, SinkExt, StreamExt};
+use ironposh_client_core::connector::active_session::{ActiveSession, UserEvent};
+use ironposh_client_core::connector::conntion_pool::TrySend;
+use ironposh_client_core::host::HostCall;
+use tracing::{info, instrument, trace};
+
+use ironposh_client_core::connector::UserOperation;
+
+use self::core::SessionCore;
+use crate::{HostResponse, HttpClient};
+
+/// Console diagnostic logging for WASM debugging.
+#[cfg(all(feature = "wasm-diag", target_arch = "wasm32"))]
+macro_rules! diag {
+    ($($arg:tt)*) => {
+        web_sys::console::error_1(&format!($($arg)*).into());
+    };
+}
+#[cfg(not(all(feature = "wasm-diag", target_arch = "wasm32")))]
+macro_rules! diag {
+    ($($arg:tt)*) => {};
+}
+pub(in crate::session_serial) use diag;
+
+/// Serial active session loop using a flat `select!`-based event loop.
+///
+/// **Core invariant:** At most one HTTP request is in flight at any time.
+/// All protocol decisions (promotion priority, HostCall blocking, speculative
+/// vs demanded Receives) are handled by [`SessionCore`].
+#[instrument(skip_all)]
+pub async fn start_serial_session_loop(
+    first_receive: TrySend,
+    active_session: ActiveSession,
+    client: impl HttpClient,
+    mut user_input_rx: mpsc::Receiver<UserOperation>,
+    mut user_output_tx: mpsc::Sender<UserEvent>,
+    host_call_tx: mpsc::UnboundedSender<HostCall>,
+    mut host_resp_rx: mpsc::UnboundedReceiver<HostResponse>,
+) -> anyhow::Result<()> {
+    let mut core = SessionCore::new(first_receive, active_session);
+
+    info!("Starting serial session loop (flat event loop, single-connection mode)");
+    diag!("DIAG serial loop: started (flat event loop)");
+
+    loop {
+        // Dispatch accumulated effects from the previous iteration.
+        dispatch_effects(&mut core, &mut user_output_tx, &host_call_tx).await?;
+
+        // Process one buffered user op if connection is idle.
+        core.process_one_buffered_op()?;
+
+        if let Some(req) = core.promote_next_request()? {
+            // HTTP in-flight: send request and buffer incoming ops until response.
+            let resp = send_and_buffer(
+                &client,
+                req,
+                &mut core,
+                &mut user_input_rx,
+                &mut host_resp_rx,
+                &host_call_tx,
+            )
+            .await?;
+
+            core.accept_response(resp)?;
+
+            // Drain any user ops that arrived while HTTP was in flight.
+            if drain_channel(&mut core, &mut user_input_rx) {
+                return Ok(()); // channel closed
+            }
+        } else {
+            // Idle — wait for user op or host response.
+            trace!(target: "serial", "idle: no pending work, waiting for user input or host response");
+            diag!("DIAG idle: waiting for user input or host response");
+
+            let mut host_guard = if core.is_host_call_active() {
+                Either::Left(host_resp_rx.next())
+            } else {
+                Either::Right(futures::future::pending::<Option<HostResponse>>())
+            };
+
+            futures::select! {
+                op = user_input_rx.next() => {
+                    if let Some(op) = op {
+                        core.accept_user_op(op)?;
+                    } else {
+                        info!("User input channel closed (idle), ending serial session loop");
+                        return Ok(());
+                    }
+                }
+                hr = host_guard => {
+                    if let Some(hr) = hr {
+                        core.accept_host_response(hr)?;
+                    } else {
+                        return Err(anyhow::anyhow!("Host-response channel closed (idle)"));
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Send an HTTP request and buffer incoming user ops / host responses until
+/// the response arrives.
+async fn send_and_buffer(
+    client: &impl HttpClient,
+    req: TrySend,
+    core: &mut SessionCore,
+    user_input_rx: &mut mpsc::Receiver<UserOperation>,
+    host_resp_rx: &mut mpsc::UnboundedReceiver<HostResponse>,
+    host_call_tx: &mpsc::UnboundedSender<HostCall>,
+) -> anyhow::Result<crate::HttpResponseTargeted> {
+    let http_future = client.send_request(req).fuse();
+    futures::pin_mut!(http_future);
+
+    loop {
+        let mut host_guard = if core.is_host_call_active() {
+            Either::Left(host_resp_rx.next())
+        } else {
+            Either::Right(futures::future::pending::<Option<HostResponse>>())
+        };
+
+        futures::select! {
+            resp = http_future => {
+                diag!("DIAG select: HTTP response received");
+                trace!(target: "serial", "HTTP response received");
+                return resp.context("Serial HTTP request failed");
+            }
+            op = user_input_rx.next() => {
+                if let Some(op) = op {
+                    core.buffer_user_op(op);
+                } else {
+                    info!("User input channel closed, ending serial session loop");
+                    return Err(anyhow::anyhow!("User input channel closed during HTTP wait"));
+                }
+            }
+            hr = host_guard => {
+                match hr {
+                    Some(hr) => {
+                        core.buffer_host_response(hr);
+                        // Dispatch next HostCall immediately (just a channel send, no HTTP).
+                        while let Some(hc) = core.poll_host_call() {
+                            if host_call_tx.unbounded_send(hc).is_err() {
+                                return Err(anyhow::anyhow!("Host-call channel closed"));
+                            }
+                        }
+                    }
+                    None => return Err(anyhow::anyhow!("Host-response channel closed")),
+                }
+            }
+        }
+    }
+}
+
+/// Drain buffered user ops from the channel (after HTTP response).
+/// Returns `true` if the channel is closed.
+fn drain_channel(
+    core: &mut SessionCore,
+    user_input_rx: &mut mpsc::Receiver<UserOperation>,
+) -> bool {
+    loop {
+        match user_input_rx.try_next() {
+            Ok(Some(op)) => {
+                diag!("DIAG drain: collected {}", op.operation_type());
+                core.buffer_user_op(op);
+            }
+            Ok(None) => {
+                info!("User input channel closed during drain");
+                return true;
+            }
+            Err(_) => return false,
+        }
+    }
+}
+
+/// Dispatch accumulated user events and host calls to their channels.
+async fn dispatch_effects(
+    core: &mut SessionCore,
+    user_output_tx: &mut mpsc::Sender<UserEvent>,
+    host_call_tx: &mpsc::UnboundedSender<HostCall>,
+) -> anyhow::Result<()> {
+    for event in core.drain_user_events() {
+        diag!("DIAG dispatch: UserEvent");
+        if user_output_tx.send(event).await.is_err() {
+            return Err(anyhow::anyhow!("User output channel disconnected"));
+        }
+    }
+    while let Some(hc) = core.poll_host_call() {
+        if host_call_tx.unbounded_send(hc).is_err() {
+            return Err(anyhow::anyhow!("Host-call channel closed"));
+        }
+    }
+    Ok(())
+}
diff --git a/crates/ironposh-web/Cargo.toml b/crates/ironposh-web/Cargo.toml
index 29d0e09..d6e3891 100644
--- a/crates/ironposh-web/Cargo.toml
+++ b/crates/ironposh-web/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "ironposh-web"
-version = "0.4.2"
+version = "0.4.3"
 authors = ["irving ou "]
 edition = "2018"
 description = "PowerShell Remoting over WinRM for WebAssembly"

From 1ceddf34b7a4b7bd9ba84b46b80319777f8c48b2 Mon Sep 17 00:00:00 2001
From: irving ou
Date: Tue, 24 Feb 2026 12:58:58 -0500
Subject: [PATCH 2/2] style: cargo fmt --all across workspace

Fix formatting in ironposh-client-core, ironposh-client-sync, and
ironposh-client-tokio to pass CI cargo fmt --all check.
--- .../src/connector/active_session.rs | 2 +- crates/ironposh-client-sync/src/main.rs | 11 ++++++++--- crates/ironposh-client-tokio/src/config.rs | 5 ++++- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/crates/ironposh-client-core/src/connector/active_session.rs b/crates/ironposh-client-core/src/connector/active_session.rs index 3a938a7..1ff4059 100644 --- a/crates/ironposh-client-core/src/connector/active_session.rs +++ b/crates/ironposh-client-core/src/connector/active_session.rs @@ -1,4 +1,5 @@ use crate::{ + PwshCoreError, connector::{ conntion_pool::{ConnectionPool, ConnectionPoolAccept, TrySend}, http::HttpResponseTargeted, @@ -7,7 +8,6 @@ use crate::{ pipeline::PipelineSpec, powershell::PipelineHandle, runspace_pool::{DesiredStream, RunspacePool, pool::AcceptResponsResult}, - PwshCoreError, }; use ironposh_psrp::{ErrorRecord, PipelineOutput, PsPrimitiveValue, PsValue}; use tracing::{error, info, instrument, warn}; diff --git a/crates/ironposh-client-sync/src/main.rs b/crates/ironposh-client-sync/src/main.rs index 8825682..80f524a 100644 --- a/crates/ironposh-client-sync/src/main.rs +++ b/crates/ironposh-client-sync/src/main.rs @@ -194,7 +194,10 @@ fn run_event_loop( .context("Failed to send HTTP request")?; } } - ActiveSessionOutput::SendAndThenReceive { send_request, then_receive_streams } => { + ActiveSessionOutput::SendAndThenReceive { + send_request, + then_receive_streams, + } => { info!( target: "network", "sending request then queueing receive" @@ -202,7 +205,8 @@ fn run_event_loop( network_request_tx .send(send_request) .context("Failed to send HTTP request")?; - let recv = active_session.fire_receive(then_receive_streams) + let recv = active_session + .fire_receive(then_receive_streams) .context("Failed to build receive after send-then-receive")?; network_request_tx .send(recv) @@ -214,7 +218,8 @@ fn run_event_loop( stream_count = desired_streams.len(), "firing deferred receive" ); - let recv = 
active_session.fire_receive(desired_streams) + let recv = active_session + .fire_receive(desired_streams) .context("Failed to build deferred receive")?; network_request_tx .send(recv) diff --git a/crates/ironposh-client-tokio/src/config.rs b/crates/ironposh-client-tokio/src/config.rs index 188f77f..bedbf72 100644 --- a/crates/ironposh-client-tokio/src/config.rs +++ b/crates/ironposh-client-tokio/src/config.rs @@ -70,7 +70,10 @@ pub struct Args { pub http_insecure: bool, /// Use parallel (multi-connection) session loop instead of the default serial mode. - #[arg(long, help = "Use parallel session loop (default: serial/single-connection)")] + #[arg( + long, + help = "Use parallel session loop (default: serial/single-connection)" + )] pub parallel: bool, /// Verbose logging (can be repeated for more verbosity)