Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement event loop based on QNX ionotify #669

Open
wants to merge 41 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
aa4f966
Provide I/O operation status back to event loop
sfodagain Aug 26, 2024
abe7747
Add flag for last io result
sfodagain Aug 26, 2024
ce39822
Merge branch 'main' into support-qnx
sfodagain Aug 27, 2024
8f7cbff
Add callback to io handle
sfodagain Aug 27, 2024
50bca0d
Revert unrelated changes
sfodagain Aug 27, 2024
29d9c04
Use shared sources
sfodagain Aug 27, 2024
ad1262d
fixup
sfodagain Aug 27, 2024
28e55e1
fixup
sfodagain Aug 27, 2024
c5f9e00
Use #if everywhere
sfodagain Aug 27, 2024
4ed5a87
Fix kqueue
sfodagain Aug 27, 2024
619319f
Fix pipe tests
sfodagain Aug 27, 2024
be85d89
Remove AWS_ASSERT, use AWS_ZERO_STRUCT
sfodagain Sep 3, 2024
739e0f6
Remove changes made to kqueue
sfodagain Sep 16, 2024
4036be9
Add ionotify event loop (#670)
sfod Sep 16, 2024
d709f28
Remove pipe test fix
sfodagain Sep 16, 2024
d8a8085
Copy posix stuff to qnx
sfodagain Sep 16, 2024
702312a
fixup
sfodagain Sep 17, 2024
eede270
fixup
sfodagain Sep 17, 2024
8bc5d1b
Fix pipe missing events issue
sfodagain Sep 17, 2024
692d125
Handle unsubscribing in a task
sfodagain Sep 17, 2024
c79772e
Handle is_subscribed only in resubscriptions
sfodagain Sep 17, 2024
2bddf95
Add aws_pipe_read to tests
sfodagain Sep 17, 2024
dd34f87
Fix race condition, fix pulse error code
sfodagain Sep 20, 2024
c4dceea
Use separate task for resubscribing
sfodagain Sep 20, 2024
29f900e
Add QNX paths
sfodagain Sep 20, 2024
a8d8366
Remove non-qnx specifics from posix copies
sfodagain Sep 23, 2024
6af39ec
Improve comments and logging
sfodagain Sep 23, 2024
9c35c72
Fix latest_io_event_types
sfodagain Sep 23, 2024
bc653f6
Fix format
sfodagain Sep 23, 2024
48134e8
Merge posix and qnx sources
sfodagain Sep 24, 2024
4b59301
Fix naming and comments
sfodagain Sep 25, 2024
1700c76
Use single destroy
sfodagain Sep 25, 2024
88aff4d
Use atomic for event loop state
sfodagain Sep 26, 2024
78b4d38
Use AWS_FATAL_ASSERT
sfodagain Sep 26, 2024
22f3e24
Fix log format string
sfodagain Sep 26, 2024
52110fb
Remove strerror
sfodagain Sep 26, 2024
93c0c39
Add MsgUnregisterEvent on unsubscribing
sfodagain Sep 26, 2024
477a071
Fix logs, comments, code style
sfodagain Sep 26, 2024
bda976c
Fix naming
sfodagain Sep 26, 2024
be74788
Refactor cleaning up and resubscribing
sfodagain Sep 29, 2024
5d9f58f
Add comment to processing io results
sfodagain Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ elseif (APPLE)
#No choice on TLS for apple, darwinssl will always be used.
list(APPEND PLATFORM_LIBS "-framework Security")
set(EVENT_LOOP_DEFINE "KQUEUE")
# FIXME For debugging.
set(EVENT_LOOP_DEFINE "ON_EVENT_WITH_RESULT")

elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetBSD" OR CMAKE_SYSTEM_NAME STREQUAL "OpenBSD")
file(GLOB AWS_IO_OS_HEADERS
Expand All @@ -141,6 +143,8 @@ elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetB
set(EVENT_LOOP_DEFINE "KQUEUE")
set(USE_S2N ON)

elseif(CMAKE_SYSTEM_NAME STREQUAL "QNX")
set(EVENT_LOOP_DEFINE "ON_EVENT_WITH_RESULT")
endif()

if (BYO_CRYPTO)
Expand Down
24 changes: 24 additions & 0 deletions include/aws/io/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,36 @@ AWS_PUSH_SANE_WARNING_LEVEL

#define AWS_C_IO_PACKAGE_ID 1

struct aws_io_handle;

#if AWS_USE_ON_EVENT_WITH_RESULT
struct aws_event_loop;

/**
* Results of the I/O operation(s) performed on the aws_io_handle.
*/
struct aws_io_handle_io_op_result {
size_t read_bytes;
size_t written_bytes;
int read_error_code;
bretambrose marked this conversation as resolved.
Show resolved Hide resolved
int write_error_code;
};

typedef void(aws_io_handle_update_io_results_fn)(
struct aws_event_loop *,
struct aws_io_handle *,
const struct aws_io_handle_io_op_result *);
#endif /* AWS_USE_ON_EVENT_WITH_RESULT */

struct aws_io_handle {
union {
int fd;
void *handle;
} data;
void *additional_data;
#if AWS_USE_ON_EVENT_WITH_RESULT
aws_io_handle_update_io_results_fn *update_io_result;
#endif
};

enum aws_io_message_type {
Expand Down
34 changes: 34 additions & 0 deletions source/bsd/kqueue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,35 @@ struct aws_event_loop_vtable s_kqueue_vtable = {
.is_on_callers_thread = s_is_event_thread,
};

#if AWS_USE_ON_EVENT_WITH_RESULT
/**
* FIXME kqueue is used for debugging/demonstration purposes. It's going to be reverted.
*/
static void s_update_io_result(
struct aws_event_loop *event_loop,
struct aws_io_handle *handle,
const struct aws_io_handle_io_op_result *io_op_result) {
AWS_ASSERT(handle->additional_data);
struct handle_data *handle_data = handle->additional_data;
(void)handle_data;
AWS_ASSERT(event_loop == handle_data->event_loop);
AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP,
"id=%p: got feedback on I/O operation for fd %d: read: status %d (%s), %lu bytes; write: status %d (%s), %lu "
"bytes",
(void *)event_loop,
handle->data.fd,
io_op_result->read_error_code,
aws_error_str(io_op_result->read_error_code),
io_op_result->read_bytes,
io_op_result->write_error_code,
aws_error_str(io_op_result->write_error_code),
io_op_result->written_bytes);

/* Here, the handle IO status should be updated. It'll be used in the event loop. */
}
#endif

struct aws_event_loop *aws_event_loop_new_default_with_options(
struct aws_allocator *alloc,
const struct aws_event_loop_options *options) {
Expand Down Expand Up @@ -586,6 +615,9 @@ static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_ta

/* Success */
handle_data->state = HANDLE_STATE_SUBSCRIBED;
#if AWS_USE_ON_EVENT_WITH_RESULT
handle_data->owner->update_io_result = s_update_io_result;
#endif
return;

subscribe_failed:
Expand Down Expand Up @@ -931,6 +963,8 @@ static void aws_event_loop_thread(void *user_data) {
handle_data->owner->data.fd);
handle_data->on_event(
event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);

/* It's possible to check for IO result here. */
}

handle_data->events_this_loop = 0;
Expand Down
17 changes: 17 additions & 0 deletions source/posix/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@ int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_b
if (read_val < 0) {
int errno_value = errno; /* Always cache errno before potential side-effect */
if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
#if AWS_USE_ON_EVENT_WITH_RESULT
if (read_impl->handle.update_io_result) {
struct aws_io_handle_io_op_result io_op_result;
memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result));
sfod marked this conversation as resolved.
Show resolved Hide resolved
io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result);
}
#endif
return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
}
return s_raise_posix_error(errno_value);
Expand Down Expand Up @@ -454,6 +462,15 @@ static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) {
if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
/* The pipe is no longer writable. Bail out */
write_impl->is_writable = false;

#if AWS_USE_ON_EVENT_WITH_RESULT
struct aws_io_handle_io_op_result io_op_result;
sfod marked this conversation as resolved.
Show resolved Hide resolved
memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result));
io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK;
AWS_ASSERT(write_impl->handle.update_io_result);
write_impl->handle.update_io_result(write_impl->event_loop, &write_impl->handle, &io_op_result);
#endif

