Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Please consider the following formatting changes to #12529 #242

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DeviceControl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct DeviceControl {
DeviceController* controller = nullptr;
/// What kind of events should run with the TRACE level
int tracingFlags = 0;
/// What kind of log streams should be enabled
int logStreams = 0;
/// An incremental number to identify the device state
int requestedState = 0;
};
Expand Down
9 changes: 9 additions & 0 deletions Framework/Core/include/Framework/DeviceState.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ struct DeviceState {
DATA_CONNECTED = 1 << 19, // Data channel connected
};

enum LogStreams : int {
NO_LOG = 0,
DEVICE_LOG = 1 << 0, // Log for Data Processing Device activities.
COMPLETION_LOG = 1 << 1, // Log for the completion policy of the device.
MONITORING_SERVICE_LOG = 1 << 2, // Log for the monitoring service flushing.
};

std::vector<InputChannelInfo> inputChannelInfos;
StreamingState streaming = StreamingState::Streaming;
bool quitRequested = false;
Expand Down Expand Up @@ -93,6 +100,8 @@ struct DeviceState {
int loopReason = 0;
/// Bitmask of LoopReason to trace
int tracingFlags = 0;
/// Bitmask of log streams which are available
int logStreams = 0;
/// Stack of the severity, so that we can display only
/// the bits we are interested in.
std::vector<int> severityStack;
Expand Down
10 changes: 10 additions & 0 deletions Framework/Core/src/CompletionPolicyHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
#include "Framework/TimesliceIndex.h"
#include "Framework/TimingInfo.h"
#include "DecongestionService.h"
#include "Framework/Signpost.h"

#include <cassert>
#include <regex>

O2_DECLARE_DYNAMIC_LOG(completion);

namespace o2::framework
{

Expand Down Expand Up @@ -108,6 +111,8 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
{
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
assert(inputs.size() == specs.size());
O2_SIGNPOST_ID_GENERATE(sid, completion);
O2_SIGNPOST_START(completion, sid, "consumeWhenAll", "Completion policy invoked");

size_t si = 0;
bool missingSporadic = false;
Expand All @@ -116,15 +121,18 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
assert(si < specs.size());
auto& spec = specs[si++];
if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s due to missing input %lu", "Wait", si);
return CompletionPolicy::CompletionOp::Wait;
}
if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "Missing sporadic found for route index %lu", si);
missingSporadic = true;
}
if (input.header != nullptr && currentTimeslice == -1) {
auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
currentTimeslice = dph->startTime;
O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "currentTimeslice %lu from route index %lu", currentTimeslice, si);
}
}
}
Expand All @@ -134,8 +142,10 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;

if (missingSporadic && currentTimeslice >= oldestPossibleTimeslice) {
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu", "Retry", currentTimeslice, oldestPossibleTimeslice);
return CompletionPolicy::CompletionOp::Retry;
}
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu <= oldestPossibleTimeslice %lu", "Consume", currentTimeslice, oldestPossibleTimeslice);
return CompletionPolicy::CompletionOp::Consume;
};
return CompletionPolicy{name, matcher, callback};
Expand Down
40 changes: 26 additions & 14 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
#include "Framework/TMessageSerializer.h"
#include "Framework/InputRecord.h"
#include "Framework/InputSpan.h"
#if defined(__APPLE__) || defined(NDEBUG)
#define O2_SIGNPOST_IMPLEMENTATION
#endif
#include "Framework/Signpost.h"
#include "Framework/TimingHelpers.h"
#include "Framework/SourceInfoHeader.h"
Expand Down Expand Up @@ -80,6 +83,8 @@
#include <sstream>
#include <boost/property_tree/json_parser.hpp>

O2_DECLARE_DYNAMIC_LOG(device);

