From 4dfee7ab9cb1104c01dd807584eb46d87cdce76c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=AE=87=E4=BA=AE?= Date: Tue, 12 Aug 2025 17:47:22 +0800 Subject: [PATCH] parallel support to speed up on large dataset function **process_abundance_expression_info** and **get_DE_info now**support multi-threads couputation by pass params BPPARAM = BPPARAM. --- R/expression_processing.R | 14 +++---- R/muscat_de.R | 6 +-- R/pipeline_wrappers.R | 78 ++++++++++++--------------------------- 3 files changed, 33 insertions(+), 65 deletions(-) diff --git a/R/expression_processing.R b/R/expression_processing.R index cc8c187..84994b0 100644 --- a/R/expression_processing.R +++ b/R/expression_processing.R @@ -99,7 +99,7 @@ get_muscat_exprs_frac = function(sce, sample_id, celltype_id, group_id){ #' #' @export #' -get_muscat_exprs_avg = function(sce, sample_id, celltype_id, group_id){ +get_muscat_exprs_avg = function(sce, sample_id, celltype_id, group_id,...){ requireNamespace("dplyr") @@ -121,7 +121,7 @@ get_muscat_exprs_avg = function(sce, sample_id, celltype_id, group_id){ # sce = suppressWarnings(scater::logNormCounts(sce)) } - avg = muscat::aggregateData(sce, assay = "logcounts", fun = "mean", by = c("cluster_id", "sample_id")) + avg = muscat::aggregateData(sce, assay = "logcounts", fun = "mean", by = c("cluster_id", "sample_id"),...) avg_df = sce$cluster_id %>% unique() %>% lapply(function(celltype_oi, avg){ avg_celltype = avg@assays@data[[celltype_oi]] @@ -162,7 +162,7 @@ get_muscat_exprs_avg = function(sce, sample_id, celltype_id, group_id){ #' #' @export #' -get_pseudobulk_logCPM_exprs = function(sce, sample_id, celltype_id, group_id, batches = NA, assay_oi_pb = "counts", fun_oi_pb = "sum"){ +get_pseudobulk_logCPM_exprs = function(sce, sample_id, celltype_id, group_id, batches = NA, assay_oi_pb = "counts", fun_oi_pb = "sum",...){ requireNamespace("dplyr") @@ -174,7 +174,7 @@ get_pseudobulk_logCPM_exprs = function(sce, sample_id, celltype_id, group_id, ba sid = "id", # sample IDs (ctrl/stim.1234) drop = FALSE) # - pb = muscat::aggregateData(sce, assay = assay_oi_pb, fun = fun_oi_pb, by = c("cluster_id", "sample_id")) + pb = muscat::aggregateData(sce, assay = assay_oi_pb, fun = fun_oi_pb, by = c("cluster_id", "sample_id"),...) # Prepare the design (group and batch effects) for the combat correction if(length(batches) > 1){ @@ -363,7 +363,7 @@ fix_frq_df = function(sce, frq_celltype_samples){ #' #' @export #' -get_avg_pb_exprs = function(sce, sample_id, celltype_id, group_id, batches = NA, min_cells = 10){ +get_avg_pb_exprs = function(sce, sample_id, celltype_id, group_id, batches = NA, min_cells = 10,...){ requireNamespace("dplyr") @@ -429,10 +429,10 @@ get_avg_pb_exprs = function(sce, sample_id, celltype_id, group_id, batches = NA, ## calculate averages, fractions, relative abundance of a cell type in a group # calculate average expression - avg_df = get_muscat_exprs_avg(sce, sample_id = sample_id, celltype_id = celltype_id, group_id = group_id) + avg_df = get_muscat_exprs_avg(sce, sample_id = sample_id, celltype_id = celltype_id, group_id = group_id,...) # calculate pseudobulked counts - pb_df = get_pseudobulk_logCPM_exprs(sce, sample_id = sample_id, celltype_id = celltype_id, group_id = group_id, batches = batches, assay_oi_pb = "counts", fun_oi_pb = "sum") # should be these parameters + pb_df = get_pseudobulk_logCPM_exprs(sce, sample_id = sample_id, celltype_id = celltype_id, group_id = group_id, batches = batches, assay_oi_pb = "counts", fun_oi_pb = "sum",...) # should be these parameters # check whether something needs to be fixed if(nrow(avg_df %>% dplyr::filter(is.na(average_sample))) > 0 | nrow(avg_df %>% dplyr::filter(is.nan(average_sample))) > 0) { diff --git a/R/muscat_de.R b/R/muscat_de.R index ebc1db1..77c4b00 100644 --- a/R/muscat_de.R +++ b/R/muscat_de.R @@ -48,7 +48,7 @@ #' @export #' #' -perform_muscat_de_analysis = function(sce, sample_id, celltype_id, group_id, batches, covariates, contrasts, expressed_df, assay_oi_pb = "counts", fun_oi_pb = "sum", de_method_oi = "edgeR", min_cells = 10){ +perform_muscat_de_analysis = function(sce, sample_id, celltype_id, group_id, batches, covariates, contrasts, expressed_df, assay_oi_pb = "counts", fun_oi_pb = "sum", de_method_oi = "edgeR", min_cells = 10,...){ requireNamespace("dplyr") if (class(sce) != "SingleCellExperiment") { @@ -209,7 +209,7 @@ perform_muscat_de_analysis = function(sce, sample_id, celltype_id, group_id, bat pb = muscat::aggregateData(sce, assay = assay_oi_pb, fun = fun_oi_pb, - by = c("cluster_id", "sample_id")) + by = c("cluster_id", "sample_id"),...) if(assay_oi_pb == "counts"){ libsizes = colSums(SummarizedExperiment::assay(pb)) @@ -306,7 +306,7 @@ perform_muscat_de_analysis = function(sce, sample_id, celltype_id, group_id, bat pb = pb[genes_to_keep, ] # run DS analysis - res = muscat::pbDS(pb, method = de_method_oi , design = design, contrast = contrast, min_cells = min_cells, verbose = FALSE, filter = "none") + res = muscat::pbDS(pb, method = de_method_oi , design = design, contrast = contrast, min_cells = min_cells, verbose = FALSE, filter = "none",...) de_output_tidy = muscat::resDS(sce, res, bind = "row", cpm = FALSE, frq = FALSE) %>% tibble::as_tibble() %>% dplyr::rename(p_adj = p_adj.glb) # # check which cell types were excluded diff --git a/R/pipeline_wrappers.R b/R/pipeline_wrappers.R index c310699..7178346 100644 --- a/R/pipeline_wrappers.R +++ b/R/pipeline_wrappers.R @@ -282,7 +282,7 @@ get_abundance_info = function(sce, sample_id, group_id, celltype_id, min_cells = #' #' @export #' -process_abundance_expression_info = function(sce, sample_id, group_id, celltype_id, min_cells = 10, senders_oi, receivers_oi, lr_network, batches = NA, frq_list, abundance_info){ +process_abundance_expression_info = function(sce, sample_id, group_id, celltype_id, min_cells = 10, senders_oi, receivers_oi, lr_network, batches = NA, frq_list, abundance_info,...){ requireNamespace("dplyr") requireNamespace("ggplot2") @@ -329,8 +329,7 @@ process_abundance_expression_info = function(sce, sample_id, group_id, celltype_ sample_id = sample_id, celltype_id = celltype_id, group_id = group_id, - batches = batches, - min_cells = min_cells)) + batches = batches,...)) celltype_info$frq_df = frq_list$frq_df celltype_info$frq_df_group = frq_list$frq_df_group @@ -405,7 +404,7 @@ process_abundance_expression_info = function(sce, sample_id, group_id, celltype_ #' @export #' #' -get_DE_info = function (sce, sample_id, group_id, celltype_id, batches, covariates, contrasts_oi, expressed_df, min_cells = 10, assay_oi_pb = "counts", fun_oi_pb = "sum", de_method_oi = "edgeR", findMarkers = FALSE, contrast_tbl = NULL) { +get_DE_info = function (sce, sample_id, group_id, celltype_id, batches, covariates, contrasts_oi, expressed_df, min_cells = 10, assay_oi_pb = "counts", fun_oi_pb = "sum", de_method_oi = "edgeR", findMarkers = FALSE, contrast_tbl = NULL,...) { requireNamespace("dplyr") requireNamespace("ggplot2") if (class(sce) != "SingleCellExperiment") { @@ -571,61 +570,30 @@ get_DE_info = function (sce, sample_id, group_id, celltype_id, batches, covariat } celltypes = SummarizedExperiment::colData(sce)[, celltype_id] %>% unique() - - #DE_list = celltypes %>% lapply(function(celltype_oi, sce) { - # sce_oi = sce[, SummarizedExperiment::colData(sce)[, celltype_id] == - # celltype_oi] - # DE_result = tryCatch({ - # perform_muscat_de_analysis(sce = sce_oi, sample_id = sample_id, - # celltype_id = celltype_id, group_id = group_id, - # batches = batches, covariates = covariates, contrasts = contrasts_oi, - # expressed_df = expressed_df, assay_oi_pb = assay_oi_pb, - # fun_oi_pb = fun_oi_pb, de_method_oi = de_method_oi, - # min_cells = min_cells) - # }, error = function(cond) { - # message(paste0("perform_muscat_de_analysis errored for celltype: ", - # celltype_oi)) - # message("Here's the original error message:") - # message(cond) - # message("") - # print(cond) - # message(paste0("perform_muscat_de_analysis errored for celltype: ", - # celltype_oi)) - # message("") - # print("In case: Error in x[[1]]: subscript out of bounds: this likely means that there are not enough samples per group with sufficient cells of this cell type. This cell type will thus be ignored for further analyses, other cell types will still be considered.") - # return(NA) - # }) - #}, sce) - - DE_list <- celltypes %>% lapply(function(celltype_oi, sce) { - sce_oi <- sce[, SummarizedExperiment::colData(sce)[[celltype_id]] == celltype_oi] - - DE_result <- tryCatch({ - result <- perform_muscat_de_analysis( - sce = sce_oi, - sample_id = sample_id, - celltype_id = celltype_id, - group_id = group_id, - batches = batches, - covariates = covariates, - contrasts = contrasts_oi, - expressed_df = expressed_df, - assay_oi_pb = assay_oi_pb, - fun_oi_pb = fun_oi_pb, - de_method_oi = de_method_oi, - min_cells = min_cells - ) - message(paste0("Success for cell type: ", celltype_oi)) - result + DE_list = celltypes %>% lapply(function(celltype_oi, sce) { + sce_oi = sce[, SummarizedExperiment::colData(sce)[, celltype_id] == + celltype_oi] + DE_result = tryCatch({ + perform_muscat_de_analysis(sce = sce_oi, sample_id = sample_id, + celltype_id = celltype_id, group_id = group_id, + batches = batches, covariates = covariates, contrasts = contrasts_oi, + expressed_df = expressed_df, assay_oi_pb = assay_oi_pb, + fun_oi_pb = fun_oi_pb, de_method_oi = de_method_oi, + min_cells = min_cells,...) }, error = function(cond) { - message(paste0("❌ Error for cell type: ", celltype_oi)) - message("Error message: ", conditionMessage(cond)) - message("In case: Error in x[[1]]: subscript out of bounds: this likely means that there are not enough samples per group with sufficient cells of this cell type. This cell type will thus be ignored for further analyses, other cell types will still be considered.") + message(paste0("perform_muscat_de_analysis errored for celltype: ", + celltype_oi)) + message("Here's the original error message:") + message(cond) + message("") + print(cond) + message(paste0("perform_muscat_de_analysis errored for celltype: ", + celltype_oi)) + message("") + print("In case: Error in x[[1]]: subscript out of bounds: this likely means that there are not enough samples per group with sufficient cells of this cell type. This cell type will thus be ignored for further analyses, other cell types will still be considered.") return(NA) }) - return(DE_result) }, sce) - celltype_de = list(de_output = c(DE_list %>% purrr::map("de_output")), de_output_tidy = DE_list %>% purrr::map("de_output_tidy") %>% bind_rows())