diff --git a/.github/workflows/run-pytest.yml b/.github/workflows/run-pytest.yml index 66942c4b..216adf26 100644 --- a/.github/workflows/run-pytest.yml +++ b/.github/workflows/run-pytest.yml @@ -10,7 +10,7 @@ jobs: pytest: strategy: matrix: - python-version: [3.6, 3.7, 3.8] + python-version: [3.6, 3.7, 3.8, 3.9] os: [ubuntu-latest] # can't use macOS when using service containers or container jobs runs-on: ${{ matrix.os }} services: diff --git a/README.md b/README.md index 171ba06e..ea931792 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ ![Run pytests](https://github.com/pepkit/pipestat/workflows/Run%20pytests/badge.svg) [![codecov](https://codecov.io/gh/pepkit/pipestat/branch/master/graph/badge.svg?token=O07MXSQZ32)](https://codecov.io/gh/pepkit/pipestat) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) pipestat
diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000..bc5f12fb --- /dev/null +++ b/codecov.yml @@ -0,0 +1,5 @@ +ignore: + - "*/argparser.py" + - "*/cli.py" + - "*/__main__.py" + - "setup.py" \ No newline at end of file diff --git a/docs/README.md b/docs/README.md index 171ba06e..ea931792 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,5 +1,6 @@ ![Run pytests](https://github.com/pepkit/pipestat/workflows/Run%20pytests/badge.svg) [![codecov](https://codecov.io/gh/pepkit/pipestat/branch/master/graph/badge.svg?token=O07MXSQZ32)](https://codecov.io/gh/pepkit/pipestat) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) pipestat
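The `docs/api_docs.md` diff that follows documents a new `offset` parameter (alongside the existing `limit`) on `PipestatManager.select`, which is supported only with the PostgreSQL backend. A minimal usage sketch, not part of the diff itself, assuming an already-configured database-backed manager named `psm`; the condition and value are illustrative:

```python
# Page through the namespace table (PostgreSQL backend only); `condition`
# follows the documented pattern, e.g. "id=%s", and is safely populated
# from `condition_val`; `offset` skips records, `limit` caps how many return.
rows = psm.select(
    condition="id=%s",
    condition_val=[42],
    offset=50,
    limit=50,
)
```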
diff --git a/docs/api_docs.md b/docs/api_docs.md index 1ba54ddd..21f94abe 100644 --- a/docs/api_docs.md +++ b/docs/api_docs.md @@ -461,4 +461,4 @@ Check schema for any possible issues -*Version Information: `pipestat` v0.0.1, generated by `lucidoc` v0.4.3* +*Version Information: `pipestat` v0.0.3-dev, generated by `lucidoc` v0.4.3* diff --git a/docs/autodoc_build/pipestat.md b/docs/autodoc_build/pipestat.md index b13e3c3a..46782529 100644 --- a/docs/autodoc_build/pipestat.md +++ b/docs/autodoc_build/pipestat.md @@ -339,7 +339,7 @@ Result schema mappings ```python -def retrieve(self, record_identifier=None, result_identifier=None, limit=None) +def retrieve(self, record_identifier=None, result_identifier=None) ``` Retrieve a result for a record. @@ -350,7 +350,6 @@ be returned. - `record_identifier` (`str`): unique identifier of the record - `result_identifier` (`str`): name of the result to be retrieved -- `limit` (`int`): max number of results to be returned #### Returns: @@ -385,7 +384,7 @@ Schema path ```python -def select(self, columns=None, condition=None, condition_val=None, limit=None) +def select(self, columns=None, condition=None, condition_val=None, offset=None, limit=None) ``` Get all the contents from the selected table, possibly restricted by the provided condition. @@ -394,7 +393,8 @@ Get all the contents from the selected table, possibly restricted by the provide - `columns` (`str | list[str]`): columns to select - `condition` (`str`): condition to restrict the resultswith, will be appended to the end of the SELECT statement and safely populated with 'condition_val', for example: `"id=%s"` - `condition_val` (`list`): values to fill the placeholderin 'condition' with -- `limit` (`int`): max number of results to be returned +- `offset` (`int`): number of records to be skipped +- `limit` (`int`): max number of records to be returned #### Returns: @@ -460,4 +460,4 @@ Check schema for any possible issues -*Version Information: `pipestat` v0.0.1, generated by `lucidoc` v0.4.3* \ No newline at end of file +*Version Information: `pipestat` v0.0.3-dev, generated by `lucidoc` v0.4.3* \ No newline at end of file diff --git a/docs/changelog.md b/docs/changelog.md index dcf848e5..b01f3de0 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,6 +2,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format. 
+## [0.0.3] - 2021-03-12 +### Added +- possibility to initialize the `PipestatManager` object (or use the `pipestat status` CLI) with no results schema defined for pipeline status management; [Issue #1](https://github.com/pepkit/pipestat/issues/1) + + ## [0.0.2] - 2021-02-22 ### Added - initial package release \ No newline at end of file diff --git a/docs_jupyter/cli.ipynb b/docs_jupyter/cli.ipynb index a9eec695..914ace6f 100644 --- a/docs_jupyter/cli.ipynb +++ b/docs_jupyter/cli.ipynb @@ -51,7 +51,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "version: 0.0.1\n", + "version: 0.0.3\n", "usage: pipestat [-h] [--version] [--silent] [--verbosity V] [--logdev]\n", " {report,inspect,remove,retrieve,status} ...\n", "\n", @@ -518,8 +518,8 @@ "\n", "\n", "PipestatManager (test)\n", - "Backend: file (/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp.XSWvUccu)\n", - "Results schema source: /Users/mstolarczyk/code/pipestat/docs_jupyter/../tests/data/sample_output_schema.yaml\n", + "Backend: file (/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp.Zid7BMd1)\n", + "Results schema source: ../tests/data/sample_output_schema.yaml\n", "Status schema source: /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pipestat/schemas/status_schema.yaml\n", "Records count: 1\n" ] @@ -548,8 +548,8 @@ "\n", "\n", "PipestatManager (test)\n", - "Backend: file (/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp.XSWvUccu)\n", - "Results schema source: /Users/mstolarczyk/code/pipestat/docs_jupyter/../tests/data/sample_output_schema.yaml\n", + "Backend: file (/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp.Zid7BMd1)\n", + "Results schema source: ../tests/data/sample_output_schema.yaml\n", "Status schema source: /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pipestat/schemas/status_schema.yaml\n", "Records count: 1\n", "\n", @@ -655,8 +655,8 @@ "\n", "\n", "PipestatManager (test)\n", - "Backend: file (/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp.XSWvUccu)\n", - "Results schema source: /Users/mstolarczyk/code/pipestat/docs_jupyter/../tests/data/sample_output_schema.yaml\n", + "Backend: file (/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp.Zid7BMd1)\n", + "Results schema source: ../tests/data/sample_output_schema.yaml\n", "Status schema source: /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pipestat/schemas/status_schema.yaml\n", "Records count: 1\n", "\n", @@ -680,7 +680,9 @@ "To manage pipeline status call `pipestat status `:\n", "\n", "- `set` to set pipeline statuses\n", - "- `get` to retrieve pipeline statuses" + "- `get` to retrieve pipeline statuses\n", + "\n", + "Starting with `pipestat 0.0.3`, the `--schema` argument is not required for status management if a YAML file is used as the backend."
] }, { diff --git a/docs_jupyter/python_api.ipynb b/docs_jupyter/python_api.ipynb index 31e697d3..f70bce46 100644 --- a/docs_jupyter/python_api.ipynb +++ b/docs_jupyter/python_api.ipynb @@ -57,12 +57,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp_fb7s787.yaml\n" + "/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmpa6poai5_.yaml\n" ] } ], "source": [ "from tempfile import mkstemp\n", + "\n", "_, temp_file = mkstemp(suffix=\".yaml\")\n", "print(temp_file)" ] @@ -81,10 +82,10 @@ "outputs": [], "source": [ "psm = pipestat.PipestatManager(\n", - " namespace=\"test\", \n", - " record_identifier=\"sample1\", \n", - " results_file_path=temp_file, \n", - " schema_path=\"../tests/data/sample_output_schema.yaml\"\n", + " namespace=\"test\",\n", + " record_identifier=\"sample1\",\n", + " results_file_path=temp_file,\n", + " schema_path=\"../tests/data/sample_output_schema.yaml\",\n", ")" ] }, @@ -283,7 +284,7 @@ } ], "source": [ - "try: \n", + "try:\n", " psm.report(values={\"output_file\": {\"path\": \"/home/user/path.csv\"}})\n", "except ValidationError as e:\n", " print(e)" @@ -322,7 +323,12 @@ ], "source": [ "psm.report(\n", - " values={\"output_file\": {\"path\": \"/home/user/path.csv\", \"title\": \"CSV file with some data\"}}\n", + " values={\n", + " \"output_file\": {\n", + " \"path\": \"/home/user/path.csv\",\n", + " \"title\": \"CSV file with some data\",\n", + " }\n", + " }\n", ")" ] }, @@ -389,7 +395,12 @@ ], "source": [ "psm.report(\n", - " values={\"output_file\": {\"path\": \"/home/user/path_new.csv\", \"title\": \"new CSV file with some data\"}}\n", + " values={\n", + " \"output_file\": {\n", + " \"path\": \"/home/user/path_new.csv\",\n", + " \"title\": \"new CSV file with some data\",\n", + " }\n", + " }\n", ")" ] }, @@ -425,8 +436,13 @@ ], "source": [ "psm.report(\n", - " values={\"output_file\": {\"path\": \"/home/user/path_new.csv\", \"title\": \"new CSV file with some data\"}},\n", - " force_overwrite=True\n", + " values={\n", + " \"output_file\": {\n", + " \"path\": \"/home/user/path_new.csv\",\n", + " \"title\": \"new CSV file with some data\",\n", + " }\n", + " },\n", + " force_overwrite=True,\n", ")\n", "psm.data" ] @@ -445,10 +461,10 @@ "outputs": [], "source": [ "psm1 = pipestat.PipestatManager(\n", - " namespace=\"test\",\n", - " record_identifier=\"sample1\",\n", - " results_file_path=temp_file,\n", - " schema_path=\"../tests/data/sample_output_schema.yaml\"\n", + " namespace=\"test\",\n", + " record_identifier=\"sample1\",\n", + " results_file_path=temp_file,\n", + " schema_path=\"../tests/data/sample_output_schema.yaml\",\n", ")" ] }, @@ -492,7 +508,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmp_fb7s787.yaml\n", + "/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmpa6poai5_.yaml\n", "test:\n", " sample1:\n", " output_file:\n", @@ -590,9 +606,7 @@ "source": [ "try:\n", " psm.report(\n", - " record_identifier=\"sample2\",\n", - " values={\"number_of_things\": []},\n", - " strict_type=False\n", + " record_identifier=\"sample2\", values={\"number_of_things\": []}, strict_type=False\n", " )\n", "except TypeError as e:\n", " print(e)" @@ -814,20 +828,21 @@ "name": "stdout", "output_type": "stream", "text": [ - "/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmpmny6226c.yaml\n" + "/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmpf9m81e1n.yaml\n" ] } ], "source": [ "from tempfile import mkstemp\n", + "\n", "_, temp_file_highlight = 
mkstemp(suffix=\".yaml\")\n", "print(temp_file_highlight)\n", "\n", "psm_highlight = pipestat.PipestatManager(\n", - " namespace=\"test_highlight\", \n", - " record_identifier=\"sample1\", \n", - " results_file_path=temp_file_highlight, \n", - " schema_path=\"../tests/data/sample_output_schema_highlight.yaml\"\n", + " namespace=\"test_highlight\",\n", + " record_identifier=\"sample1\",\n", + " results_file_path=temp_file_highlight,\n", + " schema_path=\"../tests/data/sample_output_schema_highlight.yaml\",\n", ")" ] }, @@ -999,6 +1014,96 @@ "\n", "Please refer to the Python API documentation (`__init__` method) to see how to use custom status schema and flags directory." ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initializing `PipestatManager` without results schema\n", + "\n", + "Starting with `pipestat 0.0.3`, it is possible to initialize the `PipestatManager` object without specifying the results schema file. This feature comes in handy if `PipestatManager` is created with a sole intent to monitor pipeline status.\n", + "\n", + "Here's an example:" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/var/folders/3f/0wj7rs2144l9zsgxd3jn5nxc0000gn/T/tmpq81inuvn.yaml\n" + ] + } + ], + "source": [ + "_, temp_file_no_schema = mkstemp(suffix=\".yaml\")\n", + "print(temp_file_no_schema)\n", + "\n", + "psm_no_schema = pipestat.PipestatManager(\n", + " namespace=\"test_no_schema\", results_file_path=temp_file_no_schema\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "As you can see, the object has been initialized successfully. Obviously, a schema still has to be defined in order to report and retrieve results: requiring results to be predefined is what lets pipestat clients rely on the schema to gather all the possible results metadata, which is a big advantage.\n", + "\n", + "Moreover, initialization with a database backend is impossible without a schema, because relational database tables must have their columns defined when they are created." ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Results schema not found. The schema is required to report results. It needs to be supplied to the object constructor.\n" + ] + } + ], + "source": [ + "try:\n", + " psm_no_schema.report(record_identifier=\"sample1\", values={\"key\": \"val\"})\n", + "except pipestat.SchemaNotFoundError as e:\n", + " print(e)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "As mentioned above, the pipeline status management capabilities are supported with no results schema defined:" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'running'" + ] + }, + "execution_count": 34, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "psm_no_schema.set_status(status_identifier=\"running\", record_identifier=\"sample1\")\n", + "psm_no_schema.get_status(record_identifier=\"sample1\")" + ] + } ], "metadata": { diff --git a/pipestat/__init__.py b/pipestat/__init__.py index 5f5dd037..02cdabbb 100644 --- a/pipestat/__init__.py +++ b/pipestat/__init__.py @@ -1,9 +1,10 @@ # Project configuration, particularly for logging.
import logmuse + from ._version import __version__ -from .pipestat import * from .helpers import * +from .pipestat import * __classes__ = ["PipestatManager"] __all__ = __classes__ diff --git a/pipestat/__main__.py b/pipestat/__main__.py index 454bb49c..0edeef18 100644 --- a/pipestat/__main__.py +++ b/pipestat/__main__.py @@ -1,6 +1,7 @@ -from .pipestat import main import sys +from .cli import main + if __name__ == "__main__": try: sys.exit(main()) diff --git a/pipestat/_version.py b/pipestat/_version.py index 3b93d0be..27fdca49 100644 --- a/pipestat/_version.py +++ b/pipestat/_version.py @@ -1 +1 @@ -__version__ = "0.0.2" +__version__ = "0.0.3" diff --git a/pipestat/argparser.py b/pipestat/argparser.py new file mode 100644 index 00000000..1418664f --- /dev/null +++ b/pipestat/argparser.py @@ -0,0 +1,227 @@ +import argparse + +from ubiquerg import VersionInHelpParser + +from ._version import __version__ +from .const import * + + +def _env_txt(arg_name): + """ + Check if env var set and produce text + """ + arg_val = os.environ.get(ENV_VARS[arg_name]) + txt = f"If not provided '{ENV_VARS[arg_name]}' env var will be used. " + return txt + ( + "Currently not set" if arg_val is None else f"Currently set to: {arg_val}" + ) + + +def build_argparser(desc): + """ + Builds argument parser. + :param str desc: additional description to print in help + :return argparse.ArgumentParser + """ + banner = "%(prog)s - report pipeline results" + additional_description = desc + parser = VersionInHelpParser( + version=__version__, description=banner, epilog=additional_description + ) + + subparsers = parser.add_subparsers(dest="command") + + def add_subparser(cmd, msg, subparsers): + return subparsers.add_parser( + cmd, + description=msg, + help=msg, + formatter_class=lambda prog: argparse.HelpFormatter( + prog, max_help_position=40, width=90 + ), + ) + + sps = {} + # common arguments + for cmd in SUBPARSER_MSGS.keys(): + sps[cmd] = add_subparser(cmd, SUBPARSER_MSGS[cmd], subparsers) + # status is nested and status subcommands require config path + if cmd == STATUS_CMD: + continue + sps[cmd].add_argument( + "-n", + "--namespace", + type=str, + metavar="N", + help=f"Name of the pipeline to report result for. {_env_txt('namespace')}", + ) + + status_subparser = sps[STATUS_CMD] + status_subparsers = status_subparser.add_subparsers(dest="subcommand") + + status_sps = {} + for cmd, desc in STATUS_SUBPARSER_MESSAGES.items(): + status_sps[cmd] = add_subparser(cmd, desc, status_subparsers) + status_sps[cmd].add_argument( + "-n", + "--namespace", + type=str, + metavar="N", + help=f"Name of the pipeline to report result for. {_env_txt('namespace')}", + ) + if cmd == STATUS_SET_CMD: + status_sps[cmd].add_argument( + "-i", + "--status-identifier", + metavar="S", + help="Status identifier to use", + required=True, + ) + status_sps[cmd].add_argument( + "-f", + "--results-file", + type=str, + metavar="F", + help=f"Path to the YAML file where the results will be stored. " + f"This file will be used as {PKG_NAME} backend and to restore" + f" the reported results across sessions", + ) + status_sps[cmd].add_argument( + "-c", + "--config", + type=str, + metavar="C", + help=f"Path to the YAML configuration file. 
{_env_txt('config')}", + ) + status_sps[cmd].add_argument( + "-a", + "--database-only", + action="store_true", + help="Whether the reported data should not be stored in the memory," + " only in the database.", + ) + status_sps[cmd].add_argument( + "-s", + "--schema", + type=str, + metavar="S", + help=f"Path to the schema that defines the results that can be reported. {_env_txt('schema')}", + ) + status_sps[cmd].add_argument( + "--status-schema", + type=str, + metavar="ST", + help=f"Path to the status schema. " + f"Default will be used if not provided: {STATUS_SCHEMA}", + ) + status_sps[cmd].add_argument( + "--flag-dir", + type=str, + metavar="FD", + help=f"Path to the flag directory in case YAML file is " + f"the pipestat backend.", + ) + status_sps[cmd].add_argument( + "-r", + "--record-identifier", + type=str, + metavar="R", + help=f"ID of the record to report the result for. {_env_txt('record_identifier')}", + ) + + # remove, report and inspect + for cmd in [REMOVE_CMD, REPORT_CMD, INSPECT_CMD, RETRIEVE_CMD]: + sps[cmd].add_argument( + "-f", + "--results-file", + type=str, + metavar="F", + help=f"Path to the YAML file where the results will be stored. " + f"This file will be used as {PKG_NAME} backend and to restore" + f" the reported results across sessions", + ) + sps[cmd].add_argument( + "-c", + "--config", + type=str, + metavar="C", + help=f"Path to the YAML configuration file. {_env_txt('config')}", + ) + sps[cmd].add_argument( + "-a", + "--database-only", + action="store_true", + help="Whether the reported data should not be stored in the memory," + " only in the database.", + ) + sps[cmd].add_argument( + "-s", + "--schema", + type=str, + metavar="S", + help=f"Path to the schema that defines the results that can be reported. {_env_txt('schema')}", + ) + sps[cmd].add_argument( + "--status-schema", + type=str, + metavar="ST", + help=f"Path to the status schema. " + f"Default will be used if not provided: {STATUS_SCHEMA}", + ) + sps[cmd].add_argument( + "--flag-dir", + type=str, + metavar="FD", + help=f"Path to the flag directory in case YAML file is " + f"the pipestat backend.", + ) + + # remove and report + for cmd in [REMOVE_CMD, REPORT_CMD, RETRIEVE_CMD]: + sps[cmd].add_argument( + "-i", + "--result-identifier", + required=True, + type=str, + metavar="I", + help="ID of the result to report; needs to be defined in the schema", + ) + sps[cmd].add_argument( + "-r", + "--record-identifier", + type=str, + metavar="R", + help=f"ID of the record to report the result for. 
{_env_txt('record_identifier')}", + ) + + # report + sps[REPORT_CMD].add_argument( + "-v", + "--value", + required=True, + metavar="V", + help="Value of the result to report", + ) + + sps[REPORT_CMD].add_argument( + "-o", + "--overwrite", + action="store_true", + help="Whether the result should override existing ones in " + "case of name clashes", + ) + + sps[REPORT_CMD].add_argument( + "-t", + "--try-convert", + action="store_true", + help="Whether to try to convert the reported value into required " + "class in case it does not meet the schema requirements", + ) + + # inspect + sps[INSPECT_CMD].add_argument( + "-d", "--data", action="store_true", help="Whether to display the data" + ) + + return parser diff --git a/pipestat/cli.py b/pipestat/cli.py new file mode 100644 index 00000000..ab52c931 --- /dev/null +++ b/pipestat/cli.py @@ -0,0 +1,88 @@ +import sys +from logging import getLogger + +import logmuse +from ubiquerg import expandpath + +from .argparser import build_argparser +from .const import * +from .pipestat import PipestatManager + +_LOGGER = getLogger(PKG_NAME) + + +def main(): + """ Primary workflow """ + from inspect import getdoc + + parser = logmuse.add_logging_options(build_argparser(getdoc(PipestatManager))) + args = parser.parse_args() + if args.command is None: + parser.print_help(sys.stderr) + sys.exit(1) + global _LOGGER + _LOGGER = logmuse.logger_via_cli(args, make_root=True) + _LOGGER.debug("Args namespace:\n{}".format(args)) + if args.config and not args.schema: + parser.error("the following arguments are required: -s/--schema") + psm = PipestatManager( + namespace=args.namespace, + schema_path=args.schema, + results_file_path=args.results_file, + config=args.config, + database_only=args.database_only, + status_schema_path=args.status_schema, + flag_file_dir=args.flag_dir, + ) + if args.command == REPORT_CMD: + value = args.value + result_metadata = psm.schema[args.result_identifier] + if ( + result_metadata[SCHEMA_TYPE_KEY] + in [ + "object", + "image", + "file", + ] + and os.path.exists(expandpath(value)) + ): + from json import load + + _LOGGER.info( + f"Reading JSON file with object type value: {expandpath(value)}" + ) + with open(expandpath(value), "r") as json_file: + value = load(json_file) + psm.report( + record_identifier=args.record_identifier, + values={args.result_identifier: value}, + force_overwrite=args.overwrite, + strict_type=not args.try_convert, + ) + if args.command == INSPECT_CMD: + print("\n") + print(psm) + if args.data and not args.database_only: + print("\nData:") + print(psm.data) + if args.command == REMOVE_CMD: + psm.remove( + result_identifier=args.result_identifier, + record_identifier=args.record_identifier, + ) + if args.command == RETRIEVE_CMD: + print( + psm.retrieve( + result_identifier=args.result_identifier, + record_identifier=args.record_identifier, + ) + ) + if args.command == STATUS_CMD: + if args.subcommand == STATUS_GET_CMD: + print(psm.get_status(record_identifier=args.record_identifier)) + if args.subcommand == STATUS_SET_CMD: + psm.set_status( + status_identifier=args.status_identifier, + record_identifier=args.record_identifier, + ) + sys.exit(0) diff --git a/pipestat/const.py b/pipestat/const.py index bbb9d79d..3c72253a 100644 --- a/pipestat/const.py +++ b/pipestat/const.py @@ -1,6 +1,5 @@ import os - PKG_NAME = "pipestat" LOCK_PREFIX = "lock."
REPORT_CMD = "report" diff --git a/pipestat/exceptions.py b/pipestat/exceptions.py index 359f614f..8d4f4522 100644 --- a/pipestat/exceptions.py +++ b/pipestat/exceptions.py @@ -1,6 +1,7 @@ """ Package exception types """ import abc + from .const import * __all__ = [ diff --git a/pipestat/helpers.py b/pipestat/helpers.py index b872e5e6..44af23a3 100644 --- a/pipestat/helpers.py +++ b/pipestat/helpers.py @@ -1,243 +1,16 @@ import logging -import os -import jsonschema -import argparse +from re import findall +import jsonschema from oyaml import safe_load -from re import findall from psycopg2 import sql - -from ubiquerg import VersionInHelpParser, expandpath +from ubiquerg import expandpath from .const import * -from ._version import __version__ -from .exceptions import * - _LOGGER = logging.getLogger(__name__) -def _env_txt(arg_name): - """ - Check if env var set and produce text - """ - arg_val = os.environ.get(ENV_VARS[arg_name]) - txt = f"If not provided '{ENV_VARS[arg_name]}' env var will be used. " - return txt + ( - "Currently not set" if arg_val is None else f"Currently set to: {arg_val}" - ) - - -def build_argparser(desc): - """ - Builds argument parser. - :param str desc: additional description to print in help - :return argparse.ArgumentParser - """ - banner = "%(prog)s - report pipeline results" - additional_description = desc - parser = VersionInHelpParser( - version=__version__, description=banner, epilog=additional_description - ) - - subparsers = parser.add_subparsers(dest="command") - - def add_subparser(cmd, msg, subparsers): - return subparsers.add_parser( - cmd, - description=msg, - help=msg, - formatter_class=lambda prog: argparse.HelpFormatter( - prog, max_help_position=40, width=90 - ), - ) - - sps = {} - # common arguments - for cmd in SUBPARSER_MSGS.keys(): - sps[cmd] = add_subparser(cmd, SUBPARSER_MSGS[cmd], subparsers) - # status is nested and status subcommands require config path - if cmd == STATUS_CMD: - continue - sps[cmd].add_argument( - "-n", - "--namespace", - type=str, - metavar="N", - help=f"Name of the pipeline to report result for. {_env_txt('namespace')}", - ) - - status_subparser = sps[STATUS_CMD] - status_subparsers = status_subparser.add_subparsers(dest="subcommand") - - status_sps = {} - for cmd, desc in STATUS_SUBPARSER_MESSAGES.items(): - status_sps[cmd] = add_subparser(cmd, desc, status_subparsers) - status_sps[cmd].add_argument( - "-n", - "--namespace", - type=str, - metavar="N", - help=f"Name of the pipeline to report result for. {_env_txt('namespace')}", - ) - if cmd == STATUS_SET_CMD: - status_sps[cmd].add_argument( - "-i", - "--status-identifier", - metavar="S", - help="Status identifier to use", - required=True, - ) - status_sps[cmd].add_argument( - "-f", - "--results-file", - type=str, - metavar="F", - help=f"Path to the YAML file where the results will be stored. " - f"This file will be used as {PKG_NAME} backend and to restore" - f" the reported results across sessions", - ) - status_sps[cmd].add_argument( - "-c", - "--config", - type=str, - metavar="C", - help=f"Path to the YAML configuration file. {_env_txt('config')}", - ) - status_sps[cmd].add_argument( - "-a", - "--database-only", - action="store_true", - help="Whether the reported data should not be stored in the memory," - " only in the database.", - ) - status_sps[cmd].add_argument( - "-s", - "--schema", - type=str, - metavar="S", - help=f"Path to the schema that defines the results that can be reported. 
{_env_txt('schema')}", - ) - status_sps[cmd].add_argument( - "--status-schema", - type=str, - metavar="ST", - help=f"Path to the status schema. " - f"Default will be used if not provided: {STATUS_SCHEMA}", - ) - status_sps[cmd].add_argument( - "--flag-dir", - type=str, - metavar="FD", - help=f"Path to the flag directory in case YAML file is " - f"the pipestat backend.", - ) - status_sps[cmd].add_argument( - "-r", - "--record-identifier", - type=str, - metavar="R", - help=f"ID of the record to report the result for. {_env_txt('record_identifier')}", - ) - - # remove, report and inspect - for cmd in [REMOVE_CMD, REPORT_CMD, INSPECT_CMD, RETRIEVE_CMD]: - sps[cmd].add_argument( - "-f", - "--results-file", - type=str, - metavar="F", - help=f"Path to the YAML file where the results will be stored. " - f"This file will be used as {PKG_NAME} backend and to restore" - f" the reported results across sessions", - ) - sps[cmd].add_argument( - "-c", - "--config", - type=str, - metavar="C", - help=f"Path to the YAML configuration file. {_env_txt('config')}", - ) - sps[cmd].add_argument( - "-a", - "--database-only", - action="store_true", - help="Whether the reported data should not be stored in the memory," - " only in the database.", - ) - sps[cmd].add_argument( - "-s", - "--schema", - type=str, - metavar="S", - help=f"Path to the schema that defines the results that can be reported. {_env_txt('schema')}", - ) - sps[cmd].add_argument( - "--status-schema", - type=str, - metavar="ST", - help=f"Path to the status schema. " - f"Default will be used if not provided: {STATUS_SCHEMA}", - ) - sps[cmd].add_argument( - "--flag-dir", - type=str, - metavar="FD", - help=f"Path to the flag directory in case YAML file is " - f"the pipestat backend.", - ) - - # remove and report - for cmd in [REMOVE_CMD, REPORT_CMD, RETRIEVE_CMD]: - sps[cmd].add_argument( - "-i", - "--result-identifier", - required=True, - type=str, - metavar="I", - help="ID of the result to report; needs to be defined in the schema", - ) - sps[cmd].add_argument( - "-r", - "--record-identifier", - type=str, - metavar="R", - help=f"ID of the record to report the result for. 
{_env_txt('record_identifier')}", - ) - - # report - sps[REPORT_CMD].add_argument( - "-v", - "--value", - required=True, - metavar="V", - help="Value of the result to report", - ) - - sps[REPORT_CMD].add_argument( - "-o", - "--overwrite", - action="store_true", - help="Whether the result should override existing ones in " - "case of name clashes", - ) - - sps[REPORT_CMD].add_argument( - "-t", - "--try-convert", - action="store_true", - help="Whether to try to convert the reported value into reqiuired " - "class in case it does not meet the schema requirements", - ) - - # inspect - sps[INSPECT_CMD].add_argument( - "-d", "--data", action="store_true", help="Whether to display the data" - ) - - return parser - - def schema_to_columns(schema): """ Get a list of database table columns from a schema diff --git a/pipestat/pipestat.py b/pipestat/pipestat.py index f68f86d5..224c92ce 100644 --- a/pipestat/pipestat.py +++ b/pipestat/pipestat.py @@ -1,15 +1,14 @@ -import psycopg2 -from psycopg2.extras import DictCursor, Json -from psycopg2.extensions import connection -from logging import getLogger from contextlib import contextmanager from copy import deepcopy +from logging import getLogger -import sys -import logmuse +import psycopg2 from attmap import PathExAttMap as PXAM +from psycopg2.extensions import connection +from psycopg2.extras import DictCursor, Json from ubiquerg import create_lock, remove_lock from yacman import YacAttMap + from .const import * from .exceptions import * from .helpers import * @@ -46,9 +45,18 @@ class PipestatManager(dict): pipeline status and can be backed by either a YAML-formatted file or a PostgreSQL database. """ - def __init__(self, namespace=None, record_identifier=None, schema_path=None, - results_file_path=None, database_only=False, config=None, - status_schema_path=None, flag_file_dir=None): + + def __init__( + self, + namespace=None, + record_identifier=None, + schema_path=None, + results_file_path=None, + database_only=False, + config=None, + status_schema_path=None, + flag_file_dir=None, + ): """ Initialize the object @@ -68,6 +76,7 @@ def __init__(self, namespace=None, record_identifier=None, schema_path=None, :param str status_schema_path: path to the status schema that formalizes the status flags structure """ + def _check_cfg_key(cfg, key): if key not in cfg: _LOGGER.warning(f"Key '{key}' not found in config") @@ -97,14 +106,14 @@ def _select_value(arg_name, arg_value, cfg, strict=True, env_var=None): if env_var is not None: arg = os.getenv(env_var, None) if arg is not None: - _LOGGER.debug( - f"Value '{arg}' sourced from '{env_var}' env var") + _LOGGER.debug(f"Value '{arg}' sourced from '{env_var}' env var") return expandpath(arg) if strict: raise PipestatError( f"Value for the required '{arg_name}' argument could not be" f" determined. Provide it in the config or pass to the " - f"object constructor.") + f"object constructor." 
+ ) return return cfg[arg_name] @@ -120,68 +129,111 @@ def _select_value(arg_name, arg_value, cfg, strict=True, env_var=None): elif isinstance(config, dict): self[CONFIG_KEY] = YacAttMap(entries=config) else: - raise TypeError("database_config has to be either path to the " - "file to read or a dict") + raise TypeError( + "database_config has to be either path to the " + "file to read or a dict" + ) # validate config # TODO: uncomment below when this gets released: https://github.com/pepkit/attmap/pull/75 # cfg = self[CONFIG_KEY].to_dict(expand=True) # _, cfg_schema = read_yaml_data(CFG_SCHEMA, "config schema") # validate(cfg, cfg_schema) - self[NAME_KEY] = _select_value("namespace", namespace, self[CONFIG_KEY], env_var=ENV_VARS["namespace"]) + self[NAME_KEY] = _select_value( + "namespace", namespace, self[CONFIG_KEY], env_var=ENV_VARS["namespace"] + ) self[RECORD_ID_KEY] = _select_value( - "record_identifier", record_identifier, self[CONFIG_KEY], False, ENV_VARS["record_identifier"]) + "record_identifier", + record_identifier, + self[CONFIG_KEY], + False, + ENV_VARS["record_identifier"], + ) self[DB_ONLY_KEY] = database_only # read results schema - schema_path = _mk_abs_via_cfg( - _select_value("schema_path", schema_path, self[CONFIG_KEY], - env_var=ENV_VARS["schema"]), - config) - _, self[SCHEMA_KEY] = read_yaml_data(schema_path, "schema") - self.validate_schema() - self._schema_path = schema_path + self._schema_path = _select_value( + "schema_path", + schema_path, + self[CONFIG_KEY], + False, + env_var=ENV_VARS["schema"], + ) + if self._schema_path is not None: + _, self[SCHEMA_KEY] = read_yaml_data( + _mk_abs_via_cfg(self._schema_path, config), "schema" + ) + self.validate_schema() + # determine the highlighted results + # the conditional in the list comprehension below needs to be a + # literal "== True" so that if evaluates to False if 'highlight' + # value is just "truthy", not True + self[HIGHLIGHTED_KEY] = [ + k + for k, v in self.schema.items() + if "highlight" in v and v["highlight"] == True + ] + if self[HIGHLIGHTED_KEY]: + assert isinstance(self[HIGHLIGHTED_KEY], list), TypeError( + f"highlighted results specification " + f"({self[HIGHLIGHTED_KEY]}) has to be a list" + ) # read status schema - status_schema_path = _mk_abs_via_cfg(_select_value( - "status_schema_path", status_schema_path, - self[CONFIG_KEY], False, env_var=ENV_VARS["status_schema"]), config) or STATUS_SCHEMA + status_schema_path = ( + _mk_abs_via_cfg( + _select_value( + "status_schema_path", + status_schema_path, + self[CONFIG_KEY], + False, + env_var=ENV_VARS["status_schema"], + ), + config, + ) + or STATUS_SCHEMA + ) self[STATUS_SCHEMA_SOURCE_KEY], self[STATUS_SCHEMA_KEY] = read_yaml_data( - status_schema_path, "status schema") - # determine the highlighted results - # the conditional in the list comprehension below needs to be a - # literal "== True" so that if evaluates to False if 'highlight' - # value is just "truthy", not True - self[HIGHLIGHTED_KEY] = [k for k, v in self.schema.items() - if "highlight" in v and v["highlight"] == True] - if self[HIGHLIGHTED_KEY]: - assert isinstance(self[HIGHLIGHTED_KEY], list), \ - TypeError(f"highlighted results specification " - f"({self[HIGHLIGHTED_KEY]}) has to be a list") + status_schema_path, "status schema" + ) # determine results file results_file_path = _mk_abs_via_cfg( - _select_value("results_file_path", results_file_path, - self[CONFIG_KEY], False, ENV_VARS["results_file"]), - config) + _select_value( + "results_file_path", + results_file_path, + 
self[CONFIG_KEY], + False, + ENV_VARS["results_file"], + ), + config, + ) if results_file_path: if self[DB_ONLY_KEY]: - raise ValueError("Running in database only mode does not make " - "sense with a YAML file as a backend.") + raise ValueError( + "Running in database only mode does not make " + "sense with a YAML file as a backend." + ) self[FILE_KEY] = results_file_path self._init_results_file() flag_file_dir = _select_value( - "flag_file_dir", flag_file_dir, self[CONFIG_KEY], False) \ - or os.path.dirname(self.file) + "flag_file_dir", flag_file_dir, self[CONFIG_KEY], False + ) or os.path.dirname(self.file) self[STATUS_FILE_DIR] = _mk_abs_via_cfg(flag_file_dir, self.config_path) elif CFG_DATABASE_KEY in self[CONFIG_KEY]: - if not all([_check_cfg_key(self[CONFIG_KEY][CFG_DATABASE_KEY], key) - for key in DB_CREDENTIALS]): - raise MissingConfigDataError("Must specify all database login " - "credentials or result_file_path") + if not all( + [ + _check_cfg_key(self[CONFIG_KEY][CFG_DATABASE_KEY], key) + for key in DB_CREDENTIALS + ] + ): + raise MissingConfigDataError( + "Must specify all database login " "credentials or result_file_path" + ) self[DATA_KEY] = YacAttMap() self._init_postgres_table() self._init_status_table() else: - raise MissingConfigDataError("Must specify either database login " - "credentials or a YAML file path") + raise MissingConfigDataError( + "Must specify either database login " "credentials or a YAML file path" + ) def __str__(self): """ @@ -191,13 +243,13 @@ def __str__(self): """ res = f"{self.__class__.__name__} ({self.namespace})" res += "\nBackend: {}".format( - f"file ({self.file})" if self.file else "PostgreSQL") + f"file ({self.file})" if self.file else "PostgreSQL" + ) res += f"\nResults schema source: {self.schema_path}" res += f"\nStatus schema source: {self.status_schema_source}" res += f"\nRecords count: {self.record_count}" if self.highlighted_results: - res += \ - f"\nHighlighted results: {', '.join(self.highlighted_results)}" + res += f"\nHighlighted results: {', '.join(self.highlighted_results)}" return res def _get_flag_file(self, record_identifier=None): @@ -208,11 +260,14 @@ def _get_flag_file(self, record_identifier=None): :return str: path to the status flag file """ from glob import glob + r_id = self._strict_record_id(record_identifier) if self.file is None: return if self.file is not None: - regex = os.path.join(self[STATUS_FILE_DIR], f"{self.namespace}_{r_id}_*.flag") + regex = os.path.join( + self[STATUS_FILE_DIR], f"{self.namespace}_{r_id}_*.flag" + ) file_list = glob(regex) if len(file_list) > 1: _LOGGER.warning("Multiple flag files found") @@ -239,8 +294,11 @@ def record_count(self): :return int: number of records reported """ - return len(self.data[self.namespace]) if self.file \ + return ( + len(self.data[self.namespace]) + if self.file else self._count_rows(self.namespace) + ) @property def namespace(self): @@ -347,8 +405,9 @@ def db_cursor(self): try: if not self.check_connection(): self.establish_postgres_connection() - with self[DB_CONNECTION_KEY] as c, \ - c.cursor(cursor_factory=LoggingCursor) as cur: + with self[DB_CONNECTION_KEY] as c, c.cursor( + cursor_factory=LoggingCursor + ) as cur: yield cur except Exception: raise @@ -364,10 +423,12 @@ def get_status(self, record_identifier=None): r_id = self._strict_record_id(record_identifier) if self.file is None: with self.db_cursor as cur: - query = sql.SQL(f"SELECT {STATUS} " - f"FROM {f'{self.namespace}_{STATUS}'} " - f"WHERE {RECORD_ID}=%s") - cur.execute(query, (r_id, )) + query 
= sql.SQL( + f"SELECT {STATUS} " + f"FROM {f'{self.namespace}_{STATUS}'} " + f"WHERE {RECORD_ID}=%s" + ) + cur.execute(query, (r_id,)) result = cur.fetchone() return result[0] if result is not None else None else: @@ -378,7 +439,8 @@ def get_status(self, record_identifier=None): return status _LOGGER.debug( f"Could not determine status for '{r_id}' record. " - f"No flags found in: {self[STATUS_FILE_DIR]}") + f"No flags found in: {self[STATUS_FILE_DIR]}" + ) return def _get_attr(self, attr): @@ -405,8 +467,7 @@ def _table_to_dict(self): for res_id, val in record.items(): if val is not None: self._report_data_element( - record_identifier=record_id, - values={res_id: val} + record_identifier=record_id, values={res_id: val} ) def _init_postgres_table(self): @@ -420,13 +481,11 @@ def _init_postgres_table(self): if self.schema is None: raise SchemaNotFoundError("initialize the database table") if self._check_table_exists(table_name=self.namespace): - _LOGGER.debug( - f"Table '{self.namespace}' already exists in the database") + _LOGGER.debug(f"Table '{self.namespace}' already exists in the database") if not self[DB_ONLY_KEY]: self._table_to_dict() return False - _LOGGER.info( - f"Initializing '{self.namespace}' table in '{PKG_NAME}' database") + _LOGGER.info(f"Initializing '{self.namespace}' table in '{PKG_NAME}' database") columns = FIXED_COLUMNS + schema_to_columns(schema=self.schema) self._create_table(table_name=self.namespace, columns=columns) return True @@ -447,8 +506,9 @@ def _init_status_table(self): status_table_name = f"{self.namespace}_{STATUS}" # self._create_status_type() if not self._check_table_exists(table_name=status_table_name): - _LOGGER.info(f"Initializing '{status_table_name}' table in " - f"'{PKG_NAME}' database") + _LOGGER.info( + f"Initializing '{status_table_name}' table in " f"'{PKG_NAME}' database" + ) self._create_table(status_table_name, STATUS_TABLE_COLUMNS) def _create_table(self, table_name, columns): @@ -473,7 +533,7 @@ def _init_results_file(self): """ if not os.path.exists(self.file): _LOGGER.info(f"Initializing results file '{self.file}'") - data = YacAttMap(entries={self.namespace: '{}'}) + data = YacAttMap(entries={self.namespace: "{}"}) data.write(filepath=self.file) data.make_readonly() self[DATA_KEY] = data @@ -484,7 +544,8 @@ def _init_results_file(self): if filtered and self.namespace not in filtered: raise PipestatDatabaseError( f"'{self.file}' is already used to report results for " - f"other namespace: {filtered[0]}") + f"other namespace: {filtered[0]}" + ) self[DATA_KEY] = data return False @@ -499,7 +560,7 @@ def _check_table_exists(self, table_name): cur.execute( "SELECT EXISTS(SELECT * FROM information_schema.tables " "WHERE table_name=%s)", - (table_name, ) + (table_name,), ) return cur.fetchone()[0] @@ -512,9 +573,11 @@ def _check_record(self, condition_col, condition_val, table_name): :return bool: whether any record matches the provided condition """ with self.db_cursor as cur: - statement = f"SELECT EXISTS(SELECT 1 from {table_name} " \ - f"WHERE {condition_col}=%s)" - cur.execute(statement, (condition_val, )) + statement = ( + f"SELECT EXISTS(SELECT 1 from {table_name} " + f"WHERE {condition_col}=%s)" + ) + cur.execute(statement, (condition_val,)) return cur.fetchone()[0] def _count_rows(self, table_name): @@ -526,7 +589,8 @@ def _count_rows(self, table_name): """ with self.db_cursor as cur: statement = sql.SQL("SELECT COUNT(*) FROM {}").format( - sql.Identifier(table_name)) + sql.Identifier(table_name) + ) cur.execute(statement) return 
cur.fetchall()[0][0] @@ -545,27 +609,29 @@ def _report_postgres(self, value, record_identifier, table_name=None): :return int: id of the row just inserted """ table_name = table_name or self.namespace - if not self._check_record(condition_col=RECORD_ID, - condition_val=record_identifier, - table_name=table_name): + if not self._check_record( + condition_col=RECORD_ID, + condition_val=record_identifier, + table_name=table_name, + ): with self.db_cursor as cur: cur.execute( f"INSERT INTO {table_name} ({RECORD_ID}) VALUES (%s)", - (record_identifier, ) + (record_identifier,), ) # prep a list of SQL objects with column-named value placeholders - columns = sql.SQL(",").join([sql.SQL("{}=%({})s").format( - sql.Identifier(k), sql.SQL(k)) for k in list(value.keys())]) + columns = sql.SQL(",").join( + [ + sql.SQL("{}=%({})s").format(sql.Identifier(k), sql.SQL(k)) + for k in list(value.keys()) + ] + ) # construct the query template to execute - query = sql.SQL("UPDATE {n} SET {c} WHERE {id}=%({id})s RETURNING id").\ - format( - n=sql.Identifier(table_name), - c=columns, - id=sql.SQL(RECORD_ID) + query = sql.SQL("UPDATE {n} SET {c} WHERE {id}=%({id})s RETURNING id").format( + n=sql.Identifier(table_name), c=columns, id=sql.SQL(RECORD_ID) ) # preprocess the values, dict -> Json - values = {k: Json(v) if isinstance(v, dict) else v - for k, v in value.items()} + values = {k: Json(v) if isinstance(v, dict) else v for k, v in value.items()} # add record_identifier column, which is specified outside of values values.update({RECORD_ID: record_identifier}) with self.db_cursor as cur: @@ -589,7 +655,8 @@ def clear_status(self, record_identifier=None, flag_names=None): removed = [] for f in flag_names: path_flag_file = self.get_status_flag_path( - status_identifier=f, record_identifier=r_id) + status_identifier=f, record_identifier=r_id + ) try: os.remove(path_flag_file) except: @@ -600,14 +667,18 @@ def clear_status(self, record_identifier=None, flag_names=None): return removed else: removed = self.get_status(r_id) - status_table_name = f'{self.namespace}_{STATUS}' + status_table_name = f"{self.namespace}_{STATUS}" with self.db_cursor as cur: try: - cur.execute(f"DELETE FROM {status_table_name} WHERE " - f"{RECORD_ID}='{r_id}'") + cur.execute( + f"DELETE FROM {status_table_name} WHERE " + f"{RECORD_ID}='{r_id}'" + ) except Exception as e: - _LOGGER.error(f"Could not remove the status from the " - f"database. Exception: {e}") + _LOGGER.error( + f"Could not remove the status from the " + f"database. Exception: {e}" + ) return [] else: return [removed] @@ -626,8 +697,9 @@ def get_status_flag_path(self, status_identifier, record_identifier=None): # DB as the backend return r_id = self._strict_record_id(record_identifier) - return os.path.join(self[STATUS_FILE_DIR], - f"{self.namespace}_{r_id}_{status_identifier}.flag") + return os.path.join( + self[STATUS_FILE_DIR], f"{self.namespace}_{r_id}_{status_identifier}.flag" + ) def set_status(self, status_identifier, record_identifier=None): """ @@ -647,7 +719,8 @@ def set_status(self, status_identifier, record_identifier=None): if status_identifier not in known_status_identifiers: raise PipestatError( f"'{status_identifier}' is not a defined status identifier. 
" - f"These are allowed: {known_status_identifiers}") + f"These are allowed: {known_status_identifiers}" + ) prev_status = self.get_status(r_id) if self.file is not None: if prev_status: @@ -663,17 +736,19 @@ def set_status(self, status_identifier, record_identifier=None): self._report_postgres( value={STATUS: status_identifier}, record_identifier=r_id, - table_name=f"{self.namespace}_{STATUS}" + table_name=f"{self.namespace}_{STATUS}", ) except Exception as e: - _LOGGER.error(f"Could not insert into the status table. " - f"Exception: {e}") + _LOGGER.error( + f"Could not insert into the status table. " f"Exception: {e}" + ) raise if prev_status: _LOGGER.debug( - f"Changed status from '{prev_status}' to '{status_identifier}'") + f"Changed status from '{prev_status}' to '{status_identifier}'" + ) - def check_result_exists(self, result_identifier, record_identifier=None): + def check_result_exists(self, result_identifier, record_identifier=None): """ Check if the result has been reported @@ -684,7 +759,8 @@ def check_result_exists(self, result_identifier, record_identifier=None): """ record_identifier = self._strict_record_id(record_identifier) return self._check_which_results_exist( - results=[result_identifier], rid=record_identifier) + results=[result_identifier], rid=record_identifier + ) def _check_which_results_exist(self, results, rid=None): """ @@ -699,15 +775,18 @@ def _check_which_results_exist(self, results, rid=None): existing = [] for r in results: if not self[DB_ONLY_KEY]: - if self.namespace in self.data and rid in self.data[self.namespace] \ - and r in self.data[self.namespace][rid]: + if ( + self.namespace in self.data + and rid in self.data[self.namespace] + and r in self.data[self.namespace][rid] + ): existing.append(r) else: with self.db_cursor as cur: try: cur.execute( f"SELECT {r} FROM {self.namespace} WHERE {RECORD_ID}=%s", - (rid, ) + (rid,), ) except Exception: continue @@ -730,15 +809,24 @@ def check_record_exists(self, record_identifier=None): cur.execute( f"SELECT exists(SELECT 1 from {self.namespace} " f"WHERE {RECORD_ID}=%s)", - (record_identifier, ) + (record_identifier,), ) return cur.fetchone() - if self.namespace in self.data and record_identifier in self.data[self.namespace]: + if ( + self.namespace in self.data + and record_identifier in self.data[self.namespace] + ): return True return False - def report(self, values, record_identifier=None, force_overwrite=False, - strict_type=True, return_id=False): + def report( + self, + values, + record_identifier=None, + force_overwrite=False, + strict_type=True, + return_id=False, + ): """ Report a result. 
@@ -759,28 +847,31 @@ def report(self, values, record_identifier=None, force_overwrite=False, if return_id and self.file is not None: raise NotImplementedError( "There is no way to return the updated object ID while using " - "results file as the object backend") + "results file as the object backend" + ) if self.schema is None: raise SchemaNotFoundError("report results") result_identifiers = list(values.keys()) self.assert_results_defined(results=result_identifiers) existing = self._check_which_results_exist( - rid=record_identifier, results=result_identifiers) + rid=record_identifier, results=result_identifiers + ) if existing: _LOGGER.warning( - f"These results exist for '{record_identifier}': {existing}") + f"These results exist for '{record_identifier}': {existing}" + ) if not force_overwrite: return False _LOGGER.info(f"Overwriting existing results: {existing}") for r in result_identifiers: - validate_type(value=values[r], schema=self.result_schemas[r], - strict_type=strict_type) + validate_type( + value=values[r], schema=self.result_schemas[r], strict_type=strict_type + ) if self.file is not None: self.data.make_writable() if not self[DB_ONLY_KEY]: self._report_data_element( - record_identifier=record_identifier, - values=values + record_identifier=record_identifier, values=values ) if self.file is not None: self.data.write() @@ -788,21 +879,22 @@ def report(self, values, record_identifier=None, force_overwrite=False, else: try: updated_ids = self._report_postgres( - record_identifier=record_identifier, - value=values + record_identifier=record_identifier, value=values ) except Exception as e: - _LOGGER.error(f"Could not insert the result into the database. " - f"Exception: {e}") + _LOGGER.error( + f"Could not insert the result into the database. " f"Exception: {e}" + ) if not self[DB_ONLY_KEY]: for r in result_identifiers: del self[DATA_KEY][self.namespace][record_identifier][r] raise nl = "\n" - rep_strs = [f'{k}: {v}' for k, v in values.items()] + rep_strs = [f"{k}: {v}" for k, v in values.items()] _LOGGER.info( f"Reported records for '{record_identifier}' in '{self.namespace}' " - f"namespace:{nl} - {(nl + ' - ').join(rep_strs)}") + f"namespace:{nl} - {(nl + ' - ').join(rep_strs)}" + ) return True if not return_id else updated_ids def _report_data_element(self, record_identifier, values): @@ -820,8 +912,9 @@ def _report_data_element(self, record_identifier, values): for res_id, val in values.items(): self[DATA_KEY][self.namespace][record_identifier][res_id] = val - def select(self, columns=None, condition=None, condition_val=None, - offset=None, limit=None): + def select( + self, columns=None, condition=None, condition_val=None, offset=None, limit=None + ): """ Get all the contents from the selected table, possibly restricted by the provided condition. @@ -842,15 +935,16 @@ def select(self, columns=None, condition=None, condition_val=None, "Selection is not supported on objects backed by results files." " Use 'retrieve' method instead." 
             )
-        condition, condition_val = \
-            preprocess_condition_pair(condition, condition_val)
+        condition, condition_val = preprocess_condition_pair(condition, condition_val)
         if not columns:
             columns = sql.SQL("*")
         else:
-            columns = sql.SQL(',').join(
-                [sql.Identifier(x) for x in mk_list_of_str(columns)])
+            columns = sql.SQL(",").join(
+                [sql.Identifier(x) for x in mk_list_of_str(columns)]
+            )
         statement = sql.SQL("SELECT {} FROM {}").format(
-            columns, sql.Identifier(self.namespace))
+            columns, sql.Identifier(self.namespace)
+        )
         if condition:
             statement += sql.SQL(" WHERE ")
             statement += condition
@@ -876,35 +970,35 @@ def retrieve(self, record_identifier=None, result_identifier=None):
         if self[DB_ONLY_KEY]:
             if result_identifier is not None:
                 existing = self._check_which_results_exist(
-                    results=[result_identifier],
-                    rid=record_identifier
+                    results=[result_identifier], rid=record_identifier
                 )
                 if not existing:
                     raise PipestatDatabaseError(
                         f"Result '{result_identifier}' not found for record "
-                        f"'{record_identifier}'")
+                        f"'{record_identifier}'"
+                    )
             with self.db_cursor as cur:
-                query = sql.SQL(f"SELECT {result_identifier or '*'} "
-                                f"FROM {self.namespace} WHERE {RECORD_ID}=%s")
-                cur.execute(query, (record_identifier, ))
+                query = sql.SQL(
+                    f"SELECT {result_identifier or '*'} "
+                    f"FROM {self.namespace} WHERE {RECORD_ID}=%s"
+                )
+                cur.execute(query, (record_identifier,))
                 result = cur.fetchall()
             if len(result) > 0:
                 if result_identifier is None:
-                    return {k: v for k, v in dict(result[0]).items()
-                            if v is not None}
+                    return {k: v for k, v in dict(result[0]).items() if v is not None}
                 return dict(result[0])[result_identifier]
-            raise PipestatDatabaseError(
-                f"Record '{record_identifier}' not found")
+            raise PipestatDatabaseError(f"Record '{record_identifier}' not found")
         else:
             if record_identifier not in self.data[self.namespace]:
-                raise PipestatDatabaseError(
-                    f"Record '{record_identifier}' not found")
+                raise PipestatDatabaseError(f"Record '{record_identifier}' not found")
             if result_identifier is None:
                 return self.data[self.namespace][record_identifier].to_dict()
             if result_identifier not in self.data[self.namespace][record_identifier]:
                 raise PipestatDatabaseError(
                     f"Result '{result_identifier}' not found for record "
-                    f"'{record_identifier}'")
+                    f"'{record_identifier}'"
+                )
             return self.data[self.namespace][record_identifier][result_identifier]
 
     def remove(self, record_identifier=None, result_identifier=None):
@@ -925,9 +1019,12 @@ def remove(self, record_identifier=None, result_identifier=None):
             _LOGGER.error(f"Record '{record_identifier}' not found")
             return False
         if result_identifier and not self.check_result_exists(
-                result_identifier, record_identifier):
-            _LOGGER.error(f"'{result_identifier}' has not been reported for "
-                          f"'{record_identifier}'")
+            result_identifier, record_identifier
+        ):
+            _LOGGER.error(
+                f"'{result_identifier}' has not been reported for "
+                f"'{record_identifier}'"
+            )
             return False
         if self.file:
             self.data.make_writable()
@@ -936,14 +1033,19 @@ def remove(self, record_identifier=None, result_identifier=None):
                 _LOGGER.info(f"Removing '{record_identifier}' record")
                 del self[DATA_KEY][self.namespace][record_identifier]
             else:
-                val_backup = \
-                    self[DATA_KEY][self.namespace][record_identifier][result_identifier]
+                val_backup = self[DATA_KEY][self.namespace][record_identifier][
+                    result_identifier
+                ]
                 del self[DATA_KEY][self.namespace][record_identifier][result_identifier]
-                _LOGGER.info(f"Removed result '{result_identifier}' for record "
-                             f"'{record_identifier}' from '{self.namespace}' namespace")
+                _LOGGER.info(
+                    f"Removed result '{result_identifier}' for record "
+                    f"'{record_identifier}' from '{self.namespace}' namespace"
+                )
                 if not self[DATA_KEY][self.namespace][record_identifier]:
-                    _LOGGER.info(f"Last result removed for '{record_identifier}'. "
-                                 f"Removing the record")
+                    _LOGGER.info(
+                        f"Last result removed for '{record_identifier}'. "
+                        f"Removing the record"
+                    )
                     del self[DATA_KEY][self.namespace][record_identifier]
                     rm_record = True
         if self.file:
@@ -953,13 +1055,16 @@ def remove(self, record_identifier=None, result_identifier=None):
             if rm_record:
                 try:
                     with self.db_cursor as cur:
-                        cur.execute(f"DELETE FROM {self.namespace} WHERE "
-                                    f"{RECORD_ID}='{record_identifier}'")
+                        cur.execute(
+                            f"DELETE FROM {self.namespace} WHERE "
+                            f"{RECORD_ID}='{record_identifier}'"
+                        )
                 except Exception as e:
-                    _LOGGER.error(f"Could not remove the result from the "
-                                  f"database. Exception: {e}")
-                    self[DATA_KEY][self.namespace].setdefault(
-                        record_identifier, PXAM())
+                    _LOGGER.error(
+                        f"Could not remove the result from the "
+                        f"database. Exception: {e}"
+                    )
+                    self[DATA_KEY][self.namespace].setdefault(record_identifier, PXAM())
                     raise
                 return True
             try:
@@ -969,10 +1074,13 @@ def remove(self, record_identifier=None, result_identifier=None):
                         f"WHERE {RECORD_ID}='{record_identifier}'"
                     )
             except Exception as e:
-                _LOGGER.error(f"Could not remove the result from the database. "
-                              f"Exception: {e}")
+                _LOGGER.error(
+                    f"Could not remove the result from the database. " f"Exception: {e}"
+                )
                 if not self[DB_ONLY_KEY]:
-                    self[DATA_KEY][self.namespace][record_identifier][result_identifier] = val_backup
+                    self[DATA_KEY][self.namespace][record_identifier][
+                        result_identifier
+                    ] = val_backup
                 raise
         return True
 
@@ -982,6 +1090,7 @@ def validate_schema(self):
 
         :raises SchemaError: if any schema format issue is detected
         """
+
         def _recursively_replace_custom_types(s):
             """
             Replace the custom types in pipestat schema with canonical types
@@ -990,29 +1099,35 @@ def _recursively_replace_custom_types(s):
             :return dict: schema with types replaced
             """
             for k, v in s.items():
-                assert SCHEMA_TYPE_KEY in v, \
-                    SchemaError(f"Result '{k}' is missing '{SCHEMA_TYPE_KEY}' key")
+                assert SCHEMA_TYPE_KEY in v, SchemaError(
+                    f"Result '{k}' is missing '{SCHEMA_TYPE_KEY}' key"
+                )
                 if v[SCHEMA_TYPE_KEY] == "object" and SCHEMA_PROP_KEY in s[k]:
                     _recursively_replace_custom_types(s[k][SCHEMA_PROP_KEY])
                 if v[SCHEMA_TYPE_KEY] in CANONICAL_TYPES.keys():
                     s.setdefault(k, {})
                     s[k].setdefault(SCHEMA_PROP_KEY, {})
                     s[k][SCHEMA_PROP_KEY].update(
-                        CANONICAL_TYPES[v[SCHEMA_TYPE_KEY]][SCHEMA_PROP_KEY])
+                        CANONICAL_TYPES[v[SCHEMA_TYPE_KEY]][SCHEMA_PROP_KEY]
+                    )
                     s[k].setdefault("required", [])
                     s[k]["required"].extend(
-                        CANONICAL_TYPES[v[SCHEMA_TYPE_KEY]]["required"])
-                    s[k][SCHEMA_TYPE_KEY] = \
-                        CANONICAL_TYPES[v[SCHEMA_TYPE_KEY]][SCHEMA_TYPE_KEY]
+                        CANONICAL_TYPES[v[SCHEMA_TYPE_KEY]]["required"]
+                    )
+                    s[k][SCHEMA_TYPE_KEY] = CANONICAL_TYPES[v[SCHEMA_TYPE_KEY]][
+                        SCHEMA_TYPE_KEY
+                    ]
             return s
 
         schema = deepcopy(self.schema)
         _LOGGER.debug(f"Validating input schema")
-        assert isinstance(schema, dict), \
-            SchemaError(f"The schema has to be a {dict().__class__.__name__}")
+        assert isinstance(schema, dict), SchemaError(
+            f"The schema has to be a {dict().__class__.__name__}"
+        )
         for col_name in RESERVED_COLNAMES:
-            assert col_name not in schema.keys(), \
-                PipestatError(f"'{col_name}' is an identifier reserved by pipestat")
+            assert col_name not in schema.keys(), PipestatError(
+                f"'{col_name}' is an identifier reserved by pipestat"
+            )
         self[RES_SCHEMAS_KEY] = {}
         schema = _recursively_replace_custom_types(schema)
         self[RES_SCHEMAS_KEY] = schema
@@ -1029,7 +1144,8 @@ def assert_results_defined(self, results):
         for r in results:
             assert r in known_results, SchemaError(
                 f"'{r}' is not a known result. Results defined in the "
-                f"schema are: {list(known_results)}.")
+                f"schema are: {list(known_results)}."
+            )
 
     def check_connection(self):
         """
@@ -1038,10 +1154,12 @@ def check_connection(self):
 
         :return bool: whether the connection has been established
         """
         if self.file is not None:
-            raise PipestatDatabaseError(f"The {self.__class__.__name__} object "
-                                        f"is not backed by a database")
+            raise PipestatDatabaseError(
+                f"The {self.__class__.__name__} object " f"is not backed by a database"
+            )
         if DB_CONNECTION_KEY in self and isinstance(
-                self[DB_CONNECTION_KEY], psycopg2.extensions.connection):
+            self[DB_CONNECTION_KEY], psycopg2.extensions.connection
+        ):
             return True
         return False
@@ -1053,26 +1171,32 @@ def establish_postgres_connection(self, suppress=False):
         :return bool: whether the connection has been established successfully
         """
         if self.check_connection():
-            raise PipestatDatabaseError(f"Connection is already established: "
-                                        f"{self[DB_CONNECTION_KEY].info.host}")
+            raise PipestatDatabaseError(
+                f"Connection is already established: "
+                f"{self[DB_CONNECTION_KEY].info.host}"
+            )
         try:
             self[DB_CONNECTION_KEY] = psycopg2.connect(
                 dbname=self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_NAME_KEY],
                 user=self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_USER_KEY],
                 password=self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_PASSWORD_KEY],
                 host=self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY],
-                port=self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_PORT_KEY]
+                port=self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_PORT_KEY],
             )
         except psycopg2.Error as e:
-            _LOGGER.error(f"Could not connect to: "
-                          f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}")
+            _LOGGER.error(
+                f"Could not connect to: "
+                f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}"
+            )
             _LOGGER.info(f"Caught error: {e}")
             if suppress:
                 return False
             raise
         else:
-            _LOGGER.debug(f"Established connection with PostgreSQL: "
-                          f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}")
+            _LOGGER.debug(
+                f"Established connection with PostgreSQL: "
+                f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}"
+            )
             return True
 
     def close_postgres_connection(self):
@@ -1082,11 +1206,14 @@
         if not self.check_connection():
             raise PipestatDatabaseError(
                 f"The connection has not been established: "
-                f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}")
+                f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}"
+            )
         self[DB_CONNECTION_KEY].close()
         del self[DB_CONNECTION_KEY]
-        _LOGGER.debug(f"Closed connection with PostgreSQL: "
-                      f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}")
+        _LOGGER.debug(
+            f"Closed connection with PostgreSQL: "
+            f"{self[CONFIG_KEY][CFG_DATABASE_KEY][CFG_HOST_KEY]}"
+        )
 
     def _strict_record_id(self, forced_value=None):
         """
@@ -1104,69 +1231,3 @@ def _strict_record_id(self, forced_value=None):
             f"the action on. Either in the {self.__class__.__name__} "
             f"constructor or as an argument to the method."
         )
-
-
-def main():
-    """ Primary workflow """
-    from inspect import getdoc
-    parser = logmuse.add_logging_options(
-        build_argparser(getdoc(PipestatManager)))
-    args = parser.parse_args()
-    if args.command is None:
-        parser.print_help(sys.stderr)
-        sys.exit(1)
-    global _LOGGER
-    _LOGGER = logmuse.logger_via_cli(args, make_root=True)
-    _LOGGER.debug("Args namespace:\n{}".format(args))
-    if args.config and not args.schema:
-        parser.error("the following arguments are required: -s/--schema")
-    psm = PipestatManager(
-        namespace=args.namespace,
-        schema_path=args.schema,
-        results_file_path=args.results_file,
-        config=args.config,
-        database_only=args.database_only,
-        status_schema_path=args.status_schema,
-        flag_file_dir=args.flag_dir
-    )
-    if args.command == REPORT_CMD:
-        value = args.value
-        result_metadata = psm.schema[args.result_identifier]
-        if result_metadata[SCHEMA_TYPE_KEY] in ["object", "image", "file"] \
-                and os.path.exists(expandpath(value)):
-            from json import load
-            _LOGGER.info(f"Reading JSON file with object type value: "
-                         f"{expandpath(value)}")
-            with open(expandpath(value), "r") as json_file:
-                value = load(json_file)
-        psm.report(
-            record_identifier=args.record_identifier,
-            values={args.result_identifier: value},
-            force_overwrite=args.overwrite,
-            strict_type=not args.try_convert
-        )
-    if args.command == INSPECT_CMD:
-        print("\n")
-        print(psm)
-        if args.data and not args.database_only:
-            print("\nData:")
-            print(psm.data)
-    if args.command == REMOVE_CMD:
-        psm.remove(
-            result_identifier=args.result_identifier,
-            record_identifier=args.record_identifier
-        )
-    if args.command == RETRIEVE_CMD:
-        print(psm.retrieve(
-            result_identifier=args.result_identifier,
-            record_identifier=args.record_identifier
-        ))
-    if args.command == STATUS_CMD:
-        if args.subcommand == STATUS_GET_CMD:
-            print(psm.get_status(record_identifier=args.record_identifier))
-        if args.subcommand == STATUS_SET_CMD:
-            psm.set_status(
-                status_identifier=args.status_identifier,
-                record_identifier=args.record_identifier
-            )
-    sys.exit(0)
diff --git a/setup.py b/setup.py
index 0080e5ac..d7e92fa8 100644
--- a/setup.py
+++ b/setup.py
@@ -1,9 +1,10 @@
 #! /usr/bin/env python
 
 import os
-from setuptools import setup
 import sys
 
+from setuptools import setup
+
 PACKAGE = "pipestat"
 
 # Additional keyword arguments for setup().
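The reformatted select() and retrieve() code above builds its statements by composing identifiers with psycopg2's sql module and passing values separately so the driver can escape them. A minimal sketch of that pattern, not pipestat's actual API surface; the table and column names below are placeholders:

from psycopg2 import sql


def build_select(namespace, columns=None):
    # "*" when no columns are requested, otherwise a comma-joined list of
    # safely quoted identifiers
    cols = (
        sql.SQL("*")
        if not columns
        else sql.SQL(",").join([sql.Identifier(c) for c in columns])
    )
    # identifiers go through sql.Identifier/sql.SQL; values stay as %s placeholders
    return sql.SQL("SELECT {} FROM {} WHERE {}=%s").format(
        cols, sql.Identifier(namespace), sql.Identifier("record_identifier")
    )


# usage, assuming an open psycopg2 connection `conn`:
# with conn.cursor() as cur:
#     cur.execute(build_select("test", ["number_of_things"]), ("sample1",))
#     rows = cur.fetchall()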
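The test added below (test_report_requires_schema) pins down when a missing schema surfaces: with a file backend the manager can be constructed without a schema, but report() then raises SchemaNotFoundError, while with the database backend the error is raised at construction time, because the table columns are derived from the schema. A sketch of the file-backend case, assuming the public API exercised by the tests; the results file path is illustrative:

from pipestat import PipestatManager
from pipestat.exceptions import SchemaNotFoundError

# no schema_path given, file backend: construction succeeds
psm = PipestatManager(namespace="test", results_file_path="results.yaml")
try:
    psm.report(record_identifier="sample3", values={"name_of_something": "test_name"})
except SchemaNotFoundError:
    pass  # expected: results cannot be reported without a schema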
diff --git a/tests/conftest.py b/tests/conftest.py
index d0c2a97d..21591e74 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,4 +1,5 @@
 import os
+
 import pytest
 
 
@@ -35,3 +36,8 @@ def recursive_schema_file_path(data_path):
 @pytest.fixture
 def config_file_path(data_path):
     return os.path.join(data_path, "config.yaml")
+
+
+@pytest.fixture
+def config_no_schema_file_path(data_path):
+    return os.path.join(data_path, "config_no_schema.yaml")
diff --git a/tests/data/config_no_schema.yaml b/tests/data/config_no_schema.yaml
new file mode 100644
index 00000000..a82d0c92
--- /dev/null
+++ b/tests/data/config_no_schema.yaml
@@ -0,0 +1,10 @@
+namespace: test
+record_identifier: sample1
+#schema_path: sample_output_schema.yaml #$HOME/Desktop/sample_output_schema.yaml
+database:
+  name: pipestat-test
+  user: postgres
+  password: pipestat-password
+  host: localhost
+  port: 5432
+#results_file_path: results_file.yaml #$HOME/Desktop/results.yaml
diff --git a/tests/test_pipestat.py b/tests/test_pipestat.py
index 1df7dd54..2a2907f5 100644
--- a/tests/test_pipestat.py
+++ b/tests/test_pipestat.py
@@ -1,15 +1,16 @@
-import pytest
-from _pytest.monkeypatch import monkeypatch
 import os
-from tempfile import mkdtemp
-from yaml import dump
 from collections import Mapping
+from tempfile import mkdtemp
+
+import pytest
+from _pytest.monkeypatch import monkeypatch
 from jsonschema import ValidationError
 from psycopg2 import Error as psycopg2Error
+from yaml import dump
 
-from pipestat.exceptions import *
-from pipestat.const import *
 from pipestat import PipestatManager
+from pipestat.const import *
+from pipestat.exceptions import *
 from pipestat.helpers import read_yaml_data
 
 
@@ -108,6 +109,7 @@ def test_obj_creation_db(self, config_file_path):
         """ Object constructor works with database as backend"""
         assert isinstance(PipestatManager(config=config_file_path), PipestatManager)
 
+    @pytest.mark.xfail(reason="schema is no longer required to init the object")
     def test_schema_req(self, results_file_path):
         """
         Object constructor raises exception if schema is not provided
@@ -244,6 +246,45 @@ def test_report_basic(
         if backend == "file":
             is_in_file(results_file_path, str(list(val.values())[0]))
 
+    @pytest.mark.parametrize(
+        ["rec_id", "val"],
+        [
+            ("sample3", {"name_of_something": "test_name"}),
+        ],
+    )
+    @pytest.mark.parametrize("backend", ["file", "db"])
+    def test_report_requires_schema(
+        self,
+        rec_id,
+        val,
+        config_no_schema_file_path,
+        results_file_path,
+        backend,
+    ):
+        """
+        If a schema is not provided at object instantiation stage, SchemaNotFoundError
+        is raised when the report method is called with a file backend.
+
+        In case of the DB backend, the error is raised at object
+        instantiation stage, since there is no way to init a relational DB table
+        with no columns predefined
+        """
+        args = dict(namespace="test")
+        backend_data = (
+            {"config": config_no_schema_file_path}
+            if backend == "db"
+            else {"results_file_path": results_file_path}
+        )
+        args.update(backend_data)
+        if backend == "db":
+            with pytest.raises(SchemaNotFoundError):
+                psm = PipestatManager(**args)
+        else:
+            psm = PipestatManager(**args)
+        if backend == "file":
+            with pytest.raises(SchemaNotFoundError):
+                psm.report(record_identifier=rec_id, values=val)
+
     @pytest.mark.parametrize(
         ["rec_id", "val"],
         [("sample1", {"number_of_things": 2}), ("sample2", {"number_of_things": 1})],