diff --git a/poetry.lock b/poetry.lock index 53b51a5..4c10431 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,53 @@ # This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +[[package]] +name = "aiosmtpd" +version = "1.4.4.post2" +description = "aiosmtpd - asyncio based SMTP server" +optional = true +python-versions = "~=3.7" +files = [ + {file = "aiosmtpd-1.4.4.post2-py3-none-any.whl", hash = "sha256:f821fe424b703b2ea391dc2df11d89d2afd728af27393e13cf1a3530f19fdc5e"}, + {file = "aiosmtpd-1.4.4.post2.tar.gz", hash = "sha256:f9243b7dfe00aaf567da8728d891752426b51392174a34d2cf5c18053b63dcbc"}, +] + +[package.dependencies] +atpublic = "*" +attrs = "*" +typing-extensions = {version = "*", markers = "python_version < \"3.8\""} + +[[package]] +name = "atpublic" +version = "3.1.2" +description = "Keep all y'all's __all__'s in sync" +optional = true +python-versions = ">=3.7" +files = [ + {file = "atpublic-3.1.2-py3-none-any.whl", hash = "sha256:53801cb5512a020aeeea3bf461bd67fc671b5ee82ba6f7bddd91c1b54a88a80a"}, + {file = "atpublic-3.1.2.tar.gz", hash = "sha256:88ff77dde0ecd921bb7a31f914faaf8b10fec0478bf4a8998f3be9c5ca1b47da"}, +] + +[[package]] +name = "attrs" +version = "23.1.0" +description = "Classes Without Boilerplate" +optional = true +python-versions = ">=3.7" +files = [ + {file = "attrs-23.1.0-py3-none-any.whl", hash = "sha256:1f28b4522cdc2fb4256ac1a020c78acf9cba2c6b461ccd2c126f3aa8e8335d04"}, + {file = "attrs-23.1.0.tar.gz", hash = "sha256:6279836d581513a26f1bf235f9acd333bc9115683f14f7e8fae46c98fc50e015"}, +] + +[package.dependencies] +importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} + +[package.extras] +cov = ["attrs[tests]", "coverage[toml] (>=5.3)"] +dev = ["attrs[docs,tests]", "pre-commit"] +docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier", "zope-interface"] +tests = ["attrs[tests-no-zope]", "zope-interface"] +tests-no-zope = ["cloudpickle", "hypothesis", "mypy (>=1.1.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] + [[package]] name = "black" version = "23.3.0" @@ -1824,4 +1872,4 @@ test = ["black", "flake8", "isort", "pytest", "pytest-cov"] [metadata] lock-version = "2.0" python-versions = "^3.7" -content-hash = "4eb32310369d0bbbd6e28608396ee246882e8334780179ad83225bb1bff2e9a3" +content-hash = "089aedbdce59ff7e8337d45eaa7446020b9dfd4a504ae08387a82506db28d166" diff --git a/pyproject.toml b/pyproject.toml index c2a8ec4..2385a0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ mkdocs-autorefs = {version = "*", optional = true} pre-commit = {version = "*", optional = true} toml = {version = "*", optional = true} bump2version = {version = "*", optional = true} +aiosmtpd = {version = "^1.4.4.post2", optional = true} [tool.poetry.extras] test = [ @@ -53,7 +54,8 @@ test = [ "black", "isort", "flake8", - "pytest-cov" + "pytest-cov", + "aiosmtpd" ] dev = ["tox", "pre-commit", "virtualenv", "pip", "twine", "toml", "bump2version"] diff --git a/tests/__init__.py b/tests/__init__.py index 447719e..cbc3c43 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,4 @@ +from email import message_from_bytes import unittest from contextlib import contextmanager @@ -12,7 +13,7 @@ class TestCase(unittest.TestCase): MAIL_DEFAULT_SENDER = "support@mysite.com" def setUp(self): - self.app = Flask(__name__) + self.app = Flask(import_name=__name__) self.app.config.from_object(self) self.assertTrue(self.app.testing) self.mail = Mail(self.app) @@ -63,3 +64,46 @@ def assertIsNotNone(self, obj, msg=None): @pytest.fixture(autouse=True) def capsys(self, capsys): self.capsys = capsys + + +class MailmanCustomizedTestCase(TestCase): + def assertMessageHasHeaders(self, message, headers): + """ + Asserts that the `message` has all `headers`. + + message: can be an instance of an email.Message subclass or a string + with the contents of an email message. + headers: should be a set of (header-name, header-value) tuples. + """ + if isinstance(message, bytes): + message = message_from_bytes(message) + msg_headers = set(message.items()) + self.assertTrue( + headers.issubset(msg_headers), + msg="Message is missing " "the following headers: %s" % (headers - msg_headers), + ) + + def get_decoded_attachments(self, message): + """ + Encode the specified EmailMessage, then decode + it using Python's email.parser module and, for each attachment of the + message, return a list of tuples with (filename, content, mimetype). + """ + msg_bytes = message.message().as_bytes() + email_message = message_from_bytes(msg_bytes) + + def iter_attachments(): + for i in email_message.walk(): + if i.get_content_disposition() == "attachment": + filename = i.get_filename() + content = i.get_payload(decode=True) + mimetype = i.get_content_type() + yield filename, content, mimetype + + return list(iter_attachments()) + + def get_message(self): + return self.mail.outbox[0].message() + + def flush_mailbox(self): + self.mail.outbox.clear() diff --git a/tests/attachments/file.eml b/tests/attachments/file.eml new file mode 100644 index 0000000..b5b23c1 --- /dev/null +++ b/tests/attachments/file.eml @@ -0,0 +1,44 @@ +MIME-Version: 1.0 +Received: by 10.220.191.194 with HTTP; Wed, 11 May 2011 12:27:12 -0700 (PDT) +Date: Wed, 11 May 2011 13:27:12 -0600 +Delivered-To: jncjkq@gmail.com +Message-ID: +Subject: Test +From: Bill Jncjkq +To: bookmarks@jncjkq.net +Content-Type: multipart/mixed; boundary=bcaec54eecc63acce904a3050f79 + +--bcaec54eecc63acce904a3050f79 +Content-Type: multipart/alternative; boundary=bcaec54eecc63acce604a3050f77 + +--bcaec54eecc63acce604a3050f77 +Content-Type: text/plain; charset=ISO-8859-1 + +-- +Bill Jncjkq + +--bcaec54eecc63acce604a3050f77 +Content-Type: text/html; charset=ISO-8859-1 + +
--
Bill Jncjkq
+ +--bcaec54eecc63acce604a3050f77-- +--bcaec54eecc63acce904a3050f79 +Content-Type: text/html; charset=US-ASCII; name="bookmarks-really-short.html" +Content-Disposition: attachment; filename="bookmarks-really-short.html" +Content-Transfer-Encoding: base64 +X-Attachment-Id: f_gnknv6u70 + +PCFET0NUWVBFIE5FVFNDQVBFLUJvb2ttYXJrLWZpbGUtMT4KCTxIVE1MPgoJPE1FVEEgSFRUUC1F +UVVJVj0iQ29udGVudC1UeXBlIiBDT05URU5UPSJ0ZXh0L2h0bWw7IGNoYXJzZXQ9VVRGLTgiPgoJ +PFRpdGxlPkJvb2ttYXJrczwvVGl0bGU+Cgk8SDE+Qm9va21hcmtzPC9IMT4KCQk8RFQ+PEgzIEZP +TERFRD5UZWNoIE5ld3M8L0gzPgoJCTxETD48cD4KCQkJPERUPjxBIEhSRUY9Imh0dHA6Ly93d3cu +Y25ldC5jb20vIj5DTmV0PC9BPgoJCQk8RFQ+PEEgSFJFRj0iaHR0cDovL3d3dy53aXJlZC5jb20v +Ij5XaXJlZCBOZXdzPC9BPgoJCTwvREw+PHA+CgkJPERUPjxIMyBGT0xERUQ+VG9vbHMgYW5kIFJl +ZmVyZW5jZTwvSDM+CgkJPERMPjxwPgoJCQk8RFQ+PEEgSFJFRj0iaHR0cDovL3d3dy5tb25zdGVy +LmNvbS8iPk1vbnN0ZXIuY29tPC9BPgoJCQk8RFQ+PEEgSFJFRj0iaHR0cDovL3d3dy53ZWJtZC5j +b20vIj5XZWJNRDwvQT4KCQk8L0RMPjxwPgoJCTxEVD48SDMgRk9MREVEPlRyYXZlbDwvSDM+CgkJ +PERMPjxwPgoJCQk8RFQ+PEEgSFJFRj0iaHR0cDovL2ZvZG9ycy5jb20vIj5Gb2RvcnM8L0E+CgkJ +CTxEVD48QSBIUkVGPSJodHRwOi8vd3d3LnRyYXZlbG9jaXR5LmNvbS8iPlRyYXZlbG9jaXR5PC9B +PgoJCTwvREw+PHA+Cgk8L0RMPjxwPgo8L0hUTUw+ +--bcaec54eecc63acce904a3050f79-- \ No newline at end of file diff --git a/tests/attachments/file.png b/tests/attachments/file.png new file mode 100644 index 0000000..85225ca Binary files /dev/null and b/tests/attachments/file.png differ diff --git a/tests/attachments/file.txt b/tests/attachments/file.txt new file mode 100644 index 0000000..a0722bd --- /dev/null +++ b/tests/attachments/file.txt @@ -0,0 +1 @@ +flask/flask \ No newline at end of file diff --git a/tests/attachments/file_png b/tests/attachments/file_png new file mode 100644 index 0000000..85225ca Binary files /dev/null and b/tests/attachments/file_png differ diff --git a/tests/attachments/file_png.txt b/tests/attachments/file_png.txt new file mode 100644 index 0000000..85225ca Binary files /dev/null and b/tests/attachments/file_png.txt differ diff --git a/tests/attachments/file_txt b/tests/attachments/file_txt new file mode 100644 index 0000000..a0722bd --- /dev/null +++ b/tests/attachments/file_txt @@ -0,0 +1 @@ +flask/flask \ No newline at end of file diff --git a/tests/attachments/file_txt.png b/tests/attachments/file_txt.png new file mode 100644 index 0000000..b884222 --- /dev/null +++ b/tests/attachments/file_txt.png @@ -0,0 +1 @@ +django/django \ No newline at end of file diff --git a/tests/test_backend.py b/tests/test_backend.py index e932828..425be08 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,15 +1,79 @@ +from email.header import Header +from email.utils import parseaddr +import os +import socket import tempfile -from pathlib import Path -from unittest.mock import patch - import pytest +from pathlib import Path +from unittest.mock import Mock, patch +from smtplib import SMTP, SMTPException +from email import message_from_binary_file, message_from_bytes +from io import StringIO from flask_mailman import EmailMessage from flask_mailman.backends import locmem, smtp -from tests import TestCase +from tests import MailmanCustomizedTestCase +from aiosmtpd.controller import Controller + + +class SMTPHandler: + def __init__(self, *args, **kwargs): + self.mailbox = [] + + async def handle_DATA(self, server, session, envelope): + data = envelope.content + mail_from = envelope.mail_from + + message = message_from_bytes(data.rstrip()) + message_addr = parseaddr(message.get("from"))[1] + if mail_from != message_addr: + # According to the spec, mail_from does not necessarily match the + # From header - this is the case where the local part isn't + # encoded, so try to correct that. + lp, domain = mail_from.split("@", 1) + lp = Header(lp, "utf-8").encode() + mail_from = "@".join([lp, domain]) + + if mail_from != message_addr: + return f"553 '{mail_from}' != '{message_addr}'" + self.mailbox.append(message) + return "250 OK" + + def flush_mailbox(self): + self.mailbox[:] = [] + +class SmtpdContext: + def __init__(self, mailman): + self.mailman = mailman + + def __enter__(self): + # Find a free port. + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + self.smtp_handler = SMTPHandler() + self.smtp_controller = Controller( + self.smtp_handler, + hostname="127.0.0.1", + port=port, + ) + self.mailman.port = port + self.smtp_controller.start() + + def __exit__(self, *args): + self.smtp_controller.stop() + + +class TestBackend(MailmanCustomizedTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + + @classmethod + def stop_smtp(cls): + cls.smtp_controller.stop() -class TestBackend(TestCase): def test_console_backend(self): self.app.extensions['mailman'].backend = 'console' msg = EmailMessage( @@ -23,6 +87,34 @@ def test_console_backend(self): assert "testing" in captured.out assert "To: to@example.com" in captured.out + def test_console_stream_kwarg(self): + """ + The console backend can be pointed at an arbitrary stream. + """ + self.app.extensions['mailman'].backend = 'console' + s = StringIO() + connection = self.mail.get_connection(stream=s) + self.mail.send_mail( + "Subject", + "Content", + "from@example.com", + ["to@example.com"], + connection=connection, + ) + message = s.getvalue().split("\n" + ("-" * 79) + "\n")[0].encode() + self.assertMessageHasHeaders( + message, + { + ("MIME-Version", "1.0"), + ("Content-Type", 'text/plain; charset="utf-8"'), + ("Content-Transfer-Encoding", "7bit"), + ("Subject", "Subject"), + ("From", "from@example.com"), + ("To", "to@example.com"), + }, + ) + self.assertIn(b"\nDate: ", message) + def test_dummy_backend(self): self.app.extensions['mailman'].backend = 'dummy' msg = EmailMessage( @@ -49,6 +141,45 @@ def test_file_backend(self): assert wrote_file.is_file() assert "To: to@example.com" in wrote_file.read_text() + def test_file_sessions(self): + """Make sure opening a connection creates a new file""" + with tempfile.TemporaryDirectory() as tempdir: + self.app.extensions['mailman'].backend = 'file' + self.app.extensions['mailman'].file_path = tempdir + msg = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + connection = self.mail.get_connection() + connection.send_messages([msg]) + + self.assertEqual(len(os.listdir(tempdir)), 1) + with open(os.path.join(tempdir, os.listdir(tempdir)[0]), "rb") as fp: + message = message_from_binary_file(fp) + self.assertEqual(message.get_content_type(), "text/plain") + self.assertEqual(message.get("subject"), "Subject") + self.assertEqual(message.get("from"), "from@example.com") + self.assertEqual(message.get("to"), "to@example.com") + + connection2 = self.mail.get_connection() + connection2.send_messages([msg]) + self.assertEqual(len(os.listdir(tempdir)), 2) + + connection.send_messages([msg]) + self.assertEqual(len(os.listdir(tempdir)), 2) + + msg.connection = self.mail.get_connection() + self.assertTrue(connection.open()) + msg.send() + self.assertEqual(len(os.listdir(tempdir)), 3) + msg.send() + self.assertEqual(len(os.listdir(tempdir)), 3) + + connection.close() + def test_locmem_backend(self): self.app.extensions['mailman'].backend = 'locmem' msg = EmailMessage( @@ -65,6 +196,24 @@ def test_locmem_backend(self): self.assertEqual(sent_msg.body, "testing") self.assertEqual(sent_msg.from_email, self.app.extensions["mailman"].default_sender) + def test_locmem_shared_messages(self): + """ + Make sure that the locmen backend populates the outbox. + """ + self.app.extensions['mailman'].backend = 'locmem' + connection = locmem.EmailBackend() + connection2 = locmem.EmailBackend() + email = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + connection.send_messages([email]) + connection2.send_messages([email]) + self.assertEqual(len(self.mail.outbox), 2) + def test_smtp_backend(self): self.app.extensions['mailman'].backend = 'smtp' msg = EmailMessage( @@ -77,6 +226,225 @@ def test_smtp_backend(self): mock_send_fn.return_value = 66 assert msg.send() == 66 + def test_email_authentication_use_settings(self): + mailman = self.app.extensions['mailman'] + mailman.username = 'not empty username' + mailman.password = 'not empty password' + + backend = smtp.EmailBackend() + self.assertEqual(backend.username, "not empty username") + self.assertEqual(backend.password, "not empty password") + + def test_email_disabled_authentication(self): + mailman = self.app.extensions['mailman'] + mailman.username = 'not empty username' + mailman.password = 'not empty password' + + backend = smtp.EmailBackend(username="", password="") + self.assertEqual(backend.username, "") + self.assertEqual(backend.password, "") + + def test_auth_attempted(self): + """ + Opening the backend with non empty username/password tries + to authenticate against the SMTP server. + """ + with SmtpdContext(self.app.extensions['mailman']): + with self.assertRaises(SMTPException, msg="SMTP AUTH extension not supported by server."): + with smtp.EmailBackend(username="not empty username", password="not empty password"): + pass + + def test_server_open(self): + """ + open() returns whether it opened a connection. + """ + with SmtpdContext(self.app.extensions['mailman']): + backend = smtp.EmailBackend(username="", password="") + self.assertIsNone(backend.connection) + opened = backend.open() + backend.close() + self.assertIs(opened, True) + + def test_reopen_connection(self): + backend = smtp.EmailBackend() + # Simulate an already open connection. + backend.connection = Mock(spec=object()) + self.assertIs(backend.open(), False) + + def test_email_tls_use_settings(self): + self.app.extensions['mailman'].use_tls = True + backend = smtp.EmailBackend() + self.assertTrue(backend.use_tls) + + def test_email_tls_override_settings(self): + self.app.extensions['mailman'].use_tls = True + backend = smtp.EmailBackend(use_tls=False) + self.assertFalse(backend.use_tls) + + def test_email_tls_default_disabled(self): + backend = smtp.EmailBackend() + self.assertFalse(backend.use_tls) + + def test_ssl_tls_mutually_exclusive(self): + msg = "USE_TLS/USE_SSL are mutually exclusive, so only set " "one of those settings to True." + with self.assertRaises(ValueError, msg=msg): + smtp.EmailBackend(use_ssl=True, use_tls=True) + + def test_email_ssl_use_settings(self): + self.app.extensions['mailman'].use_ssl = True + backend = smtp.EmailBackend() + self.assertTrue(backend.use_ssl) + + def test_email_ssl_override_settings(self): + self.app.extensions['mailman'].use_ssl = True + backend = smtp.EmailBackend(use_ssl=False) + self.assertFalse(backend.use_ssl) + + def test_email_ssl_default_disabled(self): + backend = smtp.EmailBackend() + self.assertFalse(backend.use_ssl) + + def test_email_ssl_certfile_use_settings(self): + self.app.extensions['mailman'].ssl_certfile = "foo" + backend = smtp.EmailBackend() + self.assertEqual(backend.ssl_certfile, "foo") + + def test_email_ssl_certfile_override_settings(self): + self.app.extensions['mailman'].ssl_certfile = "foo" + backend = smtp.EmailBackend(ssl_certfile="bar") + self.assertEqual(backend.ssl_certfile, "bar") + + def test_email_ssl_certfile_default_disabled(self): + backend = smtp.EmailBackend() + self.assertIsNone(backend.ssl_certfile) + + def test_email_ssl_keyfile_use_settings(self): + self.app.extensions['mailman'].ssl_keyfile = "foo" + backend = smtp.EmailBackend() + self.assertEqual(backend.ssl_keyfile, "foo") + + def test_email_ssl_keyfile_override_settings(self): + self.app.extensions['mailman'].ssl_keyfile = "foo" + backend = smtp.EmailBackend(ssl_keyfile="bar") + self.assertEqual(backend.ssl_keyfile, "bar") + + def test_email_ssl_keyfile_default_disabled(self): + backend = smtp.EmailBackend() + self.assertIsNone(backend.ssl_keyfile) + + def test_email_tls_attempts_starttls(self): + with SmtpdContext(self.app.extensions['mailman']): + self.app.extensions['mailman'].use_tls = True + backend = smtp.EmailBackend() + self.assertTrue(backend.use_tls) + with self.assertRaises(SMTPException, msg="STARTTLS extension not supported by server."): + with backend: + pass + + def test_connection_timeout_default(self): + """The connection's timeout value is None by default.""" + self.app.extensions['mailman'].backend = "smtp" + connection = self.mail.get_connection() + self.assertIsNone(connection.timeout) + + def test_connection_timeout_custom(self): + """The timeout parameter can be customized.""" + with SmtpdContext(self.app.extensions['mailman']): + + class MyEmailBackend(smtp.EmailBackend): + def __init__(self, *args, **kwargs): + kwargs.setdefault("timeout", 42) + super().__init__(*args, **kwargs) + + myemailbackend = MyEmailBackend() + myemailbackend.open() + self.assertEqual(myemailbackend.timeout, 42) + self.assertEqual(myemailbackend.connection.timeout, 42) + myemailbackend.close() + + def test_email_timeout_override_settings(self): + self.app.extensions['mailman'].timeout = 10 + backend = smtp.EmailBackend() + self.assertEqual(backend.timeout, 10) + + def test_email_msg_uses_crlf(self): + """Compliant messages are sent over SMTP.""" + with SmtpdContext(self.app.extensions['mailman']): + send = SMTP.send + try: + smtp_messages = [] + + def mock_send(self, s): + smtp_messages.append(s) + return send(self, s) + + SMTP.send = mock_send + + email = EmailMessage("Subject", "Content", "from@example.com", ["to@example.com"]) + self.mail.get_connection(backend='smtp').send_messages([email]) + + # Find the actual message + msg = None + for i, m in enumerate(smtp_messages): + if m[:4] == "data": + msg = smtp_messages[i + 1] + break + + self.assertTrue(msg) + + msg = msg.decode() + # The message only contains CRLF and not combinations of CRLF, LF, and CR. + msg = msg.replace("\r\n", "") + self.assertNotIn("\r", msg) + self.assertNotIn("\n", msg) + + finally: + SMTP.send = send + + def test_send_messages_after_open_failed(self): + """ + send_messages() shouldn't try to send messages if open() raises an + exception after initializing the connection. + """ + backend = smtp.EmailBackend() + # Simulate connection initialization success and a subsequent + # connection exception. + backend.connection = Mock(spec=object()) + backend.open = lambda: None + email = EmailMessage("Subject", "Content", "from@example.com", ["to@example.com"]) + self.assertEqual(backend.send_messages([email]), 0) + + def test_send_messages_empty_list(self): + backend = smtp.EmailBackend() + backend.connection = Mock(spec=object()) + self.assertEqual(backend.send_messages([]), 0) + + def test_send_messages_zero_sent(self): + """A message isn't sent if it doesn't have any recipients.""" + backend = smtp.EmailBackend() + backend.connection = Mock(spec=object()) + email = EmailMessage("Subject", "Content", "from@example.com", to=[]) + sent = backend.send_messages([email]) + self.assertEqual(sent, 0) + + def test_server_stopped(self): + """ + Closing the backend while the SMTP server is stopped doesn't raise an + exception. + """ + with SmtpdContext(self.app.extensions['mailman']): + self.mail.get_connection('smtp').close() + + def test_fail_silently_on_connection_error(self): + """ + A socket connection error is silenced with fail_silently=True. + """ + backend = self.mail.get_connection('smtp') + with self.assertRaises(ConnectionError): + backend.open() + backend.fail_silently = True + backend.open() + def test_invalid_backend(self): self.app.extensions['mailman'].backend = 'unknown' msg = EmailMessage( diff --git a/tests/test_connection.py b/tests/test_connection.py index 1fea692..7d181c5 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -75,3 +75,41 @@ def test_bad_header_subject(self): msg = EmailMessage(subject="testing\n\r", body="testing", to=["to@example.com"]) with pytest.raises(BadHeaderError): msg.send() + + def test_arbitrary_keyword(self): + """ + Make sure that get_connection() accepts arbitrary keyword that might be + used with custom backends. + """ + c = self.mail.get_connection(fail_silently=True, foo="bar") + self.assertTrue(c.fail_silently) + + def test_close_connection(self): + """ + Connection can be closed (even when not explicitly opened) + """ + conn = self.mail.get_connection(username="", password="") + conn.close() + + def test_use_as_contextmanager(self): + """ + The connection can be used as a contextmanager. + """ + opened = [False] + closed = [False] + conn = self.mail.get_connection(username="", password="") + + def open(): + opened[0] = True + + conn.open = open + + def close(): + closed[0] = True + + conn.close = close + with conn as same_conn: + self.assertTrue(opened[0]) + self.assertIs(same_conn, conn) + self.assertFalse(closed[0]) + self.assertTrue(closed[0]) diff --git a/tests/test_global_state.py b/tests/test_global_state.py new file mode 100644 index 0000000..bdfb486 --- /dev/null +++ b/tests/test_global_state.py @@ -0,0 +1,29 @@ +from email.mime.text import MIMEText +from tests import TestCase + + +class PythonGlobalState(TestCase): + """ + UTF-8 text parts shouldn't pollute global email Python package charset registry when + django.mail.message is imported. + """ + + def test_utf8(self): + txt = MIMEText("UTF-8 encoded body", "plain", "utf-8") + self.assertIn("Content-Transfer-Encoding: base64", txt.as_string()) + + def test_7bit(self): + txt = MIMEText("Body with only ASCII characters.", "plain", "utf-8") + self.assertIn("Content-Transfer-Encoding: base64", txt.as_string()) + + def test_8bit_latin(self): + txt = MIMEText("Body with latin characters: àáä.", "plain", "utf-8") + self.assertIn("Content-Transfer-Encoding: base64", txt.as_string()) + + def test_8bit_non_latin(self): + txt = MIMEText( + "Body with non latin characters: А Б В Г Д Е Ж Ѕ З И І К Л М Н О П.", + "plain", + "utf-8", + ) + self.assertIn("Content-Transfer-Encoding: base64", txt.as_string()) diff --git a/tests/test_mail.py b/tests/test_mail.py index e0faaa8..13dcc7f 100644 --- a/tests/test_mail.py +++ b/tests/test_mail.py @@ -1,10 +1,14 @@ -from unittest import mock -from flask_mailman.message import EmailMessage +import mimetypes +import os from flask_mailman.utils import DNS_NAME -from tests import TestCase +from email import charset, message_from_bytes +from email.mime.text import MIMEText +from unittest import mock +from flask_mailman.message import EmailMessage, EmailMultiAlternatives, sanitize_address +from tests import MailmanCustomizedTestCase -class TestMail(TestCase): +class TestMail(MailmanCustomizedTestCase): def test_send_mail(self): self.mail.send_mail( subject="testing", @@ -14,7 +18,7 @@ def test_send_mail(self): ) self.assertEqual(len(self.mail.outbox), 1) sent_msg = self.mail.outbox[0] - self.assertEqual(sent_msg.from_email, self.MAIL_DEFAULT_SENDER) + self.assertEqual(sent_msg.from_email, second=self.MAIL_DEFAULT_SENDER) def test_send_mail_with_tuple_from_email(self): self.mail.send_mail( @@ -52,3 +56,938 @@ def test_non_ascii_dns_non_unicode_email(self, mocked_getfqdn): email = EmailMessage("subject", "content", "from@example.com", ["to@example.com"]) email.encoding = "iso-8859-1" self.assertIn("@xn--p8s937b>", email.message()["Message-ID"]) + + # Following test cases are originally from: + # https://github.com/django/django/tree/main/tests/mail/tests.py + + def test_header_omitted_for_no_to_recipients(self): + message = EmailMessage("Subject", "Content", "from@example.com", cc=["cc@example.com"]).message() + self.assertNotIn("To", message) + + def test_recipients_with_empty_strings(self): + """ + Empty strings in various recipient arguments are always stripped + off the final recipient list. + """ + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com", ""], + cc=["cc@example.com", ""], + bcc=["", "bcc@example.com"], + reply_to=["", None], + ) + self.assertEqual(email.recipients(), ["to@example.com", "cc@example.com", "bcc@example.com"]) + + def test_cc(self): + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com"], + cc=["cc@example.com"], + ) + message = email.message() + self.assertEqual(message["Cc"], "cc@example.com") + self.assertEqual(email.recipients(), ["to@example.com", "cc@example.com"]) + + # Test multiple CC with multiple To + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com", "other@example.com"], + cc=["cc@example.com", "cc.other@example.com"], + ) + message = email.message() + self.assertEqual(message["Cc"], "cc@example.com, cc.other@example.com") + self.assertEqual( + email.recipients(), + [ + "to@example.com", + "other@example.com", + "cc@example.com", + "cc.other@example.com", + ], + ) + + # Testing with Bcc + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com", "other@example.com"], + cc=["cc@example.com", "cc.other@example.com"], + bcc=["bcc@example.com"], + ) + message = email.message() + self.assertEqual(message["Cc"], "cc@example.com, cc.other@example.com") + self.assertEqual( + email.recipients(), + [ + "to@example.com", + "other@example.com", + "cc@example.com", + "cc.other@example.com", + "bcc@example.com", + ], + ) + + def test_cc_headers(self): + message = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["to@example.com"], + cc=["foo@example.com"], + headers={"Cc": "override@example.com"}, + ).message() + self.assertEqual(message["Cc"], "override@example.com") + + def test_cc_in_headers_only(self): + message = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["to@example.com"], + headers={"Cc": "foo@example.com"}, + ).message() + self.assertEqual(message["Cc"], "foo@example.com") + + def test_reply_to(self): + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com"], + reply_to=["reply_to@example.com"], + ) + message = email.message() + self.assertEqual(message["Reply-To"], "reply_to@example.com") + + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com"], + reply_to=["reply_to1@example.com", "reply_to2@example.com"], + ) + message = email.message() + self.assertEqual(message["Reply-To"], "reply_to1@example.com, reply_to2@example.com") + + def test_recipients_as_tuple(self): + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ("to@example.com", "other@example.com"), + cc=("cc@example.com", "cc.other@example.com"), + bcc=("bcc@example.com",), + ) + message = email.message() + self.assertEqual(message["Cc"], "cc@example.com, cc.other@example.com") + self.assertEqual( + email.recipients(), + [ + "to@example.com", + "other@example.com", + "cc@example.com", + "cc.other@example.com", + "bcc@example.com", + ], + ) + + def test_recipients_as_string(self): + with self.assertRaises(TypeError, msg='"to" argument must be a list or tuple'): + EmailMessage(to="foo@example.com") + with self.assertRaises(TypeError, msg='"cc" argument must be a list or tuple'): + EmailMessage(cc="foo@example.com") + with self.assertRaises(TypeError, msg='"bcc" argument must be a list or tuple'): + EmailMessage(bcc="foo@example.com") + with self.assertRaises(TypeError, msg='"reply_to" argument must be a list or tuple'): + EmailMessage(reply_to="reply_to@example.com") + + def test_space_continuation(self): + """ + Test for space continuation character in long (ASCII) subject headers + """ + email = EmailMessage( + "Long subject lines that get wrapped should contain a space continuation " + "character to get expected behavior in Outlook and Thunderbird", + "Content", + "from@example.com", + ["to@example.com"], + ) + message = email.message() + self.assertEqual( + message["Subject"].encode(), + b"Long subject lines that get wrapped should contain a space continuation\n" + b" character to get expected behavior in Outlook and Thunderbird", + ) + + def test_message_header_overrides(self): + """ + Specifying dates or message-ids in the extra headers overrides the + default values + """ + headers = {"date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + email = EmailMessage( + "subject", + "content", + "from@example.com", + ["to@example.com"], + headers=headers, + ) + + self.assertMessageHasHeaders( + email.message(), + { + ("Content-Transfer-Encoding", "7bit"), + ("Content-Type", 'text/plain; charset="utf-8"'), + ("From", "from@example.com"), + ("MIME-Version", "1.0"), + ("Message-ID", "foo"), + ("Subject", "subject"), + ("To", "to@example.com"), + ("date", "Fri, 09 Nov 2001 01:08:47 -0000"), + }, + ) + + def test_from_header(self): + """ + Make sure we can manually set the From header + """ + email = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + message = email.message() + self.assertEqual(message["From"], "from@example.com") + + def test_to_header(self): + """ + Make sure we can manually set the To header (#17444) + """ + email = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["list-subscriber@example.com", "list-subscriber2@example.com"], + headers={"To": "mailing-list@example.com"}, + ) + message = email.message() + self.assertEqual(message["To"], "mailing-list@example.com") + self.assertEqual(email.to, ["list-subscriber@example.com", "list-subscriber2@example.com"]) + + # If we don't set the To header manually, it should default to the `to` + # argument to the constructor. + email = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["list-subscriber@example.com", "list-subscriber2@example.com"], + ) + message = email.message() + self.assertEqual(message["To"], "list-subscriber@example.com, list-subscriber2@example.com") + self.assertEqual(email.to, ["list-subscriber@example.com", "list-subscriber2@example.com"]) + + def test_to_in_headers_only(self): + message = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + headers={"To": "to@example.com"}, + ).message() + self.assertEqual(message["To"], "to@example.com") + + def test_reply_to_header(self): + """ + Specifying 'Reply-To' in headers should override reply_to. + """ + email = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["to@example.com"], + reply_to=["foo@example.com"], + headers={"Reply-To": "override@example.com"}, + ) + message = email.message() + self.assertEqual(message["Reply-To"], "override@example.com") + + def test_reply_to_in_headers_only(self): + message = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com"], + headers={"Reply-To": "reply_to@example.com"}, + ).message() + self.assertEqual(message["Reply-To"], "reply_to@example.com") + + def test_multiple_message_call(self): + """ + Regression for #13259 - Make sure that headers are not changed when + calling EmailMessage.message() + """ + email = EmailMessage( + "Subject", + "Content", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + message = email.message() + self.assertEqual(message["From"], "from@example.com") + message = email.message() + self.assertEqual(message["From"], "from@example.com") + + def test_unicode_address_header(self): + """ + Regression for #11144 - When a to/from/cc header contains Unicode, + make sure the email addresses are parsed correctly (especially with + regards to commas) + """ + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ['"Firstname Sürname" ', "other@example.com"], + ) + self.assertEqual( + email.message()["To"], + "=?utf-8?q?Firstname_S=C3=BCrname?= , other@example.com", + ) + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ['"Sürname, Firstname" ', "other@example.com"], + ) + self.assertEqual( + email.message()["To"], + "=?utf-8?q?S=C3=BCrname=2C_Firstname?= , other@example.com", + ) + + def test_unicode_headers(self): + email = EmailMessage( + "Gżegżółka", + "Content", + "from@example.com", + ["to@example.com"], + headers={ + "Sender": '"Firstname Sürname" ', + "Comments": "My Sürname is non-ASCII", + }, + ) + message = email.message() + self.assertEqual(message["Subject"], "=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=") + self.assertEqual(message["Sender"], "=?utf-8?q?Firstname_S=C3=BCrname?= ") + self.assertEqual(message["Comments"], "=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=") + + def test_safe_mime_multipart(self): + """ + Make sure headers can be set with a different encoding than utf-8 in + SafeMIMEMultipart as well + """ + headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + from_email, to = "from@example.com", '"Sürname, Firstname" ' + text_content = "This is an important message." + html_content = "

