Skip to content

Commit

Permalink
Merge pull request #2050 from digitallyinduced/fix-devserver-issues
Browse files Browse the repository at this point in the history
Fix DevServer race conditions
  • Loading branch information
mpscholten authored Mar 5, 2025
2 parents 7e4163b + a556529 commit b5decc2
Show file tree
Hide file tree
Showing 21 changed files with 643 additions and 757 deletions.
37 changes: 20 additions & 17 deletions IHP/Job/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import IHP.Controller.Param
import qualified System.Random as Random
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
import Control.Monad.Trans.Resource

-- | Lock and fetch the next available job. In case no job is available returns Nothing.
--
Expand Down Expand Up @@ -68,13 +69,13 @@ fetchNextJob timeoutInMicroseconds backoffStrategy workerId = do
-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
--
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (PGListener.Subscription, Async.Async ())
watchForJob :: (?modelContext :: ModelContext) => PGListener.PGListener -> Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> ResourceT IO (PGListener.Subscription, ReleaseKey)
watchForJob pgListener tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
-- NOTE(review): this span appears to be a rendered diff. The two signatures above and the
-- paired statements below (plain vs. 'liftIO'-wrapped) look like the removed and added
-- versions of the same line -- confirm against the repository before editing.
let tableNameBS = cs tableName
-- Execute the statement built by 'createNotificationTrigger' for this table, without query logging.
withoutQueryLogging (sqlExec (createNotificationTrigger tableNameBS) ())
liftIO $ withoutQueryLogging (sqlExec (createNotificationTrigger tableNameBS) ())

-- Start the database poller via 'pollForJob'; it signals on the same 'onNewJob' MVar.
poller <- pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob
-- Subscribe to the table's notification channel. The notification payload is ignored ('const');
-- every notification just puts 'JobAvailable' into 'onNewJob'.
subscription <- pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable))
subscription <- liftIO $ pgListener |> PGListener.subscribe (channelName tableNameBS) (const (Concurrent.putMVar onNewJob JobAvailable))

-- The caller receives both handles so it can unsubscribe and stop the poller.
pure (subscription, poller)

Expand All @@ -86,26 +87,28 @@ watchForJob pgListener tableName pollInterval timeoutInMicroseconds backoffStrat
--
-- This function returns a 'ReleaseKey' for the polling thread. Call 'release' on it to cancel the thread and stop polling the database.
--
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> IO (Async.Async ())
pollForJob :: (?modelContext :: ModelContext) => Text -> Int -> Maybe Int -> BackoffStrategy -> Concurrent.MVar JobWorkerProcessMessage -> ResourceT IO ReleaseKey
pollForJob tableName pollInterval timeoutInMicroseconds backoffStrategy onNewJob = do
-- NOTE(review): this span appears to be a rendered diff. Statements that occur twice below
-- (once under 'Async.asyncBound do', once under 'let handler = do') look like the removed
-- and added versions of the same loop -- confirm against the repository before editing.
--
-- The query counts rows that are either in the not-started status, or in the retry status and
-- matching the backoff strategy's retry condition, and that are unlocked with @run_at@ not in
-- the future. The result of 'timeoutCondition' is appended to the WHERE clause.
let query = PG.Query ("SELECT COUNT(*) FROM ? WHERE (((status = ?) OR (status = ? AND " <> retryQuery backoffStrategy <> ")) AND locked_by IS NULL AND run_at <= NOW()) " <> timeoutCondition timeoutInMicroseconds <> " LIMIT 1")
-- Parameters are positional and must stay in the same order as the @?@ placeholders above.
let params = (PG.Identifier tableName, JobStatusNotStarted, JobStatusRetry, backoffStrategy.delayInSeconds, timeoutInMicroseconds)
Async.asyncBound do
forever do
-- We don't log the queries to the console as it's filling up the log entries with noise
count :: Int <- withoutQueryLogging (sqlQueryScalar query params)
let handler = do
forever do
-- We don't log the queries to the console as it's filling up the log entries with noise
count :: Int <- withoutQueryLogging (sqlQueryScalar query params)

-- For every job we send one signal to the job workers
-- This way we use full concurrency when we find multiple jobs
-- that haven't been picked up by the PGListener
forEach [1..count] \_ -> do
Concurrent.putMVar onNewJob JobAvailable
-- For every job we send one signal to the job workers
-- This way we use full concurrency when we find multiple jobs
-- that haven't been picked up by the PGListener
forEach [1..count] \_ -> do
Concurrent.putMVar onNewJob JobAvailable

-- Add up to 2 seconds of jitter to avoid all job queues polling at the same time
jitter <- Random.randomRIO (0, 2000000)
let pollIntervalWithJitter = pollInterval + jitter
-- Add up to 2 seconds of jitter to avoid all job queues polling at the same time
jitter <- Random.randomRIO (0, 2000000)
let pollIntervalWithJitter = pollInterval + jitter

