diff --git a/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp b/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp index 20baecb092..524506f855 100755 --- a/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp +++ b/src/wrench/services/storage/simple/SimpleStorageServiceNonBufferized.cpp @@ -511,8 +511,8 @@ namespace wrench { src_location->toString().c_str(), dst_location->toString().c_str()); - auto src_host = simgrid::s4u::Host::by_name(src_location->getStorageService()->getHostname()); - auto dst_host = simgrid::s4u::Host::by_name(dst_location->getStorageService()->getHostname()); + auto src_host = src_location->getStorageService()->getHost(); + auto dst_host = dst_location->getStorageService()->getHost(); auto src_disk = src_location->getDiskOrNull(); if (src_disk == nullptr) { @@ -604,8 +604,8 @@ namespace wrench { dst_location->toString().c_str()); // TODO: This code is duplicated with the IAmNotTheSource version of this method - auto src_host = simgrid::s4u::Host::by_name(src_location->getStorageService()->getHostname()); - auto dst_host = simgrid::s4u::Host::by_name(dst_location->getStorageService()->getHostname()); + auto src_host = src_location->getStorageService()->getHost(); + auto dst_host = dst_location->getStorageService()->getHost(); auto src_disk = src_location->getDiskOrNull(); if (src_disk == nullptr) { diff --git a/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp b/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp index 2d35983fbe..9a3a059993 100644 --- a/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp +++ b/src/wrench/services/storage/storage_helper_classes/LogicalFileSystem.cpp @@ -269,7 +269,7 @@ namespace wrench { WRENCH_WARN("LogicalFileSystem::reserveSpace(): Space was already being reserved for storing file %s at path %s:%s. " "This is likely a redundant copy, and nothing needs to be done", file->getID().c_str(), this->hostname.c_str(), fixed_path.c_str()); - return true; + return true; } if (this->free_space < file->getSize()) { diff --git a/tools/wrench/wrench-daemon/include/SimulationController.h b/tools/wrench/wrench-daemon/include/SimulationController.h old mode 100755 new mode 100644 index f88534a237..353ea7fd6b --- a/tools/wrench/wrench-daemon/include/SimulationController.h +++ b/tools/wrench/wrench-daemon/include/SimulationController.h @@ -117,6 +117,9 @@ namespace wrench { json getVMComputeService(json data); private: + template + json startService(T *s); + // Thread-safe key value stores KeyValueStore> job_registry; KeyValueStore> compute_service_registry; @@ -126,31 +129,9 @@ namespace wrench { // Thread-safe queues for the server thread and the simulation thread to communicate BlockingQueue>> event_queue; - BlockingQueue compute_services_to_start; - BlockingQueue storage_services_to_start; - BlockingQueue file_service_to_start; BlockingQueue, std::shared_ptr, std::map>> submissions_to_do; - BlockingQueue, std::shared_ptr>> vm_to_create; - BlockingQueue> vm_created; - - BlockingQueue>> vm_to_start; - BlockingQueue> vm_started; - - BlockingQueue>> vm_to_shutdown; - BlockingQueue> vm_shutdown; - - BlockingQueue>> vm_to_destroy; - BlockingQueue> vm_destroyed; - - BlockingQueue, std::shared_ptr>> file_to_lookup; - BlockingQueue> file_looked_up; - - BlockingQueue>> vm_to_suspend; - BlockingQueue> vm_suspended; - - BlockingQueue>> vm_to_resume; - BlockingQueue> vm_resumed; + BlockingQueue> things_to_do; // The two managers std::shared_ptr job_manager; diff --git a/tools/wrench/wrench-daemon/src/SimulationController.cpp b/tools/wrench/wrench-daemon/src/SimulationController.cpp index f728742266..0ed6464909 100644 --- a/tools/wrench/wrench-daemon/src/SimulationController.cpp +++ b/tools/wrench/wrench-daemon/src/SimulationController.cpp @@ -20,6 +20,26 @@ WRENCH_LOG_CATEGORY(simulation_controller, "Log category for SimulationController"); +#define PARSE_SERVICE_PROPERTY_LIST() \ + WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \ + { \ + json jsonData = json::parse(property_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto property_key = ServiceProperty::translateString(it.key()); \ + service_property_list[property_key] = it.value(); \ + } \ + } + +#define PARSE_MESSAGE_PAYLOAD_LIST() \ + WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \ + { \ + json jsonData = json::parse(message_payload_list_string); \ + for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { \ + auto message_payload_key = ServiceMessagePayload::translateString(it.key()); \ + service_message_payload_list[message_payload_key] = it.value(); \ + } \ + } + namespace wrench { /** @@ -31,6 +51,46 @@ namespace wrench { std::shared_ptr workflow, const std::string &hostname, int sleep_us) : ExecutionController(hostname, "SimulationController"), workflow(workflow), sleep_us(sleep_us) {} + + template + json SimulationController::startService(T *s) { + BlockingQueue> s_created; + + this->things_to_do.push([this, s, &s_created]() { + try { + auto new_service_shared_ptr = this->simulation->startNewService(s); + if (auto cs = std::dynamic_pointer_cast(new_service_shared_ptr)) { + WRENCH_INFO("Started a new compute service"); + this->compute_service_registry.insert(new_service_shared_ptr->getName(), cs); + } else if (auto ss = std::dynamic_pointer_cast(new_service_shared_ptr)) { + WRENCH_INFO("Started a new storage service"); + this->storage_service_registry.insert(new_service_shared_ptr->getName(), ss); + } else if (auto fs = std::dynamic_pointer_cast(new_service_shared_ptr)) { + WRENCH_INFO("Started a new storage service"); + this->file_service_registry.insert(new_service_shared_ptr->getName(), fs); + } else { + throw std::runtime_error("SimulationController::startNewService(): Unknown service type"); + } + s_created.push(std::pair(true, "")); + } catch (ExecutionException &e) { + s_created.push(std::pair(false, e.getCause()->toString())); + } + }); + + // Poll from the shared queue (will be a single one!) + std::pair reply; + s_created.waitAndPop(reply); + bool success = std::get<0>(reply); + if (not success) { + std::string error_msg = std::get<1>(reply); + throw std::runtime_error("Cannot start Service: " + error_msg); + } else { + json answer; + answer["service_name"] = s->getName(); + return answer; + } + } + /** * @brief Simulation execution_controller's main method * @@ -48,143 +108,13 @@ namespace wrench { // Main control loop while (keep_going) { + // Starting compute and storage services that should be started, if any while (true) { - wrench::ComputeService *new_compute_service = nullptr; - wrench::StorageService *new_storage_service = nullptr; - wrench::FileRegistryService *new_file_service = nullptr; - std::pair, std::shared_ptr> spec_vm_to_create; - std::pair> vm_id; - std::pair, std::shared_ptr> file_lookup_request; - - if (this->compute_services_to_start.tryPop(new_compute_service)) { - // starting compute service - WRENCH_INFO("Starting a new compute service..."); - auto new_service_shared_ptr = this->simulation->startNewService(new_compute_service); - // Add the new service to the registry of existing services, so that later we can look it up by name - this->compute_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr); - - } else if (this->storage_services_to_start.tryPop(new_storage_service)) { - // starting storage service - WRENCH_INFO("Starting a new storage service..."); - auto new_service_shared_ptr = this->simulation->startNewService(new_storage_service); - // Add the new service to the registry of existing services, so that later we can look it up by name - this->storage_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr); - - } else if (this->file_service_to_start.tryPop(new_file_service)) { - // start file registry service - WRENCH_INFO("Starting a new file registry service..."); - auto new_service_shared_ptr = this->simulation->startNewService(new_file_service); - // Add the new service to the registry of existing services, so that later we can look it up by name - this->file_service_registry.insert(new_service_shared_ptr->getName(), new_service_shared_ptr); - - } else if (this->vm_to_create.tryPop(spec_vm_to_create)) { - auto cloud_cs = std::dynamic_pointer_cast(spec_vm_to_create.second); - auto num_cores = std::get<0>(spec_vm_to_create.first); - auto ram_size = std::get<1>(spec_vm_to_create.first); - auto properties = std::get<2>(spec_vm_to_create.first); - auto payloads = std::get<3>(spec_vm_to_create.first); - std::string vm_name; - try { - vm_name = cloud_cs->createVM(num_cores, ram_size, properties, payloads); - this->vm_created.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - this->vm_created.push(std::pair(false, e.getCause()->toString())); - } - - } else if (this->vm_to_start.tryPop(vm_id)) { - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot start VM because it's not down"); - } - auto bm_cs = cloud_cs->startVM(vm_name); - this->compute_service_registry.insert(bm_cs->getName(), bm_cs); - this->vm_started.push(std::pair(true, bm_cs->getName())); - } catch (ExecutionException &e) { - this->vm_created.push(std::pair(false, e.getCause()->toString())); - } catch (std::invalid_argument &e) { - this->vm_started.push(std::pair(false, e.what())); - } - - } else if (this->vm_to_shutdown.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - if (not cloud_cs->isVMRunning(vm_name)) { - throw std::invalid_argument("Cannot shutdown VM because it's not running"); - } - auto bm_cs = cloud_cs->getVMComputeService(vm_name); - - this->compute_service_registry.remove(bm_cs->getName()); - cloud_cs->shutdownVM(vm_name); - this->vm_shutdown.push(std::pair(true, vm_name)); - } catch (ExecutionException &e) { - this->vm_shutdown.push(std::pair(false, e.what())); - } catch (std::invalid_argument &e) { - this->vm_shutdown.push(std::pair(false, e.what())); - } - - } else if (this->vm_to_destroy.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - if (not cloud_cs->isVMDown(vm_name)) { - throw std::invalid_argument("Cannot destroy VM because it's not down"); - } - cloud_cs->destroyVM(vm_name); - this->vm_destroyed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - this->vm_destroyed.push(std::pair(false, e.what())); - } - - } else if (this->file_to_lookup.tryPop(file_lookup_request)) { - - auto file = file_lookup_request.first; - auto ss = file_lookup_request.second; - try { - bool result = ss->lookupFile(file); - this->file_looked_up.push(std::tuple(true, result, "")); - } catch (std::invalid_argument &e) { - this->file_looked_up.push(std::tuple(false, false, e.what())); - } - - } else if (this->vm_to_suspend.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - cloud_cs->suspendVM(vm_name); - this->vm_suspended.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - this->vm_suspended.push(std::pair(false, e.what())); - } - - } else if (this->vm_to_resume.tryPop(vm_id)) { - - auto cloud_cs = std::dynamic_pointer_cast(vm_id.second); - auto vm_name = vm_id.first; - try { - cloud_cs->resumeVM(vm_name); - this->vm_resumed.push(std::pair(true, vm_name)); - } catch (std::invalid_argument &e) { - this->vm_resumed.push(std::pair(false, e.what())); - } + std::function thing_to_do; - } else { - break; - } - } - - // Submit jobs that should be submitted - while (true) { - std::tuple, std::shared_ptr, std::map> submission_to_do; - if (this->submissions_to_do.tryPop(submission_to_do)) { - WRENCH_INFO("Submitting a job..."); - this->job_manager->submitJob(std::get<0>(submission_to_do), std::get<1>(submission_to_do), std::get<2>(submission_to_do)); + if (this->things_to_do.tryPop(thing_to_do)) { + thing_to_do(); } else { break; } @@ -365,8 +295,12 @@ namespace wrench { std::string head_host = data["head_host"]; std::string resource = data["resources"]; std::string scratch_space = data["scratch_space"]; - std::string property_list = data["property_list"]; - std::string message_payload_list = data["message_payload_list"]; + std::string property_list_string = data["property_list"]; + std::string message_payload_list_string = data["message_payload_list"]; + + PARSE_SERVICE_PROPERTY_LIST() + + PARSE_MESSAGE_PAYLOAD_LIST() map> resources; json jsonData = json::parse(resource); @@ -374,32 +308,11 @@ namespace wrench { resources.emplace(it.key(), it.value()); } - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - jsonData = json::parse(property_list); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } - - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } - // Create the new service auto new_service = new BareMetalComputeService(head_host, resources, scratch_space, service_property_list, service_message_payload_list); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->compute_services_to_start.push(new_service); - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; + return this->startService(new_service); } /** @@ -414,34 +327,17 @@ namespace wrench { std::string property_list_string = data["property_list"]; std::string message_payload_list_string = data["message_payload_list"]; - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - json jsonData = json::parse(property_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } + PARSE_SERVICE_PROPERTY_LIST() - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } + PARSE_MESSAGE_PAYLOAD_LIST() // Create the new service auto new_service = new CloudComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->compute_services_to_start.push(new_service); - - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; + return this->startService(new_service); } + /** * REST API Handler * @param data JSON input @@ -454,32 +350,14 @@ namespace wrench { std::string property_list_string = data["property_list"]; std::string message_payload_list_string = data["message_payload_list"]; - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - json jsonData = json::parse(property_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } + PARSE_SERVICE_PROPERTY_LIST() - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } + PARSE_MESSAGE_PAYLOAD_LIST() // Create the new service auto new_service = new BatchComputeService(hostname, resources, scratch_space, service_property_list, service_message_payload_list); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->compute_services_to_start.push(new_service); - - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; + return this->startService(new_service); } /** @@ -495,19 +373,9 @@ namespace wrench { std::string property_list_string = data["property_list"]; std::string message_payload_list_string = data["message_payload_list"]; - WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; - json jsonData = json::parse(property_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto property_key = ServiceProperty::translateString(it.key()); - service_property_list[property_key] = it.value(); - } + PARSE_SERVICE_PROPERTY_LIST() - WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; - jsonData = json::parse(message_payload_list_string); - for (auto it = jsonData.cbegin(); it != jsonData.cend(); ++it) { - auto message_payload_key = ServiceMessagePayload::translateString(it.key()); - service_message_payload_list[message_payload_key] = it.value(); - } + PARSE_MESSAGE_PAYLOAD_LIST() // Lookup the cloud compute service std::shared_ptr cs; @@ -515,18 +383,23 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_created; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_create.push( - std::pair( - std::tuple(num_cores, - ram_memory, - service_property_list, - service_message_payload_list), - cs)); - - // Pool from the shared queue (will be a single one!) + this->things_to_do.push([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + std::string vm_name; + try { + vm_name = cloud_cs->createVM(num_cores, ram_memory, service_property_list, service_message_payload_list); + vm_created.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_created.push(std::pair(false, e.getCause()->toString())); + } + }); + + // Poll from the shared queue (will be a single one!) std::pair reply; - this->vm_created.waitAndPop(reply); + vm_created.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -553,12 +426,27 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_started; // Push the request into the blocking queue (will be a single one!) - this->vm_to_start.push(std::pair(vm_name, cs)); + this->things_to_do.push([this, vm_name, cs, &vm_started]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot start VM because it's not down"); + } + auto bm_cs = cloud_cs->startVM(vm_name); + this->compute_service_registry.insert(bm_cs->getName(), bm_cs); + vm_started.push(std::pair(true, bm_cs->getName())); + } catch (ExecutionException &e) { + vm_started.push(std::pair(false, e.getCause()->toString())); + } catch (std::invalid_argument &e) { + vm_started.push(std::pair(false, e.what())); + } + }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; - this->vm_started.waitAndPop(reply); + vm_started.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -585,12 +473,31 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_shutdown; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_shutdown.push(std::pair(vm_name, cs)); + //this->vm_to_shutdown.push(std::pair(vm_name, cs)); + this->things_to_do.push([this, vm_name, cs, &vm_shutdown]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMRunning(vm_name)) { + throw std::invalid_argument("Cannot shutdown VM because it's not running"); + } + auto bm_cs = cloud_cs->getVMComputeService(vm_name); + + this->compute_service_registry.remove(bm_cs->getName()); + cloud_cs->shutdownVM(vm_name); + vm_shutdown.push(std::pair(true, vm_name)); + } catch (ExecutionException &e) { + vm_shutdown.push(std::pair(false, e.what())); + } catch (std::invalid_argument &e) { + vm_shutdown.push(std::pair(false, e.what())); + } + }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; - this->vm_shutdown.waitAndPop(reply); + vm_shutdown.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -615,12 +522,25 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_destroyed; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_destroy.push(std::pair(vm_name, cs)); + this->things_to_do.push([vm_name, cs, &vm_destroyed]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + if (not cloud_cs->isVMDown(vm_name)) { + throw std::invalid_argument("Cannot destroy VM because it's not down"); + } + cloud_cs->destroyVM(vm_name); + vm_destroyed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_destroyed.push(std::pair(false, e.what())); + } + }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; - this->vm_destroyed.waitAndPop(reply); + vm_destroyed.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -641,15 +561,7 @@ namespace wrench { // Create the new service auto new_service = SimpleStorageService::createSimpleStorageService(head_host, mount_points, {}, {}); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->storage_services_to_start.push(new_service); - - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; + return this->startService(new_service); } /** @@ -700,12 +612,21 @@ namespace wrench { throw std::runtime_error("Unknown file " + filename); } + BlockingQueue> file_looked_up; + // Push the request into the blocking queue (will be a single one!) - this->file_to_lookup.push(std::pair(file, ss)); + this->things_to_do.push([file, ss, &file_looked_up]() { + try { + bool result = ss->lookupFile(file); + file_looked_up.push(std::tuple(true, result, "")); + } catch (std::invalid_argument &e) { + file_looked_up.push(std::tuple(false, false, e.what())); + } + }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::tuple reply; - this->file_looked_up.waitAndPop(reply); + file_looked_up.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<2>(reply); @@ -727,15 +648,8 @@ namespace wrench { // Create the new service auto new_service = new FileRegistryService(head_host, {}, {}); - // Put in the list of services to start (this is because this method is called - // by the server thread, and therefore, it will segfault horribly if it calls any - // SimGrid simulation methods, e.g., to start a service) - this->file_service_to_start.push(new_service); - // Return the expected answer - json answer; - answer["service_name"] = new_service->getName(); - return answer; + return this->startService(new_service); } /** @@ -792,8 +706,26 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } - this->submissions_to_do.push(std::tuple(job, cs, service_specific_args)); - return {}; + BlockingQueue> job_submitted; + this->things_to_do.push([this, job, cs, service_specific_args, &job_submitted]() { + try { + WRENCH_INFO("Submitting a job..."); + this->job_manager->submitJob(job, cs, service_specific_args); + job_submitted.push(std::make_pair(true, "")); + } catch (std::exception &e) { + job_submitted.push(std::make_pair(false, e.what())); + } + }); + // Poll from the shared queue (will be a single one!) + std::pair reply; + job_submitted.waitAndPop(reply); + bool success = std::get<0>(reply); + if (not success) { + std::string error_msg = std::get<1>(reply); + throw std::runtime_error("Cannot submit job: " + error_msg); + } else { + return {}; + } } /** @@ -1101,11 +1033,21 @@ namespace wrench { } // Push the request into the blocking queue (will be a single one!) - this->vm_to_suspend.push(std::pair(vm_name, cs)); + //this->vm_to_suspend.push(std::pair(vm_name, cs)); + BlockingQueue> vm_suspended; + this->things_to_do.push([vm_name, cs, &vm_suspended]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->suspendVM(vm_name); + vm_suspended.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_suspended.push(std::pair(false, e.what())); + } + }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; - this->vm_suspended.waitAndPop(reply); + vm_suspended.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); @@ -1151,16 +1093,26 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } + BlockingQueue> vm_resumed; + // Push the request into the blocking queue (will be a single one!) - this->vm_to_resume.push(std::pair(vm_name, cs)); + this->things_to_do.push([vm_name, cs, &vm_resumed]() { + auto cloud_cs = std::dynamic_pointer_cast(cs); + try { + cloud_cs->resumeVM(vm_name); + vm_resumed.push(std::pair(true, vm_name)); + } catch (std::invalid_argument &e) { + vm_resumed.push(std::pair(false, e.what())); + } + }); - // Pool from the shared queue (will be a single one!) + // Poll from the shared queue (will be a single one!) std::pair reply; - this->vm_resumed.waitAndPop(reply); + vm_resumed.waitAndPop(reply); bool success = std::get<0>(reply); if (not success) { std::string error_msg = std::get<1>(reply); - throw std::runtime_error("Cannot suspend VM: " + error_msg); + throw std::runtime_error("Cannot resume VM: " + error_msg); } else { return {}; } @@ -1177,7 +1129,7 @@ namespace wrench { throw std::runtime_error("Unknown compute service " + cs_name); } auto cloud_cs = std::dynamic_pointer_cast(cs); - std::vector execution_hosts_list = cloud_cs->getExecutionHosts(); + std::vector execution_hosts_list = cloud_cs->getHosts(); json answer{}; answer["execution_hosts"] = execution_hosts_list; return answer;