diff --git a/parallelbar/parallelbar.py b/parallelbar/parallelbar.py index bc9729e..47b9bac 100644 --- a/parallelbar/parallelbar.py +++ b/parallelbar/parallelbar.py @@ -11,6 +11,7 @@ from tqdm.auto import tqdm from tools import get_len +from tools import _wrapped_func class ProgressBar(tqdm): @@ -118,7 +119,7 @@ def _do_parallel(func, pool_type, tasks, n_cpu, chunk_size, core_progress, thread = Thread(target=_process_status, args=(bar_size, bar_step, disable, parent)) thread.start() target = partial(_process, func, child) - bar_parameters = dict(total=bar_size, disable=disable, position=1, desc='ERROR', colour='red') + bar_parameters = dict(total=len_tasks, disable=disable, position=1, desc='ERROR', colour='red') error_bar = {} result = list() if pool_type == 'map': @@ -167,18 +168,27 @@ def progress_map(func, tasks, n_cpu=None, chunk_size=None, core_progress=False, def progress_imap(func, tasks, n_cpu=None, chunk_size=1, core_progress=False, context=None, total=None, - bar_step=1, disable=False): + bar_step=1, disable=False, process_timeout=None): + if process_timeout and chunk_size != 1: + raise ValueError('the process_timeout can only be used if chunk_size=1') if isinstance(tasks, abc.Iterator) and not total: raise ValueError('If the tasks are an iterator, the total parameter must be specified') + if process_timeout: + func = partial(_wrapped_func, func, process_timeout) result = _do_parallel(func, 'imap', tasks, n_cpu, chunk_size, core_progress, context, total, bar_step, disable, None) return result def progress_imapu(func, tasks, n_cpu=None, chunk_size=1, core_progress=False, context=None, total=None, - bar_step=1, disable=False): + bar_step=1, disable=False, process_timeout=None): + if process_timeout and chunk_size != 1: + raise ValueError('the process_timeout can only be used if chunk_size=1') if isinstance(tasks, abc.Iterator) and not total: raise ValueError('If the tasks are an iterator, the total parameter must be specified') + if process_timeout: + func = partial(_wrapped_func, func, process_timeout) result = _do_parallel(func, 'imap_unordered', tasks, n_cpu, chunk_size, core_progress, context, total, bar_step, disable, None) return result + diff --git a/parallelbar/tools.py b/parallelbar/tools.py index 0e39961..4845a27 100644 --- a/parallelbar/tools.py +++ b/parallelbar/tools.py @@ -1,4 +1,6 @@ from math import sin, cos, radians +import threading +import _thread as thread def func_args_unpack(func, args): @@ -50,3 +52,32 @@ def get_packs_count(array, pack_size): if extra: total += 1 return total + + +def stop_function(): + thread.interrupt_main() + + +def stopit_after_timeout(s, raise_exception=True): + def actual_decorator(func): + def wrapper(*args, **kwargs): + timer = threading.Timer(s, stop_function) + timer.start() + try: + result = func(*args, **kwargs) + except KeyboardInterrupt: + msg = f'function {func.__name__} took longer than {s} s.' + if raise_exception: + raise TimeoutError(msg) + result = msg + finally: + timer.cancel() + return result + + return wrapper + + return actual_decorator + + +def _wrapped_func(func, s, *args, **kwargs): + return stopit_after_timeout(s)(func)(*args, **kwargs) diff --git a/setup.py b/setup.py index a4cd0b7..4b031f6 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='parallelbar', - version='0.2.9', + version='0.2.10', packages=find_packages(), author='Dubovik Pavel', author_email='geometryk@gmail.com',