Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy parquet files to exports directory #182

Merged
merged 5 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ dmypy.json

# VS Code
.vscode

# project specific files
/exports/
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ repos:
pass_filenames: false
entry: mypy
args: ['--config-file=setup.cfg']
additional_dependencies: ['mypy', 'types-PyYAML', 'types-requests']
additional_dependencies: ['mypy', 'types-PyYAML', 'types-requests', 'types-python-slugify']
18 changes: 18 additions & 0 deletions cli/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""CLI testing fixtures."""
import pathlib

import pytest
from core.omop import OmopExtract


@pytest.fixture()
def omop_files(tmp_path_factory: pytest.TempPathFactory) -> OmopExtract:
"""Create an OmopFiles instance using a temporary directory"""
stefpiatek marked this conversation as resolved.
Show resolved Hide resolved
export_dir = tmp_path_factory.mktemp("repo_base")
return OmopExtract(export_dir)


@pytest.fixture()
def resources() -> pathlib.Path:
"""Test resources directory path."""
return pathlib.Path(__file__).parent / "resources"
Binary file added cli/tests/resources/omop/public/CARE_SITE.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added cli/tests/resources/omop/public/LOCATION.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added cli/tests/resources/omop/public/PERSON.parquet
Binary file not shown.
Binary file not shown.
Binary file added cli/tests/resources/omop/public/SPECIMEN.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
95 changes: 95 additions & 0 deletions cli/tests/test_copy_omop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test copying of OMOP ES data for later export."""
import datetime

import pytest


def test_new_project_copies(omop_files, resources):
"""
Given a valid export directory and hasn't been exported before
When copy to exports is run
Then the public files should be copied and symlinked to the latest export directory
"""
# ARRANGE
input_dir = resources / "omop"
project_name = "Really great cool project"
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
# ACT
omop_files.copy_to_exports(input_dir, project_name, input_date)
# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"

# check public files copied
specific_export_dir = (
output_base / "all_extracts" / "omop" / "2020-06-10t18-00-00" / "public"
)
assert (specific_export_dir).exists()
expected_files = [x.stem for x in (input_dir / "public").glob("*.parquet")]
output_files = [x.stem for x in (specific_export_dir).glob("*.parquet")]
assert expected_files == output_files
# check that symlinked files exist
symlinked_dir = output_base / "latest" / "omop" / "public"
symlinked_files = list(symlinked_dir.glob("*.parquet"))
assert expected_files == [x.stem for x in symlinked_files]
assert symlinked_dir.is_symlink()


def test_second_export(omop_files, resources):
"""
Given one export already exists for the project
When a second export with a different timestamp is run for the same project
Then there should be two export directories in the all_extracts dir,
and the symlinked dir should point to the most recently copied dir
"""
# ARRANGE
input_dir = resources / "omop"
project_name = "Really great cool project"
first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files.copy_to_exports(input_dir, project_name, first_export_datetime)
second_export_datetime = datetime.datetime.fromisoformat("2020-07-10T18:00:00")

# ACT
omop_files.copy_to_exports(input_dir, project_name, second_export_datetime)

# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"
specific_export_dir = (
output_base / "all_extracts" / "omop" / "2020-07-10t18-00-00" / "public"
)
assert specific_export_dir.exists()
# check that symlinked files are the most recent export
symlinked_dir = output_base / "latest" / "omop" / "public"
assert symlinked_dir.readlink() == specific_export_dir
previous_export_dir = (
output_base / "all_extracts" / "omop" / "2020-06-10t18-00-00" / "public"
)
assert symlinked_dir.readlink() != previous_export_dir
assert previous_export_dir.exists()


def test_project_with_no_public(omop_files, resources):
"""
Given an export directory which has no "public" subdirectory
When copy to exports is run
Then an assertion error will be raised
"""
input_dir = resources
project_name = "Really great cool project"
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
with pytest.raises(FileNotFoundError) as error_info:
omop_files.copy_to_exports(input_dir, project_name, input_date)

assert error_info.match("Could not find public")
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ volumes:
orthanc-anon-data:
orthanc-raw-data:
postgres-data:
exports:

