Skip to content

Commit 459428c

Browse files
committed
DPL: introduce FragmentToBatch
1 parent c4f9811 commit 459428c

File tree

4 files changed

+120
-1
lines changed

4 files changed

+120
-1
lines changed

Framework/Core/include/Framework/DataAllocator.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,15 @@ class DataAllocator
242242
return t2t;
243243
}
244244

245+
template <typename T, typename... Args>
246+
requires(requires { static_cast<struct FragmentToBatch>(std::declval<std::decay_t<T>>()); })
247+
decltype(auto) make(const Output& spec, Args... args)
248+
{
249+
auto f2b = std::move(LifetimeHolder<FragmentToBatch>(new std::decay_t<T>(args...)));
250+
adopt(spec, f2b);
251+
return f2b;
252+
}
253+
245254
template <typename T>
246255
requires is_messageable<T>::value && (!is_specialization_v<T, UninitializedVector>)
247256
decltype(auto) make(const Output& spec)
@@ -284,6 +293,11 @@ class DataAllocator
284293
void
285294
adopt(const Output& spec, LifetimeHolder<struct TreeToTable>&);
286295

296+
/// Adopt a FragmentToBatch in the framework and serialise / send
297+
/// it as an Arrow RecordBatch to all consumers of @a spec once done
298+
void
299+
adopt(const Output& spec, LifetimeHolder<struct FragmentToBatch>&);
300+
287301
/// Adopt an Arrow table and send it to all consumers of @a spec
288302
void
289303
adopt(const Output& spec, std::shared_ptr<class arrow::Table>);

Framework/Core/include/Framework/TableTreeHelpers.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
#ifndef O2_FRAMEWORK_TABLETREEHELPERS_H_
1212
#define O2_FRAMEWORK_TABLETREEHELPERS_H_
1313

14+
#include <arrow/record_batch.h>
1415
#include "TFile.h"
1516
#include "TTreeReader.h"
1617
#include "TTreeReaderValue.h"
1718
#include "TTreeReaderArray.h"
1819
#include "TableBuilder.h"
20+
#include <arrow/dataset/file_base.h>
21+
#include <memory>
1922

2023
// =============================================================================
2124
namespace o2::framework
@@ -140,6 +143,20 @@ class TreeToTable
140143
void addReader(TBranch* branch, std::string const& name, bool VLA);
141144
};
142145

146+
class FragmentToBatch
147+
{
148+
public:
149+
FragmentToBatch(arrow::MemoryPool* pool = arrow::default_memory_pool());
150+
void setLabel(const char* label);
151+
void fill(std::shared_ptr<arrow::dataset::FileFragment>, std::shared_ptr<arrow::Schema> dataSetSchema, std::shared_ptr<arrow::dataset::FileFormat>);
152+
std::shared_ptr<arrow::RecordBatch> finalize();
153+
154+
private:
155+
arrow::MemoryPool* mArrowMemoryPool = nullptr;
156+
std::string mTableLabel;
157+
std::shared_ptr<arrow::RecordBatch> mRecordBatch;
158+
};
159+
143160
// -----------------------------------------------------------------------------
144161
} // namespace o2::framework
145162

Framework/Core/src/DataAllocator.cxx

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,34 @@ void doWriteTable(std::shared_ptr<FairMQResizableBuffer> b, arrow::Table* table)
211211
}
212212
}
213213

214+
void doWriteBatch(std::shared_ptr<FairMQResizableBuffer> b, arrow::RecordBatch* batch)
215+
{
216+
auto mock = std::make_shared<arrow::io::MockOutputStream>();
217+
int64_t expectedSize = 0;
218+
auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
219+
arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
220+
221+
expectedSize = mock->Tell().ValueOrDie();
222+
auto reserve = b->Reserve(expectedSize);
223+
if (reserve.ok() == false) {
224+
throw std::runtime_error("Unable to reserve memory for table");
225+
}
226+
227+
auto stream = std::make_shared<FairMQOutputStream>(b);
228+
// This is a copy maybe we can finally get rid of it by having using the
229+
// dataset API?
230+
auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), batch->schema());
231+
if (outBatch.ok() == false) {
232+
throw ::std::runtime_error("Unable to create batch writer");
233+
}
234+
235+
outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
236+
237+
if (outStatus.ok() == false) {
238+
throw std::runtime_error("Unable to Write batch");
239+
}
240+
}
241+
214242
void DataAllocator::adopt(const Output& spec, LifetimeHolder<TableBuilder>& tb)
215243
{
216244
auto& timingInfo = mRegistry.get<TimingInfo>();
@@ -273,6 +301,38 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<TreeToTable>& t2t)
273301
context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
274302
}
275303

