Skip to content

Commit

Permalink
rebugging
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Nov 22, 2023
1 parent 1936ee6 commit 761e44a
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ namespace wrench {
}
#endif
req->wait();
WRENCH_INFO("Bytes sent over the network were received");
// WRENCH_INFO("Bytes sent over the network were received");
} catch (std::shared_ptr<NetworkError> &e) {
throw;
}
Expand Down
14 changes: 14 additions & 0 deletions src/wrench/simgrid_S4U_util/S4U_CommPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,28 @@ namespace wrench {
if (log) WRENCH_DEBUG("Getting a message from commport '%s' with timeout %lf sec", this->s4u_mb->get_cname(), timeout);
SimulationMessage *msg;

WRENCH_INFO("IN GET MESSAGE FOR COMMPORT %p", this);
simgrid::s4u::ActivitySet pending_receives;
auto mb_comm = this->s4u_mb->get_async<SimulationMessage>(&msg);
pending_receives.push(mb_comm);
auto mq_comm = this->s4u_mq->get_async<SimulationMessage>(&msg);
pending_receives.push(mq_comm);

WRENCH_INFO("IN GET MESSAGE: %p %p", mb_comm.get(), mq_comm.get());

simgrid::s4u::ActivityPtr finished_recv;
try {
// Wait for one activity to complete
WRENCH_INFO("CALLING WAIT_ANY_FOR");
finished_recv = pending_receives.wait_any_for(timeout);
WRENCH_INFO("CALLED WAIT_ANY_FOR");
} catch (simgrid::TimeoutException &e) {
WRENCH_INFO("IN THIS TRY CATCH");
mq_comm->cancel();
mb_comm->cancel();
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->name, ""));
} catch (simgrid::Exception &e) {
WRENCH_INFO("IN THAT TRY CATCH");
auto failed_recv = pending_receives.get_failed_activity();
if (failed_recv == mb_comm) {
mq_comm->cancel();
Expand All @@ -137,8 +144,12 @@ namespace wrench {

WRENCH_DEBUG("Got the message\n");

WRENCH_INFO("XXX IN GET MESSAGE NBEFIORE CENCEL");
if (finished_recv == mb_comm) {
WRENCH_INFO("IT WAS THE MB_COM");
WRENCH_INFO("CANCELING %p", mq_comm.get());
mq_comm->cancel();
WRENCH_INFO("WAITING ON %p", mb_comm.get());
mb_comm->wait();
} else if (finished_recv == mq_comm) {
mb_comm->cancel();
Expand Down Expand Up @@ -237,6 +248,7 @@ namespace wrench {
MessageManager::manageMessage(this, msg);
#endif
comm_ptr = this->s4u_mb->put_async(msg, (uint64_t) msg->payload);
std::cerr << "IN iPUTMESSAGE: " << comm_ptr.get() << "\n";
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name(), msg->getName()));
Expand Down Expand Up @@ -272,13 +284,15 @@ namespace wrench {

try {
auto comm_ptr = this->s4u_mb->get_async<void>((void **) (&(pending_communication->simulation_message)));
std::cerr << "in IGETMESSAGE " << comm_ptr.get() << "\n";
pending_communication->comm_ptr = comm_ptr;
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name(), ""));
}
simgrid::s4u::MessPtr mess_ptr = this->s4u_mq->get_async<void>((void **) (&(pending_communication->simulation_message)));
pending_communication->mess_ptr = mess_ptr;
std::cerr << "in IGETMESSAGE MQ " << mess_ptr.get() << "\n";
return pending_communication;
}

Expand Down
92 changes: 58 additions & 34 deletions src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,52 +78,76 @@ namespace wrench {
#endif
return std::move(this->simulation_message);
#endif
if (this->operation_type == S4U_PendingCommunication::OperationType::RECEIVING) {

// if (log) WRENCH_DEBUG("Getting a message from commport '%s' with timeout %lf sec", this->comm_ptr->get_cname(), timeout);

// if (log) WRENCH_DEBUG("Getting a message from commport '%s' with timeout %lf sec", this->comm_ptr->get_cname(), timeout);
SimulationMessage *msg;
simgrid::s4u::ActivitySet pending_receives;
pending_receives.push(this->comm_ptr);
pending_receives.push(this->mess_ptr);

simgrid::s4u::ActivitySet pending_receives;
pending_receives.push(this->comm_ptr);
pending_receives.push(this->mess_ptr);

simgrid::s4u::ActivityPtr finished_recv;
try {
// Wait for one activity to complete
finished_recv = pending_receives.wait_any_for(timeout);
} catch (simgrid::TimeoutException &e) {
mess_ptr->cancel();
comm_ptr->cancel();
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->commport->get_name(), ""));
} catch (simgrid::Exception &e) {
auto failed_recv = pending_receives.get_failed_activity();
if (failed_recv == comm_ptr) {
simgrid::s4u::ActivityPtr finished_recv;
try {
// Wait for one activity to complete
finished_recv = pending_receives.wait_any_for(timeout);
} catch (simgrid::TimeoutException &e) {
mess_ptr->cancel();
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->comm_ptr->get_name(), ""));
} else {
comm_ptr->cancel();
throw ExecutionException(std::make_shared<wrench::FatalFailure>("A communication on a MQ should never fail"));
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->commport->get_name(), ""));
} catch (simgrid::Exception &e) {
auto failed_recv = pending_receives.get_failed_activity();
if (failed_recv == comm_ptr) {
mess_ptr->cancel();
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->comm_ptr->get_name(), ""));
} else {
comm_ptr->cancel();
throw ExecutionException(std::make_shared<wrench::FatalFailure>("A communication on a MQ should never fail"));
}
}
}

