Manager Quality of Life statistics #313

Merged: 6 commits, Jul 10, 2019

Conversation

@Lnaden Lnaden commented Jul 3, 2019

Added quality-of-life stats to the Queue Manager reporting. The efficiency tracking only really works in Parsl, but totals will be tracked as well.

Progress towards (and possible resolution) of #262

Still a work in progress, but I wanted to open this anyway. Getting stats on running jobs turns out to be kind of a large pain.

Status

  • Changelog updated
  • Ready to go

@Lnaden Lnaden added the Enhancement (Project enhancement discussion) and Queue (Related to the Queue system) labels Jul 3, 2019
now = self.statistics.last_update_time = time.time()
time_delta_seconds = now - last_time
try:
    running_tasks = self.queue_adapter.count_running()
Contributor:

Instead of running tasks, why not count the number of running workers? The number of running workers is often available; see connected_workers here and in Dask.

Collaborator Author:

Because counting the running tasks is a better gauge of how many resources are being consumed. If we only count workers, then we don't take into account that each worker can have multiple tasks running on it. I did try to work it out such that cores cannot be oversubscribed, but I don't know how effective that is.

if result.success:
    n_success += 1
try:
Contributor:

Would recommend if hasattr(result.provenance, 'wall_time').

Collaborator Author:

That's a cleaner construct, I'll change that

    wall_time_seconds = result.input_data['provenance'].get('wall_time')
except:
    pass
try:
Contributor:

Why a try here?

Collaborator Author:

In case for whatever reason the wall_time variable does not get filled in with an actual float. I'm trying to preempt getting something like a None here, or having a job crash because the cluster job died and something mangled came back. This is just a safety catch which won't bring down the whole manager if something goes very wrong.

Then again, we may want it to do exactly that to better catch actual bugs. Thoughts?

f"{self.statistics.total_failed_tasks} failed "
f"({self.statistics.total_successful_tasks/self.statistics.total_completed_tasks*100}% "
f"success rate)")
self.logger.info(f"Stats: {self.statistics.total_cpu_hours} CPU Hours logged successfully (estimate)")
Contributor:

Can we try to make this a single line?

(processed=5, failed=1, success=80%, core_hours=53.3)

Collaborator Author:

Yes. I like that better
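
A minimal sketch (not the exact code merged in this PR) of the compact one-line form suggested above, assuming the self.statistics fields visible in the surrounding snippets:

    # Hypothetical one-line summary; guards against division by zero before any tasks finish.
    stats = self.statistics
    success_rate = (stats.total_successful_tasks / stats.total_completed_tasks * 100
                    if stats.total_completed_tasks else 0.0)
    self.logger.info(f"Stats: processed={stats.total_completed_tasks}, "
                     f"failed={stats.total_failed_tasks}, "
                     f"success={success_rate:.2f}%, "
                     f"core_hours={stats.total_core_hours:.2f}")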

self.logger.info(f"Stats: {self.statistics.total_cpu_hours} CPU Hours logged successfully (estimate)")
# Handle efficiency calculations
if log_efficiency:
    efficiency = min((max_cpu_hours_running + task_cpu_hours) / max_cpu_hours_possible * 100, 100)
Contributor:

Be very careful here, this isn't cpu hours but core hours.
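
For reference, the distinction matters because the estimate is wall time multiplied by the cores assigned to each task; a rough sketch of the quantity being accumulated (names taken from the surrounding snippets, so treat this as illustrative):

    # Core-hours (not CPU-hours) credited to one finished task:
    # wall_time_seconds comes from the task provenance, cores_per_task from the manager settings.
    task_core_hours = wall_time_seconds * self.statistics.cores_per_task / 3600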

@@ -199,6 +199,23 @@ def close(self) -> bool:
        True if the closing was successful.
        """

    def count_running(self) -> int:
Contributor:

If we do leave this, perhaps count_running_tasks to be explicit.
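
If the base-adapter hook is kept under the more explicit name, a minimal sketch of what the abstract signature might look like (illustrative only):

    def count_running_tasks(self) -> int:
        """Return the number of tasks currently running on this adapter.

        Adapters that cannot determine this should raise NotImplementedError.
        """
        raise NotImplementedError()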

@dgasmith dgasmith left a comment

Overall LGTM, I think this will be really nice! Is there any way you can snapshot a log to give an idea of what this looks like in practice?

if settings.manager.max_queued_tasks is None:
    # Tasks * jobs * buffer + 1
    max_queued_tasks = ceil(settings.common.tasks_per_worker * settings.common.max_workers * 1.20) + 1
    max_queued_tasks = ceil(max_concurrent_tasks * 1.20) + 1
Contributor:

Let us go ahead and bump this to 2x.
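
With the 2x buffer suggested above, the default would look roughly like this (a sketch assuming the same ceil-plus-one pattern as the diff):

    if settings.manager.max_queued_tasks is None:
        # Concurrent tasks * 2x buffer + 1
        max_queued_tasks = ceil(max_concurrent_tasks * 2.0) + 1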

@@ -78,6 +82,10 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
            func, *task_spec["spec"]["args"], **task_spec["spec"]["kwargs"], resources={"process": 1})
        return task_spec["id"], task

    def count_running_workers(self) -> int:
Contributor:

We should be able to call this function.

running_workers = 0
log_efficiency = False
max_core_hours_running = time_delta_seconds * running_workers * self.statistics.cores_per_task / 3600
max_core_hours_possible = (time_delta_seconds * self.statistics.max_concurrent_tasks
Contributor:

I'm not sure max_core_hours_possible is too interesting here? This number focuses on your own queue and how active you were. I guess for a low queue it's possibly nice.

Collaborator Author:

I'd like to leave this one in because it gives an upfront estimate of how well the resources you were actually allocated are being used relative to the resources you requested. If, say, someone fires this up and only gets half the nodes they requested, they can look into that. Or, for instance with Dask, users sometimes request a bunch of resources they know are available and could be allocated, but the adapter only spins up a fraction of them; this number would make that visible.

I could also just mask this behind a verbosity level since it could get distracting, especially in the case of drained queues, or remove it outright, but I think there are valid reasons for tracking it at least.

Contributor:

Fair enough, let's leave it in for now. I think it might be too much standard info, but it is easier to remove or move to debug logging than to add back later.
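
For reference, a hedged sketch of how the two ceilings from the snippet above might be compared against the accumulated task core-hours (variable names are assumed; the merged code's exact formula may differ):

    # Ceiling from workers that were actually up during this interval:
    #   max_core_hours_running = time_delta_seconds * running_workers * cores_per_task / 3600
    # Ceiling from everything that was requested:
    #   max_core_hours_possible = time_delta_seconds * max_concurrent_tasks * cores_per_task / 3600
    worker_core_efficiency = (min(task_core_hours / max_core_hours_running * 100, 100)
                              if max_core_hours_running else 0.0)
    max_resource_core_efficiency = (min(task_core_hours / max_core_hours_possible * 100, 100)
                                    if max_core_hours_possible else 0.0)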

self.statistics.total_failed_tasks += n_fail
self.statistics.total_successful_tasks += n_success
self.statistics.total_core_hours += task_cpu_hours
try:
Contributor:

Can we do this via an if?

Collaborator Author:

Sure, I do this twice so I'll switch both. Any particular reason, or just a preference for if...else over try...except?

Contributor:

It just seems a bit cleaner and more in line with the rest of the code base. I have often found exception catching to be imprecise for these catches.
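
A minimal sketch of the if-based form discussed in this thread, assuming wall_time lives in the result provenance as shown earlier (illustrative, not the merged code):

    # Explicit check instead of a bare try/except around the core-hour accumulation.
    wall_time = result.input_data['provenance'].get('wall_time')
    if isinstance(wall_time, (int, float)):
        self.statistics.total_core_hours += wall_time * self.statistics.cores_per_task / 3600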

def count_running_workers(self) -> int:

    running = 0
    executor_running_task_map = {key: False for key in self.client.executors.keys()}
Contributor:

This sounds like something we could ask of Parsl.

Collaborator Author:

Possibly; it would really need to be a way to loop over every executor in the Dataflow client and count the workers in a way that makes sense to each executor. Since we only really allow the ThreadPool and HighThroughput executors, I just dealt with those for now.

I could probably make this faster and simpler by simply assuming: is there a ThreadPoolWorker present? If so, it might as well be working since it's effectively running locally. Thoughts?

Contributor:

Right, this is something the parsl team can easily integrate and test without us being in the loop. I think it’s fine for now, but this level of introspection is often easily breakable.
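
A minimal sketch of the per-executor counting being discussed, assuming self.client is the Parsl DataFlowKernel (whose executors attribute is a dict) and that HighThroughputExecutor exposes the connected_workers count referenced earlier; treat the attribute names as assumptions:

    from parsl.executors import HighThroughputExecutor, ThreadPoolExecutor

    def count_running_workers(self) -> int:
        running = 0
        for executor in self.client.executors.values():
            if isinstance(executor, HighThroughputExecutor):
                # Assumed attribute per the review comment above.
                running += executor.connected_workers
            elif isinstance(executor, ThreadPoolExecutor):
                # A thread pool runs locally, so count it as one active worker.
                running += 1
        return running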


codecov bot commented Jul 9, 2019

Codecov Report

Merging #313 into master will decrease coverage by 2.18%.
The diff coverage is 66.33%.

@Lnaden Lnaden commented Jul 9, 2019

The most recent commit made a few other changes which I needed while getting the sample of the logfile output below. I'll want a sign-off on some of these since they touch a few things I was not working on at first (cc @dgasmith for the first 3; I can undo them and make a separate PR for them if you want):

  • Fixed a bug in Snowflake I found in an error catch
  • Made LGTM ignore the @validator error
  • Found a bug in the recursive directory creation of the qcfractal-server init
  • Made the stats output numbers less long floats
    • On a related note: Nested f-string formatting is a thing (see the sketch below)
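
For reference, the nested f-string trick mentioned in the last bullet lets the precision itself come from a variable; a minimal illustration:

>>> decimals = 2
>>> value = 0.010241767830318875
>>> f"{value:.{decimals}f}"
'0.01'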

Sample of the output (with changes from latest commit):

Uses a Snowflake and the ProcessPoolExecutor to do short tasks on 1 core, 1 worker. Since it is the Pool executor, the potential and used resources will be the same because it's always running.

[I 190709 16:23:50 managers:288] QueueManager successfully started.
    
[I 190709 16:23:50 managers:536] Pushed 0 complete tasks to the server (0 success / 0 fail).
[I 190709 16:23:50 managers:583] Stats (Tasks): Processed=0, Failed=0, Success=(N/A yet)%, Core Hours (est.)=0, Worker Core Efficiency (est.): 0.00%, Max Resource Core Efficiency (est.): 0.00%
[I 190709 16:23:50 managers:592] Acquired 10 new tasks.
[I 190709 16:24:01 managers:536] Pushed 10 complete tasks to the server (10 success / 0 fail).
[I 190709 16:24:01 managers:583] Stats (Tasks): Processed=10, Failed=0, Success=100.00%, Core Hours (est.)=0.002128599683443705, Worker Core Efficiency (est.): 74.55%, Max Resource Core Efficiency (est.): 74.55%
[I 190709 16:24:01 managers:592] Acquired 10 new tasks.
[I 190709 16:24:11 managers:536] Pushed 10 complete tasks to the server (10 success / 0 fail).
[I 190709 16:24:11 managers:583] Stats (Tasks): Processed=20, Failed=0, Success=100.00%, Core Hours (est.)=0.004118374983469645, Worker Core Efficiency (est.): 72.57%, Max Resource Core Efficiency (est.): 72.57%
[I 190709 16:24:11 managers:592] Acquired 10 new tasks.
[I 190709 16:24:21 managers:536] Pushed 10 complete tasks to the server (10 success / 0 fail).
[I 190709 16:24:21 managers:583] Stats (Tasks): Processed=30, Failed=0, Success=100.00%, Core Hours (est.)=0.006145976119571262, Worker Core Efficiency (est.): 72.40%, Max Resource Core Efficiency (est.): 72.40%
[I 190709 16:24:21 managers:592] Acquired 10 new tasks.
[I 190709 16:24:31 managers:536] Pushed 10 complete tasks to the server (10 success / 0 fail).
[I 190709 16:24:31 managers:583] Stats (Tasks): Processed=40, Failed=0, Success=100.00%, Core Hours (est.)=0.008195060094197591, Worker Core Efficiency (est.): 72.50%, Max Resource Core Efficiency (est.): 72.50%
[I 190709 16:24:31 managers:592] Acquired 10 new tasks.
[I 190709 16:24:41 managers:536] Pushed 10 complete tasks to the server (10 success / 0 fail).
[I 190709 16:24:41 managers:583] Stats (Tasks): Processed=50, Failed=0, Success=100.00%, Core Hours (est.)=0.010241767830318875, Worker Core Efficiency (est.): 72.47%, Max Resource Core Efficiency (est.): 72.47%
[I 190709 16:24:41 managers:592] Acquired 0 new tasks.

@@ -83,8 +83,8 @@ def _submit_task(self, task_spec: Dict[str, Any]) -> Tuple[Hashable, Any]:
        return task_spec["id"], task

    def count_running_workers(self) -> int:
        # Have not worked through this yet for Dask
        raise NotImplementedError
        # Note: This number may not quite be right if it's treating "worker" as a "job" or a Dask-Distributed "worker"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.client._count_active_workers() * self.client.worker_processes in that case?
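
An alternative sketch that counts workers through the public dask.distributed API instead of the private helper mentioned above (assuming self.client is a dask.distributed Client; whether these map to scheduler "jobs" or Dask workers still depends on the deployment):

    # Number of Dask worker processes currently connected to the scheduler.
    n_workers = len(self.client.scheduler_info()["workers"])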

@dgasmith dgasmith commented Jul 9, 2019

I think the status line is too long, clocking in at 211 chars. What about:

[I 190709 16:24:41 managers:583] Task stats: Complete=50, Failures=0, Success=100.00%, Pending=30
[I 190709 16:24:41 managers:583] Worker stats (est.): Core-hours=%.2f, Distribution-efficiency=%.2f, Worker-uptime=%.2f

Still clocks in at ~120 chars. Should probably try to shorten slightly more.

For core-hours we can use the lovely:

>>> val = 300000012340000.12341
>>> f'{val:,.2f}'
'300,000,012,340,000.12'


lgtm-com bot commented Jul 9, 2019

This pull request fixes 1 alert when merging 34eb760 into f5e0e0c - view on LGTM.com

fixed alerts:

  • 1 for Module is imported with 'import' and 'import from'

@Lnaden Lnaden commented Jul 10, 2019

The log now looks like this:

[I 190710 10:00:49 managers:536] Pushed 10 complete tasks to the server (10 success / 0 fail).
[I 190710 10:00:49 managers:585] Task Stats: Processed=30, Failed=0, Success=100.00%
[I 190710 10:00:49 managers:586] Worker Stats (est.): Core Hours Used=0.01, Core Usage Efficiency: 80.45%

In verbose mode, it includes an additional field:

[I 190710 09:57:21 managers:536] Pushed 10 complete tasks to the server (10 success / 0 fail).
[I 190710 09:57:21 managers:585] Task Stats: Processed=40, Failed=0, Success=100.00%
[I 190710 09:57:21 managers:586] Worker Stats (est.): Core Hours Used=0.01, Core Usage Efficiency: 83.35%, Core Usage vs. Max Resources Requested: 83.35%

Because of rounding, the Core Hours used don't report until above 0.01 hours.
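
For example, with the two-decimal formatting suggested earlier, small totals round down to zero until they cross the display threshold:

>>> f"{0.0041:,.2f}"
'0.00'
>>> f"{0.0123:,.2f}"
'0.01'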

@dgasmith dgasmith left a comment

LGTM!

@dgasmith dgasmith merged commit 24c29cf into MolSSI:master Jul 10, 2019
@Lnaden Lnaden deleted the manager_qol branch July 10, 2019 14:11