@@ -35,7 +35,6 @@ use vss_client::util::retry::{
3535use vss_client:: util:: storable_builder:: { EntropySource , StorableBuilder } ;
3636
3737use crate :: io:: utils:: check_namespace_key_validity;
38- use crate :: runtime:: Runtime ;
3938
4039type CustomRetryPolicy = FilteredRetryPolicy <
4140 JitteredRetryPolicy <
@@ -52,14 +51,13 @@ pub struct VssStore {
5251 // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
5352 // operations aren't sensitive to the order of execution.
5453 next_version : AtomicU64 ,
55- runtime : Arc < Runtime > ,
5654 internal_runtime : Option < tokio:: runtime:: Runtime > ,
5755}
5856
5957impl VssStore {
6058 pub ( crate ) fn new (
6159 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
62- header_provider : Arc < dyn VssHeaderProvider > , runtime : Arc < Runtime > ,
60+ header_provider : Arc < dyn VssHeaderProvider > ,
6361 ) -> Self {
6462 let inner = Arc :: new ( VssStoreInner :: new ( base_url, store_id, vss_seed, header_provider) ) ;
6563 let next_version = AtomicU64 :: new ( 1 ) ;
@@ -77,7 +75,7 @@ impl VssStore {
7775 . unwrap ( ) ,
7876 ) ;
7977
80- Self { inner, next_version, runtime , internal_runtime }
78+ Self { inner, next_version, internal_runtime }
8179 }
8280
8381 // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -122,11 +120,7 @@ impl KVStoreSync for VssStore {
122120 let inner = Arc :: clone ( & self . inner ) ;
123121 let fut =
124122 async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ;
125- let spawned_fut = internal_runtime. spawn ( fut) ;
126- self . runtime . block_on ( async move { spawned_fut. await } ) . map_err ( |e| {
127- let msg = format ! ( "Failed to join read future: {}" , e) ;
128- Error :: new ( ErrorKind :: Other , msg)
129- } ) ?
123+ tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
130124 }
131125
132126 fn write (
@@ -156,11 +150,7 @@ impl KVStoreSync for VssStore {
156150 )
157151 . await
158152 } ;
159- let spawned_fut = internal_runtime. spawn ( fut) ;
160- self . runtime . block_on ( async move { spawned_fut. await } ) . map_err ( |e| {
161- let msg = format ! ( "Failed to join write future: {}" , e) ;
162- Error :: new ( ErrorKind :: Other , msg)
163- } ) ?
153+ tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
164154 }
165155
166156 fn remove (
@@ -189,11 +179,7 @@ impl KVStoreSync for VssStore {
189179 )
190180 . await
191181 } ;
192- let spawned_fut = internal_runtime. spawn ( fut) ;
193- self . runtime . block_on ( async move { spawned_fut. await } ) . map_err ( |e| {
194- let msg = format ! ( "Failed to join remove future: {}" , e) ;
195- Error :: new ( ErrorKind :: Other , msg)
196- } ) ?
182+ tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
197183 }
198184
199185 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
@@ -206,11 +192,7 @@ impl KVStoreSync for VssStore {
206192 let secondary_namespace = secondary_namespace. to_string ( ) ;
207193 let inner = Arc :: clone ( & self . inner ) ;
208194 let fut = async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } ;
209- let spawned_fut = internal_runtime. spawn ( fut) ;
210- self . runtime . block_on ( async move { spawned_fut. await } ) . map_err ( |e| {
211- let msg = format ! ( "Failed to join list future: {}" , e) ;
212- Error :: new ( ErrorKind :: Other , msg)
213- } ) ?
195+ tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
214196 }
215197}
216198
@@ -581,38 +563,33 @@ mod tests {
581563
582564 use rand:: distributions:: Alphanumeric ;
583565 use rand:: { thread_rng, Rng , RngCore } ;
584- use tokio:: runtime;
585566 use vss_client:: headers:: FixedHeaders ;
586567
587568 use super :: * ;
588569 use crate :: io:: test_utils:: do_read_write_remove_list_persist;
589570
590571 #[ test]
591572 fn vss_read_write_remove_list_persist ( ) {
592- let runtime = Arc :: new ( Runtime :: new ( ) . unwrap ( ) ) ;
593573 let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
594574 let mut rng = thread_rng ( ) ;
595575 let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
596576 let mut vss_seed = [ 0u8 ; 32 ] ;
597577 rng. fill_bytes ( & mut vss_seed) ;
598578 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
599- let vss_store =
600- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) . unwrap ( ) ;
579+ let vss_store = VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) ;
601580
602581 do_read_write_remove_list_persist ( & vss_store) ;
603582 }
604583
605584 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
606585 async fn vss_read_write_remove_list_persist_in_runtime_context ( ) {
607- let runtime = Arc :: new ( Runtime :: new ( ) . unwrap ( ) ) ;
608586 let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
609587 let mut rng = thread_rng ( ) ;
610588 let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
611589 let mut vss_seed = [ 0u8 ; 32 ] ;
612590 rng. fill_bytes ( & mut vss_seed) ;
613591 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
614- let vss_store =
615- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) . unwrap ( ) ;
592+ let vss_store = VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) ;
616593
617594 do_read_write_remove_list_persist ( & vss_store) ;
618595 drop ( vss_store)
0 commit comments