diff --git a/stack-9.4.yaml b/stack-9.4.yaml index aa55bfd2..927cbfd8 100644 --- a/stack-9.4.yaml +++ b/stack-9.4.yaml @@ -71,6 +71,8 @@ flags: development: true tasty-mgolden: development: true + work-pool: + development: true xray: development: true packages: @@ -97,4 +99,5 @@ packages: - source-constraints - stack-deploy - tasty-mgolden +- work-pool - xray diff --git a/stack-9.6.yaml b/stack-9.6.yaml index dcd9528a..790ed340 100644 --- a/stack-9.6.yaml +++ b/stack-9.6.yaml @@ -63,6 +63,8 @@ flags: development: true tasty-mgolden: development: true + work-pool: + development: true xray: development: true packages: @@ -89,4 +91,5 @@ packages: - source-constraints - stack-deploy - tasty-mgolden +- work-pool - xray diff --git a/work-pool/README.md b/work-pool/README.md new file mode 100644 index 00000000..daa6832e --- /dev/null +++ b/work-pool/README.md @@ -0,0 +1,3 @@ +# work-pool + +Minimal fixed max size, fixed worker count work pool with per worker boot and dynamic job production. diff --git a/work-pool/package.yaml b/work-pool/package.yaml new file mode 100644 index 00000000..f513e9ea --- /dev/null +++ b/work-pool/package.yaml @@ -0,0 +1,25 @@ +_common/package: !include "../common/package.yaml" + +name: work-pool +synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot. +homepage: https://github.com/mbj/mhs#readme +github: mbj/mhs +version: 0.0.1 + +<<: *defaults + +dependencies: +- base +- containers +- mprelude +- text +- unliftio + +tests: + test: + <<: *test + dependencies: + - devtools + - tasty + - tasty-hunit + - work-pool diff --git a/work-pool/src/WorkPool.hs b/work-pool/src/WorkPool.hs new file mode 100644 index 00000000..06897c68 --- /dev/null +++ b/work-pool/src/WorkPool.hs @@ -0,0 +1,119 @@ +module WorkPool + ( Config(..) + , Done + , Pool + , Source + , pushJob + , readJobCount + , runPool + , runSource + ) +where + +import Data.Set (Set) +import MPrelude +import Prelude (succ) + +import qualified Data.Set as Set +import qualified UnliftIO.Async as UnliftIO +import qualified UnliftIO.STM as UnliftIO + +-- | Worker pool configuration +data Config m a = Config + { produceJobs :: Pool a -> m () + -- ^ function called to produce work, use `pushJob` to create workable jobs. + -- once this function returns and all jobs are worked off the workers exit. + , queueSize :: Natural + -- ^ maximum size of the jobs queued + , workerCount :: Natural + -- ^ number of workers to boot + , workerRun :: Natural -> Source a -> m Done + -- ^ function called when a worker is booted, argument is the worker index, + -- and a source to be drained with `runSource` + } + +-- Internal queue event, supplying job or quitting the worker +data Event a = Quit | Job a + +-- | Running pool +newtype Pool a = Pool + { queue :: UnliftIO.TBQueue (Event a) + } + +-- | Source to be drained +newtype Source a = Source + { queue :: UnliftIO.TBQueue (Event a) + } + +-- | Type forcing clients to drain the source via `runSource` +data Done = Done + +-- | Add (dynamically) created a job to the pool +-- +-- This function will block if the max queue size would be overflown. +-- As the workers create space in the queue this function will unblock. +pushJob :: MonadIO m => Pool a -> a -> m () +pushJob Pool{..} item + = UnliftIO.atomically + $ UnliftIO.writeTBQueue queue (Job item) + +-- | Read the number of jobs in the pool +-- +-- This function does not block. Its not guaranteed count is still correct +-- when the function returns +readJobCount :: MonadIO m => Pool a -> m Natural +readJobCount Pool{..} + = UnliftIO.atomically + $ UnliftIO.lengthTBQueue queue + +-- | Drain a source +-- +-- This function: +-- * is executed 0 or many times per worker. +-- * executes the action as long there are items in the queue. +-- * as long the producer did not exit: blocks if there are no items in the queue +-- * if the producer exits and the queue is empty: returns. +-- * is he only way to produce a `Done` value required by the `Config` api. +runSource :: MonadIO m => (a -> m ()) -> Source a -> m Done +runSource action Source{..} = go $> Done + where + go = UnliftIO.atomically (UnliftIO.readTBQueue queue) >>= \case + (Job item) -> action item >> go + Quit -> UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit + +-- | Run worker pool with specified config +-- +-- The function will return if either: +-- * the `produceJobs` function returns +-- * a worker or throws an error +-- * the producer throws an error. +-- +-- Care is taken to not leak threads via the use if `withAsync` from the `async` package. +runPool :: forall a m . MonadUnliftIO m => Config m a -> m () +runPool Config{..} = boot =<< UnliftIO.atomically (UnliftIO.newTBQueue queueSize) + where + boot queue = go 0 [] + where + go index workerHandlers = + if index == workerCount + then + UnliftIO.withAsync + (runProducer queue) + (\producerHandle -> waitAll (Set.fromList (producerHandle:workerHandlers))) + else + UnliftIO.withAsync + (workerRun index Source{..}) + (\async -> go (succ index) (async:workerHandlers)) + + runProducer queue = do + produceJobs Pool{..} + UnliftIO.atomically (UnliftIO.writeTBQueue queue Quit) + pure Done + + waitAll :: Set (UnliftIO.Async Done) -> m () + waitAll remaining = case Set.toList remaining of + [] -> pure () + [handler] -> void $ UnliftIO.wait handler + list -> do + (handler, _result) <- UnliftIO.waitAny list + waitAll $ Set.delete handler remaining diff --git a/work-pool/test/Test.hs b/work-pool/test/Test.hs new file mode 100644 index 00000000..8324131b --- /dev/null +++ b/work-pool/test/Test.hs @@ -0,0 +1,88 @@ +import Control.Arrow (left) +import Control.Monad (when) +import MPrelude +import Test.Tasty +import Test.Tasty.HUnit + +import qualified Data.List as List +import qualified Data.Set as Set +import qualified Data.String as String +import qualified Devtools +import qualified UnliftIO.Concurrent as UnliftIO +import qualified UnliftIO.Exception as UnliftIO +import qualified WorkPool + +main :: IO () +main + = defaultMain + $ testGroup "work-pool" + [ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"]) + , mkSuccess 1 1 + , mkSuccess 1 100 + , mkSuccess 100 1 + , mkSuccess 100 100 + , mkSuccess 1000 1000 + , producerFailure + , workerFailure + ] + where + mkSuccess :: Natural -> Natural -> TestTree + mkSuccess queueSize workerCount = + testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do + output <- UnliftIO.newMVar [] + WorkPool.runPool $ config output + assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output + where + config output = WorkPool.Config{..} + where + workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done + workerRun _index = WorkPool.runSource $ \value -> do + void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ()) + + workerFailure :: TestTree + workerFailure = testCase "worker failure" $ do + result <- UnliftIO.try (WorkPool.runPool config) + assertEqual "" (Left "intentional error\n") (left formatException result) + where + config = WorkPool.Config{queueSize = 100, workerCount = 100, ..} + + workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done + workerRun _index = + WorkPool.runSource $ \value -> + when (value == 100) $ UnliftIO.throwString "intentional error" + + producerFailure :: TestTree + producerFailure = testCase "producer failure" $ do + result <- UnliftIO.try (WorkPool.runPool config) + assertEqual "" (Left "intentional error\n") (left formatException result) + where + config :: WorkPool.Config IO Natural + config + = WorkPool.Config + { produceJobs = produceJobsFailing + , queueSize = 100 + , workerCount = 100 + , .. + } + + produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m () + produceJobsFailing pool = do + WorkPool.pushJob pool 1 + UnliftIO.throwString "intentional error" + + workerRun :: MonadIO m => Natural -> WorkPool.Source Natural -> m WorkPool.Done + workerRun _index = WorkPool.runSource $ const (pure ()) + + formatException :: UnliftIO.SomeException -> String + formatException + = String.unlines + . List.drop 2 + . List.take 3 + . String.lines + . UnliftIO.displayException + + produceJobs :: MonadIO m => WorkPool.Pool Natural -> m () + produceJobs pool = traverse_ (WorkPool.pushJob pool) values + + values :: [Natural] + values = [0..1000] diff --git a/work-pool/test/stack-9.4-dependencies.txt b/work-pool/test/stack-9.4-dependencies.txt new file mode 100644 index 00000000..3fe3641e --- /dev/null +++ b/work-pool/test/stack-9.4-dependencies.txt @@ -0,0 +1,122 @@ +Diff 0.4.1 +OneTuple 0.4.1.1 +QuickCheck 2.14.3 +StateVar 1.2.2 +aeson 2.1.2.1 +alex 3.3.0.0 +ansi-terminal 0.11.5 +ansi-terminal-types 0.11.5 +ansi-wl-pprint 0.6.9 +array 0.5.4.0 +assoc 1.1 +async 2.2.5 +attoparsec 0.14.4 +base 4.17.2.1 +base-compat 0.12.3 +base-compat-batteries 0.12.3 +base-orphans 0.9.1 +bifunctors 5.5.15 +binary 0.8.9.1 +bitvec 1.1.5.0 +bytestring 0.11.5.3 +call-stack 0.4.0 +clock 0.8.4 +cmdargs 0.10.22 +colour 2.3.6 +comonad 5.0.8 +conduit 1.3.5 +containers 0.6.7 +contravariant 1.5.5 +cpphs 1.20.9.1 +data-default 0.7.1.1 +data-default-class 0.1.2.0 +data-default-instances-containers 0.0.1 +data-default-instances-dlist 0.0.1 +data-default-instances-old-locale 0.0.1 +data-fix 0.3.2 +deepseq 1.4.8.0 +deriving-aeson 0.2.9 +devtools 0.2.0 +directory 1.3.7.1 +distributive 0.6.2.1 +dlist 1.0 +exceptions 0.10.5 +extra 1.7.14 +file-embed 0.0.15.0 +filepath 1.4.2.2 +filepattern 0.1.3 +foldable1-classes-compat 0.1 +generically 0.1.1 +ghc 9.4.8 +ghc-bignum 1.3 +ghc-boot 9.4.8 +ghc-boot-th 9.4.8 +ghc-heap 9.4.8 +ghc-lib-parser 9.4.8.20231111 +ghc-lib-parser-ex 9.4.0.0 +ghc-prim 0.9.1 +ghci 9.4.8 +happy 1.20.1.1 +hashable 1.4.3.0 +hlint 3.5 +hpc 0.6.1.0 +hscolour 1.24.4 +indexed-traversable 0.1.3 +indexed-traversable-instances 0.1.1.2 +integer-logarithms 1.0.3.1 +libyaml 0.1.2 +mono-traversable 1.0.15.3 +mprelude 0.2.3 +mtl 2.2.2 +old-locale 1.0.0.7 +optparse-applicative 0.17.1.0 +parsec 3.1.16.1 +polyparse 1.13 +pretty 1.1.3.6 +primitive 0.8.0.0 +process 1.6.18.0 +random 1.2.1.1 +refact 0.3.0.2 +resourcet 1.2.6 +rts 1.0.2 +safe-exceptions 0.1.7.4 +scientific 0.3.7.0 +semialign 1.3 +semigroupoids 5.3.7 +source-constraints 0.0.5 +split 0.2.3.5 +splitmix 0.1.0.5 +stm 2.5.1.0 +strict 0.5 +syb 0.7.2.4 +tagged 0.8.7 +tasty 1.4.3 +tasty-expected-failure 0.12.3 +tasty-hunit 0.10.1 +tasty-mgolden 0.0.2 +template-haskell 2.19.0.0 +terminfo 0.4.1.5 +text 2.0.2 +text-short 0.1.5 +th-abstraction 0.4.5.0 +th-lift 0.8.4 +these 1.2 +time 1.12.2 +time-compat 1.9.6.1 +transformers 0.5.6.2 +transformers-compat 0.7.2 +typed-process 0.2.11.1 +unbounded-delays 0.1.1.1 +uniplate 1.6.13 +unix 2.7.3 +unliftio 0.2.25.0 +unliftio-core 0.2.1.0 +unordered-containers 0.2.19.1 +utf8-string 1.0.2 +uuid-types 1.0.5.1 +vector 0.13.1.0 +vector-algorithms 0.9.0.1 +vector-stream 0.1.0.0 +witherable 0.4.2 +work-pool 0.0.1 +yaml 0.11.11.2 diff --git a/work-pool/test/stack-9.6-dependencies.txt b/work-pool/test/stack-9.6-dependencies.txt new file mode 100644 index 00000000..53c8a4f8 --- /dev/null +++ b/work-pool/test/stack-9.6-dependencies.txt @@ -0,0 +1,123 @@ +Diff 0.4.1 +OneTuple 0.4.1.1 +QuickCheck 2.14.3 +StateVar 1.2.2 +aeson 2.1.2.1 +alex 3.4.0.1 +ansi-terminal 1.0.2 +ansi-terminal-types 0.11.5 +array 0.5.6.0 +assoc 1.1 +async 2.2.5 +attoparsec 0.14.4 +base 4.18.2.1 +base-compat 0.13.1 +base-compat-batteries 0.13.1 +base-orphans 0.9.2 +bifunctors 5.6.2 +binary 0.8.9.1 +bitvec 1.1.5.0 +bytestring 0.11.5.3 +call-stack 0.4.0 +clock 0.8.4 +cmdargs 0.10.22 +colour 2.3.6 +comonad 5.0.8 +conduit 1.3.5 +containers 0.6.7 +contravariant 1.5.5 +cpphs 1.20.9.1 +data-default 0.7.1.1 +data-default-class 0.1.2.0 +data-default-instances-containers 0.0.1 +data-default-instances-dlist 0.0.1 +data-default-instances-old-locale 0.0.1 +data-fix 0.3.2 +deepseq 1.4.8.1 +deriving-aeson 0.2.9 +devtools 0.2.0 +directory 1.3.8.4 +distributive 0.6.2.1 +dlist 1.0 +exceptions 0.10.7 +extra 1.7.14 +file-embed 0.0.16.0 +filepath 1.4.300.1 +filepattern 0.1.3 +generically 0.1.1 +ghc 9.6.5 +ghc-bignum 1.3 +ghc-boot 9.6.5 +ghc-boot-th 9.6.5 +ghc-heap 9.6.5 +ghc-lib-parser 9.6.5.20240423 +ghc-lib-parser-ex 9.6.0.2 +ghc-prim 0.10.0 +ghci 9.6.5 +happy 1.20.1.1 +hashable 1.4.4.0 +hlint 3.6.1 +hpc 0.6.2.0 +hscolour 1.25 +indexed-traversable 0.1.3 +indexed-traversable-instances 0.1.1.2 +integer-logarithms 1.0.3.1 +libyaml 0.1.4 +libyaml-clib 0.2.5 +mono-traversable 1.0.17.0 +mprelude 0.2.3 +mtl 2.3.1 +old-locale 1.0.0.7 +optparse-applicative 0.18.1.0 +os-string 2.0.2.1 +parsec 3.1.16.1 +polyparse 1.13 +pretty 1.1.3.6 +prettyprinter 1.7.1 +prettyprinter-ansi-terminal 1.1.3 +primitive 0.8.0.0 +process 1.6.19.0 +random 1.2.1.2 +refact 0.3.0.2 +resourcet 1.3.0 +rts 1.0.2 +safe-exceptions 0.1.7.4 +scientific 0.3.7.0 +semialign 1.3 +semigroupoids 6.0.1 +source-constraints 0.0.5 +split 0.2.5 +splitmix 0.1.0.5 +stm 2.5.1.0 +strict 0.5 +syb 0.7.2.4 +tagged 0.8.8 +tasty 1.4.3 +tasty-expected-failure 0.12.3 +tasty-hunit 0.10.1 +tasty-mgolden 0.0.2 +template-haskell 2.20.0.0 +text 2.0.2 +text-short 0.1.5 +th-abstraction 0.5.0.0 +th-lift 0.8.4 +these 1.2 +time 1.12.2 +time-compat 1.9.6.1 +transformers 0.6.1.0 +transformers-compat 0.7.2 +typed-process 0.2.11.1 +unbounded-delays 0.1.1.1 +uniplate 1.6.13 +unix 2.8.4.0 +unliftio 0.2.25.0 +unliftio-core 0.2.1.0 +unordered-containers 0.2.20 +utf8-string 1.0.2 +uuid-types 1.0.5.1 +vector 0.13.1.0 +vector-algorithms 0.9.0.1 +vector-stream 0.1.0.1 +witherable 0.4.2 +work-pool 0.0.1 +yaml 0.11.11.2 diff --git a/work-pool/work-pool.cabal b/work-pool/work-pool.cabal new file mode 100644 index 00000000..73a0cf94 --- /dev/null +++ b/work-pool/work-pool.cabal @@ -0,0 +1,132 @@ +cabal-version: 1.12 + +-- This file has been generated from package.yaml by hpack version 0.36.0. +-- +-- see: https://github.com/sol/hpack + +name: work-pool +version: 0.0.1 +synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot. +homepage: https://github.com/mbj/mhs#readme +bug-reports: https://github.com/mbj/mhs/issues +author: Markus Schirp +maintainer: mbj@schirp-dso.com +copyright: 2023 Markus Schirp +license: BSD3 +build-type: Simple +tested-with: + GHC == 9.4 + , GHC == 9.6 + +source-repository head + type: git + location: https://github.com/mbj/mhs + +flag development + description: Run GHC with development flags + manual: True + default: False + +library + exposed-modules: + WorkPool + other-modules: + Paths_work_pool + hs-source-dirs: + src + default-extensions: + ConstraintKinds + DataKinds + DeriveAnyClass + DeriveGeneric + DerivingStrategies + DerivingVia + DuplicateRecordFields + FlexibleContexts + FlexibleInstances + GeneralizedNewtypeDeriving + LambdaCase + MultiParamTypeClasses + NoFieldSelectors + NoImplicitPrelude + NumericUnderscores + OverloadedLists + OverloadedRecordDot + OverloadedStrings + RankNTypes + RecordWildCards + ScopedTypeVariables + Strict + TemplateHaskell + TupleSections + TypeApplications + TypeFamilies + ViewPatterns + ghc-options: -Wall -Wcompat -Werror -Widentities -Wimplicit-prelude -Wincomplete-record-updates -Wincomplete-uni-patterns -Wmissing-deriving-strategies -Wmissing-local-signatures -Wmissing-signatures -Wmonomorphism-restriction -Wno-ambiguous-fields -Wredundant-constraints -fhide-source-paths -funbox-strict-fields -optP-Wno-nonportable-include-path + build-depends: + base + , containers + , mprelude + , text + , unliftio + default-language: Haskell2010 + if flag(development) + ghc-options: -Werror -fplugin=SourceConstraints + build-depends: + source-constraints + else + ghc-options: -Wwarn + +test-suite test + type: exitcode-stdio-1.0 + main-is: Test.hs + other-modules: + Paths_work_pool + hs-source-dirs: + test + default-extensions: + ConstraintKinds + DataKinds + DeriveAnyClass + DeriveGeneric + DerivingStrategies + DerivingVia + DuplicateRecordFields + FlexibleContexts + FlexibleInstances + GeneralizedNewtypeDeriving + LambdaCase + MultiParamTypeClasses + NoFieldSelectors + NoImplicitPrelude + NumericUnderscores + OverloadedLists + OverloadedRecordDot + OverloadedStrings + RankNTypes + RecordWildCards + ScopedTypeVariables + Strict + TemplateHaskell + TupleSections + TypeApplications + TypeFamilies + ViewPatterns + ghc-options: -Wall -Wcompat -Werror -Widentities -Wimplicit-prelude -Wincomplete-record-updates -Wincomplete-uni-patterns -Wmissing-deriving-strategies -Wmissing-local-signatures -Wmissing-signatures -Wmonomorphism-restriction -Wno-ambiguous-fields -Wredundant-constraints -fhide-source-paths -funbox-strict-fields -optP-Wno-nonportable-include-path -rtsopts -threaded -with-rtsopts=-N + build-depends: + base + , containers + , devtools + , mprelude + , tasty + , tasty-hunit + , text + , unliftio + , work-pool + default-language: Haskell2010 + if flag(development) + ghc-options: -Werror -fplugin=SourceConstraints + build-depends: + source-constraints + else + ghc-options: -Wwarn