diff --git a/runtime/include/chpl-comm.h b/runtime/include/chpl-comm.h
index 9fd6d2432bee..332f361b26e6 100644
--- a/runtime/include/chpl-comm.h
+++ b/runtime/include/chpl-comm.h
@@ -155,6 +155,9 @@ void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles);
 // detected.
 int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles);
 
+// Free a handle returned by chpl_comm_*_nb.
+void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h);
+
 // Returns whether or not the passed wide address is known to be in
 // a communicable memory region and known to be readable. That is,
 // GET to that address should succeed without an access violation
diff --git a/runtime/include/chpl-mem-desc.h b/runtime/include/chpl-mem-desc.h
index 4855aef27d59..f9e65ea46892 100644
--- a/runtime/include/chpl-mem-desc.h
+++ b/runtime/include/chpl-mem-desc.h
@@ -67,6 +67,7 @@ extern "C" {
   m(COMM_PER_LOC_INFO,   "comm layer per-locale information",  false), \
   m(COMM_PRV_OBJ_ARRAY,  "comm layer private objects array",   false), \
   m(COMM_PRV_BCAST_DATA, "comm layer private broadcast data",  false), \
+  m(COMM_NB_HANDLE,      "comm layer non-blocking handle",     false), \
   m(MEM_HEAP_SPACE,      "mem layer heap expansion space",     false), \
   m(GLOM_STRINGS_DATA,   "glom strings data",                  true ), \
   m(STRING_LITERALS_BUF, "string literals buffer",             true ), \
diff --git a/runtime/src/chpl-cache.c b/runtime/src/chpl-cache.c
index 11acf41d4717..4ff6008409a3 100644
--- a/runtime/src/chpl-cache.c
+++ b/runtime/src/chpl-cache.c
@@ -1967,6 +1967,7 @@ chpl_bool do_wait_for(struct rdcache_s* cache, cache_seqn_t sn)
     // Whether we waited above or not, if the first entry's event
     // is already complete, then remove it from the queue.
     if (chpl_comm_test_nb_complete(cache->pending[index])) {
+      chpl_comm_free_nb_handle(cache->pending[index]);
       fifo_circleb_pop(&cache->pending_first_entry,
                        &cache->pending_last_entry,
                        cache->pending_len);
diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c
index c6170ee00565..594e3e7b36c2 100644
--- a/runtime/src/comm/ofi/comm-ofi.c
+++ b/runtime/src/comm/ofi/comm-ofi.c
@@ -74,6 +74,7 @@
 #include 
 #include 
+#include 
 
 #ifndef MAP_HUGETLB
 // MAP_HUGETLB is not defined on all systems (e.g. MacOS)
@@ -186,8 +187,8 @@ static chpl_bool envInjectAM;             // env: inject AM messages
 static chpl_bool envUseDedicatedAmhCores; // env: use dedicated AM cores
 static const char* envExpectedProvider;   // env: provider we should select
 
-static int numTxCtxs;
-static int numRxCtxs;
+static size_t numTxCtxs;
+static size_t numRxCtxs;
 
 struct perTxCtxInfo_t {
   chpl_atomic_bool allocated;  // true: in use; false: available
@@ -359,18 +360,45 @@ static const char* mcmModeNames[] = { "undefined",
 
 static bool cxiHybridMRMode = false;
 
+
+// OFI-specific non-blocking handle implementation
+
+// This is defined here because it is used in the forward declarations below.
+// The routines to initialize and destroy handles, nb_handle_init and
+// nb_handle_destroy, appear in the RMA section later. The "id" is used to
+// verify that only the task that created the handle uses it -- this
+// prevents multiple threads from simultaneously accessing the same transmit
+// context if they are not bound to threads. The semantics of
+// chpl_comm_test_nb_complete, chpl_comm_wait_nb_some, and
+// chpl_comm_try_nb_some require distinguishing newly-completed handles from
+// those that have previously completed. The "reported" field is used to
+// distinguish between the two. The "complete" field is set when the operation completes.
+// It is an atomic because the lower-level functions that set it require it.
+// Operations that are too large for the underlying fabric are represented by
+// a linked-list of handles.
+
+typedef struct nb_handle {
+  chpl_taskID_t id;          // task that created the handle
+  chpl_bool reported;        // operation has been reported as complete
+  chpl_atomic_bool complete; // operation has completed
+  struct nb_handle *next;
+} nb_handle;
+
+typedef nb_handle* nb_handle_t;
+
 ////////////////////////////////////////
 //
 // Forward decls
 //
 
-static struct perTxCtxInfo_t* tciAlloc(void);
+static struct perTxCtxInfo_t* tciAllocFunc(const char *, int);
+#define tciAlloc() tciAllocFunc(__FILE__, __LINE__)
+
 static struct perTxCtxInfo_t* tciAllocForAmHandler(void);
 static chpl_bool tciAllocTabEntry(struct perTxCtxInfo_t*);
 static void tciFree(struct perTxCtxInfo_t*);
 static void waitForCQSpace(struct perTxCtxInfo_t*, size_t);
-static chpl_comm_nb_handle_t ofi_put(const void*, c_nodeid_t, void*, size_t,
-                                     chpl_bool);
+static void ofi_put(const void*, c_nodeid_t, void*, size_t);
+static nb_handle_t ofi_put_nb(nb_handle_t, const void*, c_nodeid_t, void*,
+                              size_t);
 static void ofi_put_lowLevel(const void*, void*, c_nodeid_t, uint64_t,
                              uint64_t, size_t, void*, uint64_t,
                              struct perTxCtxInfo_t*);
@@ -379,6 +407,8 @@ static chpl_comm_nb_handle_t ofi_get(void*, c_nodeid_t, void*, size_t);
 static void ofi_get_lowLevel(void*, void*, c_nodeid_t, uint64_t,
                              uint64_t, size_t, void*, uint64_t,
                              struct perTxCtxInfo_t*);
+static chpl_bool check_complete(nb_handle_t*, size_t, chpl_bool);
+
 static void do_remote_get_buff(void*, c_nodeid_t, void*, size_t);
 static void do_remote_amo_nf_buff(void*, c_nodeid_t, void*, size_t,
                                   enum fi_op, enum fi_datatype);
@@ -1214,15 +1244,15 @@ void init_ofi(void) {
              (tciTab[tciTabLen - 1].txCntr == NULL) ? "CQ" : "counter");
   if (ofi_txEpScal != NULL) {
     DBG_PRINTF(DBG_CFG,
-               "per node config: 1 scalable tx ep + %d tx ctx%s (%d bound), "
-               "%d rx ctx%s",
+               "per node config: 1 scalable tx ep + %zu tx ctx%s (%d bound), "
+               "%zu rx ctx%s",
                numTxCtxs, (numTxCtxs == 1) ? "" : "s",
                tciTabBindTxCtxs ? chpl_task_getFixedNumThreads() : 0,
                numRxCtxs, (numRxCtxs == 1) ? "" : "s");
   } else {
     DBG_PRINTF(DBG_CFG,
-               "per node config: %d regular tx ep+ctx%s (%d bound), "
-               "%d rx ctx%s",
+               "per node config: %zu regular tx ep+ctx%s (%d bound), "
+               "%zu rx ctx%s",
               numTxCtxs, (numTxCtxs == 1) ? "" : "s",
               tciTabBindTxCtxs ? chpl_task_getFixedNumThreads() : 0,
               numRxCtxs, (numRxCtxs == 1) ? "" : "s");
@@ -1573,6 +1603,13 @@ struct fi_info* findProvInList(struct fi_info* info,
   if (best && (isInProvider("efa", best))) {
     best->tx_attr->inject_size = 0;
   }
+
+  // Set the maximum message size if specified
+
+  if (best != NULL) {
+    best->ep_attr->max_msg_size =
+      chpl_env_rt_get_int("COMM_OFI_MAX_MSG_SIZE",
+                          best->ep_attr->max_msg_size);
+  }
+
   return (best == NULL) ? NULL : fi_dupinfo(best);
 }
@@ -1692,16 +1729,22 @@ chpl_bool canBindTxCtxs(struct fi_info* info) {
   // endpoints. Until that is fixed, assume it can create as many endpoints
   // as we need.
   size_t epCount = isInProvider("cxi", info) ? SIZE_MAX : dom_attr->ep_cnt;
+
+  // Set the maximum number of endpoints if specified
+
+  epCount = chpl_env_rt_get_int("COMM_OFI_EP_CNT", epCount);
+
   size_t numWorkerTxCtxs = ((envPreferScalableTxEp
                              && dom_attr->max_ep_tx_ctx > 1)
                             ? dom_attr->max_ep_tx_ctx
-                            : epCount)
-                           - 1
-                           - numAmHandlers;
+                            : epCount) - 1 - numAmHandlers;
+
   if (envCommConcurrency > 0 && envCommConcurrency < numWorkerTxCtxs) {
     numWorkerTxCtxs = envCommConcurrency;
   }
+  numTxCtxs = numWorkerTxCtxs + 1 + numAmHandlers;
+
   return fixedNumThreads <= numWorkerTxCtxs;
 }
 
@@ -2500,11 +2543,29 @@ void init_ofiEp(void) {
   //
   // Compute numbers of transmit and receive contexts, and then create
   // the transmit context table.
   //
+  // The logic here is a bit convoluted and can probably be cleaned up. See
+  // the tciTab comment above for more details. For non-scalable endpoints,
+  // we would like to have one transmit context (and therefore one endpoint)
+  // per worker thread, one per AM handler, and one for the process in
+  // general. That will allow us to bind worker threads and AM handlers to
+  // transmit contexts. If we can't get that many endpoints then transmit
+  // contexts will not be bound, which significantly reduces performance.
+  //
+  // For scalable endpoints we only need one transmit endpoint with enough
+  // transmit contexts to bind them as described above. If max_ep_tx_ctx for
+  // the provider is less than that, then we won't use a scalable endpoint.
+  // If we are using a scalable endpoint we have to set tx_ctx_cnt to tell
+  // the provider how many transmit contexts we want per endpoint.
+  //
+
+  int desiredTxCtxs;
   tciTabBindTxCtxs = canBindTxCtxs(ofi_info);
   if (tciTabBindTxCtxs) {
-    numTxCtxs = chpl_task_getFixedNumThreads() + numAmHandlers + 1;
+    desiredTxCtxs = chpl_task_getFixedNumThreads() + numAmHandlers + 1;
   } else {
-    numTxCtxs = chpl_task_getMaxPar() + numAmHandlers + 1;
+    desiredTxCtxs = chpl_task_getMaxPar() + numAmHandlers + 1;
+  }
+  if (desiredTxCtxs < numTxCtxs) {
+    numTxCtxs = desiredTxCtxs;
   }
   DBG_PRINTF(DBG_CFG,"tciTabBindTxCtxs %s numTxCtxs %d numAmHandlers %d",
              tciTabBindTxCtxs ? "true" : "false", numTxCtxs, numAmHandlers);
@@ -3322,7 +3383,7 @@ void chpl_comm_broadcast_private(int id, size_t size) {
   for (int i = 0; i < chpl_numNodes; i++) {
     if (i != chpl_nodeID) {
       (void) ofi_put(chpl_rt_priv_bcast_tab[id], i,
-                     chplPrivBcastTabMap[i][id], size, true /*blocking*/);
+                     chplPrivBcastTabMap[i][id], size);
     }
   }
 }
@@ -4170,7 +4231,7 @@ static void am_debugPrep(amRequest_t*);
 static void amRequestExecOn(c_nodeid_t, c_sublocid_t, chpl_fn_int_t,
                             chpl_comm_on_bundle_t*, size_t,
                             chpl_bool, chpl_bool);
-static void amRequestRmaPut(c_nodeid_t, void*, void*, size_t, chpl_bool);
+static void amRequestRmaPut(c_nodeid_t, void*, void*, size_t);
 static void amRequestRmaGet(c_nodeid_t, void*, void*, size_t);
 static void amRequestAMO(c_nodeid_t, void*, const void*, const void*,
                          void*, int, enum fi_datatype, size_t);
@@ -4314,10 +4375,14 @@ void amRequestExecOn(c_nodeid_t node, c_sublocid_t subloc,
   }
 }
 
-
+/*
+ * amRequestRmaPut
+ *
+ * Performs a PUT by sending an active message to the remote node that causes
+ * it to perform a GET. This operation returns when the GET has completed.
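+ *
+ * This path is used when the initiator does not have a memory region key
+ * for the remote address and so cannot perform the RMA directly itself
+ * (see ofi_put_nb).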
+ */ static inline -void amRequestRmaPut(c_nodeid_t node, void* addr, void* raddr, size_t size, - chpl_bool blocking) { +void amRequestRmaPut(c_nodeid_t node, void* addr, void* raddr, size_t size) { assert(!isAmHandler); retireDelayedAmDone(false /*taskIsEnding*/); @@ -4335,7 +4400,7 @@ void amRequestRmaPut(c_nodeid_t node, void* addr, void* raddr, size_t size, .addr = raddr, .raddr = myAddr, .size = size, }, }; - amRequestCommon(node, &req, sizeof(req.rma), true, NULL); + amRequestCommon(node, &req, sizeof(req.rma), true /*blocking*/, NULL); mrUnLocalizeSource(myAddr, addr); } @@ -4360,7 +4425,7 @@ void amRequestRmaGet(c_nodeid_t node, void* addr, void* raddr, size_t size) { .addr = raddr, .raddr = myAddr, .size = size, }, }; - amRequestCommon(node, &req, sizeof(req.rma), true, NULL); + amRequestCommon(node, &req, sizeof(req.rma), true /*blocking*/, NULL); mrUnLocalizeTarget(myAddr, addr, size); } @@ -5281,8 +5346,7 @@ void amWrapPut(struct taskArg_RMA_t* tsk_rma) { DBG_PRINTF(DBG_AM | DBG_AM_RECV, "%s", am_reqStartStr((amRequest_t*) rma)); CHK_TRUE(mrGetKey(NULL, NULL, rma->b.node, rma->raddr, rma->size)); - (void) ofi_put(rma->addr, rma->b.node, rma->raddr, rma->size, - true /*blocking*/); + (void) ofi_put(rma->addr, rma->b.node, rma->raddr, rma->size); // // Note: the RMA bytes must be visible in target memory before the @@ -5446,13 +5510,87 @@ void amCheckLiveness(void) { // Interface: RMA // +static inline +void nb_handle_init(nb_handle_t h) { + h->id = chpl_task_getId(); + h->reported = false; + atomic_init_bool(&h->complete, false); + h->next = NULL; +} + +static inline +void nb_handle_destroy(nb_handle_t h) { + atomic_destroy_bool(&h->complete); +} + +/* + * put_prologue + * + * Common prologue operations for chpl_comm_put and chpl_comm_put_nb. Returns + * true if the PUT should proceed, false if it was handled in this function. + */ +static inline +chpl_bool put_prologue(void* addr, c_nodeid_t node, void* raddr, size_t size, + int32_t commID, int ln, int32_t fn) { + + chpl_bool proceed = false; + retireDelayedAmDone(false /*taskIsEnding*/); + + // + // Sanity checks, self-communication. + // + CHK_TRUE(addr != NULL); + CHK_TRUE(raddr != NULL); + + if (size == 0) { + goto done; + } + + if (node == chpl_nodeID) { + memmove(raddr, addr, size); + goto done; + } + + // Communications callback support + if (chpl_comm_have_callbacks(chpl_comm_cb_event_kind_put)) { + chpl_comm_cb_info_t cb_data = + {chpl_comm_cb_event_kind_put, chpl_nodeID, node, + .iu.comm={addr, raddr, size, commID, ln, fn}}; + chpl_comm_do_callbacks (&cb_data); + } + + chpl_comm_diags_verbose_rdma("put", node, size, ln, fn, commID); + chpl_comm_diags_incr(put); + proceed = true; +done: + return proceed; +} + +/* + * chpl_comm_put_nb + * + * Non-blocking PUT. The PUT may complete after this function returns. Returns + * a handle that can be used to wait for and check the status of the PUT. The + * handle may be NULL, in which case the PUT has already completed. The + * memory buffer must not be modified before the PUT completes. Completion + * indicates that subsequent PUTs to the same memory will occur after the + * completed PUT; it does not mean that the results of the PUT are visible in + * memory (see the README.md for details). Concurrent non-blocking PUTs may + * occur in any order. 
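+ *
+ * Illustrative usage sketch (the variable names here are hypothetical, not
+ * part of this interface):
+ *
+ *   chpl_comm_nb_handle_t h =
+ *     chpl_comm_put_nb(buf, node, raddr, size, commID, ln, fn);
+ *   while (!chpl_comm_test_nb_complete(h)) {
+ *     chpl_comm_wait_nb_some(&h, 1); // block until a completion is detected
+ *   }
+ *   chpl_comm_free_nb_handle(h);     // handles must be freed by the caller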
+ */
 chpl_comm_nb_handle_t chpl_comm_put_nb(void* addr, c_nodeid_t node,
                                        void* raddr, size_t size,
                                        int32_t commID, int ln, int32_t fn) {
-  chpl_comm_put(addr, node, raddr, size, commID, ln, fn);
-  return NULL;
-}
+  DBG_PRINTF(DBG_IFACE,
+             "%s(%p, %d, %p, %zd, %d)", __func__,
+             addr, (int) node, raddr, size, (int) commID);
+
+  nb_handle_t handle = NULL;
+  if (put_prologue(addr, node, raddr, size, commID, ln, fn)) {
+    handle = ofi_put_nb(handle, addr, node, raddr, size);
+  }
+  return (chpl_comm_nb_handle_t) handle;
+}
 
 chpl_comm_nb_handle_t chpl_comm_get_nb(void* addr, c_nodeid_t node,
                                        void* raddr, size_t size,
@@ -5462,75 +5600,126 @@ chpl_comm_nb_handle_t chpl_comm_get_nb(void* addr, c_nodeid_t node,
 }
 
+static inline
+int test_nb_complete(nb_handle_t handle) {
+  return handle != NULL ? handle->reported : 1;
+}
+
 int chpl_comm_test_nb_complete(chpl_comm_nb_handle_t h) {
   chpl_comm_diags_incr(test_nb);
+  return test_nb_complete((nb_handle_t) h);
+}
 
-  // fi_cq_readfrom?
-  return ((void*) h) == NULL;
+/*
+ * check_complete
+ *
+ * Returns true if a new handle completion is detected, false otherwise.
+ * Ignores handles that have previously completed (h->reported == true). If
+ * blocking is true and there are uncompleted handles this will not return
+ * until a new completion is detected.
+ */
+static
+chpl_bool check_complete(nb_handle_t *handles, size_t nhandles,
+                         chpl_bool blocking) {
+
+  chpl_bool completed = false; // at least one new completion detected
+  chpl_bool pending = false;   // there is an uncompleted handle
+  if ((handles == NULL) || (nhandles == 0)) {
+    goto done;
+  }
+  struct perTxCtxInfo_t* tcip = NULL;
+  while (true) {
+    pending = false;
+    for (size_t i = 0; i < nhandles; i++) {
+      nb_handle_t handle = handles[i];
+      // ignore handles that have already completed
+      // NULL handles have by definition already completed
+      if ((handle == NULL) || handle->reported) {
+        continue;
+      }
+      if (handle->id != chpl_task_getId()) {
+        char msg[128];
+        char task1[32];
+        char task2[32];
+        snprintf(msg, sizeof(msg),
+                 "Task %s did not create non-blocking handle (created by %s)",
+                 chpl_task_idToString(task1, sizeof(task1), chpl_task_getId()),
+                 chpl_task_idToString(task2, sizeof(task2), handle->id));
+        chpl_internal_error(msg);
+      }
+      pending = true;
+      // determine if this handle is now complete by checking the completion
+      // status of its operations
+      chpl_bool allComplete = true;
+      for (nb_handle_t p = handle; p != NULL; p = p->next) {
+        if (!atomic_load_explicit_bool(&p->complete,
+                                       chpl_memory_order_acquire)) {
+          allComplete = false;
+          break;
+        }
+      }
+      if (allComplete) {
+        completed = true;
+        handle->reported = true;
+      }
+    }
+    if (!blocking || completed || !pending) {
+      break;
+    }
+    // progress the endpoint so handles can complete and then try again
+    if (tcip == NULL) {
+      CHK_TRUE((tcip = tciAlloc()) != NULL);
+    }
+    sched_yield();
+    (*tcip->ensureProgressFn)(tcip);
+  }
+  if (tcip) {
+    tciFree(tcip);
+  }
+done:
+  return completed;
 }
 
+static inline
+void wait_nb_some(nb_handle_t *handles, size_t nhandles) {
+  (void) check_complete(handles, nhandles, true /*blocking*/);
+}
 
 void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) {
   chpl_comm_diags_incr(wait_nb);
-
-  size_t i;
-  // fi_cq_readfrom?
-  for( i = 0; i < nhandles; i++ ) {
-    CHK_TRUE(h[i] == NULL);
-  }
+  wait_nb_some((nb_handle_t *) h, nhandles);
 }
 
+static inline
+int try_nb_some(nb_handle_t *handles, size_t nhandles) {
+  return check_complete(handles, nhandles, false /*blocking*/);
+}
 
 int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) {
   chpl_comm_diags_incr(try_nb);
+  return try_nb_some((nb_handle_t *) h, nhandles);
+}
 
-  size_t i;
-  // fi_cq_readfrom?
-  for( i = 0; i < nhandles; i++ ) {
-    CHK_TRUE(h[i] == NULL);
+void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h) {
+  nb_handle_t handle = (nb_handle_t) h;
+  nb_handle_t next;
+  for (; handle != NULL; handle = next) {
+    next = handle->next;
+    nb_handle_destroy(handle);
+    chpl_mem_free(handle, 0, 0);
   }
-  return 0;
 }
 
-
 void chpl_comm_put(void* addr, c_nodeid_t node, void* raddr, size_t size,
                    int32_t commID, int ln, int32_t fn) {
   DBG_PRINTF(DBG_IFACE,
              "%s(%p, %d, %p, %zd, %d)", __func__,
              addr, (int) node, raddr, size, (int) commID);
 
-  retireDelayedAmDone(false /*taskIsEnding*/);
-
-  //
-  // Sanity checks, self-communication.
-  //
-  CHK_TRUE(addr != NULL);
-  CHK_TRUE(raddr != NULL);
-
-  if (size == 0) {
-    return;
-  }
-
-  if (node == chpl_nodeID) {
-    memmove(raddr, addr, size);
-    return;
-  }
-
-  // Communications callback support
-  if (chpl_comm_have_callbacks(chpl_comm_cb_event_kind_put)) {
-    chpl_comm_cb_info_t cb_data =
-      {chpl_comm_cb_event_kind_put, chpl_nodeID, node,
-       .iu.comm={addr, raddr, size, commID, ln, fn}};
-    chpl_comm_do_callbacks (&cb_data);
+  if (put_prologue(addr, node, raddr, size, commID, ln, fn)) {
+    ofi_put(addr, node, raddr, size);
   }
-
-  chpl_comm_diags_verbose_rdma("put", node, size, ln, fn, commID);
-  chpl_comm_diags_incr(put);
-
-  (void) ofi_put(addr, node, raddr, size, true /*blocking*/);
 }
 
-
 void chpl_comm_get(void* addr, int32_t node, void* raddr, size_t size,
                    int32_t commID, int ln, int32_t fn) {
   DBG_PRINTF(DBG_IFACE,
@@ -5745,6 +5934,7 @@ void chpl_comm_ensure_progress(void) {
     // to begin with.
     CHK_TRUE((tcip = tciAlloc()) != NULL);
     (*tcip->ensureProgressFn)(tcip);
+    tciFree(tcip);
   }
 }
 
@@ -5760,11 +5950,11 @@ static __thread struct perTxCtxInfo_t* _ttcip;
 
 static inline
-struct perTxCtxInfo_t* tciAlloc(void) {
+struct perTxCtxInfo_t* tciAllocFunc(const char *file, int line) {
+  DBG_PRINTF(DBG_TCIPS, "tciAlloc %s:%d", file, line);
   return tciAllocCommon(false /*bindToAmHandler*/);
 }
 
-
 static inline
 struct perTxCtxInfo_t* tciAllocForAmHandler(void) {
   return tciAllocCommon(true /*bindToAmHandler*/);
@@ -5798,13 +5988,15 @@ struct perTxCtxInfo_t* tciAllocCommon(chpl_bool bindToAmHandler) {
   if (bindToAmHandler
       || (tciTabBindTxCtxs && chpl_task_isFixedThread())) {
     _ttcip->bound = true;
+  }
+  if (mcmMode != mcmm_dlvrCmplt) {
     _ttcip->putVisBitmap = bitmapAlloc(chpl_numNodes);
     if ((ofi_info->caps & FI_ATOMIC) != 0) {
       _ttcip->amoVisBitmap = bitmapAlloc(chpl_numNodes);
     }
   }
-  DBG_PRINTF(DBG_TCIPS, "alloc%s tciTab[%td]",
-             _ttcip->bound ? " bound" : "", _ttcip - tciTab);
+  DBG_PRINTF(DBG_TCIPS, "alloc%s tciTab[%td] %p",
+             _ttcip->bound ? " bound" : "", _ttcip - tciTab, _ttcip);
   return _ttcip;
 }
 
@@ -5880,7 +6072,8 @@ void tciFree(struct perTxCtxInfo_t* tcip) {
   // Bound contexts stay bound. We only release non-bound ones.
// if (!tcip->bound) { - DBG_PRINTF(DBG_TCIPS, "free tciTab[%td]", tcip - tciTab); + DBG_PRINTF(DBG_TCIPS, "free tciTab[%td] %p", tcip - tciTab, tcip); + forceMemFxVisAllNodes(true, true, -1, tcip); atomic_store_bool(&tcip->allocated, false); } } @@ -5900,70 +6093,124 @@ void waitForCQSpace(struct perTxCtxInfo_t* tcip, size_t len) { } } -typedef chpl_comm_nb_handle_t (rmaPutFn_t)(void* myAddr, void* mrDesc, - c_nodeid_t node, - uint64_t mrRaddr, uint64_t mrKey, - size_t size, - chpl_bool blocking, - struct perTxCtxInfo_t* tcip); +typedef void (rmaPutFn_t)(nb_handle_t handle, void* myAddr, void* mrDesc, + c_nodeid_t node, + uint64_t mrRaddr, uint64_t mrKey, + size_t size, + struct perTxCtxInfo_t* tcip); static rmaPutFn_t rmaPutFn_selector; +/* + * ofi_put + * + * Blocking PUT. Implemented by initiating a non-blocking PUT and waiting for + * it to complete. + */ + static inline -chpl_comm_nb_handle_t ofi_put(const void* addr, c_nodeid_t node, - void* raddr, size_t size, chpl_bool blocking) { - // - // Don't ask the provider to transfer more than it wants to. - // +void ofi_put(const void* addr, c_nodeid_t node, void* raddr, size_t size) { + + // Allocate the handle on the stack to avoid malloc overhead + nb_handle handle_struct; + nb_handle_t handle = &handle_struct; + nb_handle_init(handle); + + handle = ofi_put_nb(handle, addr, node, raddr, size); + do { + wait_nb_some(&handle, 1); + } while(!test_nb_complete(handle)); + if (handle->next != NULL) { + // free any handles for sub-operations + chpl_comm_free_nb_handle(handle->next); + } + nb_handle_destroy(handle); +} + +/* + * ofi_put_nb + * + * Non-blocking PUT. Returns a handle that can be used to test the completion + * status of the PUT and wait for it to complete. If the PUT is too large + * for the fabric it is broken into multiple PUTs. + * + */ +static +nb_handle_t ofi_put_nb(nb_handle_t handle, const void* addr, c_nodeid_t node, + void* raddr, size_t size) { + + char *src = (char *) addr; + char *dest = (char *) raddr; + nb_handle_t prev = NULL; + nb_handle_t first = NULL; + if (size > ofi_info->ep_attr->max_msg_size) { DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "splitting large PUT %d:%p <= %p, size %zd", (int) node, raddr, addr, size); - - size_t chunkSize = ofi_info->ep_attr->max_msg_size; - for (size_t i = 0; i < size; i += chunkSize) { - if (chunkSize > size - i) { - chunkSize = size - i; - } - (void) ofi_put(&((const char*) addr)[i], node, &((char*) raddr)[i], - chunkSize, blocking); - } - - return NULL; } - DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, - "PUT %d:%p <= %p, size %zd", - (int) node, raddr, addr, size); + struct perTxCtxInfo_t* tcip = NULL; + CHK_TRUE((tcip = tciAlloc()) != NULL); - // - // If the remote address is directly accessible do an RMA from this - // side; otherwise do the opposite RMA from the other side. 
-  //
-  chpl_comm_nb_handle_t ret;
-  uint64_t mrKey;
-  uint64_t mrRaddr;
-  if (mrGetKey(&mrKey, &mrRaddr, node, raddr, size)) {
-    struct perTxCtxInfo_t* tcip;
-    CHK_TRUE((tcip = tciAlloc()) != NULL);
-    if (tcip->txCntr == NULL) {
-      waitForCQSpace(tcip, 1);
+  size_t chunkSize = ofi_info->ep_attr->max_msg_size;
+  size_t offset = 0;
+  while (offset < size) {
+    if (chunkSize > size - offset) {
+      chunkSize = size - offset;
     }
+    DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE,
+               "PUT %d:%p <= %p, size %zd",
+               (int) node, dest, src, chunkSize);
 
-    void* mrDesc;
-    void* myAddr = mrLocalizeSource(&mrDesc, addr, size, "PUT src");
+    if (handle == NULL) {
+      handle = chpl_mem_alloc(sizeof(*handle),
+                              CHPL_RT_MD_COMM_NB_HANDLE, 0, 0);
+      nb_handle_init(handle);
+    }
+    // Make a linked-list of handles
+    if (prev != NULL) {
+      prev->next = handle;
+    }
+    // Keep track of the first handle so we can return it.
+    if (first == NULL) {
+      first = handle;
+    }
 
-    ret = rmaPutFn_selector(myAddr, mrDesc, node, mrRaddr, mrKey, size,
-                            blocking, tcip);
+    //
+    // If the remote address is directly accessible do a PUT RMA from this
+    // side; otherwise do a GET from the other side.
+    //
+    uint64_t mrKey;
+    uint64_t mrRaddr;
+    if (mrGetKey(&mrKey, &mrRaddr, node, (void *) dest, chunkSize)) {
+      if (tcip->txCntr == NULL) {
+        // TODO: why is this necessary?
+        waitForCQSpace(tcip, 1);
+      }
 
-    mrUnLocalizeSource(myAddr, addr);
-    tciFree(tcip);
-  } else {
-    amRequestRmaPut(node, (void*) addr, raddr, size, blocking);
-    ret = NULL;
-  }
+      void* mrDesc;
+      void* myAddr = mrLocalizeSource(&mrDesc, (const void *) src,
+                                      chunkSize, "PUT src");
 
-  return ret;
+      rmaPutFn_selector(handle, myAddr, mrDesc, node, mrRaddr,
+                        mrKey, chunkSize, tcip);
+
+      mrUnLocalizeSource(myAddr, src);
+    } else {
+      amRequestRmaPut(node, (void *) src, (void *) dest, chunkSize);
+      atomic_store_bool(&handle->complete, true);
+    }
+    offset += chunkSize;
+    src += chunkSize;
+    dest += chunkSize;
+    prev = handle;
+    handle = NULL;
+  }
+  tciFree(tcip);
+  DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE,
+             "PUT %d:%p <= %p, handle %p",
+             (int) node, raddr, addr, (void *) first);
+  return first;
 }
 
@@ -5972,33 +6219,29 @@
 static rmaPutFn_t rmaPutFn_msgOrd;
 static rmaPutFn_t rmaPutFn_dlvrCmplt;
 
 static inline
-chpl_comm_nb_handle_t rmaPutFn_selector(void* myAddr, void* mrDesc,
-                                        c_nodeid_t node,
-                                        uint64_t mrRaddr, uint64_t mrKey,
-                                        size_t size,
-                                        chpl_bool blocking,
-                                        struct perTxCtxInfo_t* tcip) {
-  chpl_comm_nb_handle_t ret = NULL;
+void rmaPutFn_selector(nb_handle_t handle, void* myAddr, void* mrDesc,
+                       c_nodeid_t node,
+                       uint64_t mrRaddr, uint64_t mrKey,
+                       size_t size,
+                       struct perTxCtxInfo_t* tcip) {
 
   switch (mcmMode) {
-  case mcmm_msgOrdFence:
-    ret = rmaPutFn_msgOrdFence(myAddr, mrDesc, node, mrRaddr, mrKey, size,
-                               blocking, tcip);
-    break;
-  case mcmm_msgOrd:
-    ret = rmaPutFn_msgOrd(myAddr, mrDesc, node, mrRaddr, mrKey, size,
-                          blocking, tcip);
-    break;
-  case mcmm_dlvrCmplt:
-    ret = rmaPutFn_dlvrCmplt(myAddr, mrDesc, node, mrRaddr, mrKey, size,
-                             blocking, tcip);
-    break;
-  default:
-    INTERNAL_ERROR_V("unexpected mcmMode %d", mcmMode);
-    break;
-  }
-
-  return ret;
+    case mcmm_msgOrdFence:
+      rmaPutFn_msgOrdFence(handle, myAddr, mrDesc, node, mrRaddr, mrKey, size,
+                           tcip);
+      break;
+    case mcmm_msgOrd:
+      rmaPutFn_msgOrd(handle, myAddr, mrDesc, node, mrRaddr, mrKey, size,
+                      tcip);
+      break;
+    case mcmm_dlvrCmplt:
+      rmaPutFn_dlvrCmplt(handle, myAddr, mrDesc, node, mrRaddr, mrKey, size,
+                         tcip);
+      break;
+    default:
+      INTERNAL_ERROR_V("unexpected mcmMode %d", mcmMode);
+      break;
+  }
 }
 
@@ -6016,134 +6259,93 @@ static ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc,
 
 //
-// Implements ofi_put() when MCM mode is message ordering with fences.
+// Implements ofi_put_nb() when MCM mode is message ordering with fences.
 //
 static
-chpl_comm_nb_handle_t rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc,
-                                           c_nodeid_t node,
-                                           uint64_t mrRaddr, uint64_t mrKey,
-                                           size_t size,
-                                           chpl_bool blocking,
-                                           struct perTxCtxInfo_t* tcip) {
-  uint64_t flags = 0;
-  chpl_atomic_bool txnDone;
-  void *ctx;
+void rmaPutFn_msgOrdFence(nb_handle_t handle, void* myAddr, void* mrDesc,
+                          c_nodeid_t node,
+                          uint64_t mrRaddr, uint64_t mrKey,
+                          size_t size,
+                          struct perTxCtxInfo_t* tcip) {
+  uint64_t flags = 0;
 
   if (tcip->bound
       && size <= ofi_info->tx_attr->inject_size
-      && !blocking && envInjectRMA) {
+      && envInjectRMA) {
     //
     // Special case: write injection has the least latency. We can use it if
-    // this PUT is non-blocking, its size doesn't exceed the injection size
+    // this PUT's size doesn't exceed the injection size
     // limit, and we have a bound tx context so we can delay forcing the
     // memory visibility until later.
    //
    flags = FI_INJECT;
-  } else {
-    blocking = true;
  }
 
  if (bitmapTest(tcip->amoVisBitmap, node)) {
    //
-    // Special case: If our last operation was an AMO then we need to do a
-    // fenced PUT to force the AMO to complete before this PUT.
+    // Special case: If our last operation was an AMO then we need to do a
+    // fenced PUT to force the AMO to be visible before this PUT.
+    // TODO: this logic is a bit screwed-up. FI_FENCE by itself doesn't
+    // force the AMO to be visible, it just ensures that the PUT cannot pass
+    // the AMO. We need to do something to make it visible, and we need
+    // to clear the bitmap so that we don't keep fencing PUTs until something
+    // else makes it visible.
    //
    flags |= FI_FENCE;
  }
 
-  ctx = TX_CTX_INIT(tcip, blocking, &txnDone);
+  void *ctx = txCtxInit(tcip, __LINE__, &handle->complete);
  (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx,
                          flags, tcip);
 
-  if (blocking) {
-    waitForTxnComplete(tcip, ctx);
-    txCtxCleanup(ctx);
-  }
-
  //
  // When using message ordering we have to do something after the PUT
  // to force it into visibility, and on the same tx context as the PUT
  // itself because libfabric message ordering is specific to endpoint
-  // pairs. With a bound tx context we can do it later, when needed.
-  // Otherwise we have to do it here, before we release the tx context.
+  // pairs. Indicate that there is a dangling PUT to the remote node.
  //
-  if (tcip->bound) {
-    bitmapSet(tcip->putVisBitmap, node);
-  } else {
-    mcmReleaseOneNode(node, tcip, "PUT");
-  }
-
-  return NULL;
+  bitmapSet(tcip->putVisBitmap, node);
 }
 
 //
-// Implements ofi_put() when MCM mode is message ordering.
+// Implements ofi_put_nb() when MCM mode is message ordering.
+// TODO: see comment for rmaPutFn_msgOrdFence.
 //
 static
-chpl_comm_nb_handle_t rmaPutFn_msgOrd(void* myAddr, void* mrDesc,
-                                      c_nodeid_t node,
-                                      uint64_t mrRaddr, uint64_t mrKey,
-                                      size_t size,
-                                      chpl_bool blocking,
-                                      struct perTxCtxInfo_t* tcip) {
+void rmaPutFn_msgOrd(nb_handle_t handle, void* myAddr, void* mrDesc,
+                     c_nodeid_t node,
+                     uint64_t mrRaddr, uint64_t mrKey,
+                     size_t size,
+                     struct perTxCtxInfo_t* tcip) {
 
   uint64_t flags = 0;
-  chpl_atomic_bool txnDone;
-  void *ctx;
 
-  //
-  // When using message ordering we have to do something after the PUT
-  // to force it into visibility, and on the same tx context as the PUT
-  // itself because libfabric message ordering is specific to endpoint
-  // pairs. With a bound tx context we can do it later, when needed.
- // Otherwise we have to do it here, before we release the tx context. - // if (tcip->bound && size <= ofi_info->tx_attr->inject_size - && !blocking && envInjectRMA) { + && envInjectRMA) { // // Special case: write injection has the least latency. We can use // that if this PUT's size doesn't exceed the injection size limit // and we have a bound tx context so we can delay forcing the // memory visibility until later. flags = FI_INJECT; - } else { - blocking = true; } - ctx = TX_CTX_INIT(tcip, blocking, &txnDone); + void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, flags, tcip); - - if (blocking) { - waitForTxnComplete(tcip, ctx); - txCtxCleanup(ctx); - } - - if (tcip->bound) { - bitmapSet(tcip->putVisBitmap, node); - } else { - mcmReleaseOneNode(node, tcip, "PUT"); - } - - return NULL; + bitmapSet(tcip->putVisBitmap, node); } // -// Implements ofi_put() when MCM mode is delivery complete. +// Implements ofi_put_nb() when MCM mode is delivery complete. // static -chpl_comm_nb_handle_t rmaPutFn_dlvrCmplt(void* myAddr, void* mrDesc, - c_nodeid_t node, - uint64_t mrRaddr, uint64_t mrKey, - size_t size, - chpl_bool blocking, - struct perTxCtxInfo_t* tcip) { - chpl_atomic_bool txnDone; - void *ctx = TX_CTX_INIT(tcip, true /*blocking*/, &txnDone); +void rmaPutFn_dlvrCmplt(nb_handle_t handle, void* myAddr, void* mrDesc, + c_nodeid_t node, + uint64_t mrRaddr, uint64_t mrKey, + size_t size, + struct perTxCtxInfo_t* tcip) { + void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); (void) wrap_fi_write(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, tcip); - waitForTxnComplete(tcip, ctx); - txCtxCleanup(ctx); - return NULL; } @@ -6192,8 +6394,8 @@ ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc, } DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "tx write msg: %d:%#" PRIx64 " <= %p, size %zd, ctx %p, " - "flags %#" PRIx64, - (int) node, mrRaddr, addr, size, ctx, flags); + "flags %#" PRIx64 " tcip %p", + (int) node, mrRaddr, addr, size, ctx, flags, tcip); OFI_RIDE_OUT_EAGAIN(tcip, fi_writemsg(tcip->txCtx, &msg, flags)); tcip->numTxnsOut++; tcip->numTxnsSent++; @@ -6299,7 +6501,7 @@ void do_remote_put_buff(void* addr, c_nodeid_t node, void* raddr, if (size > MAX_UNORDERED_TRANS_SZ || !mrGetKey(&mrKey, &mrRaddr, node, raddr, size) || (info = task_local_buff_acquire(put_buff)) == NULL) { - (void) ofi_put(addr, node, raddr, size, true /*blocking*/); + (void) ofi_put(addr, node, raddr, size); return; } @@ -6374,6 +6576,7 @@ chpl_comm_nb_handle_t ofi_get(void* addr, c_nodeid_t node, if (mrGetKey(&mrKey, &mrRaddr, node, raddr, size)) { struct perTxCtxInfo_t* tcip; CHK_TRUE((tcip = tciAlloc()) != NULL); + // TODO: Why is this necessary? waitForCQSpace(tcip, 1); void* mrDesc; @@ -7331,15 +7534,11 @@ void forceMemFxVisAllNodes(chpl_bool checkPuts, chpl_bool checkAmos, struct perTxCtxInfo_t* tcip) { // // Enforce MCM: make sure the memory effects of all the operations - // we've done so far, to any node, are actually visible. This is only - // needed if we have a bound tx context. Otherwise, we would have - // forced visibility at the time of the operation. + // we've done so far, to any node, are actually visible. // - if (tcip->bound) { - mcmReleaseAllNodes(checkPuts ? tcip->putVisBitmap : NULL, - checkAmos ? tcip->amoVisBitmap : NULL, - skipNode, tcip, "PUT and/or AMO"); - } + mcmReleaseAllNodes(checkPuts ? tcip->putVisBitmap : NULL, + checkAmos ? 
tcip->amoVisBitmap : NULL,
-                       skipNode, tcip, "PUT and/or AMO");
-  }
+  mcmReleaseAllNodes(checkPuts ? tcip->putVisBitmap : NULL,
+                     checkAmos ? tcip->amoVisBitmap : NULL,
+                     skipNode, tcip, "PUT and/or AMO");
 }
 
@@ -8144,7 +8343,7 @@ void chpl_comm_impl_barrier(const char *msg) {
     DBG_PRINTF(DBG_BARRIER, "BAR notify parent %d", (int) bar_parent);
     ofi_put(&one, bar_parent,
             (void*) &bar_infoMap[bar_parent]->child_notify[parChild],
-            sizeof(one), true /*blocking*/);
+            sizeof(one));
 
     //
     // Wait for our parent locale to release us from the barrier.
@@ -8171,7 +8370,7 @@ void chpl_comm_impl_barrier(const char *msg) {
       DBG_PRINTF(DBG_BARRIER, "BAR release child %d", (int) child);
       ofi_put(&one, child,
               (void*) &bar_infoMap[child]->parent_release,
-              sizeof(one), true /*blocking*/);
+              sizeof(one));
     }
   }
 
@@ -8231,7 +8430,7 @@ void ofiErrReport(const char* exprStr, int retVal, const char* errStr) {
      "OFI error: %s: %s:\n"
      "  The program has reached the limit on the number of files it can\n"
      "  have open at once. This may be because the product of the number\n"
-      "  of locales (%d) and the communication concurrency (roughly %d) is\n"
+      "  of locales (%d) and the communication concurrency (roughly %zu) is\n"
      "  a significant fraction of the open-file limit (%ld). If so,\n"
      "  either setting CHPL_RT_COMM_CONCURRENCY to decrease communication\n"
      "  concurrency or running on fewer locales may allow the program to\n"
diff --git a/test/runtime/configMatters/comm/large-rma/EXECENV b/test/runtime/configMatters/comm/large-rma/EXECENV
new file mode 100644
index 000000000000..2e7098a16072
--- /dev/null
+++ b/test/runtime/configMatters/comm/large-rma/EXECENV
@@ -0,0 +1 @@
+CHPL_RT_COMM_OFI_MAX_MSG_SIZE=100
\ No newline at end of file
diff --git a/test/runtime/configMatters/comm/large-rma/README b/test/runtime/configMatters/comm/large-rma/README
new file mode 100644
index 000000000000..2f57cdde6057
--- /dev/null
+++ b/test/runtime/configMatters/comm/large-rma/README
@@ -0,0 +1,3 @@
+Test RMA operations that are larger than the maximum message size of the fabric
+and therefore require multiple transfers. This is accomplished by setting
+the CHPL_RT_COMM_OFI_MAX_MSG_SIZE environment variable to a small value.
\ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.chpl b/test/runtime/configMatters/comm/large-rma/bigTransfer.chpl new file mode 120000 index 000000000000..3d38c2034ae9 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.chpl @@ -0,0 +1 @@ +../bigTransfer.chpl \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.compopts b/test/runtime/configMatters/comm/large-rma/bigTransfer.compopts new file mode 120000 index 000000000000..3d12c48c7991 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.compopts @@ -0,0 +1 @@ +../bigTransfer.compopts \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.execopts b/test/runtime/configMatters/comm/large-rma/bigTransfer.execopts new file mode 100644 index 000000000000..e7574f0058dd --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.execopts @@ -0,0 +1 @@ +--doGET=false --xferMB=2048 diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.good b/test/runtime/configMatters/comm/large-rma/bigTransfer.good new file mode 120000 index 000000000000..523fb7880072 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.good @@ -0,0 +1 @@ +../bigTransfer.good \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.numlocales b/test/runtime/configMatters/comm/large-rma/bigTransfer.numlocales new file mode 120000 index 000000000000..6d7c873c2c01 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.numlocales @@ -0,0 +1 @@ +../bigTransfer.numlocales \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/EXECENV b/test/runtime/configMatters/comm/unbound/EXECENV new file mode 100644 index 000000000000..a2aab52168d8 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/EXECENV @@ -0,0 +1 @@ +CHPL_RT_COMM_OFI_EP_CNT=10 \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/README b/test/runtime/configMatters/comm/unbound/README new file mode 100644 index 000000000000..6c22d25f53b5 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/README @@ -0,0 +1,2 @@ +Tests for CHPL_COMM=ofi with unbound endpoints. This is accomplished by +setting CHPL_RT_COMM_OFI_EP_CNT to a small value. 
\ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/SKIPIF b/test/runtime/configMatters/comm/unbound/SKIPIF new file mode 100644 index 000000000000..1a0e68776535 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/SKIPIF @@ -0,0 +1 @@ +CHPL_COMM != ofi \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.chpl b/test/runtime/configMatters/comm/unbound/bigTransfer.chpl new file mode 120000 index 000000000000..3d38c2034ae9 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.chpl @@ -0,0 +1 @@ +../bigTransfer.chpl \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.compopts b/test/runtime/configMatters/comm/unbound/bigTransfer.compopts new file mode 120000 index 000000000000..3d12c48c7991 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.compopts @@ -0,0 +1 @@ +../bigTransfer.compopts \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.execopts b/test/runtime/configMatters/comm/unbound/bigTransfer.execopts new file mode 100644 index 000000000000..e7574f0058dd --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.execopts @@ -0,0 +1 @@ +--doGET=false --xferMB=2048 diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.good b/test/runtime/configMatters/comm/unbound/bigTransfer.good new file mode 120000 index 000000000000..523fb7880072 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.good @@ -0,0 +1 @@ +../bigTransfer.good \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.numlocales b/test/runtime/configMatters/comm/unbound/bigTransfer.numlocales new file mode 120000 index 000000000000..6d7c873c2c01 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.numlocales @@ -0,0 +1 @@ +../bigTransfer.numlocales \ No newline at end of file