Skip to content

Commit accfa52

Browse files
authored
add mqv:topic to TD (#137)
1 parent e45b539 commit accfa52

File tree

5 files changed

+41
-27
lines changed

5 files changed

+41
-27
lines changed

hololinked/client/mqtt/consumed_interactions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(
3030
def listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None:
3131
# This method is called from a different thread but also finishes quickly, we wont redo this way
3232
# for the time being.
33-
topic = f"{self.resource.thing_id}/{self.resource.name}"
33+
topic = form.mqv_topic or f"{self.resource.thing_id}/{self.resource.name}"
3434

3535
def on_topic_message(client: PahoMQTTClient, userdata, message: MQTTMessage):
3636
try:
@@ -57,7 +57,7 @@ def on_topic_message(client: PahoMQTTClient, userdata, message: MQTTMessage):
5757
self.sync_client.message_callback_add(topic, on_topic_message)
5858

5959
async def async_listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None:
60-
topic = f"{self.resource.thing_id}/{self.resource.name}"
60+
topic = form.mqv_topic or f"{self.resource.thing_id}/{self.resource.name}"
6161
try:
6262
await self.async_client.__aenter__()
6363
except aiomqtt.MqttReentrantError:

hololinked/server/mqtt.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
import copy
66
import ssl
77

8+
89
from ..utils import get_current_async_loop
910
from .utils import consume_broker_queue, consume_broker_pubsub_per_event
1011
from ..config import global_config
12+
from ..constants import Operations
1113
from ..serializers import Serializers
12-
from ..td.interaction_affordance import EventAffordance
14+
from ..td.interaction_affordance import EventAffordance, PropertyAffordance
1315
from ..param.parameters import Selector, String, Integer, ClassSelector, List
1416

1517

@@ -149,23 +151,23 @@ async def publish_thing_description(self, ZMQ_TD: dict[str, Any]) -> dict[str, A
149151
TD.pop("actions", None)
150152
# remove properties that are not observable
151153
for name in ZMQ_TD.get("properties", {}).keys():
152-
if not TD["properties"][name].get("observable", False):
154+
affordance = PropertyAffordance.from_TD(name, ZMQ_TD)
155+
if not affordance.observable:
153156
TD["properties"].pop(name)
154157
continue
155-
forms = TD["properties"][name].pop("forms", [])
156158
TD["properties"][name]["forms"] = []
157-
for form in forms:
158-
if form["op"] == "observeproperty":
159-
form["href"] = f"mqtt://{self.hostname}:{self.port}"
160-
TD["properties"][name]["forms"].append(form)
159+
form = affordance.retrieve_form(Operations.observeproperty)
160+
form.href = f"mqtt{'s' if self.ssl_context else ''}://{self.hostname}:{self.port}"
161+
form.mqv_topic = f"{TD['id']}/{name}"
162+
TD["properties"][name]["forms"].append(form.json())
161163
# repurpose event
162164
for name in ZMQ_TD.get("events", {}).keys():
163-
forms = TD["events"][name].pop("forms", [])
165+
affordance = EventAffordance.from_TD(name, ZMQ_TD)
164166
TD["events"][name]["forms"] = []
165-
for form in forms:
166-
if form["op"] == "subscribeevent":
167-
form["href"] = f"mqtt://{self.hostname}:{self.port}"
168-
TD["events"][name]["forms"].append(form)
167+
form = affordance.retrieve_form(Operations.subscribeevent)
168+
form.href = f"mqtt{'s' if self.ssl_context else ''}://{self.hostname}:{self.port}"
169+
form.mqv_topic = f"{TD['id']}/{name}"
170+
TD["events"][name]["forms"].append(form.json())
169171
await self.client.publish(
170172
topic=f"{TD['id']}/thing-description",
171173
payload=Serializers.json.dumps(TD),

hololinked/td/forms.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class Form(Schema):
3939
href: str = None
4040
op: str = None
4141
htv_methodName: str = Field(default=None, alias="htv:methodName")
42+
mqv_topic: str = Field(default=None, alias="mqv:topic")
4243
contentType: typing.Optional[str] = "application/json"
4344
additionalResponses: typing.Optional[typing.List[AdditionalExpectedResponse]] = None
4445
contentEncoding: typing.Optional[str] = None
@@ -61,6 +62,8 @@ def from_TD(cls, form_json: typing.Dict[str, typing.Any]) -> "Form":
6162
for field in cls.model_fields:
6263
if field == "htv_methodName" and "htv:methodName" in form_json:
6364
setattr(form, field, form_json["htv:methodName"])
65+
elif field == "mqv_topic" and "mqv:topic" in form_json:
66+
setattr(form, field, form_json["mqv:topic"])
6467
elif field in form_json:
6568
setattr(form, field, form_json[field])
6669
return form

tests/helper-scripts/client.ipynb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2048,6 +2048,7 @@
20482048
" 'observable': True,\n",
20492049
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20502050
" 'op': 'observeproperty',\n",
2051+
" 'mqv:topic': 'example-test/observable_list_prop',\n",
20512052
" 'contentType': 'application/json'}]},\n",
20522053
" 'observable_readonly_prop': {'description': 'An observable readonly property to check observable events on read operations',\n",
20532054
" 'default': 0,\n",
@@ -2056,6 +2057,7 @@
20562057
" 'observable': True,\n",
20572058
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20582059
" 'op': 'observeproperty',\n",
2060+
" 'mqv:topic': 'example-test/observable_readonly_prop',\n",
20592061
" 'contentType': 'application/json'}]},\n",
20602062
" 'sleeping_prop': {'description': 'A property that sleeps for 10 seconds on read operations',\n",
20612063
" 'default': 0,\n",
@@ -2064,6 +2066,7 @@
20642066
" 'observable': True,\n",
20652067
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20662068
" 'op': 'observeproperty',\n",
2069+
" 'mqv:topic': 'example-test/sleeping_prop',\n",
20672070
" 'contentType': 'application/json'}]}},\n",
20682071
" 'events': {'data_point_event': {'description': 'Event raised when a new data point is available',\n",
20692072
" 'data': {'type': 'object',\n",
@@ -2072,26 +2075,32 @@
20722075
" 'required': ['timestamp', 'energy']},\n",
20732076
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20742077
" 'op': 'subscribeevent',\n",
2078+
" 'mqv:topic': 'example-test/data_point_event',\n",
20752079
" 'contentType': 'application/json'}]},\n",
20762080
" 'test_binary_payload_event': {'description': 'test event with binary payload',\n",
20772081
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20782082
" 'op': 'subscribeevent',\n",
2083+
" 'mqv:topic': 'example-test/test_binary_payload_event',\n",
20792084
" 'contentType': 'application/json'}]},\n",
20802085
" 'test_event': {'description': 'test event with arbitrary payload',\n",
20812086
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20822087
" 'op': 'subscribeevent',\n",
2088+
" 'mqv:topic': 'example-test/test_event',\n",
20832089
" 'contentType': 'application/json'}]},\n",
20842090
" 'test_event_with_json_schema': {'description': 'test event with schema validation',\n",
20852091
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20862092
" 'op': 'subscribeevent',\n",
2093+
" 'mqv:topic': 'example-test/test_event_with_json_schema',\n",
20872094
" 'contentType': 'application/json'}]},\n",
20882095
" 'test_event_with_pydantic_schema': {'description': 'test event with pydantic schema validation',\n",
20892096
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20902097
" 'op': 'subscribeevent',\n",
2098+
" 'mqv:topic': 'example-test/test_event_with_pydantic_schema',\n",
20912099
" 'contentType': 'application/json'}]},\n",
20922100
" 'test_mixed_content_payload_event': {'description': 'test event with mixed content payload',\n",
20932101
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
20942102
" 'op': 'subscribeevent',\n",
2103+
" 'mqv:topic': 'example-test/test_mixed_content_payload_event',\n",
20952104
" 'contentType': 'application/json'}]}}}"
20962105
]
20972106
},
@@ -2119,7 +2128,7 @@
21192128
},
21202129
{
21212130
"cell_type": "code",
2122-
"execution_count": 3,
2131+
"execution_count": null,
21232132
"id": "335296e0",
21242133
"metadata": {},
21252134
"outputs": [],
@@ -2141,7 +2150,7 @@
21412150
"\n",
21422151
"# object_proxy.subscribe_event(\"test_event\", cb)\n",
21432152
"# object_proxy.subscribe_event(\"test_event\", [cb1, cb2], concurrent=True)\n",
2144-
"# object_proxy.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True)\n",
2153+
"object_proxy_mqtt.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True)\n",
21452154
"# object_proxy.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True, concurrent=True)"
21462155
]
21472156
},

