Skip to content

Commit

Permalink
Malfunctioning status, troubleshooting repeater teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
Paolo Fittipaldi authored and Paolo Fittipaldi committed Nov 4, 2024
1 parent 8ff611b commit 9a9c77b
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 25 deletions.
7 changes: 7 additions & 0 deletions quisp/messages/connection_setup_messages.msg
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,11 @@ packet RequestQnicReservation extends Header
{
unsigned long RuleSet_id;
int qnic_addr @getter(getQnicAddr) @setter(setQnicAddr);
int partner_address @getter(getPartnerAddress) @setter(setPartnerAddress);
bool prepare_teardown @setter(setPrepareTeardown) @getter(getPrepareTeardown);
}

packet RequestRulesetTermination extends Header
{
unsigned long RuleSet_id;
}
6 changes: 5 additions & 1 deletion quisp/modules/Common/Router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ void Router::handleMessage(cMessage *msg) {
bubble("Qnic Reservation Request received");
send(pk, "cmPort$o");
return;
}
} else if (dest_addr == my_address && dynamic_cast<RequestRulesetTermination *>(msg)) {
bubble("Ruleset Termination Request received");
send(pk, "rePort$o");
return;
}

// RoutingDaemon sends hello packet without desination specified
if (dest_addr == unidentified_destination && dynamic_cast<OspfHelloPacket *>(msg)) {
Expand Down
90 changes: 70 additions & 20 deletions quisp/modules/QRSA/ConnectionManager/ConnectionManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ void ConnectionManager::handleMessage(cMessage *msg) {
delete msg;
} else if (actual_src == my_address) {
// initiator node
req->setConnectionSetupRequestId(createUniqueId());
queueApplicationRequest(req);
} else {
// intermediate node
Expand All @@ -98,11 +99,15 @@ void ConnectionManager::handleMessage(cMessage *msg) {
}

if (auto *resp = dynamic_cast<ConnectionSetupResponse *>(msg)) {
reservation_register.updateReservationId(resp->getConnectionSetupRequestId(), resp->getRuleSet_id());

int initiator_addr = resp->getInitiatorAddr();
int responder_addr = resp->getResponderAddr();

if (initiator_addr == my_address) {
removeAcceptedConnectionSetupRequestFromQueue(resp);
}

reservation_register.updateReservationId(resp->getConnectionSetupRequestId(), resp->getRuleSet_id());

if (initiator_addr == my_address || responder_addr == my_address) {
// this node is not a swapper
storeRuleSetForApplication(resp);
Expand All @@ -111,6 +116,7 @@ void ConnectionManager::handleMessage(cMessage *msg) {
// currently, destinations are separated. (Not accumulated.)
storeRuleSet(resp);
}

delete msg;
return;
}
Expand All @@ -129,15 +135,20 @@ void ConnectionManager::handleMessage(cMessage *msg) {

if (auto *pk = dynamic_cast<InternalTerminatedRulesetIdsNotifier *>(msg)) {
teardownConnections(pk);
delete msg;
return;
}

if (auto *td = dynamic_cast<ConnectionTeardown *>(msg)) {
reservation_register.deleteReservationByRulesetId(td->getRuleSet_id());
handleTeardownMessage(td);
delete msg;
return;
}

if (auto *req = dynamic_cast<RequestQnicReservation *>(msg)) {
reservation_register.registerReservation(req->getQnicAddr(), req->getRuleSet_id());
makeQnicReservationForTomography(req);
delete msg;
return;
}
}

Expand Down Expand Up @@ -266,8 +277,6 @@ void ConnectionManager::respondToRequest(ConnectionSetupRequest *req) {
auto ruleset_id = createUniqueId();
ruleset_gen::RuleSetGenerator ruleset_gen{my_address};
const auto &rulesets = ruleset_gen.generateRuleSets(req, ruleset_id);

connection_teardown_messages[ruleset_id].clear();
// distribute rulesets to each qnode in the path
for (auto [owner_address, rs] : rulesets) {
ConnectionSetupResponse *resp = new ConnectionSetupResponse("ConnectionSetupResponse");
Expand Down Expand Up @@ -423,12 +432,12 @@ void ConnectionManager::queueApplicationRequest(ConnectionSetupRequest *req) {

void ConnectionManager::popApplicationRequest(int qnic_address) {
auto &request_queue = connection_setup_buffer[qnic_address];
auto *req = request_queue.front();
// auto *req = request_queue.front();

connection_retry_count[qnic_address] = 0;
request_queue.pop();
delete req;
reservation_register.deleteReservationByQnicAddr(qnic_address);
// request_queue.pop(); // Old connection, served.
// delete req;
// reservation_register.deleteReservationByQnicAddr(qnic_address);

if (!request_queue.empty()) {
EV << "schedule from pop" << endl;
Expand All @@ -450,7 +459,6 @@ void ConnectionManager::initiateApplicationRequest(int qnic_address) {
}

auto to_send = request_queue.front()->dup();
to_send->setConnectionSetupRequestId(createUniqueId());
reservation_register.registerReservation(qnic_address, to_send->getConnectionSetupRequestId());
send(to_send, "RouterPort$o");
}
Expand All @@ -468,17 +476,59 @@ void ConnectionManager::scheduleRequestRetry(int qnic_address) {
}

void ConnectionManager::teardownConnections(messages::InternalTerminatedRulesetIdsNotifier *pkt) {
if (connection_teardown_messages.size() > 0) {
int terminated_rulesets_number = pkt->getNumberOfTerminatedRulesets();
for (int i = 0; i < terminated_rulesets_number; i++) {
auto search = connection_teardown_messages.find(pkt->getTerminatedRulesetId(i));
if (search == connection_teardown_messages.end())
error("ConnectionManager Error: Trying to tear down a connection but the related ConnectionTeardown messages are not ready. This is likely a bug in the code.");
auto messages_to_send = search->second;
for (auto msg : messages_to_send) {
send(msg, "RouterPort$o");

for (int i = 0; i < pkt->getNumberOfTerminatedRulesets(); i++) {
auto search = connection_teardown_messages.find(pkt->getTerminatedRulesetId(i));
if (search != connection_teardown_messages.end()) { //This node is in charge of terminating this connection.
auto messages_to_send = search->second;
for (auto msg : messages_to_send) {
send(msg->dup(), "RouterPort$o");
}
connection_teardown_messages.erase(search->first);
}
}
}

void ConnectionManager::handleTeardownMessage(messages::ConnectionTeardown *td) {
auto qnic_addresses = reservation_register.getReservedQnics(td->getRuleSet_id());
reservation_register.deleteReservationByRulesetId(td->getRuleSet_id());
requestTerminationOfSwappingRulesets(td->getRuleSet_id());
for (int qnic_addr : qnic_addresses) {
popApplicationRequest(qnic_addr);
}
}

void ConnectionManager::removeAcceptedConnectionSetupRequestFromQueue(ConnectionSetupResponse *resp) {
unsigned long connection_setup_id = resp->getConnectionSetupRequestId();
int qnic_addr = *(reservation_register.getReservedQnics(connection_setup_id).begin()); // Should be only one bc this is the initiator.
auto &first_queue = connection_setup_buffer[qnic_addr];
if (first_queue.front()->getConnectionSetupRequestId() == resp->getConnectionSetupRequestId()) {
first_queue.pop();
return;
}
throw cRuntimeError("Mismatched ConnectionSetupRequestId when popping request queue.");
}

void ConnectionManager::makeQnicReservationForTomography(RequestQnicReservation* req) {
reservation_register.registerReservation(req->getQnicAddr(), req->getRuleSet_id());
if (!req->getPrepareTeardown()) return;
unsigned long ruleset_id = req->getRuleSet_id();
ConnectionTeardown *td_partner = new ConnectionTeardown("ConnectionTeardown");
td_partner->setSrcAddr(my_address);
td_partner->setDestAddr(req->getPartnerAddress());
td_partner->setRuleSet_id(ruleset_id);
auto td_myself = td_partner->dup();
td_myself->setDestAddr(my_address);
connection_teardown_messages[ruleset_id].push_back(td_myself);
connection_teardown_messages[ruleset_id].push_back(td_partner);
}

void ConnectionManager::requestTerminationOfSwappingRulesets(unsigned long ruleset_id) {
auto *rst = new RequestRulesetTermination();
rst->setSrcAddr(my_address);
rst->setDestAddr(my_address);
rst->setRuleSet_id(ruleset_id);
send(rst,"RouterPort$o");
}

} // namespace quisp::modules
4 changes: 4 additions & 0 deletions quisp/modules/QRSA/ConnectionManager/ConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class ConnectionManager : public IConnectionManager, public Logger::LoggerBase {
// bool isQnicBusy(int qnic_address);

void teardownConnections(messages::InternalTerminatedRulesetIdsNotifier *pkt);
void handleTeardownMessage(messages::ConnectionTeardown *td);
void removeAcceptedConnectionSetupRequestFromQueue(messages::ConnectionSetupResponse *resp);
void makeQnicReservationForTomography(messages::RequestQnicReservation* req);
void requestTerminationOfSwappingRulesets(unsigned long ruleset_id);

static rules::PurType parsePurType(const std::string &pur_type);

Expand Down
8 changes: 5 additions & 3 deletions quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ void HardwareMonitor::handleMessage(cMessage *msg) {

// RuleSets sent for this node and the partner node.
unsigned long ruleset_id = createUniqueId();
makeQnicReservationForTomography(my_address, my_qnic_info->qnic.address, ruleset_id);
makeQnicReservationForTomography(my_address, partner_address, my_qnic_info->qnic.address, ruleset_id,true);
sendLinkTomographyRuleSet(my_address, partner_address, my_qnic_info->qnic.type, my_qnic_info->qnic.index, ruleset_id);

QNIC_type partner_qnic_type = ack->getQnic_type();
int partner_qnic_index = ack->getQnic_index();
makeQnicReservationForTomography(partner_address, ack->getQnicAddr(), ruleset_id);
makeQnicReservationForTomography(partner_address, my_address, ack->getQnicAddr(), ruleset_id,false);
sendLinkTomographyRuleSet(partner_address, my_address, partner_qnic_type, partner_qnic_index, ruleset_id);
delete ack;
return;
Expand Down Expand Up @@ -1251,12 +1251,14 @@ std::unique_ptr<NeighborInfo> HardwareMonitor::createNeighborInfo(const cModule
thisNode.getClassName());
}

void HardwareMonitor::makeQnicReservationForTomography(int node_address, int qnic_addr, unsigned long ruleset_id) {
void HardwareMonitor::makeQnicReservationForTomography(int node_address, int partner_address, int qnic_addr, unsigned long ruleset_id, bool prepare_teardown) {
RequestQnicReservation *pkt = new RequestQnicReservation();
pkt->setSrcAddr(my_address);
pkt->setDestAddr(node_address);
pkt->setQnicAddr(qnic_addr);
pkt->setRuleSet_id(ruleset_id);
pkt->setPrepareTeardown(prepare_teardown);
pkt->setPartnerAddress(partner_address);
send(pkt, "RouterPort$o");
}

Expand Down
2 changes: 1 addition & 1 deletion quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class HardwareMonitor : public IHardwareMonitor {
virtual Eigen::Matrix4cd reconstruct_density_matrix(int qnic_id, int partner);
virtual unsigned long createUniqueId();
virtual void writeToFile_Topology_with_LinkCost(int qnic_id, double link_cost, double fidelity, double bellpair_per_sec);
virtual void makeQnicReservationForTomography(int node_address, int qnic_addr, unsigned long ruleset_id);
virtual void makeQnicReservationForTomography(int node_address, int partner_address, int qnic_addr, unsigned long ruleset_id, bool prepare_teardown);

std::unique_ptr<quisp::rules::Rule> constructPurifyRule(const std::string &rule_name, const rules::PurType pur_type, const int partner_address, const QNIC_type qnic_type,
const int qnic_index, const int send_tag) const;
Expand Down
2 changes: 2 additions & 0 deletions quisp/modules/QRSA/RuleEngine/RuleEngine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ void RuleEngine::handleMessage(cMessage *msg) {
runtimes.acceptRuleSet(ruleset.construct());
} else if (auto *pkt = dynamic_cast<StopEmitting *>(msg)) {
handleStopEmitting(pkt);
} else if (auto *pkt = dynamic_cast<RequestRulesetTermination *>(msg)) {
runtimes.terminateRuleset(pkt->getRuleSet_id());
}

for (int i = 0; i < number_of_qnics; i++) {
Expand Down
8 changes: 8 additions & 0 deletions quisp/runtime/RuntimeManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ void RuntimeManager::exec() {
}
}

void RuntimeManager::terminateRuleset(unsigned long ruleset_id) {
auto *rst = findById(ruleset_id);
if (rst != nullptr and !rst->terminated) {
rst->terminated = true;
//terminated_rulesets.push_back(ruleset_id);
}
}

std::vector<Runtime>::iterator RuntimeManager::begin() { return runtimes.begin(); }
std::vector<Runtime>::iterator RuntimeManager::end() { return runtimes.end(); }
std::vector<Runtime>::reference RuntimeManager::at(size_t index) { return runtimes.at(index); }
Expand Down
1 change: 1 addition & 0 deletions quisp/runtime/RuntimeManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class RuntimeManager {
std::vector<Runtime>::iterator end();
std::vector<Runtime>::reference at(size_t);
size_t size() const;
void terminateRuleset(const unsigned long ruleset_id);
const std::vector<unsigned long>& getTerminatedRulesetIds() const;

protected:
Expand Down
8 changes: 8 additions & 0 deletions quisp/utils/ReservationRegister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ bool ReservationRegister::isQnicBusy(int qnic_addr) {
return true;
}

const std::set<int> ReservationRegister::getReservedQnics(unsigned long ruleset_id) const {
auto search = ruleset_id_to_qnic_addrs.find(ruleset_id);
if (search == ruleset_id_to_qnic_addrs.end()) {
throw cRuntimeError("No qnics associated to this ruleset id!");
} else
return search->second;
}

const int ReservationRegister::getRsidToQnicAddrMapSize() const { return ruleset_id_to_qnic_addrs.size(); };
const int ReservationRegister::getQnicAddrToRsidMapSize() const { return qnic_addr_to_ruleset_id.size(); };
const std::map<unsigned long, std::set<int>>& ReservationRegister::getRsidToQnicAddrMap() const { return ruleset_id_to_qnic_addrs; };
Expand Down
1 change: 1 addition & 0 deletions quisp/utils/ReservationRegister.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ReservationRegister {
void deleteReservationByRulesetId(unsigned long ruleset_id);
void updateReservationId(unsigned long old_ruleset_id, unsigned long new_ruleset_id);
bool isQnicBusy(int qnic_addr);
const std::set<int> getReservedQnics(unsigned long ruleset_id) const;

const int getRsidToQnicAddrMapSize() const;
const int getQnicAddrToRsidMapSize() const;
Expand Down

0 comments on commit 9a9c77b

Please sign in to comment.