diff --git a/.gitignore b/.gitignore index a325920c..acbe1e84 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ examples/cert.pem *.pyc *.pyo .*.swp +.pytype/ diff --git a/.lvimrc b/.lvimrc new file mode 100644 index 00000000..49c62e28 --- /dev/null +++ b/.lvimrc @@ -0,0 +1,4 @@ +let g:ale_fix_on_save = 1 +let g:ale_fixers = { +\ 'python': ['autopep8'], +\} diff --git a/.travis.yml b/.travis.yml index a394177c..9a9a32c7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,18 @@ language: python python: - - "2.7" - - "3.5" - - "3.6" - - "3.7" + - "2.7.18" + - "3.5.9" + - "3.6.12" + - "3.7.9" + - "3.8.6" dist: xenial # https://github.com/travis-ci/travis-ci/issues/9069#issuecomment-425720905 install: - travis_retry pip install -r test/requirements.txt - travis_retry pip install coveralls - travis_retry pip install -e . -script: py.test --cov=slimta +script: + - py.test --cov=slimta + - flake8 slimta + - pytype -k after_success: - coveralls diff --git a/setup.cfg b/setup.cfg index 5851c535..9241ecea 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,3 +13,5 @@ exclude_lines = pragma: no cover raise NotImplementedError +[pytype] +inputs = slimta diff --git a/setup.py b/setup.py index 74f17e4f..eb6733ff 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ setup(name='python-slimta', - version='4.0.11', + version='4.1.0', author='Ian Good', author_email='icgood@gmail.com', description='Lightweight, asynchronous SMTP libraries.', @@ -35,6 +35,12 @@ 'pysasl >= 0.4.0, < 0.5', 'pycares < 3.0.0; python_version < "3.0"', 'pycares >= 1; python_version >= "3.0"'], + extras_require={'spf': ['pyspf', 'py3dns; python_version >= "3.0"', + 'pydns; python_version < "3.0"', + 'ipaddr; python_version < "3.0"'], + 'redis': ['redis'], + 'aws': ['boto'], + 'disk': ['pyaio >= 0.4; platform_system == "Linux"']}, classifiers=['Development Status :: 3 - Alpha', 'Topic :: Communications :: Email :: Mail Transport Agents', 'Intended Audience :: Developers', @@ -42,9 +48,10 @@ 'License :: 
OSI Approved :: MIT License', 'Programming Language :: Python', 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6']) + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8']) # vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/cloudstorage/__init__.py b/slimta/cloudstorage/__init__.py new file mode 100644 index 00000000..48f9bbc9 --- /dev/null +++ b/slimta/cloudstorage/__init__.py @@ -0,0 +1,127 @@ +# Copyright (c) 2013 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""Package containing a module for the different cloud service providers along +with any necessary helper modules. + +.. _Cloud Files: http://www.rackspace.com/cloud/files/ +.. _Cloud Queues: http://www.rackspace.com/cloud/queues/ +.. _S3: http://aws.amazon.com/s3/ +.. 
_SQS: http://aws.amazon.com/sqs/ + +""" + +from __future__ import absolute_import + +from slimta.queue import QueueError, QueueStorage +from slimta import logging + +__all__ = ['CloudStorageError', 'CloudStorage'] + +log = logging.getQueueStorageLogger(__name__) + + +class CloudStorageError(QueueError): + """Base exception for all exceptions in the package. + + """ + pass + + +class CloudStorage(QueueStorage): + """This class implements a :class:`~slimta.queue.QueueStorage` backend that + uses cloud services to store messages. It coordinates the storage of + messages and metadata (using `Cloud Files`_ or `S3`_) with the optional + message queue mechanisms (using `Cloud Queues`_ or `SQS`_) that can alert + other *slimta* processes that a new message is available in the object + store. + + :param object_store: The object used as the backend for storing message + contents and metadata in the cloud. Currently this can + be an instance of + :class:`~rackspace.RackspaceCloudFiles` or + :class:`~aws.SimpleStorageService`. + :param message_queue: The optional object used + as the backend for alerting other processes that a + new message is in the object store. Currently this + can be an instance of + :class:`~rackspace.RackspaceCloudQueues` or + :class:`~aws.SimpleQueueService`. 
+ + """ + + def __init__(self, object_store, message_queue=None): + super(CloudStorage, self).__init__() + self.obj_store = object_store + self.msg_queue = message_queue + + def write(self, envelope, timestamp): + storage_id = self.obj_store.write_message(envelope, timestamp) + if self.msg_queue: + try: + self.msg_queue.queue_message(storage_id, timestamp) + except Exception: + logging.log_exception(__name__) + log.write(storage_id, envelope) + return storage_id + + def set_timestamp(self, id, timestamp): + self.obj_store.set_message_meta(id, timestamp=timestamp) + log.update_meta(id, timestamp=timestamp) + + def increment_attempts(self, id): + meta = self.obj_store.get_message_meta(id) + new_attempts = meta['attempts'] + 1 + self.obj_store.set_message_meta(id, attempts=new_attempts) + log.update_meta(id, attempts=new_attempts) + return new_attempts + + def set_recipients_delivered(self, id, rcpt_indexes): + meta = self.obj_store.get_message_meta(id) + current = meta.get('delivered_indexes', []) + new = current + rcpt_indexes + self.obj_store.set_message_meta(id, delivered_indexes=new) + log.update_meta(id, delivered_indexes=rcpt_indexes) + + def load(self): + return self.obj_store.list_messages() + + def get(self, id): + envelope, meta = self.obj_store.get_message(id) + delivered_rcpts = meta.get('delivered_indexes', []) + self._remove_delivered_rcpts(envelope, delivered_rcpts) + return envelope, meta.get('attempts', 0) + + def remove(self, id): + self.obj_store.delete_message(id) + log.remove(id) + + def wait(self): + if self.msg_queue: + for timestamp, storage_id, message_id in self.msg_queue.poll(): + yield (timestamp, storage_id) + self.msg_queue.delete(message_id) + self.msg_queue.sleep() + else: + raise NotImplementedError() + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/cloudstorage/aws.py b/slimta/cloudstorage/aws.py new file mode 100644 index 00000000..fc42916f --- /dev/null +++ b/slimta/cloudstorage/aws.py @@ -0,0 +1,215 @@ +# Copyright (c) 
2013 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""This module defines the queue storage mechanism specific to the `Amazon Web +Services`_ hosting service. It requires an account as well as the `Simple +Storage Service (S3)`_ and optionally the `Simple Queue Service (SQS)`_ +services. + +For each queued message, the contents and metadata of the message are written +to *S3*. Upon success, a reference to the S3 object is injected into *SQS* as a +new message. + +The *SQS* service is only necessary for alerting separate *slimta* processes +that a new message has been queued. If reception and relaying are happening in +the same process, *SQS* is unnecessary. + +**NOTE:** This module uses the `boto`_ library to communicate with *AWS*. To +avoid performance issues, you must use gevent `monkey patching`_ before using +it! 
+ +:: + + from gevent import monkey; monkey.patch_all() + + s3_conn = boto.connect_s3() + s3_bucket = s3_conn.get_bucket('slimta-queue') + s3 = SimpleStorageService(s3_bucket) + + sqs_conn = boto.sqs.connect_to_region('us-west-2') + sqs_queue = sqs_conn.create_queue('slimta-queue') + sqs = SimpleQueueService(sqs_queue) + + queue_storage = CloudStorage(s3, sqs) + +.. _Amazon Web Services: http://aws.amazon.com/ +.. _Simple Storage Service (S3): http://aws.amazon.com/s3/ +.. _Simple Queue Service (SQS): http://aws.amazon.com/sqs/ +.. _boto: http://boto.readthedocs.org/en/latest/ +.. _monkey patching: http://gevent.org/intro.html#monkey-patching + +""" + +from __future__ import absolute_import + +import uuid +import json + +from six.moves import cPickle + +import gevent +from boto.s3.key import Key +from boto.sqs.message import Message + +__all__ = ['SimpleStorageService', 'SimpleQueueService'] + + +class SimpleStorageService(object): + """Instances of this class may be passed in to the + :class:`~slimta.cloudstorage.CloudStorage` constructor for the ``storage`` + parameter to use *S3* as the storage backend. + + Keys added to the bucket are generated with ``prefix + str(uuid.uuid4())``. + + :param bucket: The S3 bucket object in which all message contents and + metadata will be written. Each created S3 object will use a + :py:mod:`uuid` string as its key. + :type bucket: :class:`boto.s3.bucket.Bucket` + :param timeout: Timeout, in seconds, before requests to *S3* will fail and + raise an exception. + :param prefix: The string prefixed to every key added to the bucket. 
+ + """ + + def __init__(self, bucket, timeout=None, prefix=''): + super(SimpleStorageService, self).__init__() + self.bucket = bucket + self.timeout = timeout + self.prefix = prefix + self.Key = Key + + def _get_key(self, id): + key = self.bucket.get_key(id) + if not key: + raise KeyError(id) + return key + + def write_message(self, envelope, timestamp): + key = self.Key(self.bucket) + key.key = self.prefix+str(uuid.uuid4()) + envelope_raw = cPickle.dumps(envelope, cPickle.HIGHEST_PROTOCOL) + with gevent.Timeout(self.timeout): + key.set_metadata('timestamp', json.dumps(timestamp)) + key.set_metadata('attempts', '') + key.set_metadata('delivered_indexes', '') + key.set_contents_from_string(envelope_raw) + return key.key + + def set_message_meta(self, id, timestamp=None, attempts=None, + delivered_indexes=None): + key = self._get_key(id) + with gevent.Timeout(self.timeout): + if timestamp is not None: + key.set_metadata('timestamp', json.dumps(timestamp)) + if attempts is not None: + key.set_metadata('attempts', json.dumps(attempts)) + if delivered_indexes is not None: + key.set_metadata('delivered_indexes', + json.dumps(delivered_indexes)) + + def delete_message(self, id): + key = self._get_key(id) + with gevent.Timeout(self.timeout): + key.delete() + + def get_message(self, id): + key = self._get_key(id) + with gevent.Timeout(self.timeout): + envelope_raw = key.get_contents_as_string() + timestamp_raw = key.get_metadata('timestamp') + attempts_raw = key.get_metadata('attempts') + delivered_raw = key.get_metadata('delivered_indexes') + envelope = cPickle.loads(envelope_raw) + meta = {'timestamp': json.loads(timestamp_raw)} + if attempts_raw: + meta['attempts'] = json.loads(attempts_raw) + if delivered_raw: + meta['delivered_indexes'] = json.loads(delivered_raw) + return envelope, meta + + def get_message_meta(self, id): + key = self._get_key(id) + with gevent.Timeout(self.timeout): + timestamp_raw = key.get_metadata('timestamp') + attempts_raw = 
key.get_metadata('attempts') + delivered_raw = key.get_metadata('delivered_indexes') + meta = {'timestamp': json.loads(timestamp_raw)} + if attempts_raw: + meta['attempts'] = json.loads(attempts_raw) + if delivered_raw: + meta['delivered_indexes'] = json.loads(delivered_raw) + return meta + + def list_messages(self): + with gevent.Timeout(self.timeout): + ids = list(self.bucket.list(self.prefix)) + for id in ids: + timestamp, attempts = self.get_message_meta(id) + yield (timestamp, id) + + +class SimpleQueueService(object): + """Instances of this class may be passed in to the + :class:`~slimta.cloudstorage.CloudStorage` constructor for the + ``message_queue`` parameter to use *SQS* as the message queue backend to + alert other processes that a new message was stored. + + :param queue: The SQS queue object in which each new message corresponds to + a new object in storage. + :type queue: :class:`boto.sqs.queue.Queue` + :param timeout: Timeout, in seconds, before requests to *S3* will fail and + raise an exception. + :param poll_pause: The time, in seconds, to idle between attempts to poll + the queue for new messages. 
+ + """ + + def __init__(self, queue, timeout=None, poll_pause=1.0): + super(SimpleQueueService, self).__init__() + self.queue = queue + self.timeout = timeout + self.poll_pause = poll_pause + self.Message = Message + + def queue_message(self, storage_id, timestamp): + msg = self.Message() + payload = {'timestamp': timestamp, 'storage_id': storage_id} + msg.set_body(json.dumps(payload)) + with gevent.Timeout(self.timeout): + while not self.queue.write(msg): + pass + + def poll(self): + with gevent.Timeout(self.timeout): + messages = self.queue.get_messages() + for msg in messages: + payload = json.loads(msg.get_body()) + yield (payload['timestamp'], payload['storage_id'], msg) + + def sleep(self): + gevent.sleep(self.poll_pause) + + def delete(self, msg): + with gevent.Timeout(self.timeout): + self.queue.delete_message(msg) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/cloudstorage/rackspace.py b/slimta/cloudstorage/rackspace.py new file mode 100644 index 00000000..caa91a38 --- /dev/null +++ b/slimta/cloudstorage/rackspace.py @@ -0,0 +1,577 @@ +# Copyright (c) 2013 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""This module defines the queue storage mechanism specific to the `Rackspace +Cloud`_ hosting service. It requires an account as well as the `Cloud Files`_ +and optionally the `Cloud Queues`_ services. + +For each queued message, the contents and metadata of the message are written +to *Cloud Files*. Upon success, a reference to the message is injected into +*Cloud Queues* as a new message. + +The *Cloud Queues* service is only necessary for alerting separate *slimta* +processes that a new message has been queued. If reception and relaying are +happening in the same process, *Cloud Queues* is unnecessary. + +:: + + auth = RackspaceCloudAuth({'username': 'slimta', 'api_key': 'xxxxxx'}, + region='IAD') + cloud_files = RackspaceCloudFiles(auth) + cloud_queues = RackspaceCloudQueues(auth) + + storage = CloudStorage(cloud_files, cloud_queues) + +.. _Rackspace Cloud: http://www.rackspace.com/cloud/ +.. _Cloud Files: http://www.rackspace.com/cloud/files/ +.. _Cloud Queues: http://www.rackspace.com/cloud/queues/ + +""" + +from __future__ import absolute_import + +import uuid +import json +from socket import getfqdn +from functools import partial + +from six.moves import cPickle +from six.moves.urllib.parse import urlsplit, urljoin, urlencode + +import gevent + +from slimta.http import get_connection +from slimta import logging +from . 
import CloudStorageError + +__all__ = ['RackspaceError', 'RackspaceCloudAuth', 'RackspaceCloudFiles', + 'RackspaceCloudQueues'] + +log = logging.getHttpLogger(__name__) + +_DEFAULT_AUTH_ENDPOINT = 'https://identity.api.rackspacecloud.com/v2.0/' +_DEFAULT_CLIENT_ID = str(uuid.uuid5(uuid.NAMESPACE_DNS, getfqdn())) + +_TIMESTAMP_HDR = 'X-Object-Meta-Timestamp' +_ATTEMPTS_HDR = 'X-Object-Meta-Attempts' +_DELIVERED_RCPTS_HDR = 'X-Object-Meta-Delivered-Rcpts' + + +class RackspaceError(CloudStorageError): + """Thrown when an unexpected status has been returned from a Rackspace + Cloud API request and the engine does not know how to continue. + + """ + + def __init__(self, response): + status = '{0!s} {1}'.format(response.status, response.reason) + msg = 'Received {0!r} from the API.'.format(status) + super(RackspaceError, self).__init__(msg) + + #: The :class:`~httplib.HTTPResponse` object that triggered the + #: exception. + self.response = response + + +class RackspaceCloudAuth(object): + """This class implements and manages the creation of authentication tokens + when :class:`RackspaceCloudFiles` or :class:`RackspaceCloudQueues` objects + require them. + + :param credentials: This dictionary defines how credentials are sent to the + Auth API. + + If the ``function`` key is defined, it must be a + callable that takes no arguments and returns a tuple. + The tuple must contain a token string, a Cloud Files + service endpoint, and a Cloud Queues service endpoint. + + Otherwise, this dictionary must have a ``username`` key + whose value is the Rackspace Cloud username string. + + The ``password`` key may be used to fetch tokens using + the account's password. Alternatively, the ``api_key`` + key may be used to fetch tokens using the account's API + key. With ``username``, either ``password`` or + ``api_key`` must be given. + + Optionally, ``tenant_id`` may also be provided for + situations where it is necessary for authentication. 
+ :type credentials: dict + :param endpoint: If given, this is the Rackspace Cloud Auth endpoint to hit + when creating tokens. + :param region: When discovering API endpoints from the service catalog, + this is the endpoint region to use, e.g. ``IAD`` or ``HKG``. + If not given, the first region returned is used. + :param timeout: Timeout, in seconds, for requests to the Cloud Auth API to + create a new token for the session. + :param tls: Optional dictionary of TLS settings passed directly as keyword + arguments to :class:`gevent.ssl.SSLSocket`. This is only used + for URLs with the ``https`` scheme. + + """ + + def __init__(self, credentials, endpoint=_DEFAULT_AUTH_ENDPOINT, + region=None, timeout=None, tls=None): + super(RackspaceCloudAuth, self).__init__() + self.get_connection = get_connection + self.timeout = timeout + self.region = region + self.tls = tls or {} + self.token_func = None + self._token_id = None + + if 'function' in credentials: + self.token_func = credentials['function'] + elif 'username' in credentials: + username = credentials['username'] + tenant_id = credentials.get('tenant_id') + if 'password' in credentials: + password = credentials['password'] + self.token_func = partial(self._get_token_password, + endpoint, + username, password, tenant_id) + elif 'api_key' in credentials: + api_key = credentials['api_key'] + self.token_func = partial(self._get_token_api_key, + endpoint, + username, api_key, tenant_id) + if not self.token_func: + msg = 'Required keys not found in credentials dictionary.' + raise KeyError(msg) + + #: The current Cloud Queues API endpoint in use by the mechanism. This + #: should be populated automatically on authentication. + self.queues_endpoint = None + + #: The current Cloud Files API endpoint in use by the mechanism. This + #: should be populated automatically on authentication. 
+ self.files_endpoint = None + + def _get_token(self, url, payload): + full_url = urljoin(url+'/', 'tokens') + parsed_url = urlsplit(full_url, 'http') + conn = self.get_connection(parsed_url, self.tls) + json_payload = json.dumps(payload, sort_keys=True) + headers = [('Host', parsed_url.hostname), + ('Content-Type', 'application/json'), + ('Content-Length', str(len(json_payload))), + ('Accept', 'application/json')] + with gevent.Timeout(self.timeout): + log.request(conn, 'POST', parsed_url.path, headers) + conn.putrequest('POST', parsed_url.path) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders(json_payload) + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + return self._get_token_response(res) + + def _get_token_response(self, response): + if response.status != 200: + raise RackspaceError(response) + payload = json.load(response) + token_id = payload['access']['token']['id'] + files_endpoint = None + queues_endpoint = None + for service in payload['access']['serviceCatalog']: + if service['type'] == 'object-store': + for endpoint in service['endpoints']: + if not self.region or endpoint['region'] == self.region: + files_endpoint = endpoint['publicURL'] + break + if service['type'] == 'rax:queues': + for endpoint in service['endpoints']: + if not self.region or endpoint['region'] == self.region: + queues_endpoint = endpoint['publicURL'] + break + return token_id, files_endpoint, queues_endpoint + + def _get_token_password(self, url, username, password, tenant_id): + payload = {'auth': {'passwordCredentials': {'username': username, + 'password': password}}} + if tenant_id: + payload['auth']['tenantId'] = tenant_id + return self._get_token(url, payload) + + def _get_token_api_key(self, url, username, api_key, tenant_id): + payload = {'auth': {'RAX-KSKEY:apiKeyCredentials': + {'username': username, 'apiKey': api_key}}} + if tenant_id: + 
payload['auth']['tenantId'] = tenant_id + return self._get_token(url, payload) + + @property + def token_id(self): + """The current token in use by the mechanism. + + """ + if not self._token_id: + self.create_token() + return self._token_id + + def create_token(self): + """Creates a new token for use in future requests to Rackspace Cloud + services. This method is called automatically in most cases. The new + token is stored in the :attr:`.token_id` attribute. + + """ + self._token_id, self.files_endpoint, self.queues_endpoint = \ + self.token_func() + + +class RackspaceCloudFiles(object): + """Instances of this class may be passed in to the + :class:`~slimta.cloudstorage.CloudStorage` constructor for the ``storage`` + parameter to use `Cloud Files`_ as the storage backend. + + Keys added to the container are generated with + ``prefix + str(uuid.uuid4())``. + + :param auth: The :class:`RackspaceCloudAuth` object used to manage tokens + this service. + :param container: The Cloud Files container name to use. The files in this + container will be named with random UUID strings. + :param timeout: Timeout, in seconds, for all requests to the Cloud Files + API to return before an exception is thrown. + :param tls: Optional dictionary of TLS settings passed directly as keyword + arguments to :class:`gevent.ssl.SSLSocket`. This is only used + for URLs with the ``https`` scheme. + :param prefix: The string prefixed to every key added to the bucket. 
+ + """ + + def __init__(self, auth, container='slimta-queue', timeout=None, tls=None, + prefix=''): + super(RackspaceCloudFiles, self).__init__() + self.get_connection = get_connection + self.auth = auth + self.container = container + self.timeout = timeout + self.prefix = prefix + self.tls = tls or {} + + def _get_files_url(self, files_id=None): + url = urljoin(self.auth.files_endpoint+'/', self.container) + if files_id: + url = urljoin(url+'/', files_id) + return url + + def write_message(self, envelope, timestamp, retry=False): + envelope_raw = cPickle.dumps(envelope, cPickle.HIGHEST_PROTOCOL) + files_id = self.prefix + str(uuid.uuid4()) + url = self._get_files_url(files_id) + parsed_url = urlsplit(str(url), 'http') + conn = self.get_connection(parsed_url, self.tls) + headers = [('Host', parsed_url.hostname), + ('Content-Type', 'application/octet-stream'), + ('Content-Length', str(len(envelope_raw))), + (_TIMESTAMP_HDR, json.dumps(timestamp)), + ('X-Auth-Token', self.auth.token_id)] + with gevent.Timeout(self.timeout): + log.request(conn, 'PUT', parsed_url.path, headers) + conn.putrequest('PUT', parsed_url.path) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders(envelope_raw) + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + if res.status == 401 and not retry: + self.auth.create_token() + return self.write_message(envelope, timestamp, retry=True) + elif res.status != 201: + raise RackspaceError(res) + return files_id + + def _write_message_meta(self, files_id, meta_headers, retry=False): + url = self._get_files_url(files_id) + parsed_url = urlsplit(url, 'http') + conn = self.get_connection(parsed_url, self.tls) + headers = [('Host', parsed_url.hostname), + ('X-Auth-Token', self.auth.token_id)] + meta_headers + with gevent.Timeout(self.timeout): + log.request(conn, 'POST', parsed_url.path, headers) + conn.putrequest('POST', parsed_url.path) + for name, 
value in headers: + conn.putheader(name, value) + conn.endheaders() + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + if res.status == 401 and not retry: + self.auth.create_token() + return self._write_message_meta(files_id, meta_headers, retry=True) + elif res.status != 202: + raise RackspaceError(res) + + def set_message_meta(self, files_id, timestamp=None, attempts=None, + delivered_indexes=None): + meta_headers = [] + if timestamp is not None: + timestamp_raw = json.dumps(timestamp) + meta_headers.append((_TIMESTAMP_HDR, timestamp_raw)) + if attempts is not None: + attempts_raw = json.dumps(attempts) + meta_headers.append((_ATTEMPTS_HDR, attempts_raw)) + if delivered_indexes is not None: + delivered_raw = json.dumps(delivered_indexes) + meta_headers.append((_DELIVERED_RCPTS_HDR, delivered_raw)) + return self._write_message_meta(files_id, meta_headers) + + def delete_message(self, files_id, retry=False): + url = self._get_files_url(files_id) + parsed_url = urlsplit(url, 'http') + conn = self.get_connection(parsed_url, self.tls) + headers = [('Host', parsed_url.hostname), + ('X-Auth-Token', self.auth.token_id)] + with gevent.Timeout(self.timeout): + log.request(conn, 'DELETE', parsed_url.path, headers) + conn.putrequest('DELETE', parsed_url.path) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders() + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + if res.status == 401 and not retry: + return self.delete_message(files_id, retry=True) + elif res.status != 204: + raise RackspaceError(res) + + def get_message(self, files_id, only_meta=False, retry=False): + url = self._get_files_url(files_id) + parsed_url = urlsplit(url, 'http') + conn = self.get_connection(parsed_url, self.tls) + headers = [('Host', parsed_url.hostname), + ('X-Auth-Token', self.auth.token_id)] + method = 'HEAD' if 
only_meta else 'GET' + with gevent.Timeout(self.timeout): + log.request(conn, method, parsed_url.path, headers) + conn.putrequest(method, parsed_url.path) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders() + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + data = None if only_meta else res.read() + if res.status == 401 and not retry: + self.auth.create_token() + return self.get_message(files_id, only_meta, retry=True) + if res.status == 404: + raise KeyError(files_id) + elif res.status != 200: + raise RackspaceError(res) + timestamp_raw = res.getheader(_TIMESTAMP_HDR) + attempts_raw = res.getheader(_ATTEMPTS_HDR, None) + delivered_raw = res.getheader(_DELIVERED_RCPTS_HDR, None) + meta = {'timestamp': json.loads(timestamp_raw)} + if attempts_raw: + meta['attempts'] = json.loads(attempts_raw) + if delivered_raw: + meta['delivered_indexes'] = json.loads(delivered_raw) + if only_meta: + return meta + else: + envelope = cPickle.loads(data) + return envelope, meta + + def get_message_meta(self, files_id): + return self.get_message(files_id, only_meta=True) + + def _list_messages_page(self, marker, retry=False): + url = self._get_files_url() + parsed_url = urlsplit(url, 'http') + conn = self.get_connection(parsed_url, self.tls) + headers = [('Host', parsed_url.hostname), + ('X-Auth-Token', self.auth.token_id)] + query = urlencode({'limit': '1000'}) + if marker: + query += '&{0}'.format(urlencode({'marker': marker})) + selector = '{0}?{1}'.format(parsed_url.path, query) + with gevent.Timeout(self.timeout): + log.request(conn, 'GET', selector, headers) + conn.putrequest('GET', selector) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders() + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + data = res.read() + if res.status == 401 and not retry: + 
self.auth.create_token() + return self._list_messages_page(marker, retry=True) + if res.status == 200: + lines = data.splitlines() + return [line for line in lines + if line.startswith(self.prefix)], lines[-1] + elif res.status == 204: + return [], None + else: + raise RackspaceError(res) + + def list_messages(self): + marker = None + ids = [] + while True: + ids_batch, marker = self._list_messages_page(marker) + if not marker: + break + ids.extend(ids_batch) + for id in ids: + timestamp, attempts = self.get_message_meta(id) + yield timestamp, id + + +class RackspaceCloudQueues(object): + """Instances of this class may be passed in to the + :class:`~slimta.cloudstorage.CloudStorage` constructor for the + ``message_queue`` parameter to use `Cloud Queues`_ as the message queue + backend to alert other processes that a new message was stored. + + :param auth: The :class:`RackspaceCloudAuth` object used to manage tokens + this service. + :param queue_name: The Cloud Files queue name to use. + :param client_id: The ``Client-ID`` header passed in with all Cloud Queues + requests. By default, this is generated using + :func:`~uuid.uuid5` in conjunction with + :func:`~socket.getfqdn` to be consistent across restarts. + :param timeout: Timeout, in seconds, for all requests to the Cloud Queues + API. + :param poll_pause: The time, in seconds, to idle between attempts to poll + the queue for new messages. + :param tls: Optional dictionary of TLS settings passed directly as keyword + arguments to :class:`gevent.ssl.SSLSocket`. This is only used + for URLs with the ``https`` scheme. 
+ + """ + + def __init__(self, auth, queue_name='slimta-queue', client_id=None, + timeout=None, poll_pause=1.0, tls=None): + super(RackspaceCloudQueues, self).__init__() + self.get_connection = get_connection + self.auth = auth + self.queue_name = queue_name + self.client_id = client_id or _DEFAULT_CLIENT_ID + self.timeout = timeout + self.poll_pause = poll_pause + self.tls = tls or {} + + def queue_message(self, storage_id, timestamp, retry=False): + url = urljoin(self.auth.queues_endpoint+'/', + 'queues/{0}/messages'.format(self.queue_name)) + parsed_url = urlsplit(url, 'http') + conn = self.get_connection(parsed_url, self.tls) + payload = [{'ttl': 86400, + 'body': {'timestamp': timestamp, + 'storage_id': storage_id}}] + json_payload = json.dumps(payload, sort_keys=True) + headers = [('Host', parsed_url.hostname), + ('Client-ID', self.client_id), + ('Content-Type', 'application/json'), + ('Content-Length', str(len(json_payload))), + ('Accept', 'application/json'), + ('X-Auth-Token', self.auth.token_id)] + with gevent.Timeout(self.timeout): + log.request(conn, 'POST', parsed_url.path, headers) + conn.putrequest('POST', parsed_url.path) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders(json_payload) + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + if res.status == 401 and not retry: + self.auth.create_token() + return self.queue_message(storage_id, timestamp, retry=True) + elif res.status != 201: + raise RackspaceError(res) + + def _claim_queued_messages(self, retry=False): + url = urljoin(self.auth.queues_endpoint+'/', + 'queues/{0}/claims'.format(self.queue_name)) + parsed_url = urlsplit(url, 'http') + conn = self.get_connection(parsed_url, self.tls) + json_payload = '{"ttl": 3600, "grace": 3600}' + headers = [('Host', parsed_url.hostname), + ('Client-ID', self.client_id), + ('Content-Type', 'application/json'), + ('Content-Length', 
str(len(json_payload))), + ('Accept', 'application/json'), + ('X-Auth-Token', self.auth.token_id)] + with gevent.Timeout(self.timeout): + log.request(conn, 'POST', parsed_url.path, headers) + conn.putrequest('POST', parsed_url.path) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders(json_payload) + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + data = res.read() + if res.status == 401 and not retry: + self.auth.create_token() + return self._claim_queued_messages(retry=True) + if res.status == 201: + messages = json.loads(data) + return [(msg['body'], msg['href']) for msg in messages] + elif res.status == 204: + return [] + else: + raise RackspaceError(res) + + def poll(self): + messages = self._claim_queued_messages() + for body, href in messages: + yield (body['timestamp'], body['storage_id'], href) + + def sleep(self): + gevent.sleep(self.poll_pause) + + def delete(self, href, retry=False): + url = self.auth.queues_endpoint + parsed_url = urlsplit(url, 'http') + conn = self.get_connection(parsed_url, self.tls) + headers = [('Host', parsed_url.hostname), + ('Client-ID', self.client_id), + ('Content-Type', 'application/json'), + ('Accept', 'application/json'), + ('X-Auth-Token', self.auth.token_id)] + with gevent.Timeout(self.timeout): + log.request(conn, 'DELETE', href, headers) + conn.putrequest('DELETE', href) + for name, value in headers: + conn.putheader(name, value) + conn.endheaders() + res = conn.getresponse() + status = '{0!s} {1}'.format(res.status, res.reason) + log.response(conn, status, res.getheaders()) + if res.status == 401 and not retry: + self.auth.create_token() + return self.delete(href, retry=True) + elif res.status != 204: + raise RackspaceError(res) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/diskstorage/__init__.py b/slimta/diskstorage/__init__.py new file mode 100644 index 00000000..4e470b4f --- /dev/null +++ 
b/slimta/diskstorage/__init__.py @@ -0,0 +1,287 @@ +# Copyright (c) 2012 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""Package implementing the :mod:`~slimta.queue` storage system on disk. Disk +reads and writes are built using the aio_ interface, provided in python by the +pyaio_ project. + +.. _aio: http://www.kernel.org/doc/man-pages/online/pages/man7/aio.7.html +.. 
_pyaio: https://github.com/felipecruz/pyaio + +""" + +from __future__ import absolute_import + +import os +import uuid +import os.path +from tempfile import mkstemp +from functools import partial + +from six.moves import cPickle + +from pyaio import aio_read, aio_write # type: ignore +import gevent +from gevent.event import AsyncResult # type: ignore +from gevent.lock import Semaphore + +from slimta.queue import QueueStorage +from slimta import logging + +__all__ = ['DiskStorage'] + +log = logging.getQueueStorageLogger(__name__) + + +class AioFile(object): + + _keep_awake_thread = None + _keep_awake_refs = 0 + _keep_awake_lock = Semaphore(1) + + chunk_size = (16 << 10) + + def __init__(self, path, tmp_dir=None): + self.path = path + self.tmp_dir = tmp_dir + + @classmethod + def _start_keep_awake_thread(cls): + cls._keep_awake_lock.acquire() + try: + if not cls._keep_awake_thread: + cls._keep_awake_thread = gevent.spawn(cls._keep_awake) + cls._keep_awake_refs += 1 + finally: + cls._keep_awake_lock.release() + + @classmethod + def _stop_keep_awake_thread(cls): + cls._keep_awake_lock.acquire() + try: + cls._keep_awake_refs -= 1 + if cls._keep_awake_refs <= 0: + cls._keep_awake_thread.kill() + cls._keep_awake_thread = None + finally: + cls._keep_awake_lock.release() + + @classmethod + def _keep_awake(cls): + while True: + gevent.sleep(0.001) + + def _write_callback(self, event, ret, errno): + if ret > 0: + event.set(ret) + else: + exc = IOError(errno, os.strerror(errno)) + event.set_exception(exc) + + def _write_piece(self, fd, data, data_len, offset): + remaining = data_len - offset + if remaining > self.chunk_size: + remaining = self.chunk_size + piece = data[offset:offset+remaining] + event = AsyncResult() + callback = partial(self._write_callback, event) + aio_write(fd, piece, offset, callback) + return event.get() + + def dump(self, data): + try: + data_view = memoryview(data) + except NameError: + data_view = data + data_len = len(data) + offset = 0 + 
self._start_keep_awake_thread() + fd, filename = mkstemp(dir=self.tmp_dir) + try: + while True: + ret = self._write_piece(fd, data_view, data_len, offset) + offset += ret + if offset >= data_len: + break + os.rename(filename, self.path) + finally: + os.close(fd) + self._stop_keep_awake_thread() + + def pickle_dump(self, obj): + return self.dump(cPickle.dumps(obj, cPickle.HIGHEST_PROTOCOL)) + + def _read_callback(self, event, buf, ret, errno): + if ret > 0: + event.set(buf) + elif ret == 0: + exc = EOFError() + event.set_exception(exc) + else: + exc = IOError(errno, os.strerror(errno)) + event.set_exception(exc) + + def _read_piece(self, fd, offset): + event = AsyncResult() + callback = partial(self._read_callback, event) + aio_read(fd, offset, self.chunk_size, callback) + return event.get() + + def load(self): + data = bytearray() + offset = 0 + self._start_keep_awake_thread() + fd = os.open(self.path, os.O_RDONLY) + try: + while True: + buf = self._read_piece(fd, offset) + offset += len(buf) + data.extend(buf) + except EOFError: + return bytes(data) + finally: + os.close(fd) + self._stop_keep_awake_thread() + raise RuntimeError() + + def pickle_load(self): + return cPickle.loads(self.load()) + + +class DiskOps(object): + + def __init__(self, env_dir, meta_dir, tmp_dir): + self.env_dir = env_dir + self.meta_dir = meta_dir + self.tmp_dir = tmp_dir + + def check_exists(self, id): + path = os.path.join(self.env_dir, id+'.env') + return os.path.lexists(path) + + def write_env(self, id, envelope): + final_path = os.path.join(self.env_dir, id+'.env') + AioFile(final_path, self.tmp_dir).pickle_dump(envelope) + + def write_meta(self, id, meta): + final_path = os.path.join(self.meta_dir, id+'.meta') + AioFile(final_path, self.tmp_dir).pickle_dump(meta) + + def read_meta(self, id): + path = os.path.join(self.meta_dir, id+'.meta') + return AioFile(path).pickle_load() + + def read_env(self, id): + path = os.path.join(self.env_dir, id+'.env') + return 
AioFile(path).pickle_load() + + def get_ids(self): + return [fn[:-4] for fn in os.listdir(self.env_dir) + if fn.endswith('.env')] + + def delete_env(self, id): + env_path = os.path.join(self.env_dir, id+'.env') + try: + os.remove(env_path) + except OSError: + pass + + def delete_meta(self, id): + meta_path = os.path.join(self.meta_dir, id+'.meta') + try: + os.remove(meta_path) + except OSError: + pass + + +class DiskStorage(QueueStorage): + """|QueueStorage| mechanism that stores |Envelope| and queue metadata in + two separate files on disk. + + :param env_dir: Directory where queue envelope files are stored. These + files may be large and will not be modified after initial + writing. + :param meta_dir: Directory where queue meta files are stored. These files + will be small and volatile. + :param tmp_dir: Directory that may be used as scratch space. New files are + written here and then moved to their final destination. + System temp directories are used by default. + + """ + + def __init__(self, env_dir, meta_dir, tmp_dir=None): + super(DiskStorage, self).__init__() + self.ops = DiskOps(env_dir, meta_dir, tmp_dir) + + def write(self, envelope, timestamp): + meta = {'timestamp': timestamp, 'attempts': 0} + while True: + id = uuid.uuid4().hex + if not self.ops.check_exists(id): + self.ops.write_env(id, envelope) + self.ops.write_meta(id, meta) + log.write(id, envelope) + return id + + def set_timestamp(self, id, timestamp): + meta = self.ops.read_meta(id) + meta['timestamp'] = timestamp + self.ops.write_meta(id, meta) + log.update_meta(id, timestamp=timestamp) + + def increment_attempts(self, id): + meta = self.ops.read_meta(id) + new_attempts = meta['attempts'] + 1 + meta['attempts'] = new_attempts + self.ops.write_meta(id, meta) + log.update_meta(id, attempts=new_attempts) + return new_attempts + + def set_recipients_delivered(self, id, rcpt_indexes): + meta = self.ops.read_meta(id) + current = meta.get('delivered_indexes', []) + new = current + rcpt_indexes + 
meta['delivered_indexes'] = new + self.ops.write_meta(id, meta) + log.update_meta(id, delivered_indexes=rcpt_indexes) + + def load(self): + for id in self.ops.get_ids(): + try: + meta = self.ops.read_meta(id) + yield (meta['timestamp'], id) + except OSError: + logging.log_exception(__name__, queue_id=id) + + def get(self, id): + meta = self.ops.read_meta(id) + env = self.ops.read_env(id) + delivered_rcpts = meta.get('delivered_indexes', []) + self._remove_delivered_rcpts(env, delivered_rcpts) + return env, meta['attempts'] + + def remove(self, id): + self.ops.delete_env(id) + self.ops.delete_meta(id) + log.remove(id) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/edge/wsgi.py b/slimta/edge/wsgi.py index a1e09dd5..1f3d921d 100644 --- a/slimta/edge/wsgi.py +++ b/slimta/edge/wsgi.py @@ -68,7 +68,7 @@ from slimta.queue import QueueError from slimta.relay import RelayError from slimta.util.ptrlookup import PtrLookup -from . import Edge, EdgeServer +from . import EdgeServer __all__ = ['WsgiResponse', 'WsgiEdge', 'WsgiValidators'] diff --git a/slimta/envelope.py b/slimta/envelope.py index 5aba8c4e..82e64a92 100644 --- a/slimta/envelope.py +++ b/slimta/envelope.py @@ -30,15 +30,8 @@ import copy from io import BytesIO -try: - from email.parser import BytesParser - from email.generator import BytesGenerator - from email.policy import SMTP -except ImportError: - from email.parser import Parser - from email.generator import Generator - from slimta.util import pycompat +from slimta.util.pycompat import generator_class, parser_class __all__ = ['Envelope'] @@ -94,18 +87,14 @@ def __init__(self, sender=None, recipients=None, self.timestamp = None def _parse_data(self, data, *extra): - if pycompat.PY3: - return BytesParser(policy=SMTP).parse(BytesIO(data), *extra) - else: - return Parser().parse(BytesIO(data), *extra) + return parser_class().parse(BytesIO(data), *extra) def _msg_generator(self, msg): outfp = BytesIO() + generator_class(outfp).flatten(msg, False) if 
pycompat.PY3: - BytesGenerator(outfp, policy=SMTP).flatten(msg, False) return outfp.getvalue() else: - Generator(outfp).flatten(msg, False) return re.sub(_LINE_BREAK, b'\r\n', outfp.getvalue()) def _merge_payloads(self, headers, payload): @@ -119,12 +108,19 @@ def _merge_payloads(self, headers, payload): return payload def prepend_header(self, name, value): - """This method allows prepending a header to the message. The - :attr:`.headers` object does not directly support header prepending - because the Python implementation only provides appending. + """This method allows prepending a header to the message. + + .. note:: + + This method uses undocumented Python API because the + :attr:`.headers` object does not directly support header + prepending. + + :param name: The header name. + :param value: The header value string. """ - self.headers._headers.insert(0, (name, value)) + self.headers._headers.insert(0, (name, value)) # type: ignore def copy(self, new_rcpts=None): """Builds and returns an exact copy if the current object. 
This method diff --git a/slimta/logging/__init__.py b/slimta/logging/__init__.py index 67780ce8..48af17a3 100644 --- a/slimta/logging/__init__.py +++ b/slimta/logging/__init__.py @@ -31,11 +31,12 @@ import logging from ast import literal_eval -from slimta.util.pycompat import reprlib +from .log import log_repr from .socket import SocketLogger from .subprocess import SubprocessLogger from .queuestorage import QueueStorageLogger from .http import HttpLogger +from ..util.pycompat import reprlib __all__ = ['getSocketLogger', 'getSubprocessLogger', 'getQueueStorageLogger', 'getHttpLogger', 'log_exception', 'parseline'] @@ -123,20 +124,6 @@ def log_exception(name, **kwargs): type.__name__, data_str, tb_repr.repr(tb_str))) -log_repr = reprlib.Repr() -log_repr.maxstring = 100 -log_repr.maxother = 100 - - -def logline(log, type, typeid, operation, **data): - if not data: - log('{0}:{1}:{2}'.format(type, typeid, operation)) - else: - data_str = ' '.join(['='.join((key, log_repr.repr(val))) - for key, val in sorted(data.items())]) - log('{0}:{1}:{2} {3}'.format(type, typeid, operation, data_str)) - - parseline_pattern = re.compile(r'^([^:]+):([^:]+):(\S+) ?') data_item_pattern = re.compile('^([^=]+)=') diff --git a/slimta/logging/http.py b/slimta/logging/http.py index 07682aac..9d2f013f 100644 --- a/slimta/logging/http.py +++ b/slimta/logging/http.py @@ -28,6 +28,8 @@ from functools import partial +from .log import logline + __all__ = ['HttpLogger'] @@ -41,7 +43,6 @@ class HttpLogger(object): """ def __init__(self, log): - from slimta.logging import logline self.log = partial(logline, log.debug, 'http') def _get_method_from_environ(self, environ): diff --git a/slimta/logging/log.py b/slimta/logging/log.py new file mode 100644 index 00000000..f6127e08 --- /dev/null +++ b/slimta/logging/log.py @@ -0,0 +1,40 @@ +# Copyright (c) 2020 Ian C. 
Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. 
+# + +from __future__ import absolute_import + +from slimta.util.pycompat import reprlib + +__all__ = ['log_repr', 'logline'] + + +log_repr = reprlib.Repr() +log_repr.maxstring = 100 +log_repr.maxother = 100 + + +def logline(log, type, typeid, operation, **data): + if not data: + log('{0}:{1}:{2}'.format(type, typeid, operation)) + else: + data_str = ' '.join(['='.join((key, log_repr.repr(val))) + for key, val in sorted(data.items())]) + log('{0}:{1}:{2} {3}'.format(type, typeid, operation, data_str)) diff --git a/slimta/logging/queuestorage.py b/slimta/logging/queuestorage.py index a5fea1cd..00f1bac6 100644 --- a/slimta/logging/queuestorage.py +++ b/slimta/logging/queuestorage.py @@ -28,6 +28,8 @@ from functools import partial +from .log import logline + __all__ = ['QueueStorageLogger'] @@ -41,7 +43,6 @@ class QueueStorageLogger(object): """ def __init__(self, log): - from slimta.logging import logline self.log = partial(logline, log.debug, 'queue') def write(self, id, envelope): diff --git a/slimta/logging/socket.py b/slimta/logging/socket.py index dcb391e7..55e507bb 100644 --- a/slimta/logging/socket.py +++ b/slimta/logging/socket.py @@ -28,6 +28,8 @@ from gevent.socket import SHUT_WR, SHUT_RD +from .log import logline + __all__ = ['socket_error_log_level', 'SocketLogger'] #: The log level for logging :py:exc:`socket.error` exceptions. 
The default log @@ -45,7 +47,6 @@ class SocketLogger(object): """ def __init__(self, logger): - from slimta.logging import logline self.logger = logger self.log = partial(logline, logger.debug, 'fd') self.log_error = partial(logline, self._log_error, 'fd') diff --git a/slimta/logging/subprocess.py b/slimta/logging/subprocess.py index e237d855..28ef8dcd 100644 --- a/slimta/logging/subprocess.py +++ b/slimta/logging/subprocess.py @@ -28,6 +28,8 @@ from functools import partial +from .log import logline + __all__ = ['SubprocessLogger'] @@ -41,7 +43,6 @@ class SubprocessLogger(object): """ def __init__(self, log): - from slimta.logging import logline self.log = partial(logline, log.debug, 'pid') def popen(self, process, args): diff --git a/slimta/lookup/__init__.py b/slimta/lookup/__init__.py new file mode 100644 index 00000000..60cdca04 --- /dev/null +++ b/slimta/lookup/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) 2014 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/lookup/drivers/__init__.py b/slimta/lookup/drivers/__init__.py new file mode 100644 index 00000000..21b61c1e --- /dev/null +++ b/slimta/lookup/drivers/__init__.py @@ -0,0 +1,102 @@ +# Copyright (c) 2014 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""This package contains several implementations of the slimta lookup +mechanism, which provides a simple interface to control actions and policies +with external lookups. Under normal circumstances, slimta lookup drivers do +not modify their backend data source. 
+ + """ + + from __future__ import absolute_import + + import logging + + from ...logging.log import logline + + __all__ = ['LookupBase'] + + + class LookupBase(object): + """Inherit this class to implement a slimta lookup driver. Only the + :meth:`.lookup` method must be overridden. + + """ + + def _format_key(self, key_template, kwargs): + kwargs = kwargs.copy() + while True: + try: + return key_template.format(**kwargs) + except KeyError as exc: + key = exc.args[0] + kwargs[key] = '{'+key+'}' + + def lookup(self, **kwargs): + """The keyword arguments will be used by the lookup driver to return a + dictionary-like object that will be used to affect actions or policy. + For some drivers, these keywords may be used with a template to produce + a lookup key. For SQL-style drivers, they might be used in a ``WHERE`` + clause of a ``SELECT`` query. + + :param kwargs: Used by the driver to lookup records. + :type kwargs: keyword arguments + :returns: A dictionary if a record was found, ``None`` otherwise. + + """ + raise NotImplementedError() + + def lookup_address(self, address, **extra): + """A convenience method where the given address is passed in as the + ``address`` keyword to :meth:`.lookup`. If the address has domain part, + it is substringed and passed in as the ``domain`` keyword as well. + + :param address: An address to lookup, either as the full address or as + its domain part. + :type address: str + :param extra: Additional keyword arguments to pass in to + :meth:`.lookup`. + :type extra: keyword arguments + + """ + if '@' in address: + domain = address.rsplit('@', 1)[1] + return self.lookup(address=address, domain=domain, **extra) + return self.lookup(address=address, **extra) + + def log(self, name, kwargs, ret): + """Implementing drivers should call this method to log the lookup + transaction. + + :param name: The module name, e.g. ``__name__``. + :type name: str + :param kwargs: The keyword arguments given to :meth:`.lookup`.
+ :type kwargs: dict + :param ret: The return value of the lookup, e.g. a dictionary or + ``None``. + + """ + logger = logging.getLogger(name) + operation = 'notfound' if ret is None else 'found' + logline(logger.debug, 'lookup', id(self), operation, **kwargs) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/lookup/drivers/dbapi2.py b/slimta/lookup/drivers/dbapi2.py new file mode 100644 index 00000000..4a50e980 --- /dev/null +++ b/slimta/lookup/drivers/dbapi2.py @@ -0,0 +1,141 @@ +# Copyright (c) 2014 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""Implements slimta lookup against a `DB API 2.0`_ database interface. This +driver should be flexible enough to support any implementing database. + +Any database used with this driver should use gevent sockets (or +monkey-patching) to ensure good performance. + +This module also provides a SQLite_ convenience class. + +.. _DB API 2.0: http://legacy.python.org/dev/peps/pep-0249/ +.. 
_SQLite: http://www.sqlite.org/ +.. _context manager: \ +https://docs.python.org/2/library/stdtypes.html#context-manager-types + +""" + +from __future__ import absolute_import + +import sqlite3 +from collections import Mapping +from contextlib import contextmanager + +from . import LookupBase + +__all__ = ['DBAPI2Lookup', 'SQLite3Lookup'] + + +class DBAPI2Lookup(LookupBase): + """Implements the slimta lookup interface using the generic `DB API 2.0`_ + specification. + + :param conn_ctxmgr: A `context manager`_ that, given no arguments, produces + an open database connection, and cleans it up + afterwards. This allows flexibility in how connections + are managed or pooled. + :param query: The query string used to lookup records in the database. + :type query: str + :param query_param_order: If ``query`` uses positional parameters, this + must be a list of keywords from the + :meth:`.lookup` to translate from keywords to + positional arguments. + :type query_param_order: list + :param result_order: Most database implementations return rows as a + sequence instead of a mapping. In this case, this + argument must be given to translate the sequence into + a dictionary, ``TypeError`` may be raised in the + :meth:`.lookup` method otherwise. + :param conn: If ``conn_ctxmgr`` is ``None``, a simple one is generated that + simply returns the value of this argument. Useful for + databases that have a single, persistent connection object. 
+ + """ + + def __init__(self, conn_ctxmgr, query, query_param_order=None, + result_order=None, conn=None): + super(DBAPI2Lookup, self).__init__() + self.query = query + self.query_param_order = query_param_order + self.result_order = result_order + + if not conn_ctxmgr: + @contextmanager + def get_conn(): + yield conn + + self.conn_ctxmgr = get_conn + else: + self.conn_ctxmgr = conn_ctxmgr + + def _do_lookup(self, kwargs): + params = kwargs + if self.query_param_order is not None: + params = [kwargs[key] for key in self.query_param_order] + with self.conn_ctxmgr() as conn: + cur = conn.cursor() + try: + cur.execute(self.query, params) + row = cur.fetchone() + if not row: + return + if not isinstance(row, Mapping): + try: + result_order = row.keys() + except AttributeError: + result_order = self.result_order + ret = {} + for i, key in enumerate(result_order): + ret[key] = row[i] + return ret + return row + finally: + conn.rollback() + cur.close() + + def lookup(self, **kwargs): + ret = self._do_lookup(kwargs) + self.log(__name__, kwargs, ret) + return ret + + +class SQLite3Lookup(DBAPI2Lookup): + """Implements the slimta lookup interface using the :py:mod:`sqlite3` + module. The connection object is created immediately and kept open for all + calls to :meth:`.lookup`. + + :param database: The database filename, as given in + :py:func:`sqlite3.connect`. + :type database: str + :param query: The query string used to lookup records in the database. This + query must use named-style placeholders (e.g. + ``col = :keyword``). + :type query: str + + """ + + def __init__(self, database, query): + conn = sqlite3.connect(database) + super(SQLite3Lookup, self).__init__(None, query, conn=conn) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/lookup/drivers/dict.py b/slimta/lookup/drivers/dict.py new file mode 100644 index 00000000..91d9a3cd --- /dev/null +++ b/slimta/lookup/drivers/dict.py @@ -0,0 +1,66 @@ +# Copyright (c) 2014 Ian C.
Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""Implements slimta lookup against a standard Python dictionary-like object. +This object should be given pre-populated in the constructor, or be a proxy to +a persistent backend like :py:mod:`shelve`. + +""" + +from __future__ import absolute_import + +from . import LookupBase + +__all__ = ['DictLookup'] + + +class DictLookup(LookupBase): + """Instantiate this class with a Python dictionary-like object and it may + be used as a slimta lookup interface. + + :param backend: The backend dictionary-like object that will be queried for + data lookups. The values in this mapping **must** also be + dictionary-like objects. + :type backend: collections.Mapping + :param key_template: This template string is used to determine the key + string to lookup. The :py:meth:`str.format` method is + called with keyword arguments, given the keyword + arguments passed in to :meth:`.lookup`. 
+ :type key_template: str + + """ + + def __init__(self, backend, key_template): + super(DictLookup, self).__init__() + self.backend = backend + self.key_template = key_template + + def lookup(self, **kwargs): + key = self._format_key(self.key_template, kwargs) + try: + ret = self.backend[key] + except KeyError: + ret = None + self.log(__name__, kwargs, ret) + return ret + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/lookup/drivers/redis.py b/slimta/lookup/drivers/redis.py new file mode 100644 index 00000000..a647a04a --- /dev/null +++ b/slimta/lookup/drivers/redis.py @@ -0,0 +1,106 @@ +# Copyright (c) 2014 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# + +"""Implements slimta lookup against a redis data backend. By default, this +driver expects records to be JSON-encoded string_ values. It can be configured +to use the hash_ data structure instead, but it is less flexible. + +.. _hash: http://redis.io/commands#hash +.. 
class _GeventConnection(redis.Connection):
    """redis-py connection that opens its TCP socket through gevent so that
    redis traffic cooperates with the rest of the event loop."""

    def _connect(self):
        # Plain TCP socket from gevent's socket module; connect honors the
        # configured socket_timeout.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.socket_timeout)
        sock.connect((self.host, self.port))
        return sock


