Skip to content

Commit 68ef1c7

Browse files
committed
Fix epic blunder
1 parent 494144f commit 68ef1c7

File tree

4 files changed

+73
-55
lines changed

4 files changed

+73
-55
lines changed

zenoh/src/net/routing/dispatcher/face.rs

+24-40
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ use std::{
1818
ops::Not,
1919
sync::{Arc, Weak},
2020
time::Duration,
21+
usize,
2122
};
2223

2324
use arc_swap::ArcSwap;
2425
use tokio_util::sync::CancellationToken;
26+
use zenoh_config::InterceptorFlow;
2527
use zenoh_protocol::{
2628
core::{ExprId, Reliability, WhatAmI, ZenohIdProto},
2729
network::{
@@ -150,60 +152,62 @@ impl FaceState {
150152
id
151153
}
152154

155+
pub(crate) fn load_interceptors(
156+
&self,
157+
flow: InterceptorFlow,
158+
) -> Option<arc_swap::Guard<Arc<InterceptorsChain>>> {
159+
match flow {
160+
InterceptorFlow::Egress => &self.eg_interceptors,
161+
InterceptorFlow::Ingress => &self.in_interceptors,
162+
}
163+
.as_ref()
164+
.map(|iceptors| iceptors.load())
165+
.and_then(|iceptors| iceptors.is_empty().not().then_some(iceptors))
166+
}
167+
153168
pub(crate) fn update_interceptors_caches(&self, res: &mut Arc<Resource>) {
154-
if let Some(interceptor) = self
155-
.in_interceptors
156-
.as_ref()
157-
.map(|itor| itor.load())
158-
.and_then(|is| is.is_empty().not().then_some(is))
159-
{
169+
if let Some(iceptor) = self.load_interceptors(InterceptorFlow::Ingress) {
160170
if let Some(expr) = res.keyexpr() {
161-
let cache = interceptor.compute_keyexpr_cache(expr);
171+
let cache = iceptor.compute_keyexpr_cache(&expr);
162172
get_mut_unchecked(
163173
get_mut_unchecked(res)
164174
.session_ctxs
165175
.get_mut(&self.id)
166176
.unwrap(),
167177
)
168-
.in_interceptor_cache = InterceptorCache::new(cache, interceptor.version);
178+
.in_interceptor_cache = InterceptorCache::new(cache, iceptor.version);
169179
}
170180
}
171181

172-
if let Some(interceptor) = self
173-
.primitives
174-
.as_any()
175-
.downcast_ref::<Mux>()
176-
.map(|mux| mux.interceptor.load())
177-
.and_then(|is| is.is_empty().not().then_some(is))
178-
{
182+
if let Some(iceptor) = self.load_interceptors(InterceptorFlow::Egress) {
179183
if let Some(expr) = res.keyexpr() {
180-
let cache = interceptor.compute_keyexpr_cache(expr);
184+
let cache = iceptor.compute_keyexpr_cache(&expr);
181185
get_mut_unchecked(
182186
get_mut_unchecked(res)
183187
.session_ctxs
184188
.get_mut(&self.id)
185189
.unwrap(),
186190
)
187-
.e_interceptor_cache = InterceptorCache::new(cache, interceptor.version);
191+
.e_interceptor_cache = InterceptorCache::new(cache, iceptor.version);
188192
}
189193
}
190194

191-
if let Some(interceptor) = self
195+
if let Some(iceptor) = self
192196
.primitives
193197
.as_any()
194198
.downcast_ref::<McastMux>()
195199
.map(|mux| mux.interceptor.load())
196200
.and_then(|is| is.is_empty().not().then_some(is))
197201
{
198202
if let Some(expr) = res.keyexpr() {
199-
let cache = interceptor.compute_keyexpr_cache(expr);
203+
let cache = iceptor.compute_keyexpr_cache(&expr);
200204
get_mut_unchecked(
201205
get_mut_unchecked(res)
202206
.session_ctxs
203207
.get_mut(&self.id)
204208
.unwrap(),
205209
)
206-
.e_interceptor_cache = InterceptorCache::new(cache, interceptor.version);
210+
.e_interceptor_cache = InterceptorCache::new(cache, iceptor.version);
207211
}
208212
}
209213
}
@@ -298,26 +302,6 @@ impl Face {
298302
interest.rejection_token.cancel();
299303
}
300304
}
301-
302-
pub(crate) fn load_egress_interceptors(
303-
&self,
304-
) -> Option<arc_swap::Guard<Arc<InterceptorsChain>>> {
305-
self.state
306-
.eg_interceptors
307-
.as_ref()
308-
.map(|i| i.load())
309-
.and_then(|i| i.is_empty().not().then_some(i))
310-
}
311-
312-
pub(crate) fn load_ingress_interceptors(
313-
&self,
314-
) -> Option<arc_swap::Guard<Arc<InterceptorsChain>>> {
315-
self.state
316-
.in_interceptors
317-
.as_ref()
318-
.map(|i| i.load())
319-
.and_then(|i| i.is_empty().not().then_some(i))
320-
}
321305
}
322306

323307
impl Primitives for Face {

zenoh/src/net/routing/dispatcher/pubsub.rs

+21-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use std::collections::HashMap;
1717
use std::sync::Arc;
1818

19+
use zenoh_config::InterceptorFlow;
1920
use zenoh_core::zread;
2021
use zenoh_protocol::{
2122
core::{key_expr::keyexpr, Reliability, WireExpr},
@@ -299,15 +300,15 @@ pub fn route_data(
299300
msg.wire_expr.suffix.as_ref()
300301
);
301302

302-
if let Some(interceptor) = face.load_ingress_interceptors() {
303+
if let Some(interceptor) = face.state.load_interceptors(InterceptorFlow::Ingress) {
303304
let ctx = &mut RoutingContext::new(NetworkMessageMut {
304305
body: NetworkBodyMut::Push(msg),
305306
reliability,
306307
#[cfg(feature = "stats")]
307308
size: None,
308309
});
309310

310-
if !interceptor.intercept_with_face(ctx, face, &prefix) {
311+
if !interceptor.intercept_with_face(ctx, face, &prefix, InterceptorFlow::Ingress) {
311312
return;
312313
}
313314
};
@@ -356,15 +357,22 @@ pub fn route_data(
356357
msg.wire_expr = key_expr.into();
357358
msg.ext_nodeid = ext::NodeIdType { node_id: *context };
358359

359-
if let Some(interceptor) = face.load_egress_interceptors() {
360+
if let Some(interceptor) =
361+
face.state.load_interceptors(InterceptorFlow::Egress)
362+
{
360363
let ctx = &mut RoutingContext::new(NetworkMessageMut {
361364
body: NetworkBodyMut::Push(msg),
362365
reliability,
363366
#[cfg(feature = "stats")]
364367
size: None,
365368
});
366369

367-
if !interceptor.intercept_with_face(ctx, face, &prefix) {
370+
if !interceptor.intercept_with_face(
371+
ctx,
372+
face,
373+
&prefix,
374+
InterceptorFlow::Egress,
375+
) {
368376
return;
369377
}
370378
};
@@ -402,15 +410,22 @@ pub fn route_data(
402410
payload: msg.payload.clone(),
403411
};
404412

405-
if let Some(interceptor) = face.load_egress_interceptors() {
413+
if let Some(interceptor) =
414+
face.state.load_interceptors(InterceptorFlow::Egress)
415+
{
406416
let ctx = &mut RoutingContext::new(NetworkMessageMut {
407417
body: NetworkBodyMut::Push(msg),
408418
reliability,
409419
#[cfg(feature = "stats")]
410420
size: None,
411421
});
412422

413-
if !interceptor.intercept_with_face(ctx, face, &prefix) {
423+
if !interceptor.intercept_with_face(
424+
ctx,
425+
face,
426+
&prefix,
427+
InterceptorFlow::Egress,
428+
) {
414429
continue;
415430
}
416431
};

zenoh/src/net/routing/dispatcher/resource.rs

+18-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::{
2222
};
2323

2424
use zenoh_collections::SingleOrBoxHashSet;
25-
use zenoh_config::WhatAmI;
25+
use zenoh_config::{InterceptorFlow, WhatAmI};
2626
use zenoh_protocol::{
2727
core::{key_expr::keyexpr, ExprId, WireExpr},
2828
network::{
@@ -765,19 +765,30 @@ impl Resource {
765765
face: &Face,
766766
interceptor: &InterceptorsChain,
767767
) -> Option<InterceptorCacheValueType> {
768-
self.session_ctxs
769-
.get(&face.state.id)
770-
.and_then(|ctx| ctx.in_interceptor_cache.value(interceptor, self))
768+
self.interceptor_cache(face, interceptor, InterceptorFlow::Ingress)
771769
}
772770

773771
pub(crate) fn get_egress_cache(
774772
&self,
775773
face: &Face,
776774
interceptor: &InterceptorsChain,
777775
) -> Option<InterceptorCacheValueType> {
778-
self.session_ctxs
779-
.get(&face.state.id)
780-
.and_then(|ctx| ctx.e_interceptor_cache.value(interceptor, self))
776+
self.interceptor_cache(face, interceptor, InterceptorFlow::Egress)
777+
}
778+
779+
pub(crate) fn interceptor_cache(
780+
&self,
781+
face: &Face,
782+
interceptor: &InterceptorsChain,
783+
flow: InterceptorFlow,
784+
) -> Option<InterceptorCacheValueType> {
785+
self.session_ctxs.get(&face.state.id).and_then(|ctx| {
786+
match flow {
787+
InterceptorFlow::Egress => &ctx.e_interceptor_cache,
788+
InterceptorFlow::Ingress => &ctx.in_interceptor_cache,
789+
}
790+
.value(interceptor, self)
791+
})
781792
}
782793
}
783794

zenoh/src/net/routing/interceptor/mod.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use zenoh_link::LinkAuthId;
2525

2626
mod authorization;
2727
use std::any::Any;
28+
use std::ops::Not;
2829

2930
mod low_pass;
3031
use low_pass::low_pass_interceptor_factories;
@@ -159,10 +160,17 @@ impl InterceptorsChain {
159160
ctx: &mut RoutingContext<NetworkMessageMut>,
160161
face: &Face,
161162
prefix: &Resource,
163+
flow: InterceptorFlow,
162164
) -> bool {
163-
let cache_guard = prefix.get_ingress_cache(face, self);
165+
// NOTE: the cache should be empty if the wire expr has no suffix, i.e. when the prefix
166+
// doesn't represent a full keyexpr.
167+
let prefix = ctx
168+
.wire_expr()
169+
.and_then(|we| we.has_suffix().not().then(|| prefix));
170+
let cache_guard = prefix
171+
.as_ref()
172+
.and_then(|p| p.interceptor_cache(face, self, flow));
164173
let cache = cache_guard.as_ref().and_then(|c| c.get_ref().as_ref());
165-
166174
self.intercept(ctx, cache)
167175
}
168176
}

0 commit comments

Comments
 (0)