diff --git a/src/index_registry.cpp b/src/index_registry.cpp index cd23c75400c..2921262405e 100644 --- a/src/index_registry.cpp +++ b/src/index_registry.cpp @@ -222,7 +222,7 @@ vector each_approx_graph_memory(const vector& fasta_filenames, auto n = max(fasta_filenames.size(), vcf_filenames.size()); assert(fasta_filenames.size() == 1 || fasta_filenames.size() == n); - assert(vcf_filenames.size() == 1 || vcf_filenames.size() == n); + assert(vcf_filenames.empty() || vcf_filenames.size() == 1 || vcf_filenames.size() == n); double total_ref_size = 0; vector ref_sizes(fasta_filenames.size()); @@ -243,15 +243,21 @@ vector each_approx_graph_memory(const vector& fasta_filenames, for (int64_t i = 0; i < n; ++i) { double ref_size, var_count; - if (vcf_filenames.size() == 1) { + if (vcf_filenames.empty()) { + var_count = 0; + } + else if (vcf_filenames.size() == 1) { var_count = total_var_count * (ref_sizes[i] / total_ref_size); } else { var_count = var_counts[i]; } - if (fasta_filenames.size() == 1) { + if (fasta_filenames.size() == 1 && total_var_count != 0.0) { ref_size = total_ref_size * (var_counts[i] / total_var_count); } + else if (fasta_filenames.size() == 1) { + ref_size = total_ref_size; + } else { ref_size = ref_sizes[i]; } @@ -561,6 +567,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing, + bool has_gff, bool phased_vcf) { if (IndexingParameters::verbosity != IndexingParameters::None) { @@ -571,31 +578,37 @@ IndexRegistry VGIndexes::get_vg_index_registry() { assert(inputs.size() == 2 || inputs.size() == 3); assert(constructing.size() == 2 || constructing.size() == 3); assert(constructing.size() == inputs.size()); - bool chunking_tx = inputs.size() == 3; vector fasta_filenames, vcf_filenames, tx_filenames; + bool has_vcf = inputs.size() == 3 || (inputs.size() == 2 && !has_gff); { int i = 0; - if (chunking_tx) { + if (has_gff) { tx_filenames = inputs[i++]->get_filenames(); } fasta_filenames = inputs[i++]->get_filenames(); - vcf_filenames = inputs[i++]->get_filenames(); + if (has_vcf) { + vcf_filenames = inputs[i++]->get_filenames(); + } } vector> all_outputs(constructing.size()); string output_fasta, output_vcf, output_tx; { auto it = constructing.begin(); - if (chunking_tx) { + if (has_gff) { output_tx = *it; ++it; } output_fasta = *it; ++it; - output_vcf = *it; + if (has_vcf) { + output_vcf = *it; + } } - auto& output_fasta_names = all_outputs[chunking_tx ? 1 : 0]; - auto& output_vcf_names = all_outputs[chunking_tx ? 2 : 1]; + auto& output_fasta_names = all_outputs[has_gff ? 1 : 0]; +#ifdef debug_index_registry_recipes + cerr << "chunking with vcf? " << has_vcf << ", with gff? " << has_gff << endl; +#endif // let's do this first, since it can detect data problems @@ -770,11 +783,12 @@ IndexRegistry VGIndexes::get_vg_index_registry() { ceil(IndexingParameters::thread_chunk_inflation_factor * num_threads)); int num_buckets = 0; int groups_without_bucket = contig_groups.size(); + size_t num_sample_groups = max(contig_groups.size(), 1); // buckets of contigs, grouped by sample groups - vector>> sample_group_buckets(contig_groups.size()); + vector>> sample_group_buckets(num_sample_groups); // records of (total length, bucket index), grouped by sample gorups vector, vector>, - greater>>> bucket_queues(contig_groups.size()); + greater>>> bucket_queues(num_sample_groups); while (!seq_queue.empty()) { int64_t length; string seq_name; @@ -877,8 +891,10 @@ IndexRegistry VGIndexes::get_vg_index_registry() { } #endif - if (buckets.size() == fasta_filenames.size() && buckets.size() == vcf_filenames.size() - && (tx_filenames.empty() || buckets.size() == tx_filenames.size())) { + + if (buckets.size() == fasta_filenames.size() + && (!has_vcf || buckets.size() == vcf_filenames.size()) + && (!has_gff || buckets.size() == tx_filenames.size())) { // it looks like we might have just recapitulated the original chunking, let's check to make sure // does each bucket come from exactly one FASTA file? @@ -898,23 +914,25 @@ IndexRegistry VGIndexes::get_vg_index_registry() { } } + // TODO: shouldn't I also check the correspondence on the input GTFs/VCFs? + if (all_buckets_match) { // there's no need for chunking, just alias them #ifdef debug_index_registry_recipes cerr << "chunking matches input files, no need to re-chunk" << endl; #endif - output_fasta_names = fasta_filenames; - output_vcf_names = vcf_filenames; - if (chunking_tx) { + if (has_gff) { all_outputs[0] = tx_filenames; alias_graph.register_alias(output_tx, inputs[0]); - alias_graph.register_alias(output_fasta, inputs[1]); - alias_graph.register_alias(output_vcf, inputs[2]); } - else { - alias_graph.register_alias(output_fasta, inputs[0]); - alias_graph.register_alias(output_vcf, inputs[1]); + + output_fasta_names = fasta_filenames; + alias_graph.register_alias(output_fasta, inputs[has_gff ? 1 : 0]); + + if (has_vcf) { + all_outputs[all_outputs.size() - 1] = vcf_filenames; + alias_graph.register_alias(output_vcf, inputs[inputs.size() - 1]); } return all_outputs; } @@ -925,7 +943,9 @@ IndexRegistry VGIndexes::get_vg_index_registry() { } output_fasta_names.resize(buckets.size()); - output_vcf_names.resize(buckets.size()); + if (has_vcf) { + all_outputs[all_outputs.size() - 1].resize(buckets.size()); + } // make FASTA sequences for each bucket // the threading here gets to be pretty simple because the fai allows random access @@ -998,489 +1018,494 @@ IndexRegistry VGIndexes::get_vg_index_registry() { } } - // see if we can identify any chunked VCFs that are identical to our input VCFs - // records of (input vcf index, bucket index) - vector> copiable_vcfs; - for (int64_t i = 0; i < buckets.size(); ++i) { - int64_t prev_vcf_idx = -1; - int64_t count = 0; - for (auto& contig : buckets[i]) { - if (contig_to_vcf_idx.count(contig.first)) { - int64_t vcf_idx = contig_to_vcf_idx[contig.first]; - if (prev_vcf_idx == -1 || prev_vcf_idx == vcf_idx) { - prev_vcf_idx = vcf_idx; - count++; - } - else { - // we've seen a second input VCF, mark a sentinel and stop looking - count = -1; - break; + if (has_vcf) { + + auto& output_vcf_names = all_outputs.back(); + + // see if we can identify any chunked VCFs that are identical to our input VCFs + + // records of (input vcf index, bucket index) + vector> copiable_vcfs; + for (int64_t i = 0; i < buckets.size(); ++i) { + int64_t prev_vcf_idx = -1; + int64_t count = 0; + for (auto& contig : buckets[i]) { + if (contig_to_vcf_idx.count(contig.first)) { + int64_t vcf_idx = contig_to_vcf_idx[contig.first]; + if (prev_vcf_idx == -1 || prev_vcf_idx == vcf_idx) { + prev_vcf_idx = vcf_idx; + count++; + } + else { + // we've seen a second input VCF, mark a sentinel and stop looking + count = -1; + break; + } } } + if (prev_vcf_idx >= 0 && count == vcf_contigs_with_variants[prev_vcf_idx].size()) { + // we saw all and only contigs from one VCF, we can just copy it + copiable_vcfs.emplace_back(prev_vcf_idx, i); + } } - if (prev_vcf_idx >= 0 && count == vcf_contigs_with_variants[prev_vcf_idx].size()) { - // we saw all and only contigs from one VCF, we can just copy it - copiable_vcfs.emplace_back(prev_vcf_idx, i); - } - } - + #ifdef debug_index_registry_recipes - cerr << "identified " << copiable_vcfs.size() << " copiable VCFs:" << endl; - for (const auto& copiable_vcf : copiable_vcfs) { - cerr << "\tinput " << copiable_vcf.first << " " << vcf_filenames[copiable_vcf.first] << " -> bucket " << copiable_vcf.second << endl; - } -#endif - - output_vcf_names.resize(buckets.size()); - - // check if we can do a sort-of-aliasing for VCFs, since they're the most time- - // consuming part of the chunking - if (copiable_vcfs.size() == vcf_filenames.size()) { - // all of the input VCFs could be copied to 1 bucket, so we'll just alias - // them and make dummies for the rest - for (auto vcf_copy : copiable_vcfs) { - int64_t input_idx, output_idx; - tie(input_idx, output_idx) = vcf_copy; - output_vcf_names[output_idx] = vcf_filenames[input_idx]; + cerr << "identified " << copiable_vcfs.size() << " copiable VCFs:" << endl; + for (const auto& copiable_vcf : copiable_vcfs) { + cerr << "\tinput " << copiable_vcf.first << " " << vcf_filenames[copiable_vcf.first] << " -> bucket " << copiable_vcf.second << endl; } - for (int64_t i = 0; i < output_vcf_names.size(); ++i) { - if (output_vcf_names[i].empty()) { - // this bucket didn't receive a VCF chunk, let's make a dummy VCF for it - auto output_vcf_name = plan->output_filepath(output_vcf, i, buckets.size()); - htsFile* vcf = bcf_open(output_vcf_name.c_str(), "wz"); - bcf_hdr_t* header = bcf_hdr_init("w"); - // this is to satisfy HaplotypeIndexer, which doesn't like sample-less VCFs - if (phased_vcf) { - int sample_add_code = bcf_hdr_add_sample(header, "dummy"); - if (sample_add_code != 0) { - cerr << "error:[IndexRegistry] error initializing VCF header" << endl; +#endif + output_vcf_names.resize(buckets.size()); + + // check if we can do a sort-of-aliasing for VCFs, since they're the most time- + // consuming part of the chunking + if (copiable_vcfs.size() == vcf_filenames.size()) { + // all of the input VCFs could be copied to 1 bucket, so we'll just alias + // them and make dummies for the rest + for (auto vcf_copy : copiable_vcfs) { + int64_t input_idx, output_idx; + tie(input_idx, output_idx) = vcf_copy; + output_vcf_names[output_idx] = vcf_filenames[input_idx]; + } + for (int64_t i = 0; i < output_vcf_names.size(); ++i) { + if (output_vcf_names[i].empty()) { + // this bucket didn't receive a VCF chunk, let's make a dummy VCF for it + auto output_vcf_name = plan->output_filepath(output_vcf, i, buckets.size()); + htsFile* vcf = bcf_open(output_vcf_name.c_str(), "wz"); + bcf_hdr_t* header = bcf_hdr_init("w"); + // this is to satisfy HaplotypeIndexer, which doesn't like sample-less VCFs + if (phased_vcf) { + int sample_add_code = bcf_hdr_add_sample(header, "dummy"); + if (sample_add_code != 0) { + cerr << "error:[IndexRegistry] error initializing VCF header" << endl; + exit(1); + } + } + int hdr_write_err_code = bcf_hdr_write(vcf, header); + if (hdr_write_err_code != 0) { + cerr << "error:[IndexRegistry] error writing VCF header to " << output_vcf_name << endl; exit(1); } + bcf_hdr_destroy(header); + int close_err_code = hts_close(vcf); + if (close_err_code != 0) { + cerr << "error:[IndexRegistry] encountered error closing VCF " << output_vcf_name << endl; + exit(1); + } + output_vcf_names[i] = output_vcf_name; } - int hdr_write_err_code = bcf_hdr_write(vcf, header); - if (hdr_write_err_code != 0) { - cerr << "error:[IndexRegistry] error writing VCF header to " << output_vcf_name << endl; - exit(1); - } - bcf_hdr_destroy(header); - int close_err_code = hts_close(vcf); - if (close_err_code != 0) { - cerr << "error:[IndexRegistry] encountered error closing VCF " << output_vcf_name << endl; - exit(1); - } - output_vcf_names[i] = output_vcf_name; } - } - // register that this is an alias - alias_graph.register_alias(output_vcf, inputs[chunking_tx ? 2 : 1]); + // register that this is an alias + alias_graph.register_alias(output_vcf, inputs[inputs.size() - 1]); #ifdef debug_index_registry_recipes - cerr << "pseudo-aliased VCFs with filenames:" << endl; - for (const auto& filename : output_vcf_names) { - cerr << "\t" << filename << endl; - } + cerr << "pseudo-aliased VCFs with filenames:" << endl; + for (const auto& filename : output_vcf_names) { + cerr << "\t" << filename << endl; + } #endif - } - else { - - // trackers for whether we can write to a bucket's vcf - vector> bucket_checked_out_or_finished(buckets.size()); - for (int64_t i = 0; i < buckets.size(); ++i) { - bucket_checked_out_or_finished[i].store(false); } - - // if we can copy over a vcf, we don't want to check it out for reading/writing - for (auto copiable_vcf : copiable_vcfs) { - input_checked_out_or_finished[copiable_vcf.first].store(true); - bucket_checked_out_or_finished[copiable_vcf.second].store(true); - } - -#ifdef debug_index_registry_recipes - cerr << "initializing chunked VCFs for output" << endl; -#endif - - // the output files - vector> bucket_vcfs(buckets.size()); - for (int64_t i = 0; i < buckets.size(); ++i) { - auto output_vcf_name = plan->output_filepath(output_vcf, i, buckets.size()); - output_vcf_names[i] = output_vcf_name; + else { - if (bucket_checked_out_or_finished[i].load()) { - // we can copy to make this file, so we don't need to initialize - // a file - continue; + // trackers for whether we can write to a bucket's vcf + vector> bucket_checked_out_or_finished(buckets.size()); + for (int64_t i = 0; i < buckets.size(); ++i) { + bucket_checked_out_or_finished[i].store(false); } - // open to write in bgzipped format - htsFile* vcf_out = bcf_open(output_vcf_name.c_str(), "wz"); - bcf_hdr_t* header_out = bcf_hdr_init("w"); - - // identify which input VCFs we'll be pulling from - unordered_set vcf_indexes; - for (const auto& contig : buckets[i]) { - if (contig_to_vcf_idx.count(contig.first)) { - vcf_indexes.insert(contig_to_vcf_idx[contig.first]); - } + // if we can copy over a vcf, we don't want to check it out for reading/writing + for (auto copiable_vcf : copiable_vcfs) { + input_checked_out_or_finished[copiable_vcf.first].store(true); + bucket_checked_out_or_finished[copiable_vcf.second].store(true); } + #ifdef debug_index_registry_recipes - cerr << "bucket " << i << " will add samples from input VCFs:" << endl; - for (auto j : vcf_indexes) { - cerr << "\t" << j << endl; - } + cerr << "initializing chunked VCFs for output" << endl; #endif - // merge will all the input headers - unordered_set samples_added; - for (auto vcf_idx : vcf_indexes) { + + // the output files + vector> bucket_vcfs(buckets.size()); + for (int64_t i = 0; i < buckets.size(); ++i) { + auto output_vcf_name = plan->output_filepath(output_vcf, i, buckets.size()); + output_vcf_names[i] = output_vcf_name; - auto input_vcf_file = input_vcf_files[vcf_idx]; - bcf_hdr_t* header_in = get<1>(input_vcf_file); - header_out = bcf_hdr_merge(header_out, header_in); - if (header_out == nullptr) { - cerr << "error:[IndexRegistry] error merging VCF header" << endl; - exit(1); + if (bucket_checked_out_or_finished[i].load()) { + // we can copy to make this file, so we don't need to initialize + // a file + continue; } - // add the samples from every header - for (int64_t j = 0; j < bcf_hdr_nsamples(header_in); ++j) { - const char* sample = header_in->samples[j]; - if (!samples_added.count(sample)) { - // TODO: the header has its own dictionary, so this shouldn't be necessary, - // but the khash_t isn't documented very well - samples_added.insert(sample); - // the sample hasn't been added yet - int sample_err_code = bcf_hdr_add_sample(header_out, header_in->samples[j]); - // returns a -1 if the sample is already included, which we expect - if (sample_err_code != 0) { - cerr << "error:[IndexRegistry] error adding samples to VCF header" << endl; - exit(1); - } + // open to write in bgzipped format + htsFile* vcf_out = bcf_open(output_vcf_name.c_str(), "wz"); + bcf_hdr_t* header_out = bcf_hdr_init("w"); + + // identify which input VCFs we'll be pulling from + unordered_set vcf_indexes; + for (const auto& contig : buckets[i]) { + if (contig_to_vcf_idx.count(contig.first)) { + vcf_indexes.insert(contig_to_vcf_idx[contig.first]); } } - } - - // documentation in htslib/vcf.h says that this has to be called after adding samples - int sync_err_code = bcf_hdr_sync(header_out); - if (sync_err_code != 0) { - cerr << "error:[IndexRegistry] error syncing VCF header" << endl; - exit(1); - } - if (phased_vcf && bcf_hdr_nsamples(header_out) == 0) { - cerr << "warning:[IndexRegistry] VCF inputs from file(s)"; +#ifdef debug_index_registry_recipes + cerr << "bucket " << i << " will add samples from input VCFs:" << endl; + for (auto j : vcf_indexes) { + cerr << "\t" << j << endl; + } +#endif + // merge will all the input headers + unordered_set samples_added; for (auto vcf_idx : vcf_indexes) { - cerr << " " << vcf_filenames[vcf_idx]; + + auto input_vcf_file = input_vcf_files[vcf_idx]; + bcf_hdr_t* header_in = get<1>(input_vcf_file); + header_out = bcf_hdr_merge(header_out, header_in); + if (header_out == nullptr) { + cerr << "error:[IndexRegistry] error merging VCF header" << endl; + exit(1); + } + + // add the samples from every header + for (int64_t j = 0; j < bcf_hdr_nsamples(header_in); ++j) { + const char* sample = header_in->samples[j]; + if (!samples_added.count(sample)) { + // TODO: the header has its own dictionary, so this shouldn't be necessary, + // but the khash_t isn't documented very well + samples_added.insert(sample); + // the sample hasn't been added yet + int sample_err_code = bcf_hdr_add_sample(header_out, header_in->samples[j]); + // returns a -1 if the sample is already included, which we expect + if (sample_err_code != 0) { + cerr << "error:[IndexRegistry] error adding samples to VCF header" << endl; + exit(1); + } + } + } } - cerr << " have been identified as phased but contain no samples. Are these valid inputs?" << endl; - // let's add a dummy so that HaplotypeIndexer doesn't get mad later - int sample_add_code = bcf_hdr_add_sample(header_out, "dummy"); - if (sample_add_code != 0) { - cerr << "error:[IndexRegistry] error initializing VCF header" << endl; - exit(1); - } - // and re-sync, not sure if necessary, but it will be cheap regardless - sync_err_code = bcf_hdr_sync(header_out); + // documentation in htslib/vcf.h says that this has to be called after adding samples + int sync_err_code = bcf_hdr_sync(header_out); if (sync_err_code != 0) { cerr << "error:[IndexRegistry] error syncing VCF header" << endl; exit(1); } - } - int hdr_write_err_code = bcf_hdr_write(vcf_out, header_out); - if (hdr_write_err_code != 0) { - cerr << "error:[IndexRegistry] error writing VCF header to " << output_vcf_name << endl; - exit(1); + if (phased_vcf && bcf_hdr_nsamples(header_out) == 0) { + cerr << "warning:[IndexRegistry] VCF inputs from file(s)"; + for (auto vcf_idx : vcf_indexes) { + cerr << " " << vcf_filenames[vcf_idx]; + } + cerr << " have been identified as phased but contain no samples. Are these valid inputs?" << endl; + + // let's add a dummy so that HaplotypeIndexer doesn't get mad later + int sample_add_code = bcf_hdr_add_sample(header_out, "dummy"); + if (sample_add_code != 0) { + cerr << "error:[IndexRegistry] error initializing VCF header" << endl; + exit(1); + } + // and re-sync, not sure if necessary, but it will be cheap regardless + sync_err_code = bcf_hdr_sync(header_out); + if (sync_err_code != 0) { + cerr << "error:[IndexRegistry] error syncing VCF header" << endl; + exit(1); + } + } + int hdr_write_err_code = bcf_hdr_write(vcf_out, header_out); + if (hdr_write_err_code != 0) { + cerr << "error:[IndexRegistry] error writing VCF header to " << output_vcf_name << endl; + exit(1); + } + + // remember these so that we can check them out later + bucket_vcfs[i] = make_pair(vcf_out, header_out); } - // remember these so that we can check them out later - bucket_vcfs[i] = make_pair(vcf_out, header_out); - } - - // the parallel iteration in here is pretty complicated because contigs from - // the input VCFs are being shuffled among the output bucket VCFs, and contigs - // need to be both read and written in lexicographic order. the mutexes here - // let the threads shift between reading and writing from different pairs of VCFs. - // hopefully this high-contention process won't cause too many problems since - // copying each contig takes up a relatively large amount of time - - // a mutex to lock the process of checking whether the next contig the thread - // needs is exposed - mutex input_vcf_mutex; - // a mutex to lock the process of switching to a new bucket - mutex output_vcf_mutex; - // to keep track of which contig in the bucket we're looking for next - vector contig_idx(buckets.size(), 0); - // how many buckets we've finished so far - atomic buckets_finished(0); - vector workers; - for (int64_t i = 0; i < num_threads; ++i) { - // Worker must not capture i; it will be out of scope! - workers.emplace_back([&]() { - int64_t bucket_idx = -1; - while (buckets_finished.load() < buckets.size()) { - // check if any of the input VCFs need to be moved past a contig that isn't - // in our reference - input_vcf_mutex.lock(); - int64_t contig_skip_idx = -1; - for (int64_t j = 0; j < input_vcf_files.size(); ++j) { - if (input_checked_out_or_finished[j].load()) { - continue; + // the parallel iteration in here is pretty complicated because contigs from + // the input VCFs are being shuffled among the output bucket VCFs, and contigs + // need to be both read and written in lexicographic order. the mutexes here + // let the threads shift between reading and writing from different pairs of VCFs. + // hopefully this high-contention process won't cause too many problems since + // copying each contig takes up a relatively large amount of time + + // a mutex to lock the process of checking whether the next contig the thread + // needs is exposed + mutex input_vcf_mutex; + // a mutex to lock the process of switching to a new bucket + mutex output_vcf_mutex; + // to keep track of which contig in the bucket we're looking for next + vector contig_idx(buckets.size(), 0); + // how many buckets we've finished so far + atomic buckets_finished(0); + vector workers; + for (int64_t i = 0; i < num_threads; ++i) { + // Worker must not capture i; it will be out of scope! + workers.emplace_back([&]() { + int64_t bucket_idx = -1; + while (buckets_finished.load() < buckets.size()) { + // check if any of the input VCFs need to be moved past a contig that isn't + // in our reference + input_vcf_mutex.lock(); + int64_t contig_skip_idx = -1; + for (int64_t j = 0; j < input_vcf_files.size(); ++j) { + if (input_checked_out_or_finished[j].load()) { + continue; + } + + const char* chrom = bcf_hdr_id2name(get<1>(input_vcf_files[j]), + get<2>(input_vcf_files[j])->rid); + // check this index over the FASTA sequence lengths for the chromosome + if (!seq_lengths.count(chrom)) { + contig_skip_idx = j; + input_checked_out_or_finished[j].store(true); + } } + input_vcf_mutex.unlock(); - const char* chrom = bcf_hdr_id2name(get<1>(input_vcf_files[j]), - get<2>(input_vcf_files[j])->rid); - // check this index over the FASTA sequence lengths for the chromosome - if (!seq_lengths.count(chrom)) { - contig_skip_idx = j; - input_checked_out_or_finished[j].store(true); - } - } - input_vcf_mutex.unlock(); - - if (contig_skip_idx != -1) { - // we found a contig in the VCF that isn't present in the FASTA, we'll have to skip it - - auto& input_vcf_file = input_vcf_files[contig_skip_idx]; - string skip_contig = bcf_hdr_id2name(get<1>(input_vcf_file), - get<2>(input_vcf_file)->rid); - cerr << "warning:[IndexRegistry] Skipping contig " + skip_contig + ", which is found in VCF(s) but not reference.\n"; - - - // keep reading until end of file or a different contig - int read_err_code = 0; - while (read_err_code >= 0) { - string contig = bcf_hdr_id2name(get<1>(input_vcf_file), - get<2>(input_vcf_file)->rid); - if (contig != skip_contig) { - break; + if (contig_skip_idx != -1) { + // we found a contig in the VCF that isn't present in the FASTA, we'll have to skip it + + auto& input_vcf_file = input_vcf_files[contig_skip_idx]; + string skip_contig = bcf_hdr_id2name(get<1>(input_vcf_file), + get<2>(input_vcf_file)->rid); + cerr << "warning:[IndexRegistry] Skipping contig " + skip_contig + ", which is found in VCF(s) but not reference.\n"; + + + // keep reading until end of file or a different contig + int read_err_code = 0; + while (read_err_code >= 0) { + string contig = bcf_hdr_id2name(get<1>(input_vcf_file), + get<2>(input_vcf_file)->rid); + if (contig != skip_contig) { + break; + } + + read_err_code = bcf_read(get<0>(input_vcf_file), get<1>(input_vcf_file), get<2>(input_vcf_file)); } - read_err_code = bcf_read(get<0>(input_vcf_file), get<1>(input_vcf_file), get<2>(input_vcf_file)); + // check the input back out unless we've finished it + if (read_err_code >= 0) { + input_checked_out_or_finished[contig_skip_idx].store(false); + } + continue; } - // check the input back out unless we've finished it - if (read_err_code >= 0) { - input_checked_out_or_finished[contig_skip_idx].store(false); + // select an output VCF corresponding to a bucket + int64_t copy_from_idx = -1, copy_to_idx = -1; + bool found_bucket = false; + output_vcf_mutex.lock(); + if (!copiable_vcfs.empty()) { + // there are copiable VCFs remaining, do these first + tie(copy_from_idx, copy_to_idx) = copiable_vcfs.back(); + copiable_vcfs.pop_back(); } - continue; - } - - // select an output VCF corresponding to a bucket - int64_t copy_from_idx = -1, copy_to_idx = -1; - bool found_bucket = false; - output_vcf_mutex.lock(); - if (!copiable_vcfs.empty()) { - // there are copiable VCFs remaining, do these first - tie(copy_from_idx, copy_to_idx) = copiable_vcfs.back(); - copiable_vcfs.pop_back(); - } - else { - // start iteration at 1 so we always advance to a new bucket if possible - for (int64_t j = 1; j <= buckets.size(); ++j) { - int64_t next_bucket_idx = (bucket_idx + j) % buckets.size(); - if (!bucket_checked_out_or_finished[next_bucket_idx].load()) { - bucket_checked_out_or_finished[next_bucket_idx].store(true); - bucket_idx = next_bucket_idx; - found_bucket = true; - break; + else { + // start iteration at 1 so we always advance to a new bucket if possible + for (int64_t j = 1; j <= buckets.size(); ++j) { + int64_t next_bucket_idx = (bucket_idx + j) % buckets.size(); + if (!bucket_checked_out_or_finished[next_bucket_idx].load()) { + bucket_checked_out_or_finished[next_bucket_idx].store(true); + bucket_idx = next_bucket_idx; + found_bucket = true; + break; + } } } - } - output_vcf_mutex.unlock(); - - if (copy_from_idx >= 0) { + output_vcf_mutex.unlock(); + + if (copy_from_idx >= 0) { #ifdef debug_index_registry_recipes - cerr << "direct copying " + vcf_filenames[copy_from_idx] + " to " + output_vcf_names[copy_to_idx] + "\n"; + cerr << "direct copying " + vcf_filenames[copy_from_idx] + " to " + output_vcf_names[copy_to_idx] + "\n"; #endif - // we can copy an entire file on this iteration instead of parsing - copy_file(vcf_filenames[copy_from_idx], output_vcf_names[copy_to_idx]); - if (file_exists(vcf_filenames[copy_from_idx] + ".tbi")) { - // there's also a tabix, grab that as well - copy_file(vcf_filenames[copy_from_idx] + ".tbi", output_vcf_names[copy_to_idx] + ".tbi"); + // we can copy an entire file on this iteration instead of parsing + copy_file(vcf_filenames[copy_from_idx], output_vcf_names[copy_to_idx]); + if (file_exists(vcf_filenames[copy_from_idx] + ".tbi")) { + // there's also a tabix, grab that as well + copy_file(vcf_filenames[copy_from_idx] + ".tbi", output_vcf_names[copy_to_idx] + ".tbi"); + } + // this bucket is now totally finished + buckets_finished.fetch_add(1); + continue; } - // this bucket is now totally finished - buckets_finished.fetch_add(1); - continue; - } - - if (!found_bucket) { - // it's now possible for all buckets to be checked out simultaneously - // by other threads, so there's no more need to have this thread running + + if (!found_bucket) { + // it's now possible for all buckets to be checked out simultaneously + // by other threads, so there's no more need to have this thread running #ifdef debug_index_registry_recipes - cerr << "thread exiting\n"; + cerr << "thread exiting\n"; #endif - return; - } - - auto& ctg_idx = contig_idx[bucket_idx]; - - if (!contigs_with_variants.count(buckets[bucket_idx][ctg_idx].first)) { - // this contig doesn't have variants in any of the VCFs, so we skip it - ++ctg_idx; - if (ctg_idx == buckets[bucket_idx].size()) { - buckets_finished.fetch_add(1); - } - else { - bucket_checked_out_or_finished[bucket_idx].store(false); + return; } - continue; - } - - htsFile* vcf_out = bucket_vcfs[bucket_idx].first; - bcf_hdr_t* header_out = bucket_vcfs[bucket_idx].second; - - // check if any of the VCFs' next contig is the next one we want for - // this bucket (and lock other threads out from checking simultaneously) - int64_t input_idx = -1; - input_vcf_mutex.lock(); - for (int64_t j = 0; j < input_vcf_files.size(); ++j) { - if (input_checked_out_or_finished[j].load()) { + + auto& ctg_idx = contig_idx[bucket_idx]; + + if (!contigs_with_variants.count(buckets[bucket_idx][ctg_idx].first)) { + // this contig doesn't have variants in any of the VCFs, so we skip it + ++ctg_idx; + if (ctg_idx == buckets[bucket_idx].size()) { + buckets_finished.fetch_add(1); + } + else { + bucket_checked_out_or_finished[bucket_idx].store(false); + } continue; } - // what is the next contig in this VCF? - const char* chrom = bcf_hdr_id2name(get<1>(input_vcf_files[j]), - get<2>(input_vcf_files[j])->rid); - if (buckets[bucket_idx][ctg_idx].first == chrom) { - input_idx = j; - input_checked_out_or_finished[j].store(true); - break; + htsFile* vcf_out = bucket_vcfs[bucket_idx].first; + bcf_hdr_t* header_out = bucket_vcfs[bucket_idx].second; + + // check if any of the VCFs' next contig is the next one we want for + // this bucket (and lock other threads out from checking simultaneously) + int64_t input_idx = -1; + input_vcf_mutex.lock(); + for (int64_t j = 0; j < input_vcf_files.size(); ++j) { + if (input_checked_out_or_finished[j].load()) { + continue; + } + + // what is the next contig in this VCF? + const char* chrom = bcf_hdr_id2name(get<1>(input_vcf_files[j]), + get<2>(input_vcf_files[j])->rid); + if (buckets[bucket_idx][ctg_idx].first == chrom) { + input_idx = j; + input_checked_out_or_finished[j].store(true); + break; + } } - } - input_vcf_mutex.unlock(); - - if (input_idx < 0) { - // other threads need to get through earlier contigs until this bucket's next - // contig is exposed - bucket_checked_out_or_finished[bucket_idx].store(false); - continue; - } - - // we've checked out one of the input vcfs, now we can read from it - auto& input_vcf_file = input_vcf_files[input_idx]; - - int read_err_code = 0; - while (read_err_code >= 0) { + input_vcf_mutex.unlock(); - const char* chrom = bcf_hdr_id2name(get<1>(input_vcf_file), get<2>(input_vcf_file)->rid); - if (buckets[bucket_idx][ctg_idx].first != chrom) { - break; + if (input_idx < 0) { + // other threads need to get through earlier contigs until this bucket's next + // contig is exposed + bucket_checked_out_or_finished[bucket_idx].store(false); + continue; } - // FIXME: i'm not sure how important it is to handle these malformed VCFs it is -// // read the "END" info field to see if we need to repair it (this seems to be a problem -// // in the grch38 liftover variants from 1kg) -// int32_t* end_dst = NULL; -// int num_end; -// int end_err_code = bcf_get_info_int32(get<1>(input_vcf_file), get<2>(input_vcf_file), "END", -// &end_dst, &num_end); -// if (end_err_code >= 0) { -// // there is an END tag to read -// int64_t end = *end_dst; -// // note: we can query alleles without bcf_unpack, because it will have already -// // unpacked up to info fields -// // calculate it the way the spec says to -// int64_t calc_end = get<2>(input_vcf_file)->pos + strlen(get<2>(input_vcf_file)->d.allele[0]) - 1; -// if (end != calc_end) { -// string msg = "warning:[IndexRegistry] fixing \"END\" of variant " + buckets[bucket_idx][ctg_idx].first + " " + to_string(get<2>(input_vcf_file)->pos) + " from " + to_string(end) + " to " + to_string(calc_end) + "\n"; -//#pragma omp critical -// cerr << msg; -// -// int update_err_code = bcf_update_info_int32(get<1>(input_vcf_file), get<2>(input_vcf_file), "END", -// &calc_end, 1); -// if (update_err_code < 0) { -// cerr << "error:[IndexRegistry] failed to update \"END\"" << endl; -// exit(1); -// } -// } -// free(end_dst); -// } + // we've checked out one of the input vcfs, now we can read from it + auto& input_vcf_file = input_vcf_files[input_idx]; - bcf_translate(header_out, get<1>(input_vcf_file), get<2>(input_vcf_file)); + int read_err_code = 0; + while (read_err_code >= 0) { + + const char* chrom = bcf_hdr_id2name(get<1>(input_vcf_file), get<2>(input_vcf_file)->rid); + if (buckets[bucket_idx][ctg_idx].first != chrom) { + break; + } + + // FIXME: i'm not sure how important it is to handle these malformed VCFs it is + // // read the "END" info field to see if we need to repair it (this seems to be a problem + // // in the grch38 liftover variants from 1kg) + // int32_t* end_dst = NULL; + // int num_end; + // int end_err_code = bcf_get_info_int32(get<1>(input_vcf_file), get<2>(input_vcf_file), "END", + // &end_dst, &num_end); + // if (end_err_code >= 0) { + // // there is an END tag to read + // int64_t end = *end_dst; + // // note: we can query alleles without bcf_unpack, because it will have already + // // unpacked up to info fields + // // calculate it the way the spec says to + // int64_t calc_end = get<2>(input_vcf_file)->pos + strlen(get<2>(input_vcf_file)->d.allele[0]) - 1; + // if (end != calc_end) { + // string msg = "warning:[IndexRegistry] fixing \"END\" of variant " + buckets[bucket_idx][ctg_idx].first + " " + to_string(get<2>(input_vcf_file)->pos) + " from " + to_string(end) + " to " + to_string(calc_end) + "\n"; + //#pragma omp critical + // cerr << msg; + // + // int update_err_code = bcf_update_info_int32(get<1>(input_vcf_file), get<2>(input_vcf_file), "END", + // &calc_end, 1); + // if (update_err_code < 0) { + // cerr << "error:[IndexRegistry] failed to update \"END\"" << endl; + // exit(1); + // } + // } + // free(end_dst); + // } + + bcf_translate(header_out, get<1>(input_vcf_file), get<2>(input_vcf_file)); + + int write_err_code = bcf_write(vcf_out, header_out, get<2>(input_vcf_file)); + if (write_err_code != 0) { + cerr << "error:[IndexRegistry] error writing VCF line to " << output_vcf_names[bucket_idx] << endl; + exit(1); + } + + read_err_code = bcf_read(get<0>(input_vcf_file), get<1>(input_vcf_file), get<2>(input_vcf_file)); + } - int write_err_code = bcf_write(vcf_out, header_out, get<2>(input_vcf_file)); - if (write_err_code != 0) { - cerr << "error:[IndexRegistry] error writing VCF line to " << output_vcf_names[bucket_idx] << endl; + if (read_err_code >= 0) { + // there's still more to read, it's just on different contigs + input_checked_out_or_finished[input_idx].store(false); + } + else if (read_err_code != -1) { + // we encountered a real error + cerr << "error:[IndexRegistry] error reading VCF file " << vcf_filenames[input_idx] << endl; exit(1); } - read_err_code = bcf_read(get<0>(input_vcf_file), get<1>(input_vcf_file), get<2>(input_vcf_file)); - } - - if (read_err_code >= 0) { - // there's still more to read, it's just on different contigs - input_checked_out_or_finished[input_idx].store(false); - } - else if (read_err_code != -1) { - // we encountered a real error - cerr << "error:[IndexRegistry] error reading VCF file " << vcf_filenames[input_idx] << endl; - exit(1); - } - - // we finished this contig - ++ctg_idx; - if (ctg_idx == buckets[bucket_idx].size()) { - buckets_finished.fetch_add(1); - } - else { - bucket_checked_out_or_finished[bucket_idx].store(false); + // we finished this contig + ++ctg_idx; + if (ctg_idx == buckets[bucket_idx].size()) { + buckets_finished.fetch_add(1); + } + else { + bucket_checked_out_or_finished[bucket_idx].store(false); + } } + }); + } + + // barrier sync + for (auto& worker : workers) { + worker.join(); + } + + // close out files + for (int64_t i = 0; i < input_vcf_files.size(); ++i) { + auto vcf_file = input_vcf_files[i]; + bcf_destroy(get<2>(vcf_file)); + bcf_hdr_destroy(get<1>(vcf_file)); + int err_code = hts_close(get<0>(vcf_file)); + if (err_code != 0) { + cerr << "error:[IndexRegistry] encountered error closing VCF " << vcf_filenames[i] << endl; + exit(1); + } + } + for (int64_t i = 0; i < bucket_vcfs.size(); ++i) { + if (!bucket_vcfs[i].second) { + // we didn't open this VCF (probably because we just copied it) + continue; + } + bcf_hdr_destroy(bucket_vcfs[i].second); + int close_err_code = hts_close(bucket_vcfs[i].first); + if (close_err_code != 0) { + cerr << "error:[IndexRegistry] encountered error closing VCF " << output_vcf_names[i] << endl; + exit(1); } - }); - } - - // barrier sync - for (auto& worker : workers) { - worker.join(); - } - - // close out files - for (int64_t i = 0; i < input_vcf_files.size(); ++i) { - auto vcf_file = input_vcf_files[i]; - bcf_destroy(get<2>(vcf_file)); - bcf_hdr_destroy(get<1>(vcf_file)); - int err_code = hts_close(get<0>(vcf_file)); - if (err_code != 0) { - cerr << "error:[IndexRegistry] encountered error closing VCF " << vcf_filenames[i] << endl; - exit(1); } } - for (int64_t i = 0; i < bucket_vcfs.size(); ++i) { - if (!bucket_vcfs[i].second) { - // we didn't open this VCF (probably because we just copied it) + + // TODO: move this into the same work queue as the rest of the VCF chunking? + // tabix index +#pragma omp parallel for schedule(dynamic, 1) + for (int64_t i = 0; i < buckets.size(); ++i) { + // tabix-index the bgzipped VCF we just wrote + + if (file_exists(output_vcf_names[i] + ".tbi")) { + // the tabix already exists continue; } - bcf_hdr_destroy(bucket_vcfs[i].second); - int close_err_code = hts_close(bucket_vcfs[i].first); - if (close_err_code != 0) { - cerr << "error:[IndexRegistry] encountered error closing VCF " << output_vcf_names[i] << endl; + + // parameters inferred from tabix main's sourcecode + int min_shift = 0; + tbx_conf_t conf = tbx_conf_vcf; + int tabix_err_code = tbx_index_build(output_vcf_names[i].c_str(), min_shift, &conf); + if (tabix_err_code == -2) { + cerr << "error:[IndexRegistry] output VCF is not bgzipped: " << output_vcf_names[i] << endl; exit(1); } + else if (tabix_err_code != 0) { + cerr << "warning:[IndexRegistry] could not tabix index VCF " + output_vcf_names[i] + "\n"; + } } } - // TODO: move this into the same work queue as the rest of the VCF chunking? - // tabix index -#pragma omp parallel for schedule(dynamic, 1) - for (int64_t i = 0; i < buckets.size(); ++i) { - // tabix-index the bgzipped VCF we just wrote - - if (file_exists(output_vcf_names[i] + ".tbi")) { - // the tabix already exists - continue; - } - - // parameters inferred from tabix main's sourcecode - int min_shift = 0; - tbx_conf_t conf = tbx_conf_vcf; - int tabix_err_code = tbx_index_build(output_vcf_names[i].c_str(), min_shift, &conf); - if (tabix_err_code == -2) { - cerr << "error:[IndexRegistry] output VCF is not bgzipped: " << output_vcf_names[i] << endl; - exit(1); - } - else if (tabix_err_code != 0) { - cerr << "warning:[IndexRegistry] could not tabix index VCF " + output_vcf_names[i] + "\n"; - } - } - - if (chunking_tx) { + if (has_gff) { if (IndexingParameters::verbosity != IndexingParameters::None) { cerr << "[IndexRegistry]: Chunking GTF/GFF(s)." << endl; @@ -1609,28 +1634,35 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return chunk_contigs(inputs, plan, alias_graph, constructing, true); + return chunk_contigs(inputs, plan, alias_graph, constructing, true, true); }); registry.register_recipe({"Chunked GTF/GFF", "Chunked Reference FASTA", "Chunked VCF"}, {"GTF/GFF", "Reference FASTA", "VCF"}, [=](const vector& inputs, const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return chunk_contigs(inputs, plan, alias_graph, constructing, false); + return chunk_contigs(inputs, plan, alias_graph, constructing, true, false); }); registry.register_recipe({"Chunked Reference FASTA", "Chunked VCF w/ Phasing"}, {"Reference FASTA", "VCF w/ Phasing"}, [=](const vector& inputs, const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return chunk_contigs(inputs, plan, alias_graph, constructing, true); + return chunk_contigs(inputs, plan, alias_graph, constructing, false, true); }); registry.register_recipe({"Chunked Reference FASTA", "Chunked VCF"}, {"Reference FASTA", "VCF"}, [=](const vector& inputs, const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return chunk_contigs(inputs, plan, alias_graph, constructing, false); + return chunk_contigs(inputs, plan, alias_graph, constructing, false, false); + }); + registry.register_recipe({"Chunked GTF/GFF", "Chunked Reference FASTA"}, {"GTF/GFF", "Reference FASTA"}, + [=](const vector& inputs, + const IndexingPlan* plan, + AliasGraph& alias_graph, + const IndexGroup& constructing) { + return chunk_contigs(inputs, plan, alias_graph, constructing, true, false); }); @@ -1799,14 +1831,19 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, const IndexGroup& constructing, bool alt_paths, - bool has_transcripts) { + bool has_transcripts, + bool has_variants) { if (IndexingParameters::verbosity != IndexingParameters::None) { cerr << "[IndexRegistry]: Constructing"; if (has_transcripts) { cerr << " spliced"; } - cerr << " VG graph from FASTA and VCF input." << endl; + cerr << " VG graph from FASTA"; + if (has_variants) { + cerr << " and VCF"; + } + cerr << " input." << endl; } assert(constructing.size() == 2); @@ -1817,16 +1854,11 @@ IndexRegistry VGIndexes::get_vg_index_registry() { auto& graph_names = all_outputs[1]; bool has_ins_fasta = false; - if (!has_transcripts) { - assert(inputs.size() == 2 || inputs.size() == 3); - has_ins_fasta = (inputs.size() == 3); - } - else { - assert(inputs.size() == 3 || inputs.size() == 4); - has_ins_fasta = (inputs.size() == 4); + if (1 + int(has_transcripts) + int(has_variants) != inputs.size()) { + assert(2 + int(has_transcripts) + int(has_variants) == inputs.size()); + has_ins_fasta = true; } - - + // unpack the inputs vector ref_filenames, vcf_filenames, insertions, transcripts; { @@ -1835,7 +1867,9 @@ IndexRegistry VGIndexes::get_vg_index_registry() { transcripts = inputs[i++]->get_filenames(); } ref_filenames = inputs[i++]->get_filenames(); - vcf_filenames = inputs[i++]->get_filenames(); + if (has_variants) { + vcf_filenames = inputs[i++]->get_filenames(); + } if (has_ins_fasta) { insertions = inputs[i++]->get_filenames(); } @@ -1851,33 +1885,34 @@ IndexRegistry VGIndexes::get_vg_index_registry() { FastaReference ins_ref; ins_ref.open(insertions.front()); } - - if (ref_filenames.size() != 1 && vcf_filenames.size() != 1 && + + if (has_variants && ref_filenames.size() != 1 && vcf_filenames.size() != 1 && ref_filenames.size() != vcf_filenames.size()) { - cerr << "[IndexRegistry]: When constructing graph from multiple FASTAs and multiple VCFs, the FASTAs and VCFs must be matched 1-to-1, but input contains " << inputs[0]->get_filenames().size() << " FASTA files and " << inputs[1]->get_filenames().size() << " VCF files." << endl; + cerr << "[IndexRegistry]: When constructing graph from multiple FASTAs and multiple VCFs, the FASTAs and VCFs must be matched 1-to-1, but input contains " << ref_filenames.size() << " FASTA files and " << vcf_filenames.size() << " VCF files." << endl; exit(1); } - if (has_transcripts) { - if ((transcripts.size() != 1 && vcf_filenames.size() != 1 && - transcripts.size() != vcf_filenames.size()) || - (transcripts.size() != 1 && ref_filenames.size() != 1 && - transcripts.size() != ref_filenames.size())) { - cerr << "[IndexRegistry]: When constructing graph from multiple GTF/GFFs and multiple FASTAs or VCFs, the GTF/GFFs and the FASTAs/VCFs must be matched 1-to-1, but input contains " << transcripts.size() << " GTF/GFF files, " << ref_filenames.size() << " FASTA files, and " << vcf_filenames.size() << " VCF files." << endl; - exit(1); - } + if (has_transcripts && transcripts.size() != 1 && ref_filenames.size() != 1 && + transcripts.size() != ref_filenames.size()) { + cerr << "[IndexRegistry]: When constructing graph from multiple GTF/GFFs and multiple FASTAs, the GTF/GFFs and the FASTAs must be matched 1-to-1, but input contains " << transcripts.size() << " GTF/GFF files and " << ref_filenames.size() << " FASTA files." << endl; + exit(1); + } + if (has_transcripts && has_variants && transcripts.size() != 1 && vcf_filenames.size() != 1 && + transcripts.size() != vcf_filenames.size()) { + cerr << "[IndexRegistry]: When constructing graph from multiple GTF/GFFs and multiple VCFs, the GTF/GFFs and the VCFs must be matched 1-to-1, but input contains " << transcripts.size() << " GTF/GFF files and " << vcf_filenames.size() << " VCF files." << endl; + exit(1); } // are we broadcasting the transcripts from one chunk to many? bool broadcasting_txs = transcripts.size() != max(ref_filenames.size(), vcf_filenames.size()); - + // TODO: this estimate should include splice edges too vector> approx_job_requirements; { size_t i = 0; for (auto approx_mem : each_approx_graph_memory(ref_filenames, vcf_filenames)) { int64_t approx_time; - if (vcf_filenames.size() != 1) { + if (!vcf_filenames.empty() && vcf_filenames.size() != 1) { approx_time = get_file_size(vcf_filenames[i]); } else { @@ -1903,10 +1938,9 @@ IndexRegistry VGIndexes::get_vg_index_registry() { #endif auto ref_filename = ref_filenames.size() == 1 ? ref_filenames[0] : ref_filenames[idx]; - auto vcf_filename = vcf_filenames.size() == 1 ? vcf_filenames[0] : vcf_filenames[idx]; #ifdef debug_index_registry_recipes - cerr << "constructing graph with Constructor for ref " << ref_filename << " and variants " << vcf_filename << endl; + cerr << "constructing graph with Constructor for ref " << ref_filename << endl; #endif // init and configure the constructor @@ -1915,6 +1949,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { constructor.alt_paths = alt_paths; constructor.max_node_size = IndexingParameters::max_node_size; constructor.show_progress = IndexingParameters::verbosity >= IndexingParameters::Debug; + if (ref_filenames.size() != 1 && vcf_filenames.size() == 1) { // we have multiple FASTA but only 1 VCF, so we'll limit the // constructor to the contigs of this FASTA for this run @@ -1924,13 +1959,13 @@ IndexRegistry VGIndexes::get_vg_index_registry() { constructor.allowed_vcf_names.insert(seqname); } } - else if (vcf_filenames.size() != 1 && ref_filenames.size() == 1) { + else if (!vcf_filenames.empty() && vcf_filenames.size() != 1 && ref_filenames.size() == 1) { // we have multiple VCFs but only 1 FASTA, so we'll limit the // constructor to the contigs of this VCF for this run // unfortunately there doesn't seem to be a good way to do this without // iterating over the entire file: - for (const auto& contig : vcf_contigs(vcf_filename)) { + for (const auto& contig : vcf_contigs(vcf_filenames[idx])) { constructor.allowed_vcf_names.insert(contig); } } @@ -1943,7 +1978,10 @@ IndexRegistry VGIndexes::get_vg_index_registry() { auto graph = init_mutable_graph(); vector fasta(1, ref_filename); - vector vcf(1, vcf_filename); + vector vcf; + if (!vcf_filenames.empty()) { + vcf.emplace_back(vcf_filenames.size() == 1 ? vcf_filenames[0] : vcf_filenames[idx]); + } // do the construction constructor.construct_graph(fasta, vcf, insertions, graph.get()); @@ -2076,14 +2114,14 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, true, false); + return construct_with_constructor(inputs, plan, constructing, true, false, true); }); registry.register_recipe({"MaxNodeID", "VG w/ Variant Paths"}, {"Chunked Reference FASTA", "Chunked VCF w/ Phasing"}, [construct_with_constructor](const vector& inputs, const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, true, false); + return construct_with_constructor(inputs, plan, constructing, true, false, true); }); registry.register_recipe({"MaxNodeID", "NamedNodeBackTranslation", "VG"}, {"Reference GFA"}, [&](const vector& inputs, @@ -2097,14 +2135,21 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, false, false); + return construct_with_constructor(inputs, plan, constructing, false, false, true); }); registry.register_recipe({"MaxNodeID", "VG"}, {"Chunked Reference FASTA", "Chunked VCF"}, + [construct_with_constructor](const vector& inputs, + const IndexingPlan* plan, + AliasGraph& alias_graph, + const IndexGroup& constructing) { + return construct_with_constructor(inputs, plan, constructing, false, false, true); + }); + registry.register_recipe({"MaxNodeID", "VG"}, {"Chunked Reference FASTA"}, [construct_with_constructor](const vector& inputs, const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, false, false); + return construct_with_constructor(inputs, plan, constructing, false, false, false); }); #ifdef debug_index_registry_setup @@ -2136,7 +2181,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, true, true); + return construct_with_constructor(inputs, plan, constructing, true, true, true); }); registry.register_recipe({"Spliced MaxNodeID", "Spliced VG w/ Variant Paths"}, @@ -2145,7 +2190,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, true, true); + return construct_with_constructor(inputs, plan, constructing, true, true, true); }); registry.register_recipe({"Spliced MaxNodeID", "Spliced VG"}, @@ -2154,7 +2199,7 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, false, true); + return construct_with_constructor(inputs, plan, constructing, false, true, true); }); registry.register_recipe({"Spliced MaxNodeID", "Spliced VG"}, @@ -2163,7 +2208,16 @@ IndexRegistry VGIndexes::get_vg_index_registry() { const IndexingPlan* plan, AliasGraph& alias_graph, const IndexGroup& constructing) { - return construct_with_constructor(inputs, plan, constructing, false, true); + return construct_with_constructor(inputs, plan, constructing, false, true, true); + }); + + registry.register_recipe({"Spliced MaxNodeID", "Spliced VG"}, + {"Chunked GTF/GFF", "Chunked Reference FASTA"}, + [construct_with_constructor](const vector& inputs, + const IndexingPlan* plan, + AliasGraph& alias_graph, + const IndexGroup& constructing) { + return construct_with_constructor(inputs, plan, constructing, false, true, false); }); @@ -2784,15 +2838,22 @@ IndexRegistry VGIndexes::get_vg_index_registry() { return all_outputs; }); - registry.register_recipe({"Haplotype-Transcript GBWT", "Spliced VG w/ Transcript Paths", "Unjoined Transcript Origin Table", }, - {"Chunked GTF/GFF", "Spliced GBWT", "Spliced VG"}, - [merge_gbwts](const vector& inputs, - const IndexingPlan* plan, - AliasGraph& alias_graph, - const IndexGroup& constructing) { + // meta-recipe to either add transcripts paths or also make HST collections + auto do_vg_rna = [merge_gbwts](const vector& inputs, + const IndexingPlan* plan, + AliasGraph& alias_graph, + const IndexGroup& constructing) { + + assert(constructing.size() == 3 || constructing.size() == 1); + bool making_hsts = constructing.size() == 3; if (IndexingParameters::verbosity != IndexingParameters::None) { - cerr << "[IndexRegistry]: Constructing haplotype-transcript GBWT and finishing spliced VG." << endl; + if (making_hsts) { + cerr << "[IndexRegistry]: Constructing haplotype-transcript GBWT and finishing spliced VG." << endl; + } + else { + cerr << "[IndexRegistry]: Finishing spliced VG." << endl; + } if (IndexingParameters::verbosity >= IndexingParameters::Debug) { gbwt::Verbosity::set(gbwt::Verbosity::BASIC); } @@ -2800,15 +2861,17 @@ IndexRegistry VGIndexes::get_vg_index_registry() { else { gbwt::Verbosity::set(gbwt::Verbosity::SILENT); } - assert(constructing.size() == 3); vector> all_outputs(constructing.size()); IndexName output_haplo_tx, output_tx_table, output_tx_graph; { int i = 0; for (auto output_index : constructing) { - if (i == 0) { + if (i == 0 && making_hsts) { output_haplo_tx = output_index; } + else if (i == 0 && !making_hsts) { + output_tx_graph = output_index; + } else if (i == 1) { output_tx_graph = output_index; } @@ -2818,48 +2881,59 @@ IndexRegistry VGIndexes::get_vg_index_registry() { ++i; } } - auto& haplo_tx_gbwt_names = all_outputs[0]; - auto& tx_graph_names = all_outputs[1]; - auto& tx_table_names = all_outputs[2]; - - assert(inputs.size() == 3); - auto tx_filenames = inputs[0]->get_filenames(); - auto gbwt_filenames = inputs[1]->get_filenames(); - auto graph_filenames = inputs[2]->get_filenames(); - - assert(gbwt_filenames.size() == 1); - auto gbwt_filename = gbwt_filenames.front(); - - unique_ptr haplotype_index = vg::io::VPKG::load_one(gbwt_filename); + //auto& haplo_tx_gbwt_names = all_outputs[0]; + auto& tx_graph_names = all_outputs[making_hsts ? 1 : 0]; + //auto& tx_table_names = all_outputs[2]; - // TODO: i can't find where in the building code you actually ensure this... - assert(haplotype_index->bidirectional()); + vector tx_filenames, gbwt_filenames, graph_filenames; + string gbwt_filename; + unique_ptr haplotype_index; + vector gbwt_chunk_names; + if (making_hsts) { + tx_filenames = inputs[0]->get_filenames(); + auto gbwt_filenames = inputs[1]->get_filenames(); + graph_filenames = inputs[2]->get_filenames(); + + assert(gbwt_filenames.size() == 1); + gbwt_filename = gbwt_filenames.front(); + + haplotype_index = vg::io::VPKG::load_one(gbwt_filename); + + // TODO: i can't find where in the building code you actually ensure this... + assert(haplotype_index->bidirectional()); + + // the HST tables + all_outputs[2].resize(graph_filenames.size()); + + gbwt_chunk_names.resize(graph_filenames.size()); + } + else { + tx_filenames = inputs[0]->get_filenames(); + graph_filenames = inputs[1]->get_filenames(); + } tx_graph_names.resize(graph_filenames.size()); - tx_table_names.resize(graph_filenames.size()); - vector gbwt_chunk_names(graph_filenames.size()); + auto haplo_tx_job = [&](int64_t i) { string tx_graph_name = plan->output_filepath(output_tx_graph, i, graph_filenames.size()); ofstream tx_graph_outfile; init_out(tx_graph_outfile, tx_graph_name); - string gbwt_name; - if (graph_filenames.size() != 1) { - // multiple components, so make a temp file that we will merge later - gbwt_name = temp_file::create(); - } - else { - // one component, so we will actually save the output - gbwt_name = plan->output_filepath(output_haplo_tx, i, graph_filenames.size()); + string gbwt_name, info_table_name; + if (making_hsts) { + if (graph_filenames.size() != 1) { + // multiple components, so make a temp file that we will merge later + gbwt_name = temp_file::create(); + } + else { + // one component, so we will actually save the output + gbwt_name = plan->output_filepath(output_haplo_tx, i, graph_filenames.size()); + } } int64_t j = tx_filenames.size() > 1 ? i : 0; - string info_table_name = plan->output_filepath(output_tx_table, i, graph_filenames.size()); - ofstream info_outfile; - init_out(info_outfile, info_table_name); - ifstream infile_graph, infile_tx; init_in(infile_graph, graph_filenames[i]); init_in(infile_tx, tx_filenames[j]); @@ -2894,46 +2968,57 @@ IndexRegistry VGIndexes::get_vg_index_registry() { } } - // go back to the beginning of the transcripts - infile_tx.clear(); - infile_tx.seekg(0); - - // add edges on other haplotypes - size_t num_transcripts_projected = transcriptome.add_haplotype_transcripts(vector({&infile_tx}), *haplotype_index, false); - - // init the haplotype transcript GBWT - size_t node_width = gbwt::bit_length(gbwt::Node::encode(transcriptome.graph().max_node_id(), true)); - bool success = execute_in_fork([&]() { - gbwt::GBWTBuilder gbwt_builder(node_width, - IndexingParameters::gbwt_insert_batch_size, - IndexingParameters::gbwt_sampling_interval); - // actually build it - transcriptome.add_transcripts_to_gbwt(&gbwt_builder, IndexingParameters::bidirectional_haplo_tx_gbwt, false); + if (making_hsts) { - // save the haplotype transcript GBWT - gbwt_builder.finish(); - save_gbwt(gbwt_builder.index, gbwt_name, IndexingParameters::verbosity == IndexingParameters::Debug); - }); - if (!success) { - IndexingParameters::gbwt_insert_batch_size *= IndexingParameters::gbwt_insert_batch_size_increase_factor; - throw RewindPlanException("[IndexRegistry]: Exceeded GBWT insert buffer size, expanding and reattempting.", - {"Haplotype-Transcript GBWT"}); + // go back to the beginning of the transcripts + infile_tx.clear(); + infile_tx.seekg(0); + + // add edges on other haplotypes + size_t num_transcripts_projected = transcriptome.add_haplotype_transcripts(vector({&infile_tx}), *haplotype_index, false); + + // init the haplotype transcript GBWT + size_t node_width = gbwt::bit_length(gbwt::Node::encode(transcriptome.graph().max_node_id(), true)); + bool success = execute_in_fork([&]() { + gbwt::GBWTBuilder gbwt_builder(node_width, + IndexingParameters::gbwt_insert_batch_size, + IndexingParameters::gbwt_sampling_interval); + // actually build it + transcriptome.add_transcripts_to_gbwt(&gbwt_builder, IndexingParameters::bidirectional_haplo_tx_gbwt, false); + + // save the haplotype transcript GBWT + gbwt_builder.finish(); + save_gbwt(gbwt_builder.index, gbwt_name, IndexingParameters::verbosity == IndexingParameters::Debug); + }); + if (!success) { + IndexingParameters::gbwt_insert_batch_size *= IndexingParameters::gbwt_insert_batch_size_increase_factor; + throw RewindPlanException("[IndexRegistry]: Exceeded GBWT insert buffer size, expanding and reattempting.", + {"Haplotype-Transcript GBWT"}); + } + + // write transcript origin info table + info_table_name = plan->output_filepath(output_tx_table, i, graph_filenames.size()); + ofstream info_outfile; + init_out(info_outfile, info_table_name); + transcriptome.write_transcript_info(&info_outfile, *haplotype_index, false); } - // write transcript origin info table - transcriptome.write_transcript_info(&info_outfile, *haplotype_index, false); - // save the graph with the transcript paths added transcriptome.write_graph(&tx_graph_outfile); tx_graph_names[i] = tx_graph_name; - tx_table_names[i] = info_table_name; - gbwt_chunk_names[i] = gbwt_name; + + if (making_hsts) { + gbwt_chunk_names[i] = gbwt_name; + all_outputs[2][i] = info_table_name; + } }; // we'll hold the gbwt in memory, so take it out of our memory budget int64_t target_memory_usage = plan->target_memory_usage(); - target_memory_usage = max(0, target_memory_usage - get_file_size(gbwt_filename)); + if (making_hsts) { + target_memory_usage = max(0, target_memory_usage - get_file_size(gbwt_filename)); + } vector> approx_job_requirements; for (int64_t i = 0; i < graph_filenames.size(); ++i) { @@ -2945,12 +3030,39 @@ IndexRegistry VGIndexes::get_vg_index_registry() { JobSchedule schedule(approx_job_requirements, haplo_tx_job); schedule.execute(target_memory_usage); - // merge the GBWT chunks - haplo_tx_gbwt_names.push_back(merge_gbwts(gbwt_chunk_names, plan, output_haplo_tx)); + if (making_hsts) { + // merge the GBWT chunks + all_outputs[0].push_back(merge_gbwts(gbwt_chunk_names, plan, output_haplo_tx)); + } return all_outputs; + }; + + auto vg_rna_graph_only = + registry.register_recipe({"Spliced VG w/ Transcript Paths"}, + {"Chunked GTF/GFF", "Spliced VG"}, + [do_vg_rna](const vector& inputs, + const IndexingPlan* plan, + AliasGraph& alias_graph, + const IndexGroup& constructing) { + + return do_vg_rna(inputs, plan, alias_graph, constructing); + }); + + auto vg_rna_full = + registry.register_recipe({"Haplotype-Transcript GBWT", "Spliced VG w/ Transcript Paths", "Unjoined Transcript Origin Table"}, + {"Chunked GTF/GFF", "Spliced GBWT", "Spliced VG"}, + [do_vg_rna](const vector& inputs, + const IndexingPlan* plan, + AliasGraph& alias_graph, + const IndexGroup& constructing) { + + return do_vg_rna(inputs, plan, alias_graph, constructing); }); + // if both the full and graph-only are required, only do the full + registry.register_generalization(vg_rna_full, vg_rna_graph_only); + //////////////////////////////////// // Info Table Recipes //////////////////////////////////// @@ -3782,7 +3894,14 @@ vector VGIndexes::get_default_mpmap_indexes() { "Spliced XG", "Spliced Distance Index", "Spliced GCSA", - "Spliced LCP", + "Spliced LCP" + }; + return indexes; +} + +vector VGIndexes::get_default_rpvg_indexes() { + vector indexes{ + "Spliced XG", "Haplotype-Transcript GBWT", "Transcript Origin Table" }; @@ -4220,40 +4339,24 @@ RecipeName IndexRegistry::register_recipe(const vector& identifiers, return name; } -//void IndexRegistry::register_joint_recipe(const vector& identifiers, -// const vector& input_identifiers, -// const JointRecipeFunc& exec) { -// // We're going to generate a bunch of single-index recipes where the first -// // one to run calls the joint recipe, and other ones to run just return -// // their slice of the joint recipe's return value. -// -// // We need all the joint recipe names, one for each identifier we generate -// vector names; -// -// // We need a place to hold the return values we can carry around by value. -// shared_ptr>> results(std::make_shared>>()); -// -// for (size_t i = 0; i < identifiers.size(); i++) { -// IndexName being_generated = identifiers[i]; -// -// // Create a recipe that invokes the joint recipe. -// RecipeFunc stub = [i, results, being_generated, &exec](const vector& inputs, const IndexingPlan* plan, const IndexName& constructing) -> vector { -// if (results->empty()) { -// // Invoke the actual logic, passing along the plan, and fill in results -// *results = exec(inputs, plan); -// // TODO: handle parallel invocations? -// } -// -// // Get our slice of the result file list. -// return results->at(i); -// }; -// -// names.push_back(register_recipe(being_generated, input_identifiers, stub)); -// } -// -// // Remember that these are a joint recipe. -// simplifications.emplace_back(input_identifiers, names); -//} +void IndexRegistry::register_generalization(const RecipeName& generalizer, const RecipeName& generalizee) { + for (const auto& index_name : generalizee.first) { + if (!generalizer.first.count(index_name)) { + cerr << "error:[IndexRegistry] registered a generalization that does not contain generalizee's output " << index_name << endl; + exit(1); + } + } + const auto& generalizer_recipe = recipe_registry.at(generalizer.first).at(generalizer.second); + const auto& generalizee_recipe = recipe_registry.at(generalizee.first).at(generalizee.second); + for (const auto& index_name : generalizee_recipe.input_group()) { + if (!generalizer_recipe.input_group().count(index_name)) { + cerr << "error:[IndexRegistry] registered a generalization that does not contain generalizee's input " << index_name << endl; + exit(1); + } + } + + generalizations[generalizee] = generalizer; +} IndexFile* IndexRegistry::get_index(const IndexName& identifier) { return index_registry.at(identifier).get(); @@ -4849,142 +4952,19 @@ IndexingPlan IndexRegistry::make_plan(const IndexGroup& end_products) const { sort(plan.steps.begin(), plan.steps.end(), [&](const RecipeName& a, const RecipeName& b) { return dep_order_of_identifier[a.first] < dep_order_of_identifier[b.first]; }); + + // remove generalizees if we used their generalizers + set plan_set(plan.steps.begin(), plan.steps.begin()); + plan.steps.resize(remove_if(plan.steps.begin(), plan.steps.end(), [&](const RecipeName& recipe) { + return generalizations.count(recipe) && plan_set.count(generalizations.at(recipe)); + }) - plan.steps.begin()); + #ifdef debug_index_registry cerr << "full plan including provided files:" << endl; for (auto plan_elem : plan.steps) { cerr << "\t" << to_string(plan_elem.first) << " " << plan_elem.second << endl; } #endif - -// // Now simplify the plan by using joint recipes if possible. -// -// // First we need to know all the indexes being created, and when -// map make_at_step; -// for (size_t i = 0; i < plan.steps.size(); i++) { -// make_at_step.emplace(plan.steps[i].first, i); -// } -// -// // We also need to know what steps we've already simplified, or are inputs. -// // We don't want to apply overlapping simplifications. -// vector fixed_step(plan.steps.size(), false); -// -// for (size_t i = 0; i < plan.steps.size(); i++) { -// auto& recipe = plan.steps.at(i); -// if (get_index(recipe.first)->is_finished()) { -// // This is already provided and ineligible for simplification. -// fixed_step[i] = true; -// } -// } -// -// for (auto& simplification : simplifications) { -// // For each set of output indexes from a simplification -// -//#ifdef debug_index_registry -// cerr << "Consider simplification to jointly make:" << endl; -// for (auto& recipe: simplification.second) { -// cerr << "\t" << to_string(recipe.first) << endl; -// } -//#endif -// -// // Determine if we are making all the products of the simplification, -// // and those products have not been involved in prior simplifications -// bool making_all_products_unsimplified = true; -// // And if so, the first step at which we are making any -// size_t first_step = numeric_limits::max(); -// for (auto& product_recipe : simplification.second) { -// const IndexName& product_name = product_recipe.first; -// -// auto found = make_at_step.find(product_name); -// if (found == make_at_step.end()) { -// // We aren't making this product -// -//#ifdef debug_index_registry -// cerr << "We are not making " << to_string(product_name) << endl; -//#endif -// -// making_all_products_unsimplified = false; -// break; -// } -// -// if (fixed_step[found->second]) { -// // We are making this product but we already simplified it or took it as input -// -//#ifdef debug_index_registry -// cerr << "We cannot further simplify making " << to_string(product_name) << endl; -//#endif -// -// making_all_products_unsimplified = false; -// break; -// } -// -//#ifdef debug_index_registry -// cerr << "We are making " << to_string(product_name) << " at step " << found->second << endl; -//#endif -// -// first_step = min(first_step, found->second); -// } -// -// if (!making_all_products_unsimplified) { -// // This simplification can't be used becuase it makes extra -// // products, or products that are already simplified. -// -//#ifdef debug_index_registry -// cerr << "We are not making all the products for this simplification, or some products cannot be further simplified" << endl; -//#endif -// -// continue; -// } -// -//#ifdef debug_index_registry -// cerr << "To simplify, all inputs will need to be available before step " << first_step << endl; -//#endif -// -// // See what we have available before the first step -// set available_in_time; -// for (size_t i = 0; i < first_step; i++) { -// available_in_time.insert(plan.steps[i].first); -// } -// -// // See if it's all the inputs the simplification needs -// bool all_available = true; -// for (auto& needed : simplification.first) { -// if (!available_in_time.count(needed)) { -//#ifdef debug_index_registry -// cerr << "We are not making " << to_string(needed) << " in time or at all." << endl; -//#endif -// all_available = false; -// break; -// } -// } -// -// if (!all_available) { -// // This simplification can't be used because not all its inputs are available in time. -// -//#ifdef debug_index_registry -// cerr << "Not all inputs will be available in time." << endl; -//#endif -// -// continue; -// } -// -//#ifdef debug_index_registry -// cerr << "All inputs will be available in time. Apply simplification!" << endl; -//#endif -// -// for (auto& recipe : simplification.second) { -// // Replace each relevant step with the corresponding joint step for that index. -// size_t step_to_simplify = make_at_step.at(recipe.first); -// plan.steps.at(step_to_simplify) = recipe; -// fixed_step[step_to_simplify] = true; -// } -// } -// -//#ifdef debug_index_registry -// cerr << "plan after simplification:" << endl; -// for (auto plan_elem : plan.steps) { -// cerr << "\t" << to_string(plan_elem.first) << " " << plan_elem.second << endl; -// } -//#endif // Now remove the input data from the plan plan.steps.resize(remove_if(plan.steps.begin(), plan.steps.end(), [&](const RecipeName& recipe_choice) { @@ -5084,11 +5064,13 @@ string IndexRegistry::to_dot(const vector& targets) const { } string unselected_col = targets.empty() ? "black" : "gray33"; size_t recipe_idx = 0; + map recipe_to_dot_id; for (const auto& recipe_record : recipe_registry) { const auto& recipes = recipe_record.second; for (size_t priority_idx = 0; priority_idx < recipes.size(); ++priority_idx, ++recipe_idx) { const auto& recipe = recipes[priority_idx]; string recipe_dot_id = "R" + to_string(recipe_idx); + recipe_to_dot_id[RecipeName(recipe_record.first, recipe_idx)] = recipe_dot_id; bool recipe_in_plan = plan_elements.count(RecipeName(recipe_record.first, priority_idx)); if (recipe_in_plan) { strm << recipe_dot_id << "[label=\"" << priority_idx << "\" shape=circle style=bold];" << endl; @@ -5122,6 +5104,9 @@ string IndexRegistry::to_dot(const vector& targets) const { } } + for (const auto& generalization_record : generalizations) { + strm << recipe_to_dot_id.at(generalization_record.first) << " -> " << recipe_to_dot_id.at(generalization_record.second) << " [style=dashed color=" << unselected_col << "];" << endl; + } strm << "}" << endl; return strm.str(); } diff --git a/src/index_registry.hpp b/src/index_registry.hpp index 12d3bcb460f..593d8b5e6a5 100644 --- a/src/index_registry.hpp +++ b/src/index_registry.hpp @@ -49,13 +49,6 @@ using RecipeFunc = function>(const vector; -/** - * Is a recipe to create the files (returned by name) associated with some - * indexes, from a series of input indexes, given the plan they are being - * generated for. - */ -using JointRecipeFunc = function>>(const vector&,const IndexingPlan*)>; - /** * A struct namespace for global handling of parameters used by * the IndexRegistry @@ -135,6 +128,8 @@ struct VGIndexes { static vector get_default_map_indexes(); /// A list of the identifiers of the default indexes to run vg mpmap static vector get_default_mpmap_indexes(); + /// A list of the identifiers of the default indexes to run rpvg + static vector get_default_rpvg_indexes(); /// A list of the identifiers of the default indexes to run vg giraffe static vector get_default_giraffe_indexes(); }; @@ -233,20 +228,9 @@ class IndexRegistry { const vector& input_identifiers, const RecipeFunc& exec); -// /// Register a recipe to produce multiple indexes. -// /// Individual index recipes must still be registered; this recipe will be -// /// used to simplify the plan when the individual recipes with the same -// /// input set are all called. -// /// -// /// Joint recipes may also be used to generate single indexes if they have -// /// higher priority than other recipes. -// /// -// /// All output identifiers must be distinct, and there must be at least -// /// one. Only one multi-index recipe can be applied to simplify the -// /// production of any given index in a plan. -// void register_joint_recipe(const vector& identifiers, -// const vector& input_identifiers, -// const JointRecipeFunc& exec); + /// Indicate one recipe is a broadened version of another. The indexes consumed and produced + /// by the generalization must be semantically identical to those of the generalizee + void register_generalization(const RecipeName& generalizer, const RecipeName& generalizee); /// Indicate a serialized file that contains some identified index void provide(const IndexName& identifier, const string& filename); @@ -334,9 +318,8 @@ class IndexRegistry { /// The storage struct for recipes, which may make index map> recipe_registry; - /// Record that, when the given input indexes are available, this - /// collection of recipes is efficient to run together. - vector, vector>> simplifications; + /// Map from generalizees to generalizers + map generalizations; /// Temporary directory in which indexes will live string work_dir; diff --git a/src/multipath_mapper.cpp b/src/multipath_mapper.cpp index 6d9ff7b492c..6dc0c175824 100644 --- a/src/multipath_mapper.cpp +++ b/src/multipath_mapper.cpp @@ -3087,7 +3087,7 @@ namespace vg { // check if the fully realized alignment still looks approx disjoint with the primary auto interval = aligned_interval(candidate); if (searching_left) { - if (interval.second >= primary_interval.first + max_softclip_overlap || + if (interval.second >= primary_interval.first + 2 * max_softclip_overlap || min(interval.second, primary_interval.first) - interval.first < min_softclip_length_for_splice) { #ifdef debug_multipath_mapper cerr << "rejecting candidate because of overlap" << endl; @@ -3098,7 +3098,7 @@ namespace vg { } } else { - if (interval.first < primary_interval.second - max_softclip_overlap || + if (interval.first < primary_interval.second - 2 * max_softclip_overlap || interval.second - max(interval.first, primary_interval.second) < min_softclip_length_for_splice) { #ifdef debug_multipath_mapper cerr << "rejecting candidate because of overlap" << endl; diff --git a/src/subcommand/autoindex_main.cpp b/src/subcommand/autoindex_main.cpp index 47004ec6dc2..24fc16ed9ab 100644 --- a/src/subcommand/autoindex_main.cpp +++ b/src/subcommand/autoindex_main.cpp @@ -99,7 +99,7 @@ void help_autoindex(char** argv) { << " output:" << endl << " -p, --prefix PREFIX prefix to use for all output (default: index)" << endl << " -w, --workflow NAME workflow to produce indexes for, can be provided multiple" << endl - << " times. options: map, mpmap, giraffe (default: map)" << endl + << " times. options: map, mpmap, rpvg, giraffe (default: map)" << endl << " input data:" << endl << " -r, --ref-fasta FILE FASTA file containing the reference sequence (may repeat)" << endl << " -v, --vcf FILE VCF file with sequence names matching -r (may repeat)" << endl @@ -206,6 +206,11 @@ int main_autoindex(int argc, char** argv) { targets.emplace_back(move(target)); } } + else if (optarg == string("rpvg")) { + for (auto& target : VGIndexes::get_default_rpvg_indexes()) { + targets.emplace_back(move(target)); + } + } else { cerr << "error: Unrecognized workflow (-w): " << optarg << endl; return 1; diff --git a/test/t/52_vg_autoindex.t b/test/t/52_vg_autoindex.t index 78d70679940..29982f1c939 100644 --- a/test/t/52_vg_autoindex.t +++ b/test/t/52_vg_autoindex.t @@ -5,7 +5,7 @@ BASH_TAP_ROOT=../deps/bash-tap PATH=../bin:$PATH # for vg -plan tests 32 +plan tests 34 rm auto.* @@ -35,7 +35,7 @@ rm auto.* # to add: GFA construction -vg autoindex -p auto -w mpmap -r tiny/tiny.fa -v tiny/tiny.vcf.gz -x tiny/tiny.gtf +vg autoindex -p auto -w mpmap -w rpvg -r tiny/tiny.fa -v tiny/tiny.vcf.gz -x tiny/tiny.gtf is $(echo $?) 0 "autoindexing successfully completes indexing for vg mpmap with unchunked input" is $(ls auto.* | wc -l) 6 "autoindexing creates 6 files for mpmap/rpvg" vg sim -x auto.spliced.xg -n 20 -a -l 10 | vg mpmap -x auto.spliced.xg -g auto.spliced.gcsa -d auto.spliced.dist -B -t 1 -G - > /dev/null @@ -45,18 +45,24 @@ is $(cat auto.txorigin.tsv | wc -l) 7 "transcript origin table has expected numb rm auto.* -vg autoindex -p auto -w mpmap -r small/x.fa -r small/y.fa -v small/x.vcf.gz -v small/y.vcf.gz -x small/x.gtf -x small/y.gtf +vg autoindex -p auto -w mpmap -r tiny/tiny.fa -x tiny/tiny.gtf +is $(echo $?) 0 "autoindexing successfully completes indexing for vg mpmap without variants" +is $(ls auto.* | wc -l) 4 "autoindexing creates 4 files for mpmap/rpvg without variants" + +rm auto.* + +vg autoindex -p auto -w mpmap -w rpvg -r small/x.fa -r small/y.fa -v small/x.vcf.gz -v small/y.vcf.gz -x small/x.gtf -x small/y.gtf is $(echo $?) 0 "autoindexing successfully completes indexing for vg mpmap with chunked input" is $(ls auto.* | wc -l) 6 "autoindexing creates 6 files for mpmap/rpvg with chunked input" rm auto.* -vg autoindex -p auto -w mpmap -r small/x.fa -r small/y.fa -v small/x.vcf.gz -v small/y.vcf.gz -x small/xy.gtf +vg autoindex -p auto -w mpmap -w rpvg -r small/x.fa -r small/y.fa -v small/x.vcf.gz -v small/y.vcf.gz -x small/xy.gtf is $(echo $?) 0 "autoindexing successfully completes indexing for vg mpmap with partially chunked input" rm auto.* -vg autoindex -p auto -w mpmap -r small/xy.fa -v small/x.vcf.gz -v small/y.vcf.gz -x small/xy.gtf +vg autoindex -p auto -w mpmap -w rpvg -r small/xy.fa -v small/x.vcf.gz -v small/y.vcf.gz -x small/xy.gtf is $(echo $?) 0 "autoindexing successfully completes indexing for vg mpmap with another partially chunked input" rm auto.*