Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor message serialisation and deserialisation #197

Merged
merged 36 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5cf1111
Refactor message serialization and deserialization
milanmlft Dec 19, 2023
2299519
Rename `utils.py` -> `message.py`
milanmlft Dec 19, 2023
0bfcab0
Add `decode()` method for `SerialisedMessage`
milanmlft Dec 19, 2023
b9e4a59
Update docstring
milanmlft Dec 19, 2023
558f694
Use new classes in message testing
milanmlft Dec 19, 2023
fef8779
Refactor message processing in the CLI
milanmlft Dec 19, 2023
e239299
Refactor `process_message` to use `SerialisedMessage` class in EHR API
milanmlft Dec 19, 2023
596e85c
Refactor `process_message` to use `SerialisedMessage` class in imagin…
milanmlft Dec 19, 2023
7f160a2
Fix `ImagingStudy` initialisation in `ImagingStudy.from_message()`
milanmlft Dec 19, 2023
31a6f29
Fix imports
milanmlft Dec 19, 2023
9a8f5a9
Fix test: access serialised message bodies
milanmlft Dec 19, 2023
aa78318
Turn `Message` into a `dataclass`
milanmlft Dec 19, 2023
cc5b12c
Fix failing tests
milanmlft Dec 19, 2023
b8dfad0
Use `jsonpickle` for (de)serializing messages
milanmlft Dec 20, 2023
f1b848d
Fix `test_deserialise_datetime()` so it uses the `Message` class to a…
milanmlft Dec 20, 2023
8719153
Add `study_datetime` property for `Message`
milanmlft Dec 20, 2023
b08a9c4
No need to test deserialising individual fields, already covered by `…
milanmlft Dec 20, 2023
3e7c553
Remove `study_date_from_serialised()`, use the class attribute `study…
milanmlft Dec 20, 2023
e8c0cc9
Revert "Add `study_datetime` property for `Message`"
milanmlft Dec 20, 2023
fb44bf6
Remove `Messages` class, use `list[Message]` instead
milanmlft Dec 20, 2023
6af792e
Add type checking for messages parsed from parquet input
milanmlft Dec 20, 2023
0e4fce4
Update `test_messages_from_parquet()` to use JSON strings instead of …
milanmlft Dec 20, 2023
99ffd00
Update `PixlProducer.publish()` to use a list of Message objects and …
milanmlft Dec 20, 2023
986633d
Convert JSON string to bytes when serialising
milanmlft Dec 20, 2023
d2c05ca
Revert "Update `test_messages_from_parquet()` to use JSON strings ins…
milanmlft Dec 20, 2023
cd3e101
`PixlProducer.publish()` should take a `list[Message]` as input in tests
milanmlft Dec 20, 2023
0b855e1
Update EHR API to use new `Message` design
milanmlft Dec 20, 2023
9a4d6a8
Update imaging API to use new `Message` design
milanmlft Dec 20, 2023
baad796
Update deserialise function to accept bytes-encoded JSON string
milanmlft Dec 20, 2023
a81e285
Assert messages against list of `Message`s
milanmlft Dec 20, 2023
8583ca0
Print dataclass in logs
milanmlft Dec 20, 2023
7c32e8d
`jsonpickle.decode()` can handle bytes so no need to decode first
milanmlft Dec 20, 2023
ae90838
Make `deserialisable` a keyword only argument
milanmlft Dec 20, 2023
2c49dc3
Copilot forgot to convert dates to datetimes 🥲
milanmlft Dec 20, 2023
db1e34d
Refactor PixlConsumer run method to accept Message object as callback…
milanmlft Dec 20, 2023
7343fcd
Update consumer in `test_subscriber` to accept Message object instead…
milanmlft Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 29 additions & 53 deletions cli/src/pixl_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
import datetime
import json
import os
from operator import attrgetter
from pathlib import Path
from typing import Any, Optional

import click
import pandas as pd
import requests
import yaml
from core.patient_queue.message import Message, deserialise
from core.patient_queue.producer import PixlProducer
from core.patient_queue.subscriber import PixlBlockingConsumer
from core.patient_queue.utils import deserialise, serialise

