Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timer second subscription #698

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions reactivex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,9 @@ def timer(
[ timer(2) ]
--0-|

[ timer(2, 4) ]
--0----1----2--

Examples:
>>> res = reactivex.timer(datetime(...))
>>> res = reactivex.timer(datetime(...), 0.1)
Expand Down
10 changes: 5 additions & 5 deletions reactivex/observable/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ def subscribe(
observer: abc.ObserverBase[int], scheduler_: Optional[abc.SchedulerBase] = None
) -> abc.DisposableBase:
_scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
nonlocal duetime
due_time = duetime

if not isinstance(duetime, datetime):
duetime = _scheduler.now + _scheduler.to_timedelta(duetime)
if not isinstance(due_time, datetime):
due_time = _scheduler.now + _scheduler.to_timedelta(due_time)

p = max(0.0, _scheduler.to_seconds(period))
mad = MultipleAssignmentDisposable()
dt = duetime
dt = due_time
count = 0

def action(scheduler: abc.SchedulerBase, state: Any) -> None:
Expand Down Expand Up @@ -107,7 +107,7 @@ def action(count: Optional[int] = None) -> Optional[int]:
return None

if not isinstance(_scheduler, PeriodicScheduler):
raise ValueError("Sceduler must be PeriodicScheduler")
raise ValueError("Scheduler must be PeriodicScheduler")
return _scheduler.schedule_periodic(period, action, state=0)

return Observable(subscribe)
Expand Down
90 changes: 90 additions & 0 deletions tests/test_observable/test_timer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import unittest

import reactivex
from reactivex import operators
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
Expand Down Expand Up @@ -126,3 +127,92 @@ def create():

results = scheduler.start(create)
assert results.messages == [on_next(500, 0), on_next(800, 1)]

def test_periodic_timer_repeat(self):
scheduler = TestScheduler()
t = reactivex.timer(duetime=130, period=200, scheduler=scheduler)

def create():
return t.pipe(operators.take(3), operators.repeat())

results = scheduler.start(create)
assert results.messages == [
on_next(330, 0),
on_next(530, 1),
on_next(730, 2),
on_next(860, 0),
]

def test_periodic_timer_repeat_with_absolute_datetime(self):
scheduler = TestScheduler()
t = reactivex.timer(
duetime=scheduler.to_datetime(360), period=200, scheduler=scheduler
) # here we have an absolute first value, so on second subscription, the timer should emit immediately

def create():
return t.pipe(operators.take(3), operators.repeat())

results = scheduler.start(create)
assert results.messages == [
on_next(360, 0),
on_next(560, 1),
on_next(760, 2),
on_next(
760, 0
), # our duetime is absolute and in the past so new sub emits immediately
on_next(960, 1),
]

def test_periodic_timer_repeat_with_relative_timespan(self):
scheduler = TestScheduler()
t = reactivex.timer(
duetime=scheduler.to_timedelta(130),
period=scheduler.to_timedelta(250),
scheduler=scheduler,
)

def create():
return t.pipe(operators.take(3), operators.repeat())

results = scheduler.start(create)
assert results.messages == [
on_next(330, 0),
on_next(580, 1),
on_next(830, 2),
on_next(960, 0),
]

def test_periodic_timer_second_subscription(self):
scheduler = TestScheduler()
t = reactivex.timer(duetime=200, period=300, scheduler=scheduler)

def create():
return reactivex.merge(
t.pipe(operators.map(lambda x: (x, "first"))),
reactivex.concat(reactivex.timer(100, scheduler=scheduler), t).pipe(
operators.map(lambda x: (x, "second"))
),
)

results = scheduler.start(create)
assert results.messages == [
on_next(300, (0, "second")),
on_next(400, (0, "first")),
on_next(500, (0, "second")),
on_next(700, (1, "first")),
on_next(800, (1, "second")),
]

def test_one_off_timer_repeat(self):
scheduler = TestScheduler()
t = reactivex.timer(duetime=230, scheduler=scheduler)

def create():
return t.pipe(operators.repeat())

results = scheduler.start(create)
assert results.messages == [
on_next(430, 0),
on_next(660, 0),
on_next(890, 0),
]
Loading