Skip to content

Commit

Permalink
Fix CRT HTTP Client transcribe streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyRyabinin authored and sbiscigl committed Aug 30, 2024
1 parent 2331e28 commit 783ea4a
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@ static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient";
// Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model.
class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream {
public:
SDKAdaptingInputStream(const std::shared_ptr<Aws::Utils::RateLimits::RateLimiterInterface>& rateLimiter, std::shared_ptr<Aws::Crt::Io::IStream> stream,
const Aws::Http::HttpClient& client, const Aws::Http::HttpRequest& request, bool isStreaming,
Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()) noexcept :
Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter),
m_client(client), m_currentRequest(request), m_isStreaming(isStreaming), m_chunkEnd(false)
SDKAdaptingInputStream(const std::shared_ptr<Aws::Utils::RateLimits::RateLimiterInterface>& rateLimiter,
std::shared_ptr<Aws::Crt::Io::IStream> stream,
const Aws::Http::HttpClient& client,
const Aws::Http::HttpRequest& request,
bool isStreaming,
Aws::Crt::Allocator* allocator = Aws::Crt::ApiAllocator()) noexcept :
Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator),
m_rateLimiter(rateLimiter),
m_client(client),
m_currentRequest(request),
m_isStreaming(isStreaming),
m_chunkEnd(false)
{
}

protected:

bool ReadImpl(Aws::Crt::ByteBuf &buffer) noexcept override
Expand Down Expand Up @@ -56,7 +64,18 @@ class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream {

// now do the read. We may over read by an IO buffer size, but it's fine. The throttle will still
// kick-in in plenty of time.
bool retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer);
bool retValue = false;
if (!m_isStreaming)
{
retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer);
}
else
{
if (StdIOStreamInputStream::GetStatusImpl().is_valid && StdIOStreamInputStream::PeekImpl() != std::char_traits<char>::eof())
{
retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadSomeImpl(buffer);
}
}
size_t newPos = buffer.len;
AWS_ASSERT(newPos >= currentPos && "the buffer length should not have decreased in value.");

Expand Down Expand Up @@ -161,10 +180,10 @@ class AsyncWaiter
m_cvar.wait(uniqueLocker, [this](){return m_wakeupIntentional;});
}

bool WaitOnCompletionUntil(std::chrono::time_point<std::chrono::high_resolution_clock> until)
bool WaitOnCompletionFor(const size_t ms)
{
std::unique_lock<std::mutex> uniqueLocker(m_lock);
return m_cvar.wait_until(uniqueLocker, until, [this](){return m_wakeupIntentional;});
return m_cvar.wait_for(uniqueLocker, std::chrono::milliseconds(ms), [this](){return m_wakeupIntentional;});
}

private:
Expand Down Expand Up @@ -431,9 +450,14 @@ namespace Aws

// This will arrive at or around the same time as the headers. Use it to set the response code on the response
requestOptions.onIncomingHeadersBlockDone =
[response](Crt::Http::HttpStream& stream, enum aws_http_header_block block)
[request, response](Crt::Http::HttpStream& stream, enum aws_http_header_block block)
{
OnIncomingHeadersBlockDone(stream, block, response);
auto& headersHandler = request->GetHeadersReceivedEventHandler();
if (headersHandler)
{
headersHandler(request.get(), response.get());
}
};

// CRT client is async only so we'll need to do the synchronous part ourselves.
Expand Down Expand Up @@ -467,9 +491,7 @@ namespace Aws
// all that effort if that's the worst thing that can happen?
if (m_configuration.requestTimeoutMs > 0 )
{
auto requestExpiryTime = std::chrono::high_resolution_clock::now() +
std::chrono::milliseconds(m_configuration.requestTimeoutMs);
waiterTimedOut = !waiter.WaitOnCompletionUntil(requestExpiryTime);
waiterTimedOut = !waiter.WaitOnCompletionFor(m_configuration.requestTimeoutMs);

// if this is true, the waiter timed out without a terminal condition being woken up.
if (waiterTimedOut)
Expand Down

0 comments on commit 783ea4a

Please sign in to comment.