From 9a9c77b6d4742ef8e62c549ffb1caebb187ca3b9 Mon Sep 17 00:00:00 2001 From: Paolo Fittipaldi Date: Mon, 4 Nov 2024 15:51:31 +0100 Subject: [PATCH] Malfunctioning status, troubleshooting repeater teardown --- quisp/messages/connection_setup_messages.msg | 7 ++ quisp/modules/Common/Router.cc | 6 +- .../ConnectionManager/ConnectionManager.cc | 90 ++++++++++++++----- .../ConnectionManager/ConnectionManager.h | 4 + .../QRSA/HardwareMonitor/HardwareMonitor.cc | 8 +- .../QRSA/HardwareMonitor/HardwareMonitor.h | 2 +- quisp/modules/QRSA/RuleEngine/RuleEngine.cc | 2 + quisp/runtime/RuntimeManager.cc | 8 ++ quisp/runtime/RuntimeManager.h | 1 + quisp/utils/ReservationRegister.cc | 8 ++ quisp/utils/ReservationRegister.h | 1 + 11 files changed, 112 insertions(+), 25 deletions(-) diff --git a/quisp/messages/connection_setup_messages.msg b/quisp/messages/connection_setup_messages.msg index fec837060..6cd19fe90 100644 --- a/quisp/messages/connection_setup_messages.msg +++ b/quisp/messages/connection_setup_messages.msg @@ -51,4 +51,11 @@ packet RequestQnicReservation extends Header { unsigned long RuleSet_id; int qnic_addr @getter(getQnicAddr) @setter(setQnicAddr); + int partner_address @getter(getPartnerAddress) @setter(setPartnerAddress); + bool prepare_teardown @setter(setPrepareTeardown) @getter(getPrepareTeardown); } + +packet RequestRulesetTermination extends Header +{ + unsigned long RuleSet_id; +} \ No newline at end of file diff --git a/quisp/modules/Common/Router.cc b/quisp/modules/Common/Router.cc index 3239d926a..d3664bc77 100644 --- a/quisp/modules/Common/Router.cc +++ b/quisp/modules/Common/Router.cc @@ -154,7 +154,11 @@ void Router::handleMessage(cMessage *msg) { bubble("Qnic Reservation Request received"); send(pk, "cmPort$o"); return; - } + } else if (dest_addr == my_address && dynamic_cast(msg)) { + bubble("Ruleset Termination Request received"); + send(pk, "rePort$o"); + return; + } // RoutingDaemon sends hello packet without desination specified if (dest_addr == unidentified_destination && dynamic_cast(msg)) { diff --git a/quisp/modules/QRSA/ConnectionManager/ConnectionManager.cc b/quisp/modules/QRSA/ConnectionManager/ConnectionManager.cc index a6b80b07a..7be7ab07f 100644 --- a/quisp/modules/QRSA/ConnectionManager/ConnectionManager.cc +++ b/quisp/modules/QRSA/ConnectionManager/ConnectionManager.cc @@ -89,6 +89,7 @@ void ConnectionManager::handleMessage(cMessage *msg) { delete msg; } else if (actual_src == my_address) { // initiator node + req->setConnectionSetupRequestId(createUniqueId()); queueApplicationRequest(req); } else { // intermediate node @@ -98,11 +99,15 @@ void ConnectionManager::handleMessage(cMessage *msg) { } if (auto *resp = dynamic_cast(msg)) { - reservation_register.updateReservationId(resp->getConnectionSetupRequestId(), resp->getRuleSet_id()); - int initiator_addr = resp->getInitiatorAddr(); int responder_addr = resp->getResponderAddr(); + if (initiator_addr == my_address) { + removeAcceptedConnectionSetupRequestFromQueue(resp); + } + + reservation_register.updateReservationId(resp->getConnectionSetupRequestId(), resp->getRuleSet_id()); + if (initiator_addr == my_address || responder_addr == my_address) { // this node is not a swapper storeRuleSetForApplication(resp); @@ -111,6 +116,7 @@ void ConnectionManager::handleMessage(cMessage *msg) { // currently, destinations are separated. (Not accumulated.) storeRuleSet(resp); } + delete msg; return; } @@ -129,15 +135,20 @@ void ConnectionManager::handleMessage(cMessage *msg) { if (auto *pk = dynamic_cast(msg)) { teardownConnections(pk); + delete msg; return; } if (auto *td = dynamic_cast(msg)) { - reservation_register.deleteReservationByRulesetId(td->getRuleSet_id()); + handleTeardownMessage(td); + delete msg; + return; } if (auto *req = dynamic_cast(msg)) { - reservation_register.registerReservation(req->getQnicAddr(), req->getRuleSet_id()); + makeQnicReservationForTomography(req); + delete msg; + return; } } @@ -266,8 +277,6 @@ void ConnectionManager::respondToRequest(ConnectionSetupRequest *req) { auto ruleset_id = createUniqueId(); ruleset_gen::RuleSetGenerator ruleset_gen{my_address}; const auto &rulesets = ruleset_gen.generateRuleSets(req, ruleset_id); - - connection_teardown_messages[ruleset_id].clear(); // distribute rulesets to each qnode in the path for (auto [owner_address, rs] : rulesets) { ConnectionSetupResponse *resp = new ConnectionSetupResponse("ConnectionSetupResponse"); @@ -423,12 +432,12 @@ void ConnectionManager::queueApplicationRequest(ConnectionSetupRequest *req) { void ConnectionManager::popApplicationRequest(int qnic_address) { auto &request_queue = connection_setup_buffer[qnic_address]; - auto *req = request_queue.front(); + // auto *req = request_queue.front(); connection_retry_count[qnic_address] = 0; - request_queue.pop(); - delete req; - reservation_register.deleteReservationByQnicAddr(qnic_address); + // request_queue.pop(); // Old connection, served. + // delete req; + // reservation_register.deleteReservationByQnicAddr(qnic_address); if (!request_queue.empty()) { EV << "schedule from pop" << endl; @@ -450,7 +459,6 @@ void ConnectionManager::initiateApplicationRequest(int qnic_address) { } auto to_send = request_queue.front()->dup(); - to_send->setConnectionSetupRequestId(createUniqueId()); reservation_register.registerReservation(qnic_address, to_send->getConnectionSetupRequestId()); send(to_send, "RouterPort$o"); } @@ -468,17 +476,59 @@ void ConnectionManager::scheduleRequestRetry(int qnic_address) { } void ConnectionManager::teardownConnections(messages::InternalTerminatedRulesetIdsNotifier *pkt) { - if (connection_teardown_messages.size() > 0) { - int terminated_rulesets_number = pkt->getNumberOfTerminatedRulesets(); - for (int i = 0; i < terminated_rulesets_number; i++) { - auto search = connection_teardown_messages.find(pkt->getTerminatedRulesetId(i)); - if (search == connection_teardown_messages.end()) - error("ConnectionManager Error: Trying to tear down a connection but the related ConnectionTeardown messages are not ready. This is likely a bug in the code."); - auto messages_to_send = search->second; - for (auto msg : messages_to_send) { - send(msg, "RouterPort$o"); + +for (int i = 0; i < pkt->getNumberOfTerminatedRulesets(); i++) { +auto search = connection_teardown_messages.find(pkt->getTerminatedRulesetId(i)); +if (search != connection_teardown_messages.end()) { //This node is in charge of terminating this connection. +auto messages_to_send = search->second; +for (auto msg : messages_to_send) { + send(msg->dup(), "RouterPort$o"); } + connection_teardown_messages.erase(search->first); } } } + +void ConnectionManager::handleTeardownMessage(messages::ConnectionTeardown *td) { + auto qnic_addresses = reservation_register.getReservedQnics(td->getRuleSet_id()); + reservation_register.deleteReservationByRulesetId(td->getRuleSet_id()); + requestTerminationOfSwappingRulesets(td->getRuleSet_id()); + for (int qnic_addr : qnic_addresses) { + popApplicationRequest(qnic_addr); + } +} + +void ConnectionManager::removeAcceptedConnectionSetupRequestFromQueue(ConnectionSetupResponse *resp) { + unsigned long connection_setup_id = resp->getConnectionSetupRequestId(); + int qnic_addr = *(reservation_register.getReservedQnics(connection_setup_id).begin()); // Should be only one bc this is the initiator. + auto &first_queue = connection_setup_buffer[qnic_addr]; + if (first_queue.front()->getConnectionSetupRequestId() == resp->getConnectionSetupRequestId()) { + first_queue.pop(); + return; + } + throw cRuntimeError("Mismatched ConnectionSetupRequestId when popping request queue."); +} + +void ConnectionManager::makeQnicReservationForTomography(RequestQnicReservation* req) { + reservation_register.registerReservation(req->getQnicAddr(), req->getRuleSet_id()); + if (!req->getPrepareTeardown()) return; + unsigned long ruleset_id = req->getRuleSet_id(); + ConnectionTeardown *td_partner = new ConnectionTeardown("ConnectionTeardown"); + td_partner->setSrcAddr(my_address); + td_partner->setDestAddr(req->getPartnerAddress()); + td_partner->setRuleSet_id(ruleset_id); + auto td_myself = td_partner->dup(); + td_myself->setDestAddr(my_address); + connection_teardown_messages[ruleset_id].push_back(td_myself); + connection_teardown_messages[ruleset_id].push_back(td_partner); +} + +void ConnectionManager::requestTerminationOfSwappingRulesets(unsigned long ruleset_id) { + auto *rst = new RequestRulesetTermination(); + rst->setSrcAddr(my_address); + rst->setDestAddr(my_address); + rst->setRuleSet_id(ruleset_id); + send(rst,"RouterPort$o"); +} + } // namespace quisp::modules diff --git a/quisp/modules/QRSA/ConnectionManager/ConnectionManager.h b/quisp/modules/QRSA/ConnectionManager/ConnectionManager.h index 297ca4462..659504104 100644 --- a/quisp/modules/QRSA/ConnectionManager/ConnectionManager.h +++ b/quisp/modules/QRSA/ConnectionManager/ConnectionManager.h @@ -104,6 +104,10 @@ class ConnectionManager : public IConnectionManager, public Logger::LoggerBase { // bool isQnicBusy(int qnic_address); void teardownConnections(messages::InternalTerminatedRulesetIdsNotifier *pkt); + void handleTeardownMessage(messages::ConnectionTeardown *td); + void removeAcceptedConnectionSetupRequestFromQueue(messages::ConnectionSetupResponse *resp); + void makeQnicReservationForTomography(messages::RequestQnicReservation* req); + void requestTerminationOfSwappingRulesets(unsigned long ruleset_id); static rules::PurType parsePurType(const std::string &pur_type); diff --git a/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.cc b/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.cc index 702c9d40d..adbf642c2 100644 --- a/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.cc +++ b/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.cc @@ -146,12 +146,12 @@ void HardwareMonitor::handleMessage(cMessage *msg) { // RuleSets sent for this node and the partner node. unsigned long ruleset_id = createUniqueId(); - makeQnicReservationForTomography(my_address, my_qnic_info->qnic.address, ruleset_id); + makeQnicReservationForTomography(my_address, partner_address, my_qnic_info->qnic.address, ruleset_id,true); sendLinkTomographyRuleSet(my_address, partner_address, my_qnic_info->qnic.type, my_qnic_info->qnic.index, ruleset_id); QNIC_type partner_qnic_type = ack->getQnic_type(); int partner_qnic_index = ack->getQnic_index(); - makeQnicReservationForTomography(partner_address, ack->getQnicAddr(), ruleset_id); + makeQnicReservationForTomography(partner_address, my_address, ack->getQnicAddr(), ruleset_id,false); sendLinkTomographyRuleSet(partner_address, my_address, partner_qnic_type, partner_qnic_index, ruleset_id); delete ack; return; @@ -1251,12 +1251,14 @@ std::unique_ptr HardwareMonitor::createNeighborInfo(const cModule thisNode.getClassName()); } -void HardwareMonitor::makeQnicReservationForTomography(int node_address, int qnic_addr, unsigned long ruleset_id) { +void HardwareMonitor::makeQnicReservationForTomography(int node_address, int partner_address, int qnic_addr, unsigned long ruleset_id, bool prepare_teardown) { RequestQnicReservation *pkt = new RequestQnicReservation(); pkt->setSrcAddr(my_address); pkt->setDestAddr(node_address); pkt->setQnicAddr(qnic_addr); pkt->setRuleSet_id(ruleset_id); + pkt->setPrepareTeardown(prepare_teardown); + pkt->setPartnerAddress(partner_address); send(pkt, "RouterPort$o"); } diff --git a/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.h b/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.h index f19b812b3..411ce0056 100644 --- a/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.h +++ b/quisp/modules/QRSA/HardwareMonitor/HardwareMonitor.h @@ -85,7 +85,7 @@ class HardwareMonitor : public IHardwareMonitor { virtual Eigen::Matrix4cd reconstruct_density_matrix(int qnic_id, int partner); virtual unsigned long createUniqueId(); virtual void writeToFile_Topology_with_LinkCost(int qnic_id, double link_cost, double fidelity, double bellpair_per_sec); - virtual void makeQnicReservationForTomography(int node_address, int qnic_addr, unsigned long ruleset_id); + virtual void makeQnicReservationForTomography(int node_address, int partner_address, int qnic_addr, unsigned long ruleset_id, bool prepare_teardown); std::unique_ptr constructPurifyRule(const std::string &rule_name, const rules::PurType pur_type, const int partner_address, const QNIC_type qnic_type, const int qnic_index, const int send_tag) const; diff --git a/quisp/modules/QRSA/RuleEngine/RuleEngine.cc b/quisp/modules/QRSA/RuleEngine/RuleEngine.cc index 2d5fb186e..1437610b8 100644 --- a/quisp/modules/QRSA/RuleEngine/RuleEngine.cc +++ b/quisp/modules/QRSA/RuleEngine/RuleEngine.cc @@ -181,6 +181,8 @@ void RuleEngine::handleMessage(cMessage *msg) { runtimes.acceptRuleSet(ruleset.construct()); } else if (auto *pkt = dynamic_cast(msg)) { handleStopEmitting(pkt); + } else if (auto *pkt = dynamic_cast(msg)) { + runtimes.terminateRuleset(pkt->getRuleSet_id()); } for (int i = 0; i < number_of_qnics; i++) { diff --git a/quisp/runtime/RuntimeManager.cc b/quisp/runtime/RuntimeManager.cc index 7d8fff8bc..43442853a 100644 --- a/quisp/runtime/RuntimeManager.cc +++ b/quisp/runtime/RuntimeManager.cc @@ -29,6 +29,14 @@ void RuntimeManager::exec() { } } +void RuntimeManager::terminateRuleset(unsigned long ruleset_id) { + auto *rst = findById(ruleset_id); + if (rst != nullptr and !rst->terminated) { + rst->terminated = true; + //terminated_rulesets.push_back(ruleset_id); + } +} + std::vector::iterator RuntimeManager::begin() { return runtimes.begin(); } std::vector::iterator RuntimeManager::end() { return runtimes.end(); } std::vector::reference RuntimeManager::at(size_t index) { return runtimes.at(index); } diff --git a/quisp/runtime/RuntimeManager.h b/quisp/runtime/RuntimeManager.h index 47d90cf0f..afd7e6d28 100644 --- a/quisp/runtime/RuntimeManager.h +++ b/quisp/runtime/RuntimeManager.h @@ -13,6 +13,7 @@ class RuntimeManager { std::vector::iterator end(); std::vector::reference at(size_t); size_t size() const; + void terminateRuleset(const unsigned long ruleset_id); const std::vector& getTerminatedRulesetIds() const; protected: diff --git a/quisp/utils/ReservationRegister.cc b/quisp/utils/ReservationRegister.cc index 78da57781..7498ee1af 100644 --- a/quisp/utils/ReservationRegister.cc +++ b/quisp/utils/ReservationRegister.cc @@ -70,6 +70,14 @@ bool ReservationRegister::isQnicBusy(int qnic_addr) { return true; } +const std::set ReservationRegister::getReservedQnics(unsigned long ruleset_id) const { + auto search = ruleset_id_to_qnic_addrs.find(ruleset_id); + if (search == ruleset_id_to_qnic_addrs.end()) { + throw cRuntimeError("No qnics associated to this ruleset id!"); + } else + return search->second; +} + const int ReservationRegister::getRsidToQnicAddrMapSize() const { return ruleset_id_to_qnic_addrs.size(); }; const int ReservationRegister::getQnicAddrToRsidMapSize() const { return qnic_addr_to_ruleset_id.size(); }; const std::map>& ReservationRegister::getRsidToQnicAddrMap() const { return ruleset_id_to_qnic_addrs; }; diff --git a/quisp/utils/ReservationRegister.h b/quisp/utils/ReservationRegister.h index 59045e71a..77f866a09 100644 --- a/quisp/utils/ReservationRegister.h +++ b/quisp/utils/ReservationRegister.h @@ -22,6 +22,7 @@ class ReservationRegister { void deleteReservationByRulesetId(unsigned long ruleset_id); void updateReservationId(unsigned long old_ruleset_id, unsigned long new_ruleset_id); bool isQnicBusy(int qnic_addr); + const std::set getReservedQnics(unsigned long ruleset_id) const; const int getRsidToQnicAddrMapSize() const; const int getQnicAddrToRsidMapSize() const;