Concurrent.threadDelay pollIntervalWithJitter
Concurrent.threadDelay pollIntervalWithJitter

-- Start the polling loop as an async registered with the enclosing 'ResourceT', using
-- 'Async.cancel' as its release action. Only the 'ReleaseKey' is returned ('fst'); the
-- 'Async' handle itself is discarded.
fst <$> allocate (Async.async handler) Async.cancel

createNotificationTrigger :: ByteString -> PG.Query
createNotificationTrigger tableName = PG.Query $ ""
Expand Down
153 changes: 76 additions & 77 deletions IHP/Job/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener

import Control.Monad.Trans.Resource
import qualified IHP.Log as Log

-- | Used by the RunJobs binary
Expand All @@ -43,47 +43,49 @@ dedicatedProcessMainLoop jobWorkers = do
-- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
-- could overload the main PGListener connection. In that case we still want jobs to be
-- run independent of the system being very busy.
pgListener <- PGListener.init ?modelContext
stopSignal <- Concurrent.newEmptyMVar
waitForExitSignal <- installSignalHandlers
PGListener.withPGListener ?modelContext \pgListener -> do
stopSignal <- Concurrent.newEmptyMVar

let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

processes <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)
runResourceT do
waitForExitSignal <- liftIO installSignalHandlers

waitForExitSignal
let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

processes <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)

Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)
liftIO waitForExitSignal

-- Stop subscriptions and poller already
-- This will stop all producers for the queue MVar
forEach processes \JobWorkerProcess { poller, subscription, action } -> do
PGListener.unsubscribe subscription pgListener
Async.cancel poller
Concurrent.putMVar action Stop
liftIO $ Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

PGListener.stop pgListener
-- Stop subscriptions and poller already
-- This will stop all producers for the queue MVar
liftIO $ forEach processes \JobWorkerProcess { pollerReleaseKey, subscription, action } -> do
PGListener.unsubscribe subscription pgListener
release pollerReleaseKey
Concurrent.putMVar action Stop

-- While waiting for all jobs to complete, we also wait for another exit signal
-- If the user sends two exit signals, we just kill all processes
async do
waitForExitSignal
liftIO $ PGListener.stop pgListener

Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

forEach processes \JobWorkerProcess { runners } -> do
forEach runners Async.cancel
-- While waiting for all jobs to complete, we also wait for another exit signal
-- If the user sends two exit signals, we just kill all processes
liftIO $ async do
waitForExitSignal

Concurrent.throwTo threadId Exit.ExitSuccess
Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

forEach processes \JobWorkerProcess { runners } -> do
forEach runners \(releaseKey, _) -> release releaseKey

pure ()
Concurrent.throwTo threadId Exit.ExitSuccess

-- Wait for all runners to complete
forEach processes \JobWorkerProcess { runners } -> do
forEach runners Async.wait
pure ()

Concurrent.throwTo threadId Exit.ExitSuccess
-- Wait for all runners to complete
liftIO $ forEach processes \JobWorkerProcess { runners } -> do
forEach runners \(_, async) -> Async.wait async

liftIO $ Concurrent.throwTo threadId Exit.ExitSuccess

devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop frameworkConfig pgListener jobWorkers = do
Expand All @@ -92,17 +94,16 @@ devServerMainLoop frameworkConfig pgListener jobWorkers = do
let logger = frameworkConfig.logger

Log.info ("Starting worker " <> tshow workerId)

let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

processes <- jobWorkers
|> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)
runResourceT do
let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

