-
-
Notifications
You must be signed in to change notification settings - Fork 59
/
Copy pathipc.cc
120 lines (102 loc) · 2.44 KB
/
ipc.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include <cassert>
#include <iostream>
#include "msgq/ipc.h"
#include "msgq/impl_zmq.h"
#include "msgq/impl_msgq.h"
#include "msgq/impl_fake.h"
// Compile-time backend override: true when __APPLE__ is defined, false
// otherwise. messaging_use_zmq() ORs this with the ZMQ environment variable.
#if defined(__APPLE__)
const bool MUST_USE_ZMQ = true;
#else
const bool MUST_USE_ZMQ = false;
#endif
// Reports whether the ZMQ backend should be used.
//
// ZMQ is selected when the ZMQ environment variable is set (to any value,
// including an empty string) or when MUST_USE_ZMQ is true. When ZMQ is
// selected and OPENPILOT_PREFIX is also set, an error is written to stderr
// and assert(false) fires; if assertions are compiled out (NDEBUG), execution
// continues and the function still returns true.
bool messaging_use_zmq(){
  // The environment is consulted first; MUST_USE_ZMQ only matters when ZMQ is unset.
  const bool zmq_selected = std::getenv("ZMQ") || MUST_USE_ZMQ;
  if (!zmq_selected) {
    return false;
  }

  // The prefix is only checked once ZMQ has been selected.
  const char * prefix = std::getenv("OPENPILOT_PREFIX");
  if (prefix != nullptr) {
    std::cerr << "OPENPILOT_PREFIX not supported with ZMQ backend\n";
    assert(false);
  }
  return true;
}
// Reports whether the fake messaging wrappers are requested.
//
// Returns true when the CEREAL_FAKE environment variable exists, whatever its
// value (an empty string still counts); false when it is unset.
bool messaging_use_fake(){
  return std::getenv("CEREAL_FAKE") != nullptr;
}
// Allocates a Context for the active backend: a ZMQContext when
// messaging_use_zmq() returns true, otherwise an MSGQContext.
//
// The object is created with `new` and returned as a raw pointer; nothing in
// this function releases it.
Context * Context::create(){
  if (messaging_use_zmq()) {
    return new ZMQContext();
  }
  return new MSGQContext();
}
// Allocates an unconnected SubSocket for the active configuration.
//
// Selection matrix (fake mode x backend):
//   fake + ZMQ   -> FakeSubSocket<ZMQSubSocket>
//   fake + MSGQ  -> FakeSubSocket<MSGQSubSocket>
//   ZMQ          -> ZMQSubSocket
//   otherwise    -> MSGQSubSocket
//
// The object is created with `new` and returned as a raw pointer; nothing in
// this function releases it.
SubSocket * SubSocket::create(){
  // Query fake mode before the backend, once each, on every path.
  const bool wrap_in_fake = messaging_use_fake();
  const bool zmq_backend = messaging_use_zmq();

  if (wrap_in_fake && zmq_backend) {
    return new FakeSubSocket<ZMQSubSocket>();
  }
  if (wrap_in_fake) {
    return new FakeSubSocket<MSGQSubSocket>();
  }
  if (zmq_backend) {
    return new ZMQSubSocket();
  }
  return new MSGQSubSocket();
}
// Allocates a SubSocket via SubSocket::create() and connects it.
//
// All arguments are forwarded unchanged to SubSocket::connect(); their exact
// meaning is defined by the backend implementation (not visible here).
//
// Returns the connected socket when connect() returns 0. On any other return
// value an error including strerror(errno) is written to stderr, the socket
// is deleted, and nullptr is returned.
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address, bool conflate, bool check_endpoint){
  SubSocket *s = SubSocket::create();
  const int r = s->connect(context, endpoint, address, conflate, check_endpoint);
  if (r == 0) {
    return s;
  }

  // Snapshot errno before touching std::cerr: the stream insertions that
  // precede the strerror() call perform I/O and may overwrite errno, which
  // would make the reported reason wrong.
  const int connect_errno = errno;
  std::cerr << "Error, failed to connect SubSocket to " << endpoint << ": " << strerror(connect_errno) << std::endl;
  delete s;
  return nullptr;
}
// Allocates an unconnected PubSocket for the active backend: a ZMQPubSocket
// when messaging_use_zmq() returns true, otherwise an MSGQPubSocket.
//
// The object is created with `new` and returned as a raw pointer; nothing in
// this function releases it.
PubSocket * PubSocket::create(){
  if (messaging_use_zmq()) {
    return new ZMQPubSocket();
  }
  return new MSGQPubSocket();
}
// Allocates a PubSocket via PubSocket::create() and connects it.
//
// All arguments are forwarded unchanged to PubSocket::connect(); their exact
// meaning is defined by the backend implementation (not visible here).
//
// Returns the socket when connect() returns 0. On any other return value an
// error including strerror(errno) is written to stderr, the socket is
// deleted, and nullptr is returned.
PubSocket * PubSocket::create(Context * context, std::string endpoint, bool check_endpoint){
  PubSocket *s = PubSocket::create();
  const int r = s->connect(context, endpoint, check_endpoint);
  if (r == 0) {
    return s;
  }

  // Snapshot errno before touching std::cerr: the stream insertions that
  // precede the strerror() call perform I/O and may overwrite errno, which
  // would make the reported reason wrong.
  const int connect_errno = errno;
  std::cerr << "Error, failed to bind PubSocket to " << endpoint << ": " << strerror(connect_errno) << std::endl;
  delete s;
  return nullptr;
}
// Allocates a Poller for the active configuration.
//
// Fake mode takes precedence: when messaging_use_fake() is true a FakePoller
// is returned and the backend is not queried at all. Otherwise a ZMQPoller is
// returned when messaging_use_zmq() is true, and an MSGQPoller if not.
//
// The object is created with `new` and returned as a raw pointer; nothing in
// this function releases it.
Poller * Poller::create(){
  if (messaging_use_fake()) {
    return new FakePoller();
  }
  if (messaging_use_zmq()) {
    return new ZMQPoller();
  }
  return new MSGQPoller();
}
// Allocates a Poller via Poller::create() and registers every socket in
// `sockets` with it, in the order given, through Poller::registerSocket().
//
// Returns the new poller; with an empty vector this is simply an empty poller.
Poller * Poller::create(std::vector<SubSocket*> sockets){
  Poller * poller = Poller::create();
  for (SubSocket * sock : sockets) {
    poller->registerSocket(sock);
  }
  return poller;
}