diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 83eb2716cf..e21d44f260 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -94,6 +94,13 @@ jobs: pip install .[monitoring,visualization] parsl/tests/test-viz.sh + # config_local_test comes after viz so that the large monitoring.db + # created by `make test` is still around + - name: make config_local_test + run: | + source .venv/bin/activate + make config_local_test + - name: make coverage run: | source .venv/bin/activate @@ -109,3 +116,88 @@ jobs: pytest-parsl/ ci_job_info.txt compression-level: 9 + + + gen3: + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ ubuntu-latest ] + py: [ "3.11" ] + CC: [ gcc ] + CXX: [ g++ ] + + defaults: + run: + # cf. https://github.com/conda-incubator/setup-miniconda#important + shell: bash -l {0} + + steps: + - uses: actions/checkout@v2 + with: + repository: LSSTDESC/gen3_workflow + + - uses: actions/checkout@master + with: + path: ./parsl-repo + + - name: Setup conda + uses: conda-incubator/setup-miniconda@v2 + with: + activate-environment: stack + python-version: "3.11" + condarc-file: etc/.condarc + + - name: Install conda deps + run: | + conda info + conda list + conda install -y mamba + mamba install -y --file conda_requirements.txt + conda info + conda list + + + - name: Install workflow packages + run: | + # pip install -U --no-deps 'parsl[monitoring,workqueue] @ git+https://github.com/parsl/parsl@desc' + pip install -U --no-deps './parsl-repo[monitoring,workqueue]' + pip install typeguard tblib paramiko dill pyzmq globus-sdk sqlalchemy_utils + conda install -c conda-forge ndcctools=7.6.1=py311h689c632_0 --no-deps + + - name: Clone gen3_workflow checkout the branch + shell: bash -l {0} + run: | + echo pwd + pwd + + echo ls + ls + + echo Cloning gen3_workflow repo + git clone https://github.com/LSSTDESC/gen3_workflow + + echo declaring gen3_workflow to eups + eups declare gen3_workflow -r ${PWD}/gen3_workflow -t current + + cd gen3_workflow + git fetch origin master:TESTING + git checkout TESTING + + - name: Run the test pipelines + run: | + setup gen3_workflow + (eups list lsst_distrib) 2>&1 | grep -v "Unknown tag" + (eups list gen3_workflow) 2>&1 | grep -v "Unknown tag" + cd tests + pytest test_query_workflow.py test_bps_restart.py + + + - name: Archive test logs + if: ${{ always() }} + uses: actions/upload-artifact@v2 + with: + name: testinfo-${{ matrix.python-version }} + path: gen3_workflow/ + diff --git a/Makefile b/Makefile index ad127f2c23..8ade93bf43 100644 --- a/Makefile +++ b/Makefile @@ -86,10 +86,16 @@ radical_local_test: mkdir -p ~/.radical/pilot/configs && echo '{"localhost": {"virtenv_mode": "local"}}' > ~/.radical/pilot/configs/resource_local.json pytest parsl/tests/ -k "not cleannet and not issue3328 and not executor_supports_std_stream_tuples" --config parsl/tests/configs/local_radical.py --random-order --durations 10 +.PHONY: workqueue_mon_test +workqueue_mon_test: $(WORKQUEUE_INSTALL) ## run all tests with workqueue_ex config + pip3 install ".[monitoring]" + PYTHONPATH=.:/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet and not issue363" --config parsl/tests/configs/workqueue_monitoring_config.py --cov=parsl --cov-append --cov-report= --random-order + + .PHONY: config_local_test config_local_test: $(CCTOOLS_INSTALL) pip3 install ".[monitoring,visualization,proxystore,kubernetes]" - PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet" --config local --random-order 
--durations 10 + PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet and not site" --config local --random-order --durations 10 .PHONY: site_test site_test: @@ -101,7 +107,7 @@ perf_test: parsl-perf --time 5 --config parsl/tests/configs/local_threads.py .PHONY: test ## run all tests with all config types -test: clean_coverage isort lint flake8 mypy local_thread_test htex_local_test htex_local_alternate_test wqex_local_test vineex_local_test radical_local_test config_local_test perf_test ## run all tests +test: clean_coverage isort lint flake8 mypy local_thread_test htex_local_test htex_local_alternate_test wqex_local_test workqueue_mon_test vineex_local_test radical_local_test perf_test ## run all tests .PHONY: tag tag: ## create a tag in git. to run, do a 'make VERSION="version string" tag diff --git a/parsl/app/app.py b/parsl/app/app.py index 8d0d829b33..e9089f0ca7 100644 --- a/parsl/app/app.py +++ b/parsl/app/app.py @@ -8,7 +8,6 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Union import typeguard -from typing_extensions import Literal from parsl.dataflow.dflow import DataFlowKernel from parsl.dataflow.futures import AppFuture @@ -27,7 +26,7 @@ class AppBase(metaclass=ABCMeta): @typeguard.typechecked def __init__(self, func: Callable, data_flow_kernel: Optional[DataFlowKernel] = None, - executors: Union[List[str], Literal['all']] = 'all', + executors: Union[List[str], str] = 'all', cache: bool = False, ignore_for_cache: Optional[Sequence[str]] = None) -> None: """Construct the App object. @@ -80,7 +79,7 @@ def __call__(self, *args: Any, **kwargs: Any) -> AppFuture: def python_app(function: Optional[Callable] = None, data_flow_kernel: Optional[DataFlowKernel] = None, cache: bool = False, - executors: Union[List[str], Literal['all']] = 'all', + executors: Union[List[str], str] = 'all', ignore_for_cache: Optional[Sequence[str]] = None) -> Callable: """Decorator function for making python apps. @@ -146,7 +145,7 @@ def wrapper(f: Callable) -> PythonApp: return PythonApp(f, data_flow_kernel=data_flow_kernel, cache=cache, - executors=["_parsl_internal"], + executors="_parsl_internal", ignore_for_cache=ignore_for_cache, join=True) return wrapper(func) @@ -159,7 +158,7 @@ def wrapper(f: Callable) -> PythonApp: def bash_app(function: Optional[Callable] = None, data_flow_kernel: Optional[DataFlowKernel] = None, cache: bool = False, - executors: Union[List[str], Literal['all']] = 'all', + executors: Union[List[str], str] = 'all', ignore_for_cache: Optional[Sequence[str]] = None) -> Callable: """Decorator function for making bash apps. 
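Reviewer note on the executors-typing change above: the annotation is relaxed from Union[List[str], Literal['all']] to Union[List[str], str], and later in this diff DataFlowKernel.submit gains a branch that treats a bare string as a single executor label while 'all' keeps its old meaning (and parsl/config.py now rejects an executor actually labelled "all"). A standalone sketch of that selection rule, mirroring the branch added to submit; the function and variable names here are illustrative, not the real DFK API:

import random
from typing import Dict, List, Union

def choose_executor(executors: Union[List[str], str],
                    available: Dict[str, object]) -> str:
    """Pick one executor label for a task: 'all' means any non-internal
    executor, a bare string means exactly that label, a list means choose
    among those labels."""
    if isinstance(executors, str) and executors.lower() == 'all':
        choices = [label for label in available if label != '_parsl_internal']
    elif isinstance(executors, str):  # a single label, e.g. '_parsl_internal'
        choices = [executors]
    elif isinstance(executors, list):
        choices = executors
    else:
        raise ValueError(f"invalid type for executors: {type(executors)}")
    return random.choice(choices)

# usage sketch: 'all' never picks the internal executor
registered = {'htex': object(), 'wq': object(), '_parsl_internal': object()}
assert choose_executor('all', registered) in ('htex', 'wq')
assert choose_executor('wq', registered) == 'wq'
assert choose_executor(['htex'], registered) == 'htex'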
diff --git a/parsl/config.py b/parsl/config.py index 1358e99d28..c3679fa2fe 100644 --- a/parsl/config.py +++ b/parsl/config.py @@ -177,6 +177,8 @@ def _validate_executors(self) -> None: if len(duplicates) > 0: raise ConfigurationError('Executors must have unique labels ({})'.format( ', '.join(['label={}'.format(repr(d)) for d in duplicates]))) + if 'all' in labels: + raise ConfigurationError('Executor cannot be labelled "all"') def validate_usage_tracking(self, level: int) -> None: if not USAGE_TRACKING_DISABLED <= level <= USAGE_TRACKING_LEVEL_3: diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 3b3c0ca6e4..ebd0abbe26 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -47,6 +47,7 @@ from parsl.monitoring.message_type import MessageType from parsl.monitoring.remote import monitor_wrapper from parsl.process_loggers import wrap_with_logs +from parsl.trace import Span, event, output_event_stats, span_bind_sub from parsl.usage_tracking.usage import UsageTracker from parsl.utils import Timer, get_all_checkpoints, get_std_fname_mode, get_version @@ -636,17 +637,23 @@ def _launch_if_ready_async(self, task_record: TaskRecord) -> None: _launch_if_ready will launch the specified task, if it is ready to run (for example, without dependencies, and in pending state). """ + task_id = task_record['id'] + event("DFK_LAUNCH_IF_READY_START", task_record['span']) exec_fu = None - task_id = task_record['id'] with task_record['task_launch_lock']: if task_record['status'] != States.pending: logger.debug(f"Task {task_id} is not pending, so launch_if_ready skipping") return - if self._count_deps(task_record['depends']) != 0: - logger.debug(f"Task {task_id} has outstanding dependencies, so launch_if_ready skipping") + counted_deps = self._count_deps(task_record['depends']) + + logger.debug(f"METRIC COUNTED_DEPS {task_id} " + f"total={len(task_record['depends'])} outstanding={counted_deps}") + + if counted_deps != 0: + logger.debug(f"Task {task_id} has {counted_deps} outstanding dependencies, so launch_if_ready skipping") return # We can now launch the task or handle any dependency failures @@ -697,6 +704,7 @@ def _launch_if_ready_async(self, task_record: TaskRecord) -> None: logger.error("add_done_callback got an exception which will be ignored", exc_info=True) task_record['exec_fu'] = exec_fu + event("DFK_LAUNCH_IF_READY_END", task_record['span']) def launch_task(self, task_record: TaskRecord) -> Future: """Handle the actual submission of the task to the executor layer. 
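Reviewer note on the tracing calls introduced above: dflow.py now imports Span, event, span_bind_sub and output_event_stats from parsl.trace, but that module is not part of the hunks shown here. A minimal in-memory sketch that is consistent with the call sites in this diff (Span("TASK", task_id), event("NAME", span), span_bind_sub(parent, child), output_event_stats(directory=run_dir)); everything internal to it, including the pickle filename, is an assumption rather than the real parsl/trace.py:

# Hypothetical sketch of the parsl.trace interface assumed by this diff.
import os
import pickle
import time
from typing import Any, List, Tuple

class Span:
    """A traced entity, e.g. Span("TASK", 3) or Span("TRY", (3, 0))."""
    def __init__(self, kind: str, ident: Any) -> None:
        self.kind = kind
        self.ident = ident

EVENTS: List[Tuple[float, str, str, Any]] = []
BINDINGS: List[Tuple[Span, Span]] = []

def event(name: str, span: Span) -> None:
    # record a timestamped event against the given span
    EVENTS.append((time.time(), name, span.kind, span.ident))

def span_bind_sub(parent: Span, child: Span) -> None:
    # record that child is a sub-span of parent
    BINDINGS.append((parent, child))

def output_event_stats(directory: str = ".") -> None:
    # dump everything collected in this process for offline analysis
    with open(os.path.join(directory, "event_stats.pickle"), "wb") as f:
        pickle.dump({"events": EVENTS, "bindings": BINDINGS}, f)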
@@ -708,6 +716,8 @@ def launch_task(self, task_record: TaskRecord) -> Future: Future that tracks the execution of the submitted function """ task_id = task_record['id'] + task_span = task_record['span'] + event("DFK_LAUNCH_TASK_START", task_span) function = task_record['func'] args = task_record['args'] kwargs = task_record['kwargs'] @@ -719,6 +729,7 @@ def launch_task(self, task_record: TaskRecord) -> Future: logger.info("Reusing cached result for task {}".format(task_id)) task_record['from_memo'] = True assert isinstance(memo_fu, Future) + event("DFK_LAUNCH_TASK_END_MEMO", task_record['span']) return memo_fu task_record['from_memo'] = False @@ -730,8 +741,11 @@ def launch_task(self, task_record: TaskRecord) -> Future: raise ValueError("Task {} requested invalid executor {}".format(task_id, executor_label)) try_id = task_record['fail_count'] + try_span = Span("TRY", (task_id, try_id)) + span_bind_sub(task_span, try_span) if self.monitoring is not None and self.monitoring.resource_monitoring_enabled: + event("DFK_LAUNCH_TASK_MONITORING_WRAP_START", try_span) wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO (function, args, kwargs) = monitor_wrapper(f=function, args=args, @@ -745,14 +759,22 @@ def launch_task(self, task_record: TaskRecord) -> Future: radio_mode=executor.radio_mode, monitor_resources=executor.monitor_resources(), run_dir=self.run_dir) + event("DFK_LAUNCH_TASK_MONITORING_WRAP_END", try_span) + event("DFK_LAUNCH_TASK_GET_SUBMITTER_LOCK_START", try_span) with self.submitter_lock: + event("DFK_LAUNCH_TASK_GET_SUBMITTER_LOCK_END", try_span) exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs) + event("DFK_LAUNCH_TASK_UPDATE_TASK_STATE_START", try_span) self.update_task_state(task_record, States.launched) + event("DFK_LAUNCH_TASK_UPDATE_TASK_STATE_END", try_span) + event("DFK_LAUNCH_TASK_SEND_TASK_LOG_INFO_START", try_span) self._send_task_log_info(task_record) + event("DFK_LAUNCH_TASK_SEND_TASK_LOG_INFO_END", try_span) if hasattr(exec_fu, "parsl_executor_task_id"): + # span_bind_sub(try_span, exec_fu.parsl_executor_task_span) logger.info( f"Parsl task {task_id} try {try_id} launched on executor {executor.label} " f"with executor id {exec_fu.parsl_executor_task_id}") @@ -760,8 +782,11 @@ def launch_task(self, task_record: TaskRecord) -> Future: else: logger.info(f"Parsl task {task_id} try {try_id} launched on executor {executor.label}") + event("DFK_LAUNCH_TASK_LOG_STD_STREAMS_START", try_span) self._log_std_streams(task_record) + event("DFK_LAUNCH_TASK_LOG_STD_STREAMS_END", try_span) + event("DFK_LAUNCH_TASK_END_LAUNCHED", try_span) return exec_fu def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable) -> Tuple[Sequence[Any], Dict[str, Any], @@ -973,7 +998,10 @@ def submit(self, AppFuture """ - + task_id = self.task_count + self.task_count += 1 + task_span = Span("TASK", task_id) + event("DFK_SUBMIT_START", task_span) if ignore_for_cache is None: ignore_for_cache = [] else: @@ -983,15 +1011,17 @@ def submit(self, if self.cleanup_called: raise NoDataFlowKernelError("Cannot submit to a DFK that has been cleaned up") - task_id = self.task_count - self.task_count += 1 + event("DFK_SUBMIT_CHOOSE_EXECUTOR_START", task_span) if isinstance(executors, str) and executors.lower() == 'all': choices = list(e for e in self.executors if e != '_parsl_internal') + elif isinstance(executors, str): # and not 'all' + choices = [executors] elif isinstance(executors, list): 
choices = executors else: raise ValueError("Task {} supplied invalid type for executors: {}".format(task_id, type(executors))) executor = random.choice(choices) + event("DFK_SUBMIT_CHOOSE_EXECUTOR_END", task_span) logger.debug("Task {} will be sent to executor {}".format(task_id, executor)) resource_specification = app_kwargs.get('parsl_resource_specification', {}) @@ -1017,14 +1047,18 @@ def submit(self, 'try_id': 0, 'id': task_id, 'task_launch_lock': threading.Lock(), + 'span': task_span, 'time_invoked': datetime.datetime.now(), 'time_returned': None, 'try_time_launched': None, 'try_time_returned': None, 'resource_specification': resource_specification} + event("DFK_SUBMIT_UPDATE_UNSCHED_STATE_START", task_span) self.update_task_state(task_record, States.unsched) + event("DFK_SUBMIT_UPDATE_UNSCHED_STATE_END", task_span) + event("DFK_SUBMIT_MUNGE_ARGS_START", task_span) for kw in ['stdout', 'stderr']: if kw in app_kwargs: if app_kwargs[kw] == parsl.AUTO_LOGNAME: @@ -1034,19 +1068,23 @@ def submit(self, app_kwargs[kw] = self.default_std_autopath(task_record, kw) else: app_kwargs[kw] = self.config.std_autopath(task_record, kw) + event("DFK_SUBMIT_MUNGE_ARGS_END", task_span) app_fu = AppFuture(task_record) task_record['app_fu'] = app_fu # Transform remote input files to data futures + event("DFK_SUBMIT_ADD_DEPS_START", task_span) app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func) func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func) + event("DFK_SUBMIT_ADD_DEPS_END", task_span) logger.debug("Added output dependencies") # Replace the function invocation in the TaskRecord with whatever file-staging # substitutions have been made. + task_record.update({ 'args': app_args, 'func': func, @@ -1058,6 +1096,7 @@ def submit(self, logger.debug("Gathering dependencies") # Get the list of dependencies for the task + event("DFK_SUBMIT_EXAMINE_DEPS_START", task_span) depends = self._gather_all_deps(app_args, app_kwargs) logger.debug("Gathered dependencies") task_record['depends'] = depends @@ -1071,15 +1110,25 @@ def submit(self, else: waiting_message = "not waiting on any dependency" + logger.debug(f"METRIC GATHERED_DEPS {task_id} " + f"depends={len(depends)}") + + event("DFK_SUBMIT_EXAMINE_DEPS_END", task_span) + logger.info("Task {} submitted for App {}, {}".format(task_id, task_record['func_name'], waiting_message)) + event("DFK_SUBMIT_ADD_CALLBACK_START", task_span) app_fu.add_done_callback(partial(self.handle_app_update, task_record)) + event("DFK_SUBMIT_UPDATE_PENDING_STATE_START", task_span) self.update_task_state(task_record, States.pending) + event("DFK_SUBMIT_UPDATE_PENDING_STATE_END", task_span) logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu'])) + event("DFK_SUBMIT_MONITORING_PENDING_START", task_span) self._send_task_log_info(task_record) + event("DFK_SUBMIT_MONITORING_PENDING_END", task_span) # at this point add callbacks to all dependencies to do a launch_if_ready # call whenever a dependency completes. @@ -1105,6 +1154,7 @@ def callback_adapter(dep_fut: Future) -> None: self.launch_if_ready(task_record) + event("DFK_SUBMIT_END", task_span) return app_fu # it might also be interesting to assert that all DFK @@ -1245,6 +1295,10 @@ def cleanup(self) -> None: else: logger.debug("Cleaning up non-default DFK - not unregistering") + # TODO: enabling based on whether dict tracing is enabled or not. 
+ logger.info("Writing tracing pickle file") + output_event_stats(directory=self.run_dir) + logger.info("DFK cleanup complete") def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str: diff --git a/parsl/dataflow/errors.py b/parsl/dataflow/errors.py index 83fff06de8..08ed0b4179 100644 --- a/parsl/dataflow/errors.py +++ b/parsl/dataflow/errors.py @@ -103,3 +103,7 @@ class JoinError(PropagatedException): def __init__(self, dependent_exceptions_tids: Sequence[Tuple[BaseException, str]], task_id: int) -> None: super().__init__(dependent_exceptions_tids, task_id, failure_description="Join failure") + + +class RundirCreateError(ParslError): + pass diff --git a/parsl/dataflow/rundirs.py b/parsl/dataflow/rundirs.py index 141773089e..230d811ca5 100644 --- a/parsl/dataflow/rundirs.py +++ b/parsl/dataflow/rundirs.py @@ -1,11 +1,15 @@ import logging import os +import random +import time from glob import glob +from parsl.dataflow.errors import RundirCreateError + logger = logging.getLogger(__name__) -def make_rundir(path: str) -> str: +def make_rundir(path: str, *, max_tries: int = 3) -> str: """Create a numbered run directory under the specified path. ./runinfo <- specified path @@ -17,23 +21,39 @@ def make_rundir(path: str) -> str: Args: - path (str): String path to root of all rundirs """ - try: - if not os.path.exists(path): - os.makedirs(path) - - prev_rundirs = glob(os.path.join(path, "[0-9]*[0-9]")) - - current_rundir = os.path.join(path, '000') - - if prev_rundirs: - # Since we globbed on files named as 0-9 - x = sorted([int(os.path.basename(x)) for x in prev_rundirs])[-1] - current_rundir = os.path.join(path, '{0:03}'.format(x + 1)) - - os.makedirs(current_rundir) - logger.debug("Parsl run initializing in rundir: {0}".format(current_rundir)) - return os.path.abspath(current_rundir) - - except Exception: - logger.exception("Failed to create run directory") - raise + backoff_time_s = 1 + random.random() + + os.makedirs(path, exist_ok=True) + + # try_count is 1-based for human readability + try_count = 1 + while True: + + # Python 3.10 introduces root_dir argument to glob which in future + # can be used to simplify this code, something like: + # prev_rundirs = glob("[0-9]*[0-9]", root_dir=path) + full_prev_rundirs = glob(os.path.join(path, "[0-9]*[0-9]")) + prev_rundirs = [os.path.basename(d) for d in full_prev_rundirs] + + next = max([int(d) for d in prev_rundirs] + [-1]) + 1 + + current_rundir = os.path.join(path, '{0:03}'.format(next)) + + try: + os.makedirs(current_rundir) + logger.debug("rundir created: %s", current_rundir) + return os.path.abspath(current_rundir) + except FileExistsError: + logger.warning(f"Could not create rundir {current_rundir} on try {try_count}") + + if try_count >= max_tries: + raise + else: + logger.debug("Backing off {}s", backoff_time_s) + time.sleep(backoff_time_s) + backoff_time_s *= 2 + random.random() + try_count += 1 + + # this should never be reached - the above loop should have either returned + # or raised an exception on the last try + raise RundirCreateError() diff --git a/parsl/dataflow/taskrecord.py b/parsl/dataflow/taskrecord.py index 0621ab8f41..a9c2ce1897 100644 --- a/parsl/dataflow/taskrecord.py +++ b/parsl/dataflow/taskrecord.py @@ -14,6 +14,7 @@ import parsl.dataflow.dflow as dflow from parsl.dataflow.states import States +from parsl.trace import Span class TaskRecord(TypedDict, total=False): @@ -101,3 +102,7 @@ class TaskRecord(TypedDict, total=False): """Restricts access to end-of-join behavior to ensure that joins only 
complete once, even if several joining Futures complete close together in time.""" + + span: Span + """Event tracing span for this task. + """ diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index e350581fe0..0c94938f2c 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -774,21 +774,26 @@ def new_block_info(): if len(block_ids_to_kill) < blocks: logger.warning(f"Could not find enough blocks to kill: wanted {blocks} but only selected {len(block_ids_to_kill)}") + logger.info("Iterating over block IDs") # Hold the block for block_id in block_ids_to_kill: self._hold_block(block_id) + logger.info("Iterated over block IDs") # Now kill via provider # Potential issue with multiple threads trying to remove the same blocks to_kill = [self.blocks_to_job_id[bid] for bid in block_ids_to_kill if bid in self.blocks_to_job_id] + logger.info("Calling provider cancel") r = self.provider.cancel(to_kill) + logger.info("Provide cancel returned") job_ids = self._filter_scale_in_ids(to_kill, r) # to_kill block_ids are fetched from self.blocks_to_job_id # If a block_id is in self.blocks_to_job_id, it must exist in self.job_ids_to_block block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] + logger.info("htex scale in returning") return block_ids_killed def _get_launch_command(self, block_id: str) -> str: diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 2c0f47ac11..8820b0e79a 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -27,7 +27,7 @@ from parsl.utils import setproctitle from parsl.version import VERSION as PARSL_VERSION -PKL_HEARTBEAT_CODE = pickle.dumps((2 ** 32) - 1) +PKL_HEARTBEAT_RESPONSE_CODE = pickle.dumps((2 ** 32) - 1) PKL_DRAINED_CODE = pickle.dumps((2 ** 32) - 2) LOGGER_NAME = "interchange" @@ -226,6 +226,7 @@ def process_command(self, monitoring_radio: Optional[MonitoringRadioSender]) -> if self.command_channel in self.socks and self.socks[self.command_channel] == zmq.POLLIN: + logger.debug("Waiting for command request") command_req = self.command_channel.recv_pyobj() logger.debug("Received command request: {}".format(command_req)) if command_req == "CONNECTED_BLOCKS": @@ -379,6 +380,9 @@ def process_task_outgoing_incoming( if msg['type'] == 'registration': # We set up an entry only if registration works correctly + # XXXX this makes a partially initialised manager record visible + # (for example via MANAGERS). Initialize it properly before + # assigning it into _ready_managers? self._ready_managers[manager_id] = {'last_heartbeat': time.time(), 'idle_since': time.time(), 'block_id': None, @@ -396,6 +400,12 @@ def process_task_outgoing_incoming( logger.info(f"Adding manager: {manager_id!r} to ready queue") m = self._ready_managers[manager_id] + # XXXX maybe should be a bit more strict about which fields are + # being copied here... why are we using python_version and also + # copying in python_v here? + # what are the fields we actually care about, rather than this being + # a dumping ground? 
+ # m is a ManagerRecord, but msg is a dict[Any,Any] and so can # contain arbitrary fields beyond those in ManagerRecord (and # indeed does - for example, python_v) which are then ignored @@ -426,8 +436,8 @@ def process_task_outgoing_incoming( manager = self._ready_managers.get(manager_id) if manager: manager['last_heartbeat'] = time.time() - logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) - self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) + logger.debug("Received heartbeat from manager %r via tasks connection", manager_id) + self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_RESPONSE_CODE]) else: logger.warning("Received heartbeat via tasks connection for not-registered manager %r", manager_id) elif msg['type'] == 'drain': @@ -502,6 +512,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ manager_id, *all_messages = self.results_incoming.recv_multipart() if manager_id not in self._ready_managers: logger.warning(f"Received a result from a un-registered manager: {manager_id!r}") + # this could happen for a heartbeat message (either before registration has happened in another thread, or after cleanup) else: logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id) @@ -521,7 +532,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': - logger.debug("Manager %r sent heartbeat via results connection", manager_id) + logger.debug("Received heartbeat from manager %r via results connection", manager_id) else: logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index dfb8ee7728..d3965d01fc 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -41,7 +41,7 @@ from parsl.serialize import serialize from parsl.version import VERSION as PARSL_VERSION -HEARTBEAT_CODE = (2 ** 32) - 1 +HEARTBEAT_RESPONSE_CODE = (2 ** 32) - 1 DRAINED_CODE = (2 ** 32) - 2 @@ -362,8 +362,8 @@ def pull_tasks(self): tasks = pickle.loads(pkl_msg) last_interchange_contact = time.time() - if tasks == HEARTBEAT_CODE: - logger.debug("Got heartbeat from interchange") + if tasks == HEARTBEAT_RESPONSE_CODE: + logger.debug("Got heartbeat response from interchange") elif tasks == DRAINED_CODE: logger.info("Got fully drained message from interchange - setting kill flag") self._stop_event.set() diff --git a/parsl/executors/high_throughput/zmq_pipes.py b/parsl/executors/high_throughput/zmq_pipes.py index 9b368c713c..ddc0b1b284 100644 --- a/parsl/executors/high_throughput/zmq_pipes.py +++ b/parsl/executors/high_throughput/zmq_pipes.py @@ -44,6 +44,7 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None): self.create_socket_and_bind() self._lock = threading.Lock() self.ok = True + self._my_thread = None def create_socket_and_bind(self): """ Creates socket and binds to a port. @@ -59,7 +60,7 @@ def create_socket_and_bind(self): else: self.zmq_socket.bind(tcp_url(self.ip_address, self.port)) - def run(self, message, max_retries=3, timeout_s=None): + def run(self, message, max_retries=3, timeout_s=30): """ This function needs to be fast at the same time aware of the possibility of ZMQ pipes overflowing. 
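Reviewer note on the heartbeat rename above (PKL_HEARTBEAT_CODE to PKL_HEARTBEAT_RESPONSE_CODE in the interchange, HEARTBEAT_CODE to HEARTBEAT_RESPONSE_CODE in the worker pool): the two constants are the same sentinel, one pickled and one raw, which is why the comparison in pull_tasks works. A tiny sketch of the round trip, with the ZMQ transport replaced by a local variable:

import pickle

HEARTBEAT_RESPONSE_CODE = (2 ** 32) - 1                               # worker-pool side
PKL_HEARTBEAT_RESPONSE_CODE = pickle.dumps(HEARTBEAT_RESPONSE_CODE)   # interchange side

payload = PKL_HEARTBEAT_RESPONSE_CODE   # pretend this frame arrived over the tasks channel
tasks = pickle.loads(payload)
if tasks == HEARTBEAT_RESPONSE_CODE:
    print("Got heartbeat response from interchange")
else:
    print(f"Got {len(tasks)} tasks")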
@@ -72,9 +73,18 @@ def run(self, message, max_retries=3, timeout_s=None): start_time_s = time.monotonic() + if self._my_thread is None: + self._my_thread = threading.current_thread() + elif self._my_thread != threading.current_thread(): + logger.warning(f"Command socket suspicious thread usage: {self._my_thread} vs {threading.current_thread()}") + # otherwise, _my_thread and current_thread match, which is ok and no need to log + reply = '__PARSL_ZMQ_PIPES_MAGIC__' + logger.debug("acquiring command lock") with self._lock: - for _ in range(max_retries): + logger.debug("acquired command lock") + for i in range(max_retries): + logger.debug(f"try {i} for command {message}") try: logger.debug("Sending command client command") @@ -103,14 +113,15 @@ def run(self, message, max_retries=3, timeout_s=None): else: raise InternalConsistencyError(f"ZMQ poll returned unexpected value: {poll_result}") - logger.debug("Receiving command client response") + logger.debug(f"Receiving command client response to {message}") reply = self.zmq_socket.recv_pyobj() - logger.debug("Received command client response") + logger.debug(f"got response from command {message}") except zmq.ZMQError: logger.exception("Potential ZMQ REQ-REP deadlock caught") - logger.info("Trying to reestablish context") + logger.info("Trying to reestablish context after ZMQError") self.zmq_context.recreate() self.create_socket_and_bind() + self._my_thread = None else: break @@ -152,6 +163,7 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None): max_port=port_range[1]) self.poller = zmq.Poller() self.poller.register(self.zmq_socket, zmq.POLLOUT) + self._lock = threading.Lock() def put(self, message): """ This function needs to be fast at the same time aware of the possibility of @@ -162,22 +174,29 @@ def put(self, message): in ZMQ sockets reaching a broken state once there are ~10k tasks in flight. This issue can be magnified if each the serialized buffer itself is larger. 
""" + logger.debug("Putting task to outgoing_q") timeout_ms = 1 - while True: - socks = dict(self.poller.poll(timeout=timeout_ms)) - if self.zmq_socket in socks and socks[self.zmq_socket] == zmq.POLLOUT: - # The copy option adds latency but reduces the risk of ZMQ overflow - logger.debug("Sending TasksOutgoing message") - self.zmq_socket.send_pyobj(message, copy=True) - logger.debug("Sent TasksOutgoing message") - return - else: - timeout_ms *= 2 - logger.debug("Not sending due to non-ready zmq pipe, timeout: {} ms".format(timeout_ms)) + with self._lock: + while True: + socks = dict(self.poller.poll(timeout=timeout_ms)) + if self.zmq_socket in socks and socks[self.zmq_socket] == zmq.POLLOUT: + # The copy option adds latency but reduces the risk of ZMQ overflow + logger.debug("Sending TasksOutgoing message") + self.zmq_socket.send_pyobj(message, copy=True) + logger.debug("Sent TasksOutgoing message") + return + else: + timeout_ms = max(timeout_ms, 1) + timeout_ms *= 2 + logger.error("Not sending due to non-ready zmq pipe, timeout: {} ms".format(timeout_ms)) + if timeout_ms >= 10000: + logger.error("Hit big timeout, raising exception") + raise RuntimeError("BENC: hit big timeout for pipe put - failing rather than trying forever") def close(self): - self.zmq_socket.close() - self.zmq_context.term() + with self._lock: + self.zmq_socket.close() + self.zmq_context.term() class ResultsIncoming: @@ -208,19 +227,22 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None): max_port=port_range[1]) self.poller = zmq.Poller() self.poller.register(self.results_receiver, zmq.POLLIN) + self._lock = threading.Lock() def get(self, timeout_ms=None): """Get a message from the queue, returning None if timeout expires without a message. timeout is measured in milliseconds. 
""" - socks = dict(self.poller.poll(timeout=timeout_ms)) - if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN: - m = self.results_receiver.recv_multipart() - logger.debug("Received ResultsIncoming message") - return m - else: - return None + with self._lock: + socks = dict(self.poller.poll(timeout=timeout_ms)) + if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN: + m = self.results_receiver.recv_multipart() + logger.debug("Received ResultsIncoming message") + return m + else: + return None def close(self): - self.results_receiver.close() - self.zmq_context.term() + with self._lock: + self.results_receiver.close() + self.zmq_context.term() diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 5896d333b5..54e7721b9b 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -40,6 +40,7 @@ from parsl.executors.taskvine.manager import _taskvine_submit_wait from parsl.executors.taskvine.manager_config import TaskVineManagerConfig from parsl.executors.taskvine.utils import ParslFileToVine, ParslTaskToVine +from parsl.multiprocessing import SpawnContext from parsl.process_loggers import wrap_with_logs from parsl.providers import CondorProvider, LocalProvider from parsl.providers.base import ExecutionProvider @@ -134,13 +135,13 @@ def __init__(self, self.storage_access = storage_access # Queue to send ready tasks from TaskVine executor process to TaskVine manager process - self._ready_task_queue: multiprocessing.Queue = multiprocessing.Queue() + self._ready_task_queue: multiprocessing.Queue = SpawnContext.Queue() # Queue to send finished tasks from TaskVine manager process to TaskVine executor process - self._finished_task_queue: multiprocessing.Queue = multiprocessing.Queue() + self._finished_task_queue: multiprocessing.Queue = SpawnContext.Queue() # Event to signal whether the manager and factory processes should stop running - self._should_stop = multiprocessing.Event() + self._should_stop = SpawnContext.Event() # TaskVine manager process self._submit_process = None @@ -252,17 +253,17 @@ def start(self): "finished_task_queue": self._finished_task_queue, "should_stop": self._should_stop, "manager_config": self.manager_config} - self._submit_process = multiprocessing.Process(target=_taskvine_submit_wait, - name="TaskVine-Submit-Process", - kwargs=submit_process_kwargs) + self._submit_process = SpawnContext.Process(target=_taskvine_submit_wait, + name="TaskVine-Submit-Process", + kwargs=submit_process_kwargs) # Create a process to run the TaskVine factory if enabled. if self.worker_launch_method == 'factory': factory_process_kwargs = {"should_stop": self._should_stop, "factory_config": self.factory_config} - self._factory_process = multiprocessing.Process(target=_taskvine_factory, - name="TaskVine-Factory-Process", - kwargs=factory_process_kwargs) + self._factory_process = SpawnContext.Process(target=_taskvine_factory, + name="TaskVine-Factory-Process", + kwargs=factory_process_kwargs) # Run thread to collect results and set tasks' futures. self._collector_thread = threading.Thread(target=self._collect_taskvine_results, @@ -621,8 +622,8 @@ def _collect_taskvine_results(self): with self._tasks_lock: future = self.tasks.pop(task_report.executor_id) - logger.debug(f'Updating Future for Parsl Task: {task_report.executor_id}. 
\ - Task {task_report.executor_id} has result_received set to {task_report.result_received}') + logger.debug(f'Updating Future for Parsl Task: {task_report.executor_id}. ' + f'Task {task_report.executor_id} has result_received set to {task_report.result_received}') if task_report.result_received: try: with open(task_report.result_file, 'rb') as f_in: diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 94cfb2e391..0d286574ac 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -151,8 +151,11 @@ def _taskvine_submit_wait(ready_task_queue=None, means that any communication should be done using the multiprocessing module capabilities, rather than shared memory. """ - logger.debug("Starting TaskVine Submit/Wait Process") + print("BENC: about to set proc title") setproctitle("parsl: TaskVine submit/wait") + print("BENC: about to log that submit process starts") + logger.debug("Starting TaskVine Submit/Wait Process") + print("BENC: after log that submit process starts") # Enable debugging flags and create logging file if manager_config.vine_log_dir is not None: diff --git a/parsl/executors/workqueue/exec_parsl_function.py b/parsl/executors/workqueue/exec_parsl_function.py index d19d92efe6..4b206358e2 100644 --- a/parsl/executors/workqueue/exec_parsl_function.py +++ b/parsl/executors/workqueue/exec_parsl_function.py @@ -1,3 +1,26 @@ +import time + +t_start = time.time() +import sys +from typing import Any, List + +metas = [] + + +class MetaPathLogger: + + def find_spec(*args, **kwargs): + metas.append(f"{time.time()} META_PATH {args[0]}") + return None + + +if __name__ == "__main__": + mpl: List[Any] # failure in CI... doesn't happen on my laptop. I haven't dug into why + mpl = [MetaPathLogger] + sys.meta_path = mpl + sys.meta_path + +t_postmeta = time.time() + import pickle import sys import traceback @@ -7,6 +30,7 @@ from parsl.serialize import serialize from parsl.utils import get_std_fname_mode +t_postimport = time.time() # This scripts executes a parsl function which is pickled in a file: # # exec_parsl_function.py map_file function_file result_file @@ -27,9 +51,13 @@ # -def load_pickled_file(filename): +def load_pickled_file(filename, logfile): + print(f"{time.time()} LOADPICKLED_OPEN", file=logfile) with open(filename, "rb") as f_in: - return pickle.load(f_in) + print(f"{time.time()} LOADPICKLED_LOAD", file=logfile) + v = pickle.load(f_in) + print(f"{time.time()} LOADPICKLED_DONE", file=logfile) + return v def dump_result_to_file(result_file, result_package): @@ -138,24 +166,30 @@ def encode_byte_code_function(user_namespace, fn, fn_name, args_name, kwargs_nam return code -def load_function(map_file, function_file): +def load_function(map_file, function_file, logfile): # Decodes the function and its file arguments to be executed into # function_code, and updates a user namespace with the function name and # the variable named result_name. When the function is executed, its result # will be stored in this variable in the user namespace. # Returns (namespace, function_code, result_name) + print(f"{time.time()} LOADFUNCTION_MAKENS", file=logfile) # Create the namespace to isolate the function execution. 
user_ns = locals() user_ns.update({'__builtins__': __builtins__}) - function_info = load_pickled_file(function_file) + print(f"{time.time()} LOADFUNCTION_LOADPICKLED_FUNCTION", file=logfile) + function_info = load_pickled_file(function_file, logfile) + print(f"{time.time()} LOADFUNCTION_UNPACK", file=logfile) (fn, fn_name, fn_args, fn_kwargs) = unpack_function(function_info, user_ns) - mapping = load_pickled_file(map_file) + print(f"{time.time()} LOADFUNCTION_LOAD_PICKLED_MAPPING", file=logfile) + mapping = load_pickled_file(map_file, logfile) + print(f"{time.time()} LOADFUNCTION_REMAP", file=logfile) remap_all_files(mapping, fn_args, fn_kwargs) + print(f"{time.time()} LOADFUNCTION_ENCODE", file=logfile) (code, result_name) = encode_function(user_ns, fn, fn_name, fn_args, fn_kwargs) return (user_ns, code, result_name) @@ -172,6 +206,7 @@ def execute_function(namespace, function_code, result_name): if __name__ == "__main__": + t_mainstart = time.time() try: # parse the three required command line arguments: # map_file: contains a pickled dictionary to map original names to @@ -180,17 +215,27 @@ def execute_function(namespace, function_code, result_name): # result_file: any output (including exceptions) will be written to # this file. try: - (map_file, function_file, result_file) = sys.argv[1:] + (map_file, function_file, result_file, log_file) = sys.argv[1:] except ValueError: print("Usage:\n\t{} function result mapping\n".format(sys.argv[0])) raise + logfile = open(log_file, "w") + print(f"{t_start} START", file=logfile) + print(f"{t_postmeta} POSTMETA", file=logfile) + print(f"{t_postimport} POSTIMPORT", file=logfile) + print(f"{t_mainstart} MAINSTART", file=logfile) + + t_loadfunction = time.time() + print(f"{t_loadfunction} LOADFUNCTION", file=logfile) try: - (namespace, function_code, result_name) = load_function(map_file, function_file) + (namespace, function_code, result_name) = load_function(map_file, function_file, logfile) except Exception: print("There was an error setting up the function for execution.") raise + t_executefunction = time.time() + print(f"{t_executefunction} EXECUTEFUNCTION", file=logfile) try: result = execute_function(namespace, function_code, result_name) except Exception: @@ -202,8 +247,18 @@ def execute_function(namespace, function_code, result_name): # Write out function result to the result file try: + t_dump = time.time() + print(f"{t_dump} DUMP", file=logfile) dump_result_to_file(result_file, result) except Exception: print("Could not write to result file.") traceback.print_exc() sys.exit(1) + t_printmetas = time.time() + print(f"{t_printmetas} PRINTMETAS", file=logfile) + for m in metas: + print(m, file=logfile) + t_done = time.time() + print(f"{t_done} DONE", file=logfile) + + logfile.close() diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index f3f1219301..b9294b8ac2 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -3,6 +3,7 @@ high-throughput system for delegating Parsl tasks to thousands of remote machines """ +import datetime import hashlib import inspect import itertools @@ -31,10 +32,12 @@ from parsl.executors.errors import ExecutorError, InvalidResourceSpecification from parsl.executors.status_handling import BlockProviderExecutor from parsl.executors.workqueue import exec_parsl_function +from parsl.multiprocessing import SpawnContext, SpawnProcess from parsl.process_loggers import wrap_with_logs from parsl.providers import CondorProvider, LocalProvider 
from parsl.providers.base import ExecutionProvider from parsl.serialize import deserialize, pack_apply_message +from parsl.trace import Span, event, span_bind_sub from parsl.utils import setproctitle from .errors import WorkQueueFailure, WorkQueueTaskFailure @@ -60,19 +63,16 @@ # Support structure to communicate parsl tasks to the work queue submit thread. -ParslTaskToWq = namedtuple( - 'ParslTaskToWq', - 'id ' - 'category ' - 'cores memory disk gpus priority running_time_min env_pkg map_file function_file result_file input_files output_files') +ParslTaskToWq = namedtuple('ParslTaskToWq', + 'id category cores memory disk gpus priority running_time_min ' + 'env_pkg map_file function_file result_file log_file input_files output_files') # Support structure to communicate final status of work queue tasks to parsl # if result_received is True: # result_file is the path to the file containing the result. # if result_received is False: # reason and status are only valid if result_received is False -# result_file is None -WqTaskToParsl = namedtuple('WqTaskToParsl', 'id result_received result_file reason status') +WqTaskToParsl = namedtuple('WqTaskToParsl', 'id wq_id result_received result_file reason status') # Support structure to report parsl filenames to work queue. # parsl_name is the local_name or filepath attribute of a parsl file object. @@ -245,6 +245,7 @@ def __init__(self, autolabel: bool = False, autolabel_window: int = 1, autocategory: bool = True, + enable_monitoring: bool = False, max_retries: int = 1, init_command: str = "", worker_options: str = "", @@ -260,8 +261,8 @@ def __init__(self, self.scaling_cores_per_worker = scaling_cores_per_worker self.label = label - self.task_queue = multiprocessing.Queue() # type: multiprocessing.Queue - self.collector_queue = multiprocessing.Queue() # type: multiprocessing.Queue + self.task_queue: multiprocessing.Queue = SpawnContext.Queue() + self.collector_queue: multiprocessing.Queue = SpawnContext.Queue() self.address = address self.port = port self.executor_task_counter = -1 @@ -281,8 +282,9 @@ def __init__(self, self.autolabel = autolabel self.autolabel_window = autolabel_window self.autocategory = autocategory + self.enable_monitoring = enable_monitoring self.max_retries = max_retries - self.should_stop = multiprocessing.Value(c_bool, False) + self.should_stop = SpawnContext.Value(c_bool, False) self.cached_envs = {} # type: Dict[int, str] self.worker_options = worker_options self.worker_executable = worker_executable @@ -301,7 +303,7 @@ def __init__(self, self.project_password_file = None # Build foundations of the launch command - self.launch_cmd = ("{package_prefix}python3 exec_parsl_function.py {mapping} {function} {result}") + self.launch_cmd = ("{package_prefix}python3 exec_parsl_function.py {mapping} {function} {result} {log}") if self.init_command != "": self.launch_cmd = self.init_command + "; " + self.launch_cmd @@ -333,7 +335,11 @@ def start(self): logger.debug("Starting WorkQueueExecutor") - port_mailbox = multiprocessing.Queue() + port_mailbox = SpawnContext.Queue() + + logger.warning("BODGE: delay here for hack around often observed futex race...") + time.sleep(15) + logger.warning("BODGE: delay finished") # Create a Process to perform WorkQueue submissions submit_process_kwargs = {"task_queue": self.task_queue, @@ -344,6 +350,7 @@ def start(self): "shared_fs": self.shared_fs, "autolabel": self.autolabel, "autolabel_window": self.autolabel_window, + "enable_monitoring": self.enable_monitoring, "autocategory": self.autocategory, 
"max_retries": self.max_retries, "should_stop": self.should_stop, @@ -354,9 +361,9 @@ def start(self): "port_mailbox": port_mailbox, "coprocess": self.coprocess } - self.submit_process = multiprocessing.Process(target=_work_queue_submit_wait, - name="WorkQueue-Submit-Process", - kwargs=submit_process_kwargs) + self.submit_process = SpawnProcess(target=_work_queue_submit_wait, + name="WorkQueue-Submit-Process", + kwargs=submit_process_kwargs) self.collector_thread = threading.Thread(target=self._collect_work_queue_results, name="WorkQueue-collector-thread") @@ -401,6 +408,10 @@ def submit(self, func, resource_specification, *args, **kwargs): kwargs : dict Keyword arguments to the Parsl app """ + self.executor_task_counter += 1 + executor_task_id = self.executor_task_counter + executor_task_span = Span("EXECUTOR_TASK", executor_task_id) + event("WQEX_SUBMIT_START", executor_task_span) cores = None memory = None disk = None @@ -408,6 +419,7 @@ def submit(self, func, resource_specification, *args, **kwargs): priority = None category = None running_time_min = None + event("WQEX_SUBMIT_PROCESS_RESOURCE_SPEC_START", executor_task_span) if resource_specification and isinstance(resource_specification, dict): logger.debug("Got resource_specification: {}".format(resource_specification)) @@ -451,11 +463,12 @@ def submit(self, func, resource_specification, *args, **kwargs): elif k == 'running_time_min': running_time_min = resource_specification[k] - self.executor_task_counter += 1 - executor_task_id = self.executor_task_counter + event("WQEX_SUBMIT_PROCESS_RESOURCE_SPEC_END", executor_task_span) # Create a per task directory for the function, result, map, and result files + event("WQEX_SUBMIT_MKDIR_START", executor_task_span) os.mkdir(self._path_in_task(executor_task_id)) + event("WQEX_SUBMIT_MKDIR_END", executor_task_span) input_files = [] output_files = [] @@ -481,8 +494,11 @@ def submit(self, func, resource_specification, *args, **kwargs): fu.parsl_executor_task_id = executor_task_id assert isinstance(resource_specification, dict) fu.resource_specification = resource_specification + fu.parsl_executor_task_span = executor_task_span logger.debug("Getting tasks_lock to set WQ-level task entry") + event("WQEX_SUBMIT_ACQUIRE_TASKS_LOCK_START", executor_task_span) with self.tasks_lock: + event("WQEX_SUBMIT_ACQUIRE_TASKS_LOCK_END", executor_task_span) logger.debug("Got tasks_lock to set WQ-level task entry") self.tasks[str(executor_task_id)] = fu @@ -491,19 +507,25 @@ def submit(self, func, resource_specification, *args, **kwargs): function_file = self._path_in_task(executor_task_id, "function") result_file = self._path_in_task(executor_task_id, "result") map_file = self._path_in_task(executor_task_id, "map") + log_file = self._path_in_task(executor_task_id, "log") logger.debug("Creating executor task {} with function at: {}".format(executor_task_id, function_file)) logger.debug("Creating executor task {} with result to be found at: {}".format(executor_task_id, result_file)) + logger.debug("Creating executor task {} with log to be found at: {}".format(executor_task_id, log_file)) - self._serialize_function(function_file, func, args, kwargs) + event("WQEX_SUBMIT_SERIALIZE_START", executor_task_span) + self._serialize_function(function_file, func, args, kwargs, executor_task_span) + event("WQEX_SUBMIT_SERIALIZE_END", executor_task_span) if self.pack: env_pkg = self._prepare_package(func, self.extra_pkgs) else: env_pkg = None - logger.debug("Constructing map for local filenames at worker for executor task 
{}".format(executor_task_id)) - self._construct_map_file(map_file, input_files, output_files) + logger.debug("Constructing map for local filenames at worker for task {}".format(executor_task_id)) + event("WQEX_SUBMIT_MAPFILE_START", executor_task_span) + self._construct_map_file(map_file, input_files, output_files, executor_task_span) + event("WQEX_SUBMIT_MAPFILE_END", executor_task_span) if not self.submit_process.is_alive(): raise ExecutorError(self, "Workqueue Submit Process is not alive") @@ -512,21 +534,27 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.debug("Placing executor task {} on message queue".format(executor_task_id)) if category is None: category = func.__name__ if self.autocategory else 'parsl-default' - self.task_queue.put_nowait(ParslTaskToWq(executor_task_id, - category, - cores, - memory, - disk, - gpus, - priority, - running_time_min, - env_pkg, - map_file, - function_file, - result_file, - input_files, - output_files)) - + event("WQEX_SUBMIT_PTWQ_START", executor_task_span) + ptwq = ParslTaskToWq(executor_task_id, + category, + cores, + memory, + disk, + gpus, + priority, + running_time_min, + env_pkg, + map_file, + function_file, + result_file, + log_file, + input_files, + output_files) + + event("WQEX_SUBMIT_ENQUEUE_START", executor_task_span) + self.task_queue.put_nowait(ptwq) + event("WQEX_SUBMIT_ENQUEUE_END", executor_task_span) + event("WQEX_SUBMIT_END", executor_task_span) return fu def _construct_worker_command(self): @@ -555,7 +583,7 @@ def _patch_providers(self): if self.project_password_file: self.provider.transfer_input_files.append(self.project_password_file) - def _serialize_function(self, fn_path, parsl_fn, parsl_fn_args, parsl_fn_kwargs): + def _serialize_function(self, fn_path, parsl_fn, parsl_fn_args, parsl_fn_kwargs, span): """Takes the function application parsl_fn(*parsl_fn_args, **parsl_fn_kwargs) and serializes it to the file fn_path.""" @@ -567,13 +595,19 @@ def _serialize_function(self, fn_path, parsl_fn, parsl_fn_args, parsl_fn_kwargs) "args": parsl_fn_args, "kwargs": parsl_fn_kwargs} else: + event("WQEX_SUBMIT_SERIALIZE_PACK_APPLY", span) function_info = {"byte code": pack_apply_message(parsl_fn, parsl_fn_args, parsl_fn_kwargs, - buffer_threshold=1024 * 1024)} + buffer_threshold=1024 * 1024, + super_span=span)} + event("WQEX_SUBMIT_SERIALIZE_OPEN", span) with open(fn_path, "wb") as f_out: + event("WQEX_SUBMIT_SERIALIZE_PICKLEDUMP", span) pickle.dump(function_info, f_out) + event("WQEX_SUBMIT_SERIALIZE_CLOSING", span) + event("WQEX_SUBMIT_SERIALIZE_CLOSED", span) - def _construct_map_file(self, map_file, input_files, output_files): + def _construct_map_file(self, map_file, input_files, output_files, span): """ Map local filepath of parsl files to the filenames at the execution worker. If using a shared filesystem, the filepath is mapped to its absolute filename. Otherwise, to its original relative filename. 
In this later case, work queue @@ -586,8 +620,12 @@ def _construct_map_file(self, map_file, input_files, output_files): else: remote_name = local_name file_translation_map[local_name] = remote_name + event("WQEX_SUBMIT_MAPFILE_OPEN", span) with open(map_file, "wb") as f_out: + event("WQEX_SUBMIT_MAPFILE_PICKLEDUMP", span) pickle.dump(file_translation_map, f_out) + event("WQEX_SUBMIT_MAPFILE_CLOSING", span) + event("WQEX_SUBMIT_MAPFILE_CLOSED", span) def _register_file(self, parsl_file): """Generates a tuple (parsl_file.filepath, stage, cache) to give to @@ -712,6 +750,26 @@ def shutdown(self, *args, **kwargs): logger.debug("Work Queue shutdown completed") + # TODO: factor this with htex - perhaps it should exist only in the + # block provider, and there should be no implementation of this at + # all in the base executor class (because this is only block + # relevant) + def create_monitoring_info(self, status): + """ Create a msg for monitoring based on the poll status + + """ + msg = [] + for bid, s in status.items(): + d = {} + d['run_id'] = self.run_id + d['status'] = s.status_name + d['timestamp'] = datetime.datetime.now() + d['executor_label'] = self.label + d['job_id'] = self.blocks_to_job_id.get(bid, None) + d['block_id'] = bid + msg.append(d) + return msg + @wrap_with_logs def _collect_work_queue_results(self): """Sets the values of tasks' futures of tasks completed by work queue. @@ -728,6 +786,25 @@ def _collect_work_queue_results(self): except queue.Empty: continue + # it would be nicer to do this right after submission, but + # at time of writing, that happens in a different process and + # it's not straightforward to get that value back to the main + # process where in-memory tracing is stored. + + # 1. how can binding work here with an "external" thing to bind to? (the work queue task, + # which doesn't have a Span() representation... should I make a span representation + # here? + + # 2. this creates a new Span object for the numbered executor task, which means relying + # on python object identity won't work for matching this binding with the spans + # elsewhere. At present, that binding works ok because we assume theres only ever one + # EXECUTOR_TASK 0, for example, ignoring all other executors that may exist in the same + # process - but I think this is the wrong way to proceed. 
+ + executor_task_span = Span("EXECUTOR_TASK", int(task_report.id)) + wq_task_span = Span("WQ_TASK", task_report.wq_id) + span_bind_sub(executor_task_span, wq_task_span) + # Obtain the future from the tasks dictionary with self.tasks_lock: future = self.tasks.pop(task_report.id) @@ -779,6 +856,7 @@ def _work_queue_submit_wait(*, shared_fs: bool, autolabel: bool, autolabel_window: int, + enable_monitoring: bool, autocategory: bool, max_retries: Optional[int], should_stop, # multiprocessing.Value is an awkward type alias from inside multiprocessing @@ -826,6 +904,10 @@ def _work_queue_submit_wait(*, if project_password_file: q.specify_password_file(project_password_file) + if enable_monitoring: + logger.info("BENC: enabling WQ monitoring") + q.enable_monitoring() + if autolabel: q.enable_monitoring() if autolabel_window is not None: @@ -875,7 +957,8 @@ def _work_queue_submit_wait(*, command_str = launch_cmd.format(package_prefix=pkg_pfx, mapping=os.path.basename(task.map_file), function=os.path.basename(task.function_file), - result=os.path.basename(task.result_file)) + result=os.path.basename(task.result_file), + log=os.path.basename(task.log_file)) logger.debug(command_str) # Create WorkQueue task for the command @@ -886,13 +969,15 @@ def _work_queue_submit_wait(*, "parsl_coprocess", os.path.basename(task.map_file), os.path.basename(task.function_file), - os.path.basename(task.result_file)) + os.path.basename(task.result_file), + os.path.basename(task.log_file)) t.specify_exec_method("direct") logger.debug("Sending executor task {} to coprocess".format(task.id)) except Exception as e: logger.error("Unable to create task: {}".format(e)) collector_queue.put_nowait(WqTaskToParsl(id=task.id, + wq_id=-1, result_received=False, result_file=None, reason="task could not be created by work queue", @@ -931,6 +1016,7 @@ def _work_queue_submit_wait(*, t.specify_input_file(task.function_file, cache=False) t.specify_input_file(task.map_file, cache=False) t.specify_output_file(task.result_file, cache=False) + t.specify_output_file(task.log_file, cache=False) t.specify_tag(str(task.id)) result_file_of_task_id[str(task.id)] = task.result_file @@ -954,6 +1040,7 @@ def _work_queue_submit_wait(*, except Exception as e: logger.error("Unable to submit task to work queue: {}".format(e)) collector_queue.put_nowait(WqTaskToParsl(id=task.id, + wq_id=-1, result_received=False, result_file=None, reason="task could not be submited to work queue", @@ -971,7 +1058,7 @@ def _work_queue_submit_wait(*, continue # When a task is found: executor_task_id = t.tag - logger.debug("Completed Work Queue task {}, executor task {}".format(t.id, t.tag)) + logger.info("Completed Work Queue task {}, executor task {}".format(t.id, t.tag)) result_file = result_file_of_task_id.pop(t.tag) # A tasks completes 'succesfully' if it has result file. 
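Reviewer note on the per-task log file added above: the modified exec_parsl_function.py (and the {log} argument threaded through ParslTaskToWq and the launch command) produces a file of "<unix-time> LABEL" lines (START, POSTMETA, POSTIMPORT, MAINSTART, LOADFUNCTION, EXECUTEFUNCTION, DUMP, META_PATH entries, DONE, and so on). A small offline sketch that reads one such log and prints the elapsed time between consecutive entries; the script itself is illustrative and the log path is whatever {log} was filled with for that task:

# Summarize one per-task timing log written by the modified exec_parsl_function.py.
import sys

def summarize(log_path: str) -> None:
    entries = []
    with open(log_path) as f:
        for line in f:
            parts = line.split(maxsplit=1)
            if not parts:
                continue
            try:
                t = float(parts[0])
            except ValueError:
                continue  # skip anything that is not "<time> LABEL"
            label = parts[1].strip() if len(parts) > 1 else ""
            entries.append((t, label))

    # META_PATH lines are printed at the end but carry earlier timestamps,
    # so sort by time before computing deltas.
    entries.sort()
    for (t_prev, label_prev), (t_next, label_next) in zip(entries, entries[1:]):
        print(f"{label_prev:40s} -> {label_next:40s} {t_next - t_prev:8.4f}s")

if __name__ == "__main__":
    summarize(sys.argv[1])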
@@ -981,6 +1068,7 @@ def _work_queue_submit_wait(*, if os.path.exists(result_file): logger.debug("Found result in {}".format(result_file)) collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id, + wq_id=t.id, result_received=True, result_file=result_file, reason=None, @@ -995,6 +1083,7 @@ def _work_queue_submit_wait(*, logger.debug("Task with executor id {} / Work Queue id {} failed because:\n{}" .format(executor_task_id, t.id, reason)) collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id, + wq_id=-1, result_received=False, result_file=None, reason=reason, diff --git a/parsl/executors/workqueue/parsl_coprocess.py b/parsl/executors/workqueue/parsl_coprocess.py index 0bbcde89cc..f76c7d2de1 100755 --- a/parsl/executors/workqueue/parsl_coprocess.py +++ b/parsl/executors/workqueue/parsl_coprocess.py @@ -145,21 +145,16 @@ def main(): def name(): return 'parsl_coprocess' @remote_execute -def run_parsl_task(a, b, c): +def run_parsl_task(a, b, c, d): import parsl.executors.workqueue.exec_parsl_function as epf - try: - (map_file, function_file, result_file) = (a, b, c) - try: - (namespace, function_code, result_name) = epf.load_function(map_file, function_file) - except Exception: - raise + map_file, function_file, result_file, log_file = (a, b, c, d) + with open(log_file, 'w') as logfile: try: + namespace, function_code, result_name = epf.load_function(map_file, function_file, logfile) result = epf.execute_function(namespace, function_code, result_name) except Exception: - raise - except Exception: - result = RemoteExceptionWrapper(*sys.exc_info()) - epf.dump_result_to_file(result_file, result) + result = RemoteExceptionWrapper(*sys.exc_info()) + epf.dump_result_to_file(result_file, result) return None if __name__ == "__main__": main() diff --git a/parsl/executors/workqueue/parsl_coprocess_stub.py b/parsl/executors/workqueue/parsl_coprocess_stub.py index e914f6faf1..89e026acd0 100755 --- a/parsl/executors/workqueue/parsl_coprocess_stub.py +++ b/parsl/executors/workqueue/parsl_coprocess_stub.py @@ -9,23 +9,19 @@ def name(): return 'parsl_coprocess' -def run_parsl_task(a, b, c): +def run_parsl_task(a, b, c, d): import parsl.executors.workqueue.exec_parsl_function as epf - try: - (map_file, function_file, result_file) = (a, b, c) + (map_file, function_file, result_file, log_file) = (a, b, c, d) + with open(log_file, "w") as logfile: try: - (namespace, function_code, result_name) = epf.load_function(map_file, function_file) - except Exception: - raise - try: + (namespace, function_code, result_name) = epf.load_function(map_file, function_file, logfile) result = epf.execute_function(namespace, function_code, result_name) + except Exception: - raise - except Exception: - result = RemoteExceptionWrapper(*sys.exc_info()) + result = RemoteExceptionWrapper(*sys.exc_info()) - epf.dump_result_to_file(result_file, result) + epf.dump_result_to_file(result_file, result) return None diff --git a/parsl/jobs/job_status_poller.py b/parsl/jobs/job_status_poller.py index 172d5195b6..80f367b898 100644 --- a/parsl/jobs/job_status_poller.py +++ b/parsl/jobs/job_status_poller.py @@ -3,6 +3,7 @@ from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.strategy import Strategy +from parsl.process_loggers import wrap_with_logs from parsl.utils import Timer logger = logging.getLogger(__name__) @@ -16,10 +17,15 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float, max_idletime=max_idletime) super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller") + 
@wrap_with_logs def poll(self) -> None: + logger.info("POLL: update state") self._update_state() + logger.debug("POLL: run error handlers") self._run_error_handlers(self._executors) + logger.debug("POLL: strategize") self._strategy.strategize(self._executors) + logger.debug("POLL: done") def _run_error_handlers(self, executors: List[BlockProviderExecutor]) -> None: for e in executors: diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py index 8ca50a47e4..73c4d59555 100644 --- a/parsl/jobs/strategy.py +++ b/parsl/jobs/strategy.py @@ -193,7 +193,9 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_ self.executors[label]['first'] = False # Tasks that are either pending completion + logger.debug("getting outstanding (which looks like an attribute reference but is actually a network operation") active_tasks = executor.outstanding + logger.debug(f"got outstanding {active_tasks}") status = executor.status_facade @@ -225,6 +227,11 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_ if active_tasks > 0 and self.executors[executor.label]['idle_since']: self.executors[executor.label]['idle_since'] = None + logger.debug(f"METRIC STRATEGY {executor.label} " + f"active_tasks={active_tasks} " + f"running_blocks={running} pending_blocks={pending} " + f"active_blocks={active_blocks} active_slots={active_slots}") + # Case 1 # No tasks. if active_tasks == 0: @@ -254,6 +261,7 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_ # we have to scale_in now. logger.debug(f"Idle time has reached {self.max_idletime}s for executor {label}; scaling in") executor.scale_in_facade(active_blocks - min_blocks) + logger.debug("executor.scale_in_facade returned") else: logger.debug( @@ -303,6 +311,7 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_ excess_blocks = min(excess_blocks, active_blocks - min_blocks) logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s") executor.scale_in_facade(excess_blocks, max_idletime=self.max_idletime) + logger.debug("executor.scale_in_facade returned") else: logger.error("This strategy does not support scaling in except for HighThroughputExecutor - taking no action") else: diff --git a/parsl/monitoring/allprocs.py b/parsl/monitoring/allprocs.py new file mode 100644 index 0000000000..ebbddb39d1 --- /dev/null +++ b/parsl/monitoring/allprocs.py @@ -0,0 +1,35 @@ +import pickle +import random +import signal +import time +from typing import Any, List + +import psutil + +records: List +records = [] + + +def summarize_and_exit(sig: Any, frame: Any) -> None: + with open("./allprocs.pickle", "wb") as f: + pickle.dump(records, f) + + raise KeyboardInterrupt + + +signal.signal(signal.SIGTERM, summarize_and_exit) +signal.signal(signal.SIGINT, summarize_and_exit) + + +while True: + pass_start = time.time() + + for p in psutil.process_iter(attrs=['pid', 'name', 'ppid', 'create_time', 'cmdline', 'cpu_times', 'memory_info']): + timestamp = time.time() + records.append((timestamp, p.info)) + + pass_end = time.time() + + print(pass_end - pass_start) + + time.sleep(3 + random.random() * 7) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 3fbe5736ba..015e7bec43 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -196,7 +196,10 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No def send(self, message: TaggedMonitoringMessage) -> None: 
logger.debug("Sending message type %s", message[0]) + t_before = time.time() self.radio.send(message) + t_after = time.time() + logger.debug(f"Sent message in {t_after - t_before} seconds") def close(self) -> None: logger.info("Terminating Monitoring Hub") diff --git a/parsl/monitoring/visualization/plots/default/workflow_plots.py b/parsl/monitoring/visualization/plots/default/workflow_plots.py index 517c6ab6aa..e75fa8f366 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_plots.py @@ -125,8 +125,8 @@ def task_per_app_plot(task, status, time_completed): yaxis=dict(title='Number of tasks'), title="Execution tries per app")) return plot(fig, show_link=False, output_type="div", include_plotlyjs=False) - except Exception as e: - return "The tasks per app plot cannot be generated because of exception {}.".format(e) + except Exception: + raise def total_tasks_plot(df_task, df_status, columns=20): diff --git a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py index c7cd776d81..fd6c09f1ed 100644 --- a/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py +++ b/parsl/monitoring/visualization/plots/default/workflow_resource_plots.py @@ -237,6 +237,5 @@ def resource_efficiency(resource, node, label): yaxis=dict(title=yaxis), title=title)) return plot(fig, show_link=False, output_type="div", include_plotlyjs=False) - except Exception as e: - print(e) - return "The resource efficiency plot cannot be generated because of exception {}.".format(e) + except Exception: + raise diff --git a/parsl/monitoring/visualization/views.py b/parsl/monitoring/visualization/views.py index 8e34119143..eb123c3919 100644 --- a/parsl/monitoring/visualization/views.py +++ b/parsl/monitoring/visualization/views.py @@ -34,7 +34,8 @@ def format_time(value): rounded_timedelta = datetime.timedelta(days=value.days, seconds=value.seconds) return rounded_timedelta else: - return "Incorrect time format found (neither float nor datetime.datetime object)" + print("Warning: Incorrect time format (neither float nor datetime object): {}, type: {}".format(value, type(value))) # TODO: use logging + return "-" def format_duration(value): diff --git a/parsl/multiprocessing.py b/parsl/multiprocessing.py index a468d840ec..1153e2dcb9 100644 --- a/parsl/multiprocessing.py +++ b/parsl/multiprocessing.py @@ -6,14 +6,20 @@ import multiprocessing.queues import platform from multiprocessing.context import ForkProcess as ForkProcessType +from multiprocessing.context import SpawnProcess as SpawnProcessType from typing import Callable logger = logging.getLogger(__name__) ForkContext = multiprocessing.get_context("fork") + +# for more general compatibility, spawncontext should maybe be +# "anything except fork", with whatever the platform default +# is unless it's Fork. 
SpawnContext = multiprocessing.get_context("spawn") ForkProcess: Callable[..., ForkProcessType] = ForkContext.Process +SpawnProcess: Callable[..., SpawnProcessType] = SpawnContext.Process class MacSafeQueue(multiprocessing.queues.Queue): diff --git a/parsl/providers/local/unreliable.py b/parsl/providers/local/unreliable.py new file mode 100644 index 0000000000..02cde3a34e --- /dev/null +++ b/parsl/providers/local/unreliable.py @@ -0,0 +1,15 @@ +from parsl.providers.local.local import LocalProvider + + +class UnreliableLocalProvider(LocalProvider): + + def __init__(self, *args, **kwargs): + self._unreliable_count = 7 + super().__init__(*args, **kwargs) + + def submit(self, *args, **kwargs): + if self._unreliable_count > 0: + self._unreliable_count -= 1 + raise RuntimeError("Unreliable provider still counting down initial failures") + + super().submit(*args, **kwargs) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 2e02e2b983..8d303e2d15 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -1,10 +1,12 @@ import importlib import logging -from typing import Any, Dict, List, Union +import uuid +from typing import Any, Dict, List, Optional, Union import parsl.serialize.concretes as concretes from parsl.serialize.base import SerializerBase from parsl.serialize.errors import DeserializerPluginError +from parsl.trace import Span, event, span_bind_sub logger = logging.getLogger(__name__) @@ -36,7 +38,11 @@ def register_method_for_data(s: SerializerBase) -> None: additional_methods_for_deserialization: Dict[bytes, SerializerBase] = {} -def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: +def pack_apply_message(func: Any, + args: Any, + kwargs: Any, + buffer_threshold: int = int(128 * 1e6), + super_span: Optional[Span] = None) -> bytes: """Serialize and pack function and parameters Parameters @@ -55,10 +61,24 @@ def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int Limits buffer to specified size in bytes. Exceeding this limit would give you a warning in the log. Default is 128MB. 
""" + pack_apply_id = str(uuid.uuid4()) + pack_apply_span = Span("PACKAPPLY", pack_apply_id) + if super_span is not None: + span_bind_sub(super_span, pack_apply_span) + + event("SERIALIZE_PACK_APPLY_FUNC", pack_apply_span) b_func = serialize(func, buffer_threshold=buffer_threshold) + + event("SERIALIZE_PACK_APPLY_ARGS", pack_apply_span) b_args = serialize(args, buffer_threshold=buffer_threshold) + + event("SERIALIZE_PACK_APPLY_KWARGS", pack_apply_span) b_kwargs = serialize(kwargs, buffer_threshold=buffer_threshold) + + event("SERIALIZE_PACK_APPLY_PACK_BUFFERS", pack_apply_span) packed_buffer = pack_buffers([b_func, b_args, b_kwargs]) + + event("SERIALIZE_PACK_APPLY_END", pack_apply_span) return packed_buffer diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py index cc69d56186..a763843c9d 100644 --- a/parsl/tests/configs/htex_local_alternate.py +++ b/parsl/tests/configs/htex_local_alternate.py @@ -56,6 +56,7 @@ def fresh_config(): ) ], strategy='simple', + strategy_period=0.5, app_cache=True, checkpoint_mode='task_exit', retries=2, monitoring=MonitoringHub( diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 9f105af25d..cf4281a384 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -2,8 +2,21 @@ from parsl.config import Config from parsl.monitoring import MonitoringHub -config = Config(executors=[ThreadPoolExecutor(label='threads', max_threads=4)], - monitoring=MonitoringHub( + +# BENC: temp class for dev purposes. should test both UDP and filesystem +# radiomodes with local executor. +class TestExecutor(ThreadPoolExecutor): + radio_mode = "filesystem" + + +def fresh_config(): + executor = TestExecutor(label='threads', max_threads=4) + + # BENC: this is to check I'm overriding in subclass properly + assert executor.radio_mode == "filesystem" + + return Config(executors=[executor], + monitoring=MonitoringHub( hub_address="localhost", resource_monitoring_interval=3, ) diff --git a/parsl/tests/configs/workqueue_ex.py b/parsl/tests/configs/workqueue_ex.py index 7e9a696777..5df64d6a5c 100644 --- a/parsl/tests/configs/workqueue_ex.py +++ b/parsl/tests/configs/workqueue_ex.py @@ -3,8 +3,15 @@ from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.http import HTTPInTaskStaging from parsl.executors import WorkQueueExecutor +from parsl.monitoring import MonitoringHub def fresh_config(): return Config(executors=[WorkQueueExecutor(port=9000, - coprocess=True)]) + coprocess=True)], + strategy_period=0.5, + monitoring=MonitoringHub(hub_address="localhost", + hub_port=55055, + monitoring_debug=True, + resource_monitoring_interval=1, + )) diff --git a/parsl/tests/configs/workqueue_monitoring.py b/parsl/tests/configs/workqueue_monitoring.py new file mode 100644 index 0000000000..e3efd6f3a4 --- /dev/null +++ b/parsl/tests/configs/workqueue_monitoring.py @@ -0,0 +1,22 @@ +from parsl.config import Config +from parsl.data_provider.file_noop import NoOpFileStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.executors import WorkQueueExecutor +from parsl.monitoring import MonitoringHub +from parsl.providers import LocalProvider + + +def fresh_config(): + return Config(strategy='simple', + executors=[WorkQueueExecutor(port=9000, coprocess=True, + provider=LocalProvider(init_blocks=0))], + 
monitoring=MonitoringHub(hub_address="localhost",
+                                              hub_port=55055,
+                                              monitoring_debug=True,
+                                              resource_monitoring_interval=1,
+                                              )
+                  )
+
+
+config = fresh_config()
diff --git a/parsl/tests/configs/workqueue_monitoring_config.py b/parsl/tests/configs/workqueue_monitoring_config.py
new file mode 100644
index 0000000000..3292e3b50b
--- /dev/null
+++ b/parsl/tests/configs/workqueue_monitoring_config.py
@@ -0,0 +1,12 @@
+from parsl.tests.configs.workqueue_monitoring import fresh_config
+
+# This is a separate file so that it is only imported when the whole
+# test suite is being run against Work Queue.
+
+# Otherwise, attempting to import the workqueue_monitoring module
+# fails if the work_queue bindings aren't installed.
+
+# There might be a cleaner way to structure this, following how other
+# optional configs are handled.
+
+config = fresh_config()
diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py
index 4f1281025c..8b76d3afc8 100644
--- a/parsl/tests/conftest.py
+++ b/parsl/tests/conftest.py
@@ -23,6 +23,7 @@
 import pytest
 
 import parsl
+import parsl.trace as pt
 from parsl.dataflow.dflow import DataFlowKernelLoader
 from parsl.utils import RepresentationMixin
 
@@ -169,6 +170,14 @@ def pytest_configure(config):
     )
 
 
+@pytest.fixture(autouse=True, scope='session')
+def handle_microtracing():
+    pt.trace_by_logger = True
+    pt.trace_by_dict = True
+    yield
+    pt.output_event_stats()
+
+
 @pytest.fixture(autouse=True, scope='session')
 def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
     """Load a dfk around entire test suite, except in local mode.
diff --git a/parsl/tests/site_tests/test_provider.py b/parsl/tests/site_tests/test_provider.py
index 43a8c592d6..7e8c8e6a15 100644
--- a/parsl/tests/site_tests/test_provider.py
+++ b/parsl/tests/site_tests/test_provider.py
@@ -20,7 +20,7 @@ def platform(sleep=10, stdout=None):
 
 
 @pytest.mark.local
-@pytest.mark.skip("This test cannot run on sites which cannot be identified by site_config_selector")
+@pytest.mark.site
 def test_provider():
     """ Provider scaling
     """
diff --git a/parsl/tests/site_tests/test_site.py b/parsl/tests/site_tests/test_site.py
index 6ce3bc2e43..d5721a289a 100644
--- a/parsl/tests/site_tests/test_site.py
+++ b/parsl/tests/site_tests/test_site.py
@@ -16,7 +16,7 @@ def platform(sleep=10, stdout=None):
 
 
 @pytest.mark.local
-@pytest.mark.skip("The behaviour this test is testing is unclear: there is no guarantee that tasks will go to different nodes")
+@pytest.mark.site
 def test_platform(n=2, sleep_dur=10):
     """ This should sleep to make sure that concurrent apps will go
         to different workers on different nodes.
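The two site tests above move from an unconditional @pytest.mark.skip to a new @pytest.mark.site marker. If that marker is not already declared somewhere, pytest run with --strict-markers will reject it; parsl's test conftest declares its other markers inside pytest_configure, so a declaration along the following lines would be the natural companion change. This is a hypothetical sketch, not something this patch adds:

# Hypothetical sketch (not in this patch): declare the new marker alongside the
# existing addinivalue_line calls in parsl/tests/conftest.py's pytest_configure,
# so that selecting site tests with `-m site` works without unknown-marker warnings.
def pytest_configure(config):
    config.addinivalue_line(
        'markers',
        'site: tests that require a real site/scheduler configuration rather than a local one'
    )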
diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 7def2b736c..5997b5e198 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -2,6 +2,7 @@ import parsl from parsl.app.app import python_app +from parsl.config import Config from parsl.executors import WorkQueueExecutor from parsl.executors.errors import InvalidResourceSpecification from parsl.executors.high_throughput.executor import HighThroughputExecutor @@ -23,7 +24,7 @@ def test_resource(n=2): break # Specify incorrect number of resources - spec = {'cores': 2, 'memory': 1000} + spec = {'cores': 1, 'memory': 1} fut = double(n, parsl_resource_specification=spec) try: fut.result() @@ -32,10 +33,13 @@ def test_resource(n=2): isinstance(executor, HighThroughputExecutor) or isinstance(executor, WorkQueueExecutor) or isinstance(executor, ThreadPoolExecutor)) + else: + pass # TODO: what are we asserting when an exception didn't happen? + # especially wrt taskvine # Specify resources with wrong types # 'cpus' is incorrect, should be 'cores' - spec = {'cpus': 2, 'memory': 1000, 'disk': 1000} + spec = {'cpus': 1, 'memory': 1, 'disk': 1} fut = double(n, parsl_resource_specification=spec) try: fut.result() @@ -44,3 +48,22 @@ def test_resource(n=2): isinstance(executor, HighThroughputExecutor) or isinstance(executor, WorkQueueExecutor) or isinstance(executor, ThreadPoolExecutor)) + else: + pass # TODO: what are we asserting when an exception didn't happen? + # especially wrt taskvine + + +@python_app +def long_delay(parsl_resource_specification={}): + import time + time.sleep(30) + + +@pytest.mark.skip('I need to understand whats happening here better') +@pytest.mark.local +def test_wq_resource_excess(): + c = Config(executors=[WorkQueueExecutor(port=9000, enable_monitoring=True)]) + + parsl.load(c) + f = long_delay(parsl_resource_specification={'memory': 1, 'disk': 1, 'cores': 1}) + assert f.exception() is not None, "This should have failed" diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index 9ffa21df01..507d569f7b 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -12,11 +12,9 @@ @parsl.python_app def this_app(): - # this delay needs to be several times the resource monitoring - # period configured in the test configuration, so that some - # messages are actually sent - there is no guarantee that any - # (non-first) resource message will be sent at all for a short app. - time.sleep(3) + # TODO: deleted this sleep because we will always send a final resource message + # rather than requiring polling to happen - since TODO PR ##### + # time.sleep(3) return 5 @@ -66,6 +64,7 @@ def workqueue_config(): def taskvine_config(): c = Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), worker_launch_method='provider')], + strategy_period=0.5, monitoring=MonitoringHub(hub_address="localhost", resource_monitoring_interval=1)) @@ -88,8 +87,12 @@ def test_row_counts(tmpd_cwd, fresh_config): config.run_dir = tmpd_cwd config.monitoring.logging_endpoint = db_url + print(f"load {time.time()}") with parsl.load(config): + print(f"start {time.time()}") assert this_app().result() == 5 + print(f"end {time.time()}") + print(f"unload {time.time()}") # at this point, we should find one row in the monitoring database. 
diff --git a/parsl/tests/test_monitoring/test_mon_local/__init__.py b/parsl/tests/test_monitoring/test_mon_local/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/tests/test_monitoring/test_mon_local/test_basic.py b/parsl/tests/test_monitoring/test_mon_local/test_basic.py new file mode 100644 index 0000000000..d51c46bd0c --- /dev/null +++ b/parsl/tests/test_monitoring/test_mon_local/test_basic.py @@ -0,0 +1,96 @@ +import logging +import os +import time + +import pytest +from sqlalchemy import text + +import parsl + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def this_app(): + # this delay needs to be several times the resource monitoring + # period configured in the test configuration, so that some + # messages are actually sent - there is no guarantee that any + # (non-first) resource message will be sent at all for a short app. + time.sleep(3) + + return 5 + + +@pytest.mark.local +def test_row_counts(): + # this is imported here rather than at module level because + # it isn't available in a plain parsl install, so this module + # would otherwise fail to import and break even a basic test + # run. + import sqlalchemy + + from parsl.tests.configs.local_threads_monitoring import fresh_config + + if os.path.exists("runinfo/monitoring.db"): + logger.info("Monitoring database already exists - deleting") + os.remove("runinfo/monitoring.db") + + logger.info("Generating fresh config") + c = fresh_config() + logger.info("Loading parsl") + parsl.load(c) + + logger.info("invoking and waiting for result") + assert this_app().result() == 5 + + logger.info("cleaning up parsl") + parsl.dfk().cleanup() + parsl.clear() + + # at this point, we should find one row in the monitoring database. + + logger.info("checking database content") + engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") + with engine.begin() as connection: + + result = connection.execute(text("SELECT COUNT(*) FROM workflow")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM task")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM try")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM status, try " + "WHERE status.task_id = try.task_id " + "AND status.task_status_name='exec_done' " + "AND task_try_time_running is NULL")) + (c, ) = result.first() + assert c == 0 + + # DIFF WRT ORIGINAL TEST: there is no concept of 'node' in local thread execution + # result = connection.execute(text("SELECT COUNT(*) FROM node")) + # (c, ) = result.first() + # assert c == 2 + + # DIFF WRT ORIGINAL TEST: there is no concept of block in local thread execution + # There should be one block polling status + # local provider has a status_polling_interval of 5s + # result = connection.execute(text("SELECT COUNT(*) FROM block")) + # (c, ) = result.first() + # assert c >= 2 + + # DIFF WRT ORIGINAL TEST: there is no resource monitoring with local thread executor + # result = connection.execute(text("SELECT COUNT(*) FROM resource")) + # (c, ) = result.first() + # assert c >= 1 + + logger.info("all done") + + +if __name__ == "__main__": + test_row_counts() diff --git a/parsl/tests/test_monitoring/test_mon_local/test_db_locks.py b/parsl/tests/test_monitoring/test_mon_local/test_db_locks.py new file mode 100644 index 0000000000..a630f24213 --- /dev/null +++ b/parsl/tests/test_monitoring/test_mon_local/test_db_locks.py @@ -0,0 +1,92 @@ + +import logging 
+import os +import time + +import pytest +from sqlalchemy import text + +import parsl + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def this_app(): + return 5 + + +@pytest.mark.local +def test_row_counts(): + import sqlalchemy + + from parsl.tests.configs.htex_local_alternate import fresh_config + if os.path.exists("runinfo/monitoring.db"): + logger.info("Monitoring database already exists - deleting") + os.remove("runinfo/monitoring.db") + + engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") + + logger.info("loading parsl") + parsl.load(fresh_config()) + + # parsl.load() returns before all initialisation of monitoring + # is complete, which means it isn't safe to take a read lock on + # the database yet. This delay tries to work around that - some + # better async behaviour might be nice, but I'm not sure what. + time.sleep(10) + + # to get an sqlite3 read lock that is held over a controllable + # long time, create a transaction and perform a SELECT in it. + # (see bottom of https://sqlite.org/lockingv3.html) + + # there's an awkward race here: parsl.load() returns before the + # database might have been created, and so then the db manager will + # crash (and if there is a retry loop there instead, I think it will + # hang until after the read lock stuff below is finished? which might + # be acceptable? if it's meant to be properly async and not blocking?) + # ... in which case, initialise parsl *after taking the lock* would also + # work (although the select statement to get that lock wouldn't be the same + # because it wouldn't be able to select from the right table) + + logger.info("Getting a read lock on the monitoring database") + with engine.begin() as readlock_connection: + readlock_connection.execute(text("BEGIN TRANSACTION")) + result = readlock_connection.execute(text("SELECT COUNT(*) FROM workflow")) + (c, ) = result.first() + assert c == 1 + # now readlock_connection should have a read lock that will + # stay locked until the transaction is ended, or the with + # block ends. + + logger.info("invoking and waiting for result") + assert this_app().result() == 5 + + # there is going to be some raciness here making sure that + # the database manager actually tries to write while the + # read lock is held. I'm not sure if there is a better way + # to detect this other than a hopefully long-enough sleep. + time.sleep(10) + + logger.info("cleaning up parsl") + parsl.dfk().cleanup() + parsl.clear() + + # at this point, we should find one row in the monitoring database. 
+ + logger.info("checking database content") + with engine.begin() as connection: + + result = connection.execute(text("SELECT COUNT(*) FROM workflow")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM task")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM try")) + (c, ) = result.first() + assert c == 1 + + logger.info("all done") diff --git a/parsl/tests/test_monitoring/test_mon_local/test_memoization_representation.py b/parsl/tests/test_monitoring/test_mon_local/test_memoization_representation.py new file mode 100644 index 0000000000..2ba2810bc0 --- /dev/null +++ b/parsl/tests/test_monitoring/test_mon_local/test_memoization_representation.py @@ -0,0 +1,84 @@ + +import logging +import os + +import pytest +from sqlalchemy import text + +import parsl + +logger = logging.getLogger(__name__) + + +@parsl.python_app(cache=True) +def this_app(x): + return x + 1 + + +@pytest.mark.local +def test_hashsum(): + import sqlalchemy + + from parsl.tests.configs.local_threads_monitoring import fresh_config + + if os.path.exists("runinfo/monitoring.db"): + logger.info("Monitoring database already exists - deleting") + os.remove("runinfo/monitoring.db") + + logger.info("loading parsl") + parsl.load(fresh_config()) + + logger.info("invoking and waiting for result (1/4)") + f1 = this_app(4) + assert f1.result() == 5 + + logger.info("invoking and waiting for result (2/4)") + f2 = this_app(17) + assert f2.result() == 18 + + logger.info("invoking and waiting for result (3/4)") + f3 = this_app(4) + assert f3.result() == 5 + + logger.info("invoking and waiting for result (4/4)") + f4 = this_app(4) + assert f4.result() == 5 + + assert f1.task_record['hashsum'] == f3.task_record['hashsum'] + assert f1.task_record['hashsum'] == f4.task_record['hashsum'] + assert f1.task_record['hashsum'] != f2.task_record['hashsum'] + + logger.info("cleaning up parsl") + parsl.dfk().cleanup() + parsl.clear() + + # at this point, we should find one row in the monitoring database. 
+ + logger.info("checking database content") + engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") + with engine.begin() as connection: + + # we should have three tasks, but with only two tries, because the + # memo try should be missing + result = connection.execute(text("SELECT COUNT(*) FROM task")) + (task_count, ) = result.first() + assert task_count == 4 + + # this will check that the number of task rows for each hashsum matches the above app invocations + result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_record['hashsum']}'")) + (hashsum_count, ) = result.first() + assert hashsum_count == 3 + + result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_record['hashsum']}'")) + (hashsum_count, ) = result.first() + assert hashsum_count == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='exec_done'")) + (memo_count, ) = result.first() + assert memo_count == 2 + + result = connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='memo_done'")) + (memo_count, ) = result.first() + assert memo_count == 2 + + logger.info("all done") diff --git a/parsl/tests/test_monitoring/test_mon_wq/__init__.py b/parsl/tests/test_monitoring/test_mon_wq/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/tests/test_monitoring/test_mon_wq/test_basic.py b/parsl/tests/test_monitoring/test_mon_wq/test_basic.py new file mode 100644 index 0000000000..d525233550 --- /dev/null +++ b/parsl/tests/test_monitoring/test_mon_wq/test_basic.py @@ -0,0 +1,107 @@ +import logging +import os +import time + +import pytest +from sqlalchemy import text + +import parsl + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def this_app(): + # this delay needs to be several times the resource monitoring + # period configured in the test configuration, so that some + # messages are actually sent - there is no guarantee that any + # (non-first) resource message will be sent at all for a short app. + time.sleep(3) + + return 5 + + +@pytest.mark.local +def test_row_counts(): + # this is imported here rather than at module level because + # it isn't available in a plain parsl install, so this module + # would otherwise fail to import and break even a basic test + # run. + import sqlalchemy + + from parsl.tests.configs.workqueue_monitoring import fresh_config + + if os.path.exists("runinfo/monitoring.db"): + logger.info("Monitoring database already exists - deleting") + os.remove("runinfo/monitoring.db") + + logger.info("Generating fresh config") + c = fresh_config() + logger.info("Loading parsl") + parsl.load(c) + + logger.info("invoking and waiting for result") + assert this_app().result() == 5 + + logger.info("cleaning up parsl") + parsl.dfk().cleanup() + parsl.clear() + + # at this point, we should find one row in the monitoring database. 
+ + logger.info("checking database content") + engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") + with engine.begin() as connection: + + result = connection.execute(text("SELECT COUNT(*) FROM workflow")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM task")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM try")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM status, try " + "WHERE status.task_id = try.task_id " + "AND status.task_status_name='exec_done' " + "AND task_try_time_running is NULL")) + (c, ) = result.first() + assert c == 0 + + # workqueue doesn't populate the node table. + # because parsl level code isn't running on a node persistently + # instead, it is the workqueue worker doing that, which doesn't + # report into parsl monitoring. + # this is a feature downgrade from using htex that needs some + # consideration + + # Two entries: one showing manager active, one inactive + # result = connection.execute("SELECT COUNT(*) FROM node") + # (c, ) = result.first() + # assert c == 2 + + # workqueue, at least when using providers, does have a loose + # block concept: but it doesn't report anything into the block + # table here, and if using wq external scaling thing, then there + # wouldn't be parsl level blocks at all. + # This needs some consideration. + + # There should be one block polling status + # local provider has a status_polling_interval of 5s + # result = connection.execute("SELECT COUNT(*) FROM block") + # (c, ) = result.first() + # assert c >= 2 + + result = connection.execute(text("SELECT COUNT(*) FROM resource")) + (c, ) = result.first() + assert c >= 1 + + logger.info("all done") + + +if __name__ == "__main__": + test_row_counts() diff --git a/parsl/tests/test_monitoring/test_mon_wq/test_db_locks.py b/parsl/tests/test_monitoring/test_mon_wq/test_db_locks.py new file mode 100644 index 0000000000..e82dc86eaf --- /dev/null +++ b/parsl/tests/test_monitoring/test_mon_wq/test_db_locks.py @@ -0,0 +1,93 @@ + +import logging +import os +import time + +import pytest +from sqlalchemy import text + +import parsl + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def this_app(): + return 5 + + +@pytest.mark.local +def test_row_counts(): + import sqlalchemy + + from parsl.tests.configs.workqueue_monitoring import fresh_config + + if os.path.exists("runinfo/monitoring.db"): + logger.info("Monitoring database already exists - deleting") + os.remove("runinfo/monitoring.db") + + engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") + + logger.info("loading parsl") + parsl.load(fresh_config()) + + # parsl.load() returns before all initialisation of monitoring + # is complete, which means it isn't safe to take a read lock on + # the database yet. This delay tries to work around that - some + # better async behaviour might be nice, but I'm not sure what. + time.sleep(10) + + # to get an sqlite3 read lock that is held over a controllable + # long time, create a transaction and perform a SELECT in it. + # (see bottom of https://sqlite.org/lockingv3.html) + + # there's an awkward race here: parsl.load() returns before the + # database might have been created, and so then the db manager will + # crash (and if there is a retry loop there instead, I think it will + # hang until after the read lock stuff below is finished? which might + # be acceptable? 
if it's meant to be properly async and not blocking?) + # ... in which case, initialise parsl *after taking the lock* would also + # work (although the select statement to get that lock wouldn't be the same + # because it wouldn't be able to select from the right table) + + logger.info("Getting a read lock on the monitoring database") + with engine.begin() as readlock_connection: + readlock_connection.execute(text("BEGIN TRANSACTION")) + result = readlock_connection.execute(text("SELECT COUNT(*) FROM workflow")) + (c, ) = result.first() + assert c == 1 + # now readlock_connection should have a read lock that will + # stay locked until the transaction is ended, or the with + # block ends. + + logger.info("invoking and waiting for result") + assert this_app().result() == 5 + + # there is going to be some raciness here making sure that + # the database manager actually tries to write while the + # read lock is held. I'm not sure if there is a better way + # to detect this other than a hopefully long-enough sleep. + time.sleep(10) + + logger.info("cleaning up parsl") + parsl.dfk().cleanup() + parsl.clear() + + # at this point, we should find one row in the monitoring database. + + logger.info("checking database content") + with engine.begin() as connection: + + result = connection.execute(text("SELECT COUNT(*) FROM workflow")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM task")) + (c, ) = result.first() + assert c == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM try")) + (c, ) = result.first() + assert c == 1 + + logger.info("all done") diff --git a/parsl/tests/test_monitoring/test_mon_wq/test_memoization_representation.py b/parsl/tests/test_monitoring/test_mon_wq/test_memoization_representation.py new file mode 100644 index 0000000000..ee8905e4be --- /dev/null +++ b/parsl/tests/test_monitoring/test_mon_wq/test_memoization_representation.py @@ -0,0 +1,84 @@ + +import logging +import os + +import pytest +from sqlalchemy import text + +import parsl + +logger = logging.getLogger(__name__) + + +@parsl.python_app(cache=True) +def this_app(x): + return x + 1 + + +@pytest.mark.local +def test_hashsum(): + import sqlalchemy + + from parsl.tests.configs.workqueue_monitoring import fresh_config + + if os.path.exists("runinfo/monitoring.db"): + logger.info("Monitoring database already exists - deleting") + os.remove("runinfo/monitoring.db") + + logger.info("loading parsl") + parsl.load(fresh_config()) + + logger.info("invoking and waiting for result (1/4)") + f1 = this_app(4) + assert f1.result() == 5 + + logger.info("invoking and waiting for result (2/4)") + f2 = this_app(17) + assert f2.result() == 18 + + logger.info("invoking and waiting for result (3/4)") + f3 = this_app(4) + assert f3.result() == 5 + + logger.info("invoking and waiting for result (4/4)") + f4 = this_app(4) + assert f4.result() == 5 + + assert f1.task_record['hashsum'] == f3.task_record['hashsum'] + assert f1.task_record['hashsum'] == f4.task_record['hashsum'] + assert f1.task_record['hashsum'] != f2.task_record['hashsum'] + + logger.info("cleaning up parsl") + parsl.dfk().cleanup() + parsl.clear() + + # at this point, we should find one row in the monitoring database. 
+ + logger.info("checking database content") + engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") + with engine.begin() as connection: + + # we should have three tasks, but with only two tries, because the + # memo try should be missing + result = connection.execute(text("SELECT COUNT(*) FROM task")) + (task_count, ) = result.first() + assert task_count == 4 + + # this will check that the number of task rows for each hashsum matches the above app invocations + result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_record['hashsum']}'")) + (hashsum_count, ) = result.first() + assert hashsum_count == 3 + + result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_record['hashsum']}'")) + (hashsum_count, ) = result.first() + assert hashsum_count == 1 + + result = connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='exec_done'")) + (memo_count, ) = result.first() + assert memo_count == 2 + + result = connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='memo_done'")) + (memo_count, ) = result.first() + assert memo_count == 2 + + logger.info("all done") diff --git a/parsl/tests/test_python_apps/test_executor_selector.py b/parsl/tests/test_python_apps/test_executor_selector.py new file mode 100644 index 0000000000..50974d7b50 --- /dev/null +++ b/parsl/tests/test_python_apps/test_executor_selector.py @@ -0,0 +1,45 @@ +import pytest + +import parsl +from parsl.tests.configs.htex_local import fresh_config as local_config + + +@parsl.python_app(executors=['htex_local']) +def app_executor_list(): + return 7 + + +@pytest.mark.local +def test_executor_list() -> None: + assert app_executor_list().result() == 7 + + +@parsl.python_app(executors='htex_local') +def app_executor_str(): + return 8 + + +@pytest.mark.local +def test_executor_str() -> None: + assert app_executor_str().result() == 8 + + +@parsl.python_app(executors='XXXX_BAD_EXECUTOR') +def app_executor_invalid(): + return 9 + + +@pytest.mark.local +def test_executor_invalid() -> None: + with pytest.raises(ValueError): + app_executor_invalid().result() + + +@parsl.python_app(executors='all') +def app_executor_all(): + return 10 + + +@pytest.mark.local +def test_executor_all() -> None: + assert app_executor_all().result() == 10 diff --git a/parsl/tests/test_scaling/test_scale_down.py b/parsl/tests/test_scaling/test_scale_down.py index a53630374f..e50667dd78 100644 --- a/parsl/tests/test_scaling/test_scale_down.py +++ b/parsl/tests/test_scaling/test_scale_down.py @@ -37,6 +37,7 @@ def local_config(): ], max_idletime=0.5, strategy='simple', + strategy_period=0.5, ) diff --git a/parsl/tests/test_shutdown/test_kill_htex.py b/parsl/tests/test_shutdown/test_kill_htex.py new file mode 100644 index 0000000000..d99e4c12c9 --- /dev/null +++ b/parsl/tests/test_shutdown/test_kill_htex.py @@ -0,0 +1,89 @@ +import logging +import os +import signal +import time + +import pytest + +import parsl + +logger = logging.getLogger(__name__) + +# TODO: +# should parametrically test both htex_local and htex_local_alternate +from parsl.tests.configs.htex_local import fresh_config + + +@parsl.python_app +def simple_app(): + return True + + +# @pytest.mark.local +@pytest.mark.skip("not expected to pass - demonstrates hanging htex with missing interchange - issue 2755") +@pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]) # are we expecting SIGKILL resilience here? 
Ideally yes +def test_kill_interchange(sig): + """This tests that we can kill the interchange process (in different ways) and still have successful shutdown. + One of these signals emulates behaviour when ctrl-C is pressed: that all of the processes receive a + termination signal - SIGINT for ctrl-C - at once, and so specifically we should be + tolerant to interchange processes going away. + + see issue https://github.com/Parsl/parsl/issues/2755 + """ + + # what is the time limit for the interchange shutting down? + expected_shutdown_time = 60 + + logger.info("Initialising parsl") + parsl.load(fresh_config()) + logger.info("Initialised parsl") + + dfk = parsl.dfk() + + assert "htex_local" in dfk.executors.keys(), "htex required" + + proc = dfk.executors["htex_local"].interchange_proc + + assert proc is not None, "Interchange process required" + assert proc.is_alive(), "Interchange must be alive" + + pid = proc.pid + assert pid is not None, "Interchange must have a pid" + + time.sleep(5) + logger.info(f"Sending {sig} to interchange pid {pid} - 1") + os.kill(pid, sig) + time.sleep(5) + logger.info(f"Sending {sig} to interchange pid {pid} - 2") + os.kill(pid, sig) + time.sleep(5) + logger.info(f"Sending {sig} to interchange pid {pid} - 3") + os.kill(pid, sig) + + logger.info("Waiting for interchange process to die, or timeout") + start_time = time.time() + while proc.is_alive() and start_time + expected_shutdown_time > time.time(): + logger.info("Wait loop") + time.sleep(1) + + assert not proc.is_alive(), "Interchange process must have died for test to continue" + + # now we have broken one piece of the monitoring system + # let's run some apps that should generate some monitoring traffic + + logger.info("Invoking simple app") + f = simple_app() + + logger.info("Invoked simple app, waiting for result") + + r = f.exception() + + assert isinstance(r, Exception), "simple app should have raised an exception" + + logger.info("Got simple app result") + + logger.info("Calling cleanup") + parsl.dfk().cleanup() + logger.info("Finished cleanup") + + parsl.clear() diff --git a/parsl/trace.py b/parsl/trace.py new file mode 100644 index 0000000000..5410971d53 --- /dev/null +++ b/parsl/trace.py @@ -0,0 +1,90 @@ +import logging +import pickle +import time +from typing import Any, List, Tuple + +logger = logging.getLogger(__name__) + +trace_by_logger = False +trace_by_dict = True + +events: List[Tuple[float, str, str, Any]] = [] +binds: List[Tuple[str, Any, str, Any]] = [] + + +# the spantype/id will only have uniqueness in the context of an +# enclosing span - but processors of events won't necessarily be +# representing all that uniqueness: for example a log line might +# only talk about TASK 3 even though there can be many task 3s, +# one for each DFK in this process, or many BLOCK 0s, one for each +# scalable executor in each DFK in this process. + +class Span: + def __init__(self, spantype: str, spanid: Any): + self.spantype = spantype + self.spanid = spanid + + +def event(name: str, span: Span): + """Record an event. + Using Any for spanid means anything that we can write out in format string + most concretely a string or an int, but I'm not sure it should be + concretely tied to only those two types. 
+ """ + t = time.time() + + if trace_by_logger: + # human readable + logger.info(f"Event {name} on {span.spantype} {span.spanid}") + + # machine readable (ideally this format would be very unambiguous about span identities) + logger.info(f"EVENT {name} {span.spantype} {span.spanid} {span}") + + if trace_by_dict: + e = (t, name, span.spantype, span.spanid) + events.append(e) + + +def span_bind_sub(super: Span, sub: Span): + if trace_by_logger: + logger.info(f"BIND {super.spantype} {super.spanid} {sub.spantype} {sub.spanid}") + if trace_by_dict: + b = (super.spantype, super.spanid, sub.spantype, sub.spanid) + binds.append(b) + + +def output_event_stats(directory="."): + # TODO: print PID here to help untangle what's happening across + # forks: I can imagine that being complicated as a partially + # completed trace buffer is inherited from a parent. + print("Event stats") + print("===========") + print(f"Count of events: {len(events)}") + print(f"Count of binds: {len(binds)}") + + """ + flats = [] + all_tasks_t = 0 + for ((from_k, to_k), (total, count, raw)) in event_stats.items(): + mean = total / count + dts = [t - last_t for (last_t, t) in raw] + t_median = statistics.median(dts) + t_max = max(dts) + t_min = min(dts) + + flat = (mean, t_median, t_max, t_min, from_k, to_k, total, count) + flats.append(flat) + + all_tasks_t += total + + flats.sort() + + for (t_mean, t_median, t_max, t_min, from_k, to_k, total, count) in flats: + print(f"{from_k} -> {to_k} ({count} iters): min {t_min} / median {t_median} / mean {t_mean} / max {t_max}") + + print("===========") + print(f"Total real time accounted for here: {all_tasks_t} sec") + """ + summary = {"events": events, "binds": binds} + with open(f"{directory}/parsl_tracing.pickle", "wb") as f: + pickle.dump(summary, f) diff --git a/parsl/version.py b/parsl/version.py index 0ba4a3c991..91f97c45d3 100644 --- a/parsl/version.py +++ b/parsl/version.py @@ -1,3 +1,3 @@ """Set module version. """ -VERSION = '1.3.0-dev' +VERSION = '2025.02.24+desc-2025.03.03a' diff --git a/setup.py b/setup.py index 3e9a7f73fb..7e73b937ea 100755 --- a/setup.py +++ b/setup.py @@ -70,6 +70,7 @@ 'parsl/executors/high_throughput/interchange.py', 'parsl/executors/workqueue/exec_parsl_function.py', 'parsl/executors/workqueue/parsl_coprocess.py', + 'parsl/monitoring/allprocs.py', ], extras_require=extras_require,
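For reference, a small sketch of reading back the parsl_tracing.pickle file that output_event_stats writes at the end of a test session. The tuple layouts come from parsl/trace.py above: events are (time, event name, span type, span id) and binds are (super type, super id, sub type, sub id). The grouping and printing here are purely illustrative and assume the pickle sits in the current directory:

# Illustrative reader for the trace dump produced by parsl.trace.output_event_stats().
import pickle
from collections import defaultdict

with open("parsl_tracing.pickle", "rb") as f:
    summary = pickle.load(f)

events = summary["events"]   # list of (timestamp, event_name, span_type, span_id)
binds = summary["binds"]     # list of (super_type, super_id, sub_type, sub_id)

# Group events by span so each span's timeline can be read in time order.
timelines = defaultdict(list)
for timestamp, name, span_type, span_id in events:
    timelines[(span_type, span_id)].append((timestamp, name))

for (span_type, span_id), timeline in sorted(timelines.items(), key=lambda kv: (kv[0][0], str(kv[0][1]))):
    timeline.sort()
    start = timeline[0][0]
    print(f"{span_type} {span_id}:")
    for timestamp, name in timeline:
        print(f"  +{timestamp - start:.3f}s {name}")

# Parent/child relationships recorded with span_bind_sub().
for super_type, super_id, sub_type, sub_id in binds:
    print(f"{super_type} {super_id} -> {sub_type} {sub_id}")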