Skip to content

Commit

Permalink
DPL: Move DataInputDirector to arrow::Dataset API
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Dec 3, 2024
1 parent 950b8b7 commit 19424d5
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 58 deletions.
16 changes: 12 additions & 4 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include <memory>
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/RootTableBuilderHelpers.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ControlService.h"
Expand Down Expand Up @@ -41,6 +43,8 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>

using namespace o2;
using namespace o2::aod;
Expand Down Expand Up @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
// Origin file name for derived output map
auto o2 = Output(TFFileNameHeader);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
std::string currentFilename(fileAndFolder.file->GetName());
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string currentFilename(f->GetFile()->GetName());
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
// This is not an absolute local path. Make it absolute.
static std::string pwd = gSystem->pwd() + std::string("/");
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
currentFilename = pwd + std::string(f->GetName());
}
outputs.make<std::string>(o2) = currentFilename;
}
Expand Down Expand Up @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
if (!fileAndFolder.file) {

// In case the filesource is empty, move to the next one.
if (fileAndFolder.path().empty()) {
fcnt += 1;
ntf = 0;
if (didir->atEnd(fcnt)) {
Expand Down
103 changes: 57 additions & 46 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "DataInputDirector.h"
#include "Framework/DataDescriptorQueryBuilder.h"
#include "Framework/Logger.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AnalysisDataModelHelpers.h"
#include "Framework/Output.h"
#include "Headers/DataHeader.h"
Expand All @@ -26,8 +27,12 @@
#include "TGrid.h"
#include "TObjString.h"
#include "TMap.h"
#include "TFile.h"

#include <arrow/dataset/file_base.h>
#include <arrow/dataset/dataset.h>
#include <uv.h>
#include <memory>

#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
Expand Down Expand Up @@ -108,20 +113,22 @@ bool DataInputDescriptor::setFile(int counter)

// open file
auto filename = mfilenames[counter]->fileName;
if (mcurrentFile) {
if (mcurrentFile->GetName() == filename) {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
if (rootFS.get()) {
if (rootFS->GetFile()->GetName() == filename) {
return true;
}
closeInputFile();
}
mcurrentFile = TFile::Open(filename.c_str());
if (!mcurrentFile) {

mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024);
if (!mCurrentFilesystem.get()) {
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
}
mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);
rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);

// get the parent file map if exists
mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
if (mParentFileMap && !mParentFileReplacement.empty()) {
auto pos = mParentFileReplacement.find(';');
if (pos == std::string::npos) {
Expand All @@ -141,7 +148,7 @@ bool DataInputDescriptor::setFile(int counter)
// get the directory names
if (mfilenames[counter]->numberOfTimeFrames <= 0) {
std::regex TFRegex = std::regex("DF_[0-9]+");
TList* keyList = mcurrentFile->GetListOfKeys();
TList* keyList = rootFS->GetFile()->GetListOfKeys();

// extract TF numbers and sort accordingly
for (auto key : *keyList) {
Expand Down Expand Up @@ -193,26 +200,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF)
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
}

FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF)
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF)
{
FileAndFolder fileAndFolder;

// open file
if (!setFile(counter)) {
return fileAndFolder;
return {};
}

// no TF left
if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
return fileAndFolder;
return {};
}

fileAndFolder.file = mcurrentFile;
fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];

mfilenames[counter]->alreadyRead[numTF] = true;

return fileAndFolder;
return {(mfilenames[counter]->listOfTimeFrameKeys)[numTF], mCurrentFilesystem};
}

DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename)
Expand All @@ -223,15 +225,17 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
}
auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
// The current DF is not found in the parent map (this should not happen and is a fatal error)
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
if (!parentFileName) {
// The current DF is not found in the parent map (this should not happen and is a fatal error)
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
return nullptr;
}

if (mParentFile) {
// Is this still the corresponding to the correct file?
if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
return mParentFile;
} else {
mParentFile->closeInputFile();
Expand All @@ -241,7 +245,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
}

if (mLevel == mAllowedParentLevel) {
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
rootFS->GetFile()->GetName()));
}

LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
Expand Down Expand Up @@ -270,11 +275,13 @@ void DataInputDescriptor::printFileStatistics()
if (wait_time < 0) {
wait_time = 0;
}
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(),
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
auto f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
#if __has_include(<TJAlienFile.h>)
auto alienFile = dynamic_cast<TJAlienFile*>(mcurrentFile);
auto alienFile = dynamic_cast<TJAlienFile*>(f);
if (alienFile) {
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
}
Expand All @@ -285,7 +292,7 @@ void DataInputDescriptor::printFileStatistics()

void DataInputDescriptor::closeInputFile()
{
if (mcurrentFile) {
if (mCurrentFilesystem.get()) {
if (mParentFile) {
mParentFile->closeInputFile();
delete mParentFile;
Expand All @@ -296,9 +303,7 @@ void DataInputDescriptor::closeInputFile()
mParentFileMap = nullptr;

printFileStatistics();
mcurrentFile->Close();
delete mcurrentFile;
mcurrentFile = nullptr;
mCurrentFilesystem.reset();
}
}

Expand Down Expand Up @@ -358,40 +363,46 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
{
auto ioStart = uv_hrtime();

auto fileAndFolder = getFileFolder(counter, numTF);
if (!fileAndFolder.file) {
auto folder = getFileFolder(counter, numTF);
if (!folder.filesystem()) {
return false;
}

auto fullpath = fileAndFolder.folderName + "/" + treename;
auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str());
auto format = std::make_shared<TTreeFileFormat>(totalSizeCompressed, totalSizeUncompressed);
auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
auto schemaOpt = format->Inspect(fullpath);
auto schema = *schemaOpt;

if (!tree) {
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str());
auto fragment = format->MakeFragment(fullpath, {}, schema);

if (!fragment.ok()) {
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
auto parentFile = getParentFile(counter, numTF, treename);
if (parentFile != nullptr) {
int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName);
int parentNumTF = parentFile->findDFNumber(0, folder.path());
if (parentNumTF == -1) {
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName()));
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
}
// first argument is 0 as the parent file object contains only 1 file
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
}
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName()));
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
}

// create table output
auto o = Output(dh);
auto t2t = outputs.make<TreeToTable>(o);

// add branches to read
// fill the table
t2t->setLabel(tree->GetName());
totalSizeCompressed += tree->GetZipBytes();
totalSizeUncompressed += tree->GetTotBytes();
t2t->addAllColumns(tree);
t2t->fill(tree);
delete tree;
// FIXME: This should allow me to create a memory pool
// which I can then use to scan the dataset.
//
auto f2b = outputs.make<FragmentToBatch>(o);

//// add branches to read
//// fill the table
f2b->setLabel(treename.c_str());
f2b->fill(*fragment, schema, format);

mIOTime += (uv_hrtime() - ioStart);

Expand Down Expand Up @@ -693,7 +704,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade
return result;
}

FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
{
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
Expand Down
14 changes: 6 additions & 8 deletions Framework/AnalysisSupport/src/DataInputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include "Framework/DataDescriptorMatcher.h"
#include "Framework/DataAllocator.h"

#include <arrow/filesystem/filesystem.h>
#include <arrow/dataset/dataset.h>

#include <regex>
#include "rapidjson/fwd.h"

Expand All @@ -36,11 +39,6 @@ struct FileNameHolder {
};
FileNameHolder* makeFileNameHolder(std::string fileName);

struct FileAndFolder {
TFile* file = nullptr;
std::string folderName = "";
};

class DataInputDescriptor
{
/// Holds information concerning the reading of an aod table.
Expand Down Expand Up @@ -78,7 +76,7 @@ class DataInputDescriptor
int findDFNumber(int file, std::string dfName);

uint64_t getTimeFrameNumber(int counter, int numTF);
FileAndFolder getFileFolder(int counter, int numTF);
arrow::dataset::FileSource getFileFolder(int counter, int numTF);
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename);
int getTimeFramesInFile(int counter);
int getReadTimeFramesInFile(int counter);
Expand All @@ -98,7 +96,7 @@ class DataInputDescriptor
std::string mParentFileReplacement;
std::vector<FileNameHolder*> mfilenames;
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
TFile* mcurrentFile = nullptr;
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
int mCurrentFileID = -1;
bool mAlienSupport = false;

Expand Down Expand Up @@ -143,7 +141,7 @@ class DataInputDirector

bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
int getTimeFramesInFile(header::DataHeader dh, int counter);

uint64_t getTotalSizeCompressed();
Expand Down

0 comments on commit 19424d5

Please sign in to comment.