diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 7f3fd6f15..91e2fe7dc 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -900,14 +900,7 @@ static void handle_federate_failed(federate_info_t* my_fed) { // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; - // According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket, - // the close should happen when receiving a 0 length message from the other end. - // Here, we just signal the other side that no further writes to the socket are - // forthcoming, which should result in the other end getting a zero-length reception. - shutdown(my_fed->socket, SHUT_RDWR); - - // We can now safely close the socket. - close(my_fed->socket); // from unistd.h + shutdown_socket(&my_fed->socket, false); // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep @@ -946,21 +939,7 @@ static void handle_federate_resign(federate_info_t* my_fed) { // Indicate that there will no further events from this federate. my_fed->enclave.next_event = FOREVER_TAG; - // According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket, - // the close should happen when receiving a 0 length message from the other end. - // Here, we just signal the other side that no further writes to the socket are - // forthcoming, which should result in the other end getting a zero-length reception. - shutdown(my_fed->socket, SHUT_WR); - - // Wait for the federate to send an EOF or a socket error to occur. - // Discard any incoming bytes. Normally, this read should return 0 because - // the federate is resigning and should itself invoke shutdown. - unsigned char buffer[10]; - while (read(my_fed->socket, buffer, 10) > 0) - ; - - // We can now safely close the socket. - close(my_fed->socket); // from unistd.h + shutdown_socket(&my_fed->socket, true); // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep @@ -989,7 +968,11 @@ void* federate_info_thread_TCP(void* fed) { // Socket is closed lf_print_error("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id); my_fed->enclave.state = NOT_CONNECTED; - my_fed->socket = -1; + // Nothing more to do. Close the socket and exit. + // Prevent multiple threads from closing the same socket at the same time. + LF_MUTEX_LOCK(&rti_mutex); + shutdown_socket(&my_fed->socket, false); // from unistd.h + LF_MUTEX_UNLOCK(&rti_mutex); // FIXME: We need better error handling here, but do not stop execution here. break; } @@ -1039,12 +1022,6 @@ void* federate_info_thread_TCP(void* fed) { } } } - - // Nothing more to do. Close the socket and exit. - // Prevent multiple threads from closing the same socket at the same time. - LF_MUTEX_LOCK(&rti_mutex); - close(my_fed->socket); // from unistd.h - LF_MUTEX_UNLOCK(&rti_mutex); return NULL; } @@ -1058,10 +1035,8 @@ void send_reject(int* socket_id, unsigned char error_code) { if (write_to_socket(*socket_id, 2, response)) { lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket."); } - // Close the socket. - shutdown(*socket_id, SHUT_RDWR); - close(*socket_id); - *socket_id = -1; + // Close the socket without reading until EOF. + shutdown_socket(socket_id, false); LF_MUTEX_UNLOCK(&rti_mutex); } @@ -1444,10 +1419,8 @@ void lf_connect_to_federates(int socket_descriptor) { if (rti_remote->authentication_enabled) { if (!authenticate_federate(&socket_id)) { lf_print_warning("RTI failed to authenticate the incoming federate."); - // Close the socket. - shutdown(socket_id, SHUT_RDWR); - close(socket_id); - socket_id = -1; + // Close the socket without reading until EOF. + shutdown_socket(&socket_id, false); // Ignore the federate that failed authentication. i--; continue; @@ -1497,7 +1470,7 @@ void* respond_to_erroneous_connections(void* nothing) { while (true) { // Wait for an incoming connection request. // The following will block until either a federate attempts to connect - // or close(rti->socket_descriptor_TCP) is called. + // or shutdown_socket(rti->socket_descriptor_TCP) is called. int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1); if (socket_id < 0) { return NULL; @@ -1514,9 +1487,8 @@ void* respond_to_erroneous_connections(void* nothing) { if (write_to_socket(socket_id, 2, response)) { lf_print_warning("RTI failed to write FEDERATION_ID_DOES_NOT_MATCH to erroneous incoming connection."); } - // Close the socket. - shutdown(socket_id, SHUT_RDWR); - close(socket_id); + // Close the socket without reading until EOF. + shutdown_socket(&socket_id, false); } return NULL; } @@ -1579,21 +1551,10 @@ void wait_for_federates(int socket_descriptor) { // Shutdown and close the socket that is listening for incoming connections // so that the accept() call in respond_to_erroneous_connections returns. // That thread should then check rti->all_federates_exited and it should exit. - if (shutdown(socket_descriptor, SHUT_RDWR)) { - LF_PRINT_LOG("On shut down TCP socket, received reply: %s", strerror(errno)); - } - // NOTE: In all common TCP/IP stacks, there is a time period, - // typically between 30 and 120 seconds, called the TIME_WAIT period, - // before the port is released after this close. This is because - // the OS is preventing another program from accidentally receiving - // duplicated packets intended for this program. - close(socket_descriptor); + shutdown_socket(&socket_descriptor, false); if (rti_remote->socket_descriptor_UDP > 0) { - if (shutdown(rti_remote->socket_descriptor_UDP, SHUT_RDWR)) { - LF_PRINT_LOG("On shut down UDP socket, received reply: %s", strerror(errno)); - } - close(rti_remote->socket_descriptor_UDP); + shutdown_socket(&rti_remote->socket_descriptor_UDP, false); } } diff --git a/core/federated/federate.c b/core/federated/federate.c index 5ec66cdca..7eafbcd01 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -408,26 +408,15 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen * Close the socket that receives incoming messages from the * specified federate ID. This function should be called when a read * of incoming socket fails or when an EOF is received. - * It can also be called when the receiving end wants to stop communication, - * in which case, flag should be 1. + * It can also be called when the receiving end wants to stop communication. * * @param fed_id The ID of the peer federate sending messages to this * federate. - * @param flag 0 if an EOF was received, -1 if a socket error occurred, 1 otherwise. */ -static void close_inbound_socket(int fed_id, int flag) { +static void close_inbound_socket(int fed_id) { LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) { - if (flag >= 0) { - if (flag > 0) { - shutdown(_fed.sockets_for_inbound_p2p_connections[fed_id], SHUT_RDWR); - } else { - // Have received EOF from the other end. Send EOF to the other end. - shutdown(_fed.sockets_for_inbound_p2p_connections[fed_id], SHUT_WR); - } - } - close(_fed.sockets_for_inbound_p2p_connections[fed_id]); - _fed.sockets_for_inbound_p2p_connections[fed_id] = -1; + shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false); } LF_MUTEX_UNLOCK(&socket_mutex); } @@ -668,7 +657,9 @@ static int handle_tagged_message(int* socket, int fed_id) { env->current_tag.time - start_time, env->current_tag.microstep, intended_tag.time - start_time, intended_tag.microstep); // Close socket, reading any incoming data and discarding it. - close_inbound_socket(fed_id, 1); + close_inbound_socket(fed_id); + LF_MUTEX_UNLOCK(&env->mutex); + return -1; } else { // Need to use intended_tag here, not actual_tag, so that STP violations are detected. // It will become actual_tag (that is when the reactions will be invoked). @@ -830,33 +821,22 @@ static void* listen_to_federates(void* _args) { * if _lf_normal_termination is true and otherwise proceeds without the lock. * @param fed_id The ID of the peer federate receiving messages from this * federate, or -1 if the RTI (centralized coordination). - * @param flag 0 if the socket has received EOF, 1 if not, -1 if abnormal termination. */ -static void close_outbound_socket(int fed_id, int flag) { +static void close_outbound_socket(int fed_id) { assert(fed_id >= 0 && fed_id < NUMBER_OF_FEDERATES); + // Close outbound connections, in case they have not closed themselves. + // This will result in EOF being sent to the remote federate, except for + // abnormal termination, in which case it will just close the socket. if (_lf_normal_termination) { LF_MUTEX_LOCK(&lf_outbound_socket_mutex); - } - if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) { - // Close the socket by sending a FIN packet indicating that no further writes - // are expected. Then read until we get an EOF indication. - if (flag >= 0) { - // SHUT_WR indicates no further outgoing messages. - shutdown(_fed.sockets_for_outbound_p2p_connections[fed_id], SHUT_WR); - if (flag > 0) { - // Have not received EOF yet. read until we get an EOF or error indication. - // This compensates for delayed ACKs and disabling of Nagles algorithm - // by delaying exiting until the shutdown is complete. - unsigned char message[32]; - while (read(_fed.sockets_for_outbound_p2p_connections[fed_id], &message, 32) > 0) - ; - } + if (_fed.sockets_for_outbound_p2p_connections[fed_id] >= 0) { + // Close the socket by sending a FIN packet indicating that no further writes + // are expected. Then read until we get an EOF indication. + shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[fed_id], true); } - close(_fed.sockets_for_outbound_p2p_connections[fed_id]); - _fed.sockets_for_outbound_p2p_connections[fed_id] = -1; - } - if (_lf_normal_termination) { LF_MUTEX_UNLOCK(&lf_outbound_socket_mutex); + } else { + shutdown_socket(&_fed.sockets_for_outbound_p2p_connections[fed_id], false); } } @@ -1550,7 +1530,7 @@ static void* listen_to_rti_TCP(void* args) { lf_print_error("Socket connection to the RTI was closed by the RTI without" " properly sending an EOF first. Considering this a soft error."); // FIXME: If this happens, possibly a new RTI must be elected. - _fed.socket_TCP_RTI = -1; + shutdown_socket(&_fed.socket_TCP_RTI, false); return NULL; } else { lf_print_error("Socket connection to the RTI has been broken with error %d: %s." @@ -1558,13 +1538,13 @@ static void* listen_to_rti_TCP(void* args) { " Considering this a soft error.", errno, strerror(errno)); // FIXME: If this happens, possibly a new RTI must be elected. - _fed.socket_TCP_RTI = -1; + shutdown_socket(&_fed.socket_TCP_RTI, false); return NULL; } } else if (read_failed > 0) { // EOF received. lf_print("Connection to the RTI closed with an EOF."); - _fed.socket_TCP_RTI = -1; + shutdown_socket(&_fed.socket_TCP_RTI, false); return NULL; } switch (buffer[0]) { @@ -1689,7 +1669,7 @@ void lf_terminate_execution(environment_t* env) { LF_PRINT_DEBUG("Closing incoming P2P sockets."); // Close any incoming P2P sockets that are still open. for (int i = 0; i < NUMBER_OF_FEDERATES; i++) { - close_inbound_socket(i, 1); + close_inbound_socket(i); // Ignore errors. Mark the socket closed. _fed.sockets_for_inbound_p2p_connections[i] = -1; } @@ -1703,8 +1683,7 @@ void lf_terminate_execution(environment_t* env) { // Close outbound connections, in case they have not closed themselves. // This will result in EOF being sent to the remote federate, except for // abnormal termination, in which case it will just close the socket. - int flag = _lf_normal_termination ? 1 : -1; - close_outbound_socket(i, flag); + close_outbound_socket(i); } LF_PRINT_DEBUG("Waiting for inbound p2p socket listener threads."); @@ -2055,7 +2034,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Ignore errors on this response. write_to_socket(socket_id, 2, response); } - close(socket_id); + shutdown_socket(&socket_id, false); continue; } @@ -2075,7 +2054,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Ignore errors on this response. write_to_socket(socket_id, 2, response); } - close(socket_id); + shutdown_socket(&socket_id, false); continue; } @@ -2113,7 +2092,7 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { // Failed to create a listening thread. LF_MUTEX_LOCK(&socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[remote_fed_id] != -1) { - close(socket_id); + shutdown_socket(&socket_id, false); _fed.sockets_for_inbound_p2p_connections[remote_fed_id] = -1; } LF_MUTEX_UNLOCK(&socket_mutex); diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index cc71e897b..e2b20660f 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -203,7 +203,7 @@ int accept_socket(int socket, int rti_socket) { lf_print_error_system_failure("Firewall permissions prohibit connection."); } else { // For the federates, it should check if the rti_socket is still open, before retrying accept(). - if (rti_socket == -1) { + if (rti_socket != -1) { if (check_socket_closed(rti_socket)) { break; } @@ -305,10 +305,7 @@ int read_from_socket_close_on_error(int* socket, size_t num_bytes, unsigned char // Read failed. // Socket has probably been closed from the other side. // Shut down and close the socket from this side. - shutdown(*socket, SHUT_RDWR); - close(*socket); - // Mark the socket closed. - *socket = -1; + shutdown_socket(socket, false); return -1; } return 0; @@ -375,10 +372,7 @@ int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* // Write failed. // Socket has probably been closed from the other side. // Shut down and close the socket from this side. - shutdown(*socket, SHUT_RDWR); - close(*socket); - // Mark the socket closed. - *socket = -1; + shutdown_socket(socket, false); } return result; } @@ -402,3 +396,46 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* } } } + +int shutdown_socket(int* socket, bool read_before_closing) { + if (*socket == -1) { + lf_print_log("Socket is already closed."); + return 0; + } + if (!read_before_closing) { + if (shutdown(*socket, SHUT_RDWR)) { + lf_print_log("On shutdown socket, received reply: %s", strerror(errno)); + goto close_socket; // Try closing socket. + } + } else { + // Signal the other side that no further writes are expected by sending a FIN packet. + // This indicates the write direction is closed. For more details, refer to: + // https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket + if (shutdown(*socket, SHUT_WR)) { + lf_print_log("Failed to shutdown socket: %s", strerror(errno)); + goto close_socket; // Try closing socket. + } + + // Wait for the other side to send an EOF or encounter a socket error. + // Discard any incoming bytes. Normally, this read should return 0, indicating the peer has also closed the + // connection. + // This compensates for delayed ACKs and scenarios where Nagle's algorithm is disabled, ensuring the shutdown + // completes gracefully. + unsigned char buffer[10]; + while (read(*socket, buffer, 10) > 0) + ; + } + +close_socket: // Label to jump to the closing part of the function + // NOTE: In all common TCP/IP stacks, there is a time period, + // typically between 30 and 120 seconds, called the TIME_WAIT period, + // before the port is released after this close. This is because + // the OS is preventing another program from accidentally receiving + // duplicated packets intended for this program. + if (close(*socket)) { + lf_print_log("Error while closing socket: %s\n", strerror(errno)); + return -1; + } + *socket = -1; + return 0; +} diff --git a/include/core/federated/network/socket_common.h b/include/core/federated/network/socket_common.h index 9c6138e05..87bac7511 100644 --- a/include/core/federated/network/socket_common.h +++ b/include/core/federated/network/socket_common.h @@ -240,4 +240,14 @@ int write_to_socket_close_on_error(int* socket, size_t num_bytes, unsigned char* void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, char* format, ...); +/** + * Shutdown and close the socket. If read_before_closing is false, it just immediately calls shutdown() with SHUT_RDWR + * and close(). If read_before_closing is true, it calls shutdown with SHUT_WR, only disallowing further writing. Then, + * it calls read() until EOF is received, and discards all received bytes. + * @param socket Pointer to the socket descriptor to shutdown and close. + * @param read_before_closing If true, read until EOF before closing the socket. + * @return int 0 for success and -1 for an error. + */ +int shutdown_socket(int* socket, bool read_before_closing); + #endif /* SOCKET_COMMON_H */