diff --git a/connectivity/src/drivers/sds011.py b/connectivity/src/drivers/sds011.py index 22f3352..b1dbf90 100644 --- a/connectivity/src/drivers/sds011.py +++ b/connectivity/src/drivers/sds011.py @@ -7,8 +7,8 @@ import serial -from ...constants import MOBILE_GPS -from ..sensors import SensorSDS011 +from connectivity.constants import MOBILE_GPS +from connectivity.src.sensors import SensorSDS011 def sds011_codec(data: bytes, pk: str) -> dict: diff --git a/connectivity/src/feeders/datalog_feeder.py b/connectivity/src/feeders/datalog_feeder.py index 482aa4e..80ae425 100644 --- a/connectivity/src/feeders/datalog_feeder.py +++ b/connectivity/src/feeders/datalog_feeder.py @@ -3,28 +3,28 @@ Every `dump_interval` (from the config) the buffer writes to the file which pins to IPFS. IPFS hash of the file sends to Robonomics Datalog. """ -from crustinterface import Mainnet import json import logging.config import os import threading import time import typing as tp -from tempfile import NamedTemporaryFile -import ipfshttpclient2 -import requests -from pinatapy import PinataPy from prometheus_client import Enum from robonomicsinterface import RWS, Account, Datalog from connectivity.config.logging import LOGGING_CONFIG from connectivity.utils.datalog_db import DatalogDB from connectivity.utils.ipfs_db import IPFSDB -from ...constants import MOBILE_GPS +from connectivity.utils.datalog_payload import ( + create_payload, + sort_payload, + create_tmp_file, +) +from connectivity.constants import PING_MODEL -from ...constants import PING_MODEL from .ifeeder import IFeeder +from .pinning_services import PinningManager logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger("sensors-connectivity") @@ -45,123 +45,6 @@ ) -def _sort_payload(data: dict) -> dict: - """Sort measurements dict with timestamp. - - :param data: Measurements dict. - :return: Sorted measurements dict. - """ - - ordered = {} - for k, v in data.items(): - meas = sorted(v["measurements"], key=lambda x: x["timestamp"]) - if v.get("geo"): - ordered[k] = {"model": v["model"], "geo": v["geo"], "donated_by": v["donated_by"], "measurements": meas} - else: - ordered[k] = {"model": v["model"], "donated_by": v["donated_by"], "measurements": meas} - return ordered - - -def _get_multihash(buf: set, db: object, endpoint: str = "/ip4/127.0.0.1/tcp/5001/http") -> tuple: - """Write sorted measurements to the temp file, add file to IPFS and add - measurements and hash in the database with 'not sent' status. - - :param buf: Set of measurements from all sensors. - :param db: Database class object. - :param endpoint: Endpoint for IPFS node. Default is local. - :return: IPFS hash of the file and path to the temp file. 
- """ - - payload = {} - for m in buf: - try: - if m.public in payload: - payload[m.public]["measurements"].append(m.measurement) - else: - if m.model == MOBILE_GPS: - payload[m.public] = { - "model": m.model, - "donated_by": m.donated_by, - "measurements": [m.measurement], - } - else: - payload[m.public] = { - "model": m.model, - "geo": "{},{}".format(m.geo_lat, m.geo_lon), - "donated_by": m.donated_by, - "measurements": [m.measurement], - } - except Exception as e: - logger.warning(f"Datalog Feeder: Couldn't store data: {e}") - - - logger.debug(f"Payload before sorting: {payload}") - payload = _sort_payload(payload) - logger.debug(f"Payload sorted: {payload}") - try: - temp = NamedTemporaryFile(mode="w", delete=False) - logger.debug(f"Created temp file: {temp.name}") - temp.write(json.dumps(payload)) - temp.close() - DATALOG_MEMORY_METRIC.state("success") - except Exception as e: - DATALOG_MEMORY_METRIC.state("error") - - with ipfshttpclient2.connect(endpoint) as client: - response = client.add(temp.name) - db.add_data("not sent", response["Hash"], time.time(), json.dumps(payload)) - return (response["Hash"], temp.name, response["Size"]) - - -def _pin_to_pinata(file_path: str, config: dict) -> None: - """Pin file to Pinata for for better accessibility. - Need to provide pinata credentials in the config file. - - :param file_path: Path to the temp file. - :param config: Configuration dictionary. - """ - - pinata_api = config["datalog"]["pinata_api"] - pinata_secret = config["datalog"]["pinata_secret"] - if pinata_secret: - try: - logger.info("DatalogFeeder: Pinning file to Pinata") - pinata = PinataPy(pinata_api, pinata_secret) - pinata.pin_file_to_ipfs(path_to_file=file_path, save_absolute_paths=False) - hash = pinata.pin_list()["rows"][0]["ipfs_pin_hash"] - logger.info(f"DatalogFeeder: File sent to pinata. Hash is {hash}") - except Exception as e: - logger.warning(f"DatalogFeeder: Failed while pining file to Pinata. Error: {e}") - - -def _upload_to_crust(hash: str, file_size: int, seed: str) -> None: - mainnet = Mainnet(seed=seed) - try: - # Check balance - balance = mainnet.get_balance() - logger.debug(f"DatalogFeeder: Actual balance in crust network - {balance}") - - # Check price in Main net. Price in pCRUs - price = mainnet.get_appx_store_price(file_size) - logger.debug(f"DatalogFeeder: Approximate cost to store the file - {price}") - - except Exception as e: - logger.warning(f"DatalogFeeder: Error while getting account balance - {e}") - return None - - if price >= balance: - logger.warning(f"DatalogFeeder: Not enough account balance to store the file in Crust Network") - return None - - try: - logger.info(f"DatalogFeeder: Start adding {hash} to crust with size {file_size}") - file_stored = mainnet.store_file(hash, file_size) - logger.info(f"DatalogFeeder: File stored in Crust. 
Extrinsic data is {file_stored}") - except Exception as e: - logger.warning(f"error while uploading file to crust - {e}") - return None - - class DatalogFeeder(IFeeder): """ The feeder is responsible for collecting measurements and @@ -180,15 +63,13 @@ def __init__(self, config) -> None: self.last_time: float = time.time() self.buffer: set = set() self.interval: int = self.config["datalog"]["dump_interval"] - self.ipfs_endpoint: str = ( - config["robonomics"]["ipfs_provider"] - if config["robonomics"]["ipfs_provider"] - else "/ip4/127.0.0.1/tcp/5001/http" + self.datalog_db: DatalogDB = DatalogDB( + self.config["general"]["datalog_db_path"] ) - self.datalog_db: DatalogDB = DatalogDB(self.config["general"]["datalog_db_path"]) self.ipfs_db: IPFSDB = IPFSDB(self.config["general"]["ipfs_db_path"]) self.datalog_db.create_table() self.ipfs_db.create_table() + self.pinning_manager = PinningManager(self.config) def feed(self, data: tp.List[dict]) -> None: """Main function of the feeder and it is called in `main.py`. It collects @@ -199,7 +80,7 @@ def feed(self, data: tp.List[dict]) -> None: """ if self.config["datalog"]["enable"]: if data: - for d in data: + for d in data: if d.public and d.model != PING_MODEL: logger.debug(f"DatalogFeeder: Adding data to buffer: {d}") self.buffer.add(d) @@ -207,46 +88,33 @@ def feed(self, data: tp.List[dict]) -> None: if (time.time() - self.last_time) >= self.interval: if self.buffer: self.last_time = time.time() - logger.debug("Datalog Feeder: About to publish collected data...") + logger.debug( + "Datalog Feeder: About to publish collected data..." + ) logger.debug(f"Datalog Feeder: Buffer is {self.buffer}") - ipfs_hash, file_path, file_size = _get_multihash(self.buffer, self.datalog_db, self.ipfs_endpoint) - self.ipfs_db.add_hash(ipfs_hash) - self._pin_to_temporal(file_path) - _pin_to_pinata(file_path, self.config) + payload = create_payload(self.buffer) + sorted_payload = sort_payload(payload) + try: + tmp_file_path = create_tmp_file(sorted_payload) + DATALOG_MEMORY_METRIC.state("success") + except Exception as e: + DATALOG_MEMORY_METRIC.state("error") + logger.warning( + f"Datalog Feeder: couldn't create tmp file: {e}" + ) + + ipfs_hash = self.pinning_manager.pin_to_gateways(tmp_file_path) + self.datalog_db.add_data( + "not sent", ipfs_hash, time.time(), json.dumps(payload) + ) self.buffer = set() - _upload_to_crust(ipfs_hash, int(file_size), self.config["datalog"]["suri"]) - os.unlink(file_path) + os.unlink(tmp_file_path) self.to_datalog(ipfs_hash) else: logger.info("Datalog Feeder:Nothing to publish") else: logger.info("Datalog Feeder: Still collecting measurements...") - def _pin_to_temporal(self, file_path: str) -> None: - """Pin file to Temporal Cloud for for better accessibility. - Need to provide corresponding credentials in the config file. - - :param file_path: Path to the temp file. 
- """ - - username = self.config["datalog"]["temporal_username"] - password = self.config["datalog"]["temporal_password"] - if username and password: - auth_url = "https://api.temporal.cloud/v2/auth/login" - token_resp = requests.post(auth_url, json={"username": username, "password": password}) - token = token_resp.json() - - url_add = "https://api.temporal.cloud/v2/ipfs/public/file/add" - headers = {"Authorization": f"Bearer {token['token']}"} - resp = requests.post( - url_add, - files={"file": open(file_path), "hold_time": (None, 1)}, - headers=headers, - ) - - if resp.status_code == 200: - logger.info("Datalog Feeder: File pinned to Temporal Cloud") - def to_datalog(self, ipfs_hash: str) -> None: """Send IPFS hash to Robonomics Datalog. It uses seed pharse from the config file. It can be sent either with RWS or general Datalog. To use RWS the account of the provided seed @@ -275,5 +143,7 @@ def to_datalog(self, ipfs_hash: str) -> None: DATALOG_STATUS_METRIC.state("success") self.datalog_db.update_status("sent", ipfs_hash) except Exception as e: - logger.warning(f"Datalog Feeder: Something went wrong during extrinsic submission to Robonomics: {e}") + logger.warning( + f"Datalog Feeder: Something went wrong during extrinsic submission to Robonomics: {e}" + ) DATALOG_STATUS_METRIC.state("error") diff --git a/connectivity/src/feeders/pinning_services/__init__.py b/connectivity/src/feeders/pinning_services/__init__.py new file mode 100644 index 0000000..9313984 --- /dev/null +++ b/connectivity/src/feeders/pinning_services/__init__.py @@ -0,0 +1 @@ +from .pinning_manager import PinningManager \ No newline at end of file diff --git a/connectivity/src/feeders/pinning_services/gateways/__init__.py b/connectivity/src/feeders/pinning_services/gateways/__init__.py new file mode 100644 index 0000000..6ab3983 --- /dev/null +++ b/connectivity/src/feeders/pinning_services/gateways/__init__.py @@ -0,0 +1,5 @@ +from .crust import CrustGateway +from .local import LocalGateway +from .pinata import PinataGateway +from .temporal import TemporalGateway +from .pinning_gateway import PinArgs \ No newline at end of file diff --git a/connectivity/src/feeders/pinning_services/gateways/crust.py b/connectivity/src/feeders/pinning_services/gateways/crust.py new file mode 100644 index 0000000..60b5bda --- /dev/null +++ b/connectivity/src/feeders/pinning_services/gateways/crust.py @@ -0,0 +1,61 @@ +import typing as tp +import logging.config +from crustinterface import Mainnet + +from connectivity.config.logging import LOGGING_CONFIG +from .pinning_gateway import ( + PinningGateway, + PinArgs, +) + +logging.config.dictConfig(LOGGING_CONFIG) +logger = logging.getLogger("sensors-connectivity") + + +class CrustGateway(PinningGateway): + def __init__(self, seed: str) -> None: + self.mainnet = Mainnet(seed=seed) + + + def pin(self, args: PinArgs) -> None: + file_hash: str = args.hash + file_size: int = args.file_size + if self._can_upload(file_size): + try: + logger.info( + f"CrustGateway: Start adding {file_hash} to crust with size :{file_size}" + ) + file_stored = self.mainnet.store_file(file_hash, file_size) + logger.info( + f"CrustGateway: File stored in Crust. 
Extrinsic data is: {file_stored}"
+                )
+            except Exception as e:
+                logger.warning(
+                    f"CrustGateway: error while uploading file to crust: {e}"
+                )
+                return None
+        else:
+            logger.warning(
+                f"CrustGateway: Not enough account balance to store the file in Crust Network"
+            )
+
+    def _can_upload(self, file_size: int) -> bool:
+        """Check whether there are enough tokens on the balance"""
+        balance = self._get_balance()
+        approximately_price = self._get_store_price(file_size)
+        return balance >= approximately_price
+
+    def _get_balance(self) -> tp.Optional[int]:
+        try:
+            balance = self.mainnet.get_balance()
+            logger.debug(f"CrustGateway: Actual balance in crust network: {balance}")
+            return balance
+        except Exception as e:
+            logger.warning(f"CrustGateway: Error while getting account balance: {e}")
+            return None
+
+    def _get_store_price(self, file_size: int) -> tp.Optional[int]:
+        """Check price in Main net. Price in pCRUs"""
+        price = self.mainnet.get_appx_store_price(int(file_size))
+        logger.debug(f"CrustGateway: Approximate cost to store the file: {price}")
+        return price
diff --git a/connectivity/src/feeders/pinning_services/gateways/local.py b/connectivity/src/feeders/pinning_services/gateways/local.py
new file mode 100644
index 0000000..48be2b4
--- /dev/null
+++ b/connectivity/src/feeders/pinning_services/gateways/local.py
@@ -0,0 +1,31 @@
+import typing as tp
+import logging.config
+import ipfshttpclient2
+
+from connectivity.config.logging import LOGGING_CONFIG
+from .pinning_gateway import (
+    PinningGateway,
+    PinArgs,
+)
+
+logging.config.dictConfig(LOGGING_CONFIG)
+logger = logging.getLogger("sensors-connectivity")
+
+
+class LocalGateway(PinningGateway):
+    def __init__(self, endpoint: str) -> None:
+        self.ipfs_endpoint = endpoint
+
+    def pin(self, args: PinArgs) -> tp.Optional[tp.Tuple[str, int]]:
+        file_path: str = args.file_path
+        try:
+            with ipfshttpclient2.connect(self.ipfs_endpoint) as client:
+                response = client.add(file_path)
+                file_hash = response["Hash"]
+                file_size = response["Size"]
+                logger.debug(f"LocalGateway: Hash, size: {file_hash}, {file_size}")
+                return (file_hash, file_size)
+        except Exception as e:
+            logger.warning(
+                f"LocalGateway: couldn't add file or connect to local gateway: {e}"
+            )
diff --git a/connectivity/src/feeders/pinning_services/gateways/pinata.py b/connectivity/src/feeders/pinning_services/gateways/pinata.py
new file mode 100644
index 0000000..507080a
--- /dev/null
+++ b/connectivity/src/feeders/pinning_services/gateways/pinata.py
@@ -0,0 +1,33 @@
+import typing as tp
+import logging.config
+from pinatapy import PinataPy
+from connectivity.config.logging import LOGGING_CONFIG
+from .pinning_gateway import (
+    PinningGateway,
+    PinArgs,
+)
+
+
+logging.config.dictConfig(LOGGING_CONFIG)
+logger = logging.getLogger("sensors-connectivity")
+
+class PinataGateway(PinningGateway):
+
+    def __init__(self, api_key: str, secret_key: str) -> None:
+        self.api_key = api_key
+        self.secret_key = secret_key
+
+
+    def pin(self, args: PinArgs) -> None:
+        """Pin file to Pinata for better accessibility.
+        Need to provide pinata credentials in the config file."""
+        file_path: str = args.file_path
+        try:
+            logger.info("PinataGateway: Pinning file to Pinata")
+            pinata = PinataPy(self.api_key, self.secret_key)
+            result = pinata.pin_file_to_ipfs(path_to_file=file_path, save_absolute_paths=False)
+            hash = result["IpfsHash"]
+            logger.info(f"PinataGateway: File sent to pinata. Hash is {hash}")
+        except Exception as e:
+            logger.warning(f"PinataGateway: Failed while pinning file to Pinata. Error: {e}")
+
diff --git a/connectivity/src/feeders/pinning_services/gateways/pinning_gateway.py b/connectivity/src/feeders/pinning_services/gateways/pinning_gateway.py
new file mode 100644
index 0000000..e4beb48
--- /dev/null
+++ b/connectivity/src/feeders/pinning_services/gateways/pinning_gateway.py
@@ -0,0 +1,18 @@
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+import typing as tp
+
+
+@dataclass
+class PinArgs:
+    file_path: str
+    hash: tp.Optional[str] = None
+    file_size: tp.Optional[int] = None
+
+
+class PinningGateway(ABC):
+    """Base class for custom pinning services."""
+
+    @abstractmethod
+    def pin(self, args: PinArgs) -> tp.Optional[tp.Tuple[str, int]]:
+        raise NotImplementedError("Gateway must implement pin method")
diff --git a/connectivity/src/feeders/pinning_services/gateways/temporal.py b/connectivity/src/feeders/pinning_services/gateways/temporal.py
new file mode 100644
index 0000000..72499bc
--- /dev/null
+++ b/connectivity/src/feeders/pinning_services/gateways/temporal.py
@@ -0,0 +1,40 @@
+import typing as tp
+import logging.config
+import requests
+
+from connectivity.config.logging import LOGGING_CONFIG
+from .pinning_gateway import (
+    PinningGateway,
+    PinArgs,
+)
+
+logging.config.dictConfig(LOGGING_CONFIG)
+logger = logging.getLogger("sensors-connectivity")
+
+
+class TemporalGateway(PinningGateway):
+    def __init__(self, username: str, password: str) -> None:
+        self.username = username
+        self.password = password
+
+    def pin(self, args: PinArgs) -> None:
+        """Pin file to Temporal Cloud for better accessibility.
+        Need to provide corresponding credentials in the config file."""
+        file_path: str = args.file_path
+        auth_url = "https://api.temporal.cloud/v2/auth/login"
+        token_resp = requests.post(
+            auth_url, json={"username": self.username, "password": self.password}
+        )
+        token = token_resp.json()
+
+        url_add = "https://api.temporal.cloud/v2/ipfs/public/file/add"
+        headers = {"Authorization": f"Bearer {token['token']}"}
+        resp = requests.post(
+            url_add,
+            files={"file": open(file_path), "hold_time": (None, 1)},
+            headers=headers,
+        )
+        if resp.status_code == 200:
+            logger.info("TemporalGateway: File pinned to Temporal Cloud")
+        else:
+            logger.warning(f"TemporalGateway: failed to pin file to Temporal Cloud: {resp}")
diff --git a/connectivity/src/feeders/pinning_services/pinning_manager.py b/connectivity/src/feeders/pinning_services/pinning_manager.py
new file mode 100644
index 0000000..78c5d7a
--- /dev/null
+++ b/connectivity/src/feeders/pinning_services/pinning_manager.py
@@ -0,0 +1,44 @@
+import typing as tp
+
+from .gateways import (
+    CrustGateway,
+    LocalGateway,
+    PinataGateway,
+    TemporalGateway,
+    PinArgs,
+)
+
+
+class PinningManager:
+    def __init__(self, config: dict) -> None:
+        self.config = config
+        self.gateways = {}
+        self._set_gateways()
+
+    def _set_gateways(self) -> None:
+        ipfs_endpoint: str = (
+            self.config["robonomics"]["ipfs_provider"]
+            if self.config["robonomics"]["ipfs_provider"]
+            else "/ip4/127.0.0.1/tcp/5001/http"
+        )
+        self.gateways["local"] = LocalGateway(ipfs_endpoint)
+        self.gateways["crust"] = CrustGateway(self.config["datalog"]["suri"])
+        if self.config["datalog"]["pinata_secret"]:
+            self.gateways["pinata"] = PinataGateway(
+                self.config["datalog"]["pinata_api"],
+                self.config["datalog"]["pinata_secret"],
+            )
+        if self.config["datalog"]["temporal_password"]:
+            self.gateways["temporal"] = TemporalGateway(
+                
self.config["datalog"]["temporal_username"], + self.config["datalog"]["temporal_password"], + ) + + def pin_to_gateways(self, file_path: str) -> tp.Optional[str]: + pinArgs_for_local_gateway = PinArgs(file_path) + file_hash, file_size = self.gateways["local"].pin(pinArgs_for_local_gateway) + pin_args = PinArgs(file_path=file_path, hash=file_hash, file_size=file_size) + for gateway_name, gateway in self.gateways.items(): + if gateway_name != "local": + gateway.pin(pin_args) + return file_hash diff --git a/connectivity/src/feeders/robonomics_feeder.py b/connectivity/src/feeders/robonomics_feeder.py index f0f2524..0913a18 100644 --- a/connectivity/src/feeders/robonomics_feeder.py +++ b/connectivity/src/feeders/robonomics_feeder.py @@ -10,7 +10,9 @@ from connectivity.config.logging import LOGGING_CONFIG -from ...constants import PING_MODEL +from connectivity.constants import PING_MODEL +from connectivity.src.sensors.sensors_types import Device +from connectivity.utils.format_robonomics_feeder_msg import to_pubsub_message, to_ping_message from .ifeeder import IFeeder logging.config.dictConfig(LOGGING_CONFIG) @@ -19,42 +21,6 @@ thlock = threading.RLock() - -def _to_pubsub_message(data: dict) -> str: - """Prepare JSON formatted string with measurements. - - :param data: Dict with the last measurement from one sensor. - :return: JSON formatted string for pubsub. - """ - - message = {} - message[data.public] = { - "model": data.model, - "geo": "{},{}".format(data.geo_lat, data.geo_lon), - "donated_by": data.donated_by, - "measurement": data.measurement, - } - return json.dumps(message) - - -def _to_ping_message(data: dict) -> str: - """Prepare JSON formatted string with base info about sensor. - No measurements. - - :param data: Dict with the base info from one sensor. - :return: JSON formatted string for pubsub. - """ - - message = {} - message[data.public] = { - "model": data.model, - "timestamp": data.measurement.timestamp, - "measurement": {"geo": "{},{}".format(data.geo_lat, data.geo_lon)}, - } - - return json.dumps(message) - - class RobonomicsFeeder(IFeeder): """ Publishes a result or demand message to IPFS pubsub channel @@ -92,7 +58,7 @@ def _publish_to_topic(self, payload): else: self.ipfs_client.pubsub.publish(self.topic, payload) - def feed(self, data: tp.List[dict]) -> None: + def feed(self, data: tp.List[Device]) -> None: """Send data to IPFS pubsub in the topic from config. :param data: Data from the stations. 
@@ -100,8 +66,8 @@ def feed(self, data: tp.List[dict]) -> None: if self.config["robonomics"]["enable"]: for d in data: if d.public and d.model != PING_MODEL: - pubsub_payload = _to_pubsub_message(d) + pubsub_payload = to_pubsub_message(d) else: - pubsub_payload = _to_ping_message(d) + pubsub_payload = to_ping_message(d) logger.info(f"RobonomicsFeeder: {pubsub_payload}") self._publish_to_topic(pubsub_payload) diff --git a/connectivity/src/sensors/__init__.py b/connectivity/src/sensors/__init__.py index 604e2a8..ebed14a 100644 --- a/connectivity/src/sensors/__init__.py +++ b/connectivity/src/sensors/__init__.py @@ -1,7 +1,2 @@ -# -*- coding: utf-8 -*- - -from .environmental_box import EnvironmentalBox -from .lora_sensors import LoraSensor -from .mobile_lab import MobileLab -from .sensor_sds011 import SensorSDS011 -from .trackagro_sensor import TrackAgro +from .sensors_fabric import SensorsFabcric +from .sensors_types import SensorSDS011, TrackAgro \ No newline at end of file diff --git a/connectivity/src/sensors/sensors_fabric.py b/connectivity/src/sensors/sensors_fabric.py new file mode 100644 index 0000000..d371d67 --- /dev/null +++ b/connectivity/src/sensors/sensors_fabric.py @@ -0,0 +1,16 @@ +from .sensors_types import EnvironmentalBox, LoraSensor, MobileLab, Device + +class SensorsFabcric(): + + @staticmethod + def get_sensor(data: dict) -> Device: + meas = None + if "esp8266id" in data.keys(): + meas = EnvironmentalBox(data) + elif "ID" in data.keys(): + meas = MobileLab(data) + elif "uplink_message" in data.keys(): + if "decoded_payload" in data["uplink_message"]: + id = data["end_device_ids"]["device_id"] + meas = LoraSensor(id=id, data=data["uplink_message"]["decoded_payload"]) + return meas \ No newline at end of file diff --git a/connectivity/src/sensors/sensors_types/__init__.py b/connectivity/src/sensors/sensors_types/__init__.py new file mode 100644 index 0000000..54243c4 --- /dev/null +++ b/connectivity/src/sensors/sensors_types/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- + +from .environmental_box import EnvironmentalBox +from .lora_sensors import LoraSensor +from .mobile_lab import MobileLab +from .sensor_sds011 import SensorSDS011 +from .trackagro_sensor import TrackAgro +from .base import Device diff --git a/connectivity/src/sensors/base.py b/connectivity/src/sensors/sensors_types/base.py similarity index 100% rename from connectivity/src/sensors/base.py rename to connectivity/src/sensors/sensors_types/base.py diff --git a/connectivity/src/sensors/environmental_box.py b/connectivity/src/sensors/sensors_types/environmental_box.py similarity index 97% rename from connectivity/src/sensors/environmental_box.py rename to connectivity/src/sensors/sensors_types/environmental_box.py index 74bbe68..67fedfb 100644 --- a/connectivity/src/sensors/environmental_box.py +++ b/connectivity/src/sensors/sensors_types/environmental_box.py @@ -2,7 +2,7 @@ from dataclasses import dataclass, field from functools import reduce -from ...constants import PASKAL2MMHG, SDS011_MODEL +from connectivity.constants import PASKAL2MMHG, SDS011_MODEL from .base import Device diff --git a/connectivity/src/sensors/lora_sensors.py b/connectivity/src/sensors/sensors_types/lora_sensors.py similarity index 93% rename from connectivity/src/sensors/lora_sensors.py rename to connectivity/src/sensors/sensors_types/lora_sensors.py index 38f8316..1b7fc75 100644 --- a/connectivity/src/sensors/lora_sensors.py +++ b/connectivity/src/sensors/sensors_types/lora_sensors.py @@ -1,7 +1,7 @@ import time from 
dataclasses import dataclass, field -from ...constants import SDS011_MODEL +from connectivity.constants import SDS011_MODEL from .base import Device diff --git a/connectivity/src/sensors/mobile_lab.py b/connectivity/src/sensors/sensors_types/mobile_lab.py similarity index 96% rename from connectivity/src/sensors/mobile_lab.py rename to connectivity/src/sensors/sensors_types/mobile_lab.py index b123dfe..aa94a4d 100644 --- a/connectivity/src/sensors/mobile_lab.py +++ b/connectivity/src/sensors/sensors_types/mobile_lab.py @@ -2,7 +2,7 @@ from dataclasses import dataclass, field from functools import reduce -from ...constants import MOBILE_GPS, PASKAL2MMHG +from connectivity.constants import MOBILE_GPS, PASKAL2MMHG from .base import Device diff --git a/connectivity/src/sensors/sensor_sds011.py b/connectivity/src/sensors/sensors_types/sensor_sds011.py similarity index 95% rename from connectivity/src/sensors/sensor_sds011.py rename to connectivity/src/sensors/sensors_types/sensor_sds011.py index afabf56..dea61ca 100644 --- a/connectivity/src/sensors/sensor_sds011.py +++ b/connectivity/src/sensors/sensors_types/sensor_sds011.py @@ -2,7 +2,7 @@ import typing as tp from dataclasses import dataclass, field -from ...constants import PING_MODEL, SDS011_MODEL +from connectivity.constants import PING_MODEL, SDS011_MODEL from .base import Device diff --git a/connectivity/src/sensors/sensor_template.py b/connectivity/src/sensors/sensors_types/sensor_template.py similarity index 100% rename from connectivity/src/sensors/sensor_template.py rename to connectivity/src/sensors/sensors_types/sensor_template.py diff --git a/connectivity/src/sensors/trackagro_sensor.py b/connectivity/src/sensors/sensors_types/trackagro_sensor.py similarity index 98% rename from connectivity/src/sensors/trackagro_sensor.py rename to connectivity/src/sensors/sensors_types/trackagro_sensor.py index e3d1e49..9c8063d 100644 --- a/connectivity/src/sensors/trackagro_sensor.py +++ b/connectivity/src/sensors/sensors_types/trackagro_sensor.py @@ -6,7 +6,7 @@ from connectivity.config.logging import LOGGING_CONFIG -from ...constants import SDS011_MODEL +from connectivity.constants import SDS011_MODEL from .base import Device logging.config.dictConfig(LOGGING_CONFIG) diff --git a/connectivity/src/stations/comstation.py b/connectivity/src/stations/comstation.py index 462cafc..92ee5e0 100644 --- a/connectivity/src/stations/comstation.py +++ b/connectivity/src/stations/comstation.py @@ -8,9 +8,9 @@ from connectivity.config.logging import LOGGING_CONFIG -from ...constants import STATION_VERSION +from connectivity.constants import STATION_VERSION from ..drivers.sds011 import SDS011 -from ..sensors import SensorSDS011 +from connectivity.src.sensors import SensorSDS011 from .istation import IStation logging.config.dictConfig(LOGGING_CONFIG) diff --git a/connectivity/src/stations/httpstation.py b/connectivity/src/stations/httpstation.py index f94669e..ddb5649 100644 --- a/connectivity/src/stations/httpstation.py +++ b/connectivity/src/stations/httpstation.py @@ -12,8 +12,8 @@ from connectivity.config.logging import LOGGING_CONFIG -from ...constants import STATION_VERSION -from ..sensors import EnvironmentalBox, MobileLab +from connectivity.constants import STATION_VERSION +from connectivity.src.sensors import SensorsFabcric from .istation import IStation logging.config.dictConfig(LOGGING_CONFIG) @@ -75,10 +75,9 @@ def do_POST(self) -> None: length = int(self.headers.get("content-length")) d = self.rfile.read(length).decode().replace("SDS_P1", 
"SDS_pm10").replace("SDS_P2", "SDS_pm25") data = json.loads(d) - if "esp8266id" in data.keys(): - meas = EnvironmentalBox(data) - elif "ID" in data.keys(): - meas = MobileLab(data) + meas = SensorsFabcric.get_sensor(data) + if meas is None: + return with thlock: if meas: sessions[meas.id] = meas diff --git a/connectivity/src/stations/mqttstation.py b/connectivity/src/stations/mqttstation.py index b8b28ab..f0064b4 100644 --- a/connectivity/src/stations/mqttstation.py +++ b/connectivity/src/stations/mqttstation.py @@ -9,8 +9,8 @@ from connectivity.config.logging import LOGGING_CONFIG -from ...constants import STATION_VERSION -from ..sensors import EnvironmentalBox, LoraSensor, MobileLab +from connectivity.constants import STATION_VERSION +from connectivity.src.sensors import SensorsFabcric from .istation import IStation logging.config.dictConfig(LOGGING_CONFIG) @@ -65,15 +65,8 @@ def on_message(self, client, userdata, msg: dict) -> None: global thlock global sessions data = json.loads(msg.payload.decode()) - if "esp8266id" in data.keys(): - meas = EnvironmentalBox(data) - elif "ID" in data.keys(): - meas = MobileLab(data) - elif "uplink_message" in data.keys(): - if "decoded_payload" in data["uplink_message"]: - id = data["end_device_ids"]["device_id"] - meas = LoraSensor(id=id, data=data["uplink_message"]["decoded_payload"]) - else: + meas = SensorsFabcric.get_sensor(data) + if meas is None: return with thlock: if meas: diff --git a/connectivity/src/stations/trackargostation.py b/connectivity/src/stations/trackargostation.py index ce0c3de..db8a4c2 100644 --- a/connectivity/src/stations/trackargostation.py +++ b/connectivity/src/stations/trackargostation.py @@ -7,13 +7,12 @@ import time import typing as tp import urllib.request as ur -# from os import times from urllib import error, parse from connectivity.config.logging import LOGGING_CONFIG -from ...constants import STATION_VERSION -from ..sensors import TrackAgro +from connectivity.constants import STATION_VERSION +from connectivity.src.sensors import TrackAgro from .istation import IStation logging.config.dictConfig(LOGGING_CONFIG) diff --git a/connectivity/utils/datalog_payload.py b/connectivity/utils/datalog_payload.py new file mode 100644 index 0000000..2317bc6 --- /dev/null +++ b/connectivity/utils/datalog_payload.py @@ -0,0 +1,85 @@ +import logging.config +import typing as tp +from tempfile import NamedTemporaryFile +import json + +from connectivity.config.logging import LOGGING_CONFIG +from connectivity.constants import MOBILE_GPS + +logging.config.dictConfig(LOGGING_CONFIG) +logger = logging.getLogger("sensors-connectivity") + + +def create_payload(buf: set) -> dict: + """Format measurements to payload. + + :param buf: Set of measurements from all sensors. + :return: Payload + """ + payload = {} + for m in buf: + try: + if m.public in payload: + payload[m.public]["measurements"].append(m.measurement) + else: + if m.model == MOBILE_GPS: + payload[m.public] = { + "model": m.model, + "donated_by": m.donated_by, + "measurements": [m.measurement], + } + else: + payload[m.public] = { + "model": m.model, + "geo": "{},{}".format(m.geo_lat, m.geo_lon), + "donated_by": m.donated_by, + "measurements": [m.measurement], + } + except Exception as e: + logger.warning(f"Create datalog payload: Couldn't store data: {e}") + + return payload + + +def sort_payload(payload: dict) -> dict: + """Sort measurements dict with timestamp. + + :param payload: Measurements dict. + :return: Sorted measurements dict. 
+ """ + + ordered = {} + for k, v in payload.items(): + try: + meas = sorted(v["measurements"], key=lambda x: x["timestamp"]) + if v.get("geo"): + ordered[k] = { + "model": v["model"], + "geo": v["geo"], + "donated_by": v["donated_by"], + "measurements": meas, + } + else: + ordered[k] = { + "model": v["model"], + "donated_by": v["donated_by"], + "measurements": meas, + } + except Exception as e: + logger.warning(f"Sort datalog payload: Couldn't sort data: {e}") + return ordered + + +def create_tmp_file(payload: dict) -> str: + """Save payload to a named tmp file. + + :param payload: Payload which will be stored. + + :return: Path to the tmp file + """ + temp = NamedTemporaryFile(mode="w", delete=False) + logger.debug(f"Created temp file: {temp.name}") + temp.write(json.dumps(payload)) + temp.close() + + return temp.name diff --git a/connectivity/utils/format_robonomics_feeder_msg.py b/connectivity/utils/format_robonomics_feeder_msg.py new file mode 100644 index 0000000..ce9eaed --- /dev/null +++ b/connectivity/utils/format_robonomics_feeder_msg.py @@ -0,0 +1,37 @@ +import json + +from connectivity.src.sensors.sensors_types import Device + +def to_pubsub_message(data: Device) -> str: + """Prepare JSON formatted string with measurements. + + :param data: Dict with the last measurement from one sensor. + :return: JSON formatted string for pubsub. + """ + + message = {} + message[data.public] = { + "model": data.model, + "geo": "{},{}".format(data.geo_lat, data.geo_lon), + "donated_by": data.donated_by, + "measurement": data.measurement, + } + return json.dumps(message) + + +def to_ping_message(data: Device) -> str: + """Prepare JSON formatted string with base info about sensor. + No measurements. + + :param data: Dict with the base info from one sensor. + :return: JSON formatted string for pubsub. + """ + + message = {} + message[data.public] = { + "model": data.model, + "timestamp": data.measurement.timestamp, + "measurement": {"geo": "{},{}".format(data.geo_lat, data.geo_lon)}, + } + + return json.dumps(message) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8f93d48..d28fa55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "sensors_connectivity" -version = "1.6.5b" +version = "1.6.6" description = "Robonomics package to read data from sensors and publish to different output channels" authors = [ "Vadim Manaenko ",