Skip to content

Commit 5dc2f55

Browse files
committed
Don't clone in non-contiguous case
1 parent 7a2492a commit 5dc2f55

File tree

3 files changed

+128
-79
lines changed

3 files changed

+128
-79
lines changed

commons/zenoh-buffers/src/zbuf.rs

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
// ZettaScale Zenoh Team, <[email protected]>
1313
//
1414
use alloc::{sync::Arc, vec::Vec};
15-
use core::{cmp, iter, num::NonZeroUsize, ptr::NonNull};
15+
use core::{
16+
cmp::{self, Ordering},
17+
iter,
18+
num::NonZeroUsize,
19+
ptr::NonNull,
20+
};
1621
#[cfg(feature = "std")]
1722
use std::io;
1823

@@ -82,6 +87,17 @@ impl ZBuf {
8287
fn opt_zslice_writer(&mut self) -> Option<ZSliceWriter> {
8388
self.slices.last_mut().and_then(|s| s.writer())
8489
}
90+
91+
pub fn chunks(&self, chunk_size: usize) -> ZBufChunkIterator<'_> {
92+
assert_ne!(chunk_size, 0, "cannot split ZBuf into chunks of size 0");
93+
94+
ZBufChunkIterator {
95+
chunk_size,
96+
buf: self,
97+
slice_cursor: 0,
98+
byte_cursor: 0,
99+
}
100+
}
85101
}
86102

87103
// Buffer
@@ -669,3 +685,97 @@ mod tests {
669685
reader.seek(std::io::SeekFrom::Current(-100)).unwrap_err();
670686
}
671687
}
688+
689+
pub struct ZBufChunkIterator<'a> {
690+
chunk_size: usize,
691+
buf: &'a ZBuf,
692+
slice_cursor: usize,
693+
byte_cursor: usize,
694+
}
695+
696+
impl ZBufChunkIterator<'_> {
697+
fn inc_slice_cursor(&mut self) {
698+
self.slice_cursor += 1;
699+
self.byte_cursor = 0;
700+
}
701+
702+
fn inc_byte_cursor(&mut self, mut amt: usize) {
703+
while let Some(slice) = self.buf.slices.get(self.slice_cursor) {
704+
let rem = slice.len() - self.byte_cursor;
705+
706+
match rem.cmp(&amt) {
707+
Ordering::Less => {
708+
self.inc_slice_cursor();
709+
amt -= rem;
710+
continue;
711+
}
712+
Ordering::Equal => {
713+
self.inc_slice_cursor();
714+
break;
715+
}
716+
Ordering::Greater => {
717+
self.byte_cursor += amt;
718+
break;
719+
}
720+
}
721+
}
722+
}
723+
}
724+
725+
impl Iterator for ZBufChunkIterator<'_> {
726+
type Item = ZBuf;
727+
728+
fn next(&mut self) -> Option<Self::Item> {
729+
let slice = self.buf.slices.get(self.slice_cursor)?;
730+
731+
let rem = slice.len() - self.byte_cursor;
732+
733+
match rem.cmp(&self.chunk_size) {
734+
Ordering::Less => {
735+
let mut buf = ZBuf::empty();
736+
let subslice = slice.subslice(self.byte_cursor..).unwrap();
737+
buf.push_zslice(subslice);
738+
self.inc_slice_cursor();
739+
740+
while let Some(slice) = self.buf.slices.get(self.slice_cursor) {
741+
let missing = self.chunk_size - buf.len();
742+
743+
match slice.len().cmp(&missing) {
744+
Ordering::Less => {
745+
buf.push_zslice(slice.clone());
746+
self.inc_slice_cursor();
747+
continue;
748+
}
749+
Ordering::Equal => {
750+
buf.push_zslice(slice.clone());
751+
self.inc_slice_cursor();
752+
break;
753+
}
754+
Ordering::Greater => {
755+
let subslice = slice.subslice(..missing).unwrap();
756+
buf.push_zslice(subslice);
757+
self.inc_byte_cursor(missing);
758+
break;
759+
}
760+
}
761+
}
762+
763+
Some(buf)
764+
}
765+
Ordering::Equal => {
766+
let subslice = slice.subslice(self.byte_cursor..).unwrap();
767+
768+
self.inc_slice_cursor();
769+
Some(ZBuf::from(subslice))
770+
}
771+
Ordering::Greater => {
772+
let subslice = slice
773+
.subslice(self.byte_cursor..self.byte_cursor + self.chunk_size)
774+
.unwrap();
775+
776+
self.inc_byte_cursor(self.chunk_size);
777+
Some(ZBuf::from(subslice))
778+
}
779+
}
780+
}
781+
}

commons/zenoh-buffers/src/zslice.rs

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -198,22 +198,6 @@ impl ZSlice {
198198
None
199199
}
200200
}
201-
202-
pub fn chunks(self, chunk_size: usize) -> impl Iterator<Item = Self> {
203-
assert_ne!(chunk_size, 0, "cannot split ZSlice into chunks of size 0");
204-
205-
let n = self.len() / chunk_size;
206-
let rem = self.len() % chunk_size;
207-
let n = if rem > 0 { n + 1 } else { n };
208-
209-
(0..n).map(move |i| ZSlice {
210-
buf: self.buf.clone(),
211-
start: self.start + i * chunk_size,
212-
end: usize::min(self.end, self.start + (i + 1) * chunk_size),
213-
#[cfg(feature = "shared-memory")]
214-
kind: ZSliceKind::Raw,
215-
})
216-
}
217201
}
218202

