From 275d223e9bccfde2106eda10d7050c878bddaad6 Mon Sep 17 00:00:00 2001 From: wkliao Date: Wed, 28 Aug 2024 16:45:53 -0500 Subject: [PATCH] Use MPI independent I/O when number of processes is 1 Check if the number of processes is 1. If this is the case, call only the MPI independent I/O functions. In addition, it avoids calls to MPI_Barrier, MPI_Bcast, and MPI_Allreduce. --- src/drivers/ncmpio/ncmpio_NC.h | 4 +- src/drivers/ncmpio/ncmpio_attr.m4 | 9 +-- src/drivers/ncmpio/ncmpio_close.c | 21 +++---- src/drivers/ncmpio/ncmpio_create.c | 4 +- src/drivers/ncmpio/ncmpio_dim.c | 4 +- src/drivers/ncmpio/ncmpio_enddef.c | 79 +++++++++++++++----------- src/drivers/ncmpio/ncmpio_file_io.c | 4 +- src/drivers/ncmpio/ncmpio_fill.c | 63 +++++++++++--------- src/drivers/ncmpio/ncmpio_getput.m4 | 28 ++++----- src/drivers/ncmpio/ncmpio_header_get.c | 17 ++++-- src/drivers/ncmpio/ncmpio_header_put.c | 20 ++++--- src/drivers/ncmpio/ncmpio_open.c | 4 +- src/drivers/ncmpio/ncmpio_sync.c | 25 +++++--- src/drivers/ncmpio/ncmpio_var.c | 6 +- src/drivers/ncmpio/ncmpio_vard.c | 22 +++---- src/drivers/ncmpio/ncmpio_wait.c | 36 ++++++------ 16 files changed, 199 insertions(+), 147 deletions(-) diff --git a/src/drivers/ncmpio/ncmpio_NC.h b/src/drivers/ncmpio/ncmpio_NC.h index 6f4d311ff..b5c1694b9 100644 --- a/src/drivers/ncmpio/ncmpio_NC.h +++ b/src/drivers/ncmpio/ncmpio_NC.h @@ -407,6 +407,8 @@ struct NC { MPI_Offset get_size; /* amount of reads committed so far in bytes */ MPI_Comm comm; /* MPI communicator */ + int rank; /* MPI rank of this process */ + int nprocs; /* number of MPI processes */ MPI_Info mpiinfo; /* used MPI info object */ MPI_File collective_fh; /* file handle for collective mode */ MPI_File independent_fh; /* file handle for independent mode */ @@ -474,7 +476,7 @@ typedef struct bufferinfo { int chunk; /* chunk size for reading the header */ int version; /* 1, 2, and 5 for CDF-1, 2, and 5 respectively */ int safe_mode;/* 0: disabled, 1: enabled */ - int rw_mode; /* 0: independent, 1: collective */ + int coll_mode;/* 0: independent, 1: collective */ char *base; /* beginning of read/write buffer */ char *pos; /* current position in buffer */ char *end; /* end position of buffer */ diff --git a/src/drivers/ncmpio/ncmpio_attr.m4 b/src/drivers/ncmpio/ncmpio_attr.m4 index 01af948f0..0d2c8ea2e 100644 --- a/src/drivers/ncmpio/ncmpio_attr.m4 +++ b/src/drivers/ncmpio/ncmpio_attr.m4 @@ -477,7 +477,7 @@ ncmpio_rename_att(void *ncdp, err_check: if (nname != NULL) NCI_Free(nname); - if (ncp->safe_mode) { + if (ncp->safe_mode && ncp->nprocs > 1) { int minE, mpireturn; /* check error code across processes */ @@ -597,7 +597,7 @@ ncmpio_copy_att(void *ncdp_in, } err_check: - if (ncp_out->safe_mode) { + if (ncp_out->safe_mode && ncp_out->nprocs > 1) { int minE, mpireturn; /* check the error code across processes */ @@ -710,7 +710,7 @@ ncmpio_del_att(void *ncdp, err_check: if (nname != NULL) NCI_Free(nname); - if (ncp->safe_mode) { + if (ncp->safe_mode && ncp->nprocs > 1) { int minE, mpireturn; /* find min error code across processes */ @@ -1044,7 +1044,8 @@ ncmpio_put_att(void *ncdp, } err_check: - if (ncp->safe_mode) { /* check the error code across processes */ + if (ncp->safe_mode && ncp->nprocs > 1) { + /* check the error code across processes */ int minE, mpireturn; TRACE_COMM(MPI_Allreduce)(&err, &minE, 1, MPI_INT, MPI_MIN, ncp->comm); diff --git a/src/drivers/ncmpio/ncmpio_close.c b/src/drivers/ncmpio/ncmpio_close.c index f82e6b8b2..d62b7c08c 100644 --- a/src/drivers/ncmpio/ncmpio_close.c +++ b/src/drivers/ncmpio/ncmpio_close.c @@ -69,7 +69,7 @@ ncmpio_close_files(NC *ncp, int doUnlink) { return ncmpii_error_mpi2nc(mpireturn, "MPI_File_close"); } - if (ncp->collective_fh != MPI_FILE_NULL) { + if (ncp->nprocs > 1 && ncp->collective_fh != MPI_FILE_NULL) { TRACE_IO(MPI_File_close)(&ncp->collective_fh); if (mpireturn != MPI_SUCCESS) return ncmpii_error_mpi2nc(mpireturn, "MPI_File_close"); @@ -78,9 +78,13 @@ ncmpio_close_files(NC *ncp, int doUnlink) { if (doUnlink) { /* called from ncmpi_abort, if the file is being created and is still * in define mode, the file is deleted */ - TRACE_IO(MPI_File_delete)((char *)ncp->path, ncp->mpiinfo); - if (mpireturn != MPI_SUCCESS) - return ncmpii_error_mpi2nc(mpireturn, "MPI_File_delete"); + if (ncp->rank == 0) { + TRACE_IO(MPI_File_delete)((char *)ncp->path, ncp->mpiinfo); + if (mpireturn != MPI_SUCCESS) + return ncmpii_error_mpi2nc(mpireturn, "MPI_File_delete"); + } + if (ncp->nprocs > 1) + MPI_Barrier(ncp->comm); } return NC_NOERR; } @@ -163,13 +167,10 @@ ncmpio_close(void *ncdp) /* file is open for write and no variable has been defined */ if (!NC_readonly(ncp) && ncp->vars.ndefined == 0) { - int rank; - /* wait until all processes close the file */ - MPI_Barrier(ncp->comm); + if (ncp->nprocs > 1) MPI_Barrier(ncp->comm); - MPI_Comm_rank(ncp->comm, &rank); - if (rank == 0) { + if (ncp->rank == 0) { /* ignore all errors, as unexpected file size if not a fatal error */ #ifdef HAVE_TRUNCATE /* when calling POSIX I/O, remove file type prefix from file name */ @@ -222,7 +223,7 @@ ncmpio_close(void *ncdp) } #endif } - MPI_Barrier(ncp->comm); + if (ncp->nprocs > 1) MPI_Barrier(ncp->comm); } /* free up space occupied by the header metadata */ diff --git a/src/drivers/ncmpio/ncmpio_create.c b/src/drivers/ncmpio/ncmpio_create.c index fec95559a..02375e97f 100644 --- a/src/drivers/ncmpio/ncmpio_create.c +++ b/src/drivers/ncmpio/ncmpio_create.c @@ -293,8 +293,10 @@ ncmpio_create(MPI_Comm comm, ncp->comm = comm; /* reuse comm duplicated in dispatch layer */ ncp->mpiinfo = info_used; /* is not MPI_INFO_NULL */ ncp->mpiomode = mpiomode; + ncp->rank = rank; + ncp->nprocs = nprocs; ncp->collective_fh = fh; - ncp->independent_fh = MPI_FILE_NULL; + ncp->independent_fh = (nprocs > 1) ? MPI_FILE_NULL : fh; ncp->path = (char*) NCI_Malloc(strlen(path) + 1); strcpy(ncp->path, path); diff --git a/src/drivers/ncmpio/ncmpio_dim.c b/src/drivers/ncmpio/ncmpio_dim.c index 3badcf75c..b2af61d48 100644 --- a/src/drivers/ncmpio/ncmpio_dim.c +++ b/src/drivers/ncmpio/ncmpio_dim.c @@ -346,10 +346,10 @@ ncmpio_rename_dim(void *ncdp, #endif err_check: - if (ncp->safe_mode) { + if (ncp->safe_mode && ncp->nprocs > 1) { + /* check the error so far across processes */ int status, mpireturn; - /* check the error so far across processes */ TRACE_COMM(MPI_Allreduce)(&err, &status, 1, MPI_INT, MPI_MIN,ncp->comm); if (mpireturn != MPI_SUCCESS) { NCI_Free(nnewname); diff --git a/src/drivers/ncmpio/ncmpio_enddef.c b/src/drivers/ncmpio/ncmpio_enddef.c index 1befa1491..1bab732ee 100644 --- a/src/drivers/ncmpio/ncmpio_enddef.c +++ b/src/drivers/ncmpio/ncmpio_enddef.c @@ -39,13 +39,16 @@ move_file_block(NC *ncp, MPI_Offset from, /* source file starting offset */ MPI_Offset nbytes) /* amount to be moved */ { - int rank, nprocs, bufcount, mpireturn, err, status=NC_NOERR, min_st; + int rank, bufcount, mpireturn, err, status=NC_NOERR, min_st; void *buf; size_t chunk_size; MPI_Status mpistatus; + MPI_File fh; - MPI_Comm_size(ncp->comm, &nprocs); - MPI_Comm_rank(ncp->comm, &rank); + rank = ncp->rank; + + /* moving file blocks must be done in collective mode, ignoring NC_HCOLL */ + fh = ncp->collective_fh; /* Divide amount nbytes among all processes. If the divided amount, * chunk_size, is larger then MOVE_UNIT, set chunk_size to be the move unit @@ -53,8 +56,8 @@ move_file_block(NC *ncp, * use 4-byte int in their count argument.) */ #define MOVE_UNIT 67108864 - chunk_size = nbytes / nprocs; - if (nbytes % nprocs) chunk_size++; + chunk_size = nbytes / ncp->nprocs; + if (nbytes % ncp->nprocs) chunk_size++; if (chunk_size > MOVE_UNIT) { /* move data in multiple rounds, MOVE_UNIT per process at a time */ chunk_size = MOVE_UNIT; @@ -66,8 +69,8 @@ move_file_block(NC *ncp, if (buf == NULL) DEBUG_RETURN_ERROR(NC_ENOMEM) /* make fileview entire file visible */ - TRACE_IO(MPI_File_set_view)(ncp->collective_fh, 0, MPI_BYTE, MPI_BYTE, - "native", MPI_INFO_NULL); + TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, MPI_BYTE, "native", + MPI_INFO_NULL); /* move the variable starting from its tail toward its beginning */ while (nbytes > 0) { @@ -77,7 +80,7 @@ move_file_block(NC *ncp, * checked, must be < NC_MAX_INT */ bufcount = (int)chunk_size; - if (nbytes < (MPI_Offset)nprocs * chunk_size) { + if (nbytes < (MPI_Offset)ncp->nprocs * chunk_size) { /* handle the last group of chunks */ MPI_Offset rem_chunks = nbytes / chunk_size; if (rank > rem_chunks) /* these processes do not read/write */ @@ -88,7 +91,7 @@ move_file_block(NC *ncp, nbytes = 0; } else { - nbytes -= chunk_size*nprocs; + nbytes -= chunk_size*ncp->nprocs; } /* explicitly initialize mpistatus object to 0. For zero-length read, @@ -99,8 +102,7 @@ move_file_block(NC *ncp, memset(&mpistatus, 0, sizeof(MPI_Status)); /* read the original data @ from+nbytes+rank*chunk_size */ - TRACE_IO(MPI_File_read_at_all)(ncp->collective_fh, - from+nbytes+rank*chunk_size, + TRACE_IO(MPI_File_read_at_all)(fh, from+nbytes+rank*chunk_size, buf, bufcount, MPI_BYTE, &mpistatus); if (mpireturn != MPI_SUCCESS) { err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_at_all"); @@ -128,11 +130,13 @@ move_file_block(NC *ncp, ncp->get_size += get_size; } - /* MPI_Barrier(ncp->comm); */ - /* important, in case new region overlaps old region */ - TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, - ncp->comm); - status = min_st; + if (ncp->nprocs > 1) { + /* MPI_Barrier(ncp->comm); */ + /* important, in case new region overlaps old region */ + TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, + ncp->comm); + status = min_st; + } if (status != NC_NOERR) break; /* write to new location @ to+nbytes+rank*chunk_size @@ -159,8 +163,7 @@ move_file_block(NC *ncp, */ memset(&mpistatus, 0, sizeof(MPI_Status)); - TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh, - to+nbytes+rank*chunk_size, + TRACE_IO(MPI_File_write_at_all)(fh, to+nbytes+rank*chunk_size, buf, get_size /* NOT bufcount */, MPI_BYTE, &mpistatus); if (mpireturn != MPI_SUCCESS) { @@ -180,8 +183,10 @@ move_file_block(NC *ncp, else ncp->put_size += put_size; } - TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, ncp->comm); - status = min_st; + if (ncp->nprocs > 1) { + TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, ncp->comm); + status = min_st; + } if (status != NC_NOERR) break; } NCI_Free(buf); @@ -269,8 +274,10 @@ NC_begins(NC *ncp) MPI_Comm_rank(ncp->comm, &rank); ncp->xsz = ncmpio_hdr_len_NC(ncp); - if (ncp->safe_mode) { /* this consistency check is redundant as metadata is - kept consistent at all time when safe mode is on */ + if (ncp->safe_mode && ncp->nprocs > 1) { + /* this consistency check is redundant as metadata is kept consistent + * at all time when safe mode is on + */ int err, status; MPI_Offset root_xsz = ncp->xsz; @@ -477,14 +484,22 @@ NC_begins(NC *ncp) static int write_NC(NC *ncp) { - int status=NC_NOERR, mpireturn, err, rank; + int status=NC_NOERR, mpireturn, err, rank, is_coll; MPI_Offset i, header_wlen, ntimes; + MPI_File fh; MPI_Status mpistatus; assert(!NC_readonly(ncp)); MPI_Comm_rank(ncp->comm, &rank); + /* Depending on whether NC_HCOLL is set, writing file header can be done + * through either MPI collective or independent write call. + * When * ncp->nprocs == 1, ncp->collective_fh == ncp->independent_fh + */ + is_coll = (ncp->nprocs > 1 && fIsSet(ncp->flags, NC_HCOLL)) ? 1 : 0; + fh = ncp->collective_fh; + /* In NC_begins(), root's ncp->xsz and ncp->begin_var, root's header * size and extent, have been broadcast (sync-ed) among processes. */ @@ -554,12 +569,12 @@ write_NC(NC *ncp) buf_ptr = buf; for (i=0; iflags, NC_HCOLL)) - TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh, offset, buf_ptr, - bufCount, MPI_BYTE, &mpistatus); + if (is_coll) + TRACE_IO(MPI_File_write_at_all)(fh, offset, buf_ptr, bufCount, + MPI_BYTE, &mpistatus); else - TRACE_IO(MPI_File_write_at)(ncp->collective_fh, offset, buf_ptr, - bufCount, MPI_BYTE, &mpistatus); + TRACE_IO(MPI_File_write_at)(fh, offset, buf_ptr, bufCount, + MPI_BYTE, &mpistatus); if (mpireturn != MPI_SUCCESS) { err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_at"); /* write has failed, which is more serious than inconsistency */ @@ -586,12 +601,12 @@ write_NC(NC *ncp) else if (fIsSet(ncp->flags, NC_HCOLL)) { /* other processes participate the collective call */ for (i=0; icollective_fh, 0, NULL, - 0, MPI_BYTE, &mpistatus); + TRACE_IO(MPI_File_write_at_all)(fh, 0, NULL, 0, MPI_BYTE, + &mpistatus); } fn_exit: - if (ncp->safe_mode == 1) { + if (ncp->safe_mode == 1 && ncp->nprocs > 1) { /* broadcast root's status, because only root writes to the file */ int root_status = status; TRACE_COMM(MPI_Bcast)(&root_status, 1, MPI_INT, 0, ncp->comm); @@ -611,7 +626,7 @@ write_NC(NC *ncp) * do not get error and proceed to the next subroutine call. */ #define CHECK_ERROR(err) { \ - if (ncp->safe_mode == 1) { \ + if (ncp->safe_mode == 1 && ncp->nprocs > 1) { \ int status; \ TRACE_COMM(MPI_Allreduce)(&err, &status, 1, MPI_INT, MPI_MIN, \ ncp->comm); \ diff --git a/src/drivers/ncmpio/ncmpio_file_io.c b/src/drivers/ncmpio/ncmpio_file_io.c index 8227e624b..4b4a30632 100644 --- a/src/drivers/ncmpio/ncmpio_file_io.c +++ b/src/drivers/ncmpio/ncmpio_file_io.c @@ -128,7 +128,7 @@ ncmpio_read_write(NC *ncp, xbuf = NCI_Malloc((size_t)req_size); } - if (coll_indep == NC_REQ_COLL) { + if (ncp->nprocs > 1 && coll_indep == NC_REQ_COLL) { TRACE_IO(MPI_File_read_at_all)(fh, offset, xbuf, xlen, xbuf_type, &mpistatus); if (mpireturn != MPI_SUCCESS) { @@ -274,7 +274,7 @@ ncmpio_read_write(NC *ncp, } } - if (coll_indep == NC_REQ_COLL) { + if (ncp->nprocs > 1 && coll_indep == NC_REQ_COLL) { TRACE_IO(MPI_File_write_at_all)(fh, offset, xbuf, xlen, xbuf_type, &mpistatus); if (mpireturn != MPI_SUCCESS) { diff --git a/src/drivers/ncmpio/ncmpio_fill.c b/src/drivers/ncmpio/ncmpio_fill.c index a5e605d23..f43151de9 100644 --- a/src/drivers/ncmpio/ncmpio_fill.c +++ b/src/drivers/ncmpio/ncmpio_fill.c @@ -31,16 +31,6 @@ #include #include "ncmpio_NC.h" -#define CHECK_ERROR(status) { \ - if (ncp->safe_mode == 1) { \ - int g_status; \ - TRACE_COMM(MPI_Allreduce)(&status, &g_status, 1, MPI_INT, MPI_MIN, \ - ncp->comm); \ - if (g_status != NC_NOERR) return g_status; \ - } \ - else if (status != NC_NOERR) \ - return status; \ -} /* The default fill values defined in pnetcdf.h.inc must be the same as the * ones defined in netCDF-4 and match the hexadecimal values set below @@ -201,6 +191,7 @@ fill_var_rec(NC *ncp, offset += ncp->recsize * recno; offset += start * varp->xsz; + /* when nprocs == 1, we keep I/O mode in independent mode at all time */ fh = ncp->collective_fh; /* make the entire file visible */ @@ -231,7 +222,11 @@ fill_var_rec(NC *ncp, } /* write to variable collectively */ - TRACE_IO(MPI_File_write_at_all)(fh, offset, buf, (int)count, bufType, + if (ncp->nprocs > 1) + TRACE_IO(MPI_File_write_at_all)(fh, offset, buf, (int)count, bufType, + &mpistatus); + else + TRACE_IO(MPI_File_write_at)(fh, offset, buf, (int)count, bufType, &mpistatus); NCI_Free(buf); if (bufType != MPI_BYTE) MPI_Type_free(&bufType); @@ -249,12 +244,14 @@ fill_var_rec(NC *ncp, * * First, find the max numrecs among all processes. */ - MPI_Offset max_numrecs, numrecs=recno+1; - TRACE_COMM(MPI_Allreduce)(&numrecs, &max_numrecs, 1, MPI_OFFSET, - MPI_MAX, ncp->comm); - if (mpireturn != MPI_SUCCESS) { - err = ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); - if (status == NC_NOERR) status = err; + MPI_Offset max_numrecs=recno+1; + if (ncp->nprocs > 1) { + TRACE_COMM(MPI_Allreduce)(MPI_IN_PLACE, &max_numrecs, 1, MPI_OFFSET, + MPI_MAX, ncp->comm); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); + if (status == NC_NOERR) status = err; + } } /* In collective mode, ncp->numrecs is always sync-ed among processes */ @@ -400,14 +397,20 @@ fillerup_aggregate(NC *ncp, NC *old_ncp) * variables' fill modes and overwrite local's if an inconsistency is found * Note ncp->vars.ndefined is already made consistent by this point. */ - for (i=start_vid; ivars.ndefined; i++) - noFill[i-start_vid] = (char)(ncp->vars.value[i]->no_fill); + if (ncp->nprocs > 1) { + for (i=start_vid; ivars.ndefined; i++) + noFill[i-start_vid] = (char)(ncp->vars.value[i]->no_fill); + TRACE_COMM(MPI_Bcast)(noFill, (ncp->vars.ndefined - start_vid), MPI_BYTE, 0, ncp->comm); - for (i=start_vid; ivars.ndefined; i++) { - /* overwrite local's mode */ - ncp->vars.value[i]->no_fill = noFill[i-start_vid]; - if (!noFill[i-start_vid]) nVarsFill++; + if (mpireturn != MPI_SUCCESS) + return ncmpii_error_mpi2nc(mpireturn, "MPI_Bcast"); + + for (i=start_vid; ivars.ndefined; i++) { + /* overwrite local's mode */ + ncp->vars.value[i]->no_fill = noFill[i-start_vid]; + if (!noFill[i-start_vid]) nVarsFill++; + } } #else for (i=start_vid; ivars.ndefined; i++) { @@ -620,6 +623,7 @@ fillerup_aggregate(NC *ncp, NC *old_ncp) NCI_Free(count); NCI_Free(offset); + /* when nprocs == 1, we keep I/O mode in independent mode at all time */ fh = ncp->collective_fh; TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, filetype, "native", @@ -650,7 +654,10 @@ fillerup_aggregate(NC *ncp, NC *old_ncp) } /* write to variable collectively */ - TRACE_IO(MPI_File_write_at_all)(fh, 0, buf, (int)buf_len, bufType, &mpistatus); + if (ncp->nprocs > 1) + TRACE_IO(MPI_File_write_at_all)(fh, 0, buf, (int)buf_len, bufType, &mpistatus); + else + TRACE_IO(MPI_File_write_at)(fh, 0, buf, (int)buf_len, bufType, &mpistatus); NCI_Free(buf); if (bufType != MPI_BYTE) MPI_Type_free(&bufType); @@ -728,7 +735,7 @@ ncmpio_fill_var_rec(void *ncdp, } err_check: - if (ncp->safe_mode) { /* consistency check */ + if (ncp->safe_mode && ncp->nprocs > 1) { /* consistency check */ int root_varid, status, mpireturn; MPI_Offset root_recno; @@ -776,12 +783,13 @@ ncmpio_set_fill(void *ncdp, int i, mpireturn, oldmode; NC *ncp = (NC*)ncdp; - if (ncp->safe_mode) { + if (ncp->safe_mode && ncp->nprocs > 1) { int err, status, root_fill_mode=fill_mode; TRACE_COMM(MPI_Bcast)(&root_fill_mode, 1, MPI_INT, 0, ncp->comm); if (mpireturn != MPI_SUCCESS) return ncmpii_error_mpi2nc(mpireturn, "MPI_Bcast"); + if (fill_mode != root_fill_mode) /* dataset's fill mode is inconsistent with root's */ DEBUG_ASSIGN_ERROR(err, NC_EMULTIDEFINE_FILL_MODE) @@ -792,6 +800,7 @@ ncmpio_set_fill(void *ncdp, TRACE_COMM(MPI_Allreduce)(&err, &status, 1, MPI_INT, MPI_MIN, ncp->comm); if (mpireturn != MPI_SUCCESS) return ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); + if (err == NC_NOERR) err = status; if (err != NC_NOERR) return err; } @@ -833,7 +842,7 @@ ncmpio_def_var_fill(void *ncdp, /* sanity check for ncdp and varid has been done in dispatchers */ varp = ncp->vars.value[varid]; - if (ncp->safe_mode) { + if (ncp->safe_mode && ncp->nprocs > 1) { int root_ids[3], my_fill_null, minE, mpireturn; /* check if varid, no_fill, fill_value, are consistent */ diff --git a/src/drivers/ncmpio/ncmpio_getput.m4 b/src/drivers/ncmpio/ncmpio_getput.m4 index 98b943f67..e0a083c61 100644 --- a/src/drivers/ncmpio/ncmpio_getput.m4 +++ b/src/drivers/ncmpio/ncmpio_getput.m4 @@ -266,12 +266,11 @@ err_check: * have to process this one record at a time. */ - if (fIsSet(reqMode, NC_REQ_COLL)) { + fh = ncp->independent_fh; + coll_indep = NC_REQ_INDEP; + if (ncp->nprocs > 1 && fIsSet(reqMode, NC_REQ_COLL)) { fh = ncp->collective_fh; coll_indep = NC_REQ_COLL; - } else { - fh = ncp->independent_fh; - coll_indep = NC_REQ_INDEP; } /* MPI_File_set_view is collective */ @@ -317,12 +316,14 @@ err_check: * different among processes. First, find the max numrecs among * all processes. */ - MPI_Offset max_numrecs; - TRACE_COMM(MPI_Allreduce)(&new_numrecs, &max_numrecs, 1, - MPI_OFFSET, MPI_MAX, ncp->comm); - if (mpireturn != MPI_SUCCESS) { - err = ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); - if (status == NC_NOERR) status = err; + MPI_Offset max_numrecs = new_numrecs; + if (ncp->nprocs > 1) { + TRACE_COMM(MPI_Allreduce)(&new_numrecs, &max_numrecs, 1, + MPI_OFFSET, MPI_MAX, ncp->comm); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); + if (status == NC_NOERR) status = err; + } } /* In collective mode, ncp->numrecs is always sync-ed among processes */ @@ -493,12 +494,11 @@ err_check: * have to process this one record at a time. */ - if (fIsSet(reqMode, NC_REQ_COLL)) { + fh = ncp->independent_fh; + coll_indep = NC_REQ_INDEP; + if (ncp->nprocs > 1 && fIsSet(reqMode, NC_REQ_COLL)) { fh = ncp->collective_fh; coll_indep = NC_REQ_COLL; - } else { - fh = ncp->independent_fh; - coll_indep = NC_REQ_INDEP; } /* MPI_File_set_view is collective */ diff --git a/src/drivers/ncmpio/ncmpio_header_get.c b/src/drivers/ncmpio/ncmpio_header_get.c index ebf8f6651..68846a69d 100644 --- a/src/drivers/ncmpio/ncmpio_header_get.c +++ b/src/drivers/ncmpio/ncmpio_header_get.c @@ -321,11 +321,12 @@ hdr_len_NC_vararray(const NC_vararray *ncap, */ static int hdr_fetch(bufferinfo *gbp) { - int rank, err=NC_NOERR, mpireturn; + int rank, nprocs, err=NC_NOERR, mpireturn; MPI_Status mpistatus; assert(gbp->base != NULL); + MPI_Comm_size(gbp->comm, &nprocs); MPI_Comm_rank(gbp->comm, &rank); if (rank == 0) { char *readBuf; @@ -358,7 +359,7 @@ hdr_fetch(bufferinfo *gbp) { /* fileview is already entire file visible and MPI_File_read_at does not change the file pointer */ - if (gbp->rw_mode == 1) /* collective read */ + if (gbp->coll_mode == 1) /* collective read */ TRACE_IO(MPI_File_read_at_all)(gbp->collective_fh, gbp->offset, readBuf, readLen, MPI_BYTE, &mpistatus); else @@ -391,19 +392,20 @@ hdr_fetch(bufferinfo *gbp) { * file pointer location */ gbp->offset += readLen; } - else if (gbp->rw_mode == 1) { /* collective read */ + else if (gbp->coll_mode == 1) { /* collective read */ /* other processes participate the collective call */ TRACE_IO(MPI_File_read_at_all)(gbp->collective_fh, 0, NULL, 0, MPI_BYTE, &mpistatus); } - if (gbp->safe_mode == 1) { + if (gbp->safe_mode == 1 && nprocs > 1) { TRACE_COMM(MPI_Bcast)(&err, 1, MPI_INT, 0, gbp->comm); if (err != NC_NOERR) return err; } /* broadcast root's read (full or partial header) to other processes */ - TRACE_COMM(MPI_Bcast)(gbp->base, gbp->chunk, MPI_BYTE, 0, gbp->comm); + if (nprocs > 1) + TRACE_COMM(MPI_Bcast)(gbp->base, gbp->chunk, MPI_BYTE, 0, gbp->comm); gbp->pos = gbp->base; @@ -1335,7 +1337,10 @@ ncmpio_hdr_get_NC(NC *ncp) getbuf.get_size = 0; getbuf.offset = 0; /* read from start of the file */ getbuf.safe_mode = ncp->safe_mode; - getbuf.rw_mode = (fIsSet(ncp->flags, NC_HCOLL)) ? 1 : 0; + if (ncp->nprocs > 1 && fIsSet(ncp->flags, NC_HCOLL)) + getbuf.coll_mode = 1; + else + getbuf.coll_mode = 0; /* CDF-5's minimum header size is 4 bytes more than CDF-1 and CDF-2's */ getbuf.chunk = _RNDUP( MAX(MIN_NC_XSZ+4, ncp->chunk), X_ALIGN ); diff --git a/src/drivers/ncmpio/ncmpio_header_put.c b/src/drivers/ncmpio/ncmpio_header_put.c index 8ba249202..4af8416ff 100644 --- a/src/drivers/ncmpio/ncmpio_header_put.c +++ b/src/drivers/ncmpio/ncmpio_header_put.c @@ -451,7 +451,10 @@ ncmpio_hdr_put_NC(NC *ncp, void *buf) putbuf.pos = buf; putbuf.base = buf; putbuf.safe_mode = ncp->safe_mode; - putbuf.rw_mode = (fIsSet(ncp->flags, NC_HCOLL)) ? 1 : 0; + if (ncp->nprocs > 1 && fIsSet(ncp->flags, NC_HCOLL)) + putbuf.coll_mode = 1; + else + putbuf.coll_mode = 0; /* netCDF file format: * netcdf_file = header data @@ -511,7 +514,7 @@ ncmpio_hdr_put_NC(NC *ncp, void *buf) */ int ncmpio_write_header(NC *ncp) { - int rank, status=NC_NOERR, mpireturn, err; + int status=NC_NOERR, mpireturn, err; size_t i, ntimes; MPI_File fh; MPI_Status mpistatus; @@ -523,7 +526,7 @@ int ncmpio_write_header(NC *ncp) */ fh = ncp->collective_fh; - if (NC_indep(ncp)) + if (NC_indep(ncp)) /* called in independent data mode */ fh = ncp->independent_fh; /* update file header size, as this subroutine may be called from a rename @@ -535,8 +538,7 @@ int ncmpio_write_header(NC *ncp) ntimes = ncp->xsz / NC_MAX_INT; if (ncp->xsz % NC_MAX_INT) ntimes++; - MPI_Comm_rank(ncp->comm, &rank); - if (rank == 0) { /* only root writes to file header */ + if (ncp->rank == 0) { /* only root writes to file header */ MPI_Offset offset; size_t remain; size_t bufLen = _RNDUP(ncp->xsz, X_ALIGN); @@ -560,10 +562,10 @@ int ncmpio_write_header(NC *ncp) */ memset(&mpistatus, 0, sizeof(MPI_Status)); - if (fIsSet(ncp->flags, NC_HCOLL)) /* collective write */ + if (fIsSet(ncp->flags, NC_HCOLL)) /* header collective write */ TRACE_IO(MPI_File_write_at_all)(fh, offset, buf_ptr, writeLen, MPI_BYTE, &mpistatus); - else + else /* header independent write */ TRACE_IO(MPI_File_write_at)(fh, offset, buf_ptr, writeLen, MPI_BYTE, &mpistatus); @@ -593,8 +595,8 @@ int ncmpio_write_header(NC *ncp) } NCI_Free(buf); } - else if (fIsSet(ncp->flags, NC_HCOLL)) { /* collective write */ - /* other processes participate the collective call */ + else if (fIsSet(ncp->flags, NC_HCOLL)) { /* header collective write */ + /* collective write: other processes participate the collective call */ for (i=0; iiomode = omode; ncp->comm = comm; /* reuse comm duplicated in dispatch layer */ + MPI_Comm_rank(comm, &ncp->rank); + MPI_Comm_size(comm, &ncp->nprocs); ncp->mpiinfo = info_used; /* is not MPI_INFO_NULL */ ncp->mpiomode = mpiomode; ncp->collective_fh = fh; - ncp->independent_fh = MPI_FILE_NULL; + ncp->independent_fh = (ncp->nprocs > 1) ? MPI_FILE_NULL : fh; ncp->path = (char*) NCI_Malloc(strlen(path) + 1); strcpy(ncp->path, path); diff --git a/src/drivers/ncmpio/ncmpio_sync.c b/src/drivers/ncmpio/ncmpio_sync.c index c6fe19464..0090fd6bc 100644 --- a/src/drivers/ncmpio/ncmpio_sync.c +++ b/src/drivers/ncmpio/ncmpio_sync.c @@ -40,6 +40,8 @@ ncmpio_file_sync(NC *ncp) { if (mpireturn != MPI_SUCCESS) return ncmpii_error_mpi2nc(mpireturn, "MPI_File_sync"); } + /* when nprocs == 1, ncp->collective_fh == ncp->independent_fh */ + if (ncp->nprocs == 1) return NC_NOERR; /* ncp->collective_fh is never MPI_FILE_NULL as collective mode is * default in PnetCDF */ @@ -47,7 +49,9 @@ ncmpio_file_sync(NC *ncp) { if (mpireturn != MPI_SUCCESS) return ncmpii_error_mpi2nc(mpireturn, "MPI_File_sync"); + /* Barrier is not necessary ... TRACE_COMM(MPI_Barrier)(ncp->comm); + */ return NC_NOERR; } @@ -76,9 +80,9 @@ ncmpio_write_numrecs(NC *ncp, /* return now if there is no record variabled defined */ if (ncp->vars.num_rec_vars == 0) return NC_NOERR; - fh = ncp->collective_fh; - if (NC_indep(ncp)) - fh = ncp->independent_fh; + fh = ncp->independent_fh; + if (ncp->nprocs > 1 && !NC_indep(ncp)) + fh = ncp->collective_fh; if (rank > 0 && fIsSet(ncp->flags, NC_HCOLL)) { /* other processes participate the collective call */ @@ -115,7 +119,7 @@ ncmpio_write_numrecs(NC *ncp, memset(&mpistatus, 0, sizeof(MPI_Status)); /* root's file view always includes the entire file header */ - if (fIsSet(ncp->flags, NC_HCOLL)) + if (fIsSet(ncp->flags, NC_HCOLL) && ncp->nprocs > 1) TRACE_IO(MPI_File_write_at_all)(fh, NC_NUMRECS_OFFSET, (void*)pos, len, MPI_BYTE, &mpistatus); else @@ -183,15 +187,18 @@ ncmpio_sync_numrecs(void *ncdp) /* find the max numrecs among all processes * Note max numrecs may be smaller than some process's ncp->numrecs */ - TRACE_COMM(MPI_Allreduce)(&ncp->numrecs, &max_numrecs, 1, MPI_OFFSET, - MPI_MAX, ncp->comm); - if (mpireturn != MPI_SUCCESS) - return ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); + max_numrecs = ncp->numrecs; + if (ncp->nprocs > 1) { + TRACE_COMM(MPI_Allreduce)(&ncp->numrecs, &max_numrecs, 1, MPI_OFFSET, + MPI_MAX, ncp->comm); + if (mpireturn != MPI_SUCCESS) + return ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); + } /* root process writes max_numrecs to file */ status = ncmpio_write_numrecs(ncp, max_numrecs); - if (ncp->safe_mode == 1) { + if (ncp->nprocs > 1 && ncp->safe_mode == 1) { /* broadcast root's status, because only root writes to the file */ int root_status = status; TRACE_COMM(MPI_Bcast)(&root_status, 1, MPI_INT, 0, ncp->comm); diff --git a/src/drivers/ncmpio/ncmpio_var.c b/src/drivers/ncmpio/ncmpio_var.c index 74e37f330..7f88c9a3c 100644 --- a/src/drivers/ncmpio/ncmpio_var.c +++ b/src/drivers/ncmpio/ncmpio_var.c @@ -415,10 +415,10 @@ ncmpio_def_var(void *ncdp, ncp->vars.ndefined++; err_check: - if (ncp->safe_mode) { + if (ncp->safe_mode && ncp->nprocs > 1) { int minE, mpireturn; - /* first check the error code across processes */ + /* First check the error code across processes */ TRACE_COMM(MPI_Allreduce)(&err, &minE, 1, MPI_INT, MPI_MIN, ncp->comm); if (mpireturn != MPI_SUCCESS) { if (nname != NULL) NCI_Free(nname); @@ -600,7 +600,7 @@ ncmpio_rename_var(void *ncdp, #endif err_check: - if (ncp->safe_mode) { + if (ncp->safe_mode && ncp->nprocs > 1) { int minE, mpireturn; /* First check error code so far across processes */ diff --git a/src/drivers/ncmpio/ncmpio_vard.c b/src/drivers/ncmpio/ncmpio_vard.c index e65d63df8..e35c2050c 100644 --- a/src/drivers/ncmpio/ncmpio_vard.c +++ b/src/drivers/ncmpio/ncmpio_vard.c @@ -296,12 +296,12 @@ getput_vard(NC *ncp, } status = err; - if (fIsSet(reqMode, NC_REQ_COLL)) { + /* when ncp->nprocs == 1, ncp->collective_fh == ncp->independent_fh */ + fh = ncp->independent_fh; + coll_indep = NC_REQ_INDEP; + if (ncp->nprocs > 1 && fIsSet(reqMode, NC_REQ_COLL)) { fh = ncp->collective_fh; coll_indep = NC_REQ_COLL; - } else { - fh = ncp->independent_fh; - coll_indep = NC_REQ_INDEP; } /* set the MPI-IO fileview, this is a collective call */ @@ -357,12 +357,14 @@ getput_vard(NC *ncp, /* new_numrecs may be different among processes. * First, find the max numrecs among all processes. */ - MPI_Offset max_numrecs; - TRACE_COMM(MPI_Allreduce)(&new_numrecs, &max_numrecs, 1, - MPI_OFFSET, MPI_MAX, ncp->comm); - if (mpireturn != MPI_SUCCESS) { - err = ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); - if (status == NC_NOERR) status = err; + MPI_Offset max_numrecs = new_numrecs; + if (ncp->nprocs > 1) { + TRACE_COMM(MPI_Allreduce)(&new_numrecs, &max_numrecs, 1, + MPI_OFFSET, MPI_MAX, ncp->comm); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); + if (status == NC_NOERR) status = err; + } } /* In collective mode, ncp->numrecs is always sync-ed among processes */ diff --git a/src/drivers/ncmpio/ncmpio_wait.c b/src/drivers/ncmpio/ncmpio_wait.c index 676e7ee51..bd5055c1a 100644 --- a/src/drivers/ncmpio/ncmpio_wait.c +++ b/src/drivers/ncmpio/ncmpio_wait.c @@ -65,14 +65,20 @@ ncmpio_getput_zero_req(NC *ncp, int reqMode) MPI_INFO_NULL); if (fIsSet(reqMode, NC_REQ_RD)) { - TRACE_IO(MPI_File_read_all)(fh, NULL, 0, MPI_BYTE, &mpistatus); + if (ncp->nprocs > 1) + TRACE_IO(MPI_File_read_all)(fh, NULL, 0, MPI_BYTE, &mpistatus); + else + TRACE_IO(MPI_File_read)(fh, NULL, 0, MPI_BYTE, &mpistatus); if (mpireturn != MPI_SUCCESS) { err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_all"); err = (err == NC_EFILE) ? NC_EREAD : err; DEBUG_ASSIGN_ERROR(status, err) } } else { /* write request */ - TRACE_IO(MPI_File_write_all)(fh, NULL, 0, MPI_BYTE, &mpistatus); + if (ncp->nprocs > 1) + TRACE_IO(MPI_File_write_all)(fh, NULL, 0, MPI_BYTE, &mpistatus); + else + TRACE_IO(MPI_File_write)(fh, NULL, 0, MPI_BYTE, &mpistatus); if (mpireturn != MPI_SUCCESS) { err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_all"); err = (err == NC_EFILE) ? NC_EWRITE : err; @@ -950,15 +956,15 @@ req_commit(NC *ncp, } /* synchronize request metadata across processes if collective I/O */ - if (coll_indep == NC_REQ_COLL) { + if (coll_indep == NC_REQ_COLL && ncp->nprocs > 1) { int mpireturn; - MPI_Offset io_req[4], do_io[4]; /* [0]: read [1]: write [2]: error */ - io_req[0] = num_r_reqs; - io_req[1] = num_w_reqs; - io_req[2] = -err; /* all NC errors are negative */ - io_req[3] = newnumrecs; - TRACE_COMM(MPI_Allreduce)(io_req, do_io, 4, MPI_OFFSET, MPI_MAX, - ncp->comm); + MPI_Offset do_io[4]; /* [0]: read [1]: write [2]: error */ + do_io[0] = num_r_reqs; + do_io[1] = num_w_reqs; + do_io[2] = -err; /* all NC errors are negative */ + do_io[3] = newnumrecs; + TRACE_COMM(MPI_Allreduce)(MPI_IN_PLACE, do_io, 4, MPI_OFFSET, + MPI_MAX, ncp->comm); if (mpireturn != MPI_SUCCESS) return ncmpii_error_mpi2nc(mpireturn, "MPI_Allreduce"); @@ -2039,10 +2045,9 @@ req_aggregation(NC *ncp, } NCI_Free(reqs); - if (coll_indep == NC_REQ_COLL) + fh = ncp->independent_fh; + if (ncp->nprocs > 1 && coll_indep == NC_REQ_COLL) fh = ncp->collective_fh; - else - fh = ncp->independent_fh; /* set the MPI-IO fileview, this is a collective call */ offset = 0; @@ -2461,10 +2466,9 @@ mgetput(NC *ncp, mpi_io: NCI_Free(reqs); - if (coll_indep == NC_REQ_COLL) + fh = ncp->independent_fh; + if (ncp->nprocs > 1 && coll_indep == NC_REQ_COLL) fh = ncp->collective_fh; - else - fh = ncp->independent_fh; /* set the MPI-IO fileview, this is a collective call */ err = ncmpio_file_set_view(ncp, fh, &offset, filetype);