diff --git a/Cargo.lock b/Cargo.lock index 3bfaa8864c..829ecff953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1528,6 +1528,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -1616,6 +1622,12 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "glob" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" + [[package]] name = "gloo-timers" version = "0.3.0" @@ -3173,6 +3185,12 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "ring" version = "0.17.13" @@ -3235,6 +3253,36 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rstest" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fc39292f8613e913f7df8fa892b8944ceb47c247b78e1b1ae2f09e019be789d" +dependencies = [ + "futures-timer", + "futures-util", + "rstest_macros", + "rustc_version 0.4.1", +] + +[[package]] +name = "rstest_macros" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f168d99749d307be9de54d23fd226628d99768225ef08f6ffb52e0182a27746" +dependencies = [ + "cfg-if 1.0.0", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version 0.4.1", + "syn 2.0.87", + "unicode-ident", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -5490,6 +5538,8 @@ name = "zenoh-link-udp" version = "1.3.4" dependencies = [ "async-trait", + "flume", + "futures", "socket2 0.5.7", "tokio", "tokio-util", @@ -5499,6 +5549,7 @@ dependencies = [ "zenoh-link-commons", "zenoh-protocol", "zenoh-result", + "zenoh-runtime", "zenoh-sync", "zenoh-util", ] @@ -5770,6 +5821,7 @@ dependencies = [ "rand 0.8.5", "ringbuffer-spsc", "rsa", + "rstest", "serde", "sha3", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 197bbd2b52..b084b6dc89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,6 +143,7 @@ regex = "1.10.6" ron = "0.8.1" ringbuffer-spsc = "0.1.9" rsa = "0.9" +rstest = "0.25" rustc_version = "0.4.1" rustls = { version = "0.23.13", default-features = false, features = [ "logging", diff --git a/io/zenoh-links/zenoh-link-udp/Cargo.toml b/io/zenoh-links/zenoh-link-udp/Cargo.toml index 0e4b35f561..39ee10cf76 100644 --- a/io/zenoh-links/zenoh-link-udp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-udp/Cargo.toml @@ -25,9 +25,11 @@ description = "Internal crate for zenoh." # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +flume = { workspace = true } tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] } tokio-util = { workspace = true, features = ["rt"] } async-trait = { workspace = true } +futures = { workspace = true } tracing = {workspace = true} socket2 = { workspace = true } zenoh-buffers = { workspace = true } @@ -37,3 +39,4 @@ zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-sync = { workspace = true } zenoh-util = { workspace = true } +zenoh-runtime = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-udp/src/lib.rs b/io/zenoh-links/zenoh-link-udp/src/lib.rs index 935c802dcf..342bc1f890 100644 --- a/io/zenoh-links/zenoh-link-udp/src/lib.rs +++ b/io/zenoh-links/zenoh-link-udp/src/lib.rs @@ -17,6 +17,7 @@ //! This crate is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh) +mod listener; mod multicast; mod unicast; diff --git a/io/zenoh-links/zenoh-link-udp/src/listener.rs b/io/zenoh-links/zenoh-link-udp/src/listener.rs new file mode 100644 index 0000000000..d8092720f0 --- /dev/null +++ b/io/zenoh-links/zenoh-link-udp/src/listener.rs @@ -0,0 +1,141 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, + sync::{Arc, RwLock}, +}; + +use flume::Sender; +use futures::Future; +use tokio_util::sync::CancellationToken; +use zenoh_core::{zread, zwrite}; +use zenoh_link_commons::BIND_INTERFACE; +use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_result::{zerror, ZResult}; + +pub struct ListenerUnicastUDP { + endpoint: EndPoint, + sender: Sender<()>, +} + +impl ListenerUnicastUDP { + fn new(endpoint: EndPoint, sender: Sender<()>) -> ListenerUnicastUDP { + ListenerUnicastUDP { endpoint, sender } + } + + async fn stop(&self) { + self.sender.send_async(()).await.ok(); + } +} + +pub struct ListenersUnicastUDP { + listeners: Arc>>, + pub listeners_cancellation_token: CancellationToken, +} + +impl ListenersUnicastUDP { + pub fn new() -> ListenersUnicastUDP { + ListenersUnicastUDP { + listeners: Arc::new(RwLock::new(HashMap::new())), + listeners_cancellation_token: CancellationToken::new(), + } + } + + pub async fn add_listener( + &self, + endpoint: EndPoint, + addr: SocketAddr, + future: F, + sender: flume::Sender<()>, + ) -> ZResult<()> + where + F: Future> + Send + 'static, + { + let mut listeners = zwrite!(self.listeners); + let c_listeners = self.listeners.clone(); + let c_addr = addr; + let task = async move { + // Wait for the accept loop to terminate + let res = future.await; + zwrite!(c_listeners).remove(&c_addr); + res + }; + drop(zenoh_runtime::ZRuntime::Acceptor.spawn(task)); + + let listener = ListenerUnicastUDP::new(endpoint, sender); + // Update the list of active listeners on the manager + listeners.insert(addr, listener); + Ok(()) + } + + pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { + // Stop the listener + let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { + zerror!( + "Can not delete the listener because it has not been found: {}", + addr + ) + })?; + + // Send the stop signal + listener.stop().await; + Ok(()) + } + + pub fn get_endpoints(&self) -> Vec { + zread!(self.listeners) + .values() + .map(|l| l.endpoint.clone()) + .collect() + } + + pub fn get_locators(&self) -> Vec { + let mut locators = vec![]; + + let guard = zread!(self.listeners); + for (key, value) in guard.iter() { + let (kip, kpt) = (key.ip(), key.port()); + let config = value.endpoint.config(); + let iface = config.get(BIND_INTERFACE); + + // Either ipv4/0.0.0.0 or ipv6/[::] + if kip.is_unspecified() { + let mut addrs = match kip { + IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(iface), + IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(iface), + }; + let iter = addrs.drain(..).map(|x| { + Locator::new( + value.endpoint.protocol(), + SocketAddr::new(x, kpt).to_string(), + value.endpoint.metadata(), + ) + .unwrap() + }); + locators.extend(iter); + } else { + locators.push(value.endpoint.to_locator()); + } + } + + locators + } +} + +impl Default for ListenersUnicastUDP { + fn default() -> Self { + Self::new() + } +} diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 827d6a241f..944f3185fb 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -25,8 +25,7 @@ use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, - LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, - BIND_SOCKET, + LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, BIND_INTERFACE, BIND_SOCKET, }; use zenoh_protocol::{ core::{Address, EndPoint, Locator}, @@ -39,6 +38,7 @@ use super::{ get_udp_addrs, socket_addr_to_udp_locator, UDP_ACCEPT_THROTTLE_TIME, UDP_DEFAULT_MTU, UDP_MAX_MTU, }; +use crate::listener::ListenersUnicastUDP; type LinkHashMap = Arc>>>; type LinkInput = (Vec, usize); @@ -73,6 +73,7 @@ struct LinkUnicastUdpUnconnected { links: LinkHashMap, input: Mvar, leftover: AsyncMutex>, + close_link_send: flume::Sender<()>, } impl LinkUnicastUdpUnconnected { @@ -116,7 +117,11 @@ impl LinkUnicastUdpUnconnected { async fn close(&self, src_addr: SocketAddr, dst_addr: SocketAddr) -> ZResult<()> { // Delete the link from the list of links - zlock!(self.links).remove(&(src_addr, dst_addr)); + { + let mut links = zlock!(self.links); + links.remove(&(src_addr, dst_addr)); + } + self.close_link_send.send_async(()).await.unwrap(); Ok(()) } } @@ -250,14 +255,14 @@ impl fmt::Debug for LinkUnicastUdp { pub struct LinkManagerUnicastUdp { manager: NewLinkChannelSender, - listeners: ListenersUnicastIP, + listeners: ListenersUnicastUDP, } impl LinkManagerUnicastUdp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: ListenersUnicastIP::new(), + listeners: ListenersUnicastUDP::new(), } } } @@ -430,18 +435,20 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { endpoint.config(), )?; - let token = self.listeners.token.child_token(); + let listener_cancellation_token = + self.listeners.listeners_cancellation_token.child_token(); + let (sender, receiver) = flume::bounded(1); let task = { - let token = token.clone(); + let token = listener_cancellation_token.clone(); let manager = self.manager.clone(); - async move { accept_read_task(socket, token, manager).await } + async move { accept_read_task(socket, token, receiver, manager).await } }; let locator = endpoint.to_locator(); self.listeners - .add_listener(endpoint, local_addr, task, token) + .add_listener(endpoint, local_addr, task, sender) .await?; return Ok(locator); @@ -504,7 +511,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { async fn accept_read_task( socket: UdpSocket, - token: CancellationToken, + listener_cancellation_token: CancellationToken, + mode_recv: flume::Receiver<()>, manager: NewLinkChannelSender, ) -> ZResult<()> { let socket = Arc::new(socket); @@ -528,7 +536,7 @@ async fn accept_read_task( }; } - async fn receive(socket: Arc, buffer: &mut [u8]) -> ZResult<(usize, SocketAddr)> { + async fn receive(socket: &Arc, buffer: &mut [u8]) -> ZResult<(usize, SocketAddr)> { let res = socket.recv_from(buffer).await.map_err(|e| zerror!(e))?; Ok(res) } @@ -545,14 +553,25 @@ async fn accept_read_task( tracing::warn!("Interceptors (e.g. Access Control, Downsampling) are not guaranteed to work on UDP when listening on 0.0.0.0 or [::]. Their usage is discouraged. See https://github.com/eclipse-zenoh/zenoh/issues/1126."); } + let mut listening_mode = true; + let (close_link_send, close_link_recv) = flume::bounded(32); loop { // Buffers for deserialization let mut buff = zenoh_buffers::vec::uninit(UDP_MAX_MTU as usize); - tokio::select! { - _ = token.cancelled() => break, - - res = receive(socket.clone(), &mut buff) => { + _ = listener_cancellation_token.cancelled() => break, + _ = close_link_recv.recv_async() => { + if !listening_mode && zlock!(links).is_empty() { + break; + } + } + Ok(_) = mode_recv.recv_async() => { + listening_mode = false; + if zlock!(links).is_empty() { + break; + } + } + res = receive(&socket, &mut buff) => { match res { Ok((n, dst_addr)) => { let link = loop { @@ -560,24 +579,30 @@ async fn accept_read_task( match res { Some(link) => break link.upgrade(), None => { - // A new peers has sent data to this socket - tracing::debug!("Accepted UDP connection on {}: {}", src_addr, dst_addr); - let unconnected = Arc::new(LinkUnicastUdpUnconnected { - socket: Arc::downgrade(&socket), - links: links.clone(), - input: Mvar::new(), - leftover: AsyncMutex::new(None), - }); - zaddlink!(src_addr, dst_addr, Arc::downgrade(&unconnected)); - // Create the new link object - let link = Arc::new(LinkUnicastUdp::new( - src_addr, - dst_addr, - LinkUnicastUdpVariant::Unconnected(unconnected), - )); - // Add the new link to the set of connected peers - if let Err(e) = manager.send_async(LinkUnicast(link)).await { - tracing::error!("{}-{}: {}", file!(), line!(), e) + if listening_mode { + // A new peers has sent data to this socket + tracing::debug!("Accepted UDP connection on {}: {}", src_addr, dst_addr); + let unconnected = Arc::new(LinkUnicastUdpUnconnected { + socket: Arc::downgrade(&socket), + links: links.clone(), + input: Mvar::new(), + leftover: AsyncMutex::new(None), + close_link_send: close_link_send.clone(), + }); + zaddlink!(src_addr, dst_addr, Arc::downgrade(&unconnected)); + // Create the new link object + let link = Arc::new(LinkUnicastUdp::new( + src_addr, + dst_addr, + LinkUnicastUdpVariant::Unconnected(unconnected), + )); + // Add the new link to the set of connected peers + if let Err(e) = manager.send_async(LinkUnicast(link)).await { + tracing::error!("{}-{}: {}", file!(), line!(), e) + } + } else { + tracing::debug!("Received connection attempt on a non listening UDP socket."); + break None } } } diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 5089ecfe2a..73b44c78cc 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -94,4 +94,5 @@ futures-util = { workspace = true } zenoh-util = { workspace = true } zenoh-protocol = { workspace = true, features = ["test"] } futures = { workspace = true } +rstest = { workspace = true } zenoh-link-commons = { workspace = true } diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index e2557966d8..b701b15943 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -123,6 +123,10 @@ impl TransportUnicastLowlatency { // Delete the transport on the manager let _ = self.manager.del_transport_unicast(&self.config.zid).await; + if let Some(val) = zasyncwrite!(self.link).as_ref() { + let _ = val.close(Some(close::reason::GENERIC)).await; + } + // Close and drop the link self.token.cancel(); self.tracker.close(); @@ -130,10 +134,6 @@ impl TransportUnicastLowlatency { // self.stop_keepalive().await; // self.stop_rx().await; - if let Some(val) = zasyncwrite!(self.link).as_ref() { - let _ = val.close(Some(close::reason::GENERIC)).await; - } - // Notify the callback that we have closed the transport if let Some(cb) = callback.as_ref() { cb.closed(); diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 74f89cd3f1..3c98c46a96 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -22,6 +22,7 @@ use std::{ time::Duration, }; +use rstest::rstest; use zenoh_core::ztimeout; use zenoh_link::Link; use zenoh_protocol::{ @@ -236,12 +237,14 @@ const MSG_SIZE_ALL: [usize; 3] = [1_024, 131_072, 100 * 1024 * 1024]; feature = "transport_tcp", feature = "transport_udp", feature = "transport_unixsock-stream", + feature = "transport_ws", ))] const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; #[cfg(any( feature = "transport_tcp", feature = "transport_udp", feature = "transport_unixsock-stream", + feature = "transport_ws", ))] const MSG_SIZE_LOWLATENCY: [usize; 1] = MSG_SIZE_NOFRAG; @@ -262,6 +265,10 @@ impl SHRouter { fn get_count(&self) -> usize { self.count.load(Ordering::SeqCst) } + + fn reset_count(&self) { + self.count.store(0, Ordering::SeqCst); + } } impl TransportEventHandler for SHRouter { @@ -357,6 +364,9 @@ async fn open_transport_unicast( TransportManager, TransportUnicast, ) { + // Short timeout for connectionless protocols like udp and quick + let open_timeout = Duration::from_secs(5); + // Define client and router IDs let client_id = ZenohIdProto::try_from([1]).unwrap(); let router_id = ZenohIdProto::try_from([2]).unwrap(); @@ -369,7 +379,9 @@ async fn open_transport_unicast( #[cfg(feature = "shared-memory")] false, lowlatency_transport, - ); + ) + .open_timeout(open_timeout); + let router_manager = TransportManager::builder() .zid(router_id) .whatami(WhatAmI::Router) @@ -390,7 +402,9 @@ async fn open_transport_unicast( #[cfg(feature = "shared-memory")] false, lowlatency_transport, - ); + ) + .open_timeout(open_timeout); + let client_manager = TransportManager::builder() .whatami(WhatAmI::Client) .zid(client_id) @@ -437,9 +451,9 @@ async fn close_transport( }); // Stop the locators on the manager - for e in endpoints.iter() { + for e in router_manager.get_listeners().await { println!("Del locator: {}", e); - ztimeout!(router_manager.del_listener(e)).unwrap(); + ztimeout!(router_manager.del_listener(&e)).unwrap(); } ztimeout!(async { @@ -550,6 +564,34 @@ async fn run_single( ) .await; + // Check transport still works despite closing listener endpoints: + println!("Closing router listeners..."); + for endpoint in router_manager.get_listeners().await { + println!("Del listener: {}", endpoint); + router_manager.del_listener(&endpoint).await.unwrap(); + } + tokio::time::sleep(SLEEP).await; + + println!("Testing back the transports after closing the router listeners..."); + router_handler.reset_count(); + test_transport( + router_handler.clone(), + client_transport.clone(), + channel, + msg_size, + ) + .await; + println!("Transports kept working after closing the router listeners..."); + + // Open transport against closed endpoints -> This should fail or timeout + for e in client_endpoints.iter() { + let _ = ztimeout!(client_manager.open_transport_unicast(e.clone())).unwrap_err(); + println!( + "Attempt to open new transport with '{}' (closed router listener) failed as expected.", + e + ); + } + #[cfg(feature = "stats")] { let c_stats = client_transport.get_stats().unwrap().report(); @@ -761,64 +803,64 @@ async fn transport_unicast_unix_only_with_lowlatency_transport() { } #[cfg(feature = "transport_ws")] +#[rstest( + priority, + reliability, + port1, + port2, + case(Priority::DEFAULT, Reliability::Reliable, 16020, 16021), + case(Priority::DEFAULT, Reliability::BestEffort, 16022, 16023), + case(Priority::RealTime, Reliability::Reliable, 16024, 16025), + case(Priority::RealTime, Reliability::BestEffort, 16026, 16027) +)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn transport_unicast_ws_only() { +async fn transport_unicast_ws_only( + priority: Priority, + reliability: Reliability, + port1: u16, + port2: u16, +) { zenoh_util::init_log_from_env_or("error"); // Define the locators let endpoints: Vec = vec![ - format!("ws/127.0.0.1:{}", 16020).parse().unwrap(), - format!("ws/[::1]:{}", 16021).parse().unwrap(), + format!("ws/127.0.0.1:{}", port1).parse().unwrap(), + format!("ws/[::1]:{}", port2).parse().unwrap(), ]; // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; + let channel = [Channel { + priority, + reliability, + }]; // Run run_with_universal_transport(&endpoints, &endpoints, &channel, &MSG_SIZE_ALL).await; } #[cfg(feature = "transport_ws")] +#[rstest( + priority, + reliability, + port1, + case(Priority::DEFAULT, Reliability::Reliable, 16120), + case(Priority::DEFAULT, Reliability::BestEffort, 16121), + case(Priority::RealTime, Reliability::Reliable, 16122), + case(Priority::RealTime, Reliability::BestEffort, 16123) +)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn transport_unicast_ws_only_with_lowlatency_transport() { +async fn transport_unicast_ws_only_with_lowlatency_transport( + priority: Priority, + reliability: Reliability, + port1: u16, +) { zenoh_util::init_log_from_env_or("error"); // Define the locators - let endpoints: Vec = vec![format!("ws/127.0.0.1:{}", 16120).parse().unwrap()]; + let endpoints: Vec = vec![format!("ws/127.0.0.1:{}", port1).parse().unwrap()]; // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; + let channel = [Channel { + priority, + reliability, + }]; // Run run_with_lowlatency_transport(&endpoints, &endpoints, &channel, &MSG_SIZE_LOWLATENCY).await; } @@ -1053,13 +1095,26 @@ async fn transport_unicast_tls_only_server() { } #[cfg(feature = "transport_quic")] +#[rstest( + priority, + reliability, + port1, + case(Priority::DEFAULT, Reliability::Reliable, 16080), + case(Priority::DEFAULT, Reliability::BestEffort, 16081), + case(Priority::RealTime, Reliability::Reliable, 16082), + case(Priority::RealTime, Reliability::BestEffort, 16083) +)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn transport_unicast_quic_only_server() { +async fn transport_unicast_quic_only_server( + priority: Priority, + reliability: Reliability, + port1: u16, +) { use zenoh_link::quic::config::*; zenoh_util::init_log_from_env_or("error"); // Define the locator - let mut endpoint: EndPoint = format!("quic/localhost:{}", 16080).parse().unwrap(); + let mut endpoint: EndPoint = format!("quic/localhost:{}", port1).parse().unwrap(); endpoint .config_mut() .extend_from_iter( @@ -1074,24 +1129,10 @@ async fn transport_unicast_quic_only_server() { .unwrap(); // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; + let channel = [Channel { + priority, + reliability, + }]; // Run let endpoints = vec![endpoint]; run_with_universal_transport(&endpoints, &endpoints, &channel, &MSG_SIZE_ALL).await; @@ -1314,8 +1355,21 @@ fn transport_unicast_tls_only_mutual_wrong_client_certs_failure() { } #[cfg(all(feature = "transport_quic", target_family = "unix"))] +#[rstest( + priority, + reliability, + port1, + case(Priority::DEFAULT, Reliability::Reliable, 11461), + case(Priority::DEFAULT, Reliability::BestEffort, 11462), + case(Priority::RealTime, Reliability::Reliable, 11463), + case(Priority::RealTime, Reliability::BestEffort, 11464) +)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn transport_unicast_quic_only_mutual_success() { +async fn transport_unicast_quic_only_mutual_success( + priority: Priority, + reliability: Reliability, + port1: u16, +) { use zenoh_link::quic::config::*; zenoh_util::init_log_from_env_or("error"); @@ -1323,7 +1377,7 @@ async fn transport_unicast_quic_only_mutual_success() { let client_auth = "true"; // Define the locator - let mut client_endpoint: EndPoint = ("quic/localhost:10461").parse().unwrap(); + let mut client_endpoint: EndPoint = format!("quic/localhost:{}", port1).parse().unwrap(); client_endpoint .config_mut() .extend_from_iter( @@ -1339,7 +1393,7 @@ async fn transport_unicast_quic_only_mutual_success() { .unwrap(); // Define the locator - let mut server_endpoint: EndPoint = ("quic/localhost:10461").parse().unwrap(); + let mut server_endpoint: EndPoint = format!("quic/localhost:{}", port1).parse().unwrap(); server_endpoint .config_mut() .extend_from_iter( @@ -1354,24 +1408,10 @@ async fn transport_unicast_quic_only_mutual_success() { ) .unwrap(); // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::default(), - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::default(), - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; + let channel = [Channel { + priority, + reliability, + }]; // Run let client_endpoints = vec![client_endpoint]; let server_endpoints = vec![server_endpoint];