Manager Quality of Life statistics #313

Merged 6 commits on Jul 10, 2019
Changes from 4 commits
2 changes: 2 additions & 0 deletions .lgtm.yml
@@ -11,3 +11,5 @@ path_classifiers:
- qcfractal/dashboard/* # Very early state, some conditions forcing LGTM issues
generated:
- qcfractal/_version.py
queries:
- exclude: py/not-named-self # Blocks Pydantic's @validator not accepting `self` until a better fix can be found
8 changes: 6 additions & 2 deletions qcfractal/cli/qcfractal_manager.py
@@ -186,7 +186,7 @@ class QueueManagerSettings(BaseSettings):
None,
description="Generally should not be set. Number of tasks to pull from the Fractal Server to keep locally at "
"all times. If `None`, this is automatically computed as "
"`ceil(common.tasks_per_worker*common.max_workers*1.2) + 1`. As tasks are completed, the "
"`ceil(common.tasks_per_worker*common.max_workers*2.0) + 1`. As tasks are completed, the "
"local pool is filled back up to this value. These tasks will all attempt to be run concurrently, "
"but concurrent tasks are limited by number of cluster jobs and tasks per job. Pulling too many of "
"these can result in under-utilized managers from other sites and result in less FIFO returns. As "
@@ -814,9 +814,10 @@ def main(args=None):

# Build out the manager itself
# Compute max tasks
max_concurrent_tasks = settings.common.tasks_per_worker * settings.common.max_workers
if settings.manager.max_queued_tasks is None:
# Tasks * jobs * buffer + 1
max_queued_tasks = ceil(settings.common.tasks_per_worker * settings.common.max_workers * 1.20) + 1
max_queued_tasks = ceil(max_concurrent_tasks * 2.00) + 1
else:
max_queued_tasks = settings.manager.max_queued_tasks

@@ -833,6 +834,9 @@ def main(args=None):
verbose=settings.common.verbose
)

# Set stats correctly since we buffer the max tasks a bit
manager.statistics.max_concurrent_tasks = max_concurrent_tasks

# Add exit callbacks
for cb in exit_callbacks:
manager.add_exit_callback(cb[0], *cb[1], **cb[2])
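
For illustration, a minimal sketch of the buffered queue-depth arithmetic above; the settings values are assumptions invented for this example, not defaults.

from math import ceil

# Hypothetical settings, chosen only to illustrate the arithmetic
tasks_per_worker = 2
max_workers = 5

# Tasks that can actually run at once across all workers
max_concurrent_tasks = tasks_per_worker * max_workers       # 10

# Local buffer pulled from the server: concurrent tasks * 2.0 buffer, plus one
max_queued_tasks = ceil(max_concurrent_tasks * 2.00) + 1    # 21

print(max_concurrent_tasks, max_queued_tasks)               # 10 21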
2 changes: 1 addition & 1 deletion qcfractal/cli/qcfractal_server.py
@@ -116,7 +116,7 @@ def server_init(args, config):
print("Initializing QCFractal configuration.")
# Configuration settings

config.base_path.mkdir(exist_ok=True)
config.base_path.mkdir(parents=True, exist_ok=True)
overwrite = args.get("overwrite", False)

psql = PostgresHarness(config, quiet=False, logger=print)
2 changes: 1 addition & 1 deletion qcfractal/queue/adapters.py
@@ -7,7 +7,7 @@
from .parsl_adapter import ParslAdapter


def build_queue_adapter(workflow_client, logger=None, **kwargs):
def build_queue_adapter(workflow_client, logger=None, **kwargs) -> 'BaseAdapter':
"""Constructs a queue manager based off the incoming queue socket type.

Parameters
17 changes: 17 additions & 0 deletions qcfractal/queue/base_adapter.py
@@ -199,6 +199,23 @@ def close(self) -> bool:
True if the closing was successful.
"""

def count_running_workers(self) -> int:
"""
Adapter-specific implementation to count the currently running workers, which is helpful for tracking resource consumption.
This may not be implemented or possible for every adapter, nor is it required for
operation, so it is not defined as an abstract method.

Returns
-------
int
Number of running workers

Raises
------
NotImplementedError
"""
raise NotImplementedError("This adapter has not implemented this method yet")

@abc.abstractmethod
def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
"""
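
Since the base class only raises NotImplementedError here, callers must treat worker counting as an optional capability. A minimal sketch of that calling pattern, mirroring the try/except used later in the manager's update() loop; `adapter` stands for any concrete adapter instance and is an assumption of this example.

# `adapter` is assumed to be any BaseAdapter subclass instance
try:
    running_workers = adapter.count_running_workers()
    log_efficiency = True
except NotImplementedError:
    # This adapter cannot report workers; skip efficiency statistics
    running_workers = 0
    log_efficiency = False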
8 changes: 8 additions & 0 deletions qcfractal/queue/executor_adapter.py
@@ -35,6 +35,10 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
task = self.client.submit(func, *task_spec["spec"]["args"], **task_spec["spec"]["kwargs"])
return task_spec["id"], task

def count_running_workers(self) -> int:
# This is always "running", even if there are no tasks, since it's running locally
return 1

def acquire_complete(self) -> Dict[str, Any]:
ret = {}
del_keys = []
@@ -78,6 +82,10 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
func, *task_spec["spec"]["args"], **task_spec["spec"]["kwargs"], resources={"process": 1})
return task_spec["id"], task

def count_running_workers(self) -> int:

Review comment (Contributor): We should be able to call this function.

# Note: This number may not quite be right if it's treating a "worker" as a "job" or a Dask-Distributed "worker"

Review comment (Contributor): self.client._count_active_workers() * self.client.worker_processes in that case?

return self.client._count_active_workers()

def await_results(self) -> bool:
from dask.distributed import wait
wait(list(self.queue.values()))
117 changes: 115 additions & 2 deletions qcfractal/queue/managers.py
@@ -10,6 +10,7 @@
import uuid
from typing import Any, Callable, Dict, List, Optional, Union

from pydantic import BaseModel, validator
import qcengine as qcng
from qcfractal.extras import get_information

@@ -19,6 +20,43 @@
__all__ = ["QueueManager"]


class QueueStatistics(BaseModel):
"""
Queue Manager Job statistics
"""
# Dynamic quantities
total_successful_tasks: int = 0
total_failed_tasks: int = 0
total_core_hours: float = 0
total_core_hours_consumed: float = 0
total_core_hours_possible: float = 0
# Static Quantities
max_concurrent_tasks: int = 0
cores_per_task: int = 0
last_update_time: float = None
# sum_task(task_wall_time * nthread/task) / sum_time(number_running_worker * nthread/worker * interval)

def __init__(self, **kwargs):
if kwargs.get('last_update_time', None) is None:
kwargs['last_update_time'] = time.time()
super().__init__(**kwargs)

@property
def total_completed_tasks(self):
return self.total_successful_tasks + self.total_failed_tasks

@property
def theoretical_max_consumption(self):
"""In Core Hours"""
return self.max_concurrent_tasks * self.cores_per_task * (time.time() - self.last_update_time) / 3600

@validator('cores_per_task', pre=True)
def cores_per_tasks_none(cls, v):
if v is None:
v = 1
return v
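
A small usage sketch of the statistics model above; the values are illustrative and the import path is assumed.

from qcfractal.queue.managers import QueueStatistics

stats = QueueStatistics(max_concurrent_tasks=8, cores_per_task=None)
assert stats.cores_per_task == 1               # validator maps None -> 1
stats.total_successful_tasks += 3
stats.total_failed_tasks += 1
print(stats.total_completed_tasks)             # 4
print(stats.theoretical_max_consumption)       # core-hours possible since last_update_time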


class QueueManager:
"""
This object maintains a computational queue and watches for finished tasks for different
Expand All @@ -42,7 +80,7 @@ def __init__(self,
logger: Optional[logging.Logger] = None,
max_tasks: int = 200,
queue_tag: str = None,
manager_name: str = "unlabled",
manager_name: str = "unlabeled",
update_frequency: Union[int, float] = 2,
verbose: bool = True,
server_error_retries: Optional[int] = 1,
@@ -112,6 +150,10 @@ def __init__(self,
self.max_tasks = max_tasks
self.queue_tag = queue_tag
self.verbose = verbose
self.statistics = QueueStatistics(max_concurrent_tasks=self.max_tasks,
cores_per_task=cores_per_task,
update_frequency=update_frequency
)

self.scheduler = None
self.update_frequency = update_frequency
@@ -280,7 +322,6 @@ def close_adapter(self) -> bool:

return self.queue_adapter.close()


## Queue Manager functions

def heartbeat(self) -> None:
@@ -428,9 +469,28 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool:
self._update_stale_jobs(allow_shutdown=allow_shutdown)

results = self.queue_adapter.acquire_complete()

# Stats fetching for running tasks, as close to the time we got the jobs as we can
last_time = self.statistics.last_update_time
now = self.statistics.last_update_time = time.time()
time_delta_seconds = now - last_time
try:
running_workers = self.queue_adapter.count_running_workers()
log_efficiency = True
except NotImplementedError:
running_workers = 0
log_efficiency = False
max_core_hours_running = time_delta_seconds * running_workers * self.statistics.cores_per_task / 3600
max_core_hours_possible = (time_delta_seconds * self.statistics.max_concurrent_tasks
* self.statistics.cores_per_task / 3600)

Review comment (Contributor): I'm not sure max_core_hours_possible is too interesting here? This number focuses on your own queue and how active you were. I guess for a low queue it's possibly nice.

Review comment (Collaborator, author): I'd like to leave this one in because it gives an upfront estimate of how well we are utilizing the resources you requested relative to the resources you were allocated. If, say, someone fires this up and they only get half the nodes they requested, they can look into that. Or, for instance in the case of Dask, users sometimes request a bunch of resources they know are available and could be allocated, but then the adapter only spins up a fraction of them; this metric would be indicative of that.

I could also just mask this behind a verbosity level since it could get distracting, especially in the case of drained queues, or remove it outright, but I think there are valid reasons for tracking it at least.

Review comment (Contributor): Fair enough, let's leave it in for now. I think it might be too much standard info, but it's easier to remove or move to debug logging than to add back later.

self.statistics.total_core_hours_consumed += max_core_hours_running
self.statistics.total_core_hours_possible += max_core_hours_possible

# Process jobs
n_success = 0
n_fail = 0
n_result = len(results)
task_cpu_hours = 0
error_payload = []
jobs_pushed = f"Pushed {n_result} complete tasks to the server "
if n_result:
Expand All @@ -448,11 +508,29 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool:

self.active -= n_result
for key, result in results.items():
wall_time_seconds = 0
if result.success:
n_success += 1
if hasattr(result.provenance, 'wall_time'):
wall_time_seconds = float(result.provenance.wall_time)
else:
error_payload.append(f"Job {key} failed: {result.error.error_type} - "
f"Msg: {result.error.error_message}")
# Try to get the wall time in the most fault-tolerant way
try:
wall_time_seconds = float(result.input_data.get('provenance', {}).get('wall_time', 0))
except AttributeError:
# Trap the case where result.input_data is None, but let other attribute errors propagate
if result.input_data is None:
wall_time_seconds = 0
else:
raise
except TypeError:
# Trap wall time corruption, e.g. float(None)
# Other Result corruptions will raise an error correctly
wall_time_seconds = 0

task_cpu_hours += wall_time_seconds * self.statistics.cores_per_task / 3600
n_fail = n_result - n_success

self.logger.info(jobs_pushed + f"({n_success} success / {n_fail} fail).")
Expand All @@ -469,6 +547,41 @@ def update(self, new_tasks: bool = True, allow_shutdown=True) -> bool:
# Get new tasks
payload = self._payload_template()
payload["data"]["limit"] = open_slots

# Crunch Statistics
self.statistics.total_failed_tasks += n_fail
self.statistics.total_successful_tasks += n_success
self.statistics.total_core_hours += task_cpu_hours
na_format = ''
percent_format = '.2f'
if self.statistics.total_completed_tasks == 0:
success_rate = "(N/A yet)"
success_format = na_format
else:
success_rate = self.statistics.total_successful_tasks / self.statistics.total_completed_tasks * 100
success_format = percent_format
stats_str = (f"Stats (Tasks): Processed={self.statistics.total_completed_tasks}, "
f"Failed={self.statistics.total_failed_tasks}, "
f"Success={success_rate:{success_format}}%, "
f"Core Hours (est.)={self.statistics.total_core_hours}")

# Handle efficiency calculations
if log_efficiency:
# Efficiency calculated as:
# sum_task(task_wall_time * nthread / task) / sum_time(number_running_worker * nthread / worker * interval)
if self.statistics.total_core_hours_consumed == 0 or self.statistics.total_core_hours_possible == 0:
efficiency_of_running = "(N/A yet)"
efficiency_of_potential = "(N/A yet)"
efficiency_format = na_format
else:
efficiency_of_running = self.statistics.total_core_hours / self.statistics.total_core_hours_consumed
efficiency_of_potential = self.statistics.total_core_hours / self.statistics.total_core_hours_possible
efficiency_format = percent_format
stats_str += (f", Worker Core Efficiency (est.): {efficiency_of_running*100:{efficiency_format}}%, "
f"Max Resource Core Efficiency (est.): {efficiency_of_potential*100:{efficiency_format}}%")

self.logger.info(stats_str)

try:
new_tasks = self.client._automodel_request("queue_manager", "get", payload)
except IOError as exc:
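
As a rough worked example of the efficiency bookkeeping above (all numbers invented for illustration): with 2 cores per task, a 600-second update interval, 4 workers reported running, and 8 concurrent task slots requested, one pass of update() contributes the following.

cores_per_task = 2
interval_s = 600                      # time since the previous update()
running_workers = 4                   # from queue_adapter.count_running_workers()
max_concurrent_tasks = 8              # tasks_per_worker * max_workers

# Core-hours the running workers could have delivered this interval
consumed = interval_s * running_workers * cores_per_task / 3600        # ~1.33
# Core-hours if every requested slot had been busy
possible = interval_s * max_concurrent_tasks * cores_per_task / 3600   # ~2.67
# Core-hours the finished tasks actually reported via provenance wall_time
used = 1800 * cores_per_task / 3600                                    # 1.00

print(f"Worker Core Efficiency (est.): {used / consumed * 100:.2f}%")        # 75.00%
print(f"Max Resource Core Efficiency (est.): {used / possible * 100:.2f}%")  # 37.50%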
24 changes: 24 additions & 0 deletions qcfractal/queue/parsl_adapter.py
@@ -33,6 +33,7 @@ def __init__(self, client: Any, logger: Optional[logging.Logger] = None, **kwarg

import parsl
self.client = parsl.dataflow.dflow.DataFlowKernel(self.client)
self._parsl_states = parsl.dataflow.states.States
self.app_map = {}

def __repr__(self):
@@ -77,6 +78,29 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
task = func(*task_spec["spec"]["args"], **task_spec["spec"]["kwargs"])
return task_spec["id"], task

def count_running_workers(self) -> int:

running = 0
executor_running_task_map = {key: False for key in self.client.executors.keys()}

Review comment (Contributor): This sounds like something we could ask of Parsl.

Review comment (Collaborator, author): Possibly; it would really need to be a way to loop over every executor in the Dataflow client and count the workers in a way which makes sense to that executor. Since we only really allow ThreadPool and HighThroughput executors, I just dealt with those for now.

I could probably make this faster and simpler by simply assuming "Is there a ThreadPoolWorker present? If so, it might as well be working since it's effectively running locally." Thoughts?

Review comment (Contributor): Right, this is something the Parsl team can easily integrate and test without us being in the loop. I think it's fine for now, but this level of introspection is often easily breakable.

for task in self.queue.values():
status = self.client.tasks.get(task.tid, {}).get('status', None)
if status == self._parsl_states.running:
executor_running_task_map[task['executor']] = True
if all(executor_running_task_map.values()):
# Efficiency loop break
break

for executor_key, executor in self.client.executors.items():
if hasattr(executor, 'connected_workers'):
# Should return an int
running += executor.connected_workers
elif hasattr(executor, 'max_threads') and executor_running_task_map[executor_key]:
running += 1
else:
raise NotImplementedError("Cannot accurately estimate consumption from executors")

return running

def acquire_complete(self) -> Dict[str, Any]:
ret = {}
del_keys = []
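
A minimal sketch of the simpler alternative floated in the review discussion above: count connected_workers directly where an executor exposes it, and treat a thread-pool executor (identified by max_threads, as in the code above) as always running since it executes locally. The function name and the standalone dflow_kernel argument are assumptions for illustration only.

def count_running_workers_simple(dflow_kernel) -> int:
    """Hypothetical simplification; mirrors the attribute checks used in the adapter above."""
    running = 0
    for executor in dflow_kernel.executors.values():
        if hasattr(executor, 'connected_workers'):
            running += executor.connected_workers  # HighThroughput-style executor
        elif hasattr(executor, 'max_threads'):
            running += 1                            # local thread pool, effectively always running
    return running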
4 changes: 2 additions & 2 deletions qcfractal/snowflake.py
@@ -66,7 +66,7 @@ def __init__(self,
The maximum number of ProcessPoolExecutor to spin up.
storage_uri : Optional[str], optional
A database URI to connect to, otherwise builds a default instance in a
tempory directory
temporary directory
storage_project_name : str, optional
The database name
max_active_services : int, optional
@@ -118,7 +118,7 @@ def __init__(self,
self.logfile = logging
log_prefix = self.logfile
else:
raise KeyError(f"Logfile type not recognized {type(logfile)}.")
raise KeyError(f"Logfile type not recognized {type(logging)}.")

super().__init__(name="QCFractal Snowflake Instance",
port=find_port(),