Skip to content

Commit c260458

Browse files
committed
Should make fork join more stable
1 parent 86630b6 commit c260458

File tree

2 files changed

+52
-10
lines changed

2 files changed

+52
-10
lines changed

sitelib/util/concurrent/fork-join-pool.scm

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
shutdown)
8484
(protocol (lambda (p)
8585
(lambda (pool)
86-
(p (make-notifier)
86+
(p (make-event)
8787
pool
8888
(make-shared-queue)
8989
(make-atomic #f))))))
@@ -94,9 +94,9 @@
9494
(define (worker-queue-empty? wq) (shared-queue-empty? (worker-queue-queue wq)))
9595
(define (worker-queue-size wq) (shared-queue-size (worker-queue-queue wq)))
9696
(define (worker-queue-notify! wq)
97-
(notifier-send-notification! (worker-queue-notifier wq)))
97+
(event-set-event! (worker-queue-notifier wq)))
9898
(define (worker-queue-wait! wq . opts)
99-
(apply notifier-wait-notification! (worker-queue-notifier wq) opts))
99+
(apply event-receive! (worker-queue-notifier wq) opts))
100100
(define (worker-queue-shutdown! wq)
101101
(atomic-store! (worker-queue-shutdown wq) #t)
102102
(worker-queue-notify! wq))
@@ -157,7 +157,6 @@
157157
thread-count ;; # of worker threads
158158
idle-count ;; # of idle threads
159159
parameter
160-
notifier
161160
lock)
162161
(protocol (lambda (p)
163162
(lambda ((n (or integer? fork-join-pool-parameters?)))
@@ -173,7 +172,6 @@
173172
(make-atomic-fixnum 0)
174173
(make-atomic-fixnum 0)
175174
parameter
176-
(make-notifier)
177175
(make-mutex))))))
178176

179177
(define (fork-join-pool-keep-alive pool)
@@ -265,13 +263,17 @@
265263
(fork-join-pool-thread-count-dec! pool))
266264

267265
(define (fork-join-pool-run-worker pool wq)
268-
(define (run-task task wq)
266+
(define (run-task task root-wq wq)
269267
(when task
270268
(task)
271-
(run-task (worker-queue-pop! wq 0 #f) wq)))
269+
(run-task (or (worker-queue-pop! wq 0 #f)
270+
(worker-queue-pop! root-wq 0 #f))
271+
root-wq
272+
wq)))
272273
(define (->timeout pool)
273274
(add-duration (current-time) (fork-join-pool-keep-alive pool)))
274-
(run-task (worker-queue-pop! (fork-join-pool-worker-queue pool) 0 #f) wq)
275+
(let ((queue (fork-join-pool-worker-queue pool)))
276+
(run-task (worker-queue-pop! queue 0 #f) queue wq))
275277
(fork-join-pool-idle-count-inc! pool)
276278
(cond ((and (worker-queue-wait! wq (->timeout pool))
277279
(not (worker-queue-shutdown? wq)))

sitelib/util/concurrent/notifier.scm

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,13 @@
3434
(rename (notifier <notifier>))
3535
notifier-waiting?
3636
notifier-send-notification!
37-
notifier-wait-notification!)
37+
notifier-wait-notification!
38+
39+
event? make-event
40+
(rename (event <event>))
41+
event-set-event!
42+
event-receive!
43+
)
3844
(import (rnrs)
3945
(srfi :18)
4046
(util concurrent atomic))
@@ -55,9 +61,43 @@
5561

5662
(define (notifier-wait-notification! notifier . maybe-timeout)
5763
(define to (and (not (null? maybe-timeout)) (car maybe-timeout)))
64+
(define lock (notifier-lock notifier))
65+
(mutex-lock! lock)
5866
(atomic-fixnum-inc! (notifier-waiter notifier))
59-
(let ((r (mutex-unlock! (notifier-lock notifier) (notifier-cv notifier) to)))
67+
(let ((r (mutex-unlock! lock (notifier-cv notifier) to)))
6068
(atomic-fixnum-dec! (notifier-waiter notifier))
6169
r))
6270

71+
;; Similar to SetEvent on Win32 API
72+
;; It holds the value until the event is read
73+
(define-record-type event
74+
(fields lock cv (mutable received))
75+
(protocol (lambda (p)
76+
(lambda (:optional (set? #f))
77+
(p (make-mutex "event-lock")
78+
(make-condition-variable)
79+
set?)))))
80+
81+
(define (event-set-event! event)
82+
(define lock (event-lock event))
83+
(mutex-lock! lock)
84+
(event-received-set! event #t)
85+
(condition-variable-broadcast! (event-cv event))
86+
(mutex-unlock! lock))
87+
88+
(define (event-receive! event . maybe-timeout)
89+
(define to (and (not (null? maybe-timeout)) (car maybe-timeout)))
90+
(define lock (event-lock event))
91+
(mutex-lock! lock)
92+
(cond ((event-received event)
93+
(event-received-set! event #f)
94+
(mutex-unlock! lock))
95+
((mutex-unlock! lock (event-cv event) to)
96+
(mutex-lock! lock)
97+
(let ((r (and (event-received event)
98+
(event-received-set! event #f))))
99+
(mutex-unlock! lock)
100+
r))
101+
(else #f)))
102+
63103
)

0 commit comments

Comments
 (0)