Skip to content

Commit 79b6886

Browse files
authored
Merge pull request #458 from dash-project/feat-dash-asyncatomic
Add support for async atomics and fix constness in GlobRefs
2 parents 2e59fa1 + 68bc30c commit 79b6886

File tree

15 files changed

+1195
-202
lines changed

15 files changed

+1195
-202
lines changed

dart-if/include/dash/dart/if/dart_communication.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,34 @@ dart_ret_t dart_accumulate(
249249
dart_datatype_t dtype,
250250
dart_operation_t op) DART_NOTHROW;
251251

252+
253+
/**
254+
* Perform an element-wise atomic update on the values pointed to by \c gptr
255+
* by applying the operation \c op with the corresponding value in \c value
256+
* on them.
257+
*
258+
* DART Equivalent to MPI_Accumulate. In contrast to \ref dart_accumulate, this
259+
* function blocks until the local buffer can be reused.
260+
*
261+
* \param gptr A global pointer determining the target of the accumulate
262+
* operation.
263+
* \param values The local buffer holding the elements to accumulate.
264+
* \param nelem The number of local elements to accumulate per unit.
265+
* \param dtype The data type to use in the accumulate operation \c op.
266+
* \param op The accumulation operation to perform.
267+
*
268+
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
269+
*
270+
* \threadsafe_data{team}
271+
* \ingroup DartCommunication
272+
*/
273+
dart_ret_t dart_accumulate_blocking_local(
274+
dart_gptr_t gptr,
275+
const void * values,
276+
size_t nelem,
277+
dart_datatype_t dtype,
278+
dart_operation_t op) DART_NOTHROW;
279+
252280
/**
253281
* Perform an element-wise atomic update on the value of type \c dtype pointed
254282
* to by \c gptr by applying the operation \c op with \c value on it and

dart-impl/mpi/src/dart_communication.c

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -589,16 +589,13 @@ dart_ret_t dart_accumulate(
589589
dart_datatype_t dtype,
590590
dart_operation_t op)
591591
{
592-
MPI_Datatype mpi_dtype;
593-
MPI_Op mpi_op;
594592
dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid);
595593
uint64_t offset = gptr.addr_or_offs.offset;
596594
int16_t seg_id = gptr.segid;
597595
dart_team_t teamid = gptr.teamid;
598596

599597
CHECK_IS_BASICTYPE(dtype);
600-
mpi_dtype = dart__mpi__datatype_struct(dtype)->basic.mpi_type;
601-
mpi_op = dart__mpi__op(op);
598+
MPI_Op mpi_op = dart__mpi__op(op);
602599

603600

604601
dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid);
@@ -651,24 +648,118 @@ dart_ret_t dart_accumulate(
651648
DART_LOG_TRACE("dart_accumulate: MPI_Accumulate (src %p, size %zu)",
652649
src_ptr, remainder);
653650

654-
CHECK_MPI_RET(
655-
MPI_Accumulate(
651+
MPI_Datatype mpi_dtype = dart__mpi__datatype_struct(dtype)->basic.mpi_type;
652+
CHECK_MPI_RET(
653+
MPI_Accumulate(
654+
src_ptr,
655+
remainder,
656+
mpi_dtype,
657+
team_unit_id.id,
658+
offset,
659+
remainder,
660+
mpi_dtype,
661+
mpi_op,
662+
win),
663+
"MPI_Accumulate");
664+
}
665+
666+
DART_LOG_DEBUG("dart_accumulate > finished");
667+
return DART_OK;
668+
}
669+
670+
671+
dart_ret_t dart_accumulate_blocking_local(
672+
dart_gptr_t gptr,
673+
const void * values,
674+
size_t nelem,
675+
dart_datatype_t dtype,
676+
dart_operation_t op)
677+
{
678+
dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid);
679+
uint64_t offset = gptr.addr_or_offs.offset;
680+
int16_t seg_id = gptr.segid;
681+
dart_team_t teamid = gptr.teamid;
682+
683+
CHECK_IS_BASICTYPE(dtype);
684+
MPI_Op mpi_op = dart__mpi__op(op);
685+
686+
dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid);
687+
if (dart__unlikely(team_data == NULL)) {
688+
DART_LOG_ERROR("dart_accumulate ! failed: Unknown team %i!", teamid);
689+
return DART_ERR_INVAL;
690+
}
691+
692+
CHECK_UNITID_RANGE(team_unit_id, team_data);
693+
694+
DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%ld op:%d unit:%d",
695+
nelem, dtype, op, team_unit_id.id);
696+
697+
dart_segment_info_t *seginfo = dart_segment_get_info(
698+
&(team_data->segdata), seg_id);
699+
if (dart__unlikely(seginfo == NULL)) {
700+
DART_LOG_ERROR("dart_accumulate ! "
701+
"Unknown segment %i on team %i", seg_id, teamid);
702+
return DART_ERR_INVAL;
703+
}
704+
705+
MPI_Win win = seginfo->win;
706+
offset += dart_segment_disp(seginfo, team_unit_id);
707+
708+
// chunk up the put
709+
const size_t nchunks = nelem / MAX_CONTIG_ELEMENTS;
710+
const size_t remainder = nelem % MAX_CONTIG_ELEMENTS;
711+
const char * src_ptr = (const char*) values;
712+
713+
MPI_Request reqs[2];
714+
int num_reqs = 0;
715+
716+
if (nchunks > 0) {
717+
DART_LOG_TRACE("dart_accumulate: MPI_Raccumulate (src %p, size %zu)",
718+
src_ptr, nchunks * MAX_CONTIG_ELEMENTS);
719+
CHECK_MPI_RET(
720+
MPI_Raccumulate(
656721
src_ptr,
657-
remainder,
658-
mpi_dtype,
722+
nchunks,
723+
dart__mpi__datatype_maxtype(dtype),
659724
team_unit_id.id,
660725
offset,
661-
remainder,
662-
mpi_dtype,
726+
nchunks,
727+
dart__mpi__datatype_maxtype(dtype),
663728
mpi_op,
664-
win),
665-
"MPI_Accumulate");
729+
win,
730+
&reqs[num_reqs++]),
731+
"MPI_Accumulate");
732+
offset += nchunks * MAX_CONTIG_ELEMENTS;
733+
src_ptr += nchunks * MAX_CONTIG_ELEMENTS;
666734
}
667735

736+
if (remainder > 0) {
737+
DART_LOG_TRACE("dart_accumulate: MPI_Raccumulate (src %p, size %zu)",
738+
src_ptr, remainder);
739+
740+
MPI_Datatype mpi_dtype = dart__mpi__datatype_struct(dtype)->basic.mpi_type;
741+
CHECK_MPI_RET(
742+
MPI_Raccumulate(
743+
src_ptr,
744+
remainder,
745+
mpi_dtype,
746+
team_unit_id.id,
747+
offset,
748+
remainder,
749+
mpi_dtype,
750+
mpi_op,
751+
win,
752+
&reqs[num_reqs++]),
753+
"MPI_Accumulate");
754+
}
755+
756+
MPI_Waitall(num_reqs, reqs, MPI_STATUSES_IGNORE);
757+
668758
DART_LOG_DEBUG("dart_accumulate > finished");
669759
return DART_OK;
670760
}
671761

762+
672763
dart_ret_t dart_fetch_and_op(
673764
dart_gptr_t gptr,
674765
const void * value,

dash/include/dash/Array.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,8 @@ class AsyncArrayRef
286286
typedef T * pointer;
287287
typedef const T * const_pointer;
288288

289-
typedef GlobAsyncRef< T> async_reference;
290-
typedef GlobAsyncRef<const T> const_async_reference;
289+
typedef GlobAsyncRef<T> async_reference;
290+
typedef typename GlobAsyncRef<T>::const_type const_async_reference;
291291

292292
public:
293293
typedef std::integral_constant<dim_t, 1>
@@ -363,7 +363,7 @@ class AsyncArrayRef
363363
* Subscript operator, access to local array element at given position.
364364
*/
365365
constexpr const_async_reference operator[](const size_type n) const {
366-
return async_reference(
366+
return const_async_reference(
367367
(*(_array->begin() + n)).dart_gptr());
368368
}
369369

@@ -660,8 +660,8 @@ class Array
660660
typedef std::reverse_iterator< iterator> reverse_iterator;
661661
typedef std::reverse_iterator<const_iterator> const_reverse_iterator;
662662

663-
typedef GlobRef< value_type> reference;
664-
typedef GlobRef<const value_type> const_reference;
663+
typedef GlobRef<value_type> reference;
664+
typedef typename GlobRef<value_type>::const_type const_reference;
665665

666666
typedef GlobIter< value_type, PatternType> pointer;
667667
typedef GlobIter<const value_type, PatternType> const_pointer;

dash/include/dash/Atomic.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace dash {
3030
* dash::atomic::load(array.lbegin()) // not allowed
3131
* \endcode
3232
* \endnote
33-
*
33+
*
3434
* \code
3535
* dash::Array<dash::Atomic<int>> array(100);
3636
* // supported as Atomic<value_t>(value_t T) is available
@@ -115,6 +115,7 @@ std::ostream & operator<<(
115115

116116
#include <dash/atomic/Type_traits.h>
117117
#include <dash/atomic/GlobAtomicRef.h>
118+
#include <dash/atomic/GlobAtomicAsyncRef.h>
118119
#include <dash/atomic/Operation.h>
119120

120121
#endif // DASH__ATOMIC_H__INCLUDED

0 commit comments

Comments
 (0)