Skip to content

Commit 88ff8ec

Browse files
committed
Revert "Rewrite TBQueue to use TArray Int (Maybe a)"
This reverts commit 9821578.
1 parent a1e91f4 commit 88ff8ec

File tree

7 files changed

+278
-129
lines changed

7 files changed

+278
-129
lines changed

Control/Concurrent/STM/TBQueue.hs

Lines changed: 142 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -46,92 +46,101 @@ module Control.Concurrent.STM.TBQueue (
4646
capacityTBQueue,
4747
) where
4848

49-
#if !MIN_VERSION_base(4,8,0)
50-
import Control.Applicative (pure)
51-
#endif
52-
import Data.Array.Base
53-
import Data.Maybe (isJust, isNothing)
54-
import Data.Typeable (Typeable)
55-
import GHC.Conc
56-
import Numeric.Natural (Natural)
57-
import Prelude hiding (read)
58-
59-
import Control.Concurrent.STM.TArray
49+
import Control.Monad (unless)
50+
import Data.Typeable (Typeable)
51+
import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse,
52+
readTVar, retry, writeTVar)
53+
import Numeric.Natural (Natural)
54+
import Prelude hiding (read)
6055

6156
-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
6257
--
6358
-- @since 2.4
6459
data TBQueue a
65-
= TBQueue {-# UNPACK #-} !(TVar Int) -- read index
66-
{-# UNPACK #-} !(TVar Int) -- write index
67-
{-# UNPACK #-} !(TArray Int (Maybe a)) -- elements
68-
{-# UNPACK #-} !Int -- initial capacity
60+
= TBQueue {-# UNPACK #-} !(TVar Natural) -- CR: read capacity
61+
{-# UNPACK #-} !(TVar [a]) -- R: elements waiting to be read
62+
{-# UNPACK #-} !(TVar Natural) -- CW: write capacity
63+
{-# UNPACK #-} !(TVar [a]) -- W: elements written (head is most recent)
64+
!(Natural) -- CAP: initial capacity
6965
deriving Typeable
7066

7167
instance Eq (TBQueue a) where
72-
-- each `TBQueue` has its own `TVar`s, so it's sufficient to compare the first one
73-
TBQueue a _ _ _ == TBQueue b _ _ _ = a == b
74-
75-
-- incMod x cap == (x + 1) `mod` cap
76-
incMod :: Int -> Int -> Int
77-
incMod x cap = let y = x + 1 in if y == cap then 0 else y
68+
TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b
7869

79-
-- decMod x cap = (x - 1) `mod` cap
80-
decMod :: Int -> Int -> Int
81-
decMod x cap = if x == 0 then cap - 1 else x - 1
70+
-- Total channel capacity remaining is CR + CW. Reads only need to
71+
-- access CR, writes usually need to access only CW but sometimes need
72+
-- CR. So in the common case we avoid contention between CR and CW.
73+
--
74+
-- - when removing an element from R:
75+
-- CR := CR + 1
76+
--
77+
-- - when adding an element to W:
78+
-- if CW is non-zero
79+
-- then CW := CW - 1
80+
-- then if CR is non-zero
81+
-- then CW := CR - 1; CR := 0
82+
-- else **FULL**
8283

8384
-- | Builds and returns a new instance of 'TBQueue'.
8485
newTBQueue :: Natural -- ^ maximum number of elements the queue can hold
8586
-> STM (TBQueue a)
86-
newTBQueue cap
87-
| cap <= 0 = error "capacity has to be greater than 0"
88-
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
89-
| otherwise = do
90-
rindex <- newTVar 0
91-
windex <- newTVar 0
92-
elements <- newArray (0, cap' - 1) Nothing
93-
pure (TBQueue rindex windex elements cap')
94-
where
95-
cap' = fromIntegral cap
87+
newTBQueue size = do
88+
read <- newTVar []
89+
write <- newTVar []
90+
rsize <- newTVar 0
91+
wsize <- newTVar size
92+
return (TBQueue rsize read wsize write size)
9693

9794
-- | @IO@ version of 'newTBQueue'. This is useful for creating top-level
9895
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
9996
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
10097
-- possible.
10198
newTBQueueIO :: Natural -> IO (TBQueue a)
102-
newTBQueueIO cap
103-
| cap <= 0 = error "capacity has to be greater than 0"
104-
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
105-
| otherwise = do
106-
rindex <- newTVarIO 0
107-
windex <- newTVarIO 0
108-
elements <- newArray (0, cap' - 1) Nothing
109-
pure (TBQueue rindex windex elements cap')
110-
where
111-
cap' = fromIntegral cap
112-
113-
-- | Write a value to a 'TBQueue'; retries if the queue is full.
99+
newTBQueueIO size = do
100+
read <- newTVarIO []
101+
write <- newTVarIO []
102+
rsize <- newTVarIO 0
103+
wsize <- newTVarIO size
104+
return (TBQueue rsize read wsize write size)
105+
106+
-- |Write a value to a 'TBQueue'; blocks if the queue is full.
114107
writeTBQueue :: TBQueue a -> a -> STM ()
115-
writeTBQueue (TBQueue _ windex elements cap) a = do
116-
w <- readTVar windex
117-
ele <- unsafeRead elements w
118-
case ele of
119-
Nothing -> unsafeWrite elements w (Just a)
120-
Just _ -> retry
121-
writeTVar windex $! incMod w cap
122-
123-
-- | Read the next value from the 'TBQueue'; retries if the queue is empty.
108+
writeTBQueue (TBQueue rsize _read wsize write _size) a = do
109+
w <- readTVar wsize
110+
if (w > 0)
111+
then do writeTVar wsize $! w - 1
112+
else do
113+
r <- readTVar rsize
114+
if (r > 0)
115+
then do writeTVar rsize 0
116+
writeTVar wsize $! r - 1
117+
else retry
118+
listend <- readTVar write
119+
writeTVar write (a:listend)
120+
121+
-- |Read the next value from the 'TBQueue'.
124122
readTBQueue :: TBQueue a -> STM a
125-
readTBQueue (TBQueue rindex _ elements cap) = do
126-
r <- readTVar rindex
127-
ele <- unsafeRead elements r
128-
a <- case ele of
129-
Nothing -> retry
130-
Just a -> do
131-
unsafeWrite elements r Nothing
132-
pure a
133-
writeTVar rindex $! incMod r cap
134-
pure a
123+
readTBQueue (TBQueue rsize read _wsize write _size) = do
124+
xs <- readTVar read
125+
r <- readTVar rsize
126+
writeTVar rsize $! r + 1
127+
case xs of
128+
(x:xs') -> do
129+
writeTVar read xs'
130+
return x
131+
[] -> do
132+
ys <- readTVar write
133+
case ys of
134+
[] -> retry
135+
_ -> do
136+
-- NB. lazy: we want the transaction to be
137+
-- short, otherwise it will conflict
138+
let ~(z,zs) = case reverse ys of
139+
z':zs' -> (z',zs')
140+
_ -> error "readTBQueue: impossible"
141+
writeTVar write []
142+
writeTVar read zs
143+
return z
135144

136145
-- | A version of 'readTBQueue' which does not retry. Instead it
137146
-- returns @Nothing@ if no value is available.
@@ -142,89 +151,99 @@ tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` pure Nothing
142151
-- function never retries.
143152
--
144153
-- @since 2.4.5
145-
flushTBQueue :: forall a. TBQueue a -> STM [a]
146-
flushTBQueue (TBQueue _rindex windex elements cap) = do
147-
w <- readTVar windex
148-
go (decMod w cap) []
149-
where
150-
go :: Int -> [a] -> STM [a]
151-
go i acc = do
152-
ele <- unsafeRead elements i
153-
case ele of
154-
Nothing -> pure acc
155-
Just a -> do
156-
unsafeWrite elements i Nothing
157-
go (decMod i cap) (a : acc)
154+
flushTBQueue :: TBQueue a -> STM [a]
155+
flushTBQueue (TBQueue rsize read wsize write size) = do
156+
xs <- readTVar read
157+
ys <- readTVar write
158+
if null xs && null ys
159+
then return []
160+
else do
161+
unless (null xs) $ writeTVar read []
162+
unless (null ys) $ writeTVar write []
163+
writeTVar rsize 0
164+
writeTVar wsize size
165+
return (xs ++ reverse ys)
158166

159167
-- | Get the next value from the @TBQueue@ without removing it,
160-
-- retrying if the queue is empty.
168+
-- retrying if the channel is empty.
161169
peekTBQueue :: TBQueue a -> STM a
162-
peekTBQueue (TBQueue rindex _ elements _) = do
163-
r <- readTVar rindex
164-
ele <- unsafeRead elements r
165-
case ele of
166-
Nothing -> retry
167-
Just a -> pure a
170+
peekTBQueue (TBQueue _ read _ write _) = do
171+
xs <- readTVar read
172+
case xs of
173+
(x:_) -> return x
174+
[] -> do
175+
ys <- readTVar write
176+
case ys of
177+
[] -> retry
178+
_ -> do
179+
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
180+
-- short, otherwise it will conflict
181+
writeTVar write []
182+
writeTVar read (z:zs)
183+
return z
168184

169185
-- | A version of 'peekTBQueue' which does not retry. Instead it
170186
-- returns @Nothing@ if no value is available.
171187
tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
172-
tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` pure Nothing
188+
tryPeekTBQueue c = do
189+
m <- tryReadTBQueue c
190+
case m of
191+
Nothing -> return Nothing
192+
Just x -> do
193+
unGetTBQueue c x
194+
return m
173195

174196
-- | Put a data item back onto a channel, where it will be the next item read.
175-
-- Retries if the queue is full.
197+
-- Blocks if the queue is full.
176198
unGetTBQueue :: TBQueue a -> a -> STM ()
177-
unGetTBQueue (TBQueue rindex _ elements cap) a = do
178-
r <- readTVar rindex
179-
ele <- unsafeRead elements r
180-
case ele of
181-
Nothing -> unsafeWrite elements r (Just a)
182-
Just _ -> retry
183-
writeTVar rindex $! decMod r cap
199+
unGetTBQueue (TBQueue rsize read wsize _write _size) a = do
200+
r <- readTVar rsize
201+
if (r > 0)
202+
then do writeTVar rsize $! r - 1
203+
else do
204+
w <- readTVar wsize
205+
if (w > 0)
206+
then writeTVar wsize $! w - 1
207+
else retry
208+
xs <- readTVar read
209+
writeTVar read (a:xs)
184210

185211
-- | Return the length of a 'TBQueue'.
186212
--
187213
-- @since 2.5.0.0
188214
lengthTBQueue :: TBQueue a -> STM Natural
189-
lengthTBQueue (TBQueue rindex windex elements cap) = do
190-
r <- readTVar rindex
191-
w <- readTVar windex
192-
if w == r then do
193-
-- length is 0 or cap
194-
ele <- unsafeRead elements r
195-
case ele of
196-
Nothing -> pure 0
197-
Just _ -> pure $! fromIntegral cap
198-
else do
199-
let len' = w - r
200-
pure $! fromIntegral (if len' < 0 then len' + cap else len')
215+
lengthTBQueue (TBQueue rsize _read wsize _write size) = do
216+
r <- readTVar rsize
217+
w <- readTVar wsize
218+
return $! size - r - w
201219

202220
-- | Returns 'True' if the supplied 'TBQueue' is empty.
203221
isEmptyTBQueue :: TBQueue a -> STM Bool
204-
isEmptyTBQueue (TBQueue rindex windex elements _) = do
205-
r <- readTVar rindex
206-
w <- readTVar windex
207-
if w == r then do
208-
ele <- unsafeRead elements r
209-
pure $! isNothing ele
210-
else
211-
pure False
222+
isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do
223+
xs <- readTVar read
224+
case xs of
225+
(_:_) -> return False
226+
[] -> do ys <- readTVar write
227+
case ys of
228+
[] -> return True
229+
_ -> return False
212230

213231
-- | Returns 'True' if the supplied 'TBQueue' is full.
214232
--
215233
-- @since 2.4.3
216234
isFullTBQueue :: TBQueue a -> STM Bool
217-
isFullTBQueue (TBQueue rindex windex elements _) = do
218-
r <- readTVar rindex
219-
w <- readTVar windex
220-
if w == r then do
221-
ele <- unsafeRead elements r
222-
pure $! isJust ele
223-
else
224-
pure False
235+
isFullTBQueue (TBQueue rsize _read wsize _write _size) = do
236+
w <- readTVar wsize
237+
if (w > 0)
238+
then return False
239+
else do
240+
r <- readTVar rsize
241+
if (r > 0)
242+
then return False
243+
else return True
225244

226245
-- | The maximum number of elements the queue can hold.
227246
--
228247
-- @since TODO
229248
capacityTBQueue :: TBQueue a -> Natural
230-
capacityTBQueue (TBQueue _ _ _ cap) = fromIntegral cap
249+
capacityTBQueue (TBQueue _ _ _ _ cap) = fromIntegral cap

bench/chanbench.hs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
{-# LANGUAGE CPP, RankNTypes #-}
2+
import Control.Concurrent.Async
3+
import Control.Monad
4+
import System.Environment
5+
6+
import Control.Concurrent.Chan
7+
import Control.Concurrent.STM
8+
import Control.Concurrent.STM.TQueue
9+
import Control.Concurrent.STM.TBQueue
10+
11+
-- Using CPP rather than a runtime choice between channel types,
12+
-- because we want the compiler to be able to optimise the calls.
13+
14+
-- #define CHAN
15+
-- #define TCHAN
16+
-- #define TQUEUE
17+
-- #define TBQUEUE
18+
19+
#ifdef CHAN
20+
newc = newChan
21+
readc c = readChan c
22+
writec c x = writeChan c x
23+
#elif defined(TCHAN)
24+
newc = newTChanIO
25+
readc c = atomically $ readTChan c
26+
writec c x = atomically $ writeTChan c x
27+
#elif defined(TQUEUE)
28+
newc = newTQueueIO
29+
readc c = atomically $ readTQueue c
30+
writec c x = atomically $ writeTQueue c x
31+
#elif defined(TBQUEUE)
32+
newc = newTBQueueIO 4096
33+
readc c = atomically $ readTBQueue c
34+
writec c x = atomically $ writeTBQueue c x
35+
#endif
36+
37+
main = do
38+
[stest,sn] <- getArgs -- 2000000 is a good number
39+
let n = read sn :: Int
40+
test = read stest :: Int
41+
runtest n test
42+
43+
runtest :: Int -> Int -> IO ()
44+
runtest n test = do
45+
c <- newc
46+
case test of
47+
0 -> do
48+
a <- async $ replicateM_ n $ writec c (1 :: Int)
49+
b <- async $ replicateM_ n $ readc c
50+
waitBoth a b
51+
return ()
52+
1 -> do
53+
replicateM_ n $ writec c (1 :: Int)
54+
replicateM_ n $ readc c
55+
2 -> do
56+
let n1000 = n `quot` 1000
57+
replicateM_ 1000 $ do
58+
replicateM_ n1000 $ writec c (1 :: Int)
59+
replicateM_ n1000 $ readc c

cabal.project

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
packages:
2-
.
3-
testsuite
4-
bench
1+
packages: . testsuite/
52

63
package testsuite
74
tests: true

hie.yaml

Lines changed: 0 additions & 2 deletions
This file was deleted.

0 commit comments

Comments
 (0)