304+
void DataAllocator::adopt(const Output& spec, LifetimeHolder<FragmentToBatch>& f2b)
305+
{
306+
auto& timingInfo = mRegistry.get<TimingInfo>();
307+
RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);
308+
309+
auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
310+
auto& context = mRegistry.get<ArrowContext>();
311+
312+
auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
313+
return transport->CreateMessage(s);
314+
};
315+
auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
316+
317+
f2b.callback = [buffer = buffer, transport = context.proxy().getOutputTransport(routeIndex)](FragmentToBatch& source) {
318+
// Serialization happens in here, so that we can
319+
// get rid of the intermediate tree 2 table object, saving memory.
320+
auto batch = source.finalize();
321+
doWriteBatch(buffer, batch.get());
322+
// deletion happens in the caller
323+
};
324+
325+
/// To finalise this we write the table to the buffer.
326+
/// FIXME: most likely not a great idea. We should probably write to the buffer
327+
/// directly in the TableBuilder, incrementally.
328+
auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
329+
// This is empty because we already serialised the object when
330+
// the LifetimeHolder goes out of scope.
331+
};
332+
333+
context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
334+
}
335+
276336
void DataAllocator::adopt(const Output& spec, std::shared_ptr<arrow::Table> ptr)
277337
{
278338
auto& timingInfo = mRegistry.get<TimingInfo>();

Framework/Core/src/TableTreeHelpers.cxx

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@
1313
#include "Framework/Endian.h"
1414

1515
#include "arrow/type_traits.h"
16+
#include <arrow/dataset/file_base.h>
17+
#include <arrow/record_batch.h>
18+
#include <arrow/type.h>
1619
#include <arrow/util/key_value_metadata.h>
1720
#include <TBufferFile.h>
1821

22+
#include <memory>
#include <stdexcept>
1923
#include <utility>
2024
namespace TableTreeHelpers
2125
{
@@ -407,7 +411,7 @@ std::shared_ptr<TTree> TableToTree::process()
407411

408412
for (auto& reader : mColumnReaders) {
409413
int idealBasketSize = 1024 + reader->fieldSize() * reader->columnEntries(); // minimal additional size needed, otherwise we get 2 baskets
410-
int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
414+
int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
411415
// std::cout << "Setting baskets size for " << reader->branchName() << " to " << basketSize << " = 1024 + "
412416
// << reader->fieldSize() << " * " << reader->columnEntries() << ". mRows was " << mRows << std::endl;
413417
mTree->SetBasketSize(reader->branchName(), basketSize);
@@ -555,4 +559,28 @@ std::shared_ptr<arrow::Table> TreeToTable::finalize()
555559
return mTable;
556560
}
557561

562+
/// Construct an empty FragmentToBatch which remembers @a pool as its Arrow
/// memory pool. No record batch is available until fill() has been called.
FragmentToBatch::FragmentToBatch(arrow::MemoryPool* pool)
  : mArrowMemoryPool(pool)
{
}
566+
567+
void FragmentToBatch::setLabel(const char* label)
568+
{
569+
mTableLabel = label;
570+
}
571+
572+
/// Scan @a fragment using @a format and keep the first record batch produced.
///
/// @param fragment file fragment to read
/// @param schema   schema set as the dataset schema of the scan options
/// @param format   file format used to scan @a fragment; must not be null
/// @throws std::runtime_error if @a format is null, the scan cannot be
///         started or reading the batch fails
///
/// NOTE(review): only the first batch yielded by the generator is stored;
/// presumably the fragment is expected to hold a single batch - confirm.
void FragmentToBatch::fill(std::shared_ptr<arrow::dataset::FileFragment> fragment, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat> format)
{
  if (format == nullptr) {
    throw std::runtime_error("Unable to scan fragment: no file format provided");
  }

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->dataset_schema = schema;

  // Check the Result before dereferencing it, so that a failure is reported
  // as an exception rather than by dereferencing an error state.
  auto scanner = format->ScanBatchesAsync(options, fragment);
  if (scanner.ok() == false) {
    throw std::runtime_error("Unable to scan fragment: " + scanner.status().ToString());
  }

  auto batch = (*scanner)();
  auto const& result = batch.result();
  if (result.ok() == false) {
    throw std::runtime_error("Unable to read batch from fragment: " + result.status().ToString());
  }
  mRecordBatch = *result;
}
580+
581+
std::shared_ptr<arrow::RecordBatch> FragmentToBatch::finalize()
582+
{
583+
return mRecordBatch;
584+
}
585+
558586
} // namespace o2::framework

0 commit comments

Comments
 (0)