Skip to content

Commit baced61

Browse files
committed
cache pipeline selection
1 parent 3f62ebc commit baced61

File tree

4 files changed

+93
-51
lines changed

4 files changed

+93
-51
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

io/zenoh-transport/Cargo.toml

+9-8
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ description = "Internal crate for zenoh."
2626

2727
[features]
2828
shared-memory = [
29-
"zenoh-protocol/shared-memory",
30-
"zenoh-shm",
31-
"zenoh-codec/shared-memory",
32-
"zenoh-buffers/shared-memory",
29+
"zenoh-protocol/shared-memory",
30+
"zenoh-shm",
31+
"zenoh-codec/shared-memory",
32+
"zenoh-buffers/shared-memory",
3333
]
3434
auth_pubkey = ["transport_auth", "rsa"]
3535
auth_usrpwd = ["transport_auth"]
@@ -44,13 +44,14 @@ transport_ws = ["zenoh-link/transport_ws"]
4444
transport_serial = ["zenoh-link/transport_serial"]
4545
transport_compression = []
4646
transport_unixpipe = ["zenoh-link/transport_unixpipe"]
47-
transport_vsock= ["zenoh-link/transport_vsock"]
47+
transport_vsock = ["zenoh-link/transport_vsock"]
4848
stats = ["zenoh-protocol/stats"]
4949
test = []
5050
unstable = []
5151
default = ["test", "transport_multilink"]
5252