using namespace o2::framework;
using ConfigurationInterface = o2::configuration::ConfigurationInterface;
using DataHeader = o2::header::DataHeader;
Expand Down Expand Up @@ -274,21 +279,22 @@ struct PollerContext {
void on_socket_polled(uv_poll_t* poller, int status, int events)
{
auto* context = (PollerContext*)poller->data;
assert(context);
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
switch (events) {
case UV_READABLE: {
ZoneScopedN("socket readable event");
LOG(debug) << "socket polled UV_READABLE: " << context->name;
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
context->state->loopReason |= DeviceState::DATA_INCOMING;
} break;
case UV_WRITABLE: {
ZoneScopedN("socket writeable");
O2_SIGNPOST_END(device, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
if (context->read) {
LOG(debug) << "socket polled UV_CONNECT" << context->name;
O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
context->state->loopReason |= DeviceState::DATA_CONNECTED;
} else {
LOG(debug) << "socket polled UV_WRITABLE" << context->name;
O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
context->state->loopReason |= DeviceState::DATA_OUTGOING;
// If the socket is writable, fairmq will handle the rest, so we can stop polling and
// just wait for the disconnect.
Expand All @@ -297,12 +303,10 @@ void on_socket_polled(uv_poll_t* poller, int status, int events)
context->pollerState = PollerContext::PollerState::Connected;
} break;
case UV_DISCONNECT: {
ZoneScopedN("socket disconnect");
LOG(debug) << "socket polled UV_DISCONNECT";
O2_SIGNPOST_END(device, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
} break;
case UV_PRIORITIZED: {
ZoneScopedN("socket prioritized");
LOG(debug) << "socket polled UV_PRIORITIZED";
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for context %{public}s", context->name);
} break;
}
// We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
Expand Down Expand Up @@ -873,15 +877,19 @@ void DataProcessingDevice::startPollers()
auto& deviceContext = ref.get<DeviceContext>();
auto& state = ref.get<DeviceState>();

for (auto& poller : state.activeInputPollers) {
for (auto* poller : state.activeInputPollers) {
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
}
for (auto& poller : state.activeOutOfBandPollers) {
uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
}
for (auto& poller : state.activeOutputPollers) {
for (auto* poller : state.activeOutputPollers) {
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
}
Expand All @@ -897,17 +905,21 @@ void DataProcessingDevice::stopPollers()
auto& deviceContext = ref.get<DeviceContext>();
auto& state = ref.get<DeviceState>();
LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
for (auto& poller : state.activeInputPollers) {
for (auto* poller : state.activeInputPollers) {
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
uv_poll_stop(poller);
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
}
LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
for (auto& poller : state.activeOutOfBandPollers) {
for (auto* poller : state.activeOutOfBandPollers) {
uv_poll_stop(poller);
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
}
LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
for (auto& poller : state.activeOutputPollers) {
for (auto* poller : state.activeOutputPollers) {
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
uv_poll_stop(poller);
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
}
Expand Down
40 changes: 40 additions & 0 deletions Framework/Core/src/WSDriverClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
#include "Framework/DeviceSpec.h"
#include "DriverClientContext.h"
#include "DPLWebSocket.h"
#include "Framework/Signpost.h"
#include <uv.h>
#include <string_view>
#include <charconv>

O2_DECLARE_DYNAMIC_LOG(device);
O2_DECLARE_DYNAMIC_LOG(completion);
O2_DECLARE_DYNAMIC_LOG(monitoring_service);

namespace o2::framework
{

Expand Down Expand Up @@ -152,6 +157,41 @@ void on_connect(uv_connect_t* connection, int status)
state.tracingFlags = tracingFlags;
});

client->observe("/log-streams", [ref = context->ref](std::string_view cmd) {
auto& state = ref.get<DeviceState>();
static constexpr int prefixSize = std::string_view{"/log-streams "}.size();
if (prefixSize > cmd.size()) {
LOG(error) << "Malformed log-streams request";
return;
}
cmd.remove_prefix(prefixSize);
int logStreams = 0;

auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams);
if (error.ec != std::errc()) {
LOG(error) << "Malformed log-streams mask";
return;
}
LOGP(info, "Logstreams flags set to {}", logStreams);
state.logStreams = logStreams;
if ((state.logStreams & DeviceState::LogStreams::DEVICE_LOG) != 0) {
O2_LOG_ENABLE(device);
} else {
O2_LOG_DISABLE(device);
}
if ((state.logStreams & DeviceState::LogStreams::COMPLETION_LOG) != 0) {
O2_LOG_ENABLE(completion);
} else {
O2_LOG_DISABLE(completion);
}

if ((state.logStreams & DeviceState::LogStreams::MONITORING_SERVICE_LOG) != 0) {
O2_LOG_ENABLE(monitoring_service);
} else {
O2_LOG_DISABLE(monitoring_service);
}
});

// Client will be filled in the line after. I can probably have a single
// client per device.
auto dplClient = std::make_unique<WSDPLClient>();
Expand Down
Loading