Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntuple] Fix serialization when a header extension is already present #17597

Merged
merged 4 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions tree/ntuple/v7/inc/ROOT/RNTupleDescriptor.hxx
silverweed marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class RClusterGroupDescriptorBuilder;
class RExtraTypeInfoDescriptorBuilder;
class RFieldDescriptorBuilder;
class RNTupleDescriptorBuilder;

RNTupleDescriptor CloneDescriptorSchema(const RNTupleDescriptor &desc);
} // namespace Internal

// clang-format off
Expand Down Expand Up @@ -538,6 +540,7 @@ writte struct. This allows for forward and backward compatibility when the meta-
// clang-format on
class RNTupleDescriptor final {
friend class Internal::RNTupleDescriptorBuilder;
friend RNTupleDescriptor Internal::CloneDescriptorSchema(const RNTupleDescriptor &desc);

public:
class RHeaderExtension;
Expand All @@ -548,15 +551,26 @@ private:
/// Free text from the user
std::string fDescription;

std::uint64_t fOnDiskHeaderXxHash3 = 0; ///< Set by the descriptor builder when deserialized
std::uint64_t fOnDiskHeaderSize = 0; ///< Set by the descriptor builder when deserialized
std::uint64_t fOnDiskFooterSize = 0; ///< Like fOnDiskHeaderSize, contains both cluster summaries and page locations
DescriptorId_t fFieldZeroId = kInvalidDescriptorId; ///< Set by the descriptor builder

std::uint64_t fNEntries = 0; ///< Updated by the descriptor builder when the cluster groups are added
std::uint64_t fNClusters = 0; ///< Updated by the descriptor builder when the cluster groups are added
std::uint64_t fNPhysicalColumns = 0; ///< Updated by the descriptor builder when columns are added

DescriptorId_t fFieldZeroId = kInvalidDescriptorId; ///< Set by the descriptor builder
std::set<unsigned int> fFeatureFlags;
std::unordered_map<DescriptorId_t, RFieldDescriptor> fFieldDescriptors;
std::unordered_map<DescriptorId_t, RColumnDescriptor> fColumnDescriptors;

std::vector<RExtraTypeInfoDescriptor> fExtraTypeInfoDescriptors;
std::unique_ptr<RHeaderExtension> fHeaderExtension;

//// All fields above are part of the schema and are cloned when creating a new descriptor from a given one
//// (see CloneSchema())

std::uint64_t fOnDiskHeaderSize = 0; ///< Set by the descriptor builder when deserialized
std::uint64_t fOnDiskHeaderXxHash3 = 0; ///< Set by the descriptor builder when deserialized
std::uint64_t fOnDiskFooterSize = 0; ///< Like fOnDiskHeaderSize, contains both cluster summaries and page locations

std::uint64_t fNEntries = 0; ///< Updated by the descriptor builder when the cluster groups are added
std::uint64_t fNClusters = 0; ///< Updated by the descriptor builder when the cluster groups are added

/**
* Once constructed by an RNTupleDescriptorBuilder, the descriptor is mostly immutable except for set of
Expand All @@ -567,9 +581,6 @@ private:
*/
std::uint64_t fGeneration = 0;

std::set<unsigned int> fFeatureFlags;
std::unordered_map<DescriptorId_t, RFieldDescriptor> fFieldDescriptors;
std::unordered_map<DescriptorId_t, RColumnDescriptor> fColumnDescriptors;
std::unordered_map<DescriptorId_t, RClusterGroupDescriptor> fClusterGroupDescriptors;
/// References cluster groups sorted by entry range and thus allows for binary search.
/// Note that this list is empty during the descriptor building process and will only be
Expand All @@ -578,12 +589,15 @@ private:
/// May contain only a subset of all the available clusters, e.g. the clusters of the current file
/// from a chain of files
std::unordered_map<DescriptorId_t, RClusterDescriptor> fClusterDescriptors;
std::vector<RExtraTypeInfoDescriptor> fExtraTypeInfoDescriptors;
std::unique_ptr<RHeaderExtension> fHeaderExtension;

// We don't expose this publicly because when we add sharded clusters, this interface does not make sense anymore
DescriptorId_t FindClusterId(NTupleSize_t entryIdx) const;

/// Creates a descriptor containing only the schema information about this RNTuple, i.e. all the information needed
/// to create a new RNTuple with the same schema as this one but not necessarily the same clustering. This is used
/// when merging two RNTuples.
RNTupleDescriptor CloneSchema() const;

public:
static constexpr unsigned int kFeatureFlagTest = 137; // Bit reserved for forward-compatibility testing

Expand Down Expand Up @@ -1037,6 +1051,13 @@ public:
/// We cannot create this vector when building the fFields because at the time when AddExtendedField is called,
/// the field is not yet linked into the schema tree.
std::vector<DescriptorId_t> GetTopLevelFields(const RNTupleDescriptor &desc) const;

bool ContainsField(DescriptorId_t fieldId) const { return fFieldIdsLookup.find(fieldId) != fFieldIdsLookup.end(); }
bool ContainsExtendedColumnRepresentation(DescriptorId_t columnId) const
{
return std::find(fExtendedColumnRepresentations.begin(), fExtendedColumnRepresentations.end(), columnId) !=
fExtendedColumnRepresentations.end();
}
};

