-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmxevt.h
136 lines (118 loc) · 4.41 KB
/
mxevt.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#pragma once
#include <cstdint>
#include <string>
#include <thread>
#include <map>
#include <set>
#include "network/socket.h"
#include "network/rpc.h"
#include "mxsystem.h"
namespace mulex
{
static constexpr std::uint16_t EVT_PORT = 5702;
static constexpr std::uint16_t EVT_RECV_TIMEOUT = 10000; // 10 sec
enum class EvtResult
{
OK,
FAILED,
TIMEOUT
};
struct EvtHeader
{
std::uint64_t client;
std::uint16_t eventid;
std::uint64_t msgid;
std::uint32_t payloadsize;
// std::uint8_t padding[10];
};
constexpr std::uint64_t EVT_HEADER_SIZE = sizeof(EvtHeader);
constexpr std::uint64_t EVT_MAX_SUB = 64;
std::uint64_t GetNextEventMessageId();
class EvtClientThread
{
public:
using EvtCallbackFunc = std::function<void(const std::uint8_t* data, std::uint64_t len, const std::uint8_t* userdata)>;
EvtClientThread(
const std::string& hostname,
const Experiment* exp = nullptr,
std::uint16_t evtport = EVT_PORT,
bool ghost = false,
std::uint64_t customid = 0x0
);
~EvtClientThread();
void emit(const std::string& event, const std::uint8_t* data, std::uint64_t len);
void regist(const std::string& event);
void subscribe(const std::string& event, EvtCallbackFunc callback);
void unsubscribe(const std::string& event);
void unsubscribeAll();
std::uint16_t findEvent(const std::string& event);
private:
void clientListenThread(const Socket& socket);
void clientEmitThread(const Socket& socket);
private:
const Experiment* _exp;
Socket _evt_socket;
std::unique_ptr<std::thread> _evt_listen_thread;
std::unique_ptr<std::thread> _evt_emit_thread;
SysByteStream* _evt_stream;
SysBufferStack _evt_emit_stack;
std::atomic<bool> _evt_thread_running = false;
std::atomic<bool> _evt_thread_ready = false;
std::map<std::string, std::uint16_t> _evt_registry;
std::set<std::string> _evt_subscriptions;
std::map<std::uint16_t, EvtCallbackFunc> _evt_callbacks;
std::map<std::uint16_t, std::uint8_t*> _evt_userdata;
bool _evt_has_custom_id = false;
std::uint64_t _evt_custom_id;
};
class EvtServerThread
{
public:
EvtServerThread();
~EvtServerThread();
bool ready() const;
bool emit(const std::string& event, const std::uint8_t* data, std::uint64_t len);
void relay(const std::uint64_t clientid, const std::uint8_t* data, std::uint64_t len);
void unsub(const std::uint64_t cid);
private:
void serverConnAcceptThread();
void serverListenThread(const Socket& socket);
void serverEmitThread(const Socket& socket);
void clientStatisticsThread();
private:
Socket _server_socket;
std::map<Socket, std::unique_ptr<std::thread>> _evt_listen_thread;
std::map<Socket, std::unique_ptr<std::thread>> _evt_emit_thread;
std::map<Socket, SysByteStream*> _evt_stream;
std::map<Socket, std::atomic<bool>> _evt_thread_sig;
std::map<Socket, SysBufferStack> _evt_emit_stack;
// SysRefBufferStack _evt_emit_stack;
std::unique_ptr<std::thread> _evt_accept_thread;
std::unique_ptr<std::thread> _evt_stats_thread;
std::atomic<bool> _evt_thread_running = false;
std::atomic<bool> _evt_thread_ready = false;
std::mutex _connections_mutex;
std::condition_variable _evt_notifier;
};
MX_RPC_METHOD bool EvtRegister(mulex::string32 name);
MX_RPC_METHOD std::uint16_t EvtGetId(mulex::string32 name);
MX_RPC_METHOD bool EvtSubscribe(mulex::string32 name);
MX_RPC_METHOD bool EvtUnsubscribe(mulex::string32 name);
bool EvtUnsubscribe(std::uint64_t clientid, std::uint16_t eventid);
void EvtServerRegisterCallback(mulex::string32 name, std::function<void(const Socket&, std::uint64_t, std::uint16_t, const std::uint8_t*, std::uint64_t)> callback);
void EvtTryRunServerCallback(std::uint64_t clientid, std::uint16_t eventid, const std::uint8_t* data, std::uint64_t len, const Socket& socket);
bool EvtEmit(const std::string& event, const std::uint8_t* data, std::uint64_t len);
void EvtAccumulateClientStatistics(std::uint64_t clientid, std::uint64_t framebytes);
MX_RPC_METHOD mulex::RPCGenericType EvtGetAllRegisteredEvents();
template <typename T>
inline std::uint64_t EvtDataAppend(std::uint64_t offset, std::vector<std::uint8_t>* buffer, const T& value)
{
std::memcpy(buffer->data() + offset, &value, sizeof(T));
return offset + sizeof(T);
}
inline std::uint64_t EvtDataAppend(std::uint64_t offset, std::vector<std::uint8_t>* buffer, const std::uint8_t* value, std::uint64_t len)
{
std::memcpy(buffer->data() + offset, value, len);
return offset + len;
}
} // namespace mulex