Skip to content

Commit

Permalink
Finalize testing and I think finally caught all expected edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Lnaden committed Jul 9, 2019
1 parent 9248de0 commit 2612315
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
8 changes: 8 additions & 0 deletions qcfractal/queue/executor_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
task = self.client.submit(func, *task_spec["spec"]["args"], **task_spec["spec"]["kwargs"])
return task_spec["id"], task

def count_running_workers(self) -> int:
# If there are tasks, they are running
return bool(self.queue)

def acquire_complete(self) -> Dict[str, Any]:
ret = {}
del_keys = []
Expand Down Expand Up @@ -78,6 +82,10 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
func, *task_spec["spec"]["args"], **task_spec["spec"]["kwargs"], resources={"process": 1})
return task_spec["id"], task

def count_running_workers(self) -> int:
# Have not worked through this yet for Dask
raise NotImplementedError

def await_results(self) -> bool:
from dask.distributed import wait
wait(list(self.queue.values()))
Expand Down
11 changes: 9 additions & 2 deletions qcfractal/queue/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,18 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool:
f"Msg: {result.error.error_message}")
# Try to get the wall time in the most fault-tolerant way
try:
wall_time_seconds = float(results.input_data.get('provenance', {}).get('wall_time', 0))
wall_time_seconds = float(result.input_data.get('provenance', {}).get('wall_time', 0))
except AttributeError:
# Trap the result.input_data is None, but let other attribute errors go
if result.input_data is None:
wall_time_seconds = 0
else:
raise
except TypeError:
# Trap wall time corruption, e.g. float(None)
# Other Result corruptions will raise an error correctly
wall_time_seconds = 0

task_cpu_hours += wall_time_seconds * self.statistics.cores_per_task / 3600
n_fail = n_result - n_success

Expand All @@ -548,7 +555,7 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool:
try:
success_rate = self.statistics.total_successful_tasks/self.statistics.total_completed_tasks*100
except ZeroDivisionError:
success_rate = "N/A"
success_rate = "(N/A yet)"
stats_str = (f"Stats (Tasks): Processed={self.statistics.total_completed_tasks}, "
f"Failed={self.statistics.total_failed_tasks}, "
f"Success={success_rate}%, "
Expand Down
4 changes: 2 additions & 2 deletions qcfractal/queue/parsl_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ def count_running_workers(self) -> int:
# Efficiency loop break
break

for executor in self.client.executors.values():
for executor_key, executor in self.client.executors.items():
if hasattr(executor, 'connected_workers'):
# Should return an int
running += executor.connected_workers
elif hasattr(executor, 'max_threads') and executor_running_task_map[executor]:
elif hasattr(executor, 'max_threads') and executor_running_task_map[executor_key]:
running += 1
else:
raise NotImplementedError("Cannot accurately estimate consumption from executors")
Expand Down

0 comments on commit 2612315

Please sign in to comment.