Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions glommio/src/iou/sqe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
ops::{Deref, DerefMut},
os::unix::io::RawFd,
ptr,
slice,
};

use super::registrar::{UringFd, UringReadBuf, UringWriteBuf};
Expand Down Expand Up @@ -606,13 +605,23 @@ bitflags::bitflags! {

/// A sequence of [`SQE`]s from the [`SubmissionQueue`][crate::SubmissionQueue].
pub struct SQEs<'ring> {
sqes: slice::IterMut<'ring, uring_sys::io_uring_sqe>,
sq: &'ring mut uring_sys::io_uring_sq,
first: u32,
count: u32,
consumed: u32,
}

impl<'ring> SQEs<'ring> {
pub(crate) fn new(slice: &'ring mut [uring_sys::io_uring_sqe]) -> SQEs<'ring> {
pub(crate) fn new(
sq: &'ring mut uring_sys::io_uring_sq,
first: u32,
count: u32,
) -> SQEs<'ring> {
SQEs {
sqes: slice.iter_mut(),
sq,
first,
count,
consumed: 0,
}
}

Expand Down Expand Up @@ -646,14 +655,23 @@ impl<'ring> SQEs<'ring> {

/// Remaining [`SQE`]s that can be modified.
pub fn remaining(&self) -> u32 {
self.sqes.len() as u32
(self.count - self.consumed) as u32
}

fn consume(&mut self) -> Option<SQE<'ring>> {
self.sqes.next().map(|sqe| {
unsafe { uring_sys::io_uring_prep_nop(sqe) }
SQE { sqe }
})
if self.consumed < self.count {
unsafe {
let sqe = self
.sq
.sqes
.offset(((self.first + self.consumed) & *self.sq.kring_mask) as isize);
uring_sys::io_uring_prep_nop(sqe);
self.consumed += 1;
Some(SQE { sqe: &mut *sqe })
}
} else {
None
}
}
}

Expand Down
10 changes: 3 additions & 7 deletions glommio/src/iou/submission_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
io,
marker::PhantomData,
ptr::NonNull,
slice,
sync::atomic::{self, Ordering},
time::Duration,
};
Expand Down Expand Up @@ -119,19 +118,16 @@ pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option<S
}
}

pub(crate) unsafe fn prepare_sqes<'a>(
sq: &mut uring_sys::io_uring_sq,
count: u32,
) -> Option<SQEs<'a>> {
pub(crate) unsafe fn prepare_sqes(sq: &mut uring_sys::io_uring_sq, count: u32) -> Option<SQEs<'_>> {
atomic::fence(Ordering::Acquire);

let head: u32 = *sq.khead;
let next: u32 = sq.sqe_tail + count;

if next - head <= *sq.kring_entries {
let sqe = sq.sqes.offset((sq.sqe_tail & *sq.kring_mask) as isize);
let first = sq.sqe_tail;
sq.sqe_tail = next;
Some(SQEs::new(slice::from_raw_parts_mut(sqe, count as usize)))
Some(SQEs::new(sq, first, count))
} else {
None
}
Expand Down
8 changes: 1 addition & 7 deletions glommio/src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,6 @@ pub(crate) enum PollableStatus {
NonPollable(DirectIO),
}

#[derive(Debug, Copy, Clone)]
pub(crate) enum LinkStatus {
Freestanding,
Linked,
}

