Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/target
.idea

34 changes: 15 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ rcgen = { version = "0.14.5", optional = true }
# End of Quic related dependencies.

# WebRTC related dependencies. WebRTC is an experimental feature flag. The dependencies must be updated.
str0m = { version = "0.9.0", optional = true }
str0m = { version = "0.11.1", optional = true }
# End of WebRTC related dependencies.

# Fuzzing related dependencies.
Expand Down
35 changes: 26 additions & 9 deletions src/multistream_select/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,25 @@ impl WebRtcDialerState {
&mut self,
payload: Vec<u8>,
) -> Result<HandshakeResult, crate::error::NegotiationError> {
let Message::Protocols(protocols) =
Message::decode(payload.into()).map_err(|_| ParseError::InvalidData)?
else {
return Err(crate::error::NegotiationError::MultistreamSelectError(
NegotiationError::Failed,
));

let protocols = match Message::decode(payload.into()) {
Ok(Message::Header(HeaderLine::V1)) => {
self.state = HandshakeState::WaitingProtocol;
return Ok(HandshakeResult::NotReady);
}
Ok(Message::Protocol(protocol)) => vec![protocol],
Ok(Message::Protocols(protocols)) => protocols,
Ok(Message::NotAvailable) => {
return match &self.state {
HandshakeState::WaitingProtocol =>
Err(error::NegotiationError::MultistreamSelectError(
NegotiationError::Failed
)),
_ => Err(error::NegotiationError::StateMismatch),
}
}
Ok(Message::ListProtocols) => return Err(error::NegotiationError::StateMismatch),
Err(_) => return Err(error::NegotiationError::ParseError(ParseError::InvalidData)),
};

let mut protocol_iter = protocols.into_iter();
Expand Down Expand Up @@ -410,7 +423,9 @@ mod tests {
use super::*;
use crate::multistream_select::listener_select_proto;
use std::time::Duration;
use bytes::BufMut;
use tokio::net::{TcpListener, TcpStream};
use crate::multistream_select::protocol::MSG_MULTISTREAM_1_0;

#[tokio::test]
async fn select_proto_basic() {
Expand Down Expand Up @@ -805,17 +820,19 @@ mod tests {
}

#[test]
fn register_response_invalid_message() {
// send only header line
fn register_response_header_only() {
let mut bytes = BytesMut::with_capacity(32);
bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8);

let message = Message::Header(HeaderLine::V1);
message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

let (mut dialer_state, _message) =
WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();

match dialer_state.register_response(bytes.freeze().to_vec()) {
Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {}
Ok(HandshakeResult::NotReady) => {},
Err(err) => panic!("unexpected error: {:?}", err),
event => panic!("invalid event: {event:?}"),
}
}
Expand Down
26 changes: 22 additions & 4 deletions src/multistream_select/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ impl Message {

// Skip ahead to the next protocol.
remaining = &tail[len..];
if remaining.is_empty() {
// During negotiation the remote may not append a trailing newline.
break;
}
}

Ok(Message::Protocols(protocols))
Expand All @@ -230,7 +234,7 @@ impl Message {
///
/// # Note
///
/// This is implementation is not compliant with the multistream-select protocol spec.
/// This implementation may not be compliant with the multistream-select protocol spec.
/// The only purpose of this was to get the `multistream-select` protocol working with smoldot.
pub fn webrtc_encode_multistream_message(
messages: impl IntoIterator<Item = Message>,
Expand All @@ -249,9 +253,6 @@ pub fn webrtc_encode_multistream_message(
header.append(&mut proto_bytes);
}

// For the `Message::Protocols` to be interpreted correctly, it must be followed by a newline.
header.push(b'\n');

Ok(BytesMut::from(&header[..]))
}

Expand Down Expand Up @@ -542,4 +543,21 @@ mod tests {
ProtocolError::InvalidMessage
);
}

#[test]
fn test_decode_multiple_protocols_no_trailing_newline() {
let raw: [u8; 38] = [
19, 47, 109, 117, 108, 116, 105, 115, 116, 114, 101, 97, 109, 47, 49, 46, 48, 46, 48,
10, 17, 47, 105, 112, 102, 115, 47, 112, 105, 110, 103, 47, 49, 46, 48, 46, 48, 10,
];
let bytes = Bytes::copy_from_slice(&raw);

assert_eq!(
Message::decode(bytes).unwrap(),
Message::Protocols(vec![
Protocol::try_from(Bytes::from_static(b"/multistream/1.0.0")).unwrap(),
Protocol::try_from(Bytes::from_static(b"/ipfs/ping/1.0.0")).unwrap(),
])
);
}
}
39 changes: 32 additions & 7 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,21 +382,32 @@ impl WebRtcConnection {
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
data_len = ?data.len(),
"handle opening outbound substream",
);

let rtc_message = WebRtcMessage::decode(&data)
.map_err(|err| SubstreamError::NegotiationError(err.into()))?;
let message = rtc_message.payload.ok_or(SubstreamError::NegotiationError(
let payload = rtc_message.payload.ok_or(SubstreamError::NegotiationError(
ParseError::InvalidData.into(),
))?;

let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else {
// All multistream-select messages are length-prefixed. Since this code path is not using
// multistream_select::protocol::MessageIO, we need to decode and remove the length here.
let remaining: &[u8] = &payload;
let (len, tail) = unsigned_varint::decode::usize(remaining).
map_err(|_| SubstreamError::NegotiationError(
ParseError::InvalidData.into(),
))?;

let message = tail[..len].to_vec();

let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
"multisteam-select handshake not ready",
"multistream-select handshake not ready",
);

self.channels.insert(
Expand Down Expand Up @@ -631,6 +642,8 @@ impl WebRtcConnection {
protocol: protocol.to_string(),
});

// self.rtc.channel(channel_id).unwrap().set_buffered_amount_low_threshold(1024);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
Expand Down Expand Up @@ -742,6 +755,20 @@ impl WebRtcConnection {

continue;
}
Event::ChannelBufferedAmountLow(channel_id) => {
if let Some(ChannelState::Closing) = self.channels.get(&channel_id) {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
"buffer drained, closing channel",
);
self.rtc.direct_api().close_data_channel(channel_id);
self.handles.remove(&channel_id);
}

continue;
}
event => {
tracing::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -787,17 +814,15 @@ impl WebRtcConnection {
},
event = self.handles.next() => match event {
None => unreachable!(),
Some((channel_id, None | Some(SubstreamEvent::Close))) => {
Some((_, None)) => {}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a superficial fix for another unrelated issue which I haven't investigated yet. Without this change, a "channel closed" event is fired repeatedly after a while. I believe this is caused by SubstreamHandle::tx being dropped, presumably when the substream(/channel?) is closed. The fact that the event continues firing suggests that SubstreamHandle::rx continues to be polled. Maybe the handle simply needs to be removed from self.handles?

I've also seen this happen when briefly testing with a rust-libp2p dialer, so this is not related to smoldot interop.

Some((channel_id, Some(SubstreamEvent::Close))) => {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
"channel closed",
);

self.rtc.direct_api().close_data_channel(channel_id);
self.channels.insert(channel_id, ChannelState::Closing);
self.handles.remove(&channel_id);
}
Some((channel_id, Some(SubstreamEvent::Message(data)))) => {
if let Err(error) = self.on_outbound_data(channel_id, data) {
Expand Down
8 changes: 4 additions & 4 deletions src/transport/webrtc/opening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,14 @@ impl OpeningWebRtcConnection {
.rtc
.direct_api()
.remote_dtls_fingerprint()
.clone()
.expect("fingerprint to exist");
.expect("fingerprint to exist")
.clone();
Self::fingerprint_to_bytes(&fingerprint)
}

/// Get local fingerprint as bytes.
fn local_fingerprint(&mut self) -> Vec<u8> {
Self::fingerprint_to_bytes(&self.rtc.direct_api().local_dtls_fingerprint())
Self::fingerprint_to_bytes(self.rtc.direct_api().local_dtls_fingerprint())
}

/// Convert `Fingerprint` to bytes.
Expand Down Expand Up @@ -268,8 +268,8 @@ impl OpeningWebRtcConnection {
.rtc
.direct_api()
.remote_dtls_fingerprint()
.clone()
.expect("fingerprint to exist")
.clone()
.bytes;

const MULTIHASH_SHA256_CODE: u64 = 0x12;
Expand Down