diff --git a/reactivex/__init__.py b/reactivex/__init__.py index a707f978..9a2761fd 100644 --- a/reactivex/__init__.py +++ b/reactivex/__init__.py @@ -232,6 +232,43 @@ def combine_latest(*__sources: Observable[Any]) -> Observable[Any]: return combine_latest_(*__sources) +def combine_throttle(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]: + """Merges the specified observable sequences into one observable + sequence by creating a result whenever all of the + observable sequences have produced an element at a corresponding + index. Faster observables, that emits events more frequently, are + throttled, so that they match speed of slower observables. + Speed of emitting items matches speed of slowest source observable. + It is somewhat similar to :func:`reactivex.zip` operator, returns tuple, + but items of faster observables are dropped, so that only latest values are + at each index. + It is also similar to :func:`reactivex.combine_latest`, but emits new item + only when all sources produce new item. Only latest items are included in + resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`. + + .. marble:: + :alt: combine_throttle + + --1---2-3--------4---| + -a-------b--c-d------| + [ combine_throttle() ] + --1,a----3,b-----4,d-| + + Example: + >>> res = combine_throttle(obs1, obs2) + + Args: + args: Observable sources to combine. + + Returns: + An observable sequence containing the result of combining + elements of the sources as tuple. + """ + from .observable.combinethrottle import combine_throttle_ + + return combine_throttle_(*args) + + def concat(*sources: Observable[_T]) -> Observable[_T]: """Concatenates all of the specified observable sequences. @@ -1303,6 +1340,7 @@ def zip(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]: "catch_with_iterable", "create", "combine_latest", + "combine_throttle", "compose", "concat", "concat_with_iterable", diff --git a/reactivex/observable/combinethrottle.py b/reactivex/observable/combinethrottle.py new file mode 100644 index 00000000..9569f4b1 --- /dev/null +++ b/reactivex/observable/combinethrottle.py @@ -0,0 +1,76 @@ +from asyncio import Future +from threading import RLock +from typing import Any, Callable, List, Optional, Tuple + +from reactivex import Observable, abc, from_future +from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable +from reactivex.internal import synchronized + + +def combine_throttle_(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]: + """Merges the specified observable sequences into one observable + sequence by creating a tuple whenever all of the observable sequences + have produced an element at a corresponding index. + + Example: + >>> res = combine_throttle(source) + + Args: + args: Observable sources to combine_throttle. + + Returns: + An observable sequence containing the result of combining + elements of the sources as a tuple. + """ + + n = len(args) + + sources = list(args) + + def subscribe( + observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None + ) -> CompositeDisposable: + + flags = (1 << (n - 1)) & 0 # Reserve n zero bits. + full_mask = 1 << (n - 1) + full_mask |= full_mask - 1 # Create mask with n 1 bits. + lock = RLock() + + results: List[None] = [None] * n + + def create_on_next(i: int) -> Callable[[Any], None]: + @synchronized(lock) + def on_next(item: Any) -> None: + nonlocal flags + results[i] = item + flags |= 1 << i + if flags == full_mask: + flags = 0 + observer.on_next(tuple(results)) + + return on_next + + subscriptions: List[abc.DisposableBase] = [] + + for i in range(len(sources)): + source: Observable[Any] = sources[i] + if isinstance(source, Future): + source = from_future(source) + + sad = SingleAssignmentDisposable() + + sad.disposable = source.subscribe( + create_on_next(i), + observer.on_error, + observer.on_completed, + scheduler=scheduler, + ) + + subscriptions.append(sad) + + return CompositeDisposable(subscriptions) + + return Observable(subscribe=subscribe) + + +__all__ = ["combine_throttle_"] diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 76bd13c0..57c28cf9 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -430,6 +430,45 @@ def combine_latest( return combine_latest_(*others) +def combine_throttle( + *args: Observable[Any], +) -> Callable[[Observable[Any]], Observable[Any]]: + """Merges the specified observable sequences into one observable + sequence by creating a result whenever all of the + observable sequences have produced an element at a corresponding + index. Faster observables, that emits events more frequently, are + throttled, so that they match speed of slower observables. + Speed of emitting items matches speed of slowest source observable. + It is somewhat similar to :func:`reactivex.zip` operator, returns tuple, + but items of faster observables are dropped, so that only latest values are + at each index. + It is also similar to :func:`reactivex.combine_latest`, but emits new item + only when all sources produce new item. Only latest items are included in + resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`. + + .. marble:: + :alt: combine_throttle + + --1---2-3--------4---| + -a-------b--c-d------| + [ combine_throttle() ] + --1,a----3,b-----4,d-| + + Example: + >>> res = combine_throttle(obs1, obs2) + + Args: + args: Observable sources to combine. + + Returns: + An observable sequence containing the result of combining + elements of the sources as tuple. + """ + from ._combinethrottle import combine_throttle_ + + return combine_throttle_(*args) + + def concat(*sources: Observable[_T]) -> Callable[[Observable[_T]], Observable[_T]]: """Concatenates all the observable sequences. diff --git a/reactivex/operators/_combinethrottle.py b/reactivex/operators/_combinethrottle.py new file mode 100644 index 00000000..0ca2e8d3 --- /dev/null +++ b/reactivex/operators/_combinethrottle.py @@ -0,0 +1,31 @@ +from typing import Any, Callable, Tuple + +import reactivex +from reactivex import Observable + + +def combine_throttle_( + *args: Observable[Any], +) -> Callable[[Observable[Any]], Observable[Tuple[Any, ...]]]: + def _combine_throttle(source: Observable[Any]) -> Observable[Tuple[Any, ...]]: + """Merges the specified observable sequences into one observable + sequence by creating a tuple whenever all of the observable sequences + have produced an element at a corresponding index. + + Example: + >>> res = combine_throttle(source) + + Args: + args: Observable sources to combine_throttle. + + Returns: + An observable sequence containing the result of combining + elements of the sources as a tuple. + """ + + return reactivex.combine_throttle(source, *args) + + return _combine_throttle + + +__all__ = ["combine_throttle_"] diff --git a/tests/test_observable/test_combinethrottle.py b/tests/test_observable/test_combinethrottle.py new file mode 100644 index 00000000..51f67808 --- /dev/null +++ b/tests/test_observable/test_combinethrottle.py @@ -0,0 +1,288 @@ +import unittest +from typing import List + +import reactivex +from reactivex import operators as ops +from reactivex.observable.observable import Observable +from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.recorded import Recorded + +on_next = ReactiveTest.on_next +on_completed = ReactiveTest.on_completed +on_error = ReactiveTest.on_error +subscribe = ReactiveTest.subscribe +subscribed = ReactiveTest.subscribed +disposed = ReactiveTest.disposed +created = ReactiveTest.created + + +class TestCombineThrottle(unittest.TestCase): + def test_combine_throttle_never_never(self): + scheduler = TestScheduler() + o1 = reactivex.never() + o2 = reactivex.never() + + def create(): + return o1.pipe(ops.combine_throttle(o2)) + + results = scheduler.start(create) + assert results.messages == [] + + def test_combine_throttle_never_empty(self): + scheduler = TestScheduler() + msgs = [on_next(150, 1), on_completed(210)] + o1 = reactivex.never() + o2 = scheduler.create_hot_observable(msgs) + + def create(): + return o1.pipe(ops.combine_throttle(o2)) + + results = scheduler.start(create) + assert results.messages == [on_completed(210)] + + def test_combine_throttle_empty_empty(self): + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_completed(210)] + msgs2 = [on_next(150, 1), on_completed(210)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert results.messages == [on_completed(210)] + + def test_combine_throttle_empty_non_empty(self): + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_completed(210)] + msgs2 = [on_next(150, 1), on_next(215, 2), on_completed(220)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert results.messages == [on_completed(210)] + + def test_combine_throttle_non_empty_empty(self): + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_completed(210)] + msgs2 = [on_next(150, 1), on_next(215, 2), on_completed(220)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e2.pipe(ops.combine_throttle(e1)) + + results = scheduler.start(create) + assert results.messages == [on_completed(210)] + + def test_combine_throttle_never_non_empty(self): + scheduler = TestScheduler() + msgs = [on_next(150, 1), on_next(215, 2), on_completed(220)] + e1 = scheduler.create_hot_observable(msgs) + e2 = reactivex.never() + + def create(): + return e2.pipe(ops.combine_throttle(e1)) + + results = scheduler.start(create) + assert results.messages == [on_completed(220)] + + def test_combine_throttle_non_empty_never(self): + scheduler = TestScheduler() + msgs = [on_next(150, 1), on_next(215, 2), on_completed(220)] + e1 = scheduler.create_hot_observable(msgs) + e2 = reactivex.never() + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert results.messages == [on_completed(220)] + + def test_combine_throttle_non_empty_non_empty(self): + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)] + msgs2 = [on_next(150, 1), on_next(220, 3), on_completed(240)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert results.messages == [on_next(220, (2, 3)), on_completed(230)] + + def test_combine_throttle_empty_error(self): + ex = Exception("ex") + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_completed(230)] + msgs2 = [on_next(150, 1), on_error(220, ex)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert results.messages == [on_error(220, ex)] + + def test_combine_throttle_error_empty(self): + ex = Exception("ex") + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_completed(230)] + msgs2 = [on_next(150, 1), on_error(220, ex)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e2.pipe(ops.combine_throttle(e1)) + + results = scheduler.start(create) + assert results.messages == [on_error(220, ex)] + + def test_combine_throttle_never_error(self): + ex = Exception("ex") + scheduler = TestScheduler() + msgs2 = [on_next(150, 1), on_error(220, ex)] + e1 = reactivex.never() + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert results.messages == [on_error(220, ex)] + + def test_combine_throttle_error_never(self): + ex = Exception("ex") + scheduler = TestScheduler() + msgs2 = [on_next(150, 1), on_error(220, ex)] + e1 = reactivex.never() + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e2.pipe(ops.combine_throttle(e1)) + + results = scheduler.start(create) + assert results.messages == [on_error(220, ex)] + + def test_combine_throttle_error_error(self): + ex1 = Exception("ex1") + ex2 = Exception("ex2") + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_error(230, ex1)] + msgs2 = [on_next(150, 1), on_error(220, ex2)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e2.pipe(ops.combine_throttle(e1)) + + results = scheduler.start(create) + assert results.messages == [on_error(220, ex2)] + + def test_combine_throttle_some_error(self): + ex = Exception("ex") + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)] + msgs2 = [on_next(150, 1), on_error(220, ex)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert results.messages == [on_error(220, ex)] + + def test_combine_throttle_error_some(self): + ex = Exception("ex") + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)] + msgs2 = [on_next(150, 1), on_error(220, ex)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e2.pipe(ops.combine_throttle(e1)) + + results = scheduler.start(create) + assert results.messages == [on_error(220, ex)] + + def test_combine_throttle_different_speeds(self): + scheduler = TestScheduler() + msgs1 = [ + on_next(150, 1), + on_next(215, 2), + on_next(230, 3), + on_next(240, 4), + on_next(290, 5), + on_completed(310), + ] + msgs2 = [ + on_next(150, "a"), + on_next(210, "b"), + on_next(250, "c"), + on_next(270, "d"), + on_next(280, "e"), + on_completed(300), + ] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert e1.subscriptions == [subscribe(200, 300)] + assert results.messages == [ + on_next(215, (2, "b")), + on_next(250, (4, "c")), + on_next(290, (5, "e")), + on_completed(300), + ] + + def test_combine_throttle_one_after_other(self): + scheduler = TestScheduler() + msgs1 = [on_next(150, 1), on_next(215, 2), on_next(230, 3), on_completed(240)] + msgs2 = [on_next(250, "a"), on_next(260, "b"), on_completed(270)] + e1 = scheduler.create_hot_observable(msgs1) + e2 = scheduler.create_hot_observable(msgs2) + + def create(): + return e1.pipe(ops.combine_throttle(e2)) + + results = scheduler.start(create) + assert e1.subscriptions == [subscribe(200, 240)] + assert results.messages == [on_completed(240)] + + def test_combine_throttle_100_observables_with_linearly_increased_speeds(self): + scheduler = TestScheduler() + + obeservables: List[Observable[int]] = [] + all_msgs: List[List[Recorded[int]]] = [] + + for i in range(1, 101): + msgs: List[Recorded[int]] = [] + for j in range(0, 200, i): + msgs.append(on_next(201 + j, i)) + + msgs.append(on_completed(500)) + + obeservables.append(scheduler.create_hot_observable(msgs)) + all_msgs.append(msgs) + + def create(): + return obeservables[0].pipe(ops.combine_throttle(*obeservables[1:])) + + results = scheduler.start(create) + + assert results.messages == [ + on_next(201, tuple(range(1, 101))), + on_next(301, tuple(range(1, 101))), + on_completed(500), + ]