5353
[dependencies]
54+
arc-swap = { workspace = true }
5455
async-trait = { workspace = true }
5556
crossbeam-utils = { workspace = true }
5657
tokio = { workspace = true, features = [
@@ -63,9 +64,9 @@ tokio = { workspace = true, features = [
6364
"net",
6465
] }
6566
lazy_static = { workspace = true }
66-
tokio-util = { workspace = true, features = ["rt"]}
67+
tokio-util = { workspace = true, features = ["rt"] }
6768
flume = { workspace = true }
68-
tracing = {workspace = true}
69+
tracing = { workspace = true }
6970
lz4_flex = { workspace = true }
7071
paste = { workspace = true }
7172
rand = { workspace = true, features = ["default"] }
@@ -91,7 +92,7 @@ zenoh-task = { workspace = true }
9192

9293
[dev-dependencies]
9394
futures-util = { workspace = true }
94-
zenoh-util = {workspace = true }
95+
zenoh-util = { workspace = true }
9596
zenoh-protocol = { workspace = true, features = ["test"] }
9697
futures = { workspace = true }
9798
zenoh-link-commons = { workspace = true }

io/zenoh-transport/src/unicast/universal/transport.rs

+64-13
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@
1313
//
1414
use std::{
1515
fmt::DebugStruct,
16-
sync::{Arc, RwLock},
16+
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
1717
time::Duration,
1818
};
1919

20+
use arc_swap::ArcSwap;
2021
use async_trait::async_trait;
2122
use tokio::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
2223
use zenoh_core::{zasynclock, zcondfeat, zread, zwrite};
2324
use zenoh_link::Link;
2425
use zenoh_protocol::{
25-
core::{Priority, WhatAmI, ZenohIdProto},
26+
core::{Locator, Priority, Reliability, WhatAmI, ZenohIdProto},
2627
network::NetworkMessage,
2728
transport::{close, Close, PrioritySn, TransportMessage, TransportSn},
2829
};
@@ -31,7 +32,10 @@ use zenoh_result::{bail, zerror, ZResult};
3132
#[cfg(feature = "stats")]
3233
use crate::stats::TransportStats;
3334
use crate::{
34-
common::priority::{TransportPriorityRx, TransportPriorityTx},
35+
common::{
36+
pipeline::TransmissionPipelineProducer,
37+
priority::{TransportPriorityRx, TransportPriorityTx},
38+
},
3539
unicast::{
3640
authentication::TransportAuthId,
3741
link::{LinkUnicastWithOpenAck, TransportLinkUnicastDirection},
@@ -41,10 +45,54 @@ use crate::{
4145
},
4246
TransportManager, TransportPeerEventHandler,
4347
};
44-
4548
/*************************************/
4649
/* UNIVERSAL TRANSPORT */
4750
/*************************************/
51+
/// Transport link storage with link selection caching functionality
52+
#[derive(Default)]
53+
pub(crate) struct TransportLinks {
54+
links: RwLock<Box<[TransportLinkUnicastUniversal]>>,
55+
cache: ArcSwap<[[Option<Arc<ScheduledLink>>; Priority::NUM]; 2]>,
56+
}
57+
58+
impl TransportLinks {
59+
fn write(&self) -> RwLockWriteGuard<'_, Box<[TransportLinkUnicastUniversal]>> {
60+
// on write access to the links we clear pipeline cache
61+
self.cache.store(Default::default());
62+
zwrite!(self.links)
63+
}
64+
65+
fn read(&self) -> RwLockReadGuard<'_, Box<[TransportLinkUnicastUniversal]>> {
66+
zread!(self.links)
67+
}
68+
69+
pub(super) fn get_link(
70+
&self,
71+
reliability: Reliability,
72+
priority: Priority,
73+
select: impl FnOnce(&[TransportLinkUnicastUniversal]) -> Option<usize>,
74+
) -> Option<Arc<ScheduledLink>> {
75+
if let Some(link) = self.cache.load()[reliability as usize][priority as usize].clone() {
76+
return Some(link);
77+
}
78+
let guard = self.read();
79+
let transport_link = &guard[select(&guard)?];
80+
let link = Arc::new(ScheduledLink {
81+
pipeline: transport_link.pipeline.clone(),
82+
dst: transport_link.link.link.get_dst().clone(),
83+
});
84+
let mut cache = self.cache.load().as_ref().clone();
85+
cache[reliability as usize][priority as usize] = Some(link.clone());
86+
self.cache.store(Arc::new(cache));
87+
Some(link)
88+
}
89+
}
90+
91+
pub(super) struct ScheduledLink {
92+
pub(super) pipeline: TransmissionPipelineProducer,
93+
pub(super) dst: Locator,
94+
}
95+
4896
#[derive(Clone)]
4997
pub(crate) struct TransportUnicastUniversal {
5098
// Transport Manager
@@ -56,7 +104,7 @@ pub(crate) struct TransportUnicastUniversal {
56104
// Rx priorities
57105
pub(super) priority_rx: Arc<[TransportPriorityRx]>,
58106
// The links associated to the channel
59-
pub(super) links: Arc<RwLock<Box<[TransportLinkUnicastUniversal]>>>,
107+
pub(super) links: Arc<TransportLinks>,
60108
// The callback
61109
pub(super) callback: Arc<RwLock<Option<Arc<dyn TransportPeerEventHandler>>>>,
62110
// Lock used to ensure no race in add_link method
@@ -104,7 +152,7 @@ impl TransportUnicastUniversal {
104152
config,
105153
priority_tx: priority_tx.into_boxed_slice().into(),
106154
priority_rx: priority_rx.into_boxed_slice().into(),
107-
links: Arc::new(RwLock::new(vec![].into_boxed_slice())),
155+
links: Default::default(),
108156
add_link_lock: Arc::new(AsyncMutex::new(())),
109157
callback: Arc::new(RwLock::new(None)),
110158
alive: Arc::new(AsyncMutex::new(false)),
@@ -136,7 +184,7 @@ impl TransportUnicastUniversal {
136184

137185
// Close all the links
138186
let mut links = {
139-
let mut l_guard = zwrite!(self.links);
187+
let mut l_guard = self.links.write();
140188
let links = l_guard.to_vec();
141189
*l_guard = vec![].into_boxed_slice();
142190
links
@@ -161,7 +209,7 @@ impl TransportUnicastUniversal {
161209

162210
// Try to remove the link
163211
let target = {
164-
let mut guard = zwrite!(self.links);
212+
let mut guard = self.links.write();
165213

166214
if let Some(index) = guard.iter().position(|tl| {
167215
// Compare LinkUnicast link to not compare TransportLinkUnicast direction
@@ -245,7 +293,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
245293

246294
// Check if we can add more inbound links
247295
{
248-
let guard = zread!(self.links);
296+
let guard = self.links.read();
249297
if let TransportLinkUnicastDirection::Inbound = link.inner_config().direction {
250298
let count = guard
251299
.iter()
@@ -283,7 +331,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
283331
TransportLinkUnicastUniversal::new(self, link, &self.priority_tx);
284332

285333
// Add the link to the channel
286-
let mut guard = zwrite!(self.links);
334+
let mut guard = self.links.write();
287335
let mut links = Vec::with_capacity(guard.len() + 1);
288336
links.extend_from_slice(&guard);
289337
links.push(link.clone());
@@ -357,7 +405,9 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
357405
async fn close(&self, reason: u8) -> ZResult<()> {
358406
tracing::trace!("Closing transport with peer: {}", self.config.zid);
359407

360-
let mut pipelines = zread!(self.links)
408+
let mut pipelines = self
409+
.links
410+
.read()
361411
.iter()
362412
.map(|sl| sl.pipeline.clone())
363413
.collect::<Vec<_>>();
@@ -379,13 +429,14 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
379429
}
380430

381431
fn get_links(&self) -> Vec<Link> {
382-
zread!(self.links).iter().map(|l| l.link.link()).collect()
432+
self.links.read().iter().map(|l| l.link.link()).collect()
383433
}
384434

385435
fn get_auth_ids(&self) -> TransportAuthId {
386436
let mut transport_auth_id = TransportAuthId::default();
387437
// Convert LinkUnicast auth ids to AuthId
388-
zread!(self.links)
438+
self.links
439+
.read()
389440
.iter()
390441
.for_each(|l| transport_auth_id.push_link_auth_id(l.link.link.get_auth_id().clone()));
391442

io/zenoh-transport/src/unicast/universal/tx.rs

+19-30
Original file line numberDiff line numberDiff line change
@@ -70,50 +70,39 @@ impl TransportUnicastUniversal {
7070
}
7171

7272
fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
73-
let transport_links = self
73+
let Some(link) = self
7474
.links
75-
.read()
76-
.expect("reading `TransportUnicastUniversal::links` should not fail");
77-
78-
let Some(transport_link_index) = Self::select(
79-
transport_links.iter().map(|tl| {
80-
(
81-
tl.link
82-
.config
83-
.reliability
84-
.unwrap_or(Reliability::from(tl.link.link.is_reliable())),
85-
tl.link.config.priorities.clone(),
75+
.get_link(msg.is_reliable().into(), msg.priority(), |links| {
76+
Self::select(
77+
links.iter().map(|tl| {
78+
(
79+
tl.link
80+
.config
81+
.reliability
82+
.unwrap_or(Reliability::from(tl.link.link.is_reliable())),
83+
tl.link.config.priorities.clone(),
84+
)
85+
}),
86+
Reliability::from(msg.is_reliable()),
87+
msg.priority(),
8688
)
87-
}),
88-
Reliability::from(msg.is_reliable()),
89-
msg.priority(),
90-
) else {
89+
})
90+
else {
91+
// No Link found
9192
tracing::trace!(
9293
"Message dropped because the transport has no links: {}",
9394
msg
9495
);
95-
96-
// No Link found
9796
return Ok(false);
9897
};
99-
100-
let transport_link = transport_links
101-
.get(transport_link_index)
102-
.expect("transport link index should be valid");
103-
104-
let pipeline = transport_link.pipeline.clone();
10598
tracing::trace!(
10699
"Scheduled {:?} for transmission to {} ({})",
107100
msg,
108-
transport_link.link.link.get_dst(),
101+
link.dst,
109102
self.get_zid()
110103
);
111-
// Drop the guard before the push_zenoh_message since
112-
// the link could be congested and this operation could
113-
// block for fairly long time
114-
drop(transport_links);
115104
let droppable = msg.is_droppable();
116-
let push = pipeline.push_network_message(msg)?;
105+
let push = link.pipeline.push_network_message(msg)?;
117106
if !push && !droppable {
118107
tracing::error!(
119108
"Unable to push non droppable network message to {}. Closing transport!",

0 commit comments

Comments
 (0)