diff --git a/.github/workflows/cli-coverage.yml b/.github/workflows/cli-coverage.yml new file mode 100644 index 00000000..bc2c64cc --- /dev/null +++ b/.github/workflows/cli-coverage.yml @@ -0,0 +1,61 @@ +name: Pipestat Test Coverage + +on: + push: + branches: [master, dev] + pull_request: + branches: [ dev ] + +jobs: + cli-coverage-report: + strategy: + matrix: + python-version: ["3.10"] + os: [ ubuntu-latest ] # can't use macOS when using service containers or container jobs + runs-on: ${{ matrix.os }} + services: + postgres: + image: postgres + env: # needs to match DB config in: ../../tests/data/config.yaml + POSTGRES_USER: postgres + POSTGRES_PASSWORD: pipestat-password + POSTGRES_DB: pipestat-test + POSTGRES_HOST: localhost + ports: + - 5432:5432 + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + + steps: + - uses: actions/checkout@v2 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dev dependencies + run: if [ -f requirements/requirements-dev.txt ]; then pip install -r requirements/requirements-dev.txt; fi + + - name: Install test dependencies + run: if [ -f requirements/requirements-test.txt ]; then pip install -r requirements/requirements-test.txt; fi + + - name: Install backend dependencies + run: if [ -f requirements/requirements-db-backend.txt ]; then pip install -r requirements/requirements-db-backend.txt; fi + + - name: Install pipestat + run: python -m pip install . + + - name: Run tests + run: coverage run -m pytest + + - name: build coverage + run: coverage html -i + + - run: smokeshow upload htmlcov + env: + SMOKESHOW_GITHUB_STATUS_DESCRIPTION: Coverage {coverage-percentage} + #SMOKESHOW_GITHUB_COVERAGE_THRESHOLD: 50 + SMOKESHOW_GITHUB_CONTEXT: coverage + SMOKESHOW_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SMOKESHOW_GITHUB_PR_HEAD_SHA: ${{ github.event.pull_request.head.sha }} + SMOKESHOW_AUTH_KEY: ${{ secrets.SMOKESHOW_AUTH_KEY }} diff --git a/.github/workflows/run-pytest.yml b/.github/workflows/run-pytest.yml index 63152ac5..c81fae1e 100644 --- a/.github/workflows/run-pytest.yml +++ b/.github/workflows/run-pytest.yml @@ -39,14 +39,17 @@ jobs: - name: Install test dependencies run: if [ -f requirements/requirements-test.txt ]; then pip install -r requirements/requirements-test.txt; fi + - name: Install backend dependencies + run: if [ -f requirements/requirements-db-backend.txt ]; then pip install -r requirements/requirements-db-backend.txt; fi + - name: Install pipestat run: python -m pip install . - name: Run pytest tests run: pytest tests -x -vv --cov=./ --cov-report=xml - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v1 - with: - file: ./coverage.xml - name: py-${{ matrix.python-version }}-${{ matrix.os }} +# - name: Upload coverage to Codecov +# uses: codecov/codecov-action@v1 +# with: +# file: ./coverage.xml +# name: py-${{ matrix.python-version }}-${{ matrix.os }} diff --git a/docs/changelog.md b/docs/changelog.md index f84678af..60c6bdb6 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,6 +2,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format. 
+## [0.9.0] - 2024-04-19 +### Fixed +- Bug with rm_record for filebackend +- Bug when using record_identifier via env variable and the CLI +### Added +- Added results history and history retrieval for both file and db backends via `retrieve_history` [#177](https://github.com/pepkit/pipestat/issues/177). +- Added `remove_record` to Pipestat manager object (it is no longer only on backend classes) +- Added `meta` key to each record for the file backend +- db backend will now create an additional sql history table +- Reporting history is toggleable +### Changed +- Removing the last result no longer removes the entire record. +- `pipestat_created_time, and `pipestat_modified_time` now live under the `meta` key. +- `history` lives under the `meta` key for the filebackend. + ## [0.8.2] - 2024-02-22 ### Changed - Changed yacman requirement and using FutureYamlConfigManager. diff --git a/pipestat/_version.py b/pipestat/_version.py index deded324..3e2f46a3 100644 --- a/pipestat/_version.py +++ b/pipestat/_version.py @@ -1 +1 @@ -__version__ = "0.8.2" +__version__ = "0.9.0" diff --git a/pipestat/argparser.py b/pipestat/argparser.py index 8072f8d6..f3793056 100644 --- a/pipestat/argparser.py +++ b/pipestat/argparser.py @@ -11,6 +11,7 @@ INSPECT_CMD = "inspect" REMOVE_CMD = "remove" RETRIEVE_CMD = "retrieve" +HISTORY_CMD = "history" STATUS_CMD = "status" INIT_CMD = "init" SUMMARIZE_CMD = "summarize" @@ -26,6 +27,7 @@ SUMMARIZE_CMD: "Generates HTML Report", LINK_CMD: "Create symlinks of reported files", SERVE_CMD: "Initializes pipestatreader API", + HISTORY_CMD: "Retrieve history of reported results for one record identifier", } STATUS_GET_CMD = "get" @@ -169,7 +171,7 @@ def add_subparser( ) # remove, report and inspect - for cmd in [REMOVE_CMD, REPORT_CMD, INSPECT_CMD, RETRIEVE_CMD]: + for cmd in [REMOVE_CMD, REPORT_CMD, INSPECT_CMD, RETRIEVE_CMD, HISTORY_CMD]: sps[cmd].add_argument( "-f", "--results-file", @@ -239,13 +241,21 @@ def add_subparser( help=f"ID of the record to report the result for. {_env_txt('record_identifier')}", ) - sps[RETRIEVE_CMD].add_argument( - "-r", - "--record-identifier", - type=str, - metavar="R", - help=f"ID of the record to report the result for. {_env_txt('record_identifier')}", - ) + for cmd in [RETRIEVE_CMD, HISTORY_CMD]: + sps[cmd].add_argument( + "-i", + "--result-identifier", + type=str, + metavar="I", + help="ID of the result to report; needs to be defined in the schema", + ) + sps[cmd].add_argument( + "-r", + "--record-identifier", + type=str, + metavar="R", + help=f"ID of the record to report the result for. 
{_env_txt('record_identifier')}", + ) # report sps[REPORT_CMD].add_argument( diff --git a/pipestat/backends/abstract.py b/pipestat/backends/abstract.py index 7871a560..48db5426 100644 --- a/pipestat/backends/abstract.py +++ b/pipestat/backends/abstract.py @@ -16,7 +16,7 @@ class PipestatBackend(ABC): """Abstract class representing a pipestat backend""" def __init__(self, pipeline_type): - _LOGGER.warning("Initialize PipestatBackend") + _LOGGER.debug("Initialize PipestatBackend") self.pipeline_type = pipeline_type def assert_results_defined(self, results: List[str], pipeline_type: str) -> None: @@ -153,6 +153,7 @@ def report( record_identifier: str, force_overwrite: bool = False, result_formatter: Optional[staticmethod] = None, + history_enabled: bool = True, ) -> str: _LOGGER.warning("Not implemented yet for this backend") diff --git a/pipestat/backends/db_backend/db_parsed_schema.py b/pipestat/backends/db_backend/db_parsed_schema.py index 891b7492..b909b5d6 100644 --- a/pipestat/backends/db_backend/db_parsed_schema.py +++ b/pipestat/backends/db_backend/db_parsed_schema.py @@ -196,6 +196,49 @@ def _get_data_type(type_name): def file_like_table_name(self): return self._table_name("files") + def build_history_model(self, pipeline_type): + """Creates model for history ORM + :param str pipeline_type: project or sample-level pipeline + :return model: (model, table_name) + """ + if pipeline_type == "project": + history_table_name = self.project_table_name + "_history" + data = self.project_level_data + main_table_id = self.project_table_name + ".id" + + elif pipeline_type == "sample": + history_table_name = self.sample_table_name + "_history" + data = self.sample_level_data + main_table_id = self.sample_table_name + ".id" + + else: + raise PipestatError( + f"Building model requires pipeline type. 
Provided type: '{pipeline_type}' " + ) + + if not self.sample_level_data and not self.project_level_data: + return None + + field_defs = self._make_field_definitions(data, require_type=True) + field_defs = self._add_status_field(field_defs) + field_defs = self._add_record_identifier_field(field_defs) + field_defs = self._add_id_field(field_defs) + field_defs = self._add_pipeline_name_field(field_defs) + field_defs = self._add_created_time_field(field_defs) + field_defs = self._add_modified_time_field(field_defs) + + field_defs["source_record_id"] = ( + int, + Field( + default=None, + foreign_key=main_table_id, + ), + ) + + history_model = _create_model(history_table_name, **field_defs) + + return history_model, history_table_name + def build_model(self, pipeline_type): if pipeline_type == "project": data = self.project_level_data diff --git a/pipestat/backends/db_backend/dbbackend.py b/pipestat/backends/db_backend/dbbackend.py index 9d9d5211..1ccd1130 100644 --- a/pipestat/backends/db_backend/dbbackend.py +++ b/pipestat/backends/db_backend/dbbackend.py @@ -1,3 +1,4 @@ +import copy import datetime from logging import getLogger from contextlib import contextmanager @@ -62,6 +63,8 @@ def __init__( self.result_formatter = result_formatter self.orms = self._create_orms(pipeline_type=pipeline_type) + self.history_table = self._create_history_orms(pipeline_type=pipeline_type) + self.table_name = list(self.orms.keys())[0] SQLModel.metadata.create_all(self._engine) @@ -274,9 +277,28 @@ def remove_record( if rm_record: try: ORMClass = self.get_model(table_name=self.table_name) + ORMClass_History = self.history_table[list(self.history_table.keys())[0]] if self.check_record_exists( record_identifier=record_identifier, ): + with self.session as s: + source_record_id = ( + s.exec( + sql_select(ORMClass).where( + getattr(ORMClass, RECORD_IDENTIFIER) == record_identifier + ) + ) + .first() + .id + ) + linked_records = s.exec( + sql_select(ORMClass_History).where( + getattr(ORMClass_History, "source_record_id") == source_record_id + ) + ).all() + for r in linked_records: + s.delete(r) + s.commit() with self.session as s: record = s.exec( sql_select(ORMClass).where( @@ -284,7 +306,6 @@ def remove_record( ) ).first() s.delete(record) - s.commit() else: raise RecordNotFoundError(f"Record '{record_identifier}' not found") @@ -298,8 +319,9 @@ def report( self, values: Dict[str, Any], record_identifier: str, - force_overwrite: bool = False, + force_overwrite: bool = True, result_formatter: Optional[staticmethod] = None, + history_enabled: bool = True, ) -> Union[List[str], bool]: """ Update the value of a result in a current namespace. 
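
Note on the `build_history_model` addition above: the model is created dynamically with SQLModel, but a hand-written equivalent for a sample-level pipeline would look roughly like the sketch below. Table and result-column names here are illustrative placeholders, not taken from a real schema.

```python
# Rough hand-written equivalent of the dynamically generated history model.
# Real table names come from the parsed schema ("<main_table>" and
# "<main_table>_history"); "number_of_things" stands in for schema results.
from datetime import datetime
from typing import Optional

from sqlmodel import Field, SQLModel


class SampleHistory(SQLModel, table=True):
    __tablename__ = "my_pipeline__sample_history"

    id: Optional[int] = Field(default=None, primary_key=True)
    record_identifier: Optional[str] = None
    pipeline_name: Optional[str] = None
    status: Optional[str] = None
    pipestat_created_time: Optional[datetime] = None
    pipestat_modified_time: Optional[datetime] = None
    number_of_things: Optional[int] = None  # one column per schema result
    # each history row points back at the live record it snapshots
    source_record_id: Optional[int] = Field(
        default=None, foreign_key="my_pipeline__sample.id"
    )
```

On an overwriting `report()`, the backend dumps the existing row's attributes into this table (minus `id`, plus `source_record_id`) before updating the live record, which is what `retrieve_history_db()` later queries.
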
@@ -337,6 +359,7 @@ def report( try: ORMClass = self.get_model(table_name=self.table_name) + ORMClass_History = self.history_table[list(self.history_table.keys())[0]] values.update({RECORD_IDENTIFIER: record_identifier}) if not self.check_record_exists( @@ -356,11 +379,24 @@ def report( getattr(ORMClass, RECORD_IDENTIFIER) == record_identifier ) ).first() - + old_record_attributes = record_to_update.model_dump() values.update({MODIFIED_TIME: datetime.datetime.now()}) for result_id, result_value in values.items(): setattr(record_to_update, result_id, result_value) s.commit() + if history_enabled: + if "id" in old_record_attributes: + del old_record_attributes["id"] + with self.session as s: + source_record = s.exec( + sql_select(ORMClass).where( + getattr(ORMClass, RECORD_IDENTIFIER) == record_identifier + ) + ).first() + new_record_history = ORMClass_History(**old_record_attributes) + new_record_history.source_record_id = source_record.id + s.add(new_record_history) + s.commit() for res_id, val in values.items(): results_formatted.append( @@ -411,9 +447,10 @@ def select_records( total_count = len(s.exec(sql_select(ORM)).all()) if columns is not None: + columns = copy.deepcopy(columns) for i in ["id", "record_identifier"]: # Must add id, need it for cursor if i not in columns: - columns = [i] + columns + columns.insert(0, i) try: statement = sql_select(*[getattr(ORM, column) for column in columns]).order_by( ORM.id @@ -467,6 +504,105 @@ def select_records( return records_dict + def retrieve_history_db( + self, + record_identifier: str, + result_identifier: Optional[Union[str, List[str]]] = None, + ) -> Dict[str, Any]: + """ + + :param record_identifier: single record_identifier + :param result_identifier: single or list of result identifiers + :return: dict records_dict = { + "history": List[Dict[{key, Any}]], + } + """ + + record_identifier = record_identifier or self.record_identifier + + ORMClass = self.get_model(table_name=self.table_name) + ORMClass_History = self.history_table[list(self.history_table.keys())[0]] + + if not result_identifier: + columns = None + else: + if isinstance(result_identifier, str): + columns = [result_identifier] + elif isinstance(result_identifier, list): + columns = copy.deepcopy(result_identifier) + else: + raise ValueError("Result identifier must be a str or list[str]") + for i in ["id", MODIFIED_TIME]: + if i not in columns: + columns.insert(0, i) + + if not self.check_record_exists( + record_identifier=record_identifier, + ): + raise RecordNotFoundError(f"{record_identifier} does not exist.") + else: + with self.session as s: + source_record_id = ( + s.exec( + sql_select(ORMClass).where( + getattr(ORMClass, RECORD_IDENTIFIER) == record_identifier + ) + ) + .first() + .id + ) + if columns is not None: + try: + statement = sql_select( + *[getattr(ORMClass_History, column) for column in columns] + ).order_by(ORMClass_History.id) + except AttributeError: + raise ColumnNotFoundError( + msg=f"One of the supplied columns does not exist in current table: {columns}" + ) + else: + statement = sql_select(ORMClass_History).order_by(ORMClass_History.id) + + statement = statement.where( + getattr(ORMClass_History, "source_record_id") == source_record_id + ) + + history_records = s.exec(statement).all() + + end_results = [] + + # SQL model returns either a SQLModelMetaCLass OR a sqlalchemy Row. 
+ # We must create a dictionary containing the record before returning + + if not columns: + end_results = [r.model_dump() for r in history_records] + + else: + for record in history_records: + record_dict = dict(record._mapping) + end_results.append(record_dict) + + # This next step is to process the results such that they will match output similar to the filebackend + + collected_keys = [] + new_history_dict = {} + for result in end_results: + for key, value in result.items(): + if key == MODIFIED_TIME: + continue + elif value: + if key not in new_history_dict: + collected_keys.append(key) + new_history_dict[key] = {result[MODIFIED_TIME]: value} + else: + new_history_dict[key].update({result[MODIFIED_TIME]: value}) + + records_dict = { + "history": new_history_dict, + } + + return records_dict + def select_distinct( self, columns: Union[str, List[str]], @@ -527,7 +663,10 @@ def set_status( _LOGGER.debug(f"Changed status from '{prev_status}' to '{status_identifier}'") def _create_orms(self, pipeline_type): - """Create ORMs.""" + """Create ORMs. + :param str pipeline_type: project or sample-level pipeline + :return dict: {table_name: model} + """ _LOGGER.debug(f"Creating models for '{self.pipeline_name}' table in '{PKG_NAME}' database") model = self.parsed_schema.build_model(pipeline_type=pipeline_type) table_name = self.parsed_schema._table_name(pipeline_type) @@ -539,6 +678,19 @@ def _create_orms(self, pipeline_type): f"Neither project nor samples model could be built from schema source: {self.status_schema_source}" ) + def _create_history_orms(self, pipeline_type): + """Creates the additional ORMs for auditing result modifications + :param str pipeline_type: project or sample-level pipeline + :return dict: {table_name: model} + """ + model, table_name = self.parsed_schema.build_history_model(pipeline_type=pipeline_type) + if model: + return {table_name: model} + else: + raise SchemaError( + f"Neither project nor samples model could be built from schema source: {self.status_schema_source}" + ) + @property def _engine(self): """Access the database engine backing this manager.""" diff --git a/pipestat/backends/file_backend/filebackend.py b/pipestat/backends/file_backend/filebackend.py index 125a48cb..62ba6601 100644 --- a/pipestat/backends/file_backend/filebackend.py +++ b/pipestat/backends/file_backend/filebackend.py @@ -18,7 +18,7 @@ from ...exceptions import UnrecognizedStatusError, PipestatError from ...backends.abstract import PipestatBackend -from ...const import DATE_FORMAT, PKG_NAME, CREATED_TIME, MODIFIED_TIME +from ...const import DATE_FORMAT, PKG_NAME, CREATED_TIME, MODIFIED_TIME, META_KEY, HISTORY_KEY _LOGGER = getLogger(PKG_NAME) @@ -253,11 +253,18 @@ def remove( return False if rm_record: + # NOTE: THIS CURRENTLY REMOVES ALL HISTORY OF THE RECORD AS WELL self.remove_record( record_identifier=record_identifier, rm_record=rm_record, ) else: + self._modify_history( + data=self._data[self.pipeline_name][self.pipeline_type][record_identifier], + res_id=result_identifier, + time=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + value="", + ) del self._data[self.pipeline_name][self.pipeline_type][record_identifier][ result_identifier ] @@ -278,19 +285,9 @@ def remove( remaining_attributes = list( self._data[self.pipeline_name][self.pipeline_type][record_identifier].keys() ) - if ( - len(remaining_attributes) == 2 - and CREATED_TIME in remaining_attributes - and MODIFIED_TIME in remaining_attributes - ): - _LOGGER.info( - f"Last result removed for '{record_identifier}'. 
" f"Removing the record" - ) - rm_record = True - self.remove_record( - record_identifier=record_identifier, - rm_record=rm_record, - ) + if len(remaining_attributes) == 1 and META_KEY in remaining_attributes: + _LOGGER.info(f"Last result removed for '{record_identifier}'.") + with write_lock(self._data) as locked_data: locked_data.write() return True @@ -311,8 +308,8 @@ def remove_record( try: _LOGGER.info(f"Removing '{record_identifier}' record") del self._data[self.pipeline_name][self.pipeline_type][record_identifier] - with self._data as locked_data: - locked_data.write() + with write_lock(self._data) as data_locked: + data_locked.write() return True except: _LOGGER.warning( @@ -320,14 +317,17 @@ def remove_record( ) return False else: - _LOGGER.info(f" rm_record flag False, aborting Removing '{record_identifier}' record") + _LOGGER.info( + f" rm_record flag is set to False, aborting Removing '{record_identifier}' record" + ) def report( self, values: Dict[str, Any], record_identifier: Optional[str] = None, - force_overwrite: bool = False, + force_overwrite: bool = True, result_formatter: Optional[staticmethod] = None, + history_enabled: bool = True, ) -> Union[List[str], bool]: """ Update the value of a result in a current namespace. @@ -355,27 +355,57 @@ def report( self.assert_results_defined( results=result_identifiers, pipeline_type=self.pipeline_type ) + existing = self.list_results( record_identifier=record_identifier, restrict_to=result_identifiers, ) + if existing: existing_str = ", ".join(existing) _LOGGER.warning(f"These results exist for '{record_identifier}': {existing_str}") if not force_overwrite: return False _LOGGER.info(f"Overwriting existing results: {existing_str}") - values.update({MODIFIED_TIME: current_time}) + self._data[self.pipeline_name][self.pipeline_type][record_identifier][META_KEY].update( + {MODIFIED_TIME: current_time} + ) if not existing: if record_identifier in self._data[self.pipeline_name][self.pipeline_type].keys(): - values.update({MODIFIED_TIME: current_time}) - else: - values.update({CREATED_TIME: current_time}) - values.update({MODIFIED_TIME: current_time}) + self._data[self.pipeline_name][self.pipeline_type][record_identifier].setdefault( + META_KEY, {} + ) + self._data[self.pipeline_name][self.pipeline_type][record_identifier][ + META_KEY + ].update({MODIFIED_TIME: current_time}) - self._data[self.pipeline_name][self.pipeline_type].setdefault(record_identifier, {}) + else: + self._data[self.pipeline_name][self.pipeline_type].setdefault( + record_identifier, {} + ) + self._data[self.pipeline_name][self.pipeline_type][record_identifier].setdefault( + META_KEY, {} + ) + self._data[self.pipeline_name][self.pipeline_type][record_identifier][ + META_KEY + ].update({MODIFIED_TIME: current_time}) + self._data[self.pipeline_name][self.pipeline_type][record_identifier][ + META_KEY + ].update({CREATED_TIME: current_time}) for res_id, val in values.items(): + if history_enabled: + if existing: + self._modify_history( + data=self._data[self.pipeline_name][self.pipeline_type][record_identifier][ + META_KEY + ], + res_id=res_id, + time=current_time, + value=self._data[self.pipeline_name][self.pipeline_type][ + record_identifier + ][res_id], + ) self._data[self.pipeline_name][self.pipeline_type][record_identifier][res_id] = val results_formatted.append( result_formatter( @@ -429,6 +459,7 @@ def select_records( limit: Optional[int] = 1000, cursor: Optional[int] = None, bool_operator: Optional[str] = "AND", + meta_data_bool: Optional[bool] = False, ) -> 
Dict[str, Any]: """ Select records from the FileBackend @@ -442,6 +473,7 @@ def select_records( :param int limit: maximum number of results to retrieve per page :param int cursor: cursor position to begin retrieving records :param bool bool_operator: Perform filtering with AND or OR Logic. + :param bool meta_data: Should this return associated meta data with records? :return dict records_dict = { "total_size": int, "page_size": int, @@ -519,7 +551,7 @@ def get_nested_column(result_value: dict, key_list: list, retrieved_operator: Ca if filter_condition["key"] != "record_identifier": for key, value in data[record_identifier].items(): result = False - if isinstance(value, dict): + if isinstance(value, dict) and key != "meta": if key == filter_condition["key"][0]: result = get_nested_column( value, @@ -527,12 +559,17 @@ def get_nested_column(result_value: dict, key_list: list, retrieved_operator: Ca retrieved_operator, ) else: - if filter_condition["key"] == key: + if key == "meta": # Filter datetime objects - if key in CREATED_TIME or key in MODIFIED_TIME: + if ( + filter_condition["key"] == CREATED_TIME + or filter_condition["key"] == MODIFIED_TIME + ): try: time_stamp = datetime.datetime.strptime( - data[record_identifier][key], + data[record_identifier][META_KEY][ + filter_condition["key"] + ], DATE_FORMAT, ) result = retrieved_operator( @@ -540,10 +577,8 @@ def get_nested_column(result_value: dict, key_list: list, retrieved_operator: Ca ) except TypeError: result = False - else: - result = retrieved_operator( - value, filter_condition["value"] - ) + elif filter_condition["key"] == key: + result = retrieved_operator(value, filter_condition["value"]) if result: retrieved_results.append(record_identifier) @@ -589,6 +624,8 @@ def get_nested_column(result_value: dict, key_list: list, retrieved_operator: Ca if record != {}: record.update({"record_identifier": record_identifier}) records_list.append(record) + if "meta" in record and not meta_data_bool: + del record["meta"] records_dict = { "total_size": total_count, @@ -738,3 +775,16 @@ def _load_results_file(self) -> None: f"{num_namespaces} other namespaces are already in the file: [{', '.join(namespaces_reported)}]. " f"Pipestat will not report multiple namespaces to one file unless `multi_pipelines` is True." ) + + def _modify_history(self, data, res_id, time, value): + """Modify File backend with each change + + data is the loaded yaml results file in dict format + type = "report", "deletion" + """ + if "history" not in data: + data.setdefault(HISTORY_KEY, {}) + if res_id not in data[HISTORY_KEY]: + data[HISTORY_KEY].setdefault(res_id, {}) + + data[HISTORY_KEY][res_id].update({time: value}) diff --git a/pipestat/cli.py b/pipestat/cli.py index a5f87721..5d13f8ac 100644 --- a/pipestat/cli.py +++ b/pipestat/cli.py @@ -18,6 +18,7 @@ SUMMARIZE_CMD, SERVE_CMD, LINK_CMD, + HISTORY_CMD, ) from .const import ( SCHEMA_KEY, @@ -121,7 +122,7 @@ def call_reader(): types_to_read_from_json = ["object"] + list(CANONICAL_TYPES.keys()) # The next few commands require a record_identifier. Need to also check ENV variables for its existence. 
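
To make the file-backend changes above concrete: a record in the results file now keeps reported values at the top level and moves bookkeeping under `meta`, with per-result history written by `_modify_history()` whenever a value is overwritten. A rough in-memory sketch (result names, values, and timestamps are made up for illustration):

```python
# Approximate shape of one record in the file backend, i.e.
# self._data[pipeline_name][pipeline_type][record_identifier]
record = {
    "number_of_things": 2,  # current value of each reported result
    "meta": {
        "pipestat_created_time": "2024-04-19 10:00:00",
        "pipestat_modified_time": "2024-04-19 10:05:00",
        "history": {
            # per-result map of modification time -> previous value
            "number_of_things": {"2024-04-19 10:00:00": 1},
        },
    },
}
```

`select_records()` strips the `meta` key from returned records unless called with `meta_data_bool=True`, which is how `retrieve_history()` gets at the stored history.
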
- if args.record_identifier is None: + if not hasattr(args, "record_identifier") or getattr(args, "record_identifier", None) is None: args.record_identifier = os.getenv("PIPESTAT_RECORD_IDENTIFIER") if args.command == REPORT_CMD: @@ -174,5 +175,12 @@ def call_reader(): status_identifier=args.status_identifier, record_identifier=args.record_identifier, ) + if args.command == HISTORY_CMD: + print(f"\nHistory for Record: {args.record_identifier}") + print( + psm.retrieve_history( + record_identifier=args.record_identifier, result_identifier=args.result_identifier + ) + ) sys.exit(0) diff --git a/pipestat/const.py b/pipestat/const.py index 1d277912..cc2804cb 100644 --- a/pipestat/const.py +++ b/pipestat/const.py @@ -63,6 +63,9 @@ "RECORD_IDENTIFIER", "CREATED_TIME", "MODIFIED_TIME", + "DATE_FORMAT", + "META_KEY", + "HISTORY_KEY", ] PKG_NAME = "pipestat" @@ -84,6 +87,9 @@ CREATED_TIME = "pipestat_created_time" MODIFIED_TIME = "pipestat_modified_time" +META_KEY = "meta" +HISTORY_KEY = "history" + CANONICAL_TYPES = { "image": { "type": "object", diff --git a/pipestat/exceptions.py b/pipestat/exceptions.py index fb5d6e20..3bc00c1e 100644 --- a/pipestat/exceptions.py +++ b/pipestat/exceptions.py @@ -68,6 +68,16 @@ def __init__(self, msg, cli=False): super(SchemaNotFoundError, self).__init__(txt) +class SchemaValidationErrorDuringReport(SchemaError): + """Adds clarity to JSON schema validation errors by providing additional information to error message.""" + + def __init__(self, msg, record_identifier, result_identifier, result): + + txt = msg # original schema validation error + txt += f"\nRecord identifier {record_identifier} \nResult_identifier {result_identifier} \nReported result: {result}" + super(SchemaValidationErrorDuringReport, self).__init__(txt) + + class MissingConfigDataError(PipestatError): """Exception for invalid config file.""" diff --git a/pipestat/helpers.py b/pipestat/helpers.py index 075e1bbc..9f99b604 100644 --- a/pipestat/helpers.py +++ b/pipestat/helpers.py @@ -11,10 +11,8 @@ from shutil import make_archive from typing import Any, Dict, Optional, Tuple, Union, List -from oyaml import safe_load, dump -from ubiquerg import expandpath - -from zipfile import ZipFile, ZIP_DEFLATED +from yaml import dump +from .exceptions import SchemaValidationErrorDuringReport from .const import ( PIPESTAT_GENERIC_CONFIG, @@ -26,7 +24,7 @@ _LOGGER = logging.getLogger(__name__) -def validate_type(value, schema, strict_type=False): +def validate_type(value, schema, strict_type=False, record_identifier=None): """ Validate reported result against a partial schema, in case of failure try to cast the value into the required class. @@ -37,13 +35,19 @@ def validate_type(value, schema, strict_type=False): :param dict schema: partial jsonschema schema to validate against, e.g. 
{"type": "integer"} :param bool strict_type: whether the value should validate as is + :param str record_identifier: used for clarifying error messages """ try: jsonschema.validate(value, schema) except jsonschema.exceptions.ValidationError as e: if strict_type: - raise + raise SchemaValidationErrorDuringReport( + msg=str(e), + record_identifier=record_identifier, + result_identifier=schema, + result=value, + ) _LOGGER.debug(f"{str(e)}") if schema[SCHEMA_TYPE_KEY] != "object": value = CLASSES_BY_TYPE[schema[SCHEMA_TYPE_KEY]](value) @@ -63,27 +67,6 @@ def validate_type(value, schema, strict_type=False): _LOGGER.debug(f"Value '{value}' validated successfully against a schema") -def read_yaml_data(path: Union[str, Path], what: str) -> Tuple[str, Dict[str, Any]]: - """ - Safely read YAML file and log message - - :param str path: YAML file to read - :param str what: context - :return (str, dict): absolute path to the read file and the read data - """ - if isinstance(path, Path): - test = lambda p: p.is_file() - elif isinstance(path, str): - path = expandpath(path) - test = os.path.isfile - else: - raise TypeError(f"Alleged path to YAML file to read is neither path nor string: {path}") - assert test(path), FileNotFoundError(f"File not found: {path}") - _LOGGER.debug(f"Reading {what} from '{path}'") - with open(path, "r") as f: - return path, safe_load(f) - - def mk_list_of_str(x): """ Make sure the input is a list of strings @@ -100,41 +83,14 @@ def mk_list_of_str(x): ) -def mk_abs_via_cfg( - path: Optional[str], - cfg_path: Optional[str], -) -> Optional[str]: - """ - Helper function to ensure a path is absolute. - - Assumes a relative path is relative to cfg_path, or to current working directory if cfg_path is None. +def make_subdirectories(path): + """Takes an absolute file path and creates subdirectories to file if they do not exist""" - : param str path: The path to make absolute. 
- : param str cfg_path: Relative paths will be relative the containing folder of this pat - """ - if path is None: - return path - assert isinstance(path, str), TypeError("Path is expected to be a str") - if os.path.isabs(path): - return path - if cfg_path is None: - rel_to_cwd = os.path.join(os.getcwd(), path) + if path: try: - os.makedirs(os.path.dirname(rel_to_cwd)) + os.makedirs(os.path.dirname(path)) except FileExistsError: pass - if os.path.exists(rel_to_cwd) or os.access(os.path.dirname(rel_to_cwd), os.W_OK): - return rel_to_cwd - else: - raise OSError(f"File not found: {path}") - joined = os.path.join(os.path.dirname(cfg_path), path) - try: - os.makedirs(os.path.dirname(joined)) - except FileExistsError: - pass - if os.path.isabs(joined): - return joined - raise OSError(f"Could not make this path absolute: {path}") def init_generic_config(): diff --git a/pipestat/parsed_schema.py b/pipestat/parsed_schema.py index 3ed58afe..022fb231 100644 --- a/pipestat/parsed_schema.py +++ b/pipestat/parsed_schema.py @@ -4,7 +4,7 @@ import logging from pathlib import Path from typing import Any, Dict, List, Mapping, Optional, Union - +import yacman from .const import ( CANONICAL_TYPES, CLASSES_BY_TYPE, @@ -15,7 +15,6 @@ SCHEMA_TYPE_KEY, ) from .exceptions import SchemaError -from .helpers import read_yaml_data _LOGGER = logging.getLogger(__name__) @@ -76,7 +75,7 @@ class ParsedSchema(object): def __init__(self, data: Union[Dict[str, Any], Path, str]) -> None: # initial validation and parse if not isinstance(data, dict): - _, data = read_yaml_data(data, "schema") + data = yacman.load_yaml(data) # Keep a copy of the original schema self.original_schema = copy.deepcopy(data) @@ -169,20 +168,35 @@ def __str__(self): :return str: string representation of the object """ res = f"{self.__class__.__name__} ({self._pipeline_name})" + + def add_props(props): + res = "" + if len(props) == 0: + res += "\n - None" + else: + for k, v in props: + res += f"\n - {k} : {v}" + return res + if self._project_level_data is not None: - res += "\n Project Level Data:" - for k, v in self._project_level_data.items(): - res += f"\n - {k} : {v}" + res += "\n Project-level properties:" + res += add_props(self._project_level_data.items()) if self._sample_level_data is not None: - res += "\n Sample Level Data:" - for k, v in self._sample_level_data.items(): - res += f"\n - {k} : {v}" + res += "\n Sample-level properties:" + res += add_props(self._sample_level_data.items()) if self._status_data is not None: - res += "\n Status Data:" - for k, v in self._status_data.items(): - res += f"\n - {k} : {v}" + res += "\n Status properties:" + res += add_props(self._status_data.items()) return res + def __repr__(self): + """ + Generate string representation of the object. 
+ + :return str: string representation of the object + """ + return self.__str__() + @property def pipeline_name(self): """Return the declared name for the pipeline for which this schema's written.""" @@ -273,35 +287,38 @@ def _recursively_replace_custom_types(s: Dict[str, Any]) -> Dict[str, Any]: return s -def replace_JSON_refs(s: Dict[str, Any], data: Dict[str, Any]) -> Dict[str, Any]: +def replace_JSON_refs( + target_schema: Dict[str, Any], source_schema: Dict[str, Any] +) -> Dict[str, Any]: """ - Recursively search and replace the $refs if they exist in schema, s, and if their corresponding $defs exist in - source schema, data + Recursively search and replace the $refs if they exist in schema, target_schema, and if their corresponding $defs + exist in source schema, source_schema. If $defs exist in the target schema and target_schema is the same as + source_schema then deepcopy should be used such that target_schema = copy.deepcopy(source_schema) - :param dict s: schema to replace types in - :param dict data: source schema - :return dict s: schema with types replaced + :param dict target_schema: schema to replace types in + :param dict source_schema: source schema + :return dict target_schema: schema with types replaced """ - for k, v in list(s.items()): + for k, v in list(target_schema.items()): if isinstance(v, dict): - replace_JSON_refs(s[k], data) + replace_JSON_refs(target_schema[k], source_schema) if "$ref" == k: split_value = v.split("/") if len(split_value) != 3: raise SchemaError( msg=f"$ref exists in source schema but path,{v} ,not valid, e.g. '#/$defs/file' " ) - if split_value[1] in data and split_value[2] in data[split_value[1]]: - result = data[split_value[1]][split_value[2]] + if split_value[1] in source_schema and split_value[2] in source_schema[split_value[1]]: + result = source_schema[split_value[1]][split_value[2]] else: result = None if result is not None: for key, value in result.items(): - s.update({key: value}) - del s["$ref"] + target_schema.update({key: value}) + del target_schema["$ref"] else: raise SchemaError( msg=f"Could not find {split_value[1]} and {split_value[2]} in $def" ) - return s + return target_schema diff --git a/pipestat/pipestat.py b/pipestat/pipestat.py index 12e78c09..8a689781 100644 --- a/pipestat/pipestat.py +++ b/pipestat/pipestat.py @@ -9,6 +9,9 @@ from jsonschema import validate from yacman import FutureYAMLConfigManager as YAMLConfigManager from yacman.yacman_future import select_config +from ubiquerg import mkabs +from yacman import load_yaml + from typing import Optional, Union, Dict, Any, List, Iterator @@ -24,7 +27,12 @@ ) from pipestat.backends.file_backend.filebackend import FileBackend from .reports import HTMLReportBuilder, _create_stats_objs_summaries -from .helpers import validate_type, mk_abs_via_cfg, read_yaml_data, default_formatter, zip_report +from .helpers import ( + validate_type, + default_formatter, + zip_report, + make_subdirectories, +) from .const import ( PKG_NAME, DEFAULT_PIPELINE_NAME, @@ -173,7 +181,7 @@ def __init__( else: self.cfg[CONFIG_KEY] = YAMLConfigManager() - _, cfg_schema = read_yaml_data(CFG_SCHEMA, "config schema") + cfg_schema = load_yaml(CFG_SCHEMA) validate(self.cfg[CONFIG_KEY].exp, cfg_schema) self.cfg[SCHEMA_PATH] = self.cfg[CONFIG_KEY].priority_get( @@ -207,7 +215,7 @@ def __init__( "pipeline_type", default="sample", override=pipeline_type ) - self.cfg[FILE_KEY] = mk_abs_via_cfg( + self.cfg[FILE_KEY] = mkabs( self.resolve_results_file_path( self.cfg[CONFIG_KEY].priority_get( 
"results_file_path", @@ -217,6 +225,7 @@ def __init__( ), self.cfg["config_path"], ) + make_subdirectories(self.cfg[FILE_KEY]) self.cfg[RESULT_FORMATTER] = result_formatter @@ -341,9 +350,8 @@ def initialize_filebackend(self, record_identifier, results_file_path, flag_file override=flag_file_dir, default=os.path.dirname(self.cfg[FILE_KEY]), ) - self.cfg[STATUS_FILE_DIR] = mk_abs_via_cfg( - flag_file_dir, self.config_path or self.cfg[FILE_KEY] - ) + self.cfg[STATUS_FILE_DIR] = mkabs(flag_file_dir, self.config_path or self.cfg[FILE_KEY]) + make_subdirectories(self.cfg[STATUS_FILE_DIR]) self.backend = FileBackend( self.cfg[FILE_KEY], @@ -468,6 +476,7 @@ def list_recent_results( col_name = CREATED_TIME else: col_name = MODIFIED_TIME + results = self.select_records( limit=limit, filter_conditions=[ @@ -483,7 +492,6 @@ def list_recent_results( }, ], ) - return results def process_schema(self, schema_path): @@ -501,7 +509,7 @@ def process_schema(self, schema_path): # raise SchemaNotFoundError("PipestatManager creation failed; no schema") else: # Main schema - schema_to_read = mk_abs_via_cfg(self._schema_path, self.cfg["config_path"]) + schema_to_read = mkabs(self._schema_path, self.cfg["config_path"]) self._schema_path = schema_to_read parsed_schema = ParsedSchema(schema_to_read) self.cfg[SCHEMA_KEY] = parsed_schema @@ -509,10 +517,8 @@ def process_schema(self, schema_path): # Status schema self.cfg[STATUS_SCHEMA_KEY] = parsed_schema.status_data if not self.cfg[STATUS_SCHEMA_KEY]: - ( - self.cfg[STATUS_SCHEMA_SOURCE_KEY], - self.cfg[STATUS_SCHEMA_KEY], - ) = read_yaml_data(path=STATUS_SCHEMA, what="default status schema") + self.cfg[STATUS_SCHEMA_SOURCE_KEY] = STATUS_SCHEMA + self.cfg[STATUS_SCHEMA_KEY] = load_yaml(filepath=STATUS_SCHEMA) else: self.cfg[STATUS_SCHEMA_SOURCE_KEY] = schema_to_read @@ -540,14 +546,26 @@ def remove( result_identifier=result_identifier, ) + @require_backend + def remove_record( + self, + record_identifier: Optional[str] = None, + rm_record: Optional[bool] = False, + ) -> bool: + return self.backend.remove_record( + record_identifier=record_identifier, + rm_record=rm_record, + ) + @require_backend def report( self, values: Dict[str, Any], record_identifier: Optional[str] = None, - force_overwrite: bool = False, + force_overwrite: bool = True, result_formatter: Optional[staticmethod] = None, strict_type: bool = True, + history_enabled: bool = True, ) -> Union[List[str], bool]: """ Report a result. @@ -561,6 +579,7 @@ def report( :param bool strict_type: whether the type of the reported values should remain as is. Pipestat would attempt to convert to the schema-defined one otherwise + :param bool history_enabled: Should history of reported results be enabled? 
:return str reported_results: return list of formatted string """ @@ -583,6 +602,7 @@ def report( value=values[r], schema=self.result_schemas[r], strict_type=strict_type, + record_identifier=record_identifier, ) reported_results = self.backend.report( @@ -590,6 +610,7 @@ def report( record_identifier=r_id, force_overwrite=force_overwrite, result_formatter=result_formatter, + history_enabled=history_enabled, ) return reported_results @@ -658,13 +679,10 @@ def retrieve_one( """ Retrieve a single record :param str record_identifier: single record_identifier - :param str result_identifier: single record_identifier - :return: Dict[str, any]: a mapping with filtered - - - results reported for the record + :param str result_identifier: single record_identifier or list of result identifiers + :return: Dict[str, any]: a mapping with filtered results reported for the record """ - r_id = record_identifier or self.record_identifier + record_identifier = record_identifier or self.record_identifier filter_conditions = [ { @@ -692,7 +710,7 @@ def retrieve_one( f"Results '{columns}' for '{record_identifier}' not found" ) try: - return result[0][result_identifier] + return result[0][columns[0]] except IndexError: raise RecordNotFoundError( f"Results '{columns}' for '{record_identifier}' not found" @@ -714,6 +732,69 @@ def retrieve_one( except IndexError: raise RecordNotFoundError(f"Record '{record_identifier}' not found") + def retrieve_history( + self, + record_identifier: str = None, + result_identifier: Optional[Union[str, List[str]]] = None, + ) -> Union[Any, Dict[str, Any]]: + """ + Retrieve a single record's history + :param str record_identifier: single record_identifier + :param str result_identifier: single result_identifier or list of result identifiers + :return: Dict[str, any]: a mapping with filtered historical results + """ + + record_identifier = record_identifier or self.record_identifier + + if self.file: + result = self.backend.select_records( + filter_conditions=[ + { + "key": "record_identifier", + "operator": "eq", + "value": record_identifier, + } + ], + meta_data_bool=True, + )["records"][0] + + if "meta" in result and "history" in result["meta"]: + history = {} + if isinstance(result_identifier, str) and result_identifier in result: + history.update( + {result_identifier: result["meta"]["history"][result_identifier]} + ) + elif isinstance(result_identifier, list): + for r in result_identifier: + if r in result["meta"]["history"]: + history.update({r: result["meta"]["history"][r]}) + else: + history = result["meta"]["history"] + else: + _LOGGER.warning(f"No history available for Record: {record_identifier}") + return {} + + else: + if result_identifier: + history = self.backend.retrieve_history_db(record_identifier, result_identifier)[ + "history" + ] + else: + history = self.backend.retrieve_history_db(record_identifier)["history"] + + # DB backend returns some extra_keys that we can remove before returning them to the user. 
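
As a quick illustration of the `retrieve_one` adjustment above (returning `result[0][columns[0]]`), a bare result name and a one-element list are now interchangeable. A minimal sketch, assuming placeholder file paths and a schema that defines an integer `number_of_things` result:

```python
from pipestat import SamplePipestatManager

# Paths are placeholders; the schema is assumed to define "number_of_things".
psm = SamplePipestatManager(
    schema_path="sample_output_schema.yaml",
    results_file_path="results.yaml",
)
psm.report(record_identifier="sample1", values={"number_of_things": 2})

as_str = psm.retrieve_one(record_identifier="sample1", result_identifier="number_of_things")
as_list = psm.retrieve_one(record_identifier="sample1", result_identifier=["number_of_things"])
assert as_str == as_list == 2
```
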
+ extra_keys_to_delete = [ + "id", + "pipestat_created_time", + "source_record_id", + "record_identifier", + ] + history = { + key: value for key, value in history.items() if key not in extra_keys_to_delete + } + + return history + def retrieve_many( self, record_identifiers: List[str], @@ -813,7 +894,8 @@ def check_multi_results(self): results_directory = self.cfg["unresolved_result_path"].split( "{record_identifier}" )[0] - results_directory = mk_abs_via_cfg(results_directory, self.cfg["config_path"]) + results_directory = mkabs(results_directory, self.cfg["config_path"]) + make_subdirectories(results_directory) self.backend.aggregate_multi_results(results_directory) @require_backend diff --git a/pipestat/reports.py b/pipestat/reports.py index 702d57a3..2cf5422b 100644 --- a/pipestat/reports.py +++ b/pipestat/reports.py @@ -17,7 +17,8 @@ from peppy.const import AMENDMENTS_KEY from typing import List from copy import deepcopy -from .helpers import mk_abs_via_cfg + +from ubiquerg import mkabs from ._version import __version__ from .const import ( @@ -34,7 +35,7 @@ STATUS_FILE_DIR, FILE_KEY, ) - +from .helpers import make_subdirectories _LOGGER = getLogger(PKG_NAME) @@ -509,12 +510,13 @@ def create_sample_html(self, sample_stats, navbar, footer, sample_name): if self.prj.cfg["multi_result_files"] is True: self.prj.cfg["record_identifier"] = sample_name - temp_result_file_path = mk_abs_via_cfg( + temp_result_file_path = mkabs( self.prj.resolve_results_file_path(self.prj.cfg["unresolved_result_path"]), self.prj.cfg["config_path"], ) + make_subdirectories(temp_result_file_path) self.prj.backend.status_file_dir = os.path.dirname( - mk_abs_via_cfg(temp_result_file_path, self.prj.cfg["config_path"]) + mkabs(temp_result_file_path, self.prj.cfg["config_path"]) ) flag = self.prj.get_status(record_identifier=sample_name) @@ -1218,12 +1220,13 @@ def _warn(what, e, sn): try: if psm.cfg["multi_result_files"] is True: psm.cfg["record_identifier"] = sample_name - temp_result_file_path = mk_abs_via_cfg( + temp_result_file_path = mkabs( psm.resolve_results_file_path(psm.cfg["unresolved_result_path"]), psm.cfg["config_path"], ) + make_subdirectories(temp_result_file_path) psm.backend.status_file_dir = os.path.dirname( - mk_abs_via_cfg(temp_result_file_path, psm.cfg["config_path"]) + mkabs(temp_result_file_path, psm.cfg["config_path"]) ) status = psm.get_status(record_identifier=sample_name) statuses.append(status) @@ -1327,46 +1330,6 @@ def _get_runtime(profile_df: _pd.DataFrame) -> str: ).split(".")[0] -def get_file_for_project( - prj, - pipeline_name: str, - appendix: str = None, - directory: str = None, - reportdir: str = None, -) -> str: - """ - Create a path to the file for the current project. - Takes the possibility of amendment being activated at the time - - Format of the output path: - {output_dir}/{directory}/{p.name}_{pipeline_name}_{active_amendments}_{appendix} - - :param pipestat.PipestatManager prj: pipestat manager object - :param str pipeline_name: name of the pipeline to get the file for - :param str appendix: the appendix of the file to create the path for, - like 'objs_summary.tsv' for objects summary file - :param str directory: optional subdirectory for location of file - :return str fp: path to the file, e.g. objects.yaml, stats.tsv - """ - # TODO try to combine with get_file_for_table to reduce code. 
- if reportdir is None: # Determine a default reportdir if not provided - results_file_path = getattr(prj.backend, "results_file_path", None) - config_path = getattr(prj, "config_path", None) - output_dir = getattr(prj, "output_dir", None) - output_dir = output_dir or results_file_path or config_path - output_dir = os.path.dirname(output_dir) - reportdir = os.path.join(output_dir, "reports") - if prj.cfg["project_name"] is None: - fp = os.path.join(reportdir, directory or "", f"NO_PROJECT_NAME_{pipeline_name}") - else: - fp = os.path.join(reportdir, directory or "", f"{prj.cfg['project_name']}_{pipeline_name}") - - if hasattr(prj, "amendments") and getattr(prj, "amendments"): - fp += f"_{'_'.join(prj.amendments)}" - fp += f"_{appendix}" - return fp - - def get_file_for_table(prj, pipeline_name: str, appendix=None, directory=None) -> str: """ Create a path to the file for the current project. @@ -1392,7 +1355,8 @@ def get_file_for_table(prj, pipeline_name: str, appendix=None, directory=None) - fp = os.path.join(table_dir, directory or "", f"{pipeline_name}") if hasattr(prj, "amendments") and getattr(prj, "amendments"): fp += f"_{'_'.join(prj.amendments)}" - fp += f"_{appendix}" + if appendix: + fp += f"_{appendix}" return fp diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index da8d9bd0..6be7e389 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -1,7 +1,7 @@ jsonschema logmuse>=0.2.5 -oyaml -ubiquerg>=0.6.1 +pyyaml +ubiquerg>=0.8.0 yacman>=0.9.3 pandas eido diff --git a/requirements/requirements-test.txt b/requirements/requirements-test.txt index 2e6a1871..0383dfbd 100644 --- a/requirements/requirements-test.txt +++ b/requirements/requirements-test.txt @@ -5,10 +5,9 @@ pytest-cov>=2.8.1 jinja2 jsonschema logmuse>=0.2.5 -oyaml -ubiquerg>=0.6.1 +pyyaml +ubiquerg>=0.8.0 yacman>=0.9.2 -PyYAML pandas eido psycopg @@ -16,5 +15,7 @@ pydantic>=2.5.3, <3.0.0 sqlmodel>=0.0.14 uvicorn fastapi +coverage +smokeshow diff --git a/tests/conftest.py b/tests/conftest.py index 0f593b79..e31df43d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,7 +5,7 @@ import subprocess from pipestat.const import STATUS_SCHEMA -from pipestat.helpers import read_yaml_data +from yacman import load_yaml from atexit import register REC_ID = "constant_record_id" @@ -45,18 +45,20 @@ DB_DEPENDENCIES = False +# SERVICE_UNAVAILABLE = False +# DB_DEPENDENCIES = True + + def get_data_file_path(filename: str) -> str: data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") return os.path.join(data_path, filename) # Data corresponding to the non-default status schema info used in a few places in the test data files. 
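
Related to the conftest change that follows: `pipestat.helpers.read_yaml_data` is removed in this PR and callers switch to `yacman.load_yaml`, which returns just the parsed dict rather than a `(path, data)` tuple. A minimal sketch of the replacement call (the file path is a placeholder):

```python
from yacman import load_yaml

# Previously: _, data = read_yaml_data(path, "custom test schema data")
status_data = load_yaml(filepath="tests/data/custom_status_schema.yaml")
```
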
-_, COMMON_CUSTOM_STATUS_DATA = read_yaml_data( - path=get_data_file_path("custom_status_schema.yaml"), what="custom test schema data" -) +COMMON_CUSTOM_STATUS_DATA = load_yaml(filepath=get_data_file_path("custom_status_schema.yaml")) # Data corresponding to default status schema, at pipestat/schema/status_schema.yaml -_, DEFAULT_STATUS_DATA = read_yaml_data(path=STATUS_SCHEMA, what="default status schema") +DEFAULT_STATUS_DATA = load_yaml(filepath=STATUS_SCHEMA) @pytest.fixture diff --git a/tests/data/config.yaml b/tests/data/config.yaml index 02e1717c..f6b37c09 100644 --- a/tests/data/config.yaml +++ b/tests/data/config.yaml @@ -1,5 +1,5 @@ project_name: test -sample_name: sample1 +record_identifier: sample1 schema_path: sample_output_schema.yaml database: dialect: postgresql diff --git a/tests/test_db_only_mode.py b/tests/test_db_only_mode.py index 5bc0d32f..28f316cf 100644 --- a/tests/test_db_only_mode.py +++ b/tests/test_db_only_mode.py @@ -47,6 +47,7 @@ def test_manager_can_be_built_without_exception(self, config_file_path, schema_f database_only=True, config_file=config_file_path, ) + print("done") except Exception as e: pytest.fail(f"Pipestat manager construction failed: {e})") diff --git a/tests/test_init.py b/tests/test_init.py index 52fe6869..80cb5f12 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1,6 +1,5 @@ from tempfile import mkdtemp -import oyaml import pytest import os from yaml import dump diff --git a/tests/test_parsed_schema.py b/tests/test_parsed_schema.py index bb4795f7..7c636112 100644 --- a/tests/test_parsed_schema.py +++ b/tests/test_parsed_schema.py @@ -4,22 +4,23 @@ from pathlib import Path from typing import * import pytest -import oyaml +import yaml from pipestat.const import SAMPLE_NAME, STATUS, RECORD_IDENTIFIER -from pipestat.exceptions import SchemaError +from pipestat.exceptions import SchemaError, SchemaValidationErrorDuringReport from pipestat.parsed_schema import ( NULL_MAPPING_VALUE, ParsedSchema, SCHEMA_PIPELINE_NAME_KEY, ) from .conftest import COMMON_CUSTOM_STATUS_DATA, DEFAULT_STATUS_DATA, get_data_file_path +from pipestat.helpers import validate_type TEMP_SCHEMA_FILENAME = "schema.tmp.yaml" def write_yaml(data: Mapping[str, Any], path: Path) -> Path: with open(path, "w") as fH: - oyaml.dump(data, fH) + yaml.dump(data, fH) return path @@ -30,7 +31,7 @@ def echo_data(data: Mapping[str, Any], path: Path) -> Mapping[str, Any]: def read_yaml(path: Union[str, Path]) -> Dict[str, Any]: with open(path, "r") as fh: - return oyaml.safe_load(fh) + return yaml.safe_load(fh) @pytest.fixture(scope="function", params=[lambda p: p, read_yaml]) @@ -275,3 +276,17 @@ def test_JSON_schema_resolved_original(output_schema_as_JSON_schema, output_sche print(schema2.original_schema) print(schema2.resolved_schema) print("done") + + +def test_JSON_schema_validation_exception(output_schema_as_JSON_schema): + # schema with defs and refs + schema = ParsedSchema(output_schema_as_JSON_schema) + sample_level_data = schema.sample_level_data + + # This should pass without exception + validate_type(value=5, schema=sample_level_data["number_of_things"]) + + with pytest.raises(SchemaValidationErrorDuringReport): + validate_type( + value="string", schema=sample_level_data["number_of_things"], strict_type=True + ) diff --git a/tests/test_pipestat.py b/tests/test_pipestat.py index df22f9ef..c874f36c 100644 --- a/tests/test_pipestat.py +++ b/tests/test_pipestat.py @@ -60,7 +60,7 @@ class TestSplitClasses: ], ) @pytest.mark.parametrize("backend", ["file", "db"]) - def 
test_basics( + def test_basics_all( self, rec_id, val, @@ -90,6 +90,7 @@ def test_basics( psm.clear_status(record_identifier=rec_id) status = psm.get_status(record_identifier=rec_id) assert status is None + psm.remove_record(record_identifier=rec_id, rm_record=True) with pytest.raises(RecordNotFoundError): psm.retrieve_one(record_identifier=rec_id) if backend == "db": @@ -177,8 +178,8 @@ def test_basics_project_level( psm.clear_status(record_identifier=rec_id) status = psm.get_status(record_identifier=rec_id) assert status is None - with pytest.raises(RecordNotFoundError): - psm.retrieve_one(record_identifier=rec_id) + # with pytest.raises(RecordNotFoundError): + # psm.retrieve_one(record_identifier=rec_id) if backend == "db": psm.remove(record_identifier=rec_id) with pytest.raises(RecordNotFoundError): @@ -199,7 +200,7 @@ class TestReporting: ("sample3", {"name_of_something": "test_name"}), ], ) - @pytest.mark.parametrize("backend", ["file", "db"]) + @pytest.mark.parametrize("backend", ["db"]) def test_report_basic( self, rec_id, @@ -327,7 +328,7 @@ def test_report_samples_and_project_with_pipestatmanager( }, ], ) - @pytest.mark.parametrize("backend", ["file", "db"]) + @pytest.mark.parametrize("backend", ["file"]) def test_complex_object_report( self, val, config_file_path, recursive_schema_file_path, results_file_path, backend ): @@ -588,6 +589,70 @@ def test_retrieve_basic( # Test Retrieve Whole Record assert isinstance(psm.retrieve_one(record_identifier=rec_id), Mapping) + @pytest.mark.parametrize( + ["rec_id", "val"], + [ + ("sample1", {"name_of_something": "test_name"}), + ("sample1", {"number_of_things": 2}), + ], + ) + @pytest.mark.parametrize("backend", ["file", "db"]) + def test_retrieve_basic_no_record_identifier( + self, + rec_id, + val, + config_file_path, + results_file_path, + schema_file_path, + backend, + ): + with NamedTemporaryFile() as f, ContextManagerDBTesting(DB_URL): + results_file_path = f.name + args = dict(schema_path=schema_file_path, database_only=False) + backend_data = ( + {"config_file": config_file_path} + if backend == "db" + else {"results_file_path": results_file_path} + ) + args.update(backend_data) + args.update(record_identifier=rec_id) + psm = SamplePipestatManager(**args) + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + assert ( + psm.retrieve_one(result_identifier=list(val.keys())[0]) == list(val.items())[0][1] + ) + + @pytest.mark.parametrize( + ["rec_id", "val"], + [ + ("sample1", {"number_of_things": 2}), + ], + ) + @pytest.mark.parametrize("backend", ["file", "db"]) + def test_retrieve_one_single_result_as_list( + self, + rec_id, + val, + config_file_path, + results_file_path, + schema_file_path, + backend, + ): + with NamedTemporaryFile() as f, ContextManagerDBTesting(DB_URL): + results_file_path = f.name + args = dict(schema_path=schema_file_path, database_only=False) + backend_data = ( + {"config_file": config_file_path} + if backend == "db" + else {"results_file_path": results_file_path} + ) + args.update(backend_data) + psm = SamplePipestatManager(**args) + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + assert psm.retrieve_one( + record_identifier=rec_id, result_identifier="number_of_things" + ) == psm.retrieve_one(record_identifier=rec_id, result_identifier=["number_of_things"]) + @pytest.mark.parametrize( ["rec_id", "val"], [ @@ -834,37 +899,6 @@ def test_remove_nonexistent_record( psm = SamplePipestatManager(**args) assert not psm.remove(record_identifier=rec_id) - 
@pytest.mark.parametrize(["rec_id", "res_id"], [("sample3", "name_of_something")]) - @pytest.mark.parametrize("backend", ["file"]) - def test_last_result_removal_removes_record( - self, - rec_id, - res_id, - schema_file_path, - config_file_path, - results_file_path, - backend, - ): - with NamedTemporaryFile() as f, ContextManagerDBTesting(DB_URL): - results_file_path = f.name - args = dict(schema_path=schema_file_path, database_only=False) - backend_data = ( - {"config_file": config_file_path} - if backend == "db" - else {"results_file_path": results_file_path} - ) - args.update(backend_data) - psm = SamplePipestatManager(**args) - psm.report( - record_identifier=rec_id, - values={res_id: "something"}, - force_overwrite=True, - ) - assert psm.remove(record_identifier=rec_id, result_identifier=res_id) - - with pytest.raises(RecordNotFoundError): - result = psm.retrieve_one(record_identifier=rec_id) - @pytest.mark.skipif(not DB_DEPENDENCIES, reason="Requires dependencies") @pytest.mark.skipif(SERVICE_UNAVAILABLE, reason="requires postgres service to be available") @@ -1004,7 +1038,6 @@ def test_no_config__psm_is_built_from_env_vars( except Exception as e: pytest.fail(f"Error during pipestat manager creation: {e}") - # @pytest.mark.skip(reason="known failure for now with config file") def test_config__psm_is_built_from_config_file_env_var(self, monkeypatch, config_file_path): """PSM can be created from config parsed from env var value.""" monkeypatch.setenv(ENV_VARS["config"], config_file_path) @@ -1362,7 +1395,7 @@ def test_basics( list(val.keys())[0], "--value", list(val.values())[0], - "--config-file", + "--config", config_file_path, "--schema", schema_file_path, @@ -1381,8 +1414,6 @@ def test_basics( rec_id, "--result-identifier", list(val.keys())[0], - "--value", - list(val.values())[0], "--results-file", results_file_path, "--schema", @@ -1395,9 +1426,34 @@ def test_basics( rec_id, "--result-identifier", list(val.keys())[0], - "--value", - list(val.values())[0], - "--config-file", + "--config", + config_file_path, + "--schema", + schema_file_path, + ] + + with pytest.raises( + SystemExit + ): # pipestat cli normal behavior is to end with a "sys.exit(0)" + main(test_args=x) + + # history + if backend != "db": + x = [ + "history", + "--record-identifier", + rec_id, + "--results-file", + results_file_path, + "--schema", + schema_file_path, + ] + else: + x = [ + "history", + "--record-identifier", + rec_id, + "--config", config_file_path, "--schema", schema_file_path, @@ -1462,91 +1518,6 @@ def test_linking( @pytest.mark.skipif(not DB_DEPENDENCIES, reason="Requires dependencies") @pytest.mark.skipif(SERVICE_UNAVAILABLE, reason="requires service X to be available") class TestTimeStamp: - @pytest.mark.parametrize( - ["rec_id", "val"], - [ - ("sample1", {"name_of_something": "test_name"}), - ], - ) - @pytest.mark.parametrize("backend", ["file", "db"]) - def test_basic_time_stamp( - self, - rec_id, - val, - config_file_path, - schema_file_path, - results_file_path, - backend, - ): - with NamedTemporaryFile() as f, ContextManagerDBTesting(DB_URL): - results_file_path = f.name - args = dict(schema_path=schema_file_path, database_only=False) - backend_data = ( - {"config_file": config_file_path} - if backend == "db" - else {"results_file_path": results_file_path} - ) - args.update(backend_data) - psm = SamplePipestatManager(**args) - psm.report(record_identifier=rec_id, values=val, force_overwrite=True) - - # CHECK CREATION AND MODIFY TIME EXIST - - created = psm.select_records( - 
filter_conditions=[ - { - "key": "record_identifier", - "operator": "eq", - "value": rec_id, - }, - ], - columns=[CREATED_TIME], - )["records"][0][CREATED_TIME] - - modified = psm.select_records( - filter_conditions=[ - { - "key": "record_identifier", - "operator": "eq", - "value": rec_id, - }, - ], - columns=[MODIFIED_TIME], - )["records"][0][MODIFIED_TIME] - - assert created is not None - assert modified is not None - assert created == modified - # Report new - val = {"number_of_things": 1} - time.sleep( - 1 - ) # The filebackend is so fast that the updated time will equal the created time - psm.report(record_identifier="sample1", values=val, force_overwrite=True) - # CHECK MODIFY TIME DIFFERS FROM CREATED TIME - created = psm.select_records( - filter_conditions=[ - { - "key": "record_identifier", - "operator": "eq", - "value": rec_id, - }, - ], - columns=[CREATED_TIME], - )["records"][0][CREATED_TIME] - - modified = psm.select_records( - filter_conditions=[ - { - "key": "record_identifier", - "operator": "eq", - "value": rec_id, - }, - ], - columns=[MODIFIED_TIME], - )["records"][0][MODIFIED_TIME] - - assert created != modified @pytest.mark.parametrize("backend", ["db", "file"]) def test_list_recent_results( @@ -1598,7 +1569,7 @@ def test_list_recent_results( @pytest.mark.skipif(not DB_DEPENDENCIES, reason="Requires dependencies") @pytest.mark.skipif(SERVICE_UNAVAILABLE, reason="requires service X to be available") class TestSelectRecords: - @pytest.mark.parametrize("backend", ["file", "db"]) + @pytest.mark.parametrize("backend", ["db", "file"]) def test_select_records_basic( self, config_file_path, @@ -2340,3 +2311,173 @@ def test_set_index( mod = psm.backend.get_model(table_name=psm.backend.table_name) assert mod.md5sum.index is True assert mod.number_of_things.index is False + + +@pytest.mark.skipif(not DB_DEPENDENCIES, reason="Requires dependencies") +@pytest.mark.skipif(SERVICE_UNAVAILABLE, reason="requires service X to be available") +class TestRetrieveHistory: + @pytest.mark.parametrize( + ["rec_id", "val"], + [ + ("sample1", {"name_of_something": "test_name"}), + ], + ) + @pytest.mark.parametrize("backend", ["db", "file"]) + def test_select_history_basic( + self, + config_file_path, + results_file_path, + schema_file_path, + backend, + range_values, + rec_id, + val, + ): + with NamedTemporaryFile() as f, ContextManagerDBTesting(DB_URL): + results_file_path = f.name + args = dict(schema_path=schema_file_path, database_only=False) + backend_data = ( + {"config_file": config_file_path} + if backend == "db" + else {"results_file_path": results_file_path} + ) + args.update(backend_data) + psm = SamplePipestatManager(**args) + + val["number_of_things"] = 1 + + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + + val = {"name_of_something": "MODIFIED_test_name", "number_of_things": 2} + + time.sleep(1) + + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + + history_result = psm.retrieve_history( + record_identifier="sample1", result_identifier="name_of_something" + ) + + all_history_result = psm.retrieve_history(record_identifier="sample1") + + assert len(all_history_result.keys()) == 2 + assert len(history_result.keys()) == 1 + assert len(history_result["name_of_something"].keys()) == 1 + + @pytest.mark.parametrize( + ["rec_id", "val"], + [ + ("sample1", {"name_of_something": "test_name"}), + ], + ) + @pytest.mark.parametrize("backend", ["db", "file"]) + def test_select_history_multi_results( + self, + config_file_path, + results_file_path, 
+ schema_file_path, + backend, + range_values, + rec_id, + val, + ): + with NamedTemporaryFile() as f, ContextManagerDBTesting(DB_URL): + results_file_path = f.name + args = dict(schema_path=schema_file_path, database_only=False) + backend_data = ( + {"config_file": config_file_path} + if backend == "db" + else {"results_file_path": results_file_path} + ) + args.update(backend_data) + psm = SamplePipestatManager(**args) + + val["number_of_things"] = 1 + + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + + val = {"name_of_something": "MODIFIED_test_name", "number_of_things": 2} + + time.sleep(1) + + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + + history_result = psm.retrieve_history( + record_identifier="sample1", + result_identifier=["name_of_something", "number_of_things"], + ) + + assert len(history_result.keys()) == 2 + assert len(history_result["name_of_something"].keys()) == 1 + + @pytest.mark.parametrize( + ["rec_id", "val"], + [ + ( + "sample1", + { + "output_image": { + "path": "path_string", + "thumbnail_path": "thumbnail_path_string", + "title": "title_string", + } + }, + ), + ], + ) + @pytest.mark.parametrize("backend", ["db", "file"]) + def test_select_history_complex_objects( + self, + config_file_path, + results_file_path, + recursive_schema_file_path, + backend, + range_values, + rec_id, + val, + ): + with NamedTemporaryFile() as f, ContextManagerDBTesting(DB_URL): + results_file_path = f.name + args = dict(schema_path=recursive_schema_file_path, database_only=False) + backend_data = ( + {"config_file": config_file_path} + if backend == "db" + else {"results_file_path": results_file_path} + ) + args.update(backend_data) + psm = SamplePipestatManager(**args) + + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + + val = { + "output_image": { + "path": "path_string2", + "thumbnail_path": "thumbnail_path_string2", + "title": "title_string2", + } + } + + time.sleep(1) + + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + + val = { + "output_image": { + "path": "path_string3", + "thumbnail_path": "thumbnail_path_string3", + "title": "title_string3", + } + } + + time.sleep(1) + + psm.report(record_identifier=rec_id, values=val, force_overwrite=True) + + history_result = psm.retrieve_history( + record_identifier="sample1", + result_identifier="output_image", + ) + + assert len(history_result.keys()) == 1 + assert "output_image" in history_result + assert len(history_result["output_image"].keys()) == 2
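
The new `TestRetrieveHistory` cases above boil down to a short workflow: report, overwrite, then ask for history. A condensed sketch with the file backend, assuming placeholder paths and a schema defining `name_of_something` (string) and `number_of_things` (integer):

```python
import time

from pipestat import SamplePipestatManager

# Paths are placeholders for a real output schema and results file.
psm = SamplePipestatManager(
    schema_path="sample_output_schema.yaml",
    results_file_path="results.yaml",
)

psm.report(
    record_identifier="sample1",
    values={"name_of_something": "test_name", "number_of_things": 1},
)
time.sleep(1)  # ensure a distinct modification timestamp
psm.report(
    record_identifier="sample1",
    values={"name_of_something": "MODIFIED_test_name", "number_of_things": 2},
    force_overwrite=True,
)

# History for one result: {"name_of_something": {<modified_time>: "test_name"}}
print(psm.retrieve_history(record_identifier="sample1", result_identifier="name_of_something"))

# History for every result of the record
print(psm.retrieve_history(record_identifier="sample1"))
```

The same lookup is exposed on the command line through the new `history` subcommand, e.g. `pipestat history --record-identifier sample1 --results-file results.yaml --schema sample_output_schema.yaml`.
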