From 2da270c8df19d0efe02b7023cf7c4c5b5acc5282 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Tue, 11 Feb 2025 16:08:23 +0100 Subject: [PATCH 1/8] feat: allocate rx buffer --- Cargo.lock | 1 + DEFAULT_CONFIG.json5 | 6 - commons/zenoh-buffers/src/zslice.rs | 6 + commons/zenoh-config/src/defaults.rs | 1 - commons/zenoh-config/src/lib.rs | 6 - commons/zenoh-sync/src/fifo_queue.rs | 82 ---------- commons/zenoh-sync/src/lib.rs | 12 -- commons/zenoh-sync/src/lifo_queue.rs | 81 --------- commons/zenoh-sync/src/mvar.rs | 130 --------------- commons/zenoh-sync/src/object_pool.rs | 154 ------------------ commons/zenoh-sync/src/signal.rs | 22 +-- io/zenoh-links/zenoh-link-udp/Cargo.toml | 3 +- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 12 +- io/zenoh-transport/src/common/batch.rs | 43 ++--- io/zenoh-transport/src/common/mod.rs | 19 +++ io/zenoh-transport/src/manager.rs | 10 -- io/zenoh-transport/src/multicast/link.rs | 67 ++------ io/zenoh-transport/src/unicast/link.rs | 48 +++--- .../src/unicast/lowlatency/link.rs | 27 +-- .../src/unicast/universal/link.rs | 42 +---- 20 files changed, 103 insertions(+), 669 deletions(-) delete mode 100644 commons/zenoh-sync/src/fifo_queue.rs delete mode 100644 commons/zenoh-sync/src/lifo_queue.rs delete mode 100644 commons/zenoh-sync/src/mvar.rs delete mode 100644 commons/zenoh-sync/src/object_pool.rs diff --git a/Cargo.lock b/Cargo.lock index 4df57cda16..f4b5751cca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5499,6 +5499,7 @@ name = "zenoh-link-udp" version = "1.2.1" dependencies = [ "async-trait", + "flume", "socket2 0.5.7", "tokio", "tokio-util", diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index e8d6e7ae8f..088db07559 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -514,12 +514,6 @@ }, /// Configure the zenoh RX parameters of a link rx: { - /// Receiving buffer size in bytes for each link - /// The default the rx_buffer_size value is the same as the default batch size: 65535. - /// For very high throughput scenarios, the rx_buffer_size can be increased to accommodate - /// more in-flight data. This is particularly relevant when dealing with large messages. - /// E.g. for 16MiB rx_buffer_size set the value to: 16777216. - buffer_size: 65535, /// Maximum size of the defragmentation buffer at receiver end. /// Fragmented messages that are larger than the configured size will be dropped. /// The default value is 1GiB. This would work in most scenarios. diff --git a/commons/zenoh-buffers/src/zslice.rs b/commons/zenoh-buffers/src/zslice.rs index 5b0d99f07b..07a2c728db 100644 --- a/commons/zenoh-buffers/src/zslice.rs +++ b/commons/zenoh-buffers/src/zslice.rs @@ -168,6 +168,12 @@ impl ZSlice { self.len() == 0 } + #[doc(hidden)] + #[inline] + pub fn capacity(&self) -> usize { + self.buf.as_slice().len() + } + #[inline] #[must_use] pub fn as_slice(&self) -> &[u8] { diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index fccee57a24..5a5c72a083 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -315,7 +315,6 @@ impl Default for BatchingConf { impl Default for LinkRxConf { fn default() -> Self { Self { - buffer_size: BatchSize::MAX as usize, max_message_size: 2_usize.pow(30), } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index c9d8e8f41e..53cc32ca5e 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -519,12 +519,6 @@ validated_struct::validator! 
{ threads: usize, }, pub rx: LinkRxConf { - /// Receiving buffer size in bytes for each link - /// The default the rx_buffer_size value is the same as the default batch size: 65535. - /// For very high throughput scenarios, the rx_buffer_size can be increased to accommodate - /// more in-flight data. This is particularly relevant when dealing with large messages. - /// E.g. for 16MiB rx_buffer_size set the value to: 16777216. - buffer_size: usize, /// Maximum size of the defragmentation buffer at receiver end (default: 1GiB). /// Fragmented messages that are larger than the configured size will be dropped. max_message_size: usize, diff --git a/commons/zenoh-sync/src/fifo_queue.rs b/commons/zenoh-sync/src/fifo_queue.rs deleted file mode 100644 index 44bc2a5b17..0000000000 --- a/commons/zenoh-sync/src/fifo_queue.rs +++ /dev/null @@ -1,82 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use tokio::sync::Mutex; -use zenoh_collections::RingBuffer; -use zenoh_core::zasynclock; - -use crate::Condition; - -pub struct FifoQueue { - not_empty: Condition, - not_full: Condition, - buffer: Mutex>, -} - -impl FifoQueue { - pub fn new(capacity: usize) -> FifoQueue { - FifoQueue { - not_empty: Condition::new(), - not_full: Condition::new(), - buffer: Mutex::new(RingBuffer::new(capacity)), - } - } - - pub fn try_push(&self, x: T) -> Option { - if let Ok(mut guard) = self.buffer.try_lock() { - let res = guard.push(x); - if res.is_none() { - drop(guard); - self.not_empty.notify_one(); - } - return res; - } - Some(x) - } - - pub async fn push(&self, x: T) { - loop { - let mut guard = zasynclock!(self.buffer); - if !guard.is_full() { - guard.push(x); - drop(guard); - self.not_empty.notify_one(); - return; - } - self.not_full.wait(guard).await; - } - } - - pub fn try_pull(&self) -> Option { - if let Ok(mut guard) = self.buffer.try_lock() { - if let Some(e) = guard.pull() { - drop(guard); - self.not_full.notify_one(); - return Some(e); - } - } - None - } - - pub async fn pull(&self) -> T { - loop { - let mut guard = zasynclock!(self.buffer); - if let Some(e) = guard.pull() { - drop(guard); - self.not_full.notify_one(); - return e; - } - self.not_empty.wait(guard).await; - } - } -} diff --git a/commons/zenoh-sync/src/lib.rs b/commons/zenoh-sync/src/lib.rs index 3c213fd4f9..2c0cdfe441 100644 --- a/commons/zenoh-sync/src/lib.rs +++ b/commons/zenoh-sync/src/lib.rs @@ -28,18 +28,6 @@ use futures::FutureExt; pub mod event; pub use event::*; -pub mod fifo_queue; -pub use fifo_queue::*; - -pub mod lifo_queue; -pub use lifo_queue::*; - -pub mod object_pool; -pub use object_pool::*; - -pub mod mvar; -pub use mvar::*; - pub mod condition; pub use condition::*; diff --git a/commons/zenoh-sync/src/lifo_queue.rs b/commons/zenoh-sync/src/lifo_queue.rs deleted file mode 100644 index 9fe541da36..0000000000 --- a/commons/zenoh-sync/src/lifo_queue.rs +++ /dev/null @@ -1,81 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the 
Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::sync::{Condvar, Mutex}; - -use zenoh_collections::StackBuffer; -use zenoh_core::zlock; - -pub struct LifoQueue { - not_empty: Condvar, - not_full: Condvar, - buffer: Mutex>, -} - -impl LifoQueue { - pub fn new(capacity: usize) -> LifoQueue { - LifoQueue { - not_empty: Condvar::new(), - not_full: Condvar::new(), - buffer: Mutex::new(StackBuffer::new(capacity)), - } - } - - pub fn try_push(&self, x: T) -> Option { - if let Ok(mut guard) = self.buffer.try_lock() { - let res = guard.push(x); - if res.is_none() { - drop(guard); - self.not_empty.notify_one(); - } - return res; - } - Some(x) - } - - pub fn push(&self, x: T) { - let mut guard = zlock!(self.buffer); - loop { - if !guard.is_full() { - guard.push(x); - drop(guard); - self.not_empty.notify_one(); - return; - } - guard = self.not_full.wait(guard).unwrap(); - } - } - - pub fn try_pull(&self) -> Option { - if let Ok(mut guard) = self.buffer.try_lock() { - if let Some(e) = guard.pop() { - drop(guard); - self.not_full.notify_one(); - return Some(e); - } - } - None - } - - pub fn pull(&self) -> T { - let mut guard = zlock!(self.buffer); - loop { - if let Some(e) = guard.pop() { - drop(guard); - self.not_full.notify_one(); - return e; - } - guard = self.not_empty.wait(guard).unwrap(); - } - } -} diff --git a/commons/zenoh-sync/src/mvar.rs b/commons/zenoh-sync/src/mvar.rs deleted file mode 100644 index f818b44071..0000000000 --- a/commons/zenoh-sync/src/mvar.rs +++ /dev/null @@ -1,130 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::sync::atomic::{AtomicUsize, Ordering}; - -use tokio::sync::Mutex; -use zenoh_core::zasynclock; - -use crate::Condition; - -pub struct Mvar { - inner: Mutex>, - cond_put: Condition, - cond_take: Condition, - wait_put: AtomicUsize, - wait_take: AtomicUsize, -} - -impl Mvar { - pub fn new() -> Mvar { - Mvar { - inner: Mutex::new(None), - cond_put: Condition::new(), - cond_take: Condition::new(), - wait_put: AtomicUsize::new(0), - wait_take: AtomicUsize::new(0), - } - } - - pub fn has_take_waiting(&self) -> bool { - self.wait_take.load(Ordering::Acquire) > 0 - } - - pub fn has_put_waiting(&self) -> bool { - self.wait_put.load(Ordering::Acquire) > 0 - } - - pub async fn try_take(&self) -> Option { - let mut guard = zasynclock!(self.inner); - if let Some(inner) = guard.take() { - drop(guard); - self.cond_put.notify_one(); - return Some(inner); - } - None - } - - pub async fn take(&self) -> T { - loop { - let mut guard = zasynclock!(self.inner); - if let Some(inner) = guard.take() { - self.wait_take.fetch_sub(1, Ordering::AcqRel); - drop(guard); - self.cond_put.notify_one(); - return inner; - } - self.wait_take.fetch_add(1, Ordering::AcqRel); - self.cond_take.wait(guard).await; - } - } - - pub async fn put(&self, inner: T) { - loop { - let mut guard = zasynclock!(self.inner); - if guard.is_some() { - self.wait_put.fetch_add(1, Ordering::AcqRel); - self.cond_put.wait(guard).await; - } else { - *guard = Some(inner); - self.wait_put.fetch_sub(1, Ordering::AcqRel); - drop(guard); - self.cond_take.notify_one(); - return; - } - } - } -} - -impl Default for Mvar { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use zenoh_result::ZResult; - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn mvar() -> ZResult<()> { - use std::{sync::Arc, time::Duration}; - - use super::Mvar; - - const TIMEOUT: Duration = Duration::from_secs(60); - - let count: usize = 1_000; - let mvar: Arc> = Arc::new(Mvar::new()); - - let c_mvar = mvar.clone(); - let ch = tokio::task::spawn(async move { - for _ in 0..count { - let n = c_mvar.take().await; - print!("-{n} "); - } - }); - - let ph = tokio::task::spawn(async move { - for i in 0..count { - mvar.put(i).await; - print!("+{i} "); - } - }); - - let _ = tokio::time::timeout(TIMEOUT, ph).await?; - let _ = tokio::time::timeout(TIMEOUT, ch).await?; - println!(); - Ok(()) - } -} diff --git a/commons/zenoh-sync/src/object_pool.rs b/commons/zenoh-sync/src/object_pool.rs deleted file mode 100644 index ee6eed881b..0000000000 --- a/commons/zenoh-sync/src/object_pool.rs +++ /dev/null @@ -1,154 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::{ - any::Any, - fmt, - ops::{Deref, DerefMut, Drop}, - sync::{Arc, Weak}, -}; - -use zenoh_buffers::ZSliceBuffer; - -use super::LifoQueue; - -/// Provides a pool of pre-allocated objects that are automaticlaly reinserted into -/// the pool when dropped. 
-pub struct RecyclingObjectPool -where - F: Fn() -> T, -{ - inner: Arc>, - f: F, -} - -impl T> RecyclingObjectPool { - pub fn new(num: usize, f: F) -> RecyclingObjectPool { - let inner: Arc> = Arc::new(LifoQueue::new(num)); - for _ in 0..num { - let obj = (f)(); - inner.try_push(obj); - } - RecyclingObjectPool { inner, f } - } - - pub fn alloc(&self) -> RecyclingObject { - RecyclingObject::new((self.f)(), Weak::new()) - } - - pub fn try_take(&self) -> Option> { - self.inner - .try_pull() - .map(|obj| RecyclingObject::new(obj, Arc::downgrade(&self.inner))) - } - - pub fn take(&self) -> RecyclingObject { - let obj = self.inner.pull(); - RecyclingObject::new(obj, Arc::downgrade(&self.inner)) - } -} - -#[derive(Clone)] -pub struct RecyclingObject { - pool: Weak>, - object: Option, -} - -impl RecyclingObject { - pub fn new(obj: T, pool: Weak>) -> RecyclingObject { - RecyclingObject { - pool, - object: Some(obj), - } - } - - pub fn recycle(mut self) { - if let Some(pool) = self.pool.upgrade() { - if let Some(obj) = self.object.take() { - pool.push(obj); - } - } - } -} - -impl Eq for RecyclingObject {} - -impl PartialEq for RecyclingObject { - fn eq(&self, other: &Self) -> bool { - self.object == other.object - } -} - -impl Deref for RecyclingObject { - type Target = T; - #[inline] - fn deref(&self) -> &Self::Target { - self.object.as_ref().unwrap() - } -} - -impl DerefMut for RecyclingObject { - #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { - self.object.as_mut().unwrap() - } -} - -impl From for RecyclingObject { - fn from(obj: T) -> RecyclingObject { - RecyclingObject::new(obj, Weak::new()) - } -} - -impl Drop for RecyclingObject { - fn drop(&mut self) { - if let Some(pool) = self.pool.upgrade() { - if let Some(obj) = self.object.take() { - pool.push(obj); - } - } - } -} - -impl fmt::Debug for RecyclingObject { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("").field("inner", &self.object).finish() - } -} - -// Buffer impl -impl AsRef<[u8]> for RecyclingObject> { - fn as_ref(&self) -> &[u8] { - self.deref() - } -} - -impl AsMut<[u8]> for RecyclingObject> { - fn as_mut(&mut self) -> &mut [u8] { - self.deref_mut() - } -} - -impl ZSliceBuffer for RecyclingObject> { - fn as_slice(&self) -> &[u8] { - self.as_ref() - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} diff --git a/commons/zenoh-sync/src/signal.rs b/commons/zenoh-sync/src/signal.rs index 053f5a13aa..c35520e30a 100644 --- a/commons/zenoh-sync/src/signal.rs +++ b/commons/zenoh-sync/src/signal.rs @@ -16,7 +16,7 @@ use std::sync::{ Arc, }; -use tokio::sync::Semaphore; +use tokio::sync::Notify; #[derive(Debug, Clone)] pub struct Signal { @@ -25,7 +25,7 @@ pub struct Signal { #[derive(Debug)] struct Inner { - semaphore: Semaphore, + notify: Notify, triggered: AtomicBool, } @@ -33,22 +33,15 @@ impl Signal { pub fn new() -> Self { Signal { shared: Arc::new(Inner { - semaphore: Semaphore::new(0), + notify: Notify::new(), triggered: AtomicBool::new(false), }), } } pub fn trigger(&self) { - let result = self - .shared - .triggered - .compare_exchange(false, true, AcqRel, Acquire); - - if result.is_ok() { - // The maximum # of permits is defined in tokio doc. 
- // https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html#method.add_permits - self.shared.semaphore.add_permits(usize::MAX >> 3); + if !self.shared.triggered.swap(true, AcqRel) { + self.shared.notify.notify_waiters(); } } @@ -58,7 +51,10 @@ impl Signal { pub async fn wait(&self) { if !self.is_triggered() { - let _ = self.shared.semaphore.acquire().await; + let notified = self.shared.notify.notified(); + if !self.is_triggered() { + notified.await; + } } } } diff --git a/io/zenoh-links/zenoh-link-udp/Cargo.toml b/io/zenoh-links/zenoh-link-udp/Cargo.toml index 0e4b35f561..99f3550b36 100644 --- a/io/zenoh-links/zenoh-link-udp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-udp/Cargo.toml @@ -25,10 +25,11 @@ description = "Internal crate for zenoh." # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +flume = { workspace = true } tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] } tokio-util = { workspace = true, features = ["rt"] } async-trait = { workspace = true } -tracing = {workspace = true} +tracing = { workspace = true } socket2 = { workspace = true } zenoh-buffers = { workspace = true } zenoh-core = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 08373af6d5..73256a2a4e 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -32,7 +32,6 @@ use zenoh_protocol::{ transport::BatchSize, }; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; -use zenoh_sync::Mvar; use super::{ get_udp_addrs, socket_addr_to_udp_locator, UDP_ACCEPT_THROTTLE_TIME, UDP_DEFAULT_MTU, @@ -70,13 +69,14 @@ impl LinkUnicastUdpConnected { struct LinkUnicastUdpUnconnected { socket: Weak, links: LinkHashMap, - input: Mvar, + input_tx: flume::Sender, + input_rx: flume::Receiver, leftover: AsyncMutex>, } impl LinkUnicastUdpUnconnected { async fn received(&self, buffer: Vec, len: usize) { - self.input.put((buffer, len)).await; + self.input_tx.send_async((buffer, len)).await.unwrap(); } async fn read(&self, buffer: &mut [u8]) -> ZResult { @@ -84,7 +84,7 @@ impl LinkUnicastUdpUnconnected { let (slice, start, len) = match guard.take() { Some(tuple) => tuple, None => { - let (slice, len) = self.input.take().await; + let (slice, len) = self.input_rx.recv_async().await.unwrap(); (slice, 0, len) } }; @@ -533,10 +533,12 @@ async fn accept_read_task( None => { // A new peers has sent data to this socket tracing::debug!("Accepted UDP connection on {}: {}", src_addr, dst_addr); + let (input_tx, input_rx) = flume::bounded(1); let unconnected = Arc::new(LinkUnicastUdpUnconnected { socket: Arc::downgrade(&socket), links: links.clone(), - input: Mvar::new(), + input_tx, + input_rx, leftover: AsyncMutex::new(None), }); zaddlink!(src_addr, dst_addr, Arc::downgrade(&unconnected)); diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 0a0a41cf91..0251c091f6 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -17,7 +17,7 @@ use zenoh_buffers::{ buffer::Buffer, reader::{DidntRead, HasReader}, writer::{DidntWrite, HasWriter, Writer}, - BBuf, ZBufReader, ZSlice, ZSliceBuffer, + BBuf, ZBufReader, ZSlice, }; use zenoh_codec::{ transport::batch::{BatchError, Zenoh080Batch}, @@ -427,12 +427,9 @@ pub struct RBatch { } impl RBatch { - pub fn new(config: BatchConfig, buffer: T) -> Self - where - T: Into, - { + pub fn new(config: 
BatchConfig, buffer: ZSlice) -> Self { Self { - buffer: buffer.into(), + buffer, codec: Zenoh080Batch::new(), config, } @@ -453,11 +450,7 @@ impl RBatch { zsplit!(buffer, config) } - pub fn initialize(&mut self, #[allow(unused_variables)] buff: C) -> ZResult<()> - where - C: Fn() -> T + Copy, - T: AsMut<[u8]> + ZSliceBuffer + 'static, - { + pub fn initialize(&mut self) -> ZResult<()> { #[allow(unused_variables)] let (l, h, p) = Self::split(self.buffer.as_slice(), &self.config); @@ -470,8 +463,11 @@ impl RBatch { let header = BatchHeader::new(b); if header.is_compression() { - let zslice = self.decompress(p, buff)?; - self.buffer = zslice; + let mut buffer = vec![0u8; self.config.mtu as usize]; + let n = lz4_flex::block::decompress_into(p, &mut buffer) + .map_err(|_| zerror!("Decompression error"))?; + self.buffer = ZSlice::new(Arc::new(buffer), 0, n) + .map_err(|_| zerror!("Invalid decompression buffer length"))?; return Ok(()); } } @@ -484,19 +480,6 @@ impl RBatch { Ok(()) } - - #[cfg(feature = "transport_compression")] - fn decompress(&self, payload: &[u8], mut buff: impl FnMut() -> T) -> ZResult - where - T: AsMut<[u8]> + ZSliceBuffer + 'static, - { - let mut into = (buff)(); - let n = lz4_flex::block::decompress_into(payload, into.as_mut()) - .map_err(|_| zerror!("Decompression error"))?; - let zslice = ZSlice::new(Arc::new(into), 0, n) - .map_err(|_| zerror!("Invalid decompression buffer length"))?; - Ok(zslice) - } } pub trait Decode { @@ -581,13 +564,9 @@ mod tests { }; println!("Finalized WBatch: {:02x?}", bytes); - let mut rbatch = RBatch::new(config, bytes.to_vec().into_boxed_slice()); + let mut rbatch = RBatch::new(config, bytes.to_vec().into()); println!("Decoded RBatch: {:?}", rbatch); - rbatch - .initialize(|| { - zenoh_buffers::vec::uninit(config.mtu as usize).into_boxed_slice() - }) - .unwrap(); + rbatch.initialize(config.mtu).unwrap(); println!("Initialized RBatch: {:?}", rbatch); let msg_out: TransportMessage = rbatch.decode().unwrap(); assert_eq!(msg_in, msg_out); diff --git a/io/zenoh-transport/src/common/mod.rs b/io/zenoh-transport/src/common/mod.rs index c7de8a64ce..2c857ce821 100644 --- a/io/zenoh-transport/src/common/mod.rs +++ b/io/zenoh-transport/src/common/mod.rs @@ -11,6 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // +use zenoh_buffers::ZSlice; + pub mod batch; pub(crate) mod defragmentation; pub(crate) mod pipeline; @@ -18,3 +20,20 @@ pub(crate) mod priority; pub(crate) mod seq_num; #[cfg(feature = "stats")] pub mod stats; + +/// Read bytes asynchronously into the buffer if it is not shared, or replacing it +/// with a new allocated one otherwise. 
+pub(crate) fn read_with_buffer<'a, F>( + buffer: &'a mut ZSlice, + read: impl FnOnce(&'a mut [u8]) -> F, +) -> F { + // SAFETY: the buffer slice range is not modified by read + unsafe { + if let Some(buf) = buffer.downcast_mut::>() { + // use an intermediate to circumvent current borrow checker limitation + return read(&mut *(buf.as_mut_slice() as *mut [u8])); + } + *buffer = vec![0u8; buffer.capacity()].into(); + read(buffer.downcast_mut::>().unwrap()) + } +} diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 6c4579568b..3ca9ccc5f5 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -114,7 +114,6 @@ pub struct TransportManagerConfig { pub queue_backoff: Duration, pub queue_alloc: QueueAllocConf, pub defrag_buff_size: usize, - pub link_rx_buffer_size: usize, pub unicast: TransportManagerConfigUnicast, pub multicast: TransportManagerConfigMulticast, pub endpoints: HashMap, // (protocol, config) @@ -146,7 +145,6 @@ pub struct TransportManagerBuilder { queue_size: QueueSizeConf, queue_alloc: QueueAllocConf, defrag_buff_size: usize, - link_rx_buffer_size: usize, unicast: TransportManagerBuilderUnicast, multicast: TransportManagerBuilderMulticast, endpoints: HashMap, // (protocol, config) @@ -218,11 +216,6 @@ impl TransportManagerBuilder { self } - pub fn link_rx_buffer_size(mut self, link_rx_buffer_size: usize) -> Self { - self.link_rx_buffer_size = link_rx_buffer_size; - self - } - pub fn endpoints(mut self, endpoints: HashMap) -> Self { self.endpoints = endpoints; self @@ -266,7 +259,6 @@ impl TransportManagerBuilder { *link.tx().queue().batching().time_limit(), )); self = self.defrag_buff_size(*link.rx().max_message_size()); - self = self.link_rx_buffer_size(*link.rx().buffer_size()); self = self.wait_before_drop(( duration_from_i64us(*cc_drop.wait_before_drop()), duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), @@ -336,7 +328,6 @@ impl TransportManagerBuilder { queue_backoff: self.batching_time_limit, queue_alloc: self.queue_alloc, defrag_buff_size: self.defrag_buff_size, - link_rx_buffer_size: self.link_rx_buffer_size, unicast: unicast.config, multicast: multicast.config, endpoints: self.endpoints, @@ -389,7 +380,6 @@ impl Default for TransportManagerBuilder { queue_alloc: queue.allocation, batching_time_limit: Duration::from_millis(backoff), defrag_buff_size: *link_rx.max_message_size(), - link_rx_buffer_size: *link_rx.buffer_size(), endpoints: HashMap::new(), unicast: TransportManagerBuilderUnicast::default(), multicast: TransportManagerBuilderMulticast::default(), diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index ec26d1246d..4229a4189e 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -19,7 +19,7 @@ use std::{ }; use tokio::task::JoinHandle; -use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; +use zenoh_buffers::{BBuf, ZSlice}; use zenoh_core::{zcondfeat, zlock}; use zenoh_link::{LinkMulticast, Locator}; use zenoh_protocol::{ @@ -29,7 +29,7 @@ use zenoh_protocol::{ }, }; use zenoh_result::{zerror, ZResult}; -use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; +use zenoh_sync::Signal; #[cfg(feature = "stats")] use crate::stats::TransportStats; @@ -41,10 +41,10 @@ use crate::{ TransmissionPipelineProducer, }, priority::TransportPriorityTx, + read_with_buffer, }, multicast::transport::TransportMulticastInner, }; - /****************************/ /* TRANSPORT MULTICAST LINK */ 
/****************************/ @@ -192,31 +192,16 @@ pub(crate) struct TransportLinkMulticastRx { } impl TransportLinkMulticastRx { - pub async fn recv_batch(&self, buff: C) -> ZResult<(RBatch, Locator)> - where - C: Fn() -> T + Copy, - T: AsMut<[u8]> + ZSliceBuffer + 'static, - { + pub async fn recv_batch(&self, buffer: &mut ZSlice) -> ZResult<(RBatch, Locator)> { const ERR: &str = "Read error from link: "; - - let mut into = (buff)(); - let (n, locator) = self.inner.link.read(into.as_mut()).await?; - let buffer = ZSlice::new(Arc::new(into), 0, n).map_err(|_| zerror!("Error"))?; - let mut batch = RBatch::new(self.inner.config.batch, buffer); - batch.initialize(buff).map_err(|_| zerror!("{ERR}{self}"))?; + let (n, locator) = read_with_buffer(buffer, |buf| self.inner.link.read(buf)).await?; + let mut batch = RBatch::new( + self.inner.config.batch, + buffer.subslice(0..n).ok_or_else(|| zerror!("Error"))?, + ); + batch.initialize().map_err(|_| zerror!("{ERR}{self}"))?; Ok((batch, locator.into_owned())) } - - // pub async fn recv(&mut self) -> ZResult<(TransportMessage, Locator)> { - // let mtu = self.inner.config.mtu as usize; - // let (mut batch, locator) = self - // .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice()) - // .await?; - // let msg = batch - // .decode() - // .map_err(|_| zerror!("Decode error on link: {}", self))?; - // Ok((msg, locator)) - // } } impl fmt::Display for TransportLinkMulticastRx { @@ -357,7 +342,6 @@ impl TransportLinkMulticastUniversal { let c_link = self.link.clone(); let c_transport = self.transport.clone(); let c_signal = self.signal_rx.clone(); - let c_rx_buffer_size = self.transport.manager.config.link_rx_buffer_size; let handle = zenoh_runtime::ZRuntime::RX.spawn(async move { // Start the consume task @@ -365,7 +349,6 @@ impl TransportLinkMulticastUniversal { c_link.rx(), c_transport.clone(), c_signal.clone(), - c_rx_buffer_size, batch_size, ) .await; @@ -520,40 +503,16 @@ async fn tx_task( } async fn rx_task( - mut link: TransportLinkMulticastRx, + link: TransportLinkMulticastRx, transport: TransportMulticastInner, signal: Signal, - rx_buffer_size: usize, batch_size: BatchSize, ) -> ZResult<()> { - async fn read( - link: &mut TransportLinkMulticastRx, - pool: &RecyclingObjectPool, - ) -> ZResult<(RBatch, Locator)> - where - T: ZSliceBuffer + 'static, - F: Fn() -> T, - RecyclingObject: AsMut<[u8]> + ZSliceBuffer, - { - let (rbatch, locator) = link - .recv_batch(|| pool.try_take().unwrap_or_else(|| pool.alloc())) - .await?; - Ok((rbatch, locator)) - } - - // The pool of buffers - let mtu = link.inner.config.batch.mtu as usize; - let mut n = rx_buffer_size / mtu; - if n == 0 { - tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer."); - n = 1; - } - - let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); + let mut buffer = ZSlice::from(vec![0u8; link.inner.config.batch.mtu as usize]); loop { tokio::select! 
{ _ = signal.wait() => break, - res = read(&mut link, &pool) => { + res = link.recv_batch(&mut buffer) => { let (batch, locator) = res?; #[cfg(feature = "stats")] diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 8b1b3004a9..306e209216 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -11,9 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{fmt, sync::Arc}; +use std::fmt; -use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; +use zenoh_buffers::{BBuf, ZSlice}; use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; use zenoh_protocol::{ @@ -22,7 +22,10 @@ use zenoh_protocol::{ }; use zenoh_result::{zerror, ZResult}; -use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch}; +use crate::common::{ + batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch}, + read_with_buffer, +}; #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub(crate) enum TransportLinkUnicastDirection { @@ -202,39 +205,36 @@ pub(crate) struct TransportLinkUnicastRx { } impl TransportLinkUnicastRx { - pub async fn recv_batch(&mut self, buff: C) -> ZResult - where - C: Fn() -> T + Copy, - T: AsMut<[u8]> + ZSliceBuffer + 'static, - { + pub async fn recv_batch(&mut self, buffer: &mut ZSlice) -> ZResult { const ERR: &str = "Read error from link: "; - let mut into = (buff)(); - let end = if self.link.is_streamed() { + let end = read_with_buffer(buffer, |buf: &mut [u8]| async { + if !self.link.is_streamed() { + return self.link.read(buf).await; + } // Read and decode the message length let mut len = BatchSize::MIN.to_le_bytes(); self.link.read_exact(&mut len).await?; let l = BatchSize::from_le_bytes(len) as usize; // Read the bytes - let slice = into - .as_mut() + let slice = buf .get_mut(len.len()..len.len() + l) .ok_or_else(|| zerror!("{ERR}{self}. Invalid batch length or buffer size."))?; self.link.read_exact(slice).await?; - len.len() + l - } else { - // Read the bytes - self.link.read(into.as_mut()).await? - }; - + Ok(len.len() + l) + }) + .await?; // tracing::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]); - let buffer = ZSlice::new(Arc::new(into), 0, end) - .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?; - let mut batch = RBatch::new(self.config.batch, buffer); + let mut batch = RBatch::new( + self.config.batch, + buffer + .subslice(0..end) + .ok_or_else(|| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?, + ); batch - .initialize(buff) + .initialize() .map_err(|e| zerror!("{ERR}{self}. 
{e}."))?; // tracing::trace!("RBatch: {:?}", batch); @@ -244,9 +244,7 @@ impl TransportLinkUnicastRx { pub async fn recv(&mut self) -> ZResult { let mtu = self.config.batch.mtu as usize; - let mut batch = self - .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice()) - .await?; + let mut batch = self.recv_batch(&mut vec![0u8; mtu].into()).await?; let msg = batch .decode() .map_err(|_| zerror!("Decode error on link: {}", self))?; diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 2fe80b37f8..9e0f1b7788 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -26,7 +26,10 @@ use zenoh_runtime::ZRuntime; use super::transport::TransportUnicastLowlatency; #[cfg(feature = "stats")] use crate::stats::TransportStats; -use crate::unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx}; +use crate::{ + common::read_with_buffer, + unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx}, +}; pub(crate) async fn send_with_link( link: &LinkUnicast, @@ -139,7 +142,6 @@ impl TransportUnicastLowlatency { } pub(super) fn internal_start_rx(&self, lease: Duration) { - let rx_buffer_size = self.manager.config.link_rx_buffer_size; let token = self.token.child_token(); let c_transport = self.clone(); @@ -149,25 +151,11 @@ impl TransportUnicastLowlatency { drop(guard); let is_streamed = link_rx.link.is_streamed(); - - // The pool of buffers - let pool = { - let mtu = link_rx.config.batch.mtu as usize; - let mut n = rx_buffer_size / mtu; - if n == 0 { - tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link_rx} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer."); - n = 1; - } - zenoh_sync::RecyclingObjectPool::new(n, move || vec![0_u8; mtu].into_boxed_slice()) - }; - + let mut buffer = ZSlice::from(vec![0u8; link_rx.config.batch.mtu as usize]); loop { - // Retrieve one buffer - let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc()); - tokio::select! 
{ // Async read from the underlying link - res = tokio::time::timeout(lease, read_with_link(&link_rx, &mut buffer, is_streamed)) => { + res = tokio::time::timeout(lease, read_with_buffer(&mut buffer, |buf| read_with_link(&link_rx, buf, is_streamed))) => { let bytes = res.map_err(|_| zerror!("{}: expired after {} milliseconds", link_rx, lease.as_millis()))??; #[cfg(feature = "stats")] { @@ -176,8 +164,7 @@ impl TransportUnicastLowlatency { } // Deserialize all the messages from the current ZBuf - let zslice = ZSlice::new(Arc::new(buffer), 0, bytes).unwrap(); - c_transport.read_messages(zslice, &link_rx.link).await?; + c_transport.read_messages(buffer.subslice(0..bytes).unwrap(), &link_rx.link).await?; } _ = token.cancelled() => { diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 1e8dc594de..54a5e31910 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -14,18 +14,17 @@ use std::time::Duration; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use zenoh_buffers::ZSliceBuffer; +use zenoh_buffers::ZSlice; use zenoh_link::Link; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; -use zenoh_sync::{RecyclingObject, RecyclingObjectPool}; #[cfg(feature = "stats")] use {crate::common::stats::TransportStats, std::sync::Arc}; use super::transport::TransportUnicastUniversal; use crate::{ common::{ - batch::{BatchConfig, RBatch}, + batch::BatchConfig, pipeline::{ TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer, TransmissionPipelineProducer, @@ -122,14 +121,7 @@ impl TransportLinkUnicastUniversal { let token = self.token.clone(); let task = async move { // Start the consume task - let res = rx_task( - &mut rx, - transport.clone(), - lease, - transport.manager.config.link_rx_buffer_size, - token, - ) - .await; + let res = rx_task(&mut rx, transport.clone(), lease, token).await; // TODO(yuyuan): improve this callback if let Err(e) = res { @@ -241,42 +233,18 @@ async fn rx_task( link: &mut TransportLinkUnicastRx, transport: TransportUnicastUniversal, lease: Duration, - rx_buffer_size: usize, token: CancellationToken, ) -> ZResult<()> { - async fn read( - link: &mut TransportLinkUnicastRx, - pool: &RecyclingObjectPool, - ) -> ZResult - where - T: ZSliceBuffer + 'static, - F: Fn() -> T, - RecyclingObject: AsMut<[u8]> + ZSliceBuffer, - { - let batch = link - .recv_batch(|| pool.try_take().unwrap_or_else(|| pool.alloc())) - .await?; - Ok(batch) - } - - // The pool of buffers - let mtu = link.config.batch.mtu as usize; - let mut n = rx_buffer_size / mtu; - if n == 0 { - tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer."); - n = 1; - } - - let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); let l = Link::new_unicast( &link.link, link.config.priorities.clone(), link.config.reliability, ); + let mut buffer = ZSlice::from(vec![0u8; link.config.batch.mtu as usize]); loop { tokio::select! 
{ - batch = tokio::time::timeout(lease, read(link, &pool)) => { + batch = tokio::time::timeout(lease, link.recv_batch(&mut buffer)) => { let batch = batch.map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??; #[cfg(feature = "stats")] { From 5d9e3bfdd476cd95de98b33f936001f46093ffdd Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Tue, 11 Feb 2025 19:56:16 +0100 Subject: [PATCH 2/8] refactor: make read buffer a link field --- io/zenoh-transport/src/multicast/link.rs | 16 +++++++++------- io/zenoh-transport/src/unicast/link.rs | 13 +++++++------ io/zenoh-transport/src/unicast/universal/link.rs | 4 +--- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 4229a4189e..7825f8ee66 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -85,6 +85,7 @@ impl TransportLinkMulticast { pub(crate) fn rx(&self) -> TransportLinkMulticastRx { TransportLinkMulticastRx { inner: self.clone(), + buffer: vec![0u8; self.config.batch.mtu as usize].into(), } } @@ -188,16 +189,18 @@ impl fmt::Debug for TransportLinkMulticastTx { } pub(crate) struct TransportLinkMulticastRx { - pub(crate) inner: TransportLinkMulticast, + inner: TransportLinkMulticast, + buffer: ZSlice, } impl TransportLinkMulticastRx { - pub async fn recv_batch(&self, buffer: &mut ZSlice) -> ZResult<(RBatch, Locator)> { + pub async fn recv_batch(&mut self) -> ZResult<(RBatch, Locator)> { const ERR: &str = "Read error from link: "; - let (n, locator) = read_with_buffer(buffer, |buf| self.inner.link.read(buf)).await?; + let (n, locator) = + read_with_buffer(&mut self.buffer, |buf| self.inner.link.read(buf)).await?; let mut batch = RBatch::new( self.inner.config.batch, - buffer.subslice(0..n).ok_or_else(|| zerror!("Error"))?, + self.buffer.subslice(0..n).ok_or_else(|| zerror!("Error"))?, ); batch.initialize().map_err(|_| zerror!("{ERR}{self}"))?; Ok((batch, locator.into_owned())) @@ -503,16 +506,15 @@ async fn tx_task( } async fn rx_task( - link: TransportLinkMulticastRx, + mut link: TransportLinkMulticastRx, transport: TransportMulticastInner, signal: Signal, batch_size: BatchSize, ) -> ZResult<()> { - let mut buffer = ZSlice::from(vec![0u8; link.inner.config.batch.mtu as usize]); loop { tokio::select! 
{ _ = signal.wait() => break, - res = link.recv_batch(&mut buffer) => { + res = link.recv_batch() => { let (batch, locator) = res?; #[cfg(feature = "stats")] diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 306e209216..18f98f204f 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -90,6 +90,7 @@ impl TransportLinkUnicast { TransportLinkUnicastRx { link: self.link.clone(), config: self.config.clone(), + buffer: vec![0u8; self.config.batch.mtu as usize].into(), } } @@ -202,13 +203,14 @@ impl fmt::Debug for TransportLinkUnicastTx { pub(crate) struct TransportLinkUnicastRx { pub(crate) link: LinkUnicast, pub(crate) config: TransportLinkUnicastConfig, + buffer: ZSlice, } impl TransportLinkUnicastRx { - pub async fn recv_batch(&mut self, buffer: &mut ZSlice) -> ZResult { + pub async fn recv_batch(&mut self) -> ZResult { const ERR: &str = "Read error from link: "; - let end = read_with_buffer(buffer, |buf: &mut [u8]| async { + let end = read_with_buffer(&mut self.buffer, |buf: &mut [u8]| async { if !self.link.is_streamed() { return self.link.read(buf).await; } @@ -220,7 +222,7 @@ impl TransportLinkUnicastRx { // Read the bytes let slice = buf .get_mut(len.len()..len.len() + l) - .ok_or_else(|| zerror!("{ERR}{self}. Invalid batch length or buffer size."))?; + .ok_or_else(|| zerror!("Invalid batch length or buffer size."))?; self.link.read_exact(slice).await?; Ok(len.len() + l) }) @@ -229,7 +231,7 @@ impl TransportLinkUnicastRx { let mut batch = RBatch::new( self.config.batch, - buffer + self.buffer .subslice(0..end) .ok_or_else(|| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?, ); @@ -243,8 +245,7 @@ impl TransportLinkUnicastRx { } pub async fn recv(&mut self) -> ZResult { - let mtu = self.config.batch.mtu as usize; - let mut batch = self.recv_batch(&mut vec![0u8; mtu].into()).await?; + let mut batch = self.recv_batch().await?; let msg = batch .decode() .map_err(|_| zerror!("Decode error on link: {}", self))?; diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 54a5e31910..adb46315d3 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -14,7 +14,6 @@ use std::time::Duration; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use zenoh_buffers::ZSlice; use zenoh_link::Link; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; @@ -241,10 +240,9 @@ async fn rx_task( link.config.reliability, ); - let mut buffer = ZSlice::from(vec![0u8; link.config.batch.mtu as usize]); loop { tokio::select! 
{ - batch = tokio::time::timeout(lease, link.recv_batch(&mut buffer)) => { + batch = tokio::time::timeout(lease, link.recv_batch()) => { let batch = batch.map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??; #[cfg(feature = "stats")] { From b44e5182de5dbe438b008b46cc0b9d0d610303cf Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 12 Feb 2025 11:18:21 +0100 Subject: [PATCH 3/8] feat: use small read buffer for streamed link --- io/zenoh-transport/src/unicast/link.rs | 50 ++++++++++++++++++++------ 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 18f98f204f..a37b960e6d 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::fmt; +use std::{fmt, mem::size_of}; use zenoh_buffers::{BBuf, ZSlice}; use zenoh_core::zcondfeat; @@ -27,6 +27,9 @@ use crate::common::{ read_with_buffer, }; +const SMALL_BUFFER_SIZE: usize = 1 << 11; +const SMALL_READ_COUNT_BEFORE_SHRINK: usize = 10; + #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub(crate) enum TransportLinkUnicastDirection { Inbound, @@ -87,10 +90,16 @@ impl TransportLinkUnicast { } pub(crate) fn rx(&self) -> TransportLinkUnicastRx { + let buffer_size = if self.config.batch.is_streamed { + SMALL_BUFFER_SIZE + } else { + self.config.batch.mtu as usize + }; TransportLinkUnicastRx { link: self.link.clone(), config: self.config.clone(), - buffer: vec![0u8; self.config.batch.mtu as usize].into(), + buffer: vec![0u8; buffer_size].into(), + small_buffer_count: 0, } } @@ -204,29 +213,50 @@ pub(crate) struct TransportLinkUnicastRx { pub(crate) link: LinkUnicast, pub(crate) config: TransportLinkUnicastConfig, buffer: ZSlice, + small_buffer_count: usize, } impl TransportLinkUnicastRx { pub async fn recv_batch(&mut self) -> ZResult { const ERR: &str = "Read error from link: "; + let mut new_buffer = None; let end = read_with_buffer(&mut self.buffer, |buf: &mut [u8]| async { if !self.link.is_streamed() { return self.link.read(buf).await; } // Read and decode the message length - let mut len = BatchSize::MIN.to_le_bytes(); - self.link.read_exact(&mut len).await?; - let l = BatchSize::from_le_bytes(len) as usize; - + let mut len_bytes = [0u8; size_of::()]; + self.link.read_exact(&mut len_bytes).await?; + let len = BatchSize::from_le_bytes(len_bytes) as usize; + let total_len = size_of::() + len; + // Realloc a new buffer if it's too small + let buf = if buf.len() < total_len { + new_buffer = Some(vec![0u8; total_len]); + new_buffer.as_mut().unwrap()[0..size_of::()].copy_from_slice(&len_bytes); + new_buffer.as_deref_mut().unwrap() + } else { + buf + }; // Read the bytes - let slice = buf - .get_mut(len.len()..len.len() + l) - .ok_or_else(|| zerror!("Invalid batch length or buffer size."))?; + let slice = &mut buf[size_of::()..total_len]; self.link.read_exact(slice).await?; - Ok(len.len() + l) + Ok(total_len) }) .await?; + if let Some(buffer) = new_buffer { + self.buffer = buffer.into(); + } + if end <= SMALL_BUFFER_SIZE { + self.small_buffer_count += 1; + if self.small_buffer_count == SMALL_READ_COUNT_BEFORE_SHRINK + && self.buffer.len() > SMALL_BUFFER_SIZE + { + self.buffer = vec![0u8; SMALL_BUFFER_SIZE].into(); + } + } else { + self.small_buffer_count = 0; + } // tracing::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]); let mut batch = RBatch::new( From 2c3e64e9ae2e7cb64c62e158454c35645f94e116 Mon Sep 17 00:00:00 
2001 From: Joseph Perez Date: Wed, 12 Feb 2025 11:26:33 +0100 Subject: [PATCH 4/8] fix: fix test --- io/zenoh-transport/src/common/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 0251c091f6..3a775f620f 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -566,7 +566,7 @@ mod tests { let mut rbatch = RBatch::new(config, bytes.to_vec().into()); println!("Decoded RBatch: {:?}", rbatch); - rbatch.initialize(config.mtu).unwrap(); + rbatch.initialize().unwrap(); println!("Initialized RBatch: {:?}", rbatch); let msg_out: TransportMessage = rbatch.decode().unwrap(); assert_eq!(msg_in, msg_out); From 15f661ebd7fcb53d3633e38604545b0386db5edf Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 12 Feb 2025 14:26:08 +0100 Subject: [PATCH 5/8] fix: remove zenoh-sync dependency from zenoh-link-udp --- Cargo.lock | 1 - io/zenoh-links/zenoh-link-udp/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4b5751cca..89135f4692 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5509,7 +5509,6 @@ dependencies = [ "zenoh-link-commons", "zenoh-protocol", "zenoh-result", - "zenoh-sync", "zenoh-util", ] diff --git a/io/zenoh-links/zenoh-link-udp/Cargo.toml b/io/zenoh-links/zenoh-link-udp/Cargo.toml index 99f3550b36..48ba575748 100644 --- a/io/zenoh-links/zenoh-link-udp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-udp/Cargo.toml @@ -36,5 +36,4 @@ zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } -zenoh-sync = { workspace = true } zenoh-util = { workspace = true } From 7a68e66cfc6200b6da3310caef1d7118f0f5bfe4 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 12 Feb 2025 15:05:39 +0100 Subject: [PATCH 6/8] fix: put buffer shrinking after the last buffer use --- io/zenoh-transport/src/unicast/link.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index a37b960e6d..81ee3b649f 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -27,7 +27,7 @@ use crate::common::{ read_with_buffer, }; -const SMALL_BUFFER_SIZE: usize = 1 << 11; +const SMALL_BUFFER_SIZE: usize = 1 << 5; const SMALL_READ_COUNT_BEFORE_SHRINK: usize = 10; #[derive(Clone, Copy, PartialEq, Eq, Debug)] @@ -247,16 +247,6 @@ impl TransportLinkUnicastRx { if let Some(buffer) = new_buffer { self.buffer = buffer.into(); } - if end <= SMALL_BUFFER_SIZE { - self.small_buffer_count += 1; - if self.small_buffer_count == SMALL_READ_COUNT_BEFORE_SHRINK - && self.buffer.len() > SMALL_BUFFER_SIZE - { - self.buffer = vec![0u8; SMALL_BUFFER_SIZE].into(); - } - } else { - self.small_buffer_count = 0; - } // tracing::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]); let mut batch = RBatch::new( @@ -269,6 +259,17 @@ impl TransportLinkUnicastRx { .initialize() .map_err(|e| zerror!("{ERR}{self}. 
{e}."))?; + if end <= SMALL_BUFFER_SIZE { + self.small_buffer_count += 1; + if self.small_buffer_count == SMALL_READ_COUNT_BEFORE_SHRINK + && self.buffer.len() > SMALL_BUFFER_SIZE + { + self.buffer = vec![0u8; SMALL_BUFFER_SIZE].into(); + } + } else { + self.small_buffer_count = 0; + } + // tracing::trace!("RBatch: {:?}", batch); Ok(batch) From 283d59ae5501115b36a194471ac282f0ed6a3cca Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 12 Feb 2025 15:14:39 +0100 Subject: [PATCH 7/8] fix: remove useless dependency of zenoh-sync --- Cargo.lock | 4 ---- commons/zenoh-sync/Cargo.toml | 10 +++------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 89135f4692..8b823599bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5743,10 +5743,6 @@ dependencies = [ "event-listener 5.3.1", "futures", "tokio", - "zenoh-buffers", - "zenoh-collections", - "zenoh-core", - "zenoh-result", ] [[package]] diff --git a/commons/zenoh-sync/Cargo.toml b/commons/zenoh-sync/Cargo.toml index 8124a184e7..78c110c599 100644 --- a/commons/zenoh-sync/Cargo.toml +++ b/commons/zenoh-sync/Cargo.toml @@ -18,9 +18,9 @@ version = { workspace = true } repository = { workspace = true } homepage = { workspace = true } authors = [ - "kydos ", - "Luca Cominardi ", - "Pierre Avital ", + "kydos ", + "Luca Cominardi ", + "Pierre Avital ", ] edition = { workspace = true } license = { workspace = true } @@ -32,10 +32,6 @@ description = "Internal crate for zenoh." event-listener = { workspace = true } futures = { workspace = true } tokio = { workspace = true, features = ["sync"] } -zenoh-buffers = { workspace = true } -zenoh-collections = { workspace = true, features = ["default"] } -zenoh-core = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread", "time"] } -zenoh-result = { workspace = true } From 238d90d3d6190bf07f04507e9daba434806af803 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 12 Feb 2025 18:06:19 +0100 Subject: [PATCH 8/8] refactor: remove zenoh-collections --- Cargo.lock | 8 +- Cargo.toml | 3 +- commons/zenoh-buffers/Cargo.toml | 2 +- commons/zenoh-buffers/src/zbuf.rs | 8 +- commons/zenoh-collections/Cargo.toml | 36 --- commons/zenoh-collections/README.md | 6 - commons/zenoh-collections/src/lib.rs | 34 -- commons/zenoh-collections/src/ring_buffer.rs | 85 ----- .../zenoh-collections/src/single_or_vec.rs | 300 ------------------ commons/zenoh-collections/src/stack_buffer.rs | 66 ---- zenoh/Cargo.toml | 30 +- zenoh/src/api/bytes.rs | 2 +- zenoh/src/api/handlers/ring.rs | 117 +++---- zenoh/src/api/session.rs | 9 +- 14 files changed, 81 insertions(+), 625 deletions(-) delete mode 100644 commons/zenoh-collections/Cargo.toml delete mode 100644 commons/zenoh-collections/README.md delete mode 100644 commons/zenoh-collections/src/lib.rs delete mode 100644 commons/zenoh-collections/src/ring_buffer.rs delete mode 100644 commons/zenoh-collections/src/single_or_vec.rs delete mode 100644 commons/zenoh-collections/src/stack_buffer.rs diff --git a/Cargo.lock b/Cargo.lock index 8b823599bb..9e149964f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5191,6 +5191,7 @@ dependencies = [ "rustc_version 0.4.1", "serde", "serde_json", + "smallvec", "socket2 0.5.7", "tokio", "tokio-util", @@ -5199,7 +5200,6 @@ dependencies = [ "vec_map", "zenoh-buffers", "zenoh-codec", - "zenoh-collections", "zenoh-config", "zenoh-core", "zenoh-keyexpr", @@ -5234,7 +5234,7 @@ name = "zenoh-buffers" version = "1.2.1" dependencies = [ "rand 0.8.5", - 
"zenoh-collections", + "smallvec", ] [[package]] @@ -5251,10 +5251,6 @@ dependencies = [ "zenoh-util", ] -[[package]] -name = "zenoh-collections" -version = "1.2.1" - [[package]] name = "zenoh-config" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 0e7cf279e7..b9632cfe38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ resolver = "2" members = [ "commons/zenoh-buffers", "commons/zenoh-codec", - "commons/zenoh-collections", "commons/zenoh-config", "commons/zenoh-core", "commons/zenoh-crypto", @@ -160,6 +159,7 @@ serde = { version = "1.0.210", default-features = false, features = [ serde_json = "1.0.128" serde_with = "3.12.0" serde_yaml = "0.9.34" +smallvec = "1.13.2" static_init = "1.0.3" stabby = "36.1.1" sha3 = "0.10.8" @@ -206,7 +206,6 @@ zenoh-util = { version = "1.2.1", path = "commons/zenoh-util" } zenoh-crypto = { version = "1.2.1", path = "commons/zenoh-crypto" } zenoh-codec = { version = "1.2.1", path = "commons/zenoh-codec" } zenoh-sync = { version = "1.2.1", path = "commons/zenoh-sync" } -zenoh-collections = { version = "1.2.1", path = "commons/zenoh-collections", default-features = false } zenoh-macros = { version = "1.2.1", path = "commons/zenoh-macros" } zenoh-plugin-trait = { version = "1.2.1", path = "plugins/zenoh-plugin-trait", default-features = false } zenoh_backend_traits = { version = "1.2.1", path = "plugins/zenoh-backend-traits", default-features = false } diff --git a/commons/zenoh-buffers/Cargo.toml b/commons/zenoh-buffers/Cargo.toml index b7a7c87634..22a2b8f1a4 100644 --- a/commons/zenoh-buffers/Cargo.toml +++ b/commons/zenoh-buffers/Cargo.toml @@ -32,4 +32,4 @@ test = ["rand"] [dependencies] rand = { workspace = true, optional = true } -zenoh-collections = { workspace = true, default-features = false } +smallvec = { workspace = true } diff --git a/commons/zenoh-buffers/src/zbuf.rs b/commons/zenoh-buffers/src/zbuf.rs index 1dff2486a5..d948a46e37 100644 --- a/commons/zenoh-buffers/src/zbuf.rs +++ b/commons/zenoh-buffers/src/zbuf.rs @@ -16,7 +16,7 @@ use core::{cmp, iter, num::NonZeroUsize, ptr::NonNull}; #[cfg(feature = "std")] use std::io; -use zenoh_collections::SingleOrVec; +use smallvec::SmallVec; use crate::{ buffer::{Buffer, SplitBuffer}, @@ -30,14 +30,14 @@ use crate::{ #[derive(Debug, Clone, Default, Eq)] pub struct ZBuf { - slices: SingleOrVec, + slices: SmallVec<[ZSlice; 1]>, } impl ZBuf { #[must_use] - pub const fn empty() -> Self { + pub fn empty() -> Self { Self { - slices: SingleOrVec::empty(), + slices: SmallVec::new(), } } diff --git a/commons/zenoh-collections/Cargo.toml b/commons/zenoh-collections/Cargo.toml deleted file mode 100644 index 8a7f2d795b..0000000000 --- a/commons/zenoh-collections/Cargo.toml +++ /dev/null @@ -1,36 +0,0 @@ -# -# Copyright (c) 2023 ZettaScale Technology -# -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License 2.0 which is available at -# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -# which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-# -# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -# -# Contributors: -# ZettaScale Zenoh Team, -# -[package] -rust-version = { workspace = true } -name = "zenoh-collections" -version = { workspace = true } -repository = { workspace = true } -homepage = { workspace = true } -authors = [ - "kydos ", - "Luca Cominardi ", - "Pierre Avital ", -] -edition = { workspace = true } -license = { workspace = true } -categories = { workspace = true } -description = "Internal crate for zenoh." -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[features] -default = ["std"] -std = [] -test = [] - -[dependencies] diff --git a/commons/zenoh-collections/README.md b/commons/zenoh-collections/README.md deleted file mode 100644 index e97cecd513..0000000000 --- a/commons/zenoh-collections/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# ⚠️ WARNING ⚠️ - -This crate is intended for Zenoh's internal use. - -- [Click here for Zenoh's main repository](https://github.com/eclipse-zenoh/zenoh) -- [Click here for Zenoh's documentation](https://zenoh.io) diff --git a/commons/zenoh-collections/src/lib.rs b/commons/zenoh-collections/src/lib.rs deleted file mode 100644 index f19c10ad25..0000000000 --- a/commons/zenoh-collections/src/lib.rs +++ /dev/null @@ -1,34 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -//! ⚠️ WARNING ⚠️ -//! -//! This crate is intended for Zenoh's internal use. -//! -//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) -#![cfg_attr(not(feature = "std"), no_std)] -extern crate alloc; - -pub mod single_or_vec; -pub use single_or_vec::*; - -#[cfg(feature = "std")] -pub mod ring_buffer; -#[cfg(feature = "std")] -pub use ring_buffer::*; - -#[cfg(feature = "std")] -pub mod stack_buffer; -#[cfg(feature = "std")] -pub use stack_buffer::*; diff --git a/commons/zenoh-collections/src/ring_buffer.rs b/commons/zenoh-collections/src/ring_buffer.rs deleted file mode 100644 index e9f7909d5f..0000000000 --- a/commons/zenoh-collections/src/ring_buffer.rs +++ /dev/null @@ -1,85 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-//
-// Contributors:
-//   ZettaScale Zenoh Team,
-//
-use std::collections::VecDeque;
-
-pub struct RingBuffer<T> {
-    capacity: usize,
-    len: usize,
-    buffer: VecDeque<T>,
-}
-
-impl<T> RingBuffer<T> {
-    #[must_use]
-    pub fn new(capacity: usize) -> RingBuffer<T> {
-        let buffer = VecDeque::<T>::with_capacity(capacity);
-        RingBuffer {
-            capacity,
-            len: 0,
-            buffer,
-        }
-    }
-
-    #[inline]
-    pub fn push(&mut self, elem: T) -> Option<T> {
-        if self.len < self.capacity {
-            self.buffer.push_back(elem);
-            self.len += 1;
-            return None;
-        }
-        Some(elem)
-    }
-
-    #[inline]
-    pub fn push_force(&mut self, elem: T) -> Option<T> {
-        self.push(elem).and_then(|elem| {
-            let ret = self.buffer.pop_front();
-            self.buffer.push_back(elem);
-            ret
-        })
-    }
-
-    #[inline]
-    pub fn pull(&mut self) -> Option<T> {
-        let x = self.buffer.pop_front();
-        if x.is_some() {
-            self.len -= 1;
-        }
-        x
-    }
-
-    #[allow(dead_code)]
-    #[inline]
-    #[must_use]
-    pub fn is_empty(&self) -> bool {
-        self.buffer.is_empty()
-    }
-
-    #[inline]
-    #[must_use]
-    pub fn is_full(&self) -> bool {
-        self.len() == self.capacity()
-    }
-
-    #[inline]
-    #[must_use]
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
-    #[inline]
-    #[must_use]
-    pub fn capacity(&self) -> usize {
-        self.capacity
-    }
-}
diff --git a/commons/zenoh-collections/src/single_or_vec.rs b/commons/zenoh-collections/src/single_or_vec.rs
deleted file mode 100644
index 203623491e..0000000000
--- a/commons/zenoh-collections/src/single_or_vec.rs
+++ /dev/null
@@ -1,300 +0,0 @@
-//
-// Copyright (c) 2023 ZettaScale Technology
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License 2.0 which is available at
-// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-//
-// Contributors:
-//   ZettaScale Zenoh Team,
-//
-
-use alloc::vec;
-#[cfg(not(feature = "std"))]
-use alloc::vec::Vec;
-use core::{
-    cmp::PartialEq,
-    fmt, iter,
-    ops::{Index, IndexMut, RangeBounds},
-    ptr, slice,
-};
-
-#[derive(Clone, Eq)]
-enum SingleOrVecInner<T> {
-    Single(T),
-    Vec(Vec<T>),
-}
-
-impl<T> SingleOrVecInner<T> {
-    const fn empty() -> Self {
-        SingleOrVecInner::Vec(Vec::new())
-    }
-
-    fn push(&mut self, value: T) {
-        match self {
-            SingleOrVecInner::Vec(vec) if vec.capacity() == 0 => *self = Self::Single(value),
-            SingleOrVecInner::Single(first) => unsafe {
-                let first = ptr::read(first);
-                ptr::write(self, Self::Vec(vec![first, value]));
-            },
-            SingleOrVecInner::Vec(vec) => vec.push(value),
-        }
-    }
-}
-
-impl<T> PartialEq for SingleOrVecInner<T>
-where
-    T: PartialEq,
-{
-    fn eq(&self, other: &Self) -> bool {
-        self.as_ref() == other.as_ref()
-    }
-}
-
-impl<T> Default for SingleOrVecInner<T> {
-    fn default() -> Self {
-        Self::empty()
-    }
-}
-
-impl<T> AsRef<[T]> for SingleOrVecInner<T> {
-    fn as_ref(&self) -> &[T] {
-        match self {
-            SingleOrVecInner::Single(t) => slice::from_ref(t),
-            SingleOrVecInner::Vec(t) => t,
-        }
-    }
-}
-
-impl<T> AsMut<[T]> for SingleOrVecInner<T> {
-    fn as_mut(&mut self) -> &mut [T] {
-        match self {
-            SingleOrVecInner::Single(t) => slice::from_mut(t),
-            SingleOrVecInner::Vec(t) => t,
-        }
-    }
-}
-
-impl<T> fmt::Debug for SingleOrVecInner<T>
-where
-    T: fmt::Debug,
-{
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{:?}", self.as_ref())
-    }
-}
-
-#[derive(Clone, PartialEq, Eq)]
-pub struct SingleOrVec<T>(SingleOrVecInner<T>);
-
-impl<T> SingleOrVec<T> {
-    pub const fn empty() -> Self {
-        Self(SingleOrVecInner::empty())
-    }
-
-    pub fn push(&mut self, value: T) {
-        self.0.push(value);
-    }
-
-    pub fn truncate(&mut self, len: usize) {
-        if let SingleOrVecInner::Vec(v) = &mut self.0 {
-            v.truncate(len);
-        } else if len == 0 {
-            self.0 = SingleOrVecInner::Vec(Vec::new());
-        }
-    }
-
-    pub fn clear(&mut self) {
-        self.truncate(0);
-    }
-
-    pub fn len(&self) -> usize {
-        match &self.0 {
-            SingleOrVecInner::Single(_) => 1,
-            SingleOrVecInner::Vec(v) => v.len(),
-        }
-    }
-
-    #[must_use]
-    pub fn is_empty(&self) -> bool {
-        matches!(&self.0, SingleOrVecInner::Vec(v) if v.is_empty())
-    }
-
-    fn vectorize(&mut self) -> &mut Vec<T> {
-        if let SingleOrVecInner::Single(v) = &self.0 {
-            unsafe {
-                let v = core::ptr::read(v);
-                core::ptr::write(&mut self.0, SingleOrVecInner::Vec(vec![v]))
-            };
-        }
-        let SingleOrVecInner::Vec(v) = &mut self.0 else {
-            unsafe { core::hint::unreachable_unchecked() }
-        };
-        v
-    }
-
-    pub fn get(&self, index: usize) -> Option<&T> {
-        match &self.0 {
-            SingleOrVecInner::Single(v) => (index == 0).then_some(v),
-            SingleOrVecInner::Vec(v) => v.get(index),
-        }
-    }
-
-    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
-        match &mut self.0 {
-            SingleOrVecInner::Single(v) => (index == 0).then_some(v),
-            SingleOrVecInner::Vec(v) => v.get_mut(index),
-        }
-    }
-
-    pub fn last(&self) -> Option<&T> {
-        match &self.0 {
-            SingleOrVecInner::Single(v) => Some(v),
-            SingleOrVecInner::Vec(v) => v.last(),
-        }
-    }
-
-    pub fn last_mut(&mut self) -> Option<&mut T> {
-        match &mut self.0 {
-            SingleOrVecInner::Single(v) => Some(v),
-            SingleOrVecInner::Vec(v) => v.last_mut(),
-        }
-    }
-    pub fn drain<Range: RangeBounds<usize>>(&mut self, range: Range) -> Drain<T> {
-        match &mut self.0 {
-            this @ SingleOrVecInner::Single(_) if range.contains(&0) => Drain {
-                inner: DrainInner::Single(this),
-            },
-            SingleOrVecInner::Vec(vec) => Drain {
-                inner: DrainInner::Vec(vec.drain(range)),
-            },
-            _ => Drain {
-                inner: DrainInner::Done,
-            },
-        }
-    }
-    pub fn insert(&mut self, at: usize, value: T) {
-        assert!(at <= self.len());
-        self.vectorize().insert(at, value);
-    }
-}
-
-enum DrainInner<'a, T> {
-    Vec(alloc::vec::Drain<'a, T>),
-    Single(&'a mut SingleOrVecInner<T>),
-    Done,
-}
-
-pub struct Drain<'a, T> {
-    inner: DrainInner<'a, T>,
-}
-
-impl<T> Iterator for Drain<'_, T> {
-    type Item = T;
-
-    fn next(&mut self) -> Option<T> {
-        match &mut self.inner {
-            DrainInner::Vec(drain) => drain.next(),
-            DrainInner::Single(inner) => match unsafe { core::ptr::read(*inner) } {
-                SingleOrVecInner::Single(value) => unsafe {
-                    core::ptr::write(*inner, SingleOrVecInner::Vec(Vec::new()));
-                    Some(value)
-                },
-                SingleOrVecInner::Vec(_) => None,
-            },
-            _ => None,
-        }
-    }
-}
-impl<T> Drop for Drain<'_, T> {
-    fn drop(&mut self) {
-        if let DrainInner::Single(_) = self.inner {
-            self.next();
-        }
-    }
-}
-
-impl<T> Default for SingleOrVec<T> {
-    fn default() -> Self {
-        Self(SingleOrVecInner::default())
-    }
-}
-
-impl<T> AsRef<[T]> for SingleOrVec<T> {
-    fn as_ref(&self) -> &[T] {
-        self.0.as_ref()
-    }
-}
-
-impl<T> AsMut<[T]> for SingleOrVec<T> {
-    fn as_mut(&mut self) -> &mut [T] {
-        self.0.as_mut()
-    }
-}
-
-impl<T> fmt::Debug for SingleOrVec<T>
-where
-    T: fmt::Debug,
-{
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.0.fmt(f)
-    }
-}
-
-impl<T> IntoIterator for SingleOrVec<T> {
-    type Item = T;
-    type IntoIter = IntoIter<T>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        match self.0 {
-            SingleOrVecInner::Single(first) => IntoIter {
-                last: Some(first),
-                drain: Vec::new().into_iter(),
-            },
-            SingleOrVecInner::Vec(v) => {
-                let mut it = v.into_iter();
-                IntoIter {
-                    last: it.next_back(),
-                    drain: it,
-                }
-            }
-        }
-    }
-}
-
-impl<T> iter::Extend<T> for SingleOrVec<T> {
-    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
-        for value in iter {
-            self.push(value);
-        }
-    }
-}
-
-pub struct IntoIter<T> {
-    pub drain: alloc::vec::IntoIter<T>,
-    pub last: Option<T>,
-}
-
-impl<T> Iterator for IntoIter<T> {
-    type Item = T;
-    fn next(&mut self) -> Option<T> {
-        self.drain.next().or_else(|| self.last.take())
-    }
-}
-
-impl<T> Index<usize> for SingleOrVec<T> {
-    type Output = T;
-
-    fn index(&self, index: usize) -> &Self::Output {
-        &self.as_ref()[index]
-    }
-}
-
-impl<T> IndexMut<usize> for SingleOrVec<T> {
-    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
-        &mut self.as_mut()[index]
-    }
-}
diff --git a/commons/zenoh-collections/src/stack_buffer.rs b/commons/zenoh-collections/src/stack_buffer.rs
deleted file mode 100644
index dd3ae11d62..0000000000
--- a/commons/zenoh-collections/src/stack_buffer.rs
+++ /dev/null
@@ -1,66 +0,0 @@
-//
-// Copyright (c) 2023 ZettaScale Technology
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License 2.0 which is available at
-// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-//
-// Contributors:
-//   ZettaScale Zenoh Team,
-//
-use std::collections::VecDeque;
-
-pub struct StackBuffer<T> {
-    buffer: VecDeque<T>,
-}
-
-impl<T> StackBuffer<T> {
-    #[must_use]
-    pub fn new(capacity: usize) -> StackBuffer<T> {
-        let buffer = VecDeque::<T>::with_capacity(capacity);
-        StackBuffer { buffer }
-    }
-
-    #[inline]
-    pub fn push(&mut self, elem: T) -> Option<T> {
-        if self.len() < self.capacity() {
-            self.buffer.push_front(elem);
-            None
-        } else {
-            Some(elem)
-        }
-    }
-
-    #[inline]
-    pub fn pop(&mut self) -> Option<T> {
-        self.buffer.pop_front()
-    }
-
-    #[allow(dead_code)]
-    #[inline]
-    #[must_use]
-    pub fn is_empty(&self) -> bool {
-        self.buffer.is_empty()
-    }
-
-    #[inline]
-    #[must_use]
-    pub fn is_full(&self) -> bool {
-        self.len() == self.capacity()
-    }
-
-    #[inline]
-    #[must_use]
-    pub fn len(&self) -> usize {
-        self.buffer.len()
-    }
-
-    #[inline]
-    #[must_use]
-    pub fn capacity(&self) -> usize {
-        self.buffer.capacity()
-    }
-}
diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml
index df59c792f2..a85c1b2deb 100644
--- a/zenoh/Cargo.toml
+++ b/zenoh/Cargo.toml
@@ -32,25 +32,25 @@ maintenance = { status = "actively-developed" }
 auth_pubkey = ["zenoh-transport/auth_pubkey"]
 auth_usrpwd = ["zenoh-transport/auth_usrpwd"]
 default = [
-    "auth_pubkey",
-    "auth_usrpwd",
-    "transport_multilink",
-    "transport_compression",
-    "transport_quic",
-    "transport_tcp",
-    "transport_tls",
-    "transport_udp",
-    "transport_unixsock-stream",
-    "transport_ws"
+  "auth_pubkey",
+  "auth_usrpwd",
+  "transport_multilink",
+  "transport_compression",
+  "transport_quic",
+  "transport_tcp",
+  "transport_tls",
+  "transport_udp",
+  "transport_unixsock-stream",
+  "transport_ws"
 ]
 internal = ["zenoh-keyexpr/internal", "zenoh-config/internal"]
 plugins = []
 runtime_plugins = ["plugins"]
 shared-memory = [
-    "zenoh-shm",
-    "zenoh-protocol/shared-memory",
-    "zenoh-transport/shared-memory",
-    "zenoh-buffers/shared-memory",
+  "zenoh-shm",
+  "zenoh-protocol/shared-memory",
+  "zenoh-transport/shared-memory",
+  "zenoh-buffers/shared-memory",
 ]
 stats = ["zenoh-transport/stats", "zenoh-protocol/stats"]
 transport_multilink = ["zenoh-transport/transport_multilink"]
@@ -88,12 +88,12 @@ rand = { workspace = true, features = ["default"] }
 ref-cast = { workspace = true }
 serde = { workspace = true, features = ["default"] }
 serde_json = { workspace = true }
+smallvec = { workspace = true }
 socket2 = { workspace = true }
 uhlc = { workspace = true, features = ["default"] }
 vec_map = { workspace = true }
 zenoh-buffers = { workspace = true, features = ["std"] }
 zenoh-codec = { workspace = true }
-zenoh-collections = { workspace = true, features = ["std"] }
 zenoh-config = { workspace = true }
 zenoh-core = { workspace = true }
 zenoh-keyexpr = { workspace = true }
diff --git a/zenoh/src/api/bytes.rs b/zenoh/src/api/bytes.rs
index 6f6e973fcc..207300dee6 100644
--- a/zenoh/src/api/bytes.rs
+++ b/zenoh/src/api/bytes.rs
@@ -94,7 +94,7 @@ pub struct ZBytes(ZBuf);
 
 impl ZBytes {
     /// Create an empty ZBytes.
-    pub const fn new() -> Self {
+    pub fn new() -> Self {
         Self(ZBuf::empty())
     }
 
diff --git a/zenoh/src/api/handlers/ring.rs b/zenoh/src/api/handlers/ring.rs
index 95654e57d6..e6a8326178 100644
--- a/zenoh/src/api/handlers/ring.rs
+++ b/zenoh/src/api/handlers/ring.rs
@@ -14,11 +14,11 @@
 //! Callback handler trait.
 
 use std::{
-    sync::{Arc, Weak},
+    collections::VecDeque,
+    sync::Arc,
     time::{Duration, Instant},
 };
 
-use zenoh_collections::RingBuffer;
 use zenoh_result::ZResult;
 
 use crate::api::{
@@ -48,27 +48,23 @@ impl Default for RingChannel {
 }
 
 struct RingChannelInner<T> {
-    ring: std::sync::Mutex<RingBuffer<T>>,
-    not_empty: flume::Receiver<()>,
+    ring: std::sync::Mutex<Option<VecDeque<T>>>,
+    capacity: usize,
+    not_empty_rx: flume::Receiver<()>,
 }
 
-pub struct RingChannelHandler<T> {
-    ring: Weak<RingChannelInner<T>>,
-}
+pub struct RingChannelHandler<T>(Arc<RingChannelInner<T>>);
 
 impl<T> RingChannelHandler<T> {
     /// Receive from the ring channel.
     ///
    /// If the ring channel is empty, this call will block until an element is available in the channel.
     pub fn recv(&self) -> ZResult<T> {
-        let Some(channel) = self.ring.upgrade() else {
-            bail!("The ringbuffer has been deleted.");
-        };
         loop {
-            if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
+            if let Some(t) = self.try_recv()? {
                 return Ok(t);
             }
-            channel.not_empty.recv().map_err(|e| zerror!("{}", e))?;
+            self.0.not_empty_rx.recv()?
         }
     }
 
@@ -77,18 +73,14 @@ impl<T> RingChannelHandler<T> {
     /// If the ring channel is empty, this call will block until an element is available in the channel,
     /// or return `None` if the deadline has passed.
     pub fn recv_deadline(&self, deadline: Instant) -> ZResult<Option<T>> {
-        let Some(channel) = self.ring.upgrade() else {
-            bail!("The ringbuffer has been deleted.");
-        };
-
         loop {
-            if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
+            if let Some(t) = self.try_recv()? {
                 return Ok(Some(t));
             }
-            match channel.not_empty.recv_deadline(deadline) {
-                Ok(()) => {}
+            match self.0.not_empty_rx.recv_deadline(deadline) {
+                Ok(()) => continue,
                 Err(flume::RecvTimeoutError::Timeout) => return Ok(None),
-                Err(err) => bail!("{}", err),
+                Err(err) => return Err(err.into()),
             }
         }
     }
@@ -98,38 +90,18 @@ impl<T> RingChannelHandler<T> {
     /// If the ring channel is empty, this call will block until an element is available in the channel,
     /// or return `None` if the deadline has expired.
     pub fn recv_timeout(&self, timeout: Duration) -> ZResult<Option<T>> {
-        let Some(channel) = self.ring.upgrade() else {
-            bail!("The ringbuffer has been deleted.");
-        };
-
-        loop {
-            if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
-                return Ok(Some(t));
-            }
-            match channel.not_empty.recv_timeout(timeout) {
-                Ok(()) => {}
-                Err(flume::RecvTimeoutError::Timeout) => return Ok(None),
-                Err(err) => bail!("{}", err),
-            }
-        }
+        self.recv_deadline(Instant::now() + timeout)
     }
 
     /// Receive from the ring channel.
     ///
     /// If the ring channel is empty, this call will block until an element is available in the channel.
     pub async fn recv_async(&self) -> ZResult<T> {
-        let Some(channel) = self.ring.upgrade() else {
-            bail!("The ringbuffer has been deleted.");
-        };
         loop {
-            if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
+            if let Some(t) = self.try_recv()? {
                 return Ok(t);
             }
-            channel
-                .not_empty
-                .recv_async()
-                .await
-                .map_err(|e| zerror!("{}", e))?;
+            self.0.not_empty_rx.recv_async().await?
         }
     }
 
@@ -137,11 +109,34 @@ impl<T> RingChannelHandler<T> {
     ///
     /// If the ring channel is empty, this call will return immediately without blocking.
     pub fn try_recv(&self) -> ZResult<Option<T>> {
-        let Some(channel) = self.ring.upgrade() else {
-            bail!("The ringbuffer has been deleted.");
-        };
-        let mut guard = channel.ring.lock().map_err(|e| zerror!("{}", e))?;
-        Ok(guard.pull())
+        let mut opt_buffer = self.0.ring.lock().unwrap();
+        let buffer = opt_buffer
+            .as_mut()
+            .ok_or_else(|| zerror!("The ringbuffer has been deleted."))?;
+        Ok(buffer.pop_front())
+    }
+}
+
+struct RingChannelCallback<T> {
+    inner: Arc<RingChannelInner<T>>,
+    not_empty_tx: flume::Sender<()>,
+}
+
+impl<T> RingChannelCallback<T> {
+    fn push(&self, value: T) {
+        let mut guard = self.inner.ring.lock().unwrap();
+        let buffer = guard.as_mut().unwrap();
+        if buffer.len() == self.inner.capacity {
+            buffer.pop_front();
+        }
+        buffer.push_back(value);
+        let _ = self.not_empty_tx.try_send(());
+    }
+}
+
+impl<T> Drop for RingChannelCallback<T> {
+    fn drop(&mut self) {
+        self.inner.ring.lock().unwrap().take();
     }
 }
 
@@ -149,25 +144,17 @@ impl<T: Send + 'static> IntoHandler<T> for RingChannel {
     type Handler = RingChannelHandler<T>;
 
     fn into_handler(self) -> (Callback<T>, Self::Handler) {
-        let (sender, receiver) = flume::bounded(1);
+        let (not_empty_tx, not_empty_rx) = flume::bounded(1);
         let inner = Arc::new(RingChannelInner {
-            ring: std::sync::Mutex::new(RingBuffer::new(self.capacity)),
-            not_empty: receiver,
+            ring: std::sync::Mutex::new(Some(VecDeque::with_capacity(self.capacity))),
+            capacity: self.capacity,
+            not_empty_rx,
         });
-        let receiver = RingChannelHandler {
-            ring: Arc::downgrade(&inner),
+        let handler = RingChannelHandler(inner.clone());
+        let callback = RingChannelCallback {
+            inner,
+            not_empty_tx,
         };
-        (
-            Callback::new(Arc::new(move |t| match inner.ring.lock() {
-                Ok(mut g) => {
-                    // Eventually drop the oldest element.
-                    g.push_force(t);
-                    drop(g);
-                    let _ = sender.try_send(());
-                }
-                Err(e) => tracing::error!("{}", e),
-            })),
-            receiver,
-        )
+        (Callback::new(Arc::new(move |t| callback.push(t))), handler)
     }
 }
diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs
index 907ef70651..bdd1ed7cb1 100644
--- a/zenoh/src/api/session.rs
+++ b/zenoh/src/api/session.rs
@@ -27,12 +27,12 @@ use async_trait::async_trait;
 #[zenoh_macros::internal]
 use ref_cast::ref_cast_custom;
 use ref_cast::RefCastCustom;
+use smallvec::SmallVec;
 use tracing::{error, info, trace, warn};
 use uhlc::Timestamp;
 #[cfg(feature = "internal")]
 use uhlc::HLC;
 use zenoh_buffers::ZBuf;
-use zenoh_collections::SingleOrVec;
 use zenoh_config::{qos::PublisherQoSConfig, unwrap_or_default, wrappers::ZenohId};
 use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait};
 use zenoh_keyexpr::keyexpr_tree::KeBoxTree;
@@ -2064,7 +2064,7 @@ impl SessionInner {
         #[cfg(feature = "unstable")] reliability: Reliability,
         attachment: Option<ZBytes>,
     ) {
-        let mut callbacks = SingleOrVec::default();
+        let mut callbacks = SmallVec::<[_; 1]>::default();
         let state = zread!(self.state);
         if state.primitives.is_none() {
            return; // Session closing or closed
@@ -2119,8 +2119,9 @@ impl SessionInner {
             reliability,
             attachment,
         );
-        let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter();
-        for (cb, key_expr) in drain {
+
+        let last = callbacks.pop();
+        for (cb, key_expr) in callbacks {
             sample.key_expr = key_expr;
             cb.call(sample.clone());
         }