diff --git a/tests/icde_tmp/Makefile b/tests/icde_tmp/Makefile new file mode 100644 index 00000000..eceae080 --- /dev/null +++ b/tests/icde_tmp/Makefile @@ -0,0 +1,59 @@ +# Author: Gabriele Mencagli +# Date: 04/10/2023 + +FF_ROOT = $(HOME)/fastflow +WF_INCLUDES = $(HOME)/WindFlow-3.6.0/wf + +CXX = g++ +CXXFLAGS = -std=c++17 +INCLUDES = -I $(FF_ROOT) -I $(WF_INCLUDES) -I . +MACRO = -DFF_BOUNDED_BUFFER -DDEFAULT_BUFFER_CAPACITY=32786 +OPTFLAGS = -g -O3 +LDFLAGS = -pthread + +NVXX = /usr/local/cuda/bin/nvcc +NVXXFLAGS = -std=c++17 -x cu +NVOPTFLAGS = -w --expt-extended-lambda -O3 -g -gencode arch=compute_80,code=sm_80 -Wno-deprecated-gpu-targets --expt-relaxed-constexpr + +all: test_synth_gpu test_synth_gpu_keyed test_synth_gpu_delayed test_saber test_saber_v2 + +test_synth_gpu.o: test_synth_gpu.cpp + $(NVXX) $(NVXXFLAGS) $(NVOPTFLAGS) $(INCLUDES) $(MACRO) $(OPTFLAGS) $< -c + +test_synth_gpu: test_synth_gpu.o + $(NVXX) test_synth_gpu.o -o test_synth_gpu + +test_synth_gpu_keyed.o: test_synth_gpu_keyed.cpp + $(NVXX) $(NVXXFLAGS) $(NVOPTFLAGS) $(INCLUDES) $(MACRO) $(OPTFLAGS) $< -c + +test_synth_gpu_keyed: test_synth_gpu_keyed.o + $(NVXX) test_synth_gpu_keyed.o -o test_synth_gpu_keyed + +test_synth_gpu_delayed.o: test_synth_gpu_delayed.cpp + $(NVXX) $(NVXXFLAGS) $(NVOPTFLAGS) $(INCLUDES) $(MACRO) $(OPTFLAGS) $< -c + +test_synth_gpu_delayed: test_synth_gpu_delayed.o + $(NVXX) test_synth_gpu_delayed.o -o test_synth_gpu_delayed + +test_saber.o: test_saber.cpp + $(NVXX) $(NVXXFLAGS) $(NVOPTFLAGS) $(INCLUDES) $(MACRO) $(OPTFLAGS) $< -c + +test_saber: test_saber.o + $(NVXX) test_saber.o -o test_saber + +test_saber_v2.o: test_saber_v2.cpp + $(NVXX) $(NVXXFLAGS) $(NVOPTFLAGS) $(INCLUDES) $(MACRO) $(OPTFLAGS) $< -c + +test_saber_v2: test_saber_v2.o + $(NVXX) test_saber_v2.o -o test_saber_v2 + +clean: + rm -f test_synth_gpu + rm -f test_synth_gpu_keyed + rm -f test_synth_gpu_delayed + rm -f test_saber + rm -f test_saber_v2 + rm -f *.o + +.DEFAULT_GOAL := all +.PHONY: all clean diff --git a/tests/icde_tmp/aggregates.hpp b/tests/icde_tmp/aggregates.hpp new file mode 100644 index 00000000..42654c75 --- /dev/null +++ b/tests/icde_tmp/aggregates.hpp @@ -0,0 +1,311 @@ +/******************************************************************************* + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ****************************************************************************** + */ + +/* + * Functors (lift and combine) of the different associative and commutative + * aggregation functions. 
+ */ + +// includes +#include +#include +#include + +using namespace std; + +// lift functor of the SUM aggregate +class Lift_SUM_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v1_t &result) + { + result.key = tuple.key; + result._1 = tuple._1; + } +}; + +// combine functor of the SUM aggregate +class Combine_SUM_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v1_t &input1, const output_v1_t &input2, output_v1_t &result) + { + result.key = input1.key; + result._1 = input1._1 + input2._1; + } +}; + +// lift functor of the COUNT aggregate +class Lift_COUNT_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v1_t &result) + { + result.key = tuple.key; + result._1 = 1; + } +}; + +// combine functor of the COUNT aggregate +class Combine_COUNT_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v1_t &input1, const output_v1_t &input2, output_v1_t &result) + { + result.key = input1.key; + result._1 = input1._1 + input2._1; + } +}; + +// lift functor of the MAX aggregate +class Lift_MAX_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v1_t &result) + { + result.key = tuple.key; + result._1 = tuple._1; + } +}; + +// combine functor of the MAX aggregate +class Combine_MAX_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v1_t &input1, const output_v1_t &input2, output_v1_t &result) + { + result.key = input1.key; + result._1 = (input1._1 > input2._1) ? input1._1 : input2._1; + } +}; + +// lift functor of the MAX_COUNT aggregate +class Lift_MAX_COUNT_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v2_t &result) + { + result.key = tuple.key; + result._1 = 1; + result._2 = tuple._2; + } +}; + +// combine functor of the MAX_COUNT aggregate +class Combine_MAX_COUNT_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v2_t &input1, const output_v2_t &input2, output_v2_t &result) + { + result.key = input1.key; + if (input1._2 > input2._2) { + result._2 = input1._2; + result._1 = input1._1; + } + else if (input1._2 < input2._2) { + result._2 = input2._2; + result._1 = input2._1; + } + else { + result._2 = input2._2; + result._1 = input1._1 + input2._1; + } + } +}; + +// lift functor of the MIN aggregate +class Lift_MIN_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v1_t &result) + { + result.key = tuple.key; + result._1 = tuple._1; + } +}; + +// combine functor of the MIN aggregate +class Combine_MIN_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v1_t &input1, const output_v1_t &input2, output_v1_t &result) + { + result.key = input1.key; + result._1 = (input1._1 < input2._1) ? 
input1._1 : input2._1; + } +}; + +// lift functor of the MIN_COUNT aggregate +class Lift_MIN_COUNT_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v2_t &result) + { + result.key = tuple.key; + result._1 = 1; + result._2 = tuple._2; + } +}; + +// combine functor of the MIN_COUNT aggregate +class Combine_MIN_COUNT_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v2_t &input1, const output_v2_t &input2, output_v2_t &result) + { + result.key = input1.key; + if (input1._2 < input2._2) { + result._2 = input1._2; + result._1 = input1._1; + } + else if (input1._2 > input2._2) { + result._2 = input2._2; + result._1 = input2._1; + } + else { + result._2 = input2._2; + result._1 = input1._1 + input2._1; + } + } +}; + +// lift functor of the AVG aggregate +class Lift_AVG_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v2_t &result) + { + result.key = tuple.key; + result._1 = 1; + result._2 = tuple._2; + } +}; + +// combine functor of the AVG aggregate +class Combine_AVG_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v2_t &input1, const output_v2_t &input2, output_v2_t &result) + { + result.key = input1.key; + float alpha1 = (((float) input1._1) / (input1._1 + input2._1)); + float alpha2 = (((float) input2._1) / (input1._1 + input2._1)); + result._2 = alpha1 * input1._2 + alpha2 * input2._2; + result._1 = input1._1 + input2._1; + } +}; + +// lift functor of the GEOM aggregate +class Lift_GEOM_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v2_t &result) + { + result.key = tuple.key; + result._1 = 1; + result._2 = tuple._2; + } +}; + +// combine functor of the GEOM aggregate +class Combine_GEOM_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v2_t &input1, const output_v2_t &input2, output_v2_t &result) + { + result.key = input1.key; + float r1 = pow(input1._2, input1._1); + float r2 = pow(input2._2, input2._1); + result._1 = input1._1 + input2._1; + result._2 = pow(r1 * r2, 1.0f / result._1); // combined geometric mean is the result._1-th root of the combined product + } +}; + +// lift functor of the SSTD aggregate +class Lift_SSTD_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v3_t &result) + { + result.key = tuple.key; + result._1 = 1; + result._2 = tuple._2; + result._3 = pow(tuple._2, 2); + result._4 = sqrt((1.0/((float) result._1)) * (result._3 - pow(result._2, 2)/result._1)); + } +}; + +// combine functor of the SSTD aggregate +class Combine_SSTD_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v3_t &input1, const output_v3_t &input2, output_v3_t &result) + { + result.key = input1.key; + result._1 = input1._1 + input2._1; + result._2 = input1._2 + input2._2; + result._3 = input1._3 + input2._3; + result._4 = sqrt((1.0/((float) (result._1 - 1))) * (result._3 - pow(result._2, 2)/result._1)); + } +}; + +// lift functor of the PSTD aggregate +class Lift_PSTD_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_v3_t &result) + { + result.key = tuple.key; + result._1 = 1; + result._2 = tuple._2; + result._3 = pow(tuple._2, 2); + result._4 = sqrt((1.0/((float) result._1)) * (result._3 - pow(result._2, 2)/result._1)); + } +}; + +// combine functor of the PSTD aggregate +class Combine_PSTD_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_v3_t &input1, const output_v3_t
&input2, output_v3_t &result) + { + result.key = input1.key; + result._1 = input1._1 + input2._1; + result._2 = input1._2 + input2._2; + result._3 = input1._3 + input2._3; + result._4 = sqrt((1/((float) result._1)) * (result._3 - pow(result._2, 2)/result._1)); + } +}; diff --git a/tests/icde_tmp/common.hpp b/tests/icde_tmp/common.hpp new file mode 100644 index 00000000..8d9e75fa --- /dev/null +++ b/tests/icde_tmp/common.hpp @@ -0,0 +1,287 @@ +/******************************************************************************* + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ****************************************************************************** + */ + +/* + * Data types and operator functors used by the synthetic tests of the + * FFAT_Windows_GPU operator. + */ + +// includes +#include +#include +#include + +using namespace std; +using namespace wf; + +// global variables +extern atomic sent_tuples; + +// input tuple +struct input_t +{ + int key; + int _1; + float _2; + int _3; + + // Default Constructor + __host__ __device__ input_t(): key(0), _1(0), _2(0), _3(0) {} +}; + +// output tuple (version 1) +struct output_v1_t +{ + int key; + int _1; + + // Default Constructor + __host__ __device__ output_v1_t(int _key, uint64_t _wid): key(_key), _1(0) {} + + // Default Constructor + __host__ __device__ output_v1_t(uint64_t _wid): key(0), _1(0) {} + + // Default Constructor + __host__ __device__ output_v1_t(): key(0), _1(0) {} +}; + +// output tuple (version 2) +struct output_v2_t +{ + int key; + int _1; + float _2; + + // Default Constructor + __host__ __device__ output_v2_t(int _key, uint64_t _wid): key(_key), _1(0), _2(0) {} + + __host__ __device__ output_v2_t(uint64_t _wid): key(0), _1(0), _2(0) {} + + // Default Constructor + __host__ __device__ output_v2_t(): key(0), _1(0), _2(0) {} +}; + +// output tuple (version 3) +struct output_v3_t +{ + int key; + int _1; + float _2; + int _3; + float _4; + + // Default Constructor + __host__ __device__ output_v3_t(int _key, uint64_t _wid): key(_key), _1(0), _2(0), _3(0), _4(0) {} + + // Default Constructor + __host__ __device__ output_v3_t(uint64_t _wid): key(0), _1(0), _2(0), _3(0), _4(0) {} + + // Default Constructor + __host__ __device__ output_v3_t(): key(0), _1(0), _2(0), _3(0), _4(0) {} +}; + +// inizialize the input buffer of tuples +void init_inputBuffer(input_t *_buffer, size_t _size, size_t _num_keys) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::random_device rd2; + std::mt19937 gen2(rd2()); + std::random_device rd3; + std::mt19937 gen3(rd3()); + std::uniform_int_distribution<> distrib_keys(0, _num_keys-1); + std::uniform_int_distribution<> distrib_int_values(0, 1000); + std::uniform_real_distribution<> distrib_fp_values(0, 1000); + for (size_t i=0; i<_size; i++) { + _buffer[i].key = distrib_keys(gen); + _buffer[i]._1 = distrib_int_values(gen2); + _buffer[i]._2 = distrib_fp_values(gen3); + 
_buffer[i]._3 = distrib_int_values(gen2); + } +} + +// Source functor +class Source_Functor +{ +private: + unsigned long app_start_time; // application start time + unsigned long generated_tuples; // tuples counter + input_t *buffer; // buffer of tuples to be read + size_t size_buffer; // number of tuples in the buffer + +public: + // Constructor + Source_Functor(input_t *_buffer, + size_t _size_buffer, + unsigned long _app_start_time): + buffer(_buffer), + size_buffer(_size_buffer), + app_start_time(_app_start_time), + generated_tuples(0) {} + + // operator() + void operator()(Source_Shipper &shipper) + { + bool endGeneration = false; + unsigned long current_time = current_time_nsecs(); + uint64_t start_time = current_time_usecs(); + uint64_t timestamp = 0; + size_t idx = 0; + while (!endGeneration) { + timestamp = (current_time_usecs() - start_time); + shipper.pushWithTimestamp(buffer[idx], timestamp); // send the tuple + idx = (idx + 1) % size_buffer; + shipper.setNextWatermark(timestamp); + generated_tuples++; + if (generated_tuples % 10000 == 0) { + current_time = current_time_nsecs(); + } + // check EOS + if (current_time - app_start_time >= 120e9) { + sent_tuples.fetch_add(generated_tuples); + endGeneration = true; + } + } + } +}; + +// Source functor (Version 2) +class Source_Functor_V2 +{ +private: + unsigned long app_start_time; // application start time + unsigned long generated_tuples; // tuples counter + input_t *buffer; // buffer of tuples to be read + size_t size_buffer; // number of tuples in the buffer + size_t n_keys; // total number of keys; + size_t local_n_keys; // total number of keys assigned to this replica + +public: + // Constructor + Source_Functor_V2(input_t *_buffer, + size_t _size_buffer, + unsigned long _app_start_time, + size_t _n_keys): + app_start_time(_app_start_time), + generated_tuples(0), + buffer(_buffer), + size_buffer(_size_buffer), + n_keys(_n_keys) {} + + // operator() + void operator()(Source_Shipper &shipper, RuntimeContext &ctx) + { + local_n_keys = n_keys / ctx.getParallelism(); + bool endGeneration = false; + unsigned long current_time = current_time_nsecs(); + uint64_t start_time = current_time_usecs(); + uint64_t timestamp = 0; + size_t idx = 0; + while (!endGeneration) { + timestamp = (current_time_usecs() - start_time); + input_t t = buffer[idx]; + if (local_n_keys > 0) { + t.key = (local_n_keys * ctx.getReplicaIndex()) + (t.key % ctx.getParallelism()); + } + shipper.pushWithTimestamp(std::move(t), timestamp); // send the tuple + idx = (idx + 1) % size_buffer; + shipper.setNextWatermark(timestamp); + generated_tuples++; + if (generated_tuples % 10000 == 0) { + current_time = current_time_nsecs(); + } + // check EOS + if (current_time - app_start_time >= 120e9) { + sent_tuples.fetch_add(generated_tuples); + endGeneration = true; + } + } + } +}; + +// Source functor (delayed) +class Source_Functor_Delayed +{ +private: + unsigned long app_start_time; // application start time + unsigned long generated_tuples; // tuples counter + input_t *buffer; // buffer of tuples to be read + size_t size_buffer; // number of tuples in the buffer + unsigned long delay_usec; // average delay of tuples (in microseconds) + +public: + // Constructor + Source_Functor_Delayed(input_t *_buffer, + size_t _size_buffer, + unsigned long _app_start_time, + unsigned long _delay_usec): + buffer(_buffer), + size_buffer(_size_buffer), + app_start_time(_app_start_time), + generated_tuples(0), + delay_usec(_delay_usec) {} + + // operator() + void operator()(Source_Shipper 
&shipper) + { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distrib_delay(0, (2*delay_usec)-1); + bool endGeneration = false; + unsigned long current_time = current_time_nsecs(); + uint64_t start_time = current_time_usecs(); + uint64_t timestamp = 0; + size_t idx = 0; + while (!endGeneration) { + timestamp = (current_time_usecs() - start_time); + shipper.pushWithTimestamp(buffer[idx], timestamp + distrib_delay(gen)); // send the tuple + idx = (idx + 1) % size_buffer; + shipper.setNextWatermark(timestamp); + generated_tuples++; + if (generated_tuples % 10000 == 0) { + current_time = current_time_nsecs(); + } + // check EOS + if (current_time - app_start_time >= 120e9) { + sent_tuples.fetch_add(generated_tuples); + endGeneration = true; + } + } + } +}; + +// Sink functor +template +class Sink_Functor +{ +private: + size_t received; + +public: + // Constructor + Sink_Functor(): received(0) {} + + // operator() + void operator()(optional &out) + { + if (out) { + received++; + } + else { + std::cout << "Received " << received << " window results" << std::endl; + } + } +}; diff --git a/tests/icde_tmp/common_saber.hpp b/tests/icde_tmp/common_saber.hpp new file mode 100644 index 00000000..55dd3057 --- /dev/null +++ b/tests/icde_tmp/common_saber.hpp @@ -0,0 +1,223 @@ +/******************************************************************************* + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ****************************************************************************** + */ + +/* + * Data types and operator functors used by the test for the comparison + * between WindFlow and Saber. + * + * We report below some configuration parameters used by Saber (experiment Fig. 
11b): + * batch size -> 1048576 + * winType -> CB + * window length -> 1024 + * window slide -> variable from 2 to 1024 + * attributes per tuple (except timestamp) -> 6 (float, int, ..., int) + * groups -> 0 + * tuples_per_insert -> 32768 + */ + +// includes +#include +#include +#include + +using namespace std; +using namespace wf; + +// global variables +extern atomic sent_tuples; + +// input tuple +struct input_t +{ + long t; + float _1; + int _2; + int _3; + int _4; + int _5; + int _6; + + // Default Constructor + __host__ __device__ input_t(): t(0), _1(0), _2(0), _3(0), _4(0), _5(0), _6(0) {} + +} __attribute__((aligned(1))); + +// output tuple +struct output_t +{ + long t; + float _1; + int _2; + + // Default Constructor + __host__ __device__ output_t(): t(0), _1(0), _2(0) {} + + // Constructor + __host__ __device__ output_t(uint64_t _id) {} +} __attribute__((aligned(1))); + +// class Lift_Functor on GPU +class Lift_Functor_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const input_t &tuple, output_t &result) + { + result.t = tuple.t; + result._1 = tuple._1; + result._2 = tuple._2; + } +}; + +// Combine functor on GPU +class Comb_Functor_GPU +{ +public: + // operator() + __host__ __device__ void operator()(const output_t &input1, const output_t &input2, output_t &result) + { + result.t = (input1.t < input2.t) ? input1.t : input2.t; + result._1 = input1._1 + input2._1; + result._2 = input1._2 + input2._2; + } +}; + +// function to inizialize the input buffer of tuples +void init_inputBuffer(input_t *_buffer, size_t _size) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_real_distribution dis(0, 1.0); + for (size_t i=0; i<_size; i++) { + _buffer[i].t = 1; + _buffer[i]._1 = dis(gen); + _buffer[i]._2 = 1; + _buffer[i]._3 = 1; + _buffer[i]._4 = 1; + _buffer[i]._5 = 1; + _buffer[i]._6 = 1; + } +} + +// Source functor +class Source_Functor +{ +private: + unsigned long app_start_time; // application start time + unsigned long generated_tuples; // tuples counter + input_t *buffer; // buffer of tuples to be read + size_t size_buffer; // number of tuples in the buffer + +public: + // Constructor + Source_Functor(input_t *_buffer, + size_t _size_buffer, + unsigned long _app_start_time): + buffer(_buffer), + size_buffer(_size_buffer), + app_start_time(_app_start_time), + generated_tuples(0) {} + + // operator() + void operator()(Source_Shipper &shipper) + { + bool endGeneration = false; + unsigned long current_time = current_time_nsecs(); + uint64_t timestamp = 0; + size_t idx = 0; + while (!endGeneration) { + shipper.pushWithTimestamp(buffer[idx], timestamp); // send the tuple + idx = (idx + 1) % size_buffer; + shipper.setNextWatermark(timestamp); + timestamp++; + generated_tuples++; + if (generated_tuples % 10000 == 0) { + current_time = current_time_nsecs(); + } + // check EOS + if (current_time - app_start_time >= 120e9) { + sent_tuples.fetch_add(generated_tuples); + endGeneration = true; + } + } + } +}; + +// Source functor (Version 2) +class Source_Functor_V2 +{ +private: + unsigned long app_start_time; // application start time + unsigned long generated_tuples; // tuples counter + input_t *buffer; // buffer of tuples to be read + size_t size_buffer; // number of tuples in the buffer + size_t delay; // delay to generate timestamps + +public: + // Constructor + Source_Functor_V2(input_t *_buffer, + size_t _size_buffer, + unsigned long _app_start_time, + size_t _delay): + buffer(_buffer), + size_buffer(_size_buffer), + app_start_time(_app_start_time), 
+ delay(_delay), + generated_tuples(0) {} + + // operator() + void operator()(Source_Shipper &shipper) + { + bool endGeneration = false; + unsigned long current_time = current_time_nsecs(); + uint64_t timestamp = 0; + size_t idx = 0; + while (!endGeneration) { + shipper.pushWithTimestamp(buffer[idx], timestamp); // send the tuple + idx = (idx + 1) % size_buffer; + shipper.setNextWatermark(timestamp); + timestamp += delay; + generated_tuples++; + if (generated_tuples % 10000 == 0) { + current_time = current_time_nsecs(); + } + // check EOS + if (current_time - app_start_time >= 120e9) { + sent_tuples.fetch_add(generated_tuples); + endGeneration = true; + } + } + } +}; + +// Sink functor +class Sink_Functor +{ +private: + size_t received; + +public: + // Constructor + Sink_Functor(): received(0) {} + + // operator() + void operator()(optional &out) + { + if (out) { + received++; + } + } +}; diff --git a/tests/icde_tmp/test_saber.cpp b/tests/icde_tmp/test_saber.cpp new file mode 100644 index 00000000..967e769b --- /dev/null +++ b/tests/icde_tmp/test_saber.cpp @@ -0,0 +1,114 @@ +/****************************************************************************** + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ****************************************************************************** + */ + +/* + * Test for the comparison between WindFlow and Saber. 
+ */ + +// includes +#include +#include +#include +#include +#include +#include +#include +#include"common_saber.hpp" + +#define BUFFER_SIZE 32768 + +using namespace std; +using namespace chrono; +using namespace wf; + +// global variables +atomic sent_tuples; + +// main +int main(int argc, char *argv[]) +{ + int option = 0; + size_t src_degree = 1; + size_t win_len = 0; + size_t win_slide = 0; + size_t batch_size = 0; + size_t win_batch_size = 1; + // initialize global variable + sent_tuples = 0; + // arguments from command line + if (argc != 11) { + cout << argv[0] << " -n [num sources] -w [win length] -s [win slide] -b [batch size] -r [num wins per batch]" << endl; + exit(EXIT_SUCCESS); + } + while ((option = getopt(argc, argv, "n:k:w:s:b:r:")) != -1) { + switch (option) { + case 'n': src_degree = atoi(optarg); + break; + case 'w': win_len = atoi(optarg); + break; + case 's': win_slide = atoi(optarg); + break; + case 'b': batch_size = atoi(optarg); + break; + case 'r': win_batch_size = atoi(optarg); + break; + default: { + cout << argv[0] << " -n [num sources] -w [win length] -s [win slide] -b [batch size] -r [num wins per batch]" << endl; + exit(EXIT_SUCCESS); + } + } + } + // create and initialize the input stream shared by all sources + input_t *buffer = (input_t *) malloc(sizeof(input_t) * BUFFER_SIZE); + init_inputBuffer(buffer, BUFFER_SIZE); + // application starting time + volatile unsigned long app_start_time = current_time_nsecs(); + + // prepare the test + PipeGraph graph("test_saber", Execution_Mode_t::DEFAULT, Time_Policy_t::EVENT_TIME); + + Source_Functor source_functor(buffer, BUFFER_SIZE, app_start_time); + Source source = Source_Builder(source_functor) + .withName("source") + .withParallelism(src_degree) + .withOutputBatchSize(batch_size) + .build(); + MultiPipe &mp = graph.add_source(source); + Lift_Functor_GPU lift_functor; + Comb_Functor_GPU comb_functor; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift_functor, comb_functor) + .withName("fat_gpu") + .withCBWindows(win_len, win_slide) + .withNumWinPerBatch(win_batch_size) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + + volatile unsigned long start_time_main_usecs = current_time_usecs(); + // run the application + graph.run(); + volatile unsigned long end_time_main_usecs = current_time_usecs(); + double elapsed_time_seconds = (end_time_main_usecs - start_time_main_usecs) / (1000000.0); + double throughput = sent_tuples / elapsed_time_seconds; + cout << "Measured throughput: " << (int) throughput << " tuples/second" << endl; + cout << "Measured throughput: " << ((int) throughput * (sizeof(batch_item_gpu_t))) / (1024*1024) << " MB/s" << endl; + return 0; +} diff --git a/tests/icde_tmp/test_synth_gpu.cpp b/tests/icde_tmp/test_synth_gpu.cpp new file mode 100644 index 00000000..5394011c --- /dev/null +++ b/tests/icde_tmp/test_synth_gpu.cpp @@ -0,0 +1,273 @@ +/****************************************************************************** + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ****************************************************************************** + */ + +/* + * Test of the FFAT_Windows_GPU operator with a given aggregate function. + */ + +// includes +#include +#include +#include +#include +#include +#include +#include +#include"common.hpp" +#include"aggregates.hpp" + +#define BUFFER_SIZE 32768 + +using namespace std; +using namespace chrono; +using namespace wf; + +// global variables +atomic sent_tuples; + +// main +int main(int argc, char *argv[]) +{ + int option = 0; + size_t n = 1; + size_t w = 0; + size_t s = 0; + size_t b = 0; + size_t r = 1; + string agg; + // initialize global variable + sent_tuples = 0; + // arguments from command line + if (argc != 13) { + cout << argv[0] << " -n [num sources] -w [win length usec] -s [win slide usec] -b [batch size] -r [num wins per batch] -a [aggregation]" << endl; + exit(EXIT_SUCCESS); + } + while ((option = getopt(argc, argv, "n:w:s:b:r:a:")) != -1) { + switch (option) { + case 'n': n = atoi(optarg); + break; + case 'w': w = atoi(optarg); + break; + case 's': s = atoi(optarg); + break; + case 'b': b = atoi(optarg); + break; + case 'r': r = atoi(optarg); + break; + case 'a': agg = string(optarg); + break; + default: { + cout << argv[0] << " -n [num sources] -w [win length usec] -s [win slide usec] -b [batch size] -r [num wins per batch] -a [aggregation]" << endl; + exit(EXIT_SUCCESS); + } + } + } + + // create and initialize the input stream shared by all sources + input_t *buffer = (input_t *) malloc(sizeof(input_t) * BUFFER_SIZE); + init_inputBuffer(buffer, BUFFER_SIZE, 1); + + // application starting time + volatile unsigned long app_start_time = current_time_nsecs(); + + // prepare the application + PipeGraph graph("test_synth_gpu", Execution_Mode_t::DEFAULT, Time_Policy_t::EVENT_TIME); + + Source_Functor source_functor(buffer, BUFFER_SIZE, app_start_time); + Source source = Source_Builder(source_functor) + .withName("source") + .withParallelism(n) + .withOutputBatchSize(b) + .build(); + MultiPipe &mp = graph.add_source(source); + + if (agg == "sum") { + Lift_SUM_GPU lift; + Combine_SUM_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("sum_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "count") { + Lift_COUNT_GPU lift; + Combine_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("count_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "max") { + Lift_MAX_GPU lift; + Combine_MAX_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("max_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + 
.withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "max_count") { + Lift_MAX_COUNT_GPU lift; + Combine_MAX_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("maxcount_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "min") { + Lift_MIN_GPU lift; + Combine_MIN_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("min_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "min_count") { + Lift_MIN_COUNT_GPU lift; + Combine_MIN_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("mincount_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "avg") { + Lift_AVG_GPU lift; + Combine_AVG_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("avg_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "geom") { + Lift_GEOM_GPU lift; + Combine_GEOM_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("geom_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "sstd") { + Lift_SSTD_GPU lift; + Combine_SSTD_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("sstd_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "pstd") { + Lift_PSTD_GPU lift; + Combine_PSTD_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("Pstd_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else { + cout << "Not a valid aggregate!" 
<< endl; + abort(); + } + + volatile unsigned long start_time_main_usecs = current_time_usecs(); + // run the application + graph.run(); + volatile unsigned long end_time_main_usecs = current_time_usecs(); + + // compute statistics + double elapsed_time_seconds = (end_time_main_usecs - start_time_main_usecs) / (1000000.0); + double throughput = sent_tuples / elapsed_time_seconds; + cout << "Measured throughput: " << (int) throughput << " tuples/second" << endl; + cout << "Measured throughput: " << ((int) throughput * (sizeof(batch_item_gpu_t) + 8)) / (1024*1024) << " MB/s" << endl; + return 0; +} diff --git a/tests/icde_tmp/test_synth_gpu_delayed.cpp b/tests/icde_tmp/test_synth_gpu_delayed.cpp new file mode 100644 index 00000000..e45f694d --- /dev/null +++ b/tests/icde_tmp/test_synth_gpu_delayed.cpp @@ -0,0 +1,277 @@ +/****************************************************************************** + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ****************************************************************************** + */ + +/* + * Test of the FFAT_Windows_GPU operator with a given aggregate function + * and a controlled delay of the input stream. 
+ */ + +// includes +#include +#include +#include +#include +#include +#include +#include +#include"common.hpp" +#include"aggregates.hpp" + +#define BUFFER_SIZE 32768 + +using namespace std; +using namespace chrono; +using namespace wf; + +// global variables +atomic sent_tuples; + +// main +int main(int argc, char *argv[]) +{ + int option = 0; + size_t n = 1; + size_t w = 0; + size_t s = 0; + size_t b = 0; + size_t r = 1; + size_t d = 0; + string agg; + // initialize global variable + sent_tuples = 0; + // arguments from command line + if (argc != 15) { + cout << argv[0] << " -n [num sources] -w [win length usec] -s [win slide usec] -b [batch size] -r [num wins per batch] -a [aggregation] -d [avg delay usec]" << endl; + exit(EXIT_SUCCESS); + } + while ((option = getopt(argc, argv, "n:w:s:b:r:a:d:")) != -1) { + switch (option) { + case 'n': n = atoi(optarg); + break; + case 'w': w = atoi(optarg); + break; + case 's': s = atoi(optarg); + break; + case 'b': b = atoi(optarg); + break; + case 'r': r = atoi(optarg); + break; + case 'a': agg = string(optarg); + break; + case 'd': d = atoi(optarg); + break; + default: { + cout << argv[0] << " -n [num sources] -w [win length usec] -s [win slide usec] -b [batch size] -r [num wins per batch] -a [aggregation] -d [avg delay usec]" << endl; + exit(EXIT_SUCCESS); + } + } + } + + // create and initialize the input stream shared by all sources + input_t *buffer = (input_t *) malloc(sizeof(input_t) * BUFFER_SIZE); + init_inputBuffer(buffer, BUFFER_SIZE, 1); + + // application starting time + volatile unsigned long app_start_time = current_time_nsecs(); + + // prepare the application + PipeGraph graph("test_synth_gpu_delayed", Execution_Mode_t::DEFAULT, Time_Policy_t::EVENT_TIME); + + Source_Functor_Delayed source_functor(buffer, BUFFER_SIZE, app_start_time, d); + Source source = Source_Builder(source_functor) + .withName("source") + .withParallelism(n) + .withOutputBatchSize(b) + .build(); + MultiPipe &mp = graph.add_source(source); + + if (agg == "sum") { + Lift_SUM_GPU lift; + Combine_SUM_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("sum_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "count") { + Lift_COUNT_GPU lift; + Combine_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("count_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "max") { + Lift_MAX_GPU lift; + Combine_MAX_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("max_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "max_count") { + Lift_MAX_COUNT_GPU lift; + Combine_MAX_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("maxcount_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + 
mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "min") { + Lift_MIN_GPU lift; + Combine_MIN_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("min_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "min_count") { + Lift_MIN_COUNT_GPU lift; + Combine_MIN_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("mincount_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "avg") { + Lift_AVG_GPU lift; + Combine_AVG_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("avg_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "geom") { + Lift_GEOM_GPU lift; + Combine_GEOM_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("geom_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "sstd") { + Lift_SSTD_GPU lift; + Combine_SSTD_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("sstd_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "pstd") { + Lift_PSTD_GPU lift; + Combine_PSTD_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("Pstd_gpu") + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else { + cout << "Not a valid aggregate!" 
<< endl; + abort(); + } + + volatile unsigned long start_time_main_usecs = current_time_usecs(); + // run the application + graph.run(); + volatile unsigned long end_time_main_usecs = current_time_usecs(); + + // compute statistics + double elapsed_time_seconds = (end_time_main_usecs - start_time_main_usecs) / (1000000.0); + double throughput = sent_tuples / elapsed_time_seconds; + cout << "Measured throughput: " << (int) throughput << " tuples/second" << endl; + cout << "Measured throughput: " << ((int) throughput * (sizeof(batch_item_gpu_t) + 8)) / (1024*1024) << " MB/s" << endl; + return 0; +} diff --git a/tests/icde_tmp/test_synth_gpu_keyed.cpp b/tests/icde_tmp/test_synth_gpu_keyed.cpp new file mode 100644 index 00000000..679a1745 --- /dev/null +++ b/tests/icde_tmp/test_synth_gpu_keyed.cpp @@ -0,0 +1,287 @@ +/****************************************************************************** + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 3 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + ****************************************************************************** + */ + +/* + * Test of the FFAT_Windows_GPU operator with a given aggregate function. + * Version with multiple keys. 
+ */ + +// includes +#include +#include +#include +#include +#include +#include +#include +#include"common.hpp" +#include"aggregates.hpp" + +#define BUFFER_SIZE 32768 + +using namespace std; +using namespace chrono; +using namespace wf; + +// global variables +atomic sent_tuples; + +// main +int main(int argc, char *argv[]) +{ + int option = 0; + size_t n = 1; + size_t w = 0; + size_t s = 0; + size_t k = 1; + size_t b = 0; + size_t r = 1; + string agg; + // initialize global variable + sent_tuples = 0; + // arguments from command line + if (argc != 15) { + cout << argv[0] << " -n [num sources] -k [n_keys] -w [win length usec] -s [win slide usec] -b [batch size] -r [num wins per batch] -a [aggregation]" << endl; + exit(EXIT_SUCCESS); + } + while ((option = getopt(argc, argv, "n:k:w:s:b:r:a:")) != -1) { + switch (option) { + case 'n': n = atoi(optarg); + break; + case 'k': k = atoi(optarg); + break; + case 'w': w = atoi(optarg); + break; + case 's': s = atoi(optarg); + break; + case 'b': b = atoi(optarg); + break; + case 'r': r = atoi(optarg); + break; + case 'a': agg = string(optarg); + break; + default: { + cout << argv[0] << " -n [num sources] -k [n_keys] -w [win length usec] -s [win slide usec] -b [batch size] -r [num wins per batch] -a [aggregation]" << endl; + exit(EXIT_SUCCESS); + } + } + } + + // create and initialize the input stream shared by all sources + input_t *buffer = (input_t *) malloc(sizeof(input_t) * BUFFER_SIZE); + init_inputBuffer(buffer, BUFFER_SIZE, k); + + // application starting time + volatile unsigned long app_start_time = current_time_nsecs(); + + // prepare the application + PipeGraph graph("test_synth_gpu_keyed", Execution_Mode_t::DEFAULT, Time_Policy_t::EVENT_TIME); + + Source_Functor source_functor(buffer, BUFFER_SIZE, app_start_time); + Source source = Source_Builder(source_functor) + .withName("source") + .withParallelism(n) + .withOutputBatchSize(b) + .build(); + MultiPipe &mp = graph.add_source(source); + + if (agg == "sum") { + Lift_SUM_GPU lift; + Combine_SUM_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("sum_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "count") { + Lift_COUNT_GPU lift; + Combine_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("count_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "max") { + Lift_MAX_GPU lift; + Combine_MAX_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("max_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "max_count") { + Lift_MAX_COUNT_GPU lift; + Combine_MAX_COUNT_GPU comb; 
+ Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("maxcount_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "min") { + Lift_MIN_GPU lift; + Combine_MIN_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("min_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "min_count") { + Lift_MIN_COUNT_GPU lift; + Combine_MIN_COUNT_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("mincount_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "avg") { + Lift_AVG_GPU lift; + Combine_AVG_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("avg_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "geom") { + Lift_GEOM_GPU lift; + Combine_GEOM_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("geom_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "sstd") { + Lift_SSTD_GPU lift; + Combine_SSTD_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("sstd_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else if (agg == "pstd") { + Lift_PSTD_GPU lift; + Combine_PSTD_GPU comb; + Ffat_Windows_GPU ffat_gpu = Ffat_WindowsGPU_Builder(lift, comb) + .withName("Pstd_gpu") + .withKeyBy([] __host__ __device__ (const input_t &in) { return in.key; }) + .withTBWindows(microseconds(w), microseconds(s)) + .withNumWinPerBatch(r) + .build(); + mp.add(ffat_gpu); + Sink_Functor sink_functor; + Sink sink = Sink_Builder(sink_functor) + .withName("sink") + .withParallelism(1) + .build(); + mp.chain_sink(sink); + } + else { + cout << "Not a valid aggregate!" 
<< endl; + abort(); + } + + volatile unsigned long start_time_main_usecs = current_time_usecs(); + // run the application + graph.run(); + volatile unsigned long end_time_main_usecs = current_time_usecs(); + + // compute statistics + double elapsed_time_seconds = (end_time_main_usecs - start_time_main_usecs) / (1000000.0); + double throughput = sent_tuples / elapsed_time_seconds; + cout << "Measured throughput: " << (int) throughput << " tuples/second" << endl; + cout << "Measured throughput: " << ((int) throughput * (sizeof(batch_item_gpu_t) + 8)) / (1024*1024) << " MB/s" << endl; + return 0; +}
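
Note (not part of the patch above): the lift/combine functors in aggregates.hpp are documented as associative and commutative merges, which is what lets the FFAT_Windows_GPU operator combine partial window results in any grouping. The host-only sketch below mirrors the weighted-mean merge of Combine_AVG_GPU on a hypothetical partial_t type (partial_t, combine_avg and approx_eq are illustrative names, not part of WindFlow or of the headers in this patch) and checks the associativity/commutativity property numerically; it also shows why the combined count must be written back into the result, i.e. the assignment fixed in Combine_AVG_GPU. It is meant to compile with plain g++ -std=c++17, no CUDA required.

// minimal host-only sketch of an associative/commutative combine (assumed types, not from the patch)
#include <cassert>
#include <cmath>
#include <iostream>

struct partial_t      // stand-in for output_v2_t: a count plus a running mean
{
    int cnt;          // number of tuples merged so far (field _1 in the tests)
    float mean;       // running average of their values (field _2 in the tests)
};

// merge two partial averages with the same weighting used by Combine_AVG_GPU
partial_t combine_avg(const partial_t &a, const partial_t &b)
{
    partial_t r;
    r.cnt = a.cnt + b.cnt;                              // the combined count must be propagated
    float alpha1 = ((float) a.cnt) / (a.cnt + b.cnt);
    float alpha2 = ((float) b.cnt) / (a.cnt + b.cnt);
    r.mean = alpha1 * a.mean + alpha2 * b.mean;
    return r;
}

bool approx_eq(float x, float y) { return std::fabs(x - y) < 1e-3f; }

int main()
{
    partial_t a{1, 10.0f}, b{3, 4.0f}, c{2, 7.0f};      // 1 value of 10, 3 values averaging 4, 2 values of 7
    partial_t ab_c = combine_avg(combine_avg(a, b), c);
    partial_t a_bc = combine_avg(a, combine_avg(b, c));
    assert(ab_c.cnt == a_bc.cnt && approx_eq(ab_c.mean, a_bc.mean));      // associativity
    assert(approx_eq(combine_avg(a, b).mean, combine_avg(b, a).mean));    // commutativity
    std::cout << "avg of all 6 values: " << ab_c.mean << std::endl;       // prints 6
    return 0;
}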