#[derive(Debug)]
pub(crate) enum SourceType {
Write(PollableStatus, IOBuffer),
Expand All @@ -373,7 +367,7 @@ pub(crate) enum SourceType {
FdataSync,
Fallocate,
Close,
LinkRings(LinkStatus),
LinkRings,
Statx(CString, Box<RefCell<libc::statx>>),
Timeout(TimeSpec64),
Connect(SockAddr),
Expand Down
69 changes: 26 additions & 43 deletions glommio/src/sys/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::{
DirectIO,
IOBuffer,
InnerSource,
LinkStatus,
PollableStatus,
Source,
SourceType,
Expand Down Expand Up @@ -790,10 +789,6 @@ impl Source {
self.inner.source_type.borrow_mut()
}

pub(crate) fn update_source_type(&self, source_type: SourceType) -> SourceType {
self.inner.update_source_type(source_type)
}

pub(crate) fn extract_source_type(&self) -> SourceType {
self.inner.update_source_type(SourceType::Invalid)
}
Expand Down Expand Up @@ -926,30 +921,21 @@ impl SleepableRing {
}

fn sleep(&mut self, link: &mut Source, eventfd_src: &Source) -> io::Result<usize> {
let is_freestanding = match &*link.source_type() {
SourceType::LinkRings(LinkStatus::Linked) => false, // nothing to do
SourceType::LinkRings(LinkStatus::Freestanding) => true,
_ => panic!("Unexpected source type when linking rings"),
};

if is_freestanding {
if let Some(mut sqe) = self.ring.prepare_sqe() {
self.waiting_submission += 1;
link.update_source_type(SourceType::LinkRings(LinkStatus::Linked));
if let Some(mut sqe) = self.ring.prepare_sqe() {
self.waiting_submission += 1;

let op = UringDescriptor {
fd: link.raw(),
flags: SubmissionFlags::empty(),
user_data: to_user_data(add_source(link, self.submission_queue.clone())),
args: UringOpDescriptor::PollAdd(common_flags() | read_flags()),
};
fill_sqe(&mut sqe, &op, DmaBuffer::new);
} else {
// Can't link rings because we ran out of CQEs. Just can't sleep.
// Submit what we have, once we're out of here we'll consume them
// and at some point will be able to sleep again.
return self.ring.submit_sqes().map(|x| x as usize);
}
let op = UringDescriptor {
fd: link.raw(),
flags: SubmissionFlags::empty(),
user_data: to_user_data(add_source(link, self.submission_queue.clone())),
args: UringOpDescriptor::PollAdd(common_flags() | read_flags()),
};
fill_sqe(&mut sqe, &op, DmaBuffer::new);
} else {
// Can't link rings because we ran out of CQEs. Just can't sleep.
// Submit what we have, once we're out of here we'll consume them
// and at some point will be able to sleep again.
return self.ring.submit_sqes().map(|x| x as usize);
}

let res = eventfd_src.take_result();
Expand Down Expand Up @@ -1002,13 +988,7 @@ impl UringCommon for SleepableRing {
process_one_event(
self.ring.peek_for_cqe(),
|source| match &mut *source.source_type.borrow_mut() {
SourceType::LinkRings(status @ LinkStatus::Linked) => {
*status = LinkStatus::Freestanding;
Some(())
}
SourceType::LinkRings(LinkStatus::Freestanding) => {
panic!("Impossible to have an event firing like this");
}
SourceType::LinkRings => Some(()),
SourceType::Timeout(_) => Some(()),
_ => None,
},
Expand Down Expand Up @@ -1062,9 +1042,10 @@ pub(crate) struct Reactor {
latency_ring: RefCell<SleepableRing>,
poll_ring: RefCell<PollRing>,

link_rings_src: RefCell<Source>,
timeout_src: Cell<Option<Source>>,

link_fd: RawFd,

// This keeps the eventfd alive. Drop will close it when we're done
notifier: Arc<sys::SleepNotifier>,
// This is the source used to handle the notifications into the ring
Expand Down Expand Up @@ -1158,11 +1139,6 @@ impl Reactor {

let latency_ring = SleepableRing::new(128, "latency", allocator.clone())?;
let link_fd = latency_ring.ring_fd();
let link_rings_src = Source::new(
IoRequirements::default(),
link_fd,
SourceType::LinkRings(LinkStatus::Freestanding),
);

let eventfd_src = Source::new(
IoRequirements::default(),
Expand All @@ -1175,8 +1151,8 @@ impl Reactor {
main_ring: RefCell::new(main_ring),
latency_ring: RefCell::new(latency_ring),
poll_ring: RefCell::new(poll_ring),
link_rings_src: RefCell::new(link_rings_src),
timeout_src: Cell::new(None),
link_fd,
notifier,
eventfd_src,
})
Expand Down Expand Up @@ -1344,7 +1320,11 @@ impl Reactor {
ring: &mut SleepableRing,
eventfd_src: &Source,
) -> io::Result<()> {
let mut link_rings = self.link_rings_src.borrow_mut();
let mut link_rings = Source::new(
IoRequirements::default(),
self.link_fd,
SourceType::LinkRings,
);
ring.sleep(&mut link_rings, eventfd_src)
.or_else(Self::busy_ok)?;
Ok(())
Expand Down Expand Up @@ -1467,6 +1447,9 @@ impl Reactor {
.expect("some error");
// woke up, so no need to notify us anymore.
self.notifier.wake_up();
// may have new cancellations related to the link ring fd.
flush_cancellations!(into wakers; main_ring);
flush_rings!(main_ring)?;
consume_rings!(into wakers; lat_ring, poll_ring, main_ring);
}
}
Expand Down