diff --git a/lib/py_nvd/_fingerprint.json b/lib/py_nvd/_fingerprint.json
index 1df35cc..f1db982 100644
--- a/lib/py_nvd/_fingerprint.json
+++ b/lib/py_nvd/_fingerprint.json
@@ -1,4 +1,4 @@
 {
   "main.nf": "d3df999c77a6754811017c07fa446c551d8334e72822a6df1c5cdcacb4715ebb",
-  "nextflow.config": "8d1bbbd14e66c6813a75fce1e3aca4307704fa1e4cd6552fb2d653dd60f78d51"
+  "nextflow.config": "0b83a6d10e66f13e1821df96ec76849df034bb97c7758993a0c54cdaa3a38166"
 }
diff --git a/lib/py_nvd/cli/commands/preset.py b/lib/py_nvd/cli/commands/preset.py
index a04bc42..c16c4d8 100644
--- a/lib/py_nvd/cli/commands/preset.py
+++ b/lib/py_nvd/cli/commands/preset.py
@@ -160,7 +160,17 @@ def preset_register(
     dedup: bool | None = typer.Option(
         None,
         "--dedup/--no-dedup",
-        help="Deduplicate reads",
+        help="Deduplicate reads (umbrella: enables both --dedup-seq and --dedup-pos)",
+    ),
+    dedup_seq: bool | None = typer.Option(
+        None,
+        "--dedup-seq/--no-dedup-seq",
+        help="Sequence-based deduplication with clumpify",
+    ),
+    dedup_pos: bool | None = typer.Option(
+        None,
+        "--dedup-pos/--no-dedup-pos",
+        help="Positional deduplication with samtools markdup",
     ),
     trim_adapters: bool | None = typer.Option(
         None,
@@ -225,6 +235,8 @@ def preset_register(
         "entropy": entropy,
         "preprocess": preprocess,
         "dedup": dedup,
+        "dedup_seq": dedup_seq,
+        "dedup_pos": dedup_pos,
         "trim_adapters": trim_adapters,
         "scrub_host_reads": scrub_host_reads,
         "filter_reads": filter_reads,
diff --git a/lib/py_nvd/cli/commands/run.py b/lib/py_nvd/cli/commands/run.py
index 32a110b..a7e97c5 100644
--- a/lib/py_nvd/cli/commands/run.py
+++ b/lib/py_nvd/cli/commands/run.py
@@ -337,7 +337,19 @@ def run(
     dedup: bool | None = typer.Option(
         None,
         "--dedup/--no-dedup",
-        help="Deduplicate reads (default: follows --preprocess)",
+        help="Deduplicate reads (umbrella: enables both --dedup-seq and --dedup-pos)",
         rich_help_panel=PANEL_PREPROCESSING,
     ),
+    dedup_seq: bool | None = typer.Option(
+        None,
+        "--dedup-seq/--no-dedup-seq",
+        help="Sequence-based deduplication with clumpify (default: follows --dedup)",
+        rich_help_panel=PANEL_PREPROCESSING,
+    ),
+    dedup_pos: bool | None = typer.Option(
+        None,
+        "--dedup-pos/--no-dedup-pos",
+        help="Positional deduplication with samtools markdup (default: follows --dedup)",
+        rich_help_panel=PANEL_PREPROCESSING,
+    ),
     trim_adapters: bool | None = typer.Option(
         None,
@@ -625,6 +637,8 @@ def run(
         "preprocess": preprocess,
         "merge_pairs": merge_pairs,
         "dedup": dedup,
+        "dedup_seq": dedup_seq,
+        "dedup_pos": dedup_pos,
         "trim_adapters": trim_adapters,
         "scrub_host_reads": scrub_host_reads,
         "filter_reads": filter_reads,
diff --git a/lib/py_nvd/models.py b/lib/py_nvd/models.py
index 53d9f7e..447c89e 100644
--- a/lib/py_nvd/models.py
+++ b/lib/py_nvd/models.py
@@ -929,7 +929,17 @@ class NvdParams(BaseModel):
     )
     dedup: bool | None = Field(
         None,
-        description="Deduplicate reads",
+        description="Deduplicate reads (umbrella: enables both dedup_seq and dedup_pos)",
+        json_schema_extra={"category": "Preprocessing"},
+    )
+    dedup_seq: bool | None = Field(
+        None,
+        description="Sequence-based deduplication with clumpify (preprocessing)",
+        json_schema_extra={"category": "Preprocessing"},
+    )
+    dedup_pos: bool | None = Field(
+        None,
+        description="Positional deduplication with samtools markdup (after alignment)",
         json_schema_extra={"category": "Preprocessing"},
     )
     trim_adapters: bool | None = Field(
diff --git a/lib/py_nvd/params.py b/lib/py_nvd/params.py
index ce4f59d..78370df 100644
--- a/lib/py_nvd/params.py
+++ b/lib/py_nvd/params.py
@@ -22,7 +22,7 @@
 SCHEMA_FILENAME = "nvd-params.latest.schema.json"
 
 # GitHub raw URL for schema (fallback and for generated templates)
-SCHEMA_URL = "https://raw.githubusercontent.com/dhoconno/nvd/main/schemas/nvd-params.v2.4.0.schema.json"
+SCHEMA_URL = "https://raw.githubusercontent.com/dhoconno/nvd/main/schemas/nvd-params.v2.5.0.schema.json"
 
 
 def _find_schema_path() -> Path:
@@ -281,6 +281,8 @@ def _yaml_analysis_section(
         "preprocess",
         "merge_pairs",
         "dedup",
+        "dedup_seq",
+        "dedup_pos",
         "trim_adapters",
         "scrub_host_reads",
         "filter_reads",
diff --git a/modules/minimap2.nf b/modules/minimap2.nf
index 36194fa..dd22f82 100644
--- a/modules/minimap2.nf
+++ b/modules/minimap2.nf
@@ -1,12 +1,13 @@
 /*
- * Map reads to contigs using minimap2, with optional duplicate marking.
+ * Map reads to contigs using minimap2, with optional positional duplicate marking.
  *
- * When params.dedup is true, the pipeline includes samtools collate/fixmate/markdup
- * to identify and remove PCR/optical duplicates. This is recommended for amplicon
- * or high-duplication libraries but adds computational overhead.
+ * When positional dedup is enabled (via dedup_pos, the dedup umbrella, or the
+ * preprocess master switch), the pipeline includes samtools collate/fixmate/markdup
+ * to identify and remove PCR/optical duplicates. This is recommended for amplicon
+ * or high-duplication libraries but adds computational overhead.
  *
- * When params.dedup is false, reads are simply filtered (unmapped removed) and
- * coordinate-sorted, which is sufficient for many viral metagenomics applications.
+ * When positional dedup is disabled, reads are simply filtered (unmapped removed)
+ * and coordinate-sorted, which is sufficient for many viral metagenomics applications.
  *
  * Output is always a coordinate-sorted, indexed BAM file.
 */
@@ -30,7 +31,8 @@ process MAP_READS_TO_CONTIGS {
     def preset = platform == 'ont' || platform == 'sra' ? "map-ont" : "sr"
 
-    if (params.dedup) {
+    def should_dedup_pos = params.dedup_pos ?: params.dedup ?: params.preprocess
+    if (should_dedup_pos) {
         """
         minimap2 -ax ${preset} -t ${task.cpus} ${contigs} ${reads} \\
         | samtools view -b -F 4 \\
diff --git a/nextflow.config b/nextflow.config
index 9108e89..5aa3891 100644
--- a/nextflow.config
+++ b/nextflow.config
@@ -76,8 +76,12 @@ params {
     // whether to merge paired read mates based on overlaps between them
     merge_pairs = null
 
-    // whether to deduplicate sequencing reads
+    // whether to deduplicate sequencing reads (umbrella: enables both dedup_seq and dedup_pos)
     dedup = null
+    // sequence-based deduplication with clumpify (preprocessing)
+    dedup_seq = null
+    // positional deduplication with samtools markdup (after alignment)
+    dedup_pos = null
 
     // Adapter trimming (Illumina only)
     trim_adapters = null
diff --git a/schemas/README.md b/schemas/README.md
index 12bf560..3e91c2b 100644
--- a/schemas/README.md
+++ b/schemas/README.md
@@ -8,6 +8,7 @@ This directory contains JSON Schema definitions for the NVD pipeline.
 |--------|-------------|
 | `nvd-params.v2.3.0.schema.json` | Pipeline parameters schema (version 2.3.0) |
 | `nvd-params.v2.4.0.schema.json` | Pipeline parameters schema (version 2.4.0) |
+| `nvd-params.v2.5.0.schema.json` | Pipeline parameters schema (version 2.5.0) |
 | `nvd-params.latest.schema.json` | Symlink to the current version |
 
 ## Usage
diff --git a/schemas/nvd-params.latest.schema.json b/schemas/nvd-params.latest.schema.json
index 798a65f..f862abd 120000
--- a/schemas/nvd-params.latest.schema.json
+++ b/schemas/nvd-params.latest.schema.json
@@ -1 +1 @@
-nvd-params.v2.4.0.schema.json
\ No newline at end of file
+nvd-params.v2.5.0.schema.json
\ No newline at end of file
diff --git a/schemas/nvd-params.v2.5.0.schema.json b/schemas/nvd-params.v2.5.0.schema.json
new file mode 100644
index 0000000..a164b22
--- /dev/null
+++ b/schemas/nvd-params.v2.5.0.schema.json
@@ -0,0 +1,334 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://raw.githubusercontent.com/dhoconno/nvd/main/schemas/nvd-params.v2.5.0.schema.json",
+  "title": "NVD Pipeline Parameters",
+  "description": "Parameters for the Novel Virus Detection (NVD) metagenomics pipeline. Use with Nextflow's -params-file option or register as a preset with nvd preset register.",
+  "type": "object",
+  "properties": {
+    "labkey": {
+      "type": "boolean",
+      "default": false,
+      "description": "Enable LabKey integration for result uploads. When enabled, all labkey_* params must be set."
+    },
+    "labkey_server": {
+      "type": "string",
+      "description": "LabKey server URL"
+    },
+    "labkey_project_name": {
+      "type": "string",
+      "description": "LabKey project name"
+    },
+    "labkey_webdav": {
+      "type": "string",
+      "description": "LabKey WebDAV endpoint URL"
+    },
+    "labkey_schema": {
+      "type": "string",
+      "description": "LabKey schema name"
+    },
+    "labkey_gottcha_fasta_list": {
+      "type": "string",
+      "description": "LabKey list name for GOTTCHA2 FASTA uploads"
+    },
+    "labkey_gottcha_full_list": {
+      "type": "string",
+      "description": "LabKey list name for GOTTCHA2 full results"
+    },
+    "labkey_gottcha_blast_verified_full_list": {
+      "type": "string",
+      "description": "LabKey list name for BLAST-verified GOTTCHA2 results"
+    },
+    "labkey_blast_meta_hits_list": {
+      "type": "string",
+      "description": "LabKey list name for BLAST meta hits"
+    },
+    "labkey_blast_fasta_list": {
+      "type": "string",
+      "description": "LabKey list name for BLAST FASTA uploads"
+    },
+    "labkey_exp_id_guard_list": {
+      "type": "string",
+      "description": "LabKey list name for experiment ID guard (prevents duplicate uploads)"
+    },
+    "samplesheet": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to samplesheet CSV with columns: sample_id, srr, platform, fastq1, fastq2"
+    },
+    "results": {
+      "type": "string",
+      "format": "path",
+      "description": "Directory for pipeline output files"
+    },
+    "tools": {
+      "type": "string",
+      "pattern": "^(stat_blast|nvd|stat|blast|gottcha|all|clumpify)(,(stat_blast|nvd|stat|blast|gottcha|all|clumpify))*$",
+      "description": "Which analysis tool(s) to run. Valid tools: stat_blast, nvd, stat, blast, gottcha, all, clumpify. For multiple tools, use comma-separated values with no spaces (e.g., 'stat,blast,gottcha'). Use 'all' to run the complete pipeline."
+    },
+    "experiment_id": {
+      "type": "string",
+      "description": "Experiment identifier for tracking and LabKey integration"
+    },
+    "max_concurrent_downloads": {
+      "type": "integer",
+      "default": 3,
+      "minimum": 1,
+      "description": "Maximum number of concurrent SRA downloads"
+    },
+    "cleanup": {
+      "type": "boolean",
+      "description": "Whether to empty the work directory after successful completion"
+    },
+    "work_dir": {
+      "type": ["string", "null"],
+      "format": "path",
+      "description": "Nextflow work directory for intermediate files"
+    },
+    "gottcha2_db_version": {
+      "type": "string",
+      "description": "GOTTCHA2 database version identifier"
+    },
+    "blast_db_version": {
+      "type": "string",
+      "description": "BLAST database version identifier"
+    },
+    "stat_db_version": {
+      "type": "string",
+      "description": "STAT database version identifier"
+    },
+    "gottcha2_db": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to GOTTCHA2 database directory"
+    },
+    "nvd_files": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to NVD resource files directory"
+    },
+    "blast_db": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to BLAST database directory"
+    },
+    "blast_db_prefix": {
+      "type": "string",
+      "description": "BLAST database name prefix"
+    },
+    "stat_index": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to STAT index file"
+    },
+    "stat_dbss": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to STAT DBSS file"
+    },
+    "stat_annotation": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to STAT annotation file"
+    },
+    "human_virus_taxlist": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to human virus taxonomy list file"
+    },
+    "preprocess": {
+      "type": "boolean",
+      "default": false,
+      "description": "Run all preprocessing steps on reads before classification (enables dedup, trim_adapters, scrub_host_reads, filter_reads)"
+    },
+    "merge_pairs": {
+      "type": "boolean",
+      "description": "Merge paired read mates based on overlaps between them"
+    },
+    "dedup": {
+      "type": "boolean",
+      "description": "Deduplicate sequencing reads (umbrella: enables both dedup_seq and dedup_pos)"
+    },
+    "dedup_seq": {
+      "type": "boolean",
+      "description": "Sequence-based deduplication with clumpify (preprocessing)"
+    },
+    "dedup_pos": {
+      "type": "boolean",
+      "description": "Positional deduplication with samtools markdup (after alignment)"
+    },
+    "trim_adapters": {
+      "type": "boolean",
+      "description": "Trim Illumina adapters from reads"
+    },
+    "scrub_host_reads": {
+      "type": "boolean",
+      "description": "Remove host (human) reads using STAT. Requires sra_human_db to be set."
+    },
+    "sra_human_db": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to human reads STAT database for host scrubbing and SRA submission prep"
+    },
+    "human_read_scrub": {
+      "type": "string",
+      "format": "path",
+      "deprecated": true,
+      "description": "DEPRECATED: Use sra_human_db instead. Path to human reads database for SRA submission scrubbing"
+    },
+    "filter_reads": {
+      "type": "boolean",
+      "description": "Apply quality and length filtering to reads"
+    },
+    "min_read_quality_illumina": {
+      "type": "integer",
+      "default": 20,
+      "minimum": 0,
+      "description": "Minimum average quality score for Illumina reads"
+    },
+    "min_read_quality_nanopore": {
+      "type": "integer",
+      "default": 12,
+      "minimum": 0,
+      "description": "Minimum average quality score for Nanopore reads"
+    },
+    "min_read_length": {
+      "type": "integer",
+      "default": 50,
+      "minimum": 1,
+      "description": "Minimum read length to retain"
+    },
+    "max_read_length": {
+      "type": "integer",
+      "minimum": 1,
+      "description": "Maximum read length to retain (no limit if not specified)"
+    },
+    "cutoff_percent": {
+      "type": "number",
+      "default": 0.001,
+      "minimum": 0,
+      "maximum": 1,
+      "description": "Minimum abundance threshold for reporting taxa (0-1)"
+    },
+    "entropy": {
+      "type": "number",
+      "default": 0.9,
+      "minimum": 0,
+      "maximum": 1,
+      "description": "Entropy threshold for sequence complexity filtering (0-1)"
+    },
+    "min_consecutive_bases": {
+      "type": "integer",
+      "default": 200,
+      "minimum": 1,
+      "description": "Minimum number of consecutive bases required"
+    },
+    "qtrim": {
+      "type": "string",
+      "default": "t",
+      "description": "Quality trimming mode"
+    },
+    "tax_stringency": {
+      "type": "number",
+      "default": 0.7,
+      "minimum": 0,
+      "maximum": 1,
+      "description": "Stringency for taxonomic classification (0-1)"
+    },
+    "include_children": {
+      "type": "boolean",
+      "default": true,
+      "description": "Include child taxa in taxonomic analysis"
+    },
+    "human_virus_families": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      },
+      "default": [
+        "Adenoviridae",
+        "Anelloviridae",
+        "Arenaviridae",
+        "Arteriviridae",
+        "Astroviridae",
+        "Bornaviridae",
+        "Peribunyaviridae",
+        "Caliciviridae",
+        "Coronaviridae",
+        "Filoviridae",
+        "Flaviviridae",
+        "Hepadnaviridae",
+        "Hepeviridae",
+        "Orthoherpesviridae",
+        "Orthomyxoviridae",
+        "Papillomaviridae",
+        "Paramyxoviridae",
+        "Parvoviridae",
+        "Picobirnaviridae",
+        "Picornaviridae",
+        "Pneumoviridae",
+        "Polyomaviridae",
+        "Poxviridae",
+        "Sedoreoviridae",
+        "Retroviridae",
+        "Rhabdoviridae",
+        "Togaviridae",
+        "Kolmioviridae"
+      ],
+      "description": "List of virus family names to include in human virus analysis"
+    },
+    "min_gottcha_reads": {
+      "type": "integer",
+      "default": 250,
+      "minimum": 1,
+      "description": "Minimum number of reads required for GOTTCHA2 analysis"
+    },
+    "max_blast_targets": {
+      "type": "integer",
+      "default": 100,
+      "minimum": 1,
+      "description": "Maximum number of BLAST hits to consider before calling hits"
+    },
+    "blast_retention_count": {
+      "type": "integer",
+      "default": 5,
+      "minimum": 1,
+      "description": "Number of top BLAST hits to retain"
+    },
+    "state_dir": {
+      "type": "string",
+      "format": "path",
+      "description": "State directory for run tracking, upload deduplication, and taxonomy cache. Defaults to NVD_STATE_DIR env var or ~/.cache/nvd"
+    },
+    "refman_registry": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to refman registry file for reference management"
+    },
+    "monoimage": {
+      "type": "string",
+      "default": "nrminor/nvd:v2.4.0",
+      "description": "Container image to use for pipeline execution"
+    },
+    "slack_enabled": {
+      "type": "boolean",
+      "default": false,
+      "description": "Enable Slack notifications for run completion (stat_blast workflow only)"
+    },
+    "slack_channel": {
+      "type": "string",
+      "pattern": "^C[A-Z0-9]+$",
+      "description": "Slack channel ID for notifications (e.g., 'C0123456789')"
+    },
+    "stateless": {
+      "type": "boolean",
+      "default": false,
+      "description": "Run without state management (disables run tracking, sample locking, LabKey, and Slack). When enabled, state_dir is ignored and taxonomy_dir must be set."
+    },
+    "taxonomy_dir": {
+      "type": ["string", "null"],
+      "format": "path",
+      "description": "Path to taxonomy database directory containing NCBI taxdump files. Defaults to {state_dir}/taxdump when state_dir is set. Required when stateless=true."
+    }
+  },
+  "additionalProperties": false
+}
diff --git a/subworkflows/bundle_blast_for_labkey.nf b/subworkflows/bundle_blast_for_labkey.nf
index 6baefbf..9d7425b 100644
--- a/subworkflows/bundle_blast_for_labkey.nf
+++ b/subworkflows/bundle_blast_for_labkey.nf
@@ -66,17 +66,17 @@ workflow BUNDLE_BLAST_FOR_LABKEY {
         validation_complete
     )
 
-    // Upload BLAST results to LabKey
+    // Upload BLAST results to LabKey (eager: one process invocation per sample)
     LABKEY_UPLOAD_BLAST(
-        PREPARE_BLAST_LABKEY.out.csv.map { _meta, path -> path }.collect(),
+        PREPARE_BLAST_LABKEY.out.csv,
         experiment_id,
         run_id,
         run_context
     )
 
-    // Upload FASTA results to LabKey
+    // Upload FASTA results to LabKey (eager: one process invocation per sample)
     LABKEY_UPLOAD_FASTA(
-        PREPARE_FASTA_LABKEY.out.csv.map { _meta, path -> path }.collect(),
+        PREPARE_FASTA_LABKEY.out.csv,
         experiment_id,
         run_id,
         run_context
@@ -248,19 +248,19 @@ process PREPARE_FASTA_LABKEY {
 }
 
 process LABKEY_UPLOAD_BLAST {
+    tag "${sample_id}"
     label 'low'
     secret 'LABKEY_API_KEY'
 
     input:
-    path csv_files  // collected list of all sample CSVs (value channel from .collect())
+    tuple val(sample_id), path(csv_file)  // per-sample prepared CSV from PREPARE_BLAST_LABKEY
     val experiment_id
     val run_id
     tuple val(sample_set_id), val(state_dir)  // run_context: bundled from CHECK_RUN_STATE
 
     output:
     path "blast_labkey_upload.log", emit: log
-    path csv_files  // preserve input files in work dir (no emit label - intentional)
 
     script:
     def sample_set_arg = sample_set_id ? "--sample-set-id '${sample_set_id}'" : ""
"--sample-set-id '${sample_set_id}'" : "" @@ -281,12 +281,13 @@ process LABKEY_UPLOAD_BLAST { // Uploading BLAST FASTA process LABKEY_UPLOAD_FASTA { + tag "${sample_id}" label 'low' secret 'LABKEY_API_KEY' input: - path csv_files // collected list of all sample CSVs (value channel from .collect()) + tuple val(sample_id), path(csv_file) // per-sample prepared CSV from PREPARE_FASTA_LABKEY val experiment_id val run_id tuple val(sample_set_id), val(state_dir) // run_context: bundled from CHECK_RUN_STATE diff --git a/workflows/preprocess_reads.nf b/workflows/preprocess_reads.nf index 14aa753..d63ab0b 100644 --- a/workflows/preprocess_reads.nf +++ b/workflows/preprocess_reads.nf @@ -6,8 +6,8 @@ workflow PREPROCESS_READS { ch_fastq_tuple // tuple(sample_id, platform, read_structure, reads) main: - // Resolve optional step flags: explicit param wins, otherwise fall back to master switch - def should_dedup = params.dedup ?: params.preprocess + // Resolve optional step flags: explicit param wins, then umbrella, then master switch + def should_dedup = params.dedup_seq ?: params.dedup ?: params.preprocess def should_trim = params.trim_adapters ?: params.preprocess def should_scrub = params.scrub_host_reads ?: params.preprocess def should_filter = params.filter_reads ?: params.preprocess