Skip to content

Commit

Permalink
Updated GetFileStreamAsync impl
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Jan 4, 2024
1 parent d8478ec commit 594e94b
Showing 1 changed file with 34 additions and 26 deletions.
60 changes: 34 additions & 26 deletions src/Foundatio.Aliyun/Storage/AliyunFileStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand All @@ -23,17 +23,17 @@ public class AliyunFileStorage : IFileStorage {
public AliyunFileStorage(AliyunFileStorageOptions options) {
if (options == null)
throw new ArgumentNullException(nameof(options));

_serializer = options.Serializer ?? DefaultSerializer.Instance;
_logger = options.LoggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance;

var connectionString = new AliyunFileStorageConnectionStringBuilder(options.ConnectionString);
_client = new OssClient(connectionString.Endpoint, connectionString.AccessKey, connectionString.SecretKey);

_bucket = connectionString.Bucket;
if (DoesBucketExist(_bucket))
if (DoesBucketExist(_bucket))
return;

_logger.LogInformation("Creating {Bucket}", _bucket);
_client.CreateBucket(_bucket);
_logger.LogInformation("Created {Bucket}", _bucket);
Expand All @@ -45,19 +45,27 @@ public AliyunFileStorage(Builder<AliyunFileStorageOptionsBuilder, AliyunFileStor
ISerializer IHaveSerializer.Serializer => _serializer;
public OssClient Client => _client;

public async Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default) {
[Obsolete($"Use {nameof(GetFileStreamAsync)} with {nameof(FileAccess)} instead to define read or write behaviour of stream")]
public Task<Stream> GetFileStreamAsync(string path, CancellationToken cancellationToken = default)
=> GetFileStreamAsync(path, StreamMode.Read, cancellationToken);

public async Task<Stream> GetFileStreamAsync(string path, StreamMode streamMode, CancellationToken cancellationToken = default)

Check failure on line 52 in src/Foundatio.Aliyun/Storage/AliyunFileStorage.cs

View workflow job for this annotation

GitHub Actions / build / build

The type or namespace name 'StreamMode' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 52 in src/Foundatio.Aliyun/Storage/AliyunFileStorage.cs

View workflow job for this annotation

GitHub Actions / build / build

The type or namespace name 'StreamMode' could not be found (are you missing a using directive or an assembly reference?)
{
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

if (streamMode is StreamMode.Write)
throw new NotSupportedException($"Stream mode {streamMode} is not supported.");

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Getting file stream for {Path}", normalizedPath);

var response = await Task.Factory.FromAsync(_client.BeginGetObject, _client.EndGetObject, new GetObjectRequest(_bucket, normalizedPath), null).AnyContext();
if (!response.HttpStatusCode.IsSuccessful()) {
_logger.LogError("[{HttpStatusCode}] Unable to get file stream for {Path}", response.HttpStatusCode, normalizedPath);
return null;
}

return new ActionableStream(response.ResponseStream, () => {
_logger.LogTrace("Disposing file stream for {Path}", normalizedPath);
response.Dispose();
Expand Down Expand Up @@ -108,7 +116,7 @@ public async Task<bool> SaveFileAsync(string path, Stream stream, CancellationTo

string normalizedPath = NormalizePath(path);
_logger.LogTrace("Saving {Path}", normalizedPath);

var seekableStream = stream.CanSeek ? stream : new MemoryStream();
if (!stream.CanSeek) {
await stream.CopyToAsync(seekableStream).AnyContext();
Expand Down Expand Up @@ -136,7 +144,7 @@ public async Task<bool> RenameFileAsync(string path, string newPath, Cancellatio
string normalizedPath = NormalizePath(path);
string normalizedNewPath = NormalizePath(newPath);
_logger.LogInformation("Renaming {Path} to {NewPath}", normalizedPath, normalizedNewPath);

return await CopyFileAsync(normalizedPath, normalizedNewPath, cancellationToken).AnyContext() &&
await DeleteFileAsync(normalizedPath, cancellationToken).AnyContext();
}
Expand Down Expand Up @@ -185,7 +193,7 @@ public async Task<int> DeleteFilesAsync(string searchPattern = null, Cancellatio
var files = await GetFileListAsync(searchPattern, cancellationToken: cancellation).AnyContext();
_logger.LogInformation("Deleting {FileCount} files matching {SearchPattern}", files.Count, searchPattern);
var result = _client.DeleteObjects(new DeleteObjectsRequest(_bucket, files.Select(spec => spec.Path).ToList()));
if (result.HttpStatusCode != HttpStatusCode.OK)
if (result.HttpStatusCode != HttpStatusCode.OK)
throw new Exception($"[{result.HttpStatusCode}] Unable to delete files");

int count = result.Keys?.Length ?? 0;
Expand All @@ -204,14 +212,14 @@ public async Task<PagedFileListResult> GetPagedFileListAsync(int pageSize = 100,

private async Task<NextPageResult> GetFiles(string searchPattern, int page, int pageSize, CancellationToken cancellationToken) {
var criteria = GetRequestCriteria(searchPattern);

int pagingLimit = pageSize;
int skip = (page - 1) * pagingLimit;
if (pagingLimit < Int32.MaxValue)
pagingLimit++;

_logger.LogTrace(
s => s.Property("SearchPattern", searchPattern).Property("Limit", pagingLimit).Property("Skip", skip),
s => s.Property("SearchPattern", searchPattern).Property("Limit", pagingLimit).Property("Skip", skip),
"Getting file list matching {Prefix} and {Pattern}...", criteria.Prefix, criteria.Pattern
);

Expand All @@ -221,7 +229,7 @@ private async Task<NextPageResult> GetFiles(string searchPattern, int page, int
do {
var listing = await Task.Factory.FromAsync(
_client.BeginListObjects,
_client.EndListObjects,
_client.EndListObjects,
new ListObjectsRequest(_bucket) {
Prefix = criteria.Prefix,
Marker = marker,
Expand All @@ -236,7 +244,7 @@ private async Task<NextPageResult> GetFiles(string searchPattern, int page, int
_logger.LogTrace("Skipping {Path}: Doesn't match pattern", blob.Key);
continue;
}

blobs.Add(blob);
}
} while (!cancellationToken.IsCancellationRequested && !String.IsNullOrEmpty(marker) && blobs.Count < totalLimit);
Expand Down Expand Up @@ -269,24 +277,24 @@ private async Task<NextPageResult> GetFiles(string searchPattern, int page, int
private async Task<List<FileSpec>> GetFileListAsync(string searchPattern = null, int? limit = null, int? skip = null, CancellationToken cancellationToken = default) {
if (limit is <= 0)
return new List<FileSpec>();

var criteria = GetRequestCriteria(searchPattern);

_logger.LogTrace(
s => s.Property("SearchPattern", searchPattern).Property("Limit", limit).Property("Skip", skip),
s => s.Property("SearchPattern", searchPattern).Property("Limit", limit).Property("Skip", skip),
"Getting file list matching {Prefix} and {Pattern}...", criteria.Prefix, criteria.Pattern
);
int totalLimit = limit.GetValueOrDefault(Int32.MaxValue) < Int32.MaxValue

int totalLimit = limit.GetValueOrDefault(Int32.MaxValue) < Int32.MaxValue
? skip.GetValueOrDefault() + limit.Value
: Int32.MaxValue;

string marker = null;
var blobs = new List<OssObjectSummary>();
do {
var listing = await Task.Factory.FromAsync(
_client.BeginListObjects,
_client.EndListObjects,
_client.EndListObjects,
new ListObjectsRequest(_bucket) {
Prefix = criteria.Prefix,
Marker = marker,
Expand All @@ -301,14 +309,14 @@ private async Task<List<FileSpec>> GetFileListAsync(string searchPattern = null,
_logger.LogTrace("Skipping {Path}: Doesn't match pattern", blob.Key);
continue;
}

blobs.Add(blob);
}
} while (!cancellationToken.IsCancellationRequested && !String.IsNullOrEmpty(marker) && blobs.Count < totalLimit);

if (skip.HasValue)
blobs = blobs.Skip(skip.Value).ToList();

if (limit.HasValue)
blobs = blobs.Take(limit.Value).ToList();

Expand All @@ -328,14 +336,14 @@ private class SearchCriteria {
private SearchCriteria GetRequestCriteria(string searchPattern) {
if (String.IsNullOrEmpty(searchPattern))
return new SearchCriteria { Prefix = String.Empty };

string normalizedSearchPattern = NormalizePath(searchPattern);
int wildcardPos = normalizedSearchPattern.IndexOf('*');
bool hasWildcard = wildcardPos >= 0;

string prefix = normalizedSearchPattern;
Regex patternRegex = null;

if (hasWildcard) {
patternRegex = new Regex($"^{Regex.Escape(normalizedSearchPattern).Replace("\\*", ".*?")}$");
int slashPos = normalizedSearchPattern.LastIndexOf('/');
Expand All @@ -347,7 +355,7 @@ private SearchCriteria GetRequestCriteria(string searchPattern) {
Pattern = patternRegex
};
}

public void Dispose() { }

private bool IsNotFoundException(Exception ex) {
Expand Down

0 comments on commit 594e94b

Please sign in to comment.