diff --git a/parallelbar/parallelbar.py b/parallelbar/parallelbar.py index d008ef2..d570f0b 100644 --- a/parallelbar/parallelbar.py +++ b/parallelbar/parallelbar.py @@ -72,18 +72,30 @@ def _bar_size(chunk_size, len_tasks, n_cpu): class ProgressStatus: def __init__(self): self.next_update = 1 - self.last_update_t = time.monotonic() + self.last_update_t = time.perf_counter() self.last_update_val = 0 def func_wrapped(cnt, state, func, need_serialize, error_handling, set_error_value, worker_queue, total, task): + if need_serialize: + func = dill.loads(func) try: - if need_serialize: - func = dill.loads(func) result = func(task) + except Exception as e: + if error_handling == 'raise': + worker_queue.put((1, 1)) + worker_queue.put((None, None)) + raise + else: + worker_queue.put((1, 1)) + _ = next(cnt) + if set_error_value is None: + return e + return set_error_value + else: updated = next(cnt) if updated == state.next_update: - time_now = time.monotonic() + time_now = time.perf_counter() delta_t = time_now - state.last_update_t delta_i = updated - state.last_update_val @@ -94,18 +106,8 @@ def func_wrapped(cnt, state, func, need_serialize, error_handling, set_error_val worker_queue.put_nowait((0, delta_i)) elif updated == total: worker_queue.put_nowait((0, updated - state.last_update_val)) - except Exception as e: - if error_handling == 'raise': - worker_queue.put((1, 1)) - worker_queue.put((None, None)) - raise - else: - worker_queue.put((1, 1)) - if set_error_value is None: - return e - return set_error_value - else: - return result + + return result def _do_parallel(func, pool_type, tasks, initializer, initargs, n_cpu, chunk_size, context, total, bar_step, disable, diff --git a/setup.py b/setup.py index ab63ded..0de35cb 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name='parallelbar', - version='1.1.2', + version='1.1.3', packages=find_packages(), author='Dubovik Pavel', author_email='geometryk@gmail.com',