Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New feature: intra-node write aggregation #156

Merged
merged 13 commits into from
Nov 7, 2024
7 changes: 7 additions & 0 deletions benchmarks/C/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -72,6 +78,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
rm -f ${OUTDIR}/$i.nc
rm -f ${OUTDIR}/$i.bb.nc
rm -f ${OUTDIR}/$i.nc4
Expand Down
7 changes: 7 additions & 0 deletions benchmarks/FLASH-IO/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -72,6 +78,7 @@ for i in ${check_PROGRAMS} ; do
${MPIRUN} ${NCMPIDIFF} -q ${TESTOUTDIR}/$i.ncmpi_plt_crn_0000.nc ${TESTOUTDIR}/$i.bb.ncmpi_plt_crn_0000.nc
fi
done
done
rm -f ${OUTDIR}/$i.ncmpi_chk_0000.nc
rm -f ${OUTDIR}/$i.ncmpi_plt_cnt_0000.nc
rm -f ${OUTDIR}/$i.ncmpi_plt_crn_0000.nc
Expand Down
3 changes: 3 additions & 0 deletions examples/C/column_wise.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ int main(int argc, char** argv)

/* the global array is NY * (NX * nprocs) */
G_NX = NX * nprocs;
if (verbose && rank == 0)
printf("Global variable of size %d x %d\n",NY,G_NX);

myOff = NX * rank;
myNX = NX;
if (verbose) printf("%2d: myOff=%3d myNX=%3d\n",rank,myOff,myNX);
Expand Down
12 changes: 10 additions & 2 deletions examples/C/i_varn_int64.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ static int check_contents(int ncid, int *varid)
printf("Expected file contents [%d][%d]=%lld, but got %lld\n",
i,j,expected[i][j],r_buffer[j]);
nerrs++;
i = 4; /* break loop i */
break;
}
}
free(r_buffer);
Expand Down Expand Up @@ -349,9 +351,12 @@ int main(int argc, char** argv)
/* check buffer contents */
for (i=0; i<nreqs; i++) {
for (j=0; j<req_lens[i]; j++)
if (buffer[i][j] != rank)
if (buffer[i][j] != rank) {
printf("Expected read buf[%d][%d]=%d, but got %lld\n",
i,j,rank,buffer[i][j]);
i = nreqs; /* break loop i */
break;
}
}

for (i=0; i<nreqs; i++) free(buffer[i]);
Expand Down Expand Up @@ -392,9 +397,12 @@ int main(int argc, char** argv)
/* check buffer contents */
for (i=0; i<nreqs; i++) {
for (j=0; j<req_lens[i]; j++)
if (buffer[i][j*2] != rank)
if (buffer[i][j*2] != rank) {
printf("Expected read buf[%d][%d]=%d, but got %lld\n",
i,j*2,rank,buffer[i][j*2]);
i = nreqs; /* break loop i */
break;
}
}

err = ncmpi_close(ncid);
Expand Down
7 changes: 7 additions & 0 deletions examples/C/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -92,6 +98,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi

done
done
# delete output file
if test $i = get_vara ; then
Expand Down
7 changes: 7 additions & 0 deletions examples/CXX/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
# delete output file
if test $i = get_vara ; then
rm -f ${OUTDIR}/put_vara.nc
Expand Down
7 changes: 7 additions & 0 deletions examples/F77/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
# delete output file
rm -f ${OUTDIR}/$i.nc
rm -f ${OUTDIR}/$i.bb.nc
Expand Down
7 changes: 7 additions & 0 deletions examples/F90/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
# delete output file
rm -f ${OUTDIR}/$i.nc
rm -f ${OUTDIR}/$i.bb.nc
Expand Down
7 changes: 7 additions & 0 deletions examples/adios/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand All @@ -42,5 +48,6 @@ for i in ${check_PROGRAMS} ; do
echo "PASS: C parallel run on $1 processes --------------- $i"
fi
done
done
done