This is an important message.

" + msg = EmailMultiAlternatives( + "Message from Firstname Sürname", + text_content, + from_email, + [to], + headers=headers, + ) + msg.attach_alternative(html_content, "text/html") + msg.encoding = "iso-8859-1" + self.assertEqual( + msg.message()["To"], + "=?iso-8859-1?q?S=FCrname=2C_Firstname?= ", + ) + self.assertEqual( + msg.message()["Subject"], + "=?iso-8859-1?q?Message_from_Firstname_S=FCrname?=", + ) + + def test_safe_mime_multipart_with_attachments(self): + """ + EmailMultiAlternatives includes alternatives if the body is empty and + it has attachments. + """ + msg = EmailMultiAlternatives(body="") + html_content = "

This is html

" + msg.attach_alternative(html_content, "text/html") + msg.attach("example.txt", "Text file content", "text/plain") + self.assertIn(html_content, msg.message().as_string()) + + def test_none_body(self): + msg = EmailMessage("subject", None, "from@example.com", ["to@example.com"]) + self.assertEqual(msg.body, "") + self.assertEqual(msg.message().get_payload(), "") + + def test_encoding(self): + """ + Regression for #12791 - Encode body correctly with other encodings + than utf-8 + """ + email = EmailMessage( + "Subject", + "Firstname Sürname is a great guy.", + "from@example.com", + ["other@example.com"], + ) + email.encoding = "iso-8859-1" + message = email.message() + self.assertMessageHasHeaders( + message, + { + ("MIME-Version", "1.0"), + ("Content-Type", 'text/plain; charset="iso-8859-1"'), + ("Content-Transfer-Encoding", "quoted-printable"), + ("Subject", "Subject"), + ("From", "from@example.com"), + ("To", "other@example.com"), + }, + ) + self.assertEqual(message.get_payload(), "Firstname S=FCrname is a great guy.") + + # MIME attachments works correctly with other encodings than utf-8. + text_content = "Firstname Sürname is a great guy." + html_content = "

