diff --git a/.github/workflows/run-pytest.yml b/.github/workflows/run-pytest.yml
index 58da7fe6..5e5b688a 100644
--- a/.github/workflows/run-pytest.yml
+++ b/.github/workflows/run-pytest.yml
@@ -12,22 +12,25 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        python-version: ["3.9", "3.11"]
+        python-version: ["3.9", "3.12"]
         os: [ubuntu-latest]
 
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
 
+      - name: Install uv
+        run: pip install uv
+
       - name: Install dev dependencies
-        run: if [ -f requirements/requirements-dev.txt ]; then pip install -r requirements/requirements-dev.txt; fi
+        run: if [ -f requirements/requirements-dev.txt ]; then uv pip install -r requirements/requirements-dev.txt --system; fi
 
       - name: Install package
-        run: python -m pip install .
+        run: uv pip install . --system
 
 #      - name: Run pytest tests
 #        run: pytest tests -x -vv
diff --git a/MANIFEST.in b/MANIFEST.in
index 91bf8afb..2215a408 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -14,3 +14,4 @@ include bedboss/tokens/*
 include bedboss/tokens/*
 include bedboss/bbuploader/*
 include bedboss/refgenome_validator/chrom_sizes/*
+include bedboss/scripts/*
diff --git a/bedboss/bbuploader/cli.py b/bedboss/bbuploader/cli.py
index 55c6ac2c..cb911699 100644
--- a/bedboss/bbuploader/cli.py
+++ b/bedboss/bbuploader/cli.py
@@ -1,4 +1,5 @@
 import typer
+
 from bedboss._version import __version__
 
 app_bbuploader = typer.Typer(
@@ -21,7 +22,7 @@ def upload_all(
         None, help="The latest date when opep was updated [Default: today's date]"
     ),
     search_limit: int = typer.Option(
-        10, help="Limit of projects to be searched. [Default: 10]"
+        50, help="Limit of projects to be searched. [Default: 50]"
     ),
     search_offset: int = typer.Option(
         0, help="Limit of projects to be searched. [Default: 0]"
     ),
@@ -33,10 +34,19 @@ def upload_all(
         None,
         help="Reference genome [Default: None] (e.g. hg38) - if None, all genomes will be processed",
     ),
+    preload: bool = typer.Option(
+        True, help="Download bedfile before caching it. [Default: True]"
+    ),
     create_bedset: bool = typer.Option(
         True, help="Create bedset from bed files. [Default: True]"
     ),
-    rerun: bool = typer.Option(True, help="Re-run all the samples. [Default: False]"),
+    overwrite: bool = typer.Option(
+        False, help="Overwrite existing bedfiles. [Default: False]"
+    ),
+    overwrite_bedset: bool = typer.Option(
+        True, help="Overwrite existing bedset. [Default: True]"
+    ),
+    rerun: bool = typer.Option(False, help="Re-run all the samples. [Default: False]"),
     run_skipped: bool = typer.Option(
         True, help="Run skipped projects. [Default: False]"
     ),
@@ -44,6 +54,16 @@ def upload_all(
     standardize_pep: bool = typer.Option(
         False, help="Standardize pep with BEDMESS. [Default: False]"
     ),
+    use_skipper: bool = typer.Option(
+        False,
+        help="Use skipper to skip projects if they were processed locally [Default: False]",
+    ),
+    reinit_skipper: bool = typer.Option(
+        False, help="Reinitialize skipper. [Default: False]"
+    ),
+    lite: bool = typer.Option(
+        False, help="Run the pipeline in lite mode. [Default: False]"
[Default: False]" + ), ): from .main import upload_all as upload_all_function @@ -57,10 +77,16 @@ def upload_all( download_limit=download_limit, genome=genome, create_bedset=create_bedset, + preload=preload, rerun=rerun, run_skipped=run_skipped, run_failed=run_failed, standardize_pep=standardize_pep, + use_skipper=use_skipper, + reinit_skipper=reinit_skipper, + overwrite=overwrite, + overwrite_bedset=overwrite_bedset, + lite=lite, ) @@ -78,14 +104,33 @@ def upload_gse( None, help=" reference genome to upload to database. If None, all genomes will be processed", ), + preload: bool = typer.Option( + True, help="Download bedfile before caching it. [Default: True]" + ), rerun: bool = typer.Option(True, help="Re-run all the samples. [Default: False]"), run_skipped: bool = typer.Option( True, help="Run skipped projects. [Default: False]" ), run_failed: bool = typer.Option(True, help="Run failed projects. [Default: False]"), + overwrite: bool = typer.Option( + False, help="Overwrite existing bedfiles. [Default: False]" + ), + overwrite_bedset: bool = typer.Option( + True, help="Overwrite existing bedset. [Default: False]" + ), standardize_pep: bool = typer.Option( False, help="Standardize pep with BEDMESS. [Default: False]" ), + use_skipper: bool = typer.Option( + False, + help="Use local skipper to skip projects if they were processed locally [Default: False]", + ), + reinit_skipper: bool = typer.Option( + False, help="Reinitialize skipper. [Default: False]" + ), + lite: bool = typer.Option( + False, help="Run the pipeline in lite mode. [Default: False]" + ), ): from .main import upload_gse as upload_gse_function @@ -95,10 +140,16 @@ def upload_gse( gse=gse, create_bedset=create_bedset, genome=genome, + preload=preload, rerun=rerun, run_skipped=run_skipped, run_failed=run_failed, standardize_pep=standardize_pep, + use_skipper=use_skipper, + reinit_skipper=reinit_skipper, + overwrite=overwrite, + overwrite_bedset=overwrite_bedset, + lite=lite, ) diff --git a/bedboss/bbuploader/constants.py b/bedboss/bbuploader/constants.py index 0f33a052..e7a7f910 100644 --- a/bedboss/bbuploader/constants.py +++ b/bedboss/bbuploader/constants.py @@ -1,6 +1,6 @@ PKG_NAME = "bbuploader" -FILE_FOLDER_NAME = "files" +FILE_FOLDER_NAME = "geo_files" DEFAULT_GEO_TAG = "samples" diff --git a/bedboss/bbuploader/main.py b/bedboss/bbuploader/main.py index 201c39a9..21d224a7 100644 --- a/bedboss/bbuploader/main.py +++ b/bedboss/bbuploader/main.py @@ -5,27 +5,37 @@ import peppy from bbconf import BedBaseAgent from bbconf.db_utils import GeoGseStatus, GeoGsmStatus +from geniml.exceptions import GenimlBaseError from pephubclient import PEPHubClient from pephubclient.helpers import MessageHandler from pephubclient.models import SearchReturnModel from sqlalchemy import and_, select from sqlalchemy.orm import Session -from bedboss.bbuploader.constants import DEFAULT_GEO_TAG, PKG_NAME, STATUS +from bedboss.bbuploader.constants import ( + DEFAULT_GEO_TAG, + FILE_FOLDER_NAME, + PKG_NAME, + STATUS, +) from bedboss.bbuploader.models import ( BedBossMetadata, BedBossRequired, ProjectProcessingStatus, ) +from bedboss.bbuploader.utils import create_gsm_sub_name from bedboss.bedboss import run_all from bedboss.bedbuncher.bedbuncher import run_bedbuncher from bedboss.exceptions import BedBossException -from bedboss.utils import standardize_genome_name, standardize_pep as pep_standardizer +from bedboss.skipper import Skipper +from bedboss.utils import calculate_time, download_file, standardize_genome_name +from bedboss.utils import 
 
 _LOGGER = logging.getLogger(PKG_NAME)
 _LOGGER.setLevel(logging.DEBUG)
 
 
+@calculate_time
 def upload_all(
     bedbase_config: str,
     outfolder: str = os.getcwd(),
@@ -36,10 +46,16 @@ def upload_all(
     download_limit: int = 100,
     genome: str = None,
     create_bedset: bool = True,
+    preload=True,
     rerun: bool = False,
     run_skipped: bool = False,
     run_failed: bool = True,
     standardize_pep: bool = False,
+    use_skipper=True,
+    reinit_skipper=False,
+    overwrite=False,
+    overwrite_bedset=False,
+    lite=False,
 ):
     """
     This is main function that is responsible for processing bed files from PEPHub.
@@ -53,16 +69,21 @@ def upload_all(
     :param download_limit: limit of GSE projects to be downloaded (used for testing purposes) [Default: 100]
     :param genome: reference genome [Default: None] (e.g. hg38) - if None, all genomes will be processed
     :param create_bedset: create bedset from bed files
-    :param rerun: rerun processing of the series
-    :param run_skipped: rerun files that were skipped
-    :param run_failed: rerun failed files
+    :param preload: pre-download files to the local folder (used for faster reproducibility)
+    :param rerun: rerun processing of the series. Used in the logging system. To re-upload a file, use overwrite
+    :param run_skipped: rerun files that were skipped. Used in the logging system. To re-upload a file, use overwrite
+    :param run_failed: rerun failed files. Used in the logging system. To re-upload a file, use overwrite
     :param standardize_pep: standardize pep metadata using BEDMS
+    :param use_skipper: use skipper to skip files already processed and logged locally. Skipper creates a local log of processed
+        and failed files.
+    :param reinit_skipper: reinitialize skipper; if set to True, skipper will be reinitialized and all log files will be cleaned
+    :param lite: lite mode, which skips statistics processing to save memory and time
     """
     phc = PEPHubClient()
     os.makedirs(outfolder, exist_ok=True)
 
-    bbagent = BedBaseAgent(config=bedbase_config)
+    bbagent = BedBaseAgent(config=bedbase_config, init_ml=not lite)
     genome = standardize_genome_name(genome)
 
     pep_annotation_list = find_peps(
@@ -130,13 +151,20 @@ def upload_all(
                     sa_session=session,
                     gse_status_sa_model=gse_status,
                     standardize_pep=standardize_pep,
-                    rerun=rerun,
+                    # rerun=rerun,
+                    use_skipper=use_skipper,
+                    reinit_skipper=reinit_skipper,
+                    preload=preload,
+                    overwrite=overwrite,
+                    overwrite_bedset=overwrite_bedset,
+                    lite=lite,
                 )
             except Exception as err:
                 _LOGGER.error(
                     f"Processing of '{gse_pep.name}' failed with error: {err}"
                 )
                 gse_status.status = STATUS.FAIL
+                gse_status.error = str(err)
                 session.commit()
                 continue
 
@@ -182,7 +210,7 @@ def process_pep_sample(
     return BedBossRequired(
         sample_name=bed_sample.sample_name,
         file_path=bed_sample.file_url,
-        ref_genome=standardize_genome_name(bed_sample.ref_genome),
+        ref_genome=bed_sample.ref_genome,
         type=file_type,
         narrowpeak=is_narrowpeak,
         pep=project_metadata,
@@ -245,16 +273,23 @@ def find_peps(
     )
 
 
+@calculate_time
 def upload_gse(
     gse: str,
     bedbase_config: Union[str, BedBaseAgent],
     outfolder: str = os.getcwd(),
     create_bedset: bool = True,
     genome: str = None,
+    preload: bool = True,
     rerun: bool = False,
     run_skipped: bool = False,
     run_failed: bool = True,
     standardize_pep: bool = False,
+    use_skipper=True,
+    reinit_skipper=False,
+    overwrite=False,
+    overwrite_bedset=True,
+    lite=False,
 ):
     """
     Upload bed files from GEO series to BedBase
@@ -264,14 +299,21 @@ def upload_gse(
     :param outfolder: working directory, where files will be downloaded, processed and statistics will be saved
     :param create_bedset: create bedset from bed files
     :param genome: reference genome to upload to database. If None, all genomes will be processed
+    :param preload: pre-download files to the local folder (used for faster reproducibility)
     :param rerun: rerun processing of the series
     :param run_skipped: rerun files that were skipped
     :param run_failed: rerun failed files
     :param standardize_pep: standardize pep metadata using BEDMS
+    :param use_skipper: use skipper to skip files already processed and logged locally. Skipper creates a local log of processed
+        and failed files.
+    :param reinit_skipper: reinitialize skipper; if set to True, skipper will be reinitialized and all log files will be cleaned
+    :param overwrite: overwrite existing bedfiles
+    :param overwrite_bedset: overwrite existing bedset
+    :param lite: lite mode, which skips statistics processing to save memory and time
     :return: None
     """
-    bbagent = BedBaseAgent(config=bedbase_config)
+    bbagent = BedBaseAgent(config=bedbase_config, init_ml=not lite)
 
     with Session(bbagent.config.db_engine.engine) as session:
         _LOGGER.info(f"Processing: '{gse}'")
@@ -306,18 +348,24 @@ def upload_gse(
         try:
             upload_result = _upload_gse(
                 gse=gse,
-                bedbase_config=bedbase_config,
+                bedbase_config=bbagent,
                 outfolder=outfolder,
                 create_bedset=create_bedset,
                 genome=genome,
                 sa_session=session,
                 gse_status_sa_model=gse_status,
                 standardize_pep=standardize_pep,
-                rerun=rerun,
+                preload=preload,
+                overwrite=overwrite,
+                overwrite_bedset=overwrite_bedset,
+                use_skipper=use_skipper,
+                reinit_skipper=reinit_skipper,
+                lite=lite,
             )
         except Exception as e:
             _LOGGER.error(f"Processing of '{gse}' failed with error: {e}")
             gse_status.status = STATUS.FAIL
+            gse_status.error = str(e)
             session.commit()
             exit()
 
@@ -360,7 +408,12 @@ def _upload_gse(
     sa_session: Session = None,
     gse_status_sa_model: GeoGseStatus = None,
     standardize_pep: bool = False,
-    rerun: bool = False,
+    overwrite: bool = False,
+    overwrite_bedset: bool = False,
+    use_skipper: bool = True,
+    reinit_skipper: bool = False,
+    preload: bool = True,
+    lite=False,
 ) -> ProjectProcessingStatus:
     """
     Upload bed files from GEO series to BedBase
@@ -373,8 +426,13 @@ def _upload_gse(
     :param sa_session: opened session to the database
     :param gse_status_sa_model: sqlalchemy model for project status
     :param standardize_pep: standardize pep metadata using BEDMS
-    :param rerun: force overwrite data in the database
-
+    :param overwrite: overwrite existing bedfiles
+    :param overwrite_bedset: overwrite existing bedset
+    :param use_skipper: use skipper to skip files already processed and logged locally. Skipper creates a local log of processed
+        and failed files.
+    :param reinit_skipper: reinitialize skipper; if set to True, skipper will be reinitialized and all logs will be cleaned
+    :param preload: pre-download files to the local folder (used for faster reproducibility)
+    :param lite: lite mode, which skips statistics processing to save memory and time
     :return: None
     """
     if isinstance(bedbase_config, str):
@@ -394,9 +452,34 @@ def _upload_gse(
     uploaded_files = []
     gse_status_sa_model.number_of_files = len(project.samples)
     sa_session.commit()
-    for project_sample in project.samples:
+
+    total_sample_number = len(project.samples)
+
+    if use_skipper:
+        skipper_obj = Skipper(output_path=outfolder, name=gse)
+        if reinit_skipper:
+            skipper_obj.reinitialize()
+        _LOGGER.info(f"Skipper initialized for: '{gse}'")
+    else:
+        skipper_obj = None
+
+    for counter, project_sample in enumerate(project.samples):
+        _LOGGER.info(f">> Processing {counter+1} / {total_sample_number}")
         sample_gsm = project_sample.get("sample_geo_accession", "").lower()
 
+        # if int(project_sample.get("file_size") or 0) > 10000000:
+        #     _LOGGER.info(f"Skipping: '{sample_gsm}' - file size is too big")
+        #     project_status.number_of_skipped += 1
+        #     skipper_obj.add_failed(sample_gsm, f"File size is too big. {int(project_sample.get('file_size'))/1000000} MB")
+        #     continue
+
+        if skipper_obj:
+            is_processed = skipper_obj.is_processed(sample_gsm)
+            if is_processed:
+                _LOGGER.info(f"Skipping: '{sample_gsm}' - already processed")
+                uploaded_files.append(is_processed)
+                continue
+
         required_metadata = process_pep_sample(
             bed_sample=project_sample,
         )
@@ -419,6 +502,14 @@ def _upload_gse(
             )
             sa_session.add(sample_status)
             sa_session.commit()
+        else:
+            if sample_status.status == STATUS.SUCCESS and not overwrite:
+                _LOGGER.info(
+                    f"Skipping: '{required_metadata.sample_name}' - already processed"
+                )
+                uploaded_files.append(sample_status.bed_id)
+                project_status.number_of_processed += 1
+                continue
 
         sample_status.genome = required_metadata.ref_genome
         # to upload files only with a specific genome
@@ -440,10 +531,25 @@ def _upload_gse(
         sample_status.status = STATUS.PROCESSING
         sa_session.commit()
 
+        if preload:
+            gsm_folder = create_gsm_sub_name(sample_gsm)
+            files_path = os.path.join(outfolder, FILE_FOLDER_NAME, gsm_folder)
+            os.makedirs(files_path, exist_ok=True)
+            file_abs_path = os.path.abspath(
+                os.path.join(files_path, project_sample.file)
+            )
+            download_file(project_sample.file_url, file_abs_path, no_fail=True)
+        else:
+            file_abs_path = required_metadata.file_path
+
+        required_metadata.ref_genome = standardize_genome_name(
+            required_metadata.ref_genome, file_abs_path
+        )
+
         try:
             file_digest = run_all(
                 name=required_metadata.title,
-                input_file=required_metadata.file_path,
+                input_file=file_abs_path,
                 input_type=required_metadata.type,
                 outfolder=os.path.join(outfolder, "outputs"),
                 genome=required_metadata.ref_genome,
@@ -453,17 +559,34 @@ def _upload_gse(
                 upload_pephub=True,
                 upload_s3=True,
                 upload_qdrant=True,
-                force_overwrite=rerun,
+                force_overwrite=overwrite,
+                lite=lite,
             )
             uploaded_files.append(file_digest)
+            if skipper_obj:
+                skipper_obj.add_processed(sample_gsm, file_digest)
             sample_status.status = STATUS.SUCCESS
+            sample_status.bed_id = file_digest
             project_status.number_of_processed += 1
 
         except BedBossException as exc:
+            _LOGGER.error(f"Processing of '{sample_gsm}' failed with error: {str(exc)}")
             sample_status.status = STATUS.FAIL
             sample_status.error = str(exc)
             project_status.number_of_failed += 1
 
+            if skipper_obj:
+                skipper_obj.add_failed(sample_gsm, f"Error: {str(exc)}")
+
+        except GenimlBaseError as exc:
+            _LOGGER.error(f"Processing of '{sample_gsm}' failed with error: {str(exc)}")
+            sample_status.status = STATUS.FAIL
+            sample_status.error = str(exc)
+            project_status.number_of_failed += 1
+
+            if skipper_obj:
+                skipper_obj.add_failed(sample_gsm, f"Error: {str(exc)}")
+
         sa_session.commit()
 
     if create_bedset and uploaded_files:
@@ -475,11 +598,12 @@ def _upload_gse(
             output_folder=os.path.join(outfolder, "outputs"),
             name=gse,
             description=project.description,
-            heavy=True,
+            heavy=False,  # TODO: set to False because can't handle bedset > 10 files
             upload_pephub=True,
             upload_s3=True,
             no_fail=True,
-            force_overwrite=False,
+            force_overwrite=overwrite_bedset,
+            lite=lite,
         )
     else:
diff --git a/bedboss/bbuploader/utils.py b/bedboss/bbuploader/utils.py
index 6121b91d..35691bb1 100644
--- a/bedboss/bbuploader/utils.py
+++ b/bedboss/bbuploader/utils.py
@@ -7,7 +7,6 @@
 _LOGGER = logging.getLogger(PKG_NAME)
 
 
-# This function is not used in the code anymore
 def download_file(file_url: str, local_file_path: str, force: bool = False) -> None:
     """
     Download file using ftp url
@@ -22,3 +21,26 @@ def download_file(file_url: str, local_file_path: str, force: bool = False) -> N
         urllib.request.urlretrieve(file_url, local_file_path)
     else:
         _LOGGER.info(f"File {local_file_path} already exists. Skipping downloading.")
+
+
+def create_gsm_sub_name(name: str) -> str:
+    """
+    Create gsm subfolder name. e.g.
+        gsm123456 -> gsm123nnn
+        gsm123 -> gsmnnn
+        gsm1234 -> gsm1nnn
+        gsm1 -> gsmnnn
+
+    ! This function was copied from geopephub utils
+
+    :param name: gsm name
+    :return: gsm subfolder name
+    """
+
+    len_name = len(name)
+
+    if len_name <= 6:
+        return "gsmnnn"
+    else:
+        # return name[:6] + "n" * (len_name - 6)
+        return name[:-3] + "n" * 3
diff --git a/bedboss/bedboss.py b/bedboss/bedboss.py
index cfbd9bd8..e6222d1a 100644
--- a/bedboss/bedboss.py
+++ b/bedboss/bedboss.py
@@ -1,3 +1,4 @@
+import datetime
 import logging
 import os
 import subprocess
@@ -7,33 +8,32 @@
 import pephubclient
 import peppy
 import pypiper
+import yaml
 from bbconf.bbagent import BedBaseAgent
 from bbconf.const import DEFAULT_LICENSE
 from bbconf.models.base_models import FileModel
 from eido import validate_project
+from geniml.bbclient import BBClient
 from pephubclient.helpers import MessageHandler as m
 from pephubclient.helpers import is_registry_path
 
+from bedboss._version import __version__
 from bedboss.bedbuncher import run_bedbuncher
 from bedboss.bedmaker.bedmaker import make_all
 from bedboss.bedstat.bedstat import bedstat
 from bedboss.const import BEDBOSS_PEP_SCHEMA_PATH, PKG_NAME
+from bedboss.exceptions import BedBossException
 from bedboss.models import (
     BedClassificationUpload,
+    BedSetAnnotations,
     FilesUpload,
     PlotsUpload,
     StatsUpload,
 )
 from bedboss.refgenome_validator.main import ReferenceValidator
-
-from bedboss.utils import (
-    standardize_genome_name,
-    get_genome_digest,
-    standardize_pep as pep_standardizer,
-)
-from bedboss.exceptions import BedBossException
-from bedboss._version import __version__
-
+from bedboss.skipper import Skipper
+from bedboss.utils import calculate_time, get_genome_digest, standardize_genome_name
+from bedboss.utils import standardize_pep as pep_standardizer
 
 _LOGGER = logging.getLogger(PKG_NAME)
 
@@ -46,10 +46,14 @@ def requirements_check() -> None:
     """
     _LOGGER.info("Checking requirements...")
     subprocess.run(
-        ["bash", f"{os.path.dirname(os.path.abspath(__file__))}/requirements_test.sh"]
+        [
+            "bash",
+            f"{os.path.dirname(os.path.abspath(__file__))}/scripts/requirements_test.sh",
+        ]
     )
 
 
+@calculate_time
 def run_all(
     input_file: str,
     input_type: str,
@@ -68,9 +72,11 @@ def run_all(
     other_metadata: dict = None,
     just_db_commit: bool = False,
     force_overwrite: bool = False,
+    update: bool = False,
     upload_qdrant: bool = False,
     upload_s3: bool = False,
     upload_pephub: bool = False,
+    lite: bool = False,
     # Universes
     universe: bool = False,
     universe_method: str = None,
@@ -97,11 +103,13 @@ def run_all(
     :param dict other_metadata: a dict containing all attributes from the sample
     :param str ensdb: a full path to the ensdb gtf file required for genomes not in GDdata
         [optional] (basically genomes that's not in GDdata)
-    :param bool just_db_commit: whether just to commit the JSON to the database (default: False)
-    :param bool force_overwrite: force overwrite analysis (default: False)
-    :param bool upload_qdrant: whether to skip qdrant indexing
+    :param bool just_db_commit: whether just to commit the JSON to the database [Default: False]
+    :param bool force_overwrite: force overwrite analysis [Default: False]
+    :param bool update: whether to update the record in the database [Default: False] (if True, overwrites 'force_overwrite' and ignores it)
+    :param bool upload_qdrant: whether to run qdrant indexing [Default: False]
     :param bool upload_s3: whether to upload to s3
-    :param bool upload_pephub: whether to push bedfiles and metadata to pephub (default: False)
+    :param bool upload_pephub: whether to push bedfiles and metadata to pephub [Default: False]
+    :param bool lite: whether to run the lite version of the pipeline [Default: False]
 
     :param bool universe: whether to add the sample as the universe [Default: False]
     :param str universe_method: method used to create the universe [Default: None]
@@ -110,7 +118,7 @@ def run_all(
     :return str bed_digest: bed digest
     """
     if isinstance(bedbase_config, str):
-        bbagent = BedBaseAgent(bedbase_config)
+        bbagent = BedBaseAgent(config=bedbase_config, init_ml=not lite)
     elif isinstance(bedbase_config, bbconf.BedBaseAgent):
         bbagent = bedbase_config
     else:
@@ -143,21 +151,26 @@ def run_all(
         narrowpeak=narrowpeak,
         check_qc=check_qc,
         chrom_sizes=chrom_sizes,
+        lite=lite,
         pm=pm,
     )
 
     if not other_metadata:
         other_metadata = {"sample_name": name}
 
-    statistics_dict = bedstat(
-        bedfile=bed_metadata.bed_file,
-        outfolder=outfolder,
-        genome=genome,
-        ensdb=ensdb,
-        bed_digest=bed_metadata.bed_digest,
-        open_signal_matrix=open_signal_matrix,
-        just_db_commit=just_db_commit,
-        pm=pm,
-    )
+    if lite:
+        statistics_dict = {}
+    else:
+        statistics_dict = bedstat(
+            bedfile=bed_metadata.bed_file,
+            outfolder=outfolder,
+            genome=genome,
+            ensdb=ensdb,
+            bed_digest=bed_metadata.bed_digest,
+            open_signal_matrix=open_signal_matrix,
+            just_db_commit=just_db_commit,
+            rfg_config=rfg_config,
+            pm=pm,
+        )
     statistics_dict["bed_type"] = bed_metadata.bed_type
     statistics_dict["bed_format"] = bed_metadata.bed_format.value
 
@@ -204,22 +217,42 @@ def run_all(
     else:
         ref_valid_stats = None
 
-    bbagent.bed.add(
-        identifier=bed_metadata.bed_digest,
-        stats=stats.model_dump(exclude_unset=True),
-        metadata=other_metadata,
-        plots=plots.model_dump(exclude_unset=True),
-        files=files.model_dump(exclude_unset=True),
-        classification=classification.model_dump(exclude_unset=True),
-        ref_validation=ref_valid_stats,
-        license_id=license_id,
-        upload_qdrant=upload_qdrant,
-        upload_pephub=upload_pephub,
-        upload_s3=upload_s3,
-        local_path=outfolder,
-        overwrite=force_overwrite,
-        nofail=True,
-    )
+    if update:
+        bbagent.bed.update(
+            identifier=bed_metadata.bed_digest,
+            stats=stats.model_dump(exclude_unset=True),
+            metadata=other_metadata,
+            plots=plots.model_dump(exclude_unset=True),
+            files=files.model_dump(exclude_unset=True),
+            classification=classification.model_dump(exclude_unset=True),
+            ref_validation=ref_valid_stats,
+            license_id=license_id,
+            upload_qdrant=upload_qdrant and not lite,
+            upload_pephub=upload_pephub,
+            upload_s3=upload_s3,
+            local_path=outfolder,
+            overwrite=True,
+            processed=not lite,
+            nofail=True,
+        )
+    else:
+        bbagent.bed.add(
+            identifier=bed_metadata.bed_digest,
+            stats=stats.model_dump(exclude_unset=True),
+            metadata=other_metadata,
+            plots=plots.model_dump(exclude_unset=True),
+            files=files.model_dump(exclude_unset=True),
+            classification=classification.model_dump(exclude_unset=True),
+            ref_validation=ref_valid_stats,
+            license_id=license_id,
+            upload_qdrant=upload_qdrant and not lite,
+            upload_pephub=upload_pephub,
+            upload_s3=upload_s3,
+            local_path=outfolder,
+            overwrite=force_overwrite,
+            processed=not lite,
+            nofail=True,
+        )
 
     if universe:
         bbagent.bed.add_universe(
@@ -235,6 +268,7 @@ def run_all(
     return bed_metadata.bed_digest
 
 
+@calculate_time
 def insert_pep(
     bedbase_config: str,
     output_folder: str,
@@ -249,11 +283,14 @@ def insert_pep(
     ensdb: str = None,
     just_db_commit: bool = False,
     force_overwrite: bool = False,
+    update: bool = False,
     upload_s3: bool = False,
     upload_pephub: bool = False,
     upload_qdrant: bool = False,
     no_fail: bool = False,
     standardize_pep: bool = False,
+    lite: bool = False,
+    rerun: bool = False,
     pm: pypiper.PipelineManager = None,
 ) -> None:
     """
@@ -275,11 +312,14 @@ def insert_pep(
     :param str ensdb: a full path to the ensdb gtf file required for genomes not in GDdata
     :param bool just_db_commit: whether save only to the database (Without saving locally )
     :param bool force_overwrite: whether to overwrite the existing record
+    :param bool update: whether to update the record in the database. This option will overwrite the force_overwrite option. [Default: False]
     :param bool upload_s3: whether to upload to s3
     :param bool upload_pephub: whether to push bedfiles and metadata to pephub (default: False)
     :param bool upload_qdrant: whether to execute qdrant indexing
     :param bool no_fail: whether to raise an error if bedset was not added to the database
+    :param bool lite: whether to run the lite version of the pipeline
     :param bool standardize_pep: whether to standardize the pep file before processing by using bedms. (default: False)
+    :param bool rerun: whether to rerun processed samples
     :param pypiper.PipelineManager pm: pypiper object
     :return: None
     """
@@ -303,7 +343,21 @@ def insert_pep(
 
     validate_project(pep, BEDBOSS_PEP_SCHEMA_PATH)
 
+    bedset_annotation = BedSetAnnotations(**pep.config).model_dump()
+    skipper = Skipper(output_folder, pep.name)
+
+    if rerun:
+        skipper.reinitialize()
+
     for i, pep_sample in enumerate(pep.samples):
+        is_processed = skipper.is_processed(pep_sample.sample_name)
+        if is_processed:
+            m.print_success(
+                f"Skipping {pep_sample.sample_name} : {is_processed}. Already processed."
+            )
+            processed_ids.append(is_processed)
+            continue
+
         m.print_success(f"Processing sample {i + 1}/{len(pep.samples)}")
         _LOGGER.info(f"Running bedboss pipeline for {pep_sample.sample_name}")
         if pep_sample.get("file_type"):
@@ -331,19 +385,24 @@ def insert_pep(
                 ensdb=ensdb,
                 just_db_commit=just_db_commit,
                 force_overwrite=force_overwrite,
+                update=update,
                 upload_qdrant=upload_qdrant,
                 upload_s3=upload_s3,
                 upload_pephub=upload_pephub,
                 universe=pep_sample.get("universe"),
                 universe_method=pep_sample.get("universe_method"),
                 universe_bedset=pep_sample.get("universe_bedset"),
+                lite=lite,
                 pm=pm,
             )
             processed_ids.append(bed_id)
+            skipper.add_processed(pep_sample.sample_name, bed_id, success=True)
+
         except BedBossException as e:
             _LOGGER.error(f"Failed to process {pep_sample.sample_name}. See {e}")
             failed_samples.append(pep_sample.sample_name)
+            skipper.add_failed(pep_sample.sample_name, f"{e}")
 
     if create_bedset:
         _LOGGER.info(f"Creating bedset from {pep.name}")
@@ -359,6 +418,8 @@ def insert_pep(
             upload_s3=upload_s3,
             no_fail=no_fail,
             force_overwrite=force_overwrite,
+            annotation=bedset_annotation,
+            lite=lite,
         )
     else:
         _LOGGER.info(
@@ -371,3 +432,204 @@ def insert_pep(
         m.print_error(f"Failed samples: {failed_samples}")
 
     return None
+
+
+@calculate_time
+def reprocess_all(
+    bedbase_config: Union[str, BedBaseAgent],
+    output_folder: str,
+    limit: int = 10,
+    no_fail: bool = False,
+) -> None:
+    """
+    Run bedboss pipeline for all unprocessed beds in the bedbase
+
+    :param bedbase_config: bedbase configuration file path
+    :param output_folder: output folder of the pipeline
+    :param limit: limit of the number of beds to process
+    :param no_fail: whether to raise an error if bedset was not added to the database
+
+    :return: None
+    """
+
+    if isinstance(bedbase_config, str):
+        bbagent = BedBaseAgent(config=bedbase_config)
+    elif isinstance(bedbase_config, bbconf.BedBaseAgent):
+        bbagent = bedbase_config
+    else:
+        raise BedBossException("Incorrect bedbase_config type. Exiting...")
+
+    unprocessed_beds = bbagent.bed.get_unprocessed(limit=limit)
+
+    bbclient = BBClient()
+    failed_samples = []
+    for bed_annot in unprocessed_beds.results:
+        bed_file = bbclient.load_bed(bed_annot.id)
+
+        try:
+            run_all(
+                input_file=bed_file.path,
+                input_type="bed",
+                outfolder=output_folder,
+                genome=bed_annot.genome_alias,
+                bedbase_config=bbagent,
+                name=bed_annot.name,
+                license_id=bed_annot.license_id,
+                rfg_config=None,
+                check_qc=False,
+                validate_reference=True,
+                chrom_sizes=None,
+                open_signal_matrix=None,
+                ensdb=None,
+                other_metadata=None,
+                just_db_commit=False,
+                update=True,
+                upload_qdrant=True,
+                upload_s3=True,
+                upload_pephub=True,
+                lite=False,
+                universe=False,
+                universe_method=None,
+                universe_bedset=None,
+                pm=None,
+            )
+        except Exception as e:
+            _LOGGER.error(f"Failed to process {bed_annot.name}. See {e}")
+            if no_fail:
+                raise BedBossException(f"Failed to process {bed_annot.name}. See {e}")
See {e}") + + failed_samples.append( + { + "id": bed_annot.id, + "error": e, + } + ) + + if failed_samples: + date_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + with open( + os.path.join(output_folder, f"failed_samples_{date_now}.yaml"), "w" + ) as file: + yaml.dump(failed_samples, file) + + m.print_warning(f"Logs with failed samples are saved in {output_folder}") + + m.print_success(f"Processing completed successfully") + + print_values = dict( + unprocessed_files=unprocessed_beds.count, + processing_files=unprocessed_beds.limit, + failed_files=len(failed_samples), + success_files=unprocessed_beds.limit - len(failed_samples), + ) + print(print_values) + + +@calculate_time +def reprocess_one( + bedbase_config: Union[str, BedBaseAgent], + output_folder: str, + identifier: str, +) -> None: + """ + Run bedboss pipeline for one bed in the bedbase [Reprocess] + + :param bedbase_config: bedbase configuration file path + :param output_folder: output folder of the pipeline + :param identifier: bed identifier + + :return: None + """ + + if isinstance(bedbase_config, str): + bbagent = BedBaseAgent(config=bedbase_config) + elif isinstance(bedbase_config, bbconf.BedBaseAgent): + bbagent = bedbase_config + else: + raise BedBossException("Incorrect bedbase_config type. Exiting...") + + bbclient = BBClient() + + bed_annot = bbagent.bed.get(identifier) + bed_file = bbclient.load_bed(bed_annot.id) + + run_all( + input_file=bed_file.path, + input_type="bed", + outfolder=output_folder, + genome=bed_annot.genome_alias, + bedbase_config=bbagent, + name=bed_annot.name, + license_id=bed_annot.license_id, + rfg_config=None, + check_qc=False, + validate_reference=True, + chrom_sizes=None, + open_signal_matrix=None, + ensdb=None, + other_metadata=None, + just_db_commit=False, + update=True, + upload_qdrant=True, + upload_s3=True, + upload_pephub=True, + lite=False, + universe=False, + universe_method=None, + universe_bedset=None, + pm=None, + ) + + _LOGGER.info(f"Successfully processed {identifier}") + + +@calculate_time +def reprocess_bedset( + bedbase_config: Union[str, BedBaseAgent], + output_folder: str, + identifier: str, + no_fail: bool = True, + heavy: bool = False, +): + """ + Recalculate bedset from the bedbase + + :param bedbase_config: bedbase configuration file path + :param output_folder: output folder of the pipeline + :param identifier: bedset identifier + :param no_fail: whether to raise an error if bedset was not added to the database + :param heavy: whether to use heavy processing. Calculate plots for bedset + + :return: None + """ + + if isinstance(bedbase_config, str): + bbagent = BedBaseAgent(config=bedbase_config) + elif isinstance(bedbase_config, bbconf.BedBaseAgent): + bbagent = bedbase_config + else: + raise BedBossException("Incorrect bedbase_config type. 
Exiting...") + + bedset_annot = bbagent.bedset.get(identifier) + + run_bedbuncher( + bedbase_config=bbagent, + record_id=bedset_annot.id, + bed_set=bedset_annot.bed_ids, + name=bedset_annot.name, + output_folder=output_folder, + description=bedset_annot.description, + heavy=heavy, + upload_pephub=False, + upload_s3=heavy, + no_fail=no_fail, + force_overwrite=True, + annotation={ + **bedset_annot.model_dump( + exclude={ + "bed_ids", + } + ) + }, + lite=False, + ) diff --git a/bedboss/bedbuncher/bedbuncher.py b/bedboss/bedbuncher/bedbuncher.py index 2c3a330f..95504318 100644 --- a/bedboss/bedbuncher/bedbuncher.py +++ b/bedboss/bedbuncher/bedbuncher.py @@ -95,11 +95,13 @@ def run_bedbuncher( output_folder: str, name: str = None, description: str = None, + annotation: dict = None, heavy: bool = False, upload_pephub: bool = False, upload_s3: bool = False, no_fail: bool = False, force_overwrite: bool = False, + lite: bool = False, ) -> None: """ Add bedset to the database @@ -110,12 +112,14 @@ def run_bedbuncher( :param output_folder: path to the output folder :param bed_set: Bedset object or list of bedfiles ids :param description: Bedset description + :param annotation: Bedset annotation (author, source) :param heavy: whether to use heavy processing (add all columns to the database). if False -> R-script won't be executed, only basic statistics will be calculated :param no_fail: whether to raise an error if bedset was not added to the database :param upload_pephub: whether to create a view in pephub :param upload_s3: whether to upload files to s3 :param force_overwrite: whether to overwrite the record in the database + :param lite: whether to run the pipeline in lite mode # TODO: force_overwrite is not working!!! Fix it! :return: """ @@ -159,6 +163,8 @@ def run_bedbuncher( local_path=output_folder, no_fail=no_fail, overwrite=force_overwrite, + annotation=annotation, + processed=not lite, ) diff --git a/bedboss/bedmaker/bedmaker.py b/bedboss/bedmaker/bedmaker.py index 27336f3e..21ed24d6 100755 --- a/bedboss/bedmaker/bedmaker.py +++ b/bedboss/bedmaker/bedmaker.py @@ -105,9 +105,7 @@ def make_bigbed( except Exception as err: cleanup_pm_temp(pm) raise BedBossException( - f"Fail to generating bigBed files for {bed_path}: " - f"unable to validate genome assembly with Refgenie. " - f"Error: {err}" + f"Fail to generating bigBed files for {bed_path}: " f"Error: {err}" ) else: cmd = ( @@ -161,7 +159,7 @@ def make_bed( :return: path to the BED file """ - _LOGGER.info(f"Converting {os.path.abspath(input_file)} to BED format.") + _LOGGER.info(f"Processing {input_file} file in bedmaker...") input_type = input_type.lower() if input_type not in [member.value for member in InputTypes]: @@ -305,6 +303,10 @@ def make_bed( if pm_clean: pm.stop_pipeline() + _LOGGER.info( + f"Bed output file: {output_path}. BEDmaker: File processed successfully." + ) + return output_path @@ -317,6 +319,7 @@ def make_all( chrom_sizes: str = None, narrowpeak: bool = False, check_qc: bool = True, + lite: bool = False, pm: pypiper.PipelineManager = None, ) -> BedMakerOutput: """ @@ -340,6 +343,7 @@ def make_all( :param narrowpeak: whether the regions are narrow (transcription factor implies narrow, histone mark implies broad peaks) :param check_qc: run quality control during bedmaking + :param lite: run the pipeline in lite mode (without producing bigBed files) :param pm: pypiper object :return: dict with generated bed metadata - BedMakerOutput object: @@ -384,15 +388,19 @@ def make_all( f"Quality control failed for {output_path}. 
Error: {e}" ) try: - output_bigbed = make_bigbed( - bed_path=output_bed, - output_path=output_path, - genome=genome, - bed_type=bed_type, - rfg_config=rfg_config, - chrom_sizes=chrom_sizes, - pm=pm, - ) + if lite: + _LOGGER.info("Skipping bigBed generation due to lite mode.") + output_bigbed = None + else: + output_bigbed = make_bigbed( + bed_path=output_bed, + output_path=output_path, + genome=genome, + bed_type=bed_type, + rfg_config=rfg_config, + chrom_sizes=chrom_sizes, + pm=pm, + ) except BedBossException: output_bigbed = None if pm_clean: diff --git a/bedboss/bedqc/bedqc.py b/bedboss/bedqc/bedqc.py index 1493d91a..1f68f808 100755 --- a/bedboss/bedqc/bedqc.py +++ b/bedboss/bedqc/bedqc.py @@ -93,7 +93,9 @@ def bedqc( detail.append(f"Mean region width is less than {min_region_width} bp.") if len(detail) > 0: - _LOGGER.info("file_name: ", bedfile_name) + _LOGGER.info( + f"file_name: {bedfile_name}", + ) if os.path.exists(output_file): with open(output_file, "a") as f: f.write(f"{bedfile_name}\t{detail} \n") diff --git a/bedboss/bedstat/bedstat.py b/bedboss/bedstat/bedstat.py index 65415b4e..c211190d 100755 --- a/bedboss/bedstat/bedstat.py +++ b/bedboss/bedstat/bedstat.py @@ -1,11 +1,14 @@ import json import logging import os +import statistics +from pathlib import Path from typing import Union import pypiper from geniml.io import RegionSet +from bedboss.bedstat.gc_content import calculate_gc_content, create_gc_plot from bedboss.const import ( BEDSTAT_OUTPUT, HOME_PATH, @@ -70,6 +73,7 @@ def bedstat( ensdb: str = None, open_signal_matrix: str = None, just_db_commit: bool = False, + rfg_config: Union[str, Path] = None, pm: pypiper.PipelineManager = None, ) -> dict: """ @@ -82,6 +86,8 @@ def bedstat( required for the tissue specificity plots :param str outfolder: The folder for storing the pipeline results. :param str genome: genome assembly of the sample + :param bool just_db_commit: if True, the pipeline will only commit to the database + :param str rfg_config: path to the refgenie config file :param str ensdb: a full path to the ensdb gtf file required for genomes not in GDdata :param pm: pypiper object @@ -94,15 +100,16 @@ def bedstat( except FileExistsError: pass + # TODO: osm commented to speed up code # find/download open signal matrix - if not open_signal_matrix or not os.path.exists(open_signal_matrix): - try: - open_signal_matrix = get_osm_path(genome) - except OpenSignalMatrixException: - _LOGGER.warning( - f"Open Signal Matrix was not found for {genome}. Skipping..." - ) - open_signal_matrix = None + # if not open_signal_matrix or not os.path.exists(open_signal_matrix): + # try: + # open_signal_matrix = get_osm_path(genome) + # except OpenSignalMatrixException: + # _LOGGER.warning( + # f"Open Signal Matrix was not found for {genome}. Skipping..." 
+    #         )
+    open_signal_matrix = None
 
     # Used to stop pipeline bedstat is used independently
     if not pm:
@@ -112,19 +119,17 @@ def bedstat(
     if not bed_digest:
         bed_digest = RegionSet(bedfile).identifier
-    bedfile_name = os.path.split(bedfile)[1]
-    fileid = os.path.splitext(os.path.splitext(bedfile_name)[0])[0]
     outfolder_stats_results = os.path.abspath(os.path.join(outfolder_stats, bed_digest))
     try:
         os.makedirs(outfolder_stats_results)
     except FileExistsError:
         pass
     json_file_path = os.path.abspath(
-        os.path.join(outfolder_stats_results, fileid + ".json")
+        os.path.join(outfolder_stats_results, bed_digest + ".json")
     )
     json_plots_file_path = os.path.abspath(
-        os.path.join(outfolder_stats_results, fileid + "_plots.json")
+        os.path.join(outfolder_stats_results, bed_digest + "_plots.json")
     )
     if not just_db_commit:
         if not pm:
@@ -152,7 +157,7 @@ def bedstat(
         )
         command = (
             f"Rscript {rscript_path} --bedfilePath={bedfile} "
-            f"--fileId={fileid} --openSignalMatrix={open_signal_matrix} "
+            f"--fileId={bed_digest} --openSignalMatrix={open_signal_matrix} "
            f"--outputFolder={outfolder_stats_results} --genome={genome} "
            f"--ensdb={ensdb} --digest={bed_digest}"
         )
@@ -177,6 +182,25 @@ def bedstat(
     # length 1 and force keys to lower to correspond with the
     # postgres column identifiers
     data = {k.lower(): v[0] if isinstance(v, list) else v for k, v in data.items()}
+    try:
+        gc_contents = calculate_gc_content(
+            bedfile=bedfile, genome=genome, rfg_config=rfg_config
+        )
+    except BaseException as e:
+        gc_contents = None
+
+    if gc_contents:
+        gc_mean = statistics.mean(gc_contents)
+
+        data["gc_content"] = round(gc_mean, 2)
+
+        gc_plot = create_gc_plot(
+            bed_id=bed_digest,
+            gc_contents=gc_contents,
+            outfolder=os.path.join(outfolder_stats_results),
+            gc_mean=gc_mean,
+        )
+        plots.append(gc_plot)
 
     for plot in plots:
         plot_id = plot["name"]
diff --git a/bedboss/bedstat/gc_content.py b/bedboss/bedstat/gc_content.py
new file mode 100644
index 00000000..a49a19f0
--- /dev/null
+++ b/bedboss/bedstat/gc_content.py
@@ -0,0 +1,141 @@
+import logging
+import os
+from typing import List, Union
+
+import matplotlib.pyplot as plt
+import seaborn as sns
+from gdrs import GenomeAssembly, calc_gc_content
+from matplotlib.ticker import MaxNLocator
+from refgenconf import RefgenconfError
+from yacman.exceptions import UndefinedAliasError
+
+from bedboss.bedmaker.utils import get_rgc
+
+_LOGGER = logging.getLogger("bedboss")
+
+assembly_objects = {}
+
+
+def get_genome_fasta_file(genome: str, rfg_config: str = None) -> str:
+    """
+    Get genome fasta file with Refgenie.
+
+    :param genome: genome name
+    :param rfg_config: path to refgenie config file
+    """
+    rgc = get_rgc(rfg_config=rfg_config)
+
+    try:
+        # get local path to the fasta asset
+        fasta_file = rgc.seek(
+            genome_name=genome,
+            asset_name="fasta",
+            tag_name="default",
+            seek_key="fasta",
+        )
+        _LOGGER.info(f"fasta path: {fasta_file}")
+    except (UndefinedAliasError, RefgenconfError):
+        # if no local fasta asset found, pull it first
+        _LOGGER.info("Could not determine path to fasta asset, pulling")
+        rgc.pull(genome=genome, asset="fasta", tag="default")
+        fasta_file = rgc.seek(
+            genome_name=genome,
+            asset_name="fasta",
+            tag_name="default",
+            seek_key="fasta",
+        )
+    return fasta_file
+
+
+def get_genome_assembly_obj(
+    genome: str, rfg_config: str = None
+) -> Union[GenomeAssembly, None]:
+    """
+    Get assembly object for a genome.
+
+    :param genome: genome name
+    :param rfg_config: path to refgenie config file
+
+    :return: assembly object
+    """
+    if genome in assembly_objects:
+        return assembly_objects[genome]
+    else:
+        try:
+            assembly_objects[genome] = GenomeAssembly(
+                get_genome_fasta_file(genome, rfg_config=rfg_config)
+            )
+            return assembly_objects[genome]
+        except Exception as e:
+            _LOGGER.error(f"Could not get assembly object for {genome}: {e}")
+            return None
+
+
+def calculate_gc_content(
+    bedfile: str, genome: str, rfg_config: str = None
+) -> Union[List[float], None]:
+    """
+    Calculate GC content for a bed file.
+
+    :param bedfile: path to bed file
+    :param genome: genome name
+    :param rfg_config: path to refgenie config file
+
+    :return: list of GC contents
+    """
+
+    assembly_obj = get_genome_assembly_obj(genome, rfg_config=rfg_config)
+    if assembly_obj is None:
+        return None
+
+    gc_contents = calc_gc_content(
+        bedfile,
+        assembly_obj,
+        ignore_unk_chroms=True,
+    )
+    return gc_contents
+
+
+def create_gc_plot(
+    bed_id: str, gc_contents: List[float], outfolder: str, gc_mean: float
+) -> dict:
+    """
+    Create a GC content plot.
+
+    :param bed_id: bed ID
+    :param gc_contents: list of GC contents
+    :param outfolder: path to the output folder
+    :param gc_mean: mean GC content
+
+    :return: dict with plot name, title, and output file paths
+    """
+    plt.rcParams["font.size"] = 10
+    plt.figure(figsize=(8, 8))
+    sns.kdeplot(gc_contents, linewidth=0.8, color="black")
+    plt.gca().xaxis.set_major_locator(MaxNLocator(nbins=5))
+
+    plt.axvline(
+        gc_mean,
+        color="r",
+        linestyle="--",
+        linewidth=0.8,
+        label=f"Mean: {gc_mean:.2f}",
+    )
+    sns.despine()
+    plt.xlabel("GC Content")
+    plt.legend()
+
+    plt.title("GC Content Distribution")
+
+    pdf_path = os.path.join(outfolder, f"{bed_id}_gccontent.pdf")
+    png_path = os.path.join(outfolder, f"{bed_id}_gccontent.png")
+
+    plt.savefig(pdf_path)
+    plt.savefig(png_path)
+
+    return {
+        "name": "gccontent",
+        "title": "GC Content Distribution",
+        "thumbnail_path": png_path,
+        "path": pdf_path,
+    }
diff --git a/bedboss/bedstat/tools/regionstat.R b/bedboss/bedstat/tools/regionstat.R
index 26de6736..33211f92 100644
--- a/bedboss/bedstat/tools/regionstat.R
+++ b/bedboss/bedstat/tools/regionstat.R
@@ -1,9 +1,9 @@
 # R Script that generates statistics for a bedfile
 library(GenomicDistributions)
-library(GenomicDistributionsData)
-library(GenomeInfoDb)
-library(ensembldb)
+# library(GenomicDistributionsData)
+# library(GenomeInfoDb)
+# library(ensembldb)
 library(optparse)
 library(tools)
 library(R.utils)
@@ -118,6 +118,7 @@ doItAall <- function(query, fileId, genome, cellMatrix) {
  } else {
    run_plot = TRUE
  }
+ query_new = GenomeInfoDb::keepStandardChromosomes(query, pruning.mode="coarse")
  if (run_plot){
    tryCatch(
      expr = {
        if (is.null(gtffile)) {
          message("Ensembl annotation gtf file not provided. Skipping TSS distance plot ... ")
") } else{ if (genome %in% c("hg19", "hg38", "mm10", "mm9")) { - TSSdist = calcFeatureDistRefTSS(query, genome) - plotBoth("tssdist", plotFeatureDist( TSSdist, featureName="TSS")) + TSSdist = calcFeatureDistRefTSS(query_new, genome) + plotBoth("tss_distance", plotFeatureDist( TSSdist, featureName="TSS")) } else { tss = getTssFromGTF(gtffile, TRUE) - TSSdist = calcFeatureDist(query, tss) - plotBoth("tssdist", plotFeatureDist( TSSdist, featureName="TSS")) + TSSdist = calcFeatureDist(query_new, tss) + plotBoth("tss_distance", plotFeatureDist( TSSdist, featureName="TSS")) } + plots = rbind(plots, getPlotReportDF("tss_distance", "Region-TSS distance distribution")) + message("Successfully calculated and plot TSS distance.") } if (exists("bedmeta")){ tss <- list(median_TSS_dist = signif(median(abs(TSSdist), na.rm=TRUE), digits = 4)) bedmeta = append(bedmeta, tss) } - plots = rbind(plots, getPlotReportDF("tssdist", "Region-TSS distance distribution")) - message("Successfully calculated and plot TSS distance.") + }, error = function(e){ message('Caught an error in creating: TSS distance plot!') @@ -146,9 +148,7 @@ doItAall <- function(query, fileId, genome, cellMatrix) { } ) } - - - + # Chromosomes region distribution plot if (!exists("bedmeta") ){ tryCatch( @@ -158,7 +158,7 @@ doItAall <- function(query, fileId, genome, cellMatrix) { genomeBins = getGenomeBins(chromSizes) plotBoth("chrombins", plotChromBins(calcChromBins(query, genomeBins))) } else{ - plotBoth("chrombins", plotChromBins(calcChromBinsRef(query, genome))) + plotBoth("chrombins", plotChromBins(calcChromBinsRef(query_new, genome))) } plots = rbind(plots, getPlotReportDF("chrombins", "Regions distribution over chromosomes")) @@ -170,49 +170,47 @@ doItAall <- function(query, fileId, genome, cellMatrix) { } ) } - - - - - # OPTIONAL: Plot GC content only if proper BSgenome package is installed. - if (exists("bedmeta")){ - if ("gc_content" %in% names(bedmeta)){ - run_plot = FALSE - } else { - run_plot = TRUE - } - } else { - run_plot = TRUE - } - if (run_plot){ - if (bsGenomeAvail) { - tryCatch( - expr = { - if (requireNamespace(BSgm, quietly=TRUE)){ - library (BSgm, character.only = TRUE) - bsg = eval(as.name(BSgm)) - gcvec = calcGCContent(query, bsg) - } else { - library (BSg, character.only = TRUE) - bsg = eval(as.name(BSg)) - gcvec = calcGCContent(query, bsg) - } - plotBoth("gccontent", plotGCContent(gcvec)) - if (exists("bedmeta")){ - gc_content <- list(gc_content = signif(mean(gcvec), digits = 4)) - bedmeta = append(bedmeta, gc_content) - } - plots = rbind(plots, getPlotReportDF("gccontent", "GC content")) - message("Successfully calculated and plot GC content.") - }, - error = function(e){ - message('Caught an error in creating: GC content plot!') - print(e, gcvec) - } - ) - } - } +# We can calculate it differently +# # OPTIONAL: Plot GC content only if proper BSgenome package is installed. 
+# if (exists("bedmeta")){ +# if ("gc_content" %in% names(bedmeta)){ +# run_plot = FALSE +# } else { +# run_plot = TRUE +# } +# } else { +# run_plot = TRUE +# } +# +# if (run_plot){ +# if (bsGenomeAvail) { +# tryCatch( +# expr = { +# if (requireNamespace(BSgm, quietly=TRUE)){ +# library (BSgm, character.only = TRUE) +# bsg = eval(as.name(BSgm)) +# gcvec = calcGCContent(query, bsg) +# } else { +# library (BSg, character.only = TRUE) +# bsg = eval(as.name(BSg)) +# gcvec = calcGCContent(query, bsg) +# } +# plotBoth("gccontent", plotGCContent(gcvec)) +# if (exists("bedmeta")){ +# gc_content <- list(gc_content = signif(mean(gcvec), digits = 4)) +# bedmeta = append(bedmeta, gc_content) +# } +# plots = rbind(plots, getPlotReportDF("gccontent", "GC content")) +# message("Successfully calculated and plot GC content.") +# }, +# error = function(e){ +# message('Caught an error in creating: GC content plot!') +# print(e, gcvec) +# } +# ) +# } +# } # Partition plots, default to percentages @@ -311,7 +309,7 @@ doItAall <- function(query, fileId, genome, cellMatrix) { message('Caught an error in creating: Cumulative partition plot!') print(e) } - ) + ) } # QThis plot @@ -360,24 +358,24 @@ doItAall <- function(query, fileId, genome, cellMatrix) { } - # Tissue specificity plot if open signal matrix is provided - if (!exists("bedmeta") ){ - if (cellMatrix == "None") { - message("open signal matrix not provided. Skipping tissue specificity plot ... ") - } else { - tryCatch( - expr = { - plotBoth("open_chromatin", plotSummarySignal(calcSummarySignal(query, data.table::fread(cellMatrix)))) - plots = rbind(plots, getPlotReportDF("open_chromatin", "Cell specific enrichment for open chromatin")) - message("Successfully calculated and plot cell specific enrichment for open chromatin.") - }, - error = function(e){ - message('Caught an error in creating: Cell specific enrichment for open chromatin plot!') - print(e) - } - ) - } - } +# # Tissue specificity plot if open signal matrix is provided +# if (!exists("bedmeta") ){ +# if (cellMatrix == "None") { +# message("open signal matrix not provided. Skipping tissue specificity plot ... ") +# } else { +# tryCatch( +# expr = { +# plotBoth("open_chromatin", plotSummarySignal(calcSummarySignal(query, data.table::fread(cellMatrix)))) +# plots = rbind(plots, getPlotReportDF("open_chromatin", "Cell specific enrichment for open chromatin")) +# message("Successfully calculated and plot cell specific enrichment for open chromatin.") +# }, +# error = function(e){ +# message('Caught an error in creating: Cell specific enrichment for open chromatin plot!') +# print(e) +# } +# ) +# } +# } # Note: names of the list elements MUST match what's defined in: https://github.com/databio/bbconf/blob/master/bbconf/schemas/bedfiles_schema.yaml @@ -387,7 +385,7 @@ doItAall <- function(query, fileId, genome, cellMatrix) { } else { bedmeta = list( name=fileId, - regions_no=length(query), + number_of_regions=length(query), mean_region_width=ifelse(exists('widths'), signif(mean(widths), digits = 4), NA), md5sum=opt$digest ) diff --git a/bedboss/cli.py b/bedboss/cli.py index 5bdd924a..5473f3c4 100644 --- a/bedboss/cli.py +++ b/bedboss/cli.py @@ -89,6 +89,13 @@ def run_all( force_overwrite: bool = typer.Option( False, help="Force overwrite the output files" ), + update: bool = typer.Option( + False, + help="Update the bedbase database with the new record if it exists. This overwrites 'force_overwrite' option", + ), + lite: bool = typer.Option( + False, help="Run the pipeline in lite mode. 
[Default: False]" + ), upload_qdrant: bool = typer.Option(False, help="Upload to Qdrant"), upload_s3: bool = typer.Option(False, help="Upload to S3"), upload_pephub: bool = typer.Option(False, help="Upload to PEPHub"), @@ -129,8 +136,10 @@ def run_all( open_signal_matrix=open_signal_matrix, ensdb=ensdb, other_metadata=None, + lite=lite, just_db_commit=just_db_commit, force_overwrite=force_overwrite, + update=update, upload_qdrant=upload_qdrant, upload_s3=upload_s3, upload_pephub=upload_pephub, @@ -164,12 +173,20 @@ def run_pep( force_overwrite: bool = typer.Option( False, help="Force overwrite the output files" ), + update: bool = typer.Option( + False, + help="Update the bedbase database with the new record if it exists. This overwrites 'force_overwrite' option", + ), upload_qdrant: bool = typer.Option(True, help="Upload to Qdrant"), upload_s3: bool = typer.Option(True, help="Upload to S3"), upload_pephub: bool = typer.Option(True, help="Upload to PEPHub"), no_fail: bool = typer.Option(False, help="Do not fail on error"), license_id: str = typer.Option(DEFAULT_LICENSE, help="License ID"), standardize_pep: bool = typer.Option(False, help="Standardize the PEP using bedMS"), + lite: bool = typer.Option( + False, help="Run the pipeline in lite mode. [Default: False]" + ), + rerun: bool = typer.Option(False, help="Rerun already processed samples"), # PipelineManager multi: bool = typer.Option(False, help="Run multiple samples"), recover: bool = typer.Option(True, help="Recover from previous run"), @@ -192,12 +209,15 @@ def run_pep( ensdb=ensdb, just_db_commit=just_db_commit, force_overwrite=force_overwrite, + update=update, license_id=license_id, upload_s3=upload_s3, upload_pephub=upload_pephub, upload_qdrant=upload_qdrant, no_fail=no_fail, standardize_pep=standardize_pep, + lite=lite, + rerun=rerun, pm=create_pm( outfolder=outfolder, multi=multi, @@ -208,6 +228,75 @@ def run_pep( ) +@app.command(help="Run unprocessed files, or reprocess them") +def reprocess_all( + bedbase_config: str = typer.Option( + ..., + help="Path to the bedbase config file", + exists=True, + file_okay=True, + readable=True, + ), + outfolder: str = typer.Option(..., help="Path to the output folder"), + limit: int = typer.Option(100, help="Limit the number of files to reprocess"), + no_fail: bool = typer.Option(True, help="Do not fail on error"), +): + from bedboss.bedboss import reprocess_all as reprocess_all_function + + reprocess_all_function( + bedbase_config=bedbase_config, + output_folder=outfolder, + limit=limit, + no_fail=no_fail, + ) + + +@app.command(help="Run unprocessed file, or reprocess it [Only 1 file]") +def reprocess_one( + bedbase_config: str = typer.Option( + ..., + help="Path to the bedbase config file", + exists=True, + file_okay=True, + readable=True, + ), + outfolder: str = typer.Option(..., help="Path to the output folder"), + identifier: str = typer.Option(..., help="Identifier of the bed file"), +): + from bedboss.bedboss import reprocess_one as reprocess_one_function + + reprocess_one_function( + bedbase_config=bedbase_config, + output_folder=outfolder, + identifier=identifier, + ) + + +@app.command(help="Reprocess a bedset") +def reprocess_bedset( + bedbase_config: str = typer.Option( + ..., + help="Path to the bedbase config file", + exists=True, + file_okay=True, + readable=True, + ), + outfolder: str = typer.Option(..., help="Path to the output folder"), + identifier: str = typer.Option(..., help="Bedset ID"), + no_fail: bool = typer.Option(True, help="Do not fail on error"), + heavy: bool 
+):
+    from bedboss.bedboss import reprocess_bedset as reprocess_bedset_function
+
+    reprocess_bedset_function(
+        bedbase_config=bedbase_config,
+        output_folder=outfolder,
+        identifier=identifier,
+        no_fail=no_fail,
+        heavy=heavy,
+    )
+
+
 @app.command(help=f"Create a bed files form a [{', '.join(options_list)}] file")
 def make_bed(
     input_file: str = typer.Option(
@@ -568,7 +657,9 @@ def install_requirements():
     import subprocess
 
     r_path = os.path.abspath(
-        os.path.join(os.path.dirname(os.path.realpath(__file__)), "installRdeps.R")
+        os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), "scripts", "installRdeps.R"
+        )
     )
 
     subprocess.run(
@@ -576,6 +667,22 @@ def install_requirements():
     )
 
 
+@app.command(help="Verify configuration file")
+def verify_config(
+    config: str = typer.Option(
+        ...,
+        help="Path to the bedbase config file",
+        exists=True,
+        file_okay=True,
+        readable=True,
+    ),
+):
+    from bbconf.config_parser.utils import config_analyzer
+
+    if config_analyzer(config):
+        print("Configuration file is valid!")
+
+
 @app.callback()
 def common(
     ctx: typer.Context,
diff --git a/bedboss/models.py b/bedboss/models.py
index 636bef36..45878f5d 100644
--- a/bedboss/models.py
+++ b/bedboss/models.py
@@ -135,3 +135,14 @@ class FilesUpload(BedFiles):
 
 class BedClassificationUpload(BedClassification):
     model_config = ConfigDict(extra="ignore")
+
+
+class BedSetAnnotations(BaseModel):
+    """
+    Annotations for a bedset
+    """
+
+    author: Union[str, None] = None
+    source: Union[str, None] = None
+
+    model_config = ConfigDict(extra="ignore")
diff --git a/bedboss/refgenome_validator/const.py b/bedboss/refgenome_validator/const.py
new file mode 100644
index 00000000..5f0fb0aa
--- /dev/null
+++ b/bedboss/refgenome_validator/const.py
@@ -0,0 +1,11 @@
+GENOME_FILES = {
+    "ensembl_hg38.chrom.sizes": "hg38",
+    "ncbi_hg38.chrom.sizes": "hg38",
+    "ucsc_dm6.chrom.sizes": "dm6",
+    "ucsc_hg19.chrom.sizes": "hg19",
+    "ucsc_hg38.chrom.sizes": "hg38",
+    "ucsc_mm10.chrom.sizes": "mm10",
+    "ucsc_mm39.chrom.sizes": "mm39",
+    "ucsc_mm9.chrom.sizes": "mm9",
+    "ucsc_panTo6.chrom.sizes": "panTro6",
+}
diff --git a/bedboss/refgenome_validator/main.py b/bedboss/refgenome_validator/main.py
index 756351b8..532693d9 100644
--- a/bedboss/refgenome_validator/main.py
+++ b/bedboss/refgenome_validator/main.py
@@ -1,22 +1,23 @@
-from typing import Optional, Union, List, Dict
+import logging
 import os
+from typing import Dict, List, Optional, Union
 
 from bedboss.exceptions import ValidatorException
+from bedboss.refgenome_validator.const import GENOME_FILES
 from bedboss.refgenome_validator.genome_model import GenomeModel
 from bedboss.refgenome_validator.models import (
-    ChromNameStats,
     ChromLengthStats,
-    SequenceFitStats,
+    ChromNameStats,
+    CompatibilityConcise,
     CompatibilityStats,
     RatingModel,
-    CompatibilityConcise,
+    SequenceFitStats,
 )
 from bedboss.refgenome_validator.utils import (
     get_bed_chrom_info,
-    run_igd_command,
     parse_IGD_output,
+    run_igd_command,
 )
-import logging
 
 _LOGGER = logging.getLogger("bedboss")
@@ -412,3 +413,32 @@
             assigned_points=output.compatibility.assigned_points,
             tier_ranking=output.compatibility.tier_ranking,
         )
+
+    def predict(self, bedfile: str) -> Union[str, None]:
+        """
+        Predict compatibility of a bed file with reference genomes
+
+        :param bedfile: path to bedfile
+
+        :return: string with the name of the reference genome that the bed file is compatible with or None, if no compatibility is found
diff --git a/bedboss/refgenome_validator/models.py b/bedboss/refgenome_validator/models.py
index f852860a..42c51c00 100644
--- a/bedboss/refgenome_validator/models.py
+++ b/bedboss/refgenome_validator/models.py
@@ -1,6 +1,7 @@
-from pydantic import BaseModel, ConfigDict
 from typing import Union
 
+from pydantic import BaseModel, ConfigDict
+
 
 class ChromNameStats(BaseModel):
     xs: float = 0.0
diff --git a/bedboss/refgenome_validator/utils.py b/bedboss/refgenome_validator/utils.py
index 2a849344..9655c931 100644
--- a/bedboss/refgenome_validator/utils.py
+++ b/bedboss/refgenome_validator/utils.py
@@ -1,7 +1,8 @@
-from typing import Union, List
-from geniml.io import RegionSet
-import subprocess
 import logging
+import subprocess
+from typing import List, Union
+
+from geniml.io import RegionSet
 
 _LOGGER = logging.getLogger("bedboss")
diff --git a/bedboss/scripts/__init__.py b/bedboss/scripts/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/bedboss/scripts/check_R_req.R b/bedboss/scripts/check_R_req.R
new file mode 100644
index 00000000..a83c5cc6
--- /dev/null
+++ b/bedboss/scripts/check_R_req.R
@@ -0,0 +1,43 @@
+check_required_packages <- function() {
+  # ANSI escape codes for colors
+  red <- "\033[31m"
+  green <- "\033[32m"
+  reset <- "\033[0m"
+
+  # List of required R packages
+  required_packages <- c(
+    "optparse", "devtools", "ensembldb", "ExperimentHub", "AnnotationHub",
+    "AnnotationFilter", "BSgenome", "GenomicFeatures", "GenomicDistributions",
+    "GenomicDistributionsData", "GenomeInfoDb", "tools", "R.utils", "LOLA", "conflicted"
+  )
+
+  # Check if each package is installed
+  missing_packages <- required_packages[!sapply(required_packages, requireNamespace, quietly = TRUE)]
+
+  # Get the installed packages
+  installed_packages <- required_packages[sapply(required_packages, requireNamespace, quietly = TRUE)]
+
+  # If there are missing packages, print them
+  if (length(missing_packages) > 0) {
+    cat(paste0(red, "The following packages are missing:\n", reset))
+    cat(paste0(red, missing_packages, collapse = "\n"), "\n", reset)  # Print missing packages vertically
+  } else {
+    cat(paste0(green, "All required R packages are installed.\n", reset))
+  }
+
+  # Print installed packages vertically
+  cat(paste0(green, "\nThe following packages are installed:\n", reset))
+  cat(paste0(green, installed_packages, collapse = "\n"), "\n", reset)  # Print installed packages vertically
+
+  # Return missing packages
+  return(missing_packages)
+}
+
+# Call the function
+missing_packages <- check_required_packages()
+
+# Optionally handle missing packages
+if (length(missing_packages) > 0) {
+  quit(status = 1)
+}
+quit(status = 0)
\ No newline at end of file
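`check_R_req.R` communicates only through its exit status: `0` when every required package is present, `1` otherwise, which is exactly what the updated `requirements_test.sh` below checks. The same check can be driven from Python; a small sketch, assuming `Rscript` is on the `PATH` and the script path is as installed by this patch:

```python
import subprocess

# exit status 0: all required R packages installed; 1: some are missing
result = subprocess.run(["Rscript", "bedboss/scripts/check_R_req.R"])
if result.returncode != 0:
    raise SystemExit("Some required R packages are missing.")
```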
diff --git a/bedboss/installRdeps.R b/bedboss/scripts/installRdeps.R
similarity index 100%
rename from bedboss/installRdeps.R
rename to bedboss/scripts/installRdeps.R
diff --git a/bedboss/requirements_test.sh b/bedboss/scripts/requirements_test.sh
similarity index 76%
rename from bedboss/requirements_test.sh
rename to bedboss/scripts/requirements_test.sh
index f5cc81f2..20b8a326 100755
--- a/bedboss/requirements_test.sh
+++ b/bedboss/scripts/requirements_test.sh
@@ -116,17 +116,36 @@ if ! is_executable "wigToBigWig"; then
     INSTALL_WARNINGS=$((INSTALL_WARNINGS+1))
 fi
 
-
-if is_executable "R"; then
+# Old requirements check
+#if is_executable "R"; then
+#
+#  echo -e "-----------------------------------------------------------"
+#  echo -e "Checking required R packages for bedstat... "
+#  echo -e "-----------------------------------------------------------"
+#  declare -a requiredRPackages=("optparse ""devtools" "ensembldb" "ExperimentHub" "AnnotationHub" "AnnotationFilter" "BSgenome" "GenomicFeatures" "GenomicDistributions" "GenomicDistributionsData" "GenomeInfoDb" "ensembldb" "tools" "R.utils" "LOLA" "conflicted")
+#  for package in "${requiredRPackages[@]}"; do
+#    if ! r_check_req $package; then
+#      INSTALL_ERROR=$((INSTALL_ERROR+1))
+#    fi
+#done
+#fi
+
+if is_executable "Rscript"; then
 
     echo -e "-----------------------------------------------------------"
     echo -e "Checking required R packages for bedstat... "
     echo -e "-----------------------------------------------------------"
-    declare -a requiredRPackages=("optparse ""devtools" "ensembldb" "ExperimentHub" "AnnotationHub" "AnnotationFilter" "BSgenome" "GenomicFeatures" "GenomicDistributions" "GenomicDistributionsData" "GenomeInfoDb" "ensembldb" "tools" "R.utils" "LOLA" "conflicted")
-    for package in "${requiredRPackages[@]}"; do
-        if ! r_check_req $package; then
-            INSTALL_ERROR=$((INSTALL_ERROR+1))
-        fi
-done
+
+    SCRIPT_DIR=$(dirname "$(realpath "$0")")
+#    echo "The script is located at: $SCRIPT_DIR"
+    Rscript "$SCRIPT_DIR/check_R_req.R"
+
+    if [ $? -eq 0 ]; then
+        echo "All required packages are installed."
+    else
+        echo $(fail "ERROR: Some required R packages are missing. Please install them.")
+        INSTALL_ERROR=$((INSTALL_ERROR+1))
+    fi
+
 fi
 
 echo "Number of WARNINGS: $INSTALL_WARNINGS"
diff --git a/bedboss/skipper.py b/bedboss/skipper.py
new file mode 100644
index 00000000..59fb58c7
--- /dev/null
+++ b/bedboss/skipper.py
@@ -0,0 +1,103 @@
+# This module will serve to skip samples that were already processed.
+import os
+from typing import Union
+
+
+class Skipper:
+    def __init__(self, output_path: str, name: str):
+        self.output_path = output_path
+        self.name = name
+
+        self.file_path = os.path.join(output_path, f"{name}.log")
+        self.file_fail_log_path = os.path.join(output_path, f"{name}_fail.log")
+
+        self.info = self._read_log_file(self.file_path)
+
+    def is_processed(self, sample_name: str) -> Union[str, bool]:
+        """
+        Check if a sample was already processed.
+
+        :param sample_name: name of the sample
+
+        :return: digest of the sample, or False if it was not processed
+        """
+        return self.info.get(sample_name, False)
+
+    def add_processed(
+        self, sample_name: str, digest: str, success: bool = False
+    ) -> None:
+        """
+        Add a sample to the processed list.
+
+        :param sample_name: name of the sample
+        :param digest: digest of the sample
+        :param success: if the processing was successful
+        """
+        # line = f"{sample_name}\t{digest}\t{'success' if success else 'failed'}\n"
+        line = f"{sample_name},{digest}\n"
+        with open(self.file_path, "a") as file:
+            file.write(line)
+
+        self.info[sample_name] = digest
+
+    def _create_log_file(self, file_path: str) -> None:
+        """
+        Create a log file.
+
+        :param file_path: path to the log file
+        """
+        with open(file_path, "w") as file:
+            file.write("")
+
+    def _read_log_file(self, file_path: str) -> dict:
+        """
+        Read the log file.
+
+        :param file_path: path to the log file
+        """
+        data_dict = {}
+        if os.path.exists(file_path):
+            with open(file_path, "r") as file:
+                for line in file:
+                    # Split each line by comma
+                    columns = line.strip().split(",")
+
+                    # Assign the first column as the key, and the second as the value
+                    if len(columns) >= 2:  # Ensure there are at least two columns
+                        key = columns[0]
+                        value = columns[1]
+                        data_dict[key] = value
+        else:
+            self._create_log_file(file_path)
+
+        return data_dict
+
+    def create_fail_log(self):
+        """
+        Create a log file for failed samples.
+        """
+
+        if not os.path.exists(self.file_fail_log_path):
+            self._create_log_file(self.file_fail_log_path)
+
+    def reinitialize(self):
+        """
+        Reinitialize the log file.
+        """
+        if os.path.exists(self.file_path):
+            os.remove(self.file_path)
+            self.info = self._read_log_file(self.file_path)
+        else:
+            self.info = self._read_log_file(self.file_path)
+
+    def add_failed(self, sample_name: str, error: str = None):
+        """
+        Add a sample to the failed list.
+
+        :param sample_name: name of the sample
+        :param error: error message
+        """
+
+        line = f"{sample_name},{error}\n"
+        with open(self.file_fail_log_path, "a") as file:
+            file.write(line)
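A usage sketch for `Skipper`: it keeps one `<name>.log` with `sample,digest` rows for processed samples and one `<name>_fail.log` for failures. The run name, sample, digest, and the processing call below are all hypothetical:

```python
from bedboss.skipper import Skipper

skipper = Skipper(output_path="./logs", name="GSE256031")  # hypothetical run name
skipper.create_fail_log()

sample, digest = "GSM0000001", "0740332b148a613342603e2e483f53e5"  # hypothetical
if not skipper.is_processed(sample):
    try:
        # ... run the actual sample processing here ...
        skipper.add_processed(sample, digest)
    except Exception as e:
        skipper.add_failed(sample, str(e))
```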
diff --git a/bedboss/utils.py b/bedboss/utils.py
index 5cbd80c2..4fe6c5b4 100644
--- a/bedboss/utils.py
+++ b/bedboss/utils.py
@@ -1,36 +1,50 @@
+import glob
 import logging
 import os
+import time
 import urllib.request
-import glob
+from functools import wraps
 
+import peppy
 import requests
+from bedms import AttrStandardizer
 from pephubclient.files_manager import FilesManager
-import peppy
 from peppy.const import SAMPLE_RAW_DICT_KEY
-from bedms import AttrStandardizer
 from pypiper import PipelineManager
 
+from bedboss.refgenome_validator.main import ReferenceValidator
+
 _LOGGER = logging.getLogger("bedboss")
 
 
-def standardize_genome_name(input_genome: str) -> str:
+def standardize_genome_name(input_genome: str, bedfile: str = None) -> str:
     """
     Standardizing user provided genome
 
     :param input_genome: standardize user provided genome, so bedboss knows which genome to use
+    :param bedfile: path to bed file
 
     :return: genome name string
     """
-    if not input_genome:
-        return ""
+    if not isinstance(input_genome, str):
+        input_genome = ""
     input_genome = input_genome.strip().lower()
     # TODO: we have to add more genome options and preprocessing of the string
     if input_genome == "hg38" or input_genome == "grch38":
         return "hg38"
     elif input_genome == "hg19" or input_genome == "grch37":
         return "hg19"
-    elif input_genome == "mm10":
+    elif input_genome == "mm10" or input_genome == "grcm38":
         return "mm10"
+    elif input_genome == "mm9" or input_genome == "grcm37":
+        return "mm9"
+
+    elif not input_genome or len(input_genome) > 7:
+        if bedfile:
+            predictor = ReferenceValidator()
+            return predictor.predict(bedfile) or ""
+        else:
+            return input_genome
     # else:
     #     raise GenomeException("Incorrect genome assembly was provided")
     else:
@@ -189,3 +203,27 @@ def cleanup_pm_temp(pm: PipelineManager) -> None:
         except Exception as e:
             _LOGGER.error(f"Error cleaning up: {e}")
         pm.cleanup_list_conditional = []
+
+
+def calculate_time(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+
+        # print(f"--> Arguments: {args}")
+        # print(f"--> Keyword arguments: {kwargs}")
+
+        start_time = time.time()
+        result = func(*args, **kwargs)
+        end_time = time.time()
+        execution_time = end_time - start_time
+
+        hours, remainder = divmod(execution_time, 3600)
+        minutes, seconds = divmod(remainder, 60)
+
+        print(
+            f"Function '{func.__name__}' executed in {int(hours)} hours, {int(minutes)} minutes, and {seconds:.2f} seconds"
+        )
+
+        return result
+
+    return wrapper
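Two small sketches of the updated utilities: `standardize_genome_name()` now falls back to `ReferenceValidator().predict()` when the genome string is empty or unrecognized and a BED file is supplied, and `calculate_time` can wrap any callable. The BED path and the wrapped function are hypothetical:

```python
from bedboss.utils import calculate_time, standardize_genome_name

standardize_genome_name("GRCh38")                      # -> "hg38"
standardize_genome_name("", bedfile="sample1.bed.gz")  # -> predicted genome, or "" if none

@calculate_time
def long_task():
    ...

long_task()  # prints: Function 'long_task' executed in 0 hours, 0 minutes, and 0.00 seconds
```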
diff --git a/docs/changelog.md b/docs/changelog.md
index b6ff12c4..6b207db0 100644
--- a/docs/changelog.md
+++ b/docs/changelog.md
@@ -2,6 +2,22 @@
 
 This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format.
 
+# [0.5.0] - 2025-01-16
+
+## Added
+
+- Added the open_chromatin plot back into processing.
+- Added the gdrs dependency, which calculates GC content.
+- Added a skipper that automatically skips PEP samples that were already processed.
+- Added lite mode to the main functions, which allows uploading without any heavy processing.
+- Added a function that reprocesses files that were left unprocessed in bedbase.
+- Added a function that predicts the reference genome when none is provided.
+
+## Fixed
+- Significant speed improvements.
+- Improved requirements checker.
+- Minor bug fixes.
+
 # [0.4.1] - 2024-09-20
 ## Added
 - Standardization of peps using bedbase bedms schema
diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt
index dd74b6e3..fbf58771 100644
--- a/requirements/requirements-all.txt
+++ b/requirements/requirements-all.txt
@@ -5,11 +5,12 @@ peppy>=0.40.7
 yacman>=0.8.4
 requests>=2.28.2
 piper>=v0.14.3
-bbconf>=0.7.0
+bbconf>=0.10.3
 # bbconf @ git+https://github.com/databio/bbconf.git@dev#egg=bbconf
 refgenconf>=0.12.2
 pandas>=2.0.0
 ubiquerg>=0.6.2
 bedms>=0.1.0
 pephubclient>=0.4.4
-geniml[ml]>=0.4.2
\ No newline at end of file
+geniml[ml]>=0.6.0
+gdrs>=0.1.0
diff --git a/scripts/all/run_all.py b/scripts/all/run_all.py
new file mode 100644
index 00000000..7ad2e1fc
--- /dev/null
+++ b/scripts/all/run_all.py
@@ -0,0 +1,23 @@
+def unprocessed_run():
+    from bedboss.bedboss import reprocess_all
+
+    reprocess_all(
+        bedbase_config="/home/bnt4me/virginia/repos/bbuploader/config_db_local.yaml",
+        limit=10,
+        output_folder="/home/bnt4me/virginia/repos/bbuploader/data",
+    )
+
+
+def reprocess_one():
+    from bedboss.bedboss import reprocess_one
+
+    reprocess_one(
+        identifier="a0f1889fd8026780df8bba6a8ddac00e",
+        bedbase_config="/home/bnt4me/virginia/repos/bbuploader/config_db_local.yaml",
+        output_folder="/home/bnt4me/virginia/repos/bbuploader/data",
+    )
+
+
+if __name__ == "__main__":
+    # unprocessed_run()
+    reprocess_one()
diff --git a/scripts/bb_text_search/main.py b/scripts/bb_text_search/main.py
new file mode 100644
index 00000000..adc7dfd3
--- /dev/null
+++ b/scripts/bb_text_search/main.py
@@ -0,0 +1,53 @@
+import os
+import pickle
+
+from geniml.search.backends.dbbackend import QdrantBackend
+from qdrant_client import QdrantClient
+from qdrant_client.http import models
+from qdrant_client.models import Distance, PointStruct, VectorParams
+
+TEXT_QDRANT_COLLECTION_NAME = "bed_text"
+
+DEFAULT_QUANTIZATION_CONFIG = models.ScalarQuantization(
+    scalar=models.ScalarQuantizationConfig(
+        type=models.ScalarType.INT8,
+        quantile=0.99,
+        always_ram=True,
+    ),
+)
+
+
+def upload_text_embeddings():
+    # lab qdrant client
+    # qc = QdrantClient(
+    #     host=os.environ.get("QDRATN_HOST"),
+    #     api_key=os.environ.get("QDRANT_API_KEY")
+    # )
+
+    qc = QdrantBackend(
+        dim=384,
+        collection=TEXT_QDRANT_COLLECTION_NAME,
+        qdrant_host="",
+        qdrant_api_key="",
+    )
+    qc = QdrantClient(
url="", + api_key="", + ) + + # load metadata embedddings into new collection + with open("./text_loading.pkl", "rb") as f: + text_vectors, payloads = pickle.load(f) + + ids = list(range(0, len(payloads))) + + points = [ + PointStruct(id=ids[i], vector=text_vectors[i].tolist(), payload=payloads[i]) + for i in range(len(payloads)) + ] + + qc.upsert(collection_name=TEXT_QDRANT_COLLECTION_NAME, points=points) + + +if __name__ == "__main__": + upload_text_embeddings() diff --git a/scripts/bb_text_search/search_test.py b/scripts/bb_text_search/search_test.py new file mode 100644 index 00000000..34f58454 --- /dev/null +++ b/scripts/bb_text_search/search_test.py @@ -0,0 +1,55 @@ +from geniml.search.backends import BiVectorBackend, QdrantBackend +from geniml.search.interfaces import BiVectorSearchInterface + + +def search_test(): + # backend for text embeddings and bed embeddings + text_backend = QdrantBackend( + dim=384, + collection="bed_text", + qdrant_host="", + qdrant_api_key="", + ) # dim of sentence-transformers embedding output + bed_backend = QdrantBackend( + collection="bedbase2", + qdrant_host="", + qdrant_api_key="", + ) + + import cProfile + + # import pstats + # + # from bedboss.bedboss import run_all + # + # with cProfile.Profile() as pr: + # the search backend + from time import time + + search_backend = BiVectorBackend(text_backend, bed_backend) + + # the search interface + search_interface = BiVectorSearchInterface( + backend=search_backend, query2vec="sentence-transformers/all-MiniLM-L6-v2" + ) + time1 = time() + # actual search + result = search_interface.query_search( + query="leukemia", + limit=500, + with_payload=True, + with_vectors=False, + p=1.0, + q=1.0, + distance=False, # QdrantBackend returns similarity as the score, not distance + ) + result + time2 = time() + print(time2 - time1) + # stats = pstats.Stats(pr) + # stats.sort_stats(pstats.SortKey.TIME) + # stats.dump_stats(filename="test_profiling") + + +if __name__ == "__main__": + search_test() diff --git a/scripts/bbuploader/main.py b/scripts/bbuploader/main.py new file mode 100644 index 00000000..f698c543 --- /dev/null +++ b/scripts/bbuploader/main.py @@ -0,0 +1,87 @@ +from bedboss.bbuploader.main import upload_gse + + +def runn(): + upload_gse( + # gse="gse246900", + # gse="gse247593", + # gse="gse241222", + # gse="gse266130", + gse="gse256031", + # gse="gse240325", # TODO: check if qc works + # gse="gse229592", # mice + bedbase_config="/home/bnt4me/virginia/repos/bbuploader/config_db_local.yaml", + outfolder="/home/bnt4me/virginia/repos/bbuploader/data", + # genome="HG38", + rerun=True, + run_failed=True, + run_skipped=True, + ) + # import pandas as pd + + # df = pd.read_csv("/home/bnt4me/Downloads/test_b.bed.gz", sep="\t", + # header=None, nrows=4) + # rf = pd.read_csv("/home/bnt4me/.bbcache/bedfiles/4/f/test.bed.gz", sep="\t", + # header=None, nrows=4) + # rf + + +def another_test(): + # time it: + import time + + from bedboss.bbuploader.main import upload_gse + + time1 = time.time() + upload_gse( + # gse="gse261411", + # gse="gse261536", + # gse="gse274130", + # Genome hg19 and mm10 + gse="gse280839", + # gse="gse246900", + # gse="gse247593", + # gse="gse241222", + # gse="gse266130", + # gse="gse209627", + # gse="gse266949", + # gse="gse240325", # TODO: check if qc works + # gse="gse229592", # mice + bedbase_config="/home/bnt4me/virginia/repos/bbuploader/config_db_local.yaml", + outfolder="/home/bnt4me/virginia/repos/bbuploader/data", + # genome="HG38", + rerun=True, + run_failed=True, + run_skipped=True, + 
+        reinit_skipper=True,
+        lite=True,
+        overwrite=True,
+        overwrite_bedset=True,
+    )
+    time2 = time.time()
+    print(f"Time taken: {time2 - time1}")
+
+
+def upload_time():
+    from bedboss.bbuploader.main import upload_all
+
+    upload_all(
+        bedbase_config="/home/bnt4me/virginia/repos/bbuploader/config_db_local.yaml",
+        outfolder="/home/bnt4me/virginia/repos/bbuploader/data",
+        start_date="2020/06/01",
+        end_date="2020/07/15",
+        search_limit=1000,
+        download_limit=10000,
+        search_offset=0,
+        # genome="hg38",
+        rerun=True,
+        run_skipped=True,
+        lite=True,
+    )
+
+
+if __name__ == "__main__":
+    # runn()
+
+    another_test()
+    # upload_time()
diff --git a/scripts/profiling/prof.py b/scripts/profiling/prof.py
index 09758daa..e6ed5a59 100644
--- a/scripts/profiling/prof.py
+++ b/scripts/profiling/prof.py
@@ -9,7 +9,9 @@ def runn():
         bedbase_config="/home/bnt4me/virginia/repos/bbuploader/config_db_local.yaml",
         outfolder="/home/bnt4me/virginia/repos/bbuploader/data",
         genome="hg38",
-        input_file="/home/bnt4me/virginia/repos/bedboss/test/data/bed/hg38/GSM6732293_Con_liver-IP2.bed",
+        # input_file="/home/bnt4me/virginia/repos/bedboss/test/data/bed/hg38/GSM6732293_Con_liver-IP2.bed",
+        # input_file="/home/bnt4me/virginia/repos/bedboss/scripts/profiling/GSE253137_diff_peaks_PSIP_Jurkat_vs_IgG_Jurkat.MAPQ30_keepdupauto_hg38_peaks.narrowPeak.gz",
+        input_file="/home/bnt4me/Downloads/test_chroms.bed.gz",
         input_type="bed",
         force_overwrite=True,
         upload_pephub=True,
diff --git a/scripts/ref_genome_validating/grab_chrom_sizes.py b/scripts/ref_genome_validating/grab_chrom_sizes.py
index 46914ffa..0cb6934b 100644
--- a/scripts/ref_genome_validating/grab_chrom_sizes.py
+++ b/scripts/ref_genome_validating/grab_chrom_sizes.py
@@ -5,7 +5,6 @@
 
 def main():
-    # file_path = "/home/drc/Downloads/ncbi_ref_genome/ncbi_dataset/GCF_000001405.40_GRCh38.p14_genomic.fa"
     file_path = "/home/drc/Downloads/backup ref genome/GCA_000001405.29.fasta"
     FastaFile = open(file_path, "r")
@@ -14,7 +13,6 @@ def main():
         "/home/drc/GITHUB/bedboss/bedboss/scripts/ref_genome_validating/chrom_sizes/ensembl_hg38.chrom.sizes",
         "w",
     ) as file:
-
         for rec in SeqIO.parse(FastaFile, "fasta"):
             name = rec.id
             seq = rec.seq
diff --git a/scripts/ref_genome_validating/process_exclude_ranges.py b/scripts/ref_genome_validating/process_exclude_ranges.py
index ca3854e6..fc12c8c6 100644
--- a/scripts/ref_genome_validating/process_exclude_ranges.py
+++ b/scripts/ref_genome_validating/process_exclude_ranges.py
@@ -6,8 +6,8 @@
 import os
 import shutil
 import subprocess
-import pipestat
 
+import pipestat
 from geofetch import Geofetcher
 
 # assumes user has built the igd database and has igd(c) installed locally
@@ -40,7 +40,6 @@ def main(species):
         print("Must supply species, e.g. mouse, homosapiens, rat, cow!")
     else:
-
         # Make sure to have the IDE ignore these folders!!!!
         data_output_path = os.path.abspath("data")
         results_path = os.path.abspath("results")
diff --git a/scripts/ref_genome_validating/stats_compat_testing.py b/scripts/ref_genome_validating/stats_compat_testing.py
index 1e1fabca..cbf72dc1 100644
--- a/scripts/ref_genome_validating/stats_compat_testing.py
+++ b/scripts/ref_genome_validating/stats_compat_testing.py
@@ -2,11 +2,12 @@
 # PEP : donaldcampbelljr/refgenome_compat_testing:default
 import copy
 import os
-import pephubclient
+
+import matplotlib.colors as mcolors
+import matplotlib.pyplot as plt
 import pandas as pd
+import pephubclient
 import seaborn as sns
-import matplotlib.pyplot as plt
-import matplotlib.colors as mcolors
 
 try:
     PEP_URL = os.environ["PEP_URL"]
@@ -17,7 +18,6 @@
 
 def main():
-
     # pull from tier rating column to get the final assessment
     tier_rating_keys = ["mm10", "hg38", "hg19"]
diff --git a/scripts/ref_genome_validating/stats_exclude_ranges_results.py b/scripts/ref_genome_validating/stats_exclude_ranges_results.py
index 4d833564..efab976e 100644
--- a/scripts/ref_genome_validating/stats_exclude_ranges_results.py
+++ b/scripts/ref_genome_validating/stats_exclude_ranges_results.py
@@ -1,15 +1,13 @@
 import copy
-
 import os
 
+import matplotlib.colors as mcolors
+import matplotlib.pyplot as plt
 import numpy as np
-import requests
-
-import pephubclient
 import pandas as pd
+import pephubclient
+import requests
 import seaborn as sns
-import matplotlib.pyplot as plt
-import matplotlib.colors as mcolors
 from scipy.cluster.hierarchy import linkage
 
 # PEP_URL = "donaldcampbelljr/excluded_ranges_species:default"
diff --git a/scripts/ref_genome_validating/validate_genome.py b/scripts/ref_genome_validating/validate_genome.py
index 85599b18..e56af284 100644
--- a/scripts/ref_genome_validating/validate_genome.py
+++ b/scripts/ref_genome_validating/validate_genome.py
@@ -1,15 +1,13 @@
 # This script will be used to do standalone trials and tuning of the ref genome validator
 import json
 import os
-from pipestat import pipestat
-from bedboss.refgenome_validator.main import (
-    ReferenceValidator,
-    GenomeModel,
-)
+from pipestat import pipestat
 
 # helper utils
-from process_exclude_ranges import unzip_bedfile, get_samples, MAX_SAMPLES
+from process_exclude_ranges import MAX_SAMPLES, get_samples, unzip_bedfile
+
+from bedboss.refgenome_validator.main import GenomeModel, ReferenceValidator
 
 try:
     IGD_DB_PATH = os.environ["IGD_DB_PATH"]
diff --git a/test/test_bbuploader.py b/test/test_bbuploader.py
index 2251a702..8d627080 100644
--- a/test/test_bbuploader.py
+++ b/test/test_bbuploader.py
@@ -1,4 +1,4 @@
-from bedboss.bbuploader.main import upload_gse, upload_all
+from bedboss.bbuploader.main import upload_all, upload_gse
 
 
 def test_manual():
diff --git a/test/test_dependencies/bedbase_config_test.yaml b/test/test_dependencies/bedbase_config_test.yaml
index 696aae26..36a81dac 100644
--- a/test/test_dependencies/bedbase_config_test.yaml
+++ b/test/test_dependencies/bedbase_config_test.yaml
@@ -10,7 +10,7 @@ database:
   port: 5432
   password: docker
   user: postgres
-  name: bedbase
+  database: bedbase
   #name: pep-db
   dialect: postgresql
   driver: psycopg
diff --git a/test/test_ref_validator.py b/test/test_ref_validator.py
index f0540aa1..7b806333 100644
--- a/test/test_ref_validator.py
+++ b/test/test_ref_validator.py
@@ -2,7 +2,6 @@
 
 from bedboss.refgenome_validator.main import ReferenceValidator
 
-
 FILE_DIR = os.path.dirname(os.path.realpath(__file__))
 HG19_CORRECT_DIR = os.path.join(FILE_DIR, "test_data", "bed", "hg19", "correct")
 FILE_PATH = f"{HG19_CORRECT_DIR}/sample1.bed.gz"
f"{HG19_CORRECT_DIR}/sample1.bed.gz" @@ -17,25 +16,10 @@ def test_main(): "/home/bnt4me/.bbcache/bedfiles/0/7/0740332b148a613342603e2e483f53e5.bed.gz", concise=True, ) + # result = ReferenceValidator().predict( + # # "/home/bnt4me/.bbcache/bedfiles/0/4/04c46b96264ef40bca93f03b10345da5.bed.gz", + # "/home/bnt4me/.bbcache/bedfiles/1/9/19ed879232e44812b1ee35b57792b924.bed.gz", + # ) + # result assert dict_result - - -def test_another_test(): - from bedboss.bbuploader.main import upload_gse - - upload_gse( - # gse="gse246900", - # gse="gse247593", - # gse="gse241222", - # gse="gse266130", - gse="gse99178", - # gse="gse240325", # TODO: check if qc works - # gse="gse229592", # mice - bedbase_config="/home/bnt4me/virginia/repos/bbuploader/config_db_local.yaml", - outfolder="/home/bnt4me/virginia/repos/bbuploader/data", - # genome="HG38", - rerun=True, - run_failed=True, - run_skipped=True, - ) diff --git a/test/test_standardizer.py b/test/test_standardizer.py index b63b6af3..48974dd4 100644 --- a/test/test_standardizer.py +++ b/test/test_standardizer.py @@ -1,7 +1,8 @@ -from bedboss.utils import standardize_pep import peppy import pytest +from bedboss.utils import standardize_pep + # @pytest.mark.skip(reason="Not for automatic testing") @pytest.mark.parametrize(