Skip to content

Commit

Permalink
Automatically reconnect MQTT broker #1384
Browse files Browse the repository at this point in the history
  • Loading branch information
dennissiemensma committed May 11, 2021
1 parent c8b0d70 commit f89b199
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 32 deletions.
6 changes: 5 additions & 1 deletion docs/reference/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ Current version
v4.16.1 - 2021-05-11
--------------------

.. note::

There was a bug in the previous ``v4.16.0`` release when using MQTT with QoS level 0 (the former default). This should be fixed in the this new release.

- ``Fixed`` MQTT client keeps reconnecting when using QoS level 0 [`#1383 <https://github.com/dsmrreader/dsmr-reader/issues/1383>`_]
- ``Fixed`` Automatically reconnect MQTT broker [`#1384 <https://github.com/dsmrreader/dsmr-reader/issues/1384>`_]


v4.16.0 - 2021-05-10
Expand All @@ -33,7 +38,6 @@ v4.16.0 - 2021-05-10

Previous DSMR-reader versions (or when using QoS level 0) do **not** guarantee this and defaulted to (QoS) level 0, causing you to *possibly* lose MQTT updates when the connection is unstable.


- ``Added`` New ``DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE`` env var for MQTT max queue size [`#1375 <https://github.com/dsmrreader/dsmr-reader/issues/1375>`_]
- ``Added`` New ``DSMRREADER_MQTT_MAX_CACHE_TIMEOUT`` env var for MQTT cache duration [`#1096 <https://github.com/dsmrreader/dsmr-reader/issues/1096>`_]

Expand Down
39 changes: 22 additions & 17 deletions dsmr_mqtt/services/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,43 @@ def run(mqtt_client):
if not message_queue:
return

logger.info('MQTT: Processing %d message(s)', len(message_queue))
logger.info('MQTT: Processing %d message(s) using QoS level %d', len(message_queue), broker_settings.qos)

