Skip to content

Commit

Permalink
FlowHttp: Support to download as bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
darklinkpower committed Dec 3, 2024
1 parent 2857508 commit 83dc13f
Showing 1 changed file with 107 additions and 37 deletions.
144 changes: 107 additions & 37 deletions source/Common/FlowHttp/HttpRequests/FlowHttpRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,33 @@ internal FlowHttpRequest(HttpClientFactory httpClientFactory) : base(httpClientF

}


internal HttpContentResult<string> DownloadString(CancellationToken cancellationToken = default, DownloadStateController downloadStateController = null, DownloadStateChangedCallback stateChangedCallback = null, DownloadProgressChangedCallback progressChangedCallback = null)
{
return Task.Run(() => DownloadStringAsync(cancellationToken, downloadStateController, stateChangedCallback, progressChangedCallback)).GetAwaiter().GetResult();
}

internal async Task<HttpContentResult<string>> DownloadStringAsync(CancellationToken cancellationToken = default, DownloadStateController downloadStateController = null, DownloadStateChangedCallback stateChangedCallback = null, DownloadProgressChangedCallback progressChangedCallback = null)
public async Task<HttpContentResult<string>> DownloadStringAsync(CancellationToken cancellationToken = default, DownloadStateController downloadStateController = null, DownloadStateChangedCallback stateChangedCallback = null, DownloadProgressChangedCallback progressChangedCallback = null)
{
return await DownloadContentAsync(ReadResponseString, cancellationToken, downloadStateController, stateChangedCallback, progressChangedCallback);
}

internal HttpContentResult<byte[]> DownloadBytes(CancellationToken cancellationToken = default, DownloadStateController downloadStateController = null, DownloadStateChangedCallback stateChangedCallback = null, DownloadProgressChangedCallback progressChangedCallback = null)
{
return Task.Run(() => DownloadBytesAsync(cancellationToken, downloadStateController, stateChangedCallback, progressChangedCallback)).GetAwaiter().GetResult();
}

public async Task<HttpContentResult<byte[]>> DownloadBytesAsync(CancellationToken cancellationToken = default, DownloadStateController downloadStateController = null, DownloadStateChangedCallback stateChangedCallback = null, DownloadProgressChangedCallback progressChangedCallback = null)
{
return await DownloadContentAsync(ReadResponseBytes, cancellationToken, downloadStateController, stateChangedCallback, progressChangedCallback);
}

protected async Task<HttpContentResult<T>> DownloadContentAsync<T>(
Func<HttpResponseMessage, CancellationToken, DownloadStateController, DownloadStateChangedCallback, DownloadProgressChangedCallback, Task<T>> readResponseFunc,
CancellationToken cancellationToken = default,
DownloadStateController downloadStateController = null,
DownloadStateChangedCallback stateChangedCallback = null,
DownloadProgressChangedCallback progressChangedCallback = null)
{
#if DEBUG
_logger.Info($"Starting download of url \"{_url}\"...");
Expand All @@ -41,25 +62,25 @@ internal async Task<HttpContentResult<string>> DownloadStringAsync(CancellationT
cts.CancelAfter(_timeout.Value);
}

StringContent stringContent = null;
StringContent content = null;
try
{
if (!string.IsNullOrEmpty(_content))
{
stringContent = new StringContent(_content, _contentEncoding, _contentMediaType);
content = new StringContent(_content, _contentEncoding, _contentMediaType);
}

using (var request = CreateRequest(_url, stringContent))
using (var request = CreateRequest(_url, content))
{
var httpClient = _httpClientFactory.GetClient(request);
using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cts.Token))
{
httpStatusCode = response.StatusCode;
if (response.IsSuccessStatusCode)
{
var content = await ReadResponseContent(response, cts.Token, downloadStateController, stateChangedCallback, progressChangedCallback);
var result = await readResponseFunc(response, cancellationToken, downloadStateController, stateChangedCallback, progressChangedCallback);
OnDownloadStateChanged(stateChangedCallback, HttpRequestClientStatus.Completed);
return HttpContentResult<string>.Success(_url, content, httpStatusCode, response);
return HttpContentResult<T>.Success(_url, result, response.StatusCode, response);
}
}
}
Expand All @@ -78,19 +99,19 @@ internal async Task<HttpContentResult<string>> DownloadStringAsync(CancellationT
}
finally
{
stringContent?.Dispose();
content?.Dispose();
}

return HttpContentResult<string>.Failure(_url, error, httpStatusCode);
return HttpContentResult<T>.Failure(_url, error, httpStatusCode);
}
}

