From ca2ab3a924df7f4b4fe3f6fae077845a7509b330 Mon Sep 17 00:00:00 2001 From: bd Date: Sun, 3 Aug 2025 20:51:40 -0400 Subject: [PATCH 01/14] Redirect python shebang to use `env' Revert before merge. Allows tests to run on guix/nix. --- scalene/redirect_python.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalene/redirect_python.py b/scalene/redirect_python.py index b51d0245b..cf96c7c84 100644 --- a/scalene/redirect_python.py +++ b/scalene/redirect_python.py @@ -22,7 +22,7 @@ def redirect_python( f"python{sys.version_info.major}.{sys.version_info.minor}{base_python_extension}", ] - shebang = "@echo off" if sys.platform == "win32" else "#!/bin/bash" + shebang = "@echo off" if sys.platform == "win32" else "#!/usr/bin/env bash" all_args = "%*" if sys.platform == "win32" else '"$@"' payload = f"{shebang}\n{preface} {sys.executable} -m scalene {cmdline} {all_args}\n" From ee06f3ad7de18cbe30622502bb03e522734b8b31 Mon Sep 17 00:00:00 2001 From: bd Date: Sun, 3 Aug 2025 22:51:12 -0400 Subject: [PATCH 02/14] Initial methods to collect idle asyncio task frames --- scalene/scalene_asyncio.py | 119 +++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 scalene/scalene_asyncio.py diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py new file mode 100644 index 000000000..55fffae2a --- /dev/null +++ b/scalene/scalene_asyncio.py @@ -0,0 +1,119 @@ +import asyncio +import sys +import threading + +from types import FrameType +from typing import List + + +class ScaleneAsyncio: + """Provides a set of methods to collect idle task frames.""" + + @staticmethod + def compute_suspended_frames_to_record() -> List[FrameType]: + """Collect all frames which belong to suspended tasks.""" + loops = ScaleneAsyncio._get_event_loops() + return ScaleneAsyncio._get_frames_from_loops(loops) + + @staticmethod + def _get_event_loops() -> List[asyncio.AbstractEventLoop]: + """Returns each thread's event loop. If there are none, returns + the empty array.""" + loops = [] + for t in threading.enumerate(): + frame = sys._current_frames().get(t.ident) + if frame: + loop = ScaleneAsyncio._walk_back_until_loop(frame) + # duplicates shouldn't be possible, but just in case... + if loop and loop not in loops: + loops.append(loop) + return loops + + @staticmethod + def _walk_back_until_loop(frame) -> asyncio.AbstractEventLoop: + """Helper for get_event_loops. + + Walks back the callstack until we are in a method named '_run_once'. + If this becomes true and the 'self' variable is an instance of + AbstractEventLoop, then we return that variable. + + This works because _run_once is one of the main methods asyncio uses + to facilitate its event loop, and is always on the stack while the + loop runs.""" + while frame: + if frame.f_code.co_name == '_run_once' and \ + 'self' in frame.f_locals: + loop = frame.f_locals['self'] + if isinstance(loop, asyncio.AbstractEventLoop): + return loop + else: + frame = frame.f_back + return None + + @staticmethod + def _get_frames_from_loops(loops) -> List[FrameType]: + """Given LOOPS, returns a flat list of frames corresponding to idle + tasks.""" + return [ + frames for loop in loops + for frames in ScaleneAsyncio._get_idle_task_frames(loop) + ] + + @staticmethod + def _get_idle_task_frames(loop) -> List[FrameType]: + """Given an asyncio event loop, returns the list of idle task frames. + We only care about idle task frames, as running tasks are already + included elsewhere. + + A task is considered 'idle' if it is pending and not the current + task.""" + idle = [] + current = asyncio.current_task(loop) + for task in asyncio.all_tasks(loop): + + # the task is not idle + if task == current: + continue + + coro = task.get_coro() + + # the task is suspended but not waiting on any other coroutines. + # this means it has not started---unstarted tasks do not report + # meaningful line numbers, so they are also thrown out + # (note that created tasks are scheduled and not run immediately) + if getattr(coro, 'cr_await', None) is None: + continue + + f = ScaleneAsyncio._get_deepest_traceable_frame(coro) + if f: + idle.append(f) + + # TODO + # handle async generators + # ideally, we would access these from _get_deepest_traceable_frame. + # doing it this way causes us to also assign the generator's time to + # the coroutine that called this generator in + # _get_deepest_traceable_frame + for ag in loop._asyncgens: + f = getattr(ag, 'ag_frame', None) + if f and should_trace(f.f_code.co_filename): + idle.append(f) + return idle + + @staticmethod + def _get_deepest_traceable_frame(coro) -> FrameType: + """Get the deepest frame of coro we care to trace. + This is possible because each corooutine keeps a reference to the + coroutine it is waiting on. + + Note that it cannot be the case that a task is suspended in a frame + that does not belong to a coroutine, asyncio is very particular about + that! This is also why we only track idle tasks this way.""" + curr = coro + deepest_frame = None + while curr: + frame = getattr(curr, 'cr_frame', None) + if frame and should_trace(frame.f_code.co_filename): + deepest_frame = frame + curr = getattr(curr, 'cr_await', None) + return deepest_frame From 1e18c766c6f47fcd9f8abae8a688f46178707c63 Mon Sep 17 00:00:00 2001 From: bd Date: Mon, 4 Aug 2025 00:30:32 -0400 Subject: [PATCH 03/14] Initial logic to incorporate profiling idle tasks Includes some shortcuts in logic. There seems to be an issue with reporting pertaining now that it is possible to collect more than one frame per thread. --- scalene/scalene_asyncio.py | 35 ++++++++++++++++++++++++----------- scalene/scalene_profiler.py | 27 ++++++++++++++++----------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index 55fffae2a..afcf142fa 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -3,20 +3,30 @@ import threading from types import FrameType -from typing import List +from typing import ( + List, + Tuple, + cast, +) class ScaleneAsyncio: """Provides a set of methods to collect idle task frames.""" + should_trace = None + @staticmethod - def compute_suspended_frames_to_record() -> List[FrameType]: + def compute_suspended_frames_to_record(should_trace) -> \ + List[Tuple[FrameType, int, FrameType]]: """Collect all frames which belong to suspended tasks.""" + # TODO + ScaleneAsyncio.should_trace = should_trace + loops = ScaleneAsyncio._get_event_loops() return ScaleneAsyncio._get_frames_from_loops(loops) @staticmethod - def _get_event_loops() -> List[asyncio.AbstractEventLoop]: + def _get_event_loops() -> List[Tuple[asyncio.AbstractEventLoop, int]]: """Returns each thread's event loop. If there are none, returns the empty array.""" loops = [] @@ -26,7 +36,7 @@ def _get_event_loops() -> List[asyncio.AbstractEventLoop]: loop = ScaleneAsyncio._walk_back_until_loop(frame) # duplicates shouldn't be possible, but just in case... if loop and loop not in loops: - loops.append(loop) + loops.append((loop, cast(int, t.ident))) return loops @staticmethod @@ -51,12 +61,13 @@ def _walk_back_until_loop(frame) -> asyncio.AbstractEventLoop: return None @staticmethod - def _get_frames_from_loops(loops) -> List[FrameType]: + def _get_frames_from_loops(loops) -> \ + List[Tuple[FrameType, int, FrameType]]: """Given LOOPS, returns a flat list of frames corresponding to idle tasks.""" return [ - frames for loop in loops - for frames in ScaleneAsyncio._get_idle_task_frames(loop) + (frame, tident, None) for loop, tident in loops + for frame in ScaleneAsyncio._get_idle_task_frames(loop) ] @staticmethod @@ -86,7 +97,7 @@ def _get_idle_task_frames(loop) -> List[FrameType]: f = ScaleneAsyncio._get_deepest_traceable_frame(coro) if f: - idle.append(f) + idle.append(cast(FrameType, f)) # TODO # handle async generators @@ -96,8 +107,9 @@ def _get_idle_task_frames(loop) -> List[FrameType]: # _get_deepest_traceable_frame for ag in loop._asyncgens: f = getattr(ag, 'ag_frame', None) - if f and should_trace(f.f_code.co_filename): - idle.append(f) + if f and \ + ScaleneAsyncio.should_trace(f.f_code.co_filename, f.f_code.co_name): + idle.append(cast(FrameType, f)) return idle @staticmethod @@ -113,7 +125,8 @@ def _get_deepest_traceable_frame(coro) -> FrameType: deepest_frame = None while curr: frame = getattr(curr, 'cr_frame', None) - if frame and should_trace(frame.f_code.co_filename): + if frame and \ + ScaleneAsyncio.should_trace(frame.f_code.co_filename, frame.f_code.co_name): deepest_frame = frame curr = getattr(curr, 'cr_await', None) return deepest_frame diff --git a/scalene/scalene_profiler.py b/scalene/scalene_profiler.py index e7f5d0ca4..1a7996ece 100644 --- a/scalene/scalene_profiler.py +++ b/scalene/scalene_profiler.py @@ -77,6 +77,7 @@ import scalene.scalene_config from scalene.scalene_arguments import ScaleneArguments +from scalene.scalene_asyncio import ScaleneAsyncio from scalene.scalene_client_timer import ScaleneClientTimer from scalene.scalene_funcutils import ScaleneFuncUtils from scalene.scalene_json import ScaleneJSON @@ -163,7 +164,7 @@ def enable_profiling() -> Generator[None, None, None]: yield stop() - + class Scalene: """The Scalene profiler itself.""" @@ -512,13 +513,13 @@ def malloc_signal_handler( ): Scalene.update_profiled() pywhere.set_last_profiled_invalidated_false() - # In the setprofile callback, we rely on - # __last_profiled always having the same memory address. + # In the setprofile callback, we rely on + # __last_profiled always having the same memory address. # This is an optimization to not have to traverse the Scalene profiler # object's dictionary every time we want to update the last profiled line. # # A previous change to this code set Scalene.__last_profiled = [fname, lineno, lasti], - # which created a new list object and set the __last_profiled attribute to the new list. This + # which created a new list object and set the __last_profiled attribute to the new list. This # made the object held in `pywhere.cpp` out of date, and caused the profiler to not update the last profiled line. Scalene.__last_profiled[:] = [ Filename(f.f_code.co_filename), @@ -756,6 +757,7 @@ def cpu_signal_handler( Scalene.process_cpu_sample( signum, Scalene.compute_frames_to_record(), + ScaleneAsyncio.compute_suspended_frames_to_record(Scalene.should_trace), now, gpu_load, gpu_mem_used, @@ -906,6 +908,7 @@ def process_cpu_sample( None, ], new_frames: List[Tuple[FrameType, int, FrameType]], + async_frames: List[Tuple[FrameType, int, FrameType]], now: TimeInfo, gpu_load: float, gpu_mem_used: float, @@ -1042,7 +1045,7 @@ def process_cpu_sample( Scalene.__stats.gpu_stats.gpu_mem_samples[fname][lineno].push(gpu_mem_used) # Now handle the rest of the threads. - for frame, tident, orig_frame in new_frames: + for frame, tident, orig_frame in new_frames + async_frames: if frame == main_thread_frame: continue add_stack( @@ -1068,10 +1071,12 @@ def process_cpu_sample( # Ignore sleeping threads. continue # Check if the original caller is stuck inside a call. - if ScaleneFuncUtils.is_call_function( - orig_frame.f_code, - ByteCodeIndex(orig_frame.f_lasti), - ): + # TODO + if orig_frame is None or \ + ScaleneFuncUtils.is_call_function( + orig_frame.f_code, + ByteCodeIndex(orig_frame.f_lasti), + ): # It is. Attribute time to native. Scalene.__stats.cpu_stats.cpu_samples_c[fname][lineno] += normalized_time else: @@ -1225,7 +1230,7 @@ def alloc_sigqueue_processor(x: Optional[List[int]]) -> None: freed_last_trigger = 0 for item in arr: is_malloc = item.action == Scalene.MALLOC_ACTION - if item.count == scalene.scalene_config.NEWLINE_TRIGGER_LENGTH + 1: + if item.count == scalene.scalene_config.NEWLINE_TRIGGER_LENGTH + 1: continue # in previous implementations, we were adding NEWLINE to the footprint. # We should not account for this in the user-facing profile. count = item.count / Scalene.BYTES_PER_MB @@ -1441,7 +1446,7 @@ def memcpy_sigqueue_processor( lineno=LineNumber(int(lineno)), bytecode_index=ByteCodeIndex(int(bytei))) arr.append(memcpy_profiling_sample) - + arr.sort() for item in arr: From 4af02e4f212693c5df71d28f3a5c804b2cf0aea4 Mon Sep 17 00:00:00 2001 From: bd Date: Tue, 5 Aug 2025 20:19:26 -0400 Subject: [PATCH 04/14] Correctly discard frames from tasks which call other tasks --- scalene/scalene_asyncio.py | 81 +++++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 5 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index afcf142fa..26f1cdb3a 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -1,6 +1,7 @@ import asyncio import sys import threading +import gc from types import FrameType from typing import ( @@ -14,16 +15,29 @@ class ScaleneAsyncio: """Provides a set of methods to collect idle task frames.""" should_trace = None + loops: List[Tuple[asyncio.AbstractEventLoop, int]] = [] + current_task = None + + @staticmethod + def current_task_exists(tident) -> bool: + """Given TIDENT, returns true if a current task exists. Returns + true if no event loop is running on TIDENT.""" + current = True + for loop, t in ScaleneAsyncio.loops: + if t == tident: + current = asyncio.current_task(loop) + break + return bool(current) @staticmethod def compute_suspended_frames_to_record(should_trace) -> \ List[Tuple[FrameType, int, FrameType]]: """Collect all frames which belong to suspended tasks.""" - # TODO + # TODO this is an ugly way to access the function ScaleneAsyncio.should_trace = should_trace + ScaleneAsyncio.loops = ScaleneAsyncio._get_event_loops() - loops = ScaleneAsyncio._get_event_loops() - return ScaleneAsyncio._get_frames_from_loops(loops) + return ScaleneAsyncio._get_frames_from_loops(ScaleneAsyncio.loops) @staticmethod def _get_event_loops() -> List[Tuple[asyncio.AbstractEventLoop, int]]: @@ -125,8 +139,65 @@ def _get_deepest_traceable_frame(coro) -> FrameType: deepest_frame = None while curr: frame = getattr(curr, 'cr_frame', None) - if frame and \ - ScaleneAsyncio.should_trace(frame.f_code.co_filename, frame.f_code.co_name): + if not frame: + break + if ScaleneAsyncio.should_trace(frame.f_code.co_filename, + frame.f_code.co_name): deepest_frame = frame curr = getattr(curr, 'cr_await', None) + + # if this task is found to point to another task we're profiling, + # then we will get the deepest frame later and should return nothing. + if curr and any( + ScaleneAsyncio._should_trace_task(task) + for task in ScaleneAsyncio._try_link_tasks(curr) + ): + return None + return deepest_frame + + @staticmethod + def _try_link_tasks(awaitable) -> List[asyncio.Task]: + """Given an AWAITABLE which is not a coroutine, assume it is a future + and attempt to find references to which tasks it is waiting for.""" + + if not isinstance(awaitable, asyncio.Future): + # TODO some wrappers like _asyncio.FutureIter get caught here, + # I am not sure if a more robust approach is necessary + + # can gc be avoided here? + refs = gc.get_referents(awaitable) + if refs: + awaitable = refs[0] + + if not isinstance(awaitable, asyncio.Future): + return [] + + return getattr(awaitable, '_children', []) + + @staticmethod + def _should_trace_task(task) -> bool: + """Returns FALSE if TASK is uninteresting to the user. + + A task is interesting if it is not the current task, if it has actually + started executing, and if a child task did not originate from it. + """ + # the task is not idle + if task == ScaleneAsyncio.current_task: + return False + + coro = task.get_coro() + + # the task hasn't even run yet + # assumes that all started tasks are sitting at an await + # statement. + # if this isn't the case, the associated coroutine will + # be 'waiting' on the coroutine declaration. No! Bad! + if getattr(coro, 'cr_frame', None) is None or \ + getattr(coro, 'cr_await', None) is None: + return False + + frame = getattr(coro, 'cr_frame', None) + + return ScaleneAsyncio.should_trace(frame.f_code.co_filename, + frame.f_code.co_name) From 4ee248687e75a50bdfb146411491b7333b2ede1d Mon Sep 17 00:00:00 2001 From: bd Date: Tue, 5 Aug 2025 20:55:58 -0400 Subject: [PATCH 05/14] Remove some leftover duplicated logic from _get_idle_task_frames --- scalene/scalene_asyncio.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index 26f1cdb3a..dc3775c09 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -93,25 +93,20 @@ def _get_idle_task_frames(loop) -> List[FrameType]: A task is considered 'idle' if it is pending and not the current task.""" idle = [] - current = asyncio.current_task(loop) - for task in asyncio.all_tasks(loop): - # the task is not idle - if task == current: + # set this when we start processing a loop. + # it is required later, but I only want to set it once. + ScaleneAsyncio.current_task = asyncio.current_task(loop) + + for task in asyncio.all_tasks(loop): + if not ScaleneAsyncio._should_trace_task(task): continue coro = task.get_coro() - # the task is suspended but not waiting on any other coroutines. - # this means it has not started---unstarted tasks do not report - # meaningful line numbers, so they are also thrown out - # (note that created tasks are scheduled and not run immediately) - if getattr(coro, 'cr_await', None) is None: - continue - - f = ScaleneAsyncio._get_deepest_traceable_frame(coro) - if f: - idle.append(cast(FrameType, f)) + frame = ScaleneAsyncio._get_deepest_traceable_frame(coro) + if frame: + idle.append(cast(FrameType, frame)) # TODO # handle async generators From 8791e7556f60d41c2cb9d79cd7387516f92384f8 Mon Sep 17 00:00:00 2001 From: bd Date: Tue, 5 Aug 2025 22:29:00 -0400 Subject: [PATCH 06/14] Exhaustively search async generators, fix asyncgen double assignment --- scalene/scalene_asyncio.py | 67 +++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index dc3775c09..4ffc26a82 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -3,7 +3,10 @@ import threading import gc -from types import FrameType +from types import ( + AsyncGeneratorType, + FrameType +) from typing import ( List, Tuple, @@ -108,17 +111,6 @@ def _get_idle_task_frames(loop) -> List[FrameType]: if frame: idle.append(cast(FrameType, frame)) - # TODO - # handle async generators - # ideally, we would access these from _get_deepest_traceable_frame. - # doing it this way causes us to also assign the generator's time to - # the coroutine that called this generator in - # _get_deepest_traceable_frame - for ag in loop._asyncgens: - f = getattr(ag, 'ag_frame', None) - if f and \ - ScaleneAsyncio.should_trace(f.f_code.co_filename, f.f_code.co_name): - idle.append(cast(FrameType, f)) return idle @staticmethod @@ -134,41 +126,53 @@ def _get_deepest_traceable_frame(coro) -> FrameType: deepest_frame = None while curr: frame = getattr(curr, 'cr_frame', None) + if not frame: - break + curr = ScaleneAsyncio._search_awaitable(curr) + if isinstance(curr, AsyncGeneratorType): + frame = getattr(curr, 'ag_frame', None) + else: + break + if ScaleneAsyncio.should_trace(frame.f_code.co_filename, frame.f_code.co_name): deepest_frame = frame - curr = getattr(curr, 'cr_await', None) + + if isinstance(curr, AsyncGeneratorType): + curr = getattr(curr, 'ag_await', None) + else: + curr = getattr(curr, 'cr_await', None) # if this task is found to point to another task we're profiling, # then we will get the deepest frame later and should return nothing. - if curr and any( - ScaleneAsyncio._should_trace_task(task) - for task in ScaleneAsyncio._try_link_tasks(curr) - ): - return None + # this is specific to gathering futures, i.e., gather statement. + if isinstance(curr, asyncio.Future): + tasks = getattr(curr, '_children', []) + if any( + ScaleneAsyncio._should_trace_task(task) + for task in tasks + ): + return None return deepest_frame @staticmethod - def _try_link_tasks(awaitable) -> List[asyncio.Task]: - """Given an AWAITABLE which is not a coroutine, assume it is a future - and attempt to find references to which tasks it is waiting for.""" - + def _search_awaitable(awaitable): + """Given an awaitable which is not a coroutine, assume it is a future + and attempt to find references to further futures or async generators. + """ + future = None if not isinstance(awaitable, asyncio.Future): - # TODO some wrappers like _asyncio.FutureIter get caught here, - # I am not sure if a more robust approach is necessary + # TODO some wrappers like _asyncio.FutureIter, + # async_generator_asend get caught here, I am not sure if a more + # robust approach is necessary # can gc be avoided here? refs = gc.get_referents(awaitable) if refs: - awaitable = refs[0] - - if not isinstance(awaitable, asyncio.Future): - return [] + future = refs[0] - return getattr(awaitable, '_children', []) + return future @staticmethod def _should_trace_task(task) -> bool: @@ -177,6 +181,9 @@ def _should_trace_task(task) -> bool: A task is interesting if it is not the current task, if it has actually started executing, and if a child task did not originate from it. """ + if not isinstance(task, asyncio.Task): + return False + # the task is not idle if task == ScaleneAsyncio.current_task: return False From c0b97dcae27fedde6130c27c926b1cb03e58a159 Mon Sep 17 00:00:00 2001 From: bd Date: Fri, 8 Aug 2025 21:21:06 -0400 Subject: [PATCH 07/14] Address request for type checks in search_awaitable (now trace_down) --- scalene/scalene_asyncio.py | 68 ++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index 4ffc26a82..ad08a7ae0 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -8,6 +8,7 @@ FrameType ) from typing import ( + Optional, List, Tuple, cast, @@ -125,28 +126,19 @@ def _get_deepest_traceable_frame(coro) -> FrameType: curr = coro deepest_frame = None while curr: - frame = getattr(curr, 'cr_frame', None) + frame, curr = ScaleneAsyncio._trace_down(curr) - if not frame: - curr = ScaleneAsyncio._search_awaitable(curr) - if isinstance(curr, AsyncGeneratorType): - frame = getattr(curr, 'ag_frame', None) - else: - break + if frame is None: + break if ScaleneAsyncio.should_trace(frame.f_code.co_filename, frame.f_code.co_name): deepest_frame = frame - if isinstance(curr, AsyncGeneratorType): - curr = getattr(curr, 'ag_await', None) - else: - curr = getattr(curr, 'cr_await', None) - # if this task is found to point to another task we're profiling, # then we will get the deepest frame later and should return nothing. # this is specific to gathering futures, i.e., gather statement. - if isinstance(curr, asyncio.Future): + if curr is not None: tasks = getattr(curr, '_children', []) if any( ScaleneAsyncio._should_trace_task(task) @@ -156,24 +148,6 @@ def _get_deepest_traceable_frame(coro) -> FrameType: return deepest_frame - @staticmethod - def _search_awaitable(awaitable): - """Given an awaitable which is not a coroutine, assume it is a future - and attempt to find references to further futures or async generators. - """ - future = None - if not isinstance(awaitable, asyncio.Future): - # TODO some wrappers like _asyncio.FutureIter, - # async_generator_asend get caught here, I am not sure if a more - # robust approach is necessary - - # can gc be avoided here? - refs = gc.get_referents(awaitable) - if refs: - future = refs[0] - - return future - @staticmethod def _should_trace_task(task) -> bool: """Returns FALSE if TASK is uninteresting to the user. @@ -195,11 +169,39 @@ def _should_trace_task(task) -> bool: # statement. # if this isn't the case, the associated coroutine will # be 'waiting' on the coroutine declaration. No! Bad! - if getattr(coro, 'cr_frame', None) is None or \ - getattr(coro, 'cr_await', None) is None: + frame, awaitable = ScaleneAsyncio._trace_down(coro) + if frame is None or awaitable is None: return False frame = getattr(coro, 'cr_frame', None) return ScaleneAsyncio.should_trace(frame.f_code.co_filename, frame.f_code.co_name) + + @staticmethod + def _trace_down(awaitable) -> \ + Tuple[Optional[FrameType], Optional[asyncio.Future]]: + """Helper for _get_deepest_traceable_frame + Given AWAITABLE, returns its associated frame and the future it is + waiting on, if any.""" + if asyncio.iscoroutine(awaitable) and \ + hasattr(awaitable, 'cr_await') and \ + hasattr(awaitable, 'cr_frame'): + return getattr(awaitable, 'cr_frame', None), \ + getattr(awaitable, 'cr_await', None) + + # attempt to obtain an async-generator + # can gc be avoided here? + refs = gc.get_referents(awaitable) + if refs: + awaitable = refs[0] + + if isinstance(awaitable, AsyncGeneratorType): + return getattr(awaitable, 'ag_frame', None), \ + getattr(awaitable, 'ag_await', None) + + if isinstance(awaitable, asyncio.Future): + # return whatever future we found. + return None, awaitable + + return None, None From b5bf0da34b6dffd306e9218de7a1830a701f1637 Mon Sep 17 00:00:00 2001 From: bd Date: Fri, 8 Aug 2025 22:58:20 -0400 Subject: [PATCH 08/14] (Fix?) Ensure task is not assigned time when waiting on current task --- scalene/scalene_asyncio.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index ad08a7ae0..f8d978386 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -98,12 +98,12 @@ def _get_idle_task_frames(loop) -> List[FrameType]: task.""" idle = [] - # set this when we start processing a loop. - # it is required later, but I only want to set it once. + # required later ScaleneAsyncio.current_task = asyncio.current_task(loop) for task in asyncio.all_tasks(loop): - if not ScaleneAsyncio._should_trace_task(task): + if not ScaleneAsyncio._task_is_valid(task) or \ + task == ScaleneAsyncio.current_task: continue coro = task.get_coro() @@ -141,7 +141,7 @@ def _get_deepest_traceable_frame(coro) -> FrameType: if curr is not None: tasks = getattr(curr, '_children', []) if any( - ScaleneAsyncio._should_trace_task(task) + ScaleneAsyncio._task_is_valid(task) for task in tasks ): return None @@ -149,17 +149,16 @@ def _get_deepest_traceable_frame(coro) -> FrameType: return deepest_frame @staticmethod - def _should_trace_task(task) -> bool: + def _task_is_valid(task) -> bool: """Returns FALSE if TASK is uninteresting to the user. - A task is interesting if it is not the current task, if it has actually - started executing, and if a child task did not originate from it. + A task is interesting if it has actually started executing, and if + a child task did not originate from it. """ if not isinstance(task, asyncio.Task): return False - # the task is not idle - if task == ScaleneAsyncio.current_task: + if task._state != 'PENDING': return False coro = task.get_coro() @@ -170,13 +169,11 @@ def _should_trace_task(task) -> bool: # if this isn't the case, the associated coroutine will # be 'waiting' on the coroutine declaration. No! Bad! frame, awaitable = ScaleneAsyncio._trace_down(coro) - if frame is None or awaitable is None: + if task != ScaleneAsyncio.current_task and \ + (frame is None or awaitable is None): return False - frame = getattr(coro, 'cr_frame', None) - - return ScaleneAsyncio.should_trace(frame.f_code.co_filename, - frame.f_code.co_name) + return True @staticmethod def _trace_down(awaitable) -> \ From 5ed76ccea19b6338faa4ab1a0b030912b6a9750f Mon Sep 17 00:00:00 2001 From: bd Date: Sat, 9 Aug 2025 00:19:50 -0400 Subject: [PATCH 09/14] Do not factor in thread information when adding time to idle tasks --- scalene/scalene_asyncio.py | 18 ++++++++--------- scalene/scalene_profiler.py | 39 +++++++++++++++++++++++++++++++------ 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index f8d978386..4e3c6872f 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -8,10 +8,10 @@ FrameType ) from typing import ( - Optional, - List, - Tuple, cast, + List, + Optional, + Tuple ) @@ -34,8 +34,7 @@ def current_task_exists(tident) -> bool: return bool(current) @staticmethod - def compute_suspended_frames_to_record(should_trace) -> \ - List[Tuple[FrameType, int, FrameType]]: + def compute_suspended_frames_to_record(should_trace) -> List[FrameType]: """Collect all frames which belong to suspended tasks.""" # TODO this is an ugly way to access the function ScaleneAsyncio.should_trace = should_trace @@ -44,7 +43,7 @@ def compute_suspended_frames_to_record(should_trace) -> \ return ScaleneAsyncio._get_frames_from_loops(ScaleneAsyncio.loops) @staticmethod - def _get_event_loops() -> List[Tuple[asyncio.AbstractEventLoop, int]]: + def _get_event_loops() -> List[asyncio.AbstractEventLoop]: """Returns each thread's event loop. If there are none, returns the empty array.""" loops = [] @@ -54,7 +53,7 @@ def _get_event_loops() -> List[Tuple[asyncio.AbstractEventLoop, int]]: loop = ScaleneAsyncio._walk_back_until_loop(frame) # duplicates shouldn't be possible, but just in case... if loop and loop not in loops: - loops.append((loop, cast(int, t.ident))) + loops.append(loop) return loops @staticmethod @@ -79,12 +78,11 @@ def _walk_back_until_loop(frame) -> asyncio.AbstractEventLoop: return None @staticmethod - def _get_frames_from_loops(loops) -> \ - List[Tuple[FrameType, int, FrameType]]: + def _get_frames_from_loops(loops) -> List[FrameType]: """Given LOOPS, returns a flat list of frames corresponding to idle tasks.""" return [ - (frame, tident, None) for loop, tident in loops + frame for loop in loops for frame in ScaleneAsyncio._get_idle_task_frames(loop) ] diff --git a/scalene/scalene_profiler.py b/scalene/scalene_profiler.py index 1a7996ece..596b605b8 100644 --- a/scalene/scalene_profiler.py +++ b/scalene/scalene_profiler.py @@ -908,7 +908,7 @@ def process_cpu_sample( None, ], new_frames: List[Tuple[FrameType, int, FrameType]], - async_frames: List[Tuple[FrameType, int, FrameType]], + idle_async_frames: List[FrameType], now: TimeInfo, gpu_load: float, gpu_mem_used: float, @@ -933,7 +933,7 @@ def process_cpu_sample( Scalene.output_profile() stats.start_clock() - if not new_frames: + if not new_frames and not idle_async_frames: # No new frames, so nothing to update. return @@ -1045,7 +1045,7 @@ def process_cpu_sample( Scalene.__stats.gpu_stats.gpu_mem_samples[fname][lineno].push(gpu_mem_used) # Now handle the rest of the threads. - for frame, tident, orig_frame in new_frames + async_frames: + for frame, tident, orig_frame in new_frames: if frame == main_thread_frame: continue add_stack( @@ -1071,9 +1071,7 @@ def process_cpu_sample( # Ignore sleeping threads. continue # Check if the original caller is stuck inside a call. - # TODO - if orig_frame is None or \ - ScaleneFuncUtils.is_call_function( + if ScaleneFuncUtils.is_call_function( orig_frame.f_code, ByteCodeIndex(orig_frame.f_lasti), ): @@ -1092,9 +1090,38 @@ def process_cpu_sample( core_utilization ) + # Finally, handle idle asynchronous tasks + for frame in idle_async_frames: + add_stack( + frame, + Scalene.should_trace, + Scalene.__stats.stacks, + average_python_time, + average_c_time, + average_cpu_time, + ) + + fname = Filename(frame.f_code.co_filename) + lineno = LineNumber(frame.f_lineno) + Scalene.enter_function_meta(frame, Scalene.__stats) + # TODO don't do this + # asynchronous frames are always counted as native time. + # additionally, even if the associated thread is sleeping, + # idle tasks still... idle. + + Scalene.__stats.cpu_stats.cpu_samples_c[fname][lineno] += total_time + Scalene.__stats.cpu_stats.cpu_samples[fname] += total_time + Scalene.__stats.cpu_stats.cpu_utilization[fname][lineno].push( + cpu_utilization + ) + Scalene.__stats.cpu_stats.core_utilization[fname][lineno].push( + core_utilization + ) + # Clean up all the frames del new_frames[:] del new_frames + del idle_async_frames del is_thread_sleeping Scalene.__stats.cpu_stats.total_cpu_samples += total_time From ce8be5aaeccd4db5b7b07c3318bc822fe0238707 Mon Sep 17 00:00:00 2001 From: bd Date: Sat, 9 Aug 2025 00:27:49 -0400 Subject: [PATCH 10/14] Add safety for when new_frames are empty, idle_async_frames are not --- scalene/scalene_profiler.py | 73 +++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/scalene/scalene_profiler.py b/scalene/scalene_profiler.py index 596b605b8..f42972297 100644 --- a/scalene/scalene_profiler.py +++ b/scalene/scalene_profiler.py @@ -1001,48 +1001,49 @@ def process_cpu_sample( normalized_time = total_time / total_frames - # Now attribute execution time. - - main_thread_frame = new_frames[0][0] - average_python_time = python_time / total_frames average_c_time = c_time / total_frames average_cpu_time = (python_time + c_time) / total_frames - if Scalene.__args.stacks: - add_stack( - main_thread_frame, - Scalene.should_trace, - Scalene.__stats.stacks, - average_python_time, - average_c_time, - average_cpu_time, - ) + # Now attribute execution time. # First, handle the main thread. - Scalene.enter_function_meta(main_thread_frame, Scalene.__stats) - fname = Filename(main_thread_frame.f_code.co_filename) - lineno = LineNumber(main_thread_frame.f_lineno) - # print(main_thread_frame) - # print(fname, lineno) - main_tid = cast(int, threading.main_thread().ident) - if not is_thread_sleeping[main_tid]: - Scalene.__stats.cpu_stats.cpu_samples_python[fname][ - lineno - ] += average_python_time - Scalene.__stats.cpu_stats.cpu_samples_c[fname][lineno] += average_c_time - Scalene.__stats.cpu_stats.cpu_samples[fname] += average_cpu_time - Scalene.__stats.cpu_stats.cpu_utilization[fname][lineno].push( - cpu_utilization - ) - Scalene.__stats.cpu_stats.core_utilization[fname][lineno].push( - core_utilization - ) - Scalene.__stats.gpu_stats.gpu_samples[fname][lineno] += ( - gpu_load * elapsed.wallclock - ) - Scalene.__stats.gpu_stats.n_gpu_samples[fname][lineno] += elapsed.wallclock - Scalene.__stats.gpu_stats.gpu_mem_samples[fname][lineno].push(gpu_mem_used) + if (new_frames): + main_thread_frame = new_frames[0][0] + + if Scalene.__args.stacks: + add_stack( + main_thread_frame, + Scalene.should_trace, + Scalene.__stats.stacks, + average_python_time, + average_c_time, + average_cpu_time, + ) + + Scalene.enter_function_meta(main_thread_frame, Scalene.__stats) + fname = Filename(main_thread_frame.f_code.co_filename) + lineno = LineNumber(main_thread_frame.f_lineno) + # print(main_thread_frame) + # print(fname, lineno) + main_tid = cast(int, threading.main_thread().ident) + if not is_thread_sleeping[main_tid]: + Scalene.__stats.cpu_stats.cpu_samples_python[fname][ + lineno + ] += average_python_time + Scalene.__stats.cpu_stats.cpu_samples_c[fname][lineno] += average_c_time + Scalene.__stats.cpu_stats.cpu_samples[fname] += average_cpu_time + Scalene.__stats.cpu_stats.cpu_utilization[fname][lineno].push( + cpu_utilization + ) + Scalene.__stats.cpu_stats.core_utilization[fname][lineno].push( + core_utilization + ) + Scalene.__stats.gpu_stats.gpu_samples[fname][lineno] += ( + gpu_load * elapsed.wallclock + ) + Scalene.__stats.gpu_stats.n_gpu_samples[fname][lineno] += elapsed.wallclock + Scalene.__stats.gpu_stats.gpu_mem_samples[fname][lineno].push(gpu_mem_used) # Now handle the rest of the threads. for frame, tident, orig_frame in new_frames: From d10fed492f1b3305e02aca1f26766a6476633bbe Mon Sep 17 00:00:00 2001 From: bd Date: Sat, 9 Aug 2025 00:53:33 -0400 Subject: [PATCH 11/14] Do not profile frames if they belong to an event loop without a task --- scalene/scalene_profiler.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/scalene/scalene_profiler.py b/scalene/scalene_profiler.py index f42972297..0bbdac44c 100644 --- a/scalene/scalene_profiler.py +++ b/scalene/scalene_profiler.py @@ -1005,11 +1005,20 @@ def process_cpu_sample( average_c_time = c_time / total_frames average_cpu_time = (python_time + c_time) / total_frames + # filter out all frames which are not running a current task. + # this is done after calculating the total frames (threads), because + # an idle thread still takes up CPU time. + active_frames = [ + (frame, tident, orig_frame) + for frame, tident, orig_frame in new_frames + if ScaleneAsyncio.current_task_exists(tident) + ] + # Now attribute execution time. # First, handle the main thread. - if (new_frames): - main_thread_frame = new_frames[0][0] + if (active_frames): + main_thread_frame = active_frames[0][0] if Scalene.__args.stacks: add_stack( @@ -1046,7 +1055,7 @@ def process_cpu_sample( Scalene.__stats.gpu_stats.gpu_mem_samples[fname][lineno].push(gpu_mem_used) # Now handle the rest of the threads. - for frame, tident, orig_frame in new_frames: + for frame, tident, orig_frame in active_frames: if frame == main_thread_frame: continue add_stack( @@ -1120,8 +1129,11 @@ def process_cpu_sample( ) # Clean up all the frames + del active_frames[:] + del active_frames del new_frames[:] del new_frames + del idle_async_frames[:] del idle_async_frames del is_thread_sleeping Scalene.__stats.cpu_stats.total_cpu_samples += total_time From 630a2dc705229c5007f012d147e5211ab200bba3 Mon Sep 17 00:00:00 2001 From: bd Date: Sat, 9 Aug 2025 00:57:05 -0400 Subject: [PATCH 12/14] Fix typing inconsistency on ScaleneAsyncio.loops --- scalene/scalene_asyncio.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index 4e3c6872f..8b2ddb1a5 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -11,7 +11,8 @@ cast, List, Optional, - Tuple + Tuple, + Dict ) @@ -19,7 +20,7 @@ class ScaleneAsyncio: """Provides a set of methods to collect idle task frames.""" should_trace = None - loops: List[Tuple[asyncio.AbstractEventLoop, int]] = [] + loops: Dict[int, asyncio.AbstractEventLoop] = dict() current_task = None @staticmethod @@ -27,10 +28,9 @@ def current_task_exists(tident) -> bool: """Given TIDENT, returns true if a current task exists. Returns true if no event loop is running on TIDENT.""" current = True - for loop, t in ScaleneAsyncio.loops: - if t == tident: - current = asyncio.current_task(loop) - break + loop = ScaleneAsyncio.loops.get(tident, None) + if isinstance(loop, asyncio.AbstractEventLoop): + current = asyncio.current_task(loop) return bool(current) @staticmethod @@ -43,17 +43,16 @@ def compute_suspended_frames_to_record(should_trace) -> List[FrameType]: return ScaleneAsyncio._get_frames_from_loops(ScaleneAsyncio.loops) @staticmethod - def _get_event_loops() -> List[asyncio.AbstractEventLoop]: + def _get_event_loops() -> Dict[int, asyncio.AbstractEventLoop]: """Returns each thread's event loop. If there are none, returns the empty array.""" - loops = [] + loops = dict() for t in threading.enumerate(): frame = sys._current_frames().get(t.ident) if frame: loop = ScaleneAsyncio._walk_back_until_loop(frame) - # duplicates shouldn't be possible, but just in case... - if loop and loop not in loops: - loops.append(loop) + if loop: + loops[cast(int, t.ident)] = loop return loops @staticmethod @@ -82,7 +81,7 @@ def _get_frames_from_loops(loops) -> List[FrameType]: """Given LOOPS, returns a flat list of frames corresponding to idle tasks.""" return [ - frame for loop in loops + frame for loop in loops.values() for frame in ScaleneAsyncio._get_idle_task_frames(loop) ] From 0a6e83682c57bf783c10c011cbc4dfcbd2728da2 Mon Sep 17 00:00:00 2001 From: bd Date: Mon, 11 Aug 2025 17:06:12 -0400 Subject: [PATCH 13/14] New metric to output percentages: total samples, not total time --- scalene/scalene_profiler.py | 45 ++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/scalene/scalene_profiler.py b/scalene/scalene_profiler.py index 0bbdac44c..483b5e97a 100644 --- a/scalene/scalene_profiler.py +++ b/scalene/scalene_profiler.py @@ -981,7 +981,7 @@ def process_cpu_sample( gpu_load = 0.0 assert gpu_load >= 0.0 and gpu_load <= 1.0 gpu_time = gpu_load * elapsed.wallclock - Scalene.__stats.gpu_stats.total_gpu_samples += gpu_time + Scalene.__stats.gpu_stats.total_gpu_samples += gpu_time # TODO asyncio-extension: should this be moved? python_time = Scalene.__args.cpu_sampling_rate c_time = elapsed.virtual - python_time c_time = max(c_time, 0) @@ -990,35 +990,35 @@ def process_cpu_sample( # First, find out how many frames are not sleeping. We need # to know this number so we can parcel out time appropriately - # (equally to each running thread). - total_frames = sum( + total_threads = sum( not is_thread_sleeping[tident] for frame, tident, orig_frame in new_frames ) - if total_frames == 0: - total_frames = 1 + if total_threads == 0: + total_threads = 1 - normalized_time = total_time / total_frames + # the time each thread spends, which always adds up to TOTAL_TIME. + normalized_time = total_time / total_threads + async_time = total_time * len(idle_async_frames) - average_python_time = python_time / total_frames - average_c_time = c_time / total_frames - average_cpu_time = (python_time + c_time) / total_frames + average_python_time = python_time / total_threads + average_c_time = c_time / total_threads + average_cpu_time = (python_time + c_time) / total_threads # filter out all frames which are not running a current task. # this is done after calculating the total frames (threads), because # an idle thread still takes up CPU time. - active_frames = [ + new_frames = [ (frame, tident, orig_frame) for frame, tident, orig_frame in new_frames if ScaleneAsyncio.current_task_exists(tident) ] # Now attribute execution time. - # First, handle the main thread. - if (active_frames): - main_thread_frame = active_frames[0][0] + if (new_frames): + main_thread_frame = new_frames[0][0] if Scalene.__args.stacks: add_stack( @@ -1055,7 +1055,7 @@ def process_cpu_sample( Scalene.__stats.gpu_stats.gpu_mem_samples[fname][lineno].push(gpu_mem_used) # Now handle the rest of the threads. - for frame, tident, orig_frame in active_frames: + for frame, tident, orig_frame in new_frames: if frame == main_thread_frame: continue add_stack( @@ -1114,29 +1114,22 @@ def process_cpu_sample( fname = Filename(frame.f_code.co_filename) lineno = LineNumber(frame.f_lineno) Scalene.enter_function_meta(frame, Scalene.__stats) - # TODO don't do this - # asynchronous frames are always counted as native time. + + # asynchronous frames are always counted as system time. # additionally, even if the associated thread is sleeping, # idle tasks still... idle. - Scalene.__stats.cpu_stats.cpu_samples_c[fname][lineno] += total_time Scalene.__stats.cpu_stats.cpu_samples[fname] += total_time - Scalene.__stats.cpu_stats.cpu_utilization[fname][lineno].push( - cpu_utilization - ) - Scalene.__stats.cpu_stats.core_utilization[fname][lineno].push( - core_utilization - ) + Scalene.__stats.cpu_stats.cpu_utilization[fname][lineno].push(0) + Scalene.__stats.cpu_stats.core_utilization[fname][lineno].push(0) # Clean up all the frames - del active_frames[:] - del active_frames del new_frames[:] del new_frames del idle_async_frames[:] del idle_async_frames del is_thread_sleeping - Scalene.__stats.cpu_stats.total_cpu_samples += total_time + Scalene.__stats.cpu_stats.total_cpu_samples += total_time + async_time # Returns final frame (up to a line in a file we are profiling), the thread identifier, and the original frame. @staticmethod From c49a69160170f4ae5b5530c69b259cb0b59df3e1 Mon Sep 17 00:00:00 2001 From: bd Date: Mon, 11 Aug 2025 17:10:36 -0400 Subject: [PATCH 14/14] Readd samples belonging to the event loop. --- scalene/scalene_asyncio.py | 10 ---------- scalene/scalene_profiler.py | 9 --------- 2 files changed, 19 deletions(-) diff --git a/scalene/scalene_asyncio.py b/scalene/scalene_asyncio.py index 8b2ddb1a5..9820b723a 100644 --- a/scalene/scalene_asyncio.py +++ b/scalene/scalene_asyncio.py @@ -23,16 +23,6 @@ class ScaleneAsyncio: loops: Dict[int, asyncio.AbstractEventLoop] = dict() current_task = None - @staticmethod - def current_task_exists(tident) -> bool: - """Given TIDENT, returns true if a current task exists. Returns - true if no event loop is running on TIDENT.""" - current = True - loop = ScaleneAsyncio.loops.get(tident, None) - if isinstance(loop, asyncio.AbstractEventLoop): - current = asyncio.current_task(loop) - return bool(current) - @staticmethod def compute_suspended_frames_to_record(should_trace) -> List[FrameType]: """Collect all frames which belong to suspended tasks.""" diff --git a/scalene/scalene_profiler.py b/scalene/scalene_profiler.py index 483b5e97a..68b46055d 100644 --- a/scalene/scalene_profiler.py +++ b/scalene/scalene_profiler.py @@ -1006,15 +1006,6 @@ def process_cpu_sample( average_c_time = c_time / total_threads average_cpu_time = (python_time + c_time) / total_threads - # filter out all frames which are not running a current task. - # this is done after calculating the total frames (threads), because - # an idle thread still takes up CPU time. - new_frames = [ - (frame, tident, orig_frame) - for frame, tident, orig_frame in new_frames - if ScaleneAsyncio.current_task_exists(tident) - ] - # Now attribute execution time. # First, handle the main thread. if (new_frames):