diff --git a/pp/bare_bones/memory.h b/pp/bare_bones/memory.h index 4c55789f8..367bbc719 100644 --- a/pp/bare_bones/memory.h +++ b/pp/bare_bones/memory.h @@ -2,6 +2,7 @@ #include #include +#include #include "preprocess.h" #include "type_traits.h" diff --git a/pp/entrypoint/head/serialization.h b/pp/entrypoint/head/serialization.h new file mode 100644 index 000000000..e94de7624 --- /dev/null +++ b/pp/entrypoint/head/serialization.h @@ -0,0 +1,29 @@ +#pragma once + +#include "series_data/serialization/serialized_data.h" + +namespace entrypoint::head { + +class SerializedDataGo { + public: + explicit SerializedDataGo(const series_data::DataStorage& storage, const series_data::querier::QueriedChunkList& queried_chunks) + : data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)} {} + + [[nodiscard]] PROMPP_ALWAYS_INLINE auto get_buffer() const noexcept { return data_view_.get_buffer(); } + [[nodiscard]] PROMPP_ALWAYS_INLINE auto get_chunks() const noexcept { return data_view_.get_chunks(); } + + [[nodiscard]] PROMPP_ALWAYS_INLINE uint32_t next() noexcept { return data_view_.next_series(); } + [[nodiscard]] PROMPP_ALWAYS_INLINE auto iterator() const noexcept { return data_view_.create_current_series_iterator(); } + + private: + series_data::serialization::SerializedData data_; + series_data::serialization::SerializedDataView data_view_{data_}; +}; + +using SerializedDataPtr = std::unique_ptr; +using SerializedDataIteratorPtr = std::unique_ptr; + +static_assert(sizeof(SerializedDataPtr) == sizeof(void*)); +static_assert(sizeof(SerializedDataIteratorPtr) == sizeof(void*)); + +} // namespace entrypoint::head \ No newline at end of file diff --git a/pp/entrypoint/series_data/querier.h b/pp/entrypoint/series_data/querier.h index 60c03b0a3..76d2f05fd 100644 --- a/pp/entrypoint/series_data/querier.h +++ b/pp/entrypoint/series_data/querier.h @@ -5,6 +5,7 @@ #include "primitives/primitives.h" #include "series_data/querier/instant_querier.h" #include 
"series_data/querier/querier.h" +#include "series_data/serialization/serialized_data.h" #include "series_data/serialization/serializer.h" namespace entrypoint::series_data { @@ -87,15 +88,49 @@ class RangeQuerierWithArgumentsWrapper { } }; -enum class QuerierType : uint8_t { - kInstantQuerier = 0, - kRangeQuerier, +class RangeQuerierWithArgumentsWrapperNew { + using DataStorage = ::series_data::DataStorage; + using LabelSetID = PromPP::Primitives::LabelSetID; + template + using Slice = PromPP::Primitives::Go::Slice; + using Query = ::series_data::querier::Query>; + using Serializer = ::series_data::serialization::Serializer; + using BytesStream = PromPP::Primitives::Go::BytesStream; + + public: + RangeQuerierWithArgumentsWrapperNew(DataStorage& storage, const Query& query, head::SerializedDataPtr* serialized_data) + : querier_(storage), query_(&query), serialized_data_(serialized_data) {} + + void query() noexcept { + querier_.query(*query_); + if (!querier_.need_loading()) { + serialize_chunks(); + } + } + + PROMPP_ALWAYS_INLINE void query_finalize() const noexcept { serialize_chunks(); } + + [[nodiscard]] const BareBones::Bitset& series_to_load() const noexcept { return querier_.get_series_to_load(); } + [[nodiscard]] bool need_loading() const noexcept { return querier_.need_loading(); } + [[nodiscard]] DataStorage& storage() noexcept { return querier_.get_storage(); } + + private: + ::series_data::querier::Querier querier_; + const Query* query_; + head::SerializedDataPtr* serialized_data_; + + PROMPP_ALWAYS_INLINE void serialize_chunks() const noexcept { + std::construct_at(serialized_data_, std::make_unique(querier_.get_storage(), querier_.chunks())); + } }; -using QuerierVariant = std::variant; +enum class QuerierType : uint8_t { kInstantQuerier = 0, kRangeQuerier, kRangeQuerierNew }; + +using QuerierVariant = std::variant; using QuerierVariantPtr = std::unique_ptr; } // namespace entrypoint::series_data 
static_assert(entrypoint::series_data::QuerierInterface); static_assert(entrypoint::series_data::QuerierInterface); +static_assert(entrypoint::series_data::QuerierInterface); \ No newline at end of file diff --git a/pp/entrypoint/series_data_data_storage.cpp b/pp/entrypoint/series_data_data_storage.cpp index acaf85bbf..c473d8a17 100644 --- a/pp/entrypoint/series_data_data_storage.cpp +++ b/pp/entrypoint/series_data_data_storage.cpp @@ -5,6 +5,7 @@ #include "head/chunk_recoder.h" #include "head/data_storage.h" #include "head/lss.h" +#include "head/serialization.h" #include "primitives/go_slice.h" #include "series_data/data_storage.h" #include "series_data/loader.h" @@ -142,6 +143,36 @@ extern "C" void prompp_series_data_data_storage_query(void* args, void* res) { } } +extern "C" void prompp_series_data_data_storage_query_new(void* args, void* res) { + using Query = series_data::querier::Query>; + using entrypoint::series_data::RangeQuerierWithArgumentsWrapperNew; + using series_data::querier::Querier; + + struct Arguments { + DataStoragePtr data_storage; + Query query; + }; + + struct Result { + QuerierVariantPtr querier{}; + QueryStatus status{}; + entrypoint::head::SerializedDataPtr* serialized_data{}; + }; + + const auto in = static_cast(args); + const auto out = static_cast(res); + + RangeQuerierWithArgumentsWrapperNew querier(*in->data_storage, in->query, out->serialized_data); + querier.query(); + + if (querier.need_loading()) { + out->querier = std::make_unique(std::in_place_index<2>, std::move(querier)); + out->status = QueryStatus::kNeedDataLoad; + } else { + out->status = QueryStatus::kSuccess; + } +} + extern "C" void prompp_series_data_data_storage_instant_query(void* args, void* res) { using entrypoint::series_data::InstantQuerierWithArgumentsWrapperEntrypoint; using PromPP::Primitives::Timestamp; @@ -249,6 +280,23 @@ extern "C" void prompp_series_data_serialized_chunk_recoder_ctor(void* args, voi }; } +extern "C" void 
prompp_series_data_serialized_chunk_recoder_new_ctor(void* args, void* res) { + struct Arguments { + entrypoint::head::SerializedDataPtr* serialized_data; + PromPP::Primitives::TimeInterval time_interval; + }; + struct Result { + ChunkRecoderVariantPtr chunk_recoder; + }; + + const auto in = static_cast(args); + new (res) Result{ + .chunk_recoder = std::make_unique( + std::in_place_type, + series_data::chunk::SerializedChunkIterator{in->serialized_data->get()->get_buffer(), in->serialized_data->get()->get_chunks()}, in->time_interval), + }; +} + extern "C" void prompp_series_data_chunk_recoder_recode_next_chunk(void* args, void* res) { struct Arguments { ChunkRecoderVariantPtr chunk_recoder; diff --git a/pp/entrypoint/series_data_data_storage.h b/pp/entrypoint/series_data_data_storage.h index a6c47cd75..e4c6df416 100644 --- a/pp/entrypoint/series_data_data_storage.h +++ b/pp/entrypoint/series_data_data_storage.h @@ -108,6 +108,22 @@ void prompp_series_data_data_storage_allocated_memory(void* args, void* res); */ void prompp_series_data_data_storage_query(void* args, void* res); +/** + * @brief Queries data storage and serializes result (new serialization model). + * + * @param args { + * dataStorage uintptr // pointer to constructed data storage + * query DataStorageQuery // query + * } + * + * @param res { + * Querier uintptr // pointer to constructed Querier if data loading is needed + * Status uint8 // status of a query (0 - Success, 1 - Data loading is needed) + * serializedData uintptr // pointer to serialized data + * } + */ +void prompp_series_data_data_storage_query_new(void* args, void* res); + /** * @brief return samples at given timestamp for label sets. 
* @@ -178,6 +194,22 @@ void prompp_series_data_chunk_recoder_ctor(void* args, void* res); */ void prompp_series_data_serialized_chunk_recoder_ctor(void* args, void* res); +/** + * @brief Construct a new ChunkRecoder object to recode all serialized chunks (new model) + * + * @param args { + * serializedData uintptr // pointer to serialized data + * time_interval struct { // closed interval [min, max] + * min int64 + * max int64 + * } + * } + * @param res { + * chunk_recoder uintptr // pointer to chunk recoder + * } + */ +void prompp_series_data_serialized_chunk_recoder_new_ctor(void* args, void* res); + /** * @brief Get chunk encoded in prometheus format * diff --git a/pp/entrypoint/series_data_serialization_serialized_data.cpp b/pp/entrypoint/series_data_serialization_serialized_data.cpp new file mode 100644 index 000000000..222ad64b1 --- /dev/null +++ b/pp/entrypoint/series_data_serialization_serialized_data.cpp @@ -0,0 +1,68 @@ +#include "series_data_serialization_serialized_data.h" + +#include "head/serialization.h" + +extern "C" void prompp_series_data_serialization_serialized_data_next(void* args, void* res) { + struct Arguments { + entrypoint::head::SerializedDataPtr serialized_data; + }; + + using Result = struct { + uint32_t series_id; + }; + + new (res) Result{.series_id = static_cast(args)->serialized_data->next()}; +} + +extern "C" void prompp_series_data_serialization_serialized_data_iterator(void* args, void* res) { + struct Arguments { + entrypoint::head::SerializedDataPtr serialized_data; + }; + + using Result = struct { + entrypoint::head::SerializedDataIteratorPtr iterator; + }; + + new (res) Result{ + .iterator = std::make_unique(static_cast(args)->serialized_data->iterator())}; +} + +extern "C" void prompp_series_data_serialization_serialized_data_iterator_next(void* args, void* res) { + using series_data::decoder::DecodeIteratorSentinel; + + struct Arguments { + entrypoint::head::SerializedDataIteratorPtr iterator; + }; + + struct Result { + 
int64_t timestamp{}; + double value{}; + bool has_value; + }; + + const Arguments* in = static_cast(args); + + if (*in->iterator == DecodeIteratorSentinel{}) { + new (res) Result{.has_value = false}; + } else { + const auto sample = **(in->iterator); + new (res) Result{.timestamp = sample.timestamp, .value = sample.value, .has_value = true}; + ++(*in->iterator); + } +} + +extern "C" void prompp_series_data_serialization_serialized_data_iterator_dtor(void* args) { + struct Arguments { + entrypoint::head::SerializedDataIteratorPtr iterator; + }; + + static_cast(args)->~Arguments(); +} + +extern "C" void prompp_series_data_serialization_serialized_data_dtor(void* args) { + struct Arguments { + entrypoint::head::SerializedDataPtr serialized_data; + }; + + static_cast(args)->~Arguments(); +} \ No newline at end of file diff --git a/pp/entrypoint/series_data_serialization_serialized_data.h b/pp/entrypoint/series_data_serialization_serialized_data.h new file mode 100644 index 000000000..6530bcb31 --- /dev/null +++ b/pp/entrypoint/series_data_serialization_serialized_data.h @@ -0,0 +1,68 @@ +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief Get next series_id in serialized data. + * + * @param args { + * serializedData uintptr // pointer to serialized data. + * } + * + * @param res { + * series_id uint32 // series id (UINT32_MAX if no more series). + * } + */ +void prompp_series_data_serialization_serialized_data_next(void* args, void* res); + +/** + * @brief Create a decode iterator for current series_id (returned by the last call of _next()) + * + * @param args { + * serializedData uintptr // pointer to serialized data. + * } + * + * @param res { + * iterator uintptr // pointer to constructed decode iterator. + * } + */ +void prompp_series_data_serialization_serialized_data_iterator(void* args, void* res); + +/** + * @brief Advance decode iterator. 
+ * + * @param args { + * iterator uintptr // pointer to decode iterator + * } + * + * @param res { + * timestamp int64 // sample timestamp + * value float64 // sample value + * has_value bool // true if a sample was returned, false if the iterator is exhausted + * } + */ +void prompp_series_data_serialization_serialized_data_iterator_next(void* args, void* res); + +/** + * @brief Destroy decode iterator. + * + * @param args { + * iterator uintptr // pointer to decode iterator + * } + * + */ +void prompp_series_data_serialization_serialized_data_iterator_dtor(void* args); + +/** + * @brief Destroy serialized data object. + * + * @param args { + * serializedData uintptr // pointer to serialized data. + * } + * + */ +void prompp_series_data_serialization_serialized_data_dtor(void* args); + +#ifdef __cplusplus +} // extern "C" +#endif diff --git a/pp/go/cppbridge/entrypoint.h b/pp/go/cppbridge/entrypoint.h index a59048052..e4710654d 100755 --- a/pp/go/cppbridge/entrypoint.h +++ b/pp/go/cppbridge/entrypoint.h @@ -1200,6 +1200,22 @@ void prompp_series_data_data_storage_allocated_memory(void* args, void* res); */ void prompp_series_data_data_storage_query(void* args, void* res); +/** + * @brief Queries data storage and serializes result (new serialization model). + * + * @param args { + * dataStorage uintptr // pointer to constructed data storage + * query DataStorageQuery // query + * } + * + * @param res { + * Querier uintptr // pointer to constructed Querier if data loading is needed + * Status uint8 // status of a query (0 - Success, 1 - Data loading is needed) + * serializedData uintptr // pointer to serialized data + * } + */ +void prompp_series_data_data_storage_query_new(void* args, void* res); + /** * @brief return samples at given timestamp for label sets. 
* @@ -1270,6 +1286,22 @@ void prompp_series_data_chunk_recoder_ctor(void* args, void* res); */ void prompp_series_data_serialized_chunk_recoder_ctor(void* args, void* res); +/** + * @brief Construct a new ChunkRecoder object to recode all serialized chunks (new model) + * + * @param args { + * serializedData uintptr // pointer to serialized data + * time_interval struct { // closed interval [min, max] + * min int64 + * max int64 + * } + * } + * @param res { + * chunk_recoder uintptr // pointer to chunk recoder + * } + */ +void prompp_series_data_serialized_chunk_recoder_new_ctor(void* args, void* res); + /** * @brief Get chunk encoded in prometheus format * @@ -1540,6 +1572,74 @@ void prompp_series_data_encoder_dtor(void* args); extern "C" { #endif +/** + * @brief Get next series_id in serialized data. + * + * @param args { + * serializedData uintptr // pointer to serialized data. + * } + * + * @param res { + * series_id uint32 // series id (UINT32_MAX if no more series). + * } + */ +void prompp_series_data_serialization_serialized_data_next(void* args, void* res); + +/** + * @brief Create a decode iterator for current series_id (returned by the last call of _next()) + * + * @param args { + * serializedData uintptr // pointer to serialized data. + * } + * + * @param res { + * iterator uintptr // pointer to constructed decode iterator. + * } + */ +void prompp_series_data_serialization_serialized_data_iterator(void* args, void* res); + +/** + * @brief Advance decode iterator. + * + * @param args { + * iterator uintptr // pointer to decode iterator + * } + * + * @param res { + * timestamp int64 // sample timestamp + * value float64 // sample value + * has_value bool // true if a sample was returned, false if the iterator is exhausted + * } + */ +void prompp_series_data_serialization_serialized_data_iterator_next(void* args, void* res); + +/** + * @brief Destroy decode iterator. 
+ * + * @param args { + * iterator uintptr // pointer to decode iterator + * } + * + */ +void prompp_series_data_serialization_serialized_data_iterator_dtor(void* args); + +/** + * @brief Destroy serialized data object. + * + * @param args { + * serializedData uintptr // pointer to serialized data. + * } + * + */ +void prompp_series_data_serialization_serialized_data_dtor(void* args); + +#ifdef __cplusplus +} // extern "C" +#endif +#ifdef __cplusplus +extern "C" { +#endif + /** * @brief Construct a new WAL Decoder * diff --git a/pp/series_data/benchmarks/BUILD b/pp/series_data/benchmarks/BUILD index e6d088b9f..8b59537f5 100644 --- a/pp/series_data/benchmarks/BUILD +++ b/pp/series_data/benchmarks/BUILD @@ -8,4 +8,14 @@ cc_binary( "//:series_data", "@google_benchmark//:benchmark_main", ], +) + +cc_binary( + name = "serializer", + srcs = ["serializer_benchmark.cpp"], + malloc = "@jemalloc", + deps = [ + "//:series_data", + "@google_benchmark//:benchmark_main", + ], ) \ No newline at end of file diff --git a/pp/series_data/benchmarks/serializer_benchmark.cpp b/pp/series_data/benchmarks/serializer_benchmark.cpp new file mode 100644 index 000000000..07b10ad05 --- /dev/null +++ b/pp/series_data/benchmarks/serializer_benchmark.cpp @@ -0,0 +1,207 @@ +#include +#include + +#include + +#include +#include + +#include "bare_bones/preprocess.h" +#include "primitives/go_slice.h" +#include "series_data/encoder.h" +#include "series_data/querier/query.h" +#include "series_data/serialization/serialized_data.h" +#include "series_data/serialization/serializer.h" + +namespace { + +using BareBones::StreamVByte::CompactSequence; +using BareBones::StreamVByte::Sequence; +using series_data::serialization::DataSerializer; +using series_data::serialization::SerializedData; +using series_data::serialization::SerializedDataView; + +struct PROMPP_ATTRIBUTE_PACKED SeriesSample { + uint32_t series_id; + int64_t timestamp; + double value; +}; + +const BareBones::Vector& get_samples_for_benchmark() 
{ + constexpr auto get_file_name = [] -> std::string { + if (auto& context = benchmark::internal::GetGlobalContext(); context != nullptr) { + return context->operator[]("wal_file"); + } + + return {}; + }; + + static BareBones::Vector samples_from_file; + if (samples_from_file.empty()) [[likely]] { + std::ifstream istrm(get_file_name(), std::ios::binary); + istrm >> samples_from_file; + } + + return samples_from_file; +} + +series_data::querier::QueriedChunkList generate_query(uint32_t size) { + series_data::querier::QueriedChunkList chunk_list; + + std::vector v(size); + std::iota(v.begin(), v.end(), 0); + + std::mt19937 g(42); + std::ranges::shuffle(v, g); + v.resize(v.size() / 10); + + chunk_list.reserve(v.size()); + for (uint32_t ls_id : v) { + chunk_list.emplace_back(ls_id); + } + + return chunk_list; +} + +void BenchmarkWalSerializer(benchmark::State& state) { + const auto& samples = get_samples_for_benchmark(); + const double percent = static_cast(state.range(0)) / 100.0; + const auto [min, max] = std::ranges::minmax_element(samples, [](auto a, auto b) { return a.timestamp < b.timestamp; }); + const auto min_ts = min->timestamp; + const auto max_ts = max->timestamp; + const auto delta_ts = max_ts - min_ts; + + series_data::DataStorage storage; + series_data::Encoder encoder{storage}; + + for (const auto& sample : samples) { + if (sample.timestamp <= min_ts + static_cast(static_cast(delta_ts) * percent)) { + encoder.encode(sample.series_id, sample.timestamp, sample.value); + } + } + + const series_data::querier::QueriedChunkList chunk_list = generate_query(storage.open_chunks.size()); + + for ([[maybe_unused]] auto _ : state) { + series_data::serialization::Serializer serializer_{storage}; + PromPP::Primitives::Go::Slice slice; + PromPP::Primitives::Go::BytesStream stream{&slice}; + + serializer_.serialize(chunk_list, stream); + } + + { + series_data::serialization::Serializer serializer_{storage}; + PromPP::Primitives::Go::Slice slice; + 
PromPP::Primitives::Go::BytesStream stream{&slice}; + + serializer_.serialize(chunk_list, stream); + state.counters["Stream Size"] = + benchmark::Counter(static_cast(slice.allocated_memory()), benchmark::Counter::kDefaults, benchmark::Counter::OneK::kIs1024); + } +} + +void BenchmarkWalConstantSerializer(benchmark::State& state) { + const auto& samples = get_samples_for_benchmark(); + const double percent = static_cast(state.range(0)) / 100.0; + const auto [min, max] = std::ranges::minmax_element(samples, [](auto a, auto b) { return a.timestamp < b.timestamp; }); + const auto min_ts = min->timestamp; + const auto max_ts = max->timestamp; + const auto delta_ts = max_ts - min_ts; + + series_data::DataStorage storage; + series_data::Encoder encoder{storage}; + + for (const auto& sample : samples) { + if (sample.timestamp <= min_ts + static_cast(static_cast(delta_ts) * percent)) { + encoder.encode(sample.series_id, sample.timestamp, sample.series_id); + } + } + + const series_data::querier::QueriedChunkList chunk_list = generate_query(storage.open_chunks.size()); + + for ([[maybe_unused]] auto _ : state) { + series_data::serialization::Serializer serializer_{storage}; + PromPP::Primitives::Go::Slice slice; + PromPP::Primitives::Go::BytesStream stream{&slice}; + + serializer_.serialize(chunk_list, stream); + } + + { + series_data::serialization::Serializer serializer_{storage}; + PromPP::Primitives::Go::Slice slice; + PromPP::Primitives::Go::BytesStream stream{&slice}; + + serializer_.serialize(chunk_list, stream); + state.counters["Stream Size"] = + benchmark::Counter(static_cast(slice.allocated_memory()), benchmark::Counter::kDefaults, benchmark::Counter::OneK::kIs1024); + } +} + +void BenchmarkWalSerializedData(benchmark::State& state) { + const auto& samples = get_samples_for_benchmark(); + const double percent = static_cast(state.range(0)) / 100.0; + const auto [min, max] = std::ranges::minmax_element(samples, [](auto a, auto b) { return a.timestamp < b.timestamp; 
}); + const auto min_ts = min->timestamp; + const auto max_ts = max->timestamp; + const auto delta_ts = max_ts - min_ts; + + series_data::DataStorage storage; + series_data::Encoder encoder{storage}; + + for (const auto& sample : samples) { + if (sample.timestamp <= min_ts + static_cast(static_cast(delta_ts) * percent)) { + encoder.encode(sample.series_id, sample.timestamp, sample.value); + } + } + + const series_data::querier::QueriedChunkList chunk_list = generate_query(storage.open_chunks.size()); + + for ([[maybe_unused]] auto _ : state) { + SerializedData serialized = DataSerializer{storage}.serialize(chunk_list); + benchmark::DoNotOptimize(serialized); + } + + { + const SerializedData serialized = DataSerializer{storage}.serialize(chunk_list); + state.counters["Total Size"] = benchmark::Counter(serialized.allocated_memory(), benchmark::Counter::kDefaults, benchmark::Counter::OneK::kIs1024); + } +} + +void BenchmarkWalConstantSerializedData(benchmark::State& state) { + const auto& samples = get_samples_for_benchmark(); + const double percent = static_cast(state.range(0)) / 100.0; + const auto [min, max] = std::ranges::minmax_element(samples, [](auto a, auto b) { return a.timestamp < b.timestamp; }); + const auto min_ts = min->timestamp; + const auto max_ts = max->timestamp; + const auto delta_ts = max_ts - min_ts; + + series_data::DataStorage storage; + series_data::Encoder encoder{storage}; + + for (const auto& sample : samples) { + if (sample.timestamp <= min_ts + static_cast(static_cast(delta_ts) * percent)) { + encoder.encode(sample.series_id, sample.timestamp, sample.series_id); + } + } + + const series_data::querier::QueriedChunkList chunk_list = generate_query(storage.open_chunks.size()); + + for ([[maybe_unused]] auto _ : state) { + SerializedData serialized = DataSerializer{storage}.serialize(chunk_list); + benchmark::DoNotOptimize(serialized); + } + + { + const SerializedData serialized = DataSerializer{storage}.serialize(chunk_list); + 
state.counters["Total Size"] = benchmark::Counter(serialized.allocated_memory(), benchmark::Counter::kDefaults, benchmark::Counter::OneK::kIs1024); + } +} + +BENCHMARK(BenchmarkWalSerializer)->Arg(25)->Arg(50)->Arg(75)->Arg(100); +BENCHMARK(BenchmarkWalSerializedData)->Arg(25)->Arg(50)->Arg(75)->Arg(100); +BENCHMARK(BenchmarkWalConstantSerializer)->Arg(25)->Arg(50)->Arg(75)->Arg(100); +BENCHMARK(BenchmarkWalConstantSerializedData)->Arg(25)->Arg(50)->Arg(75)->Arg(100); + +} // namespace diff --git a/pp/series_data/chunk/serialized_chunk.h b/pp/series_data/chunk/serialized_chunk.h index bfa07be76..f0637d5eb 100644 --- a/pp/series_data/chunk/serialized_chunk.h +++ b/pp/series_data/chunk/serialized_chunk.h @@ -55,6 +55,7 @@ class SerializedChunkIterator { using reference = value_type&; explicit SerializedChunkIterator(std::span buffer) : data_(buffer, get_chunks(buffer)) {} + explicit SerializedChunkIterator(std::span buffer, SerializedChunkSpan chunks) : data_(buffer, chunks) {} [[nodiscard]] PROMPP_ALWAYS_INLINE const Data& operator*() const noexcept { return data_; } [[nodiscard]] PROMPP_ALWAYS_INLINE const Data* operator->() const noexcept { return &data_; } diff --git a/pp/series_data/serialization/deserializer.h b/pp/series_data/serialization/deserializer.h index 952aacb17..06725e798 100644 --- a/pp/series_data/serialization/deserializer.h +++ b/pp/series_data/serialization/deserializer.h @@ -23,6 +23,7 @@ class Deserializer { uint32_t chunks_count = *reinterpret_cast(buffer.data()); return {reinterpret_cast(buffer.data() + sizeof(uint32_t)), chunks_count}; } + [[nodiscard]] static decoder::UniversalDecodeIterator create_decode_iterator(std::span buffer, const chunk::SerializedChunk& chunk) { decoder::UniversalDecodeIterator iterator(std::in_place_type, 0, BareBones::BitSequenceReader(nullptr, 0), 0, false); Decoder::create_decode_iterator(buffer, chunk, [&iterator](Iterator&& begin, auto&&) { @@ -30,6 +31,7 @@ class Deserializer { }); return iterator; } + 
[[nodiscard]] static decoder::UniversalDecodeIterator create_decode_iterator(const chunk::SerializedChunkIterator::Data& chunk) { return create_decode_iterator(chunk.buffer(), chunk.chunk()); } diff --git a/pp/series_data/serialization/serialized_data.h b/pp/series_data/serialization/serialized_data.h new file mode 100644 index 000000000..0515caf8d --- /dev/null +++ b/pp/series_data/serialization/serialized_data.h @@ -0,0 +1,303 @@ +#pragma once +#include "bare_bones/memory.h" +#include "series_data/chunk/serialized_chunk.h" +#include "series_data/data_storage.h" +#include "series_data/decoder.h" +#include "series_data/decoder/universal_decode_iterator.h" +#include "series_data/querier/query.h" + +namespace series_data::serialization { + /* Output of one serialization pass: chunks holds one descriptor per serialized chunk, bytes_buffer holds the bytes those descriptors refer to by offset. */ +struct SerializedData { + using Memory = BareBones::Memory; + + BareBones::Vector chunks; + Memory bytes_buffer; + /* Sum of the allocated memory reported by both members. */ + [[nodiscard]] PROMPP_ALWAYS_INLINE uint32_t allocated_memory() const noexcept { return chunks.allocated_memory() + bytes_buffer.allocated_memory(); } +}; + /* Builds a SerializedData from the chunks of a DataStorage. The storage is kept by const reference (storage_), so it has to outlive the serializer. */ +class DataSerializer { + public: + explicit DataSerializer(const DataStorage& storage) : storage_(storage) {} + /* Serializes only the chunks listed in queried_chunks. */ + SerializedData serialize(const querier::QueriedChunkList& queried_chunks) noexcept { return serialize_internal(queried_chunks); } + /* Serializes everything returned by storage_.chunks(). */ SerializedData serialize() noexcept { return serialize_internal(storage_.chunks()); } + + private: + /* Buffer offsets of the timestamp streams already written during this pass, keyed by timestamp stream id; open and finalized chunks use separate maps (see fill_timestamp_stream_offset). */ struct TimestampStreamsData { + using TimestampId = uint32_t; + using Offset = uint32_t; + + static constexpr Offset kInvalidOffset = std::numeric_limits::max(); + + phmap::flat_hash_map stream_offsets; + phmap::flat_hash_map finalized_stream_offsets; + }; + /* Adds one SerializedChunk per input chunk (open chunks that are empty are skipped), then copies the reserved reader bytes behind the payload. */ + template + SerializedData serialize_internal(const ChunkList& chunks) noexcept { + const auto& kReservedBytesForReader = encoder::CompactBitSequence::reserved_bytes_for_reader(); + + SerializedData serialized_data; + serialized_data.chunks.reserve(get_chunk_count(chunks)); + + TimestampStreamsData timestamp_streams_data; + for (auto& chunk_data : chunks) { + using 
enum chunk::DataChunk::Type; + + if (chunk_data.is_open()) [[likely]] { + if (const auto& chunk = get_chunk(chunk_data); !chunk.is_empty()) [[likely]] { + fill_serialized_chunk(chunk, serialized_data.chunks.emplace_back(chunk_data.series_id()), timestamp_streams_data, + serialized_data.bytes_buffer); + } + } else { + fill_serialized_chunk(get_chunk(chunk_data), serialized_data.chunks.emplace_back(chunk_data.series_id()), + timestamp_streams_data, serialized_data.bytes_buffer); + } + } + /* The reserved reader bytes are copied behind the current items_count; items_count itself is not changed by this step. */ + serialized_data.bytes_buffer.grow_to_fit_at_least(serialized_data.bytes_buffer.control_block().items_count + kReservedBytesForReader.size()); + std::memcpy(serialized_data.bytes_buffer + serialized_data.bytes_buffer.control_block().items_count, kReservedBytesForReader.data(), + kReservedBytesForReader.size()); + + return serialized_data; + } + /* Element count handed to reserve() in serialize_internal; which counter is used is selected at compile time from ChunkList. */ + template + PROMPP_ALWAYS_INLINE static uint32_t get_chunk_count(const ChunkList& chunks) noexcept { + if constexpr (std::is_same_v) { + return chunks.non_empty_chunk_count(); + } else { + return chunks.size(); + } + } + /* Fills serialized_chunk from chunk: copies the encoding state, resolves the timestamp stream offset (skipped for kGorilla) and stores the values either inside the chunk record (uint32 and float32 constants) or appended to buffer. */ + template + void fill_serialized_chunk(const chunk::DataChunk& chunk, + chunk::SerializedChunk& serialized_chunk, + TimestampStreamsData& timestamp_streams_data, + SerializedData::Memory& buffer) noexcept { + using enum EncodingType; + + serialized_chunk.encoding_state = chunk.encoding_state; + /* A reference, not a copy: it follows every append made to buffer below. */ + uint32_t& data_size = buffer.control_block().items_count; + + if (chunk.encoding_state.encoding_type != kGorilla) [[likely]] { + fill_timestamp_stream_offset(storage_, timestamp_streams_data, chunk.timestamp_encoder_state_id, serialized_chunk, buffer); + } + + switch (chunk.encoding_state.encoding_type) { + case kUint32Constant: { + serialized_chunk.store_value_in_offset(chunk.encoder.uint32_constant); + break; + } + + case kFloat32Constant: { + serialized_chunk.store_value_in_offset(chunk.encoder.float32_constant); + break; + } + + case kDoubleConstant: { + serialized_chunk.set_offset(data_size); + 
buffer.grow_to_fit_at_least(data_size + sizeof(encoder::value::DoubleConstantEncoder)); + std::memcpy(buffer + data_size, &storage_.variant_encoders[chunk.encoder.external_index].double_constant, + sizeof(encoder::value::DoubleConstantEncoder)); + data_size += sizeof(encoder::value::DoubleConstantEncoder); + break; + } + + case kTwoDoubleConstant: { + serialized_chunk.set_offset(data_size); + buffer.grow_to_fit_at_least(data_size + sizeof(encoder::value::TwoDoubleConstantEncoder)); + std::memcpy(buffer + data_size, &storage_.variant_encoders[chunk.encoder.external_index].two_double_constant, + sizeof(encoder::value::TwoDoubleConstantEncoder)); + data_size += sizeof(encoder::value::TwoDoubleConstantEncoder); + break; + } + /* The remaining encodings append their bit sequence and record where it starts. */ + case kAscInteger: { + serialized_chunk.set_offset(data_size); + write_compact_bit_sequence(storage_.get_asc_integer_stream(chunk.encoder.external_index), buffer); + break; + } + + case kAscIntegerThenValuesGorilla: { + serialized_chunk.set_offset(data_size); + write_compact_bit_sequence(storage_.get_asc_integer_then_values_gorilla_stream(chunk.encoder.external_index), buffer); + break; + } + + case kValuesGorilla: { + serialized_chunk.set_offset(data_size); + write_compact_bit_sequence(storage_.get_values_gorilla_stream(chunk.encoder.external_index), buffer); + break; + } + + case kGorilla: { + serialized_chunk.set_offset(data_size); + write_compact_bit_sequence(storage_.get_gorilla_encoder_stream(chunk.encoder.external_index), buffer); + break; + } + + default: { + assert(chunk.encoding_state.encoding_type != kUnknown); + } + } + } + /* Resolves a queried chunk to its DataChunk: the open chunk of the series, or the finalized chunk at position finalized_chunk_id. */ + template + [[nodiscard]] const chunk::DataChunk& get_chunk(const querier::QueriedChunk& queried_chunk) const noexcept { + if constexpr (chunk_type == chunk::DataChunk::Type::kOpen) { + return storage_.open_chunks[queried_chunk.series_id()]; + } else { + /* NOTE(review): the result of find() is dereferenced without a comparison against end(); presumably the series always has finalized chunks on this path. Confirm. */ auto finalized_chunk_it = storage_.finalized_chunks.find(queried_chunk.series_id())->second.begin(); + std::advance(finalized_chunk_it, 
queried_chunk.finalized_chunk_id); + return *finalized_chunk_it; + } + } + /* Overload for items produced by storage iteration: the item already exposes its chunk. */ + template + [[nodiscard]] static const chunk::DataChunk& get_chunk(const DataStorage::SeriesChunkIterator::Data& chunk) noexcept { + return chunk.chunk(); + } + /* Sets serialized_chunk.timestamps_offset. The first chunk that refers to a timestamp stream id appends the stream to buffer and records its offset; later chunks with the same id reuse the recorded offset. */ + template + static void fill_timestamp_stream_offset(const DataStorage& storage, + TimestampStreamsData& timestamp_streams_data, + encoder::timestamp::State::Id timestamp_stream_id, + chunk::SerializedChunk& serialized_chunk, + SerializedData::Memory& buffer) noexcept { + /* Copied before any write, so it is where a newly appended stream starts. */ uint32_t data_size = buffer.control_block().items_count; + if constexpr (chunk_type == chunk::DataChunk::Type::kOpen) { + if (const auto it = timestamp_streams_data.stream_offsets.find(timestamp_stream_id); it == timestamp_streams_data.stream_offsets.end()) [[unlikely]] { + timestamp_streams_data.stream_offsets.emplace(timestamp_stream_id, data_size); + serialized_chunk.timestamps_offset = data_size; + write_compact_bit_sequence(storage.get_timestamp_stream(timestamp_stream_id).stream, buffer); + } else { + serialized_chunk.timestamps_offset = it->second; + } + } else { + if (const auto it = timestamp_streams_data.finalized_stream_offsets.find(timestamp_stream_id); + it == timestamp_streams_data.finalized_stream_offsets.end()) [[unlikely]] { + timestamp_streams_data.finalized_stream_offsets.emplace(timestamp_stream_id, data_size); + serialized_chunk.timestamps_offset = data_size; + write_compact_bit_sequence(storage.get_timestamp_stream(timestamp_stream_id).stream, buffer); + } else { + serialized_chunk.timestamps_offset = it->second; + } + } + } + /* Appends size_in_bytes() raw bytes of bit_sequence to buffer and advances items_count by that amount. */ + template + static void write_compact_bit_sequence(const CompactBitSequence& bit_sequence, SerializedData::Memory& buffer) noexcept { + const auto bytes_count = bit_sequence.size_in_bytes(); + uint32_t& data_size = buffer.control_block().items_count; + buffer.grow_to_fit_at_least(data_size + bytes_count); + std::memcpy(buffer + data_size, bit_sequence.raw_bytes(), bytes_count); + data_size += bytes_count; + } + + const 
DataStorage& storage_; +}; + /* Cursor over a SerializedData, which it references and does not own: next_series() selects a series and create_current_series_iterator() decodes its samples. */ +class SerializedDataView { + public: + /* Returned by next_series() when nothing is left; also the initial value of series_index_. */ static constexpr uint32_t kNoMoreSeries = std::numeric_limits::max(); + /* Iterates over the samples of one series and continues into the following chunks while they carry the same label_set_id. */ + class SeriesIterator { + public: + using iterator_category = std::forward_iterator_tag; + using value_type = encoder::Sample; + using difference_type = ptrdiff_t; + using pointer = value_type*; + using reference = value_type&; + + SeriesIterator(const SerializedData::Memory& buffer, chunk::SerializedChunkSpan chunks, uint32_t chunk_id) + : decode_iter_(std::in_place_type, 0, BareBones::BitSequenceReader(nullptr, 0), 0, false), + chunk_iter_(chunks.begin() + chunk_id), + series_id_(chunk_iter_->label_set_id), + buffer_(buffer.control_block().data, buffer.size()), + chunks_(chunks) { + Decoder::create_decode_iterator(buffer_, *chunk_iter_, [&](Iterator&& begin, auto&&) { + decode_iter_ = decoder::UniversalDecodeIterator{std::in_place_type, std::forward(begin)}; + }); + } + + [[nodiscard]] PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const noexcept { return *decode_iter_; } + [[nodiscard]] PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const noexcept { return decode_iter_.operator->(); } + + PROMPP_ALWAYS_INLINE SeriesIterator& operator++() noexcept { + ++decode_iter_; + /* Current chunk exhausted: switch to the next chunk only if it belongs to the same series. */ if (decode_iter_ == decoder::DecodeIteratorSentinel{}) [[unlikely]] { + if (std::next(chunk_iter_) != chunks_.end() && series_id_ == std::next(chunk_iter_)->label_set_id) { + ++chunk_iter_; + Decoder::create_decode_iterator(buffer_, *chunk_iter_, [&](Iterator&& begin, auto&&) { + decode_iter_ = decoder::UniversalDecodeIterator{std::in_place_type, std::forward(begin)}; + }); + } + } + return *this; + } + + PROMPP_ALWAYS_INLINE SeriesIterator operator++(int) noexcept { + const auto it = *this; + ++*this; + return it; + } + /* True once the current chunk is exhausted and no following chunk has series_id_. */ + PROMPP_ALWAYS_INLINE bool operator==(const decoder::DecodeIteratorSentinel&) const noexcept { + return (decode_iter_ == decoder::DecodeIteratorSentinel{}) && + (std::next(chunk_iter_) == chunks_.end() || series_id_ != 
std::next(chunk_iter_)->label_set_id); + } + + private: + decoder::UniversalDecodeIterator decode_iter_; + chunk::SerializedChunkSpan::const_iterator chunk_iter_; + uint32_t series_id_; + + std::span buffer_; + chunk::SerializedChunkSpan chunks_; + }; + + explicit SerializedDataView(const SerializedData& serialized_data) : data_(serialized_data), series_index_{kNoMoreSeries} {} + + [[nodiscard]] PROMPP_ALWAYS_INLINE chunk::SerializedChunkSpan get_chunks() const noexcept { return {data_.chunks.data(), data_.chunks.size()}; } + [[nodiscard]] PROMPP_ALWAYS_INLINE std::span get_buffer() const noexcept { + return {data_.bytes_buffer.control_block().data, data_.bytes_buffer.size()}; + } + /* Moves to the first chunk of the next series and returns its label_set_id, or kNoMoreSeries when there is none. The first call selects chunk 0. */ + [[nodiscard]] uint32_t next_series() noexcept { + const auto& chunks = data_.chunks; + if (series_index_ == kNoMoreSeries) [[unlikely]] { + if (chunks.empty()) [[unlikely]] { + return kNoMoreSeries; + } + series_index_ = 0; + return chunks[0].label_set_id; + } + + if (series_index_ == chunks.size()) [[unlikely]] { + return kNoMoreSeries; + } + /* Step over the remaining chunks of the current series. */ + const uint32_t current_series_id = chunks[series_index_].label_set_id; + do { + ++series_index_; + } while (series_index_ < chunks.size() && chunks[series_index_].label_set_id == current_series_id); + + if (series_index_ == chunks.size()) [[unlikely]] { + return kNoMoreSeries; + } + + return chunks[series_index_].label_set_id; + } + /* NOTE(review): series_index_ is used as a chunk index without a range check, so this presumably may only be called after next_series() returned a real id. Confirm. */ + [[nodiscard]] SeriesIterator create_current_series_iterator() const noexcept { return {data_.bytes_buffer, get_chunks(), series_index_}; } + + private: + const SerializedData& data_; + uint32_t series_index_; +}; +} // namespace series_data::serialization \ No newline at end of file diff --git a/pp/series_data/serialization/serializer.h b/pp/series_data/serialization/serializer.h index 814105959..c0a357bf5 100644 --- a/pp/series_data/serialization/serializer.h +++ b/pp/series_data/serialization/serializer.h @@ -7,7 +7,6 @@ #include "series_data/querier/query.h" namespace series_data::serialization { - class 
Serializer { public: explicit Serializer(const DataStorage& storage) : storage_(storage) {} diff --git a/pp/series_data/tests/serialization/serializer_deserializer_new_tests.cpp b/pp/series_data/tests/serialization/serializer_deserializer_new_tests.cpp new file mode 100644 index 000000000..ddf88047d --- /dev/null +++ b/pp/series_data/tests/serialization/serializer_deserializer_new_tests.cpp @@ -0,0 +1,905 @@ +#include + +#include "bare_bones/streams.h" +#include "series_data/data_storage.h" +#include "series_data/encoder.h" +#include "series_data/encoder/bit_sequence.h" +#include "series_data/serialization/deserializer.h" +#include "series_data/serialization/serialized_data.h" +#include "series_data/serialization/serializer.h" + +namespace { + +using BareBones::Encoding::Gorilla::STALE_NAN; +using series_data::ChunkFinalizer; +using series_data::DataStorage; +using series_data::Encoder; +using series_data::EncodingType; +using series_data::chunk::DataChunk; +using series_data::decoder::DecodeIteratorSentinel; +using series_data::encoder::Sample; +using series_data::encoder::SampleList; +using series_data::querier::QueriedChunk; +using series_data::querier::QueriedChunkList; +using series_data::serialization::DataSerializer; +using series_data::serialization::SerializedData; +using series_data::serialization::SerializedDataView; + /* Shared test state: a storage, plus an encoder and a serializer that are both constructed on that storage. */ +class SerializerDeserializerTrait { + protected: + DataStorage storage_; + Encoder<> encoder_{storage_}; + DataSerializer serializer_{storage_}; + /* Advances data to its next series, expects that series to be series_id, and returns every sample the series iterator of the view yields for it. */ + [[nodiscard]] PROMPP_ALWAYS_INLINE static SampleList decode_current_chunk(SerializedDataView& data, uint32_t series_id) { + SampleList result; + + EXPECT_EQ(series_id, data.next_series()); + + std::ranges::copy(data.create_current_series_iterator(), DecodeIteratorSentinel{}, std::back_insert_iterator(result)); + + return result; + } +}; + /* GoogleTest fixture that gives every TEST_F below its own storage, encoder and serializer. */ +class SerializerDeserializerFixtureNew : public SerializerDeserializerTrait, public testing::Test {}; + +TEST_F(SerializerDeserializerFixtureNew, 
EmptyChunksList) { + // Arrange + + // Act + const SerializedData serialized = serializer_.serialize({}); + const SerializedDataView serialized_view(serialized); + + // Assert + ASSERT_EQ(0U, serialized_view.get_chunks().size()); + ASSERT_EQ(series_data::encoder::CompactBitSequence::reserved_bytes_for_reader().size(), serialized_view.get_buffer().size()); +} + +TEST_F(SerializerDeserializerFixtureNew, TwoUint32ConstantChunkWithCommonTimestampStream) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(1, 1, 1.0); + + encoder_.encode(0, 2, 1.0); + encoder_.encode(1, 2, 1.0); + + encoder_.encode(0, 3, 1.0); + encoder_.encode(1, 3, 1.0); + + // Act + const SerializedData serialized = serializer_.serialize({QueriedChunk{0}, QueriedChunk{1}}); + SerializedDataView serialized_view(serialized); + + // Assert + ASSERT_EQ(2U, serialized_view.get_chunks().size()); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[0].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[1].encoding_state.encoding_type); + EXPECT_EQ(serialized_view.get_chunks()[0].timestamps_offset, serialized_view.get_chunks()[1].timestamps_offset); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 1, .value = 1.0}, + {.timestamp = 2, .value = 1.0}, + {.timestamp = 3, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 0))); + + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 1, .value = 1.0}, + {.timestamp = 2, .value = 1.0}, + {.timestamp = 3, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 1))); +} + +TEST_F(SerializerDeserializerFixtureNew, TwoUint32ConstantFinalizedChunkWithCommonTimestampStream) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(1, 1, 1.0); + + encoder_.encode(0, 2, 1.0); + encoder_.encode(1, 2, 1.0); + + encoder_.encode(0, 3, 1.0); + encoder_.encode(1, 3, 1.0); + + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + 
ChunkFinalizer::finalize(storage_, 1, storage_.open_chunks[1]); + encoder_.encode(0, 4, 1.0); + encoder_.encode(1, 4, 1.0); + + // Act + const SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + // Assert + ASSERT_EQ(4U, serialized_view.get_chunks().size()); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[0].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[1].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[2].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[3].encoding_state.encoding_type); + EXPECT_EQ(serialized_view.get_chunks()[0].timestamps_offset, serialized_view.get_chunks()[2].timestamps_offset); + EXPECT_EQ(serialized_view.get_chunks()[1].timestamps_offset, serialized_view.get_chunks()[3].timestamps_offset); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 1, .value = 1.0}, + {.timestamp = 2, .value = 1.0}, + {.timestamp = 3, .value = 1.0}, + {.timestamp = 4, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 0))); + + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 1, .value = 1.0}, + {.timestamp = 2, .value = 1.0}, + {.timestamp = 3, .value = 1.0}, + {.timestamp = 4, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 1))); +} + +TEST_F(SerializerDeserializerFixtureNew, ThreeUint32ConstantChunkWithCommonAndUniqueTimestampStream) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(1, 1, 1.0); + + encoder_.encode(0, 2, 1.0); + encoder_.encode(1, 2, 1.0); + + encoder_.encode(0, 3, 1.0); + encoder_.encode(1, 3, 1.0); + + encoder_.encode(2, 1, 2.0); + encoder_.encode(2, 2, 2.0); + encoder_.encode(2, 3, 2.0); + + // Act + const SerializedData serialized = serializer_.serialize({QueriedChunk{0}, QueriedChunk{1}, QueriedChunk{2}}); + SerializedDataView 
serialized_view(serialized); + + // Assert + ASSERT_EQ(3U, serialized_view.get_chunks().size()); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[0].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[1].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[2].encoding_state.encoding_type); + EXPECT_EQ(serialized_view.get_chunks()[0].timestamps_offset, serialized_view.get_chunks()[1].timestamps_offset); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 1, .value = 1.0}, + {.timestamp = 2, .value = 1.0}, + {.timestamp = 3, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 0))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 1, .value = 1.0}, + {.timestamp = 2, .value = 1.0}, + {.timestamp = 3, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 1))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 1, .value = 2.0}, + {.timestamp = 2, .value = 2.0}, + {.timestamp = 3, .value = 2.0}, + }, + decode_current_chunk(serialized_view, 2))); +} + +TEST_F(SerializerDeserializerFixtureNew, AllChunkTypes) { + // Arrange + encoder_.encode(0, 100, 1.0); + + encoder_.encode(1, 101, 1.1); + + encoder_.encode(2, 102, 1.1); + encoder_.encode(2, 103, 1.2); + + encoder_.encode(3, 104, 1.0); + encoder_.encode(3, 105, 2.0); + encoder_.encode(3, 106, 3.0); + + encoder_.encode(4, 107, 1.1); + encoder_.encode(20, 107, 1.1); + encoder_.encode(4, 108, 2.1); + encoder_.encode(20, 108, 2.1); + encoder_.encode(4, 109, 3.1); + + encoder_.encode(5, 110, 1.1); + encoder_.encode(5, 111, 2.1); + encoder_.encode(5, 112, 3.1); + + encoder_.encode(6, 113, 2.0); + + encoder_.encode(7, 114, -1.0); + encoder_.encode(7, 115, -1.0); + + encoder_.encode(8, 120, 1.0); + encoder_.encode(8, 121, 2.0); + encoder_.encode(8, 122, 3.0); + encoder_.encode(8, 123, 4.1); + + // Act + SerializedData serialized = serializer_.serialize(); 
+ SerializedDataView serialized_view(serialized); + + // Assert + ASSERT_EQ(10U, serialized_view.get_chunks().size()); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[0].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kDoubleConstant, serialized_view.get_chunks()[1].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[2].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscInteger, serialized_view.get_chunks()[3].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kValuesGorilla, serialized_view.get_chunks()[4].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kGorilla, serialized_view.get_chunks()[5].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[6].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kFloat32Constant, serialized_view.get_chunks()[7].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscIntegerThenValuesGorilla, serialized_view.get_chunks()[8].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[9].encoding_state.encoding_type); + ASSERT_EQ(20U, serialized_view.get_chunks()[9].label_set_id); + + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 100, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 0))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 101, .value = 1.1}, + }, + decode_current_chunk(serialized_view, 1))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 102, .value = 1.1}, + {.timestamp = 103, .value = 1.2}, + }, + decode_current_chunk(serialized_view, 2))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 104, .value = 1.0}, + {.timestamp = 105, .value = 2.0}, + {.timestamp = 106, .value = 3.0}, + }, + decode_current_chunk(serialized_view, 3))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 107, .value = 1.1}, + {.timestamp 
= 108, .value = 2.1}, + {.timestamp = 109, .value = 3.1}, + }, + decode_current_chunk(serialized_view, 4))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 110, .value = 1.1}, + {.timestamp = 111, .value = 2.1}, + {.timestamp = 112, .value = 3.1}, + }, + decode_current_chunk(serialized_view, 5))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 113, .value = 2.0}, + }, + decode_current_chunk(serialized_view, 6))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 114, .value = -1.0}, + {.timestamp = 115, .value = -1.0}, + }, + decode_current_chunk(serialized_view, 7))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 120, .value = 1.0}, + {.timestamp = 121, .value = 2.0}, + {.timestamp = 122, .value = 3.0}, + {.timestamp = 123, .value = 4.1}, + }, + decode_current_chunk(serialized_view, 8))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 107, .value = 1.1}, + {.timestamp = 108, .value = 2.1}, + }, + decode_current_chunk(serialized_view, 20))); +} + +TEST_F(SerializerDeserializerFixtureNew, FinalizedAllChunkTypes) { + // Arrange + encoder_.encode(0, 100, 1.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + + encoder_.encode(1, 101, 1.1); + ChunkFinalizer::finalize(storage_, 1, storage_.open_chunks[1]); + + encoder_.encode(2, 102, 1.1); + encoder_.encode(2, 103, 1.2); + ChunkFinalizer::finalize(storage_, 2, storage_.open_chunks[2]); + + encoder_.encode(3, 104, 1.0); + encoder_.encode(3, 105, 2.0); + encoder_.encode(3, 106, 3.0); + ChunkFinalizer::finalize(storage_, 3, storage_.open_chunks[3]); + + encoder_.encode(4, 107, 1.1); + encoder_.encode(20, 107, 1.1); + encoder_.encode(4, 108, 2.1); + encoder_.encode(20, 108, 2.1); + encoder_.encode(4, 109, 3.1); + ChunkFinalizer::finalize(storage_, 4, storage_.open_chunks[4]); + ChunkFinalizer::finalize(storage_, 20, storage_.open_chunks[20]); + + encoder_.encode(5, 110, 1.1); + encoder_.encode(5, 111, 2.1); + 
encoder_.encode(5, 112, 3.1); + ChunkFinalizer::finalize(storage_, 5, storage_.open_chunks[5]); + + encoder_.encode(6, 113, 2.0); + ChunkFinalizer::finalize(storage_, 6, storage_.open_chunks[6]); + + encoder_.encode(7, 114, -1.0); + encoder_.encode(7, 115, -1.0); + ChunkFinalizer::finalize(storage_, 7, storage_.open_chunks[7]); + + encoder_.encode(8, 120, 1.0); + encoder_.encode(8, 121, 2.0); + encoder_.encode(8, 122, 3.0); + encoder_.encode(8, 123, 4.1); + ChunkFinalizer::finalize(storage_, 8, storage_.open_chunks[8]); + + // Act + SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + // Assert + ASSERT_EQ(10U, serialized_view.get_chunks().size()); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[0].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kDoubleConstant, serialized_view.get_chunks()[1].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[2].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscInteger, serialized_view.get_chunks()[3].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kValuesGorilla, serialized_view.get_chunks()[4].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kGorilla, serialized_view.get_chunks()[5].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[6].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kFloat32Constant, serialized_view.get_chunks()[7].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscIntegerThenValuesGorilla, serialized_view.get_chunks()[8].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[9].encoding_state.encoding_type); + ASSERT_EQ(20U, serialized_view.get_chunks()[9].label_set_id); + + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 100, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 0))); + 
EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 101, .value = 1.1}, + }, + decode_current_chunk(serialized_view, 1))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 102, .value = 1.1}, + {.timestamp = 103, .value = 1.2}, + }, + decode_current_chunk(serialized_view, 2))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 104, .value = 1.0}, + {.timestamp = 105, .value = 2.0}, + {.timestamp = 106, .value = 3.0}, + }, + decode_current_chunk(serialized_view, 3))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 107, .value = 1.1}, + {.timestamp = 108, .value = 2.1}, + {.timestamp = 109, .value = 3.1}, + }, + decode_current_chunk(serialized_view, 4))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 110, .value = 1.1}, + {.timestamp = 111, .value = 2.1}, + {.timestamp = 112, .value = 3.1}, + }, + decode_current_chunk(serialized_view, 5))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 113, .value = 2.0}, + }, + decode_current_chunk(serialized_view, 6))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 114, .value = -1.0}, + {.timestamp = 115, .value = -1.0}, + }, + decode_current_chunk(serialized_view, 7))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 120, .value = 1.0}, + {.timestamp = 121, .value = 2.0}, + {.timestamp = 122, .value = 3.0}, + {.timestamp = 123, .value = 4.1}, + }, + decode_current_chunk(serialized_view, 8))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 107, .value = 1.1}, + {.timestamp = 108, .value = 2.1}, + }, + decode_current_chunk(serialized_view, 20))); +} + +TEST_F(SerializerDeserializerFixtureNew, ChunkWithFinalizedTimestampStream) { + // Arrange + encoder_.encode(0, 100, 1.0); + encoder_.encode(1, 100, 1.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + + // Act + const SerializedData serialized = serializer_.serialize({QueriedChunk{1}}); + SerializedDataView 
serialized_view(serialized); + + // Assert + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 100, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 1))); +} + +TEST_F(SerializerDeserializerFixtureNew, MultipleChunksOnOneSeriesId) { + // Arrange + encoder_.encode(0, 100, 1.0); + encoder_.encode(0, 101, 1.0); + encoder_.encode(0, 102, 1.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + encoder_.encode(0, 103, 1.0); + encoder_.encode(0, 104, 1.0); + encoder_.encode(0, 105, 1.0); + + // Act + const SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + // Assert + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 100, .value = 1.0}, + {.timestamp = 101, .value = 1.0}, + {.timestamp = 102, .value = 1.0}, + {.timestamp = 103, .value = 1.0}, + {.timestamp = 104, .value = 1.0}, + {.timestamp = 105, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 0))); +} + +TEST_F(SerializerDeserializerFixtureNew, QueryFinalizedOnly) { + // Arrange + encoder_.encode(0, 100, 1.0); + encoder_.encode(0, 101, 1.0); + encoder_.encode(0, 102, 1.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + encoder_.encode(0, 103, 1.0); + encoder_.encode(0, 104, 1.0); + encoder_.encode(0, 105, 1.0); + + // Act + const SerializedData serialized = serializer_.serialize({QueriedChunk{0, 0}}); + SerializedDataView serialized_view(serialized); + + // Assert + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 100, .value = 1.0}, + {.timestamp = 101, .value = 1.0}, + {.timestamp = 102, .value = 1.0}, + }, + decode_current_chunk(serialized_view, 0))); +} + +TEST_F(SerializerDeserializerFixtureNew, MultipleChunksOnOneSeriesIdWithSeveralFinalized) { + // Arrange + encoder_.encode(0, 100, 1.0); + encoder_.encode(0, 101, 2.0); + encoder_.encode(0, 102, 3.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + encoder_.encode(0, 103, 4.0); + 
encoder_.encode(0, 104, 5.0); + encoder_.encode(0, 105, 6.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + encoder_.encode(0, 106, 7.0); + encoder_.encode(0, 107, 8.0); + encoder_.encode(0, 108, 9.0); + + // Act + const SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + // Assert + EXPECT_TRUE(std::ranges::equal(SampleList{{.timestamp = 100, .value = 1.0}, + {.timestamp = 101, .value = 2.0}, + {.timestamp = 102, .value = 3.0}, + {.timestamp = 103, .value = 4.0}, + {.timestamp = 104, .value = 5.0}, + {.timestamp = 105, .value = 6.0}, + {.timestamp = 106, .value = 7.0}, + {.timestamp = 107, .value = 8.0}, + {.timestamp = 108, .value = 9.0}}, + decode_current_chunk(serialized_view, 0))); +} + +TEST_F(SerializerDeserializerFixtureNew, AllChunkTypesWithStalenan) { + // Arrange + encoder_.encode(0, 100, 1.0); + encoder_.encode(0, 101, STALE_NAN); + + encoder_.encode(1, 102, 1.1); + encoder_.encode(1, 103, STALE_NAN); + + encoder_.encode(2, 104, 1.1); + encoder_.encode(2, 105, 1.2); + encoder_.encode(2, 106, STALE_NAN); + + encoder_.encode(3, 107, 1.0); + encoder_.encode(3, 108, 2.0); + encoder_.encode(3, 109, 3.0); + encoder_.encode(3, 110, STALE_NAN); + + encoder_.encode(4, 111, 1.1); + encoder_.encode(20, 111, 1.1); + encoder_.encode(4, 112, 2.1); + encoder_.encode(20, 112, 2.1); + encoder_.encode(4, 113, 3.1); + encoder_.encode(4, 114, STALE_NAN); + encoder_.encode(20, 113, STALE_NAN); + + encoder_.encode(5, 115, 1.1); + encoder_.encode(5, 116, 2.1); + encoder_.encode(5, 117, 3.1); + encoder_.encode(5, 118, STALE_NAN); + + encoder_.encode(6, 119, 2.0); + encoder_.encode(6, 120, STALE_NAN); + + encoder_.encode(7, 121, -1.0); + encoder_.encode(7, 122, -1.0); + encoder_.encode(7, 123, STALE_NAN); + + encoder_.encode(8, 130, 1.0); + encoder_.encode(8, 131, 2.0); + encoder_.encode(8, 132, 3.0); + encoder_.encode(8, 133, 4.1); + encoder_.encode(8, 134, STALE_NAN); + + // Act + SerializedData 
serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + // Assert + ASSERT_EQ(10U, serialized_view.get_chunks().size()); + EXPECT_TRUE(std::ranges::all_of(serialized_view.get_chunks(), [](const auto& chunk) { return chunk.encoding_state.has_last_stalenan; })); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[0].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kDoubleConstant, serialized_view.get_chunks()[1].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[2].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscInteger, serialized_view.get_chunks()[3].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kValuesGorilla, serialized_view.get_chunks()[4].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kGorilla, serialized_view.get_chunks()[5].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[6].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kFloat32Constant, serialized_view.get_chunks()[7].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscIntegerThenValuesGorilla, serialized_view.get_chunks()[8].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[9].encoding_state.encoding_type); + ASSERT_EQ(20U, serialized_view.get_chunks()[9].label_set_id); + + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 100, .value = 1.0}, + {.timestamp = 101, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 0))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 102, .value = 1.1}, + {.timestamp = 103, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 1))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 104, .value = 1.1}, + {.timestamp = 105, .value = 1.2}, + {.timestamp = 106, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 2))); + 
EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 107, .value = 1.0}, + {.timestamp = 108, .value = 2.0}, + {.timestamp = 109, .value = 3.0}, + {.timestamp = 110, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 3))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 111, .value = 1.1}, + {.timestamp = 112, .value = 2.1}, + {.timestamp = 113, .value = 3.1}, + {.timestamp = 114, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 4))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 115, .value = 1.1}, + {.timestamp = 116, .value = 2.1}, + {.timestamp = 117, .value = 3.1}, + {.timestamp = 118, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 5))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 119, .value = 2.0}, + {.timestamp = 120, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 6))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 121, .value = -1.0}, + {.timestamp = 122, .value = -1.0}, + {.timestamp = 123, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 7))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 130, .value = 1.0}, + {.timestamp = 131, .value = 2.0}, + {.timestamp = 132, .value = 3.0}, + {.timestamp = 133, .value = 4.1}, + {.timestamp = 134, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 8))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 111, .value = 1.1}, + {.timestamp = 112, .value = 2.1}, + {.timestamp = 113, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 20))); +} + +TEST_F(SerializerDeserializerFixtureNew, FinalizedAllChunkTypesWithStalenan) { + // Arrange + encoder_.encode(0, 100, 1.0); + encoder_.encode(0, 101, STALE_NAN); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + + encoder_.encode(1, 102, 1.1); + encoder_.encode(1, 103, STALE_NAN); + ChunkFinalizer::finalize(storage_, 1, 
storage_.open_chunks[1]); + + encoder_.encode(2, 104, 1.1); + encoder_.encode(2, 105, 1.2); + encoder_.encode(2, 106, STALE_NAN); + ChunkFinalizer::finalize(storage_, 2, storage_.open_chunks[2]); + + encoder_.encode(3, 107, 1.0); + encoder_.encode(3, 108, 2.0); + encoder_.encode(3, 109, 3.0); + encoder_.encode(3, 110, STALE_NAN); + ChunkFinalizer::finalize(storage_, 3, storage_.open_chunks[3]); + + encoder_.encode(4, 111, 1.1); + encoder_.encode(20, 111, 1.1); + encoder_.encode(4, 112, 2.1); + encoder_.encode(20, 112, 2.1); + encoder_.encode(4, 113, 3.1); + encoder_.encode(4, 114, STALE_NAN); + encoder_.encode(20, 113, STALE_NAN); + ChunkFinalizer::finalize(storage_, 4, storage_.open_chunks[4]); + ChunkFinalizer::finalize(storage_, 20, storage_.open_chunks[20]); + + encoder_.encode(5, 115, 1.1); + encoder_.encode(5, 116, 2.1); + encoder_.encode(5, 117, 3.1); + encoder_.encode(5, 118, STALE_NAN); + ChunkFinalizer::finalize(storage_, 5, storage_.open_chunks[5]); + + encoder_.encode(6, 119, 2.0); + encoder_.encode(6, 120, STALE_NAN); + ChunkFinalizer::finalize(storage_, 6, storage_.open_chunks[6]); + + encoder_.encode(7, 121, -1.0); + encoder_.encode(7, 122, -1.0); + encoder_.encode(7, 123, STALE_NAN); + ChunkFinalizer::finalize(storage_, 7, storage_.open_chunks[7]); + + encoder_.encode(8, 130, 1.0); + encoder_.encode(8, 131, 2.0); + encoder_.encode(8, 132, 3.0); + encoder_.encode(8, 133, 4.1); + encoder_.encode(8, 134, STALE_NAN); + ChunkFinalizer::finalize(storage_, 8, storage_.open_chunks[8]); + + // Act + SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + // Assert + ASSERT_EQ(10U, serialized_view.get_chunks().size()); + EXPECT_TRUE(std::ranges::all_of(serialized_view.get_chunks(), [](const auto& chunk) { return chunk.encoding_state.has_last_stalenan; })); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[0].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kDoubleConstant, 
serialized_view.get_chunks()[1].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[2].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscInteger, serialized_view.get_chunks()[3].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kValuesGorilla, serialized_view.get_chunks()[4].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kGorilla, serialized_view.get_chunks()[5].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kUint32Constant, serialized_view.get_chunks()[6].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kFloat32Constant, serialized_view.get_chunks()[7].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kAscIntegerThenValuesGorilla, serialized_view.get_chunks()[8].encoding_state.encoding_type); + ASSERT_EQ(EncodingType::kTwoDoubleConstant, serialized_view.get_chunks()[9].encoding_state.encoding_type); + ASSERT_EQ(20U, serialized_view.get_chunks()[9].label_set_id); + + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 100, .value = 1.0}, + {.timestamp = 101, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 0))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 102, .value = 1.1}, + {.timestamp = 103, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 1))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 104, .value = 1.1}, + {.timestamp = 105, .value = 1.2}, + {.timestamp = 106, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 2))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 107, .value = 1.0}, + {.timestamp = 108, .value = 2.0}, + {.timestamp = 109, .value = 3.0}, + {.timestamp = 110, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 3))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 111, .value = 1.1}, + {.timestamp = 112, .value = 2.1}, + {.timestamp = 113, .value = 3.1}, + {.timestamp = 114, .value = STALE_NAN}, 
+ }, + decode_current_chunk(serialized_view, 4))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 115, .value = 1.1}, + {.timestamp = 116, .value = 2.1}, + {.timestamp = 117, .value = 3.1}, + {.timestamp = 118, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 5))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 119, .value = 2.0}, + {.timestamp = 120, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 6))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 121, .value = -1.0}, + {.timestamp = 122, .value = -1.0}, + {.timestamp = 123, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 7))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 130, .value = 1.0}, + {.timestamp = 131, .value = 2.0}, + {.timestamp = 132, .value = 3.0}, + {.timestamp = 133, .value = 4.1}, + {.timestamp = 134, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 8))); + EXPECT_TRUE(std::ranges::equal( + SampleList{ + {.timestamp = 111, .value = 1.1}, + {.timestamp = 112, .value = 2.1}, + {.timestamp = 113, .value = STALE_NAN}, + }, + decode_current_chunk(serialized_view, 20))); +} + +class SerializedDataNextIterFixture : public SerializerDeserializerTrait, public testing::Test { + protected: + static std::vector get_chunks_ids(SerializedDataView& view) { + std::vector ans{}; + uint32_t id = view.next_series(); + while (id != SerializedDataView::kNoMoreSeries) { + ans.push_back(id); + id = view.next_series(); + } + return ans; + } +}; + +TEST_F(SerializedDataNextIterFixture, EmptyChunksList) { + // Arrange + + // Act + const SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + const auto ids = get_chunks_ids(serialized_view); + + // Assert + EXPECT_TRUE(ids.empty()); + EXPECT_EQ(SerializedDataView::kNoMoreSeries, serialized_view.next_series()); +} + +TEST_F(SerializedDataNextIterFixture, OneChunk) { + // Arrange + 
encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 1.0); + + // Act + const SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + auto ids = get_chunks_ids(serialized_view); + + // Assert + EXPECT_TRUE(std::ranges::equal(ids, std::initializer_list{0u})); + EXPECT_EQ(SerializedDataView::kNoMoreSeries, serialized_view.next_series()); +} + +TEST_F(SerializedDataNextIterFixture, OneChunkFinalized) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 1.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + encoder_.encode(0, 3, 1.0); + encoder_.encode(0, 4, 1.0); + + // Act + const SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + auto ids = get_chunks_ids(serialized_view); + + // Assert + EXPECT_TRUE(std::ranges::equal(ids, std::initializer_list{0u})); + EXPECT_EQ(SerializedDataView::kNoMoreSeries, serialized_view.next_series()); +} + +TEST_F(SerializedDataNextIterFixture, SeveralChunks) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(1, 1, 1.0); + + encoder_.encode(0, 2, 1.0); + encoder_.encode(1, 2, 1.0); + + encoder_.encode(0, 3, 1.0); + encoder_.encode(1, 3, 1.0); + + encoder_.encode(2, 1, 2.0); + encoder_.encode(2, 2, 2.0); + encoder_.encode(2, 3, 2.0); + + encoder_.encode(100, 4, 2.1); + encoder_.encode(100, 5, 2.2); + encoder_.encode(100, 7, 2.3); + + // Act + const SerializedData serialized = serializer_.serialize(); + SerializedDataView serialized_view(serialized); + + auto ids = get_chunks_ids(serialized_view); + + // Assert + EXPECT_TRUE(std::ranges::equal(ids, std::initializer_list{0u, 1u, 2u, 100u})); + EXPECT_EQ(SerializedDataView::kNoMoreSeries, serialized_view.next_series()); +} + +} // namespace \ No newline at end of file