diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 4d0cac7087611..c9a01d07449e4 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -25,6 +25,7 @@ #include "Framework/DispatchPolicy.h" #include "Framework/DispatchControl.h" #include "Framework/DanglingContext.h" +#include "Framework/DriverInfo.h" #include "Framework/DomainInfoHeader.h" #include "Framework/DriverClient.h" #include "Framework/EndOfStreamContext.h" @@ -160,16 +161,60 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi std::function stateWatcher = [this, ®istry = mServiceRegistry](const fair::mq::State state) -> void { auto ref = ServiceRegistryRef{registry, ServiceRegistry::globalDeviceSalt()}; + auto controlKind = this->GetConfig()->GetPropertyAsString("control"); auto& deviceState = ref.get(); auto& control = ref.get(); auto& callbacks = ref.get(); - control.notifyDeviceState(fair::mq::GetStateName(state)); + auto stateName = fair::mq::GetStateName(state); + control.notifyDeviceState(stateName); callbacks.call(ServiceRegistryRef{ref}, (int)state); + LOG(detail) << "In state watcher callback " << stateName; + + // If the termination policy is not to wait, we simply ignore all + // user imposed state changes and keep running until we are done. + if (controlKind != "gui") { + return; + } + + static bool runningOnce = false; if (deviceState.nextFairMQState.empty() == false) { + LOG(detail) << "State change requested, changing state to " << deviceState.nextFairMQState.back(); auto state = deviceState.nextFairMQState.back(); - (void)this->ChangeState(state); + bool changed = this->ChangeState(state); + if (!changed) { + LOG(error) << "Failed to change state to " << state; + } deviceState.nextFairMQState.pop_back(); + } else if (state == fair::mq::State::Running && deviceState.nextFairMQState.empty()) { + LOGP(detail, "Device is running and no transition expected. We are done."); + deviceState.transitionHandling = TransitionHandlingState::NoTransition; + } else { + while (runningOnce && deviceState.nextFairMQState.empty() && this->NewStatePending() == false) { + LOG(detail) << "No state change requested, waiting for next state change " << this->NewStatePending(); + if (stateName == "EXITING") { + // Send ctrl c to ourselves. To bad FairMQ does not seem to exit when + // reaching the EXITING state. + kill(getpid(), SIGTERM); + return; + } + uv_run(deviceState.loop, UV_RUN_ONCE); + LOG(detail) << "Woke up from event loop"; + } + if (runningOnce && deviceState.nextFairMQState.empty() == false) { + LOG(detail) << "State change requested, changing state to " << deviceState.nextFairMQState.back(); + auto state = deviceState.nextFairMQState.back(); + bool changed = this->ChangeState(state); + if (!changed) { + LOG(error) << "Failed to change state to " << state; + } + deviceState.nextFairMQState.pop_back(); + } + LOG(detail) << "Exiting callback for state " << state; + } + if (runningOnce == false && state == fair::mq::State::Running) { + LOG(detail) << "First iteration, next time we start the event loop"; + runningOnce = true; } }; diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index d4ed77b9a004e..91f011e61cda3 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -15,6 +15,8 @@ #include "Framework/ServiceRegistry.h" #include "Framework/DeviceSpec.h" #include "DriverClientContext.h" +#include "Framework/RawDeviceService.h" +#include "Device.h" #include "DPLWebSocket.h" #include #include @@ -134,6 +136,28 @@ void on_connect(uv_connect_t* connection, int status) state.nextFairMQState.emplace_back("STOP"); }); + client->observe("/shutdown", [ref = context->ref](std::string_view) { + auto currentStateName = ref.get().device()->GetCurrentStateName(); + LOGP(info, "Received shutdown request while in {}", currentStateName); + + auto& state = ref.get(); + state.nextFairMQState.emplace_back("END"); + if (currentStateName == "IDLE") { + return; + } + state.nextFairMQState.emplace_back("AUTO"); + state.nextFairMQState.emplace_back("RESET DEVICE"); + if (currentStateName == "DEVICE READY") { + return; + } + state.nextFairMQState.emplace_back("AUTO"); + state.nextFairMQState.emplace_back("RESET TASK"); + if (currentStateName == "READY") { + return; + } + state.nextFairMQState.emplace_back("STOP"); + }); + client->observe("/trace", [ref = context->ref](std::string_view cmd) { auto& state = ref.get(); static constexpr int prefixSize = std::string_view{"/trace "}.size(); diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index f31629d90416b..f89b5f95693a6 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -29,6 +29,7 @@ #include "Framework/DeviceMetricsInfo.h" #include "Framework/DeviceMetricsHelper.h" #include "Framework/DeviceConfigInfo.h" +#include "Framework/DeviceController.h" #include "Framework/DeviceSpec.h" #include "Framework/DeviceState.h" #include "Framework/DeviceConfig.h" @@ -2068,7 +2069,17 @@ int runStateMachine(DataProcessorSpecs const& workflow, // We send SIGCONT to make sure stopped children are resumed killChildren(infos, SIGCONT); // We send SIGTERM to make sure we do the STOP transition in FairMQ - killChildren(infos, SIGTERM); + if (driverInfo.processingPolicies.termination == TerminationPolicy::WAIT) { + for (size_t di = 0; di < infos.size(); ++di) { + auto& info = infos[di]; + auto& control = controls[di]; + if (info.active == true) { + control.controller->write("/shutdown", strlen("/shutdown")); + } + } + } else { + killChildren(infos, SIGTERM); + } // We have a timer to send SIGUSR1 to make sure we advance all devices // in a timely manner. force_step_timer.data = &infos; diff --git a/Framework/GUISupport/src/FrameworkGUIDebugger.cxx b/Framework/GUISupport/src/FrameworkGUIDebugger.cxx index 375afef1007f9..cc5180093ec94 100644 --- a/Framework/GUISupport/src/FrameworkGUIDebugger.cxx +++ b/Framework/GUISupport/src/FrameworkGUIDebugger.cxx @@ -1185,7 +1185,7 @@ std::function getGUIDebugger(std::vector const& infos, style.Colors[ImGuiCol_WindowBg] = ImVec4(0x1b / 255.f, 0x1b / 255.f, 0x1b / 255.f, 1.00f); style.Colors[ImGuiCol_ScrollbarBg] = ImVec4(0x1b / 255.f, 0x1b / 255.f, 0x1b / 255.f, 1.00f); - showTopologyNodeGraph(guiState, infos, devices, allStates, metadata, controls, metricsInfos); + showTopologyNodeGraph(guiState, infos, devices, allStates, metadata, controls, metricsInfos, driverInfo.processingPolicies.termination); AllMetricsStore metricsStore; diff --git a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx index a82753eb5af1f..9febb93858cb8 100644 --- a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx +++ b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx @@ -11,6 +11,7 @@ #include "FrameworkGUIDeviceInspector.h" #include "Framework/DataProcessorInfo.h" +#include "Framework/ProcessingPolicies.h" #include "Framework/DeviceControl.h" #include "Framework/DeviceSpec.h" @@ -251,7 +252,8 @@ void displayDeviceInspector(DeviceSpec const& spec, DataProcessingStates const& states, DeviceMetricsInfo const& metrics, DataProcessorInfo const& metadata, - DeviceControl& control) + DeviceControl& control, + TerminationPolicy terminationPolicy) { ImGui::Text("Name: %s", spec.name.c_str()); ImGui::Text("Executable: %s", metadata.executable.c_str()); @@ -340,12 +342,33 @@ void displayDeviceInspector(DeviceSpec const& spec, } if (control.requestedState > info.providedState) { - ImGui::Text(ICON_FA_CLOCK_O); + ImGui::TextUnformatted(ICON_FA_CLOCK_O "Requested transition in progress"); } else { + // We only allow navigation if the termination policy is "WAIT" + ImGui::BeginDisabled(terminationPolicy == TerminationPolicy::QUIT); if (ImGui::Button("Restart")) { control.requestedState = info.providedState + 1; control.controller->write("/restart", strlen("/restart")); } + if (info.deviceState == "RUNNING") { + ImGui::SameLine(); + if (ImGui::Button(ICON_FA_STOP)) { + control.requestedState = info.providedState + 1; + control.controller->write("/stop", strlen("/stop")); + } + } else if (info.deviceState == "READY") { + ImGui::SameLine(); + if (ImGui::Button(ICON_FA_PLAY)) { + control.requestedState = info.providedState + 1; + control.controller->write("/start", strlen("/start")); + } + ImGui::SameLine(); + if (ImGui::Button(ICON_FA_POWER_OFF)) { + control.requestedState = info.providedState + 1; + control.controller->write("/shutdown", strlen("/shutdown")); + } + } + ImGui::EndDisabled(); } } diff --git a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.h b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.h index 8dac3204ea3ab..b69c6e9286215 100644 --- a/Framework/GUISupport/src/FrameworkGUIDeviceInspector.h +++ b/Framework/GUISupport/src/FrameworkGUIDeviceInspector.h @@ -18,12 +18,15 @@ struct DeviceInfo; struct DeviceMetricsInfo; struct DataProcessorInfo; struct DataProcessingStates; +enum struct TerminationPolicy; namespace gui { /// Helper to display information about a device -void displayDeviceInspector(DeviceSpec const& spec, DeviceInfo const& info, DataProcessingStates const& states, DeviceMetricsInfo const& metrics, DataProcessorInfo const& metadata, DeviceControl& control); +void displayDeviceInspector(DeviceSpec const& spec, DeviceInfo const& info, DataProcessingStates const& states, + DeviceMetricsInfo const& metrics, DataProcessorInfo const& metadata, DeviceControl& control, + TerminationPolicy TerminationPolicy); } // namespace gui } // namespace o2::framework diff --git a/Framework/GUISupport/src/FrameworkGUIDevicesGraph.cxx b/Framework/GUISupport/src/FrameworkGUIDevicesGraph.cxx index 0b6ca07a9c86d..639122153e599 100644 --- a/Framework/GUISupport/src/FrameworkGUIDevicesGraph.cxx +++ b/Framework/GUISupport/src/FrameworkGUIDevicesGraph.cxx @@ -365,7 +365,8 @@ void showTopologyNodeGraph(WorkspaceGUIState& state, std::vector const& allStates, std::vector const& metadata, std::vector& controls, - std::vector const& metricsInfos) + std::vector const& metricsInfos, + enum TerminationPolicy terminationPolicy) { ImGui::SetNextWindowPos(ImVec2(0, 0), 0); if (state.bottomPaneVisible) { @@ -893,7 +894,7 @@ void showTopologyNodeGraph(WorkspaceGUIState& state, auto& metadatum = metadata[group.metadataId]; if (state.rightPaneVisible) { - gui::displayDeviceInspector(spec, info, states, metrics, metadatum, control); + gui::displayDeviceInspector(spec, info, states, metrics, metadatum, control, terminationPolicy); } } else { ImGui::TextWrapped("Select a node in the topology to display information about it"); diff --git a/Framework/GUISupport/src/FrameworkGUIDevicesGraph.h b/Framework/GUISupport/src/FrameworkGUIDevicesGraph.h index 7950f1893ebba..d53cdf26dd5a8 100644 --- a/Framework/GUISupport/src/FrameworkGUIDevicesGraph.h +++ b/Framework/GUISupport/src/FrameworkGUIDevicesGraph.h @@ -19,6 +19,11 @@ #include +namespace o2::framework +{ +enum struct TerminationPolicy; +} // namespace o2::framework + namespace o2::framework::gui { @@ -30,7 +35,8 @@ void showTopologyNodeGraph(WorkspaceGUIState& state, std::vector const& allStates, std::vector const& metadata, std::vector& controls, - std::vector const& metricsInfos); + std::vector const& metricsInfos, + TerminationPolicy terminationPolicy); } // namespace o2::framework::gui