class RedisLookup(LookupBase):
    """Implements the slimta lookup interface using redis key-value storage
    as the backend layer.

    :param key_template: Template used to build the key string to look up.
                         :py:meth:`~str.format` is called on it with the
                         keyword arguments passed in to :meth:`.lookup`.
    :type key_template: str
    :param host: Hostname of the redis server to connect to.
    :param port: Port to connect to.
    :param db: Database number to create keys in.
    :param password: Optional password to authenticate with.
    :param socket_timeout: Timeout, in seconds, for socket operations. If the
                           timeout is hit, :py:exc:`socket.timeout` is raised.
                           ``None`` disables the timeout.
    :param use_hash: If ``True``, keys are looked up as hashes with HGETALL_
                     instead of as JSON-encoded strings. Hashes do not allow
                     complex values in results and cannot distinguish missing
                     records from empty records.

    """

    def __init__(self, key_template, host='localhost', port=6379, db=0,
                 password=None, socket_timeout=None, use_hash=False):
        super(RedisLookup, self).__init__()
        self.key_template = key_template
        conn_pool = redis.ConnectionPool(connection_class=_GeventConnection,
                                         host=host, port=port, db=db,
                                         password=password,
                                         socket_timeout=socket_timeout)
        self.redis = redis.StrictRedis(connection_pool=conn_pool)
        # Bind the key-lookup strategy once, at construction time.
        self._key_lookup = self._hash_lookup if use_hash else self._json_lookup

    def _json_lookup(self, key):
        # GET the raw string value and decode it; empty or missing values
        # produce None.
        raw = self.redis.get(key)
        if raw:
            return json.loads(raw)
        return None

    def _hash_lookup(self, key):
        # HGETALL returns an empty mapping for missing keys.
        return self.redis.hgetall(key)

    def lookup(self, **kwargs):
        key = self._format_key(self.key_template, kwargs)
        record = self._key_lookup(key)
        self.log(__name__, kwargs, record)
        return record
