2828//! Run: `RUST_LOG=info cargo run --example custom_executor`
2929
3030use litep2p:: {
31- config:: ConfigBuilder ,
32- executor:: Executor ,
33- protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34- transport:: tcp:: config:: Config as TcpConfig ,
35- Litep2p ,
31+ config:: ConfigBuilder ,
32+ executor:: Executor ,
33+ protocol:: libp2p:: ping:: { Config as PingConfig , PingEvent } ,
34+ transport:: tcp:: config:: Config as TcpConfig ,
35+ Litep2p ,
3636} ;
3737
3838use futures:: { future:: BoxFuture , stream:: FuturesUnordered , Stream , StreamExt } ;
@@ -44,102 +44,112 @@ use std::{future::Future, pin::Pin, sync::Arc};
4444///
4545/// Just a wrapper around `FuturesUnordered` which receives the futures over `mpsc::Receiver`.
4646struct TaskExecutor {
47- rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48- futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
47+ rx : Receiver < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
48+ futures : FuturesUnordered < BoxFuture < ' static , ( ) > > ,
4949}
5050
5151impl TaskExecutor {
52- /// Create new [`TaskExecutor`].
53- fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54- let ( tx, rx) = channel ( 64 ) ;
55-
56- ( Self { rx, futures : FuturesUnordered :: new ( ) } , tx)
57- }
58-
59- /// Drive the futures forward and poll the receiver for any new futures.
60- async fn next ( & mut self ) {
61- loop {
62- tokio:: select! {
63- future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
64- _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
65- }
66- }
67- }
52+ /// Create new [`TaskExecutor`].
53+ fn new ( ) -> ( Self , Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ) {
54+ let ( tx, rx) = channel ( 64 ) ;
55+
56+ (
57+ Self {
58+ rx,
59+ futures : FuturesUnordered :: new ( ) ,
60+ } ,
61+ tx,
62+ )
63+ }
64+
65+ /// Drive the futures forward and poll the receiver for any new futures.
66+ async fn next ( & mut self ) {
67+ loop {
68+ tokio:: select! {
69+ future = self . rx. recv( ) => self . futures. push( future. unwrap( ) ) ,
70+ _ = self . futures. next( ) , if !self . futures. is_empty( ) => { }
71+ }
72+ }
73+ }
6874}
6975
7076struct TaskExecutorHandle {
71- tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
77+ tx : Sender < Pin < Box < dyn Future < Output = ( ) > + Send > > > ,
7278}
7379
7480impl Executor for TaskExecutorHandle {
75- fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
76- let _ = self . tx . try_send ( future) ;
77- }
81+ fn run ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
82+ let _ = self . tx . try_send ( future) ;
83+ }
7884
79- fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
80- let _ = self . tx . try_send ( future) ;
81- }
85+ fn run_with_name ( & self , _: & ' static str , future : Pin < Box < dyn Future < Output = ( ) > + Send > > ) {
86+ let _ = self . tx . try_send ( future) ;
87+ }
8288}
8389
84- fn make_litep2p ( ) -> ( Litep2p , TaskExecutor , Box < dyn Stream < Item = PingEvent > + Send + Unpin > ) {
85- let ( executor, sender) = TaskExecutor :: new ( ) ;
86- let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
87-
88- let litep2p = Litep2p :: new (
89- ConfigBuilder :: new ( )
90- . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
91- . with_tcp ( TcpConfig {
92- listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
93- ..Default :: default ( )
94- } )
95- . with_libp2p_ping ( ping_config)
96- . build ( ) ,
97- )
98- . unwrap ( ) ;
99-
100- ( litep2p, executor, ping_event_stream)
90+ fn make_litep2p ( ) -> (
91+ Litep2p ,
92+ TaskExecutor ,
93+ Box < dyn Stream < Item = PingEvent > + Send + Unpin > ,
94+ ) {
95+ let ( executor, sender) = TaskExecutor :: new ( ) ;
96+ let ( ping_config, ping_event_stream) = PingConfig :: default ( ) ;
97+
98+ let litep2p = Litep2p :: new (
99+ ConfigBuilder :: new ( )
100+ . with_executor ( Arc :: new ( TaskExecutorHandle { tx : sender. clone ( ) } ) )
101+ . with_tcp ( TcpConfig {
102+ listen_addresses : vec ! [ "/ip6/::1/tcp/0" . parse( ) . unwrap( ) ] ,
103+ ..Default :: default ( )
104+ } )
105+ . with_libp2p_ping ( ping_config)
106+ . build ( ) ,
107+ )
108+ . unwrap ( ) ;
109+
110+ ( litep2p, executor, ping_event_stream)
101111}
102112
103113#[ tokio:: main]
104114async fn main ( ) {
105- let _ = tracing_subscriber:: fmt ( )
106- . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
107- . try_init ( ) ;
108-
109- // create two identical litep2ps
110- let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
111- let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
112-
113- // dial `litep2p1`
114- litep2p2
115- . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
116- . await
117- . unwrap ( ) ;
118-
119- tokio:: spawn ( async move {
120- loop {
121- tokio:: select! {
122- _ = executor1. next( ) => { }
123- _ = litep2p1. next_event( ) => { } ,
124- _ = ping_event_stream1. next( ) => { } ,
125- }
126- }
127- } ) ;
128-
129- // poll litep2p, task executor and ping event stream all together
130- //
131- // since a custom task executor was provided, it's now the user's responsibility
132- // to actually make sure to poll those futures so that litep2p can make progress
133- loop {
134- tokio:: select! {
135- _ = executor2. next( ) => { }
136- _ = litep2p2. next_event( ) => { } ,
137- event = ping_event_stream2. next( ) => match event {
138- Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
139- "ping time with {peer:?}: {ping:?}"
140- ) ,
141- _ => { }
142- }
143- }
144- }
115+ let _ = tracing_subscriber:: fmt ( )
116+ . with_env_filter ( tracing_subscriber:: EnvFilter :: from_default_env ( ) )
117+ . try_init ( ) ;
118+
119+ // create two identical litep2ps
120+ let ( mut litep2p1, mut executor1, mut ping_event_stream1) = make_litep2p ( ) ;
121+ let ( mut litep2p2, mut executor2, mut ping_event_stream2) = make_litep2p ( ) ;
122+
123+ // dial `litep2p1`
124+ litep2p2
125+ . dial_address ( litep2p1. listen_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) )
126+ . await
127+ . unwrap ( ) ;
128+
129+ tokio:: spawn ( async move {
130+ loop {
131+ tokio:: select! {
132+ _ = executor1. next( ) => { }
133+ _ = litep2p1. next_event( ) => { } ,
134+ _ = ping_event_stream1. next( ) => { } ,
135+ }
136+ }
137+ } ) ;
138+
139+ // poll litep2p, task executor and ping event stream all together
140+ //
141+ // since a custom task executor was provided, it's now the user's responsibility
142+ // to actually make sure to poll those futures so that litep2p can make progress
143+ loop {
144+ tokio:: select! {
145+ _ = executor2. next( ) => { }
146+ _ = litep2p2. next_event( ) => { } ,
147+ event = ping_event_stream2. next( ) => match event {
148+ Some ( PingEvent :: Ping { peer, ping } ) => tracing:: info!(
149+ "ping time with {peer:?}: {ping:?}"
150+ ) ,
151+ _ => { }
152+ }
153+ }
154+ }
145155}
0 commit comments