Skip to content

Commit

Permalink
Added functionality to skip sending messages based on probability (#8)
Browse files Browse the repository at this point in the history
* NEW: Probability based message failure added. Off by default
  • Loading branch information
lewis-chambers authored Sep 3, 2024
1 parent 9931e44 commit a4727b0
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 5 deletions.
43 changes: 42 additions & 1 deletion src/iotswarm/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ def mqtt_topic(self, value):
self._mqtt_topic = value
self.mqtt_base_topic = value

_no_send_probability = 0

@property
def no_send_probability(self) -> int:
"""Defines the chance of data not being sent, can be 0 - 100"""
return self._no_send_probability

@no_send_probability.setter
def no_send_probability(self, probability: int) -> None:
"""Setter for the no_send_probability attribute"""

if not isinstance(probability, int):
probability = round(probability)

if probability < 0 or probability > 100:
raise ValueError(f"`probability` must be between 0 - 100 inclusive, not '{probability}'")

self._no_send_probability = probability

def __eq__(self, obj) -> bool:

base_equality = (
Expand All @@ -97,6 +116,7 @@ def __eq__(self, obj) -> bool:
and self._instance_logger == obj._instance_logger
and self.data_source == obj.data_source
and self.connection == obj.connection
and self.no_send_probability == obj.no_send_probability
)

table_equality = True
Expand All @@ -123,6 +143,7 @@ def __init__(
mqtt_topic: str | None = None,
mqtt_prefix: str | None = None,
mqtt_suffix: str | None = None,
no_send_probability: int = 0
) -> None:
"""Initializer
Expand All @@ -137,6 +158,7 @@ def __init__(
table: A valid table from the database
mqtt_prefix: Prefixes the MQTT topic if MQTT messaging used.
mqtt_suffix: Suffixes the MQTT topic if MQTT messaging used.
no_send_probability: Defines the probability of the device not sending a message.
"""

self.device_id = str(device_id)
Expand Down Expand Up @@ -206,6 +228,8 @@ def __init__(
f"{self.__class__.__name__}-{self.device_id}"
)

self.no_send_probability = no_send_probability

self._instance_logger.info(f"Initialised Site: {repr(self)}")

def __repr__(self):
Expand Down Expand Up @@ -249,6 +273,12 @@ def __repr__(self):
if hasattr(self, "mqtt_suffix")
else ""
)

no_send_probability_arg = (
f", no_send_probability={self.no_send_probability}"
if self.no_send_probability != self.__class__._no_send_probability
else ""
)
return (
f"{self.__class__.__name__}("
f'"{self.device_id}"'
Expand All @@ -261,6 +291,7 @@ def __repr__(self):
f"{mqtt_topic_arg}"
f"{mqtt_prefix_arg}"
f"{mqtt_suffix_arg}"
f"{no_send_probability_arg}"
f")"
)

Expand Down Expand Up @@ -300,7 +331,9 @@ async def run(self):

payload = await self._get_payload()

if payload is not None:
if self._skip_send():
self._instance_logger.debug(f"Skipped send based on probability: {self.no_send_probability}")
elif payload is not None:
payload = self._format_payload(payload)

self._instance_logger.debug("Requesting payload submission.")
Expand Down Expand Up @@ -344,6 +377,14 @@ def _format_payload(self, payload):

def _attach_swarm(self, swarm: object):
self.swarm = swarm

def _skip_send(self) -> bool:
"""Checks if the sending should be skipped
Returns: True or false based on the no_send_probability
"""

return random.random() * 100 < self.no_send_probability


class CR1000XDevice(BaseDevice):
Expand Down
10 changes: 10 additions & 0 deletions src/iotswarm/scripts/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def mqtt(
mqtt_suffix,
dry,
device_type,
no_send_probability,
):
"""Sends The cosmos data via MQTT protocol using IoT Core.
Data is from the cosmos database TABLE and sent using CLIENT_ID.
Expand Down Expand Up @@ -217,6 +218,7 @@ async def _mqtt():
mqtt_prefix=mqtt_prefix,
mqtt_suffix=mqtt_suffix,
inherit_logger=ctx.obj["logger"],
no_send_probability=no_send_probability
)
for site in sites
]
Expand Down Expand Up @@ -284,6 +286,7 @@ def mqtt(
dry,
device_type,
resume_session,
no_send_probability,
):
"""Sends The cosmos data via MQTT protocol using IoT Core.
Data is collected from the db using QUERY and sent using CLIENT_ID.
Expand Down Expand Up @@ -314,6 +317,8 @@ async def _mqtt_resume_session():
swarm.devices[i].mqtt_prefix = mqtt_prefix
if mqtt_suffix is not None:
swarm.devices[i].mqtt_suffix = mqtt_suffix
if no_send_probability is not None:
swarm.devices[i].no_send_probability = no_send_probability

click.echo("Loaded swarm from pickle")