from ._logging import logger, set_log_level
from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty
Expand Down Expand Up @@ -84,12 +85,12 @@ def populate(queues: str, *, restart: bool, parquet_dir: Path) -> None:
if state_filepath.exists() and restart:
logger.info(f"Extracting messages from state: {state_filepath}")
inform_user_that_queue_will_be_populated_from(state_filepath)
messages = Messages.from_state_file(state_filepath)
messages = messages_from_state_file(state_filepath)
elif parquet_dir is not None:
messages = messages_from_parquet(parquet_dir)

remove_file_if_it_exists(state_filepath) # will be stale
producer.publish(sorted(messages, key=study_date_from_serialised))
producer.publish(sorted(messages, key=attrgetter("study_datetime")))


@cli.command()
Expand Down Expand Up @@ -273,41 +274,26 @@ def state_filepath_for_queue(queue_name: str) -> Path:
return Path(f"{queue_name.replace('/', '_')}.state")


class Messages(list):
def messages_from_state_file(filepath: Path) -> list[Message]:
"""
Class to represent messages
Return messages from a state file path

Methods
-------
from_state_file(cls, filepath)
Return messages from a state file path
:param filepath: Path for state file to be read
:return: A list of Message objects containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

@classmethod
def from_state_file(cls, filepath: Path) -> "Messages":
"""
Return messages from a state file path

:param filepath: Path for state file to be read
:return: A Messages object containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

return cls(
[
line.encode("utf-8")
for line in Path.open(filepath).readlines()
if string_is_non_empty(line)
]
)
return [
deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line)
]


def messages_from_parquet(dir_path: Path) -> Messages:
def messages_from_parquet(dir_path: Path) -> list[Message]:
"""
Reads patient information from parquet files within directory structure
and transforms that into messages.
Expand Down Expand Up @@ -345,9 +331,6 @@ def messages_from_parquet(dir_path: Path) -> Messages:
f"{expected_col_names}"
)

# First line is column names
messages = Messages()

for col in expected_col_names:
if col not in list(cohort_data.columns):
msg = f"csv file expected to have at least {expected_col_names} as " f"column names"
Expand All @@ -367,17 +350,19 @@ def messages_from_parquet(dir_path: Path) -> Messages:
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"])

messages = []

for _, row in cohort_data.iterrows():
messages.append(
serialise(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=row[dt_col_name],
procedure_occurrence_id=row[procedure_occurrence_id],
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
)
# Build a Message object from the current cohort row
message = Message(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=row[dt_col_name],
procedure_occurrence_id=row[procedure_occurrence_id],
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
)
messages.append(message)

if len(messages) == 0:
msg = f"Failed to find any messages in {dir_path}"
Expand Down Expand Up @@ -446,12 +431,3 @@ def api_config_for_queue(queue_name: str) -> APIConfig:
raise ValueError(msg)

return APIConfig(config[config_key])


def study_date_from_serialised(message: bytes) -> datetime.datetime:
"""Get the study date from a serialised message as a datetime"""
result = deserialise(message)["study_datetime"]
if not isinstance(result, datetime.datetime):
msg = "Expected study date to be a datetime. Got %s"
raise TypeError(msg, type(result))
return result
6 changes: 5 additions & 1 deletion cli/tests/test_messages_from_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pathlib import Path

from core.patient_queue.message import Message
from pixl_cli.main import messages_from_parquet


Expand All @@ -25,6 +26,9 @@ def test_messages_from_parquet(resources: Path) -> None:
"""
omop_parquet_dir = resources / "omop"
messages = messages_from_parquet(omop_parquet_dir)
assert all(isinstance(msg, Message) for msg in messages)

message_bodies = [msg.serialise(deserialisable=False) for msg in messages]
expected_messages = [
b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", '
b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", '
Expand All @@ -40,4 +44,4 @@ def test_messages_from_parquet(resources: Path) -> None:
b'"omop_es_timestamp": "2023-12-07T14:08:58"}',
]

assert messages == expected_messages
assert message_bodies == expected_messages
3 changes: 2 additions & 1 deletion pixl_core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ dependencies = [
"pika==1.3.1",
"aio_pika==8.2.4",
"environs==9.5.0",
"requests==2.31.0"
"requests==2.31.0",
"jsonpickle==3.0.2"
]

