Skip to content

Commit 337538f

Browse files
altonendmitry-markinlexnv
authored
Refactor WebRTC code (#51)
This PR refactors the WebRTC code into something that actually works. The `WebRtcConnection` code is split into `OpeningWebRtcConnection` and `WebRtcConnection`. The former only deals with the Noise handshake whereas the latter implements a typical connection event loop, dealing with inbound/outbound substreams and data exchange. The PR also implements a new substream type for WebRTC which is implemented using `tokio::sync::mpsc::{Sender, Receiver}` since `str0m` doesn't expose a substream type. The `multistream-select` code used by WebRTC is refactored and its test coverage is increased. This PR also updates `str0m` to the latest version but during testing it was discovered that there are issues with `ChannelId`/SCTP stream id reuse and those have to be reported to upstream and fixed. They're are not blockers for merging this PR though because the fixes should be contained to `str0m` and for litep2p it would only be a version bump if the fixes are accepted. The WebRTC implementation (with pending fixes to `str0m`) has been tested with smoldot and the code works. IPFS Ping, Kademlia and Identify all work, opening and negotiating notification protocols work and block requests work. There are, however, two issues that I was able to find: 1) The first "notification" smoldot reads from the substream is not actually a notification but the substream handshake which results in "protocol-error": ``` [network] protocol-error; peer_id=12D3KooWNGCW63xu9tGh1WxWEazHs6UgMdTjrLSos89NrVSCh9bz, error=BadBlockAnnounce(DecodeBlockAnnounceError(Verify)) [network] protocol-error; peer_id=12D3KooWNGCW63xu9tGh1WxWEazHs6UgMdTjrLSos89NrVSCh9bz, error=BadGrandpaNotification(DecodeGrandpaNotificationError(Verify)) ``` It looks like smoldot is still able to receive block announcements and GRANDPA notifications so I'm not entirely what the issue here is. 2) There is another issues with justifications but I don't see how it could be related to WebRTC ``` [sync-service-westend2] finality-proof-verify-error; error=FinalityVerify(UnknownTargetBlock { block_number: 19772928, block_hash: [22, 163, 175, 94, 114, 75, 142, 174, 83, 146, 11, 251, 184, 119, 212, 76, 34, 85, 144, 47, 83, 251, 83, 86, 152, 141, 204, 104, 253, 236, 21, 48] }), sender=12D3KooWNGCW63xu9tGh1WxWEazHs6UgMdTjrLSos89NrVSCh9bz [sync-service-westend2] Error while verifying justification: Justification targets a block (#19772928) that isn't in the chain. ``` This happens on the first try, gossip is closed, then it's opened again and it's able to start warp sync. --------- Co-authored-by: Dmitry Markin <[email protected]> Co-authored-by: Alexandru Vasile <[email protected]>
1 parent fd8a99f commit 337538f

File tree

16 files changed

+2406
-757
lines changed

16 files changed

+2406
-757
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ simple-dns = "0.5.3"
3636
smallvec = "1.10.0"
3737
snow = { version = "0.9.3", features = ["ring-resolver"], default-features = false }
3838
socket2 = { version = "0.5.5", features = ["all"] }
39-
str0m = "0.2.0"
39+
str0m = "0.4.1"
4040
thiserror = "1.0.39"
4141
tokio-stream = "0.1.12"
4242
tokio-tungstenite = { version = "0.20.0", features = ["rustls-tls-native-roots"] }

src/multistream_select/dialer_select.rs

Lines changed: 159 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use crate::{
2424
codec::unsigned_varint::UnsignedVarint,
2525
error::{self, Error},
2626
multistream_select::{
27-
protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError},
27+
protocol::{
28+
encode_multistream_message, HeaderLine, Message, MessageIO, Protocol, ProtocolError,
29+
},
2830
Negotiated, NegotiationError, Version,
2931
},
3032
types::protocol::ProtocolName,
@@ -224,7 +226,7 @@ where
224226
}
225227

