diff --git a/.github/workflows/PR-validation.yml b/.github/workflows/PR-validation.yml new file mode 100644 index 00000000..c39ae7d2 --- /dev/null +++ b/.github/workflows/PR-validation.yml @@ -0,0 +1,66 @@ +name: PR validation +on: + pull_request: + types: [synchronize, opened, reopened, edited] + branches: + - main +jobs: + pipeline-seq-retrieval-container-image-build: + name: pipeline/seq_retrieval container-image build + runs-on: ubuntu-22.04 + defaults: + run: + shell: bash + working-directory: ./pipeline/seq_retrieval/ + steps: + - name: Check out repository code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + sparse-checkout: | + pipeline/seq_retrieval/ + - name: Build container image + run: | + make container-image + pipeline-seq-retrieval-python-typing-check: + name: pipeline/seq_retrieval python typing check + runs-on: ubuntu-22.04 + defaults: + run: + shell: bash + working-directory: ./pipeline/seq_retrieval/ + steps: + - name: Check out repository code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + sparse-checkout: | + pipeline/seq_retrieval/ + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Check python typing + run: | + make check-python-typing + pipeline-seq-retrieval-python-style-check: + name: pipeline/seq_retrieval python style check + runs-on: ubuntu-22.04 + defaults: + run: + shell: bash + working-directory: ./pipeline/seq_retrieval/ + steps: + - name: Check out repository code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + sparse-checkout: | + pipeline/seq_retrieval/ + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Check python style + run: | + make check-python-style + #TODO: add unit testing + #TODO: add integration testing diff --git a/.gitignore b/.gitignore index 68bc17f9..af90381a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,13 @@ -# Byte-compiled / optimized / DLL files +# Python files +## Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] 
*$py.class -# C extensions +## C extensions *.so -# Distribution / packaging +## Distribution / packaging .Python build/ develop-eggs/ @@ -26,17 +27,17 @@ share/python-wheels/ *.egg MANIFEST -# PyInstaller +## PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec -# Installer logs +## package installer logs pip-log.txt pip-delete-this-directory.txt -# Unit test / coverage reports +## Unit test / coverage reports htmlcov/ .tox/ .nox/ @@ -51,57 +52,57 @@ coverage.xml .pytest_cache/ cover/ -# Translations +## Translations *.mo *.pot -# Django stuff: +## Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal -# Flask stuff: +## Flask stuff: instance/ .webassets-cache -# Scrapy stuff: +## Scrapy stuff: .scrapy -# Sphinx documentation +## Sphinx documentation docs/_build/ -# PyBuilder +## PyBuilder .pybuilder/ target/ -# Jupyter Notebook +## Jupyter Notebook .ipynb_checkpoints -# IPython +## IPython profile_default/ ipython_config.py -# pyenv +## pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: # .python-version -# pipenv +## pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. #Pipfile.lock -# poetry +## poetry # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. 
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control #poetry.lock -# pdm +## pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it @@ -109,17 +110,17 @@ ipython_config.py # https://pdm.fming.dev/#use-with-ide .pdm.toml -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +## PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ -# Celery stuff +## Celery stuff celerybeat-schedule celerybeat.pid -# SageMath parsed files +## SageMath parsed files *.sage.py -# Environments +## Environments .env .venv env/ @@ -128,33 +129,36 @@ ENV/ env.bak/ venv.bak/ -# Spyder project settings +## Spyder project settings .spyderproject .spyproject -# Rope project settings +## Rope project settings .ropeproject -# mkdocs documentation +## mkdocs documentation /site -# mypy +## mypy .mypy_cache/ .dmypy.json dmypy.json -# Pyre type checker +## Pyre type checker .pyre/ -# pytype static type analyzer +## pytype static type analyzer .pytype/ -# Cython debug symbols +## Cython debug symbols cython_debug/ -# PyCharm +## PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
#.idea/ + +# VS Code +.vscode/ diff --git a/pipeline/seq_retrieval/.flake8 b/pipeline/seq_retrieval/.flake8 new file mode 100644 index 00000000..02b9d28d --- /dev/null +++ b/pipeline/seq_retrieval/.flake8 @@ -0,0 +1,6 @@ +[flake8] +ignore = E203, E266, E501, F401, W503 +max-complexity = 18 +select = B,C,E,F,W,T4 +exclude = venv +per-file-ignores = __init__.py:F401 diff --git a/pipeline/seq_retrieval/Dockerfile b/pipeline/seq_retrieval/Dockerfile new file mode 100644 index 00000000..5043e735 --- /dev/null +++ b/pipeline/seq_retrieval/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.12-alpine + +WORKDIR /usr/src/app + +RUN apk add --no-cache build-base zlib-dev bzip2-dev xz-dev + +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ ./ + +ENTRYPOINT [ "python", "main.py"] +CMD [ "--help" ] diff --git a/pipeline/seq_retrieval/Makefile b/pipeline/seq_retrieval/Makefile new file mode 100644 index 00000000..6f2acd17 --- /dev/null +++ b/pipeline/seq_retrieval/Makefile @@ -0,0 +1,34 @@ +.PHONY: check-venv-active check-python-typing + +container-image: + docker build -t agr_pavi/seq_retrieval . +python-dependencies: + pip install -r requirements.txt + +python-dependencies-update: + pip install -U -r requirements.txt + +python-test-dependencies: + pip install -r tests/requirements.txt + +check-python-typing: python-dependencies python-test-dependencies + mypy --install-types --non-interactive src/main.py + +check-python-style: python-dependencies python-test-dependencies + flake8 ./ + +check-venv-active: +ifeq ($(VIRTUAL_ENV),) + @echo 'No active python virtual environment found.'\ + 'Please activate the virtual environment first by running `source venv/bin/activate`,'\ + 'or read README.md for instructions on how to set up a new one.'
+ @exit 1 +else + @: +endif + +python-dependencies-dev: check-venv-active python-dependencies + +python-dependencies-dev-update: check-venv-active python-dependencies-update + +check-python-typing-dev: check-venv-active check-python-typing diff --git a/pipeline/seq_retrieval/README.md b/pipeline/seq_retrieval/README.md new file mode 100644 index 00000000..add19607 --- /dev/null +++ b/pipeline/seq_retrieval/README.md @@ -0,0 +1,124 @@ +# PAVI Sequence retrieval +This subdirectory contains all code and configs for the PAVI sequence retrieval component. + +## Development +### Local dev environment +In order to enable isolated local development that does not interfere with the global system python setup, +a virtual environment is used to do code development and testing for this component. + +To start developing on a new system, create a virtual environment using the following command +(having this directory as your working directory): +```bash +python3.12 -m venv ./venv +``` + +Then when developing this component, activate the virtual environment with: +```bash +source venv/bin/activate +``` +Once the virtual environment is activated, all packages will be installed in this isolated virtual environment. +To install all python package dependencies: +```bash +pip install -r requirements.txt +``` + +To upgrade all packages (already installed) to the latest available version matching the requirements: +```bash +pip install -U -r requirements.txt +``` + +Once ending development for this component, deactivate the virtual environment with: +```bash +deactivate +``` + +### Code guidelines +#### Input/output typing and checking +**TL;DR** +Type hints are required wherever possible and `mypy` is used to enforce this on PR validation.
+To check all code part of the seq_retrieval module in this folder (on local development), run: +```shell +make check-python-typing-dev +``` + +**Detailed explanation** +Although the Python interpreter always uses dynamic typing, it is recommended to use type hints wherever possible, +to ensure all functions and methods always receive the expected type as to not get caught by unexpected errors. + +As an example, the following untyped python function will define local cache usage behavior taking a boolean as toggle: +```python +def set_local_cache_reuse(reuse): + """ + Define whether or not data_file_mover will reuse files in local cache where already available pre-execution. + + Args: + reuse (bool): set to `True` to enable local cache reuse behavior (default `False`) + """ + global _reuse_local_cache + _reuse_local_cache = reuse + if reuse: + print("Local cache reuse enabled.") + else: + print("Local cache reuse disabled.") +``` + +When called with boolean values, this function works just fine: +```python +>>> set_local_cache_reuse(True) +Local cache reuse enabled. +>>> set_local_cache_reuse(False) +Local cache reuse disabled. +``` +However when calling with a String instead of a boolean, you may get unexpected behaviour: +```python +>>> set_local_cache_reuse("False") +Local cache reuse enabled. +``` +This happens because Python dynamically types and converts types at runtime, +and all strings except empty ones are converted to boolean value `True`. + +To prevent this, add type hints to your code: +```python +def set_local_cache_reuse(reuse: bool) -> None: + """ + Define whether or not data_file_mover will reuse files in local cache where already available pre-execution.
+ + Args: + reuse (bool): set to `True` to enable local cache reuse behavior (default `False`) + """ + global _reuse_local_cache + _reuse_local_cache = reuse + if reuse: + print("Local cache reuse enabled.") + else: + print("Local cache reuse disabled.") +set_local_cache_reuse("False") +``` + +Type hints themselves are not enforced at runtime, and will thus not stop the code from running (incorrectly), +but using `mypy` those errors can be revealed before merging this code. Storing the above code snippet in a file +called `set_local_cache_reuse.py` and running `mypy` on it gives the following result: +```shell +> mypy set_local_cache_reuse.py +set_local_cache_reuse.py:9: error: Name "_reuse_local_cache" is not defined [name-defined] +set_local_cache_reuse.py:14: error: Argument 1 to "set_local_cache_reuse" has incompatible type "str"; expected "bool" [arg-type] +Found 2 errors in 1 file (checked 1 source file) +``` +With the mypy output, we can now return to the code and fix the errors reported +which would otherwise result in silent unexpected behavior and bugs. + +To run `mypy` on the seq_retrieval module in this folder in your local development environment, +run the following shell command: +```shell +make check-python-typing-dev +``` + +`mypy` checks are automatically run and enforced as part of the PR validation +and all reported errors must be fixed to enable merging each PR into `main`. +If the `pipeline/seq_retrieval check python typing` status check fails on a PR in github, +click the details link and inspect the failing step output for hints on what to fix. + +#### Code documentation (docstrings) +All modules, functions, classes and methods should have their input, attributes and output documented +through docstrings to make the code easy to read and understand for anyone reading it.
+To ensure this is done in a uniform way across all code, follow the [Google Python Style Guide on docstrings](https://google.github.io/styleguide/pyguide.html#38-comments-and-docstrings). diff --git a/pipeline/seq_retrieval/requirements.txt b/pipeline/seq_retrieval/requirements.txt new file mode 100644 index 00000000..ae3751d8 --- /dev/null +++ b/pipeline/seq_retrieval/requirements.txt @@ -0,0 +1,4 @@ +biopython==1.83 +click==8.1.* +pysam==0.22.* +requests==2.31.* diff --git a/pipeline/seq_retrieval/src/data_mover/data_file_mover.py b/pipeline/seq_retrieval/src/data_mover/data_file_mover.py new file mode 100644 index 00000000..435c908d --- /dev/null +++ b/pipeline/seq_retrieval/src/data_mover/data_file_mover.py @@ -0,0 +1,167 @@ +""" +Module used to find, access and copy files at/to/from remote locations +""" +import os.path +from pathlib import Path +from typing import Dict, Optional +import requests +from urllib.parse import urlparse, unquote +
+_stored_files: Dict[str, str] = dict() +"""Module level memory cache of filepaths for all files accessed through this module.""" +
+_DEFAULT_DIR = '/tmp/pavi/' +"""Module level default destination directory to search/download remote files in/to.""" +
+_reuse_local_cache = False +""" +Module level toggle defining local file cache reuse behaviour. + * When True, reuse identically named files existing pre-runtime at local destination location for remote files + * When False, delete identically named files existing pre-runtime at local destination location for remote files before downloading. +
+Change the value through the `set_local_cache_reuse` function. +""" +
+
+def set_local_cache_reuse(reuse: bool) -> None: + """ + Define data_file_mover module-level behaviour on local file cache reuse. + + data_file_mover will + * reuse identically named files at local target location when available pre-runtime when set to `True`.
+ * remove identically named files at local target location when present pre-runtime when set to `False`. + + Args: + reuse (bool): set to `True` to enable local cache reuse behavior (default `False`) + """ + global _reuse_local_cache + _reuse_local_cache = reuse + + +def is_accessible_url(url: str) -> bool: + """ + Check whether provided `url` is an accessible (remote) URL + + Args: + url: URL to be checked for accessability + + Returns: + `True` when provided `url` is an accessible URL, `False` otherwise + """ + response = requests.head(url) + if response.ok: + return True + else: + return False + + +def fetch_file(url: str, dest_dir: str = _DEFAULT_DIR, reuse_local_cache: Optional[bool] = None) -> str: + """ + Fetch file from URL and return its local path. + + Parses `url` to determine scheme and sends it to the appropriate data_file_mover function for retrieval. + Result is cached in the data_file_mover module's `_stored_files` memory cache, which is used to speed up + repeated retrieval. + + Args: + url: URL to fetch local file for + dest_dir: Destination directory to search/download remote files in/to. + reuse_local_cache: Argument to override local cache reuse behavior defined at data_file_mover level. + + Returns: + Absolute path to local file matching the requested URL (string). + """ + global _stored_files + local_path = None + if url not in _stored_files.keys(): + url_components = urlparse(url) + if url_components.scheme == 'file': + filepath = url_components.netloc + url_components.path + local_path = find_local_file(filepath) + else: + local_path = download_from_url(url, dest_dir, reuse_local_cache=reuse_local_cache) + _stored_files[url] = local_path + else: + local_path = _stored_files[url] + + return local_path + + +def find_local_file(path: str) -> str: + """ + Find a file locally based on path and return its absolute path. + + Args: + path: path to local file to find, can be relative. + + Returns: + Absolute path to file found at `path` (string). 
+ + Raises: + `FileNotFoundError`: If `path` is not found, or is not a file. + """ + if not os.path.exists(path): + raise FileNotFoundError(f"No file found at path '{path}'.") + else: + if not os.path.isfile(path): + raise FileNotFoundError(f"Specified path '{path}' exists but is not a file.") + else: + return str(Path(path).resolve()) + + +def download_from_url(url: str, dest_dir: str = _DEFAULT_DIR, chunk_size: int = 10 * 1024, reuse_local_cache: Optional[bool] = None) -> str: + """ + Download file from remote URL and return its absolute local path. + + Parses `url` to determine scheme and downloads the remote file to local filesystem as appropriate. + When local cache reuse is enabled, does not download but returns identically named files at `dest_dir` location when found. + When local cache reuse is disabled, removes identically named files at `dest_dir` location when found before initiating download. + + Args: + url: URL to remote file to download + dest_dir: Destination directory to search/download remote file in/to. + chunk_size: Chunk size use while downloading + reuse_local_cache: Argument to override local cache reuse behavior defined at data_file_mover level. + + Returns: + Absolute path to the found/downloaded file (string). + + Raises: + `ValueError`: if `url` scheme is not supported or `url` is not accessible. 
+ """ + + if reuse_local_cache is None: + reuse_local_cache = _reuse_local_cache + + url_components = urlparse(url) + if url_components.scheme in ['http', 'https']: + + if not is_accessible_url(url): + raise ValueError(f"URL {url} is not accessible.") + + Path(dest_dir).mkdir(parents=True, exist_ok=True) + + filename = unquote(os.path.basename(url_components.path)) + local_file_path = os.path.join(dest_dir, filename) + + if os.path.exists(local_file_path) and os.path.isfile(local_file_path): + if reuse_local_cache is True: + # Return the local file path without downloading new content + return str(Path(local_file_path).resolve()) + else: + os.remove(local_file_path) + + # Download file through streaming to support large files + tmp_file_path = f"{local_file_path}.part" + response = requests.get(url, stream=True) + + with open(tmp_file_path, mode="wb") as local_file: + for chunk in response.iter_content(chunk_size=chunk_size): + local_file.write(chunk) + + os.rename(tmp_file_path, local_file_path) + + return str(Path(local_file_path).resolve()) + else: + # Currently not supported + raise ValueError(f"URL with scheme '{url_components.scheme}' is currently not supported.") diff --git a/pipeline/seq_retrieval/src/main.py b/pipeline/seq_retrieval/src/main.py new file mode 100644 index 00000000..0e4b0e62 --- /dev/null +++ b/pipeline/seq_retrieval/src/main.py @@ -0,0 +1,114 @@ +""" +Main module serving the CLI for PAVI sequence retrieval. + +Retrieves multiple sequence regions and returns them as one chained sequence. +""" +import click +from typing import Any, Dict, List, Literal +import json + +import data_mover.data_file_mover as data_file_mover +from seq_region import SeqRegion, chain_seq_region_seqs + + +def validate_strand_param(ctx, param, value: str) -> Literal['+', '-']: + """ + Processes and normalises the value of click input argument `strand`. 
+ + Returns: + A normalised version of strings representing a strand: '-' or '+' + + Raises: + click.BadParameter: If an unrecognised string was provided. + """ + POS_CHOICES = ['+', '+1', 'pos'] + NEG_CHOICES = ['-', '-1', 'neg'] + if value in POS_CHOICES: + return '+' + elif value in NEG_CHOICES: + return '-' + else: + raise click.BadParameter(f"Must be one of {POS_CHOICES} for positive strand, or {NEG_CHOICES} for negative strand.") + + +def process_seq_regions_param(ctx, param, value: str) -> List[Dict[str, Any]]: + """ + Parse the value of click input parameter seq_regions and validate it's structure. + + Value is expected to be a JSON-formatted list of sequence regions to retrieve. + Each region should have: + * a 'start' property indicating the region start (1-based, inclusive) + * a 'end' property indicating the region end (1-based, inclusive) + + Returns: + List of dicts representing start and end of seq region + + Raises: + click.BadParameter: If value could not be parsed as JSON or had an invalid structure or values. + """ + seq_regions = None + try: + seq_regions = json.loads(value) + except Exception: + raise click.BadParameter("Must be a valid JSON-formatted string.") + else: + if not isinstance(seq_regions, list): + raise click.BadParameter("Must be a valid list (JSON-array) of sequence regions to retrieve.") + for region in seq_regions: + if not isinstance(region, dict): + raise click.BadParameter(f"Region {region} is not a valid dict. All regions in seq_regions list must be valid dicts (JSON-objects).") + if 'start' not in region.keys(): + raise click.BadParameter(f"Region {region} does not have a 'start' property, which is a required property.") + if 'end' not in region.keys(): + raise click.BadParameter(f"Region {region} does not have a 'end' property, which is a required property.") + if not isinstance(region['start'], int): + raise click.BadParameter(f"'start' property of region {region} is not an integer. 
All positions must be integers.") + if not isinstance(region['end'], int): + raise click.BadParameter(f"'end' property of region {region} is not an integer. All positions must be integers.") + + return seq_regions + + +@click.command() +@click.option("--seq_id", type=click.STRING, required=True, + help="The sequence ID to retrieve sequences for.") +@click.option("--seq_strand", type=click.STRING, default='+', callback=validate_strand_param, + help="The sequence strand to retrieve sequences for.") +@click.option("--seq_regions", type=click.UNPROCESSED, required=True, callback=process_seq_regions_param, + help="A list of sequence regions to retrieve sequences for.") +@click.option("--fasta_file_url", type=click.STRING, required=True, + help="""URL to (faidx-indexed) fasta file to retrieve sequences from.\ + Assumes additional index files can be found at `.fai`, + and at `.gzi` if the fastafile is compressed. + Use "file://*" URL for local file or "http(s)://*" for remote files.""") +@click.option("--reuse_local_cache", is_flag=True, + help="""When defined and using remote `fasta_file_url`, reused local files + if file already exists at destination path, rather than re-downloading and overwritting.""") +def main(seq_id: str, seq_strand: str, seq_regions: List, fasta_file_url: str, reuse_local_cache: bool): + """ + Main method for sequence retrieval from JBrowse faidx indexed fasta files. Receives input args from click. + + Prints a single (transcript) sequence obtained by concatenating the sequence of + all sequence regions requested (in positional order defined by specified seq_strand). 
+ """ + + click.echo(f"Received request to retrieve sequences for {seq_id}, strand {seq_strand}, seq_regions {seq_regions}!") + + data_file_mover.set_local_cache_reuse(reuse_local_cache) + + seq_region_objs = [] + for region in seq_regions: + seq_region_objs.append(SeqRegion(seq_id=seq_id, start=region['start'], end=region['end'], strand=seq_strand, + fasta_file_url=fasta_file_url)) + + for seq_region in seq_region_objs: + # Retrieve sequence for region + seq_region.fetch_seq() + + # Concatenate all regions into single sequence + seq_concat = chain_seq_region_seqs(seq_region_objs, seq_strand) + click.echo(f"\nSeq concat: {seq_concat}") + + +if __name__ == '__main__': + main() diff --git a/pipeline/seq_retrieval/src/seq_region/__init__.py b/pipeline/seq_retrieval/src/seq_region/__init__.py new file mode 100644 index 00000000..8a4ddd88 --- /dev/null +++ b/pipeline/seq_retrieval/src/seq_region/__init__.py @@ -0,0 +1,2 @@ +from .seq_region import SeqRegion +from .seq_region import chain_seq_region_seqs diff --git a/pipeline/seq_retrieval/src/seq_region/seq_region.py b/pipeline/seq_retrieval/src/seq_region/seq_region.py new file mode 100644 index 00000000..c8d777a6 --- /dev/null +++ b/pipeline/seq_retrieval/src/seq_region/seq_region.py @@ -0,0 +1,161 @@ +""" +Module containing the SeqRegion class and all functions to handle SeqRegion entities. +""" +from typing import Any, Dict, List, Optional + +from Bio import Seq # Bio.Seq biopython submodule +import pysam + +from data_mover import data_file_mover + + +class SeqRegion(): + """ + Defines a DNA sequence region. + """ + + seq_id: str + """The sequence identifier found in the fasta file on which the sequence region is located""" + + start: int + """The start position of the sequence region (1-based, inclusive). Asserted to be `start` < `end`.""" + + end: int + """The end position of the sequence region (1-base, inclusive). 
Asserted to be `start` < `end`.""" + + strand: str + """The (genomic) strand of the sequence region""" + + fasta_file_path: str + """Absolute path to (faidx indexed) FASTA file containing the sequences""" + + sequence: Optional[str] + """the DNA sequence of a sequence region""" + + def __init__(self, seq_id: str, start: int, end: int, strand: str, fasta_file_url: str, seq: Optional[str] = None): + """ + Initializes a SeqRegion instance + + Args: + seq_id: The sequence identifier found in the fasta file on which the sequence region is located + start: The start position of the sequence region (1-based, inclusive).\ + If negative strand, `start` and `end` are swapped if `end` < `start`. + end: The end position of the sequence region (1-based, inclusive).\ + If negative strand, `start` and `end` are swapped if `end` < `start`. + strand: the (genomic) strand of the sequence region + fasta_file_url: Path to local faidx-indexed FASTA file containing the sequences to retrieve (regions of).\ + Faidx-index files `fasta_file_url`.fai and `fasta_file_url`.gzi for compressed fasta file must be accessible URLs.
+ seq: optional DNA sequence of the sequence region + + Raises: + ValueError: if value of `end` < `start` and `strand` is '+' + """ + self.seq_id = seq_id + self.strand = strand + + # If strand is -, ensure start <= end (swap as required) + if strand == '-': + if end < start: + self.start = end + self.end = start + else: + self.start = start + self.end = end + # If strand is +, throw error when end < start (likely user error) + else: + if end < start: + raise ValueError(f"Unexpected position order: end {end} < start {start}.") + else: + self.start = start + self.end = end + + # Fetch the file(s) + local_fasta_file_path = data_file_mover.fetch_file(fasta_file_url) + + # Fetch additional faidx index files in addition to fasta file itself + # (to the same location) + index_files = [fasta_file_url + '.fai'] + if fasta_file_url.endswith('.gz'): + index_files.append(fasta_file_url + '.gzi') + + for index_file in index_files: + data_file_mover.fetch_file(index_file) + + self.fasta_file_path = local_fasta_file_path + + if seq is not None: + self.sequence = seq + + def fetch_seq(self) -> None: + """ + Fetch sequence found at `seq_id`:`start`-`end`:`strand` + by reading from faidx files at `fasta_file_path`. + + Stores resulting sequence in `sequence` attribute. + """ + try: + fasta_file = pysam.FastaFile(self.fasta_file_path) + except ValueError: + raise FileNotFoundError(f"Missing index file matching path {self.fasta_file_path}.") + except IOError: + raise IOError(f"Error while reading fasta file or index matching path {self.fasta_file_path}.") + else: + seq = fasta_file.fetch(reference=self.seq_id, start=(self.start - 1), end=self.end) + fasta_file.close() + + if self.strand == '-': + seq = Seq.reverse_complement(seq) + + self.set_sequence(seq) + + def set_sequence(self, sequence: str) -> None: + """ + Set the `sequence` attribute. + + Asserts the length of `sequence` matches the expected sequence length for this region. 
+ + Args: + sequence: DNA sequence (string) + + Raises: + ValueError: If the length of `sequence` provided does not match the region length + """ + + seq_len = len(sequence) + expected_len = self.end - self.start + 1 + if seq_len != expected_len: + raise ValueError(f"Sequence length {seq_len} does not equal length expected on region positions {expected_len}.") + else: + self.sequence = sequence + + def get_sequence(self) -> str: + """Return `sequence` attribute as a string (the string "None" if not set).""" + return str(self.sequence) + + +def chain_seq_region_seqs(seq_regions: List[SeqRegion], seq_strand: str) -> str: + """ + Chain multiple SeqRegions' sequences together into one continuous sequence. + + SeqRegions are chained together in an order based on the `start` attribute of each: + * Ascending order when `seq_strand` is positive strand + * Descending order when `seq_strand` is negative strand + + Args: + seq_regions: list of SeqRegion objects to chain together + seq_strand: sequence strand which defines the chaining order + + Returns: + String representing the chained sequence of all input SeqRegions + """ + + sort_args: Dict[str, Any] = dict(key=lambda region: region.start, reverse=False) + + if seq_strand == '-': + sort_args['reverse'] = True + + sorted_regions = seq_regions + sorted_regions.sort(**sort_args) + chained_seq = ''.join(map(lambda region : region.get_sequence(), sorted_regions)) + + return chained_seq diff --git a/pipeline/seq_retrieval/tests/requirements.txt b/pipeline/seq_retrieval/tests/requirements.txt new file mode 100644 index 00000000..d6d09df3 --- /dev/null +++ b/pipeline/seq_retrieval/tests/requirements.txt @@ -0,0 +1,2 @@ +flake8==7.0.* +mypy==1.9.*