From a234e4502e2641c1b4f8f0f79608d3612b135968 Mon Sep 17 00:00:00 2001 From: Stephanie Han Date: Wed, 21 Aug 2024 19:04:34 -0700 Subject: [PATCH] Add Raw Data Size Stat to Nimble Files (#74) Summary: Pull Request resolved: https://github.com/facebookincubator/nimble/pull/74 # Context Knowing the size of the raw data being written is useful for identifying the cause of changes to file sizes after compressing and encoding when writing to files. # Changes - Adding the optional Stats section to Tablet - `VeloxWriter.cpp`: Calculating the raw data size in bytes via running sum of memory used right before encoding and flushing, and writing this to tablet. - `FieldWriter.h`: Added `rawSize()` virtual function to handle string and nullable cases. - Unit testing and fuzz testing # Important Notes - In `VeloxWriter.cpp` we have special handling for chunked null streams, which is why `materialize` is called before one of the rawSize stat calculations and not the other. - `rawStripeSize` name updated for clarity and value is now as the name implies, which I can guarantee is correct because it calculated in the same way as raw file size. - Some investigation determined that rawStripeSize in stripe flush metrics is also currently not used anywhere. Differential Revision: D60534808 --- dwio/nimble/common/MetricsLogger.cpp | 4 +- dwio/nimble/common/MetricsLogger.h | 4 +- dwio/nimble/encodings/RleEncoding.h | 2 +- dwio/nimble/tablet/Constants.h | 1 + dwio/nimble/velox/CMakeLists.txt | 13 ++ dwio/nimble/velox/FieldWriter.h | 27 +++ dwio/nimble/velox/VeloxWriter.cpp | 49 +++-- dwio/nimble/velox/tests/VeloxReaderTests.cpp | 18 +- dwio/nimble/velox/tests/VeloxWriterTests.cpp | 194 ++++++++++++++++++- 9 files changed, 291 insertions(+), 21 deletions(-) diff --git a/dwio/nimble/common/MetricsLogger.cpp b/dwio/nimble/common/MetricsLogger.cpp index fc5c19a..7d58a9c 100644 --- a/dwio/nimble/common/MetricsLogger.cpp +++ b/dwio/nimble/common/MetricsLogger.cpp @@ -28,7 +28,7 @@ folly::dynamic StripeLoadMetrics::serialize() const { folly::dynamic StripeFlushMetrics::serialize() const { folly::dynamic obj = folly::dynamic::object; - obj["inputSize"] = inputSize; + obj["rawStripeSize"] = rawStripeSize; obj["rowCount"] = rowCount; obj["stripeSize"] = stripeSize; obj["trackedMemory"] = trackedMemory; @@ -38,7 +38,7 @@ folly::dynamic StripeFlushMetrics::serialize() const { folly::dynamic FileCloseMetrics::serialize() const { folly::dynamic obj = folly::dynamic::object; obj["rowCount"] = rowCount; - obj["inputSize"] = inputSize; + obj["rawDataSize"] = rawDataSize; obj["stripeCount"] = stripeCount; obj["fileSize"] = fileSize; obj["totalFlushCpuUsec"] = totalFlushCpuUsec; diff --git a/dwio/nimble/common/MetricsLogger.h b/dwio/nimble/common/MetricsLogger.h index 1f20d79..a0ca687 100644 --- a/dwio/nimble/common/MetricsLogger.h +++ b/dwio/nimble/common/MetricsLogger.h @@ -38,7 +38,7 @@ struct StripeLoadMetrics { // We can then adapt the run stats to file writer run stats. struct StripeFlushMetrics { // Stripe shape summary. - uint64_t inputSize; + uint64_t rawStripeSize; uint64_t rowCount; uint64_t stripeSize; @@ -61,7 +61,7 @@ struct StripeFlushMetrics { struct FileCloseMetrics { uint64_t rowCount; - uint64_t inputSize; + uint64_t rawDataSize; uint64_t stripeCount; uint64_t fileSize; diff --git a/dwio/nimble/encodings/RleEncoding.h b/dwio/nimble/encodings/RleEncoding.h index 614c297..3217d01 100644 --- a/dwio/nimble/encodings/RleEncoding.h +++ b/dwio/nimble/encodings/RleEncoding.h @@ -174,7 +174,7 @@ class RLEEncodingBase } // namespace internal -// Handles the numeric cases. Bools and strings are templated below. +// Handles the numeric and string cases. Bools are templated below. // Data layout is: // RLEEncodingBase bytes // 4 * sizeof(physicalType) bytes: run values diff --git a/dwio/nimble/tablet/Constants.h b/dwio/nimble/tablet/Constants.h index a269048..d9dc390 100644 --- a/dwio/nimble/tablet/Constants.h +++ b/dwio/nimble/tablet/Constants.h @@ -31,5 +31,6 @@ constexpr uint32_t kPostscriptChecksumedSize = 5; constexpr std::string_view kSchemaSection = "columnar.schema"; constexpr std::string_view kMetadataSection = "columnar.metadata"; +constexpr std::string_view kStatsSection = "columnar.stats"; } // namespace facebook::nimble diff --git a/dwio/nimble/velox/CMakeLists.txt b/dwio/nimble/velox/CMakeLists.txt index bf7fd5b..4bcf8b2 100644 --- a/dwio/nimble/velox/CMakeLists.txt +++ b/dwio/nimble/velox/CMakeLists.txt @@ -71,6 +71,19 @@ target_include_directories(nimble_velox_metadata_fb INTERFACE ${FLATBUFFERS_INCLUDE_DIR}) add_dependencies(nimble_velox_metadata_fb nimble_velox_metadata_schema_fb) +build_flatbuffers( + "${CMAKE_CURRENT_SOURCE_DIR}/Stats.fbs" + "" + nimble_velox_stats_schema_fb + "" + "${CMAKE_CURRENT_BINARY_DIR}" + "" + "") +add_library(nimble_velox_stats_fb INTERFACE) +target_include_directories(nimble_velox_stats_fb + INTERFACE ${FLATBUFFERS_INCLUDE_DIR}) +add_dependencies(nimble_velox_stats_fb nimble_velox_stats_schema_fb) + add_library(nimble_velox_schema_serialization SchemaSerialization.cpp) target_link_libraries( nimble_velox_schema_serialization nimble_velox_schema_reader diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index 5838906..22c610e 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -43,6 +43,7 @@ class StreamData { virtual bool hasNulls() const = 0; virtual bool empty() const = 0; virtual uint64_t memoryUsed() const = 0; + virtual uint64_t rawSize() const = 0; virtual void reset() = 0; virtual void materialize() {} @@ -88,6 +89,14 @@ class ContentStreamData final : public StreamData { return (data_.size() * sizeof(T)) + extraMemory_; } + inline virtual uint64_t rawSize() const override { + if constexpr (std::is_same_v) { + return extraMemory_; + } else { + return ContentStreamData::memoryUsed(); + } + } + inline Vector& mutableData() { return data_; } @@ -141,6 +150,10 @@ class NullsStreamData : public StreamData { inline virtual uint64_t memoryUsed() const override { return nonNulls_.size(); } + inline virtual uint64_t rawSize() const override { + // Includes both nulls and non-nulls bits. + return NullsStreamData::memoryUsed(); + } inline Vector& mutableNonNulls() { return nonNulls_; @@ -195,6 +208,20 @@ class NullableContentStreamData final : public NullsStreamData { NullsStreamData::memoryUsed(); } + inline virtual uint64_t rawSize() const override { + auto size = 0; + if constexpr (std::is_same_v) { + size = extraMemory_; + } else { + size = (data_.size() * sizeof(T)) + extraMemory_; + } + if (NullsStreamData::rawSize() == 0) { + return size; + } else { + return size + (NullsStreamData::rawSize() - data_.size()); + } + } + inline Vector& mutableData() { return data_; } diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 9980969..b9e3872 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -15,14 +15,11 @@ */ #include "dwio/nimble/velox/VeloxWriter.h" -#include #include #include "dwio/nimble/common/Exceptions.h" #include "dwio/nimble/common/Types.h" -#include "dwio/nimble/encodings/Encoding.h" #include "dwio/nimble/encodings/EncodingSelectionPolicy.h" -#include "dwio/nimble/encodings/SentinelEncoding.h" #include "dwio/nimble/tablet/Constants.h" #include "dwio/nimble/velox/BufferGrowthPolicy.h" #include "dwio/nimble/velox/ChunkedStreamWriter.h" @@ -34,6 +31,7 @@ #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/SchemaSerialization.h" #include "dwio/nimble/velox/SchemaTypes.h" +#include "dwio/nimble/velox/StatsGenerated.h" #include "folly/ScopeGuard.h" #include "velox/common/time/CpuWallTimer.h" #include "velox/dwio/common/ExecutorBarrier.h" @@ -54,6 +52,8 @@ class WriterContext : public FieldWriterContext { // writer option. std::shared_ptr logger; + uint64_t rawDataSize{0}; + uint64_t rawStripeSize{0}; uint64_t memoryUsed{0}; uint64_t bytesWritten{0}; uint64_t rowsInFile{0}; @@ -83,6 +83,7 @@ class WriterContext : public FieldWriterContext { memoryUsed = 0; rowsInStripe = 0; stripeSize = 0; + rawStripeSize = 0; ++stripeIndex_; } @@ -564,14 +565,24 @@ void VeloxWriter::close() { serializer.serialize(context_->schemaBuilder)); } + { + flatbuffers::FlatBufferBuilder builder; + builder.Finish( + serialization::CreateStats(builder, context_->rawDataSize)); + writer_.writeOptionalSection( + std::string(kStatsSection), + {reinterpret_cast(builder.GetBufferPointer()), + builder.GetSize()}); + } + writer_.close(); file_->close(); context_->bytesWritten = file_->size(); auto runStats = getRunStats(); - // TODO: compute and populate input size. FileCloseMetrics metrics{ .rowCount = context_->rowsInFile, + .rawDataSize = context_->rawDataSize, .stripeCount = context_->getStripeIndex(), .fileSize = context_->bytesWritten, .totalFlushCpuUsec = runStats.flushCpuTimeUsec, @@ -621,14 +632,12 @@ void VeloxWriter::writeChunk(bool lastChunk) { streams_.resize(context_->schemaBuilder.nodeCount()); // When writing null streams, we write the nulls as data, and the stream - // itself is non-nullable. This adpater class is how we expose the nulls as - // values. + // itself is non-nullable. This adapter class is how we expose the nulls + // as values. class NullsAsDataStreamData : public StreamData { public: explicit NullsAsDataStreamData(StreamData& streamData) - : StreamData(streamData.descriptor()), streamData_{streamData} { - streamData_.materialize(); - } + : StreamData(streamData.descriptor()), streamData_{streamData} {} inline virtual std::string_view data() const override { return { @@ -650,7 +659,9 @@ void VeloxWriter::writeChunk(bool lastChunk) { inline virtual uint64_t memoryUsed() const override { return streamData_.memoryUsed(); } - + inline virtual uint64_t rawSize() const override { + return streamData_.rawSize(); + } inline virtual void reset() override { streamData_.reset(); } @@ -692,12 +703,28 @@ void VeloxWriter::writeChunk(bool lastChunk) { streamData.nonNulls().size() > minStreamSize) || (lastChunk && !streamData.empty() && !streams_[offset].content.empty())) { + // If we ended up in this code block, it means that we either saw + // enough nulls to flush a chunk, or that we are on the last chunk of + // a stream containing nulls. + // If previous chunks in the same stream contained nulls, then + // regardless of the presence of nulls in the last chunk we must flush + // it. If the last chunk doesn't contain nulls, the DataStream (which + // is a NullsStreamData) will contain an empty nulls buffer, so we + // must materialize it before we write. + streamData.materialize(); + + const int streamSize = streamData.rawSize(); + context_->rawStripeSize += streamSize; + context_->rawDataSize += streamSize; encoder(streamData, true); } } else { if (streamData.data().size() > minStreamSize || (lastChunk && streamData.nonNulls().size() > 0 && !streams_[offset].content.empty())) { + const int streamSize = streamData.rawSize(); + context_->rawStripeSize += streamSize; + context_->rawDataSize += streamSize; encoder(streamData, false); } } @@ -827,7 +854,7 @@ bool VeloxWriter::tryWriteStripe(bool force) { } StripeFlushMetrics metrics{ - .inputSize = context_->stripeSize, + .rawStripeSize = context_->rawStripeSize, .rowCount = context_->rowsInStripe, .trackedMemory = context_->memoryUsed, }; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index b747758..94d9be2 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include "dwio/nimble/common/Buffer.h" @@ -25,9 +24,12 @@ #include "dwio/nimble/common/Vector.h" #include "dwio/nimble/common/tests/NimbleFileWriter.h" #include "dwio/nimble/common/tests/TestUtils.h" +#include "dwio/nimble/tablet/Constants.h" #include "dwio/nimble/velox/SchemaUtils.h" +#include "dwio/nimble/velox/StatsGenerated.h" #include "dwio/nimble/velox/VeloxReader.h" #include "dwio/nimble/velox/VeloxWriter.h" +#include "dwio/nimble/velox/tests/TestUtils.h" #include "folly/FileUtil.h" #include "folly/Random.h" #include "folly/executors/CPUThreadPoolExecutor.h" @@ -35,9 +37,7 @@ #include "velox/type/Type.h" #include "velox/vector/BaseVector.h" #include "velox/vector/ComplexVector.h" -#include "velox/vector/DecodedVector.h" #include "velox/vector/NullsBuilder.h" -#include "velox/vector/SelectivityVector.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorMaker.h" @@ -593,6 +593,18 @@ class VeloxReaderTests : public ::testing::Test { } velox::InMemoryReadFile readFile(file); + + // Test rawDataSize Stat + nimble::TabletReader tablet{pool, &readFile}; + auto section = + tablet.loadOptionalSection(std::string(nimble::kStatsSection)); + EXPECT_TRUE(section.has_value()); + auto stats = flatbuffers::GetRoot( + section->content().data()); + uint64_t resultStat = stats->raw_size(); + auto expectedStat = nimble::test::getFileRawDataSize(tablet, pool); + ASSERT_EQ(resultStat, expectedStat); + auto selector = std::make_shared(type); // new pool with to limit already used memory and with tracking enabled auto leakDetectPool = diff --git a/dwio/nimble/velox/tests/VeloxWriterTests.cpp b/dwio/nimble/velox/tests/VeloxWriterTests.cpp index 3ec254d..9f07824 100644 --- a/dwio/nimble/velox/tests/VeloxWriterTests.cpp +++ b/dwio/nimble/velox/tests/VeloxWriterTests.cpp @@ -17,20 +17,20 @@ #include #include -#include "dwio/nimble/common/EncodingPrimitives.h" #include "dwio/nimble/common/tests/TestUtils.h" #include "dwio/nimble/encodings/EncodingLayoutCapture.h" #include "dwio/nimble/tablet/Constants.h" #include "dwio/nimble/velox/ChunkedStream.h" #include "dwio/nimble/velox/EncodingLayoutTree.h" #include "dwio/nimble/velox/SchemaSerialization.h" +#include "dwio/nimble/velox/StatsGenerated.h" #include "dwio/nimble/velox/VeloxReader.h" #include "dwio/nimble/velox/VeloxWriter.h" +#include "dwio/nimble/velox/tests/TestUtils.h" #include "folly/FileUtil.h" #include "folly/Random.h" #include "velox/common/memory/MemoryArbitrator.h" #include "velox/common/memory/SharedArbitrator.h" -#include "velox/vector/VectorStream.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include "velox/vector/tests/utils/VectorMaker.h" @@ -1811,6 +1811,196 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeZero) { /* flatmapColumns */ {"c1"}); } +TEST_F(VeloxWriterTests, RawDataSizeDataTypes) { + auto type = velox::ROW({ + {"tinyint_val", velox::SMALLINT()}, + {"smallint_val", velox::INTEGER()}, + {"int_val", velox::BIGINT()}, + {"long_val", velox::BIGINT()}, + {"float_val", velox::DOUBLE()}, + {"double_val", velox::DOUBLE()}, + {"bool_val", velox::INTEGER()}, + {"string_val", velox::VARCHAR()}, + {"array_val", velox::ARRAY(velox::BIGINT())}, + {"map_val", velox::MAP(velox::INTEGER(), velox::BIGINT())}, + {"struct_val", + velox::ROW({ + {"float_val", velox::REAL()}, + {"double_val", velox::DOUBLE()}, + })}, + {"nested_val", + velox::MAP( + velox::INTEGER(), + velox::ROW({ + {"float_val", velox::REAL()}, + {"array_val", + velox::ARRAY(velox::MAP(velox::INTEGER(), velox::BIGINT()))}, + }))}, + }); + std::string file; + auto writeFile = std::make_unique(&file); + nimble::VeloxWriter writer(*rootPool_, type, std::move(writeFile), {}); + + auto batches = + generateBatches(type, 50, 500, folly::Random::rand32(), *leafPool_); + for (const auto& batch : batches) { + writer.write(batch); + } + + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::TabletReader tablet{*leafPool_, &readFile}; + auto section = tablet.loadOptionalSection(std::string(nimble::kStatsSection)); + EXPECT_TRUE(section.has_value()); + auto stats = flatbuffers::GetRoot( + section->content().data()); + uint64_t result = stats->raw_size(); + auto expected = nimble::test::getFileRawDataSize(tablet, *leafPool_); + ASSERT_EQ(result, expected); +} + +TEST_F(VeloxWriterTests, RawDataSizeTrivialString) { + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + + auto vector = vectorMaker.rowVector( + {vectorMaker.flatVector({"1", "2", "3"})}); + + std::string file; + auto writeFile = std::make_unique(&file); + + nimble::VeloxWriter writer( + *rootPool_, vector->type(), std::move(writeFile), {}); + writer.write(vector); + writer.close(); + velox::InMemoryReadFile readFile(file); + nimble::TabletReader tablet{*leafPool_, &readFile}; + auto section = tablet.loadOptionalSection(std::string(nimble::kStatsSection)); + EXPECT_TRUE(section.has_value()); + auto stats = flatbuffers::GetRoot( + section->content().data()); + uint64_t result = stats->raw_size(); + auto expected = nimble::test::getFileRawDataSize(tablet, *leafPool_); + ASSERT_EQ(result, expected); +} + +TEST_F(VeloxWriterTests, RawDataSizeConstantString) { + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + auto vector = vectorMaker.rowVector( + {vectorMaker.flatVector({"abc", "abc", "abc", "abc"})}); + + std::string file; + auto writeFile = std::make_unique(&file); + + nimble::VeloxWriter writer( + *rootPool_, vector->type(), std::move(writeFile), {}); + writer.write(vector); + writer.close(); + velox::InMemoryReadFile readFile(file); + nimble::TabletReader tablet{*leafPool_, &readFile}; + auto section = tablet.loadOptionalSection(std::string(nimble::kStatsSection)); + EXPECT_TRUE(section.has_value()); + auto stats = flatbuffers::GetRoot( + section->content().data()); + uint64_t result = stats->raw_size(); + auto expected = nimble::test::getFileRawDataSize(tablet, *leafPool_); + ASSERT_EQ(result, expected); +} + +TEST_F(VeloxWriterTests, RawDataSizeNullableString) { + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + + auto vector = vectorMaker.rowVector( + {"string"}, + { + vectorMaker.flatVector( + 25, + [](auto /* row */) { + return std::string("abcdefghijklmnopqrstuvwxyz0123456789"); + }, + [](auto row) { return row == 6; }), + }); + + std::string file; + auto writeFile = std::make_unique(&file); + + nimble::VeloxWriter writer( + *rootPool_, vector->type(), std::move(writeFile), {}); + writer.write(vector); + writer.close(); + velox::InMemoryReadFile readFile(file); + nimble::TabletReader tablet{*leafPool_, &readFile}; + auto section = tablet.loadOptionalSection(std::string(nimble::kStatsSection)); + EXPECT_TRUE(section.has_value()); + auto stats = flatbuffers::GetRoot( + section->content().data()); + uint64_t result = stats->raw_size(); + auto expected = nimble::test::getFileRawDataSize(tablet, *leafPool_); + ASSERT_EQ(result, expected); +} + +TEST_F(VeloxWriterTests, RawDataSizeManyStripes) { + auto type = velox::ROW({{"simple", velox::INTEGER()}}); + nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() { + // Buffering 256MB data before encoding stripes. + return std::make_unique(256 << 10); + }}; + + std::string file; + auto writeFile = std::make_unique(&file); + nimble::VeloxWriter writer( + *rootPool_, type, std::move(writeFile), std::move(writerOptions)); + uint32_t seed = folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + auto batches = generateBatches(type, 100, 4000, seed, *leafPool_); + for (const auto& batch : batches) { + writer.write(batch); + } + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::TabletReader tablet{*leafPool_, &readFile}; + auto section = tablet.loadOptionalSection(std::string(nimble::kStatsSection)); + EXPECT_TRUE(section.has_value()); + ASSERT_EQ(7, tablet.stripeCount()); + auto stats = flatbuffers::GetRoot( + section->content().data()); + uint64_t result = stats->raw_size(); + auto expected = nimble::test::getFileRawDataSize(tablet, *leafPool_); + ASSERT_EQ(result, expected); +} + +TEST_F(VeloxWriterTests, RawDataSizeOneStripe) { + auto type = velox::ROW({{"simple", velox::INTEGER()}}); + nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() { + // Buffering 256MB data before encoding stripes. + return std::make_unique(256 << 20); + }}; + + std::string file; + auto writeFile = std::make_unique(&file); + nimble::VeloxWriter writer( + *rootPool_, type, std::move(writeFile), std::move(writerOptions)); + uint32_t seed = folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + auto batches = generateBatches(type, 5, 15, seed, *leafPool_); + for (const auto& batch : batches) { + writer.write(batch); + } + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::TabletReader tablet{*leafPool_, &readFile}; + auto section = tablet.loadOptionalSection(std::string(nimble::kStatsSection)); + EXPECT_TRUE(section.has_value()); + ASSERT_EQ(1, tablet.stripeCount()); + auto stats = flatbuffers::GetRoot( + section->content().data()); + uint64_t result = stats->raw_size(); + auto expected = nimble::test::getFileRawDataSize(tablet, *leafPool_); + ASSERT_EQ(result, expected); +} + INSTANTIATE_TEST_CASE_P( RawStripeSizeFlushPolicyTestSuite, RawStripeSizeFlushPolicyTest,