diff --git a/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp b/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp index 2d35983fbe..9a3a059993 100644 --- a/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp +++ b/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp @@ -269,7 +269,7 @@ namespace wrench { WRENCH_WARN("LogicalFileSystem::reserveSpace(): Space was already being reserved for storing file %s at path %s:%s. " "This is likely a redundant copy, and nothing needs to be done", file->getID().c_str(), this->hostname.c_str(), fixed_path.c_str()); - return true; + return true; } if (this->free_space < file->getSize()) { diff --git a/tools/wrench/wrench-daemon/include/SimulationController.h b/tools/wrench/wrench-daemon/include/SimulationController.h old mode 100755 new mode 100644 index f07458b551..353ea7fd6b --- a/tools/wrench/wrench-daemon/include/SimulationController.h +++ b/tools/wrench/wrench-daemon/include/SimulationController.h @@ -117,7 +117,6 @@ namespace wrench { json getVMComputeService(json data); private: - template json startService(T *s); diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index 13533991ff..0ed6464909 100644 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -20,23 +20,25 @@ WRENCH_LOG_CATEGORY(simulation_controller, "Log category for SimulationController"); -#define PARSE_SERVICE_PROPERTY_LIST() WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \ -{ \ - json jsonData = json::parse(property_list_string); \ - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ - auto property_key = ServiceProperty::translateString(it.key()); \ - service_property_list[property_key] = it.value(); \ - } \ -} - -#define PARSE_MESSAGE_PAYLOAD_LIST() WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \ -{ \ - json jsonData = json::parse(message_payload_list_string); \ - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ - service_message_payload_list[message_payload_key] = it.value(); \ - } \ -} +#define PARSE_SERVICE_PROPERTY_LIST() \ + WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \ + { \ + json jsonData = json::parse(property_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto property_key = ServiceProperty::translateString(it.key()); \ + service_property_list[property_key] = it.value(); \ + } \ + } + +#define PARSE_MESSAGE_PAYLOAD_LIST() \ + WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \ + { \ + json jsonData = json::parse(message_payload_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ + service_message_payload_list[message_payload_key] = it.value(); \ + } \ + } namespace wrench { @@ -50,11 +52,11 @@ namespace wrench { const std::string &hostname, int sleep_us) : ExecutionController(hostname, "SimulationController"), workflow(workflow), sleep_us(sleep_us) {} - template + template json SimulationController::startService(T *s) { BlockingQueue> s_created; - this->things_to_do.push([this, s, &s_created](){ + this->things_to_do.push([this, s, &s_created]() { try { auto new_service_shared_ptr = this->simulation->startNewService(s); if (auto cs = std::dynamic_pointer_cast(new_service_shared_ptr)) { @@ -106,10 +108,10 @@ namespace wrench { // Main control loop while (keep_going) { - + // Starting compute and storage services that should be started, if any while (true) { - std::function thing_to_do; + std::function thing_to_do; if (this->things_to_do.tryPop(thing_to_do)) { thing_to_do(); @@ -128,7 +130,7 @@ namespace wrench { // Moves time forward if needed (because the client has done a sleep), // And then add all events that occurred to the event queue double time_to_sleep = std::max(0, time_horizon_to_reach - - wrench::Simulation::getCurrentSimulatedDate()); + wrench::Simulation::getCurrentSimulatedDate()); if (time_to_sleep > 0.0) { WRENCH_INFO("Sleeping %.2lf seconds", time_to_sleep); S4U_Simulation::sleep(time_to_sleep); @@ -333,7 +335,6 @@ namespace wrench { auto new_service = new CloudComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); return this->startService(new_service); - } @@ -357,7 +358,6 @@ namespace wrench { auto new_service = new BatchComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); return this->startService(new_service); - } /** @@ -386,15 +386,15 @@ namespace wrench { BlockingQueue> vm_created; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - std::string vm_name; - try { - vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); - vm_created.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - vm_created.push(std::pair(false, e.getCause()->toString())); - } + this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + std::string vm_name; + try { + vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); + vm_created.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_created.push(std::pair(false, e.getCause()->toString())); + } }); // Poll from the shared queue (will be a single one!) @@ -428,20 +428,20 @@ namespace wrench { BlockingQueue> vm_started; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([this, vm_name, cs, &vm_started](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot start VM because it's not down"); - } - auto bm_cs = cloud_cs->startVM(vm_name); - this->compute_service_registry.insert(bm_cs->getName(), bm_cs); - vm_started.push(std::pair(true, bm_cs->getName())); - } catch (ExecutionException &e) { - vm_started.push(std::pair(false, e.getCause()->toString())); - } catch (std::invalid_argument &e) { - vm_started.push(std::pair(false, e.what())); - } + this->things_to_do.push([this, vm_name, cs, &vm_started]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot start VM because it's not down"); + } + auto bm_cs = cloud_cs->startVM(vm_name); + this->compute_service_registry.insert(bm_cs->getName(), bm_cs); + vm_started.push(std::pair(true, bm_cs->getName())); + } catch (ExecutionException &e) { + vm_started.push(std::pair(false, e.getCause()->toString())); + } catch (std::invalid_argument &e) { + vm_started.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -477,22 +477,22 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) //this->vm_to_shutdown.push(std::pair(vm_name, cs)); - this->things_to_do.push([this, vm_name, cs, &vm_shutdown](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMRunning(vm_name)) { - throw std::invalid_argument("Cannot shutdown VM because it's not running"); - } - auto bm_cs = cloud_cs->getVMComputeService(vm_name); - - this->compute_service_registry.remove(bm_cs->getName()); - cloud_cs->shutdownVM(vm_name); - vm_shutdown.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - vm_shutdown.push(std::pair(false, e.what())); - } catch (std::invalid_argument &e) { - vm_shutdown.push(std::pair(false, e.what())); - } + this->things_to_do.push([this, vm_name, cs, &vm_shutdown]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMRunning(vm_name)) { + throw std::invalid_argument("Cannot shutdown VM because it's not running"); + } + auto bm_cs = cloud_cs->getVMComputeService(vm_name); + + this->compute_service_registry.remove(bm_cs->getName()); + cloud_cs->shutdownVM(vm_name); + vm_shutdown.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_shutdown.push(std::pair(false, e.what())); + } catch (std::invalid_argument &e) { + vm_shutdown.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -525,17 +525,17 @@ namespace wrench { BlockingQueue> vm_destroyed; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([vm_name, cs, &vm_destroyed](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot destroy VM because it's not down"); - } - cloud_cs->destroyVM(vm_name); - vm_destroyed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_destroyed.push(std::pair(false, e.what())); - } + this->things_to_do.push([vm_name, cs, &vm_destroyed]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot destroy VM because it's not down"); + } + cloud_cs->destroyVM(vm_name); + vm_destroyed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_destroyed.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -562,7 +562,6 @@ namespace wrench { // Create the new service auto new_service = SimpleStorageService::createSimpleStorageService(head_host, mount_points, {}, {}); return this->startService(new_service); - } /** @@ -616,13 +615,13 @@ namespace wrench { BlockingQueue> file_looked_up; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([file, ss, &file_looked_up](){ - try { - bool result = ss->lookupFile(file); - file_looked_up.push(std::tuple(true, result, "")); - } catch (std::invalid_argument &e) { - file_looked_up.push(std::tuple(false, false, e.what())); - } + this->things_to_do.push([file, ss, &file_looked_up]() { + try { + bool result = ss->lookupFile(file); + file_looked_up.push(std::tuple(true, result, "")); + } catch (std::invalid_argument &e) { + file_looked_up.push(std::tuple(false, false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -708,7 +707,7 @@ namespace wrench { } BlockingQueue> job_submitted; - this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted](){ + this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted]() { try { WRENCH_INFO("Submitting a job..."); this->job_manager->submitJob(job, cs, service_specific_args); @@ -1036,14 +1035,14 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) //this->vm_to_suspend.push(std::pair(vm_name, cs)); BlockingQueue> vm_suspended; - this->things_to_do.push([vm_name, cs, &vm_suspended](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - cloud_cs->suspendVM(vm_name); - vm_suspended.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_suspended.push(std::pair(false, e.what())); - } + this->things_to_do.push([vm_name, cs, &vm_suspended]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->suspendVM(vm_name); + vm_suspended.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_suspended.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -1097,14 +1096,14 @@ namespace wrench { BlockingQueue> vm_resumed; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([vm_name, cs, &vm_resumed](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - cloud_cs->resumeVM(vm_name); - vm_resumed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_resumed.push(std::pair(false, e.what())); - } + this->things_to_do.push([vm_name, cs, &vm_resumed]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->resumeVM(vm_name); + vm_resumed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_resumed.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -1131,7 +1130,7 @@ namespace wrench { } auto cloud_cs = std::dynamic_pointer_cast(cs); std::vector execution_hosts_list = cloud_cs->getHosts(); - json answer {}; + json answer{}; answer["execution_hosts"] = execution_hosts_list; return answer; }