Skip to content

Commit

Permalink
block-forging: async exception safety
Browse files Browse the repository at this point in the history
When the block forger thread adds a new block, the adding thread might
be killed by an async exception.  If that happens, the block forger will
get 'Nothing' when `blockProcessed` returns, and it can exit.

Co-authored-by: Marcin Szamotulski <coot@coot.me>
  • Loading branch information
bolt12 and coot committed Jun 22, 2023
1 parent 7e56790 commit c171775
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract (ChainDepState,
tickChainDepState)
import Ouroboros.Consensus.Storage.ChainDB.API as ChainDB (ChainDB,
addBlockAsync, blockProcessed, getCurrentChain,
getPastLedger)
Processed (..), addBlockAsync, blockProcessed,
getCurrentChain, getPastLedger)
import qualified Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment as InvalidBlockPunishment
(noPunishment)
import Ouroboros.Consensus.Util.IOLike (atomically)
Expand Down Expand Up @@ -171,9 +171,9 @@ runForge epochSize_ nextSlot opts chainDB blockForging cfg = do
-- Add the block to the chain DB (synchronously) and verify adoption
let noPunish = InvalidBlockPunishment.noPunishment
result <- lift $ ChainDB.addBlockAsync chainDB noPunish newBlock
curTip <- lift $ atomically $ ChainDB.blockProcessed result
mbCurTip <- lift $ atomically $ ChainDB.blockProcessed result

when (curTip /= blockPoint newBlock) $
when (mbCurTip /= Processed (blockPoint newBlock)) $
exitEarly' "block not adopted"

-- | Context required to forge a block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ data TraceForgeEvent blk
-- this warrants a warning.
| TraceDidntAdoptBlock SlotNo blk

-- | We did not adopt the block we produced, because the adoption thread
-- died. Most likely because of an async exception.
| TraceAdoptionThreadDied SlotNo blk

-- | We forged a block that is invalid according to the ledger in the
-- ChainDB. This means there is an inconsistency between the mempool
-- validation and the ledger validation. This is a serious error!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as
import Ouroboros.Consensus.Node.Run
import Ouroboros.Consensus.Node.Tracers
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB,
Processed (..))
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import qualified Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment as InvalidBlockPunishment
import Ouroboros.Consensus.Storage.ChainDB.Init (InitChainDB)
Expand Down Expand Up @@ -429,10 +430,10 @@ forkBlockForging IS{..} blockForging =
uninterruptibleMask_ $ do
result <- lift $ ChainDB.addBlockAsync chainDB noPunish newBlock
-- Block until we have processed the block
curTip <- lift $ atomically $ ChainDB.blockProcessed result
mbCurTip <- lift $ atomically $ ChainDB.blockProcessed result

-- Check whether we adopted our block
when (curTip /= blockPoint newBlock) $ do
when (mbCurTip /= Processed (blockPoint newBlock)) $ do
isInvalid <- lift $ atomically $
($ blockHash newBlock) . forgetFingerprint <$>
ChainDB.getIsInvalidBlock chainDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Ouroboros.Consensus.Storage.ChainDB.API (
, getTipBlockNo
-- * Adding a block
, AddBlockPromise (..)
, Processed (..)
, addBlock
, addBlockWaitWrittenToDisk
, addBlock_
Expand Down Expand Up @@ -405,7 +406,7 @@ data AddBlockPromise m blk = AddBlockPromise
-- NOTE: Even when the result is 'False', 'getIsFetched' might still
-- return 'True', e.g., the block was older than @k@, but it has been
-- downloaded and stored on disk before.
, blockProcessed :: STM m (Point blk)
, blockProcessed :: STM m (Processed (Point blk))
-- ^ Use this 'STM' transaction to wait until the block has been
-- processed: the block has been written to disk and chain selection has
-- been performed for the block, /unless/ the block is from the future.
Expand All @@ -415,6 +416,8 @@ data AddBlockPromise m blk = AddBlockPromise
-- wasn't adopted. We might have adopted a longer chain of which the
-- added block is a part, but not the tip.
--
-- It returns 'Nothing' if the thread adding the block died.
--
-- NOTE: When the block is from the future, chain selection for the
-- block won't be performed until the block is no longer in the future,
-- which might take some time. For that reason, this transaction will
Expand All @@ -423,6 +426,10 @@ data AddBlockPromise m blk = AddBlockPromise
-- disk.
}

data Processed a = Processed a
| FailToProcess String
deriving (Eq, Show)

-- | Add a block synchronously: wait until the block has been written to disk
-- (see 'blockWrittenToDisk').
addBlockWaitWrittenToDisk :: IOLike m => ChainDB m blk -> InvalidBlockPunishment m -> blk -> m Bool
Expand All @@ -432,13 +439,17 @@ addBlockWaitWrittenToDisk chainDB punish blk = do

-- | Add a block synchronously: wait until the block has been processed (see
-- 'blockProcessed'). The new tip of the ChainDB is returned.
addBlock :: IOLike m => ChainDB m blk -> InvalidBlockPunishment m -> blk -> m (Point blk)
--
-- Note: this is a partial function, only to support tests.
addBlock :: IOLike m => ChainDB m blk -> InvalidBlockPunishment m -> blk -> m (Processed (Point blk))
addBlock chainDB punish blk = do
promise <- addBlockAsync chainDB punish blk
atomically $ blockProcessed promise