return;
}

Expand Down
48 changes: 48 additions & 0 deletions source/posix/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ static void s_socket_connect_event(
"id=%p fd=%d: spurious event, waiting for another notification.",
(void *)socket_args->socket,
handle->data.fd);

#if AWS_USE_ON_EVENT_WITH_RESULT
struct aws_io_handle_io_op_result io_op_result;
sfod marked this conversation as resolved.
Show resolved Hide resolved
memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result));
io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
AWS_ASSERT(handle->update_io_result);
handle->update_io_result(event_loop, handle, &io_op_result);
#endif

return;
}

Expand Down Expand Up @@ -955,6 +964,11 @@ static void s_socket_accept_event(
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd);

#if AWS_USE_ON_EVENT_WITH_RESULT
struct aws_io_handle_io_op_result io_op_result;
memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result));
#endif

if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) {
int in_fd = 0;
while (socket_impl->continue_accept && in_fd != -1) {
Expand All @@ -966,12 +980,18 @@ static void s_socket_accept_event(
int errno_value = errno; /* Always cache errno before potential side-effect */

if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
#endif
break;
}

int aws_error = aws_socket_get_error(socket);
aws_raise_error(aws_error);
s_on_connection_error(socket, aws_error);
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.read_error_code = aws_error;
#endif
break;
}

Expand Down Expand Up @@ -1064,6 +1084,11 @@ static void s_socket_accept_event(
}
}

