Skip to content

Commit d00aed3

Browse files
committed
Commit version 2.2.0
Commit version 2.2.0
1 parent 6012c96 commit d00aed3

File tree

12 files changed

+619
-195
lines changed

12 files changed

+619
-195
lines changed

.DS_Store

0 Bytes
Binary file not shown.

docs/windflow-doxygen.conf

Lines changed: 159 additions & 80 deletions
Large diffs are not rendered by default.

includes/builders.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2278,7 +2278,7 @@ class Sink_Builder
22782278
*
22792279
* \return the object itself
22802280
*/
2281-
Sink_Builder<F_t>& set_KeyBy()
2281+
Sink_Builder<F_t>& enable_KeyBy()
22822282
{
22832283
isKeyed = true;
22842284
return *this;

includes/doxygen-mainpage.hpp

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@
22
*
33
* \section intro_sec Introduction
44
*
5-
* WindFlow is a C++17 library for parallel data stream processing applications
6-
* targeting heterogeneous shared-memory architectures featuring multi-core CPUs
7-
* and GPU devices. The library provides common stream processing operators like
8-
* map, flatmap, filter, fold/reduce as well as sliding-window operators designed with
9-
* complex parallel features. Such operators are called operators in the library, where
10-
* each operator is an instance of a class that can be built and connected with other
11-
* operator instances to create data-flow graphs. Applications are built through the
12-
* MultiPipe programming construct used to create parallel pipelines that can be run
13-
* on the system.
5+
* WindFlow is a C++17 library for parallel data stream processing applications targeting heterogeneous shared-memory architectures
6+
* featuring multi-core CPUs and GPU devices. The library provides common stream processing operators like map, flatmap, filter,
7+
* fold/reduce as well as sliding-window operators designed with complex parallel features. Applications are built through the
8+
* MultiPipe and the PipeGraph programming constructs. The first is used to create parallel pipelines, while the second one allows
9+
* several MultiPipe instances to be interconnected through merge and split operations.
1410
*/

includes/pipegraph.hpp

Lines changed: 149 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,17 @@ class PipeGraph
117117
// method to find the list of AppNode instances that are leaves of the tree rooted at _node
118118
std::vector<AppNode *> get_LeavesList(AppNode *_node);
119119

120+
// method to find the LCA of a set of _leaves starting from _node
121+
AppNode *get_LCA(AppNode *_node, std::vector<AppNode *> _leaves);
122+
120123
// method to delete all the AppNode instances in the tree rooted at _node
121124
void delete_AppNodes(AppNode *_node);
122125

123-
// method to prepare the right list of AppNode instances to be merged
124-
bool get_MergedNodes(std::vector<MultiPipe *> _toBeMerged, std::vector<AppNode *> &_rightList);
126+
// method to prepare the right list of AppNode instances to be merged (case merge-ind and merge-full)
127+
bool get_MergedNodes1(std::vector<MultiPipe *> _toBeMerged, std::vector<AppNode *> &_rightList);
128+
129+
// method to prepare the right list of AppNode instances to be merged (case merge-partial)
130+
AppNode *get_MergedNodes2(std::vector<MultiPipe *> _toBeMerged, std::vector<AppNode *> &_rightList);
125131

126132
// method to execute the split of the MultiPipe _mp
127133
std::vector<MultiPipe *> execute_Split(MultiPipe *_mp);
@@ -515,6 +521,24 @@ inline std::vector<AppNode *> PipeGraph::get_LeavesList(AppNode *_node)
515521
}
516522
}
517523

524+
// implementation of the method to find the LCA of a set of _leaves rooted at _node
525+
inline AppNode *PipeGraph::get_LCA(AppNode *_node, std::vector<AppNode *> _leaves)
526+
{
527+
// compute the leaves rooted at each child of _node
528+
for (auto *child: _node->children) {
529+
auto child_leaves = get_LeavesList(child);
530+
bool foundAll = true;
531+
for (auto *leaf: _leaves) {
532+
if (std::find(child_leaves.begin(), child_leaves.end(), leaf) == child_leaves.end()) { // if not present
533+
foundAll = false;
534+
}
535+
}
536+
if (foundAll)
537+
return get_LCA(child, _leaves);
538+
}
539+
return _node;
540+
}
541+
518542
// implementation of the method to delete all the AppNode instances in the tree rooted at _node
519543
inline void PipeGraph::delete_AppNodes(AppNode *_node)
520544
{
@@ -529,8 +553,8 @@ inline void PipeGraph::delete_AppNodes(AppNode *_node)
529553
}
530554
}
531555

