From 9fc0e5d5d3f40fc69129674245472a0ef71571f3 Mon Sep 17 00:00:00 2001 From: Vikram Shivakumar Date: Thu, 10 Apr 2025 22:00:36 -0400 Subject: [PATCH 1/6] parallelized bindings --- include/digest/thread_out.hpp | 2 +- pybind/bindings.cpp | 13 ++- pybind/digest_utils.hpp | 174 +++++++++++++++++++++++++++------- 3 files changed, 150 insertions(+), 39 deletions(-) diff --git a/include/digest/thread_out.hpp b/include/digest/thread_out.hpp index 7846cf3..a55f9ff 100644 --- a/include/digest/thread_out.hpp +++ b/include/digest/thread_out.hpp @@ -365,7 +365,7 @@ void thread_wind( unsigned lwinds_per_thread = num_lwinds / thread_count; unsigned extras = num_lwinds % thread_count; vec.reserve(thread_count); - std::vector>> thread_vector; + std::vector>>> thread_vector; size_t ind = start; for (unsigned i = 0; i < thread_count; i++) { diff --git a/pybind/bindings.cpp b/pybind/bindings.cpp index 33a785c..1b43bf7 100644 --- a/pybind/bindings.cpp +++ b/pybind/bindings.cpp @@ -7,14 +7,17 @@ namespace py = pybind11; PYBIND11_MODULE(digest, m) { m.doc() = "bindings for digest"; m.def("window_minimizer", &window_minimizer, - "A function that runs window minimizer digestion", py::arg("seq"), - py::arg("k") = 31, py::arg("w") = 11, + "A function that runs window minimizer digestion", + py::call_guard(), // Release GIL during execution + py::arg("seq"), py::arg("k") = 31, py::arg("w") = 11, py::arg("num_threads") = 1, py::arg("include_hash") = false); m.def("modimizer", &modimizer, - "A function that runs mod-minimizer digestion", py::arg("seq"), - py::arg("k") = 31, py::arg("mod") = 100, + "A function that runs mod-minimizer digestion", + py::call_guard(), // Release GIL during execution + py::arg("seq"), py::arg("k") = 31, py::arg("mod") = 100, py::arg("num_threads") = 1, py::arg("include_hash") = false); m.def("syncmer", &syncmer, "A function that runs syncmer digestion", - py::arg("seq"), py::arg("k") = 31, py::arg("w") = 11, + py::call_guard(), // Release GIL during execution + py::arg("seq"), py::arg("k") = 31, py::arg("w") = 11, py::arg("num_threads") = 1, py::arg("include_hash") = false); } diff --git a/pybind/digest_utils.hpp b/pybind/digest_utils.hpp index f2547ca..0a8f838 100644 --- a/pybind/digest_utils.hpp +++ b/pybind/digest_utils.hpp @@ -1,52 +1,160 @@ #include #include #include +#include #include std::variant, std::vector>> window_minimizer(const std::string &seq, unsigned k, unsigned large_window, - bool include_hash = false) { - digest::WindowMin - digester(seq, k, large_window); - if (include_hash) { - std::vector> output; - digester.roll_minimizer(seq.length(), output); - return output; - } else { - std::vector output; - digester.roll_minimizer(seq.length(), output); - return output; + unsigned threads, bool include_hash = false) { + if (threads == 1) { + digest::WindowMin + digester(seq, k, large_window); + if (include_hash) { + std::vector> output; + digester.roll_minimizer(seq.length(), output); + return output; + } else { + std::vector output; + digester.roll_minimizer(seq.length(), output); + return output; + } + } + else { + if (include_hash) { + std::vector>> output; + digest::thread_out::thread_wind( + threads, output, seq, k, large_window); + + size_t total_size = 0; + for (const auto& vec : output) { + total_size += vec.size(); + } + std::vector> concatenated; + concatenated.reserve(total_size); + for (const auto& vec : output) { + concatenated.insert(concatenated.end(), vec.begin(), vec.end()); + } + return concatenated; + } else { + std::vector> output; + digest::thread_out::thread_wind( + threads, output, seq, k, large_window); + + size_t total_size = 0; + for (const auto& vec : output) { + total_size += vec.size(); + } + + std::vector concatenated; + concatenated.reserve(total_size); + for (const auto& vec : output) { + concatenated.insert(concatenated.end(), vec.begin(), vec.end()); + } + return concatenated; + } } } -// std::vector> output; std::variant, std::vector>> modimizer(const std::string &seq, unsigned k, uint32_t mod, - bool include_hash = false) { - digest::ModMin digester(seq, k, mod); - if (include_hash) { - std::vector> output; - digester.roll_minimizer(seq.length(), output); - return output; - } else { - std::vector output; - digester.roll_minimizer(seq.length(), output); - return output; + unsigned threads, bool include_hash = false) { + if (threads == 1) { + digest::ModMin digester(seq, k, mod); + if (include_hash) { + std::vector> output; + digester.roll_minimizer(seq.length(), output); + return output; + } else { + std::vector output; + digester.roll_minimizer(seq.length(), output); + return output; + } + } + else { + if (include_hash) { + std::vector>> output; + digest::thread_out::thread_mod( + threads, output, seq, k, mod); + + size_t total_size = 0; + for (const auto& vec : output) { + total_size += vec.size(); + } + std::vector> concatenated; + concatenated.reserve(total_size); + for (const auto& vec : output) { + concatenated.insert(concatenated.end(), vec.begin(), vec.end()); + } + return concatenated; + } else { + std::vector> output; + digest::thread_out::thread_mod( + threads, output, seq, k, mod); + + size_t total_size = 0; + for (const auto& vec : output) { + total_size += vec.size(); + } + + std::vector concatenated; + concatenated.reserve(total_size); + for (const auto& vec : output) { + concatenated.insert(concatenated.end(), vec.begin(), vec.end()); + } + return concatenated; + } } } std::variant, std::vector>> syncmer(const std::string &seq, unsigned k, unsigned large_window, - bool include_hash = false) { - digest::Syncmer - digester(seq, k, large_window); - if (include_hash) { - std::vector> output; - digester.roll_minimizer(seq.length(), output); - return output; - } else { - std::vector output; - digester.roll_minimizer(seq.length(), output); - return output; + unsigned threads, bool include_hash = false) { + if (threads == 1) { + digest::Syncmer + digester(seq, k, large_window); + if (include_hash) { + std::vector> output; + digester.roll_minimizer(seq.length(), output); + return output; + } else { + std::vector output; + digester.roll_minimizer(seq.length(), output); + return output; + } + } + else { + if (include_hash) { + std::vector>> output; + digest::thread_out::thread_sync( + threads, output, seq, k, large_window); + + size_t total_size = 0; + for (const auto& vec : output) { + total_size += vec.size(); + } + std::vector> concatenated; + concatenated.reserve(total_size); + for (const auto& vec : output) { + concatenated.insert(concatenated.end(), vec.begin(), vec.end()); + } + return concatenated; + } else { + std::vector> output; + digest::thread_out::thread_sync( + threads, output, seq, k, large_window); + + size_t total_size = 0; + for (const auto& vec : output) { + total_size += vec.size(); + } + + std::vector concatenated; + concatenated.reserve(total_size); + for (const auto& vec : output) { + concatenated.insert(concatenated.end(), vec.begin(), vec.end()); + } + return concatenated; + } } } \ No newline at end of file From b7a2e16ea824128e603240c4638153792af2c716 Mon Sep 17 00:00:00 2001 From: Vikram Shivakumar Date: Fri, 11 Apr 2025 14:22:32 -0400 Subject: [PATCH 2/6] toggled off window_min fix --- pybind/digest_utils.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/pybind/digest_utils.hpp b/pybind/digest_utils.hpp index 0a8f838..a82eb76 100644 --- a/pybind/digest_utils.hpp +++ b/pybind/digest_utils.hpp @@ -20,6 +20,7 @@ window_minimizer(const std::string &seq, unsigned k, unsigned large_window, return output; } } + // return {}; else { if (include_hash) { std::vector>> output; From 352f4ef820a1c96bfd51ae84e934faa646718443 Mon Sep 17 00:00:00 2001 From: Vikram Shivakumar Date: Mon, 5 May 2025 21:56:59 -0400 Subject: [PATCH 3/6] fixed bindings for window_min compile issue for async --- include/digest/thread_out.hpp | 4 ++-- pybind/digest_utils.hpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/digest/thread_out.hpp b/include/digest/thread_out.hpp index a55f9ff..6e458c2 100644 --- a/include/digest/thread_out.hpp +++ b/include/digest/thread_out.hpp @@ -305,7 +305,7 @@ void thread_wind( extras--; } - thread_vector.emplace_back(std::async(thread_wind_roll1, seq, ind, + thread_vector.push_back(std::async(thread_wind_roll1, seq, ind, k, large_wind_kmer_am, minimized_h, assigned_lwind_am)); @@ -377,7 +377,7 @@ void thread_wind( extras--; } - thread_vector.emplace_back(std::async(thread_wind_roll2, seq, ind, + thread_vector.push_back(std::async(thread_wind_roll2, seq, ind, k, large_wind_kmer_am, minimized_h, assigned_lwind_am)); diff --git a/pybind/digest_utils.hpp b/pybind/digest_utils.hpp index a82eb76..ed29a90 100644 --- a/pybind/digest_utils.hpp +++ b/pybind/digest_utils.hpp @@ -49,7 +49,7 @@ window_minimizer(const std::string &seq, unsigned k, unsigned large_window, std::vector concatenated; concatenated.reserve(total_size); - for (const auto& vec : output) { + for (const auto& vec : output) { concatenated.insert(concatenated.end(), vec.begin(), vec.end()); } return concatenated; @@ -158,4 +158,4 @@ syncmer(const std::string &seq, unsigned k, unsigned large_window, return concatenated; } } -} \ No newline at end of file +} From 7e2b16ebe40d3415f7e8cf59222e08a297af3c15 Mon Sep 17 00:00:00 2001 From: Vikram Shivakumar Date: Tue, 6 May 2025 00:52:17 -0400 Subject: [PATCH 4/6] new wrappers pybind to avoid data copying --- include/digest/thread_out.hpp | 12 +-- pybind/bindings.cpp | 12 +++ pybind/digest_utils.hpp | 146 +++++++++++++++++++++++++++++----- 3 files changed, 146 insertions(+), 24 deletions(-) diff --git a/include/digest/thread_out.hpp b/include/digest/thread_out.hpp index 6e458c2..2d2c0f2 100644 --- a/include/digest/thread_out.hpp +++ b/include/digest/thread_out.hpp @@ -173,7 +173,7 @@ void thread_mod( ind += assigned_kmer_am; } for (auto &t : thread_vector) { - vec.emplace_back(t.get()); + vec.emplace_back(std::move(t.get())); } } @@ -235,7 +235,7 @@ void thread_mod( ind += assigned_kmer_am; } for (auto &t : thread_vector) { - vec.emplace_back(t.get()); + vec.emplace_back(std::move(t.get())); } } @@ -312,7 +312,7 @@ void thread_wind( ind += assigned_lwind_am; } for (auto &t : thread_vector) { - vec.emplace_back(t.get()); + vec.emplace_back(std::move(t.get())); } // handle duplicates @@ -384,7 +384,7 @@ void thread_wind( ind += assigned_lwind_am; } for (auto &t : thread_vector) { - vec.emplace_back(t.get()); + vec.emplace_back(std::move(t.get())); } // handle duplicates @@ -473,7 +473,7 @@ void thread_sync( } for (auto &t : thread_vector) { - vec.emplace_back(t.get()); + vec.emplace_back(std::move(t.get())); } } @@ -535,7 +535,7 @@ void thread_sync( ind += assigned_lwind_am; } for (auto &t : thread_vector) { - vec.emplace_back(t.get()); + vec.emplace_back(std::move(t.get())); } } diff --git a/pybind/bindings.cpp b/pybind/bindings.cpp index 1b43bf7..5799fda 100644 --- a/pybind/bindings.cpp +++ b/pybind/bindings.cpp @@ -20,4 +20,16 @@ PYBIND11_MODULE(digest, m) { py::call_guard(), // Release GIL during execution py::arg("seq"), py::arg("k") = 31, py::arg("w") = 11, py::arg("num_threads") = 1, py::arg("include_hash") = false); + m.def("window_minimizer_numpy", &window_minimizer_numpy, + "A function that runs window minimizer digestion and returns a numpy array", + py::arg("seq"), py::arg("k") = 31, py::arg("w") = 11, py::arg("num_threads") = 1, + py::arg("include_hash") = false); + m.def("modimizer_numpy", &modimizer_numpy, + "A function that runs mod-minimizer digestion and returns a numpy array", + py::arg("seq"), py::arg("k") = 31, py::arg("mod") = 100, py::arg("num_threads") = 1, + py::arg("include_hash") = false); + m.def("syncmer_numpy", &syncmer_numpy, + "A function that runs syncmer digestion and returns a numpy array", + py::arg("seq"), py::arg("k") = 31, py::arg("w") = 11, py::arg("num_threads") = 1, + py::arg("include_hash") = false); } diff --git a/pybind/digest_utils.hpp b/pybind/digest_utils.hpp index ed29a90..ba0bc2f 100644 --- a/pybind/digest_utils.hpp +++ b/pybind/digest_utils.hpp @@ -3,20 +3,25 @@ #include #include #include +#include +#include +#include + +namespace py = pybind11; std::variant, std::vector>> -window_minimizer(const std::string &seq, unsigned k, unsigned large_window, +window_minimizer_wrapper(const char *seq, size_t len, unsigned k, unsigned large_window, unsigned threads, bool include_hash = false) { if (threads == 1) { digest::WindowMin - digester(seq, k, large_window); + digester(seq, len, k, large_window); if (include_hash) { std::vector> output; - digester.roll_minimizer(seq.length(), output); + digester.roll_minimizer(len, output); return output; } else { std::vector output; - digester.roll_minimizer(seq.length(), output); + digester.roll_minimizer(len, output); return output; } } @@ -25,7 +30,7 @@ window_minimizer(const std::string &seq, unsigned k, unsigned large_window, if (include_hash) { std::vector>> output; digest::thread_out::thread_wind( - threads, output, seq, k, large_window); + threads, output, seq, len, k, large_window); size_t total_size = 0; for (const auto& vec : output) { @@ -40,7 +45,7 @@ window_minimizer(const std::string &seq, unsigned k, unsigned large_window, } else { std::vector> output; digest::thread_out::thread_wind( - threads, output, seq, k, large_window); + threads, output, seq, len, k, large_window); size_t total_size = 0; for (const auto& vec : output) { @@ -58,17 +63,17 @@ window_minimizer(const std::string &seq, unsigned k, unsigned large_window, } std::variant, std::vector>> -modimizer(const std::string &seq, unsigned k, uint32_t mod, +modimizer_wrapper(const char *seq, size_t len, unsigned k, uint32_t mod, unsigned threads, bool include_hash = false) { if (threads == 1) { - digest::ModMin digester(seq, k, mod); + digest::ModMin digester(seq, len, k, mod); if (include_hash) { std::vector> output; - digester.roll_minimizer(seq.length(), output); + digester.roll_minimizer(len, output); return output; } else { std::vector output; - digester.roll_minimizer(seq.length(), output); + digester.roll_minimizer(len, output); return output; } } @@ -76,7 +81,7 @@ modimizer(const std::string &seq, unsigned k, uint32_t mod, if (include_hash) { std::vector>> output; digest::thread_out::thread_mod( - threads, output, seq, k, mod); + threads, output, seq, len, k, mod); size_t total_size = 0; for (const auto& vec : output) { @@ -91,7 +96,7 @@ modimizer(const std::string &seq, unsigned k, uint32_t mod, } else { std::vector> output; digest::thread_out::thread_mod( - threads, output, seq, k, mod); + threads, output, seq, len, k, mod); size_t total_size = 0; for (const auto& vec : output) { @@ -109,18 +114,18 @@ modimizer(const std::string &seq, unsigned k, uint32_t mod, } std::variant, std::vector>> -syncmer(const std::string &seq, unsigned k, unsigned large_window, +syncmer_wrapper(const char *seq, size_t len, unsigned k, unsigned large_window, unsigned threads, bool include_hash = false) { if (threads == 1) { digest::Syncmer - digester(seq, k, large_window); + digester(seq, len, k, large_window); if (include_hash) { std::vector> output; - digester.roll_minimizer(seq.length(), output); + digester.roll_minimizer(len, output); return output; } else { std::vector output; - digester.roll_minimizer(seq.length(), output); + digester.roll_minimizer(len, output); return output; } } @@ -128,7 +133,7 @@ syncmer(const std::string &seq, unsigned k, unsigned large_window, if (include_hash) { std::vector>> output; digest::thread_out::thread_sync( - threads, output, seq, k, large_window); + threads, output, seq, len, k, large_window); size_t total_size = 0; for (const auto& vec : output) { @@ -143,7 +148,7 @@ syncmer(const std::string &seq, unsigned k, unsigned large_window, } else { std::vector> output; digest::thread_out::thread_sync( - threads, output, seq, k, large_window); + threads, output, seq, len, k, large_window); size_t total_size = 0; for (const auto& vec : output) { @@ -159,3 +164,108 @@ syncmer(const std::string &seq, unsigned k, unsigned large_window, } } } + +pybind11::array window_minimizer_numpy(py::bytes seq, unsigned k, unsigned large_window, + unsigned threads, bool include_hash = false) { + py::buffer_info info(py::buffer(seq).request()); + const char* data = static_cast(info.ptr); + size_t len = static_cast(info.size); + + std::variant, std::vector>> result; + { + py::gil_scoped_release release; + result = window_minimizer_wrapper(data, len, k, large_window, threads, include_hash); + } + if (std::holds_alternative>(result)) { + const auto& vec = std::get>(result); + return pybind11::array_t(vec.size(), vec.data()); + } else { + const auto& vec = std::get>>(result); + pybind11::array_t arr({static_cast(vec.size()), static_cast(2)}); + auto buf = arr.mutable_unchecked<2>(); + for (pybind11::ssize_t i = 0; i < static_cast(vec.size()); ++i) { + buf(i, 0) = vec[i].first; + buf(i, 1) = vec[i].second; + } + return arr; + } +} + +pybind11::array modimizer_numpy(py::bytes seq, unsigned k, uint32_t mod, + unsigned threads, bool include_hash = false) { + py::buffer_info info(py::buffer(seq).request()); + const char* data = static_cast(info.ptr); + size_t len = static_cast(info.size); + + std::variant, std::vector>> result; + { + py::gil_scoped_release release; + result = modimizer_wrapper(data, len, k, mod, threads, include_hash); + } + if (std::holds_alternative>(result)) { + const auto& vec = std::get>(result); + return pybind11::array_t(vec.size(), vec.data()); + } else { + const auto& vec = std::get>>(result); + pybind11::array_t arr({static_cast(vec.size()), static_cast(2)}); + auto buf = arr.mutable_unchecked<2>(); + for (pybind11::ssize_t i = 0; i < static_cast(vec.size()); ++i) { + buf(i, 0) = vec[i].first; + buf(i, 1) = vec[i].second; + } + return arr; + } +} + +pybind11::array syncmer_numpy(py::bytes seq, unsigned k, unsigned large_window, + unsigned threads, bool include_hash = false) { + py::buffer_info info(py::buffer(seq).request()); + const char* data = static_cast(info.ptr); + size_t len = static_cast(info.size); + + std::variant, std::vector>> result; + { + py::gil_scoped_release release; + result = syncmer_wrapper(data, len, k, large_window, threads, include_hash); + } + if (std::holds_alternative>(result)) { + const auto& vec = std::get>(result); + return pybind11::array_t(vec.size(), vec.data()); + } else { + const auto& vec = std::get>>(result); + pybind11::array_t arr({static_cast(vec.size()), static_cast(2)}); + auto buf = arr.mutable_unchecked<2>(); + for (pybind11::ssize_t i = 0; i < static_cast(vec.size()); ++i) { + buf(i, 0) = vec[i].first; + buf(i, 1) = vec[i].second; + } + return arr; + } +} + +std::variant, std::vector>> +window_minimizer(py::bytes seq, unsigned k, unsigned large_window, + unsigned threads, bool include_hash = false) { + py::buffer_info info(py::buffer(seq).request()); + const char* data = static_cast(info.ptr); + size_t len = static_cast(info.size); + return window_minimizer_wrapper(data, len, k, large_window, threads, include_hash); +} + +std::variant, std::vector>> +modimizer(py::bytes seq, unsigned k, uint32_t mod, + unsigned threads, bool include_hash = false) { + py::buffer_info info(py::buffer(seq).request()); + const char* data = static_cast(info.ptr); + size_t len = static_cast(info.size); + return modimizer_wrapper(data, len, k, mod, threads, include_hash); +} + +std::variant, std::vector>> +syncmer(py::bytes seq, unsigned k, unsigned large_window, + unsigned threads, bool include_hash = false) { + py::buffer_info info(py::buffer(seq).request()); + const char* data = static_cast(info.ptr); + size_t len = static_cast(info.size); + return syncmer_wrapper(data, len, k, large_window, threads, include_hash); +} From 3fa90aa17e4e2fe586d1fdd5981fdde826cf1dbc Mon Sep 17 00:00:00 2001 From: Vikram Shivakumar Date: Thu, 22 May 2025 15:25:07 -0400 Subject: [PATCH 5/6] added parallization --- README.md | 13 +++++++++++++ pybind/digest_utils.hpp | 21 ++++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index ab1c4fe..0898ab6 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,19 @@ Once installed, you can import and use the Digest library in Python: ['ACGTA', 'CGTAG', 'AGCTA', 'TAGCT', 'GCTGA', 'TTACA', 'TACAT', 'GTATG', 'GCAAG', 'TGATC', 'CGTAG', 'TAGTG', 'ATGCT'] ``` +We have also implemented parallel execution in the python library: +``` +>>> import timeit +>>> timeit.timeit('window_minimizer(seq, k=5, w=11)', +... setup='from digest import window_minimizer; seq = open("tests/bench/chrY.fa", "rb").read()', +... number=10) +19.85955114942044 +>>> timeit.timeit('window_minimizer(seq, k=5, w=11, num_threads=4)', +... setup='from digest import window_minimizer; seq = open("tests/bench/chrY.fa", "rb").read()', +... number=10) +10.327348310500383 +``` +