Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntuple] properly support incremental merging with Union mode #17563

Closed
wants to merge 9 commits into from
31 changes: 22 additions & 9 deletions tree/ntuple/v7/inc/ROOT/RNTupleDescriptor.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -548,15 +548,26 @@ private:
/// Free text from the user
std::string fDescription;

std::uint64_t fOnDiskHeaderXxHash3 = 0; ///< Set by the descriptor builder when deserialized
DescriptorId_t fFieldZeroId = kInvalidDescriptorId; ///< Set by the descriptor builder

std::uint64_t fNPhysicalColumns = 0; ///< Updated by the descriptor builder when columns are added
std::uint64_t fOnDiskHeaderSize = 0; ///< Set by the descriptor builder when deserialized

std::set<unsigned int> fFeatureFlags;
std::unordered_map<DescriptorId_t, RFieldDescriptor> fFieldDescriptors;
std::unordered_map<DescriptorId_t, RColumnDescriptor> fColumnDescriptors;

std::vector<RExtraTypeInfoDescriptor> fExtraTypeInfoDescriptors;
std::unique_ptr<RHeaderExtension> fHeaderExtension;

//// All fields above are part of the schema and are cloned when creating a new descriptor from a given one
//// (see CloneSchema())

std::uint64_t fOnDiskHeaderXxHash3 = 0; ///< Set by the descriptor builder when deserialized
std::uint64_t fOnDiskFooterSize = 0; ///< Like fOnDiskHeaderSize, contains both cluster summaries and page locations

std::uint64_t fNEntries = 0; ///< Updated by the descriptor builder when the cluster groups are added
std::uint64_t fNClusters = 0; ///< Updated by the descriptor builder when the cluster groups are added
std::uint64_t fNPhysicalColumns = 0; ///< Updated by the descriptor builder when columns are added

DescriptorId_t fFieldZeroId = kInvalidDescriptorId; ///< Set by the descriptor builder

/**
* Once constructed by an RNTupleDescriptorBuilder, the descriptor is mostly immutable except for set of
Expand All @@ -567,9 +578,6 @@ private:
*/
std::uint64_t fGeneration = 0;

std::set<unsigned int> fFeatureFlags;
std::unordered_map<DescriptorId_t, RFieldDescriptor> fFieldDescriptors;
std::unordered_map<DescriptorId_t, RColumnDescriptor> fColumnDescriptors;
std::unordered_map<DescriptorId_t, RClusterGroupDescriptor> fClusterGroupDescriptors;
/// References cluster groups sorted by entry range and thus allows for binary search.
/// Note that this list is empty during the descriptor building process and will only be
Expand All @@ -578,12 +586,15 @@ private:
/// May contain only a subset of all the available clusters, e.g. the clusters of the current file
/// from a chain of files
std::unordered_map<DescriptorId_t, RClusterDescriptor> fClusterDescriptors;
std::vector<RExtraTypeInfoDescriptor> fExtraTypeInfoDescriptors;
std::unique_ptr<RHeaderExtension> fHeaderExtension;

// We don't expose this publicly because when we add sharded clusters, this interface does not make sense anymore
DescriptorId_t FindClusterId(NTupleSize_t entryIdx) const;

/// Fills `into` with the schema information about this RNTuple, i.e. all the information needed to create
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

into isn't a parameter, is this comment up-to-date?

/// a new RNTuple with the same schema as this one but not necessarily the same clustering. This is used
/// when merging two RNTuples.
RNTupleDescriptor CloneSchema() const;

public:
static constexpr unsigned int kFeatureFlagTest = 137; // Bit reserved for forward-compatibility testing

Expand Down Expand Up @@ -1399,6 +1410,8 @@ public:
const RNTupleDescriptor &GetDescriptor() const { return fDescriptor; }
RNTupleDescriptor MoveDescriptor();

