Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

patch boy #42

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 16 additions & 38 deletions mrsal/amqp/subclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,18 @@ async def setup_async_connection(self):
except Exception as e:
self.log.error(f'Oh my lordy lord! I caugth an unexpected exception while trying to connect: {e}', exc_info=True)

async def async_start_consumer(
@retry(
retry=retry_if_exception_type((
AMQPConnectionError,
ChannelClosedByBroker,
ConnectionClosedByBroker,
StreamLostError,
)),
stop=stop_after_attempt(3),
wait=wait_fixed(2),
before_sleep=before_sleep_log(log, WARNING)
)
async def start_consumer(
self,
queue_name: str,
callback: Callable | None = None,
Expand All @@ -291,6 +302,10 @@ async def async_start_consumer(
):
"""Start the async consumer with the provided setup."""
# Check if there's a connection; if not, create one
try:
asyncio.get_running_loop()
except RuntimeError:
raise MrsalNoAsyncioLoopFound(f'Young grasshopper! You forget to add asyncio.run(mrsal.start_consumer(...))')
if not self._connection:
await self.setup_async_connection()

Expand Down Expand Up @@ -368,40 +383,3 @@ async def async_start_consumer(
await message.ack()
self.log.success(f'Young grasshopper! Message ({msg_id}) from {app_id} received and properly processed.')

@retry(
retry=retry_if_exception_type((
AMQPConnectionError,
ChannelClosedByBroker,
ConnectionClosedByBroker,
StreamLostError,
)),
stop=stop_after_attempt(3),
wait=wait_fixed(2),
before_sleep=before_sleep_log(log, WARNING)
)
def start_consumer(
self,
queue_name: str,
callback: Callable | None = None,
callback_args: dict[str, str | int | float | bool] | None = None,
auto_ack: bool = False,
auto_declare: bool = True,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None,
payload_model: Type | None = None,
requeue: bool = True
):
"""The client-facing method that runs the async consumer"""
asyncio.run(self.async_start_consumer(
queue_name=queue_name,
callback=callback,
callback_args=callback_args,
auto_ack=auto_ack,
auto_declare=auto_declare,
exchange_name=exchange_name,
exchange_type=exchange_type,
routing_key=routing_key,
payload_model=payload_model,
requeue=requeue
))
4 changes: 4 additions & 0 deletions mrsal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ class ValidateTLS(BaseModel):
class AioPikaAttributes(BaseModel):
message_id: str | None
app_id: str | None


class MrsalNoAsyncioLoopFound(Exception):
"""Throw this error when user does not inlude asyncio.run"""
8 changes: 4 additions & 4 deletions tests/test_mrsal_async_no_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def message_generator():
mock_callback = AsyncMock()

# Call the async method directly to avoid the asyncio.run() issue
await consumer.async_start_consumer(
await consumer.start_consumer(
queue_name='test_q',
callback=mock_callback,
routing_key='test_route',
Expand Down Expand Up @@ -107,7 +107,7 @@ async def message_generator():
mock_callback = AsyncMock()

# Call the async method directly to avoid the asyncio.run() issue
await consumer.async_start_consumer(
await consumer.start_consumer(
queue_name='test_q',
callback=mock_callback,
routing_key='test_route',
Expand Down Expand Up @@ -147,7 +147,7 @@ async def message_generator():
mock_callback = AsyncMock()

# Call the async method directly to avoid the asyncio.run() issue
await consumer.async_start_consumer(
await consumer.start_consumer(
queue_name='test_q',
callback=mock_callback,
routing_key='test_route',
Expand Down Expand Up @@ -187,7 +187,7 @@ async def message_generator():
# # Patch the setup_async_connection to raise AMQPConnectionError
# with patch.object(MrsalAsyncAMQP, 'setup_async_connection', side_effect=AMQPConnectionError("Connection failed")) as mock_setup:
# with pytest.raises(RetryError): # Expect RetryError after 3 failed attempts
# await consumer.async_start_consumer(
# await consumer.start_consumer(
# queue_name='test_q',
# callback=AsyncMock(),
# routing_key='test_route',
Expand Down
Loading