diff --git a/.gitignore b/.gitignore index 01aa025..8865224 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ .vscode/ dist/ someipy.egg-info/ -__pycache__ \ No newline at end of file +__pycache__ + +test_apps/build +test_apps/install \ No newline at end of file diff --git a/README.md b/README.md index 133fdc5..3638442 100644 --- a/README.md +++ b/README.md @@ -31,8 +31,7 @@ The library is still under development. The current major limitations and deviat ### SOME/IP - Events (and field notifiers) are supported. -- Methods are only supported on server side (offering methods). Calling methods (clients) are not supported yet. -- Only UDP services are supported. +- Methods are only supported on server side (offering methods). Calling methods (clients) is not supported yet. - Only unicast services are supported. - SOME/IP-TP is not supported. - IPv6 endpoints are not supported. diff --git a/example_apps/receive_events.py b/example_apps/receive_events_udp.py similarity index 78% rename from example_apps/receive_events.py rename to example_apps/receive_events_udp.py index d29678d..ee07097 100644 --- a/example_apps/receive_events.py +++ b/example_apps/receive_events_udp.py @@ -2,6 +2,7 @@ import ipaddress import logging +from someipy import ServiceBuilder, EventGroup, TransportLayerProtocol, SomeIpMessage from someipy.service_discovery import construct_service_discovery from someipy.client_service_instance import construct_client_service_instance from someipy.logging import set_someipy_log_level @@ -11,12 +12,15 @@ SD_PORT = 30490 INTERFACE_IP = "127.0.0.1" -SAMPLE_EVENTGROUP_ID = 20 +SAMPLE_SERVICE_ID = 0x1234 +SAMPLE_INSTANCE_ID = 0x5678 +SAMPLE_EVENTGROUP_ID = 0x0321 +SAMPLE_EVENT_ID = 0x0123 -def temperature_callback(payload: bytes) -> None: - print(f"Received {len(payload)} bytes.") - temperature_msg = TemparatureMsg().deserialize(payload) +def temperature_callback(someip_message: SomeIpMessage) -> None: + print(f"Received {len(someip_message.payload)} bytes.") + temperature_msg = TemparatureMsg().deserialize(someip_message.payload) print(temperature_msg) @@ -37,14 +41,24 @@ async def main(): # and port to which the events are sent to and the client will listen to # 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ClientServiceInstance can offer his service to # other ECUs + temperature_eventgroup = EventGroup( + id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID] + ) + temperature_service = ( + ServiceBuilder() + .with_service_id(SAMPLE_SERVICE_ID) + .with_major_version(1) + .with_eventgroup(temperature_eventgroup) + .build() + ) + service_instance_temperature = await construct_client_service_instance( - service_id=1, - instance_id=1000, - major_version=1, - minor_version=0, + service=temperature_service, + instance_id=SAMPLE_INSTANCE_ID, endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3002), ttl=5, sd_sender=service_discovery, + protocol=TransportLayerProtocol.UDP ) # It's possible to optionally register a callback function which will be called when an event from the diff --git a/example_apps/send_events.py b/example_apps/send_events_tcp.py similarity index 56% rename from example_apps/send_events.py rename to example_apps/send_events_tcp.py index 2d02c34..9a853e0 100644 --- a/example_apps/send_events.py +++ b/example_apps/send_events_tcp.py @@ -2,8 +2,8 @@ import ipaddress import logging +from someipy import TransportLayerProtocol, ServiceBuilder, EventGroup, construct_server_service_instance from someipy.service_discovery import construct_service_discovery -from someipy.server_service_instance import construct_server_service_instance from someipy.logging import set_someipy_log_level from someipy.serialization import Uint8, Uint64, Float32 from temperature_msg import TemparatureMsg @@ -12,67 +12,67 @@ SD_PORT = 30490 INTERFACE_IP = "127.0.0.1" -SAMPLE_EVENTGROUP_ID = 20 -SAMPLE_EVENT_ID = 32796 +SAMPLE_SERVICE_ID = 0x1234 +SAMPLE_INSTANCE_ID = 0x5678 +SAMPLE_EVENTGROUP_ID = 0x0321 +SAMPLE_EVENT_ID = 0x0123 -async def main(): +async def main(): # It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, .. set_someipy_log_level(logging.DEBUG) # Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function # use the construct_service_discovery function # The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set - service_discovery = await construct_service_discovery(SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP) - - # 1. For sending events use a ServerServiceInstance - # 2. Pass the service and instance ID, version and endpoint and TTL. The endpoint is needed again as the src-address - # and port of all sent events - # 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ServerServiceInstance can offer his service to - # other ECUs - # 4. cyclic_offer_delay_ms is the period of sending cyclic SD Offer service entries + service_discovery = await construct_service_discovery( + SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP + ) + + temperature_eventgroup = EventGroup( + id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID] + ) + temperature_service = ( + ServiceBuilder() + .with_service_id(SAMPLE_SERVICE_ID) + .with_major_version(1) + .with_eventgroup(temperature_eventgroup) + .build() + ) + + # For sending events use a ServerServiceInstance service_instance_temperature = await construct_server_service_instance( - service_id=1, - instance_id=1000, - major_version=1, - minor_version=0, - endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3000), # src IP and port of the service + temperature_service, + instance_id=SAMPLE_INSTANCE_ID, + endpoint=( + ipaddress.IPv4Address(INTERFACE_IP), + 3000, + ), # src IP and port of the service ttl=5, sd_sender=service_discovery, - cyclic_offer_delay_ms=2000 + cyclic_offer_delay_ms=2000, + protocol=TransportLayerProtocol.TCP ) # The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance # is notified by the ServiceDiscoveryProtocol about e.g. subscriptions from other ECUs service_discovery.attach(service_instance_temperature) - # For demonstration purposes we will construct a second ServerServiceInstance - service_instance_2 = await construct_server_service_instance( - service_id=2, - instance_id=2000, - major_version=1, - minor_version=0, - endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3001), # src IP and port of the service - ttl=5, - sd_sender=service_discovery, - cyclic_offer_delay_ms=2000 - ) - service_discovery.attach(service_instance_2) + # ..it's also possible to construct another ServerServiceInstance and attach it to service_discovery as well - # After constructing and attaching ServerServiceInstances to the ServiceDiscoveryProtocol objects the + # After constructing and attaching ServerServiceInstances to the ServiceDiscoveryProtocol object the # start_offer method has to be called. This will start an internal timer, which will periodically send # Offer service entries with a period of "cyclic_offer_delay_ms" which has been passed above - print("Start offering services..") + print("Start offering service..") service_instance_temperature.start_offer() - service_instance_2.start_offer() tmp_msg = TemparatureMsg() - # Reminder: Do NOT write "tmp_msg.version.major = 1". Always use the provided classes in someipy like Uint8, + # Reminder: Do NOT use "tmp_msg.version.major = 1". Always use the provided classes in someipy like Uint8, # so that the data can be propery serialized. Python literals won't be serialized properly tmp_msg.version.major = Uint8(1) tmp_msg.version.minor = Uint8(0) - + tmp_msg.measurements.data[0] = Float32(20.0) tmp_msg.measurements.data[1] = Float32(21.0) tmp_msg.measurements.data[2] = Float32(22.0) @@ -81,18 +81,19 @@ async def main(): try: # Either cyclically send events in an endless loop.. while True: - await asyncio.sleep(5) + await asyncio.sleep(1) tmp_msg.timestamp = Uint64(tmp_msg.timestamp.value + 1) payload = tmp_msg.serialize() - service_instance_temperature.send_event(SAMPLE_EVENTGROUP_ID, SAMPLE_EVENT_ID, payload) + service_instance_temperature.send_event( + SAMPLE_EVENTGROUP_ID, SAMPLE_EVENT_ID, payload + ) - # .. or in case your app is waiting for external events, using await asyncio.Future() to + # .. or in case your app is waiting for external events, use await asyncio.Future() to # keep the task alive # await asyncio.Future() - except asyncio.CancelledError as e: - print("Stop offering services..") + except asyncio.CancelledError: + print("Stop offering service..") await service_instance_temperature.stop_offer() - await service_instance_2.stop_offer() finally: print("Service Discovery close..") service_discovery.close() @@ -103,5 +104,5 @@ async def main(): if __name__ == "__main__": try: asyncio.run(main()) - except KeyboardInterrupt as e: + except KeyboardInterrupt: pass diff --git a/example_apps/send_events_udp.py b/example_apps/send_events_udp.py new file mode 100644 index 0000000..7e16c13 --- /dev/null +++ b/example_apps/send_events_udp.py @@ -0,0 +1,108 @@ +import asyncio +import ipaddress +import logging + +from someipy import TransportLayerProtocol, ServiceBuilder, EventGroup, construct_server_service_instance +from someipy.service_discovery import construct_service_discovery +from someipy.logging import set_someipy_log_level +from someipy.serialization import Uint8, Uint64, Float32 +from temperature_msg import TemparatureMsg + +SD_MULTICAST_GROUP = "224.224.224.245" +SD_PORT = 30490 +INTERFACE_IP = "127.0.0.1" + +SAMPLE_SERVICE_ID = 0x1234 +SAMPLE_INSTANCE_ID = 0x5678 +SAMPLE_EVENTGROUP_ID = 0x0321 +SAMPLE_EVENT_ID = 0x0123 + + +async def main(): + # It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, .. + set_someipy_log_level(logging.DEBUG) + + # Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function + # use the construct_service_discovery function + # The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set + service_discovery = await construct_service_discovery( + SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP + ) + + temperature_eventgroup = EventGroup( + id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID] + ) + temperature_service = ( + ServiceBuilder() + .with_service_id(SAMPLE_SERVICE_ID) + .with_major_version(1) + .with_eventgroup(temperature_eventgroup) + .build() + ) + + # For sending events use a ServerServiceInstance + service_instance_temperature = await construct_server_service_instance( + temperature_service, + instance_id=SAMPLE_INSTANCE_ID, + endpoint=( + ipaddress.IPv4Address(INTERFACE_IP), + 3000, + ), # src IP and port of the service + ttl=5, + sd_sender=service_discovery, + cyclic_offer_delay_ms=2000, + protocol=TransportLayerProtocol.UDP + ) + + # The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance + # is notified by the ServiceDiscoveryProtocol about e.g. subscriptions from other ECUs + service_discovery.attach(service_instance_temperature) + + # ..it's also possible to construct another ServerServiceInstance and attach it to service_discovery as well + + # After constructing and attaching ServerServiceInstances to the ServiceDiscoveryProtocol object the + # start_offer method has to be called. This will start an internal timer, which will periodically send + # Offer service entries with a period of "cyclic_offer_delay_ms" which has been passed above + print("Start offering service..") + service_instance_temperature.start_offer() + + tmp_msg = TemparatureMsg() + + # Reminder: Do NOT use "tmp_msg.version.major = 1". Always use the provided classes in someipy like Uint8, + # so that the data can be propery serialized. Python literals won't be serialized properly + tmp_msg.version.major = Uint8(1) + tmp_msg.version.minor = Uint8(0) + + tmp_msg.measurements.data[0] = Float32(20.0) + tmp_msg.measurements.data[1] = Float32(21.0) + tmp_msg.measurements.data[2] = Float32(22.0) + tmp_msg.measurements.data[3] = Float32(23.0) + + try: + # Either cyclically send events in an endless loop.. + while True: + await asyncio.sleep(1) + tmp_msg.timestamp = Uint64(tmp_msg.timestamp.value + 1) + payload = tmp_msg.serialize() + service_instance_temperature.send_event( + SAMPLE_EVENTGROUP_ID, SAMPLE_EVENT_ID, payload + ) + + # .. or in case your app is waiting for external events, use await asyncio.Future() to + # keep the task alive + # await asyncio.Future() + except asyncio.CancelledError: + print("Stop offering service..") + await service_instance_temperature.stop_offer() + finally: + print("Service Discovery close..") + service_discovery.close() + + print("End main task..") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass diff --git a/src/someipy/__init__.py b/src/someipy/__init__.py index e69de29..8a7b7f0 100644 --- a/src/someipy/__init__.py +++ b/src/someipy/__init__.py @@ -0,0 +1,4 @@ +from ._internal.someip_sd_header import TransportLayerProtocol +from .server_service_instance import ServerServiceInstance, construct_server_service_instance +from .service import Service, ServiceBuilder, EventGroup +from ._internal.someip_message import SomeIpMessage \ No newline at end of file diff --git a/src/someipy/_internal/service_discovery_abcs.py b/src/someipy/_internal/service_discovery_abcs.py index 23838f0..fde282f 100644 --- a/src/someipy/_internal/service_discovery_abcs.py +++ b/src/someipy/_internal/service_discovery_abcs.py @@ -1,5 +1,6 @@ +import ipaddress from abc import ABC, abstractmethod -from someipy._internal.someip_sd_header import * +from someipy._internal.someip_sd_header import SdService, SdEventGroupEntry, SdIPV4EndpointOption from someipy._internal.session_handler import SessionHandler diff --git a/src/someipy/_internal/someip_data_processor.py b/src/someipy/_internal/someip_data_processor.py new file mode 100644 index 0000000..f471f59 --- /dev/null +++ b/src/someipy/_internal/someip_data_processor.py @@ -0,0 +1,63 @@ +from someipy._internal.someip_header import SomeIpHeader +from someipy._internal.someip_message import SomeIpMessage + + +class SomeipDataProcessor: + + def __init__(self, datagram_mode=True): + self._buffer = bytes() + self._expected_bytes = 0 + self._datagram_mode = datagram_mode + self.someip_message = None + + def _reset(self): + self._buffer = bytes() + self._expected_bytes = 0 + + def process_data(self, new_data: bytes) -> bool: + + received_length = len(new_data) + + # UDP case + if self._datagram_mode: + header = SomeIpHeader.from_buffer(new_data) + expected_total_length = 8 + header.length + payload_length = expected_total_length - 16 + if received_length == expected_total_length: + self.someip_message = SomeIpMessage(header=header, payload=new_data[16:]) + return True + else: + # Malformed package -> return False + return False + + # From here on: TCP case + if self._expected_bytes == 0 and len(self._buffer) == 0: + + if received_length >= SomeIpHeader.MINIMAL_SIZE: + + header = SomeIpHeader.from_buffer(new_data) + expected_total_length = 8 + header.length + payload_length = expected_total_length - 16 + + # Case 1: Received exactly one SOME/IP message + if received_length == expected_total_length: + self.someip_message = SomeIpMessage(header=header, payload=new_data[16:(16+payload_length)]) + self._reset() + return True + # Case 2: Received less bytes than expected + elif received_length < expected_total_length: + self._expected_bytes = (expected_total_length - received_length) + self._buffer = new_data + return False + # Case 3: Received more bytes than expected + elif received_length > expected_total_length: + # Assume it is the beginning of a new SOME/IP message, store remaining bytes in buffer + end_payload = 16 + payload_length + self.someip_message = SomeIpMessage(header=header, payload=new_data[16:end_payload]) + self._buffer = new_data[end_payload:] + self._expected_bytes = 0 + + return True + + else: + pass # store in buffer \ No newline at end of file diff --git a/src/someipy/_internal/someip_endpoint.py b/src/someipy/_internal/someip_endpoint.py new file mode 100644 index 0000000..3c30075 --- /dev/null +++ b/src/someipy/_internal/someip_endpoint.py @@ -0,0 +1,94 @@ +import asyncio +from abc import ABC, abstractmethod +from typing import Callable, Tuple, Any +from someipy._internal.someip_message import SomeIpMessage +from someipy._internal.tcp_client_manager import ( + TcpClientManagerInterface, + TcpClientProtocolInterface, +) +from someipy._internal.utils import EndpointType +from someipy._internal.someip_data_processor import SomeipDataProcessor + + +class SomeipEndpoint(ABC): + @abstractmethod + def set_someip_callback( + self, callback_func: Callable[[SomeIpMessage, Tuple[str, int]], None] + ) -> None: + pass + + @abstractmethod + def sendto(self, data: bytes, addr: EndpointType) -> None: + pass + + @abstractmethod + def sendtoall(self, data: bytes) -> None: + pass + + @abstractmethod + def shutdown(self) -> None: + pass + + +class UDPSomeipEndpoint(SomeipEndpoint, asyncio.DatagramProtocol): + def __init__(self): + self._callback: Callable[[SomeIpMessage, Tuple[str, int]], None] = None + self._transport = None + self._processor = SomeipDataProcessor(datagram_mode=True) + + def set_someip_callback( + self, callback_func: Callable[[SomeIpMessage, Tuple[str, int]], None] + ) -> None: + self._callback = callback_func + + def connection_made(self, transport: asyncio.DatagramTransport) -> None: + self._transport = transport + + def connection_lost(self, exc: Exception | None) -> None: + pass + + def datagram_received(self, data: bytes, addr: Tuple[str | Any | int]) -> None: + result = self._processor.process_data(data) + if result and self._callback is not None: + self._callback(self._processor.someip_message, addr) + + def sendto(self, data: bytes, addr: EndpointType) -> None: + if self._transport is not None: + self._transport.sendto(data, addr) + + def sendtoall(self, data: bytes) -> None: + # TODO: Implement later for multicast support + raise NotImplementedError("No implementation for UDP yet") + + def shutdown(self) -> None: + if self._transport is not None: + self._transport.close() + + +class TCPSomeipEndpoint(SomeipEndpoint): + def __init__(self, server, manager): + self._server: asyncio.Server = server + self._manager: TcpClientManagerInterface = manager + + def set_someip_callback( + self, callback_func: Callable[[SomeIpMessage, Tuple[str, int]], None] + ) -> None: + self._manager.register_callback(callback_func) + + def sendto(self, data: bytes, addr: EndpointType) -> None: + if self._manager is None: + return + client: TcpClientProtocolInterface = self._manager.get_client(addr[0], addr[1]) + if client is not None: + client.write(data) + + def sendtoall(self, data: bytes) -> None: + if self._manager is None: + return + for c in self._manager.get_all_clients(): + c.write(data) + + def shutdown(self) -> None: + if self._manager is None: + return + self._server.close() diff --git a/src/someipy/_internal/someip_header.py b/src/someipy/_internal/someip_header.py index fa5f5a9..d8b6f55 100644 --- a/src/someipy/_internal/someip_header.py +++ b/src/someipy/_internal/someip_header.py @@ -21,6 +21,8 @@ class SomeIpHeader: message_type: int return_code: int + MINIMAL_SIZE = 16 + def is_sd_header(self) -> bool: return ( self.service_id == SERVICE_ID_SD diff --git a/src/someipy/_internal/someip_message.py b/src/someipy/_internal/someip_message.py new file mode 100644 index 0000000..7cfd443 --- /dev/null +++ b/src/someipy/_internal/someip_message.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass +from someipy._internal.someip_header import SomeIpHeader + +@dataclass +class SomeIpMessage: + header: SomeIpHeader + payload: bytes diff --git a/src/someipy/_internal/subscribers.py b/src/someipy/_internal/subscribers.py new file mode 100644 index 0000000..0e05085 --- /dev/null +++ b/src/someipy/_internal/subscribers.py @@ -0,0 +1,58 @@ +import time +from typing import List +from someipy._internal.utils import EndpointType + + +class EventGroupSubscriber: + eventgroup_id: int + endpoint: EndpointType + ttl: int + last_ts_ms: int + + def __init__(self, eventgroup_id: int, endpoint: EndpointType, ttl: int): + self.eventgroup_id = eventgroup_id + self.endpoint = endpoint + self.ttl = ttl + self.last_ts_ms = int(time.time() * 1000) + + def __eq__(self, __value: object) -> bool: + return ( + self.eventgroup_id == __value.eventgroup_id + and self.endpoint[0] == __value.endpoint[0] + and self.endpoint[1] == __value.endpoint[1] + ) + + +class Subscribers: + def __init__(self): + self._subscribers: List[EventGroupSubscriber] = [] + + def update(self): + # From SOME/IP specification: + # TTL shall be set to the lifetime of the subscription. + # If set to 0xFFFFFF, the Subscribe Eventgroup entry shall be considered valid + # until the next reboot. + time_now_ms = int(time.time() * 1000) + + self._subscribers = [ + s + for s in self._subscribers + if (s.ttl == 0xFFFFFF) or (time_now_ms < (s.last_ts_ms + (s.ttl * 1000.0))) + ] + + def add_subscriber(self, new_subscriber: EventGroupSubscriber) -> None: + time_now_ms = int(time.time() * 1000.0) + for s in self._subscribers: + if new_subscriber == s: + s.last_ts_ms = time_now_ms + return + new_subscriber.last_ts_ms = time_now_ms + self._subscribers.append(new_subscriber) + + def remove_subscriber(self, subscriber: EventGroupSubscriber) -> None: + if subscriber in self._subscribers: + self._subscribers.remove(subscriber) + + @property + def subscribers(self) -> List[EventGroupSubscriber]: + return self._subscribers diff --git a/src/someipy/_internal/tcp_client_manager.py b/src/someipy/_internal/tcp_client_manager.py new file mode 100644 index 0000000..5addbe6 --- /dev/null +++ b/src/someipy/_internal/tcp_client_manager.py @@ -0,0 +1,123 @@ +import asyncio +from abc import ABC, abstractmethod +from typing import Callable, Dict, Iterable, Tuple + +from someipy._internal.someip_message import SomeIpMessage +from someipy._internal.someip_data_processor import SomeipDataProcessor + +class TcpClientProtocolInterface(ABC): + @abstractmethod + def write(self, data: bytes) -> None: + pass + + @property + @abstractmethod + def ip_addr(self) -> str: + pass + + @property + @abstractmethod + def port(self) -> int: + pass + + +class TcpClientManagerInterface(ABC): + @abstractmethod + def register_client(self, client: TcpClientProtocolInterface) -> None: + pass + + @abstractmethod + def unregister_client(self, client: TcpClientProtocolInterface) -> None: + pass + + @abstractmethod + def get_client(self, ip_addr: str, port: int) -> TcpClientProtocolInterface: + pass + + @abstractmethod + def get_all_clients(self) -> Iterable[TcpClientProtocolInterface]: + pass + + @abstractmethod + def someip_callback(self, client: TcpClientProtocolInterface, someip_message: SomeIpMessage) -> None: + pass + + @abstractmethod + def register_callback(self, callback: Callable[[SomeIpMessage, Tuple[str, int]], None]) -> None: + pass + + +class TcpClientManager(TcpClientManagerInterface): + + def __init__(self): + self._clients: Dict[str, TcpClientProtocolInterface] = {} + self._someip_callback: Callable[[SomeIpMessage, Tuple[str, int]], None] = None + + def _build_key(self, ip_addr: str, port: int) -> str: + return f"{ip_addr}-{port}" + + def register_client(self, client: TcpClientProtocolInterface) -> None: + print(f"Register new client {client.ip_addr}, {client.port}") + self._clients[self._build_key(client.ip_addr, client.port)] = client + + def unregister_client(self, client: TcpClientProtocolInterface) -> None: + print(f"Unregister client {client.ip_addr}, {client.port}") + if self._build_key(client.ip_addr, client.port) in self._clients.keys(): + del self._clients[self._build_key(client.ip_addr, client.port)] + + def get_client(self, ip_addr: str, port: int) -> TcpClientProtocolInterface: + if self._build_key(ip_addr, port) in self._clients.keys(): + return self._clients[self._build_key(ip_addr, port)] + else: + return None + + def get_all_clients(self) -> Iterable[TcpClientProtocolInterface]: + return self._clients.values() + + def someip_callback(self, client: TcpClientProtocolInterface, someip_message: SomeIpMessage) -> None: + if self._someip_callback is not None: + self._someip_callback(someip_message, (client.ip_addr, client.port)) + + def register_callback(self, callback: Callable[[SomeIpMessage, Tuple[str, int]], None]) -> None: + self._someip_callback = callback + +class TcpClientProtocol(asyncio.Protocol, TcpClientProtocolInterface): + + def __init__(self, client_manager: TcpClientManager): + self._client_manager: TcpClientManager = client_manager + self._transport = None + self._ip_addr_client = None + self._port_client = None + self._data_processor = SomeipDataProcessor(datagram_mode=False) + + def connection_made(self, transport: asyncio.BaseTransport): + peername: Tuple = transport.get_extra_info('peername') + print('Connection from {}'.format(peername)) + self._transport = transport + self._ip_addr_client = peername[0] + self._port_client = peername[1] + + self._client_manager.register_client(self) + + def data_received(self, data: bytes): + print('Data received {}: {}'.format(self.ip_addr, data)) + + # Push data to processor + result = self._data_processor.process_data(data) + if result and self._client_manager is not None: + self._client_manager.someip_callback(self, self._data_processor.someip_message) + + def connection_lost(self, _) -> None: + self._client_manager.unregister_client(self) + + def write(self, data: bytes) -> None: + if self._transport is not None: + self._transport.write(data) + + @property + def ip_addr(self) -> str: + return self._ip_addr_client + + @property + def port(self) -> int: + return self._port_client diff --git a/src/someipy/client_service_instance.py b/src/someipy/client_service_instance.py index d9a252f..83357f7 100644 --- a/src/someipy/client_service_instance.py +++ b/src/someipy/client_service_instance.py @@ -1,82 +1,92 @@ import asyncio -from typing import Tuple, Union, Any, Callable +from typing import Tuple, Callable, Set, List +from someipy import Service from someipy._internal.someip_sd_header import ( SdService, TransportLayerProtocol, SdEventGroupEntry, ) from someipy._internal.someip_sd_builder import build_subscribe_eventgroup_entry -from someipy._internal.someip_header import ( - get_payload_from_someip_message, - SomeIpHeader, -) from someipy._internal.service_discovery_abcs import ( ServiceDiscoveryObserver, ServiceDiscoverySender, ) -from someipy._internal.utils import create_udp_socket, EndpointType, DatagramAdapter +from someipy._internal.tcp_client_manager import TcpClientManager, TcpClientProtocol +from someipy._internal.utils import create_udp_socket, EndpointType from someipy._internal.logging import get_logger - +from someipy._internal.message_types import MessageType +from someipy._internal.someip_endpoint import ( + SomeipEndpoint, + TCPSomeipEndpoint, + UDPSomeipEndpoint, + SomeIpMessage, +) _logger = get_logger("client_service_instance") -class ClientServiceInstance(ServiceDiscoveryObserver): - service_id: int - instance_id: int - major_version: int - minor_version: int - endpoint: EndpointType - ttl: int - sd_sender: ServiceDiscoverySender +class ExpectedAck: + def __init__(self, eventgroup_id: int) -> None: + self.eventgroup_id = eventgroup_id - eventgroup_to_subscribe: int - expect_ack: bool - callback: Callable[[bytes], None] +class ClientServiceInstance(ServiceDiscoveryObserver): + _service: Service + _instance_id: int + _endpoint: EndpointType + _protocol: TransportLayerProtocol + _someip_endpoint: SomeipEndpoint + _ttl: int + _sd_sender: ServiceDiscoverySender + + _eventgroups_to_subscribe: Set[int] + _expected_acks: List[ExpectedAck] + _callback: Callable[[bytes], None] def __init__( self, - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, + protocol: TransportLayerProtocol, + someip_endpoint: SomeipEndpoint, ttl: int = 0, sd_sender=None, ): - self.service_id = service_id - self.instance_id = instance_id - self.major_version = major_version - self.minor_version = minor_version - self.endpoint = endpoint - self.ttl = ttl - self.sd_sender = sd_sender - - self.eventgroup_to_subscribe = -1 - self.expect_ack = False - - self.unicast_transport = None - - self.callback = None - - def register_callback(self, callback: Callable[[bytes], None]) -> None: - self.callback = callback - - def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> None: - # TODO: Test if there is a subscription active for the received data - if self.callback is not None: - header = SomeIpHeader.from_buffer(data) - payload = get_payload_from_someip_message(header, data) - self.callback(payload) - - def connection_lost(self, exc: Exception) -> None: - pass + self._service = service + self._instance_id = instance_id + self._endpoint = endpoint + self._protocol = protocol + self._someip_endpoint = someip_endpoint + self._ttl = ttl + self._sd_sender = sd_sender + + self._eventgroups_to_subscribe = set() + self._expected_acks = [] + self._callback = None + + def register_callback(self, callback: Callable[[SomeIpMessage], None]) -> None: + self._callback = callback + + def someip_message_received( + self, someip_message: SomeIpMessage, addr: Tuple[str, int] + ) -> None: + print(someip_message.header) + if ( + someip_message.header.client_id == 0x00 + and someip_message.header.message_type == MessageType.NOTIFICATION.value + and someip_message.header.return_code == 0x00 + ): + if self._callback is not None: + self._callback(someip_message) def subscribe_eventgroup(self, eventgroup_id: int): - # TODO: Currently only one eventgroup per service is supported - self.eventgroup_to_subscribe = eventgroup_id + if eventgroup_id in self._eventgroups_to_subscribe: + _logger.debug( + f"Eventgroup ID {eventgroup_id} is already in subscription list." + ) + self._eventgroups_to_subscribe.add(eventgroup_id) def stop_subscribe_eventgroup(self, eventgroup_id: int): # TODO: Implement StopSubscribe @@ -87,39 +97,48 @@ def find_service_update(self): pass def offer_service_update(self, offered_service: SdService): - if ( - self.eventgroup_to_subscribe != -1 - and offered_service.service_id == self.service_id - and offered_service.instance_id == self.instance_id - ): - ( - session_id, - reboot_flag, - ) = self.sd_sender.get_unicast_session_handler().update_session() - - subscribe_sd_header = build_subscribe_eventgroup_entry( - service_id=self.service_id, - instance_id=self.instance_id, - major_version=self.major_version, - ttl=self.ttl, - event_group_id=self.eventgroup_to_subscribe, - session_id=session_id, - reboot_flag=reboot_flag, - endpoint=self.endpoint, - protocol=TransportLayerProtocol.UDP, - ) - - # TODO: Subscription shall be only active when ACK is received - self.expect_ack = True + if len(self._eventgroups_to_subscribe) == 0: + return - _logger.debug( - f"Send subscribe for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}, evengroup ID: {self.eventgroup_to_subscribe} TTL: {self.ttl}, version: {self.major_version}.{self.minor_version}, session ID: {session_id}" - ) + if self._service.id != offered_service.service_id: + return + if self._instance_id != offered_service.instance_id: + return - self.sd_sender.send_unicast( - buffer=subscribe_sd_header.to_buffer(), - dest_ip=offered_service.endpoint[0], - ) + if ( + offered_service.service_id == self._service.id + and offered_service.instance_id == self._instance_id + ): + for eventgroup_to_subscribe in self._eventgroups_to_subscribe: + ( + session_id, + reboot_flag, + ) = self._sd_sender.get_unicast_session_handler().update_session() + + # Improvement: Pack all entries into a single SD message + subscribe_sd_header = build_subscribe_eventgroup_entry( + service_id=self._service.id, + instance_id=self._instance_id, + major_version=self._service.major_version, + ttl=self._ttl, + event_group_id=eventgroup_to_subscribe, + session_id=session_id, + reboot_flag=reboot_flag, + endpoint=self._endpoint, + protocol=TransportLayerProtocol.UDP, + ) + + _logger.debug( + f"Send subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, " + f"eventgroup ID: {eventgroup_to_subscribe} TTL: {self._ttl}, version: " + f"session ID: {session_id}" + ) + + self._expected_acks.append(ExpectedAck(eventgroup_to_subscribe)) + self._sd_sender.send_unicast( + buffer=subscribe_sd_header.to_buffer(), + dest_ip=offered_service.endpoint[0], + ) def subscribe_eventgroup_update(self, _, __) -> None: # Not needed for client instance @@ -128,32 +147,76 @@ def subscribe_eventgroup_update(self, _, __) -> None: def subscribe_ack_eventgroup_update( self, event_group_entry: SdEventGroupEntry ) -> None: - if self.expect_ack: - self.expect_ack = False - _logger.debug( - f"Received subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" + new_acks: List[ExpectedAck] = [] + ack_found = False + for expected_ack in self._expected_acks: + if expected_ack.eventgroup_id == event_group_entry.eventgroup_id: + ack_found = True + _logger.debug( + f"Received expected subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" + ) + else: + new_acks.append(expected_ack) + + self._expected_acks = new_acks + if not ack_found: + _logger.warn( + f"Received unexpected subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" ) async def construct_client_service_instance( - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, ttl: int = 0, sd_sender=None, + protocol=TransportLayerProtocol.UDP, ) -> ClientServiceInstance: - client_instance = ClientServiceInstance( - service_id, instance_id, major_version, minor_version, endpoint, ttl, sd_sender - ) - - loop = asyncio.get_running_loop() - rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) - - unicast_transport, _ = await loop.create_datagram_endpoint( - lambda: DatagramAdapter(target=client_instance), sock=rcv_socket - ) - client_instance.unicast_transport = unicast_transport + if protocol == TransportLayerProtocol.UDP: + loop = asyncio.get_running_loop() + rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) + + _, udp_endpoint = await loop.create_datagram_endpoint( + lambda: UDPSomeipEndpoint(), sock=rcv_socket + ) + + client_instance = ClientServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.UDP, + udp_endpoint, + ttl, + sd_sender, + ) + + udp_endpoint.set_someip_callback(client_instance.someip_message_received) + + return client_instance + + elif protocol == TransportLayerProtocol.TCP: + tcp_client_manager = TcpClientManager() + loop = asyncio.get_running_loop() + server = await loop.create_server( + lambda: TcpClientProtocol(client_manager=tcp_client_manager), + str(endpoint[0]), + endpoint[1], + ) + + tcp_someip_endpoint = TCPSomeipEndpoint(server, tcp_client_manager) + + server_instance = ClientServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.TCP, + tcp_someip_endpoint, + ttl, + sd_sender, + ) + return server_instance + + client_instance = ClientServiceInstance(service, instance_id, ttl, sd_sender) return client_instance diff --git a/src/someipy/logging.py b/src/someipy/logging.py index df92067..0d76925 100644 --- a/src/someipy/logging.py +++ b/src/someipy/logging.py @@ -1,6 +1,6 @@ import logging -_log_level = logging.INFO +_log_level = logging.DEBUG def set_someipy_log_level(log_level: int): diff --git a/src/someipy/server_service_instance.py b/src/someipy/server_service_instance.py index 35f67c6..7f7884c 100644 --- a/src/someipy/server_service_instance.py +++ b/src/someipy/server_service_instance.py @@ -1,14 +1,19 @@ import asyncio -from dataclasses import dataclass -from typing import List, Callable, Tuple, Union, Any, Dict +from typing import Tuple +from someipy.service import Service + +from someipy._internal.tcp_client_manager import TcpClientManager, TcpClientProtocol from someipy._internal.message_types import MessageType, ReturnCode from someipy._internal.someip_sd_builder import ( build_subscribe_eventgroup_ack_entry, build_offer_service_sd_header, build_subscribe_eventgroup_ack_sd_header, ) -from someipy._internal.someip_header import SomeIpHeader, get_payload_from_someip_message +from someipy._internal.someip_header import ( + SomeIpHeader, + get_payload_from_someip_message, +) from someipy._internal.someip_sd_header import ( SdService, TransportLayerProtocol, @@ -19,137 +24,140 @@ ServiceDiscoveryObserver, ServiceDiscoverySender, ) - from someipy._internal.simple_timer import SimplePeriodicTimer from someipy._internal.utils import ( - DatagramAdapter, create_udp_socket, - endpoint_to_str_int_tuple, EndpointType, + endpoint_to_str_int_tuple, ) from someipy._internal.logging import get_logger +from someipy._internal.subscribers import Subscribers, EventGroupSubscriber +from someipy._internal.someip_endpoint import ( + SomeipEndpoint, + TCPSomeipEndpoint, + UDPSomeipEndpoint, +) _logger = get_logger("server_service_instance") -@dataclass -class EventGroup: - eventgroup_id: int - ttl: int - event_ids: List[int] - - -@dataclass -class EventGroupSubscriber: - eventgroup_id: int - endpoint: EndpointType - -@dataclass -class Method: - method_id: int - method_handler: Callable[[bytes], Tuple[bool, bytes]] - - def __eq__(self, __value: object) -> bool: - return self.method_id == __value.method_id - - class ServerServiceInstance(ServiceDiscoveryObserver): - service_id: int - instance_id: int - major_version: int - minor_version: int - endpoint: EndpointType - ttl: int - - sd_sender: ServiceDiscoverySender - cyclic_offer_delay_ms: int - - eventgroups: List[EventGroup] - subscribers: List[EventGroupSubscriber] - methods: Dict[int, Method] - offer_timer: SimplePeriodicTimer + _service: Service + _instance_id: int + _endpoint: EndpointType + _protocol: TransportLayerProtocol + _someip_endpoint: SomeipEndpoint + _ttl: int + _sd_sender: ServiceDiscoverySender + _cyclic_offer_delay_ms: int + + _subscribers: Subscribers + _offer_timer: SimplePeriodicTimer def __init__( self, - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, - ttl: int = 0, + protocol: TransportLayerProtocol, + someip_endpoint: SomeipEndpoint, + ttl: int = 0, # TTL used for SD Offer entries sd_sender=None, cyclic_offer_delay_ms=2000, ): - self.service_id = service_id - self.instance_id = instance_id - self.major_version = major_version - self.minor_version = minor_version - self.endpoint = endpoint - self.ttl = ttl - self.sd_sender = sd_sender - self.cyclic_offer_delay_ms = cyclic_offer_delay_ms + self._service = service + self._instance_id = instance_id + self._endpoint = endpoint + self._protocol = protocol + self._someip_endpoint = someip_endpoint + self._ttl = ttl + self._sd_sender = sd_sender + self._cyclic_offer_delay_ms = cyclic_offer_delay_ms + + self._subscribers = Subscribers() + self._offer_timer = None - self.sender_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) - - self.eventgroups = [] - self.subscribers = [] - self.methods = dict() - - self.offer_timer = None + def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None: + self._subscribers.update() - def add_eventgroup(self, eventgroup: EventGroup): - ids = [e.eventgroup_id for e in self.eventgroups] - if eventgroup.eventgroup_id not in ids: - self.eventgroups.append(eventgroup) + length = 8 + len(payload) + someip_header = SomeIpHeader( + service_id=self._service.id, + method_id=event_id, + length=length, + client_id=0x00, # TODO + session_id=0x01, # TODO + protocol_version=1, # TODO + interface_version=1, # TODO + message_type=MessageType.NOTIFICATION.value, + return_code=0x00, + ) - def add_method(self, method: Method): - if self.methods.get(method.method_id) is None: - self.methods[method.method_id] = method + for sub in self._subscribers.subscribers: + if sub.eventgroup_id == event_group_id: + _logger.debug( + f"Send event for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X} to {sub.endpoint[0]}:{sub.endpoint[1]}" + ) + self._someip_endpoint.sendto( + someip_header.to_buffer() + payload, + endpoint_to_str_int_tuple(sub.endpoint), + ) - def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> None: + def someip_message_received(self, data: bytes, addr: Tuple[str, int]) -> None: header = SomeIpHeader.from_buffer(data) payload_to_return = bytes() header_to_return = header def send_response(): - self.sender_socket.sendto( - header_to_return.to_buffer() + payload_to_return, - addr) + self.unicast_transport.sendto( + header_to_return.to_buffer() + payload_to_return, addr + ) if header.service_id != self.service_id: - _logger.warn(f"Unknown service ID received from {addr}: ID 0x{header.service_id:04X}") + _logger.warn( + f"Unknown service ID received from {addr}: ID 0x{header.service_id:04X}" + ) header_to_return.message_type = MessageType.RESPONSE.value header_to_return.return_code = ReturnCode.E_UNKNOWN_SERVICE.value send_response() return - + if header.method_id not in self.methods.keys(): - _logger.warn(f"Unknown method ID received from {addr}: ID 0x{header.method_id:04X}") + _logger.warn( + f"Unknown method ID received from {addr}: ID 0x{header.method_id:04X}" + ) header_to_return.message_type = MessageType.RESPONSE.value header_to_return.return_code = ReturnCode.E_UNKNOWN_METHOD.value send_response() return - - # TODO: Test for protocol and interface version - if header.message_type == MessageType.REQUEST.value and header.return_code == 0x00: + # TODO: Test for protocol and interface version + if ( + header.message_type == MessageType.REQUEST.value + and header.return_code == 0x00 + ): payload_in = get_payload_from_someip_message(header, data) - (success, payload_to_return) = self.methods[header.method_id].method_handler(payload_in) + (success, payload_to_return) = self.methods[ + header.method_id + ].method_handler(payload_in) if not success: - _logger.debug(f"Return ERROR message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}") + _logger.debug( + f"Return ERROR message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self._instance_id:04X}" + ) header_to_return.message_type = MessageType.ERROR.value else: - _logger.debug(f"Return RESPONSE message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}") + _logger.debug( + f"Return RESPONSE message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self._instance_id:04X}" + ) header_to_return.message_type = MessageType.RESPONSE.value send_response() else: - _logger.warn(f"Unknown message type received from {addr}: Type 0x{header.message_type:04X}") - - def connection_lost(self, exc: Exception) -> None: - pass + _logger.warn( + f"Unknown message type received from {addr}: Type 0x{header.message_type:04X}" + ) def find_service_update(self): # TODO: implement SD behaviour and send back offer @@ -159,78 +167,64 @@ def offer_service_update(self, _: SdService): # No reaction in a server instance needed pass - def _insert_subscriber(self, new_subscriber: EventGroupSubscriber) -> None: - for s in self.subscribers: - if new_subscriber == s: - return - self.subscribers.append(new_subscriber) - - def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None: - length = 8 + len(payload) - someip_header = SomeIpHeader( - service_id=self.service_id, - method_id=event_id, - length=length, - client_id=0x00, # TODO - session_id=0x01, # TODO - protocol_version=1, # TODO, - interface_version=1, # TODO, - message_type=MessageType.NOTIFICATION.value, - return_code=0x00, - ) - - for sub in self.subscribers: - if sub.eventgroup_id == event_group_id: - self.sender_socket.sendto( - someip_header.to_buffer() + payload, - endpoint_to_str_int_tuple(sub.endpoint), - ) - def subscribe_eventgroup_update( self, sd_event_group: SdEventGroupEntry, ipv4_endpoint_option: SdIPV4EndpointOption, ) -> None: - # eventgroup_ids = [e.eventgroup_id for e in self.eventgroups] - # [PRS_SOMEIPSD_00829] When receiving a SubscribeEventgroupAck or Sub- # scribeEventgroupNack the Service ID, Instance ID, Eventgroup ID, and Major Ver- # sion shall match exactly to the corresponding SubscribeEventgroup Entry to identify - # an Eventgroup of a Service Instance.c + # an Eventgroup of a Service Instance. + # TODO: enable major version check + if sd_event_group.sd_entry.service_id != self._service.id: + return + if sd_event_group.sd_entry.instance_id != self._instance_id: + return + if sd_event_group.eventgroup_id not in self._service.eventgroupids: + return - if ( - sd_event_group.sd_entry.service_id == self.service_id - and sd_event_group.sd_entry.instance_id == self.instance_id - # and sd_event_group.eventgroup_id in eventgroup_ids - ): - ( - session_id, - reboot_flag, - ) = self.sd_sender.get_unicast_session_handler().update_session() - # TODO: enable major version check - # sd_event_group.sd_entry.major_version == self.major_version: - # print("Service received subscribe entry -> send back ACK") - ack_entry = build_subscribe_eventgroup_ack_entry( - service_id=sd_event_group.sd_entry.service_id, - instance_id=sd_event_group.sd_entry.instance_id, - major_version=sd_event_group.sd_entry.major_version, - ttl=sd_event_group.sd_entry.ttl, - event_group_id=sd_event_group.eventgroup_id, - ) - header_output = build_subscribe_eventgroup_ack_sd_header( - entry=ack_entry, session_id=session_id, reboot_flag=reboot_flag + if ipv4_endpoint_option.protocol != self._protocol: + _logger.warn( + f"Subscribing a different protocol (TCP/UDP) than offered is not supported. Received subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X} " + "from {ipv4_endpoint_option.ipv4_address}/{ipv4_endpoint_option.port} with wrong protocol" ) + return - self.sd_sender.send_unicast( - buffer=header_output.to_buffer(), - dest_ip=ipv4_endpoint_option.ipv4_address, - ) + ( + session_id, + reboot_flag, + ) = self._sd_sender.get_unicast_session_handler().update_session() + + _logger.debug( + f"Send Subscribe ACK for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, TTL: {sd_event_group.sd_entry.ttl}" + ) + ack_entry = build_subscribe_eventgroup_ack_entry( + service_id=sd_event_group.sd_entry.service_id, + instance_id=sd_event_group.sd_entry.instance_id, + major_version=sd_event_group.sd_entry.major_version, + ttl=sd_event_group.sd_entry.ttl, + event_group_id=sd_event_group.eventgroup_id, + ) + header_output = build_subscribe_eventgroup_ack_sd_header( + entry=ack_entry, session_id=session_id, reboot_flag=reboot_flag + ) + + self._sd_sender.send_unicast( + buffer=header_output.to_buffer(), + dest_ip=ipv4_endpoint_option.ipv4_address, + ) - subscriber = EventGroupSubscriber( + self._subscribers.add_subscriber( + EventGroupSubscriber( eventgroup_id=sd_event_group.eventgroup_id, - endpoint=(ipv4_endpoint_option.ipv4_address, ipv4_endpoint_option.port), + endpoint=( + ipv4_endpoint_option.ipv4_address, + ipv4_endpoint_option.port, + ), + ttl=sd_event_group.sd_entry.ttl, ) - self._insert_subscriber(subscriber) + ) def subscribe_ack_eventgroup_update(self, _: SdEventGroupEntry) -> None: # Not needed for server instance @@ -240,63 +234,91 @@ def offer_timer_callback(self): ( session_id, reboot_flag, - ) = self.sd_sender.get_multicast_session_handler().update_session() + ) = self._sd_sender.get_multicast_session_handler().update_session() _logger.debug( - f"Offer service for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}, TTL: {self.ttl}, version: {self.major_version}.{self.minor_version}, session ID: {session_id}" + f"Offer service for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, TTL: {self._ttl}, version: {self._service.major_version}.{self._service.minor_version}, session ID: {session_id}" ) service_to_offer = SdService( - service_id=self.service_id, - instance_id=self.instance_id, + service_id=self._service.id, + instance_id=self._instance_id, major_version=1, minor_version=0, - ttl=self.ttl, - endpoint=self.endpoint, - protocol=TransportLayerProtocol.UDP, + ttl=self._ttl, + endpoint=self._endpoint, + protocol=self._protocol, ) sd_header = build_offer_service_sd_header( service_to_offer, session_id, reboot_flag ) - self.sd_sender.send_multicast(sd_header.to_buffer()) + self._sd_sender.send_multicast(sd_header.to_buffer()) def start_offer(self): - self.offer_timer = SimplePeriodicTimer( - self.cyclic_offer_delay_ms / 1000.0, self.offer_timer_callback + self._offer_timer = SimplePeriodicTimer( + self._cyclic_offer_delay_ms / 1000.0, self.offer_timer_callback ) - self.offer_timer.start() + self._offer_timer.start() async def stop_offer(self): _logger.debug( - f"Stop offer for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}" + f"Stop offer for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}" ) - if self.offer_timer is not None: - self.offer_timer.stop() - await self.offer_timer.task + if self._offer_timer is not None: + self._offer_timer.stop() + await self._offer_timer.task # TODO: send out a stop offer sd message before stopping the timer + async def construct_server_service_instance( - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, ttl: int = 0, sd_sender=None, - cyclic_offer_delay_ms=2000 + cyclic_offer_delay_ms=2000, + protocol=TransportLayerProtocol.UDP, ) -> ServerServiceInstance: - - server_instance = ServerServiceInstance( - service_id, instance_id, major_version, minor_version, endpoint, ttl, sd_sender, cyclic_offer_delay_ms - ) + if protocol == TransportLayerProtocol.UDP: + loop = asyncio.get_running_loop() + rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) - loop = asyncio.get_running_loop() - rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) + _, udp_endpoint = await loop.create_datagram_endpoint( + lambda: UDPSomeipEndpoint(), sock=rcv_socket + ) - unicast_transport, _ = await loop.create_datagram_endpoint( - lambda: DatagramAdapter(target=server_instance), sock=rcv_socket - ) - server_instance.unicast_transport = unicast_transport + server_instance = ServerServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.UDP, + udp_endpoint, + ttl, + sd_sender, + cyclic_offer_delay_ms, + ) + return server_instance + + elif protocol == TransportLayerProtocol.TCP: + tcp_client_manager = TcpClientManager() + loop = asyncio.get_running_loop() + server = await loop.create_server( + lambda: TcpClientProtocol(client_manager=tcp_client_manager), + str(endpoint[0]), + endpoint[1], + ) - return server_instance + tcp_someip_endpoint = TCPSomeipEndpoint(server, tcp_client_manager) + + server_instance = ServerServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.TCP, + tcp_someip_endpoint, + ttl, + sd_sender, + cyclic_offer_delay_ms, + ) + return server_instance diff --git a/src/someipy/service.py b/src/someipy/service.py new file mode 100644 index 0000000..bbde6e4 --- /dev/null +++ b/src/someipy/service.py @@ -0,0 +1,72 @@ +from dataclasses import dataclass +from typing import List, Callable, Tuple, Dict + + +@dataclass +class Method: + id: int + method_handler: Callable[[bytes], Tuple[bool, bytes]] + + def __eq__(self, __value: object) -> bool: + return self.method_id == __value.method_id + + +@dataclass +class EventGroup: + id: int + event_ids: List[int] + + +@dataclass +class Service: + id: int + major_version: int + minor_version: int + + methods: Dict[int, Method] + eventgroups: Dict[int, EventGroup] + + def __init__(self): + self.id = 0 + self.major_version = 0 + self.minor_version = 0 + self.methods = dict() + self.eventgroups = dict() + + @property + def eventgroupids(self) -> List[int]: + return self.eventgroups.keys() + + @property + def methodids(self) -> List[int]: + return self.methods.keys() + + +class ServiceBuilder: + def __init__(self): + self._service: Service = Service() + + def with_service_id(self, id: int): + self._service.id = id + return self + + def with_major_version(self, major_version: int): + self._service.major_version = major_version + return self + + def with_minor_version(self, minor_version: int): + self._service.minor_version = minor_version + return self + + def with_method(self, method: Method): + if self._service.methods.get(method.id) is None: + self._service.methods[method.id] = method + return self + + def with_eventgroup(self, eventgroup: EventGroup): + if self._service.eventgroups.get(eventgroup.id) is None: + self._service.eventgroups[eventgroup.id] = eventgroup + return self + + def build(self): + return self._service diff --git a/src/someipy/service_discovery.py b/src/someipy/service_discovery.py index e18a8ca..34b87d4 100644 --- a/src/someipy/service_discovery.py +++ b/src/someipy/service_discovery.py @@ -3,15 +3,23 @@ from typing import Any, Union, Tuple, List from someipy._internal.someip_header import SomeIpHeader -from someipy._internal.someip_sd_header import * -from someipy._internal.someip_sd_extractors import * +from someipy._internal.someip_sd_header import SomeIpSdHeader +from someipy._internal.someip_sd_extractors import ( + extract_offered_services, + extract_subscribe_eventgroup_entries, + extract_subscribe_ack_eventgroup_entries, +) from someipy._internal.session_handler import SessionHandler from someipy._internal.utils import ( create_rcv_multicast_socket, create_udp_socket, DatagramAdapter, ) -from someipy._internal.service_discovery_abcs import * +from someipy._internal.service_discovery_abcs import ( + ServiceDiscoveryObserver, + ServiceDiscoverySender, + ServiceDiscoverySubject, +) from someipy._internal.logging import get_logger _logger = get_logger("service_discovery") @@ -85,11 +93,13 @@ def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> N for o in self.attached_observers: o.subscribe_eventgroup_update(event_group_entry, ipv4_endpoint_option) - for event_group_entry in extract_subscribe_ack_eventgroup_entries(someip_sd_header): + for event_group_entry in extract_subscribe_ack_eventgroup_entries( + someip_sd_header + ): _logger.debug( f"Received subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" ) - #for o in self.attached_observers: + # for o in self.attached_observers: # o.sub def connection_lost(self, exc: Exception) -> None: diff --git a/test_apps/CMakeLists.txt b/test_apps/CMakeLists.txt new file mode 100644 index 0000000..1eac756 --- /dev/null +++ b/test_apps/CMakeLists.txt @@ -0,0 +1,26 @@ +cmake_minimum_required (VERSION 3.13) + +project(someipy_test_apps) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") + +find_package(vsomeip3 CONFIG REQUIRED) + +set(CMAKE_INSTALL_PREFIX "${CMAKE_SOURCE_DIR}/install") + +function(create_test_target target_name) + + add_executable(${target_name} ${target_name}/${target_name}.cpp) + target_link_libraries(${target_name} ${VSOMEIP_LIBRARIES} pthread) + + install(TARGETS ${target_name} + RUNTIME DESTINATION ${target_name}) + + file(GLOB ADDITIONAL_FILES "${CMAKE_SOURCE_DIR}/${target_name}/*.json" "${CMAKE_SOURCE_DIR}/${target_name}/*.sh") + install(FILES ${ADDITIONAL_FILES} DESTINATION ${target_name}) + +endfunction() + +# The same app can be used for UDP and TCP for the "send_events" example app +create_test_target("send_events") + +create_test_target("receive_events_udp") diff --git a/test_apps/README.md b/test_apps/README.md new file mode 100644 index 0000000..832b7bf --- /dev/null +++ b/test_apps/README.md @@ -0,0 +1,88 @@ +## Build Test Applications using CMake + +```bash +rm -rf build install && mkdir -p build && cd build && cmake .. && make && make install && cd .. +``` + +## Setup Python Pip Package from Source + +```bash +python3.12 -m pip install -e . +``` + +## Test Procedure + +### Network Setup Linux + +```bash +sudo ip addr add 127.0.0.2/8 dev lo +sudo ip addr add 224.224.224.245 dev lo autojoin +``` + +### 1. send_events_udp + +Open two terminals (one for Python someipy example app, one for test app). + +Start apps: + +```bash +python3.12 send_events_udp.py +bash ./install/send_events/start_app.sh +``` + +Expected in test app: +- Expected log for UDP endpoint on port 3000: + - *endpoint_manager_impl::create_remote_client: 127.0.0.1:3000 reliable: 0 using local port: 0* + - *udp_client_endpoint_impl::connect: SO_RCVBUF is: 212992 (1703936) local port:0 remote:127.0.0.1:3000* +- Receive event with frequency 1Hz +- Service ID: 0x1234 +- Instance ID: 0x5678 +- Eventgroup ID: 0x0321 +- Event ID: 0x0123 +- SD Offer is sent with 0.5Hz + + +### 1. send_events_tcp + +Open two terminals (one for Python someipy example app, one for test app). + +Start apps: + +```bash +python3.12 send_events_tcp.py +bash ./install/send_events/start_app.sh +``` + +Expected in test app: +- Expected log for TCP endpoint on port 3000: + - *endpoint_manager_impl::create_remote_client: 127.0.0.1:3000 reliable: 1 using local port: 0* + - Check for "reliable == 1" in above log (i.e. TCP) + - No "udp_client_endpoint_impl::connect" shall appear +- Receive event with frequency 1Hz +- Service ID: 0x1234 +- Instance ID: 0x5678 +- Eventgroup ID: 0x0321 +- Event ID: 0x0123 +- SD Offer is sent with 0.5Hz + +### 1. receive_events_udp + +Open two terminals (one for Python someipy example app, one for test app). + +Start apps: + +```bash +python3.12 receive_events_udp.py +bash install/receive_events_udp/start_app.sh +``` + +Expected in test app: +- Expected log for UDP endpoint: REMOTE SUBSCRIBE(0000): [1234.5678.0321] from 127.0.0.1:3002 unreliable was accepted + +Expected in example app: +- Receive event with frequency 1Hz +- Service ID: 0x1234 +- Instance ID: 0x5678 +- Eventgroup ID: 0x0321 +- Event ID: 0x0123 +- SD Offer is sent with 0.5Hz \ No newline at end of file diff --git a/test_apps/build_test_apps.sh b/test_apps/build_test_apps.sh new file mode 100755 index 0000000..0d80bb7 --- /dev/null +++ b/test_apps/build_test_apps.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +rm -rf build install +mkdir -p build +cd build +cmake .. +make +make install +cd .. \ No newline at end of file diff --git a/test_apps/receive_events_udp/receive_events_udp.cpp b/test_apps/receive_events_udp/receive_events_udp.cpp new file mode 100644 index 0000000..bcd9239 --- /dev/null +++ b/test_apps/receive_events_udp/receive_events_udp.cpp @@ -0,0 +1,63 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +#define SAMPLE_SERVICE_ID 0x1234 +#define SAMPLE_INSTANCE_ID 0x5678 +#define SAMPLE_EVENTGROUP_ID 0x0321 +#define SAMPLE_EVENT_ID 0x0123 + +std::shared_ptr payload_; +std::shared_ptr app_; +std::set its_groups; + + +void offer() { + app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 1, 0); + app_->offer_event( + SAMPLE_SERVICE_ID, + SAMPLE_INSTANCE_ID, + SAMPLE_EVENT_ID, + its_groups, + vsomeip::event_type_e::ET_FIELD, std::chrono::milliseconds::zero(), + false, true, nullptr, vsomeip::reliability_type_e::RT_UNKNOWN); +} + +void stop_offer() { + app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); +} + +void run() { + vsomeip::byte_t its_data[10]; + uint32_t its_size = 1; + uint32_t loop_counter = 0; + + while (true) { + for (uint32_t i = 0; i < its_size; ++i) { + its_data[i] = static_cast((loop_counter + i) % 255); + } + loop_counter++; + + payload_->set_data(its_data, its_size); + std::cout << "Setting event (Length=" << std::dec << its_size << ")." << std::endl; + app_->notify(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENT_ID, payload_); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } +} + +int main(int argc, char **argv) { + its_groups.insert(SAMPLE_EVENTGROUP_ID); + + app_ = vsomeip::runtime::get()->create_application("Hello"); + app_->init(); + payload_ = vsomeip::runtime::get()->create_payload(); + offer(); + std::thread receiver(run); + app_->start(); +} \ No newline at end of file diff --git a/test_apps/receive_events_udp/start_app.sh b/test_apps/receive_events_udp/start_app.sh new file mode 100755 index 0000000..ae56694 --- /dev/null +++ b/test_apps/receive_events_udp/start_app.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +script_dir="$(dirname "$0")" + +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/projects/someip/vsomeip_install/lib/ +VSOMEIP_CONFIGURATION="${script_dir}/vsomeip-client.json" "${script_dir}/receive_events_udp" \ No newline at end of file diff --git a/test_apps/receive_events_udp/vsomeip-client.json b/test_apps/receive_events_udp/vsomeip-client.json new file mode 100644 index 0000000..fbe5a0c --- /dev/null +++ b/test_apps/receive_events_udp/vsomeip-client.json @@ -0,0 +1,54 @@ +{ + "unicast" : "127.0.0.2", + "logging" : + { + "level" : "verbose", + "console" : "true", + "file" : { "enable" : "false", "path" : "./vsomeip.log" }, + "dlt" : "false" + }, + "applications" : + [ + { + "name" : "Hello", + "id" : "0x1313" + } + ], + "services" : + [ + { + "service" : "0x1234", + "instance" : "0x5678", + "unreliable" : "30509", + "events" : [ + { + "event" : "0x8778", + "is_field" : "false", + "is_reliable" : "false", + "update-cycle" : "0" + } + ], + "eventgroups": [ + { + "eventgroup" : "0x0321", + "events" : [ "0x0123" ] + } + ] + } + ], + "routing" : "Hello", + "service-discovery" : + { + "enable" : "true", + "multicast" : "224.224.224.245", + "port" : "30490", + "protocol" : "udp", + "initial_delay_min" : "10", + "initial_delay_max" : "100", + "repetitions_base_delay" : "200", + "repetitions_max" : "3", + "ttl" : "3", + "cyclic_offer_delay" : "2000", + "request_response_delay" : "1500" + } +} \ No newline at end of file diff --git a/test_apps/send_events/send_events.cpp b/test_apps/send_events/send_events.cpp new file mode 100644 index 0000000..b77e774 --- /dev/null +++ b/test_apps/send_events/send_events.cpp @@ -0,0 +1,85 @@ +#include +#include +#include +#include +#include +#include + +#define SAMPLE_SERVICE_ID 0x1234 +#define SAMPLE_INSTANCE_ID 0x5678 +#define SAMPLE_EVENTGROUP_ID 0x0321 +#define SAMPLE_EVENT_ID 0x0123 + +std::shared_ptr app; +std::mutex mutex; +std::condition_variable condition; +bool service_available = false; + +void run() +{ + while (true) + { + std::unique_lock its_lock(mutex); + condition.wait(its_lock, [] + { return service_available; }); + + service_available = false; + its_lock.unlock(); + + std::cout << "Run triggered." << std::endl; + + std::set its_groups; + its_groups.insert(SAMPLE_EVENTGROUP_ID); + app->request_event(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENT_ID, its_groups); + app->subscribe(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENTGROUP_ID); + } +} + +void on_message(const std::shared_ptr &_response) +{ + std::stringstream its_message; + its_message << "CLIENT: received a notification for event [" + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_service() << "." + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_instance() << "." + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_method() << "] to Client/Session [" + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_client() << "/" + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_session() + << "] = "; + + std::shared_ptr its_payload = _response->get_payload(); + its_message << "(" << std::dec << its_payload->get_length() << ") "; + for (uint32_t i = 0; i < its_payload->get_length(); ++i) + its_message << std::hex << std::setw(2) << std::setfill('0') + << (int)its_payload->get_data()[i] << " "; + std::cout << its_message.str() << std::endl; +} + +void on_availability(vsomeip::service_t _service, vsomeip::instance_t _instance, bool _is_available) +{ + std::cout << "CLIENT: Service ID / Instance ID [" + << std::setw(4) << std::setfill('0') << std::hex << _service << "." << _instance + << "] is " + << (_is_available ? "available." : "NOT available.") + << std::endl; + { + std::lock_guard l{mutex}; + service_available = _is_available; + } + condition.notify_one(); +} + +int main() +{ + app = vsomeip::runtime::get()->create_application("Hello"); + app->init(); + app->register_availability_handler(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, on_availability); + app->request_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + app->register_message_handler(vsomeip::ANY_SERVICE, vsomeip::ANY_INSTANCE, vsomeip::ANY_METHOD, on_message); + std::thread receiver(run); + app->start(); +} \ No newline at end of file diff --git a/test_apps/send_events/start_app.sh b/test_apps/send_events/start_app.sh new file mode 100755 index 0000000..31a300a --- /dev/null +++ b/test_apps/send_events/start_app.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +script_dir="$(dirname "$0")" + +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/projects/someip/vsomeip_install/lib/ +VSOMEIP_CONFIGURATION="${script_dir}/vsomeip-client.json" "${script_dir}/send_events" \ No newline at end of file diff --git a/test_apps/send_events/vsomeip-client.json b/test_apps/send_events/vsomeip-client.json new file mode 100644 index 0000000..c0cab8c --- /dev/null +++ b/test_apps/send_events/vsomeip-client.json @@ -0,0 +1,32 @@ +{ + "unicast" : "127.0.0.2", + "logging" : + { + "level" : "verbose", + "console" : "true", + "file" : { "enable" : "false", "path" : "./vsomeip.log" }, + "dlt" : "false" + }, + "applications" : + [ + { + "name" : "Hello", + "id" : "0x1313" + } + ], + "routing" : "Hello", + "service-discovery" : + { + "enable" : "true", + "multicast" : "224.224.224.245", + "port" : "30490", + "protocol" : "udp", + "initial_delay_min" : "10", + "initial_delay_max" : "100", + "repetitions_base_delay" : "200", + "repetitions_max" : "3", + "ttl" : "3", + "cyclic_offer_delay" : "2000", + "request_response_delay" : "1500" + } +} \ No newline at end of file diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 138ec74..73f1eb3 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -1,8 +1,19 @@ -import sys -sys.path.append("src") - -import pytest -from someipy.serialization import * +from dataclasses import dataclass +from someipy.serialization import ( + Uint8, + Uint16, + Uint32, + Uint64, + Sint8, + Sint16, + Sint32, + Sint64, + Bool, + Float32, + Float64, + SomeIpPayload, + SomeIpFixedSizeArray, +) def test_base_types_len(): @@ -123,7 +134,7 @@ def test_fixed_size_array_serialization_and_deserialization(): a.data[2] = Uint8(3) a.data[3] = Uint8(4) # Expected hex: 0x 01 02 03 04 - assert bytes.fromhex("01020304") == a.serialize() + assert bytes.fromhex("01020304") == a.serialize() a_again = SomeIpFixedSizeArray(Uint8, 4).deserialize(bytes.fromhex("01020304")) assert a_again == a diff --git a/tests/test_someip_data_processor.py b/tests/test_someip_data_processor.py new file mode 100644 index 0000000..1c37b87 --- /dev/null +++ b/tests/test_someip_data_processor.py @@ -0,0 +1,73 @@ +import pytest +from someipy._internal.someip_data_processor import SomeipDataProcessor +from someipy._internal.someip_header import SomeIpHeader +from someipy._internal.message_types import MessageType +from someipy._internal.someip_message import SomeIpMessage + + +@pytest.fixture +def valid_someip_message() -> SomeIpMessage: + # length is 8 bytes + payload size + payload_size = 64 + length = 8 + payload_size + + someip_header = SomeIpHeader( + service_id=1, + method_id=2, + length=length, + client_id=3, + session_id=4, + protocol_version=1, + interface_version=2, + message_type=MessageType.REQUEST.value, + return_code=0x00, + ) + payload = b"\x00" * payload_size + return SomeIpMessage(header=someip_header, payload=payload) + + +@pytest.fixture +def corrupt_someip_message() -> SomeIpMessage: + # length is 8 bytes + payload size + payload_size = 64 + length = 8 + payload_size + 1 # +1 for corrupt header + + someip_header = SomeIpHeader( + service_id=1, + method_id=2, + length=length, + client_id=3, + session_id=4, + protocol_version=1, + interface_version=2, + message_type=MessageType.REQUEST.value, + return_code=0x00, + ) + payload = b"\x00" * payload_size + return SomeIpMessage(header=someip_header, payload=payload) + + +def test_process_with_datagrams(valid_someip_message): + data = valid_someip_message.header.to_buffer() + valid_someip_message.payload + processor = SomeipDataProcessor(datagram_mode=True) + result = processor.process_data(data) + + assert result is True + assert processor.someip_message.header == valid_someip_message.header + assert processor._expected_bytes == 0 + assert len(processor._buffer) == 0 + + result = processor.process_data(data) + assert result is True + assert processor.someip_message.header == valid_someip_message.header + assert processor._expected_bytes == 0 + assert len(processor._buffer) == 0 + + +def test_process_with_malformed_datagrams(corrupt_someip_message): + data = corrupt_someip_message.header.to_buffer() + corrupt_someip_message.payload + processor = SomeipDataProcessor(datagram_mode=True) + + result = processor.process_data(data) + + assert result is False diff --git a/tests/test_subscribers.py b/tests/test_subscribers.py new file mode 100644 index 0000000..cfc3f55 --- /dev/null +++ b/tests/test_subscribers.py @@ -0,0 +1,64 @@ +import time +from someipy._internal.subscribers import EventGroupSubscriber, Subscribers + +def test_eventgroupsubscriber_equal(): + e_1 = ("123", 5) + e_2 = ("123", 5) + e_3 = ("456", 4) + + s_1 = EventGroupSubscriber(1, e_1, 500) + s_2 = EventGroupSubscriber(1, e_2, 600) + assert s_1 == s_2 + + s_2.endpoint = e_3 + assert s_1 != s_2 + + s_2.endpoint = e_2 + assert s_1 == s_2 + + s_2.eventgroup_id = 2 + assert s_1 != s_2 + + +def test_add_remove_update(): + + subscriber = EventGroupSubscriber(eventgroup_id=1, endpoint=("123", 1), ttl=1) + + subscribers = Subscribers() + initial_ts = subscriber.last_ts_ms + time.sleep(0.1) + + assert len(subscribers.subscribers) == 0 + subscribers.add_subscriber(subscriber) + assert initial_ts != subscribers.subscribers[0].last_ts_ms + assert len(subscribers.subscribers) == 1 + + # Add same subscriber. Length shall not change, but timestamp shall be updated + initial_ts = subscribers.subscribers[0].last_ts_ms + time.sleep(0.2) + subscribers.add_subscriber(subscriber) + assert initial_ts != subscribers.subscribers[0].last_ts_ms + assert len(subscribers.subscribers) == 1 + + subscriber_2 = EventGroupSubscriber(eventgroup_id=2, endpoint=("123", 1), ttl=1) + subscribers.add_subscriber(subscriber_2) + assert len(subscribers.subscribers) == 2 + + subscriber_3 = EventGroupSubscriber(eventgroup_id=2, endpoint=("123", 1), ttl=1) + subscribers.remove_subscriber(subscriber_3) + assert len(subscribers.subscribers) == 1 + + subscriber_4 = EventGroupSubscriber(eventgroup_id=1, endpoint=("123", 1), ttl=1) + subscribers.remove_subscriber(subscriber_4) + assert len(subscribers.subscribers) == 0 + + subscribers.add_subscriber(EventGroupSubscriber(eventgroup_id=1, endpoint=("123", 1), ttl=1)) + subscribers.add_subscriber(EventGroupSubscriber(eventgroup_id=2, endpoint=("123", 1), ttl=3)) + subscribers.add_subscriber(EventGroupSubscriber(eventgroup_id=3, endpoint=("123", 1), ttl=2)) + assert len(subscribers.subscribers) == 3 + time.sleep(1.1) + subscribers.update() + assert len(subscribers.subscribers) == 2 + time.sleep(3) + subscribers.update() + assert len(subscribers.subscribers) == 0 \ No newline at end of file