From c33eb9a49542c285fc22b455f95787ad3d9b9f42 Mon Sep 17 00:00:00 2001 From: Levi Naden Date: Wed, 3 Jul 2019 17:30:49 -0400 Subject: [PATCH 1/6] Manager Quality of Life statistics Added quality of life stats to the Queue Manager reporting, only really works in parsl for efficiency, but will track totals as well. Progress towards (and possible resolution) of #262 --- qcfractal/cli/qcfractal_manager.py | 6 ++- qcfractal/queue/adapters.py | 2 +- qcfractal/queue/base_adapter.py | 17 ++++++ qcfractal/queue/managers.py | 83 +++++++++++++++++++++++++++++- qcfractal/queue/parsl_adapter.py | 11 ++++ 5 files changed, 115 insertions(+), 4 deletions(-) diff --git a/qcfractal/cli/qcfractal_manager.py b/qcfractal/cli/qcfractal_manager.py index c51df4788..085af0ed0 100644 --- a/qcfractal/cli/qcfractal_manager.py +++ b/qcfractal/cli/qcfractal_manager.py @@ -814,9 +814,10 @@ def main(args=None): # Build out the manager itself # Compute max tasks + max_concurrent_tasks = settings.common.tasks_per_worker * settings.common.max_workers if settings.manager.max_queued_tasks is None: # Tasks * jobs * buffer + 1 - max_queued_tasks = ceil(settings.common.tasks_per_worker * settings.common.max_workers * 1.20) + 1 + max_queued_tasks = ceil(max_concurrent_tasks * 1.20) + 1 else: max_queued_tasks = settings.manager.max_queued_tasks @@ -833,6 +834,9 @@ def main(args=None): verbose=settings.common.verbose ) + # Set stats correctly since we buffer the max tasks a bit + manager.statistics.max_concurrent_tasks = max_concurrent_tasks + # Add exit callbacks for cb in exit_callbacks: manager.add_exit_callback(cb[0], *cb[1], **cb[2]) diff --git a/qcfractal/queue/adapters.py b/qcfractal/queue/adapters.py index 2ee8d98cc..7e1f942d2 100644 --- a/qcfractal/queue/adapters.py +++ b/qcfractal/queue/adapters.py @@ -7,7 +7,7 @@ from .parsl_adapter import ParslAdapter -def build_queue_adapter(workflow_client, logger=None, **kwargs): +def build_queue_adapter(workflow_client, logger=None, **kwargs) -> 'BaseAdapter': """Constructs a queue manager based off the incoming queue socket type. Parameters diff --git a/qcfractal/queue/base_adapter.py b/qcfractal/queue/base_adapter.py index decad9b80..f1fc35be8 100644 --- a/qcfractal/queue/base_adapter.py +++ b/qcfractal/queue/base_adapter.py @@ -199,6 +199,23 @@ def close(self) -> bool: True if the closing was successful. """ + def count_running(self) -> int: + """ + Similar to count tasks, but relies on adapter-specific implementation to count the currently + running tasks. May not be implemented or possible for each adapter, nor is it required for + operation. As such, this it is not required to be implemented as an abstract method. 
+ + Returns + ------- + int + Number of running tasks + + Raises + ------ + NotImplementedError + """ + raise NotImplementedError("This adapter has not implemented this method yet") + @abc.abstractmethod def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: """ diff --git a/qcfractal/queue/managers.py b/qcfractal/queue/managers.py index 2e5164acd..c7b042f0a 100644 --- a/qcfractal/queue/managers.py +++ b/qcfractal/queue/managers.py @@ -10,6 +10,7 @@ import uuid from typing import Any, Callable, Dict, List, Optional, Union +from pydantic import BaseModel import qcengine as qcng from qcfractal.extras import get_information @@ -19,6 +20,34 @@ __all__ = ["QueueManager"] +class QueueStatistics(BaseModel): + """ + Queue Manager Job statistics + """ + # Dynamic quantities + total_successful_tasks: int = 0 + total_failed_tasks: int = 0 + total_cpu_hours: float = 0 + # Static Quantities + max_concurrent_tasks: int = 0 + cores_per_task: int = 0 + last_update_time: float = None + + def __init__(self, **kwargs): + if kwargs.get('last_update_time', None) is None: + kwargs['last_update_time'] = time.time() + super().__init__(**kwargs) + + @property + def total_completed_tasks(self): + return self.total_successful_tasks + self.total_failed_tasks + + @property + def theoretical_max_consumption(self): + """In CPU Hours""" + return self.max_concurrent_tasks * self.cores_per_task * (time.time() - self.last_update_time) / 3600 + + class QueueManager: """ This object maintains a computational queue and watches for finished tasks for different @@ -42,7 +71,7 @@ def __init__(self, logger: Optional[logging.Logger] = None, max_tasks: int = 200, queue_tag: str = None, - manager_name: str = "unlabled", + manager_name: str = "unlabeled", update_frequency: Union[int, float] = 2, verbose: bool = True, server_error_retries: Optional[int] = 1, @@ -112,6 +141,10 @@ def __init__(self, self.max_tasks = max_tasks self.queue_tag = queue_tag self.verbose = verbose + self.statistics = QueueStatistics(max_concurrent_tasks=self.max_tasks, + cores_per_task=cores_per_task, + update_frequency=update_frequency + ) self.scheduler = None self.update_frequency = update_frequency @@ -280,7 +313,6 @@ def close_adapter(self) -> bool: return self.queue_adapter.close() - ## Queue Manager functions def heartbeat(self) -> None: @@ -428,9 +460,26 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: self._update_stale_jobs(allow_shutdown=allow_shutdown) results = self.queue_adapter.acquire_complete() + + # Stats fetching for running tasks, as close to the time we got the jobs as we can + last_time = self.statistics.last_update_time + now = self.statistics.last_update_time = time.time() + time_delta_seconds = now - last_time + try: + running_tasks = self.queue_adapter.count_running() + log_efficiency = True + except NotImplementedError: + running_tasks = 0 + log_efficiency = False + max_cpu_hours_running = time_delta_seconds * running_tasks * self.statistics.cores_per_task / 3600 + max_cpu_hours_possible = (time_delta_seconds * self.statistics.max_concurrent_tasks + * self.statistics.cores_per_task / 3600) + + # Process jobs n_success = 0 n_fail = 0 n_result = len(results) + task_cpu_hours = 0 error_payload = [] jobs_pushed = f"Pushed {n_result} complete tasks to the server " if n_result: @@ -448,11 +497,24 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: self.active -= n_result for key, result in results.items(): + wall_time_seconds = 0 if result.success: n_success += 1 + try: + 
wall_time_seconds = result.provenance.wall_time + except: + pass else: error_payload.append(f"Job {key} failed: {result.error.error_type} - " f"Msg: {result.error.error_message}") + try: + wall_time_seconds = result.input_data['provenance'].get('wall_time') + except: + pass + try: + task_cpu_hours += wall_time_seconds * self.statistics.cores_per_task / 3600 + except: + pass n_fail = n_result - n_success self.logger.info(jobs_pushed + f"({n_success} success / {n_fail} fail).") @@ -469,6 +531,23 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: # Get new tasks payload = self._payload_template() payload["data"]["limit"] = open_slots + + # Crunch Statistics + self.statistics.total_failed_tasks += n_fail + self.statistics.total_successful_tasks += n_success + self.statistics.total_cpu_hours += task_cpu_hours + self.logger.info(f"Stats: Number of processed jobs: {self.statistics.total_completed_tasks}") + self.logger.info(f"Stats: {self.statistics.total_successful_tasks} successful / " + f"{self.statistics.total_failed_tasks} failed " + f"({self.statistics.total_successful_tasks/self.statistics.total_completed_tasks*100}% " + f"success rate)") + self.logger.info(f"Stats: {self.statistics.total_cpu_hours} CPU Hours logged successfully (estimate)") + # Handle efficiency calculations + if log_efficiency: + efficiency = min((max_cpu_hours_running + task_cpu_hours)/max_cpu_hours_possible * 100, 100) + self.logger.info(f"Stats: Resource consumption efficiency upper limit: {efficiency} " + f"(estimate, higher is better)") + try: new_tasks = self.client._automodel_request("queue_manager", "get", payload) except IOError as exc: diff --git a/qcfractal/queue/parsl_adapter.py b/qcfractal/queue/parsl_adapter.py index 18ed47a7a..84f988f7b 100644 --- a/qcfractal/queue/parsl_adapter.py +++ b/qcfractal/queue/parsl_adapter.py @@ -33,6 +33,7 @@ def __init__(self, client: Any, logger: Optional[logging.Logger] = None, **kwarg import parsl self.client = parsl.dataflow.dflow.DataFlowKernel(self.client) + self._parsl_states = parsl.dataflow.states.States self.app_map = {} def __repr__(self): @@ -77,6 +78,16 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: task = func(*task_spec["spec"]["args"], **task_spec["spec"]["kwargs"]) return task_spec["id"], task + def count_running(self) -> int: + + running = 0 + for task in self.queue.values(): + status = self.client.tasks.get(task.tid, {}).get('status', None) + if status == self._parsl_states.running: + running += 1 + + return running + def acquire_complete(self) -> Dict[str, Any]: ret = {} del_keys = [] From 9248de0a8aa75b0d1be9cbc1464c820303dfbfb2 Mon Sep 17 00:00:00 2001 From: Levi Naden Date: Mon, 8 Jul 2019 14:02:06 -0400 Subject: [PATCH 2/6] Make changes from suggestions, still need to add tests --- qcfractal/queue/base_adapter.py | 8 ++-- qcfractal/queue/managers.py | 77 ++++++++++++++++++++------------ qcfractal/queue/parsl_adapter.py | 15 ++++++- 3 files changed, 66 insertions(+), 34 deletions(-) diff --git a/qcfractal/queue/base_adapter.py b/qcfractal/queue/base_adapter.py index f1fc35be8..50385fa2a 100644 --- a/qcfractal/queue/base_adapter.py +++ b/qcfractal/queue/base_adapter.py @@ -199,16 +199,16 @@ def close(self) -> bool: True if the closing was successful. """ - def count_running(self) -> int: + def count_running_workers(self) -> int: """ - Similar to count tasks, but relies on adapter-specific implementation to count the currently - running tasks. 
May not be implemented or possible for each adapter, nor is it required for + Adapter-specific implementation to count the currently running workers, helpful for resource consumption. + May not be implemented or possible for each adapter, nor is it required for operation. As such, this it is not required to be implemented as an abstract method. Returns ------- int - Number of running tasks + Number of running workers Raises ------ diff --git a/qcfractal/queue/managers.py b/qcfractal/queue/managers.py index c7b042f0a..807bbb859 100644 --- a/qcfractal/queue/managers.py +++ b/qcfractal/queue/managers.py @@ -10,7 +10,7 @@ import uuid from typing import Any, Callable, Dict, List, Optional, Union -from pydantic import BaseModel +from pydantic import BaseModel, validator import qcengine as qcng from qcfractal.extras import get_information @@ -27,11 +27,14 @@ class QueueStatistics(BaseModel): # Dynamic quantities total_successful_tasks: int = 0 total_failed_tasks: int = 0 - total_cpu_hours: float = 0 + total_core_hours: float = 0 + total_core_hours_consumed: float = 0 + total_core_hours_possible: float = 0 # Static Quantities max_concurrent_tasks: int = 0 cores_per_task: int = 0 last_update_time: float = None + # sum_task(task_wall_time * nthread/task) / sum_time(number_running_worker * nthread/worker * interval) def __init__(self, **kwargs): if kwargs.get('last_update_time', None) is None: @@ -44,9 +47,15 @@ def total_completed_tasks(self): @property def theoretical_max_consumption(self): - """In CPU Hours""" + """In Core Hours""" return self.max_concurrent_tasks * self.cores_per_task * (time.time() - self.last_update_time) / 3600 + @validator('cores_per_task', pre=True) + def cores_per_tasks_none(cls, v): + if v is None: + v = 1 + return v + class QueueManager: """ @@ -466,14 +475,16 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: now = self.statistics.last_update_time = time.time() time_delta_seconds = now - last_time try: - running_tasks = self.queue_adapter.count_running() + running_workers = self.queue_adapter.count_running_workers() log_efficiency = True except NotImplementedError: - running_tasks = 0 + running_workers = 0 log_efficiency = False - max_cpu_hours_running = time_delta_seconds * running_tasks * self.statistics.cores_per_task / 3600 - max_cpu_hours_possible = (time_delta_seconds * self.statistics.max_concurrent_tasks - * self.statistics.cores_per_task / 3600) + max_core_hours_running = time_delta_seconds * running_workers * self.statistics.cores_per_task / 3600 + max_core_hours_possible = (time_delta_seconds * self.statistics.max_concurrent_tasks + * self.statistics.cores_per_task / 3600) + self.statistics.total_core_hours_consumed += max_core_hours_running + self.statistics.total_core_hours_possible += max_core_hours_possible # Process jobs n_success = 0 @@ -500,21 +511,19 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: wall_time_seconds = 0 if result.success: n_success += 1 - try: - wall_time_seconds = result.provenance.wall_time - except: - pass + if hasattr(result.provenance, 'wall_time'): + wall_time_seconds = float(result.provenance.wall_time) else: error_payload.append(f"Job {key} failed: {result.error.error_type} - " f"Msg: {result.error.error_message}") + # Try to get the wall time in the most fault-tolerant way try: - wall_time_seconds = result.input_data['provenance'].get('wall_time') - except: - pass - try: - task_cpu_hours += wall_time_seconds * self.statistics.cores_per_task / 3600 - except: - pass + 
wall_time_seconds = float(results.input_data.get('provenance', {}).get('wall_time', 0)) + except TypeError: + # Trap wall time corruption, e.g. float(None) + # Other Result corruptions will raise an error correctly + wall_time_seconds = 0 + task_cpu_hours += wall_time_seconds * self.statistics.cores_per_task / 3600 n_fail = n_result - n_success self.logger.info(jobs_pushed + f"({n_success} success / {n_fail} fail).") @@ -535,18 +544,28 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: # Crunch Statistics self.statistics.total_failed_tasks += n_fail self.statistics.total_successful_tasks += n_success - self.statistics.total_cpu_hours += task_cpu_hours - self.logger.info(f"Stats: Number of processed jobs: {self.statistics.total_completed_tasks}") - self.logger.info(f"Stats: {self.statistics.total_successful_tasks} successful / " - f"{self.statistics.total_failed_tasks} failed " - f"({self.statistics.total_successful_tasks/self.statistics.total_completed_tasks*100}% " - f"success rate)") - self.logger.info(f"Stats: {self.statistics.total_cpu_hours} CPU Hours logged successfully (estimate)") + self.statistics.total_core_hours += task_cpu_hours + try: + success_rate = self.statistics.total_successful_tasks/self.statistics.total_completed_tasks*100 + except ZeroDivisionError: + success_rate = "N/A" + stats_str = (f"Stats (Tasks): Processed={self.statistics.total_completed_tasks}, " + f"Failed={self.statistics.total_failed_tasks}, " + f"Success={success_rate}%, " + f"Core Hours (estimate)={self.statistics.total_core_hours}") # Handle efficiency calculations + if log_efficiency: - efficiency = min((max_cpu_hours_running + task_cpu_hours)/max_cpu_hours_possible * 100, 100) - self.logger.info(f"Stats: Resource consumption efficiency upper limit: {efficiency} " - f"(estimate, higher is better)") + # sum_task(task_wall_time * nthread / task) / sum_time(number_running_worker * nthread / worker * interval) + try: + efficiency_of_running = self.statistics.total_core_hours / self.statistics.total_core_hours_consumed + efficiency_of_potential = self.statistics.total_core_hours / self.statistics.total_core_hours_possible + except ZeroDivisionError: + efficiency_of_running = "(N/A yet)" + efficiency_of_potential = "(N/A yet)" + stats_str += (f", Worker Core Efficiency (est.): {efficiency_of_running}, " + f"Max Resource Core Efficiency (est.): {efficiency_of_potential}") + self.logger.info(stats_str) try: new_tasks = self.client._automodel_request("queue_manager", "get", payload) diff --git a/qcfractal/queue/parsl_adapter.py b/qcfractal/queue/parsl_adapter.py index 84f988f7b..a378f6a9b 100644 --- a/qcfractal/queue/parsl_adapter.py +++ b/qcfractal/queue/parsl_adapter.py @@ -78,13 +78,26 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: task = func(*task_spec["spec"]["args"], **task_spec["spec"]["kwargs"]) return task_spec["id"], task - def count_running(self) -> int: + def count_running_workers(self) -> int: running = 0 + executor_running_task_map = {key: False for key in self.client.executors.keys()} for task in self.queue.values(): status = self.client.tasks.get(task.tid, {}).get('status', None) if status == self._parsl_states.running: + executor_running_task_map[task['executor']] = True + if all(executor_running_task_map.values()): + # Efficiency loop break + break + + for executor in self.client.executors.values(): + if hasattr(executor, 'connected_workers'): + # Should return an int + running += executor.connected_workers + elif hasattr(executor, 
'max_threads') and executor_running_task_map[executor]: running += 1 + else: + raise NotImplementedError("Cannot accurately estimate consumption from executors") return running From 261231520a6b5a1a062ebec667d2ea5dd6718ae6 Mon Sep 17 00:00:00 2001 From: Levi Naden Date: Tue, 9 Jul 2019 08:48:36 -0400 Subject: [PATCH 3/6] Finalize testing and I think finally caught all expected edge cases --- qcfractal/queue/executor_adapter.py | 8 ++++++++ qcfractal/queue/managers.py | 11 +++++++++-- qcfractal/queue/parsl_adapter.py | 4 ++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/qcfractal/queue/executor_adapter.py b/qcfractal/queue/executor_adapter.py index bdbee9704..3a795e16e 100644 --- a/qcfractal/queue/executor_adapter.py +++ b/qcfractal/queue/executor_adapter.py @@ -35,6 +35,10 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: task = self.client.submit(func, *task_spec["spec"]["args"], **task_spec["spec"]["kwargs"]) return task_spec["id"], task + def count_running_workers(self) -> int: + # If there are tasks, they are running + return bool(self.queue) + def acquire_complete(self) -> Dict[str, Any]: ret = {} del_keys = [] @@ -78,6 +82,10 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: func, *task_spec["spec"]["args"], **task_spec["spec"]["kwargs"], resources={"process": 1}) return task_spec["id"], task + def count_running_workers(self) -> int: + # Have not worked through this yet for Dask + raise NotImplementedError + def await_results(self) -> bool: from dask.distributed import wait wait(list(self.queue.values())) diff --git a/qcfractal/queue/managers.py b/qcfractal/queue/managers.py index 807bbb859..34b459e00 100644 --- a/qcfractal/queue/managers.py +++ b/qcfractal/queue/managers.py @@ -518,11 +518,18 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: f"Msg: {result.error.error_message}") # Try to get the wall time in the most fault-tolerant way try: - wall_time_seconds = float(results.input_data.get('provenance', {}).get('wall_time', 0)) + wall_time_seconds = float(result.input_data.get('provenance', {}).get('wall_time', 0)) + except AttributeError: + # Trap the result.input_data is None, but let other attribute errors go + if result.input_data is None: + wall_time_seconds = 0 + else: + raise except TypeError: # Trap wall time corruption, e.g. 
float(None) # Other Result corruptions will raise an error correctly wall_time_seconds = 0 + task_cpu_hours += wall_time_seconds * self.statistics.cores_per_task / 3600 n_fail = n_result - n_success @@ -548,7 +555,7 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: try: success_rate = self.statistics.total_successful_tasks/self.statistics.total_completed_tasks*100 except ZeroDivisionError: - success_rate = "N/A" + success_rate = "(N/A yet)" stats_str = (f"Stats (Tasks): Processed={self.statistics.total_completed_tasks}, " f"Failed={self.statistics.total_failed_tasks}, " f"Success={success_rate}%, " diff --git a/qcfractal/queue/parsl_adapter.py b/qcfractal/queue/parsl_adapter.py index a378f6a9b..4f3b9b8d3 100644 --- a/qcfractal/queue/parsl_adapter.py +++ b/qcfractal/queue/parsl_adapter.py @@ -90,11 +90,11 @@ def count_running_workers(self) -> int: # Efficiency loop break break - for executor in self.client.executors.values(): + for executor_key, executor in self.client.executors.items(): if hasattr(executor, 'connected_workers'): # Should return an int running += executor.connected_workers - elif hasattr(executor, 'max_threads') and executor_running_task_map[executor]: + elif hasattr(executor, 'max_threads') and executor_running_task_map[executor_key]: running += 1 else: raise NotImplementedError("Cannot accurately estimate consumption from executors") From 76b37e5d96c9178ff3b49b03e56bb9d7b6903351 Mon Sep 17 00:00:00 2001 From: Levi Naden Date: Tue, 9 Jul 2019 16:28:27 -0400 Subject: [PATCH 4/6] Make changes from comments Fixed a bug in Snowflake I found in an error catch Made LGTM ignore the @validator error Found a bug in the recursive directory creation of the server Made the stats output numbers less long floats On a related note: Nested F-string formatting! --- .lgtm.yml | 2 ++ qcfractal/cli/qcfractal_manager.py | 4 ++-- qcfractal/cli/qcfractal_server.py | 2 +- qcfractal/queue/executor_adapter.py | 8 ++++---- qcfractal/queue/managers.py | 32 ++++++++++++++++++----------- qcfractal/snowflake.py | 4 ++-- 6 files changed, 31 insertions(+), 21 deletions(-) diff --git a/.lgtm.yml b/.lgtm.yml index f888323bd..92973b8a4 100644 --- a/.lgtm.yml +++ b/.lgtm.yml @@ -11,3 +11,5 @@ path_classifiers: - qcfractal/dashboard/* # Very early state, some conditions forcing LGTM issues generated: - qcfractal/_version.py +queries: +- exclude: py/not-named-self # Blocks Pydantic's @validator not accepting `self` until a better fix can be found diff --git a/qcfractal/cli/qcfractal_manager.py b/qcfractal/cli/qcfractal_manager.py index 085af0ed0..1ccd54abe 100644 --- a/qcfractal/cli/qcfractal_manager.py +++ b/qcfractal/cli/qcfractal_manager.py @@ -186,7 +186,7 @@ class QueueManagerSettings(BaseSettings): None, description="Generally should not be set. Number of tasks to pull from the Fractal Server to keep locally at " "all times. If `None`, this is automatically computed as " - "`ceil(common.tasks_per_worker*common.max_workers*1.2) + 1`. As tasks are completed, the " + "`ceil(common.tasks_per_worker*common.max_workers*2.0) + 1`. As tasks are completed, the " "local pool is filled back up to this value. These tasks will all attempt to be run concurrently, " "but concurrent tasks are limited by number of cluster jobs and tasks per job. Pulling too many of " "these can result in under-utilized managers from other sites and result in less FIFO returns. 
As " @@ -817,7 +817,7 @@ def main(args=None): max_concurrent_tasks = settings.common.tasks_per_worker * settings.common.max_workers if settings.manager.max_queued_tasks is None: # Tasks * jobs * buffer + 1 - max_queued_tasks = ceil(max_concurrent_tasks * 1.20) + 1 + max_queued_tasks = ceil(max_concurrent_tasks * 2.00) + 1 else: max_queued_tasks = settings.manager.max_queued_tasks diff --git a/qcfractal/cli/qcfractal_server.py b/qcfractal/cli/qcfractal_server.py index 67eb044e6..0f05e4e27 100644 --- a/qcfractal/cli/qcfractal_server.py +++ b/qcfractal/cli/qcfractal_server.py @@ -116,7 +116,7 @@ def server_init(args, config): print("Initializing QCFractal configuration.") # Configuration settings - config.base_path.mkdir(exist_ok=True) + config.base_path.mkdir(parents=True, exist_ok=True) overwrite = args.get("overwrite", False) psql = PostgresHarness(config, quiet=False, logger=print) diff --git a/qcfractal/queue/executor_adapter.py b/qcfractal/queue/executor_adapter.py index 3a795e16e..23f4001ef 100644 --- a/qcfractal/queue/executor_adapter.py +++ b/qcfractal/queue/executor_adapter.py @@ -36,8 +36,8 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: return task_spec["id"], task def count_running_workers(self) -> int: - # If there are tasks, they are running - return bool(self.queue) + # This is always "running", even if there are no tasks since its running locally + return 1 def acquire_complete(self) -> Dict[str, Any]: ret = {} @@ -83,8 +83,8 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: return task_spec["id"], task def count_running_workers(self) -> int: - # Have not worked through this yet for Dask - raise NotImplementedError + # Note: This number may not quite be right if its treating "worker" as a "job" or Dask-Distributed "worker" + return self.client._count_active_workers() def await_results(self) -> bool: from dask.distributed import wait diff --git a/qcfractal/queue/managers.py b/qcfractal/queue/managers.py index 34b459e00..8d0b738f0 100644 --- a/qcfractal/queue/managers.py +++ b/qcfractal/queue/managers.py @@ -552,26 +552,34 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: self.statistics.total_failed_tasks += n_fail self.statistics.total_successful_tasks += n_success self.statistics.total_core_hours += task_cpu_hours - try: - success_rate = self.statistics.total_successful_tasks/self.statistics.total_completed_tasks*100 - except ZeroDivisionError: + na_format = '' + percent_format = '.2f' + if self.statistics.total_completed_tasks == 0: success_rate = "(N/A yet)" + success_format = na_format + else: + success_rate = self.statistics.total_successful_tasks / self.statistics.total_completed_tasks * 100 + success_format = percent_format stats_str = (f"Stats (Tasks): Processed={self.statistics.total_completed_tasks}, " f"Failed={self.statistics.total_failed_tasks}, " - f"Success={success_rate}%, " - f"Core Hours (estimate)={self.statistics.total_core_hours}") - # Handle efficiency calculations + f"Success={success_rate:{success_format}}%, " + f"Core Hours (est.)={self.statistics.total_core_hours}") + # Handle efficiency calculations if log_efficiency: + # Efficiency calculated as: # sum_task(task_wall_time * nthread / task) / sum_time(number_running_worker * nthread / worker * interval) - try: - efficiency_of_running = self.statistics.total_core_hours / self.statistics.total_core_hours_consumed - efficiency_of_potential = self.statistics.total_core_hours / self.statistics.total_core_hours_possible - 
except ZeroDivisionError: + if self.statistics.total_core_hours_consumed == 0 or self.statistics.total_core_hours_possible == 0: efficiency_of_running = "(N/A yet)" efficiency_of_potential = "(N/A yet)" - stats_str += (f", Worker Core Efficiency (est.): {efficiency_of_running}, " - f"Max Resource Core Efficiency (est.): {efficiency_of_potential}") + efficiency_format = na_format + else: + efficiency_of_running = self.statistics.total_core_hours / self.statistics.total_core_hours_consumed + efficiency_of_potential = self.statistics.total_core_hours / self.statistics.total_core_hours_possible + efficiency_format = percent_format + stats_str += (f", Worker Core Efficiency (est.): {efficiency_of_running*100:{efficiency_format}}%, " + f"Max Resource Core Efficiency (est.): {efficiency_of_potential*100:{efficiency_format}}%") + self.logger.info(stats_str) try: diff --git a/qcfractal/snowflake.py b/qcfractal/snowflake.py index 5a3e21505..2d30e0d8f 100644 --- a/qcfractal/snowflake.py +++ b/qcfractal/snowflake.py @@ -66,7 +66,7 @@ def __init__(self, The maximum number of ProcessPoolExecutor to spin up. storage_uri : Optional[str], optional A database URI to connect to, otherwise builds a default instance in a - tempory directory + temporary directory storage_project_name : str, optional The database name max_active_services : int, optional @@ -118,7 +118,7 @@ def __init__(self, self.logfile = logging log_prefix = self.logfile else: - raise KeyError(f"Logfile type not recognized {type(logfile)}.") + raise KeyError(f"Logfile type not recognized {type(logging)}.") super().__init__(name="QCFractal Snowflake Instance", port=find_port(), From 34eb760becbbf273023ca6c3e96d5dd7f35eb413 Mon Sep 17 00:00:00 2001 From: Levi Naden Date: Tue, 9 Jul 2019 16:56:19 -0400 Subject: [PATCH 5/6] Try to fix the Dask adapter for number of running workers --- qcfractal/queue/executor_adapter.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/qcfractal/queue/executor_adapter.py b/qcfractal/queue/executor_adapter.py index 23f4001ef..444673447 100644 --- a/qcfractal/queue/executor_adapter.py +++ b/qcfractal/queue/executor_adapter.py @@ -83,8 +83,11 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: return task_spec["id"], task def count_running_workers(self) -> int: - # Note: This number may not quite be right if its treating "worker" as a "job" or Dask-Distributed "worker" - return self.client._count_active_workers() + if hasattr(self.client.cluster, '_count_active_workers'): + # Note: This number may not quite be right if its treating "worker" as a "job" or Dask-Distributed "worker" + return self.client.cluster._count_active_workers() + else: + return len(self.client.scheduler_info()['workers']) def await_results(self) -> bool: from dask.distributed import wait From 3a1dd61a0c199e7cf53ddad0747f064b4da23b47 Mon Sep 17 00:00:00 2001 From: Levi Naden Date: Wed, 10 Jul 2019 09:57:59 -0400 Subject: [PATCH 6/6] Update from comments. 
--- qcfractal/queue/executor_adapter.py | 5 +++-- qcfractal/queue/managers.py | 23 +++++++++++++---------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/qcfractal/queue/executor_adapter.py b/qcfractal/queue/executor_adapter.py index 444673447..243f51b44 100644 --- a/qcfractal/queue/executor_adapter.py +++ b/qcfractal/queue/executor_adapter.py @@ -84,10 +84,11 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]: def count_running_workers(self) -> int: if hasattr(self.client.cluster, '_count_active_workers'): - # Note: This number may not quite be right if its treating "worker" as a "job" or Dask-Distributed "worker" + # Note: This should be right since its counting Dask Workers, and each Dask Worker = 1 task, which we then + # Multiply by cores_per_task in the manager. return self.client.cluster._count_active_workers() else: - return len(self.client.scheduler_info()['workers']) + return len(self.client.cluster.scheduler.workers) def await_results(self) -> bool: from dask.distributed import wait diff --git a/qcfractal/queue/managers.py b/qcfractal/queue/managers.py index 8d0b738f0..ab026524e 100644 --- a/qcfractal/queue/managers.py +++ b/qcfractal/queue/managers.py @@ -553,17 +553,17 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: self.statistics.total_successful_tasks += n_success self.statistics.total_core_hours += task_cpu_hours na_format = '' - percent_format = '.2f' + float_format = ',.2f' if self.statistics.total_completed_tasks == 0: success_rate = "(N/A yet)" success_format = na_format else: success_rate = self.statistics.total_successful_tasks / self.statistics.total_completed_tasks * 100 - success_format = percent_format - stats_str = (f"Stats (Tasks): Processed={self.statistics.total_completed_tasks}, " - f"Failed={self.statistics.total_failed_tasks}, " - f"Success={success_rate:{success_format}}%, " - f"Core Hours (est.)={self.statistics.total_core_hours}") + success_format = float_format + task_stats_str = (f"Task Stats: Processed={self.statistics.total_completed_tasks}, " + f"Failed={self.statistics.total_failed_tasks}, " + f"Success={success_rate:{success_format}}%") + worker_stats_str = f"Worker Stats (est.): Core Hours Used={self.statistics.total_core_hours:{float_format}}, " # Handle efficiency calculations if log_efficiency: @@ -576,11 +576,14 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool: else: efficiency_of_running = self.statistics.total_core_hours / self.statistics.total_core_hours_consumed efficiency_of_potential = self.statistics.total_core_hours / self.statistics.total_core_hours_possible - efficiency_format = percent_format - stats_str += (f", Worker Core Efficiency (est.): {efficiency_of_running*100:{efficiency_format}}%, " - f"Max Resource Core Efficiency (est.): {efficiency_of_potential*100:{efficiency_format}}%") + efficiency_format = float_format + worker_stats_str += f"Core Usage Efficiency: {efficiency_of_running*100:{efficiency_format}}%" + if self.verbose: + worker_stats_str += (f", Core Usage vs. Max Resources Requested: " + f"{efficiency_of_potential*100:{efficiency_format}}%") - self.logger.info(stats_str) + self.logger.info(task_stats_str) + self.logger.info(worker_stats_str) try: new_tasks = self.client._automodel_request("queue_manager", "get", payload)
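
As a point of reference for the statistics these patches introduce, the bookkeeping can be condensed into a short, self-contained sketch. This is a simplified illustration in plain Python rather than the patched code itself: there is no pydantic model, and record_update_cycle is a hypothetical helper rather than a function from the patches. It mirrors the idea that each update cycle accumulates task core hours (task wall time times cores per task), core hours consumed by running workers over the interval, and the theoretical maximum core hours possible, then reports efficiency as the first quantity divided by each of the other two. The numbers in the usage lines at the bottom are arbitrary.

import time

class QueueStatistics:
    """Minimal stand-in for the QueueStatistics model added in these patches (illustration only)."""

    def __init__(self, max_concurrent_tasks, cores_per_task):
        self.total_successful_tasks = 0
        self.total_failed_tasks = 0
        self.total_core_hours = 0.0            # sum over completed tasks of wall_time * cores_per_task
        self.total_core_hours_consumed = 0.0   # running workers * cores_per_task * elapsed interval
        self.total_core_hours_possible = 0.0   # max concurrent tasks * cores_per_task * elapsed interval
        self.max_concurrent_tasks = max_concurrent_tasks
        # Mirrors the cores_per_task validator: treat None as a single core per task.
        self.cores_per_task = cores_per_task if cores_per_task is not None else 1
        self.last_update_time = time.time()


def record_update_cycle(stats, running_workers, completed_wall_times_s, n_success, n_fail):
    """One update cycle (hypothetical helper): accumulate consumption, return efficiency estimates or None."""
    now = time.time()
    interval_hours = (now - stats.last_update_time) / 3600
    stats.last_update_time = now

    # Upper bound on what the running workers could have burned this interval,
    # and on what the full requested allocation could have burned.
    stats.total_core_hours_consumed += running_workers * stats.cores_per_task * interval_hours
    stats.total_core_hours_possible += stats.max_concurrent_tasks * stats.cores_per_task * interval_hours

    # Core hours actually spent inside completed tasks (successes and failures alike).
    stats.total_core_hours += sum(completed_wall_times_s) * stats.cores_per_task / 3600
    stats.total_successful_tasks += n_success
    stats.total_failed_tasks += n_fail

    if stats.total_core_hours_consumed == 0 or stats.total_core_hours_possible == 0:
        return None, None
    worker_efficiency = stats.total_core_hours / stats.total_core_hours_consumed * 100
    resource_efficiency = stats.total_core_hours / stats.total_core_hours_possible * 100
    return worker_efficiency, resource_efficiency


# Example with made-up numbers: two workers busy over the interval,
# three tasks finished (two succeeded, one failed).
stats = QueueStatistics(max_concurrent_tasks=4, cores_per_task=8)
time.sleep(1)  # stand-in for the manager's update_frequency wait
print(record_update_cycle(stats, running_workers=2,
                          completed_wall_times_s=[0.4, 0.3, 0.2],
                          n_success=2, n_fail=1))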