Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add combine_throttle operator #711

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
- hooks:
- id: isort
repo: https://github.com/timothycrosley/isort
rev: 5.10.1
rev: 5.12.0
- hooks:
- id: black
repo: https://github.com/psf/black
Expand Down
38 changes: 38 additions & 0 deletions reactivex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@ def combine_latest(*__sources: Observable[Any]) -> Observable[Any]:
return combine_latest_(*__sources)


def combine_throttle(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"""Merges the specified observable sequences into one observable
sequence by creating a result whenever all of the
observable sequences have produced an element at a corresponding
index. Faster observables, that emits events more frequently, are
throttled, so that they match speed of slower observables.
Speed of emitting items matches speed of slowest source observable.
It is somewhat similar to :func:`reactivex.zip` operator, returns tuple,
but items of faster observables are dropped, so that only latest values are
at each index.
It is also similar to :func:`reactivex.combine_latest`, but emits new item
only when all sources produce new item. Only latest items are included in
resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`.

.. marble::
:alt: combine_throttle

--1---2-3--------4---|
-a-------b--c-d------|
[ combine_throttle() ]
--1,a----3,b-----4,d-|

Example:
>>> res = combine_throttle(obs1, obs2)

Args:
args: Observable sources to combine.

Returns:
An observable sequence containing the result of combining
elements of the sources as tuple.
"""
from .observable.combinethrottle import combine_throttle_

return combine_throttle_(*args)


def concat(*sources: Observable[_T]) -> Observable[_T]:
"""Concatenates all of the specified observable sequences.

Expand Down Expand Up @@ -1303,6 +1340,7 @@ def zip(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"catch_with_iterable",
"create",
"combine_latest",
"combine_throttle",
"compose",
"concat",
"concat_with_iterable",
Expand Down
76 changes: 76 additions & 0 deletions reactivex/observable/combinethrottle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from asyncio import Future
from threading import RLock
from typing import Any, Callable, List, Optional, Tuple

from reactivex import Observable, abc, from_future
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal import synchronized


def combine_throttle_(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"""Merges the specified observable sequences into one observable
sequence by creating a tuple whenever all of the observable sequences
have produced an element at a corresponding index.

Example:
>>> res = combine_throttle(source)

Args:
args: Observable sources to combine_throttle.

Returns:
An observable sequence containing the result of combining
elements of the sources as a tuple.
"""

n = len(args)

sources = list(args)

def subscribe(
observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
) -> CompositeDisposable:

flags = (1 << (n - 1)) & 0 # Reserve n zero bits.
full_mask = 1 << (n - 1)
full_mask |= full_mask - 1 # Create mask with n 1 bits.
lock = RLock()

results: List[None] = [None] * n

def create_on_next(i: int) -> Callable[[Any], None]:
@synchronized(lock)
def on_next(item: Any) -> None:
nonlocal flags
results[i] = item
flags |= 1 << i
if flags == full_mask:
flags = 0
observer.on_next(tuple(results))

return on_next

subscriptions: List[abc.DisposableBase] = []

for i in range(len(sources)):
source: Observable[Any] = sources[i]
if isinstance(source, Future):
source = from_future(source)

sad = SingleAssignmentDisposable()

sad.disposable = source.subscribe(
create_on_next(i),
observer.on_error,
observer.on_completed,
scheduler=scheduler,
)

subscriptions.append(sad)

return CompositeDisposable(subscriptions)

return Observable(subscribe=subscribe)


__all__ = ["combine_throttle_"]
39 changes: 39 additions & 0 deletions reactivex/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,45 @@ def combine_latest(
return combine_latest_(*others)


def combine_throttle(
*args: Observable[Any],
) -> Callable[[Observable[Any]], Observable[Any]]:
"""Merges the specified observable sequences into one observable
sequence by creating a result whenever all of the
observable sequences have produced an element at a corresponding
index. Faster observables, that emits events more frequently, are
throttled, so that they match speed of slower observables.
Speed of emitting items matches speed of slowest source observable.
It is somewhat similar to :func:`reactivex.zip` operator, returns tuple,
but items of faster observables are dropped, so that only latest values are
at each index.
It is also similar to :func:`reactivex.combine_latest`, but emits new item
only when all sources produce new item. Only latest items are included in
resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`.

.. marble::
:alt: combine_throttle

--1---2-3--------4---|
-a-------b--c-d------|
[ combine_throttle() ]
--1,a----3,b-----4,d-|

Example:
>>> res = combine_throttle(obs1, obs2)

Args:
args: Observable sources to combine.

Returns:
An observable sequence containing the result of combining
elements of the sources as tuple.
"""
from ._combinethrottle import combine_throttle_

return combine_throttle_(*args)


def concat(*sources: Observable[_T]) -> Callable[[Observable[_T]], Observable[_T]]:
"""Concatenates all the observable sequences.

Expand Down
31 changes: 31 additions & 0 deletions reactivex/operators/_combinethrottle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Any, Callable, Tuple

import reactivex
from reactivex import Observable


def combine_throttle_(
*args: Observable[Any],
) -> Callable[[Observable[Any]], Observable[Tuple[Any, ...]]]:
def _combine_throttle(source: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"""Merges the specified observable sequences into one observable
sequence by creating a tuple whenever all of the observable sequences
have produced an element at a corresponding index.

Example:
>>> res = combine_throttle(source)

Args:
args: Observable sources to combine_throttle.

Returns:
An observable sequence containing the result of combining
elements of the sources as a tuple.
"""

return reactivex.combine_throttle(source, *args)

return _combine_throttle


__all__ = ["combine_throttle_"]
Loading
Loading