From 20097d3cc73f244237d02d83f4898be165465cdd Mon Sep 17 00:00:00 2001
From: Dengke Tang <dengket@amazon.com>
Date: Fri, 22 Dec 2023 13:01:50 -0800
Subject: [PATCH] fix two bugs from metrics and cancel (#399)

Co-authored-by: Michael Graeb <graebm@amazon.com>
---
 include/aws/s3/private/s3_meta_request_impl.h |   8 +-
 include/aws/s3/private/s3_request.h           |   4 +-
 source/s3_auto_ranged_put.c                   |   2 +-
 source/s3_meta_request.c                      | 134 +++++++++++-------
 source/s3_util.c                              |   1 +
 tests/CMakeLists.txt                          |   5 +-
 tests/s3_cancel_tests.c                       |  29 +++-
 7 files changed, 117 insertions(+), 66 deletions(-)

diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h
index 9738eb180..d5d3fcb8e 100644
--- a/include/aws/s3/private/s3_meta_request_impl.h
+++ b/include/aws/s3/private/s3_meta_request_impl.h
@@ -226,8 +226,8 @@ struct aws_s3_meta_request {
         /* True if the finish result has been set. */
         uint32_t finish_result_set : 1;
 
-        /* To track the aws_s3_request that are active from HTTP level */
-        struct aws_linked_list ongoing_http_requests_list;
+        /* To track aws_s3_requests with cancellable HTTP streams */
+        struct aws_linked_list cancellable_http_streams_list;
 
     } synced_data;
 
@@ -367,8 +367,8 @@ void aws_s3_meta_request_add_event_for_delivery_synced(
  * The meta-request's finish callback must not be invoked until this returns false. */
 bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_request *meta_request);
 
-/* Cancel the requests with ongoing HTTP activities for the meta request */
-void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code);
+/* Cancel every request of the meta request that currently has a cancellable HTTP stream */
+void aws_s3_meta_request_cancel_cancellable_requests_synced(struct aws_s3_meta_request *meta_request, int error_code);
 
 /* Asynchronously read from the meta request's input stream. Should always be done outside of any mutex,
  * as reading from the stream could cause user code to call back into aws-c-s3.
diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h
index e43996647..b77cf5231 100644
--- a/include/aws/s3/private/s3_request.h
+++ b/include/aws/s3/private/s3_request.h
@@ -114,12 +114,12 @@ struct aws_s3_request {
     struct aws_linked_list_node node;
 
     /* Linked list node used for tracking the request is active from HTTP level. */
-    struct aws_linked_list_node ongoing_http_requests_list_node;
+    struct aws_linked_list_node cancellable_http_streams_list_node;
 
     /* The meta request lock must be held to access the data */
     struct {
         /* The underlying http stream, only valid when the request is active from HTTP level */
-        struct aws_http_stream *http_stream;
+        struct aws_http_stream *cancellable_http_stream;
     } synced_data;
 
     /* TODO Ref count on the request is no longer needed--only one part of code should ever be holding onto a request,
diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c
index e0f7ef972..941d8ae81 100644
--- a/source/s3_auto_ranged_put.c
+++ b/source/s3_auto_ranged_put.c
@@ -1671,7 +1671,7 @@ static int s_s3_auto_ranged_put_pause(
      */
     aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_PAUSED);
 
-    aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_PAUSED);
+    aws_s3_meta_request_cancel_cancellable_requests_synced(meta_request, AWS_ERROR_S3_PAUSED);
 
     /* unlock */
     aws_s3_meta_request_unlock_synced_data(meta_request);
diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c
index 88783c27a..de6aa0055 100644
--- a/source/s3_meta_request.c
+++ b/source/s3_meta_request.c
@@ -203,7 +203,7 @@ int aws_s3_meta_request_init_base(
     meta_request->type = options->type;
     /* Set up reference count. */
     aws_ref_count_init(&meta_request->ref_count, meta_request, s_s3_meta_request_destroy);
-    aws_linked_list_init(&meta_request->synced_data.ongoing_http_requests_list);
+    aws_linked_list_init(&meta_request->synced_data.cancellable_http_streams_list);
 
     if (part_size == SIZE_MAX) {
         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
@@ -346,7 +346,7 @@ void aws_s3_meta_request_cancel(struct aws_s3_meta_request *meta_request) {
     /* BEGIN CRITICAL SECTION */
     aws_s3_meta_request_lock_synced_data(meta_request);
     aws_s3_meta_request_set_fail_synced(meta_request, NULL, AWS_ERROR_S3_CANCELED);
-    aws_s3_meta_request_cancel_ongoing_http_requests_synced(meta_request, AWS_ERROR_S3_CANCELED);
+    aws_s3_meta_request_cancel_cancellable_requests_synced(meta_request, AWS_ERROR_S3_CANCELED);
     aws_s3_meta_request_unlock_synced_data(meta_request);
     /* END CRITICAL SECTION */
 }
@@ -488,7 +488,7 @@ static void s_s3_meta_request_destroy(void *user_data) {
     AWS_ASSERT(aws_array_list_length(&meta_request->io_threaded_data.event_delivery_array) == 0);
     aws_array_list_clean_up(&meta_request->io_threaded_data.event_delivery_array);
 
-    AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list));
+    AWS_ASSERT(aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list));
 
     aws_s3_meta_request_result_clean_up(meta_request, &meta_request->synced_data.finish_result);
 
@@ -1071,28 +1071,51 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request,
 
     AWS_LOGF_TRACE(AWS_LS_S3_META_REQUEST, "id=%p: Sending request %p", (void *)meta_request, (void *)request);
 
-    if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
-        aws_http_stream_release(stream);
-        stream = NULL;
-
-        AWS_LOGF_ERROR(
-            AWS_LS_S3_META_REQUEST, "id=%p: Could not activate HTTP stream %p", (void *)meta_request, (void *)request);
-
-        goto error_finish;
-    }
-
-    {
+    if (!request->always_send) {
         /* BEGIN CRITICAL SECTION */
         aws_s3_meta_request_lock_synced_data(meta_request);
+        if (aws_s3_meta_request_has_finish_result_synced(meta_request)) {
+            /* The meta request already has a finish result, so treat this request as canceled. */
+            aws_raise_error(AWS_ERROR_S3_CANCELED);
+            aws_s3_meta_request_unlock_synced_data(meta_request);
+            goto error_finish;
+        }
+
+        /* Activate the stream while holding the lock: once the stream is activated, HTTP-level callbacks can
+         * fire right away. */
+        if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
+            aws_s3_meta_request_unlock_synced_data(meta_request);
+            AWS_LOGF_ERROR(
+                AWS_LS_S3_META_REQUEST,
+                "id=%p: Could not activate HTTP stream %p",
+                (void *)meta_request,
+                (void *)request);
+            goto error_finish;
+        }
         aws_linked_list_push_back(
-            &meta_request->synced_data.ongoing_http_requests_list, &request->ongoing_http_requests_list_node);
-        request->synced_data.http_stream = stream;
+            &meta_request->synced_data.cancellable_http_streams_list, &request->cancellable_http_streams_list_node);
+        request->synced_data.cancellable_http_stream = stream;
+
         aws_s3_meta_request_unlock_synced_data(meta_request);
         /* END CRITICAL SECTION */
+    } else {
+        /* An always-send request is not cancellable, so simply activate the stream. */
+        if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
+            AWS_LOGF_ERROR(
+                AWS_LS_S3_META_REQUEST,
+                "id=%p: Could not activate HTTP stream %p",
+                (void *)meta_request,
+                (void *)request);
+            goto error_finish;
+        }
     }
     return;
 
 error_finish:
+    if (stream) {
+        aws_http_stream_release(stream);
+        stream = NULL;
+    }
 
     s_s3_meta_request_send_request_finish(connection, NULL, aws_last_error_or_unknown());
 }