7 changes: 7 additions & 0 deletions examples/burst_buffer/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ unset PNETCDF_HINTS
for i in ${check_PROGRAMS} ; do
# echo "---- exec=$i"
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand All @@ -49,6 +55,7 @@ for i in ${check_PROGRAMS} ; do
${TESTSEQRUN} ${VALIDATOR} -q ${TESTOUTDIR}/$i.nc
# echo ""
done
done
# delete output files
rm -f ${OUTDIR}/$i.nc
done
Expand Down
7 changes: 7 additions & 0 deletions examples/tutorial/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -136,6 +142,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
done

rm -f ${OUTDIR}/pnetcdf-*.nc
Expand Down
16 changes: 12 additions & 4 deletions src/drivers/common/mem_alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ void ncmpii_add_mem_entry(void *buf,
/*----< ncmpii_del_mem_entry() >---------------------------------------------*/
/* delete a malloc entry from the table */
static
void ncmpii_del_mem_entry(void *buf)
int ncmpii_del_mem_entry(void *buf)
{
int err=0;

#ifdef ENABLE_THREAD_SAFE
pthread_mutex_lock(&lock);
#endif
Expand All @@ -146,6 +148,7 @@ void ncmpii_del_mem_entry(void *buf)
if (ret == NULL) {
fprintf(stderr, "Error at line %d file %s: tfind() buf=%p\n",
__LINE__,__FILE__,buf);
err = 1;
goto fn_exit;
}
/* free space for func and filename */
Expand All @@ -159,6 +162,7 @@ void ncmpii_del_mem_entry(void *buf)
if (ret == NULL) {
fprintf(stderr, "Error at line %d file %s: tdelete() buf=%p\n",
__LINE__,__FILE__,buf);
err = 1;
goto fn_exit;
}
free(tmp);
Expand All @@ -170,7 +174,7 @@ void ncmpii_del_mem_entry(void *buf)
#ifdef ENABLE_THREAD_SAFE
pthread_mutex_unlock(&lock);
#endif
return;
return err;
}
#endif

Expand Down Expand Up @@ -246,7 +250,9 @@ void *NCI_Realloc_fn(void *ptr,
}

#ifdef PNC_MALLOC_TRACE
ncmpii_del_mem_entry(ptr);
if (ncmpii_del_mem_entry(ptr) != 0)
fprintf(stderr, "realloc failed in file %s func %s line %d\n",
filename, func, lineno);
#endif
void *buf = (void *) realloc(ptr, size);
#ifdef PNETCDF_DEBUG
Expand Down Expand Up @@ -275,7 +281,9 @@ void NCI_Free_fn(void *ptr,
{
if (ptr == NULL) return;
#ifdef PNC_MALLOC_TRACE
ncmpii_del_mem_entry(ptr);
if (ncmpii_del_mem_entry(ptr) != 0)
fprintf(stderr, "free failed in file %s func %s line %d\n",
filename, func, lineno);
#endif
free(ptr);
}
Expand Down
3 changes: 2 additions & 1 deletion src/drivers/ncmpio/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ C_SRCS = ncmpio_driver.c \
ncmpio_fill.c \
ncmpio_util.c \
ncmpio_hash_func.c \
ncmpio_file_io.c
ncmpio_file_io.c \
ncmpio_intra_node.c

$(M4_SRCS:.m4=.c): Makefile

Expand Down
24 changes: 24 additions & 0 deletions src/drivers/ncmpio/ncmpio_NC.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,17 @@ struct NC {

char *path; /* file name */
struct NC *old; /* contains the previous NC during redef. */

/* Below are used for intra-node aggregation */
int num_aggrs_per_node; /* number of aggregators per compute node. Set
through a user hint. 0 to disable the
intra-node aggregation, -1 to let PnetCDF
decide. This value must be the same among all
processes.
*/
int my_aggr; /* rank ID of my aggregator */
int num_nonaggrs; /* number of non-aggregators assigned */
int *nonaggr_ranks; /* ranks of assigned non-aggregators */
};

#define NC_readonly(ncp) fIsSet((ncp)->flags, NC_MODE_RDONLY)
Expand Down Expand Up @@ -631,4 +642,17 @@ ncmpio_read_write(NC *ncp, int rw_flag, int coll_indep, MPI_Offset offset,
MPI_Offset buf_count, MPI_Datatype buf_type, void *buf,
int buftype_is_contig);

/* Begin defined in ncmpio_intra_node.c -------------------------------------*/
/* Prototypes for the intra-node write aggregation feature. The file name
 * above follows the source file listed in C_SRCS of Makefile.am.
 */

/* NOTE(review): presumably initializes the intra-node aggregation fields of
 * ncp (my_aggr, num_nonaggrs, nonaggr_ranks) -- confirm against the
 * implementation.
 */
extern int
ncmpio_intra_node_aggr_init(NC *ncp);

/* NOTE(review): judging by the signature, this variant takes a list of
 * num_reqs pending put requests (put_list) together with a new number of
 * records -- confirm semantics and return codes against the implementation.
 */
extern int
ncmpio_intra_node_aggregation_nreqs(NC *ncp, int num_reqs, NC_req *put_list,
MPI_Offset newnumrecs);
/* NOTE(review): judging by the signature, this variant takes a single
 * request on variable varp described by start/count/stride, with the user
 * buffer given by buf, bufCount and bufType -- confirm against the
 * implementation.
 */
extern int
ncmpio_intra_node_aggregation(NC *ncp, NC_var *varp, const MPI_Offset *start,
const MPI_Offset *count,
const MPI_Offset *stride, MPI_Offset bufCount,
MPI_Datatype bufType, void *buf);

#endif /* H_NC */
17 changes: 7 additions & 10 deletions src/drivers/ncmpio/ncmpio_close.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ ncmpio_free_NC(NC *ncp)
*/
if (ncp->mpiinfo != MPI_INFO_NULL) MPI_Info_free(&ncp->mpiinfo);

if (ncp->get_list != NULL) NCI_Free(ncp->get_list);
if (ncp->put_list != NULL) NCI_Free(ncp->put_list);
if (ncp->abuf != NULL) NCI_Free(ncp->abuf);
if (ncp->path != NULL) NCI_Free(ncp->path);
if (ncp->get_list != NULL) NCI_Free(ncp->get_list);
if (ncp->put_list != NULL) NCI_Free(ncp->put_list);
if (ncp->abuf != NULL) NCI_Free(ncp->abuf);
if (ncp->path != NULL) NCI_Free(ncp->path);
if (ncp->nonaggr_ranks != NULL) NCI_Free(ncp->nonaggr_ranks);

NCI_Free(ncp);
}
Expand Down Expand Up @@ -144,17 +145,13 @@ ncmpio_close(void *ncdp)
}
#else
if (ncp->numLeadGetReqs > 0) {
int rank;
MPI_Comm_rank(ncp->comm, &rank);
printf("PnetCDF warning: %d nonblocking get requests still pending on process %d. Cancelling ...\n",ncp->numLeadGetReqs,rank);
printf("PnetCDF warning: %d nonblocking get requests still pending on process %d. Cancelling ...\n",ncp->numLeadGetReqs,ncp->rank);
err = ncmpio_cancel(ncp, NC_GET_REQ_ALL, NULL, NULL);
if (status == NC_NOERR) status = err;
if (status == NC_NOERR) status = NC_EPENDING;
}
if (ncp->numLeadPutReqs > 0) {
int rank;
MPI_Comm_rank(ncp->comm, &rank);
printf("PnetCDF warning: %d nonblocking put requests still pending on process %d. Cancelling ...\n",ncp->numLeadPutReqs,rank);
printf("PnetCDF warning: %d nonblocking put requests still pending on process %d. Cancelling ...\n",ncp->numLeadPutReqs,ncp->rank);
err = ncmpio_cancel(ncp, NC_PUT_REQ_ALL, NULL, NULL);
if (status == NC_NOERR) status = err;
if (status == NC_NOERR) status = NC_EPENDING;
Expand Down
Loading