Skip to content

Commit

Permalink
Merge pull request #113 from rajewsky-lab/fix-star-shm-concurrency
Browse files Browse the repository at this point in the history
Fix star shm concurrency
  • Loading branch information
nukappa authored Apr 25, 2024
2 parents 6b7b50d + a854c0c commit 41c65ef
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 33 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
copyright = '2021-2024, Rajewsky Lab'
author = 'Tamas Ryszard Sztanka-Toth, Marvin Jens, Nikos Karaiskos, Nikolaus Rajewsky'

version = '0.7.8'
version = '0.7.9'
release = version

# -- General configuration
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = spacemake
version = 0.7.8
version = 0.7.9
author = Tamas Ryszard Sztanka-Toth, Marvin Jens, Nikos Karaiskos, Nikolaus Rajewsky
author_email = TamasRyszard.Sztanka-Toth@mdc-berlin.de
description = A bioinformatic pipeline for the analysis of spatial transcriptomic data
Expand Down
2 changes: 1 addition & 1 deletion spacemake/smk.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ def spacemake_run(pdf, args):
configfiles=[config_path],
cores=args["cores"],
dryrun=args["dryrun"],
targets=['get_stats_prealigned_barcodes'],
targets=['get_stats_prealigned_barcodes', 'unload_genome_flag'],
touch=args["touch"],
force_incomplete=args["rerun_incomplete"],
keepgoing=args["keep_going"],
Expand Down
77 changes: 47 additions & 30 deletions spacemake/snakemake/mapping.smk
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import tempfile
import uuid

"""
This module implements the mapping-strategy feature. This gives the freedom to define
Expand Down Expand Up @@ -39,8 +40,10 @@ bt2_rRNA_log = complete_data_root + "/rRNA.bowtie2.bam.log"
star_index = 'species_data/{species}/{ref_name}/star_index'
star_index_param = star_index
star_index_file = star_index + '/SAindex'
star_index_loaded = star_index + '/genomeLoad.done'
star_index_unloaded = star_index + '/genomeUnload.done'
star_index_locked = star_index + '/smk.indexlocked.{species}.{ref_name}'
star_index_locked_current = star_index_locked + f'.{uuid.uuid4()}'
star_index_loaded = '{species}.{ref_name}.genomeLoad.done'
star_index_unloaded = '{species}.{ref_name}.genomeUnload.done'
star_index_log_location = 'species_data/{species}/{ref_name}/.star_index_logs'

bt2_index = 'species_data/{species}/{ref_name}/bt2_index'
Expand Down Expand Up @@ -228,6 +231,26 @@ def mapstr_to_targets(mapstr, left="uBAM", final="final"):
return map_rules, link_rules


def get_star_flag(flag, default_strategy="STAR:genome:final"):
    """
    Collect the flag files of every STAR index used by a non-merged sample.

    Each row of ``project_df.df`` is inspected: its mapping strategy
    (``default_strategy`` when the row has no ``map_strategy`` attribute) is
    turned into map rules, and for every rule whose mapper is ``"STAR"`` the
    ``flag`` pattern is expanded with the row's species and the rule's
    reference name.

    :param flag: path pattern with ``{species}`` and ``{ref_name}`` wildcards
    :param default_strategy: mapping strategy used when a row defines none
    :returns: set of expanded flag paths (duplicates removed)
    """
    flag_files = set()

    for sample_key, sample in project_df.df.iterrows():
        strategy = getattr(sample, "map_strategy", default_strategy)
        star_candidates, _links = mapstr_to_targets(
            strategy, left=ubam_input, final=final_target
        )

        # merged samples contribute no flag files of their own
        if project_df.get_metadata(
            "is_merged", project_id=sample_key[0], sample_id=sample_key[1]
        ):
            continue

        for map_rule in star_candidates:
            if map_rule.mapper != "STAR":
                continue
            flag_files.update(
                expand(flag, species=sample.species, ref_name=map_rule.ref_name)
            )

    return flag_files



def get_mapped_BAM_output(default_strategy="STAR:genome:final"):
"""
This function is called from main.smk at least once
Expand Down Expand Up @@ -456,7 +479,8 @@ rule map_reads_STAR:
input:
# bam=lambda wc: BAM_DEP_LKUP.get(wc_fill(star_mapped_bam, wc), f"can't_find_bam_{wc}"),
# index=lambda wc: BAM_IDX_LKUP.get(wc_fill(star_mapped_bam, wc), f"can't find_idx_{wc}"),
unpack(get_map_inputs)
unpack(get_map_inputs),
loaded_flag=get_star_flag(star_index_loaded)
# bam=lambda wc: BAM_DEP_LKUP.get(wc_fill(star_mapped_bam, wc), f"can't_find_bam_{wc}"),
# index=lambda wc: BAM_IDX_LKUP.get(wc_fill(star_mapped_bam, wc), f"can't find_idx_{wc}"),
output:
Expand Down Expand Up @@ -559,43 +583,36 @@ rule load_genome:
star_index_file
output:
temp(touch(star_index_loaded)),
temp(directory(star_index_log_location))
params:
f_locked_current=lambda wc: expand(star_index_locked_current, ref_name=wc.ref_name, species=wc.species),
log_dir=lambda wc: expand(star_index_log_location, ref_name=wc.ref_name, species=wc.species)
shell:
"""
STAR --genomeLoad Remove --genomeDir {input[0]} --outFileNamePrefix {output[1]}/ || echo "Could not remove shared memory genome for {input[0]}"
STAR --genomeLoad LoadAndExit --genomeDir {input[0]} --outFileNamePrefix {output[1]}/
touch {params.f_locked_current}
STAR --genomeLoad LoadAndExit --genomeDir {input[0]} --outFileNamePrefix {params.log_dir}/ || echo "Could not load genome into shared memory for {input[0]} - maybe already loaded"
"""

