Skip to content

Commit

Permalink
Merge pull request #92 from eisenhauer/LongMsgs
Browse files Browse the repository at this point in the history
Fix Long Message functionality
  • Loading branch information
eisenhauer authored Jan 23, 2025
2 parents 5ed8f25 + bb363a0 commit 7f19762
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ cmake_minimum_required(VERSION 3.14)
# GTKorvo
set(CMAKE_DIRECTORY_LABELS EVPath)

project(EVPath VERSION 4.5.5 LANGUAGES C CXX)
project(EVPath VERSION 4.5.6 LANGUAGES C CXX)

# Some boilerplate to setup nice output directories
include(GNUInstallDirs)
Expand Down
20 changes: 11 additions & 9 deletions cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2311,10 +2311,11 @@ timeout_conn(CManager cm, void *client_data)
byte_swap = 1;
case 0x004d4400: /* CMD\0 */
break;
case 0x00444d01: /* \1DMC reversed byte order long msg*/
case 0x00424d00: /* \0BMC reversed byte order long msg*/
byte_swap = 1;
case 0x004d4401: /* CMD\1 long msg*/
case 0x004d4200: /* CMB\0 long msg*/
short_length = 0;
get_attrs = 1;
break;
case 0x00414d00: /* \0AMC reversed byte order */
byte_swap = 1;
Expand Down Expand Up @@ -2390,7 +2391,8 @@ timeout_conn(CManager cm, void *client_data)
header_len = 16;
}
if (!short_length) {
header_len += 4; /* extra data length bytes */
header_len = 16; /* extra data length bytes */
skip = 0;
}
} else {
if (short_length) {
Expand Down Expand Up @@ -2451,6 +2453,7 @@ timeout_conn(CManager cm, void *client_data)
((char*)&tmp)[1] = base[6];
((char*)&tmp)[2] = base[5];
((char*)&tmp)[3] = base[4];
data_length += tmp;
if (header_len != 12) {
((char*)&attr_length)[0] = base[11];
((char*)&attr_length)[1] = base[10];
Expand All @@ -2463,10 +2466,10 @@ timeout_conn(CManager cm, void *client_data)
#else
checksum = (unsigned char) check_sum_base[0];
#endif
data_length = ((int64_t)(((int *) base)[0])) << 32;
data_length += ((int *) base)[1];
data_length = ((int64_t)(((unsigned int *) base)[1])) << 32;
data_length += ((unsigned int *) base)[0];
if (header_len != 12) {
attr_length = ((int *) base)[1];
attr_length = ((int *) base)[2];
}
}
}
Expand Down Expand Up @@ -3258,9 +3261,8 @@ INT_CMregister_invalid_message_handler(CManager cm, CMUnregCMHandler handler)
void *header_ptr = NULL;
int header_len = 0;
int no_attr_header[2] = {0x434d4400, 0}; /* CMD\0 in first entry */
// not yet impl int no_attr_long_header[4] = {0x434d4401, 0x434d4401, 0, 0}; /* CMD\1 in first entry, pad to 16 */
int attr_header[4] = {0x434d4100, 0x434d4100, 0, 0}; /* CMA\0 in first entry, pad to 16 */
int attr_long_header[4] = {0x434d4101, 0, 0, 0}; /* CMA\1 in first entry */
int attr_long_header[4] = {0x434d4200, 0, 0, 0}; /* CMB\0 in first entry */
FFSEncodeVector vec;
size_t length = 0, vec_count = 0, actual;
int do_write = 1;
Expand Down Expand Up @@ -3336,7 +3338,7 @@ INT_CMregister_invalid_message_handler(CManager cm, CMUnregCMHandler handler)
length += vec[vec_count].iov_len;
vec_count++;
}
if ((length & 0x7fffffff) == 0) {
if (length > 0x7fffffff) {
long_message = 1;
}
if (attrs != NULL) {
Expand Down
2 changes: 1 addition & 1 deletion cm_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ typedef attr_list (*CMTransport_listen_func)(CManager cm,
typedef void *(*CMTransport_read_block_func)(CMtrans_services svc,
void *conn_data,
ssize_t *actual, ssize_t *offset);
typedef int (*CMTransport_read_to_buffer_func)(CMtrans_services svc,
typedef ssize_t (*CMTransport_read_to_buffer_func)(CMtrans_services svc,
void *conn_data,
void *buffer,
ssize_t len, int block_flag);
Expand Down
44 changes: 8 additions & 36 deletions cmfabric.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,7 @@ static atom_t CM_IP_INTERFACE = -1;
static atom_t CM_TRANSPORT = -1;

static int
check_host(hostname, sin_addr)
char *hostname;
void *sin_addr;
check_host(char *hostname, void *sin_addr)
{
struct hostent *host_addr;
host_addr = gethostbyname(hostname);
Expand Down Expand Up @@ -1369,9 +1367,7 @@ fabric_service_incoming(void *void_trans, void *void_eq)
}

extern void
libcmfabric_LTX_shutdown_conn(svc, fcd)
CMtrans_services svc;
fabric_conn_data_ptr fcd;
libcmfabric_LTX_shutdown_conn(CMtrans_services svc, fabric_conn_data_ptr fcd)
{
svc->trace_out(fcd->fabd->cm, "CMFABRIC shutdown_conn, removing select %d\n",
fcd->fd);
Expand Down Expand Up @@ -1507,14 +1503,8 @@ static int client_connect(CManager cm, CMtrans_services svc, transport_entry tra
}

static int
initiate_conn(cm, svc, trans, attrs, fcd, conn_attr_list, no_more_redirect)
CManager cm;
CMtrans_services svc;
transport_entry trans;
attr_list attrs;
fabric_conn_data_ptr fcd;
attr_list conn_attr_list;
int no_more_redirect;
initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs,
fabric_conn_data_ptr fcd, attr_list conn_attr_list, int no_more_redirect)
{
int int_port_num;
fabric_client_data_ptr fabd = (fabric_client_data_ptr) trans->trans_data;
Expand Down Expand Up @@ -1599,11 +1589,7 @@ int no_more_redirect;
* (name_str stores the machine name).
*/
extern CMConnection
libcmfabric_LTX_initiate_conn(cm, svc, trans, attrs)
CManager cm;
CMtrans_services svc;
transport_entry trans;
attr_list attrs;
libcmfabric_LTX_initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs)
{
fabric_conn_data_ptr fcd = create_fabric_conn_data(svc);
attr_list conn_attr_list = create_attr_list();
Expand Down Expand Up @@ -1690,12 +1676,7 @@ libcmfabric_LTX_self_check(CManager cm, CMtrans_services svc, transport_entry tr
}

extern int
libcmfabric_LTX_connection_eq(cm, svc, trans, attrs, fcd)
CManager cm;
CMtrans_services svc;
transport_entry trans;
attr_list attrs;
fabric_conn_data_ptr fcd;
libcmfabric_LTX_connection_eq(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs, fabric_conn_data_ptr fcd)
{

int int_port_num;
Expand Down Expand Up @@ -2269,11 +2250,7 @@ struct iovec {
#endif

extern void
libcmfabric_LTX_set_write_notify(trans, svc, fcd, enable)
transport_entry trans;
CMtrans_services svc;
fabric_conn_data_ptr fcd;
int enable;
libcmfabric_LTX_set_write_notify(transport_entry trans, CMtrans_services svc, fabric_conn_data_ptr fcd, int enable)
{
if (enable != 0) {
svc->fd_write_select(trans->cm, fcd->fd, (select_list_func) trans->write_possible,
Expand Down Expand Up @@ -2452,12 +2429,7 @@ libcmfabric_LTX_writev_complete_notify_func(CMtrans_services svc,
}

extern int
libcmfabric_LTX_writev_func(svc, fcd, iovs, iovcnt, attrs)
CMtrans_services svc;
fabric_conn_data_ptr fcd;
void *iovs;
int iovcnt;
attr_list attrs;
libcmfabric_LTX_writev_func(CMtrans_services svc, fabric_conn_data_ptr fcd, void *iovs, int iovcnt, attr_list attrs)
{
return libcmfabric_LTX_writev_complete_notify_func(svc, fcd, iovs, iovcnt,
attrs, NULL, NULL);
Expand Down
39 changes: 28 additions & 11 deletions cmsockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,12 @@ set_block_state(CMtrans_services svc, socket_conn_data_ptr scd,
#endif
}

#ifndef MAX_RW_COUNT
// Not actually defined outside the kernel as far as I know - GSE
#define MAX_RW_COUNT 0x7ffff000
//#define MAX_RW_COUNT 0x3ffff000 // Be more conservative.
#endif

extern ssize_t
libcmsockets_LTX_read_to_buffer_func(CMtrans_services svc, socket_conn_data_ptr scd, void *buffer, ssize_t requested_len, int non_blocking)
{
Expand All @@ -928,7 +934,9 @@ libcmsockets_LTX_read_to_buffer_func(CMtrans_services svc, socket_conn_data_ptr
scd->fd);
set_block_state(svc, scd, Non_Block);
}
iget = read(scd->fd, (char *) buffer, (int)requested_len);
ssize_t read_len = requested_len;
if (read_len > MAX_RW_COUNT) read_len = MAX_RW_COUNT;
iget = read(scd->fd, (char *) buffer, (int)read_len);
if ((iget == -1) || (iget == 0)) {
int lerrno = errno;
if ((lerrno != 0) &&
Expand All @@ -951,8 +959,10 @@ libcmsockets_LTX_read_to_buffer_func(CMtrans_services svc, socket_conn_data_ptr
left = requested_len - iget;
while (left > 0) {
int lerrno;
read_len = left;
if (left > MAX_RW_COUNT) read_len = MAX_RW_COUNT;
iget = read(scd->fd, (char *) buffer + requested_len - left,
(int)left);
(int)read_len);
lerrno = errno;
if (iget == -1) {
if ((lerrno != EWOULDBLOCK) &&
Expand Down Expand Up @@ -1024,29 +1034,31 @@ int iovcnt;
}
#endif

#ifndef MAX_RW_COUNT
// Not actually defined outside the kernel as far as I know - GSE
#define MAX_RW_COUNT 0x7ffff000
#endif

extern ssize_t
libcmsockets_LTX_writev_func(CMtrans_services svc, socket_conn_data_ptr scd, void *iovs, int iovcnt, attr_list attrs);

static ssize_t long_writev(CMtrans_services svc, socket_conn_data_ptr scd, struct iovec* iov, int iovcnt, attr_list attrs, ssize_t left)
{
int cur_iov_base = 0;
int cur_iov_cnt = 0;
svc->trace_out(scd->sd->cm, "CMSocket doing long writev of %zd bytes on fd %d",
left, scd->fd);
while (left > 0) {
ssize_t write_size = 0;
int ret;
while (cur_iov_cnt + cur_iov_base < iovcnt) {
cur_iov_cnt++;
#define TRAIL_BUFFER 1024
if ((write_size + iov[cur_iov_cnt + cur_iov_base -1].iov_len) + TRAIL_BUFFER > MAX_RW_COUNT) {
struct iovec saved_iov_entry = iov[cur_iov_cnt + cur_iov_base -1];
ssize_t new_iov_len = MAX_RW_COUNT - write_size - TRAIL_BUFFER; // give some buffer
iov[cur_iov_cnt + cur_iov_base -1].iov_len = new_iov_len;
int ret = libcmsockets_LTX_writev_func(svc, scd, &iov[cur_iov_base], cur_iov_cnt, attrs);
if (ret != cur_iov_cnt) return ret + cur_iov_base;
svc->trace_out(scd->sd->cm, "CMSocket doing long intermediate writev of %d buffers on fd %d",
(int)new_iov_len, scd->fd);
ret = libcmsockets_LTX_writev_func(svc, scd, &iov[cur_iov_base], cur_iov_cnt, attrs);
if (ret != cur_iov_cnt) {
return ret + cur_iov_base;
}
iov[cur_iov_cnt + cur_iov_base -1].iov_len = saved_iov_entry.iov_len - new_iov_len;
iov[cur_iov_cnt + cur_iov_base -1].iov_base += new_iov_len;
write_size += new_iov_len;
Expand All @@ -1058,10 +1070,15 @@ static ssize_t long_writev(CMtrans_services svc, socket_conn_data_ptr scd, struc
write_size += iov[cur_iov_cnt + cur_iov_base -1].iov_len;
}
}
libcmsockets_LTX_writev_func(svc, scd, &iov[cur_iov_base], cur_iov_cnt, attrs);
svc->trace_out(scd->sd->cm, "CMSocket doing long final writev of %zd bytes on fd %d",
left, scd->fd);
ret = libcmsockets_LTX_writev_func(svc, scd, &iov[cur_iov_base], cur_iov_cnt, attrs);
if (ret != cur_iov_cnt) {
return ret + cur_iov_base;
}
left -= write_size;
}
return 0;
return iovcnt;
}

extern ssize_t
Expand Down

0 comments on commit 7f19762

Please sign in to comment.