Expand Down Expand Up @@ -354,6 +359,7 @@ async def _mqtt_clean_session():
mqtt_prefix=mqtt_prefix,
mqtt_suffix=mqtt_suffix,
inherit_logger=ctx.obj["logger"],
no_send_probability=no_send_probability,
)
for site in sites
]
Expand Down Expand Up @@ -440,6 +446,7 @@ def mqtt(
dry,
device_type,
resume_session,
no_send_probability,
):
"""Sends The cosmos data via MQTT protocol using IoT Core.
Data is collected from the db using QUERY and sent using CLIENT_ID.
Expand Down Expand Up @@ -472,6 +479,8 @@ async def _mqtt_resume_session():
swarm.devices[i].mqtt_prefix = mqtt_prefix
if mqtt_suffix is not None:
swarm.devices[i].mqtt_suffix = mqtt_suffix
if no_send_probability is not None:
swarm.devices[i].no_send_probability = no_send_probability

click.echo(swarm.devices[0].cycle)
click.echo("Loaded swarm from pickle")
Expand Down Expand Up @@ -514,6 +523,7 @@ async def _mqtt_clean_session():
mqtt_suffix=mqtt_suffix,
table=table,
inherit_logger=ctx.obj["logger"],
no_send_probability=no_send_probability
)
for site in sites
]
Expand Down
6 changes: 6 additions & 0 deletions src/iotswarm/scripts/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ def device_options(function):
"--device-type", type=click.Choice(["basic", "cr1000x"]), default="basic"
)(function)

click.option(
"--no-send-probability",
type=click.IntRange(0, 100),
help="Probability of not sending a message, can be 0 - 100 where 0 is no skip and 100 is always skip",
)(function)

return function


Expand Down
48 changes: 44 additions & 4 deletions tests/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,20 @@ def test_connection_type_check(self, conn):

@parameterized.expand(
[
[0, 0, None, (0, 0, False)],
[100, 30, True, (100, 30, True)],
[12.4, 30001.9, None, (12, 30001, False)],
[0, 0, None, 5, (0, 0, False)],
[100, 30, True, 9, (100, 30, True)],
[12.4, 30001.9, None, 100, (12, 30001, False)],
]
)
def test_optional_args_set(self, max_cycles, sleep_time, delay_start, expected):
def test_optional_args_set(self, max_cycles, sleep_time, delay_start,no_send_probability, expected):
inst = BaseDevice(
"TEST_ID",
self.data_source,
self.connection,
max_cycles=max_cycles,
sleep_time=sleep_time,
delay_start=delay_start,
no_send_probability=no_send_probability
)

self.assertEqual(inst.max_cycles, expected[0])
Expand Down Expand Up @@ -166,6 +167,11 @@ def test_delay_start_value_check(self, delay_start):
{"max_cycles": 4, "sleep_time": 5, "delay_start": True},
'BaseDevice("TEST_ID", MockDB(), MockMessageConnection(), sleep_time=5, max_cycles=4, delay_start=True)',
],
[
MockDB(),
{"max_cycles": 4, "sleep_time": 5, "delay_start": True, "no_send_probability":10},
'BaseDevice("TEST_ID", MockDB(), MockMessageConnection(), sleep_time=5, max_cycles=4, delay_start=True, no_send_probability=10)',
],
]
)
def test__repr__(self, data_source, kwargs, expected):
Expand Down Expand Up @@ -266,6 +272,40 @@ def test__repr__mqtt_opts_mqtt_connection(self, topic, prefix, suffix,expected_a

self.assertEqual(inst.__repr__(), expected)

class TestProbabilitySend(unittest.TestCase):
def setUp(self):
self.data_source = MockDB()
self.connection = MockMessageConnection()

@parameterized.expand([0, 10, 20, 50, 81, 100])
def test_probability_send(self, probability):
device = BaseDevice("ID", self.data_source, self.connection, no_send_probability=probability)

skipped = 0

for i in range(10000):
if device._skip_send():
skipped += 1

self.assertAlmostEqual(skipped/100, probability, delta=1)

def test_probability_zero_if_not_given(self):
device = BaseDevice("ID", self.data_source, self.connection)

self.assertEqual(device.no_send_probability, 0)

@parameterized.expand(["Four", None])
def test_probability_bad_values(self, value):

with self.assertRaises((TypeError, ValueError)):
device = BaseDevice("ID", self.data_source, self.connection, no_send_probability=value)

@parameterized.expand([[2,2], [0,0], [27.34, 27], [99.5, 100]])
def test_probability_set(self, value, expected):
device = BaseDevice("ID", self.data_source, self.connection, no_send_probability=value)

self.assertEqual(device.no_send_probability, expected)


class TestBaseDeviceOracleUsed(unittest.IsolatedAsyncioTestCase):

Expand Down

0 comments on commit a4727b0

Please sign in to comment.