219203
impl Deref for ZSlice {
@@ -442,58 +426,3 @@ impl ZSlice {
442426
(0..len).map(|_| rng.gen()).collect::<Vec<u8>>().into()
443427
}
444428
}
445-
446-
#[cfg(test)]
447-
mod tests {
448-
use super::*;
449-
450-
#[test]
451-
fn zslice() {
452-
let buf = crate::vec::uninit(16);
453-
let mut zslice: ZSlice = buf.clone().into();
454-
assert_eq!(buf.as_slice(), zslice.as_slice());
455-
456-
// SAFETY: buffer slize size is not modified
457-
let mut_slice = unsafe { zslice.downcast_mut::<Vec<u8>>() }.unwrap();
458-
459-
mut_slice[..buf.len()].clone_from_slice(&buf[..]);
460-
461-
assert_eq!(buf.as_slice(), zslice.as_slice());
462-
}
463-
464-
#[test]
465-
fn test_chunks_inexact_division() {
466-
let vec = (0..15).collect::<Vec<_>>();
467-
let zslice = ZSlice::from(vec);
468-
469-
const EXPECTED_CHUNKS: &[&[u8]] =
470-
&[&[0, 1, 2, 3], &[4, 5, 6, 7], &[8, 9, 10, 11], &[12, 13, 14]];
471-
const CHUNK_SIZE: usize = 4;
472-
473-
for (found, expected) in zslice.chunks(CHUNK_SIZE).zip(EXPECTED_CHUNKS) {
474-
assert_eq!(found.as_slice(), *expected);
475-
assert!(found.len() <= CHUNK_SIZE);
476-
}
477-
}
478-
479-
#[test]
480-
fn test_chunks_exact_division() {
481-
let vec = (0..8).collect::<Vec<_>>();
482-
let zslice = ZSlice::from(vec);
483-
484-
const EXPECTED_CHUNKS: &[&[u8]] = &[&[0, 1], &[2, 3], &[4, 5], &[6, 7]];
485-
const CHUNK_SIZE: usize = 2;
486-
487-
for (found, expected) in zslice.chunks(CHUNK_SIZE).zip(EXPECTED_CHUNKS) {
488-
assert_eq!(found.as_slice(), *expected);
489-
assert!(found.len() <= CHUNK_SIZE);
490-
}
491-
}
492-
493-
#[test]
494-
fn test_chunks_empty() {
495-
let zslice = ZSlice::empty();
496-
497-
assert_eq!(zslice.chunks(3).next(), None);
498-
}
499-
}

zenoh/src/api/bytes.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{borrow::Cow, fmt::Debug, mem, str::Utf8Error};
1818
use zenoh_buffers::{
1919
buffer::{Buffer, SplitBuffer},
2020
reader::{HasReader, Reader},
21-
ZBuf, ZBufReader, ZSlice, ZSliceBuffer,
21+
ZBuf, ZBufChunkIterator, ZBufReader, ZSlice, ZSliceBuffer,
2222
};
2323
use zenoh_protocol::zenoh::ext::AttachmentType;
2424

@@ -213,7 +213,7 @@ impl ZBytes {
213213

214214
/// Return an iterator of chunks in [`ZBytes`].
215215
///
216-
/// This method does not entail cloning the underlying buffer unless [`ZBytes`] is not
216+
/// This method does not entail cloning the underlying buffer even if [`ZBytes`] is not
217217
/// contiguous.
218218
///
219219
/// All chunks have size less then or equal to `chunk_size`.
@@ -222,11 +222,8 @@ impl ZBytes {
222222
///
223223
/// Panics if `chunk_size` is zero.
224224
#[zenoh_macros::unstable]
225-
pub fn chunks(&self, chunk_size: usize) -> impl Iterator<Item = ZBytes> {
226-
self.0
227-
.to_zslice()
228-
.chunks(chunk_size)
229-
.map(|zs| ZBytes::from(ZBuf::from(zs)))
225+
pub fn chunks(&self, chunk_size: usize) -> ZBytesChunkIterator {
226+
ZBytesChunkIterator(self.0.chunks(chunk_size))
230227
}
231228
}
232229

@@ -507,3 +504,16 @@ impl<const ID: u8> From<AttachmentType<ID>> for ZBytes {
507504
this.buffer.into()
508505
}
509506
}
507+
508+
/// An iterator of chunks in [`ZBytes`], see [`ZBytes::chunks`].
509+
#[zenoh_macros::unstable]
510+
pub struct ZBytesChunkIterator<'a>(ZBufChunkIterator<'a>);
511+
512+
#[zenoh_macros::unstable]
513+
impl Iterator for ZBytesChunkIterator<'_> {
514+
type Item = ZBytes;
515+
516+
fn next(&mut self) -> Option<Self::Item> {
517+
self.0.next().map(ZBytes::from)
518+
}
519+
}

0 commit comments

Comments
 (0)