-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffer_manager.hpp
130 lines (107 loc) · 2.64 KB
/
buffer_manager.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#pragma once
#include <mpi.h>
#include <memory>
#include <deque>
#include <mutex>
#include <boost/variant.hpp>
class Request;
using PtrRequest = std::shared_ptr<Request>;
class Request
{
public:
MPI_Request _request;
Request(MPI_Request request)
: _request(request)
{}
void wait(std::vector<PtrRequest> &requests)
{
for (auto request : requests) {
request->wait();
}
}
bool test()
{
int complete = 0;
MPI_Test(&_request, &complete, MPI_STATUS_IGNORE);
return complete;
}
void wait()
{
MPI_Wait(&_request, MPI_STATUS_IGNORE);
}
};
class release_visitor : public boost::static_visitor<>
{
public:
template<typename T>
void operator()(T & operand) const
{
operand.release();
}
};
class is_valid_ptr_visitor : public boost::static_visitor<bool>
{
public:
template<typename T>
bool operator()(T & operand) const
{
return static_cast<bool>(operand);
}
};
class BufferManager
{
public:
/// Deleted copy operator for singleton pattern
BufferManager(BufferManager const &) = delete;
/// Deleted assigment operator for singleton pattern
void operator=(BufferManager const &) = delete;
static BufferManager & instance()
{
static BufferManager instance;
return instance;
}
template<typename T>
void put(std::unique_ptr<Request> request, std::unique_ptr<T> buffer)
{
std::lock_guard<std::mutex> lock(bufferedRequestsMutex);
bufferedRequests.emplace_back(std::move(request), std::move(buffer));
}
void run()
{
INFO << "Starting thread";
thread = std::thread(&BufferManager::check, this);
}
void stop()
{
stopFlag = true;
thread.join();
}
void check()
{
INFO << "Check thread started";
while (not stopFlag) {
for (auto it = bufferedRequests.begin(); it != bufferedRequests.end();) {
if (boost::apply_visitor(is_valid_ptr_visitor(), it->second) and it->first->test()) {
std::lock_guard<std::mutex> lock(bufferedRequestsMutex);
it->first.release();
boost::apply_visitor(release_visitor(), it->second);
DEBUG << "Released a ptr, #requests = " << bufferedRequests.size();
it = bufferedRequests.erase(it);
sleep(100);
}
else {
++it;
}
}
std::this_thread::yield();
}
}
private:
/// Private, empty constructor for singleton pattern
BufferManager() {}
using ptr_types = boost::variant< std::unique_ptr<int>, std::unique_ptr<double> >;
std::list<std::pair<std::unique_ptr<Request>, ptr_types>> bufferedRequests;
std::thread thread;
std::mutex bufferedRequestsMutex;
bool stopFlag = false;
};