532-
// implementation of the method to prepare the right list of AppNode instances to be merged
533-
inline bool PipeGraph::get_MergedNodes(std::vector<MultiPipe *> _toBeMerged, std::vector<AppNode *> &_rightList) {
556+
// implementation of the method to prepare the right list of AppNode instances to be merged (case merge-ind and merge-full)
557+
inline bool PipeGraph::get_MergedNodes1(std::vector<MultiPipe *> _toBeMerged, std::vector<AppNode *> &_rightList) {
534558
// all the input MultiPipe instances must be leaves of the Application Tree
535559
std::vector<AppNode *> inputNodes;
536560
assert(_toBeMerged.size() > 1); // redundant check
@@ -578,14 +602,54 @@ inline bool PipeGraph::get_MergedNodes(std::vector<MultiPipe *> _toBeMerged, std
578602
}
579603
// add the found lca to the _rightList
580604
_rightList.push_back(lca);
581-
// important check below
605+
// important check
582606
if (lca->parent != root && ((inputNodes.size() > 0) || (_rightList.size() > 1))) {
583607
return false;
584608
}
585609
}
586610
return true;
587611
}
588612

613+
// implementation of the method to prepare the right list of AppNode instances to be merged (case merge-partial)
614+
inline AppNode *PipeGraph::get_MergedNodes2(std::vector<MultiPipe *> _toBeMerged, std::vector<AppNode *> &_rightList)
615+
{
616+
// all the input MultiPipe instances must be leaves of the Application Tree
617+
std::vector<AppNode *> inputNodes;
618+
assert(_toBeMerged.size() > 1); // redundant check
619+
for (auto *mp: _toBeMerged) {
620+
AppNode *node = find_AppNode(root, mp);
621+
if (node == nullptr) {
622+
std::cerr << RED << "WindFlow Error: MultiPipe to be merged does not belong to this PipeGraph" << DEFAULT << std::endl;
623+
exit(EXIT_FAILURE);
624+
}
625+
assert((node->children).size() == 0); // redundant check
626+
inputNodes.push_back(node);
627+
}
628+
// we have to find the LCA
629+
AppNode *parent_node = get_LCA(root, inputNodes);
630+
if (parent_node != root) {
631+
for (auto *child: parent_node->children) {
632+
auto child_leaves = get_LeavesList(child);
633+
bool foundAll = true;
634+
size_t count = 0;
635+
for (auto *leaf: child_leaves) {
636+
if (std::find(inputNodes.begin(), inputNodes.end(), leaf) == inputNodes.end())
637+
foundAll = false;
638+
else
639+
count++;
640+
}
641+
if (foundAll)
642+
_rightList.push_back(child);
643+
else if (!foundAll && count > 0)
644+
return nullptr;
645+
}
646+
assert(_rightList.size() > 1);
647+
return parent_node;
648+
}
649+
else
650+
return nullptr;
651+
}
652+
589653
// implementation of the method to execute the split of the MultiPipe _mp
590654
inline std::vector<MultiPipe *> PipeGraph::execute_Split(MultiPipe *_mp)
591655
{
@@ -630,8 +694,8 @@ inline MultiPipe *PipeGraph::execute_Merge(std::vector<MultiPipe *> _toBeMerged)
630694
{
631695
// get the right list of AppNode instances to be merged
632696
std::vector<AppNode *> rightList;
633-
if (get_MergedNodes(_toBeMerged, rightList)) {
634-
if (rightList.size() == 1) { // Case 1: self-merge of a splitted MultiPipe
697+
if (get_MergedNodes1(_toBeMerged, rightList)) {
698+
if (rightList.size() == 1) { // Case 2.1: merge-full -> we merge a whole sub-tree
635699
assert(rightList[0] != root); // redundant check
636700
MultiPipe *mp = rightList[0]->mp;
637701
// create the new MultiPipe, the result of the self-merge
@@ -674,11 +738,11 @@ inline MultiPipe *PipeGraph::execute_Merge(std::vector<MultiPipe *> _toBeMerged)
674738
delete_AppNodes(rightList[0]);
675739
return mergedMP;
676740
}
677-
else { // Case 2: merge of more than one MultiPipe instance
741+
else { // Case 1: merge-ind -> merge independent trees
678742
std::vector<MultiPipe *> rightMergedMPs;
679743
// check that merged MultiPipe instances are sons of the root
680-
for (auto *an: rightList) {
681-
if (std::find((root->children).begin(), (root->children).end(), an) == (root->children).end()) { // redundant check
744+
for (auto *an: rightList) { // redundant check
745+
if (std::find((root->children).begin(), (root->children).end(), an) == (root->children).end()) {
682746
std::cerr << RED << "WindFlow Error: the requested merge operation is not supported" << DEFAULT << std::endl;
683747
exit(EXIT_FAILURE);
684748
}
@@ -696,6 +760,7 @@ inline MultiPipe *PipeGraph::execute_Merge(std::vector<MultiPipe *> _toBeMerged)
696760
toBeDeteled.push_back(mergedMP);
697761
// adjust the Application Tree
698762
std::vector<AppNode *> children_new;
763+
// maintaining the previous ordering of the children is not important in this case
699764
for (auto *brother: root->children) {
700765
if (std::find(rightList.begin(), rightList.end(), brother) == rightList.end())
701766
children_new.push_back(brother);
@@ -709,90 +774,89 @@ inline MultiPipe *PipeGraph::execute_Merge(std::vector<MultiPipe *> _toBeMerged)
709774
}
710775
}
711776
else {
712-
// merge is still possible if all the MultiPipe instances are brothers
713777
rightList.clear();
714-
for (auto *mp: _toBeMerged) {
715-
rightList.push_back(find_AppNode(root, mp));
716-
}
717-
std::vector<int> indexes;
718-
AppNode *parent_node = rightList[0]->parent;
719-
for (auto *node: rightList) {
720-
if (node->parent != parent_node) {
721-
std::cerr << RED << "WindFlow Error: the requested merge operation is not supported" << DEFAULT << std::endl;
722-
exit(EXIT_FAILURE);
778+
AppNode *parent_node = get_MergedNodes2(_toBeMerged, rightList);
779+
if (parent_node != nullptr) { // Case 1.2: merge-partial -> merge two or more sub-trees at the same level
780+
std::vector<int> indexes;
781+
for (auto *node: rightList) {
782+
assert(node->parent == parent_node);
783+
// get the index of this child
784+
indexes.push_back(std::distance((parent_node->children).begin(), std::find((parent_node->children).begin(), (parent_node->children).end(), node)));
723785
}
724-
// get the index of this child
725-
indexes.push_back(std::distance((parent_node->children).begin(), std::find((parent_node->children).begin(), (parent_node->children).end(), node)));
726-
}
727-
// check whether the leaves are adjacent
728-
std::sort(indexes.begin(), indexes.end());
729-
for (size_t i=0; i<indexes.size()-1; i++) {
730-
if (indexes[i] + 1 != indexes[i+1]) {
731-
std::cerr << RED << "WindFlow Error: sibling MultiPipes to be merged must be contiguous branches of the same MultiPipe" << DEFAULT << std::endl;
732-
exit(EXIT_FAILURE);
786+
// check whether the nodes in the rightList are adjacent
787+
std::sort(indexes.begin(), indexes.end());
788+
for (size_t i=0; i<indexes.size()-1; i++) {
789+
if (indexes[i]+1 != indexes[i+1]) {
790+
std::cerr << RED << "WindFlow Error: sibling MultiPipes to be merged must be contiguous branches of the same MultiPipe" << DEFAULT << std::endl;
791+
exit(EXIT_FAILURE);
792+
}
733793
}
734-
}
735-
// everything must be put in the right order
736-
rightList.clear();
737-
_toBeMerged.clear();
738-
for (int idx: indexes) {
739-
rightList.push_back((parent_node->children)[idx]);
740-
_toBeMerged.push_back((parent_node->children)[idx]->mp);
741-
}
742-
// if we reach this point the merge is possible -> we create the new MultiPipe
743-
std::vector<ff::ff_node *> normalization;
744-
for (auto *mp: _toBeMerged) {
745-
auto v = mp->normalize();
746-
mp->isMerged = true;
747-
normalization.insert(normalization.end(), v.begin(), v.end());
748-
}
749-
MultiPipe *mergedMP = new MultiPipe(this, normalization);
750-
mergedMP->outputType = _toBeMerged[0]->outputType;
751-
toBeDeteled.push_back(mergedMP);
752-
// adjust the parent MultiPipe
753-
MultiPipe *parentMP = parent_node->mp;
754-
assert(parentMP->isSplit); // redundant check
755-
size_t new_branches = (parentMP->splittingChildren).size() - indexes.size() + 1;
756-
std::vector<MultiPipe *> new_splittingChildren;
757-
std::vector<ff::ff_node *> new_second_set;
758-
auto second_set = (parentMP->last)->getSecondSet();
759-
// the code below is to respect the original indexes
760-
for (size_t i=0; i<(parentMP->splittingChildren).size(); i++) {
761-
if (i < indexes[0]) {
762-
new_splittingChildren.push_back((parentMP->splittingChildren)[i]);
763-
new_second_set.push_back(second_set[i]);
794+
// everything must be put in the right order
795+
rightList.clear();
796+
std::vector<MultiPipe *> rightMergedMPs;
797+
for (int idx: indexes) {
798+
rightList.push_back((parent_node->children)[idx]);
799+
rightMergedMPs.push_back((parent_node->children)[idx]->mp);
764800
}
765-
else if (i == indexes[0]) {
766-
new_splittingChildren.push_back(mergedMP);
767-
new_second_set.push_back(mergedMP);
801+
// if we reach this point the merge is possible -> we create the new MultiPipe
802+
std::vector<ff::ff_node *> normalization;
803+
for (auto *mp: rightMergedMPs) {
804+
auto v = mp->normalize();
805+
mp->isMerged = true;
806+
normalization.insert(normalization.end(), v.begin(), v.end());
768807
}
769-
else if (i > indexes[indexes.size()-1]) {
770-
new_splittingChildren.push_back((parentMP->splittingChildren)[i]);
771-
new_second_set.push_back(second_set[i]);
808+
MultiPipe *mergedMP = new MultiPipe(this, normalization);
809+
mergedMP->outputType = _toBeMerged[0]->outputType;
810+
toBeDeteled.push_back(mergedMP);
811+
// adjust the parent MultiPipe
812+
MultiPipe *parentMP = parent_node->mp;
813+
assert(parentMP->isSplit); // redundant check
814+
size_t new_branches = (parentMP->splittingChildren).size() - indexes.size() + 1;
815+
std::vector<MultiPipe *> new_splittingChildren;
816+
std::vector<ff::ff_node *> new_second_set;
817+
auto second_set = (parentMP->last)->getSecondSet();
818+
// the code below is to respect the original indexes
819+
for (size_t i=0; i<(parentMP->splittingChildren).size(); i++) {
820+
if (i < indexes[0]) {
821+
new_splittingChildren.push_back((parentMP->splittingChildren)[i]);
822+
new_second_set.push_back(second_set[i]);
823+
}
824+
else if (i == indexes[0]) {
825+
new_splittingChildren.push_back(mergedMP);
826+
new_second_set.push_back(mergedMP);
827+
}
828+
else if (i > indexes[indexes.size()-1]) {
829+
new_splittingChildren.push_back((parentMP->splittingChildren)[i]);
830+
new_second_set.push_back(second_set[i]);
831+
}
772832
}
773-
}
774-
assert(new_splittingChildren.size () == new_branches); // redundant check
775-
parentMP->splittingBranches = new_branches;
776-
parentMP->splittingChildren = new_splittingChildren;
777-
(parentMP->last)->change_secondset(new_second_set, false);
778-
mergedMP->fromSplitting = true;
779-
mergedMP->splittingParent = parentMP;
780-
// adjust the Application Tree
781-
std::vector<AppNode *> children_new;
782-
bool done = false;
783-
for (auto *brother: parent_node->children) {
784-
if (std::find(rightList.begin(), rightList.end(), brother) == rightList.end())
785-
children_new.push_back(brother);
786-
else if (!done) {
787-
children_new.push_back(new AppNode(mergedMP, parent_node));
788-
done = true;
833+
assert(new_splittingChildren.size () == new_branches); // redundant check
834+
parentMP->splittingBranches = new_branches;
835+
parentMP->splittingChildren = new_splittingChildren;
836+
(parentMP->last)->change_secondset(new_second_set, false);
837+
mergedMP->fromSplitting = true;
838+
mergedMP->splittingParent = parentMP;
839+
// adjust the Application Tree
840+
std::vector<AppNode *> children_new;
841+
bool done = false;
842+
for (auto *brother: parent_node->children) {
843+
if (std::find(rightList.begin(), rightList.end(), brother) == rightList.end())
844+
children_new.push_back(brother);
845+
else if (!done) {
846+
children_new.push_back(new AppNode(mergedMP, parent_node));
847+
done = true;
848+
}
849+
}
850+
parent_node->children = children_new;
851+
for (auto *an: rightList) {
852+
delete_AppNodes(an);
789853
}
854+
return mergedMP;
790855
}
791-
parent_node->children = children_new;
792-
for (auto *an: rightList) {
793-
delete_AppNodes(an);
856+
else {
857+
std::cerr << RED << "WindFlow Error: the requested merge operation is not supported" << DEFAULT << std::endl;
858+
exit(EXIT_FAILURE);
794859
}
795-
return mergedMP;
796860
}
797861
}
798862

includes/source.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ class Source: public ff::ff_a2a
296296
std::cerr << RED << "WindFlow Error: Source has parallelism zero" << DEFAULT << std::endl;
297297
exit(EXIT_FAILURE);
298298
}
299+
std::cerr << YELLOW << "WindFlow Warning: the use of the single-loop function in the Source is deprecated" << DEFAULT << std::endl;
299300
// vector of Source_Node
300301
std::vector<ff_node *> first_set;
301302
for (size_t i=0; i<_pardegree; i++) {
@@ -328,6 +329,7 @@ class Source: public ff::ff_a2a
328329
std::cerr << RED << "WindFlow Error: Source has parallelism zero" << DEFAULT << std::endl;
329330
exit(EXIT_FAILURE);
330331
}
332+
std::cerr << YELLOW << "WindFlow Warning: the use of the single-loop function in the Source is deprecated" << DEFAULT << std::endl;
331333
// vector of Source_Node
332334
std::vector<ff_node *> first_set;
333335
for (size_t i=0; i<_pardegree; i++) {

includes/standard_nodes.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class Standard_Emitter: public Basic_Emitter
9191
auto key = std::get<0>(t->getControlFields()); // key
9292
size_t hashcode = std::hash<decltype(key)>()(key); // compute the hashcode of the key
9393
// evaluate the routing function
94-
dest_w = routing_func(hashcode, this->get_num_outchannels());
94+
dest_w = routing_func(hashcode, n_dest);
9595
// send the tuple
9696
if (!isCombined)
9797
this->ff_send_out_to(t, dest_w);

src/.DS_Store

0 Bytes
Binary file not shown.

src/graph_test/test_graph_6.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* +---------------------+ | +---------------------+ | | +-----+ +-----+ | | +-----------+
2828
* | +-----+ +-----+ | | | +-----+ +-----+ | | +---------------------+ | | +-----+ |
2929
* | | S | | M | | | | | F | | M | | | | | | S | |
30-
* | | (*) +-->+ (*) | +-------->+ | (*) +-->+ (*) |+--+ +-----> | (1) | |
30+
* | | (*) +-->+ (*) | +------>+ | (*) +-->+ (*) | +---+ +-----> | (1) | |
3131
* | +-----+ +-----+ | | | +-----+ +-----+ | | | +-----+ |
3232
* +---------------------+ | +---------------------+ | +-----------+
3333
* | |

0 commit comments

Comments
 (0)