Skip to content

Commit

Permalink
PLT-7979 Stream ledger events to a TChan instead of a socket
Browse files Browse the repository at this point in the history
  • Loading branch information
neilmayhew committed Nov 9, 2023
1 parent 517eae2 commit d987b02
Showing 1 changed file with 31 additions and 55 deletions.
86 changes: 31 additions & 55 deletions cardano-node/src/Cardano/Node/LedgerEvent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module Cardano.Node.LedgerEvent (

-- ** Using Ledger events
, withLedgerEventsServerStream
, foldEvent
, foldLedgerEvents_

-- * Type-level plumbing
, ConvertLedgerEvent (..)
Expand All @@ -54,7 +54,6 @@ module Cardano.Node.LedgerEvent (

import Cardano.Prelude hiding (All, Sum)

import Control.Monad.Fail (MonadFail(..))
import Cardano.Ledger.Binary (DecCBOR(..), EncCBOR(..), Version,
decodeFull', fromCBOR, serialize', toCBOR)
import Cardano.Ledger.Binary.Coders (Decode(..), Encode (..), encode, (!>),
Expand All @@ -80,17 +79,15 @@ import qualified Codec.CBOR.Decoding as CBOR
import qualified Codec.CBOR.Encoding as CBOR
import qualified Codec.CBOR.Read as CBOR
import qualified Codec.CBOR.Write as CBOR
import Control.Concurrent.STM.TChan (TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Monad.Fail (MonadFail(..))
import Control.State.Transition (Event)
import Data.ByteString.Short(ShortByteString)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString as BS
import Data.SOP (All, K (..))
import Data.SOP.Strict (NS(..), hcmap, hcollapse)
import qualified Data.Set as Set
import Data.String (String)
import Network.Socket(PortNumber, defaultProtocol, listen, accept,
bind, close, socket, socketToHandle, withSocketsDo,
SockAddr(..), SocketType(Stream), Family(AF_INET))
import Network.Socket(PortNumber)
import Ouroboros.Consensus.Byron.Ledger.Block (ByronBlock)
import Ouroboros.Consensus.Cardano.Block (AllegraEra, AlonzoEra,
BabbageEra, CardanoEras, ConwayEra, HardForkBlock,
Expand All @@ -105,12 +102,11 @@ import Ouroboros.Consensus.Shelley.Ledger (ShelleyBlock,
ShelleyLedgerEvent (..))
import Ouroboros.Consensus.TypeFamilyWrappers
import Prelude (type (~))
import System.IO(hIsEOF)
import System.IO.Unsafe (unsafePerformIO)
import Cardano.Ledger.Conway.Rules (ConwayNewEpochEvent, ConwayEpochEvent)
import qualified Cardano.Ledger.Conway.Rules as Conway
import qualified Cardano.Ledger.Shelley.API as ShelleyAPI
import Cardano.Ledger.Alonzo.Rules (AlonzoBbodyEvent (ShelleyInAlonzoEvent), AlonzoUtxowEvent (WrappedShelleyEraEvent), AlonzoUtxoEvent (UtxosEvent), AlonzoUtxosEvent)
import GHC.IO.Exception (IOException(IOError, ioe_type), IOErrorType (ResourceVanished))
import Ouroboros.Network.Block (ChainHash(GenesisHash, BlockHash))

type LedgerState crypto =
Expand Down Expand Up @@ -749,58 +745,38 @@ deserializeAnchoredEvent =
bytes <- CBOR.decodeBytes
either (fail . show) pure (decodeFull' version bytes)

ledgerEventsChan :: TChan AnchoredEvent
ledgerEventsChan = unsafePerformIO newBroadcastTChanIO
{-# NOINLINE ledgerEventsChan #-}

-- IO action to read ledger events in binary form
foldEvent
:: Handle
-> a
foldLedgerEvents_
:: a
-> (a -> AnchoredEvent -> IO a)
-> IO a
foldEvent h st0 fn =
LBS.hGetContents h >>= go st0
where
go st bytes = do
eof <- hIsEOF h
if eof then
pure st
else do
(events, event) <- either (panic . show) pure $ deserializeAnchoredEvent bytes
-> IO Void
foldLedgerEvents_ st0 fn = do
chan <- atomically $ dupTChan ledgerEventsChan
let go st = do
event <- atomically $ readTChan chan
st' <- fn st event
go st' events
go st'
go st0

withLedgerEventsServerStream
:: PortNumber
-> (LedgerEventHandler IO (LedgerState StandardCrypto) (HardForkBlock (CardanoEras crypto)) -> IO ())
-> IO ()
withLedgerEventsServerStream port handler = do
withSocketsDo $ do
bracket open closeSockets go
withLedgerEventsServerStream _port handler =
handler $ LedgerEventHandler writeLedgerEvents
where
go s = do
h <- socketToHandle s WriteMode
handler $ LedgerEventHandler $ writeLedgerEvents h

open = do
sock <- socket AF_INET Stream defaultProtocol
bind sock (SockAddrInet port 0)
listen sock 1
putStrLn ("Waiting for client to connect to socket..." :: String)
(clientSock, _) <- accept sock
pure clientSock

closeSockets = close

writeLedgerEvents h ph headerHash slotNo blockNo events = do
forM_ events $ \event -> do
case fromAuxLedgerEvent event of
Nothing -> pure ()
Just e -> do
let chainHashToHeaderHash GenesisHash = Origin
chainHashToHeaderHash (BlockHash bh) = At bh
let anchoredEvent = AnchoredEvent (getOneEraHash <$> chainHashToHeaderHash ph) (getOneEraHash headerHash) slotNo blockNo e
catch (BS.hPut h $ serializeAnchoredEvent (eventCodecVersion event) anchoredEvent) $ \case
-- If the client closes the socket, we continue running the node, but ignore the events.
IOError { ioe_type = ResourceVanished } -> do
pure ()
err -> do
print err
throwIO err
writeLedgerEvents prevHash headerHash slotNo blockNo events = do
forM_ (mapMaybe fromAuxLedgerEvent events) $ \event -> do
let chainHashToHeaderHash GenesisHash = Origin
chainHashToHeaderHash (BlockHash bh) = At bh
atomically $ writeTChan ledgerEventsChan $
AnchoredEvent
(getOneEraHash <$> chainHashToHeaderHash prevHash)
(getOneEraHash headerHash)
slotNo
blockNo
event

0 comments on commit d987b02

Please sign in to comment.