Skip to content

Commit 7063546

Browse files
committed
refactor!: use try_next() more
I have looked through the code to make sure errors are not ignored, especially in loops. Ignoring errors in loops when reading from streams may result in infinite loop reading the same error over and over again. No bugs found, but I refactored reading from streams to use `try_next()` more to bubble up the errors with `?` as soon as possible. This is a breaking change since `read_response()` is resultified to return `Result<Option<_>>` instead of `Option<Result<_>>`. `read_response()` is a public interface that is used by library users to read the banner.
1 parent 155c375 commit 7063546

File tree

6 files changed

+78
-95
lines changed

6 files changed

+78
-95
lines changed

examples/src/bin/integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async fn session(user: &str) -> Result<Session<async_native_tls::TlsStream<TcpSt
3131
let mut client = async_imap::Client::new(tls_stream);
3232
let _greeting = client
3333
.read_response()
34-
.await
34+
.await?
3535
.context("unexpected end of stream, expected greeting")?;
3636

3737
let session = client
@@ -48,7 +48,7 @@ async fn _connect_insecure_then_secure() -> Result<()> {
4848
let mut client = async_imap::Client::new(tcp_stream);
4949
let _greeting = client
5050
.read_response()
51-
.await
51+
.await?
5252
.context("unexpected end of stream, expected greeting")?;
5353
client.run_command_and_check_ok("STARTTLS", None).await?;
5454
let stream = client.into_inner();

src/client.rs

Lines changed: 59 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use async_std::io::{Read, Write, WriteExt};
1010
use base64::Engine as _;
1111
use extensions::id::{format_identification, parse_id};
1212
use extensions::quota::parse_get_quota_root;
13-
use futures::{io, Stream, StreamExt};
13+
use futures::{io, Stream, TryStreamExt};
1414
use imap_proto::{Metadata, RequestId, Response};
1515
#[cfg(feature = "runtime-tokio")]
1616
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt};
@@ -122,7 +122,7 @@ macro_rules! ok_or_unauth_client_err {
122122
($r:expr, $self:expr) => {
123123
match $r {
124124
Ok(o) => o,
125-
Err(e) => return Err((e, $self)),
125+
Err(e) => return Err((e.into(), $self)),
126126
}
127127
};
128128
}
@@ -262,42 +262,37 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Client<T> {
262262
// explicit match blocks neccessary to convert error to tuple and not bind self too
263263
// early (see also comment on `login`)
264264
loop {
265-
if let Some(res) = self.read_response().await {
266-
let res = ok_or_unauth_client_err!(res.map_err(Into::into), self);
267-
match res.parsed() {
268-
Response::Continue { information, .. } => {
269-
let challenge = if let Some(text) = information {
270-
ok_or_unauth_client_err!(
271-
base64::engine::general_purpose::STANDARD
272-
.decode(text.as_ref())
273-
.map_err(|e| Error::Parse(ParseError::Authentication(
274-
(*text).to_string(),
275-
Some(e)
276-
))),
277-
self
278-
)
279-
} else {
280-
Vec::new()
281-
};
282-
let raw_response = &mut authenticator.process(&challenge);
283-
let auth_response =
284-
base64::engine::general_purpose::STANDARD.encode(raw_response);
285-
286-
ok_or_unauth_client_err!(
287-
self.conn.run_command_untagged(&auth_response).await,
288-
self
289-
);
290-
}
291-
_ => {
265+
let Some(res) = ok_or_unauth_client_err!(self.read_response().await, self) else {
266+
return Err((Error::ConnectionLost, self));
267+
};
268+
match res.parsed() {
269+
Response::Continue { information, .. } => {
270+
let challenge = if let Some(text) = information {
292271
ok_or_unauth_client_err!(
293-
self.check_done_ok_from(&id, None, res).await,
272+
base64::engine::general_purpose::STANDARD
273+
.decode(text.as_ref())
274+
.map_err(|e| Error::Parse(ParseError::Authentication(
275+
(*text).to_string(),
276+
Some(e)
277+
))),
294278
self
295-
);
296-
return Ok(Session::new(self.conn));
297-
}
279+
)
280+
} else {
281+
Vec::new()
282+
};
283+
let raw_response = &mut authenticator.process(&challenge);
284+
let auth_response =
285+
base64::engine::general_purpose::STANDARD.encode(raw_response);
286+
287+
ok_or_unauth_client_err!(
288+
self.conn.run_command_untagged(&auth_response).await,
289+
self
290+
);
291+
}
292+
_ => {
293+
ok_or_unauth_client_err!(self.check_done_ok_from(&id, None, res).await, self);
294+
return Ok(Session::new(self.conn));
298295
}
299-
} else {
300-
return Err((Error::ConnectionLost, self));
301296
}
302297
}
303298
}
@@ -975,12 +970,13 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
975970
mailbox_pattern.unwrap_or("\"\"")
976971
))
977972
.await?;
978-
979-
Ok(parse_names(
973+
let names = parse_names(
980974
&mut self.conn.stream,
981975
self.unsolicited_responses_tx.clone(),
982976
id,
983-
))
977+
);
978+
979+
Ok(names)
984980
}
985981