226228
/// `multistream-select` handshake result for dialer.
227-
#[derive(Debug)]
229+
#[derive(Debug, PartialEq, Eq)]
228230
pub enum HandshakeResult {
229231
/// Handshake is not complete, data missing.
230232
NotReady,
@@ -259,7 +261,6 @@ pub struct DialerState {
259261
state: HandshakeState,
260262
}
261263

262-
// TODO: tests
263264
impl DialerState {
264265
/// Propose protocol to remote peer.
265266
///
@@ -269,29 +270,22 @@ impl DialerState {
269270
protocol: ProtocolName,
270271
fallback_names: Vec<ProtocolName>,
271272
) -> crate::Result<(Self, Vec<u8>)> {
272-
// encode `/multistream-select/1.0.0` header
273-
let mut bytes = BytesMut::with_capacity(64);
274-
let message = Message::Header(HeaderLine::V1);
275-
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData)?;
276-
let mut header = UnsignedVarint::encode(bytes)?;
277-
278-
// encode proposed protocol
279-
let mut proto_bytes = BytesMut::with_capacity(512);
280-
let message = Message::Protocol(Protocol::try_from(protocol.as_bytes()).unwrap());
281-
let _ = message.encode(&mut proto_bytes).map_err(|_| Error::InvalidData)?;
282-
let proto_bytes = UnsignedVarint::encode(proto_bytes)?;
283-
284-
// TODO: add fallback names
285-
286-
header.append(&mut proto_bytes.into());
273+
let message = encode_multistream_message(
274+
std::iter::once(protocol.clone())
275+
.chain(fallback_names.clone())
276+
.filter_map(|protocol| Protocol::try_from(protocol.as_ref()).ok())
277+
.map(|protocol| Message::Protocol(protocol)),
278+
)?
279+
.freeze()
280+
.to_vec();
287281

288282
Ok((
289283
Self {
290284
protocol,
291285
fallback_names,
292286
state: HandshakeState::WaitingResponse,
293287
},
294-
header,
288+
message,
295289
))
296290
}
297291

@@ -328,10 +322,9 @@ impl DialerState {
328322
return Ok(HandshakeResult::Succeeded(self.protocol.clone()));
329323
}
330324

331-
// TODO: zzz
332325
for fallback in &self.fallback_names {
333326
if fallback.as_bytes() == protocol.as_ref() {
334-
return Ok(HandshakeResult::Succeeded(self.protocol.clone()));
327+
return Ok(HandshakeResult::Succeeded(fallback.clone()));
335328
}
336329
}
337330

@@ -346,3 +339,148 @@ impl DialerState {
346339
}
347340
}
348341
}
342+
343+
#[cfg(test)]
344+
mod tests {
345+
use super::*;
346+
347+
#[test]
348+
fn propose() {
349+
let (mut dialer_state, message) =
350+
DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();
351+
let message = bytes::BytesMut::from(&message[..]).freeze();
352+
353+
let Message::Protocols(protocols) = Message::decode(message).unwrap() else {
354+
panic!("invalid message type");
355+
};
356+
357+
assert_eq!(protocols.len(), 2);
358+
assert_eq!(
359+
protocols[0],
360+
Protocol::try_from(&b"/multistream/1.0.0"[..])
361+
.expect("valid multitstream-select header")
362+
);
363+
assert_eq!(
364+
protocols[1],
365+
Protocol::try_from(&b"/13371338/proto/1"[..])
366+
.expect("valid multitstream-select header")
367+
);
368+
}
369+
370+
#[test]
371+
fn propose_with_fallback() {
372+
let (mut dialer_state, message) = DialerState::propose(
373+
ProtocolName::from("/13371338/proto/1"),
374+
vec![ProtocolName::from("/sup/proto/1")],
375+
)
376+
.unwrap();
377+
let message = bytes::BytesMut::from(&message[..]).freeze();
378+
379+
let Message::Protocols(protocols) = Message::decode(message).unwrap() else {
380+
panic!("invalid message type");
381+
};
382+
383+
assert_eq!(protocols.len(), 3);
384+
assert_eq!(
385+
protocols[0],
386+
Protocol::try_from(&b"/multistream/1.0.0"[..])
387+
.expect("valid multitstream-select header")
388+
);
389+
assert_eq!(
390+
protocols[1],
391+
Protocol::try_from(&b"/13371338/proto/1"[..])
392+
.expect("valid multitstream-select header")
393+
);
394+
assert_eq!(
395+
protocols[2],
396+
Protocol::try_from(&b"/sup/proto/1"[..]).expect("valid multitstream-select header")
397+
);
398+
}
399+
400+
#[test]
401+
fn register_response_invalid_message() {
402+
// send only header line
403+
let mut bytes = BytesMut::with_capacity(32);
404+
let message = Message::Header(HeaderLine::V1);
405+
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();
406+
407+
let (mut dialer_state, _message) =
408+
DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();
409+
410+
match dialer_state.register_response(bytes.freeze().to_vec()) {
411+
Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError(
412+
NegotiationError::Failed,
413+
))) => {}
414+
event => panic!("invalid event: {event:?}"),
415+
}
416+
}
417+
418+
#[test]
419+
fn header_line_missing() {
420+
// header line missing
421+
let mut bytes = BytesMut::with_capacity(256);
422+
let message = Message::Protocols(vec![
423+
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
424+
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
425+
]);
426+
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();
427+
428+
let (mut dialer_state, _message) =
429+
DialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();
430+
431+
match dialer_state.register_response(bytes.freeze().to_vec()) {
432+
Err(Error::NegotiationError(error::NegotiationError::MultistreamSelectError(
433+
NegotiationError::Failed,
434+
))) => {}
435+
event => panic!("invalid event: {event:?}"),
436+
}
437+
}
438+
439+
#[test]
440+
fn negotiate_main_protocol() {
441+
let message = encode_multistream_message(
442+
vec![Message::Protocol(
443+
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
444+
)]
445+
.into_iter(),
446+
)
447+
.unwrap()
448+
.freeze();
449+
450+
let (mut dialer_state, _message) = DialerState::propose(
451+
ProtocolName::from("/13371338/proto/1"),
452+
vec![ProtocolName::from("/sup/proto/1")],
453+
)
454+
.unwrap();
455+
456+
match dialer_state.register_response(message.to_vec()) {
457+
Ok(HandshakeResult::Succeeded(negotiated)) =>
458+
assert_eq!(negotiated, ProtocolName::from("/13371338/proto/1")),
459+
_ => panic!("invalid event"),
460+
}
461+
}
462+
463+
#[test]
464+
fn negotiate_fallback_protocol() {
465+
let message = encode_multistream_message(
466+
vec![Message::Protocol(
467+
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
468+
)]
469+
.into_iter(),
470+
)
471+
.unwrap()
472+
.freeze();
473+
474+
let (mut dialer_state, _message) = DialerState::propose(
475+
ProtocolName::from("/13371338/proto/1"),
476+
vec![ProtocolName::from("/sup/proto/1")],
477+
)
478+
.unwrap();
479+
480+
match dialer_state.register_response(message.to_vec()) {
481+
Ok(HandshakeResult::Succeeded(negotiated)) =>
482+
assert_eq!(negotiated, ProtocolName::from("/sup/proto/1")),
483+
_ => panic!("invalid event"),
484+
}
485+
}
486+
}

0 commit comments

Comments
 (0)