Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CRT HTTP Client transcribe streaming #3097

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@ static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient";
// Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model.
class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream {
public:
SDKAdaptingInputStream(const std::shared_ptr<Aws::Utils::RateLimits::RateLimiterInterface>& rateLimiter, std::shared_ptr<Aws::Crt::Io::IStream> stream,
const Aws::Http::HttpClient& client, const Aws::Http::HttpRequest& request, bool isStreaming,
Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()) noexcept :
Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter),
m_client(client), m_currentRequest(request), m_isStreaming(isStreaming), m_chunkEnd(false)
SDKAdaptingInputStream(const std::shared_ptr<Aws::Utils::RateLimits::RateLimiterInterface>& rateLimiter,
std::shared_ptr<Aws::Crt::Io::IStream> stream,
const Aws::Http::HttpClient& client,
const Aws::Http::HttpRequest& request,
bool isStreaming,
Aws::Crt::Allocator* allocator = Aws::Crt::ApiAllocator()) noexcept :
Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator),
m_rateLimiter(rateLimiter),
m_client(client),
m_currentRequest(request),
m_isStreaming(isStreaming),
m_chunkEnd(false)
{
}

protected:

bool ReadImpl(Aws::Crt::ByteBuf &buffer) noexcept override
Expand Down Expand Up @@ -56,7 +64,18 @@ class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream {

// now do the read. We may over read by an IO buffer size, but it's fine. The throttle will still
// kick-in in plenty of time.
bool retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer);
bool retValue = false;
if (!m_isStreaming)
{
retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer);
}
else
{
if (StdIOStreamInputStream::GetStatusImpl().is_valid && StdIOStreamInputStream::PeekImpl() != std::char_traits<char>::eof())
{
retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadSomeImpl(buffer);
}
}
size_t newPos = buffer.len;
AWS_ASSERT(newPos >= currentPos && "the buffer length should not have decreased in value.");

Expand Down Expand Up @@ -161,10 +180,10 @@ class AsyncWaiter
m_cvar.wait(uniqueLocker, [this](){return m_wakeupIntentional;});
}

bool WaitOnCompletionUntil(std::chrono::time_point<std::chrono::high_resolution_clock> until)
bool WaitOnCompletionFor(const size_t ms)
{
std::unique_lock<std::mutex> uniqueLocker(m_lock);
return m_cvar.wait_until(uniqueLocker, until, [this](){return m_wakeupIntentional;});
return m_cvar.wait_for(uniqueLocker, std::chrono::milliseconds(ms), [this](){return m_wakeupIntentional;});
}

private:
Expand Down Expand Up @@ -431,9 +450,14 @@ namespace Aws

// This will arrive at or around the same time as the headers. Use it to set the response code on the response
requestOptions.onIncomingHeadersBlockDone =
[response](Crt::Http::HttpStream& stream, enum aws_http_header_block block)
[request, response](Crt::Http::HttpStream& stream, enum aws_http_header_block block)
{
OnIncomingHeadersBlockDone(stream, block, response);
auto& headersHandler = request->GetHeadersReceivedEventHandler();
if (headersHandler)
{
headersHandler(request.get(), response.get());
}
};

// CRT client is async only so we'll need to do the synchronous part ourselves.
Expand Down Expand Up @@ -467,9 +491,7 @@ namespace Aws
// all that effort if that's the worst thing that can happen?
if (m_configuration.requestTimeoutMs > 0 )
{
auto requestExpiryTime = std::chrono::high_resolution_clock::now() +
std::chrono::milliseconds(m_configuration.requestTimeoutMs);
waiterTimedOut = !waiter.WaitOnCompletionUntil(requestExpiryTime);
waiterTimedOut = !waiter.WaitOnCompletionFor(m_configuration.requestTimeoutMs);

// if this is true, the waiter timed out without a terminal condition being woken up.
if (waiterTimedOut)
Expand Down
Loading