Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 50 additions & 28 deletions src/dtl/ucx_dtl.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ static ucs_status_t dyad_ucx_request_wait (dyad_dtl_ucx_t* dtl_handle,
return UCS_OK;
}

static inline dyad_rc_t dyad_dtl_ucx_finalize_impl (dyad_dtl_ucx_t** dtl_handle)
{
}

dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self,
dyad_dtl_mode_t mode,
flux_t* h,
Expand All @@ -127,6 +123,7 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self,
// Allocation/Freeing of the Flux handle should be
// handled by the DYAD context
dtl_handle->h = h;
dtl_handle->producer_rank = 0;
dtl_handle->debug = debug;
dtl_handle->ucx_ctx = NULL;
dtl_handle->ucx_worker = NULL;
Expand All @@ -135,6 +132,12 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self,
dtl_handle->consumer_address = NULL;
dtl_handle->addr_len = 0;
dtl_handle->comm_tag = 0;
dtl_handle->tag_counter = 0;

if (flux_get_rank (h, &(dtl_handle->producer_rank)) < 0) {
FLUX_LOG_ERR (h, "Could not get Flux rank in UCX DTL!\n");
goto error;
}

// Read the UCX configuration
FLUX_LOG_INFO (dtl_handle->h, "Reading UCP config\n");
Expand Down Expand Up @@ -272,23 +275,10 @@ dyad_rc_t dyad_dtl_ucx_rpc_pack (dyad_dtl_t* restrict self,
FLUX_LOG_ERR (dtl_handle->h, "Cannot get consumer rank\n");
return DYAD_RC_FLUXFAIL;
}
// The tag is a 64 bit unsigned integer consisting of the
// 32-bit rank of the producer followed by the 32-bit rank
// of the consumer
dtl_handle->comm_tag = ((uint64_t)producer_rank << 32) | (uint64_t)consumer_rank;
// Use Jansson to pack the tag and UCX address into
// the payload to be sent via RPC to the producer plugin
FLUX_LOG_INFO (dtl_handle->h, "Packing RPC payload for UCX DTL\n");
*packed_obj = json_pack ("{s:s, s:i, s:i, s:s%}",
"upath",
upath,
"tag_prod",
(int)producer_rank,
"tag_cons",
(int)consumer_rank,
"ucx_addr",
enc_buf,
enc_len);
*packed_obj = json_pack ("{s:s, s:s%}", "upath", upath, "ucx_addr", enc_buf, enc_len);
free (enc_buf);
// If the packing failed, log an error
if (*packed_obj == NULL) {
Expand All @@ -310,25 +300,17 @@ dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char
FLUX_LOG_INFO (dtl_handle->h, "Unpacking RPC payload\n");
errcode = flux_request_unpack (msg,
NULL,
"{s:s, s:i, s:i, s:s%}",
"{s:s, s:s%}",
"upath",
upath,
"tag_prod",
&tag_prod,
"tag_cons",
&tag_cons,
"ucx_addr",
&enc_addr,
&enc_addr_len);
if (errcode < 0) {
FLUX_LOG_ERR (dtl_handle->h, "Could not unpack Flux message from consumer!\n");
return DYAD_RC_BADUNPACK;
}
dtl_handle->comm_tag = ((uint64_t)tag_prod << 32) | (uint64_t)tag_cons;
FLUX_LOG_INFO (dtl_handle->h, "Obtained upath from RPC payload: %s\n", upath);
FLUX_LOG_INFO (dtl_handle->h,
"Obtained UCP tag from RPC payload: %lu\n",
dtl_handle->comm_tag);
FLUX_LOG_INFO (dtl_handle->h, "Decoding consumer UCP address using base64\n");
dtl_handle->addr_len = base64_decoded_length (enc_addr_len);
dtl_handle->consumer_address = (ucp_address_t*)malloc (dtl_handle->addr_len);
Expand All @@ -353,12 +335,52 @@ dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char

dyad_rc_t dyad_dtl_ucx_rpc_respond (dyad_dtl_t* self, const flux_msg_t* orig_msg)
{
int errcode = 0;
dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle;
dtl_handle->comm_tag =
((uint64_t)(dtl_handle->producer_rank) << 32) | (dtl_handle->tag_counter);
dtl_handle->tag_counter += 1;
errcode = flux_respond_pack (dtl_handle->h,
orig_msg,
"{s:i, s:i}",
"tag_rank",
(int)(dtl_handle->producer_rank),
"tag_count",
(int)(dtl_handle->tag_counter));
if (FLUX_IS_ERROR (errcode)) {
FLUX_LOG_ERR (dtl_handle->h, "Cannot send response to consumer");
return DYAD_RC_BADRESPONSE;
}
return DYAD_RC_OK;
}

dyad_rc_t dyad_dtl_ucx_rpc_recv_response (dyad_dtl_t* self, flux_future_t* f)
{
return DYAD_RC_OK;
dyad_rc_t rc = DYAD_RC_OK;
int errcode = 0;
uint32_t prod_rank = 0;
uint32_t tag_counter = 0;
dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle;
errcode = flux_rpc_get_unpack (f,
"{s:i, s:i}",
"tag_rank",
&prod_rank,
"tag_count",
&tag_counter);
if (FLUX_IS_ERROR (errcode)) {
FLUX_LOG_ERR (dtl_handle->h, "Could not unpack response from server's UCX DTL");
if (errno == ENODATA)
rc = DYAD_RC_RPC_FINISHED;
else
rc = DYAD_RC_BADRPC;
goto finish_recv_response;
}
dtl_handle->comm_tag = ((uint64_t)(prod_rank) << 32) | (tag_counter);
rc = DYAD_RC_OK;

finish_recv_response:
flux_future_reset (f);
return rc;
}

dyad_rc_t dyad_dtl_ucx_establish_connection (dyad_dtl_t* self,
Expand Down
2 changes: 2 additions & 0 deletions src/dtl/ucx_dtl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

struct dyad_dtl_ucx {
flux_t* h;
uint32_t producer_rank;
bool debug;
ucp_context_h ucx_ctx;
ucp_worker_h ucx_worker;
ucp_ep_h ep;
dyad_dtl_comm_mode_t curr_comm_mode;
ucp_address_t* consumer_address;
size_t addr_len;
uint32_t tag_counter;
ucp_tag_t comm_tag;
};

Expand Down