20
20
21
21
WRENCH_LOG_CATEGORY (simulation_controller, " Log category for SimulationController" );
22
22
23
- #define PARSE_SERVICE_PROPERTY_LIST () WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \
24
- { \
25
- json jsonData = json::parse (property_list_string); \
26
- for (auto it = jsonData.cbegin (); it != jsonData.cend (); ++it) { \
27
- auto property_key = ServiceProperty::translateString (it.key ()); \
28
- service_property_list[property_key] = it.value (); \
29
- } \
30
- }
31
-
32
- #define PARSE_MESSAGE_PAYLOAD_LIST () WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \
33
- { \
34
- json jsonData = json::parse (message_payload_list_string); \
35
- for (auto it = jsonData.cbegin (); it != jsonData.cend (); ++it) { \
36
- auto message_payload_key = ServiceMessagePayload::translateString (it.key ()); \
37
- service_message_payload_list[message_payload_key] = it.value (); \
38
- } \
39
- }
23
+ #define PARSE_SERVICE_PROPERTY_LIST () \
24
+ WRENCH_PROPERTY_COLLECTION_TYPE service_property_list; \
25
+ { \
26
+ json jsonData = json::parse (property_list_string); \
27
+ for (auto it = jsonData.cbegin (); it != jsonData.cend (); ++it) { \
28
+ auto property_key = ServiceProperty::translateString (it.key ()); \
29
+ service_property_list[property_key] = it.value (); \
30
+ } \
31
+ }
32
+
33
+ #define PARSE_MESSAGE_PAYLOAD_LIST () \
34
+ WRENCH_MESSAGE_PAYLOADCOLLECTION_TYPE service_message_payload_list; \
35
+ { \
36
+ json jsonData = json::parse (message_payload_list_string); \
37
+ for (auto it = jsonData.cbegin (); it != jsonData.cend (); ++it) { \
38
+ auto message_payload_key = ServiceMessagePayload::translateString (it.key ()); \
39
+ service_message_payload_list[message_payload_key] = it.value (); \
40
+ } \
41
+ }
40
42
41
43
namespace wrench {
42
44
@@ -50,11 +52,11 @@ namespace wrench {
50
52
const std::string &hostname, int sleep_us) : ExecutionController(hostname, " SimulationController" ), workflow(workflow), sleep_us(sleep_us) {}
51
53
52
54
53
- template <class T >
55
+ template <class T >
54
56
json SimulationController::startService (T *s) {
55
57
BlockingQueue<std::pair<bool , std::string>> s_created;
56
58
57
- this ->things_to_do .push ([this , s, &s_created](){
59
+ this ->things_to_do .push ([this , s, &s_created]() {
58
60
try {
59
61
auto new_service_shared_ptr = this ->simulation ->startNewService (s);
60
62
if (auto cs = std::dynamic_pointer_cast<wrench::ComputeService>(new_service_shared_ptr)) {
@@ -106,10 +108,10 @@ namespace wrench {
106
108
107
109
// Main control loop
108
110
while (keep_going) {
109
-
111
+
110
112
// Starting compute and storage services that should be started, if any
111
113
while (true ) {
112
- std::function<void ()> thing_to_do;
114
+ std::function<void ()> thing_to_do;
113
115
114
116
if (this ->things_to_do .tryPop (thing_to_do)) {
115
117
thing_to_do ();
@@ -128,7 +130,7 @@ namespace wrench {
128
130
// Moves time forward if needed (because the client has done a sleep),
129
131
// And then add all events that occurred to the event queue
130
132
double time_to_sleep = std::max<double >(0 , time_horizon_to_reach -
131
- wrench::Simulation::getCurrentSimulatedDate ());
133
+ wrench::Simulation::getCurrentSimulatedDate ());
132
134
if (time_to_sleep > 0.0 ) {
133
135
WRENCH_INFO (" Sleeping %.2lf seconds" , time_to_sleep);
134
136
S4U_Simulation::sleep (time_to_sleep);
@@ -333,7 +335,6 @@ namespace wrench {
333
335
auto new_service = new CloudComputeService (hostname, resources, scratch_space,
334
336
service_property_list, service_message_payload_list);
335
337
return this ->startService <wrench::ComputeService>(new_service);
336
-
337
338
}
338
339
339
340
@@ -357,7 +358,6 @@ namespace wrench {
357
358
auto new_service = new BatchComputeService (hostname, resources, scratch_space,
358
359
service_property_list, service_message_payload_list);
359
360
return this ->startService <wrench::ComputeService>(new_service);
360
-
361
361
}
362
362
363
363
/* *
@@ -386,15 +386,15 @@ namespace wrench {
386
386
BlockingQueue<std::pair<bool , std::string>> vm_created;
387
387
388
388
// Push the request into the blocking queue (will be a single one!)
389
- this ->things_to_do .push ([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created](){
390
- auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
391
- std::string vm_name;
392
- try {
393
- vm_name = cloud_cs->createVM (num_cores, ram_memory, service_property_list, service_message_payload_list);
394
- vm_created.push (std::pair (true , vm_name));
395
- } catch (ExecutionException &e) {
396
- vm_created.push (std::pair (false , e.getCause ()->toString ()));
397
- }
389
+ this ->things_to_do .push ([num_cores, ram_memory, service_property_list, service_message_payload_list, cs, &vm_created]() {
390
+ auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
391
+ std::string vm_name;
392
+ try {
393
+ vm_name = cloud_cs->createVM (num_cores, ram_memory, service_property_list, service_message_payload_list);
394
+ vm_created.push (std::pair (true , vm_name));
395
+ } catch (ExecutionException &e) {
396
+ vm_created.push (std::pair (false , e.getCause ()->toString ()));
397
+ }
398
398
});
399
399
400
400
// Poll from the shared queue (will be a single one!)
@@ -428,20 +428,20 @@ namespace wrench {
428
428
429
429
BlockingQueue<std::pair<bool , std::string>> vm_started;
430
430
// Push the request into the blocking queue (will be a single one!)
431
- this ->things_to_do .push ([this , vm_name, cs, &vm_started](){
432
- auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
433
- try {
434
- if (not cloud_cs->isVMDown (vm_name)) {
435
- throw std::invalid_argument (" Cannot start VM because it's not down" );
436
- }
437
- auto bm_cs = cloud_cs->startVM (vm_name);
438
- this ->compute_service_registry .insert (bm_cs->getName (), bm_cs);
439
- vm_started.push (std::pair (true , bm_cs->getName ()));
440
- } catch (ExecutionException &e) {
441
- vm_started.push (std::pair (false , e.getCause ()->toString ()));
442
- } catch (std::invalid_argument &e) {
443
- vm_started.push (std::pair (false , e.what ()));
444
- }
431
+ this ->things_to_do .push ([this , vm_name, cs, &vm_started]() {
432
+ auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
433
+ try {
434
+ if (not cloud_cs->isVMDown (vm_name)) {
435
+ throw std::invalid_argument (" Cannot start VM because it's not down" );
436
+ }
437
+ auto bm_cs = cloud_cs->startVM (vm_name);
438
+ this ->compute_service_registry .insert (bm_cs->getName (), bm_cs);
439
+ vm_started.push (std::pair (true , bm_cs->getName ()));
440
+ } catch (ExecutionException &e) {
441
+ vm_started.push (std::pair (false , e.getCause ()->toString ()));
442
+ } catch (std::invalid_argument &e) {
443
+ vm_started.push (std::pair (false , e.what ()));
444
+ }
445
445
});
446
446
447
447
// Poll from the shared queue (will be a single one!)
@@ -477,22 +477,22 @@ namespace wrench {
477
477
478
478
// Push the request into the blocking queue (will be a single one!)
479
479
// this->vm_to_shutdown.push(std::pair(vm_name, cs));
480
- this ->things_to_do .push ([this , vm_name, cs, &vm_shutdown](){
481
- auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
482
- try {
483
- if (not cloud_cs->isVMRunning (vm_name)) {
484
- throw std::invalid_argument (" Cannot shutdown VM because it's not running" );
485
- }
486
- auto bm_cs = cloud_cs->getVMComputeService (vm_name);
487
-
488
- this ->compute_service_registry .remove (bm_cs->getName ());
489
- cloud_cs->shutdownVM (vm_name);
490
- vm_shutdown.push (std::pair (true , vm_name));
491
- } catch (ExecutionException &e) {
492
- vm_shutdown.push (std::pair (false , e.what ()));
493
- } catch (std::invalid_argument &e) {
494
- vm_shutdown.push (std::pair (false , e.what ()));
495
- }
480
+ this ->things_to_do .push ([this , vm_name, cs, &vm_shutdown]() {
481
+ auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
482
+ try {
483
+ if (not cloud_cs->isVMRunning (vm_name)) {
484
+ throw std::invalid_argument (" Cannot shutdown VM because it's not running" );
485
+ }
486
+ auto bm_cs = cloud_cs->getVMComputeService (vm_name);
487
+
488
+ this ->compute_service_registry .remove (bm_cs->getName ());
489
+ cloud_cs->shutdownVM (vm_name);
490
+ vm_shutdown.push (std::pair (true , vm_name));
491
+ } catch (ExecutionException &e) {
492
+ vm_shutdown.push (std::pair (false , e.what ()));
493
+ } catch (std::invalid_argument &e) {
494
+ vm_shutdown.push (std::pair (false , e.what ()));
495
+ }
496
496
});
497
497
498
498
// Poll from the shared queue (will be a single one!)
@@ -525,17 +525,17 @@ namespace wrench {
525
525
BlockingQueue<std::pair<bool , std::string>> vm_destroyed;
526
526
527
527
// Push the request into the blocking queue (will be a single one!)
528
- this ->things_to_do .push ([vm_name, cs, &vm_destroyed](){
529
- auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
530
- try {
531
- if (not cloud_cs->isVMDown (vm_name)) {
532
- throw std::invalid_argument (" Cannot destroy VM because it's not down" );
533
- }
534
- cloud_cs->destroyVM (vm_name);
535
- vm_destroyed.push (std::pair (true , vm_name));
536
- } catch (std::invalid_argument &e) {
537
- vm_destroyed.push (std::pair (false , e.what ()));
538
- }
528
+ this ->things_to_do .push ([vm_name, cs, &vm_destroyed]() {
529
+ auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
530
+ try {
531
+ if (not cloud_cs->isVMDown (vm_name)) {
532
+ throw std::invalid_argument (" Cannot destroy VM because it's not down" );
533
+ }
534
+ cloud_cs->destroyVM (vm_name);
535
+ vm_destroyed.push (std::pair (true , vm_name));
536
+ } catch (std::invalid_argument &e) {
537
+ vm_destroyed.push (std::pair (false , e.what ()));
538
+ }
539
539
});
540
540
541
541
// Poll from the shared queue (will be a single one!)
@@ -562,7 +562,6 @@ namespace wrench {
562
562
// Create the new service
563
563
auto new_service = SimpleStorageService::createSimpleStorageService (head_host, mount_points, {}, {});
564
564
return this ->startService <wrench::StorageService>(new_service);
565
-
566
565
}
567
566
568
567
/* *
@@ -616,13 +615,13 @@ namespace wrench {
616
615
BlockingQueue<std::tuple<bool , bool , std::string>> file_looked_up;
617
616
618
617
// Push the request into the blocking queue (will be a single one!)
619
- this ->things_to_do .push ([file, ss, &file_looked_up](){
620
- try {
621
- bool result = ss->lookupFile (file);
622
- file_looked_up.push (std::tuple (true , result, " " ));
623
- } catch (std::invalid_argument &e) {
624
- file_looked_up.push (std::tuple (false , false , e.what ()));
625
- }
618
+ this ->things_to_do .push ([file, ss, &file_looked_up]() {
619
+ try {
620
+ bool result = ss->lookupFile (file);
621
+ file_looked_up.push (std::tuple (true , result, " " ));
622
+ } catch (std::invalid_argument &e) {
623
+ file_looked_up.push (std::tuple (false , false , e.what ()));
624
+ }
626
625
});
627
626
628
627
// Poll from the shared queue (will be a single one!)
@@ -708,7 +707,7 @@ namespace wrench {
708
707
}
709
708
710
709
BlockingQueue<std::pair<bool , std::string>> job_submitted;
711
- this ->things_to_do .push ([this , job, cs, service_specific_args, &job_submitted](){
710
+ this ->things_to_do .push ([this , job, cs, service_specific_args, &job_submitted]() {
712
711
try {
713
712
WRENCH_INFO (" Submitting a job..." );
714
713
this ->job_manager ->submitJob (job, cs, service_specific_args);
@@ -1036,14 +1035,14 @@ namespace wrench {
1036
1035
// Push the request into the blocking queue (will be a single one!)
1037
1036
// this->vm_to_suspend.push(std::pair(vm_name, cs));
1038
1037
BlockingQueue<std::pair<bool , std::string>> vm_suspended;
1039
- this ->things_to_do .push ([vm_name, cs, &vm_suspended](){
1040
- auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
1041
- try {
1042
- cloud_cs->suspendVM (vm_name);
1043
- vm_suspended.push (std::pair (true , vm_name));
1044
- } catch (std::invalid_argument &e) {
1045
- vm_suspended.push (std::pair (false , e.what ()));
1046
- }
1038
+ this ->things_to_do .push ([vm_name, cs, &vm_suspended]() {
1039
+ auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
1040
+ try {
1041
+ cloud_cs->suspendVM (vm_name);
1042
+ vm_suspended.push (std::pair (true , vm_name));
1043
+ } catch (std::invalid_argument &e) {
1044
+ vm_suspended.push (std::pair (false , e.what ()));
1045
+ }
1047
1046
});
1048
1047
1049
1048
// Poll from the shared queue (will be a single one!)
@@ -1097,14 +1096,14 @@ namespace wrench {
1097
1096
BlockingQueue<std::pair<bool , std::string>> vm_resumed;
1098
1097
1099
1098
// Push the request into the blocking queue (will be a single one!)
1100
- this ->things_to_do .push ([vm_name, cs, &vm_resumed](){
1101
- auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
1102
- try {
1103
- cloud_cs->resumeVM (vm_name);
1104
- vm_resumed.push (std::pair (true , vm_name));
1105
- } catch (std::invalid_argument &e) {
1106
- vm_resumed.push (std::pair (false , e.what ()));
1107
- }
1099
+ this ->things_to_do .push ([vm_name, cs, &vm_resumed]() {
1100
+ auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
1101
+ try {
1102
+ cloud_cs->resumeVM (vm_name);
1103
+ vm_resumed.push (std::pair (true , vm_name));
1104
+ } catch (std::invalid_argument &e) {
1105
+ vm_resumed.push (std::pair (false , e.what ()));
1106
+ }
1108
1107
});
1109
1108
1110
1109
// Poll from the shared queue (will be a single one!)
@@ -1131,7 +1130,7 @@ namespace wrench {
1131
1130
}
1132
1131
auto cloud_cs = std::dynamic_pointer_cast<CloudComputeService>(cs);
1133
1132
std::vector<std::string> execution_hosts_list = cloud_cs->getHosts ();
1134
- json answer {};
1133
+ json answer{};
1135
1134
answer[" execution_hosts" ] = execution_hosts_list;
1136
1135
return answer;
1137
1136
}
0 commit comments