Skip to content

Commit 849f780

Browse files
committed
Track individual sessions (local and remote id) for each operation
This allows multiple operations to be performed simultaneously
1 parent 39a7f0a commit 849f780

File tree

7 files changed

+95
-87
lines changed

7 files changed

+95
-87
lines changed

adb_client/src/device/adb_message_device.rs

Lines changed: 56 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ use super::{ADBTransportMessage, MessageCommand, models::MessageSubcommand};
1111
#[derive(Debug)]
1212
pub struct ADBMessageDevice<T: ADBMessageTransport> {
1313
transport: T,
14-
local_id: Option<u32>,
15-
remote_id: Option<u32>,
14+
}
15+
16+
#[derive(Debug, Clone, Copy)]
17+
pub struct ADBSession {
18+
pub local_id: u32,
19+
pub remote_id: u32,
1620
}
1721

1822
impl<T: ADBMessageTransport> ADBMessageDevice<T> {
1923
/// Instantiate a new [`ADBMessageTransport`]
2024
pub fn new(transport: T) -> Self {
21-
Self {
22-
transport,
23-
local_id: None,
24-
remote_id: None,
25-
}
25+
Self { transport }
2626
}
2727

2828
pub(crate) fn get_transport(&mut self) -> &T {
@@ -34,12 +34,15 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
3434
}
3535

3636
/// Receive a message and acknowledge it by replying with an `OKAY` command
37-
pub(crate) fn recv_and_reply_okay(&mut self) -> Result<ADBTransportMessage> {
37+
pub(crate) fn recv_and_reply_okay(
38+
&mut self,
39+
session: ADBSession,
40+
) -> Result<ADBTransportMessage> {
3841
let message = self.transport.read_message()?;
3942
self.transport.write_message(ADBTransportMessage::new(
4043
MessageCommand::Okay,
41-
self.get_local_id()?,
42-
self.get_remote_id()?,
44+
session.local_id,
45+
session.remote_id,
4346
&[],
4447
))?;
4548
Ok(message)
@@ -60,11 +63,12 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
6063

6164
pub(crate) fn recv_file<W: std::io::Write>(
6265
&mut self,
66+
session: ADBSession,
6367
mut output: W,
6468
) -> std::result::Result<(), RustADBError> {
6569
let mut len: Option<u64> = None;
6670
loop {
67-
let payload = self.recv_and_reply_okay()?.into_payload();
71+
let payload = self.recv_and_reply_okay(session)?.into_payload();
6872
let mut rdr = Cursor::new(&payload);
6973
while rdr.position() != payload.len() as u64 {
7074
match len.take() {
@@ -97,8 +101,7 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
97101

98102
pub(crate) fn push_file<R: std::io::Read>(
99103
&mut self,
100-
local_id: u32,
101-
remote_id: u32,
104+
session: ADBSession,
102105
mut reader: R,
103106
) -> std::result::Result<(), RustADBError> {
104107
let mut buffer = [0; BUFFER_SIZE];
@@ -111,8 +114,8 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
111114

112115
let message = ADBTransportMessage::new(
113116
MessageCommand::Write,
114-
local_id,
115-
remote_id,
117+
session.local_id,
118+
session.remote_id,
116119
&serialized_message,
117120
);
118121

@@ -131,8 +134,8 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
131134

132135
let message = ADBTransportMessage::new(
133136
MessageCommand::Write,
134-
local_id,
135-
remote_id,
137+
session.local_id,
138+
session.remote_id,
136139
&serialized_message,
137140
);
138141

@@ -159,8 +162,8 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
159162

160163
let message = ADBTransportMessage::new(
161164
MessageCommand::Write,
162-
local_id,
163-
remote_id,
165+
session.local_id,
166+
session.remote_id,
164167
&serialized_message,
165168
);
166169

@@ -173,24 +176,27 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
173176
}
174177
}
175178

176-
pub(crate) fn begin_synchronization(&mut self) -> Result<()> {
177-
self.open_session(b"sync:\0")?;
178-
Ok(())
179+
pub(crate) fn begin_synchronization(&mut self) -> Result<ADBSession> {
180+
self.open_session(b"sync:\0")
179181
}
180182

181-
pub(crate) fn stat_with_explicit_ids(&mut self, remote_path: &str) -> Result<AdbStatResponse> {
183+
pub(crate) fn stat_with_explicit_ids(
184+
&mut self,
185+
session: ADBSession,
186+
remote_path: &str,
187+
) -> Result<AdbStatResponse> {
182188
let stat_buffer = MessageSubcommand::Stat.with_arg(remote_path.len() as u32);
183189
let message = ADBTransportMessage::new(
184190
MessageCommand::Write,
185-
self.get_local_id()?,
186-
self.get_remote_id()?,
191+
session.local_id,
192+
session.remote_id,
187193
&bincode::serialize(&stat_buffer).map_err(|_e| RustADBError::ConversionError)?,
188194
);
189195
self.send_and_expect_okay(message)?;
190196
self.send_and_expect_okay(ADBTransportMessage::new(
191197
MessageCommand::Write,
192-
self.get_local_id()?,
193-
self.get_remote_id()?,
198+
session.local_id,
199+
session.remote_id,
194200
remote_path.as_bytes(),
195201
))?;
196202
let response = self.transport.read_message()?;
@@ -200,46 +206,51 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
200206
.map_err(|_e| RustADBError::ConversionError)
201207
}
202208

203-
pub(crate) fn end_transaction(&mut self) -> Result<()> {
209+
pub(crate) fn end_transaction(&mut self, session: ADBSession) -> Result<()> {
204210
let quit_buffer = MessageSubcommand::Quit.with_arg(0u32);
205211
self.send_and_expect_okay(ADBTransportMessage::new(
206212
MessageCommand::Write,
207-
self.get_local_id()?,
208-
self.get_remote_id()?,
213+
session.local_id,
214+
session.remote_id,
209215
&bincode::serialize(&quit_buffer).map_err(|_e| RustADBError::ConversionError)?,
210216
))?;
211217
let _discard_close = self.transport.read_message()?;
212218
Ok(())
213219
}
214220

215-
pub(crate) fn open_session(&mut self, data: &[u8]) -> Result<ADBTransportMessage> {
221+
pub(crate) fn open_session(&mut self, data: &[u8]) -> Result<ADBSession> {
216222
let mut rng = rand::rng();
223+
let local_id: u32 = rng.random();
217224

218225
let message = ADBTransportMessage::new(
219226
MessageCommand::Open,
220-
rng.random(), // Our 'local-id'
227+
local_id, // Our 'local-id'
221228
0,
222229
data,
223230
);
224231
self.get_transport_mut().write_message(message)?;
225232

226233
let response = self.get_transport_mut().read_message()?;
227234

228-
self.local_id = Some(response.header().arg1());
229-
self.remote_id = Some(response.header().arg0());
235+
if response.header().command() != MessageCommand::Okay {
236+
return Err(RustADBError::ADBRequestFailed(format!(
237+
"Open session failed: got {} in respone instead of OKAY",
238+
response.header().command()
239+
)));
240+
}
230241

231-
Ok(response)
232-
}
242+
if response.header().arg1() != local_id {
243+
return Err(RustADBError::ADBRequestFailed(format!(
244+
"Open session failed: respones used {} for our local_id instead of {local_id}",
245+
response.header().arg1()
246+
)));
247+
}
233248

234-
pub(crate) fn get_local_id(&self) -> Result<u32> {
235-
self.local_id.ok_or(RustADBError::ADBRequestFailed(
236-
"connection not opened, no local_id".into(),
237-
))
238-
}
249+
let session = ADBSession {
250+
local_id,
251+
remote_id: response.header().arg0(),
252+
};
239253

240-
pub(crate) fn get_remote_id(&self) -> Result<u32> {
241-
self.remote_id.ok_or(RustADBError::ADBRequestFailed(
242-
"connection not opened, no remote_id".into(),
243-
))
254+
Ok(session)
244255
}
245256
}

adb_client/src/device/commands/framebuffer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ use crate::{
1111

1212
impl<T: ADBMessageTransport> ADBMessageDevice<T> {
1313
pub(crate) fn framebuffer_inner(&mut self) -> Result<ImageBuffer<Rgba<u8>, Vec<u8>>> {
14-
self.open_session(b"framebuffer:\0")?;
14+
let session = self.open_session(b"framebuffer:\0")?;
1515

16-
let response = self.recv_and_reply_okay()?;
16+
let response = self.recv_and_reply_okay(session)?;
1717

1818
let mut payload_cursor = Cursor::new(response.payload());
1919

@@ -36,7 +36,7 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
3636
break;
3737
}
3838

39-
let response = self.recv_and_reply_okay()?;
39+
let response = self.recv_and_reply_okay(session)?;
4040

4141
framebuffer_data.extend_from_slice(&response.into_payload());
4242

@@ -69,7 +69,7 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
6969
break;
7070
}
7171

72-
let response = self.recv_and_reply_okay()?;
72+
let response = self.recv_and_reply_okay(session)?;
7373

7474
framebuffer_data.extend_from_slice(&response.into_payload());
7575

adb_client/src/device/commands/install.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
1414

1515
let file_size = apk_file.metadata()?.len();
1616

17-
self.open_session(format!("exec:cmd package 'install' -S {}\0", file_size).as_bytes())?;
17+
let session =
18+
self.open_session(format!("exec:cmd package 'install' -S {}\0", file_size).as_bytes())?;
1819

1920
let transport = self.get_transport().clone();
2021

21-
let mut writer = MessageWriter::new(transport, self.get_local_id()?, self.get_remote_id()?);
22+
let mut writer = MessageWriter::new(transport, session.local_id, session.remote_id);
2223

2324
std::io::copy(&mut apk_file, &mut writer)?;
2425

adb_client/src/device/commands/pull.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,24 @@ use crate::{
1010

1111
impl<T: ADBMessageTransport> ADBMessageDevice<T> {
1212
pub(crate) fn pull<A: AsRef<str>, W: Write>(&mut self, source: A, output: W) -> Result<()> {
13-
self.begin_synchronization()?;
13+
let session = self.begin_synchronization()?;
1414
let source = source.as_ref();
1515

16-
let adb_stat_response = self.stat_with_explicit_ids(source)?;
16+
let adb_stat_response = self.stat_with_explicit_ids(session, source)?;
1717

1818
if adb_stat_response.file_perm == 0 {
1919
return Err(RustADBError::UnknownResponseType(
2020
"mode is 0: source file does not exist".to_string(),
2121
));
2222
}
2323

24-
let local_id = self.get_local_id()?;
25-
let remote_id = self.get_remote_id()?;
26-
2724
self.get_transport_mut().write_message_with_timeout(
28-
ADBTransportMessage::new(MessageCommand::Okay, local_id, remote_id, &[]),
25+
ADBTransportMessage::new(
26+
MessageCommand::Okay,
27+
session.local_id,
28+
session.remote_id,
29+
&[],
30+
),
2931
std::time::Duration::from_secs(4),
3032
)?;
3133

@@ -34,19 +36,19 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
3436
bincode::serialize(&recv_buffer).map_err(|_e| RustADBError::ConversionError)?;
3537
self.send_and_expect_okay(ADBTransportMessage::new(
3638
MessageCommand::Write,
37-
self.get_local_id()?,
38-
self.get_remote_id()?,
39+
session.local_id,
40+
session.remote_id,
3941
&recv_buffer,
4042
))?;
4143
self.send_and_expect_okay(ADBTransportMessage::new(
4244
MessageCommand::Write,
43-
self.get_local_id()?,
44-
self.get_remote_id()?,
45+
session.local_id,
46+
session.remote_id,
4547
source.as_bytes(),
4648
))?;
4749

48-
self.recv_file(output)?;
49-
self.end_transaction()?;
50+
self.recv_file(session, output)?;
51+
self.end_transaction(session)?;
5052
Ok(())
5153
}
5254
}

adb_client/src/device/commands/push.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010

1111
impl<T: ADBMessageTransport> ADBMessageDevice<T> {
1212
pub(crate) fn push<R: Read, A: AsRef<str>>(&mut self, stream: R, path: A) -> Result<()> {
13-
self.begin_synchronization()?;
13+
let session = self.begin_synchronization()?;
1414

1515
let path_header = format!("{},0777", path.as_ref());
1616

@@ -21,14 +21,13 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
2121

2222
self.send_and_expect_okay(ADBTransportMessage::new(
2323
MessageCommand::Write,
24-
self.get_local_id()?,
25-
self.get_remote_id()?,
24+
session.local_id,
25+
session.remote_id,
2626
&send_buffer,
2727
))?;
2828

29-
self.push_file(self.get_local_id()?, self.get_remote_id()?, stream)?;
30-
31-
self.end_transaction()?;
29+
self.push_file(session, stream)?;
30+
self.end_transaction(session)?;
3231

3332
Ok(())
3433
}

adb_client/src/device/commands/shell.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,7 @@ use crate::{
1010
impl<T: ADBMessageTransport> ADBMessageDevice<T> {
1111
/// Runs 'command' in a shell on the device, and write its output and error streams into output.
1212
pub(crate) fn shell_command(&mut self, command: &[&str], output: &mut dyn Write) -> Result<()> {
13-
let response = self.open_session(format!("shell:{}\0", command.join(" "),).as_bytes())?;
14-
15-
if response.header().command() != MessageCommand::Okay {
16-
return Err(RustADBError::ADBRequestFailed(format!(
17-
"wrong command {}",
18-
response.header().command()
19-
)));
20-
}
13+
let session = self.open_session(format!("shell:{}\0", command.join(" "),).as_bytes())?;
2114

2215
loop {
2316
let response = self.get_transport_mut().read_message()?;
@@ -38,21 +31,22 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
3831
mut reader: &mut dyn Read,
3932
mut writer: Box<(dyn Write + Send)>,
4033
) -> Result<()> {
41-
self.open_session(b"shell:\0")?;
34+
let session = self.open_session(b"shell:\0")?;
4235

4336
let mut transport = self.get_transport().clone();
4437

45-
let local_id = self.get_local_id()?;
46-
let remote_id = self.get_remote_id()?;
47-
4838
// Reading thread, reads response from adbd
4939
std::thread::spawn(move || -> Result<()> {
5040
loop {
5141
let message = transport.read_message()?;
5242

5343
// Acknowledge for more data
54-
let response =
55-
ADBTransportMessage::new(MessageCommand::Okay, local_id, remote_id, &[]);
44+
let response = ADBTransportMessage::new(
45+
MessageCommand::Okay,
46+
session.local_id,
47+
session.remote_id,
48+
&[],
49+
);
5650
transport.write_message(response)?;
5751

5852
match message.header().command() {
@@ -67,7 +61,8 @@ impl<T: ADBMessageTransport> ADBMessageDevice<T> {
6761
});
6862

6963
let transport = self.get_transport().clone();
70-
let mut shell_writer = ShellMessageWriter::new(transport, local_id, remote_id);
64+
let mut shell_writer =
65+
ShellMessageWriter::new(transport, session.local_id, session.remote_id);
7166

7267
// Read from given reader (that could be stdin e.g), and write content to device adbd
7368
if let Err(e) = std::io::copy(&mut reader, &mut shell_writer) {

0 commit comments

Comments
 (0)