11{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
22{-# LANGUAGE CPP, DeriveDataTypeable #-}
3+ {-# LANGUAGE BangPatterns #-}
34
45#if __GLASGOW_HASKELL__ >= 701
56{-# LANGUAGE Trustworthy #-}
1718--
1819-- A 'TQueue' is like a 'TChan', with two important differences:
1920--
20- -- * it has faster throughput than both 'TChan' and 'Chan' (although
21- -- the costs are amortised, so the cost of individual operations
22- -- can vary a lot).
21+ -- * it has faster throughput than both 'TChan' and 'Chan'
2322--
2423-- * it does /not/ provide equivalents of the 'dupTChan' and
2524-- 'cloneTChan' operations.
2625--
27- -- The implementation is based on the traditional purely-functional
28- -- queue representation that uses two lists to obtain amortised /O(1)/
29- -- enqueue and dequeue operations .
26+ -- The implementation is based on Okasaki's scheduled banker's queues,
27+ -- but it uses * two* schedules so there's only contention between the
28+ -- reader and writer when the queue needs to be rotated .
3029--
3130-- @since 2.4
3231-----------------------------------------------------------------------------
@@ -44,63 +43,109 @@ module Control.Concurrent.STM.TQueue (
4443 writeTQueue ,
4544 unGetTQueue ,
4645 isEmptyTQueue ,
47- ) where
46+ ) where
4847
4948import GHC.Conc
5049import Control.Monad (unless )
5150import Data.Typeable (Typeable )
5251
52+ data End a =
53+ End [a ] -- list
54+ [a ] -- schedule
55+
5356-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
5457--
5558-- @since 2.4
56- data TQueue a = TQueue {- # UNPACK #-} !(TVar [ a ] )
57- {- # UNPACK #-} !(TVar [ a ] )
59+ data TQueue a = TQueue {- # UNPACK #-} !(TVar ( End a ) )
60+ {- # UNPACK #-} !(TVar ( End a ) )
5861 deriving Typeable
62+ {-
63+ Invariant:
64+
65+ Given front list, rear list, front schedule, and rear schedule called
66+ front, rear, fsched, and rsched, respectively,
67+
68+ 2 * (|front| - |rear|) = |fsched| + |rsched|
69+
70+ Note that because lengths cannot be negative, this implies that
71+
72+ |front| >= |rear|
73+
74+ We rotate the queue when either schedule is empty. This preserves
75+ the invariant and ensures that the spine of the front list is
76+ fully realized when a rotation occurs. The spine of the rear list
77+ is *always* fully realized. We could use a strict-spined list for
78+ the rear, but it doesn't really seem to be worth the trouble.
79+ -}
5980
6081instance Eq (TQueue a ) where
6182 TQueue a _ == TQueue b _ = a == b
6283
63- -- | Build and returns a new instance of 'TQueue'
84+ -- | Build and returns a new instance of 'TQueue'
6485newTQueue :: STM (TQueue a )
6586newTQueue = do
66- read <- newTVar []
67- write <- newTVar []
87+ read <- newTVar ( End [] [] )
88+ write <- newTVar ( End [] [] )
6889 return (TQueue read write)
6990
70- -- | @IO@ version of 'newTQueue'. This is useful for creating top-level
91+ -- | @IO@ version of 'newTQueue'. This is useful for creating top-level
7192-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
7293-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
7394-- possible.
7495newTQueueIO :: IO (TQueue a )
7596newTQueueIO = do
76- read <- newTVarIO []
77- write <- newTVarIO []
97+ read <- newTVarIO ( End [] [] )
98+ write <- newTVarIO ( End [] [] )
7899 return (TQueue read write)
79100
80- -- | Write a value to a 'TQueue'.
101+ -- rotate front end = front ++ reverse rear, but the reverse is performed
102+ -- incrementally as the append proceeds.
103+ --
104+ -- Precondition: |front| + 1 >= |rear|. This ensures that when the front
105+ -- list is empty, the rear list has at most one element, so we don't need
106+ -- to reverse it.
107+ rotate :: [a ] -> [a ] -> [a ]
108+ rotate = go []
109+ where
110+ go acc [] rear = rear ++ acc
111+ go acc (x: xs) (r: rs)
112+ = x : go (r: acc) xs rs
113+ go acc xs [] = xs ++ acc
114+
115+ -- | Write a value to a 'TQueue'.
81116writeTQueue :: TQueue a -> a -> STM ()
82- writeTQueue (TQueue _read write) a = do
83- listend <- readTVar write
84- writeTVar write (a: listend)
85-
86- -- | Read the next value from the 'TQueue'.
117+ writeTQueue (TQueue read write) a = do
118+ End listend rsched <- readTVar write
119+ let listend' = a : listend
120+ case rsched of
121+ -- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2
122+ _: _: rsched' -> writeTVar write (End listend' rsched')
123+
124+ -- Rotate the queue; the invariant holds trivially.
125+ _ -> do
126+ End listfront _fsched <- readTVar read
127+ let ! front' = rotate listfront listend'
128+ writeTVar read (End front' front')
129+ writeTVar write (End [] front')
130+
131+ -- | Read the next value from the 'TQueue'.
87132readTQueue :: TQueue a -> STM a
88133readTQueue (TQueue read write) = do
89- xs <- readTVar read
90- case xs of
91- (x : xs') -> do
92- writeTVar read xs'
93- return x
94- [] -> do
95- ys <- readTVar write
96- case ys of
97- [] -> retry
98- _ -> do
99- let (z : zs) = reverse ys -- NB. lazy: we want the transaction to be
100- -- short, otherwise it will conflict
101- writeTVar write []
102- writeTVar read zs
103- return z
134+ End listfront fsched <- readTVar read
135+ case listfront of
136+ [] -> retry
137+ x : front' ->
138+ case fsched of
139+ -- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2
140+ _ : _ : fsched' -> writeTVar read ( End front' fsched') >> return x
141+
142+ -- Rotate the queue; the invariant holds trivially.
143+ _ -> do
144+ End listend _rsched <- readTVar write
145+ let ! front'' = rotate front' listend
146+ writeTVar read ( End front'' front'')
147+ writeTVar write ( End [] front'')
148+ return x
104149
105150-- | A version of 'readTQueue' which does not retry. Instead it
106151-- returns @Nothing@ if no value is available.
@@ -113,44 +158,38 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
113158-- @since 2.4.5
114159flushTQueue :: TQueue a -> STM [a ]
115160flushTQueue (TQueue read write) = do
116- xs <- readTVar read
117- ys <- readTVar write
118- unless (null xs ) $ writeTVar read []
119- unless (null ys ) $ writeTVar write []
120- return (xs ++ reverse ys )
161+ End front fsched <- readTVar read
162+ End rear rsched <- readTVar write
163+ unless (null front && null fsched ) $ writeTVar read ( End [] [] )
164+ unless (null rear && null rsched ) $ writeTVar write ( End [] [] )
165+ return (rotate front rear )
121166
122167-- | Get the next value from the @TQueue@ without removing it,
123168-- retrying if the channel is empty.
124169peekTQueue :: TQueue a -> STM a
125- peekTQueue c = do
126- x <- readTQueue c
127- unGetTQueue c x
128- return x
170+ peekTQueue (TQueue read _write) = do
171+ End front _fsched <- readTVar read
172+ case front of
173+ x: _ -> return x
174+ [] -> retry
129175
130176-- | A version of 'peekTQueue' which does not retry. Instead it
131177-- returns @Nothing@ if no value is available.
132178tryPeekTQueue :: TQueue a -> STM (Maybe a )
133- tryPeekTQueue c = do
134- m <- tryReadTQueue c
135- case m of
136- Nothing -> return Nothing
137- Just x -> do
138- unGetTQueue c x
139- return m
140-
141- -- | Put a data item back onto a channel, where it will be the next item read.
179+ tryPeekTQueue (TQueue read _write) = do
180+ End front _fsched <- readTVar read
181+ case front of
182+ x: _ -> return (Just x)
183+ [] -> return Nothing
184+
185+ -- | Put a data item back onto a channel, where it will be the next item read.
142186unGetTQueue :: TQueue a -> a -> STM ()
143187unGetTQueue (TQueue read _write) a = do
144- xs <- readTVar read
145- writeTVar read (a : xs )
188+ End front fsched <- readTVar read
189+ writeTVar read (End (a : front) (a : a : fsched) )
146190
147- -- | Returns 'True' if the supplied 'TQueue' is empty.
191+ -- | Returns 'True' if the supplied 'TQueue' is empty.
148192isEmptyTQueue :: TQueue a -> STM Bool
149- isEmptyTQueue (TQueue read write) = do
150- xs <- readTVar read
151- case xs of
152- (_: _) -> return False
153- [] -> do ys <- readTVar write
154- case ys of
155- [] -> return True
156- _ -> return False
193+ isEmptyTQueue (TQueue read _write) = do
194+ End front _fsched <- readTVar read
195+ return $! null front
0 commit comments