From 783ea4a57fb8f7855cc12033457414da97987a43 Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Wed, 28 Aug 2024 18:36:48 +0000 Subject: [PATCH] Fix CRT HTTP Client transcribe streaming --- .../source/http/crt/CRTHttpClient.cpp | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index 168a1d34a74..6f968f862bc 100644 --- a/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -20,13 +20,21 @@ static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient"; // Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model. class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream { public: - SDKAdaptingInputStream(const std::shared_ptr& rateLimiter, std::shared_ptr stream, - const Aws::Http::HttpClient& client, const Aws::Http::HttpRequest& request, bool isStreaming, - Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()) noexcept : - Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter), - m_client(client), m_currentRequest(request), m_isStreaming(isStreaming), m_chunkEnd(false) + SDKAdaptingInputStream(const std::shared_ptr& rateLimiter, + std::shared_ptr stream, + const Aws::Http::HttpClient& client, + const Aws::Http::HttpRequest& request, + bool isStreaming, + Aws::Crt::Allocator* allocator = Aws::Crt::ApiAllocator()) noexcept : + Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), + m_rateLimiter(rateLimiter), + m_client(client), + m_currentRequest(request), + m_isStreaming(isStreaming), + m_chunkEnd(false) { } + protected: bool ReadImpl(Aws::Crt::ByteBuf &buffer) noexcept override @@ -56,7 +64,18 @@ class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream { // now do the read. We may over read by an IO buffer size, but it's fine. The throttle will still // kick-in in plenty of time. - bool retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer); + bool retValue = false; + if (!m_isStreaming) + { + retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer); + } + else + { + if (StdIOStreamInputStream::GetStatusImpl().is_valid && StdIOStreamInputStream::PeekImpl() != std::char_traits::eof()) + { + retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadSomeImpl(buffer); + } + } size_t newPos = buffer.len; AWS_ASSERT(newPos >= currentPos && "the buffer length should not have decreased in value."); @@ -161,10 +180,10 @@ class AsyncWaiter m_cvar.wait(uniqueLocker, [this](){return m_wakeupIntentional;}); } - bool WaitOnCompletionUntil(std::chrono::time_point until) + bool WaitOnCompletionFor(const size_t ms) { std::unique_lock uniqueLocker(m_lock); - return m_cvar.wait_until(uniqueLocker, until, [this](){return m_wakeupIntentional;}); + return m_cvar.wait_for(uniqueLocker, std::chrono::milliseconds(ms), [this](){return m_wakeupIntentional;}); } private: @@ -431,9 +450,14 @@ namespace Aws // This will arrive at or around the same time as the headers. Use it to set the response code on the response requestOptions.onIncomingHeadersBlockDone = - [response](Crt::Http::HttpStream& stream, enum aws_http_header_block block) + [request, response](Crt::Http::HttpStream& stream, enum aws_http_header_block block) { OnIncomingHeadersBlockDone(stream, block, response); + auto& headersHandler = request->GetHeadersReceivedEventHandler(); + if (headersHandler) + { + headersHandler(request.get(), response.get()); + } }; // CRT client is async only so we'll need to do the synchronous part ourselves. @@ -467,9 +491,7 @@ namespace Aws // all that effort if that's the worst thing that can happen? if (m_configuration.requestTimeoutMs > 0 ) { - auto requestExpiryTime = std::chrono::high_resolution_clock::now() + - std::chrono::milliseconds(m_configuration.requestTimeoutMs); - waiterTimedOut = !waiter.WaitOnCompletionUntil(requestExpiryTime); + waiterTimedOut = !waiter.WaitOnCompletionFor(m_configuration.requestTimeoutMs); // if this is true, the waiter timed out without a terminal condition being woken up. if (waiterTimedOut)