From fb8cce3cac68eaeabec937b8744f49c242601ea2 Mon Sep 17 00:00:00 2001 From: jhorman Date: Mon, 15 Jul 2019 20:59:18 -0700 Subject: [PATCH 1/5] Allow tasks to change the visibility timeout of the message they are handling. When a task fails, immediately make it available again instead of waiting for visibility timeout. Fixes https://github.com/spulec/PyQS/issues/47 --- pyqs/__init__.py | 2 +- pyqs/decorator.py | 5 +++++ pyqs/utils.py | 18 ++++++++++++++++++ pyqs/worker.py | 28 ++++++++++++++++++++++++++-- 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/pyqs/__init__.py b/pyqs/__init__.py index 14878b3..bd44c8a 100644 --- a/pyqs/__init__.py +++ b/pyqs/__init__.py @@ -1,4 +1,4 @@ from .decorator import task # noqa __title__ = 'pyqs' -__version__ = '0.1.2' +__version__ = '0.1.4' diff --git a/pyqs/decorator.py b/pyqs/decorator.py index 5f9e3d6..61c4a2b 100644 --- a/pyqs/decorator.py +++ b/pyqs/decorator.py @@ -54,6 +54,11 @@ def wrapper(*args, **kwargs): class task(object): + """ Decorator that enables sqs based task execution. If the function + accepts an optional `_context` argument, an instance of TaskContext is + passed to the task function. The context allows the function to do things + like change message visibility. """ + def __init__(self, queue=None, delay_seconds=None, custom_function_path=None): self.queue_name = queue diff --git a/pyqs/utils.py b/pyqs/utils.py index 11d0a57..1830727 100644 --- a/pyqs/utils.py +++ b/pyqs/utils.py @@ -3,6 +3,7 @@ import pickle import boto3 +from datetime import timedelta def decode_message(message): @@ -36,3 +37,20 @@ def get_aws_region_name(): region_name = 'us-east-1' return region_name + + +class TaskContext(object): + """ Tasks may optionally accept a _context variable. If they do, an + instance of this object is passed as the context. """ + + def __init__(self, conn, queue_url, receipt_handle): + self.conn = conn + self.queue_url = queue_url + self.receipt_handle = receipt_handle + + def change_message_visibility(self, timeout=timedelta(minutes=10)): + self.conn.change_message_visibility( + QueueUrl=self.queue_url, + ReceiptHandle=self.receipt_handle, + VisibilityTimeout=int(timeout.total_seconds()) + ) diff --git a/pyqs/worker.py b/pyqs/worker.py index 0f3c584..ba0c393 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -11,14 +11,20 @@ import time from multiprocessing import Event, Process, Queue + try: from queue import Empty, Full except ImportError: from Queue import Empty, Full +try: + from inspect import getfullargspec as get_args +except ImportError: + from inspect import getargspec as get_args + import boto3 -from pyqs.utils import get_aws_region_name, decode_message +from pyqs.utils import get_aws_region_name, decode_message, TaskContext MESSAGE_DOWNLOAD_BATCH_SIZE = 10 LONG_POLLING_INTERVAL = 20 @@ -180,6 +186,7 @@ def process_message(self): full_task_path = message_body['task'] args = message_body['args'] kwargs = message_body['kwargs'] + receipt_handle = message['ReceiptHandle'] task_name = full_task_path.split(".")[-1] task_path = ".".join(full_task_path.split(".")[:-1]) @@ -188,6 +195,15 @@ def process_message(self): task = getattr(task_module, task_name) + # if the task accepts the optional _context argument, pass it the TaskContext + if '_context' in get_args(task).args: + kwargs = dict(kwargs) + kwargs['_context'] = TaskContext( + conn=self.conn, + queue_url=queue_url, + receipt_handle=receipt_handle + ) + current_time = time.time() if int(current_time - fetch_time) >= timeout: logger.warning( @@ -214,12 +230,20 @@ def process_message(self): traceback.format_exc(), ) ) + + # since the task failed, mark it is available again quickly (10 seconds) + self.conn.change_message_visibility( + QueueUrl=queue_url, + ReceiptHandle=receipt_handle, + VisibilityTimeout=10 + ) + return True else: end_time = time.clock() self.conn.delete_message( QueueUrl=queue_url, - ReceiptHandle=message['ReceiptHandle'] + ReceiptHandle=receipt_handle ) logger.info( "Processed task {} in {:.4f} seconds with args: {} " From ac3e34b02977cf62a8138c408789a128f69bfcc1 Mon Sep 17 00:00:00 2001 From: jhorman Date: Thu, 18 Jul 2019 12:36:52 -0700 Subject: [PATCH 2/5] Also pass the message id in the context. This can allow the server to detect if the message is being sent again, b/c the message had a visibility time out on a different worker. --- pyqs/utils.py | 3 ++- pyqs/worker.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pyqs/utils.py b/pyqs/utils.py index 1830727..c9cc4d0 100644 --- a/pyqs/utils.py +++ b/pyqs/utils.py @@ -43,9 +43,10 @@ class TaskContext(object): """ Tasks may optionally accept a _context variable. If they do, an instance of this object is passed as the context. """ - def __init__(self, conn, queue_url, receipt_handle): + def __init__(self, conn, queue_url, message_id, receipt_handle): self.conn = conn self.queue_url = queue_url + self.message_id = message_id self.receipt_handle = receipt_handle def change_message_visibility(self, timeout=timedelta(minutes=10)): diff --git a/pyqs/worker.py b/pyqs/worker.py index ba0c393..07d6650 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -186,6 +186,7 @@ def process_message(self): full_task_path = message_body['task'] args = message_body['args'] kwargs = message_body['kwargs'] + message_id = message['MessageId'] receipt_handle = message['ReceiptHandle'] task_name = full_task_path.split(".")[-1] @@ -201,6 +202,7 @@ def process_message(self): kwargs['_context'] = TaskContext( conn=self.conn, queue_url=queue_url, + message_id=message_id, receipt_handle=receipt_handle ) From cde0e16e38997fecad79f4ca02224b6e9a779abb Mon Sep 17 00:00:00 2001 From: jhorman Date: Thu, 18 Jul 2019 12:39:49 -0700 Subject: [PATCH 3/5] Don't bump version numbers --- pyqs/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyqs/__init__.py b/pyqs/__init__.py index bd44c8a..14878b3 100644 --- a/pyqs/__init__.py +++ b/pyqs/__init__.py @@ -1,4 +1,4 @@ from .decorator import task # noqa __title__ = 'pyqs' -__version__ = '0.1.4' +__version__ = '0.1.2' From 3a94adbf5dd35b33b5933f55ed01e1289a86d228 Mon Sep 17 00:00:00 2001 From: jhorman Date: Fri, 26 Jul 2019 15:56:01 -0700 Subject: [PATCH 4/5] Adding approx receive count to the context --- pyqs/utils.py | 3 ++- pyqs/worker.py | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pyqs/utils.py b/pyqs/utils.py index c9cc4d0..7f466e5 100644 --- a/pyqs/utils.py +++ b/pyqs/utils.py @@ -43,11 +43,12 @@ class TaskContext(object): """ Tasks may optionally accept a _context variable. If they do, an instance of this object is passed as the context. """ - def __init__(self, conn, queue_url, message_id, receipt_handle): + def __init__(self, conn, queue_url, message_id, receipt_handle, approx_receive_count): self.conn = conn self.queue_url = queue_url self.message_id = message_id self.receipt_handle = receipt_handle + self.approx_receive_count = approx_receive_count def change_message_visibility(self, timeout=timedelta(minutes=10)): self.conn.change_message_visibility( diff --git a/pyqs/worker.py b/pyqs/worker.py index 07d6650..cd1c269 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -98,6 +98,7 @@ def read_message(self): QueueUrl=self.queue_url, MaxNumberOfMessages=self.batchsize, WaitTimeSeconds=LONG_POLLING_INTERVAL, + AttributeNames=["ApproximateReceiveCount"] ).get('Messages', []) logger.debug( @@ -188,6 +189,7 @@ def process_message(self): kwargs = message_body['kwargs'] message_id = message['MessageId'] receipt_handle = message['ReceiptHandle'] + approx_receive_count = message.get('Attributes', {}).get("ApproximateReceiveCount", 1) task_name = full_task_path.split(".")[-1] task_path = ".".join(full_task_path.split(".")[:-1]) @@ -203,7 +205,8 @@ def process_message(self): conn=self.conn, queue_url=queue_url, message_id=message_id, - receipt_handle=receipt_handle + receipt_handle=receipt_handle, + approx_receive_count=approx_receive_count ) current_time = time.time() From 5294cbf619685123c0833997bb22cb99c7a2b95c Mon Sep 17 00:00:00 2001 From: jhorman Date: Fri, 26 Jul 2019 16:19:39 -0700 Subject: [PATCH 5/5] Cast approx receive to an int --- pyqs/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyqs/worker.py b/pyqs/worker.py index cd1c269..d6411aa 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -189,7 +189,7 @@ def process_message(self): kwargs = message_body['kwargs'] message_id = message['MessageId'] receipt_handle = message['ReceiptHandle'] - approx_receive_count = message.get('Attributes', {}).get("ApproximateReceiveCount", 1) + approx_receive_count = int(message.get('Attributes', {}).get("ApproximateReceiveCount", 1)) task_name = full_task_path.split(".")[-1] task_path = ".".join(full_task_path.split(".")[:-1])