#if AWS_USE_ON_EVENT_WITH_RESULT
AWS_ASSERT(handle->update_io_result);
handle->update_io_result(event_loop, handle, &io_op_result);
#endif

AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET,
"id=%p fd=%d: finished processing incoming connections, "
Expand Down Expand Up @@ -1632,6 +1657,11 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
bool parent_request_failed = false;
bool pushed_to_written_queue = false;

#if AWS_USE_ON_EVENT_WITH_RESULT
struct aws_io_handle_io_op_result io_op_result;
memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result));
#endif

/* if a close call happens in the middle, this queue will have been cleaned out from under us. */
while (!aws_linked_list_empty(&socket_impl->write_queue)) {
struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue);
Expand Down Expand Up @@ -1660,6 +1690,9 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
if (errno_value == EAGAIN) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd);
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK; /* TODO Add AWS_IO_WRITE_EAGAIN code. */
#endif
break;
}

Expand All @@ -1672,6 +1705,9 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
aws_error = AWS_IO_SOCKET_CLOSED;
aws_raise_error(aws_error);
purge = true;
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.write_error_code = aws_error;
#endif
break;
}

Expand All @@ -1684,9 +1720,16 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
errno_value);
aws_error = s_determine_socket_error(errno_value);
aws_raise_error(aws_error);
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.write_error_code = aws_error;
#endif
break;
}

#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.written_bytes += (size_t)written;
#endif

size_t remaining_to_write = write_request->cursor_cpy.len;

aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written);
Expand Down Expand Up @@ -1732,6 +1775,11 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task);
}

#if AWS_USE_ON_EVENT_WITH_RESULT
AWS_ASSERT(socket->io_handle.update_io_result);
socket->io_handle.update_io_result(socket->event_loop, &socket->io_handle, &io_op_result);
#endif

/* Only report error if aws_socket_write() invoked this function and its write_request failed */
if (!parent_request_failed) {
return AWS_OP_SUCCESS;
Expand Down
29 changes: 29 additions & 0 deletions source/socket_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ static void s_do_read(struct socket_handler *socket_handler) {
if (max_to_read == 0) {
return;
}
#if AWS_USE_ON_EVENT_WITH_RESULT
struct aws_io_handle_io_op_result io_op_result;
memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result));
AWS_ASSERT(socket_handler->socket->io_handle.update_io_result);
#endif

size_t total_read = 0;
size_t read = 0;
Expand All @@ -153,10 +158,16 @@ static void s_do_read(struct socket_handler *socket_handler) {
if (aws_socket_read(socket_handler->socket, &message->message_data, &read)) {
last_error = aws_last_error();
aws_mem_release(message->allocator, message);
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.read_error_code = last_error;
#endif
break;
}

total_read += read;
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.read_bytes += read;
#endif
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: read %llu from socket",
Expand All @@ -166,6 +177,9 @@ static void s_do_read(struct socket_handler *socket_handler) {
if (aws_channel_slot_send_message(socket_handler->slot, message, AWS_CHANNEL_DIR_READ)) {
last_error = aws_last_error();
aws_mem_release(message->allocator, message);
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.read_error_code = last_error;
#endif
break;
}
}
Expand All @@ -183,14 +197,24 @@ static void s_do_read(struct socket_handler *socket_handler) {
AWS_ASSERT(last_error != 0);

if (last_error != AWS_IO_READ_WOULD_BLOCK) {
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.read_error_code = last_error;
#endif
aws_channel_shutdown(socket_handler->slot->channel, last_error);
} else {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET_HANDLER,
"id=%p: out of data to read on socket. "
"Waiting on event-loop notification.",
(void *)socket_handler->slot->handler);
#if AWS_USE_ON_EVENT_WITH_RESULT
io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
#endif
}
#if AWS_USE_ON_EVENT_WITH_RESULT
socket_handler->socket->io_handle.update_io_result(
socket_handler->socket->event_loop, &socket_handler->socket->io_handle, &io_op_result);
#endif
return;
}
/* in this case, everything was fine, but there's still pending reads. We need to schedule a task to do the read
Expand All @@ -206,6 +230,11 @@ static void s_do_read(struct socket_handler *socket_handler) {
&socket_handler->read_task_storage, s_read_task, socket_handler, "socket_handler_re_read");
aws_channel_schedule_task_now(socket_handler->slot->channel, &socket_handler->read_task_storage);
}

#if AWS_USE_ON_EVENT_WITH_RESULT
socket_handler->socket->io_handle.update_io_result(
socket_handler->socket->event_loop, &socket_handler->socket->io_handle, &io_op_result);
#endif
}

/* the socket is either readable or errored out. If it's readable, kick off s_do_read() to do its thing. */
Expand Down