diff --git a/images/fraser/Dockerfile b/images/fraser/Dockerfile index 527cbc4d..eb01c846 100644 --- a/images/fraser/Dockerfile +++ b/images/fraser/Dockerfile @@ -2,14 +2,36 @@ FROM debian:bookworm-slim ENV MAMBA_ROOT_PREFIX /root/micromamba ENV PATH $MAMBA_ROOT_PREFIX/bin:$PATH -ARG VERSION=${VERSION:-1.12.1} +ARG VERSION=2.4.6 RUN apt-get update && apt-get install -y git wget bash bzip2 zip && \ - rm -r /var/lib/apt/lists/* && \ rm -r /var/cache/apt/* && \ - wget -qO- https://api.anaconda.org/download/conda-forge/micromamba/0.8.2/linux-64/micromamba-0.8.2-he9b6cbd_0.tar.bz2 | tar -xvj -C /usr/local bin/micromamba && \ - mkdir ${MAMBA_ROOT_PREFIX} && \ - micromamba install -y --prefix ${MAMBA_ROOT_PREFIX} -c bioconda -c conda-forge \ - python=3.10 r-base=4.3.1 r-reshape2=1.4.4 r-tidyverse=2.0.0 \ - bioconductor-fraser=${VERSION} bioconductor-txdb.hsapiens.ucsc.hg38.knowngene bioconductor-org.hs.eg.db && \ - rm -r /root/micromamba/pkgs + wget -qO- https://micro.mamba.pm/api/micromamba/linux-64/latest | tar -xvj -C /usr/local/bin --strip-components=1 bin/micromamba && \ + chmod +x /usr/local/bin/micromamba && \ + mkdir -p ${MAMBA_ROOT_PREFIX} && \ + micromamba install -y --root-prefix ${MAMBA_ROOT_PREFIX} --prefix ${MAMBA_ROOT_PREFIX} \ + -c bioconda -c conda-forge \ + python=3.10.19 \ + r-base=4.4.3 \ + r-reshape2=1.4.5 \ + r-tidyverse=2.0.0 \ + r-argparse=2.2.3 \ + r-biocmanager \ + bioconductor-fraser=${VERSION} \ + bioconductor-txdb.hsapiens.ucsc.hg38.knowngene=3.20.0 \ + bioconductor-org.hs.eg.db=3.20.0 && \ + micromamba run -n base Rscript -e 'BiocManager::install("SummarizedExperiment", update=FALSE, ask=FALSE)' && \ + micromamba run -n base Rscript -e 'BiocManager::install("Rsamtools", update=FALSE, ask=FALSE)' && \ + micromamba run -n base Rscript -e 'BiocManager::install("HDF5Array", update=FALSE, ask=FALSE)' && \ + micromamba clean --all --yes && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir /RDrnaseq + +COPY fraser_count_split.R RDrnaseq/ +COPY 
fraser_analysis.R RDrnaseq/ +COPY fraser_count_non_split.R RDrnaseq/ +COPY fraser_init.R RDrnaseq/ +COPY fraser_merge_non_split.R RDrnaseq/ +COPY fraser_merge_split.R RDrnaseq/ +COPY fraser_join_counts.R RDrnaseq/ diff --git a/images/fraser/README_Fraser.txt b/images/fraser/README_Fraser.txt new file mode 100644 index 00000000..85f6c06f --- /dev/null +++ b/images/fraser/README_Fraser.txt @@ -0,0 +1,42 @@ +The tool is called "Fraser" and it is an R package that can be used to analyze RNA-seq data for splicing events. +It is designed to identify and quantify splicing events in a cohort of samples, and can be used to compare +splicing patterns between different groups of samples. + +This image and tool was made to run Cohort-Level analyses on Exome data for the RD team. +When using these scripts and tools, please make sure to have the correct version of R and the necessary packages installed. +They were designed to be used in conjunction with the cpg-flow-rdrnaseq and specifically the fraser stage of that workflow and run in this order + +1. fraser_init.R: +Initializes the FRASER dataset for a cohort. +Inputs: --cohort_id, --sample_map (CSV with sample_id and bam_path), --work_dir, --nthreads +What it does: Reads the sample map, creates a FRASER dataset skeleton, and saves it for downstream steps. + +2. fraser_count_split.R: +Counts split (spliced) reads for a single sample. +Inputs: --fds_path, --bam_path, --cohort_id, --sample_id, --work_dir, --nthreads +What it does: Loads the dataset, subsets to the sample, counts split reads, and saves results to the cache. + +3. fraser_merge_split.R: +Merges split read counts from all samples. +Inputs: --fds_path, --cohort_id, --work_dir, --nthreads +What it does: Loads and merges all split count files, annotates splice sites, and saves the merged data for the cohort. + +4. fraser_count_non_split.R: +Counts non-split (unspliced) reads for a single sample. 
+Inputs: --fds_path, --bam_path, --cohort_id, --sample_id, --coords_path, --work_dir, --nthreads +What it does: Counts non-split reads at splice site coordinates and saves results to the cache. + +5. fraser_merge_non_split.R: +Merges non-split read counts and calculates PSI values. +Inputs: --fds_path, --cohort_id, --filtered_ranges_path, --work_dir, --nthreads +What it does: Merges non-split counts for all samples, organizes HDF5 files, and prepares data for analysis. + +6. fraser_join_counts.R: +Joins split and non-split counts into a single dataset. +Inputs: --fds_path, --cohort_id, --work_dir, --nthreads +What it does: Checks data integrity, joins split and non-split counts, and ensures all metadata is present for analysis. + +7. fraser_analysis.R: +Performs statistical analysis and generates QC plots. +Inputs: --fds_dir, --cohort_id, --pval_cutoff, --delta_psi_cutoff, --min_count, --nthreads +What it does: Loads the final dataset, calculates splicing statistics, applies filters, and generates quality control plots and results. 
diff --git a/images/fraser/fraser_analysis.R b/images/fraser/fraser_analysis.R new file mode 100644 index 00000000..548b8c32 --- /dev/null +++ b/images/fraser/fraser_analysis.R @@ -0,0 +1,130 @@ +#!/usr/bin/env Rscript + +library(argparse) +library(FRASER) +library(readr) +library(TxDb.Hsapiens.UCSC.hg38.knownGene) +library(org.Hs.eg.db) +library(HDF5Array) +library(ggplot2) + +parser <- ArgumentParser(description = "FRASER 2.0 Statistical Analysis") +parser$add_argument("--fds_dir", required = TRUE, help = "Base directory containing the output folder") +parser$add_argument("--cohort_id", required = TRUE, help = "Cohort ID") +parser$add_argument("--pval_cutoff", type = "double", default = 0.05) +parser$add_argument("--delta_psi_cutoff", type = "double", default = 0.3) +parser$add_argument("--min_count", type = "integer", default = 5) +parser$add_argument("--nthreads", type = "integer", default = 1) +args <- parser$parse_args() + +# --- 1. Parallelization Setup --- +# Use the user-provided nthreads for all BiocParallel operations +bp <- MulticoreParam(workers = args$nthreads) +register(bp) + +# --- 2. Load the Dataset --- +options(delayedArray.block.size = 1e9) +fds_name <- paste0("FRASER_", args$cohort_id) +message(paste0("Loading Fraser Data Set: ", fds_name)) +fds <- loadFraserDataSet(dir = file.path(args$fds_dir, fds_name), name = fds_name) + +# REPAIR: If spliceSiteID is missing, FRASER 2.0 cannot calculate Jaccard/PSI +if(! 
"spliceSiteID" %in% colnames(mcols(fds, type="ss"))){ + message("Manually injecting Splice Site IDs...") + + # This internal FRASER call generates the mapping without needing the full constructor + # It populates the 'spliceSiteCoords' slot which calculatePSIValues needs + fds <- FRASER:::annotateSpliceSite(fds) + + # We also need to ensure the Jaccard-specific metadata is initialized + # Often, this is what's missing when the merge.data.table fails + if(is.null(mcols(fds, type="j")$startID)){ + # This maps junctions to the spliceSiteIDs we just generated + fds <- FRASER:::updateIndices(fds) + } +} + +# Create QC directory +dir.create("qc_plots", showWarnings = FALSE) + +# --- 3. Calculate Filter Values FIRST (required for plotting) --- +message("Calculating filter expression values...") +fds <- filterExpressionAndVariability(fds, minDeltaPsi = 0.0, filter = FALSE) + +# --- 4. Generate QC Plots --- +# DOWNSAMPLING FOR PLOTS: Use 30,000 random junctions for QC to keep it fast +set.seed(42) +plot_idx <- sample(nrow(fds), min(nrow(fds), 30000)) +fds_plot_subset <- fds[plot_idx, ] + +message("Generating QC plots using downsampled subset...") +png("qc_plots/filter_expression.png", width = 1200, height = 1200, res = 150) +plotFilterExpression(fds_plot_subset, bins = 100) +dev.off() + +# --- 5. Apply Filtering --- +message("Applying filtering based on calculated values...") +fds_filtered <- fds[mcols(fds, type = "j")[, "passed"], ] + +# Dimensionality Message +raw_dim <- nrow(fds) +filtered_dim <- nrow(fds_filtered) + +message(paste0("\n--- Filtering Summary ---")) +message(paste0("Original junctions: ", raw_dim)) +message(paste0("Filtered junctions: ", filtered_dim)) +if (raw_dim > 0) { message(paste0("Reduction: ", round((1 - (filtered_dim / raw_dim)) * 100, 2), "%")) } + +# --- 6. 
Hyperparameter Optimization --- +# Optimization must run on the filtered set +message("Optimizing encoding dimension (q)...") +opt_q <- bestQ(fds_filtered, type = "jaccard") + +png("qc_plots/best_q_optimization.png", width = 1200, height = 1200, res = 150) +plotEncDimSearch(fds_filtered, type = "jaccard") +dev.off() + +# --- 7. Fitting --- +message(paste0("Fitting FRASER model with q = ", opt_q, "...")) +fds_fit <- FRASER(fds_filtered, q = opt_q, type = "jaccard", BPPARAM = bp) + +# --- 8. Annotation (MUST happen BEFORE calculating gene-level p-values) --- +message("Annotating results with gene information...") +fds_fit <- annotateRangesWithTxDb(fds_fit, + txdb = TxDb.Hsapiens.UCSC.hg38.knownGene, + orgDb = org.Hs.eg.db) + +# --- 9. Calculate gene-level p-values (after annotation) --- +message("Calculating gene-level p-values...") +fds_fit <- calculatePadjValues(fds_fit, type = "jaccard") + +# --- 10. QQ Plot (after gene-level p-values are calculated) --- +message("Generating QQ plot...") +tryCatch({ + png("qc_plots/qq_plot.png", width = 1200, height = 1200, res = 150) + plotQQ(fds_fit, aggregate = TRUE, global = TRUE) + dev.off() +}, error = function(e) { + dev.off() # Close any open graphics device + message("Warning: QQ plot generation failed: ", e$message) + message("Skipping QQ plot and continuing with analysis...") +}) + +# --- 11. Results & Compressed Export --- +message("Extracting results...") +res <- results(fds_fit, + padjCutoff = args$pval_cutoff, + deltaPsiCutoff = args$delta_psi_cutoff, + minCount = args$min_count) + +# Extract all results for Jaccard +res_all <- results(fds_fit, padjCutoff = 1, deltaPsiCutoff = 0, minCount = 0) + +message("Saving results...") +write_csv(as.data.frame(res), paste0(args$cohort_id, ".significant.csv")) +write_csv(as.data.frame(res_all), paste0(args$cohort_id, ".all_results.csv.gz")) + +# --- 12. 
Final Save --- +message("Saving final FRASER object...") +saveFraserDataSet(fds_fit, dir = getwd(), name = paste0(args$cohort_id, "_final")) +message("Analysis Complete.") diff --git a/images/fraser/fraser_count_non_split.R b/images/fraser/fraser_count_non_split.R new file mode 100644 index 00000000..9456df66 --- /dev/null +++ b/images/fraser/fraser_count_non_split.R @@ -0,0 +1,78 @@ +#!/usr/bin/env Rscript + +# Set memory limit - increased slightly to allow for HDF5 overhead +Sys.setenv("R_MAX_VSIZE" = "16Gb") + +library(argparse) +library(FRASER) +library(BiocParallel) + +parser <- ArgumentParser(description = "Count Non-Split Reads for a Single Sample") +parser$add_argument("--fds_path", required = TRUE, help = "Path to localized FDS RDS file") +parser$add_argument("--bam_path", required = TRUE, help = "Path to the localized BAM file") +parser$add_argument("--cohort_id", required = TRUE, help = "Cohort ID") +parser$add_argument("--sample_id", required = TRUE, help = "Sample ID") +parser$add_argument("--coords_path", required = TRUE, help = "Path to splice_site_coords.RDS") +parser$add_argument("--work_dir", default = "/io/work", help = "Working directory") +parser$add_argument("--nthreads", type = "integer", default = 1, help = "Number of threads") +args <- parser$parse_args() + +# 1. Reconstruct Directory Structure +fds_name <- paste0("FRASER_", args$cohort_id) +save_dir <- file.path(args$work_dir, "savedObjects", fds_name) +dir.create(save_dir, recursive = TRUE, showWarnings = FALSE) +file.copy(args$fds_path, file.path(save_dir, "fds-object.RDS")) + +# Create the cache directory WITHOUT the fds_name subdirectory +# This matches what the merge script expects +cache_dir <- file.path(args$work_dir, "cache", "nonSplicedCounts") +dir.create(cache_dir, recursive = TRUE, showWarnings = FALSE) + +# 2. Configure Parallelism and HDF5 +options("FRASER.maxSamplesNoHDF5" = 0) +options("FRASER.maxJunctionsNoHDF5" = -1) + +# 3. 
Load Dataset and Coordinates +fds <- loadFraserDataSet(dir = args$work_dir, name = fds_name) +filtered_coords <- readRDS(args$coords_path) # This is your splice_site_coords.RDS +# Add this before calling countNonSplicedReads +colData(fds)$bamFile <- args$bam_path + +# Set strand specificity +strandSpecific(fds) <- 2 + +# 5. Run Non-Split Counting +# This writes the .h5 or .RDS file into the cache_dir created above +message(paste("Counting non-split reads for sample:", args$sample_id)) + +sample_result <- countNonSplicedReads(args$sample_id, + splitCountRanges = NULL, + fds = fds, + NcpuPerSample = args$nthreads, + minAnchor = 5, + recount = TRUE, + spliceSiteCoords = filtered_coords) +# 6. Verification +# Define the actual subdirectory FRASER uses: cache/nonSplicedCounts/FRASER_{cohort_id}/ +actual_cache_dir <- file.path(args$work_dir, "cache", "nonSplicedCounts", fds_name) +expected_h5 <- file.path(actual_cache_dir, paste0("nonSplicedCounts-", args$sample_id, ".h5")) +expected_rds <- file.path(actual_cache_dir, paste0("nonSplicedCounts-", args$sample_id, ".RDS")) + +if (file.exists(expected_h5)) { + message("Success: Created non-split counts (HDF5) at ", expected_h5) +} else if (file.exists(expected_rds)) { + message("Success: Created non-split counts (RDS) at ", expected_rds) +} else { + # Debugging: List files in the directory to see what actually happened + if(dir.exists(actual_cache_dir)){ + message("Files found in cache dir: ", paste(list.files(actual_cache_dir), collapse=", ")) + } + else { + message("Cache directory not found: ", actual_cache_dir) + } + + if(!file.exists(paste0(args$bam_path, ".bai"))){ + stop("BAM Index (.bai) missing. FRASER cannot perform random access.") + } + stop(paste("Counting failed. 
Output not found for sample:", args$sample_id)) +} diff --git a/images/fraser/fraser_count_split.R b/images/fraser/fraser_count_split.R new file mode 100644 index 00000000..cf2e8760 --- /dev/null +++ b/images/fraser/fraser_count_split.R @@ -0,0 +1,74 @@ +#!/usr/bin/env Rscript + +# Set memory limit - increased slightly to allow for HDF5 overhead +Sys.setenv("R_MAX_VSIZE" = "16Gb") + +library(argparse) +library(FRASER) +library(BiocParallel) + +parser <- ArgumentParser(description = "Count Split Reads for a Single Sample") +parser$add_argument("--fds_path", required = TRUE, help = "Path to localized FDS RDS file") +parser$add_argument("--bam_path", required = TRUE, help = "Path to the specific BAM file for this sample") +parser$add_argument("--cohort_id", required = TRUE, help = "Cohort ID") +parser$add_argument("--sample_id", required = TRUE, help = "Sample ID to count") +parser$add_argument("--work_dir", default = "/io/work", help = "Working directory") +parser$add_argument("--nthreads", type = "integer", default = 1, help = "Number of threads") +args <- parser$parse_args() + +# 1. Reconstruct the FRASER directory structure so loadFraserDataSet works +# FRASER expects: {work_dir}/savedObjects/FRASER_{cohort_id}/fds-object.RDS +fds_dir_name <- paste0("FRASER_", args$cohort_id) +save_dir <- file.path(args$work_dir, "savedObjects", fds_dir_name) +dir.create(save_dir, recursive = TRUE, showWarnings = FALSE) +file.copy(args$fds_path, file.path(save_dir, "fds-object.RDS"), overwrite = TRUE) + +# 2. Force HDF5 and Configure Parallelism +options("FRASER.maxSamplesNoHDF5" = 0) +options("FRASER.maxJunctionsNoHDF5" = -1) +# Use MulticoreParam if threads > 1, else Serial +if(args$nthreads > 1){ + bpparam <- MulticoreParam(workers = args$nthreads) +} else { + bpparam <- SerialParam() +} + +# 3. 
Load and Prune IMMEDIATELY +fds <- loadFraserDataSet(dir = args$work_dir, name = fds_dir_name) + +strandSpecific(fds) <- 2 + +# SUBSET FIRST: This is the most critical memory-saving step. +# By subsetting here, we drop the metadata of all other samples. +fds <- fds[, fds$sampleID == args$sample_id] + +# Validate the BAM path - Ensure the R script sees what Hail localized +if(!file.exists(args$bam_path)){ + stop(paste("BAM file not found at:", args$bam_path)) +} +colData(fds)$bamFile <- args$bam_path + +# 4. Count Split Reads +message(paste("Starting split read counting for sample:", args$sample_id)) + +# In FRASER 2.0, we use getSplitReadCountsForAllSamples with recount=TRUE. +# This writes an RDS file to the cache which we will harvest in the merge step. +fds <- getSplitReadCountsForAllSamples( + fds, + recount = TRUE, + BPPARAM = bpparam +) + +# 5. Verification +# FRASER saves individual counts to: cache/splitCounts/splitCounts-{sample_id}.RDS +expected_out <- file.path(args$work_dir, "cache", "splitCounts", paste0("splitCounts-", args$sample_id, ".RDS")) + +if (file.exists(expected_out)) { + message("Success: Created split counts at ", expected_out) +} else { + # Check for common Bioinformatic failures + if(!file.exists(paste0(args$bam_path, ".bai"))){ + stop("BAM Index (.bai) missing. FRASER cannot perform random access counting.") + } + stop("Counting failed. 
The RDS file was not generated in the cache.") +} diff --git a/images/fraser/fraser_init.R b/images/fraser/fraser_init.R new file mode 100644 index 00000000..243ce91f --- /dev/null +++ b/images/fraser/fraser_init.R @@ -0,0 +1,55 @@ +library(argparse) +library(FRASER) +library(BiocParallel) + +parser <- ArgumentParser(description = "Initialize FRASER Data Set") +parser$add_argument("--cohort_id", required = TRUE, help = "Cohort ID") +parser$add_argument("--sample_map", required = TRUE, help = "Path to CSV with sample_id and bam_path") +parser$add_argument("--work_dir", default = "/io/work", help = "Working directory for FRASER") +parser$add_argument("--nthreads", type = "integer", default = 1, help = "Number of threads") +args <- parser$parse_args() + +# 1. Read the static mapping created by the Python job +sample_map <- read.csv(args$sample_map, stringsAsFactors = FALSE) +# Define the dataset name consistently with the Python expectations +fds_name <- paste0("FRASER_", args$cohort_id) + + +sample_table <- DataFrame( + sampleID = as.character(sample_map$sample_id), + bamFile = as.character(sample_map$bam_path), + pairedEnd = TRUE +) +options("FRASER.maxSamplesNoHDF5" = 0) +options("FRASER.maxJunctionsNoHDF5" = -1) + +fds <- FraserDataSet( + colData = sample_table, + workingDir = args$work_dir, + name = fds_name +) + +# Setup parallel execution +bp <- MulticoreParam(workers = args$nthreads) +register(bp) + + +# We calculate initial metadata/PSI skeleton + +fds <- saveFraserDataSet(fds) + +# Print location for Python to capture if needed +fds_save_path <- file.path( + args$work_dir, + "savedObjects", + fds_name, + "fds-object.RDS" +) + +if (file.exists(fds_save_path)) { + message("Successfully initialized FDS skeleton at: ", fds_save_path) + # Output path for Hail logs + cat(fds_save_path, "\n") +} else { + stop("FDS object was not saved to: ", fds_save_path) +} diff --git a/images/fraser/fraser_join_counts.R b/images/fraser/fraser_join_counts.R new file mode 100644 
index 00000000..895d682f --- /dev/null +++ b/images/fraser/fraser_join_counts.R @@ -0,0 +1,184 @@ +#!/usr/bin/env Rscript +library(argparse) +library(FRASER) +library(SummarizedExperiment) +library(HDF5Array) +library(BiocParallel) + +parser <- ArgumentParser(description = "Join Split and Non-Split Counts") +parser$add_argument("--fds_path", required = TRUE) +parser$add_argument("--cohort_id", required = TRUE) +parser$add_argument("--work_dir", default = "/io/work") +parser$add_argument("--nthreads", type = "integer", default = 1) +args <- parser$parse_args() + +# Configure FRASER to use HDF5 backend +options("FRASER.maxSamplesNoHDF5" = 0) +options("FRASER.maxJunctionsNoHDF5" = -1) + +# Setup parallel processing +bp <- MulticoreParam(workers = args$nthreads) +register(bp) + +check_fds_integrity <- function(fds) { + message("\n--- Running FDS Integrity Check ---") + + # 1. Check for basic ID existence + j_cols <- colnames(mcols(fds, type="j")) + ss_cols <- colnames(mcols(fds, type="ss")) + + has_ids <- all(c("startID", "endID") %in% j_cols) && ("spliceSiteID" %in% ss_cols) + + if(has_ids) { + message("SUCCESS: SpliceSiteIDs found in both Junctions and SpliceSites.") + } else { + stop("CRITICAL ERROR: SpliceSiteID metadata is missing. Analysis will fail.") + } + + # 2. Check for ID alignment (The "by.y" error test) + missing_starts <- !all(mcols(fds, type="j")$startID %in% mcols(fds, type="ss")$spliceSiteID) + if(missing_starts) { + stop("CRITICAL ERROR: Junction startIDs do not match SpliceSiteIDs. Map is broken.") + } + message("SUCCESS: Junction IDs are correctly mapped to Splice Sites.") + + # 3. Check HDF5 Backend connectivity + tryCatch({ + test_val <- as.matrix(counts(fds, type="j")[1, 1]) + message("SUCCESS: HDF5 backends are reachable and readable.") + }, error = function(e) { + stop("CRITICAL ERROR: HDF5 files are missing or paths are broken: ", e$message) + }) + + # 4. 
Check for Non-Split Counts + if("rawCountsSS" %in% assayNames(fds)) { + message("SUCCESS: Non-split counts (rawCountsSS) are present.") + } else { + warning("WARNING: Non-split counts are missing. Jaccard/PSI calculation may fail.") + } + + message("--- Integrity Check Passed ---\n") +} + +fds_name <- paste0("FRASER_", args$cohort_id) +saveDir <- file.path(args$work_dir, "savedObjects", fds_name) + +# 1. Load the FDS object +message("\n[1/7] Loading Fraser Data Set from RDS...") +fds <- readRDS(args$fds_path) +workingDir(fds) <- saveDir + + +# 2. Load split counts +message("\n[2/7] Loading split counts...") +split_se_dir <- file.path(saveDir, "splitCounts") +if(!dir.exists(split_se_dir)){ + stop(paste("Missing splitCounts directory at:", split_se_dir)) +} +splitCounts_se <- loadHDF5SummarizedExperiment(dir = split_se_dir) +message("Split counts dimensions: ", nrow(splitCounts_se), " junctions x ", ncol(splitCounts_se), " samples") + +# Annotate split counts with splice site IDs +# NOTE: annotateSpliceSite() automatically extracts splice sites from junctions +# and generates consistent startID/endID mappings +message("Annotating split counts with splice site IDs...") +splitCounts_se <- FRASER:::annotateSpliceSite(splitCounts_se) + +# 3. 
Load merged non-split counts +message("\n[3/7] Loading merged non-split counts...") +merged_non_split_dir <- file.path(saveDir, "nonSplitCounts") +if(!dir.exists(merged_non_split_dir)){ + stop(paste("Missing merged non-split counts directory at:", merged_non_split_dir)) +} +nonSplitCounts_se <- loadHDF5SummarizedExperiment(dir = merged_non_split_dir) +message("Non-split counts dimensions: ", nrow(nonSplitCounts_se), " sites x ", ncol(nonSplitCounts_se), " samples") + +# CRITICAL FIX: Always re-annotate after loading to ensure consistency +# The saved HDF5 object might have stale or incompatible IDs +message("Re-annotating non-split counts with splice site IDs...") +nonSplitCounts_se <- FRASER:::annotateSpliceSite(nonSplitCounts_se) + +# 4. Filter split counts to only include junctions that reference the non-split splice sites +message("\n[4/7] Filtering split counts to match non-split coordinate space...") + +# Get the valid splice site IDs from non-split counts +nonsplit_ids <- mcols(nonSplitCounts_se)$spliceSiteID + +# Get split junction IDs +split_start_ids <- mcols(splitCounts_se)$startID +split_end_ids <- mcols(splitCounts_se)$endID + +# Find junctions where BOTH start and end are in the non-split coordinate set +valid_junctions <- (!is.na(split_start_ids) & !is.na(split_end_ids) & + split_start_ids %in% nonsplit_ids & + split_end_ids %in% nonsplit_ids) + +n_valid <- sum(valid_junctions) +n_total <- nrow(splitCounts_se) +message(" Junctions with both start/end in non-split coords: ", n_valid, " / ", n_total) +message(" Filtering out ", n_total - n_valid, " junctions that reference sites outside the coordinate set") + +if(n_valid == 0) { + stop("ERROR: No junctions have both start and end in the non-split coordinate set!") +} + +# Filter the split counts to only valid junctions +splitCounts_se_filtered <- splitCounts_se[valid_junctions, ] +message(" Filtered split counts dimensions: ", nrow(splitCounts_se_filtered), " junctions x ", + 
ncol(splitCounts_se_filtered), " samples") + +# Update the splitCounts_se to use the filtered version +splitCounts_se <- splitCounts_se_filtered + +# 5. Verify ID consistency after filtering +message("\n[5/7] Verifying ID consistency after filtering...") +split_start_ids_filtered <- na.omit(unique(mcols(splitCounts_se)$startID)) +split_end_ids_filtered <- na.omit(unique(mcols(splitCounts_se)$endID)) + +message(" Filtered split startIDs: ", length(split_start_ids_filtered)) +message(" Filtered split endIDs: ", length(split_end_ids_filtered)) +message(" Non-split spliceSiteIDs: ", length(nonsplit_ids)) + +# Check overlap - should be 100% now +missing_start <- sum(!split_start_ids_filtered %in% nonsplit_ids) +missing_end <- sum(!split_end_ids_filtered %in% nonsplit_ids) + +if(missing_start > 0 || missing_end > 0) { + stop("ERROR: After filtering, still have missing IDs! startIDs: ", missing_start, ", endIDs: ", missing_end) +} + +message("SUCCESS: All split junction IDs now exist in non-split coordinate set!") + +# 6. Add counts to FRASER object +message("\n[6/7] Joining split and non-split counts into FDS object...") +fds <- addCountsToFraserDataSet( + fds = fds, + splitCounts = splitCounts_se, + nonSplitCounts = nonSplitCounts_se +) + +message("Counts successfully joined!") +message(" - Split junctions: ", nrow(counts(fds, type = "j"))) +message(" - Splice sites: ", nrow(counts(fds, type = "ss"))) + + +# 7. Calculate PSI values +message("\n[7/7] Calculating PSI values...") +fds <- calculatePSIValues(fds, types = c("psi3", "psi5", "jaccard")) + +message("PSI values calculated successfully!") +message("Available assays: ", paste(assayNames(fds), collapse = ", ")) + +# Final integrity check +check_fds_integrity(fds) + +# 8. Save final FRASER object +message("\nSaving final integrated FDS...") +fds <- saveFraserDataSet(fds) + +message("\n=== FRASER join complete! 
===") +message("FDS object saved in: ", workingDir(fds)) +message("\nNext steps for analysis:") +message(" 1. Filter junctions: fds <- filterExpressionAndVariability(fds)") +message(" 2. Fit model: fds <- FRASER(fds)") +message(" 3. Get results: results(fds)") diff --git a/images/fraser/fraser_merge_non_split.R b/images/fraser/fraser_merge_non_split.R new file mode 100644 index 00000000..5415c9b6 --- /dev/null +++ b/images/fraser/fraser_merge_non_split.R @@ -0,0 +1,205 @@ +#!/usr/bin/env Rscript + +library(argparse) +library(FRASER) +library(BiocParallel) +library(SummarizedExperiment) +library(HDF5Array) +library(rhdf5) + +parser <- ArgumentParser(description = "Merge Non-Split Counts and Calculate PSI") +parser$add_argument("--fds_path", required = TRUE, help = "Path to localized FDS RDS file") +parser$add_argument("--cohort_id", required = TRUE, help = "Cohort ID") +parser$add_argument("--filtered_ranges_path", required = TRUE, help = "Path to splice_site_coords.RDS") +parser$add_argument("--work_dir", default = "/io/work", help = "Working directory") +parser$add_argument("--nthreads", type = "integer", default = 1, help = "Number of threads") +args <- parser$parse_args() + +# Configure FRASER to use HDF5 backend +options("FRASER.maxSamplesNoHDF5" = 0) +options("FRASER.maxJunctionsNoHDF5" = -1) + +# 1. 
Setup directories +message("\n[1/6] Setting up directories...") +fds_name <- paste0("FRASER_", args$cohort_id) +save_dir <- file.path(args$work_dir, "savedObjects", fds_name) +out_dir <- file.path(save_dir, "nonSplitCounts") +dir.create(out_dir, recursive = TRUE, showWarnings = FALSE) + +# Copy FDS file only if source and destination are different +fds_dest <- file.path(save_dir, "fds-object.RDS") +if(normalizePath(args$fds_path, mustWork = FALSE) != normalizePath(fds_dest, mustWork = FALSE)) { + file.copy(args$fds_path, fds_dest, overwrite = TRUE) + message("Copied FDS from: ", args$fds_path) +} else { + message("FDS already in correct location: ", fds_dest) +} + +# 2. Load FDS to get sample information +message("\n[2/6] Loading FraserDataSet...") +fds <- loadFraserDataSet(dir = args$work_dir, name = fds_name) +sample_ids <- samples(fds) +n_samples <- length(sample_ids) +message("Samples: ", n_samples) + +# 3. Load the genomic ranges +message("\n[3/6] Loading genomic ranges...") +non_split_ranges <- readRDS(args$filtered_ranges_path) +n_sites <- length(non_split_ranges) +message("Non-split sites: ", n_sites) + +# 4. Copy H5 files from cache to the proper output directory +message("\n[4/6] Copying and organizing H5 files...") +cache_dir <- file.path(args$work_dir, "cache", "nonSplicedCounts") +h5_files <- list.files(cache_dir, pattern = "\\.h5$", full.names = TRUE) +message("Found ", length(h5_files), " HDF5 files in cache") + +if(length(h5_files) == 0) { + stop("ERROR: No H5 files found in cache directory: ", cache_dir) +} + +# Copy with standardized naming convention +for(i in seq_along(sample_ids)) { + sid <- sample_ids[i] + + # 1. Precise matching using word boundaries (\\b) + # This ensures "S1" doesn't match "S10" + match_idx <- grep(paste0("\\b", sid, "\\b"), h5_files) + + # 2. Safety check: Did we find a match? + if(length(match_idx) == 0) { + warning("No H5 file found for sample: ", sid) + next + } + + # 3. 
Extract the first matching file path + src_file <- h5_files[match_idx[1]] + + # 4. Define destination and copy + # Constructing the new filename: nonSplicedCounts-[sid].h5 + dest <- file.path(out_dir, paste0("nonSplicedCounts-", sid, ".h5")) + + success <- file.copy(src_file, dest, overwrite = TRUE) + + # 5. Provide feedback + if(success) { + message("Successfully Copied: ", sid) + message(" From: ", basename(src_file)) + message(" To: ", basename(dest)) + } else { + warning("Failed to copy file for sample: ", sid) + } +} + +# 5. Validate H5 files and build combined count matrix +message("\n[5/6] Building combined count matrix (memory-efficient)...") + +# Read first file to validate dimensions +first_h5 <- file.path(out_dir, paste0("nonSplicedCounts-", sample_ids[1], ".h5")) +if(!file.exists(first_h5)) { + stop("ERROR: First H5 file not found: ", first_h5) +} + +h5_info <- h5ls(first_h5) +dataset_row <- h5_info[h5_info$name == "nonSplicedCounts", ] +actual_rows <- as.numeric(strsplit(dataset_row$dim, " x ")[[1]][1]) + +if(actual_rows != n_sites) { + stop("ERROR: H5 files have ", actual_rows, " rows but ranges have ", n_sites, + " sites. 
Mismatch!") +} + +message("Validated: H5 dimensions match genomic ranges (", actual_rows, " sites)") + +# Create a combined count matrix by reading each sample one at a time +count_matrix <- matrix(0, nrow = n_sites, ncol = n_samples) +colnames(count_matrix) <- sample_ids + +total_counts <- 0 +for(i in seq_along(sample_ids)) { + sid <- sample_ids[i] + h5_file <- file.path(out_dir, paste0("nonSplicedCounts-", sid, ".h5")) + + if(!file.exists(h5_file)) { + warning("H5 file not found for sample ", sid, ", using zeros") + next + } + + message(" Reading [", i, "/", n_samples, "]: ", sid) + + # Read the counts data for this sample only + counts <- h5read(h5_file, "nonSplicedCounts") + if(is.matrix(counts)) { + counts <- counts[, 1] + } + + count_matrix[, i] <- as.vector(counts) + total_counts <- total_counts + sum(counts) +} + +message("Combined matrix dimensions: ", nrow(count_matrix), " x ", ncol(count_matrix)) +message("Total counts: ", total_counts) + +if(total_counts == 0) { + stop("ERROR: The merged non-split counts matrix is empty (all zeros).") +} + +# 6. 
Create HDF5-backed SummarizedExperiment +message("\n[6/6] Creating HDF5-backed SummarizedExperiment...") + +# Create SE with in-memory matrix +nonSplitCounts_se <- SummarizedExperiment( + assays = list(rawCountsSS = count_matrix), + rowRanges = non_split_ranges, + colData = DataFrame(sampleID = sample_ids, row.names = sample_ids) +) + +message("Non-split SE created with dimensions: ", nrow(nonSplitCounts_se), + " x ", ncol(nonSplitCounts_se)) + +# Annotate non-split counts with splice site IDs +message("Annotating non-split counts with splice site IDs...") +nonSplitCounts_se <- FRASER:::annotateSpliceSite(nonSplitCounts_se) + +# Save the SummarizedExperiment - this will automatically create HDF5 backend +message("Saving non-split HDF5 SummarizedExperiment...") +saveHDF5SummarizedExperiment(nonSplitCounts_se, dir = out_dir, replace = TRUE) + +# Reload it to get the proper HDF5-backed version +message("Reloading HDF5-backed version...") +non_split_counts <- loadHDF5SummarizedExperiment(dir = out_dir) + +# Final verification +if(!"rawCountsSS" %in% assayNames(non_split_counts)) { + stop("ERROR: 'rawCountsSS' assay not found in reloaded object.") +} + +message("Verification: Reloaded object has ", sum(assay(non_split_counts, "rawCountsSS")), " total counts") + +# 7. Configure Backend for FDS +message("\nConfiguring FRASER backend...") +bp <- MulticoreParam(workers = args$nthreads) +register(bp) + +# Set BAM file metadata (for validation purposes) +available_bams <- list.files("/io/batch/input_bams", pattern = "\\.bam$", full.names = TRUE) +if(length(available_bams) > 0) { + ref_bam <- available_bams[1] + message("Validating metadata against reference BAM: ", ref_bam) + colData(fds)$bamFile <- ref_bam + seqlevelsStyle(fds) <- seqlevelsStyle(Rsamtools::BamFile(ref_bam)) +} else { + warning("WARNING: No BAM files found in /io/batch/input_bams.") +} +strandSpecific(fds) <- 2 + +# 8. 
# Update FDS object with the merged non-split counts
message("\nUpdating FDS object with merged non-split counts...")
# Attach the HDF5-backed non-split SE to the dataset and persist it so the
# downstream fraser_join_counts / analysis stages can load it.
nonSplicedReads(fds) <- non_split_counts
fds <- saveFraserDataSet(fds)

message("\n=== Non-split merge complete! ===")
message("Final stats:")
message(" - Sites: ", nrow(non_split_counts))
message(" - Samples: ", ncol(non_split_counts))
message(" - Total counts: ", sum(assay(non_split_counts, "rawCountsSS")))
diff --git a/images/fraser/fraser_merge_split.R b/images/fraser/fraser_merge_split.R
new file mode 100644
index 00000000..13771414
--- /dev/null
+++ b/images/fraser/fraser_merge_split.R
@@ -0,0 +1,88 @@
#!/usr/bin/env Rscript
# fraser_merge_split.R
# Merge per-sample split (spliced) read counts for a cohort: loads the FDS
# created by fraser_init.R, merges the cached per-sample split counts,
# annotates splice sites, and writes the GRanges needed by later stages.

library(argparse)
library(FRASER)
library(BiocParallel)
library(SummarizedExperiment)
library(Rsamtools)
library(DelayedMatrixStats)
library(HDF5Array)


parser <- ArgumentParser(description = "Merge Split Read Counts")
parser$add_argument("--fds_path", required = TRUE, help = "Path to FDS RDS file")
parser$add_argument("--cohort_id", required = TRUE, help = "Cohort ID")
parser$add_argument("--work_dir", default = "/io/work", help = "Working directory")
parser$add_argument("--nthreads", type = "integer", default = 1, help = "Number of threads")
args <- parser$parse_args()

# 1. Reconstruct Directory Structure
# FRASER expects {work_dir}/savedObjects/FRASER_<cohort>/fds-object.RDS, so
# copy the staged RDS into that layout before loading.
fds_name <- paste0("FRASER_", args$cohort_id)
save_dir <- file.path(args$work_dir, "savedObjects", fds_name)
dir.create(save_dir, recursive = TRUE, showWarnings = FALSE)
file.copy(args$fds_path, file.path(save_dir, "fds-object.RDS"), overwrite = TRUE)

# 2. Configure Backend
# Force HDF5 backing regardless of cohort size (0 samples / -1 junctions
# thresholds mean "never keep counts purely in memory").
options("FRASER.maxSamplesNoHDF5" = 0)
options("FRASER.maxJunctionsNoHDF5" = -1)


bp <- MulticoreParam(workers = args$nthreads)
register(bp)

# 3.
# Load Dataset
# Match the name exactly as defined in the init stage
fds <- loadFraserDataSet(dir = args$work_dir, name = fds_name)


available_bams <- list.files("/io/batch/input_bams", pattern = "\\.bam$", full.names = TRUE)

if(length(available_bams) > 0){
  message("Using reference BAM for metadata: ", available_bams[1])

  # Force-update colData for ALL samples to point to a valid file.
  # This prevents the 'dummy.bam' error during internal validation.
  new_colData <- colData(fds)
  new_colData$bamFile <- available_bams[1]
  colData(fds) <- new_colData

  # Manually assert the seqlevelsStyle (e.g., 'UCSC' or 'Ensembl')
  # This satisfies the internal check that is currently crashing.
  seqlevelsStyle(fds) <- seqlevelsStyle(BamFile(available_bams[1]))
} else {
  stop("CRITICAL ERROR: No BAM files found in /io/batch/input_bams.")
}
# 2 = reverse-stranded protocol — TODO(review) confirm; must match the value
# used in the non-split merge script so both count sets agree.
strandSpecific(fds) <- 2
# --------------------------------------------
# Minimum split-read count that at least one sample must reach for a junction
# to be kept for non-split counting (filter applied below).
minExpressionInOneSample <- 20
# 4. Merge individual split count RDS files from the cache
# FRASER automatically looks in: {work_dir}/cache/splitCounts/
# recount=FALSE means: reuse the per-sample cached counts, never re-read BAMs.
message("Merging split counts from cache...")
splitCounts <- getSplitReadCountsForAllSamples(fds=fds, recount=FALSE)

# --- CRITICAL FIX: Annotate the object ITSELF, not a temporary variable ---
# This generates startID and endID and attaches them to the rowRanges
# NOTE(review): FRASER:::annotateSpliceSite is package-internal; re-verify on
# FRASER version bumps.
message("Generating Splice Site IDs...")
rowRanges(splitCounts) <- FRASER:::annotateSpliceSite(rowRanges(splitCounts))

# 5. Now save the Annotated SE
message("Saving merged split counts SummarizedExperiment with IDs...")
split_counts_dir <- file.path(save_dir, "splitCounts")
dir.create(split_counts_dir, recursive = TRUE, showWarnings = FALSE)
saveHDF5SummarizedExperiment(splitCounts, dir = split_counts_dir, replace = TRUE)

# 6. Extract coordinates for the next pipeline steps
# We use the rowRanges that NOW HAVE the startID/endID columns
splitCountRangesNonFilt <- rowRanges(splitCounts)

# Per-junction maximum across samples (DelayedMatrixStats::rowMaxs works on
# the HDF5-backed assay without materialising it).
maxCount <- rowMaxs(assay(splitCounts, "rawCountsJ"))
passed <- maxCount >= minExpressionInOneSample
# extract granges after filtering
splitCountRanges <- splitCountRangesNonFilt[passed,]
# Splice-site coordinates derived from the FILTERED junctions; these drive the
# non-split counting stage.
# NOTE(review): internal extractSpliceSiteCoordinates() has taken an fds
# argument in some FRASER releases — confirm the one-argument call is valid
# for the pinned version.
spliceSiteCoords <- FRASER:::extractSpliceSiteCoordinates(splitCountRanges)


# Use absolute paths for saving to match Python 'mv' commands
# NOTE(review): the FILTERED split ranges are deliberately saved under the
# "non_split" name — they define the regions used for non-split counting
# downstream (fraser_count_non_split.R). Not a copy-paste swap.
saveRDS(splitCountRangesNonFilt, file.path(args$work_dir, "g_ranges_split_counts.RDS"))
saveRDS(splitCountRanges, file.path(args$work_dir, "g_ranges_non_split_counts.RDS"))
saveRDS(spliceSiteCoords, file.path(args$work_dir, "splice_site_coords.RDS"))