Skip to content

Commit

Permalink
Finish implementing multipart uploads for logged in users
Browse files Browse the repository at this point in the history
  • Loading branch information
Jack-Edwards committed Sep 8, 2024
1 parent 0b599f6 commit b6d394f
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 91 deletions.
4 changes: 2 additions & 2 deletions Crypter.API/Controllers/FileTransferController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ public async Task<IActionResult> InitializeMultipartFileTransferAsync([FromQuery
[ProducesResponseType(StatusCodes.Status400BadRequest, Type = typeof(ErrorResponse))]
[ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(ErrorResponse))]
public async Task<IActionResult> UploadMultipartFileTransferAsync([FromQuery] string id, [FromQuery] int position,
[FromForm] IFormFile? cipertext)
[FromForm] IFormFile? ciphertext)
{
SaveMultipartFileTransferCommand command = new SaveMultipartFileTransferCommand(UserId, id, position, cipertext?.OpenReadStream());
SaveMultipartFileTransferCommand command = new SaveMultipartFileTransferCommand(UserId, id, position, ciphertext?.OpenReadStream());
return await _sender.Send(command)
.MatchAsync(
left: MakeErrorResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
using System.IO;
using Crypter.Common.Client.Interfaces.HttpClients;
using Crypter.Common.Client.Transfer.Models;
using Crypter.Common.Enums;
using Crypter.Crypto.Common;
using Crypter.Crypto.Common.KeyExchange;
using Crypter.Crypto.Common.StreamEncryption;
Expand Down
42 changes: 19 additions & 23 deletions Crypter.Common.Client/Transfer/Handlers/UploadFileHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Crypter.Common.Client.Interfaces.HttpClients;
using Crypter.Common.Client.Transfer.Handlers.Base;
Expand Down Expand Up @@ -66,20 +65,20 @@ internal void SetTransferInfo(Func<Stream> fileStreamOpener, string fileName, lo
_transferInfoSet = true;
}

public Task<Either<UploadTransferError, UploadHandlerResponse>> UploadAsync(Action<double>? updateCallback = null, bool multipart = false)
public Task<Either<UploadTransferError, UploadHandlerResponse>> UploadAsync(Action<double>? updateCallback = null)
{
if (!_transferInfoSet)
{
return Either<UploadTransferError, UploadHandlerResponse>
.From(UploadTransferError.UnknownError)
.AsTask();
}

(Func<Action<double>?, EncryptionStream> encryptionStreamOpener, byte[]? senderPublicKey, byte[] proof) = GetEncryptionInfo(_fileStreamOpener!, _fileSize);
UploadFileTransferRequest request = new UploadFileTransferRequest(_fileName!, _fileContentType!, senderPublicKey,
KeyExchangeNonce, proof, ExpirationHours);

if (multipart)
if (SenderDefined)
{
// Initialize
return CrypterApiClient.FileTransfer
Expand All @@ -89,21 +88,22 @@ public Task<Either<UploadTransferError, UploadHandlerResponse>> UploadAsync(Acti
// Upload
.BindAsync(async initializeResult =>
{
Either<UploadMultipartFileTransferError, Unit> uploadResult =
Either<UploadMultipartFileTransferError, Unit>.Neither;
Either<UploadMultipartFileTransferError, Unit> uploadResult = Either<UploadMultipartFileTransferError, Unit>.Neither;
EncryptionStream encryptionStream = encryptionStreamOpener(updateCallback);
long maximumReadLength = ClientTransferSettings.MaximumMultipartUploadPartSizeMB *
Convert.ToInt64(Math.Pow(10, 6));
foreach (var iterable in SplitEncryptionStream(encryptionStream, maximumReadLength)
.Select((x, y) => new { StreamOpener = x, Index = y }))
long maximumReadLength = ClientTransferSettings.MaximumMultipartUploadPartSizeMB * Convert.ToInt64(Math.Pow(10, 6));

IAsyncEnumerator<Func<MemoryStream>> enumerable = SplitEncryptionStreamAsync(encryptionStream, maximumReadLength).GetAsyncEnumerator();
int currentPosition = 0;
while (await enumerable.MoveNextAsync())
{
uploadResult = await CrypterApiClient.FileTransfer.UploadMultipartFileTransferAsync(
initializeResult.HashId,
iterable.Index, iterable.StreamOpener);
initializeResult.HashId, currentPosition, enumerable.Current);
if (!uploadResult.IsRight)
{
break;
}

currentPosition++;
}

return await uploadResult
Expand All @@ -125,23 +125,19 @@ public Task<Either<UploadTransferError, UploadHandlerResponse>> UploadAsync(Acti
RecipientKeySeed));
}

IEnumerable<Func<MemoryStream>> SplitEncryptionStream(EncryptionStream encryptionStream, long maximumReadLength)
async IAsyncEnumerable<Func<MemoryStream>> SplitEncryptionStreamAsync(EncryptionStream encryptionStream, long maximumReadLength)
{
int bytesRead = 0;
bool endOfStream;
do
{
byte[] buffer = new byte[maximumReadLength];
bytesRead = encryptionStream.Read(buffer);
if (bytesRead > 0)
int bytesRead = await encryptionStream.ReadAsync(buffer);
endOfStream = bytesRead == 0;
if (!endOfStream)
{
yield return () =>
{
MemoryStream memoryStream = new MemoryStream();
memoryStream.Write(buffer, 0, bytesRead);
return memoryStream;
};
yield return () => new MemoryStream(buffer, 0, bytesRead);
}
} while (bytesRead > 0);
} while (!endOfStream);
}
}
}
3 changes: 1 addition & 2 deletions Crypter.Core/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
* Contact the current copyright holder to discuss commercial license options.
*/

using System;
using System.Threading.Tasks;
using Crypter.Common.Exceptions;
using Crypter.Core.Identity;
Expand All @@ -37,7 +36,6 @@
using Crypter.DataAccess;
using Hangfire;
using Hangfire.PostgreSql;
using MediatR;
using Microsoft.AspNetCore.Builder;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
Expand Down Expand Up @@ -108,6 +106,7 @@ public static IServiceCollection AddCrypterCore(this IServiceCollection services
{
options.AllocatedGB = transferStorageSettings.AllocatedGB;
options.Location = transferStorageSettings.Location;
options.MaximumTransferSizeMB = transferStorageSettings.MaximumTransferSizeMB;
});

services.AddHangfire(config => config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
*/

using System;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -70,12 +69,10 @@ public AbandonMultipartFileTransferCommandHandler(

public async Task<Either<AbandonMultipartFileTransferError, Unit>> Handle(AbandonMultipartFileTransferCommand request, CancellationToken cancellationToken)
{
await using IDbContextTransaction transaction = await _dataContext.Database
.BeginTransactionAsync(IsolationLevel.Serializable, CancellationToken.None);

try
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
IExecutionStrategy executionStrategy = _dataContext.Database.CreateExecutionStrategy();
return await executionStrategy.ExecuteAsync(async () =>
{
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
Task<Either<AbandonMultipartFileTransferError, Unit>> responseTask =
from additionalData in ValidateRequestAsync(request)
from abandonResult in Either<AbandonMultipartFileTransferError, Unit>.FromRightAsync(
Expand All @@ -100,14 +97,11 @@ from sideEffects in Either<AbandonMultipartFileTransferError, Unit>.FromRightAsy
async () =>
{
FailedMultipartFileTransferAbandonEvent failedMultipartAbandonEvent =
new FailedMultipartFileTransferAbandonEvent(request.HashId, request.SenderId, AbandonMultipartFileTransferError.UnknownError, utcNow);
new FailedMultipartFileTransferAbandonEvent(request.HashId, request.SenderId,
AbandonMultipartFileTransferError.UnknownError, utcNow);
await _publisher.Publish(failedMultipartAbandonEvent, CancellationToken.None);
});
}
finally
{
await transaction.CommitAsync(CancellationToken.None);
}
});
}

private async Task<Either<AbandonMultipartFileTransferError, ValidRequestData>> ValidateRequestAsync(AbandonMultipartFileTransferCommand request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
*/

using System;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -71,15 +70,13 @@ public FinalizeMultipartFileTransferCommandHandler(

public async Task<Either<FinalizeMultipartFileTransferError, Unit>> Handle(FinalizeMultipartFileTransferCommand request, CancellationToken cancellationToken)
{
await using IDbContextTransaction transaction = await _dataContext.Database
.BeginTransactionAsync(IsolationLevel.Serializable, CancellationToken.None);

try
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
IExecutionStrategy executionStrategy = _dataContext.Database.CreateExecutionStrategy();
return await executionStrategy.ExecuteAsync(async () =>
{
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
Task<Either<FinalizeMultipartFileTransferError, Unit>> responseTask =
from additionalData in ValidateRequestAsync(request)
from finalizeResult in FinalizeAsync(request, additionalData).ToLeftEitherAsync(Unit.Default)
from finalizeResult in FinalizeAsync(additionalData).ToLeftEitherAsync(Unit.Default)
let successfulMultipartFinalizeEvent = new SuccessfulMultipartFileTransferFinalizationEvent(
additionalData.InitializedTransferEntity.Id,
additionalData.InitializedTransferEntity.RecipientId ?? Maybe<Guid>.None,
Expand All @@ -93,20 +90,18 @@ from sideEffects in Either<FinalizeMultipartFileTransferError, Unit>.FromRightAs
async error =>
{
FailedMultipartFileTransferFinalizationEvent failedMultipartFinalizationEvent =
new FailedMultipartFileTransferFinalizationEvent(request.HashId, request.SenderId, error, utcNow);
new FailedMultipartFileTransferFinalizationEvent(request.HashId, request.SenderId, error,
utcNow);
await _publisher.Publish(failedMultipartFinalizationEvent, CancellationToken.None);
},
async () =>
{
FailedMultipartFileTransferFinalizationEvent failedMultipartFinalizationEvent =
new FailedMultipartFileTransferFinalizationEvent(request.HashId, request.SenderId, FinalizeMultipartFileTransferError.UnknownError, utcNow);
new FailedMultipartFileTransferFinalizationEvent(request.HashId, request.SenderId,
FinalizeMultipartFileTransferError.UnknownError, utcNow);
await _publisher.Publish(failedMultipartFinalizationEvent, CancellationToken.None);
});
}
finally
{
await transaction.CommitAsync(CancellationToken.None);
}
});
}

private async Task<Either<FinalizeMultipartFileTransferError, ValidRequestData>> ValidateRequestAsync(FinalizeMultipartFileTransferCommand request)
Expand All @@ -132,7 +127,7 @@ private async Task<Either<FinalizeMultipartFileTransferError, ValidRequestData>>
return new ValidRequestData(initializedTransferEntity);
}

private async Task<Maybe<FinalizeMultipartFileTransferError>> FinalizeAsync(FinalizeMultipartFileTransferCommand request, ValidRequestData additionalData)
private async Task<Maybe<FinalizeMultipartFileTransferError>> FinalizeAsync(ValidRequestData additionalData)
{
bool finalizeSuccess = await _transferRepository.JoinTransferPartsAsync(
additionalData.InitializedTransferEntity.Id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
*/

using System;
using System.Data;
using System.IO;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -81,12 +80,10 @@ public SaveMultipartFileTransferCommandHandler(
public async Task<Either<UploadMultipartFileTransferError, Unit>> Handle(SaveMultipartFileTransferCommand request,
CancellationToken cancellationToken)
{
await using IDbContextTransaction transaction = await _dataContext.Database
.BeginTransactionAsync(IsolationLevel.Serializable, CancellationToken.None);

try
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
IExecutionStrategy executionStrategy = _dataContext.Database.CreateExecutionStrategy();
return await executionStrategy.ExecuteAsync(async () =>
{
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
Task<Either<UploadMultipartFileTransferError, Unit>> responseTask =
from additionalData in ValidateRequestAsync(request)
from saveResult in SavePartAsync(request, additionalData).ToLeftEitherAsync(Unit.Default)
Expand All @@ -111,11 +108,7 @@ from sideEffects in Either<UploadMultipartFileTransferError, Unit>.FromRightAsyn
new FailedMultipartFileTransferUploadEvent(request.HashId, request.SenderId, UploadMultipartFileTransferError.UnknownError, utcNow);
await _publisher.Publish(failedMultipartUploadEvent, CancellationToken.None);
});
}
finally
{
await transaction.CommitAsync(CancellationToken.None);
}
});
}

private async Task<Either<UploadMultipartFileTransferError, ValidRequestData>> ValidateRequestAsync(SaveMultipartFileTransferCommand request)
Expand Down
24 changes: 15 additions & 9 deletions Crypter.Core/Repositories/TransferRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ public long GetTransferPartsSize(Guid id, TransferItemType itemType, TransferUse
{
string directory = GetTransferPartsDirectory(itemType, userType, id);
DirectoryInfo directoryInfo = new DirectoryInfo(directory);
return directoryInfo
.EnumerateFiles()
.Select(x => x.Length)
.DefaultIfEmpty(0)
.Sum(x => Convert.ToInt64(x / Math.Pow(10, 6)));
if (directoryInfo.Exists)
{
return directoryInfo
.EnumerateFiles()
.Select(x => x.Length)
.DefaultIfEmpty(0)
.Sum(x => Convert.ToInt64(x / Math.Pow(10, 6)));
}

return 0;
}

public async Task<bool> SaveTransferAsync(Guid id, TransferItemType itemType, TransferUserType userType,
Expand Down Expand Up @@ -185,14 +190,15 @@ public async Task<bool> JoinTransferPartsAsync(Guid id, TransferItemType itemTyp

List<string> filenames = Directory
.EnumerateFiles(partsDirectory)
.Order()
.OrderBy(x => int.Parse(Path.GetFileNameWithoutExtension(x)))
.ToList();

bool sequentialFilenames = !filenames
.Where((name, index) => Path.GetFileNameWithoutExtension(name) != index.ToString())
bool nonSequentialFilenames = filenames
.Select(x => int.Parse(Path.GetFileNameWithoutExtension(x)))
.Where((name, index) => name != index)
.Any();

if (!sequentialFilenames)
if (nonSequentialFilenames)
{
return false;
}
Expand Down
8 changes: 5 additions & 3 deletions Crypter.DataAccess/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ namespace Crypter.DataAccess;

public static class DependencyInjection
{
private static readonly string[] RetryableErrorCodes = ["57P01"];

public static IServiceCollection AddDataAccess(this IServiceCollection services, string connectionString)
{
ServiceProvider serviceProvider = services.BuildServiceProvider();
Expand All @@ -43,13 +45,13 @@ public static IServiceCollection AddDataAccess(this IServiceCollection services,
{
optionsBuilder.UseNpgsql(connectionString, npgsqlOptionsBuilder =>
{
npgsqlOptionsBuilder.EnableRetryOnFailure(5, TimeSpan.FromSeconds(5), new[] { "57P01" });
npgsqlOptionsBuilder.EnableRetryOnFailure(5, TimeSpan.FromSeconds(5), RetryableErrorCodes);
npgsqlOptionsBuilder.MigrationsHistoryTable(HistoryRepository.DefaultTableName,
DataContext.SchemaName);
})
.LogTo(
filter: (eventId, level) => eventId.Id == CoreEventId.ExecutionStrategyRetrying,
logger: (eventData) =>
filter: (eventId, _) => eventId.Id == CoreEventId.ExecutionStrategyRetrying,
logger: eventData =>
{
ExecutionStrategyEventData? retryEventData = eventData as ExecutionStrategyEventData;
IReadOnlyList<Exception>? exceptions = retryEventData?.ExceptionsEncountered;
Expand Down
14 changes: 7 additions & 7 deletions Crypter.Web/Shared/Transfer/UploadFileTransfer.razor
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@

<div class="encrypt-status text-center" hidden="@(!EncryptionInProgress)">
<h3>@UploadStatusMessage</h3>
@if (TransmissionType is TransferTransmissionType.Stream or TransferTransmissionType.Multipart)
{
<div class="progress">
<div class="progress-bar" role="progressbar" style="transition:none;width: @((UploadStatusPercent ?? 0) * 100)%" aria-valuenow="@(UploadStatusPercent ?? 0)" aria-valuemin="0" aria-valuemax="1"></div>
</div>
}
else
@if (TransmissionType is TransferTransmissionType.Buffer)
{
<div class="spinner-border" role="status">
<span class="visually-hidden">Loading...</span>
</div>
}
else
{
<div class="progress">
<div class="progress-bar" role="progressbar" style="transition:none;width: @((UploadStatusPercent ?? 0) * 100)%" aria-valuenow="@(UploadStatusPercent ?? 0)" aria-valuemin="0" aria-valuemax="1"></div>
</div>
}
</div>
Loading

0 comments on commit b6d394f

Please sign in to comment.