diff --git a/src/lib.rs b/src/lib.rs index c8c2b91cc..971bab041 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,7 @@ mod sockopt; use crate::message::msg_ptr; pub use crate::message::Message; +pub use crate::DeviceType::*; pub use crate::SocketType::*; /// `zmq`-specific Result type. @@ -95,6 +96,36 @@ impl SocketType { } } +/// Device types +#[allow(non_camel_case_types)] +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum DeviceType { + STREAMER, + FORWARDER, + QUEUE, +} + +impl DeviceType { + fn to_raw(self) -> c_int { + let raw = match self { + STREAMER => zmq_sys::ZMQ_STREAMER, + FORWARDER => zmq_sys::ZMQ_FORWARDER, + QUEUE => zmq_sys::ZMQ_QUEUE, + }; + raw as c_int + } + + fn from_raw(raw: c_int) -> DeviceType { + match raw as u32 { + zmq_sys::ZMQ_STREAMER => STREAMER, + zmq_sys::ZMQ_FORWARDER => FORWARDER, + zmq_sys::ZMQ_QUEUE => QUEUE, + // return STREAMER instead of panic + _ => STREAMER, + } + } +} + /// Socket Events #[allow(non_camel_case_types)] #[derive(Clone, Copy, Debug, PartialEq)] @@ -1128,6 +1159,18 @@ pub fn poll(items: &mut [PollItem], timeout: i64) -> Result { Ok(rc as i32) } +/// Start a 0MQ device in the current thread. +/// +/// A device connects a frontend socket with a backend socket, where both the +/// server and client are dynamic +/// +/// This function only returns (always with an `Err`) when the sockets' context +/// has been closed. +pub fn device(device_type: DeviceType, frontend: &Socket, backend: &Socket) -> Result<()> { + zmq_try!(unsafe { zmq_sys::zmq_device(device_type.to_raw(), frontend.sock, backend.sock,) }); + Ok(()) +} + /// Start a 0MQ proxy in the current thread. /// /// A proxy connects a frontend socket with a backend socket, where the exact diff --git a/zmq-sys/src/lib.rs b/zmq-sys/src/lib.rs index 9960d8751..8cff0a1d6 100644 --- a/zmq-sys/src/lib.rs +++ b/zmq-sys/src/lib.rs @@ -179,6 +179,11 @@ pub use crate::ffi::{ ZMQ_POLLITEMS_DFLT, + // device types + ZMQ_STREAMER, + ZMQ_FORWARDER, + ZMQ_QUEUE, + // Undeprecated types. zmq_msg_t, zmq_free_fn,