Skip to content

Commit

Permalink
Add Raw Data Size Stat to Nimble Files (facebookincubator#74)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#74

# Context
Knowing the size of the raw data being written is useful for identifying the cause of changes to file sizes after compressing and encoding when writing to files.

# Changes
- Adding the optional Stats section to Tablet
- Calculating the raw data size in bytes via running sum of memory used right before encoding and flushing, and writing this to tablet.
- Unit testing and fuzz testing

# Important Notes
- In `VeloxWriter.cpp` we have special handling for chunked null streams, which is why `materialize` is called before one of the rawSize stat calculations and not the other.
- `rawStripeSize` name updated for clarity and value is now as the name implies, which I can guarantee is correct because it calculated in the same way as raw file size.
  - Some investigation determined that rawStripeSize in stripe flush metrics is also currently not used anywhere.

Differential Revision: D60534808
  • Loading branch information
phoenixawe authored and facebook-github-bot committed Aug 20, 2024
1 parent 270f788 commit 1c71502
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 13 deletions.
4 changes: 2 additions & 2 deletions dwio/nimble/common/MetricsLogger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ folly::dynamic StripeLoadMetrics::serialize() const {

folly::dynamic StripeFlushMetrics::serialize() const {
folly::dynamic obj = folly::dynamic::object;
obj["inputSize"] = inputSize;
obj["rawStripeSize"] = rawStripeSize;
obj["rowCount"] = rowCount;
obj["stripeSize"] = stripeSize;
obj["trackedMemory"] = trackedMemory;
Expand All @@ -38,7 +38,7 @@ folly::dynamic StripeFlushMetrics::serialize() const {
folly::dynamic FileCloseMetrics::serialize() const {
folly::dynamic obj = folly::dynamic::object;
obj["rowCount"] = rowCount;
obj["inputSize"] = inputSize;
obj["rawDataSize"] = rawDataSize;
obj["stripeCount"] = stripeCount;
obj["fileSize"] = fileSize;
obj["totalFlushCpuUsec"] = totalFlushCpuUsec;
Expand Down
4 changes: 2 additions & 2 deletions dwio/nimble/common/MetricsLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct StripeLoadMetrics {
// We can then adapt the run stats to file writer run stats.
struct StripeFlushMetrics {
// Stripe shape summary.
uint64_t inputSize;
uint64_t rawStripeSize;
uint64_t rowCount;
uint64_t stripeSize;

Expand All @@ -61,7 +61,7 @@ struct StripeFlushMetrics {

struct FileCloseMetrics {
uint64_t rowCount;
uint64_t inputSize;
uint64_t rawDataSize;
uint64_t stripeCount;
uint64_t fileSize;

Expand Down
2 changes: 1 addition & 1 deletion dwio/nimble/encodings/RleEncoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class RLEEncodingBase

} // namespace internal

// Handles the numeric cases. Bools and strings are templated below.
// Handles the numeric and string cases. Bools are templated below.
// Data layout is:
// RLEEncodingBase bytes
// 4 * sizeof(physicalType) bytes: run values
Expand Down
1 change: 1 addition & 0 deletions dwio/nimble/tablet/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ constexpr uint32_t kPostscriptChecksumedSize = 5;

constexpr std::string_view kSchemaSection = "columnar.schema";
constexpr std::string_view kMetadataSection = "columnar.metadata";
constexpr std::string_view kStatsSection = "columnar.stats";

} // namespace facebook::nimble
20 changes: 20 additions & 0 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class StreamData {
virtual bool hasNulls() const = 0;
virtual bool empty() const = 0;
virtual uint64_t memoryUsed() const = 0;
virtual uint64_t rawSize() const = 0;

virtual void reset() = 0;
virtual void materialize() {}
Expand Down Expand Up @@ -88,6 +89,14 @@ class ContentStreamData final : public StreamData {
return (data_.size() * sizeof(T)) + extraMemory_;
}

inline virtual uint64_t rawSize() const override {
auto size = extraMemory_;
if constexpr (!std::is_same_v<T, std::string_view>) {
size += (data_.size() * sizeof(T));
}
return size;
}

inline Vector<T>& mutableData() {
return data_;
}
Expand Down Expand Up @@ -141,6 +150,9 @@ class NullsStreamData : public StreamData {
inline virtual uint64_t memoryUsed() const override {
return nonNulls_.size();
}
inline virtual uint64_t rawSize() const override {
return memoryUsed();
}

inline Vector<bool>& mutableNonNulls() {
return nonNulls_;
Expand Down Expand Up @@ -195,6 +207,14 @@ class NullableContentStreamData final : public NullsStreamData {
NullsStreamData::memoryUsed();
}

inline virtual uint64_t rawSize() const override {
auto size = extraMemory_;
if constexpr (!std::is_same_v<T, std::string_view>) {
size += (data_.size() * sizeof(T));
}
return NullsStreamData::memoryUsed() - size;
}

inline Vector<T>& mutableData() {
return data_;
}
Expand Down
46 changes: 38 additions & 8 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/SchemaSerialization.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "dwio/nimble/velox/StatsGenerated.h"
#include "folly/ScopeGuard.h"
#include "velox/common/time/CpuWallTimer.h"
#include "velox/dwio/common/ExecutorBarrier.h"
Expand All @@ -54,6 +55,8 @@ class WriterContext : public FieldWriterContext {
// writer option.
std::shared_ptr<MetricsLogger> logger;

uint64_t rawDataSize{0};
uint64_t rawStripeSize{0};
uint64_t memoryUsed{0};
uint64_t bytesWritten{0};
uint64_t rowsInFile{0};
Expand Down Expand Up @@ -83,6 +86,7 @@ class WriterContext : public FieldWriterContext {
memoryUsed = 0;
rowsInStripe = 0;
stripeSize = 0;
rawStripeSize = 0;
++stripeIndex_;
}

Expand Down Expand Up @@ -564,14 +568,24 @@ void VeloxWriter::close() {
serializer.serialize(context_->schemaBuilder));
}

{
flatbuffers::FlatBufferBuilder builder;
builder.Finish(
serialization::CreateStats(builder, context_->rawDataSize));
writer_.writeOptionalSection(
std::string(kStatsSection),
{reinterpret_cast<const char*>(builder.GetBufferPointer()),
builder.GetSize()});
}

writer_.close();
file_->close();
context_->bytesWritten = file_->size();

auto runStats = getRunStats();
// TODO: compute and populate input size.
FileCloseMetrics metrics{
.rowCount = context_->rowsInFile,
.rawDataSize = context_->rawDataSize,
.stripeCount = context_->getStripeIndex(),
.fileSize = context_->bytesWritten,
.totalFlushCpuUsec = runStats.flushCpuTimeUsec,
Expand Down Expand Up @@ -621,14 +635,12 @@ void VeloxWriter::writeChunk(bool lastChunk) {
streams_.resize(context_->schemaBuilder.nodeCount());

// When writing null streams, we write the nulls as data, and the stream
// itself is non-nullable. This adpater class is how we expose the nulls as
// values.
// itself is non-nullable. This adapter class is how we expose the nulls
// as values.
class NullsAsDataStreamData : public StreamData {
public:
explicit NullsAsDataStreamData(StreamData& streamData)
: StreamData(streamData.descriptor()), streamData_{streamData} {
streamData_.materialize();
}
: StreamData(streamData.descriptor()), streamData_{streamData} {}

inline virtual std::string_view data() const override {
return {
Expand All @@ -650,7 +662,9 @@ void VeloxWriter::writeChunk(bool lastChunk) {
inline virtual uint64_t memoryUsed() const override {
return streamData_.memoryUsed();
}

inline virtual uint64_t rawSize() const override {
return streamData_.rawSize();
}
inline virtual void reset() override {
streamData_.reset();
}
Expand Down Expand Up @@ -692,12 +706,28 @@ void VeloxWriter::writeChunk(bool lastChunk) {
streamData.nonNulls().size() > minStreamSize) ||
(lastChunk && !streamData.empty() &&
!streams_[offset].content.empty())) {
// If we ended up in this code block, it means that we either saw
// enough nulls to flush a chunk, or that we are on the last chunk of
// a stream containing nulls.
// If previous chunks in the same stream contained nulls, then
// regardless of the presence of nulls in the last chunk we must flush
// it. If the last chunk doesn't contain nulls, the DataStream (which
// is a NullsStreamData) will contain an empty nulls buffer, so we
// must materialize it before we write.
streamData.materialize();

const int streamSize = streamData.memoryUsed();
context_->rawStripeSize += streamSize;
context_->rawDataSize += streamSize;
encoder(streamData, true);
}
} else {
if (streamData.data().size() > minStreamSize ||
(lastChunk && streamData.nonNulls().size() > 0 &&
!streams_[offset].content.empty())) {
const int streamSize = streamData.memoryUsed();
context_->rawStripeSize += streamSize;
context_->rawDataSize += streamSize;
encoder(streamData, false);
}
}
Expand Down Expand Up @@ -827,7 +857,7 @@ bool VeloxWriter::tryWriteStripe(bool force) {
}

StripeFlushMetrics metrics{
.inputSize = context_->stripeSize,
.rawStripeSize = context_->rawStripeSize,
.rowCount = context_->rowsInStripe,
.trackedMemory = context_->memoryUsed,
};
Expand Down
15 changes: 15 additions & 0 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
#include "dwio/nimble/common/Vector.h"
#include "dwio/nimble/common/tests/NimbleFileWriter.h"
#include "dwio/nimble/common/tests/TestUtils.h"
#include "dwio/nimble/tablet/Constants.h"
#include "dwio/nimble/velox/SchemaUtils.h"
#include "dwio/nimble/velox/StatsGenerated.h"
#include "dwio/nimble/velox/VeloxReader.h"
#include "dwio/nimble/velox/VeloxWriter.h"
#include "dwio/nimble/velox/tests/TestUtils.h"
#include "folly/FileUtil.h"
#include "folly/Random.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
Expand Down Expand Up @@ -593,6 +596,18 @@ class VeloxReaderTests : public ::testing::Test {
}

velox::InMemoryReadFile readFile(file);

// Test rawDataSize Stat
nimble::TabletReader tablet{pool, &readFile};
auto section =
tablet.loadOptionalSection(std::string(nimble::kStatsSection));
EXPECT_TRUE(section.has_value());
auto stats = flatbuffers::GetRoot<nimble::serialization::Stats>(
section->content().data());
uint64_t resultStat = stats->raw_size();
auto expectedStat = nimble::test::getFileRawDataSize(tablet, pool);
ASSERT_EQ(resultStat, expectedStat);

auto selector = std::make_shared<velox::dwio::common::ColumnSelector>(type);
// new pool with to limit already used memory and with tracking enabled
auto leakDetectPool =
Expand Down
Loading

0 comments on commit 1c71502

Please sign in to comment.