diff --git a/src/completion_queue.rs b/src/completion_queue.rs
index 867fb1a..aacf7ed 100644
--- a/src/completion_queue.rs
+++ b/src/completion_queue.rs
@@ -2,9 +2,9 @@
 use std::fmt;
 use std::io;
 use std::marker::PhantomData;
 use std::mem::MaybeUninit;
-use std::ptr::{self, NonNull};
+use std::ptr::NonNull;
 
-use super::{IoUring, CQE, CQEs, CQEsBlocking, resultify};
+use super::{resultify, CQEs, CQEsBlocking, IoUring, CQE};
 
 /// The queue of completed IO events.
 ///
@@ -61,12 +61,10 @@ impl<'ring> CompletionQueue<'ring> {
         unsafe {
             let mut cqe = MaybeUninit::uninit();
 
-            resultify(uring_sys::io_uring_wait_cqes(
+            resultify(uring_sys::io_uring_wait_cqe_nr(
                 self.ring.as_ptr(),
                 cqe.as_mut_ptr(),
                 count as _,
-                ptr::null(),
-                ptr::null(),
             ))?;
 
             Ok(&mut *cqe.assume_init())
@@ -89,14 +87,17 @@ impl<'ring> CompletionQueue<'ring> {
         CQEsBlocking::new(self.ring, wait_for)
     }
 
+    /// Returns how many descriptors are ready for processing on the completion queue.
     pub fn ready(&self) -> u32 {
         unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) }
     }
 
+    /// Returns true if the eventfd notification is currently enabled.
    pub fn eventfd_enabled(&self) -> bool {
         unsafe { uring_sys::io_uring_cq_eventfd_enabled(self.ring.as_ptr()) }
     }
 
+    /// Toggle eventfd notification on or off, if an eventfd is registered with the ring.
     pub fn eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> {
         resultify(unsafe { uring_sys::io_uring_cq_eventfd_toggle(self.ring.as_ptr(), enabled) })?;
         Ok(())
@@ -106,9 +107,11 @@ impl<'ring> CompletionQueue<'ring> {
 impl fmt::Debug for CompletionQueue<'_> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         let fd = unsafe { self.ring.as_ref().ring_fd };
-        f.debug_struct(std::any::type_name::<Self>()).field("fd", &fd).finish()
+        f.debug_struct(std::any::type_name::<Self>())
+            .field("fd", &fd)
+            .finish()
     }
 }
 
-unsafe impl<'ring> Send for CompletionQueue<'ring> { }
-unsafe impl<'ring> Sync for CompletionQueue<'ring> { }
+unsafe impl<'ring> Send for CompletionQueue<'ring> {}
+unsafe impl<'ring> Sync for CompletionQueue<'ring> {}
diff --git a/src/cqe.rs b/src/cqe.rs
index bd97257..25c9a38 100644
--- a/src/cqe.rs
+++ b/src/cqe.rs
@@ -1,9 +1,9 @@
 use std::io;
 use std::marker::PhantomData;
 use std::mem::MaybeUninit;
-use std::ptr::{self, NonNull};
+use std::ptr::NonNull;
 
-use super::{IoUring, resultify};
+use super::{resultify, IoUring};
 
 /// A completed IO event.
 #[derive(Debug)]
@@ -24,11 +24,16 @@ impl CQE {
 
     pub fn from_raw_parts(user_data: u64, res: i32, flags: CompletionFlags) -> CQE {
         CQE {
-            user_data, res, flags,
+            user_data,
+            res,
+            flags,
         }
     }
 
-    pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, cqe: &mut uring_sys::io_uring_cqe) -> CQE {
+    pub(crate) fn new(
+        ring: NonNull<uring_sys::io_uring>,
+        cqe: &mut uring_sys::io_uring_cqe,
+    ) -> CQE {
         let user_data = cqe.user_data;
         let res = cqe.res;
         let flags = CompletionFlags::from_bits_truncate(cqe.flags);
@@ -61,8 +66,8 @@ impl CQE {
     }
 }
 
-unsafe impl Send for CQE { }
-unsafe impl Sync for CQE { }
+unsafe impl Send for CQE {}
+unsafe impl Sync for CQE {}
 
 /// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue).
 ///
@@ -75,7 +80,11 @@ pub struct CQEs<'a> {
 
 impl<'a> CQEs<'a> {
     pub(crate) fn new(ring: NonNull<uring_sys::io_uring>) -> CQEs<'a> {
-        CQEs { ring, ready: 0, marker: PhantomData }
+        CQEs {
+            ring,
+            ready: 0,
+            marker: PhantomData,
+        }
     }
 
     #[inline(always)]
@@ -114,7 +123,6 @@ impl Iterator for CQEs<'_> {
     }
 }
 
-
 /// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue).
 ///
 /// This iterator will never be exhausted; if there are no `CQE`s ready, it will block until there
@@ -128,7 +136,12 @@ pub struct CQEsBlocking<'a> {
 
 impl<'a> CQEsBlocking<'a> {
     pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, wait_for: u32) -> CQEsBlocking<'a> {
-        CQEsBlocking { ring, ready: 0, wait_for, marker: PhantomData }
+        CQEsBlocking {
+            ring,
+            ready: 0,
+            wait_for,
+            marker: PhantomData,
+        }
     }
 
     #[inline(always)]
@@ -155,12 +168,10 @@ impl<'a> CQEsBlocking<'a> {
         unsafe {
             let mut cqe = MaybeUninit::uninit();
 
-            resultify(uring_sys::io_uring_wait_cqes(
+            resultify(uring_sys::io_uring_wait_cqe_nr(
                 self.ring.as_ptr(),
                 cqe.as_mut_ptr(),
                 self.wait_for as _,
-                ptr::null(),
-                ptr::null(),
             ))?;
 
             Ok(&mut *cqe.assume_init())
@@ -176,7 +187,7 @@ impl Iterator for CQEsBlocking<'_> {
             self.ready = self.ready();
             if self.ready == 0 {
                 let ring = self.ring;
-                return Some(self.wait().map(|cqe| CQE::new(ring, cqe)))
+                return Some(self.wait().map(|cqe| CQE::new(ring, cqe)));
             }
         }
 
diff --git a/src/lib.rs b/src/lib.rs
index c9638d5..2793ca3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -60,16 +60,16 @@ use std::ptr::{self, NonNull};
 use std::time::Duration;
 
 #[doc(inline)]
-pub use sqe::{SQE, SQEs};
+pub use cqe::{CQEs, CQEsBlocking, CQE};
 
 #[doc(inline)]
-pub use cqe::{CQE, CQEs, CQEsBlocking};
+pub use sqe::{SQEs, SQE};
 
 pub use completion_queue::CompletionQueue;
 pub use submission_queue::SubmissionQueue;
 pub use probe::Probe;
 
 #[doc(inline)]
-pub use registrar::{Registrar, Personality};
+pub use registrar::{Personality, Registrar};
 
 bitflags::bitflags! {
     /// [`IoUring`] initialization flags for advanced use cases.
@@ -103,9 +103,16 @@ bitflags::bitflags! {
         /// Force the kernel thread created with `SQPOLL` to be bound to the CPU used by the
         /// `SubmissionQueue`. Requires `SQPOLL` set.
         const SQ_AFF    = 1 << 2;   /* sq_thread_cpu is valid */
-
+        /// Create the completion queue with `io_uring_params.cq_entries` entries.
+        /// The value must be greater than `entries`, and may be rounded up to the next power of two.
         const CQSIZE    = 1 << 3;
+        /// Clamp the SQ and CQ ring sizes to their maximum values instead of returning `-EINVAL`.
         const CLAMP     = 1 << 4;
+        /// Share the asynchronous backend (kernel work thread) with an existing io_uring instance.
+        ///
+        /// If `ATTACH_WQ` is set, `io_uring_params::wq_fd` should be a valid io_uring fd, whose
+        /// io-wq will be shared with the newly created io_uring instance. If the flag is set but
+        /// the io-wq cannot be shared, ring creation fails.
         const ATTACH_WQ = 1 << 5;
     }
 }
@@ -180,18 +187,31 @@ impl IoUring {
 
     /// Creates a new `IoUring` using a set of `SetupFlags` and `SetupFeatures` for advanced
     /// use cases.
-    pub fn new_with_flags(entries: u32, flags: SetupFlags, features: SetupFeatures) -> io::Result<IoUring> {
+    pub fn new_with_flags(
+        entries: u32,
+        flags: SetupFlags,
+        features: SetupFeatures,
+    ) -> io::Result<IoUring> {
+        // TODO: add a Builder to support SQ_AFF and ATTACH_WQ, which needs to set more fields
+        // in the uring_sys::io_uring_params struct.
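+        //
+        // For now, SQ_AFF and ATTACH_WQ are rejected up front: honoring them would require
+        // filling in io_uring_params.sq_thread_cpu and io_uring_params::wq_fd respectively,
+        // and neither field is exposed through this constructor's arguments.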
+
+        if flags & (SetupFlags::SQ_AFF | SetupFlags::ATTACH_WQ) != SetupFlags::empty() {
+            return Err(io::Error::from_raw_os_error(libc::EINVAL));
+        }
+
         unsafe {
             let mut params: uring_sys::io_uring_params = mem::zeroed();
             params.flags = flags.bits();
             params.features = features.bits();
 
             let mut ring = MaybeUninit::uninit();
             resultify(uring_sys::io_uring_queue_init_params(
-                 entries as _,
-                 ring.as_mut_ptr(),
-                 &mut params,
+                entries as _,
+                ring.as_mut_ptr(),
+                &mut params,
             ))?;
 
-            Ok(IoUring { ring: ring.assume_init() })
+            Ok(IoUring {
+                ring: ring.assume_init(),
+            })
         }
     }
 
@@ -212,27 +232,28 @@ impl IoUring {
 
     /// Returns the three constituent parts of the `IoUring`.
     pub fn queues(&mut self) -> (SubmissionQueue<'_>, CompletionQueue<'_>, Registrar<'_>) {
-        (SubmissionQueue::new(&*self), CompletionQueue::new(&*self), Registrar::new(&*self))
+        (
+            SubmissionQueue::new(&*self),
+            CompletionQueue::new(&*self),
+            Registrar::new(&*self),
+        )
     }
 
+    /// Returns a probe structure to detect supported IO operations.
     pub fn probe(&mut self) -> io::Result<Probe> {
         Probe::for_ring(&mut self.ring)
     }
 
     /// Returns the next [`SQE`] which can be prepared to submit.
     pub fn prepare_sqe(&mut self) -> Option<SQE<'_>> {
-        unsafe {
-            submission_queue::prepare_sqe(&mut self.ring)
-        }
+        unsafe { submission_queue::prepare_sqe(&mut self.ring) }
     }
 
     /// Returns the next `count` [`SQE`]s which can be prepared to submit as an iterator.
     ///
     /// See the [`SQEs`] type for more information about how these multiple SQEs can be used.
     pub fn prepare_sqes(&mut self, count: u32) -> Option<SQEs<'_>> {
-        unsafe {
-            submission_queue::prepare_sqes(&mut self.ring.sq, count)
-        }
+        unsafe { submission_queue::prepare_sqes(&mut self.ring.sq, count) }
     }
 
     /// Submit all prepared [`SQE`]s to the kernel.
@@ -242,16 +263,38 @@ impl IoUring {
 
     /// Submit all prepared [`SQE`]s to the kernel and wait until at least `wait_for` events have
     /// completed.
+    ///
+    /// # Return value
+    /// - the number of submitted events; once this returns, it is safe to reuse SQE entries in
+    ///   the ring. This is true even if the actual IO submission had to be punted to async
+    ///   context, which means that the SQE may in fact not have been submitted yet.
+    /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors.
     pub fn submit_sqes_and_wait(&mut self, wait_for: u32) -> io::Result<u32> {
         self.sq().submit_and_wait(wait_for)
     }
-
     /// Submit all prepared [`SQE`]s to the kernel and wait until at least `wait_for` events have
     /// completed or `duration` has passed.
-    pub fn submit_sqes_and_wait_with_timeout(&mut self, wait_for: u32, duration: Duration)
-        -> io::Result<u32>
-    {
+    ///
+    /// # Return value
+    /// - the number of submitted events; once this returns, it is safe to reuse SQE entries in
+    ///   the ring. This is true even if the actual IO submission had to be punted to async
+    ///   context, which means that the SQE may in fact not have been submitted yet.
+    /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors.
+    ///
+    /// # Note
+    /// Due to the way the timeout is implemented, there are two possible flaws:
+    /// - the timeout is unreliable: when the submission queue is full, this silently falls back
+    ///   to `submit()`.
+    /// - the returned value may be bigger than expected: one extra descriptor may be consumed by
+    ///   the timeout mechanism. The user data of the descriptor consumed by the timeout is set to
+    ///   [`LIBURING_UDATA_TIMEOUT`](uring_sys::LIBURING_UDATA_TIMEOUT) (`u64::MAX`), so this
+    ///   special value is reserved.
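+    ///
+    /// A minimal sketch of how this can be used (nothing has been prepared here, so on an
+    /// otherwise idle ring only the internal timeout descriptor itself may complete):
+    ///
+    /// ```
+    /// # fn main() -> std::io::Result<()> {
+    /// use std::time::Duration;
+    ///
+    /// let mut ring = iou::IoUring::new(8)?;
+    /// // Returns once `wait_for` events complete or roughly 50ms pass, whichever is first.
+    /// let submitted = ring.submit_sqes_and_wait_with_timeout(1, Duration::from_millis(50))?;
+    /// # let _ = submitted;
+    /// # Ok(())
+    /// # }
+    /// ```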
+    pub fn submit_sqes_and_wait_with_timeout(
+        &mut self,
+        wait_for: u32,
+        duration: Duration,
+    ) -> io::Result<u32> {
         self.sq().submit_and_wait_with_timeout(wait_for, duration)
     }
 
@@ -273,20 +316,24 @@ impl IoUring {
     /// Block until at least one [`CQE`] is completed. This will consume that CQE.
     pub fn wait_for_cqe(&mut self) -> io::Result<CQE> {
         let ring = NonNull::from(&self.ring);
-        self.inner_wait_for_cqes(1, ptr::null()).map(|cqe| CQE::new(ring, cqe))
+        self.inner_wait_for_cqes(1, ptr::null())
+            .map(|cqe| CQE::new(ring, cqe))
     }
 
     /// Block until a [`CQE`] is ready or the timeout expires.
-    pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration)
-        -> io::Result<CQE>
-    {
+    ///
+    /// # Note
+    /// The timeout is implemented by adding an IORING_OP_TIMEOUT event to the submission queue,
+    /// so it touches both the submission and the completion queue and is not multi-thread safe.
+    pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration) -> io::Result<CQE> {
         let ts = uring_sys::__kernel_timespec {
             tv_sec: duration.as_secs() as _,
-            tv_nsec: duration.subsec_nanos() as _
+            tv_nsec: duration.subsec_nanos() as _,
         };
 
         let ring = NonNull::from(&self.ring);
-        self.inner_wait_for_cqes(1, &ts).map(|cqe| CQE::new(ring, cqe))
+        self.inner_wait_for_cqes(1, &ts)
+            .map(|cqe| CQE::new(ring, cqe))
     }
 
     /// Returns an iterator of [`CQE`]s which are ready from the kernel.
@@ -305,12 +352,15 @@ impl IoUring {
 
     /// Wait until `count` [`CQE`]s are ready, without submitting any events.
     pub fn wait_for_cqes(&mut self, count: u32) -> io::Result<()> {
-        self.inner_wait_for_cqes(count as _, ptr::null()).map(|_| ())
+        self.inner_wait_for_cqes(count as _, ptr::null())
+            .map(|_| ())
     }
 
-    fn inner_wait_for_cqes(&mut self, count: u32, ts: *const uring_sys::__kernel_timespec)
-        -> io::Result<&mut uring_sys::io_uring_cqe>
-    {
+    fn inner_wait_for_cqes(
+        &mut self,
+        count: u32,
+        ts: *const uring_sys::__kernel_timespec,
+    ) -> io::Result<&mut uring_sys::io_uring_cqe> {
         unsafe {
             let mut cqe = MaybeUninit::uninit();
 
@@ -334,26 +384,32 @@ impl IoUring {
         &mut self.ring
     }
 
+    /// Returns how many descriptors are ready for processing on the completion queue.
     pub fn cq_ready(&mut self) -> u32 {
         self.cq().ready()
     }
 
+    /// Returns the number of ready event descriptors on the submission queue.
     pub fn sq_ready(&mut self) -> u32 {
         self.sq().ready()
     }
 
+    /// Returns the number of available event descriptors on the submission queue.
     pub fn sq_space_left(&mut self) -> u32 {
         self.sq().space_left()
     }
 
+    /// Returns true if the eventfd notification is currently enabled.
     pub fn cq_eventfd_enabled(&mut self) -> bool {
         self.cq().eventfd_enabled()
     }
 
+    /// Toggle eventfd notification on or off, if an eventfd is registered with the ring.
     pub fn cq_eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> {
         self.cq().eventfd_toggle(enabled)
     }
 
+    /// Returns the `RawFd` for the io_uring handle.
     pub fn raw_fd(&self) -> RawFd {
         self.ring.ring_fd
     }
 }
@@ -361,7 +417,9 @@ impl IoUring {
 
 impl fmt::Debug for IoUring {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_struct(std::any::type_name::<Self>()).field("fd", &self.ring.ring_fd).finish()
+        f.debug_struct(std::any::type_name::<Self>())
+            .field("fd", &self.ring.ring_fd)
+            .finish()
     }
 }
 
@@ -371,13 +429,14 @@ impl Drop for IoUring {
     }
 }
 
-unsafe impl Send for IoUring { }
-unsafe impl Sync for IoUring { }
+unsafe impl Send for IoUring {}
+unsafe impl Sync for IoUring {}
 
 fn resultify(x: i32) -> io::Result<u32> {
-    match x >= 0 {
-        true => Ok(x as u32),
-        false => Err(io::Error::from_raw_os_error(-x)),
+    if x >= 0 {
+        Ok(x as u32)
+    } else {
+        Err(io::Error::from_raw_os_error(-x))
     }
 }
 
@@ -394,17 +453,26 @@ mod tests {
         let mut calls = 0;
 
         let ret = resultify(side_effect(0, &mut calls));
-        assert!(match ret { Ok(0) => true, _ => false });
+        assert!(match ret {
+            Ok(0) => true,
+            _ => false,
+        });
         assert_eq!(calls, 1);
 
         calls = 0;
         let ret = resultify(side_effect(1, &mut calls));
-        assert!(match ret { Ok(1) => true, _ => false });
+        assert!(match ret {
+            Ok(1) => true,
+            _ => false,
+        });
         assert_eq!(calls, 1);
 
         calls = 0;
         let ret = resultify(side_effect(-1, &mut calls));
-        assert!(match ret { Err(e) if e.raw_os_error() == Some(1) => true, _ => false });
+        assert!(match ret {
+            Err(e) if e.raw_os_error() == Some(1) => true,
+            _ => false,
+        });
         assert_eq!(calls, 1);
     }
 }
diff --git a/src/probe.rs b/src/probe.rs
index 319dc16..1931434 100644
--- a/src/probe.rs
+++ b/src/probe.rs
@@ -11,17 +11,22 @@ impl Probe {
     pub fn new() -> io::Result<Probe> {
         unsafe {
             let probe = uring_sys::io_uring_get_probe();
-            NonNull::new(probe).ok_or_else(io::Error::last_os_error).map(|probe| Probe { probe })
+            NonNull::new(probe)
+                .ok_or_else(io::Error::last_os_error)
+                .map(|probe| Probe { probe })
         }
     }
 
     pub(crate) fn for_ring(ring: *mut uring_sys::io_uring) -> io::Result<Probe> {
         unsafe {
             let probe = uring_sys::io_uring_get_probe_ring(ring);
-            NonNull::new(probe).ok_or_else(io::Error::last_os_error).map(|probe| Probe { probe })
+            NonNull::new(probe)
+                .ok_or_else(io::Error::last_os_error)
+                .map(|probe| Probe { probe })
         }
     }
 
+    /// Check whether an operation is supported by this kernel version's io-uring interface.
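+    ///
+    /// A short sketch of probing before using an opcode (this assumes `IORING_OP_NOP` is
+    /// present in `uring_sys::IoRingOp` on the version of uring-sys in use):
+    ///
+    /// ```
+    /// # fn main() -> std::io::Result<()> {
+    /// let mut ring = iou::IoUring::new(2)?;
+    /// let probe = ring.probe()?;
+    /// if probe.supports(uring_sys::IoRingOp::IORING_OP_NOP) {
+    ///     // safe to prepare no-op events on this kernel
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```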
     pub fn supports(&self, op: uring_sys::IoRingOp) -> bool {
         unsafe { uring_sys::io_uring_opcode_supported(self.probe.as_ptr(), op as _) != 0 }
     }
diff --git a/src/registrar/mod.rs b/src/registrar/mod.rs
index 1c31fbb..266980f 100644
--- a/src/registrar/mod.rs
+++ b/src/registrar/mod.rs
@@ -14,10 +14,10 @@ mod registered;
 use std::fmt;
 use std::io;
 use std::marker::PhantomData;
-use std::ptr::NonNull;
 use std::os::unix::io::RawFd;
+use std::ptr::NonNull;
 
-use crate::{IoUring, Probe, resultify};
+use crate::{resultify, IoUring, Probe};
 
 pub use registered::*;
 
@@ -57,9 +57,10 @@ impl<'ring> Registrar<'ring> {
         }
     }
 
-    pub fn register_buffers(&self, buffers: Vec<Box<[u8]>>)
-        -> io::Result<impl Iterator<Item = RegisteredBuf>>
-    {
+    pub fn register_buffers(
+        &self,
+        buffers: Vec<Box<[u8]>>,
+    ) -> io::Result<impl Iterator<Item = RegisteredBuf>> {
         let len = buffers.len();
         let addr = buffers.as_ptr() as *const _;
         resultify(unsafe {
@@ -68,13 +69,13 @@ impl<'ring> Registrar<'ring> {
         Ok(buffers
             .into_iter()
             .enumerate()
-            .map(|(i, buf)| RegisteredBuf::new(i as u32, buf))
-        )
+            .map(|(i, buf)| RegisteredBuf::new(i as u32, buf)))
     }
 
-    pub fn register_buffers_by_ref<'a>(&self, buffers: &'a [&'a [u8]])
-        -> io::Result<impl Iterator<Item = RegisteredBufRef<'a>> + 'a>
-    {
+    pub fn register_buffers_by_ref<'a>(
+        &self,
+        buffers: &'a [&'a [u8]],
+    ) -> io::Result<impl Iterator<Item = RegisteredBufRef<'a>> + 'a> {
         let len = buffers.len();
         let addr = buffers.as_ptr() as *const _;
         resultify(unsafe {
@@ -83,13 +84,13 @@ impl<'ring> Registrar<'ring> {
         Ok(buffers
             .iter()
             .enumerate()
-            .map(|(i, buf)| Registered::new(i as u32, &**buf))
-        )
+            .map(|(i, buf)| Registered::new(i as u32, &**buf)))
     }
 
-    pub fn register_buffers_by_mut<'a>(&self, buffers: &'a mut [&'a mut [u8]])
-        -> io::Result<impl Iterator<Item = RegisteredBufMut<'a>> + 'a>
-    {
+    pub fn register_buffers_by_mut<'a>(
+        &self,
+        buffers: &'a mut [&'a mut [u8]],
+    ) -> io::Result<impl Iterator<Item = RegisteredBufMut<'a>> + 'a> {
         let len = buffers.len();
         let addr = buffers.as_ptr() as *const _;
         resultify(unsafe {
@@ -98,22 +99,19 @@ impl<'ring> Registrar<'ring> {
         Ok(buffers
             .iter_mut()
             .enumerate()
-            .map(|(i, buf)| Registered::new(i as u32, &mut **buf))
-        )
+            .map(|(i, buf)| Registered::new(i as u32, &mut **buf)))
     }
 
     /// Unregister all currently registered buffers. An explicit call to this method is often unnecessary,
     /// because all buffers will be unregistered automatically when the ring is dropped.
     pub fn unregister_buffers(&self) -> io::Result<()> {
-        resultify(unsafe {
-            uring_sys::io_uring_unregister_buffers(self.ring.as_ptr())
-        })?;
+        resultify(unsafe { uring_sys::io_uring_unregister_buffers(self.ring.as_ptr()) })?;
         Ok(())
     }
 
-    /// Register a set of files with the kernel. Registered files handle kernel fileset indexing 
+    /// Register a set of files with the kernel. Registered files handle kernel fileset indexing
     /// behind the scenes and can often be used in place of raw file descriptors.
-    /// 
+    ///
     /// # Errors
     /// Returns an error if
     /// * there is a preexisting set of registered files,
@@ -134,20 +132,22 @@ impl<'ring> Registrar<'ring> {
     /// # Ok(())
     /// # }
     /// ```
-    pub fn register_files<'a>(&self, files: &'a [RawFd]) -> io::Result<impl Iterator<Item = RegisteredFd> + 'a> {
+    pub fn register_files<'a>(
+        &self,
+        files: &'a [RawFd],
+    ) -> io::Result<impl Iterator<Item = RegisteredFd> + 'a> {
         assert!(files.len() <= u32::MAX as usize);
         resultify(unsafe {
             uring_sys::io_uring_register_files(
-                 self.ring.as_ptr(),
-                 files.as_ptr() as *const _,
-                 files.len() as _
+                self.ring.as_ptr(),
+                files.as_ptr() as *const _,
+                files.len() as _,
             )
         })?;
         Ok(files
             .iter()
             .enumerate()
-            .map(|(i, &fd)| RegisteredFd::new(i as u32, fd))
-        )
+            .map(|(i, &fd)| RegisteredFd::new(i as u32, fd)))
     }
 
     /// Update the currently registered kernel fileset.
 It is usually more efficient to reserve space
@@ -157,11 +157,15 @@ impl<'ring> Registrar<'ring> {
     /// Returns an error if
     /// * there isn't a registered fileset,
     /// * the `files` slice was empty,
-    /// * `offset` is out of bounds, 
+    /// * `offset` is out of bounds,
     /// * the `files` slice was too large,
     /// * the inner [`io_uring_register_files_update`](uring_sys::io_uring_register_files_update) call
    ///   failed for another reason
-    pub fn update_registered_files<'a>(&mut self, offset: usize, files: &'a [RawFd]) -> io::Result<impl Iterator<Item = RegisteredFd> + 'a> {
+    pub fn update_registered_files<'a>(
+        &mut self,
+        offset: usize,
+        files: &'a [RawFd],
+    ) -> io::Result<impl Iterator<Item = RegisteredFd> + 'a> {
         assert!(files.len() + offset <= u32::MAX as usize);
         resultify(unsafe {
             uring_sys::io_uring_register_files_update(
@@ -174,8 +178,7 @@ impl<'ring> Registrar<'ring> {
         Ok(files
             .iter()
             .enumerate()
-            .map(move |(i, &fd)| RegisteredFd::new((i + offset) as u32, fd))
-        )
+            .map(move |(i, &fd)| RegisteredFd::new((i + offset) as u32, fd)))
     }
 
     /// Unregister all currently registered files. An explicit call to this method is often unnecessary,
@@ -211,9 +214,7 @@ impl<'ring> Registrar<'ring> {
     }
 
     pub fn register_eventfd(&self, eventfd: RawFd) -> io::Result<()> {
-        resultify(unsafe {
-            uring_sys::io_uring_register_eventfd(self.ring.as_ptr(), eventfd)
-        })?;
+        resultify(unsafe { uring_sys::io_uring_register_eventfd(self.ring.as_ptr(), eventfd) })?;
         Ok(())
     }
 
@@ -225,14 +226,13 @@ impl<'ring> Registrar<'ring> {
     }
 
     pub fn unregister_eventfd(&self) -> io::Result<()> {
-        resultify(unsafe {
-            uring_sys::io_uring_unregister_eventfd(self.ring.as_ptr())
-        })?;
+        resultify(unsafe { uring_sys::io_uring_unregister_eventfd(self.ring.as_ptr()) })?;
         Ok(())
     }
 
     pub fn register_personality(&self) -> io::Result<Personality> {
-        let id = resultify(unsafe { uring_sys::io_uring_register_personality(self.ring.as_ptr()) })?;
+        let id =
+            resultify(unsafe { uring_sys::io_uring_register_personality(self.ring.as_ptr()) })?;
         debug_assert!(id < u16::MAX as u32);
         Ok(Personality { id: id as u16 })
     }
@@ -252,14 +252,17 @@ impl<'ring> Registrar<'ring> {
 impl fmt::Debug for Registrar<'_> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         let fd = unsafe { self.ring.as_ref().ring_fd };
-        f.debug_struct(std::any::type_name::<Self>()).field("fd", &fd).finish()
+        f.debug_struct(std::any::type_name::<Self>())
+            .field("fd", &fd)
+            .finish()
     }
 }
 
-unsafe impl<'ring> Send for Registrar<'ring> { }
-unsafe impl<'ring> Sync for Registrar<'ring> { }
+unsafe impl<'ring> Send for Registrar<'ring> {}
+unsafe impl<'ring> Sync for Registrar<'ring> {}
 
 #[derive(Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Clone, Copy)]
+/// An identity for a registered credential.
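+///
+/// A sketch of how a `Personality` is obtained (this assumes the `SQE::set_personality`
+/// setter for attaching it to a submission):
+///
+/// ```no_run
+/// # fn main() -> std::io::Result<()> {
+/// let mut ring = iou::IoUring::new(2)?;
+/// let personality = ring.registrar().register_personality()?;
+/// let mut sqe = ring.prepare_sqe().unwrap();
+/// sqe.set_personality(personality);
+/// # Ok(())
+/// # }
+/// ```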
 pub struct Personality {
     pub(crate) id: u16,
 }
@@ -318,7 +321,10 @@ mod tests {
         let raw_fds = [1, 2];
         let ring = IoUring::new(1).unwrap();
         let _ = ring.registrar().register_files(&raw_fds).unwrap();
-        let _ = ring.registrar().update_registered_files(2, &raw_fds).unwrap();
+        let _ = ring
+            .registrar()
+            .update_registered_files(2, &raw_fds)
+            .unwrap();
     }
 
     #[test]
@@ -326,7 +332,10 @@ mod tests {
     fn slice_len_out_of_bounds_update() {
         let ring = IoUring::new(1).unwrap();
         let _ = ring.registrar().register_files(&[1, 1]).unwrap();
-        let _ = ring.registrar().update_registered_files(0, &[1, 1, 1]).unwrap();
+        let _ = ring
+            .registrar()
+            .update_registered_files(0, &[1, 1, 1])
+            .unwrap();
     }
 
     #[test]
@@ -334,10 +343,16 @@ mod tests {
         let ring = IoUring::new(1).unwrap();
 
         let file = std::fs::File::create("tmp.txt").unwrap();
-        let _ = ring.registrar().register_files(&[file.as_raw_fd()]).unwrap();
+        let _ = ring
+            .registrar()
+            .register_files(&[file.as_raw_fd()])
+            .unwrap();
 
         let new_file = std::fs::File::create("new_tmp.txt").unwrap();
-        let _ = ring.registrar().update_registered_files(0, &[new_file.as_raw_fd()]).unwrap();
+        let _ = ring
+            .registrar()
+            .update_registered_files(0, &[new_file.as_raw_fd()])
+            .unwrap();
 
         let _ = std::fs::remove_file("tmp.txt");
         let _ = std::fs::remove_file("new_tmp.txt");
@@ -349,7 +364,10 @@ mod tests {
         let _ = ring.registrar().register_files(&[-1, -1, -1]).unwrap();
 
         let file = std::fs::File::create("tmp.txt").unwrap();
-        let _ = ring.registrar().update_registered_files(0, &[file.as_raw_fd()]).unwrap();
+        let _ = ring
+            .registrar()
+            .update_registered_files(0, &[file.as_raw_fd()])
+            .unwrap();
 
         let _ = std::fs::remove_file("tmp.txt");
     }
 }
diff --git a/src/registrar/registered.rs b/src/registrar/registered.rs
index fab0a1f..b3b7c02 100644
--- a/src/registrar/registered.rs
+++ b/src/registrar/registered.rs
@@ -15,10 +15,10 @@ pub const PLACEHOLDER_FD: RawFd = -1;
 ///
 /// Submission event prep methods on `RegisteredFd` will ensure that the submission event's
 /// `SubmissionFlags::FIXED_FILE` flag is properly set.
-pub type RegisteredFd         = Registered<RawFd>;
-pub type RegisteredBuf        = Registered<Box<[u8]>>;
-pub type RegisteredBufRef<'a> = Registered<&'a [u8]>;
-pub type RegisteredBufMut<'a> = Registered<&'a mut [u8]>;
+pub type RegisteredFd = Registered<RawFd>;
+pub type RegisteredBuf = Registered<Box<[u8]>>;
+pub type RegisteredBufRef<'a> = Registered<&'a [u8]>;
+pub type RegisteredBufMut<'a> = Registered<&'a mut [u8]>;
 
 /// An object registered with an io-uring instance through a [`Registrar`](crate::Registrar).
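+///
+/// For example (a sketch based on the buffer-registration tests), a registered buffer
+/// remembers the kernel-side index it was registered under:
+///
+/// ```
+/// # fn main() -> std::io::Result<()> {
+/// let ring = iou::IoUring::new(2)?;
+/// let bufs = vec![vec![0u8; 4096].into_boxed_slice()];
+/// let buf = ring.registrar().register_buffers(bufs)?.next().unwrap();
+/// assert_eq!(buf.index(), 0);
+/// # Ok(())
+/// # }
+/// ```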
 #[derive(Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
@@ -188,7 +188,7 @@ impl UringFd for RawFd {
         *self
     }
 
-    fn update_sqe(&self, _: &mut SQE<'_>) { }
+    fn update_sqe(&self, _: &mut SQE<'_>) {}
 }
 
 impl UringFd for RegisteredFd {
@@ -197,7 +197,9 @@ impl UringFd for RegisteredFd {
     }
 
     fn update_sqe(&self, sqe: &mut SQE<'_>) {
-        unsafe { sqe.raw_mut().fd = self.index as RawFd; }
+        unsafe {
+            sqe.raw_mut().fd = self.index as RawFd;
+        }
         sqe.set_fixed_file();
     }
 }
@@ -220,7 +222,7 @@ impl UringReadBuf for RegisteredBufMut<'_> {
             self.data.as_mut_ptr() as _,
             self.data.len() as _,
             offset as _,
-            self.index() as _
+            self.index() as _,
         );
         fd.update_sqe(sqe);
     }
@@ -286,7 +288,7 @@ impl UringWriteBuf for RegisteredBufRef<'_> {
             self.data.as_ptr() as _,
             self.data.len() as _,
             offset as _,
-            self.index() as _
+            self.index() as _,
         );
         fd.update_sqe(sqe);
     }
diff --git a/src/sqe.rs b/src/sqe.rs
index 7b6f70a..e2f044a 100644
--- a/src/sqe.rs
+++ b/src/sqe.rs
@@ -1,6 +1,6 @@
+use std::ffi::CStr;
 use std::io;
 use std::mem;
-use std::ffi::CStr;
 use std::ops::{Deref, DerefMut};
 use std::os::unix::io::RawFd;
 use std::ptr;
@@ -8,12 +8,12 @@ use std::slice;
 
 use crate::registrar::{UringFd, UringReadBuf, UringWriteBuf};
 
-pub use nix::fcntl::{OFlag, FallocateFlags, PosixFadviseAdvice};
+pub use nix::fcntl::{FallocateFlags, OFlag, PosixFadviseAdvice};
 pub use nix::poll::PollFlags;
-pub use nix::sys::epoll::{EpollOp, EpollEvent};
+pub use nix::sys::epoll::{EpollEvent, EpollOp};
 pub use nix::sys::mman::MmapAdvise;
+pub use nix::sys::socket::{MsgFlags, SockAddr, SockFlag};
 pub use nix::sys::stat::Mode;
-pub use nix::sys::socket::{SockAddr, SockFlag, MsgFlags};
 
 use crate::Personality;
 
@@ -50,7 +50,7 @@ impl<'a> SQE<'a> {
     /// `SQE` may impose additional safety invariants which you must adhere to
     /// when setting the user_data for a submission queue event, which it may rely on when
     /// processing the corresponding completion queue event. For example, the library
-    /// [ringbahn][ringbahn] 
+    /// [ringbahn][ringbahn]
     ///
     /// # Example
     ///
@@ -108,12 +108,7 @@ impl<'a> SQE<'a> {
     /// Both the file descriptor and the buffer can be pre-registered. See the
     /// [`registrar`][crate::registrar] module for more information.
     #[inline]
-    pub unsafe fn prep_read(
-        &mut self,
-        fd: impl UringFd,
-        buf: impl UringReadBuf,
-        offset: u64,
-    ) {
+    pub unsafe fn prep_read(&mut self, fd: impl UringFd, buf: impl UringReadBuf, offset: u64) {
         buf.prep_read(fd, self, offset);
     }
 
@@ -142,12 +137,14 @@ impl<'a> SQE<'a> {
     ) {
         let len = buf.len();
         let addr = buf.as_mut_ptr();
-        uring_sys::io_uring_prep_read_fixed(self.sqe,
-                                            fd.as_raw_fd(),
-                                            addr as _,
-                                            len as _,
-                                            offset as _,
-                                            buf_index as _);
+        uring_sys::io_uring_prep_read_fixed(
+            self.sqe,
+            fd.as_raw_fd(),
+            addr as _,
+            len as _,
+            offset as _,
+            buf_index as _,
+        );
         fd.update_sqe(self);
     }
 
@@ -156,12 +153,7 @@ impl<'a> SQE<'a> {
     ///
     /// Both the file descriptor and the buffer can be pre-registered. See the
     /// [`registrar`][crate::registrar] module for more information.
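+    ///
+    /// A sketch of a one-shot write (this assumes plain `&[u8]` slices implement
+    /// `UringWriteBuf`; error handling elided):
+    ///
+    /// ```no_run
+    /// # fn main() -> std::io::Result<()> {
+    /// use std::os::unix::io::AsRawFd;
+    ///
+    /// let file = std::fs::File::create("sketch.tmp")?;
+    /// let mut ring = iou::IoUring::new(2)?;
+    /// let mut sqe = ring.prepare_sqe().unwrap();
+    /// unsafe {
+    ///     sqe.prep_write(file.as_raw_fd(), &b"hello"[..], 0);
+    /// }
+    /// ring.submit_sqes_and_wait(1)?;
+    /// # Ok(())
+    /// # }
+    /// ```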
     #[inline]
-    pub unsafe fn prep_write(
-        &mut self,
-        fd: impl UringFd,
-        buf: impl UringWriteBuf,
-        offset: u64,
-    ) {
+    pub unsafe fn prep_write(&mut self, fd: impl UringFd, buf: impl UringWriteBuf, offset: u64) {
         buf.prep_write(fd, self, offset)
     }
 
@@ -175,11 +167,7 @@ impl<'a> SQE<'a> {
     ) {
         let len = bufs.len();
         let addr = bufs.as_ptr();
-        uring_sys::io_uring_prep_writev(self.sqe,
-                                        fd.as_raw_fd(),
-                                        addr as _,
-                                        len as _,
-                                        offset as _);
+        uring_sys::io_uring_prep_writev(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _);
         fd.update_sqe(self);
     }
 
@@ -194,12 +182,14 @@ impl<'a> SQE<'a> {
     ) {
         let len = buf.len();
         let addr = buf.as_ptr();
-        uring_sys::io_uring_prep_write_fixed(self.sqe,
-                                             fd.as_raw_fd(),
-                                             addr as _,
-                                             len as _,
-                                             offset as _,
-                                             buf_index as _);
+        uring_sys::io_uring_prep_write_fixed(
+            self.sqe,
+            fd.as_raw_fd(),
+            addr as _,
+            len as _,
+            offset as _,
+            buf_index as _,
+        );
         fd.update_sqe(self);
     }
 
@@ -221,7 +211,15 @@ impl<'a> SQE<'a> {
         count: u32,
         flags: SpliceFlags,
     ) {
-        uring_sys::io_uring_prep_splice(self.sqe, fd_in, off_in, fd_out, off_out, count, flags.bits());
+        uring_sys::io_uring_prep_splice(
+            self.sqe,
+            fd_in,
+            off_in,
+            fd_out,
+            off_out,
+            count,
+            flags.bits(),
+        );
     }
 
     /// Prepare a recv event on a file descriptor.
@@ -243,26 +241,43 @@ impl<'a> SQE<'a> {
     }
 
     /// Prepare a recvmsg event on a file descriptor.
-    pub unsafe fn prep_recvmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) {
+    pub unsafe fn prep_recvmsg(
+        &mut self,
+        fd: impl UringFd,
+        msg: *mut libc::msghdr,
+        flags: MsgFlags,
+    ) {
         uring_sys::io_uring_prep_recvmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
         fd.update_sqe(self);
     }
 
     /// Prepare a sendmsg event on a file descriptor.
-    pub unsafe fn prep_sendmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) {
+    pub unsafe fn prep_sendmsg(
+        &mut self,
+        fd: impl UringFd,
+        msg: *mut libc::msghdr,
+        flags: MsgFlags,
+    ) {
         uring_sys::io_uring_prep_sendmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
         fd.update_sqe(self);
     }
 
     /// Prepare a fallocate event.
     #[inline]
-    pub unsafe fn prep_fallocate(&mut self, fd: impl UringFd,
-                                 offset: u64, size: u64,
-                                 flags: FallocateFlags) {
-        uring_sys::io_uring_prep_fallocate(self.sqe, fd.as_raw_fd(),
-                                           flags.bits() as _,
-                                           offset as _,
-                                           size as _);
+    pub unsafe fn prep_fallocate(
+        &mut self,
+        fd: impl UringFd,
+        offset: u64,
+        size: u64,
+        flags: FallocateFlags,
+    ) {
+        uring_sys::io_uring_prep_fallocate(
+            self.sqe,
+            fd.as_raw_fd(),
+            flags.bits() as _,
+            offset as _,
+            size as _,
+        );
         fd.update_sqe(self);
     }
 
@@ -276,21 +291,26 @@ impl<'a> SQE<'a> {
         mask: StatxMode,
         buf: &mut libc::statx,
     ) {
-        uring_sys::io_uring_prep_statx(self.sqe, dirfd.as_raw_fd(), path.as_ptr() as _,
-                                       flags.bits() as _, mask.bits() as _,
-                                       buf as _);
+        uring_sys::io_uring_prep_statx(
+            self.sqe,
+            dirfd.as_raw_fd(),
+            path.as_ptr() as _,
+            flags.bits() as _,
+            mask.bits() as _,
+            buf as _,
+        );
     }
 
     /// Prepare an openat event.
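+    ///
+    /// A sketch of opening a file relative to the current working directory (the path and
+    /// mode here are illustrative):
+    ///
+    /// ```no_run
+    /// # fn main() -> std::io::Result<()> {
+    /// use std::ffi::CString;
+    /// use iou::sqe::{Mode, OFlag};
+    ///
+    /// let mut ring = iou::IoUring::new(2)?;
+    /// let path = CString::new("sketch.tmp").unwrap();
+    /// let mut sqe = ring.prepare_sqe().unwrap();
+    /// unsafe {
+    ///     sqe.prep_openat(
+    ///         libc::AT_FDCWD,
+    ///         &path,
+    ///         OFlag::O_CREAT | OFlag::O_RDWR,
+    ///         Mode::from_bits_truncate(0o644),
+    ///     );
+    /// }
+    /// ring.submit_sqes()?;
+    /// # Ok(())
+    /// # }
+    /// ```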
     #[inline]
-    pub unsafe fn prep_openat(
-        &mut self,
-        fd: impl UringFd,
-        path: &CStr,
-        flags: OFlag,
-        mode: Mode,
-    ) {
-        uring_sys::io_uring_prep_openat(self.sqe, fd.as_raw_fd(), path.as_ptr() as _, flags.bits(), mode.bits());
+    pub unsafe fn prep_openat(&mut self, fd: impl UringFd, path: &CStr, flags: OFlag, mode: Mode) {
+        uring_sys::io_uring_prep_openat(
+            self.sqe,
+            fd.as_raw_fd(),
+            path.as_ptr() as _,
+            flags.bits(),
+            mode.bits(),
+        );
     }
 
     // TODO openat2
@@ -301,7 +321,6 @@ impl<'a> SQE<'a> {
         uring_sys::io_uring_prep_close(self.sqe, fd.as_raw_fd());
     }
 
-
     /// Prepare a timeout event.
     ///
     /// ```
@@ -324,11 +343,18 @@ impl<'a> SQE<'a> {
     /// # }
     ///```
     #[inline]
-    pub unsafe fn prep_timeout(&mut self, ts: &uring_sys::__kernel_timespec, events: u32, flags: TimeoutFlags) {
-        uring_sys::io_uring_prep_timeout(self.sqe,
-                                         ts as *const _ as *mut _,
-                                         events as _,
-                                         flags.bits() as _);
+    pub unsafe fn prep_timeout(
+        &mut self,
+        ts: &uring_sys::__kernel_timespec,
+        events: u32,
+        flags: TimeoutFlags,
+    ) {
+        uring_sys::io_uring_prep_timeout(
+            self.sqe,
+            ts as *const _ as *mut _,
+            events as _,
+            flags.bits() as _,
+        );
     }
 
     #[inline]
@@ -360,25 +386,39 @@ impl<'a> SQE<'a> {
     }
 
     #[inline]
-    pub unsafe fn prep_accept(&mut self, fd: impl UringFd, accept: Option<&mut SockAddrStorage>, flags: SockFlag) {
+    pub unsafe fn prep_accept(
+        &mut self,
+        fd: impl UringFd,
+        accept: Option<&mut SockAddrStorage>,
+        flags: SockFlag,
+    ) {
         let (addr, len) = match accept {
-            Some(accept) => (accept.storage.as_mut_ptr() as *mut _, &mut accept.len as *mut _ as *mut _),
-            None => (std::ptr::null_mut(), std::ptr::null_mut())
+            Some(accept) => (
+                accept.storage.as_mut_ptr() as *mut _,
+                &mut accept.len as *mut _ as *mut _,
+            ),
+            None => (std::ptr::null_mut(), std::ptr::null_mut()),
         };
         uring_sys::io_uring_prep_accept(self.sqe, fd.as_raw_fd(), addr, len, flags.bits());
         fd.update_sqe(self);
     }
 
     #[inline]
-    pub unsafe fn prep_fadvise(&mut self, fd: impl UringFd, off: u64, len: u64, advice: PosixFadviseAdvice) {
+    pub unsafe fn prep_fadvise(
+        &mut self,
+        fd: impl UringFd,
+        off: u64,
+        len: u64,
+        advice: PosixFadviseAdvice,
+    ) {
         use PosixFadviseAdvice::*;
         let advice = match advice {
-            POSIX_FADV_NORMAL       => libc::POSIX_FADV_NORMAL,
-            POSIX_FADV_SEQUENTIAL   => libc::POSIX_FADV_SEQUENTIAL,
-            POSIX_FADV_RANDOM       => libc::POSIX_FADV_RANDOM,
-            POSIX_FADV_NOREUSE      => libc::POSIX_FADV_NOREUSE,
-            POSIX_FADV_WILLNEED     => libc::POSIX_FADV_WILLNEED,
-            POSIX_FADV_DONTNEED     => libc::POSIX_FADV_DONTNEED,
+            POSIX_FADV_NORMAL => libc::POSIX_FADV_NORMAL,
+            POSIX_FADV_SEQUENTIAL => libc::POSIX_FADV_SEQUENTIAL,
+            POSIX_FADV_RANDOM => libc::POSIX_FADV_RANDOM,
+            POSIX_FADV_NOREUSE => libc::POSIX_FADV_NOREUSE,
+            POSIX_FADV_WILLNEED => libc::POSIX_FADV_WILLNEED,
+            POSIX_FADV_DONTNEED => libc::POSIX_FADV_DONTNEED,
         };
         uring_sys::io_uring_prep_fadvise(self.sqe, fd.as_raw_fd(), off as _, len as _, advice);
         fd.update_sqe(self);
@@ -388,33 +428,44 @@ impl<'a> SQE<'a> {
     pub unsafe fn prep_madvise(&mut self, data: &mut [u8], advice: MmapAdvise) {
         use MmapAdvise::*;
         let advice = match advice {
-            MADV_NORMAL         => libc::MADV_NORMAL,
-            MADV_RANDOM         => libc::MADV_RANDOM,
-            MADV_SEQUENTIAL     => libc::MADV_SEQUENTIAL,
-            MADV_WILLNEED       => libc::MADV_WILLNEED,
-            MADV_DONTNEED       => libc::MADV_DONTNEED,
-            MADV_REMOVE         => libc::MADV_REMOVE,
-            MADV_DONTFORK       => libc::MADV_DONTFORK,
-            MADV_DOFORK         => libc::MADV_DOFORK,
-            MADV_HWPOISON       => libc::MADV_HWPOISON,
-            MADV_MERGEABLE      => libc::MADV_MERGEABLE,
-            MADV_UNMERGEABLE    => libc::MADV_UNMERGEABLE,
-            MADV_SOFT_OFFLINE   => libc::MADV_SOFT_OFFLINE,
-            MADV_HUGEPAGE       => libc::MADV_HUGEPAGE,
-            MADV_NOHUGEPAGE     => libc::MADV_NOHUGEPAGE,
-            MADV_DONTDUMP       => libc::MADV_DONTDUMP,
-            MADV_DODUMP         => libc::MADV_DODUMP,
-            MADV_FREE           => libc::MADV_FREE,
+            MADV_NORMAL => libc::MADV_NORMAL,
+            MADV_RANDOM => libc::MADV_RANDOM,
+            MADV_SEQUENTIAL => libc::MADV_SEQUENTIAL,
+            MADV_WILLNEED => libc::MADV_WILLNEED,
+            MADV_DONTNEED => libc::MADV_DONTNEED,
+            MADV_REMOVE => libc::MADV_REMOVE,
+            MADV_DONTFORK => libc::MADV_DONTFORK,
+            MADV_DOFORK => libc::MADV_DOFORK,
+            MADV_HWPOISON => libc::MADV_HWPOISON,
+            MADV_MERGEABLE => libc::MADV_MERGEABLE,
+            MADV_UNMERGEABLE => libc::MADV_UNMERGEABLE,
+            MADV_SOFT_OFFLINE => libc::MADV_SOFT_OFFLINE,
+            MADV_HUGEPAGE => libc::MADV_HUGEPAGE,
+            MADV_NOHUGEPAGE => libc::MADV_NOHUGEPAGE,
+            MADV_DONTDUMP => libc::MADV_DONTDUMP,
+            MADV_DODUMP => libc::MADV_DODUMP,
+            MADV_FREE => libc::MADV_FREE,
         };
-        uring_sys::io_uring_prep_madvise(self.sqe, data.as_mut_ptr() as *mut _, data.len() as _, advice);
+        uring_sys::io_uring_prep_madvise(
+            self.sqe,
+            data.as_mut_ptr() as *mut _,
+            data.len() as _,
+            advice,
+        );
     }
 
     #[inline]
-    pub unsafe fn prep_epoll_ctl(&mut self, epoll_fd: RawFd, op: EpollOp, fd: RawFd, event: Option<&mut EpollEvent>) {
+    pub unsafe fn prep_epoll_ctl(
+        &mut self,
+        epoll_fd: RawFd,
+        op: EpollOp,
+        fd: RawFd,
+        event: Option<&mut EpollEvent>,
+    ) {
         let op = match op {
-            EpollOp::EpollCtlAdd    => libc::EPOLL_CTL_ADD,
-            EpollOp::EpollCtlDel    => libc::EPOLL_CTL_DEL,
-            EpollOp::EpollCtlMod    => libc::EPOLL_CTL_MOD,
+            EpollOp::EpollCtlAdd => libc::EPOLL_CTL_ADD,
+            EpollOp::EpollCtlDel => libc::EPOLL_CTL_DEL,
+            EpollOp::EpollCtlMod => libc::EPOLL_CTL_MOD,
         };
         let event = event.map_or(ptr::null_mut(), |event| event as *mut EpollEvent as *mut _);
         uring_sys::io_uring_prep_epoll_ctl(self.sqe, epoll_fd, fd, op, event);
@@ -427,7 +478,8 @@ impl<'a> SQE<'a> {
         uring_sys::io_uring_prep_files_update(self.sqe, addr, len, offset as _);
     }
 
-    pub unsafe fn prep_provide_buffers(&mut self,
+    pub unsafe fn prep_provide_buffers(
+        &mut self,
         buffers: &mut [u8],
         count: u32,
         group: BufferGroupId,
@@ -435,7 +487,14 @@ impl<'a> SQE<'a> {
     ) {
         let addr = buffers.as_mut_ptr() as *mut libc::c_void;
         let len = buffers.len() as u32 / count;
-        uring_sys::io_uring_prep_provide_buffers(self.sqe, addr, len as _, count as _, group.id as _, index as _);
+        uring_sys::io_uring_prep_provide_buffers(
+            self.sqe,
+            addr,
+            len as _,
+            count as _,
+            group.id as _,
+            index as _,
+        );
     }
 
     pub unsafe fn prep_remove_buffers(&mut self, count: u32, id: BufferGroupId) {
@@ -447,7 +506,6 @@ impl<'a> SQE<'a> {
         uring_sys::io_uring_prep_cancel(self.sqe, user_data as _, flags);
     }
 
-
     /// Prepare a no-op event.
     /// ```
     /// # use iou::{IoUring, sqe::SubmissionFlags};
@@ -520,8 +578,8 @@ impl<'a> SQE<'a> {
     }
 }
 
-unsafe impl<'a> Send for SQE<'a> { }
-unsafe impl<'a> Sync for SQE<'a> { }
+unsafe impl<'a> Send for SQE<'a> {}
+unsafe impl<'a> Sync for SQE<'a> {}
 
 #[derive(Debug)]
 pub struct SockAddrStorage {
@@ -533,10 +591,7 @@ impl SockAddrStorage {
     pub fn uninit() -> Self {
         let storage = mem::MaybeUninit::uninit();
         let len = mem::size_of::<libc::sockaddr_storage>();
-        SockAddrStorage {
-            storage,
-            len
-        }
+        SockAddrStorage { storage, len }
     }
 
     pub unsafe fn as_socket_addr(&self) -> io::Result<SockAddr> {
@@ -545,7 +600,7 @@ impl SockAddrStorage {
             let err_no = e.as_errno();
             match err_no {
                 Some(err_no) => io::Error::from_raw_os_error(err_no as _),
-                None => io::Error::new(io::ErrorKind::Other, "Unknown error")
+                None => io::Error::new(io::ErrorKind::Other, "Unknown error"),
             }
         })
     }
@@ -563,14 +618,19 @@ bitflags::bitflags! {
         const FIXED_FILE    = 1 << 0;   /* use fixed fileset */
         /// Submit this event only after completing all ongoing submission events.
         const IO_DRAIN      = 1 << 1;   /* issue after inflight IO */
-        /// Force the next submission event to wait until this event has completed sucessfully.
+        /// Force the next submission event to wait until this event has completed successfully.
         ///
         /// An event's link only applies to the next event, but link chains can be
         /// arbitrarily long.
         const IO_LINK       = 1 << 2;   /* next IO depends on this one */
-
+        /// Force the next submission event to wait until this event has completed.
+        ///
+        /// An event's link only applies to the next event, but link chains can be arbitrarily long.
+        /// The next submission event will be executed whether the current event succeeds or fails.
         const IO_HARDLINK   = 1 << 3;
+        /// Execute the event in asynchronous mode without trying non-blocking mode first.
         const ASYNC         = 1 << 4;
+        const BUFFER_SELECT = 1 << 5;
     }
 }
@@ -638,7 +698,10 @@ impl<'ring> SQEs<'ring> {
     /// additional [`SQE`]s will return `None`.
     pub fn single(&mut self) -> Option<SQE<'ring>> {
         let mut next = None;
-        while let Some(sqe) = self.consume() { next = Some(sqe) }
+        while let Some(mut sqe) = self.consume() {
+            unsafe { sqe.prep_nop() };
+            next = Some(sqe)
+        }
         next
     }
 
@@ -665,7 +728,7 @@ impl<'ring> SQEs<'ring> {
 
     fn consume(&mut self) -> Option<SQE<'ring>> {
         self.sqes.next().map(|sqe| {
-            unsafe { uring_sys::io_uring_prep_nop(sqe) }
+            unsafe { *sqe = mem::zeroed() };
             SQE { sqe }
         })
     }
@@ -680,6 +743,9 @@ impl<'ring> Iterator for SQEs<'ring> {
 }
 
 /// An Iterator of [`SQE`]s which will be hard linked together.
+///
+/// All `HardLinked` objects must be dropped before the submission queue is submitted, to ensure
+/// correct handling of the IO chain.
 pub struct HardLinked<'ring, 'a> {
     sqes: &'a mut SQEs<'ring>,
 }
@@ -695,10 +761,27 @@ impl<'ring> Iterator for HardLinked<'ring, '_> {
 
     fn next(&mut self) -> Option<Self::Item> {
         let is_final = self.sqes.remaining() == 1;
-        self.sqes.consume().map(|sqe| HardLinkedSQE { sqe, is_final })
+        self.sqes
+            .consume()
+            .map(|sqe| HardLinkedSQE { sqe, is_final })
     }
 }
 
+impl<'ring> Drop for HardLinked<'ring, '_> {
+    fn drop(&mut self) {
+        // Ensure that all remaining descriptors are properly consumed.
+        for mut sqe in &mut self.sqes {
+            unsafe {
+                sqe.prep_nop();
+            }
+        }
+    }
+}
+
+/// Represents a non-tail event descriptor in a hard-linked IO chain.
+///
+/// All `HardLinkedSQE` objects must be dropped before the submission queue is submitted, to
+/// ensure correct handling of the IO_HARDLINK flag.
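+///
+/// A sketch of driving a hard-linked chain (obtained from `SQEs::hard_linked`) to completion
+/// before submitting; the events are no-ops purely for illustration:
+///
+/// ```
+/// # fn main() -> std::io::Result<()> {
+/// let mut ring = iou::IoUring::new(4)?;
+/// let mut sqes = ring.prepare_sqes(2).unwrap();
+/// for mut sqe in sqes.hard_linked() {
+///     unsafe { sqe.prep_nop() };
+/// }
+/// ring.submit_sqes()?;
+/// # Ok(())
+/// # }
+/// ```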
 pub struct HardLinkedSQE<'ring> {
     sqe: SQE<'ring>,
     is_final: bool,
 }
@@ -718,6 +801,10 @@ impl<'ring> DerefMut for HardLinkedSQE<'ring> {
     }
 }
 
+// TODO: is there a better way to set the IO_HARDLINK flag?
+// If submit() is called before the HardLinkedSQE object is dropped, it may cause race windows
+// in which the kernel observes malformed IO chains. This kind of race window would be very hard
+// to root-cause.
 impl<'ring> Drop for HardLinkedSQE<'ring> {
     fn drop(&mut self) {
         if !self.is_final {
@@ -727,6 +814,9 @@ impl<'ring> Drop for HardLinkedSQE<'ring> {
 }
 
 /// An Iterator of [`SQE`]s which will be soft linked together.
+///
+/// All `SoftLinked` objects must be dropped before the submission queue is submitted, to ensure
+/// correct handling of the IO chain.
 pub struct SoftLinked<'ring, 'a> {
     sqes: &'a mut SQEs<'ring>,
 }
@@ -742,10 +832,27 @@ impl<'ring> Iterator for SoftLinked<'ring, '_> {
 
     fn next(&mut self) -> Option<Self::Item> {
         let is_final = self.sqes.remaining() == 1;
-        self.sqes.consume().map(|sqe| SoftLinkedSQE { sqe, is_final })
+        self.sqes
+            .consume()
+            .map(|sqe| SoftLinkedSQE { sqe, is_final })
     }
 }
 
+impl<'ring> Drop for SoftLinked<'ring, '_> {
+    fn drop(&mut self) {
+        // Ensure that all remaining descriptors are properly consumed.
+        for mut sqe in &mut self.sqes {
+            unsafe {
+                sqe.prep_nop();
+            }
+        }
+    }
+}
+
+/// Represents a non-tail event descriptor in a soft-linked IO chain.
+///
+/// All `SoftLinkedSQE` objects must be dropped before the submission queue is submitted, to
+/// ensure correct handling of the IO_LINK flag.
 pub struct SoftLinkedSQE<'ring> {
     sqe: SQE<'ring>,
     is_final: bool,
 }
@@ -765,6 +872,10 @@ impl<'ring> DerefMut for SoftLinkedSQE<'ring> {
     }
 }
 
+// TODO: is there a better way to set the IO_LINK flag?
+// If submit() is called before the SoftLinkedSQE object is dropped, it may cause race windows
+// in which the kernel observes malformed IO chains. This kind of race window would be very hard
+// to root-cause.
 impl<'ring> Drop for SoftLinkedSQE<'ring> {
     fn drop(&mut self) {
         if !self.is_final {
diff --git a/src/submission_queue.rs b/src/submission_queue.rs
index 7a72a72..3bb4e0b 100644
--- a/src/submission_queue.rs
+++ b/src/submission_queue.rs
@@ -1,12 +1,13 @@
 use std::fmt;
 use std::io;
-use std::ptr::NonNull;
 use std::marker::PhantomData;
+use std::num::Wrapping;
+use std::ptr::NonNull;
 use std::slice;
-use std::time::Duration;
 use std::sync::atomic::{self, Ordering};
+use std::time::Duration;
 
-use super::{IoUring, SQE, SQEs, resultify};
+use super::{resultify, IoUring, SQEs, SQE};
 
 /// The queue of pending IO events.
 ///
@@ -76,13 +77,11 @@ impl<'ring> SubmissionQueue<'ring> {
     /// # Ok(())
     /// # }
     ///
-    pub fn prepare_sqe<'a>(&'a mut self) -> Option<SQE<'a>> {
-        unsafe {
-            prepare_sqe(self.ring.as_mut())
-        }
+    pub fn prepare_sqe(&mut self) -> Option<SQE<'_>> {
+        unsafe { prepare_sqe(self.ring.as_mut()) }
     }
 
-    pub fn prepare_sqes<'a>(&'a mut self, count: u32) -> Option<SQEs<'a>> {
+    pub fn prepare_sqes(&mut self, count: u32) -> Option<SQEs<'_>> {
         unsafe {
             let sq: &mut uring_sys::io_uring_sq = &mut (*self.ring.as_ptr()).sq;
             prepare_sqes(sq, count)
@@ -91,30 +90,62 @@ impl<'ring> SubmissionQueue<'ring> {
 
     /// Submit all events in the queue. Returns the number of submitted events.
     ///
-    /// If this function encounters any IO errors an [`io::Error`](std::io::Result) variant is returned.
+    /// # Return value
+    /// - the number of submitted events; it is then safe to reuse SQE entries in the ring. This is true
+    ///   even if the actual IO submission had to be punted to async context, which means that the
+    ///   SQE may in fact not have been submitted yet.
+    /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors.
     pub fn submit(&mut self) -> io::Result<u32> {
         resultify(unsafe { uring_sys::io_uring_submit(self.ring.as_ptr()) })
     }
 
+    /// Submit all events in the queue and wait for `wait_for` event completions before returning.
+    ///
+    /// # Return value
+    /// - the number of submitted events; it is then safe to reuse SQE entries in the ring. This
+    ///   is true even if the actual IO submission had to be punted to async context, which means
+    ///   that the SQE may in fact not have been submitted yet.
+    /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors.
     pub fn submit_and_wait(&mut self, wait_for: u32) -> io::Result<u32> {
         resultify(unsafe { uring_sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _) })
     }
 
-    pub fn submit_and_wait_with_timeout(&mut self, wait_for: u32, duration: Duration)
-        -> io::Result<u32>
-    {
+    /// Submit all events in the queue and wait for `wait_for` event completions before returning,
+    /// timing out after `duration` has passed.
+    ///
+    /// # Return value
+    /// - the number of submitted events; it is then safe to reuse SQE entries in the ring. This
+    ///   is true even if the actual IO submission had to be punted to async context, which means
+    ///   that the SQE may in fact not have been submitted yet.
+    /// - an [`io::Error`](std::io::Result) variant if this function encounters any IO errors.
+    ///
+    /// # Note
+    /// Due to the way the timeout is implemented, there are two possible flaws:
+    /// - the timeout is unreliable: when the submission queue is full, this silently falls back
+    ///   to `submit()`.
+    /// - the returned value may be bigger than expected: one extra descriptor may be consumed by
+    ///   the timeout mechanism. The user data of the descriptor consumed by the timeout is set to
+    ///   [`LIBURING_UDATA_TIMEOUT`](uring_sys::LIBURING_UDATA_TIMEOUT) (`u64::MAX`), so this
+    ///   special value is reserved.
+    pub fn submit_and_wait_with_timeout(
+        &mut self,
+        wait_for: u32,
+        duration: Duration,
+    ) -> io::Result<u32> {
         let ts = uring_sys::__kernel_timespec {
             tv_sec: duration.as_secs() as _,
-            tv_nsec: duration.subsec_nanos() as _
+            tv_nsec: duration.subsec_nanos() as _,
         };
 
         loop {
             if let Some(mut sqe) = self.prepare_sqe() {
-                sqe.clear();
                 unsafe {
                     sqe.prep_timeout(&ts, 0, crate::sqe::TimeoutFlags::empty());
                     sqe.set_user_data(uring_sys::LIBURING_UDATA_TIMEOUT);
-                    return resultify(uring_sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _))
+                    return resultify(uring_sys::io_uring_submit_and_wait(
+                        self.ring.as_ptr(),
+                        wait_for as _,
+                    ));
                 }
             }
 
@@ -122,10 +153,12 @@ impl<'ring> SubmissionQueue<'ring> {
         }
     }
 
+    /// Returns the number of ready event descriptors on the submission queue.
     pub fn ready(&self) -> u32 {
         unsafe { uring_sys::io_uring_sq_ready(self.ring.as_ptr()) as u32 }
    }
 
+    /// Returns the number of available event descriptors on the submission queue.
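+    ///
+    /// For example (a sketch), capping a batch to the available room:
+    ///
+    /// ```
+    /// # fn main() -> std::io::Result<()> {
+    /// let mut ring = iou::IoUring::new(8)?;
+    /// let mut sq = ring.sq();
+    /// let batch = sq.space_left().min(4);
+    /// let _sqes = sq.prepare_sqes(batch).unwrap();
+    /// # Ok(())
+    /// # }
+    /// ```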
     pub fn space_left(&self) -> u32 {
         unsafe { uring_sys::io_uring_sq_space_left(self.ring.as_ptr()) as u32 }
     }
@@ -134,12 +167,14 @@ impl<'ring> SubmissionQueue<'ring> {
 impl fmt::Debug for SubmissionQueue<'_> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         let fd = unsafe { self.ring.as_ref().ring_fd };
-        f.debug_struct(std::any::type_name::<Self>()).field("fd", &fd).finish()
+        f.debug_struct(std::any::type_name::<Self>())
+            .field("fd", &fd)
+            .finish()
     }
 }
 
-unsafe impl<'ring> Send for SubmissionQueue<'ring> { }
-unsafe impl<'ring> Sync for SubmissionQueue<'ring> { }
+unsafe impl<'ring> Send for SubmissionQueue<'ring> {}
+unsafe impl<'ring> Sync for SubmissionQueue<'ring> {}
 
 pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option<SQE<'a>> {
     let sqe = uring_sys::io_uring_get_sqe(ring);
@@ -152,18 +187,26 @@ pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option<SQE<'a>> {
     }
 }
 
-pub(crate) unsafe fn prepare_sqes<'a>(sq: &mut uring_sys::io_uring_sq, count: u32)
-    -> Option<SQEs<'a>>
-{
+pub(crate) unsafe fn prepare_sqes<'a>(
+    sq: &mut uring_sys::io_uring_sq,
+    count: u32,
+) -> Option<SQEs<'a>> {
     atomic::fence(Ordering::Acquire);
 
-    let head: u32 = *sq.khead;
-    let next: u32 = sq.sqe_tail + count;
+    let cap = Wrapping(*sq.kring_entries as u32);
+    let count = Wrapping(count);
+    // Protect the `next - head <= cap` comparison below from wrapping overflow caused by a
+    // huge `count`.
+    if count > cap {
+        return None;
+    }
+
+    let head = Wrapping(*sq.khead as u32);
+    let next = Wrapping(sq.sqe_tail as u32) + count;
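+    // A quick sanity check of the wrapping comparison below (hypothetical values): with
+    // kring_entries = 8, khead = u32::MAX - 1 and sqe_tail = 1 (the tail has already wrapped
+    // around), a request for count = 4 gives next = Wrapping(5), and next - head wraps to 7,
+    // so the request still fits even though tail < head numerically.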
 
-    if next - head <= *sq.kring_entries {
+    if next - head <= cap {
         let sqe = sq.sqes.offset((sq.sqe_tail & *sq.kring_mask) as isize);
-        sq.sqe_tail = next;
-        Some(SQEs::new(slice::from_raw_parts_mut(sqe, count as usize)))
+        sq.sqe_tail = next.0;
+        Some(SQEs::new(slice::from_raw_parts_mut(sqe, count.0 as usize)))
     } else {
         None
     }
 }
diff --git a/tests/accept.rs b/tests/accept.rs
index 34e0ee5..17e4f44 100644
--- a/tests/accept.rs
+++ b/tests/accept.rs
@@ -1,10 +1,10 @@
+use iou::sqe::SockAddr;
 use nix::sys::socket::InetAddr;
 use std::{
     io::{self, Read, Write},
     net::{TcpListener, TcpStream},
     os::unix::io::{AsRawFd, FromRawFd},
 };
-use iou::sqe::SockAddr;
 
 const MESSAGE: &'static [u8] = b"Hello World";
diff --git a/tests/connect.rs b/tests/connect.rs
index 3a1a41f..23876c5 100644
--- a/tests/connect.rs
+++ b/tests/connect.rs
@@ -1,4 +1,4 @@
-use nix::sys::socket::{AddressFamily, SockProtocol, SockType, InetAddr, SockFlag};
+use nix::sys::socket::{AddressFamily, InetAddr, SockFlag, SockProtocol, SockType};
 use std::{io, net::TcpListener};
 
 #[test]
@@ -14,7 +14,7 @@ fn connect() -> io::Result<()> {
         SockFlag::SOCK_NONBLOCK,
         SockProtocol::Tcp,
     )
-        .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to create socket"))?;
+    .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to create socket"))?;
 
     let mut ring = iou::IoUring::new(1)?;
     let mut sqe = ring.prepare_sqe().expect("failed to get sqe");
diff --git a/tests/exhaust-queue.rs b/tests/exhaust-queue.rs
index d9a7137..646e875 100644
--- a/tests/exhaust-queue.rs
+++ b/tests/exhaust-queue.rs
@@ -35,7 +35,6 @@ fn exhaust_queue_with_prepare_sqes() {
         for counter in base..counter {
             let cqe = io_uring.peek_for_cqe().unwrap();
             assert_eq!(cqe.user_data(), counter);
-
         }
     }
 }
diff --git a/tests/fileset-placeholder.rs b/tests/fileset-placeholder.rs
index c5eb641..7b96fc5 100644
--- a/tests/fileset-placeholder.rs
+++ b/tests/fileset-placeholder.rs
@@ -1,4 +1,4 @@
-use iou::{IoUring, registrar::RegisteredFd};
+use iou::{registrar::RegisteredFd, IoUring};
 use std::fs::File;
 use std::io::{IoSlice, Read};
 use std::os::unix::io::AsRawFd;
@@ -20,7 +20,9 @@ fn main() -> std::io::Result<()> {
 
     // update a random fileset entry with a valid file
     let file = std::fs::File::create(&path)?;
-    let reg_file = registrar.update_registered_files(713, &[file.as_raw_fd()])?.collect::<Vec<_>>()[0];
+    let reg_file = registrar
+        .update_registered_files(713, &[file.as_raw_fd()])?
+        .collect::<Vec<_>>()[0];
     assert!(!reg_file.is_placeholder());
 
     let bufs = &[IoSlice::new(&TEXT)];
diff --git a/tests/fixed-file-write.rs b/tests/fixed-file-write.rs
index c047813..7d947d2 100644
--- a/tests/fixed-file-write.rs
+++ b/tests/fixed-file-write.rs
@@ -1,4 +1,4 @@
-use iou::{IoUring, registrar::RegisteredFd, Registrar};
+use iou::{registrar::RegisteredFd, IoUring, Registrar};
 use std::fs::{self, File};
 use std::io::{IoSlice, Read};
 use std::os::unix::io::AsRawFd;
diff --git a/tests/read.rs b/tests/read.rs
index 9542531..0162b2a 100644
--- a/tests/read.rs
+++ b/tests/read.rs
@@ -79,7 +79,8 @@ fn read_test() -> io::Result<()> {
 fn read_registered_buf() -> io::Result<()> {
     let mut io_uring = iou::IoUring::new(32)?;
     let bufs = vec![Box::new([0u8; 4096]) as Box<[u8]>];
-    let mut buf: iou::registrar::RegisteredBuf = io_uring.registrar().register_buffers(bufs)?.next().unwrap();
+    let mut buf: iou::registrar::RegisteredBuf =
+        io_uring.registrar().register_buffers(bufs)?.next().unwrap();
 
     let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
     path.push("props");
@@ -119,7 +120,11 @@ fn read_registered_fd_and_buf() -> io::Result<()> {
     let file = File::open(&path)?;
 
     let mut buf: RegisteredBuf = io_uring.registrar().register_buffers(bufs)?.next().unwrap();
-    let fd: RegisteredFd = io_uring.registrar().register_files(&[file.as_raw_fd()])?.next().unwrap();
+    let fd: RegisteredFd = io_uring
+        .registrar()
+        .register_files(&[file.as_raw_fd()])?
+        .next()
+        .unwrap();
 
     unsafe {
         let mut sq = io_uring.sq();
diff --git a/tests/register-buffers.rs b/tests/register-buffers.rs
index f06c806..fea12ed 100644
--- a/tests/register-buffers.rs
+++ b/tests/register-buffers.rs
@@ -3,9 +3,11 @@ fn register_buffers_by_val() {
     let buf1 = vec![0; 1024].into_boxed_slice();
     let buf2 = vec![0; 1024].into_boxed_slice();
     let ring = iou::IoUring::new(8).unwrap();
-    let bufs: Vec<_> = ring.registrar()
-        .register_buffers(vec![buf1, buf2])
-        .unwrap().collect();
+    let bufs: Vec<_> = ring
+        .registrar()
+        .register_buffers(vec![buf1, buf2])
+        .unwrap()
+        .collect();
     assert_eq!(bufs.len(), 2);
     assert_eq!(bufs[0].index(), 0);
     assert_eq!(bufs[1].index(), 1);
@@ -17,9 +19,11 @@ fn register_buffers_by_ref() {
     let buf2 = vec![0; 1024];
     let ring = iou::IoUring::new(8).unwrap();
     let bufs = &[&buf1[..], &buf2[..]];
-    let bufs: Vec<_> = ring.registrar()
-        .register_buffers_by_ref(bufs)
-        .unwrap().collect();
+    let bufs: Vec<_> = ring
+        .registrar()
+        .register_buffers_by_ref(bufs)
+        .unwrap()
+        .collect();
     assert_eq!(bufs.len(), 2);
     assert_eq!(bufs[0].index(), 0);
     assert_eq!(bufs[1].index(), 1);
@@ -31,9 +35,11 @@ fn register_buffers_by_mut() {
     let mut buf2 = vec![0; 1024];
     let ring = iou::IoUring::new(8).unwrap();
     let bufs = &mut [&mut buf1[..], &mut buf2[..]];
-    let bufs: Vec<_> = ring.registrar()
-        .register_buffers_by_mut(bufs)
-        .unwrap().collect();
+    let bufs: Vec<_> = ring
+        .registrar()
+        .register_buffers_by_mut(bufs)
+        .unwrap()
+        .collect();
     assert_eq!(bufs.len(), 2);
     assert_eq!(bufs[0].index(), 0);
     assert_eq!(bufs[1].index(), 1);
diff --git a/tests/write.rs b/tests/write.rs
index 8e093a0..f3f4609 100644
--- a/tests/write.rs
+++ b/tests/write.rs
@@ -1,7 +1,7 @@
 use std::fs::{self, File};
 use std::io::{self, Read};
-use std::path::PathBuf;
 use std::os::unix::io::AsRawFd;
+use std::path::PathBuf;
 
 const TEXT: &[u8] = b"I really wanna stop
 But I just gotta taste for it
@@ -23,12 +23,11 @@ fn vectored_write_test() -> io::Result<()> {
     path.push("vectored.tmp");
 
     let _ = fs::remove_file(&path);
-    
+
     let n = {
         let mut io_uring = iou::IoUring::new(32)?;
         let bufs = [io::IoSlice::new(TEXT)];
-
         let file = File::create(&path)?;
 
         unsafe {
             let mut sq = io_uring.sq();
@@ -61,7 +60,7 @@ fn write_test() -> io::Result<()> {
     path.push("text.tmp");
 
     let _ = fs::remove_file(&path);
-    
+
     let n = {
         let mut io_uring = iou::IoUring::new(32)?;
 
@@ -89,7 +88,6 @@ fn write_test() -> io::Result<()> {
     Ok(())
 }
 
-
 #[test]
 fn write_registered_buf() -> io::Result<()> {
     let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -100,7 +98,8 @@ fn write_registered_buf() -> io::Result<()> {
     let mut io_uring = iou::IoUring::new(32)?;
     let bufs = vec![Box::new([0u8; 4096]) as Box<[u8]>];
 
-    let mut buf: iou::registrar::RegisteredBuf = io_uring.registrar().register_buffers(bufs)?.next().unwrap();
+    let mut buf: iou::registrar::RegisteredBuf =
+        io_uring.registrar().register_buffers(bufs)?.next().unwrap();
 
     buf.as_mut().slice_to_mut(TEXT.len()).copy_from_slice(TEXT);