Skip to content

Commit

Permalink
Merge pull request #126 from BerkeleyLab/implement-teams
Browse files Browse the repository at this point in the history
Implement teams support
  • Loading branch information
everythingfunctional authored Aug 14, 2024
2 parents c8b9d42 + 8a21765 commit f260d3e
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 40 deletions.
12 changes: 7 additions & 5 deletions install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,13 @@ RUN_FPM_SH="build/run-fpm.sh"
cat << EOF > $RUN_FPM_SH
#!/bin/sh
#-- DO NOT EDIT -- created by caffeine/install.sh
"${FPM}" "\$@" \
--compiler "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_FC`" \
--c-compiler "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_CC`" \
--c-flag "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_CFLAGS`" \
--link-flag "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_LDFLAGS`"
fpm_sub_cmd=\$1; shift
"${FPM}" "\$fpm_sub_cmd" \\
--compiler "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_FC`" \\
--c-compiler "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_CC`" \\
--c-flag "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_CFLAGS`" \\
--link-flag "`$PKG_CONFIG caffeine --variable=CAFFEINE_FPM_LDFLAGS`" \\
"\$@"
EOF
chmod u+x $RUN_FPM_SH

Expand Down
2 changes: 1 addition & 1 deletion manifest/fpm.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ maintainer = "rouson@lbl.gov"
copyright = "2021-2024 UC Regents"

[dev-dependencies]
veggies = {git = "https://gitlab.com/everythingfunctional/veggies", tag = "v1.0.5"}
veggies = {git = "https://gitlab.com/everythingfunctional/veggies", tag = "v1.1.2"}
iso_varying_string = {git = "https://gitlab.com/everythingfunctional/iso_varying_string.git", tag = "v3.0.4"}

[build]
39 changes: 28 additions & 11 deletions src/caffeine/allocation_s.f90
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
c_f_procpointer, &
c_loc, &
c_associated, &
c_null_ptr, &
c_null_funptr

implicit none
Expand All @@ -28,6 +27,12 @@
coarray_size = product(ubounds-lbounds+1)*element_size

me = caf_this_image(current_team%info%gex_team)
if (caf_have_child_teams()) then
! Free the child team space to make sure we have space to allocate the coarray
if (me == 1) then
call caf_deallocate(current_team%info%heap_mspace, current_team%info%child_heap_info%allocated_memory)
end if
end if
if (me == 1) then
handle_size = c_sizeof(unused)
total_size = handle_size + coarray_size
Expand All @@ -49,9 +54,12 @@
coarray_handle%info%final_func = final_func
coarray_handle%info%lcobounds(1:size(lcobounds)) = lcobounds
coarray_handle%info%ucobounds(1:size(ucobounds)) = ucobounds
call add_to_team_list(current_team, coarray_handle)
call add_to_team_list(coarray_handle)

allocated_memory = coarray_handle%info%coarray_data
if (caf_have_child_teams()) then
call caf_establish_child_heap
end if
end procedure

module procedure prif_allocate
Expand Down Expand Up @@ -118,36 +126,45 @@
call caf_deallocate(current_team%info%heap_mspace, c_loc(coarray_handles(i)%info))
end do
if (present(stat)) stat = 0
if (caf_have_child_teams()) then
! reclaim any free space possible for the child teams to use
if (caf_this_image(current_team%info%gex_team) == 1) then
call caf_deallocate(current_team%info%heap_mspace, current_team%info%child_heap_info%allocated_memory)
end if
call caf_establish_child_heap
end if
end procedure

module procedure prif_deallocate
call caf_deallocate(non_symmetric_heap_mspace, mem)
end procedure

subroutine add_to_team_list(current_team, coarray_handle)
type(prif_team_type), intent(inout) :: current_team
type(prif_coarray_handle), intent(inout) :: coarray_handle
subroutine add_to_team_list(coarray_handle)
type(prif_coarray_handle), intent(in) :: coarray_handle

if (associated(current_team%info%coarrays)) then
current_team%info%coarrays%previous_handle = c_loc(coarray_handle%info)
coarray_handle%info%next_handle = c_loc(current_team%info%coarrays)
coarray_handle%info%previous_handle = c_null_ptr
current_team%info%coarrays => coarray_handle%info
else
current_team%info%coarrays => coarray_handle%info
coarray_handle%info%next_handle = c_null_ptr
coarray_handle%info%previous_handle = c_null_ptr
end if
current_team%info%coarrays => coarray_handle%info
end subroutine

subroutine remove_from_team_list(coarray_handle)
type(prif_coarray_handle), intent(in) :: coarray_handle

type(handle_data), pointer :: tmp_data

if (&
.not.c_associated(coarray_handle%info%previous_handle) &
.and. .not.c_associated(coarray_handle%info%next_handle)) then
nullify(current_team%info%coarrays)
return
end if
if (c_associated(coarray_handle%info%previous_handle)) then
call c_f_pointer(coarray_handle%info%previous_handle, tmp_data)
tmp_data%next_handle = coarray_handle%info%next_handle
else
call c_f_pointer(coarray_handle%info%next_handle, current_team%info%coarrays)
end if
if (c_associated(coarray_handle%info%next_handle)) then
call c_f_pointer(coarray_handle%info%next_handle, tmp_data)
Expand Down
55 changes: 44 additions & 11 deletions src/caffeine/caffeine.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ int caf_this_image(gex_TM_t team)
}