namespace Internal {
Expand Down Expand Up @@ -1399,6 +1420,10 @@ public:
const RNTupleDescriptor &GetDescriptor() const { return fDescriptor; }
RNTupleDescriptor MoveDescriptor();

/// Copies the "schema" part of `descriptor` into the builder's descriptor.
/// This resets the builder's descriptor.
void SetSchemaFromExisting(const RNTupleDescriptor &descriptor);

void SetNTuple(const std::string_view name, const std::string_view description);
void SetFeature(unsigned int flag);

Expand Down Expand Up @@ -1448,6 +1473,11 @@ public:
RNTupleSerializer::StreamerInfoMap_t BuildStreamerInfos() const;
};

inline RNTupleDescriptor CloneDescriptorSchema(const RNTupleDescriptor &desc)
{
return desc.CloneSchema();
}

} // namespace Internal
} // namespace Experimental
} // namespace ROOT
Expand Down
9 changes: 6 additions & 3 deletions tree/ntuple/v7/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ protected:
/// with the page source, we leave it up to the derived class whether or not the compressor gets constructed.
std::unique_ptr<RNTupleCompressor> fCompressor;

/// Flag if sink was initialized
bool fIsInitialized = false;

/// Helper for streaming a page. This is commonly used in derived, concrete page sinks. Note that if
/// compressionSetting is 0 (uncompressed) and the page is mappable and not checksummed, the returned sealed page
/// will point directly to the input page buffer. Otherwise, the sealed page references an internal buffer
Expand All @@ -289,8 +292,6 @@ protected:
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element);

private:
/// Flag if sink was initialized
bool fIsInitialized = false;
std::vector<Callback_t> fOnDatasetCommitCallbacks;
std::vector<unsigned char> fSealPageBuffer; ///< Used as destination buffer in the simple SealPage overload

Expand Down Expand Up @@ -529,7 +530,9 @@ public:
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final;

/// Initialize sink based on an existing descriptor and fill into the descriptor builder.
void InitFromDescriptor(const RNTupleDescriptor &descriptor);
/// \return The model created from the new sink's descriptor. This model should be kept alive
/// for at least as long as the sink.
[[nodiscard]] std::unique_ptr<RNTupleModel> InitFromDescriptor(const RNTupleDescriptor &descriptor);

