-
Notifications
You must be signed in to change notification settings - Fork 0
ThreadPool #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
ThreadPool #3
Changes from all commits
2e079ee
545cb39
9f1f1a0
6cb1df1
9996866
dee3019
5f3cb7a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| namespace MyThreadPoolTests; | ||
|
|
||
| public class Tests | ||
| { | ||
| // ���� �������� ���� �������� ������� ���������� | ||
| [Test] | ||
| public void ShouldOperationCanceledExceptionWhenShutDownIfTaskHasNotStart() | ||
| { | ||
| var pool = new MyThreadPool.MyThreadPool(1); | ||
| var task = pool.Submit(() => { Thread.Sleep(100); return 1; }); | ||
| static int ReturnTwo(int lol) => 2; | ||
| var continuation = task.ContinueWith(ReturnTwo); | ||
| pool.ShutDown(); | ||
| int ReturnResult() => continuation.Result; | ||
| Assert.Throws<OperationCanceledException>(() => ReturnResult()); | ||
| } | ||
|
|
||
| [Test] | ||
| public void ShouldExpectedIsCancellationRequestedIsEqualTrueWhenShutDown() | ||
| { | ||
| var pool = new MyThreadPool.MyThreadPool(1); | ||
| pool.ShutDown(); | ||
| Assert.True(pool.Source.Token.IsCancellationRequested); | ||
| } | ||
|
|
||
| private static IEnumerable<TestCaseData> TestRemoveCaseData() => new TestCaseData[] | ||
| { | ||
| }; | ||
|
|
||
| private static IEnumerable<TestCaseData> CaseData() | ||
| { | ||
| MyThreadPool.MyThreadPool pool = new (10); | ||
| var list = new List<MyThreadPool.IMyTask<int>>(); | ||
| var numberOfTasks = 15; | ||
|
|
||
| for (int i = 0; i < numberOfTasks; i++) | ||
| { | ||
| var locali = i; | ||
| list.Add(pool.Submit(() => locali)); | ||
| } | ||
|
|
||
| int counter = 0; | ||
| foreach (var value in list) | ||
| { | ||
| yield return new TestCaseData(value.Result, counter, pool); | ||
| counter++; | ||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Это странный способ записать просто тест. Это ж не данные для тестируемой системы, а содержательная логика |
||
|
|
||
| // ��� ������ ����������� � �� ��������� ��� ��������� ������ | ||
| [TestCaseSource(nameof(CaseData))] | ||
| public void ShouldResultIsEqualExpectedValue(int actual, int expected, MyThreadPool.MyThreadPool pool) | ||
| { | ||
| Assert.That(actual, Is.EqualTo(expected)); | ||
| pool.ShutDown(); | ||
| } | ||
|
|
||
| // ���������, ��� ����� ������� �� ����������� (� ���� ������ �� �������� ������) | ||
| [Test] | ||
| public void ShouldNumberOfThreadIsEqualNumberThatWasSentToConstructor() | ||
| { | ||
| int numberOfThreads = 10; | ||
| MyThreadPool.MyThreadPool pool = new(numberOfThreads); | ||
| var task = pool.Submit(() => 1); | ||
| task = pool.Submit(() => 1); | ||
| Assert.That(pool.CountOfThreads, Is.EqualTo(numberOfThreads)); | ||
| pool.ShutDown(); | ||
|
|
||
| numberOfThreads = 5; | ||
| pool = new(numberOfThreads); | ||
| Assert.That(pool.CountOfThreads, Is.EqualTo(numberOfThreads)); | ||
| pool.ShutDown(); | ||
|
|
||
| numberOfThreads = 100; | ||
| pool = new(numberOfThreads); | ||
| task = pool.Submit(() => 1); | ||
| int a = task.Result; | ||
| Assert.That(pool.CountOfThreads, Is.EqualTo(numberOfThreads)); | ||
| pool.ShutDown(); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Если pool.CountOfThreads просто возвращает то, что передали в конструктор, потоков может быть сколько угодно. Было бы интереснее запустить N > numberOfThreads потоков и посмотреть, что за раз отработало ровно numberOfThreads из них. |
||
|
|
||
| // �������� ��� ����� ������ ����� ��������� �� �����, ��� ���������� �������� | ||
| // ���� ��� ��� � 1000 �� ��� ����� ����� ��� �����, �� �� ���� ��� ����� ��������� | ||
| // � ��� �������������, �� ���� ��� �� ������ | ||
| /* | ||
| [Test] | ||
| public void ShouldTheNewTaskWillBeCompletedNoEarlierThanTheOriginalOneIsCompleted() | ||
| { | ||
| var pool = new MyThreadPool.MyThreadPool(10); | ||
|
|
||
| // ������ ������ � ������� � �������� ����� ���������� > 1000 �� | ||
| var task = pool.Submit(() => { Thread.Sleep(1000); return 1; }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Используйте ResetEvent-ы. Создали задачу, запустили, она встала на ResetEvent-е, проверили, что хотели, отпустили ResetEvent, продолжили, проверили, что всё ок. Thread.Sleep действительно не нужен |
||
|
|
||
| // �����������, ��� ����� ������ �� ���� �������� | ||
|
|
||
| // ����� ������ �� ����� ���������� - �������, ����� ���, 100 �� ��� ��� ������ ���������� (�������� ��� ��� �� 10 �������, �� ������� ����� ������ 1) | ||
| static int ReturnTwo(int lol) => 2; | ||
| var continuation = task.ContinueWith(returnTwo); | ||
|
|
||
| // ����� ������ �� ���� ������ � ������ ����������� �� 100 �� (�� ���� �� ShutDown) | ||
| // ������ ��� ��������� � �������� Result ��� ������ ���� �� | ||
| Thread.Sleep(100); | ||
| pool.ShutDown(); | ||
|
|
||
| bool isCompleted = false; | ||
|
|
||
| // ���� ��� ����� - �� ��� ������������. �� ��� ������ => ����� ���� �������� ( ���� �� ����� � ��, ��� ����� ������ ������ ����������� �� 100 ��) | ||
| Assert.That(isCompleted, Is.EqualTo(continuation.IsCompleted)); | ||
| }*/ | ||
|
|
||
| // �������� ��� ����� ������ ����������� | ||
| [Test] | ||
| public void ShouldNewTaskIsBeingExecuted() | ||
| { | ||
| var pool = new MyThreadPool.MyThreadPool(10); | ||
| var task = pool.Submit(() => 1); | ||
|
|
||
| static int ReturnTwo(int lol) => 2; | ||
| var continuation = task.ContinueWith(ReturnTwo); | ||
| int result = 2; | ||
| Assert.That(continuation.Result, Is.EqualTo(result)); | ||
|
|
||
| var continuationContinuation = continuation.ContinueWith((x) => x * x); | ||
| result = 4; | ||
|
|
||
| Thread.Sleep(10); | ||
| Assert.That(continuationContinuation.Result, Is.EqualTo(result)); | ||
| pool.ShutDown(); | ||
| } | ||
|
|
||
| [Test] | ||
| public void ShouldAggregateExceptionWhenDevideByZero() | ||
| { | ||
| var pool = new MyThreadPool.MyThreadPool(10); | ||
| int zero = 0; | ||
| var task = pool.Submit(() => 1 / zero); | ||
| int ReturnResult() => task.Result; | ||
| Assert.Throws<AggregateException>(() => ReturnResult()); | ||
| pool.ShutDown(); | ||
| } | ||
|
|
||
| /* | ||
| [Test] | ||
| public void ShouldExpectedFalseWhenIsCompletedForCancelledTask() | ||
| { | ||
| var pool = new MyThreadPool.MyThreadPool(10); | ||
| var task = pool.Submit(() => 1); | ||
| pool.ShutDown(); | ||
| bool result = false; | ||
| Assert.That(result, Is.EqualTo(task.IsCompleted)); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Она вполне могла успеть посчитаться до ShutDown |
||
| }*/ | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Нет ни одного теста на многопоточный доступ к методам тредпула. Сейчас совершенно неочевидно, что он потокобезопасен. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
|
|
||
| <PropertyGroup> | ||
| <TargetFramework>net6.0</TargetFramework> | ||
| <ImplicitUsings>enable</ImplicitUsings> | ||
| <Nullable>enable</Nullable> | ||
|
|
||
| <IsPackable>false</IsPackable> | ||
|
|
||
| <OutputType>Library</OutputType> | ||
| </PropertyGroup> | ||
|
|
||
| <ItemGroup> | ||
| <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" /> | ||
| <PackageReference Include="NUnit" Version="3.13.3" /> | ||
| <PackageReference Include="NUnit3TestAdapter" Version="4.2.1" /> | ||
| <PackageReference Include="NUnit.Analyzers" Version="3.3.0" /> | ||
| <PackageReference Include="coverlet.collector" Version="3.1.2" /> | ||
| </ItemGroup> | ||
|
|
||
| <ItemGroup> | ||
| <ProjectReference Include="..\ThreadPool\ThreadPool.csproj" /> | ||
| </ItemGroup> | ||
|
|
||
| </Project> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| global using NUnit.Framework; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| | ||
| Microsoft Visual Studio Solution File, Format Version 12.00 | ||
| # Visual Studio Version 17 | ||
| VisualStudioVersion = 17.2.32505.173 | ||
| MinimumVisualStudioVersion = 10.0.40219.1 | ||
| Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ThreadPool", "ThreadPool\ThreadPool.csproj", "{2855D0B1-E08B-447E-8F19-04D6B93B842B}" | ||
| EndProject | ||
| Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPoolTests", "MyThreadPoolTests\MyThreadPoolTests.csproj", "{1700B832-9222-4906-9C1C-55450298057B}" | ||
| EndProject | ||
| Global | ||
| GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||
| Debug|Any CPU = Debug|Any CPU | ||
| Release|Any CPU = Release|Any CPU | ||
| EndGlobalSection | ||
| GlobalSection(ProjectConfigurationPlatforms) = postSolution | ||
| {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | ||
| {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||
| {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||
| {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Release|Any CPU.Build.0 = Release|Any CPU | ||
| {1700B832-9222-4906-9C1C-55450298057B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | ||
| {1700B832-9222-4906-9C1C-55450298057B}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||
| {1700B832-9222-4906-9C1C-55450298057B}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||
| {1700B832-9222-4906-9C1C-55450298057B}.Release|Any CPU.Build.0 = Release|Any CPU | ||
| EndGlobalSection | ||
| GlobalSection(SolutionProperties) = preSolution | ||
| HideSolutionNode = FALSE | ||
| EndGlobalSection | ||
| GlobalSection(ExtensibilityGlobals) = postSolution | ||
| SolutionGuid = {341821D8-69C8-40C8-8ACF-1DED0D7FD0ED} | ||
| EndGlobalSection | ||
| EndGlobal |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| namespace MyThreadPool; | ||
|
|
||
| /// <summary> | ||
| /// Interface for tasks accepted for execution | ||
| /// </summary> | ||
| /// <typeparam name="TResult">Type of return value</typeparam> | ||
| public interface IMyTask<TResult> | ||
| { | ||
| /// <summary> | ||
| /// Returns true if the task is completed | ||
| /// </summary> | ||
| public bool IsCompleted { get; } | ||
|
|
||
| /// <summary> | ||
| /// Returns the result of the task execution | ||
| /// </summary> | ||
| public TResult Result { get; } | ||
|
|
||
| /// <summary> | ||
| /// | ||
| /// </summary> | ||
| /// <typeparam name="TNewResult"></typeparam> | ||
| /// <param name="func"></param> | ||
| /// <returns></returns> | ||
| public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<TResult, TNewResult> func); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| namespace MyThreadPool; | ||
|
|
||
| using System.Collections.Concurrent; | ||
|
|
||
| public class MyTask<TResult> : IMyTask<TResult> | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| { | ||
| private readonly Func<TResult> function; | ||
|
|
||
| private readonly Object lockObject = new(); | ||
|
|
||
| private Exception? exception = null; | ||
|
|
||
| private readonly ConcurrentQueue<Action> queueOfTasksToComplete = new(); | ||
|
|
||
| private volatile bool isCompleted; | ||
|
|
||
| private TResult? result; | ||
|
|
||
| private readonly MyThreadPool threadPool; | ||
|
|
||
| public MyTask(Func<TResult> function, MyThreadPool threadPool) | ||
| { | ||
| this.function = function; | ||
| this.threadPool = threadPool; | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public bool IsCompleted => isCompleted; | ||
|
|
||
| public void Work() | ||
| { | ||
| if (threadPool.Source.Token.IsCancellationRequested) | ||
| { | ||
| lock (lockObject) | ||
| { | ||
| if (threadPool.Source.Token.IsCancellationRequested) | ||
| { | ||
| Monitor.Pulse(lockObject); | ||
| } | ||
| } | ||
|
|
||
| return; | ||
| } | ||
|
|
||
| try | ||
| { | ||
| result = function(); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| exception = ex; | ||
| } | ||
| finally | ||
| { | ||
| isCompleted = true; | ||
| } | ||
|
|
||
| lock (lockObject) | ||
| { | ||
| // Сообщаем потоку, ждущему на Monitor.Wait, что задача вполнена | ||
| Monitor.Pulse(lockObject); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PulseAll. Результат могут ждать больше одного потока. |
||
| queueOfTasksToComplete.TryDequeue(out Action? result); | ||
| if (result != null) | ||
| { | ||
| threadPool.QueueWorkItem(result!); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public TResult Result | ||
| { | ||
| get | ||
| { | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Лишняя пустая строчка |
||
| if (isCompleted) | ||
| { | ||
| // Если соответствующий задаче метод завершился с исключением, то бросаем AggregateException | ||
| if (exception != null) | ||
| { | ||
| throw new AggregateException(exception); | ||
| } | ||
|
|
||
| return result!; | ||
| } | ||
|
|
||
| threadPool.Source.Token.ThrowIfCancellationRequested(); | ||
|
|
||
| // Если результат еще не посчитан | ||
| if (!isCompleted) | ||
| { | ||
| lock (lockObject) | ||
| { | ||
| // Ждём когда рузльтат будет вычислен, блокируя при этом поток | ||
| while (!threadPool.Source.Token.IsCancellationRequested && !IsCompleted) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Если пулу делают Shutdown, кажется, Monitor.Wait об этом не узнает и будет ждать Monitor.Pulse на 61-й строчке вечно |
||
| { | ||
| Monitor.Wait(lockObject); | ||
| } | ||
|
|
||
| threadPool.Source.Token.ThrowIfCancellationRequested(); | ||
| } | ||
| } | ||
|
|
||
| // Если соответствующий задаче метод завершился с исключением, то бросаем AggregateException | ||
| if (exception != null) | ||
| { | ||
| throw new AggregateException(exception); | ||
| } | ||
|
|
||
| return result!; | ||
| } | ||
|
|
||
| private set { } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Это не нужно |
||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<TResult, TNewResult> func) | ||
| { | ||
| var task = new MyTask<TNewResult>(() => func(Result), threadPool); | ||
| if (isCompleted) | ||
| { | ||
| threadPool.QueueForTaskItem(task); | ||
| return task; | ||
| } | ||
|
|
||
| lock (lockObject) | ||
| { | ||
| if (isCompleted) | ||
| { | ||
| threadPool.QueueForTaskItem(task); | ||
| return task; | ||
| } | ||
| else | ||
| { | ||
| // выгружаем в очередь | ||
| queueOfTasksToComplete.Enqueue(task.Work); | ||
| } | ||
| } | ||
|
|
||
| return task; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Хм.