networks:
pixl-net:
Expand Down Expand Up @@ -246,6 +247,8 @@ services:
retries: 5
networks:
- pixl-net
volumes:
- ${PWD}/exports:/run/exports

pacs-api:
build:
Expand Down
29 changes: 29 additions & 0 deletions pixl_core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,32 @@ The client of choice for RabbitMQ at this point in time is [pika](https://pika.r
asynchronous way of transferring messages. The former is geared towards high data throughput whereas the latter is geared towards stability.
The asynchronous mode of transferring messages is a lot more complex as it is based on the
[asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html).


### OMOP ES files

Public parquet exports from OMOP ES that should be transferred outside the hospital are copied to the `exports` directory at the repository base.

Within this directory each project has a directory, with all extracts run stored in `all_extracts` and the `latest` directory
contains a symlink to the most extract. This symlinking means that during the export stage it is clear which export should be sent.
stefpiatek marked this conversation as resolved.
Show resolved Hide resolved

```
└── project-1
├── all_extracts
│ └── omop
│ ├── 2020-06-10t18-00-00
│ │ └── public
│ └── 2020-07-10t18-00-00
│ └── public
└── latest
└── omop
└── public -> ../../../ all_extracts / omop / 2020-07-10t18-00-00 / public
└── project-2
├── all_extracts
│ └── omop
│ └── 2023-12-13t16-22-40
│ └── public
└── latest
└── omop
└── public -> ../../../ all_extracts / omop / 2023-12-13t16-22-40 / public
```
1 change: 1 addition & 0 deletions pixl_core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"fastapi==0.103.2",
"token-bucket==0.3.0",
"python-decouple==3.6",
"python-slugify==8.0.1",
"pika==1.3.1",
"aio_pika==8.2.4",
"environs==9.5.0",
Expand Down
72 changes: 72 additions & 0 deletions pixl_core/src/core/omop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Processing of OMOP parquet files."""
import datetime
import pathlib
import shutil

import slugify

root_from_install = pathlib.Path(__file__).parents[3]


class OmopExtract:
"""Processing Omop extracts on the filesystem."""

def __init__(self, root_dir: pathlib.Path = root_from_install) -> None:
"""Create instance of OMOP file helper."""
stefpiatek marked this conversation as resolved.
Show resolved Hide resolved
self.export_dir = root_dir / "exports"

@staticmethod
def _get_slugs(
project_name: str, extract_datetime: datetime.datetime
) -> tuple[str, str]:
"""Convert project name and datetime to slugs for writing to filesystem."""
project_slug = slugify.slugify(project_name)
extract_time_slug = slugify.slugify(extract_datetime.isoformat())
return project_slug, extract_time_slug

def copy_to_exports(
self,
omop_dir: pathlib.Path,
project_name: str,
extract_datetime: datetime.datetime,
) -> str:
"""
Copy public omop directory as the latest extract for the project.

Creates directories if they don't already exist.
:param omop_dir: parent path for omop export, with a "public" subdirectory
:param project_name: name of the project
:param extract_datetime: datetime that the OMOP ES extract was run
:raises FileNotFoundError: if there is no public subdirectory in `omop_dir`
:returns str: the project slug, so this can be registered for export to the DSH
"""
public_input = omop_dir / "public"
if not public_input.exists():
msg = f"Could not find public directory in input {omop_dir}"
raise FileNotFoundError(msg)

# Make directory for exports if they don't exist
project_slug, extract_time_slug = self._get_slugs(
project_name, extract_datetime
)
export_base = self.export_dir / project_slug
public_output = OmopExtract._mkdir(
export_base / "all_extracts" / "omop" / extract_time_slug / "public"
)

# Copy extract files, overwriting if it exists
shutil.copytree(public_input, public_output, dirs_exist_ok=True)
# Make the latest export dir if it doesn't exist
latest_parent_dir = self._mkdir(export_base / "latest" / "omop")
# Symlink this extract to the latest directory
latest_public = latest_parent_dir / "public"
if latest_public.exists():
latest_public.unlink()

latest_public.symlink_to(public_output, target_is_directory=True)
return project_slug

@staticmethod
def _mkdir(directory: pathlib.Path) -> pathlib.Path:
directory.mkdir(parents=True, exist_ok=True)
return directory
Loading