From d95e161cd8cbb86a7555bf8dace1223a2d01d3a3 Mon Sep 17 00:00:00 2001 From: Wanyu Z Date: Wed, 4 Oct 2023 16:06:53 -0700 Subject: [PATCH 1/7] add lambda --- .../include/SimulationController.h | 21 +- .../src/SimulationController.cpp | 226 +++++++++--------- 2 files changed, 109 insertions(+), 138 deletions(-) diff --git a/tools/wrench/wrench-daemon/include/SimulationController.h b/tools/wrench/wrench-daemon/include/SimulationController.h index f88534a237..ef38d4355a 100755 --- a/tools/wrench/wrench-daemon/include/SimulationController.h +++ b/tools/wrench/wrench-daemon/include/SimulationController.h @@ -131,26 +131,7 @@ namespace wrench { BlockingQueue file_service_to_start; BlockingQueue, std::shared_ptr, std::map>> submissions_to_do; - BlockingQueue, std::shared_ptr>> vm_to_create; - BlockingQueue> vm_created; - - BlockingQueue>> vm_to_start; - BlockingQueue> vm_started; - - BlockingQueue>> vm_to_shutdown; - BlockingQueue> vm_shutdown; - - BlockingQueue>> vm_to_destroy; - BlockingQueue> vm_destroyed; - - BlockingQueue, std::shared_ptr>> file_to_lookup; - BlockingQueue> file_looked_up; - - BlockingQueue>> vm_to_suspend; - BlockingQueue> vm_suspended; - - BlockingQueue>> vm_to_resume; - BlockingQueue> vm_resumed; + BlockingQueue> things_to_do; // The two managers std::shared_ptr job_manager; diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index e31a53c5c1..ade40ee027 100755 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -56,6 +56,7 @@ namespace wrench { std::pair, std::shared_ptr> spec_vm_to_create; std::pair> vm_id; std::pair, std::shared_ptr> file_lookup_request; + std::function thing_to_do; if (this->compute_services_to_start.tryPop(new_compute_service)) { // starting compute service @@ -78,103 +79,11 @@ namespace wrench { // Add the new service to the registry of existing services, so that later we can look it up by name this->file_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr); - } else if (this->vm_to_create.tryPop(spec_vm_to_create)) { - auto cloud_cs = std::dynamic_pointer_cast(spec_vm_to_create.second); - auto num_cores = std::get<0>(spec_vm_to_create.first); - auto ram_size = std::get<1>(spec_vm_to_create.first); - auto properties = std::get<2>(spec_vm_to_create.first); - auto payloads = std::get<3>(spec_vm_to_create.first); - std::string vm_name; - try { - vm_name = cloud_cs->createVM(num_cores, ram_size, properties, payloads); - this->vm_created.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - this->vm_created.push(std::pair(false, e.getCause()->toString())); - } - - } else if (this->vm_to_start.tryPop(vm_id)) { - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot start VM because it's not down"); - } - auto bm_cs = cloud_cs->startVM(vm_name); - this->compute_service_registry.insert(bm_cs->getName(), bm_cs); - this->vm_started.push(std::pair(true, bm_cs->getName())); - } catch (ExecutionException &e) { - this->vm_created.push(std::pair(false, e.getCause()->toString())); - } catch (std::invalid_argument &e) { - this->vm_started.push(std::pair(false, e.what())); - } - - } else if (this->vm_to_shutdown.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - if (not cloud_cs->isVMRunning(vm_name)) { - throw std::invalid_argument("Cannot shutdown VM because it's not running"); - } - auto bm_cs = cloud_cs->getVMComputeService(vm_name); - - this->compute_service_registry.remove(bm_cs->getName()); - cloud_cs->shutdownVM(vm_name); - this->vm_shutdown.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - this->vm_shutdown.push(std::pair(false, e.what())); - } catch (std::invalid_argument &e) { - this->vm_shutdown.push(std::pair(false, e.what())); - } - - } else if (this->vm_to_destroy.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot destroy VM because it's not down"); - } - cloud_cs->destroyVM(vm_name); - this->vm_destroyed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - this->vm_destroyed.push(std::pair(false, e.what())); - } - - } else if (this->file_to_lookup.tryPop(file_lookup_request)) { - - auto file = file_lookup_request.first; - auto ss = file_lookup_request.second; - try { - bool result = ss->lookupFile(file); - this->file_looked_up.push(std::tuple(true, result, "")); - } catch (std::invalid_argument &e) { - this->file_looked_up.push(std::tuple(false, false, e.what())); - } - - } else if (this->vm_to_suspend.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - cloud_cs->suspendVM(vm_name); - this->vm_suspended.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - this->vm_suspended.push(std::pair(false, e.what())); - } - - } else if (this->vm_to_resume.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - cloud_cs->resumeVM(vm_name); - this->vm_resumed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - this->vm_resumed.push(std::pair(false, e.what())); - } - - } else { + } else if (this->things_to_do.tryPop(thing_to_do)) + { + thing_to_do(); + } + else { break; } } @@ -515,18 +424,23 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_created; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_create.push( - std::pair( - std::tuple(num_cores, - ram_memory, - service_property_list, - service_message_payload_list), - cs)); + this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created](){ + auto cloud_cs = std::dynamic_pointer_cast(cs); + std::string vm_name; + try { + vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); + vm_created.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_created.push(std::pair(false, e.getCause()->toString())); + } + }); // Pool from the shared queue (will be a single one!) std::pair reply; - this->vm_created.waitAndPop(reply); + vm_created.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -553,12 +467,27 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_started; // Push the request into the blocking queue (will be a single one!) - this->vm_to_start.push(std::pair(vm_name, cs)); + this->things_to_do.push([this, vm_name, cs, &vm_started](){ + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot start VM because it's not down"); + } + auto bm_cs = cloud_cs->startVM(vm_name); + this->compute_service_registry.insert(bm_cs->getName(), bm_cs); + vm_started.push(std::pair(true, bm_cs->getName())); + } catch (ExecutionException &e) { + vm_started.push(std::pair(false, e.getCause()->toString())); + } catch (std::invalid_argument &e) { + vm_started.push(std::pair(false, e.what())); + } + }); // Pool from the shared queue (will be a single one!) std::pair reply; - this->vm_started.waitAndPop(reply); + vm_started.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -585,12 +514,31 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_shutdown; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_shutdown.push(std::pair(vm_name, cs)); + //this->vm_to_shutdown.push(std::pair(vm_name, cs)); + this->things_to_do.push([this, vm_name, cs, &vm_shutdown](){ + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMRunning(vm_name)) { + throw std::invalid_argument("Cannot shutdown VM because it's not running"); + } + auto bm_cs = cloud_cs->getVMComputeService(vm_name); + + this->compute_service_registry.remove(bm_cs->getName()); + cloud_cs->shutdownVM(vm_name); + vm_shutdown.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_shutdown.push(std::pair(false, e.what())); + } catch (std::invalid_argument &e) { + vm_shutdown.push(std::pair(false, e.what())); + } + }); // Pool from the shared queue (will be a single one!) std::pair reply; - this->vm_shutdown.waitAndPop(reply); + vm_shutdown.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -615,12 +563,25 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_destroyed; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_destroy.push(std::pair(vm_name, cs)); + this->things_to_do.push([this, vm_name, cs, &vm_destroyed](){ + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot destroy VM because it's not down"); + } + cloud_cs->destroyVM(vm_name); + vm_destroyed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_destroyed.push(std::pair(false, e.what())); + } + }); // Pool from the shared queue (will be a single one!) std::pair reply; - this->vm_destroyed.waitAndPop(reply); + vm_destroyed.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -700,12 +661,21 @@ namespace wrench { throw std::runtime_error("Unknown file " + filename); } + BlockingQueue> file_looked_up; + // Push the request into the blocking queue (will be a single one!) - this->file_to_lookup.push(std::pair(file, ss)); + this->things_to_do.push([this, file, ss, &file_looked_up](){ + try { + bool result = ss->lookupFile(file); + file_looked_up.push(std::tuple(true, result, "")); + } catch (std::invalid_argument &e) { + file_looked_up.push(std::tuple(false, false, e.what())); + } + }); // Pool from the shared queue (will be a single one!) std::tuple reply; - this->file_looked_up.waitAndPop(reply); + file_looked_up.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<2>(reply); @@ -1101,11 +1071,21 @@ namespace wrench { } // Push the request into the blocking queue (will be a single one!) - this->vm_to_suspend.push(std::pair(vm_name, cs)); + //this->vm_to_suspend.push(std::pair(vm_name, cs)); + BlockingQueue> vm_suspended; + this->things_to_do.push([this, vm_name, cs, &vm_suspended](){ + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->suspendVM(vm_name); + vm_suspended.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_suspended.push(std::pair(false, e.what())); + } + }); // Pool from the shared queue (will be a single one!) std::pair reply; - this->vm_suspended.waitAndPop(reply); + vm_suspended.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -1151,12 +1131,22 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_resumed; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_resume.push(std::pair(vm_name, cs)); + this->things_to_do.push([this, vm_name, cs, &vm_resumed](){ + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->resumeVM(vm_name); + vm_resumed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_resumed.push(std::pair(false, e.what())); + } + }); // Pool from the shared queue (will be a single one!) std::pair reply; - this->vm_resumed.waitAndPop(reply); + vm_resumed.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -1177,7 +1167,7 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } auto cloud_cs = std::dynamic_pointer_cast(cs); - std::vector execution_hosts_list = cloud_cs->getExecutionHosts(); + std::vector execution_hosts_list = cloud_cs->getHosts(); json answer {}; answer["execution_hosts"] = execution_hosts_list; return answer; From ab3387048582bb7ce7cbc5ac480974012a442401 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 4 Oct 2023 14:03:23 -1000 Subject: [PATCH 2/7] Some re-working of the code to remove code duplication --- .../include/SimulationController.h | 7 +- .../src/SimulationController.cpp | 268 ++++++++---------- 2 files changed, 127 insertions(+), 148 deletions(-) diff --git a/tools/wrench/wrench-daemon/include/SimulationController.h b/tools/wrench/wrench-daemon/include/SimulationController.h index ef38d4355a..f07458b551 100755 --- a/tools/wrench/wrench-daemon/include/SimulationController.h +++ b/tools/wrench/wrench-daemon/include/SimulationController.h @@ -117,6 +117,10 @@ namespace wrench { json getVMComputeService(json data); private: + + template + json startService(T *s); + // Thread-safe key value stores KeyValueStore> job_registry; KeyValueStore> compute_service_registry; @@ -126,9 +130,6 @@ namespace wrench { // Thread-safe queues for the server thread and the simulation thread to communicate BlockingQueue>> event_queue; - BlockingQueue compute_services_to_start; - BlockingQueue storage_services_to_start; - BlockingQueue file_service_to_start; BlockingQueue, std::shared_ptr, std::map>> submissions_to_do; BlockingQueue> things_to_do; diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index ade40ee027..09a33336a3 100755 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -31,6 +31,46 @@ namespace wrench { std::shared_ptr workflow, const std::string &hostname, int sleep_us) : ExecutionController(hostname, "SimulationController"), workflow(workflow), sleep_us(sleep_us) {} + + template + json SimulationController::startService(T *s) { + BlockingQueue> s_created; + + this->things_to_do.push([this, s, &s_created](){ + try { + auto new_service_shared_ptr = this->simulation->startNewService(s); + if (auto cs = std::dynamic_pointer_cast(new_service_shared_ptr)) { + WRENCH_INFO("Started a new compute service"); + this->compute_service_registry.insert(new_service_shared_ptr->getName(), cs); + } else if (auto ss = std::dynamic_pointer_cast(new_service_shared_ptr)) { + WRENCH_INFO("Started a new storage service"); + this->storage_service_registry.insert(new_service_shared_ptr->getName(), ss); + } else if (auto fs = std::dynamic_pointer_cast(new_service_shared_ptr)) { + WRENCH_INFO("Started a new storage service"); + this->file_service_registry.insert(new_service_shared_ptr->getName(), fs); + } else { + throw std::runtime_error("SimulationController::startNewService(): Unknown service type"); + } + s_created.push(std::pair(true, "")); + } catch (ExecutionException &e) { + s_created.push(std::pair(false, e.getCause()->toString())); + } + }); + + // Poll from the shared queue (will be a single one!) + std::pair reply; + s_created.waitAndPop(reply); + bool success = std::get<0>(reply); + if (not success) { + std::string error_msg = std::get<1>(reply); + throw std::runtime_error("Cannot start Service: " + error_msg); + } else { + json answer; + answer["service_name"] = s->getName(); + return answer; + } + } + /** * @brief Simulation execution_controller's main method * @@ -48,42 +88,14 @@ namespace wrench { // Main control loop while (keep_going) { + // Starting compute and storage services that should be started, if any while (true) { - wrench::ComputeService *new_compute_service = nullptr; - wrench::StorageService *new_storage_service = nullptr; - wrench::FileRegistryService *new_file_service = nullptr; - std::pair, std::shared_ptr> spec_vm_to_create; - std::pair> vm_id; - std::pair, std::shared_ptr> file_lookup_request; std::function thing_to_do; - if (this->compute_services_to_start.tryPop(new_compute_service)) { - // starting compute service - WRENCH_INFO("Starting a new compute service..."); - auto new_service_shared_ptr = this->simulation->startNewService(new_compute_service); - // Add the new service to the registry of existing services, so that later we can look it up by name - this->compute_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr); - - } else if (this->storage_services_to_start.tryPop(new_storage_service)) { - // starting storage service - WRENCH_INFO("Starting a new storage service..."); - auto new_service_shared_ptr = this->simulation->startNewService(new_storage_service); - // Add the new service to the registry of existing services, so that later we can look it up by name - this->storage_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr); - - } else if (this->file_service_to_start.tryPop(new_file_service)) { - // start file registry service - WRENCH_INFO("Starting a new file registry service..."); - auto new_service_shared_ptr = this->simulation->startNewService(new_file_service); - // Add the new service to the registry of existing services, so that later we can look it up by name - this->file_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr); - - } else if (this->things_to_do.tryPop(thing_to_do)) - { + if (this->things_to_do.tryPop(thing_to_do)) { thing_to_do(); - } - else { + } else { break; } } @@ -109,7 +121,7 @@ namespace wrench { // Moves time forward if needed (because the client has done a sleep), // And then add all events that occurred to the event queue double time_to_sleep = std::max(0, time_horizon_to_reach - - wrench::Simulation::getCurrentSimulatedDate()); + wrench::Simulation::getCurrentSimulatedDate()); if (time_to_sleep > 0.0) { WRENCH_INFO("Sleeping %.2lf seconds", time_to_sleep); S4U_Simulation::sleep(time_to_sleep); @@ -300,15 +312,8 @@ namespace wrench { // Create the new service auto new_service = new BareMetalComputeService(head_host, resources, scratch_space, service_property_list, service_message_payload_list); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->compute_services_to_start.push(new_service); - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; + return this->startService(new_service); } /** @@ -340,17 +345,11 @@ namespace wrench { // Create the new service auto new_service = new CloudComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->compute_services_to_start.push(new_service); + return this->startService(new_service); - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; } + /** * REST API Handler * @param data JSON input @@ -380,15 +379,8 @@ namespace wrench { // Create the new service auto new_service = new BatchComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->compute_services_to_start.push(new_service); + return this->startService(new_service); - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; } /** @@ -428,17 +420,17 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - std::string vm_name; - try { - vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); - vm_created.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - vm_created.push(std::pair(false, e.getCause()->toString())); - } + auto cloud_cs = std::dynamic_pointer_cast(cs); + std::string vm_name; + try { + vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); + vm_created.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_created.push(std::pair(false, e.getCause()->toString())); + } }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; vm_created.waitAndPop(reply); bool success = std::get<0>(reply); @@ -470,22 +462,22 @@ namespace wrench { BlockingQueue> vm_started; // Push the request into the blocking queue (will be a single one!) this->things_to_do.push([this, vm_name, cs, &vm_started](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot start VM because it's not down"); - } - auto bm_cs = cloud_cs->startVM(vm_name); - this->compute_service_registry.insert(bm_cs->getName(), bm_cs); - vm_started.push(std::pair(true, bm_cs->getName())); - } catch (ExecutionException &e) { - vm_started.push(std::pair(false, e.getCause()->toString())); - } catch (std::invalid_argument &e) { - vm_started.push(std::pair(false, e.what())); - } + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot start VM because it's not down"); + } + auto bm_cs = cloud_cs->startVM(vm_name); + this->compute_service_registry.insert(bm_cs->getName(), bm_cs); + vm_started.push(std::pair(true, bm_cs->getName())); + } catch (ExecutionException &e) { + vm_started.push(std::pair(false, e.getCause()->toString())); + } catch (std::invalid_argument &e) { + vm_started.push(std::pair(false, e.what())); + } }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; vm_started.waitAndPop(reply); bool success = std::get<0>(reply); @@ -519,24 +511,24 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) //this->vm_to_shutdown.push(std::pair(vm_name, cs)); this->things_to_do.push([this, vm_name, cs, &vm_shutdown](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMRunning(vm_name)) { - throw std::invalid_argument("Cannot shutdown VM because it's not running"); - } - auto bm_cs = cloud_cs->getVMComputeService(vm_name); - - this->compute_service_registry.remove(bm_cs->getName()); - cloud_cs->shutdownVM(vm_name); - vm_shutdown.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - vm_shutdown.push(std::pair(false, e.what())); - } catch (std::invalid_argument &e) { - vm_shutdown.push(std::pair(false, e.what())); - } + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMRunning(vm_name)) { + throw std::invalid_argument("Cannot shutdown VM because it's not running"); + } + auto bm_cs = cloud_cs->getVMComputeService(vm_name); + + this->compute_service_registry.remove(bm_cs->getName()); + cloud_cs->shutdownVM(vm_name); + vm_shutdown.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_shutdown.push(std::pair(false, e.what())); + } catch (std::invalid_argument &e) { + vm_shutdown.push(std::pair(false, e.what())); + } }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; vm_shutdown.waitAndPop(reply); bool success = std::get<0>(reply); @@ -567,19 +559,19 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) this->things_to_do.push([this, vm_name, cs, &vm_destroyed](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot destroy VM because it's not down"); - } - cloud_cs->destroyVM(vm_name); - vm_destroyed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_destroyed.push(std::pair(false, e.what())); - } + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot destroy VM because it's not down"); + } + cloud_cs->destroyVM(vm_name); + vm_destroyed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_destroyed.push(std::pair(false, e.what())); + } }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; vm_destroyed.waitAndPop(reply); bool success = std::get<0>(reply); @@ -602,15 +594,8 @@ namespace wrench { // Create the new service auto new_service = SimpleStorageService::createSimpleStorageService(head_host, mount_points, {}, {}); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->storage_services_to_start.push(new_service); + return this->startService(new_service); - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; } /** @@ -665,15 +650,15 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) this->things_to_do.push([this, file, ss, &file_looked_up](){ - try { - bool result = ss->lookupFile(file); - file_looked_up.push(std::tuple(true, result, "")); - } catch (std::invalid_argument &e) { - file_looked_up.push(std::tuple(false, false, e.what())); - } + try { + bool result = ss->lookupFile(file); + file_looked_up.push(std::tuple(true, result, "")); + } catch (std::invalid_argument &e) { + file_looked_up.push(std::tuple(false, false, e.what())); + } }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::tuple reply; file_looked_up.waitAndPop(reply); bool success = std::get<0>(reply); @@ -697,15 +682,8 @@ namespace wrench { // Create the new service auto new_service = new FileRegistryService(head_host, {}, {}); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->file_service_to_start.push(new_service); - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; + return this->startService(new_service); } /** @@ -1074,16 +1052,16 @@ namespace wrench { //this->vm_to_suspend.push(std::pair(vm_name, cs)); BlockingQueue> vm_suspended; this->things_to_do.push([this, vm_name, cs, &vm_suspended](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - cloud_cs->suspendVM(vm_name); - vm_suspended.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_suspended.push(std::pair(false, e.what())); - } + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->suspendVM(vm_name); + vm_suspended.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_suspended.push(std::pair(false, e.what())); + } }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; vm_suspended.waitAndPop(reply); bool success = std::get<0>(reply); @@ -1135,16 +1113,16 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) this->things_to_do.push([this, vm_name, cs, &vm_resumed](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - cloud_cs->resumeVM(vm_name); - vm_resumed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_resumed.push(std::pair(false, e.what())); - } + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->resumeVM(vm_name); + vm_resumed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_resumed.push(std::pair(false, e.what())); + } }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; vm_resumed.waitAndPop(reply); bool success = std::get<0>(reply); From f78ffa9beee6315526045563c544885ee5d1dc88 Mon Sep 17 00:00:00 2001 From: Wanyu Z Date: Sun, 8 Oct 2023 00:18:36 -0700 Subject: [PATCH 3/7] Using marco to replace the duplicate code --- .../src/SimulationController.cpp | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index 09a33336a3..f74917ba0c 100755 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -20,6 +20,20 @@ WRENCH_LOG_CATEGORY(simulation_controller, "Log category for SimulationController"); +#define PARSE_SERVICE_PROPERTY_LIST() WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \ +json jsonData = json::parse(property_list_string); \ +for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto property_key = ServiceProperty::translateString(it.key()); \ + service_property_list[property_key] = it.value(); \ +} \ + +#define PARSE_MESSAGE_PAYLOAD_LIST() WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \ +jsonData = json::parse(message_payload_list_string); \ +for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ + service_message_payload_list[message_payload_key] = it.value(); \ +} + namespace wrench { /** @@ -100,17 +114,6 @@ namespace wrench { } } - // Submit jobs that should be submitted - while (true) { - std::tuple, std::shared_ptr, std::map> submission_to_do; - if (this->submissions_to_do.tryPop(submission_to_do)) { - WRENCH_INFO("Submitting a job..."); - this->job_manager->submitJob(std::get<0>(submission_to_do), std::get<1>(submission_to_do), std::get<2>(submission_to_do)); - } else { - break; - } - } - // If the server thread is waiting for the next event to occur, just do that if (time_horizon_to_reach < 0) { time_horizon_to_reach = Simulation::getCurrentSimulatedDate(); @@ -286,29 +289,29 @@ namespace wrench { std::string head_host = data["head_host"]; std::string resource = data["resources"]; std::string scratch_space = data["scratch_space"]; - std::string property_list = data["property_list"]; - std::string message_payload_list = data["message_payload_list"]; - - map> resources; - json jsonData = json::parse(resource); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - resources.emplace(it.key(), it.value()); - } + std::string property_list_string = data["property_list"]; + std::string message_payload_list_string = data["message_payload_list"]; WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - jsonData = json::parse(property_list); + json jsonData = json::parse(property_list_string); for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { auto property_key = ServiceProperty::translateString(it.key()); service_property_list[property_key] = it.value(); } WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list); + jsonData = json::parse(message_payload_list_string); for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { auto message_payload_key = ServiceMessagePayload::translateString(it.key()); service_message_payload_list[message_payload_key] = it.value(); } + map> resources; + jsonData = json::parse(resource); + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { + resources.emplace(it.key(), it.value()); + } + // Create the new service auto new_service = new BareMetalComputeService(head_host, resources, scratch_space, service_property_list, service_message_payload_list); @@ -740,7 +743,10 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } - this->submissions_to_do.push(std::tuple(job, cs, service_specific_args)); + this->things_to_do.push([this, job, cs, service_specific_args](){ + WRENCH_INFO("Submitting a job..."); + this->job_manager->submitJob(job, cs, service_specific_args); + }); return {}; } From 853cdaacd2493231e4847ceccb83dc1cfbc122ee Mon Sep 17 00:00:00 2001 From: Wanyu Z Date: Sun, 8 Oct 2023 00:22:29 -0700 Subject: [PATCH 4/7] using marco to decrease the redundancy of the code --- .../src/SimulationController.cpp | 56 +++---------------- 1 file changed, 8 insertions(+), 48 deletions(-) diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index f74917ba0c..1d4ab0c84a 100755 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -292,19 +292,9 @@ namespace wrench { std::string property_list_string = data["property_list"]; std::string message_payload_list_string = data["message_payload_list"]; - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - json jsonData = json::parse(property_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } + PARSE_SERVICE_PROPERTY_LIST() - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } + PARSE_MESSAGE_PAYLOAD_LIST() map> resources; jsonData = json::parse(resource); @@ -331,19 +321,9 @@ namespace wrench { std::string property_list_string = data["property_list"]; std::string message_payload_list_string = data["message_payload_list"]; - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - json jsonData = json::parse(property_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } + PARSE_SERVICE_PROPERTY_LIST() - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } + PARSE_MESSAGE_PAYLOAD_LIST() // Create the new service auto new_service = new CloudComputeService(hostname, resources, scratch_space, @@ -365,19 +345,9 @@ namespace wrench { std::string property_list_string = data["property_list"]; std::string message_payload_list_string = data["message_payload_list"]; - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - json jsonData = json::parse(property_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } + PARSE_SERVICE_PROPERTY_LIST() - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } + PARSE_MESSAGE_PAYLOAD_LIST() // Create the new service auto new_service = new BatchComputeService(hostname, resources, scratch_space, @@ -399,19 +369,9 @@ namespace wrench { std::string property_list_string = data["property_list"]; std::string message_payload_list_string = data["message_payload_list"]; - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - json jsonData = json::parse(property_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } + PARSE_SERVICE_PROPERTY_LIST() - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } + PARSE_MESSAGE_PAYLOAD_LIST() // Lookup the cloud compute service std::shared_ptr cs; From 088fc4d77da48b3f7221b96b04ecdba5e7b64243 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 11 Oct 2023 13:18:56 -1000 Subject: [PATCH 5/7] wrench-daemon code refactor complete --- .../src/SimulationController.cpp | 57 ++++++++++++------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index 1d4ab0c84a..13533991ff 100755 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -21,17 +21,21 @@ WRENCH_LOG_CATEGORY(simulation_controller, "Log category for SimulationController"); #define PARSE_SERVICE_PROPERTY_LIST() WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \ -json jsonData = json::parse(property_list_string); \ -for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ - auto property_key = ServiceProperty::translateString(it.key()); \ - service_property_list[property_key] = it.value(); \ -} \ +{ \ + json jsonData = json::parse(property_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto property_key = ServiceProperty::translateString(it.key()); \ + service_property_list[property_key] = it.value(); \ + } \ +} #define PARSE_MESSAGE_PAYLOAD_LIST() WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \ -jsonData = json::parse(message_payload_list_string); \ -for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ - service_message_payload_list[message_payload_key] = it.value(); \ +{ \ + json jsonData = json::parse(message_payload_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ + service_message_payload_list[message_payload_key] = it.value(); \ + } \ } namespace wrench { @@ -297,7 +301,7 @@ namespace wrench { PARSE_MESSAGE_PAYLOAD_LIST() map> resources; - jsonData = json::parse(resource); + json jsonData = json::parse(resource); for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { resources.emplace(it.key(), it.value()); } @@ -521,7 +525,7 @@ namespace wrench { BlockingQueue> vm_destroyed; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([this, vm_name, cs, &vm_destroyed](){ + this->things_to_do.push([vm_name, cs, &vm_destroyed](){ auto cloud_cs = std::dynamic_pointer_cast(cs); try { if (not cloud_cs->isVMDown(vm_name)) { @@ -612,7 +616,7 @@ namespace wrench { BlockingQueue> file_looked_up; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([this, file, ss, &file_looked_up](){ + this->things_to_do.push([file, ss, &file_looked_up](){ try { bool result = ss->lookupFile(file); file_looked_up.push(std::tuple(true, result, "")); @@ -703,11 +707,26 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } - this->things_to_do.push([this, job, cs, service_specific_args](){ - WRENCH_INFO("Submitting a job..."); - this->job_manager->submitJob(job, cs, service_specific_args); + BlockingQueue> job_submitted; + this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted](){ + try { + WRENCH_INFO("Submitting a job..."); + this->job_manager->submitJob(job, cs, service_specific_args); + job_submitted.push(std::make_pair(true, "")); + } catch (std::exception &e) { + job_submitted.push(std::make_pair(false, e.what())); + } }); - return {}; + // Poll from the shared queue (will be a single one!) + std::pair reply; + job_submitted.waitAndPop(reply); + bool success = std::get<0>(reply); + if (not success) { + std::string error_msg = std::get<1>(reply); + throw std::runtime_error("Cannot submit job: " + error_msg); + } else { + return {}; + } } /** @@ -1017,7 +1036,7 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) //this->vm_to_suspend.push(std::pair(vm_name, cs)); BlockingQueue> vm_suspended; - this->things_to_do.push([this, vm_name, cs, &vm_suspended](){ + this->things_to_do.push([vm_name, cs, &vm_suspended](){ auto cloud_cs = std::dynamic_pointer_cast(cs); try { cloud_cs->suspendVM(vm_name); @@ -1078,7 +1097,7 @@ namespace wrench { BlockingQueue> vm_resumed; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([this, vm_name, cs, &vm_resumed](){ + this->things_to_do.push([vm_name, cs, &vm_resumed](){ auto cloud_cs = std::dynamic_pointer_cast(cs); try { cloud_cs->resumeVM(vm_name); @@ -1094,7 +1113,7 @@ namespace wrench { bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); - throw std::runtime_error("Cannot suspend VM: " + error_msg); + throw std::runtime_error("Cannot resume VM: " + error_msg); } else { return {}; } From ca77fe45e5d10de7c2fc881f5a5c95b13ff084ac Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 11 Oct 2023 23:26:54 +0000 Subject: [PATCH 6/7] Committing clang-format changes --- .../LogicalFileSystem.cpp | 2 +- .../include/SimulationController.h | 1 - .../src/SimulationController.cpp | 199 +++++++++--------- 3 files changed, 100 insertions(+), 102 deletions(-) mode change 100755 => 100644 tools/wrench/wrench-daemon/include/SimulationController.h diff --git a/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp b/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp index 2d35983fbe..9a3a059993 100644 --- a/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp +++ b/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp @@ -269,7 +269,7 @@ namespace wrench { WRENCH_WARN("LogicalFileSystem::reserveSpace(): Space was already being reserved for storing file %s at path %s:%s. " "This is likely a redundant copy, and nothing needs to be done", file->getID().c_str(), this->hostname.c_str(), fixed_path.c_str()); - return true; + return true; } if (this->free_space < file->getSize()) { diff --git a/tools/wrench/wrench-daemon/include/SimulationController.h b/tools/wrench/wrench-daemon/include/SimulationController.h old mode 100755 new mode 100644 index f07458b551..353ea7fd6b --- a/tools/wrench/wrench-daemon/include/SimulationController.h +++ b/tools/wrench/wrench-daemon/include/SimulationController.h @@ -117,7 +117,6 @@ namespace wrench { json getVMComputeService(json data); private: - template json startService(T *s); diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index 13533991ff..0ed6464909 100644 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -20,23 +20,25 @@ WRENCH_LOG_CATEGORY(simulation_controller, "Log category for SimulationController"); -#define PARSE_SERVICE_PROPERTY_LIST() WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \ -{ \ - json jsonData = json::parse(property_list_string); \ - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ - auto property_key = ServiceProperty::translateString(it.key()); \ - service_property_list[property_key] = it.value(); \ - } \ -} - -#define PARSE_MESSAGE_PAYLOAD_LIST() WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \ -{ \ - json jsonData = json::parse(message_payload_list_string); \ - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ - service_message_payload_list[message_payload_key] = it.value(); \ - } \ -} +#define PARSE_SERVICE_PROPERTY_LIST() \ + WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \ + { \ + json jsonData = json::parse(property_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto property_key = ServiceProperty::translateString(it.key()); \ + service_property_list[property_key] = it.value(); \ + } \ + } + +#define PARSE_MESSAGE_PAYLOAD_LIST() \ + WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \ + { \ + json jsonData = json::parse(message_payload_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ + service_message_payload_list[message_payload_key] = it.value(); \ + } \ + } namespace wrench { @@ -50,11 +52,11 @@ namespace wrench { const std::string &hostname, int sleep_us) : ExecutionController(hostname, "SimulationController"), workflow(workflow), sleep_us(sleep_us) {} - template + template json SimulationController::startService(T *s) { BlockingQueue> s_created; - this->things_to_do.push([this, s, &s_created](){ + this->things_to_do.push([this, s, &s_created]() { try { auto new_service_shared_ptr = this->simulation->startNewService(s); if (auto cs = std::dynamic_pointer_cast(new_service_shared_ptr)) { @@ -106,10 +108,10 @@ namespace wrench { // Main control loop while (keep_going) { - + // Starting compute and storage services that should be started, if any while (true) { - std::function thing_to_do; + std::function thing_to_do; if (this->things_to_do.tryPop(thing_to_do)) { thing_to_do(); @@ -128,7 +130,7 @@ namespace wrench { // Moves time forward if needed (because the client has done a sleep), // And then add all events that occurred to the event queue double time_to_sleep = std::max(0, time_horizon_to_reach - - wrench::Simulation::getCurrentSimulatedDate()); + wrench::Simulation::getCurrentSimulatedDate()); if (time_to_sleep > 0.0) { WRENCH_INFO("Sleeping %.2lf seconds", time_to_sleep); S4U_Simulation::sleep(time_to_sleep); @@ -333,7 +335,6 @@ namespace wrench { auto new_service = new CloudComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); return this->startService(new_service); - } @@ -357,7 +358,6 @@ namespace wrench { auto new_service = new BatchComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); return this->startService(new_service); - } /** @@ -386,15 +386,15 @@ namespace wrench { BlockingQueue> vm_created; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - std::string vm_name; - try { - vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); - vm_created.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - vm_created.push(std::pair(false, e.getCause()->toString())); - } + this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + std::string vm_name; + try { + vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); + vm_created.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_created.push(std::pair(false, e.getCause()->toString())); + } }); // Poll from the shared queue (will be a single one!) @@ -428,20 +428,20 @@ namespace wrench { BlockingQueue> vm_started; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([this, vm_name, cs, &vm_started](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot start VM because it's not down"); - } - auto bm_cs = cloud_cs->startVM(vm_name); - this->compute_service_registry.insert(bm_cs->getName(), bm_cs); - vm_started.push(std::pair(true, bm_cs->getName())); - } catch (ExecutionException &e) { - vm_started.push(std::pair(false, e.getCause()->toString())); - } catch (std::invalid_argument &e) { - vm_started.push(std::pair(false, e.what())); - } + this->things_to_do.push([this, vm_name, cs, &vm_started]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot start VM because it's not down"); + } + auto bm_cs = cloud_cs->startVM(vm_name); + this->compute_service_registry.insert(bm_cs->getName(), bm_cs); + vm_started.push(std::pair(true, bm_cs->getName())); + } catch (ExecutionException &e) { + vm_started.push(std::pair(false, e.getCause()->toString())); + } catch (std::invalid_argument &e) { + vm_started.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -477,22 +477,22 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) //this->vm_to_shutdown.push(std::pair(vm_name, cs)); - this->things_to_do.push([this, vm_name, cs, &vm_shutdown](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMRunning(vm_name)) { - throw std::invalid_argument("Cannot shutdown VM because it's not running"); - } - auto bm_cs = cloud_cs->getVMComputeService(vm_name); - - this->compute_service_registry.remove(bm_cs->getName()); - cloud_cs->shutdownVM(vm_name); - vm_shutdown.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - vm_shutdown.push(std::pair(false, e.what())); - } catch (std::invalid_argument &e) { - vm_shutdown.push(std::pair(false, e.what())); - } + this->things_to_do.push([this, vm_name, cs, &vm_shutdown]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMRunning(vm_name)) { + throw std::invalid_argument("Cannot shutdown VM because it's not running"); + } + auto bm_cs = cloud_cs->getVMComputeService(vm_name); + + this->compute_service_registry.remove(bm_cs->getName()); + cloud_cs->shutdownVM(vm_name); + vm_shutdown.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_shutdown.push(std::pair(false, e.what())); + } catch (std::invalid_argument &e) { + vm_shutdown.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -525,17 +525,17 @@ namespace wrench { BlockingQueue> vm_destroyed; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([vm_name, cs, &vm_destroyed](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot destroy VM because it's not down"); - } - cloud_cs->destroyVM(vm_name); - vm_destroyed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_destroyed.push(std::pair(false, e.what())); - } + this->things_to_do.push([vm_name, cs, &vm_destroyed]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot destroy VM because it's not down"); + } + cloud_cs->destroyVM(vm_name); + vm_destroyed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_destroyed.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -562,7 +562,6 @@ namespace wrench { // Create the new service auto new_service = SimpleStorageService::createSimpleStorageService(head_host, mount_points, {}, {}); return this->startService(new_service); - } /** @@ -616,13 +615,13 @@ namespace wrench { BlockingQueue> file_looked_up; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([file, ss, &file_looked_up](){ - try { - bool result = ss->lookupFile(file); - file_looked_up.push(std::tuple(true, result, "")); - } catch (std::invalid_argument &e) { - file_looked_up.push(std::tuple(false, false, e.what())); - } + this->things_to_do.push([file, ss, &file_looked_up]() { + try { + bool result = ss->lookupFile(file); + file_looked_up.push(std::tuple(true, result, "")); + } catch (std::invalid_argument &e) { + file_looked_up.push(std::tuple(false, false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -708,7 +707,7 @@ namespace wrench { } BlockingQueue> job_submitted; - this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted](){ + this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted]() { try { WRENCH_INFO("Submitting a job..."); this->job_manager->submitJob(job, cs, service_specific_args); @@ -1036,14 +1035,14 @@ namespace wrench { // Push the request into the blocking queue (will be a single one!) //this->vm_to_suspend.push(std::pair(vm_name, cs)); BlockingQueue> vm_suspended; - this->things_to_do.push([vm_name, cs, &vm_suspended](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - cloud_cs->suspendVM(vm_name); - vm_suspended.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_suspended.push(std::pair(false, e.what())); - } + this->things_to_do.push([vm_name, cs, &vm_suspended]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->suspendVM(vm_name); + vm_suspended.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_suspended.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -1097,14 +1096,14 @@ namespace wrench { BlockingQueue> vm_resumed; // Push the request into the blocking queue (will be a single one!) - this->things_to_do.push([vm_name, cs, &vm_resumed](){ - auto cloud_cs = std::dynamic_pointer_cast(cs); - try { - cloud_cs->resumeVM(vm_name); - vm_resumed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - vm_resumed.push(std::pair(false, e.what())); - } + this->things_to_do.push([vm_name, cs, &vm_resumed]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->resumeVM(vm_name); + vm_resumed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_resumed.push(std::pair(false, e.what())); + } }); // Poll from the shared queue (will be a single one!) @@ -1131,7 +1130,7 @@ namespace wrench { } auto cloud_cs = std::dynamic_pointer_cast(cs); std::vector execution_hosts_list = cloud_cs->getHosts(); - json answer {}; + json answer{}; answer["execution_hosts"] = execution_hosts_list; return answer; } From 052b9c6ae95fe403fe96ad6efd4987e4fb70ce85 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 12 Oct 2023 14:09:35 -1000 Subject: [PATCH 7/7] Removed a few useless simgrid::s4u::Host::by_name() calls --- .../storage/simple/SimpleStorageServiceNonBufferized.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp b/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp index f878a44a34..e5bb66ac24 100755 --- a/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp +++ b/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp @@ -484,8 +484,8 @@ namespace wrench { src_location->toString().c_str(), dst_location->toString().c_str()); - auto src_host = simgrid::s4u::Host::by_name(src_location->getStorageService()->getHostname()); - auto dst_host = simgrid::s4u::Host::by_name(dst_location->getStorageService()->getHostname()); + auto src_host = src_location->getStorageService()->getHost(); + auto dst_host = dst_location->getStorageService()->getHost(); auto src_disk = src_location->getDiskOrNull(); if (src_disk == nullptr) { @@ -577,8 +577,8 @@ namespace wrench { dst_location->toString().c_str()); // TODO: This code is duplicated with the IAmNotTheSource version of this method - auto src_host = simgrid::s4u::Host::by_name(src_location->getStorageService()->getHostname()); - auto dst_host = simgrid::s4u::Host::by_name(dst_location->getStorageService()->getHostname()); + auto src_host = src_location->getStorageService()->getHost(); + auto dst_host = dst_location->getStorageService()->getHost(); auto src_disk = src_location->getDiskOrNull(); if (src_disk == nullptr) {