diff --git a/Framework/Core/include/Framework/TMessageSerializer.h b/Framework/Core/include/Framework/TMessageSerializer.h index b457f0ffc5f99..05efeec297d82 100644 --- a/Framework/Core/include/Framework/TMessageSerializer.h +++ b/Framework/Core/include/Framework/TMessageSerializer.h @@ -26,9 +26,7 @@ #include #include -namespace o2 -{ -namespace framework +namespace o2::framework { class FairTMessage; @@ -51,28 +49,23 @@ class FairTMessage : public TMessage struct TMessageSerializer { using StreamerList = std::vector; using CompressionLevel = int; - enum class CacheStreamers { yes, - no }; static void Serialize(fair::mq::Message& msg, const TObject* input, - CacheStreamers streamers = CacheStreamers::no, CompressionLevel compressionLevel = -1); template static void Serialize(fair::mq::Message& msg, const T* input, const TClass* cl, // - CacheStreamers streamers = CacheStreamers::no, // CompressionLevel compressionLevel = -1); template static void Deserialize(const fair::mq::Message& msg, std::unique_ptr& output); static void serialize(FairTMessage& msg, const TObject* input, - CacheStreamers streamers = CacheStreamers::no, CompressionLevel compressionLevel = -1); template - static void serialize(FairTMessage& msg, const T* input, // - const TClass* cl, CacheStreamers streamers = CacheStreamers::no, // + static void serialize(FairTMessage& msg, const T* input, // + const TClass* cl, CompressionLevel compressionLevel = -1); template @@ -80,45 +73,22 @@ struct TMessageSerializer { template static inline std::unique_ptr deserialize(std::byte* buffer, size_t size); - // load the schema information from a message/buffer - static void loadSchema(const fair::mq::Message& msg); - static void loadSchema(gsl::span buffer); - - // write the schema into an empty message/buffer - static void fillSchema(fair::mq::Message& msg, const StreamerList& streamers); - static void fillSchema(FairTMessage& msg, const StreamerList& streamers); - // get the streamers static StreamerList getStreamers(); - // update the streamer list with infos appropriate for this type - static void updateStreamers(const TObject* object); - private: - // update the cache of streamer infos for serialized classes - static void updateStreamers(const FairTMessage& message, StreamerList& streamers); - - // for now this is a static, maybe it would be better to move the storage somewhere else? - static StreamerList sStreamers; - static std::mutex sStreamersLock; }; inline void TMessageSerializer::serialize(FairTMessage& tm, const TObject* input, - CacheStreamers streamers, CompressionLevel compressionLevel) { - return serialize(tm, input, nullptr, streamers, compressionLevel); + return serialize(tm, input, nullptr, compressionLevel); } template -inline void TMessageSerializer::serialize(FairTMessage& tm, const T* input, // - const TClass* cl, CacheStreamers streamers, // - CompressionLevel compressionLevel) +inline void TMessageSerializer::serialize(FairTMessage& tm, const T* input, // + const TClass* cl, CompressionLevel compressionLevel) { - if (streamers == CacheStreamers::yes) { - tm.EnableSchemaEvolution(true); - } - if (compressionLevel >= 0) { // if negative, skip to use ROOT default tm.SetCompressionLevel(compressionLevel); @@ -130,10 +100,6 @@ inline void TMessageSerializer::serialize(FairTMessage& tm, const T* input, } else { tm.WriteObjectAny(input, cl); } - - if (streamers == CacheStreamers::yes) { - updateStreamers(tm, sStreamers); - } } template @@ -172,26 +138,24 @@ inline void FairTMessage::free(void* /*data*/, void* hint) } inline void TMessageSerializer::Serialize(fair::mq::Message& msg, const TObject* input, - TMessageSerializer::CacheStreamers streamers, TMessageSerializer::CompressionLevel compressionLevel) { std::unique_ptr tm = std::make_unique(kMESS_OBJECT); - serialize(*tm, input, input->Class(), streamers, compressionLevel); + serialize(*tm, input, input->Class(), compressionLevel); msg.Rebuild(tm->Buffer(), tm->BufferSize(), FairTMessage::free, tm.get()); tm.release(); } template -inline void TMessageSerializer::Serialize(fair::mq::Message& msg, const T* input, // - const TClass* cl, // - TMessageSerializer::CacheStreamers streamers, // +inline void TMessageSerializer::Serialize(fair::mq::Message& msg, const T* input, // + const TClass* cl, // TMessageSerializer::CompressionLevel compressionLevel) { std::unique_ptr tm = std::make_unique(kMESS_OBJECT); - serialize(*tm, input, cl, streamers, compressionLevel); + serialize(*tm, input, cl, compressionLevel); msg.Rebuild(tm->Buffer(), tm->BufferSize(), FairTMessage::free, tm.get()); tm.release(); @@ -205,12 +169,6 @@ inline void TMessageSerializer::Deserialize(const fair::mq::Message& msg, std::u output = deserialize(as_span(msg)); } -inline TMessageSerializer::StreamerList TMessageSerializer::getStreamers() -{ - std::lock_guard lock{TMessageSerializer::sStreamersLock}; - return sStreamers; -} - // gsl::narrow is used to do a runtime narrowing check, this might be a bit paranoid, // we would probably be fine with e.g. gsl::narrow_cast (or just a static_cast) inline gsl::span as_span(const fair::mq::Message& msg) @@ -224,6 +182,5 @@ inline gsl::span as_span(const FairTMessage& msg) gsl::narrow::size_type>(msg.BufferSize())}; } -} // namespace framework } // namespace o2 #endif // FRAMEWORK_TMESSAGESERIALIZER_H diff --git a/Framework/Core/src/TMessageSerializer.cxx b/Framework/Core/src/TMessageSerializer.cxx index 6276cd74152cc..5388a6d716cda 100644 --- a/Framework/Core/src/TMessageSerializer.cxx +++ b/Framework/Core/src/TMessageSerializer.cxx @@ -13,92 +13,3 @@ #include using namespace o2::framework; - -TMessageSerializer::StreamerList TMessageSerializer::sStreamers{}; -std::mutex TMessageSerializer::sStreamersLock{}; - -void TMessageSerializer::loadSchema(gsl::span buffer) -{ - std::unique_ptr obj = deserialize(buffer); - - TObjArray* pSchemas = dynamic_cast(obj.get()); - if (!pSchemas) { - return; - } - - // TODO: this is a bit of a problem in general: non-owning ROOT containers should become - // owners at deserialize, otherwise there is a leak. Switch to a better container. - pSchemas->SetOwner(kTRUE); - - for (int i = 0; i < pSchemas->GetEntriesFast(); i++) { - TStreamerInfo* pSchema = dynamic_cast(pSchemas->At(i)); - if (!pSchema) { - continue; - } - int version = pSchema->GetClassVersion(); - TClass* pClass = TClass::GetClass(pSchema->GetName()); - if (!pClass) { - continue; - } - if (pClass->GetClassVersion() == version) { - continue; - } - TObjArray* pInfos = const_cast(pClass->GetStreamerInfos()); - if (!pInfos) { - continue; - } - TVirtualStreamerInfo* pInfo = dynamic_cast(pInfos->At(version)); - if (pInfo) { - continue; - } - pSchema->SetClass(pClass); - pSchema->BuildOld(); - pInfos->AddAtAndExpand(pSchema, version); - pSchemas->Remove(pSchema); - } -} - -void TMessageSerializer::fillSchema(FairTMessage& msg, const StreamerList& streamers) -{ - // TODO: this is a bit of a problem in general: non-owning ROOT containers should become - // owners at deserialize, otherwise there is a leak. Switch to a better container. - TObjArray infoArray{}; - for (const auto& info : streamers) { - infoArray.Add(info); - } - serialize(msg, &infoArray); -} - -void TMessageSerializer::loadSchema(const fair::mq::Message& msg) { loadSchema(as_span(msg)); } -void TMessageSerializer::fillSchema(fair::mq::Message& msg, const StreamerList& streamers) -{ - // TODO: this is a bit of a problem in general: non-owning ROOT containers should become - // owners at deserialize, otherwise there is a leak. Switch to a better container. - TObjArray infoArray{}; - for (const auto& info : streamers) { - infoArray.Add(info); - } - Serialize(msg, &infoArray); -} - -void TMessageSerializer::updateStreamers(const FairTMessage& message, StreamerList& streamers) -{ - std::lock_guard lock{TMessageSerializer::sStreamersLock}; - - TIter nextStreamer(message.GetStreamerInfos()); // unfortunately ROOT uses TList* here - // this looks like we could use std::map here. - while (TVirtualStreamerInfo* in = static_cast(nextStreamer())) { - auto found = std::find_if(streamers.begin(), streamers.end(), [&](const auto& old) { - return (old->GetName() == in->GetName() && old->GetClassVersion() == in->GetClassVersion()); - }); - if (found == streamers.end()) { - streamers.push_back(in); - } - } -} - -void TMessageSerializer::updateStreamers(const TObject* object) -{ - FairTMessage msg(kMESS_OBJECT); - serialize(msg, object, CacheStreamers::yes, CompressionLevel{0}); -}