[project.optional-dependencies]
Expand Down
74 changes: 74 additions & 0 deletions pixl_core/src/core/patient_queue/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes to represent messages in the patient queue."""

import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from jsonpickle import decode, encode

logger = logging.getLogger(__name__)


@dataclass
class Message:
    """
    Class to represent a message containing the relevant information for a study.

    Use `serialise()` to convert an instance into a bytes-encoded JSON string.
    """

    mrn: str
    accession_number: str
    study_datetime: datetime
    # NOTE(review): the parquet test fixture serialises this value as an integer;
    # confirm whether `str` is the intended type here.
    procedure_occurrence_id: str
    project_name: str
    omop_es_timestamp: datetime

    def serialise(self, deserialisable: bool = True) -> bytes:  # noqa: FBT001, FBT002
        """
        Serialise the message into a JSON string and convert to bytes.

        :param deserialisable: Passed to `jsonpickle.encode()` as its `unpicklable` flag.
            If True, the serialised message keeps the information needed for
            `deserialise()` to recover the original Message object. If False, calling
            `deserialise()` on the serialised message will return a dictionary.
        :return: The bytes-encoded JSON representation of this message.
        """
        # Pass a single format string plus lazy %-style arguments, so the logging
        # module formats the record itself (and only when DEBUG is enabled).
        logger.debug(
            "Serialising message with\n"
            " * patient id: %s\n"
            " * accession number: %s\n"
            " * timestamp: %s\n"
            " * procedure_occurrence_id: %s\n"
            " * project_name: %s\n * omop_es_timestamp: %s",
            self.mrn,
            self.accession_number,
            self.study_datetime,
            self.procedure_occurrence_id,
            self.project_name,
            self.omop_es_timestamp,
        )

        return str.encode(encode(self, unpicklable=deserialisable))


def deserialise(serialised_msg: bytes) -> Any:
    """
    Rebuild a message from its bytes-encoded JSON representation.

    The bytes are decoded as UTF-8 and handed to `jsonpickle.decode()`. A message
    that was serialised with `deserialisable=True` comes back as the original
    Message object; otherwise a dictionary is returned.

    NOTE(review): jsonpickle can reconstruct arbitrary Python objects, so this
    should presumably only be called on messages from a trusted queue — confirm.

    :param serialised_msg: The serialised message.
    :return: The decoded Message object, or a dictionary.
    """
    json_text = serialised_msg.decode("utf-8")
    return decode(json_text)  # noqa: S301
12 changes: 9 additions & 3 deletions pixl_core/src/core/patient_queue/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import logging
from time import sleep

from core.patient_queue.message import Message

from ._base import PixlBlockingInterface

LOGGER = logging.getLogger(__name__)
Expand All @@ -24,19 +26,23 @@
class PixlProducer(PixlBlockingInterface):
"""Generic publisher for RabbitMQ"""

def publish(self, messages: list[bytes]) -> None:
def publish(self, messages: list[Message]) -> None:
"""
Serialises a list of messages and sends them to a queue.
:param messages: list of messages to be sent to queue
"""
LOGGER.debug("Publishing %i messages to queue: %s", len(messages), self.queue_name)
if len(messages) > 0:
for msg in messages:
LOGGER.debug("Serialising message")
serialised_msg = msg.serialise()
LOGGER.debug("Preparing to publish")
self._channel.basic_publish(exchange="", routing_key=self.queue_name, body=msg)
self._channel.basic_publish(
exchange="", routing_key=self.queue_name, body=serialised_msg
)
# RabbitMQ can misorder messages if there is not a sufficient delay
sleep(0.1)
LOGGER.debug("Message %s published to queue %s", msg.decode(), self.queue_name)
LOGGER.debug("Message %s published to queue %s", serialised_msg, self.queue_name)
else:
LOGGER.debug("List of messages is empty so nothing will be published to queue.")

Expand Down
70 changes: 0 additions & 70 deletions pixl_core/src/core/patient_queue/utils.py

This file was deleted.

Loading