void CreateFromSchema(const RNTupleDescriptor &descriptor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me this method name is a bit vague, perhaps something like SetSchemaFromExisting?


void SetNTuple(const std::string_view name, const std::string_view description);
void SetFeature(unsigned int flag);

Expand Down
7 changes: 4 additions & 3 deletions tree/ntuple/v7/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ protected:
/// with the page source, we leave it up to the derived class whether or not the compressor gets constructed.
std::unique_ptr<RNTupleCompressor> fCompressor;

/// Flag if sink was initialized
bool fIsInitialized = false;

/// Helper for streaming a page. This is commonly used in derived, concrete page sinks. Note that if
/// compressionSetting is 0 (uncompressed) and the page is mappable and not checksummed, the returned sealed page
/// will point directly to the input page buffer. Otherwise, the sealed page references an internal buffer
Expand All @@ -289,8 +292,6 @@ protected:
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element);

private:
/// Flag if sink was initialized
bool fIsInitialized = false;
std::vector<Callback_t> fOnDatasetCommitCallbacks;
std::vector<unsigned char> fSealPageBuffer; ///< Used as destination buffer in the simple SealPage overload

Expand Down Expand Up @@ -529,7 +530,7 @@ public:
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final;

/// Initialize sink based on an existing descriptor and fill into the descriptor builder.
void InitFromDescriptor(const RNTupleDescriptor &descriptor);
[[nodiscard]] std::unique_ptr<RNTupleModel> InitFromDescriptor(const RNTupleDescriptor &descriptor);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not only nodiscard, but the user must also ensure a lifetime longer than the sink. I'm not sure if it is possible to make this clear from the interface, did you consider passing the model as a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you consider passing the model as a parameter?

How would this change the lifetime problem compared to the return value? In the end we have no control on when the model gets destroyed on the caller's site either way, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the lifetime must be at least as long as the sink, not necessarily longer (it's fine if they get dropped at the same time)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would this change the lifetime problem compared to the return value?

That's true. IMO it makes it less likely to use wrongly than a unique ptr return value where you had to apply a [[nodiscard]] to avoid one source of problems already.

the lifetime must be at least as long as the sink, not necessarily longer

Actually, I think we have to destroy the model before the sink because the fields are connected to the sink. We have to check RNTupleFillContext regarding the order of destruction.


void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
Expand Down
38 changes: 27 additions & 11 deletions tree/ntuple/v7/src/RNTupleDescriptor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -663,32 +663,43 @@ ROOT::Experimental::RNTupleDescriptor::CreateModel(const RCreateModelOptions &op
return model;
}

ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::RNTupleDescriptor::Clone() const
ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::RNTupleDescriptor::CloneSchema() const
{
RNTupleDescriptor clone;
clone.fName = fName;
clone.fDescription = fDescription;
clone.fOnDiskHeaderXxHash3 = fOnDiskHeaderXxHash3;
clone.fOnDiskHeaderSize = fOnDiskHeaderSize;
clone.fOnDiskFooterSize = fOnDiskFooterSize;
clone.fNEntries = fNEntries;
clone.fNClusters = fNClusters;
clone.fNPhysicalColumns = fNPhysicalColumns;
clone.fFieldZeroId = fFieldZeroId;
clone.fGeneration = fGeneration;
clone.fOnDiskHeaderSize = fOnDiskHeaderSize;
// OnDiskFooterSize not copied because it would not be correct (since we change the clustering)

for (const auto &d : fFieldDescriptors)
clone.fFieldDescriptors.emplace(d.first, d.second.Clone());
for (const auto &d : fColumnDescriptors)
clone.fColumnDescriptors.emplace(d.first, d.second.Clone());

for (const auto &d : fExtraTypeInfoDescriptors)
clone.fExtraTypeInfoDescriptors.emplace_back(d.Clone());
if (fHeaderExtension)
clone.fHeaderExtension = std::make_unique<RHeaderExtension>(*fHeaderExtension);

return clone;
}

ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::RNTupleDescriptor::Clone() const
{
RNTupleDescriptor clone = CloneSchema();

clone.fOnDiskHeaderXxHash3 = fOnDiskHeaderXxHash3;
clone.fOnDiskFooterSize = fOnDiskFooterSize;
clone.fNEntries = fNEntries;
clone.fNClusters = fNClusters;
clone.fGeneration = fGeneration;
for (const auto &d : fClusterGroupDescriptors)
clone.fClusterGroupDescriptors.emplace(d.first, d.second.Clone());
clone.fSortedClusterGroupIds = fSortedClusterGroupIds;
for (const auto &d : fClusterDescriptors)
clone.fClusterDescriptors.emplace(d.first, d.second.Clone());
for (const auto &d : fExtraTypeInfoDescriptors)
clone.fExtraTypeInfoDescriptors.emplace_back(d.Clone());
if (fHeaderExtension)
clone.fHeaderExtension = std::make_unique<RHeaderExtension>(*fHeaderExtension);
return clone;
}

