From e76978e6c083d116122e9241c611916cd5d0ee12 Mon Sep 17 00:00:00 2001 From: Daniel Edwards Date: Thu, 14 Aug 2025 08:36:09 +0200 Subject: [PATCH] itest: rewrite the participant modes itest --- .../ITest_Internals_ParticipantModes.cpp | 1946 +++++++++-------- 1 file changed, 975 insertions(+), 971 deletions(-) diff --git a/SilKit/IntegrationTests/ITest_Internals_ParticipantModes.cpp b/SilKit/IntegrationTests/ITest_Internals_ParticipantModes.cpp index d1c5aa777..22cb65ffd 100644 --- a/SilKit/IntegrationTests/ITest_Internals_ParticipantModes.cpp +++ b/SilKit/IntegrationTests/ITest_Internals_ParticipantModes.cpp @@ -2,15 +2,21 @@ // // SPDX-License-Identifier: MIT +#include "gmock/gmock.h" +#include "gtest/gtest.h" + #include #include +#include #include #include +#include +#include +#include #include -#include - -#include "gmock/gmock.h" -#include "gtest/gtest.h" +#include +#include +#include #include "silkit/SilKit.hpp" #include "silkit/services/orchestration/all.hpp" @@ -20,1229 +26,1227 @@ #include "silkit/services/orchestration/string_utils.hpp" #include "silkit/experimental/participant/ParticipantExtensions.hpp" -#include "ConfigurationTestUtils.hpp" -#include "IParticipantInternal.hpp" -#include "CreateParticipantImpl.hpp" +#include namespace { +namespace SK { +using SilKit::IParticipant; +using SilKit::CreateParticipant; +using SilKit::Config::IParticipantConfiguration; +using SilKit::Config::ParticipantConfigurationFromString; +using SilKit::Experimental::Participant::CreateSystemController; +using SilKit::Experimental::Services::Orchestration::ISystemController; +using SilKit::Services::Orchestration::ILifecycleService; +using SilKit::Services::Orchestration::ITimeSyncService; +using SilKit::Services::Orchestration::ISystemMonitor; +using SilKit::Services::Orchestration::OperationMode; +using SilKit::Services::Orchestration::ParticipantState; +using SilKit::Services::Orchestration::ParticipantStatus; +using SilKit::Services::Orchestration::SystemState; +using SimulationStepHandler = SilKit::Services::Orchestration::ITimeSyncService::SimulationStepHandler; +using SystemStateHandler = SilKit::Services::Orchestration::ISystemMonitor::SystemStateHandler; +using ParticipantStatusHandler = SilKit::Services::Orchestration::ISystemMonitor::ParticipantStatusHandler; +using SilKit::Services::PubSub::IDataPublisher; +using SilKit::Services::PubSub::IDataSubscriber; +using SilKit::Services::PubSub::DataMessageEvent; +using SilKit::Services::PubSub::DataMessageHandler; +using SilKit::Services::PubSub::PubSubSpec; +using SilKit::Vendor::Vector::ISilKitRegistry; +using SilKit::Vendor::Vector::CreateSilKitRegistry; +using ByteSpan = SilKit::Util::Span; +} // namespace SK + using namespace std::chrono_literals; -using namespace SilKit; -using namespace SilKit::Services::Orchestration; -using namespace SilKit::Config; -using namespace SilKit::Services::PubSub; -const std::string systemControllerParticipantName{"systemControllerParticipant"}; -const std::string topic{"Topic"}; -const std::string mediaType{"A"}; +constexpr auto SYSTEM_CONTROLLER_PARTICIPANT_NAME = "SystemController"; +constexpr auto PUBSUB_TOPIC = "Topic"; +constexpr auto PUBSUB_MEDIA_TYPE = "MediaType"; -std::atomic abortSystemControllerRequested{false}; +constexpr auto ASYNC_DELAY_BETWEEN_PUBLICATION = 50ms; -static size_t expectedReceptions = 0; -static size_t globalParticipantIndex = 0; +constexpr auto PARTICIPANT_CONFIGURATION = R"( +#Logging: +# Sinks: +# - Type: Stdout +# Level: Debug -static constexpr std::chrono::seconds TEST_TIMEOUT{20}; +#Experimental: +# TimeSynchronization: +# AnimationFactor: 1.0 +)"; -std::chrono::milliseconds communicationTimeout{10000ms}; -std::chrono::milliseconds asyncDelayBetweenPublication{50ms}; +enum class CoordinationMode +{ + Optional, + Required, +}; -enum class TimeMode +enum struct ParticipantMode { - Async, - Sync + AutonomousFreerunning, + AutonomousSynchronized, + CoordinatedFreerunning, + CoordinatedSynchronized, + IgnorantFreerunning, }; -class ITest_Internals_ParticipantModes : public testing::Test +auto operator<<(std::ostream& os, const ParticipantMode runnerType) -> std::ostream& { -protected: - ITest_Internals_ParticipantModes() {} + switch (runnerType) + { + case ParticipantMode::AutonomousFreerunning: + os << "AutonomousFreerunning"; + break; + case ParticipantMode::AutonomousSynchronized: + os << "AutonomousSynchronized"; + break; + case ParticipantMode::CoordinatedFreerunning: + os << "CoordinatedFreerunning"; + break; + case ParticipantMode::CoordinatedSynchronized: + os << "CoordinatedSynchronized"; + break; + case ParticipantMode::IgnorantFreerunning: + os << "IgnorantFreerunning"; + break; + } + + return os; +} - struct TestParticipant +constexpr auto ALL_RUNNER_TYPES = + std::array{ParticipantMode::AutonomousFreerunning, ParticipantMode::AutonomousSynchronized, + ParticipantMode::CoordinatedFreerunning, ParticipantMode::CoordinatedSynchronized, + ParticipantMode::IgnorantFreerunning}; + +// ===================================================================================================================== + +class WorkQueue +{ +public: + template + auto Push(F&& f) -> std::optional> { - TestParticipant(const std::string& newName, TimeMode newTimeMode, OperationMode newOperationMode) + if (_shutdown) { - name = newName; - id = static_cast(globalParticipantIndex++); - timeMode = newTimeMode; - lifeCycleOperationMode = newOperationMode; + return std::nullopt; } - TestParticipant(TestParticipant&&) = default; - TestParticipant& operator=(TestParticipant&&) = default; + std::packaged_task task{std::forward(f)}; + auto future = task.get_future(); - struct ImmovableMembers { - ImmovableMembers() = default; + std::lock_guard lock{_mutex}; + _queue.push(std::move(task)); + } - ImmovableMembers(ImmovableMembers&& other) noexcept - : allReceived{other.allReceived.load()} - , stopRequested{other.stopRequested.load()} - , errorStateReached{other.errorStateReached.load()} - , runningStateReached{other.runningStateReached.load()} - { - } + _conditionVariable.notify_one(); + + return future; + } + + template + auto PullUntil(std::chrono::time_point deadline) -> std::optional> + { + if (_shutdown) + { + return std::nullopt; + } + + const auto predicate = [this] { return !_shutdown && !_queue.empty(); }; - ImmovableMembers& operator=(ImmovableMembers&& other) noexcept + std::packaged_task task; + + { + std::unique_lock lock{_mutex}; + + if (!_conditionVariable.wait_until(lock, deadline, predicate)) { - if (this != &other) - { - allReceived = other.allReceived.load(); - stopRequested = other.stopRequested.load(); - errorStateReached = other.errorStateReached.load(); - runningStateReached = other.runningStateReached.load(); - } - - return *this; + return std::nullopt; } - std::atomic allReceived{false}; - std::atomic stopRequested{false}; - std::atomic errorStateReached{false}; - std::atomic runningStateReached{false}; - }; + task = std::move(_queue.front()); + _queue.pop(); + } - ImmovableMembers i{}; + return task; + } - std::string name; - uint8_t id; - std::set receivedIds; - std::promise allReceivedPromise; + void ShutDown() + { + _shutdown = true; + _conditionVariable.notify_all(); + } - std::promise errorStatePromise; - std::promise runningStatePromise; +private: + std::mutex _mutex; + std::condition_variable _conditionVariable; + std::queue> _queue; - bool simtimePassed{false}; - std::promise simtimePassedPromise; + std::atomic _shutdown = false; +}; - std::promise simTaskFinishedPromise; +// ===================================================================================================================== - bool allowInvalidLifeCycleOperationMode{false}; +struct IRunner +{ + virtual ~IRunner() = default; - TimeMode timeMode; - OperationMode lifeCycleOperationMode; - ParticipantState participantState{ParticipantState::Invalid}; + /// Sets up the internals of the runner. + virtual void SetUp() = 0; - void ResetReception() - { - receivedIds.clear(); - allReceivedPromise = std::promise{}; - i.allReceived = false; - } + /// Runs the orchestrated runner. + virtual void Main() = 0; - void AwaitCommunication() - { - auto futureStatus = allReceivedPromise.get_future().wait_for(communicationTimeout); - if (futureStatus != std::future_status::ready) - { - FAIL() << "Test Failure: Awaiting test communication timed out"; - } - } + /// Instructs the runner to stop 'normally'. + virtual void TriggerHalt() = 0; - void AwaitErrorState() - { - auto futureStatus = errorStatePromise.get_future().wait_for(communicationTimeout); - if (futureStatus != std::future_status::ready) - { - FAIL() << "Test Failure: Awaiting error state timed out"; - } - } + /// Instructs the runner to abort it's 'main loop' regardless of, e.g., the participant lifecycle. + virtual void TriggerAbort() = 0; +}; - void Stop() - { - i.stopRequested = true; - } - }; +// ===================================================================================================================== + +struct IOrchestratorHandle +{ + virtual ~IOrchestratorHandle() = default; + + /// Create a participant with `name`. + virtual auto CreateParticipant(std::string_view name) -> std::unique_ptr = 0; + + /// Notifies the orchestrator that `name` has received `message`. + virtual void NotifyReceived(std::string_view name, SK::ByteSpan message) = 0; + + /// Notifies the orchestrator that `name` has changed its own participant state `participantState`. + virtual void NotifyParticipantStateChanged(std::string_view name, SK::ParticipantState participantState) = 0; + + /// Notifies the orchestrator that `name` has received a system state change to `systemState`. + virtual void NotifySystemStateChanged(std::string_view name, SK::SystemState systemState) = 0; + + /// Notifies the orchestrator that `name` has completed its main loop. + virtual void NotifyMainLoopComplete(std::string_view name) = 0; + + /// Stop the simulation using 'normal' means. + virtual void TriggerHalt() = 0; - void SyncParticipantThread(TestParticipant& testParticipant) + /// Stop the simulation 'as soon as possible', ignoring any participant's lifecycle. + virtual void TriggerAbort() = 0; +}; + +// ===================================================================================================================== + +class RunnerBase +{ +public: + RunnerBase(IOrchestratorHandle& orchestrator, const std::string_view name) + : _orchestrator{&orchestrator} + , _name{name} { - std::shared_ptr config; - if (logging) - { - config = SilKit::Config::MakeParticipantConfigurationWithLoggingImpl(logLevel); - } - else - { - config = SilKit::Config::MakeEmptyParticipantConfigurationImpl(); - } + } - auto participant = SilKit::CreateParticipantImpl(config, testParticipant.name, _registryUri); - auto* logger = participant->GetLogger(); +protected: + void SetUpParticipant() + { + _participant = _orchestrator->CreateParticipant(_name); - ILifecycleService* lifecycleService{}; - if (testParticipant.allowInvalidLifeCycleOperationMode - || testParticipant.lifeCycleOperationMode != OperationMode::Invalid) - { - lifecycleService = participant->CreateLifecycleService({testParticipant.lifeCycleOperationMode}); - } + _systemMonitor = _participant->CreateSystemMonitor(); + _systemMonitor->AddSystemStateHandler(MakeSystemStateHandler()); + _systemMonitor->AddParticipantStatusHandler(MakeParticipantStatusHandler()); + } - auto participantInternal = dynamic_cast(participant.get()); - auto systemController = participantInternal->GetSystemController(); + void SetUpCommunication() + { + _publisher = _participant->CreateDataPublisher("Publisher", MakeRunnerPubSubSpec(), 0); + _subscriber = + _participant->CreateDataSubscriber("Subscriber", MakeRunnerPubSubSpec(), MakeRunnerDataMessageHandler()); + } - auto systemMonitor = participant->CreateSystemMonitor(); - systemMonitor->AddParticipantStatusHandler([&testParticipant, systemController, logger, - this](const Services::Orchestration::ParticipantStatus& status) { - if (status.participantName == testParticipant.name) - { - if (status.state == ParticipantState::Error && !testParticipant.i.errorStateReached) - { - testParticipant.i.errorStateReached = true; - testParticipant.errorStatePromise.set_value(); - - if (logging) - { - std::stringstream ss; - ss << "AbortSimulation due to ErrorState of participant \'" << testParticipant.name << "\'"; - logger->Info(ss.str()); - } - - systemController->AbortSimulation(); - } - else if (status.state == ParticipantState::Running && !testParticipant.i.runningStateReached) - { - testParticipant.i.runningStateReached = true; - testParticipant.runningStatePromise.set_value(); - } - } - }); + void SetUpLifecycle(SK::OperationMode operationMode) + { + _lifecycleService = _participant->CreateLifecycleService({operationMode}); + } + void SetUpSynchronized() + { + _timeSyncService = _lifecycleService->CreateTimeSyncService(); + _timeSyncService->SetSimulationStepHandler(MakeSimulationStepHandler(), 1ms); + } - ITimeSyncService* timeSyncService{}; - if (testParticipant.allowInvalidLifeCycleOperationMode - || testParticipant.lifeCycleOperationMode != OperationMode::Invalid) +protected: + void MainFreerunning() const + { + auto finalParticipantState = _lifecycleService->StartLifecycle(); + + while (!_aborted) { - timeSyncService = lifecycleService->CreateTimeSyncService(); - } + PublishMessage(); - SilKit::Services::PubSub::PubSubSpec dataSpec{topic, mediaType}; - SilKit::Services::PubSub::PubSubSpec matchingDataSpec{topic, mediaType}; - auto publisher = participant->CreateDataPublisher("TestPublisher", dataSpec, 0); - participant->CreateDataSubscriber( - "TestSubscriber", matchingDataSpec, - [&testParticipant](IDataSubscriber* /*subscriber*/, const DataMessageEvent& dataMessageEvent) { - if (!testParticipant.i.allReceived) - { - auto participantId = dataMessageEvent.data[0]; - if (participantId != testParticipant.id) - { - testParticipant.receivedIds.insert(dataMessageEvent.data[0]); - // No self delivery: Expect expectedReceptions-1 receptions - if (testParticipant.receivedIds.size() == expectedReceptions - 1) - { - testParticipant.i.allReceived = true; - testParticipant.allReceivedPromise.set_value(); - } - } - } - }); + const auto futureState = finalParticipantState.wait_for(ASYNC_DELAY_BETWEEN_PUBLICATION); - timeSyncService->SetSimulationStepHandler( - [lifecycleService, logger, &testParticipant, publisher, this](std::chrono::nanoseconds now, - std::chrono::nanoseconds /*duration*/) { - publisher->Publish(std::vector{testParticipant.id}); - std::stringstream ss; - ss << "now=" << now.count() / 1e9 << "s"; - logger->Info(ss.str()); - if (!testParticipant.simtimePassed && now > _simtimeToPass) + if (futureState == std::future_status::deferred) { - testParticipant.simtimePassed = true; - testParticipant.simtimePassedPromise.set_value(); + std::abort(); } - if (testParticipant.i.stopRequested) + + if (futureState == std::future_status::ready) { - testParticipant.i.stopRequested = false; - lifecycleService->Stop("End Test"); + break; } - }, - 1s); + } - if (testParticipant.lifeCycleOperationMode != OperationMode::Invalid) + if (!_aborted) { - auto finalStateFuture = lifecycleService->StartLifecycle(); - finalStateFuture.wait_for(TEST_TIMEOUT); + finalParticipantState.get(); } + + _orchestrator->NotifyMainLoopComplete(_name); } - void AsyncParticipantThread(TestParticipant& testParticipant) + void MainSynchronized() const { - std::shared_ptr config; - if (logging) - { - config = SilKit::Config::MakeParticipantConfigurationWithLoggingImpl(logLevel); - } - else - { - config = SilKit::Config::MakeEmptyParticipantConfigurationImpl(); - } - - auto participant = SilKit::CreateParticipantImpl(config, testParticipant.name, _registryUri); - auto* logger = participant->GetLogger(); + auto finalParticipantState = _lifecycleService->StartLifecycle(); - ILifecycleService* lifecycleService{}; - if (testParticipant.allowInvalidLifeCycleOperationMode - || testParticipant.lifeCycleOperationMode != OperationMode::Invalid) + while (!_aborted) { - lifecycleService = participant->CreateLifecycleService({testParticipant.lifeCycleOperationMode}); - } + const auto futureState = finalParticipantState.wait_for(ASYNC_DELAY_BETWEEN_PUBLICATION); - auto participantInternal = dynamic_cast(participant.get()); - auto systemController = participantInternal->GetSystemController(); - auto systemMonitor = participant->CreateSystemMonitor(); - systemMonitor->AddParticipantStatusHandler([&testParticipant, systemController, logger, - this](const Services::Orchestration::ParticipantStatus& status) { - if (status.participantName == testParticipant.name) + if (futureState == std::future_status::deferred) { - if (status.state == ParticipantState::Error && !testParticipant.i.errorStateReached) - { - // We also set the runningStatePromise to skip waiting for this - testParticipant.runningStatePromise.set_value(); - - testParticipant.i.errorStateReached = true; - testParticipant.errorStatePromise.set_value(); - if (logging) - { - std::stringstream ss; - ss << "AbortSimulation due to ErrorState of participant \'" << testParticipant.name << "\'"; - logger->Info(ss.str()); - } - systemController->AbortSimulation(); - } - else if (status.state == ParticipantState::Running && !testParticipant.i.runningStateReached) - { - testParticipant.i.runningStateReached = true; - testParticipant.runningStatePromise.set_value(); - } + std::abort(); } - }); - SilKit::Services::PubSub::PubSubSpec dataSpec{topic, mediaType}; - SilKit::Services::PubSub::PubSubSpec matchingDataSpec{topic, mediaType}; - auto publisher = participant->CreateDataPublisher("TestPublisher", dataSpec, 0); - participant->CreateDataSubscriber( - "TestSubscriber", matchingDataSpec, - [&testParticipant](IDataSubscriber* /*subscriber*/, const DataMessageEvent& dataMessageEvent) { - if (!testParticipant.i.allReceived) + if (futureState == std::future_status::ready) { - auto participantId = dataMessageEvent.data[0]; - if (participantId != testParticipant.id) - { - testParticipant.receivedIds.insert(dataMessageEvent.data[0]); - // No self delivery: Expect expectedReceptions-1 receptions - if (testParticipant.receivedIds.size() == expectedReceptions - 1) - { - testParticipant.i.allReceived = true; - testParticipant.allReceivedPromise.set_value(); - } - } + break; } - }); + } + + if (!_aborted) + { + finalParticipantState.get(); + } + + _orchestrator->NotifyMainLoopComplete(_name); + } + +protected: + void PublishMessage() const + { + auto message = std::vector(_name.begin(), _name.end()); + + _publisher->Publish(message); + } + + void HaltLifecycle() const + { + _lifecycleService->Stop("Halt"); + } + + void AbortNow() + { + _aborted = true; + } - auto runTask = [&testParticipant, publisher]() { - while (!testParticipant.i.stopRequested) +private: + [[nodiscard]] static auto MakeRunnerPubSubSpec() -> SK::PubSubSpec + { + return SK::PubSubSpec{PUBSUB_TOPIC, PUBSUB_MEDIA_TYPE}; + } + + [[nodiscard]] auto MakeRunnerDataMessageHandler() const -> SK::DataMessageHandler + { + return [this](auto, const SK::DataMessageEvent& event) { _orchestrator->NotifyReceived(_name, event.data); }; + } + + [[nodiscard]] auto MakeSystemStateHandler() const -> SK::SystemStateHandler + { + return [this](SK::SystemState systemState) { _orchestrator->NotifySystemStateChanged(_name, systemState); }; + } + + [[nodiscard]] auto MakeParticipantStatusHandler() const -> SK::ParticipantStatusHandler + { + return [this](const SK::ParticipantStatus& participantStatus) { + if (participantStatus.participantName != _name) { - publisher->Publish(std::vector{testParticipant.id}); - std::this_thread::sleep_for(asyncDelayBetweenPublication); + return; } - testParticipant.simTaskFinishedPromise.set_value(); + + _orchestrator->NotifyParticipantStateChanged(_name, participantStatus.state); }; + } - if (testParticipant.lifeCycleOperationMode != OperationMode::Invalid) - { - auto finalStateFuture = lifecycleService->StartLifecycle(); + [[nodiscard]] auto MakeSimulationStepHandler() const -> SK::SimulationStepHandler + { + return [this](auto, auto) { PublishMessage(); }; + } - if (!testParticipant.i.errorStateReached) - { - // Wait for ParticipantState::Running - auto runningStateFutureStatus = - testParticipant.runningStatePromise.get_future().wait_for(communicationTimeout); - if (runningStateFutureStatus != std::future_status::ready) - { - FAIL() << "Test Failure: Awaiting running state timed out"; - } - } +protected: + // initialized in the constructor - // Run the task - std::thread runTaskThread{runTask}; - runTaskThread.detach(); + IOrchestratorHandle* _orchestrator; + std::string _name; + std::atomic _aborted = false; - // Wait for task to have received a stop request - auto simTaskFinishedFuture = testParticipant.simTaskFinishedPromise.get_future(); - (void)simTaskFinishedFuture.wait_for(TEST_TIMEOUT); + // initialized in SetUpParticipant - // Stop the lifecycle - lifecycleService->Stop("End Test"); + std::unique_ptr _participant; + SK::ISystemMonitor* _systemMonitor = nullptr; + SK::IDataPublisher* _publisher = nullptr; + SK::IDataSubscriber* _subscriber = nullptr; - (void)finalStateFuture.wait_for(TEST_TIMEOUT); + // initialized in SetUpLifecycle - if (runTaskThread.joinable()) - { - runTaskThread.join(); - } - } - else - { - runTask(); - } + SK::ILifecycleService* _lifecycleService = nullptr; + + // initialized in SetUpSynchronized + + SK::ITimeSyncService* _timeSyncService = nullptr; +}; + +// ===================================================================================================================== + +class IgnorantFreerunning final + : public IRunner + , RunnerBase +{ + std::atomic _running = true; + +public: + using RunnerBase::RunnerBase; + +private: // IRunner + void SetUp() override + { + SetUpParticipant(); + SetUpCommunication(); } - void SystemControllerParticipantThread(const std::vector& required) + void Main() override { - std::shared_ptr config; - if (logging) + while (_running && !_aborted) { - config = SilKit::Config::MakeParticipantConfigurationWithLoggingImpl(logLevel); - } - else - { - config = SilKit::Config::MakeEmptyParticipantConfigurationImpl(); + PublishMessage(); + std::this_thread::sleep_for(ASYNC_DELAY_BETWEEN_PUBLICATION); } - auto systemControllerParticipant = - SilKit::CreateParticipantImpl(config, systemControllerParticipantName, _registryUri); + _orchestrator->NotifyMainLoopComplete(_name); + } + + void TriggerHalt() override + { + _running = false; + } - auto participantInternal = dynamic_cast(systemControllerParticipant.get()); - auto systemController = participantInternal->GetSystemController(); + void TriggerAbort() override + { + AbortNow(); + } +}; - auto* logger = systemControllerParticipant->GetLogger(); +class CoordinatedSynchronized final + : public IRunner + , RunnerBase +{ +public: + using RunnerBase::RunnerBase; - auto systemMonitor = systemControllerParticipant->CreateSystemMonitor(); +private: // IRunner + void SetUp() override + { + SetUpParticipant(); + SetUpCommunication(); + SetUpLifecycle(SK::OperationMode::Coordinated); + SetUpSynchronized(); + } - systemMonitor->AddParticipantStatusHandler([this, logger](ParticipantStatus newStatus) { - if (logging) - { - std::stringstream ss; - ss << "New ParticipantState of " << newStatus.participantName << ": " << newStatus.state - << "; Reason: " << newStatus.enterReason; - logger->Info(ss.str()); - } - }); + void Main() override + { + MainSynchronized(); + } - systemMonitor->AddSystemStateHandler([this, logger, systemController](SystemState newState) { - if (logging) - { - std::stringstream ss; - ss << "New SystemState " << newState; - logger->Info(ss.str()); - } - switch (newState) - { - case SystemState::Error: - if (verbose) - { - std::cout << "SystemState::Error -> Aborting simulation" << std ::endl; - } - if (logging) - { - logger->Info("Aborting simulation due to SystemState::Error"); - } - systemController->AbortSimulation(); - break; + void TriggerHalt() override + { + HaltLifecycle(); + } - case SystemState::Running: - break; + void TriggerAbort() override + { + AbortNow(); + } +}; - default: - break; - } - }); +class CoordinatedFreerunning final + : public IRunner + , RunnerBase +{ +public: + using RunnerBase::RunnerBase; - systemController->SetWorkflowConfiguration({required}); +private: // IRunner + void SetUp() override + { + SetUpParticipant(); + SetUpCommunication(); + SetUpLifecycle(SK::OperationMode::Coordinated); + } - ILifecycleService* systemControllerLifecycleService = systemControllerParticipant->CreateLifecycleService( - {SilKit::Services::Orchestration::OperationMode::Coordinated}); + void Main() override + { + MainFreerunning(); + } - auto finalState = systemControllerLifecycleService->StartLifecycle(); + void TriggerHalt() override + { + HaltLifecycle(); + } - std::promise abortThreadDone{}; - auto waitForAbortTask = [&abortThreadDone, systemController, logger, this]() { - while (!abortSystemControllerRequested) - { - std::this_thread::sleep_for(1ms); - } + void TriggerAbort() override + { + AbortNow(); + } +}; - if (logging) - { - logger->Info("AbortSimulation requested"); - } - systemController->AbortSimulation(); - abortThreadDone.set_value(); - }; - std::thread abortThread{waitForAbortTask}; - abortThread.detach(); - abortThreadDone.get_future().wait_for(TEST_TIMEOUT); - if (abortThread.joinable()) - { - abortThread.join(); - } - abortSystemControllerRequested = false; +class AutonomousSynchronized final + : public IRunner + , RunnerBase +{ +public: + using RunnerBase::RunnerBase; - finalState.wait_for(TEST_TIMEOUT); +private: // IRunner + void SetUp() override + { + SetUpParticipant(); + SetUpCommunication(); + SetUpLifecycle(SK::OperationMode::Autonomous); + SetUpSynchronized(); } - void RunSystemController(const std::vector& requiredParticipants) + void Main() override { - if (!requiredParticipants.empty()) - { - std::vector required{}; - for (auto&& p : requiredParticipants) - { - required.push_back(p); - } - required.push_back(systemControllerParticipantName); + MainSynchronized(); + } - participantThread_SystemController = - ParticipantThread{[this, required] { SystemControllerParticipantThread(required); }}; - } + void TriggerHalt() override + { + HaltLifecycle(); } - void AbortSystemController() + void TriggerAbort() override { - abortSystemControllerRequested = true; - participantThread_SystemController.shutdownFuture.wait_for(TEST_TIMEOUT); - if (participantThread_SystemController.thread.joinable()) + AbortNow(); + } +}; + +class AutonomousFreerunning final + : public IRunner + , RunnerBase +{ +public: + using RunnerBase::RunnerBase; + +private: // IRunner + void SetUp() override + { + SetUpParticipant(); + SetUpCommunication(); + SetUpLifecycle(SK::OperationMode::Autonomous); + } + + void Main() override + { + MainFreerunning(); + } + + void TriggerHalt() override + { + HaltLifecycle(); + } + + void TriggerAbort() override + { + AbortNow(); + } +}; + +class SystemController final + : public IRunner + , RunnerBase +{ + // initialized in the constructor + + std::vector _requiredParticipantNames; + std::atomic _running = true; + + // initialized in SetUp + + SK::ISystemController* _systemController = nullptr; + +public: + SystemController(IOrchestratorHandle& orchestrator, std::vector requiredParticipantNames) + : RunnerBase(orchestrator, SYSTEM_CONTROLLER_PARTICIPANT_NAME) + , _requiredParticipantNames{std::move(requiredParticipantNames)} + { + } + +private: // IRunner + void SetUp() override + { + SetUpParticipant(); + + _systemController = SK::CreateSystemController(_participant.get()); + _systemController->SetWorkflowConfiguration({_requiredParticipantNames}); + } + + void Main() override + { + while (_running && !_aborted) { - participantThread_SystemController.thread.join(); + std::this_thread::sleep_for(ASYNC_DELAY_BETWEEN_PUBLICATION); } } - void RunRegistry() + void TriggerHalt() override + { + _running = false; + } + + void TriggerAbort() override + { + AbortNow(); + } +}; + +// ===================================================================================================================== + +class Orchestrator final : public IOrchestratorHandle +{ +public: + struct RunResult + { + bool communicatedWithEveryone = false; + bool seenAnyParticipantStateError = false; + bool enteredSystemStateError = false; + bool aborted = false; + }; + +private: + struct ParticipantRunner + { + std::unique_ptr runner; + std::string name; + CoordinationMode coordinationMode; + }; + +public: + void StartRegistry() + { + _registry = MakeRegistry(); + _registryUri = _registry->StartListening("silkit://127.0.0.1:0"); + } + + template + void AddOptional(std::string_view name) { - std::shared_ptr config; - config = SilKit::Config::MakeEmptyParticipantConfiguration(); - registry = SilKit::Vendor::Vector::CreateSilKitRegistry(config); - _registryUri = registry->StartListening("silkit://localhost:0"); + AddParticipantRunner({ + std::make_unique(*this, name), + std::string{name}, + CoordinationMode::Optional, + }); } - void StopRegistry() + template + void AddRequired(std::string_view name) { - registry.reset(); + AddParticipantRunner(ParticipantRunner{ + std::make_unique(*this, name), + std::string{name}, + CoordinationMode::Required, + }); } - void RunParticipants(std::list& participants) + void AddAutomatic(const ParticipantMode mode, std::string_view name) { - for (auto& p : participants) + switch (mode) { - if (p.timeMode == TimeMode::Async) - { - if (p.lifeCycleOperationMode == OperationMode::Invalid) - { - _participantThreads_Async_Invalid.emplace_back([this, &p] { AsyncParticipantThread(p); }); - } - else if (p.lifeCycleOperationMode == OperationMode::Autonomous) - { - _participantThreads_Async_Autonomous.emplace_back([this, &p] { AsyncParticipantThread(p); }); - } - else if (p.lifeCycleOperationMode == OperationMode::Coordinated) - { - _participantThreads_Async_Coordinated.emplace_back([this, &p] { AsyncParticipantThread(p); }); - } - } - else if (p.timeMode == TimeMode::Sync) - { - if (p.lifeCycleOperationMode == OperationMode::Invalid) - { - _participantThreads_Sync_Invalid.emplace_back([this, &p] { SyncParticipantThread(p); }); - } - else if (p.lifeCycleOperationMode == OperationMode::Autonomous) - { - participantThreads_Sync_Autonomous.emplace_back([this, &p] { SyncParticipantThread(p); }); - } - else if (p.lifeCycleOperationMode == OperationMode::Coordinated) - { - _participantThreads_Sync_Coordinated.emplace_back([this, &p] { SyncParticipantThread(p); }); - } - } + case ParticipantMode::AutonomousFreerunning: + AddOptional(name); + break; + case ParticipantMode::AutonomousSynchronized: + AddOptional(name); + break; + case ParticipantMode::CoordinatedFreerunning: + AddRequired(name); + break; + case ParticipantMode::CoordinatedSynchronized: + AddRequired(name); + break; + case ParticipantMode::IgnorantFreerunning: + AddOptional(name); } } - struct ParticipantThread + auto Run() -> RunResult { - struct ThreadMain + CreateAndAddSystemControllerRunnerIfNecessary(); + + for (const auto& runner : _runners) { - void operator()(std::function f) - { - try - { - f(); - } - catch (...) - { - promise.set_exception(std::current_exception()); - return; - } - - promise.set_value(); - } + auto future = std::async(std::launch::async, [self = runner] { + self->SetUp(); + self->Main(); + }); - std::promise promise; - }; + _runnerFutures.emplace_back(std::move(future)); + } - ParticipantThread() {} - ParticipantThread(ParticipantThread&&) = default; - ParticipantThread(const ParticipantThread&) = delete; + WorkFor(30s); - ParticipantThread& operator=(ParticipantThread&&) = default; - ParticipantThread& operator=(const ParticipantThread&) = delete; + WaitForAllRunnersDoneOrCrash(10s); - template - ParticipantThread(T&& t) + for (auto& future : _runnerFutures) { - ThreadMain threadMain; - shutdownFuture = threadMain.promise.get_future(); - thread = std::thread{std::move(threadMain), std::forward(t)}; + future.get(); } - ~ParticipantThread() + return _runResult; + } + +private: + void WorkFor(const std::chrono::nanoseconds duration) + { + const auto deadline = std::chrono::steady_clock::now() + duration; + + while (true) { - if (shutdownFuture.valid()) + auto task = _workQueue.PullUntil(deadline); + + if (!task.has_value()) { - try - { - (void)shutdownFuture.wait_for(TEST_TIMEOUT); - } - catch (...) - { - } + return; } - if (thread.joinable()) + + (*task)(); + } + } + + /// Returns the number of runners that have not stopped yet. + auto CountActiveRunners() const -> std::size_t + { + std::size_t count = 0; + + for (auto& future : _runnerFutures) + { + count += future.wait_for(1ns) != std::future_status::ready; + } + + return count; + } + + /// Waits `duration` for all runners to stop. Crashes the process if any runner does not exit in time. + void WaitForAllRunnersDoneOrCrash(const std::chrono::nanoseconds duration) const + { + const auto deadline = std::chrono::steady_clock::now() + duration; + + std::size_t count; + + do + { + count = CountActiveRunners(); + + if (count == 0) { - try - { - thread.join(); - } - catch (...) - { - } + return; } + } while (std::chrono::steady_clock::now() < deadline); + + // if any runner refuses to stop, we can't do anything sensible but crash the process + std::cerr << "FATAL ERROR: " << count << " runners are refusing to stop" << std::endl; + std::abort(); + } + +private: + void AddParticipantRunner(ParticipantRunner participantRunner) + { + auto runner = participantRunner.runner.get(); + + _participantRunners.emplace_back(std::move(participantRunner)); + _runners.emplace_back(runner); + } + + void CreateAndAddSystemControllerRunnerIfNecessary() + { + const auto requiredParticipantNames = CollectRequiredParticipantNames(); + + if (requiredParticipantNames.empty()) + { + std::cerr << "No system controller necessary, as there are no required participants" << std::endl; + return; } - std::thread thread; - std::future shutdownFuture; - }; + _systemControllerRunner = std::make_unique(*this, requiredParticipantNames); + _runners.emplace_back(_systemControllerRunner.get()); + } - void JoinParticipantThreads(std::vector& threads) + [[nodiscard]] auto CollectRequiredParticipantNames() const -> std::vector { - for (auto&& thread : threads) + std::vector requiredParticipantNames; + + for (const auto& runner : _participantRunners) { - if (thread.shutdownFuture.valid()) + if (runner.coordinationMode == CoordinationMode::Required) { - auto&& status = thread.shutdownFuture.wait_for(TEST_TIMEOUT); - ASSERT_EQ(status, std::future_status::ready); // signal failure - thread.shutdownFuture.get(); + requiredParticipantNames.push_back(runner.name); } } - threads.clear(); + + return requiredParticipantNames; } - void SetUp() override +private: // IOrchestratorHandle + auto CreateParticipant(std::string_view name) -> std::unique_ptr override { - globalParticipantIndex = 0; + return MakeParticipant(std::string{name}, _registryUri); } -protected: - std::vector requiredParticipantNames{}; - std::unique_ptr registry; + void NotifyReceived(std::string_view name, SK::ByteSpan message) override + { + auto work = [this, name = std::string{name}, message = ToStdVector(message)]() mutable { + return HandleNotifyReceived(name, std::move(message)); + }; - ParticipantThread participantThread_SystemController; + _workQueue.Push(std::move(work)); + } - std::vector _participantThreads_Sync_Invalid; - std::vector participantThreads_Sync_Autonomous; - std::vector _participantThreads_Sync_Coordinated; + void NotifySystemStateChanged(std::string_view name, SK::SystemState systemState) override + { + auto work = [this, name = std::string{name}, systemState] { + return HandleNotifySystemStateChanged(name, systemState); + }; - std::vector _participantThreads_Async_Invalid; - std::vector _participantThreads_Async_Autonomous; - std::vector _participantThreads_Async_Coordinated; + _workQueue.Push(std::move(work)); + } - std::chrono::seconds _simtimeToPass{3s}; + void NotifyParticipantStateChanged(std::string_view name, SK::ParticipantState participantState) override + { + auto work = [this, name = std::string{name}, participantState] { + return HandleNotifyParticipantStateChanged(name, participantState); + }; - const bool verbose = true; - const bool logging = false; - const Services::Logging::Level logLevel = Services::Logging::Level::Trace; + _workQueue.Push(std::move(work)); + } - std::string _registryUri{"not yet defined"}; + void NotifyMainLoopComplete(std::string_view name) override + { + auto work = [this, name = std::string{name}] { return HandleNotifyMainLoopComplete(name); }; -}; + _workQueue.Push(std::move(work)); + } -// -------------------------- -// Disallowed modes + void TriggerHalt() override + { + _workQueue.Push([this] { return HandleTriggerHalt(); }); + } -// Time Lifecycle Required -// --------------------------------- -// Async Coordinated NonReq -> Disallowed: Coordinated must be required -// Sync Coordinated NonReq -> Disallowed: Coordinated must be required -// Sync Invalid Req/NonReq -> Disallowed + void TriggerAbort() override + { + _workQueue.Push([this] { return HandleTriggerAbort(); }); + } -TEST_F(ITest_Internals_ParticipantModes, test_AsyncCoordinatedNonReq_disallowed) -{ - RunRegistry(); +private: + void HandleNotifyReceived(const std::string& name, std::vector message) + { + std::string other(message.begin(), message.end()); - // Participants - std::list testParticipants; - testParticipants.push_back({"AsyncCoordinated1", TimeMode::Async, OperationMode::Coordinated}); + _received[name].insert(other); - // Workflow configuration without "AsyncCoordinated1" - RunSystemController({"NoSuchParticipant"}); + _runResult.communicatedWithEveryone = std::all_of( + _received.begin(), _received.end(), [expectedSize = _participantRunners.size()](const auto& item) { + const auto& [_, senders] = item; + return senders.size() == expectedSize; + }); - RunParticipants(testParticipants); + if (_runResult.communicatedWithEveryone) + { + std::cerr + << "Halting because each participant has received at least one message from every other participant" + << std::endl; + TriggerHalt(); + } + } - // Await error state - for (auto& p : testParticipants) - p.AwaitErrorState(); + void HandleNotifyParticipantStateChanged(const std::string& name, SK::ParticipantState participantState) + { + std::cerr << std::quoted(name) << " reported participant state " << participantState << std::endl; - // Stop to exit the async task - for (auto& p : testParticipants) - p.Stop(); + switch (participantState) + { + case SK::ParticipantState::Error: + _runResult.seenAnyParticipantStateError = true; + TriggerAbort(); + break; + default: + break; + } + } - // Shutdown - JoinParticipantThreads(_participantThreads_Async_Coordinated); - AbortSystemController(); - StopRegistry(); -} + void HandleNotifySystemStateChanged(const std::string& name, SK::SystemState systemState) + { + std::cerr << std::quoted(name) << " reported system state " << systemState << std::endl; -TEST_F(ITest_Internals_ParticipantModes, test_SyncCoordinatedNonReq_disallowed) -{ - RunRegistry(); + switch (systemState) + { + case SK::SystemState::Error: + _runResult.enteredSystemStateError = true; + TriggerAbort(); + break; + case SK::SystemState::Shutdown: + TriggerHalt(); + break; + default: + break; + } + } - // Participants - std::list testParticipants; - testParticipants.push_back({"SyncCoordinated1", TimeMode::Sync, OperationMode::Coordinated}); + void HandleNotifyMainLoopComplete(const std::string& name) + { + _mainLoopCompleted.emplace(name); - // Workflow configuration without "AsyncCoordinated1" - RunSystemController({"NoSuchParticipant"}); + if (_mainLoopCompleted.size() == _participantRunners.size()) + { + std::cerr << "All runners have completed their main loop, shutting down work queue" << std::endl; + _workQueue.ShutDown(); + } + } - RunParticipants(testParticipants); + void HandleTriggerHalt() const + { + std::cerr << "DoHalt()" << std::endl; - // Await error state - for (auto& p : testParticipants) - p.AwaitErrorState(); + for (const auto& runner : _runners) + { + runner->TriggerHalt(); + } + } - // Stop to exit the async task - for (auto& p : testParticipants) - p.Stop(); + void HandleTriggerAbort() + { + std::cerr << "DoAbort()" << std::endl; - // Shutdown - JoinParticipantThreads(_participantThreads_Sync_Coordinated); - AbortSystemController(); - StopRegistry(); -} + _runResult.aborted = true; -TEST_F(ITest_Internals_ParticipantModes, test_SyncInvalid_disallowed) -{ - RunRegistry(); + for (const auto& runner : _runners) + { + runner->TriggerAbort(); + } + + _workQueue.ShutDown(); + } - // Participants - std::list testParticipants; - testParticipants.push_back({"SyncInvalid1", TimeMode::Sync, OperationMode::Invalid}); - testParticipants.front().allowInvalidLifeCycleOperationMode = true; +private: + static auto MakeRegistry() -> std::unique_ptr + { + auto config = SK::ParticipantConfigurationFromString(PARTICIPANT_CONFIGURATION); + return SK::CreateSilKitRegistry(config); + } - RunParticipants(testParticipants); - // Cannot create lifecycle service with OperationMode::Invalid - EXPECT_THROW(JoinParticipantThreads(_participantThreads_Sync_Invalid), SilKit::ConfigurationError); + static auto MakeParticipant(const std::string& name, + const std::string& registryUri) -> std::unique_ptr + { + const auto config = SK::ParticipantConfigurationFromString(PARTICIPANT_CONFIGURATION); + return SK::CreateParticipant(config, name, registryUri); + } - StopRegistry(); -} +private: + std::unique_ptr _registry; + std::string _registryUri; + std::vector _participantRunners; + std::unique_ptr _systemControllerRunner; -// -------------------------- -// Single participant flavors + std::vector _runners; + std::vector> _runnerFutures; -// Time Lifecycle Required -// --------------------------------- -// Async Invalid NonReq -// Async Autonomous NonReq -// Async Autonomous Req -// Async Coordinated NonReq -> Disallowed -// Async Coordinated Req + std::unordered_map> _received; + std::unordered_set _mainLoopCompleted; -// Sync Invalid Req/NonReq -> Disallowed -// Sync Autonomous NonReq -// Sync Autonomous Req -// Sync Coordinated Req -// Sync Coordinated NonReq -> Disallowed + RunResult _runResult; -TEST_F(ITest_Internals_ParticipantModes, test_AsyncInvalidNonReq) -{ - RunRegistry(); - - // Participants - std::list testParticipants; - testParticipants.push_back({"AsyncInvalid1", TimeMode::Async, OperationMode::Invalid}); - testParticipants.push_back({"AsyncInvalid2", TimeMode::Async, OperationMode::Invalid}); - testParticipants.push_back({"AsyncInvalid3", TimeMode::Async, OperationMode::Invalid}); - testParticipants.push_back({"AsyncInvalid4", TimeMode::Async, OperationMode::Invalid}); - expectedReceptions = testParticipants.size(); - - // Start - RunParticipants(testParticipants); - - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); - - // Async: Stop task - for (auto& p : testParticipants) - p.Stop(); - - // Shutdown - JoinParticipantThreads(_participantThreads_Async_Invalid); - StopRegistry(); -} + WorkQueue _workQueue; +}; + +// ===================================================================================================================== -TEST_F(ITest_Internals_ParticipantModes, test_AsyncAutonomousNonReq) +TEST(ITest_ParticipantModes, ValidCombinationsCommunicateAndStopCleanly) { - RunRegistry(); + std::vector> combinations; - // Participants - std::list testParticipants; - testParticipants.push_back({"AsyncAutonomous1", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"AsyncAutonomous2", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"AsyncAutonomous3", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"AsyncAutonomous4", TimeMode::Async, OperationMode::Autonomous}); - expectedReceptions = testParticipants.size(); + for (const auto a : ALL_RUNNER_TYPES) + { + for (const auto b : ALL_RUNNER_TYPES) + { + if (a == ParticipantMode::CoordinatedSynchronized && b == ParticipantMode::AutonomousSynchronized) + { + continue; + } - // Start - RunParticipants(testParticipants); + if (a == ParticipantMode::AutonomousSynchronized && b == ParticipantMode::CoordinatedSynchronized) + { + continue; + } - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + combinations.emplace_back(a, b); + } + } - // Stop - for (auto& p : testParticipants) - p.Stop(); + for (const auto& [a, b] : combinations) + { + std::cerr << "Combination: " << a << " + " << b << std::endl; - // Shutdown - JoinParticipantThreads(_participantThreads_Async_Autonomous); + Orchestrator orchestrator; - StopRegistry(); -} + orchestrator.StartRegistry(); -TEST_F(ITest_Internals_ParticipantModes, test_AsyncAutonomousReq) -{ - RunRegistry(); + orchestrator.AddAutomatic(a, "A1"); + orchestrator.AddAutomatic(a, "A2"); + orchestrator.AddAutomatic(a, "A3"); + orchestrator.AddAutomatic(a, "A4"); - // Participants - std::list testParticipants; - testParticipants.push_back({"AsyncAutonomous1", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"AsyncAutonomous2", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"AsyncAutonomous3", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"AsyncAutonomous4", TimeMode::Async, OperationMode::Autonomous}); - expectedReceptions = testParticipants.size(); + orchestrator.AddAutomatic(b, "B1"); + orchestrator.AddAutomatic(b, "B2"); + orchestrator.AddAutomatic(b, "B3"); + orchestrator.AddAutomatic(b, "B4"); - // Autonomous are required here, but have no effect since no coordinated participants are in the mix - std::vector required{}; - for (auto&& p : testParticipants) - { - required.push_back(p.name); + const auto result = orchestrator.Run(); + + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); } - RunSystemController(required); +} - // Start - RunParticipants(testParticipants); +TEST(ITest_ParticipantModes, OptionalIgnorantFreerunning) +{ + Orchestrator orchestrator; + + orchestrator.StartRegistry(); - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + orchestrator.AddOptional("O-IF-1"); + orchestrator.AddOptional("O-IF-2"); + orchestrator.AddOptional("O-IF-3"); + orchestrator.AddOptional("O-IF-4"); - // Stop - for (auto& p : testParticipants) - p.Stop(); + const auto result = orchestrator.Run(); - // Shutdown - JoinParticipantThreads(_participantThreads_Async_Autonomous); - AbortSystemController(); - StopRegistry(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); } -TEST_F(ITest_Internals_ParticipantModes, test_AsyncCoordinatedReq) +TEST(ITest_ParticipantModes, OptionalAutonomousFreerunning) { - RunRegistry(); + Orchestrator orchestrator; - // Participants - std::list testParticipants; - testParticipants.push_back({"AsyncCoordinated1", TimeMode::Async, OperationMode::Coordinated}); - testParticipants.push_back({"AsyncCoordinated2", TimeMode::Async, OperationMode::Coordinated}); - testParticipants.push_back({"AsyncCoordinated3", TimeMode::Async, OperationMode::Coordinated}); - testParticipants.push_back({"AsyncCoordinated4", TimeMode::Async, OperationMode::Coordinated}); - expectedReceptions = testParticipants.size(); + orchestrator.StartRegistry(); - // Coordinated are required - std::vector required{}; - for (auto&& p : testParticipants) - { - required.push_back(p.name); - } - RunSystemController(required); + orchestrator.AddOptional("O-AF-1"); + orchestrator.AddOptional("O-AF-2"); + orchestrator.AddOptional("O-AF-3"); + orchestrator.AddOptional("O-AF-4"); - // Start - RunParticipants(testParticipants); + const auto result = orchestrator.Run(); - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} - // Async: Stop task - for (auto& p : testParticipants) - p.Stop(); +TEST(ITest_ParticipantModes, OptionalAutonomousSynchronized) +{ + Orchestrator orchestrator; - // Stop Coordinated Lifecylce: One participant can stop all - testParticipants.front().Stop(); + orchestrator.StartRegistry(); - // Shutdown - JoinParticipantThreads(_participantThreads_Async_Coordinated); + orchestrator.AddOptional("O-AS-1"); + orchestrator.AddOptional("O-AS-2"); + orchestrator.AddOptional("O-AS-3"); + orchestrator.AddOptional("O-AS-4"); - AbortSystemController(); - StopRegistry(); -} + const auto result = orchestrator.Run(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} -TEST_F(ITest_Internals_ParticipantModes, test_SyncAutonomousNonReq) +TEST(ITest_ParticipantModes, RequiredIgnorantFreerunning) { - RunRegistry(); + Orchestrator orchestrator; - // Participants - std::list testParticipants; - testParticipants.push_back({"SyncAutonomous1", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous2", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous3", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous4", TimeMode::Sync, OperationMode::Autonomous}); - expectedReceptions = testParticipants.size(); + orchestrator.StartRegistry(); - // Start - RunParticipants(testParticipants); + orchestrator.AddRequired("R-IF-1"); + orchestrator.AddRequired("R-IF-2"); + orchestrator.AddRequired("R-IF-3"); + orchestrator.AddRequired("R-IF-4"); - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + const auto result = orchestrator.Run(); - // Stop - for (auto& p : testParticipants) - p.Stop(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} - // Shutdown - JoinParticipantThreads(participantThreads_Sync_Autonomous); +TEST(ITest_ParticipantModes, RequiredAutonomousFreerunning) +{ + Orchestrator orchestrator; - StopRegistry(); -} + orchestrator.StartRegistry(); + + orchestrator.AddRequired("R-AF-1"); + orchestrator.AddRequired("R-AF-2"); + orchestrator.AddRequired("R-AF-3"); + orchestrator.AddRequired("R-AF-4"); + + const auto result = orchestrator.Run(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} -TEST_F(ITest_Internals_ParticipantModes, test_SyncAutonomousReq) +TEST(ITest_ParticipantModes, RequiredAutonomousSynchronized) { - RunRegistry(); + Orchestrator orchestrator; - // Participants - std::list testParticipants; - testParticipants.push_back({"SyncAutonomous1", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous2", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous3", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous4", TimeMode::Sync, OperationMode::Autonomous}); - expectedReceptions = testParticipants.size(); + orchestrator.StartRegistry(); - // Autonomous are required here, but have no effect since no coordinated participants are in the mix - std::vector required{}; - for (auto&& p : testParticipants) - { - required.push_back(p.name); - } - RunSystemController(required); + orchestrator.AddRequired("R-AS-1"); + orchestrator.AddRequired("R-AS-2"); + orchestrator.AddRequired("R-AS-3"); + orchestrator.AddRequired("R-AS-4"); - // Start - RunParticipants(testParticipants); + const auto result = orchestrator.Run(); - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} - // Stop - for (auto& p : testParticipants) - p.Stop(); +TEST(ITest_ParticipantModes, RequiredCoordinatedFreerunning) +{ + Orchestrator orchestrator; - // Shutdown - JoinParticipantThreads(participantThreads_Sync_Autonomous); + orchestrator.StartRegistry(); - AbortSystemController(); - StopRegistry(); -} + orchestrator.AddRequired("R-CF-1"); + orchestrator.AddRequired("R-CF-2"); + orchestrator.AddRequired("R-CF-3"); + orchestrator.AddRequired("R-CF-4"); + const auto result = orchestrator.Run(); -TEST_F(ITest_Internals_ParticipantModes, test_SyncCoordinatedReq) + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} + +TEST(ITest_ParticipantModes, RequiredCoordinatedSynchronized) { - RunRegistry(); + Orchestrator orchestrator; - // Participants - std::list testParticipants; - testParticipants.push_back({"SyncCoordinated1", TimeMode::Sync, OperationMode::Coordinated}); - testParticipants.push_back({"SyncCoordinated2", TimeMode::Sync, OperationMode::Coordinated}); - testParticipants.push_back({"SyncCoordinated3", TimeMode::Sync, OperationMode::Coordinated}); - testParticipants.push_back({"SyncCoordinated4", TimeMode::Sync, OperationMode::Coordinated}); - expectedReceptions = testParticipants.size(); + orchestrator.StartRegistry(); - // Coordinated are required - std::vector required{}; - for (auto&& p : testParticipants) - { - required.push_back(p.name); - } - RunSystemController(required); + orchestrator.AddRequired("R-CS-1"); + orchestrator.AddRequired("R-CS-2"); + orchestrator.AddRequired("R-CS-3"); + orchestrator.AddRequired("R-CS-4"); - // Start - RunParticipants(testParticipants); + const auto result = orchestrator.Run(); - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} - // Stop Lifecylce - for (auto& p : testParticipants) - p.Stop(); +TEST(ITest_ParticipantModes, AutonomousFreerunningRequiredWorksWithCoordinatedSynchronized) +{ + Orchestrator orchestrator; - // Shutdown - JoinParticipantThreads(_participantThreads_Sync_Coordinated); + orchestrator.StartRegistry(); - AbortSystemController(); - StopRegistry(); -} + orchestrator.AddRequired("R-AF-1"); + orchestrator.AddRequired("R-AF-2"); + orchestrator.AddRequired("R-AF-3"); + orchestrator.AddRequired("R-AF-4"); -// ----------------- -// Mode combinations + orchestrator.AddRequired("R-CS-1"); + orchestrator.AddRequired("R-CS-2"); + orchestrator.AddRequired("R-CS-3"); + orchestrator.AddRequired("R-CS-4"); -TEST_F(ITest_Internals_ParticipantModes, test_AsyncAutonomousReq_with_SyncCoordinated) -{ - RunRegistry(); + const auto result = orchestrator.Run(); - // Participants - std::list testParticipants; - testParticipants.push_back({"SyncAutonomous1", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous2", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous3", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous4", TimeMode::Async, OperationMode::Autonomous}); - testParticipants.push_back({"SyncCoordinated1", TimeMode::Sync, OperationMode::Coordinated}); - testParticipants.push_back({"SyncCoordinated2", TimeMode::Sync, OperationMode::Coordinated}); - testParticipants.push_back({"SyncCoordinated3", TimeMode::Sync, OperationMode::Coordinated}); - testParticipants.push_back({"SyncCoordinated4", TimeMode::Sync, OperationMode::Coordinated}); - expectedReceptions = testParticipants.size(); + ASSERT_TRUE(result.communicatedWithEveryone); + ASSERT_FALSE(result.seenAnyParticipantStateError); + ASSERT_FALSE(result.enteredSystemStateError); + ASSERT_FALSE(result.aborted); +} - // All are required here - std::vector required{}; - for (auto&& p : testParticipants) - { - required.push_back(p.name); - } - RunSystemController(required); +TEST(ITest_ParticipantModes, OptionalCoordinatedFreerunningFails) +{ + Orchestrator orchestrator; - // Start - RunParticipants(testParticipants); + orchestrator.StartRegistry(); - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + orchestrator.AddOptional("O-CF-1"); - // Stop Lifecylce - for (auto& p : testParticipants) - p.Stop(); + // required dummy participant such that the system controller is started + orchestrator.AddRequired("R-IF-1"); - // Shutdown - JoinParticipantThreads(_participantThreads_Async_Autonomous); - JoinParticipantThreads(_participantThreads_Sync_Coordinated); + const auto result = orchestrator.Run(); - AbortSystemController(); - StopRegistry(); + ASSERT_TRUE(result.seenAnyParticipantStateError); + ASSERT_TRUE(result.aborted); } - -TEST_F(ITest_Internals_ParticipantModes, test_AsyncInvalid_with_SyncAutonomous) +TEST(ITest_ParticipantModes, OptionalCoordinatedSynchronizedFails) { - RunRegistry(); + Orchestrator orchestrator; - // Participants - std::list testParticipants; - testParticipants.push_back({"ASyncInvalid1", TimeMode::Async, OperationMode::Invalid}); - testParticipants.push_back({"ASyncInvalid2", TimeMode::Async, OperationMode::Invalid}); - testParticipants.push_back({"ASyncInvalid3", TimeMode::Async, OperationMode::Invalid}); - testParticipants.push_back({"ASyncInvalid4", TimeMode::Async, OperationMode::Invalid}); - testParticipants.push_back({"SyncAutonomous1", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous2", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous3", TimeMode::Sync, OperationMode::Autonomous}); - testParticipants.push_back({"SyncAutonomous4", TimeMode::Sync, OperationMode::Autonomous}); - expectedReceptions = testParticipants.size(); + orchestrator.StartRegistry(); - // Start - RunParticipants(testParticipants); + orchestrator.AddOptional("O-CS-1"); - // Await successful communication - for (auto& p : testParticipants) - p.AwaitCommunication(); + // required dummy participant such that the system controller is started + orchestrator.AddRequired("R-IF-1"); - // Stop - for (auto& p : testParticipants) - { - p.Stop(); - } - - // Shutdown - JoinParticipantThreads(_participantThreads_Async_Invalid); - JoinParticipantThreads(participantThreads_Sync_Autonomous); + const auto result = orchestrator.Run(); - StopRegistry(); + ASSERT_TRUE(result.seenAnyParticipantStateError); + ASSERT_TRUE(result.aborted); } -// ---------------- -// All combinations -// ---------------- - -TEST_F(ITest_Internals_ParticipantModes, test_Combinations) +TEST(ITest_ParticipantModes, CreateLifecycleServiceThrowsWhenOperationModeIsInvalid) { - RunRegistry(); + const auto participantConfiguration = SK::ParticipantConfigurationFromString(PARTICIPANT_CONFIGURATION); - std::vector timeModes = {TimeMode::Async, TimeMode::Sync}; - std::vector operationModes = {OperationMode::Invalid, OperationMode::Autonomous, - OperationMode::Coordinated}; + const auto registry = SK::CreateSilKitRegistry(participantConfiguration); + const auto registryUri = registry->StartListening("silkit://127.0.0.1:0"); - std::map timeModeNames = {{TimeMode::Async, "Async"}, {TimeMode::Sync, "Sync"}}; - std::map operationModesNames = {{OperationMode::Invalid, "Invalid"}, - {OperationMode::Autonomous, "Autonomous"}, - {OperationMode::Coordinated, "Coordinated"}}; + const auto participant = SK::CreateParticipant(participantConfiguration, "P", registryUri); - // These loops test (all but the first) combinations twice. - // This is intended as the order of which participant is started first is then tested as well. - for (auto p1_timeMode : timeModes) - { - for (auto p1_operationMode : operationModes) - { - for (auto p2_timeMode : timeModes) - { - for (auto p2_operationMode : operationModes) - { - if (verbose) - { - std::cout << "P1:" << timeModeNames[p1_timeMode] << " + " - << operationModesNames[p1_operationMode] << " P2:" << timeModeNames[p2_timeMode] - << " + " << operationModesNames[p2_operationMode]; - } - - if ((p1_timeMode == TimeMode::Sync && p1_operationMode == OperationMode::Invalid) - || (p2_timeMode == TimeMode::Sync && p2_operationMode == OperationMode::Invalid)) - { - std::cout << " -> Invalid combination (Sync+Invalid), skip" << std::endl; - continue; - } - - if ((p1_timeMode == TimeMode::Sync && p1_operationMode == OperationMode::Autonomous - && p2_timeMode == TimeMode::Sync && p2_operationMode == OperationMode::Coordinated) - || (p2_timeMode == TimeMode::Sync && p2_operationMode == OperationMode::Autonomous - && p1_timeMode == TimeMode::Sync && p1_operationMode == OperationMode::Coordinated)) - { - std::cout << " -> Invalid combination (Sync+Autonomous with Sync+Coordinated), skip" - << std::endl; - continue; - } - - // Participants - std::list testParticipants; - testParticipants.push_back({"P1A", p1_timeMode, p1_operationMode}); - testParticipants.push_back({"P1B", p1_timeMode, p1_operationMode}); - testParticipants.push_back({"P1C", p1_timeMode, p1_operationMode}); - testParticipants.push_back({"P1D", p1_timeMode, p1_operationMode}); - testParticipants.push_back({"P2A", p2_timeMode, p2_operationMode}); - testParticipants.push_back({"P2B", p2_timeMode, p2_operationMode}); - testParticipants.push_back({"P2C", p2_timeMode, p2_operationMode}); - testParticipants.push_back({"P2D", p2_timeMode, p2_operationMode}); - expectedReceptions = testParticipants.size(); - - // Required - std::vector required{}; - if (p1_operationMode == OperationMode::Coordinated) - { - required.push_back("P1A"); - required.push_back("P1B"); - required.push_back("P1C"); - required.push_back("P1D"); - } - if (p2_operationMode == OperationMode::Coordinated) - { - required.push_back("P2A"); - required.push_back("P2B"); - required.push_back("P2C"); - required.push_back("P2D"); - } - if (!required.empty()) - { - RunSystemController(required); - } - - if (verbose) - { - std::cout << " -> Run participants"; - } - - // Start - RunParticipants(testParticipants); - - if (verbose) - { - std::cout << " -> Await communication"; - } - - // Await successful communication - for (auto& p : testParticipants) - { - p.AwaitCommunication(); - } - - if (verbose) - { - std::cout << " -> Request Stop"; - } - - // Stop - for (auto& p : testParticipants) - { - p.Stop(); - } - - if (verbose) - { - std::cout << " -> Shutdown"; - } - - // Shutdown - if (!_participantThreads_Sync_Invalid.empty()) - { - JoinParticipantThreads(_participantThreads_Sync_Invalid); - } - if (!participantThreads_Sync_Autonomous.empty()) - { - JoinParticipantThreads(participantThreads_Sync_Autonomous); - } - if (!_participantThreads_Sync_Coordinated.empty()) - { - JoinParticipantThreads(_participantThreads_Sync_Coordinated); - } - if (!_participantThreads_Async_Invalid.empty()) - { - JoinParticipantThreads(_participantThreads_Async_Invalid); - } - if (!_participantThreads_Async_Autonomous.empty()) - { - JoinParticipantThreads(_participantThreads_Async_Autonomous); - } - if (!_participantThreads_Async_Coordinated.empty()) - { - JoinParticipantThreads(_participantThreads_Async_Coordinated); - } - if (!required.empty()) - { - AbortSystemController(); - } - - if (verbose) - { - std::cout << " -> Done" << std::endl; - } - } - } - } - } - StopRegistry(); + ASSERT_THROW(participant->CreateLifecycleService({SK::OperationMode::Invalid}), SilKit::ConfigurationError); } } // anonymous namespace