Skip to content

Optimize shm watchdog storage #1869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const_format = "0.2.33"
criterion = "0.5"
crossbeam-utils = "0.8.20"
crossbeam-queue = "0.3.12"
crossbeam-channel = "0.5"
static_assertions = "1.1.0"
derive_more = { version = "1.0.0", features = ["as_ref"] }
derive-new = "0.7.0"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-shm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ num_cpus = { workspace = true, optional = true }
thread-priority = { workspace = true }
stabby = { workspace = true }
crossbeam-queue = { workspace = true }
crossbeam-channel = { workspace = true }
static_assertions = { workspace = true }

[target.'cfg(unix)'.dependencies]
advisory-lock = { workspace = true }
Expand Down
20 changes: 10 additions & 10 deletions commons/zenoh-shm/src/api/client_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

use std::{
collections::HashMap,
collections::BTreeMap,
sync::{Arc, RwLock},
};

Expand All @@ -28,7 +28,7 @@ use crate::{
posix_shm_client::PosixShmClient, protocol_id::POSIX_PROTOCOL_ID,
},
},
reader::{ClientStorage, GlobalDataSegmentID},
reader::{ClientStorage, GlobalDataSegmentId},
};

#[dynamic(lazy, drop)]
Expand All @@ -53,7 +53,7 @@ impl ShmClientSetBuilder {
id: ProtocolID,
client: Arc<dyn ShmClient>,
) -> ShmClientStorageBuilder {
let clients = HashMap::from([(id, client)]);
let clients = BTreeMap::from([(id, client)]);
ShmClientStorageBuilder::new(clients)
}

Expand All @@ -70,7 +70,7 @@ impl ShmClientSetBuilder {
/// Include default clients
#[zenoh_macros::unstable_doc]
pub fn with_default_client_set(self) -> ShmClientStorageBuilder {
let clients = HashMap::from([(
let clients = BTreeMap::from([(
POSIX_PROTOCOL_ID,
Arc::new(PosixShmClient {}) as Arc<dyn ShmClient>,
)]);
Expand All @@ -80,22 +80,22 @@ impl ShmClientSetBuilder {

#[zenoh_macros::unstable_doc]
pub struct ShmClientStorageBuilder {
clients: HashMap<ProtocolID, Arc<dyn ShmClient>>,
clients: BTreeMap<ProtocolID, Arc<dyn ShmClient>>,
}

impl ShmClientStorageBuilder {
fn new(clients: HashMap<ProtocolID, Arc<dyn ShmClient>>) -> Self {
fn new(clients: BTreeMap<ProtocolID, Arc<dyn ShmClient>>) -> Self {
Self { clients }
}

/// Add client to the storage
#[zenoh_macros::unstable_doc]
pub fn with_client(mut self, id: ProtocolID, client: Arc<dyn ShmClient>) -> ZResult<Self> {
match self.clients.entry(id) {
std::collections::hash_map::Entry::Occupied(occupied) => {
std::collections::btree_map::Entry::Occupied(occupied) => {
bail!("Client already exists for id {id}: {:?}!", occupied)
}
std::collections::hash_map::Entry::Vacant(vacant) => {
std::collections::btree_map::Entry::Vacant(vacant) => {
vacant.insert(client as Arc<dyn ShmClient>);
Ok(self)
}
Expand Down Expand Up @@ -123,7 +123,7 @@ impl ShmClientStorageBuilder {
#[derive(Debug)]
pub struct ShmClientStorage {
pub(crate) clients: ClientStorage<Arc<dyn ShmClient>>,
pub(crate) segments: RwLock<HashMap<GlobalDataSegmentID, Arc<dyn ShmSegment>>>,
pub(crate) segments: RwLock<BTreeMap<GlobalDataSegmentId, Arc<dyn ShmSegment>>>,
}

impl Eq for ShmClientStorage {}
Expand All @@ -147,7 +147,7 @@ impl ShmClientStorage {
self.clients.get_clients().keys().copied().collect()
}

fn new(clients: HashMap<ProtocolID, Arc<dyn ShmClient>>) -> Self {
fn new(clients: BTreeMap<ProtocolID, Arc<dyn ShmClient>>) -> Self {
Self {
clients: ClientStorage::new(clients),
segments: RwLock::default(),
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
marker::PhantomData,
num::NonZeroUsize,
pin::Pin,
sync::{atomic::Ordering, Arc, Mutex},
sync::{atomic::Ordering, Mutex},
time::Duration,
};

Expand Down Expand Up @@ -937,7 +937,7 @@ where

// Create buffer
let shmb = ShmBufInner {
metadata: Arc::new(confirmed_metadata),
metadata: confirmed_metadata,
buf: chunk.data,
info,
};
Expand Down
7 changes: 2 additions & 5 deletions commons/zenoh-shm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
use std::{
any::Any,
num::NonZeroUsize,
sync::{
atomic::{AtomicPtr, Ordering},
Arc,
},
sync::atomic::{AtomicPtr, Ordering},
};

use api::common::types::ProtocolID;
Expand Down Expand Up @@ -94,7 +91,7 @@ impl ShmBufInfo {

/// A zenoh buffer in shared memory.
pub struct ShmBufInner {
pub(crate) metadata: Arc<ConfirmedDescriptor>,
pub(crate) metadata: ConfirmedDescriptor,
pub(crate) buf: AtomicPtr<u8>,
pub info: ShmBufInfo,
}
Expand Down
99 changes: 71 additions & 28 deletions commons/zenoh-shm/src/metadata/descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::{atomic::AtomicU64, Arc};
use std::{
ops::Deref,
sync::{atomic::AtomicU64, Arc},
};

use super::segment::MetadataSegment;
use crate::header::chunk_header::ChunkHeaderType;
Expand All @@ -36,33 +39,47 @@ impl From<&OwnedMetadataDescriptor> for MetadataDescriptor {
}

#[derive(Clone)]
pub struct OwnedMetadataDescriptor {
pub(crate) segment: Arc<MetadataSegment>,
header: &'static ChunkHeaderType,
pub struct OwnedWatchdog {
watchdog_atomic: &'static AtomicU64,
watchdog_mask: u64,
}

impl OwnedMetadataDescriptor {
pub(crate) fn new(
segment: Arc<MetadataSegment>,
header: &'static ChunkHeaderType,
watchdog_atomic: &'static AtomicU64,
watchdog_mask: u64,
) -> Self {
// The ordering strategy is important. See storage implementation for details
impl Ord for OwnedWatchdog {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match self
.watchdog_atomic
.as_ptr()
.cmp(&other.watchdog_atomic.as_ptr())
{
core::cmp::Ordering::Equal => self.watchdog_mask.cmp(&other.watchdog_mask),
ord => ord,
}
}
}

impl PartialOrd for OwnedWatchdog {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl PartialEq for OwnedWatchdog {
fn eq(&self, other: &Self) -> bool {
self.watchdog_atomic.as_ptr() == other.watchdog_atomic.as_ptr()
&& self.watchdog_mask == other.watchdog_mask
}
}
impl Eq for OwnedWatchdog {}

impl OwnedWatchdog {
pub fn new(watchdog_atomic: &'static AtomicU64, watchdog_mask: u64) -> Self {
Self {
segment,
header,
watchdog_atomic,
watchdog_mask,
}
}

#[inline(always)]
pub fn header(&self) -> &ChunkHeaderType {
self.header
}

pub fn confirm(&self) {
self.watchdog_atomic
.fetch_or(self.watchdog_mask, std::sync::atomic::Ordering::SeqCst);
Expand All @@ -73,6 +90,40 @@ impl OwnedMetadataDescriptor {
.fetch_and(!self.watchdog_mask, std::sync::atomic::Ordering::SeqCst)
& self.watchdog_mask
}
}

#[derive(Clone)]
pub struct OwnedMetadataDescriptor {
pub(crate) segment: Arc<MetadataSegment>,
header: &'static ChunkHeaderType,
watchdog: OwnedWatchdog,
}

impl Deref for OwnedMetadataDescriptor {
type Target = OwnedWatchdog;

fn deref(&self) -> &Self::Target {
&self.watchdog
}
}

impl OwnedMetadataDescriptor {
pub(crate) fn new(
segment: Arc<MetadataSegment>,
header: &'static ChunkHeaderType,
watchdog: OwnedWatchdog,
) -> Self {
Self {
segment,
header,
watchdog,
}
}

#[inline(always)]
pub fn header(&self) -> &ChunkHeaderType {
self.header
}

#[cfg(feature = "test")]
pub fn test_validate(&self) -> u64 {
Expand All @@ -83,14 +134,7 @@ impl OwnedMetadataDescriptor {
// The ordering strategy is important. See storage implementation for details
impl Ord for OwnedMetadataDescriptor {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match self
.watchdog_atomic
.as_ptr()
.cmp(&other.watchdog_atomic.as_ptr())
{
core::cmp::Ordering::Equal => self.watchdog_mask.cmp(&other.watchdog_mask),
ord => ord,
}
self.watchdog.cmp(&other.watchdog)
}
}

Expand All @@ -102,8 +146,7 @@ impl PartialOrd for OwnedMetadataDescriptor {

impl PartialEq for OwnedMetadataDescriptor {
fn eq(&self, other: &Self) -> bool {
self.watchdog_atomic.as_ptr() == other.watchdog_atomic.as_ptr()
&& self.watchdog_mask == other.watchdog_mask
self.watchdog.eq(&other.watchdog)
}
}
impl Eq for OwnedMetadataDescriptor {}
Expand Down
3 changes: 1 addition & 2 deletions commons/zenoh-shm/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,4 @@ tested_crate_module!(storage);
tested_crate_module!(subscription);

pub(crate) mod allocated_descriptor;

mod segment;
pub(crate) mod segment;
12 changes: 8 additions & 4 deletions commons/zenoh-shm/src/metadata/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use std::sync::atomic::AtomicU64;

use zenoh_result::ZResult;

use super::descriptor::{MetadataIndex, MetadataSegmentID};
use super::descriptor::{MetadataIndex, MetadataSegmentID, OwnedWatchdog};
use crate::{header::chunk_header::ChunkHeaderType, posix_shm::struct_in_shm::StructInSHM};

#[derive(Debug)]
#[stabby::stabby]
pub struct Metadata<const S: usize> {
headers: [ChunkHeaderType; S],
Expand All @@ -35,13 +36,15 @@ impl<const S: usize> Metadata<S> {
pub unsafe fn fast_elem_compute(
&self,
index: MetadataIndex,
) -> (&'static ChunkHeaderType, &'static AtomicU64, u64) {
) -> (&'static ChunkHeaderType, OwnedWatchdog) {
let watchdog_index = index / 64;
let watchdog_mask_index = index % 64;
(
&*(self.headers.as_ptr().offset(index as isize)),
&*(self.watchdogs.as_ptr().offset(watchdog_index as isize)),
1u64 << watchdog_mask_index,
OwnedWatchdog::new(
&*(self.watchdogs.as_ptr().offset(watchdog_index as isize)),
1u64 << watchdog_mask_index,
),
)
}

Expand All @@ -51,6 +54,7 @@ impl<const S: usize> Metadata<S> {
}
}

#[derive(Debug)]
pub struct MetadataSegment<const S: usize = 32768> {
pub data: StructInSHM<MetadataSegmentID, Metadata<S>>,
}
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-shm/src/metadata/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ impl MetadataStorage {
let mut initially_available = BTreeSet::<OwnedMetadataDescriptor>::default();

for index in 0..initial_segment.data.count() {
let (header, watchdog, mask) = unsafe {
let (header, watchdog) = unsafe {
initial_segment
.data
.fast_elem_compute(index as MetadataIndex)
};
let descriptor =
OwnedMetadataDescriptor::new(initial_segment.clone(), header, watchdog, mask);
OwnedMetadataDescriptor::new(initial_segment.clone(), header, watchdog);

// init generation (this is not really necessary, but we do)
descriptor
Expand Down
Loading
Loading