Firstname Sürname is a great guy.

" + msg = EmailMultiAlternatives("Subject", text_content, "from@example.com", ["to@example.com"]) + msg.encoding = "iso-8859-1" + msg.attach_alternative(html_content, "text/html") + payload0 = msg.message().get_payload(0) + self.assertMessageHasHeaders( + payload0, + { + ("MIME-Version", "1.0"), + ("Content-Type", 'text/plain; charset="iso-8859-1"'), + ("Content-Transfer-Encoding", "quoted-printable"), + }, + ) + self.assertTrue(payload0.as_bytes().endswith(b"\n\nFirstname S=FCrname is a great guy.")) + payload1 = msg.message().get_payload(1) + self.assertMessageHasHeaders( + payload1, + { + ("MIME-Version", "1.0"), + ("Content-Type", 'text/html; charset="iso-8859-1"'), + ("Content-Transfer-Encoding", "quoted-printable"), + }, + ) + self.assertTrue( + payload1.as_bytes().endswith(b"\n\n

Firstname S=FCrname is a great guy.

") + ) + + def test_attachments(self): + headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + subject, from_email, to = "hello", "from@example.com", "to@example.com" + text_content = "This is an important message." + html_content = "

This is an important message.

" + msg = EmailMultiAlternatives(subject, text_content, from_email, [to], headers=headers) + msg.attach_alternative(html_content, "text/html") + msg.attach("an attachment.pdf", b"%PDF-1.4.%...", mimetype="application/pdf") + msg_bytes = msg.message().as_bytes() + message = message_from_bytes(msg_bytes) + self.assertTrue(message.is_multipart()) + self.assertEqual(message.get_content_type(), "multipart/mixed") + self.assertEqual(message.get_default_type(), "text/plain") + payload = message.get_payload() + self.assertEqual(payload[0].get_content_type(), "multipart/alternative") + self.assertEqual(payload[1].get_content_type(), "application/pdf") + + def test_attachments_two_tuple(self): + msg = EmailMessage(attachments=[("filename1", "content1")]) + filename, content, mimetype = self.get_decoded_attachments(msg)[0] + self.assertEqual(filename, "filename1") + self.assertEqual(content, b"content1") + self.assertEqual(mimetype, "application/octet-stream") + + def test_attachments_MIMEText(self): + txt = MIMEText("content1") + msg = EmailMessage(attachments=[txt]) + payload = msg.message().get_payload() + self.assertEqual(payload[0], txt) + + def test_non_ascii_attachment_filename(self): + headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + subject, from_email, to = "hello", "from@example.com", "to@example.com" + content = "This is the message." + msg = EmailMessage(subject, content, from_email, [to], headers=headers) + # Unicode in file name + msg.attach("une pièce jointe.pdf", b"%PDF-1.4.%...", mimetype="application/pdf") + msg_bytes = msg.message().as_bytes() + message = message_from_bytes(msg_bytes) + payload = message.get_payload() + self.assertEqual(payload[1].get_filename(), "une pièce jointe.pdf") + + def test_attach_file(self): + """ + Test attaching a file against different mimetypes and make sure that + a file will be attached and sent properly even if an invalid mimetype + is specified. + """ + files = ( + # filename, actual mimetype + ("file.txt", "text/plain"), + ("file.png", "image/png"), + ("file_txt", None), + ("file_png", None), + ("file_txt.png", "image/png"), + ("file_png.txt", "text/plain"), + ("file.eml", "message/rfc822"), + ) + test_mimetypes = ["text/plain", "image/png", None] + + for basename, real_mimetype in files: + for mimetype in test_mimetypes: + email = EmailMessage("subject", "body", "from@example.com", ["to@example.com"]) + self.assertEqual(mimetypes.guess_type(basename)[0], real_mimetype) + self.assertEqual(email.attachments, []) + file_path = os.path.join(os.path.dirname(__file__), "attachments", basename) + email.attach_file(file_path, mimetype=mimetype) + self.assertEqual(len(email.attachments), 1) + self.assertIn(basename, email.attachments[0]) + msgs_sent_num = email.send() + self.assertEqual(msgs_sent_num, 1) + + def test_attach_text_as_bytes(self): + msg = EmailMessage("subject", "body", "from@example.com", ["to@example.com"]) + msg.attach("file.txt", b"file content") + sent_num = msg.send() + self.assertEqual(sent_num, 1) + filename, content, mimetype = self.get_decoded_attachments(msg)[0] + self.assertEqual(filename, "file.txt") + self.assertEqual(content, b"file content") + self.assertEqual(mimetype, "text/plain") + + def test_attach_utf8_text_as_bytes(self): + """ + Non-ASCII characters encoded as valid UTF-8 are correctly transported + and decoded. + """ + msg = EmailMessage("subject", "body", "from@example.com", ["to@example.com"]) + msg.attach("file.txt", b"\xc3\xa4") # UTF-8 encoded a umlaut. + filename, content, mimetype = self.get_decoded_attachments(msg)[0] + self.assertEqual(filename, "file.txt") + self.assertEqual(content, b"\xc3\xa4") + self.assertEqual(mimetype, "text/plain") + + def test_attach_non_utf8_text_as_bytes(self): + """ + Binary data that can't be decoded as UTF-8 overrides the MIME type + instead of decoding the data. + """ + msg = EmailMessage("subject", "body", "from@example.com", ["to@example.com"]) + msg.attach("file.txt", b"\xff") # Invalid UTF-8. + filename, content, mimetype = self.get_decoded_attachments(msg)[0] + self.assertEqual(filename, "file.txt") + # Content should be passed through unmodified. + self.assertEqual(content, b"\xff") + self.assertEqual(mimetype, "application/octet-stream") + + def test_attach_mimetext_content_mimetype(self): + email_msg = EmailMessage() + txt = MIMEText("content") + msg = "content and mimetype must not be given when a MIMEBase instance " "is provided." + with self.assertRaises(ValueError, msg=msg): + email_msg.attach(txt, content="content") + with self.assertRaises(ValueError, msg=msg): + email_msg.attach(txt, mimetype="text/plain") + + def test_attach_content_none(self): + email_msg = EmailMessage() + msg = "content must be provided." + with self.assertRaises(ValueError, msg=msg): + email_msg.attach("file.txt", mimetype="application/pdf") + + def test_dont_mangle_from_in_body(self): + """ + Make sure that EmailMessage doesn't mangle + 'From ' in message body. + """ + email = EmailMessage( + "Subject", + "From the future", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + self.assertNotIn(b">From the future", email.message().as_bytes()) + + def test_dont_base64_encode(self): + """Shouldn't use Base64 encoding at all""" + msg = EmailMessage( + "Subject", + "UTF-8 encoded body", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + self.assertIn(b"Content-Transfer-Encoding: 7bit", msg.message().as_bytes()) + + # Shouldn't use quoted printable, should detect it can represent + # content with 7 bit data. + msg = EmailMessage( + "Subject", + "Body with only ASCII characters.", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + s = msg.message().as_bytes() + self.assertIn(b"Content-Transfer-Encoding: 7bit", s) + + # Shouldn't use quoted printable, should detect it can represent + # content with 8 bit data. + msg = EmailMessage( + "Subject", + "Body with latin characters: àáä.", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + s = msg.message().as_bytes() + self.assertIn(b"Content-Transfer-Encoding: 8bit", s) + s = msg.message().as_string() + self.assertIn("Content-Transfer-Encoding: 8bit", s) + + msg = EmailMessage( + "Subject", + "Body with non latin characters: А Б В Г Д Е Ж Ѕ З И І К Л М Н О П.", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + s = msg.message().as_bytes() + self.assertIn(b"Content-Transfer-Encoding: 8bit", s) + s = msg.message().as_string() + self.assertIn("Content-Transfer-Encoding: 8bit", s) + + def test_dont_base64_encode_message_rfc822(self): + """Shouldn't use base64 encoding for a child EmailMessage attachment.""" + # Create a child message first + child_msg = EmailMessage( + "Child Subject", + "Some body of child message", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + child_s = child_msg.message().as_string() + + # Now create a parent + parent_msg = EmailMessage( + "Parent Subject", + "Some parent body", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + + # Attach to parent as a string + parent_msg.attach(content=child_s, mimetype="message/rfc822") + parent_s = parent_msg.message().as_string() + + # The child message header is not base64 encoded + self.assertIn("Child Subject", parent_s) + + # Feature test: try attaching email.Message object directly to the mail. + parent_msg = EmailMessage( + "Parent Subject", + "Some parent body", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + parent_msg.attach(content=child_msg.message(), mimetype="message/rfc822") + parent_s = parent_msg.message().as_string() + + # The child message header is not base64 encoded + self.assertIn("Child Subject", parent_s) + + # Feature test: try attaching EmailMessage object directly to the mail. + parent_msg = EmailMessage( + "Parent Subject", + "Some parent body", + "bounce@example.com", + ["to@example.com"], + headers={"From": "from@example.com"}, + ) + parent_msg.attach(content=child_msg, mimetype="message/rfc822") + parent_s = parent_msg.message().as_string() + + # The child message header is not base64 encoded + self.assertIn("Child Subject", parent_s) + + def test_custom_utf8_encoding(self): + """A UTF-8 charset with a custom body encoding is respected.""" + body = "Body with latin characters: àáä." + msg = EmailMessage("Subject", body, "bounce@example.com", ["to@example.com"]) + encoding = charset.Charset("utf-8") + encoding.body_encoding = charset.QP + msg.encoding = encoding + message = msg.message() + self.assertMessageHasHeaders( + message, + { + ("MIME-Version", "1.0"), + ("Content-Type", 'text/plain; charset="utf-8"'), + ("Content-Transfer-Encoding", "quoted-printable"), + }, + ) + self.assertEqual(message.get_payload(), encoding.body_encode(body)) + + def test_sanitize_address(self): + """Email addresses are properly sanitized.""" + for email_address, encoding, expected_result in ( + # ASCII addresses. + ("to@example.com", "ascii", "to@example.com"), + ("to@example.com", "utf-8", "to@example.com"), + (("A name", "to@example.com"), "ascii", "A name "), + ( + ("A name", "to@example.com"), + "utf-8", + "A name ", + ), + ("localpartonly", "ascii", "localpartonly"), + # ASCII addresses with display names. + ("A name ", "ascii", "A name "), + ("A name ", "utf-8", "A name "), + ('"A name" ', "ascii", "A name "), + ('"A name" ', "utf-8", "A name "), + # Unicode addresses (supported per RFC-6532). + ("tó@example.com", "utf-8", "=?utf-8?b?dMOz?=@example.com"), + ("to@éxample.com", "utf-8", "to@xn--xample-9ua.com"), + ( + ("Tó Example", "tó@example.com"), + "utf-8", + "=?utf-8?q?T=C3=B3_Example?= <=?utf-8?b?dMOz?=@example.com>", + ), + # Unicode addresses with display names. + ( + "Tó Example ", + "utf-8", + "=?utf-8?q?T=C3=B3_Example?= <=?utf-8?b?dMOz?=@example.com>", + ), + ( + "To Example ", + "ascii", + "To Example ", + ), + ( + "To Example ", + "utf-8", + "To Example ", + ), + # Addresses with two @ signs. + ('"to@other.com"@example.com', "utf-8", r'"to@other.com"@example.com'), + ( + '"to@other.com" ', + "utf-8", + '"to@other.com" ', + ), + ( + ("To Example", "to@other.com@example.com"), + "utf-8", + 'To Example <"to@other.com"@example.com>', + ), + # Addresses with long unicode display names. + ( + "Tó Example very long" * 4 + " ", + "utf-8", + "=?utf-8?q?T=C3=B3_Example_very_longT=C3=B3_Example_very_longT" + "=C3=B3_Example_?=\n" + " =?utf-8?q?very_longT=C3=B3_Example_very_long?= " + "", + ), + ( + ("Tó Example very long" * 4, "to@example.com"), + "utf-8", + "=?utf-8?q?T=C3=B3_Example_very_longT=C3=B3_Example_very_longT" + "=C3=B3_Example_?=\n" + " =?utf-8?q?very_longT=C3=B3_Example_very_long?= " + "", + ), + # Address with long display name and unicode domain. + ( + ("To Example very long" * 4, "to@exampl€.com"), + "utf-8", + "To Example very longTo Example very longTo Example very longT" + "o Example very\n" + " long ", + ), + ): + with self.subTest(email_address=email_address, encoding=encoding): + self.assertEqual(sanitize_address(email_address, encoding), expected_result) + + def test_sanitize_address_invalid(self): + for email_address in ( + # Invalid address with two @ signs. + "to@other.com@example.com", + # Invalid address without the quotes. + "to@other.com ", + # Other invalid addresses. + "@", + "to@", + "@example.com", + ("", ""), + ): + with self.subTest(email_address=email_address): + with self.assertRaises(ValueError, msg="Invalid address"): + sanitize_address(email_address, encoding="utf-8") + + def test_sanitize_address_header_injection(self): + msg = "Invalid address; address parts cannot contain newlines." + tests = [ + "Name\nInjection ", + ("Name\nInjection", "to@xample.com"), + "Name ", + ("Name", "to\ninjection@example.com"), + ] + for email_address in tests: + with self.subTest(email_address=email_address): + with self.assertRaises(ValueError, msg=msg): + sanitize_address(email_address, encoding="utf-8") + + def test_email_multi_alternatives_content_mimetype_none(self): + email_msg = EmailMultiAlternatives() + msg = "Both content and mimetype must be provided." + with self.assertRaises(ValueError, msg=msg): + email_msg.attach_alternative(None, "text/html") + with self.assertRaises(ValueError, msg=msg): + email_msg.attach_alternative("

