Skip to content

Commit

Permalink
feat: fail interactive task callback
Browse files Browse the repository at this point in the history
  • Loading branch information
NamelessOIer committed Aug 8, 2024
1 parent e979a09 commit 83b7d45
Showing 1 changed file with 58 additions and 35 deletions.
93 changes: 58 additions & 35 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,9 +1083,13 @@ void TaskScheduler::ScheduleThread_() {
for (auto& task_id : dep_fail_task_vec) {
auto it = m_pending_task_map_.find(task_id);
if (it != m_pending_task_map_.end()) {
it->second->SetStatus(crane::grpc::Failed);
failed_task_raw_ptrs.emplace_back(it->second.get());
failed_tasks.emplace_back(std::move(it->second));
auto& task = it->second;
task->SetStatus(crane::grpc::Failed);
if (task->type != crane::grpc::Batch) {
// TODO: Add crun and calloc callback here!
}
failed_task_raw_ptrs.emplace_back(task.get());
failed_tasks.emplace_back(std::move(task));
m_pending_task_map_.erase(it);
}
}
Expand All @@ -1098,21 +1102,24 @@ void TaskScheduler::ScheduleThread_() {
}
for (auto& [task_id, dep_ids] : dependencies) {
auto it = m_pending_task_map_.find(task_id);
if (it != m_pending_task_map_.end() &&
it->second->HasDependency()) {
it->second->DependencyAdd(dep_ids);
if (it->second->NoWaitingDependency()) {
if (it->second->dependencies.depend_all()) {
it->second->SetDependencyOK();
auto& task = it->second;
if (it != m_pending_task_map_.end() && task->HasDependency()) {
task->DependencyAdd(dep_ids);
if (task->NoWaitingDependency()) {
if (task->dependencies.depend_all()) {
task->SetDependencyOK();
} else {
it->second->SetStatus(crane::grpc::Failed);
failed_task_raw_ptrs.emplace_back(it->second.get());
failed_tasks.emplace_back(std::move(it->second));
task->SetStatus(crane::grpc::Failed);
if (task->type != crane::grpc::Batch) {
// TODO: Add crun and calloc callback here!
}
failed_task_raw_ptrs.emplace_back(task.get());
failed_tasks.emplace_back(std::move(task));
m_pending_task_map_.erase(it);
continue;
}
}
updated_task_raw_ptrs.emplace_back(it->second.get());
updated_task_raw_ptrs.emplace_back(task.get());
}
}
}
Expand Down Expand Up @@ -1610,9 +1617,13 @@ void TaskScheduler::CleanCancelQueueCb_() {
for (auto& task_id : dep_fail_task_vec) {
auto it = m_pending_task_map_.find(task_id);
if (it != m_pending_task_map_.end()) {
it->second->SetStatus(crane::grpc::Failed);
task_raw_ptr_vec.emplace_back(it->second.get());
task_ptr_vec.emplace_back(std::move(it->second));
auto& task = it->second;
task->SetStatus(crane::grpc::Failed);
if (task->type != crane::grpc::Batch) {
// TODO: Add crun and calloc callback here!
}
task_raw_ptr_vec.emplace_back(task.get());
task_ptr_vec.emplace_back(std::move(task));
m_pending_task_map_.erase(it);
}
}
Expand All @@ -1626,19 +1637,23 @@ void TaskScheduler::CleanCancelQueueCb_() {
for (auto& [task_id, dep_ids] : dependencies) {
auto it = m_pending_task_map_.find(task_id);
if (it != m_pending_task_map_.end() && it->second->HasDependency()) {
it->second->DependencyAdd(dep_ids);
if (it->second->NoWaitingDependency()) {
if (it->second->dependencies.depend_all()) {
it->second->SetDependencyOK();
auto& task = it->second;
task->DependencyAdd(dep_ids);
if (task->NoWaitingDependency()) {
if (task->dependencies.depend_all()) {
task->SetDependencyOK();
} else {
it->second->SetStatus(crane::grpc::Failed);
task_raw_ptr_vec.emplace_back(it->second.get());
task_ptr_vec.emplace_back(std::move(it->second));
task->SetStatus(crane::grpc::Failed);
if (task->type != crane::grpc::Batch) {
// TODO: Add crun and calloc callback here!
}
task_raw_ptr_vec.emplace_back(task.get());
task_ptr_vec.emplace_back(std::move(task));
m_pending_task_map_.erase(it);
continue;
}
}
updated_raw_ptr_vec.emplace_back(it->second.get());
updated_raw_ptr_vec.emplace_back(task.get());
}
}
}
Expand Down Expand Up @@ -1926,9 +1941,13 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {
for (auto& task_id : dep_fail_task_vec) {
auto it = m_pending_task_map_.find(task_id);
if (it != m_pending_task_map_.end()) {
it->second->SetStatus(crane::grpc::Failed);
task_raw_ptr_vec.emplace_back(it->second.get());
task_ptr_vec.emplace_back(std::move(it->second));
auto& task = it->second;
task->SetStatus(crane::grpc::Failed);
if (task->type != crane::grpc::Batch) {
// TODO: Add crun and calloc callback here!
}
task_raw_ptr_vec.emplace_back(task.get());
task_ptr_vec.emplace_back(std::move(task));
m_pending_task_map_.erase(it);
}
}
Expand All @@ -1942,19 +1961,23 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {
for (auto& [task_id, dep_ids] : dependencies) {
auto it = m_pending_task_map_.find(task_id);
if (it != m_pending_task_map_.end() && it->second->HasDependency()) {
it->second->DependencyAdd(dep_ids);
if (it->second->NoWaitingDependency()) {
if (it->second->dependencies.depend_all()) {
it->second->SetDependencyOK();
auto& task = it->second;
task->DependencyAdd(dep_ids);
if (task->NoWaitingDependency()) {
if (task->dependencies.depend_all()) {
task->SetDependencyOK();
} else {
it->second->SetStatus(crane::grpc::Failed);
task_raw_ptr_vec.emplace_back(it->second.get());
task_ptr_vec.emplace_back(std::move(it->second));
task->SetStatus(crane::grpc::Failed);
if (task->type != crane::grpc::Batch) {
// TODO: Add crun and calloc callback here!
}
task_raw_ptr_vec.emplace_back(task.get());
task_ptr_vec.emplace_back(std::move(task));
m_pending_task_map_.erase(it);
continue;
}
}
updated_raw_ptr_vec.emplace_back(it->second.get());
updated_raw_ptr_vec.emplace_back(task.get());
}
}
}
Expand Down

0 comments on commit 83b7d45

Please sign in to comment.