986982
/// The [`LSUB` command](https://tools.ietf.org/html/rfc3501#section-6.3.9) returns a subset of
@@ -1136,23 +1132,20 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
11361132
))
11371133
.await?;
11381134

1139-
match self.read_response().await {
1140-
Some(Ok(res)) => {
1141-
if let Response::Continue { .. } = res.parsed() {
1142-
self.stream.as_mut().write_all(content).await?;
1143-
self.stream.as_mut().write_all(b"\r\n").await?;
1144-
self.stream.flush().await?;
1145-
self.conn
1146-
.check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
1147-
.await?;
1148-
Ok(())
1149-
} else {
1150-
Err(Error::Append)
1151-
}
1152-
}
1153-
Some(Err(err)) => Err(err.into()),
1154-
_ => Err(Error::Append),
1155-
}
1135+
let Some(res) = self.read_response().await? else {
1136+
return Err(Error::Append);
1137+
};
1138+
let Response::Continue { .. } = res.parsed() else {
1139+
return Err(Error::Append);
1140+
};
1141+
1142+
self.stream.as_mut().write_all(content).await?;
1143+
self.stream.as_mut().write_all(b"\r\n").await?;
1144+
self.stream.flush().await?;
1145+
self.conn
1146+
.check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
1147+
.await?;
1148+
Ok(())
11561149
}
11571150

11581151
/// The [`SEARCH` command](https://tools.ietf.org/html/rfc3501#section-6.4.4) searches the
@@ -1352,7 +1345,7 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
13521345
}
13531346

13541347
/// Read the next response on the connection.
1355-
pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
1348+
pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
13561349
self.conn.read_response().await
13571350
}
13581351
}
@@ -1377,8 +1370,8 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
13771370
}
13781371

13791372
/// Read the next response on the connection.
1380-
pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
1381-
self.stream.next().await
1373+
pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
1374+
self.stream.try_next().await
13821375
}
13831376

13841377
pub(crate) async fn run_command_untagged(&mut self, command: &str) -> Result<()> {
@@ -1415,8 +1408,8 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
14151408
id: &RequestId,
14161409
unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
14171410
) -> Result<()> {
1418-
if let Some(first_res) = self.stream.next().await {
1419-
self.check_done_ok_from(id, unsolicited, first_res?).await
1411+
if let Some(first_res) = self.stream.try_next().await? {
1412+
self.check_done_ok_from(id, unsolicited, first_res).await
14201413
} else {
14211414
Err(Error::ConnectionLost)
14221415
}
@@ -1447,11 +1440,10 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
14471440
handle_unilateral(response, unsolicited);
14481441
}
14491442

1450-
if let Some(res) = self.stream.next().await {
1451-
response = res?;
1452-
} else {
1443+
let Some(res) = self.stream.try_next().await? else {
14531444
return Err(Error::ConnectionLost);
1454-
}
1445+
};
1446+
response = res;
14551447
}
14561448
}
14571449