/// <summary>
/// Reads and retrieves the content from the response message using the retrieved encoding.
/// </summary>
/// <param name="response">The HttpResponseMessage containing the response content to read.</param>
/// <returns>The content as a string.</returns>
protected async Task<string> ReadResponseContent(HttpResponseMessage response, CancellationToken cancellationToken, DownloadStateController downloadStateController, DownloadStateChangedCallback stateChangedCallback, DownloadProgressChangedCallback progressChangedCallback)
private async Task<string> ReadResponseString(
HttpResponseMessage response,
CancellationToken cancellationToken,
DownloadStateController downloadStateController,
DownloadStateChangedCallback stateChangedCallback,
DownloadProgressChangedCallback progressChangedCallback)
{
var encoding = GetEncodingFromHeaders(response.Content.Headers);
var startTime = DateTime.Now;
Expand All @@ -109,38 +130,87 @@ protected async Task<string> ReadResponseContent(HttpResponseMessage response, C

while ((bytesRead = await streamReader.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
var shouldPause = downloadStateController?.IsPaused() == true;
cancellationToken.ThrowIfCancellationRequested();
if (shouldPause)
{
OnDownloadStateChanged(stateChangedCallback, HttpRequestClientStatus.Paused);
await downloadStateController.PauseAsync();
cancellationToken.ThrowIfCancellationRequested();
OnDownloadStateChanged(stateChangedCallback, HttpRequestClientStatus.Downloading);
}

HandleDownloadProgress(ref contentProgressLength, totalContentLength, ref lastReportTime, ref lastTotalBytesRead, buffer.Length, downloadStateController, cancellationToken, stateChangedCallback, progressChangedCallback);
stringBuilder.Append(buffer, 0, bytesRead);
contentProgressLength += bytesRead;
if (progressChangedCallback != null)
{
var currentTime = DateTime.Now;
var isCompleted = contentProgressLength == totalContentLength;
if (isCompleted || currentTime - lastReportTime >= _progressReportInterval)
{
ReportProgress(progressChangedCallback, contentProgressLength, totalContentLength, startTime, currentTime, lastReportTime, lastTotalBytesRead);
lastReportTime = currentTime;
lastTotalBytesRead = contentProgressLength;
}
}
}
}
}

#if DEBUG
_logger.Info("Download completed successfully.");
_logger.Info("String download completed successfully.");
#endif

return stringBuilder.ToString();
}


private async Task<byte[]> ReadResponseBytes(
HttpResponseMessage response,
CancellationToken cancellationToken,
DownloadStateController downloadStateController,
DownloadStateChangedCallback stateChangedCallback,
DownloadProgressChangedCallback progressChangedCallback)
{
var totalContentLength = response.Content.Headers.ContentLength ?? 0;
var buffer = new byte[4096];
using (var contentStream = await response.Content.ReadAsStreamAsync())
{
using (var memoryStream = new MemoryStream())
{
int bytesRead;
long contentProgressLength = 0;
long lastTotalBytesRead = 0;
var startTime = DateTime.Now;
var lastReportTime = startTime;

while ((bytesRead = await contentStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
{
HandleDownloadProgress(ref contentProgressLength, totalContentLength, ref lastReportTime, ref lastTotalBytesRead, buffer.Length, downloadStateController, cancellationToken, stateChangedCallback, progressChangedCallback);
await memoryStream.WriteAsync(buffer, 0, bytesRead, cancellationToken);
}

#if DEBUG
_logger.Info("Byte download completed successfully.");
#endif
return memoryStream.ToArray();
}
}
}

private void HandleDownloadProgress(
ref long contentProgressLength,
long totalContentLength, ref DateTime lastReportTime,
ref long lastTotalBytesRead,
long bytesRead,
DownloadStateController downloadStateController,
CancellationToken cancellationToken,
DownloadStateChangedCallback stateChangedCallback,
DownloadProgressChangedCallback progressChangedCallback)
{
var shouldPause = downloadStateController?.IsPaused() == true;
cancellationToken.ThrowIfCancellationRequested();
if (shouldPause)
{
OnDownloadStateChanged(stateChangedCallback, HttpRequestClientStatus.Paused);
downloadStateController?.PauseAsync().GetAwaiter().GetResult();
cancellationToken.ThrowIfCancellationRequested();
OnDownloadStateChanged(stateChangedCallback, HttpRequestClientStatus.Downloading);
}

contentProgressLength += bytesRead;
if (progressChangedCallback != null)
{
var currentTime = DateTime.Now;
var isCompleted = contentProgressLength == totalContentLength;
if (isCompleted || currentTime - lastReportTime >= _progressReportInterval)
{
ReportProgress(progressChangedCallback, contentProgressLength, totalContentLength, DateTime.Now, currentTime, lastReportTime, lastTotalBytesRead);
lastReportTime = currentTime;
lastTotalBytesRead = contentProgressLength;
}
}
}


}
}

0 comments on commit 83dc13f

Please sign in to comment.