Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Common IntelliJ Platform excludes

# User specific
**/.idea/**/workspace.xml
**/.idea/**/tasks.xml
**/.idea/shelf/*
**/.idea/dictionaries
**/.idea/httpRequests/

# Sensitive or high-churn files
**/.idea/**/dataSources/
**/.idea/**/dataSources.ids
**/.idea/**/dataSources.xml
**/.idea/**/dataSources.local.xml
**/.idea/**/sqlDataSources.xml
**/.idea/**/dynamic.xml

# Rider
# Rider auto-generates .iml files, and contentModel.xml
**/.idea/**/*.iml
**/.idea/**/contentModel.xml
**/.idea/**/modules.xml

*.suo
*.user
.vs/
[Bb]in/
[Oo]bj/
_UpgradeReport_Files/
[Pp]ackages/

Thumbs.db
Desktop.ini
.DS_Store

**/.idea/
22 changes: 22 additions & 0 deletions 3Homework12.10.22/MyThreadPool/MyThreadPool.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPool", "MyThreadPool\MyThreadPool.csproj", "{224B1961-604D-4EAB-ACCA-1D44E285FCDA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPoolTests", "MyThreadPoolTests\MyThreadPoolTests.csproj", "{F948F284-F2E1-4BC4-BC8B-DA89906D74AF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{224B1961-604D-4EAB-ACCA-1D44E285FCDA}.Release|Any CPU.Build.0 = Release|Any CPU
{F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F948F284-F2E1-4BC4-BC8B-DA89906D74AF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
26 changes: 26 additions & 0 deletions 3Homework12.10.22/MyThreadPool/MyThreadPool/IMyTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace MyThreadPool;

/// <summary>
/// An operation performed on MyThreadPool that returns a value.
/// </summary>
/// <typeparam name="TResult">Type of the returnable value.</typeparam>
public interface IMyTask<TResult>
{
/// <summary>
/// Gets a value indicating whether get true if the task is completed.
/// </summary>
bool IsCompleted { get; }

/// <summary>
/// Gets the task result.
/// </summary>
TResult Result { get; }

/// <summary>
/// Creates a new task based on this task.
/// </summary>
/// <param name="func">The function to perform.</param>
/// <typeparam name="TNewResult">Type of the new returnable value.</typeparam>
/// <returns>Task with new returnable value.</returns>
IMyTask<TNewResult> ContinueWith<TNewResult>(Func<TResult, TNewResult> func);
}
241 changes: 241 additions & 0 deletions 3Homework12.10.22/MyThreadPool/MyThreadPool/MyThreadPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
namespace MyThreadPool;

using System.Collections.Concurrent;

/// <summary>
/// The ThreadPool abstraction.
/// </summary>
public class MyThreadPool
{
private BlockingCollection<Action> tasks;
private MyThread[] threads;
private CancellationTokenSource source = new();
private bool isShutdown;
private AutoResetEvent tasksOver;
private Semaphore taskCount = new Semaphore(0, 10000);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему 10000? Больше 10000 пул не принимает? Нет, пул должен принимать сколько угодно задач. Можно без семафора, на Event-ах :)


/// <summary>
/// Initializes a new instance of the <see cref="MyThreadPool"/> class.
/// </summary>
/// <param name="threadCount">Number of this ThreadPool threads.</param>
public MyThreadPool(int threadCount = 10)
{
if (threadCount <= 0)
{
throw new InvalidDataException();
}

this.ThreadCount = threadCount;
this.tasks = new();
this.threads = new MyThread[threadCount];
this.tasksOver = new AutoResetEvent(false);

for (int i = 0; i < threadCount; ++i)
{
this.threads[i] = new MyThread(this.tasks, this.source.Token, this.tasksOver, this.taskCount);
}

this.isShutdown = false;
}

/// <summary>
/// Gets number of existing threads.
/// </summary>
public int ThreadCount { get; }

/// <summary>
/// Submits new task to the ThreadPool.
/// </summary>
/// <param name="func">A calculation to perform.</param>
/// <typeparam name="TResult">Value type.</typeparam>
/// <returns>Task.</returns>
public IMyTask<TResult> Submit<TResult>(Func<TResult> func)
{
var task = new MyTask<TResult>(func, this);
this.tasks.Add(() => task.Start());
this.tasksOver.Reset();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это не синхронизировано с Set в рабочем потоке, поэтому, кажется, может произойти "Рабочий поток увидел, что в очереди пусто, приготовился сделать tasksOver.Set, но тут произошло переключение контекстов, управление передалось в Submit, он сделал this.tasksOver.Reset();, управление вернулось в рабочий поток, он тут же сделал Set и задача в очереди, но задачи типа кончились."

this.taskCount.Release();
return task;
}

/// <summary>
/// Completes the threads.
/// </summary>
public void Shutdown()
{
if (this.isShutdown)
{
return;
}

this.tasks.CompleteAdding();
var timeoutThread = new Thread(() =>
{
Thread.Sleep(20000);
this.tasksOver.Set();
});

timeoutThread.Start();
this.tasksOver.WaitOne();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tasksOver выставляется каждым рабочим потоком, когда он видит пустую очередь (например, в самом начале) и нигде не сбрасывается вроде. Если это так, то тут мы сразу же проскочим этот WaitOne, вне зависимости от занятости потоков.


if (this.tasks.Count > 0)
{
throw new TimeoutException("Tasks from the queue cannot be executed.");
}

this.source.Cancel();

for (int i = 0; i < this.ThreadCount; i++)
{
this.taskCount.Release();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Release имеет аргумент "сколько зарелизить", так что можно было и не в цикле

}

var areJoined = true;
foreach (var thread in this.threads)
{
thread.Join();
if (thread.IsWorking)
{
areJoined = false;
}
}

this.isShutdown = true;
if (!areJoined)
{
throw new TimeoutException("Not all tasks were accomplished.");
}
}

private class MyThread
{
private Thread thread;
private BlockingCollection<Action> collection;
private int timeout = 5000;
private AutoResetEvent tasksOver;
private Semaphore tasksCount;

public MyThread(BlockingCollection<Action> collection, CancellationToken token, AutoResetEvent tasksOver, Semaphore tasksCount)
{
this.tasksOver = tasksOver;
this.collection = collection;
this.tasksCount = tasksCount;
this.thread = new Thread(() => this.Start(token));
this.IsWorking = false;
this.thread.Start();
}

public bool IsWorking { get; private set; }

public void Join()
{
if (this.thread.IsAlive)
{
this.thread.Join(this.timeout);
}
}

private void Start(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
if (this.collection.TryTake(out var action))

This comment was marked as resolved.

{
this.IsWorking = true;
action();
this.IsWorking = false;
}
else
{
this.tasksOver.Set();
this.tasksCount.WaitOne();
}
}
}
}

private class MyTask<T> : IMyTask<T>
{
private MyThreadPool pool;
private Func<T>? func;
private ManualResetEvent resetEvent = new(false);
private T? result;
private Exception? returnedException;
private BlockingCollection<Action> deferredTasks;

/// <summary>
/// Initializes a new instance of the <see cref="MyTask{T}"/> class.
/// </summary>
/// <param name="func">Function that will performed.</param>
/// <param name="threadPool">Pool for the submit.</param>
public MyTask(Func<T> func, MyThreadPool threadPool)
{
this.func = func;
this.pool = threadPool;
this.IsCompleted = false;
this.deferredTasks = new BlockingCollection<Action>();
}

/// <inheritdoc/>
public bool IsCompleted { get; private set; }

/// <inheritdoc/>
public T Result
{
get
{
this.resetEvent.WaitOne();
if (this.returnedException != null)
{
throw new AggregateException(this.returnedException);
}

return this.result!;
}
}

public void Start()
{
try
{
this.result = this.func!();
}
catch (Exception exception)
{
this.returnedException = exception;
}
finally
{
this.func = null;
this.IsCompleted = true;
this.resetEvent.Set();
this.deferredTasks.CompleteAdding();
foreach (var task in this.deferredTasks)
{
this.pool.Submit(() => task);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А если к этому моменту пулу сделали Shutdown, Submit бросит исключение --- в рабочий поток, так что пользователь ничего про него не узнает. А рабочий поток бесславно сдохнет. Нехорошо.

}
}

This comment was marked as resolved.

}

/// <inheritdoc/>
public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<T, TNewResult> func1)
{
if (this.result != null)
{
return this.pool.Submit(() => func1(this.Result));
}

MyTask<TNewResult> newTask = new MyTask<TNewResult>(() => func1(this.Result), this.pool);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Два раза один тип в одной строке не пишут :)

try
{
this.deferredTasks.Add(() => newTask.Start());
}
catch (InvalidOperationException)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вполне реалистичная ситуация, когда ContinueWith вызвался одновременно с окончанием Start. Попробуйте без исключений это разрешить (можно lock использовать -- Вы, я заметил, придумываете жуткие извращения, чтобы не писать lock). Исключения не должны бросаться, когда всё нормально.

{
return this.pool.Submit(() => func1(this.Result));
}

return newTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
Loading