Skip to content

Commit aa7a3b7

Browse files
committed
initial integration of remote ptys for target envs which dont support ptys
1 parent da14fe6 commit aa7a3b7

File tree

16 files changed

+1109
-114
lines changed

16 files changed

+1109
-114
lines changed

Cargo.lock

+253-67
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tunshell-client/Cargo.toml

+6-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ cfg-if = "0.1.10"
2727
portable-pty = "0.3.1"
2828

2929
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
30-
tokio = { version = "0.2.21", features=["rt-threaded", "blocking", "dns", "time", "io-util", "io-std", "tcp", "udp", "sync", "process", "macros", "signal"] } #no-wasm
31-
crossterm = { version = "0.17.5" }
30+
tokio = { version = "0.2.21", features=["rt-threaded", "blocking", "dns", "time", "io-util", "io-std", "tcp", "udp", "sync", "process", "macros", "signal", "fs", "uds"] } #no-wasm
31+
crossterm = { version = "0.23.2" }
3232
libc = "0.2.71"
3333
async-tungstenite = { version = "0.8.0", features=["tokio-runtime"] } #no-wasm
3434

@@ -71,5 +71,9 @@ features = [
7171
"WebSocket",
7272
]
7373

74+
[target.'cfg(all(target_os = "linux", any(target_arch = "x86_64", target_arch = "aarch64")))'.dependencies]
75+
remote-pty-common = { git = "https://github.com/TimeToogo/remote-pty" }
76+
remote-pty-master = { git = "https://github.com/TimeToogo/remote-pty" }
77+
7478
[dev-dependencies]
7579
lazy_static = "1.4.0"

tunshell-client/src/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl Config {
5151
while let Some(arg) = args.next() {
5252
match arg.as_str() {
5353
"--echo" => echo_stdout = true,
54-
arg @ _ => panic!(format!("Unknown argument: {}", arg)),
54+
arg @ _ => panic!("Unknown argument: {}", arg),
5555
}
5656
}
5757

tunshell-client/src/p2p/udp/connection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ impl AsyncRead for UdpConnection {
229229
return Poll::Pending;
230230
} else {
231231
let data = con.recv_drain_bytes(buff.len());
232-
&buff[..data.len()].copy_from_slice(&data[..]);
232+
buff[..data.len()].copy_from_slice(&data[..]);
233233
return Poll::Ready(Ok(data.len()));
234234
}
235235
}

tunshell-client/src/shell/client/mod.rs

+36-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use super::{
22
ShellClientMessage, ShellClientStream, ShellServerMessage, StartShellPayload, WindowSize,
33
};
4-
use crate::{util::delay::delay_for, ShellKey, TunnelStream};
4+
use crate::{
5+
remote_pty_supported, shell::proto::ShellStartedPayload, util::delay::delay_for, ShellKey,
6+
TunnelStream,
7+
};
58
use anyhow::{Context, Error, Result};
69
use futures::stream::StreamExt;
710
use log::*;
@@ -20,6 +23,12 @@ cfg_if::cfg_if! {
2023
pub use shell::*;
2124
}
2225
}
26+
cfg_if::cfg_if! {
27+
if #[cfg(all(target_os = "linux", any(target_arch = "x86_64", target_arch = "aarch64")))] {
28+
mod remote_pty;
29+
use remote_pty::start_remote_pty_master;
30+
}
31+
}
2332

