Skip to content

Commit b33679f

Browse files
authored
Merge pull request #51 from Kapim/ros2
fix: the TRANSIENT_LOCAL topics were not working on server
2 parents fb6ed8a + f88d9ef commit b33679f

File tree

7 files changed

+241
-167
lines changed

7 files changed

+241
-167
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from multiprocessing import Queue
2+
from typing import Optional
3+
4+
import rclpy # pants: no-infer-dep
5+
from rclpy.qos import QoSProfile # pants: no-infer-dep
6+
7+
from era_5g_relay_network_application.utils import ActionServiceVariant, ActionSubscribers
8+
from era_5g_relay_network_application.worker_service import ServiceData, WorkerService
9+
10+
11+
class RelayService:
12+
"""Class that holds information about incoming service (i.e. service that is called from relay client), its type and
13+
related worker."""
14+
15+
def __init__(
16+
self,
17+
service_name: str,
18+
service_type: str,
19+
node: rclpy.node.Node,
20+
qos: Optional[QoSProfile] = None,
21+
action_service_variant: ActionServiceVariant = ActionServiceVariant.NONE,
22+
action_subscribers: Optional[ActionSubscribers] = None,
23+
):
24+
self.service_type = service_type
25+
26+
# Only one call of the service at the time is allowed, therefore queue of length 1 is sufficent
27+
self.queue_request: Queue[ServiceData] = Queue(1)
28+
self.queue_response: Queue[ServiceData] = Queue(1)
29+
30+
self.worker = WorkerService(
31+
service_name,
32+
service_type,
33+
self.queue_request,
34+
self.queue_response,
35+
node,
36+
qos,
37+
action_service_variant,
38+
action_subscribers,
39+
)
40+
41+
# Service name can be changed by worker in case of action-related services
42+
self.service_name = self.worker.service_name
43+
self.channel_name_request = f"service_request/{self.service_name}"
44+
self.channel_name_response = f"service_response/{self.service_name}"
45+
46+
self.worker.daemon = True
47+
self.worker.start()
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import os
2+
from multiprocessing import Queue
3+
from typing import Any, Optional, Union
4+
5+
import rclpy # pants: no-infer-dep
6+
from rclpy.qos import QoSProfile # pants: no-infer-dep
7+
8+
from era_5g_interface.channels import ChannelType
9+
from era_5g_relay_network_application import queue_len_from_qos
10+
from era_5g_relay_network_application.utils import (
11+
IMAGE_CHANNEL_TYPES,
12+
ActionSubscribers,
13+
ActionTopicVariant,
14+
Compressions,
15+
)
16+
from era_5g_relay_network_application.worker_image_publisher import WorkerImagePublisher
17+
from era_5g_relay_network_application.worker_image_subscriber import WorkerImageSubscriber
18+
from era_5g_relay_network_application.worker_publisher import WorkerPublisher
19+
from era_5g_relay_network_application.worker_subscriber import WorkerSubscriber
20+
21+
QUEUE_LENGTH_TOPICS = int(os.getenv("QUEUE_LENGTH_TOPICS", 1))
22+
EXTENDED_MEASURING = bool(os.getenv("EXTENDED_MEASURING", False))
23+
24+
25+
class RelayTopic:
26+
"""Base class that holds information about topic and its type."""
27+
28+
def __init__(
29+
self,
30+
topic_name: str,
31+
topic_type: str,
32+
channel_type: ChannelType,
33+
compression: Optional[Compressions] = None,
34+
qos: Optional[QoSProfile] = None,
35+
):
36+
self.topic_name = topic_name
37+
self.topic_type = topic_type
38+
self.compression = compression
39+
self.qos = qos
40+
self.channel_type = channel_type
41+
self.channel_name = f"topic/{self.topic_name}"
42+
self.queue: Queue[Any] = Queue(queue_len_from_qos(QUEUE_LENGTH_TOPICS, self.qos))
43+
44+
45+
class RelayTopicIncoming(RelayTopic):
46+
"""Class that holds information about incoming topic (i.e. topic that is received from the relay client and is
47+
published here), its type and related publisher."""
48+
49+
def __init__(
50+
self,
51+
topic_name: str,
52+
topic_type: str,
53+
channel_type: ChannelType,
54+
node,
55+
compression: Optional[Compressions] = None,
56+
qos: Optional[QoSProfile] = None,
57+
):
58+
super().__init__(topic_name, topic_type, channel_type, compression, qos)
59+
60+
# this sucks, the classes should have some common ancestor or there should be two properties
61+
self.worker: Union[WorkerImagePublisher, WorkerPublisher]
62+
63+
if self.channel_type in IMAGE_CHANNEL_TYPES:
64+
self.worker = WorkerImagePublisher(
65+
self.queue,
66+
self.topic_name,
67+
self.topic_type,
68+
compression=compression,
69+
node=node,
70+
extended_measuring=EXTENDED_MEASURING,
71+
)
72+
else:
73+
self.worker = WorkerPublisher(
74+
self.queue,
75+
self.topic_name,
76+
self.topic_type,
77+
node,
78+
self.compression,
79+
self.qos,
80+
extended_measuring=EXTENDED_MEASURING,
81+
)
82+
self.worker.daemon = True
83+
self.worker.start()
84+
85+
86+
class RelayTopicOutgoing(RelayTopic):
87+
"""Class that holds information about outgoing topic (i.e. topic that is subscribed to be sent to the relay client),
88+
its type and related subscriber."""
89+
90+
def __init__(
91+
self,
92+
topic_name: str,
93+
topic_type: str,
94+
channel_type: ChannelType,
95+
node: rclpy.node.Node,
96+
compression: Optional[Compressions] = None,
97+
qos: Optional[QoSProfile] = None,
98+
action_topic_variant: ActionTopicVariant = ActionTopicVariant.NONE,
99+
action_subscribers: Optional[ActionSubscribers] = None,
100+
):
101+
super().__init__(topic_name, topic_type, channel_type, compression, qos)
102+
103+
self.action_topic_variant = action_topic_variant
104+
105+
# this sucks, the classes should have some common ancestor or there should be two properties
106+
self.worker: Union[WorkerImageSubscriber, WorkerSubscriber]
107+
108+
if self.channel_type in IMAGE_CHANNEL_TYPES:
109+
self.worker = WorkerImageSubscriber(
110+
topic_name, topic_type, node, self.queue, extended_measuring=EXTENDED_MEASURING
111+
)
112+
else:
113+
self.worker = WorkerSubscriber(
114+
topic_name,
115+
topic_type,
116+
node,
117+
self.queue,
118+
compression,
119+
qos,
120+
action_topic_variant,
121+
action_subscribers,
122+
extended_measuring=EXTENDED_MEASURING,
123+
)
124+
# Topic name may be changed in case of action-related topics
125+
self.channel_name = f"topic/{self.worker.topic_name}"
126+
self.topic_name = self.worker.topic_name

0 commit comments

Comments
 (0)