Skip to content

Commit 8835564

Browse files
committed
Fix epic blunder
1 parent 42f414b commit 8835564

File tree

4 files changed

+73
-55
lines changed

4 files changed

+73
-55
lines changed

zenoh/src/net/routing/dispatcher/face.rs

+24-40
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ use std::{
1818
ops::Not,
1919
sync::{Arc, Weak},
2020
time::Duration,
21+
usize,
2122
};
2223

2324
use arc_swap::ArcSwap;
2425
use tokio_util::sync::CancellationToken;
26+
use zenoh_config::InterceptorFlow;
2527
use zenoh_protocol::{
2628
core::{ExprId, Reliability, WhatAmI, ZenohIdProto},
2729
network::{
@@ -153,60 +155,62 @@ impl FaceState {
153155
id
154156
}
155157

158+
pub(crate) fn load_interceptors(
159+
&self,
160+
flow: InterceptorFlow,
161+
) -> Option<arc_swap::Guard<Arc<InterceptorsChain>>> {
162+
match flow {
163+
InterceptorFlow::Egress => &self.eg_interceptors,
164+
InterceptorFlow::Ingress => &self.in_interceptors,
165+
}
166+
.as_ref()
167+
.map(|iceptors| iceptors.load())
168+
.and_then(|iceptors| iceptors.is_empty().not().then_some(iceptors))
169+
}
170+
156171
pub(crate) fn update_interceptors_caches(&self, res: &mut Arc<Resource>) {
157-
if let Some(interceptor) = self
158-
.in_interceptors
159-
.as_ref()
160-
.map(|itor| itor.load())
161-
.and_then(|is| is.is_empty().not().then_some(is))
162-
{
172+
if let Some(iceptor) = self.load_interceptors(InterceptorFlow::Ingress) {
163173
if let Ok(expr) = KeyExpr::try_from(res.expr().to_string()) {
164-
let cache = interceptor.compute_keyexpr_cache(&expr);
174+
let cache = iceptor.compute_keyexpr_cache(&expr);
165175
get_mut_unchecked(
166176
get_mut_unchecked(res)
167177
.session_ctxs
168178
.get_mut(&self.id)
169179
.unwrap(),
170180
)
171-
.in_interceptor_cache = InterceptorCache::new(cache, interceptor.version);
181+
.in_interceptor_cache = InterceptorCache::new(cache, iceptor.version);
172182
}
173183
}
174184

175-
if let Some(interceptor) = self
176-
.primitives
177-
.as_any()
178-
.downcast_ref::<Mux>()
179-
.map(|mux| mux.interceptor.load())
180-
.and_then(|is| is.is_empty().not().then_some(is))
181-
{
185+
if let Some(iceptor) = self.load_interceptors(InterceptorFlow::Egress) {
182186
if let Ok(expr) = KeyExpr::try_from(res.expr().to_string()) {
183-
let cache = interceptor.compute_keyexpr_cache(&expr);
187+
let cache = iceptor.compute_keyexpr_cache(&expr);
184188
get_mut_unchecked(
185189
get_mut_unchecked(res)
186190
.session_ctxs
187191
.get_mut(&self.id)
188192
.unwrap(),
189193
)
190-
.e_interceptor_cache = InterceptorCache::new(cache, interceptor.version);
194+
.e_interceptor_cache = InterceptorCache::new(cache, iceptor.version);
191195
}
192196
}
193197

194-
if let Some(interceptor) = self
198+
if let Some(iceptor) = self
195199
.primitives
196200
.as_any()
197201
.downcast_ref::<McastMux>()
198202
.map(|mux| mux.interceptor.load())
199203
.and_then(|is| is.is_empty().not().then_some(is))
200204
{
201205
if let Ok(expr) = KeyExpr::try_from(res.expr().to_string()) {
202-
let cache = interceptor.compute_keyexpr_cache(&expr);
206+
let cache = iceptor.compute_keyexpr_cache(&expr);
203207
get_mut_unchecked(
204208
get_mut_unchecked(res)
205209
.session_ctxs
206210
.get_mut(&self.id)
207211
.unwrap(),
208212
)
209-
.e_interceptor_cache = InterceptorCache::new(cache, interceptor.version);
213+
.e_interceptor_cache = InterceptorCache::new(cache, iceptor.version);
210214
}
211215
}
212216
}
@@ -301,26 +305,6 @@ impl Face {
301305
interest.rejection_token.cancel();
302306
}
303307
}
304-
305-
pub(crate) fn load_egress_interceptors(
306-
&self,
307-
) -> Option<arc_swap::Guard<Arc<InterceptorsChain>>> {
308-
self.state
309-
.eg_interceptors
310-
.as_ref()
311-
.map(|i| i.load())
312-
.and_then(|i| i.is_empty().not().then_some(i))
313-
}
314-
315-
pub(crate) fn load_ingress_interceptors(
316-
&self,
317-
) -> Option<arc_swap::Guard<Arc<InterceptorsChain>>> {
318-
self.state
319-
.in_interceptors
320-
.as_ref()
321-
.map(|i| i.load())
322-
.and_then(|i| i.is_empty().not().then_some(i))
323-
}
324308
}
325309

326310
impl Primitives for Face {

zenoh/src/net/routing/dispatcher/pubsub.rs

+21-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use std::collections::HashMap;
1717
use std::sync::Arc;
1818

19+
use zenoh_config::InterceptorFlow;
1920
use zenoh_core::zread;
2021
use zenoh_protocol::{
2122
core::{key_expr::keyexpr, Reliability, WireExpr},
@@ -299,15 +300,15 @@ pub fn route_data(
299300
msg.wire_expr.suffix.as_ref()
300301
);
301302

302-
if let Some(interceptor) = face.load_ingress_interceptors() {
303+
if let Some(interceptor) = face.state.load_interceptors(InterceptorFlow::Ingress) {
303304
let ctx = &mut RoutingContext::new(NetworkMessageMut {
304305
body: NetworkBodyMut::Push(msg),
305306
reliability,
306307
#[cfg(feature = "stats")]
307308
size: None,
308309
});
309310

310-
if !interceptor.intercept_with_face(ctx, face, &prefix) {
311+
if !interceptor.intercept_with_face(ctx, face, &prefix, InterceptorFlow::Ingress) {
311312
return;
312313
}
313314
};
@@ -356,15 +357,22 @@ pub fn route_data(
356357
msg.wire_expr = key_expr.into();
357358
msg.ext_nodeid = ext::NodeIdType { node_id: *context };
358359

359-
if let Some(interceptor) = face.load_egress_interceptors() {
360+
if let Some(interceptor) =
361+
face.state.load_interceptors(InterceptorFlow::Egress)
362+
{
360363
let ctx = &mut RoutingContext::new(NetworkMessageMut {
361364
body: NetworkBodyMut::Push(msg),
362365
reliability,
363366
#[cfg(feature = "stats")]
364367
size: None,
365368
});
366369

367-
if !interceptor.intercept_with_face(ctx, face, &prefix) {
370+
if !interceptor.intercept_with_face(
371+
ctx,
372+
face,
373+
&prefix,
374+
InterceptorFlow::Egress,
375+
) {
368376
return;
369377
}
370378
};
@@ -402,15 +410,22 @@ pub fn route_data(
402410
payload: msg.payload.clone(),
403411
};
404412

405-
if let Some(interceptor) = face.load_egress_interceptors() {
413+
if let Some(interceptor) =
414+
face.state.load_interceptors(InterceptorFlow::Egress)
415+
{
406416
let ctx = &mut RoutingContext::new(NetworkMessageMut {
407417
body: NetworkBodyMut::Push(msg),
408418
reliability,
409419
#[cfg(feature = "stats")]
410420
size: None,
411421
});
412422

413-
if !interceptor.intercept_with_face(ctx, face, &prefix) {
423+
if !interceptor.intercept_with_face(
424+
ctx,
425+
face,
426+
&prefix,
427+
InterceptorFlow::Egress,
428+
) {
414429
continue;
415430
}
416431
};

zenoh/src/net/routing/dispatcher/resource.rs

+18-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::{
2222
};
2323

2424
use zenoh_collections::SingleOrBoxHashSet;
25-
use zenoh_config::WhatAmI;
25+
use zenoh_config::{InterceptorFlow, WhatAmI};
2626
use zenoh_protocol::{
2727
core::{key_expr::keyexpr, ExprId, WireExpr},
2828
network::{
@@ -759,19 +759,30 @@ impl Resource {
759759
face: &Face,
760760
interceptor: &InterceptorsChain,
761761
) -> Option<InterceptorCacheValueType> {
762-
self.session_ctxs
763-
.get(&face.state.id)
764-
.and_then(|ctx| ctx.in_interceptor_cache.value(interceptor, self))
762+
self.interceptor_cache(face, interceptor, InterceptorFlow::Ingress)
765763
}
766764

767765
pub(crate) fn get_egress_cache(
768766
&self,
769767
face: &Face,
770768
interceptor: &InterceptorsChain,
771769
) -> Option<InterceptorCacheValueType> {
772-
self.session_ctxs
773-
.get(&face.state.id)
774-
.and_then(|ctx| ctx.e_interceptor_cache.value(interceptor, self))
770+
self.interceptor_cache(face, interceptor, InterceptorFlow::Egress)
771+
}
772+
773+
pub(crate) fn interceptor_cache(
774+
&self,
775+
face: &Face,
776+
interceptor: &InterceptorsChain,
777+
flow: InterceptorFlow,
778+
) -> Option<InterceptorCacheValueType> {
779+
self.session_ctxs.get(&face.state.id).and_then(|ctx| {
780+
match flow {
781+
InterceptorFlow::Egress => &ctx.e_interceptor_cache,
782+
InterceptorFlow::Ingress => &ctx.in_interceptor_cache,
783+
}
784+
.value(interceptor, self)
785+
})
775786
}
776787
}
777788

zenoh/src/net/routing/interceptor/mod.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use zenoh_link::LinkAuthId;
2525

2626
mod authorization;
2727
use std::any::Any;
28+
use std::ops::Not;
2829

2930
mod low_pass;
3031
use low_pass::low_pass_interceptor_factories;
@@ -159,10 +160,17 @@ impl InterceptorsChain {
159160
ctx: &mut RoutingContext<NetworkMessageMut>,
160161
face: &Face,
161162
prefix: &Resource,
163+
flow: InterceptorFlow,
162164
) -> bool {
163-
let cache_guard = prefix.get_ingress_cache(face, self);
165+
// NOTE: the cache should be empty if the wire expr has no suffix, i.e. when the prefix
166+
// doesn't represent a full keyexpr.
167+
let prefix = ctx
168+
.wire_expr()
169+
.and_then(|we| we.has_suffix().not().then(|| prefix));
170+
let cache_guard = prefix
171+
.as_ref()
172+
.and_then(|p| p.interceptor_cache(face, self, flow));
164173
let cache = cache_guard.as_ref().and_then(|c| c.get_ref().as_ref());
165-
166174
self.intercept(ctx, cache)
167175
}
168176
}

0 commit comments

Comments
 (0)