-
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[work-pool] Add work-pool subproject
* This is an extraction from my mutant-manager project.
- Loading branch information
Showing
9 changed files
with
618 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# work-pool | ||
|
||
Minimal fixed max size, fixed worker count work pool with per worker boot and dynamic job production. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
_common/package: !include "../common/package.yaml" | ||
|
||
name: work-pool | ||
synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot. | ||
homepage: https://github.com/mbj/mhs#readme | ||
github: mbj/mhs | ||
version: 0.0.1 | ||
|
||
<<: *defaults | ||
|
||
dependencies: | ||
- base | ||
- containers | ||
- mprelude | ||
- text | ||
- unliftio | ||
|
||
tests: | ||
test: | ||
<<: *test | ||
dependencies: | ||
- devtools | ||
- tasty | ||
- tasty-hunit | ||
- work-pool |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
module WorkPool | ||
( Config(..) | ||
, Done | ||
, Pool | ||
, Source | ||
, pushJob | ||
, readJobCount | ||
, runPool | ||
, runSource | ||
) | ||
where | ||
|
||
import Data.Set (Set) | ||
import MPrelude | ||
import Prelude (succ) | ||
|
||
import qualified Data.Set as Set | ||
import qualified UnliftIO.Async as UnliftIO | ||
import qualified UnliftIO.STM as UnliftIO | ||
|
||
-- | Worker pool configuration | ||
data Config m a = Config | ||
{ produceJobs :: Pool a -> m () | ||
-- ^ function called to produce work, use `pushJob` to create workable jobs. | ||
-- once this function returns and all jobs are worked off the workers exit. | ||
, queueSize :: Natural | ||
-- ^ maximum size of the jobs queued | ||
, workerCount :: Natural | ||
-- ^ number of workers to boot | ||
, workerRun :: Natural -> Source a -> m Done | ||
-- ^ function called when a worker is booted, argument is the worker index, | ||
-- and a source to be drained with `runSource` | ||
} | ||
|
||
-- Internal queue event, supplying job or quitting the worker | ||
data Event a = Quit | Job a | ||
|
||
-- | Running pool | ||
newtype Pool a = Pool | ||
{ queue :: UnliftIO.TBQueue (Event a) | ||
} | ||
|
||
-- | Source to be drained | ||
newtype Source a = Source | ||
{ queue :: UnliftIO.TBQueue (Event a) | ||
} | ||
|
||
-- | Type forcing clients to drain the source via `runSource` | ||
data Done = Done | ||
|
||
-- | Add (dynamically) created a job to the pool | ||
-- | ||
-- This function will block if the max queue size would be overflown. | ||
-- As the workers create space in the queue this function will unblock. | ||
pushJob :: MonadIO m => Pool a -> a -> m () | ||
pushJob Pool{..} item | ||
= UnliftIO.atomically | ||
$ UnliftIO.writeTBQueue queue (Job item) | ||
|
||
-- | Read the number of jobs in the pool | ||
-- | ||
-- This function does not block. Its not guaranteed count is still correct | ||
-- when the function returns | ||
readJobCount :: MonadIO m => Pool a -> m Natural | ||
readJobCount Pool{..} | ||
= UnliftIO.atomically | ||
$ UnliftIO.lengthTBQueue queue | ||
|
||
-- | Drain a source | ||
-- | ||
-- This function: | ||
-- * is executed 0 or many times per worker. | ||
-- * executes the action as long there are items in the queue. | ||
-- * as long the producer did not exit: blocks if there are no items in the queue | ||
-- * if the producer exits and the queue is empty: returns. | ||
-- * is he only way to produce a `Done` value required by the `Config` api. | ||
runSource :: MonadIO m => (a -> m ()) -> Source a -> m Done | ||
runSource action Source{..} = go $> Done | ||
where | ||
go = UnliftIO.atomically (UnliftIO.readTBQueue queue) >>= \case | ||
(Job item) -> action item >> go | ||
Quit -> UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit | ||
|
||
-- | Run worker pool with specified config | ||
-- | ||
-- The function will return if either: | ||
-- * the `produceJobs` function returns | ||
-- * a worker or throws an error | ||
-- * the producer throws an error. | ||
-- | ||
-- Care is taken to not leak threads via the use if `withAsync` from the `async` package. | ||
runPool :: forall a m . MonadUnliftIO m => Config m a -> m () | ||
runPool Config{..} = boot =<< UnliftIO.atomically (UnliftIO.newTBQueue queueSize) | ||
where | ||
boot queue = go 0 [] | ||
where | ||
go index workerHandlers = | ||
if index == workerCount | ||
then | ||
UnliftIO.withAsync | ||
(runProducer queue) | ||
(\producerHandle -> waitAll (Set.fromList (producerHandle:workerHandlers))) | ||
else | ||
UnliftIO.withAsync | ||
(workerRun index Source{..}) | ||
(\async -> go (succ index) (async:workerHandlers)) | ||
|
||
runProducer queue = do | ||
produceJobs Pool{..} | ||
UnliftIO.atomically (UnliftIO.writeTBQueue queue Quit) | ||
pure Done | ||
|
||
waitAll :: Set (UnliftIO.Async Done) -> m () | ||
waitAll remaining = case Set.toList remaining of | ||
[] -> pure () | ||
[handler] -> void $ UnliftIO.wait handler | ||
list -> do | ||
(handler, _result) <- UnliftIO.waitAny list | ||
waitAll $ Set.delete handler remaining |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import Control.Arrow (left) | ||
import Control.Monad (when) | ||
import MPrelude | ||
import Test.Tasty | ||
import Test.Tasty.HUnit | ||
|
||
import qualified Data.List as List | ||
import qualified Data.Set as Set | ||
import qualified Data.String as String | ||
import qualified Devtools | ||
import qualified UnliftIO.Concurrent as UnliftIO | ||
import qualified UnliftIO.Exception as UnliftIO | ||
import qualified WorkPool | ||
|
||
main :: IO () | ||
main | ||
= defaultMain | ||
$ testGroup "work-pool" | ||
[ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"]) | ||
, mkSuccess 1 1 | ||
, mkSuccess 1 100 | ||
, mkSuccess 100 1 | ||
, mkSuccess 100 100 | ||
, mkSuccess 1000 1000 | ||
, producerFailure | ||
, workerFailure | ||
] | ||
where | ||
mkSuccess :: Natural -> Natural -> TestTree | ||
mkSuccess queueSize workerCount = | ||
testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do | ||
output <- UnliftIO.newMVar [] | ||
WorkPool.runPool $ config output | ||
assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output | ||
where | ||
config output = WorkPool.Config{..} | ||
where | ||
workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done | ||
workerRun _index = WorkPool.runSource $ \value -> do | ||
void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ()) | ||
|
||
workerFailure :: TestTree | ||
workerFailure = testCase "worker failure" $ do | ||
result <- UnliftIO.try (WorkPool.runPool config) | ||
assertEqual "" (Left "intentional error\n") (left formatException result) | ||
where | ||
config = WorkPool.Config{queueSize = 100, workerCount = 100, ..} | ||
|
||
workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done | ||
workerRun _index = | ||
WorkPool.runSource $ \value -> | ||
when (value == 100) $ UnliftIO.throwString "intentional error" | ||
|
||
producerFailure :: TestTree | ||
producerFailure = testCase "producer failure" $ do | ||
result <- UnliftIO.try (WorkPool.runPool config) | ||
assertEqual "" (Left "intentional error\n") (left formatException result) | ||
where | ||
config :: WorkPool.Config IO Natural | ||
config | ||
= WorkPool.Config | ||
{ produceJobs = produceJobsFailing | ||
, queueSize = 100 | ||
, workerCount = 100 | ||
, .. | ||
} | ||
|
||
produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m () | ||
produceJobsFailing pool = do | ||
WorkPool.pushJob pool 1 | ||
UnliftIO.throwString "intentional error" | ||
|
||
workerRun :: MonadIO m => Natural -> WorkPool.Source Natural -> m WorkPool.Done | ||
workerRun _index = WorkPool.runSource $ const (pure ()) | ||
|
||
formatException :: UnliftIO.SomeException -> String | ||
formatException | ||
= String.unlines | ||
. List.drop 2 | ||
. List.take 3 | ||
. String.lines | ||
. UnliftIO.displayException | ||
|
||
produceJobs :: MonadIO m => WorkPool.Pool Natural -> m () | ||
produceJobs pool = traverse_ (WorkPool.pushJob pool) values | ||
|
||
values :: [Natural] | ||
values = [0..1000] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
Diff 0.4.1 | ||
OneTuple 0.4.1.1 | ||
QuickCheck 2.14.3 | ||
StateVar 1.2.2 | ||
aeson 2.1.2.1 | ||
alex 3.3.0.0 | ||
ansi-terminal 0.11.5 | ||
ansi-terminal-types 0.11.5 | ||
ansi-wl-pprint 0.6.9 | ||
array 0.5.4.0 | ||
assoc 1.1 | ||
async 2.2.5 | ||
attoparsec 0.14.4 | ||
base 4.17.2.1 | ||
base-compat 0.12.3 | ||
base-compat-batteries 0.12.3 | ||
base-orphans 0.9.1 | ||
bifunctors 5.5.15 | ||
binary 0.8.9.1 | ||
bitvec 1.1.5.0 | ||
bytestring 0.11.5.3 | ||
call-stack 0.4.0 | ||
clock 0.8.4 | ||
cmdargs 0.10.22 | ||
colour 2.3.6 | ||
comonad 5.0.8 | ||
conduit 1.3.5 | ||
containers 0.6.7 | ||
contravariant 1.5.5 | ||
cpphs 1.20.9.1 | ||
data-default 0.7.1.1 | ||
data-default-class 0.1.2.0 | ||
data-default-instances-containers 0.0.1 | ||
data-default-instances-dlist 0.0.1 | ||
data-default-instances-old-locale 0.0.1 | ||
data-fix 0.3.2 | ||
deepseq 1.4.8.0 | ||
deriving-aeson 0.2.9 | ||
devtools 0.2.0 | ||
directory 1.3.7.1 | ||
distributive 0.6.2.1 | ||
dlist 1.0 | ||
exceptions 0.10.5 | ||
extra 1.7.14 | ||
file-embed 0.0.15.0 | ||
filepath 1.4.2.2 | ||
filepattern 0.1.3 | ||
foldable1-classes-compat 0.1 | ||
generically 0.1.1 | ||
ghc 9.4.8 | ||
ghc-bignum 1.3 | ||
ghc-boot 9.4.8 | ||
ghc-boot-th 9.4.8 | ||
ghc-heap 9.4.8 | ||
ghc-lib-parser 9.4.8.20231111 | ||
ghc-lib-parser-ex 9.4.0.0 | ||
ghc-prim 0.9.1 | ||
ghci 9.4.8 | ||
happy 1.20.1.1 | ||
hashable 1.4.3.0 | ||
hlint 3.5 | ||
hpc 0.6.1.0 | ||
hscolour 1.24.4 | ||
indexed-traversable 0.1.3 | ||
indexed-traversable-instances 0.1.1.2 | ||
integer-logarithms 1.0.3.1 | ||
libyaml 0.1.2 | ||
mono-traversable 1.0.15.3 | ||
mprelude 0.2.3 | ||
mtl 2.2.2 | ||
old-locale 1.0.0.7 | ||
optparse-applicative 0.17.1.0 | ||
parsec 3.1.16.1 | ||
polyparse 1.13 | ||
pretty 1.1.3.6 | ||
primitive 0.8.0.0 | ||
process 1.6.18.0 | ||
random 1.2.1.1 | ||
refact 0.3.0.2 | ||
resourcet 1.2.6 | ||
rts 1.0.2 | ||
safe-exceptions 0.1.7.4 | ||
scientific 0.3.7.0 | ||
semialign 1.3 | ||
semigroupoids 5.3.7 | ||
source-constraints 0.0.5 | ||
split 0.2.3.5 | ||
splitmix 0.1.0.5 | ||
stm 2.5.1.0 | ||
strict 0.5 | ||
syb 0.7.2.4 | ||
tagged 0.8.7 | ||
tasty 1.4.3 | ||
tasty-expected-failure 0.12.3 | ||
tasty-hunit 0.10.1 | ||
tasty-mgolden 0.0.2 | ||
template-haskell 2.19.0.0 | ||
terminfo 0.4.1.5 | ||
text 2.0.2 | ||
text-short 0.1.5 | ||
th-abstraction 0.4.5.0 | ||
th-lift 0.8.4 | ||
these 1.2 | ||
time 1.12.2 | ||
time-compat 1.9.6.1 | ||
transformers 0.5.6.2 | ||
transformers-compat 0.7.2 | ||
typed-process 0.2.11.1 | ||
unbounded-delays 0.1.1.1 | ||
uniplate 1.6.13 | ||
unix 2.7.3 | ||
unliftio 0.2.25.0 | ||
unliftio-core 0.2.1.0 | ||
unordered-containers 0.2.19.1 | ||
utf8-string 1.0.2 | ||
uuid-types 1.0.5.1 | ||
vector 0.13.1.0 | ||
vector-algorithms 0.9.0.1 | ||
vector-stream 0.1.0.0 | ||
witherable 0.4.2 | ||
work-pool 0.0.1 | ||
yaml 0.11.11.2 |
Oops, something went wrong.