Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pp/bare_bones/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <atomic>
#include <cstring>
#include <limits>

#include "preprocess.h"
#include "type_traits.h"
Expand Down
29 changes: 29 additions & 0 deletions pp/entrypoint/head/serialization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include "series_data/serialization/serialized_data.h"

namespace entrypoint::head {

class SerializedDataGo {
public:
explicit SerializedDataGo(const series_data::DataStorage& storage, const series_data::querier::QueriedChunkList& queried_chunks)
: data_{series_data::serialization::DataSerializer::serialize(storage, queried_chunks)} {}

[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_buffer() const noexcept { return data_view_.get_buffer(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_chunks() const noexcept { return data_view_.get_chunks(); }

[[nodiscard]] PROMPP_ALWAYS_INLINE uint32_t next() noexcept { return data_view_.next_series(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto iterator() const noexcept { return data_view_.create_current_series_iterator(); }

private:
series_data::serialization::SerializedData data_;
series_data::serialization::SerializedDataView data_view_{data_};
};

using SerializedDataPtr = std::unique_ptr<SerializedDataGo>;
using SerializedDataIteratorPtr = std::unique_ptr<series_data::serialization::SerializedDataView::SerializedSeriesIterator>;

static_assert(sizeof(SerializedDataPtr) == sizeof(void*));
static_assert(sizeof(SerializedDataIteratorPtr) == sizeof(void*));

} // namespace entrypoint::head
43 changes: 39 additions & 4 deletions pp/entrypoint/series_data/querier.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "primitives/primitives.h"
#include "series_data/querier/instant_querier.h"
#include "series_data/querier/querier.h"
#include "series_data/serialization/serialized_data.h"
#include "series_data/serialization/serializer.h"

namespace entrypoint::series_data {
Expand Down Expand Up @@ -87,15 +88,49 @@ class RangeQuerierWithArgumentsWrapper {
}
};

enum class QuerierType : uint8_t {
kInstantQuerier = 0,
kRangeQuerier,
class RangeQuerierWithArgumentsWrapperNew {
using DataStorage = ::series_data::DataStorage;
using LabelSetID = PromPP::Primitives::LabelSetID;
template <class T>
using Slice = PromPP::Primitives::Go::Slice<T>;
using Query = ::series_data::querier::Query<Slice<LabelSetID>>;
using Serializer = ::series_data::serialization::Serializer;
using BytesStream = PromPP::Primitives::Go::BytesStream;

public:
RangeQuerierWithArgumentsWrapperNew(DataStorage& storage, const Query& query, head::SerializedDataPtr* serialized_data)
: querier_(storage), query_(&query), serialized_data_(serialized_data) {}

void query() noexcept {
querier_.query(*query_);
if (!querier_.need_loading()) {
serialize_chunks();
}
}

PROMPP_ALWAYS_INLINE void query_finalize() const noexcept { serialize_chunks(); }

[[nodiscard]] const BareBones::Bitset& series_to_load() const noexcept { return querier_.get_series_to_load(); }
[[nodiscard]] bool need_loading() const noexcept { return querier_.need_loading(); }
[[nodiscard]] DataStorage& storage() noexcept { return querier_.get_storage(); }

private:
::series_data::querier::Querier querier_;
const Query* query_;
head::SerializedDataPtr* serialized_data_;

PROMPP_ALWAYS_INLINE void serialize_chunks() const noexcept {
std::construct_at(serialized_data_, std::make_unique<head::SerializedDataGo>(querier_.get_storage(), querier_.chunks()));
}
};

using QuerierVariant = std::variant<InstantQuerierWithArgumentsWrapperEntrypoint, RangeQuerierWithArgumentsWrapper>;
enum class QuerierType : uint8_t { kInstantQuerier = 0, kRangeQuerier, kRangeQuerierNew };

using QuerierVariant = std::variant<InstantQuerierWithArgumentsWrapperEntrypoint, RangeQuerierWithArgumentsWrapper, RangeQuerierWithArgumentsWrapperNew>;
using QuerierVariantPtr = std::unique_ptr<QuerierVariant>;

} // namespace entrypoint::series_data

static_assert(entrypoint::series_data::QuerierInterface<entrypoint::series_data::InstantQuerierWithArgumentsWrapperEntrypoint>);
static_assert(entrypoint::series_data::QuerierInterface<entrypoint::series_data::RangeQuerierWithArgumentsWrapper>);
static_assert(entrypoint::series_data::QuerierInterface<entrypoint::series_data::RangeQuerierWithArgumentsWrapperNew>);
48 changes: 48 additions & 0 deletions pp/entrypoint/series_data_data_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "head/chunk_recoder.h"
#include "head/data_storage.h"
#include "head/lss.h"
#include "head/serialization.h"
#include "primitives/go_slice.h"
#include "series_data/data_storage.h"
#include "series_data/loader.h"
Expand Down Expand Up @@ -142,6 +143,36 @@ extern "C" void prompp_series_data_data_storage_query(void* args, void* res) {
}
}

extern "C" void prompp_series_data_data_storage_query_new(void* args, void* res) {
using Query = series_data::querier::Query<Slice<LabelSetID>>;
using entrypoint::series_data::RangeQuerierWithArgumentsWrapperNew;
using series_data::querier::Querier;

struct Arguments {
DataStoragePtr data_storage;
Query query;
};

struct Result {
QuerierVariantPtr querier{};
QueryStatus status;
entrypoint::head::SerializedDataPtr* serialized_data{};
};

const auto in = static_cast<Arguments*>(args);
auto* out = new (res) Result();

RangeQuerierWithArgumentsWrapperNew querier(*in->data_storage, in->query, out->serialized_data);
querier.query();

if (querier.need_loading()) {
out->querier = std::make_unique<QuerierVariant>(std::in_place_index<2>, std::move(querier));
out->status = QueryStatus::kNeedDataLoad;
} else {
out->status = QueryStatus::kSuccess;
}
}

extern "C" void prompp_series_data_data_storage_instant_query(void* args, void* res) {
using entrypoint::series_data::InstantQuerierWithArgumentsWrapperEntrypoint;
using PromPP::Primitives::Timestamp;
Expand Down Expand Up @@ -249,6 +280,23 @@ extern "C" void prompp_series_data_serialized_chunk_recoder_ctor(void* args, voi
};
}

extern "C" void prompp_series_data_serialized_chunk_recoder_new_ctor(void* args, void* res) {
struct Arguments {
entrypoint::head::SerializedDataPtr* serialized_data;
PromPP::Primitives::TimeInterval time_interval;
};
struct Result {
ChunkRecoderVariantPtr chunk_recoder;
};

const auto in = static_cast<Arguments*>(args);
new (res) Result{
.chunk_recoder = std::make_unique<ChunkRecoderVariant>(
std::in_place_type<SerializedChunkRecoder>,
series_data::chunk::SerializedChunkIterator{in->serialized_data->get()->get_buffer(), in->serialized_data->get()->get_chunks()}, in->time_interval),
};
}

extern "C" void prompp_series_data_chunk_recoder_recode_next_chunk(void* args, void* res) {
struct Arguments {
ChunkRecoderVariantPtr chunk_recoder;
Expand Down
32 changes: 32 additions & 0 deletions pp/entrypoint/series_data_data_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ void prompp_series_data_data_storage_allocated_memory(void* args, void* res);
*/
void prompp_series_data_data_storage_query(void* args, void* res);

/**
* @brief Queries data storage and serializes result (new serialization model).
*
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* query DataStorageQuery // query
* }
*
* @param res {
* Querier uintptr // pointer to constructed Querier if data loading is needed
* Status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* serializedData uintptr // pointer to serialized data
* }
*/
void prompp_series_data_data_storage_query_new(void* args, void* res);

/**
* @brief return samples at given timestamp for label sets.
*
Expand Down Expand Up @@ -178,6 +194,22 @@ void prompp_series_data_chunk_recoder_ctor(void* args, void* res);
*/
void prompp_series_data_serialized_chunk_recoder_ctor(void* args, void* res);

/**
* @brief Construct a new ChunkRecoder object to recode all serialized chunks (new model)
*
* @param args {
* serializedData uintptr // pointer to serialized data
* time_interval struct { // closed interval [min, max]
* min int64
* max int64
* }
* }
* @param res {
* chunk_recoder uintptr // pointer to chunk recoder
* }
*/
void prompp_series_data_serialized_chunk_recoder_new_ctor(void* args, void* res);

/**
* @brief Get chunk encoded in prometheus format
*
Expand Down
69 changes: 69 additions & 0 deletions pp/entrypoint/series_data_serialization_serialized_data.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include "series_data_serialization_serialized_data.h"

#include "head/serialization.h"

extern "C" void prompp_series_data_serialization_serialized_data_next(void* args, void* res) {
struct Arguments {
entrypoint::head::SerializedDataPtr* serialized_data;
};

using Result = struct {
uint32_t series_id;
};

new (res) Result{.series_id = reinterpret_cast<Arguments*>(args)->serialized_data->get()->next()};
}

extern "C" void prompp_series_data_serialization_serialized_data_iterator(void* args, void* res) {
struct Arguments {
entrypoint::head::SerializedDataPtr* serialized_data;
};

using Result = struct {
entrypoint::head::SerializedDataIteratorPtr iterator;
};

new (res) Result{.iterator = std::make_unique<series_data::serialization::SerializedDataView::SerializedSeriesIterator>(
static_cast<Arguments*>(args)->serialized_data->get()->iterator())};
}

extern "C" void prompp_series_data_serialization_serialized_data_iterator_next(void* args, void* res) {
using series_data::decoder::DecodeIteratorSentinel;

struct Arguments {
entrypoint::head::SerializedDataIteratorPtr iterator;
};

struct Result {
int64_t timestamp{};
double value{};
bool has_value;
};

Arguments* in = reinterpret_cast<Arguments*>(args);

if (*in->iterator == DecodeIteratorSentinel{}) {
new (res) Result{.has_value = false};
} else {
const auto sample = **(in->iterator);
new (res) Result{.timestamp = sample.timestamp, .value = sample.value, .has_value = true};
++(*in->iterator);
}
}

extern "C" void prompp_series_data_serialization_serialized_data_iterator_dtor(void* args) {
struct Arguments {
entrypoint::head::SerializedDataIteratorPtr iterator;
};

static_cast<Arguments*>(args)->~Arguments();
}

extern "C" void prompp_series_data_serialization_serialized_data_dtor(void* args) {
struct Arguments {
entrypoint::head::SerializedDataPtr* serialized_data;
};

std::destroy_at(static_cast<Arguments*>(args)->serialized_data);
static_cast<Arguments*>(args)->~Arguments();
}
68 changes: 68 additions & 0 deletions pp/entrypoint/series_data_serialization_serialized_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#ifdef __cplusplus
extern "C" {
#endif

/**
* @brief Get next series_id in serialized data.
*
* @param args {
* serializedData uintptr // pointer to serialized data.
* }
*
* @param res {
* series_id uint32 // series id (UINT32_MAX if no more series).
* }
*/
void prompp_series_data_serialization_serialized_data_next(void* args, void* res);

/**
* @brief Create a decode iterator for current series_id (returned by the last call of _next())
*
* @param args {
* serializedData uintptr // pointer to serialized data.
* }
*
* @param res {
* iterator uintptr // pointer to constructed decode iterator.
* }
*/
void prompp_series_data_serialization_serialized_data_iterator(void* args, void* res);

/**
* @brief Advance decode iterator.
*
* @param args {
* iterator uintptr // pointer to decode iterator
* }
*
* @param res {
* has_data bool // is iterator has more data to decode.
* timestamp int64 // sample timestamp
* value float64 // sample value
* }
*/
void prompp_series_data_serialization_serialized_data_iterator_next(void* args, void* res);

/**
* @brief Destroy decode iterator.
*
* @param args {
* iterator uintptr // pointer to decode iterator
* }
*
*/
void prompp_series_data_serialization_serialized_data_iterator_dtor(void* args);

/**
* @brief Destroy serialized data object.
*
* @param args {
* serializedData uintptr // pointer to serialized data.
* }
*
*/
void prompp_series_data_serialization_serialized_data_dtor(void* args);

#ifdef __cplusplus
} // extern "C"
#endif
Loading
Loading