Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/completion_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::ptr::NonNull;

use super::{IoUring, CQE, CQEs, CQEsBlocking, resultify};
use super::{resultify, CQEs, CQEsBlocking, IoUring, CQE};

/// The queue of completed IO events.
///
Expand Down Expand Up @@ -61,12 +61,10 @@ impl<'ring> CompletionQueue<'ring> {
unsafe {
let mut cqe = MaybeUninit::uninit();

resultify(uring_sys::io_uring_wait_cqes(
resultify(uring_sys::io_uring_wait_cqe_nr(
self.ring.as_ptr(),
cqe.as_mut_ptr(),
count as _,
ptr::null(),
ptr::null(),
))?;

Ok(&mut *cqe.assume_init())
Expand All @@ -89,14 +87,17 @@ impl<'ring> CompletionQueue<'ring> {
CQEsBlocking::new(self.ring, wait_for)
}

/// Returns how many descriptors are ready for processing on the completion queue.
pub fn ready(&self) -> u32 {
unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) }
}

/// Returns true if the eventfd notification is currently enabled.
pub fn eventfd_enabled(&self) -> bool {
unsafe { uring_sys::io_uring_cq_eventfd_enabled(self.ring.as_ptr()) }
}

/// Toggle eventfd notification on or off, if an eventfd is registered with the ring.
pub fn eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> {
resultify(unsafe { uring_sys::io_uring_cq_eventfd_toggle(self.ring.as_ptr(), enabled) })?;
Ok(())
Expand All @@ -106,9 +107,11 @@ impl<'ring> CompletionQueue<'ring> {
impl fmt::Debug for CompletionQueue<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let fd = unsafe { self.ring.as_ref().ring_fd };
f.debug_struct(std::any::type_name::<Self>()).field("fd", &fd).finish()
f.debug_struct(std::any::type_name::<Self>())
.field("fd", &fd)
.finish()
}
}

unsafe impl<'ring> Send for CompletionQueue<'ring> { }
unsafe impl<'ring> Sync for CompletionQueue<'ring> { }
unsafe impl<'ring> Send for CompletionQueue<'ring> {}
unsafe impl<'ring> Sync for CompletionQueue<'ring> {}
37 changes: 24 additions & 13 deletions src/cqe.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::io;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::ptr::NonNull;

use super::{IoUring, resultify};
use super::{resultify, IoUring};

/// A completed IO event.
#[derive(Debug)]
Expand All @@ -24,11 +24,16 @@ impl CQE {

pub fn from_raw_parts(user_data: u64, res: i32, flags: CompletionFlags) -> CQE {
CQE {
user_data, res, flags,
user_data,
res,
flags,
}
}

pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, cqe: &mut uring_sys::io_uring_cqe) -> CQE {
pub(crate) fn new(
ring: NonNull<uring_sys::io_uring>,
cqe: &mut uring_sys::io_uring_cqe,
) -> CQE {
let user_data = cqe.user_data;
let res = cqe.res;
let flags = CompletionFlags::from_bits_truncate(cqe.flags);
Expand Down Expand Up @@ -61,8 +66,8 @@ impl CQE {
}
}

unsafe impl Send for CQE { }
unsafe impl Sync for CQE { }
unsafe impl Send for CQE {}
unsafe impl Sync for CQE {}

/// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue).
///
Expand All @@ -75,7 +80,11 @@ pub struct CQEs<'a> {

impl<'a> CQEs<'a> {
pub(crate) fn new(ring: NonNull<uring_sys::io_uring>) -> CQEs<'a> {
CQEs { ring, ready: 0, marker: PhantomData }
CQEs {
ring,
ready: 0,
marker: PhantomData,
}
}

#[inline(always)]
Expand Down Expand Up @@ -114,7 +123,6 @@ impl Iterator for CQEs<'_> {
}
}


/// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue).
///
/// This iterator will never be exhausted; if there are no `CQE`s ready, it will block until there
Expand All @@ -128,7 +136,12 @@ pub struct CQEsBlocking<'a> {

impl<'a> CQEsBlocking<'a> {
pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, wait_for: u32) -> CQEsBlocking<'a> {
CQEsBlocking { ring, ready: 0, wait_for, marker: PhantomData }
CQEsBlocking {
ring,
ready: 0,
wait_for,
marker: PhantomData,
}
}

#[inline(always)]
Expand All @@ -155,12 +168,10 @@ impl<'a> CQEsBlocking<'a> {
unsafe {
let mut cqe = MaybeUninit::uninit();

resultify(uring_sys::io_uring_wait_cqes(
resultify(uring_sys::io_uring_wait_cqe_nr(
self.ring.as_ptr(),
cqe.as_mut_ptr(),
self.wait_for as _,
ptr::null(),
ptr::null(),
))?;

Ok(&mut *cqe.assume_init())
Expand All @@ -176,7 +187,7 @@ impl Iterator for CQEsBlocking<'_> {
self.ready = self.ready();
if self.ready == 0 {
let ring = self.ring;
return Some(self.wait().map(|cqe| CQE::new(ring, cqe)))
return Some(self.wait().map(|cqe| CQE::new(ring, cqe)));
}
}

Expand Down
Loading