diff --git a/HW3/HW3.sln b/HW3/HW3.sln new file mode 100644 index 0000000..d63f488 --- /dev/null +++ b/HW3/HW3.sln @@ -0,0 +1,22 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{FA31B85A-23F0-4566-8D57-08FA33575C42}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool.Test", "MyThreadPool.Test\MyThreadPool.Test.csproj", "{0655F283-5458-481F-83B3-F5091360DA57}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {FA31B85A-23F0-4566-8D57-08FA33575C42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FA31B85A-23F0-4566-8D57-08FA33575C42}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FA31B85A-23F0-4566-8D57-08FA33575C42}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FA31B85A-23F0-4566-8D57-08FA33575C42}.Release|Any CPU.Build.0 = Release|Any CPU + {0655F283-5458-481F-83B3-F5091360DA57}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0655F283-5458-481F-83B3-F5091360DA57}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0655F283-5458-481F-83B3-F5091360DA57}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0655F283-5458-481F-83B3-F5091360DA57}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/HW3/MyThreadPool.Test/MyThreadPool.Test.csproj b/HW3/MyThreadPool.Test/MyThreadPool.Test.csproj new file mode 100644 index 0000000..5af5968 --- /dev/null +++ b/HW3/MyThreadPool.Test/MyThreadPool.Test.csproj @@ -0,0 +1,39 @@ + + + + net9.0 + enable + enable + true + false + true + + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + diff --git a/HW3/MyThreadPool.Test/MyThreadPoolTest.cs b/HW3/MyThreadPool.Test/MyThreadPoolTest.cs new file mode 100644 index 0000000..1fa7b82 --- /dev/null +++ b/HW3/MyThreadPool.Test/MyThreadPoolTest.cs @@ -0,0 +1,131 @@ +// +// Copyright (c) khusainovilas. All rights reserved. +// + +namespace MyThreadPool.Test; + +using System.Diagnostics; + +/// +/// Unit tests for MyThreadPool. +/// +public class MyThreadPoolTest +{ + /// + /// Tests that submitting a simple task returns the correct result and completes. + /// + [Test] + public void MyThreadPool_Submit_ReturnsCorrectResult() + { + var pool = new MyThreadPool(1); + var task = pool.Submit(() => 42); + Assert.Multiple(() => + { + Assert.That(task.Result, Is.EqualTo(42)); + Assert.That(task.IsCompleted, Is.True); + }); + } + + /// + /// Tests that a continuation task executes correctly with the transformed result. + /// + [Test] + public void IMyTask_ContinueWith_ExecutesCorrectly() + { + var pool = new MyThreadPool(1); + var task = pool.Submit(() => 2 * 2); + var continuation = task.ContinueWith(x => x.ToString()); + Assert.Multiple(() => + { + Assert.That(continuation.Result, Is.EqualTo("4")); + Assert.That(continuation.IsCompleted, Is.True); + }); + } + + /// + /// Tests that an exception in a task is wrapped in AggregateException and the task completes. + /// + [Test] + public void IMyTask_Result_ThrowsAggregateExceptionOnError() + { + var pool = new MyThreadPool(1); + var task = pool.Submit(() => throw new InvalidOperationException("Test error")); + var exception = Assert.Throws(() => _ = task.Result); + Assert.That(exception.InnerException, Is.TypeOf()); + Assert.Multiple(() => + { + Assert.That(exception.InnerException.Message, Is.EqualTo("Test error")); + Assert.That(task.IsCompleted, Is.True); + }); + } + + /// + /// Tests that multiple continuations for a single task execute correctly with different transformations. + /// + [Test] + public void IMyTask_ContinueWith_SupportsMultipleContinuations() + { + var pool = new MyThreadPool(1); + var task = pool.Submit(() => 5); + var cont1 = task.ContinueWith(x => x + 1); + var cont2 = task.ContinueWith(x => x * 2); + Assert.Multiple(() => + { + Assert.That(cont1.Result, Is.EqualTo(6)); + Assert.That(cont2.Result, Is.EqualTo(10)); + Assert.That(cont1.IsCompleted, Is.True); + Assert.That(cont2.IsCompleted, Is.True); + }); + } + + /// + /// Tests that shutdown prevents new tasks and allows existing tasks to complete. + /// + [Test] + public void MyThreadPool_Shutdown_PreventsNewTasksAndCompletesExisting() + { + var pool = new MyThreadPool(1); + var task = pool.Submit(() => + { + Thread.Sleep(100); + return 1; + }); + pool.Shutdown(); + Assert.Throws(() => pool.Submit(() => 2)); + Assert.Multiple(() => + { + Assert.That(task.Result, Is.EqualTo(1)); + Assert.That(task.IsCompleted, Is.True); + }); + } + + /// + /// Tests that the thread pool uses at least n threads by checking parallel execution time. + /// + [Test] + public void MyThreadPool_ThreadCount_UsesAtLeastNThreads() + { + const int n = 4; + var pool = new MyThreadPool(n); + var tasks = new List>(); + var stopwatch = Stopwatch.StartNew(); + for (var i = 0; i < n; i++) + { + tasks.Add(pool.Submit(() => + { + Thread.Sleep(1000); + return 1; + })); + } + + foreach (var task in tasks) + { + Assert.That(task.Result, Is.EqualTo(1)); + } + + stopwatch.Stop(); + + // The work of 4 threads will take approximately 1 second. + Assert.That(stopwatch.Elapsed.TotalSeconds, Is.LessThan(1.5)); + } +} \ No newline at end of file diff --git a/HW3/MyThreadPool.Test/stylecop.json b/HW3/MyThreadPool.Test/stylecop.json new file mode 100644 index 0000000..a97d030 --- /dev/null +++ b/HW3/MyThreadPool.Test/stylecop.json @@ -0,0 +1,10 @@ +{ + "$schema": "https://raw.githubusercontent.com/DotNetAnalyzers/StyleCopAnalyzers/master/StyleCop.Analyzers/StyleCop.Analyzers/Settings/stylecop.schema.json", + "settings": { + "documentationRules": { + "companyName": "khusainovilas", + "copyrightText": "Copyright (c) {companyName}. All rights reserved.", + "xmlHeader": true + } + } +} \ No newline at end of file diff --git a/HW3/MyThreadPool/IMyTask.cs b/HW3/MyThreadPool/IMyTask.cs new file mode 100644 index 0000000..2b67fe5 --- /dev/null +++ b/HW3/MyThreadPool/IMyTask.cs @@ -0,0 +1,34 @@ +// +// Copyright (c) khusainovilas. All rights reserved. +// + +namespace MyThreadPool; + +/// +/// Defines a contract for a task in the custom thread pool. +/// +/// The type of the result produced by the task. +public interface IMyTask +{ + /// + /// Gets a value indicating whether returns true if the task is completed (successfully or with an error). + /// + bool IsCompleted { get; } + + /// + /// Gets the result of the task. + /// If the task is not completed, it blocks the calling thread until it is completed. + /// If the task completes with an exception, it throws an AggregateException with an internal exception. + /// + TResult Result { get; } + + /// + /// Creates a new task that runs after the current one is completed, + /// using its result as an argument for continuation. + /// A new task is placed in the pool and does not block the calling thread. + /// \ + /// The type of the result produced by the continuation task. + /// A function that takes the result of the current task and produces a new result. + /// A new task representing the continuation. + IMyTask ContinueWith(Func continuation); +} \ No newline at end of file diff --git a/HW3/MyThreadPool/MyTask.cs b/HW3/MyThreadPool/MyTask.cs new file mode 100644 index 0000000..577fbcf --- /dev/null +++ b/HW3/MyThreadPool/MyTask.cs @@ -0,0 +1,123 @@ +// +// Copyright (c) khusainovilas. All rights reserved. +// + +namespace MyThreadPool; + +/// +/// Represents a task that executes a function and produces a result>. +/// +/// The type of the result produced by the task. +internal class MyTask : IMyTask +{ + private readonly object @lock = new(); + private readonly ManualResetEvent waitHandle = new(false); + private readonly MyThreadPool pool; + private readonly Func func; + private TResult result = default!; + private Exception exception = null!; + private volatile bool isCompleted; + private List? continuations; + + /// + /// Initializes a new instance of the class. + /// + /// The thread pool to queue continuations. + /// The function to execute to produce the result. + public MyTask(MyThreadPool pool, Func func) + { + this.pool = pool ?? throw new ArgumentNullException(nameof(pool)); + this.func = func ?? throw new ArgumentNullException(nameof(func)); + this.continuations = []; + } + + /// + public bool IsCompleted => this.isCompleted; + + /// + public TResult Result + { + get + { + if (!this.isCompleted) + { + this.waitHandle.WaitOne(); + } + + lock (this.@lock) + { + if (this.exception != null) + { + throw new AggregateException("Task failed with an exception.", this.exception); + } + + return this.result; + } + } + } + + /// + public IMyTask ContinueWith(Func continuation) + { + ArgumentNullException.ThrowIfNull(continuation); + + var newTask = new MyTask(this.pool, () => continuation(this.Result)); + lock (this.@lock) + { + if (this.isCompleted) + { + this.pool.QueueTask(newTask.GetExecuteAction()); + } + else + { + if (this.continuations == null) + { + throw new InvalidOperationException("Cannot add continuation to a completed task with cleared continuations."); + } + + this.continuations.Add(newTask.GetExecuteAction()); + } + } + + return newTask; + } + + /// + /// Returns an Action that executes the task, stores the result or exception, and queues continuations. + /// + /// An Action to be executed by the thread pool. + public Action GetExecuteAction() + { + return () => + { + try + { + this.result = this.func(); + } + catch (Exception ex) + { + lock (this.@lock) + { + this.exception = ex; + } + } + finally + { + lock (this.@lock) + { + this.isCompleted = true; + this.waitHandle.Set(); + if (this.continuations != null) + { + foreach (var continuation in this.continuations) + { + this.pool.QueueTask(continuation); + } + } + + this.continuations = null; + } + } + }; + } +} \ No newline at end of file diff --git a/HW3/MyThreadPool/MyThreadPool.cs b/HW3/MyThreadPool/MyThreadPool.cs new file mode 100644 index 0000000..790b734 --- /dev/null +++ b/HW3/MyThreadPool/MyThreadPool.cs @@ -0,0 +1,160 @@ +// +// Copyright (c) khusainovilas. All rights reserved. +// + +namespace MyThreadPool; + +/// +/// Main thread pool class. +/// +public class MyThreadPool +{ + private readonly ManualResetEvent shutdownEvent; + private readonly Thread[] threads; + private readonly Queue taskQueue; + private readonly object queueLock; + private int threadCount; + private volatile bool isShutdown; + + /// + /// Initializes a new instance of the class. + /// + /// The number of threads in the pool. Must be positive. + public MyThreadPool(int threadCount) + { + if (threadCount <= 0) + { + throw new ArgumentOutOfRangeException(nameof(threadCount), "Thread count must be positive."); + } + + this.threadCount = threadCount; + this.threads = new Thread[threadCount]; + this.taskQueue = new Queue(); + this.queueLock = new object(); + this.isShutdown = false; + this.shutdownEvent = new ManualResetEvent(false); + + for (var i = 0; i < threadCount; i++) + { + this.threads[i] = new Thread(this.ThreadProc) + { + IsBackground = true, + Name = $"ThreadPoolThread-{i}", + }; + this.threads[i].Start(); + } + } + + /// + /// Submits a task to the thread pool for execution. + /// + /// The type of the result produced by the task. + /// The function to execute to produce the result. + /// An representing the task. + public IMyTask Submit(Func func) + { + if (this.isShutdown) + { + throw new InvalidOperationException("Cannot submit tasks to a shutdown thread pool."); + } + + var task = new MyTask(this, func); + this.QueueTask(task.GetExecuteAction() ?? throw new InvalidOperationException()); + return task; + } + + /// + /// Initiates a collaborative shutdown, preventing new tasks from being queued and allowing existing tasks to complete. + /// Blocks until all threads have finished. + /// + public void Shutdown() + { + lock (this.queueLock) + { + if (this.isShutdown) + { + return; + } + + this.isShutdown = true; + Monitor.PulseAll(this.queueLock); + } + + this.shutdownEvent.WaitOne(); + } + + /// + /// Disposes the thread pool, ensuring a proper shutdown. + /// + public void Dispose() + { + if (!this.isShutdown) + { + this.Shutdown(); + } + + this.shutdownEvent.Dispose(); + } + + /// + /// Queues a task for execution by the thread pool. + /// + /// The action to execute. + /// Thrown when the thread pool is shutting down. + internal void QueueTask(Action action) + { + lock (this.queueLock) + { + if (this.isShutdown) + { + throw new InvalidOperationException("Cannot queue tasks to a shutdown thread pool."); + } + + this.taskQueue.Enqueue(action); + + Monitor.PulseAll(this.queueLock); + } + } + + /// + /// Worker thread procedure that processes tasks from the queue. + /// + private void ThreadProc() + { + while (true) + { + Action task = null!; + lock (this.queueLock) + { + while (this.taskQueue.Count == 0 && !this.isShutdown) + { + Monitor.Wait(this.queueLock); + } + + if (this.isShutdown && this.taskQueue.Count == 0) + { + break; + } + + if (this.taskQueue.Count > 0) + { + task = this.taskQueue.Dequeue(); + } + } + + try + { + task(); + } + catch + { + // ignored + } + } + + if (Interlocked.Decrement(ref this.threadCount) == 0) + { + this.shutdownEvent.Set(); + } + } +} \ No newline at end of file diff --git a/HW3/MyThreadPool/MyThreadPool.csproj b/HW3/MyThreadPool/MyThreadPool.csproj new file mode 100644 index 0000000..6b0dd10 --- /dev/null +++ b/HW3/MyThreadPool/MyThreadPool.csproj @@ -0,0 +1,23 @@ + + + + Library + net9.0 + enable + enable + MyThreadPool + true + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + diff --git a/HW3/MyThreadPool/stylecop.json b/HW3/MyThreadPool/stylecop.json new file mode 100644 index 0000000..a97d030 --- /dev/null +++ b/HW3/MyThreadPool/stylecop.json @@ -0,0 +1,10 @@ +{ + "$schema": "https://raw.githubusercontent.com/DotNetAnalyzers/StyleCopAnalyzers/master/StyleCop.Analyzers/StyleCop.Analyzers/Settings/stylecop.schema.json", + "settings": { + "documentationRules": { + "companyName": "khusainovilas", + "copyrightText": "Copyright (c) {companyName}. All rights reserved.", + "xmlHeader": true + } + } +} \ No newline at end of file