class RegexLookup(LookupBase):
    """Slimta lookup interface that matches a templated string against an
    ordered series of regular expressions.

    Instantiate this class without any mappings; add patterns afterwards
    with :meth:`.add_regex`.

    :param str_template: Template used to build the string matched against
                         the regular expressions. :py:meth:`str.format` is
                         called on it with the keyword arguments passed in to
                         :meth:`.lookup`.
    :type str_template: str

    """

    def __init__(self, str_template):
        super(RegexLookup, self).__init__()
        self.str_template = str_template
        self.regexes = []

    def add_regex(self, pattern, value):
        """Adds a regular expression with the associated value.

        :param pattern: Pattern to check the lookup string against.
        :type pattern: :py:obj:`str` or :class:`re.RegexObject`
        :param value: The value to return on successful lookup.

        """
        self.regexes.append((re.compile(pattern), value))

    def lookup(self, **kwargs):
        ret = None
        try:
            candidate = self.str_template.format(**kwargs)
        except KeyError:
            # The template referenced an argument that was not supplied;
            # treat it as a failed lookup.
            pass
        else:
            # First matching pattern wins, in insertion order.
            matches = (value for regex, value in self.regexes
                       if regex.match(candidate))
            ret = next(matches, None)
        self.log(__name__, kwargs, ret)
        return ret
