Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions MyThreadPool/MyThreadPool.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.4.33403.182
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{571D564D-1D0C-469E-857E-D0D1390DC178}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestsForMyThreadPool", "TestsForMyThreadPool\TestsForMyThreadPool.csproj", "{EDDD394B-53B2-4A35-916C-7AEB2FBF3728}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{571D564D-1D0C-469E-857E-D0D1390DC178}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{571D564D-1D0C-469E-857E-D0D1390DC178}.Debug|Any CPU.Build.0 = Debug|Any CPU
{571D564D-1D0C-469E-857E-D0D1390DC178}.Release|Any CPU.ActiveCfg = Release|Any CPU
{571D564D-1D0C-469E-857E-D0D1390DC178}.Release|Any CPU.Build.0 = Release|Any CPU
{EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EDDD394B-53B2-4A35-916C-7AEB2FBF3728}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B78A1404-BCEF-4F4C-93D1-CCA4EFF20DB0}
EndGlobalSection
EndGlobal
20 changes: 20 additions & 0 deletions MyThreadPool/MyThreadPool/IMyTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace MyThreadPool;

/// <summary>
/// Interface for creating tasks
/// </summary>
public interface IMyTask<TResult>
{
/// <summary>
/// Check is completed Task and return result
/// </summary>
public bool IsCompleted { get; }
/// <summary>
/// Return Result Task
/// </summary>
public TResult? Result { get; }
/// <summary>
/// A method for solving subtasks from the results obtained from the tasks
/// </summary>
public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<TResult, TNewResult> supplier);
}
242 changes: 242 additions & 0 deletions MyThreadPool/MyThreadPool/MyThreadPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
namespace MyThreadPool;

