Skip to content

Commit

Permalink
Merge pull request #235 from bdach/upload-scaling
Browse files Browse the repository at this point in the history
Allow `ScoreUploader` to process replays concurrently
  • Loading branch information
peppy authored May 22, 2024
2 parents a34c409 + a976398 commit 79810ab
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 122 deletions.
108 changes: 58 additions & 50 deletions osu.Server.Spectator.Tests/ScoreUploaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ namespace osu.Server.Spectator.Tests
{
public class ScoreUploaderTests
{
private readonly ScoreUploader uploader;
private readonly Mock<IDatabaseAccess> mockDatabase;
private readonly Mock<IScoreStorage> mockStorage;
private readonly Mock<IDatabaseFactory> databaseFactory;
private readonly Mock<ILoggerFactory> loggerFactory;

public ScoreUploaderTests()
{
Expand All @@ -31,16 +32,14 @@ public ScoreUploaderTests()
passed = true
}));

var databaseFactory = new Mock<IDatabaseFactory>();
databaseFactory = new Mock<IDatabaseFactory>();
databaseFactory.Setup(factory => factory.GetInstance()).Returns(mockDatabase.Object);

var loggerFactory = new Mock<ILoggerFactory>();
loggerFactory = new Mock<ILoggerFactory>();
loggerFactory.Setup(factory => factory.CreateLogger(It.IsAny<string>()))
.Returns(new Mock<ILogger>().Object);

mockStorage = new Mock<IScoreStorage>();
uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);
uploader.UploadInterval = 1000; // Set a high timer interval for testing purposes.
}

/// <summary>
Expand All @@ -56,8 +55,9 @@ public ScoreUploaderTests()
public async Task ScoreDataMergedCorrectly()
{
enableUpload();
var uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);

uploader.Enqueue(1, new Score
await uploader.EnqueueAsync(1, new Score
{
ScoreInfo =
{
Expand All @@ -69,58 +69,50 @@ public async Task ScoreDataMergedCorrectly()
// note OnlineID and Passed not set.
}
});
await Task.Delay(2000);

await uploadsCompleteAsync(uploader);

mockStorage.Verify(s => s.WriteAsync(
It.Is<Score>(score => score.ScoreInfo.OnlineID == 2
&& score.ScoreInfo.Passed
&& score.ScoreInfo.User.Username == "some user")), Times.Once);
}

[Fact]
public async Task ScoreUploadsEveryInterval()
public async Task ScoreUploads()
{
enableUpload();
var uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);

// First score.
uploader.Enqueue(1, new Score());
await Task.Delay(2000);
await uploader.EnqueueAsync(1, new Score());
await uploadsCompleteAsync(uploader);
mockStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 2)), Times.Once);

// Second score (ensure the loop keeps running).
uploader.Enqueue(1, new Score());
await Task.Delay(2000);
await uploader.EnqueueAsync(1, new Score());
await uploadsCompleteAsync(uploader);
mockStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 2)), Times.Exactly(2));
}

[Fact]
public async Task ScoreDoesNotUploadIfDisabled()
{
disableUpload();
var uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);

uploader.Enqueue(1, new Score());
await uploader.Flush();
await uploader.EnqueueAsync(1, new Score());
await Task.Delay(1000);
mockStorage.Verify(s => s.WriteAsync(It.IsAny<Score>()), Times.Never);
}

[Fact]
public async Task ScoreOnlyUploadsOnce()
{
enableUpload();

uploader.Enqueue(1, new Score());
await uploader.Flush();
await uploader.Flush();
mockStorage.Verify(s => s.WriteAsync(It.IsAny<Score>()), Times.Once);
}

[Fact]
public async Task ScoreUploadsWithDelayedScoreToken()
{
enableUpload();
var uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);