void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
Expand Down
41 changes: 30 additions & 11 deletions tree/ntuple/v7/src/RNTupleDescriptor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -663,32 +663,46 @@ ROOT::Experimental::RNTupleDescriptor::CreateModel(const RCreateModelOptions &op
return model;
}

ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::RNTupleDescriptor::Clone() const
ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::RNTupleDescriptor::CloneSchema() const
silverweed marked this conversation as resolved.
Show resolved Hide resolved
silverweed marked this conversation as resolved.
Show resolved Hide resolved
{
RNTupleDescriptor clone;
clone.fName = fName;
clone.fDescription = fDescription;
clone.fOnDiskHeaderXxHash3 = fOnDiskHeaderXxHash3;
clone.fOnDiskHeaderSize = fOnDiskHeaderSize;
clone.fOnDiskFooterSize = fOnDiskFooterSize;
clone.fNEntries = fNEntries;
clone.fNClusters = fNClusters;
clone.fNPhysicalColumns = fNPhysicalColumns;
clone.fFieldZeroId = fFieldZeroId;
clone.fGeneration = fGeneration;
clone.fFeatureFlags = fFeatureFlags;
// OnDiskHeaderSize, OnDiskHeaderXxHash3 not copied because they may come from a merged header + extension header
// and therefore not represent the actual sources's header.
// OnDiskFooterSize not copied because it contains information beyond the schema, for example the clustering.

for (const auto &d : fFieldDescriptors)
clone.fFieldDescriptors.emplace(d.first, d.second.Clone());
for (const auto &d : fColumnDescriptors)
clone.fColumnDescriptors.emplace(d.first, d.second.Clone());

for (const auto &d : fExtraTypeInfoDescriptors)
clone.fExtraTypeInfoDescriptors.emplace_back(d.Clone());
if (fHeaderExtension)
clone.fHeaderExtension = std::make_unique<RHeaderExtension>(*fHeaderExtension);

return clone;
}

ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::RNTupleDescriptor::Clone() const
{
RNTupleDescriptor clone = CloneSchema();

clone.fOnDiskHeaderSize = fOnDiskHeaderSize;
clone.fOnDiskHeaderXxHash3 = fOnDiskHeaderXxHash3;
silverweed marked this conversation as resolved.
Show resolved Hide resolved
clone.fOnDiskFooterSize = fOnDiskFooterSize;
clone.fNEntries = fNEntries;
clone.fNClusters = fNClusters;
clone.fGeneration = fGeneration;
for (const auto &d : fClusterGroupDescriptors)
clone.fClusterGroupDescriptors.emplace(d.first, d.second.Clone());
clone.fSortedClusterGroupIds = fSortedClusterGroupIds;
for (const auto &d : fClusterDescriptors)
clone.fClusterDescriptors.emplace(d.first, d.second.Clone());
for (const auto &d : fExtraTypeInfoDescriptors)
clone.fExtraTypeInfoDescriptors.emplace_back(d.Clone());
if (fHeaderExtension)
clone.fHeaderExtension = std::make_unique<RHeaderExtension>(*fHeaderExtension);
return clone;
}

Expand Down Expand Up @@ -1199,6 +1213,11 @@ void ROOT::Experimental::Internal::RNTupleDescriptorBuilder::Reset()
fDescriptor.fHeaderExtension.reset();
}

void ROOT::Experimental::Internal::RNTupleDescriptorBuilder::SetSchemaFromExisting(const RNTupleDescriptor &descriptor)
{
fDescriptor = descriptor.CloneSchema();
}

