From 23fa7d17cd20181c1a85198d4e9a914606c28b62 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Wed, 16 Oct 2024 18:38:31 -0400 Subject: [PATCH 1/2] return UV_ENOTCONN / UV_EPIPE on stream write or write_end if not connected or already ended --- src/udx.c | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/udx.c b/src/udx.c index 1c52ce6..b646629 100644 --- a/src/udx.c +++ b/src/udx.c @@ -2450,6 +2450,11 @@ udx_stream_relay_to (udx_stream_t *stream, udx_stream_t *destination) { int udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb) { + + if (!(stream->status & UDX_STREAM_CONNECTED)) { + return UV_ENOTCONN; + } + assert(bufs_len == 1); req->stream = stream; @@ -2512,7 +2517,17 @@ _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf int udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb) { - assert(bufs_len > 0); + if (!(stream->status & UDX_STREAM_CONNECTED)) { + return UV_ENOTCONN; + } + + if (stream->status & UDX_STREAM_ENDING) { + return UV_EPIPE; + } + + if (bufs_len == 0) { + return UV_EINVAL; + } req->nwbufs = bufs_len; @@ -2531,6 +2546,15 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t int udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb) { + + if (!(stream->status & UDX_STREAM_CONNECTED)) { + return UV_ENOTCONN; + } + + if (stream->status & UDX_STREAM_ENDING) { + return UV_EPIPE; + } + stream->status |= UDX_STREAM_ENDING; if (bufs_len > 0) { From f6502ee0e00584b0a76d76cebab43dd60268f487 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Thu, 17 Oct 2024 07:03:36 -0400 Subject: [PATCH 2/2] UV_EBUSY on socket close if streams are still open --- include/udx.h | 11 +++------ src/udx.c | 65 +++++++++++++++------------------------------------ 2 files changed, 22 insertions(+), 54 deletions(-) diff --git a/include/udx.h b/include/udx.h index 1e2b735..4da8fbd 100644 --- a/include/udx.h +++ b/include/udx.h @@ -28,10 +28,9 @@ extern "C" { #define UDX_MAGIC_BYTE 255 #define UDX_VERSION 1 -#define UDX_SOCKET_RECEIVING 0b0001 -#define UDX_SOCKET_BOUND 0b0010 -#define UDX_SOCKET_CLOSING 0b0100 -#define UDX_SOCKET_CLOSING_HANDLES 0b1000 +#define UDX_SOCKET_RECEIVING 0b0001 +#define UDX_SOCKET_BOUND 0b0010 +#define UDX_SOCKET_CLOSED 0b0100 #define UDX_STREAM_CONNECTED 0b000000001 #define UDX_STREAM_RECEIVING 0b000000010 @@ -476,10 +475,6 @@ udx_stream_read_start (udx_stream_t *stream, udx_stream_read_cb cb); int udx_stream_read_stop (udx_stream_t *stream); -// only exposed here as a convenience / debug tool - the udx instance uses this automatically -int -udx_stream_check_timeouts (udx_stream_t *stream); - int udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb); diff --git a/src/udx.c b/src/udx.c index b646629..d6b2e81 100644 --- a/src/udx.c +++ b/src/udx.c @@ -176,25 +176,6 @@ on_uv_close (uv_handle_t *handle) { trigger_socket_close((udx_socket_t *) handle->data); } -void -udx__close_handles (udx_socket_t *socket) { - if (socket->status & UDX_SOCKET_CLOSING_HANDLES) return; - socket->status |= UDX_SOCKET_CLOSING_HANDLES; - - if (socket->status & UDX_SOCKET_BOUND) { - socket->pending_closes++; - uv_poll_stop(&(socket->io_poll)); - uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close); - } - - socket->pending_closes++; - uv_close((uv_handle_t *) &(socket->handle), on_uv_close); - - udx_t *udx = socket->udx; - - udx->sockets--; -} - static bool stream_write_wanted (udx_stream_t *stream) { if (!(stream->status & UDX_STREAM_CONNECTED)) { @@ -232,7 +213,7 @@ socket_write_wanted (udx_socket_t *socket) { static int update_poll (udx_socket_t *socket) { - if (socket->status & UDX_SOCKET_CLOSING_HANDLES) { + if (socket->status & UDX_SOCKET_CLOSED) { assert(!uv_is_active((uv_handle_t *) &socket->io_poll)); return 0; } @@ -414,7 +395,6 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) { static void clear_outgoing_packets (udx_stream_t *stream) { - debug_printf("close: clearing outgoing packets\n"); // todo: skip the math, and just // 1. destroy all packets @@ -450,8 +430,6 @@ clear_outgoing_packets (udx_stream_t *stream) { free(pkt); } - debug_printf("close: cancelling queued writes, queue.len=%u\n", stream->write_queue.len); - while (stream->write_queue.len > 0) { udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_shift(&stream->write_queue), udx_stream_write_buf_t, queue); assert(wbuf != NULL); @@ -1428,16 +1406,10 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd } static bool -check_if_streams_have_data (udx_socket_t *socket) { +check_for_streams (udx_socket_t *socket) { for (uint32_t i = 0; i < socket->udx->streams_len; i++) { udx_stream_t *stream = socket->udx->streams[i]; - if (stream->socket == socket) { - if (stream->status & UDX_STREAM_DEAD) { - if (stream->write_wanted & (UDX_STREAM_WRITE_WANT_DESTROY | UDX_STREAM_WRITE_WANT_STATE)) return true; - } else { - if (stream->write_wanted || stream->write_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0) return true; - } - } + if (stream->socket == socket) return true; } return false; } @@ -1485,7 +1457,7 @@ send_packet (udx_socket_t *socket, udx_packet_t *pkt) { static bool send_datagrams (udx_socket_t *socket) { - assert((socket->status & UDX_SOCKET_CLOSING_HANDLES) == 0); + assert((socket->status & UDX_SOCKET_CLOSED) == 0); while (socket->send_queue.len > 0) { udx_packet_t *pkt = udx__queue_data(udx__queue_peek(&socket->send_queue), udx_packet_t, queue); ssize_t rc = send_packet(socket, pkt); @@ -1500,7 +1472,7 @@ send_datagrams (udx_socket_t *socket) { req->on_send(req, 0); } // edge case: user calls both udx_socket_close (draining queue) and udx_socket_send (adding to queue) - if (socket->status & UDX_SOCKET_CLOSING_HANDLES) { + if (socket->status & UDX_SOCKET_CLOSED) { return false; } } @@ -1870,7 +1842,7 @@ on_uv_poll (uv_poll_t *handle, int status, int events) { buf.base = (char *) &b; buf.len = 2048; - while (!(socket->status & UDX_SOCKET_CLOSING_HANDLES) && (size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) { + while (!(socket->status & UDX_SOCKET_CLOSED) && (size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) { if (!process_packet(socket, b, size, (struct sockaddr *) &addr) && socket->on_recv != NULL) { buf.len = size; @@ -1885,17 +1857,13 @@ on_uv_poll (uv_poll_t *handle, int status, int events) { } } - if (events & UV_WRITABLE && !(socket->status & UDX_SOCKET_CLOSING_HANDLES)) { + if (events & UV_WRITABLE && !(socket->status & UDX_SOCKET_CLOSED)) { if (events & UV_READABLE) { // compensate for potentially long-running read callbacks uv_update_time(handle->loop); } send_packets(socket); - - if (socket->status & UDX_SOCKET_CLOSING && socket->send_queue.len == 0 && !check_if_streams_have_data(socket)) { - udx__close_handles(socket); - } } update_poll(socket); @@ -2110,15 +2078,12 @@ udx_socket_recv_stop (udx_socket_t *socket) { int udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) { - // if (socket->streams_len > 0) return UV_EBUSY; + if (check_for_streams(socket)) return UV_EBUSY; - socket->status |= UDX_SOCKET_CLOSING; + socket->status |= UDX_SOCKET_CLOSED; socket->on_close = cb; - // allow stream packets to flush, but cancel anything else - // todo: drop all relay packets - while (socket->send_queue.len > 0) { udx_packet_t *pkt = udx__queue_data(udx__queue_shift(&socket->send_queue), udx_packet_t, queue); assert(pkt != NULL); @@ -2130,10 +2095,18 @@ udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) { } } - if (!check_if_streams_have_data(socket)) { - udx__close_handles(socket); + if (socket->status & UDX_SOCKET_BOUND) { + socket->pending_closes++; + uv_poll_stop(&(socket->io_poll)); + uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close); } + socket->pending_closes++; + uv_close((uv_handle_t *) &(socket->handle), on_uv_close); + + udx_t *udx = socket->udx; + udx->sockets--; + return 0; }