diff --git a/.DS_Store b/.DS_Store index 60bcd144..71b2d471 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/wf/accumulator.hpp b/wf/accumulator.hpp index 3bc9be5b..3bcec871 100644 --- a/wf/accumulator.hpp +++ b/wf/accumulator.hpp @@ -74,6 +74,7 @@ class Accumulator: public ff::ff_farm using key_t = typename std::remove_reference(tmp.getControlFields()))>::type; // friendships with other classes in the library friend class MultiPipe; + bool used; // true if the operator has been added/chained in a MultiPipe // class Accumulator_Node class Accumulator_Node: public ff::ff_node_t { @@ -233,7 +234,7 @@ class Accumulator: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func, - routing_func_t _routing_func) + routing_func_t _routing_func): used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -269,7 +270,7 @@ class Accumulator: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func, - routing_func_t _routing_func) + routing_func_t _routing_func): used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -289,6 +290,21 @@ class Accumulator: public ff::ff_farm // when the Accumulator will be destroyed we need aslo to destroy the emitter, workers and collector ff::ff_farm::cleanup_all(); } + + /** + * \brief Check whether the Accumulator has been used in a MultiPipe + * \return true if the Accumulator has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Accumulator(const Accumulator &) = delete; // copy constructor + Accumulator(Accumulator &&) = delete; // move constructor + Accumulator &operator=(const Accumulator &) = delete; // copy assignment operator + Accumulator &operator=(Accumulator &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/basic.hpp b/wf/basic.hpp index 7f81830f..a0159c01 100644 --- a/wf/basic.hpp +++ b/wf/basic.hpp @@ -70,9 +70,9 @@ inline unsigned long current_time_nsecs() } /// utility macros -#define DEFAULT_COLOR_VECTOR_CAPACITY 500 //< default capacity of vectors used internally by the library -#define DEFAULT_COLOR_BATCH_SIZE_TB 1000 //< inital batch size (in no. of tuples) used by GPU operators with time-based windows -#define DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK 256 //< default number of threads per block used by GPU operators +#define DEFAULT_VECTOR_CAPACITY 500 //< default capacity of vectors used internally by the library +#define DEFAULT_BATCH_SIZE_TB 1000 //< inital batch size (in no. of tuples) used by GPU operators with time-based windows +#define DEFAULT_CUDA_NUM_THREAD_BLOCK 256 //< default number of threads per block used by GPU operators #define gpuErrChk(ans) { gpuAssert((ans), __FILE__, __LINE__); } // supported processing modes of the PipeGraph diff --git a/wf/builders.hpp b/wf/builders.hpp index 2c8692a2..3c7b0618 100644 --- a/wf/builders.hpp +++ b/wf/builders.hpp @@ -755,7 +755,7 @@ class WinSeqGPU_Builder uint64_t slide_len = 1; win_type_t winType = CB; size_t batch_len = 1; - size_t n_thread_block = DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK; + size_t n_thread_block = DEFAULT_CUDA_NUM_THREAD_BLOCK; std::string name = "anonymous_seq_gpu"; size_t scratchpad_size = 0; @@ -804,7 +804,7 @@ class WinSeqGPU_Builder * \param _n_thread_block number of threads per block * \return the object itself */ - WinSeqGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK) + WinSeqGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_CUDA_NUM_THREAD_BLOCK) { batch_len = _batch_len; n_thread_block = _n_thread_block; @@ -867,9 +867,9 @@ template class WinFarm_Builder { private: - T input; + T &input; // type of the operator to be created by this builder - using winfarm_t = decltype(get_WF_nested_type(input)); + using winfarm_t = std::remove_reference_t; // type of the closing function using closing_func_t = std::function; uint64_t win_len = 1; @@ -882,7 +882,7 @@ class WinFarm_Builder // window parameters initialization (input is a Pane_Farm) template - void initWindowConf(Pane_Farm _pf) + void initWindowConf(Pane_Farm &_pf) { win_len = _pf.win_len; slide_len = _pf.slide_len; @@ -891,7 +891,7 @@ class WinFarm_Builder // window parameters initialization (input is a Win_MapReduce) template - void initWindowConf(Win_MapReduce _wm) + void initWindowConf(Win_MapReduce &_wm) { win_len = _wm.win_len; slide_len = _wm.slide_len; @@ -900,7 +900,7 @@ class WinFarm_Builder // window parameters initialization (input is a function) template - void initWindowConf(T2 f) + void initWindowConf(T2 &f) { win_len = 1; slide_len = 1; @@ -913,7 +913,7 @@ class WinFarm_Builder * * \param _input can be either a function or an already instantiated Pane_Farm or Win_MapReduce operator. */ - WinFarm_Builder(T _input): input(_input) + WinFarm_Builder(T &_input): input(_input) { initWindowConf(input); } @@ -1042,22 +1042,22 @@ template class WinFarmGPU_Builder { private: - T input; + T &input; // type of the operator to be created by this builder - using winfarm_gpu_t = decltype(get_WF_GPU_nested_type(input)); + using winfarm_gpu_t = std::remove_reference_t; uint64_t win_len = 1; uint64_t slide_len = 1; win_type_t winType = CB; size_t pardegree = 1; size_t batch_len = 1; - size_t n_thread_block = DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK; + size_t n_thread_block = DEFAULT_CUDA_NUM_THREAD_BLOCK; std::string name = "anonymous_wf_gpu"; size_t scratchpad_size = 0; opt_level_t opt_level = LEVEL2; // window parameters initialization (input is a Pane_Farm_GPU) template - void initWindowConf(Pane_Farm_GPU _pf) + void initWindowConf(Pane_Farm_GPU &_pf) { win_len = _pf.win_len; slide_len = _pf.slide_len; @@ -1068,7 +1068,7 @@ class WinFarmGPU_Builder // window parameters initialization (input is a Win_MapReduce_GPU) template - void initWindowConf(Win_MapReduce_GPU _wm) + void initWindowConf(Win_MapReduce_GPU &_wm) { win_len = _wm.win_len; slide_len = _wm.slide_len; @@ -1079,13 +1079,13 @@ class WinFarmGPU_Builder // window parameters initialization (input is a function) template - void initWindowConf(T2 f) + void initWindowConf(T2 &f) { win_len = 1; slide_len = 1; winType = CB; batch_len = 1; - n_thread_block = DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK; + n_thread_block = DEFAULT_CUDA_NUM_THREAD_BLOCK; } public: @@ -1094,7 +1094,7 @@ class WinFarmGPU_Builder * * \param _input can be either a host/device function or an already instantiated Pane_Farm_GPU or Win_MapReduce_GPU operator. */ - WinFarmGPU_Builder(T _input): input(_input) { + WinFarmGPU_Builder(T &_input): input(_input) { initWindowConf(input); } @@ -1147,7 +1147,7 @@ class WinFarmGPU_Builder * \param _n_thread_block number of threads per block * \return the object itself */ - WinFarmGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK) + WinFarmGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_CUDA_NUM_THREAD_BLOCK) { batch_len = _batch_len; n_thread_block = _n_thread_block; @@ -1222,9 +1222,9 @@ template class KeyFarm_Builder { private: - T input; + T &input; // type of the operator to be created by this builder - using keyfarm_t = decltype(get_KF_nested_type(input)); + using keyfarm_t = std::remove_reference_t; // type of the closing function using closing_func_t = std::function; // type of the function to map the key hashcode onto an identifier starting from zero to pardegree-1 @@ -1240,7 +1240,7 @@ class KeyFarm_Builder // window parameters initialization (input is a Pane_Farm) template - void initWindowConf(Pane_Farm _pf) + void initWindowConf(Pane_Farm &_pf) { win_len = _pf.win_len; slide_len = _pf.slide_len; @@ -1249,7 +1249,7 @@ class KeyFarm_Builder // window parameters initialization (input is a Win_MapReduce) template - void initWindowConf(Win_MapReduce _wm) + void initWindowConf(Win_MapReduce &_wm) { win_len = _wm.win_len; slide_len = _wm.slide_len; @@ -1258,7 +1258,7 @@ class KeyFarm_Builder // window parameters initialization (input is a function) template - void initWindowConf(T2 f) + void initWindowConf(T2 &f) { win_len = 1; slide_len = 1; @@ -1271,7 +1271,7 @@ class KeyFarm_Builder * * \param _input can be either a function or an already instantiated Pane_Farm or Win_MapReduce operator. */ - KeyFarm_Builder(T _input): input(_input) + KeyFarm_Builder(T &_input): input(_input) { initWindowConf(input); } @@ -1398,17 +1398,17 @@ template class KeyFarmGPU_Builder { private: - T input; + T &input; // type of the function to map the key hashcode onto an identifier starting from zero to pardegree-1 using routing_func_t = std::function; // type of the operator to be created by this builder - using keyfarm_gpu_t = decltype(get_KF_GPU_nested_type(input)); + using keyfarm_gpu_t = std::remove_reference_t; uint64_t win_len = 1; uint64_t slide_len = 1; win_type_t winType = CB; size_t pardegree = 1; size_t batch_len = 1; - size_t n_thread_block = DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK; + size_t n_thread_block = DEFAULT_CUDA_NUM_THREAD_BLOCK; std::string name = "anonymous_wf_gpu"; size_t scratchpad_size = 0; routing_func_t routing_func = [](size_t k, size_t n) { return k%n; }; @@ -1416,7 +1416,7 @@ class KeyFarmGPU_Builder // window parameters initialization (input is a Pane_Farm_GPU) template - void initWindowConf(Pane_Farm_GPU _pf) + void initWindowConf(Pane_Farm_GPU &_pf) { win_len = _pf.win_len; slide_len = _pf.slide_len; @@ -1427,7 +1427,7 @@ class KeyFarmGPU_Builder // window parameters initialization (input is a Win_MapReduce_GPU) template - void initWindowConf(Win_MapReduce_GPU _wm) + void initWindowConf(Win_MapReduce_GPU &_wm) { win_len = _wm.win_len; slide_len = _wm.slide_len; @@ -1438,13 +1438,13 @@ class KeyFarmGPU_Builder // window parameters initialization (input is a function) template - void initWindowConf(T2 f) + void initWindowConf(T2 &f) { win_len = 1; slide_len = 1; winType = CB; batch_len = 1; - n_thread_block = DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK; + n_thread_block = DEFAULT_CUDA_NUM_THREAD_BLOCK; } public: @@ -1453,7 +1453,7 @@ class KeyFarmGPU_Builder * * \param _input can be either a host/device function or an already instantiated Pane_Farm_GPU or Win_MapReduce_GPU operator. */ - KeyFarmGPU_Builder(T _input): input(_input) { + KeyFarmGPU_Builder(T &_input): input(_input) { initWindowConf(input); } @@ -1506,7 +1506,7 @@ class KeyFarmGPU_Builder * \param _n_thread_block number of threads per block * \return the object itself */ - KeyFarmGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK) + KeyFarmGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_CUDA_NUM_THREAD_BLOCK) { batch_len = _batch_len; n_thread_block = _n_thread_block; @@ -1751,7 +1751,7 @@ class PaneFarmGPU_Builder size_t plq_degree = 1; size_t wlq_degree = 1; size_t batch_len = 1; - size_t n_thread_block = DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK; + size_t n_thread_block = DEFAULT_CUDA_NUM_THREAD_BLOCK; std::string name = "anonymous_pf_gpu"; size_t scratchpad_size = 0; opt_level_t opt_level = LEVEL0; @@ -1818,7 +1818,7 @@ class PaneFarmGPU_Builder * \param _n_thread_block number of threads per block * \return the object itself */ - PaneFarmGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK) + PaneFarmGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_CUDA_NUM_THREAD_BLOCK) { batch_len = _batch_len; n_thread_block = _n_thread_block; @@ -2075,7 +2075,7 @@ class WinMapReduceGPU_Builder size_t map_degree = 2; size_t reduce_degree = 1; size_t batch_len = 1; - size_t n_thread_block = DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK; + size_t n_thread_block = DEFAULT_CUDA_NUM_THREAD_BLOCK; std::string name = "anonymous_wmw_gpu"; size_t scratchpad_size = 0; opt_level_t opt_level = LEVEL0; @@ -2142,7 +2142,7 @@ class WinMapReduceGPU_Builder * \param _n_thread_block number of threads per block * \return the object itself */ - WinMapReduceGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_COLOR_CUDA_NUM_THREAD_BLOCK) + WinMapReduceGPU_Builder& withBatch(size_t _batch_len, size_t _n_thread_block=DEFAULT_CUDA_NUM_THREAD_BLOCK) { batch_len = _batch_len; n_thread_block = _n_thread_block; diff --git a/wf/filter.hpp b/wf/filter.hpp index c1dd0c8c..23148825 100644 --- a/wf/filter.hpp +++ b/wf/filter.hpp @@ -70,6 +70,7 @@ class Filter: public ff::ff_farm private: // friendships with other classes in the library friend class MultiPipe; + bool used; // true if the operator has been added/chained in a MultiPipe bool keyed; // flag stating whether the Filter is configured with keyBy or not // class Filter_Node class Filter_Node: public ff::ff_node_t @@ -203,7 +204,7 @@ class Filter: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -240,7 +241,7 @@ class Filter: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -275,7 +276,7 @@ class Filter: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -312,7 +313,7 @@ class Filter: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -343,6 +344,21 @@ class Filter: public ff::ff_farm { return keyed; } + + /** + * \brief Check whether the Filter has been used in a MultiPipe + * \return true if the Filter has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Filter(const Filter &) = delete; // copy constructor + Filter(Filter &&) = delete; // move constructor + Filter &operator=(const Filter &) = delete; // copy assignment operator + Filter &operator=(Filter &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/flatmap.hpp b/wf/flatmap.hpp index 88b857c6..0ebf7e86 100644 --- a/wf/flatmap.hpp +++ b/wf/flatmap.hpp @@ -72,6 +72,7 @@ class FlatMap: public ff::ff_farm // friendships with other classes in the library friend class MultiPipe; bool keyed; // flag stating whether the FlatMap is configured with keyBy or not + bool used; // true if the operator has been added/chained in a MultiPipe // class FlatMap_Node class FlatMap_Node: public ff::ff_node_t { @@ -209,7 +210,7 @@ class FlatMap: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -246,7 +247,7 @@ class FlatMap: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -281,7 +282,7 @@ class FlatMap: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -318,7 +319,7 @@ class FlatMap: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -349,6 +350,21 @@ class FlatMap: public ff::ff_farm { return keyed; } + + /** + * \brief Check whether the FlatMap has been used in a MultiPipe + * \return true if the FlatMap has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + FlatMap(const FlatMap &) = delete; // copy constructor + FlatMap(FlatMap &&) = delete; // move constructor + FlatMap &operator=(const FlatMap &) = delete; // copy assignment operator + FlatMap &operator=(FlatMap &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/key_farm.hpp b/wf/key_farm.hpp index 83aeecc0..f5301d92 100644 --- a/wf/key_farm.hpp +++ b/wf/key_farm.hpp @@ -96,6 +96,7 @@ class Key_Farm: public ff::ff_farm // friendships with other classes in the library template friend auto get_KF_nested_type(T); + friend class MultiPipe; // flag stating whether the Key_Farm has been instantiated with complex workers (Pane_Farm or Win_MapReduce) bool hasComplexWorkers; // optimization level of the Key_Farm @@ -111,11 +112,9 @@ class Key_Farm: public ff::ff_farm size_t inner_parallelism_2; // window type (CB or TB) win_type_t winType; + bool used; // true if the operator has been added/chained in a MultiPipe - // Private Constructor I (stub) - Key_Farm() {} - - // Private Constructor II + // Private Constructor I template Key_Farm(F_t _func, uint64_t _win_len, @@ -226,7 +225,9 @@ class Key_Farm: public ff::ff_farm routing_func_t _routing_func, opt_level_t _opt_level): Key_Farm(_win_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _routing_func, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor II @@ -251,7 +252,9 @@ class Key_Farm: public ff::ff_farm routing_func_t _routing_func, opt_level_t _opt_level): Key_Farm(_rich_win_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _routing_func, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor III @@ -276,7 +279,9 @@ class Key_Farm: public ff::ff_farm routing_func_t _routing_func, opt_level_t _opt_level): Key_Farm(_winupdate_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _routing_func, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor IV @@ -301,7 +306,9 @@ class Key_Farm: public ff::ff_farm routing_func_t _routing_func, opt_level_t _opt_level): Key_Farm(_rich_winupdate_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _routing_func, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor V (Nesting with Pane_Farm) @@ -316,7 +323,7 @@ class Key_Farm: public ff::ff_farm * \param _routing_func function to map the key hashcode onto an identifier starting from zero to pardegree-1 * \param _opt_level optimization level used to build the operator */ - Key_Farm(const pane_farm_t &_pf, + Key_Farm(pane_farm_t &_pf, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -329,7 +336,7 @@ class Key_Farm: public ff::ff_farm outer_opt_level(_opt_level), inner_type(PF_CPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), used(false) { // check the validity of the windowing parameters if (_win_len == 0 || _slide_len == 0) { @@ -341,6 +348,14 @@ class Key_Farm: public ff::ff_farm std::cerr << RED << "WindFlow Error: Key_Farm has parallelism zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Pane_Farm has not already been used in a nested structure + if (_pf.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Pane_Farm has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _pf.used4Nesting = true; + } // check the compatibility of the windowing parameters if (_pf.win_len != _win_len || _pf.slide_len != _slide_len || _pf.winType != _winType) { std::cerr << RED << "WindFlow Error: incompatible windowing parameters between Key_Farm and Pane_Farm" << DEFAULT_COLOR << std::endl; @@ -414,7 +429,7 @@ class Key_Farm: public ff::ff_farm * \param _routing_func function to map the key hashcode onto an identifier starting from zero to pardegree-1 * \param _opt_level optimization level used to build the operator */ - Key_Farm(const win_mapreduce_t &_wm, + Key_Farm(win_mapreduce_t &_wm, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -427,7 +442,7 @@ class Key_Farm: public ff::ff_farm outer_opt_level(_opt_level), inner_type(WMR_CPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), used(false) { // check the validity of the windowing parameters if (_win_len == 0 || _slide_len == 0) { @@ -439,6 +454,14 @@ class Key_Farm: public ff::ff_farm std::cerr << RED << "WindFlow Error: Key_Farm has parallelism zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Win_MapReduce has not already been used in a nested structure + if (_wm.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Win_MapReduce has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _wm.used4Nesting = true; + } // check the compatibility of the windowing parameters if (_wm.win_len != _win_len || _wm.slide_len != _slide_len || _wm.winType != _winType) { std::cerr << RED << "WindFlow Error: incompatible windowing parameters between Key_Farm and Win_MapReduce" << DEFAULT_COLOR << std::endl; @@ -561,6 +584,21 @@ class Key_Farm: public ff::ff_farm { return winType; } + + /** + * \brief Check whether the Key_Farm has been used in a MultiPipe + * \return true if the Key_Farm has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Key_Farm(const Key_Farm &) = delete; // copy constructor + Key_Farm(Key_Farm &&) = delete; // move constructor + Key_Farm &operator=(const Key_Farm &) = delete; // copy assignment operator + Key_Farm &operator=(Key_Farm &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/key_farm_gpu.hpp b/wf/key_farm_gpu.hpp index 1c169229..20d5e526 100644 --- a/wf/key_farm_gpu.hpp +++ b/wf/key_farm_gpu.hpp @@ -89,6 +89,7 @@ class Key_Farm_GPU: public ff::ff_farm // friendships with other classes in the library template friend auto get_KF_GPU_nested_type(T); + friend class MultiPipe; // flag stating whether the Key_Farm_GPU has been instantiated with complex workers (Pane_Farm_GPU or Win_MapReduce_GPU) bool hasComplexWorkers; // optimization level of the Key_Farm_GPU @@ -104,9 +105,7 @@ class Key_Farm_GPU: public ff::ff_farm size_t inner_parallelism_2; // window type (CB or TB) win_type_t winType; - - // Private Constructor (stub) - Key_Farm_GPU() {} + bool used; // true if the operator has been added/chained in a MultiPipe // method to optimize the structure of the Key_Farm_GPU operator void optimize_KeyFarmGPU(opt_level_t opt) @@ -176,7 +175,8 @@ class Key_Farm_GPU: public ff::ff_farm parallelism(_pardegree), inner_parallelism_1(1), inner_parallelism_2(0), - winType(_winType) + winType(_winType), + used(false) { // check the validity of the windowing parameters if (_win_len == 0 || _slide_len == 0) { @@ -228,7 +228,7 @@ class Key_Farm_GPU: public ff::ff_farm * \param _routing_func function to map the key hashcode onto an identifier starting from zero to pardegree-1 * \param _opt_level optimization level used to build the operator */ - Key_Farm_GPU(const pane_farm_gpu_t &_pf, + Key_Farm_GPU(pane_farm_gpu_t &_pf, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -243,7 +243,8 @@ class Key_Farm_GPU: public ff::ff_farm outer_opt_level(_opt_level), inner_type(PF_GPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), + used(false) { // check the validity of the windowing parameters if (_win_len == 0 || _slide_len == 0) { @@ -260,6 +261,14 @@ class Key_Farm_GPU: public ff::ff_farm std::cerr << RED << "WindFlow Error: batch length in Key_Farm_GPU cannot be zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Pane_Farm_GPU has not already been used in a nested structure + if (_pf.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Pane_Farm_GPU has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _pf.used4Nesting = true; + } // check the compatibility of the windowing/batching parameters if (_pf.win_len != _win_len || _pf.slide_len != _slide_len || _pf.winType != _winType || _pf.batch_len != _batch_len || _pf.n_thread_block != _n_thread_block) { std::cerr << RED << "WindFlow Error: incompatible windowing and batching parameters between Key_Farm_GPU and Pane_Farm_GPU" << DEFAULT_COLOR << std::endl; @@ -315,7 +324,7 @@ class Key_Farm_GPU: public ff::ff_farm * \param _routing_func function to map the key hashcode onto an identifier starting from zero to pardegree-1 * \param _opt_level optimization level used to build the operator */ - Key_Farm_GPU(const win_mapreduce_gpu_t &_wm, + Key_Farm_GPU(win_mapreduce_gpu_t &_wm, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -330,7 +339,8 @@ class Key_Farm_GPU: public ff::ff_farm outer_opt_level(_opt_level), inner_type(WMR_GPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), + used(false) { // check the validity of the windowing parameters if (_win_len == 0 || _slide_len == 0) { @@ -347,6 +357,14 @@ class Key_Farm_GPU: public ff::ff_farm std::cerr << RED << "WindFlow Error: batch length in Key_Farm_GPU cannot be zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Win_MapReduce_GPU has not already been used in a nested structure + if (_wm.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Win_MapReduce_GPU has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _wm.used4Nesting = true; + } // check the compatibility of the windowing/batching parameters if (_wm.win_len != _win_len || _wm.slide_len != _slide_len || _wm.winType != _winType || _wm.batch_len != _batch_len || _wm.n_thread_block != _n_thread_block) { std::cerr << RED << "WindFlow Error: incompatible windowing and batching parameters between Key_Farm_GPU and Win_MapReduce_GPU" << DEFAULT_COLOR << std::endl; @@ -449,6 +467,21 @@ class Key_Farm_GPU: public ff::ff_farm { return winType; } + + /** + * \brief Check whether the Key_Farm_GPU has been used in a MultiPipe + * \return true if the Key_Farm_GPU has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Key_Farm_GPU(const Key_Farm_GPU &) = delete; // copy constructor + //Key_Farm_GPU(Key_Farm_GPU &&) = delete; // move constructor + Key_Farm_GPU &operator=(const Key_Farm_GPU &) = delete; // copy assignment operator + Key_Farm_GPU &operator=(Key_Farm_GPU &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/map.hpp b/wf/map.hpp index 6cb5fd66..f4f29e4d 100644 --- a/wf/map.hpp +++ b/wf/map.hpp @@ -75,6 +75,7 @@ class Map: public ff::ff_farm // friendships with other classes in the library friend class MultiPipe; bool keyed; // flag stating whether the Map is configured with keyBy or not + bool used; // true if the operator has been added/chained in a MultiPipe // class Map_Node class Map_Node: public ff::ff_node_t { @@ -251,7 +252,7 @@ class Map: public ff::ff_farm T _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -289,7 +290,7 @@ class Map: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -325,7 +326,7 @@ class Map: public ff::ff_farm T _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -363,7 +364,7 @@ class Map: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -399,7 +400,7 @@ class Map: public ff::ff_farm T _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -437,7 +438,7 @@ class Map: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -473,7 +474,7 @@ class Map: public ff::ff_farm T _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -511,7 +512,7 @@ class Map: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -542,6 +543,21 @@ class Map: public ff::ff_farm { return keyed; } + + /** + * \brief Check whether the Map has been used in a MultiPipe + * \return true if the Map has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Map(const Map &) = delete; // copy constructor + Map(Map &&) = delete; // move constructor + Map &operator=(const Map &) = delete; // copy assignment operator + Map &operator=(Map &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/meta_utils.hpp b/wf/meta_utils.hpp index 1b117d16..f44f654f 100644 --- a/wf/meta_utils.hpp +++ b/wf/meta_utils.hpp @@ -301,68 +301,72 @@ decltype(get_result_t(&F_t::operator())) get_result_t(F_t); // metafunction to extract the type of Win_Farm from the inner operator of type Pane_Farm template -Win_Farm get_WF_nested_type(Pane_Farm const); +Win_Farm *get_WF_nested_type(Pane_Farm const&); // metafunction to extract the type of Win_Farm from the inner operator of type Win_MapReduce template -Win_Farm get_WF_nested_type(Win_MapReduce const); +Win_Farm *get_WF_nested_type(Win_MapReduce const&); // metafunction to extract the type of Win_Farm from a callable type (e.g., function, lambda, functor) template -auto get_WF_nested_type(F_t _f) +auto *get_WF_nested_type(F_t _f) { - return Win_Farm(); // stub Constructor + Win_Farm *ptr = nullptr; + return ptr; } // metafunction to extract the type of Win_Farm_GPU from the inner operator of type Pane_Farm_GPU template -Win_Farm_GPU get_WF_GPU_nested_type(Pane_Farm_GPU const); +Win_Farm_GPU *get_WF_GPU_nested_type(Pane_Farm_GPU const&); // metafunction to extract the type of Win_Farm_GPU from the inner operator of type Win_MapReduce_GPU template -Win_Farm_GPU get_WF_GPU_nested_type(Win_MapReduce_GPU const); +Win_Farm_GPU *get_WF_GPU_nested_type(Win_MapReduce_GPU const&); // metafunction to extract the type of Win_Farm_GPU from a callable type (e.g., function, lambda, functor) template -auto get_WF_GPU_nested_type(F_t _f) +auto *get_WF_GPU_nested_type(F_t _f) { - return Win_Farm_GPU(); // stub Constructor + decltype(_f)> *ptr = nullptr; + return ptr; } // metafunction to extract the type of Key_Farm from the inner operator of type Pane_Farm template -Key_Farm get_KF_nested_type(Pane_Farm const); +Key_Farm *get_KF_nested_type(Pane_Farm const&); // metafunction to extract the type of Key_Farm from the inner operator of type Win_MapReduce template -Key_Farm get_KF_nested_type(Win_MapReduce const); +Key_Farm *get_KF_nested_type(Win_MapReduce const&); // metafunction to extract the type of Key_Farm from a callable type (e.g., function, lambda, functor) template -auto get_KF_nested_type(F_t _f) +auto *get_KF_nested_type(F_t _f) { - return Key_Farm(); // stub Constructor + Key_Farm *ptr = nullptr; + return ptr; } // metafunction to extract the type of Key_Farm_GPU from the inner operator of type Pane_Farm_GPU template -Key_Farm_GPU get_KF_GPU_nested_type(Pane_Farm_GPU const); +Key_Farm_GPU *get_KF_GPU_nested_type(Pane_Farm_GPU const&); // metafunction to extract the type of Key_Farm_GPU from the inner operator of type Win_MapReduce_GPU template -Key_Farm_GPU get_KF_GPU_nested_type(Win_MapReduce_GPU const); +Key_Farm_GPU *get_KF_GPU_nested_type(Win_MapReduce_GPU const&); // metafunction to extract the type of Key_Farm_GPU from a callable type (e.g., function, lambda, functor) template -auto get_KF_GPU_nested_type(F_t _f) +auto *get_KF_GPU_nested_type(F_t _f) { - return Key_Farm_GPU(); // stub Constructor + decltype(_f)> *ptr = nullptr; + return ptr; } // metafunctions to return the callable type to be executed on the GPU (only lambda or functor!) diff --git a/wf/pane_farm.hpp b/wf/pane_farm.hpp index 99410bc6..1d7063c1 100644 --- a/wf/pane_farm.hpp +++ b/wf/pane_farm.hpp @@ -92,6 +92,7 @@ class Pane_Farm: public ff::ff_pipeline friend class WinFarm_Builder; template friend class KeyFarm_Builder; + friend class MultiPipe; // compute the gcd between two numbers std::function gcd = [](uint64_t u, uint64_t v) { while (v != 0) { @@ -124,6 +125,8 @@ class Pane_Farm: public ff::ff_pipeline bool ordered; opt_level_t opt_level; PatternConfig config; + bool used; // true if the operator has been added/chained in a MultiPipe + bool used4Nesting; // true if the operator has been used in a nested structure // Private Constructor template @@ -284,6 +287,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = false; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -320,6 +325,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = true; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -356,6 +363,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = false; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -392,6 +401,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = true; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -428,6 +439,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = false; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -464,6 +477,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = true; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -500,6 +515,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = false; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -536,6 +553,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = true; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -572,6 +591,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = false; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -608,6 +629,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = true; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -644,6 +667,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = false; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -680,6 +705,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = false; isRichPLQ = true; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -716,6 +743,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = false; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -752,6 +781,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = true; isRichWLQ = false; + used = false; + used4Nesting = false; } /** @@ -788,6 +819,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = false; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -824,6 +857,8 @@ class Pane_Farm: public ff::ff_pipeline isNICWLQ = true; isRichPLQ = true; isRichWLQ = true; + used = false; + used4Nesting = false; } /** @@ -861,6 +896,30 @@ class Pane_Farm: public ff::ff_pipeline { return wlq_degree; } + + /** + * \brief Check whether the Pane_Farm has been used in a MultiPipe + * \return true if the Pane_Farm has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /** + * \brief Check whether the Pane_Farm has been used in a nested structure + * \return true if the Pane_Farm has been used in a nested structure + */ + bool isUsed4Nesting() const + { + return used4Nesting; + } + + /// deleted constructors/operators + Pane_Farm(const Pane_Farm &) = delete; // copy constructor + Pane_Farm(Pane_Farm &&) = delete; // move constructor + Pane_Farm &operator=(const Pane_Farm &) = delete; // copy assignment operator + Pane_Farm &operator=(Pane_Farm &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/pane_farm_gpu.hpp b/wf/pane_farm_gpu.hpp index e39e579d..1238f8f0 100644 --- a/wf/pane_farm_gpu.hpp +++ b/wf/pane_farm_gpu.hpp @@ -89,6 +89,7 @@ class Pane_Farm_GPU: public ff::ff_pipeline friend class WinFarmGPU_Builder; template friend class KeyFarmGPU_Builder; + friend class MultiPipe; // compute the gcd between two numbers std::function gcd = [](uint64_t u, uint64_t v) { while (v != 0) { @@ -120,6 +121,8 @@ class Pane_Farm_GPU: public ff::ff_pipeline bool ordered; opt_level_t opt_level; PatternConfig config; + bool used; // true if the operator has been added/chained in a MultiPipe + bool used4Nesting; // true if the operator has been used in a nested structure // Private Constructor I Pane_Farm_GPU(F_t _gpuFunction, @@ -570,7 +573,10 @@ class Pane_Farm_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Pane_Farm_GPU(_plq_func, _wlq_func, _win_len, _slide_len, _winType, _plq_degree, _wlq_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Constructor II @@ -603,7 +609,10 @@ class Pane_Farm_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Pane_Farm_GPU(_plq_func, _wlqupdate_func, _win_len, _slide_len, _winType, _plq_degree, _wlq_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Constructor III @@ -636,7 +645,10 @@ class Pane_Farm_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Pane_Farm_GPU(_plq_func, _wlq_func, _win_len, _slide_len, _winType, _plq_degree, _wlq_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Constructor IV @@ -669,7 +681,10 @@ class Pane_Farm_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Pane_Farm_GPU(_plqupdate_func, _wlq_func, _win_len, _slide_len, _winType, _plq_degree, _wlq_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Get the optimization level used to build the operator @@ -706,6 +721,30 @@ class Pane_Farm_GPU: public ff::ff_pipeline { return wlq_degree; } + + /** + * \brief Check whether the Pane_Farm_GPU has been used in a MultiPipe + * \return true if the Pane_Farm_GPU has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /** + * \brief Check whether the Pane_Farm_GPU has been used in a nested structure + * \return true if the Pane_Farm_GPU has been used in a nested structure + */ + bool isUsed4Nesting() const + { + return used4Nesting; + } + + /// deleted constructors/operators + Pane_Farm_GPU(const Pane_Farm_GPU &) = delete; // copy constructor + Pane_Farm_GPU(Pane_Farm_GPU &&) = delete; // move constructor + Pane_Farm_GPU &operator=(const Pane_Farm_GPU &) = delete; // copy assignment operator + Pane_Farm_GPU &operator=(Pane_Farm_GPU &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/pipegraph.hpp b/wf/pipegraph.hpp index c1afd66f..f6b57cbe 100644 --- a/wf/pipegraph.hpp +++ b/wf/pipegraph.hpp @@ -192,7 +192,7 @@ class MultiPipe: public ff::ff_pipeline { private: // enumeration of the routing types - enum routing_types_t { SIMPLE, COMPLEX }; + enum routing_types_t { STATELESS, KEYED, COMPLEX }; PipeGraph *graph; // PipeGraph creating this MultiPipe bool has_source; // true if the MultiPipe starts with a Source bool has_sink; // true if the MultiPipe ends with a Sink @@ -946,9 +946,17 @@ inline size_t PipeGraph::getNumThreads() const template MultiPipe &MultiPipe::add_source(Source &_source) { + // check whether the operator has already been used in a MultiPipe + if (_source.isUsed()) { + std::cerr << RED << "WindFlow Error: Source operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _source.used = true; + } // check the Source presence if (has_source) { - std::cerr << RED << "WindFlow Error: Source has been already defined for the MultiPipe" << DEFAULT_COLOR << std::endl; + std::cerr << RED << "WindFlow Error: Source has already been defined for the MultiPipe" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } // create the initial matrioska @@ -1035,7 +1043,7 @@ void MultiPipe::add_operator(ff::ff_farm *_op, routing_types_t _type, ordering_m size_t n1 = (last->getFirstSet()).size(); size_t n2 = (_op->getWorkers()).size(); // Case 2: direct connection - if ((n1 == n2) && _type == SIMPLE && !forceShuffling) { + if ((n1 == n2) && _type == STATELESS && !forceShuffling) { auto first_set = last->getFirstSet(); auto worker_set = _op->getWorkers(); // add the operator's workers to the pipelines in the first set of the matrioska @@ -1121,7 +1129,7 @@ bool MultiPipe::chain_operator(ff::ff_farm *_op) return false; size_t n1 = (last->getFirstSet()).size(); size_t n2 = (_op->getWorkers()).size(); - // _op is for sure SIMPLE: check additional conditions for chaining + // distribution of _op is for sure STATELESS: check additional conditions for chaining if ((n1 == n2) && (!forceShuffling)) { auto first_set = (last)->getFirstSet(); auto worker_set = _op->getWorkers(); @@ -1211,9 +1219,9 @@ inline std::vector MultiPipe::prepareMergeSet() template std::vector MultiPipe::prepareMergeSet(MULTIPIPE &_pipe) { - // check whether the MultiPipe has been already merged + // check whether the MultiPipe has already been merged if (_pipe.isMerged) { - std::cerr << RED << "WindFlow Error: MultiPipe has been already merged" << DEFAULT_COLOR << std::endl; + std::cerr << RED << "WindFlow Error: MultiPipe has already been merged" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } // check whether the MultiPipe has been split @@ -1230,9 +1238,9 @@ std::vector MultiPipe::prepareMergeSet(MULTIPIPE &_pipe) template std::vector MultiPipe::prepareMergeSet(MULTIPIPE &_first, MULTIPIPES&... _pipes) { - // check whether the MultiPipe has been already merged + // check whether the MultiPipe has already been merged if (_first.isMerged) { - std::cerr << RED << "WindFlow Error: MultiPipe has been already merged" << DEFAULT_COLOR << std::endl; + std::cerr << RED << "WindFlow Error: MultiPipe has already been merged" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } // check whether the MultiPipe has been split @@ -1305,6 +1313,11 @@ inline int MultiPipe::run_and_wait_end() template MultiPipe &MultiPipe::add(Filter &_filter) { + // check whether the operator has already been used in a MultiPipe + if (_filter.isUsed()) { + std::cerr << RED << "WindFlow Error: Filter operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -1313,9 +1326,10 @@ MultiPipe &MultiPipe::add(Filter &_filter) exit(EXIT_FAILURE); } // call the generic method to add the operator to the MultiPipe - add_operator, Ordering_Node>(&_filter, _filter.isKeyed() ? COMPLEX : SIMPLE, TS); + add_operator, Ordering_Node>(&_filter, _filter.isKeyed() ? KEYED : STATELESS, TS); // save the new output type from this MultiPipe outputType = opInType; + _filter.used = true; return *this; } @@ -1323,6 +1337,11 @@ MultiPipe &MultiPipe::add(Filter &_filter) template MultiPipe &MultiPipe::chain(Filter &_filter) { + // check whether the operator has already been used in a MultiPipe + if (_filter.isUsed()) { + std::cerr << RED << "WindFlow Error: Filter operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -1340,6 +1359,7 @@ MultiPipe &MultiPipe::chain(Filter &_filter) add(_filter); // save the new output type from this MultiPipe outputType = opInType; + _filter.used = true; return *this; } @@ -1347,6 +1367,11 @@ MultiPipe &MultiPipe::chain(Filter &_filter) template MultiPipe &MultiPipe::add(Map &_map) { + // check whether the operator has already been used in a MultiPipe + if (_map.isUsed()) { + std::cerr << RED << "WindFlow Error: Map operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -1355,10 +1380,11 @@ MultiPipe &MultiPipe::add(Map &_map) exit(EXIT_FAILURE); } // call the generic method to add the operator to the MultiPipe - add_operator, Ordering_Node>(&_map, _map.isKeyed() ? COMPLEX : SIMPLE, TS); + add_operator, Ordering_Node>(&_map, _map.isKeyed() ? KEYED : STATELESS, TS); // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _map.used = true; return *this; } @@ -1366,6 +1392,11 @@ MultiPipe &MultiPipe::add(Map &_map) template MultiPipe &MultiPipe::chain(Map &_map) { + // check whether the operator has already been used in a MultiPipe + if (_map.isUsed()) { + std::cerr << RED << "WindFlow Error: Map operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -1384,6 +1415,7 @@ MultiPipe &MultiPipe::chain(Map &_map) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _map.used = true; return *this; } @@ -1391,6 +1423,11 @@ MultiPipe &MultiPipe::chain(Map &_map) template MultiPipe &MultiPipe::add(FlatMap &_flatmap) { + // check whether the operator has already been used in a MultiPipe + if (_flatmap.isUsed()) { + std::cerr << RED << "WindFlow Error: FlatMap operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -1399,10 +1436,11 @@ MultiPipe &MultiPipe::add(FlatMap &_flatmap) exit(EXIT_FAILURE); } // call the generic method to add the operator - add_operator, Ordering_Node>(&_flatmap, _flatmap.isKeyed() ? COMPLEX : SIMPLE, TS); + add_operator, Ordering_Node>(&_flatmap, _flatmap.isKeyed() ? KEYED : STATELESS, TS); // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _flatmap.used = true; return *this; } @@ -1410,6 +1448,11 @@ MultiPipe &MultiPipe::add(FlatMap &_flatmap) template MultiPipe &MultiPipe::chain(FlatMap &_flatmap) { + // check whether the operator has already been used in a MultiPipe + if (_flatmap.isUsed()) { + std::cerr << RED << "WindFlow Error: FlatMap operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -1427,6 +1470,7 @@ MultiPipe &MultiPipe::chain(FlatMap &_flatmap) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _flatmap.used = true; return *this; } @@ -1434,6 +1478,11 @@ MultiPipe &MultiPipe::chain(FlatMap &_flatmap) template MultiPipe &MultiPipe::add(Accumulator &_acc) { + // check whether the operator has already been used in a MultiPipe + if (_acc.isUsed()) { + std::cerr << RED << "WindFlow Error: Accumulator operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -1442,10 +1491,11 @@ MultiPipe &MultiPipe::add(Accumulator &_acc) exit(EXIT_FAILURE); } // call the generic method to add the operator to the MultiPipe - add_operator, Ordering_Node>(&_acc, COMPLEX, TS); + add_operator, Ordering_Node>(&_acc, KEYED, TS); // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _acc.used = true; return *this; } @@ -1453,6 +1503,11 @@ MultiPipe &MultiPipe::add(Accumulator &_acc) template MultiPipe &MultiPipe::add(Win_Farm &_wf) { + // check whether the operator has already been used in a MultiPipe + if (_wf.isUsed()) { + std::cerr << RED << "WindFlow Error: Win_Farm operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Win_Farm operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -1543,6 +1598,7 @@ MultiPipe &MultiPipe::add(Win_Farm &_wf) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _wf.used = true; return *this; } @@ -1550,6 +1606,11 @@ MultiPipe &MultiPipe::add(Win_Farm &_wf) template MultiPipe &MultiPipe::add(Win_Farm_GPU &_wf) { + // check whether the operator has already been used in a MultiPipe + if (_wf.isUsed()) { + std::cerr << RED << "WindFlow Error: Win_Farm_GPU operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Win_Farm_GPU operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -1640,6 +1701,7 @@ MultiPipe &MultiPipe::add(Win_Farm_GPU &_wf) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _wf.used = true; return *this; } @@ -1647,6 +1709,11 @@ MultiPipe &MultiPipe::add(Win_Farm_GPU &_wf) template MultiPipe &MultiPipe::add(Key_Farm &_kf) { + // check whether the operator has already been used in a MultiPipe + if (_kf.isUsed()) { + std::cerr << RED << "WindFlow Error: Key_Farm operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Key_Farm operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -1740,6 +1807,7 @@ MultiPipe &MultiPipe::add(Key_Farm &_kf) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _kf.used = true; return *this; } @@ -1747,6 +1815,11 @@ MultiPipe &MultiPipe::add(Key_Farm &_kf) template MultiPipe &MultiPipe::add(Key_Farm_GPU &_kf) { + // check whether the operator has already been used in a MultiPipe + if (_kf.isUsed()) { + std::cerr << RED << "WindFlow Error: Key_Farm_GPU operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Key_Farm_GPU operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -1840,6 +1913,7 @@ MultiPipe &MultiPipe::add(Key_Farm_GPU &_kf) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _kf.used = true; return *this; } @@ -1847,6 +1921,11 @@ MultiPipe &MultiPipe::add(Key_Farm_GPU &_kf) template MultiPipe &MultiPipe::add(Pane_Farm &_pf) { + // check whether the operator has already been used in a MultiPipe + if (_pf.isUsed()) { + std::cerr << RED << "WindFlow Error: Pane_Farm operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Pane_Farm operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -1878,7 +1957,7 @@ MultiPipe &MultiPipe::add(Pane_Farm &_pf) plq->cleanup_emitter(true); plq->cleanup_workers(false); // call the generic method to add the operator (PLQ stage) to the MultiPipe - add_operator, Ordering_Node>(plq, COMPLEX, (_pf.getWinType() == TB) ? TS : TS_RENUMBERING); + add_operator, Ordering_Node>(plq, COMPLEX, (_pf.getWinType() == TB) ? TS : TS_RENUMBERING); delete plq; } else { @@ -1908,7 +1987,7 @@ MultiPipe &MultiPipe::add(Pane_Farm &_pf) wlq->cleanup_emitter(true); wlq->cleanup_workers(false); // call the generic method to add the operator (WLQ stage) to the MultiPipe - add_operator, Ordering_Node>(wlq, COMPLEX, ID); + add_operator, Ordering_Node>(wlq, COMPLEX, ID); delete wlq; } else { @@ -1919,6 +1998,7 @@ MultiPipe &MultiPipe::add(Pane_Farm &_pf) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _pf.used = true; return *this; } @@ -1926,6 +2006,11 @@ MultiPipe &MultiPipe::add(Pane_Farm &_pf) template MultiPipe &MultiPipe::add(Pane_Farm_GPU &_pf) { + // check whether the operator has already been used in a MultiPipe + if (_pf.isUsed()) { + std::cerr << RED << "WindFlow Error: Pane_Farm_GPU operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Pane_Farm_GPU operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -1957,7 +2042,7 @@ MultiPipe &MultiPipe::add(Pane_Farm_GPU &_pf) plq->cleanup_emitter(true); plq->cleanup_workers(false); // call the generic method to add the operator (PLQ stage) to the MultiPipe - add_operator, Ordering_Node>(plq, COMPLEX, (_pf.getWinType() == TB) ? TS : TS_RENUMBERING); + add_operator, Ordering_Node>(plq, COMPLEX, (_pf.getWinType() == TB) ? TS : TS_RENUMBERING); delete plq; } else { @@ -1987,7 +2072,7 @@ MultiPipe &MultiPipe::add(Pane_Farm_GPU &_pf) wlq->cleanup_emitter(true); wlq->cleanup_workers(false); // call the generic method to add the operator (WLQ stage) to the MultiPipe - add_operator, Ordering_Node>(wlq, COMPLEX, ID); + add_operator, Ordering_Node>(wlq, COMPLEX, ID); delete wlq; } else { @@ -1998,6 +2083,7 @@ MultiPipe &MultiPipe::add(Pane_Farm_GPU &_pf) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _pf.used = true; return *this; } @@ -2005,6 +2091,11 @@ MultiPipe &MultiPipe::add(Pane_Farm_GPU &_pf) template MultiPipe &MultiPipe::add(Win_MapReduce &_wmr) { + // check whether the operator has already been used in a MultiPipe + if (_wmr.isUsed()) { + std::cerr << RED << "WindFlow Error: Win_MapReduce operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Win_MapReduce operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -2063,7 +2154,7 @@ MultiPipe &MultiPipe::add(Win_MapReduce &_wmr) reduce->cleanup_emitter(true); reduce->cleanup_workers(false); // call the generic method to add the operator (REDUCE stage) to the MultiPipe - add_operator, Ordering_Node>(reduce, COMPLEX, ID); + add_operator, Ordering_Node>(reduce, COMPLEX, ID); delete reduce; } else { @@ -2074,6 +2165,7 @@ MultiPipe &MultiPipe::add(Win_MapReduce &_wmr) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _wmr.used = true; return *this; } @@ -2081,6 +2173,11 @@ MultiPipe &MultiPipe::add(Win_MapReduce &_wmr) template MultiPipe &MultiPipe::add(Win_MapReduce_GPU &_wmr) { + // check whether the operator has already been used in a MultiPipe + if (_wmr.isUsed()) { + std::cerr << RED << "WindFlow Error: Win_MapReduce_GPU operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the processing mode if (graph->mode != Mode::DETERMINISTIC) { std::cerr << RED << "WindFlow Error: the Win_MapReduce_GPU operator requires the DETERMINISTIC mode" << DEFAULT_COLOR << std::endl; @@ -2139,7 +2236,7 @@ MultiPipe &MultiPipe::add(Win_MapReduce_GPU &_wmr) reduce->cleanup_emitter(true); reduce->cleanup_workers(false); // call the generic method to add the operator (REDUCE stage) to the MultiPipe - add_operator, Ordering_Node>(reduce, COMPLEX, ID); + add_operator, Ordering_Node>(reduce, COMPLEX, ID); delete reduce; } else { @@ -2150,6 +2247,7 @@ MultiPipe &MultiPipe::add(Win_MapReduce_GPU &_wmr) // save the new output type from this MultiPipe result_t r; outputType = typeid(r).name(); + _wmr.used = true; return *this; } @@ -2157,6 +2255,11 @@ MultiPipe &MultiPipe::add(Win_MapReduce_GPU &_wmr) template MultiPipe &MultiPipe::add_sink(Sink &_sink) { + // check whether the operator has already been used in a MultiPipe + if (_sink.isUsed()) { + std::cerr << RED << "WindFlow Error: Sink operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -2165,10 +2268,11 @@ MultiPipe &MultiPipe::add_sink(Sink &_sink) exit(EXIT_FAILURE); } // call the generic method to add the operator to the MultiPipe - add_operator, Ordering_Node>(&_sink, _sink.isKeyed() ? COMPLEX : SIMPLE, TS); + add_operator, Ordering_Node>(&_sink, _sink.isKeyed() ? KEYED : STATELESS, TS); has_sink = true; // save the new output type from this MultiPipe outputType = opInType; + _sink.used = true; return *this; } @@ -2176,6 +2280,11 @@ MultiPipe &MultiPipe::add_sink(Sink &_sink) template MultiPipe &MultiPipe::chain_sink(Sink &_sink) { + // check whether the operator has already been used in a MultiPipe + if (_sink.isUsed()) { + std::cerr << RED << "WindFlow Error: Sink operator has already been used in a MultiPipe" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } // check the type compatibility tuple_t t; std::string opInType = typeid(t).name(); @@ -2194,6 +2303,7 @@ MultiPipe &MultiPipe::chain_sink(Sink &_sink) has_sink = true; // save the new output type from this MultiPipe outputType = opInType; + _sink.used = true; return *this; } @@ -2239,9 +2349,9 @@ MultiPipe &MultiPipe::split(F_t _splitting_func, size_t _cardinality) std::cerr << RED << "WindFlow Error: MultiPipe has been merged and cannot be split" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } - // check whether the MultiPipe has been already split + // check whether the MultiPipe has already been split if (isSplit) { - std::cerr << RED << "WindFlow Error: MultiPipe has been already split" << DEFAULT_COLOR << std::endl; + std::cerr << RED << "WindFlow Error: MultiPipe has already been split" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } // prepare the splitting of this diff --git a/wf/sink.hpp b/wf/sink.hpp index 10d70e4b..774313f0 100644 --- a/wf/sink.hpp +++ b/wf/sink.hpp @@ -75,6 +75,7 @@ class Sink: public ff::ff_farm // friendships with other classes in the library friend class MultiPipe; bool keyed; // flag stating whether the Sink is configured with keyBy or not + bool used; // true if the operator has been added/chained in a MultiPipe // class Sink_Node class Sink_Node: public ff::ff_monode_t { @@ -218,7 +219,7 @@ class Sink: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -253,7 +254,7 @@ class Sink: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -286,7 +287,7 @@ class Sink: public ff::ff_farm size_t _pardegree, std::string _name, closing_func_t _closing_func): - keyed(false) + keyed(false), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -321,7 +322,7 @@ class Sink: public ff::ff_farm std::string _name, closing_func_t _closing_func, routing_func_t _routing_func): - keyed(true) + keyed(true), used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -350,6 +351,21 @@ class Sink: public ff::ff_farm { return keyed; } + + /** + * \brief Check whether the Sink has been used in a MultiPipe + * \return true if the Sink has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Sink(const Sink &) = delete; // copy constructor + Sink(Sink &&) = delete; // move constructor + Sink &operator=(const Sink &) = delete; // copy assignment operator + Sink &operator=(Sink &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/source.hpp b/wf/source.hpp index 61c4699e..2340ae0d 100644 --- a/wf/source.hpp +++ b/wf/source.hpp @@ -71,6 +71,7 @@ class Source: public ff::ff_a2a private: // friendships with other classes in the library friend class MultiPipe; + bool used; // true if the operator has been added/chained in a MultiPipe // class Source_Node class Source_Node: public ff::ff_node_t { @@ -239,7 +240,7 @@ class Source: public ff::ff_a2a Source(source_item_func_t _func, size_t _pardegree, std::string _name, - closing_func_t _closing_func) + closing_func_t _closing_func): used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -271,7 +272,7 @@ class Source: public ff::ff_a2a Source(rich_source_item_func_t _func, size_t _pardegree, std::string _name, - closing_func_t _closing_func) + closing_func_t _closing_func): used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -303,7 +304,7 @@ class Source: public ff::ff_a2a Source(source_loop_func_t _func, size_t _pardegree, std::string _name, - closing_func_t _closing_func) + closing_func_t _closing_func): used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -336,7 +337,7 @@ class Source: public ff::ff_a2a Source(rich_source_loop_func_t _func, size_t _pardegree, std::string _name, - closing_func_t _closing_func) + closing_func_t _closing_func): used(false) { // check the validity of the parallelism degree if (_pardegree == 0) { @@ -357,6 +358,21 @@ class Source: public ff::ff_a2a // add second set ff::ff_a2a::add_secondset(second_set, true); } + + /** + * \brief Check whether the Source has been used in a MultiPipe + * \return true if the Source has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Source(const Source &) = delete; // copy constructor + Source(Source &&) = delete; // move constructor + Source &operator=(const Source &) = delete; // copy assignment operator + Source &operator=(Source &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/win_farm.hpp b/wf/win_farm.hpp index 62e2c323..444522ec 100644 --- a/wf/win_farm.hpp +++ b/wf/win_farm.hpp @@ -99,6 +99,7 @@ class Win_Farm: public ff::ff_farm friend class Win_MapReduce_GPU; template friend auto get_WF_nested_type(T); + friend class MultiPipe; // flag stating whether the Win_Farm has been instantiated with complex workers (Pane_Farm or Win_MapReduce) bool hasComplexWorkers; // optimization level of the Win_Farm @@ -114,11 +115,9 @@ class Win_Farm: public ff::ff_farm size_t inner_parallelism_2; // window type (CB or TB) win_type_t winType; + bool used; // true if the operator has been added/chained in a MultiPipe - // Private Constructor I (stub) - Win_Farm() {} - - // Private Constructor II + // Private Constructor I template Win_Farm(F_t _func, uint64_t _win_len, @@ -235,7 +234,9 @@ class Win_Farm: public ff::ff_farm bool _ordered, opt_level_t _opt_level): Win_Farm(_win_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor II @@ -260,7 +261,9 @@ class Win_Farm: public ff::ff_farm bool _ordered, opt_level_t _opt_level): Win_Farm(_rich_win_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor III @@ -285,7 +288,9 @@ class Win_Farm: public ff::ff_farm bool _ordered, opt_level_t _opt_level): Win_Farm(_winupdate_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor IV @@ -310,7 +315,9 @@ class Win_Farm: public ff::ff_farm bool _ordered, opt_level_t _opt_level): Win_Farm(_rich_winupdate_func, _win_len, _slide_len, _winType, _pardegree, _name, _closing_func, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor V (Nesting with Pane_Farm) @@ -325,7 +332,7 @@ class Win_Farm: public ff::ff_farm * \param _ordered true if the results of the same key must be emitted in order, false otherwise * \param _opt_level optimization level used to build the operator */ - Win_Farm(const pane_farm_t &_pf, + Win_Farm(pane_farm_t &_pf, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -338,7 +345,7 @@ class Win_Farm: public ff::ff_farm outer_opt_level(_opt_level), inner_type(PF_CPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), used(false) { // type of the Pane_Farm to be created within the Win_Farm operator using panewrap_farm_t = Pane_Farm; @@ -352,6 +359,14 @@ class Win_Farm: public ff::ff_farm std::cerr << RED << "WindFlow Error: Win_Farm has parallelism zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Pane_Farm has not already been used in a nested structure + if (_pf.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Pane_Farm has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _pf.used4Nesting = true; + } // check the compatibility of the windowing parameters if (_pf.win_len != _win_len || _pf.slide_len != _slide_len || _pf.winType != _winType) { std::cerr << RED << "WindFlow Error: incompatible windowing parameters between Win_Farm and Pane_Farm" << DEFAULT_COLOR << std::endl; @@ -428,7 +443,7 @@ class Win_Farm: public ff::ff_farm * \param _ordered true if the results of the same key must be emitted in order, false otherwise * \param _opt_level optimization level used to build the operator */ - Win_Farm(const win_mapreduce_t &_wm, + Win_Farm(win_mapreduce_t &_wm, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -441,7 +456,7 @@ class Win_Farm: public ff::ff_farm outer_opt_level(_opt_level), inner_type(WMR_CPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), used(false) { // type of the Win_MapReduce to be created within the Win_Farm operator using winwrap_map_t = Win_MapReduce; @@ -455,6 +470,14 @@ class Win_Farm: public ff::ff_farm std::cerr << RED << "WindFlow Error: Win_Farm has parallelism zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Win_MapReduce has not already been used in a nested structure + if (_wm.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Win_MapReduce has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _wm.used4Nesting = true; + } // check the compatibility of the windowing parameters if (_wm.win_len != _win_len || _wm.slide_len != _slide_len || _wm.winType != _winType) { std::cerr << RED << "WindFlow Error: incompatible windowing parameters between Win_Farm and Win_MapReduce" << DEFAULT_COLOR << std::endl; @@ -580,6 +603,21 @@ class Win_Farm: public ff::ff_farm { return winType; } + + /** + * \brief Check whether the Win_Farm has been used in a MultiPipe + * \return true if the Win_Farm has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Win_Farm(const Win_Farm &) = delete; // copy constructor + Win_Farm(Win_Farm &&) = delete; // move constructor + Win_Farm &operator=(const Win_Farm &) = delete; // copy assignment operator + Win_Farm &operator=(Win_Farm &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/win_farm_gpu.hpp b/wf/win_farm_gpu.hpp index 3ddbc09b..232608c6 100644 --- a/wf/win_farm_gpu.hpp +++ b/wf/win_farm_gpu.hpp @@ -90,6 +90,7 @@ class Win_Farm_GPU: public ff::ff_farm friend class Win_MapReduce_GPU; template friend auto get_WF_GPU_nested_type(T); + friend class MultiPipe; // flag stating whether the Win_Farm_GPU has been instantiated with complex workers (Pane_Farm_GPU or Win_MapReduce_GPU) bool hasComplexWorkers; // optimization level of the Win_Farm_GPU @@ -105,11 +106,9 @@ class Win_Farm_GPU: public ff::ff_farm size_t inner_parallelism_2; // window type (CB or TB) win_type_t winType; + bool used; // true if the operator has been added/chained in a MultiPipe - // Private Constructor I (stub) - Win_Farm_GPU() {} - - // Private Constructor II + // Private Constructor I Win_Farm_GPU(win_F_t _win_func, uint64_t _win_len, uint64_t _slide_len, @@ -236,7 +235,9 @@ class Win_Farm_GPU: public ff::ff_farm bool _ordered, opt_level_t _opt_level): Win_Farm_GPU(_win_func, _win_len, _slide_len, _winType, _pardegree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len), SEQ) - {} + { + used = false; + } /** * \brief Constructor II (Nesting with Pane_Farm_GPU) @@ -253,7 +254,7 @@ class Win_Farm_GPU: public ff::ff_farm * \param _ordered true if the results of the same key must be emitted in order, false otherwise * \param _opt_level optimization level used to build the operator */ - Win_Farm_GPU(const pane_farm_gpu_t &_pf, + Win_Farm_GPU(pane_farm_gpu_t &_pf, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -268,7 +269,7 @@ class Win_Farm_GPU: public ff::ff_farm outer_opt_level(_opt_level), inner_type(PF_GPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), used(false) { // type of the Pane_Farm_GPU to be created within the Win_Farm_GPU operator using panewrap_farm_gpu_t = Pane_Farm_GPU; @@ -287,6 +288,14 @@ class Win_Farm_GPU: public ff::ff_farm std::cerr << RED << "WindFlow Error: batch length in Win_Farm_GPUcannot be zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Pane_Farm_GPU has not already been used in a nested structure + if (_pf.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Pane_Farm_GPU has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _pf.used4Nesting = true; + } // check the compatibility of the windowing/batching parameters if (_pf.win_len != _win_len || _pf.slide_len != _slide_len || _pf.winType != _winType || _pf.batch_len != _batch_len || _pf.n_thread_block != _n_thread_block) { std::cerr << RED << "WindFlow Error: incompatible windowing and batching parameters betweem Win_Farm_GPU and Pane_Farm_GPU" << DEFAULT_COLOR << std::endl; @@ -345,7 +354,7 @@ class Win_Farm_GPU: public ff::ff_farm * \param _ordered true if the results of the same key must be emitted in order, false otherwise * \param _opt_level optimization level used to build the operator */ - Win_Farm_GPU(const win_mapreduce_gpu_t &_wm, + Win_Farm_GPU(win_mapreduce_gpu_t &_wm, uint64_t _win_len, uint64_t _slide_len, win_type_t _winType, @@ -360,7 +369,7 @@ class Win_Farm_GPU: public ff::ff_farm outer_opt_level(_opt_level), inner_type(WMR_GPU), parallelism(_pardegree), - winType(_winType) + winType(_winType), used(false) { // type of the Win_MapReduce_GPU to be created within the Win_Farm_GPU operator using winwrap_mapreduce_gpu_t = Win_MapReduce_GPU; @@ -379,6 +388,14 @@ class Win_Farm_GPU: public ff::ff_farm std::cerr << RED << "WindFlow Error: batch length in Win_Farm_GPU cannot be zero" << DEFAULT_COLOR << std::endl; exit(EXIT_FAILURE); } + // check that the Win_MapReduce_GPU has not already been used in a nested structure + if (_wm.isUsed4Nesting()) { + std::cerr << RED << "WindFlow Error: Win_MapReduce_GPU has already been used in a nested structure" << DEFAULT_COLOR << std::endl; + exit(EXIT_FAILURE); + } + else { + _wm.used4Nesting = true; + } // check the compatibility of the windowing/batching parameters if (_wm.win_len != _win_len || _wm.slide_len != _slide_len || _wm.winType != _winType || _wm.batch_len != _batch_len || _wm.n_thread_block != _n_thread_block) { std::cerr << RED << "WindFlow Error: incompatible windowing and batching parameters between Win_Farm_GPU and Win_MapReduce_GPU" << DEFAULT_COLOR << std::endl; @@ -484,6 +501,21 @@ class Win_Farm_GPU: public ff::ff_farm { return winType; } + + /** + * \brief Check whether the Win_Farm_GPU has been used in a MultiPipe + * \return true if the Win_Farm_GPU has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /// deleted constructors/operators + Win_Farm_GPU(const Win_Farm_GPU &) = delete; // copy constructor + Win_Farm_GPU(Win_Farm_GPU &&) = delete; // move constructor + Win_Farm_GPU &operator=(const Win_Farm_GPU &) = delete; // copy assignment operator + Win_Farm_GPU &operator=(Win_Farm_GPU &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/win_mapreduce.hpp b/wf/win_mapreduce.hpp index 2000652e..7c03e72c 100644 --- a/wf/win_mapreduce.hpp +++ b/wf/win_mapreduce.hpp @@ -97,6 +97,7 @@ class Win_MapReduce: public ff::ff_pipeline friend class WinFarm_Builder; template friend class KeyFarm_Builder; + friend class MultiPipe; // configuration variables of the Win_MapReduce map_func_t map_func; rich_map_func_t rich_map_func; @@ -120,6 +121,8 @@ class Win_MapReduce: public ff::ff_pipeline bool ordered; opt_level_t opt_level; PatternConfig config; + bool used; // true if the operator has been added/chained in a MultiPipe + bool used4Nesting; // true if the operator has been used in a nested structure // Private Constructor template @@ -278,6 +281,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = false; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -314,6 +319,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = true; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -350,6 +357,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = false; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -386,6 +395,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = true; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -422,6 +433,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = false; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -458,6 +471,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = true; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -494,6 +509,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = false; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -530,6 +547,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = true; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -566,6 +585,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = false; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -602,6 +623,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = true; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -638,6 +661,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = false; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -674,6 +699,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = false; isRichMAP = true; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -710,6 +737,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = false; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -746,6 +775,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = true; isRichREDUCE = false; + used = false; + used4Nesting = false; } /** @@ -782,6 +813,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = false; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -818,6 +851,8 @@ class Win_MapReduce: public ff::ff_pipeline isNICREDUCE = true; isRichMAP = true; isRichREDUCE = true; + used = false; + used4Nesting = false; } /** @@ -855,6 +890,30 @@ class Win_MapReduce: public ff::ff_pipeline { return reduce_degree; } + + /** + * \brief Check whether the Win_MapReduce has been used in a MultiPipe + * \return true if the Win_MapReduce has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /** + * \brief Check whether the Win_MapReduce has been used in a nested structure + * \return true if the Win_MapReduce has been used in a nested structure + */ + bool isUsed4Nesting() const + { + return used4Nesting; + } + + /// deleted constructors/operators + Win_MapReduce(const Win_MapReduce &) = delete; // copy constructor + Win_MapReduce(Win_MapReduce &&) = delete; // move constructor + Win_MapReduce &operator=(const Win_MapReduce &) = delete; // copy assignment operator + Win_MapReduce &operator=(Win_MapReduce &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/win_mapreduce_gpu.hpp b/wf/win_mapreduce_gpu.hpp index 2e6c90f9..3fe9c68f 100644 --- a/wf/win_mapreduce_gpu.hpp +++ b/wf/win_mapreduce_gpu.hpp @@ -93,6 +93,7 @@ class Win_MapReduce_GPU: public ff::ff_pipeline friend class WinFarmGPU_Builder; template friend class KeyFarmGPU_Builder; + friend class MultiPipe; // configuration variables of the Win_MapReduce_GPU F_t gpuFunction; map_func_t map_func; @@ -115,6 +116,8 @@ class Win_MapReduce_GPU: public ff::ff_pipeline bool ordered; opt_level_t opt_level; PatternConfig config; + bool used; // true if the operator has been added/chained in a MultiPipe + bool used4Nesting; // true if the operator has been used in a nested structure // Private Constructor I Win_MapReduce_GPU(F_t _gpuFunction, @@ -597,7 +600,10 @@ class Win_MapReduce_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Win_MapReduce_GPU(_map_func, _reduce_func, _win_len, _slide_len, _winType, _map_degree, _reduce_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Constructor II @@ -630,7 +636,10 @@ class Win_MapReduce_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Win_MapReduce_GPU(_map_func, _reduceupdate_func, _win_len, _slide_len, _winType, _map_degree, _reduce_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Constructor III @@ -663,7 +672,10 @@ class Win_MapReduce_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Win_MapReduce_GPU(_map_func, _reduce_func, _win_len, _slide_len, _winType, _map_degree, _reduce_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Constructor IV @@ -696,7 +708,10 @@ class Win_MapReduce_GPU: public ff::ff_pipeline bool _ordered, opt_level_t _opt_level): Win_MapReduce_GPU(_mapupdate_func, _reduceupdate_func, _win_len, _slide_len, _winType, _map_degree, _reduce_degree, _batch_len, _n_thread_block, _name, _scratchpad_size, _ordered, _opt_level, PatternConfig(0, 1, _slide_len, 0, 1, _slide_len)) - {} + { + used = false; + used4Nesting = false; + } /** * \brief Get the optimization level used to build the operator @@ -733,6 +748,30 @@ class Win_MapReduce_GPU: public ff::ff_pipeline { return reduce_degree; } + + /** + * \brief Check whether the Win_MapReduce_GPU has been used in a MultiPipe + * \return true if the Win_MapReduce_GPU has been added/chained to an existing MultiPipe + */ + bool isUsed() const + { + return used; + } + + /** + * \brief Check whether the Win_MapReduce_GPU has been used in a nested structure + * \return true if the Win_MapReduce_GPU has been used in a nested structure + */ + bool isUsed4Nesting() const + { + return used4Nesting; + } + + /// deleted constructors/operators + Win_MapReduce_GPU(const Win_MapReduce_GPU &) = delete; // copy constructor + Win_MapReduce_GPU(Win_MapReduce_GPU &&) = delete; // move constructor + Win_MapReduce_GPU &operator=(const Win_MapReduce_GPU &) = delete; // copy assignment operator + Win_MapReduce_GPU &operator=(Win_MapReduce_GPU &&) = delete; // move assignment operator }; } // namespace wf diff --git a/wf/win_seq.hpp b/wf/win_seq.hpp index 9d1d7e5a..03fa2066 100644 --- a/wf/win_seq.hpp +++ b/wf/win_seq.hpp @@ -115,7 +115,7 @@ class Win_Seq: public ff::ff_node_t rcv_counter(0), next_lwid(0) { - wins.reserve(DEFAULT_COLOR_VECTOR_CAPACITY); + wins.reserve(DEFAULT_VECTOR_CAPACITY); } // move Constructor diff --git a/wf/win_seq_gpu.hpp b/wf/win_seq_gpu.hpp index d1ebc6a8..54f38469 100644 --- a/wf/win_seq_gpu.hpp +++ b/wf/win_seq_gpu.hpp @@ -145,7 +145,7 @@ class Win_Seq_GPU: public ff::ff_node_t next_lwid(0), batchedWin(0) { - wins.reserve(DEFAULT_COLOR_VECTOR_CAPACITY); + wins.reserve(DEFAULT_VECTOR_CAPACITY); } // move Constructor @@ -296,7 +296,7 @@ class Win_Seq_GPU: public ff::ff_node_t } // initialization with time-based windows else { - tuples_per_batch = DEFAULT_COLOR_BATCH_SIZE_TB; + tuples_per_batch = DEFAULT_BATCH_SIZE_TB; // allocate Bin (with default size) on the GPU gpuErrChk(cudaMalloc((tuple_t **) &Bin, tuples_per_batch * sizeof(tuple_t))); // Bin }