From 40116ccd47d13e3fcfbc8e6c968b29178ec674b9 Mon Sep 17 00:00:00 2001 From: simonkagwi Date: Thu, 14 Nov 2024 23:09:48 +0300 Subject: [PATCH 1/2] Send high priority transactional messages before bulk messages (#25) Co-authored-by: Tobias McNulty Co-authored-by: Colin Copeland --- src/smpp_gateway/admin.py | 2 + src/smpp_gateway/client.py | 4 + .../management/commands/smpp_client.py | 10 ++ ...tmessage_mt_message_status_idx_and_more.py | 42 +++++++ src/smpp_gateway/models.py | 14 ++- src/smpp_gateway/outgoing.py | 1 + src/smpp_gateway/queries.py | 8 +- src/smpp_gateway/router.py | 42 +++++++ src/smpp_gateway/smpp.py | 3 + tests/factories.py | 10 +- tests/test_client.py | 103 +++++++++++++++++- tests/test_queries.py | 15 +++ tests/test_router.py | 72 ++++++++++++ 13 files changed, 320 insertions(+), 6 deletions(-) create mode 100644 src/smpp_gateway/migrations/0008_remove_mtmessage_mt_message_status_idx_and_more.py create mode 100644 src/smpp_gateway/router.py create mode 100644 tests/test_router.py diff --git a/src/smpp_gateway/admin.py b/src/smpp_gateway/admin.py index c5cac71..0149ace 100644 --- a/src/smpp_gateway/admin.py +++ b/src/smpp_gateway/admin.py @@ -59,11 +59,13 @@ class MTMessageAdmin(admin.ModelAdmin): list_display = ( "short_message", "backend", + "priority_flag", "status", "create_time", ) list_filter = ( "status", + "priority_flag", "backend", MTMessageCommandStatusListFilter, ) diff --git a/src/smpp_gateway/client.py b/src/smpp_gateway/client.py index 8dc3e2a..09467e6 100644 --- a/src/smpp_gateway/client.py +++ b/src/smpp_gateway/client.py @@ -69,6 +69,7 @@ def __init__( backend: Backend, hc_worker: HealthchecksIoWorker, submit_sm_params: dict, + set_priority_flag: bool, mt_messages_per_second: int, *args, **kwargs, @@ -78,6 +79,7 @@ def __init__( self.backend = backend self.hc_worker = hc_worker self.submit_sm_params = submit_sm_params + self.set_priority_flag = set_priority_flag self.mt_messages_per_second = mt_messages_per_second super().__init__(*args, **kwargs) self._pg_conn = pg_listen(self.backend.name) @@ -182,6 +184,8 @@ def send_mt_messages(self): submit_sm_resps = [] for sms in smses: params = {**self.submit_sm_params, **sms["params"]} + if self.set_priority_flag and sms["priority_flag"] is not None: + params["priority_flag"] = sms["priority_flag"] pdus = self.split_and_send_message(sms["short_message"], **params) # Create placeholder MTMessageStatus objects in the DB, which # the message_sent handler will later update with the actual command_status diff --git a/src/smpp_gateway/management/commands/smpp_client.py b/src/smpp_gateway/management/commands/smpp_client.py index e484f69..be422bd 100644 --- a/src/smpp_gateway/management/commands/smpp_client.py +++ b/src/smpp_gateway/management/commands/smpp_client.py @@ -1,3 +1,4 @@ +import argparse import os from django.core.management.base import BaseCommand @@ -85,6 +86,15 @@ def add_arguments(self, parser): help="Pings healthchecks.io with the specified ping key and check slug. " "If set, --hc-ping-key must also be set.", ) + parser.add_argument( + "--set-priority-flag", + action=argparse.BooleanOptionalAction, + default=False, + help="Whether to set the `priority_flag` param in the PDU, if one " + "is provided for a message. If a priority_flag is included in " + "--submit-sm-params, the priority_flag set on the individual " + "message will take precedence.", + ) def handle(self, *args, **options): start_smpp_client(options) diff --git a/src/smpp_gateway/migrations/0008_remove_mtmessage_mt_message_status_idx_and_more.py b/src/smpp_gateway/migrations/0008_remove_mtmessage_mt_message_status_idx_and_more.py new file mode 100644 index 0000000..4a56ed3 --- /dev/null +++ b/src/smpp_gateway/migrations/0008_remove_mtmessage_mt_message_status_idx_and_more.py @@ -0,0 +1,42 @@ +# Generated by Django 4.2.16 on 2024-11-14 09:35 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("smpp_gateway", "0007_momessage_error_alter_momessage_status"), + ] + + operations = [ + migrations.RemoveIndex( + model_name="mtmessage", + name="mt_message_status_idx", + ), + migrations.AddField( + model_name="mtmessage", + name="priority_flag", + field=models.IntegerField( + choices=[ + (0, "Level 0 (lowest) priority"), + (1, "Level 1 priority"), + (2, "Level 2 priority"), + (3, "Level 3 (highest) priority"), + ], + null=True, + verbose_name="priority flag", + ), + ), + migrations.AddIndex( + model_name="mtmessage", + index=models.Index( + models.F("status"), + models.OrderBy( + models.F("priority_flag"), descending=True, nulls_last=True + ), + condition=models.Q(("status", "new")), + name="mt_message_status_idx", + ), + ), + ] diff --git a/src/smpp_gateway/models.py b/src/smpp_gateway/models.py index c82a0c2..3be49e9 100644 --- a/src/smpp_gateway/models.py +++ b/src/smpp_gateway/models.py @@ -84,6 +84,14 @@ class Status(models.TextChoices): DELIVERED = "delivered", _("Delivered") ERROR = "error", _("Error") + class PriorityFlag(models.IntegerChoices): + # Based on the priority_flag values in the SMPP Spec + # https://smpp.org/SMPP_v3_4_Issue1_2.pdf + LEVEL_0 = 0, _("Level 0 (lowest) priority") + LEVEL_1 = 1, _("Level 1 priority") + LEVEL_2 = 2, _("Level 2 priority") + LEVEL_3 = 3, _("Level 3 (highest) priority") + backend = models.ForeignKey( Backend, on_delete=models.PROTECT, verbose_name=_("backend") ) @@ -91,6 +99,9 @@ class Status(models.TextChoices): short_message = models.TextField(_("short message")) params = models.JSONField(_("params")) status = models.CharField(_("status"), max_length=32, choices=Status.choices) + priority_flag = models.IntegerField( + _("priority flag"), choices=PriorityFlag.choices, null=True + ) def save(self, *args, **kwargs): super().save(*args, **kwargs) @@ -106,7 +117,8 @@ class Meta: indexes = ( models.Index( # Allow for quick filtering of messages that need to be processed - fields=["status"], + "status", + models.F("priority_flag").desc(nulls_last=True), name="mt_message_status_idx", condition=models.Q(status="new"), # No way to access Status.NEW here? ), diff --git a/src/smpp_gateway/outgoing.py b/src/smpp_gateway/outgoing.py index b448974..12878f9 100644 --- a/src/smpp_gateway/outgoing.py +++ b/src/smpp_gateway/outgoing.py @@ -37,6 +37,7 @@ def prepare_request(self, id_, text, identities, context): "short_message": text, "params": params, "status": MTMessage.Status.NEW, + "priority_flag": context.get("priority_flag"), } def send(self, id_, text, identities, context=None): diff --git a/src/smpp_gateway/queries.py b/src/smpp_gateway/queries.py index a580328..cb78448 100644 --- a/src/smpp_gateway/queries.py +++ b/src/smpp_gateway/queries.py @@ -5,7 +5,7 @@ import psycopg2.extensions from django.db import connection, transaction -from django.db.models import QuerySet +from django.db.models import F, QuerySet from rapidsms.models import Backend from smpp_gateway.models import MOMessage, MTMessage @@ -40,13 +40,15 @@ def pg_notify(channel: str): def get_mt_messages_to_send(limit: int, backend: Backend) -> list[dict[str, Any]]: """Fetches up to `limit` messages intended for `backend`, updates their - status to SENDING, and returns select fields from the model. + status to SENDING, and returns select fields from the model. The messages + are sorted by descending `priority_flag`. """ with transaction.atomic(): smses = list( MTMessage.objects.filter(status=MTMessage.Status.NEW, backend=backend) .select_for_update(skip_locked=True) - .values("id", "short_message", "params")[:limit] + .order_by(F("priority_flag").desc(nulls_last=True)) + .values("id", "short_message", "params", "priority_flag")[:limit] ) if smses: pks = [sms["id"] for sms in smses] diff --git a/src/smpp_gateway/router.py b/src/smpp_gateway/router.py new file mode 100644 index 0000000..8628bd5 --- /dev/null +++ b/src/smpp_gateway/router.py @@ -0,0 +1,42 @@ +from rapidsms.messages.incoming import IncomingMessage +from rapidsms.messages.outgoing import OutgoingMessage +from rapidsms.router.blocking import BlockingRouter + +from smpp_gateway.models import MTMessage + + +class PriorityIncomingMessage(IncomingMessage): + default_priority_flag = MTMessage.PriorityFlag.LEVEL_2 + + def respond(self, text, **kwargs): + fields = kwargs.get("fields", {}) + if "priority_flag" not in fields: + fields["priority_flag"] = self.default_priority_flag.value + kwargs["fields"] = fields + return super().respond(text, **kwargs) + + +class PriorityOutgoingMessage(OutgoingMessage): + default_priority_flag = MTMessage.PriorityFlag.LEVEL_1 + + def extra_backend_context(self): + context = super().extra_backend_context() + context["priority_flag"] = self.fields.get( + "priority_flag", self.default_priority_flag.value + ) + return context + + +class PriorityBlockingRouter(BlockingRouter): + incoming_message_class = PriorityIncomingMessage + outgoing_message_class = PriorityOutgoingMessage + + def new_incoming_message(self, text, connections, class_=None, **kwargs): + if class_ is None: + class_ = self.incoming_message_class + return super().new_incoming_message(text, connections, class_=class_, **kwargs) + + def new_outgoing_message(self, text, connections, class_=None, **kwargs): + if class_ is None: + class_ = self.outgoing_message_class + return super().new_outgoing_message(text, connections, class_=class_, **kwargs) diff --git a/src/smpp_gateway/smpp.py b/src/smpp_gateway/smpp.py index 44ff4e2..1c1d401 100644 --- a/src/smpp_gateway/smpp.py +++ b/src/smpp_gateway/smpp.py @@ -18,6 +18,7 @@ def get_smpplib_client( notify_mo_channel: str, backend: Backend, submit_sm_params: dict, + set_priority_flag: bool, mt_messages_per_second: int, hc_check_uuid: str, hc_ping_key: str, @@ -35,6 +36,7 @@ def get_smpplib_client( backend, hc_worker, submit_sm_params, + set_priority_flag, mt_messages_per_second, host, port, @@ -69,6 +71,7 @@ def start_smpp_client(options): options["notify_mo_channel"], backend, json.loads(options["submit_sm_params"]), + options["set_priority_flag"], options["mt_messages_per_second"], options["hc_check_uuid"], options["hc_ping_key"], diff --git a/tests/factories.py b/tests/factories.py index 45b0acd..c09fe8f 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -4,7 +4,7 @@ from django.utils.timezone import now from factory.django import DjangoModelFactory from faker import Faker -from rapidsms.models import Backend +from rapidsms.models import Backend, Connection from smpp_gateway.models import MOMessage, MTMessage, MTMessageStatus @@ -59,3 +59,11 @@ class Meta: command_status = smpplib.consts.SMPP_ESME_ROK message_id = "" delivery_report = b"" + + +class ConnectionFactory(DjangoModelFactory): + class Meta: + model = Connection + + backend = factory.SubFactory(BackendFactory) + identity = factory.Faker("word") diff --git a/tests/test_client.py b/tests/test_client.py index 280ce09..3331ac5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,3 +1,5 @@ +from unittest import mock + import pytest from smpplib import consts as smpplib_consts @@ -5,7 +7,7 @@ from smpp_gateway.models import MOMessage, MTMessage from smpp_gateway.queries import pg_listen -from smpp_gateway.smpp import get_smpplib_client +from smpp_gateway.smpp import PgSmppClient, get_smpplib_client from tests.factories import BackendFactory, MTMessageFactory, MTMessageStatusFactory @@ -22,6 +24,7 @@ def test_received_mo_message(self): "notify_mo_channel", backend, {}, # submit_sm_params + False, # set_priority_flag 20, # mt_messages_per_second "", # hc_check_uuid "", # hc_ping_key @@ -58,6 +61,7 @@ def test_received_message_receipt(self): "notify_mo_channel", backend, {}, # submit_sm_params + False, # set_priority_flag 20, # mt_messages_per_second "", # hc_check_uuid "", # hc_ping_key @@ -100,6 +104,7 @@ def test_received_null_short_message(self): "notify_mo_channel", backend, {}, # submit_sm_params + False, # set_priority_flag 20, # mt_messages_per_second "", # hc_check_uuid "", # hc_ping_key @@ -138,6 +143,7 @@ def test_message_sent_handler(): "notify_mo_channel", backend, {}, # submit_sm_params + False, # set_priority_flag 20, # mt_messages_per_second "", # hc_check_uuid "", # hc_ping_key @@ -157,3 +163,98 @@ def test_message_sent_handler(): assert outbound_msg_status.command_status == smpplib_consts.SMPP_ESME_RSUBMITFAIL assert outbound_msg_status.message_id == "qwerty" + + +@pytest.mark.django_db(transaction=True) +@mock.patch.object(PgSmppClient, "send_message", return_value=mock.Mock(sequence=1)) +class TestSetPriorityFlag: + def get_client_and_message( + self, + submit_sm_params=None, + set_priority_flag=True, + message_priority_flag=MTMessage.PriorityFlag.LEVEL_1, + ): + backend = BackendFactory() + client = get_smpplib_client( + "127.0.0.1", + 8000, + "notify_mo_channel", + backend, + submit_sm_params or {}, + set_priority_flag, + 20, # mt_messages_per_second + "", # hc_check_uuid + "", # hc_ping_key + "", # hc_check_slug + ) + message = MTMessageFactory( + status=MTMessage.Status.NEW, + backend=backend, + priority_flag=message_priority_flag, + ) + return client, message + + def test_set_priority_flag_is_true(self, mock_send_message): + """If set_priority_flag is True and the priority_flag is set on a MTMessage + object, the priority_flag param should be set in the PDU. + """ + client, message = self.get_client_and_message() + client.receive_pg_notify() + + mock_send_message.assert_called_once() + assert ( + mock_send_message.call_args.kwargs["priority_flag"] == message.priority_flag + ) + + def test_set_priority_flag_is_true_and_priority_in_submit_sm_params( + self, mock_send_message + ): + """If set_priority_flag is True and the priority_flag is set on a MTMessage + object and also in the submit_sm_params dictionary, the priority_flag from + the message object should take precendence. + """ + client, message = self.get_client_and_message( + {"priority_flag": MTMessage.PriorityFlag.LEVEL_0} + ) + client.receive_pg_notify() + + mock_send_message.assert_called_once() + assert ( + mock_send_message.call_args.kwargs["priority_flag"] == message.priority_flag + ) + + def test_set_priority_flag_is_true_but_priority_not_set(self, mock_send_message): + """If set_priority_flag is True and but the priority_flag is not set on a + MTMessage object, the priority_flag param should NOT be set in the PDU. + """ + client = self.get_client_and_message(message_priority_flag=None)[0] + client.receive_pg_notify() + + mock_send_message.assert_called_once() + assert "priority_flag" not in mock_send_message.call_args.kwargs + + def test_set_priority_flag_is_false(self, mock_send_message): + """If set_priority_flag is False and the priority_flag is set on a + MTMessage object, the priority_flag param should NOT be set in the PDU. + """ + client = self.get_client_and_message(set_priority_flag=False)[0] + client.receive_pg_notify() + + mock_send_message.assert_called_once() + assert "priority_flag" not in mock_send_message.call_args.kwargs + + def test_set_priority_flag_is_false_but_priority_in_submit_sm_params( + self, mock_send_message + ): + """If set_priority_flag is False and but a priority_flag was set in the + submit_sm_params dictionary, the priority_flag from submit_sm_params + should still be set in the PDU. + """ + priority = MTMessage.PriorityFlag.LEVEL_0 + client, message = self.get_client_and_message( + {"priority_flag": priority}, False + ) + client.receive_pg_notify() + + mock_send_message.assert_called_once() + assert mock_send_message.call_args.kwargs["priority_flag"] == priority diff --git a/tests/test_queries.py b/tests/test_queries.py index 3d5222e..b5439fc 100644 --- a/tests/test_queries.py +++ b/tests/test_queries.py @@ -84,6 +84,21 @@ def test_updates_status_to_sending(self): else: assert message.status == MTMessage.Status.NEW + def test_messages_sorted_by_priority_flag(self): + """Tests that messages are sorted by descending priority_flag, and + messages with null priority_flag come last. + """ + backend = BackendFactory() + MTMessageFactory(backend=backend, priority_flag=MTMessage.PriorityFlag.LEVEL_2) + MTMessageFactory(backend=backend, priority_flag=MTMessage.PriorityFlag.LEVEL_0) + MTMessageFactory(backend=backend, priority_flag=None) + MTMessageFactory(backend=backend, priority_flag=MTMessage.PriorityFlag.LEVEL_1) + MTMessageFactory(backend=backend, priority_flag=MTMessage.PriorityFlag.LEVEL_3) + + messages = get_mt_messages_to_send(10, backend) + + assert [3, 2, 1, 0, None] == [i["priority_flag"] for i in messages] + @pytest.mark.django_db class TestGetMessagesToProcess: diff --git a/tests/test_router.py b/tests/test_router.py new file mode 100644 index 0000000..432f81e --- /dev/null +++ b/tests/test_router.py @@ -0,0 +1,72 @@ +from django.test import TestCase +from django.test.utils import override_settings + +from smpp_gateway.router import PriorityBlockingRouter + +from .factories import ConnectionFactory + + +@override_settings( + INSTALLED_BACKENDS={ + "smppsim": { + "ENGINE": "smpp_gateway.outgoing.SMPPGatewayBackend", + } + } +) +class PriorityBlockingRouterTest(TestCase): + def setUp(self): + self.router = PriorityBlockingRouter(apps=[], backends={}) + self.connection = ConnectionFactory() + + def test_new_incoming_message(self): + """The new_incoming_message method should return a PriorityIncomingMessage + object by default. + """ + msg = self.router.new_incoming_message( + text="foo", connections=[self.connection] + ) + self.assertIsInstance(msg, PriorityBlockingRouter.incoming_message_class) + + def test_incoming_message_respond_sets_priority_flag(self): + """Calling respond() on a PriorityIncomingMessage object should add + a priority_flag in the fields dict, if one is not already set. + """ + msg = self.router.new_incoming_message( + text="foo", connections=[self.connection] + ) + + # Set priority_flag to the default value of 2 if not already set + result = msg.respond("response") + self.assertEqual( + result["fields"]["priority_flag"], msg.default_priority_flag.value + ) + + # Do not change the priority_flag if it's already set + result = msg.respond("response", fields={"priority_flag": 1}) + self.assertEqual(result["fields"]["priority_flag"], 1) + + def test_new_outgoing_message(self): + """The new_outgoing_message method should return a PriorityOutgoingMessage + object by default. + """ + msg = self.router.new_outgoing_message( + text="foo", connections=[self.connection] + ) + self.assertIsInstance(msg, PriorityBlockingRouter.outgoing_message_class) + + def test_outgoing_message_extra_backend_context_has_priority_flag(self): + """The new_outgoing_message method should return a PriorityOutgoingMessage + object by default. + """ + msg = self.router.new_outgoing_message( + text="foo", connections=[self.connection], fields={"priority_flag": 2} + ) + context = msg.extra_backend_context() + self.assertEqual(context["priority_flag"], 2) + + # priority_flag should default to 1 if not set in the message's fields + msg = self.router.new_outgoing_message( + text="foo", connections=[self.connection] + ) + context = msg.extra_backend_context() + self.assertEqual(context["priority_flag"], msg.default_priority_flag.value) From f82f05c8b4baa84b9cf6031cbba696c196afc7fa Mon Sep 17 00:00:00 2001 From: Tobias McNulty Date: Thu, 14 Nov 2024 15:19:03 -0500 Subject: [PATCH 2/2] Add changelog and bump version for 1.4.0 Co-authored-by: Colin Copeland Co-authored-by: Simon Kagwi --- CHANGES.md | 5 +++++ setup.py | 2 +- src/smpp_gateway/__init__.py | 2 +- tests/test_smpp_gateway.py | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0200141..76c5a45 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## 1.4.0 (November 14, 2024) + +- Send high priority transactional messages before bulk messages (#25) +- Allow MT message send rate to be configured (#22) + ## 1.3.0 (November 5, 2024) - Handle message decoding more safely (#20) diff --git a/setup.py b/setup.py index c3d515e..51458e7 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ def read_file(filename): if __name__ == "__main__": setuptools.setup( name="smpp_gateway", - version="1.3.0", + version="1.4.0", license="MIT", install_requires=[ "RapidSMS>=2.0", diff --git a/src/smpp_gateway/__init__.py b/src/smpp_gateway/__init__.py index 3dc1f76..3e8d9f9 100644 --- a/src/smpp_gateway/__init__.py +++ b/src/smpp_gateway/__init__.py @@ -1 +1 @@ -__version__ = "0.1.0" +__version__ = "1.4.0" diff --git a/tests/test_smpp_gateway.py b/tests/test_smpp_gateway.py index 96dc460..3a6325f 100644 --- a/tests/test_smpp_gateway.py +++ b/tests/test_smpp_gateway.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.1.0" + assert __version__ == "1.4.0"