From 3f7ebb29033beda7196ada7d35d5b4ec96f43212 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20K=2E=20Guti=C3=A9rrez?= Date: Tue, 23 Jul 2024 18:47:23 -0600 Subject: [PATCH] Checkpoint some pthread group work. (#239) Adding full pthread group support is non-trivial, so we will leave group splitting as not supported until we have some cycles to dedicate to this. The basic structure is there, but we need nice way to share data across the threads during gather, scatter, etc. Signed-off-by: Samuel K. Gutierrez --- src/qvi-group-pthread.cc | 29 +++++++++------- src/qvi-group-pthread.h | 16 +++------ src/qvi-group.cc | 16 +++++++++ src/qvi-group.h | 13 +------ src/qvi-mpi.cc | 2 +- src/qvi-pthread.cc | 61 ++++++++++++++++++++++++++++++++- src/qvi-pthread.h | 70 ++++++++++++-------------------------- tests/test-pthread-split.c | 4 +-- 8 files changed, 124 insertions(+), 87 deletions(-) diff --git a/src/qvi-group-pthread.cc b/src/qvi-group-pthread.cc index 815c5b1..2933f43 100644 --- a/src/qvi-group-pthread.cc +++ b/src/qvi-group-pthread.cc @@ -14,28 +14,33 @@ #include "qvi-group-pthread.h" #include "qvi-utils.h" +qvi_group_pthread_s::qvi_group_pthread_s( + int group_size +) { + const int rc = qvi_new(&thgroup, group_size); + if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error(); +} + +qvi_group_pthread_s::~qvi_group_pthread_s(void) +{ + qvi_delete(&thgroup); +} + int qvi_group_pthread_s::self( - qvi_group_t **child + qvi_group_t ** ) { - constexpr int group_size = 1; - qvi_group_pthread_t *ichild = nullptr; - // Create a group containing a single thread. - const int rc = qvi_new(&ichild, group_size); - if (qvi_unlikely(rc != QV_SUCCESS)) { - qvi_delete(&ichild); - } - *child = ichild; - return rc; + // TODO(skg) + return QV_ERR_NOT_SUPPORTED; } int qvi_group_pthread_s::split( int, int, - qvi_group_t ** + qvi_group_s ** ) { - // TODO(skg) Test to see if we can do this. + // TODO(skg) return QV_ERR_NOT_SUPPORTED; } diff --git a/src/qvi-group-pthread.h b/src/qvi-group-pthread.h index 3352858..95dffe1 100644 --- a/src/qvi-group-pthread.h +++ b/src/qvi-group-pthread.h @@ -26,15 +26,9 @@ struct qvi_group_pthread_s : public qvi_group_s { /** Constructor. */ qvi_group_pthread_s( int group_size - ) { - const int rc = qvi_new(&thgroup, group_size); - if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error(); - } + ); /** Destructor. */ - virtual ~qvi_group_pthread_s(void) - { - qvi_delete(&thgroup); - } + virtual ~qvi_group_pthread_s(void); virtual qvi_task_t * task(void) @@ -78,7 +72,7 @@ struct qvi_group_pthread_s : public qvi_group_s { int, qvi_group_s ** ) { - // TODO(skg) Need to test this. + // TODO(skg) return QV_ERR_NOT_SUPPORTED; } @@ -96,7 +90,7 @@ struct qvi_group_pthread_s : public qvi_group_s { bool *shared, qvi_bbuff_t ***rxbuffs ) { - return thgroup->gather_bbuffs( + return thgroup->gather( txbuff, root, shared, rxbuffs ); } @@ -107,7 +101,7 @@ struct qvi_group_pthread_s : public qvi_group_s { int root, qvi_bbuff_t **rxbuff ) { - return thgroup->scatter_bbuffs( + return thgroup->scatter( txbuffs, root, rxbuff ); } diff --git a/src/qvi-group.cc b/src/qvi-group.cc index c40132b..4acaa84 100644 --- a/src/qvi-group.cc +++ b/src/qvi-group.cc @@ -29,6 +29,22 @@ qvi_group_s::thsplit( return rc; } +int +qvi_group_s::next_id( + qvi_group_id_t *gid +) { + // Global group ID. Note that we pad its initial value so that other + // infrastructure (e.g., QVI_MPI_GROUP_WORLD) will never equal or exceed + // this value. + static std::atomic group_id(64); + if (group_id == UINT64_MAX) { + qvi_log_error("Group ID space exhausted"); + return QV_ERR_OOR; + } + *gid = group_id++; + return QV_SUCCESS; +} + /* * vim: ft=cpp ts=4 sts=4 sw=4 expandtab */ diff --git a/src/qvi-group.h b/src/qvi-group.h index 52aef0b..a86a0ab 100644 --- a/src/qvi-group.h +++ b/src/qvi-group.h @@ -93,18 +93,7 @@ struct qvi_group_s : qvi_refc_s { static int next_id( qvi_group_id_t *gid - ) { - // Global group ID. Note that we pad its initial value so that other - // infrastructure (e.g., QVI_MPI_GROUP_WORLD) will never equal or exceed - // this value. - static std::atomic group_id(64); - if (group_id == UINT64_MAX) { - qvi_log_error("Group ID space exhausted"); - return QV_ERR_OOR; - } - *gid = group_id++; - return QV_SUCCESS; - } + ); }; typedef struct qvi_group_s qvi_group_t; diff --git a/src/qvi-mpi.cc b/src/qvi-mpi.cc index 506f529..f1dd5f0 100644 --- a/src/qvi-mpi.cc +++ b/src/qvi-mpi.cc @@ -413,7 +413,7 @@ qvi_mpi_group_gather_bbuffs( if (group_id == root) { // Zero initialize array of pointers to nullptr. bbuffs = new qvi_bbuff_t*[group_size](); - + // TODO(skg) Use dup. byte_t *bytepos = allbytes.data(); for (int i = 0; i < group_size; ++i) { rc = qvi_bbuff_new(&bbuffs[i]); diff --git a/src/qvi-pthread.cc b/src/qvi-pthread.cc index 08d4b43..4cfa227 100644 --- a/src/qvi-pthread.cc +++ b/src/qvi-pthread.cc @@ -13,9 +13,16 @@ #include "qvi-pthread.h" #include "qvi-task.h" // IWYU pragma: keep -#include "qvi-bbuff.h" #include "qvi-utils.h" +qvi_pthread_group_s::qvi_pthread_group_s( + int group_size +) : m_size(group_size) +{ + const int rc = pthread_barrier_init(&m_barrier, NULL, group_size); + if (qvi_unlikely(rc != 0)) throw qvi_runtime_error(); +} + void * qvi_pthread_group_s::call_first_from_pthread_create( void *arg @@ -76,6 +83,27 @@ qvi_pthread_group_s::~qvi_pthread_group_s(void) pthread_barrier_destroy(&m_barrier); } +int +qvi_pthread_group_s::size(void) +{ + std::lock_guard guard(m_mutex); + return m_size; +} + +int +qvi_pthread_group_s::rank(void) +{ + std::lock_guard guard(m_mutex); + return m_tid2rank.at(qvi_gettid()); +} + +qvi_task_t * +qvi_pthread_group_s::task(void) +{ + std::lock_guard guard(m_mutex); + return m_tid2task.at(qvi_gettid()); +} + int qvi_pthread_group_s::barrier(void) { @@ -86,6 +114,37 @@ qvi_pthread_group_s::barrier(void) return QV_SUCCESS; } +int +qvi_pthread_group_s::split( + int, + int, + qvi_pthread_group_s ** +) { + // TODO(skg) + return QV_ERR_NOT_SUPPORTED; +} + +int +qvi_pthread_group_s::gather( + qvi_bbuff_t *, + int, + bool *, + qvi_bbuff_t *** +) { + // TODO(skg) + return QV_ERR_NOT_SUPPORTED; +} + +int +qvi_pthread_group_s::scatter( + qvi_bbuff_t **, + int, + qvi_bbuff_t ** +) { + // TODO(skg) + return QV_ERR_NOT_SUPPORTED; +} + /* * vim: ft=cpp ts=4 sts=4 sw=4 expandtab */ diff --git a/src/qvi-pthread.h b/src/qvi-pthread.h index 88e6ffb..d14bc63 100644 --- a/src/qvi-pthread.h +++ b/src/qvi-pthread.h @@ -15,7 +15,6 @@ #define QVI_PTHREAD_H #include "qvi-common.h" -#include "qvi-utils.h" typedef void *(*qvi_pthread_routine_fun_ptr_t)(void *); @@ -66,11 +65,7 @@ struct qvi_pthread_group_s { */ qvi_pthread_group_s( int group_size - ) : m_size(group_size) - { - const int rc = pthread_barrier_init(&m_barrier, NULL, group_size); - if (qvi_unlikely(rc != 0)) throw qvi_runtime_error(); - } + ); /** * This function shall be called by pthread_create() to finish group * construction. This function is called by the pthreads and NOT their @@ -83,60 +78,39 @@ struct qvi_pthread_group_s { /** Destructor. */ ~qvi_pthread_group_s(void); - int - size(void) - { - std::lock_guard guard(m_mutex); - return m_size; - } + qvi_task_t * + task(void); int - rank(void) - { - std::lock_guard guard(m_mutex); - return m_tid2rank.at(qvi_gettid()); - } + size(void); - qvi_task_t * - task(void) - { - std::lock_guard guard(m_mutex); - return m_tid2task.at(qvi_gettid()); - } + int + rank(void); int barrier(void); int - create_from_split( - int, - int, - qvi_pthread_group_s ** - ) { - // TODO(skg) - return QV_ERR_NOT_SUPPORTED; - } + split( + int color, + int key, + qvi_pthread_group_s **child + ); int - gather_bbuffs( - qvi_bbuff_t *, - int, - bool *, - qvi_bbuff_t *** - ) { - // TODO(skg) - return QV_ERR_NOT_SUPPORTED; - } + gather( + qvi_bbuff_t *txbuff, + int root, + bool *shared, + qvi_bbuff_t ***rxbuffs + ); int - scatter_bbuffs( - qvi_bbuff_t **, - int, - qvi_bbuff_t ** - ) { - // TODO(skg) - return QV_ERR_NOT_SUPPORTED; - } + scatter( + qvi_bbuff_t **txbuffs, + int root, + qvi_bbuff_t **rxbuff + ); }; typedef struct qvi_pthread_group_s qvi_pthread_group_t; diff --git a/tests/test-pthread-split.c b/tests/test-pthread-split.c index 7423b81..9e9be79 100644 --- a/tests/test-pthread-split.c +++ b/tests/test-pthread-split.c @@ -159,7 +159,7 @@ main(void) } pthread_t thid2[nthreads]; - for(int i = 0 ; i < nthreads; ++i) { + for (int i = 0 ; i < nthreads; ++i) { const int ptrc = qv_pthread_create( &thid2[i], attr, thread_work, &thargs2[i], th_scopes[i] ); @@ -177,7 +177,7 @@ main(void) //fprintf(stdout,"Thread finished with '%s'\n", (char *)ret); } - /* Clean up */ + // Clean up. rc = qv_pthread_scopes_free(nthreads, th_scopes); if (rc != QV_SUCCESS) { ers = "qv_pthread_scope_free() failed";