-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadExtensions.cs
168 lines (140 loc) · 4.5 KB
/
ThreadExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ThreadExtensions;
public class Dispatcher : IDisposable
{
internal static ConcurrentDictionary<Thread, Dispatcher> _dispatchers = new ConcurrentDictionary<Thread, Dispatcher>();
private readonly Thread _initialThread;
private readonly AutoResetEvent _taskAvailable = new AutoResetEvent(false);
private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
private bool _running = false;
private CancellationToken? Token;
public bool Cancelled => Token != null && Token.Value.IsCancellationRequested;
public bool Running => _running;
public bool CheckAccess()
{
return Thread.CurrentThread == _initialThread;
}
public void VerifyAccess()
{
if (!CheckAccess())
{
throw new InvalidOperationException("This method can only be called on the thread that created the dispatcher.");
}
}
public Dispatcher() : this(Thread.CurrentThread)
{}
internal Dispatcher( Thread initialThread )
{
_initialThread = initialThread;
_dispatchers.GetOrAdd(initialThread, this);
}
public void Run(CancellationToken token)
{
VerifyAccess();
if (_running) throw new InvalidOperationException("The dispatcher is already running.");
_running = true;
Token = token;
if(!_tasks.IsEmpty) _taskAvailable.Set();
try
{
while (!Cancelled)
{
if (_taskAvailable.WaitOne(100)) // Wait for a task or a cancellation request
{
while (!Cancelled && _tasks.TryDequeue(out var task))
{
try
{
task.RunSynchronously();
}
catch (Exception ex)
{
Console.WriteLine($"Exception in dispatched action: {ex}");
}
}
}
}
}
finally
{
Token = null;
if(!_tasks.IsEmpty) _taskAvailable.Set(); // Ensure that any pending Invoke operations complete
_running = false;
}
}
public Task<TResult> InvokeAsync<TResult>(Func<TResult> function)
{
if (function == null) throw new ArgumentNullException(nameof(function));
var tcs = new TaskCompletionSource<TResult>();
Action wrapperAction = () =>
{
try
{
tcs.SetResult(function());
}
catch (Exception ex)
{
tcs.SetException(ex);
}
};
_tasks.Enqueue(new Task(wrapperAction));
if (_running && !Cancelled) _taskAvailable.Set();
return tcs.Task;
}
public Task InvokeAsync(Delegate action)
{
if (action == null) throw new ArgumentNullException(nameof(action));
// Attempt to convert the delegate to a Func<object>
if (action is Func<object> func)
{
return InvokeAsync(func);
}
// If the delegate is not a Func<object>, try to create a Func<object> that calls the delegate
var tcs = new TaskCompletionSource<object>();
Action wrapperAction = () =>
{
try
{
var result = action.DynamicInvoke();
// explicitly handle Possible null reference argument
if (result != null)
{
tcs.SetResult(result);
}
}
catch (Exception ex)
{
tcs.SetException(ex);
}
};
_tasks.Enqueue(new Task(wrapperAction));
if (_running && !Cancelled) _taskAvailable.Set();
return tcs.Task;
}
public void Dispose()
{
while (_running)
{
Thread.Sleep(100);
}
_taskAvailable.Dispose();
}
}
public static class ThreadExtensions
{
public static Dispatcher GetDispatcher(this Thread thread)
{
if (thread == null) throw new ArgumentNullException(nameof(thread));
Dispatcher? dispatcher = null;
Dispatcher._dispatchers.TryGetValue(thread, out dispatcher);
if (dispatcher == null)
{
dispatcher = new Dispatcher(thread);
}
return dispatcher;
}
}