Skip to content

Make sure that UDP transports remain open when UDP listener is closed #1859

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ regex = "1.10.6"
ron = "0.8.1"
ringbuffer-spsc = "0.1.9"
rsa = "0.9"
rstest = "0.25"
rustc_version = "0.4.1"
rustls = { version = "0.23.13", default-features = false, features = [
"logging",
Expand Down
3 changes: 3 additions & 0 deletions io/zenoh-links/zenoh-link-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ description = "Internal crate for zenoh."
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
flume = { workspace = true }
tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] }
tokio-util = { workspace = true, features = ["rt"] }
async-trait = { workspace = true }
futures = { workspace = true }
tracing = {workspace = true}
socket2 = { workspace = true }
zenoh-buffers = { workspace = true }
Expand All @@ -37,3 +39,4 @@ zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-sync = { workspace = true }
zenoh-util = { workspace = true }
zenoh-runtime = { workspace = true }
1 change: 1 addition & 0 deletions io/zenoh-links/zenoh-link-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! This crate is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
mod listener;
mod multicast;
mod unicast;

Expand Down
141 changes: 141 additions & 0 deletions io/zenoh-links/zenoh-link-udp/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::{Arc, RwLock},
};

use flume::Sender;
use futures::Future;
use tokio_util::sync::CancellationToken;
use zenoh_core::{zread, zwrite};
use zenoh_link_commons::BIND_INTERFACE;
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{zerror, ZResult};

pub struct ListenerUnicastUDP {
endpoint: EndPoint,
sender: Sender<()>,
}

impl ListenerUnicastUDP {
fn new(endpoint: EndPoint, sender: Sender<()>) -> ListenerUnicastUDP {
ListenerUnicastUDP { endpoint, sender }
}

async fn stop(&self) {
self.sender.send_async(()).await.ok();
}
}

pub struct ListenersUnicastUDP {
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastUDP>>>,
pub listeners_cancellation_token: CancellationToken,
}

impl ListenersUnicastUDP {
pub fn new() -> ListenersUnicastUDP {
ListenersUnicastUDP {
listeners: Arc::new(RwLock::new(HashMap::new())),
listeners_cancellation_token: CancellationToken::new(),
}
}

pub async fn add_listener<F>(
&self,
endpoint: EndPoint,
addr: SocketAddr,
future: F,
sender: flume::Sender<()>,
) -> ZResult<()>
where
F: Future<Output = ZResult<()>> + Send + 'static,
{
let mut listeners = zwrite!(self.listeners);
let c_listeners = self.listeners.clone();
let c_addr = addr;
let task = async move {
// Wait for the accept loop to terminate
let res = future.await;
zwrite!(c_listeners).remove(&c_addr);
res
};
drop(zenoh_runtime::ZRuntime::Acceptor.spawn(task));

let listener = ListenerUnicastUDP::new(endpoint, sender);
// Update the list of active listeners on the manager
listeners.insert(addr, listener);
Ok(())
}

pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> {
// Stop the listener
let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| {
zerror!(
"Can not delete the listener because it has not been found: {}",
addr
)
})?;

// Send the stop signal
listener.stop().await;
Ok(())
}

pub fn get_endpoints(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|l| l.endpoint.clone())
.collect()
}

pub fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];

let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());
let config = value.endpoint.config();
let iface = config.get(BIND_INTERFACE);

// Either ipv4/0.0.0.0 or ipv6/[::]
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(iface),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(iface),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}

locators
}
}

impl Default for ListenersUnicastUDP {
fn default() -> Self {
Self::new()
}
}
Loading
Loading