Skip to content

Commit

Permalink
fix: raise OperationalError and Encoding Errors (#293)
Browse files Browse the repository at this point in the history
Log and raise the OperationalError and EncodingErrors
  • Loading branch information
mkanoor authored Aug 29, 2024
1 parent d1218ba commit bce152c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
4 changes: 3 additions & 1 deletion extensions/eda/plugins/event_source/pg_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ async def main(queue: asyncio.Queue[Any], args: dict[str, Any]) -> None:
else:
await queue.put(data)
except json.decoder.JSONDecodeError:
LOGGER.exception("Error decoding data, ignoring it")
LOGGER.exception("Error decoding data")
raise
except OperationalError:
LOGGER.exception("PG Listen operational error")
raise


async def _handle_chunked_message(
Expand Down
64 changes: 64 additions & 0 deletions tests/unit/event_source/test_pg_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch

import psycopg
import pytest
import xxhash

Expand Down Expand Up @@ -116,3 +117,66 @@ def my_iterator() -> _AsyncIterator:
for event in events:
assert myqueue.queue[index] == event
index += 1


def test_decoding_error() -> None:
"""Test json parsing error"""
notify_payload: list[str] = ['{"a"; "b"}']
myqueue = _MockQueue()

def my_iterator() -> _AsyncIterator:
return _AsyncIterator(notify_payload)

with patch(
"extensions.eda.plugins.event_source.pg_listener.AsyncConnection.connect"
) as conn:
mock_object = AsyncMock()
conn.return_value = mock_object
conn.return_value.__aenter__.return_value = mock_object
mock_object.cursor = AsyncMock
mock_object.notifies = my_iterator

with pytest.raises(json.decoder.JSONDecodeError):
asyncio.run(
pg_listener_main(
myqueue,
{
"dsn": (
"host=localhost dbname=mydb "
"user=postgres password=password"
),
"channels": ["test"],
},
)
)


def test_operational_error() -> None:
"""Test json parsing error"""
notify_payload: list[str] = ['{"a": "b"}']
myqueue = _MockQueue()

def my_iterator() -> _AsyncIterator:
return _AsyncIterator(notify_payload)

with patch(
"extensions.eda.plugins.event_source.pg_listener.AsyncConnection.connect"
) as conn:
mock_object = AsyncMock()
conn.return_value = mock_object
conn.return_value.__aenter__.side_effect = psycopg.OperationalError("Kaboom")
mock_object.cursor = AsyncMock
mock_object.notifies = my_iterator
with pytest.raises(psycopg.OperationalError):
asyncio.run(
pg_listener_main(
myqueue,
{
"dsn": (
"host=localhost dbname=mydb "
"user=postgres password=password"
),
"channels": ["test"],
},
)
)

0 comments on commit bce152c

Please sign in to comment.