From f14d92f581161686dad97c82aa994862da195f96 Mon Sep 17 00:00:00 2001
From: Michael Graeb <graebm@amazon.com>
Date: Fri, 2 Jun 2023 16:51:34 -0700
Subject: [PATCH] Tests for async streaming (seems to work!) (#301)

- Add basic tests for async streaming of the request body.
- When async streaming, don't let more than 1 part at a time be in the "prepare" stage.
    - since `aws_async_input_stream` doesn't allow overlapping read() calls; the next part can't start preparing until the previous read completes.
- Stop wrapping synchronous streams in an async stream.
    - Now that async streaming is handled slightly differently than synchronous streaming, it's simpler to keep them as separate code paths.
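
For context, a rough sketch of the caller-facing contract this builds on
(hypothetical setup code, not part of this patch; `message`, `my_sync_stream`,
`my_async_stream`, and the filepath are placeholders): the request body is
supplied in exactly one of three ways, and each way now gets its own code path
instead of being wrapped in an async stream up front.

    struct aws_s3_meta_request_options options = {
        .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
        .message = message, /* method, path, and headers always come from here */
    };

    /* Pick exactly one body source: */

    /* (a) synchronous: the message's own body stream */
    aws_http_message_set_body_stream(message, my_sync_stream);

    /* (b) file on disk: the body is read directly from this path */
    options.send_filepath = aws_byte_cursor_from_c_str("/tmp/upload.bin");

    /* (c) async stream: each read() completes later via an aws_future_bool */
    options.send_async_stream = my_async_stream;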
---
 include/aws/s3/private/s3_meta_request_impl.h |   3 +-
 include/aws/s3/private/s3_request_messages.h  |  15 ++-
 source/s3_auto_ranged_put.c                   |  18 ++-
 source/s3_meta_request.c                      |  61 ++++++++--
 source/s3_request_messages.c                  |  86 +++++++-------
 tests/CMakeLists.txt                          |   3 +
 tests/s3_bad_input_stream.c                   |  70 -----------
 tests/s3_data_plane_tests.c                   |  93 ++++++++++++++-
 tests/s3_mock_server_tests.c                  |  18 ---
 tests/s3_tester.c                             | 112 +++++++++++++-----
 tests/s3_tester.h                             |  12 +-
 11 files changed, 297 insertions(+), 194 deletions(-)
 delete mode 100644 tests/s3_bad_input_stream.c
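
The "only 1 part in the prepare stage" rule exists because an
aws_async_input_stream allows only one outstanding read() at a time, and each
read completes later through the aws_future_bool returned by
aws_s3_meta_request_read_body(). A rough sketch of how a prepare step might
consume that future (hypothetical fragment, assuming the aws_future_bool
helpers from aws-c-io's <aws/io/future.h>; `struct my_part_prep` and the
surrounding variables are placeholders, not names from this patch):

    /* hypothetical bookkeeping for one part's prepare step */
    struct my_part_prep {
        struct aws_future_bool *read_future;
        struct aws_byte_buf part_buffer;
    };

    static void s_on_part_read_done(void *user_data) {
        struct my_part_prep *prep = user_data;
        if (aws_future_bool_get_error(prep->read_future) == AWS_ERROR_SUCCESS) {
            bool reached_eof = aws_future_bool_get_result(prep->read_future);
            (void)reached_eof; /* buffer is filled, or the stream ended early */
        }
        aws_future_bool_release(prep->read_future);
        prep->read_future = NULL;
        /* only now may the next part enter the "prepare" stage */
    }

    /* at most one of these reads is outstanding per meta request when the body is async */
    prep->read_future = aws_s3_meta_request_read_body(meta_request, &prep->part_buffer);
    aws_future_bool_register_callback(prep->read_future, s_on_part_read_done, prep);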

diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h
index 6c7b6ea19..b44414471 100644
--- a/include/aws/s3/private/s3_meta_request_impl.h
+++ b/include/aws/s3/private/s3_meta_request_impl.h
@@ -117,7 +117,8 @@ struct aws_s3_meta_request {
     /* Initial HTTP Message that this meta request is based on. */
     struct aws_http_message *initial_request_message;
 
-    /* Async stream for meta request's body */
+    /* Async stream for meta request's body.
+     * NULL if using initial_request_message's synchronous body stream instead. */
     struct aws_async_input_stream *request_body_async_stream;
 
     /* Part size to use for uploads and downloads.  Passed down by the creating client. */
diff --git a/include/aws/s3/private/s3_request_messages.h b/include/aws/s3/private/s3_request_messages.h
index 7230edea5..c4fa65f96 100644
--- a/include/aws/s3/private/s3_request_messages.h
+++ b/include/aws/s3/private/s3_request_messages.h
@@ -37,6 +37,13 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he
     size_t excluded_headers_size,
     bool exclude_x_amz_meta);
 
+/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */
+AWS_S3_API
+struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers(
+    struct aws_allocator *allocator,
+    struct aws_http_message *message,
+    struct aws_byte_cursor filepath);
+
 /* Copy headers from one message to the other and exclude specific headers.
  * exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta) are excluded.*/
 AWS_S3_API
@@ -55,14 +62,6 @@ struct aws_input_stream *aws_s3_message_util_assign_body(
     const struct checksum_config *checksum_config,
     struct aws_byte_buf *out_checksum);
 
-/* Given all possible ways to send a request body, always return an async-stream.
- * Returns NULL on failure */
-struct aws_async_input_stream *aws_s3_message_util_acquire_async_body_stream(
-    struct aws_allocator *allocator,
-    struct aws_http_message *message,
-    struct aws_byte_cursor send_filepath,
-    struct aws_async_input_stream *send_async_stream);
-
 /* Return true if checksum headers has been set. */
 AWS_S3_API
 bool aws_s3_message_util_check_checksum_header(struct aws_http_message *message);
diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c
index 9bd38df2e..a8b3109b6 100644
--- a/source/s3_auto_ranged_put.c
+++ b/source/s3_auto_ranged_put.c
@@ -440,10 +440,17 @@ static void s_s3_meta_request_auto_ranged_put_destroy(struct aws_s3_meta_request
 static bool s_should_skip_scheduling_more_parts_based_on_flags(
     const struct aws_s3_auto_ranged_put *auto_ranged_put,
     uint32_t flags) {
-    if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
-        uint32_t num_parts_in_flight =
-            (auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_completed);
 
+    uint32_t num_parts_in_flight =
+        (auto_ranged_put->synced_data.num_parts_sent - auto_ranged_put->synced_data.num_parts_completed);
+
+    /* If the stream is actually async, only allow 1 part in flight at a time.
+     * We need to wait for async read() to complete before calling it again */
+    if (auto_ranged_put->base.request_body_async_stream != NULL) {
+        return num_parts_in_flight > 0;
+    }
+
+    if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
         /* Because uploads must read from their streams serially, we try to limit the amount of in flight
          * requests for a given multipart upload if we can. */
         return num_parts_in_flight > 0;
@@ -530,8 +537,7 @@ static bool s_s3_auto_ranged_put_update(
                     struct aws_string *etag = NULL;
 
                     if (!aws_array_list_get_at(&auto_ranged_put->synced_data.etag_list, &etag, etag_index) && etag) {
-                        /* part already downloaded, skip it here and prepare will take care of adjusting the buffer
-                         */
+                        /* part already downloaded, skip it here and prepare will take care of adjusting the buffer */
                         ++auto_ranged_put->threaded_update_data.next_part_number;
 
                     } else {
@@ -1668,7 +1674,7 @@ static void s_s3_auto_ranged_put_request_finished(
                 }
                 if (error_code == AWS_ERROR_SUCCESS && meta_request->progress_callback != NULL) {
                     struct aws_s3_meta_request_progress progress = {
-                        .bytes_transferred = meta_request->part_size,
+                        .bytes_transferred = request->request_body.len,
                         .content_length = auto_ranged_put->content_length,
                     };
                     meta_request->progress_callback(meta_request, &progress, meta_request->user_data);
diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c
index 73126b942..6ff017254 100644
--- a/source/s3_meta_request.c
+++ b/source/s3_meta_request.c
@@ -230,16 +230,24 @@ int aws_s3_meta_request_init_base(
         meta_request->cached_signing_config = aws_cached_signing_config_new(allocator, options->signing_config);
     }
 
-    /* Keep a reference to the original message structure passed in. */
-    meta_request->initial_request_message = aws_http_message_acquire(options->message);
-
-    /* There are several ways for the user to pass in the request's body.
-     * If something besides an aws_async_input_stream was passed in, create an
-     * async wrapper around it */
-    meta_request->request_body_async_stream = aws_s3_message_util_acquire_async_body_stream(
-        allocator, meta_request->initial_request_message, options->send_filepath, options->send_async_stream);
-    if (meta_request->request_body_async_stream == NULL) {
-        goto error;
+    /* Set initial_request_message, based on how the request's body is being passed in
+     * (we checked earlier that it's not being passed multiple ways) */
+    if (options->send_filepath.len > 0) {
+        /* Create copy of original message, but with body-stream that reads directly from file */
+        meta_request->initial_request_message = aws_s3_message_util_copy_http_message_filepath_body_all_headers(
+            allocator, options->message, options->send_filepath);
+        if (meta_request->initial_request_message == NULL) {
+            goto error;
+        }
+
+    } else if (options->send_async_stream != NULL) {
+        /* Read from async body-stream, but keep original message around for headers, method, etc */
+        meta_request->request_body_async_stream = aws_async_input_stream_acquire(options->send_async_stream);
+        meta_request->initial_request_message = aws_http_message_acquire(options->message);
+
+    } else {
+        /* Keep original message around, we'll read from its synchronous body-stream */
+        meta_request->initial_request_message = aws_http_message_acquire(options->message);
     }
 
     /* Client is currently optional to allow spinning up a meta_request without a client in a test. */
@@ -1588,7 +1596,38 @@ struct aws_future_bool *aws_s3_meta_request_read_body(
     AWS_PRECONDITION(meta_request);
     AWS_PRECONDITION(buffer);
 
-    return aws_async_input_stream_read_to_fill(meta_request->request_body_async_stream, buffer);
+    /* If async-stream, simply call read_to_fill() */
+    if (meta_request->request_body_async_stream != NULL) {
+        return aws_async_input_stream_read_to_fill(meta_request->request_body_async_stream, buffer);
+    }
+
+    /* Else synchronous aws_input_stream */
+    struct aws_input_stream *synchronous_stream =
+        aws_http_message_get_body_stream(meta_request->initial_request_message);
+    AWS_FATAL_ASSERT(synchronous_stream);
+
+    struct aws_future_bool *synchronous_read_future = aws_future_bool_new(meta_request->allocator);
+
+    /* Keep calling read() until we fill the buffer, or hit EOF */
+    struct aws_stream_status status = {.is_end_of_stream = false, .is_valid = true};
+    while ((buffer->len < buffer->capacity) && !status.is_end_of_stream) {
+        /* Read from stream */
+        if (aws_input_stream_read(synchronous_stream, buffer) != AWS_OP_SUCCESS) {
+            aws_future_bool_set_error(synchronous_read_future, aws_last_error());
+            goto synchronous_read_done;
+        }
+
+        /* Check if stream is done */
+        if (aws_input_stream_get_status(synchronous_stream, &status) != AWS_OP_SUCCESS) {
+            aws_future_bool_set_error(synchronous_read_future, aws_last_error());
+            goto synchronous_read_done;
+        }
+    }
+
+    aws_future_bool_set_result(synchronous_read_future, status.is_end_of_stream);
+
+synchronous_read_done:
+    return synchronous_read_future;
 }
 
 bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request) {
diff --git a/source/s3_request_messages.c b/source/s3_request_messages.c
index d79ab4443..f9007507b 100644
--- a/source/s3_request_messages.c
+++ b/source/s3_request_messages.c
@@ -838,49 +838,6 @@ struct aws_input_stream *aws_s3_message_util_assign_body(
     return NULL;
 }
 
-/* Given all possible ways to send a request body, always return an async-stream */
-struct aws_async_input_stream *aws_s3_message_util_acquire_async_body_stream(
-    struct aws_allocator *allocator,
-    struct aws_http_message *message,
-    struct aws_byte_cursor send_filepath,
-    struct aws_async_input_stream *send_async_stream) {
-
-    AWS_PRECONDITION(message);
-
-    /* If user provides async-stream, use it */
-    if (send_async_stream != NULL) {
-        return aws_async_input_stream_acquire(send_async_stream);
-    }
-
-    /* If user provides filepath, create aws_input_stream to read it, and wrap that in an async-stream */
-    if (send_filepath.len > 0) {
-        struct aws_string *filepath_str = aws_string_new_from_cursor(allocator, &send_filepath);
-        struct aws_input_stream *body_stream =
-            aws_input_stream_new_from_file(allocator, aws_string_c_str(filepath_str));
-        aws_string_destroy(filepath_str);
-        if (body_stream == NULL) {
-            return NULL;
-        }
-        send_async_stream = aws_async_input_stream_new_from_synchronous(allocator, body_stream);
-        aws_input_stream_release(body_stream);
-        return send_async_stream;
-    }
-
-    /* If user provides HTTP message with aws_input_stream body, wrap that in an async stream */
-    struct aws_input_stream *request_body = aws_http_message_get_body_stream(message);
-    if (request_body) {
-        return aws_async_input_stream_new_from_synchronous(allocator, request_body);
-    }
-
-    /* Otherwise, no body provided, just create empty async-stream */
-    struct aws_byte_cursor empty_cursor = {0};
-    struct aws_input_stream *empty_stream = aws_input_stream_new_from_cursor(allocator, &empty_cursor);
-    AWS_ASSERT(empty_stream);
-    send_async_stream = aws_async_input_stream_new_from_synchronous(allocator, empty_stream);
-    aws_input_stream_release(empty_stream);
-    return send_async_stream;
-}
-
 bool aws_s3_message_util_check_checksum_header(struct aws_http_message *message) {
     struct aws_http_headers *headers = aws_http_message_get_headers(message);
     for (int algorithm = AWS_SCA_INIT; algorithm <= AWS_SCA_END; algorithm++) {
@@ -987,6 +944,49 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he
     return NULL;
 }
 
+/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */
+struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers(
+    struct aws_allocator *allocator,
+    struct aws_http_message *base_message,
+    struct aws_byte_cursor filepath) {
+
+    bool success = false;
+    struct aws_string *filepath_str = NULL;
+    struct aws_input_stream *body_stream = NULL;
+    struct aws_http_message *message = NULL;
+
+    /* Copy message and retain all headers */
+    message = aws_s3_message_util_copy_http_message_no_body_filter_headers(
+        allocator,
+        base_message,
+        NULL /*excluded_header_array*/,
+        0 /*excluded_header_array_size*/,
+        false /*exclude_x_amz_meta*/);
+    if (!message) {
+        goto clean_up;
+    }
+
+    /* Create body-stream that reads from file */
+    filepath_str = aws_string_new_from_cursor(allocator, &filepath);
+    body_stream = aws_input_stream_new_from_file(allocator, aws_string_c_str(filepath_str));
+    if (!body_stream) {
+        goto clean_up;
+    }
+    aws_http_message_set_body_stream(message, body_stream);
+
+    success = true;
+
+clean_up:
+    aws_string_destroy(filepath_str);
+    aws_input_stream_release(body_stream);
+    if (success) {
+        return message;
+    } else {
+        aws_http_message_release(message);
+        return NULL;
+    }
+}
+
 void aws_s3_message_util_copy_headers(
     struct aws_http_message *source_message,
     struct aws_http_message *dest_message,
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 5cda08cd5..b093d5449 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -109,6 +109,9 @@ add_net_test_case(test_s3_put_object_double_slashes)
 add_net_test_case(test_s3_put_object_no_content_length)
 add_net_test_case(test_s3_put_object_single_part_no_content_length)
 add_net_test_case(test_s3_put_object_zero_size_no_content_length)
+add_net_test_case(test_s3_put_object_async)
+add_net_test_case(test_s3_put_object_async_no_content_length)
+add_net_test_case(test_s3_put_object_async_fail_reading)
 
 if(ENABLE_MRAP_TESTS)
     add_net_test_case(test_s3_get_object_less_than_part_size_mrap)
diff --git a/tests/s3_bad_input_stream.c b/tests/s3_bad_input_stream.c
deleted file mode 100644
index ed81e7b20..000000000
--- a/tests/s3_bad_input_stream.c
+++ /dev/null
@@ -1,70 +0,0 @@
-#include "s3_tester.h"
-#include <aws/io/stream.h>
-
-struct aws_s3_bad_input_stream_impl {
-    struct aws_input_stream base;
-    size_t length;
-    struct aws_allocator *allocator;
-};
-
-static int s_aws_s3_bad_input_stream_seek(
-    struct aws_input_stream *stream,
-    aws_off_t offset,
-    enum aws_stream_seek_basis basis) {
-    (void)stream;
-    (void)offset;
-    (void)basis;
-    aws_raise_error(AWS_ERROR_UNKNOWN);
-    return AWS_OP_ERR;
-}
-
-static int s_aws_s3_bad_input_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
-    (void)stream;
-    (void)dest;
-    aws_raise_error(AWS_IO_STREAM_READ_FAILED);
-    return AWS_OP_ERR;
-}
-
-static int s_aws_s3_bad_input_stream_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
-    (void)stream;
-    (void)status;
-    aws_raise_error(AWS_ERROR_UNKNOWN);
-    return AWS_OP_ERR;
-}
-
-static int s_aws_s3_bad_input_stream_get_length(struct aws_input_stream *stream, int64_t *out_length) {
-    AWS_ASSERT(stream != NULL);
-    struct aws_s3_bad_input_stream_impl *bad_input_stream =
-        AWS_CONTAINER_OF(stream, struct aws_s3_bad_input_stream_impl, base);
-    *out_length = (int64_t)bad_input_stream->length;
-    return AWS_OP_SUCCESS;
-}
-
-static void s_aws_s3_bad_input_stream_destroy(struct aws_s3_bad_input_stream_impl *bad_input_stream) {
-    aws_mem_release(bad_input_stream->allocator, bad_input_stream);
-}
-
-static struct aws_input_stream_vtable s_aws_s3_bad_input_stream_vtable = {
-    .seek = s_aws_s3_bad_input_stream_seek,
-    .read = s_aws_s3_bad_input_stream_read,
-    .get_status = s_aws_s3_bad_input_stream_get_status,
-    .get_length = s_aws_s3_bad_input_stream_get_length,
-};
-
-struct aws_input_stream *aws_s3_bad_input_stream_new(struct aws_allocator *allocator, size_t stream_length) {
-
-    struct aws_s3_bad_input_stream_impl *bad_input_stream =
-        aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_bad_input_stream_impl));
-    bad_input_stream->base.vtable = &s_aws_s3_bad_input_stream_vtable;
-    aws_ref_count_init(
-        &bad_input_stream->base.ref_count,
-        bad_input_stream,
-        (aws_simple_completion_callback *)s_aws_s3_bad_input_stream_destroy);
-
-    struct aws_input_stream *input_stream = &bad_input_stream->base;
-
-    bad_input_stream->length = stream_length;
-    bad_input_stream->allocator = allocator;
-
-    return input_stream;
-}
diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c
index ce2c23db1..49058351a 100644
--- a/tests/s3_data_plane_tests.c
+++ b/tests/s3_data_plane_tests.c
@@ -23,6 +23,7 @@
 #include <aws/io/tls_channel_handler.h>
 #include <aws/io/uri.h>
 #include <aws/testing/aws_test_harness.h>
+#include <aws/testing/stream_tester.h>
 #include <inttypes.h>
 
 AWS_TEST_CASE(test_s3_client_create_destroy, s_test_s3_client_create_destroy)
@@ -2244,6 +2245,81 @@ static int s_test_s3_put_object_zero_size_no_content_length(struct aws_allocator
     return 0;
 }
 
+AWS_TEST_CASE(test_s3_put_object_async, s_test_s3_put_object_async)
+static int s_test_s3_put_object_async(struct aws_allocator *allocator, void *ctx) {
+    (void)ctx;
+
+    struct aws_s3_meta_request_test_results test_results;
+    aws_s3_meta_request_test_results_init(&test_results, allocator);
+
+    struct aws_s3_tester_meta_request_options put_options = {
+        .allocator = allocator,
+        .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
+        .put_options =
+            {
+                .object_size_mb = 10,
+                .async_input_stream = true,
+            },
+    };
+    ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results));
+
+    ASSERT_UINT_EQUALS(MB_TO_BYTES(10), aws_atomic_load_int(&test_results.total_bytes_uploaded));
+
+    aws_s3_meta_request_test_results_clean_up(&test_results);
+    return 0;
+}
+
+AWS_TEST_CASE(test_s3_put_object_async_no_content_length, s_test_s3_put_object_async_no_content_length)
+static int s_test_s3_put_object_async_no_content_length(struct aws_allocator *allocator, void *ctx) {
+    (void)ctx;
+
+    struct aws_s3_meta_request_test_results test_results;
+    aws_s3_meta_request_test_results_init(&test_results, allocator);
+
+    struct aws_s3_tester_meta_request_options put_options = {
+        .allocator = allocator,
+        .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
+        .put_options =
+            {
+                .object_size_mb = 10,
+                .async_input_stream = true,
+                .skip_content_length = true,
+            },
+    };
+    ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results));
+
+    ASSERT_UINT_EQUALS(MB_TO_BYTES(10), aws_atomic_load_int(&test_results.total_bytes_uploaded));
+
+    aws_s3_meta_request_test_results_clean_up(&test_results);
+    return 0;
+}
+
+AWS_TEST_CASE(test_s3_put_object_async_fail_reading, s_test_s3_put_object_async_fail_reading)
+static int s_test_s3_put_object_async_fail_reading(struct aws_allocator *allocator, void *ctx) {
+    (void)ctx;
+
+    struct aws_s3_meta_request_test_results test_results;
+    aws_s3_meta_request_test_results_init(&test_results, allocator);
+
+    struct aws_s3_tester_meta_request_options put_options = {
+        .allocator = allocator,
+        .meta_request_type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT,
+        .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE,
+        .put_options =
+            {
+                .object_size_mb = 10,
+                .async_input_stream = true,
+                .invalid_input_stream = true,
+            },
+    };
+    ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(NULL, &put_options, &test_results));
+
+    ASSERT_INT_EQUALS(AWS_IO_STREAM_READ_FAILED, test_results.finished_error_code);
+
+    aws_s3_meta_request_test_results_clean_up(&test_results);
+    return 0;
+}
+
 AWS_TEST_CASE(test_s3_put_object_sse_kms, s_test_s3_put_object_sse_kms)
 static int s_test_s3_put_object_sse_kms(struct aws_allocator *allocator, void *ctx) {
     (void)ctx;
@@ -3422,6 +3498,9 @@ static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, v
     struct aws_s3_client *client = NULL;
     ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client));
 
+    struct aws_s3_meta_request_test_results test_results;
+    aws_s3_meta_request_test_results_init(&test_results, allocator);
+
     /*** PUT FILE ***/
 
     struct aws_byte_buf path_buf;
@@ -3447,6 +3526,8 @@ static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, v
     ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, NULL));
 
     /*** GET FILE ***/
+    aws_s3_meta_request_test_results_clean_up(&test_results);
+    aws_s3_meta_request_test_results_init(&test_results, allocator);
 
     struct aws_s3_tester_meta_request_options get_options = {
         .allocator = allocator,
@@ -3458,9 +3539,11 @@ static int s_test_s3_round_trip_with_filepath(struct aws_allocator *allocator, v
                 .object_path = object_path,
             },
     };
+    ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &get_options, &test_results));
 
-    ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &get_options, NULL));
+    ASSERT_UINT_EQUALS(MB_TO_BYTES(put_options.put_options.object_size_mb), test_results.received_body_size);
 
+    aws_s3_meta_request_test_results_clean_up(&test_results);
     aws_byte_buf_clean_up(&path_buf);
     aws_s3_client_release(client);
     aws_s3_tester_clean_up(&tester);
@@ -6054,8 +6137,12 @@ static int s_test_s3_put_pause_resume_invalid_resume_stream(struct aws_allocator
     aws_input_stream_release(initial_upload_stream);
 
     /* a bad input stream to resume from */
-    struct aws_input_stream *resume_upload_stream =
-        aws_s3_bad_input_stream_new(allocator, s_pause_resume_object_length_128MB);
+    struct aws_input_stream_tester_options stream_options = {
+        .autogen_length = s_pause_resume_object_length_128MB,
+        .fail_on_nth_read = 1,
+        .fail_with_error_code = AWS_IO_STREAM_READ_FAILED,
+    };
+    struct aws_input_stream *resume_upload_stream = aws_input_stream_new_tester(allocator, &stream_options);
 
     struct aws_s3_meta_request_resume_token *persistable_state = aws_atomic_load_ptr(&test_data.persistable_state_ptr);
 
diff --git a/tests/s3_mock_server_tests.c b/tests/s3_mock_server_tests.c
index 57d0bf4c1..8603a7f76 100644
--- a/tests/s3_mock_server_tests.c
+++ b/tests/s3_mock_server_tests.c
@@ -531,21 +531,6 @@ TEST_CASE(upload_part_invalid_response_mock_server) {
     return AWS_OP_SUCCESS;
 }
 
-static void s_resume_meta_request_progress(
-    struct aws_s3_meta_request *meta_request,
-    const struct aws_s3_meta_request_progress *progress,
-    void *user_data) {
-
-    (void)meta_request;
-    AWS_ASSERT(meta_request);
-    AWS_ASSERT(progress);
-    AWS_ASSERT(user_data);
-
-    struct aws_s3_meta_request_test_results *out_results = user_data;
-
-    aws_atomic_fetch_add(&out_results->total_bytes_uploaded, (size_t)progress->bytes_transferred);
-}
-
 /* Fake a MPU with 4 parts and the 2nd and 3rd have already completed and resume works fine */
 TEST_CASE(resume_first_part_not_completed_mock_server) {
     (void)ctx;
@@ -587,7 +572,6 @@ TEST_CASE(resume_first_part_not_completed_mock_server) {
     };
     struct aws_s3_meta_request_test_results out_results;
     aws_s3_meta_request_test_results_init(&out_results, allocator);
-    out_results.progress_callback = s_resume_meta_request_progress;
 
     ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results));
     /* Make Sure we only uploaded 2 parts. */
@@ -645,7 +629,6 @@ TEST_CASE(resume_mutli_page_list_parts_mock_server) {
     };
     struct aws_s3_meta_request_test_results out_results;
     aws_s3_meta_request_test_results_init(&out_results, allocator);
-    out_results.progress_callback = s_resume_meta_request_progress;
 
     ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results));
     /* Make Sure we only uploaded 2 parts. */
@@ -754,7 +737,6 @@ TEST_CASE(resume_after_finished_mock_server) {
     };
     struct aws_s3_meta_request_test_results out_results;
     aws_s3_meta_request_test_results_init(&out_results, allocator);
-    out_results.progress_callback = s_resume_meta_request_progress;
 
     ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &put_options, &out_results));
     /* The error code should be success, but there are no headers and stuff as no request was made. */
diff --git a/tests/s3_tester.c b/tests/s3_tester.c
index 37f41a35d..e73a4a72f 100644
--- a/tests/s3_tester.c
+++ b/tests/s3_tester.c
@@ -20,7 +20,9 @@
 #include <aws/io/host_resolver.h>
 #include <aws/io/stream.h>
 #include <aws/io/tls_channel_handler.h>
+#include <aws/testing/async_stream_tester.h>
 #include <aws/testing/aws_test_harness.h>
+#include <aws/testing/stream_tester.h>
 #include <inttypes.h>
 #include <stdlib.h>
 #include <time.h>
@@ -231,6 +233,25 @@ static void s_s3_test_meta_request_telemetry(
     aws_s3_tester_unlock_synced_data(tester);
 }
 
+static void s_s3_test_meta_request_progress(
+    struct aws_s3_meta_request *meta_request,
+    const struct aws_s3_meta_request_progress *progress,
+    void *user_data) {
+
+    (void)meta_request;
+    AWS_ASSERT(meta_request);
+    AWS_ASSERT(progress);
+    AWS_ASSERT(user_data);
+
+    struct aws_s3_meta_request_test_results *meta_request_test_results = user_data;
+
+    aws_atomic_fetch_add(&meta_request_test_results->total_bytes_uploaded, (size_t)progress->bytes_transferred);
+
+    if (meta_request_test_results->progress_callback != NULL) {
+        meta_request_test_results->progress_callback(meta_request, progress, user_data);
+    }
+}
+
 /* Notify the tester that a particular clean up step has finished. */
 static void s_s3_test_client_shutdown(void *user_data);
 
@@ -400,7 +421,8 @@ int aws_s3_tester_bind_meta_request(
     ASSERT_TRUE(options->telemetry_callback == NULL);
     options->telemetry_callback = s_s3_test_meta_request_telemetry;
 
-    options->progress_callback = meta_request_test_results->progress_callback;
+    ASSERT_TRUE(options->progress_callback == NULL);
+    options->progress_callback = s_s3_test_meta_request_progress;
 
     ASSERT_TRUE(options->user_data == NULL);
     options->user_data = meta_request_test_results;
@@ -1326,6 +1348,7 @@ int aws_s3_tester_send_meta_request_with_options(
     AWS_ZERO_STRUCT(input_stream_buffer);
 
     struct aws_input_stream *input_stream = NULL;
+    struct aws_async_input_stream *async_stream = NULL;
 
     if (meta_request_options.message == NULL) {
         const struct aws_byte_cursor *bucket_name = options->bucket_name;
@@ -1382,12 +1405,6 @@ int aws_s3_tester_send_meta_request_with_options(
                 ASSERT_TRUE(object_size_bytes > client->part_size);
             }
 
-            if (options->put_options.invalid_input_stream) {
-                input_stream = aws_s3_bad_input_stream_new(allocator, object_size_bytes);
-            } else {
-                input_stream = aws_s3_test_input_stream_new(allocator, object_size_bytes);
-            }
-
             struct aws_byte_buf object_path_buffer;
             aws_byte_buf_init(&object_path_buffer, allocator, 128);
 
@@ -1440,33 +1457,33 @@ int aws_s3_tester_send_meta_request_with_options(
 
             struct aws_byte_cursor test_object_path = aws_byte_cursor_from_buf(&object_path_buffer);
             struct aws_byte_cursor host_cur = aws_byte_cursor_from_string(host_name);
-            /* Put together a simple S3 Put Object request. */
-            struct aws_http_message *message = aws_s3_test_put_object_request_new(
-                allocator, &host_cur, test_object_path, g_test_body_content_type, input_stream, options->sse_type);
-
-            if (options->put_options.content_length) {
-                /* make a invalid request */
-                char content_length_buffer[64] = "";
-                snprintf(
-                    content_length_buffer, sizeof(content_length_buffer), "%zu", options->put_options.content_length);
 
-                struct aws_http_headers *headers = aws_http_message_get_headers(message);
-                aws_http_headers_set(
-                    headers, g_content_length_header_name, aws_byte_cursor_from_c_str(content_length_buffer));
+            /* Create "tester" stream with appropriate options */
+            struct aws_async_input_stream_tester_options stream_options = {
+                .base =
+                    {
+                        .autogen_length = object_size_bytes,
+                    },
+            };
+            if (options->put_options.invalid_input_stream) {
+                stream_options.base.fail_on_nth_read = 1;
+                stream_options.base.fail_with_error_code = AWS_IO_STREAM_READ_FAILED;
             }
 
-            if (options->put_options.skip_content_length) {
-                struct aws_http_headers *headers = aws_http_message_get_headers(message);
-                aws_http_headers_erase(headers, g_content_length_header_name);
-            }
+            if (options->put_options.async_input_stream) {
+                stream_options.completion_strategy = AWS_ASYNC_READ_COMPLETES_ON_ANOTHER_THREAD;
 
-            if (options->put_options.invalid_request) {
-                /* make a invalid request */
-                aws_http_message_set_request_path(message, aws_byte_cursor_from_c_str("invalid_path"));
+                async_stream = aws_async_input_stream_new_tester(allocator, &stream_options);
+                ASSERT_NOT_NULL(async_stream);
+                meta_request_options.send_async_stream = async_stream;
+            } else {
+                input_stream = aws_input_stream_new_tester(allocator, &stream_options.base);
+                ASSERT_NOT_NULL(input_stream);
             }
 
+            /* if uploading via filepath, write input_stream out as a tmp file on disk, then upload that */
             if (options->put_options.file_on_disk) {
-                /* write input_stream to a tmp file on disk */
+                ASSERT_NOT_NULL(input_stream);
                 struct aws_byte_buf filepath_buf;
                 aws_byte_buf_init(&filepath_buf, allocator, 128);
                 struct aws_byte_cursor filepath_prefix = aws_byte_cursor_from_c_str("tmp");
@@ -1497,7 +1514,43 @@ int aws_s3_tester_send_meta_request_with_options(
 
                 /* use filepath instead of input_stream */
                 meta_request_options.send_filepath = aws_byte_cursor_from_string(filepath_str);
-                aws_http_message_set_body_stream(message, NULL);
+                input_stream = aws_input_stream_release(input_stream);
+            }
+
+            /* Put together a simple S3 Put Object request. */
+            struct aws_http_message *message;
+            if (input_stream != NULL) {
+                message = aws_s3_test_put_object_request_new(
+                    allocator, &host_cur, test_object_path, g_test_body_content_type, input_stream, options->sse_type);
+            } else {
+                message = aws_s3_test_put_object_request_new_without_body(
+                    allocator,
+                    &host_cur,
+                    g_test_body_content_type,
+                    test_object_path,
+                    object_size_bytes,
+                    options->sse_type);
+            }
+
+            if (options->put_options.content_length) {
+                /* make an invalid request */
+                char content_length_buffer[64] = "";
+                snprintf(
+                    content_length_buffer, sizeof(content_length_buffer), "%zu", options->put_options.content_length);
+
+                struct aws_http_headers *headers = aws_http_message_get_headers(message);
+                aws_http_headers_set(
+                    headers, g_content_length_header_name, aws_byte_cursor_from_c_str(content_length_buffer));
+            }
+
+            if (options->put_options.skip_content_length) {
+                struct aws_http_headers *headers = aws_http_message_get_headers(message);
+                aws_http_headers_erase(headers, g_content_length_header_name);
+            }
+
+            if (options->put_options.invalid_request) {
+                /* make an invalid request */
+                aws_http_message_set_request_path(message, aws_byte_cursor_from_c_str("invalid_path"));
             }
 
             if (options->put_options.content_encoding.ptr != NULL) {
@@ -1529,6 +1582,7 @@ int aws_s3_tester_send_meta_request_with_options(
     out_results->headers_callback = options->headers_callback;
     out_results->body_callback = options->body_callback;
     out_results->finish_callback = options->finish_callback;
+    out_results->progress_callback = options->progress_callback;
 
     out_results->algorithm = options->expected_validate_checksum_alg;
 
@@ -1589,6 +1643,8 @@ int aws_s3_tester_send_meta_request_with_options(
     aws_input_stream_release(input_stream);
     input_stream = NULL;
 
+    async_stream = aws_async_input_stream_release(async_stream);
+
     aws_byte_buf_clean_up(&input_stream_buffer);
 
     if (clean_up_local_tester) {
diff --git a/tests/s3_tester.h b/tests/s3_tester.h
index 5d4afa1ce..270c66a10 100644
--- a/tests/s3_tester.h
+++ b/tests/s3_tester.h
@@ -161,6 +161,7 @@ struct aws_s3_tester_meta_request_options {
     aws_s3_meta_request_headers_callback_fn *headers_callback;
     aws_s3_meta_request_receive_body_callback_fn *body_callback;
     aws_s3_meta_request_finish_fn *finish_callback;
+    aws_s3_meta_request_progress_fn *progress_callback;
 
     /* Default Meta Request specific options. */
     struct {
@@ -178,12 +179,12 @@ struct aws_s3_tester_meta_request_options {
         struct aws_byte_cursor object_path_override;
         uint32_t object_size_mb;
         bool ensure_multipart;
+        bool async_input_stream; /* send via async stream */
+        bool file_on_disk;       /* write to file on disk, then send via aws_s3_meta_request_options.send_filepath */
         bool invalid_request;
         bool invalid_input_stream;
         bool valid_md5;
         bool invalid_md5;
-        /* write file to desk, and then send via aws_s3_meta_request_options.send_filepath */
-        bool file_on_disk;
         struct aws_s3_meta_request_resume_token *resume_token;
         /* manually overwrite the content length for some invalid input stream */
         size_t content_length;
@@ -206,7 +207,6 @@ struct aws_s3_meta_request_test_results {
     aws_s3_meta_request_headers_callback_fn *headers_callback;
     aws_s3_meta_request_receive_body_callback_fn *body_callback;
     aws_s3_meta_request_finish_fn *finish_callback;
-    aws_s3_meta_request_shutdown_fn *shutdown_callback;
     aws_s3_meta_request_progress_fn *progress_callback;
 
     struct aws_http_headers *error_response_headers;
@@ -223,7 +223,9 @@ struct aws_s3_meta_request_test_results {
     int finished_error_code;
     enum aws_s3_checksum_algorithm algorithm;
 
-    /* accumulator of amount of bytes uploaded */
+    /* Accumulates the number of bytes uploaded.
+     * Currently, this only works for MPU and Copy meta-requests,
+     * since it's powered by the progress_callback, which isn't invoked for all meta-request types. */
     struct aws_atomic_var total_bytes_uploaded;
 
     /* Protected the tester->synced_data.lock */
@@ -411,8 +413,6 @@ enum aws_s3_test_stream_value {
     TEST_STREAM_VALUE_2,
 };
 
-struct aws_input_stream *aws_s3_bad_input_stream_new(struct aws_allocator *allocator, size_t length);
-
 struct aws_input_stream *aws_s3_test_input_stream_new(struct aws_allocator *allocator, size_t length);
 
 struct aws_input_stream *aws_s3_test_input_stream_new_with_value_type(