Skip to content

Commit fedc74c

Browse files
authored
Set the client side SUB IOPub socket subscription *before* we connect (#673)
1 parent 3d7afa5 commit fedc74c

File tree

2 files changed

+22
-37
lines changed

2 files changed

+22
-37
lines changed

crates/amalthea/src/fixtures/dummy_frontend.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,10 @@ impl DummyFrontend {
182182
)
183183
.unwrap();
184184

185-
// Subscribe to IOPub! Server's XPUB socket will receive a notification of
186-
// our subscription with `subscription`, then will publish an IOPub `Welcome`
187-
// message, sending back our `subscription`.
188-
iopub_socket.subscribe(b"").unwrap();
189-
190-
// Immediately block until we've received the IOPub welcome message.
191-
// This confirms that we've fully subscribed and avoids dropping any
192-
// of the initial IOPub messages that a server may send if we start
193-
// perform requests immediately.
185+
// Immediately block until we've received the IOPub welcome message from the XPUB
186+
// server side socket. This confirms that we've fully subscribed and avoids
187+
// dropping any of the initial IOPub messages that a server may send if we start
188+
// to perform requests immediately (in particular, busy/idle messages).
194189
// https://github.com/posit-dev/ark/pull/577
195190
assert_matches!(Self::recv(&iopub_socket), Message::Welcome(data) => {
196191
assert_eq!(data.content.subscription, String::from(""));

crates/amalthea/src/socket/socket.rs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,24 @@ impl Socket {
6262
.map_err(|err| Error::CreateSocketFailed(name.clone(), err))?;
6363
}
6464

65+
if name == "IOPub" && kind == zmq::SocketType::SUB {
66+
// For the client side of IOPub (in tests and eventually kallichore), we need
67+
// to subscribe our SUB to messages from the XPUB on the server side. We use
68+
// `""` to subscribe to all message types, there is no reason to filter any
69+
// out. It is very important that we subscribe BEFORE we `connect()`. If we
70+
// don't subscribe first, then the XPUB on the server side can come online
71+
// first and processes our `connect()` before we've actually subscribed, which
72+
// causes the welcome message the XPUB sends us to get dropped, preventing us
73+
// from correctly starting up, because we block until we've received that
74+
// welcome message. In the link below, you can see proof that zmq only sends
75+
// the welcome message out when it processes our `connect()` call, so if we
76+
// aren't subscribed by that point, we miss it.
77+
// https://github.com/zeromq/libzmq/blob/34f7fa22022bed9e0e390ed3580a1c83ac4a2834/src/xpub.cpp#L56-L65
78+
socket
79+
.set_subscribe(b"")
80+
.map_err(|err| Error::CreateSocketFailed(name.clone(), err))?;
81+
}
82+
6583
// If this is a debug build, set `ZMQ_ROUTER_MANDATORY` on all `ROUTER`
6684
// sockets, so that we get errors instead of silent message drops for
6785
// unroutable messages.
@@ -196,32 +214,4 @@ impl Socket {
196214
pub fn has_incoming_data(&self) -> zmq::Result<bool> {
197215
self.poll_incoming(0)
198216
}
199-
200-
/// Subscribes a SUB socket to messages from an XPUB socket.
201-
///
202-
/// Use `b""` to subscribe to all messages.
203-
///
204-
/// Note that this needs to be called *after* the socket connection is
205-
/// established on both ends.
206-
pub fn subscribe(&self, subscription: &[u8]) -> Result<(), Error> {
207-
let socket_type = match self.socket.get_socket_type() {
208-
Ok(socket_type) => socket_type,
209-
Err(err) => return Err(Error::ZmqError(self.name.clone(), err)),
210-
};
211-
212-
if socket_type != zmq::SocketType::SUB {
213-
return Err(crate::anyhow!(
214-
"Can't subscribe on a non-SUB socket. This socket is a {socket_type:?}."
215-
));
216-
}
217-
218-
// Currently, all SUB sockets subscribe to all topics; in theory
219-
// frontends could subscribe selectively, but in practice all known
220-
// Jupyter frontends subscribe to all topics and just ignore topics
221-
// they don't recognize.
222-
match self.socket.set_subscribe(subscription) {
223-
Ok(_) => Ok(()),
224-
Err(err) => Err(Error::ZmqError(self.name.clone(), err)),
225-
}
226-
}
227217
}

0 commit comments

Comments
 (0)