Skip to content

Commit

Permalink
DPL Analysis: add RNTuple arrow::Dataset support
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Dec 2, 2024
1 parent 950b8b7 commit edd0bad
Show file tree
Hide file tree
Showing 4 changed files with 813 additions and 1 deletion.
3 changes: 3 additions & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ o2_add_library(Framework
FairMQ::FairMQ
ROOT::Tree
ROOT::Hist
ROOT::ROOTNTuple
ROOT::ROOTNTupleUtil
O2::FrameworkFoundation
O2::CommonConstants
O2::Headers
Expand Down Expand Up @@ -298,6 +300,7 @@ add_executable(o2-test-framework-root
target_link_libraries(o2-test-framework-root PRIVATE O2::Framework)
target_link_libraries(o2-test-framework-root PRIVATE O2::Catch2)
target_link_libraries(o2-test-framework-root PRIVATE ROOT::ROOTDataFrame)
target_link_libraries(o2-test-framework-root PRIVATE ROOT::ROOTNTuple)
set_property(TARGET o2-test-framework-root PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir})
add_test(NAME framework:root COMMAND o2-test-framework-root --skip-benchmarks)
add_test(NAME framework:crash COMMAND sh -e -c "PATH=${CMAKE_RUNTIME_OUTPUT_DIRECTORY}:$PATH ${CMAKE_CURRENT_LIST_DIR}/test/test_AllCrashTypes.sh")
Expand Down
115 changes: 115 additions & 0 deletions Framework/Core/include/Framework/RootArrowFilesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class TTree;
class TBufferFile;
class TDirectoryFile;

namespace ROOT::Experimental
{
class RNTuple;
} // namespace ROOT::Experimental

namespace o2::framework
{

Expand All @@ -35,6 +40,15 @@ class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions
}
};

class RNTupleFileWriteOptions : public arrow::dataset::FileWriteOptions
{
public:
RNTupleFileWriteOptions(std::shared_ptr<arrow::dataset::FileFormat> format)
: FileWriteOptions(format)
{
}
};

// This is to avoid having to implement a bunch of unimplemented methods
// for all the possible virtual filesystem we can invent on top of ROOT
// data structures.
Expand Down Expand Up @@ -97,6 +111,19 @@ class TTreeFileSystem : public VirtualRootFileSystemBase
virtual TTree* GetTree(arrow::dataset::FileSource source) = 0;
};

// A filesystem which allows me to get a RNTuple
class RNTupleFileSystem : public VirtualRootFileSystemBase
{
public:
~RNTupleFileSystem() override;

std::shared_ptr<VirtualRootFileSystemBase> GetSubFilesystem(arrow::dataset::FileSource source) override
{
return std::dynamic_pointer_cast<VirtualRootFileSystemBase>(shared_from_this());
};
virtual ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource source) = 0;
};

class SingleTreeFileSystem : public TTreeFileSystem
{
public:
Expand All @@ -121,6 +148,30 @@ class SingleTreeFileSystem : public TTreeFileSystem
TTree* mTree;
};

class SingleRNTupleFileSystem : public RNTupleFileSystem
{
public:
SingleRNTupleFileSystem(ROOT::Experimental::RNTuple* tuple)
: RNTupleFileSystem(),
mTuple(tuple)
{
}

std::string type_name() const override
{
return "rntuple";
}

ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource) override
{
// Simply return the only TTree we have
return mTuple;
}

private:
ROOT::Experimental::RNTuple* mTuple;
};

class TFileFileSystem : public VirtualRootFileSystemBase
{
public:
Expand Down Expand Up @@ -179,6 +230,70 @@ class TTreeFileFragment : public arrow::dataset::FileFragment
}
};

class RNTupleFileFragment : public arrow::dataset::FileFragment
{
public:
RNTupleFileFragment(arrow::dataset::FileSource source,
std::shared_ptr<arrow::dataset::FileFormat> format,
arrow::compute::Expression partition_expression,
std::shared_ptr<arrow::Schema> physical_schema)
: FileFragment(std::move(source), std::move(format), std::move(partition_expression), std::move(physical_schema))
{
}
};

class RNTupleFileFormat : public arrow::dataset::FileFormat
{
size_t& mTotCompressedSize;
size_t& mTotUncompressedSize;

public:
RNTupleFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize)
: FileFormat({}),
mTotCompressedSize(totalCompressedSize),
mTotUncompressedSize(totalUncompressedSize)
{
}

~RNTupleFileFormat() override = default;

std::string type_name() const override
{
return "rntuple";
}

bool Equals(const FileFormat& other) const override
{
return other.type_name() == this->type_name();
}

arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
{
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
auto subFs = fs->GetSubFilesystem(source);
if (std::dynamic_pointer_cast<RNTupleFileSystem>(subFs)) {
return true;
}
return false;
}

arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;

arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<arrow::dataset::ScanOptions>& options,
const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;

std::shared_ptr<arrow::dataset::FileWriteOptions> DefaultWriteOptions() override;

arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination,
std::shared_ptr<arrow::Schema> schema,
std::shared_ptr<arrow::dataset::FileWriteOptions> options,
arrow::fs::FileLocator destination_locator) const override;
arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> MakeFragment(
arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
std::shared_ptr<arrow::Schema> physical_schema) override;
};

class TTreeFileFormat : public arrow::dataset::FileFormat
{
size_t& mTotCompressedSize;
Expand Down
Loading

0 comments on commit edd0bad

Please sign in to comment.