void ROOT::Experimental::Internal::RNTupleDescriptorBuilder::BeginHeaderExtension()
{
if (!fDescriptor.fHeaderExtension)
Expand Down
4 changes: 3 additions & 1 deletion tree/ntuple/v7/src/RNTupleMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,15 @@ try {
assert(compression);
writeOpts.SetCompression(*compression);
auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
// TODO: currently unused, will be used by the merger in a future change.
std::unique_ptr<RNTupleModel> model;

// If we already have an existing RNTuple, copy over its descriptor to support incremental merging
if (outNTuple) {
auto outSource = RPageSourceFile::CreateFromAnchor(*outNTuple);
outSource->Attach();
auto desc = outSource->GetSharedDescriptorGuard();
destination->InitFromDescriptor(desc.GetRef());
model = destination->InitFromDescriptor(desc.GetRef());
}

// Interface conversion
Expand Down
38 changes: 29 additions & 9 deletions tree/ntuple/v7/src/RNTupleSerialize.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,23 @@ std::uint32_t SerializePhysicalColumn(const ROOT::Experimental::RColumnDescripto
std::uint32_t SerializeColumnsOfFields(const ROOT::Experimental::RNTupleDescriptor &desc,
std::span<const ROOT::Experimental::DescriptorId_t> fieldList,
const ROOT::Experimental::Internal::RNTupleSerializer::RContext &context,
void *buffer)
void *buffer, bool forHeaderExtension)
{
auto base = reinterpret_cast<unsigned char *>(buffer);
auto pos = base;
void **where = (buffer == nullptr) ? &buffer : reinterpret_cast<void **>(&pos);

const auto *xHeader = !forHeaderExtension ? desc.GetHeaderExtension() : nullptr;

for (auto parentId : fieldList) {
// If we're serializing the non-extended header and we already have a header extension (which may happen if
// we load an RNTuple for incremental merging), we need to skip all the extended fields, as they need to be
// written in the header extension, not in the regular header.
if (xHeader && xHeader->ContainsField(parentId))
jblomer marked this conversation as resolved.
Show resolved Hide resolved
continue;

for (const auto &c : desc.GetColumnIterable(parentId)) {
if (c.IsAliasColumn())
if (c.IsAliasColumn() || (xHeader && xHeader->ContainsExtendedColumnRepresentation(c.GetLogicalId())))
continue;

pos += SerializePhysicalColumn(c, context, *where);
Expand Down Expand Up @@ -476,15 +484,20 @@ std::uint32_t SerializeAliasColumn(const ROOT::Experimental::RColumnDescriptor &
std::uint32_t SerializeAliasColumnsOfFields(const ROOT::Experimental::RNTupleDescriptor &desc,
std::span<const ROOT::Experimental::DescriptorId_t> fieldList,
const ROOT::Experimental::Internal::RNTupleSerializer::RContext &context,
void *buffer)
void *buffer, bool forHeaderExtension)
{
auto base = reinterpret_cast<unsigned char *>(buffer);
auto pos = base;
void **where = (buffer == nullptr) ? &buffer : reinterpret_cast<void **>(&pos);

const auto *xHeader = !forHeaderExtension ? desc.GetHeaderExtension() : nullptr;

for (auto parentId : fieldList) {
if (xHeader && xHeader->ContainsField(parentId))
jblomer marked this conversation as resolved.
Show resolved Hide resolved
continue;

for (const auto &c : desc.GetColumnIterable(parentId)) {
if (!c.IsAliasColumn())
if (!c.IsAliasColumn() || (xHeader && xHeader->ContainsExtendedColumnRepresentation(c.GetLogicalId())))
continue;

pos += SerializeAliasColumn(c, context, *where);
Expand Down Expand Up @@ -1301,9 +1314,16 @@ std::uint32_t ROOT::Experimental::Internal::RNTupleSerializer::SerializeSchemaDe
}
}
} else {
nFields = desc.GetNFields() - 1;
nColumns = desc.GetNPhysicalColumns();
nAliasColumns = desc.GetNLogicalColumns() - desc.GetNPhysicalColumns();
if (auto xHeader = desc.GetHeaderExtension()) {
nFields = desc.GetNFields() - xHeader->GetNFields() - 1;
nColumns = desc.GetNPhysicalColumns() - xHeader->GetNPhysicalColumns();
nAliasColumns = desc.GetNLogicalColumns() - desc.GetNPhysicalColumns() -
(xHeader->GetNLogicalColumns() - xHeader->GetNPhysicalColumns());
} else {
nFields = desc.GetNFields() - 1;
nColumns = desc.GetNPhysicalColumns();
nAliasColumns = desc.GetNLogicalColumns() - desc.GetNPhysicalColumns();
}
}
const auto nExtraTypeInfos = desc.GetNExtraTypeInfos();
const auto &onDiskFields = context.GetOnDiskFieldList();
Expand All @@ -1318,7 +1338,7 @@ std::uint32_t ROOT::Experimental::Internal::RNTupleSerializer::SerializeSchemaDe

frame = pos;
pos += SerializeListFramePreamble(nColumns, *where);
pos += SerializeColumnsOfFields(desc, fieldList, context, *where);
pos += SerializeColumnsOfFields(desc, fieldList, context, *where, forHeaderExtension);
for (const auto &c : extraColumns) {
if (!c.get().IsAliasColumn()) {
pos += SerializePhysicalColumn(c.get(), context, *where);
Expand All @@ -1328,7 +1348,7 @@ std::uint32_t ROOT::Experimental::Internal::RNTupleSerializer::SerializeSchemaDe

frame = pos;
pos += SerializeListFramePreamble(nAliasColumns, *where);
pos += SerializeAliasColumnsOfFields(desc, fieldList, context, *where);
pos += SerializeAliasColumnsOfFields(desc, fieldList, context, *where, forHeaderExtension);
for (const auto &c : extraColumns) {
if (c.get().IsAliasColumn()) {
pos += SerializeAliasColumn(c.get(), context, *where);
Expand Down
Loading
Loading