Skip to content

Commit 94190d2

Browse files
committed
Implemted more of the MQ stuff
1 parent 53719e1 commit 94190d2

File tree

4 files changed

+64
-15
lines changed

4 files changed

+64
-15
lines changed

include/wrench/simgrid_S4U_util/S4U_PendingCommunication.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
namespace wrench {
2121

22-
// class SimulationMessage;
23-
2422
/*******************/
2523
/** \cond INTERNAL */
2624
/*******************/
@@ -57,15 +55,17 @@ namespace wrench {
5755

5856
// ~S4U_PendingCommunication() default;
5957

60-
/** @brief The SimGrid communication handle */
61-
simgrid::s4u::CommPtr comm_ptr;
62-
6358
/** @brief The message */
6459
std::unique_ptr<SimulationMessage> simulation_message;
6560
/** @brief The CommPort */
6661
S4U_CommPort *commport;
6762
/** @brief The operation type */
6863
OperationType operation_type;
64+
65+
/** @brief The SimGrid Mailbox communication handle */
66+
simgrid::s4u::CommPtr comm_ptr;
67+
/** @brief The SimGrid MessageQueue communication handle */
68+
simgrid::s4u::MessPtr mess_ptr;
6969
};
7070

7171
/*******************/

src/wrench/simgrid_S4U_util/S4U_CommPort.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,6 @@ namespace wrench {
143143
} else if (finished_recv == mq_comm) {
144144
mb_comm->cancel();
145145
mq_comm->wait();
146-
} else {
147-
std::cerr << "WTF\n";
148146
}
149147

150148
#ifdef MESSAGE_MANAGER
@@ -266,24 +264,24 @@ namespace wrench {
266264
throw std::invalid_argument("S4U_CommPort::igetMessage(): Cannot be called with NULL_COMMPORT");
267265
}
268266

269-
simgrid::s4u::CommPtr comm_ptr;
270-
271267
WRENCH_DEBUG("Igetting a message from commport '%s'", this->s4u_mb->get_cname());
272268

273269
std::shared_ptr<S4U_PendingCommunication> pending_communication = std::make_shared<S4U_PendingCommunication>(
274270
this, S4U_PendingCommunication::OperationType::RECEIVING);
275271

272+
276273
try {
277-
comm_ptr = this->s4u_mb->get_async<void>((void **) (&(pending_communication->simulation_message)));
274+
auto comm_ptr = this->s4u_mb->get_async<void>((void **) (&(pending_communication->simulation_message)));
275+
pending_communication->comm_ptr = comm_ptr;
278276
} catch (simgrid::NetworkFailureException &e) {
279277
throw ExecutionException(std::make_shared<NetworkError>(
280278
NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name(), ""));
281279
}
282-
pending_communication->comm_ptr = comm_ptr;
280+
simgrid::s4u::MessPtr mess_ptr = this->s4u_mq->get_async<void>((void **) (&(pending_communication->simulation_message)));
281+
pending_communication->mess_ptr = mess_ptr;
283282
return pending_communication;
284283
}
285284

286-
287285
/**
288286
* @brief Generate a unique sequence number
289287
*

src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <memory>
1111
#include <iostream>
1212
#include <wrench/failure_causes/NetworkError.h>
13+
#include "wrench/failure_causes/FatalFailure.h"
14+
1315

1416
#ifdef MESSAGE_MANAGER
1517
#include <wrench/util/MessageManager.h>
@@ -27,7 +29,6 @@ WRENCH_LOG_CATEGORY(wrench_core_pending_communication, "Log category for Pending
2729

2830
namespace wrench {
2931

30-
3132
/**
3233
* @brief Wait for the pending communication to complete
3334
*
@@ -48,7 +49,10 @@ namespace wrench {
4849
* @throw std::shared_ptr<NetworkError>
4950
*/
5051
std::unique_ptr<SimulationMessage> S4U_PendingCommunication::wait(double timeout) {
52+
53+
#if 0
5154
try {
55+
// IS THIS NECESSARY?
5256
if (this->comm_ptr->get_state() != simgrid::s4u::Activity::State::FINISHED) {
5357
this->comm_ptr->wait_for(timeout);
5458
}
@@ -73,6 +77,53 @@ namespace wrench {
7377
MessageManager::removeReceivedMessage(this->commport, this->simulation_message.get());
7478
#endif
7579
return std::move(this->simulation_message);
80+
#endif
81+
82+
83+
// if (log) WRENCH_DEBUG("Getting a message from commport '%s' with timeout %lf sec", this->comm_ptr->get_cname(), timeout);
84+
SimulationMessage *msg;
85+
86+
simgrid::s4u::ActivitySet pending_receives;
87+
pending_receives.push(this->comm_ptr);
88+
pending_receives.push(this->mess_ptr);
89+
90+
simgrid::s4u::ActivityPtr finished_recv;
91+
try {
92+
// Wait for one activity to complete
93+
finished_recv = pending_receives.wait_any_for(timeout);
94+
} catch (simgrid::TimeoutException &e) {
95+
mess_ptr->cancel();
96+
comm_ptr->cancel();
97+
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->commport->get_name(), ""));
98+
} catch (simgrid::Exception &e) {
99+
auto failed_recv = pending_receives.get_failed_activity();
100+
if (failed_recv == comm_ptr) {
101+
mess_ptr->cancel();
102+
throw ExecutionException(std::make_shared<NetworkError>(
103+
NetworkError::RECEIVING, NetworkError::FAILURE, this->comm_ptr->get_name(), ""));
104+
} else {
105+
comm_ptr->cancel();
106+
throw ExecutionException(std::make_shared<wrench::FatalFailure>("A communication on a MQ should never fail"));
107+
}
108+
}
109+
110+
WRENCH_DEBUG("Got the message\n");
111+
112+
if (finished_recv == comm_ptr) {
113+
mess_ptr->cancel();
114+
comm_ptr->wait();
115+
} else if (finished_recv == mess_ptr) {
116+
comm_ptr->cancel();
117+
mess_ptr->wait();
118+
}
119+
120+
#ifdef MESSAGE_MANAGER
121+
MessageManager::removeReceivedMessage(this, msg);
122+
#endif
123+
124+
WRENCH_DEBUG("Received a '%s' message from commport '%s'", msg->getName().c_str(), this->commport->get_cname());
125+
126+
return std::unique_ptr<SimulationMessage>(msg);
76127
}
77128

78129
/**

test/services/compute_services/bare_metal_compound_jobs/BareMetalComputeServiceOneActionTests.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,10 @@ void BareMetalComputeServiceOneActionTest::do_Noop_test() {
317317
// Create and initialize a simulation
318318
auto simulation = wrench::Simulation::createSimulation();
319319

320-
int argc = 1;
320+
int argc = 2;
321321
auto argv = (char **) calloc(argc, sizeof(char *));
322322
argv[0] = strdup("one_action_test");
323-
// argv[1] = strdup("--wrench-full-log");
323+
argv[1] = strdup("--wrench-full-log");
324324

325325
ASSERT_NO_THROW(simulation->init(&argc, argv));
326326

0 commit comments

Comments
 (0)