for current in message_queue:
logger.debug('MQTT: Publishing queued message (#%s) for %s: %s', current.pk, current.topic, current.payload)
result = mqtt_client.publish(
message_info = mqtt_client.publish(
topic=current.topic,
payload=current.payload,
qos=broker_settings.qos,
retain=True
)

# Do NOT remove this. It is both required for networking when having QoS > 1 and mqtt_client.is_connected()
# below. Omitting this loop will have the client think it's disconnected.
mqtt_client.loop(0.5)

# Does nothing when using QoS 0 (as designed). For QoS 1 and 2 however, this blocks further processing and
# message deletion below, until the broker acknowledges the message received.
logger.debug('MQTT: Waiting for message (#%s) to be marked published', current.pk)
while not result.is_published():
mqtt_client.loop(0.1)
# Make sure to call this, since message_info.is_published() will ALWAYS be True when using QoS level 0!
loop_result = mqtt_client.loop(settings.DSMRREADER_CLIENT_TIMEOUT)

# Detect any networking errors early.
if loop_result != paho.MQTT_ERR_SUCCESS:
signal_reconnect()
raise RuntimeError('MQTT: Client loop() failed, requesting restart...')
print('message_info.is_published()', message_info.is_published())

# Always True when using QoS 0 (as designed). For QoS 1 and 2 however, this BLOCKS further processing and
# message deletion below, until the broker acknowledges the message was received.
# Networking errors should terminate this loop as well, along with a request for restart.
while not message_info.is_published():
logger.debug('MQTT: Waiting for message (#%s) to be marked published by broker', current.pk)
loop_result = mqtt_client.loop(settings.DSMRREADER_CLIENT_TIMEOUT)

# Prevents infinite loop on connection errors.
if loop_result != paho.MQTT_ERR_SUCCESS:
signal_reconnect()
raise RuntimeError('MQTT: Client loop() failed, requesting restart to prevent waiting forever...')

logger.debug('MQTT: Deleting published message (#%s) from queue', current.pk)
current.delete()

# Delete any overflow in messages.
queue.Message.objects.all().delete()

# We cannot raise any exception in callbacks, this is our check point. This MUST be called AFTER the first loop().
if not mqtt_client.is_connected():
signal_reconnect()
raise RuntimeError('MQTT: Client no longer connected')


def signal_reconnect():
backend_restart_required.send_robust(None)
Expand Down
6 changes: 5 additions & 1 deletion dsmr_mqtt/services/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ def queue_message(topic, payload):
"""

if queue.Message.objects.all().count() >= settings.DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE:
return logger.warning('MQTT: Rejecting message for topic due to maximum queue size: %s', topic)
return logger.warning(
'MQTT: Rejecting message for topic due to maximum queue size (%d): %s',
settings.DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE,
topic
)

cache_storage = caches['mqtt']
cache_key = topic
Expand Down
41 changes: 28 additions & 13 deletions dsmr_mqtt/tests/services/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,19 @@ def test_on_connect(self):
dsmr_mqtt.services.broker.on_connect(client, None, None, rc=-1)

@mock.patch('paho.mqtt.client.Client.loop')
@mock.patch('paho.mqtt.client.Client.is_connected')
@mock.patch('paho.mqtt.client.Client.publish')
def test_run_no_data(self, publish_mock, is_connected_mock, loop_mock):
is_connected_mock.return_value = True
def test_run_no_data(self, publish_mock, loop_mock):
loop_mock.return_value = paho.MQTT_ERR_SUCCESS
client = paho.Client('xxx')

self.assertFalse(loop_mock.called)
dsmr_mqtt.services.broker.run(mqtt_client=client)
self.assertFalse(loop_mock.called)

@mock.patch('paho.mqtt.client.Client.loop')
@mock.patch('paho.mqtt.client.Client.is_connected')
@mock.patch('paho.mqtt.client.Client.publish')
def test_run(self, publish_mock, is_connected_mock, loop_mock):
is_connected_mock.return_value = True
def test_run(self, publish_mock, loop_mock):
loop_mock.return_value = paho.MQTT_ERR_SUCCESS
client = paho.Client('xxx')

msginfo_mock = mock.MagicMock()
Expand All @@ -150,11 +148,11 @@ def test_run(self, publish_mock, is_connected_mock, loop_mock):
self.assertTrue(publish_mock.called)

@mock.patch('paho.mqtt.client.Client.loop')
@mock.patch('paho.mqtt.client.Client.is_connected')
@mock.patch('paho.mqtt.client.Client.publish')
def test_run_cleanup(self, publish_mock, is_connected_mock, *mocks):
def test_run_cleanup(self, publish_mock, loop_mock):
""" Test whether any excess of messages is cleared. """
is_connected_mock.return_value = True
loop_mock.return_value = paho.MQTT_ERR_SUCCESS

client = paho.Client('xxx')
MAX = settings.DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE

Expand All @@ -173,12 +171,29 @@ def test_run_cleanup(self, publish_mock, is_connected_mock, *mocks):

self.assertFalse(queue.Message.objects.exists())

@mock.patch('paho.mqtt.client.Client.publish')
@mock.patch('paho.mqtt.client.Client.loop')
def test_run_disconnected(self, loop_mock, *mocks):
""" Check whether we exit the command when we're disconnected at some point. For all QoS levels. """
loop_mock.return_value = paho.MQTT_ERR_CONN_LOST # Connection failure.

client = paho.Client('xxx')
Message.objects.create(topic='x', payload='y')

with self.assertRaises(RuntimeError):
dsmr_mqtt.services.broker.run(mqtt_client=client)

@mock.patch('paho.mqtt.client.Client.publish')
@mock.patch('paho.mqtt.client.Client.is_connected')
def test_run_disconnected(self, is_connected_mock, *mocks):
""" Check whether we exit the command when we're disconnected at some point. """
is_connected_mock.return_value = False # Connection failure.
@mock.patch('paho.mqtt.client.Client.loop')
def test_run_disconnected_qos1_or_qos2(self, loop_mock, publish_mock):
""" Similar to test_run_disconnected(), but testing the internal loop() check for QoS 1/2. """
# Ok first, then fail. Additional OK, for coverage.
loop_mock.side_effect = [paho.MQTT_ERR_SUCCESS, paho.MQTT_ERR_SUCCESS, paho.MQTT_ERR_CONN_LOST]

message_info_mock = mock.MagicMock()
message_info_mock.is_published.return_value = False
publish_mock.return_value = message_info_mock

client = paho.Client('xxx')
Message.objects.create(topic='x', payload='y')

Expand Down

0 comments on commit f89b199

Please sign in to comment.