-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce client service instance class for receiving events
- Loading branch information
Showing
4 changed files
with
250 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import asyncio | ||
import ipaddress | ||
import logging | ||
|
||
from someipy.service_discovery import construct_service_discovery | ||
from someipy.client_service_instance import construct_client_service_instance | ||
from someipy.logging import set_someipy_log_level | ||
from temperature_msg import TemparatureMsg | ||
|
||
SD_MULTICAST_GROUP = "224.224.224.245" | ||
SD_PORT = 30490 | ||
INTERFACE_IP = "127.0.0.1" | ||
|
||
SAMPLE_EVENTGROUP_ID = 20 | ||
|
||
|
||
def temperature_callback(payload: bytes) -> None: | ||
print(f"Received {len(payload)} bytes.") | ||
temperature_msg = TemparatureMsg().deserialize(payload) | ||
print(temperature_msg) | ||
|
||
|
||
async def main(): | ||
# It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, .. | ||
set_someipy_log_level(logging.DEBUG) | ||
|
||
# Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function | ||
# use the construct_service_discovery function | ||
# The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set | ||
service_discovery = await construct_service_discovery( | ||
SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP | ||
) | ||
|
||
# 1. For receiving events use a ClientServiceInstance. Since the construction of the class ClientServiceInstance is not | ||
# trivial and would require an async __init__ function use the construct_client_service_instance function | ||
# 2. Pass the service and instance ID, version and endpoint and TTL. The endpoint is needed because it will be the dest IP | ||
# and port to which the events are sent to and the client will listen to | ||
# 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ClientServiceInstance can offer his service to | ||
# other ECUs | ||
service_instance_temperature = await construct_client_service_instance( | ||
service_id=1, | ||
instance_id=1000, | ||
major_version=1, | ||
minor_version=0, | ||
endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3002), | ||
ttl=5, | ||
sd_sender=service_discovery, | ||
) | ||
|
||
# It's possible to optionally register a callback function which will be called when an event from the | ||
# subscribed event group is received. The callback function will get the bytes of the payload passed which | ||
# can be deserialized in the callback function | ||
service_instance_temperature.register_callback(temperature_callback) | ||
|
||
# In order to subscribe to an event group, just pass the event group ID to the subscribe_eventgroup method | ||
service_instance_temperature.subscribe_eventgroup(SAMPLE_EVENTGROUP_ID) | ||
|
||
# The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance | ||
# is notified by the ServiceDiscoveryProtocol about e.g. offers from other ECUs and can also subscribe to offered | ||
# services | ||
service_discovery.attach(service_instance_temperature) | ||
|
||
try: | ||
# Keep the task alive | ||
await asyncio.Future() | ||
except asyncio.CancelledError as e: | ||
print("Shutdown..") | ||
finally: | ||
print("Service Discovery close..") | ||
service_discovery.close() | ||
|
||
print("End main task..") | ||
|
||
|
||
if __name__ == "__main__": | ||
try: | ||
asyncio.run(main()) | ||
except KeyboardInterrupt as e: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
import asyncio | ||
from typing import Tuple, Union, Any, Callable | ||
|
||
from someipy._internal.someip_sd_header import ( | ||
SdService, | ||
TransportLayerProtocol, | ||
SdEventGroupEntry, | ||
) | ||
from someipy._internal.someip_sd_builder import build_subscribe_eventgroup_entry | ||
from someipy._internal.someip_header import ( | ||
get_payload_from_someip_message, | ||
SomeIpHeader, | ||
) | ||
from someipy._internal.service_discovery_abcs import ( | ||
ServiceDiscoveryObserver, | ||
ServiceDiscoverySender, | ||
) | ||
from someipy._internal.utils import create_udp_socket, EndpointType, DatagramAdapter | ||
from someipy._internal.logging import get_logger | ||
|
||
|
||
_logger = get_logger("client_service_instance") | ||
|
||
|
||
class ClientServiceInstance(ServiceDiscoveryObserver): | ||
service_id: int | ||
instance_id: int | ||
major_version: int | ||
minor_version: int | ||
endpoint: EndpointType | ||
ttl: int | ||
sd_sender: ServiceDiscoverySender | ||
|
||
eventgroup_to_subscribe: int | ||
expect_ack: bool | ||
|
||
callback: Callable[[bytes], None] | ||
|
||
def __init__( | ||
self, | ||
service_id: int, | ||
instance_id: int, | ||
major_version: int, | ||
minor_version: int, | ||
endpoint: EndpointType, | ||
ttl: int = 0, | ||
sd_sender=None, | ||
): | ||
self.service_id = service_id | ||
self.instance_id = instance_id | ||
self.major_version = major_version | ||
self.minor_version = minor_version | ||
self.endpoint = endpoint | ||
self.ttl = ttl | ||
self.sd_sender = sd_sender | ||
|
||
self.eventgroup_to_subscribe = -1 | ||
self.expect_ack = False | ||
|
||
self.unicast_transport = None | ||
|
||
self.callback = None | ||
|
||
def register_callback(self, callback: Callable[[bytes], None]) -> None: | ||
self.callback = callback | ||
|
||
def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> None: | ||
# TODO: Test if there is a subscription active for the received data | ||
if self.callback is not None: | ||
header = SomeIpHeader.from_buffer(data) | ||
payload = get_payload_from_someip_message(header, data) | ||
self.callback(payload) | ||
|
||
def connection_lost(self, exc: Exception) -> None: | ||
pass | ||
|
||
def subscribe_eventgroup(self, eventgroup_id: int): | ||
# TODO: Currently only one eventgroup per service is supported | ||
self.eventgroup_to_subscribe = eventgroup_id | ||
|
||
def stop_subscribe_eventgroup(self, eventgroup_id: int): | ||
# TODO: Implement StopSubscribe | ||
raise NotImplementedError | ||
|
||
def find_service_update(self): | ||
# Not needed in client service instance | ||
pass | ||
|
||
def offer_service_update(self, offered_service: SdService): | ||
if ( | ||
self.eventgroup_to_subscribe != -1 | ||
and offered_service.service_id == self.service_id | ||
and offered_service.instance_id == self.instance_id | ||
): | ||
( | ||
session_id, | ||
reboot_flag, | ||
) = self.sd_sender.get_unicast_session_handler().update_session() | ||
|
||
subscribe_sd_header = build_subscribe_eventgroup_entry( | ||
service_id=self.service_id, | ||
instance_id=self.instance_id, | ||
major_version=self.major_version, | ||
ttl=self.ttl, | ||
event_group_id=self.eventgroup_to_subscribe, | ||
session_id=session_id, | ||
reboot_flag=reboot_flag, | ||
endpoint=self.endpoint, | ||
protocol=TransportLayerProtocol.UDP, | ||
) | ||
|
||
# TODO: Subscription shall be only active when ACK is received | ||
self.expect_ack = True | ||
|
||
_logger.debug( | ||
f"Send subscribe for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}, evengroup ID: {self.eventgroup_to_subscribe} TTL: {self.ttl}, version: {self.major_version}.{self.minor_version}, session ID: {session_id}" | ||
) | ||
|
||
self.sd_sender.send_unicast( | ||
buffer=subscribe_sd_header.to_buffer(), | ||
dest_ip=offered_service.endpoint[0], | ||
) | ||
|
||
def subscribe_eventgroup_update(self, _, __) -> None: | ||
# Not needed for client instance | ||
pass | ||
|
||
def subscribe_ack_eventgroup_update( | ||
self, event_group_entry: SdEventGroupEntry | ||
) -> None: | ||
if self.expect_ack: | ||
self.expect_ack = False | ||
_logger.debug( | ||
f"Received subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" | ||
) | ||
|
||
|
||
async def construct_client_service_instance( | ||
service_id: int, | ||
instance_id: int, | ||
major_version: int, | ||
minor_version: int, | ||
endpoint: EndpointType, | ||
ttl: int = 0, | ||
sd_sender=None, | ||
) -> ClientServiceInstance: | ||
client_instance = ClientServiceInstance( | ||
service_id, instance_id, major_version, minor_version, endpoint, ttl, sd_sender | ||
) | ||
|
||
loop = asyncio.get_running_loop() | ||
rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) | ||
|
||
unicast_transport, _ = await loop.create_datagram_endpoint( | ||
lambda: DatagramAdapter(target=client_instance), sock=rcv_socket | ||
) | ||
client_instance.unicast_transport = unicast_transport | ||
|
||
return client_instance |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters