Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error on end unconnected #217

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ extern "C" {
#define UDX_MAGIC_BYTE 255
#define UDX_VERSION 1

#define UDX_SOCKET_RECEIVING 0b0001
#define UDX_SOCKET_BOUND 0b0010
#define UDX_SOCKET_CLOSING 0b0100
#define UDX_SOCKET_CLOSING_HANDLES 0b1000
#define UDX_SOCKET_RECEIVING 0b0001
#define UDX_SOCKET_BOUND 0b0010
#define UDX_SOCKET_CLOSED 0b0100

#define UDX_STREAM_CONNECTED 0b000000001
#define UDX_STREAM_RECEIVING 0b000000010
Expand Down Expand Up @@ -476,10 +475,6 @@ udx_stream_read_start (udx_stream_t *stream, udx_stream_read_cb cb);
int
udx_stream_read_stop (udx_stream_t *stream);

// only exposed here as a convenience / debug tool - the udx instance uses this automatically
int
udx_stream_check_timeouts (udx_stream_t *stream);

int
udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb);

Expand Down
91 changes: 44 additions & 47 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,6 @@ on_uv_close (uv_handle_t *handle) {
trigger_socket_close((udx_socket_t *) handle->data);
}

void
udx__close_handles (udx_socket_t *socket) {
if (socket->status & UDX_SOCKET_CLOSING_HANDLES) return;
socket->status |= UDX_SOCKET_CLOSING_HANDLES;

if (socket->status & UDX_SOCKET_BOUND) {
socket->pending_closes++;
uv_poll_stop(&(socket->io_poll));
uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close);
}

socket->pending_closes++;
uv_close((uv_handle_t *) &(socket->handle), on_uv_close);

udx_t *udx = socket->udx;

udx->sockets--;
}

static bool
stream_write_wanted (udx_stream_t *stream) {
if (!(stream->status & UDX_STREAM_CONNECTED)) {
Expand Down Expand Up @@ -232,7 +213,7 @@ socket_write_wanted (udx_socket_t *socket) {

static int
update_poll (udx_socket_t *socket) {
if (socket->status & UDX_SOCKET_CLOSING_HANDLES) {
if (socket->status & UDX_SOCKET_CLOSED) {
assert(!uv_is_active((uv_handle_t *) &socket->io_poll));
return 0;
}
Expand Down Expand Up @@ -414,7 +395,6 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {

static void
clear_outgoing_packets (udx_stream_t *stream) {
debug_printf("close: clearing outgoing packets\n");

// todo: skip the math, and just
// 1. destroy all packets
Expand Down Expand Up @@ -450,8 +430,6 @@ clear_outgoing_packets (udx_stream_t *stream) {
free(pkt);
}

debug_printf("close: cancelling queued writes, queue.len=%u\n", stream->write_queue.len);

while (stream->write_queue.len > 0) {
udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_shift(&stream->write_queue), udx_stream_write_buf_t, queue);
assert(wbuf != NULL);
Expand Down Expand Up @@ -1428,16 +1406,10 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
}

static bool
check_if_streams_have_data (udx_socket_t *socket) {
check_for_streams (udx_socket_t *socket) {
for (uint32_t i = 0; i < socket->udx->streams_len; i++) {
udx_stream_t *stream = socket->udx->streams[i];
if (stream->socket == socket) {
if (stream->status & UDX_STREAM_DEAD) {
if (stream->write_wanted & (UDX_STREAM_WRITE_WANT_DESTROY | UDX_STREAM_WRITE_WANT_STATE)) return true;
} else {
if (stream->write_wanted || stream->write_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0) return true;
}
}
if (stream->socket == socket) return true;
}
return false;
}
Expand Down Expand Up @@ -1485,7 +1457,7 @@ send_packet (udx_socket_t *socket, udx_packet_t *pkt) {

static bool
send_datagrams (udx_socket_t *socket) {
assert((socket->status & UDX_SOCKET_CLOSING_HANDLES) == 0);
assert((socket->status & UDX_SOCKET_CLOSED) == 0);
while (socket->send_queue.len > 0) {
udx_packet_t *pkt = udx__queue_data(udx__queue_peek(&socket->send_queue), udx_packet_t, queue);
ssize_t rc = send_packet(socket, pkt);
Expand All @@ -1500,7 +1472,7 @@ send_datagrams (udx_socket_t *socket) {
req->on_send(req, 0);
}
// edge case: user calls both udx_socket_close (draining queue) and udx_socket_send (adding to queue)
if (socket->status & UDX_SOCKET_CLOSING_HANDLES) {
if (socket->status & UDX_SOCKET_CLOSED) {
return false;
}
}
Expand Down Expand Up @@ -1870,7 +1842,7 @@ on_uv_poll (uv_poll_t *handle, int status, int events) {
buf.base = (char *) &b;
buf.len = 2048;

while (!(socket->status & UDX_SOCKET_CLOSING_HANDLES) && (size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) {
while (!(socket->status & UDX_SOCKET_CLOSED) && (size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) {
if (!process_packet(socket, b, size, (struct sockaddr *) &addr) && socket->on_recv != NULL) {
buf.len = size;

Expand All @@ -1885,17 +1857,13 @@ on_uv_poll (uv_poll_t *handle, int status, int events) {
}
}

if (events & UV_WRITABLE && !(socket->status & UDX_SOCKET_CLOSING_HANDLES)) {
if (events & UV_WRITABLE && !(socket->status & UDX_SOCKET_CLOSED)) {
if (events & UV_READABLE) {
// compensate for potentially long-running read callbacks
uv_update_time(handle->loop);
}

send_packets(socket);

if (socket->status & UDX_SOCKET_CLOSING && socket->send_queue.len == 0 && !check_if_streams_have_data(socket)) {
udx__close_handles(socket);
}
}

update_poll(socket);
Expand Down Expand Up @@ -2110,15 +2078,12 @@ udx_socket_recv_stop (udx_socket_t *socket) {

int
udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) {
// if (socket->streams_len > 0) return UV_EBUSY;
if (check_for_streams(socket)) return UV_EBUSY;

socket->status |= UDX_SOCKET_CLOSING;
socket->status |= UDX_SOCKET_CLOSED;

socket->on_close = cb;

// allow stream packets to flush, but cancel anything else
// todo: drop all relay packets

while (socket->send_queue.len > 0) {
udx_packet_t *pkt = udx__queue_data(udx__queue_shift(&socket->send_queue), udx_packet_t, queue);
assert(pkt != NULL);
Expand All @@ -2130,10 +2095,18 @@ udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) {
}
}

if (!check_if_streams_have_data(socket)) {
udx__close_handles(socket);
if (socket->status & UDX_SOCKET_BOUND) {
socket->pending_closes++;
uv_poll_stop(&(socket->io_poll));
uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close);
}

socket->pending_closes++;
uv_close((uv_handle_t *) &(socket->handle), on_uv_close);

udx_t *udx = socket->udx;
udx->sockets--;

return 0;
}

Expand Down Expand Up @@ -2450,6 +2423,11 @@ udx_stream_relay_to (udx_stream_t *stream, udx_stream_t *destination) {

int
udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb) {

if (!(stream->status & UDX_STREAM_CONNECTED)) {
return UV_ENOTCONN;
}

assert(bufs_len == 1);

req->stream = stream;
Expand Down Expand Up @@ -2512,7 +2490,17 @@ _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf

int
udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb) {
assert(bufs_len > 0);
if (!(stream->status & UDX_STREAM_CONNECTED)) {
return UV_ENOTCONN;
}

if (stream->status & UDX_STREAM_ENDING) {
return UV_EPIPE;
}

if (bufs_len == 0) {
return UV_EINVAL;
}

req->nwbufs = bufs_len;

Expand All @@ -2531,6 +2519,15 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t

int
udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb) {

if (!(stream->status & UDX_STREAM_CONNECTED)) {
return UV_ENOTCONN;
}

if (stream->status & UDX_STREAM_ENDING) {
return UV_EPIPE;
}

stream->status |= UDX_STREAM_ENDING;

if (bufs_len > 0) {
Expand Down
Loading