Expand Down Expand Up @@ -1199,6 +1210,11 @@ void ROOT::Experimental::Internal::RNTupleDescriptorBuilder::Reset()
fDescriptor.fHeaderExtension.reset();
}

void ROOT::Experimental::Internal::RNTupleDescriptorBuilder::CreateFromSchema(const RNTupleDescriptor &descriptor)
{
fDescriptor = descriptor.CloneSchema();
}

void ROOT::Experimental::Internal::RNTupleDescriptorBuilder::BeginHeaderExtension()
{
if (!fDescriptor.fHeaderExtension)
Expand Down
66 changes: 47 additions & 19 deletions tree/ntuple/v7/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -918,39 +918,67 @@ void ROOT::Experimental::Internal::RPagePersistentSink::InitImpl(RNTupleModel &m
fDescriptorBuilder.BeginHeaderExtension();
}

void ROOT::Experimental::Internal::RPagePersistentSink::InitFromDescriptor(const RNTupleDescriptor &descriptor)
std::unique_ptr<ROOT::Experimental::RNTupleModel>
ROOT::Experimental::Internal::RPagePersistentSink::InitFromDescriptor(const RNTupleDescriptor &srcDescriptor)
{
{
auto model = descriptor.CreateModel();
Init(*model.get());
}
// Create new descriptor
fDescriptorBuilder.CreateFromSchema(srcDescriptor);
const auto &descriptor = fDescriptorBuilder.GetDescriptor();

auto clusterId = descriptor.FindClusterId(0, 0);
// Create column/page ranges
const auto nColumns = descriptor.GetNPhysicalColumns();
fOpenColumnRanges.reserve(fOpenColumnRanges.size() + nColumns);
fOpenPageRanges.reserve(fOpenPageRanges.size() + nColumns);
for (DescriptorId_t i = 0; i < nColumns; ++i) {
const auto &column = descriptor.GetColumnDescriptor(i);
RClusterDescriptor::RColumnRange columnRange;
columnRange.fPhysicalColumnId = i;
columnRange.fFirstElementIndex = column.GetFirstElementIndex();
columnRange.fNElements = 0;
columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
fOpenColumnRanges.emplace_back(columnRange);
RClusterDescriptor::RPageRange pageRange;
pageRange.fPhysicalColumnId = i;
fOpenPageRanges.emplace_back(std::move(pageRange));
}

// Clone and add all cluster descriptors
auto clusterId = srcDescriptor.FindClusterId(0, 0);
while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
auto &cluster = descriptor.GetClusterDescriptor(clusterId);
auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
auto nEntries = cluster.GetNEntries();

RClusterDescriptorBuilder clusterBuilder;
clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
.FirstEntryIndex(fPrevClusterNEntries)
.NEntries(nEntries);

for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
if (!cluster.ContainsColumn(i))
break;
const auto &columnRange = cluster.GetColumnRange(i);
R__ASSERT(columnRange.fPhysicalColumnId == i);
const auto &pageRange = cluster.GetPageRange(i);
R__ASSERT(pageRange.fPhysicalColumnId == i);
clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex,
columnRange.fCompressionSettings.value(), pageRange);
// TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
}
fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
fDescriptorBuilder.AddCluster(cluster.Clone());
fPrevClusterNEntries += nEntries;

clusterId = descriptor.FindNextClusterId(clusterId);
clusterId = srcDescriptor.FindNextClusterId(clusterId);
}

// Create model
auto modelOpts = RNTupleDescriptor::RCreateModelOptions();
modelOpts.fReconstructProjections = true;
auto model = descriptor.CreateModel(modelOpts);

// Serialize header and init from it
fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor);
auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor);
InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());

fDescriptorBuilder.BeginHeaderExtension();

// mark this sink as initialized
fIsInitialized = true;

return model;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle)
Expand Down