Skip to content

Commit

Permalink
Create file-system implementation for Amazon S3 storage
Browse files Browse the repository at this point in the history
  • Loading branch information
skarllot authored and emgarten committed Apr 6, 2018
1 parent f9cc2c3 commit c078cec
Show file tree
Hide file tree
Showing 5 changed files with 458 additions and 0 deletions.
131 changes: 131 additions & 0 deletions src/SleetLib/FileSystem/AmazonS3File.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
using System;
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3;
using NuGet.Common;
using static Sleet.AmazonS3FileSystemAbstraction;

namespace Sleet
{
public class AmazonS3File : FileBase
{
private readonly IAmazonS3 client;
private readonly string bucketName;
private readonly string key;

internal AmazonS3File(
AmazonS3FileSystem fileSystem,
Uri rootPath,
Uri displayPath,
FileInfo localCacheFile,
IAmazonS3 client,
string bucketName,
string key)
: base(fileSystem, rootPath, displayPath, localCacheFile)
{
this.client = client;
this.bucketName = bucketName;
this.key = key;
}

protected override async Task CopyFromSource(ILogger log, CancellationToken token)
{
Uri absoluteUri = UriUtility.GetPath(RootPath, key);
if (!await FileExistsAsync(client, bucketName, key, token).ConfigureAwait(false))
return;

log.LogInformation($"GET {absoluteUri}");

if (File.Exists(LocalCacheFile.FullName))
LocalCacheFile.Delete();

string contentEncoding;
using (FileStream cache = File.OpenWrite(LocalCacheFile.FullName))
{
contentEncoding = await DownloadFileAsync(client, bucketName, key, cache, token).ConfigureAwait(false);
}

if (contentEncoding?.Equals("gzip", StringComparison.OrdinalIgnoreCase) == true)
{
log.LogInformation($"Decompressing {absoluteUri}");

string gzipFile = LocalCacheFile.FullName + ".gz";
File.Move(LocalCacheFile.FullName, gzipFile);

using (Stream destination = File.Create(LocalCacheFile.FullName))
using (Stream source = File.OpenRead(gzipFile))
using (Stream zipStream = new GZipStream(source, CompressionMode.Decompress))
{
await zipStream.CopyToAsync(destination, DefaultCopyBufferSize, token).ConfigureAwait(false);
}
}
}

protected override async Task CopyToSource(ILogger log, CancellationToken token)
{
Uri absoluteUri = UriUtility.GetPath(RootPath, key);
if (!File.Exists(LocalCacheFile.FullName))
{
if (await FileExistsAsync(client, bucketName, key, token).ConfigureAwait(false))
{
log.LogInformation($"Removing {absoluteUri}");
await RemoveFileAsync(client, bucketName, key, token).ConfigureAwait(false);
}
else
{
log.LogInformation($"Skipping {absoluteUri}");
}

return;
}

log.LogInformation($"Pushing {absoluteUri}");

using (FileStream cache = LocalCacheFile.OpenRead())
{
Stream writeStream = cache;
string contentType = null, contentEncoding = null;
if (key.EndsWith(".nupkg", StringComparison.Ordinal))
{
contentType = "application/zip";
}
else if (key.EndsWith(".xml", StringComparison.Ordinal)
|| key.EndsWith(".nuspec", StringComparison.Ordinal))
{
contentType = "application/xml";
}
else if (key.EndsWith(".json", StringComparison.Ordinal)
|| await JsonUtility.IsJsonAsync(LocalCacheFile.FullName))
{
contentType = "application/json";
contentEncoding = "gzip";

// Compress content before uploading
log.LogInformation($"Compressing {absoluteUri}");
writeStream = await JsonUtility.GZipAndMinifyAsync(cache);
}
else if (key.EndsWith(".dll", StringComparison.OrdinalIgnoreCase)
|| key.EndsWith(".pdb", StringComparison.OrdinalIgnoreCase))
{
contentType = "application/octet-stream";
}
else
{
log.LogWarning($"Unknown file type: {absoluteUri}");
}

await UploadFileAsync(client, bucketName, key, contentType, contentEncoding, writeStream, token)
.ConfigureAwait(false);

writeStream.Dispose();
}
}

protected override Task<bool> RemoteExists(ILogger log, CancellationToken token)
{
return FileExistsAsync(client, bucketName, key, token);
}
}
}
107 changes: 107 additions & 0 deletions src/SleetLib/FileSystem/AmazonS3FileSystem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;
using NuGet.Common;

