diff --git a/src/iotswarm/devices.py b/src/iotswarm/devices.py index a580e73..c6b1fc6 100644 --- a/src/iotswarm/devices.py +++ b/src/iotswarm/devices.py @@ -85,6 +85,25 @@ def mqtt_topic(self, value): self._mqtt_topic = value self.mqtt_base_topic = value + _no_send_probability = 0 + + @property + def no_send_probability(self) -> int: + """Defines the chance of data not being sent, can be 0 - 100""" + return self._no_send_probability + + @no_send_probability.setter + def no_send_probability(self, probability: int) -> None: + """Setter for the no_send_probability attribute""" + + if not isinstance(probability, int): + probability = round(probability) + + if probability < 0 or probability > 100: + raise ValueError(f"`probability` must be between 0 - 100 inclusive, not '{probability}'") + + self._no_send_probability = probability + def __eq__(self, obj) -> bool: base_equality = ( @@ -97,6 +116,7 @@ def __eq__(self, obj) -> bool: and self._instance_logger == obj._instance_logger and self.data_source == obj.data_source and self.connection == obj.connection + and self.no_send_probability == obj.no_send_probability ) table_equality = True @@ -123,6 +143,7 @@ def __init__( mqtt_topic: str | None = None, mqtt_prefix: str | None = None, mqtt_suffix: str | None = None, + no_send_probability: int = 0 ) -> None: """Initializer @@ -137,6 +158,7 @@ def __init__( table: A valid table from the database mqtt_prefix: Prefixes the MQTT topic if MQTT messaging used. mqtt_suffix: Suffixes the MQTT topic if MQTT messaging used. + no_send_probability: Defines the probability of the device not sending a message. """ self.device_id = str(device_id) @@ -206,6 +228,8 @@ def __init__( f"{self.__class__.__name__}-{self.device_id}" ) + self.no_send_probability = no_send_probability + self._instance_logger.info(f"Initialised Site: {repr(self)}") def __repr__(self): @@ -249,6 +273,12 @@ def __repr__(self): if hasattr(self, "mqtt_suffix") else "" ) + + no_send_probability_arg = ( + f", no_send_probability={self.no_send_probability}" + if self.no_send_probability != self.__class__._no_send_probability + else "" + ) return ( f"{self.__class__.__name__}(" f'"{self.device_id}"' @@ -261,6 +291,7 @@ def __repr__(self): f"{mqtt_topic_arg}" f"{mqtt_prefix_arg}" f"{mqtt_suffix_arg}" + f"{no_send_probability_arg}" f")" ) @@ -300,7 +331,9 @@ async def run(self): payload = await self._get_payload() - if payload is not None: + if self._skip_send(): + self._instance_logger.debug(f"Skipped send based on probability: {self.no_send_probability}") + elif payload is not None: payload = self._format_payload(payload) self._instance_logger.debug("Requesting payload submission.") @@ -344,6 +377,14 @@ def _format_payload(self, payload): def _attach_swarm(self, swarm: object): self.swarm = swarm + + def _skip_send(self) -> bool: + """Checks if the sending should be skipped + + Returns: True or false based on the no_send_probability + """ + + return random.random() * 100 < self.no_send_probability class CR1000XDevice(BaseDevice): diff --git a/src/iotswarm/scripts/cli.py b/src/iotswarm/scripts/cli.py index 9cd2009..87c507c 100644 --- a/src/iotswarm/scripts/cli.py +++ b/src/iotswarm/scripts/cli.py @@ -169,6 +169,7 @@ def mqtt( mqtt_suffix, dry, device_type, + no_send_probability, ): """Sends The cosmos data via MQTT protocol using IoT Core. Data is from the cosmos database TABLE and sent using CLIENT_ID. @@ -217,6 +218,7 @@ async def _mqtt(): mqtt_prefix=mqtt_prefix, mqtt_suffix=mqtt_suffix, inherit_logger=ctx.obj["logger"], + no_send_probability=no_send_probability ) for site in sites ] @@ -284,6 +286,7 @@ def mqtt( dry, device_type, resume_session, + no_send_probability, ): """Sends The cosmos data via MQTT protocol using IoT Core. Data is collected from the db using QUERY and sent using CLIENT_ID. @@ -314,6 +317,8 @@ async def _mqtt_resume_session(): swarm.devices[i].mqtt_prefix = mqtt_prefix if mqtt_suffix is not None: swarm.devices[i].mqtt_suffix = mqtt_suffix + if no_send_probability is not None: + swarm.devices[i].no_send_probability = no_send_probability click.echo("Loaded swarm from pickle") @@ -354,6 +359,7 @@ async def _mqtt_clean_session(): mqtt_prefix=mqtt_prefix, mqtt_suffix=mqtt_suffix, inherit_logger=ctx.obj["logger"], + no_send_probability=no_send_probability, ) for site in sites ] @@ -440,6 +446,7 @@ def mqtt( dry, device_type, resume_session, + no_send_probability, ): """Sends The cosmos data via MQTT protocol using IoT Core. Data is collected from the db using QUERY and sent using CLIENT_ID. @@ -472,6 +479,8 @@ async def _mqtt_resume_session(): swarm.devices[i].mqtt_prefix = mqtt_prefix if mqtt_suffix is not None: swarm.devices[i].mqtt_suffix = mqtt_suffix + if no_send_probability is not None: + swarm.devices[i].no_send_probability = no_send_probability click.echo(swarm.devices[0].cycle) click.echo("Loaded swarm from pickle") @@ -514,6 +523,7 @@ async def _mqtt_clean_session(): mqtt_suffix=mqtt_suffix, table=table, inherit_logger=ctx.obj["logger"], + no_send_probability=no_send_probability ) for site in sites ] diff --git a/src/iotswarm/scripts/common.py b/src/iotswarm/scripts/common.py index 1b8b1e0..408279f 100644 --- a/src/iotswarm/scripts/common.py +++ b/src/iotswarm/scripts/common.py @@ -39,6 +39,12 @@ def device_options(function): "--device-type", type=click.Choice(["basic", "cr1000x"]), default="basic" )(function) + click.option( + "--no-send-probability", + type=click.IntRange(0, 100), + help="Probability of not sending a message, can be 0 - 100 where 0 is no skip and 100 is always skip", + )(function) + return function diff --git a/tests/test_devices.py b/tests/test_devices.py index 26c6508..7104560 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -91,12 +91,12 @@ def test_connection_type_check(self, conn): @parameterized.expand( [ - [0, 0, None, (0, 0, False)], - [100, 30, True, (100, 30, True)], - [12.4, 30001.9, None, (12, 30001, False)], + [0, 0, None, 5, (0, 0, False)], + [100, 30, True, 9, (100, 30, True)], + [12.4, 30001.9, None, 100, (12, 30001, False)], ] ) - def test_optional_args_set(self, max_cycles, sleep_time, delay_start, expected): + def test_optional_args_set(self, max_cycles, sleep_time, delay_start,no_send_probability, expected): inst = BaseDevice( "TEST_ID", self.data_source, @@ -104,6 +104,7 @@ def test_optional_args_set(self, max_cycles, sleep_time, delay_start, expected): max_cycles=max_cycles, sleep_time=sleep_time, delay_start=delay_start, + no_send_probability=no_send_probability ) self.assertEqual(inst.max_cycles, expected[0]) @@ -166,6 +167,11 @@ def test_delay_start_value_check(self, delay_start): {"max_cycles": 4, "sleep_time": 5, "delay_start": True}, 'BaseDevice("TEST_ID", MockDB(), MockMessageConnection(), sleep_time=5, max_cycles=4, delay_start=True)', ], + [ + MockDB(), + {"max_cycles": 4, "sleep_time": 5, "delay_start": True, "no_send_probability":10}, + 'BaseDevice("TEST_ID", MockDB(), MockMessageConnection(), sleep_time=5, max_cycles=4, delay_start=True, no_send_probability=10)', + ], ] ) def test__repr__(self, data_source, kwargs, expected): @@ -266,6 +272,40 @@ def test__repr__mqtt_opts_mqtt_connection(self, topic, prefix, suffix,expected_a self.assertEqual(inst.__repr__(), expected) +class TestProbabilitySend(unittest.TestCase): + def setUp(self): + self.data_source = MockDB() + self.connection = MockMessageConnection() + + @parameterized.expand([0, 10, 20, 50, 81, 100]) + def test_probability_send(self, probability): + device = BaseDevice("ID", self.data_source, self.connection, no_send_probability=probability) + + skipped = 0 + + for i in range(10000): + if device._skip_send(): + skipped += 1 + + self.assertAlmostEqual(skipped/100, probability, delta=1) + + def test_probability_zero_if_not_given(self): + device = BaseDevice("ID", self.data_source, self.connection) + + self.assertEqual(device.no_send_probability, 0) + + @parameterized.expand(["Four", None]) + def test_probability_bad_values(self, value): + + with self.assertRaises((TypeError, ValueError)): + device = BaseDevice("ID", self.data_source, self.connection, no_send_probability=value) + + @parameterized.expand([[2,2], [0,0], [27.34, 27], [99.5, 100]]) + def test_probability_set(self, value, expected): + device = BaseDevice("ID", self.data_source, self.connection, no_send_probability=value) + + self.assertEqual(device.no_send_probability, expected) + class TestBaseDeviceOracleUsed(unittest.IsolatedAsyncioTestCase):