2433
pub struct ShellClient {
2534
pub(crate) host_shell: HostShell,
@@ -52,18 +61,38 @@ impl ShellClient {
5261
.write(&ShellClientMessage::StartShell(StartShellPayload {
5362
term: self.host_shell.term().unwrap_or("".to_owned()),
5463
size: WindowSize::from(self.host_shell.size().await?),
64+
remote_pty_support: remote_pty_supported(),
5565
}))
5666
.await?;
5767

5868
info!("shell requested");
59-
info!("starting shell stream");
6069

61-
self.host_shell.enable_raw_mode()?;
62-
63-
let exit_code = self.stream_shell_io(&mut stream).await;
64-
65-
self.host_shell.disable_raw_mode()?;
70+
let response = tokio::select! {
71+
message = stream.next() => match message {
72+
Some(Ok(ShellServerMessage::ShellStarted(res))) => res,
73+
Some(Ok(_)) => return Err(Error::msg("shell server returned an unexpected response")),
74+
Some(Err(err)) => return Err(Error::from(err).context("shell server returned an error")),
75+
None => return Err(Error::msg("did not receive shell started response"))
76+
},
77+
_ = delay_for(Duration::from_millis(30000)) => return Err(Error::msg("timed out while waiting for shell"))
78+
};
6679

80+
let exit_code = if let ShellStartedPayload::RemotePty = response {
81+
cfg_if::cfg_if! {
82+
if #[cfg(not(all(target_os = "linux", any(target_arch = "x86_64", target_arch = "aarch64"))))] {
83+
return Err(Error::msg("shell server started remote pty when not supported on local"));
84+
} else {
85+
info!("starting remote pty master");
86+
start_remote_pty_master(stream).await
87+
}
88+
}
89+
} else {
90+
info!("starting shell stream");
91+
self.host_shell.enable_raw_mode()?;
92+
let exit_code = self.stream_shell_io(&mut stream).await;
93+
self.host_shell.disable_raw_mode()?;
94+
exit_code
95+
};
6796
info!("session finished");
6897

6998
Ok(exit_code?)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
use std::{
2+
cmp,
3+
collections::HashMap,
4+
io,
5+
sync::{
6+
self,
7+
atomic::{AtomicBool, Ordering},
8+
Arc, Mutex,
9+
},
10+
};
11+
12+
use anyhow::{Error, Result};
13+
use futures::StreamExt;
14+
use remote_pty_common::channel;
15+
16+
use crate::shell::proto::{RemotePtyDataPayload, RemotePtyEventPayload, ShellServerMessage};
17+
18+
use super::ShellStream;
19+
20+
use remote_pty_master::{server::listener::Listener, *};
21+
22+
const STDIN_FD: libc::c_int = 0;
23+
24+
struct TransportRx {
25+
rx: sync::mpsc::Receiver<Vec<u8>>,
26+
recv_buff: Vec<u8>,
27+
}
28+
29+
impl TransportRx {
30+
fn new(rx: sync::mpsc::Receiver<Vec<u8>>) -> Self {
31+
Self {
32+
rx,
33+
recv_buff: vec![],
34+
}
35+
}
36+
}
37+
38+
impl io::Read for TransportRx {
39+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
40+
while self.recv_buff.is_empty() {
41+
let buf = self.rx.recv().map_err(|_| io::Error::last_os_error())?;
42+
self.recv_buff.extend_from_slice(buf.as_slice());
43+
}
44+
45+
let n = cmp::min(buf.len(), self.recv_buff.len());
46+
buf[..n].copy_from_slice(&self.recv_buff[..n]);
47+
self.recv_buff.drain(..n);
48+
Ok(n)
49+
}
50+
}
51+
52+
struct TransportTx {
53+
tx: sync::mpsc::Sender<Vec<u8>>,
54+
}
55+
56+
impl TransportTx {
57+
fn new(tx: sync::mpsc::Sender<Vec<u8>>) -> Self {
58+
Self { tx }
59+
}
60+
}
61+
62+
impl io::Write for TransportTx {
63+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
64+
self.tx
65+
.send(buf.to_vec())
66+
.map_err(|_| io::Error::last_os_error())?;
67+
Ok(buf.len())
68+
}
69+
70+
fn flush(&mut self) -> io::Result<()> {
71+
Ok(())
72+
}
73+
}
74+
75+
struct TransportChannel {
76+
rx: TransportRx,
77+
tx: TransportTx,
78+
}
79+
80+
impl TransportChannel {
81+
fn new(rx: sync::mpsc::Receiver<Vec<u8>>, tx: sync::mpsc::Sender<Vec<u8>>) -> Self {
82+
Self {
83+
rx: TransportRx::new(rx),
84+
tx: TransportTx::new(tx),
85+
}
86+
}
87+
88+
fn pair() -> (Self, Self) {
89+
let (t1, r1) = sync::mpsc::channel();
90+
let (t2, r2) = sync::mpsc::channel();
91+
92+
(Self::new(r1, t2), Self::new(r2, t1))
93+
}
94+
95+
fn streamed_to(
96+
self,
97+
con_id: u32,
98+
tx: tokio::sync::mpsc::UnboundedSender<RemotePtyDataPayload>,
99+
terminated: Arc<AtomicBool>,
100+
) -> TransportTx {
101+
let rx = self.rx;
102+
tokio::task::spawn_blocking(move || -> Result<()> {
103+
loop {
104+
let data = rx.rx.recv()?;
105+
106+
if terminated.load(Ordering::SeqCst) {
107+
return Ok(());
108+
}
109+
110+
tx.send(RemotePtyDataPayload { con_id, data })?;
111+
}
112+
});
113+
114+
self.tx
115+
}
116+
}
117+
118+
impl Into<channel::RemoteChannel> for TransportChannel {
119+
fn into(self) -> channel::RemoteChannel {
120+
channel::RemoteChannel::new(channel::transport::rw::ReadWriteTransport::new(
121+
self.rx, self.tx,
122+
))
123+
}
124+
}
125+
126+
struct RxListener {
127+
rx: sync::mpsc::Receiver<TransportChannel>,
128+
}
129+
130+
impl Listener for RxListener {
131+
fn accept(&mut self) -> io::Result<channel::RemoteChannel> {
132+
Ok(self
133+
.rx
134+
.recv()
135+
.map_err(|_| io::Error::last_os_error())?
136+
.into())
137+
}
138+
}
139+
140+
struct ChannelDemuxer {
141+
state: Arc<Mutex<HashMap<u32, (TransportTx, Arc<AtomicBool>)>>>,
142+
chan_tx: sync::mpsc::Sender<TransportChannel>,
143+
read_tx: tokio::sync::mpsc::UnboundedSender<RemotePtyDataPayload>,
144+
read_rx: tokio::sync::mpsc::UnboundedReceiver<RemotePtyDataPayload>,
145+
}
146+
147+
impl ChannelDemuxer {
148+
fn new() -> (Self, RxListener) {
149+
let (chan_tx, chan_rx) = sync::mpsc::channel();
150+
let chan_rx = RxListener { rx: chan_rx };
151+
152+
let (read_tx, read_rx) = tokio::sync::mpsc::unbounded_channel();
153+
154+
let demux = Self {
155+
state: Arc::new(Mutex::new(HashMap::new())),
156+
chan_tx,
157+
read_rx,
158+
read_tx,
159+
};
160+
161+
(demux, chan_rx)
162+
}
163+
164+
fn handle(&mut self, msg: RemotePtyEventPayload) -> Result<()> {
165+
match msg {
166+
RemotePtyEventPayload::Connect(con_id) => self.handle_new_connection(con_id),
167+
RemotePtyEventPayload::Payload(data) => self.handle_data(data),
168+
RemotePtyEventPayload::Close(con_id) => self.handle_close_connection(con_id),
169+
}
170+
}
171+
172+
fn state(
173+
&self,
174+
) -> Result<std::sync::MutexGuard<'_, HashMap<u32, (TransportTx, Arc<AtomicBool>)>>> {
175+
self.state
176+
.lock()
177+
.map_err(|_| Error::msg("failed to lock mutex"))
178+
}
179+
180+
fn handle_new_connection(&self, con_id: u32) -> Result<(), Error> {
181+
let mut state = self.state()?;
182+
183+
if state.contains_key(&con_id) {
184+
return Err(Error::msg("connection id taken"));
185+
}
186+
187+
let (c1, c2) = TransportChannel::pair();
188+
let terminated = Arc::new(AtomicBool::new(false));
189+
190+
let tx1 = c1.streamed_to(con_id, self.read_tx.clone(), Arc::clone(&terminated));
191+
state.insert(con_id, (tx1, terminated));
192+
self.chan_tx
193+
.send(c2)
194+
.map_err(|_| Error::msg("failed to send new connection message"))?;
195+
196+
Ok(())
197+
}
198+
199+
fn handle_data(&self, data: RemotePtyDataPayload) -> Result<(), Error> {
200+
let state = self.state()?;
201+
202+
if !state.contains_key(&data.con_id) {
203+
return Err(Error::msg("connection id does not exist"));
204+
}
205+
206+
state
207+
.get(&data.con_id)
208+
.unwrap()
209+
.0
210+
.tx
211+
.send(data.data.to_vec())?;
212+
213+
Ok(())
214+
}
215+
216+
fn handle_close_connection(&self, con_id: u32) -> Result<(), Error> {
217+
let mut state = self.state()?;
218+
219+
if !state.contains_key(&con_id) {
220+
return Err(Error::msg("connection id does not exist"));
221+
}
222+
223+
let (_, terminated) = state.remove(&con_id).unwrap();
224+
terminated.store(true, Ordering::SeqCst);
225+
226+
Ok(())
227+
}
228+
229+
async fn next(&mut self) -> Result<RemotePtyDataPayload> {
230+
self.read_rx
231+
.recv()
232+
.await
233+
.ok_or_else(|| Error::msg("failed to recv message"))
234+
}
235+
}
236+
237+
pub(super) async fn start_remote_pty_master(mut stream: ShellStream) -> Result<u8> {
238+
let (mut demuxer, listener) = ChannelDemuxer::new();
239+
240+
let mut server = tokio::task::spawn_blocking(move || {
241+
let ctx = context::Context::from_pair(STDIN_FD, STDIN_FD);
242+
server::Server::new(ctx, Box::new(listener))
243+
.start()
244+
.join()
245+
.map_err(|_| Error::msg("failed to join server"))?;
246+
Ok(0u8)
247+
});
248+
249+
loop {
250+
tokio::select! {
251+
res = &mut server => return res.unwrap_or_else(|_| Err(Error::msg("failed join server"))),
252+
msg = stream.next() => match msg {
253+
Some(Ok(ShellServerMessage::RemotePtyEvent(msg))) => demuxer.handle(msg)?,
254+
Some(Ok(ShellServerMessage::Exited(code))) => return Ok(code),
255+
None => return Err(Error::msg("shell server stream finished unexpectedly")),
256+
_ => return Err(Error::msg("received unexpected message from shell server stream"))
257+
},
258+
msg = demuxer.next() => {
259+
stream.write(&crate::shell::proto::ShellClientMessage::RemotePtyData(msg?)).await?;
260+
}
261+
};
262+
}
263+
}

tunshell-client/src/shell/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@ impl ShellKey {
2626
&self.key
2727
}
2828
}
29+
30+
pub(super) fn remote_pty_supported() -> bool {
31+
cfg!(all(target_os = "linux", any(target_arch = "x86_64", target_arch = "aarch64")))
32+
}

0 commit comments

Comments
 (0)