namespace Sleet
{
public class AmazonS3FileSystem : FileSystemBase
{
private readonly string bucketName;
private readonly IAmazonS3 client;

public AmazonS3FileSystem(LocalCache cache, Uri root, IAmazonS3 client, string bucketName)
: this(cache, root, root, client, bucketName)
{
}

public AmazonS3FileSystem(
LocalCache cache,
Uri root,
Uri baseUri,
IAmazonS3 client,
string bucketName,
string feedSubPath = null)
: base(cache, root, baseUri, feedSubPath)
{
this.client = client;
this.bucketName = bucketName;
}

public override async Task<bool> Validate(ILogger log, CancellationToken token)
{
log.LogInformation($"Verifying {bucketName} exists.");

bool isBucketFound = await client.DoesS3BucketExistAsync(bucketName).ConfigureAwait(false);
if (!isBucketFound)
{
log.LogError(
$"Unable to find {bucketName}. Verify that the Amazon account and bucket exists. The bucket " +
"must be created manually before using this feed.");
}

return isBucketFound;
}

public override ISleetFileSystemLock CreateLock(ILogger log)
{
return new AmazonS3FileSystemLock(client, bucketName, log);
}

public override ISleetFile Get(Uri path)
{
return Files.GetOrAdd(path, CreateAmazonS3File);
}

public override async Task<IReadOnlyList<ISleetFile>> GetFiles(ILogger log, CancellationToken token)
{
List<S3Object> s3Objects = null;
var listObjectsRequest = new ListObjectsV2Request
{
BucketName = bucketName,
MaxKeys = 100,
};

ListObjectsV2Response listObjectsResponse;
do
{
listObjectsResponse = await client.ListObjectsV2Async(listObjectsRequest, token).ConfigureAwait(false);
listObjectsRequest.ContinuationToken = listObjectsResponse.NextContinuationToken;

if (s3Objects == null)
s3Objects = listObjectsResponse.S3Objects;
else
s3Objects.AddRange(listObjectsResponse.S3Objects);
} while (listObjectsResponse.IsTruncated);

return s3Objects.Where(x => !x.Key.Equals(AmazonS3FileSystemLock.LockFile))
.Select(x => Get(GetPath(x.Key)))
.ToList();
}

private ISleetFile CreateAmazonS3File(Uri uri)
{
Uri rootUri = UriUtility.ChangeRoot(BaseURI, Root, uri);
string key = GetPathRelativeToBucket(uri);
return new AmazonS3File(this, rootUri, uri, LocalCache.GetNewTempPath(), client, bucketName, key);
}

private string GetPathRelativeToBucket(Uri uri)
{
if (uri == null)
throw new ArgumentNullException(nameof(uri));

string baseUri = BaseURI.ToString();
string path = uri.AbsoluteUri;

if (!path.StartsWith(baseUri, StringComparison.Ordinal))
throw new InvalidOperationException($"Unable to make '{uri.AbsoluteUri}' relative to '{baseUri}'");

return path.Replace(baseUri, string.Empty);
}
}
}
107 changes: 107 additions & 0 deletions src/SleetLib/FileSystem/AmazonS3FileSystemAbstraction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Transfer;

namespace Sleet
{
public static class AmazonS3FileSystemAbstraction
{
public const int DefaultCopyBufferSize = 81920;

public static Task CreateFileAsync(
IAmazonS3 client,
string bucketName,
string key,
string contentBody,
CancellationToken token)
{
var putObjectRequest = new PutObjectRequest
{
BucketName = bucketName,
Key = key,
ContentBody = contentBody,
};

return client.PutObjectAsync(putObjectRequest, token);
}

public static async Task<string> DownloadFileAsync(
IAmazonS3 client,
string bucketName,
string key,
Stream writer,
CancellationToken token)
{
using (GetObjectResponse response = await client
.GetObjectAsync(bucketName, key, token)
.ConfigureAwait(false))
using (Stream responseStream = response.ResponseStream)
{
await responseStream.CopyToAsync(writer, DefaultCopyBufferSize, token).ConfigureAwait(false);
return response.Headers.ContentEncoding;
}
}

public static async Task<bool> FileExistsAsync(
IAmazonS3 client,
string bucketName,
string key,
CancellationToken token)
{
var listObjectsRequest = new ListObjectsV2Request
{
BucketName = bucketName,
Prefix = key,
};
ListObjectsV2Response listObjectsResponse = await client
.ListObjectsV2Async(listObjectsRequest, token)
.ConfigureAwait(false);

return listObjectsResponse.S3Objects
.Any(x => x.Key.Equals(key, StringComparison.Ordinal));
}

public static Task RemoveFileAsync(IAmazonS3 client, string bucketName, string key, CancellationToken token)
{
return client.DeleteObjectAsync(bucketName, key, token);
}

public static async Task UploadFileAsync(
IAmazonS3 client,
string bucketName,
string key,
string contentType,
string contentEncoding,
Stream reader,
CancellationToken token)
{
var transferUtility = new TransferUtility(client);
var request = new TransferUtilityUploadRequest
{
BucketName = bucketName,
Key = key,
InputStream = reader,
AutoCloseStream = false,
AutoResetStreamPosition = false,
Headers = { CacheControl = "no-store" }
};

if (contentType != null)
{
request.ContentType = contentType;
request.Headers.ContentType = contentType;
}

if (contentEncoding != null)
request.Headers.ContentEncoding = contentEncoding;

using (transferUtility)
await transferUtility.UploadAsync(request, token).ConfigureAwait(false);
}
}
}
Loading

0 comments on commit c078cec

Please sign in to comment.