Skip to content

Commit

Permalink
WIP: Seperate IoWorker to seperate class
Browse files Browse the repository at this point in the history
  • Loading branch information
Foxtrot47 committed Jan 14, 2024
1 parent 1027852 commit 3c97aa1
Showing 1 changed file with 215 additions and 0 deletions.
215 changes: 215 additions & 0 deletions Crimson/Core/IoWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Crimson.Models;
using Crimson.Repository;
using Crimson.Utils;
using Serilog;

namespace Crimson.Core
{
public class IoWorker
{
private Thread[] _workerThreads;
public event Action<IoWorkerResult> TaskCompleted;

private readonly ConcurrentQueue<IoTask> _ioQueue = new();
private CancellationTokenSource _cancellationTokenSource = new();

private readonly ConcurrentDictionary<string, object> _fileLocksConcurrentDictionary = new();
private readonly ConcurrentDictionary<string, List<FileManifest>> _chunkToFileManifestsDictionary = new();

private ILogger _log;
private readonly LibraryManager _libraryManager;
private readonly IStoreRepository _repository;
private readonly Storage _storage;

private readonly int _numberOfThreads;
private const int _progressUpdateIntervalInMS = 500;

private Stopwatch _installStopWatch = new();
private DateTime _lastUpdateTime = DateTime.MinValue;

public IoWorker(ILogger log, int threadCount)
{
_log = log;
_numberOfThreads = threadCount;

_workerThreads = new Thread[_numberOfThreads];
for (var i = 0; i < _numberOfThreads; i++)
{
_workerThreads[i] = new Thread(ProcessIoQueue);
}
}

public void Start()
{
}

public void Join()
{
foreach (var thread in _workerThreads)
{
thread.Join();
}
}

private async void ProcessIoQueue()
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
if (_ioQueue.TryDequeue(out var ioTask))
{
try
{
switch (ioTask.TaskType)
{
case IoTaskType.Copy:
// Ensure there is a lock object for each destination file
var fileLock =
_fileLocksConcurrentDictionary.GetOrAdd(ioTask.DestinationFilePath, new object());

var compressedChunkData = await File.ReadAllBytesAsync(ioTask.SourceFilePath);
var chunk = Chunk.ReadBuffer(compressedChunkData);
_log.Debug("ProcessIoQueue: Reading chunk buffers from {source} finished", ioTask.SourceFilePath);

var directoryPath = Path.GetDirectoryName(ioTask.DestinationFilePath);
if (!string.IsNullOrEmpty(directoryPath))
{
Directory.CreateDirectory(directoryPath);
}

lock (fileLock)
{
using var fileStream = new FileStream(ioTask.DestinationFilePath, FileMode.OpenOrCreate,
FileAccess.Write, FileShare.None);

_log.Debug("ProcessIoQueue: Seeking {seek}bytes on file {destination}", ioTask.FileOffset, ioTask.DestinationFilePath);
fileStream.Seek(ioTask.FileOffset, SeekOrigin.Begin);

// Since chunk offset is a long we cannot use it directly in File stream write or read
// Use a memory stream to seek to the chunk offset
using var memoryStream = new MemoryStream(chunk.Data);
memoryStream.Seek(ioTask.Offset, SeekOrigin.Begin);

var remainingBytesToWrite = ioTask.Size;
// Buffer size is irrelevant as write is continuous
const int bufferSize = 4096;
var buffer = new byte[bufferSize];

_log.Debug("ProcessIoQueue: Writing {size}bytes to {file}", ioTask.Size, ioTask.DestinationFilePath);

while (remainingBytesToWrite > 0)
{
var bytesToRead = (int)Math.Min(bufferSize, remainingBytesToWrite);
var bytesRead = memoryStream.Read(buffer, 0, bytesToRead);
fileStream.Write(buffer, 0, bytesRead);

remainingBytesToWrite -= bytesRead;
}

fileStream.Flush();
_log.Debug("ProcessIoQueue: Finished Writing {size}bytes to {file}", ioTask.Size, ioTask.DestinationFilePath);
}

lock (_installItemLock)
{
CurrentInstall.WrittenSize += ioTask.Size / 1000000.0;
CurrentInstall.WriteSpeed = _installStopWatch.IsRunning && _installStopWatch.Elapsed.TotalSeconds > 0
? Math.Round(CurrentInstall.WrittenSize / _installStopWatch.Elapsed.TotalSeconds, 2)
: 0;
CurrentInstall.ProgressPercentage = Convert.ToInt32((CurrentInstall.WrittenSize / CurrentInstall.TotalWriteSizeMb) * 100);

// Limit firing progress update events
if ((DateTime.Now - _lastUpdateTime).TotalMilliseconds >= _progressUpdateIntervalInMS)
{
_lastUpdateTime = DateTime.Now;
InstallProgressUpdate?.Invoke(CurrentInstall);
}
}

// Check for references to the chunk and decrement by one
int newCount = _chunkPartReferences.AddOrUpdate(
ioTask.GuidStr,
(key) => 0, // Not expected to be called as the key should exist
(key, oldValue) =>
{
_log.Debug("ProcessIoQueue: decrementing reference count of {guid} by 1. Current value:{oldValue}", ioTask.GuidStr, oldValue);
return oldValue - 1;
}
);

// Check if the updated count is 0 or less
if (newCount <= 0)
{
// Attempt to remove the item from the dictionary
if (_chunkPartReferences.TryRemove(ioTask.GuidStr, out _))
{
_log.Debug("ProcessIoQueue: Deleting chunk file {file}", ioTask.SourceFilePath);
// Delete the file if successfully removed
File.Delete(ioTask.SourceFilePath);
}
}
break;
case IoTaskType.Delete:
File.Delete(ioTask.DestinationFilePath);
lock (_installItemLock)
{
CurrentInstall.WrittenSize += ioTask.Size / 1000000.0;
CurrentInstall.WriteSpeed = _installStopWatch.IsRunning && _installStopWatch.Elapsed.TotalSeconds > 0
? Math.Round(CurrentInstall.WrittenSize / _installStopWatch.Elapsed.TotalSeconds, 2)
: 0;
CurrentInstall.ProgressPercentage = Convert.ToInt32((CurrentInstall.WrittenSize / CurrentInstall.TotalWriteSizeMb) * 100);

// Limit firing progress update events
if ((DateTime.Now - _lastUpdateTime).TotalMilliseconds >= _progressUpdateIntervalInMS)
{
_lastUpdateTime = DateTime.Now;
InstallProgressUpdate?.Invoke(CurrentInstall);
}
}
break;
}
}
catch (Exception ex)
{
_log.Error("ProcessIoQueue: IO task failed with exception {ex}", ex);
await _cancellationTokenSource.CancelAsync();
if (CurrentInstall != null)
{
CurrentInstall.Status = ActionStatus.Failed;
InstallationStatusChanged?.Invoke(CurrentInstall);
CurrentInstall = null;
}
ProcessNext();
}
}
else
{
await Task.Delay(500);
}

if (_chunkPartReferences.Count <= 0 && _downloadQueue.IsEmpty && CurrentInstall != null)
{
_log.Information("ProcessIoQueue: Both queues are empty. Starting finalize stage");
_ = UpdateInstalledGameStatus();
}
}
}

}


public struct IoWorkerResult(bool result, long size)
{
public bool _result = result;
public long _size = size;
}
}

0 comments on commit 3c97aa1

Please sign in to comment.