Skip to content

Commit 8d403cf

Browse files
authored
Merge pull request #351 from singnet/senna-320-2
[#320] Implementation of DAS service bus
2 parents 9ab950b + 438802a commit 8d403cf

File tree

13 files changed

+531
-68
lines changed

13 files changed

+531
-68
lines changed

src/distributed_algorithm_node/BusNode.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ void BusNode::send_bus_command(const string& command, const vector<string>& args
8888
void BusNode::take_ownership(const set<string>& commands) {
8989
for (auto command : commands) {
9090
this->bus.set_ownership(command, node_id());
91+
this->my_commands.insert(command);
9192
}
9293
broadcast_my_commands();
9394
}

src/distributed_algorithm_node/BusNode.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
#ifndef _DISTRIBUTED_ALGORITHM_NODE_STARNODE_H
2-
#define _DISTRIBUTED_ALGORITHM_NODE_STARNODE_H
1+
#pragma once
32

43
#include <string>
54

@@ -203,5 +202,3 @@ class SetCommandOwnership : public Message {
203202
};
204203

205204
} // namespace distributed_algorithm_node
206-
207-
#endif // _DISTRIBUTED_ALGORITHM_NODE_STARNODE_H

src/distributed_algorithm_node/StarNode.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class StarNode : public DistributedAlgorithmNode {
5555
*
5656
* @param node_id ID of the newly inserted node.
5757
*/
58-
void node_joined_network(const string& node_id);
58+
virtual void node_joined_network(const string& node_id);
5959

6060
/**
6161
* Method called when a leadershipo election is requested.

src/service_bus/BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ cc_library(
88
deps = [
99
":service_bus_singleton",
1010
":service_bus",
11+
":bus_command_proxy",
1112
":bus_command_processor",
13+
"//distributed_algorithm_node:distributed_algorithm_node_lib",
1214
"//commons:commons_lib",
1315
],
1416
)
@@ -34,11 +36,22 @@ cc_library(
3436
],
3537
)
3638

39+
cc_library(
40+
name = "bus_command_proxy",
41+
srcs = ["BusCommandProxy.cc"],
42+
hdrs = ["BusCommandProxy.h"],
43+
deps = [
44+
"//distributed_algorithm_node:bus_node",
45+
"//commons:commons_lib",
46+
],
47+
)
48+
3749
cc_library(
3850
name = "bus_command_processor",
3951
srcs = ["BusCommandProcessor.cc"],
4052
hdrs = ["BusCommandProcessor.h"],
4153
deps = [
54+
":bus_command_proxy",
4255
"//distributed_algorithm_node:bus_node",
4356
"//commons:commons_lib",
4457
],

src/service_bus/BusCommandProcessor.h

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
1-
#ifndef _SERVICE_BUS_BUSCOMMANDPROCESSOR_H
2-
#define _SERVICE_BUS_BUSCOMMANDPROCESSOR_H
1+
#pragma once
32

43
#include <set>
54
#include <string>
65
#include <vector>
76

7+
#include "BusCommandProxy.h"
8+
89
using namespace std;
910

1011
namespace service_bus {
1112

1213
/**
1314
* Bus element responsible for processing of one or more bus commands.
1415
*
15-
* Subclasses must provide concrete implementation of run_command(), which
16+
* Concrete subclasses must provide concrete implementation of run_command(), which
1617
* is the method called when one of the taken commands are issued in the bus.
18+
*
19+
* Whenever a new command processor is implemented, it's required to implement a
20+
* concrete subclass of BusCommandProxy as well (or reuse some other concrete
21+
* subclass). Anyway, the processor is required to define which subclass to use as
22+
* it must implement the virtual method factory_empty_proxy() which is supposed to
23+
* return an empty object of the proper BusCommandProxy.
1724
*/
1825
class BusCommandProcessor {
1926
friend class ServiceBus;
@@ -25,12 +32,26 @@ class BusCommandProcessor {
2532
* @param commands Set of commands owned by the BusCommandProcessor
2633
*/
2734
BusCommandProcessor(const set<string>& commands);
35+
36+
/**
37+
* Desctructor.
38+
*/
2839
virtual ~BusCommandProcessor() {}
2940

3041
// ---------------------------------------------------------------------------------------------
31-
// Virtual API which need to be iomplemented in concrete subclasses.
42+
// Virtual API which need to be implemented in concrete subclasses.
43+
44+
/**
45+
* Returns an empty instance of the BusCommandProxy required to issue the command.
46+
*
47+
* @return An empty instance of the BusCommandProxy required to issue the command.
48+
*/
49+
virtual shared_ptr<BusCommandProxy> factory_empty_proxy() = 0;
3250

33-
virtual void run_command(const string& command, const vector<string>& args) = 0;
51+
/**
52+
* Method which is called when a command owned by the processor is issued in the bus.
53+
*/
54+
virtual void run_command(shared_ptr<BusCommandProxy> proxy) = 0;
3455

3556
private:
3657
bool check_command(const string& command);
@@ -39,5 +60,3 @@ class BusCommandProcessor {
3960
};
4061

4162
} // namespace service_bus
42-
43-
#endif // _SERVICE_BUS_BUSCOMMANDPROCESSOR_H

src/service_bus/BusCommandProxy.cc

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#include "BusCommandProxy.h"
2+
3+
#include "Utils.h"
4+
5+
using namespace service_bus;
6+
7+
string ProxyNode::PROXY_COMMAND = "bus_command_proxy";
8+
9+
// -------------------------------------------------------------------------------------------------
10+
// Constructors and destructors
11+
12+
BusCommandProxy::BusCommandProxy() {
13+
this->proxy_port = 0;
14+
this->port_pool = NULL;
15+
}
16+
17+
BusCommandProxy::BusCommandProxy(const string& command, const vector<string>& args)
18+
: command(command), args(args) {}
19+
20+
BusCommandProxy::~BusCommandProxy() {
21+
// port_pool is initialized by ServiceBus. It's an static member of a singleton so there's no
22+
// need to manage its deletion
23+
if ((this->port_pool != NULL) && (this->proxy_port != 0)) {
24+
// Return the port to the pool of available ports
25+
this->port_pool->enqueue((void*) this->proxy_port);
26+
}
27+
}
28+
29+
ProxyNode::ProxyNode(BusCommandProxy* proxy, const string& node_id) : StarNode(node_id) {
30+
// SERVER running in BUS command requestor
31+
this->proxy = proxy;
32+
}
33+
34+
ProxyNode::ProxyNode(BusCommandProxy* proxy, const string& node_id, const string& server_id)
35+
: StarNode(node_id, server_id) {
36+
// CLIENT running in BUS command processor
37+
this->proxy = proxy;
38+
}
39+
40+
ProxyNode::~ProxyNode() {}
41+
42+
// -------------------------------------------------------------------------------------------------
43+
// Proxy API
44+
45+
void BusCommandProxy::setup_proxy_node(const string& client_id, const string& server_id) {
46+
if ((this->port_pool == NULL) || (this->proxy_port == 0)) {
47+
Utils::error("Proxy node can't be set up");
48+
} else {
49+
if (client_id == "") {
50+
// This proxy is running in the requestor
51+
string id = this->requestor_id;
52+
string requestor_host = id.substr(0, id.find(":"));
53+
string requestor_id = requestor_host + ":" + to_string(this->proxy_port);
54+
this->proxy_node = new ProxyNode(this, requestor_id);
55+
} else {
56+
// This proxy is running in the processor
57+
this->proxy_node = new ProxyNode(this, client_id, server_id);
58+
this->proxy_node->peer_id = server_id;
59+
}
60+
}
61+
}
62+
63+
void BusCommandProxy::to_remote_peer(const string& command, const vector<string>& args) {
64+
this->proxy_node->to_remote_peer(command, args);
65+
}
66+
67+
const string& BusCommandProxy::get_command() { return this->command; }
68+
69+
const vector<string>& BusCommandProxy::get_args() { return this->args; }
70+
71+
// -------------------------------------------------------------------------------------------------
72+
// ProxyNode API
73+
74+
void ProxyNode::remote_call(const string& command, const vector<string>& args) {
75+
this->proxy->from_remote_peer(command, args);
76+
}
77+
78+
void ProxyNode::to_remote_peer(const string& command, const vector<string>& args) {
79+
if (this->peer_id == "") {
80+
Utils::error("Unknown peer");
81+
}
82+
vector<string> new_args(args);
83+
new_args.push_back(command);
84+
send(PROXY_COMMAND, new_args, this->peer_id);
85+
}
86+
87+
void ProxyNode::node_joined_network(const string& node_id) {
88+
StarNode::node_joined_network(node_id);
89+
this->peer_id = node_id;
90+
}
91+
92+
// -------------------------------------------------------------------------------------------------
93+
// Messages
94+
95+
shared_ptr<Message> ProxyNode::message_factory(string& command, vector<string>& args) {
96+
std::shared_ptr<Message> message = DistributedAlgorithmNode::message_factory(command, args);
97+
if (message) {
98+
return message;
99+
}
100+
if (command == ProxyNode::PROXY_COMMAND) {
101+
vector<string> new_args(args);
102+
new_args.pop_back();
103+
return std::shared_ptr<Message>(new ProxyMessage(args.back(), new_args));
104+
}
105+
return std::shared_ptr<Message>{};
106+
}
107+
108+
ProxyMessage::ProxyMessage(const string& command, const vector<string>& args) {
109+
this->command = command;
110+
this->args = args;
111+
}
112+
113+
void ProxyMessage::act(shared_ptr<MessageFactory> node) {
114+
auto proxy_node = dynamic_pointer_cast<ProxyNode>(node);
115+
proxy_node->remote_call(this->command, this->args);
116+
}

0 commit comments

Comments
 (0)