diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 6ee0561..5e4d39c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,7 +5,7 @@ repos:
       - id: check-added-large-files
       - id: check-case-conflict
       - id: check-docstring-first
-      - id: check-executables-have-shebangs
+      - id: check-json
       - id: check-merge-conflict
       - id: check-toml
       - id: check-symlinks
@@ -26,22 +26,25 @@ repos:
     hooks:
       - id: black
         language_version: python3.9
+        types: [python]
   - repo: https://github.com/pycqa/pylint
     rev: v2.12.2
     hooks:
       - id: pylint
         language: system
+        types: [python]
   - repo: https://gitlab.com/pycqa/flake8
     rev: 4.0.1
     hooks:
       - id: flake8
         language: system
-# - repo: https://github.com/python/mypy
+        types: [python]
   - repo: https://github.com/pre-commit/mirrors-mypy
     rev: v0.930
     hooks:
       - id: mypy
         language: system
+        types: [python]
   - repo: local
     hooks:
       - id: pytest
diff --git a/docker-compose.yml b/docker-compose.yml
index be27aa1..6445373 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -30,59 +30,12 @@ services:
     build:
       context: .
       target: test
-    depends_on:
-      - postgres
-      - notifier
-      - verifier
-      - epic
     environment:
-      - CONFIG=/local/test.yaml
-      - ENV=/local/docker.env
+      - CONFIG=./local/test.yaml
+      - ENV=./secrets/example.env
     volumes:
       - ./assets:/assets
+      - ./gold:/gold
       - ./local:/local
-
-  notifier:
-    build:
-      context: .
-      target: epic
-    command: epic.notify
-    depends_on:
-      - epic
-      - postgres
-    environment:
-      - CONFIG=/local/notifier.yaml
-      - ENV=/local/docker.env
-    volumes:
-      - ./assets:/assets
-      - ./local:/local
-
-  verifier:
-    build:
-      context: .
-      target: epic
-    command: epic.verify
-    depends_on:
-      - epic
-      - postgres
-    environment:
-      - CONFIG=/local/verifier.yaml
-      - ENV=/local/docker.env
-    volumes:
-      - ./assets:/assets
-      - ./local:/local
-
-  epic:
-    build:
-      context: .
-      target: epic
-    environment:
-      - CONFIG=/local/epic.yaml
-      - ENV=/local/docker.env
-    expose:
-      - "80"
-    ports:
-      - "80:80"
-    restart: always
-    volumes:
-      - ./local:/local
+      - ./model:/model
+      - ./secrets:/secrets
diff --git a/dockerfile b/dockerfile
index d737ca6..3fb9980 100644
--- a/dockerfile
+++ b/dockerfile
@@ -26,22 +26,15 @@ RUN \
     apt-get -qq autoremove -y --purge && \
     rm -rf /var/lib/apt/lists/*
 
-FROM python:3.9.9-slim-bullseye as epic
-LABEL name="epic"
-WORKDIR /tmp
-ENV FREETDS /etc/freetds
-ENV PATH /root/.local/bin:$PATH
-COPY --from=build /root/.local /root/.local
-COPY --from=build /tmp/assets /tmp/assets
-COPY --from=build /usr/bin/tini /usr/bin
-ENTRYPOINT ["/usr/bin/tini", "--"]
-CMD ["epic"]
-
 FROM build as test
 ARG IFLAGS
 LABEL name="dsdk.test"
 WORKDIR /tmp
 RUN \
-    pip install ${IFLAGS} ".[all]"
+    pip install ${IFLAGS} ".[all]" && \
+    ln -s /local ./local && \
+    ln -s /secrets ./secrets && \
+    ln -s /model ./model && \
+    ln -s /gold ./gold
 ENTRYPOINT [ "/usr/bin/tini", "--" ]
 CMD [ "pre-commit", "run", "--all-files" ]
diff --git a/local/.gitignore b/local/.gitignore
index b91cff5..d8efc03 100644
--- a/local/.gitignore
+++ b/local/.gitignore
@@ -1,6 +1,5 @@
 *
 !.gitignore
 !configuration.yaml
-!notifier.yaml
-!verifier.yaml
-!epic.yaml
+!docker.yaml
+!test.yaml
diff --git a/local/configuration.yaml b/local/configuration.yaml
index 73ea058..cf1e6ef 100644
--- a/local/configuration.yaml
+++ b/local/configuration.yaml
@@ -1,6 +1,18 @@
+!example
+flowsheets: !flowsheets
+  client_id: ${EPIC_CLIENT_ID}
+  cookie: ${EPIC_COOKIE}
+  flowsheet_id: ${EPIC_FLOWSHEET_ID}
+  flowsheet_template_id: ${EPIC_FLOWSHEET_TEMPLATE_ID}
+  password: ${EPIC_PASSWORD}
+  url: ${EPIC_URL}api/epic/2011/clinical/patient/addflowsheetvalue/flowsheetvalue
+  username: ${EPIC_USERNAME}
 postgres: !postgres
   database: ${POSTGRES_DATABASE}
-  host: ${POSTRES_HOST}
+  host: ${POSTGRES_HOST}
   password: ${POSTGRES_PASSWORD}
-  port: 5432
+  port: ${POSTGRES_PORT}
+  schema: example
+  sql: null
   username: ${POSTGRES_USERNAME}
+time_zone: America/New_York
diff --git a/local/epic.yaml b/local/epic.yaml
deleted file mode 100644
index c9b0e67..0000000
--- a/local/epic.yaml
+++ /dev/null
@@ -1,4 +0,0 @@
-!epic
-address: 0.0.0.0:80
-add_flowsheet_value_path: ${EPIC_ROOT}${EPIC_ADD_FLOWSHEET_VALUE_PATH}
-get_flowsheet_rows_path: ${EPIC_ROOT}${EPIC_GET_FLOWSHEET_ROWS_PATH}
diff --git a/local/notifier.yaml b/local/notifier.yaml
deleted file mode 100644
index b2461d1..0000000
--- a/local/notifier.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-!epicnotifier
-client_id: ${EPIC_CLIENT_ID}
-cookie: ${EPIC_COOKIE}
-flowsheet_id: ${EPIC_FLOWSHEET_ID}
-flowsheet_template_id: ${EPIC_FLOWSHEET_TEMPLATE_ID}
-password: ${EPIC_PASSWORD}
-postgres: !postgres
-  database: ${POSTGRES_DATABASE}
-  host: ${POSTGRES_HOST}
-  password: ${POSTGRES_PASSWORD}
-  port: 5432
-  sql: !asset
-    ext: .sql
-    path: ./assets/postgres
-  username: ${POSTGRES_USERNAME}
-username: ${EPIC_USERNAME}
-user_id: ${EPIC_USER_ID}
-url: ${EPIC_URL}${EPIC_ROOT}${EPIC_ADD_FLOWSHEET_VALUE_PATH}
diff --git a/local/test.yaml b/local/test.yaml
new file mode 100644
index 0000000..6cb5648
--- /dev/null
+++ b/local/test.yaml
@@ -0,0 +1,20 @@
+!example
+as_of: 2019-09-18 17:19:23.873398+00
+gold: ./predict/gold/gold.pkl
+flowsheets: !flowsheets
+  client_id: ${EPIC_CLIENT_ID}
+  cookie: ${EPIC_COOKIE}
+  flowsheet_id: ${EPIC_FLOWSHEET_ID}
+  flowsheet_template_id: ${EPIC_FLOWSHEET_TEMPLATE_ID}
+  password: ${EPIC_PASSWORD}
+  url: ${EPIC_URL}api/epic/2011/clinical/patient/addflowsheetvalue/flowsheetvalue
+  username: ${EPIC_USERNAME}
+postgres: !postgres
+  database: ${POSTGRES_DATABASE}
+  host: ${POSTGRES_HOST}
+  password: ${POSTGRES_PASSWORD}
+  port: ${POSTGRES_PORT}
+  schema: example
+  sql: null
+  username: ${POSTGRES_USERNAME}
+time_zone: America/New_York
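A minimal sketch (not part of the patch): what the `!flowsheets` block above constructs once the `${...}` references are resolved, assuming cfgenvy substitutes them from `secrets/example.env`. The keyword arguments mirror `Flowsheet.__init__` in the new `src/dsdk/flowsheet.py`.

```python
# Hypothetical illustration only; values are the placeholders from
# secrets/example.env, not real credentials.
from dsdk import Flowsheet

flowsheet = Flowsheet(
    client_id="00000000-0000-0000-0000-000000000000",
    cookie="ASP.NET_SessionId=000000000000000000000000",
    flowsheet_id="0000000000",
    flowsheet_template_id="0000000000",
    password="password",
    url=(
        "https://interconnectbgprod.uphs.upenn.edu/interconnect-prd-web/"
        "api/epic/2011/clinical/patient/addflowsheetvalue/flowsheetvalue"
    ),
    username="epic",
)
```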
diff --git a/local/verifier.yaml b/local/verifier.yaml
deleted file mode 100644
index 5f1ca7f..0000000
--- a/local/verifier.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-!epicverifier
-client_id: ${EPIC_CLIENT_ID}
-cookie: ${EPIC_COOKIE}
-flowsheet_id: ${EPIC_FLOWSHEET_ID}
-flowsheet_template_id: ${EPIC_FLOWSHEET_TEMPLATE_ID}
-password: ${EPIC_PASSWORD}
-postgres: !postgres
-  database: ${POSTGRES_DATABASE}
-  host: ${POSTGRES_HOST}
-  password: ${POSTGRES_PASSWORD}
-  port: 5432
-  sql: !asset
-    ext: .sql
-    path: ./assets/postgres
-  username: ${POSTGRES_USERNAME}
-username: ${EPIC_USERNAME}
-user_id: ${EPIC_USER_ID}
-url: ${EPIC_URL}${EPIC_ROOT}${EPIC_GET_FLOWSHEET_ROWS_PATH}
diff --git a/secrets/.gitignore b/secrets/.gitignore
index b8dfb1e..2f6aa94 100644
--- a/secrets/.gitignore
+++ b/secrets/.gitignore
@@ -1,3 +1,3 @@
 *
 !.gitignore
-!docker.env
+!example.env
diff --git a/secrets/docker.env b/secrets/docker.env
deleted file mode 100644
index ee14b71..0000000
--- a/secrets/docker.env
+++ /dev/null
@@ -1,16 +0,0 @@
-EPIC_ADD_FLOWSHEET_VALUE_PATH=api/epic/2011/clinical/patient/addflowsheetvalue/flowsheetvalue?PatientID={PatientID}&PatientIDType={PatientIDType}&ContactID={ContactID}&ContactIDType={ContactIDType}&UserID={UserID}&UserIDType={UserIDType}&FlowsheetID={FlowsheetID}&FlowsheetIDType={FlowsheetIDType}&Value={Value}&Comment={Comment}&InstantValueTaken={InstantValueTaken}&FlowsheetTemplateID={FlowsheetTemplateID}&FlowsheetTemplateIDType={FlowsheetTemplateIDType}
-EPIC_CLIENT_ID=00000000-0000-0000-0000-000000000000
-EPIC_COOKIE=ASP.NET_SessionId=000000000000000000000000
-EPIC_FLOWSHEET_ID=0000000000
-EPIC_FLOWSHEET_TEMPLATE_ID=0000000000
-EPIC_GET_FLOWSHEET_ROWS_PATH=api/epic/2014/Clinical/Patient/getflowsheetrows/flowsheetrows
-EPIC_PASSWORD=epic
-EPIC_ROOT=interconnect-prd-web/
-EPIC_URL=https://epic/
-EPIC_USERNAME=Epicuser
-EPIC_USER_ID=EPICUSER
-POSTGRES_DATABASE=test
-POSTGRES_HOST=postgres
-POSTGRES_PASSWORD=postgres
-POSTGRES_PORT=5432
-POSTGRES_USERNAME=postgres
diff --git a/secrets/example.env b/secrets/example.env
new file mode 100644
index 0000000..a2a3de1
--- /dev/null
+++ b/secrets/example.env
@@ -0,0 +1,13 @@
+EPIC_CLIENT_ID=00000000-0000-0000-0000-000000000000
+EPIC_COOKIE=ASP.NET_SessionId=000000000000000000000000
+EPIC_FLOWSHEET_ID=0000000000
+EPIC_FLOWSHEET_TEMPLATE_ID=0000000000
+EPIC_PASSWORD=password
+EPIC_URL=https://interconnectbgprod.uphs.upenn.edu/interconnect-prd-web/
+EPIC_USERNAME=epic
+EPIC_USER_ID=PENNSIGNALS
+POSTGRES_DATABASE=test
+POSTGRES_HOST=postgres
+POSTGRES_PASSWORD=password
+POSTGRES_PORT=5432
+POSTGRES_USERNAME=postgres
diff --git a/setup.py b/setup.py
index 4b3abb1..470deda 100644
--- a/setup.py
+++ b/setup.py
@@ -51,18 +51,10 @@
     "types-pymssql",
     "types-pyyaml",
     "types-requests",
+    "vcrpy",
 )
 
 setup(
-    entry_points={
-        "console_scripts": [
-            "epic = dsdk.epic:Server.main"
-            "epic.notify = dsdk.epic:Notifier.main",
-            "epic.verify = dsdk.epic:Verifier.main",
-            "epic.notify.api.test = dsdk.epic:Notifier.test",
-            "epic.verify.api.test = dsdk.epic:Verifier.test",
-        ]
-    },
     extras_require={
         "all": (PSYCOPG2_REQUIRES + PYMSSQL_REQUIRES + TEST_REQUIRES),
         "psycopg2": PSYCOPG2_REQUIRES,
diff --git a/src/dsdk/__init__.py b/src/dsdk/__init__.py
index b764514..78725fb 100644
--- a/src/dsdk/__init__.py
+++ b/src/dsdk/__init__.py
@@ -2,6 +2,8 @@
 """Data Science Deployment Kit."""
 
 from .asset import Asset
+from .flowsheet import Mixin as FlowsheetMixin
+from .flowsheet import Flowsheet
 from .interval import Interval
 from .model import Mixin as ModelMixin
 from .model import Model
@@ -28,6 +30,8 @@
     "Batch",
     "Delegate",
     "Interval",
+    "Flowsheet",
+    "FlowsheetMixin",
     "Model",
     "ModelMixin",
     "MssqlMixin",
diff --git a/src/dsdk/epic.py b/src/dsdk/epic.py
deleted file mode 100644
index a7a36a8..0000000
--- a/src/dsdk/epic.py
+++ /dev/null
@@ -1,842 +0,0 @@
-# -*- coding: utf-8 -*-
-"""Epic."""
-
-from __future__ import annotations
-
-from argparse import Namespace
-from base64 import b64encode
-from contextlib import contextmanager
-from datetime import datetime, timedelta
-from http.server import BaseHTTPRequestHandler, HTTPServer
-from json import dumps, loads
-from logging import getLogger
-from re import compile as re_compile
-from select import select  # pylint: disable=no-name-in-module
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    Generator,
-    List,
-    Mapping,
-    Optional,
-    Tuple,
-    Union,
-    cast,
-)
-from urllib.parse import quote
-
-from cfgenvy import Parser, yaml_type
-from pkg_resources import DistributionNotFound, get_distribution
-from requests import Session
-from requests.exceptions import ConnectionError as RequestsConnectionError
-from requests.exceptions import HTTPError, Timeout
-
-from .postgres import Persistor as Postgres
-from .utils import configure_logger
-
-try:
-    __version__ = get_distribution("dsdk").version
-except DistributionNotFound:
-    # package is not installed
-    pass
-
-
-logger = getLogger(__name__)
-
-
-DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
-
-
-class Egress(Parser):  # pylint: disable=too-many-instance-attributes
-    """Egress."""
-
-    ON = dumps({"key": "%s.on"})
-    END = dumps({"key": "%s.end"})
-    YAML = "!egress"
-
-    VERSION = __version__
-
-    @classmethod
-    def as_yaml_type(cls, tag: Optional[str] = None):
-        """As yaml type."""
-        Postgres.as_yaml_type()
-        yaml_type(
-            cls,
-            tag or cls.YAML,
-            init=cls._yaml_init,
-            repr=cls._yaml_repr,
-        )
-
-    @classmethod
-    @contextmanager
-    def context(
-        cls,
-        key: str,
-        argv: Optional[List[str]] = None,
-        env: Optional[Mapping[str, str]] = None,
-    ):
-        """Context."""
-        configure_logger("dsdk")
-        logger.info(cls.ON, key)
-        yield cls.parse(argv=argv, env=env)
-        logger.info(cls.END, key)
-
-    @classmethod
-    def main(cls):
-        """Main."""
-        with cls.context("main") as service:
-            service()
-
-    @classmethod
-    def yaml_types(cls):
-        """Yaml types."""
-        cls.as_yaml_type()
-
-    @classmethod
-    def _yaml_init(cls, loader, node):
-        """Yaml init."""
-        return cls(**loader.construct_mapping(node, deep=True))
-
-    @classmethod
-    def _yaml_repr(cls, dumper, self, *, tag):
-        """Yaml repr."""
-        return dumper.represent_mapper(tag, self.as_yaml())
-
-    def __init__(  # pylint: disable=too-many-arguments,too-many-locals
-        self,
-        client_id: str,
-        cookie: str,
-        password: str,
-        url: str,
-        flowsheet_id: str,
-        flowsheet_template_id: str,
-        postgres: Postgres,
-        user_id: str,
-        username: str,
-        comment: str = "Not for clinical use.",
-        contact_id_type: str = "CSN",
-        flowsheet_id_type: str = "external",
-        flowsheet_template_id_type: str = "external",
-        lookback_hours: int = 72,
-        patient_id_type: str = "UID",
-        poll_timeout: int = 60,
-        operation_timeout: int = 5,
-        user_id_type: str = "external",
-    ):
-        """__init__."""
-        self.authorization = (
-            b"Basic " + b64encode(f"EMP${username}:{password}".encode("utf-8"))
-        ).decode("utf-8")
-        self.postgres = postgres
-        self.client_id = client_id
-        self.comment = comment
-        self.contact_id_type = contact_id_type
-        self.cookie = cookie
-        self.flowsheet_id = flowsheet_id
-        self.flowsheet_id_type = flowsheet_id_type
-        self.flowsheet_template_id = flowsheet_template_id
-        self.flowsheet_template_id_type = flowsheet_template_id_type
-        self.lookback_hours = lookback_hours
-        self.operation_timeout = operation_timeout
-        self.patient_id_type = patient_id_type
-        self.poll_timeout = poll_timeout
-        self.url = url
-        self.user_id = user_id
-        self.user_id_type = user_id_type
-
-    def __call__(self):
-        """__call__."""
-        with self.session() as session:
-            with self.listener() as listener:
-                with self.postgres.commit() as cur:
-                    self.recover(cur, session)
-                    for each in self.listen(listener):
-                        self.on_notify(each, cur, session)
-
-    def as_yaml(self) -> Dict[str, Any]:
-        """As yaml."""
-        return {
-            "authorization": self.authorization,
-            "client_id": self.client_id,
-            "comment": self.comment,
-            "contact_id_type": self.contact_id_type,
-            "cookie": self.cookie,
-            "flowsheet_id": self.flowsheet_id,
-            "flowsheet_id_type": self.flowsheet_id_type,
-            "flowsheet_template_id": self.flowsheet_template_id,
-            "flowsheet_template_id_type": self.flowsheet_template_id_type,
-            "lookback_hours": self.lookback_hours,
-            "operation_timeut": self.operation_timeout,
-            "patient_id_type": self.patient_id_type,
-            "poll_timeout": self.poll_timeout,
-            "url": self.url,
-            "user_id": self.user_id,
-            "user_id_type": self.user_id_type,
-        }
-
-    @contextmanager
-    def listener(self) -> Generator[Any, None, None]:
-        """Listener."""
-        raise NotImplementedError()
-
-    def listen(self, listen) -> Generator[Any, None, None]:
-        """Listen."""
-        while True:
-            readers, _, exceptions = select(
-                [listen], [], [listen], self.poll_timeout
-            )
-            if exceptions:
-                break
-            if not readers:
-                continue
-            if listen.poll():
-                while listen.notifies:
-                    yield listen.notified.pop()
-
-    def on_notify(  # pylint: disable=unused-argument,no-self-use
-        self,
-        event,
-        cur,
-        session: Session,
-    ):
-        """On postgres notify handler."""
-        logger.debug(
-            "NOTIFY: %(id)s.%(channel)s.%(payload)s",
-            {
-                "channel": event.channel,
-                "id": event.id,
-                "payload": event.payload,
-            },
-        )
-
-    def on_success(self, entity, cur, content: Dict[str, Any]):
-        """On success."""
-        raise NotImplementedError()
-
-    def on_error(self, entity, cur, content: Exception):
-        """On error."""
-        raise NotImplementedError()
-
-    def recover(self, cur, session: Session):
-        """Recover."""
-        sql = self.postgres.sql
-        cur.execute(sql.epic.prediction.recover)
-        for each in cur.fetchall():
-            ok, content = self.rest(each, session)
-            if not ok:
-                self.on_error(each, cur, cast(Exception, content))
-                continue
-            self.on_success(each, cur, cast(Dict[str, Any], content))
-
-    def rest(
-        self,
-        entity,
-        session: Session,
-    ) -> Tuple[bool, Union[Exception, Dict[str, Any]]]:
-        """Rest."""
-        raise NotImplementedError()
-
-    @contextmanager
-    def session(self) -> Generator[Any, None, None]:
-        """Session."""
-        session = Session()
-        session.verify = False
-        session.headers.update(
-            {
-                "authorization": self.authorization,
-                "content-type": "application/json",
-                "cookie": self.cookie,
-                "epic-client-id": self.client_id,
-                "epic-user-id": self.user_id,
-                "epic-user-idtype": self.user_id_type,
-            }
-        )
-        yield session
-
-
-class Notifier(Egress):
-    """Notifier."""
-
-    YAML = "!epicnotifier"
-
-    @classmethod
-    def test(
-        cls,
-        # csn="278820881",
-        csn="218202909",  # inpatient admission date is 2019-02-06 at PAH
-        empi="8330651951",
-        id=0,  # pylint: disable=redefined-builtin
-        score="0.5",
-    ):
-        """Test epic API."""
-        prediction = {
-            "as_of": datetime.utcnow(),
-            "csn": csn,
-            "empi": empi,
-            "id": id,
-            "score": score,
-        }
-        with cls.context("test.notifier.api") as notifier:
-            with notifier.session() as session:
-                ok, response = notifier.rest(prediction, session)
-                print(ok)
-                print(response)
-
-    @contextmanager
-    def listener(self) -> Generator[Any, None, None]:
-        """Listener."""
-        postgres = self.postgres
-        sql = postgres.sql
-        with postgres.listen(sql.predictions.listen) as listener:
-            yield listener
-
-    def on_notify(self, event, cur, session):
-        """On notify."""
-        super.on_notify(event, cur, session)
-        sql = self.postgres.sql
-        cur.execute(sql.prediction.recent, event.id)
-        for each in cur.fetchall():
-            ok, content = self.rest(each, session)
-            if not ok:
-                self.on_error(each, cur, cast(Exception, content))
-                continue
-            self.on_success(each, cur, cast(Dict[str, Any], content))
-
-    def on_success(self, entity, cur, content: Dict[str, Any]):
-        """On success."""
-        cur.execute(
-            self.postgres.sql.epic.notification.insert,
-            {"prediction_id": entity["id"]},
-        )
-
-    def on_error(self, entity, cur, content: Exception):
-        """On error."""
-        cur.execute(
-            self.postgres.sql.epic.notifications.errors.insert,
-            {
-                "description": str(content),
-                "name": content.__class__.__name__,
-                "prediction_id": entity["id"],
-            },
-        )
-
-    def recover(self, cur, session: Session):
-        """Recover."""
-        sql = self.postgres.sql
-        cur.execute(sql.epic.notification.recover)
-        for each in cur.fetchall():
-            ok, content = self.rest(each, session)
-            if not ok:
-                self.on_error(each, cur, cast(Exception, content))
-                continue
-            self.on_success(each, cur, cast(Dict[str, Any], content))
-
-    def rest(
-        self,
-        entity,
-        session: Session,
-    ) -> Tuple[bool, Union[Exception, Dict[str, Any]]]:
-        """Rest."""
-        query = {
-            "Comment": self.comment,
-            "ContactID": entity["csn"],
-            "ContactIDType": self.contact_id_type,
-            "FlowsheetID": self.flowsheet_id,
-            "FlowsheetIDType": self.flowsheet_id_type,
-            "FlowsheetTemplateID": self.flowsheet_template_id,
-            "FlowsheetTemplateIDType": self.flowsheet_template_id_type,
-            "InstantValueTaken": entity["as_of"].strftime(DATETIME_FORMAT),
-            "PatientID": entity["empi"],
-            "PatientIDType": self.patient_id_type,
-            "UserID": self.user_id,
-            "UserIDType": self.user_id_type,
-            "Value": entity["score"],
-        }
-        url = self.url.format(
-            **{
-                key: quote(value.encode("utf-8"))
-                for key, value in query.items()
-            }
-        )
-        try:
-            response = session.post(
-                url=url,
-                json={},
-                timeout=self.operation_timeout,
-            )
-            response.raise_for_status()
-        except (RequestsConnectionError, HTTPError, Timeout) as e:
-            return False, e
-        return True, response.json()
-
-
-class Verifier(Egress):
-    """Verifier."""
-
-    YAML = "!epicverifier"
-
-    @classmethod
-    def test(
-        cls,
-        # csn="278820881",
-        csn="218202909",  # inpatient admission date is 2019-02-06 at PAH
-        empi="8330651951",
-        id=0,  # pylint: disable=redefined-builtin
-        score="0.5",
-    ):
-        """Test verifier."""
-        notification = {
-            "as_of": datetime.utcnow(),
-            "csn": csn,
-            "empi": empi,
-            "id": id,
-            "score": score,
-        }
-        with cls.context("test.verifier.api") as verifier:
-            with verifier.session() as session:
-                ok, response = verifier.rest(notification, session)
-                print(ok)
-                print(response)
-
-    @contextmanager
-    def listener(self) -> Generator[Any, None, None]:
-        """Listener."""
-        postgres = self.postgres
-        sql = postgres.sql
-        with postgres.listen(sql.notification.listen) as listener:
-            yield listener
-
-    def on_notify(self, event, cur, session: Session):
-        """On notify."""
-        super().on_notify(event, cur, session)
-        sql = self.postgres.sql
-        cur.execute(sql.notification.recent, event.id)
-        for each in cur.fetchall():
-            ok, content = self.rest(each, session)
-            if not ok:
-                self.on_error(each, cur, cast(Exception, content))
-                continue
-            self.on_success(each, cur, cast(Dict[str, Any], content))
-
-    def on_success(self, entity, cur, content: Dict[str, Any]):
-        """On success."""
-        cur.execute(
-            self.postgres.sql.epic.verifications.insert,
-            {"notification_id": entity["id"]},
-        )
-
-    def on_error(self, entity, cur, content: Exception):
-        """On error."""
-        cur.execute(
-            self.postgres.sql.epic.verifications.errors.insert,
-            {
-                "description": str(content),
-                "name": content.__class__.__name__,
-                "notification_id": entity["id"],
-            },
-        )
-
-    def rest(
-        self,
-        entity,
-        session: Session,
-    ) -> Tuple[bool, Union[Exception, Dict[str, Any]]]:
-        """Rest."""
-        json = {
-            "ContactID": entity["csn"],
-            "ContactIDType": self.contact_id_type,
-            "FlowsheetRowIDs": [
-                {
-                    "ID": self.flowsheet_id,
-                    "IDType": self.flowsheet_id_type,
-                }
-            ],
-            "LookbackHours": self.lookback_hours,
-            "PatientID": entity["empi"],
-            "PatientIDType": self.patient_id_type,
-            "UserID": self.user_id,
-            "UserIDType": self.user_id_type,
-        }
-        try:
-            response = session.post(
-                url=self.url,
-                json=json,
-                timeout=self.operation_timeout,
-            )
-            response.raise_for_status()
-        except (RequestsConnectionError, HTTPError, Timeout) as e:
-            return False, e
-        return True, response.json()
-
-
-def parse_path(path: str) -> Tuple[str, Dict[str, str]]:
-    """Parse path."""
-    url, *query_list = path.split("?", 1)
-    query = "".join(query_list)
-    param_list = query.split("&")
-    params: Dict[str, str] = {}
-    for each in param_list:
-        key, *values = each.split("=", 1)
-        params[key] = "".join(values)
-    return (url, params)
-
-
-def equals(expected: str) -> Callable[..., None]:
-    """Equals."""
-
-    def _check(key: str, value: str):
-        if value.lower() != expected.lower():
-            raise ValueError(f"{key} actual: {value}, expected: {expected}")
-
-    return _check
-
-
-def matches(expected: str) -> Callable[..., None]:
-    """Matches."""
-    pattern = re_compile(expected)
-
-    def _check(key: str, value: str):
-        if not pattern.match(value):
-            raise ValueError(f"{key} actual: {value}, pattern: {expected}")
-
-    return _check
-
-
-def is_float(key: str, value: str):
-    """Is float."""
-    actual = str(float(value))
-    if actual != value:
-        raise ValueError(f"{key} actual: {value}, is not float")
-
-
-def is_datetime(key: str, value: str):
-    """Is datetime."""
-    actual = datetime.strptime(value, DATETIME_FORMAT).strftime(
-        DATETIME_FORMAT
-    )
-    if actual != value:
-        raise ValueError(f"{key} actual: {value}, is not datetime")
-
-
-def is_lookback(key: str, value: int):
-    """Is lookback."""
-    if value < 0 or value > 72:
-        raise ValueError(f"{key} actual {value}, is not between 0 and 72")
-
-
-class Server(HTTPServer):
-    """Server.
-
-    The errors returned do not match the API.
-    """
-
-    def __init__(
-        self,
-        server_address,
-        handler_class,
-        add_flowsheet_value_path: str,
-        get_flowsheet_rows_path: str,
-    ):
-        """__init__."""
-        super().__init__(server_address, handler_class)
-        self.by_patient: Dict[
-            Tuple[str, str],
-            Dict[Tuple[str, str], Dict[Tuple[str, str], Namespace]],
-        ] = {}
-        self.dispatch_url = {
-            parse_path(path)[0].lower(): method
-            for path, method in (
-                (add_flowsheet_value_path, AddFlowsheetValue()),
-                (get_flowsheet_rows_path, GetFlowsheetRows()),
-            )
-        }
-
-    def by_patient_and_contact(
-        self,
-        entry: Namespace,
-    ) -> Dict[Tuple[str, str], Namespace]:
-        """By patient and contact."""
-        key = (entry.patient_id, entry.patient_id_type)
-        by_contact = self.by_patient.get(key)
-        if by_contact is None:
-            by_contact = self.by_patient[key] = {}
-
-        key = (entry.contact_id, entry.contact_id_type)
-        by_flowsheet = by_contact.get(key)
-        if by_flowsheet is None:
-            by_flowsheet = by_contact[key] = {}
-        return by_flowsheet
-
-    def __call__(self):
-        """__call__."""
-        self.serve_forever()
-
-
-class ResponseHandler:
-    """ResponseHandler."""
-
-    def __call__(
-        self,
-        request,
-        url,
-        params,
-        body,
-    ):
-        """__call__."""
-        raise NotImplementedError()
-
-    def is_invalid(
-        self,
-        request,
-        url,
-        params,
-        body,
-    ) -> bool:
-        """Is invalid."""
-        raise NotImplementedError()
-
-
-class AddFlowsheetValue(ResponseHandler):
-    """AddFlowsheetValue."""
-
-    def __call__(
-        self,
-        request,
-        url,
-        params,
-        body,
-    ) -> None:
-        """__call__."""
-        if self.is_invalid(request, url, params, body):
-            return
-
-        entry = Namespace()
-        entry.comment = params["Comment"]
-        entry.contact_id = params["ContectID"]
-        entry.contact_id_type = params["ContactIDType"].lower()
-        entry.flowsheet_id = params["FlowsheetID"]
-        entry.flowsheet_id_type = params["FlowsheetIDType"].lower()
-        entry.flowsheet_template_id = params["FlowsheetTemplateID"]
-        entry.flowsheet_template_id_type = params[
-            "FlowsheetTemplateIDType"
-        ].lower()
-        entry.instant = params["InstantValueTaken"]
-        entry.patient_id = params["PatientID"]
-        entry.patient_id_type = params["PatientIDType"].lower()
-        entry.user_id = params["UserID"]
-        entry.user_id_type = params["UserIDType"].lower()
-        entry.value = params["Value"]
-
-        by_flowsheet = request.server.by_patient_and_contact(entry)
-
-        key = (entry.flowsheet_id, entry.flowsheet_id_type)
-        entries = by_flowsheet.get(key)
-        if entries is None:
-            entries = by_flowsheet[key] = []
-        entries.append(entry)
-
-        response = {
-            "Errors": [],
-            "Success": True,
-        }
-        request.respond(200, response)
-
-    def is_invalid(
-        self,
-        request,
-        url,
-        params,
-        body,
-    ) -> bool:
-        """Is invalid."""
-        errors = request.check_headers()
-        try:
-            for key, validate in (
-                ("Comment", None),
-                ("ContactID", None),
-                ("ContactIDType", None),
-                ("FlowsheetID", None),
-                ("FlowsheetIDType", None),
-                ("FlowsheetTemplateID", None),
-                ("FlowsheetTemplateIDType", None),
-                ("PatientID", None),
-                ("PatientIDType", equals("internal")),
-                ("UserID", None),
-                ("UserIDType", equals("internal")),
-                ("Value", is_float),
-                ("InstantValueTaken", is_datetime),
-            ):
-                value = params[key]
-                if validate is not None:
-                    validate(key, value)
-        except (KeyError, ValueError) as e:
-            errors.append(e)
-
-        if not errors:
-            return False
-
-        # TODO:
-        # - errors do not match the spec
-        response = {
-            "Errors": [
-                f"{error.__class__.__name__}: {error}" for error in errors
-            ],
-            "Success": False,
-        }
-        request.respond(400, response)
-        return True
-
-
-class GetFlowsheetRows(ResponseHandler):
-    """Get flowsheet rows."""
-
-    def __call__(
-        self,
-        request,
-        url,
-        params,
-        body,
-    ) -> None:
-        """__call__."""
-        json = loads(body)
-        if self.is_invalid(request, url, params, json):
-            return
-
-        query = Namespace()
-        query.contact_id = json["ContactID"]
-        query.contact_id_type = json["ContactIDType"].lower()
-        query.patient_id = json["PatientID"]
-        query.patient_id_type = json["PatientIDType"].lower()
-        query.lookback_hours = json["LookbackHours"]
-        query.lookback = datetime.now() - timedelta(
-            hours=-int(query.lookback_hours)
-        )
-        query.user_id = json["UserID"]
-        query.user_id_type = json["UserIDType"].lower()
-        query.flowsheet_row_ids = [
-            Namespace(
-                flowsheet_id=each["ID"],
-                flowsheet_id_type=each["IDType"].lower(),
-            )
-            for each in json["FlowsheetRowIDs"]
-        ]
-        # TODO some errors here if flowsheets are empty
-        # implying patient or contact DNE?
-        by_flowsheet = request.server.by_patient_and_contact(query)
-        response = {
-            "FlowsheetRows": [
-                {
-                    "FlowsheetColumns": [
-                        {
-                            "Comment": entry.comment,
-                            "FlowsheetRowID": [
-                                {
-                                    "ID": entry.flowsheet_id,
-                                    "IDType": entry.flowsheet_id_type.upper(),
-                                },
-                            ],
-                            "FormattedValue": entry.value,
-                            "Instant": entry.instant.isoformat() + "Z",
-                            "LinesDrainsAirwaysID": None,
-                            "OrderIDs": [],
-                            "RawValue": entry.value,
-                            "UserEnteredBy": [
-                                {
-                                    "ID": entry.user_id,
-                                    "IDType": entry.user_id_type.upper(),
-                                },
-                            ],
-                        }
-                        for key in query.flowsheet_row_ids
-                        for entry in by_flowsheet(key)
-                        if entry.instant >= query.lookback
-                    ],
-                    "Name": "Penn Signals Test Message",
-                    "Occurence": "28",  # ???
-                    "Unit": "",
-                }
-            ],
-        }
-        request.respond(200, response)
-
-    def is_invalid(self, request, url, params, body):
-        """Is invalid."""
-        errors = request.check_headers()
-        try:
-            for key, validate in (
-                ("ContactID", None),
-                ("ContactIDType", None),
-                ("FlowsheetID", None),
-                ("FlowsheetIDType", None),
-                ("LookbackHours", is_lookback),
-                ("PatientID", None),
-                ("PatientIDType", equals("internal")),
-                ("UserID", None),
-                ("UserIDType", equals("internal")),
-            ):
-                value = body[key]
-                if validate is not None:
-                    validate(key, value)
-        except (KeyError, ValueError) as e:
-            errors.append(e)
-
-        try:
-            key = "FlowsheetRowIDs"
-            value = body[key]
-            for each in value:
-                _ = each["ID"]
-                _ = each["IDType"]
-        except (KeyError, ValueError) as e:
-            errors.append(e)
-
-        if not errors:
-            return False
-
-        response = {
-            "Errors": [
-                f"{error.__class__.__name__}: {error}" for error in errors
-            ],
-            "Success": False,
-        }
-        request.respond(400, response)
-        return True
-
-
-class RequestHandler(BaseHTTPRequestHandler):
-    """Request Handler."""
-
-    def do_POST(self):  # noqa: N802
-        """Do post."""
-        url, params = parse_path(self.path)
-        url = url.lower()
-        content_length = int(self.headers.get("content-length", 0))
-        body = self.rfile.read(content_length)
-        self.server.dispatch[url](self, url, params, body)
-
-    def check_headers(self) -> List[Exception]:
-        """Check headers."""
-        errors: List[Exception] = []
-        for key, validate in (
-            ("authorization", None),
-            ("content-type", equals("application/json")),
-            ("cookie", None),
-            ("epic-client-id", matches(r"\d{8}-\d{4}-\d{4}-\d{4}-\d{12}")),
-            ("epic-user-id", equals("PENNSIGNALS")),
-            ("epic-user-idtype", equals("external")),
-        ):
-            try:
-                value = self.headers.get(key, None)
-                if validate:
-                    validate(key, value)
-            except (KeyError, ValueError) as e:
-                errors.append(e)
-        return errors
-
-    def respond(self, code, response) -> None:
-        """Respond."""
-        self.send_response(code)
-        self.send_header("content-type", "application/json")
-        self.end_headers()
-        self.wfile.write(dumps(response).encode("utf-8"))
-        self.wfile.close()
diff --git a/src/dsdk/flowsheet.py b/src/dsdk/flowsheet.py
new file mode 100644
index 0000000..607f912
--- /dev/null
+++ b/src/dsdk/flowsheet.py
@@ -0,0 +1,357 @@
+# -*- coding: utf-8 -*-
+"""Epic."""
+
+from abc import ABC
+from base64 import b64encode
+from contextlib import contextmanager
+from datetime import datetime
+from json import dumps, JSONDecodeError
+from time import sleep as default_sleep
+from typing import TYPE_CHECKING, Any, Dict, Generator, Optional
+from urllib.parse import urlencode
+
+from cfgenvy import YamlMapping
+from requests import Session
+from requests.exceptions import (
+    ConnectionError as RequestsConnectionError,
+    HTTPError,
+    Timeout,
+)
+
+from .profile import Profile, profile
+from .persistor import Persistor
+from .service import Service
+from .utils import configure_logger, retry
+
+logger = configure_logger(__name__)
+
+DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+
+
+class Result:  # pylint: disable=too-few-public-methods
+    """Rest result."""
+
+    def __init__(
+        self,
+        *,
+        duration: Profile,
+        status: bool,
+        description: Optional[str] = None,
+        name: Optional[str] = None,
+        status_code: Optional[int] = None,
+        text: Optional[str] = None,
+    ):
+        """__init__."""
+        self.duration = duration
+        self.status = status
+        self.description = description
+        self.name = name
+        self.status_code = status_code
+        self.text = text
+
+    def __str__(self):
+        """__str__."""
+        return str(self.__dict__)
+
+
+class Flowsheet(YamlMapping):  # pylint: disable=too-many-instance-attributes
+    """Flowsheet."""
+
+    TIMEOUT = dumps(
+        {
+            "key": "epic.rest.connection.or.timeout",
+            "prediction": "%s",
+        }
+    )
+    HTTP_ERROR = dumps(
+        {
+            "key": "epic.rest.http.error",
+            "prediction": "%s",
+        }
+    )
+    JSON_DECODE_ERROR = dumps(
+        {"key": "epic.rest.json.decode.error", "prediction": "%s"}
+    )
+    SUCCESS = dumps(
+        {
+            "key": "epic.rest",
+            "prediction": "%s",
+        }
+    )
+
+    YAML = "!flowsheets"
+
+    def __init__(  # pylint: disable=too-many-locals
+        self,
+        *,
+        client_id: str,
+        cookie: str,
+        flowsheet_id: str,
+        flowsheet_template_id: str,
+        password: str,
+        url: str,
+        username: str,
+        contact_id_type: str = "CSN",
+        flowsheet_id_type: str = "external",
+        flowsheet_template_id_type: str = "external",
+        operation_timeout: int = 5,
+        patient_id_type: str = "UID",
+        user_id: str = "PENNSIGNALS",
+        user_id_type: str = "external",
+    ):
+        """__init__."""
+        self.authorization = (
+            b"Basic " + b64encode(f"EMP${username}:{password}".encode("utf-8"))
+        ).decode("utf-8")
+        self.client_id = client_id
+        self.contact_id_type = contact_id_type
+        self.cookie = cookie
+        self.flowsheet_id = flowsheet_id
+        self.flowsheet_id_type = flowsheet_id_type
+        self.flowsheet_template_id = flowsheet_template_id
+        self.flowsheet_template_id_type = flowsheet_template_id_type
+        self.operation_timeout = operation_timeout
+        self.password = password
+        self.patient_id_type = patient_id_type
+        self.url = url
+        self.user_id = user_id
+        self.user_id_type = user_id_type
+        self.username = username
+
+    def as_yaml(self) -> Dict[str, Any]:
+        """As yaml."""
+        return {
+            "client_id": self.client_id,
+            "contact_id_type": self.contact_id_type,
+            "cookie": self.cookie,
+            "flowsheet_id": self.flowsheet_id,
+            "flowsheet_id_type": self.flowsheet_id_type,
+            "flowsheet_template_id": self.flowsheet_template_id,
+            "flowsheet_template_id_type": self.flowsheet_template_id_type,
+            "operation_timeout": self.operation_timeout,
+            "password": self.password,
+            "patient_id_type": self.patient_id_type,
+            "url": self.url,
+            "user_id": self.user_id,
+            "user_id_type": self.user_id_type,
+            "username": self.username,
+        }
+
+    def publish(self, persistor: Persistor):
+        """Yield results from rest call for adding flowsheets."""
+        sql = persistor.sql
+        with persistor.rollback() as cur:
+            missings = persistor.df_from_query(
+                cur, sql.flowsheets.missing, {"dry_run": 0}
+            )
+        with self.session() as session:
+            for _, missing in missings.iterrows():
+                result = self.rest(missing, session)
+                with persistor.commit() as cur:
+                    if result.status:
+                        persistor.query(
+                            cur,
+                            sql.flowsheets.insert,
+                            {
+                                "dry_run": 0,
+                                "id": missing["id"],
+                                "profile_end": result.duration.end,
+                                "profile_on": result.duration.on,
+                            },
+                        )
+                        continue
+                    persistor.query(
+                        cur,
+                        sql.flowsheets.errors.insert,
+                        {
+                            "description": result.description,
+                            "dry_run": 0,
+                            "name": result.name,
+                            "prediction_id": missing["id"],
+                            "profile_end": result.duration.end,
+                            "profile_on": result.duration.on,
+                            "status_code": result.status_code,
+                            "text": result.text,
+                        },
+                    )
+                yield result
+
+    @contextmanager
+    def session(self) -> Generator[Any, None, None]:
+        """Session."""
+        session = Session()
+        session.verify = False
+        session.headers.update(
+            {
+                "authorization": self.authorization,
+                "content-type": "application/json",
+                "cookie": self.cookie,
+                "epic-client-id": self.client_id,
+                "epic-user-id": self.user_id,
+                "epic-user-idtype": self.user_id_type,
+            }
+        )
+        yield session
+
+    def rest(
+        self,
+        missing,
+        session: Session,
+    ) -> Result:
+        """Rest."""
+        query = {
+            "Comment": missing["id"],
+            "ContactID": missing["csn"],
+            "ContactIDType": self.contact_id_type,
+            "FlowsheetID": self.flowsheet_id,
+            "FlowsheetIDType": self.flowsheet_id_type,
+            "FlowsheetTemplateID": self.flowsheet_template_id,
+            "FlowsheetTemplateIDType": self.flowsheet_template_id_type,
+            "InstantValueTaken": missing["as_of"].strftime(DATETIME_FORMAT),
+            "PatientID": missing["empi"],
+            "PatientIDType": self.patient_id_type,
+            "UserID": self.user_id,
+            "UserIDType": self.user_id_type,
+            "Value": missing["score"],
+        }
+        url = self.url + "?" + urlencode(query)
+        try:
+            with profile("dsdk.epic.rest") as interval:
+                response = self.on_rest(
+                    session, url, {}, self.operation_timeout
+                )
+                body = response.json()
+                response.raise_for_status()
+        except (RequestsConnectionError, Timeout) as e:
+            logger.error(self.TIMEOUT, missing["id"])
+            return Result(
+                duration=interval,
+                status=False,
+                name=type(e).__name__,
+                text=str(e),
+            )
+        except HTTPError as e:
+            logger.error(self.HTTP_ERROR, missing["id"])
+            return Result(
+                duration=interval,
+                status=False,
+                description=body["ExceptionMessage"],
+                name=type(e).__name__,
+                status_code=response.status_code,
+            )
+        except JSONDecodeError as e:
+            logger.error(self.JSON_DECODE_ERROR, missing["id"])
+            return Result(
+                duration=interval,
+                status=False,
+                name=type(e).__name__,
+                status_code=response.status_code,
+                text=response.text,  # this is also very verbose
+            )
+        logger.info(self.SUCCESS, missing["id"])
+        return Result(
+            duration=interval,
+            status=True,
+            description=body,
+            status_code=response.status_code,
+        )
+
+    @retry((RequestsConnectionError, Timeout))
+    def on_rest(  # pylint: disable=no-self-use
+        self,
+        session: Session,
+        url: str,
+        json: Dict,
+        timeout: int,
+    ):
+        """On post."""
+        return session.post(
+            url=url,
+            json=json,
+            timeout=timeout,
+        )
+
+    def test(
+        self,
+        # csn=278820881,
+        csn=218202909,  # inpatient admission date is 2019-02-06 at PAH
+        # csn="BAD202909",
+        empi="8330651951",
+        # empi="BAD2345678",
+        id=0,  # pylint: disable=redefined-builtin
+        score="0.5",
+    ):
+        """Test epic API."""
+        missing = {
+            "as_of": datetime.utcnow(),
+            "csn": csn,
+            "empi": empi,
+            "id": id,
+            "score": score,
+        }
+        with self.session() as session:
+            result = self.rest(missing, session)
+            print(result)
+
+
+if TYPE_CHECKING:
+    BaseMixin = Service
+else:
+    BaseMixin = ABC
+
+
+class Mixin(BaseMixin):
+    """Mixin."""
+
+    @classmethod
+    def yaml_types(cls) -> None:
+        """Yaml types."""
+        super().yaml_types()
+        Flowsheet.as_yaml_type()
+
+    @classmethod
+    def publish_flowsheets(cls):
+        """Publish flowsheets."""
+        with cls.context("flowsheets.publish") as service:
+            service.on_publish_flowsheets()
+
+    @classmethod
+    def publish_flowsheet(cls):
+        """Flowsheets test."""
+        with cls.context("flowsheets.publish") as service:
+            service.on_publish_flowsheet()
+
+    def __init__(
+        self,
+        *,
+        poll_interval: int = 60,
+        flowsheets: Flowsheet,
+        **kwargs,
+    ):
+        """__init__."""
+        self.poll_interval = poll_interval
+        self.flowsheets = flowsheets
+        super().__init__(**kwargs)
+
+    def as_yaml(self) -> Dict[str, Any]:
+        """As yaml."""
+        return {
+            "flowsheets": self.flowsheets,
+            "poll_interval": self.poll_interval,
+            **super().as_yaml(),
+        }
+
+    def on_publish_flowsheet(self):
+        """On publish flowsheet."""
+        self.flowsheets.test()
+
+    def on_publish_flowsheets(self, sleep=default_sleep):
+        """On flowsheets."""
+        while True:
+            for _ in self.publish():
+                pass
+            sleep(self.poll_interval)
+
+    def publish(self) -> Generator[Any, None, None]:
+        """Publish."""
+        raise NotImplementedError()
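For orientation, a minimal sketch (not part of the patch) of a concrete service that wires the new mixin; `test/conftest.py` below does the same thing with a mocked persistor. The class name `Example` is hypothetical.

```python
# Assumed wiring: PostgresMixin supplies self.postgres, FlowsheetMixin
# supplies self.flowsheets plus the publish_flowsheets entry point.
from dsdk import FlowsheetMixin, PostgresMixin, Service


class Example(PostgresMixin, FlowsheetMixin, Service):
    """Example service."""

    YAML = "!example"

    def publish(self):
        """Publish missing predictions as flowsheet values."""
        yield from self.flowsheets.publish(self.postgres)


if __name__ == "__main__":
    # Poll forever: each pass pushes missing scores, then sleeps poll_interval.
    Example.publish_flowsheets()
```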
diff --git a/src/dsdk/service.py b/src/dsdk/service.py
index d9fd55d..f3a8069 100644
--- a/src/dsdk/service.py
+++ b/src/dsdk/service.py
@@ -14,7 +14,6 @@
     Callable,
     Dict,
     Generator,
-    List,
     Mapping,
     Optional,
     Sequence,
@@ -205,18 +204,18 @@ class Service(  # pylint: disable=too-many-instance-attributes
     VERSION = __version__
 
     @classmethod
-    def as_yaml_type(cls, tag: Optional[str] = None) -> None:
-        """As yaml type."""
+    def yaml_types(cls):
+        """Yaml types."""
         Asset.as_yaml_type()
         Interval.as_yaml_type()
-        super().as_yaml_type(tag)
+        cls.as_yaml_type()
 
     @classmethod
     @contextmanager
     def context(
         cls,
         key: str,
-        argv: Optional[List[str]] = None,
+        argv: Optional[Sequence[str]] = None,
         env: Optional[Mapping[str, str]] = None,
     ):
         """Context."""
diff --git a/src/dsdk/utils.py b/src/dsdk/utils.py
index 69640ac..c16044c 100644
--- a/src/dsdk/utils.py
+++ b/src/dsdk/utils.py
@@ -12,7 +12,7 @@
 from pickle import load as pickle_load
 from sys import stderr, stdout
 from time import sleep as default_sleep
-from typing import Any, Callable, Sequence
+from typing import Any, Callable, Sequence, Type
 
 from dateutil import parser, tz
 
@@ -107,7 +107,7 @@ def now_utc_datetime() -> datetime:
 
 
 def retry(
-    exceptions: Sequence[Exception],
+    exceptions: Sequence[Type[Exception]],
     retries: int = 60,
     delay: float = 1.0,
     backoff: float = 1.05,
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..4853786
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+"""Conftest."""
+
+from contextlib import contextmanager
+from typing import Any, Generator
+from unittest.mock import Mock
+
+from pytest import fixture
+from dsdk import Service, FlowsheetMixin, PostgresMixin
+
+
+@contextmanager
+def rollback():
+    """Rollback contextmanager stub."""
+    yield Mock()
+
+
+class _MockFlowsheetsService(  # pylint: disable=too-many-ancestors
+    PostgresMixin,
+    FlowsheetMixin,
+    Service,
+):
+    """Mock Flowsheet Service."""
+
+    YAML = "!example"
+
+    def __init__(self, postgres, **kwargs):
+        """__init__."""
+        postgres = Mock()  # discard the parsed persistor; tests use a mock
+        postgres.rollback = rollback
+        postgres.commit = rollback
+        super().__init__(pipeline=None, postgres=postgres, **kwargs)
+
+    def publish(self) -> Generator[Any, None, None]:
+        """Publish."""
+        yield from self.flowsheets.publish(self.postgres)
+
+
+@fixture
+def mock_flowsheets_service():
+    """Mock flowsheet service."""
+    return _MockFlowsheetsService.parse()
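The widened `retry` signature in `src/dsdk/utils.py` above (`Sequence[Type[Exception]]`) matches how the decorator is actually called: with exception classes, not instances. A small usage sketch (not part of the patch; the function name `post` is illustrative), mirroring how `flowsheet.py` wraps its POST:

```python
from requests import Session
from requests.exceptions import ConnectionError as RequestsConnectionError
from requests.exceptions import Timeout

from dsdk.utils import retry


@retry((RequestsConnectionError, Timeout))  # classes, as the new type hint says
def post(session: Session, url: str):
    """Retry transient failures with the defaults (60 tries, 1.05x backoff)."""
    return session.post(url=url, json={}, timeout=5)
```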
diff --git a/test/flowsheets.invalid.csn.yaml b/test/flowsheets.invalid.csn.yaml
new file mode 100644
index 0000000..c3b11d2
--- /dev/null
+++ b/test/flowsheets.invalid.csn.yaml
@@ -0,0 +1,108 @@
+interactions:
+- request:
+    body: '{}'
+    headers:
+      Accept:
+      - '*/*'
+      Accept-Encoding:
+      - gzip, deflate
+      Connection:
+      - keep-alive
+      Content-Length:
+      - '2'
+      User-Agent:
+      - python-requests/2.27.1
+      authorization:
+      - EPIC_AUTHORIZATION
+      content-type:
+      - application/json
+      cookie:
+      - EPIC_COOKIE
+      epic-client-id:
+      - EPIC_CLIENT_ID
+      epic-user-id:
+      - PENNSIGNALS
+      epic-user-idtype:
+      - external
+    method: POST
+    uri: https://interconnectbgprod.uphs.upenn.edu/interconnect-prd-web/api/epic/2011/clinical/patient/addflowsheetvalue/flowsheetvalue?Comment=0&ContactID=999999999&ContactIDType=CSN&FlowsheetID=0000000000&FlowsheetIDType=external&FlowsheetTemplateID=0000000000&FlowsheetTemplateIDType=external&InstantValueTaken=2019-09-18T17%3A19%3A23Z&PatientID=8330651951&PatientIDType=UID&UserID=PENNSIGNALS&UserIDType=external&Value=0.5
+  response:
+    body:
+      string: '{"Message":"An error has occurred.","ExceptionMessage":"An error occurred
+        while executing the command: EPT_DAT_RETRIEVAL_ERROR.","ExceptionType":"Epic.ServiceModel.Internal.ServiceCommandException","StackTrace":" at
+        Epic.Clinical.Generated.Services.Epic_Clinical_PatientController.v2011_ADDFLOWSHEETVALUE(String
+        PatientID, String PatientIDType, String ContactID, String ContactIDType, String
+        UserID, String UserIDType, String FlowsheetID, String FlowsheetIDType, String
+        Value, String Comment, Nullable`1 InstantValueTaken, String FlowsheetTemplateID,
+        String FlowsheetTemplateIDType)\r\n   at lambda_method(Closure , Object ,
+        Object[] )\r\n   at System.Web.Http.Controllers.ReflectedHttpActionDescriptor.ActionExecutor.<>c__DisplayClass10.b__9(Object
+        instance, Object[] methodParameters)\r\n   at System.Web.Http.Controllers.ReflectedHttpActionDescriptor.ExecuteAsync(HttpControllerContext
+        controllerContext, IDictionary`2 arguments, CancellationToken cancellationToken)\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Tracing.ITraceWriterExtensions.d__18`1.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.ApiControllerActionInvoker.d__0.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Tracing.ITraceWriterExtensions.d__18`1.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Filters.ActionFilterAttribute.d__5.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Web.Http.Filters.ActionFilterAttribute.d__5.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Filters.ActionFilterAttribute.d__0.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.ActionFilterResult.d__2.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Filters.AuthorizationFilterAttribute.d__2.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.AuthenticationFilterResult.d__0.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.ExceptionFilterResult.d__0.MoveNext()","InnerException":{"Message":"An
+        error has occurred.","ExceptionMessage":"An error occurred while executing
+        the command: EPT_DAT_RETRIEVAL_ERROR.","ExceptionType":"Epic.Core.Communication.EcfCommandException","StackTrace":" at
+        Epic.Core.Communication.Internal.EcfConnection.HandleErrorPacket(Byte[] response,
+        Int32 packetLength, Int32 startIndex, Command command, Int64 endTime, INetworkStream
+        networkStream)\r\n   at Epic.Core.Communication.Internal.EcfConnection.BuildResponseFromPacket(Int32
+        packetLength, Byte[] response, Command command, INetworkStream networkStream,
+        Int64 endTime, Boolean responseExpected, ProcessState& state, String& pauseMessage)\r\n   at
+        Epic.Core.Communication.Internal.EcfConnection.Execute(Command command, String
+        instrumentationHeader)\r\n   at Epic.Core.Communication.Connection.Execute(Command
+        command, Int32 lockAcquireTimeout)\r\n   at Epic.Core.Communication.Command.Execute(Int32
+        lockAcquireTimeout, EventHandler`1 asyncExecuteCompletedHandler)\r\n   at
+        Epic.Clinical.Generated.Services.Epic_Clinical_PatientController.v2011_ADDFLOWSHEETVALUE(String
+        PatientID, String PatientIDType, String ContactID, String ContactIDType, String
+        UserID, String UserIDType, String FlowsheetID, String FlowsheetIDType, String
+        Value, String Comment, Nullable`1 InstantValueTaken, String FlowsheetTemplateID,
+        String FlowsheetTemplateIDType)"}}'
+    headers:
+      Cache-Control:
+      - no-cache,no-store
+      Content-Length:
+      - '5786'
+      Content-Type:
+      - application/json; charset=utf-8
+      Date:
+      - Wed, 19 Jan 2022 20:14:44 GMT
+      Expires:
+      - '-1'
+      Pragma:
+      - no-cache
+      Server:
+      - Microsoft-IIS/8.5
+      X-AspNet-Version:
+      - 4.0.30319
+      X-Powered-By:
+      - ASP.NET
+    status:
+      code: 400
+      message: 'An error occurred while executing the command: EPT_DAT_RETRIEVAL_ERROR.'
+version: 1
diff --git a/test/flowsheets.invalid.empi.yaml b/test/flowsheets.invalid.empi.yaml
new file mode 100644
index 0000000..5479dd4
--- /dev/null
+++ b/test/flowsheets.invalid.empi.yaml
@@ -0,0 +1,109 @@
+interactions:
+- request:
+    body: '{}'
+    headers:
+      Accept:
+      - '*/*'
+      Accept-Encoding:
+      - gzip, deflate
+      Connection:
+      - keep-alive
+      Content-Length:
+      - '2'
+      User-Agent:
+      - python-requests/2.27.1
+      authorization:
+      - EPIC_AUTHORIZATION
+      content-type:
+      - application/json
+      cookie:
+      - EPIC_COOKIE
+      epic-client-id:
+      - EPIC_CLIENT_ID
+      epic-user-id:
+      - PENNSIGNALS
+      epic-user-idtype:
+      - external
+    method: POST
+    uri: https://interconnectbgprod.uphs.upenn.edu/interconnect-prd-web/api/epic/2011/clinical/patient/addflowsheetvalue/flowsheetvalue?Comment=0&ContactID=218202909&ContactIDType=CSN&FlowsheetID=0000000000&FlowsheetIDType=external&FlowsheetTemplateID=0000000000&FlowsheetTemplateIDType=external&InstantValueTaken=2019-09-18T17%3A19%3A23Z&PatientID=9999999999&PatientIDType=UID&UserID=PENNSIGNALS&UserIDType=external&Value=0.5
+  response:
+    body:
+      string: '{"Message":"An error has occurred.","ExceptionMessage":"An error occurred
+        while executing the command: EPT_ID_RETRIEVAL_ERROR details: 2:InvalidRecord:RecordNotFound:9999999999;UID.","ExceptionType":"Epic.ServiceModel.Internal.ServiceCommandException","StackTrace":" at
+        Epic.Clinical.Generated.Services.Epic_Clinical_PatientController.v2011_ADDFLOWSHEETVALUE(String
+        PatientID, String PatientIDType, String ContactID, String ContactIDType, String
+        UserID, String UserIDType, String FlowsheetID, String FlowsheetIDType, String
+        Value, String Comment, Nullable`1 InstantValueTaken, String FlowsheetTemplateID,
+        String FlowsheetTemplateIDType)\r\n   at lambda_method(Closure , Object ,
+        Object[] )\r\n   at System.Web.Http.Controllers.ReflectedHttpActionDescriptor.ActionExecutor.<>c__DisplayClass10.b__9(Object
+        instance, Object[] methodParameters)\r\n   at System.Web.Http.Controllers.ReflectedHttpActionDescriptor.ExecuteAsync(HttpControllerContext
+        controllerContext, IDictionary`2 arguments, CancellationToken cancellationToken)\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Tracing.ITraceWriterExtensions.d__18`1.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.ApiControllerActionInvoker.d__0.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Tracing.ITraceWriterExtensions.d__18`1.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Filters.ActionFilterAttribute.d__5.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Web.Http.Filters.ActionFilterAttribute.d__5.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Filters.ActionFilterAttribute.d__0.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.ActionFilterResult.d__2.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Filters.AuthorizationFilterAttribute.d__2.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.AuthenticationFilterResult.d__0.MoveNext()\r\n---
+        End of stack trace from previous location where exception was thrown ---\r\n   at
+        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task
+        task)\r\n   at System.Web.Http.Controllers.ExceptionFilterResult.d__0.MoveNext()","InnerException":{"Message":"An
+        error has occurred.","ExceptionMessage":"An error occurred while executing
+        the command: EPT_ID_RETRIEVAL_ERROR details: 2:InvalidRecord:RecordNotFound:9999999999;UID.","ExceptionType":"Epic.Core.Communication.EcfCommandException","StackTrace":" at
+        Epic.Core.Communication.Internal.EcfConnection.HandleErrorPacket(Byte[] response,
+        Int32 packetLength, Int32 startIndex, Command command, Int64 endTime, INetworkStream
+        networkStream)\r\n   at Epic.Core.Communication.Internal.EcfConnection.BuildResponseFromPacket(Int32
+        packetLength, Byte[] response, Command command, INetworkStream networkStream,
+        Int64 endTime, Boolean responseExpected, ProcessState& state, String& pauseMessage)\r\n   at
+        Epic.Core.Communication.Internal.EcfConnection.Execute(Command command, String
+        instrumentationHeader)\r\n   at Epic.Core.Communication.Connection.Execute(Command
+        command, Int32 lockAcquireTimeout)\r\n   at Epic.Core.Communication.Command.Execute(Int32
+        lockAcquireTimeout, EventHandler`1 asyncExecuteCompletedHandler)\r\n   at
+        Epic.Clinical.Generated.Services.Epic_Clinical_PatientController.v2011_ADDFLOWSHEETVALUE(String
+        PatientID, String PatientIDType, String ContactID, String ContactIDType, String
+        UserID, String UserIDType, String FlowsheetID, String FlowsheetIDType, String
+        Value, String Comment, Nullable`1 InstantValueTaken, String FlowsheetTemplateID,
+        String FlowsheetTemplateIDType)"}}'
+    headers:
+      Cache-Control:
+      - no-cache,no-store
+      Content-Length:
+      - '5894'
+      Content-Type:
+      - application/json; charset=utf-8
+      Date:
+      - Wed, 19 Jan 2022 20:14:44 GMT
+      Expires:
+      - '-1'
+      Pragma:
+      - no-cache
+      Server:
+      - Microsoft-IIS/8.5
+      X-AspNet-Version:
+      - 4.0.30319
+      X-Powered-By:
+      - ASP.NET
+    status:
+      code: 400
+      message: 'An error occurred while executing the command: EPT_ID_RETRIEVAL_ERROR
+        details: 2:InvalidRecord:RecordNotFound:9999999999;UID.'
+version: 1
diff --git a/test/flowsheets.valid.yaml b/test/flowsheets.valid.yaml
new file mode 100644
index 0000000..30d3b0e
--- /dev/null
+++ b/test/flowsheets.valid.yaml
@@ -0,0 +1,54 @@
+interactions:
+- request:
+    body: '{}'
+    headers:
+      Accept:
+      - '*/*'
+      Accept-Encoding:
+      - gzip, deflate
+      Connection:
+      - keep-alive
+      Content-Length:
+      - '2'
+      User-Agent:
+      - python-requests/2.27.1
+      authorization:
+      - EPIC_AUTHORIZATION
+      content-type:
+      - application/json
+      cookie:
+      - EPIC_COOKIE
+      epic-client-id:
+      - EPIC_CLIENT_ID
+      epic-user-id:
+      - PENNSIGNALS
+      epic-user-idtype:
+      - external
+    method: POST
+    uri: https://interconnectbgprod.uphs.upenn.edu/interconnect-prd-web/api/epic/2011/clinical/patient/addflowsheetvalue/flowsheetvalue?Comment=0&ContactID=218202909&ContactIDType=CSN&FlowsheetID=0000000000&FlowsheetIDType=external&FlowsheetTemplateID=0000000000&FlowsheetTemplateIDType=external&InstantValueTaken=2019-09-18T17%3A19%3A23Z&PatientID=8330651951&PatientIDType=UID&UserID=PENNSIGNALS&UserIDType=external&Value=0.5
+  response:
+    body:
+      string: '{"Success":true,"Errors":[]}'
+    headers:
+      Cache-Control:
+      - no-cache,no-store
+      Content-Length:
+      - '28'
+      Content-Type:
+      - application/json; charset=utf-8
+      Date:
+      - Wed, 19 Jan 2022 20:16:00 GMT
+      Expires:
+      - '-1'
+      Pragma:
+      - no-cache
+      Server:
+      - Microsoft-IIS/8.5
+      X-AspNet-Version:
+      - 4.0.30319
+      X-Powered-By:
+      - ASP.NET
+    status:
+      code: 200
+      message: OK
+version: 1
diff --git a/test/test_flowsheets.py b/test/test_flowsheets.py
new file mode 100644
index 0000000..221d5c7
--- /dev/null
+++ b/test/test_flowsheets.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+"""Test flowsheets."""
+
+from pandas import DataFrame
+from vcr import VCR
+
+vcr = VCR(
+    filter_headers=(
+        ("authorization", "EPIC_AUTHORIZATION"),
+        ("epic-client-id", "EPIC_CLIENT_ID"),
+        ("cookie", "EPIC_COOKIE"),
+    ),
+)
+
+
+@vcr.use_cassette("./test/flowsheets.valid.yaml")
+def test_valid(mock_flowsheets_service):
+    """Test valid flowsheet."""
+    service = mock_flowsheets_service
+
+    postgres = service.postgres
+    postgres.df_from_query.return_value = DataFrame(
+        [
+            # inpatient admission date is 2019-02-06 at PAH
+            {
+                "as_of": service.as_of,
+                "csn": 218202909,
+                "empi": "8330651951",
+                "id": 0,
+                "run_id": 0,
+                "score": 0.5,
+            }
+        ]
+    )
+    for result in service.publish():
+        assert result.status is True
+        assert result.status_code == 200
+
+
+@vcr.use_cassette("./test/flowsheets.invalid.csn.yaml")
+def test_invalid_csn(mock_flowsheets_service):
+    """Test invalid csn."""
+    service = mock_flowsheets_service
+
+    postgres = service.postgres
+    postgres.df_from_query.return_value = DataFrame(
+        [
+            {
+                "as_of": service.as_of,
+                "csn": 999999999,
+                "empi": "8330651951",
+                "id": 0,
+                "run_id": 0,
+                "score": 0.5,
+            }
+        ]
+    )
+    expected = (
+        "An error occurred while executing the command: "
+        "EPT_DAT_RETRIEVAL_ERROR."
+    )
+
+    for result in service.publish():
+        assert result.description == expected
+        assert result.status is False
+        assert result.status_code == 400
+        assert result.name == "HTTPError"
+
+
+@vcr.use_cassette("./test/flowsheets.invalid.empi.yaml")
+def test_invalid_empi(mock_flowsheets_service):
+    """Test invalid empi."""
+    service = mock_flowsheets_service
+
+    postgres = service.postgres
+    postgres.df_from_query.return_value = DataFrame(
+        [
+            {
+                "as_of": service.as_of,
+                "csn": 218202909,
+                "empi": "9999999999",
+                "id": 0,
+                "run_id": 0,
+                "score": 0.5,
+            }
+        ]
+    )
+    expected = (
+        "An error occurred while executing the command: "
+        "EPT_ID_RETRIEVAL_ERROR details: "
+        "2:InvalidRecord:RecordNotFound:9999999999;UID."
+    )
+
+    for result in service.publish():
+        assert result.description == expected
+        assert result.status is False
+        assert result.status_code == 400
+        assert result.name == "HTTPError"
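The tests above replay the scrubbed cassettes and never touch the network. Re-recording them requires real Interconnect credentials; a hedged sketch (not part of the patch; `recorder` is an illustrative name, and `"all"`/`"once"` are standard vcrpy record modes):

```python
from vcr import VCR

recorder = VCR(
    record_mode="all",  # hit the live API and overwrite the cassette
    filter_headers=(
        # scrub the same sensitive headers the test suite scrubs
        ("authorization", "EPIC_AUTHORIZATION"),
        ("epic-client-id", "EPIC_CLIENT_ID"),
        ("cookie", "EPIC_COOKIE"),
    ),
)
```

With vcrpy's default `record_mode="once"`, an existing cassette is only replayed, which is what keeps these tests hermetic.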
df.set_index("subject_id") - df["score"] = ~df["is_mineral"] * ( - (df["is_animal"] * df["greenish"]) - + (df["is_vegetable"] * (1.0 - df["greenish"])) - ) - run.predictions = df - - with persistor.rollback() as cur: - cur.execute(f"set search_path={persistor.schema}") - df = read_sql_query( - sql=check, con=cur.connection, params={"run_id": run.id} - ) - df.set_index("subject_id") - - # reorder columns to match run.predictions - df = df[run.predictions.columns] - # logger.error(df.head(10)) - # logger.error(run.predictions.head(10)) - assert df.equals(run.predictions) - - -def test_retry_connect(): - """Test retry_connect.""" - - -def test_store_evidence(): - """Test store evidence.""" - - -def test_store_df(): - """Test store_df."""