From 312860c34a9e0105aa787f55dfc732c3106d45fe Mon Sep 17 00:00:00 2001 From: Robert Mahfoud Date: Sun, 25 May 2025 16:52:15 -0700 Subject: [PATCH 1/2] Give run a scheduler parameter --- reactivex/observable/observable.py | 4 ++-- reactivex/run.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/reactivex/observable/observable.py b/reactivex/observable/observable.py index 6e864da98..e540fe467 100644 --- a/reactivex/observable/observable.py +++ b/reactivex/observable/observable.py @@ -229,7 +229,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any: return pipe_(self, *operators) - def run(self) -> Any: + def run(self, scheduler: Optional[abc.SchedulerBase] = None) -> _T_out: """Run source synchronously. Subscribes to the observable source. Then blocks and waits for the @@ -249,7 +249,7 @@ def run(self) -> Any: """ from ..run import run - return run(self) + return run(self, scheduler) def __await__(self) -> Generator[Any, None, _T_out]: """Awaits the given observable. diff --git a/reactivex/run.py b/reactivex/run.py index 6e932b2ec..15b1950b9 100644 --- a/reactivex/run.py +++ b/reactivex/run.py @@ -1,17 +1,18 @@ import threading from typing import Optional, TypeVar, cast +from reactivex import abc from reactivex.internal.exceptions import SequenceContainsNoElementsError from reactivex.scheduler import NewThreadScheduler from .observable import Observable -scheduler = NewThreadScheduler() +_default_scheduler = NewThreadScheduler() _T = TypeVar("_T") -def run(source: Observable[_T]) -> _T: +def run(source: Observable[_T], scheduler: Optional[abc.SchedulerBase] = None) -> _T: """Run source synchronously. Subscribes to the observable source. Then blocks and waits for the @@ -32,6 +33,7 @@ def run(source: Observable[_T]) -> _T: Returns: The last element emitted from the observable. """ + scheduler = scheduler or _default_scheduler exception: Optional[Exception] = None latch = threading.Event() has_result = False From b1c109e4c86e5ac8657ab936cc441d754f56eeaa Mon Sep 17 00:00:00 2001 From: Robert Mahfoud Date: Sun, 26 Oct 2025 20:00:14 -0700 Subject: [PATCH 2/2] Add docstring --- reactivex/run.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/reactivex/run.py b/reactivex/run.py index 15b1950b9..72636f4c7 100644 --- a/reactivex/run.py +++ b/reactivex/run.py @@ -24,6 +24,8 @@ def run(source: Observable[_T], scheduler: Optional[abc.SchedulerBase] = None) - Args: source: Observable source to run. + scheduler: Optional scheduler to use for subscription. If not + specified, defaults to a NewThreadScheduler. Raises: SequenceContainsNoElementsError: if observable completes