/// <summary>
/// A class for automatic efficient flow control in the program.
/// </summary>
public class MyThreadPool
{
private readonly MyThread[]? arrayThreads;
private readonly Queue<Action> tasks = new();
private readonly CancellationTokenSource token = new();
private readonly Object lockerForThreads;
private EventWaitHandle waitHandle;

/// <summary>
/// Constructor for creating n number of threads for tasks
/// </summary>
public MyThreadPool(int sizeThreads)
{
waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь тоже лучше ManualResetEvent использовать. Хотя если вы его используйте для пропуска одного потока для выполнения задачи при Submit, то лучше AutoResetEvent. MRE только все потоки умеет пропускать за раз

lockerForThreads = new();
if (sizeThreads <= 0)
{
throw new ArgumentOutOfRangeException();
}
arrayThreads = new MyThread[sizeThreads];
for (int i = 0; i < sizeThreads; i++)
{
arrayThreads[i] = new(tasks, token.Token, lockerForThreads, waitHandle);
}
}

/// <summary>
/// Get number of threads
/// </summary>
public int GetNumberOfThreads()
{
if (arrayThreads == null)
{
throw new ArgumentNullException();
}
return arrayThreads.Length;
}

/// <summary>
/// Accepts a function, adds it as a task in the thread, and returns the created task
/// </summary>
public IMyTask<TResult> Submit<TResult>(Func<TResult> supplier)
{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь надо учесть условие "после вызова Shutdown новые задачи не принимаются на исполнение потоками из пула", а также согласовать исполнение метод с Shutdown (сделать так, чтобы не было гонок при вызове Submit и Shutdown одновременно из разных потоков)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Согласование Submit с Shutdown -- не готово.
(для этого надо использовать lock по защищаемому ресурсу)

if (token.IsCancellationRequested)
{
throw new ShudownWasThrownException();
}
if (arrayThreads == null)
{
throw new ArgumentNullException();
}
var newTask = new MyTask<TResult>(supplier, token, this);
tasks.Enqueue(() => newTask.StartSupplier());
waitHandle.Set();

return newTask;
}

/// <summary>
/// Interrupts the processing of tasks that are not started do not begin, and those that are started are being completed
/// </summary>
public void Shutdown()
{
if (arrayThreads == null)
{
throw new ArgumentNullException(nameof(arrayThreads));
}

token.Cancel();
waitHandle.Set();

foreach(var thread in arrayThreads)
{
thread.Join();
}
}

private class MyThread
{
private Thread? thread;
private volatile Queue<Action> tasks;
private Object locker;
private CancellationToken token;
private EventWaitHandle waitHandle;

/// <summary>
/// Task-based custom thread constructor
/// </summary>
public MyThread(Queue<Action> tasks, CancellationToken token, object locker, EventWaitHandle waitHandle)
{
this.waitHandle = waitHandle;
this.tasks = tasks;
this.token = token;
this.locker = locker;
thread = new Thread(EternalCycle);
thread.Start();
}

/// <summary>
/// Task waiting cycle
/// </summary>
private void EternalCycle()
{
Action? task = null;
while (!token.IsCancellationRequested)
{
waitHandle.WaitOne();
if (!token.IsCancellationRequested)
{
lock (locker)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь lock по очереди задач надо, как ранее писал

{
if (tasks.Count != 0)
{
var isCorrect = tasks.TryDequeue(out task);
if (!isCorrect)
{
throw new InvalidOperationException();
}
}
if (tasks.Count == 0)
{
waitHandle.Reset();
}
}
if (task != null)
{
task();
}
task = null;
}
}
}

/// <summary>
/// Suspends the current thread
/// </summary>
public void Join()
{
if (thread == null)
{
throw new ArgumentNullException(nameof(thread));
}

thread.Join();
}
}

private class MyTask<TResult> : IMyTask<TResult>
{
private readonly Func<TResult>? supplier;
private TResult? result;
private Exception? exception;
private CancellationTokenSource token;
private ManualResetEvent resetEvent = new ManualResetEvent(false);
private MyThreadPool? pool;

/// <summary>
/// Constructor for creating a task
/// </summary>
public MyTask(Func<TResult> supplier, CancellationTokenSource token, MyThreadPool pool)
{
this.supplier = supplier;
this.token = token;
this.pool = pool;
this.result = default;
}

/// <summary>
/// Get Result
/// </summary>
public TResult Result
{
get
{
if (waitHandle == null)
{
throw new InvalidOperationException();
}
waitHandle.WaitOne();
if (exception != null)
{
throw new AggregateException(exception);
}
return result!;
}
}

/// <summary>
/// Check is completed Task and return result
/// </summary>
public bool IsCompleted { get; private set; }

/// <summary>
/// Task completion
/// </summary>
public void StartSupplier()
{
if (supplier != null)
{
try
{
result = supplier();
}
catch (Exception ex)
{
exception = ex;
}

if (waitHandle == null)
{
throw new InvalidOperationException();
}

waitHandle.Set();

IsCompleted = true;
}
}

/// <summary>
/// A method for solving subtasks from the results obtained from the tasks
/// </summary>
public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<TResult, TNewResult> supplier)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь и в StartSupplier куда-то пропала функциональность по добавлению всех continuation-ов в очередь и сабмиту этих continuation-ов в тредпул, когда "базовая" задача досчиталась. А это для выполнения условий задачи важно

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Прошлое замечание по защите от гонок между ContinueWith и Shutdown, между добавлением continuation-ов на пул и Shutdown актуально

{
if (pool == null)
{
throw new InvalidOperationException();
}

if (!token.IsCancellationRequested)
{
return pool.Submit(() => supplier(Result));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ниже компилятор ругается: действительно, почему supplier может быть null?

}
throw new ShudownWasThrownException();
}
}
}
11 changes: 11 additions & 0 deletions MyThreadPool/MyThreadPool/MyThreadPool.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<OutputType>Library</OutputType>
</PropertyGroup>

</Project>
13 changes: 13 additions & 0 deletions MyThreadPool/MyThreadPool/ShudownWasThrownException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace MyThreadPool;

/// <summary>
/// An exception is thrown if, after calling the Shutdown method, the user calls other methods
/// </summary>
public class ShudownWasThrownException : Exception
{
public ShudownWasThrownException() { }

public ShudownWasThrownException(string? message) : base(message) { }

public ShudownWasThrownException(string? message, Exception? innerException) : base(message, innerException) { }
}
Loading