From e3c56656089c963deaf85be9843ee6ec1f49b99a Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Mon, 9 Oct 2023 01:11:48 +0300 Subject: [PATCH 01/13] Init --- C#/forSpbu/MyThreadPool/IMyTask.cs | 10 ++++++ C#/forSpbu/MyThreadPool/MyTask.cs | 9 ++++++ C#/forSpbu/MyThreadPool/MyThread.cs | 35 +++++++++++++++++++++ C#/forSpbu/MyThreadPool/MyThreadPool.cs | 28 +++++++++++++++++ C#/forSpbu/MyThreadPool/MyThreadPool.csproj | 9 ++++++ C#/forSpbu/forSpbu.sln | 20 +++++++++--- 6 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 C#/forSpbu/MyThreadPool/IMyTask.cs create mode 100644 C#/forSpbu/MyThreadPool/MyTask.cs create mode 100644 C#/forSpbu/MyThreadPool/MyThread.cs create mode 100644 C#/forSpbu/MyThreadPool/MyThreadPool.cs create mode 100644 C#/forSpbu/MyThreadPool/MyThreadPool.csproj diff --git a/C#/forSpbu/MyThreadPool/IMyTask.cs b/C#/forSpbu/MyThreadPool/IMyTask.cs new file mode 100644 index 0000000..88f38e6 --- /dev/null +++ b/C#/forSpbu/MyThreadPool/IMyTask.cs @@ -0,0 +1,10 @@ +namespace MyThreadPool; + +public interface IMyTask +{ + public bool IsCompleted { get; } + + public TResult Result(); + + public IMyTask ContinueWith(Func nextDelegate); +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyTask.cs b/C#/forSpbu/MyThreadPool/MyTask.cs new file mode 100644 index 0000000..48a6037 --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyTask.cs @@ -0,0 +1,9 @@ +namespace MyThreadPool; + +public class MyTask : IMyTask +{ + public TResult Result() + { + + } +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThread.cs b/C#/forSpbu/MyThreadPool/MyThread.cs new file mode 100644 index 0000000..01ef8c2 --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyThread.cs @@ -0,0 +1,35 @@ +namespace MyThreadPool; + +public class MyThread +{ + public MyThread() + { + this._thread = new Thread(LifeCycle); + } + + private void LifeCycle() + { + while (!_isTerminated) + { + + } + } + + /// + /// Stops the process sharply + /// + public void Terminate() + { + this._isTerminated = true; + } + + public void Execute(Func task) + { + + } + + private volatile bool _isTerminated; + private volatile bool _isKilled; + + private Thread _thread; +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs new file mode 100644 index 0000000..663b2d8 --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -0,0 +1,28 @@ +namespace MyThreadPool; + +public class MyThreadPool +{ + public MyThreadPool(int size) + { + if (size <= 0) + { + throw new MyThreadCreationException(); + } + + this._threads = new Thread[size]; + this._semaphore = new Semaphore(0, size); + } + + public IMyTask Submit(Func func) + { + _semaphore.WaitOne(); + + + } + + public int Size => this._threads.Length; + + private Semaphore _semaphore; + private I[] _mutexes; + private Thread[] _threads; +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.csproj b/C#/forSpbu/MyThreadPool/MyThreadPool.csproj new file mode 100644 index 0000000..6836c68 --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.csproj @@ -0,0 +1,9 @@ + + + + net7.0 + enable + enable + + + diff --git a/C#/forSpbu/forSpbu.sln b/C#/forSpbu/forSpbu.sln index 527f400..b7b5f9c 100644 --- a/C#/forSpbu/forSpbu.sln +++ b/C#/forSpbu/forSpbu.sln @@ -10,6 +10,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "03.03", "03.03", "{882A9B9C EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "10.03", "10.03", "{EA6FC7D9-BDFB-49CD-AC00-FC5DDC5274B0}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2023", "2023", "{0AB32DDD-DD46-4C2C-BDFE-3B5D0D8DC1F3}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "21.09", "21.09", "{1EE24177-570E-465D-87C1-E0030FD69104}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{176DFEB7-A517-46EA-B632-2C3BAF67D25F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -21,12 +27,18 @@ Global {E007586F-9760-4744-BB25-EDEFD6BA860C}.Release|Any CPU.ActiveCfg = Release|Any CPU {E007586F-9760-4744-BB25-EDEFD6BA860C}.Release|Any CPU.Build.0 = Release|Any CPU {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.Build.0 = Release|Any CPU + {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E}.Release|Any CPU.Build.0 = Release|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution - {E007586F-9760-4744-BB25-EDEFD6BA860C} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} + {E007586F-9760-4744-BB25-EDEFD6BA860C} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} + {1EE24177-570E-465D-87C1-E0030FD69104} = {0AB32DDD-DD46-4C2C-BDFE-3B5D0D8DC1F3} + {176DFEB7-A517-46EA-B632-2C3BAF67D25F} = {1EE24177-570E-465D-87C1-E0030FD69104} EndGlobalSection EndGlobal From 8ff8947d3fd034258bf63721e106b716d1478f35 Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Mon, 9 Oct 2023 09:24:08 +0300 Subject: [PATCH 02/13] Init --- C#/forSpbu/MyThreadPool/MyTask.cs | 50 ++++++++++++++++ C#/forSpbu/MyThreadPool/MyThread.cs | 35 ------------ .../MyThreadPool/MyThreadCreationException.cs | 8 +++ C#/forSpbu/MyThreadPool/MyThreadPool.cs | 57 +++++++++++++++---- 4 files changed, 104 insertions(+), 46 deletions(-) delete mode 100644 C#/forSpbu/MyThreadPool/MyThread.cs create mode 100644 C#/forSpbu/MyThreadPool/MyThreadCreationException.cs diff --git a/C#/forSpbu/MyThreadPool/MyTask.cs b/C#/forSpbu/MyThreadPool/MyTask.cs index 48a6037..f7d6f07 100644 --- a/C#/forSpbu/MyThreadPool/MyTask.cs +++ b/C#/forSpbu/MyThreadPool/MyTask.cs @@ -4,6 +4,56 @@ public class MyTask : IMyTask { public TResult Result() { + //while (!this.IsCompleted) {} should be blocked with lock + + if (this._threwException) + { + throw this._exception; + } + + return this._result; + } + + public IMyTask ContinueWith(Func nextDelegate) + { + var nextTask = new MyTask(); + + void NextAction() + { + try + { + var nextResult = nextDelegate(this._result); + nextTask.FuncFinished(nextResult); + } + catch (Exception e) + { + nextTask.FuncFinished(e); + } + } + this.NextActions.Add(NextAction); + return nextTask; + } + + public void FuncFinished(TResult result) + { + if (this.IsCompleted) return; + this.IsCompleted = true; + this._threwException = false; + this._result = result; + } + + public void FuncFinished(Exception exception) + { + if (this.IsCompleted) return; + this.IsCompleted = true; + this._threwException = true; + this._exception = new AggregateException(this._exception); } + + private volatile Exception _exception = new AggregateException(); + private TResult? _result; + private volatile bool _threwException; + public bool IsCompleted { get; private set; } + public readonly List NextActions = new (); } \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThread.cs b/C#/forSpbu/MyThreadPool/MyThread.cs deleted file mode 100644 index 01ef8c2..0000000 --- a/C#/forSpbu/MyThreadPool/MyThread.cs +++ /dev/null @@ -1,35 +0,0 @@ -namespace MyThreadPool; - -public class MyThread -{ - public MyThread() - { - this._thread = new Thread(LifeCycle); - } - - private void LifeCycle() - { - while (!_isTerminated) - { - - } - } - - /// - /// Stops the process sharply - /// - public void Terminate() - { - this._isTerminated = true; - } - - public void Execute(Func task) - { - - } - - private volatile bool _isTerminated; - private volatile bool _isKilled; - - private Thread _thread; -} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadCreationException.cs b/C#/forSpbu/MyThreadPool/MyThreadCreationException.cs new file mode 100644 index 0000000..5ba042e --- /dev/null +++ b/C#/forSpbu/MyThreadPool/MyThreadCreationException.cs @@ -0,0 +1,8 @@ +namespace MyThreadPool; + +public class MyThreadCreationException : Exception +{ + public MyThreadCreationException() + { + } +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 663b2d8..f92cc7d 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -1,4 +1,6 @@ -namespace MyThreadPool; +using System.Collections.Concurrent; + +namespace MyThreadPool; public class MyThreadPool { @@ -9,20 +11,53 @@ public MyThreadPool(int size) throw new MyThreadCreationException(); } - this._threads = new Thread[size]; - this._semaphore = new Semaphore(0, size); + this._threads = new Thread[size]; + for (int i = 0; i < size; i++) + { + this._threads[i] = new Thread(() => + { + while (true) + { + this._functions.TryDequeue(out var func); + func?.Invoke(); + } + }); + } } public IMyTask Submit(Func func) { - _semaphore.WaitOne(); - - + var finishEvent = new ManualResetEvent(false); + var task = new MyTask(); + this._functions.Enqueue((() => + { + lock(task) + { + try + { + var result = func(); + task.FuncFinished(result); + } + catch (Exception e) + { + task.FuncFinished(e); + } + finally + { + finishEvent.Set(); + } + } + }, finishEvent)); + return task; } - - public int Size => this._threads.Length; - private Semaphore _semaphore; - private I[] _mutexes; - private Thread[] _threads; + public void Shutdown() + { + _isTerminated = true; + } + + private readonly ConcurrentQueue<(Action action, ManualResetEvent finishEvent)> _functions = new (); + private readonly Thread[] _threads; + private Semaphore stop; + private bool _isTerminated; } \ No newline at end of file From c475034ecb30d89b7e47edc928d4a53f96b44c9c Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Sun, 29 Oct 2023 03:46:26 +0300 Subject: [PATCH 03/13] Fixed thread pool --- C#/forSpbu/MyThreadPool/MyTask.cs | 67 ++++++++++++++++--------- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 34 ++++++------- 2 files changed, 56 insertions(+), 45 deletions(-) diff --git a/C#/forSpbu/MyThreadPool/MyTask.cs b/C#/forSpbu/MyThreadPool/MyTask.cs index f7d6f07..6148dfd 100644 --- a/C#/forSpbu/MyThreadPool/MyTask.cs +++ b/C#/forSpbu/MyThreadPool/MyTask.cs @@ -2,58 +2,75 @@ public class MyTask : IMyTask { + public MyTask(Func func, MyThreadPool threadPool) + { + this._func = func; + this._threadPool = threadPool; + } + public TResult Result() { - //while (!this.IsCompleted) {} should be blocked with lock + this.Execute(); if (this._threwException) { throw this._exception; } - return this._result; + return this._result!; } - public IMyTask ContinueWith(Func nextDelegate) + public void Execute() { - var nextTask = new MyTask(); - - void NextAction() + if (this.IsCompleted) return; + lock (this._func!) { + if (this.IsCompleted) return; try { - var nextResult = nextDelegate(this._result); - nextTask.FuncFinished(nextResult); + var result = this._func(); + this._threwException = false; + this._result = result; } catch (Exception e) { - nextTask.FuncFinished(e); + this._threwException = true; + this._exception = new AggregateException(e); + } + finally + { + this.IsCompleted = true; + this._func = null; } } - this.NextActions.Add(NextAction); - - return nextTask; } + - public void FuncFinished(TResult result) + public IMyTask ContinueWith(Func nextDelegate) { - if (this.IsCompleted) return; - this.IsCompleted = true; - this._threwException = false; - this._result = result; + lock (this.NextTasks) + { + if (this.IsCompleted) + { + return _threadPool.Submit(() => nextDelegate(this._result!)); + } + + var nextTask = new MyTask(() => nextDelegate(this._result!), _threadPool); + this.NextTasks.Add(() => + { + nextTask.Execute(); + }); + + return nextTask; + } } - public void FuncFinished(Exception exception) - { - if (this.IsCompleted) return; - this.IsCompleted = true; - this._threwException = true; - this._exception = new AggregateException(this._exception); - } + private Func? _func; + private readonly MyThreadPool _threadPool; private volatile Exception _exception = new AggregateException(); private TResult? _result; private volatile bool _threwException; public bool IsCompleted { get; private set; } - public readonly List NextActions = new (); + public readonly List NextTasks = new (); } \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index f92cc7d..8ba760c 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -16,7 +16,7 @@ public MyThreadPool(int size) { this._threads[i] = new Thread(() => { - while (true) + while (!this._isTerminated) { this._functions.TryDequeue(out var func); func?.Invoke(); @@ -27,37 +27,31 @@ public MyThreadPool(int size) public IMyTask Submit(Func func) { - var finishEvent = new ManualResetEvent(false); - var task = new MyTask(); - this._functions.Enqueue((() => + var task = new MyTask(func, this); + this._functions.Enqueue(() => { - lock(task) + task.Execute(); + lock (task.NextTasks) { - try + foreach (var taskDelegate in task.NextTasks) { - var result = func(); - task.FuncFinished(result); - } - catch (Exception e) - { - task.FuncFinished(e); - } - finally - { - finishEvent.Set(); + this._functions.Enqueue(taskDelegate); } } - }, finishEvent)); + }); return task; } public void Shutdown() { _isTerminated = true; + foreach (var thread in _threads) + { + thread.Join(); + } } - private readonly ConcurrentQueue<(Action action, ManualResetEvent finishEvent)> _functions = new (); + private readonly ConcurrentQueue _functions = new (); private readonly Thread[] _threads; - private Semaphore stop; - private bool _isTerminated; + private volatile bool _isTerminated; } \ No newline at end of file From 7d67e90c1d323bbc3479829b417aaea35d2d39d5 Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Sun, 29 Oct 2023 03:58:46 +0300 Subject: [PATCH 04/13] Added comments --- C#/forSpbu/MyThreadPool/IMyTask.cs | 17 +++++++++++++++++ C#/forSpbu/MyThreadPool/MyTask.cs | 2 +- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 17 +++++++++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/C#/forSpbu/MyThreadPool/IMyTask.cs b/C#/forSpbu/MyThreadPool/IMyTask.cs index 88f38e6..c31ae6f 100644 --- a/C#/forSpbu/MyThreadPool/IMyTask.cs +++ b/C#/forSpbu/MyThreadPool/IMyTask.cs @@ -1,10 +1,27 @@ namespace MyThreadPool; +/// +/// Interface for my thread pool tasks +/// +/// Task return value public interface IMyTask { + /// + /// Whether task is completed + /// public bool IsCompleted { get; } + /// + /// Returns task result, blocks called thread + /// + /// task result public TResult Result(); + /// + /// Continues task with given function, current task result is used as a parameter for the new one + /// + /// New task delegate + /// New task return type + /// public IMyTask ContinueWith(Func nextDelegate); } \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyTask.cs b/C#/forSpbu/MyThreadPool/MyTask.cs index 6148dfd..c30961c 100644 --- a/C#/forSpbu/MyThreadPool/MyTask.cs +++ b/C#/forSpbu/MyThreadPool/MyTask.cs @@ -1,6 +1,6 @@ namespace MyThreadPool; -public class MyTask : IMyTask +internal class MyTask : IMyTask { public MyTask(Func func, MyThreadPool threadPool) { diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 8ba760c..a19bddf 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -2,8 +2,16 @@ namespace MyThreadPool; +/// +/// Thread pool for concurrent delegate calculation +/// public class MyThreadPool { + /// + /// Creates my thread pool with given number of threads + /// + /// Thread count + /// If size is invalid public MyThreadPool(int size) { if (size <= 0) @@ -25,6 +33,12 @@ public MyThreadPool(int size) } } + /// + /// Add new func to a pool queue + /// + /// Func to add + /// Func return type + /// I my task instance of one added to pool public IMyTask Submit(Func func) { var task = new MyTask(func, this); @@ -42,6 +56,9 @@ public IMyTask Submit(Func func) return task; } + /// + /// Softly shuts down a pool, blocks called thread until all running tasks finished + /// public void Shutdown() { _isTerminated = true; From 20a440c670497057e1da2608a1e6ab1a6212c8df Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Sun, 29 Oct 2023 16:19:43 +0300 Subject: [PATCH 05/13] Added tests --- .../MyThreadPool.Test.csproj | 24 +++ .../MyThreadPool.Test/MyThreadPoolTests.cs | 140 ++++++++++++++++++ C#/forSpbu/MyThreadPool.Test/Usings.cs | 1 + C#/forSpbu/MyThreadPool/MyTask.cs | 76 ---------- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 97 +++++++++++- C#/forSpbu/forSpbu.sln | 7 + 6 files changed, 263 insertions(+), 82 deletions(-) create mode 100644 C#/forSpbu/MyThreadPool.Test/MyThreadPool.Test.csproj create mode 100644 C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs create mode 100644 C#/forSpbu/MyThreadPool.Test/Usings.cs delete mode 100644 C#/forSpbu/MyThreadPool/MyTask.cs diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPool.Test.csproj b/C#/forSpbu/MyThreadPool.Test/MyThreadPool.Test.csproj new file mode 100644 index 0000000..659333f --- /dev/null +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPool.Test.csproj @@ -0,0 +1,24 @@ + + + + net7.0 + enable + enable + + false + true + + + + + + + + + + + + + + + diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs new file mode 100644 index 0000000..2f26510 --- /dev/null +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs @@ -0,0 +1,140 @@ +namespace MyThreadPool.Test; + +public class MyThreadPoolTests +{ + private class PoolTester + { + public int Fst() + { + Counter++; + var result = 0; + for (int i = 0; i < 1000; i++) + { + result += i; + } + + return result; + } + + public int Counter { get; private set; } + } + + private static int TestFunc() + { + var result = 0; + for (int i = 0; i < 1000; i++) + { + result += i; + } + + return result; + } + + private MyThreadPool? _pool; + private int _procAmount; + private int _threadsAmount; + + [SetUp] + public void Setup() + { + this._threadsAmount = 4; + this._procAmount = System.Diagnostics.Process.GetCurrentProcess().Threads.Count; + this._pool = new MyThreadPool(this._threadsAmount); + } + + [Test] + public void SubmitResultTest() + { + Assert.That(_pool!.Submit(TestFunc).Result(), Is.EqualTo(499500)); + } + + [Test] + public void SubmitContinueWithResultTest() + { + Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).Result(), Is.EqualTo(499501)); + } + + [Test] + public void DoubleContinueWithResultTest() + { + Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).ContinueWith((int result) => result.ToString()).Result(), Is.EqualTo("499501")); + } + + [Test] + public void SubmitResultContinueWithResultTest() + { + var task = _pool!.Submit(TestFunc); + task.Result(); + + Assert.That(task.ContinueWith((int result) => result + 1).Result(), Is.EqualTo(499501)); + } + + [Test] + public void LazyResultTest() + { + var tester = new PoolTester(); + var task = _pool!.Submit(tester.Fst); + + task.Result(); + Assert.That(tester.Counter, Is.EqualTo(1)); + task.Result(); + Assert.That(tester.Counter, Is.EqualTo(1)); + } + + [Test] + public void IsCompletedTest() + { + var startEvent = new ManualResetEvent(false); + var task = _pool!.Submit(() => + { + startEvent.WaitOne(); + return TestFunc(); + }); + + Assert.That(task.IsCompleted, Is.False); + startEvent.Set(); + task.Result(); + Assert.That(task.IsCompleted, Is.True); + task.Result(); + Assert.That(task.IsCompleted, Is.True); + } + + [Test] + public void ShutdownTest() + { + var fstTask = _pool!.Submit(TestFunc); + var secTask = _pool!.Submit(TestFunc); + _pool!.Shutdown(); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount)); + + Assert.Multiple(() => + { + Assert.That(fstTask.IsCompleted, Is.True); + Assert.That(secTask.IsCompleted, Is.True); + }); + + _pool!.Submit(TestFunc); + fstTask.ContinueWith((int result) => result + 1); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount)); + } + + [Test] + public void DoubleContinueWithTest() + { + var fstTask = _pool!.Submit(TestFunc); + var fstContinueTask = fstTask.ContinueWith((int result) => result + 1); + var secContinueTask = fstTask.ContinueWith((int result) => result.ToString()); + + Assert.Multiple(() => + { + Assert.That(fstContinueTask.Result(), Is.EqualTo(499501)); + Assert.That(secContinueTask.Result(), Is.EqualTo("499500")); + }); + } + + [Test] + public void ThreadsCountTest() + { + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount + 4)); + } +} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool.Test/Usings.cs b/C#/forSpbu/MyThreadPool.Test/Usings.cs new file mode 100644 index 0000000..cefced4 --- /dev/null +++ b/C#/forSpbu/MyThreadPool.Test/Usings.cs @@ -0,0 +1 @@ +global using NUnit.Framework; \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyTask.cs b/C#/forSpbu/MyThreadPool/MyTask.cs deleted file mode 100644 index c30961c..0000000 --- a/C#/forSpbu/MyThreadPool/MyTask.cs +++ /dev/null @@ -1,76 +0,0 @@ -namespace MyThreadPool; - -internal class MyTask : IMyTask -{ - public MyTask(Func func, MyThreadPool threadPool) - { - this._func = func; - this._threadPool = threadPool; - } - - public TResult Result() - { - this.Execute(); - - if (this._threwException) - { - throw this._exception; - } - - return this._result!; - } - - public void Execute() - { - if (this.IsCompleted) return; - lock (this._func!) - { - if (this.IsCompleted) return; - try - { - var result = this._func(); - this._threwException = false; - this._result = result; - } - catch (Exception e) - { - this._threwException = true; - this._exception = new AggregateException(e); - } - finally - { - this.IsCompleted = true; - this._func = null; - } - } - } - - - public IMyTask ContinueWith(Func nextDelegate) - { - lock (this.NextTasks) - { - if (this.IsCompleted) - { - return _threadPool.Submit(() => nextDelegate(this._result!)); - } - - var nextTask = new MyTask(() => nextDelegate(this._result!), _threadPool); - this.NextTasks.Add(() => - { - nextTask.Execute(); - }); - - return nextTask; - } - } - - private Func? _func; - private readonly MyThreadPool _threadPool; - - private volatile Exception _exception = new AggregateException(); - private TResult? _result; - private volatile bool _threwException; - public bool IsCompleted { get; private set; } - public readonly List NextTasks = new (); -} \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index a19bddf..ecf3eaa 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -5,8 +5,84 @@ namespace MyThreadPool; /// /// Thread pool for concurrent delegate calculation /// -public class MyThreadPool +public class MyThreadPool : IDisposable { + private class MyTask : IMyTask + { + public MyTask(Func func, MyThreadPool threadPool) + { + this._func = func; + this._threadPool = threadPool; + } + + public TResult Result() + { + while (!Volatile.Read(ref this._isCompleted)) + { + } + + if (this._threwException) + { + throw this._exception; + } + + return this._result!; + } + + public void Execute() + { + if (Volatile.Read(ref this._isCompleted)) return; + lock (this._func!) + { + if (Volatile.Read(ref this._isCompleted)) return; + try + { + var result = this._func(); + this._threwException = false; + this._result = result; + } + catch (Exception e) + { + this._threwException = true; + this._exception = new AggregateException(e); + } + finally + { + Volatile.Write(ref this._isCompleted, true); + this._func = null; + } + } + } + + + public IMyTask ContinueWith(Func nextDelegate) + { + lock (Volatile.Read(ref this.NextTasks)) + { + if (Volatile.Read(ref this._isCompleted)) + { + return this._threadPool.Submit(() => nextDelegate(this._result!)); + } + + var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); + Volatile.Read(ref this.NextTasks).Add(nextTask.Execute); + + return nextTask; + } + } + + private volatile Func? _func; + private readonly MyThreadPool _threadPool; + + private volatile Exception _exception = new AggregateException(); + private TResult? _result; + private volatile bool _threwException; + private bool _isCompleted; + + public bool IsCompleted => this._isCompleted; + public ConcurrentBag NextTasks = new (); + } + /// /// Creates my thread pool with given number of threads /// @@ -27,9 +103,13 @@ public MyThreadPool(int size) while (!this._isTerminated) { this._functions.TryDequeue(out var func); - func?.Invoke(); + if (func != null) + { + func.Invoke(); + } } }); + this._threads[i].Start(); } } @@ -45,9 +125,9 @@ public IMyTask Submit(Func func) this._functions.Enqueue(() => { task.Execute(); - lock (task.NextTasks) + lock (Volatile.Read(ref task.NextTasks)) { - foreach (var taskDelegate in task.NextTasks) + foreach (var taskDelegate in Volatile.Read(ref task.NextTasks)) { this._functions.Enqueue(taskDelegate); } @@ -57,7 +137,7 @@ public IMyTask Submit(Func func) } /// - /// Softly shuts down a pool, blocks called thread until all running tasks finished + /// Softly shuts down the pool, blocks called thread until all running tasks finished /// public void Shutdown() { @@ -67,7 +147,12 @@ public void Shutdown() thread.Join(); } } - + + /// + /// Shuts down the pool + /// + public void Dispose() => this.Shutdown(); + private readonly ConcurrentQueue _functions = new (); private readonly Thread[] _threads; private volatile bool _isTerminated; diff --git a/C#/forSpbu/forSpbu.sln b/C#/forSpbu/forSpbu.sln index b7b5f9c..b71ffd6 100644 --- a/C#/forSpbu/forSpbu.sln +++ b/C#/forSpbu/forSpbu.sln @@ -16,6 +16,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "21.09", "21.09", "{1EE24177 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{176DFEB7-A517-46EA-B632-2C3BAF67D25F}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool.Test", "MyThreadPool.Test\MyThreadPool.Test.csproj", "{7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -34,11 +36,16 @@ Global {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Debug|Any CPU.Build.0 = Debug|Any CPU {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Release|Any CPU.ActiveCfg = Release|Any CPU {176DFEB7-A517-46EA-B632-2C3BAF67D25F}.Release|Any CPU.Build.0 = Release|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {E007586F-9760-4744-BB25-EDEFD6BA860C} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} {A4F6ADD5-85FD-4F67-8B29-549DDDF6F82E} = {D3FCB669-E93F-4F0B-B9C5-6592CE93AC7F} {1EE24177-570E-465D-87C1-E0030FD69104} = {0AB32DDD-DD46-4C2C-BDFE-3B5D0D8DC1F3} {176DFEB7-A517-46EA-B632-2C3BAF67D25F} = {1EE24177-570E-465D-87C1-E0030FD69104} + {7CDF4F68-A43F-4A5B-8A43-2E464CA14B5A} = {1EE24177-570E-465D-87C1-E0030FD69104} EndGlobalSection EndGlobal From cc1c9db741a9b47aa82344a6c8956f796c09d4a1 Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Sun, 29 Oct 2023 16:26:51 +0300 Subject: [PATCH 06/13] Added exception test --- C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs index 2f26510..4102b57 100644 --- a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs @@ -81,6 +81,15 @@ public void LazyResultTest() Assert.That(tester.Counter, Is.EqualTo(1)); } + [Test] + public void ExceptionResultTest() + { + var tester = new PoolTester(); + var task = _pool!.Submit(() => throw new NotImplementedException()); + + Assert.Throws(() => task.Result()); + } + [Test] public void IsCompletedTest() { From 4386c9d66e75953da3385a0e7994baabab4423f2 Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Sun, 29 Oct 2023 16:30:20 +0300 Subject: [PATCH 07/13] Removed unused volatile --- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index ecf3eaa..8a9bbaa 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -57,7 +57,7 @@ public void Execute() public IMyTask ContinueWith(Func nextDelegate) { - lock (Volatile.Read(ref this.NextTasks)) + lock (this.NextTasks) { if (Volatile.Read(ref this._isCompleted)) { @@ -65,7 +65,7 @@ public IMyTask ContinueWith(Func ne } var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); - Volatile.Read(ref this.NextTasks).Add(nextTask.Execute); + this.NextTasks.Add(nextTask.Execute); return nextTask; } @@ -80,7 +80,7 @@ public IMyTask ContinueWith(Func ne private bool _isCompleted; public bool IsCompleted => this._isCompleted; - public ConcurrentBag NextTasks = new (); + public readonly ConcurrentBag NextTasks = new (); } /// @@ -125,9 +125,9 @@ public IMyTask Submit(Func func) this._functions.Enqueue(() => { task.Execute(); - lock (Volatile.Read(ref task.NextTasks)) + lock (task.NextTasks) { - foreach (var taskDelegate in Volatile.Read(ref task.NextTasks)) + foreach (var taskDelegate in task.NextTasks) { this._functions.Enqueue(taskDelegate); } From cc28d0c4ebd8f1c343c171aeaac865a0bdfcce90 Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Sun, 29 Oct 2023 16:59:56 +0300 Subject: [PATCH 08/13] Fixed --- .../MyThreadPool.Test/MyThreadPoolTests.cs | 4 + C#/forSpbu/MyThreadPool/MyThreadPool.cs | 156 +++++++++--------- 2 files changed, 82 insertions(+), 78 deletions(-) diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs index 4102b57..c84fe89 100644 --- a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs @@ -38,6 +38,7 @@ private static int TestFunc() public void Setup() { this._threadsAmount = 4; + System.Diagnostics.Process.GetCurrentProcess().Refresh(); this._procAmount = System.Diagnostics.Process.GetCurrentProcess().Threads.Count; this._pool = new MyThreadPool(this._threadsAmount); } @@ -114,6 +115,7 @@ public void ShutdownTest() var fstTask = _pool!.Submit(TestFunc); var secTask = _pool!.Submit(TestFunc); _pool!.Shutdown(); + System.Diagnostics.Process.GetCurrentProcess().Refresh(); Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount)); Assert.Multiple(() => @@ -124,6 +126,7 @@ public void ShutdownTest() _pool!.Submit(TestFunc); fstTask.ContinueWith((int result) => result + 1); + System.Diagnostics.Process.GetCurrentProcess().Refresh(); Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount)); } @@ -144,6 +147,7 @@ public void DoubleContinueWithTest() [Test] public void ThreadsCountTest() { + System.Diagnostics.Process.GetCurrentProcess().Refresh(); Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount + 4)); } } \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 8a9bbaa..1139903 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -7,81 +7,9 @@ namespace MyThreadPool; /// public class MyThreadPool : IDisposable { - private class MyTask : IMyTask - { - public MyTask(Func func, MyThreadPool threadPool) - { - this._func = func; - this._threadPool = threadPool; - } - - public TResult Result() - { - while (!Volatile.Read(ref this._isCompleted)) - { - } - - if (this._threwException) - { - throw this._exception; - } - - return this._result!; - } - - public void Execute() - { - if (Volatile.Read(ref this._isCompleted)) return; - lock (this._func!) - { - if (Volatile.Read(ref this._isCompleted)) return; - try - { - var result = this._func(); - this._threwException = false; - this._result = result; - } - catch (Exception e) - { - this._threwException = true; - this._exception = new AggregateException(e); - } - finally - { - Volatile.Write(ref this._isCompleted, true); - this._func = null; - } - } - } - - - public IMyTask ContinueWith(Func nextDelegate) - { - lock (this.NextTasks) - { - if (Volatile.Read(ref this._isCompleted)) - { - return this._threadPool.Submit(() => nextDelegate(this._result!)); - } - - var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); - this.NextTasks.Add(nextTask.Execute); - - return nextTask; - } - } - - private volatile Func? _func; - private readonly MyThreadPool _threadPool; - - private volatile Exception _exception = new AggregateException(); - private TResult? _result; - private volatile bool _threwException; - private bool _isCompleted; - - public bool IsCompleted => this._isCompleted; - public readonly ConcurrentBag NextTasks = new (); - } + private readonly ConcurrentQueue _functions = new (); + private readonly Thread[] _threads; + private volatile bool _isTerminated; /// /// Creates my thread pool with given number of threads @@ -152,8 +80,80 @@ public void Shutdown() /// Shuts down the pool /// public void Dispose() => this.Shutdown(); + + private class MyTask : IMyTask + { + private volatile Func? _func; + private readonly MyThreadPool _threadPool; + + private volatile Exception _exception = new AggregateException(); + private TResult? _result; + private volatile bool _threwException; + private bool _isCompleted; - private readonly ConcurrentQueue _functions = new (); - private readonly Thread[] _threads; - private volatile bool _isTerminated; + public bool IsCompleted => this._isCompleted; + public readonly ConcurrentBag NextTasks = new (); + + public MyTask(Func func, MyThreadPool threadPool) + { + this._func = func; + this._threadPool = threadPool; + } + + public TResult Result() + { + while (!Volatile.Read(ref this._isCompleted)) + { + } + + if (this._threwException) + { + throw this._exception; + } + + return this._result!; + } + + public void Execute() + { + if (Volatile.Read(ref this._isCompleted)) return; + lock (this._func!) + { + if (Volatile.Read(ref this._isCompleted)) return; + try + { + var result = this._func(); + this._threwException = false; + this._result = result; + } + catch (Exception e) + { + this._threwException = true; + this._exception = new AggregateException(e); + } + finally + { + Volatile.Write(ref this._isCompleted, true); + this._func = null; + } + } + } + + + public IMyTask ContinueWith(Func nextDelegate) + { + lock (this.NextTasks) + { + if (Volatile.Read(ref this._isCompleted)) + { + return this._threadPool.Submit(() => nextDelegate(this._result!)); + } + + var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); + this.NextTasks.Add(nextTask.Execute); + + return nextTask; + } + } + } } \ No newline at end of file From c6186238ab800d8a1b1159ec16991d5c0c3e0079 Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Sun, 29 Oct 2023 17:05:26 +0300 Subject: [PATCH 09/13] Fixed --- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 1139903..8a8e517 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -53,9 +53,9 @@ public IMyTask Submit(Func func) this._functions.Enqueue(() => { task.Execute(); - lock (task.NextTasks) + lock (Volatile.Read(ref task.NextTasks)) { - foreach (var taskDelegate in task.NextTasks) + foreach (var taskDelegate in Volatile.Read(ref task.NextTasks)) { this._functions.Enqueue(taskDelegate); } @@ -92,7 +92,7 @@ private class MyTask : IMyTask private bool _isCompleted; public bool IsCompleted => this._isCompleted; - public readonly ConcurrentBag NextTasks = new (); + public ConcurrentBag NextTasks = new (); public MyTask(Func func, MyThreadPool threadPool) { @@ -142,7 +142,7 @@ public void Execute() public IMyTask ContinueWith(Func nextDelegate) { - lock (this.NextTasks) + lock (Volatile.Read(ref this.NextTasks)) { if (Volatile.Read(ref this._isCompleted)) { @@ -150,7 +150,7 @@ public IMyTask ContinueWith(Func ne } var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); - this.NextTasks.Add(nextTask.Execute); + Volatile.Read(ref this.NextTasks).Add(nextTask.Execute); return nextTask; } From 17a8d17230cc6f74715faf109ab980547d18e56d Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Fri, 29 Dec 2023 17:56:45 +0300 Subject: [PATCH 10/13] Changed --- .../MyThreadPool.Test/MyThreadPoolTests.cs | 18 +-- C#/forSpbu/MyThreadPool/IMyTask.cs | 2 +- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 120 +++++++++++------- 3 files changed, 82 insertions(+), 58 deletions(-) diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs index c84fe89..865b76c 100644 --- a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs @@ -4,6 +4,8 @@ public class MyThreadPoolTests { private class PoolTester { + public int Counter { get; private set; } + public int Fst() { Counter++; @@ -15,8 +17,6 @@ public int Fst() return result; } - - public int Counter { get; private set; } } private static int TestFunc() @@ -31,15 +31,15 @@ private static int TestFunc() } private MyThreadPool? _pool; - private int _procAmount; + private int _processThreadsAmount; private int _threadsAmount; [SetUp] public void Setup() { - this._threadsAmount = 4; System.Diagnostics.Process.GetCurrentProcess().Refresh(); - this._procAmount = System.Diagnostics.Process.GetCurrentProcess().Threads.Count; + this._processThreadsAmount = System.Diagnostics.Process.GetCurrentProcess().Threads.Count; + this._threadsAmount = 4; this._pool = new MyThreadPool(this._threadsAmount); } @@ -116,7 +116,7 @@ public void ShutdownTest() var secTask = _pool!.Submit(TestFunc); _pool!.Shutdown(); System.Diagnostics.Process.GetCurrentProcess().Refresh(); - Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount)); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_processThreadsAmount)); Assert.Multiple(() => { @@ -127,7 +127,7 @@ public void ShutdownTest() _pool!.Submit(TestFunc); fstTask.ContinueWith((int result) => result + 1); System.Diagnostics.Process.GetCurrentProcess().Refresh(); - Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount)); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_processThreadsAmount)); } [Test] @@ -148,6 +148,8 @@ public void DoubleContinueWithTest() public void ThreadsCountTest() { System.Diagnostics.Process.GetCurrentProcess().Refresh(); - Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_procAmount + 4)); + Assert.That(System.Diagnostics.Process.GetCurrentProcess().Threads, Has.Count.EqualTo(_processThreadsAmount + 4)); } + + } \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/IMyTask.cs b/C#/forSpbu/MyThreadPool/IMyTask.cs index c31ae6f..ca12e4e 100644 --- a/C#/forSpbu/MyThreadPool/IMyTask.cs +++ b/C#/forSpbu/MyThreadPool/IMyTask.cs @@ -22,6 +22,6 @@ public interface IMyTask /// /// New task delegate /// New task return type - /// + /// Returns created task public IMyTask ContinueWith(Func nextDelegate); } \ No newline at end of file diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 8a8e517..497c879 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -7,9 +7,10 @@ namespace MyThreadPool; /// public class MyThreadPool : IDisposable { - private readonly ConcurrentQueue _functions = new (); + private readonly ConcurrentQueue _taskActions = new (); private readonly Thread[] _threads; - private volatile bool _isTerminated; + private readonly Semaphore _taskBlocker = new (0, int.MaxValue); + private readonly CancellationTokenSource _cancellation = new (); /// /// Creates my thread pool with given number of threads @@ -24,21 +25,29 @@ public MyThreadPool(int size) } this._threads = new Thread[size]; + var threadStart = new ManualResetEvent(false); for (int i = 0; i < size; i++) { this._threads[i] = new Thread(() => { - while (!this._isTerminated) + threadStart.WaitOne(); + while (true) { - this._functions.TryDequeue(out var func); - if (func != null) + this._taskBlocker.WaitOne(); + if (this._taskActions.TryDequeue(out var taskAction)) { - func.Invoke(); + taskAction.Invoke(); + } + + if (this._cancellation.IsCancellationRequested) + { + break; } } }); - this._threads[i].Start(); } + + threadStart.Set(); } /// @@ -50,17 +59,15 @@ public MyThreadPool(int size) public IMyTask Submit(Func func) { var task = new MyTask(func, this); - this._functions.Enqueue(() => + lock (this._cancellation) { - task.Execute(); - lock (Volatile.Read(ref task.NextTasks)) + if (this._cancellation.IsCancellationRequested) { - foreach (var taskDelegate in Volatile.Read(ref task.NextTasks)) - { - this._functions.Enqueue(taskDelegate); - } + this._taskActions.Enqueue(task.Execute); + this._taskBlocker.Release(); } - }); + } + return task; } @@ -69,8 +76,17 @@ public IMyTask Submit(Func func) /// public void Shutdown() { - _isTerminated = true; - foreach (var thread in _threads) + lock (this._cancellation) + { + this._cancellation.Cancel(); + } + + for (var i = 0; i < this._threads.Length; i++) + { + this._taskBlocker.Release(); + } + + foreach (var thread in this._threads) { thread.Join(); } @@ -85,14 +101,14 @@ private class MyTask : IMyTask { private volatile Func? _func; private readonly MyThreadPool _threadPool; - - private volatile Exception _exception = new AggregateException(); + private TResult? _result; + private volatile Exception _exception = new AggregateException(); private volatile bool _threwException; - private bool _isCompleted; + private readonly ManualResetEvent _completeEvent = new (false); + private readonly ConcurrentBag _nextTasks = new (); - public bool IsCompleted => this._isCompleted; - public ConcurrentBag NextTasks = new (); + public bool IsCompleted { get; private set; } public MyTask(Func func, MyThreadPool threadPool) { @@ -102,9 +118,7 @@ public MyTask(Func func, MyThreadPool threadPool) public TResult Result() { - while (!Volatile.Read(ref this._isCompleted)) - { - } + this._completeEvent.WaitOne(); if (this._threwException) { @@ -116,42 +130,50 @@ public TResult Result() public void Execute() { - if (Volatile.Read(ref this._isCompleted)) return; - lock (this._func!) + try { - if (Volatile.Read(ref this._isCompleted)) return; - try - { - var result = this._func(); - this._threwException = false; - this._result = result; - } - catch (Exception e) - { - this._threwException = true; - this._exception = new AggregateException(e); - } - finally + var result = this._func!(); + this._threwException = false; + this._result = result; + } + catch (Exception e) + { + this._threwException = true; + this._exception = new AggregateException(e); + } + finally + { + this._func = null; + this.IsCompleted = true; + this._completeEvent.Set(); + } + + lock(this._nextTasks) + { + foreach (var task in this._nextTasks) { - Volatile.Write(ref this._isCompleted, true); - this._func = null; - } + this._threadPool._taskActions.Enqueue(task); + } } } - - public IMyTask ContinueWith(Func nextDelegate) { - lock (Volatile.Read(ref this.NextTasks)) + lock (this._nextTasks) { - if (Volatile.Read(ref this._isCompleted)) + if (this.IsCompleted) { return this._threadPool.Submit(() => nextDelegate(this._result!)); } var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); - Volatile.Read(ref this.NextTasks).Add(nextTask.Execute); - + lock (this._threadPool._cancellation) + { + if (!this._threadPool._cancellation.IsCancellationRequested) + { + this._nextTasks.Add(nextTask.Execute); + } + } + return nextTask; } } From a3f6468b551d70953b1951dae81e2543962ff600 Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Fri, 29 Dec 2023 18:26:26 +0300 Subject: [PATCH 11/13] Changed --- .../MyThreadPool.Test/MyThreadPoolTests.cs | 27 ++++++++++--------- C#/forSpbu/MyThreadPool/IMyTask.cs | 4 +-- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 12 ++------- 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs index 865b76c..2f0cbac 100644 --- a/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs +++ b/C#/forSpbu/MyThreadPool.Test/MyThreadPoolTests.cs @@ -46,28 +46,28 @@ public void Setup() [Test] public void SubmitResultTest() { - Assert.That(_pool!.Submit(TestFunc).Result(), Is.EqualTo(499500)); + Assert.That(_pool!.Submit(TestFunc).Result, Is.EqualTo(499500)); } [Test] public void SubmitContinueWithResultTest() { - Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).Result(), Is.EqualTo(499501)); + Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).Result, Is.EqualTo(499501)); } [Test] public void DoubleContinueWithResultTest() { - Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).ContinueWith((int result) => result.ToString()).Result(), Is.EqualTo("499501")); + Assert.That(_pool!.Submit(TestFunc).ContinueWith((int result) => result + 1).ContinueWith((int result) => result.ToString()).Result, Is.EqualTo("499501")); } [Test] public void SubmitResultContinueWithResultTest() { var task = _pool!.Submit(TestFunc); - task.Result(); - Assert.That(task.ContinueWith((int result) => result + 1).Result(), Is.EqualTo(499501)); + + Assert.That(task.ContinueWith((int result) => result + 1).Result, Is.EqualTo(499501)); } [Test] @@ -76,9 +76,9 @@ public void LazyResultTest() var tester = new PoolTester(); var task = _pool!.Submit(tester.Fst); - task.Result(); + var res = task.Result; Assert.That(tester.Counter, Is.EqualTo(1)); - task.Result(); + res = task.Result; Assert.That(tester.Counter, Is.EqualTo(1)); } @@ -88,7 +88,10 @@ public void ExceptionResultTest() var tester = new PoolTester(); var task = _pool!.Submit(() => throw new NotImplementedException()); - Assert.Throws(() => task.Result()); + Assert.Throws(() => + { + var _ = task.Result; + }); } [Test] @@ -103,9 +106,9 @@ public void IsCompletedTest() Assert.That(task.IsCompleted, Is.False); startEvent.Set(); - task.Result(); + var res = task.Result; Assert.That(task.IsCompleted, Is.True); - task.Result(); + res = task.Result; Assert.That(task.IsCompleted, Is.True); } @@ -139,8 +142,8 @@ public void DoubleContinueWithTest() Assert.Multiple(() => { - Assert.That(fstContinueTask.Result(), Is.EqualTo(499501)); - Assert.That(secContinueTask.Result(), Is.EqualTo("499500")); + Assert.That(fstContinueTask.Result, Is.EqualTo(499501)); + Assert.That(secContinueTask.Result, Is.EqualTo("499500")); }); } diff --git a/C#/forSpbu/MyThreadPool/IMyTask.cs b/C#/forSpbu/MyThreadPool/IMyTask.cs index ca12e4e..0dc54cc 100644 --- a/C#/forSpbu/MyThreadPool/IMyTask.cs +++ b/C#/forSpbu/MyThreadPool/IMyTask.cs @@ -12,10 +12,10 @@ public interface IMyTask public bool IsCompleted { get; } /// - /// Returns task result, blocks called thread + /// Task result, blocks called thread /// /// task result - public TResult Result(); + public TResult Result { get; } /// /// Continues task with given function, current task result is used as a parameter for the new one diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 497c879..402898d 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -116,17 +116,9 @@ public MyTask(Func func, MyThreadPool threadPool) this._threadPool = threadPool; } - public TResult Result() - { - this._completeEvent.WaitOne(); + public TResult Result => + this._completeEvent.WaitOne() && this._threwException ? throw this._exception : this._result!; - if (this._threwException) - { - throw this._exception; - } - - return this._result!; - } public void Execute() { From 430fe48afdb503d2dc72b1f6332dff6264bd3dcd Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Fri, 29 Dec 2023 19:18:47 +0300 Subject: [PATCH 12/13] Fixed --- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 402898d..34759dc 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -45,6 +45,7 @@ public MyThreadPool(int size) } } }); + this._threads[i].Start(); } threadStart.Set(); @@ -61,7 +62,7 @@ public IMyTask Submit(Func func) var task = new MyTask(func, this); lock (this._cancellation) { - if (this._cancellation.IsCancellationRequested) + if (!this._cancellation.IsCancellationRequested) { this._taskActions.Enqueue(task.Execute); this._taskBlocker.Release(); @@ -139,12 +140,13 @@ public void Execute() this.IsCompleted = true; this._completeEvent.Set(); } - + Console.WriteLine("Completed"); lock(this._nextTasks) { foreach (var task in this._nextTasks) { this._threadPool._taskActions.Enqueue(task); + Console.WriteLine("Added in execute"); } } } @@ -155,6 +157,7 @@ public IMyTask ContinueWith(Func ne if (this.IsCompleted) { return this._threadPool.Submit(() => nextDelegate(this._result!)); + Console.WriteLine("Added after Completed"); } var nextTask = new MyTask(() => nextDelegate(this._result!), this._threadPool); @@ -163,6 +166,7 @@ public IMyTask ContinueWith(Func ne if (!this._threadPool._cancellation.IsCancellationRequested) { this._nextTasks.Add(nextTask.Execute); + Console.WriteLine("Queued"); } } From 23ce9072b30855b0f437b4e645d19992df78cb7a Mon Sep 17 00:00:00 2001 From: IgnatSergeev Date: Fri, 29 Dec 2023 19:42:03 +0300 Subject: [PATCH 13/13] Fixed compiler warning --- C#/forSpbu/MyThreadPool/MyThreadPool.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/C#/forSpbu/MyThreadPool/MyThreadPool.cs b/C#/forSpbu/MyThreadPool/MyThreadPool.cs index 34759dc..7c91e33 100644 --- a/C#/forSpbu/MyThreadPool/MyThreadPool.cs +++ b/C#/forSpbu/MyThreadPool/MyThreadPool.cs @@ -140,13 +140,11 @@ public void Execute() this.IsCompleted = true; this._completeEvent.Set(); } - Console.WriteLine("Completed"); lock(this._nextTasks) { foreach (var task in this._nextTasks) { this._threadPool._taskActions.Enqueue(task); - Console.WriteLine("Added in execute"); } } } @@ -166,7 +164,6 @@ public IMyTask ContinueWith(Func ne if (!this._threadPool._cancellation.IsCancellationRequested) { this._nextTasks.Add(nextTask.Execute); - Console.WriteLine("Queued"); } }