diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPool.Test.csproj b/C#/forSpbu/MyThreadPool.Test/MyThreadPool.Test.csproj new file mode 100644 index 0000000..659333f --- /dev/null +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPool.Test.csproj @@ -0,0 +1,24 @@ + + + + net7.0 + enable + enable + + false + true + + + + + + + + + + + + + + + diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs new file mode 100644 index 0000000..2f0cbac --- /dev/null +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs @@ -0,0 +1,158 @@ +namespace MyThreadPool.Test; + +public class MyThreadPoolTests +{ + private class PoolTester + { + public int Counter { get; private set; } + + public int Fst() + { + Counter++; + var result = 0; + for (int i = 0; i < 1000; i++) + { + result += i; + } + + return result; + } + } + + private static int TestFunc() + { + var result = 0; + for (int i = 0; i < 1000; i++) + { + result += i; + } + + return result; + } + + private MyThreadPool? _pool; + private int _processThreadsAmount; + private int _threadsAmount; + + [SetUp] + public void Setup() + { + System.Diagnostics.Process.GetCurrentProcess().Refresh(); + this._processThreadsAmount = System.Diagnostics.Process.GetCurrentProcess().Threads.Count; + this._threadsAmount = 4; + this._pool = new MyThreadPool(this._threadsAmount); + } + + [Test] + public void SubmitResultTest() + { + Assert.That(_pool!.Submit(TestFunc).Result, Is.EqualTo(499500)); + } + + [Test] + public void SubmitContinueWithResultTest() + { + Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).Result, Is.EqualTo(499501)); + } + + [Test] + public void DoubleContinueWithResultTest() + { + Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).ContinueWith((int result) => result.ToString()).Result, Is.EqualTo("499501")); + } + + [Test] + public void SubmitResultContinueWithResultTest() + { + var task = _pool!.Submit(TestFunc); + + + Assert.That(task.ContinueWith((int result) => result + 1).Result, Is.EqualTo(499501)); + } + + [Test] + public void LazyResultTest() + { + var tester = new PoolTester(); + var task = _pool!.Submit(tester.Fst); + + var res = task.Result; + Assert.That(tester.Counter, Is.EqualTo(1)); + res = task.Result; + Assert.That(tester.Counter, Is.EqualTo(1)); + } + + [Test] + public void ExceptionResultTest() + { + var tester = new PoolTester(); + var task = _pool!.Submit(() => throw new NotImplementedException()); + + Assert.Throws(() => + { + var _ = task.Result; + }); + } + + [Test] + public void IsCompletedTest() + { + var startEvent = new ManualResetEvent(false); + var task = _pool!.Submit(() => + { + startEvent.WaitOne(); + return TestFunc(); + }); + + Assert.That(task.IsCompleted, Is.False); + startEvent.Set(); + var res = task.Result; + Assert.That(task.IsCompleted, Is.True); + res = task.Result; + Assert.That(task.IsCompleted, Is.True); + } + + [Test] + public void ShutdownTest() + { + var fstTask = _pool!.Submit(TestFunc); + var secTask = _pool!.Submit(TestFunc); + _pool!.Shutdown(); + System.Diagnostics.Process.GetCurrentProcess().Refresh(); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_processThreadsAmount)); + + Assert.Multiple(() => + { + Assert.That(fstTask.IsCompleted, Is.True); + Assert.That(secTask.IsCompleted, Is.True); + }); + + _pool!.Submit(TestFunc); + fstTask.ContinueWith((int result) => result + 1); + System.Diagnostics.Process.GetCurrentProcess().Refresh(); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_processThreadsAmount)); + } + + [Test] + public void DoubleContinueWithTest() + { + var fstTask = _pool!.Submit(TestFunc); + var fstContinueTask = fstTask.ContinueWith((int result) => result + 1); + var secContinueTask = fstTask.ContinueWith((int result) => result.ToString()); + + Assert.Multiple(() => + { + Assert.That(fstContinueTask.Result, Is.EqualTo(499501)); + Assert.That(secContinueTask.Result, Is.EqualTo("499500")); + }); + } + + [Test] + public void ThreadsCountTest() + { + System.Diagnostics.Process.GetCurrentProcess().Refresh(); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_processThreadsAmount + 4)); + } + + +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool.Test/Usings.cs b/C#/forSpbu/MyThreadPool.Test/Usings.cs new file mode 100644 index 0000000..cefced4 --- /dev/null +++ b/C#/forSpbu/MyThreadPool.Test/Usings.cs @@ -0,0 +1 @@ +global using NUnit.Framework; \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/IMyTask.cs b/C#/forSpbu/MyThreadPool/IMyTask.cs new file mode 100644 index 0000000..0dc54cc --- /dev/null +++ b/C#/forSpbu/MyThreadPool/IMyTask.cs @@ -0,0 +1,27 @@ +namespace MyThreadPool; + +/// +/// Interface for my thread pool tasks +/// +/// Task return value +public interface IMyTask +{ + /// + /// Whether task is completed + /// + public bool IsCompleted { get; } + + /// + /// Task result, blocks called thread + /// + /// task result + public TResult Result { get; } + + /// + /// Continues task with given function, current task result is used as a parameter for the new one + /// + /// New task delegate + /// New task return type + /// Returns created task + public IMyTask ContinueWith(Func nextDelegate); +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadCreationException.cs b/C#/forSpbu/MyThreadPool/MyThreadCreationException.cs new file mode 100644 index 0000000..5ba042e --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyThreadCreationException.cs @@ -0,0 +1,8 @@ +namespace MyThreadPool; + +public class MyThreadCreationException : Exception +{ + public MyThreadCreationException() + { + } +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs new file mode 100644 index 0000000..7c91e33 --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -0,0 +1,174 @@ +using System.Collections.Concurrent; + +namespace MyThreadPool; + +/// +/// Thread pool for concurrent delegate calculation +/// +public class MyThreadPool : IDisposable +{ + private readonly ConcurrentQueue _taskActions = new (); + private readonly Thread[] _threads; + private readonly Semaphore _taskBlocker = new (0, int.MaxValue); + private readonly CancellationTokenSource _cancellation = new (); + + /// + /// Creates my thread pool with given number of threads + /// + /// Thread count + /// If size is invalid + public MyThreadPool(int size) + { + if (size <= 0) + { + throw new MyThreadCreationException(); + } + + this._threads = new Thread[size]; + var threadStart = new ManualResetEvent(false); + for (int i = 0; i < size; i++) + { + this._threads[i] = new Thread(() => + { + threadStart.WaitOne(); + while (true) + { + this._taskBlocker.WaitOne(); + if (this._taskActions.TryDequeue(out var taskAction)) + { + taskAction.Invoke(); + } + + if (this._cancellation.IsCancellationRequested) + { + break; + } + } + }); + this._threads[i].Start(); + } + + threadStart.Set(); + } + + /// + /// Add new func to a pool queue + /// + /// Func to add + /// Func return type + /// I my task instance of one added to pool + public IMyTask Submit(Func func) + { + var task = new MyTask(func, this); + lock (this._cancellation) + { + if (!this._cancellation.IsCancellationRequested) + { + this._taskActions.Enqueue(task.Execute); + this._taskBlocker.Release(); + } + } + + return task; + } + + /// + /// Softly shuts down the pool, blocks called thread until all running tasks finished + /// + public void Shutdown() + { + lock (this._cancellation) + { + this._cancellation.Cancel(); + } + + for (var i = 0; i < this._threads.Length; i++) + { + this._taskBlocker.Release(); + } + + foreach (var thread in this._threads) + { + thread.Join(); + } + } + + /// + /// Shuts down the pool + /// + public void Dispose() => this.Shutdown(); + + private class MyTask : IMyTask + { + private volatile Func? _func; + private readonly MyThreadPool _threadPool; + + private TResult? _result; + private volatile Exception _exception = new AggregateException(); + private volatile bool _threwException; + private readonly ManualResetEvent _completeEvent = new (false); + private readonly ConcurrentBag _nextTasks = new (); + + public bool IsCompleted { get; private set; } + + public MyTask(Func func, MyThreadPool threadPool) + { + this._func = func; + this._threadPool = threadPool; + } + + public TResult Result => + this._completeEvent.WaitOne() && this._threwException ? throw this._exception : this._result!; + + + public void Execute() + { + try + { + var result = this._func!(); + this._threwException = false; + this._result = result; + } + catch (Exception e) + { + this._threwException = true; + this._exception = new AggregateException(e); + } + finally + { + this._func = null; + this.IsCompleted = true; + this._completeEvent.Set(); + } + lock(this._nextTasks) + { + foreach (var task in this._nextTasks) + { + this._threadPool._taskActions.Enqueue(task); + } + } + } + public IMyTask ContinueWith(Func nextDelegate) + { + lock (this._nextTasks) + { + if (this.IsCompleted) + { + return this._threadPool.Submit(() => nextDelegate(this._result!)); + Console.WriteLine("Added after Completed"); + } + + var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); + lock (this._threadPool._cancellation) + { + if (!this._threadPool._cancellation.IsCancellationRequested) + { + this._nextTasks.Add(nextTask.Execute); + } + } + + return nextTask; + } + } + } +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.csproj b/C#/forSpbu/MyThreadPool/MyThreadPool.csproj new file mode 100644 index 0000000..6836c68 --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.csproj @@ -0,0 +1,9 @@ + + + + net7.0 + enable + enable + + + diff --git a/C#/forSpbu/forSpbu.sln b/C#/forSpbu/forSpbu.sln index 527f400..b71ffd6 100644 --- a/C#/forSpbu/forSpbu.sln +++ b/C#/forSpbu/forSpbu.sln @@ -10,6 +10,14 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "03.03", "03.03", "{882A9B9C EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "10.03", "10.03", "{EA6FC7D9-BDFB-49CD-AC00-FC5DDC5274B0}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2023", "2023", "{0AB32DDD-DD46-4C2C-BDFE-3B5D0D8DC1F3}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "21.09", "21.09", "{1EE24177-570E-465D-87C1-E0030FD69104}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{176DFEB7-A517-46EA-B632-2C3BAF67D25F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool.Test", "MyThreadPool.Test\MyThreadPool.Test.csproj", "{7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -21,12 +29,23 @@ Global {E007586F-9760-4744-BB25-EDEFD6BA860C}.Release|Any CPU.ActiveCfg = Release|Any CPU {E007586F-9760-4744-BB25-EDEFD6BA860C}.Release|Any CPU.Build.0 = Release|Any CPU {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.Build.0 = Release|Any CPU + {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.Build.0 = Release|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Release|Any CPU.Build.0 = Release|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution - {E007586F-9760-4744-BB25-EDEFD6BA860C} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} + {E007586F-9760-4744-BB25-EDEFD6BA860C} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} + {1EE24177-570E-465D-87C1-E0030FD69104} = {0AB32DDD-DD46-4C2C-BDFE-3B5D0D8DC1F3} + {176DFEB7-A517-46EA-B632-2C3BAF67D25F} = {1EE24177-570E-465D-87C1-E0030FD69104} + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A} = {1EE24177-570E-465D-87C1-E0030FD69104} EndGlobalSection EndGlobal