Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Please consider the following formatting changes to #12747 #270

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 10 additions & 53 deletions Framework/Core/include/Framework/TMessageSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
#include <mutex>
#include <cstddef>

namespace o2
{
namespace framework
namespace o2::framework
{
class FairTMessage;

Expand All @@ -51,74 +49,46 @@ class FairTMessage : public TMessage
struct TMessageSerializer {
using StreamerList = std::vector<TVirtualStreamerInfo*>;
using CompressionLevel = int;
enum class CacheStreamers { yes,
no };

static void Serialize(fair::mq::Message& msg, const TObject* input,
CacheStreamers streamers = CacheStreamers::no,
CompressionLevel compressionLevel = -1);

template <typename T>
static void Serialize(fair::mq::Message& msg, const T* input, const TClass* cl, //
CacheStreamers streamers = CacheStreamers::no, //
CompressionLevel compressionLevel = -1);

template <typename T = TObject>
static void Deserialize(const fair::mq::Message& msg, std::unique_ptr<T>& output);

static void serialize(FairTMessage& msg, const TObject* input,
CacheStreamers streamers = CacheStreamers::no,
CompressionLevel compressionLevel = -1);

template <typename T>
static void serialize(FairTMessage& msg, const T* input, //
const TClass* cl, CacheStreamers streamers = CacheStreamers::no, //
static void serialize(FairTMessage& msg, const T* input, //
const TClass* cl,
CompressionLevel compressionLevel = -1);

template <typename T = TObject>
static std::unique_ptr<T> deserialize(gsl::span<std::byte> buffer);
template <typename T = TObject>
static inline std::unique_ptr<T> deserialize(std::byte* buffer, size_t size);

// load the schema information from a message/buffer
static void loadSchema(const fair::mq::Message& msg);
static void loadSchema(gsl::span<std::byte> buffer);

// write the schema into an empty message/buffer
static void fillSchema(fair::mq::Message& msg, const StreamerList& streamers);
static void fillSchema(FairTMessage& msg, const StreamerList& streamers);

// get the streamers
static StreamerList getStreamers();

// update the streamer list with infos appropriate for this type
static void updateStreamers(const TObject* object);

private:
// update the cache of streamer infos for serialized classes
static void updateStreamers(const FairTMessage& message, StreamerList& streamers);

// for now this is a static, maybe it would be better to move the storage somewhere else?
static StreamerList sStreamers;
static std::mutex sStreamersLock;
};

inline void TMessageSerializer::serialize(FairTMessage& tm, const TObject* input,
CacheStreamers streamers,
CompressionLevel compressionLevel)
{
return serialize(tm, input, nullptr, streamers, compressionLevel);
return serialize(tm, input, nullptr, compressionLevel);
}

template <typename T>
inline void TMessageSerializer::serialize(FairTMessage& tm, const T* input, //
const TClass* cl, CacheStreamers streamers, //
CompressionLevel compressionLevel)
inline void TMessageSerializer::serialize(FairTMessage& tm, const T* input, //
const TClass* cl, CompressionLevel compressionLevel)
{
if (streamers == CacheStreamers::yes) {
tm.EnableSchemaEvolution(true);
}

if (compressionLevel >= 0) {
// if negative, skip to use ROOT default
tm.SetCompressionLevel(compressionLevel);
Expand All @@ -130,10 +100,6 @@ inline void TMessageSerializer::serialize(FairTMessage& tm, const T* input,
} else {
tm.WriteObjectAny(input, cl);
}

if (streamers == CacheStreamers::yes) {
updateStreamers(tm, sStreamers);
}
}

template <typename T>
Expand Down Expand Up @@ -172,26 +138,24 @@ inline void FairTMessage::free(void* /*data*/, void* hint)
}

inline void TMessageSerializer::Serialize(fair::mq::Message& msg, const TObject* input,
TMessageSerializer::CacheStreamers streamers,
TMessageSerializer::CompressionLevel compressionLevel)
{
std::unique_ptr<FairTMessage> tm = std::make_unique<FairTMessage>(kMESS_OBJECT);

serialize(*tm, input, input->Class(), streamers, compressionLevel);
serialize(*tm, input, input->Class(), compressionLevel);

msg.Rebuild(tm->Buffer(), tm->BufferSize(), FairTMessage::free, tm.get());
tm.release();
}

template <typename T>
inline void TMessageSerializer::Serialize(fair::mq::Message& msg, const T* input, //
const TClass* cl, //
TMessageSerializer::CacheStreamers streamers, //
inline void TMessageSerializer::Serialize(fair::mq::Message& msg, const T* input, //
const TClass* cl, //
TMessageSerializer::CompressionLevel compressionLevel)
{
std::unique_ptr<FairTMessage> tm = std::make_unique<FairTMessage>(kMESS_OBJECT);

serialize(*tm, input, cl, streamers, compressionLevel);
serialize(*tm, input, cl, compressionLevel);

msg.Rebuild(tm->Buffer(), tm->BufferSize(), FairTMessage::free, tm.get());
tm.release();
Expand All @@ -205,12 +169,6 @@ inline void TMessageSerializer::Deserialize(const fair::mq::Message& msg, std::u
output = deserialize(as_span(msg));
}

inline TMessageSerializer::StreamerList TMessageSerializer::getStreamers()
{
std::lock_guard<std::mutex> lock{TMessageSerializer::sStreamersLock};
return sStreamers;
}

// gsl::narrow is used to do a runtime narrowing check, this might be a bit paranoid,
// we would probably be fine with e.g. gsl::narrow_cast (or just a static_cast)
inline gsl::span<std::byte> as_span(const fair::mq::Message& msg)
Expand All @@ -224,6 +182,5 @@ inline gsl::span<std::byte> as_span(const FairTMessage& msg)
gsl::narrow<gsl::span<std::byte>::size_type>(msg.BufferSize())};
}

} // namespace framework
} // namespace o2
#endif // FRAMEWORK_TMESSAGESERIALIZER_H
89 changes: 0 additions & 89 deletions Framework/Core/src/TMessageSerializer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,92 +13,3 @@
#include <memory>

using namespace o2::framework;

TMessageSerializer::StreamerList TMessageSerializer::sStreamers{};
std::mutex TMessageSerializer::sStreamersLock{};

void TMessageSerializer::loadSchema(gsl::span<std::byte> buffer)
{
std::unique_ptr<TObject> obj = deserialize(buffer);

TObjArray* pSchemas = dynamic_cast<TObjArray*>(obj.get());
if (!pSchemas) {
return;
}

// TODO: this is a bit of a problem in general: non-owning ROOT containers should become
// owners at deserialize, otherwise there is a leak. Switch to a better container.
pSchemas->SetOwner(kTRUE);

for (int i = 0; i < pSchemas->GetEntriesFast(); i++) {
TStreamerInfo* pSchema = dynamic_cast<TStreamerInfo*>(pSchemas->At(i));
if (!pSchema) {
continue;
}
int version = pSchema->GetClassVersion();
TClass* pClass = TClass::GetClass(pSchema->GetName());
if (!pClass) {
continue;
}
if (pClass->GetClassVersion() == version) {
continue;
}
TObjArray* pInfos = const_cast<TObjArray*>(pClass->GetStreamerInfos());
if (!pInfos) {
continue;
}
TVirtualStreamerInfo* pInfo = dynamic_cast<TVirtualStreamerInfo*>(pInfos->At(version));
if (pInfo) {
continue;
}
pSchema->SetClass(pClass);
pSchema->BuildOld();
pInfos->AddAtAndExpand(pSchema, version);
pSchemas->Remove(pSchema);
}
}

void TMessageSerializer::fillSchema(FairTMessage& msg, const StreamerList& streamers)
{
// TODO: this is a bit of a problem in general: non-owning ROOT containers should become
// owners at deserialize, otherwise there is a leak. Switch to a better container.
TObjArray infoArray{};
for (const auto& info : streamers) {
infoArray.Add(info);
}
serialize(msg, &infoArray);
}

void TMessageSerializer::loadSchema(const fair::mq::Message& msg) { loadSchema(as_span(msg)); }
void TMessageSerializer::fillSchema(fair::mq::Message& msg, const StreamerList& streamers)
{
// TODO: this is a bit of a problem in general: non-owning ROOT containers should become
// owners at deserialize, otherwise there is a leak. Switch to a better container.
TObjArray infoArray{};
for (const auto& info : streamers) {
infoArray.Add(info);
}
Serialize(msg, &infoArray);
}

void TMessageSerializer::updateStreamers(const FairTMessage& message, StreamerList& streamers)
{
std::lock_guard<std::mutex> lock{TMessageSerializer::sStreamersLock};

TIter nextStreamer(message.GetStreamerInfos()); // unfortunately ROOT uses TList* here
// this looks like we could use std::map here.
while (TVirtualStreamerInfo* in = static_cast<TVirtualStreamerInfo*>(nextStreamer())) {
auto found = std::find_if(streamers.begin(), streamers.end(), [&](const auto& old) {
return (old->GetName() == in->GetName() && old->GetClassVersion() == in->GetClassVersion());
});
if (found == streamers.end()) {
streamers.push_back(in);
}
}
}

void TMessageSerializer::updateStreamers(const TObject* object)
{
FairTMessage msg(kMESS_OBJECT);
serialize(msg, object, CacheStreamers::yes, CompressionLevel{0});
}
Loading