Skip to content

Commit 0e73f12

Browse files
committed
Implement asynchronous method handlers on server side
1 parent ec96e56 commit 0e73f12

File tree

6 files changed

+46
-11
lines changed

6 files changed

+46
-11
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ The library is still under development. The current major limitations and deviat
6363

6464
- Configuration and load balancing options in SOME/IP SD messages are not supported.
6565
- TTL of Service Discovery entries is not checked yet.
66-
- The Initial Wait Phase and Repetition Phase of the Service Discovery specification are skipped. For simplification, the Main Phase is directly entered, i.e. SD Offer Entries are immediately sent cyclically.
66+
- The Initial Wait Phase and Repetition Phase of the Service Discovery specification are skipped. The Main Phase is directly entered, i.e. SD Offer Entries are immediately sent cyclically.
6767

6868
### De-/Serialization
6969

example_apps/offer_method_tcp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
SAMPLE_METHOD_ID = 0x0123
2222

2323

24-
def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
24+
async def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
2525
print(
2626
f"Received data: {' '.join(f'0x{b:02x}' for b in input_data)} from IP: {addr[0]} Port: {addr[1]}"
2727
)

example_apps/offer_method_udp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
SAMPLE_METHOD_ID = 0x0123
2222

2323

24-
def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
24+
async def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
2525
print(
2626
f"Received data: {' '.join(f'0x{b:02x}' for b in input_data)} from IP: {addr[0]} Port: {addr[1]}"
2727
)

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = someipy
3-
version = 0.0.8
3+
version = 0.0.9
44
author = Christian H.
55
author_email = someipy.package@gmail.com
66
description = A Python package implementing the SOME/IP protocol

src/someipy/client_service_instance.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,9 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
234234
endpoint_to_str_int_tuple(self._found_services[0].service.endpoint),
235235
)
236236

237-
# After sending the method call wait for maximum one second
237+
# After sending the method call wait for maximum three seconds
238238
try:
239-
await asyncio.wait_for(self._method_call_future, 1.0)
239+
await asyncio.wait_for(self._method_call_future, 3.0)
240240
except asyncio.TimeoutError:
241241
get_logger(_logger_name).error(
242242
f"Waiting on response for method call 0x{method_id:04X} timed out."

src/someipy/server_service_instance.py

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ class ServerServiceInstance(ServiceDiscoveryObserver):
6969
_subscribers: Subscribers
7070
_offer_timer: SimplePeriodicTimer
7171

72+
_handler_tasks: set[asyncio.Task]
73+
_is_running: bool
74+
7275
def __init__(
7376
self,
7477
service: Service,
@@ -92,6 +95,9 @@ def __init__(
9295
self._subscribers = Subscribers()
9396
self._offer_timer = None
9497

98+
self._handler_tasks = set()
99+
self._is_running = True
100+
95101
def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None:
96102
"""
97103
Sends an event to subscribers with the given event group ID, event ID, and payload.
@@ -135,6 +141,18 @@ def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None
135141
endpoint_to_str_int_tuple(sub.endpoint),
136142
)
137143

144+
async def _handle_method_call(self, method_handler, dst_addr, header_to_return):
145+
result = await method_handler
146+
header_to_return.message_type = result.message_type.value
147+
header_to_return.return_code = result.return_code.value
148+
payload_to_return = result.payload
149+
150+
# Update length in header to the correct length
151+
header_to_return.length = 8 + len(payload_to_return)
152+
self._someip_endpoint.sendto(
153+
header_to_return.to_buffer() + payload_to_return, dst_addr
154+
)
155+
138156
def someip_message_received(
139157
self, message: SomeIpMessage, addr: Tuple[str, int]
140158
) -> None:
@@ -155,6 +173,10 @@ def someip_message_received(
155173
- The protocol and interface version are not checked yet.
156174
- If the message type in the received header is not a request, a warning is logged.
157175
"""
176+
177+
if not self._is_running:
178+
return
179+
158180
header = message.header
159181
payload_to_return = bytes()
160182
header_to_return = header
@@ -193,12 +215,15 @@ def send_response():
193215
and header.return_code == 0x00
194216
):
195217
method_handler = self._service.methods[header.method_id].method_handler
196-
result = method_handler(message.payload, addr)
218+
coro = method_handler(message.payload, addr)
197219

198-
header_to_return.message_type = result.message_type.value
199-
header_to_return.return_code = result.return_code.value
200-
payload_to_return = result.payload
201-
send_response()
220+
# If a method is called, do it in a separate task to allow for asynchronous processing inside
221+
# method handlers
222+
new_task = asyncio.create_task(
223+
self._handle_method_call(coro, addr, header_to_return)
224+
)
225+
self._handler_tasks.add(new_task)
226+
new_task.add_done_callback(self._handler_tasks.discard)
202227

203228
else:
204229
get_logger(_logger_name).warning(
@@ -400,6 +425,16 @@ async def stop_offer(self):
400425
)
401426
self._sd_sender.send_multicast(sd_header.to_buffer())
402427

428+
# Stop processing incoming calls
429+
self._is_running = False
430+
431+
# Cancel all running handler tasks
432+
for task in self._handler_tasks:
433+
task.cancel()
434+
435+
# Wait for all tasks to be canceled
436+
await asyncio.gather(*self._handler_tasks)
437+
403438

404439
async def construct_server_service_instance(
405440
service: Service,

0 commit comments

Comments
 (0)