Skip to content

Commit

Permalink
TLS deliver buffer data during shutdown (#650)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Graeb <graebm@amazon.com>
  • Loading branch information
TingDaoK and graebm authored Jul 29, 2024
1 parent bf06976 commit 614c35c
Show file tree
Hide file tree
Showing 9 changed files with 740 additions and 308 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
clang-sanitizers:
runs-on: ubuntu-22.04 # latest
strategy:
fail-fast: false
matrix:
sanitizers: [",thread", ",address,undefined"]
steps:
Expand Down
6 changes: 6 additions & 0 deletions include/aws/io/private/tls_channel_handler_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ struct aws_tls_channel_handler_shared {
struct aws_crt_statistics_tls stats;
};

/**
 * Read-direction state of a TLS channel handler, used to delay the read-side shutdown
 * until data the handler has already queued is delivered downstream.
 *
 * The notes below describe how the Secure Transport (darwin) handler uses each value.
 * NOTE(review): other platform handlers presumably follow the same protocol; confirm.
 */
enum aws_tls_handler_read_state {
/* First enumerator (value 0). Presumably the state of a freshly zero-initialized handler,
 * i.e. no read-direction shutdown has been requested yet; confirm against handler creation. */
AWS_TLS_HANDLER_OPEN,
/* A read-direction shutdown was requested while queued input was still pending. Completion
 * of the shutdown is deferred: the handler keeps reading, and once the read would block (or
 * fails/closes) it reports shutdown completion for the read direction. */
AWS_TLS_HANDLER_READ_SHUTTING_DOWN,
/* The read-direction shutdown is finished. Incoming read messages are released without being
 * processed and read-window increments are ignored. */
AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE,
};

AWS_EXTERN_C_BEGIN

AWS_IO_API void aws_tls_channel_handler_shared_init(
Expand Down
4 changes: 2 additions & 2 deletions source/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar

channel->window_update_scheduled = false;

if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUT_DOWN) {
/* get the right-most slot to start the updates. */
struct aws_channel_slot *slot = channel->first;
while (slot->adj_right) {
Expand Down Expand Up @@ -858,7 +858,7 @@ static void s_window_update_task(struct aws_channel_task *channel_task, void *ar

int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) {

if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUT_DOWN) {
slot->current_window_update_batch_size =
aws_add_size_saturating(slot->current_window_update_batch_size, window);

Expand Down
175 changes: 123 additions & 52 deletions source/darwin/secure_transport_tls_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ struct secure_transport_handler {
bool negotiation_finished;
bool verify_peer;
bool read_task_pending;
enum aws_tls_handler_read_state read_state;
int delay_shutdown_error_code;
};

static OSStatus s_read_cb(SSLConnectionRef conn, void *data, size_t *len) {
Expand Down Expand Up @@ -548,6 +550,41 @@ static int s_process_write_message(
return AWS_OP_SUCCESS;
}

static void s_run_read(struct aws_channel_task *task, void *arg, aws_task_status status);

/**
 * Begin a delayed read-direction shutdown.
 *
 * Called when the read side is asked to shut down while the handler still holds queued
 * input. Instead of completing the shutdown, this marks the handler as
 * AWS_TLS_HANDLER_READ_SHUTTING_DOWN, remembers the error code the shutdown was requested
 * with, and makes sure a read task is scheduled so the queued data gets a chance to be
 * pushed downstream.
 *
 * @param handler    channel handler whose impl is a struct secure_transport_handler
 * @param slot       slot the handler lives in; used to query the downstream read window
 *                   and to schedule the read task on the slot's channel
 * @param error_code error code of the shutdown being delayed; stored in
 *                   delay_shutdown_error_code for use when the shutdown is resumed
 */
static void s_initialize_read_delay_shutdown(
    struct aws_channel_handler *handler,
    struct aws_channel_slot *slot,
    int error_code) {

    struct secure_transport_handler *tls = handler->impl;

    /* Data queued in the handler after negotiation must still be handed down the pipeline
     * before the shutdown is allowed to complete. */
    AWS_LOGF_DEBUG(
        AWS_LS_IO_TLS,
        "id=%p: TLS handler still have pending data to be delivered during shutdown. Wait until downstream "
        "reads the data.",
        (void *)handler);

    /* With a closed downstream window nothing can be delivered yet; warn, because the
     * shutdown cannot finish until the window is opened. */
    if (aws_channel_slot_downstream_read_window(slot) == 0) {
        AWS_LOGF_WARN(
            AWS_LS_IO_TLS,
            "id=%p: TLS shutdown delayed. Pending data cannot be processed until the flow-control window opens. "
            " Your application may hang if the read window never opens",
            (void *)handler);
    }

    tls->delay_shutdown_error_code = error_code;
    tls->read_state = AWS_TLS_HANDLER_READ_SHUTTING_DOWN;

    if (tls->read_task_pending) {
        /* A read task is already queued; no need to schedule another one. */
        return;
    }

    /* Data may have arrived together with the TLS negotiation, with the shutdown starting
     * right after negotiation. In that case nothing else would trigger a read, so schedule
     * one here. */
    tls->read_task_pending = true;
    aws_channel_task_init(&tls->read_task, s_run_read, handler, "darwin_channel_handler_read_on_delay_shutdown");
    aws_channel_schedule_task_now(slot->channel, &tls->read_task);
}

static int s_handle_shutdown(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
Expand All @@ -556,24 +593,30 @@ static int s_handle_shutdown(
bool abort_immediately) {
struct secure_transport_handler *secure_transport_handler = handler->impl;

if (dir == AWS_CHANNEL_DIR_WRITE) {
if (dir == AWS_CHANNEL_DIR_READ) {
AWS_LOGF_DEBUG(
AWS_LS_IO_TLS, "id=%p: shutting down read direction with error %d.", (void *)handler, error_code);
if (!abort_immediately && secure_transport_handler->negotiation_finished &&
!aws_linked_list_empty(&secure_transport_handler->input_queue) && slot->adj_right) {
s_initialize_read_delay_shutdown(handler, slot, error_code);
/* Return early: the handler's shutdown is not completed until the handler has processed the pending
* data. */
return AWS_OP_SUCCESS;
}
secure_transport_handler->read_state = AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE;
} else {
/* Shutdown in write direction */
if (!abort_immediately && error_code != AWS_IO_SOCKET_CLOSED) {
AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: shutting down write direction.", (void *)handler);
SSLClose(secure_transport_handler->ctx);
}
} else {
AWS_LOGF_DEBUG(
AWS_LS_IO_TLS,
"id=%p: shutting down read direction with error %d. Flushing queues.",
(void *)handler,
error_code);
while (!aws_linked_list_empty(&secure_transport_handler->input_queue)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&secure_transport_handler->input_queue);
struct aws_io_message *message = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
aws_mem_release(message->allocator, message);
}
}

/* Flushing queues */
while (!aws_linked_list_empty(&secure_transport_handler->input_queue)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&secure_transport_handler->input_queue);
struct aws_io_message *message = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);
aws_mem_release(message->allocator, message);
}
return aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, abort_immediately);
}

Expand All @@ -583,6 +626,12 @@ static int s_process_read_message(
struct aws_io_message *message) {

struct secure_transport_handler *secure_transport_handler = handler->impl;
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE) {
if (message) {
aws_mem_release(message->allocator, message);
}
return AWS_OP_SUCCESS;
}

if (message) {
aws_linked_list_push_back(&secure_transport_handler->input_queue, &message->queueing_handle);
Expand All @@ -608,59 +657,67 @@ static int s_process_read_message(
AWS_LS_IO_TLS, "id=%p: downstream window is %llu", (void *)handler, (unsigned long long)downstream_window);
size_t processed = 0;

OSStatus status = noErr;
while (processed < downstream_window && status == noErr) {
int shutdown_error_code = 0;
while (processed < downstream_window) {

struct aws_io_message *outgoing_read_message = aws_channel_acquire_message_from_pool(
slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, downstream_window - processed);

size_t read = 0;
status = SSLRead(
OSStatus status = SSLRead(
secure_transport_handler->ctx,
outgoing_read_message->message_data.buffer,
outgoing_read_message->message_data.capacity,
&read);

AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: bytes read %llu", (void *)handler, (unsigned long long)read);
if (read <= 0) {
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);

if (status != errSSLWouldBlock) {
AWS_LOGF_ERROR(
AWS_LS_IO_TLS,
"id=%p: error reported during SSLRead. OSStatus code %d",
(void *)handler,
(int)status);
if (read > 0) {
processed += read;
outgoing_read_message->message_data.len = read;

if (status != errSSLClosedGraceful) {
aws_raise_error(AWS_IO_TLS_ERROR_READ_FAILURE);
aws_channel_shutdown(secure_transport_handler->parent_slot->channel, AWS_IO_TLS_ERROR_READ_FAILURE);
} else {
AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: connection shutting down gracefully.", (void *)handler);
aws_channel_shutdown(secure_transport_handler->parent_slot->channel, AWS_ERROR_SUCCESS);
}
if (secure_transport_handler->on_data_read) {
secure_transport_handler->on_data_read(
handler, slot, &outgoing_read_message->message_data, secure_transport_handler->user_data);
}
continue;
};

processed += read;
outgoing_read_message->message_data.len = read;

if (secure_transport_handler->on_data_read) {
secure_transport_handler->on_data_read(
handler, slot, &outgoing_read_message->message_data, secure_transport_handler->user_data);
}

if (slot->adj_right) {
if (aws_channel_slot_send_message(slot, outgoing_read_message, AWS_CHANNEL_DIR_READ)) {
if (slot->adj_right) {
if (aws_channel_slot_send_message(slot, outgoing_read_message, AWS_CHANNEL_DIR_READ)) {
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);
shutdown_error_code = aws_last_error();
goto shutdown_channel;
}
/* outgoing message was pushed to the input_queue, so this handler owns it now */
} else {
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);
aws_channel_shutdown(secure_transport_handler->parent_slot->channel, aws_last_error());
/* incoming message was pushed to the input_queue, so this handler owns it now */
return AWS_OP_SUCCESS;
}
} else {
/* Nothing was read */
aws_mem_release(outgoing_read_message->allocator, outgoing_read_message);
}

switch (status) {
case errSSLWouldBlock:
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUTTING_DOWN) {
/* Propagate the shutdown as we blocked now. */
goto shutdown_channel;
} else {
break;
}
case errSSLClosedGraceful:
AWS_LOGF_TRACE(AWS_LS_IO_TLS, "id=%p: connection shutting down gracefully.", (void *)handler);
goto shutdown_channel;
case noErr:
/* continue the while loop */
continue;
default:
/* unexpected error happened */
aws_raise_error(AWS_IO_TLS_ERROR_READ_FAILURE);
shutdown_error_code = AWS_IO_TLS_ERROR_READ_FAILURE;
goto shutdown_channel;
}

/* Break the while loop */
break;
}
AWS_LOGF_TRACE(
AWS_LS_IO_TLS,
Expand All @@ -669,6 +726,21 @@ static int s_process_read_message(
(unsigned long long)downstream_window - processed);

return AWS_OP_SUCCESS;

shutdown_channel:
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUTTING_DOWN) {
if (secure_transport_handler->delay_shutdown_error_code != 0) {
/* Propagate the original error code if it is set. */
shutdown_error_code = secure_transport_handler->delay_shutdown_error_code;
}
/* Continue the shutdown process delayed before. */
secure_transport_handler->read_state = AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE;
aws_channel_slot_on_handler_shutdown_complete(slot, AWS_CHANNEL_DIR_READ, shutdown_error_code, false);
} else {
/* Starts the shutdown process */
aws_channel_shutdown(slot->channel, shutdown_error_code);
}
return AWS_OP_SUCCESS;
}

static void s_run_read(struct aws_channel_task *task, void *arg, aws_task_status status) {
Expand All @@ -683,6 +755,9 @@ static void s_run_read(struct aws_channel_task *task, void *arg, aws_task_status

static int s_increment_read_window(struct aws_channel_handler *handler, struct aws_channel_slot *slot, size_t size) {
struct secure_transport_handler *secure_transport_handler = handler->impl;
if (secure_transport_handler->read_state == AWS_TLS_HANDLER_READ_SHUT_DOWN_COMPLETE) {
return AWS_OP_SUCCESS;
}

AWS_LOGF_TRACE(
AWS_LS_IO_TLS, "id=%p: increment read window message received %llu", (void *)handler, (unsigned long long)size);
Expand All @@ -704,13 +779,9 @@ static int s_increment_read_window(struct aws_channel_handler *handler, struct a
aws_channel_slot_increment_read_window(slot, window_update_size);
}

if (secure_transport_handler->negotiation_finished && !secure_transport_handler->read_task.node.next) {
if (secure_transport_handler->negotiation_finished && !secure_transport_handler->read_task_pending) {
/* TLS requires full records before it can decrypt anything. As a result we need to check everything we've
* buffered instead of just waiting on a read from the socket, or we'll hit a deadlock.
*
* We have messages in a queue and they need to be run after the socket has popped (even if it didn't have data
* to read). Alternatively, s2n reads entire records at a time, so we'll need to grab whatever we can and we
* have no idea what's going on inside there. So we need to attempt another read.
*/
secure_transport_handler->read_task_pending = true;
aws_channel_task_init(
Expand Down
Loading

0 comments on commit 614c35c

Please sign in to comment.