// Score with no token.
uploader.Enqueue(2, new Score());
await uploader.Flush();
await uploader.EnqueueAsync(2, new Score());
await Task.Delay(1000);
mockStorage.Verify(s => s.WriteAsync(It.IsAny<Score>()), Times.Never);

// Give the score a token.
Expand All @@ -130,22 +122,21 @@ public async Task ScoreUploadsWithDelayedScoreToken()
passed = true
}));

await uploader.Flush();

await uploadsCompleteAsync(uploader);
mockStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 3)), Times.Once);
}

[Fact]
public async Task TimedOutScoreDoesNotUpload()
{
enableUpload();
var uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);

uploader.TimeoutInterval = 0;

// Score with no token.
uploader.Enqueue(2, new Score());
await uploader.EnqueueAsync(2, new Score());
Thread.Sleep(1000); // Wait for cancellation.
await uploader.Flush();
mockStorage.Verify(s => s.WriteAsync(It.IsAny<Score>()), Times.Never);

// Give the score a token now. It should still not upload because it has timed out.
Expand All @@ -154,21 +145,25 @@ public async Task TimedOutScoreDoesNotUpload()
id = 3,
passed = true
}));

await uploader.Flush();
mockStorage.Verify(s => s.WriteAsync(It.IsAny<Score>()), Times.Never);

// New score that has a token (ensure the loop keeps running).
uploader.Enqueue(1, new Score());
await uploader.Flush();
mockDatabase.Setup(db => db.GetScoreFromToken(3)).Returns(Task.FromResult<SoloScore?>(new SoloScore
{
id = 4,
passed = true
}));
await uploader.EnqueueAsync(3, new Score());
await uploadsCompleteAsync(uploader);
mockStorage.Verify(s => s.WriteAsync(It.IsAny<Score>()), Times.Once);
mockStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 2)), Times.Once);
mockStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 4)), Times.Once);
}

[Fact]
public async Task FailedScoreHandledGracefully()
{
enableUpload();
var uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);

bool shouldThrow = true;
int uploadCount = 0;
Expand All @@ -184,36 +179,49 @@ public async Task FailedScoreHandledGracefully()
});

// Throwing score.
uploader.Enqueue(1, new Score());
await uploader.Flush();
await uploader.EnqueueAsync(1, new Score());
await uploadsCompleteAsync(uploader);
Assert.Equal(0, uploadCount);

shouldThrow = false;

// Same score shouldn't reupload.
await uploader.Flush();
await Task.Delay(1000);
Assert.Equal(0, uploadCount);

uploader.Enqueue(1, new Score());
await uploader.Flush();
await uploader.EnqueueAsync(1, new Score());
await uploadsCompleteAsync(uploader);
Assert.Equal(1, uploadCount);
}

[Fact]
public async Task TimedOutItemGetsOneAttempt()
public async Task TestMassUploads()
{
enableUpload();
AppSettings.ReplayUploaderConcurrency = 4;
var uploader = new ScoreUploader(loggerFactory.Object, databaseFactory.Object, mockStorage.Object);

uploader.TimeoutInterval = 0;
for (int i = 0; i < 1000; ++i)
await uploader.EnqueueAsync(1, new Score());

// Score with no token.
uploader.Enqueue(1, new Score());
Thread.Sleep(1000); // Wait for cancellation.
await uploader.Flush();
mockStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 2)), Times.Once);
await uploadsCompleteAsync(uploader);
mockStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 2)), Times.Exactly(1000));
AppSettings.ReplayUploaderConcurrency = 1;
}

private void enableUpload() => AppSettings.SaveReplays = true;
private void disableUpload() => AppSettings.SaveReplays = false;

private async Task uploadsCompleteAsync(ScoreUploader uploader, int attempts = 5)
{
while (uploader.RemainingUsages > 0)
{
if (attempts <= 0)
Assert.Fail("Waiting for score upload to proceed timed out");

attempts -= 1;
await Task.Delay(1000);
}
}
}
}
24 changes: 18 additions & 6 deletions osu.Server.Spectator.Tests/SpectatorHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ await hub.EndPlaySession(new SpectatorState
State = SpectatedUserState.Passed,
});

