diff --git a/README.md b/README.md index 99793963..74d11ba4 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ pip install mtap Gradle: ```groovy -implementation 'edu.umn.nlpie:mtap:1.0.0-rc5' +implementation 'edu.umn.nlpie:mtap:1.0.0' ``` Maven: @@ -52,7 +52,7 @@ Maven: edu.umn.nlpie mtap - 1.0.0-rc5 + 1.0.0 ``` diff --git a/python/docs/conf.py b/python/docs/conf.py index 839fdf6a..b851ce41 100644 --- a/python/docs/conf.py +++ b/python/docs/conf.py @@ -97,7 +97,7 @@ # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = 'en' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. diff --git a/python/docs/deployment.rst b/python/docs/deployment.rst index 5b7621dd..a7cf9463 100644 --- a/python/docs/deployment.rst +++ b/python/docs/deployment.rst @@ -1,3 +1,3 @@ mtap.deployment -========================= +=============== .. automodule:: mtap.deployment diff --git a/python/docs/index.rst b/python/docs/index.rst index 7662a900..a864faa8 100644 --- a/python/docs/index.rst +++ b/python/docs/index.rst @@ -2,12 +2,12 @@ Welcome to MTAP's documentation! =================================== .. toctree:: - :maxdepth: 3 + :maxdepth: 2 :caption: Contents: mtap deployment - io + serialization Overview ======== diff --git a/python/docs/io.rst b/python/docs/io.rst deleted file mode 100644 index 10dddc3c..00000000 --- a/python/docs/io.rst +++ /dev/null @@ -1,17 +0,0 @@ -mtap.io -======= -.. module:: mtap.io - -.. contents:: - -Serialization -------------- - -.. automodule:: mtap.io.serialization - :no-members: - -.. autoclass:: mtap.io.serialization.SerializationProcessor - :exclude-members: process - -.. autoclass:: mtap.io.serialization.Serializer - diff --git a/python/docs/serialization.rst b/python/docs/serialization.rst new file mode 100644 index 00000000..0191adbf --- /dev/null +++ b/python/docs/serialization.rst @@ -0,0 +1,14 @@ +mtap.serialization +================== + +Serialization +------------- + +.. automodule:: mtap.serialization + :no-members: + +.. autoclass:: mtap.serialization.SerializationProcessor + :exclude-members: process + +.. autoclass:: mtap.serialization.Serializer + diff --git a/python/mtap/deployment.py b/python/mtap/deployment.py index a40af04f..e8aceffb 100644 --- a/python/mtap/deployment.py +++ b/python/mtap/deployment.py @@ -14,6 +14,8 @@ """Module for deploying a set of processing services and the events server all at once. Examples: + See python/mtap/examples/exampleDeploymentConfiguration.yml for an example of the yaml deployment configuration + which can be loaded using :py:meth:`~mtap.deployment.Deployment.from_yaml_file` An example configuration @@ -54,7 +56,7 @@ from mtap import utilities, _config __all__ = [ - 'Deployment', 'GlobalSettings', 'SharedProcessorConfig', 'EventsDeployment', + 'Deployment', 'GlobalSettings', 'SharedProcessorConfig', 'EventsDeployment', 'ServiceDeployment', 'ProcessorDeployment', 'main', 'deployment_parser', 'ServiceDeploymentException', ] @@ -92,16 +94,16 @@ class GlobalSettings: """Settings shared by event service and all processors. Keyword Args: - host (Optional[str]): The global host override, forces all services to use a specific host name. - mtap_config (Optional[str]): The path to an MTAP config file to load for all services. - log_level (Optional[str]): A python logging level to pass to all services. 
- register (Optional[str]): Whether services should register with service discovery. + host (~typing.Optional[str]): The global host override, forces all services to use a specific host name. + mtap_config (~typing.Optional[str]): The path to an MTAP config file to load for all services. + log_level (~typing.Optional[str]): A python logging level to pass to all services. + register (~typing.Optional[bool]): Whether services should register with service discovery. Attributes: - host (Optional[str]): The global host override, forces all services to use a specific host name. - mtap_config (Optional[str]): The path to an MTAP config file to load for all services. - log_level (Optional[str]): A python logging level to pass to all services. - register (Optional[str]): Whether services should register with service discovery. + host (~typing.Optional[str]): The global host override, forces all services to use a specific host name. + mtap_config (~typing.Optional[str]): The path to an MTAP config file to load for all services. + log_level (~typing.Optional[str]): A python logging level to pass to all services. + register (~typing.Optional[bool]): Whether services should register with service discovery. """ @@ -120,7 +122,7 @@ def from_conf(conf: Optional[Dict]) -> 'GlobalSettings': """Creates a global settings object from a configuration dictionary. Keyword Args: - conf (Optional[Dict]): The configuration dictionary. + conf (~typing.Optional[~typing.Dict]): The configuration dictionary. Returns: GlobalSettings: The global settings object. @@ -135,38 +137,38 @@ class SharedProcessorConfig: """Configuration that is shared between multiple processor services. Keyword Args: - events_addresses (Optional[str]): An optional GRPC-compatible target for the events + events_addresses (~typing.Optional[~typing.List[str]]): An optional GRPC-compatible target for the events service to be used by all processors. - workers (Optional[int]): The default number of worker threads which will perform + workers (~typing.Optional[int]): The default number of worker threads which will perform processing. - additional_args (Optional[List[str]]): a list of additional arguments that + additional_args (~typing.Optional[~typing.List[str]]): a list of additional arguments that should be appended to every processor. - jvm_args (Optional[List[str]]): a list of JVM arguments for all java + jvm_args (~typing.Optional[~typing.List[str]]): a list of JVM arguments for all java processors. - java_classpath (Optional[str]): A classpath string that will be passed to all java + java_classpath (~typing.Optional[str]): A classpath string that will be passed to all java processors. - startup_timeout (Optional[int]): The default startup timeout for processors. - mp_spawn_method (Optional[str]): A :meth:`multiprocessing.get_context` argument to create + startup_timeout (~typing.Optional[int]): The default startup timeout for processors. + mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create the multiprocessing context. Attributes: - events_addresses (Optional[List[str]]): An optional GRPC-compatible target for the events + events_addresses (~typing.Optional[~typing.List[str]]): An optional GRPC-compatible target for the events service to be used by all processors. - workers (Optional[int]): The default number of worker threads which will perform + workers (~typing.Optional[int]): The default number of worker threads which will perform processing. 
- additional_args (Optional[List[str]]): a list of additional arguments that + additional_args (~typing.Optional[~typing.List[str]]): a list of additional arguments that should be appended to every processor. - jvm_args (Optional[List[str]]): a list of JVM arguments for all java + jvm_args (~typing.Optional[~typing.List[str]]): a list of JVM arguments for all java processors. - java_classpath (Optional[str]): A classpath string that will be passed to all java + java_classpath (~typing.Optional[str]): A classpath string that will be passed to all java processors. - startup_timeout (Optional[int]): The default startup timeout for processors. - mp_spawn_method (Optional[str]): A :meth:`multiprocessing.get_context` argument to create + startup_timeout (~typing.Optional[int]): The default startup timeout for processors. + mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create the multiprocessing context. """ - def __init__(self, + def __init__(self, *, events_addresses: Optional[List[str]] = None, workers: Optional[int] = None, additional_args: Optional[List[str]] = None, @@ -189,7 +191,7 @@ def from_conf(conf: Optional[Dict]) -> 'SharedProcessorConfig': """Builds a configuration from a dictionary representation. Args: - conf (Optional[Dict]): The configuration dictionary. + conf (~typing.Optional[~typing.Dict]): The configuration dictionary. Returns: SharedProcessorConfig object. @@ -199,8 +201,22 @@ def from_conf(conf: Optional[Dict]) -> 'SharedProcessorConfig': return SharedProcessorConfig(**conf) -class _ServiceDeployment: - def __init__(self, +class ServiceDeployment: + """Shared configuration for services, both events and processors. + + Keyword Args: + workers (~typing.Optional[int]): The number of workers. + register (~typing.Optional[bool]): Whether to use service discovery. + mtap_config (~typing.Optional[str]): A path to the mtap configuration. + log_level (~typing.Optional[str]): The log level. + + Attributes: + workers (~typing.Optional[int]): The number of workers. + register (~typing.Optional[bool]): Whether to use service discovery. + mtap_config (~typing.Optional[str]): A path to the mtap configuration. + log_level (~typing.Optional[str]): The log level. + """ + def __init__(self, *, workers: Optional[int], register: Optional[bool], mtap_config: Optional[str], @@ -210,15 +226,15 @@ def __init__(self, self.mtap_config = mtap_config self.log_level = log_level - def service_args(self, - host: Optional[str] = None, - port: Optional[int] = None, - unique_service_identifier: Optional[List[str]] = None, - register_default: Optional[bool] = None, - global_host: Optional[str] = None, - workers_default: Optional[int] = None, - mtap_config_default: Optional[str] = None, - log_level_default: Optional[str] = None): + def _service_args(self, + host: Optional[str] = None, + port: Optional[int] = None, + unique_service_identifier: Optional[List[str]] = None, + register_default: Optional[bool] = None, + global_host: Optional[str] = None, + workers_default: Optional[int] = None, + mtap_config_default: Optional[str] = None, + log_level_default: Optional[str] = None): call = [] host = global_host or host @@ -261,6 +277,11 @@ class EventsDeployment: mtap_config (~typing.Optional[str]): Path to an mtap configuration file. log_level (~typing.Optional[str]): The log level for the events service. + Attributes: + enabled (bool): Whether an events service should be created. 
+ addresses (~typing.Optional[~typing.Sequence[str]]): The host address of the events service. + service_deployment (ServiceDeployment): + The service deployment settings (workers, registration, config, logging). """ def __init__(self, *, @@ -272,7 +293,8 @@ def __init__(self, *, log_level: Optional[str] = None): self.enabled = enabled self.addresses = addresses - self.service_deployment = _ServiceDeployment(workers, register, mtap_config, log_level) + self.service_deployment = ServiceDeployment(workers=workers, register=register, mtap_config=mtap_config, + log_level=log_level) def create_calls(self, global_settings: GlobalSettings) -> Iterable[Tuple[List[str], str]]: for address in self.addresses: @@ -287,7 +309,7 @@ def create_calls(self, global_settings: GlobalSettings) -> Iterable[Tuple[List[s else: host = splits[0] call = [PYTHON_EXE, '-m', 'mtap', 'events'] - service_args, sid = self.service_deployment.service_args( + service_args, sid = self.service_deployment._service_args( host=host, port=port, register_default=global_settings.register, @@ -303,7 +325,7 @@ def from_conf(conf: Optional[Dict]) -> 'EventsDeployment': """Creates the EventsDeployment configuration option from a configuration dictionary. Args: - conf (Optional[Dict]): The configuration dictionary + conf (~typing.Optional[~typing.Dict]): The configuration dictionary Returns: EventsDeployment or None from the configuration dictionary. @@ -341,6 +363,8 @@ class ProcessorDeployment: Args: implementation (str): Either "java" or "python". entry_point (str): Either the java main class, or the python main module. + + Keyword Args: enabled (~typing.Optional[bool]): Whether the processor should be launched as part of deployment. Default is `True` if `None`. instances (~typing.Optional[int]): The number of instances of the processor to launch. @@ -362,6 +386,23 @@ class ProcessorDeployment: mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create the multiprocessing context. + Attributes: + implementation (str): Either "java" or "python". + entry_point (str): Either the java main class, or the python main module. + enabled (bool): Whether the processor should be launched as part of deployment. + instances (int): The number of instances of the processor to launch. + host (~typing.Optional[str]): The listening host for the processor service. + port (~typing.Optional[int]): The listening port for the processor service. + service_deployment (ServiceDeployment): + The service deployment settings (workers, registration, config, logging). + pre_args (~typing.Optional[~typing.List[str]]): + Arguments that occur prior to the MTAP service arguments (like host, port, etc). + additional_args (~typing.Optional[~typing.List[str]]): + Arguments that occur after the MTAP service arguments. + startup_timeout (~typing.Optional[int]): Optional override startup timeout. + mp_spawn_method (~typing.Optional[str]): A :meth:`multiprocessing.get_context` argument to create + the multiprocessing context. 
+ """ def __init__(self, @@ -381,9 +422,9 @@ def __init__(self, additional_args: Optional[List[str]] = None, startup_timeout: Optional[int] = None, mp_spawn_method: Optional[str] = None): - self.enabled = enabled if enabled is not None else True self.implementation = implementation self.entry_point = entry_point + self.enabled = enabled if enabled is not None else True self.instances = instances or 1 if not isinstance(self.instances, int) or self.instances < 1: raise ValueError("Instances must be strictly positive integer.") @@ -393,7 +434,8 @@ def __init__(self, self.additional_args = additional_args self.host = host self.port = port - self.service_deployment = _ServiceDeployment(workers, register, mtap_config, log_level) + self.service_deployment = ServiceDeployment(workers=workers, register=register, mtap_config=mtap_config, + log_level=log_level) self.startup_timeout = startup_timeout self.mp_spawn_method = mp_spawn_method @@ -402,7 +444,7 @@ def from_conf(conf: Dict) -> 'ProcessorDeployment': """Creates an MTAP processor deployment configuration from a configuration dictionary. Args: - conf (Dict): The configuration dictionary. + conf (~typing.Dict): The configuration dictionary. Returns: ProcessorDeployment object that can be used to constuct the call for the processor. @@ -440,7 +482,7 @@ def create_calls(self, if self.pre_args is not None: call.extend(self.pre_args) - service_args, sid = self.service_deployment.service_args( + service_args, sid = self.service_deployment._service_args( host=self.host, port=port, unique_service_identifier=self.unique_service_identifier, @@ -480,8 +522,15 @@ class Deployment: Deployment settings for the events service. shared_processor_config (~typing.Optional[SharedProcessorConfig]): Shared configuration settings for all processors. - processors (vararg ProcessorDeployment): Configurations for individual processors. + *processors (ProcessorDeployment): Configurations for individual processors. + Attributes: + global_settings (~typing.Optional[GlobalSettings]): Settings shared among all services. + events_deployment (~typing.Optional[EventsDeployment]): + Deployment settings for the events service. + shared_processor_config (~typing.Optional[SharedProcessorConfig]): + Shared configuration settings for all processors. + processors (~typing.List[ProcessorDeployment]): Configurations for individual processors. """ def __init__(self, @@ -500,7 +549,7 @@ def load_configuration(conf: Dict) -> 'Deployment': """Creates a deployment object from a configuration dictionary. Args: - conf (Dict): The configuration dictionary. + conf (~typing.Dict): The configuration dictionary. Returns: Deployment object created. @@ -535,30 +584,27 @@ def from_yaml_file(conf_path: Union[pathlib.Path, str]) -> 'Deployment': return Deployment.load_configuration(conf) @contextmanager - def run_servers(self) -> None: + def run_servers(self): """A context manager that starts all the configured services in subprocesses and returns. Raises: ServiceDeploymentException: If one or more of the services fails to launch. Examples + >>> deploy = Deployment.from_yaml_file('deploy_config.yml') >>> with deploy.run_servers(): >>> # do something that requires the servers. >>> # servers are automatically shutdown / terminated when the block is exited - """ try: self._do_launch_all_processors() - yield + yield self finally: self.shutdown() def run_servers_and_wait(self): """Starts the specified servers and blocks until KeyboardInterrupt, SIGINT, or SIGTERM are received. 
- - Returns: - """ e = threading.Event() signal.signal(signal.SIGINT, lambda *_: e.set()) @@ -601,6 +647,11 @@ def _do_launch_all_processors(self): print('Done deploying all servers.', flush=True) def shutdown(self): + """Shuts down all processors. + + Returns: + + """ print("Shutting down all processors") excs = [] for p, listener in self._processor_listeners: @@ -650,7 +701,7 @@ def main(args: Optional[Sequence[str]] = None, if conf is None: conf = deployment_parser().parse_args(args) if conf.log_level is not None: - logging.basicConfig(level=getattr(logging, conf.log_level)) + logging.basicConfig(level=conf.log_level) if conf.mode == 'run_servers': deployment = Deployment.from_yaml_file(conf.deploy_config) deployment.run_servers() diff --git a/python/mtap/processing/_base.py b/python/mtap/processing/_base.py index 144b81ee..c6074351 100644 --- a/python/mtap/processing/_base.py +++ b/python/mtap/processing/_base.py @@ -13,9 +13,9 @@ # limitations under the License. """Internal processors and pipelines functionality.""" import contextlib +import datetime import threading from abc import ABCMeta, abstractmethod, ABC -from datetime import timedelta, datetime from typing import ( List, ContextManager, @@ -79,7 +79,7 @@ def __init__(self, context: Optional = None, key: Optional[str] = None): self._key = key self._context = context self._running = False - self.duration = timedelta() + self.duration = datetime.timedelta() self._start = None def start(self): @@ -87,14 +87,14 @@ def start(self): """ if not self._running: self._running = True - self._start = datetime.now() + self._start = datetime.datetime.now() def stop(self): """Stops / pauses the timer """ if self._running: self._running = False - self.duration += datetime.now() - self._start + self.duration += datetime.datetime.now() - self._start def __enter__(self): return self @@ -308,12 +308,6 @@ class ProcessingError(Exception): class ProcessingResult(NamedTuple): """The result of processing one document or event. - - Attributes: - identifier (str): The id of the processor with respect to the pipeline. - result_dict: The json object returned by the processor as its results. - timing_info: A dictionary of the times taken processing this document - created_indices: Any indices that have been added to documents by this processor. """ identifier: str result_dict: Dict @@ -321,22 +315,23 @@ class ProcessingResult(NamedTuple): created_indices: Dict[str, List[str]] +ProcessingResult.identifier.__doc__ = "str: The id of the processor with respect to the pipeline." +ProcessingResult.result_dict.__doc__ = "Dict: The json object returned by the processor as its results." +ProcessingResult.timing_info.__doc__ = "Dict: A dictionary of the times taken processing this document." +ProcessingResult.created_indices.__doc__ = "Dict[str, List[str]]: Any indices that have been added to documents by " \ + "this processor." + class PipelineResult(NamedTuple): """The result of processing an event or document in a pipeline. - Attributes: - component_results (List[ProcessingResult]): The processing results for each individual - component - elapsed_time (timedelta): The elapsed time for the entire pipeline. - Args: component_results (List[ProcessingResult]): The processing results for each individual component - elapsed_time (timedelta): The elapsed time for the entire pipeline. + elapsed_time (~datetime.timedelta): The elapsed time for the entire pipeline. 
""" component_results: List[ProcessingResult] - elapsed_time: timedelta + elapsed_time: datetime.timedelta def component_result(self, identifier: str) -> ProcessingResult: """Returns the component result for a specific identifier. @@ -354,30 +349,30 @@ def component_result(self, identifier: str) -> ProcessingResult: raise KeyError('No result for identifier: ' + identifier) +PipelineResult.component_results.__doc__ = "List[ProcessingResult]: The processing results for each individual " \ + "component" +PipelineResult.elapsed_time.__doc__ = "~datetime.timedelta: The elapsed time for the entire pipeline." + + class TimerStats(NamedTuple): """Statistics about a specific keyed measured duration recorded by a :obj:`~mtap.processing.base.Stopwatch`. - - Attributes - mean: The sample mean of all measured durations. - std: The sample standard deviation of all measured durations. - min: The minimum of all measured durations. - max: The maximum of all measured durations. - sum: The sum of all measured durations. """ - mean: timedelta - std: timedelta - min: timedelta - max: timedelta - sum: timedelta + mean: datetime.timedelta + std: datetime.timedelta + min: datetime.timedelta + max: datetime.timedelta + sum: datetime.timedelta + + +TimerStats.mean.__doc__ = "~datetime.timedelta: The sample mean of all measured durations." +TimerStats.std.__doc__ = "~datetime.timedelta: The sample standard deviation of all measured durations." +TimerStats.min.__doc__ = "~datetime.timedelta: The minimum of all measured durations." +TimerStats.max.__doc__ = "~datetime.timedelta: The maximum of all measured durations." +TimerStats.sum.__doc__ = "~datetime.timedelta: The sum of all measured durations." class AggregateTimingInfo(NamedTuple): """Collection of all the timing info for a specific processor. - - Attributes: - identifier (str): The ID of the processor with respect to the pipeline. - timing_info (dict[str, TimerStats]): - A map from all the timer keys for the processor to the aggregated duration statistics. """ identifier: str timing_info: 'Dict[str, processing.TimerStats]' @@ -417,6 +412,11 @@ def timing_csv(self) -> Generator[str, None, None]: stats.min, stats.max, stats.sum) +AggregateTimingInfo.identifier.__doc__ = "str: The ID of the processor with respect to the pipeline." +AggregateTimingInfo.timing_info.__doc__ = "dict[str, TimerStats]: A map from all the timer keys for the processor to " \ + "the aggregated duration statistics." + + class ProcessingComponent(ABC): __slots__ = ()