class LookupPolicy(QueuePolicy):
    """|QueuePolicy| that applies per-address rules found in a slimta lookup
    record (``verp``, ``alias``, ``add_headers``) to an envelope before it
    is queued.

    Instances of this class may be configured to run before a message is
    queued using :meth:`slimta.queue.Queue.add_policy`.

    :param lookup: The slimta lookup driver, implementing the
                   :class:`~slimta.lookup.drivers.LookupBase` interface.
    :param on_sender: If ``True``, the envelope sender is looked up and has
                      policies applied.
    :type on_sender: bool
    :param on_rcpts: If ``True``, the envelope recipients are looked up, each
                     one applying any policies found in the record.
    :type on_rcpts: bool

    """

    def __init__(self, lookup, on_sender=False, on_rcpts=True):
        super(LookupPolicy, self).__init__()
        self.lookup = lookup
        self.on_sender = on_sender
        self.on_rcpts = on_rcpts

    def _add_headers(self, envelope, headers_raw):
        # Prepend headers from a JSON-encoded dictionary; malformed or
        # non-dictionary data is silently ignored, per the module docs.
        try:
            headers = json.loads(headers_raw)
        except ValueError:
            return
        # json.loads may legally produce a list, string, number, etc.; only
        # a JSON object (dict) makes sense as a set of headers.  Previously
        # a non-dict value raised AttributeError on .items().
        if not isinstance(headers, dict):
            return
        for key, val in headers.items():
            envelope.prepend_header(key, val)

    def _verp_enc(self, address, on_domain):
        # Encode an address in VERP style: localpart@domain becomes
        # localpart=domain@on_domain.  An address with no localpart keeps
        # its single part and only gains the new domain.
        localpart, _, domain = address.rpartition('@')
        if localpart:
            return '{0!s}={1!s}@{2!s}'.format(localpart, domain, on_domain)
        else:
            return '{0!s}@{1!s}'.format(domain, on_domain)

    def _get_alias(self, address, alias):
        # The alias record may use {localpart} and {domain} templates.  A
        # result without '@' is treated as a replacement domain for the
        # original localpart.
        localpart, _, domain = address.rpartition('@')
        alias = alias.format(localpart=localpart, domain=domain)
        if '@' in alias:
            return alias
        else:
            if localpart:
                return '{0!s}@{1!s}'.format(localpart, alias)
            else:
                return alias

    def apply(self, envelope):
        if self.on_sender:
            ret = self.lookup.lookup_address(envelope.sender) or {}
            if 'verp' in ret:
                envelope.sender = self._verp_enc(envelope.sender, ret['verp'])
            if 'alias' in ret:
                envelope.sender = self._get_alias(
                    envelope.sender, ret['alias'])
            if 'add_headers' in ret:
                self._add_headers(envelope, ret['add_headers'])
        if self.on_rcpts:
            for i, rcpt in enumerate(envelope.recipients):
                ret = self.lookup.lookup_address(rcpt) or {}
                if 'verp' in ret:
                    envelope.recipients[i] = self._verp_enc(rcpt, ret['verp'])
                if 'alias' in ret:
                    envelope.recipients[i] = self._get_alias(
                        rcpt, ret['alias'])
                if 'add_headers' in ret:
                    self._add_headers(envelope, ret['add_headers'])