@@ -1495,6 +1487,7 @@ mod tests {
14951487
use std::future::Future;
14961488

14971489
use async_std::sync::{Arc, Mutex};
1490+
use futures::StreamExt;
14981491
use imap_proto::Status;
14991492

15001493
macro_rules! mock_client {
@@ -1555,7 +1548,7 @@ mod tests {
15551548
async fn readline_eof() {
15561549
let mock_stream = MockStream::default().with_eof();
15571550
let mut client = mock_client!(mock_stream);
1558-
let res = client.read_response().await;
1551+
let res = client.read_response().await.unwrap();
15591552
assert!(res.is_none());
15601553
}
15611554

@@ -2117,7 +2110,7 @@ mod tests {
21172110
.unwrap();
21182111

21192112
// Unexpected EOF.
2120-
let err = fetch_result.next().await.unwrap().unwrap_err();
2113+
let err = fetch_result.try_next().await.unwrap_err();
21212114
let Error::Io(io_err) = err else {
21222115
panic!("Unexpected error type: {err}")
21232116
};

src/extensions/id.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,9 @@ pub(crate) async fn parse_id<T: Stream<Item = io::Result<ResponseData>> + Unpin>
4343
let mut id = None;
4444
while let Some(resp) = stream
4545
.take_while(|res| filter(res, &command_tag))
46-
.next()
47-
.await
46+
.try_next()
47+
.await?
4848
{
49-
let resp = resp?;
5049
match resp.parsed() {
5150
Response::Id(res) => {
5251
id = res.as_ref().map(|m| {

src/extensions/idle.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
182182
pub async fn init(&mut self) -> Result<()> {
183183
let id = self.session.run_command("IDLE").await?;
184184
self.id = Some(id);
185-
while let Some(res) = self.session.stream.next().await {
186-
let res = res?;
185+
while let Some(res) = self.session.stream.try_next().await? {
187186
match res.parsed() {
188187
Response::Continue { .. } => {
189188
return Ok(());

src/extensions/quota.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ pub(crate) async fn parse_get_quota<T: Stream<Item = io::Result<ResponseData>> +
2323
let mut quota = None;
2424
while let Some(resp) = stream
2525
.take_while(|res| filter(res, &command_tag))
26-
.next()
27-
.await
26+
.try_next()
27+
.await?
2828
{
29-
let resp = resp?;
3029
match resp.parsed() {
3130
Response::Quota(q) => quota = Some(q.clone().into()),
3231
_ => {
@@ -53,10 +52,9 @@ pub(crate) async fn parse_get_quota_root<T: Stream<Item = io::Result<ResponseDat
5352

5453
while let Some(resp) = stream
5554
.take_while(|res| filter(res, &command_tag))
56-
.next()
57-
.await
55+
.try_next()
56+
.await?
5857
{
59-
let resp = resp?;
6058
match resp.parsed() {
6159
Response::QuotaRoot(qr) => {
6260
roots.push(qr.clone().into());

src/parse.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ pub(crate) async fn parse_status<T: Stream<Item = io::Result<ResponseData>> + Un
9999
) -> Result<Mailbox> {
100100
let mut mbox = Mailbox::default();
101101

102-
while let Some(resp) = stream.next().await {
103-
let resp = resp?;
102+
while let Some(resp) = stream.try_next().await? {
104103
match resp.parsed() {
105104
Response::Done {
106105
tag,
@@ -192,10 +191,9 @@ pub(crate) async fn parse_capabilities<T: Stream<Item = io::Result<ResponseData>
192191

193192
while let Some(resp) = stream
194193
.take_while(|res| filter(res, &command_tag))
195-
.next()
196-
.await
194+
.try_next()
195+
.await?
197196
{
198-
let resp = resp?;
199197
match resp.parsed() {
200198
Response::Capabilities(cs) => {
201199
for c in cs {
@@ -218,10 +216,9 @@ pub(crate) async fn parse_noop<T: Stream<Item = io::Result<ResponseData>> + Unpi
218216
) -> Result<()> {
219217
while let Some(resp) = stream
220218
.take_while(|res| filter(res, &command_tag))
221-
.next()
222-
.await
219+
.try_next()
220+
.await?
223221
{
224-
let resp = resp?;
225222
handle_unilateral(resp, unsolicited.clone());
226223
}
227224

@@ -235,8 +232,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U
235232
) -> Result<Mailbox> {
236233
let mut mailbox = Mailbox::default();
237234

238-
while let Some(resp) = stream.next().await {
239-
let resp = resp?;
235+
while let Some(resp) = stream.try_next().await? {
240236
match resp.parsed() {
241237
Response::Done {
242238
tag,
@@ -345,10 +341,9 @@ pub(crate) async fn parse_ids<T: Stream<Item = io::Result<ResponseData>> + Unpin
345341

346342
while let Some(resp) = stream
347343
.take_while(|res| filter(res, &command_tag))
348-
.next()
349-
.await
344+
.try_next()
345+
.await?
350346
{
351-
let resp = resp?;
352347
match resp.parsed() {
353348
Response::MailboxData(MailboxDatum::Search(cs)) => {
354349
for c in cs {
@@ -374,10 +369,9 @@ pub(crate) async fn parse_metadata<T: Stream<Item = io::Result<ResponseData>> +
374369
let mut res_values = Vec::new();
375370
while let Some(resp) = stream
376371
.take_while(|res| filter(res, &command_tag))
377-
.next()
378-
.await
372+
.try_next()
373+
.await?
379374
{
380-
let resp = resp?;
381375
match resp.parsed() {
382376
// METADATA Response with Values
383377
// <https://datatracker.ietf.org/doc/html/rfc5464.html#section-4.4.1>

0 commit comments

Comments
 (0)