diff --git a/ThreadPool/ThreadPool.sln b/ThreadPool/ThreadPool.sln new file mode 100644 index 0000000..9bef84d --- /dev/null +++ b/ThreadPool/ThreadPool.sln @@ -0,0 +1,22 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ThreadPool", "ThreadPool\ThreadPool.csproj", "{E0D93E78-090C-4C9B-B545-DBB1534BC37D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ThreadPoolTest", "ThreadPoolTest\ThreadPoolTest.csproj", "{AE21AEB8-EC3E-4D26-A154-E8C11BA4F93E}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {E0D93E78-090C-4C9B-B545-DBB1534BC37D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E0D93E78-090C-4C9B-B545-DBB1534BC37D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E0D93E78-090C-4C9B-B545-DBB1534BC37D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E0D93E78-090C-4C9B-B545-DBB1534BC37D}.Release|Any CPU.Build.0 = Release|Any CPU + {AE21AEB8-EC3E-4D26-A154-E8C11BA4F93E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AE21AEB8-EC3E-4D26-A154-E8C11BA4F93E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AE21AEB8-EC3E-4D26-A154-E8C11BA4F93E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AE21AEB8-EC3E-4D26-A154-E8C11BA4F93E}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/ThreadPool/ThreadPool/IMyTask.cs b/ThreadPool/ThreadPool/IMyTask.cs new file mode 100644 index 0000000..6ee16d4 --- /dev/null +++ b/ThreadPool/ThreadPool/IMyTask.cs @@ -0,0 +1,25 @@ +namespace ThreadPool; + +using System; + +/// +/// interface for object from MyThreadPool +/// +public interface IMyTask +{ + /// + /// shows whether the task is completed or not + /// + public bool IsCompleted { get; set; } + + /// + /// returns the result of the task + /// + public TResult Result { get; } + + /// + /// continues calculating + /// + public IMyTask ContinueWith(Func func); +} + diff --git a/ThreadPool/ThreadPool/MyThreadPool.cs b/ThreadPool/ThreadPool/MyThreadPool.cs new file mode 100644 index 0000000..f22dac4 --- /dev/null +++ b/ThreadPool/ThreadPool/MyThreadPool.cs @@ -0,0 +1,208 @@ +namespace ThreadPool; + +using System.Collections.Generic; +using System.Threading; +using System.Collections.Concurrent; +using System; + +/// +/// pool of threads that can be used to execute tasks +/// +public class MyThreadPool +{ + private readonly Thread[] _threads; + private readonly CancellationTokenSource _token = new(); + private readonly ConcurrentQueue _tasksQueue = new(); + private readonly AutoResetEvent _waiterNewTask = new AutoResetEvent(false); + private readonly AutoResetEvent _waiterTaskDone = new AutoResetEvent(false); + private volatile int _freeThreadsCount = 0; + private readonly object _lockObject = new(); + + /// + /// constructor + /// + public MyThreadPool(int countThreads) + { + if (countThreads < 1) + { + throw new ArgumentOutOfRangeException(nameof(countThreads)); + } + + _threads = new Thread[countThreads]; + for (int i = 0; i < countThreads; i++) + { + _threads[i] = new Thread(() => + { + while (!_token.IsCancellationRequested || !_tasksQueue.IsEmpty) + { + if (_tasksQueue.TryDequeue(out Action action)) + { + action(); + } + else + { + _waiterNewTask.WaitOne(); + if (!_tasksQueue.IsEmpty) + { + _waiterNewTask.Set(); + } + } + } + + Interlocked.Increment(ref _freeThreadsCount); + _waiterTaskDone.Set(); + }); + + _threads[i].Start(); + } + } + + /// + /// add task + /// + public IMyTask AddTask(Func function) + { + ReadyToAddNewTask(); + lock (_lockObject) + { + ReadyToAddNewTask(); + var task = new MyTask(function, this); + AddInTaskQueue(task.Count); + return task; + } + } + + private void ReadyToAddNewTask() + { + if (_token.IsCancellationRequested) + { + throw new InvalidOperationException(); + } + } + + private void AddInTaskQueue(Action task) + { + _tasksQueue.Enqueue(task); + _waiterNewTask.Set(); + } + + /// + /// close threads + /// + public void Shutdown() + { + lock (_lockObject) + { + _token.Cancel(); + } + + while (_freeThreadsCount != _threads.Length) + { + _waiterNewTask.Set(); + _waiterTaskDone.WaitOne(); + } + } + + /// + /// class for parallel tasks + /// + private class MyTask : IMyTask + { + private Func _func; + private TResult _result; + private Exception _resultException = null; + private readonly object _locker = new(); + private readonly ManualResetEvent _waiterManual = new(false); + private Queue _continueWithTasksQueue = new(); + private readonly MyThreadPool _myThreadPool; + + /// + /// shows whether the task is completed or not + /// + public bool IsCompleted { get; set; } + + /// + /// returns the result of the task + /// + public TResult Result + { + get + { + _waiterManual.WaitOne(); + if (_resultException != null) + { + throw new AggregateException(_resultException); + } + + return _result; + } + } + + /// + /// constructor + /// + public MyTask(Func function, MyThreadPool myThreadPool) + { + ArgumentNullException.ThrowIfNull(function); + _func = function; + _myThreadPool = myThreadPool; + } + + /// + /// counting result of task + /// + public void Count() + { + try + { + _result = _func(); + } + catch (Exception funcException) + { + _resultException = funcException; + } + + lock (_locker) + { + _func = null; + IsCompleted = true; + _waiterManual.Set(); + } + + lock (_locker) + { + while (_continueWithTasksQueue.Count > 0) + { + var action = _continueWithTasksQueue.Dequeue(); + lock (_myThreadPool._lockObject) + { + _myThreadPool.AddInTaskQueue(action); + } + } + } + } + + /// + /// continues calculating + /// + public IMyTask ContinueWith(Func func) + { + lock (_myThreadPool._lockObject) + { + _myThreadPool.ReadyToAddNewTask(); + var task = new MyTask(() => func(Result), _myThreadPool); + _continueWithTasksQueue.Enqueue(task.Count); + if (IsCompleted) + { + while (_continueWithTasksQueue.Count > 0) + { + var action = _continueWithTasksQueue.Dequeue(); + _myThreadPool._tasksQueue.Enqueue(action); + _myThreadPool._waiterNewTask.Set(); + } + } + return task; + } + } + } +} \ No newline at end of file diff --git a/ThreadPool/ThreadPool/ThreadPool.csproj b/ThreadPool/ThreadPool/ThreadPool.csproj new file mode 100644 index 0000000..6a038c2 --- /dev/null +++ b/ThreadPool/ThreadPool/ThreadPool.csproj @@ -0,0 +1,10 @@ + + + + net6.0 + Windows + 10 + disable + + + diff --git a/ThreadPool/ThreadPoolTest/ThreadPoolTest.cs b/ThreadPool/ThreadPoolTest/ThreadPoolTest.cs new file mode 100644 index 0000000..33f916e --- /dev/null +++ b/ThreadPool/ThreadPoolTest/ThreadPoolTest.cs @@ -0,0 +1,207 @@ +namespace ThreadPoolTest; + +using System; +using System.Threading; +using NUnit.Framework; +using ThreadPool; + +public class Tests +{ + private MyThreadPool _threadPool; + private readonly int _numberOfThreads = 4; + + [SetUp] + public void Setup() + => _threadPool = new(Environment.ProcessorCount); + + [TearDown] + public void TearDown() + => _threadPool.Shutdown(); + + [Test] + public void NullFunctionShouldThrowException() + { + Assert.Throws(() => _threadPool.AddTask(null)); + } + + [Test] + public void ResultShouldThrowException() + { + var task = _threadPool.AddTask(() => throw new ArgumentOutOfRangeException()); + Assert.Throws(() => Result(task)); + } + + [Test] + public void ParallelAddTaskTest() + { + var tasks = new IMyTask[30]; + var functions = new Func[30]; + for (int i = 0; i < 30; i++) + { + var index = i; + functions[i] = () => + { + var result = 0; + for (int j = 0; j < 100; j++) + { + result++; + } + return result + index; + }; + } + + for (int i = 0; i < 30; i++) + { + tasks[i] = _threadPool.AddTask(functions[i]); + } + for (int i = 0; i < 30; i++) + { + Assert.AreEqual(100 + i, tasks[i].Result); + } + } + + [Test] + public void ShutdownWhileTaskCountTest() + { + var tasks = new IMyTask[30]; + var functions = new Func[30]; + for (int i = 0; i < 30; i++) + { + var i1 = i; + functions[i] = () => + { + var result = 0; + for (int j = 0; j < 10; j++) + { + result += 10; + } + return result + i1; + }; + } + + for (int i = 0; i < 30; i++) + { + tasks[i] = _threadPool.AddTask(functions[i]); + } + _threadPool.Shutdown(); + for (int i = 0; i < 30; i++) + { + Assert.AreEqual(100 + i, tasks[i].Result); + } + } + + private object Result(IMyTask task) + => task.Result; + + [Test] + public void ExceptionInThreadPoolConstructorTest() + { + Assert.Throws(() => new MyThreadPool(0)); + } + + [Test] + public void ExceptionAfterShutdownTest() + { + _threadPool.Shutdown(); + Assert.Throws(() => _threadPool.AddTask(() => 5)); + } + + [Test] + public void GetTaskAfterShutdown() + { + var task1 = _threadPool.AddTask(() => 3 * _numberOfThreads); + var task2 = _threadPool.AddTask(() => _numberOfThreads * _numberOfThreads); + Assert.AreEqual(12, task1.Result); + Assert.AreEqual(16, task2.Result); + } + + [Test] + public void TaskResultTest() + { + int number1 = 2; + int number2 = 2; + var task1 = _threadPool.AddTask(() => number1 + 2); + var task2 = _threadPool.AddTask(() => number2 * 5); + Assert.AreEqual(4, task1.Result); + Assert.AreEqual(10, task2.Result); + } + + [Test] + public void ContinueWithTest() + { + var task1 = _threadPool.AddTask(() => 3 * _numberOfThreads); //12 + var task2 = task1.ContinueWith(x => x + _numberOfThreads); //12 + 4 + var task3 = task1.ContinueWith(x => x + task2.Result); // 12 + 16 + Assert.AreEqual(12, task1.Result); + Assert.AreEqual(16, task2.Result); + Assert.AreEqual(28, task3.Result); + } + + [Test] + public void TestWithSeveralThreads() + { + var answerFunc = 10000; + var countOfTasks = 15; + var tasks = new IMyTask[countOfTasks]; + var functions = new Func[countOfTasks]; + _threadPool = new MyThreadPool(5); + for (int i = 0; i < countOfTasks; i++) + { + var index = i; + functions[i] = new Func(() => + { + var result = 0; + for (int j = 0; j < answerFunc; j++) + { + result++; + } + + return result + index; + }); + } + + for (int i = 0; i < countOfTasks; i++) + { + tasks[i] = _threadPool.AddTask(functions[i]); + } + var threads = new Thread[10]; + for (int i = 0; i < 10; i++) + { + threads[i] = new Thread(() => + { + var threadTasks = new IMyTask[countOfTasks]; + var threadFunctions = new Func[countOfTasks]; + for (int j = 0; j < countOfTasks; j++) + { + var index = j; + threadFunctions[j] = new Func(() => + { + var result = 0; + for (int z = 0; z < answerFunc; z++) + { + result++; + } + + return result + index; + }); + } + + for (int j = 0; j < countOfTasks; j++) + { + threadTasks[j] = _threadPool.AddTask(functions[j]); + } + + for (int j = 0; j < countOfTasks; j++) + { + Assert.AreEqual(answerFunc + j, threadTasks[j].Result); + } + }); + } + + for (int i = 0; i < 10; i++) + { + threads[i].Start(); + threads[i].Join(); + } + } +} \ No newline at end of file diff --git a/ThreadPool/ThreadPoolTest/ThreadPoolTest.csproj b/ThreadPool/ThreadPoolTest/ThreadPoolTest.csproj new file mode 100644 index 0000000..2ba879f --- /dev/null +++ b/ThreadPool/ThreadPoolTest/ThreadPoolTest.csproj @@ -0,0 +1,22 @@ + + + + net6.0 + + false + + 10 + + + + + + + + + + + + + + diff --git a/appveyor.yml b/appveyor.yml index 9809100..d79f319 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,3 +1,6 @@ +image: Visual Studio 2022 + + build_script: - For /R %%I in (*.sln) do dotnet test %%I test: of \ No newline at end of file