c3464b78..ff08193c 100644 --- a/slimta/queue/__init__.py +++ b/slimta/queue/__init__.py @@ -33,14 +33,9 @@ import collections from itertools import repeat -try: - from itertools import imap -except ImportError: - imap = map - import gevent from gevent import Greenlet -from gevent.event import Event +from gevent.event import Event # type: ignore from gevent.lock import Semaphore from gevent.pool import Pool @@ -50,6 +45,7 @@ from slimta.smtp.reply import Reply from slimta.bounce import Bounce from slimta.policy import QueuePolicy +from slimta.util.pycompat import map __all__ = ['QueueError', 'Queue', 'QueueStorage'] @@ -292,7 +288,7 @@ def _pool_run(self, which, func, *args, **kwargs): def _pool_imap(self, which, func, *iterables): pool = getattr(self, which+'_pool', gevent) - threads = imap(pool.spawn, repeat(func), *iterables) + threads = map(pool.spawn, repeat(func), *iterables) ret = [] for thread in threads: thread.join() diff --git a/slimta/redisstorage/__init__.py b/slimta/redisstorage/__init__.py new file mode 100644 index 00000000..0049fd30 --- /dev/null +++ b/slimta/redisstorage/__init__.py @@ -0,0 +1,152 @@ +# Copyright (c) 2012 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
class GeventConnection(redis.Connection):
    """redis-py connection that opens its TCP socket through gevent so that
    redis traffic cooperates with the rest of the event loop."""

    def _connect(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(self.socket_timeout)
        sock.connect((self.host, self.port))
        return sock


class RedisStorage(QueueStorage):
    """|QueueStorage| mechanism that stores |Envelope| and queue metadata in
    redis hashes.

    :param host: Hostname of the redis server to connect to.
    :param port: Port to connect to.
    :param db: Database number to create keys in.
    :param password: Optional password to authenticate with.
    :param socket_timeout: Timeout, in seconds, for socket operations. If the
                           timeout is hit, :py:exc:`socket.timeout` is raised.
                           ``None`` disables the timeout.
    :param prefix: Any key created is prefixed with this string.
    :type prefix: str

    """

    def __init__(self, host='localhost', port=6379, db=0, password=None,
                 socket_timeout=None, prefix='slimta:'):
        super(RedisStorage, self).__init__()
        pool = redis.ConnectionPool(connection_class=GeventConnection,
                                    host=host, port=port, db=db,
                                    password=password,
                                    socket_timeout=socket_timeout)
        self.redis = redis.StrictRedis(connection_pool=pool)
        self.prefix = prefix
        # The queue key holds the FIFO list of (timestamp, id) entries.
        self.queue_key = '{0}queue'.format(prefix)

    def _get_key(self, id):
        # Ids read back from redis arrive as bytes on Python 3.
        if isinstance(id, bytes):
            id = id.decode('ascii')
        return self.prefix + id

    def write(self, envelope, timestamp):
        # NOTE: envelopes are stored pickled; redis contents are trusted.
        envelope_raw = cPickle.dumps(envelope, cPickle.HIGHEST_PROTOCOL)
        while True:
            # Retry with fresh ids until hsetnx wins, guaranteeing a unique
            # key even in the (unlikely) event of a uuid collision.
            id = uuid.uuid4().hex
            key = self._get_key(id)
            if self.redis.hsetnx(key, 'envelope', envelope_raw):
                queue_raw = cPickle.dumps((timestamp, id),
                                          cPickle.HIGHEST_PROTOCOL)
                pipe = self.redis.pipeline()
                pipe.hmset(key, {'timestamp': timestamp,
                                 'attempts': 0})
                pipe.rpush(self.queue_key, queue_raw)
                pipe.execute()
                log.write(id, envelope)
                return id

    def set_timestamp(self, id, timestamp):
        self.redis.hset(self._get_key(id), 'timestamp', timestamp)
        log.update_meta(id, timestamp=timestamp)

    def increment_attempts(self, id):
        new_attempts = self.redis.hincrby(self._get_key(id), 'attempts', 1)
        log.update_meta(id, attempts=new_attempts)
        return new_attempts

    def set_recipients_delivered(self, id, rcpt_indexes):
        # Merge the new delivered indexes with any previously stored ones.
        current = self.redis.hget(self._get_key(id), 'delivered_indexes')
        new_indexes = rcpt_indexes
        if current:
            new_indexes = cPickle.loads(current) + rcpt_indexes
        self.redis.hset(self._get_key(id), 'delivered_indexes',
                        cPickle.dumps(new_indexes, cPickle.HIGHEST_PROTOCOL))
        log.update_meta(id, delivered_indexes=rcpt_indexes)

    def load(self):
        # redis-py returns key names as bytes on Python 3, so the queue
        # list key must be compared in both str and bytes form; a plain
        # `key != self.queue_key` would never match on Python 3 and the
        # queue list itself would be yielded as a stored message.
        queue_keys = (self.queue_key, self.queue_key.encode('ascii'))
        for key in self.redis.keys(self.prefix+'*'):
            if key not in queue_keys:
                id = key[len(self.prefix):]
                timestamp = self.redis.hget(key, 'timestamp') or time.time()
                yield float(timestamp), id

    def get(self, id):
        envelope_raw, attempts, delivered_indexes_raw = \
            self.redis.hmget(self._get_key(id), 'envelope', 'attempts',
                             'delivered_indexes')
        if not envelope_raw:
            raise KeyError(id)
        envelope = cPickle.loads(envelope_raw)
        del envelope_raw
        if delivered_indexes_raw:
            # Strip recipients that were already delivered in a previous
            # attempt.
            delivered_indexes = cPickle.loads(delivered_indexes_raw)
            self._remove_delivered_rcpts(envelope, delivered_indexes)
        return envelope, int(attempts or 0)

    def remove(self, id):
        self.redis.delete(self._get_key(id))
        log.remove(id)

    def wait(self):
        # Block until a queue entry is available (timeout 0 == forever).
        ret = self.redis.blpop([self.queue_key], 0)
        if ret:
            return [cPickle.loads(ret[1])]
        return []
import Relay diff --git a/slimta/relay/smtp/static.py b/slimta/relay/smtp/static.py index 97791e1d..dcf223ff 100644 --- a/slimta/relay/smtp/static.py +++ b/slimta/relay/smtp/static.py @@ -104,7 +104,7 @@ def __init__(self, host, port=25, pool_size=None, client_class=None, def add_client(self): return self._client_class((self.host, self.port), self.queue, - **self._client_kwargs) + **self._client_kwargs) class StaticLmtpRelay(StaticSmtpRelay): diff --git a/slimta/smtp/server.py b/slimta/smtp/server.py index b70add4e..83eeed66 100644 --- a/slimta/smtp/server.py +++ b/slimta/smtp/server.py @@ -37,7 +37,8 @@ from .io import IO from .extensions import Extensions from .auth import ServerAuthError, AuthSession -from .reply import * # NOQA +from .reply import Reply, tls_failure, unknown_command, bad_arguments, \ + unhandled_error, timed_out, bad_sequence, unknown_parameter __all__ = ['Server'] @@ -60,7 +61,7 @@ def find_outside_quotes(haystack, needle, start_i=0, quotes=b'"'): quoted = quote break elif haystack[i] == quoted: - quoted = None + quoted = None return -1 diff --git a/slimta/util/__init__.py b/slimta/util/__init__.py index bc957677..2c164cae 100644 --- a/slimta/util/__init__.py +++ b/slimta/util/__init__.py @@ -119,11 +119,11 @@ def create_listeners(address, def _init_socket(sock, sockaddr): try: sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) - except socket.error as exc: + except socket.error: pass try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - except socket.error as exc: + except socket.error: pass sock.setblocking(0) sock.bind(sockaddr) diff --git a/slimta/util/bytesformat.py b/slimta/util/bytesformat.py index 21473822..b1e1e382 100644 --- a/slimta/util/bytesformat.py +++ b/slimta/util/bytesformat.py @@ -41,7 +41,7 @@ class BytesFormat(object): :py:func:`str.format`. 
During construction, the template string is scanned for matching ``{...}`` - pairs that contain only characters that match the ``\w`` regular + pairs that contain only characters that match the ``\\w`` regular expression. In the :meth:`.format` method, these ``{...}`` are replaced with a matching argument's value, if an argument matches, or the action specified by ``mode`` happens when it does not match diff --git a/slimta/util/dns.py b/slimta/util/dns.py index b2dbe117..c393cedb 100644 --- a/slimta/util/dns.py +++ b/slimta/util/dns.py @@ -31,7 +31,7 @@ import pycares.errno import gevent from gevent import select -from gevent.event import AsyncResult +from gevent.event import AsyncResult # type: ignore from slimta import logging diff --git a/slimta/util/proxyproto.py b/slimta/util/proxyproto.py index 79b157ea..a0110b83 100644 --- a/slimta/util/proxyproto.py +++ b/slimta/util/proxyproto.py @@ -82,16 +82,18 @@ def __read_pp_line(cls, sock, initial): buf[0:len(initial)] = initial read = initial while len(read) < 8: - where = memoryview(buf)[len(read):] + where = memoryview(buf)[len(read):] # type: ignore read_n = sock.recv_into(where, 8-len(read)) assert read_n, 'Received EOF during proxy protocol header' - read = memoryview(buf)[0:len(read)+read_n].tobytes() + read_view = memoryview(buf)[0:len(read)+read_n] # type: ignore + read = read_view.tobytes() while len(read) < len(buf): - where = memoryview(buf)[len(read):] + where = memoryview(buf)[len(read):] # type: ignore try_read = min(len(where), 1 if read.endswith(b'\r') else 2) read_n = sock.recv_into(where, try_read) assert read_n, 'Received EOF during proxy protocol header' - read = memoryview(buf)[0:len(read)+read_n].tobytes() + read_view = memoryview(buf)[0:len(read)+read_n] # type: ignore + read = read_view.tobytes() if read.endswith(b'\r\n'): break return read @@ -186,7 +188,8 @@ def handle(self, sock, addr): src_addr = invalid_pp_source_address else: log.proxyproto_success(sock, src_addr) - return 
super(ProxyProtocolV1, self).handle(sock, src_addr) + super_obj = super(ProxyProtocolV1, self) + super_obj.handle(sock, src_addr) # type: ignore class ProxyProtocolV2(object): @@ -216,10 +219,11 @@ def __read_pp_data(cls, sock, length, initial): buf[0:len(initial)] = initial read = initial while len(read) < len(buf): - where = memoryview(buf)[len(read):] + where = memoryview(buf)[len(read):] # type: ignore read_n = sock.recv_into(where, len(buf)-len(read)) assert read_n, 'Received EOF during proxy protocol header' - read = memoryview(buf)[0:len(read)+read_n].tobytes() + read_view = memoryview(buf)[0:len(read)+read_n] # type: ignore + read = read_view.tobytes() return bytearray(read) @classmethod @@ -237,18 +241,19 @@ def __parse_pp_data(cls, data): def __parse_pp_addresses(cls, family, addr_data): if family == socket.AF_INET: src_ip, dst_ip, src_port, dst_port = \ - struct.unpack('!4s4sHH', addr_data) + struct.unpack('!4s4sHH', addr_data[0:12]) src_addr = (socket.inet_ntop(family, src_ip), src_port) dst_addr = (socket.inet_ntop(family, dst_ip), dst_port) return src_addr, dst_addr elif family == socket.AF_INET6: src_ip, dst_ip, src_port, dst_port = \ - struct.unpack('!16s16sHH', addr_data) + struct.unpack('!16s16sHH', addr_data[0:36]) src_addr = (socket.inet_ntop(family, src_ip), src_port) dst_addr = (socket.inet_ntop(family, dst_ip), dst_port) return src_addr, dst_addr elif family == socket.AF_UNIX: - src_addr, dst_addr = struct.unpack('!108s108s', addr_data) + addr_data = addr_data[0:216] + src_addr, dst_addr = struct.unpack('!108s108s', addr_data[0:216]) return src_addr.rstrip(b'\x00'), dst_addr.rstrip(b'\x00') else: return unknown_pp_source_address, unknown_pp_dest_address @@ -297,7 +302,8 @@ def handle(self, sock, addr): src_addr = invalid_pp_source_address else: log.proxyproto_success(sock, src_addr) - return super(ProxyProtocolV2, self).handle(sock, src_addr) + super_obj = super(ProxyProtocolV2, self) + super_obj.handle(sock, src_addr) # type: ignore class 
ProxyProtocol(object): @@ -318,10 +324,11 @@ def __read_pp_initial(cls, sock): buf = bytearray(8) read = b'' while len(read) < len(buf): - where = memoryview(buf)[len(read):] + where = memoryview(buf)[len(read):] # type: ignore read_n = sock.recv_into(where, 8-len(read)) assert read_n, 'Received EOF during proxy protocol header' - read = memoryview(buf)[0:len(read)+read_n].tobytes() + read_view = memoryview(buf)[0:len(read)+read_n] # type: ignore + read = read_view.tobytes() return read @classmethod @@ -361,7 +368,8 @@ def handle(self, sock, addr): src_addr = invalid_pp_source_address else: log.proxyproto_success(sock, src_addr) - return super(ProxyProtocol, self).handle(sock, src_addr) + super_obj = super(ProxyProtocol, self) + super_obj.handle(sock, src_addr) # type: ignore # vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/util/pycompat.py b/slimta/util/pycompat.py index 8afa9212..f81edad1 100644 --- a/slimta/util/pycompat.py +++ b/slimta/util/pycompat.py @@ -27,23 +27,44 @@ from __future__ import absolute_import import sys +from functools import partial + +__all__ = ['PY3', 'PY2', 'map', 'urlparse', 'httplib', 'reprlib', + 'parser_class', 'generator_class'] + +#: True if the interpreter is Python 3.x, False otherwise. +PY3 = (sys.version_info[0] == 3) + +#: True if the interpreter is Python 2.x, False otherwise. +PY2 = (sys.version_info[0] == 2) + +if PY3: + map_func = map -try: from urllib import parse as urlparse_mod from http import client as httplib_mod import reprlib as reprlib_mod -except ImportError: + + from email.generator import BytesGenerator + from email.parser import BytesParser + from email.policy import SMTP + parser = partial(BytesParser, policy=SMTP) + generator = partial(BytesGenerator, policy=SMTP) +else: + from itertools import imap + map_func = imap + import urlparse as urlparse_mod import httplib as httplib_mod import repr as reprlib_mod -__all__ = ['PY3', 'PY2'] - -#: True if the interpreter is Python 3.x, False otherwise. 
-PY3 = (sys.version_info[0] == 3) + from email.generator import Generator + from email.parser import Parser + parser = Parser + generator = Generator -#: True if the interpreter is Python 2.x, False otherwise. -PY2 = (sys.version_info[0] == 2) +#: The ``itertools.imap`` function on Python 2, ``map`` on Python 3. +map = map_func #: The ``urlparse`` module on Python 2, ``urllib.parse`` on Python 3. urlparse = urlparse_mod @@ -55,5 +76,13 @@ #: The ``repr`` module on Python 2, ``reprlib`` on Python 3. reprlib = reprlib_mod +#: An ``email.parser.Parser`` instance on Python 2, an +#: ``email.parser.BytesParser`` instance on Python 3. +parser_class = parser + +#: An ``email.generator.Generator`` instance on Python 2, an +#: ``email.generator.BytesGenerator`` instance on Python 3. +generator_class = generator + # vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/slimta/util/spf.py b/slimta/util/spf.py new file mode 100644 index 00000000..4c5c1443 --- /dev/null +++ b/slimta/util/spf.py @@ -0,0 +1,117 @@ +# Copyright (c) 2013 Ian C. Good +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
class EnforceSpf(object):
    """Class used to check SPF records and enforce a policy against the
    results. By default, results are logged but not acted upon.

    :param timeout: Timeout in seconds before giving up the check. An SPF
                    check that times out is equivalent to a ``'temperror'``
                    result.

    """

    #: The complete set of SPF result codes that may have a policy attached.
    _VALID_RESULTS = frozenset(['pass', 'permerror', 'fail', 'temperror',
                                'softfail', 'none', 'neutral'])

    def __init__(self, timeout=10.0):
        self.policies = {}
        self.timeout = timeout

    def set_enforcement(self, result, match_code='550',
                        match_message='5.7.1 Access denied'):
        """Adds an enforcement policy to a particular SPF result. If the given
        result is seen, the ``MAIL FROM`` reply is set accordingly.

        :param result: The result code, one of ``'pass'``, ``'permerror'``,
                       ``'fail'``, ``'temperror'``, ``'softfail'``,
                       ``'none'``, ``'neutral'``.
        :param match_code: When the result code matches, set the |Reply| code
                           to this string.
        :param match_message: When the result code matches, set the |Reply|
                              message to this string. You can use the
                              ``{reason}`` template in your string.
        :raises ValueError: The result code was not recognized.

        """
        normalized = result.lower()
        if normalized not in self._VALID_RESULTS:
            raise ValueError(result)
        self.policies[normalized] = (match_code, match_message)

    def query(self, sender, ip, ehlo_as):
        """Performs a direct query to check the sender's domain to see if the
        given IP and EHLO string are authorized to send for that domain.

        :param sender: The sender address.
        :param ip: The IP address string of the sending client.
        :param ehlo_as: The EHLO string given by the sending client.
        :returns: A tuple of the result and reason strings.

        """
        # If the gevent timeout fires, the defaults below stand in for the
        # answer; the second Timeout argument suppresses the exception.
        result, reason = 'temperror', 'Timed out'
        with gevent.Timeout(self.timeout, False):
            result, reason = spf.check2(i=ip, s=sender, h=ehlo_as)
        return result, reason

    def check(self, f):
        """Decorates :class:`~slimta.edge.smtp.SmtpValidators` methods that
        are given a |Reply| object. It will check the current SMTP session's
        connecting IP address and EHLO string against the given sender
        address. If enforcement policies are set for the result, the |Reply|
        is modified before calling the validator method.

        This decorator can only be used on ``handle_mail()``,
        ``handle_rcpt()``, and ``handle_data()``.

        :param f: The overloaded :class:`~slimta.edge.smtp.SmtpValidators`
                  method to decorate.

        """
        @wraps(f)
        def wrapper(validator, reply, *args, **kwargs):
            session = validator.session
            # Before MAIL FROM completes there is no envelope yet; in that
            # case the sender is the decorated method's first argument.
            envelope = session.envelope
            sender = envelope.sender if envelope else args[0]
            result, reason = self.query(sender, session.address[0],
                                        session.ehlo_as)
            policy = self.policies.get(result)
            if policy is not None:
                reply.code = policy[0]
                reply.message = policy[1].format(reason=reason)
            return f(validator, reply, *args, **kwargs)
        return wrapper
class TestCloudStorage(MoxTestBase):
    """Unit tests for the CloudStorage queue-storage front end, driving the
    object store and message queue through mox mocks."""

    def setUp(self):
        super(TestCloudStorage, self).setUp()
        self.obj_store = self.mox.CreateMockAnything()
        self.msg_queue = self.mox.CreateMockAnything()

    def _storage(self, with_queue=True):
        # Build the object under test; call only after expectations are
        # recorded (and ReplayAll has run) so construction order is stable.
        if with_queue:
            return CloudStorage(self.obj_store, self.msg_queue)
        return CloudStorage(self.obj_store)

    def test_exception_inheritance(self):
        self.assertTrue(isinstance(CloudStorageError(), QueueError))

    def test_write(self):
        envelope = Envelope('sender@example.com', ['rcpt@example.com'])
        self.obj_store.write_message(envelope, 1234.0).AndReturn('testid')
        self.msg_queue.queue_message('testid', 1234.0)
        self.mox.ReplayAll()
        self.assertEqual('testid', self._storage().write(envelope, 1234.0))

    def test_write_msg_queue_exception(self):
        envelope = Envelope('sender@example.com', ['rcpt@example.com'])
        self.obj_store.write_message(envelope, 1234.0).AndReturn('testid')
        self.msg_queue.queue_message('testid', 1234.0).AndRaise(Exception)
        self.mox.ReplayAll()
        # A failed queue notification must not lose the stored message id.
        self.assertEqual('testid', self._storage().write(envelope, 1234.0))

    def test_write_no_msg_queue(self):
        envelope = Envelope('sender@example.com', ['rcpt@example.com'])
        self.obj_store.write_message(envelope, 1234.0).AndReturn('testid')
        self.mox.ReplayAll()
        storage = self._storage(with_queue=False)
        self.assertEqual('testid', storage.write(envelope, 1234.0))

    def test_set_timestamp(self):
        self.obj_store.set_message_meta('testid', timestamp=1234.0)
        self.mox.ReplayAll()
        self._storage().set_timestamp('testid', 1234.0)

    def test_increment_attempts(self):
        self.obj_store.get_message_meta('testid').AndReturn(
            {'attempts': 3})
        self.obj_store.set_message_meta('testid', attempts=4)
        self.mox.ReplayAll()
        self.assertEqual(4, self._storage().increment_attempts('testid'))

    def test_load(self):
        self.obj_store.list_messages().AndReturn(['1', '2', '3'])
        self.mox.ReplayAll()
        self.assertEqual(['1', '2', '3'], self._storage().load())

    def test_get(self):
        envelope = Envelope('sender@example.com', ['rcpt1@example.com',
                                                   'rcpt2@example.com'])
        self.obj_store.get_message('testid').AndReturn(
            (envelope, {'attempts': 3,
                        'delivered_indexes': [0]}))
        self.mox.ReplayAll()
        env, attempts = self._storage().get('testid')
        self.assertEqual('sender@example.com', env.sender)
        # Recipient 0 was already delivered and must be filtered out.
        self.assertEqual(['rcpt2@example.com'], env.recipients)
        self.assertEqual(3, attempts)

    def test_remove(self):
        self.obj_store.delete_message('testid')
        self.mox.ReplayAll()
        self._storage().remove('testid')

    def test_wait(self):
        self.msg_queue.poll().AndReturn([(1234.0, 'storeid1', 'msgid1'),
                                         (5678.0, 'storeid2', 'msgid2')])
        self.msg_queue.delete('msgid1')
        self.msg_queue.delete('msgid2')
        self.msg_queue.sleep()
        self.mox.ReplayAll()
        self.assertEqual([(1234.0, 'storeid1'), (5678.0, 'storeid2')],
                         list(self._storage().wait()))

    def test_wait_no_msg_queue(self):
        self.mox.ReplayAll()
        storage = self._storage(with_queue=False)
        self.assertRaises(NotImplementedError, list, storage.wait())
SimpleQueueService + + +class TestSimpleStorageService(MoxTestBase): + + def setUp(self): + super(TestSimpleStorageService, self).setUp() + self.bucket = self.mox.CreateMock(Bucket) + self.key = self.mox.CreateMock(Key) + self.s3 = SimpleStorageService(self.bucket, prefix='test-') + self.s3.Key = self.mox.CreateMockAnything() + self.env = Envelope('sender@example.com', ['rcpt@example.com']) + self.pickled_env = cPickle.dumps(self.env, cPickle.HIGHEST_PROTOCOL) + + def test_write_message(self): + self.s3.Key.__call__(self.bucket).AndReturn(self.key) + self.key.set_metadata('timestamp', '1234.0') + self.key.set_metadata('attempts', '') + self.key.set_metadata('delivered_indexes', '') + self.key.set_contents_from_string(self.pickled_env) + self.mox.ReplayAll() + self.s3.write_message(self.env, 1234.0) + self.assertTrue(isinstance(self.key.key, str)) + self.assertTrue(self.key.key.startswith('test-')) + + def test_set_message_meta(self): + self.bucket.get_key('storeid').AndReturn(self.key) + self.key.set_metadata('timestamp', '5678.0') + self.key.set_metadata('attempts', '3') + self.mox.ReplayAll() + self.s3.set_message_meta('storeid', 5678.0, 3) + + def test_delete_message(self): + self.bucket.get_key('storeid').AndReturn(self.key) + self.key.delete() + self.mox.ReplayAll() + self.s3.delete_message('storeid') + + def test_get_message(self): + self.bucket.get_key('storeid').AndReturn(self.key) + self.key.get_contents_as_string().AndReturn(self.pickled_env) + self.key.get_metadata('timestamp').AndReturn('4321.0') + self.key.get_metadata('attempts').AndReturn('5') + self.key.get_metadata('delivered_indexes').AndReturn('') + self.mox.ReplayAll() + env, meta = self.s3.get_message('storeid') + self.assertEqual('sender@example.com', env.sender) + self.assertEqual(['rcpt@example.com'], env.recipients) + self.assertEqual(4321.0, meta['timestamp']) + self.assertEqual(5, meta['attempts']) + self.assertFalse('delivered_indexes' in meta) + + def test_get_message_meta(self): + 
self.bucket.get_key('storeid').AndReturn(self.key) + self.key.get_metadata('timestamp').AndReturn('4321.0') + self.key.get_metadata('attempts').AndReturn('5') + self.key.get_metadata('delivered_indexes').AndReturn('[1, 2]') + self.mox.ReplayAll() + meta = self.s3.get_message_meta('storeid') + self.assertEqual(4321.0, meta['timestamp']) + self.assertEqual(5, meta['attempts']) + self.assertEqual([1, 2], meta['delivered_indexes']) + + def test_list_messages(self): + self.mox.StubOutWithMock(self.s3, 'get_message_meta') + self.bucket.list('test-').AndReturn(['test-storeid1', 'test-storeid2']) + self.s3.get_message_meta('test-storeid1').AndReturn((1234.0, 1)) + self.s3.get_message_meta('test-storeid2').AndReturn((5678.0, 2)) + self.mox.ReplayAll() + ret = list(self.s3.list_messages()) + self.assertEqual([(1234.0, 'test-storeid1'), (5678.0, 'test-storeid2')], ret) + + +class TestSimpleQueueService(MoxTestBase): + + def setUp(self): + super(TestSimpleQueueService, self).setUp() + self.queue = self.mox.CreateMock(Queue) + self.sqs = SimpleQueueService(self.queue) + + def test_queue_message(self): + self.sqs.Message = self.mox.CreateMockAnything() + msg = self.mox.CreateMock(Message) + self.sqs.Message.__call__().AndReturn(msg) + msg.set_body(json.dumps({'timestamp': 1234.0, 'storage_id': 'storeid'})) + self.queue.write(msg).AndReturn(False) + self.queue.write(msg).AndReturn(True) + self.mox.ReplayAll() + self.sqs.queue_message('storeid', 1234.0) + + def test_poll(self): + msg1 = self.mox.CreateMock(Message) + msg2 = self.mox.CreateMock(Message) + self.queue.get_messages().AndReturn([msg1, msg2]) + msg1.get_body().AndReturn('{"timestamp": 1234.0, "storage_id": "storeid1"}') + msg2.get_body().AndReturn('{"timestamp": 5678.0, "storage_id": "storeid2"}') + self.mox.ReplayAll() + ret = list(self.sqs.poll()) + self.assertEqual([(1234.0, 'storeid1', msg1), (5678.0, 'storeid2', msg2)], ret) + + def test_sleep(self): + self.mox.StubOutWithMock(gevent, 'sleep') + gevent.sleep(13.0) 
+ self.mox.ReplayAll() + sqs = SimpleQueueService(None, poll_pause=13.0) + sqs.sleep() + + def test_delete(self): + msg = self.mox.CreateMock(Message) + self.queue.delete_message(msg) + self.mox.ReplayAll() + self.sqs.delete(msg) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_cloudstorage_rackspace_auth.py b/test/test_slimta_cloudstorage_rackspace_auth.py new file mode 100644 index 00000000..531d6d1c --- /dev/null +++ b/test/test_slimta_cloudstorage_rackspace_auth.py @@ -0,0 +1,95 @@ + +import json + +from mox3.mox import MoxTestBase, IsA, Func + +from slimta.cloudstorage.rackspace import RackspaceError, RackspaceCloudAuth + + +class TestRackspaceCloudAuth(MoxTestBase): + + def setUp(self): + super(TestRackspaceCloudAuth, self).setUp() + self.response_payload = {'access': { + 'token': {'id': 'tokenid'}, + 'serviceCatalog': [ + {'type': 'object-store', + 'endpoints': [ + {'region': 'TEST', + 'publicURL': 'http://files/v1'}, + {'region': 'OTHER', + 'publicURL': 'http://files-other/v1'} + ]}, + {'type': 'rax:queues', + 'endpoints': [ + {'region': 'TEST', + 'publicURL': 'http://queues/v1'}, + {'region': 'OTHER', + 'publicURL': 'http://queues-other/v1'} + ]}, + ], + }} + + def test_response_error(self): + res = self.mox.CreateMockAnything() + res.status = 400 + res.reason = 'Bad Request' + exc = RackspaceError(res) + self.assertEqual("Received '400 Bad Request' from the API.", str(exc)) + self.assertEqual(res, exc.response) + + def test_create_token_func(self): + func = self.mox.CreateMockAnything() + func.__call__().AndReturn(('tokenid', 'files', 'queues')) + self.mox.ReplayAll() + auth = RackspaceCloudAuth({'function': func}) + self.assertEqual('tokenid', auth.token_id) + self.assertEqual('files', auth.files_endpoint) + self.assertEqual('queues', auth.queues_endpoint) + + def test_create_token_password(self): + auth = RackspaceCloudAuth({'username': 'testuser', 'password': 'testpass'}, 'http://test/v1', 'TEST') + conn = 
self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(auth, 'get_connection') + auth.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('POST', '/v1/tokens') + conn.putheader('Host', 'test') + conn.putheader('Content-Type', 'application/json') + conn.putheader('Content-Length', '83') + conn.putheader('Accept', 'application/json') + conn.endheaders('{"auth": {"passwordCredentials": {"password": "testpass", "username": "testuser"}}}') + res.status = 200 + res.reason = 'OK' + conn.getresponse().AndReturn(res) + res.getheaders().AndReturn([]) + res.read().AndReturn(json.dumps(self.response_payload, sort_keys=True)) + self.mox.ReplayAll() + self.assertEqual('tokenid', auth.token_id) + self.assertEqual('http://files/v1', auth.files_endpoint) + self.assertEqual('http://queues/v1', auth.queues_endpoint) + + def test_create_token_api_key(self): + auth = RackspaceCloudAuth({'username': 'testuser', 'api_key': 'testkey'}, 'http://test/v1', 'TEST') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(auth, 'get_connection') + auth.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('POST', '/v1/tokens') + conn.putheader('Host', 'test') + conn.putheader('Content-Type', 'application/json') + conn.putheader('Content-Length', '88') + conn.putheader('Accept', 'application/json') + conn.endheaders('{"auth": {"RAX-KSKEY:apiKeyCredentials": {"apiKey": "testkey", "username": "testuser"}}}') + res.status = 200 + res.reason = 'OK' + conn.getresponse().AndReturn(res) + res.getheaders().AndReturn([]) + res.read().AndReturn(json.dumps(self.response_payload, sort_keys=True)) + self.mox.ReplayAll() + self.assertEqual('tokenid', auth.token_id) + self.assertEqual('http://files/v1', auth.files_endpoint) + self.assertEqual('http://queues/v1', auth.queues_endpoint) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_cloudstorage_rackspace_files.py 
b/test/test_slimta_cloudstorage_rackspace_files.py new file mode 100644 index 00000000..5866e64d --- /dev/null +++ b/test/test_slimta_cloudstorage_rackspace_files.py @@ -0,0 +1,165 @@ + +import re + +from mox3.mox import MoxTestBase, IsA, Func +from six.moves import cPickle + +from slimta.envelope import Envelope +from slimta.cloudstorage.rackspace import RackspaceCloudAuth, \ + RackspaceCloudFiles + + +def _is_files_path(path): + match = re.match('^/v1/test/[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}$', path) + return match + + +class TestRackspaceCloudFiles(MoxTestBase): + + def setUp(self): + super(TestRackspaceCloudFiles, self).setUp() + self.auth = self.mox.CreateMock(RackspaceCloudAuth) + self.auth.token_id = 'tokenid' + self.auth.files_endpoint = 'http://files/v1' + self.env = Envelope('sender@example.com', ['rcpt@example.com']) + self.pickled_env = cPickle.dumps(self.env, cPickle.HIGHEST_PROTOCOL) + + def test_write_message(self): + files = RackspaceCloudFiles(self.auth, container='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(files, 'get_connection') + files.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('PUT', Func(_is_files_path)) + conn.putheader('Host', 'files') + conn.putheader('Content-Type', 'application/octet-stream') + conn.putheader('Content-Length', str(len(self.pickled_env))) + conn.putheader('X-Object-Meta-Timestamp', '1234.0') + conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders(self.pickled_env) + conn.getresponse().AndReturn(res) + res.status = 201 + res.reason = 'Created' + res.getheaders().AndReturn([]) + self.mox.ReplayAll() + self.assertTrue(files.write_message(self.env, 1234.0)) + + def test_set_message_meta(self): + files = RackspaceCloudFiles(self.auth, container='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(files, 'get_connection') + 
files.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('POST', '/v1/test/4321') + conn.putheader('Host', 'files') + conn.putheader('X-Auth-Token', 'tokenid') + conn.putheader('X-Object-Meta-Timestamp', '1234.0') + conn.putheader('X-Object-Meta-Attempts', '3') + conn.endheaders() + conn.getresponse().AndReturn(res) + res.status = 202 + res.reason = 'Accepted' + res.getheaders().AndReturn([]) + self.mox.ReplayAll() + files.set_message_meta('4321', 1234.0, 3) + + def test_delete_message(self): + files = RackspaceCloudFiles(self.auth, container='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(files, 'get_connection') + files.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('DELETE', '/v1/test/4321') + conn.putheader('Host', 'files') + conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders() + conn.getresponse().AndReturn(res) + res.status = 204 + res.reason = 'No Content' + res.getheaders().AndReturn([]) + self.mox.ReplayAll() + files.delete_message('4321') + + def test_get_message(self): + files = RackspaceCloudFiles(self.auth, container='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(files, 'get_connection') + files.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('GET', '/v1/test/4321') + conn.putheader('Host', 'files') + conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders() + conn.getresponse().AndReturn(res) + res.status = 200 + res.reason = 'OK' + res.getheaders().AndReturn([]) + res.read().AndReturn(self.pickled_env) + res.getheader('X-Object-Meta-Timestamp').AndReturn('1234.0') + res.getheader('X-Object-Meta-Attempts', None).AndReturn('3') + res.getheader('X-Object-Meta-Delivered-Rcpts', None).AndReturn('[1, 2]') + self.mox.ReplayAll() + env, meta = files.get_message('4321') + self.assertTrue(isinstance(env, Envelope)) + self.assertEqual('sender@example.com', env.sender) + 
self.assertEqual(['rcpt@example.com'], env.recipients) + self.assertEqual(1234.0, meta['timestamp']) + self.assertEqual(3, meta['attempts']) + self.assertEqual([1, 2], meta['delivered_indexes']) + + def test_get_message_meta(self): + files = RackspaceCloudFiles(self.auth, container='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(files, 'get_connection') + files.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('HEAD', '/v1/test/4321') + conn.putheader('Host', 'files') + conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders() + conn.getresponse().AndReturn(res) + res.status = 200 + res.reason = 'OK' + res.getheaders().AndReturn([]) + res.getheader('X-Object-Meta-Timestamp').AndReturn('1234.0') + res.getheader('X-Object-Meta-Attempts', None).AndReturn('3') + res.getheader('X-Object-Meta-Delivered-Rcpts', None).AndReturn(None) + self.mox.ReplayAll() + meta = files.get_message_meta('4321') + self.assertEqual(1234.0, meta['timestamp']) + self.assertEqual(3, meta['attempts']) + self.assertFalse('delivered_indexes' in meta) + + def test_list_messages_page(self): + files = RackspaceCloudFiles(self.auth, container='test', prefix='test-') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(files, 'get_connection') + files.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('GET', '/v1/test?limit=1000&marker=marker') + conn.putheader('Host', 'files') + conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders() + conn.getresponse().AndReturn(res) + res.status = 200 + res.reason = 'OK' + res.getheaders().AndReturn([]) + res.read().AndReturn('test-one\ntest-two\ntest-three\nfour') + self.mox.ReplayAll() + lines, marker = files._list_messages_page('marker') + self.assertEqual(['test-one', 'test-two', 'test-three'], lines) + + def test_list_messages(self): + files = RackspaceCloudFiles(self.auth, container='test') + 
self.mox.StubOutWithMock(files, '_list_messages_page') + self.mox.StubOutWithMock(files, 'get_message_meta') + files._list_messages_page(None).AndReturn((['one', 'two'], 'two')) + files._list_messages_page('two').AndReturn(([], None)) + files.get_message_meta('one').AndReturn((1234.0, 0)) + files.get_message_meta('two').AndReturn((5678.0, 0)) + self.mox.ReplayAll() + results = list(files.list_messages()) + self.assertEqual([(1234.0, 'one'), (5678.0, 'two')], results) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_cloudstorage_rackspace_queues.py b/test/test_slimta_cloudstorage_rackspace_queues.py new file mode 100644 index 00000000..f55aafd3 --- /dev/null +++ b/test/test_slimta_cloudstorage_rackspace_queues.py @@ -0,0 +1,94 @@ + +import json + +import gevent +from mox3.mox import MoxTestBase, IsA, Func + +from slimta.cloudstorage.rackspace import RackspaceCloudAuth, \ + RackspaceCloudQueues + + +class TestRackspaceCloudQueues(MoxTestBase): + + def setUp(self): + super(TestRackspaceCloudQueues, self).setUp() + self.auth = self.mox.CreateMock(RackspaceCloudAuth) + self.auth.token_id = 'tokenid' + self.auth.queues_endpoint = 'http://queues/v1' + + def test_queue_message(self): + queues = RackspaceCloudQueues(self.auth, queue_name='test', client_id='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(queues, 'get_connection') + queues.get_connection(IsA(tuple), {}).AndReturn(conn) + json_payload = json.dumps([{'ttl': 86400, 'body': {'timestamp': 1234.0, 'storage_id': 'asdf'}}], sort_keys=True) + conn.putrequest('POST', '/v1/queues/test/messages') + conn.putheader('Host', 'queues') + conn.putheader('Client-ID', 'test') + conn.putheader('Content-Type', 'application/json') + conn.putheader('Content-Length', str(len(json_payload))) + conn.putheader('Accept', 'application/json') + conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders(json_payload) + conn.getresponse().AndReturn(res) + 
res.status = 201 + res.reason = 'Created' + res.getheaders().AndReturn([]) + self.mox.ReplayAll() + queues.queue_message('asdf', 1234.0) + + def test_poll(self): + queues = RackspaceCloudQueues(self.auth, queue_name='test', client_id='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(queues, 'get_connection') + queues.get_connection(IsA(tuple), {}).AndReturn(conn) + json_payload = '{"ttl": 3600, "grace": 3600}' + conn.putrequest('POST', '/v1/queues/test/claims') + conn.putheader('Host', 'queues') + conn.putheader('Client-ID', 'test') + conn.putheader('Content-Type', 'application/json') + conn.putheader('Content-Length', str(len(json_payload))) + conn.putheader('Accept', 'application/json') + conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders(json_payload) + conn.getresponse().AndReturn(res) + res.status = 201 + res.reason = 'Created' + res.getheaders().AndReturn([]) + res.read().AndReturn("""[{"body": {"timestamp": 1234.0, "storage_id": "storeid1"}, "href": "msgid1"}, + {"body": {"timestamp": 5678.0, "storage_id": "storeid2"}, "href": "msgid2"}]""") + self.mox.ReplayAll() + results = list(queues.poll()) + self.assertEqual([(1234.0, 'storeid1', 'msgid1'), (5678.0, 'storeid2', 'msgid2')], results) + + def test_sleep(self): + queues = RackspaceCloudQueues(self.auth, poll_pause=1337.0) + self.mox.StubOutWithMock(gevent, 'sleep') + gevent.sleep(1337.0) + self.mox.ReplayAll() + queues.sleep() + + def test_delete(self): + queues = RackspaceCloudQueues(self.auth, client_id='test') + conn = self.mox.CreateMockAnything() + res = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(queues, 'get_connection') + queues.get_connection(IsA(tuple), {}).AndReturn(conn) + conn.putrequest('DELETE', '/path/to/msg') + conn.putheader('Host', 'queues') + conn.putheader('Client-ID', 'test') + conn.putheader('Content-Type', 'application/json') + conn.putheader('Accept', 'application/json') + 
conn.putheader('X-Auth-Token', 'tokenid') + conn.endheaders() + conn.getresponse().AndReturn(res) + res.status = 204 + res.reason = 'No Content' + res.getheaders().AndReturn([]) + self.mox.ReplayAll() + queues.delete('/path/to/msg') + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_logging.py b/test/test_slimta_logging.py index 952ca2e2..c532b72c 100644 --- a/test/test_slimta_logging.py +++ b/test/test_slimta_logging.py @@ -2,7 +2,8 @@ from testfixtures import log_capture -from slimta.logging import logline, parseline, log_exception +from slimta.logging import parseline, log_exception +from slimta.logging.log import logline class TestLogging(unittest.TestCase): diff --git a/test/test_slimta_lookup_dbapi2.py b/test/test_slimta_lookup_dbapi2.py new file mode 100644 index 00000000..36b796a8 --- /dev/null +++ b/test/test_slimta_lookup_dbapi2.py @@ -0,0 +1,59 @@ + +from contextlib import contextmanager + +from mox3.mox import MoxTestBase, IsA + +from slimta.lookup.drivers.dbapi2 import DBAPI2Lookup + + +class TestDBAPI2Lookup(MoxTestBase): + + def setUp(self): + super(TestDBAPI2Lookup, self).setUp() + self.conn = self.mox.CreateMockAnything() + self.cur = self.mox.CreateMockAnything() + @contextmanager + def conn_ctxmgr(): + yield self.conn + self.conn_ctxmgr = conn_ctxmgr + + def test_no_conn_ctxmgr(self): + drv = DBAPI2Lookup(None, None, conn=1234) + with drv.conn_ctxmgr() as conn: + self.assertEqual(1234, conn) + + def test_all_keywords_lookup_hit(self): + self.conn.cursor().AndReturn(self.cur) + self.cur.execute('test query', {'one': 1, 'two': 2}) + self.cur.fetchone().AndReturn({'test': 'pass'}) + self.conn.rollback() + self.cur.close() + self.mox.ReplayAll() + drv = DBAPI2Lookup(self.conn_ctxmgr, 'test query') + self.assertEqual({'test': 'pass'}, drv.lookup(one=1, two=2)) + + def test_all_keywords_lookup_miss(self): + self.conn.cursor().AndReturn(self.cur) + self.cur.execute('test query', {'one': 1, 'two': 2}) + 
self.cur.fetchone().AndReturn(None) + self.conn.rollback() + self.cur.close() + self.mox.ReplayAll() + drv = DBAPI2Lookup(self.conn_ctxmgr, 'test query') + self.assertEqual(None, drv.lookup(one=1, two=2)) + + def test_no_keywords_lookup_hit(self): + self.conn.cursor().AndReturn(self.cur) + self.cur.execute('test query', [1, 2]) + self.cur.fetchone().AndReturn([3, 4]) + self.conn.rollback() + self.cur.close() + self.mox.ReplayAll() + query_param_order = ['one', 'two'] + result_order = ['a', 'b'] + drv = DBAPI2Lookup(self.conn_ctxmgr, 'test query', + query_param_order, result_order) + self.assertEqual({'a': 3, 'b': 4}, drv.lookup(one=1, two=2)) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_lookup_dict.py b/test/test_slimta_lookup_dict.py new file mode 100644 index 00000000..bd70bd68 --- /dev/null +++ b/test/test_slimta_lookup_dict.py @@ -0,0 +1,36 @@ + +from mox3.mox import MoxTestBase, IsA + +from slimta.lookup.drivers.dict import DictLookup + + +class TestDictLookup(MoxTestBase): + + def setUp(self): + super(TestDictLookup, self).setUp() + test = {'test one two': 1, 'test three four': 2} + self.drv = DictLookup(test, 'test {a} {b}') + + def test_lookup_miss(self): + self.assertEqual(None, self.drv.lookup(a='one', b='four')) + self.assertEqual(None, self.drv.lookup(a='three', b='two')) + + def test_lookup_hit(self): + self.assertEqual(1, self.drv.lookup(a='one', b='two')) + self.assertEqual(2, self.drv.lookup(a='three', b='four')) + + def test_lookup_address(self): + test = {'test one': 1, 'test two@example.com': 2} + drv = DictLookup(test, 'test {address}') + self.assertEqual(1, drv.lookup_address('one')) + self.assertEqual(2, drv.lookup_address('two@example.com')) + self.assertEqual(None, drv.lookup_address('three')) + + def test_lookup_address_domain(self): + test = {'test one.com': 1} + drv = DictLookup(test, 'test {domain}') + self.assertEqual(1, drv.lookup_address('test@one.com')) + self.assertEqual(None, 
drv.lookup_address('test@two.com')) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_lookup_policy.py b/test/test_slimta_lookup_policy.py new file mode 100644 index 00000000..736496de --- /dev/null +++ b/test/test_slimta_lookup_policy.py @@ -0,0 +1,64 @@ + +from mox3.mox import MoxTestBase, IsA + +from slimta.envelope import Envelope +from slimta.lookup.policy import LookupPolicy +from slimta.lookup.drivers.dict import DictLookup + + +class TestDictLookup(MoxTestBase): + + def setUp(self): + super(TestDictLookup, self).setUp() + self.data = {} + self.address_policy = LookupPolicy(DictLookup(self.data, '{address}'), True, True) + self.domain_policy = LookupPolicy(DictLookup(self.data, '{domain}'), True, True) + + def test_verp(self): + self.data['sender@example.com'] = {'verp': 'verp.com'} + self.data['rcpt2@example.com'] = {'verp': 'verp.com'} + env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com']) + self.address_policy.apply(env) + self.assertEquals('sender=example.com@verp.com', env.sender) + self.assertEquals(['rcpt1@example.com', 'rcpt2=example.com@verp.com'], env.recipients) + + def test_alias(self): + self.data['sender@example.com'] = {'alias': 'sender@other.com'} + self.data['rcpt2@example.com'] = {'alias': 'other.com'} + env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com']) + self.address_policy.apply(env) + self.assertEquals('sender@other.com', env.sender) + self.assertEquals(['rcpt1@example.com', 'rcpt2@other.com'], env.recipients) + + def test_alias_domain(self): + self.data['example.com'] = {'alias': 'other.com'} + env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com']) + self.domain_policy.apply(env) + self.assertEquals('sender@other.com', env.sender) + self.assertEquals(['rcpt1@other.com', 'rcpt2@other.com'], env.recipients) + + def test_alias_rewrite(self): + self.data['sender@example.com'] = {'alias': 'test+{localpart}@other.com'} + 
self.data['rcpt2@example.com'] = {'alias': 'test@{domain}'} + env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com']) + self.address_policy.apply(env) + self.assertEquals('test+sender@other.com', env.sender) + self.assertEquals(['rcpt1@example.com', 'test@example.com'], env.recipients) + + def test_alias_domain_rewrite(self): + self.data['example.com'] = {'alias': 'test+{localpart}@other.com'} + env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com']) + self.domain_policy.apply(env) + self.assertEquals('test+sender@other.com', env.sender) + self.assertEquals(['test+rcpt1@other.com', 'test+rcpt2@other.com'], env.recipients) + + def test_add_headers(self): + self.data['sender@example.com'] = {'add_headers': '{"X-Test-A": "one"}'} + self.data['rcpt2@example.com'] = {'add_headers': '{"X-Test-B": "two"}'} + env = Envelope('sender@example.com', ['rcpt1@example.com', 'rcpt2@example.com']) + env.parse(b"""\n\n""") + self.address_policy.apply(env) + self.assertEquals('one', env.headers['x-test-a']) + self.assertEquals('two', env.headers['x-test-b']) + +# vim:et:fdm=marker:sts=4:sw=4:ts=4:tw=0 diff --git a/test/test_slimta_lookup_redis.py b/test/test_slimta_lookup_redis.py new file mode 100644 index 00000000..1919fc23 --- /dev/null +++ b/test/test_slimta_lookup_redis.py @@ -0,0 +1,40 @@ + +from redis import StrictRedis +from mox3.mox import MoxTestBase, IsA + +from slimta.lookup.drivers.redis import RedisLookup + + +class TestRedisLookup(MoxTestBase): + + def setUp(self): + super(TestRedisLookup, self).setUp() + self.drv = RedisLookup('test {a} {b}') + self.drv.redis = self.mox.CreateMock(StrictRedis) + self.drv_hash = RedisLookup('test {a} {b}', use_hash=True) + self.drv_hash.redis = self.mox.CreateMock(StrictRedis) + + def test_lookup_miss(self): + self.drv.redis.get('test one two').AndReturn(None) + self.mox.ReplayAll() + self.assertEqual(None, self.drv.lookup(a='one', b='two')) + + def test_lookup_hit(self): + 
self.drv.redis.get('test one two').AndReturn('{"test": "pass"}') + self.mox.ReplayAll() + expected = {"test": "pass"} + self.assertEqual(expected, self.drv.lookup(a='one', b='two')) + + def test_hash_lookup_miss(self): + self.drv_hash.redis.hgetall('test one two').AndReturn(None) + self.mox.ReplayAll() + self.assertEqual(None, self.drv_hash.lookup(a='one', b='two')) + + def test_hash_lookup_hit(self): + self.drv_hash.redis.hgetall('test one two').AndReturn({'test': 'pass'}) + self.mox.ReplayAll() + expected = {"test": "pass"} + self.assertEqual(expected, self.drv_hash.lookup(a='one', b='two')) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_lookup_regex.py b/test/test_slimta_lookup_regex.py new file mode 100644 index 00000000..c223469e --- /dev/null +++ b/test/test_slimta_lookup_regex.py @@ -0,0 +1,38 @@ + +from mox3.mox import MoxTestBase + +from slimta.lookup.drivers.regex import RegexLookup + + +class TestRegexLookup(MoxTestBase): + + def setUp(self): + super(TestRegexLookup, self).setUp() + self.drv = RegexLookup('test {a} {b}') + self.drv.add_regex(r'^test [0-9]+ [a-zA-Z]+$', 1) + self.drv.add_regex(r'^test [a-zA-Z]+ [0-9]+$', 2) + + def test_lookup_miss(self): + self.assertEqual(None, self.drv.lookup(a='abc', b='def')) + self.assertEqual(None, self.drv.lookup(a='123', b='456')) + + def test_lookup_hit(self): + self.assertEqual(1, self.drv.lookup(a='123', b='abc')) + self.assertEqual(2, self.drv.lookup(a='def', b='456')) + + def test_lookup_address(self): + drv = RegexLookup('test {address}') + drv.add_regex(r'^test one$', 1) + drv.add_regex(r'^test two@example.com$', 2) + self.assertEqual(1, drv.lookup_address('one')) + self.assertEqual(2, drv.lookup_address('two@example.com')) + self.assertEqual(None, drv.lookup_address('three')) + + def test_lookup_address_domain(self): + drv = RegexLookup('test {domain}') + drv.add_regex(r'^test one.com$', 1) + self.assertEqual(1, drv.lookup_address('test@one.com')) + self.assertEqual(None, 
drv.lookup_address('test@two.com')) + + +# vim:et:fdm=marker:sts=4:sw=4:ts=4 diff --git a/test/test_slimta_queue_disk.py b/test/test_slimta_queue_disk.py new file mode 100644 index 00000000..1eaf9f5b --- /dev/null +++ b/test/test_slimta_queue_disk.py @@ -0,0 +1,119 @@ + +import pytest +_ = pytest.importorskip('pyaio') + +import os +import unittest +import re +from tempfile import mkdtemp +from shutil import rmtree + +from slimta.diskstorage import DiskStorage +from slimta.envelope import Envelope + + +class TestDiskStorage(unittest.TestCase): + + id_pattern = re.compile(r'[0-9a-fA-F]{32}') + + def setUp(self): + self.env_dir = mkdtemp() + self.meta_dir = mkdtemp() + self.tmp_dir = mkdtemp() + self.disk = DiskStorage(self.env_dir, self.meta_dir, self.tmp_dir) + + def tearDown(self): + rmtree(self.env_dir) + rmtree(self.meta_dir) + rmtree(self.tmp_dir) + + def _write_test_envelope(self, rcpts=None): + env = Envelope('sender@example.com', rcpts or ['rcpt@example.com']) + env.timestamp = 9876543210 + id = self.disk.write(env, 1234567890) + return id, env + + def test_tmp_cleanup(self): + id, env = self._write_test_envelope() + self.assertEqual([], os.listdir(self.tmp_dir)) + + def test_write(self): + id, env = self._write_test_envelope() + + written_env = self.disk.ops.read_env(id) + written_meta = self.disk.ops.read_meta(id) + self.assertTrue(self.id_pattern.match(id)) + self.assertEqual(vars(env), vars(written_env)) + self.assertEqual(1234567890, written_meta['timestamp']) + self.assertEqual(0, written_meta['attempts']) + self.assertEqual('sender@example.com', written_env.sender) + self.assertEqual(['rcpt@example.com'], written_env.recipients) + self.assertEqual(9876543210, written_env.timestamp) + + def test_set_timestamp(self): + id, env = self._write_test_envelope() + self.disk.set_timestamp(id, 1111) + + written_env = self.disk.ops.read_env(id) + written_meta = self.disk.ops.read_meta(id) + self.assertEqual(vars(env), vars(written_env)) + self.assertEqual(1111, 
written_meta['timestamp']) + + def test_increment_attempts(self): + id, env = self._write_test_envelope() + self.assertEqual(1, self.disk.increment_attempts(id)) + self.assertEqual(2, self.disk.increment_attempts(id)) + + written_env = self.disk.ops.read_env(id) + written_meta = self.disk.ops.read_meta(id) + self.assertEqual(vars(env), vars(written_env)) + self.assertEqual(2, written_meta['attempts']) + + def test_set_recipients_delivered(self): + id, env = self._write_test_envelope() + self.disk.set_recipients_delivered(id, [1]) + self.disk.set_recipients_delivered(id, [3]) + + written_env = self.disk.ops.read_env(id) + written_meta = self.disk.ops.read_meta(id) + self.assertEqual(vars(env), vars(written_env)) + self.assertEqual([1, 3], written_meta['delivered_indexes']) + + def test_load(self): + queued = [self._write_test_envelope(), + self._write_test_envelope()] + loaded = [info for info in self.disk.load()] + self.assertEqual(len(queued), len(loaded)) + for timestamp, loaded_id in loaded: + for queued_id, env in queued: + if loaded_id == queued_id: + written_env = self.disk.ops.read_env(loaded_id) + written_meta = self.disk.ops.read_meta(loaded_id) + self.assertEqual(vars(env), vars(written_env)) + self.assertEqual(timestamp, written_meta['timestamp']) + break + else: + raise ValueError('Queued does not match loaded') + + def test_get(self): + id, env = self._write_test_envelope(['rcpt1@example.com', + 'rcpt2@example.com']) + self.disk.increment_attempts(id) + self.disk.set_recipients_delivered(id, [0]) + get_env, get_attempts = self.disk.get(id) + self.assertEqual('sender@example.com', get_env.sender) + self.assertEqual(['rcpt2@example.com'], get_env.recipients) + self.assertEqual(1, get_attempts) + + def test_remove(self): + id, env = self._write_test_envelope() + self.disk.remove(id) + id, env = self._write_test_envelope() + self.disk.ops.delete_env(id) + self.disk.remove(id) + id, env = self._write_test_envelope() + self.disk.ops.delete_meta(id) + 
import re
import uuid

from mox3.mox import MoxTestBase, IsA, Func
from six.moves import cPickle
from redis import StrictRedis

from slimta.redisstorage import RedisStorage
from slimta.envelope import Envelope

# Queue IDs are 32-character hex strings (e.g. ``uuid.uuid4().hex``).
id_pattern = re.compile(r'^[0-9a-fA-F]{32}$')


def _is_id(string):
    """Return a truthy match object if *string* looks like a queue ID."""
    return id_pattern.match(string)


def _is_prefixed_id(string):
    """Check that *string* is a queue ID carrying the ``test:`` key prefix.

    Returns a falsy ``None`` when the prefix is absent.
    """
    if string.startswith('test:'):
        return _is_id(string[5:])


class TestRedisStorage(MoxTestBase):
    """Tests for the Redis-backed queue storage, with the client mocked."""

    def setUp(self):
        super(TestRedisStorage, self).setUp()
        self.storage = RedisStorage(prefix='test:')
        # Swap in a mock client so no Redis server is required.
        self.storage.redis = self.mox.CreateMock(StrictRedis)

    def _write_test_envelope(self, rcpts=None):
        """Build a test envelope plus a fresh queue ID.

        NOTE(review): the original body was ``return id, env`` with neither
        name defined locally (``id`` resolved to the builtin function and
        ``env`` raised NameError when called) -- reconstructed here as a
        simple fixture builder; confirm against the sibling disk-storage
        test's helper.
        """
        env = Envelope('sender@example.com', rcpts or ['rcpt@example.com'])
        env.timestamp = 9876543210
        return uuid.uuid4().hex, env

    def test_write(self):
        """write() retries ID collisions, stores metadata, queues the ID."""
        # First hsetnx reports a key collision (returns 0), forcing a new
        # ID to be generated; the second attempt succeeds (returns 1).
        self.storage.redis.hsetnx(Func(_is_prefixed_id), 'envelope',
                                  IsA(bytes)).AndReturn(0)
        self.storage.redis.hsetnx(Func(_is_prefixed_id), 'envelope',
                                  IsA(bytes)).AndReturn(1)
        pipe = self.mox.CreateMockAnything()
        self.storage.redis.pipeline().AndReturn(pipe)

        def _verify_hmset(val):
            # Metadata must carry the write timestamp and a zeroed attempt
            # counter, and must not duplicate the envelope blob.
            self.assertEqual(1234567890, val['timestamp'])
            self.assertEqual(0, val['attempts'])
            self.assertFalse('envelope' in val)
            return True

        def _verify_rpush(val):
            timestamp, id = cPickle.loads(val)
            self.assertEqual(1234567890, timestamp)
            self.assertTrue(_is_id(id))
            return True

        pipe.hmset(Func(_is_prefixed_id), Func(_verify_hmset))
        pipe.rpush('test:queue', Func(_verify_rpush))
        pipe.execute()
        self.mox.ReplayAll()
        env = Envelope('sender@example.com', ['rcpt@example.com'])
        # The timestamp passed to write() must win over env.timestamp.
        env.timestamp = 9876543210
        self.storage.write(env, 1234567890)

    def test_set_timestamp(self):
        """set_timestamp() updates the hash field under the prefixed key."""
        self.storage.redis.hset('test:asdf', 'timestamp', 1111)
        self.mox.ReplayAll()
        self.storage.set_timestamp('asdf', 1111)

    def test_increment_attempts(self):
        """increment_attempts() delegates to HINCRBY and returns the count."""
        self.storage.redis.hincrby('test:asdf', 'attempts', 1).AndReturn(1)
        self.storage.redis.hincrby('test:asdf', 'attempts', 1).AndReturn(2)
        self.mox.ReplayAll()
        self.assertEqual(1, self.storage.increment_attempts('asdf'))
        self.assertEqual(2, self.storage.increment_attempts('asdf'))

    def test_get(self):
        """get() unpickles the envelope and drops delivered recipients."""
        env = Envelope('sender@example.com',
                       ['rcpt1@example.com', 'rcpt2@example.com'])
        envelope_raw = cPickle.dumps(env)
        delivered_indexes_raw = cPickle.dumps([0])
        self.storage.redis.hmget(
            'test:asdf', 'envelope', 'attempts',
            'delivered_indexes').AndReturn(
            (envelope_raw, 13, delivered_indexes_raw))
        self.mox.ReplayAll()
        got_env, attempts = self.storage.get('asdf')
        self.assertEqual('sender@example.com', got_env.sender)
        # Recipient index 0 was already delivered, so only the second stays.
        self.assertEqual(['rcpt2@example.com'], got_env.recipients)
        self.assertEqual(13, attempts)

    def test_get_missing(self):
        """get() raises KeyError when the hash does not exist."""
        self.storage.redis.hmget(
            'test:asdf', 'envelope', 'attempts',
            'delivered_indexes').AndReturn((None, None, None))
        self.mox.ReplayAll()
        self.assertRaises(KeyError, self.storage.get, 'asdf')

    def test_load(self):
        """load() scans prefixed keys and yields (timestamp, id) pairs."""
        self.storage.redis.keys('test:*').AndReturn(
            ['test:one', 'test:two', 'test:three'])
        self.storage.redis.hget('test:one', 'timestamp').AndReturn(123)
        self.storage.redis.hget('test:two', 'timestamp').AndReturn(456)
        self.storage.redis.hget('test:three', 'timestamp').AndReturn(789)
        self.mox.ReplayAll()
        expected = [(123, 'one'), (456, 'two'), (789, 'three')]
        self.assertEqual(expected, list(self.storage.load()))

    def test_remove(self):
        """remove() deletes the prefixed hash key."""
        self.storage.redis.delete('test:asdf')
        self.mox.ReplayAll()
        self.storage.remove('asdf')

    def test_wait(self):
        """wait() blocks on the queue list and unpickles the popped entry."""
        ret = cPickle.dumps((1234567890, 'asdf'))
        self.storage.redis.blpop(['test:queue'], 0).AndReturn(
            ('test:queue', ret))
        self.mox.ReplayAll()
        self.assertEqual([(1234567890, 'asdf')], self.storage.wait())

    def test_wait_none(self):
        """wait() returns a falsy result when BLPOP times out with None."""
        self.storage.redis.blpop(['test:queue'], 0).AndReturn(None)
        self.mox.ReplayAll()
        self.assertFalse(self.storage.wait())


# vim:et:fdm=marker:sts=4:sw=4:ts=4
import unittest
import threading

from mox3.mox import MoxTestBase, IsA
import gevent.monkey
import spf

from slimta.envelope import Envelope
from slimta.smtp.reply import Reply
from slimta.util.spf import EnforceSpf

gevent.monkey.patch_all()


class TestEnforceSpf(MoxTestBase):
    """Tests for the EnforceSpf validator decorator."""

    def setUp(self):
        super(TestEnforceSpf, self).setUp()
        # Stub out the pyspf entry point so no DNS lookups are performed.
        self.mox.StubOutWithMock(spf, 'check2')

    def test_bad_result_type(self):
        """set_enforcement() rejects unknown SPF result names."""
        espf = EnforceSpf()
        self.assertRaises(ValueError, espf.set_enforcement, 'asdf')
        # Valid result names are accepted regardless of case.
        espf.set_enforcement('PASS')
        espf.set_enforcement('pass')

    def test_no_policy_match(self):
        """A non-matching SPF result leaves the reply untouched."""
        espf = EnforceSpf()
        espf.set_enforcement('fail', match_code='550')

        class FakeSession(object):
            address = ('1.2.3.4', 56789)
            envelope = None
            ehlo_as = 'testehlo'

        class FakeValidators(object):
            def __init__(self):
                self.session = FakeSession()

            @espf.check
            def validate_mail(self, reply, sender):
                pass

        spf.check2(i='1.2.3.4', s='sender@example.com',
                   h='testehlo').AndReturn(('none', 'the reason'))
        self.mox.ReplayAll()
        reply = Reply('250', '2.0.0 Ok')
        FakeValidators().validate_mail(reply, 'sender@example.com')
        # 'none' does not match the enforced 'fail', so nothing changes.
        self.assertEqual('250', reply.code)
        self.assertEqual('2.0.0 Ok', reply.message)

    def test_policy_match(self):
        """A matching SPF result rewrites the reply with the match code."""
        espf = EnforceSpf()
        espf.set_enforcement('fail', match_code='550')

        class FakeSession(object):
            address = ('1.2.3.4', 56789)
            envelope = None
            ehlo_as = 'testehlo'

        class FakeValidators(object):
            def __init__(self):
                self.session = FakeSession()

            @espf.check
            def validate_mail(self, reply, sender):
                pass

        spf.check2(i='1.2.3.4', s='sender@example.com',
                   h='testehlo').AndReturn(('fail', 'the reason'))
        self.mox.ReplayAll()
        reply = Reply('250', '2.0.0 Ok')
        FakeValidators().validate_mail(reply, 'sender@example.com')
        self.assertEqual('550', reply.code)
        self.assertEqual('5.7.1 Access denied', reply.message)
# Remaining test methods of TestEnforceSpf (the class statement appears
# earlier in this patch hunk).

def test_on_validate_rcpt(self):
    """The decorator also wraps RCPT validators, taking the sender from
    the session's envelope rather than the validator argument."""
    espf = EnforceSpf()
    espf.set_enforcement('fail', match_code='550')

    class FakeSession(object):
        address = ('1.2.3.4', 56789)
        envelope = Envelope('sender@example.com')
        ehlo_as = 'testehlo'

    class FakeValidators(object):
        def __init__(self):
            self.session = FakeSession()

        @espf.check
        def validate_rcpt(self, reply, recipient):
            pass

    spf.check2(i='1.2.3.4', s='sender@example.com',
               h='testehlo').AndReturn(('fail', 'the reason'))
    self.mox.ReplayAll()
    reply = Reply('250', '2.0.0 Ok')
    FakeValidators().validate_rcpt(reply, 'asdf')
    self.assertEqual('550', reply.code)
    self.assertEqual('5.7.1 Access denied', reply.message)


def test_reason_in_message(self):
    """A '{reason}' placeholder in match_message is replaced with the
    explanation string returned by the SPF check."""
    espf = EnforceSpf()
    espf.set_enforcement('pass', match_code='250', match_message='{reason}')

    class FakeSession(object):
        address = ('1.2.3.4', 56789)
        envelope = None
        ehlo_as = 'testehlo'

    class FakeValidators(object):
        def __init__(self):
            self.session = FakeSession()

        @espf.check
        def validate_mail(self, reply, sender):
            pass

    spf.check2(i='1.2.3.4', s='sender@example.com',
               h='testehlo').AndReturn(('pass', 'the reason'))
    self.mox.ReplayAll()
    reply = Reply('250', '2.0.0 Ok')
    FakeValidators().validate_mail(reply, 'sender@example.com')
    self.assertEqual('250', reply.code)
    self.assertEqual('2.0.0 the reason', reply.message)


# vim:et:fdm=marker:sts=4:sw=4:ts=4