Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send high priority, transactional messages before bulk messages #25

Merged
merged 11 commits into from
Nov 14, 2024
2 changes: 2 additions & 0 deletions src/smpp_gateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ class MTMessageAdmin(admin.ModelAdmin):
list_display = (
"short_message",
"backend",
"priority_flag",
"status",
"create_time",
)
list_filter = (
"status",
"priority_flag",
"backend",
MTMessageCommandStatusListFilter,
)
Expand Down
4 changes: 4 additions & 0 deletions src/smpp_gateway/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
backend: Backend,
hc_worker: HealthchecksIoWorker,
submit_sm_params: dict,
set_priority_flag: bool,
mt_messages_per_second: int,
*args,
**kwargs,
Expand All @@ -78,6 +79,7 @@ def __init__(
self.backend = backend
self.hc_worker = hc_worker
self.submit_sm_params = submit_sm_params
self.set_priority_flag = set_priority_flag
self.mt_messages_per_second = mt_messages_per_second
super().__init__(*args, **kwargs)
self._pg_conn = pg_listen(self.backend.name)
Expand Down Expand Up @@ -182,6 +184,8 @@ def send_mt_messages(self):
submit_sm_resps = []
for sms in smses:
params = {**self.submit_sm_params, **sms["params"]}
if self.set_priority_flag and sms["priority_flag"] is not None:
params["priority_flag"] = sms["priority_flag"]
pdus = self.split_and_send_message(sms["short_message"], **params)
# Create placeholder MTMessageStatus objects in the DB, which
# the message_sent handler will later update with the actual command_status
Expand Down
10 changes: 10 additions & 0 deletions src/smpp_gateway/management/commands/smpp_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import os

from django.core.management.base import BaseCommand
Expand Down Expand Up @@ -85,6 +86,15 @@ def add_arguments(self, parser):
help="Pings healthchecks.io with the specified ping key and check slug. "
"If set, --hc-ping-key must also be set.",
)
parser.add_argument(
"--set-priority-flag",
action=argparse.BooleanOptionalAction,
default=False,
help="Whether to set the `priority_flag` param in the PDU, if one "
"is provided for a message. If a priority_flag is included in "
"--submit-sm-params, the priority_flag set on the individual "
"message will take precedence.",
)

def handle(self, *args, **options):
start_smpp_client(options)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Generated by Django 4.2.16 on 2024-11-14 09:35

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("smpp_gateway", "0007_momessage_error_alter_momessage_status"),
]

operations = [
migrations.RemoveIndex(
model_name="mtmessage",
name="mt_message_status_idx",
),
migrations.AddField(
model_name="mtmessage",
name="priority_flag",
field=models.IntegerField(
choices=[
(0, "Level 0 (lowest) priority"),
(1, "Level 1 priority"),
(2, "Level 2 priority"),
(3, "Level 3 (highest) priority"),
],
null=True,
verbose_name="priority flag",
),
),
migrations.AddIndex(
model_name="mtmessage",
index=models.Index(
models.F("status"),
models.OrderBy(
models.F("priority_flag"), descending=True, nulls_last=True
),
condition=models.Q(("status", "new")),
name="mt_message_status_idx",
),
),
]
14 changes: 13 additions & 1 deletion src/smpp_gateway/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,24 @@ class Status(models.TextChoices):
DELIVERED = "delivered", _("Delivered")
ERROR = "error", _("Error")

class PriorityFlag(models.IntegerChoices):
# Based on the priority_flag values in the SMPP Spec
# https://smpp.org/SMPP_v3_4_Issue1_2.pdf
LEVEL_0 = 0, _("Level 0 (lowest) priority")
LEVEL_1 = 1, _("Level 1 priority")
LEVEL_2 = 2, _("Level 2 priority")
LEVEL_3 = 3, _("Level 3 (highest) priority")

backend = models.ForeignKey(
Backend, on_delete=models.PROTECT, verbose_name=_("backend")
)
# SMPP client will decide how to encode it
short_message = models.TextField(_("short message"))
params = models.JSONField(_("params"))
status = models.CharField(_("status"), max_length=32, choices=Status.choices)
priority_flag = models.IntegerField(
_("priority flag"), choices=PriorityFlag.choices, null=True
)

def save(self, *args, **kwargs):
super().save(*args, **kwargs)
Expand All @@ -106,7 +117,8 @@ class Meta:
indexes = (
models.Index(
# Allow for quick filtering of messages that need to be processed
fields=["status"],
"status",
models.F("priority_flag").desc(nulls_last=True),
name="mt_message_status_idx",
condition=models.Q(status="new"), # No way to access Status.NEW here?
),
Expand Down
1 change: 1 addition & 0 deletions src/smpp_gateway/outgoing.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def prepare_request(self, id_, text, identities, context):
"short_message": text,
"params": params,
"status": MTMessage.Status.NEW,
"priority_flag": context.get("priority_flag"),
}

