11use std:: cell:: RefCell ;
2- use std:: collections:: HashMap ;
2+ use std:: collections:: { HashMap , HashSet } ;
33use std:: future:: Future ;
44use std:: pin:: Pin ;
55use std:: sync:: {
@@ -14,8 +14,8 @@ use crate::{
1414 logging:: { error, info} ,
1515 set_state, timer, Address , BuildError , LazyLoadBlob , Message , Request , SendError ,
1616} ;
17- use futures_util:: task:: { waker_ref, ArcWake } ;
1817use futures_channel:: { mpsc, oneshot} ;
18+ use futures_util:: task:: { waker_ref, ArcWake } ;
1919use serde:: { Deserialize , Serialize } ;
2020use thiserror:: Error ;
2121use uuid:: Uuid ;
@@ -29,6 +29,7 @@ thread_local! {
2929 } ) ;
3030
3131 pub static RESPONSE_REGISTRY : RefCell <HashMap <String , Vec <u8 >>> = RefCell :: new( HashMap :: new( ) ) ;
32+ pub static CANCELLED_RESPONSES : RefCell <HashSet <String >> = RefCell :: new( HashSet :: new( ) ) ;
3233
3334 pub static APP_HELPERS : RefCell <AppHelpers > = RefCell :: new( AppHelpers {
3435 current_server: None ,
@@ -246,6 +247,7 @@ struct ResponseFuture {
246247 correlation_id : String ,
247248 // Capture HTTP context at creation time
248249 http_context : Option < HttpRequestContext > ,
250+ resolved : bool ,
249251}
250252
251253impl ResponseFuture {
@@ -257,6 +259,7 @@ impl ResponseFuture {
257259 Self {
258260 correlation_id,
259261 http_context,
262+ resolved : false ,
260263 }
261264 }
262265}
@@ -265,16 +268,18 @@ impl Future for ResponseFuture {
265268 type Output = Vec < u8 > ;
266269
267270 fn poll ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
268- let correlation_id = & self . correlation_id ;
271+ let this = self . get_mut ( ) ;
269272
270273 let maybe_bytes = RESPONSE_REGISTRY . with ( |registry| {
271274 let mut registry_mut = registry. borrow_mut ( ) ;
272- registry_mut. remove ( correlation_id)
275+ registry_mut. remove ( & this . correlation_id )
273276 } ) ;
274277
275278 if let Some ( bytes) = maybe_bytes {
279+ this. resolved = true ;
280+
276281 // Restore this future's captured context
277- if let Some ( ref context) = self . http_context {
282+ if let Some ( ref context) = this . http_context {
278283 APP_HELPERS . with ( |helpers| {
279284 helpers. borrow_mut ( ) . current_http_context = Some ( context. clone ( ) ) ;
280285 } ) ;
@@ -287,6 +292,23 @@ impl Future for ResponseFuture {
287292 }
288293}
289294
295+ impl Drop for ResponseFuture {
296+ fn drop ( & mut self ) {
297+ // We want to avoid cleaning up after successful responses
298+ if self . resolved {
299+ return ;
300+ }
301+
302+ RESPONSE_REGISTRY . with ( |registry| {
303+ registry. borrow_mut ( ) . remove ( & self . correlation_id ) ;
304+ } ) ;
305+
306+ CANCELLED_RESPONSES . with ( |set| {
307+ set. borrow_mut ( ) . insert ( self . correlation_id . clone ( ) ) ;
308+ } ) ;
309+ }
310+ }
311+
290312#[ derive( Debug , Clone , Serialize , Deserialize , Error ) ]
291313pub enum AppSendError {
292314 #[ error( "SendError: {0}" ) ]
0 commit comments