diff --git a/assets/postgres/epic/notifications/recent.sql b/assets/postgres/epic/notifications/recent.sql new file mode 100644 index 0000000..2d856cf --- /dev/null +++ b/assets/postgres/epic/notifications/recent.sql @@ -0,0 +1,31 @@ +with args as ( + select + cast(%(notification_id)s as int) as notification_id +) +select + n.id, + p.id as prediction_id, + p.run_id, + p.csn, + p.empi, + p.score, + r.as_of +from + args + join runs as r on + upper(r.interval) != 'infinity' + join predictions as p on + p.run_id = r.id + join epic_notifications as n on + n.prediction_id = p.id + and n.id <= args.notification_id + left join epic_verifications as v on + v.notification_id = n.id + left join epic_verification_errors as e on + e.notification_id = n.id + and e.acknowledged_on is null +where + v.id is null + and e.id is null +order by + r.id, p.id diff --git a/assets/postgres/epic/notifications/recover.sql b/assets/postgres/epic/notifications/recover.sql index 2d79207..965a810 100644 --- a/assets/postgres/epic/notifications/recover.sql +++ b/assets/postgres/epic/notifications/recover.sql @@ -16,6 +16,7 @@ from v.notification_id = n.id left join epic_verification_errors as e on e.notification_id = n.id + and e.acknowledged_on is null where v.id is null and e.id is null diff --git a/assets/postgres/predictions/recent.sql b/assets/postgres/predictions/recent.sql new file mode 100644 index 0000000..3597bbc --- /dev/null +++ b/assets/postgres/predictions/recent.sql @@ -0,0 +1,28 @@ +with args as ( + select + cast(%(prediction_id)s as int) as prediction_id +) +select + p.id, + p.run_id, + p.csn, + p.empi, + p.score, + r.as_of +from + args + join runs as r on + upper(r.interval) != 'infinity' + join predictions as p on + p.id <= args.prediction_id + and p.run_id = r.id + left join epic_notifications as n on + n.prediction_id = p.id + left join epic_notification_errors as e on + e.prediction_id = p.id + and e.acknowledged_on is null +where + n.id is null + and e.id is null +order by + r.id, 
p.id diff --git a/assets/postgres/predictions/recover.sql b/assets/postgres/predictions/recover.sql index 5eedec6..fbbbe29 100644 --- a/assets/postgres/predictions/recover.sql +++ b/assets/postgres/predictions/recover.sql @@ -19,4 +19,4 @@ where n.id is null and e.id is null order by - p.id + r.id, p.id diff --git a/postgres/sql/patchdb.d/007.epic.sql b/postgres/sql/patchdb.d/007.epic.sql index 5fe215a..f62d2c1 100644 --- a/postgres/sql/patchdb.d/007.epic.sql +++ b/postgres/sql/patchdb.d/007.epic.sql @@ -48,7 +48,7 @@ begin on delete cascade on update cascade ); - create trigger epci_verifications_inserted after insert on epic_verifications + create trigger epic_verifications_inserted after insert on epic_verifications referencing new table as inserted for each statement execute procedure call_notify(); diff --git a/setup.py b/setup.py index a64bfe5..d017932 100644 --- a/setup.py +++ b/setup.py @@ -12,12 +12,11 @@ "numpy>=1.15.4", "pandas>=0.23.4", "pip>=21.2.2", + "requests>=2.26.0", "setuptools>=57.4.0", "wheel>=0.35.1", ) -EPIC_REQUIRES = ("requests",) - PYMSSQL_REQUIRES = ("cython>=0.29.21", "pymssql>=2.1.4") PSYCOPG2_REQUIRES = ("psycopg2-binary>=2.8.6",) @@ -56,17 +55,14 @@ setup( entry_points={ "console_scripts": [ - "epic-notify = dsdk.epic:Notify.main", - "epic-validate = dsdk.epic:Validate.main", + "epic-notify = dsdk.epic:Notifier.main", + "epic-validate = dsdk.epic:Verifier.main", + "epic-notify-test = dsdk.epic:Notifier.test", + "epic-validate-test = dsdk.epic:Verifier.test", ] }, extras_require={ - "all": ( - EPIC_REQUIRES - + PSYCOPG2_REQUIRES - + PYMSSQL_REQUIRES - + TEST_REQUIRES - ), + "all": (PSYCOPG2_REQUIRES + PYMSSQL_REQUIRES + TEST_REQUIRES), "psycopg2": PSYCOPG2_REQUIRES, "pymssql": PYMSSQL_REQUIRES, "test": TEST_REQUIRES, diff --git a/src/dsdk/epic.py b/src/dsdk/epic.py index 100ac74..eaa9682 100644 --- a/src/dsdk/epic.py +++ b/src/dsdk/epic.py @@ -5,19 +5,19 @@ from contextlib import contextmanager from datetime import datetime from 
logging import getLogger -from select import select -from typing import Any, Dict, Optional, Tuple +from os import getcwd +from os.path import join as pathjoin +from select import select # pylint: disable=no-name-in-module +from typing import Any, Dict, Generator, Optional, Tuple, Union, cast from urllib.parse import quote -from cfgenvy import yaml_type +from cfgenvy import Parser, yaml_type +from requests import Session +from requests.exceptions import ConnectionError as RequestsConnectionError +from requests.exceptions import HTTPError, Timeout from .postgres import Persistor as Postgres -try: - from requests import Session -except ImportError: - Session = None - logger = getLogger(__name__) @@ -53,6 +53,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals password: str, url: str, flowsheet_id: str, + postgres: Postgres, comment: str = "Not for clinical use.", contact_id_type: str = "CSN", flowsheet_id_type: str = "external", @@ -60,16 +61,17 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals flowsheet_template_id_type: str = "external", lookback_hours: int = 72, patient_id_type: str = "UID", - poll_timeout: int = 300, + poll_timeout: int = 60, + operation_timeout: int = 5, username: str = "Pennsignals", user_id_type: str = "external", user_id: str = "PENNSIGNALS", ): """__init__.""" - self.authorization = b"Basic " + b64encode( - f"EMP${username}:{password}".encode("utf-8") - ) - self.postgres = None + self.authorization = ( + b"Basic " + b64encode(f"EMP${username}:{password}".encode("utf-8")) + ).decode("utf-8") + self.postgres = postgres self.client_id = client_id self.comment = comment self.contact_id_type = contact_id_type @@ -79,6 +81,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals self.flowsheet_template_id = flowsheet_template_id self.flowsheet_template_id_type = flowsheet_template_id_type self.lookback_hours = lookback_hours + self.operation_timeout = operation_timeout self.patient_id_type = 
patient_id_type self.poll_timeout = poll_timeout self.url = url @@ -91,7 +94,7 @@ def __call__(self): with self.listener() as listener: with self.postgres.commit() as cur: self.recover(cur, session) - for each in listener: + for each in self.listen(listener): self.on_notify(each, cur, session) def as_yaml(self) -> Dict[str, Any]: @@ -107,6 +110,7 @@ def as_yaml(self) -> Dict[str, Any]: "flowsheet_template_id": self.flowsheet_template_id, "flowsheet_template_id_type": self.flowsheet_template_id_type, "lookback_hours": self.lookback_hours, + "operation_timeout": self.operation_timeout, "patient_id_type": self.patient_id_type, "poll_timeout": self.poll_timeout, "url": self.url, @@ -115,11 +119,11 @@ @contextmanager - def listener(self): + def listener(self) -> Generator[Any, None, None]: """Listener.""" raise NotImplementedError() - def listen(self, listen, cur, session): + def listen(self, listen) -> Generator[Any, None, None]: """Listen.""" while True: readers, _, exceptions = select( @@ -133,11 +137,11 @@ while listen.notifies: - yield listen.notified.pop() + yield listen.notifies.pop() - def on_notify( # pylint: disable: unused-argument,no-self-use + def on_notify( # pylint: disable=unused-argument,no-self-use self, event, cur, - session, + session: Session, ): """On postgres notify handler.""" logger.debug( @@ -149,42 +153,46 @@ def on_notify( # pylint: disable: unused-argument,no-self-use }, ) - def on_success(self, entity, cur, response): + def on_success(self, entity, cur, content: Dict[str, Any]): """On success.""" raise NotImplementedError() - def on_error(self, entity, cur, response): + def on_error(self, entity, cur, content: Exception): """On error.""" raise NotImplementedError() - def recover(self, cur, session): + def recover(self, cur, session: Session): """Recover.""" sql = self.postgres.sql cur.execute(sql.epic.prediction.recover) for each in cur.fetchall(): - ok, message = self.rest(each, session) - if 
ok: - self.on_success(each, cur, message) - else: - self.on_error(each, cur, message) + ok, content = self.rest(each, session) + if not ok: + self.on_error(each, cur, cast(Exception, content)) + continue + self.on_success(each, cur, cast(Dict[str, Any], content)) - def rest(self, entity, session): + def rest( + self, + entity, + session: Session, + ) -> Tuple[bool, Union[Exception, Dict[str, Any]]]: """Rest.""" raise NotImplementedError() @contextmanager - def session(self): + def session(self) -> Generator[Any, None, None]: """Session.""" session = Session() session.verify = False session.headers.update( { - "Authorization": self.authorization, - "Content-Type": "application/json", - "Cookie": self.cookie, - "Epic-Client-ID": self.client_id, - "Epic-User-ID": self.user_id, - "Epic-User-IDType": self.user_id_type, + "authorization": self.authorization, + "content-type": "application/json", + "cookie": self.cookie, + "epic-client-id": self.client_id, + "epic-user-id": self.user_id, + "epic-user-idtype": self.user_id_type, } ) yield session @@ -195,6 +203,42 @@ class Notifier(Epic): YAML = "!epicnotifier" + @classmethod + def test( + cls, + csn="278820881", + empi="8330651951", + id=0, # pylint: disable=redefined-builtin + score="0.5", + ): + """Test epic API.""" + cls.as_yaml_type() + Postgres.as_yaml_type() + parser = Parser() + cwd = getcwd() + + notifier = parser.parse( + argv=( + "-c", + pathjoin(cwd, "local", "test.notifier.yaml"), + "-e", + pathjoin(cwd, "secrets", "test.notifier.env"), + ) + ) + + prediction = { + "as_of": datetime.utcnow(), + "csn": csn, + "empi": empi, + "id": id, + "score": score, + } + + with notifier.session() as session: + ok, response = notifier.rest(prediction, session) + print(ok) + print(response) + @classmethod def as_yaml_type(cls, tag: Optional[str] = None): """As yaml type.""" @@ -216,11 +260,11 @@ def _yaml_repr(cls, dumper, self, *, tag): return dumper.represent_mapper(tag, self.as_yaml()) @contextmanager - def listener(self): 
+ def listener(self) -> Generator[Any, None, None]: """Listener.""" postgres = self.postgres sql = postgres.sql - with postgres.listen(sql.prediction.listen) as listener: + with postgres.listen(sql.predictions.listen) as listener: yield listener def on_notify(self, event, cur, session): @@ -229,76 +273,80 @@ def on_notify(self, event, cur, session): sql = self.postgres.sql cur.execute(sql.prediction.recent, event.id) for each in cur.fetchall(): - ok, message = self.rest(each, session) - if ok: - self.on_success(each, cur, message) - else: - self.on_error(each, cur, message) + ok, content = self.rest(each, session) + if not ok: + self.on_error(each, cur, cast(Exception, content)) + continue + self.on_success(each, cur, cast(Dict[str, Any], content)) - def on_success(self, notification, cur, response): + def on_success(self, entity, cur, content: Dict[str, Any]): """On success.""" cur.execute( self.postgres.sql.epic.notification.insert, - {"prediction_id": notification["id"]}, + {"prediction_id": entity["id"]}, ) - def on_error(self, notification, cur, response): + def on_error(self, entity, cur, content: Exception): """On error.""" cur.execute( - self.postgres.sql.epic.notification_error.insert, + self.postgres.sql.epic.notifications.errors.insert, { - "description": response.text, - "name": response.reason, - "prediction_id": notification["id"], + "description": str(content), + "name": content.__class__.__name__, + "prediction_id": entity["id"], }, ) - def recover(self, cur, session): + def recover(self, cur, session: Session): """Recover.""" sql = self.postgres.sql cur.execute(sql.epic.notification.recover) for each in cur.fetchall(): - ok, message = self.rest(each, session) - if ok: - self.on_success(each, cur, message) - else: - self.on_error(each, cur, message) + ok, content = self.rest(each, session) + if not ok: + self.on_error(each, cur, cast(Exception, content)) + continue + self.on_success(each, cur, cast(Dict[str, Any], content)) - def rest(self, 
prediction, session) -> Tuple[bool, Any]: + def rest( + self, + entity, + session: Session, + ) -> Tuple[bool, Union[Exception, Dict[str, Any]]]: """Rest.""" query = { "Comment": self.comment, - "ContactID": prediction["csn"], + "ContactID": entity["csn"], "ContactIDType": self.contact_id_type, "FlowsheetID": self.flowsheet_id, "FlowsheetIDType": self.flowsheet_id_type, "FlowsheetTemplateID": self.flowsheet_template_id, "FlowsheetTemplateIDType": self.flowsheet_template_id_type, - "InstantValueTaken": prediction["as_of"].strftime( + "InstantValueTaken": entity["as_of"].strftime( "%Y-%m-%dT%H:%M:%SZ" ), - "PatientID": prediction["empi"], + "PatientID": entity["empi"], "PatientIDType": self.patient_id_type, "UserID": self.user_id, "UserIDType": self.user_id_type, - "Value": prediction["score"], + "Value": entity["score"], } - url = self.url.format( **{ key: quote(value.encode("utf-8")) for key, value in query.items() } ) - - print(url) - - response = session.post( - url=url, - data={}, - ) - - return response.status_code == 200, response + try: + response = session.post( + url=url, + json={}, + timeout=self.operation_timeout, + ) + response.raise_for_status() + except (RequestsConnectionError, HTTPError, Timeout) as e: + return False, e + return True, response.json() class Verifier(Epic): @@ -306,48 +354,88 @@ class Verifier(Epic): YAML = "!epicverifier" + @classmethod + def test( + cls, + csn="278820881", + empi="8330651951", + id=0, # pylint: disable=redefined-builtin + score="0.5", + ): + """Test verifier.""" + cls.as_yaml_type() + Postgres.as_yaml_type() + parser = Parser() + cwd = getcwd() + + verifier = parser.parse( + argv=( + "-c", + pathjoin(cwd, "local", "test.verifier.yaml"), + "-e", + pathjoin(cwd, "secrets", "test.verifier.env"), + ) + ) + + notification = { + "as_of": datetime.utcnow(), + "csn": csn, + "empi": empi, + "id": id, + "score": score, + } + + with verifier.session() as session: + ok, response = verifier.rest(notification, session) + print(ok) 
+ print(response) + @contextmanager - def listener(self): + def listener(self) -> Generator[Any, None, None]: """Listener.""" postgres = self.postgres sql = postgres.sql with postgres.listen(sql.notification.listen) as listener: yield listener - def on_notify(self, event, cur, session): + def on_notify(self, event, cur, session: Session): """On notify.""" - super.on_notify(event, cur, session) + super().on_notify(event, cur, session) sql = self.postgres.sql cur.execute(sql.notification.recent, event.id) for each in cur.fetchall(): - ok, response = self.rest(each, session) - if ok: - self.on_success(each, session, response) - else: - self.on_error(each, session, response) + ok, content = self.rest(each, session) + if not ok: + self.on_error(each, cur, cast(Exception, content)) + continue + self.on_success(each, cur, cast(Dict[str, Any], content)) - def on_success(self, notification, cur, response): + def on_success(self, entity, cur, content: Dict[str, Any]): """On success.""" cur.execute( self.postgres.sql.epic.verifications.insert, - {"prediction_id": notification["id"]}, + {"notification_id": entity["id"]}, ) - def on_error(self, notification, cur, response): + def on_error(self, entity, cur, content: Exception): """On error.""" cur.execute( self.postgres.sql.epic.verifications.errors.insert, { - "description": response.text, - "name": response.reason, - "notification_id": notification["id"], + "description": str(content), + "name": content.__class__.__name__, + "notification_id": entity["id"], }, ) - def rest(self, notification, session) -> Tuple[bool, Any]: + def rest( + self, + entity, + session: Session, + ) -> Tuple[bool, Union[Exception, Dict[str, Any]]]: """Rest.""" json = { - "ContactID": notification["csn"], + "ContactID": entity["csn"], "ContactIDType": self.contact_id_type, "FlowsheetRowIDs": [ { @@ -356,95 +444,18 @@ def rest(self, notification, session) -> Tuple[bool, Any]: } ], "LookbackHours": self.lookback_hours, - "PatientID": 
notification["empi"], + "PatientID": entity["empi"], "PatientIDType": self.patient_id_type, "UserID": self.user_id, "UserIDType": self.user_id_type, } - response = session.post( - url=self.url, - json=json, - ) - return response.status_code == 200, response - - -def test_notifier(csn, empi, score): - """Rest.""" - from os import getcwd - from os.path import join as pathjoin - - from cfgenvy import Parser - - from .asset import Asset - - Asset.as_yaml_type() - Postgres.as_yaml_type() - Notifier.as_yaml_type() - parser = Parser() - cwd = getcwd() - - notifier = parser.parse( - argv=( - "-c", - pathjoin(cwd, "local", "test.notifier.yaml"), - "-e", - pathjoin(cwd, "secrets", "test.notifier.env"), - ) - ) - - prediction = { - "as_of": datetime.utcnow(), - "csn": csn, - "empi": empi, - "score": score, - } - - with notifier.session() as session: - ok, response = notifier.rest(prediction, session) - print(ok) - print(response.json()) - - -def test_verifier(csn, empi, score): - """Test verifier.""" - from os import getcwd - from os.path import join as pathjoin - - from cfgenvy import Parser - - from .asset import Asset - - Asset.as_yaml_type() - Postgres.as_yaml_type() - Verifier.as_yaml_type() - parser = Parser() - cwd = getcwd() - - verifier = parser.parse( - argv=( - "-c", - pathjoin(cwd, "local", "test.verifier.yaml"), - "-e", - pathjoin(cwd, "secrets", "test.verifier.env"), - ) - ) - - notification = { - "as_of": datetime.utcnow(), - "csn": csn, - "empi": empi, - "score": score, - } - - with verifier.session() as session: - ok, response = verifier.rest(notification, session) - print(ok) - print(response.json()) - - -if __name__ == "__main__": - test_notifier( - csn="278820881", - empi="8330651951", - score="0.5", - ) + try: + response = session.post( + url=self.url, + json=json, + timeout=self.operation_timeout, + ) + response.raise_for_status() + except (RequestsConnectionError, HTTPError, Timeout) as e: + return False, e + return True, response.json() diff --git 
a/src/dsdk/persistor.py b/src/dsdk/persistor.py index ebf7f6f..2e5e969 100644 --- a/src/dsdk/persistor.py +++ b/src/dsdk/persistor.py @@ -233,6 +233,7 @@ class Persistor(AbstractPersistor): @classmethod def as_yaml_type(cls, tag: Optional[str] = None) -> None: """As yaml type.""" + Asset.as_yaml_type() yaml_type( cls, tag or cls.YAML,