Skip to content

Commit eec49cb

Browse files
committed
Add switch_map operator and equivalent starred and indexed
This draws from the definition in rxjs and maintains parity with the map operator and its variants
1 parent 3130ffc commit eec49cb

File tree

4 files changed

+1234
-0
lines changed

4 files changed

+1234
-0
lines changed

reactivex/operators/__init__.py

+201
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
Observable,
2727
abc,
2828
compose,
29+
of,
2930
typing,
3031
)
3132
from reactivex.internal.basic import identity
@@ -3325,6 +3326,202 @@ def switch_latest() -> Callable[
33253326
return switch_latest_()
33263327

33273328

3329+
def switch_map(
3330+
mapper: Optional[Mapper[_T1, Observable[_T2]]] = None
3331+
) -> Callable[[Observable[_T1]], Observable[_T2]]:
3332+
"""
3333+
The switch_map operator.
3334+
3335+
Project each element of an observable sequence into a new observable.
3336+
3337+
.. marble::
3338+
:alt: switch_map
3339+
3340+
---1---2---3--->
3341+
[ switch_map(i: of(i, i ** 2, i ** 3)) ]
3342+
---1---1---1---2---4---8---3---9---27--->
3343+
3344+
Example:
3345+
>>> switch_map(lambda value: of(value, value // 2))
3346+
3347+
Args:
3348+
mapper: A transform function to apply to each source element.
3349+
3350+
Returns:
3351+
A partially applied operator function that takes an observable
3352+
source and returns an observable sequence whose elements are
3353+
each element of the result of invoking the transform function
3354+
on each element of the source.
3355+
"""
3356+
from ._switch_map import switch_map_
3357+
3358+
return switch_map_(mapper)
3359+
3360+
3361+
def switch_map_indexed(
3362+
mapper_indexed: Optional[MapperIndexed[_T1, Observable[_T2]]] = None
3363+
) -> Callable[[Observable[_T1]], Observable[_T2]]:
3364+
"""
3365+
The switch_map_indexed operator.
3366+
3367+
Project each element of an observable sequence into a new observable
3368+
by incorporating the element's index.
3369+
3370+
.. marble::
3371+
:alt: switch_map_indexed
3372+
3373+
---1-----------2-----------3----------->
3374+
[ switch_map_indexed(i,id: of(i, i ** 2, i + id)) ]
3375+
---1---1---1---2---4---3---3---9---5--->
3376+
3377+
Example:
3378+
>>> switch_map_indexed(lambda value, index: of(value, value // 2))
3379+
3380+
Args:
3381+
mapper_indexed: A transform function to apply to each source
3382+
element. The second parameter of the function represents
3383+
the index of the source element.
3384+
3385+
Returns:
3386+
A partially applied operator function that takes an observable
3387+
source and returns an observable sequence whose elements are
3388+
each element of the result of invoking the transform function
3389+
on each element of the source.
3390+
"""
3391+
from ._switch_map import switch_map_indexed_
3392+
3393+
return switch_map_indexed_(mapper_indexed)
3394+
3395+
3396+
@overload
3397+
def switch_starmap(
3398+
mapper: Callable[[_A, _B], Observable[_T]]
3399+
) -> Callable[[Observable[Tuple[_A, _B]]], Observable[_T]]:
3400+
...
3401+
3402+
3403+
@overload
3404+
def switch_starmap(
3405+
mapper: Callable[[_A, _B, _C], Observable[_T]]
3406+
) -> Callable[[Observable[Tuple[_A, _B, _C]]], Observable[_T]]:
3407+
...
3408+
3409+
3410+
@overload
3411+
def switch_starmap(
3412+
mapper: Callable[[_A, _B, _C, _D], Observable[_T]]
3413+
) -> Callable[[Observable[Tuple[_A, _B, _C, _D]]], Observable[_T]]:
3414+
...
3415+
3416+
3417+
def switch_starmap(
3418+
mapper: Optional[Callable[..., Observable[Any]]] = None
3419+
) -> Callable[[Observable[Any]], Observable[Any]]:
3420+
"""The switch_starmap operator.
3421+
3422+
Unpack arguments grouped as tuple elements of an observable sequence
3423+
and return an observable sequence whose values are each element of
3424+
the observable returned by invoking the mapper function with star
3425+
applied on unpacked elements as positional arguments.
3426+
3427+
Use instead of `switch_map()` when the the arguments to the mapper is
3428+
grouped as tuples and the mapper function takes multiple arguments.
3429+
3430+
.. marble::
3431+
:alt: switch_starmap
3432+
3433+
----1,2-------3,4---------|
3434+
[ switch_starmap(of) ]
3435+
----1----2----3----4------|
3436+
3437+
Example:
3438+
>>> switch_starmap(lambda x, y: of(x + y, x * y))
3439+
3440+
Args:
3441+
mapper: A transform function to invoke with unpacked elements
3442+
as arguments.
3443+
3444+
Returns:
3445+
An operator function that takes an observable source and returns
3446+
an observable sequence whose values are each element of the
3447+
observable returned by invoking the mapper function with the
3448+
unpacked elements of the source.
3449+
"""
3450+
3451+
if mapper is None:
3452+
mapper = of
3453+
3454+
def starred(values: Tuple[Any, ...]) -> Observable[Any]:
3455+
return mapper(*values)
3456+
3457+
return compose(switch_map(starred))
3458+
3459+
3460+
@overload
3461+
def switch_starmap_indexed(
3462+
mapper: Callable[[_A, int], Observable[_T]]
3463+
) -> Callable[[Observable[_A]], Observable[_T]]:
3464+
...
3465+
3466+
3467+
@overload
3468+
def switch_starmap_indexed(
3469+
mapper: Callable[[_A, _B, int], Observable[_T]]
3470+
) -> Callable[[Observable[Tuple[_A, _B]]], Observable[_T]]:
3471+
...
3472+
3473+
3474+
@overload
3475+
def switch_starmap_indexed(
3476+
mapper: Callable[[_A, _B, _C, int], Observable[_T]]
3477+
) -> Callable[[Observable[Tuple[_A, _B, _C]]], Observable[_T]]:
3478+
...
3479+
3480+
3481+
@overload
3482+
def switch_starmap_indexed(
3483+
mapper: Callable[[_A, _B, _C, _D, int], Observable[_T]]
3484+
) -> Callable[[Observable[Tuple[_A, _B, _C, _D]]], Observable[_T]]:
3485+
...
3486+
3487+
3488+
def switch_starmap_indexed(
3489+
mapper: Optional[Callable[..., Observable[Any]]] = None
3490+
) -> Callable[[Observable[Any]], Observable[Any]]:
3491+
"""Variant of :func:`switch_starmap` which accepts an indexed mapper.
3492+
3493+
.. marble::
3494+
:alt: switch_starmap_indexed
3495+
3496+
------1,2----------3,4-----------|
3497+
[ switch_starmap_indexed(of) ]
3498+
------1---2---0----3---4---1-----|
3499+
3500+
Example:
3501+
>>> switch_starmap_indexed(lambda x, y, i: of(x + y + i, x * y - i))
3502+
3503+
Args:
3504+
mapper: A transform function to invoke with unpacked elements
3505+
as arguments.
3506+
3507+
Returns:
3508+
An operator function that takes an observable source and returns
3509+
an observable sequence whose values are each element of the
3510+
observable returned by invoking the mapper function with the
3511+
unpacked elements of the source.
3512+
"""
3513+
from ._switch_map import switch_map_
3514+
3515+
if mapper is None:
3516+
return compose(of)
3517+
3518+
def starred(values: Tuple[Any, ...]) -> Observable[Any]:
3519+
assert mapper # mypy is paranoid
3520+
return mapper(*values)
3521+
3522+
return compose(switch_map_(starred))
3523+
3524+
33283525
def take(count: int) -> Callable[[Observable[_T]], Observable[_T]]:
33293526
"""Returns a specified number of contiguous elements from the start
33303527
of an observable sequence.
@@ -4272,6 +4469,10 @@ def zip_with_iterable(
42724469
"subscribe_on",
42734470
"sum",
42744471
"switch_latest",
4472+
"switch_map",
4473+
"switch_map_indexed",
4474+
"switch_starmap",
4475+
"switch_starmap_indexed",
42754476
"take",
42764477
"take_last",
42774478
"take_last_buffer",

reactivex/operators/_switch_map.py

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
from typing import Callable, Optional, TypeVar, cast
2+
3+
from reactivex import Observable, abc, compose, of
4+
from reactivex import operators as ops
5+
from reactivex.internal import infinite
6+
from reactivex.typing import Mapper, MapperIndexed
7+
8+
_T1 = TypeVar("_T1")
9+
_T2 = TypeVar("_T2")
10+
11+
12+
def switch_map_(
13+
mapper: Optional[Mapper[_T1, Observable[_T2]]] = None
14+
) -> Callable[[Observable[_T1]], Observable[_T2]]:
15+
_mapper = mapper or cast(Mapper[_T1, Observable[_T2]], of)
16+
17+
def switch_map(source: Observable[_T1]) -> Observable[_T2]:
18+
"""
19+
Partially applied switch_map operator.
20+
21+
Project each element of an observable sequence into a new observable
22+
by incorporating the element's index.
23+
24+
Example:
25+
>>> switch_map(source)
26+
27+
Args:
28+
source: The observable source to transform.
29+
30+
Returns:
31+
Returns an observable sequence whose elements are each element of
32+
the result of invoking the transform function on each element of the
33+
source.
34+
"""
35+
36+
inner_observer: Optional[abc.DisposableBase] = None
37+
is_complete = False
38+
39+
def subscribe(
40+
obv: abc.ObserverBase[_T2], scheduler: Optional[abc.SchedulerBase] = None
41+
) -> abc.DisposableBase:
42+
nonlocal inner_observer, is_complete
43+
44+
def check_complete() -> None:
45+
if is_complete and not inner_observer:
46+
obv.on_completed()
47+
48+
def on_next(value: _T1) -> None:
49+
nonlocal inner_observer
50+
51+
def inner_complete():
52+
nonlocal inner_observer
53+
inner_observer = None
54+
check_complete()
55+
56+
if inner_observer:
57+
inner_observer.dispose()
58+
59+
try:
60+
mapped_source = _mapper(value)
61+
except Exception as err:
62+
obv.on_error(err)
63+
else:
64+
inner_observer = mapped_source.subscribe(
65+
obv.on_next, obv.on_error, inner_complete, scheduler=scheduler
66+
)
67+
68+
def source_complete():
69+
nonlocal is_complete
70+
is_complete = True
71+
check_complete()
72+
73+
return source.subscribe(
74+
on_next, obv.on_error, source_complete, scheduler=scheduler
75+
)
76+
77+
return Observable(subscribe)
78+
79+
return switch_map
80+
81+
82+
def switch_map_indexed_(
83+
mapper_indexed: Optional[MapperIndexed[_T1, Observable[_T2]]] = None
84+
) -> Callable[[Observable[_T1]], Observable[_T2]]:
85+
def _of(value: _T1, _: int) -> Observable[_T2]:
86+
return of(cast(_T2, value))
87+
88+
_mapper_indexed = mapper_indexed or cast(MapperIndexed[_T1, Observable[_T2]], _of)
89+
90+
return compose(
91+
ops.zip_with_iterable(infinite()),
92+
ops.switch_starmap_indexed(_mapper_indexed),
93+
)
94+
95+
96+
__all__ = ["switch_map_", "switch_map_indexed_"]

0 commit comments

Comments
 (0)