4
4
using System . Net ;
5
5
using System . Net . NetworkInformation ;
6
6
using System . Text ;
7
+ using System . Threading . Channels ;
8
+
7
9
namespace Pings
8
10
{
9
- public class ObservableQueue < T > : Queue < T >
11
+ public class ObservableQueue < T > : IDisposable
10
12
{
13
+ public bool IsBounded { get ; init ; } = false ;
14
+ private readonly Channel < T > _channel ;
15
+ private bool _disposed ;
16
+
17
+ public ObservableQueue ( ChannelOptions ? options = null , int ? capacity = null )
18
+ {
19
+ if ( options is null && capacity is null )
20
+ {
21
+ _channel = Channel . CreateUnbounded < T > ( ) ;
22
+ return ;
23
+ }
24
+ if ( options is null && capacity is not null && capacity > 0 )
25
+ {
26
+ _channel = Channel . CreateBounded < T > ( ( int ) capacity ) ;
27
+ IsBounded = true ;
28
+ return ;
29
+ }
30
+ if ( options is null && capacity is not null && capacity < 0 )
31
+ throw new ArgumentException ( "Queue type is bounded, but the capacity parameter less than zero." , nameof ( capacity ) ) ;
32
+
33
+ if ( options is BoundedChannelOptions boundedOptions )
34
+ {
35
+ _channel = Channel . CreateBounded < T > ( boundedOptions ) ;
36
+ IsBounded = true ;
37
+ }
38
+ else if ( options is UnboundedChannelOptions unboundedOptions )
39
+ {
40
+ _channel = Channel . CreateUnbounded < T > ( unboundedOptions ) ;
41
+ }
42
+ else
43
+ {
44
+ throw new ArgumentNullException ( nameof ( options ) , "Unable to confirm queue type." ) ;
45
+ }
46
+ }
47
+ public ObservableQueue ( ) : this ( null , null ) { }
48
+ public ObservableQueue ( int capacity ) : this ( null , capacity ) { }
49
+ public ObservableQueue ( ChannelOptions options ) : this ( options , null ) { }
50
+
51
+
11
52
public event Action < T > ? Enqueued ;
12
53
public event Action < T > ? Dequeued ;
13
- public new void Enqueue ( T item )
54
+
55
+ public async Task EnqueueAsync ( T item )
14
56
{
15
- base . Enqueue ( item ) ;
57
+ ObjectDisposedException . ThrowIf ( _disposed , nameof ( ObservableQueue < T > ) ) ;
58
+ await _channel . Writer . WriteAsync ( item ) ;
16
59
Enqueued ? . Invoke ( item ) ;
17
60
}
18
- public new T Dequeue ( )
61
+
62
+ public async Task < T > DequeueAsync ( )
19
63
{
20
- T item = base . Dequeue ( ) ;
64
+ ObjectDisposedException . ThrowIf ( _disposed , nameof ( ObservableQueue < T > ) ) ;
65
+ T item = await _channel . Reader . ReadAsync ( ) ;
21
66
Dequeued ? . Invoke ( item ) ;
22
67
return item ;
23
68
}
69
+
70
+ public int Count => _channel . Reader . Count ;
71
+
72
+ public void Dispose ( )
73
+ {
74
+ Dispose ( true ) ;
75
+ GC . SuppressFinalize ( this ) ;
76
+ }
77
+
78
+ protected virtual void Dispose ( bool disposing )
79
+ {
80
+ if ( ! _disposed )
81
+ {
82
+ if ( disposing )
83
+ {
84
+ _channel . Writer . Complete ( ) ;
85
+ }
86
+ _disposed = true ;
87
+ }
88
+ }
24
89
}
90
+
25
91
public static class IPStatusExtensions
26
92
{
27
93
public static string ToChineseString ( this IPStatus status )
@@ -69,7 +135,7 @@ public ICMPMonitor(CancellationTokenSource cancellationTokenSource, Logging? log
69
135
Logging = logging ;
70
136
Tasks = [ ] ;
71
137
TaskMap = [ ] ;
72
- TasksTable = new ( ) { Caption = new TableTitle ( "当前丢包(L) / 确认警告(C) / 退出(Q)" ) } ;
138
+ TasksTable = new ( ) { Caption = new TableTitle ( "确认警告(C) / 退出(Q)" ) } ;
73
139
TasksTable . AddColumns ( "名称" , "IP/域名" , "状态" , "延迟" , "警告/日志" ) ;
74
140
TasksTable . Centered ( ) ;
75
141
}
@@ -89,13 +155,13 @@ public void AddHost(ICMPTestTask newTask)
89
155
{
90
156
TasksTable . Rows . Update ( TaskMap [ task . IP ] , 3 , new Text ( $ "{ ( int ) task . Delay . TotalMilliseconds } ms") ) ;
91
157
} ;
92
- newTask . OpenWarning += ( task ) =>
158
+ newTask . OpenWarning += async ( task ) =>
93
159
{
94
160
Logging ? . Log ( $ "{ task . Name } ({ task . IP } ) 触发警告 当前状态:{ task . State . ToChineseString ( ) } ") ;
95
161
96
- TasksTable . Rows . Update ( TaskMap [ task . IP ] , 4 , new Text ( task . Warnings . Peek ( ) , new Style ( Color . Yellow , Color . Red , Decoration . Bold ) ) ) ;
162
+ TasksTable . Rows . Update ( TaskMap [ task . IP ] , 4 , new Text ( await task . Warnings . DequeueAsync ( ) , new Style ( Color . Yellow , Color . Red , Decoration . Bold ) ) ) ;
97
163
} ;
98
- newTask . ConfirmWarning += ( task ) =>
164
+ newTask . ConfirmWarning += async ( task ) =>
99
165
{
100
166
if ( ! task . IsWarning )
101
167
{
@@ -105,30 +171,25 @@ public void AddHost(ICMPTestTask newTask)
105
171
}
106
172
else
107
173
{
108
- TasksTable . Rows . Update ( TaskMap [ task . IP ] , 4 , new Text ( task . Warnings . Peek ( ) , new Style ( Color . Yellow , Color . Red , Decoration . Bold ) ) ) ;
174
+ TasksTable . Rows . Update ( TaskMap [ task . IP ] , 4 , new Text ( await task . Warnings . DequeueAsync ( ) , new Style ( Color . Yellow , Color . Red , Decoration . Bold ) ) ) ;
109
175
}
110
176
} ;
111
- newTask . StatusChanged += ( task ) =>
177
+ newTask . StatusChanged += async ( task ) =>
112
178
{
113
179
Logging ? . Log ( $ "{ task . Name } ({ task . IP } ) { task . State . ToChineseString ( ) } { ( int ) task . Delay . TotalMilliseconds } ms") ;
114
180
115
181
TasksTable . Rows . Update ( TaskMap [ task . IP ] , 2 , task . State == IPStatus . Success ? new Text ( task . State . ToChineseString ( ) ) : new Text ( task . State . ToChineseString ( ) , new Style ( Color . Yellow , Color . Red , Decoration . Bold ) ) ) ;
116
182
117
183
task . LastLog = $ "{ task . State . ToChineseString ( ) } [{ DateTime . Now : yyyy-MM-dd HH:mm:ss} ] ";
118
- if ( task . State != IPStatus . Success ) task . Warnings . Enqueue ( $ "{ task . State . ToChineseString ( ) } [{ DateTime . Now : yyyy-MM-dd HH:mm:ss} ]") ;
184
+ if ( task . State != IPStatus . Success ) await task . Warnings . EnqueueAsync ( $ "{ task . State . ToChineseString ( ) } [{ DateTime . Now : yyyy-MM-dd HH:mm:ss} ]") ;
119
185
} ;
120
186
newTask . DelayExceptionOccurred += ( task ) =>
121
187
{
122
188
Logging ? . Log ( $ "{ task . Name } ({ task . IP } ) 延迟波动 { ( int ) task . PreviousDelay . TotalMilliseconds } ms -> { ( int ) task . Delay . TotalMilliseconds } ms") ;
123
189
124
190
task . LastLog = $ "延迟波动 { ( int ) task . PreviousDelay . TotalMilliseconds } ms -> { ( int ) task . Delay . TotalMilliseconds } ms";
125
191
} ;
126
- newTask . RecentLossRateRecorded += ( task ) =>
127
- {
128
- if ( task . RecentLossRate > ( double ) 1 / task . MaxRecentPackets ) Logging ? . Log ( $ "丢包率 { task . RecentLossRate : F2} %") ;
129
-
130
- task . LastLog = $ "丢包率 { task . RecentLossRate : F2} %";
131
- } ;
192
+
132
193
Tasks . Add ( newTask ) ;
133
194
}
134
195
}
@@ -143,18 +204,15 @@ public class ICMPTestTask
143
204
public event Action < ICMPTestTask > ? StatusChanged ;
144
205
public event Action < ICMPTestTask > ? DelayChanged ;
145
206
public event Action < ICMPTestTask > ? DelayExceptionOccurred ;
146
- public event Action < ICMPTestTask > ? RecentLossRateRecorded ;
147
207
148
208
public string Name { get ; init ; }
149
209
public string IP { get ; init ; }
150
210
public int MaxRecentPackets { get ; init ; }
151
211
public TimeSpan SignificantDelayThreshold { get ; init ; }
152
212
public TimeSpan PreviousDelay { get ; set ; } = DefaultDelay ;
153
- public ObservableQueue < string > Warnings { get ; set ; } = new ( ) ;
154
- private Queue < IPStatus > RecentPackets { get ; set ; } = new ( ) ;
155
-
213
+ public ObservableQueue < string > Warnings { get ; set ; }
214
+ private ObservableQueue < IPStatus > RecentPackets { get ; set ; }
156
215
public bool IsWarning => Warnings . Count > 0 ;
157
- public double RecentLossRate => RecentPackets . Count == 0 ? 0 : ( double ) RecentPackets . Count ( status => status != IPStatus . Success ) / RecentPackets . Count * 100 ;
158
216
159
217
private string ? lastLog ;
160
218
public string LastLog
@@ -220,21 +278,17 @@ private bool IsSignificantDelayChange()
220
278
&& Delay > PreviousDelay
221
279
&& ( Delay - PreviousDelay ) . Duration ( ) > SignificantDelayThreshold ;
222
280
}
223
- private void UpdateRecentPackets ( IPStatus state )
224
- {
225
- RecentPackets . Enqueue ( state ) ;
226
- if ( RecentPackets . Count > MaxRecentPackets )
227
- {
228
- RecentPackets . Dequeue ( ) ;
229
- }
230
- }
281
+
231
282
public ICMPTestTask ( CancellationTokenSource CTS , string name , string ip , int timeout , int maxRecentPackets , int significantDelayThreshold )
232
283
{
233
284
Name = name ;
234
285
IP = ip ;
235
286
MaxRecentPackets = maxRecentPackets ;
236
287
SignificantDelayThreshold = TimeSpan . FromMilliseconds ( significantDelayThreshold ) ;
237
288
289
+ Warnings = new ( ) ;
290
+ RecentPackets = new ( maxRecentPackets ) ;
291
+
238
292
Warnings . Enqueued += warning => OpenWarning ? . Invoke ( this ) ;
239
293
Warnings . Dequeued += warning => ConfirmWarning ? . Invoke ( this ) ;
240
294
@@ -251,24 +305,22 @@ public ICMPTestTask(CancellationTokenSource CTS, string name, string ip, int tim
251
305
stopwatch . Restart ( ) ;
252
306
reply = ping . Send ( IP , timeout ) ;
253
307
stopwatch . Stop ( ) ;
254
- State = reply . Status ;
255
308
Delay = State == IPStatus . Success
256
309
? TimeSpan . FromMilliseconds ( reply . RoundtripTime )
257
310
: DefaultDelay ;
311
+ State = reply . Status ;
258
312
}
259
313
catch
260
314
{
261
- State = IPStatus . Unknown ;
262
315
Delay = DefaultDelay ;
263
- RecentLossRateRecorded ? . Invoke ( this ) ;
316
+ State = IPStatus . Unknown ;
264
317
}
265
318
266
- UpdateRecentPackets ( State ) ;
319
+ await RecentPackets . EnqueueAsync ( State ) ;
267
320
268
321
pingCounter ++ ;
269
322
if ( pingCounter == MaxRecentPackets )
270
323
{
271
- RecentLossRateRecorded ? . Invoke ( this ) ;
272
324
pingCounter = 0 ;
273
325
}
274
326
@@ -356,6 +408,8 @@ public struct ICMPTaskConfig
356
408
private int ? timeout ;
357
409
private int ? maxRecentPackets ;
358
410
private int ? significantDelayThreshold ;
411
+ private double ? packetLossDuration ;
412
+ private int ? packetLossCount ;
359
413
360
414
public string Name
361
415
{
@@ -411,6 +465,26 @@ public int SignificantDelayThreshold
411
465
significantDelayThreshold = value ;
412
466
}
413
467
}
468
+ public TimeSpan PacketLossDuration
469
+ {
470
+ get => TimeSpan . FromSeconds ( packetLossDuration ?? ( Timeout / 1000 ) * 30 ) ;
471
+ set
472
+ {
473
+ if ( value . TotalMilliseconds < Timeout )
474
+ throw new ArgumentOutOfRangeException ( nameof ( PacketLossDuration ) , $ "PacketLossDuration must be greater than { nameof ( Timeout ) } .") ;
475
+ packetLossDuration = value . TotalSeconds ;
476
+ }
477
+ }
478
+ public int PacketLossCount
479
+ {
480
+ get => packetLossCount ?? 5 ;
481
+ set
482
+ {
483
+ if ( value < 0 )
484
+ throw new ArgumentOutOfRangeException ( nameof ( PacketLossCount ) , "PacketLossCount must be non-negative." ) ;
485
+ packetLossCount = value ;
486
+ }
487
+ }
414
488
415
489
public ICMPTaskConfig ( params string [ ] raw )
416
490
{
@@ -423,6 +497,8 @@ public ICMPTaskConfig(params string[] raw)
423
497
if ( raw . Length > 2 && int . TryParse ( raw [ 2 ] , out int parsedTimeout ) ) Timeout = parsedTimeout ;
424
498
if ( raw . Length > 3 && int . TryParse ( raw [ 3 ] , out int parsedMaxRecentPackets ) ) MaxRecentPackets = parsedMaxRecentPackets ;
425
499
if ( raw . Length > 4 && int . TryParse ( raw [ 4 ] , out int parsedSignificantDelayThreshold ) ) SignificantDelayThreshold = parsedSignificantDelayThreshold ;
500
+ if ( raw . Length > 5 && double . TryParse ( raw [ 5 ] , out double parsedPacketLossDuration ) ) PacketLossDuration = TimeSpan . FromSeconds ( parsedPacketLossDuration ) ;
501
+ if ( raw . Length > 6 && int . TryParse ( raw [ 6 ] , out int parsedPacketLossCount ) ) PacketLossCount = parsedPacketLossCount ;
426
502
}
427
503
428
504
private static bool IsValidIP ( string ip )
@@ -457,9 +533,9 @@ static void Main(string[] args)
457
533
{
458
534
ICMPMonitor monitor = new ( CTS , Logging ) ;
459
535
string [ ] configLines = File . ReadAllLines ( configPath ) . Select ( line => line . Trim ( ) ) . Where ( line => line . Split ( ' ' ) . Length > 1 ) . ToArray ( ) ;
460
- foreach ( var line in configLines . Select ( line => new ICMPTaskConfig ( line . Split ( ' ' ) ) ) )
536
+ foreach ( var item in configLines . Select ( line => new ICMPTaskConfig ( line . Split ( ' ' ) ) ) )
461
537
{
462
- monitor . AddHost ( new ( CTS , line ) ) ;
538
+ monitor . AddHost ( new ( CTS , item ) ) ;
463
539
}
464
540
AnsiConsole . Clear ( ) ;
465
541
AnsiConsole . WriteLine ( ) ;
@@ -480,18 +556,11 @@ static void Main(string[] args)
480
556
CTS . Cancel ( ) ;
481
557
break ;
482
558
}
483
- if ( key . Key == ConsoleKey . L )
484
- {
485
- foreach ( var task in monitor . Tasks . FindAll ( i => ! i . IsWarning ) )
486
- {
487
- task . LastLog = $ "丢包率 { task . RecentLossRate : F2} %";
488
- }
489
- }
490
559
if ( key . Key == ConsoleKey . C )
491
560
{
492
561
foreach ( var task in monitor . Tasks . FindAll ( i => i . IsWarning ) )
493
562
{
494
- task . Warnings . Dequeue ( ) ;
563
+ _ = task . Warnings . DequeueAsync ( ) ;
495
564
}
496
565
}
497
566
}
0 commit comments