diff --git a/MyThreadPool/MyThreadPool.sln b/MyThreadPool/MyThreadPool.sln new file mode 100644 index 0000000..f05a317 --- /dev/null +++ b/MyThreadPool/MyThreadPool.sln @@ -0,0 +1,31 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.4.33403.182 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{571D564D-1D0C-469E-857E-D0D1390DC178}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestsForMyThreadPool", "TestsForMyThreadPool\TestsForMyThreadPool.csproj", "{EDDD394B-53B2-4A35-916C-7AEB2FBF3728}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {571D564D-1D0C-469E-857E-D0D1390DC178}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {571D564D-1D0C-469E-857E-D0D1390DC178}.Debug|Any CPU.Build.0 = Debug|Any CPU + {571D564D-1D0C-469E-857E-D0D1390DC178}.Release|Any CPU.ActiveCfg = Release|Any CPU + {571D564D-1D0C-469E-857E-D0D1390DC178}.Release|Any CPU.Build.0 = Release|Any CPU + {EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {B78A1404-BCEF-4F4C-93D1-CCA4EFF20DB0} + EndGlobalSection +EndGlobal diff --git a/MyThreadPool/MyThreadPool/IMyTask.cs b/MyThreadPool/MyThreadPool/IMyTask.cs new file mode 100644 index 0000000..19da006 --- /dev/null +++ b/MyThreadPool/MyThreadPool/IMyTask.cs @@ -0,0 +1,20 @@ +namespace MyThreadPool; + +/// +/// Interface for creating tasks +/// +public interface IMyTask +{ + /// + /// Check is completed Task and return result + /// + public bool IsCompleted { get; } + /// + /// Return Result Task + /// + public TResult? Result { get; } + /// + /// A method for solving subtasks from the results obtained from the tasks + /// + public IMyTask ContinueWith(Func supplier); +} \ No newline at end of file diff --git a/MyThreadPool/MyThreadPool/MyThreadPool.cs b/MyThreadPool/MyThreadPool/MyThreadPool.cs new file mode 100644 index 0000000..857de17 --- /dev/null +++ b/MyThreadPool/MyThreadPool/MyThreadPool.cs @@ -0,0 +1,242 @@ +namespace MyThreadPool; + +/// +/// A class for automatic efficient flow control in the program. +/// +public class MyThreadPool +{ + private readonly MyThread[]? arrayThreads; + private readonly Queue tasks = new(); + private readonly CancellationTokenSource token = new(); + private readonly Object lockerForThreads; + private EventWaitHandle waitHandle; + + /// + /// Constructor for creating n number of threads for tasks + /// + public MyThreadPool(int sizeThreads) + { + waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset); + lockerForThreads = new(); + if (sizeThreads <= 0) + { + throw new ArgumentOutOfRangeException(); + } + arrayThreads = new MyThread[sizeThreads]; + for (int i = 0; i < sizeThreads; i++) + { + arrayThreads[i] = new(tasks, token.Token, lockerForThreads, waitHandle); + } + } + + /// + /// Get number of threads + /// + public int GetNumberOfThreads() + { + if (arrayThreads == null) + { + throw new ArgumentNullException(); + } + return arrayThreads.Length; + } + + /// + /// Accepts a function, adds it as a task in the thread, and returns the created task + /// + public IMyTask Submit(Func supplier) + { + if (token.IsCancellationRequested) + { + throw new ShudownWasThrownException(); + } + if (arrayThreads == null) + { + throw new ArgumentNullException(); + } + var newTask = new MyTask(supplier, token, this); + tasks.Enqueue(() => newTask.StartSupplier()); + waitHandle.Set(); + + return newTask; + } + + /// + /// Interrupts the processing of tasks that are not started do not begin, and those that are started are being completed + /// + public void Shutdown() + { + if (arrayThreads == null) + { + throw new ArgumentNullException(nameof(arrayThreads)); + } + + token.Cancel(); + waitHandle.Set(); + + foreach(var thread in arrayThreads) + { + thread.Join(); + } + } + + private class MyThread + { + private Thread? thread; + private volatile Queue tasks; + private Object locker; + private CancellationToken token; + private EventWaitHandle waitHandle; + + /// + /// Task-based custom thread constructor + /// + public MyThread(Queue tasks, CancellationToken token, object locker, EventWaitHandle waitHandle) + { + this.waitHandle = waitHandle; + this.tasks = tasks; + this.token = token; + this.locker = locker; + thread = new Thread(EternalCycle); + thread.Start(); + } + + /// + /// Task waiting cycle + /// + private void EternalCycle() + { + Action? task = null; + while (!token.IsCancellationRequested) + { + waitHandle.WaitOne(); + if (!token.IsCancellationRequested) + { + lock (locker) + { + if (tasks.Count != 0) + { + var isCorrect = tasks.TryDequeue(out task); + if (!isCorrect) + { + throw new InvalidOperationException(); + } + } + if (tasks.Count == 0) + { + waitHandle.Reset(); + } + } + if (task != null) + { + task(); + } + task = null; + } + } + } + + /// + /// Suspends the current thread + /// + public void Join() + { + if (thread == null) + { + throw new ArgumentNullException(nameof(thread)); + } + + thread.Join(); + } + } + + private class MyTask : IMyTask + { + private readonly Func? supplier; + private TResult? result; + private Exception? exception; + private CancellationTokenSource token; + private ManualResetEvent resetEvent = new ManualResetEvent(false); + private MyThreadPool? pool; + + /// + /// Constructor for creating a task + /// + public MyTask(Func supplier, CancellationTokenSource token, MyThreadPool pool) + { + this.supplier = supplier; + this.token = token; + this.pool = pool; + this.result = default; + } + + /// + /// Get Result + /// + public TResult Result + { + get + { + if (waitHandle == null) + { + throw new InvalidOperationException(); + } + waitHandle.WaitOne(); + if (exception != null) + { + throw new AggregateException(exception); + } + return result!; + } + } + + /// + /// Check is completed Task and return result + /// + public bool IsCompleted { get; private set; } + + /// + /// Task completion + /// + public void StartSupplier() + { + if (supplier != null) + { + try + { + result = supplier(); + } + catch (Exception ex) + { + exception = ex; + } + + if (waitHandle == null) + { + throw new InvalidOperationException(); + } + + waitHandle.Set(); + + IsCompleted = true; + } + } + + /// + /// A method for solving subtasks from the results obtained from the tasks + /// + public IMyTask ContinueWith(Func supplier) + { + if (pool == null) + { + throw new InvalidOperationException(); + } + + if (!token.IsCancellationRequested) + { + return pool.Submit(() => supplier(Result)); + } + throw new ShudownWasThrownException(); + } + } +} \ No newline at end of file diff --git a/MyThreadPool/MyThreadPool/MyThreadPool.csproj b/MyThreadPool/MyThreadPool/MyThreadPool.csproj new file mode 100644 index 0000000..424caff --- /dev/null +++ b/MyThreadPool/MyThreadPool/MyThreadPool.csproj @@ -0,0 +1,11 @@ + + + + Exe + net7.0 + enable + enable + Library + + + diff --git a/MyThreadPool/MyThreadPool/ShudownWasThrownException.cs b/MyThreadPool/MyThreadPool/ShudownWasThrownException.cs new file mode 100644 index 0000000..656a70c --- /dev/null +++ b/MyThreadPool/MyThreadPool/ShudownWasThrownException.cs @@ -0,0 +1,13 @@ +namespace MyThreadPool; + +/// +/// An exception is thrown if, after calling the Shutdown method, the user calls other methods +/// +public class ShudownWasThrownException : Exception +{ + public ShudownWasThrownException() { } + + public ShudownWasThrownException(string? message) : base(message) { } + + public ShudownWasThrownException(string? message, Exception? innerException) : base(message, innerException) { } +} \ No newline at end of file diff --git a/MyThreadPool/TestsForMyThreadPool/TestsForMyThreadPool.cs b/MyThreadPool/TestsForMyThreadPool/TestsForMyThreadPool.cs new file mode 100644 index 0000000..5f672bf --- /dev/null +++ b/MyThreadPool/TestsForMyThreadPool/TestsForMyThreadPool.cs @@ -0,0 +1,100 @@ +namespace TestsForMyThreadPool; + +using MyThreadPool; + +public class Tests +{ + [Test] + public void ATestWithOneTaskForTenThreads() + { + var myThreadPool = new MyThreadPool(10); + var task = myThreadPool.Submit(() => 2 * 2); + Assert.That(task.Result, Is.EqualTo(4)); + myThreadPool.Shutdown(); + } + + [Test] + public void ATestWithTwoTasksForTenThreads() + { + var myThreadPool = new MyThreadPool(10); + var firstTask = myThreadPool.Submit(() => 2 * 2); + var secondTask = myThreadPool.Submit(() => 3 + 3); + Assert.That(firstTask.Result, Is.EqualTo(4)); + Assert.That(secondTask.Result, Is.EqualTo(6)); + myThreadPool.Shutdown(); + } + + [Test] + public void WhenThereAreFewerThreadsThanTasks() + { + var myThreadPool = new MyThreadPool(1); + var firstTask = myThreadPool.Submit(() => 2 * 2); + var secondTask = myThreadPool.Submit(() => 3 + 3); + Assert.That(firstTask.Result, Is.EqualTo(4)); + Assert.That(secondTask.Result, Is.EqualTo(6)); + myThreadPool.Shutdown(); + } + + [Test] + public void IsContinueWithWorkingCorrectlyWithWithOneSubtask() + { + var myThreadPool = new MyThreadPool(3); + var myTask = myThreadPool.Submit(() => 2 * 2).ContinueWith(x => x.ToString()); + Assert.That(myTask.Result, Is.EqualTo("4")); + myThreadPool.Shutdown(); + } + + [Test] + public void IsContinueWithWorkingCorrectlyWithWithSeveralSubtask() + { + var myThreadPool = new MyThreadPool(3); + var myTask = myThreadPool.Submit(() => 2 * 2).ContinueWith(x => x * x).ContinueWith(x => x.ToString()); + Assert.That(myTask.Result, Is.EqualTo("16")); + myThreadPool.Shutdown(); + } + + [Test] + public void NumberThreadsTheRequiredNumberIsCreated() + { + var myThreadPool = new MyThreadPool(5); + Assert.That(myThreadPool.GetNumberOfThreads(), Is.EqualTo(5)); + myThreadPool.Shutdown(); + } + + [Test] + public void WorkingWithException() + { + var myThreadPool = new MyThreadPool(10); + var task = myThreadPool.Submit(() => + { + int zero = 0; + return 1 / zero; + }); + Assert.Throws(() => { var result = task.Result;}); + myThreadPool.Shutdown(); + } + + [Test] + public void TheTaskWillCompletedWhenExceptionResult() + { + var myThreadPool = new MyThreadPool(10); + var task = myThreadPool.Submit(() => + { + int zero = 0; + return 1 / zero; + }); + Assert.Throws(() => { var result = task.Result; }); + Assert.That(task.IsCompleted, Is.True); + myThreadPool.Shutdown(); + } + + [Test] + public void AnotherTasksWillNotBeAcceptedAndTheOldOnesWillBeFinalized() + { + var myThreadPool = new MyThreadPool(1); + var firstTask = myThreadPool.Submit(() => 2 * 2); + myThreadPool.Shutdown(); + Assert.That(firstTask.Result, Is.EqualTo(4)); + Assert.Throws(() => { var secondTask = myThreadPool.Submit(() => 3 + 3); }); + } +} \ No newline at end of file diff --git a/MyThreadPool/TestsForMyThreadPool/TestsForMyThreadPool.csproj b/MyThreadPool/TestsForMyThreadPool/TestsForMyThreadPool.csproj new file mode 100644 index 0000000..c105e8f --- /dev/null +++ b/MyThreadPool/TestsForMyThreadPool/TestsForMyThreadPool.csproj @@ -0,0 +1,23 @@ + + + + net7.0 + enable + enable + + false + + + + + + + + + + + + + + + diff --git a/MyThreadPool/TestsForMyThreadPool/Usings.cs b/MyThreadPool/TestsForMyThreadPool/Usings.cs new file mode 100644 index 0000000..cefced4 --- /dev/null +++ b/MyThreadPool/TestsForMyThreadPool/Usings.cs @@ -0,0 +1 @@ +global using NUnit.Framework; \ No newline at end of file