diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..b9e55a0 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,71 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ master ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ master ] + schedule: + - cron: '39 1 * * 5' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: [ 'python' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] + # Learn more: + # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v1 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v1 + + # ℹī¸ Command-line programs to run using the OS shell. + # 📚 https://git.io/JvXDl + + # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..7fedacb --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,26 @@ +name: Upload Python Package + +on: + release: + types: [created] + +jobs: + deploy: + runs-on: ubuntu-latest + permissions: + id-token: write + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v1 + with: + python-version: '3.x' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools wheel twine + - name: Build + run: | + python setup.py sdist bdist_wheel + - name: Publish + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml new file mode 100644 index 0000000..c3ce97f --- /dev/null +++ b/.github/workflows/testing.yml @@ -0,0 +1,35 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Build + +on: [push, pull_request] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-tests.txt + - name: Lint with flake8 + run: | + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + env: + COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} + run: | + pytest diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 2a998ac..0000000 --- a/.travis.yml +++ /dev/null @@ -1,12 +0,0 @@ -language: python -python: - - "2.7" - - "3.4" -env: - - BOTO_CONFIG=/tmp/nowhere -install: - - if [[ $TRAVIS_PYTHON_VERSION == 2* ]]; then pip install -r requirements-py2.txt; fi - - "pip install -r requirements.txt" -script: "py.test tests/ --cov garcon --cov-report term-missing" -after_success: - - coveralls diff --git a/README.rst b/README.rst index b76fd53..c740a12 100644 --- a/README.rst +++ b/README.rst @@ -1,4 +1,4 @@ -|Build Status| |Coverage Status| +|BuildStatus| |Downloads| |CoverageStatus| Lightweight library for AWS SWF. @@ -10,8 +10,8 @@ Lightweight library for AWS SWF. Requirements ~~~~~~~~~~~~ -- Python 2.7, 3.4 (tested) -- Boto 2.34.0 (tested) +- Python 3.8, 3.9, 3.10, 3.11, 3.12 (tested) +- Boto3 (tested) Goal ~~~~ @@ -20,52 +20,43 @@ The goal of this library is to allow the creation of Amazon Simple Workflow without the need to worry about the orchestration of the different activities and building out the different workers. This framework aims to help simple workflows. If you have a more complex -case, you might want to use directly boto. +case, you might want to use directly boto3. Code sample ~~~~~~~~~~~ -The code sample shows a workflow that has 4 activities. It starts with -activity\_1, which after being completed schedule activity\_2 and -activity\_3 to be ran in parallel. The workflow ends after the -completion of activity\_4 which requires activity\_2 and activity\_3 to -be completed. +The code sample shows a workflow where a user enters a coffee shop, orders +a coffee and a chocolate chip cookie. All ordered items are prepared and +completed, the user pays the order, receives the ordered items, then leave +the shop. -.. code:: python - - from __future__ import print_function - - from garcon import activity - from garcon import runner - - - domain = 'dev' - name = 'workflow_sample' - create = activity.create(domain, name) - - test_activity_1 = create( - name='activity_1', - run=runner.Sync( - lambda activity, context: print('activity_1'))) +The code below represents the workflow decider. For the full code sample, +see the `example`_. - test_activity_2 = create( - name='activity_2', - requires=[test_activity_1], - run=runner.Async( - lambda activity, context: print('activity_2_task_1'), - lambda activity, context: print('activity_2_task_2'))) +.. code:: python - test_activity_3 = create( - name='activity_3', - requires=[test_activity_1], - run=runner.Sync( - lambda activity, context: print('activity_3'))) + enter = schedule('enter', self.create_enter_coffee_activity) + enter.wait() + + total = 0 + for item in ['coffee', 'chocolate_chip_cookie']: + activity_name = 'order_{item}'.format(item=item) + activity = schedule(activity_name, + self.create_order_activity, + input={'item': item}) + total += activity.result.get('price') + + pay_activity = schedule( + 'pay', self.create_payment_activity, + input={'total': total}) + + get_order = schedule('get_order', self.create_get_order_activity) + + # Waiting for paying and getting the order to complete before + # we let the user leave the coffee shop. + pay_activity.wait(), get_order.wait() + schedule('leave_coffee_shop', self.create_leave_coffee_shop) - test_activity_4 = create( - name='activity_4', - requires=[test_activity_3, test_activity_2], - run=runner.Sync( - lambda activity, context: print('activity_4'))) Application architecture ~~~~~~~~~~~~~~~~~~~~~~~~ @@ -82,10 +73,15 @@ Application architecture │ └── s3.py # Task that focuses on s3 files. └── task_example.py # Your different tasks. +Trusted by +~~~~~~~~~~ + +|The Orchard| |Sony Music| |DataArt| + Contributors ~~~~~~~~~~~~ -- Michael Ortali +- Michael Ortali (Author) - Adam Griffiths - Raphael Antonmattei - John Penner @@ -93,8 +89,22 @@ Contributors .. _xethorn: github.com/xethorn .. _rantonmattei: github.com/rantonmattei .. _someboredkiddo: github.com/someboredkiddo +.. _example: https://github.com/xethorn/garcon/tree/master/example/custom_decider + +.. |BuildStatus| image:: https://github.com/xethorn/garcon/workflows/Build/badge.svg + :target: https://github.com/xethorn/garcon/actions?query=workflow%3ABuild+branch%3Amaster + +.. |Downloads| image:: https://img.shields.io/pypi/dm/garcon.svg + :target: https://coveralls.io/r/xethorn/garcon?branch=master + +.. |CoverageStatus| image:: https://coveralls.io/repos/xethorn/garcon/badge.svg?branch=master + :target: https://coveralls.io/r/xethorn/garcon?branch=master + +.. |The Orchard| image:: https://media-exp1.licdn.com/dms/image/C4E0BAQGi7o5g9l4JWg/company-logo_200_200/0/1519855981606?e=2159024400&v=beta&t=WBe-gOK2b30vUTGKbA025i9NFVDyOrS4Fotx9fMEZWo + :target: https://theorchard.com -.. |Build Status| image:: https://travis-ci.org/xethorn/garcon.svg - :target: https://travis-ci.org/xethorn/garcon -.. |Coverage Status| image:: https://coveralls.io/repos/xethorn/garcon/badge.svg?branch=master - :target: https://coveralls.io/r/xethorn/garcon?branch=master \ No newline at end of file +.. |Sony Music| image:: https://media-exp1.licdn.com/dms/image/C4D0BAQE9rvU-3ig-jg/company-logo_200_200/0/1604099587507?e=2159024400&v=beta&t=eAAubphf_fI-5GEb0ak1QnmtRHmc8466Qj4sGrCsWYc + :target: https://www.sonymusic.com/ + +.. |DataArt| image:: https://media-exp1.licdn.com/dms/image/C4E0BAQGRi6OIlNQG8Q/company-logo_200_200/0/1519856519357?e=2159024400&v=beta&t=oi6HQpzoeTKA082s-8Ft75vGTvAkEp4VHRyMLeOHXoo + :target: https://www.dataart.com/ diff --git a/docs/index.rst b/docs/index.rst index d8aa0cc..8c9ed93 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,7 +6,7 @@ Lightweight library for AWS SWF. Requirements ------------ -* Python 2.7, 3.4 (tested) +* Python 3.5, 3.6, 3.7, 3.8 (tested) * Boto 2.34.0 (tested) @@ -20,8 +20,6 @@ Code sample The code sample shows a workflow that has 4 activities. It starts with activity_1, which after being completed schedule activity_2 and activity_3 to be ran in parallel. The workflow ends after the completion of activity_4 which requires activity_2 and activity_3 to be completed:: - from __future__ import print_function - from garcon import activity from garcon import runner diff --git a/example/custom_decider/run.py b/example/custom_decider/run.py index 7d0f13a..8b4eaf0 100644 --- a/example/custom_decider/run.py +++ b/example/custom_decider/run.py @@ -1,20 +1,28 @@ from garcon import activity from garcon import decider from threading import Thread -import boto.swf.layer2 as swf import time +import boto3 import workflow # Initiate the workflow on the dev domain and custom_decider name. -flow = workflow.Workflow('dev', 'custom_decider') -deciderworker = decider.DeciderWorker(flow) +client = boto3.client('swf', region_name='us-east-1') +workflow = workflow.Workflow(client, 'dev', 'custom_decider') +deciderworker = decider.DeciderWorker(workflow) -# swf.WorkflowType( -# name=flow.name, domain=flow.domain, -# version='1.0', task_list=flow.name).start() +client.start_workflow_execution( + domain=workflow.domain, + workflowId='unique-workflow-identifier', + workflowType=dict( + name=workflow.name, + version='1.0'), + executionStartToCloseTimeout='3600', + taskStartToCloseTimeout='3600', + childPolicy='TERMINATE', + taskList=dict(name=workflow.name)) -Thread(target=activity.ActivityWorker(flow).run).start() +Thread(target=activity.ActivityWorker(workflow).run).start() while(True): deciderworker.run() time.sleep(1) diff --git a/example/custom_decider/workflow.py b/example/custom_decider/workflow.py index f8c701c..6f7eaef 100644 --- a/example/custom_decider/workflow.py +++ b/example/custom_decider/workflow.py @@ -9,21 +9,20 @@ Workflow: 1. Enter in the coffee shop. 2. Order. - 2.1 Chocolate chip cookie. - 2.2 Coffee. + 2.1 Coffee. + 2.2 Chocolate chip cookie. 3. Finalize the order (any of the activites below can be done in any order) 3.1 Pays. 3.2 Get the order. 4. Leave the coffee shop. Result: - -entering coffee shop -ordering: coffee -ordering: chocolate_chip_cookie -pay $6.98 -get order -leaving the coffee shop + entering coffee shop + ordering: coffee + ordering: chocolate_chip_cookie + pay $6.98 + get order + leaving the coffee shop """ from garcon import activity @@ -33,7 +32,7 @@ class Workflow: - def __init__(self, domain, name): + def __init__(self, client, domain, name): """Create the workflow. Args: @@ -42,7 +41,8 @@ def __init__(self, domain, name): """ self.name = name self.domain = domain - self.create_activity = activity.create(domain, name) + self.client = client + self.create_activity = activity.create(client, domain, name) def decider(self, schedule, context=None): """Custom deciders. diff --git a/example/simple/run.py b/example/simple/run.py index ddb475d..97d630d 100644 --- a/example/simple/run.py +++ b/example/simple/run.py @@ -1,18 +1,23 @@ from garcon import activity from garcon import decider from threading import Thread -import boto.swf.layer2 as swf +import boto3 import time -import test_flow +import workflow -deciderworker = decider.DeciderWorker(test_flow) +client = boto3.client('swf', region_name='us-east-1') +deciderworker = decider.DeciderWorker(client, workflow) -swf.WorkflowType( - name=test_flow.name, domain=test_flow.domain, - version='1.0', task_list=test_flow.name).start() +client.start_workflow_execution( + domain=workflow.domain, + workflowId='unique-workflow-identifier', + workflowType=dict( + name=workflow.name, + version='1.0'), + taskList=dict(name=workflow.name)) -Thread(target=activity.ActivityWorker(test_flow).run).start() +Thread(target=activity.ActivityWorker(client, workflow).run).start() while(True): deciderworker.run() time.sleep(1) diff --git a/example/simple/workflow.py b/example/simple/workflow.py index 3bc0ba8..e75f1d1 100644 --- a/example/simple/workflow.py +++ b/example/simple/workflow.py @@ -1,16 +1,15 @@ -import boto.swf.layer2 as swf - from garcon import activity from garcon import runner +import boto3 import logging import random logger = logging.getLogger(__name__) - +client = boto3.client('swf', region_name='us-east-1') domain = 'dev' name = 'workflow_sample' -create = activity.create(domain, name) +create = activity.create(client, domain, name) def activity_failure(context, activity): @@ -18,8 +17,7 @@ def activity_failure(context, activity): if num != 3: logger.warn('activity_3: fails') raise Exception('fails') - - print('activity_3: end') + logger.debug('activity_3: end') test_activity_1 = create( diff --git a/garcon/activity.py b/garcon/activity.py index df03027..b9fe701 100755 --- a/garcon/activity.py +++ b/garcon/activity.py @@ -13,10 +13,12 @@ Create an activity:: + import boto3 from garcon import activity # First step is to create the workflow on a specific domain. - create = activity.create('domain') + client = boto3.client('swf') + create = activity.create(client, 'domain', 'workflow-name') initial_activity = create( # Name of your activity @@ -37,18 +39,16 @@ """ -import backoff -import boto.exception as boto_exception -import boto.swf.layer2 as swf +from botocore import exceptions import itertools import json import threading +import backoff from garcon import log from garcon import utils from garcon import runner - ACTIVITY_STANDBY = 0 ACTIVITY_SCHEDULED = 1 ACTIVITY_COMPLETED = 2 @@ -243,13 +243,25 @@ def create_execution_input(self): return activity_input -class Activity(swf.ActivityWorker, log.GarconLogger): +class Activity(log.GarconLogger): version = '1.0' task_list = None + def __init__(self, client): + """Instantiates an activity. + + Args: + client: the boto client used for this activity. + """ + + self.client = client + self.name = None + self.domain = None + self.task_list = None + @backoff.on_exception( backoff.expo, - boto_exception.SWFResponseError, + exceptions.ClientError, max_tries=5, giveup=utils.non_throttle_error, on_backoff=utils.throttle_backoff_handler, @@ -266,9 +278,22 @@ def poll_for_activity(self, identity=None): identity (str): Identity of the worker making the request, which is recorded in the ActivityTaskStarted event in the AWS console. This enables diagnostic tracing when problems arise. + Return: + ActivityExecution: activity execution. """ - return self.poll(identity=identity) + additional_params = {} + if identity: + additional_params.update(identity=identity) + + execution_definition = self.client.poll_for_activity_task( + domain=self.domain, taskList=dict(name=self.task_list), + **additional_params) + + return ActivityExecution( + self.client, execution_definition.get('activityId'), + execution_definition.get('taskToken'), + execution_definition.get('input')) def run(self, identity=None): """Activity Runner. @@ -278,15 +303,14 @@ def run(self, identity=None): previous activity is consumed (context). Args: - activity_id (str): Identity of the worker making the request, which + identity (str): Identity of the worker making the request, which is recorded in the ActivityTaskStarted event in the AWS console. This enables diagnostic tracing when problems arise. """ - try: if identity: self.logger.debug('Polling with {}'.format(identity)) - activity_task = self.poll_for_activity(identity) + execution = self.poll_for_activity(identity) except Exception as error: # Catch exceptions raised during poll() to avoid an Activity thread # dying & worker daemon unable to process the affected Activity. @@ -297,43 +321,40 @@ def run(self, identity=None): # when an exception occurs. if self.on_exception: self.on_exception(self, error) - self.logger.error(error, exc_info=True) return True - packed_context = activity_task.get('input') - context = dict() - - if packed_context: - context = json.loads(packed_context) - self.set_log_context(context) - - if 'activityId' in activity_task: + self.set_log_context(execution.context) + if execution.activity_id: try: - context = self.execute_activity(context) - self.complete(result=json.dumps(context)) + context = self.execute_activity(execution) + execution.complete(context) except Exception as error: # If the workflow has been stopped, it is not possible for the # activity to be updated – it throws an exception which stops # the worker immediately. try: - self.fail(reason=str(error)[:255]) + execution.fail(str(error)[:255]) if self.on_exception: self.on_exception(self, error) - except: - pass + except Exception as error2: # noqa: E722 + if self.on_exception: + self.on_exception(self, error2) self.unset_log_context() return True - def execute_activity(self, context): + def execute_activity(self, activity): """Execute the runner. Args: - context (dict): The flow context. + execution (ActivityExecution): the activity execution. + + Return: + dict: The result of the operation. """ - return self.runner.execute(self, context) + return self.runner.execute(activity, activity.context) def hydrate(self, data): """Hydrate the task with information provided. @@ -428,6 +449,7 @@ def __init__(self, timeout=None, heartbeat=None): will be equal to the timeout. """ + Activity.__init__(self, client=None) self.runner = runner.External(timeout=timeout, heartbeat=heartbeat) def run(self): @@ -440,6 +462,56 @@ def run(self): return False +class ActivityExecution(log.GarconLogger): + + def __init__(self, client, activity_id, task_token, context): + """Create an an activity execution. + + Args: + client (boto3.client): the boto client (for easy access if needed). + activity_id (str): the activity id. + task_token (str): the task token. + context (str): data for the execution. + """ + + self.client = client + self.activity_id = activity_id + self.task_token = task_token + self.context = context and json.loads(context) or dict() + + def heartbeat(self, details=None): + """Create a task heartbeat. + + Args: + details (str): details to add to the heartbeat. + """ + + self.client.record_activity_task_heartbeat(taskToken=self.task_token, + details=details or '') + + def fail(self, reason=None): + """Mark the activity execution as failed. + + Args: + reason (str): optional reason for the failure. + """ + + self.client.respond_activity_task_failed( + taskToken=self.task_token, + reason=reason or '') + + def complete(self, context=None): + """Mark the activity execution as completed. + + Args: + context (str or dict): the context result of the operation. + """ + + self.client.respond_activity_task_completed( + taskToken=self.task_token, + result=json.dumps(context)) + + class ActivityWorker(): def __init__(self, flow, activities=None): @@ -465,12 +537,19 @@ def __init__(self, flow, activities=None): def run(self): """Run the activities. """ - + threads = [] for activity in self.activities: if (self.worker_activities and activity.name not in self.worker_activities): continue - threading.Thread(target=worker_runner, args=(activity,)).start() + thread = threading.Thread( + target=worker_runner, + args=(activity,)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() class ActivityState: @@ -549,7 +628,7 @@ def wait(self): """Wait until ready. """ - if not self.ready(): + if not self.ready: raise ActivityInstanceNotReadyException() @@ -560,11 +639,11 @@ def worker_runner(worker): worker (object): the Activity worker. """ - while(worker.run()): + while (worker.run()): continue -def create(domain, name, version='1.0', on_exception=None): +def create(client, domain, workflow_name, version='1.0', on_exception=None): """Helper method to create Activities. The helper method simplifies the creation of an activity by setting the @@ -576,8 +655,9 @@ def create(domain, name, version='1.0', on_exception=None): activity. Always make sure your activity name is unique. Args: + client (boto3.client): the boto3 client. domain (str): the domain name. - name (str): name of the activity. + workflow_name (str): workflow name. version (str): activity version. on_exception (callable): the error handler. @@ -586,7 +666,7 @@ def create(domain, name, version='1.0', on_exception=None): """ def wrapper(**options): - activity = Activity() + activity = Activity(client) if options.get('external'): activity = ExternalActivity( @@ -594,7 +674,7 @@ def wrapper(**options): heartbeat=options.get('heartbeat')) activity_name = '{name}_{activity}'.format( - name=name, + name=workflow_name, activity=options.get('name')) activity.hydrate(dict( @@ -610,6 +690,7 @@ def wrapper(**options): schedule_to_start=options.get('schedule_to_start'), on_exception=options.get('on_exception') or on_exception)) return activity + return wrapper @@ -635,7 +716,7 @@ def find_available_activities(flow, history, context): if states.get_last_state() != ACTIVITY_FAILED: continue elif (not instance.retry or - instance.retry < count_activity_failures(states)): + instance.retry < count_activity_failures(states)): raise Exception( 'The activity failures has exceeded its retry limit.') diff --git a/garcon/decider.py b/garcon/decider.py index f45e717..b8aeb30 100755 --- a/garcon/decider.py +++ b/garcon/decider.py @@ -7,18 +7,15 @@ executed and when based on the flow procided. """ -from boto.swf.exceptions import SWFDomainAlreadyExistsError -from boto.swf.exceptions import SWFTypeAlreadyExistsError -import boto.swf.layer2 as swf import functools import json +import uuid from garcon import activity from garcon import event from garcon import log - -class DeciderWorker(swf.Decider, log.GarconLogger): +class DeciderWorker(log.GarconLogger): def __init__(self, flow, register=True): """Initialize the Decider Worker. @@ -28,18 +25,18 @@ def __init__(self, flow, register=True): register (boolean): If this flow needs to be register on AWS. """ + self.client = flow.client self.flow = flow self.domain = flow.domain self.version = getattr(flow, 'version', '1.0') self.activities = activity.find_workflow_activities(flow) self.task_list = flow.name self.on_exception = getattr(flow, 'on_exception', None) - super(DeciderWorker, self).__init__() if register: self.register() - def get_history(self, poll): + def get_history(self, identity, poll): """Get all the history. The full history needs to be recovered from SWF to make sure that all @@ -47,6 +44,7 @@ def get_history(self, poll): 100 events are provided, this methods retrieves all events. Args: + identity (string): The identity of the decider that pulls the information. poll (object): The poll object (see AWS SWF for details.) Return: list: All the events. @@ -54,7 +52,11 @@ def get_history(self, poll): events = poll['events'] while 'nextPageToken' in poll: - poll = self.poll(next_page_token=poll['nextPageToken']) + poll = self.client.poll_for_decision_task( + domain=self.domain, + identity=identity, + taskList=dict(name=self.task_list), + nextPageToken=poll['nextPageToken']) if 'events' in poll: events += poll['events'] @@ -62,7 +64,6 @@ def get_history(self, poll): # Remove all the events that are related to decisions and only. return [e for e in events if not e['eventType'].startswith('Decision')] - def get_activity_states(self, history): """Get the activity states from the history. @@ -86,28 +87,31 @@ def register(self): """ registerables = [] - registerables.append(swf.Domain(name=self.domain)) - registerables.append(swf.WorkflowType( - domain=self.domain, - name=self.task_list, - version=self.version, - task_list=self.task_list)) + registerables.append(( + self.client.register_domain, + dict(name=self.domain, + workflowExecutionRetentionPeriodInDays='90'))) + registerables.append(( + self.client.register_workflow_type, + dict( + domain=self.domain, + name=self.task_list, + version=self.version))) for current_activity in self.activities: - registerables.append( - swf.ActivityType( + registerables.append(( + self.client.register_activity_type, + dict( domain=self.domain, name=current_activity.name, - version=self.version, - task_list=current_activity.task_list)) + version=self.version))) - for swf_entity in registerables: + for callable, data in registerables: try: - swf_entity.register() - except (SWFDomainAlreadyExistsError, SWFTypeAlreadyExistsError): - print( - swf_entity.__class__.__name__, swf_entity.name, - 'already exists') + callable(**data) + except Exception as e: + print(e) + print(data.get('name'), 'already exists') def create_decisions_from_flow(self, decisions, activity_states, context): """Create the decisions from the flow. @@ -117,7 +121,7 @@ def create_decisions_from_flow(self, decisions, activity_states, context): to schedule is thus very straightforward. Args: - decisions (Layer1Decisions): the layer decision for swf. + decisions (list): the layer decision for swf. activity_states (dict): all the state activities. context (dict): the context of the activities. """ @@ -125,7 +129,6 @@ def create_decisions_from_flow(self, decisions, activity_states, context): try: for current in activity.find_available_activities( self.flow, activity_states, context.current): - schedule_activity_task( decisions, current, version=self.version) else: @@ -133,9 +136,13 @@ def create_decisions_from_flow(self, decisions, activity_states, context): activity.find_uncomplete_activities( self.flow, activity_states, context.current)) if not activities: - decisions.complete_workflow_execution() + decisions.append(dict( + decisionType='CompleteWorkflowExecution')) except Exception as e: - decisions.fail_workflow_execution(reason=str(e)) + decisions.append(dict( + decisionType='FailWorkflowExecution', + failWorkflowExecutionDecisionAttributes=dict( + reason=str(e)))) if self.on_exception: self.on_exception(self, e) self.logger.error(e, exc_info=True) @@ -149,7 +156,7 @@ def delegate_decisions(self, decisions, decider, history, context): and if scheduled, returns its result. Args: - decisions (Layer1Decisions): the layer decision for swf. + decisions (list): the layer decision for swf. decider (callable): the decider (it needs to have schedule) history (dict): all the state activities and its history. context (dict): the context of the activities. @@ -172,11 +179,15 @@ def delegate_decisions(self, decisions, decider, history, context): # When no exceptions are raised and the method decider has returned # it means that there i nothing left to do in the current decider. if schedule_context.completed: - decisions.complete_workflow_execution() + decisions.append(dict( + decisionType='CompleteWorkflowExecution')) except activity.ActivityInstanceNotReadyException: pass except Exception as e: - decisions.fail_workflow_execution(reason=str(e)) + decisions.append(dict( + decisionType='FailWorkflowExecution', + failWorkflowExecutionDecisionAttributes=dict( + reason=str(e)))) if self.on_exception: self.on_exception(self, e) self.logger.error(e, exc_info=True) @@ -205,7 +216,10 @@ def run(self, identity=None): """ try: - poll = self.poll(identity=identity) + poll = self.client.poll_for_decision_task( + domain=self.domain, + taskList=dict(name=self.task_list), + identity=identity or '') except Exception as error: # Catch exceptions raised during poll() to avoid a Decider thread # dying & the daemon unable to process subsequent workflows. @@ -224,19 +238,21 @@ def run(self, identity=None): if 'events' not in poll: return True - history = self.get_history(poll) + history = self.get_history(identity or '', poll) activity_states = self.get_activity_states(history) current_context = event.get_current_context(history) current_context.set_workflow_execution_info(poll, self.domain) - decisions = swf.Layer1Decisions() + decisions = [] if not custom_decider: self.create_decisions_from_flow( decisions, activity_states, current_context) else: self.delegate_decisions( decisions, custom_decider, activity_states, current_context) - self.complete(decisions=decisions) + self.client.respond_decision_task_completed( + taskToken=poll.get('taskToken'), + decisions=decisions) return True @@ -277,16 +293,19 @@ def schedule_activity_task( id (str): optional id of the activity instance. """ - decisions.schedule_activity_task( - id or instance.id, - instance.activity_name, - version, - task_list=instance.activity_worker.task_list, - input=json.dumps(instance.create_execution_input()), - heartbeat_timeout=str(instance.heartbeat_timeout), - start_to_close_timeout=str(instance.timeout), - schedule_to_start_timeout=str(instance.schedule_to_start), - schedule_to_close_timeout=str(instance.schedule_to_close)) + decisions.append(dict( + decisionType='ScheduleActivityTask', + scheduleActivityTaskDecisionAttributes=dict( + activityId=id or instance.id, + activityType=dict( + name=instance.activity_name, + version=version), + taskList=dict(name=instance.activity_worker.task_list), + input=json.dumps(instance.create_execution_input()), + heartbeatTimeout=str(instance.heartbeat_timeout), + startToCloseTimeout=str(instance.timeout), + scheduleToStartTimeout=str(instance.schedule_to_start), + scheduleToCloseTimeout=str(instance.schedule_to_close)))) def schedule( @@ -299,7 +318,7 @@ def schedule( input with the full execution context to send the data to the activity. Args: - decisions (Layer1Decisions): the layer decision for swf. + decisions (list): the layer decision for swf. schedule_context (dict): information about the schedule. history (dict): history of the execution. context (dict): context of the execution. @@ -340,7 +359,7 @@ def schedule( if states.get_last_state() != activity.ACTIVITY_FAILED: continue elif (not current.retry or - current.retry < activity.count_activity_failures(states)): + current.retry < activity.count_activity_failures(states)): raise Exception( 'The activity failures has exceeded its retry limit.') diff --git a/garcon/utils.py b/garcon/utils.py index 3cba14e..0958e2f 100644 --- a/garcon/utils.py +++ b/garcon/utils.py @@ -30,18 +30,18 @@ def create_dictionary_key(dictionary): return hashlib.sha1(key_parts.encode('utf-8')).hexdigest() -def non_throttle_error(swf_response_error): +def non_throttle_error(exception): """Activity Runner. Determine whether SWF Exception was a throttle or a different error. Args: - error: boto.exception.SWFResponseError instance. + exception: botocore.exceptions.Client instance. Return: - bool: True if SWFResponseError was a throttle, False otherwise. + bool: True if exception was a throttle, False otherwise. """ - return swf_response_error.error_code != 'ThrottlingException' + return exception.response.get('Error').get('Code') != 'ThrottlingException' def throttle_backoff_handler(details): """Callback to be used when a throttle backoff is invoked. diff --git a/requirements-py2.txt b/requirements-py2.txt deleted file mode 100755 index 6dc9995..0000000 --- a/requirements-py2.txt +++ /dev/null @@ -1,3 +0,0 @@ --r requirements.txt -mock==1.0.1 -futures==2.2.0 \ No newline at end of file diff --git a/requirements-tests.txt b/requirements-tests.txt new file mode 100644 index 0000000..0ded7db --- /dev/null +++ b/requirements-tests.txt @@ -0,0 +1,6 @@ +flake8 +wheel +pytest-cov==5.0.0 +pytest==8.3.2 +tox==4.18.1 +coveralls==4.0.0 diff --git a/requirements.txt b/requirements.txt index cdc0b3a..d3bd936 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,2 @@ -backoff==1.4.3 -boto==2.35.1 -pytest-cov==1.8.1 -pytest==2.6.4 -python-coveralls==2.4.3 \ No newline at end of file +backoff==2.2.1 +boto3==1.35.0 diff --git a/setup.py b/setup.py index a2c3fe3..cd03f1b 100755 --- a/setup.py +++ b/setup.py @@ -11,19 +11,21 @@ setup( name='Garcon', - version='0.3.5', + version='1.1.1', url='https://github.com/xethorn/garcon/', author='Michael Ortali', author_email='hello@xethorn.net', - description=( - 'Lightweight library for AWS SWF.'), + description=('Lightweight library for AWS SWF.'), long_description=long_description, license='MIT', packages=find_packages(), include_package_data=True, - install_requires=['boto', 'backoff'], + install_requires=['boto3', 'backoff'], zip_safe=False, classifiers=[ - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ],) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..1205cbd --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,13 @@ +try: + from unittest.mock import MagicMock +except: + from mock import MagicMock + +import pytest +import boto3 + + +@pytest.fixture +def boto_client(monkeypatch): + """Create a fake boto client.""" + return MagicMock(spec=boto3.client('swf', region_name='us-east-1')) diff --git a/tests/fixtures/flows/example.py b/tests/fixtures/flows/example.py index d0b0be7..a349072 100755 --- a/tests/fixtures/flows/example.py +++ b/tests/fixtures/flows/example.py @@ -1,12 +1,12 @@ -from __future__ import print_function - from garcon import activity from garcon import runner +import boto3 +client = boto3.client('swf', region_name='us-east-1') domain = 'dev' name = 'workflow_name' -create = activity.create(domain, name) +create = activity.create(client, domain, name) activity_1 = create( name='activity_1', diff --git a/tests/test_activity.py b/tests/test_activity.py index ba7d9df..f0bcb99 100755 --- a/tests/test_activity.py +++ b/tests/test_activity.py @@ -1,10 +1,9 @@ -from __future__ import absolute_import -from __future__ import print_function -try: - from unittest.mock import MagicMock -except: - from mock import MagicMock +from unittest.mock import MagicMock +from unittest.mock import ANY import json +import sys + +from botocore import exceptions import pytest from garcon import activity @@ -14,28 +13,26 @@ from garcon import utils from tests.fixtures import decider -import boto.exception as boto_exception - def activity_run( - monkeypatch, poll=None, complete=None, fail=None, execute=None): + monkeypatch, boto_client, poll=None, complete=None, fail=None, + execute=None): """Create an activity. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - - current_activity = activity.Activity() + current_activity = activity.Activity(boto_client) poll = poll or dict() monkeypatch.setattr( current_activity, 'execute_activity', execute or MagicMock(return_value=dict())) monkeypatch.setattr( - current_activity, 'poll', MagicMock(return_value=poll)) + boto_client, 'poll_for_activity_task', MagicMock(return_value=poll)) monkeypatch.setattr( - current_activity, 'complete', complete or MagicMock()) + boto_client, 'respond_activity_task_completed', complete or MagicMock()) monkeypatch.setattr( - current_activity, 'fail', fail or MagicMock()) + boto_client, 'respond_activity_task_failed', fail or MagicMock()) + return current_activity @@ -61,104 +58,104 @@ def dgenerator(context): @pytest.fixture def poll(): - return dict(activityId='something') + return dict( + activityId='something', + taskToken='taskToken') -def test_poll_for_activity(monkeypatch, poll=poll): +def test_poll_for_activity(monkeypatch, poll, boto_client): """Test that poll_for_activity successfully polls. """ - current_activity = activity_run(monkeypatch, poll) - current_activity.poll.return_value = 'foo' + activity_task = poll + current_activity = activity_run(monkeypatch, boto_client, poll) + boto_client.poll_for_activity_task.return_value = activity_task - activity_results = current_activity.poll_for_activity() - assert current_activity.poll.called - assert activity_results == 'foo' + activity_execution = current_activity.poll_for_activity() + assert boto_client.poll_for_activity_task.called + assert activity_execution.task_token is poll.get('taskToken') -def test_poll_for_activity_throttle_retry(monkeypatch, poll=poll): +def test_poll_for_activity_throttle_retry(monkeypatch, poll, boto_client): """Test that SWF throttles are retried during polling. """ - current_activity = activity_run(monkeypatch, poll) + current_activity = activity_run(monkeypatch, boto_client, poll) + boto_client.poll_for_activity_task.side_effect = exceptions.ClientError( + {'Error': {'Code': 'ThrottlingException'}}, + 'operation name') - response_status = 400 - response_reason = 'Bad Request' - reponse_body = ( - '{"__type": "com.amazon.coral.availability#ThrottlingException",' - '"message": "Rate exceeded"}') - json_body = json.loads(reponse_body) - exception = boto_exception.SWFResponseError( - response_status, response_reason, body=json_body) - current_activity.poll.side_effect = exception - - with pytest.raises(boto_exception.SWFResponseError): + with pytest.raises(exceptions.ClientError): current_activity.poll_for_activity() - assert current_activity.poll.call_count == 5 + assert boto_client.poll_for_activity_task.call_count == 5 -def test_poll_for_activity_error(monkeypatch, poll=poll): +def test_poll_for_activity_error(monkeypatch, poll, boto_client): """Test that non-throttle errors during poll are thrown. """ - current_activity = activity_run(monkeypatch, poll) + current_activity = activity_run(monkeypatch, boto_client, poll) exception = Exception() - current_activity.poll.side_effect = exception + boto_client.poll_for_activity_task.side_effect = exception with pytest.raises(Exception): current_activity.poll_for_activity() -def test_poll_for_activity_identity(monkeypatch, poll=poll): +def test_poll_for_activity_identity(monkeypatch, poll, boto_client): """Test that identity is passed to poll_for_activity. """ - current_activity = activity_run(monkeypatch, poll) + current_activity = activity_run(monkeypatch, boto_client, poll) current_activity.poll_for_activity(identity='foo') - current_activity.poll.assert_called_with(identity='foo') + boto_client.poll_for_activity_task.assert_called_with( + domain=ANY, taskList=ANY, identity='foo') -def test_poll_for_activity_no_identity(monkeypatch, poll=poll): +def test_poll_for_activity_no_identity(monkeypatch, poll, boto_client): """Test poll_for_activity works without identity passed as param. """ - current_activity = activity_run(monkeypatch, poll) + current_activity = activity_run(monkeypatch, boto_client, poll) current_activity.poll_for_activity() - current_activity.poll.assert_called_with(identity=None) + boto_client.poll_for_activity_task.assert_called_with( + domain=ANY, taskList=ANY) -def test_run_activity(monkeypatch, poll): +def test_run_activity(monkeypatch, poll, boto_client): """Run an activity. """ - current_activity = activity_run(monkeypatch, poll=poll) + current_activity = activity_run(monkeypatch, boto_client, poll=poll) current_activity.run() - current_activity.poll.assert_called_with(identity=None) + boto_client.poll_for_activity_task.assert_called_with( + domain=ANY, taskList=ANY) assert current_activity.execute_activity.called - assert current_activity.complete.called + assert boto_client.respond_activity_task_completed.called -def test_run_activity_identity(monkeypatch, poll): +def test_run_activity_identity(monkeypatch, poll, boto_client): """Run an activity with identity as param. """ - current_activity = activity_run(monkeypatch, poll=poll) + current_activity = activity_run(monkeypatch, boto_client, poll=poll) current_activity.run(identity='foo') - current_activity.poll.assert_called_with(identity='foo') + boto_client.poll_for_activity_task.assert_called_with( + domain=ANY, taskList=ANY, identity='foo') assert current_activity.execute_activity.called - assert current_activity.complete.called + assert boto_client.respond_activity_task_completed.called -def test_run_capture_exception(monkeypatch, poll): +def test_run_capture_exception(monkeypatch, poll, boto_client): """Run an activity with an exception raised during activity execution. """ - current_activity = activity_run(monkeypatch, poll=poll) + current_activity = activity_run(monkeypatch, boto_client, poll) current_activity.on_exception = MagicMock() current_activity.execute_activity = MagicMock() error_msg_long = "Error" * 100 @@ -166,28 +163,53 @@ def test_run_capture_exception(monkeypatch, poll): current_activity.execute_activity.side_effect = Exception(error_msg_long) current_activity.run() - assert current_activity.poll.called + assert boto_client.poll_for_activity_task.called assert current_activity.execute_activity.called assert current_activity.on_exception.called - current_activity.fail.assert_called_with(reason=actual_error_msg) + boto_client.respond_activity_task_failed.assert_called_with( + taskToken=poll.get('taskToken'), + reason=actual_error_msg) + assert not boto_client.respond_activity_task_completed.called + + +def test_run_capture_fail_exception(monkeypatch, poll, boto_client): + """Run an activity with an exception raised during failing execution. + """ + + current_activity = activity_run(monkeypatch, boto_client, poll) + current_activity.on_exception = MagicMock() + current_activity.execute_activity = MagicMock() + current_activity.complete = MagicMock() + current_activity.fail = MagicMock() + error_msg_long = "Error" * 100 + current_activity.complete.side_effect = Exception(error_msg_long) + current_activity.fail.side_effect = Exception(error_msg_long) + current_activity.run() + + assert boto_client.poll_for_activity_task.called + assert current_activity.execute_activity.called assert not current_activity.complete.called + assert not current_activity.fail.called + assert current_activity.on_exception.called -def test_run_capture_poll_exception(monkeypatch, poll): +def test_run_capture_poll_exception(monkeypatch, boto_client, poll): """Run an activity with an exception raised during poll. """ - current_activity = activity_run(monkeypatch, poll=poll) + current_activity = activity_run(monkeypatch, boto_client, poll=poll) + current_activity.on_exception = MagicMock() current_activity.execute_activity = MagicMock() + exception = Exception('poll exception') - current_activity.poll.side_effect = exception + boto_client.poll_for_activity_task.side_effect = exception current_activity.run() - assert current_activity.poll.called + assert boto_client.poll_for_activity_task.called assert current_activity.on_exception.called assert not current_activity.execute_activity.called - assert not current_activity.complete.called + assert not boto_client.respond_activity_task_completed.called current_activity.on_exception = None current_activity.logger.error = MagicMock() @@ -195,97 +217,105 @@ def test_run_capture_poll_exception(monkeypatch, poll): current_activity.logger.error.assert_called_with(exception, exc_info=True) -def test_run_activity_without_id(monkeypatch): +def test_run_activity_without_id(monkeypatch, boto_client): """Run an activity without an activity id. """ - current_activity = activity_run(monkeypatch, dict()) + current_activity = activity_run(monkeypatch, boto_client, poll=dict()) current_activity.run() - assert current_activity.poll.called + assert boto_client.poll_for_activity_task.called assert not current_activity.execute_activity.called - assert not current_activity.complete.called + assert not boto_client.respond_activity_task_completed.called -def test_run_activity_with_context(monkeypatch, poll): +def test_run_activity_with_context(monkeypatch, boto_client, poll): """Run an activity with a context. """ context = dict(foo='bar') poll.update(input=json.dumps(context)) - current_activity = activity_run(monkeypatch, poll=poll) + current_activity = activity_run(monkeypatch, boto_client, poll=poll) current_activity.run() - activity_context = current_activity.execute_activity.call_args[0][0] - assert activity_context == context + activity_execution = current_activity.execute_activity.call_args[0][0] + assert activity_execution.context == context -def test_run_activity_with_result(monkeypatch, poll): +def test_run_activity_with_result(monkeypatch, boto_client, poll): """Run an activity with a result. """ - resp = dict(foo='bar') - mock = MagicMock(return_value=resp) - current_activity = activity_run(monkeypatch, poll=poll, execute=mock) + result = dict(foo='bar') + mock = MagicMock(return_value=result) + current_activity = activity_run(monkeypatch, boto_client, poll=poll, + execute=mock) current_activity.run() - result = current_activity.complete.call_args_list[0][1].get('result') - assert result == json.dumps(resp) + boto_client.respond_activity_task_completed.assert_called_with( + result=json.dumps(result), taskToken=poll.get('taskToken')) -def test_task_failure(monkeypatch, poll): +def test_task_failure(monkeypatch, boto_client, poll): """Run an activity that has a bad task. """ resp = dict(foo='bar') mock = MagicMock(return_value=resp) - current_activity = activity_run(monkeypatch, poll=poll, execute=mock) - current_activity.execute_activity.side_effect = Exception('fail') + reason = 'fail' + current_activity = activity_run(monkeypatch, boto_client, poll=poll, + execute=mock) + current_activity.on_exception = MagicMock() + current_activity.execute_activity.side_effect = Exception(reason) current_activity.run() - assert current_activity.fail.called + boto_client.respond_activity_task_failed.assert_called_with( + taskToken=poll.get('taskToken'), + reason=reason) -def test_task_failure_on_close_activity(monkeypatch, poll): +def test_task_failure_on_close_activity(monkeypatch, boto_client, poll): """Run an activity failure when the task is already closed. """ resp = dict(foo='bar') mock = MagicMock(return_value=resp) - current_activity = activity_run(monkeypatch, poll=poll, execute=mock) + current_activity = activity_run(monkeypatch, boto_client, poll=poll, + execute=mock) + current_activity.on_exception = MagicMock() current_activity.execute_activity.side_effect = Exception('fail') - current_activity.fail.side_effect = Exception('fail') + boto_client.respond_activity_task_failed.side_effect = Exception('fail') current_activity.unset_log_context = MagicMock() current_activity.run() assert current_activity.unset_log_context.called -def test_execute_activity(monkeypatch): +def test_execute_activity(monkeypatch, boto_client): """Test the execution of an activity. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - monkeypatch.setattr(activity.Activity, 'heartbeat', lambda self: None) + monkeypatch.setattr(activity.ActivityExecution, 'heartbeat', + lambda self: None) resp = dict(task_resp='something') custom_task = MagicMock(return_value=resp) - current_activity = activity.Activity() + current_activity = activity.Activity(boto_client) current_activity.runner = runner.Sync(custom_task) - val = current_activity.execute_activity(dict(foo='bar')) + val = current_activity.execute_activity(activity.ActivityExecution( + boto_client, 'activityId', 'taskToken', '{"context": "value"}')) assert custom_task.called assert val == resp -def test_hydrate_activity(monkeypatch): +def test_hydrate_activity(monkeypatch, boto_client): """Test the hydratation of an activity. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - current_activity = activity.Activity() + current_activity = activity.Activity(boto_client) current_activity.hydrate(dict( name='activity', domain='domain', @@ -294,26 +324,25 @@ def test_hydrate_activity(monkeypatch): tasks=[lambda: dict('val')])) -def test_create_activity(monkeypatch): +def test_create_activity(monkeypatch, boto_client): """Test the creation of an activity via `create`. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - create = activity.create('domain_name', 'flow_name') + create = activity.create(boto_client, 'domain_name', 'flow_name') current_activity = create(name='activity_name') assert isinstance(current_activity, activity.Activity) assert current_activity.name == 'flow_name_activity_name' assert current_activity.task_list == 'flow_name_activity_name' assert current_activity.domain == 'domain_name' + assert current_activity.client == boto_client -def test_create_external_activity(monkeypatch): +def test_create_external_activity(monkeypatch, boto_client): """Test the creation of an external activity via `create`. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - create = activity.create('domain_name', 'flow_name') + create = activity.create(boto_client, 'domain_name', 'flow_name') current_activity = create( name='activity_name', @@ -335,7 +364,6 @@ def test_create_activity_worker(monkeypatch): """Test the creation of an activity worker. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) from tests.fixtures.flows import example worker = activity.ActivityWorker(example) @@ -345,14 +373,12 @@ def test_create_activity_worker(monkeypatch): assert not worker.worker_activities -def test_instances_creation(monkeypatch, generators): +def test_instances_creation(monkeypatch, boto_client, generators): """Test the creation of an activity instance id with the use of a local context. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - - local_activity = activity.Activity() + local_activity = activity.Activity(boto_client) external_activity = activity.ExternalActivity(timeout=60) for current_activity in [local_activity, external_activity]: @@ -374,7 +400,7 @@ def test_instances_creation(monkeypatch, generators): assert not instances[0].local_context -def test_activity_timeouts(monkeypatch, generators): +def test_activity_timeouts(monkeypatch, boto_client, generators): """Test the creation of an activity timeouts. More details: the timeout of a task is 120s, the schedule to start is 1000, @@ -390,8 +416,7 @@ def test_activity_timeouts(monkeypatch, generators): def local_task(): return - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - current_activity = activity.Activity() + current_activity = activity.Activity(boto_client) current_activity.hydrate(dict(schedule_to_start=start_timeout)) current_activity.generators = generators current_activity.runner = runner.Sync( @@ -408,14 +433,13 @@ def local_task(): schedule_to_start + instance.timeout) -def test_external_activity_timeouts(monkeypatch, generators): +def test_external_activity_timeouts(monkeypatch, boto_client, generators): """Test the creation of an external activity timeouts. """ timeout = 120 start_timeout = 1000 - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) current_activity = activity.ExternalActivity(timeout=timeout) current_activity.hydrate(dict(schedule_to_start=start_timeout)) current_activity.generators = generators @@ -430,44 +454,39 @@ def test_external_activity_timeouts(monkeypatch, generators): schedule_to_start + instance.timeout) -def test_worker_run(monkeypatch): +@pytest.mark.skipif(sys.version_info < (3, 0), reason="requires Python3") +def test_worker_run(monkeypatch, boto_client): """Test running the worker. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - from tests.fixtures.flows import example worker = activity.ActivityWorker(example) assert len(worker.activities) == 4 for current_activity in worker.activities: - monkeypatch.setattr( - current_activity, 'run', MagicMock(return_value=False)) + monkeypatch.setattr( + current_activity, 'run', MagicMock(return_value=False)) worker.run() assert len(worker.activities) == 4 for current_activity in worker.activities: - # this check was originally `assert current_activity.run.called` - # for some reason this fails on py2.7, so we explicitly check for - # `called == 1`. - assert current_activity.run.called == 1 + assert current_activity.run.called def test_worker_run_with_skipped_activities(monkeypatch): """Test running the worker with defined activities. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - monkeypatch.setattr( - activity.Activity, 'run', MagicMock(return_value=False)) + monkeypatch.setattr(activity.Activity, 'run', MagicMock(return_value=False)) + from tests.fixtures.flows import example worker = activity.ActivityWorker(example, activities=['activity_1']) assert len(worker.worker_activities) == 1 for current_activity in worker.activities: - monkeypatch.setattr( - current_activity, 'run', MagicMock(return_value=False)) + monkeypatch.setattr( + current_activity, 'run', MagicMock(return_value=False)) worker.run() diff --git a/tests/test_context.py b/tests/test_context.py index 96a8429..8e95698 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -1,26 +1,13 @@ -from __future__ import absolute_import -try: - from unittest.mock import MagicMock -except: - from mock import MagicMock -import boto.swf.layer2 as swf +from unittest.mock import MagicMock from garcon import context from tests.fixtures import decider as decider_events -def mock(monkeypatch): - for base in [swf.Decider, swf.WorkflowType, swf.ActivityType, swf.Domain]: - monkeypatch.setattr(base, '__init__', MagicMock(return_value=None)) - if base is not swf.Decider: - monkeypatch.setattr(base, 'register', MagicMock()) - - def test_context_creation_without_events(monkeypatch): """Check the basic context creation. """ - mock(monkeypatch) current_context = context.ExecutionContext() assert not current_context.current assert not current_context.workflow_input @@ -30,7 +17,6 @@ def test_context_creation_with_events(monkeypatch): """Test context creation with events. """ - mock(monkeypatch) from tests.fixtures import decider as poll current_context = context.ExecutionContext(poll.history.get('events')) @@ -40,7 +26,6 @@ def test_get_workflow_execution_info(monkeypatch): """Check that the workflow execution info are properly extracted """ - mock(monkeypatch) from tests.fixtures import decider as poll current_context = context.ExecutionContext() diff --git a/tests/test_decider.py b/tests/test_decider.py index 099e179..20baf59 100755 --- a/tests/test_decider.py +++ b/tests/test_decider.py @@ -1,10 +1,6 @@ -from __future__ import absolute_import -try: - from unittest.mock import MagicMock -except: - from mock import MagicMock -import boto.swf.layer2 as swf +from unittest.mock import MagicMock import json + import pytest from garcon import decider @@ -12,21 +8,14 @@ from tests.fixtures import decider as decider_events -def mock(monkeypatch): - for base in [swf.Decider, swf.WorkflowType, swf.ActivityType, swf.Domain]: - monkeypatch.setattr(base, '__init__', MagicMock(return_value=None)) - if base is not swf.Decider: - monkeypatch.setattr(base, 'register', MagicMock()) - - def test_create_decider(monkeypatch): """Create a decider and check the behavior of the registration. """ - mock(monkeypatch) from tests.fixtures.flows import example d = decider.DeciderWorker(example) + assert d.client assert len(d.activities) == 4 assert d.flow assert d.domain @@ -45,7 +34,6 @@ def test_get_history(monkeypatch): """Test the decider history """ - mock(monkeypatch) from tests.fixtures.flows import example events = decider_events.history.get('events') @@ -53,13 +41,19 @@ def test_get_history(monkeypatch): events = events[:half * 2] pool_1 = events[half:] pool_2 = events[:half] + identity = 'identity' d = decider.DeciderWorker(example) - d.poll = MagicMock(return_value={'events': pool_2}) + d.client.poll_for_decision_task = MagicMock(return_value={'events': pool_2}) - resp = d.get_history({'events': pool_1, 'nextPageToken': 'nextPage'}) + resp = d.get_history( + identity, {'events': pool_1, 'nextPageToken': 'nextPage'}) - d.poll.assert_called_with(next_page_token='nextPage') + d.client.poll_for_decision_task.assert_called_with( + domain=example.domain, + nextPageToken='nextPage', + identity=identity, + taskList=dict(name=d.task_list)) assert len(resp) == len([ evt for evt in events if evt['eventType'].startswith('Decision')]) @@ -68,12 +62,12 @@ def test_get_activity_states(monkeypatch): """Test get activity states from history. """ - mock(monkeypatch) from tests.fixtures.flows import example + identity= 'identity' events = decider_events.history.get('events') d = decider.DeciderWorker(example) - history = d.get_history({'events': events}) + history = d.get_history(identity, {'events': events}) states = d.get_activity_states(history) for activity_name, activity_instances in states.items(): @@ -85,11 +79,12 @@ def test_running_workflow(monkeypatch): """Test running a workflow. """ - mock(monkeypatch) from tests.fixtures.flows import example - d = decider.DeciderWorker(example) - d.poll = MagicMock(return_value=decider_events.history) + d = decider.DeciderWorker(example, register=False) + d.client.poll_for_decision_task = MagicMock( + return_value=decider_events.history) + d.client.respond_decision_task_completed = MagicMock() d.complete = MagicMock() d.create_decisions_from_flow = MagicMock() @@ -112,36 +107,41 @@ def test_running_workflow_identity(monkeypatch): """Test running a workflow with and without identity. """ - mock(monkeypatch) from tests.fixtures.flows import example - d = decider.DeciderWorker(example) - d.poll = MagicMock() + d = decider.DeciderWorker(example, register=False) + d.client.poll_for_decision_task = MagicMock() d.complete = MagicMock() d.create_decisions_from_flow = MagicMock() # assert running decider without identity d.run() - d.poll.assert_called_with(identity=None) + d.client.poll_for_decision_task.assert_called_with( + domain=d.domain, + taskList=dict(name=d.task_list), + identity='') # assert running decider with identity d.run('foo') - d.poll.assert_called_with(identity='foo') + d.client.poll_for_decision_task.assert_called_with( + domain=d.domain, + taskList=dict(name=d.task_list), + identity='foo') def test_running_workflow_exception(monkeypatch): """Run a decider with an exception raised during poll. """ - mock(monkeypatch) from tests.fixtures.flows import example - d = decider.DeciderWorker(example) - d.poll = MagicMock(return_value=decider_events.history) + d = decider.DeciderWorker(example, register=False) + d.client.poll_for_decision_task = MagicMock( + return_value=decider_events.history) d.complete = MagicMock() d.create_decisions_from_flow = MagicMock() exception = Exception('test') - d.poll.side_effect = exception + d.client.poll_for_decision_task.side_effect = exception d.on_exception = MagicMock() d.logger.error = MagicMock() d.run() @@ -154,10 +154,9 @@ def test_create_decisions_from_flow_exception(monkeypatch): """Test exception is raised and workflow fails when exception raised. """ - mock(monkeypatch) from tests.fixtures.flows import example - decider_worker = decider.DeciderWorker(example) + decider_worker = decider.DeciderWorker(example, register=False) decider_worker.logger.error = MagicMock() decider_worker.on_exception = MagicMock() @@ -165,14 +164,17 @@ def test_create_decisions_from_flow_exception(monkeypatch): monkeypatch.setattr(decider.activity, 'find_available_activities', MagicMock(side_effect = exception)) - mock_decisions = MagicMock() + decisions = [] mock_activity_states = MagicMock() mock_context = MagicMock() decider_worker.create_decisions_from_flow( - mock_decisions, mock_activity_states, mock_context) + decisions, mock_activity_states, mock_context) + + failed_execution = dict( + decisionType='FailWorkflowExecution', + failWorkflowExecutionDecisionAttributes=dict(reason=str(exception))) - mock_decisions.fail_workflow_execution.assert_called_with( - reason=str(exception)) + assert failed_execution in decisions assert decider_worker.on_exception.called decider_worker.logger.error.assert_called_with(exception, exc_info=True) @@ -181,11 +183,10 @@ def test_running_workflow_without_events(monkeypatch): """Test running a workflow without having any events. """ - mock(monkeypatch) from tests.fixtures.flows import example - d = decider.DeciderWorker(example) - d.poll = MagicMock(return_value={}) + d = decider.DeciderWorker(example, register=False) + d.client.poll_for_decision_task = MagicMock(return_value={}) d.get_history = MagicMock() d.run() @@ -207,7 +208,6 @@ def test_schedule_with_unscheduled_activity(monkeypatch): """Test the scheduling of an activity. """ - mock(monkeypatch) from tests.fixtures.flows import example monkeypatch.setattr(decider, 'schedule_activity_task', MagicMock()) @@ -229,7 +229,6 @@ def test_schedule_with_scheduled_activity(monkeypatch): """Test the scheduling of an activity. """ - mock(monkeypatch) from tests.fixtures.flows import example monkeypatch.setattr(decider, 'schedule_activity_task', MagicMock()) @@ -261,7 +260,6 @@ def test_schedule_with_completed_activity(monkeypatch): """Test the scheduling of an activity. """ - mock(monkeypatch) from tests.fixtures.flows import example monkeypatch.setattr(decider, 'schedule_activity_task', MagicMock()) @@ -306,65 +304,74 @@ def test_schedule_activity_task(monkeypatch): """Test scheduling an activity task. """ - mock(monkeypatch) from tests.fixtures.flows import example instance = list(example.activity_1.instances({}))[0] - decisions = MagicMock() + decisions = [] decider.schedule_activity_task(decisions, instance) - decisions.schedule_activity_task.assert_called_with( - instance.id, - instance.activity_name, - '1.0', - task_list=instance.activity_worker.task_list, - input=json.dumps(instance.create_execution_input()), - heartbeat_timeout=str(instance.heartbeat_timeout), - start_to_close_timeout=str(instance.timeout), - schedule_to_start_timeout=str(instance.schedule_to_start), - schedule_to_close_timeout=str(instance.schedule_to_close)) + expects = dict( + decisionType='ScheduleActivityTask', + scheduleActivityTaskDecisionAttributes=dict( + activityId=instance.id, + activityType=dict( + name=instance.activity_name, + version='1.0'), + taskList=dict(name=instance.activity_worker.task_list), + input=json.dumps(instance.create_execution_input()), + heartbeatTimeout=str(instance.heartbeat_timeout), + startToCloseTimeout=str(instance.timeout), + scheduleToStartTimeout=str(instance.schedule_to_start), + scheduleToCloseTimeout=str(instance.schedule_to_close))) + assert expects in decisions def test_schedule_activity_task_with_version(monkeypatch): """Test scheduling an activity task with a version. """ - mock(monkeypatch) from tests.fixtures.flows import example instance = list(example.activity_1.instances({}))[0] - decisions = MagicMock() + decisions = [] version = '2.0' decider.schedule_activity_task(decisions, instance, version=version) - decisions.schedule_activity_task.assert_called_with( - instance.id, - instance.activity_name, - version, - task_list=instance.activity_worker.task_list, - input=json.dumps(instance.create_execution_input()), - heartbeat_timeout=str(instance.heartbeat_timeout), - start_to_close_timeout=str(instance.timeout), - schedule_to_start_timeout=str(instance.schedule_to_start), - schedule_to_close_timeout=str(instance.schedule_to_close)) - - -def test_schedule_activity_task_with_version(monkeypatch): + expects = dict( + decisionType='ScheduleActivityTask', + scheduleActivityTaskDecisionAttributes=dict( + activityId=instance.id, + activityType=dict( + name=instance.activity_name, + version=version), + taskList=dict(name=instance.activity_worker.task_list), + input=json.dumps(instance.create_execution_input()), + heartbeatTimeout=str(instance.heartbeat_timeout), + startToCloseTimeout=str(instance.timeout), + scheduleToStartTimeout=str(instance.schedule_to_start), + scheduleToCloseTimeout=str(instance.schedule_to_close))) + assert expects in decisions + + +def test_schedule_activity_task_with_custom_id(monkeypatch): """Test scheduling an activity task with a custom id. """ - mock(monkeypatch) from tests.fixtures.flows import example instance = list(example.activity_1.instances({}))[0] - decisions = MagicMock() + decisions = [] custom_id = 'special_id' decider.schedule_activity_task(decisions, instance, id=custom_id) - decisions.schedule_activity_task.assert_called_with( - custom_id, - instance.activity_name, - '1.0', - task_list=instance.activity_worker.task_list, - input=json.dumps(instance.create_execution_input()), - heartbeat_timeout=str(instance.heartbeat_timeout), - start_to_close_timeout=str(instance.timeout), - schedule_to_start_timeout=str(instance.schedule_to_start), - schedule_to_close_timeout=str(instance.schedule_to_close)) + expects = dict( + decisionType='ScheduleActivityTask', + scheduleActivityTaskDecisionAttributes=dict( + activityId=custom_id, + activityType=dict( + name=instance.activity_name, + version='1.0'), + taskList=dict(name=instance.activity_worker.task_list), + input=json.dumps(instance.create_execution_input()), + heartbeatTimeout=str(instance.heartbeat_timeout), + startToCloseTimeout=str(instance.timeout), + scheduleToStartTimeout=str(instance.schedule_to_start), + scheduleToCloseTimeout=str(instance.schedule_to_close))) + assert expects in decisions diff --git a/tests/test_runner.py b/tests/test_runner.py index 0fe408c..8f0d42e 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,8 +1,4 @@ -from __future__ import absolute_import -try: - from unittest.mock import MagicMock -except: - from mock import MagicMock +from unittest.mock import MagicMock import pytest @@ -23,18 +19,18 @@ def test_execute_default_task_runner(): current_runner.execute(None, None) -def test_synchronous_tasks(monkeypatch): +def test_synchronous_tasks(monkeypatch, boto_client): """Test synchronous tasks. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - monkeypatch.setattr(activity.Activity, 'heartbeat', lambda self: None) + monkeypatch.setattr(activity.ActivityExecution, 'heartbeat', + lambda self: None) resp = dict(foo='bar') current_runner = runner.Sync( MagicMock(), MagicMock(return_value=resp)) - current_activity = activity.Activity() - current_activity.hydrate(dict(runner=current_runner)) + current_activity = activity.ActivityExecution( + boto_client, 'activityId', 'taskToken', '{"context": "value"}') result = current_runner.execute(current_activity, EMPTY_CONTEXT) @@ -46,12 +42,12 @@ def test_synchronous_tasks(monkeypatch): assert resp == result -def test_aynchronous_tasks(monkeypatch): +def test_aynchronous_tasks(monkeypatch, boto_client): """Test asynchronous tasks. """ - monkeypatch.setattr(activity.Activity, '__init__', lambda self: None) - monkeypatch.setattr(activity.Activity, 'heartbeat', lambda self: None) + monkeypatch.setattr(activity.ActivityExecution, 'heartbeat', + lambda self: None) tasks = [MagicMock() for i in range(5)] tasks[2].return_value = dict(oi='mondo') @@ -66,11 +62,10 @@ def test_aynchronous_tasks(monkeypatch): assert current_runner.max_workers == workers assert len(current_runner.tasks) == len(tasks) - current_activity = activity.Activity() - current_activity.hydrate(dict(runner=current_runner)) + current_activity = activity.ActivityExecution(boto_client, + 'activityId', 'taskToken', '{"hello": "world"}') - context = dict(hello='world') - resp = current_runner.execute(current_activity, context) + resp = current_runner.execute(current_activity, current_activity.context) for current_task in tasks: assert current_task.called diff --git a/tests/test_task.py b/tests/test_task.py index 7cb9ba6..b060685 100755 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,9 +1,5 @@ -from __future__ import absolute_import import functools -try: - from unittest.mock import MagicMock -except: - from mock import MagicMock +from unittest.mock import MagicMock import pytest @@ -64,7 +60,7 @@ def test(): assert test.__garcon__.get('foo') is None task._decorate(test, 'foo', 'bar') - assert test.__garcon__.get('foo') is 'bar' + assert test.__garcon__.get('foo') == 'bar' def test_generator_decorator(): @@ -108,7 +104,7 @@ def testB(): task._link_decorator(testA, testB) assert not getattr(testA, '__garcon__', None) - assert len(testB.__garcon__) is 0 + assert len(testB.__garcon__) == 0 task._decorate(testB, 'foo', 'value') assert testB.__garcon__.get('foo') == 'value' @@ -123,7 +119,7 @@ def test_task_decorator(): @task.decorate(timeout=timeout) def test(user): - assert user is userinfo + assert user == userinfo assert test.__garcon__.get('timeout') == timeout assert test.__garcon__.get('heartbeat') == timeout @@ -153,6 +149,7 @@ def test_task_decorator_with_heartbeat(): """ heartbeat = 50 + userinfo = None @task.decorate(heartbeat=heartbeat) def test(user): diff --git a/tests/test_utils.py b/tests/test_utils.py index 44564c7..333dac2 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,7 +2,7 @@ from unittest.mock import MagicMock except: from mock import MagicMock -import boto.exception as boto_exception +from botocore import exceptions import datetime import pytest import json @@ -53,23 +53,15 @@ def test_non_throttle_error(): """Assert SWF error is evaluated as non-throttle error properly. """ - response_status = 400 - response_reason = 'Bad Request' - reponse_body = ( - '{"__type": "com.amazon.coral.availability#ThrottlingException",' - '"message": "Rate exceeded"}') - json_body = json.loads(reponse_body) - exception = boto_exception.SWFResponseError( - response_status, response_reason, body=json_body) + exception = exceptions.ClientError( + {'Error': {'Code': 'ThrottlingException'}}, + 'operationName') result = utils.non_throttle_error(exception) assert not utils.non_throttle_error(exception) - reponse_body = ( - '{"__type": "com.amazon.coral.availability#OtheException",' - '"message": "Rate exceeded"}') - json_body = json.loads(reponse_body) - exception = boto_exception.SWFResponseError( - response_status, response_reason, body=json_body) + exception = exceptions.ClientError( + {'Error': {'Code': 'RateExceeded'}}, + 'operationName') assert utils.non_throttle_error(exception) def test_throttle_backoff_handler(): diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..2c7d2b4 --- /dev/null +++ b/tox.ini @@ -0,0 +1,9 @@ +[tox] +envlist = py38,py39,py310,py311,py312 + +[testenv] +deps = -rrequirements-tests.txt +commands = py.test + +[testenv:py3] +deps = -rrequirements.txt