diff --git a/python/mtap/_event.py b/python/mtap/_event.py
index 4ce700a0..85cb8a68 100644
--- a/python/mtap/_event.py
+++ b/python/mtap/_event.py
@@ -1,4 +1,4 @@
-# Copyright 2019 Regents of the University of Minnesota.
+# Copyright (c) Regents of the University of Minnesota.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -71,7 +71,7 @@ class Event(ContextManager['Event']):
     """
     __slots__ = ('_event_id', '_client', '_lock', 'label_adapters',
                  '_documents_cache', '_metadata_cache', '_binaries_cache',
-                 '_event_service_instance_id')
+                 '_event_service_instance_id', '_leased')
 
     label_adapters: Mapping[str, 'ProtoLabelAdapter']
 
@@ -81,6 +81,7 @@ def __init__(
             only_create_new: bool = False,
             label_adapters: Optional[Mapping[str, 'ProtoLabelAdapter']] = None,
            event_service_instance_id: Optional[str] = None,
+            lease: bool = True
     ):
         self._event_id = event_id or str(uuid.uuid4())
         self._event_service_instance_id = event_service_instance_id
@@ -89,7 +90,8 @@ def __init__(
         else:
             self.label_adapters = {}
         self._client = client
-        if self._client is not None:
+        self._leased = lease
+        if self._client is not None and self._leased:
            self._event_service_instance_id = self._client.open_event(
                event_service_instance_id,
                self._event_id,
@@ -142,14 +144,17 @@ def binaries(self) -> MutableMapping[str, bytes]:
     def created_indices(self) -> Dict[str, List[str]]:
         """A mapping of document names to a list of the names of all the
         label indices that have been added to that document"""
-        return {document_name: document.created_indices
-                for document_name, document in self.documents.items()}
+        try:
+            return {document_name: document.created_indices
+                    for document_name, document in self._documents_cache.data.items()}
+        except AttributeError:
+            return {}
 
     def close(self):
         """Closes this event. Lets the event service know that we are done
         with the event, allowing to clean up the event if no other clients
         have open leases to it."""
-        if self.client is not None:
+        if self.client is not None and self._leased:
             self.release_lease()
 
     def create_document(self,
@@ -205,7 +210,8 @@ def add_document(self, document: Document):
         cast(_Documents, self.documents).data[name] = document
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close()
+        if exc_type is not KeyboardInterrupt:
+            self.close()
 
     def add_created_indices(self, created_indices):
         for k, v in created_indices.items():
diff --git a/python/mtap/_events_client_grpc.py b/python/mtap/_events_client_grpc.py
index a113b8a1..0efda2a1 100644
--- a/python/mtap/_events_client_grpc.py
+++ b/python/mtap/_events_client_grpc.py
@@ -1,4 +1,4 @@
-# Copyright 2019 Regents of the University of Minnesota.
+# Copyright (c) Regents of the University of Minnesota.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
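Note on the new lease flag in _event.py above: with a client attached, an Event now only opens a lease against the events service when lease=True (the default), and close() only releases a lease it actually holds. A minimal usage sketch, assuming a running events service; the address and event id are hypothetical, and events_client is imported the same way _runners.py below imports it:

    from mtap import Event
    from mtap._events_client import events_client

    client = events_client('localhost:50100')  # hypothetical address
    try:
        # Default lease=True: opens a lease; close() releases it.
        with Event(event_id='e1', client=client) as event:
            pass
        # lease=False: attach to an event whose lease is held elsewhere
        # (e.g. by the pipeline); closing this Event leaves that lease alone.
        unleased = Event(event_id='e1', client=client, lease=False)
        unleased.close()  # releases nothing because no lease was opened
    finally:
        client.close()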
@@ -96,11 +96,6 @@ def open_event(self, instance_id, event_id, only_create_new):
         self.stub.OpenEvent(request)
         return self.instance_id
 
-    def _conn_error(self):
-        return ConnectionError(
-            f"Failed to connect to events service on address: "
-            f"{self._address}")
-
     def close_event(self, instance_id, event_id):
         self._check_instance_id(instance_id)
 
diff --git a/python/mtap/deployment.py b/python/mtap/deployment.py
index 25c6d053..1ace18b3 100644
--- a/python/mtap/deployment.py
+++ b/python/mtap/deployment.py
@@ -362,6 +362,15 @@ class ProcessorDeployment:
     startup_timeout: Optional[float] = None
     """Optional override startup timeout."""
 
+    mp: Optional[bool] = None
+    """Whether to use the multiprocessing pool based processor server."""
+
+    mp_processes: Optional[int] = None
+    """If using the multiprocessing host, the number of worker processes."""
+
+    mp_start_method: Optional[str] = None
+    """A multiprocessing.get_context start method to use."""
+
     @staticmethod
     def from_dict(conf: Dict) -> 'ProcessorDeployment':
         """Creates an MTAP processor deployment configuration from a
@@ -423,6 +432,13 @@ def _create_processor_call(depl, global_settings, port, service_deployment,
         call.extend(['--events', ','.join(events_addresses)])
     if depl.additional_args is not None:
         call.extend(depl.additional_args)
+
+    if depl.mp:
+        call.extend(['--mp'])
+    if depl.mp_processes is not None:
+        call.extend(['--mp-processes', str(depl.mp_processes)])
+    if depl.mp_start_method is not None:
+        call.extend(['--mp-start-method', depl.mp_start_method])
     if shared_config.additional_args is not None:
         call.extend(shared_config.additional_args)
     if (depl.implementation == 'java'
diff --git a/python/mtap/events_server.py b/python/mtap/events_server.py
index ecb22bd1..7aadeafd 100644
--- a/python/mtap/events_server.py
+++ b/python/mtap/events_server.py
@@ -63,7 +63,7 @@ def __init__(self, host: str,
         if config is None:
             config = _config.Config()
         options = config.get('grpc.events_options', {})
-        logger.info("Events service using options " + str(options))
+        logger.debug("Events service using options " + str(options))
         server = grpc.server(thread_pool, options=list(options.items()))
         servicer = EventsServicer(instance_id=self.sid)
         events_pb2_grpc.add_EventsServicer_to_server(servicer, server)
diff --git a/python/mtap/pipeline/_error_handling.py b/python/mtap/pipeline/_error_handling.py
index dab60ee9..c69166c1 100644
--- a/python/mtap/pipeline/_error_handling.py
+++ b/python/mtap/pipeline/_error_handling.py
@@ -1,4 +1,4 @@
-# Copyright 2023 Regents of the University of Minnesota.
+# Copyright (c) Regents of the University of Minnesota.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@
     :class:`logging.Logger` object (name ``logging``).
 * :class:`ErrorsDirectoryErrorHandler` which for all errors that occur
     writes a set of files to disk containing the serialized event, the stack
-    trace, and the error information (name ``directory``).
+    trace, and the error information (name ``to_directory``).
 * :class:`SuppressAllErrorsHandler` which suppresses all errors (name
     ``suppress``).
 
@@ -43,11 +43,11 @@
 An example of error_handlers configuration in the pipeline file is shown
 below.
 
 .. code-block:: text
 
   error_handlers:
   - name: logging
-  - name: directory
+  - name: to_directory
     params:
       output_directory: '/path/to/'
   - name: termination
@@ -70,9 +70,10 @@
     Dict,
     Optional,
     Union,
-    ClassVar, Type
+    ClassVar, Type, Tuple, Iterable
 )
 
+import mtap
 from mtap._event import Event
 from mtap.processing import ErrorInfo
 from mtap.serialization import Serializer
@@ -82,6 +83,16 @@
 ErrorHandlerFactory = Callable[..., 'ProcessingErrorHandler']
 
 
+def handle_error(error_handlers: Iterable[Tuple['ProcessingErrorHandler', Dict]],
+                 event: mtap.Event,
+                 error: ErrorInfo):
+    for handler, state in error_handlers:
+        try:
+            handler.handle_error(event, error, state)
+        except SuppressError:
+            break
+
+
 class StopProcessing(Exception):
     """Thrown by error handlers when the pipeline should immediately
     terminate."""
@@ -179,14 +190,19 @@ def name(cls):
 
     def handle_error(self, event, error_info, state):
         from mtap.processing import ErrorOrigin
+        if error_info.error_repr == "ValueError('Cannot invoke RPC on closed channel!')":
+            return
         if error_info.origin == ErrorOrigin.REMOTE:
             print(
                 f"An error occurred while processing an event with id "
                 f"'{event.event_id}' through the remote component "
                 f"'{error_info.component_id}' at address "
-                f"'{error_info.address}': {error_info.error_repr}\n"
-                f"It had the following message: {error_info.localized_msg}\n"
-                f"{self.more_help}")
+                f"'{error_info.address}': {error_info.grpc_code}")
+            if error_info.localized_msg:
+                print(
+                    f"It had the following message: {error_info.localized_msg}"
+                )
+            print(f"{self.more_help}", flush=True)
         else:
             print(
                 f"An error occurred while processing an event with id "
@@ -219,7 +235,7 @@ def handle_error(self, _1, _2, state):
             state['failures'] += 1
         except KeyError:
             state['failures'] = 1
-        if state['failures'] >= self.max_failures:
+        if state['failures'] > self.max_failures:
             print(f"Pipeline exceeded the maximum number "
                   f"of allowed failures ({self.max_failures}) "
                   f"and is terminating.")
@@ -237,26 +253,35 @@ class LoggingErrorHandler(ProcessingErrorHandler):
 
         logger: The logger to use.
+        level: The name of the logging method to use, e.g. 'error'.
""" - logger: logging.Logger - - def __init__(self, logger: Union[str, logging.Logger, None] = None): + def __init__(self, logger: Union[str, logging.Logger, None] = None, level='error'): if isinstance(logger, str): logger = logging.getLogger(logger) - self.logger = (logger or logging.getLogger('mtap.processing')) + self.logger = getattr(logger or logging.getLogger('mtap.processing'), level) @classmethod def name(cls): return 'logging' def handle_error(self, event, error_info, state): - self.logger.error( - f"An error occurred while processing an event with id " - f"'{event.event_id}' through the remote component " - f"'{error_info.component_id}' at address " - f"'{error_info.address}': {error_info.error_repr}\n" - f"It had the following message: {error_info.localized_msg}\n" - + ''.join(error_info.stack_trace) - ) + from mtap.processing import ErrorOrigin + if error_info.origin == ErrorOrigin.REMOTE: + msg = (f"An error occurred while processing an event with id " + f"'{event.event_id}' through the remote component " + f"'{error_info.component_id}' at address " + f"'{error_info.address}': {error_info.error_repr}") + if error_info.localized_msg: + msg += f"It had the following message: {error_info.localized_msg}" + msg += ''.join(error_info.stack_trace) + self.logger(msg) + else: + self.logger( + f"An error occurred while processing an event with id " + f"'{event.event_id}' through the component " + f"'{error_info.component_id}': '{error_info.error_repr}'\n" + f"It had the following message: {error_info.localized_msg}\n" + + ''.join(error_info.stack_trace) + ) class ErrorsDirectoryErrorHandler(ProcessingErrorHandler): diff --git a/python/mtap/pipeline/_hosting.py b/python/mtap/pipeline/_hosting.py index 91bf54a5..0920b5c3 100644 --- a/python/mtap/pipeline/_hosting.py +++ b/python/mtap/pipeline/_hosting.py @@ -16,7 +16,6 @@ import argparse import logging import threading -import traceback import uuid from argparse import ArgumentParser from concurrent.futures import thread @@ -172,13 +171,11 @@ def Process(self, request, context): with dict_to_event(event_dict, client=self.events) as event: params = {} copy_struct_to_dict(request.params, params) - self.pool.wait_for_capacity() res = self.pool.start_task(event, params) _, result, error = res.get() if error is not None: exc = ProcessingException(error) - logger.error(str(exc)) - logger.error(traceback.print_exception(exc)) + logger.error(error) context.abort_with_status(rpc_status.to_status(exc.to_rpc_status())) return response = pipeline_pb2.ProcessEventInPipelineResponse() diff --git a/python/mtap/pipeline/_mp_pipeline.py b/python/mtap/pipeline/_mp_pipeline.py index 73552f6e..f7f77912 100644 --- a/python/mtap/pipeline/_mp_pipeline.py +++ b/python/mtap/pipeline/_mp_pipeline.py @@ -15,11 +15,11 @@ import logging import multiprocessing import signal -import traceback +import sys from contextlib import ExitStack from logging.handlers import QueueListener from queue import Queue, Empty -from threading import Condition, Lock +from threading import BoundedSemaphore from typing import Optional, TYPE_CHECKING from grpc import RpcError @@ -27,11 +27,9 @@ from mtap._config import Config from mtap.pipeline._common import event_and_params -from mtap.pipeline._error_handling import StopProcessing, SuppressError -from mtap.pipeline._exc import PipelineTerminated +from mtap.pipeline._error_handling import handle_error, StopProcessing from mtap.pipeline._sources import ProcessingSource, IterableProcessingSource -from mtap.processing import ( - 
-    ProcessingException,
-)
+from mtap.processing import ProcessingException
 
 if TYPE_CHECKING:
     from mtap.pipeline._pipeline import ActivePipeline
 
@@ -70,9 +68,11 @@ def _mp_process_event(event_id, event_service_instance_id, params):
             params
         )
     except ProcessingException as e:
+        logging.debug("MTAP processing exception during processing: %s", e)
         return event_id, None, e.error_info
     except Exception as e:
-        ei = ProcessingException.from_local_exception(e, 'pipeline').error_info
+        logging.debug("Exception during processing: %s", e)
+        ei = ProcessingException.from_local_exception(*sys.exc_info(), 'pipeline').error_info
         return event_id, None, ei
     return event_id, result, None
 
@@ -82,7 +82,7 @@ def __init__(self, pipeline):
         self.pipeline = pipeline
         self.config = pipeline.mp_config
         self.active_events = {}
-        self.targets_cond = Condition(Lock())
+        self.sem = BoundedSemaphore(self.config.workers + self.config.read_ahead)
 
         mp_context = self.config.mp_context
         if mp_context is None:
@@ -110,11 +110,8 @@ def __init__(self, pipeline):
             )
         )
 
-    @property
-    def max_targets(self):
-        return self.config.workers + self.config.read_ahead
-
     def start_task(self, event, params, callback=None):
+        self.sem.acquire()
         event_id = event.event_id
         res = self.pool.apply_async(_mp_process_event,
                                     args=(event_id,
@@ -126,11 +123,10 @@ def start_task(self, event, params, callback=None):
 
     def task_complete(self, result):
         event_id, result, error = result
-        with self.targets_cond:
-            event, callback = self.active_events.pop(event_id)
-            self.targets_cond.notify()
-        if callback:
-            callback(event, result, error)
+        event, callback = self.active_events.pop(event_id)
+        if callback:
+            callback(event, result, error)
+        self.sem.release()
 
     def __enter__(self):
         return self
 
@@ -138,13 +134,6 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
 
-    def has_capacity(self):
-        return len(self.active_events) < self.max_targets
-
-    def wait_for_capacity(self):
-        with self.targets_cond:
-            self.targets_cond.wait_for(self.has_capacity)
-
     def close(self):
         self.pool.terminate()
         self.pool.join()
 
@@ -152,11 +141,14 @@
 class MpPipelineRunner:
 
-    def __init__(self, pool: MpPipelinePool, source, total=None, callback=None, show_progress=False):
+    def __init__(self, pool: MpPipelinePool, source, total=None, callback=None, show_progress=True):
         self.pool = pool
         self.config = pool.config
         self.pipeline = pool.pipeline
-        total = (source.total if hasattr(source, 'total') else None) or total
+        total = (source.total if hasattr(source, 'total')
+                 else (len(source) if hasattr(source, '__len__') else None)) or total
+        if pool.config.show_progress is not None:
+            show_progress = pool.config.show_progress
         self.progress_bar = tqdm(
             total=total,
             unit='events',
@@ -171,7 +162,7 @@ def __init__(self, pool: MpPipelinePool, source, total=None, callback=None, show_progress=True):
         self.source = source
         self.stop = False
         self.times = self.pipeline.create_times()
-        self.handler_states = [{} for _ in self.pipeline.error_handlers]
+        self.error_handlers = [(handler, {}) for handler in self.pipeline.error_handlers]
         self.callback = callback
         self.results = Queue()
         self.active_targets = 0
@@ -184,10 +175,14 @@ def stop_processing(self):
 
     def run(self):
         try:
-            with self.source:
+            with ExitStack() as es:
+                es.enter_context(self.source)
+                es.callback(self.finish_results)
                 it = iter(self.source.produce())
+                es.callback(it.close)
                 self.run_loop(it)
         except KeyboardInterrupt as e:
+            self.stop = True
             print('Pipeline terminated by user (KeyboardInterrupt).')
             raise e
         return self.times
 
@@ -196,7 +190,6 @@ def run_loop(self, it):
         while True:
             if self.stop:
                 break
-            self.pool.wait_for_capacity()
             try:
                 target = next(it)
             except StopIteration:
                 break
@@ -212,20 +205,19 @@
                 # meaning the lease will never get freed.
                 try:
                     event.release_lease()
-                except RpcError:
-                    # Client might already be closed, we tried our best
-                    pass
+                except RpcError as e2:
+                    print("Failed to clean up event on termination: ", e2)
                 raise e
 
             self.drain_results()
-        while self.active_targets > 0:
-            res = self.results.get()
-            self.handle_result(*res)
-        if self.stop:
-            raise PipelineTerminated("Pipeline terminated by an error handler.")
 
     def result_callback(self, event, result, error):
         self.results.put_nowait((event, result, error))
 
+    def finish_results(self):
+        while self.active_targets > 0:
+            res = self.results.get()
+            self.handle_result(*res)
+
     def drain_results(self):
         while True:
             try:
@@ -236,21 +228,20 @@ def drain_results(self):
 
     def handle_result(self, event, result, error):
         self.active_targets -= 1
-        if result is not None:
-            self.times.add_result_times(result)
-            if self.callback:
-                self.callback(result, event)
-            event.release_lease()
-            return
+        if self.progress_bar is not None:
+            self.progress_bar.update(1)
 
-        for handler, state in zip(self.pipeline.error_handlers,
-                                  self.handler_states):
-            try:
-                handler.handle_error(event, error, state)
-            except StopProcessing:
-                self.stop_processing()
-            except SuppressError:
-                break
-            except Exception as e:
-                print("An error handler raised an exception: ", e)
-                traceback.print_exc()
+        try:
+            if result is not None:
+                self.times.add_result_times(result)
+                if self.callback:
+                    self.callback(result, event)
+                return
+            if not self.stop:
+                try:
+                    handle_error(self.error_handlers, event, error)
+                except StopProcessing:
+                    self.stop = True
+                    raise
+        finally:
+            event.release_lease()
diff --git a/python/mtap/processing/_exc.py b/python/mtap/processing/_exc.py
index 390b8c44..43040b4e 100644
--- a/python/mtap/processing/_exc.py
+++ b/python/mtap/processing/_exc.py
@@ -1,4 +1,4 @@
-# Copyright 2023 Regents of the University of Minnesota.
+# Copyright (c) Regents of the University of Minnesota.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,6 +17,10 @@
 from typing import List, Optional
 
 
+class NotStatusException(Exception):
+    pass
+
+
 class ProcessingException(Exception):
     """An exception that occurred in a processing component.
 
@@ -58,19 +62,20 @@ def to_rpc_status(self):
         return status
 
     @staticmethod
-    def from_local_exception(exc, component_id, message=None):
+    def from_local_exception(etype, value, tb, component_id, address=None, message=None):
         error_info = ErrorInfo(
             origin=ErrorOrigin.LOCAL,
             component_id=component_id,
             lang='python',
-            error_type=str(type(exc)),
-            error_repr=repr(exc),
+            error_type=type(value).__name__,
+            error_repr=repr(value),
             localized_msg=message or "An internal error occurred while "
                                      "attempting to process an Event. "
" "This is potentially a bug, contact the " "developer of the component.", locale="en-US", - stack_trace=list(traceback.format_exception(exc)) + stack_trace=list(traceback.format_exception(etype, value, tb)), + address=address ) return ProcessingException(error_info) @@ -81,8 +86,7 @@ def from_rpc_error(rpc_error, component_id, address): status = rpc_status.from_call(rpc_error) if status is None: - return ProcessingException.from_local_exception(rpc_error, - component_id) + raise NotStatusException() info = error_details_pb2.ErrorInfo() debug_info = error_details_pb2.DebugInfo() localized_message = error_details_pb2.LocalizedMessage() @@ -99,7 +103,7 @@ def from_rpc_error(rpc_error, component_id, address): localized_msg=localized_message.message, locale=localized_message.locale, stack_trace=list(debug_info.stack_entries), - address=address, + address=address ) return ProcessingException(error_info) diff --git a/python/mtap/processing/_runners.py b/python/mtap/processing/_runners.py index 0ad0509b..cd1efe3c 100644 --- a/python/mtap/processing/_runners.py +++ b/python/mtap/processing/_runners.py @@ -1,4 +1,4 @@ -# Copyright 2019 Regents of the University of Minnesota. +# Copyright (c) Regents of the University of Minnesota. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,8 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import copy import logging +import sys from typing import Optional, Dict, Any import grpc @@ -23,7 +25,7 @@ from mtap._events_client import events_client, EventsAddressLike, EventsClient from mtap._structs import copy_dict_to_struct, copy_struct_to_dict from mtap.api.v1 import processing_pb2_grpc, processing_pb2 -from mtap.processing._exc import ProcessingException +from mtap.processing._exc import ProcessingException, NotStatusException from mtap.processing._processing_component import ProcessingComponent from mtap.processing._processor import Processor, EventProcessor @@ -35,14 +37,16 @@ def __init__(self, processor: EventProcessor, events_address: EventsAddressLike, component_id: Optional[str] = None, - params: Optional[Dict[str, Any]] = None): + params: Optional[Dict[str, Any]] = None, + client: Optional[EventsClient] = None): self._processor = processor self._events_address = events_address self._processor_name = processor.metadata['name'] self._component_id = component_id or self.processor_name self._params = params or {} self.metadata = processor.metadata - self._client = None + self._client = client + self._client_created = False @property def processor_name(self) -> str: @@ -56,6 +60,7 @@ def component_id(self) -> str: def _events_client(self) -> EventsClient: if self._client is None: self._client = events_client(self._events_address) + self._client_created = True return self._client def call_process(self, event_id, event_instance_id, params): @@ -63,34 +68,37 @@ def call_process(self, event_id, event_instance_id, params): if params is not None: p.update(params) - with Processor.enter_context() as c, \ - Event( - event_id=event_id, - event_service_instance_id=event_instance_id, - client=self._events_client, - label_adapters=self._processor.custom_label_adapters - ) as event: - with Processor.started_stopwatch('process_method'): - try: + with Processor.enter_context() as c: + event = Event( + event_id=event_id, + 
+                event_service_instance_id=event_instance_id,
+                client=self._events_client,
+                label_adapters=self._processor.custom_label_adapters,
+                lease=False  # The client / pipeline should hold the lease.
+            )
+            try:
+                with Processor.started_stopwatch('process_method'):
                     result = self._processor.process(event, p)
-                except KeyError as e:
-                    if e.args[0] == 'document_name':
-                        raise ProcessingException.from_local_exception(
-                            e, self.component_id,
-                            "This error is likely caused by attempting "
-                            "to run an event through a document processor. "
-                            "Either call the pipeline with a document or "
-                            "set the 'document_name' processor parameter."
-                        ) from e
-                    raise e
-                except Exception as e:
-                    raise ProcessingException.from_local_exception(
-                        e, self.component_id
-                    ) from e
+            except KeyError as e:
+                if e.args and e.args[0] == 'document_name':
+                    raise ProcessingException.from_local_exception(
+                        *sys.exc_info(), self.component_id,
+                        message="This error is likely caused by attempting "
+                                "to run an event through a document processor. "
+                                "Either call the pipeline with a document or "
+                                "set the 'document_name' processor parameter."
+                    )
+                raise e
+            except Exception as e:
+                msg = None
+                if isinstance(e, ValueError) and 'Cannot invoke RPC on closed channel' in str(e):
+                    msg = "Channel was closed when trying to process."
+                raise ProcessingException.from_local_exception(
+                    *sys.exc_info(), self.component_id, message=msg
+                )
         return result, c.times, event.created_indices
 
     def close(self):
-        if self._client is not None:
+        if self._client_created and self._client is not None:
             self._client.close()
             self._client = None
 
@@ -103,7 +111,8 @@ def __init__(
             address=None,
             params=None,
             enable_proxy=None,
-            call_timeout=None
+            call_timeout=None,
+            channel=None
     ):
         self._processor_name = processor_name
         self._component_id = component_id or processor_name
@@ -112,16 +121,16 @@ def __init__(
         address = self._address
         config = Config()
         if enable_proxy is not None:
-            config['grpc.processor_options.gprc.enable_http_proxy'] \
-                = enable_proxy
+            config['grpc.processor_options.gprc.enable_http_proxy'] = enable_proxy
         if address is None:
             from mtap.discovery import DiscoveryMechanism
             discovery = DiscoveryMechanism()
             address = discovery.discover_processor_service(processor_name, 'v1')
         channel_options = config.get('grpc.processor_options', {})
-        self._channel = insecure_channel(address, options=list(
-            channel_options.items()))
+        self._channel = channel
+        if channel is None:
+            self._channel = insecure_channel(address, options=list(channel_options.items()))
         self._stub = processing_pb2_grpc.ProcessorStub(self._channel)
         self._call_timeout = call_timeout or 10.0
 
@@ -154,29 +163,30 @@ def call_process(self, event_id, event_instance_id, params):
                 request,
                 timeout=self._call_timeout,
                 metadata=[('service-name', self.processor_name)])
-        except grpc.RpcError as e:
-            if e.code() == grpc.StatusCode.UNAVAILABLE:
-                raise ProcessingException.from_local_exception(
-                    e,
-                    self.component_id,
-                    "Failed to connect to processor service, check "
-                    "that the service is running and the address is "
-                    "correctly configured."
+        except grpc.RpcError as rpc_error:
+            exc_info = sys.exc_info()
+            try:
+                raise ProcessingException.from_rpc_error(
+                    rpc_error, self.component_id, self._address
+                )
+            except NotStatusException:
+                msg = None
+                if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
+                    msg = f'Deadline exceeded waiting for "{self.processor_name}" to process an event.'
+                if rpc_error.code() == grpc.StatusCode.UNAVAILABLE:
+                    msg = (f'Failed to connect to "{self.processor_name}", check '
+                           f'that the service is running and the address is '
+                           f'correctly configured.')
-                )
-            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
                 raise ProcessingException.from_local_exception(
-                    e,
-                    self.component_id,
-                    "The configured timeout for completing the "
-                    "remote processing request was exceeded."
+                    *exc_info, self.component_id, self._address, msg
                 )
-            raise ProcessingException.from_rpc_error(
-                e, self.component_id, self._address
-            ) from e
         except Exception as e:
+            msg = None
+            if isinstance(e, ValueError) and 'Cannot invoke RPC on closed channel' in str(e):
+                msg = "Channel was closed when trying to process."
             raise ProcessingException.from_local_exception(
-                e, self.component_id
-            ) from e
+                *sys.exc_info(), self.component_id, self._address, msg
+            )
 
         r = {}
         copy_struct_to_dict(response.result, r)
diff --git a/python/mtap/processing/_service.py b/python/mtap/processing/_service.py
index 088fb95d..a054a1e7 100644
--- a/python/mtap/processing/_service.py
+++ b/python/mtap/processing/_service.py
@@ -15,7 +15,6 @@
 import argparse
 import logging
 import threading
-import traceback
 import uuid
 from concurrent.futures import thread
 from logging.handlers import QueueListener
@@ -25,7 +24,7 @@
 from grpc_health.v1 import health, health_pb2_grpc
 from grpc_status import rpc_status
 
-from mtap import _config, _structs, utilities
+from mtap import _config, _structs
 from mtap._common import run_server_forever
 from mtap._event import Event
 from mtap.api.v1 import processing_pb2, processing_pb2_grpc
@@ -39,7 +38,6 @@
 def run_processor(proc: EventProcessor,
                   *,
-                  mp: bool = False,
                   options: Optional[argparse.Namespace] = None,
                   args: Optional[Sequence[str]] = None,
                   mp_context=None):
@@ -100,9 +98,10 @@
     if enable_http_proxy is not None:
         c['grpc.events_channel_options.grpc.enable_http_proxy'] \
             = enable_http_proxy
-    if mp:
+    if options.mp:
         runner = MpProcessorRunner(proc=proc,
-                                   workers=options.workers,
+                                   mp_processes=options.mp_processes,
+                                   mp_start_method=options.mp_start_method,
                                    events_address=events_addresses,
                                    processor_name=name,
                                    mp_context=mp_context,
@@ -169,13 +167,14 @@
     def __init__(self,
                  proc: EventProcessor,
                  processor_name: str,
                  component_id: Optional[str] = None,
-                 workers: Optional[int] = 8,
+                 mp_processes: Optional[int] = 8,
                  events_address: Optional[Union[str, Sequence[str]]] = None,
                  mp_context=None,
+                 mp_start_method=None,
                  log_level=None):
         if mp_context is None:
             import multiprocessing as mp
-            mp_context = mp.get_context('spawn')
+            mp_context = mp.get_context(mp_start_method)
         config = _config.Config()
         log_queue = mp_context.Queue(-1)
         handler = logging.StreamHandler()
@@ -185,7 +184,7 @@
         self.log_listener.start()
 
         self.pool = mp_context.Pool(
-            workers,
+            mp_processes,
             initializer=_mp_initialize,
             initargs=(proc, events_address, dict(config), log_queue)
         )
@@ -291,9 +290,20 @@ def processor_parser() -> argparse.ArgumentParser:
         help="If set, will enable usage of http_proxy by grpc."
     )
     processors_parser.add_argument(
-        '--mp-spawn-method',
+        '--mp', action='store_true',
+        help="Whether to use the multiprocessing pool based processor server."
+    )
+    processors_parser.add_argument(
+        '--mp-processes',
+        type=int,
+        default=2,
+        help="If using the multiprocessing host, the number of worker processes."
+    )
+    processors_parser.add_argument(
+        '--mp-start-method',
+        default='spawn',
         choices=['spawn', 'fork', 'forkserver', 'None'],
-        help="A multiprocessing spawn method to use."
+        help="A multiprocessing.get_context start method to use."
     )
     return processors_parser
 
@@ -347,8 +357,7 @@ def Process(self, request, context=None):
                     created_index.index_name = index_name
             return response
         except ProcessingException as e:
-            logger.error(str(e))
-            logger.error(traceback.format_exc())
+            logger.error(e, exc_info=True)
             context.abort_with_status(rpc_status.to_status(e.to_rpc_status()))
 
     def GetStats(self, request, context):
diff --git a/python/tests/pipeline/test_error_handling.py b/python/tests/pipeline/test_error_handling.py
index cf3f517a..efed5dce 100644
--- a/python/tests/pipeline/test_error_handling.py
+++ b/python/tests/pipeline/test_error_handling.py
@@ -1,5 +1,28 @@
+# Copyright (c) Regents of the University of Minnesota.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import tempfile
+from pathlib import Path
+from typing import Dict, Any
+
+import pytest
+
+from mtap import Event
 from mtap.pipeline import ProcessingErrorHandler, SimpleErrorHandler, TerminationErrorHandler, LoggingErrorHandler, \
     ErrorsDirectoryErrorHandler, SuppressAllErrorsHandler
+from mtap.pipeline._error_handling import handle_error, StopProcessing
+from mtap.processing import ErrorInfo, ErrorOrigin
 
 
 def test_registry():
@@ -9,3 +32,85 @@ def test_registry():
     directory = ProcessingErrorHandler.from_dict({'name': 'to_directory', 'params': {'output_directory': "."}})
     assert isinstance(directory, ErrorsDirectoryErrorHandler)
     assert isinstance(ProcessingErrorHandler.from_dict({'name': 'suppress'}), SuppressAllErrorsHandler)
+
+
+event = Event(event_id='1')
+ei = ErrorInfo(
+    origin=ErrorOrigin.LOCAL,
+    component_id='test',
+    lang='py',
+    error_type='blaherror',
+    error_repr='blah',
+    localized_msg='',
+    locale='en_US',
+    stack_trace=["a", "b"]
+)
+
+
+def test_error_thrown_by_handler():
+    class First(ProcessingErrorHandler):
+        def handle_error(self, event: Event, error_info: ErrorInfo, state: Dict[Any, Any]):
+            raise ValueError()
+
+    class Second(ProcessingErrorHandler):
+        def handle_error(self, event: Event, error_info: ErrorInfo, state: Dict[Any, Any]):
+            self.seen = True
+
+    handlers = [(First(), {}), (Second(), {})]
+    with pytest.raises(ValueError):
+        handle_error(handlers, event, ei)
+
+
+def test_multiple_error_handlers():
+    class First(ProcessingErrorHandler):
+        def handle_error(self, e: Event, error_info: ErrorInfo, state: Dict[Any, Any]):
+            pass
+
+    class Second(ProcessingErrorHandler):
+        def handle_error(self, e: Event, error_info: ErrorInfo, state: Dict[Any, Any]):
+            self.seen = True
+
+    second = Second()
+    handlers = [(First(), {}), (second, {})]
+    handle_error(handlers, event, ei)
+    assert second.seen
+
+
+def test_termination_error_handler():
+    handlers = [(TerminationErrorHandler(max_failures=1), {})]
+
+    handle_error(handlers, event, ei)
+    with pytest.raises(StopProcessing):
+        handle_error(handlers, event, ei)
+
+
+def test_logging_handler():
+    class FakeLogger:
+        def error(self, *values):
+            self.msg = ''.join(values)
+
+    logger = FakeLogger()
+    handler = LoggingErrorHandler(logger)
+
+    handlers = [(handler, {})]
+    handle_error(handlers, event, ei)
+    assert logger.msg.startswith("An error occurred while ")
+
+
+def test_errors_directory_handler():
+    with tempfile.TemporaryDirectory() as errors_dir:
+        handler = ErrorsDirectoryErrorHandler(output_directory=errors_dir)
+        handlers = [(handler, {})]
+        handle_error(handlers, event, ei)
+        p = Path(errors_dir) / '1'
+        assert p.exists()
+        assert (p / 'info.json').exists()
+        assert (p / 'event.json').exists()
+        assert (p / 'trace.txt').exists()
+
+
+def test_suppress_error_handler():
+    handlers = [(SuppressAllErrorsHandler(), {}), (TerminationErrorHandler(), {})]
+    # Completing without StopProcessing shows the suppress handler stops the
+    # chain before the termination handler can run.
+    handle_error(handlers, event, ei)
diff --git a/python/tests/processing/test_pipeline.py b/python/tests/processing/test_pipeline.py
index 40b03d9f..81f3973f 100644
--- a/python/tests/processing/test_pipeline.py
+++ b/python/tests/processing/test_pipeline.py
@@ -1,4 +1,4 @@
-# Copyright 2021 Regents of the University of Minnesota.
+# Copyright (c) Regents of the University of Minnesota.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
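For reference, a minimal sketch of how the new (handler, state) chain from _error_handling.py can be driven outside the pipeline, mirroring the tests above. CountingHandler and on_error are hypothetical names introduced here for illustration; handle_error threads one mutable state dict per handler and stops the chain when a handler raises SuppressError, while StopProcessing propagates to the caller (as MpPipelineRunner.handle_result does by setting self.stop and re-raising):

    from typing import Any, Dict

    from mtap import Event
    from mtap.pipeline import ProcessingErrorHandler, TerminationErrorHandler
    from mtap.pipeline._error_handling import handle_error, StopProcessing
    from mtap.processing import ErrorInfo

    class CountingHandler(ProcessingErrorHandler):
        # Hypothetical handler: counts errors in its per-handler state dict.
        def handle_error(self, event: Event, error_info: ErrorInfo, state: Dict[Any, Any]):
            state['seen'] = state.get('seen', 0) + 1

    # One state dict per handler, paired up exactly as MpPipelineRunner
    # builds self.error_handlers from pipeline.error_handlers.
    handlers = [(CountingHandler(), {}),
                (TerminationErrorHandler(max_failures=1), {})]

    def on_error(event: Event, info: ErrorInfo):
        try:
            handle_error(handlers, event, info)
        except StopProcessing:
            # An error handler has requested that processing terminate.
            pass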