Skip to content

Commit 52bc6b6

Browse files
authored
fs: reduce blocking ops in fs::read_dir (tokio-rs#5653)
1 parent f478ff4 commit 52bc6b6

File tree

1 file changed

+22
-25
lines changed

1 file changed

+22
-25
lines changed

tokio/src/fs/read_dir.rs

+22-25
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ const CHUNK_SIZE: usize = 32;
3333
pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
3434
let path = path.as_ref().to_owned();
3535
asyncify(|| -> io::Result<ReadDir> {
36-
let mut std = std::fs::read_dir(path)?.fuse();
36+
let mut std = std::fs::read_dir(path)?;
3737
let mut buf = VecDeque::with_capacity(CHUNK_SIZE);
38-
ReadDir::next_chunk(&mut buf, &mut std);
38+
let remain = ReadDir::next_chunk(&mut buf, &mut std);
3939

40-
Ok(ReadDir(State::Idle(Some((buf, std)))))
40+
Ok(ReadDir(State::Idle(Some((buf, std, remain)))))
4141
})
4242
.await
4343
}
@@ -64,12 +64,10 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
6464
#[must_use = "streams do nothing unless polled"]
6565
pub struct ReadDir(State);
6666

67-
type StdReadDir = std::iter::Fuse<std::fs::ReadDir>;
68-
6967
#[derive(Debug)]
7068
enum State {
71-
Idle(Option<(VecDeque<io::Result<DirEntry>>, StdReadDir)>),
72-
Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, StdReadDir)>),
69+
Idle(Option<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>),
70+
Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>),
7371
}
7472

7573
impl ReadDir {
@@ -105,38 +103,35 @@ impl ReadDir {
105103
loop {
106104
match self.0 {
107105
State::Idle(ref mut data) => {
108-
let (buf, _) = data.as_mut().unwrap();
106+
let (buf, _, ref remain) = data.as_mut().unwrap();
109107

110108
if let Some(ent) = buf.pop_front() {
111109
return Poll::Ready(ent.map(Some));
112-
};
110+
} else if !remain {
111+
return Poll::Ready(Ok(None));
112+
}
113113

114-
let (mut buf, mut std) = data.take().unwrap();
114+
let (mut buf, mut std, _) = data.take().unwrap();
115115

116116
self.0 = State::Pending(spawn_blocking(move || {
117-
ReadDir::next_chunk(&mut buf, &mut std);
118-
(buf, std)
117+
let remain = ReadDir::next_chunk(&mut buf, &mut std);
118+
(buf, std, remain)
119119
}));
120120
}
121121
State::Pending(ref mut rx) => {
122-
let (mut buf, std) = ready!(Pin::new(rx).poll(cx))?;
123-
124-
let ret = match buf.pop_front() {
125-
Some(Ok(x)) => Ok(Some(x)),
126-
Some(Err(e)) => Err(e),
127-
None => Ok(None),
128-
};
129-
130-
self.0 = State::Idle(Some((buf, std)));
131-
132-
return Poll::Ready(ret);
122+
self.0 = State::Idle(Some(ready!(Pin::new(rx).poll(cx))?));
133123
}
134124
}
135125
}
136126
}
137127

138-
fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut StdReadDir) {
139-
for ret in std.by_ref().take(CHUNK_SIZE) {
128+
fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut std::fs::ReadDir) -> bool {
129+
for _ in 0..CHUNK_SIZE {
130+
let ret = match std.next() {
131+
Some(ret) => ret,
132+
None => return false,
133+
};
134+
140135
let success = ret.is_ok();
141136

142137
buf.push_back(ret.map(|std| DirEntry {
@@ -154,6 +149,8 @@ impl ReadDir {
154149
break;
155150
}
156151
}
152+
153+
true
157154
}
158155
}
159156

0 commit comments

Comments
 (0)