content

", None) + + def test_send_unicode(self): + email = EmailMessage("Chère maman", "Je t'aime très fort", "from@example.com", ["to@example.com"]) + num_sent = email.send() + self.assertEqual(num_sent, 1) + sent_msg = self.get_message() + self.assertEqual(sent_msg['subject'], "=?utf-8?q?Ch=C3=A8re_maman?=") + self.assertEqual(sent_msg.get_payload(decode=True).decode(), "Je t'aime très fort") + + def test_send_long_lines(self): + """ + Email line length is limited to 998 chars by the RFC 5322 Section + 2.1.1. + Message body containing longer lines are converted to Quoted-Printable + to avoid having to insert newlines, which could be hairy to do properly. + """ + # Unencoded body length is < 998 (840) but > 998 when utf-8 encoded. + email = EmailMessage("Subject", "В южных морях " * 60, "from@example.com", ["to@example.com"]) + email.send() + message = self.get_message() + self.assertMessageHasHeaders( + message, + { + ("MIME-Version", "1.0"), + ("Content-Type", 'text/plain; charset="utf-8"'), + ("Content-Transfer-Encoding", "quoted-printable"), + }, + ) + + def test_send_verbose_name(self): + email = EmailMessage( + "Subject", + "Content", + '"Firstname Sürname" ', + ["to@example.com"], + ) + email.send() + message = self.get_message() + self.assertEqual(message["subject"], "Subject") + self.assertEqual(message.get_payload(), "Content") + self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= ") + + def test_plaintext_send_mail(self): + """ + Test send_mail without the html_message + regression test for adding html_message parameter to send_mail() + """ + self.mail.send_mail("Subject", "Content", "sender@example.com", ["nobody@example.com"]) + message = self.get_message() + + self.assertEqual(message.get("subject"), "Subject") + self.assertEqual(message.get_all("to"), ["nobody@example.com"]) + self.assertFalse(message.is_multipart()) + self.assertEqual(message.get_payload(), "Content") + self.assertEqual(message.get_content_type(), "text/plain") + + def test_html_send_mail(self): + """Test html_message argument to send_mail""" + self.mail.send_mail( + "Subject", + "Content", + "sender@example.com", + ["nobody@example.com"], + html_message="HTML Content", + ) + message = self.get_message() + + self.assertEqual(message.get("subject"), "Subject") + self.assertEqual(message.get_all("to"), ["nobody@example.com"]) + self.assertTrue(message.is_multipart()) + self.assertEqual(len(message.get_payload()), 2) + self.assertEqual(message.get_payload(0).get_payload(), "Content") + self.assertEqual(message.get_payload(0).get_content_type(), "text/plain") + self.assertEqual(message.get_payload(1).get_payload(), "HTML Content") + self.assertEqual(message.get_payload(1).get_content_type(), "text/html") + + def test_message_cc_header(self): + email = EmailMessage( + "Subject", + "Content", + "from@example.com", + ["to@example.com"], + cc=["cc@example.com"], + ) + self.mail.get_connection().send_messages([email]) + message = self.get_message() + self.assertMessageHasHeaders( + message, + { + ("MIME-Version", "1.0"), + ("Content-Type", 'text/plain; charset="utf-8"'), + ("Content-Transfer-Encoding", "7bit"), + ("Subject", "Subject"), + ("From", "from@example.com"), + ("To", "to@example.com"), + ("Cc", "cc@example.com"), + }, + ) + self.assertIn("\nDate: ", message.as_string()) + + def test_idn_send(self): + self.assertTrue(self.mail.send_mail("Subject", "Content", "from@öäü.com", ["to@öäü.com"])) + message = self.get_message() + self.assertEqual(message.get("subject"), "Subject") + self.assertEqual(message.get("from"), "from@xn--4ca9at.com") + self.assertEqual(message.get("to"), "to@xn--4ca9at.com") + + self.flush_mailbox() + + m = EmailMessage("Subject", "Content", "from@öäü.com", ["to@öäü.com"], cc=["cc@öäü.com"]) + m.send() + message = self.get_message() + self.assertEqual(message.get("subject"), "Subject") + self.assertEqual(message.get("from"), "from@xn--4ca9at.com") + self.assertEqual(message.get("to"), "to@xn--4ca9at.com") + self.assertEqual(message.get("cc"), "cc@xn--4ca9at.com") + + def test_recipient_without_domain(self): + """ + Regression test for #15042 + """ + self.assertTrue(self.mail.send_mail("Subject", "Content", "tester", ["little bird"])) + message = self.get_message() + self.assertEqual(message.get("subject"), "Subject") + self.assertEqual(message.get("from"), "tester") + self.assertEqual(message.get("to"), "little bird")