Skip to content

Commit

Permalink
Remove MessageEvent.await_with_auto_ack() (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
vrslev committed Jun 11, 2024
1 parent d286b77 commit 3883c88
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 76 deletions.
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
default: install lint check-types test
default: install lint check-types test test-integration

install:
poetry install --sync
Expand Down
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async with asyncio.TaskGroup() as task_group:
async for event in client.listen_to_events():
match event:
case stompman.MessageEvent(body=body):
task_group.create_task(event.await_with_auto_ack(handle_message(body)))
task_group.create_task(handle_message(body))
case stompman.ErrorEvent(message_header=short_description, body=body):
logger.error(
"Received an error from server", short_description=short_description, body=body, event=event
Expand All @@ -97,16 +97,17 @@ async with asyncio.TaskGroup() as task_group:
task_group.create_task(update_healthcheck_status())


async def handle_message(body: bytes) -> None:
async def handle_message(event: stompman.MessageEvent) -> None:
try:
validated_message = MyMessageModel.model_validate_json(body)
validated_message = MyMessageModel.model_validate_json(event.body)
await run_business_logic(validated_message)
except Exception:
logger.exception("Failed to handle message", body=body)
await event.nack()
logger.exception("Failed to handle message", event=event)
else:
await event.ack()
```

You can pass awaitable object (coroutine, for example) to `Message.await_with_auto_ack()`. In case of error, it will catch any exceptions, send NACK to server and propagate them to the caller. Otherwise, it will send ACK, acknowledging the message was processed successfully.

### Cleaning Up

stompman takes care of cleaning up resources automatically. When you leave the context of async context managers `stompman.Client()`, `client.subscribe()`, or `client.enter_transaction()`, the necessary frames will be sent to the server.
Expand Down
16 changes: 0 additions & 16 deletions stompman/listening_events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections.abc import Awaitable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -37,21 +36,6 @@ async def nack(self) -> None:
)
)

async def await_with_auto_ack(
self, awaitable: Awaitable[None], exception_types: tuple[type[Exception],] = (Exception,)
) -> None:
called_nack = False

try:
await awaitable
except exception_types:
await self.nack()
called_nack = True
raise
finally:
if not called_nack:
await self.ack()


@dataclass
class ErrorEvent:
Expand Down
53 changes: 0 additions & 53 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,59 +346,6 @@ async def test_ack_nack() -> None:
assert_frames_between_lifespan_match(collected_frames, [message_frame, nack_frame, ack_frame])


def get_mocked_message_event() -> tuple[MessageEvent, mock.AsyncMock, mock.AsyncMock]:
ack_mock, nack_mock = mock.AsyncMock(), mock.AsyncMock()

class CustomMessageEvent(MessageEvent):
ack = ack_mock
nack = nack_mock

return (
CustomMessageEvent(
_frame=MessageFrame(
headers={"destination": "destination", "message-id": "message-id", "subscription": "subscription"},
body=b"",
),
_client=mock.Mock(),
),
ack_mock,
nack_mock,
)


async def test_message_event_await_with_auto_ack_nack() -> None:
event, ack, nack = get_mocked_message_event()

async def raises_runtime_error() -> None: # noqa: RUF029
raise RuntimeError

with suppress(RuntimeError):
await event.await_with_auto_ack(raises_runtime_error(), exception_types=(Exception,))

ack.assert_not_called()
nack.assert_called_once_with()


async def test_message_event_await_with_auto_ack_ack_raises() -> None:
event, ack, nack = get_mocked_message_event()

async def func() -> None: # noqa: RUF029
raise Exception # noqa: TRY002

with suppress(Exception):
await event.await_with_auto_ack(func(), exception_types=(RuntimeError,))

ack.assert_called_once_with()
nack.assert_not_called()


async def test_message_event_await_with_auto_ack_ack_ok() -> None:
event, ack, nack = get_mocked_message_event()
await event.await_with_auto_ack(mock.AsyncMock()())
ack.assert_called_once_with()
nack.assert_not_called()


async def test_send_message_and_enter_transaction_ok(monkeypatch: pytest.MonkeyPatch) -> None:
body = b"hello"
destination = "/queue/test"
Expand Down

0 comments on commit 3883c88

Please sign in to comment.