Skip to content

Commit 85fae21

Browse files
committed
fix: multistream-select negotiation for outbound webrtc substreams
1 parent 4200246 commit 85fae21

File tree

3 files changed

+45
-14
lines changed

3 files changed

+45
-14
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
/target
2+
.idea
3+

src/multistream_select/dialer_select.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -357,12 +357,25 @@ impl WebRtcDialerState {
357357
&mut self,
358358
payload: Vec<u8>,
359359
) -> Result<HandshakeResult, crate::error::NegotiationError> {
360-
let Message::Protocols(protocols) =
361-
Message::decode(payload.into()).map_err(|_| ParseError::InvalidData)?
362-
else {
363-
return Err(crate::error::NegotiationError::MultistreamSelectError(
364-
NegotiationError::Failed,
365-
));
360+
361+
let protocols = match Message::decode(payload.into()) {
362+
Ok(Message::Header(HeaderLine::V1)) => {
363+
self.state = HandshakeState::WaitingProtocol;
364+
return Ok(HandshakeResult::NotReady);
365+
}
366+
Ok(Message::Protocol(protocol)) => vec![protocol],
367+
Ok(Message::Protocols(protocols)) => protocols,
368+
Ok(Message::NotAvailable) => {
369+
return match &self.state {
370+
HandshakeState::WaitingProtocol =>
371+
Err(error::NegotiationError::MultistreamSelectError(
372+
NegotiationError::Failed
373+
)),
374+
_ => Err(error::NegotiationError::StateMismatch),
375+
}
376+
}
377+
Ok(Message::ListProtocols) => return Err(error::NegotiationError::StateMismatch),
378+
Err(_) => return Err(error::NegotiationError::ParseError(ParseError::InvalidData)),
366379
};
367380

368381
let mut protocol_iter = protocols.into_iter();
@@ -410,7 +423,9 @@ mod tests {
410423
use super::*;
411424
use crate::multistream_select::listener_select_proto;
412425
use std::time::Duration;
426+
use bytes::BufMut;
413427
use tokio::net::{TcpListener, TcpStream};
428+
use crate::multistream_select::protocol::MSG_MULTISTREAM_1_0;
414429

415430
#[tokio::test]
416431
async fn select_proto_basic() {
@@ -805,17 +820,19 @@ mod tests {
805820
}
806821

807822
#[test]
808-
fn register_response_invalid_message() {
809-
// send only header line
823+
fn register_response_header_only() {
810824
let mut bytes = BytesMut::with_capacity(32);
825+
bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8);
826+
811827
let message = Message::Header(HeaderLine::V1);
812828
message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();
813829

814830
let (mut dialer_state, _message) =
815831
WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();
816832

817833
match dialer_state.register_response(bytes.freeze().to_vec()) {
818-
Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {}
834+
Ok(HandshakeResult::NotReady) => {},
835+
Err(err) => panic!("unexpected error: {:?}", err),
819836
event => panic!("invalid event: {event:?}"),
820837
}
821838
}

src/transport/webrtc/connection.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -382,21 +382,32 @@ impl WebRtcConnection {
382382
target: LOG_TARGET,
383383
peer = ?self.peer,
384384
?channel_id,
385+
data_len = ?data.len(),
385386
"handle opening outbound substream",
386387
);
387388

388389
let rtc_message = WebRtcMessage::decode(&data)
389390
.map_err(|err| SubstreamError::NegotiationError(err.into()))?;
390-
let message = rtc_message.payload.ok_or(SubstreamError::NegotiationError(
391+
let payload = rtc_message.payload.ok_or(SubstreamError::NegotiationError(
391392
ParseError::InvalidData.into(),
392393
))?;
393394

394-
let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else {
395+
// All multistream-select messages are length-prefixed. Since this code path is not using
396+
// multistream_select::protocol::MessageIO, we need to decode and remove the length here.
397+
let remaining: &[u8] = &payload;
398+
let (len, tail) = unsigned_varint::decode::usize(remaining).
399+
map_err(|_| SubstreamError::NegotiationError(
400+
ParseError::InvalidData.into(),
401+
))?;
402+
403+
let message = tail[..len].to_vec();
404+
405+
let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else {
395406
tracing::trace!(
396407
target: LOG_TARGET,
397408
peer = ?self.peer,
398409
?channel_id,
399-
"multisteam-select handshake not ready",
410+
"multistream-select handshake not ready",
400411
);
401412

402413
self.channels.insert(
@@ -631,7 +642,7 @@ impl WebRtcConnection {
631642
protocol: protocol.to_string(),
632643
});
633644

634-
self.rtc.channel(channel_id).unwrap().set_buffered_amount_low_threshold(1024);
645+
// self.rtc.channel(channel_id).unwrap().set_buffered_amount_low_threshold(1024);
635646

636647
tracing::trace!(
637648
target: LOG_TARGET,
@@ -803,7 +814,8 @@ impl WebRtcConnection {
803814
},
804815
event = self.handles.next() => match event {
805816
None => unreachable!(),
806-
Some((channel_id, None | Some(SubstreamEvent::Close))) => {
817+
Some((_, None)) => {}
818+
Some((channel_id, Some(SubstreamEvent::Close))) => {
807819
tracing::trace!(
808820
target: LOG_TARGET,
809821
peer = ?self.peer,

0 commit comments

Comments
 (0)