tests/helper-scripts/run_test_thing.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515

1616
global_config.DEBUG = True
1717

18-
# thing = TestThing(id="example-test")
19-
thing = OceanOpticsSpectrometer(id="example-test", serial_number="simulation")
18+
thing = TestThing(id="example-test")
19+
# thing = OceanOpticsSpectrometer(id="example-test", serial_number="simulation")
2020

2121

2222
Serializers.register_for_object(TestThing.db_init_int_prop, Serializers.pickle)
@@ -35,20 +35,20 @@
3535
http_server = HTTPServer(port=9000)
3636
zmq_server = ZMQServer(id="example-test-server", things=[thing], access_points="IPC")
3737

38-
# mqtt_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
39-
# if not os.path.exists("ca.crt"):
40-
# raise FileNotFoundError("CA certificate 'ca.crt' not found in current directory for MQTT TLS connection")
41-
# mqtt_ssl.load_verify_locations(cafile="ca.crt")
42-
# mqtt_ssl.check_hostname = True
43-
# mqtt_ssl.verify_mode = ssl.CERT_REQUIRED
44-
# mqtt_ssl.minimum_version = ssl.TLSVersion.TLSv1_2
38+
mqtt_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
39+
if not os.path.exists("ca.crt"):
40+
raise FileNotFoundError("CA certificate 'ca.crt' not found in current directory for MQTT TLS connection")
41+
mqtt_ssl.load_verify_locations(cafile="ca.crt")
42+
mqtt_ssl.check_hostname = True
43+
mqtt_ssl.verify_mode = ssl.CERT_REQUIRED
44+
mqtt_ssl.minimum_version = ssl.TLSVersion.TLSv1_2
4545

4646
mqtt_publisher = MQTTPublisher(
4747
hostname="localhost",
4848
port=8883,
4949
username="sampleuser",
5050
password="samplepass",
5151
qos=1,
52-
# ssl_context=mqtt_ssl,
52+
ssl_context=mqtt_ssl,
5353
)
54-
thing.run(servers=[http_server, zmq_server])
54+
thing.run(servers=[http_server, zmq_server, mqtt_publisher])

0 commit comments

Comments
 (0)