// NOTE: gex_TM_T is a typedef to a C pointer, so the `gex_TM_t* initial_team` arg in the C signature matches the BIND(C) interface of an `intent(out)` arg of type `c_ptr` for the same argument
void caf_caffeinate(mspace* symmetric_heap, intptr_t* symmetric_heap_start, mspace* non_symmetric_heap, gex_TM_t* initial_team)
{
GASNET_SAFE(gex_Client_Init(&myclient, &myep, &myworldteam, "caffeine", NULL, NULL, 0));
void caf_caffeinate(
mspace* symmetric_heap,
intptr_t* symmetric_heap_start,
intptr_t* symmetric_heap_size,
mspace* non_symmetric_heap,
gex_TM_t* initial_team
) {
GASNET_SAFE(gex_Client_Init(&myclient, &myep, initial_team, "caffeine", NULL, NULL, 0));

// query largest possible segment GASNet can give us of the same size across all processes:
size_t max_seg = gasnet_getMaxGlobalSegmentSize();
Expand All @@ -59,7 +64,7 @@ void caf_caffeinate(mspace* symmetric_heap, intptr_t* symmetric_heap_start, mspa
// TODO: issue a console warning here instead of silently capping
segsz = MIN(segsz,max_seg);

GASNET_SAFE(gex_Segment_Attach(&mysegment, myworldteam, segsz));
GASNET_SAFE(gex_Segment_Attach(&mysegment, *initial_team, segsz));

*symmetric_heap_start = (intptr_t)gex_Segment_QueryAddr(mysegment);
size_t total_heap_size = gex_Segment_QuerySize(mysegment);
Expand All @@ -72,16 +77,16 @@ void caf_caffeinate(mspace* symmetric_heap, intptr_t* symmetric_heap_start, mspa
assert(non_symmetric_fraction > 0 && non_symmetric_fraction < 1); // TODO: real error reporting

size_t non_symmetric_heap_size = total_heap_size * non_symmetric_fraction;
size_t symmetric_heap_size = total_heap_size - non_symmetric_heap_size;
intptr_t non_symmetric_heap_start = *symmetric_heap_start + symmetric_heap_size;
*symmetric_heap_size = total_heap_size - non_symmetric_heap_size;
intptr_t non_symmetric_heap_start = *symmetric_heap_start + *symmetric_heap_size;

if (caf_this_image(myworldteam) == 1) {
*symmetric_heap = create_mspace_with_base((void*)*symmetric_heap_start, symmetric_heap_size, 0);
mspace_set_footprint_limit(*symmetric_heap, symmetric_heap_size);
if (caf_this_image(*initial_team) == 1) {
*symmetric_heap = create_mspace_with_base((void*)*symmetric_heap_start, *symmetric_heap_size, 0);
mspace_set_footprint_limit(*symmetric_heap, *symmetric_heap_size);
}
*non_symmetric_heap = create_mspace_with_base((void*)non_symmetric_heap_start, non_symmetric_heap_size, 0);
mspace_set_footprint_limit(*non_symmetric_heap, non_symmetric_heap_size);
*initial_team = myworldteam;
myworldteam = *initial_team;
}

void caf_decaffeinate(int exit_code)
Expand All @@ -96,14 +101,37 @@ int caf_num_images(gex_TM_t team)

void* caf_allocate(mspace heap, size_t bytes)
{
return mspace_memalign(heap, 8, bytes);
void* allocated_space = mspace_memalign(heap, 8, bytes);
if (!allocated_space) // uh-oh, something went wrong..
gasnett_fatalerror("caf_allocate failed to mspace_memalign(%"PRIuSZ")",
bytes);
return allocated_space;
}

void* caf_allocate_remaining(mspace heap, void** allocated_space, size_t* allocated_size)
{
// The following doesn't necessarily give us all remaining space
// nor necessarily the largest open space, but in practice is likely
// to work out that way
struct mallinfo heap_info = mspace_mallinfo(heap);
*allocated_size = heap_info.keepcost * 0.9f;
*allocated_space = mspace_memalign(heap, 8, *allocated_size);
if (!*allocated_space) // uh-oh, something went wrong..
gasnett_fatalerror("caf_allocate_remaining failed to mspace_memalign(%"PRIuSZ")",
*allocated_size);
}

void caf_deallocate(mspace heap, void* mem)
{
mspace_free(heap, mem);
}

void caf_establish_mspace(mspace* heap, void* heap_start, size_t heap_size)
{
*heap = create_mspace_with_base(heap_start, heap_size, 0);
mspace_set_footprint_limit(*heap, heap_size);
}

// take address in a segment and convert to an address on given image
intptr_t caf_convert_base_addr(void* addr, int image)
{
Expand Down Expand Up @@ -259,6 +287,11 @@ size_t caf_elem_len(CFI_cdesc_t* a_desc)
return a_desc->elem_len;
}

void caf_form_team(gex_TM_t current_team, gex_TM_t* new_team, intmax_t team_number, int new_index)
{
gex_TM_Split(new_team, current_team, team_number, new_index, NULL, 0, GEX_FLAG_TM_NO_SCRATCH);
}

bool caf_numeric_type(CFI_cdesc_t* a_desc)
{
switch (a_desc->type)
Expand Down
2 changes: 1 addition & 1 deletion src/caffeine/collective_subroutines/co_reduce_s.f90
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
! Terms of use are as specified in LICENSE.txt
submodule(prif:prif_private_s) co_reduce_s
use iso_c_binding, only : &
c_loc, c_null_ptr, c_funloc, c_associated, c_f_pointer, c_f_procpointer, c_char, c_int64_t, c_double, &
c_loc, c_funloc, c_associated, c_f_pointer, c_f_procpointer, c_char, c_int64_t, c_double, &
c_float, c_int32_t

implicit none
Expand Down
51 changes: 49 additions & 2 deletions src/caffeine/prif_private_s.f90
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ pure module subroutine assert(assertion, description, diagnostics)

! ________ Program initiation and finalization ___________

subroutine caf_caffeinate(symmetric_heap, symmetric_heap_start, non_symmetric_heap, initial_team) bind(C)
subroutine caf_caffeinate( &
symmetric_heap, &
symmetric_heap_start, &
symmetric_heap_size, &
non_symmetric_heap, &
initial_team) &
bind(C)
import c_ptr, c_intptr_t
implicit none
type(c_ptr), intent(out) :: symmetric_heap
integer(c_intptr_t), intent(out) :: symmetric_heap_start
integer(c_intptr_t), intent(out) :: symmetric_heap_start, symmetric_heap_size
type(c_ptr), intent(out) :: non_symmetric_heap
type(c_ptr), intent(out) :: initial_team
end subroutine
Expand Down Expand Up @@ -62,13 +68,29 @@ function caf_allocate(mspace, bytes) result(ptr) bind(c)
type(c_ptr) :: ptr
end function

subroutine caf_allocate_remaining(mspace, allocated_space, allocated_size) bind(c)
import c_size_t, c_ptr
implicit none
type(c_ptr), intent(in), value :: mspace
type(c_ptr), intent(out) :: allocated_space
integer(c_size_t), intent(out) :: allocated_size
end subroutine

subroutine caf_deallocate(mspace, mem) bind(c)
import c_ptr
implicit none
type(c_ptr), intent(in), value :: mspace
type(c_ptr), intent(in), value :: mem
end subroutine

subroutine caf_establish_mspace(mspace, mem, mem_size) bind(c)
import c_size_t, c_ptr
implicit none
type(c_ptr), intent(out) :: mspace
type(c_ptr), intent(in), value :: mem
integer(c_size_t), intent(in), value :: mem_size
end subroutine

! ___________________ PRIF Queries ______________________

module function caf_convert_base_addr(addr, image) result(ptr) bind(c)
Expand Down Expand Up @@ -185,6 +207,15 @@ pure function caf_elem_len(a) result(a_elem_len) bind(C)
integer(c_size_t), target :: a_elem_len
end function

subroutine caf_form_team(current_team, new_team, team_number, new_index) bind(C)
!! void caf_form_team(gex_TM_t* current_team, gex_TM_t* new_team, intmax_t team_number, int new_index);
import c_ptr, c_int, c_intmax_t
type(c_ptr), intent(in), value :: current_team
type(c_ptr), intent(out) :: new_team
integer(c_intmax_t), intent(in), value :: team_number
integer(c_int), intent(in), value :: new_index
end subroutine

end interface

contains
Expand Down Expand Up @@ -238,4 +269,20 @@ pure function optional_value(var) result(c_val)
end if
end function

subroutine caf_establish_child_heap
if (caf_this_image(current_team%info%gex_team) == 1) then
call caf_allocate_remaining( &
current_team%info%heap_mspace, &
current_team%info%child_heap_info%allocated_memory, &
current_team%info%child_heap_info%size)
current_team%info%child_heap_info%offset = &
as_int(current_team%info%child_heap_info%allocated_memory) - current_team%info%heap_start
end if
call prif_co_broadcast(current_team%info%child_heap_info, 1)
end subroutine

logical function caf_have_child_teams()
caf_have_child_teams = associated(current_team%info%child_heap_info)
end function

end submodule prif_private_s
2 changes: 1 addition & 1 deletion src/caffeine/program_startup_s.F90
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
! Copyright (c), The Regents of the University of California
! Terms of use are as specified in LICENSE.txt
submodule(prif:prif_private_s) program_startup_s

implicit none
contains

Expand All @@ -14,6 +13,7 @@
call caf_caffeinate( &
initial_team%heap_mspace, &
initial_team%heap_start, &
initial_team%heap_size, &
non_symmetric_heap_mspace, &
initial_team%gex_team)
current_team%info => initial_team
Expand Down
Loading

0 comments on commit f260d3e

Please sign in to comment.