Skip to content

Commit

Permalink
Refactor to share more code between socket and chan ledger event hand…
Browse files Browse the repository at this point in the history
…lers
  • Loading branch information
neilmayhew committed Nov 29, 2023
1 parent f343676 commit 845b46f
Showing 1 changed file with 31 additions and 32 deletions.
63 changes: 31 additions & 32 deletions cardano-node/src/Cardano/Node/LedgerEvent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
Expand Down Expand Up @@ -59,6 +60,7 @@ module Cardano.Node.LedgerEvent (

import Cardano.Prelude hiding (All, Sum)

import Control.Arrow ((&&&))
import Control.Monad.Fail (MonadFail(..))
import Cardano.Ledger.Binary (DecCBOR(..), EncCBOR(..), Version,
decodeFull', fromCBOR, serialize', toCBOR)
Expand Down Expand Up @@ -731,8 +733,8 @@ instance DecCBOR AnchoredEvent where
<! From
<! From

serializeAnchoredEvent :: Version -> AnchoredEvent -> ByteString
serializeAnchoredEvent version event =
serializeAnchoredEvent :: (Version, AnchoredEvent) -> ByteString
serializeAnchoredEvent (version, event) =
CBOR.toStrictByteString encoding
where
encoding =
Expand All @@ -744,7 +746,7 @@ serializeAnchoredEvent version event =

deserializeAnchoredEvent
:: LBS.ByteString
-> Either CBOR.DeserialiseFailure (LBS.ByteString, AnchoredEvent)
-> Either CBOR.DeserialiseFailure (LBS.ByteString, (Version, AnchoredEvent))
deserializeAnchoredEvent =
CBOR.deserialiseFromBytes decoder
where
Expand All @@ -753,7 +755,7 @@ deserializeAnchoredEvent =
_ <- CBOR.decodeListLen
version <- fromCBOR
bytes <- CBOR.decodeBytes
either (fail . show) pure (decodeFull' version bytes)
either (fail . show) (pure . (version,)) $ decodeFull' version bytes

-- IO action to read ledger events in binary form
foldEvent
Expand All @@ -769,7 +771,7 @@ foldEvent h st0 fn =
if eof then
pure st
else do
(events, event) <- either (panic . show) pure $ deserializeAnchoredEvent bytes
(events, (_version, event)) <- either (panic . show) pure $ deserializeAnchoredEvent bytes
st' <- fn st event
go st' events

Expand All @@ -779,13 +781,13 @@ withLedgerEventsServerStream
:: PortNumber
-> (StandardLedgerEventHandler -> IO ())
-> IO ()
withLedgerEventsServerStream port handler = do
withLedgerEventsServerStream port action = do
withSocketsDo $ do
bracket open closeSockets go
where
go s = do
h <- socketToHandle s WriteMode
handler $ LedgerEventHandler $ writeLedgerEvents h
action $ mkLedgerEventHandler (writeAnchoredEvent h)

open = do
sock <- socket AF_INET Stream defaultProtocol
Expand All @@ -797,21 +799,15 @@ withLedgerEventsServerStream port handler = do

closeSockets = close

writeLedgerEvents h ph headerHash slotNo blockNo events = do
forM_ events $ \event -> do
case fromAuxLedgerEvent event of
Nothing -> pure ()
Just e -> do
let chainHashToHeaderHash GenesisHash = Origin
chainHashToHeaderHash (BlockHash bh) = At bh
let anchoredEvent = AnchoredEvent (getOneEraHash <$> chainHashToHeaderHash ph) (getOneEraHash headerHash) slotNo blockNo e
catch (BS.hPut h $ serializeAnchoredEvent (eventCodecVersion event) anchoredEvent) $ \case
-- If the client closes the socket, we continue running the node, but ignore the events.
IOError { ioe_type = ResourceVanished } -> do
pure ()
err -> do
print err
throwIO err
writeAnchoredEvent h = handle errHandler . BS.hPut h . serializeAnchoredEvent
where
errHandler = \case
-- If the client closes the socket, we continue running the node, but ignore the events.
IOError { ioe_type = ResourceVanished } -> do
pure ()
err -> do
print err
throwIO err

withLedgerEventsChan
:: (LedgerEventWriter -> LedgerEventReader -> IO a)
Expand All @@ -820,8 +816,8 @@ withLedgerEventsChan action = do
chan <- newTChanIO
action (atomically . writeTChan chan) (atomically $ readTChan chan)

type LedgerEventWriter = AnchoredEvent -> IO ()
type LedgerEventReader = IO AnchoredEvent
type LedgerEventWriter = (Version, AnchoredEvent) -> IO ()
type LedgerEventReader = IO (Version, AnchoredEvent)

mkLedgerEventHandler
:: LedgerEventWriter
Expand All @@ -835,15 +831,18 @@ mkAnchoredEvents
-> SlotNo
-> BlockNo
-> [AuxLedgerEvent (LedgerState StandardCrypto)]
-> [AnchoredEvent]
-> [(Version, AnchoredEvent)]
mkAnchoredEvents prevHash headerHash slotNo blockNo auxEvents =
[ AnchoredEvent
(getOneEraHash <$> chainHashToOriginHash prevHash)
(getOneEraHash headerHash)
slotNo
blockNo
ledgerEvent
| Just ledgerEvent <- map fromAuxLedgerEvent auxEvents
[ (version,
AnchoredEvent
(getOneEraHash <$> chainHashToOriginHash prevHash)
(getOneEraHash headerHash)
slotNo
blockNo
ledgerEvent
)
| (version, Just ledgerEvent)
<- (eventCodecVersion &&& fromAuxLedgerEvent) <$> auxEvents
]
where
chainHashToOriginHash :: ChainHash b -> WithOrigin (HeaderHash b)
Expand Down

0 comments on commit 845b46f

Please sign in to comment.