From aabd0d243184326a5a78f510656dc5f101c8d9e4 Mon Sep 17 00:00:00 2001 From: chrizog Date: Sun, 17 Mar 2024 17:13:19 +0100 Subject: [PATCH 1/3] Initial version of TCP support for sending events --- README.md | 3 +- article.md | 48 +++ .../{send_events.py => send_events_tcp.py} | 83 +++--- example_apps/send_events_udp.py | 110 +++++++ src/someipy/__init__.py | 2 + .../_internal/service_discovery_abcs.py | 3 +- .../_internal/someip_data_processor.py | 63 ++++ src/someipy/_internal/someip_endpoint.py | 94 ++++++ src/someipy/_internal/someip_header.py | 2 + src/someipy/_internal/someip_message.py | 7 + src/someipy/_internal/subscribers.py | 58 ++++ src/someipy/_internal/tcp_client_manager.py | 123 ++++++++ src/someipy/server_service_instance.py | 274 ++++++++++-------- src/someipy/service.py | 72 +++++ src/someipy/service_discovery.py | 20 +- tests/test_serialization.py | 23 +- tests/test_someip_data_processor.py | 73 +++++ tests/test_subscribers.py | 64 ++++ 18 files changed, 940 insertions(+), 182 deletions(-) create mode 100644 article.md rename example_apps/{send_events.py => send_events_tcp.py} (59%) create mode 100644 example_apps/send_events_udp.py create mode 100644 src/someipy/_internal/someip_data_processor.py create mode 100644 src/someipy/_internal/someip_endpoint.py create mode 100644 src/someipy/_internal/someip_message.py create mode 100644 src/someipy/_internal/subscribers.py create mode 100644 src/someipy/_internal/tcp_client_manager.py create mode 100644 src/someipy/service.py create mode 100644 tests/test_someip_data_processor.py create mode 100644 tests/test_subscribers.py diff --git a/README.md b/README.md index 133fdc5..3638442 100644 --- a/README.md +++ b/README.md @@ -31,8 +31,7 @@ The library is still under development. The current major limitations and deviat ### SOME/IP - Events (and field notifiers) are supported. -- Methods are only supported on server side (offering methods). Calling methods (clients) are not supported yet. -- Only UDP services are supported. +- Methods are only supported on server side (offering methods). Calling methods (clients) is not supported yet. - Only unicast services are supported. - SOME/IP-TP is not supported. - IPv6 endpoints are not supported. diff --git a/article.md b/article.md new file mode 100644 index 0000000..230e3a0 --- /dev/null +++ b/article.md @@ -0,0 +1,48 @@ +Everybody needs a "five minutes" article to get started with a new topic. Here is a quick overview about SOME/IP. + +## SOME/IP + +SOME/IP stands for "**S**calable service-**O**riented **M**iddlewar**E** over **IP**". It is a communication protocol which targets embedded projects and typically the automotive industry. It is driven by the AUTOSAR (**AUT**omotive **O**pen **S**ystem **AR**chitecture) consortium. Therefore you can find the specification of the SOME/IP communication protocol on [autosar.org](autosar.org), e.g. [Release 22-11](https://www.autosar.org/fileadmin/standards/R22-11/FO/AUTOSAR_PRS_SOMEIPProtocol.pdf). + +## Use Cases + +SOME/IP is typically used for **inter-ECU** communication. Since SOME/IP works over UDP or TCP, it does not matter which operating system is running on the ECUs or PCs. It could Linux, Windows, QNX, AUTOSAR Classic or even a bare-metal microcontroller as long as UDP or TCP is supported. + +For **intra-ECU** communication, using an IPC mechanism is recommended. The SOME/IP serialization for communication on the same ECU is not needed. Therefore an IPC mechanism is the better choice, e.g. Unix Domain Sockets or Shared Memory communication. + +Here is an example network topology showing ECUs and PCs using different operating operating systems. The devices could be connected via switched Ethernet. + +![Network Topology for SOME/IP](./network_topology.svg) + +## Services + +In SOME/IP everything is based on **Services**. A service is a functionality offered by an ECU (= server). A Server can offer multiple services. Clients, i.e. other ECUs can use the offered services. + +### Methods + +For example one ECU could offer a "Calculator service" via SOME/IP. Clients could call the Calculator Service remotely and pass an operation code (add/subtract/multiply/divide) and two operands. The Server will calculate the result and send back a response to the calling client. This type of Service is called a **Method** in SOME/IP. It describes a request-response-communication. + +### Notification Events + +Another type of Services are Notification Events. If one ECU has a button connected and wants to notify other ECUs everytime the button is pressed, it can offer a SOME/IP containing an Event. Other ECUs can subscribe to this service and will receive a notification everytime the button is pressed. Notification Events in SOME/IP describe a publish/subscribe-communication. + +### Services, Instances and IDs + +In SOME/IP Services can be seen as a schema similar to a class in object oriented programming. Services are instantiated into Service instances allowing ECUs to offer multiple instances of the same service. E.g. if there a three buttons connected, the ECU could simply offer three service instances all providing the "button press service". + +Services, Instances, Methods and Events are identified by IDs (numbers) in SOME/IP. In other middlewares, e.g. in ROS (Robot Operating System) instances (ROS topics) are identified by names. + +## On-wire serialization + +Besides providing functionality as Methods and Notification Events, SOME/IP also takes care of the on-wire serialization of data. SOME/IP allows you to use basic datatypes (like unsigned/signed integers and floating point numbers) and strings or define structures and arrays. SOME/IP takes care of properly serializing data on sender-side in order to transport it via UDP or TCP and deserialize data on reception. + +## SOME/IP SD (Service Discovery) + +While it is possible to statically define a network topology and all IDs in a system beforehand, a major feature of SOME/IP is **Service Discovery**. Service Discovery allows a Service to announce itself on the network to potential clients by sending SOME/IP SD "Offer Entries" via UDP multicast. + +Clients can search for services by sending Find Entries via multicast. In case the offered and request IDs match, server and client can dynamically find each other without knowing the IP addresses of each ECU beforehand. For applications using SOME/IP the network topology is therefore transparent which allows it to develop portable applications being only dependent on Service, Service Instance and Method/Event IDs. + + +## Implementations + +## Further Reading \ No newline at end of file diff --git a/example_apps/send_events.py b/example_apps/send_events_tcp.py similarity index 59% rename from example_apps/send_events.py rename to example_apps/send_events_tcp.py index 2d02c34..086a341 100644 --- a/example_apps/send_events.py +++ b/example_apps/send_events_tcp.py @@ -2,6 +2,8 @@ import ipaddress import logging +from someipy import TransportLayerProtocol +from someipy.service import ServiceBuilder, EventGroup from someipy.service_discovery import construct_service_discovery from someipy.server_service_instance import construct_server_service_instance from someipy.logging import set_someipy_log_level @@ -12,67 +14,67 @@ SD_PORT = 30490 INTERFACE_IP = "127.0.0.1" -SAMPLE_EVENTGROUP_ID = 20 -SAMPLE_EVENT_ID = 32796 +SAMPLE_SERVICE_ID = 0x1234 +SAMPLE_INSTANCE_ID = 0x5678 +SAMPLE_EVENTGROUP_ID = 0x0321 +SAMPLE_EVENT_ID = 0x0123 -async def main(): +async def main(): # It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, .. set_someipy_log_level(logging.DEBUG) # Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function # use the construct_service_discovery function # The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set - service_discovery = await construct_service_discovery(SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP) - - # 1. For sending events use a ServerServiceInstance - # 2. Pass the service and instance ID, version and endpoint and TTL. The endpoint is needed again as the src-address - # and port of all sent events - # 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ServerServiceInstance can offer his service to - # other ECUs - # 4. cyclic_offer_delay_ms is the period of sending cyclic SD Offer service entries + service_discovery = await construct_service_discovery( + SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP + ) + + temperature_eventgroup = EventGroup( + id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID] + ) + temperature_service = ( + ServiceBuilder() + .with_service_id(SAMPLE_SERVICE_ID) + .with_major_version(1) + .with_eventgroup(temperature_eventgroup) + .build() + ) + + # For sending events use a ServerServiceInstance service_instance_temperature = await construct_server_service_instance( - service_id=1, - instance_id=1000, - major_version=1, - minor_version=0, - endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3000), # src IP and port of the service + temperature_service, + instance_id=SAMPLE_INSTANCE_ID, + endpoint=( + ipaddress.IPv4Address(INTERFACE_IP), + 3000, + ), # src IP and port of the service ttl=5, sd_sender=service_discovery, - cyclic_offer_delay_ms=2000 + cyclic_offer_delay_ms=2000, + protocol=TransportLayerProtocol.TCP ) # The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance # is notified by the ServiceDiscoveryProtocol about e.g. subscriptions from other ECUs service_discovery.attach(service_instance_temperature) - # For demonstration purposes we will construct a second ServerServiceInstance - service_instance_2 = await construct_server_service_instance( - service_id=2, - instance_id=2000, - major_version=1, - minor_version=0, - endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3001), # src IP and port of the service - ttl=5, - sd_sender=service_discovery, - cyclic_offer_delay_ms=2000 - ) - service_discovery.attach(service_instance_2) + # ..it's also possible to construct another ServerServiceInstance and attach it to service_discovery as well - # After constructing and attaching ServerServiceInstances to the ServiceDiscoveryProtocol objects the + # After constructing and attaching ServerServiceInstances to the ServiceDiscoveryProtocol object the # start_offer method has to be called. This will start an internal timer, which will periodically send # Offer service entries with a period of "cyclic_offer_delay_ms" which has been passed above - print("Start offering services..") + print("Start offering service..") service_instance_temperature.start_offer() - service_instance_2.start_offer() tmp_msg = TemparatureMsg() - # Reminder: Do NOT write "tmp_msg.version.major = 1". Always use the provided classes in someipy like Uint8, + # Reminder: Do NOT use "tmp_msg.version.major = 1". Always use the provided classes in someipy like Uint8, # so that the data can be propery serialized. Python literals won't be serialized properly tmp_msg.version.major = Uint8(1) tmp_msg.version.minor = Uint8(0) - + tmp_msg.measurements.data[0] = Float32(20.0) tmp_msg.measurements.data[1] = Float32(21.0) tmp_msg.measurements.data[2] = Float32(22.0) @@ -84,15 +86,16 @@ async def main(): await asyncio.sleep(5) tmp_msg.timestamp = Uint64(tmp_msg.timestamp.value + 1) payload = tmp_msg.serialize() - service_instance_temperature.send_event(SAMPLE_EVENTGROUP_ID, SAMPLE_EVENT_ID, payload) + service_instance_temperature.send_event( + SAMPLE_EVENTGROUP_ID, SAMPLE_EVENT_ID, payload + ) - # .. or in case your app is waiting for external events, using await asyncio.Future() to + # .. or in case your app is waiting for external events, use await asyncio.Future() to # keep the task alive # await asyncio.Future() - except asyncio.CancelledError as e: - print("Stop offering services..") + except asyncio.CancelledError: + print("Stop offering service..") await service_instance_temperature.stop_offer() - await service_instance_2.stop_offer() finally: print("Service Discovery close..") service_discovery.close() @@ -103,5 +106,5 @@ async def main(): if __name__ == "__main__": try: asyncio.run(main()) - except KeyboardInterrupt as e: + except KeyboardInterrupt: pass diff --git a/example_apps/send_events_udp.py b/example_apps/send_events_udp.py new file mode 100644 index 0000000..6894738 --- /dev/null +++ b/example_apps/send_events_udp.py @@ -0,0 +1,110 @@ +import asyncio +import ipaddress +import logging + +from someipy import TransportLayerProtocol +from someipy.service import ServiceBuilder, EventGroup +from someipy.service_discovery import construct_service_discovery +from someipy.server_service_instance import construct_server_service_instance +from someipy.logging import set_someipy_log_level +from someipy.serialization import Uint8, Uint64, Float32 +from temperature_msg import TemparatureMsg + +SD_MULTICAST_GROUP = "224.224.224.245" +SD_PORT = 30490 +INTERFACE_IP = "127.0.0.1" + +SAMPLE_SERVICE_ID = 0x1234 +SAMPLE_INSTANCE_ID = 0x5678 +SAMPLE_EVENTGROUP_ID = 0x0321 +SAMPLE_EVENT_ID = 0x0123 + + +async def main(): + # It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, .. + set_someipy_log_level(logging.DEBUG) + + # Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function + # use the construct_service_discovery function + # The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set + service_discovery = await construct_service_discovery( + SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP + ) + + temperature_eventgroup = EventGroup( + id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID] + ) + temperature_service = ( + ServiceBuilder() + .with_service_id(SAMPLE_SERVICE_ID) + .with_major_version(1) + .with_eventgroup(temperature_eventgroup) + .build() + ) + + # For sending events use a ServerServiceInstance + service_instance_temperature = await construct_server_service_instance( + temperature_service, + instance_id=SAMPLE_INSTANCE_ID, + endpoint=( + ipaddress.IPv4Address(INTERFACE_IP), + 3000, + ), # src IP and port of the service + ttl=5, + sd_sender=service_discovery, + cyclic_offer_delay_ms=2000, + protocol=TransportLayerProtocol.UDP + ) + + # The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance + # is notified by the ServiceDiscoveryProtocol about e.g. subscriptions from other ECUs + service_discovery.attach(service_instance_temperature) + + # ..it's also possible to construct another ServerServiceInstance and attach it to service_discovery as well + + # After constructing and attaching ServerServiceInstances to the ServiceDiscoveryProtocol object the + # start_offer method has to be called. This will start an internal timer, which will periodically send + # Offer service entries with a period of "cyclic_offer_delay_ms" which has been passed above + print("Start offering service..") + service_instance_temperature.start_offer() + + tmp_msg = TemparatureMsg() + + # Reminder: Do NOT use "tmp_msg.version.major = 1". Always use the provided classes in someipy like Uint8, + # so that the data can be propery serialized. Python literals won't be serialized properly + tmp_msg.version.major = Uint8(1) + tmp_msg.version.minor = Uint8(0) + + tmp_msg.measurements.data[0] = Float32(20.0) + tmp_msg.measurements.data[1] = Float32(21.0) + tmp_msg.measurements.data[2] = Float32(22.0) + tmp_msg.measurements.data[3] = Float32(23.0) + + try: + # Either cyclically send events in an endless loop.. + while True: + await asyncio.sleep(5) + tmp_msg.timestamp = Uint64(tmp_msg.timestamp.value + 1) + payload = tmp_msg.serialize() + service_instance_temperature.send_event( + SAMPLE_EVENTGROUP_ID, SAMPLE_EVENT_ID, payload + ) + + # .. or in case your app is waiting for external events, use await asyncio.Future() to + # keep the task alive + # await asyncio.Future() + except asyncio.CancelledError: + print("Stop offering service..") + await service_instance_temperature.stop_offer() + finally: + print("Service Discovery close..") + service_discovery.close() + + print("End main task..") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass diff --git a/src/someipy/__init__.py b/src/someipy/__init__.py index e69de29..d92427f 100644 --- a/src/someipy/__init__.py +++ b/src/someipy/__init__.py @@ -0,0 +1,2 @@ +from ._internal.someip_sd_header import TransportLayerProtocol +from .server_service_instance import ServerServiceInstance, construct_server_service_instance diff --git a/src/someipy/_internal/service_discovery_abcs.py b/src/someipy/_internal/service_discovery_abcs.py index 23838f0..fde282f 100644 --- a/src/someipy/_internal/service_discovery_abcs.py +++ b/src/someipy/_internal/service_discovery_abcs.py @@ -1,5 +1,6 @@ +import ipaddress from abc import ABC, abstractmethod -from someipy._internal.someip_sd_header import * +from someipy._internal.someip_sd_header import SdService, SdEventGroupEntry, SdIPV4EndpointOption from someipy._internal.session_handler import SessionHandler diff --git a/src/someipy/_internal/someip_data_processor.py b/src/someipy/_internal/someip_data_processor.py new file mode 100644 index 0000000..f471f59 --- /dev/null +++ b/src/someipy/_internal/someip_data_processor.py @@ -0,0 +1,63 @@ +from someipy._internal.someip_header import SomeIpHeader +from someipy._internal.someip_message import SomeIpMessage + + +class SomeipDataProcessor: + + def __init__(self, datagram_mode=True): + self._buffer = bytes() + self._expected_bytes = 0 + self._datagram_mode = datagram_mode + self.someip_message = None + + def _reset(self): + self._buffer = bytes() + self._expected_bytes = 0 + + def process_data(self, new_data: bytes) -> bool: + + received_length = len(new_data) + + # UDP case + if self._datagram_mode: + header = SomeIpHeader.from_buffer(new_data) + expected_total_length = 8 + header.length + payload_length = expected_total_length - 16 + if received_length == expected_total_length: + self.someip_message = SomeIpMessage(header=header, payload=new_data[16:]) + return True + else: + # Malformed package -> return False + return False + + # From here on: TCP case + if self._expected_bytes == 0 and len(self._buffer) == 0: + + if received_length >= SomeIpHeader.MINIMAL_SIZE: + + header = SomeIpHeader.from_buffer(new_data) + expected_total_length = 8 + header.length + payload_length = expected_total_length - 16 + + # Case 1: Received exactly one SOME/IP message + if received_length == expected_total_length: + self.someip_message = SomeIpMessage(header=header, payload=new_data[16:(16+payload_length)]) + self._reset() + return True + # Case 2: Received less bytes than expected + elif received_length < expected_total_length: + self._expected_bytes = (expected_total_length - received_length) + self._buffer = new_data + return False + # Case 3: Received more bytes than expected + elif received_length > expected_total_length: + # Assume it is the beginning of a new SOME/IP message, store remaining bytes in buffer + end_payload = 16 + payload_length + self.someip_message = SomeIpMessage(header=header, payload=new_data[16:end_payload]) + self._buffer = new_data[end_payload:] + self._expected_bytes = 0 + + return True + + else: + pass # store in buffer \ No newline at end of file diff --git a/src/someipy/_internal/someip_endpoint.py b/src/someipy/_internal/someip_endpoint.py new file mode 100644 index 0000000..3c30075 --- /dev/null +++ b/src/someipy/_internal/someip_endpoint.py @@ -0,0 +1,94 @@ +import asyncio +from abc import ABC, abstractmethod +from typing import Callable, Tuple, Any +from someipy._internal.someip_message import SomeIpMessage +from someipy._internal.tcp_client_manager import ( + TcpClientManagerInterface, + TcpClientProtocolInterface, +) +from someipy._internal.utils import EndpointType +from someipy._internal.someip_data_processor import SomeipDataProcessor + + +class SomeipEndpoint(ABC): + @abstractmethod + def set_someip_callback( + self, callback_func: Callable[[SomeIpMessage, Tuple[str, int]], None] + ) -> None: + pass + + @abstractmethod + def sendto(self, data: bytes, addr: EndpointType) -> None: + pass + + @abstractmethod + def sendtoall(self, data: bytes) -> None: + pass + + @abstractmethod + def shutdown(self) -> None: + pass + + +class UDPSomeipEndpoint(SomeipEndpoint, asyncio.DatagramProtocol): + def __init__(self): + self._callback: Callable[[SomeIpMessage, Tuple[str, int]], None] = None + self._transport = None + self._processor = SomeipDataProcessor(datagram_mode=True) + + def set_someip_callback( + self, callback_func: Callable[[SomeIpMessage, Tuple[str, int]], None] + ) -> None: + self._callback = callback_func + + def connection_made(self, transport: asyncio.DatagramTransport) -> None: + self._transport = transport + + def connection_lost(self, exc: Exception | None) -> None: + pass + + def datagram_received(self, data: bytes, addr: Tuple[str | Any | int]) -> None: + result = self._processor.process_data(data) + if result and self._callback is not None: + self._callback(self._processor.someip_message, addr) + + def sendto(self, data: bytes, addr: EndpointType) -> None: + if self._transport is not None: + self._transport.sendto(data, addr) + + def sendtoall(self, data: bytes) -> None: + # TODO: Implement later for multicast support + raise NotImplementedError("No implementation for UDP yet") + + def shutdown(self) -> None: + if self._transport is not None: + self._transport.close() + + +class TCPSomeipEndpoint(SomeipEndpoint): + def __init__(self, server, manager): + self._server: asyncio.Server = server + self._manager: TcpClientManagerInterface = manager + + def set_someip_callback( + self, callback_func: Callable[[SomeIpMessage, Tuple[str, int]], None] + ) -> None: + self._manager.register_callback(callback_func) + + def sendto(self, data: bytes, addr: EndpointType) -> None: + if self._manager is None: + return + client: TcpClientProtocolInterface = self._manager.get_client(addr[0], addr[1]) + if client is not None: + client.write(data) + + def sendtoall(self, data: bytes) -> None: + if self._manager is None: + return + for c in self._manager.get_all_clients(): + c.write(data) + + def shutdown(self) -> None: + if self._manager is None: + return + self._server.close() diff --git a/src/someipy/_internal/someip_header.py b/src/someipy/_internal/someip_header.py index fa5f5a9..d8b6f55 100644 --- a/src/someipy/_internal/someip_header.py +++ b/src/someipy/_internal/someip_header.py @@ -21,6 +21,8 @@ class SomeIpHeader: message_type: int return_code: int + MINIMAL_SIZE = 16 + def is_sd_header(self) -> bool: return ( self.service_id == SERVICE_ID_SD diff --git a/src/someipy/_internal/someip_message.py b/src/someipy/_internal/someip_message.py new file mode 100644 index 0000000..7cfd443 --- /dev/null +++ b/src/someipy/_internal/someip_message.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass +from someipy._internal.someip_header import SomeIpHeader + +@dataclass +class SomeIpMessage: + header: SomeIpHeader + payload: bytes diff --git a/src/someipy/_internal/subscribers.py b/src/someipy/_internal/subscribers.py new file mode 100644 index 0000000..0e05085 --- /dev/null +++ b/src/someipy/_internal/subscribers.py @@ -0,0 +1,58 @@ +import time +from typing import List +from someipy._internal.utils import EndpointType + + +class EventGroupSubscriber: + eventgroup_id: int + endpoint: EndpointType + ttl: int + last_ts_ms: int + + def __init__(self, eventgroup_id: int, endpoint: EndpointType, ttl: int): + self.eventgroup_id = eventgroup_id + self.endpoint = endpoint + self.ttl = ttl + self.last_ts_ms = int(time.time() * 1000) + + def __eq__(self, __value: object) -> bool: + return ( + self.eventgroup_id == __value.eventgroup_id + and self.endpoint[0] == __value.endpoint[0] + and self.endpoint[1] == __value.endpoint[1] + ) + + +class Subscribers: + def __init__(self): + self._subscribers: List[EventGroupSubscriber] = [] + + def update(self): + # From SOME/IP specification: + # TTL shall be set to the lifetime of the subscription. + # If set to 0xFFFFFF, the Subscribe Eventgroup entry shall be considered valid + # until the next reboot. + time_now_ms = int(time.time() * 1000) + + self._subscribers = [ + s + for s in self._subscribers + if (s.ttl == 0xFFFFFF) or (time_now_ms < (s.last_ts_ms + (s.ttl * 1000.0))) + ] + + def add_subscriber(self, new_subscriber: EventGroupSubscriber) -> None: + time_now_ms = int(time.time() * 1000.0) + for s in self._subscribers: + if new_subscriber == s: + s.last_ts_ms = time_now_ms + return + new_subscriber.last_ts_ms = time_now_ms + self._subscribers.append(new_subscriber) + + def remove_subscriber(self, subscriber: EventGroupSubscriber) -> None: + if subscriber in self._subscribers: + self._subscribers.remove(subscriber) + + @property + def subscribers(self) -> List[EventGroupSubscriber]: + return self._subscribers diff --git a/src/someipy/_internal/tcp_client_manager.py b/src/someipy/_internal/tcp_client_manager.py new file mode 100644 index 0000000..5addbe6 --- /dev/null +++ b/src/someipy/_internal/tcp_client_manager.py @@ -0,0 +1,123 @@ +import asyncio +from abc import ABC, abstractmethod +from typing import Callable, Dict, Iterable, Tuple + +from someipy._internal.someip_message import SomeIpMessage +from someipy._internal.someip_data_processor import SomeipDataProcessor + +class TcpClientProtocolInterface(ABC): + @abstractmethod + def write(self, data: bytes) -> None: + pass + + @property + @abstractmethod + def ip_addr(self) -> str: + pass + + @property + @abstractmethod + def port(self) -> int: + pass + + +class TcpClientManagerInterface(ABC): + @abstractmethod + def register_client(self, client: TcpClientProtocolInterface) -> None: + pass + + @abstractmethod + def unregister_client(self, client: TcpClientProtocolInterface) -> None: + pass + + @abstractmethod + def get_client(self, ip_addr: str, port: int) -> TcpClientProtocolInterface: + pass + + @abstractmethod + def get_all_clients(self) -> Iterable[TcpClientProtocolInterface]: + pass + + @abstractmethod + def someip_callback(self, client: TcpClientProtocolInterface, someip_message: SomeIpMessage) -> None: + pass + + @abstractmethod + def register_callback(self, callback: Callable[[SomeIpMessage, Tuple[str, int]], None]) -> None: + pass + + +class TcpClientManager(TcpClientManagerInterface): + + def __init__(self): + self._clients: Dict[str, TcpClientProtocolInterface] = {} + self._someip_callback: Callable[[SomeIpMessage, Tuple[str, int]], None] = None + + def _build_key(self, ip_addr: str, port: int) -> str: + return f"{ip_addr}-{port}" + + def register_client(self, client: TcpClientProtocolInterface) -> None: + print(f"Register new client {client.ip_addr}, {client.port}") + self._clients[self._build_key(client.ip_addr, client.port)] = client + + def unregister_client(self, client: TcpClientProtocolInterface) -> None: + print(f"Unregister client {client.ip_addr}, {client.port}") + if self._build_key(client.ip_addr, client.port) in self._clients.keys(): + del self._clients[self._build_key(client.ip_addr, client.port)] + + def get_client(self, ip_addr: str, port: int) -> TcpClientProtocolInterface: + if self._build_key(ip_addr, port) in self._clients.keys(): + return self._clients[self._build_key(ip_addr, port)] + else: + return None + + def get_all_clients(self) -> Iterable[TcpClientProtocolInterface]: + return self._clients.values() + + def someip_callback(self, client: TcpClientProtocolInterface, someip_message: SomeIpMessage) -> None: + if self._someip_callback is not None: + self._someip_callback(someip_message, (client.ip_addr, client.port)) + + def register_callback(self, callback: Callable[[SomeIpMessage, Tuple[str, int]], None]) -> None: + self._someip_callback = callback + +class TcpClientProtocol(asyncio.Protocol, TcpClientProtocolInterface): + + def __init__(self, client_manager: TcpClientManager): + self._client_manager: TcpClientManager = client_manager + self._transport = None + self._ip_addr_client = None + self._port_client = None + self._data_processor = SomeipDataProcessor(datagram_mode=False) + + def connection_made(self, transport: asyncio.BaseTransport): + peername: Tuple = transport.get_extra_info('peername') + print('Connection from {}'.format(peername)) + self._transport = transport + self._ip_addr_client = peername[0] + self._port_client = peername[1] + + self._client_manager.register_client(self) + + def data_received(self, data: bytes): + print('Data received {}: {}'.format(self.ip_addr, data)) + + # Push data to processor + result = self._data_processor.process_data(data) + if result and self._client_manager is not None: + self._client_manager.someip_callback(self, self._data_processor.someip_message) + + def connection_lost(self, _) -> None: + self._client_manager.unregister_client(self) + + def write(self, data: bytes) -> None: + if self._transport is not None: + self._transport.write(data) + + @property + def ip_addr(self) -> str: + return self._ip_addr_client + + @property + def port(self) -> int: + return self._port_client diff --git a/src/someipy/server_service_instance.py b/src/someipy/server_service_instance.py index 35f67c6..180a49c 100644 --- a/src/someipy/server_service_instance.py +++ b/src/someipy/server_service_instance.py @@ -1,6 +1,8 @@ import asyncio -from dataclasses import dataclass -from typing import List, Callable, Tuple, Union, Any, Dict +from typing import Tuple + +from someipy._internal.tcp_client_manager import TcpClientManager, TcpClientProtocol +from someipy.service import Service from someipy._internal.message_types import MessageType, ReturnCode from someipy._internal.someip_sd_builder import ( @@ -8,7 +10,10 @@ build_offer_service_sd_header, build_subscribe_eventgroup_ack_sd_header, ) -from someipy._internal.someip_header import SomeIpHeader, get_payload_from_someip_message +from someipy._internal.someip_header import ( + SomeIpHeader, + get_payload_from_someip_message, +) from someipy._internal.someip_sd_header import ( SdService, TransportLayerProtocol, @@ -22,134 +27,141 @@ from someipy._internal.simple_timer import SimplePeriodicTimer from someipy._internal.utils import ( - DatagramAdapter, create_udp_socket, - endpoint_to_str_int_tuple, EndpointType, + endpoint_to_str_int_tuple, ) from someipy._internal.logging import get_logger +from someipy._internal.subscribers import Subscribers, EventGroupSubscriber +from someipy._internal.someip_endpoint import ( + SomeipEndpoint, + TCPSomeipEndpoint, + UDPSomeipEndpoint, +) _logger = get_logger("server_service_instance") -@dataclass -class EventGroup: - eventgroup_id: int - ttl: int - event_ids: List[int] - - -@dataclass -class EventGroupSubscriber: - eventgroup_id: int - endpoint: EndpointType - -@dataclass -class Method: - method_id: int - method_handler: Callable[[bytes], Tuple[bool, bytes]] - - def __eq__(self, __value: object) -> bool: - return self.method_id == __value.method_id - - class ServerServiceInstance(ServiceDiscoveryObserver): - service_id: int + service: Service instance_id: int - major_version: int - minor_version: int endpoint: EndpointType + protocol: TransportLayerProtocol + someip_endpoint: SomeipEndpoint ttl: int - sd_sender: ServiceDiscoverySender cyclic_offer_delay_ms: int - eventgroups: List[EventGroup] - subscribers: List[EventGroupSubscriber] - methods: Dict[int, Method] + subscribers: Subscribers offer_timer: SimplePeriodicTimer def __init__( self, - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, + protocol: TransportLayerProtocol, + someip_endpoint: SomeipEndpoint, ttl: int = 0, sd_sender=None, cyclic_offer_delay_ms=2000, ): - self.service_id = service_id + self.service = service self.instance_id = instance_id - self.major_version = major_version - self.minor_version = minor_version self.endpoint = endpoint + self.protocol = protocol + self.someip_endpoint = someip_endpoint self.ttl = ttl self.sd_sender = sd_sender self.cyclic_offer_delay_ms = cyclic_offer_delay_ms - self.sender_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) - - self.eventgroups = [] - self.subscribers = [] - self.methods = dict() - + self.subscribers = Subscribers() self.offer_timer = None - def add_eventgroup(self, eventgroup: EventGroup): - ids = [e.eventgroup_id for e in self.eventgroups] - if eventgroup.eventgroup_id not in ids: - self.eventgroups.append(eventgroup) + def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None: + self.subscribers.update() - def add_method(self, method: Method): - if self.methods.get(method.method_id) is None: - self.methods[method.method_id] = method + length = 8 + len(payload) + someip_header = SomeIpHeader( + service_id=self.service.id, + method_id=event_id, + length=length, + client_id=0x00, # TODO + session_id=0x01, # TODO + protocol_version=1, # TODO + interface_version=1, # TODO + message_type=MessageType.NOTIFICATION.value, + return_code=0x00, + ) + _logger.debug( + f"Send event for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}" + ) - def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> None: + for sub in self.subscribers.subscribers: + if sub.eventgroup_id == event_group_id: + _logger.debug( + f"Send event for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X} to {sub.endpoint[0]}:{sub.endpoint[1]}" + ) + self.someip_endpoint.sendto( + someip_header.to_buffer() + payload, + endpoint_to_str_int_tuple(sub.endpoint), + ) + + def someip_message_received(self, data: bytes, addr: Tuple[str, int]) -> None: header = SomeIpHeader.from_buffer(data) payload_to_return = bytes() header_to_return = header def send_response(): - self.sender_socket.sendto( - header_to_return.to_buffer() + payload_to_return, - addr) + self.unicast_transport.sendto( + header_to_return.to_buffer() + payload_to_return, addr + ) if header.service_id != self.service_id: - _logger.warn(f"Unknown service ID received from {addr}: ID 0x{header.service_id:04X}") + _logger.warn( + f"Unknown service ID received from {addr}: ID 0x{header.service_id:04X}" + ) header_to_return.message_type = MessageType.RESPONSE.value header_to_return.return_code = ReturnCode.E_UNKNOWN_SERVICE.value send_response() return - + if header.method_id not in self.methods.keys(): - _logger.warn(f"Unknown method ID received from {addr}: ID 0x{header.method_id:04X}") + _logger.warn( + f"Unknown method ID received from {addr}: ID 0x{header.method_id:04X}" + ) header_to_return.message_type = MessageType.RESPONSE.value header_to_return.return_code = ReturnCode.E_UNKNOWN_METHOD.value send_response() return - - # TODO: Test for protocol and interface version - if header.message_type == MessageType.REQUEST.value and header.return_code == 0x00: + # TODO: Test for protocol and interface version + if ( + header.message_type == MessageType.REQUEST.value + and header.return_code == 0x00 + ): payload_in = get_payload_from_someip_message(header, data) - (success, payload_to_return) = self.methods[header.method_id].method_handler(payload_in) + (success, payload_to_return) = self.methods[ + header.method_id + ].method_handler(payload_in) if not success: - _logger.debug(f"Return ERROR message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}") + _logger.debug( + f"Return ERROR message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}" + ) header_to_return.message_type = MessageType.ERROR.value else: - _logger.debug(f"Return RESPONSE message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}") + _logger.debug( + f"Return RESPONSE message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}" + ) header_to_return.message_type = MessageType.RESPONSE.value send_response() else: - _logger.warn(f"Unknown message type received from {addr}: Type 0x{header.message_type:04X}") - - def connection_lost(self, exc: Exception) -> None: - pass + _logger.warn( + f"Unknown message type received from {addr}: Type 0x{header.message_type:04X}" + ) def find_service_update(self): # TODO: implement SD behaviour and send back offer @@ -159,57 +171,29 @@ def offer_service_update(self, _: SdService): # No reaction in a server instance needed pass - def _insert_subscriber(self, new_subscriber: EventGroupSubscriber) -> None: - for s in self.subscribers: - if new_subscriber == s: - return - self.subscribers.append(new_subscriber) - - def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None: - length = 8 + len(payload) - someip_header = SomeIpHeader( - service_id=self.service_id, - method_id=event_id, - length=length, - client_id=0x00, # TODO - session_id=0x01, # TODO - protocol_version=1, # TODO, - interface_version=1, # TODO, - message_type=MessageType.NOTIFICATION.value, - return_code=0x00, - ) - - for sub in self.subscribers: - if sub.eventgroup_id == event_group_id: - self.sender_socket.sendto( - someip_header.to_buffer() + payload, - endpoint_to_str_int_tuple(sub.endpoint), - ) - def subscribe_eventgroup_update( self, sd_event_group: SdEventGroupEntry, ipv4_endpoint_option: SdIPV4EndpointOption, ) -> None: - # eventgroup_ids = [e.eventgroup_id for e in self.eventgroups] - # [PRS_SOMEIPSD_00829] When receiving a SubscribeEventgroupAck or Sub- # scribeEventgroupNack the Service ID, Instance ID, Eventgroup ID, and Major Ver- # sion shall match exactly to the corresponding SubscribeEventgroup Entry to identify - # an Eventgroup of a Service Instance.c - + # an Eventgroup of a Service Instance. + # TODO: enable major version check if ( - sd_event_group.sd_entry.service_id == self.service_id + sd_event_group.sd_entry.service_id == self.service.id and sd_event_group.sd_entry.instance_id == self.instance_id - # and sd_event_group.eventgroup_id in eventgroup_ids + and sd_event_group.eventgroup_id in self.service.eventgroupids ): ( session_id, reboot_flag, ) = self.sd_sender.get_unicast_session_handler().update_session() - # TODO: enable major version check - # sd_event_group.sd_entry.major_version == self.major_version: - # print("Service received subscribe entry -> send back ACK") + + _logger.debug( + f"Send Subscribe ACK for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}, TTL: {sd_event_group.sd_entry.ttl}" + ) ack_entry = build_subscribe_eventgroup_ack_entry( service_id=sd_event_group.sd_entry.service_id, instance_id=sd_event_group.sd_entry.instance_id, @@ -226,11 +210,17 @@ def subscribe_eventgroup_update( dest_ip=ipv4_endpoint_option.ipv4_address, ) - subscriber = EventGroupSubscriber( - eventgroup_id=sd_event_group.eventgroup_id, - endpoint=(ipv4_endpoint_option.ipv4_address, ipv4_endpoint_option.port), + self.subscribers.add_subscriber( + EventGroupSubscriber( + eventgroup_id=sd_event_group.eventgroup_id, + endpoint=( + ipv4_endpoint_option.ipv4_address, + ipv4_endpoint_option.port, + ), + ttl=sd_event_group.sd_entry.ttl, + ) ) - self._insert_subscriber(subscriber) + pass def subscribe_ack_eventgroup_update(self, _: SdEventGroupEntry) -> None: # Not needed for server instance @@ -243,17 +233,17 @@ def offer_timer_callback(self): ) = self.sd_sender.get_multicast_session_handler().update_session() _logger.debug( - f"Offer service for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}, TTL: {self.ttl}, version: {self.major_version}.{self.minor_version}, session ID: {session_id}" + f"Offer service for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}, TTL: {self.ttl}, version: {self.service.major_version}.{self.service.minor_version}, session ID: {session_id}" ) service_to_offer = SdService( - service_id=self.service_id, + service_id=self.service.id, instance_id=self.instance_id, major_version=1, minor_version=0, ttl=self.ttl, endpoint=self.endpoint, - protocol=TransportLayerProtocol.UDP, + protocol=self.protocol, ) sd_header = build_offer_service_sd_header( service_to_offer, session_id, reboot_flag @@ -268,7 +258,7 @@ def start_offer(self): async def stop_offer(self): _logger.debug( - f"Stop offer for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}" + f"Stop offer for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}" ) if self.offer_timer is not None: @@ -276,27 +266,55 @@ async def stop_offer(self): await self.offer_timer.task # TODO: send out a stop offer sd message before stopping the timer + async def construct_server_service_instance( - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, ttl: int = 0, sd_sender=None, - cyclic_offer_delay_ms=2000 + cyclic_offer_delay_ms=2000, + protocol=TransportLayerProtocol.UDP, ) -> ServerServiceInstance: - - server_instance = ServerServiceInstance( - service_id, instance_id, major_version, minor_version, endpoint, ttl, sd_sender, cyclic_offer_delay_ms - ) + if protocol == TransportLayerProtocol.UDP: + loop = asyncio.get_running_loop() + rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) - loop = asyncio.get_running_loop() - rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) + _, udp_endpoint = await loop.create_datagram_endpoint( + lambda: UDPSomeipEndpoint(), sock=rcv_socket + ) - unicast_transport, _ = await loop.create_datagram_endpoint( - lambda: DatagramAdapter(target=server_instance), sock=rcv_socket - ) - server_instance.unicast_transport = unicast_transport + server_instance = ServerServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.UDP, + udp_endpoint, + ttl, + sd_sender, + cyclic_offer_delay_ms, + ) + return server_instance + + elif protocol == TransportLayerProtocol.TCP: + tcp_client_manager = TcpClientManager() + loop = asyncio.get_running_loop() + server = await loop.create_server( + lambda: TcpClientProtocol(client_manager=tcp_client_manager), + str(endpoint[0]), + endpoint[1], + ) - return server_instance + tcp_someip_endpoint = TCPSomeipEndpoint(server, tcp_client_manager) + + server_instance = ServerServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.TCP, + tcp_someip_endpoint, + ttl, + sd_sender, + cyclic_offer_delay_ms, + ) + return server_instance diff --git a/src/someipy/service.py b/src/someipy/service.py new file mode 100644 index 0000000..bbde6e4 --- /dev/null +++ b/src/someipy/service.py @@ -0,0 +1,72 @@ +from dataclasses import dataclass +from typing import List, Callable, Tuple, Dict + + +@dataclass +class Method: + id: int + method_handler: Callable[[bytes], Tuple[bool, bytes]] + + def __eq__(self, __value: object) -> bool: + return self.method_id == __value.method_id + + +@dataclass +class EventGroup: + id: int + event_ids: List[int] + + +@dataclass +class Service: + id: int + major_version: int + minor_version: int + + methods: Dict[int, Method] + eventgroups: Dict[int, EventGroup] + + def __init__(self): + self.id = 0 + self.major_version = 0 + self.minor_version = 0 + self.methods = dict() + self.eventgroups = dict() + + @property + def eventgroupids(self) -> List[int]: + return self.eventgroups.keys() + + @property + def methodids(self) -> List[int]: + return self.methods.keys() + + +class ServiceBuilder: + def __init__(self): + self._service: Service = Service() + + def with_service_id(self, id: int): + self._service.id = id + return self + + def with_major_version(self, major_version: int): + self._service.major_version = major_version + return self + + def with_minor_version(self, minor_version: int): + self._service.minor_version = minor_version + return self + + def with_method(self, method: Method): + if self._service.methods.get(method.id) is None: + self._service.methods[method.id] = method + return self + + def with_eventgroup(self, eventgroup: EventGroup): + if self._service.eventgroups.get(eventgroup.id) is None: + self._service.eventgroups[eventgroup.id] = eventgroup + return self + + def build(self): + return self._service diff --git a/src/someipy/service_discovery.py b/src/someipy/service_discovery.py index e18a8ca..34b87d4 100644 --- a/src/someipy/service_discovery.py +++ b/src/someipy/service_discovery.py @@ -3,15 +3,23 @@ from typing import Any, Union, Tuple, List from someipy._internal.someip_header import SomeIpHeader -from someipy._internal.someip_sd_header import * -from someipy._internal.someip_sd_extractors import * +from someipy._internal.someip_sd_header import SomeIpSdHeader +from someipy._internal.someip_sd_extractors import ( + extract_offered_services, + extract_subscribe_eventgroup_entries, + extract_subscribe_ack_eventgroup_entries, +) from someipy._internal.session_handler import SessionHandler from someipy._internal.utils import ( create_rcv_multicast_socket, create_udp_socket, DatagramAdapter, ) -from someipy._internal.service_discovery_abcs import * +from someipy._internal.service_discovery_abcs import ( + ServiceDiscoveryObserver, + ServiceDiscoverySender, + ServiceDiscoverySubject, +) from someipy._internal.logging import get_logger _logger = get_logger("service_discovery") @@ -85,11 +93,13 @@ def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> N for o in self.attached_observers: o.subscribe_eventgroup_update(event_group_entry, ipv4_endpoint_option) - for event_group_entry in extract_subscribe_ack_eventgroup_entries(someip_sd_header): + for event_group_entry in extract_subscribe_ack_eventgroup_entries( + someip_sd_header + ): _logger.debug( f"Received subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" ) - #for o in self.attached_observers: + # for o in self.attached_observers: # o.sub def connection_lost(self, exc: Exception) -> None: diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 138ec74..73f1eb3 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -1,8 +1,19 @@ -import sys -sys.path.append("src") - -import pytest -from someipy.serialization import * +from dataclasses import dataclass +from someipy.serialization import ( + Uint8, + Uint16, + Uint32, + Uint64, + Sint8, + Sint16, + Sint32, + Sint64, + Bool, + Float32, + Float64, + SomeIpPayload, + SomeIpFixedSizeArray, +) def test_base_types_len(): @@ -123,7 +134,7 @@ def test_fixed_size_array_serialization_and_deserialization(): a.data[2] = Uint8(3) a.data[3] = Uint8(4) # Expected hex: 0x 01 02 03 04 - assert bytes.fromhex("01020304") == a.serialize() + assert bytes.fromhex("01020304") == a.serialize() a_again = SomeIpFixedSizeArray(Uint8, 4).deserialize(bytes.fromhex("01020304")) assert a_again == a diff --git a/tests/test_someip_data_processor.py b/tests/test_someip_data_processor.py new file mode 100644 index 0000000..1c37b87 --- /dev/null +++ b/tests/test_someip_data_processor.py @@ -0,0 +1,73 @@ +import pytest +from someipy._internal.someip_data_processor import SomeipDataProcessor +from someipy._internal.someip_header import SomeIpHeader +from someipy._internal.message_types import MessageType +from someipy._internal.someip_message import SomeIpMessage + + +@pytest.fixture +def valid_someip_message() -> SomeIpMessage: + # length is 8 bytes + payload size + payload_size = 64 + length = 8 + payload_size + + someip_header = SomeIpHeader( + service_id=1, + method_id=2, + length=length, + client_id=3, + session_id=4, + protocol_version=1, + interface_version=2, + message_type=MessageType.REQUEST.value, + return_code=0x00, + ) + payload = b"\x00" * payload_size + return SomeIpMessage(header=someip_header, payload=payload) + + +@pytest.fixture +def corrupt_someip_message() -> SomeIpMessage: + # length is 8 bytes + payload size + payload_size = 64 + length = 8 + payload_size + 1 # +1 for corrupt header + + someip_header = SomeIpHeader( + service_id=1, + method_id=2, + length=length, + client_id=3, + session_id=4, + protocol_version=1, + interface_version=2, + message_type=MessageType.REQUEST.value, + return_code=0x00, + ) + payload = b"\x00" * payload_size + return SomeIpMessage(header=someip_header, payload=payload) + + +def test_process_with_datagrams(valid_someip_message): + data = valid_someip_message.header.to_buffer() + valid_someip_message.payload + processor = SomeipDataProcessor(datagram_mode=True) + result = processor.process_data(data) + + assert result is True + assert processor.someip_message.header == valid_someip_message.header + assert processor._expected_bytes == 0 + assert len(processor._buffer) == 0 + + result = processor.process_data(data) + assert result is True + assert processor.someip_message.header == valid_someip_message.header + assert processor._expected_bytes == 0 + assert len(processor._buffer) == 0 + + +def test_process_with_malformed_datagrams(corrupt_someip_message): + data = corrupt_someip_message.header.to_buffer() + corrupt_someip_message.payload + processor = SomeipDataProcessor(datagram_mode=True) + + result = processor.process_data(data) + + assert result is False diff --git a/tests/test_subscribers.py b/tests/test_subscribers.py new file mode 100644 index 0000000..cfc3f55 --- /dev/null +++ b/tests/test_subscribers.py @@ -0,0 +1,64 @@ +import time +from someipy._internal.subscribers import EventGroupSubscriber, Subscribers + +def test_eventgroupsubscriber_equal(): + e_1 = ("123", 5) + e_2 = ("123", 5) + e_3 = ("456", 4) + + s_1 = EventGroupSubscriber(1, e_1, 500) + s_2 = EventGroupSubscriber(1, e_2, 600) + assert s_1 == s_2 + + s_2.endpoint = e_3 + assert s_1 != s_2 + + s_2.endpoint = e_2 + assert s_1 == s_2 + + s_2.eventgroup_id = 2 + assert s_1 != s_2 + + +def test_add_remove_update(): + + subscriber = EventGroupSubscriber(eventgroup_id=1, endpoint=("123", 1), ttl=1) + + subscribers = Subscribers() + initial_ts = subscriber.last_ts_ms + time.sleep(0.1) + + assert len(subscribers.subscribers) == 0 + subscribers.add_subscriber(subscriber) + assert initial_ts != subscribers.subscribers[0].last_ts_ms + assert len(subscribers.subscribers) == 1 + + # Add same subscriber. Length shall not change, but timestamp shall be updated + initial_ts = subscribers.subscribers[0].last_ts_ms + time.sleep(0.2) + subscribers.add_subscriber(subscriber) + assert initial_ts != subscribers.subscribers[0].last_ts_ms + assert len(subscribers.subscribers) == 1 + + subscriber_2 = EventGroupSubscriber(eventgroup_id=2, endpoint=("123", 1), ttl=1) + subscribers.add_subscriber(subscriber_2) + assert len(subscribers.subscribers) == 2 + + subscriber_3 = EventGroupSubscriber(eventgroup_id=2, endpoint=("123", 1), ttl=1) + subscribers.remove_subscriber(subscriber_3) + assert len(subscribers.subscribers) == 1 + + subscriber_4 = EventGroupSubscriber(eventgroup_id=1, endpoint=("123", 1), ttl=1) + subscribers.remove_subscriber(subscriber_4) + assert len(subscribers.subscribers) == 0 + + subscribers.add_subscriber(EventGroupSubscriber(eventgroup_id=1, endpoint=("123", 1), ttl=1)) + subscribers.add_subscriber(EventGroupSubscriber(eventgroup_id=2, endpoint=("123", 1), ttl=3)) + subscribers.add_subscriber(EventGroupSubscriber(eventgroup_id=3, endpoint=("123", 1), ttl=2)) + assert len(subscribers.subscribers) == 3 + time.sleep(1.1) + subscribers.update() + assert len(subscribers.subscribers) == 2 + time.sleep(3) + subscribers.update() + assert len(subscribers.subscribers) == 0 \ No newline at end of file From 89e06bf5ef792448620c83935b85b64af376c4ce Mon Sep 17 00:00:00 2001 From: chrizog Date: Sun, 14 Apr 2024 12:22:12 +0200 Subject: [PATCH 2/3] Add first test application using vsomeip --- .gitignore | 5 +- example_apps/send_events_udp.py | 6 +- src/someipy/__init__.py | 1 + src/someipy/logging.py | 2 +- src/someipy/server_service_instance.py | 117 +++++++++--------- test_apps/CMakeLists.txt | 25 ++++ test_apps/README.md | 39 ++++++ test_apps/send_events_udp/send_events_udp.cpp | 85 +++++++++++++ test_apps/send_events_udp/start_app.sh | 6 + test_apps/send_events_udp/vsomeip-client.json | 32 +++++ 10 files changed, 256 insertions(+), 62 deletions(-) create mode 100644 test_apps/CMakeLists.txt create mode 100644 test_apps/README.md create mode 100644 test_apps/send_events_udp/send_events_udp.cpp create mode 100755 test_apps/send_events_udp/start_app.sh create mode 100644 test_apps/send_events_udp/vsomeip-client.json diff --git a/.gitignore b/.gitignore index 01aa025..8865224 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ .vscode/ dist/ someipy.egg-info/ -__pycache__ \ No newline at end of file +__pycache__ + +test_apps/build +test_apps/install \ No newline at end of file diff --git a/example_apps/send_events_udp.py b/example_apps/send_events_udp.py index 6894738..7e16c13 100644 --- a/example_apps/send_events_udp.py +++ b/example_apps/send_events_udp.py @@ -2,10 +2,8 @@ import ipaddress import logging -from someipy import TransportLayerProtocol -from someipy.service import ServiceBuilder, EventGroup +from someipy import TransportLayerProtocol, ServiceBuilder, EventGroup, construct_server_service_instance from someipy.service_discovery import construct_service_discovery -from someipy.server_service_instance import construct_server_service_instance from someipy.logging import set_someipy_log_level from someipy.serialization import Uint8, Uint64, Float32 from temperature_msg import TemparatureMsg @@ -83,7 +81,7 @@ async def main(): try: # Either cyclically send events in an endless loop.. while True: - await asyncio.sleep(5) + await asyncio.sleep(1) tmp_msg.timestamp = Uint64(tmp_msg.timestamp.value + 1) payload = tmp_msg.serialize() service_instance_temperature.send_event( diff --git a/src/someipy/__init__.py b/src/someipy/__init__.py index d92427f..68d9611 100644 --- a/src/someipy/__init__.py +++ b/src/someipy/__init__.py @@ -1,2 +1,3 @@ from ._internal.someip_sd_header import TransportLayerProtocol from .server_service_instance import ServerServiceInstance, construct_server_service_instance +from .service import Service, ServiceBuilder, EventGroup \ No newline at end of file diff --git a/src/someipy/logging.py b/src/someipy/logging.py index df92067..0d76925 100644 --- a/src/someipy/logging.py +++ b/src/someipy/logging.py @@ -1,6 +1,6 @@ import logging -_log_level = logging.INFO +_log_level = logging.DEBUG def set_someipy_log_level(log_level: int): diff --git a/src/someipy/server_service_instance.py b/src/someipy/server_service_instance.py index 180a49c..d2fa76e 100644 --- a/src/someipy/server_service_instance.py +++ b/src/someipy/server_service_instance.py @@ -1,9 +1,9 @@ import asyncio from typing import Tuple -from someipy._internal.tcp_client_manager import TcpClientManager, TcpClientProtocol from someipy.service import Service +from someipy._internal.tcp_client_manager import TcpClientManager, TcpClientProtocol from someipy._internal.message_types import MessageType, ReturnCode from someipy._internal.someip_sd_builder import ( build_subscribe_eventgroup_ack_entry, @@ -24,7 +24,6 @@ ServiceDiscoveryObserver, ServiceDiscoverySender, ) - from someipy._internal.simple_timer import SimplePeriodicTimer from someipy._internal.utils import ( create_udp_socket, @@ -43,17 +42,26 @@ class ServerServiceInstance(ServiceDiscoveryObserver): - service: Service - instance_id: int - endpoint: EndpointType - protocol: TransportLayerProtocol - someip_endpoint: SomeipEndpoint - ttl: int - sd_sender: ServiceDiscoverySender - cyclic_offer_delay_ms: int - - subscribers: Subscribers - offer_timer: SimplePeriodicTimer + """AI is creating summary for ServerServiceInstance + + Args: + ServiceDiscoveryObserver ([type]): [description] + + Returns: + [type]: [description] + """ + + _service: Service + _instance_id: int + _endpoint: EndpointType + _protocol: TransportLayerProtocol + _someip_endpoint: SomeipEndpoint + _ttl: int + _sd_sender: ServiceDiscoverySender + _cyclic_offer_delay_ms: int + + _subscribers: Subscribers + _offer_timer: SimplePeriodicTimer def __init__( self, @@ -66,24 +74,24 @@ def __init__( sd_sender=None, cyclic_offer_delay_ms=2000, ): - self.service = service - self.instance_id = instance_id - self.endpoint = endpoint - self.protocol = protocol - self.someip_endpoint = someip_endpoint - self.ttl = ttl - self.sd_sender = sd_sender - self.cyclic_offer_delay_ms = cyclic_offer_delay_ms - - self.subscribers = Subscribers() - self.offer_timer = None + self._service = service + self._instance_id = instance_id + self._endpoint = endpoint + self._protocol = protocol + self._someip_endpoint = someip_endpoint + self._ttl = ttl + self._sd_sender = sd_sender + self._cyclic_offer_delay_ms = cyclic_offer_delay_ms + + self._subscribers = Subscribers() + self._offer_timer = None def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None: - self.subscribers.update() + self._subscribers.update() length = 8 + len(payload) someip_header = SomeIpHeader( - service_id=self.service.id, + service_id=self._service.id, method_id=event_id, length=length, client_id=0x00, # TODO @@ -93,16 +101,13 @@ def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None message_type=MessageType.NOTIFICATION.value, return_code=0x00, ) - _logger.debug( - f"Send event for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}" - ) - for sub in self.subscribers.subscribers: + for sub in self._subscribers.subscribers: if sub.eventgroup_id == event_group_id: _logger.debug( - f"Send event for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X} to {sub.endpoint[0]}:{sub.endpoint[1]}" + f"Send event for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X} to {sub.endpoint[0]}:{sub.endpoint[1]}" ) - self.someip_endpoint.sendto( + self._someip_endpoint.sendto( someip_header.to_buffer() + payload, endpoint_to_str_int_tuple(sub.endpoint), ) @@ -148,12 +153,12 @@ def send_response(): if not success: _logger.debug( - f"Return ERROR message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}" + f"Return ERROR message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self._instance_id:04X}" ) header_to_return.message_type = MessageType.ERROR.value else: _logger.debug( - f"Return RESPONSE message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self.instance_id:04X}" + f"Return RESPONSE message type to {addr} for service and instance ID: 0x{self.service_id:04X} / 0x{self._instance_id:04X}" ) header_to_return.message_type = MessageType.RESPONSE.value @@ -182,17 +187,17 @@ def subscribe_eventgroup_update( # an Eventgroup of a Service Instance. # TODO: enable major version check if ( - sd_event_group.sd_entry.service_id == self.service.id - and sd_event_group.sd_entry.instance_id == self.instance_id - and sd_event_group.eventgroup_id in self.service.eventgroupids + sd_event_group.sd_entry.service_id == self._service.id + and sd_event_group.sd_entry.instance_id == self._instance_id + and sd_event_group.eventgroup_id in self._service.eventgroupids ): ( session_id, reboot_flag, - ) = self.sd_sender.get_unicast_session_handler().update_session() + ) = self._sd_sender.get_unicast_session_handler().update_session() _logger.debug( - f"Send Subscribe ACK for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}, TTL: {sd_event_group.sd_entry.ttl}" + f"Send Subscribe ACK for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, TTL: {sd_event_group.sd_entry.ttl}" ) ack_entry = build_subscribe_eventgroup_ack_entry( service_id=sd_event_group.sd_entry.service_id, @@ -205,12 +210,12 @@ def subscribe_eventgroup_update( entry=ack_entry, session_id=session_id, reboot_flag=reboot_flag ) - self.sd_sender.send_unicast( + self._sd_sender.send_unicast( buffer=header_output.to_buffer(), dest_ip=ipv4_endpoint_option.ipv4_address, ) - self.subscribers.add_subscriber( + self._subscribers.add_subscriber( EventGroupSubscriber( eventgroup_id=sd_event_group.eventgroup_id, endpoint=( @@ -230,40 +235,40 @@ def offer_timer_callback(self): ( session_id, reboot_flag, - ) = self.sd_sender.get_multicast_session_handler().update_session() + ) = self._sd_sender.get_multicast_session_handler().update_session() _logger.debug( - f"Offer service for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}, TTL: {self.ttl}, version: {self.service.major_version}.{self.service.minor_version}, session ID: {session_id}" + f"Offer service for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, TTL: {self._ttl}, version: {self._service.major_version}.{self._service.minor_version}, session ID: {session_id}" ) service_to_offer = SdService( - service_id=self.service.id, - instance_id=self.instance_id, + service_id=self._service.id, + instance_id=self._instance_id, major_version=1, minor_version=0, - ttl=self.ttl, - endpoint=self.endpoint, - protocol=self.protocol, + ttl=self._ttl, + endpoint=self._endpoint, + protocol=self._protocol, ) sd_header = build_offer_service_sd_header( service_to_offer, session_id, reboot_flag ) - self.sd_sender.send_multicast(sd_header.to_buffer()) + self._sd_sender.send_multicast(sd_header.to_buffer()) def start_offer(self): - self.offer_timer = SimplePeriodicTimer( - self.cyclic_offer_delay_ms / 1000.0, self.offer_timer_callback + self._offer_timer = SimplePeriodicTimer( + self._cyclic_offer_delay_ms / 1000.0, self.offer_timer_callback ) - self.offer_timer.start() + self._offer_timer.start() async def stop_offer(self): _logger.debug( - f"Stop offer for instance 0x{self.instance_id:04X}, service: 0x{self.service.id:04X}" + f"Stop offer for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}" ) - if self.offer_timer is not None: - self.offer_timer.stop() - await self.offer_timer.task + if self._offer_timer is not None: + self._offer_timer.stop() + await self._offer_timer.task # TODO: send out a stop offer sd message before stopping the timer diff --git a/test_apps/CMakeLists.txt b/test_apps/CMakeLists.txt new file mode 100644 index 0000000..b4707c4 --- /dev/null +++ b/test_apps/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required (VERSION 3.13) + +project(someipy_test_apps) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") + +find_package(vsomeip3 CONFIG REQUIRED) + +# TODO: Common function for building a single test target +function(add_test_app_target) + + + +endfunction(add_test_app_target) + + +add_executable(send_events_udp send_events_udp/send_events_udp.cpp) +target_link_libraries(send_events_udp ${VSOMEIP_LIBRARIES} pthread) + +set(CMAKE_INSTALL_PREFIX "${CMAKE_SOURCE_DIR}/install") + +install(TARGETS send_events_udp + RUNTIME DESTINATION send_events_udp) + +file(GLOB JSON_FILES "${CMAKE_SOURCE_DIR}/send_events_udp/*.json" "${CMAKE_SOURCE_DIR}/send_events_udp/*.sh") +install(FILES ${JSON_FILES} DESTINATION send_events_udp) \ No newline at end of file diff --git a/test_apps/README.md b/test_apps/README.md new file mode 100644 index 0000000..e1ba551 --- /dev/null +++ b/test_apps/README.md @@ -0,0 +1,39 @@ +## Build Test Applications using CMake + +```bash +rm -rf build install && mkdir -p build && cd build && cmake .. && make && make install && cd .. +``` + +## Setup Python Pip Package from Source + +```bash +python3.12 -m pip install -e . +``` + +## Test Procedure + +### Network Setup Linux + +```bash +sudo ip addr add 127.0.0.2/8 dev lo +sudo ip addr add 224.224.224.245 dev lo autojoin +``` + +### 1. send_events_udp + +Open two terminals (one for Python someipy example app, one for test app) + +Start apps: + +```bash +python3.12 send_events_udp.py +bash ./install/send_events_udp/start_app.sh +``` + +Expected in test app: +- Receive event with frequency 1Hz +- Service ID: 0x1234 +- Instance ID: 0x5678 +- Eventgroup ID: 0x0321 +- Event ID: 0x0123 +- SD Offer is sent with 0.5Hz \ No newline at end of file diff --git a/test_apps/send_events_udp/send_events_udp.cpp b/test_apps/send_events_udp/send_events_udp.cpp new file mode 100644 index 0000000..b77e774 --- /dev/null +++ b/test_apps/send_events_udp/send_events_udp.cpp @@ -0,0 +1,85 @@ +#include +#include +#include +#include +#include +#include + +#define SAMPLE_SERVICE_ID 0x1234 +#define SAMPLE_INSTANCE_ID 0x5678 +#define SAMPLE_EVENTGROUP_ID 0x0321 +#define SAMPLE_EVENT_ID 0x0123 + +std::shared_ptr app; +std::mutex mutex; +std::condition_variable condition; +bool service_available = false; + +void run() +{ + while (true) + { + std::unique_lock its_lock(mutex); + condition.wait(its_lock, [] + { return service_available; }); + + service_available = false; + its_lock.unlock(); + + std::cout << "Run triggered." << std::endl; + + std::set its_groups; + its_groups.insert(SAMPLE_EVENTGROUP_ID); + app->request_event(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENT_ID, its_groups); + app->subscribe(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENTGROUP_ID); + } +} + +void on_message(const std::shared_ptr &_response) +{ + std::stringstream its_message; + its_message << "CLIENT: received a notification for event [" + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_service() << "." + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_instance() << "." + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_method() << "] to Client/Session [" + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_client() << "/" + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_session() + << "] = "; + + std::shared_ptr its_payload = _response->get_payload(); + its_message << "(" << std::dec << its_payload->get_length() << ") "; + for (uint32_t i = 0; i < its_payload->get_length(); ++i) + its_message << std::hex << std::setw(2) << std::setfill('0') + << (int)its_payload->get_data()[i] << " "; + std::cout << its_message.str() << std::endl; +} + +void on_availability(vsomeip::service_t _service, vsomeip::instance_t _instance, bool _is_available) +{ + std::cout << "CLIENT: Service ID / Instance ID [" + << std::setw(4) << std::setfill('0') << std::hex << _service << "." << _instance + << "] is " + << (_is_available ? "available." : "NOT available.") + << std::endl; + { + std::lock_guard l{mutex}; + service_available = _is_available; + } + condition.notify_one(); +} + +int main() +{ + app = vsomeip::runtime::get()->create_application("Hello"); + app->init(); + app->register_availability_handler(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, on_availability); + app->request_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + app->register_message_handler(vsomeip::ANY_SERVICE, vsomeip::ANY_INSTANCE, vsomeip::ANY_METHOD, on_message); + std::thread receiver(run); + app->start(); +} \ No newline at end of file diff --git a/test_apps/send_events_udp/start_app.sh b/test_apps/send_events_udp/start_app.sh new file mode 100755 index 0000000..2da731a --- /dev/null +++ b/test_apps/send_events_udp/start_app.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +script_dir="$(dirname "$0")" + +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/projects/someip/vsomeip_install/lib/ +VSOMEIP_CONFIGURATION="${script_dir}/vsomeip-client.json" "${script_dir}/send_events_udp" \ No newline at end of file diff --git a/test_apps/send_events_udp/vsomeip-client.json b/test_apps/send_events_udp/vsomeip-client.json new file mode 100644 index 0000000..c0cab8c --- /dev/null +++ b/test_apps/send_events_udp/vsomeip-client.json @@ -0,0 +1,32 @@ +{ + "unicast" : "127.0.0.2", + "logging" : + { + "level" : "verbose", + "console" : "true", + "file" : { "enable" : "false", "path" : "./vsomeip.log" }, + "dlt" : "false" + }, + "applications" : + [ + { + "name" : "Hello", + "id" : "0x1313" + } + ], + "routing" : "Hello", + "service-discovery" : + { + "enable" : "true", + "multicast" : "224.224.224.245", + "port" : "30490", + "protocol" : "udp", + "initial_delay_min" : "10", + "initial_delay_max" : "100", + "repetitions_base_delay" : "200", + "repetitions_max" : "3", + "ttl" : "3", + "cyclic_offer_delay" : "2000", + "request_response_delay" : "1500" + } +} \ No newline at end of file From 46bf66ffd94d92411fca6408aba3ac565b9b974e Mon Sep 17 00:00:00 2001 From: chrizog Date: Thu, 25 Apr 2024 19:40:40 +0200 Subject: [PATCH 3/3] Refactor client service instance and add test for receiving events --- article.md | 48 ---- ...eceive_events.py => receive_events_udp.py} | 30 +- example_apps/send_events_tcp.py | 6 +- src/someipy/__init__.py | 3 +- src/someipy/client_service_instance.py | 261 +++++++++++------- src/someipy/server_service_instance.py | 89 +++--- test_apps/CMakeLists.txt | 25 +- test_apps/README.md | 53 +++- test_apps/build_test_apps.sh | 9 + .../receive_events_udp/receive_events_udp.cpp | 63 +++++ test_apps/receive_events_udp/start_app.sh | 6 + .../receive_events_udp/vsomeip-client.json | 54 ++++ .../send_events.cpp} | 0 .../start_app.sh | 2 +- .../vsomeip-client.json | 0 15 files changed, 429 insertions(+), 220 deletions(-) delete mode 100644 article.md rename example_apps/{receive_events.py => receive_events_udp.py} (78%) create mode 100755 test_apps/build_test_apps.sh create mode 100644 test_apps/receive_events_udp/receive_events_udp.cpp create mode 100755 test_apps/receive_events_udp/start_app.sh create mode 100644 test_apps/receive_events_udp/vsomeip-client.json rename test_apps/{send_events_udp/send_events_udp.cpp => send_events/send_events.cpp} (100%) rename test_apps/{send_events_udp => send_events}/start_app.sh (88%) rename test_apps/{send_events_udp => send_events}/vsomeip-client.json (100%) diff --git a/article.md b/article.md deleted file mode 100644 index 230e3a0..0000000 --- a/article.md +++ /dev/null @@ -1,48 +0,0 @@ -Everybody needs a "five minutes" article to get started with a new topic. Here is a quick overview about SOME/IP. - -## SOME/IP - -SOME/IP stands for "**S**calable service-**O**riented **M**iddlewar**E** over **IP**". It is a communication protocol which targets embedded projects and typically the automotive industry. It is driven by the AUTOSAR (**AUT**omotive **O**pen **S**ystem **AR**chitecture) consortium. Therefore you can find the specification of the SOME/IP communication protocol on [autosar.org](autosar.org), e.g. [Release 22-11](https://www.autosar.org/fileadmin/standards/R22-11/FO/AUTOSAR_PRS_SOMEIPProtocol.pdf). - -## Use Cases - -SOME/IP is typically used for **inter-ECU** communication. Since SOME/IP works over UDP or TCP, it does not matter which operating system is running on the ECUs or PCs. It could Linux, Windows, QNX, AUTOSAR Classic or even a bare-metal microcontroller as long as UDP or TCP is supported. - -For **intra-ECU** communication, using an IPC mechanism is recommended. The SOME/IP serialization for communication on the same ECU is not needed. Therefore an IPC mechanism is the better choice, e.g. Unix Domain Sockets or Shared Memory communication. - -Here is an example network topology showing ECUs and PCs using different operating operating systems. The devices could be connected via switched Ethernet. - -![Network Topology for SOME/IP](./network_topology.svg) - -## Services - -In SOME/IP everything is based on **Services**. A service is a functionality offered by an ECU (= server). A Server can offer multiple services. Clients, i.e. other ECUs can use the offered services. - -### Methods - -For example one ECU could offer a "Calculator service" via SOME/IP. Clients could call the Calculator Service remotely and pass an operation code (add/subtract/multiply/divide) and two operands. The Server will calculate the result and send back a response to the calling client. This type of Service is called a **Method** in SOME/IP. It describes a request-response-communication. - -### Notification Events - -Another type of Services are Notification Events. If one ECU has a button connected and wants to notify other ECUs everytime the button is pressed, it can offer a SOME/IP containing an Event. Other ECUs can subscribe to this service and will receive a notification everytime the button is pressed. Notification Events in SOME/IP describe a publish/subscribe-communication. - -### Services, Instances and IDs - -In SOME/IP Services can be seen as a schema similar to a class in object oriented programming. Services are instantiated into Service instances allowing ECUs to offer multiple instances of the same service. E.g. if there a three buttons connected, the ECU could simply offer three service instances all providing the "button press service". - -Services, Instances, Methods and Events are identified by IDs (numbers) in SOME/IP. In other middlewares, e.g. in ROS (Robot Operating System) instances (ROS topics) are identified by names. - -## On-wire serialization - -Besides providing functionality as Methods and Notification Events, SOME/IP also takes care of the on-wire serialization of data. SOME/IP allows you to use basic datatypes (like unsigned/signed integers and floating point numbers) and strings or define structures and arrays. SOME/IP takes care of properly serializing data on sender-side in order to transport it via UDP or TCP and deserialize data on reception. - -## SOME/IP SD (Service Discovery) - -While it is possible to statically define a network topology and all IDs in a system beforehand, a major feature of SOME/IP is **Service Discovery**. Service Discovery allows a Service to announce itself on the network to potential clients by sending SOME/IP SD "Offer Entries" via UDP multicast. - -Clients can search for services by sending Find Entries via multicast. In case the offered and request IDs match, server and client can dynamically find each other without knowing the IP addresses of each ECU beforehand. For applications using SOME/IP the network topology is therefore transparent which allows it to develop portable applications being only dependent on Service, Service Instance and Method/Event IDs. - - -## Implementations - -## Further Reading \ No newline at end of file diff --git a/example_apps/receive_events.py b/example_apps/receive_events_udp.py similarity index 78% rename from example_apps/receive_events.py rename to example_apps/receive_events_udp.py index d29678d..ee07097 100644 --- a/example_apps/receive_events.py +++ b/example_apps/receive_events_udp.py @@ -2,6 +2,7 @@ import ipaddress import logging +from someipy import ServiceBuilder, EventGroup, TransportLayerProtocol, SomeIpMessage from someipy.service_discovery import construct_service_discovery from someipy.client_service_instance import construct_client_service_instance from someipy.logging import set_someipy_log_level @@ -11,12 +12,15 @@ SD_PORT = 30490 INTERFACE_IP = "127.0.0.1" -SAMPLE_EVENTGROUP_ID = 20 +SAMPLE_SERVICE_ID = 0x1234 +SAMPLE_INSTANCE_ID = 0x5678 +SAMPLE_EVENTGROUP_ID = 0x0321 +SAMPLE_EVENT_ID = 0x0123 -def temperature_callback(payload: bytes) -> None: - print(f"Received {len(payload)} bytes.") - temperature_msg = TemparatureMsg().deserialize(payload) +def temperature_callback(someip_message: SomeIpMessage) -> None: + print(f"Received {len(someip_message.payload)} bytes.") + temperature_msg = TemparatureMsg().deserialize(someip_message.payload) print(temperature_msg) @@ -37,14 +41,24 @@ async def main(): # and port to which the events are sent to and the client will listen to # 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ClientServiceInstance can offer his service to # other ECUs + temperature_eventgroup = EventGroup( + id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID] + ) + temperature_service = ( + ServiceBuilder() + .with_service_id(SAMPLE_SERVICE_ID) + .with_major_version(1) + .with_eventgroup(temperature_eventgroup) + .build() + ) + service_instance_temperature = await construct_client_service_instance( - service_id=1, - instance_id=1000, - major_version=1, - minor_version=0, + service=temperature_service, + instance_id=SAMPLE_INSTANCE_ID, endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3002), ttl=5, sd_sender=service_discovery, + protocol=TransportLayerProtocol.UDP ) # It's possible to optionally register a callback function which will be called when an event from the diff --git a/example_apps/send_events_tcp.py b/example_apps/send_events_tcp.py index 086a341..9a853e0 100644 --- a/example_apps/send_events_tcp.py +++ b/example_apps/send_events_tcp.py @@ -2,10 +2,8 @@ import ipaddress import logging -from someipy import TransportLayerProtocol -from someipy.service import ServiceBuilder, EventGroup +from someipy import TransportLayerProtocol, ServiceBuilder, EventGroup, construct_server_service_instance from someipy.service_discovery import construct_service_discovery -from someipy.server_service_instance import construct_server_service_instance from someipy.logging import set_someipy_log_level from someipy.serialization import Uint8, Uint64, Float32 from temperature_msg import TemparatureMsg @@ -83,7 +81,7 @@ async def main(): try: # Either cyclically send events in an endless loop.. while True: - await asyncio.sleep(5) + await asyncio.sleep(1) tmp_msg.timestamp = Uint64(tmp_msg.timestamp.value + 1) payload = tmp_msg.serialize() service_instance_temperature.send_event( diff --git a/src/someipy/__init__.py b/src/someipy/__init__.py index 68d9611..8a7b7f0 100644 --- a/src/someipy/__init__.py +++ b/src/someipy/__init__.py @@ -1,3 +1,4 @@ from ._internal.someip_sd_header import TransportLayerProtocol from .server_service_instance import ServerServiceInstance, construct_server_service_instance -from .service import Service, ServiceBuilder, EventGroup \ No newline at end of file +from .service import Service, ServiceBuilder, EventGroup +from ._internal.someip_message import SomeIpMessage \ No newline at end of file diff --git a/src/someipy/client_service_instance.py b/src/someipy/client_service_instance.py index d9a252f..83357f7 100644 --- a/src/someipy/client_service_instance.py +++ b/src/someipy/client_service_instance.py @@ -1,82 +1,92 @@ import asyncio -from typing import Tuple, Union, Any, Callable +from typing import Tuple, Callable, Set, List +from someipy import Service from someipy._internal.someip_sd_header import ( SdService, TransportLayerProtocol, SdEventGroupEntry, ) from someipy._internal.someip_sd_builder import build_subscribe_eventgroup_entry -from someipy._internal.someip_header import ( - get_payload_from_someip_message, - SomeIpHeader, -) from someipy._internal.service_discovery_abcs import ( ServiceDiscoveryObserver, ServiceDiscoverySender, ) -from someipy._internal.utils import create_udp_socket, EndpointType, DatagramAdapter +from someipy._internal.tcp_client_manager import TcpClientManager, TcpClientProtocol +from someipy._internal.utils import create_udp_socket, EndpointType from someipy._internal.logging import get_logger - +from someipy._internal.message_types import MessageType +from someipy._internal.someip_endpoint import ( + SomeipEndpoint, + TCPSomeipEndpoint, + UDPSomeipEndpoint, + SomeIpMessage, +) _logger = get_logger("client_service_instance") -class ClientServiceInstance(ServiceDiscoveryObserver): - service_id: int - instance_id: int - major_version: int - minor_version: int - endpoint: EndpointType - ttl: int - sd_sender: ServiceDiscoverySender +class ExpectedAck: + def __init__(self, eventgroup_id: int) -> None: + self.eventgroup_id = eventgroup_id - eventgroup_to_subscribe: int - expect_ack: bool - callback: Callable[[bytes], None] +class ClientServiceInstance(ServiceDiscoveryObserver): + _service: Service + _instance_id: int + _endpoint: EndpointType + _protocol: TransportLayerProtocol + _someip_endpoint: SomeipEndpoint + _ttl: int + _sd_sender: ServiceDiscoverySender + + _eventgroups_to_subscribe: Set[int] + _expected_acks: List[ExpectedAck] + _callback: Callable[[bytes], None] def __init__( self, - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, + protocol: TransportLayerProtocol, + someip_endpoint: SomeipEndpoint, ttl: int = 0, sd_sender=None, ): - self.service_id = service_id - self.instance_id = instance_id - self.major_version = major_version - self.minor_version = minor_version - self.endpoint = endpoint - self.ttl = ttl - self.sd_sender = sd_sender - - self.eventgroup_to_subscribe = -1 - self.expect_ack = False - - self.unicast_transport = None - - self.callback = None - - def register_callback(self, callback: Callable[[bytes], None]) -> None: - self.callback = callback - - def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> None: - # TODO: Test if there is a subscription active for the received data - if self.callback is not None: - header = SomeIpHeader.from_buffer(data) - payload = get_payload_from_someip_message(header, data) - self.callback(payload) - - def connection_lost(self, exc: Exception) -> None: - pass + self._service = service + self._instance_id = instance_id + self._endpoint = endpoint + self._protocol = protocol + self._someip_endpoint = someip_endpoint + self._ttl = ttl + self._sd_sender = sd_sender + + self._eventgroups_to_subscribe = set() + self._expected_acks = [] + self._callback = None + + def register_callback(self, callback: Callable[[SomeIpMessage], None]) -> None: + self._callback = callback + + def someip_message_received( + self, someip_message: SomeIpMessage, addr: Tuple[str, int] + ) -> None: + print(someip_message.header) + if ( + someip_message.header.client_id == 0x00 + and someip_message.header.message_type == MessageType.NOTIFICATION.value + and someip_message.header.return_code == 0x00 + ): + if self._callback is not None: + self._callback(someip_message) def subscribe_eventgroup(self, eventgroup_id: int): - # TODO: Currently only one eventgroup per service is supported - self.eventgroup_to_subscribe = eventgroup_id + if eventgroup_id in self._eventgroups_to_subscribe: + _logger.debug( + f"Eventgroup ID {eventgroup_id} is already in subscription list." + ) + self._eventgroups_to_subscribe.add(eventgroup_id) def stop_subscribe_eventgroup(self, eventgroup_id: int): # TODO: Implement StopSubscribe @@ -87,39 +97,48 @@ def find_service_update(self): pass def offer_service_update(self, offered_service: SdService): - if ( - self.eventgroup_to_subscribe != -1 - and offered_service.service_id == self.service_id - and offered_service.instance_id == self.instance_id - ): - ( - session_id, - reboot_flag, - ) = self.sd_sender.get_unicast_session_handler().update_session() - - subscribe_sd_header = build_subscribe_eventgroup_entry( - service_id=self.service_id, - instance_id=self.instance_id, - major_version=self.major_version, - ttl=self.ttl, - event_group_id=self.eventgroup_to_subscribe, - session_id=session_id, - reboot_flag=reboot_flag, - endpoint=self.endpoint, - protocol=TransportLayerProtocol.UDP, - ) - - # TODO: Subscription shall be only active when ACK is received - self.expect_ack = True + if len(self._eventgroups_to_subscribe) == 0: + return - _logger.debug( - f"Send subscribe for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}, evengroup ID: {self.eventgroup_to_subscribe} TTL: {self.ttl}, version: {self.major_version}.{self.minor_version}, session ID: {session_id}" - ) + if self._service.id != offered_service.service_id: + return + if self._instance_id != offered_service.instance_id: + return - self.sd_sender.send_unicast( - buffer=subscribe_sd_header.to_buffer(), - dest_ip=offered_service.endpoint[0], - ) + if ( + offered_service.service_id == self._service.id + and offered_service.instance_id == self._instance_id + ): + for eventgroup_to_subscribe in self._eventgroups_to_subscribe: + ( + session_id, + reboot_flag, + ) = self._sd_sender.get_unicast_session_handler().update_session() + + # Improvement: Pack all entries into a single SD message + subscribe_sd_header = build_subscribe_eventgroup_entry( + service_id=self._service.id, + instance_id=self._instance_id, + major_version=self._service.major_version, + ttl=self._ttl, + event_group_id=eventgroup_to_subscribe, + session_id=session_id, + reboot_flag=reboot_flag, + endpoint=self._endpoint, + protocol=TransportLayerProtocol.UDP, + ) + + _logger.debug( + f"Send subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, " + f"eventgroup ID: {eventgroup_to_subscribe} TTL: {self._ttl}, version: " + f"session ID: {session_id}" + ) + + self._expected_acks.append(ExpectedAck(eventgroup_to_subscribe)) + self._sd_sender.send_unicast( + buffer=subscribe_sd_header.to_buffer(), + dest_ip=offered_service.endpoint[0], + ) def subscribe_eventgroup_update(self, _, __) -> None: # Not needed for client instance @@ -128,32 +147,76 @@ def subscribe_eventgroup_update(self, _, __) -> None: def subscribe_ack_eventgroup_update( self, event_group_entry: SdEventGroupEntry ) -> None: - if self.expect_ack: - self.expect_ack = False - _logger.debug( - f"Received subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" + new_acks: List[ExpectedAck] = [] + ack_found = False + for expected_ack in self._expected_acks: + if expected_ack.eventgroup_id == event_group_entry.eventgroup_id: + ack_found = True + _logger.debug( + f"Received expected subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" + ) + else: + new_acks.append(expected_ack) + + self._expected_acks = new_acks + if not ack_found: + _logger.warn( + f"Received unexpected subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" ) async def construct_client_service_instance( - service_id: int, + service: Service, instance_id: int, - major_version: int, - minor_version: int, endpoint: EndpointType, ttl: int = 0, sd_sender=None, + protocol=TransportLayerProtocol.UDP, ) -> ClientServiceInstance: - client_instance = ClientServiceInstance( - service_id, instance_id, major_version, minor_version, endpoint, ttl, sd_sender - ) - - loop = asyncio.get_running_loop() - rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) - - unicast_transport, _ = await loop.create_datagram_endpoint( - lambda: DatagramAdapter(target=client_instance), sock=rcv_socket - ) - client_instance.unicast_transport = unicast_transport + if protocol == TransportLayerProtocol.UDP: + loop = asyncio.get_running_loop() + rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) + + _, udp_endpoint = await loop.create_datagram_endpoint( + lambda: UDPSomeipEndpoint(), sock=rcv_socket + ) + + client_instance = ClientServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.UDP, + udp_endpoint, + ttl, + sd_sender, + ) + + udp_endpoint.set_someip_callback(client_instance.someip_message_received) + + return client_instance + + elif protocol == TransportLayerProtocol.TCP: + tcp_client_manager = TcpClientManager() + loop = asyncio.get_running_loop() + server = await loop.create_server( + lambda: TcpClientProtocol(client_manager=tcp_client_manager), + str(endpoint[0]), + endpoint[1], + ) + + tcp_someip_endpoint = TCPSomeipEndpoint(server, tcp_client_manager) + + server_instance = ClientServiceInstance( + service, + instance_id, + endpoint, + TransportLayerProtocol.TCP, + tcp_someip_endpoint, + ttl, + sd_sender, + ) + return server_instance + + client_instance = ClientServiceInstance(service, instance_id, ttl, sd_sender) return client_instance diff --git a/src/someipy/server_service_instance.py b/src/someipy/server_service_instance.py index d2fa76e..7f7884c 100644 --- a/src/someipy/server_service_instance.py +++ b/src/someipy/server_service_instance.py @@ -42,15 +42,6 @@ class ServerServiceInstance(ServiceDiscoveryObserver): - """AI is creating summary for ServerServiceInstance - - Args: - ServiceDiscoveryObserver ([type]): [description] - - Returns: - [type]: [description] - """ - _service: Service _instance_id: int _endpoint: EndpointType @@ -70,7 +61,7 @@ def __init__( endpoint: EndpointType, protocol: TransportLayerProtocol, someip_endpoint: SomeipEndpoint, - ttl: int = 0, + ttl: int = 0, # TTL used for SD Offer entries sd_sender=None, cyclic_offer_delay_ms=2000, ): @@ -186,46 +177,54 @@ def subscribe_eventgroup_update( # sion shall match exactly to the corresponding SubscribeEventgroup Entry to identify # an Eventgroup of a Service Instance. # TODO: enable major version check - if ( - sd_event_group.sd_entry.service_id == self._service.id - and sd_event_group.sd_entry.instance_id == self._instance_id - and sd_event_group.eventgroup_id in self._service.eventgroupids - ): - ( - session_id, - reboot_flag, - ) = self._sd_sender.get_unicast_session_handler().update_session() + if sd_event_group.sd_entry.service_id != self._service.id: + return + if sd_event_group.sd_entry.instance_id != self._instance_id: + return + if sd_event_group.eventgroup_id not in self._service.eventgroupids: + return - _logger.debug( - f"Send Subscribe ACK for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, TTL: {sd_event_group.sd_entry.ttl}" - ) - ack_entry = build_subscribe_eventgroup_ack_entry( - service_id=sd_event_group.sd_entry.service_id, - instance_id=sd_event_group.sd_entry.instance_id, - major_version=sd_event_group.sd_entry.major_version, - ttl=sd_event_group.sd_entry.ttl, - event_group_id=sd_event_group.eventgroup_id, - ) - header_output = build_subscribe_eventgroup_ack_sd_header( - entry=ack_entry, session_id=session_id, reboot_flag=reboot_flag + if ipv4_endpoint_option.protocol != self._protocol: + _logger.warn( + f"Subscribing a different protocol (TCP/UDP) than offered is not supported. Received subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X} " + "from {ipv4_endpoint_option.ipv4_address}/{ipv4_endpoint_option.port} with wrong protocol" ) + return - self._sd_sender.send_unicast( - buffer=header_output.to_buffer(), - dest_ip=ipv4_endpoint_option.ipv4_address, - ) + ( + session_id, + reboot_flag, + ) = self._sd_sender.get_unicast_session_handler().update_session() - self._subscribers.add_subscriber( - EventGroupSubscriber( - eventgroup_id=sd_event_group.eventgroup_id, - endpoint=( - ipv4_endpoint_option.ipv4_address, - ipv4_endpoint_option.port, - ), - ttl=sd_event_group.sd_entry.ttl, - ) + _logger.debug( + f"Send Subscribe ACK for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, TTL: {sd_event_group.sd_entry.ttl}" + ) + ack_entry = build_subscribe_eventgroup_ack_entry( + service_id=sd_event_group.sd_entry.service_id, + instance_id=sd_event_group.sd_entry.instance_id, + major_version=sd_event_group.sd_entry.major_version, + ttl=sd_event_group.sd_entry.ttl, + event_group_id=sd_event_group.eventgroup_id, + ) + header_output = build_subscribe_eventgroup_ack_sd_header( + entry=ack_entry, session_id=session_id, reboot_flag=reboot_flag + ) + + self._sd_sender.send_unicast( + buffer=header_output.to_buffer(), + dest_ip=ipv4_endpoint_option.ipv4_address, + ) + + self._subscribers.add_subscriber( + EventGroupSubscriber( + eventgroup_id=sd_event_group.eventgroup_id, + endpoint=( + ipv4_endpoint_option.ipv4_address, + ipv4_endpoint_option.port, + ), + ttl=sd_event_group.sd_entry.ttl, ) - pass + ) def subscribe_ack_eventgroup_update(self, _: SdEventGroupEntry) -> None: # Not needed for server instance diff --git a/test_apps/CMakeLists.txt b/test_apps/CMakeLists.txt index b4707c4..1eac756 100644 --- a/test_apps/CMakeLists.txt +++ b/test_apps/CMakeLists.txt @@ -5,21 +5,22 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") find_package(vsomeip3 CONFIG REQUIRED) -# TODO: Common function for building a single test target -function(add_test_app_target) - +set(CMAKE_INSTALL_PREFIX "${CMAKE_SOURCE_DIR}/install") - -endfunction(add_test_app_target) +function(create_test_target target_name) + add_executable(${target_name} ${target_name}/${target_name}.cpp) + target_link_libraries(${target_name} ${VSOMEIP_LIBRARIES} pthread) -add_executable(send_events_udp send_events_udp/send_events_udp.cpp) -target_link_libraries(send_events_udp ${VSOMEIP_LIBRARIES} pthread) + install(TARGETS ${target_name} + RUNTIME DESTINATION ${target_name}) -set(CMAKE_INSTALL_PREFIX "${CMAKE_SOURCE_DIR}/install") + file(GLOB ADDITIONAL_FILES "${CMAKE_SOURCE_DIR}/${target_name}/*.json" "${CMAKE_SOURCE_DIR}/${target_name}/*.sh") + install(FILES ${ADDITIONAL_FILES} DESTINATION ${target_name}) + +endfunction() -install(TARGETS send_events_udp - RUNTIME DESTINATION send_events_udp) +# The same app can be used for UDP and TCP for the "send_events" example app +create_test_target("send_events") -file(GLOB JSON_FILES "${CMAKE_SOURCE_DIR}/send_events_udp/*.json" "${CMAKE_SOURCE_DIR}/send_events_udp/*.sh") -install(FILES ${JSON_FILES} DESTINATION send_events_udp) \ No newline at end of file +create_test_target("receive_events_udp") diff --git a/test_apps/README.md b/test_apps/README.md index e1ba551..832b7bf 100644 --- a/test_apps/README.md +++ b/test_apps/README.md @@ -21,16 +21,65 @@ sudo ip addr add 224.224.224.245 dev lo autojoin ### 1. send_events_udp -Open two terminals (one for Python someipy example app, one for test app) +Open two terminals (one for Python someipy example app, one for test app). Start apps: ```bash python3.12 send_events_udp.py -bash ./install/send_events_udp/start_app.sh +bash ./install/send_events/start_app.sh ``` Expected in test app: +- Expected log for UDP endpoint on port 3000: + - *endpoint_manager_impl::create_remote_client: 127.0.0.1:3000 reliable: 0 using local port: 0* + - *udp_client_endpoint_impl::connect: SO_RCVBUF is: 212992 (1703936) local port:0 remote:127.0.0.1:3000* +- Receive event with frequency 1Hz +- Service ID: 0x1234 +- Instance ID: 0x5678 +- Eventgroup ID: 0x0321 +- Event ID: 0x0123 +- SD Offer is sent with 0.5Hz + + +### 1. send_events_tcp + +Open two terminals (one for Python someipy example app, one for test app). + +Start apps: + +```bash +python3.12 send_events_tcp.py +bash ./install/send_events/start_app.sh +``` + +Expected in test app: +- Expected log for TCP endpoint on port 3000: + - *endpoint_manager_impl::create_remote_client: 127.0.0.1:3000 reliable: 1 using local port: 0* + - Check for "reliable == 1" in above log (i.e. TCP) + - No "udp_client_endpoint_impl::connect" shall appear +- Receive event with frequency 1Hz +- Service ID: 0x1234 +- Instance ID: 0x5678 +- Eventgroup ID: 0x0321 +- Event ID: 0x0123 +- SD Offer is sent with 0.5Hz + +### 1. receive_events_udp + +Open two terminals (one for Python someipy example app, one for test app). + +Start apps: + +```bash +python3.12 receive_events_udp.py +bash install/receive_events_udp/start_app.sh +``` + +Expected in test app: +- Expected log for UDP endpoint: REMOTE SUBSCRIBE(0000): [1234.5678.0321] from 127.0.0.1:3002 unreliable was accepted + +Expected in example app: - Receive event with frequency 1Hz - Service ID: 0x1234 - Instance ID: 0x5678 diff --git a/test_apps/build_test_apps.sh b/test_apps/build_test_apps.sh new file mode 100755 index 0000000..0d80bb7 --- /dev/null +++ b/test_apps/build_test_apps.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +rm -rf build install +mkdir -p build +cd build +cmake .. +make +make install +cd .. \ No newline at end of file diff --git a/test_apps/receive_events_udp/receive_events_udp.cpp b/test_apps/receive_events_udp/receive_events_udp.cpp new file mode 100644 index 0000000..bcd9239 --- /dev/null +++ b/test_apps/receive_events_udp/receive_events_udp.cpp @@ -0,0 +1,63 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +#define SAMPLE_SERVICE_ID 0x1234 +#define SAMPLE_INSTANCE_ID 0x5678 +#define SAMPLE_EVENTGROUP_ID 0x0321 +#define SAMPLE_EVENT_ID 0x0123 + +std::shared_ptr payload_; +std::shared_ptr app_; +std::set its_groups; + + +void offer() { + app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 1, 0); + app_->offer_event( + SAMPLE_SERVICE_ID, + SAMPLE_INSTANCE_ID, + SAMPLE_EVENT_ID, + its_groups, + vsomeip::event_type_e::ET_FIELD, std::chrono::milliseconds::zero(), + false, true, nullptr, vsomeip::reliability_type_e::RT_UNKNOWN); +} + +void stop_offer() { + app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); +} + +void run() { + vsomeip::byte_t its_data[10]; + uint32_t its_size = 1; + uint32_t loop_counter = 0; + + while (true) { + for (uint32_t i = 0; i < its_size; ++i) { + its_data[i] = static_cast((loop_counter + i) % 255); + } + loop_counter++; + + payload_->set_data(its_data, its_size); + std::cout << "Setting event (Length=" << std::dec << its_size << ")." << std::endl; + app_->notify(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENT_ID, payload_); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } +} + +int main(int argc, char **argv) { + its_groups.insert(SAMPLE_EVENTGROUP_ID); + + app_ = vsomeip::runtime::get()->create_application("Hello"); + app_->init(); + payload_ = vsomeip::runtime::get()->create_payload(); + offer(); + std::thread receiver(run); + app_->start(); +} \ No newline at end of file diff --git a/test_apps/receive_events_udp/start_app.sh b/test_apps/receive_events_udp/start_app.sh new file mode 100755 index 0000000..ae56694 --- /dev/null +++ b/test_apps/receive_events_udp/start_app.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +script_dir="$(dirname "$0")" + +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/projects/someip/vsomeip_install/lib/ +VSOMEIP_CONFIGURATION="${script_dir}/vsomeip-client.json" "${script_dir}/receive_events_udp" \ No newline at end of file diff --git a/test_apps/receive_events_udp/vsomeip-client.json b/test_apps/receive_events_udp/vsomeip-client.json new file mode 100644 index 0000000..fbe5a0c --- /dev/null +++ b/test_apps/receive_events_udp/vsomeip-client.json @@ -0,0 +1,54 @@ +{ + "unicast" : "127.0.0.2", + "logging" : + { + "level" : "verbose", + "console" : "true", + "file" : { "enable" : "false", "path" : "./vsomeip.log" }, + "dlt" : "false" + }, + "applications" : + [ + { + "name" : "Hello", + "id" : "0x1313" + } + ], + "services" : + [ + { + "service" : "0x1234", + "instance" : "0x5678", + "unreliable" : "30509", + "events" : [ + { + "event" : "0x8778", + "is_field" : "false", + "is_reliable" : "false", + "update-cycle" : "0" + } + ], + "eventgroups": [ + { + "eventgroup" : "0x0321", + "events" : [ "0x0123" ] + } + ] + } + ], + "routing" : "Hello", + "service-discovery" : + { + "enable" : "true", + "multicast" : "224.224.224.245", + "port" : "30490", + "protocol" : "udp", + "initial_delay_min" : "10", + "initial_delay_max" : "100", + "repetitions_base_delay" : "200", + "repetitions_max" : "3", + "ttl" : "3", + "cyclic_offer_delay" : "2000", + "request_response_delay" : "1500" + } +} \ No newline at end of file diff --git a/test_apps/send_events_udp/send_events_udp.cpp b/test_apps/send_events/send_events.cpp similarity index 100% rename from test_apps/send_events_udp/send_events_udp.cpp rename to test_apps/send_events/send_events.cpp diff --git a/test_apps/send_events_udp/start_app.sh b/test_apps/send_events/start_app.sh similarity index 88% rename from test_apps/send_events_udp/start_app.sh rename to test_apps/send_events/start_app.sh index 2da731a..31a300a 100755 --- a/test_apps/send_events_udp/start_app.sh +++ b/test_apps/send_events/start_app.sh @@ -3,4 +3,4 @@ script_dir="$(dirname "$0")" export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/projects/someip/vsomeip_install/lib/ -VSOMEIP_CONFIGURATION="${script_dir}/vsomeip-client.json" "${script_dir}/send_events_udp" \ No newline at end of file +VSOMEIP_CONFIGURATION="${script_dir}/vsomeip-client.json" "${script_dir}/send_events" \ No newline at end of file diff --git a/test_apps/send_events_udp/vsomeip-client.json b/test_apps/send_events/vsomeip-client.json similarity index 100% rename from test_apps/send_events_udp/vsomeip-client.json rename to test_apps/send_events/vsomeip-client.json