Skip to content

Commit 278bf91

Browse files
committed
patch boy
1 parent 54bea4e commit 278bf91

File tree

3 files changed

+24
-42
lines changed

3 files changed

+24
-42
lines changed

mrsal/amqp/subclass.py

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,18 @@ async def setup_async_connection(self):
276276
except Exception as e:
277277
self.log.error(f'Oh my lordy lord! I caugth an unexpected exception while trying to connect: {e}', exc_info=True)
278278

279-
async def async_start_consumer(
279+
@retry(
280+
retry=retry_if_exception_type((
281+
AMQPConnectionError,
282+
ChannelClosedByBroker,
283+
ConnectionClosedByBroker,
284+
StreamLostError,
285+
)),
286+
stop=stop_after_attempt(3),
287+
wait=wait_fixed(2),
288+
before_sleep=before_sleep_log(log, WARNING)
289+
)
290+
async def start_consumer(
280291
self,
281292
queue_name: str,
282293
callback: Callable | None = None,
@@ -291,6 +302,10 @@ async def async_start_consumer(
291302
):
292303
"""Start the async consumer with the provided setup."""
293304
# Check if there's a connection; if not, create one
305+
try:
306+
asyncio.get_running_loop()
307+
except RuntimeError:
308+
raise MrsalNoAsyncioLoopFound(f'Young grasshopper! You forget to add asyncio.run(mrsal.start_consumer(...))')
294309
if not self._connection:
295310
await self.setup_async_connection()
296311

@@ -368,40 +383,3 @@ async def async_start_consumer(
368383
await message.ack()
369384
self.log.success(f'Young grasshopper! Message ({msg_id}) from {app_id} received and properly processed.')
370385

371-
@retry(
372-
retry=retry_if_exception_type((
373-
AMQPConnectionError,
374-
ChannelClosedByBroker,
375-
ConnectionClosedByBroker,
376-
StreamLostError,
377-
)),
378-
stop=stop_after_attempt(3),
379-
wait=wait_fixed(2),
380-
before_sleep=before_sleep_log(log, WARNING)
381-
)
382-
def start_consumer(
383-
self,
384-
queue_name: str,
385-
callback: Callable | None = None,
386-
callback_args: dict[str, str | int | float | bool] | None = None,
387-
auto_ack: bool = False,
388-
auto_declare: bool = True,
389-
exchange_name: str | None = None,
390-
exchange_type: str | None = None,
391-
routing_key: str | None = None,
392-
payload_model: Type | None = None,
393-
requeue: bool = True
394-
):
395-
"""The client-facing method that runs the async consumer"""
396-
asyncio.run(self.async_start_consumer(
397-
queue_name=queue_name,
398-
callback=callback,
399-
callback_args=callback_args,
400-
auto_ack=auto_ack,
401-
auto_declare=auto_declare,
402-
exchange_name=exchange_name,
403-
exchange_type=exchange_type,
404-
routing_key=routing_key,
405-
payload_model=payload_model,
406-
requeue=requeue
407-
))

mrsal/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ class ValidateTLS(BaseModel):
1313
class AioPikaAttributes(BaseModel):
1414
message_id: str | None
1515
app_id: str | None
16+
17+
18+
class MrsalNoAsyncioLoopFound(Exception):
19+
"""Throw this error when user does not inlude asyncio.run"""

tests/test_mrsal_async_no_tls.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ async def message_generator():
7070
mock_callback = AsyncMock()
7171

7272
# Call the async method directly to avoid the asyncio.run() issue
73-
await consumer.async_start_consumer(
73+
await consumer.start_consumer(
7474
queue_name='test_q',
7575
callback=mock_callback,
7676
routing_key='test_route',
@@ -107,7 +107,7 @@ async def message_generator():
107107
mock_callback = AsyncMock()
108108

109109
# Call the async method directly to avoid the asyncio.run() issue
110-
await consumer.async_start_consumer(
110+
await consumer.start_consumer(
111111
queue_name='test_q',
112112
callback=mock_callback,
113113
routing_key='test_route',
@@ -147,7 +147,7 @@ async def message_generator():
147147
mock_callback = AsyncMock()
148148

149149
# Call the async method directly to avoid the asyncio.run() issue
150-
await consumer.async_start_consumer(
150+
await consumer.start_consumer(
151151
queue_name='test_q',
152152
callback=mock_callback,
153153
routing_key='test_route',
@@ -187,7 +187,7 @@ async def message_generator():
187187
# # Patch the setup_async_connection to raise AMQPConnectionError
188188
# with patch.object(MrsalAsyncAMQP, 'setup_async_connection', side_effect=AMQPConnectionError("Connection failed")) as mock_setup:
189189
# with pytest.raises(RetryError): # Expect RetryError after 3 failed attempts
190-
# await consumer.async_start_consumer(
190+
# await consumer.start_consumer(
191191
# queue_name='test_q',
192192
# callback=AsyncMock(),
193193
# routing_key='test_route',

0 commit comments

Comments
 (0)