From 241bdfcc01b7299444c24ae1fdbfee3407a5677a Mon Sep 17 00:00:00 2001 From: Peter Harris Date: Wed, 27 Nov 2024 14:46:30 +0000 Subject: [PATCH 1/3] Add latest gtest release as a submodule --- .gitmodules | 3 +++ source_third_party/gtest | 1 + 2 files changed, 4 insertions(+) create mode 160000 source_third_party/gtest diff --git a/.gitmodules b/.gitmodules index 3dfd42d..9f7e75a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "khronos/egl"] path = khronos/egl url = https://github.com/KhronosGroup/EGL-Registry +[submodule "source_third_party/gtest"] + path = source_third_party/gtest + url = https://github.com/google/googletest diff --git a/source_third_party/gtest b/source_third_party/gtest new file mode 160000 index 0000000..b514bdc --- /dev/null +++ b/source_third_party/gtest @@ -0,0 +1 @@ +Subproject commit b514bdc898e2951020cbdca1304b75f5950d1f59 From 444737bcc9b76b6520f542e8d062e16d2cca41fe Mon Sep 17 00:00:00 2001 From: Peter Harris Date: Wed, 27 Nov 2024 16:32:51 +0000 Subject: [PATCH 2/3] Implement client-server communications module --- .github/workflows/build_test.yaml | 9 + .gitignore | 6 +- CMakeLists.txt | 41 ++ lgl_host_server.py | 54 +++ lglpy/__init__.py | 0 lglpy/server.py | 175 +++++++++ lglpy/service_log.py | 42 +++ lglpy/service_test.py | 47 +++ source_common/CMakeLists.txt | 24 ++ source_common/comms/CMakeLists.txt | 38 ++ source_common/comms/comms_interface.hpp | 130 +++++++ source_common/comms/comms_message.hpp | 126 +++++++ source_common/comms/comms_module.cpp | 259 +++++++++++++ source_common/comms/comms_module.hpp | 219 +++++++++++ source_common/comms/comms_receiver.cpp | 186 +++++++++ source_common/comms/comms_receiver.hpp | 140 +++++++ source_common/comms/comms_transmitter.cpp | 146 +++++++ source_common/comms/comms_transmitter.hpp | 111 ++++++ source_common/comms/test/CMakeLists.txt | 74 ++++ .../comms/test/comms_test_server.cpp | 322 ++++++++++++++++ .../comms/test/comms_test_server.hpp | 178 +++++++++ source_common/comms/test/unittest_comms.cpp | 357 ++++++++++++++++++ .../comms/test/unittest_comms_client.cpp | 139 +++++++ source_common/framework/entry_utils.hpp | 2 +- source_common/framework/utils.hpp | 2 +- source_common/utils/queue.hpp | 143 +++++++ 26 files changed, 2966 insertions(+), 4 deletions(-) create mode 100644 CMakeLists.txt create mode 100644 lgl_host_server.py create mode 100644 lglpy/__init__.py create mode 100644 lglpy/server.py create mode 100644 lglpy/service_log.py create mode 100644 lglpy/service_test.py create mode 100644 source_common/CMakeLists.txt create mode 100644 source_common/comms/CMakeLists.txt create mode 100644 source_common/comms/comms_interface.hpp create mode 100644 source_common/comms/comms_message.hpp create mode 100644 source_common/comms/comms_module.cpp create mode 100644 source_common/comms/comms_module.hpp create mode 100644 source_common/comms/comms_receiver.cpp create mode 100644 source_common/comms/comms_receiver.hpp create mode 100644 source_common/comms/comms_transmitter.cpp create mode 100644 source_common/comms/comms_transmitter.hpp create mode 100644 source_common/comms/test/CMakeLists.txt create mode 100644 source_common/comms/test/comms_test_server.cpp create mode 100644 source_common/comms/test/comms_test_server.hpp create mode 100644 source_common/comms/test/unittest_comms.cpp create mode 100644 source_common/comms/test/unittest_comms_client.cpp create mode 100644 source_common/utils/queue.hpp diff --git a/.github/workflows/build_test.yaml b/.github/workflows/build_test.yaml index 3dea890..cb3b832 100644 --- a/.github/workflows/build_test.yaml +++ b/.github/workflows/build_test.yaml @@ -30,6 +30,15 @@ jobs: cmake -G "Unix Makefiles" -DCMAKE_BUILD_TYPE=Release .. make -j4 + - name: Build and run unit tests + run: | + export CXX=clang++ + mkdir build_unit + cd build_unit + cmake -G "Unix Makefiles" -DCMAKE_BUILD_TYPE=Debug -DCMAKE_INSTALL_PREFIX=./ .. + make -j4 + ./bin/unittest_comms + build-ubuntu-x64-gcc: name: Ubuntu x64 GCC runs-on: ubuntu-22.04 diff --git a/.gitignore b/.gitignore index 5e788e3..3b14f38 100644 --- a/.gitignore +++ b/.gitignore @@ -2,9 +2,11 @@ .vs .vscode +# Python directories +__pycache__ + # CMake build directories -build -build_arm64 +build* # Build and debug output files /.cache diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..b06e853 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,41 @@ +# SPDX-License-Identifier: MIT +# ----------------------------------------------------------------------------- +# Copyright (c) 2024 Arm Limited +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ----------------------------------------------------------------------------- + +cmake_minimum_required(VERSION 3.17) + +set(CMAKE_CXX_STANDARD 20) + +project(libGPULayers_UnitTests VERSION 1.0.0) + +# Build steps +set(LGL_UNITTEST ON) + +# Build GoogleTest framework +set(INSTALL_GTEST OFF CACHE BOOL "" FORCE) +add_subdirectory(source_third_party/gtest) + +# Enable ctest +enable_testing() + +# Build unit tests +add_subdirectory(source_common) diff --git a/lgl_host_server.py b/lgl_host_server.py new file mode 100644 index 0000000..dff7e6a --- /dev/null +++ b/lgl_host_server.py @@ -0,0 +1,54 @@ +# SPDX-License-Identifier: MIT +# ----------------------------------------------------------------------------- +# Copyright (c) 2024 Arm Limited +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ----------------------------------------------------------------------------- + +# This module implements a host server that provides services over the network +# to a layer running on a remote device. + +import sys +import lglpy.server +import lglpy.service_test +import lglpy.service_log + +def main(): + # Create a server instance + server = lglpy.server.CommsServer(63412) + + # Register all the services with it + print(f'Registering host services:') + test_service = lglpy.service_test.TestService() + endpoint_id = server.register_endpoint(test_service) + print(f' - [{endpoint_id}] = {test_service.get_service_name()}') + + log_service = lglpy.service_log.LogService() + endpoint_id = server.register_endpoint(log_service) + print(f' - [{endpoint_id}] = {log_service.get_service_name()}') + print() + + # Start it running + server.run() + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/lglpy/__init__.py b/lglpy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lglpy/server.py b/lglpy/server.py new file mode 100644 index 0000000..e654431 --- /dev/null +++ b/lglpy/server.py @@ -0,0 +1,175 @@ +# SPDX-License-Identifier: MIT +# ----------------------------------------------------------------------------- +# Copyright (c) 2024 Arm Limited +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ----------------------------------------------------------------------------- + +# This module implements the server-side communications module that can +# accept client connections from the layer drivers, and dispatch them to +# handlers in the server. +# +# This module currently only accepts a single connection from a layer at a time +# and runs in the context of the calling thread, so if it needs to run in the +# background the user must create a new thread to contain it. It is therefore +# not possible to implement pseudo-host-driven event loops if the layer is +# using multiple services concurrently - this needs threads per service. + +import enum +import socket +import struct + + +class MessageType(enum.Enum): + ''' + The received message type. + ''' + TX_ASYNC = 0 + TX = 1 + TX_RX = 2 + + +class Message: + ''' + A decoded message header packet. + + See the MessageHeader struct in comms_message.hpp for binary layout. + ''' + + def __init__(self, header): + assert len(header) == 14, 'Header length is incorrect' + + fields = struct.unpack(' str: + return 'registry' + + def register_endpoint(self, endpoint) -> int: + endpoint_id = len(self.endpoints) + self.endpoints[endpoint_id] = endpoint + return endpoint_id + + def handle_message(self, message: Message): + data = [] + for endpoint_id, endpoint in self.endpoints.items(): + name = endpoint.get_service_name().encode('utf-8') + data.append(struct.pack(' str: + return 'test' + + def handle_message(self, message: Message): + payload = message.payload.decode('utf-8') + + print(f'{message.message_type.name}: {payload} ({len(payload)} bytes)') + + if message.message_type == MessageType.TX_RX: + response = payload[::-1] + response = response.encode('utf-8') + return response + + return '' diff --git a/source_common/CMakeLists.txt b/source_common/CMakeLists.txt new file mode 100644 index 0000000..ca4c7df --- /dev/null +++ b/source_common/CMakeLists.txt @@ -0,0 +1,24 @@ +# SPDX-License-Identifier: MIT +# ----------------------------------------------------------------------------- +# Copyright (c) 2024 Arm Limited +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ----------------------------------------------------------------------------- + +add_subdirectory(comms) diff --git a/source_common/comms/CMakeLists.txt b/source_common/comms/CMakeLists.txt new file mode 100644 index 0000000..d0dbb5c --- /dev/null +++ b/source_common/comms/CMakeLists.txt @@ -0,0 +1,38 @@ +# SPDX-License-Identifier: MIT +# ----------------------------------------------------------------------------- +# Copyright (c) 2024 Arm Limited +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ----------------------------------------------------------------------------- + +set(LIB_BINARY lib_layer_comms) + +add_library( + ${LIB_BINARY} STATIC + comms_module.cpp + comms_receiver.cpp + comms_transmitter.cpp) + +target_include_directories( + ${LIB_BINARY} PRIVATE + ../) + +if(${LGL_UNITTEST}) + add_subdirectory(test) +endif() \ No newline at end of file diff --git a/source_common/comms/comms_interface.hpp b/source_common/comms/comms_interface.hpp new file mode 100644 index 0000000..8d5890b --- /dev/null +++ b/source_common/comms/comms_interface.hpp @@ -0,0 +1,130 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The declaration of the external interface to the communications module. + * + * See documentation in @c comms_module.hpp for more information. + */ +#pragma once + +#include +#include +#include +#include + +namespace Comms +{ + +/** + * @brief A type used for service endpoint addresses in the host. + */ +using EndpointID = uint8_t; + +/** + * @brief A type used for the message data payload. + */ +using MessageData = std::vector; + +/** + * @brief A type used for service endpoint addresses in the host. + * + * Note that this hides the built-in registry service, which uses endpoint + * zero, because a normaluser should not be calling it. + */ +static const EndpointID NO_ENDPOINT { 0 }; + +/** + * @brief Abstract base class defining the public interface for comms. + */ +class CommsInterface +{ +public: + virtual ~CommsInterface() { } + + /** + * @brief Is this comms module connected to a host server? + * + * @return Returns @c true if connected, @c false otherwise. + */ + virtual bool is_connected() = 0; + + /** + * @brief Get the service endpoint address for the named service. + * + * @param name The name of the service. + * + * @return The service address, or @c NO_ENDPOINT if service is not found. + */ + virtual EndpointID get_endpoint_id( + const std::string& name) = 0; + + /** + * @brief Asynchronously transmit message to the host. + * + * This function aims not to block, but may do so if the total size of + * messages in the message queue exceeds a threshold size. + * + * @param endpoint The address of the destination service. + * @param data The data to transmit. + */ + virtual void tx_async( + EndpointID endpoint, + std::unique_ptr data) = 0; + + /** + * @brief Synchronously transmit message to the host. + * + * This function will block and wait for the message to be sent to the + * host. This implies draining the message queue, as messages are send + * in-order to the host. + * + * @param endpoint The address of the destination service. + * @param data The data to transmit. + */ + virtual void tx( + EndpointID endpoint, + std::unique_ptr data) = 0; + + /** + * @brief Synchronously transmit message to the host and wait for response. + * + * This function will block and wait for the host service to respond to + * the message. Response timing depends on the implementation of the host + * service, and is not guaranteed to be in order with respect to triggering + * transmit messages. + * + * @param endpoint The address of the destination service. + * @param data The data to transmit. + * + * @return The response message data payload. + */ + virtual std::unique_ptr tx_rx( + EndpointID endpoint, + std::unique_ptr data) = 0; +}; + +} \ No newline at end of file diff --git a/source_common/comms/comms_message.hpp b/source_common/comms/comms_message.hpp new file mode 100644 index 0000000..b88fa24 --- /dev/null +++ b/source_common/comms/comms_message.hpp @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The declaration of the communication module internal message types. + */ +#pragma once + +#include "comms/comms_interface.hpp" +#include "../utils/queue.hpp" + +namespace Comms +{ + +/** + * @brief A type used for message sequence identifiers in the protocol. + */ +using MessageID = uint64_t; + +/** + * @brief A type used for message types in the protocol. + */ +enum class MessageType: uint8_t { + /** Message is an asynchronous transmit. */ + TX_ASYNC = 0, + /** Message is a synchronous transmit. */ + TX = 1, + /** Message is a synchronous transmit and wait for receive. */ + TX_RX = 2, + /** Message is a dummy message to unblock the transmitter thread. */ + STOP = 255 +}; + +/** + * @brief The packed data layout (assuming both ends are little-endian) + */ +typedef struct __attribute__((packed)) +{ + uint8_t message_type; // Is this tx_async (0), tx (1), or tx_rx (2)? + uint8_t endpoint_id; // The endpoint service address. + uint64_t message_id; // The unique message ID for a tx_rx pair. + uint32_t payload_size; // The size of the payload in bytes. +} MessageHeader; + +/** + * @brief Class representing a task in the protocol. + */ +class Message: public Task +{ +public: + /** + * @brief Construct a new message. + * + * @param endpoint_id The destination endpoint. + * @param message_type The type of the message. + * @param message_id The sequence ID of the message. + * @param transmit_data The data to transmit. + */ + Message( + EndpointID endpoint_id, + MessageType message_type, + MessageID message_id, + std::unique_ptr transmit_data) : + endpoint_id(endpoint_id), + message_type(message_type), + message_id(message_id), + transmit_data(std::move(transmit_data)) { } + + /** + * @brief The type of the message. + */ + EndpointID endpoint_id; + + /** + * @brief The type of the message. + */ + MessageType message_type; + + /** + * @brief The sequence ID of the message. + * + * Only required if @c message_type is @c TX_RX and we have to match a + * response to a triggering message. + */ + MessageID message_id; + + /** + * @brief The data to transmit. + * + * Can be reset and data discarded once the data is transmitted. + */ + std::unique_ptr transmit_data; + + /** + * @brief The data that was received. + * + * Only present if @c message_type is @c TX_RX and we have received a + * response from the host. + */ + std::unique_ptr response_data; +}; + +} diff --git a/source_common/comms/comms_module.cpp b/source_common/comms/comms_module.cpp new file mode 100644 index 0000000..a0eddec --- /dev/null +++ b/source_common/comms/comms_module.cpp @@ -0,0 +1,259 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The implementation of the main communications module. + */ + +#include "comms_module.hpp" + +#include +#include +#include +#include +#include +#include + + +namespace Comms +{ + +/** See header for documentation. */ +CommsModule::CommsModule( + const std::string& domain_address +) { + sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd < 0) + { + std::cout << " - ERROR: Client socket create failed" << std::endl; + return; + } + + struct sockaddr_un serv_addr {}; + serv_addr.sun_family = AF_UNIX; + + // Copy the domain address, inserting leading NUL needed for abstract UDS + std::strcpy(serv_addr.sun_path + 1, domain_address.c_str()); + serv_addr.sun_path[0] = '\0'; + + int conn = connect( + sockfd, + reinterpret_cast(&serv_addr), + sizeof(serv_addr)); + if (conn != 0) + { + std::cout << " - ERROR: Client connection failed" << std::endl; + close(sockfd); + sockfd = -1; + return; + } + + transmitter = std::make_unique(*this); + receiver = std::make_unique(*this); +} + +/** See header for documentation. */ +CommsModule::CommsModule( + const std::string& host_address, + int port +) { + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) + { + std::cout << " - ERROR: Client socket create failed" << std::endl; + return; + } + + struct sockaddr_in serv_addr {}; + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(port); + serv_addr.sin_addr.s_addr = inet_addr(host_address.c_str()); + + int conn = connect( + sockfd, + reinterpret_cast(&serv_addr), + sizeof(serv_addr)); + if (conn != 0) + { + std::cout << " - ERROR: Client connection failed" << std::endl; + close(sockfd); + sockfd = -1; + return; + } + + transmitter = std::make_unique(*this); + receiver = std::make_unique(*this); +} + +/** See header for documentation. */ +CommsModule::~CommsModule() +{ + // Stop async worker threads before closing the socket + // Transmitter must be shut down first + if (transmitter) + { + // May take some time as any pending messages must be sent + transmitter->stop(); + } + + if (receiver) + { + receiver->stop(); + } + + // Close the socket after workers have stopped + if (sockfd >= 0) + { + close(sockfd); + } +} + +/** See header for documentation. */ +bool CommsModule::is_connected() +{ + return sockfd >= 0; +} + +/** See header for documentation. */ +EndpointID CommsModule::get_endpoint_id( + const std::string& name +) { + std::lock_guard lock(registry_lock); + if (registry.empty()) + { + // Request the registry from the host + auto data = std::make_unique>(); + auto resp = tx_rx(0, std::move(data)); + + // Process the response + while (resp->size()) + { + // If not enough bytes to read the service header then stop + if (resp->size() < 5) + { + break; + } + + uint8_t id = (*resp)[0]; + size_t size = static_cast((*resp)[4] << 24) + | static_cast((*resp)[3] << 16) + | static_cast((*resp)[2] << 8) + | static_cast((*resp)[1] << 0); + + // If not enough bytes to read the service name then stop + if (resp->size() < 5 + size) + { + break; + } + + std::string name(resp->begin() + 5, resp->begin() + 5 + size); + + // Remove the entry we've read + resp->erase(resp->begin(), resp->begin() + 5 + size); + + // Store the persistent registry entry + registry[name] = id; + } + } + + // Service found + try + { + return registry[name]; + } + // Service not found + catch(std::out_of_range) + { + return NO_ENDPOINT; + } +} + +/** See header for documentation. */ +void CommsModule::tx_async( + EndpointID endpoint, + std::unique_ptr data +) { + auto message = std::make_shared( + endpoint, + MessageType::TX_ASYNC, + 0, + std::move(data)); + + enqueue_message(std::move(message)); +} + +/** See header for documentation. */ +void CommsModule::tx( + EndpointID endpoint, + std::unique_ptr data +) { + auto message = std::make_shared( + endpoint, + MessageType::TX, + 0, + std::move(data)); + + enqueue_message(message); + message->wait(); +} + +/** See header for documentation. */ +std::unique_ptr CommsModule::tx_rx( + EndpointID endpoint, + std::unique_ptr data +) { + auto message = std::make_shared( + endpoint, + MessageType::TX_RX, + assign_message_id(), + std::move(data)); + + enqueue_message(message); + message->wait(); + + return std::move(message->response_data); +} + +/** See header for documentation. */ +MessageID CommsModule::assign_message_id() +{ + return next_message_id.fetch_add(1, std::memory_order_relaxed); +} + +/** See header for documentation. */ +void CommsModule::enqueue_message( + std::shared_ptr message +) { + message_queue.put(std::move(message)); +} + +/** See header for documentation. */ +std::shared_ptr CommsModule::dequeue_message() +{ + return message_queue.get(); +} + +} diff --git a/source_common/comms/comms_module.hpp b/source_common/comms/comms_module.hpp new file mode 100644 index 0000000..8ee4ecb --- /dev/null +++ b/source_common/comms/comms_module.hpp @@ -0,0 +1,219 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The declaration of the main communications module. + * + * Module summary + * ============== + * + * Communications implements a message-based network communications link + * between the layer driver in the device and a tool running on the host + * machine. + * + * The host tool is the network server, and can expose multiple service + * endpoints to the network client in the layer. All messages originate in the + * layer, and messages are targeted at an endpoint service in the host server. + * + * Message types + * ------------- + * + * The protocol supports the following types of messages: + * + * - tx_async (asynchronous transmit) + * - tx (synchronous transmit) + * - tx_rx (synchronous transmit and response receive) + * + * There is no support for the host pushing new messages to the layer, but + * simple pseudo-host-driven event loops can be implement by the layer pulling + * the next event from the host using sequences of tx_rx messages. + * + * Message ordering + * ---------------- + * + * Messages are guaranteed to be transmitted to the host in the order that they + * were added to the transmit queue, irrespective of the endpoint address. + * Endpoints in the host will receive the messages in this order, but behavior + * after that is endpoint specific. + * + * There is no guarantee that responses to tx_rx messages are received in the + * order of the original transmissions, although a specific endpoint may + * guarantee that for its own messages. + * + * Performance + * ----------- + * + * This module is designed to be simple, and will process one message at a + * time. Each message will make at least two socket send calls, one for + * the header and one or more for the data payload. For short messages this + * can add some overhead. Client service layers may want to implement local + * buffering to merge multiple services messages into a single comms message. + */ + +#pragma once + +#include "comms/comms_interface.hpp" +#include "comms/comms_message.hpp" +#include "comms/comms_transmitter.hpp" +#include "comms/comms_receiver.hpp" +#include "utils/queue.hpp" + +namespace Comms +{ + +/** + * @brief The main communications module component. + * + * Exposes the CommsInterface to calling code. + */ +class CommsModule: public CommsInterface +{ +public: + /** + * @brief Construct a new instance using a Unix domain socket. + * + * We will create an abstract domain socket, but the domain_address given + * here must NOT include the leading NUL character needed to create an + * abstract domain socket. + * + * @param domain_address The unix domain address to use. + */ + CommsModule( + const std::string& domain_address); + + /** + * @brief Construct a new instance using a TCP/IP socket. + * + * @param host_address The host name or IP address to use. + * @param port The port number to use. + */ + CommsModule( + const std::string& host_address, + int port); + + /** + * @brief Close the host connection and stop all worker threads. + * + * Any pending transmit messages will be sent before the socket is closed, + * but it is not guaranteed that any responses to tx_rx messages will be + * preceived or processed. + */ + virtual ~CommsModule(); + + /** See @c comms_interface.hpp for documentation. */ + virtual bool is_connected(); + + /** See @c comms_interface.hpp for documentation. */ + virtual EndpointID get_endpoint_id( + const std::string& name); + + /** See @c comms_interface.hpp for documentation. */ + virtual void tx_async( + EndpointID endpoint, + std::unique_ptr data); + + /** See @c comms_interface.hpp for documentation. */ + virtual void tx( + EndpointID endpoint, + std::unique_ptr data); + + /** See @c comms_interface.hpp for documentation. */ + virtual std::unique_ptr tx_rx( + EndpointID endpoint, + std::unique_ptr data); + + // Allow module internal classes to access private members + friend class Transmitter; + friend class Receiver; + +private: + /** + * @brief Get the message ID to use for an outbound tx_rx message. + * + * @return The message ID nonce to use. + */ + MessageID assign_message_id(); + + /** + * @brief Add a message to the end of outbound message task queue. + * + * @param message The message to queue. + */ + void enqueue_message( + std::shared_ptr message); + + /** + * @brief Get the oldest message from the outbound message task queue. + * + * @return The message to send. + */ + std::shared_ptr dequeue_message(); + + /** + * @brief Get the host service endpoint list. + * + * @return The message to send. + */ + void get_host_service_endpoints(); + +private: + /** + * @brief The socket for communications. + */ + int sockfd { -1 }; + + /** + * @brief The last message ID nonce used. + */ + std::atomic next_message_id { 1 }; + + /** + * @brief The FIFO queue of messages to send. + */ + TaskQueue> message_queue; + + /** + * @brief The transmitter - runs with its own worker thread. + */ + std::unique_ptr transmitter; + + /** + * @brief The receiver - runs with its own worker thread. + */ + std::unique_ptr receiver; + + /** + * @brief Lock protecting the registry. + */ + std::mutex registry_lock; + + /** + * @brief Host endpoint registry. + */ + std::unordered_map registry; +}; + +} diff --git a/source_common/comms/comms_receiver.cpp b/source_common/comms/comms_receiver.cpp new file mode 100644 index 0000000..f685e85 --- /dev/null +++ b/source_common/comms/comms_receiver.cpp @@ -0,0 +1,186 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The implementation of the communications module receiver worker. + */ + +#include +#include +#include +#include + +#include "comms/comms_receiver.hpp" +#include "comms_module.hpp" + +namespace Comms +{ +/** See header for documentation. */ +Receiver::Receiver( + CommsModule& parent +) : parent(parent) +{ + int pipe_err = pipe(stop_request_pipe); + if (pipe_err) + { + std::cout << " - ERROR: Client pipe create failed" << std::endl; + } + + // Create and start a worker thread + worker = std::thread(&Receiver::run_receiver, this); +} + +/** See header for documentation. */ +Receiver::~Receiver() +{ + // Stop the worker thread if it's not stopped already + if (!stop_requested) + { + stop(); + } + + // Close the pipes + close(stop_request_pipe[0]); + close(stop_request_pipe[1]); +} + +/** See header for documentation. */ +void Receiver::stop() +{ + // Mark the engine as stopping + stop_requested = true; + + // Poke the pipe to wake the worker thread if it is blocked on a read + int data = 0xdead; + write(stop_request_pipe[1], &data, sizeof(int)); + + // Join on the worker thread + worker.join(); +} + +/** See header for documentation. */ +void Receiver::park_message( + std::shared_ptr message +) { + std::lock_guard lock(parking_lock); + parking_buffer.insert({ message->message_id, std::move(message) }); +} + +/** See header for documentation. */ +void Receiver::run_receiver() +{ + while (!stop_requested) + { + bool data_ok; + + // Read the fixed size message header + MessageHeader header; + data_ok = receive_data(reinterpret_cast(&header), sizeof(header)); + if (!data_ok) + { + break; + } + + // Read the a payload based on the data size in the header + size_t payload_size = header.payload_size; + auto payload = std::make_unique(payload_size); + data_ok = receive_data(payload->data(), payload_size); + if (!data_ok) + { + break; + } + + wake_message(header.message_id, std::move(payload)); + } +} + +/** See header for documentation. */ +void Receiver::wake_message( + MessageID message_id, + std::unique_ptr data +) { + std::lock_guard lock(parking_lock); + + // Handle message not found ... + if (parking_buffer.count(message_id) == 0) + { + std::cout << " - ERROR: Cln: Message " << message_id << " not found" << std::endl; + return; + } + + // Extract the message and remove from the parking buffer map + auto message = parking_buffer[message_id]; + parking_buffer.erase(message_id); + + // Notify the sending thread that the response is available + message->response_data = std::move(data); + message->notify(); +} + +/** See header for documentation. */ +bool Receiver::receive_data( + uint8_t* data, + size_t data_size +) { + int sockfd = parent.sockfd; + int pipefd = stop_request_pipe[0]; + int max_fd = std::max(sockfd, pipefd); + + while (data_size) + { + fd_set read_fds; + FD_ZERO(&read_fds); + FD_SET(sockfd, &read_fds); + FD_SET(pipefd, &read_fds); + + int sel_resp = select(max_fd + 1, &read_fds, NULL, NULL, NULL); + // Error + if (sel_resp <= 0) + { + return false; + } + + // Received a stop event on the pipe so exit + if (FD_ISSET(pipefd, &read_fds)) + { + return false; + } + + // Otherwise keep reading bytes until we've read them all + int read_bytes = read(sockfd, data, data_size); + if (read_bytes <= 0) + { + return false; + } + + data += read_bytes; + data_size -= read_bytes; + } + + return true; +} + +} diff --git a/source_common/comms/comms_receiver.hpp b/source_common/comms/comms_receiver.hpp new file mode 100644 index 0000000..498efbf --- /dev/null +++ b/source_common/comms/comms_receiver.hpp @@ -0,0 +1,140 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The declaration of the communications module receiver worker. + */ +#pragma once + +#include +#include +#include + +#include "comms/comms_message.hpp" + +namespace Comms +{ + +// Predeclare to break circular reference +class CommsModule; + +/** + * @brief The network communications receiver component. + */ +class Receiver +{ +public: + /** + * @brief Construct a new receiver and start the worker thread. + * + * @param parent The parent comms module. + */ + Receiver( + CommsModule& parent); + + /** + * @brief Destroy this receiver. + * + * This will stop the worker thread if it hasn't stopped already. + */ + ~Receiver(); + + /** + * @brief Stop the worker thread. + */ + void stop(); + + /** + * @brief Park a new message waiting for a response from the host. + * + * @param message The message waiting for a response. + */ + void park_message( + std::shared_ptr message); + +private: + /** + * @brief Entrypoint for the worker thread. + */ + void run_receiver(); + + /** + * @brief Wake a message with the given message ID. + * + * @param message_id The message to wake. + * @param data The response data payload from the host. + */ + void wake_message( + MessageID message_id, + std::unique_ptr data); + + /** + * @brief Receive N bytes of data from the socket. + * + * @param data The data storage to write to. + * @param data_size The number of bytes expected in the message. + * + * @return @c true if we received a message, @c false otherwise. + */ + bool receive_data( + uint8_t* data, + size_t data_size); + +private: + /** + * @brief The parent module that owns this receiver. + */ + CommsModule& parent; + + /** + * @brief The worker thread running the receiver. + */ + std::thread worker; + + /** + * @brief Has the worker been asked to stop? + */ + std::atomic stop_requested; + + /** + * @brief Pipe used to unblock the read socket rather than use timeouts. + * + * Pipe fds are not duplex: [0] is read fd, [1] is write fd. + */ + int stop_request_pipe[2] {-1, -1}; + + /** + * @brief Lock protecting the parking buffer. + */ + std::mutex parking_lock; + + /** + * @brief Parking buffer holding messages waiting for responses. + */ + std::unordered_map> parking_buffer; +}; + +} \ No newline at end of file diff --git a/source_common/comms/comms_transmitter.cpp b/source_common/comms/comms_transmitter.cpp new file mode 100644 index 0000000..d1b55bc --- /dev/null +++ b/source_common/comms/comms_transmitter.cpp @@ -0,0 +1,146 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The implementation of the communications module transmitter worker. + */ +#include "comms_transmitter.hpp" +#include "comms_module.hpp" + +#include +#include + +namespace Comms +{ + +/** See header for documentation. */ +Transmitter::Transmitter( + CommsModule& parent +) : parent(parent) +{ + // Create and start a worker thread + worker = std::thread(&Transmitter::run_transmitter, this); +} + +/** See header for documentation. */ +Transmitter::~Transmitter() +{ + // Stop the worker thread if it's not stopped already + if (!stop_requested) + { + stop(); + } +} + +/** See header for documentation. */ +void Transmitter::run_transmitter() +{ + // Keep looping until we are told to stop and message queue is empty + while (!stop_requested || !parent.message_queue.is_empty()) + { + auto message = parent.dequeue_message(); + + // Stop messages are just used to wake the thread so do nothing + if (message->message_type == MessageType::STOP) + { + continue; + } + + // TX_RX messages need to be parked waiting for a response before + // we send the message to avoid a race condition + if (message->message_type == MessageType::TX_RX) + { + parent.receiver->park_message(message); + } + + send_message(*message); + + // Notify TX messages to wake up the caller + if (message->message_type == MessageType::TX) + { + message->notify(); + } + } +} + +/** See header for documentation. */ +void Transmitter::stop() +{ + // Mark the engine as stopping + stop_requested = true; + + // Use a dummy message to wake worker thread if blocked on the queue + auto stop_data = std::make_unique(); + auto message = std::make_shared( + 0, MessageType::STOP, 0, std::move(stop_data)); + parent.enqueue_message(message); + + // Join on the worker thread + worker.join(); +} + +/** See header for documentation. */ +void Transmitter::send_message( + const Message& message +) { + uint8_t* data = message.transmit_data->data(); + size_t data_size = message.transmit_data->size(); + + MessageHeader header; + header.message_type = static_cast(message.message_type); + header.endpoint_id = message.endpoint_id; + header.message_id = message.message_id; + header.payload_size = static_cast(data_size); + + // Send the packet header + uint8_t* header_data = reinterpret_cast(&header); + send_data(header_data, sizeof(header)); + + // Send the packet data + send_data(data, data_size); +} + +/** See header for documentation. */ +void Transmitter::send_data( + uint8_t* data, + size_t data_size +) { + while(data_size) + { + ssize_t sent_size = send(parent.sockfd, data, data_size, 0); + // An error occurred or server disconnected + if (sent_size < 0) + { + return; + } + + // Update to indicate remaining data + data_size -= sent_size; + data += sent_size; + } +} + +} diff --git a/source_common/comms/comms_transmitter.hpp b/source_common/comms/comms_transmitter.hpp new file mode 100644 index 0000000..576778e --- /dev/null +++ b/source_common/comms/comms_transmitter.hpp @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The declaration of the communications module transmitter worker. + */ +#pragma once + +#include +#include +#include + +#include "comms/comms_message.hpp" + +namespace Comms +{ + +// Predeclare to break circular reference +class CommsModule; + +/** + * @brief The network communications transmitter component. + */ +class Transmitter +{ +public: + /** + * @brief Construct a new transmitter and start the worker thread. + * + * @param parent The parent comms module. + */ + Transmitter( + CommsModule& parent); + + /** + * @brief Destroy this transmitter. + * + * This will stop the worker thread if it hasn't stopped already. + */ + virtual ~Transmitter(); + + /** + * @brief Drain the message queue and stop the worker thread. + */ + void stop(); + +private: + /** + * @brief Entrypoint for the worker thread. + */ + void run_transmitter(); + + /** + * @brief Send a message on the socket. + * + * @param message The message to send. + */ + void send_message( + const Message& message); + + /** + * @brief Send N bytes of data to the socket. + * + * @param data The data to send. + * @param data_size The number of bytes in the data. + */ + void send_data( + uint8_t* data, + size_t data_size); + +private: + /** + * @brief The parent module that owns this transmitter. + */ + CommsModule& parent; + + /** + * @brief The worker thread running the transmitter. + */ + std::thread worker; + + /** + * @brief Has the worker been asked to stop? + */ + std::atomic stop_requested; +}; + +} diff --git a/source_common/comms/test/CMakeLists.txt b/source_common/comms/test/CMakeLists.txt new file mode 100644 index 0000000..f43357f --- /dev/null +++ b/source_common/comms/test/CMakeLists.txt @@ -0,0 +1,74 @@ +# SPDX-License-Identifier: MIT +# ----------------------------------------------------------------------------- +# Copyright (c) 2024 Arm Limited +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# ----------------------------------------------------------------------------- + +# Build client_server unit test module +set(TEST_BINARY unittest_comms) + +add_executable( + ${TEST_BINARY} + unittest_comms.cpp + comms_test_server.cpp) + +target_include_directories( + ${TEST_BINARY} PRIVATE + ../../ + ${gtest_SOURCE_DIR}/include) + +target_link_libraries( + ${TEST_BINARY} PRIVATE + lib_layer_comms + gtest_main) + +add_test( + NAME ${TEST_BINARY} + COMMAND ${TEST_BINARY}) + +install( + TARGETS ${TEST_BINARY} + DESTINATION bin) + +# Build client-only unit test module using external Python server +set(TEST_BINARY unittest_comms_client) + +add_executable( + ${TEST_BINARY} + unittest_comms_client.cpp) + +target_include_directories( + ${TEST_BINARY} PRIVATE + ../../ + ${gtest_SOURCE_DIR}/include) + +target_link_libraries( + ${TEST_BINARY} PRIVATE + lib_layer_comms + gtest_main) + +add_test( + NAME ${TEST_BINARY} + COMMAND ${TEST_BINARY}) + +install( + TARGETS ${TEST_BINARY} + DESTINATION bin) + diff --git a/source_common/comms/test/comms_test_server.cpp b/source_common/comms/test/comms_test_server.cpp new file mode 100644 index 0000000..2ae0c62 --- /dev/null +++ b/source_common/comms/test/comms_test_server.cpp @@ -0,0 +1,322 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The implementation of the communications module unit test dummy server. + */ + +#include "comms/test/comms_test_server.hpp" + +#include +#include +#include +#include +#include +#include + +namespace CommsTest +{ + +/** See header for documentation. */ +CommsTestServer::CommsTestServer( + const std::string& domain_address +) { + int pipe_err = pipe(stop_request_pipe); + if (pipe_err) + { + std::cout << " - ERROR: Svr pipe create failed" << std::endl; + return; + } + + listen_sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (listen_sockfd < 0) + { + std::cout << " - ERROR: Svr socket create failed" << std::endl; + return; + } + + // Build the address to listen on + struct sockaddr_un serv_addr {}; + serv_addr.sun_family = AF_UNIX; + + // Copy the domain address, inserting leading NUL needed for abstract UDS + std::strcpy(serv_addr.sun_path + 1, domain_address.c_str()); + serv_addr.sun_path[0] = '\0'; + + // Bind the socket to the address + int bind_err = bind( + listen_sockfd, + reinterpret_cast(&serv_addr), + sizeof(struct sockaddr_un)); + if (bind_err) + { + std::cout << " - ERROR: Svr socket bind failed" << std::endl; + close(listen_sockfd); + listen_sockfd = -1; + return; + } + + // Listen on the socket + int listen_err = listen(listen_sockfd, 5); + if(listen_err) + { + std::cout << " - ERROR: Svr socket listen failed" << std::endl; + close(listen_sockfd); + listen_sockfd = -1; + return; + } + + // Create and start a worker thread so we can respond while the test + // thread is blocked waiting for a response to a tx_rx message + worker = std::thread(&CommsTestServer::run_server, this); +} + +/** See header for documentation. */ +CommsTestServer::CommsTestServer( + int port +) { + int pipe_err = pipe(stop_request_pipe); + if (pipe_err) + { + std::cout << " - ERROR: Svr pipe create failed" << std::endl; + return; + } + + listen_sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_sockfd < 0) + { + std::cout << " - ERROR: Svr socket create failed" << std::endl; + return; + } + + int reuse = 1; + int result = setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + if (result < 0) + { + std::cout << " - WARN: Svr socket setsockopt failed" << std::endl; + } + + struct sockaddr_in serv_addr {}; + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(port); + serv_addr.sin_addr.s_addr = INADDR_ANY; + + // Bind the socket to the address + int bind_err = bind( + listen_sockfd, + reinterpret_cast(&serv_addr), + sizeof(struct sockaddr_in)); + if (bind_err) + { + std::cout << " - ERROR: Svr socket bind failed " << std::endl; + close(listen_sockfd); + listen_sockfd = -1; + return; + } + + // Listen on the socket + int listen_err = listen(listen_sockfd, 5); + if(listen_err) + { + std::cout << " - ERROR: Svr socket listen failed" << std::endl; + close(listen_sockfd); + listen_sockfd = -1; + return; + } + + // Create and start a worker thread so we can respond while the test + // thread is blocked waiting for a response to a tx_rx message + worker = std::thread(&CommsTestServer::run_server, this); +} + +/** See header for documentation. */ +CommsTestServer::~CommsTestServer() +{ + // Stop the worker thread if it's not stopped already + if (!stop_requested) + { + stop(); + } + + // Close all the sockets + if (listen_sockfd > 0) + { + close(listen_sockfd); + } + + // Close the pipes + close(stop_request_pipe[0]); + close(stop_request_pipe[1]); +} + +/** See header for documentation. */ +void CommsTestServer::stop() +{ + // Mark the engine as stopping + stop_requested = true; + + // Wake the worker thread if it is blocked on socket read + int data = 0xdead; + write(stop_request_pipe[1], &data, sizeof(int)); + + // Wait for the worker to finish + worker.join(); +} + +/** See header for documentation. */ +void CommsTestServer::run_server() +{ + int data_sockfd = accept(listen_sockfd, NULL, NULL); + if(data_sockfd < 0) + { + std::cout << " - ERROR: Svr socket accept failed" << std::endl; + close(listen_sockfd); + return; + } + + while (!stop_requested) + { + bool data_ok; + + // Read the fixed size message header + Comms::MessageHeader header; + data_ok = receive_data(data_sockfd, reinterpret_cast(&header), sizeof(header)); + if (!data_ok) + { + break; + } + + // Read the a payload based on the data size in the header + size_t payload_size = header.payload_size; + auto payload = std::make_unique(payload_size); + data_ok = receive_data(data_sockfd, payload->data(), payload_size); + if (!data_ok) + { + break; + } + + // Store the message for later checking + std::string decoded_payload(payload->begin(), payload->end()); + received.emplace_back( + static_cast(header.endpoint_id), + static_cast(header.message_type), + std::move(payload)); + + // If this is a tx_rx message reverse payload and send it back ... + if (header.message_type == static_cast(Comms::MessageType::TX_RX)) + { + // Response data is same size as request data so we can reuse header + std::vector response_data; + if (decoded_payload.size() > 0) + { + size_t data_len = decoded_payload.size(); + for (size_t i = 0; i < data_len; i++) + { + response_data.push_back(decoded_payload[data_len - i - 1]); + } + } + + // Send the packet header + uint8_t* header_data = reinterpret_cast(&header); + send_data(data_sockfd, header_data, sizeof(header)); + + // Send the packet data + send_data(data_sockfd, response_data.data(), payload_size); + } + } + + close(data_sockfd); +} + +/** See header for documentation. */ +bool CommsTestServer::receive_data( + int sockfd, + uint8_t* data, + size_t data_size +) { + int pipefd = stop_request_pipe[0]; + int max_fd = std::max(sockfd, pipefd); + + while (data_size) + { + fd_set read_fds; + FD_ZERO(&read_fds); + FD_SET(sockfd, &read_fds); + FD_SET(pipefd, &read_fds); + + int sel_resp = select(max_fd + 1, &read_fds, NULL, NULL, NULL); + // Error + if (sel_resp <= 0) + { + std::cout << " - ERROR: Svr select failed" << std::endl; + return false; + } + + // Received a stop event on the pipe so exit + if (FD_ISSET(pipefd, &read_fds)) + { + return false; + } + + // Otherwise keep reading bytes until we've read them all + int read_bytes = read(sockfd, data, data_size); + + // Has the client-side of the connection been closed? + if (read_bytes <= 0) + { + return false; + } + + data += read_bytes; + data_size -= read_bytes; + } + + return true; +} + +/** See header for documentation. */ +void CommsTestServer::send_data( + int sockfd, + uint8_t* data, + size_t data_size +) { + while(data_size) + { + ssize_t sent_size = send(sockfd, data, data_size, 0); + // An error occurred + if (sent_size < 0) + { + std::cout << " - ERROR: Svr socket send failed" << std::endl; + return; + } + + // Update to indicate remaining data + data_size -= sent_size; + data += sent_size; + } +} + +} diff --git a/source_common/comms/test/comms_test_server.hpp b/source_common/comms/test/comms_test_server.hpp new file mode 100644 index 0000000..205c1c5 --- /dev/null +++ b/source_common/comms/test/comms_test_server.hpp @@ -0,0 +1,178 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The definition of the communications module unit test dummy server. + */ + +#pragma once + +#include +#include +#include + +#include "comms/comms_message.hpp" + +namespace CommsTest +{ + +/** + * @brief A recording of a message received by the test server. + */ +class TestMessage +{ +public: + /** + * @brief Construct a new message. + * + * @param endpoint_id The destination endpoint. + * @param message_type The type of the message. + * @param data The received data. + */ + TestMessage( + Comms::EndpointID endpoint_id, + Comms::MessageType message_type, + std::unique_ptr data) : + endpoint_id(endpoint_id), + message_type(message_type), + data(std::move(data)) { } + + /** + * @brief The endpoint of the message. + */ + Comms::EndpointID endpoint_id; + + /** + * @brief The type of the message. + */ + Comms::MessageType message_type; + + /** + * @brief The received data. + */ + std::unique_ptr data; +}; + + +class CommsTestServer +{ +public: + /** + * @brief Construct a new server listening on a domain socket. + * + * Note that the UDS address given here must exclude the leading NUL, to + * avoid it being seen as a zero-length string literal. + * + * @param domain_address The unix domain address to use. + */ + CommsTestServer( + const std::string& domain_address); + + /** + * @brief Construct a new server listening on TCP/IP socket. + * + * @param port The port number to use. + */ + CommsTestServer( + int port); + + /** + * @brief Close the host connection and stop all worker threads. + * + * Any pending transmit messages will be sent before the socket is closed, + * but it is not guaranteed that any responses to tx_rx messages will be + * preceived or processed. + */ + ~CommsTestServer(); + +private: + /** + * @brief Stop the worker thread. + */ + void stop(); + + /** + * @brief Entrypoint for the worker thread. + */ + void run_server(); + + /** + * @brief Receive N bytes of data from the socket. + * + * @param sockfd The client connection socket. + * @param data The data storage to write to. + * @param data_size The number of bytes expected in the message. + * + * @return @c true if we received a message, @c false otherwise. + */ + bool receive_data( + int sockfd, + uint8_t* data, + size_t data_size); + + /** + * @brief Send N bytes of data to the socket. + * + * @param sockfd The client connection socket. + * @param data The data to send. + * @param data_size The number of bytes in the data. + */ + void send_data( + int sockfd, + uint8_t* data, + size_t data_size); + +public: + /** + * @brief List of messages received by this test server for later checking. + */ + std::vector received; + +private: + /** + * @brief The socket for listening for connections. + */ + int listen_sockfd { -1 }; + + /** + * @brief Pipe used to unblock the read socket rather than use timeouts. + * + * Pipe fds are not duplex: [0] is read fd, [1] is write fd. + */ + int stop_request_pipe[2] {-1, -1}; + + /** + * @brief The transmitter - runs with its own worker thread. + */ + std::thread worker; + + /** + * @brief Has the worker been asked to stop? + */ + std::atomic stop_requested; +}; + +} diff --git a/source_common/comms/test/unittest_comms.cpp b/source_common/comms/test/unittest_comms.cpp new file mode 100644 index 0000000..eb1903c --- /dev/null +++ b/source_common/comms/test/unittest_comms.cpp @@ -0,0 +1,357 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The implementation of the communications module unit tests. + * + * Note that all tests that validate server-side message receipt must end with + * a tx_rx message to guarantee that the server received all earlier messages. + * A normal tx message guarantees that the messages is sent before returning, + * but does not guarantee that the server has received and processed it. + */ +#include + +#include "comms/comms_interface.hpp" +#include "comms/comms_module.hpp" +#include "comms/test/comms_test_server.hpp" + +using namespace CommsTest; + +std::unique_ptr make_test_payload( + const std::string& str +) { + auto data = std::make_unique(str.begin(), str.end()); + return data; +} + +std::string decode_test_payload( + std::unique_ptr data +) { + std::string str(data->begin(), data->end()); + return str; +} + +std::string decode_test_payload( + TestMessage& msg +) { + std::string str(msg.data->begin(), msg.data->end()); + return str; +} + +// ---------------------------------------------------------------------------- +// Tests using a unix domain socket + +/** @brief Test lifecycle with no sent messages. */ +TEST(Comms, test_uds_no_data) +{ + CommsTest::CommsTestServer server("commstest"); + Comms::CommsModule client("commstest"); +} + +/** @brief Test lifecycle with a TX sent message. */ +TEST(Comms, test_uds_tx_0b) +{ + CommsTest::CommsTestServer server("commstest"); + Comms::CommsModule client("commstest"); + + // Send a zero byte payload + auto data = std::make_unique>(); + client.tx(1, std::move(data)); + + // Ensure server processes the earlier message + data = make_test_payload(""); + client.tx_rx(2, std::move(data)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 0); +} + +/** @brief Test lifecycle with a TX sent message. */ +TEST(Comms, test_uds_tx_nb) +{ + CommsTest::CommsTestServer server("commstest"); + Comms::CommsModule client("commstest"); + + // Send a non-zero byte payload + auto data = make_test_payload("abcd"); + client.tx(2, std::move(data)); + + // Ensure server processes the earlier message + data = make_test_payload(""); + client.tx_rx(2, std::move(data)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 2); + EXPECT_EQ(server.received[0].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[0]),"abcd"); +} + +/** @brief Test lifecycle with a TX_ASYNC sent message. */ +TEST(Comms, test_uds_tx_async_0b) +{ + CommsTest::CommsTestServer server("commstest"); + Comms::CommsModule client("commstest"); + + // Send a zero byte payload + auto data = std::make_unique>(); + client.tx_async(1, std::move(data)); + + // Ensure server processes the earlier message + data = make_test_payload("abcd"); + client.tx_rx(2, std::move(data)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 0); + + EXPECT_EQ(server.received[1].endpoint_id, 2); + EXPECT_EQ(server.received[1].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[1]),"abcd"); +} + +/** @brief Test lifecycle with a TX_ASYNC sent message. */ +TEST(Comms, test_uds_tx_async_nb) +{ + CommsTest::CommsTestServer server("commstest"); + Comms::CommsModule client("commstest"); + + // Send a non-zero byte payload + auto datab = make_test_payload("abcd"); + client.tx_async(1, std::move(datab)); + + // Ensure server processes the earlier message + datab = make_test_payload("efg"); + client.tx_rx(2, std::move(datab)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[0]),"abcd"); + + EXPECT_EQ(server.received[1].endpoint_id, 2); + EXPECT_EQ(server.received[1].data->size(), 3); + EXPECT_EQ(decode_test_payload(server.received[1]),"efg"); +} + +/** @brief Test lifecycle with a TX_RX sent message. */ +TEST(Comms, test_uds_tx_rx_0b) +{ + CommsTest::CommsTestServer server("commstest"); + Comms::CommsModule client("commstest"); + + // Send a zero byte payload + auto data = std::make_unique>(); + auto resp = client.tx_rx(1, std::move(data)); + auto resps = decode_test_payload(std::move(resp)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 1); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 0); + + // Validate it was responded to correctly + EXPECT_EQ(resps.size(), 0); +} + +/** @brief Test lifecycle with a TX_RX sent message. */ +TEST(Comms, test_uds_tx_rx_nb) +{ + CommsTest::CommsTestServer server("commstest"); + Comms::CommsModule client("commstest"); + + // Send a non-zero byte payload + auto datab = make_test_payload("abcd"); + auto resp = client.tx_rx(1, std::move(datab)); + auto resps = decode_test_payload(std::move(resp)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 1); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[0]),"abcd"); + + // Validate it was responded to correctly + EXPECT_EQ(resps.size(), 4); + EXPECT_EQ(resps,"dcba"); +} + +// ---------------------------------------------------------------------------- +// Tests using a TCP/IP socket +/** @brief Test lifecycle with no sent messages. */ +TEST(Comms, test_tcp_no_data) +{ + CommsTest::CommsTestServer server(63412); + Comms::CommsModule client("127.0.0.1", 63412); +} + +/** @brief Test lifecycle with a TX sent message. */ +TEST(Comms, test_tcp_tx_0b) +{ + CommsTest::CommsTestServer server(63412); + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a zero byte payload + auto data = std::make_unique>(); + client.tx(1, std::move(data)); + + // Ensure server processes the earlier message + data = make_test_payload(""); + client.tx_rx(2, std::move(data)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 0); +} + +/** @brief Test lifecycle with a TX sent message. */ +TEST(Comms, test_tcp_tx_nb) +{ + CommsTest::CommsTestServer server(63412); + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a non-zero byte payload + auto data = make_test_payload("abcd"); + client.tx(2, std::move(data)); + + // Ensure server processes the earlier message + data = make_test_payload(""); + client.tx_rx(2, std::move(data)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 2); + EXPECT_EQ(server.received[0].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[0]),"abcd"); +} + +/** @brief Test lifecycle with a TX_ASYNC sent message. */ +TEST(Comms, test_tcp_tx_async_0b) +{ + CommsTest::CommsTestServer server(63412); + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a zero byte payload + auto data = std::make_unique>(); + client.tx_async(1, std::move(data)); + + // Ensure server processes the earlier message + data = make_test_payload("abcd"); + client.tx_rx(2, std::move(data)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 0); + + EXPECT_EQ(server.received[1].endpoint_id, 2); + EXPECT_EQ(server.received[1].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[1]),"abcd"); +} + +/** @brief Test lifecycle with a TX_ASYNC sent message. */ +TEST(Comms, test_tcp_tx_async_nb) +{ + CommsTest::CommsTestServer server(63412); + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a non-zero byte payload + auto datab = make_test_payload("abcd"); + client.tx_async(1, std::move(datab)); + + // Ensure server processes the earlier message + datab = make_test_payload("efg"); + client.tx_rx(2, std::move(datab)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 2); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[0]),"abcd"); + + EXPECT_EQ(server.received[1].endpoint_id, 2); + EXPECT_EQ(server.received[1].data->size(), 3); + EXPECT_EQ(decode_test_payload(server.received[1]),"efg"); +} + +/** @brief Test lifecycle with a TX_RX sent message. */ +TEST(Comms, test_tcp_tx_rx_0b) +{ + CommsTest::CommsTestServer server(63412); + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a zero byte payload + auto data = std::make_unique>(); + auto resp = client.tx_rx(1, std::move(data)); + auto resps = decode_test_payload(std::move(resp)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 1); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 0); + + // Validate it was responded to correctly + EXPECT_EQ(resps.size(), 0); +} + +/** @brief Test lifecycle with a TX_RX sent message. */ +TEST(Comms, test_tcp_tx_rx_nb) +{ + CommsTest::CommsTestServer server(63412); + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a non-zero byte payload + auto datab = make_test_payload("abcd"); + auto resp = client.tx_rx(1, std::move(datab)); + auto resps = decode_test_payload(std::move(resp)); + + // Validate it was received correctly + EXPECT_EQ(server.received.size(), 1); + + EXPECT_EQ(server.received[0].endpoint_id, 1); + EXPECT_EQ(server.received[0].data->size(), 4); + EXPECT_EQ(decode_test_payload(server.received[0]),"abcd"); + + // Validate it was responded to correctly + EXPECT_EQ(resps.size(), 4); + EXPECT_EQ(resps,"dcba"); +} diff --git a/source_common/comms/test/unittest_comms_client.cpp b/source_common/comms/test/unittest_comms_client.cpp new file mode 100644 index 0000000..8966422 --- /dev/null +++ b/source_common/comms/test/unittest_comms_client.cpp @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The implementation of the communications module client-only unit tests. + * + * For these tests to run an instance of the real host Python server must have + * been started for the tests to run against. These tests are semi-manual as + * we have no way of validating what was received by the server for any + * message other than tx_rx. + * + * TODO: Get the test server to echo back tx and tx_async messages in a + * subsequent tx_rx message. + */ +#include + +#include "comms/comms_interface.hpp" +#include "comms/comms_module.hpp" + +std::unique_ptr make_test_payload( + const std::string& str +) { + auto data = std::make_unique(str.begin(), str.end()); + return data; +} + +std::string decode_test_payload( + std::unique_ptr data +) { + std::string str(data->begin(), data->end()); + return str; +} + +// ---------------------------------------------------------------------------- +// Tests using a TCP/IP socket +/** @brief Test lifecycle with no sent messages. */ +TEST(CommsClient, test_tcp_no_data) +{ + Comms::CommsModule client("127.0.0.1", 63412); +} + +TEST(CommsClient, test_tcp_registry) +{ + Comms::CommsModule client("127.0.0.1", 63412); + client.get_endpoint_id("dave"); +} + +/** @brief Test lifecycle with a TX sent message. */ +TEST(CommsClient, test_tcp_tx_0b) +{ + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a zero byte payload + auto data = std::make_unique>(); + client.tx(1, std::move(data)); +} + +/** @brief Test lifecycle with a TX sent message. */ +TEST(CommsClient, test_tcp_tx_nb) +{ + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a 4 byte payload + auto data = make_test_payload("abcd"); + client.tx(1, std::move(data)); +} + +/** @brief Test lifecycle with a TX_ASYNC sent message. */ +TEST(CommsClient, test_tcp_tx_async_0b) +{ + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a zero byte payload + auto data = std::make_unique>(); + client.tx_async(1, std::move(data)); +} + +/** @brief Test lifecycle with a TX_ASYNC sent message. */ +TEST(CommsClient, test_tcp_tx_async_nb) +{ + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a 4 byte payload + auto data = make_test_payload("abcd"); + client.tx_async(1, std::move(data)); +} + +/** @brief Test lifecycle with a TX_RX sent message. */ +TEST(CommsClient, test_tcp_tx_rx_0b) +{ + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a zero byte payload + auto data = std::make_unique>(); + auto resp = client.tx_rx(1, std::move(data)); + auto resps = decode_test_payload(std::move(resp)); + + // Validate it was responded to correctly + EXPECT_EQ(resps.size(), 0); + +} + +/** @brief Test lifecycle with a TX_RX sent message. */ +TEST(CommsClient, test_tcp_tx_rx_nb) +{ + Comms::CommsModule client("127.0.0.1", 63412); + + // Send a 4 byte payload + auto data = make_test_payload("abcd"); + auto resp = client.tx_rx(1, std::move(data)); + auto resps = decode_test_payload(std::move(resp)); + + // Validate it was responded to correctly + EXPECT_EQ(resps.size(), 4); + EXPECT_EQ(resps, "dcba"); +} \ No newline at end of file diff --git a/source_common/framework/entry_utils.hpp b/source_common/framework/entry_utils.hpp index bee6351..395698c 100644 --- a/source_common/framework/entry_utils.hpp +++ b/source_common/framework/entry_utils.hpp @@ -24,7 +24,7 @@ */ /** - * \file + * @file * This module exposes common functionality used by layer entrypoints, * implemented as library code which can be swapped for alternative * implementations on a per-layer basis if needed. diff --git a/source_common/framework/utils.hpp b/source_common/framework/utils.hpp index 11254f3..34b4e50 100644 --- a/source_common/framework/utils.hpp +++ b/source_common/framework/utils.hpp @@ -24,7 +24,7 @@ */ /** - * \file + * @file * This module implements miscellaneous utility functions. */ diff --git a/source_common/utils/queue.hpp b/source_common/utils/queue.hpp new file mode 100644 index 0000000..9702bed --- /dev/null +++ b/source_common/utils/queue.hpp @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: MIT + * ---------------------------------------------------------------------------- + * Copyright (c) 2024 Arm Limited + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + */ + +/** + * @file + * The implementation of a basic thread-safe task queue. + */ + +#pragma once + +#include +#include +#include +#include + +/** + * @brief Baseclass for a task. + */ +class Task +{ +public: + /** + * @brief Destroy the task. + */ + virtual ~Task() { } + + /** + * @brief Wait for the task to be complete. + */ + void wait() { + std::unique_lock lock(condition_lock); + complete_condition.wait(lock, [this]{ return complete.load(); }); + } + + /** + * @brief Notify that the task is complete. + */ + void notify() { + std::unique_lock lock(condition_lock); + complete = true; + lock.unlock(); + complete_condition.notify_all(); + } + +private: + /** @brief Task completion status. */ + std::atomic complete { false }; + + /** @brief Condition variable for notifications. */ + std::condition_variable complete_condition; + + /** @brief Lock for notifications. */ + std::mutex condition_lock; +}; + + +/** + * @brief A thread-safe FIFO task queue. + */ +template +class TaskQueue +{ +private: + /** @brief Lock for thread-safe access. */ + std::mutex store_lock; + + /** @brief Condition variable for notifications. */ + std::condition_variable condition; + + /** @brief Dequeue for data storage. */ + std::deque store; + +public: + /** + * @brief Add a new task to the end of the queue. + * + * @param task The new task to append to the queue. + */ + void put(T task) + { + std::lock_guard lock(store_lock); + store.push_back(task); + condition.notify_one(); + } + + /** + * @brief Get the oldest task from the head of the queue. + * + * This function blocks until a task is available. + * + * @return The oldest task in the list. + */ + T get() + { + std::unique_lock lock(store_lock); + + // Release lock until we have data, and then reacquire + while(store.empty()) + { + condition.wait(lock); + } + + T task = store.front(); + store.pop_front(); + return task; + } + + /** + * @brief Test if the queue is currently empty. + * + * Note that this is racy so the queue may not stay empty if other + * threads are still using it ... + * + * @return @c true if the queue is empty, @c false otherwise. + */ + bool is_empty() + { + std::unique_lock lock(store_lock); + return store.empty(); + } +}; From 75c573d6d39c8a820a4844efe5391d9bb133777d Mon Sep 17 00:00:00 2001 From: Peter Harris Date: Fri, 29 Nov 2024 16:03:02 +0000 Subject: [PATCH 3/3] Fix unit test --- .github/workflows/build_test.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build_test.yaml b/.github/workflows/build_test.yaml index cb3b832..3299fdb 100644 --- a/.github/workflows/build_test.yaml +++ b/.github/workflows/build_test.yaml @@ -33,10 +33,10 @@ jobs: - name: Build and run unit tests run: | export CXX=clang++ - mkdir build_unit - cd build_unit + mkdir build_unittest + cd build_unittest cmake -G "Unix Makefiles" -DCMAKE_BUILD_TYPE=Debug -DCMAKE_INSTALL_PREFIX=./ .. - make -j4 + make install -j4 ./bin/unittest_comms build-ubuntu-x64-gcc: