Skip to content

Commit

Permalink
[ADD] Added files of release 4.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mencagli committed Dec 12, 2024
1 parent a85048d commit f23cb93
Show file tree
Hide file tree
Showing 40 changed files with 359 additions and 390 deletions.
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "WindFlow_Dashboard"]
path = WindFlow_Dashboard
url = https://github.com/ParaGroup/WindFlow_Dashboard
path = WindFlow_Dashboard
url = https://github.com/ParaGroup/WindFlow_Dashboard
2 changes: 1 addition & 1 deletion API
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ The corresponding builder needs two parameters (for the lift and combine logics)
__host__ __device__ void(const result_t &, const result_t &, result_t &);

Interval_Join
----------------
-------------
std::optional<result_t> (const tuple_t &, const tuple_t &)
std::optional<result_t> (const tuple_t &, const tuple_t &, RuntimeContext &)

Expand Down
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ Fausto Frasca, contributor (Web Dashboard)
Alberto Ottimo, contributor
Matteo Della Bartola, contributor
Simone Frassinelli, contributor
Yuriy Rymarchuk, contributor
3 changes: 0 additions & 3 deletions tests/join_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ set(CMAKE_CXX_FLAGS_RELEASE "-O3 -g -finline-functions")

# Macros to be provided to the compiler
add_definitions(-DFF_BOUNDED_BUFFER)
# -DWF_JOIN_MEASUREMENT to enable the measurement of how uniformerly the tuples are distributed among the joiners
# -DWF_TRACING_ENABLED to enable the tracing with Dashboard

# Header files of WindFlow and FastFlow
include_directories(${PROJECT_SOURCE_DIR}/wf ${ff_root_dir})

# Linking to pthread
# cdt gvc cgraph to enable the tracing with Dashboard
link_libraries(pthread)

# Set output directory
Expand Down
8 changes: 3 additions & 5 deletions tests/join_tests/join_common.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**************************************************************************************
* Copyright (c) 2023- Gabriele Mencagli and Yuriy Rymarchuk
* Copyright (c) 2024- Gabriele Mencagli and Yuriy Rymarchuk
*
* This file is part of WindFlow.
*
Expand Down Expand Up @@ -42,7 +42,6 @@ struct tuple_t
int64_t value;
};

#if 1
template<>
struct std::hash<tuple_t>
{
Expand All @@ -53,7 +52,6 @@ struct std::hash<tuple_t>
return h1 ^ h2;
}
};
#endif

struct res_t
{
Expand Down Expand Up @@ -97,7 +95,7 @@ class Source_Positive_Functor
shipper.setNextWatermark(next_ts);
}
auto offset = (distribution(generator)+1);
next_ts += offset*1000; // in ms
next_ts += offset * 1000; // in ms
}
}
}
Expand Down Expand Up @@ -141,7 +139,7 @@ class Source_Negative_Functor
shipper.setNextWatermark(next_ts);
}
auto offset = (distribution(generator)+1);
next_ts += offset*1000; // in ms
next_ts += offset * 1000; // in ms
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tests/join_tests/test_join_1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/

/*
* Test 1 of general graphs of operators.
* Test 1 of the Interval Join operator.
*
* +---------------------+ +-----------+
* | +-----+ +-----+ | | +-----+ |
Expand Down Expand Up @@ -264,7 +264,7 @@ int main(int argc, char *argv[])
}
check_degree += (sink1_degree + sink2_degree);
// prepare the test
PipeGraph graph("test_graph_1 (DETERMINISTIC)", Execution_Mode_t::DETERMINISTIC, Time_Policy_t::EVENT_TIME);
PipeGraph graph("test_join_1 (DETERMINISTIC)", Execution_Mode_t::DETERMINISTIC, Time_Policy_t::EVENT_TIME);
// prepare the first MultiPipe
Source_Positive_Functor source_functor_positive(stream_len, n_keys, false);
Source source1 = Source_Builder(source_functor_positive)
Expand Down
2 changes: 1 addition & 1 deletion tests/join_tests/test_join_2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/

/*
* Test 2 of general graphs of operators.
* Test 2 of the Interval Join operator.
*
* +---------------------+
* | +-----+ +-----+ |
Expand Down
2 changes: 1 addition & 1 deletion tests/join_tests/test_join_3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/

/*
* Test 3 of merge between MultiPipes.
* Test 3 of the Interval Join operator.
*
* +-----------+
* | +-----+ |
Expand Down
2 changes: 1 addition & 1 deletion tests/join_tests/test_join_4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/

/*
* Test 11 of general graphs of operators.
* Test 4 of the Interval Join operator.
* +---------------------+
* | +-----+ +-----+ |
* | | S | | F | |
Expand Down
2 changes: 1 addition & 1 deletion tests/join_tests/test_join_5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/

/*
* Test 7 of general graphs of operators.
* Test 5 of the Interval Join operator.
*
* +---------------------+
* | +-----+ +-----+ |
Expand Down
14 changes: 6 additions & 8 deletions wf/archive.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*
* @section Archive (Description)
*
* Buffer of tuples received from the input streams (used in window-based or join-based operators).
* Abstract class implementing the archive used by window-based and join-based operators.
*/

#ifndef ARCHIVE_H
Expand All @@ -47,25 +47,26 @@ template<typename tuple_t, typename compare_func_t>
class Archive
{
protected:
//using compare_func_t = std::function<bool(const wrapper_t &, const wrapper_t &)>; // function type to compare two wrapped tuples
using wrapper_t = wrapper_tuple_t<tuple_t>; // alias for the wrapped tuple type
using iterator_t = typename std::deque<wrapper_t>::iterator; // iterator type
compare_func_t lessThan; // function to compare two wrapped tuples
std::deque<wrapper_t> archive; // container implementing the ordered archive of wrapped tuples

// Constructor
Archive(compare_func_t _lessThan):
lessThan(_lessThan) {}
lessThan(_lessThan) {}

public:
// Destructor
virtual ~Archive() = default;

// Add a wrapped tuple to the archive (copy semantics)
virtual void insert(const wrapper_t &_wt) = 0;

// Add a wrapped tuple to the archive (move semantics)
virtual void insert(wrapper_t &&_wt) = 0;

// Remove all the tuples prior to _wt in the ordering
// Remove all the tuples prior to _wt in the archive
virtual size_t purge(const wrapper_t &_wt) = 0;

// Get the size of the archive
Expand All @@ -85,9 +86,6 @@ class Archive
{
return archive.end();
}

virtual ~Archive() = default;

};

} // namespace wf
Expand Down
7 changes: 2 additions & 5 deletions wf/basic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ enum class Time_Policy_t { INGRESS_TIME, EVENT_TIME };
enum class Win_Type_t { CB, TB }; // CB = count based, TB = time based

/// Supported interval join operating modes
// KP = Key Parallelism, DP = Data Parallelism with single-key buffers
enum class Join_Mode_t { NONE, KP, DP };

/// Supported interval join stream tagging
enum class Join_Stream_t { NONE, A, B };

/// Routing modes to distribute inputs to the replicas of an operator
Expand Down Expand Up @@ -316,11 +316,8 @@ inline uint64_t compute_gcd(uint64_t u, uint64_t v)
template<typename tuple_t>
struct wrapper_tuple_t
{
//tuple
tuple_t tuple;
// [Win] identifier (CB) or timestamp (TB)
// [Join] timestamp or watermark
uint64_t index;
uint64_t index; // [win] identifier (CB), timestamp (TB); [join] timestamp or watermark

// Constructor I
wrapper_tuple_t() {}
Expand Down
4 changes: 2 additions & 2 deletions wf/basic_operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ class Basic_Replica: public ff::ff_monode
}
else {
emitter = (_other.emitter)->clone(); // clone the emitter if it exists
doEmit = emitter->get_doEmit();
doEmit_inplace = emitter->get_doEmit_inplace();
doEmit = emitter->get_doEmit();
doEmit_inplace = emitter->get_doEmit_inplace();
}
#if defined (WF_TRACING_ENABLED)
stats_record = _other.stats_record;
Expand Down
15 changes: 9 additions & 6 deletions wf/batch_cpu_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ struct Batch_CPU_t: Batch_t<tuple_t>
std::atomic<size_t> delete_counter; // atomic counter to delete correctly the batch
size_t size; // number of meaningful items within the batch
bool isPunctuation; // flag true if the message is a punctuation, false otherwise
Join_Stream_t stream_tag; // flag to discriminate stream flow between Stream A & B (meaningful to join operators)
Join_Stream_t stream_tag; // flag to discriminate the stream between A and B (meaningful to join-based operators)

// Constructor
Batch_CPU_t(size_t _reserved_size, size_t _delete_counter=1):
Batch_CPU_t(size_t _reserved_size,
size_t _delete_counter=1):
delete_counter(_delete_counter),
size(0),
isPunctuation(false)
isPunctuation(false),
stream_tag(Join_Stream_t::NONE)
{
batch_data.reserve(_reserved_size);
watermarks.push_back(std::numeric_limits<uint64_t>::max());
Expand All @@ -87,7 +89,8 @@ struct Batch_CPU_t: Batch_t<tuple_t>
watermarks(_other.watermarks),
delete_counter(1),
size(_other.size),
isPunctuation(_other.isPunctuation) {}
isPunctuation(_other.isPunctuation),
stream_tag(_other.stream_tag) {}

// Destructor
~Batch_CPU_t() override = default;
Expand Down Expand Up @@ -153,13 +156,13 @@ struct Batch_CPU_t: Batch_t<tuple_t>
}

// Get the stream tag of the batch
Join_Stream_t getStreamTag() const
Join_Stream_t getStreamTag() const override
{
return stream_tag;
}

// Set the stream tag of the batch
void setStreamTag(Join_Stream_t _tag)
void setStreamTag(Join_Stream_t _tag) override
{
stream_tag = _tag;
}
Expand Down
9 changes: 5 additions & 4 deletions wf/batch_gpu_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct Batch_GPU_t: Batch_t<tuple_t>
int *map_idxs_gpu; // GPU array to find tuples with the same key within the batch
cudaStream_t cudaStream; // CUDA stream associated with the batch
std::atomic<int> *inTransit_counter; // pointer to the counter of in-transit batches
Join_Stream_t stream_tag; // flag to discriminate stream flow between Stream A & B (meaningful to join operators)
Join_Stream_t stream_tag; // flag to discriminate the stream between A and B (meaningful to join-based operators)

// Constructor
Batch_GPU_t(size_t _size,
Expand All @@ -76,7 +76,8 @@ struct Batch_GPU_t: Batch_t<tuple_t>
pinned_data_cpu(nullptr),
num_dist_keys(0),
dist_keys_cpu(nullptr),
inTransit_counter(_inTransit_counter)
inTransit_counter(_inTransit_counter),
stream_tag(Join_Stream_t::NONE)
{
watermarks.push_back(std::numeric_limits<uint64_t>::max());
if (cudaMalloc(&data_gpu, sizeof(batch_item_gpu_t<tuple_t>) * size) == cudaErrorMemoryAllocation) {
Expand Down Expand Up @@ -209,13 +210,13 @@ struct Batch_GPU_t: Batch_t<tuple_t>
}

// Get the stream tag of the batch
Join_Stream_t getStreamTag() const
Join_Stream_t getStreamTag() const override
{
return stream_tag;
}

// Set the stream tag of the batch
void setStreamTag(Join_Stream_t _tag)
void setStreamTag(Join_Stream_t _tag) override
{
stream_tag = _tag;
}
Expand Down
16 changes: 15 additions & 1 deletion wf/batch_gpu_t_u.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct Batch_GPU_t: Batch_t<tuple_t>
int *map_idxs_gpu; // array to find tuples with the same key within the batch (in CUDA unified memory or Pinned memory)
cudaStream_t cudaStream; // CUDA stream associated with the batch
std::atomic<int> *inTransit_counter; // pointer to the counter of in-transit batches
Join_Stream_t stream_tag; // flag to discriminate the stream between A and B (meaningful to join-based operators)
cudaDeviceProp deviceProp; // object containing the properties of the used GPU device
int isTegra; // flag equal to 1 if the GPU is integrated (Tegra), 0 otherwise
int gpu_id; // identifier of the currently used GPU
Expand All @@ -78,7 +79,8 @@ struct Batch_GPU_t: Batch_t<tuple_t>
delete_counter(_delete_counter),
num_dist_keys(0),
dist_keys_cpu(nullptr),
inTransit_counter(_inTransit_counter)
inTransit_counter(_inTransit_counter),
stream_tag(Join_Stream_t::NONE)
{
watermarks.push_back(std::numeric_limits<uint64_t>::max());
gpuErrChk(cudaGetDevice(&gpu_id));
Expand Down Expand Up @@ -293,6 +295,18 @@ struct Batch_GPU_t: Batch_t<tuple_t>
}
}

// Get the stream tag of the batch
Join_Stream_t getStreamTag() const override
{
return stream_tag;
}

// Set the stream tag of the batch
void setStreamTag(Join_Stream_t _tag) override
{
stream_tag = _tag;
}

// Reset the batch content
void reset(size_t _delete_counter=1)
{
Expand Down
Loading

0 comments on commit f23cb93

Please sign in to comment.