diff --git a/HISTORY.rst b/HISTORY.rst index db9e3dd..8955884 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,7 +6,7 @@ TaskGraph Release History Unreleased Changes ------------------ -* Updating primary repo url to Github. +* Updating primary repository URL to GitHub. * Adding support for Python 3.8. * Removing the ``EncapsulatedOp`` abstract class. In practice the development loop that encouraged the use of ``EncapsulatedOp`` is flawed and can lead to @@ -16,6 +16,26 @@ Unreleased Changes * Refactor to support separate TaskGraph objects that use the same database. * Removed the ``n_retries`` parameter from ``add_task``. Users are recommended to handle retries within functions themselves. +* Added a ``hash_target_files`` flag to ``add_task`` that, when set to False, + causes TaskGraph to only note the existence of target files after execution + or when evaluating whether the Task was precalculated. This is useful for + operations that initialize a file that later runs of the program then + modify, such as a new database or a downloaded file. +* Fixed an issue in the execution monitor thread that could delay shutdown of + a TaskGraph object by up to one monitor reporting interval. +* Added a ``.get()`` function to ``Task`` objects that returns the result of + the respective ``func`` call. This value is cached in the TaskGraph database + and hence can be used to avoid repeated execution. Note the addition of this + function changes the behavior of calling ``add_task`` with no target path + list. In previous versions the Task would execute once per TaskGraph + instance; now successive ``Task`` objects with the same execution signature + will use cached results. +* To support ``.get()``, a ``transient_run`` parameter was added to + ``add_task`` that causes TaskGraph to avoid recording a completed ``Task``, + even if the execution hash is identical to a previously completed run whose + target artifacts still exist.
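A minimal sketch of how the new options described above might be used together (illustrative only; the ``initialize_database`` and ``count_tables`` helpers, the ``example.db`` file, and the ``taskgraph_cache`` directory are hypothetical and not part of TaskGraph)::

    import sqlite3

    import taskgraph


    def initialize_database(db_path):
        """Create a sqlite database with one bookkeeping table."""
        with sqlite3.connect(db_path) as conn:
            conn.execute("CREATE TABLE IF NOT EXISTS log (message TEXT)")


    def count_tables(db_path):
        """Return the number of tables in the sqlite database at `db_path`."""
        with sqlite3.connect(db_path) as conn:
            return conn.execute(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table'"
                ).fetchone()[0]


    graph = taskgraph.TaskGraph('taskgraph_cache', -1)

    # hash_target_files=False: later runs only check that example.db exists,
    # so modifying the database afterwards does not trigger re-execution.
    init_task = graph.add_task(
        func=initialize_database,
        args=('example.db',),
        target_path_list=['example.db'],
        hash_target_files=False,
        task_name='initialize database')

    # transient_run=True: the completed call is not recorded in the TaskGraph
    # database, so a future TaskGraph instance will execute it again; a
    # duplicate submission to this same instance is still not re-run.
    count_task = graph.add_task(
        func=count_tables,
        args=('example.db',),
        dependent_task_list=[init_task],
        transient_run=True,
        task_name='count tables')

    graph.close()
    graph.join()

    # Task.get() blocks until the task completes and returns func's return
    # value; for non-transient tasks the value is cached in the database.
    n_tables = count_task.get()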
0.8.5 (2019-09-11) ------------------ diff --git a/taskgraph/Task.py b/taskgraph/Task.py index 47d6926..3d4d54b 100644 --- a/taskgraph/Task.py +++ b/taskgraph/Task.py @@ -69,7 +69,7 @@ class NoDaemonContext(type(multiprocessing.get_context('spawn'))): class NonDaemonicPool(multiprocessing.pool.Pool): """NonDaemonic Process Pool.""" - # Inovking super to set the context of Pool class explicitly + # Invoking super to set the context of Pool class explicitly def __init__(self, *args, **kwargs): kwargs['context'] = NoDaemonContext() super(NonDaemonicPool, self).__init__(*args, **kwargs) @@ -179,10 +179,6 @@ def __init__( # and are waiting for a worker self._task_waiting_count = 0 - # Synchronization objects: - # this lock is used to synchronize the following objects - self._taskgraph_lock = threading.RLock() - # this might hold the threads to execute tasks if n_workers >= 0 self._task_executor_thread_list = [] @@ -213,6 +209,7 @@ def __init__( CREATE TABLE IF NOT EXISTS taskgraph_data ( task_reexecution_hash TEXT NOT NULL, target_path_stats BLOB NOT NULL, + result BLOB NOT NULL, PRIMARY KEY (task_reexecution_hash) ); CREATE UNIQUE INDEX IF NOT EXISTS task_reexecution_hash_index @@ -231,13 +228,15 @@ def __init__( # start concurrent reporting of taskgraph if reporting interval is set self._reporting_interval = reporting_interval if reporting_interval is not None: - self._monitor_thread = threading.Thread( + self._execution_monitor_wait_event = threading.Event() + self._execution_monitor_thread = threading.Thread( target=self._execution_monitor, + args=(self._execution_monitor_wait_event,), name='_execution_monitor') # make it a daemon so we don't have to figure out how to # close it when execution complete - self._monitor_thread.daemon = True - self._monitor_thread.start() + self._execution_monitor_thread.daemon = True + self._execution_monitor_thread.start() # launch executor threads for thread_id in range(max(1, n_workers)): @@ -279,6 +278,7 @@ def __del__(self): try: # it's possible the global state is not well defined, so just in # case we'll wrap it all up in a try/except + self._terminated = True if self._n_workers > 0: LOGGER.debug("shutting down workers") self._worker_pool.terminate() @@ -308,7 +308,8 @@ def __del__(self): self._executor_ready_event.set() for executor_thread in self._task_executor_thread_list: try: - timedout = not executor_thread.join(_MAX_TIMEOUT) + executor_thread.join(_MAX_TIMEOUT) + timedout = executor_thread.is_alive() if timedout: LOGGER.debug( 'task executor thread timed out %s', @@ -318,11 +319,15 @@ def __del__(self): "Exception when joining %s", executor_thread) if self._reporting_interval is not None: LOGGER.debug("joining _monitor_thread.") - timedout = not self._monitor_thread.join(_MAX_TIMEOUT) + if self._logging_queue: + self._logging_queue.put(None) + self._execution_monitor_wait_event.set() + self._execution_monitor_thread.join(_MAX_TIMEOUT) + timedout = self._execution_monitor_thread.is_alive() if timedout: LOGGER.debug( '_monitor_thread %s timed out', - self._monitor_thread) + self._execution_monitor_thread) for task in self._task_hash_map.values(): # this is a shortcut to get the tasks to mark as joined task.task_done_executing_event.set() @@ -337,7 +342,7 @@ def __del__(self): break LOGGER.debug('taskgraph terminated') except Exception: - LOGGER.exception('exception occured during __del__') + LOGGER.exception('exception occurred during __del__') def _task_executor(self): """Worker that executes Tasks that have satisfied dependencies.""" @@ -421,8 
+426,10 @@ def _task_executor(self): def add_task( self, func=None, args=None, kwargs=None, task_name=None, target_path_list=None, ignore_path_list=None, - dependent_task_list=None, ignore_directories=True, priority=0, - hash_algorithm='sizetimestamp', copy_duplicate_artifact=False): + hash_target_files=True, dependent_task_list=None, + ignore_directories=True, priority=0, + hash_algorithm='sizetimestamp', copy_duplicate_artifact=False, + transient_run=False): """Add a task to the task graph. Parameters: @@ -446,6 +453,11 @@ def add_task( ignore_path_list (list): list of file paths that could be in args/kwargs that should be ignored when considering timestamp hashes. + hash_target_files (bool): If True, the hash value of the target + files will be recorded to determine if a future run of this + function is precalculated. If False, this function only notes + the existence of the target files before determining if + a function call is precalculated. dependent_task_list (list): list of `Task`s that this task must `join` before executing. ignore_directories (boolean): if the existence/timestamp of any @@ -468,12 +480,18 @@ def add_task( `hash_algorithm` is 'sizetimestamp' the task will require the same base path files to determine equality. If it is a `hashlib` algorithm only file contents will be considered. - copy_duplicate_artifact (bool): if true and the Tasks' + copy_duplicate_artifact (bool): if True and the Tasks' argument signature matches a previous Tasks without direct comparison of the target path files in the arguments other than their positions in the target path list, the target artifacts from a previously successful Task execution will be copied to the new one. + transient_run (bool): if True a call with an identical execution + hash will be reexecuted on a subsequent instantiation of a + future TaskGraph object. If a duplicate task is submitted + to the same object it will not be re-run in any scenario. + Otherwise if False, subsequent tasks with an identical + execution hash will be skipped. Returns: Task which was just added to the graph or an existing Task that @@ -489,126 +507,125 @@ def add_task( reached a terminate state. """ - with self._taskgraph_lock: - try: - if self._terminated: - raise RuntimeError( - "add_task when Taskgraph is terminated.") - if self._closed: - raise ValueError( - "The task graph is closed and cannot accept more " - "tasks.") - self._added_task_count += 1 - if args is None: - args = [] - if kwargs is None: - kwargs = {} - if task_name is None: - task_name = 'UNNAMED TASK' - if dependent_task_list is None: - dependent_task_list = [] - if target_path_list is None: - target_path_list = [] - if ignore_path_list is None: - ignore_path_list = [] - if func is None: - def func(): return None - - # this is a pretty common error to accidentally not pass a - # Task to the dependent task list. 
- if any(not isinstance(task, Task) - for task in dependent_task_list): - raise ValueError( - "Objects passed to dependent task list that are not " - "tasks: %s", dependent_task_list) - - task_name = '%s (%d)' % (task_name, len(self._task_hash_map)) - new_task = Task( - task_name, func, args, kwargs, target_path_list, - ignore_path_list, ignore_directories, - self._worker_pool, self._taskgraph_cache_dir_path, - priority, hash_algorithm, copy_duplicate_artifact, - self._taskgraph_started_event, - self._task_database_path) - - self._task_name_map[new_task.task_name] = new_task - # it may be this task was already created in an earlier call, - # use that object in its place - if new_task in self._task_hash_map: - duplicate_task = self._task_hash_map[new_task] - new_task_target_set = set(new_task._target_path_list) - duplicate_task_target_set = set( - duplicate_task._target_path_list) - if new_task_target_set == duplicate_task_target_set: - LOGGER.warning( - "A duplicate task was submitted: %s original: %s", - new_task, self._task_hash_map[new_task]) - return duplicate_task - disjoint_target_set = ( - new_task_target_set.symmetric_difference( - duplicate_task_target_set)) - if len(disjoint_target_set) == ( - len(new_task_target_set) + - len(duplicate_task_target_set)): - if duplicate_task not in dependent_task_list: - LOGGER.info( - "A task was created that had an identical " - "args signature sans target paths, but a " - "different target_path_list of the same " - "length. To avoid recomputation, dynamically " - "adding previous Task (%s) as a dependent " - "task to this one (%s).", - duplicate_task.task_name, task_name) - dependent_task_list = ( - dependent_task_list + [duplicate_task]) - else: - raise RuntimeError( - "A task was created that has the same arguments " - "as another task, but only partially different " - "expected target paths. This runs the risk of " - "unpredictably overwriting output so treating as " - "a runtime error: submitted task: %s, existing " - "task: %s" % (new_task, duplicate_task)) - self._task_hash_map[new_task] = new_task - if self._n_workers < 0: - # call directly if single threaded - new_task._call() + try: + if self._terminated: + raise RuntimeError( + "add_task when Taskgraph is terminated.") + if self._closed: + raise ValueError( + "The task graph is closed and cannot accept more " + "tasks.") + self._added_task_count += 1 + if args is None: + args = [] + if kwargs is None: + kwargs = {} + if task_name is None: + task_name = 'UNNAMED TASK' + if dependent_task_list is None: + dependent_task_list = [] + if target_path_list is None: + target_path_list = [] + if ignore_path_list is None: + ignore_path_list = [] + if func is None: + def func(): return None + + # this is a pretty common error to accidentally not pass a + # Task to the dependent task list. 
+ if any(not isinstance(task, Task) + for task in dependent_task_list): + raise ValueError( + "Objects passed to dependent task list that are not " + "tasks: %s", dependent_task_list) + + task_name = '%s (%d)' % (task_name, len(self._task_hash_map)) + new_task = Task( + task_name, func, args, kwargs, target_path_list, + ignore_path_list, hash_target_files, ignore_directories, + transient_run, self._worker_pool, + self._taskgraph_cache_dir_path, priority, hash_algorithm, + copy_duplicate_artifact, self._taskgraph_started_event, + self._task_database_path) + + self._task_name_map[new_task.task_name] = new_task + # it may be this task was already created in an earlier call, + # use that object in its place + if new_task in self._task_hash_map: + duplicate_task = self._task_hash_map[new_task] + new_task_target_set = set(new_task._target_path_list) + duplicate_task_target_set = set( + duplicate_task._target_path_list) + if new_task_target_set == duplicate_task_target_set: + LOGGER.warning( + "A duplicate task was submitted: %s original: %s", + new_task, self._task_hash_map[new_task]) + return duplicate_task + disjoint_target_set = ( + new_task_target_set.symmetric_difference( + duplicate_task_target_set)) + if len(disjoint_target_set) == ( + len(new_task_target_set) + + len(duplicate_task_target_set)): + if duplicate_task not in dependent_task_list: + LOGGER.info( + "A task was created that had an identical " + "args signature sans target paths, but a " + "different target_path_list of the same " + "length. To avoid recomputation, dynamically " + "adding previous Task (%s) as a dependent " + "task to this one (%s).", + duplicate_task.task_name, task_name) + dependent_task_list = ( + dependent_task_list + [duplicate_task]) else: - # determine if task is ready or is dependent on other - # tasks + raise RuntimeError( + "A task was created that has the same arguments " + "as another task, but only partially different " + "expected target paths. This runs the risk of " + "unpredictably overwriting output so treating as " + "a runtime error: submitted task: %s, existing " + "task: %s" % (new_task, duplicate_task)) + self._task_hash_map[new_task] = new_task + if self._n_workers < 0: + # call directly if single threaded + new_task._call() + else: + # determine if task is ready or is dependent on other + # tasks + LOGGER.debug( + "multithreaded: %s sending to new task queue.", + task_name) + outstanding_dep_task_name_list = [ + dep_task.task_name for dep_task in dependent_task_list + if dep_task.task_name + not in self._completed_task_names] + if not outstanding_dep_task_name_list: LOGGER.debug( - "multithreaded: %s sending to new task queue.", - task_name) - outstanding_dep_task_name_list = [ - dep_task.task_name for dep_task in dependent_task_list - if dep_task.task_name - not in self._completed_task_names] - if not outstanding_dep_task_name_list: - LOGGER.debug( - "sending task %s right away", new_task.task_name) - self._task_ready_priority_queue.put(new_task) - self._task_waiting_count += 1 - self._executor_ready_event.set() - else: - # there are unresolved tasks that the waiting - # process scheduler has not been notified of. - # Record dependencies. 
- for dep_task_name in outstanding_dep_task_name_list: - # record tasks that are dependent on dep_task_name - self._task_dependent_map[dep_task_name].add( - new_task.task_name) - # record tasks that new_task depends on - self._dependent_task_map[new_task.task_name].add( - dep_task_name) - return new_task - - except Exception: - # something went wrong, shut down the taskgraph - LOGGER.exception( - "Something went wrong when adding task %s, " - "terminating taskgraph.", task_name) - self._terminate() - raise + "sending task %s right away", new_task.task_name) + self._task_ready_priority_queue.put(new_task) + self._task_waiting_count += 1 + self._executor_ready_event.set() + else: + # there are unresolved tasks that the waiting + # process scheduler has not been notified of. + # Record dependencies. + for dep_task_name in outstanding_dep_task_name_list: + # record tasks that are dependent on dep_task_name + self._task_dependent_map[dep_task_name].add( + new_task.task_name) + # record tasks that new_task depends on + self._dependent_task_map[new_task.task_name].add( + dep_task_name) + return new_task + + except Exception: + # something went wrong, shut down the taskgraph + LOGGER.exception( + "Something went wrong when adding task %s, " + "terminating taskgraph.", task_name) + self._terminate() + raise def _handle_logs_from_processes(self, queue_): LOGGER.debug('Starting logging worker') @@ -620,8 +637,18 @@ def _handle_logs_from_processes(self, queue_): logger.handle(record) LOGGER.debug('_handle_logs_from_processes shutting down') - def _execution_monitor(self): - """Log state of taskgraph every `self._reporting_interval` seconds.""" + def _execution_monitor(self, monitor_wait_event): + """Log state of taskgraph every `self._reporting_interval` seconds. + + Parameters: + monitor_wait_event (threading.Event): used to sleep the monitor + for `self._reporting_interval` seconds, or to wake up to + terminate for shutdown. + + Returns: + None. + + """ start_time = time.time() while True: if self._terminated: @@ -649,8 +676,8 @@ def _execution_monitor(self): 'closed' if self._closed else 'open', active_task_message) - time.sleep( - self._reporting_interval - ( + monitor_wait_event.wait( + timeout=self._reporting_interval - ( (time.time() - start_time)) % self._reporting_interval) LOGGER.debug("_execution monitor shutting down") @@ -696,7 +723,9 @@ def join(self, timeout=None): self._logging_monitor_thread.join(timeout) if self._reporting_interval is not None: LOGGER.debug("joining _monitor_thread.") - self._monitor_thread.join(timeout) + # wake up the execution monitor + self._execution_monitor_wait_event.set() + self._execution_monitor_thread.join(timeout) for executor_task in self._task_executor_thread_list: executor_task.join(timeout) LOGGER.debug('taskgraph terminated') @@ -745,8 +774,8 @@ class Task(object): def __init__( self, task_name, func, args, kwargs, target_path_list, - ignore_path_list, ignore_directories, - worker_pool, cache_dir, priority, hash_algorithm, + ignore_path_list, hash_target_files, ignore_directories, + transient_run, worker_pool, cache_dir, priority, hash_algorithm, copy_duplicate_artifact, taskgraph_started_event, task_database_path): """Make a Task. @@ -764,9 +793,20 @@ def __init__( ignore_path_list (list): list of file paths that could be in args/kwargs that should be ignored when considering timestamp hashes. + hash_target_files (bool): If True, the hash value of the target + files will be recorded to determine if a future run of this + function is precalculated. 
If False, this function only notes + the existence of the target files before determining if + a function call is precalculated. ignore_directories (bool): if the existence/timestamp of any directories discovered in args or kwargs is used as part of the work token hash. + transient_run (bool): if True a call with an identical execution + hash will be reexecuted on a subsequent instantiation of a + future TaskGraph object. If a duplicate task is submitted + to the same object it will not be re-run in any scenario. + Otherwise if False, subsequent tasks with an identical + execution hash will be skipped. worker_pool (multiprocessing.Pool): if not None, is a multiprocessing pool that can be used for `_call` execution. cache_dir (string): path to a directory to both write and expect @@ -791,13 +831,15 @@ def __init__( taskgraph_started_event (Event): can be used to start the main TaskGraph if it has not yet started in case a Task is joined. task_database_path (str): path to an SQLITE database that has - table named "taskgraph_data" with the two fields: + table named "taskgraph_data" with the three fields: task_hash TEXT NOT NULL, target_path_stats BLOB NOT NULL + result BLOB NOT NULL If a call is successful its hash is inserted/updated in the - table and the target_path_stats stores the base/target stats + table, the target_path_stats stores the base/target stats for the target files created by the call and listed in - `target_path_list`. + `target_path_list`, and the result of `func` is stored in + "result". """ # it is a common error to accidentally pass a non string as to the @@ -820,7 +862,9 @@ def __init__( self._cache_dir = cache_dir self._ignore_path_list = [ _normalize_path(path) for path in ignore_path_list] + self._hash_target_files = hash_target_files self._ignore_directories = ignore_directories + self._transient_run = transient_run self._worker_pool = worker_pool self._taskgraph_started_event = taskgraph_started_event self._task_database_path = task_database_path @@ -837,6 +881,9 @@ def __init__( # a _call and there are no more attempts at reexecution. self.task_done_executing_event = threading.Event() + # These are used to store and later access the result of the call. + self._result = None + # Calculate a hash based only on argument inputs. 
try: if not hasattr(Task, 'func_source_map'): @@ -935,7 +982,8 @@ def __repr__(self): "task_id_hash": self._task_id_hash, "task_reexecution_hash": self._task_reexecution_hash, "exception_object": self.exception_object, - "self._reexecution_info": self._reexecution_info + "self._reexecution_info": self._reexecution_info, + "self._result": self._result, }) def _call(self): @@ -956,7 +1004,7 @@ def _call(self): self.task_done_executing_event.set() return LOGGER.debug("not precalculated %s", self.task_name) - result_calculated = False + artifact_copied = False if self._copy_duplicate_artifact: # try to see if we can copy old files database_result = _execute_sqlite( @@ -969,9 +1017,10 @@ def _call(self): execute='execute', fetch='one') try: if database_result: - result_target_path_stats = pickle.loads( - database_result[0]) - LOGGER.debug('duplicate artifact db results: %s', result_target_path_stats) + result_target_path_stats = pickle.loads(database_result[0]) + LOGGER.debug( + 'duplicate artifact db results: %s', + result_target_path_stats) if (len(result_target_path_stats) == len(self._target_path_list)): if all([ @@ -1002,27 +1051,31 @@ def _call(self): artifact_target, result_target_path_stats, self._target_path_list)) - result_calculated = True + artifact_copied = True except IOError as e: LOGGER.warning( "IOError encountered when hashing original source " "files.\n%s" % e) - if not result_calculated: + if not artifact_copied: if self._worker_pool is not None: result = self._worker_pool.apply_async( func=self._func, args=self._args, kwds=self._kwargs) # the following blocks and raises an exception if result # raised an exception LOGGER.debug("apply_async for task %s", self.task_name) - result.get() + self._result = result.get() else: LOGGER.debug("direct _func for task %s", self.task_name) - self._func(*self._args, **self._kwargs) + self._result = self._func(*self._args, **self._kwargs) # check that the target paths exist and record stats for later + if not self._hash_target_files: + target_hash_algorithm = 'exists' + else: + target_hash_algorithm = self._hash_algorithm result_target_path_stats = list( _get_file_stats( - self._target_path_list, self._hash_algorithm, [], False)) + self._target_path_list, target_hash_algorithm, [], False)) result_target_path_set = set( [x[0] for x in result_target_path_stats]) target_path_set = set(self._target_path_list) @@ -1037,19 +1090,23 @@ def _call(self): # target file. Otherwise we infer the result of this call is # transient between taskgraph executions and we should expect to # run it again. - if self._target_path_list: + if not self._transient_run: _execute_sqlite( - "INSERT OR REPLACE INTO taskgraph_data VALUES (?, ?)", + "INSERT OR REPLACE INTO taskgraph_data VALUES (?, ?, ?)", self._task_database_path, mode='modify', argument_list=( self._task_reexecution_hash, - pickle.dumps(result_target_path_stats))) + pickle.dumps(result_target_path_stats), + pickle.dumps(self._result))) self.task_done_executing_event.set() LOGGER.debug("successful run on task %s", self.task_name) def is_precalculated(self): """Return true if _call need not be invoked. + If the task has been precalculated it will fetch the return result from + the previous run. + Returns: True if the Task's target paths exist in the same state as the last recorded run at the time this function is called. It is @@ -1062,9 +1119,13 @@ def is_precalculated(self): # an expected result. This will allow a task to change its hash in # case a different version of a file was passed in. 
# these are the stats of the files that exist that aren't ignored + if not self._hash_target_files: + target_hash_algorithm = 'exists' + else: + target_hash_algorithm = self._hash_algorithm file_stat_list = list(_get_file_stats( [self._args, self._kwargs], - self._hash_algorithm, + target_hash_algorithm, self._target_path_list+self._ignore_path_list, self._ignore_directories)) @@ -1095,7 +1156,7 @@ def is_precalculated(self): reexecution_string.encode('utf-8')).hexdigest() try: database_result = _execute_sqlite( - """SELECT target_path_stats from taskgraph_data + """SELECT target_path_stats, result from taskgraph_data WHERE (task_reexecution_hash == ?)""", self._task_database_path, mode='read_only', argument_list=(self._task_reexecution_hash,), fetch='one') @@ -1115,6 +1176,10 @@ def is_precalculated(self): mismatched_target_file_list.append( 'Path not found: %s' % path) continue + elif hash_algorithm == 'exists': + # this is the case where hash_algorithm == 'exists' but + # we already know the file exists so we do nothing + continue if hash_algorithm == 'sizetimestamp': size, modified_time, actual_path = [ x for x in hash_string.split('::')] @@ -1148,6 +1213,7 @@ def is_precalculated(self): "but there are these mismatches: %s", self.task_name, '\n'.join(mismatched_target_file_list)) return False + self._result = pickle.loads(database_result[1]) LOGGER.debug("precalculated (%s)" % self) return True except EOFError: @@ -1162,20 +1228,55 @@ def join(self, timeout=None): LOGGER.debug( "joining %s done executing: %s", self.task_name, self.task_done_executing_event) - timed_out = self.task_done_executing_event.wait(timeout) + successful_wait = self.task_done_executing_event.wait(timeout) if self.exception_object: raise self.exception_object - return timed_out + return successful_wait + + def get(self, timeout=None): + """Return the result of the `func` once it is ready. + + If `timeout` is None, this call blocks until the task is complete + determined by a call to `.join()`. Otherwise will wait up to `timeout` + seconds before raising a `RuntimeError` if exceeded. + + Parameters: + timeout (float): if not None this parameter is a floating point + number specifying a timeout for the operation in seconds. + + Returns: + value of the result + Raises: + RuntimeError when `timeout` exceeded. + + """ + timeout = not self.join(timeout) + if timeout: + raise RuntimeError('call to get timed out') + return self._result def _get_file_stats( - base_value, hash_algorithm, ignore_list, ignore_directories): + base_value, hash_algorithm, ignore_list, + ignore_directories): """Return fingerprints of any filepaths in `base_value`. Parameters: base_value: any python value. Any file paths in `base_value` should be "os.path.norm"ed before this function is called. contains filepaths in any nested structure. + hash_algorithm (string): either a hash function id that + exists in hashlib.algorithms_available, 'exists', or + 'sizetimestamp'. Any paths to actual files in the arguments will be + digested with this algorithm. If value is 'sizetimestamp' the + digest will only use the normed path, size, and timestamp of any + files found in the arguments. This value is used when + determining whether a task is precalculated or its target + files can be copied to an equivalent task. Note if + `hash_algorithm` is 'sizetimestamp' the task will require the + same base path files to determine equality. If it is a + `hashlib` algorithm only file contents will be considered. 
If this + value is 'exists' the value of the hash will be 'exists'. ignore_list (list): any paths found in this list are not included as part of the file stats. All paths in this list should be "os.path.norm"ed. @@ -1195,9 +1296,12 @@ def _get_file_stats( if norm_path not in ignore_list and ( not os.path.isdir(norm_path) or not ignore_directories) and os.path.exists(norm_path): - yield ( - norm_path, hash_algorithm, - _hash_file(norm_path, hash_algorithm)) + if hash_algorithm == 'exists': + yield (norm_path, 'exists', 'exists') + else: + yield ( + norm_path, hash_algorithm, + _hash_file(norm_path, hash_algorithm)) except (OSError, ValueError): # I ran across a ValueError when one of the os.path functions # interpreted the value as a path that was too long. @@ -1406,7 +1510,6 @@ def _execute_sqlite( database_path))) connection = sqlite3.connect(ro_uri, uri=True) elif mode == 'modify': - LOGGER.debug('modify') connection = sqlite3.connect(database_path) else: raise ValueError('Unknown mode: %s' % mode) diff --git a/tests/test_task.py b/tests/test_task.py index 221f54f..faeebf9 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -21,6 +21,14 @@ MAX_TRY_WAIT_MS = 500 +def _return_value_once(value): + """Return `value`; raise RuntimeError if called more than once.""" + if hasattr(_return_value_once, 'executed'): + raise RuntimeError("this function was called twice") + _return_value_once.executed = True + return value + + def _noop_function(**kwargs): """Does nothing except allow kwargs to be passed.""" pass @@ -586,8 +594,8 @@ def test_get_file_stats(self): result = list(_get_file_stats(nofile, 'sizetimestamp', [], False)) self.assertEqual(result, []) - def test_repeat_targetless_runs(self): - """TaskGraph: ensure that repeated runs with no targets reexecute.""" + def test_transient_runs(self): + """TaskGraph: ensure that transient tasks reexecute.""" task_graph = taskgraph.TaskGraph(self.workspace_dir, -1) target_path = os.path.join(self.workspace_dir, '1000.dat') value = 5 @@ -595,6 +603,7 @@ def test_repeat_targetless_runs(self): _ = task_graph.add_task( func=_create_list_on_disk, args=(value, list_len), + transient_run=True, kwargs={ 'target_path': target_path, }) @@ -948,6 +957,10 @@ def test_duplicate_call(self): """TaskGraph: test that duplicate calls copy target path.""" task_graph = taskgraph.TaskGraph(self.workspace_dir, 0) target_path = os.path.join(self.workspace_dir, 'testfile.txt') + + if hasattr(_create_file_once, 'executed'): + del _create_file_once.executed + task_graph.add_task( func=_create_file_once, args=(target_path, 'test'), @@ -979,6 +992,45 @@ def test_duplicate_call(self): alt_contents = alt_target_file.read() self.assertEqual(contents, alt_contents) + def test_duplicate_call_changed_target(self): + """TaskGraph: test hash_target_files=False ignores target changes.""" + task_graph = taskgraph.TaskGraph(self.workspace_dir, 0) + target_path = os.path.join(self.workspace_dir, 'testfile.txt') + + if hasattr(_create_file_once, 'executed'): + del _create_file_once.executed + + task_graph.add_task( + func=_create_file_once, + args=(target_path, 'test'), + target_path_list=[target_path], + hash_target_files=False, + task_name='first _create_file_once') + + task_graph.close() + task_graph.join() + del task_graph + + with open(target_path, 'a') as target_file: + target_file.write('updated') + + task_graph = taskgraph.TaskGraph(self.workspace_dir, 0) + task_graph.add_task( + func=_create_file_once, + args=(target_path, 'test'), + target_path_list=[target_path], + hash_target_files=False, +
task_name='first _create_file_once') + + task_graph.close() + task_graph.join() + del task_graph + + with open(target_path, 'r') as result_file: + result_contents = result_file.read() + self.assertEqual('testupdated', result_contents) + + def test_duplicate_call_modify_timestamp(self): """TaskGraph: test that duplicate call modified stamp recompute.""" task_graph = taskgraph.TaskGraph(self.workspace_dir, 0) @@ -1282,6 +1334,49 @@ def test_same_timestamp_and_value(self): task_a._task_id_hash, task_b._task_id_hash, "task ids should be different since the filenames are different") + def test_return_value(self): + """TaskGraph: test that `.get` behavior works as expected.""" + n_iterations = 3 + for iteration_id in range(n_iterations): + transient_run = iteration_id == n_iterations-1 + LOGGER.debug(iteration_id) + task_graph = taskgraph.TaskGraph(self.workspace_dir, 0, 0) + expected_value = 'a good value' + value_task = task_graph.add_task( + func=_return_value_once, + transient_run=transient_run, + args=(expected_value,)) + value = value_task.get() + self.assertEqual(value, expected_value) + task_graph.close() + task_graph.join() + task_graph = None + + # reset run + del _return_value_once.executed + for iteration_id in range(n_iterations): + LOGGER.debug(iteration_id) + task_graph = taskgraph.TaskGraph(self.workspace_dir, 0, 0) + expected_value = 'transient run' + if iteration_id == 0: + value_task = task_graph.add_task( + func=_return_value_once, + transient_run=True, + args=(expected_value,)) + value = value_task.get() + self.assertEqual(value, expected_value) + else: + with self.assertRaises(RuntimeError): + value_task = task_graph.add_task( + func=_return_value_once, + transient_run=True, + args=(expected_value,)) + value = value_task.get() + + task_graph.close() + task_graph.join() + task_graph = None + def Fail(n_tries, result_path): """Create a function that fails after `n_tries`."""