diff --git a/asgiref/sync.py b/asgiref/sync.py index 3710a7f1..33357e7f 100644 --- a/asgiref/sync.py +++ b/asgiref/sync.py @@ -19,6 +19,9 @@ contextvars = None +SYNC_POOL_WORKER_COUNT = 10 # TODO: this is random for testing now + + def _restore_context(context): # Check for changes in contextvars, and set them to the current # context for downstream consumers @@ -84,9 +87,7 @@ async def __aexit__(self, exc, value, tb): if not self.token: return - executor = SyncToAsync.context_to_thread_executor.pop(self, None) - if executor: - executor.shutdown() + SyncToAsync.release_executor(self) SyncToAsync.thread_sensitive_context.reset(self.token) else: @@ -343,6 +344,8 @@ class SyncToAsync: # Single-thread executor for thread-sensitive code single_thread_executor = ThreadPoolExecutor(max_workers=1) + executor_pool = [] + # Maintain a contextvar for the current execution context. Optionally used # for thread sensitive mode. if sys.version_info >= (3, 7): @@ -407,7 +410,10 @@ async def __call__(self, *args, **kwargs): executor = self.context_to_thread_executor[thread_sensitive_context] else: # Create new thread executor in current context - executor = ThreadPoolExecutor(max_workers=1) + try: + executor = self.executor_pool.pop() + except IndexError: + executor = ThreadPoolExecutor(max_workers=1) self.context_to_thread_executor[thread_sensitive_context] = executor elif self.deadlock_context and self.deadlock_context.get(False): raise RuntimeError( @@ -506,6 +512,16 @@ def get_current_task(): except RuntimeError: return None + @classmethod + def release_executor(cls, obj): + executor = cls.context_to_thread_executor.pop(obj, None) + if executor is None: + return + if len(cls.executor_pool) < SYNC_POOL_WORKER_COUNT: + cls.executor_pool.append(executor) + else: + executor.shutdown() + # Lowercase aliases (and decorator friendliness) async_to_sync = AsyncToSync