@@ -1385,14 +1408,16 @@ static void s_s3_meta_request_stream_complete(struct aws_http_stream *stream, in
     if (meta_request->checksum_config.validate_response_checksum) {
         s_get_response_part_finish_checksum_helper(connection, error_code);
     }
-    if (error_code != AWS_ERROR_S3_CANCELED && error_code != AWS_ERROR_S3_PAUSED) {
-        /* BEGIN CRITICAL SECTION */
+    /* BEGIN CRITICAL SECTION */
+    {
         aws_s3_meta_request_lock_synced_data(meta_request);
-        AWS_ASSERT(request->synced_data.http_stream != NULL);
-        aws_linked_list_remove(&request->ongoing_http_requests_list_node);
+        if (request->synced_data.cancellable_http_stream) {
+            aws_linked_list_remove(&request->cancellable_http_streams_list_node);
+            request->synced_data.cancellable_http_stream = NULL;
+        }
         aws_s3_meta_request_unlock_synced_data(meta_request);
-        /* END CRITICAL SECTION */
     }
+    /* END CRITICAL SECTION */
     s_s3_meta_request_send_request_finish(connection, stream, error_code);
 }
 
@@ -1668,19 +1693,40 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
            meta_request->synced_data.event_delivery_active;
 }
 
-void aws_s3_meta_request_cancel_ongoing_http_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) {
+void aws_s3_meta_request_cancel_cancellable_requests_synced(struct aws_s3_meta_request *meta_request, int error_code) {
     ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
-    while (!aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list)) {
+    while (!aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list)) {
         struct aws_linked_list_node *request_node =
-            aws_linked_list_pop_front(&meta_request->synced_data.ongoing_http_requests_list);
+            aws_linked_list_pop_front(&meta_request->synced_data.cancellable_http_streams_list);
         struct aws_s3_request *request =
-            AWS_CONTAINER_OF(request_node, struct aws_s3_request, ongoing_http_requests_list_node);
-        if (!request->always_send) {
-            /* Cancel the ongoing http stream, unless it's always send. */
-            aws_http_stream_cancel(request->synced_data.http_stream, error_code);
+            AWS_CONTAINER_OF(request_node, struct aws_s3_request, cancellable_http_streams_list_node);
+        AWS_ASSERT(!request->always_send);
+
+        aws_http_stream_cancel(request->synced_data.cancellable_http_stream, error_code);
+        request->synced_data.cancellable_http_stream = NULL;
+    }
+}
+
+static struct aws_s3_request_metrics *s_s3_request_finish_up_and_release_metrics(
+    struct aws_s3_request_metrics *metrics,
+    struct aws_s3_meta_request *meta_request) {
+
+    if (metrics != NULL) {
+        /* The request is finished; complete its metrics now unless they were already completed. */
+
+        if (metrics->time_metrics.end_timestamp_ns == -1) {
+            aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
+            metrics->time_metrics.total_duration_ns =
+                metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
+        }
+
+        if (meta_request->telemetry_callback != NULL) {
+            /* We are already in the meta request event thread, so invoke the telemetry callback directly */
+            meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
         }
-        request->synced_data.http_stream = NULL;
+        aws_s3_request_metrics_release(metrics);
     }
+    return NULL;
 }
 
 /* Deliver events in event_delivery_array.
@@ -1750,21 +1796,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
                 aws_atomic_fetch_sub(&client->stats.num_requests_streaming_response, 1);
 
                 ++num_parts_delivered;
-
-                if (request->send_data.metrics != NULL) {
-                    /* Request is done streaming the body, complete the metrics for the request now. */
-                    struct aws_s3_request_metrics *metrics = request->send_data.metrics;
-                    metrics->crt_info_metrics.error_code = error_code;
-                    aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
-                    metrics->time_metrics.total_duration_ns =
-                        metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
-
-                    if (meta_request->telemetry_callback != NULL) {
-                        /* We already in the meta request event thread, invoke the telemetry callback directly */
-                        meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
-                    }
-                    request->send_data.metrics = aws_s3_request_metrics_release(metrics);
-                }
+                request->send_data.metrics =
+                    s_s3_request_finish_up_and_release_metrics(request->send_data.metrics, meta_request);
 
                 aws_s3_request_release(request);
             } break;