def send(self, id_, text, identities, context=None):
Expand Down
8 changes: 5 additions & 3 deletions src/smpp_gateway/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import psycopg2.extensions

from django.db import connection, transaction
from django.db.models import QuerySet
from django.db.models import F, QuerySet
from rapidsms.models import Backend

from smpp_gateway.models import MOMessage, MTMessage
Expand Down Expand Up @@ -40,13 +40,15 @@ def pg_notify(channel: str):

def get_mt_messages_to_send(limit: int, backend: Backend) -> list[dict[str, Any]]:
"""Fetches up to `limit` messages intended for `backend`, updates their
status to SENDING, and returns select fields from the model.
status to SENDING, and returns select fields from the model. The messages
are sorted by descending `priority_flag`.
"""
with transaction.atomic():
smses = list(
MTMessage.objects.filter(status=MTMessage.Status.NEW, backend=backend)
.select_for_update(skip_locked=True)
.values("id", "short_message", "params")[:limit]
.order_by(F("priority_flag").desc(nulls_last=True))
.values("id", "short_message", "params", "priority_flag")[:limit]
)
if smses:
pks = [sms["id"] for sms in smses]
Expand Down
42 changes: 42 additions & 0 deletions src/smpp_gateway/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from rapidsms.messages.incoming import IncomingMessage
from rapidsms.messages.outgoing import OutgoingMessage
from rapidsms.router.blocking import BlockingRouter

from smpp_gateway.models import MTMessage


class PriorityIncomingMessage(IncomingMessage):
default_priority_flag = MTMessage.PriorityFlag.LEVEL_2

def respond(self, text, **kwargs):
fields = kwargs.get("fields", {})
if "priority_flag" not in fields:
fields["priority_flag"] = self.default_priority_flag.value
kwargs["fields"] = fields
return super().respond(text, **kwargs)


class PriorityOutgoingMessage(OutgoingMessage):
default_priority_flag = MTMessage.PriorityFlag.LEVEL_1

def extra_backend_context(self):
context = super().extra_backend_context()
tobiasmcnulty marked this conversation as resolved.
Show resolved Hide resolved
context["priority_flag"] = self.fields.get(
"priority_flag", self.default_priority_flag.value
)
return context


class PriorityBlockingRouter(BlockingRouter):
tobiasmcnulty marked this conversation as resolved.
Show resolved Hide resolved
incoming_message_class = PriorityIncomingMessage
outgoing_message_class = PriorityOutgoingMessage

def new_incoming_message(self, text, connections, class_=None, **kwargs):
if class_ is None:
class_ = self.incoming_message_class
return super().new_incoming_message(text, connections, class_=class_, **kwargs)

def new_outgoing_message(self, text, connections, class_=None, **kwargs):
if class_ is None:
class_ = self.outgoing_message_class
return super().new_outgoing_message(text, connections, class_=class_, **kwargs)
3 changes: 3 additions & 0 deletions src/smpp_gateway/smpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def get_smpplib_client(
notify_mo_channel: str,
backend: Backend,
submit_sm_params: dict,
set_priority_flag: bool,
mt_messages_per_second: int,
hc_check_uuid: str,
hc_ping_key: str,
Expand All @@ -35,6 +36,7 @@ def get_smpplib_client(
backend,
hc_worker,
submit_sm_params,
set_priority_flag,
mt_messages_per_second,
host,
port,
Expand Down Expand Up @@ -69,6 +71,7 @@ def start_smpp_client(options):
options["notify_mo_channel"],
backend,
json.loads(options["submit_sm_params"]),
options["set_priority_flag"],
options["mt_messages_per_second"],
options["hc_check_uuid"],
options["hc_ping_key"],
Expand Down
10 changes: 9 additions & 1 deletion tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.utils.timezone import now
from factory.django import DjangoModelFactory
from faker import Faker
from rapidsms.models import Backend
from rapidsms.models import Backend, Connection

from smpp_gateway.models import MOMessage, MTMessage, MTMessageStatus

Expand Down Expand Up @@ -59,3 +59,11 @@ class Meta:
command_status = smpplib.consts.SMPP_ESME_ROK
message_id = ""
delivery_report = b""


class ConnectionFactory(DjangoModelFactory):
class Meta:
model = Connection

backend = factory.SubFactory(BackendFactory)
identity = factory.Faker("word")
Loading
Loading