Skip to content

Commit

Permalink
Run once option
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Oct 11, 2024
1 parent d30bf31 commit 0a6c26f
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions src/queue_processor/QueueProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ def create_queues(self):
self.queue_processor_logger.info(f"Creating queue {queue_name}")
self.get_queue(queue_name).createQueue().vt(120).exceptions(False).execute()

def start(self, process: callable):
def start(self, process: callable, run_once: bool = False):
self.queue_processor_logger.info("QueueProcessor running")
while True:
executed_once = False
for task_queue_name, results_queue_name in zip(self.task_queues_names, self.results_queues_names):
try:
self.create_queues()
task_queue = self.get_queue(task_queue_name)
message = task_queue.receiveMessage().execute()
task_queue.deleteMessage(qname=task_queue_name, id=message["id"]).execute()
results = process(utils.decode_message(message["message"]))

executed_once = True
if results:
self.get_queue(results_queue_name).sendMessage(delay=self.delay_time_for_results).message(
results
Expand All @@ -72,3 +73,6 @@ def start(self, process: callable):
self.exists_queues = False
self.queue_processor_logger.error(f"Error: {e}", exc_info=True)
sleep(60)

if run_once and executed_once:
break

0 comments on commit 0a6c26f

Please sign in to comment.