@@ -1804,13 +1837,8 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
                 AWS_FATAL_ASSERT(meta_request->telemetry_callback != NULL);
                 AWS_FATAL_ASSERT(metrics != NULL);
 
-                if (metrics->time_metrics.end_timestamp_ns == -1) {
-                    aws_high_res_clock_get_ticks((uint64_t *)&metrics->time_metrics.end_timestamp_ns);
-                    metrics->time_metrics.total_duration_ns =
-                        metrics->time_metrics.end_timestamp_ns - metrics->time_metrics.start_timestamp_ns;
-                }
-                meta_request->telemetry_callback(meta_request, metrics, meta_request->user_data);
-                event.u.telemetry.metrics = aws_s3_request_metrics_release(event.u.telemetry.metrics);
+                event.u.telemetry.metrics =
+                    s_s3_request_finish_up_and_release_metrics(event.u.telemetry.metrics, meta_request);
             } break;
 
             default:
@@ -1935,6 +1963,10 @@ void aws_s3_meta_request_finish_default(struct aws_s3_meta_request *meta_request
         struct aws_linked_list_node *request_node = aws_linked_list_pop_front(&release_request_list);
         struct aws_s3_request *release_request = AWS_CONTAINER_OF(request_node, struct aws_s3_request, node);
         AWS_FATAL_ASSERT(release_request != NULL);
+        /* This pending-body-streaming request was never moved to the event-delivery queue,
+         * so its metrics were never finished. Finish them now. */
+        release_request->send_data.metrics =
+            s_s3_request_finish_up_and_release_metrics(release_request->send_data.metrics, meta_request);
         aws_s3_request_release(release_request);
     }
 