processes <- jobWorkers
|> mapM (\(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs)

(forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
forEach processes \JobWorkerProcess { poller, subscription, runners, action } -> do
Concurrent.putMVar action Stop
Async.cancel poller
forEach runners Async.cancel
liftIO $ (forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
forEach processes \JobWorkerProcess { action } -> do
Concurrent.putMVar action Stop

-- | Installs signal handlers and returns an IO action that blocks until the next sigINT or sigTERM is sent
installSignalHandlers :: IO (IO ())
Expand Down Expand Up @@ -157,43 +158,41 @@ jobWorkerFetchAndRunLoop :: forall job.
, CanUpdate job
, Show job
, Table job
) => JobWorkerArgs -> IO JobWorkerProcess
) => JobWorkerArgs -> ResourceT IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
let ?context = frameworkConfig
let ?modelContext = modelContext

action <- Concurrent.newMVar JobAvailable
runners <- forM [1..(maxConcurrency @job)] \index -> async do
let loop = do
receivedAction <- Concurrent.takeMVar action

case receivedAction of
JobAvailable -> do
maybeJob <- Queue.fetchNextJob @job (timeoutInMicroseconds @job) (backoffStrategy @job) workerId
case maybeJob of
Just job -> do
Log.info ("Starting job: " <> tshow job)

let ?job = job
let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
resultOrException <- Exception.tryAsync (Timeout.timeout timeout (perform job))
case resultOrException of
Left exception -> do
Queue.jobDidFail job exception
when (Exception.isAsyncException exception) (Exception.throwIO exception)
Right Nothing -> Queue.jobDidTimeout job
Right (Just _) -> Queue.jobDidSucceed job

loop
Nothing -> loop
Stop -> do
-- Put the stop signal back in to stop the other runners as well
Concurrent.putMVar action Stop
pure ()

loop

(subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (timeoutInMicroseconds @job) (backoffStrategy @job) action


pure JobWorkerProcess { runners, subscription, poller, action }
action <- liftIO $ Concurrent.newMVar JobAvailable
let loop = do
receivedAction <- Concurrent.takeMVar action

case receivedAction of
JobAvailable -> do
maybeJob <- Queue.fetchNextJob @job (timeoutInMicroseconds @job) (backoffStrategy @job) workerId
case maybeJob of
Just job -> do
Log.info ("Starting job: " <> tshow job)

let ?job = job
let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
resultOrException <- Exception.tryAsync (Timeout.timeout timeout (perform job))
case resultOrException of
Left exception -> do
Queue.jobDidFail job exception
when (Exception.isAsyncException exception) (Exception.throwIO exception)
Right Nothing -> Queue.jobDidTimeout job
Right (Just _) -> Queue.jobDidSucceed job

loop
Nothing -> loop
Stop -> do
-- Put the stop signal back in to stop the other runners as well
Concurrent.putMVar action Stop
pure ()

runners <- forM [1..(maxConcurrency @job)] \index -> allocate (async loop) cancel

(subscription, pollerReleaseKey) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (timeoutInMicroseconds @job) (backoffStrategy @job) action


pure JobWorkerProcess { runners, subscription, pollerReleaseKey, action }
7 changes: 4 additions & 3 deletions IHP/Job/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import IHP.Prelude
import IHP.FrameworkConfig
import qualified IHP.PGListener as PGListener
import qualified Control.Concurrent as Concurrent
import Control.Monad.Trans.Resource

data BackoffStrategy
= LinearBackoff { delayInSeconds :: !Int }
Expand Down Expand Up @@ -57,7 +58,7 @@ data JobWorkerArgs = JobWorkerArgs
, pgListener :: PGListener.PGListener
}

-- | Wraps the function that starts a job worker: given the shared 'JobWorkerArgs' it
-- returns the 'JobWorkerProcess' describing what was started.
--
-- NOTE(review): the two declarations below look like the removed (@IO@) and added
-- (@ResourceT IO@) sides of a rendered diff -- confirm against the repository.
newtype JobWorker = JobWorker (JobWorkerArgs -> IO JobWorkerProcess)
newtype JobWorker = JobWorker (JobWorkerArgs -> ResourceT IO JobWorkerProcess)

-- | Mapping for @JOB_STATUS@. The DDL statement for this can be found in IHPSchema.sql:
--
Expand All @@ -73,9 +74,9 @@ data JobStatus

-- | Handles to what a single job worker has started: its runner threads, its
-- 'PGListener.Subscription', its poller and the MVar used to signal the runners.
--
-- NOTE(review): fields that appear in two forms below (@runners@, and @poller@ vs.
-- @pollerReleaseKey@) look like the removed and added sides of a rendered diff --
-- confirm against the repository.
data JobWorkerProcess
= JobWorkerProcess
-- Runner threads that take messages from @action@.
{ runners :: [Async ()]
-- Same runners, each paired with the 'ReleaseKey' obtained when it was allocated.
{ runners :: [(ReleaseKey, Async ())]
-- Subscription returned by 'PGListener.subscribe' for the job table's channel.
, subscription :: PGListener.Subscription
-- Handle of the polling thread.
, poller :: Async ()
-- 'ReleaseKey' of the polling thread; releasing it stops the poller.
, pollerReleaseKey :: ReleaseKey
-- Channel on which 'JobWorkerProcessMessage' values are delivered to the runners.
, action :: Concurrent.MVar JobWorkerProcessMessage
}

Expand Down
5 changes: 5 additions & 0 deletions IHP/PGListener.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module IHP.PGListener
, PGListener (..)
, init
, stop
, withPGListener
, subscribe
, subscribeJSON
, unsubscribe
Expand Down Expand Up @@ -95,6 +96,10 @@ stop :: PGListener -> IO ()
-- Stopping the listener amounts to cancelling the async held in its @notifyLoopAsync@ field.
stop PGListener { notifyLoopAsync = loopAsync } = cancel loopAsync

-- | Runs an action with a 'PGListener' created by 'init' for the given 'ModelContext'.
--
-- The listener is passed to 'stop' when the action returns or throws, and the
-- action's result is returned.
withPGListener :: ModelContext -> (PGListener -> IO a) -> IO a
withPGListener modelContext useListener =
    Exception.bracket acquire stop useListener
    where
        acquire = init modelContext

-- | After you subscribed to a channel, the provided callback will be called whenever there's a new
-- notification on the channel.
--
Expand Down
4 changes: 2 additions & 2 deletions IHP/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module IHP.Prelude
, module Data.IORef
, module Data.Time.Format
, null
, module Control.Exception
, module Control.Exception.Safe
, module Control.Monad.Fail
, module Control.Concurrent.Async
, module NeatInterpolation
Expand Down Expand Up @@ -72,7 +72,7 @@ import Data.TMap (TMap)
import Database.PostgreSQL.Simple (FromRow)
import Data.IORef
import Data.Time.Format
import Control.Exception (throw, throwIO, catch)
import Control.Exception.Safe (throw, throwIO, catch)
import Control.Monad.Fail (fail)
import Control.Concurrent.Async
import NeatInterpolation (trimming)
Expand Down
7 changes: 3 additions & 4 deletions IHP/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ run configBuilder = do

withFrameworkConfig configBuilder \frameworkConfig -> do
modelContext <- IHP.FrameworkConfig.initModelContext frameworkConfig
let withPGListener = Exception.bracket (PGListener.init modelContext) PGListener.stop

withInitalizers frameworkConfig modelContext do
withPGListener \pgListener -> do
PGListener.withPGListener modelContext \pgListener -> do
autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)

let ?modelContext = modelContext
Expand All @@ -74,12 +73,12 @@ run configBuilder = do

{-# INLINABLE run #-}

-- | Runs @app@, and in development mode additionally runs the job workers of the
-- 'RootApplication' next to it via 'Job.devServerMainLoop'.
--
-- When not in development mode, or when the application has no job workers, @app@ is run
-- unchanged.
--
-- NOTE(review): the two signatures and the two @then@ branches below look like the removed
-- and added sides of a rendered diff ('withAsync' before, 'race_' after) -- confirm against
-- the repository.
withBackgroundWorkers :: (Job.Worker RootApplication, ?modelContext :: ModelContext) => PGListener.PGListener -> FrameworkConfig -> IO a -> IO a
withBackgroundWorkers :: (Job.Worker RootApplication, ?modelContext :: ModelContext) => PGListener.PGListener -> FrameworkConfig -> IO () -> IO ()
withBackgroundWorkers pgListener frameworkConfig app = do
let jobWorkers = Job.workers RootApplication
let isDevelopment = frameworkConfig.environment == Env.Development
-- Workers are only started in-process when developing and at least one worker is defined.
if isDevelopment && not (isEmpty jobWorkers)
-- 'withAsync' runs the worker loop in the background for the duration of @app@.
then withAsync (Job.devServerMainLoop frameworkConfig pgListener jobWorkers) (const app)
-- 'race_' runs both actions concurrently; when either one finishes, the other is cancelled.
then race_ (Job.devServerMainLoop frameworkConfig pgListener jobWorkers) app
else app

-- | Returns a WAI app that serves files stored in the app's @static/@ directory and IHP's own @static/@ directory
Expand Down
24 changes: 12 additions & 12 deletions IHP/Test/Mocking.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,18 @@ withIHPApp application configBuilder hspecAction = do
withTestDatabase \testDatabase -> do
modelContext <- createModelContext dbPoolIdleTime dbPoolMaxConnections (testDatabase.url) logger

pgListener <- PGListener.init modelContext
autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)
let sessionVault = Vault.insert sessionVaultKey mempty Vault.empty
let applicationContext = ApplicationContext { modelContext = modelContext, autoRefreshServer, frameworkConfig, pgListener }

let requestContext = RequestContext
{ request = defaultRequest {vault = sessionVault}
, requestBody = FormBody [] []
, respond = const (pure ResponseReceived)
, frameworkConfig = frameworkConfig }

(hspecAction MockContext { .. })
PGListener.withPGListener modelContext \pgListener -> do
autoRefreshServer <- newIORef (AutoRefresh.newAutoRefreshServer pgListener)
let sessionVault = Vault.insert sessionVaultKey mempty Vault.empty
let applicationContext = ApplicationContext { modelContext = modelContext, autoRefreshServer, frameworkConfig, pgListener }

let requestContext = RequestContext
{ request = defaultRequest {vault = sessionVault}
, requestBody = FormBody [] []
, respond = const (pure ResponseReceived)
, frameworkConfig = frameworkConfig }

(hspecAction MockContext { .. })


mockContextNoDatabase :: (InitControllerContext application) => application -> ConfigBuilder -> IO (MockContext application)
Expand Down
Loading

0 comments on commit b5decc2

Please sign in to comment.