A messaging integration for the event driven pattern utilizing AWS SNS and AWS SQS for Insanic.
Iniesta is a messaging integration plugin for Insanic. This was initially created to easily apply the event driven pattern to services running Insanic.
For a bit of context, Andrés Iniesta is a Spanish professional soccer player who plays as a central midfielder. He is considered one of the best soccer players and one of the greatest midfielders of all time. For those of you unfamiliar with soccer, a midfielder is responsible for playmaking, passing the soccer ball from the defense to the forwards.
Consequently, this project aims to be the messenger between services: a proxy for sending messages (the soccer ball) from the producers (defenders) to the consumers (strikers), although here the messages fan out, whereas there is only one soccer ball.
- Asynchronous message handling.
- Produce messages to a global SNS.
- Filters for verification and subscribing SQS to SNS.
- Polling for SQS and receiving messages.
- Decorator for consuming messages with defined parameters.
- Locks for idempotent message handling.
- Checks that the proper subscriptions have been set up.
- Checks that the proper permissions have been set up.
- Decorators for emitting messages.
Prerequisites:
- python >= 3.6
- AWS Credentials
To install:
pip install insanic-iniesta
To set up Iniesta, we need a few settings.
INIESTA_INITIALIZATION_TYPE
: (list[string]) List of initialization types defined by the InitializationTypes enum.
- Choices: "QUEUE_POLLING", "EVENT_POLLING", "SNS_PRODUCER", "CUSTOM"
INIESTA_SQS_CONSUMER_FILTERS
: (list) default: [] A list of filters for the message events your service wants to receive.
INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN
: (string) default: None The global SNS ARN.
INIESTA_SNS_EVENT_KEY
: (string) default: iniesta_pass The key the event will be published under. You will NOT want to change this.
INIESTA_LOCK_RETRY_COUNT
: (int) default: 1 Lock retry count when the lock cannot be acquired.
INIESTA_LOCK_TIMEOUT
: (int) default: 10s Timeout for the lock when received.
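As an illustration, a config.py for a service that consumes events might look like the sketch below. Only the setting names and defaults come from the list above; the ARN is a placeholder, and the filter string is assumed to follow the {event}.{service_name} form used elsewhere in this README.

```python
# config.py -- illustrative values only; the ARN and filter are placeholders.

# Initialization type(s) this service needs.
INIESTA_INITIALIZATION_TYPE = ["EVENT_POLLING"]

# Events this service wants to receive, assumed here to be written
# as "{event}.{service_name}".
INIESTA_SQS_CONSUMER_FILTERS = ["EventHappened.producer"]

# ARN of the global SNS topic (placeholder account id and topic name).
INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN = (
    "arn:aws:sns:us-east-1:123456789012:example-global-topic"
)

# Lock settings, shown with their documented defaults.
INIESTA_LOCK_RETRY_COUNT = 1
INIESTA_LOCK_TIMEOUT = 10  # seconds
```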
There are several initialization types because not all applications need to both produce and receive messages. Set the initialization type that fits your use case.
First, we need to decide which type of initialization to use. For Iniesta to know the initialization type, we need to set INIESTA_INITIALIZATION_TYPE in our config. Until we do so, we will not be able to run Insanic.
# in your application config.py
INIESTA_INITIALIZATION_TYPE = ["SNS_PRODUCER"]
# and finally in app.py
from insanic import Insanic
from iniesta import Iniesta
app = Insanic('producer', version="0.0.1")
Iniesta.init_app(app)
For more documentation on initialization types, please refer to Iniesta's documentation.
Set up Iniesta with the SNS_PRODUCER initialization type if you ONLY need to produce messages to SNS.
from iniesta.sns import SNSClient
sns = SNSClient(topic_arn)
message = sns.create_message(event="EventHappened", message={"id": 1}, version=1)
await message.publish()
This will publish a message to SNS with the event specified in the parameters. The published event will be {event}.{service_name}. Even if you don't specify the service_name in the event, it will automatically be appended.
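The naming rule can be pictured with a small helper. This is only a sketch of the behavior described above, not Iniesta's implementation; the function name qualified_event is made up for this example.

```python
def qualified_event(event: str, service_name: str) -> str:
    """Return the event name in '{event}.{service_name}' form."""
    suffix = f".{service_name}"
    # Leave the name alone if it already ends with the service name.
    if event.endswith(suffix):
        return event
    return event + suffix


print(qualified_event("EventHappened", "producer"))
# prints EventHappened.producer
print(qualified_event("EventHappened.producer", "producer"))
# prints EventHappened.producer
```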
To consume messages that other applications have produced, we set up Iniesta for EVENT_POLLING.
There are several checks Iniesta performs when initializing for EVENT_POLLING.
- Checks if the AWS SQS queue has been created.
- Checks if the global ARN is set (INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN).
- Checks if filters have been defined (INIESTA_SQS_CONSUMER_FILTERS).
- Checks if subscriptions have been made between the service SQS and SNS.
- Checks if necessary permissions have been put in place.
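The two settings-related checks in this list can be approximated locally before starting the service. The sketch below is a hypothetical helper, not part of Iniesta; the queue, subscription, and permission checks need AWS API calls and are not shown.

```python
from typing import Any, List, Mapping


def missing_event_polling_settings(settings: Mapping[str, Any]) -> List[str]:
    """Return the names of required settings that are unset or empty."""
    required = (
        "INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN",
        "INIESTA_SQS_CONSUMER_FILTERS",
    )
    return [name for name in required if not settings.get(name)]


settings = {
    "INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN": None,  # the documented default
    "INIESTA_SQS_CONSUMER_FILTERS": ["EventHappened.producer"],
}
print(missing_event_polling_settings(settings))
# prints ['INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN']
```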
Initial setup for event polling:
# in your config.py
INIESTA_INITIALIZATION_TYPE = ['EVENT_POLLING']
# in service named receiver
from insanic import Insanic
from iniesta import Iniesta
app = Insanic('receiver')
Iniesta.init_app(app)
Since we have initialized for polling our queue, we need to create handlers for processing the messages.
For creating a handler for an event:
from iniesta.sqs import SQSClient
@SQSClient.handler('EventHappened.producer')
def event_happened_handler(message):
    # .. do some logic ..
    pass
The function must receive message as an argument. If the function successfully runs, the message will be deleted from SQS.
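The delete-on-success behavior can be pictured with an in-memory queue. This is a simplified model, not Iniesta's polling code: process_next and both handlers are made up for the example, and keeping the message after a failed handler is an assumption based on how SQS normally redelivers messages that were not deleted.

```python
from collections import deque
from typing import Callable, Deque, Dict


def process_next(queue: Deque[Dict], handler: Callable[[Dict], None]) -> bool:
    """Run handler on the oldest message; remove it only on success."""
    message = queue[0]  # peek without removing
    try:
        handler(message)
    except Exception:
        # Handler failed: the message stays queued for another attempt.
        return False
    queue.popleft()  # handler succeeded: delete the message
    return True


def failing_handler(message: Dict) -> None:
    raise ValueError("could not process message")


def ok_handler(message: Dict) -> None:
    pass


queue = deque([{"id": 1}])
process_next(queue, failing_handler)
print(len(queue))  # prints 1, the message is still queued
process_next(queue, ok_handler)
print(len(queue))  # prints 0, the message was deleted
```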
For other use cases for Iniesta, please refer to the Documentation.
Iniesta provides several commands to help testing during development.
$ pip install insanic-iniesta[cli]
$ iniesta --help
Usage: iniesta [OPTIONS] COMMAND [ARGS]...
Options:
  --help  Show this message and exit.
Commands:
  filter-policies
  initialization-type
  publish
  send
Please refer to Iniesta's Commands documentation for more information on each of the available commands.
- If the module that contains the handlers is not imported, the handlers are not registered.
- To prevent this, import the module somewhere (e.g. in your app.py) until a better solution is found.
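The reason an import is needed is that decorator-based registration only happens when the decorated definition is executed, which is what importing its module does. The sketch below shows the general mechanism with a made-up registry and decorator; it is not Iniesta's code.

```python
from typing import Callable, Dict

HANDLERS: Dict[str, Callable] = {}  # hypothetical registry


def handler(event: str) -> Callable:
    """Register the decorated function as the handler for `event`."""
    def decorator(func: Callable) -> Callable:
        HANDLERS[event] = func
        return func
    return decorator


print(sorted(HANDLERS))  # prints [], nothing is registered yet


# Executing this definition (which is what importing its module does)
# is the step that adds the handler to the registry.
@handler("EventHappened.producer")
def event_happened_handler(message):
    return message["id"]


print(sorted(HANDLERS))  # prints ['EventHappened.producer']
```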
View release history here
For guidance on setting up a development environment and how to make a contribution to Iniesta, see the CONTRIBUTING.rst guidelines.
Distributed under the MIT license. See LICENSE for more information.
Thanks to all the people at my prior company that worked with me to make this possible.
- Documentation: https://iniesta.readthedocs.io/en/latest/
- Releases: https://pypi.org/project/insanic-iniesta/
- Code: https://www.github.com/crazytruth/iniesta/
- Issue Tracker: https://www.github.com/crazytruth/iniesta/issues
- Insanic Documentation: http://insanic.readthedocs.io/
- Insanic Repository: https://www.github.com/crazytruth/insanic/