From 7d1aebd40e4af1a94ee2565e3df6453c17423d73 Mon Sep 17 00:00:00 2001
From: Nick Minor
Date: Thu, 12 Feb 2026 21:52:45 -0600
Subject: [PATCH 1/3] Convert BLAST LabKey uploads from batched to eager
 per-sample
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The BLAST workflow currently .collect()s all per-sample prepared CSVs
into a single list and hands that list to a single LABKEY_UPLOAD_BLAST
and a single LABKEY_UPLOAD_FASTA process invocation. Those processes
discover files via os.listdir() and loop over them. As a result, no
upload can start until every sample has finished preparation, and a
failure on any one sample fails the entire batched upload.

The GOTTCHA2 workflow already uses the eager pattern: each upload
process receives a per-sample queue channel tuple, fires as soon as
that sample is ready, and emits its own log. Logs are .mix()ed
together for downstream gating. This commit brings BLAST uploads in
line with that pattern.

The change is entirely in the Nextflow wiring in
bundle_blast_for_labkey.nf. The Python upload scripts are unchanged:
they already handle the single-file case correctly, because their
os.listdir() loop naturally finds one file when Nextflow stages one
file.
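
With both upload processes eager, their per-sample logs can be gated
the same way the GOTTCHA2 side gates its own; roughly (the channel
name here is illustrative):

    // each eager upload emits one log per sample; merge the two log
    // streams and collect them so downstream steps only proceed once
    // every upload has finished
    upload_logs = LABKEY_UPLOAD_BLAST.out.log
        .mix(LABKEY_UPLOAD_FASTA.out.log)
        .collect()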
"--sample-set-id '${sample_set_id}'" : "" @@ -281,12 +281,13 @@ process LABKEY_UPLOAD_BLAST { // Uploading BLAST FASTA process LABKEY_UPLOAD_FASTA { + tag "${sample_id}" label 'low' secret 'LABKEY_API_KEY' input: - path csv_files // collected list of all sample CSVs (value channel from .collect()) + tuple val(sample_id), path(csv_file) // per-sample prepared CSV from PREPARE_FASTA_LABKEY val experiment_id val run_id tuple val(sample_set_id), val(state_dir) // run_context: bundled from CHECK_RUN_STATE From d36cdc38a0c459ffb567e56096ef37754b8a2e80 Mon Sep 17 00:00:00 2001 From: Nick Minor Date: Thu, 12 Feb 2026 08:50:20 -0600 Subject: [PATCH 2/3] Open v2.5.0 params schema for development The v2.4.0 schema was released with that tag and should not be modified. New params from feature branches (deacon, GOTTCHA2, etc.) need a new schema version to land in. --- lib/py_nvd/params.py | 2 +- schemas/README.md | 1 + schemas/nvd-params.latest.schema.json | 2 +- schemas/nvd-params.v2.5.0.schema.json | 326 ++++++++++++++++++++++++++ 4 files changed, 329 insertions(+), 2 deletions(-) create mode 100644 schemas/nvd-params.v2.5.0.schema.json diff --git a/lib/py_nvd/params.py b/lib/py_nvd/params.py index ce4f59d..bbbf490 100644 --- a/lib/py_nvd/params.py +++ b/lib/py_nvd/params.py @@ -22,7 +22,7 @@ SCHEMA_FILENAME = "nvd-params.latest.schema.json" # GitHub raw URL for schema (fallback and for generated templates) -SCHEMA_URL = "https://raw.githubusercontent.com/dhoconno/nvd/main/schemas/nvd-params.v2.4.0.schema.json" +SCHEMA_URL = "https://raw.githubusercontent.com/dhoconno/nvd/main/schemas/nvd-params.v2.5.0.schema.json" def _find_schema_path() -> Path: diff --git a/schemas/README.md b/schemas/README.md index 12bf560..3e91c2b 100644 --- a/schemas/README.md +++ b/schemas/README.md @@ -8,6 +8,7 @@ This directory contains JSON Schema definitions for the NVD pipeline. |--------|-------------| | `nvd-params.v2.3.0.schema.json` | Pipeline parameters schema (version 2.3.0) | | `nvd-params.v2.4.0.schema.json` | Pipeline parameters schema (version 2.4.0) | +| `nvd-params.v2.5.0.schema.json` | Pipeline parameters schema (version 2.5.0) | | `nvd-params.latest.schema.json` | Symlink to the current version | ## Usage diff --git a/schemas/nvd-params.latest.schema.json b/schemas/nvd-params.latest.schema.json index 798a65f..f862abd 120000 --- a/schemas/nvd-params.latest.schema.json +++ b/schemas/nvd-params.latest.schema.json @@ -1 +1 @@ -nvd-params.v2.4.0.schema.json \ No newline at end of file +nvd-params.v2.5.0.schema.json \ No newline at end of file diff --git a/schemas/nvd-params.v2.5.0.schema.json b/schemas/nvd-params.v2.5.0.schema.json new file mode 100644 index 0000000..37c1506 --- /dev/null +++ b/schemas/nvd-params.v2.5.0.schema.json @@ -0,0 +1,326 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://raw.githubusercontent.com/dhoconno/nvd/main/schemas/nvd-params.v2.5.0.schema.json", + "title": "NVD Pipeline Parameters", + "description": "Parameters for the Novel Virus Detection (NVD) metagenomics pipeline. Use with Nextflow's -params-file option or register as a preset with nvd preset register.", + "type": "object", + "properties": { + "labkey": { + "type": "boolean", + "default": false, + "description": "Enable LabKey integration for result uploads. When enabled, all labkey_* params must be set." 
+    },
+    "labkey_server": {
+      "type": "string",
+      "description": "LabKey server URL"
+    },
+    "labkey_project_name": {
+      "type": "string",
+      "description": "LabKey project name"
+    },
+    "labkey_webdav": {
+      "type": "string",
+      "description": "LabKey WebDAV endpoint URL"
+    },
+    "labkey_schema": {
+      "type": "string",
+      "description": "LabKey schema name"
+    },
+    "labkey_gottcha_fasta_list": {
+      "type": "string",
+      "description": "LabKey list name for GOTTCHA2 FASTA uploads"
+    },
+    "labkey_gottcha_full_list": {
+      "type": "string",
+      "description": "LabKey list name for GOTTCHA2 full results"
+    },
+    "labkey_gottcha_blast_verified_full_list": {
+      "type": "string",
+      "description": "LabKey list name for BLAST-verified GOTTCHA2 results"
+    },
+    "labkey_blast_meta_hits_list": {
+      "type": "string",
+      "description": "LabKey list name for BLAST meta hits"
+    },
+    "labkey_blast_fasta_list": {
+      "type": "string",
+      "description": "LabKey list name for BLAST FASTA uploads"
+    },
+    "labkey_exp_id_guard_list": {
+      "type": "string",
+      "description": "LabKey list name for experiment ID guard (prevents duplicate uploads)"
+    },
+    "samplesheet": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to samplesheet CSV with columns: sample_id, srr, platform, fastq1, fastq2"
+    },
+    "results": {
+      "type": "string",
+      "format": "path",
+      "description": "Directory for pipeline output files"
+    },
+    "tools": {
+      "type": "string",
+      "pattern": "^(stat_blast|nvd|stat|blast|gottcha|all|clumpify)(,(stat_blast|nvd|stat|blast|gottcha|all|clumpify))*$",
+      "description": "Which analysis tool(s) to run. Valid tools: stat_blast, nvd, stat, blast, gottcha, all, clumpify. For multiple tools, use comma-separated values with no spaces (e.g., 'stat,blast,gottcha'). Use 'all' to run the complete pipeline."
+    },
+    "experiment_id": {
+      "type": "string",
+      "description": "Experiment identifier for tracking and LabKey integration"
+    },
+    "max_concurrent_downloads": {
+      "type": "integer",
+      "default": 3,
+      "minimum": 1,
+      "description": "Maximum number of concurrent SRA downloads"
+    },
+    "cleanup": {
+      "type": "boolean",
+      "description": "Whether to empty the work directory after successful completion"
+    },
+    "work_dir": {
+      "type": ["string", "null"],
+      "format": "path",
+      "description": "Nextflow work directory for intermediate files"
+    },
+    "gottcha2_db_version": {
+      "type": "string",
+      "description": "GOTTCHA2 database version identifier"
+    },
+    "blast_db_version": {
+      "type": "string",
+      "description": "BLAST database version identifier"
+    },
+    "stat_db_version": {
+      "type": "string",
+      "description": "STAT database version identifier"
+    },
+    "gottcha2_db": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to GOTTCHA2 database directory"
+    },
+    "nvd_files": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to NVD resource files directory"
+    },
+    "blast_db": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to BLAST database directory"
+    },
+    "blast_db_prefix": {
+      "type": "string",
+      "description": "BLAST database name prefix"
+    },
+    "stat_index": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to STAT index file"
+    },
+    "stat_dbss": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to STAT DBSS file"
+    },
+    "stat_annotation": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to STAT annotation file"
+    },
+    "human_virus_taxlist": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to human virus taxonomy list file"
+    },
+    "preprocess": {
+      "type": "boolean",
+      "default": false,
+      "description": "Run all preprocessing steps on reads before classification (enables dedup, trim_adapters, scrub_host_reads, filter_reads)"
+    },
+    "merge_pairs": {
+      "type": "boolean",
+      "description": "Merge paired read mates based on overlaps between them"
+    },
+    "dedup": {
+      "type": "boolean",
+      "description": "Deduplicate sequencing reads"
+    },
+    "trim_adapters": {
+      "type": "boolean",
+      "description": "Trim Illumina adapters from reads"
+    },
+    "scrub_host_reads": {
+      "type": "boolean",
+      "description": "Remove host (human) reads using STAT. Requires sra_human_db to be set."
+    },
+    "sra_human_db": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to human reads STAT database for host scrubbing and SRA submission prep"
+    },
+    "human_read_scrub": {
+      "type": "string",
+      "format": "path",
+      "deprecated": true,
+      "description": "DEPRECATED: Use sra_human_db instead. Path to human reads database for SRA submission scrubbing"
+    },
+    "filter_reads": {
+      "type": "boolean",
+      "description": "Apply quality and length filtering to reads"
+    },
+    "min_read_quality_illumina": {
+      "type": "integer",
+      "default": 20,
+      "minimum": 0,
+      "description": "Minimum average quality score for Illumina reads"
+    },
+    "min_read_quality_nanopore": {
+      "type": "integer",
+      "default": 12,
+      "minimum": 0,
+      "description": "Minimum average quality score for Nanopore reads"
+    },
+    "min_read_length": {
+      "type": "integer",
+      "default": 50,
+      "minimum": 1,
+      "description": "Minimum read length to retain"
+    },
+    "max_read_length": {
+      "type": "integer",
+      "minimum": 1,
+      "description": "Maximum read length to retain (no limit if not specified)"
+    },
+    "cutoff_percent": {
+      "type": "number",
+      "default": 0.001,
+      "minimum": 0,
+      "maximum": 1,
+      "description": "Minimum abundance threshold for reporting taxa (0-1)"
+    },
+    "entropy": {
+      "type": "number",
+      "default": 0.9,
+      "minimum": 0,
+      "maximum": 1,
+      "description": "Entropy threshold for sequence complexity filtering (0-1)"
+    },
+    "min_consecutive_bases": {
+      "type": "integer",
+      "default": 200,
+      "minimum": 1,
+      "description": "Minimum number of consecutive bases required"
+    },
+    "qtrim": {
+      "type": "string",
+      "default": "t",
+      "description": "Quality trimming mode"
+    },
+    "tax_stringency": {
+      "type": "number",
+      "default": 0.7,
+      "minimum": 0,
+      "maximum": 1,
+      "description": "Stringency for taxonomic classification (0-1)"
+    },
+    "include_children": {
+      "type": "boolean",
+      "default": true,
+      "description": "Include child taxa in taxonomic analysis"
+    },
+    "human_virus_families": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      },
+      "default": [
+        "Adenoviridae",
+        "Anelloviridae",
+        "Arenaviridae",
+        "Arteriviridae",
+        "Astroviridae",
+        "Bornaviridae",
+        "Peribunyaviridae",
+        "Caliciviridae",
+        "Coronaviridae",
+        "Filoviridae",
+        "Flaviviridae",
+        "Hepadnaviridae",
+        "Hepeviridae",
+        "Orthoherpesviridae",
+        "Orthomyxoviridae",
+        "Papillomaviridae",
+        "Paramyxoviridae",
+        "Parvoviridae",
+        "Picobirnaviridae",
+        "Picornaviridae",
+        "Pneumoviridae",
+        "Polyomaviridae",
+        "Poxviridae",
+        "Sedoreoviridae",
+        "Retroviridae",
+        "Rhabdoviridae",
+        "Togaviridae",
+        "Kolmioviridae"
+      ],
+      "description": "List of virus family names to include in human virus analysis"
+    },
+    "min_gottcha_reads": {
+      "type": "integer",
+      "default": 250,
+      "minimum": 1,
+      "description": "Minimum number of reads required for GOTTCHA2 analysis"
+    },
+    "max_blast_targets": {
+      "type": "integer",
+      "default": 100,
+      "minimum": 1,
+      "description": "Maximum number of BLAST hits to consider before calling hits"
+    },
+    "blast_retention_count": {
+      "type": "integer",
+      "default": 5,
+      "minimum": 1,
+      "description": "Number of top BLAST hits to retain"
+    },
+    "state_dir": {
+      "type": "string",
+      "format": "path",
+      "description": "State directory for run tracking, upload deduplication, and taxonomy cache. Defaults to NVD_STATE_DIR env var or ~/.cache/nvd"
+    },
+    "refman_registry": {
+      "type": "string",
+      "format": "path",
+      "description": "Path to refman registry file for reference management"
+    },
+    "monoimage": {
+      "type": "string",
+      "default": "nrminor/nvd:v2.4.0",
+      "description": "Container image to use for pipeline execution"
+    },
+    "slack_enabled": {
+      "type": "boolean",
+      "default": false,
+      "description": "Enable Slack notifications for run completion (stat_blast workflow only)"
+    },
+    "slack_channel": {
+      "type": "string",
+      "pattern": "^C[A-Z0-9]+$",
+      "description": "Slack channel ID for notifications (e.g., 'C0123456789')"
+    },
+    "stateless": {
+      "type": "boolean",
+      "default": false,
+      "description": "Run without state management (disables run tracking, sample locking, LabKey, and Slack). When enabled, state_dir is ignored and taxonomy_dir must be set."
+    },
+    "taxonomy_dir": {
+      "type": ["string", "null"],
+      "format": "path",
+      "description": "Path to taxonomy database directory containing NCBI taxdump files. Defaults to {state_dir}/taxdump when state_dir is set. Required when stateless=true."
+    }
+  },
+  "additionalProperties": false
+}

From 4195e8c4afcc82387b98c82fa3029f3c3963a42a Mon Sep 17 00:00:00 2001
From: Nick Minor
Date: Thu, 12 Feb 2026 22:16:03 -0600
Subject: [PATCH 3/3] Decompose deduplication into two related parameters
 with backwards compatibility

The single dedup flag conflated two distinct steps: sequence-based
deduplication with clumpify during preprocessing, and positional
deduplication with samtools markdup after alignment. Split it into
dedup_seq and dedup_pos so the two steps can be toggled independently.

dedup is kept as an umbrella flag for backwards compatibility: when a
specific flag is unset, resolution falls back to dedup, which in turn
falls back to the preprocess master switch.
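
Both resolution sites use Groovy Elvis chaining, so the fallback
behaves roughly like this (values illustrative):

    // first truthy value wins: step flag, then umbrella, then master
    // switch. Note that ?: lets an explicit false fall through, so
    // --no-dedup-seq alone does not override an enabled dedup or
    // preprocess.
    def should_dedup = params.dedup_seq ?: params.dedup ?: params.preprocess
    // dedup_seq=true, dedup=null, preprocess=false -> true  (explicit flag)
    // dedup_seq=null, dedup=true, preprocess=false -> true  (umbrella)
    // dedup_seq=null, dedup=null, preprocess=true  -> true  (master switch)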
---
 lib/py_nvd/_fingerprint.json          |  2 +-
 lib/py_nvd/cli/commands/preset.py     | 14 +++++++++++++-
 lib/py_nvd/cli/commands/run.py        | 16 +++++++++++++++-
 lib/py_nvd/models.py                  | 12 +++++++++++-
 lib/py_nvd/params.py                  |  2 ++
 modules/minimap2.nf                   | 16 +++++++++-------
 nextflow.config                       |  6 +++++-
 schemas/nvd-params.v2.5.0.schema.json | 10 +++++++++-
 workflows/preprocess_reads.nf         |  4 ++--
 9 files changed, 67 insertions(+), 15 deletions(-)

diff --git a/lib/py_nvd/_fingerprint.json b/lib/py_nvd/_fingerprint.json
index 1df35cc..f1db982 100644
--- a/lib/py_nvd/_fingerprint.json
+++ b/lib/py_nvd/_fingerprint.json
@@ -1,4 +1,4 @@
 {
   "main.nf": "d3df999c77a6754811017c07fa446c551d8334e72822a6df1c5cdcacb4715ebb",
-  "nextflow.config": "8d1bbbd14e66c6813a75fce1e3aca4307704fa1e4cd6552fb2d653dd60f78d51"
+  "nextflow.config": "0b83a6d10e66f13e1821df96ec76849df034bb97c7758993a0c54cdaa3a38166"
 }
diff --git a/lib/py_nvd/cli/commands/preset.py b/lib/py_nvd/cli/commands/preset.py
index a04bc42..c16c4d8 100644
--- a/lib/py_nvd/cli/commands/preset.py
+++ b/lib/py_nvd/cli/commands/preset.py
@@ -160,7 +160,17 @@ def preset_register(
     dedup: bool | None = typer.Option(
         None,
         "--dedup/--no-dedup",
-        help="Deduplicate reads",
+        help="Deduplicate reads (umbrella: enables both --dedup-seq and --dedup-pos)",
+    ),
+    dedup_seq: bool | None = typer.Option(
+        None,
+        "--dedup-seq/--no-dedup-seq",
+        help="Sequence-based deduplication with clumpify",
+    ),
+    dedup_pos: bool | None = typer.Option(
+        None,
+        "--dedup-pos/--no-dedup-pos",
+        help="Positional deduplication with samtools markdup",
     ),
     trim_adapters: bool | None = typer.Option(
         None,
@@ -225,6 +235,8 @@ def preset_register(
         "entropy": entropy,
         "preprocess": preprocess,
         "dedup": dedup,
+        "dedup_seq": dedup_seq,
+        "dedup_pos": dedup_pos,
         "trim_adapters": trim_adapters,
         "scrub_host_reads": scrub_host_reads,
         "filter_reads": filter_reads,
diff --git a/lib/py_nvd/cli/commands/run.py b/lib/py_nvd/cli/commands/run.py
index 32a110b..a7e97c5 100644
--- a/lib/py_nvd/cli/commands/run.py
+++ b/lib/py_nvd/cli/commands/run.py
@@ -337,7 +337,19 @@ def run(
     dedup: bool | None = typer.Option(
         None,
         "--dedup/--no-dedup",
-        help="Deduplicate reads (default: follows --preprocess)",
+        help="Deduplicate reads (umbrella: enables both --dedup-seq and --dedup-pos)",
+        rich_help_panel=PANEL_PREPROCESSING,
+    ),
+    dedup_seq: bool | None = typer.Option(
+        None,
+        "--dedup-seq/--no-dedup-seq",
+        help="Sequence-based deduplication with clumpify (default: follows --dedup)",
+        rich_help_panel=PANEL_PREPROCESSING,
+    ),
+    dedup_pos: bool | None = typer.Option(
+        None,
+        "--dedup-pos/--no-dedup-pos",
+        help="Positional deduplication with samtools markdup (default: follows --dedup)",
         rich_help_panel=PANEL_PREPROCESSING,
     ),
     trim_adapters: bool | None = typer.Option(
@@ -625,6 +637,8 @@ def run(
         "preprocess": preprocess,
         "merge_pairs": merge_pairs,
         "dedup": dedup,
+        "dedup_seq": dedup_seq,
+        "dedup_pos": dedup_pos,
         "trim_adapters": trim_adapters,
         "scrub_host_reads": scrub_host_reads,
         "filter_reads": filter_reads,
diff --git a/lib/py_nvd/models.py b/lib/py_nvd/models.py
index 53d9f7e..447c89e 100644
--- a/lib/py_nvd/models.py
+++ b/lib/py_nvd/models.py
@@ -929,7 +929,17 @@ class NvdParams(BaseModel):
     )
     dedup: bool | None = Field(
         None,
-        description="Deduplicate reads",
+        description="Deduplicate reads (umbrella: enables both dedup_seq and dedup_pos)",
+        json_schema_extra={"category": "Preprocessing"},
+    )
+    dedup_seq: bool | None = Field(
+        None,
+        description="Sequence-based deduplication with clumpify (preprocessing)",
+        json_schema_extra={"category": "Preprocessing"},
+    )
+    dedup_pos: bool | None = Field(
+        None,
+        description="Positional deduplication with samtools markdup (after alignment)",
         json_schema_extra={"category": "Preprocessing"},
     )
     trim_adapters: bool | None = Field(
diff --git a/lib/py_nvd/params.py b/lib/py_nvd/params.py
index bbbf490..78370df 100644
--- a/lib/py_nvd/params.py
+++ b/lib/py_nvd/params.py
@@ -281,6 +281,8 @@ def _yaml_analysis_section(
         "preprocess",
         "merge_pairs",
         "dedup",
+        "dedup_seq",
+        "dedup_pos",
         "trim_adapters",
         "scrub_host_reads",
         "filter_reads",
diff --git a/modules/minimap2.nf b/modules/minimap2.nf
index 36194fa..dd22f82 100644
--- a/modules/minimap2.nf
+++ b/modules/minimap2.nf
@@ -1,12 +1,13 @@
 /*
- * Map reads to contigs using minimap2, with optional duplicate marking.
+ * Map reads to contigs using minimap2, with optional positional duplicate marking.
  *
- * When params.dedup is true, the pipeline includes samtools collate/fixmate/markdup
- * to identify and remove PCR/optical duplicates. This is recommended for amplicon
- * or high-duplication libraries but adds computational overhead.
+ * When positional dedup is enabled (dedup_pos or dedup), the pipeline includes
+ * samtools collate/fixmate/markdup to identify and remove PCR/optical duplicates.
+ * This is recommended for amplicon or high-duplication libraries but adds
+ * computational overhead.
  *
- * When params.dedup is false, reads are simply filtered (unmapped removed) and
- * coordinate-sorted, which is sufficient for many viral metagenomics applications.
+ * When positional dedup is disabled, reads are simply filtered (unmapped removed)
+ * and coordinate-sorted, which is sufficient for many viral metagenomics applications.
  *
  * Output is always a coordinate-sorted, indexed BAM file.
  */
@@ -30,7 +31,8 @@ process MAP_READS_TO_CONTIGS {
"map-ont" : "sr" - if (params.dedup) { + def should_dedup_pos = params.dedup_pos ?: params.dedup ?: params.preprocess + if (should_dedup_pos) { """ minimap2 -ax ${preset} -t ${task.cpus} ${contigs} ${reads} \\ | samtools view -b -F 4 \\ diff --git a/nextflow.config b/nextflow.config index 9108e89..5aa3891 100644 --- a/nextflow.config +++ b/nextflow.config @@ -76,8 +76,12 @@ params { // whether to merge paired read mates based on overlaps between them merge_pairs = null - // whether to deduplicate sequencing reads + // whether to deduplicate sequencing reads (umbrella: enables both dedup_seq and dedup_pos) dedup = null + // sequence-based deduplication with clumpify (preprocessing) + dedup_seq = null + // positional deduplication with samtools markdup (after alignment) + dedup_pos = null // Adapter trimming (Illumina only) trim_adapters = null diff --git a/schemas/nvd-params.v2.5.0.schema.json b/schemas/nvd-params.v2.5.0.schema.json index 37c1506..a164b22 100644 --- a/schemas/nvd-params.v2.5.0.schema.json +++ b/schemas/nvd-params.v2.5.0.schema.json @@ -146,7 +146,15 @@ }, "dedup": { "type": "boolean", - "description": "Deduplicate sequencing reads" + "description": "Deduplicate sequencing reads (umbrella: enables both dedup_seq and dedup_pos)" + }, + "dedup_seq": { + "type": "boolean", + "description": "Sequence-based deduplication with clumpify (preprocessing)" + }, + "dedup_pos": { + "type": "boolean", + "description": "Positional deduplication with samtools markdup (after alignment)" }, "trim_adapters": { "type": "boolean", diff --git a/workflows/preprocess_reads.nf b/workflows/preprocess_reads.nf index 14aa753..d63ab0b 100644 --- a/workflows/preprocess_reads.nf +++ b/workflows/preprocess_reads.nf @@ -6,8 +6,8 @@ workflow PREPROCESS_READS { ch_fastq_tuple // tuple(sample_id, platform, read_structure, reads) main: - // Resolve optional step flags: explicit param wins, otherwise fall back to master switch - def should_dedup = params.dedup ?: params.preprocess + // Resolve optional step flags: explicit param wins, then umbrella, then master switch + def should_dedup = params.dedup_seq ?: params.dedup ?: params.preprocess def should_trim = params.trim_adapters ?: params.preprocess def should_scrub = params.scrub_host_reads ?: params.preprocess def should_filter = params.filter_reads ?: params.preprocess