From e7c054f2c0e0bc6263362a37255984b183608a94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Wed, 13 Nov 2019 13:19:01 +0100 Subject: [PATCH 01/21] basic functionality --- .../callback_communicator_ts.hpp | 443 ++++++++++++++++++ tests/transport/CMakeLists.txt | 2 +- tests/transport/test_ts.cpp | 75 +++ 3 files changed, 519 insertions(+), 1 deletion(-) create mode 100644 include/ghex/transport_layer/callback_communicator_ts.hpp create mode 100644 tests/transport/test_ts.cpp diff --git a/include/ghex/transport_layer/callback_communicator_ts.hpp b/include/ghex/transport_layer/callback_communicator_ts.hpp new file mode 100644 index 0000000..1820ede --- /dev/null +++ b/include/ghex/transport_layer/callback_communicator_ts.hpp @@ -0,0 +1,443 @@ +/* + * GridTools + * + * Copyright (c) 2014-2019, ETH Zurich + * All rights reserved. + * + * Please, refer to the LICENSE file in the root directory. + * SPDX-License-Identifier: BSD-3-Clause + * + */ +#ifndef INCLUDED_GHEX_TL_CALLBACK_COMMUNICATOR_TS_HPP +#define INCLUDED_GHEX_TL_CALLBACK_COMMUNICATOR_TS_HPP + +#include +#include "./callback_communicator.hpp" + +namespace gridtools +{ + namespace ghex + { + namespace tl { + + /** callback_communicator is a class to dispatch send and receive operations to. Each operation can + * optionally be tied to a user defined callback function / function object. The payload of each + * send/receive operation must be a ghex::shared_message_buffer. + * This class will keep a (shallow) copy of each message, thus it is safe to release the message at + * the caller's site. + * + * The user defined callback must define void operator()(message_type,rank_type,tag_type), where + * message_type is a shared_message_buffer that can be cheaply copied/moved from within the callback body + * if needed. + * + * The communication must be explicitely progressed using the member function progress. + * + * An instance of this class is + * - a move-only. + * - not thread-safe + * + * If unprogressed operations remain at time of destruction, std::terminate will be called. + * + * @tparam Communicator underlying transport communicator + * @tparam Allocator allocator type used for allocating shared message buffers */ + template> + class callback_communicator_ts + { + public: // member types + + using communicator_type = Communicator; + using future_type = typename communicator_type::template future; + using tag_type = typename communicator_type::tag_type; + using rank_type = typename communicator_type::rank_type; + using allocator_type = Allocator; + using message_type = shared_message_buffer; + + + struct request_state + { + volatile bool m_ready = false; + bool is_ready() const noexcept + { + return m_ready; + } + }; + + struct request + { + std::shared_ptr m_request_state; + bool is_ready() const noexcept + { + return m_request_state->is_ready(); + } + }; + + private: // member types + + // necessary meta information for each send/receive operation + struct element_type + { + using message_arg_type = message_type; + std::function m_cb; + rank_type m_rank; + tag_type m_tag; + future_type m_future; + message_type m_msg; + //std::weak_ptr m_request_state; + std::shared_ptr m_request_state; + }; + using send_element_type = element_type; + using recv_element_type = element_type; + using lock_free_alloc_t = boost::lockfree::allocator>; + using send_container_type = boost::lockfree::queue>; + using recv_container_type = boost::lockfree::queue>; + + + private: // members + + communicator_type m_comm; + allocator_type m_alloc; + send_container_type m_sends; + recv_container_type m_recvs; + + public: // ctors + + /** @brief construct from a basic transport communicator + * @param comm the underlying transport communicator + * @param alloc the allocator instance to be used for constructing messages */ + callback_communicator_ts(const communicator_type& comm, allocator_type alloc = allocator_type{}) + : m_comm(comm), m_alloc(alloc), m_sends(128), m_recvs(128) {} + callback_communicator_ts(communicator_type&& comm, allocator_type alloc = allocator_type{}) + : m_comm(std::move(comm)), m_alloc(alloc), m_sends(128), m_recvs(128) {} + + callback_communicator_ts(const callback_communicator_ts&) = delete; + callback_communicator_ts(callback_communicator_ts&&) = default; + + /** terminates the program if the queues are not empty */ + ~callback_communicator_ts() + { + // consume all + } + + public: // queries + + auto rank() const noexcept { return m_comm.rank(); } + auto size() const noexcept { return m_comm.size(); } + + ///** returns the number of unprocessed send handles in the queue. */ + //std::size_t pending_sends() const { return m_sends.size(); } + ///** returns the number of unprocessed recv handles in the queue. */ + //std::size_t pending_recvs() const { return m_recvs.size(); } + + public: // get a message + + /** get a message with size n from the communicator */ + message_type make_message(std::size_t n = 0u) const + { + return { m_alloc, n }; + } + + public: // send + + /** @brief Send a message to a destination with the given tag and register a callback which will be + * invoked when the send operation is completed. + * @tparam CallBack User defined callback class which defines + * void Callback::operator()(message_type,rank_type,tag_type) + * @param msg Message to be sent + * @param dst Destination of the message + * @param tag Tag associated with the message + * @param cb Callback function object */ + template + request send(message_type msg, rank_type dst, tag_type tag, CallBack&& cb) + { + GHEX_CHECK_CALLBACK + request req{std::make_shared()}; + auto element_ptr = new send_element_type{std::forward(cb), dst, tag, future_type{}, std::move(msg), req.m_request_state}; + element_ptr->m_future = std::move( m_comm.send(element_ptr->m_msg, dst, tag) ); + while (!m_sends.push(element_ptr)) {} + return req; + } + + /** @brief Send a message without registering a callback. */ + request send(message_type msg, rank_type dst, tag_type tag) + { + return send(std::move(msg),dst,tag,[](message_type,rank_type,tag_type){}); + } + + /** @brief Send a message to multiple destinations with the same rank an register an associated callback. + * @tparam Neighs Range over rank_type + * @tparam CallBack User defined callback class which defines + * void Callback::operator()(rank_type,tag_type,message_type) + * @param msg Message to be sent + * @param neighs Range of destination ranks + * @param tag Tag associated with the message + * @param cb Callback function object */ + template + std::vector send_multi(message_type msg, Neighs const &neighs, int tag, CallBack&& cb) + { + GHEX_CHECK_CALLBACK + using cb_type = typename std::remove_cv::type>::type; + auto cb_ptr = std::make_shared( std::forward(cb) ); + std::vector reqs; + for (auto id : neighs) + reqs.push_back( send(msg, id, tag, + [cb_ptr](message_type m, rank_type r, tag_type t) + { + // if (cb_ptr->use_count == 1) + (*cb_ptr)(std::move(m),r,t); + }) ); + } + + /** @brief Send a message to multiple destinations without registering a callback */ + template + std::vector send_multi(message_type msg, Neighs const &neighs, int tag) + { + return send_multi(std::move(msg),neighs,tag,[](message_type, rank_type,tag_type){}); + } + + public: // receive + + /** @brief Receive a message from a source rank with the given tag and register a callback which will + * be invoked when the receive operation is completed. + * @tparam CallBack User defined callback class which defines + * void Callback::operator()(message_type,rank_type,tag_type) + * @param msg Message where data will be received + * @param src Source of the message + * @param tag Tag associated with the message + * @param cb Callback function object */ + template + request recv(message_type msg, rank_type src, tag_type tag, CallBack&& cb) + { + GHEX_CHECK_CALLBACK + request req{std::make_shared()}; + auto element_ptr = new recv_element_type{std::forward(cb), src, tag, future_type{}, std::move(msg), req.m_request_state}; + element_ptr->m_future = std::move( m_comm.recv(element_ptr->m_msg, src, tag) ); + while (!m_recvs.push(element_ptr)) {} + return req; + } + + /** @brief Receive a message with length size (storage is allocated accordingly). */ + template + request recv(std::size_t size, rank_type src, tag_type tag, CallBack&& cb) + { + return recv(message_type{size,m_alloc}, src, tag, std::forward(cb)); + } + + /** @brief Receive a message without registering a callback. */ + request recv(message_type msg, rank_type src, tag_type tag) + { + return recv(std::move(msg),src,tag,[](message_type,rank_type,tag_type){}); + } + + public: // progress + + /** @brief Progress the communication. This function checks whether any receive and send operation is + * completed and calls the associated callback (if it exists). + * @return returns false if all registered operations have been completed.*/ + std::size_t progress() + { + std::size_t num_completed = 0u; + num_completed += run(m_sends); + num_completed += run(m_recvs); + return num_completed; + } + + ///** @brief Progress the communication. This function checks whether any receive and send operation is + // * completed and calls the associated callback (if it exists). When all registered operations have + // * been completed this function checks for further unexpected incoming messages which will be received + // * in a newly allocated shared_message_buffer and returned to the user through invocation of the + // * provided callback. + // * @tparam CallBack User defined callback class which defines + // * void Callback::operator()(message_type,rank_type,tag_type) + // * @param unexpected_cb callback function object + // * @return returns false if all registered operations have been completed. */ + //template + //bool progress(CallBack&& unexpected_cb) + //{ + // GHEX_CHECK_CALLBACK + // const auto not_completed = progress(); + // if (!not_completed) + // { + // if (auto o = m_comm.template recv_any_source_any_tag(m_alloc)) + // { + // auto t = o->get(); + // unexpected_cb(std::move(std::get<2>(t)),std::get<0>(t),std::get<1>(t)); + // } + // } + // return not_completed; + //} + + public: // attach/detach + + ///** @brief Deregister a send operation from this object which matches the given destination and tag. + // * If such operation is found the callback will be discared and the message will be returned to the + // * caller together with a future on which completion can be awaited. + // * @param dst Destination of the message + // * @param tag Tag associated with the message + // * @return Either a pair of future and message or none */ + //boost::optional> detach_send(rank_type dst, tag_type tag) + //{ + // return detach(dst,tag,m_sends); + //} + + ///** @brief Deregister a receive operation from this object which matches the given destination and tag. + // * If such operation is found the callback will be discared and the message will be returned to the + // * caller together with a future on which completion can be awaited. + // * @param src Source of the message + // * @param tag Tag associated with the message + // * @return Either a pair of future and message or none */ + //boost::optional> detach_recv(rank_type src, tag_type tag) + //{ + // return detach(src,tag,m_recvs); + //} + + ///** @brief Register a send operation with this object with future, destination and tag and associate it + // * with a callback. This is the inverse operation of detach. Note, that attaching of a send operation + // * originating from the underlying basic communicator is supported. + // * @tparam CallBack User defined callback class which defines + // * void Callback::operator()(message_type,rank_type,tag_type) + // * @param fut future object + // * @param msg message data + // * @param dst destination rank + // * @param tag associated tag + // * @param cb Callback function object */ + //template + //void attach_send(future_type&& fut, message_type msg, rank_type dst, tag_type tag, CallBack&& cb) + //{ + // GHEX_CHECK_CALLBACK + // auto ptr = new send_element_type{ std::forward(cb), dst, tag, std::move(fut), std::move(msg) }; + // while(!m_sends.push(ptr)) {} + //} + + ///** @brief Register a send without associated callback. */ + //void attach_send(future_type&& fut, message_type msg, rank_type dst, tag_type tag) + //{ + // auto ptr = new send_element_type{ [](message_type,rank_type,tag_type){}, dst, tag, std::move(fut), std::move(msg) }; + // while(!m_sends.push(ptr)) {} + //} + + ///** @brief Register a receive operation with this object with future, source and tag and associate it + // * with a callback. This is the inverse operation of detach. Note, that attaching of a recv operation + // * originating from the underlying basic communicator is supported. + // * @tparam CallBack User defined callback class which defines + // * void Callback::operator()(message_type,rank_type,tag_type) + // * @param fut future object + // * @param msg message data + // * @param dst source rank + // * @param tag associated tag + // * @param cb Callback function object */ + //template + //void attach_recv(future_type&& fut, message_type msg, rank_type src, tag_type tag, CallBack&& cb) + //{ + // GHEX_CHECK_CALLBACK + // auto ptr = new recv_element_type{ std::forward(cb), src, tag, std::move(fut), std::move(msg) }; + // while(!m_recvs.push(ptr)) {} + //} + + ///** @brief Register a receive without associated callback. */ + //void attach_recv(future_type&& fut, message_type msg, rank_type src, tag_type tag) + //{ + // auto ptr = new recv_element_type{ [](message_type,rank_type,tag_type){}, src, tag, std::move(fut), std::move(msg) }; + // while(!m_recvs.push(ptr)) {} + //} + + public: // cancel + + ///** @brief Deregister all operations from this object and attempt to cancel the communication. + // * @return true if cancelling was successful. */ + //bool cancel() + //{ + // const auto s = cancel_sends(); + // const auto r = cancel_recvs(); + // return s && r; + //} + + ///** @brief Deregister all send operations from this object and attempt to cancel the communication. + // * @return true if cancelling was successful. */ + //bool cancel_sends() { return cancel(m_sends); } + + ///** @brief Deregister all recv operations from this object and attempt to cancel the communication. + // * @return true if cancelling was successful. */ + //bool cancel_recvs() { return cancel(m_recvs); } + + private: // implementation + + template + std::size_t run(Queue& d) + { + send_element_type* ptr = nullptr; + if (d.pop(ptr)) + { + if (ptr->m_future.ready()) + { + // call the callback + ptr->m_cb(std::move(ptr->m_msg), ptr->m_rank, ptr->m_tag); + //ptr->m_request_state.lock()->m_ready = true; + ptr->m_request_state->m_ready = true; + delete ptr; + return 1u; + } + else + { + while( !d.push(ptr) ) {} + return 0u; + } + } + else return 0u; + } + + //template + //boost::optional> detach(rank_type rank, tag_type tag, Queue& d) + //{ + // std::vector d2; + // send_element_type* found_ptr = nullptr; + // d.consume_all( + // [rank, tag,&d2,&found_ptr](auto ptr) + // { + // if (ptr->m_rank == rank && ptr->m_tag == tag) + // { + // found_ptr = ptr; + // } + // else + // { + // d2.push_back(ptr); + // } + // }); + // for (auto ptr : d2) + // while(!d.push(ptr)){} + + // if (found_ptr) + // { + // auto cb = std::move(found_ptr->m_cb); + // auto fut = std::move(found_ptr->m_future); + // auto msg = std::move(found_ptr->m_msg); + // delete found_ptr; + // return std::pair{std::move(fut), std::move(msg)}; + // } + // return boost::none; + //} + + //template + //bool cancel(Queue& d) + //{ + // bool result = true; + // d.consume_all( + // [&result](auto ptr) + // { + // auto& fut = ptr->m_future; + // if (!fut.ready()) + // result = result && fut.cancel(); + // //else + // // fut.wait(); + // delete ptr; + // } + // ); + // return result; + //} + }; + + } // namespace tl + } // namespace ghex +}// namespace gridtools + +#endif/*INCLUDED_GHEX_TL_CALLBACK_COMMUNICATOR_TS_HPP */ + diff --git a/tests/transport/CMakeLists.txt b/tests/transport/CMakeLists.txt index 6052534..15fb46b 100644 --- a/tests/transport/CMakeLists.txt +++ b/tests/transport/CMakeLists.txt @@ -1,5 +1,5 @@ -set(_tests test_low_level test_low_level_x test_send_multi test_cancel_request test_attach_detach) +set(_tests test_low_level test_low_level_x test_send_multi test_cancel_request test_attach_detach test_ts) foreach(t_ ${_tests}) diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp new file mode 100644 index 0000000..77efc17 --- /dev/null +++ b/tests/transport/test_ts.cpp @@ -0,0 +1,75 @@ +/* + * GridTools + * + * Copyright (c) 2014-2019, ETH Zurich + * All rights reserved. + * + * Please, refer to the LICENSE file in the root directory. + * SPDX-License-Identifier: BSD-3-Clause + * + */ +#include +#include +#include +#include +#include +#include +#include + +#include + +using comm_t = gridtools::ghex::tl::communicator; +using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts>; + +std::atomic num_completed; + +void test1(std::size_t num_progress_threads) +{ + comm_t comm; + callback_comm_t cb_comm(comm); + + const int rank = cb_comm.rank(); + const int r_rank = (rank+1)%cb_comm.size(); + const int l_rank = (rank+cb_comm.size()-1)%cb_comm.size(); + + callback_comm_t::message_type send_msg(4096); + callback_comm_t::message_type recv_msg(4096); + callback_comm_t::request send_req; + callback_comm_t::request recv_req; + + std::size_t num_requests = 2; + + auto send_recv_func = + [&cb_comm,&recv_msg,&send_msg,&recv_req,&send_req,l_rank,r_rank]() + { + /*recv_req =*/ cb_comm.recv(recv_msg,l_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "received!" << std::endl; }); + /*send_req =*/ cb_comm.send(send_msg,r_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "sent!" << std::endl; }); + //while ( !(recv_req.is_ready() && send_req.is_ready()) ) {} + }; + + auto progress_func = + [&cb_comm, num_requests]() + { + while(num_completed < num_requests) + { + num_completed += cb_comm.progress(); + } + }; + + std::vector progress_threads; + progress_threads.reserve(num_progress_threads+1); + + for (std::size_t i=0; i Date: Wed, 13 Nov 2019 13:46:15 +0100 Subject: [PATCH 02/21] better test --- tests/transport/test_ts.cpp | 41 ++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 77efc17..81e003e 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -23,8 +23,10 @@ using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts num_completed; -void test1(std::size_t num_progress_threads) +void test1(std::size_t num_progress_threads, bool wait) { + num_completed.store(0u); + comm_t comm; callback_comm_t cb_comm(comm); @@ -39,14 +41,24 @@ void test1(std::size_t num_progress_threads) std::size_t num_requests = 2; - auto send_recv_func = + // lambda which places send and receive calls + auto send_recv_func_nowait = + [&cb_comm,&recv_msg,&send_msg,l_rank,r_rank]() + { + cb_comm.recv(recv_msg,l_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "received!" << std::endl; }); + cb_comm.send(send_msg,r_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "sent!" << std::endl; }); + }; + + // lambda which places send and receive calls and waits for completion + auto send_recv_func_wait = [&cb_comm,&recv_msg,&send_msg,&recv_req,&send_req,l_rank,r_rank]() { - /*recv_req =*/ cb_comm.recv(recv_msg,l_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "received!" << std::endl; }); - /*send_req =*/ cb_comm.send(send_msg,r_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "sent!" << std::endl; }); - //while ( !(recv_req.is_ready() && send_req.is_ready()) ) {} + recv_req = cb_comm.recv(recv_msg,l_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "received!" << std::endl; }); + send_req = cb_comm.send(send_msg,r_rank,0,[](callback_comm_t::message_type, int, int) { std::cout << "sent!" << std::endl; }); + while ( !(recv_req.is_ready() && send_req.is_ready()) ) {} }; + // lambda which progresses the queues auto progress_func = [&cb_comm, num_requests]() { @@ -56,20 +68,25 @@ void test1(std::size_t num_progress_threads) } }; - std::vector progress_threads; - progress_threads.reserve(num_progress_threads+1); + std::vector threads; + threads.reserve(num_progress_threads+1); for (std::size_t i=0; i Date: Wed, 13 Nov 2019 14:31:38 +0100 Subject: [PATCH 03/21] multiple comm threads, multiple progress threads --- tests/transport/test_ts.cpp | 64 +++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 81e003e..0c25e30 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -23,7 +23,7 @@ using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts num_completed; -void test1(std::size_t num_progress_threads, bool wait) +void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool wait) { num_completed.store(0u); @@ -34,27 +34,45 @@ void test1(std::size_t num_progress_threads, bool wait) const int r_rank = (rank+1)%cb_comm.size(); const int l_rank = (rank+cb_comm.size()-1)%cb_comm.size(); - callback_comm_t::message_type send_msg(4096); - callback_comm_t::message_type recv_msg(4096); - callback_comm_t::request send_req; - callback_comm_t::request recv_req; + using msg_type = callback_comm_t::message_type; + using req_type = callback_comm_t::request; - std::size_t num_requests = 2; + std::vector send_msgs; + std::vector recv_msgs; + std::vector send_reqs; + std::vector recv_reqs; + for (std::size_t i=0; i threads; - threads.reserve(num_progress_threads+1); + threads.reserve(num_progress_threads+num_comm_threads); for (std::size_t i=0; i Date: Wed, 13 Nov 2019 14:37:42 +0100 Subject: [PATCH 04/21] simplified test --- tests/transport/test_ts.cpp | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 0c25e30..9327a6c 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -35,18 +35,13 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool const int l_rank = (rank+cb_comm.size()-1)%cb_comm.size(); using msg_type = callback_comm_t::message_type; - using req_type = callback_comm_t::request; std::vector send_msgs; std::vector recv_msgs; - std::vector send_reqs; - std::vector recv_reqs; for (std::size_t i=0; i Date: Wed, 13 Nov 2019 14:57:20 +0100 Subject: [PATCH 05/21] using thread-local basic communicators --- .../callback_communicator_ts.hpp | 102 ++++++++++-------- tests/transport/test_ts.cpp | 16 +-- 2 files changed, 66 insertions(+), 52 deletions(-) diff --git a/include/ghex/transport_layer/callback_communicator_ts.hpp b/include/ghex/transport_layer/callback_communicator_ts.hpp index 1820ede..b842c13 100644 --- a/include/ghex/transport_layer/callback_communicator_ts.hpp +++ b/include/ghex/transport_layer/callback_communicator_ts.hpp @@ -147,52 +147,57 @@ namespace gridtools * @param tag Tag associated with the message * @param cb Callback function object */ template - request send(message_type msg, rank_type dst, tag_type tag, CallBack&& cb) + request send(communicator_type& comm, message_type msg, rank_type dst, tag_type tag, CallBack&& cb) { GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto element_ptr = new send_element_type{std::forward(cb), dst, tag, future_type{}, std::move(msg), req.m_request_state}; - element_ptr->m_future = std::move( m_comm.send(element_ptr->m_msg, dst, tag) ); + element_ptr->m_future = std::move( comm.send(element_ptr->m_msg, dst, tag) ); while (!m_sends.push(element_ptr)) {} return req; } - - /** @brief Send a message without registering a callback. */ - request send(message_type msg, rank_type dst, tag_type tag) + template + request send(message_type msg, rank_type dst, tag_type tag, CallBack&& cb) { - return send(std::move(msg),dst,tag,[](message_type,rank_type,tag_type){}); + return send(m_comm, msg, dst, tag, std::forward(cb)); } - /** @brief Send a message to multiple destinations with the same rank an register an associated callback. - * @tparam Neighs Range over rank_type - * @tparam CallBack User defined callback class which defines - * void Callback::operator()(rank_type,tag_type,message_type) - * @param msg Message to be sent - * @param neighs Range of destination ranks - * @param tag Tag associated with the message - * @param cb Callback function object */ - template - std::vector send_multi(message_type msg, Neighs const &neighs, int tag, CallBack&& cb) - { - GHEX_CHECK_CALLBACK - using cb_type = typename std::remove_cv::type>::type; - auto cb_ptr = std::make_shared( std::forward(cb) ); - std::vector reqs; - for (auto id : neighs) - reqs.push_back( send(msg, id, tag, - [cb_ptr](message_type m, rank_type r, tag_type t) - { - // if (cb_ptr->use_count == 1) - (*cb_ptr)(std::move(m),r,t); - }) ); - } + ///** @brief Send a message without registering a callback. */ + //request send(message_type msg, rank_type dst, tag_type tag) + //{ + // return send(std::move(msg),dst,tag,[](message_type,rank_type,tag_type){}); + //} - /** @brief Send a message to multiple destinations without registering a callback */ - template - std::vector send_multi(message_type msg, Neighs const &neighs, int tag) - { - return send_multi(std::move(msg),neighs,tag,[](message_type, rank_type,tag_type){}); - } + ///** @brief Send a message to multiple destinations with the same rank an register an associated callback. + // * @tparam Neighs Range over rank_type + // * @tparam CallBack User defined callback class which defines + // * void Callback::operator()(rank_type,tag_type,message_type) + // * @param msg Message to be sent + // * @param neighs Range of destination ranks + // * @param tag Tag associated with the message + // * @param cb Callback function object */ + //template + //std::vector send_multi(message_type msg, Neighs const &neighs, int tag, CallBack&& cb) + //{ + // GHEX_CHECK_CALLBACK + // using cb_type = typename std::remove_cv::type>::type; + // auto cb_ptr = std::make_shared( std::forward(cb) ); + // std::vector reqs; + // for (auto id : neighs) + // reqs.push_back( send(msg, id, tag, + // [cb_ptr](message_type m, rank_type r, tag_type t) + // { + // // if (cb_ptr->use_count == 1) + // (*cb_ptr)(std::move(m),r,t); + // }) ); + //} + + ///** @brief Send a message to multiple destinations without registering a callback */ + //template + //std::vector send_multi(message_type msg, Neighs const &neighs, int tag) + //{ + // return send_multi(std::move(msg),neighs,tag,[](message_type, rank_type,tag_type){}); + //} public: // receive @@ -205,28 +210,33 @@ namespace gridtools * @param tag Tag associated with the message * @param cb Callback function object */ template - request recv(message_type msg, rank_type src, tag_type tag, CallBack&& cb) + request recv(communicator_type& comm, message_type msg, rank_type src, tag_type tag, CallBack&& cb) { GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto element_ptr = new recv_element_type{std::forward(cb), src, tag, future_type{}, std::move(msg), req.m_request_state}; - element_ptr->m_future = std::move( m_comm.recv(element_ptr->m_msg, src, tag) ); + element_ptr->m_future = std::move( comm.recv(element_ptr->m_msg, src, tag) ); while (!m_recvs.push(element_ptr)) {} return req; } - - /** @brief Receive a message with length size (storage is allocated accordingly). */ template - request recv(std::size_t size, rank_type src, tag_type tag, CallBack&& cb) + request recv(message_type msg, rank_type src, tag_type tag, CallBack&& cb) { - return recv(message_type{size,m_alloc}, src, tag, std::forward(cb)); + return recv(m_comm, msg, src, tag, std::forward(cb)); } - /** @brief Receive a message without registering a callback. */ - request recv(message_type msg, rank_type src, tag_type tag) - { - return recv(std::move(msg),src,tag,[](message_type,rank_type,tag_type){}); - } + ///** @brief Receive a message with length size (storage is allocated accordingly). */ + //template + //request recv(std::size_t size, rank_type src, tag_type tag, CallBack&& cb) + //{ + // return recv(message_type{size,m_alloc}, src, tag, std::forward(cb)); + //} + + ///** @brief Receive a message without registering a callback. */ + //request recv(message_type msg, rank_type src, tag_type tag) + //{ + // return recv(std::move(msg),src,tag,[](message_type,rank_type,tag_type){}); + //} public: // progress diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 9327a6c..eaa7d60 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -38,34 +38,36 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool std::vector send_msgs; std::vector recv_msgs; + std::vector comms; for (std::size_t i=0; i Date: Wed, 13 Nov 2019 17:15:11 +0100 Subject: [PATCH 06/21] type erased version --- .../callback_communicator_ts.hpp | 345 ++---------------- tests/transport/test_ts.cpp | 10 +- 2 files changed, 45 insertions(+), 310 deletions(-) diff --git a/include/ghex/transport_layer/callback_communicator_ts.hpp b/include/ghex/transport_layer/callback_communicator_ts.hpp index b842c13..6c14d78 100644 --- a/include/ghex/transport_layer/callback_communicator_ts.hpp +++ b/include/ghex/transport_layer/callback_communicator_ts.hpp @@ -20,35 +20,13 @@ namespace gridtools { namespace tl { - /** callback_communicator is a class to dispatch send and receive operations to. Each operation can - * optionally be tied to a user defined callback function / function object. The payload of each - * send/receive operation must be a ghex::shared_message_buffer. - * This class will keep a (shallow) copy of each message, thus it is safe to release the message at - * the caller's site. - * - * The user defined callback must define void operator()(message_type,rank_type,tag_type), where - * message_type is a shared_message_buffer that can be cheaply copied/moved from within the callback body - * if needed. - * - * The communication must be explicitely progressed using the member function progress. - * - * An instance of this class is - * - a move-only. - * - not thread-safe - * - * If unprogressed operations remain at time of destruction, std::terminate will be called. - * - * @tparam Communicator underlying transport communicator - * @tparam Allocator allocator type used for allocating shared message buffers */ - template> + template> class callback_communicator_ts { public: // member types - using communicator_type = Communicator; - using future_type = typename communicator_type::template future; - using tag_type = typename communicator_type::tag_type; - using rank_type = typename communicator_type::rank_type; + using tag_type = int; + using rank_type = int; using allocator_type = Allocator; using message_type = shared_message_buffer; @@ -73,6 +51,32 @@ namespace gridtools private: // member types + struct any_future + { + struct iface + { + virtual bool ready() = 0; + virtual ~iface() {} + }; + template + struct holder : public iface + { + Future m_future; + holder() = default; + holder(Future&& fut): m_future{std::move(fut)} {} + bool ready() override { return m_future.ready(); } + }; + + std::unique_ptr m_ptr; + + template + any_future(Future&& fut) + : m_ptr{std::make_unique>(std::move(fut))} + {} + + bool ready() { return m_ptr->ready(); } + }; + // necessary meta information for each send/receive operation struct element_type { @@ -80,9 +84,8 @@ namespace gridtools std::function m_cb; rank_type m_rank; tag_type m_tag; - future_type m_future; + any_future m_future; message_type m_msg; - //std::weak_ptr m_request_state; std::shared_ptr m_request_state; }; using send_element_type = element_type; @@ -94,7 +97,6 @@ namespace gridtools private: // members - communicator_type m_comm; allocator_type m_alloc; send_container_type m_sends; recv_container_type m_recvs; @@ -104,10 +106,8 @@ namespace gridtools /** @brief construct from a basic transport communicator * @param comm the underlying transport communicator * @param alloc the allocator instance to be used for constructing messages */ - callback_communicator_ts(const communicator_type& comm, allocator_type alloc = allocator_type{}) - : m_comm(comm), m_alloc(alloc), m_sends(128), m_recvs(128) {} - callback_communicator_ts(communicator_type&& comm, allocator_type alloc = allocator_type{}) - : m_comm(std::move(comm)), m_alloc(alloc), m_sends(128), m_recvs(128) {} + callback_communicator_ts(allocator_type alloc = allocator_type{}) + : m_alloc(alloc), m_sends(128), m_recvs(128) {} callback_communicator_ts(const callback_communicator_ts&) = delete; callback_communicator_ts(callback_communicator_ts&&) = default; @@ -117,132 +117,35 @@ namespace gridtools { // consume all } - - public: // queries - - auto rank() const noexcept { return m_comm.rank(); } - auto size() const noexcept { return m_comm.size(); } - - ///** returns the number of unprocessed send handles in the queue. */ - //std::size_t pending_sends() const { return m_sends.size(); } - ///** returns the number of unprocessed recv handles in the queue. */ - //std::size_t pending_recvs() const { return m_recvs.size(); } - - public: // get a message - - /** get a message with size n from the communicator */ - message_type make_message(std::size_t n = 0u) const - { - return { m_alloc, n }; - } public: // send - /** @brief Send a message to a destination with the given tag and register a callback which will be - * invoked when the send operation is completed. - * @tparam CallBack User defined callback class which defines - * void Callback::operator()(message_type,rank_type,tag_type) - * @param msg Message to be sent - * @param dst Destination of the message - * @param tag Tag associated with the message - * @param cb Callback function object */ - template - request send(communicator_type& comm, message_type msg, rank_type dst, tag_type tag, CallBack&& cb) + template + request send(Comm& comm, message_type msg, rank_type dst, tag_type tag, CallBack&& cb) { GHEX_CHECK_CALLBACK request req{std::make_shared()}; - auto element_ptr = new send_element_type{std::forward(cb), dst, tag, future_type{}, std::move(msg), req.m_request_state}; - element_ptr->m_future = std::move( comm.send(element_ptr->m_msg, dst, tag) ); + auto fut = comm.send(msg,dst,tag); + auto element_ptr = new send_element_type{std::forward(cb), dst, tag, std::move(fut), std::move(msg), req.m_request_state}; while (!m_sends.push(element_ptr)) {} return req; } - template - request send(message_type msg, rank_type dst, tag_type tag, CallBack&& cb) - { - return send(m_comm, msg, dst, tag, std::forward(cb)); - } - - ///** @brief Send a message without registering a callback. */ - //request send(message_type msg, rank_type dst, tag_type tag) - //{ - // return send(std::move(msg),dst,tag,[](message_type,rank_type,tag_type){}); - //} - - ///** @brief Send a message to multiple destinations with the same rank an register an associated callback. - // * @tparam Neighs Range over rank_type - // * @tparam CallBack User defined callback class which defines - // * void Callback::operator()(rank_type,tag_type,message_type) - // * @param msg Message to be sent - // * @param neighs Range of destination ranks - // * @param tag Tag associated with the message - // * @param cb Callback function object */ - //template - //std::vector send_multi(message_type msg, Neighs const &neighs, int tag, CallBack&& cb) - //{ - // GHEX_CHECK_CALLBACK - // using cb_type = typename std::remove_cv::type>::type; - // auto cb_ptr = std::make_shared( std::forward(cb) ); - // std::vector reqs; - // for (auto id : neighs) - // reqs.push_back( send(msg, id, tag, - // [cb_ptr](message_type m, rank_type r, tag_type t) - // { - // // if (cb_ptr->use_count == 1) - // (*cb_ptr)(std::move(m),r,t); - // }) ); - //} - - ///** @brief Send a message to multiple destinations without registering a callback */ - //template - //std::vector send_multi(message_type msg, Neighs const &neighs, int tag) - //{ - // return send_multi(std::move(msg),neighs,tag,[](message_type, rank_type,tag_type){}); - //} public: // receive - /** @brief Receive a message from a source rank with the given tag and register a callback which will - * be invoked when the receive operation is completed. - * @tparam CallBack User defined callback class which defines - * void Callback::operator()(message_type,rank_type,tag_type) - * @param msg Message where data will be received - * @param src Source of the message - * @param tag Tag associated with the message - * @param cb Callback function object */ - template - request recv(communicator_type& comm, message_type msg, rank_type src, tag_type tag, CallBack&& cb) + template + request recv(Comm& comm, message_type msg, rank_type src, tag_type tag, CallBack&& cb) { GHEX_CHECK_CALLBACK request req{std::make_shared()}; - auto element_ptr = new recv_element_type{std::forward(cb), src, tag, future_type{}, std::move(msg), req.m_request_state}; - element_ptr->m_future = std::move( comm.recv(element_ptr->m_msg, src, tag) ); + auto fut = comm.recv(msg,src,tag); + auto element_ptr = new recv_element_type{std::forward(cb), src, tag, std::move(fut), std::move(msg), req.m_request_state}; while (!m_recvs.push(element_ptr)) {} return req; } - template - request recv(message_type msg, rank_type src, tag_type tag, CallBack&& cb) - { - return recv(m_comm, msg, src, tag, std::forward(cb)); - } - - ///** @brief Receive a message with length size (storage is allocated accordingly). */ - //template - //request recv(std::size_t size, rank_type src, tag_type tag, CallBack&& cb) - //{ - // return recv(message_type{size,m_alloc}, src, tag, std::forward(cb)); - //} - - ///** @brief Receive a message without registering a callback. */ - //request recv(message_type msg, rank_type src, tag_type tag) - //{ - // return recv(std::move(msg),src,tag,[](message_type,rank_type,tag_type){}); - //} public: // progress - /** @brief Progress the communication. This function checks whether any receive and send operation is - * completed and calls the associated callback (if it exists). - * @return returns false if all registered operations have been completed.*/ std::size_t progress() { std::size_t num_completed = 0u; @@ -251,124 +154,6 @@ namespace gridtools return num_completed; } - ///** @brief Progress the communication. This function checks whether any receive and send operation is - // * completed and calls the associated callback (if it exists). When all registered operations have - // * been completed this function checks for further unexpected incoming messages which will be received - // * in a newly allocated shared_message_buffer and returned to the user through invocation of the - // * provided callback. - // * @tparam CallBack User defined callback class which defines - // * void Callback::operator()(message_type,rank_type,tag_type) - // * @param unexpected_cb callback function object - // * @return returns false if all registered operations have been completed. */ - //template - //bool progress(CallBack&& unexpected_cb) - //{ - // GHEX_CHECK_CALLBACK - // const auto not_completed = progress(); - // if (!not_completed) - // { - // if (auto o = m_comm.template recv_any_source_any_tag(m_alloc)) - // { - // auto t = o->get(); - // unexpected_cb(std::move(std::get<2>(t)),std::get<0>(t),std::get<1>(t)); - // } - // } - // return not_completed; - //} - - public: // attach/detach - - ///** @brief Deregister a send operation from this object which matches the given destination and tag. - // * If such operation is found the callback will be discared and the message will be returned to the - // * caller together with a future on which completion can be awaited. - // * @param dst Destination of the message - // * @param tag Tag associated with the message - // * @return Either a pair of future and message or none */ - //boost::optional> detach_send(rank_type dst, tag_type tag) - //{ - // return detach(dst,tag,m_sends); - //} - - ///** @brief Deregister a receive operation from this object which matches the given destination and tag. - // * If such operation is found the callback will be discared and the message will be returned to the - // * caller together with a future on which completion can be awaited. - // * @param src Source of the message - // * @param tag Tag associated with the message - // * @return Either a pair of future and message or none */ - //boost::optional> detach_recv(rank_type src, tag_type tag) - //{ - // return detach(src,tag,m_recvs); - //} - - ///** @brief Register a send operation with this object with future, destination and tag and associate it - // * with a callback. This is the inverse operation of detach. Note, that attaching of a send operation - // * originating from the underlying basic communicator is supported. - // * @tparam CallBack User defined callback class which defines - // * void Callback::operator()(message_type,rank_type,tag_type) - // * @param fut future object - // * @param msg message data - // * @param dst destination rank - // * @param tag associated tag - // * @param cb Callback function object */ - //template - //void attach_send(future_type&& fut, message_type msg, rank_type dst, tag_type tag, CallBack&& cb) - //{ - // GHEX_CHECK_CALLBACK - // auto ptr = new send_element_type{ std::forward(cb), dst, tag, std::move(fut), std::move(msg) }; - // while(!m_sends.push(ptr)) {} - //} - - ///** @brief Register a send without associated callback. */ - //void attach_send(future_type&& fut, message_type msg, rank_type dst, tag_type tag) - //{ - // auto ptr = new send_element_type{ [](message_type,rank_type,tag_type){}, dst, tag, std::move(fut), std::move(msg) }; - // while(!m_sends.push(ptr)) {} - //} - - ///** @brief Register a receive operation with this object with future, source and tag and associate it - // * with a callback. This is the inverse operation of detach. Note, that attaching of a recv operation - // * originating from the underlying basic communicator is supported. - // * @tparam CallBack User defined callback class which defines - // * void Callback::operator()(message_type,rank_type,tag_type) - // * @param fut future object - // * @param msg message data - // * @param dst source rank - // * @param tag associated tag - // * @param cb Callback function object */ - //template - //void attach_recv(future_type&& fut, message_type msg, rank_type src, tag_type tag, CallBack&& cb) - //{ - // GHEX_CHECK_CALLBACK - // auto ptr = new recv_element_type{ std::forward(cb), src, tag, std::move(fut), std::move(msg) }; - // while(!m_recvs.push(ptr)) {} - //} - - ///** @brief Register a receive without associated callback. */ - //void attach_recv(future_type&& fut, message_type msg, rank_type src, tag_type tag) - //{ - // auto ptr = new recv_element_type{ [](message_type,rank_type,tag_type){}, src, tag, std::move(fut), std::move(msg) }; - // while(!m_recvs.push(ptr)) {} - //} - - public: // cancel - - ///** @brief Deregister all operations from this object and attempt to cancel the communication. - // * @return true if cancelling was successful. */ - //bool cancel() - //{ - // const auto s = cancel_sends(); - // const auto r = cancel_recvs(); - // return s && r; - //} - - ///** @brief Deregister all send operations from this object and attempt to cancel the communication. - // * @return true if cancelling was successful. */ - //bool cancel_sends() { return cancel(m_sends); } - - ///** @brief Deregister all recv operations from this object and attempt to cancel the communication. - // * @return true if cancelling was successful. */ - //bool cancel_recvs() { return cancel(m_recvs); } - private: // implementation template @@ -381,7 +166,6 @@ namespace gridtools { // call the callback ptr->m_cb(std::move(ptr->m_msg), ptr->m_rank, ptr->m_tag); - //ptr->m_request_state.lock()->m_ready = true; ptr->m_request_state->m_ready = true; delete ptr; return 1u; @@ -394,55 +178,6 @@ namespace gridtools } else return 0u; } - - //template - //boost::optional> detach(rank_type rank, tag_type tag, Queue& d) - //{ - // std::vector d2; - // send_element_type* found_ptr = nullptr; - // d.consume_all( - // [rank, tag,&d2,&found_ptr](auto ptr) - // { - // if (ptr->m_rank == rank && ptr->m_tag == tag) - // { - // found_ptr = ptr; - // } - // else - // { - // d2.push_back(ptr); - // } - // }); - // for (auto ptr : d2) - // while(!d.push(ptr)){} - - // if (found_ptr) - // { - // auto cb = std::move(found_ptr->m_cb); - // auto fut = std::move(found_ptr->m_future); - // auto msg = std::move(found_ptr->m_msg); - // delete found_ptr; - // return std::pair{std::move(fut), std::move(msg)}; - // } - // return boost::none; - //} - - //template - //bool cancel(Queue& d) - //{ - // bool result = true; - // d.consume_all( - // [&result](auto ptr) - // { - // auto& fut = ptr->m_future; - // if (!fut.ready()) - // result = result && fut.cancel(); - // //else - // // fut.wait(); - // delete ptr; - // } - // ); - // return result; - //} }; } // namespace tl diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index eaa7d60..ec9f3c1 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -19,7 +19,7 @@ #include using comm_t = gridtools::ghex::tl::communicator; -using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts>; +using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts>; std::atomic num_completed; @@ -28,11 +28,11 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool num_completed.store(0u); comm_t comm; - callback_comm_t cb_comm(comm); + callback_comm_t cb_comm; - const int rank = cb_comm.rank(); - const int r_rank = (rank+1)%cb_comm.size(); - const int l_rank = (rank+cb_comm.size()-1)%cb_comm.size(); + const int rank = comm.rank(); + const int r_rank = (rank+1)%comm.size(); + const int l_rank = (rank+comm.size()-1)%comm.size(); using msg_type = callback_comm_t::message_type; From c8c2e3b47e0dbebdfabc6ab2e4df897344b5cd33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Wed, 13 Nov 2019 17:24:13 +0100 Subject: [PATCH 07/21] formatting and comments --- .../callback_communicator_ts.hpp | 24 +++++-------------- tests/transport/test_ts.cpp | 16 ++++++++----- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/include/ghex/transport_layer/callback_communicator_ts.hpp b/include/ghex/transport_layer/callback_communicator_ts.hpp index 6c14d78..5e4b9ee 100644 --- a/include/ghex/transport_layer/callback_communicator_ts.hpp +++ b/include/ghex/transport_layer/callback_communicator_ts.hpp @@ -30,23 +30,16 @@ namespace gridtools using allocator_type = Allocator; using message_type = shared_message_buffer; - struct request_state { volatile bool m_ready = false; - bool is_ready() const noexcept - { - return m_ready; - } + bool is_ready() const noexcept { return m_ready; } }; struct request { std::shared_ptr m_request_state; - bool is_ready() const noexcept - { - return m_request_state->is_ready(); - } + bool is_ready() const noexcept { return m_request_state->is_ready(); } }; private: // member types @@ -58,6 +51,7 @@ namespace gridtools virtual bool ready() = 0; virtual ~iface() {} }; + template struct holder : public iface { @@ -70,9 +64,7 @@ namespace gridtools std::unique_ptr m_ptr; template - any_future(Future&& fut) - : m_ptr{std::make_unique>(std::move(fut))} - {} + any_future(Future&& fut) : m_ptr{std::make_unique>(std::move(fut))} {} bool ready() { return m_ptr->ready(); } }; @@ -94,7 +86,6 @@ namespace gridtools using send_container_type = boost::lockfree::queue>; using recv_container_type = boost::lockfree::queue>; - private: // members allocator_type m_alloc; @@ -103,19 +94,15 @@ namespace gridtools public: // ctors - /** @brief construct from a basic transport communicator - * @param comm the underlying transport communicator - * @param alloc the allocator instance to be used for constructing messages */ callback_communicator_ts(allocator_type alloc = allocator_type{}) : m_alloc(alloc), m_sends(128), m_recvs(128) {} callback_communicator_ts(const callback_communicator_ts&) = delete; callback_communicator_ts(callback_communicator_ts&&) = default; - /** terminates the program if the queues are not empty */ ~callback_communicator_ts() { - // consume all + // TODO: consume all } public: // send @@ -166,6 +153,7 @@ namespace gridtools { // call the callback ptr->m_cb(std::move(ptr->m_msg), ptr->m_rank, ptr->m_tag); + // make request ready ptr->m_request_state->m_ready = true; delete ptr; return 1u; diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index ec9f3c1..b34cb94 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -23,19 +23,21 @@ using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts num_completed; +// ring-communication using arbitrary number of threads for communication and progressing void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool wait) { num_completed.store(0u); - comm_t comm; + comm_t comm; callback_comm_t cb_comm; - const int rank = comm.rank(); + using msg_type = callback_comm_t::message_type; + + const int rank = comm.rank(); const int r_rank = (rank+1)%comm.size(); const int l_rank = (rank+comm.size()-1)%comm.size(); - using msg_type = callback_comm_t::message_type; - + // per-thread objects std::vector send_msgs; std::vector recv_msgs; std::vector comms; @@ -55,6 +57,7 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool cb_comm.recv(c, recv_msg,l_rank,tag, [](callback_comm_t::message_type, int r, int t) { std::cout << "received from " << r << " with tag " << t << std::endl; }); + cb_comm.send(c, send_msg,r_rank,tag, [](callback_comm_t::message_type, int r, int t) { std::cout << "sent to " << r << " with tag " << t << std::endl; }); @@ -67,6 +70,7 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool auto recv_req = cb_comm.recv(c, recv_msg,l_rank,tag, [](callback_comm_t::message_type, int r, int t) { std::cout << "received from " << r << " with tag " << t << std::endl; }); + auto send_req = cb_comm.send(c, send_msg,r_rank,tag, [](callback_comm_t::message_type, int r, int t) { std::cout << "sent to " << r << " with tag " << t << std::endl; }); @@ -78,11 +82,10 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool [&cb_comm, num_requests]() { while(num_completed < num_requests) - { num_completed += cb_comm.progress(); - } }; + // make threads std::vector threads; threads.reserve(num_progress_threads+num_comm_threads); @@ -106,6 +109,7 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool recv_msgs[i], send_msgs[i]) ); + // wait for completion for (auto& t : threads) t.join(); From 59e64709001df9c9728083e560fd0308e1d17822 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Wed, 13 Nov 2019 17:49:42 +0100 Subject: [PATCH 08/21] added comments for clarification --- tests/transport/test_ts.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index b34cb94..1ae089c 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -20,6 +20,7 @@ using comm_t = gridtools::ghex::tl::communicator; using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts>; +using msg_type = callback_comm_t::message_type; std::atomic num_completed; @@ -28,15 +29,15 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool { num_completed.store(0u); + // use basic communicator to establish neighbors comm_t comm; - callback_comm_t cb_comm; - - using msg_type = callback_comm_t::message_type; - const int rank = comm.rank(); const int r_rank = (rank+1)%comm.size(); const int l_rank = (rank+comm.size()-1)%comm.size(); + // shared callback communicator + callback_comm_t cb_comm; + // per-thread objects std::vector send_msgs; std::vector recv_msgs; @@ -48,6 +49,7 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool comms.push_back(comm_t()); } + // total number of sends and receives std::size_t num_requests = 2*num_comm_threads; // lambda which places send and receive calls From 8bf03dc50827dbeb7bf21b6eabc4aacabc6f656f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Wed, 13 Nov 2019 18:07:26 +0100 Subject: [PATCH 09/21] check integrity of messages --- tests/transport/test_ts.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 1ae089c..334c419 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -47,6 +47,10 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool send_msgs.push_back(msg_type(4096)); recv_msgs.push_back(msg_type(4096)); comms.push_back(comm_t()); + send_msgs.back().data()[0] = rank; + send_msgs.back().data()[1] = i; + recv_msgs.back().data()[0] = -1; + recv_msgs.back().data()[1] = -1; } // total number of sends and receives @@ -114,6 +118,12 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool // wait for completion for (auto& t : threads) t.join(); + + for (std::size_t i=0; i()[0] == l_rank); + EXPECT_TRUE(recv_msgs[i].data()[1] == (int)i); + } comm.barrier(); } From b5c56e651fd7e2a0e58dc0cf4258bea0f32c0116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 10:22:38 +0100 Subject: [PATCH 10/21] renamed to continuation_communicator --- ...r_ts.hpp => continuation_communicator.hpp} | 21 +++++++----- tests/transport/test_ts.cpp | 34 +++++++++---------- 2 files changed, 30 insertions(+), 25 deletions(-) rename include/ghex/transport_layer/{callback_communicator_ts.hpp => continuation_communicator.hpp} (88%) diff --git a/include/ghex/transport_layer/callback_communicator_ts.hpp b/include/ghex/transport_layer/continuation_communicator.hpp similarity index 88% rename from include/ghex/transport_layer/callback_communicator_ts.hpp rename to include/ghex/transport_layer/continuation_communicator.hpp index 5e4b9ee..75b3dc2 100644 --- a/include/ghex/transport_layer/callback_communicator_ts.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -8,8 +8,8 @@ * SPDX-License-Identifier: BSD-3-Clause * */ -#ifndef INCLUDED_GHEX_TL_CALLBACK_COMMUNICATOR_TS_HPP -#define INCLUDED_GHEX_TL_CALLBACK_COMMUNICATOR_TS_HPP +#ifndef INCLUDED_GHEX_TL_CONTINUATION_COMMUNICATOR_HPP +#define INCLUDED_GHEX_TL_CONTINUATION_COMMUNICATOR_HPP #include #include "./callback_communicator.hpp" @@ -21,7 +21,7 @@ namespace gridtools namespace tl { template> - class callback_communicator_ts + class continuation_communicator { public: // member types @@ -30,12 +30,16 @@ namespace gridtools using allocator_type = Allocator; using message_type = shared_message_buffer; + // shared request state struct request_state { + // volatile is needed to prevent the compiler + // from optimizing away the check of this member volatile bool m_ready = false; bool is_ready() const noexcept { return m_ready; } }; + // simple request class which is returned from send and recv calls struct request { std::shared_ptr m_request_state; @@ -44,6 +48,7 @@ namespace gridtools private: // member types + // type-erased future struct any_future { struct iface @@ -94,13 +99,13 @@ namespace gridtools public: // ctors - callback_communicator_ts(allocator_type alloc = allocator_type{}) + continuation_communicator(allocator_type alloc = allocator_type{}) : m_alloc(alloc), m_sends(128), m_recvs(128) {} - callback_communicator_ts(const callback_communicator_ts&) = delete; - callback_communicator_ts(callback_communicator_ts&&) = default; + continuation_communicator(const continuation_communicator&) = delete; + continuation_communicator(continuation_communicator&&) = default; - ~callback_communicator_ts() + ~continuation_communicator() { // TODO: consume all } @@ -172,5 +177,5 @@ namespace gridtools } // namespace ghex }// namespace gridtools -#endif/*INCLUDED_GHEX_TL_CALLBACK_COMMUNICATOR_TS_HPP */ +#endif/*INCLUDED_GHEX_TL_CONTINUATION_COMMUNICATOR_HPP */ diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 334c419..74b711c 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -9,7 +9,7 @@ * */ #include -#include +#include #include #include #include @@ -19,8 +19,8 @@ #include using comm_t = gridtools::ghex::tl::communicator; -using callback_comm_t = gridtools::ghex::tl::callback_communicator_ts>; -using msg_type = callback_comm_t::message_type; +using cont_comm_t = gridtools::ghex::tl::continuation_communicator>; +using msg_type = cont_comm_t::message_type; std::atomic num_completed; @@ -30,13 +30,13 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool num_completed.store(0u); // use basic communicator to establish neighbors - comm_t comm; + comm_t comm; const int rank = comm.rank(); const int r_rank = (rank+1)%comm.size(); const int l_rank = (rank+comm.size()-1)%comm.size(); // shared callback communicator - callback_comm_t cb_comm; + cont_comm_t cont_comm; // per-thread objects std::vector send_msgs; @@ -58,37 +58,37 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool // lambda which places send and receive calls auto send_recv_func_nowait = - [&cb_comm,l_rank,r_rank](comm_t& c, int tag, msg_type recv_msg, msg_type send_msg) + [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type recv_msg, msg_type send_msg) { - cb_comm.recv(c, recv_msg,l_rank,tag, - [](callback_comm_t::message_type, int r, int t) { + cont_comm.recv(c, recv_msg,l_rank,tag, + [](cont_comm_t::message_type, int r, int t) { std::cout << "received from " << r << " with tag " << t << std::endl; }); - cb_comm.send(c, send_msg,r_rank,tag, - [](callback_comm_t::message_type, int r, int t) { + cont_comm.send(c, send_msg,r_rank,tag, + [](cont_comm_t::message_type, int r, int t) { std::cout << "sent to " << r << " with tag " << t << std::endl; }); }; // lambda which places send and receive calls and waits for completion auto send_recv_func_wait = - [&cb_comm,l_rank,r_rank](comm_t& c, int tag, msg_type recv_msg, msg_type send_msg) + [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type recv_msg, msg_type send_msg) { - auto recv_req = cb_comm.recv(c, recv_msg,l_rank,tag, - [](callback_comm_t::message_type, int r, int t) { + auto recv_req = cont_comm.recv(c, recv_msg,l_rank,tag, + [](cont_comm_t::message_type, int r, int t) { std::cout << "received from " << r << " with tag " << t << std::endl; }); - auto send_req = cb_comm.send(c, send_msg,r_rank,tag, - [](callback_comm_t::message_type, int r, int t) { + auto send_req = cont_comm.send(c, send_msg,r_rank,tag, + [](cont_comm_t::message_type, int r, int t) { std::cout << "sent to " << r << " with tag " << t << std::endl; }); while ( !(recv_req.is_ready() && send_req.is_ready()) ) {} }; // lambda which progresses the queues auto progress_func = - [&cb_comm, num_requests]() + [&cont_comm, num_requests]() { while(num_completed < num_requests) - num_completed += cb_comm.progress(); + num_completed += cont_comm.progress(); }; // make threads From 664735ae8b7421bd8501204b82b2c7f26128b23a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 15:10:58 +0100 Subject: [PATCH 11/21] any message is possible --- .../continuation_communicator.hpp | 40 +++++++++++++++---- tests/transport/test_ts.cpp | 14 +++---- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 75b3dc2..dff5ad2 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -28,7 +28,6 @@ namespace gridtools using tag_type = int; using rank_type = int; using allocator_type = Allocator; - using message_type = shared_message_buffer; // shared request state struct request_state @@ -46,6 +45,18 @@ namespace gridtools bool is_ready() const noexcept { return m_request_state->is_ready(); } }; + struct any_message + { + using value_type = unsigned char; + unsigned char* m_data; + std::size_t m_size; + unsigned char* data() noexcept { return m_data; } + const unsigned char* data() const noexcept { return m_data; } + std::size_t size() const noexcept { return m_size; } + }; + + using message_type = any_message; + private: // member types // type-erased future @@ -58,7 +69,7 @@ namespace gridtools }; template - struct holder : public iface + struct holder final : public iface { Future m_future; holder() = default; @@ -74,6 +85,7 @@ namespace gridtools bool ready() { return m_ptr->ready(); } }; + // necessary meta information for each send/receive operation struct element_type { @@ -112,26 +124,38 @@ namespace gridtools public: // send - template - request send(Comm& comm, message_type msg, rank_type dst, tag_type tag, CallBack&& cb) + template + request send(Comm& comm, Message& msg, rank_type dst, tag_type tag, CallBack&& cb) { GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); - auto element_ptr = new send_element_type{std::forward(cb), dst, tag, std::move(fut), std::move(msg), req.m_request_state}; + auto element_ptr = new send_element_type{ + std::forward(cb), + dst, + tag, + std::move(fut), + any_message{msg.data(),msg.size()}, + req.m_request_state}; while (!m_sends.push(element_ptr)) {} return req; } public: // receive - template - request recv(Comm& comm, message_type msg, rank_type src, tag_type tag, CallBack&& cb) + template + request recv(Comm& comm, Message& msg, rank_type src, tag_type tag, CallBack&& cb) { GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); - auto element_ptr = new recv_element_type{std::forward(cb), src, tag, std::move(fut), std::move(msg), req.m_request_state}; + auto element_ptr = new recv_element_type{ + std::forward(cb), + src, + tag, + std::move(fut), + {msg.data(),msg.size()}, + req.m_request_state}; while (!m_recvs.push(element_ptr)) {} return req; } diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 74b711c..8826de1 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -20,7 +20,7 @@ using comm_t = gridtools::ghex::tl::communicator; using cont_comm_t = gridtools::ghex::tl::continuation_communicator>; -using msg_type = cont_comm_t::message_type; +using msg_type = gridtools::ghex::tl::message_buffer<>; std::atomic num_completed; @@ -58,7 +58,7 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool // lambda which places send and receive calls auto send_recv_func_nowait = - [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type recv_msg, msg_type send_msg) + [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type& recv_msg, msg_type& send_msg) { cont_comm.recv(c, recv_msg,l_rank,tag, [](cont_comm_t::message_type, int r, int t) { @@ -71,7 +71,7 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool // lambda which places send and receive calls and waits for completion auto send_recv_func_wait = - [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type recv_msg, msg_type send_msg) + [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type& recv_msg, msg_type& send_msg) { auto recv_req = cont_comm.recv(c, recv_msg,l_rank,tag, [](cont_comm_t::message_type, int r, int t) { @@ -104,16 +104,16 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool send_recv_func_wait, std::ref(comms[i]), (int)i, - recv_msgs[i], - send_msgs[i]) ); + std::ref(recv_msgs[i]), + std::ref(send_msgs[i])) ); else for (std::size_t i=0; i Date: Thu, 14 Nov 2019 15:25:51 +0100 Subject: [PATCH 12/21] removed allocator --- .../ghex/transport_layer/continuation_communicator.hpp | 9 +++------ tests/transport/test_ts.cpp | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index dff5ad2..0ab5228 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -20,14 +20,12 @@ namespace gridtools { namespace tl { - template> class continuation_communicator { public: // member types using tag_type = int; using rank_type = int; - using allocator_type = Allocator; // shared request state struct request_state @@ -105,14 +103,13 @@ namespace gridtools private: // members - allocator_type m_alloc; send_container_type m_sends; recv_container_type m_recvs; public: // ctors - continuation_communicator(allocator_type alloc = allocator_type{}) - : m_alloc(alloc), m_sends(128), m_recvs(128) {} + continuation_communicator() + : m_sends(128), m_recvs(128) {} continuation_communicator(const continuation_communicator&) = delete; continuation_communicator(continuation_communicator&&) = default; @@ -135,7 +132,7 @@ namespace gridtools dst, tag, std::move(fut), - any_message{msg.data(),msg.size()}, + {msg.data(),msg.size()}, req.m_request_state}; while (!m_sends.push(element_ptr)) {} return req; diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 8826de1..44f60ba 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -19,7 +19,7 @@ #include using comm_t = gridtools::ghex::tl::communicator; -using cont_comm_t = gridtools::ghex::tl::continuation_communicator>; +using cont_comm_t = gridtools::ghex::tl::continuation_communicator; using msg_type = gridtools::ghex::tl::message_buffer<>; std::atomic num_completed; From ceb124a9b957924ff8946ac70ac4b268168e0b88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 16:23:15 +0100 Subject: [PATCH 13/21] take ownership of message if moved in --- .../continuation_communicator.hpp | 128 +++++++++++++----- tests/transport/test_ts.cpp | 24 ++-- 2 files changed, 110 insertions(+), 42 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 0ab5228..1ea9e31 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -43,7 +43,45 @@ namespace gridtools bool is_ready() const noexcept { return m_request_state->is_ready(); } }; + // type-erased message struct any_message + { + struct iface + { + virtual unsigned char* data() noexcept = 0; + virtual const unsigned char* data() const noexcept = 0; + virtual std::size_t size() const noexcept = 0; + virtual ~iface() {} + }; + + template + struct holder final : public iface + { + Message m_message; + holder(Message&& m): m_message{std::move(m)} {} + + unsigned char* data() noexcept override { return m_message.data(); } + const unsigned char* data() const noexcept override { return m_message.data(); } + std::size_t size() const noexcept override { return m_message.size(); } + }; + + std::unique_ptr m_ptr; + + template + any_message(Message&& m) : m_ptr{std::make_unique>(std::move(m))} {} + any_message(any_message&&) = default; + + unsigned char* data() noexcept { return m_ptr->data(); } + const unsigned char* data() const noexcept { return m_ptr->data(); } + std::size_t size() const noexcept { return m_ptr->size(); } + }; + + using message_type = any_message; + + private: // member types + + // simple wrapper around an l-value reference message (stores pointer and size) + struct ref_message { using value_type = unsigned char; unsigned char* m_data; @@ -52,11 +90,7 @@ namespace gridtools const unsigned char* data() const noexcept { return m_data; } std::size_t size() const noexcept { return m_size; } }; - - using message_type = any_message; - - private: // member types - + // type-erased future struct any_future { @@ -118,43 +152,25 @@ namespace gridtools { // TODO: consume all } - + public: // send + // take ownership of message if it is an r-value reference! template - request send(Comm& comm, Message& msg, rank_type dst, tag_type tag, CallBack&& cb) + request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag, CallBack&& cb) { - GHEX_CHECK_CALLBACK - request req{std::make_shared()}; - auto fut = comm.send(msg,dst,tag); - auto element_ptr = new send_element_type{ - std::forward(cb), - dst, - tag, - std::move(fut), - {msg.data(),msg.size()}, - req.m_request_state}; - while (!m_sends.push(element_ptr)) {} - return req; + using is_rvalue = std::is_rvalue_reference(msg))>; + return send(comm, std::forward(msg), dst, tag, std::forward(cb), is_rvalue()); } public: // receive + // take ownership of message if it is an r-value reference! template - request recv(Comm& comm, Message& msg, rank_type src, tag_type tag, CallBack&& cb) + request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag, CallBack&& cb) { - GHEX_CHECK_CALLBACK - request req{std::make_shared()}; - auto fut = comm.recv(msg,src,tag); - auto element_ptr = new recv_element_type{ - std::forward(cb), - src, - tag, - std::move(fut), - {msg.data(),msg.size()}, - req.m_request_state}; - while (!m_recvs.push(element_ptr)) {} - return req; + using is_rvalue = std::is_rvalue_reference(msg))>; + return recv(comm, std::forward(msg), src, tag, std::forward(cb), is_rvalue()); } public: // progress @@ -169,6 +185,54 @@ namespace gridtools private: // implementation + template + request send(Comm& comm, Message& msg, rank_type dst, tag_type tag, CallBack&& cb, std::false_type) + { + GHEX_CHECK_CALLBACK + request req{std::make_shared()}; + auto fut = comm.send(msg,dst,tag); + auto element_ptr = new send_element_type{std::forward(cb), dst, tag, std::move(fut), + ref_message{msg.data(),msg.size()}, req.m_request_state}; + while (!m_sends.push(element_ptr)) {} + return req; + } + + template + request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag, CallBack&& cb, std::true_type) + { + GHEX_CHECK_CALLBACK + request req{std::make_shared()}; + auto fut = comm.send(msg,dst,tag); + auto element_ptr = new send_element_type{std::forward(cb), dst, tag, std::move(fut), + std::move(msg), req.m_request_state}; + while (!m_sends.push(element_ptr)) {} + return req; + } + + template + request recv(Comm& comm, Message& msg, rank_type src, tag_type tag, CallBack&& cb, std::false_type) + { + GHEX_CHECK_CALLBACK + request req{std::make_shared()}; + auto fut = comm.recv(msg,src,tag); + auto element_ptr = new recv_element_type{std::forward(cb), src, tag, std::move(fut), + ref_message{msg.data(),msg.size()}, req.m_request_state}; + while (!m_recvs.push(element_ptr)) {} + return req; + } + + template + request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag, CallBack&& cb, std::true_type) + { + GHEX_CHECK_CALLBACK + request req{std::make_shared()}; + auto fut = comm.recv(msg,src,tag); + auto element_ptr = new recv_element_type{std::forward(cb), src, tag, std::move(fut), + std::move(msg), req.m_request_state}; + while (!m_recvs.push(element_ptr)) {} + return req; + } + template std::size_t run(Queue& d) { diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 44f60ba..0fc52a6 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -61,12 +61,16 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type& recv_msg, msg_type& send_msg) { cont_comm.recv(c, recv_msg,l_rank,tag, - [](cont_comm_t::message_type, int r, int t) { - std::cout << "received from " << r << " with tag " << t << std::endl; }); - - cont_comm.send(c, send_msg,r_rank,tag, - [](cont_comm_t::message_type, int r, int t) { - std::cout << "sent to " << r << " with tag " << t << std::endl; }); + [](cont_comm_t::message_type m, int r, int t) { + std::cout << "received from " << r << " with tag " << t << " and size " << m.size() << std::endl; }); + + // give up ownership of some message: + msg_type another_msg(4096); + another_msg.data()[0] = send_msg.data()[0]; + another_msg.data()[1] = send_msg.data()[1]; + cont_comm.send(c, std::move(another_msg),r_rank,tag, + [](cont_comm_t::message_type m, int r, int t) { + std::cout << "sent to " << r << " with tag " << t << " and size " << m.size() << std::endl; }); }; // lambda which places send and receive calls and waits for completion @@ -74,12 +78,12 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool [&cont_comm,l_rank,r_rank](comm_t& c, int tag, msg_type& recv_msg, msg_type& send_msg) { auto recv_req = cont_comm.recv(c, recv_msg,l_rank,tag, - [](cont_comm_t::message_type, int r, int t) { - std::cout << "received from " << r << " with tag " << t << std::endl; }); + [](cont_comm_t::message_type m, int r, int t) { + std::cout << "received from " << r << " with tag " << t << " and size " << m.size() << std::endl; }); auto send_req = cont_comm.send(c, send_msg,r_rank,tag, - [](cont_comm_t::message_type, int r, int t) { - std::cout << "sent to " << r << " with tag " << t << std::endl; }); + [](cont_comm_t::message_type m, int r, int t) { + std::cout << "sent to " << r << " with tag " << t << " and size " << m.size() << std::endl; }); while ( !(recv_req.is_ready() && send_req.is_ready()) ) {} }; From db6ba11fdcd9d0fe70fb47cfc1c2fcecd73c3438 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 16:24:16 +0100 Subject: [PATCH 14/21] forgot value_type --- include/ghex/transport_layer/continuation_communicator.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 1ea9e31..73b5b80 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -46,6 +46,8 @@ namespace gridtools // type-erased message struct any_message { + using value_type = unsigned char; + struct iface { virtual unsigned char* data() noexcept = 0; From 0c5b22a834b495047029633ab387b2de351491d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 16:35:35 +0100 Subject: [PATCH 15/21] no callback version --- .../continuation_communicator.hpp | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 73b5b80..1f51cc6 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -119,7 +119,6 @@ namespace gridtools bool ready() { return m_ptr->ready(); } }; - // necessary meta information for each send/receive operation struct element_type { @@ -131,11 +130,9 @@ namespace gridtools message_type m_msg; std::shared_ptr m_request_state; }; - using send_element_type = element_type; - using recv_element_type = element_type; using lock_free_alloc_t = boost::lockfree::allocator>; - using send_container_type = boost::lockfree::queue>; - using recv_container_type = boost::lockfree::queue>; + using send_container_type = boost::lockfree::queue>; + using recv_container_type = boost::lockfree::queue>; private: // members @@ -164,6 +161,14 @@ namespace gridtools using is_rvalue = std::is_rvalue_reference(msg))>; return send(comm, std::forward(msg), dst, tag, std::forward(cb), is_rvalue()); } + + // take ownership of message if it is an r-value reference! + // no callback + template + request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag) + { + return send(comm, std::forward(msg), dst, tag, [](message_type,rank_type,tag_type){}); + } public: // receive @@ -175,6 +180,14 @@ namespace gridtools return recv(comm, std::forward(msg), src, tag, std::forward(cb), is_rvalue()); } + // take ownership of message if it is an r-value reference! + // no callback + template + request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag) + { + return recv(comm, std::forward(msg), src, tag, [](message_type,rank_type,tag_type){}); + } + public: // progress std::size_t progress() @@ -193,8 +206,8 @@ namespace gridtools GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); - auto element_ptr = new send_element_type{std::forward(cb), dst, tag, std::move(fut), - ref_message{msg.data(),msg.size()}, req.m_request_state}; + auto element_ptr = new element_type{std::forward(cb), dst, tag, std::move(fut), + ref_message{msg.data(),msg.size()}, req.m_request_state}; while (!m_sends.push(element_ptr)) {} return req; } @@ -205,8 +218,8 @@ namespace gridtools GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); - auto element_ptr = new send_element_type{std::forward(cb), dst, tag, std::move(fut), - std::move(msg), req.m_request_state}; + auto element_ptr = new element_type{std::forward(cb), dst, tag, std::move(fut), + std::move(msg), req.m_request_state}; while (!m_sends.push(element_ptr)) {} return req; } @@ -217,8 +230,8 @@ namespace gridtools GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); - auto element_ptr = new recv_element_type{std::forward(cb), src, tag, std::move(fut), - ref_message{msg.data(),msg.size()}, req.m_request_state}; + auto element_ptr = new element_type{std::forward(cb), src, tag, std::move(fut), + ref_message{msg.data(),msg.size()}, req.m_request_state}; while (!m_recvs.push(element_ptr)) {} return req; } @@ -229,8 +242,8 @@ namespace gridtools GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); - auto element_ptr = new recv_element_type{std::forward(cb), src, tag, std::move(fut), - std::move(msg), req.m_request_state}; + auto element_ptr = new element_type{std::forward(cb), src, tag, std::move(fut), + std::move(msg), req.m_request_state}; while (!m_recvs.push(element_ptr)) {} return req; } @@ -238,7 +251,7 @@ namespace gridtools template std::size_t run(Queue& d) { - send_element_type* ptr = nullptr; + element_type* ptr = nullptr; if (d.pop(ptr)) { if (ptr->m_future.ready()) From f12f1244d4e3f95b4102e611d88b177e17379deb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 16:42:20 +0100 Subject: [PATCH 16/21] cleanup --- .../continuation_communicator.hpp | 64 ++++++++++--------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 1f51cc6..61b7a0c 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -14,18 +14,12 @@ #include #include "./callback_communicator.hpp" -namespace gridtools -{ - namespace ghex - { +namespace gridtools{ + namespace ghex { namespace tl { - class continuation_communicator - { - public: // member types - - using tag_type = int; - using rank_type = int; + // implementation details here: + namespace cont_detail { // shared request state struct request_state @@ -78,10 +72,6 @@ namespace gridtools std::size_t size() const noexcept { return m_ptr->size(); } }; - using message_type = any_message; - - private: // member types - // simple wrapper around an l-value reference message (stores pointer and size) struct ref_message { @@ -119,16 +109,33 @@ namespace gridtools bool ready() { return m_ptr->ready(); } }; + } // namespace cont_detail + + + // user code here: + class continuation_communicator + { + public: // member types + + using tag_type = int; + using rank_type = int; + using message_type = cont_detail::any_message; + using request = cont_detail::request; + + private: // member types + + using ref_message = cont_detail::ref_message; + // necessary meta information for each send/receive operation struct element_type { using message_arg_type = message_type; std::function m_cb; - rank_type m_rank; - tag_type m_tag; - any_future m_future; - message_type m_msg; - std::shared_ptr m_request_state; + rank_type m_rank; + tag_type m_tag; + cont_detail::any_future m_future; + message_type m_msg; + std::shared_ptr m_request_state; }; using lock_free_alloc_t = boost::lockfree::allocator>; using send_container_type = boost::lockfree::queue>; @@ -141,16 +148,10 @@ namespace gridtools public: // ctors - continuation_communicator() - : m_sends(128), m_recvs(128) {} - + continuation_communicator() : m_sends(128), m_recvs(128) {} continuation_communicator(const continuation_communicator&) = delete; continuation_communicator(continuation_communicator&&) = default; - - ~continuation_communicator() - { - // TODO: consume all - } + ~continuation_communicator() { /* TODO: consume all*/ } public: // send @@ -198,13 +199,14 @@ namespace gridtools return num_completed; } + private: // implementation template request send(Comm& comm, Message& msg, rank_type dst, tag_type tag, CallBack&& cb, std::false_type) { GHEX_CHECK_CALLBACK - request req{std::make_shared()}; + request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); auto element_ptr = new element_type{std::forward(cb), dst, tag, std::move(fut), ref_message{msg.data(),msg.size()}, req.m_request_state}; @@ -216,7 +218,7 @@ namespace gridtools request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag, CallBack&& cb, std::true_type) { GHEX_CHECK_CALLBACK - request req{std::make_shared()}; + request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); auto element_ptr = new element_type{std::forward(cb), dst, tag, std::move(fut), std::move(msg), req.m_request_state}; @@ -228,7 +230,7 @@ namespace gridtools request recv(Comm& comm, Message& msg, rank_type src, tag_type tag, CallBack&& cb, std::false_type) { GHEX_CHECK_CALLBACK - request req{std::make_shared()}; + request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); auto element_ptr = new element_type{std::forward(cb), src, tag, std::move(fut), ref_message{msg.data(),msg.size()}, req.m_request_state}; @@ -240,7 +242,7 @@ namespace gridtools request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag, CallBack&& cb, std::true_type) { GHEX_CHECK_CALLBACK - request req{std::make_shared()}; + request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); auto element_ptr = new element_type{std::forward(cb), src, tag, std::move(fut), std::move(msg), req.m_request_state}; From 2bac102bb2f340c3ca50fe3a7357841994c4b422 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 17:37:42 +0100 Subject: [PATCH 17/21] send_multi --- .../continuation_communicator.hpp | 74 ++++++++++++++++++- 1 file changed, 70 insertions(+), 4 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 61b7a0c..1954bae 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -82,6 +82,21 @@ namespace gridtools{ const unsigned char* data() const noexcept { return m_data; } std::size_t size() const noexcept { return m_size; } }; + + template + struct shared_message + { + using value_type = typename Message::value_type; + std::shared_ptr m_message; + + shared_message(Message&& m) : m_message{std::make_shared(std::move(m))} {} + shared_message(const shared_message&) = default; + shared_message(shared_message&&) = default; + + value_type* data() noexcept { return m_message->data(); } + const value_type* data() const noexcept { return m_message->data(); } + std::size_t size() const noexcept { return m_message->size(); } + }; // type-erased future struct any_future @@ -159,6 +174,7 @@ namespace gridtools{ template request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag, CallBack&& cb) { + GHEX_CHECK_CALLBACK using is_rvalue = std::is_rvalue_reference(msg))>; return send(comm, std::forward(msg), dst, tag, std::forward(cb), is_rvalue()); } @@ -171,12 +187,32 @@ namespace gridtools{ return send(comm, std::forward(msg), dst, tag, [](message_type,rank_type,tag_type){}); } + public: // send multi + + // take ownership of message if it is an r-value reference! + template + std::vector send_multi(Comm& comm, Message&& msg, const Neighs& neighs, tag_type tag, CallBack&& cb) + { + GHEX_CHECK_CALLBACK + using is_rvalue = std::is_rvalue_reference(msg))>; + return send_multi(comm, std::forward(msg), neighs, tag, std::forward(cb), is_rvalue()); + } + + // take ownership of message if it is an r-value reference! + // no callback + template + std::vector send_multi(Comm& comm, Message&& msg, const Neighs& neighs, tag_type tag) + { + return send_multi(comm, std::forward(msg), neighs, tag, [](message_type,rank_type,tag_type){}); + } + public: // receive // take ownership of message if it is an r-value reference! template request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag, CallBack&& cb) { + GHEX_CHECK_CALLBACK using is_rvalue = std::is_rvalue_reference(msg))>; return recv(comm, std::forward(msg), src, tag, std::forward(cb), is_rvalue()); } @@ -205,7 +241,6 @@ namespace gridtools{ template request send(Comm& comm, Message& msg, rank_type dst, tag_type tag, CallBack&& cb, std::false_type) { - GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); auto element_ptr = new element_type{std::forward(cb), dst, tag, std::move(fut), @@ -217,7 +252,6 @@ namespace gridtools{ template request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag, CallBack&& cb, std::true_type) { - GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); auto element_ptr = new element_type{std::forward(cb), dst, tag, std::move(fut), @@ -229,7 +263,6 @@ namespace gridtools{ template request recv(Comm& comm, Message& msg, rank_type src, tag_type tag, CallBack&& cb, std::false_type) { - GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); auto element_ptr = new element_type{std::forward(cb), src, tag, std::move(fut), @@ -241,7 +274,6 @@ namespace gridtools{ template request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag, CallBack&& cb, std::true_type) { - GHEX_CHECK_CALLBACK request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); auto element_ptr = new element_type{std::forward(cb), src, tag, std::move(fut), @@ -250,6 +282,40 @@ namespace gridtools{ return req; } + template + std::vector send_multi(Comm& comm, Message& msg, const Neighs& neighs, tag_type tag, CallBack&& cb, std::false_type) + { + using cb_type = typename std::remove_cv::type>::type; + auto cb_ptr = std::make_shared( std::forward(cb) ); + std::vector reqs; + for (auto id : neighs) + reqs.push_back( send(comm, msg, id, tag, + [cb_ptr](message_type m, rank_type r, tag_type t) + { + // if (cb_ptr->use_count == 1) + (*cb_ptr)(std::move(m),r,t); + }) ); + return reqs; + } + + template + std::vector send_multi(Comm& comm, Message&& msg, const Neighs& neighs, tag_type tag, CallBack&& cb, std::true_type) + { + using cb_type = typename std::remove_cv::type>::type; + auto cb_ptr = std::make_shared( std::forward(cb) ); + cont_detail::shared_message s_msg{std::move(msg)}; + std::vector reqs; + for (auto id : neighs) + reqs.push_back( send(comm, s_msg, id, tag, + [cb_ptr, s_msg](message_type m, rank_type r, tag_type t) + { + // if (cb_ptr->use_count == 1) + (*cb_ptr)(std::move(m),r,t); + }) ); + return reqs; + } + + template std::size_t run(Queue& d) { From bd60386ea8bab07fa41b460112b3d659956a02ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 17:47:08 +0100 Subject: [PATCH 18/21] took into account value_type of user-supplied message --- include/ghex/transport_layer/continuation_communicator.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 1954bae..c71622c 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -53,12 +53,13 @@ namespace gridtools{ template struct holder final : public iface { + using value_type = typename Message::value_type; Message m_message; holder(Message&& m): m_message{std::move(m)} {} - unsigned char* data() noexcept override { return m_message.data(); } - const unsigned char* data() const noexcept override { return m_message.data(); } - std::size_t size() const noexcept override { return m_message.size(); } + unsigned char* data() noexcept override { return reinterpret_cast(m_message.data()); } + const unsigned char* data() const noexcept override { return reinterpret_cast(m_message.data()); } + std::size_t size() const noexcept override { return sizeof(value_type)*m_message.size(); } }; std::unique_ptr m_ptr; From 8b15261c190c8e5e7a1b755656c7b90e7107f9c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 20:28:51 +0100 Subject: [PATCH 19/21] test for send_multi, passes! --- .../continuation_communicator.hpp | 7 +- tests/transport/test_ts.cpp | 267 +++++++++++++++++- 2 files changed, 267 insertions(+), 7 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index c71622c..8048fdb 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -307,12 +307,15 @@ namespace gridtools{ cont_detail::shared_message s_msg{std::move(msg)}; std::vector reqs; for (auto id : neighs) - reqs.push_back( send(comm, s_msg, id, tag, - [cb_ptr, s_msg](message_type m, rank_type r, tag_type t) + { + auto s_msg_cpy = s_msg; + reqs.push_back( send(comm, std::move(s_msg_cpy), id, tag, + [cb_ptr](message_type m, rank_type r, tag_type t) { // if (cb_ptr->use_count == 1) (*cb_ptr)(std::move(m),r,t); }) ); + } return reqs; } diff --git a/tests/transport/test_ts.cpp b/tests/transport/test_ts.cpp index 0fc52a6..ad5246f 100644 --- a/tests/transport/test_ts.cpp +++ b/tests/transport/test_ts.cpp @@ -25,7 +25,23 @@ using msg_type = gridtools::ghex::tl::message_buffer<>; std::atomic num_completed; // ring-communication using arbitrary number of threads for communication and progressing -void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool wait) +// each rank has num_comm_threads threads +// each rank has num_progress_threads which progress the communication and execute the callbacks +// +// num_comm_threads send to the right rank +// num_comm_threads receive from the left rank +// +// the messages are passed as l-value references (GHEX does not take ownership) +// there is one exception to show-case the usage of moving in the message, but this does not alter the semantics of +// this test +// +// there are two modes: +// - wait mode: + each thread waits until the send and receive are finished +// + this is done using the requests returned by the communicator +// +// - nowait mode: + send and receives are posted, the function returns immediately +// +void test_ring(std::size_t num_progress_threads, std::size_t num_comm_threads, bool wait) { num_completed.store(0u); @@ -64,7 +80,8 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool [](cont_comm_t::message_type m, int r, int t) { std::cout << "received from " << r << " with tag " << t << " and size " << m.size() << std::endl; }); - // give up ownership of some message: + // give up ownership of some message + // this is just to illustrate the functionality and syntax msg_type another_msg(4096); another_msg.data()[0] = send_msg.data()[0]; another_msg.data()[1] = send_msg.data()[1]; @@ -133,7 +150,247 @@ void test1(std::size_t num_progress_threads, std::size_t num_comm_threads, bool } -TEST(transport, basic) { - test1(2, 3, true); - test1(2, 3, false); +// send multiple messages from rank 0 (broadcast) +// and repost the same message after it's been sent +// each rank has num_comm_threads threads +// each rank has num_progress_threads which progress the communication and execute the callbacks +// +// rank 0: num_comm_threads send twice to each rank (using a repost) +// other ranks: num_comm_threads receive twice from rank 0 +// +// there are two modes: +// - wait mode: + each thread waits until the first round of communication has finished and then reposts +// + this is done using the requests returned by the communicator +// + the messages are passed as l-value references (GHEX does not take ownership) +// +// - nowait mode: + rank 0: each thread submits a send_multi. Another thread from the progress thread-pool +// executes the callback and resubmits a send_multi after the message has been sent +// to all other ranks. +// + other ranks: two receives are posted (with two different recv messages), the function returns immediately +// + the messages are passed as r-value references (GHEX takes ownership) +// +void test_send_multi(std::size_t num_progress_threads, std::size_t num_comm_threads, bool wait) +{ + num_completed.store(0u); + + // use basic communicator to establish neighbors + comm_t comm; + + // shared callback communicator + cont_comm_t cont_comm; + + // per-thread objects + std::vector comms; + std::vector num_reps; // used for counting in the first nowait send callback + for (std::size_t i=0; i neighbor_ranks; + for (int i=1; i()[0] = tag; + // get a vector of requests + auto reqs = cont_comm.send_multi(c, msg, neighbor_ranks, tag, + [](cont_comm_t::message_type m, int r, int t) + { + std::cout << "sent to " << r << " with tag " << t << " and size " << m.size() << std::endl; + }); + // wait until all requests are done + bool finished = false; + while (!finished) + { + bool f = true; + for (auto& r : reqs) + f = f && r.is_ready(); + finished = f; + } + std::cout << "reposting" << std::endl; + reqs = cont_comm.send_multi(c, msg, neighbor_ranks, tag+num_comm_threads, + [](cont_comm_t::message_type m, int r, int t) + { + std::cout << "sent to " << r << " with tag " << t << " and size " << m.size() << std::endl; + }); + // wait until all requests are done + // this is important since otherwise the message will go out of scope and is destroyed + // and that would lead to corruption since we passed the message as l-value reference + finished = false; + while (!finished) + { + bool f = true; + for (auto& r : reqs) + f = f && r.is_ready(); + finished = f; + } + }; + + // nowait mode + auto send_multi_nowait = + [&cont_comm,&neighbor_ranks,num_comm_threads](comm_t& c, int tag, int& num_reps_i) + { + const int s = neighbor_ranks.size(); + msg_type msg(4096); + msg.data()[0] = tag; + // no return value is required, message is moved in + cont_comm.send_multi(c, std::move(msg), neighbor_ranks, tag, + [&num_reps_i,s,&c,&cont_comm,neighbor_ranks,num_comm_threads](cont_comm_t::message_type m, int r, int t) + { + std::cout << "sent to " << r << " with tag " << t << " and size " << m.size() << std::endl; + ++num_reps_i; + // check if the message has been sent to all ranks + if (num_reps_i%s == 0) + { + std::cout << "reposting" << std::endl; + // note the move in the repost: + // it recommended to always move inside a callback since this is safe in all cases! + // here it is actually required to move and not doing so will lead to bad bad things. + cont_comm.send_multi(c, std::move(m), neighbor_ranks, t+num_comm_threads); + } + }); + }; + + // make threads + std::vector threads; + threads.reserve(num_progress_threads+num_comm_threads); + + for (std::size_t i=0; i()[0] = -1; + // get a request as return value + auto req = cont_comm.recv(c, msg, 0, tag, + [](cont_comm_t::message_type m, int, int t) + { + EXPECT_TRUE(reinterpret_cast(m.data())[0] == t); + }); + // wait on the request + while (!req.is_ready()){} + msg.data()[0] = -1; + req = cont_comm.recv(c, msg, 0, tag+num_comm_threads, + [num_comm_threads](cont_comm_t::message_type m, int, int t) + { + EXPECT_TRUE(reinterpret_cast(m.data())[0] == (int)(t-num_comm_threads)); + }); + // wait until the requests is ready + // this is important since otherwise the message will go out of scope and is destroyed + // and that would lead to corruption since we passed the message as l-value reference + while (!req.is_ready()){} + }; + + // nowait mode + auto recv_nowait = + [&cont_comm,num_comm_threads](comm_t& c, int tag) + { + msg_type msg(4096); + msg.data()[0] = -1; + // no return value is required, message is moved in + cont_comm.recv(c, std::move(msg), 0, tag, + [](cont_comm_t::message_type m, int, int t) + { + EXPECT_TRUE(reinterpret_cast(m.data())[0] == t); + }); + // another message is created + msg_type msg2(4096); + msg2.data()[0] = -1; + // no return value is required, message is moved in + cont_comm.recv(c, std::move(msg2), 0, tag+num_comm_threads, + [num_comm_threads](cont_comm_t::message_type m, int, int t) + { + EXPECT_TRUE(reinterpret_cast(m.data())[0] == (int)(t-num_comm_threads)); + }); + }; + + // make threads + std::vector threads; + threads.reserve(num_progress_threads+num_comm_threads); + + for (std::size_t i=0; i Date: Thu, 14 Nov 2019 20:37:46 +0100 Subject: [PATCH 20/21] comments --- .../continuation_communicator.hpp | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 8048fdb..841718d 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -128,18 +128,24 @@ namespace gridtools{ } // namespace cont_detail - // user code here: + + // thread-safe shared communicator which handles callbacks + // note: no templates, everything is type-erased + // relies on future-based basic communicator which is passed for every send/recv class continuation_communicator { public: // member types using tag_type = int; using rank_type = int; - using message_type = cont_detail::any_message; + // this is the message type returned in the callback: + using message_type = cont_detail::any_message; + // returned from send/recv to check for completion using request = cont_detail::request; private: // member types + // wrapper for messages passed by l-value reference using ref_message = cont_detail::ref_message; // necessary meta information for each send/receive operation @@ -153,6 +159,7 @@ namespace gridtools{ message_type m_msg; std::shared_ptr m_request_state; }; + // we need thread-safe queues using lock_free_alloc_t = boost::lockfree::allocator>; using send_container_type = boost::lockfree::queue>; using recv_container_type = boost::lockfree::queue>; @@ -171,7 +178,9 @@ namespace gridtools{ public: // send - // take ownership of message if it is an r-value reference! + // use basic comm to post the send and place the callback in a queue + // returns a request to check for completion + // takes ownership of message if it is an r-value reference! template request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag, CallBack&& cb) { @@ -180,8 +189,7 @@ namespace gridtools{ return send(comm, std::forward(msg), dst, tag, std::forward(cb), is_rvalue()); } - // take ownership of message if it is an r-value reference! - // no callback + // no-callback version template request send(Comm& comm, Message&& msg, rank_type dst, tag_type tag) { @@ -190,7 +198,10 @@ namespace gridtools{ public: // send multi - // take ownership of message if it is an r-value reference! + // use basic comm to post the sends and place the callback in a queue + // returns a vector of request to check for completion + // takes ownership of message if it is an r-value reference! + // internally transforms the callback (and the message if moved in) into shared objects template std::vector send_multi(Comm& comm, Message&& msg, const Neighs& neighs, tag_type tag, CallBack&& cb) { @@ -199,8 +210,7 @@ namespace gridtools{ return send_multi(comm, std::forward(msg), neighs, tag, std::forward(cb), is_rvalue()); } - // take ownership of message if it is an r-value reference! - // no callback + // no-callback version template std::vector send_multi(Comm& comm, Message&& msg, const Neighs& neighs, tag_type tag) { @@ -209,7 +219,9 @@ namespace gridtools{ public: // receive - // take ownership of message if it is an r-value reference! + // use basic comm to post the recv and place the callback in a queue + // returns a request to check for completion + // takes ownership of message if it is an r-value reference! template request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag, CallBack&& cb) { @@ -218,8 +230,7 @@ namespace gridtools{ return recv(comm, std::forward(msg), src, tag, std::forward(cb), is_rvalue()); } - // take ownership of message if it is an r-value reference! - // no callback + // no-callback version template request recv(Comm& comm, Message&& msg, rank_type src, tag_type tag) { @@ -228,6 +239,7 @@ namespace gridtools{ public: // progress + // progress the ques and return the number of progressed callbacks std::size_t progress() { std::size_t num_completed = 0u; @@ -319,7 +331,6 @@ namespace gridtools{ return reqs; } - template std::size_t run(Queue& d) { From 75fcf784a4a61aa6c0c0378e89d3d5a213b887eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20B=C3=B6sch?= <48126478+boeschf@users.noreply.github.com> Date: Thu, 14 Nov 2019 20:44:29 +0100 Subject: [PATCH 21/21] fixed ref_message - can be used for any message now --- .../continuation_communicator.hpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/include/ghex/transport_layer/continuation_communicator.hpp b/include/ghex/transport_layer/continuation_communicator.hpp index 841718d..6478289 100644 --- a/include/ghex/transport_layer/continuation_communicator.hpp +++ b/include/ghex/transport_layer/continuation_communicator.hpp @@ -74,16 +74,18 @@ namespace gridtools{ }; // simple wrapper around an l-value reference message (stores pointer and size) + template struct ref_message { - using value_type = unsigned char; - unsigned char* m_data; + using value_type = T;//unsigned char; + T* m_data; std::size_t m_size; - unsigned char* data() noexcept { return m_data; } - const unsigned char* data() const noexcept { return m_data; } + T* data() noexcept { return m_data; } + const T* data() const noexcept { return m_data; } std::size_t size() const noexcept { return m_size; } }; + // simple shared message which is internally used for send_multi template struct shared_message { @@ -146,7 +148,8 @@ namespace gridtools{ private: // member types // wrapper for messages passed by l-value reference - using ref_message = cont_detail::ref_message; + template + using ref_message = cont_detail::ref_message; // necessary meta information for each send/receive operation struct element_type @@ -254,10 +257,11 @@ namespace gridtools{ template request send(Comm& comm, Message& msg, rank_type dst, tag_type tag, CallBack&& cb, std::false_type) { + using V = typename Message::value_type; request req{std::make_shared()}; auto fut = comm.send(msg,dst,tag); auto element_ptr = new element_type{std::forward(cb), dst, tag, std::move(fut), - ref_message{msg.data(),msg.size()}, req.m_request_state}; + ref_message{msg.data(),msg.size()}, req.m_request_state}; while (!m_sends.push(element_ptr)) {} return req; } @@ -276,10 +280,11 @@ namespace gridtools{ template request recv(Comm& comm, Message& msg, rank_type src, tag_type tag, CallBack&& cb, std::false_type) { + using V = typename Message::value_type; request req{std::make_shared()}; auto fut = comm.recv(msg,src,tag); auto element_ptr = new element_type{std::forward(cb), src, tag, std::move(fut), - ref_message{msg.data(),msg.size()}, req.m_request_state}; + ref_message{msg.data(),msg.size()}, req.m_request_state}; while (!m_recvs.push(element_ptr)) {} return req; }