diff --git a/source/s3_util.c b/source/s3_util.c
index 1747a9524..69ac22100 100644
--- a/source/s3_util.c
+++ b/source/s3_util.c
@@ -636,6 +636,7 @@ int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor e
 void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request) {
     AWS_PRECONDITION(meta_request);
     AWS_PRECONDITION(request);
+    ASSERT_SYNCED_DATA_LOCK_HELD(meta_request);
 
     if (request->send_data.metrics != NULL) {
         /* Request is done, complete the metrics for the request now. */
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 04c8fb9b6..92b1ce6d8 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -49,8 +49,8 @@ add_net_test_case(test_s3_cancel_mpu_create_completed)
 add_net_test_case(test_s3_cancel_mpu_one_part_completed)
 add_net_test_case(test_s3_cancel_mpu_one_part_completed_async)
 add_net_test_case(test_s3_cancel_mpu_all_parts_completed)
-add_net_test_case(test_s3_cancel_mpu_ongoing_http_requests)
-add_net_test_case(test_s3_pause_mpu_ongoing_http_requests)
+add_net_test_case(test_s3_cancel_mpu_cancellable_requests)
+add_net_test_case(test_s3_pause_mpu_cancellable_requests)
 add_net_test_case(test_s3_cancel_mpd_nothing_sent)
 add_net_test_case(test_s3_cancel_mpd_one_part_sent)
 add_net_test_case(test_s3_cancel_mpd_one_part_completed)
@@ -59,6 +59,7 @@ add_net_test_case(test_s3_cancel_mpd_head_object_sent)
 add_net_test_case(test_s3_cancel_mpd_head_object_completed)
 add_net_test_case(test_s3_cancel_mpd_get_without_range_sent)
 add_net_test_case(test_s3_cancel_mpd_get_without_range_completed)
+add_net_test_case(test_s3_cancel_mpd_pending_streaming)
 add_net_test_case(test_s3_cancel_prepare)
 
 add_net_test_case(test_s3_get_object_tls_disabled)
diff --git a/tests/s3_cancel_tests.c b/tests/s3_cancel_tests.c
index c2e83f411..eb1a0ec23 100644
--- a/tests/s3_cancel_tests.c
+++ b/tests/s3_cancel_tests.c
@@ -30,6 +30,7 @@ enum s3_update_cancel_type {
     S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_SENT,
     S3_UPDATE_CANCEL_TYPE_MPD_ONE_PART_COMPLETED,
     S3_UPDATE_CANCEL_TYPE_MPD_TWO_PARTS_COMPLETED,
+    S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING,
 };
 
 struct s3_cancel_test_user_data {
@@ -78,7 +79,7 @@ static bool s_s3_meta_request_update_cancel_test(
             break;
 
         case S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS:
-            call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.ongoing_http_requests_list);
+            call_cancel_or_pause = !aws_linked_list_empty(&meta_request->synced_data.cancellable_http_streams_list);
             break;
 
         case S3_UPDATE_CANCEL_TYPE_NUM_MPU_CANCEL_TYPES:
@@ -122,6 +123,11 @@ static bool s_s3_meta_request_update_cancel_test(
             /* Prevent other parts from being queued while we wait for these two to complete. */
             block_update = !call_cancel_or_pause && auto_ranged_get->synced_data.num_parts_requested == 2;
             break;
+
+        case S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING:
+            call_cancel_or_pause =
+                aws_priority_queue_size(&meta_request->synced_data.pending_body_streaming_requests) > 0;
+            break;
     }
 
     aws_s3_meta_request_unlock_synced_data(meta_request);
@@ -299,7 +305,9 @@ static int s3_cancel_test_helper_ex(
             .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE,
             .get_options =
                 {
-                    .object_path = g_pre_existing_object_1MB,
+                    /* Note 1: 10MB object with 16KB parts, so that tests have many requests in-flight.
+                     * We want to try and stress stuff like parts arriving out of order. */
+                    .object_path = g_pre_existing_object_10MB,
                 },
         };
 
@@ -527,8 +535,8 @@ static int s_test_s3_cancel_mpu_all_parts_completed(struct aws_allocator *alloca
     return 0;
 }
 
-AWS_TEST_CASE(test_s3_cancel_mpu_ongoing_http_requests, s_test_s3_cancel_mpu_ongoing_http_requests)
-static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
+AWS_TEST_CASE(test_s3_cancel_mpu_cancellable_requests, s_test_s3_cancel_mpu_cancellable_requests)
+static int s_test_s3_cancel_mpu_cancellable_requests(struct aws_allocator *allocator, void *ctx) {
     (void)ctx;
 
     ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPU_ONGOING_HTTP_REQUESTS));
@@ -536,8 +544,8 @@ static int s_test_s3_cancel_mpu_ongoing_http_requests(struct aws_allocator *allo
     return 0;
 }
 
-AWS_TEST_CASE(test_s3_pause_mpu_ongoing_http_requests, s_test_s3_pause_mpu_ongoing_http_requests)
-static int s_test_s3_pause_mpu_ongoing_http_requests(struct aws_allocator *allocator, void *ctx) {
+AWS_TEST_CASE(test_s3_pause_mpu_cancellable_requests, s_test_s3_pause_mpu_cancellable_requests)
+static int s_test_s3_pause_mpu_cancellable_requests(struct aws_allocator *allocator, void *ctx) {
     (void)ctx;
 
     ASSERT_SUCCESS(s3_cancel_test_helper_ex(
@@ -618,6 +626,15 @@ static int s_test_s3_cancel_mpd_get_without_range_completed(struct aws_allocator
     return 0;
 }
 
+AWS_TEST_CASE(test_s3_cancel_mpd_pending_streaming, s_test_s3_cancel_mpd_pending_streaming)
+static int s_test_s3_cancel_mpd_pending_streaming(struct aws_allocator *allocator, void *ctx) {
+    (void)ctx;
+
+    ASSERT_SUCCESS(s3_cancel_test_helper(allocator, S3_UPDATE_CANCEL_TYPE_MPD_PENDING_STREAMING));
+
+    return 0;
+}
+
 struct test_s3_cancel_prepare_user_data {
     uint32_t request_prepare_counters[AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_MAX];
 };