Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 11 additions & 39 deletions src/host/proxy/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,51 +686,23 @@ int process_channel_amo(proxy_state_t *state, proxy_channel_t *ch, int *is_proce
}

void enforce_cst(proxy_state_t *proxy_state) {
#if defined(NVSHMEM_X86_64)
nvshmemi_state_t *state = proxy_state->nvshmemi_state;
#endif

int status = 0;

if (nvshmemi_options.BYPASS_FLUSH) return;

if (proxy_state->is_consistency_api_supported) {
if (CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TO_OWNER > proxy_state->gdr_device_native_ordering &&
CUPFN(nvshmemi_cuda_syms, cuFlushGPUDirectRDMAWrites)) {
status =
CUPFN(nvshmemi_cuda_syms,
cuFlushGPUDirectRDMAWrites(CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TARGET_CURRENT_CTX,
CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TO_OWNER));
/** We would want to use cudaFlushGPUDirectRDMAWritesToAllDevices when we enable
consistent access of data on any GPU (and not just self GPU) with
wait_until, quiet, barrier, etc. **/
if (status != CUDA_SUCCESS) {
NVSHMEMI_ERROR_EXIT("cuFlushGPUDirectRDMAWrites() failed in the proxy thread \n");
}
}
return;
}
#if defined(NVSHMEM_PPC64LE)
status = cudaEventRecord(proxy_state->cuev, proxy_state->stream);
if (unlikely(status != CUDA_SUCCESS)) {
NVSHMEMI_ERROR_EXIT("cuEventRecord() failed in the proxy thread \n");
}
#elif defined(NVSHMEM_X86_64)
for (int i = 0; i < state->num_initialized_transports; i++) {
if (!((state->transport_bitmap) & (1 << i))) continue;
struct nvshmem_transport *tcurr = state->transports[i];
if (!tcurr->host_ops.enforce_cst) continue;

// assuming the transport is connected - IB RC
if (tcurr->attr & NVSHMEM_TRANSPORT_ATTR_CONNECTED) {
status = tcurr->host_ops.enforce_cst(tcurr);
if (status) {
NVSHMEMI_ERROR_PRINT("aborting due to error in progress_cst \n");
exit(-1);
}
if (CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TO_OWNER > proxy_state->gdr_device_native_ordering &&
CUPFN(nvshmemi_cuda_syms, cuFlushGPUDirectRDMAWrites)) {
status =
CUPFN(nvshmemi_cuda_syms,
cuFlushGPUDirectRDMAWrites(CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TARGET_CURRENT_CTX,
CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TO_OWNER));
/** We would want to use cudaFlushGPUDirectRDMAWritesToAllDevices when we enable
consistent access of data on any GPU (and not just self GPU) with
wait_until, quiet, barrier, etc. **/
if (status != CUDA_SUCCESS) {
NVSHMEMI_ERROR_EXIT("cuFlushGPUDirectRDMAWrites() failed in the proxy thread \n");
}
}
#endif
}

inline void quiet_ack_channels(proxy_state_t *proxy_state) {
Expand Down
1 change: 0 additions & 1 deletion src/include/internal/host_transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ struct nvshmem_transport_host_ops {
fence_handle fence;
quiet_handle quiet;
put_signal_handle put_signal;
int (*enforce_cst)(struct nvshmem_transport *transport);
int (*enforce_cst_at_target)(struct nvshmem_transport *transport);
int (*add_device_remote_mem_handles)(struct nvshmem_transport *transport, int transport_stride,
nvshmem_mem_handle_t *mem_handles, uint64_t heap_offset,
Expand Down
3 changes: 3 additions & 0 deletions src/modules/transport/common/env_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ NVSHMEMI_ENV_DEF(DISABLE_LOCAL_ONLY_PROXY, bool, false, NVSHMEMI_ENV_CAT_TRANSPO
NVSHMEMI_ENV_DEF(LIBFABRIC_PROVIDER, string, "cxi", NVSHMEMI_ENV_CAT_TRANSPORT,
"Set the feature set provider for the libfabric transport: cxi, efa, verbs")

NVSHMEMI_ENV_DEF(LIBFABRIC_MAX_NIC_PER_PE, int, 16, NVSHMEMI_ENV_CAT_TRANSPORT,
"Set the maximum number of NIC's per PE to use for libfabric provider")

#if defined(NVSHMEM_IBGDA_SUPPORT) || defined(NVSHMEM_ENV_ALL)
/** GPU-initiated communication **/
NVSHMEMI_ENV_DEF(IBGDA_ENABLE_MULTI_PORT, bool, false, NVSHMEMI_ENV_CAT_TRANSPORT,
Expand Down
41 changes: 0 additions & 41 deletions src/modules/transport/ibdevx/ibdevx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1440,46 +1440,6 @@ int nvshmemt_ibdevx_amo(struct nvshmem_transport *tcurr, int pe, void *curetptr,
return status;
}

/**
 * Enforce consistency (CST) at the target for the ibdevx transport.
 *
 * Posts a small dummy RDMA READ work request on the transport's dedicated
 * CST endpoint (ibdevx_state->cst_ep) and then blocks until all outstanding
 * completions on that endpoint have been polled. NOTE(review): the apparent
 * intent is that a posted-and-completed RDMA READ flushes previously
 * delivered RDMA writes at the target NIC/PCIe ordering point — confirm
 * against the transport design docs; the code itself only shows the
 * read-then-wait sequence.
 *
 * @param tcurr  transport whose state carries the CST endpoint and dummy MR.
 * @return 0 on success; a nonzero NVSHMEMX error code if completion polling
 *         fails (propagated from nvshmemt_ib_common_check_poll_avail).
 */
int nvshmemt_ibdevx_enforce_cst_at_target(struct nvshmem_transport *tcurr) {
    nvshmemt_ib_common_state_t ibdevx_state = (nvshmemt_ib_common_state_t)tcurr->state;
    struct ibdevx_ep *ep = (struct ibdevx_ep *)ibdevx_state->cst_ep;
    struct ibdevx_rw_wqe *wqe;

    int status = 0;

    // Two views of the same slot counter: the 64-bit value selects the WQE
    // basic-block slot in the ring; the truncated 32-bit value is what gets
    // encoded into the WQE control segment's opmod_idx_opcode field below.
    uintptr_t wqe_bb_idx_64 = ep->wqe_bb_idx;
    uint32_t wqe_bb_idx_32 = ep->wqe_bb_idx;
    size_t wqe_size;

    // Locate this WQE's slot in the work-queue buffer: ring index modulo the
    // queue depth, scaled by the basic-block size (1 << WQE_BB_SHIFT bytes).
    wqe = (struct ibdevx_rw_wqe *)((char *)ep->wq_buf +
                                   ((wqe_bb_idx_64 % get_ibdevx_qp_depth(ibdevx_state))
                                    << NVSHMEMT_IBDEVX_WQE_BB_SHIFT));
    wqe_size = sizeof(struct ibdevx_rw_wqe);
    memset(wqe, 0, sizeof(struct ibdevx_rw_wqe));

    // Control segment: request a CQE for this WQE, encode the segment count
    // (size in 16-byte data segments) together with the QP number, and the
    // RDMA READ opcode together with the (truncated) WQE index.
    wqe->ctrl.fm_ce_se = MLX5_WQE_CTRL_CQ_UPDATE;
    wqe->ctrl.qpn_ds =
        htobe32((uint32_t)(wqe_size / NVSHMEMT_IBDEVX_MLX5_SEND_WQE_DS) | ep->qpid << 8);
    wqe->ctrl.opmod_idx_opcode = htobe32(MLX5_OPCODE_RDMA_READ | (wqe_bb_idx_32 << 8));

    // Remote address segment: read from the registered dummy MR. Source and
    // destination are the same local dummy buffer — the data is irrelevant,
    // only the round trip matters.
    wqe->raddr.raddr = htobe64((uintptr_t)local_dummy_mr.mr->addr);
    wqe->raddr.rkey = htobe32(local_dummy_mr.rkey);

    // Data segment: a 4-byte scatter into the same dummy MR.
    wqe->data.data_seg.byte_count = htobe32((uint32_t)4);
    wqe->data.data_seg.lkey = htobe32(local_dummy_mr.lkey);
    wqe->data.data_seg.addr = htobe64((uintptr_t)local_dummy_mr.mr->addr);

    // The WQE must fit in a single basic block, since only one slot is
    // consumed (wqe_bb_idx advances by exactly one).
    assert(wqe_size <= MLX5_SEND_WQE_BB);
    ep->wqe_bb_idx++;
    nvshmemt_ibdevx_post_send(ep, (void *)wqe, 1);

    // Block until every outstanding completion on this endpoint (including
    // the dummy READ just posted) has been drained.
    status = nvshmemt_ib_common_check_poll_avail(tcurr, ep, NVSHMEMT_IB_COMMON_WAIT_ALL);
    NVSHMEMI_NZ_ERROR_JMP(status, NVSHMEMX_ERROR_INTERNAL, out, "check_poll failed \n");

out:
    return status;
}

// Using common fence and quiet functions from transport_ib_common

int nvshmemt_ibdevx_ep_create(struct ibdevx_ep **ep, int devid, nvshmem_transport_t t,
Expand Down Expand Up @@ -1932,7 +1892,6 @@ int nvshmemt_init(nvshmem_transport_t *t, struct nvshmemi_cuda_fn_table *table,
transport->host_ops.finalize = nvshmemt_ibdevx_finalize;
transport->host_ops.show_info = nvshmemt_ibdevx_show_info;
transport->host_ops.progress = nvshmemt_ibdevx_progress;
transport->host_ops.enforce_cst = nvshmemt_ibdevx_enforce_cst_at_target;
transport->host_ops.put_signal = nvshmemt_put_signal;

transport->attr = NVSHMEM_TRANSPORT_ATTR_CONNECTED;
Expand Down
1 change: 0 additions & 1 deletion src/modules/transport/ibgda/ibgda.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4915,7 +4915,6 @@ int nvshmemt_init(nvshmem_transport_t *t, struct nvshmemi_cuda_fn_table *table,
transport->host_ops.amo = NULL;
transport->host_ops.fence = NULL;
transport->host_ops.quiet = NULL;
transport->host_ops.enforce_cst = NULL;
transport->host_ops.add_device_remote_mem_handles =
nvshmemt_ibgda_add_device_remote_mem_handles;
transport->host_ops.put_signal = NULL;
Expand Down
1 change: 0 additions & 1 deletion src/modules/transport/ibrc/ibrc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1802,7 +1802,6 @@ int nvshmemt_init(nvshmem_transport_t *t, struct nvshmemi_cuda_fn_table *table,
transport->host_ops.progress = nvshmemt_ibrc_progress;
transport->host_ops.put_signal = nvshmemt_put_signal;

transport->host_ops.enforce_cst = nvshmemt_ibrc_enforce_cst_at_target;
#if !defined(NVSHMEM_PPC64LE) && !defined(NVSHMEM_AARCH64)
if (!use_gdrcopy)
#endif
Expand Down
Loading