Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VS-1466 restructure export vcf for copy reasons #8960

Merged
merged 10 commits into from
Aug 28, 2024
137 changes: 80 additions & 57 deletions scripts/variantstore/wdl/GvsExtractCallset.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ workflow GvsExtractCallset {
String interval_filename = basename(SplitIntervals.interval_files[i])
String vcf_filename = if (zero_pad_output_vcf_filenames) then sub(interval_filename, ".interval_list", "") else "~{output_file_base_name}_${i}"


call ExtractTask {
input:
go = select_first([ValidateFilterSetName.done, true]),
Expand Down Expand Up @@ -245,7 +244,6 @@ workflow GvsExtractCallset {
filter_set_name = filter_set_name,
drop_state = drop_state,
output_file = vcf_filename + vcf_extension,
output_gcs_dir = output_gcs_dir,
max_last_modified_timestamp = GetBQTablesMaxLastModifiedTimestamp.max_last_modified_timestamp,
extract_preemptible_override = extract_preemptible_override,
extract_maxretries_override = extract_maxretries_override,
Expand All @@ -259,6 +257,7 @@ workflow GvsExtractCallset {
maximum_alternate_alleles = maximum_alternate_alleles,
target_interval_list = target_interval_list,
}

}

call SumBytes {
Expand All @@ -267,9 +266,13 @@ workflow GvsExtractCallset {
cloud_sdk_docker = effective_cloud_sdk_docker,
}

call CreateManifest {
call CreateManifestAndOptionallyCopyOutputs {
input:
manifest_lines = ExtractTask.manifest,
interval_indices = ExtractTask.interval_number,
output_vcfs = ExtractTask.output_vcf,
output_vcf_indices = ExtractTask.output_vcf_index,
output_vcf_bytes = ExtractTask.output_vcf_bytes,
output_vcf_index_bytes = ExtractTask.output_vcf_index_bytes,
output_gcs_dir = output_gcs_dir,
cloud_sdk_docker = effective_cloud_sdk_docker,
}
Expand All @@ -295,7 +298,7 @@ workflow GvsExtractCallset {
Array[File] output_vcf_indexes = ExtractTask.output_vcf_index
Array[File] output_vcf_interval_files = SplitIntervals.interval_files
Float total_vcfs_size_mb = SumBytes.total_mb
File manifest = CreateManifest.manifest
File manifest = CreateManifestAndOptionallyCopyOutputs.manifest
File sample_name_list = GenerateSampleListFile.sample_name_list
String recorded_git_hash = effective_git_hash
Boolean done = true
Expand Down Expand Up @@ -328,7 +331,6 @@ task ExtractTask {
String? vet_extract_table_version
String read_project_id
String output_file
String? output_gcs_dir

String cost_observability_tablename = "cost_observability"

Expand Down Expand Up @@ -448,28 +450,9 @@ task ExtractTask {
-V ${pre_off_target_vcf}
fi

# Drop trailing slash if one exists
OUTPUT_GCS_DIR=$(echo ~{output_gcs_dir} | sed 's/\/$//')
du -b ~{output_file} | cut -f1 > vcf_bytes.txt
du -b ~{output_file}.tbi | cut -f1 > vcf_index_bytes.txt

OUTPUT_FILE_BYTES="$(du -b ~{output_file} | cut -f1)"
echo ${OUTPUT_FILE_BYTES} > vcf_bytes.txt

OUTPUT_FILE_INDEX_BYTES="$(du -b ~{output_file}.tbi | cut -f1)"
echo ${OUTPUT_FILE_INDEX_BYTES} > vcf_index_bytes.txt

if [ -n "${OUTPUT_GCS_DIR}" ]; then
gsutil cp ~{output_file} ${OUTPUT_GCS_DIR}/
gsutil cp ~{output_file}.tbi ${OUTPUT_GCS_DIR}/
OUTPUT_FILE_DEST="${OUTPUT_GCS_DIR}/~{output_file}"
OUTPUT_FILE_INDEX_DEST="${OUTPUT_GCS_DIR}/~{output_file}.tbi"
else
OUTPUT_FILE_DEST="~{output_file}"
OUTPUT_FILE_INDEX_DEST="~{output_file}.tbi"
fi

# Parent Task will collect manifest lines and create a joined file
# Currently, the schema is `[interval_number], [output_file_location], [output_file_size_bytes], [output_file_index_location], [output_file_index_size_bytes]`
echo ~{interval_index},${OUTPUT_FILE_DEST},${OUTPUT_FILE_BYTES},${OUTPUT_FILE_INDEX_DEST},${OUTPUT_FILE_INDEX_BYTES} >> manifest.txt
>>>
runtime {
docker: gatk_docker
Expand All @@ -484,53 +467,98 @@ task ExtractTask {

# File sizes are Floats instead of Ints because the byte counts can be larger than an Int can hold
output {
Int interval_number = interval_index
File output_vcf = "~{output_file}"
Float output_vcf_bytes = read_float("vcf_bytes.txt")
File output_vcf_index = "~{output_file}.tbi"
Float output_vcf_index_bytes = read_float("vcf_index_bytes.txt")
String manifest = read_string("manifest.txt")
File monitoring_log = "monitoring.log"
}
}

task SumBytes {
task CreateManifestAndOptionallyCopyOutputs {
input {
Array[Float] file_sizes_bytes
Array[Int] interval_indices
Array[File] output_vcfs
Array[File] output_vcf_indices
Array[Float] output_vcf_bytes
Array[Float] output_vcf_index_bytes
String? output_gcs_dir
String cloud_sdk_docker
}
meta {
# Not `volatile: true` since there shouldn't be a need to re-run this if there has already been a successful execution.
output_vcfs: {
localization_optional: true
}
output_vcf_indices: {
localization_optional: true
}
}

command <<<
# Prepend date, time and pwd to xtrace log entries.
PS4='\D{+%F %T} \w $ '
set -o errexit -o nounset -o pipefail -o xtrace

echo "~{sep=" " file_sizes_bytes}" | tr " " "\n" | python3 -c "
import sys;
total_bytes = sum(float(i.strip()) for i in sys.stdin);
total_mb = total_bytes/10**6;
print(total_mb);"
# Drop trailing slash if one exists
OUTPUT_GCS_DIR=$(echo ~{output_gcs_dir} | sed 's/\/$//')

declare -a interval_indices=(~{sep=' ' interval_indices})
declare -a output_vcfs=(~{sep=' ' output_vcfs})
declare -a output_vcf_indices=(~{sep=' ' output_vcf_indices})
declare -a output_vcf_bytes=(~{sep=' ' output_vcf_bytes})
declare -a output_vcf_index_bytes=(~{sep=' ' output_vcf_index_bytes})

echo -n >> manifest_lines.txt
for (( i=0; i<${#interval_indices[@]}; ++i));
do
echo "Interval " + $i

OUTPUT_VCF=${output_vcfs[$i]}
LOCAL_VCF=$(basename $OUTPUT_VCF)
OUTPUT_VCF_INDEX=${output_vcf_indices[$i]}
LOCAL_VCF_INDEX=$(basename $OUTPUT_VCF_INDEX)

if [ -n "${OUTPUT_GCS_DIR}" ]; then
gsutil cp $OUTPUT_VCF ${OUTPUT_GCS_DIR}/
gsutil cp $OUTPUT_VCF_INDEX ${OUTPUT_GCS_DIR}/
OUTPUT_FILE_DEST="${OUTPUT_GCS_DIR}/$LOCAL_VCF"
OUTPUT_FILE_INDEX_DEST="${OUTPUT_GCS_DIR}/$LOCAL_VCF_INDEX"
else
OUTPUT_FILE_DEST=$LOCAL_VCF
OUTPUT_FILE_INDEX_DEST=$LOCAL_VCF_INDEX
fi

echo ${interval_indices[$i]},${OUTPUT_FILE_DEST},${output_vcf_bytes[$i]},${OUTPUT_FILE_INDEX_DEST},${output_vcf_index_bytes[$i]} >> manifest_lines.txt

done;

echo "vcf_file_location, vcf_file_bytes, vcf_index_location, vcf_index_bytes" >> manifest.txt
sort -n manifest_lines.txt | cut -d',' -f 2- >> manifest.txt

if [ -n "$OUTPUT_GCS_DIR" ]; then
gsutil cp manifest.txt ${OUTPUT_GCS_DIR}/
fi
>>>
output {
File manifest_lines = "manifest_lines.txt"
File manifest = "manifest.txt"
}

runtime {
docker: cloud_sdk_docker
memory: "3 GB"
disks: "local-disk 500 HDD"
preemptible: 3
cpu: 1
}

output {
Float total_mb = read_float(stdout())
}
}

task CreateManifest {
task SumBytes {
input {
Array[String] manifest_lines
String? output_gcs_dir
String cloud_sdk_docker
Array[Float] file_sizes_bytes
String cloud_sdk_docker
}
meta {
# Not `volatile: true` since there shouldn't be a need to re-run this if there has already been a successful execution.
Expand All @@ -541,28 +569,23 @@ task CreateManifest {
PS4='\D{+%F %T} \w $ '
set -o errexit -o nounset -o pipefail -o xtrace

MANIFEST_LINES_TXT=~{write_lines(manifest_lines)}
echo "vcf_file_location, vcf_file_bytes, vcf_index_location, vcf_index_bytes" >> manifest.txt
sort -n ${MANIFEST_LINES_TXT} | cut -d',' -f 2- >> manifest.txt

# Drop trailing slash if one exists
OUTPUT_GCS_DIR=$(echo ~{output_gcs_dir} | sed 's/\/$//')

if [ -n "$OUTPUT_GCS_DIR" ]; then
gsutil cp manifest.txt ${OUTPUT_GCS_DIR}/
fi
echo "~{sep=" " file_sizes_bytes}" | tr " " "\n" | python3 -c "
import sys;
total_bytes = sum(float(i.strip()) for i in sys.stdin);
total_mb = total_bytes/10**6;
print(total_mb);"
>>>
output {
File manifest = "manifest.txt"
}

runtime {
docker: cloud_sdk_docker
memory: "3 GB"
disks: "local-disk 500 HDD"
preemptible: 3
cpu: 1
}

output {
Float total_mb = read_float(stdout())
}
}

task GenerateSampleListFile {
Expand Down
Loading