2828//! Run: `RUST_LOG=info cargo run --example custom_executor`
2929
3030use litep2p:: {
31- config:: ConfigBuilder ,
32- executor:: Executor ,
33- protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34- transport:: tcp:: config:: Config as TcpConfig ,
35- Litep2p ,
31+ config:: ConfigBuilder ,
32+ executor:: Executor ,
33+ protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34+ transport:: tcp:: config:: Config as TcpConfig ,
35+ Litep2p ,
3636} ;
3737
3838use futures:: { future:: BoxFuture , stream:: FuturesUnordered , Stream , StreamExt } ;
@@ -44,112 +44,102 @@ use std::{future::Future, pin::Pin, sync::Arc};
4444///
4545/// Just a wrapper around `FuturesUnordered` which receives the futures over `mpsc::Receiver`.
4646struct TaskExecutor {
47- rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48- futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
47+ rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48+ futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
4949}
5050
5151impl TaskExecutor {
52- /// Create new [`TaskExecutor`].
53- fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54- let ( tx, rx) = channel ( 64 ) ;
55-
56- (
57- Self {
58- rx,
59- futures : FuturesUnordered :: new ( ) ,
60- } ,
61- tx,
62- )
63- }
64-
65- /// Drive the futures forward and poll the receiver for any new futures.
66- async fn next ( & mut self ) {
67- loop {
68- tokio:: select! {
69- future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
70- _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
71- }
72- }
73- }
52+ /// Create new [`TaskExecutor`].
53+ fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54+ let ( tx, rx) = channel ( 64 ) ;
55+
56+ ( Self { rx, futures : FuturesUnordered :: new ( ) } , tx)
57+ }
58+
59+ /// Drive the futures forward and poll the receiver for any new futures.
60+ async fn next ( & mut self ) {
61+ loop {
62+ tokio:: select! {
63+ future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
64+ _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
65+ }
66+ }
67+ }
7468}
7569
7670struct TaskExecutorHandle {
77- tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
71+ tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
7872}
7973
8074impl Executor for TaskExecutorHandle {
81- fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
82- let _ = self . tx . try_send ( future) ;
83- }
75+ fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
76+ let _ = self . tx . try_send ( future) ;
77+ }
8478
85- fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
86- let _ = self . tx . try_send ( future) ;
87- }
79+ fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
80+ let _ = self . tx . try_send ( future) ;
81+ }
8882}
8983
90- fn make_litep2p ( ) -> (
91- Litep2p ,
92- TaskExecutor ,
93- Box < dyn Stream < Item = PingEvent > + Send + Unpin > ,
94- ) {
95- let ( executor, sender) = TaskExecutor :: new ( ) ;
96- let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
97-
98- let litep2p = Litep2p :: new (
99- ConfigBuilder :: new ( )
100- . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
101- . with_tcp ( TcpConfig {
102- listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
103- ..Default :: default ( )
104- } )
105- . with_libp2p_ping ( ping_config)
106- . build ( ) ,
107- )
108- . unwrap ( ) ;
109-
110- ( litep2p, executor, ping_event_stream)
84+ fn make_litep2p ( ) -> ( Litep2p , TaskExecutor , Box < dyn Stream < Item = PingEvent > + Send + Unpin > ) {
85+ let ( executor, sender) = TaskExecutor :: new ( ) ;
86+ let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
87+
88+ let litep2p = Litep2p :: new (
89+ ConfigBuilder :: new ( )
90+ . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
91+ . with_tcp ( TcpConfig {
92+ listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
93+ ..Default :: default ( )
94+ } )
95+ . with_libp2p_ping ( ping_config)
96+ . build ( ) ,
97+ )
98+ . unwrap ( ) ;
99+
100+ ( litep2p, executor, ping_event_stream)
111101}
112102
113103#[ tokio:: main]
114104async fn main ( ) {
115- let _ = tracing_subscriber:: fmt ( )
116- . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
117- . try_init ( ) ;
118-
119- // create two identical litep2ps
120- let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
121- let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
122-
123- // dial `litep2p1`
124- litep2p2
125- . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
126- . await
127- . unwrap ( ) ;
128-
129- tokio:: spawn ( async move {
130- loop {
131- tokio:: select! {
132- _ = executor1. next( ) => { }
133- _ = litep2p1. next_event( ) => { } ,
134- _ = ping_event_stream1. next( ) => { } ,
135- }
136- }
137- } ) ;
138-
139- // poll litep2p, task executor and ping event stream all together
140- //
141- // since a custom task executor was provided, it's now the user's responsibility
142- // to actually make sure to poll those futures so that litep2p can make progress
143- loop {
144- tokio:: select! {
145- _ = executor2. next( ) => { }
146- _ = litep2p2. next_event( ) => { } ,
147- event = ping_event_stream2. next( ) => match event {
148- Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
149- "ping time with {peer:?}: {ping:?}"
150- ) ,
151- _ => { }
152- }
153- }
154- }
105+ let _ = tracing_subscriber:: fmt ( )
106+ . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
107+ . try_init ( ) ;
108+
109+ // create two identical litep2ps
110+ let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
111+ let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
112+
113+ // dial `litep2p1`
114+ litep2p2
115+ . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
116+ . await
117+ . unwrap ( ) ;
118+
119+ tokio:: spawn ( async move {
120+ loop {
121+ tokio:: select! {
122+ _ = executor1. next( ) => { }
123+ _ = litep2p1. next_event( ) => { } ,
124+ _ = ping_event_stream1. next( ) => { } ,
125+ }
126+ }
127+ } ) ;
128+
129+ // poll litep2p, task executor and ping event stream all together
130+ //
131+ // since a custom task executor was provided, it's now the user's responsibility
132+ // to actually make sure to poll those futures so that litep2p can make progress
133+ loop {
134+ tokio:: select! {
135+ _ = executor2. next( ) => { }
136+ _ = litep2p2. next_event( ) => { } ,
137+ event = ping_event_stream2. next( ) => match event {
138+ Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
139+ "ping time with {peer:?}: {ping:?}"
140+ ) ,
141+ _ => { }
142+ }
143+ }
144+ }
155145}
0 commit comments