@@ -9,7 +9,7 @@ use std::thread;
9
9
use std:: time:: { Duration , Instant } ;
10
10
11
11
use anyhow:: { anyhow, Result } ;
12
- use crossbeam_channel:: { bounded , Receiver , RecvTimeoutError , Sender } ;
12
+ use crossbeam_channel:: { unbounded , Receiver , RecvTimeoutError , Sender } ;
13
13
use etcetera:: BaseStrategy ;
14
14
use ignore:: overrides:: { Override , OverrideBuilder } ;
15
15
use ignore:: { self , WalkBuilder , WalkParallel , WalkState } ;
@@ -43,6 +43,77 @@ pub enum WorkerResult {
43
43
Error ( ignore:: Error ) ,
44
44
}
45
45
46
+ /// Storage for a WorkerResult.
47
+ type ResultBox = Box < Option < WorkerResult > > ;
48
+
49
+ /// A WorkerResult that recycles itself.
50
+ pub struct WorkerMsg {
51
+ inner : Option < ResultBox > ,
52
+ tx : Sender < ResultBox > ,
53
+ }
54
+
55
+ impl WorkerMsg {
56
+ /// Create a new message.
57
+ fn new ( inner : ResultBox , tx : Sender < ResultBox > ) -> Self {
58
+ Self {
59
+ inner : Some ( inner) ,
60
+ tx,
61
+ }
62
+ }
63
+
64
+ /// Extract the result from this message.
65
+ pub fn take ( mut self ) -> WorkerResult {
66
+ self . inner . as_mut ( ) . unwrap ( ) . take ( ) . unwrap ( )
67
+ }
68
+ }
69
+
70
+ impl Drop for WorkerMsg {
71
+ fn drop ( & mut self ) {
72
+ let _ = self . tx . send ( self . inner . take ( ) . unwrap ( ) ) ;
73
+ }
74
+ }
75
+
76
+ /// A pool of WorkerResults that can be recycled.
77
+ struct ResultPool {
78
+ size : usize ,
79
+ tx : Sender < ResultBox > ,
80
+ rx : Receiver < ResultBox > ,
81
+ }
82
+
83
+ /// Capacity was chosen empircally to perform similarly to an unbounded channel
84
+ const RESULT_POOL_CAPACITY : usize = 0x4000 ;
85
+
86
+ impl ResultPool {
87
+ /// Create an empty pool.
88
+ fn new ( ) -> Self {
89
+ let ( tx, rx) = unbounded ( ) ;
90
+
91
+ Self { size : 0 , tx, rx }
92
+ }
93
+
94
+ /// Allocate or recycle a WorkerResult from the pool.
95
+ fn get ( & mut self , result : WorkerResult ) -> WorkerMsg {
96
+ let inner = if self . size < RESULT_POOL_CAPACITY {
97
+ match self . rx . try_recv ( ) {
98
+ Ok ( mut inner) => {
99
+ * inner = Some ( result) ;
100
+ inner
101
+ }
102
+ Err ( _) => {
103
+ self . size += 1 ;
104
+ Box :: new ( Some ( result) )
105
+ }
106
+ }
107
+ } else {
108
+ let mut inner = self . rx . recv ( ) . unwrap ( ) ;
109
+ * inner = Some ( result) ;
110
+ inner
111
+ } ;
112
+
113
+ WorkerMsg :: new ( inner, self . tx . clone ( ) )
114
+ }
115
+ }
116
+
46
117
/// Maximum size of the output buffer before flushing results to the console
47
118
const MAX_BUFFER_LENGTH : usize = 1000 ;
48
119
/// Default duration until output buffering switches to streaming.
@@ -56,8 +127,8 @@ struct ReceiverBuffer<'a, W> {
56
127
quit_flag : & ' a AtomicBool ,
57
128
/// The ^C notifier.
58
129
interrupt_flag : & ' a AtomicBool ,
59
- /// Receiver for worker results .
60
- rx : Receiver < WorkerResult > ,
130
+ /// Receiver for worker messages .
131
+ rx : Receiver < WorkerMsg > ,
61
132
/// Standard output.
62
133
stdout : W ,
63
134
/// The current buffer mode.
@@ -72,7 +143,7 @@ struct ReceiverBuffer<'a, W> {
72
143
73
144
impl < ' a , W : Write > ReceiverBuffer < ' a , W > {
74
145
/// Create a new receiver buffer.
75
- fn new ( state : & ' a WorkerState , rx : Receiver < WorkerResult > , stdout : W ) -> Self {
146
+ fn new ( state : & ' a WorkerState , rx : Receiver < WorkerMsg > , stdout : W ) -> Self {
76
147
let config = & state. config ;
77
148
let quit_flag = state. quit_flag . as_ref ( ) ;
78
149
let interrupt_flag = state. interrupt_flag . as_ref ( ) ;
@@ -104,7 +175,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
104
175
105
176
/// Receive the next worker result.
106
177
fn recv ( & self ) -> Result < WorkerResult , RecvTimeoutError > {
107
- match self . mode {
178
+ let result = match self . mode {
108
179
ReceiverMode :: Buffering => {
109
180
// Wait at most until we should switch to streaming
110
181
self . rx . recv_deadline ( self . deadline )
@@ -113,7 +184,8 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
113
184
// Wait however long it takes for a result
114
185
Ok ( self . rx . recv ( ) ?)
115
186
}
116
- }
187
+ } ;
188
+ result. map ( WorkerMsg :: take)
117
189
}
118
190
119
191
/// Wait for a result or state change.
@@ -319,7 +391,7 @@ impl WorkerState {
319
391
320
392
/// Run the receiver work, either on this thread or a pool of background
321
393
/// threads (for --exec).
322
- fn receive ( & self , rx : Receiver < WorkerResult > ) -> ExitCode {
394
+ fn receive ( & self , rx : Receiver < WorkerMsg > ) -> ExitCode {
323
395
let config = & self . config ;
324
396
325
397
// This will be set to `Some` if the `--exec` argument was supplied.
@@ -355,12 +427,13 @@ impl WorkerState {
355
427
}
356
428
357
429
/// Spawn the sender threads.
358
- fn spawn_senders ( & self , walker : WalkParallel , tx : Sender < WorkerResult > ) {
430
+ fn spawn_senders ( & self , walker : WalkParallel , tx : Sender < WorkerMsg > ) {
359
431
walker. run ( || {
360
432
let patterns = & self . patterns ;
361
433
let config = & self . config ;
362
434
let quit_flag = self . quit_flag . as_ref ( ) ;
363
435
let tx = tx. clone ( ) ;
436
+ let mut pool = ResultPool :: new ( ) ;
364
437
365
438
Box :: new ( move |entry| {
366
439
if quit_flag. load ( Ordering :: Relaxed ) {
@@ -387,20 +460,22 @@ impl WorkerState {
387
460
DirEntry :: broken_symlink ( path)
388
461
}
389
462
_ => {
390
- return match tx . send ( WorkerResult :: Error ( ignore:: Error :: WithPath {
463
+ let result = pool . get ( WorkerResult :: Error ( ignore:: Error :: WithPath {
391
464
path,
392
465
err : inner_err,
393
- } ) ) {
466
+ } ) ) ;
467
+ return match tx. send ( result) {
394
468
Ok ( _) => WalkState :: Continue ,
395
469
Err ( _) => WalkState :: Quit ,
396
- }
470
+ } ;
397
471
}
398
472
} ,
399
473
Err ( err) => {
400
- return match tx. send ( WorkerResult :: Error ( err) ) {
474
+ let result = pool. get ( WorkerResult :: Error ( err) ) ;
475
+ return match tx. send ( result) {
401
476
Ok ( _) => WalkState :: Continue ,
402
477
Err ( _) => WalkState :: Quit ,
403
- }
478
+ } ;
404
479
}
405
480
} ;
406
481
@@ -509,7 +584,8 @@ impl WorkerState {
509
584
}
510
585
}
511
586
512
- let send_result = tx. send ( WorkerResult :: Entry ( entry) ) ;
587
+ let result = pool. get ( WorkerResult :: Entry ( entry) ) ;
588
+ let send_result = tx. send ( result) ;
513
589
514
590
if send_result. is_err ( ) {
515
591
return WalkState :: Quit ;
@@ -545,8 +621,7 @@ impl WorkerState {
545
621
. unwrap ( ) ;
546
622
}
547
623
548
- // Channel capacity was chosen empircally to perform similarly to an unbounded channel
549
- let ( tx, rx) = bounded ( 0x4000 * config. threads ) ;
624
+ let ( tx, rx) = unbounded ( ) ;
550
625
551
626
let exit_code = thread:: scope ( |scope| {
552
627
// Spawn the receiver thread(s)
0 commit comments