From 29d39811c5b023f43168a544dd53d52a53eb29a0 Mon Sep 17 00:00:00 2001 From: Neil Mayhew Date: Tue, 28 Nov 2023 17:23:10 -0700 Subject: [PATCH] Refactor to share more code between socket and chan ledger event handlers --- cardano-node/src/Cardano/Node/LedgerEvent.hs | 63 ++++++++++---------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/cardano-node/src/Cardano/Node/LedgerEvent.hs b/cardano-node/src/Cardano/Node/LedgerEvent.hs index d76255d7c75..cfb5ad6362d 100644 --- a/cardano-node/src/Cardano/Node/LedgerEvent.hs +++ b/cardano-node/src/Cardano/Node/LedgerEvent.hs @@ -11,6 +11,7 @@ {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE UndecidableInstances #-} @@ -59,6 +60,7 @@ module Cardano.Node.LedgerEvent ( import Cardano.Prelude hiding (All, Sum) +import Control.Arrow ((&&&)) import Control.Monad.Fail (MonadFail(..)) import Cardano.Ledger.Binary (DecCBOR(..), EncCBOR(..), Version, decodeFull', fromCBOR, serialize', toCBOR) @@ -731,8 +733,8 @@ instance DecCBOR AnchoredEvent where AnchoredEvent -> ByteString -serializeAnchoredEvent version event = +serializeAnchoredEvent :: (Version, AnchoredEvent) -> ByteString +serializeAnchoredEvent (version, event) = CBOR.toStrictByteString encoding where encoding = @@ -744,7 +746,7 @@ serializeAnchoredEvent version event = deserializeAnchoredEvent :: LBS.ByteString - -> Either CBOR.DeserialiseFailure (LBS.ByteString, AnchoredEvent) + -> Either CBOR.DeserialiseFailure (LBS.ByteString, (Version, AnchoredEvent)) deserializeAnchoredEvent = CBOR.deserialiseFromBytes decoder where @@ -753,7 +755,7 @@ deserializeAnchoredEvent = _ <- CBOR.decodeListLen version <- fromCBOR bytes <- CBOR.decodeBytes - either (fail . show) pure (decodeFull' version bytes) + either (fail . show) (pure . (version,)) $ decodeFull' version bytes -- IO action to read ledger events in binary form foldEvent @@ -769,7 +771,7 @@ foldEvent h st0 fn = if eof then pure st else do - (events, event) <- either (panic . show) pure $ deserializeAnchoredEvent bytes + (events, (_version, event)) <- either (panic . show) pure $ deserializeAnchoredEvent bytes st' <- fn st event go st' events @@ -779,13 +781,13 @@ withLedgerEventsServerStream :: PortNumber -> (StandardLedgerEventHandler -> IO ()) -> IO () -withLedgerEventsServerStream port handler = do +withLedgerEventsServerStream port action = do withSocketsDo $ do bracket open closeSockets go where go s = do h <- socketToHandle s WriteMode - handler $ LedgerEventHandler $ writeLedgerEvents h + action $ mkLedgerEventHandler (writeAnchoredEvent h) open = do sock <- socket AF_INET Stream defaultProtocol @@ -797,21 +799,15 @@ withLedgerEventsServerStream port handler = do closeSockets = close - writeLedgerEvents h ph headerHash slotNo blockNo events = do - forM_ events $ \event -> do - case fromAuxLedgerEvent event of - Nothing -> pure () - Just e -> do - let chainHashToHeaderHash GenesisHash = Origin - chainHashToHeaderHash (BlockHash bh) = At bh - let anchoredEvent = AnchoredEvent (getOneEraHash <$> chainHashToHeaderHash ph) (getOneEraHash headerHash) slotNo blockNo e - catch (BS.hPut h $ serializeAnchoredEvent (eventCodecVersion event) anchoredEvent) $ \case - -- If the client closes the socket, we continue running the node, but ignore the events. - IOError { ioe_type = ResourceVanished } -> do - pure () - err -> do - print err - throwIO err + writeAnchoredEvent h = handle errHandler . BS.hPut h . serializeAnchoredEvent + where + errHandler = \case + -- If the client closes the socket, we continue running the node, but ignore the events. + IOError { ioe_type = ResourceVanished } -> do + pure () + err -> do + print err + throwIO err withLedgerEventsChan :: (LedgerEventWriter -> LedgerEventReader -> IO a) @@ -820,8 +816,8 @@ withLedgerEventsChan action = do chan <- newTChanIO action (atomically . writeTChan chan) (atomically $ readTChan chan) -type LedgerEventWriter = AnchoredEvent -> IO () -type LedgerEventReader = IO AnchoredEvent +type LedgerEventWriter = (Version, AnchoredEvent) -> IO () +type LedgerEventReader = IO (Version, AnchoredEvent) mkLedgerEventHandler :: LedgerEventWriter @@ -835,15 +831,18 @@ mkAnchoredEvents -> SlotNo -> BlockNo -> [AuxLedgerEvent (LedgerState StandardCrypto)] - -> [AnchoredEvent] + -> [(Version, AnchoredEvent)] mkAnchoredEvents prevHash headerHash slotNo blockNo auxEvents = - [ AnchoredEvent - (getOneEraHash <$> chainHashToOriginHash prevHash) - (getOneEraHash headerHash) - slotNo - blockNo - ledgerEvent - | Just ledgerEvent <- map fromAuxLedgerEvent auxEvents + [ (version, + AnchoredEvent + (getOneEraHash <$> chainHashToOriginHash prevHash) + (getOneEraHash headerHash) + slotNo + blockNo + ledgerEvent + ) + | (version, Just ledgerEvent) + <- (eventCodecVersion &&& fromAuxLedgerEvent) <$> auxEvents ] where chainHashToOriginHash :: ChainHash b -> WithOrigin (HeaderHash b)