def get_star_unloaded_flag(default_strategy="STAR:genome:final"):
    """
    Collect the ``star_index_unloaded`` flag file of every STAR index that is
    used by a non-merged sample.

    :param default_strategy: mapping strategy used for rows of
        ``project_df.df`` that have no ``map_strategy`` attribute
    :returns: set of expanded ``star_index_unloaded`` paths
    """
    unloaded_flags = []

    for sample_key, sample in project_df.df.iterrows():
        strategy = getattr(sample, "map_strategy", default_strategy)
        sample_rules, _links = mapstr_to_targets(
            strategy, left=ubam_input, final=final_target
        )

        merged = project_df.get_metadata(
            "is_merged", project_id=sample_key[0], sample_id=sample_key[1]
        )
        if not merged:
            # only STAR-mapped references have a shared-memory index flag
            for map_rule in sample_rules:
                if map_rule.mapper == "STAR":
                    unloaded_flags.extend(
                        expand(
                            star_index_unloaded,
                            species=sample.species,
                            ref_name=map_rule.ref_name,
                        )
                    )

    return set(unloaded_flags)

register_module_output_hook(get_star_unloaded_flag, "mapping.smk")

# Aggregate rule: it declares no output and no action of its own, it only
# requests every flag file returned by get_star_flag(star_index_unloaded).
rule unload_genome_flag:
input:
get_star_flag(star_index_unloaded)

# Remove the STAR genome of one {species}/{ref_name} index from shared memory.
# The shell section deletes this run's lock file (f_locked_current) and checks
# whether any other file starting with the f_locked prefix is left; if so the
# genome is kept, otherwise `STAR --genomeLoad Remove` is called.
# The mapped BAM files are listed as input, so presumably this rule is meant to
# run after mapping has finished - TODO confirm.
# NOTE(review): this listing contains two `bams=` entries and two
# `STAR --genomeLoad Remove` calls; these look like the pre- and post-change
# lines of the diff shown together - confirm against the checked-out file.
rule unload_genome:
input:
bams=get_mapped_BAM_output(),
loaded_flag=star_index_loaded,
bams=ancient(get_mapped_BAM_output()),
index_dir=star_index, # we put last so it is accessible
output:
temp(touch(star_index_unloaded)),
temp(directory(star_index_log_location))
params:
f_locked=lambda wc: expand(star_index_locked, ref_name=wc.ref_name, species=wc.species),
f_locked_current=lambda wc: expand(star_index_locked_current, ref_name=wc.ref_name, species=wc.species),
log_dir=lambda wc: expand(star_index_log_location, ref_name=wc.ref_name, species=wc.species)
shell:
"""
STAR --genomeLoad Remove --genomeDir {input.index_dir} --outFileNamePrefix {output[1]}/
rm {params.f_locked_current}
if ls {params.f_locked}* 1> /dev/null 2>&1;
then
echo 'There are other tasks waiting for the STAR shared memory index. Not removing from {params.f_locked_current}'
else
STAR --genomeLoad Remove --genomeDir {input.index_dir} --outFileNamePrefix {params.log_dir}/ || echo "Could not remove genome from shared memory for {input[0]}"
fi
"""

0 comments on commit 41c65ef

Please sign in to comment.