Skip to content

Comments

My ThreadPool#5

Open
Niksen111 wants to merge 19 commits intomainfrom
3Homework
Open

My ThreadPool#5
Niksen111 wants to merge 19 commits intomainfrom
3Homework

Conversation

@Niksen111
Copy link
Owner

Simple thread pool


using NUnit.Framework;

public class Tests

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А ещё линтер очень недоволен, и тут даже по делу

[Test]
public void Test1()
{
Assert.Pass();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:(

@@ -0,0 +1,62 @@
namespace MyThreadPool;

public class MyTask<T> : IMyTask<T>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Надо комментарии
  • Это лучше сделать private-вложенным классом ThreadPool-а, чтобы извне нельзя было делать странные вещи, например, вызывать Start

/// <inheritdoc/>
public bool IsCompleted { get; private set; } = false;

public MyTask(Func<T> func, MyThreadPool threadPool)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут nullability-анализ совсем недоволен

private Func<T> func;
private ManualResetEvent reset = new(false);
private T result;
private Exception? retrunedException;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retruned :)

{
if (!this.IsCompleted)
{
this.reset.WaitOne();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это прямо нарушает требование условия про неблокирование вызывающего

Comment on lines 66 to 68
while (this.tasks.Count > 0)
{
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Крутящееся ожидание нельзя, процессор сожжёте :)

throw new AggregateException(this.retrunedException);
}

return this.result;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Что-то я не понял, как в result в итоге оказывается значение

{
while (!token.IsCancellationRequested)
{
if (this.collection.TryTake(out var action))

This comment was marked as resolved.

Copy link

@yurii-litvinov yurii-litvinov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Существенно лучше, чем было, но существенно хуже, чем могло бы быть

Comment on lines 68 to 71
for (int i = 0; i < 20 && this.tasks.Count > 0; ++i)
{
Thread.Sleep(1000);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Так оно заблокирует вызывающего на целую секунду минимум, так нельзя. Рабочие потоки могут сами сообщать, что они всё, например, выставляя какой-нибудь AutoResetEvent, который мы тут можем ждать. И параллельно с этим запустить поток, висящий на ThreadSleep 20 секунд, и тоже выставляющий Event — типа таймаут. В реальной жизни это был бы Task.WhenAny, но в этой задаче Task.WhenAny нельзя — слишком высокоуровневый.

}
else
{
Thread.Sleep(1000);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Секунда для процессора — это целая вечность. Типа "пытаемся взять задачу, а если она не нашлась, засыпаем на эпохи, пока звёзды не займут правильное положение". И нет, тут Sleep не надо, мы можем при добавлении задачи в очередь будить поток — используйте ResetEvent-ы.

{
private MyThreadPool pool;
private Func<T> func;
private ManualResetEvent reset = new(false);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это прежде всего не reset, а event, так что и называться должен так, чтобы отражать событие, которое представляет.

{
this.reset.Set();
this.IsCompleted = true;
}

This comment was marked as resolved.

Comment on lines 188 to 189
this.reset.Set();
this.IsCompleted = true;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Наоборот. Иначе будет задача, которая уже вернула результат, но ещё не IsCompleted, это не обязательно баг, но нездорово

/// <inheritdoc/>
public IMyTask<TNewResult> ContinueWith<TNewResult>(Func<T, TNewResult> func1)
{
return this.pool.Submit(() => func1(this.Result));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не-а. Если есть длинная задача, у которой 150 Continuation-ов, то они все попадут в пул и будут ждать, пока задача закончится (встав на ResetEvent в Result). Остальные задачи в пуле не смогут считаться, потому что потоки пула заняты. Надо, чтобы Continuation-ы попадали в пул только когда они готовы считаться.


pool.Shutdown();
}
} No newline at end of file

This comment was marked as resolved.

Copy link

@yurii-litvinov yurii-litvinov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ещё пару важных моментов всё-таки надо поправить

});

timeoutThread.Start();
this.tasksOver.WaitOne();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tasksOver выставляется каждым рабочим потоком, когда он видит пустую очередь (например, в самом начале) и нигде не сбрасывается вроде. Если это так, то тут мы сразу же проскочим этот WaitOne, вне зависимости от занятости потоков.

{
var task = new MyTask<TResult>(func, this);
this.tasks.Add(() => task.Start());
this.taskAdded.Set();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taskAdded — это AutoResetEvent, так что если мы вызвали десять раз Submit до активации какого-то из рабочих потоков, у нас будет десять задач в очереди и ровно один поток, который разбужен, чтобы их делать. Не очень хорошо.

}

var newTask = new MyTask<TNewResult>(NewFunc, this.pool);
var thread = new Thread(() => newTask.Start());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А так continuation запускается вообще в отдельном потоке, что, конечно, разгружает тредпул, но убивает всю идею тредпула :) Нет, так нельзя, надо класть на пул, но потом, по завершению основной задачи.

private bool isShutdown;
private AutoResetEvent tasksOver;
private AutoResetEvent taskAdded;
private Semaphore taskCount = new Semaphore(0, 10000);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему 10000? Больше 10000 пул не принимает? Нет, пул должен принимать сколько угодно задач. Можно без семафора, на Event-ах :)

for (int i = 0; i < this.ThreadCount; i++)
{
this.taskAdded.Set();
this.taskCount.Release();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Release имеет аргумент "сколько зарелизить", так что можно было и не в цикле

var task = new MyTask<TResult>(func, this);
this.tasks.Add(() => task.Start());
this.taskAdded.Set();
this.tasksOver.Reset();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это не синхронизировано с Set в рабочем потоке, поэтому, кажется, может произойти "Рабочий поток увидел, что в очереди пусто, приготовился сделать tasksOver.Set, но тут произошло переключение контекстов, управление передалось в Submit, он сделал this.tasksOver.Reset();, управление вернулось в рабочий поток, он тут же сделал Set и задача в очереди, но задачи типа кончились."

return this.pool.Submit(() => func1(this.Result));
}

MyTask<TNewResult> newTask = new MyTask<TNewResult>(() => func1(this.Result), this.pool);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Два раза один тип в одной строке не пишут :)

{
this.deferredTasks.Add(() => newTask.Start());
}
catch (InvalidOperationException)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вполне реалистичная ситуация, когда ContinueWith вызвался одновременно с окончанием Start. Попробуйте без исключений это разрешить (можно lock использовать -- Вы, я заметил, придумываете жуткие извращения, чтобы не писать lock). Исключения не должны бросаться, когда всё нормально.

this.deferredTasks.CompleteAdding();
foreach (var task in this.deferredTasks)
{
this.pool.Submit(() => task);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А если к этому моменту пулу сделали Shutdown, Submit бросит исключение --- в рабочий поток, так что пользователь ничего про него не узнает. А рабочий поток бесславно сдохнет. Нехорошо.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants