diff --git a/reactivex/subject/replaysubject.py b/reactivex/subject/replaysubject.py index 42118dbf2..84fa93af5 100644 --- a/reactivex/subject/replaysubject.py +++ b/reactivex/subject/replaysubject.py @@ -1,6 +1,7 @@ import sys +from collections import deque from datetime import datetime, timedelta -from typing import Any, List, NamedTuple, Optional, TypeVar, cast +from typing import Any, Deque, NamedTuple, Optional, TypeVar, cast from reactivex.observer.scheduledobserver import ScheduledObserver from reactivex.scheduler import CurrentThreadScheduler @@ -56,7 +57,7 @@ def __init__( self.window = ( timedelta.max if window is None else self.scheduler.to_timedelta(window) ) - self.queue: List[QueueItem] = [] + self.queue: Deque[QueueItem] = deque() def _subscribe_core( self, @@ -84,10 +85,10 @@ def _subscribe_core( def _trim(self, now: datetime) -> None: while len(self.queue) > self.buffer_size: - self.queue.pop(0) + self.queue.popleft() while self.queue and (now - self.queue[0].interval) > self.window: - self.queue.pop(0) + self.queue.popleft() def _on_next_core(self, value: _T) -> None: """Notifies all subscribed observers with the value."""