Skip to content

Commit

Permalink
The process_timeout keyword argument has been added to the progress_i…
Browse files Browse the repository at this point in the history
…map and progress_imap methods. The stopit_after_timeout decorator has been added to the tools module
  • Loading branch information
padu committed Jul 22, 2022
1 parent 762b790 commit 28ec17d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
16 changes: 13 additions & 3 deletions parallelbar/parallelbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from tqdm.auto import tqdm
from tools import get_len
from tools import _wrapped_func


class ProgressBar(tqdm):
Expand Down Expand Up @@ -118,7 +119,7 @@ def _do_parallel(func, pool_type, tasks, n_cpu, chunk_size, core_progress,
thread = Thread(target=_process_status, args=(bar_size, bar_step, disable, parent))
thread.start()
target = partial(_process, func, child)
bar_parameters = dict(total=bar_size, disable=disable, position=1, desc='ERROR', colour='red')
bar_parameters = dict(total=len_tasks, disable=disable, position=1, desc='ERROR', colour='red')
error_bar = {}
result = list()
if pool_type == 'map':
Expand Down Expand Up @@ -167,18 +168,27 @@ def progress_map(func, tasks, n_cpu=None, chunk_size=None, core_progress=False,


def progress_imap(func, tasks, n_cpu=None, chunk_size=1, core_progress=False, context=None, total=None,
bar_step=1, disable=False):
bar_step=1, disable=False, process_timeout=None):
if process_timeout and chunk_size != 1:
raise ValueError('the process_timeout can only be used if chunk_size=1')
if isinstance(tasks, abc.Iterator) and not total:
raise ValueError('If the tasks are an iterator, the total parameter must be specified')
if process_timeout:
func = partial(_wrapped_func, func, process_timeout)
result = _do_parallel(func, 'imap', tasks, n_cpu, chunk_size, core_progress, context, total, bar_step, disable,
None)
return result


def progress_imapu(func, tasks, n_cpu=None, chunk_size=1, core_progress=False, context=None, total=None,
bar_step=1, disable=False):
bar_step=1, disable=False, process_timeout=None):
if process_timeout and chunk_size != 1:
raise ValueError('the process_timeout can only be used if chunk_size=1')
if isinstance(tasks, abc.Iterator) and not total:
raise ValueError('If the tasks are an iterator, the total parameter must be specified')
if process_timeout:
func = partial(_wrapped_func, func, process_timeout)
result = _do_parallel(func, 'imap_unordered', tasks, n_cpu, chunk_size, core_progress, context, total, bar_step,
disable, None)
return result

31 changes: 31 additions & 0 deletions parallelbar/tools.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from math import sin, cos, radians
import threading
import _thread as thread


def func_args_unpack(func, args):
Expand Down Expand Up @@ -50,3 +52,32 @@ def get_packs_count(array, pack_size):
if extra:
total += 1
return total


def stop_function():
thread.interrupt_main()


def stopit_after_timeout(s, raise_exception=True):
def actual_decorator(func):
def wrapper(*args, **kwargs):
timer = threading.Timer(s, stop_function)
timer.start()
try:
result = func(*args, **kwargs)
except KeyboardInterrupt:
msg = f'function {func.__name__} took longer than {s} s.'
if raise_exception:
raise TimeoutError(msg)
result = msg
finally:
timer.cancel()
return result

return wrapper

return actual_decorator


def _wrapped_func(func, s, *args, **kwargs):
return stopit_after_timeout(s)(func)(*args, **kwargs)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name='parallelbar',
version='0.2.9',
version='0.2.10',
packages=find_packages(),
author='Dubovik Pavel',
author_email='geometryk@gmail.com',
Expand Down

0 comments on commit 28ec17d

Please sign in to comment.