Skip to content

Commit

Permalink
Checkpoint some pthread group work. (#239)
Browse files Browse the repository at this point in the history
Adding full pthread group support is non-trivial, so we will leave group
splitting as not supported until we have some cycles to dedicate to this. The
basic structure is there, but we need nice way to share data across the
threads during gather, scatter, etc.

Signed-off-by: Samuel K. Gutierrez <samuel@lanl.gov>
  • Loading branch information
samuelkgutierrez authored Jul 24, 2024
1 parent 039eec6 commit 3f7ebb2
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 87 deletions.
29 changes: 17 additions & 12 deletions src/qvi-group-pthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,33 @@
#include "qvi-group-pthread.h"
#include "qvi-utils.h"

qvi_group_pthread_s::qvi_group_pthread_s(
int group_size
) {
const int rc = qvi_new(&thgroup, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
}

qvi_group_pthread_s::~qvi_group_pthread_s(void)
{
qvi_delete(&thgroup);
}

int
qvi_group_pthread_s::self(
qvi_group_t **child
qvi_group_t **
) {
constexpr int group_size = 1;
qvi_group_pthread_t *ichild = nullptr;
// Create a group containing a single thread.
const int rc = qvi_new(&ichild, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) {
qvi_delete(&ichild);
}
*child = ichild;
return rc;
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

int
qvi_group_pthread_s::split(
int,
int,
qvi_group_t **
qvi_group_s **
) {
// TODO(skg) Test to see if we can do this.
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

Expand Down
16 changes: 5 additions & 11 deletions src/qvi-group-pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,9 @@ struct qvi_group_pthread_s : public qvi_group_s {
/** Constructor. */
qvi_group_pthread_s(
int group_size
) {
const int rc = qvi_new(&thgroup, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
}
);
/** Destructor. */
virtual ~qvi_group_pthread_s(void)
{
qvi_delete(&thgroup);
}
virtual ~qvi_group_pthread_s(void);

virtual qvi_task_t *
task(void)
Expand Down Expand Up @@ -78,7 +72,7 @@ struct qvi_group_pthread_s : public qvi_group_s {
int,
qvi_group_s **
) {
// TODO(skg) Need to test this.
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

Expand All @@ -96,7 +90,7 @@ struct qvi_group_pthread_s : public qvi_group_s {
bool *shared,
qvi_bbuff_t ***rxbuffs
) {
return thgroup->gather_bbuffs(
return thgroup->gather(
txbuff, root, shared, rxbuffs
);
}
Expand All @@ -107,7 +101,7 @@ struct qvi_group_pthread_s : public qvi_group_s {
int root,
qvi_bbuff_t **rxbuff
) {
return thgroup->scatter_bbuffs(
return thgroup->scatter(
txbuffs, root, rxbuff
);
}
Expand Down
16 changes: 16 additions & 0 deletions src/qvi-group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ qvi_group_s::thsplit(
return rc;
}

int
qvi_group_s::next_id(
qvi_group_id_t *gid
) {
// Global group ID. Note that we pad its initial value so that other
// infrastructure (e.g., QVI_MPI_GROUP_WORLD) will never equal or exceed
// this value.
static std::atomic<qvi_group_id_t> group_id(64);
if (group_id == UINT64_MAX) {
qvi_log_error("Group ID space exhausted");
return QV_ERR_OOR;
}
*gid = group_id++;
return QV_SUCCESS;
}

/*
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
*/
13 changes: 1 addition & 12 deletions src/qvi-group.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,7 @@ struct qvi_group_s : qvi_refc_s {
static int
next_id(
qvi_group_id_t *gid
) {
// Global group ID. Note that we pad its initial value so that other
// infrastructure (e.g., QVI_MPI_GROUP_WORLD) will never equal or exceed
// this value.
static std::atomic<qvi_group_id_t> group_id(64);
if (group_id == UINT64_MAX) {
qvi_log_error("Group ID space exhausted");
return QV_ERR_OOR;
}
*gid = group_id++;
return QV_SUCCESS;
}
);
};
typedef struct qvi_group_s qvi_group_t;

Expand Down
2 changes: 1 addition & 1 deletion src/qvi-mpi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ qvi_mpi_group_gather_bbuffs(
if (group_id == root) {
// Zero initialize array of pointers to nullptr.
bbuffs = new qvi_bbuff_t*[group_size]();

// TODO(skg) Use dup.
byte_t *bytepos = allbytes.data();
for (int i = 0; i < group_size; ++i) {
rc = qvi_bbuff_new(&bbuffs[i]);
Expand Down
61 changes: 60 additions & 1 deletion src/qvi-pthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@

#include "qvi-pthread.h"
#include "qvi-task.h" // IWYU pragma: keep
#include "qvi-bbuff.h"
#include "qvi-utils.h"

qvi_pthread_group_s::qvi_pthread_group_s(
int group_size
) : m_size(group_size)
{
const int rc = pthread_barrier_init(&m_barrier, NULL, group_size);
if (qvi_unlikely(rc != 0)) throw qvi_runtime_error();
}

void *
qvi_pthread_group_s::call_first_from_pthread_create(
void *arg
Expand Down Expand Up @@ -76,6 +83,27 @@ qvi_pthread_group_s::~qvi_pthread_group_s(void)
pthread_barrier_destroy(&m_barrier);
}

int
qvi_pthread_group_s::size(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_size;
}

int
qvi_pthread_group_s::rank(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2rank.at(qvi_gettid());
}

qvi_task_t *
qvi_pthread_group_s::task(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2task.at(qvi_gettid());
}

int
qvi_pthread_group_s::barrier(void)
{
Expand All @@ -86,6 +114,37 @@ qvi_pthread_group_s::barrier(void)
return QV_SUCCESS;
}

int
qvi_pthread_group_s::split(
int,
int,
qvi_pthread_group_s **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

int
qvi_pthread_group_s::gather(
qvi_bbuff_t *,
int,
bool *,
qvi_bbuff_t ***
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

int
qvi_pthread_group_s::scatter(
qvi_bbuff_t **,
int,
qvi_bbuff_t **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

/*
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
*/
70 changes: 22 additions & 48 deletions src/qvi-pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#define QVI_PTHREAD_H

#include "qvi-common.h"
#include "qvi-utils.h"

typedef void *(*qvi_pthread_routine_fun_ptr_t)(void *);

Expand Down Expand Up @@ -66,11 +65,7 @@ struct qvi_pthread_group_s {
*/
qvi_pthread_group_s(
int group_size
) : m_size(group_size)
{
const int rc = pthread_barrier_init(&m_barrier, NULL, group_size);
if (qvi_unlikely(rc != 0)) throw qvi_runtime_error();
}
);
/**
* This function shall be called by pthread_create() to finish group
* construction. This function is called by the pthreads and NOT their
Expand All @@ -83,60 +78,39 @@ struct qvi_pthread_group_s {
/** Destructor. */
~qvi_pthread_group_s(void);

int
size(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_size;
}
qvi_task_t *
task(void);

int
rank(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2rank.at(qvi_gettid());
}
size(void);

qvi_task_t *
task(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2task.at(qvi_gettid());
}
int
rank(void);

int
barrier(void);

int
create_from_split(
int,
int,
qvi_pthread_group_s **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}
split(
int color,
int key,
qvi_pthread_group_s **child
);

int
gather_bbuffs(
qvi_bbuff_t *,
int,
bool *,
qvi_bbuff_t ***
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}
gather(
qvi_bbuff_t *txbuff,
int root,
bool *shared,
qvi_bbuff_t ***rxbuffs
);

int
scatter_bbuffs(
qvi_bbuff_t **,
int,
qvi_bbuff_t **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}
scatter(
qvi_bbuff_t **txbuffs,
int root,
qvi_bbuff_t **rxbuff
);
};
typedef struct qvi_pthread_group_s qvi_pthread_group_t;

Expand Down
4 changes: 2 additions & 2 deletions tests/test-pthread-split.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ main(void)
}

pthread_t thid2[nthreads];
for(int i = 0 ; i < nthreads; ++i) {
for (int i = 0 ; i < nthreads; ++i) {
const int ptrc = qv_pthread_create(
&thid2[i], attr, thread_work, &thargs2[i], th_scopes[i]
);
Expand All @@ -177,7 +177,7 @@ main(void)
//fprintf(stdout,"Thread finished with '%s'\n", (char *)ret);
}

/* Clean up */
// Clean up.
rc = qv_pthread_scopes_free(nthreads, th_scopes);
if (rc != QV_SUCCESS) {
ers = "qv_pthread_scope_free() failed";
Expand Down

0 comments on commit 3f7ebb2

Please sign in to comment.