await scoreUploader.Flush();
await uploadsCompleteAsync();

if (savingEnabled)
mockScoreStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 456)), Times.Once);
Expand Down Expand Up @@ -208,7 +208,7 @@ await hub.EndPlaySession(new SpectatorState
State = SpectatedUserState.Quit,
});

await scoreUploader.Flush();
await uploadsCompleteAsync();

mockScoreStorage.Verify(s => s.WriteAsync(It.IsAny<Score>()), Times.Never);
mockReceiver.Verify(clients => clients.UserFinishedPlaying(streamer_id, It.Is<SpectatorState>(m => m.State == SpectatedUserState.Quit)), Times.Once());
Expand Down Expand Up @@ -384,7 +384,7 @@ await hub.EndPlaySession(new SpectatorState
State = SpectatedUserState.Passed,
});

await scoreUploader.Flush();
await uploadsCompleteAsync();

if (saved)
mockScoreStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 456)), Times.Once);
Expand Down Expand Up @@ -470,7 +470,7 @@ await hub.EndPlaySession(new SpectatorState
State = SpectatedUserState.Passed,
});

await scoreUploader.Flush();
await uploadsCompleteAsync();

mockScoreStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.UserID == streamer_id
&& score.ScoreInfo.User.OnlineID == streamer_id
Expand Down Expand Up @@ -521,7 +521,7 @@ await hub.EndPlaySession(new SpectatorState
State = SpectatedUserState.Failed,
});

await scoreUploader.Flush();
await uploadsCompleteAsync();

mockScoreStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.OnlineID == 456)), Times.Never);
mockReceiver.Verify(clients => clients.UserFinishedPlaying(streamer_id, It.Is<SpectatorState>(m => m.State == SpectatedUserState.Failed)), Times.Once());
Expand Down Expand Up @@ -575,10 +575,22 @@ await hub.EndPlaySession(new SpectatorState
State = SpectatedUserState.Passed,
});

await scoreUploader.Flush();
await uploadsCompleteAsync();

mockScoreStorage.Verify(s => s.WriteAsync(It.Is<Score>(score => score.ScoreInfo.Rank == ScoreRank.A)), Times.Once);
mockReceiver.Verify(clients => clients.UserFinishedPlaying(streamer_id, It.Is<SpectatorState>(m => m.State == SpectatedUserState.Passed)), Times.Once());
}

private async Task uploadsCompleteAsync(int attempts = 5)
{
while (scoreUploader.RemainingUsages > 0)
{
if (attempts <= 0)
Assert.Fail("Waiting for score upload to proceed timed out");

attempts -= 1;
await Task.Delay(1000);
}
}
}
}
4 changes: 4 additions & 0 deletions osu.Server.Spectator/AppSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace osu.Server.Spectator
public static class AppSettings
{
public static bool SaveReplays { get; set; }
public static int ReplayUploaderConcurrency { get; set; }

#region For use with FileScoreStorage

Expand Down Expand Up @@ -36,6 +37,9 @@ public static class AppSettings
static AppSettings()
{
SaveReplays = Environment.GetEnvironmentVariable("SAVE_REPLAYS") == "1";
ReplayUploaderConcurrency = int.Parse(Environment.GetEnvironmentVariable("REPLAY_UPLOAD_THREADS") ?? "1");
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(ReplayUploaderConcurrency);

ReplaysPath = Environment.GetEnvironmentVariable("REPLAYS_PATH") ?? "replays";
S3Key = Environment.GetEnvironmentVariable("S3_KEY") ?? string.Empty;
S3Secret = Environment.GetEnvironmentVariable("S3_SECRET") ?? string.Empty;
Expand Down
Loading

0 comments on commit 79810ab

Please sign in to comment.