WRENCH_DEBUG("Got the message\n");

if (finished_recv == comm_ptr) {
mess_ptr->cancel();
comm_ptr->wait();
} else if (finished_recv == mess_ptr) {
comm_ptr->cancel();
mess_ptr->wait();
}
if (finished_recv == comm_ptr) {
mess_ptr->cancel();
comm_ptr->wait();
} else if (finished_recv == mess_ptr) {
comm_ptr->cancel();
mess_ptr->wait();
}

#ifdef MESSAGE_MANAGER
MessageManager::removeReceivedMessage(this, msg);
MessageManager::removeReceivedMessage(this, msg);
#endif

WRENCH_DEBUG("Received a '%s' message from commport '%s'", this->simulation_message->getName().c_str(), this->commport->get_cname());
WRENCH_DEBUG("Received a '%s' message from commport '%s'", this->simulation_message->getName().c_str(), this->commport->get_cname());

return std::move(this->simulation_message);
return std::move(this->simulation_message);
} else {
if (this->comm_ptr) {
try {
this->comm_ptr->wait_for(timeout);
return nullptr;
} catch (simgrid::NetworkFailureException &e) {
if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::SENDING, NetworkError::FAILURE, this->commport->s4u_mb->get_name(), ""));
} else {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, this->commport->s4u_mb->get_name(), ""));
}
} catch (simgrid::TimeoutException &e) {
if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::SENDING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name(), ""));
} else {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::RECEIVING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name(), ""));
}
}
} else if (this->mess_ptr) {
this->mess_ptr->wait_for(timeout);
return nullptr;
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ void BareMetalComputeServiceOneActionTest::do_Noop_test() {
// Create and initialize a simulation
auto simulation = wrench::Simulation::createSimulation();

int argc = 2;
int argc = 1;
auto argv = (char **) calloc(argc, sizeof(char *));
argv[0] = strdup("one_action_test");
argv[1] = strdup("--wrench-full-log");
// argv[1] = strdup("--wrench-full-log");

ASSERT_NO_THROW(simulation->init(&argc, argv));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2049,9 +2049,10 @@ TEST_F(BareMetalComputeServiceOneTaskTest, ExecutionWithSuspendedService) {
void BareMetalComputeServiceOneTaskTest::do_ExecutionWithSuspendedService_test() {
// Create and initialize a simulation
auto simulation = wrench::Simulation::createSimulation();
int argc = 1;
int argc = 2;
auto **argv = (char **) calloc(argc, sizeof(char *));
argv[0] = strdup("one_task_test");
argv[1] = strdup("--wrench-full-log");

simulation->init(&argc, argv);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ void FileCopyActionExecutorTest::do_FileCopyActionExecutorSuccessTest_test() {
int argc = 1;
char **argv = (char **) calloc(argc, sizeof(char *));
argv[0] = strdup("unit_test");
// argv[1] = strdup("--wrench-full-log");
// argv[1] = strdup("--wrench-full-log");

simulation->init(&argc, argv);

Expand Down

0 comments on commit 761e44a

Please sign in to comment.