From 3fc77feff0685b8a383268153267d9b60f1cb403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20L=C3=B6ffler?= Date: Sat, 22 Jun 2024 14:28:17 +0300 Subject: [PATCH 1/2] Fix remote stream termination --- betfair_parser/stream.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/betfair_parser/stream.py b/betfair_parser/stream.py index 9104f51..0cb059b 100644 --- a/betfair_parser/stream.py +++ b/betfair_parser/stream.py @@ -177,6 +177,8 @@ def iter_changes(self, stream: io.RawIOBase) -> Iterable[ChangeMessageType]: while True: msg = self.esm.receive(stream) + if not msg: + return if isinstance(msg, ChangeMessageType): # type: ignore[arg-type,misc] yield msg @@ -191,6 +193,8 @@ def iter_changes_and_write( with open(path, "ab") as f: while True: raw_msg = stream.readline() + if not raw_msg: + return msg = self.esm.receive_bytes(raw_msg) if isinstance(msg, ChangeMessageType): # type: ignore[arg-type,misc] yield msg @@ -268,6 +272,8 @@ async def iter_changes_async(self, stream: AsyncStream) -> AsyncGenerator[Change while True: msg = self.esm.receive_bytes(await stream.readline()) + if not msg: + return if isinstance(msg, ChangeMessageType): # type: ignore[arg-type,misc] yield msg @@ -283,6 +289,8 @@ async def iter_changes_and_write_async( with open(path, "ab") as f: while True: raw_msg = await stream.readline() + if not raw_msg: + return msg = self.esm.receive_bytes(raw_msg) if isinstance(msg, ChangeMessageType): # type: ignore[arg-type,misc] yield msg From 78180d1975caa66e5102af62128dee4da4388b8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20L=C3=B6ffler?= Date: Sat, 22 Jun 2024 16:13:45 +0300 Subject: [PATCH 2/2] Testcase for stream termination added --- pyproject.toml | 2 +- tests/integration/test_stream.py | 52 +++++++++++++++++++ .../responses/streaming/mcm_samples.ndjson | 20 +++++++ 3 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 tests/resources/responses/streaming/mcm_samples.ndjson diff --git a/pyproject.toml b/pyproject.toml index fca56b8..cb2299c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,7 @@ requests = "^2.20" twine = "^4.0.2" [tool.codespell] -skip = "*.bz2,*.json,*.xml,poetry.lock" +skip = "*.bz2,*.json,*.ndjson,*.xml,poetry.lock" ignore-regex = ".*codespell-ignore$" [tool.mypy] diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 8245442..fdaabc5 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -23,6 +23,7 @@ ) from betfair_parser.stream import AsyncStream, ExchangeStream, StreamReader, changed_markets, create_stream_io from tests.integration.test_live import appconfig # noqa: F401 +from tests.resources import RESOURCES_DIR @pytest.fixture(scope="module") @@ -172,3 +173,54 @@ def test_stream_reader(session, iterations=15): assert volume for volume in runner_order_book.available_to_lay.values(): assert volume + + +class TerminatingStream: + def __init__(self, path, nlines): + self._iter = self.iterator(path, nlines) + + @staticmethod + def iterator(path, nlines): + yield b"""{"op":"connection","connectionId":"002-051134157842-432409"}""" + yield b"""{"op": "status", "id": 1000, "statusCode": "SUCCESS", "connectionClosed": false}""" + with open(path, "rb") as f: + yield from f.readlines()[:nlines] + while True: + # simulating the terminated connection + yield b"" + + def write(self, x): + """Ignore any write attempts from the connection handling.""" + + def read(self): + raise NotImplementedError() + + def readline(self): + return next(self._iter) + + +def test_iter_changes_stream_termination(nlines=10): + path = RESOURCES_DIR / "responses" / "streaming" / "mcm_samples.ndjson" + stream = TerminatingStream(path, nlines) + sr = StreamReader(None, None) + sr.subscribe(SUBSCRIPTIONS[0]) # type: ignore[arg-type] + + msgs = list(sr.iter_changes(stream)) # type: ignore[arg-type] + for msg in msgs: + assert isinstance(msg, MCM) + assert len(msgs) == nlines + + +def test_iter_changes_stream_and_write_termination(tmp_path, nlines=10): + path = RESOURCES_DIR / "responses" / "streaming" / "mcm_samples.ndjson" + out_path = tmp_path / "stream.ndjson" + stream = TerminatingStream(path, nlines) + sr = StreamReader(None, None) + sr.subscribe(SUBSCRIPTIONS[0]) # type: ignore[arg-type] + + msgs = list(sr.iter_changes_and_write(stream, out_path)) # type: ignore[arg-type] + for msg in msgs: + assert isinstance(msg, MCM) + assert len(msgs) == nlines + with open(out_path) as f: + assert len(f.readlines()) == nlines diff --git a/tests/resources/responses/streaming/mcm_samples.ndjson b/tests/resources/responses/streaming/mcm_samples.ndjson new file mode 100644 index 0000000..a50f238 --- /dev/null +++ b/tests/resources/responses/streaming/mcm_samples.ndjson @@ -0,0 +1,20 @@ +{"op":"mcm","id":1,"clk":"AMeYmgIA8oKwAgD++6oC","pt":1719014397705,"mc":[{"id":"1.230015799","rc":[{"atl":[[180,1.15]],"id":15072480}]}]} +{"op":"mcm","id":1,"clk":"AMeYmgIA8oKwAgCA/KoC","pt":1719014397707,"mc":[{"id":"1.230014723","rc":[{"atl":[[930,4.28]],"id":69077197},{"atl":[[600,4]],"id":69803917}]}]} +{"op":"mcm","id":1,"clk":"AM2YmgIA9IKwAgCF/KoC","pt":1719014397811,"mc":[{"id":"1.230014772","rc":[{"atb":[[6.6,420.76]],"id":45720431}]}]} +{"op":"mcm","id":1,"clk":"ANGYmgIA9oKwAgCN/KoC","pt":1719014397911,"mc":[{"id":"1.230014772","rc":[{"atb":[[30,8.3]],"id":30758988},{"atl":[[8.4,5]],"ltp":8.4,"tv":3298.35,"id":2688},{"atb":[[18,22.47]],"id":44575144},{"atb":[[9.2,12.04]],"id":44081484},{"atb":[[8.2,11.57]],"id":26340381},{"atb":[[22,7.28]],"id":19186431},{"atl":[[19,3.08]],"ltp":19,"tv":920.57,"id":45500449},{"atb":[[26,3.89]],"id":25867593},{"atb":[[16,15.85]],"id":8770330},{"atb":[[7.8,21.28]],"id":40034962},{"atb":[[80,0.94]],"atl":[[80,0]],"tv":198.96,"id":31596930},{"atb":[[12.5,40.54]],"id":39923879}],"tv":26098.12}]} +{"op":"mcm","id":1,"clk":"ANGYmgIA9oKwAgCP/KoC","pt":1719014397912,"mc":[{"id":"1.230014935","rc":[{"atl":[[1000,2]],"id":60918881}]}]} +{"op":"mcm","id":1,"clk":"ANWYmgIA94KwAgCU/KoC","pt":1719014398010,"mc":[{"id":"1.230014772","rc":[{"atb":[[130,1.54]],"id":36209652},{"atl":[[19,4.08]],"id":45500449}]}]} +{"op":"mcm","id":1,"clk":"ANWYmgIA+oKwAgCW/KoC","pt":1719014398012,"mc":[{"id":"1.230015799","rc":[{"atl":[[200,1.69]],"id":15072480}]}]} +{"op":"mcm","id":1,"clk":"ANeYmgIA/YKwAgCf/KoC","pt":1719014398114,"mc":[{"id":"1.230015799","rc":[{"atl":[[180,2.42]],"id":15072480}]}]} +{"op":"mcm","id":1,"clk":"ANmYmgIAgIOwAgCo/KoC","pt":1719014398223,"mc":[{"id":"1.230014723","rc":[{"atl":[[940,0]],"id":69077197},{"atl":[[610,0]],"id":69803917}]},{"id":"1.230014935","rc":[{"atl":[[1000,0.52]],"id":60918881}]}]} +{"op":"mcm","id":1,"clk":"AN2YmgIAg4OwAgCw/KoC","pt":1719014398323,"mc":[{"id":"1.230014723","rc":[{"atl":[[55,1.12]],"id":69077197},{"atl":[[590,1.19]],"id":69803917}]}]} +{"op":"mcm","id":1,"clk":"AOGYmgIAiIOwAgC7/KoC","pt":1719014398430,"mc":[{"id":"1.230014935","rc":[{"atl":[[1000,0]],"id":60918881}]}]} +{"op":"mcm","id":1,"clk":"AOWYmgIAioOwAgDB/KoC","pt":1719014398530,"mc":[{"id":"1.230014935","rc":[{"atl":[[18,0.52]],"id":60918881}]}]} +{"op":"mcm","id":1,"clk":"AO6YmgIAlIOwAgDP/KoC","pt":1719014398832,"mc":[{"id":"1.230014757","rc":[{"atb":[[48,4.85]],"id":10635277}]}]} +{"op":"mcm","id":1,"clk":"AO6YmgIAlIOwAgDR/KoC","pt":1719014398833,"mc":[{"id":"1.230014723","rc":[{"atl":[[55,0]],"id":69077197}]}]} +{"op":"mcm","id":1,"clk":"APKYmgIAlYOwAgDY/KoC","pt":1719014398933,"mc":[{"id":"1.230014723","rc":[{"atl":[[920,1.12]],"id":69077197}]}]} +{"op":"mcm","id":1,"clk":"APuYmgIAnYOwAgDf/KoC","pt":1719014399138,"mc":[{"id":"1.230014820","rc":[{"atl":[[920,0.55]],"id":22386662}]}]} +{"op":"mcm","id":1,"clk":"AIWZmgIAooOwAgDy/KoC","pt":1719014399397,"mc":[{"id":"1.230014723","rc":[{"atl":[[920,0]],"id":69077197}]}]} +{"op":"mcm","id":1,"clk":"AIiZmgIApYOwAgD6/KoC","pt":1719014399502,"mc":[{"id":"1.230014723","rc":[{"atl":[[55,1.12]],"id":69077197}]},{"id":"1.230014921","rc":[{"atb":[[18,0.92]],"id":70679606}]}]} +{"op":"mcm","id":1,"clk":"AIyZmgIApoOwAgD6/KoC","pt":1719014399512,"mc":[{"id":"1.230014868","rc":[{"atl":[[70,0]],"id":47194835}]}]} +{"op":"mcm","id":1,"clk":"AIyZmgIAp4OwAgD+/KoC","pt":1719014399550,"mc":[{"id":"1.230015804","rc":[{"atb":[[6.8,2.69]],"id":67951580}]}]}