-- | Add a block synchronously. Variant of 'addBlock' that doesn't return the
-- new tip of the ChainDB.
--
-- Note: this is a partial function, only to support tests.
addBlock_ :: IOLike m => ChainDB m blk -> InvalidBlockPunishment m -> blk -> m ()
addBlock_ = void ..: addBlock

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Protocol.Abstract
import Ouroboros.Consensus.Storage.ChainDB.API (BlockComponent (..))
import Ouroboros.Consensus.Storage.ChainDB.API (BlockComponent (..),
Processed (..))
import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
(addBlockSync)
import Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB
Expand Down Expand Up @@ -531,7 +532,14 @@ addBlockRunner
addBlockRunner cdb@CDB{..} = forever $ do
let trace = traceWith cdbTracer . TraceAddBlockEvent
trace $ PoppedBlockFromQueue RisingEdge
blkToAdd <- getBlockToAdd cdbBlocksToAdd
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
blockRealPoint $ blockToAdd blkToAdd
addBlockSync cdb blkToAdd
-- if the `addBlockSync` does not complete because it was killed by an async
-- exception (or it errored), notify the blocked thread
bracketOnError (getBlockToAdd cdbBlocksToAdd)
(\blkToAdd -> atomically $ do
_ <- tryPutTMVar (varBlockProcessed blkToAdd)
(FailToProcess "Failed to add block synchronously")
closeBlocksToAdd cdbBlocksToAdd)
(\blkToAdd -> do
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
blockRealPoint $ blockToAdd blkToAdd
addBlockSync cdb blkToAdd)
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..),
BlockComponent (..), ChainType (..),
InvalidBlockReason (..))
InvalidBlockReason (..), Processed (..))
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
(InvalidBlockPunishment)
import qualified Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment as InvalidBlockPunishment
Expand Down Expand Up @@ -340,7 +340,7 @@ addBlockSync cdb@CDB {..} BlockToAdd { blockToAdd = b, .. } = do
-- 'AddBlockPromise' with the given tip.
deliverProcessed :: Point blk -> m ()
deliverProcessed tip = atomically $
putTMVar varBlockProcessed tip
putTMVar varBlockProcessed (Processed tip)

-- | Return 'True' when the given header should be ignored when adding it
-- because it is too old, i.e., we wouldn't be able to switch to a chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
, BlockToAdd (..)
, BlocksToAdd
, addBlockToAdd
, closeBlocksToAdd
, getBlockToAdd
, newBlocksToAdd
-- * Trace types
Expand All @@ -61,6 +62,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
) where

import Control.Tracer
import Data.Foldable (traverse_)
import Data.Map.Strict (Map)
import Data.Maybe.Strict (StrictMaybe (..))
import Data.Set (Set)
Expand All @@ -78,7 +80,7 @@ import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..),
ChainDbError (..), ChainType, InvalidBlockReason,
StreamFrom, StreamTo, UnknownRange)
Processed (..), StreamFrom, StreamTo, UnknownRange)
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
(InvalidBlockPunishment)
import Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB (LedgerDB',
Expand Down Expand Up @@ -446,7 +448,7 @@ data BlockToAdd m blk = BlockToAdd
, blockToAdd :: !blk
, varBlockWrittenToDisk :: !(StrictTMVar m Bool)
-- ^ Used for the 'blockWrittenToDisk' field of 'AddBlockPromise'.
, varBlockProcessed :: !(StrictTMVar m (Point blk))
, varBlockProcessed :: !(StrictTMVar m (Processed (Point blk)))
-- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'.
}

Expand Down Expand Up @@ -488,6 +490,15 @@ addBlockToAdd tracer (BlocksToAdd queue) punish blk = do
getBlockToAdd :: IOLike m => BlocksToAdd m blk -> m (BlockToAdd m blk)
getBlockToAdd (BlocksToAdd queue) = atomically $ readTBQueue queue

-- | Flush the 'BlocksToAdd' queue and notify the waiting threads.
--
closeBlocksToAdd :: IOLike m => BlocksToAdd m blk -> STM m ()
closeBlocksToAdd (BlocksToAdd queue) = do
as <- flushTBQueue queue
traverse_ (\a -> tryPutTMVar (varBlockProcessed a)
(FailToProcess "Queue flushed"))
as

{-------------------------------------------------------------------------------
Trace types
-------------------------------------------------------------------------------}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ import Ouroboros.Consensus.Protocol.MockChainSel
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..),
BlockComponent (..), ChainDbError (..),
InvalidBlockReason (..), IteratorResult (..),
StreamFrom (..), StreamTo (..), UnknownRange (..),
validBounds)
Processed (..), StreamFrom (..), StreamTo (..),
UnknownRange (..), validBounds)
import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel (olderThanK)
import Ouroboros.Consensus.Storage.LedgerDB
import Ouroboros.Consensus.Util (repeatedly)
Expand Down Expand Up @@ -560,7 +560,7 @@ addBlockPromise cfg blk m = (result, m')
&& Map.member (blockHash blk) (blocks m')
result = AddBlockPromise
{ blockWrittenToDisk = return blockWritten
, blockProcessed = return $ tipPoint m'
, blockProcessed = return $ Processed $ tipPoint m'
}

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,11 @@ run env@ChainDBEnv { varDB, .. } cmd =
advanceAndAdd :: ChainDBState m blk -> SlotNo -> blk -> m (Point blk)
advanceAndAdd ChainDBState { chainDB } newCurSlot blk = do
atomically $ modifyTVar varCurSlot (max newCurSlot)
addBlock chainDB InvalidBlockPunishment.noPunishment blk
-- `blockProcessed` always returns 'Just'
res <- addBlock chainDB InvalidBlockPunishment.noPunishment blk
return $ case res of
FailToProcess f -> error $ "advanceAndAdd: block not added - " ++ f
Processed pt -> pt

wipeVolatileDB :: ChainDBState m blk -> m (Point blk)
wipeVolatileDB st = do
Expand Down

0 comments on commit c171775

Please sign in to comment.