From 0af50a57431b549fc0abbfa64b55940be7bb2d68 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 20 Dec 2024 15:52:26 -0700 Subject: [PATCH 01/21] Add shutdown_socket() --- core/federated/network/socket_common.c | 31 +++++++++++++++++++ .../core/federated/network/socket_common.h | 9 ++++++ 2 files changed, 40 insertions(+) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 20fff0585..4fd5f319b 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -410,3 +410,34 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* } } } + +int shutdown_socket(int* socket, bool read_before_closing) { + if (!read_before_closing) { + if (shutdown(*socket, SHUT_RDWR)) { + lf_print_log("On shut down TCP socket, received reply: %s", strerror(errno)); + return -1; + } + } else { + // According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket, + // the close should happen when receiving a 0 length message from the other end. + // Here, we just signal the other side that no further writes to the socket are + // forthcoming, which should result in the other end getting a zero-length reception. + if (shutdown(*socket, SHUT_WR)) { + lf_print_log("On shut down TCP socket, received reply: %s", strerror(errno)); + return -1; + } + + // Wait for the other end to send an EOF or a socket error to occur. + // Discard any incoming bytes. Normally, this read should return 0 because + // the federate is resigning and should itself invoke shutdown. + unsigned char buffer[10]; + while (read(*socket, buffer, 10) > 0) + ; + } + if (close(*socket)) { + lf_print_log("Error while closing socket: %s\n", strerror(errno)); + return -1; + } + *socket = -1; + return 0; +} diff --git a/include/core/federated/network/socket_common.h b/include/core/federated/network/socket_common.h index 71570c125..de22bf4b3 100644 --- a/include/core/federated/network/socket_common.h +++ b/include/core/federated/network/socket_common.h @@ -240,4 +240,13 @@ int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, char* format, ...); +/** + * @brief Gracefully shuts down and closes a socket, optionally reading until EOF. + * + * @param socket Pointer to the socket descriptor to shutdown and close. + * @param read_before_closing If true, read until EOF before closing the socket. + * @return int Returns 0 on success, -1 on failure (errno will indicate the error). + */ +int shutdown_socket(int* socket, bool read_before_closing); + #endif /* SOCKET_COMMON_H */ From e5129c574db2fa98834f8fdc82c6a6f056363155 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 20 Dec 2024 16:17:50 -0700 Subject: [PATCH 02/21] Use shutdown_socket() in rti_remote.c and federate.c --- core/federated/RTI/rti_remote.c | 51 ++++---------------------- core/federated/federate.c | 21 ++--------- core/federated/network/socket_common.c | 28 ++++++++------ 3 files changed, 28 insertions(+), 72 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index c5bd02955..34075d917 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -871,14 +871,7 @@ static void handle_federate_failed(federate_info_t* my_fed) { // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; - // According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket, - // the close should happen when receiving a 0 length message from the other end. - // Here, we just signal the other side that no further writes to the socket are - // forthcoming, which should result in the other end getting a zero-length reception. - shutdown(my_fed->socket, SHUT_RDWR); - - // We can now safely close the socket. - close(my_fed->socket); // from unistd.h + shutdown_socket(&my_fed->socket, false); // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep @@ -917,21 +910,7 @@ static void handle_federate_resign(federate_info_t* my_fed) { // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; - // According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket, - // the close should happen when receiving a 0 length message from the other end. - // Here, we just signal the other side that no further writes to the socket are - // forthcoming, which should result in the other end getting a zero-length reception. - shutdown(my_fed->socket, SHUT_WR); - - // Wait for the federate to send an EOF or a socket error to occur. - // Discard any incoming bytes. Normally, this read should return 0 because - // the federate is resigning and should itself invoke shutdown. - unsigned char buffer[10]; - while (read(my_fed->socket, buffer, 10) > 0) - ; - - // We can now safely close the socket. - close(my_fed->socket); // from unistd.h + shutdown_socket(&my_fed->socket, true); // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep @@ -1030,9 +1009,7 @@ void send_reject(int* socket_id, unsigned char error_code) { lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket."); } // Close the socket. - shutdown(*socket_id, SHUT_RDWR); - close(*socket_id); - *socket_id = -1; + shutdown_socket(socket_id, false); LF_MUTEX_UNLOCK(&rti_mutex); } @@ -1420,9 +1397,7 @@ void lf_connect_to_federates(int socket_descriptor) { if (!authenticate_federate(&socket_id)) { lf_print_warning("RTI failed to authenticate the incoming federate."); // Close the socket. - shutdown(socket_id, SHUT_RDWR); - close(socket_id); - socket_id = -1; + shutdown_socket(&socket_id, false); // Ignore the federate that failed authentication. i--; continue; @@ -1490,8 +1465,7 @@ void* respond_to_erroneous_connections(void* nothing) { lf_print_warning("RTI failed to write FEDERATION_ID_DOES_NOT_MATCH to erroneous incoming connection."); } // Close the socket. - shutdown(socket_id, SHUT_RDWR); - close(socket_id); + shutdown_socket(&socket_id, false); } return NULL; } @@ -1554,21 +1528,10 @@ void wait_for_federates(int socket_descriptor) { // Shutdown and close the socket that is listening for incoming connections // so that the accept() call in respond_to_erroneous_connections returns. // That thread should then check rti->all_federates_exited and it should exit. - if (shutdown(socket_descriptor, SHUT_RDWR)) { - LF_PRINT_LOG("On shut down TCP socket, received reply: %s", strerror(errno)); - } - // NOTE: In all common TCP/IP stacks, there is a time period, - // typically between 30 and 120 seconds, called the TIME_WAIT period, - // before the port is released after this close. This is because - // the OS is preventing another program from accidentally receiving - // duplicated packets intended for this program. - close(socket_descriptor); + shutdown_socket(&socket_descriptor, false); if (rti_remote->socket_descriptor_UDP > 0) { - if (shutdown(rti_remote->socket_descriptor_UDP, SHUT_RDWR)) { - LF_PRINT_LOG("On shut down UDP socket, received reply: %s", strerror(errno)); - } - close(rti_remote->socket_descriptor_UDP); + shutdown_socket(&rti_remote->socket_descriptor_UDP, false); } } diff --git a/core/federated/federate.c b/core/federated/federate.c index 4b12f8b53..dbdb691c1 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -417,14 +417,12 @@ static void close_inbound_socket(int fed_id, int flag) { if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) { if (flag >= 0) { if (flag > 0) { - shutdown(_fed.sockets_for_inbound_p2p_connections[fed_id], SHUT_RDWR); + shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false); } else { // Have received EOF from the other end. Send EOF to the other end. - shutdown(_fed.sockets_for_inbound_p2p_connections[fed_id], SHUT_WR); + shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], true); } } - close(_fed.sockets_for_inbound_p2p_connections[fed_id]); - _fed.sockets_for_inbound_p2p_connections[fed_id] = -1; } LF_MUTEX_UNLOCK(&socket_mutex); } @@ -837,20 +835,9 @@ static void close_outbound_socket(int fed_id, int flag) { if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) { // Close the socket by sending a FIN packet indicating that no further writes // are expected. Then read until we get an EOF indication. - if (flag >= 0) { - // SHUT_WR indicates no further outgoing messages. - shutdown(_fed.sockets_for_outbound_p2p_connections[fed_id], SHUT_WR); - if (flag > 0) { - // Have not received EOF yet. read until we get an EOF or error indication. - // This compensates for delayed ACKs and disabling of Nagles algorithm - // by delaying exiting until the shutdown is complete. - unsigned char message[32]; - while (read(_fed.sockets_for_outbound_p2p_connections[fed_id], &message, 32) > 0) - ; - } + if (flag > 0) { + shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[fed_id], true); } - close(_fed.sockets_for_outbound_p2p_connections[fed_id]); - _fed.sockets_for_outbound_p2p_connections[fed_id] = -1; } if (_lf_normal_termination) { LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 4fd5f319b..6d12a8657 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -313,10 +313,7 @@ int read_from_socket_close_on_error(int* socket, size_t num_bytes, unsigned char // Read failed. // Socket has probably been closed from the other side. // Shut down and close the socket from this side. - shutdown(*socket, SHUT_RDWR); - close(*socket); - // Mark the socket closed. - *socket = -1; + shutdown_socket(socket, false); return -1; } return 0; @@ -383,10 +380,7 @@ int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* // Write failed. // Socket has probably been closed from the other side. // Shut down and close the socket from this side. - shutdown(*socket, SHUT_RDWR); - close(*socket); - // Mark the socket closed. - *socket = -1; + shutdown_socket(socket, false); } return result; } @@ -414,7 +408,7 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* int shutdown_socket(int* socket, bool read_before_closing) { if (!read_before_closing) { if (shutdown(*socket, SHUT_RDWR)) { - lf_print_log("On shut down TCP socket, received reply: %s", strerror(errno)); + lf_print_warning("On shut down TCP socket, received reply: %s", strerror(errno)); return -1; } } else { @@ -422,20 +416,32 @@ int shutdown_socket(int* socket, bool read_before_closing) { // the close should happen when receiving a 0 length message from the other end. // Here, we just signal the other side that no further writes to the socket are // forthcoming, which should result in the other end getting a zero-length reception. + + // Close the socket by sending a FIN packet indicating that no further writes + // are expected. Then read until we get an EOF indication. if (shutdown(*socket, SHUT_WR)) { - lf_print_log("On shut down TCP socket, received reply: %s", strerror(errno)); + lf_print_warning("On shut down socket, received reply: %s", strerror(errno)); return -1; } // Wait for the other end to send an EOF or a socket error to occur. // Discard any incoming bytes. Normally, this read should return 0 because // the federate is resigning and should itself invoke shutdown. + + // Have not received EOF yet. read until we get an EOF or error indication. + // This compensates for delayed ACKs and disabling of Nagles algorithm + // by delaying exiting until the shutdown is complete. unsigned char buffer[10]; while (read(*socket, buffer, 10) > 0) ; } + // NOTE: In all common TCP/IP stacks, there is a time period, + // typically between 30 and 120 seconds, called the TIME_WAIT period, + // before the port is released after this close. This is because + // the OS is preventing another program from accidentally receiving + // duplicated packets intended for this program. if (close(*socket)) { - lf_print_log("Error while closing socket: %s\n", strerror(errno)); + lf_print_warning("Error while closing socket: %s\n", strerror(errno)); return -1; } *socket = -1; From 1b00ed37f915cb97ac33c6428a4045fc0d6ea8b7 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 20 Dec 2024 16:26:50 -0700 Subject: [PATCH 03/21] Fix comments. --- core/federated/network/socket_common.c | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 6d12a8657..330db1b27 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -412,25 +412,19 @@ int shutdown_socket(int* socket, bool read_before_closing) { return -1; } } else { - // According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket, - // the close should happen when receiving a 0 length message from the other end. - // Here, we just signal the other side that no further writes to the socket are - // forthcoming, which should result in the other end getting a zero-length reception. - - // Close the socket by sending a FIN packet indicating that no further writes - // are expected. Then read until we get an EOF indication. + // Signal the other side that no further writes are expected by sending a FIN packet. + // This indicates the write direction is closed. For more details, refer to: + // https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket if (shutdown(*socket, SHUT_WR)) { - lf_print_warning("On shut down socket, received reply: %s", strerror(errno)); + lf_print_warning("Failed to shut down socket: %s", strerror(errno)); return -1; } - // Wait for the other end to send an EOF or a socket error to occur. - // Discard any incoming bytes. Normally, this read should return 0 because - // the federate is resigning and should itself invoke shutdown. - - // Have not received EOF yet. read until we get an EOF or error indication. - // This compensates for delayed ACKs and disabling of Nagles algorithm - // by delaying exiting until the shutdown is complete. + // Wait for the other side to send an EOF or encounter a socket error. + // Discard any incoming bytes. Normally, this read should return 0, indicating the peer has also closed the + // connection. + // This compensates for delayed ACKs and scenarios where Nagle's algorithm is disabled, ensuring the shutdown + // completes gracefully. unsigned char buffer[10]; while (read(*socket, buffer, 10) > 0) ; From de2fbdeed88173fe3d2d761c497ce23be3a6a6d2 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 20 Dec 2024 17:11:55 -0700 Subject: [PATCH 04/21] Fix port type && change to print_log on shutdown failures && Refactor close_inbound_socket() --- core/federated/federate.c | 25 ++++++++----------------- core/federated/network/socket_common.c | 8 ++++---- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index dbdb691c1..2fcadda50 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -403,26 +403,15 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen /** * Close the socket that receives incoming messages from the - * specified federate ID. This function should be called when a read - * of incoming socket fails or when an EOF is received. - * It can also be called when the receiving end wants to stop communication, - * in which case, flag should be 1. + * specified federate ID. * * @param fed_id The ID of the peer federate sending messages to this * federate. - * @param flag 0 if an EOF was received, -1 if a socket error occurred, 1 otherwise. */ -static void close_inbound_socket(int fed_id, int flag) { +static void close_inbound_socket(int fed_id) { LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) { - if (flag >= 0) { - if (flag > 0) { - shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false); - } else { - // Have received EOF from the other end. Send EOF to the other end. - shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], true); - } - } + shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false); } LF_MUTEX_UNLOCK(&socket_mutex); } @@ -663,7 +652,7 @@ static int handle_tagged_message(int* socket, int fed_id) { env->current_tag.time - start_time, env->current_tag.microstep, intended_tag.time - start_time, intended_tag.microstep); // Close socket, reading any incoming data and discarding it. - close_inbound_socket(fed_id, 1); + close_inbound_socket(fed_id); } else { // Need to use intended_tag here, not actual_tag, so that STP violations are detected. // It will become actual_tag (that is when the reactions will be invoked). @@ -1640,7 +1629,7 @@ void lf_terminate_execution(environment_t* env) { LF_PRINT_DEBUG("Closing incoming P2P sockets."); // Close any incoming P2P sockets that are still open. for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - close_inbound_socket(i, 1); + close_inbound_socket(i); // Ignore errors. Mark the socket closed. _fed.sockets_for_inbound_p2p_connections[i] = -1; } @@ -1930,9 +1919,11 @@ void lf_connect_to_rti(const char* hostname, int port) { void lf_create_server(int specified_port) { assert(specified_port <= UINT16_MAX && specified_port >= 0); - if (create_TCP_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, false)) { + uint16_t port; + if (create_TCP_server(specified_port, &_fed.server_socket, &port, false)) { lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno)); }; + _fed.server_port = (int)port; LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port); // Send the server port number to the RTI diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 330db1b27..92bfec84b 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -155,7 +155,7 @@ static int create_server(uint16_t port, int* final_socket, uint16_t* final_port, return -1; } set_socket_timeout_option(socket_descriptor, &timeout_time); - int used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry); + uint16_t used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry); if (sock_type == 0) { // Enable listening for socket connections. // The second argument is the maximum number of queued socket requests, @@ -408,7 +408,7 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* int shutdown_socket(int* socket, bool read_before_closing) { if (!read_before_closing) { if (shutdown(*socket, SHUT_RDWR)) { - lf_print_warning("On shut down TCP socket, received reply: %s", strerror(errno)); + lf_print_log("On shutdown socket, received reply: %s", strerror(errno)); return -1; } } else { @@ -416,7 +416,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { // This indicates the write direction is closed. For more details, refer to: // https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket if (shutdown(*socket, SHUT_WR)) { - lf_print_warning("Failed to shut down socket: %s", strerror(errno)); + lf_print_log("Failed to shutdown socket: %s", strerror(errno)); return -1; } @@ -435,7 +435,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { // the OS is preventing another program from accidentally receiving // duplicated packets intended for this program. if (close(*socket)) { - lf_print_warning("Error while closing socket: %s\n", strerror(errno)); + lf_print_log("Error while closing socket: %s\n", strerror(errno)); return -1; } *socket = -1; From 9ea9f1d8484e92f1e9b673d99b645f89e49d1d0f Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 20 Dec 2024 17:16:52 -0700 Subject: [PATCH 05/21] Refactor close_outbound_socket(). --- core/federated/federate.c | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 2fcadda50..e7face46f 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -814,22 +814,22 @@ static void* listen_to_federates(void* _args) { * if _lf_normal_termination is true and otherwise proceeds without the lock. * @param fed_id The ID of the peer federate receiving messages from this * federate, or -1 if the RTI (centralized coordination). - * @param flag 0 if the socket has received EOF, 1 if not, -1 if abnormal termination. */ -static void close_outbound_socket(int fed_id, int flag) { +static void close_outbound_socket(int fed_id) { assert(fed_id >= 0 && fed_id < NUMBER_OF_FEDERATES); + // Close outbound connections, in case they have not closed themselves. + // This will result in EOF being sent to the remote federate, except for + // abnormal termination, in which case it will just close the socket. if (_lf_normal_termination) { LF_MUTEX_LOCK(&lf_outbound_socket_mutex); - } - if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) { - // Close the socket by sending a FIN packet indicating that no further writes - // are expected. Then read until we get an EOF indication. - if (flag > 0) { + if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) { + // Close the socket by sending a FIN packet indicating that no further writes + // are expected. Then read until we get an EOF indication. shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[fed_id], true); } - } - if (_lf_normal_termination) { LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); + } else { + shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[fed_id], false); } } @@ -1643,8 +1643,7 @@ void lf_terminate_execution(environment_t* env) { // Close outbound connections, in case they have not closed themselves. // This will result in EOF being sent to the remote federate, except for // abnormal termination, in which case it will just close the socket. - int flag = _lf_normal_termination ? 1 : -1; - close_outbound_socket(i, flag); + close_outbound_socket(i); } LF_PRINT_DEBUG("Waiting for inbound p2p socket listener threads."); From 996896c05bad0d601d87f02a10b8be6cb19b305a Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 20 Dec 2024 18:23:59 -0700 Subject: [PATCH 06/21] Add comment to shutdown_socket function. --- include/core/federated/network/socket_common.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/core/federated/network/socket_common.h b/include/core/federated/network/socket_common.h index de22bf4b3..14452b65a 100644 --- a/include/core/federated/network/socket_common.h +++ b/include/core/federated/network/socket_common.h @@ -242,7 +242,9 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* /** * @brief Gracefully shuts down and closes a socket, optionally reading until EOF. - * + * Shutdown and close the socket. If read_before_closing is false, it just immediately calls shutdown() with SHUT_RDWR + * and close(). If read_before_closing is true, it calls shutdown with SHUT_WR, only disallowing further writing. Then, + * it calls read() until EOF is received, and discards all received bytes. * @param socket Pointer to the socket descriptor to shutdown and close. * @param read_before_closing If true, read until EOF before closing the socket. * @return int Returns 0 on success, -1 on failure (errno will indicate the error). From e3b0ea1ffcdb85421e50a194034bc07c8cb6f178 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 20 Dec 2024 20:56:20 -0700 Subject: [PATCH 07/21] Add commnets. --- core/federated/RTI/rti_remote.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index cc654e3e0..4e5778b2e 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1008,7 +1008,7 @@ void send_reject(int* socket_id, unsigned char error_code) { if (write_to_socket(*socket_id, 2, response)) { lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket."); } - // Close the socket. + // Close the socket without reading until EOF. shutdown_socket(socket_id, false); LF_MUTEX_UNLOCK(&rti_mutex); } @@ -1394,7 +1394,7 @@ void lf_connect_to_federates(int socket_descriptor) { if (rti_remote->authentication_enabled) { if (!authenticate_federate(&socket_id)) { lf_print_warning("RTI failed to authenticate the incoming federate."); - // Close the socket. + // Close the socket without reading until EOF. shutdown_socket(&socket_id, false); // Ignore the federate that failed authentication. i--; @@ -1462,7 +1462,7 @@ void* respond_to_erroneous_connections(void* nothing) { if (write_to_socket(socket_id, 2, response)) { lf_print_warning("RTI failed to write FEDERATION_ID_DOES_NOT_MATCH to erroneous incoming connection."); } - // Close the socket. + // Close the socket without reading until EOF. shutdown_socket(&socket_id, false); } return NULL; From 30a94469b76205fc65f04bba36e4f1717c80e37f Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 21 Dec 2024 14:27:51 -0700 Subject: [PATCH 08/21] Properly replace close() to shutdown_socket() --- core/federated/RTI/rti_remote.c | 14 ++++++-------- core/federated/federate.c | 6 +++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 4e5778b2e..63d266f05 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -939,7 +939,11 @@ void* federate_info_thread_TCP(void* fed) { // Socket is closed lf_print_error("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id); my_fed->enclave.state = NOT_CONNECTED; - my_fed->socket = -1; + // Nothing more to do. Close the socket and exit. + // Prevent multiple threads from closing the same socket at the same time. + LF_MUTEX_LOCK(&rti_mutex); + shutdown_socket(my_fed->socket, false); // from unistd.h + LF_MUTEX_UNLOCK(&rti_mutex); // FIXME: We need better error handling here, but do not stop execution here. break; } @@ -989,12 +993,6 @@ void* federate_info_thread_TCP(void* fed) { } } } - - // Nothing more to do. Close the socket and exit. - // Prevent multiple threads from closing the same socket at the same time. - LF_MUTEX_LOCK(&rti_mutex); - close(my_fed->socket); // from unistd.h - LF_MUTEX_UNLOCK(&rti_mutex); return NULL; } @@ -1445,7 +1443,7 @@ void* respond_to_erroneous_connections(void* nothing) { while (true) { // Wait for an incoming connection request. // The following will block until either a federate attempts to connect - // or close(rti->socket_descriptor_TCP) is called. + // or shutdown_socket(rti->socket_descriptor_TCP) is called. int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1); if (socket_id < 0) { return NULL; diff --git a/core/federated/federate.c b/core/federated/federate.c index 13da24aa0..3b135ea9f 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1993,7 +1993,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Ignore errors on this response. write_to_socket(socket_id, 2, response); } - close(socket_id); + shutdown_socket(socket_id, false); continue; } @@ -2013,7 +2013,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Ignore errors on this response. write_to_socket(socket_id, 2, response); } - close(socket_id); + shutdown_socket(socket_id, false); continue; } @@ -2051,7 +2051,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Failed to create a listening thread. LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[remote_fed_id] != -1) { - close(socket_id); + shutdown_socket(socket_id, false); _fed.sockets_for_inbound_p2p_connections[remote_fed_id] = -1; } LF_MUTEX_UNLOCK(&socket_mutex); From 276d7b4d1e635e1f2c5360d68155389717d281aa Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 21 Dec 2024 14:32:30 -0700 Subject: [PATCH 09/21] Minor fix. --- core/federated/RTI/rti_remote.c | 4 ++-- core/federated/federate.c | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 63d266f05..42d2be3b7 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -942,7 +942,7 @@ void* federate_info_thread_TCP(void* fed) { // Nothing more to do. Close the socket and exit. // Prevent multiple threads from closing the same socket at the same time. LF_MUTEX_LOCK(&rti_mutex); - shutdown_socket(my_fed->socket, false); // from unistd.h + shutdown_socket(&my_fed->socket, false); // from unistd.h LF_MUTEX_UNLOCK(&rti_mutex); // FIXME: We need better error handling here, but do not stop execution here. break; @@ -1007,7 +1007,7 @@ void send_reject(int* socket_id, unsigned char error_code) { lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket."); } // Close the socket without reading until EOF. - shutdown_socket(socket_id, false); + shutdown_socket(&socket_id, false); LF_MUTEX_UNLOCK(&rti_mutex); } diff --git a/core/federated/federate.c b/core/federated/federate.c index 3b135ea9f..5af2b1f51 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1993,7 +1993,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Ignore errors on this response. write_to_socket(socket_id, 2, response); } - shutdown_socket(socket_id, false); + shutdown_socket(&socket_id, false); continue; } @@ -2013,7 +2013,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Ignore errors on this response. write_to_socket(socket_id, 2, response); } - shutdown_socket(socket_id, false); + shutdown_socket(&socket_id, false); continue; } @@ -2051,7 +2051,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Failed to create a listening thread. LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[remote_fed_id] != -1) { - shutdown_socket(socket_id, false); + shutdown_socket(&socket_id, false); _fed.sockets_for_inbound_p2p_connections[remote_fed_id] = -1; } LF_MUTEX_UNLOCK(&socket_mutex); From 93adf2ee3e8c0cedb2dd04656c05b91b144ce6e4 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 21 Dec 2024 14:57:53 -0700 Subject: [PATCH 10/21] Minor fix. --- core/federated/RTI/rti_remote.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 42d2be3b7..8e49c4c88 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1007,7 +1007,7 @@ void send_reject(int* socket_id, unsigned char error_code) { lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket."); } // Close the socket without reading until EOF. - shutdown_socket(&socket_id, false); + shutdown_socket(socket_id, false); LF_MUTEX_UNLOCK(&rti_mutex); } From d27c7430bcef0c37ee382aa3b48aa1c84cd0ef93 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 21 Dec 2024 14:59:10 -0700 Subject: [PATCH 11/21] Properly close federate's socket connected to the RTI. --- core/federated/federate.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 5af2b1f51..c277f54bb 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1493,7 +1493,7 @@ static void* listen_to_rti_TCP(void* args) { lf_print_error("Socket connection to the RTI was closed by the RTI without" " properly sending an EOF first. Considering this a soft error."); // FIXME: If this happens, possibly a new RTI must be elected. - _fed.socket_TCP_RTI = -1; + shutdown_socket(_fed.socket_TCP_RTI, false); return NULL; } else { lf_print_error("Socket connection to the RTI has been broken with error %d: %s." @@ -1501,13 +1501,13 @@ static void* listen_to_rti_TCP(void* args) { " Considering this a soft error.", errno, strerror(errno)); // FIXME: If this happens, possibly a new RTI must be elected. - _fed.socket_TCP_RTI = -1; + shutdown_socket(_fed.socket_TCP_RTI, false); return NULL; } } else if (read_failed > 0) { // EOF received. lf_print("Connection to the RTI closed with an EOF."); - _fed.socket_TCP_RTI = -1; + shutdown_socket(_fed.socket_TCP_RTI, false); return NULL; } switch (buffer[0]) { From ef3345b9a7809616a40939f214ab06a5bb9ec61a Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 21 Dec 2024 15:00:37 -0700 Subject: [PATCH 12/21] Minor fix. --- core/federated/federate.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index c277f54bb..18bd244f5 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1493,7 +1493,7 @@ static void* listen_to_rti_TCP(void* args) { lf_print_error("Socket connection to the RTI was closed by the RTI without" " properly sending an EOF first. Considering this a soft error."); // FIXME: If this happens, possibly a new RTI must be elected. - shutdown_socket(_fed.socket_TCP_RTI, false); + shutdown_socket(&_fed.socket_TCP_RTI, false); return NULL; } else { lf_print_error("Socket connection to the RTI has been broken with error %d: %s." @@ -1501,13 +1501,13 @@ static void* listen_to_rti_TCP(void* args) { " Considering this a soft error.", errno, strerror(errno)); // FIXME: If this happens, possibly a new RTI must be elected. - shutdown_socket(_fed.socket_TCP_RTI, false); + shutdown_socket(&_fed.socket_TCP_RTI, false); return NULL; } } else if (read_failed > 0) { // EOF received. lf_print("Connection to the RTI closed with an EOF."); - shutdown_socket(_fed.socket_TCP_RTI, false); + shutdown_socket(&_fed.socket_TCP_RTI, false); return NULL; } switch (buffer[0]) { From c0d59dbb595eb13b6a40dbe157ba0c66f63ba3e2 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Mon, 13 Jan 2025 11:05:05 -0700 Subject: [PATCH 13/21] Fix merge error. --- core/federated/federate.c | 1 - 1 file changed, 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 05339f331..51b9c2602 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1920,7 +1920,6 @@ void lf_create_server(int specified_port) { if (create_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, TCP, false)) { lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno)); }; - _fed.server_port = (int)port; LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port); // Send the server port number to the RTI From 96b146affca7326386c07bec514ee39acc859924 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Wed, 15 Jan 2025 17:07:24 -0700 Subject: [PATCH 14/21] Fix rti_socket check. --- core/federated/network/socket_common.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 587318a2f..a449fdc71 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -203,7 +203,7 @@ int accept_socket(int socket, int rti_socket) { lf_print_error_system_failure("Firewall permissions prohibit connection."); } else { // For the federates, it should check if the rti_socket is still open, before retrying accept(). - if (rti_socket == -1) { + if (rti_socket != -1) { if (check_socket_closed(rti_socket)) { break; } From e93f266b6a88d1d9b7d6d7c0f39d1ad93e03693d Mon Sep 17 00:00:00 2001 From: Dongha Kim <74869052+Jakio815@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:54:18 -0700 Subject: [PATCH 15/21] Add socket == -1 checking. --- core/federated/network/socket_common.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index a449fdc71..d71438217 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -398,6 +398,10 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* } int shutdown_socket(int* socket, bool read_before_closing) { + if (*socket == -1) { + lf_print_log("Socket is already closed."); + return 0; + } if (!read_before_closing) { if (shutdown(*socket, SHUT_RDWR)) { lf_print_log("On shutdown socket, received reply: %s", strerror(errno)); From 0f89b4c5fcc582479145a26fbe4b610f650dfb00 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 23 Jan 2025 18:02:52 -0700 Subject: [PATCH 16/21] Remove returning -1 on shutdown fail. --- core/federated/network/socket_common.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index a449fdc71..acd5594f8 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -401,7 +401,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { if (!read_before_closing) { if (shutdown(*socket, SHUT_RDWR)) { lf_print_log("On shutdown socket, received reply: %s", strerror(errno)); - return -1; + goto close_socket; // Try closing socket. } } else { // Signal the other side that no further writes are expected by sending a FIN packet. @@ -409,7 +409,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { // https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket if (shutdown(*socket, SHUT_WR)) { lf_print_log("Failed to shutdown socket: %s", strerror(errno)); - return -1; + goto close_socket; // Try closing socket. } // Wait for the other side to send an EOF or encounter a socket error. @@ -421,6 +421,8 @@ int shutdown_socket(int* socket, bool read_before_closing) { while (read(*socket, buffer, 10) > 0) ; } + +close_socket: // Label to jump to the closing part of the function // NOTE: In all common TCP/IP stacks, there is a time period, // typically between 30 and 120 seconds, called the TIME_WAIT period, // before the port is released after this close. This is because From 62354ab5e4d2de6a6e2aa13ec2ade1415de05784 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 23 Jan 2025 18:23:59 -0700 Subject: [PATCH 17/21] Formatting. --- core/federated/network/socket_common.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index e4a636432..e2b20660f 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -405,7 +405,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { if (!read_before_closing) { if (shutdown(*socket, SHUT_RDWR)) { lf_print_log("On shutdown socket, received reply: %s", strerror(errno)); - goto close_socket; // Try closing socket. + goto close_socket; // Try closing socket. } } else { // Signal the other side that no further writes are expected by sending a FIN packet. @@ -413,7 +413,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { // https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket if (shutdown(*socket, SHUT_WR)) { lf_print_log("Failed to shutdown socket: %s", strerror(errno)); - goto close_socket; // Try closing socket. + goto close_socket; // Try closing socket. } // Wait for the other side to send an EOF or encounter a socket error. @@ -426,7 +426,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { ; } -close_socket: // Label to jump to the closing part of the function +close_socket: // Label to jump to the closing part of the function // NOTE: In all common TCP/IP stacks, there is a time period, // typically between 30 and 120 seconds, called the TIME_WAIT period, // before the port is released after this close. This is because From 1edd1e3cabe6a1f5b9b091a51f321046ce074a5b Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 24 Jan 2025 15:00:49 -0700 Subject: [PATCH 18/21] Fix comments. --- include/core/federated/network/socket_common.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/core/federated/network/socket_common.h b/include/core/federated/network/socket_common.h index c273b4b74..87bac7511 100644 --- a/include/core/federated/network/socket_common.h +++ b/include/core/federated/network/socket_common.h @@ -241,13 +241,12 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* char* format, ...); /** - * @brief Gracefully shuts down and closes a socket, optionally reading until EOF. * Shutdown and close the socket. If read_before_closing is false, it just immediately calls shutdown() with SHUT_RDWR * and close(). If read_before_closing is true, it calls shutdown with SHUT_WR, only disallowing further writing. Then, * it calls read() until EOF is received, and discards all received bytes. * @param socket Pointer to the socket descriptor to shutdown and close. * @param read_before_closing If true, read until EOF before closing the socket. - * @return int Returns 0 on success, -1 on failure (errno will indicate the error). + * @return int 0 for success and -1 for an error. */ int shutdown_socket(int* socket, bool read_before_closing); From 8334783933ce07023675f2cc238c3c85fe45bb2d Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 24 Jan 2025 19:43:22 -0700 Subject: [PATCH 19/21] Fix to return -1 on failure of tagged_message from federate. --- core/federated/federate.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 51b9c2602..9da6a2ae1 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -411,7 +411,7 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen static void close_inbound_socket(int fed_id) { LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) { - shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false); + shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], true); } LF_MUTEX_UNLOCK(&socket_mutex); } @@ -653,6 +653,8 @@ static int handle_tagged_message(int* socket, int fed_id) { intended_tag.microstep); // Close socket, reading any incoming data and discarding it. close_inbound_socket(fed_id); + LF_MUTEX_UNLOCK(&env->mutex); + return -1; } else { // Need to use intended_tag here, not actual_tag, so that STP violations are detected. // It will become actual_tag (that is when the reactions will be invoked). From 048a5b3b3991d7ce1c587b2b63b48d6a65937ea7 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 24 Jan 2025 20:27:01 -0700 Subject: [PATCH 20/21] Fix missing comment. --- core/federated/federate.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 9da6a2ae1..c60c7408f 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -403,7 +403,9 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen /** * Close the socket that receives incoming messages from the - * specified federate ID. + * specified federate ID. This function should be called when a read + * of incoming socket fails or when an EOF is received. + * It can also be called when the receiving end wants to stop communication. * * @param fed_id The ID of the peer federate sending messages to this * federate. From 5d348586aa77fd13df6b0a856c2b11eed5f3de9d Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 24 Jan 2025 22:03:02 -0700 Subject: [PATCH 21/21] Revert close_inbound_socket to read false. --- core/federated/federate.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index c60c7408f..a30f733c8 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -413,7 +413,7 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen static void close_inbound_socket(int fed_id) { LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) { - shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], true); + shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false); } LF_MUTEX_UNLOCK(&socket_mutex); }