From bb8d3027122c7a7ab2afbc2ded407479668e5727 Mon Sep 17 00:00:00 2001 From: mencagli Date: Tue, 27 Feb 2024 21:37:49 +0100 Subject: [PATCH] [FIX] Fixing of persistent operators --- tests/rocksdb_tests/rocksdb_common.hpp | 2 +- tests/rocksdb_tests/test_rocksdb_6.cpp | 10 ---------- tests/rocksdb_tests/test_rocksdb_8.cpp | 10 ---------- wf/persistent/builders_rocksdb.hpp | 22 +++++++++++++++++++++- wf/persistent/p_window_replica.hpp | 19 +++++-------------- wf/window_replica.hpp | 6 +++--- 6 files changed, 30 insertions(+), 39 deletions(-) diff --git a/tests/rocksdb_tests/rocksdb_common.hpp b/tests/rocksdb_tests/rocksdb_common.hpp index f34ec3f9..594574f7 100644 --- a/tests/rocksdb_tests/rocksdb_common.hpp +++ b/tests/rocksdb_tests/rocksdb_common.hpp @@ -467,7 +467,7 @@ class WinSink_Functor if (out) { received++; totalsum += (*out).value; - //std::cout << "Ricevuto risultato finestra wid: " << (*out).wid << ", chiave: " << (*out).key << ", valore: " << (*out).value << std::endl; + std::cout << "Ricevuto risultato finestra wid: " << (*out).wid << ", chiave: " << (*out).key << ", valore: " << (*out).value << std::endl; } else { // printf("Received: %ld results, total sum: %ld\n", received, totalsum); diff --git a/tests/rocksdb_tests/test_rocksdb_6.cpp b/tests/rocksdb_tests/test_rocksdb_6.cpp index c0ebe08f..63a58ce0 100644 --- a/tests/rocksdb_tests/test_rocksdb_6.cpp +++ b/tests/rocksdb_tests/test_rocksdb_6.cpp @@ -107,15 +107,6 @@ int main(int argc, char *argv[]) cout << "| | (" << source_degree << ") +-->+ (" << filter_degree << ") +-->+ (" << flatmap_degree << ") +-->+ (" << map_degree << ") +-->+ (" << kw_degree << ") +-->+ (" << sink_degree << ") | |" << endl; cout << "| +-----+ +-----+ +------+ +-----+ +--------+ +-----+ |" << endl; cout << "+-----------------------------------------------------------------+" << endl; - auto tuple_serializer = [](tuple_t &t) -> std::string { - return std::to_string(t.key) + "," + std::to_string(t.value); - }; - auto tuple_deserializer = [](std::string &s) -> tuple_t { - tuple_t t; - t.key = atoi(s.substr(0, s.find(",")).c_str()); - t.value = atoi(s.substr(s.find(",")+1, s.length()-1).c_str()); - return t; - }; auto result_serializer = [](result_t &r) -> std::string { return std::to_string(r.key) + "," + std::to_string(r.value) + ";" + std::to_string(r.wid); }; @@ -160,7 +151,6 @@ int main(int argc, char *argv[]) .withParallelism(kw_degree) .withKeyBy([](const tuple_t &t) -> size_t { return t.key; }) .withCBWindows(win_len, win_slide) - .withTupleSerializerAndDeserializer(tuple_serializer, tuple_deserializer) .withResultSerializerAndDeserializer(result_serializer, result_deserializer) .build(); #else diff --git a/tests/rocksdb_tests/test_rocksdb_8.cpp b/tests/rocksdb_tests/test_rocksdb_8.cpp index b0010644..9c31da65 100644 --- a/tests/rocksdb_tests/test_rocksdb_8.cpp +++ b/tests/rocksdb_tests/test_rocksdb_8.cpp @@ -108,15 +108,6 @@ int main(int argc, char *argv[]) cout << "| | (" << source_degree << ") +-->+ (" << filter_degree << ") +-->+ (" << flatmap_degree << ") +-->+ (" << map_degree << ") +-->+ (" << kw_degree << ") +-->+ (" << sink_degree << ") | |" << endl; cout << "| +-----+ +-----+ +------+ +-----+ +--------+ +-----+ |" << endl; cout << "+-----------------------------------------------------------------+" << endl; - auto tuple_serializer = [](tuple_t &t) -> std::string { - return std::to_string(t.key) + "," + std::to_string(t.value); - }; - auto tuple_deserializer = [](std::string &s) -> tuple_t { - tuple_t t; - t.key = atoi(s.substr(0, s.find(",")).c_str()); - t.value = atoi(s.substr(s.find(",")+1, s.length()-1).c_str()); - return t; - }; auto result_serializer = [](result_t &r) -> std::string { return std::to_string(r.key) + "," + std::to_string(r.value) + ";" + std::to_string(r.wid); }; @@ -165,7 +156,6 @@ int main(int argc, char *argv[]) .withParallelism(kw_degree) .withKeyBy([](const tuple_t &t) -> size_t { return t.key; }) .withTBWindows(microseconds(win_len), microseconds(win_slide)) - .withTupleSerializerAndDeserializer(tuple_serializer, tuple_deserializer) .withResultSerializerAndDeserializer(result_serializer, result_deserializer) .build(); #else diff --git a/wf/persistent/builders_rocksdb.hpp b/wf/persistent/builders_rocksdb.hpp index 80e47174..0bc99163 100644 --- a/wf/persistent/builders_rocksdb.hpp +++ b/wf/persistent/builders_rocksdb.hpp @@ -1273,7 +1273,7 @@ class P_Keyed_Windows_Builder: public P_Basic_Builder key_t { return key_t(); }; // key extractor bool isKeyBySet = false; // true if a key extractor has been provided size_t frag_bytes = sizeof(tuple_t) * 16; // size in bytes of each archive fragment of the stream - bool results_in_memory = true; // flag stating if results must be kepts on memory or on RocksDB + bool results_in_memory = true; // flag stating if results must be kepts in memory or on RocksDB bool isTupleFunctions = false; // flag stating if the tuple serializer/deserializer have been provided bool isResultFunctions = false; // flag stating if the result serializer/deserializer have been provided uint64_t win_len=0; // window length in number of tuples or in time units @@ -1428,6 +1428,7 @@ class P_Keyed_Windows_Builder: public P_Basic_Builder 1 requires a key extractor" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check the presence of the tuple/result serializer/deserializer according to type of the window processing logic + if constexpr (std::is_invocable &, result_t &>::value || + std::is_invocable &, result_t &, RuntimeContext &>::value) { + if (!isTupleFunctions) { + std::cerr << RED << "WindFlow Error: P_Keyed_Windows instantiated with a non-incremental logic without tuple serializer/serializer" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + } + else if constexpr (std::is_invocable::value || + std::is_invocable::value) { + if (isTupleFunctions) { + std::cerr << RED << "WindFlow Error: P_Keyed_Windows receives tuple serializer/deserializer with an incremental logic" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + if (!isResultFunctions) { + std::cerr << RED << "WindFlow Error: P_Keyed_Windows instantiated with an incremental logic without result serializer/serializer" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + } return p_keyed_wins_t(func, key_extr, this->parallelism, diff --git a/wf/persistent/p_window_replica.hpp b/wf/persistent/p_window_replica.hpp index 8f11ee32..cee57753 100644 --- a/wf/persistent/p_window_replica.hpp +++ b/wf/persistent/p_window_replica.hpp @@ -230,6 +230,7 @@ class P_Window_Replica: public Basic_Replica return final_range; } +#if 0 // getDistance method size_t getDistance(const wrapper_t &_w1, const wrapper_t &_w2, @@ -254,6 +255,7 @@ class P_Window_Replica: public Basic_Replica its.second = history_buffer.end(); return std::distance(its.first, its.second); } +#endif // getEnd method input_iterator_t getEnd(Key_Descriptor &_kd) @@ -336,17 +338,6 @@ class P_Window_Replica: public Basic_Replica delete mydb_results; } - /* - template - rocksdb::DB *get_operator_internal_db() - { - if (std::is_same::value) { - return mydb_wrappers->get_internal_db(); - } - return mydb_results->get_internal_db(); - } - */ - // svc (utilized by the FastFlow runtime) void *svc(void *_in) override { @@ -504,9 +495,9 @@ class P_Window_Replica: public Basic_Replica (this->context).setContextParameters(_timestamp, _watermark); // set the parameter of the RuntimeContext func(iter, res, this->context); } - } - if (t_s) { // purge tuples from the archive - purge(*t_s, key_d, key); + if (t_s) { // purge tuples from the archive + purge(*t_s, key_d, key); + } } cnt_fired++; key_d.last_lwid++; diff --git a/wf/window_replica.hpp b/wf/window_replica.hpp index bd3f0c8f..62fbb1bd 100644 --- a/wf/window_replica.hpp +++ b/wf/window_replica.hpp @@ -322,9 +322,9 @@ class Window_Replica: public Basic_Replica (this->context).setContextParameters(_timestamp, _watermark); // set the parameter of the RuntimeContext func(iter, win.getResult(), this->context); } - } - if (t_s) { // purge tuples from the archive - (key_d.archive).purge(*t_s); + if (t_s) { // purge tuples from the archive + (key_d.archive).purge(*t_s); + } } cnt_fired++; key_d.last_lwid++;