diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..775497a --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +# Common IntelliJ Platform excludes + +# User specific +**/.idea/**/workspace.xml +**/.idea/**/tasks.xml +**/.idea/shelf/* +**/.idea/dictionaries +**/.idea/httpRequests/ + +# Sensitive or high-churn files +**/.idea/**/dataSources/ +**/.idea/**/dataSources.ids +**/.idea/**/dataSources.xml +**/.idea/**/dataSources.local.xml +**/.idea/**/sqlDataSources.xml +**/.idea/**/dynamic.xml + +# Rider +# Rider auto-generates .iml files, and contentModel.xml +**/.idea/**/*.iml +**/.idea/**/contentModel.xml +**/.idea/**/modules.xml + +*.suo +*.user +.vs/ +[Bb]in/ +[Oo]bj/ +_UpgradeReport_Files/ +[Pp]ackages/ + +Thumbs.db +Desktop.ini +.DS_Store + +**/.idea/ diff --git a/3Homework12.10.22/MyThreadPool/MyThreadPool.sln b/3Homework12.10.22/MyThreadPool/MyThreadPool.sln new file mode 100644 index 0000000..dd05c0c --- /dev/null +++ b/3Homework12.10.22/MyThreadPool/MyThreadPool.sln @@ -0,0 +1,22 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{224B1961-604D-4EAB-ACCA-1D44E285FCDA}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPoolTests", "MyThreadPoolTests\MyThreadPoolTests.csproj", "{F948F284-F2E1-4BC4-BC8B-DA89906D74AF}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Release|Any CPU.Build.0 = Release|Any CPU + {F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/3Homework12.10.22/MyThreadPool/MyThreadPool/IMyTask.cs b/3Homework12.10.22/MyThreadPool/MyThreadPool/IMyTask.cs new file mode 100644 index 0000000..d0b4f13 --- /dev/null +++ b/3Homework12.10.22/MyThreadPool/MyThreadPool/IMyTask.cs @@ -0,0 +1,26 @@ +namespace MyThreadPool; + +/// +/// An operation performed on MyThreadPool that returns a value. +/// +/// Type of the returnable value. +public interface IMyTask +{ + /// + /// Gets a value indicating whether get true if the task is completed. + /// + bool IsCompleted { get; } + + /// + /// Gets the task result. + /// + TResult Result { get; } + + /// + /// Creates a new task based on this task. + /// + /// The function to perform. + /// Type of the new returnable value. + /// Task with new returnable value. + IMyTask ContinueWith(Func func); +} \ No newline at end of file diff --git a/3Homework12.10.22/MyThreadPool/MyThreadPool/MyThreadPool.cs b/3Homework12.10.22/MyThreadPool/MyThreadPool/MyThreadPool.cs new file mode 100644 index 0000000..98c21be --- /dev/null +++ b/3Homework12.10.22/MyThreadPool/MyThreadPool/MyThreadPool.cs @@ -0,0 +1,241 @@ +namespace MyThreadPool; + +using System.Collections.Concurrent; + +/// +/// The ThreadPool abstraction. +/// +public class MyThreadPool +{ + private BlockingCollection tasks; + private MyThread[] threads; + private CancellationTokenSource source = new(); + private bool isShutdown; + private AutoResetEvent tasksOver; + private Semaphore taskCount = new Semaphore(0, 10000); + + /// + /// Initializes a new instance of the class. + /// + /// Number of this ThreadPool threads. + public MyThreadPool(int threadCount = 10) + { + if (threadCount <= 0) + { + throw new InvalidDataException(); + } + + this.ThreadCount = threadCount; + this.tasks = new(); + this.threads = new MyThread[threadCount]; + this.tasksOver = new AutoResetEvent(false); + + for (int i = 0; i < threadCount; ++i) + { + this.threads[i] = new MyThread(this.tasks, this.source.Token, this.tasksOver, this.taskCount); + } + + this.isShutdown = false; + } + + /// + /// Gets number of existing threads. + /// + public int ThreadCount { get; } + + /// + /// Submits new task to the ThreadPool. + /// + /// A calculation to perform. + /// Value type. + /// Task. + public IMyTask Submit(Func func) + { + var task = new MyTask(func, this); + this.tasks.Add(() => task.Start()); + this.tasksOver.Reset(); + this.taskCount.Release(); + return task; + } + + /// + /// Completes the threads. + /// + public void Shutdown() + { + if (this.isShutdown) + { + return; + } + + this.tasks.CompleteAdding(); + var timeoutThread = new Thread(() => + { + Thread.Sleep(20000); + this.tasksOver.Set(); + }); + + timeoutThread.Start(); + this.tasksOver.WaitOne(); + + if (this.tasks.Count > 0) + { + throw new TimeoutException("Tasks from the queue cannot be executed."); + } + + this.source.Cancel(); + + for (int i = 0; i < this.ThreadCount; i++) + { + this.taskCount.Release(); + } + + var areJoined = true; + foreach (var thread in this.threads) + { + thread.Join(); + if (thread.IsWorking) + { + areJoined = false; + } + } + + this.isShutdown = true; + if (!areJoined) + { + throw new TimeoutException("Not all tasks were accomplished."); + } + } + + private class MyThread + { + private Thread thread; + private BlockingCollection collection; + private int timeout = 5000; + private AutoResetEvent tasksOver; + private Semaphore tasksCount; + + public MyThread(BlockingCollection collection, CancellationToken token, AutoResetEvent tasksOver, Semaphore tasksCount) + { + this.tasksOver = tasksOver; + this.collection = collection; + this.tasksCount = tasksCount; + this.thread = new Thread(() => this.Start(token)); + this.IsWorking = false; + this.thread.Start(); + } + + public bool IsWorking { get; private set; } + + public void Join() + { + if (this.thread.IsAlive) + { + this.thread.Join(this.timeout); + } + } + + private void Start(CancellationToken token) + { + while (!token.IsCancellationRequested) + { + if (this.collection.TryTake(out var action)) + { + this.IsWorking = true; + action(); + this.IsWorking = false; + } + else + { + this.tasksOver.Set(); + this.tasksCount.WaitOne(); + } + } + } + } + + private class MyTask : IMyTask + { + private MyThreadPool pool; + private Func? func; + private ManualResetEvent resetEvent = new(false); + private T? result; + private Exception? returnedException; + private BlockingCollection deferredTasks; + + /// + /// Initializes a new instance of the class. + /// + /// Function that will performed. + /// Pool for the submit. + public MyTask(Func func, MyThreadPool threadPool) + { + this.func = func; + this.pool = threadPool; + this.IsCompleted = false; + this.deferredTasks = new BlockingCollection(); + } + + /// + public bool IsCompleted { get; private set; } + + /// + public T Result + { + get + { + this.resetEvent.WaitOne(); + if (this.returnedException != null) + { + throw new AggregateException(this.returnedException); + } + + return this.result!; + } + } + + public void Start() + { + try + { + this.result = this.func!(); + } + catch (Exception exception) + { + this.returnedException = exception; + } + finally + { + this.func = null; + this.IsCompleted = true; + this.resetEvent.Set(); + this.deferredTasks.CompleteAdding(); + foreach (var task in this.deferredTasks) + { + this.pool.Submit(() => task); + } + } + } + + /// + public IMyTask ContinueWith(Func func1) + { + if (this.result != null) + { + return this.pool.Submit(() => func1(this.Result)); + } + + MyTask newTask = new MyTask(() => func1(this.Result), this.pool); + try + { + this.deferredTasks.Add(() => newTask.Start()); + } + catch (InvalidOperationException) + { + return this.pool.Submit(() => func1(this.Result)); + } + + return newTask; + } + } +} diff --git a/3Homework12.10.22/MyThreadPool/MyThreadPool/MyThreadPool.csproj b/3Homework12.10.22/MyThreadPool/MyThreadPool/MyThreadPool.csproj new file mode 100644 index 0000000..eb2460e --- /dev/null +++ b/3Homework12.10.22/MyThreadPool/MyThreadPool/MyThreadPool.csproj @@ -0,0 +1,9 @@ + + + + net6.0 + enable + enable + + + diff --git a/3Homework12.10.22/MyThreadPool/MyThreadPoolTests/MyThreadPoolTests.cs b/3Homework12.10.22/MyThreadPool/MyThreadPoolTests/MyThreadPoolTests.cs new file mode 100644 index 0000000..c8abeb5 --- /dev/null +++ b/3Homework12.10.22/MyThreadPool/MyThreadPoolTests/MyThreadPoolTests.cs @@ -0,0 +1,274 @@ +namespace MyThreadPool.Tests; + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +public class MyThreadPoolTests +{ + private int threadsInThreadPoolCount = 10; + + [Test] + public void AtLeastNThreadsInThePool() + { + MyThreadPool pool = new(this.threadsInThreadPoolCount); + var func = new Func(() => + { + Thread.Sleep(5000); + return 1; + }); + + var stopwatch = new Stopwatch(); + var tasks = new List>(); + stopwatch.Start(); + for (int i = 0; i < this.threadsInThreadPoolCount; ++i) + { + tasks.Add(pool.Submit(() => func())); + } + + for (int i = 0; i < this.threadsInThreadPoolCount; ++i) + { + Assert.AreEqual(1, tasks[i].Result); + } + + stopwatch.Stop(); + Assert.Less(stopwatch.ElapsedMilliseconds, 10000); + Assert.AreEqual(10, pool.ThreadCount); + + pool.Shutdown(); + } + + [Test] + public void ManyTasksWorks() + { + MyThreadPool pool = new(this.threadsInThreadPoolCount); + var func = new Func(() => + { + int x = 0; + for (int i = 1; i < 1000; ++i) + { + x += i; + } + + return x; + }); + + var tasks = new List>(); + for (int i = 0; i < 500; ++i) + { + tasks.Add(pool.Submit(() => func())); + } + + for (int i = 0; i < 500; ++i) + { + Assert.AreEqual(499500, tasks[i].Result); + } + + pool.Shutdown(); + } + + [Test] + public void ShutdownWorksWithEndlessProcess() + { + MyThreadPool pool = new(3); + var func = new Func(() => + { + Thread.Sleep(20000); + return 1; + }); + + var tasks = new List>(); + for (int i = 0; i < 2; ++i) + { + tasks.Add(pool.Submit(() => func())); + } + + for (int i = 0; i < 10; ++i) + { + tasks.Add(pool.Submit(() => 2 * 2)); + } + + for (int i = 9; i < 12; ++i) + { + Assert.AreEqual(4, tasks[i].Result); + } + + Assert.Catch(pool.Shutdown); + } + + [Test] + public void SeveralContinueWithWorks() + { + MyThreadPool pool = new(this.threadsInThreadPoolCount); + var func1 = new Func(() => + { + Thread.Sleep(500); + return 1; + }); + + var func2 = new Func(x => + { + Thread.Sleep(500); + return 2 * x; + }); + + var tasks = new List>(); + tasks.Add(pool.Submit(() => func1())); + for (int i = 1; i < 10; ++i) + { + tasks.Add(tasks[i - 1].ContinueWith(func2)); + } + + int x = 1; + for (int i = 0; i < 1; ++i) + { + Assert.AreEqual(x, tasks[i].Result); + x *= 2; + } + + pool.Shutdown(); +} + + [Test] + public void ContinueWithDoesNotBlockThread() + { + MyThreadPool pool = new(3); + var func1 = new Func(() => + { + Thread.Sleep(5000); + return 1; + }); + + var func2 = new Func(x => 2 * x); + + var stopwatch = new Stopwatch(); + var tasks = new List>(); + stopwatch.Start(); + tasks.Add(pool.Submit(() => func1())); + tasks.Add(tasks[0].ContinueWith(func2)); + tasks.Add(pool.Submit(() => func1())); + + Assert.AreEqual(1, tasks[0].Result); + Assert.AreEqual(2, tasks[1].Result); + Assert.AreEqual(1, tasks[2].Result); + stopwatch.Stop(); + Assert.Less(stopwatch.ElapsedMilliseconds, 10000); + + pool.Shutdown(); + } + + [Test] + public async Task ParallelSubmitsWorks() + { + MyThreadPool pool = new(); + var func1 = new Func(() => + { + Thread.Sleep(50); + return 1; + }); + + var tasks = new Task[100]; + for (int i = 0; i < 100; ++i) + { + tasks[i] = Task.Run(() => pool.Submit(() => func1()).Result); + } + + for (int i = 0; i < 100; ++i) + { + Assert.AreEqual(1, await tasks[i]); + } + + pool.Shutdown(); + } + + [Test] + public async Task ParallelSubmitAdnShutDownWorks() + { + MyThreadPool pool = new(); + var func1 = new Func(() => + { + Thread.Sleep(50); + return 1; + }); + + var tasks = new Task[100]; + Task? task1 = null; + for (int i = 0; i < 100; ++i) + { + if (i == 50) + { + task1 = Task.Run(() => pool.Shutdown()); + } + + tasks[i] = Task.Run(() => pool.Submit(() => func1()).Result); + } + + for (int i = 0; i < 100; ++i) + { + int result; + var invalidOperation = new InvalidOperationException().GetType(); + try + { + result = tasks[i].Result; + Assert.AreEqual(1, result); + } + catch (AggregateException e) + { + Assert.AreEqual(1, e.InnerExceptions.Count); + Assert.AreEqual(invalidOperation, e.InnerException!.GetType()); + } + } + + Assert.NotNull(task1); + if (task1 == null) + { + return; + } + + await task1; + Assert.IsNull(task1.Exception); + + pool.Shutdown(); + } + + [Test] + public Task ParallelSubmitsContinueWithAndShutDownWorks() + { + MyThreadPool pool = new(1); + var func1 = new Func(() => + { + Thread.Sleep(50); + return 1; + }); + var func2 = new Func(x => + { + Thread.Sleep(50); + return 2 * x; + }); + + var taskSubmit = Task.Run(() => pool.Submit(() => func1())); + Assert.AreEqual(1, taskSubmit.Result.Result); + var taskShutDown = Task.Run(() => pool.Shutdown()); + var taskContinueWith = Task.Run(() => taskSubmit.Result.ContinueWith(func2)); + + int result; + var invalidOperation = new InvalidOperationException().GetType(); + try + { + result = taskContinueWith.Result.Result; + Assert.AreEqual(2, result); + } + catch (AggregateException e) + { + Assert.AreEqual(1, e.InnerExceptions.Count); + Assert.AreEqual(invalidOperation, e.InnerException!.GetType()); + } + + Assert.IsNull(taskShutDown.Exception); + pool.Shutdown(); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/3Homework12.10.22/MyThreadPool/MyThreadPoolTests/MyThreadPoolTests.csproj b/3Homework12.10.22/MyThreadPool/MyThreadPoolTests/MyThreadPoolTests.csproj new file mode 100644 index 0000000..faab430 --- /dev/null +++ b/3Homework12.10.22/MyThreadPool/MyThreadPoolTests/MyThreadPoolTests.csproj @@ -0,0 +1,23 @@ + + + + net6.0 + enable + + false + + MyThreadPool.Tests + + + + + + + + + + + + + +