Skip to content

Commit

Permalink
Aggregate ledger events into groups
Browse files Browse the repository at this point in the history
We group them by protocol version because the version has to be sent
out-of-band from the encoding of the events themselves
  • Loading branch information
neilmayhew committed Dec 1, 2023
1 parent 29d3981 commit 9fcc0b1
Showing 1 changed file with 43 additions and 41 deletions.
84 changes: 43 additions & 41 deletions cardano-node/src/Cardano/Node/LedgerEvent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
Expand All @@ -21,12 +20,11 @@
-- Shamelessly stolen and adapted from db-sync.
module Cardano.Node.LedgerEvent (
-- * Ledger Events
AnchoredEvent (..)
AnchoredEvents (..)
, LedgerEvent (..)
, LedgerNewEpochEvent (..)
, LedgerRewardUpdateEvent (..)
, deserializeAnchoredEvent
, serializeAnchoredEvent
, Versioned (..)
, ledgerEventName

-- ** Using Ledger events
Expand All @@ -35,8 +33,8 @@ module Cardano.Node.LedgerEvent (
, foldEvent
, withLedgerEventsChan
, mkLedgerEventHandler
, LedgerEventWriter
, LedgerEventReader
, LedgerEventsWriter
, LedgerEventsReader

-- * Type-level plumbing
, ConvertLedgerEvent (..)
Expand Down Expand Up @@ -92,6 +90,7 @@ import Control.State.Transition (Event)
import Data.ByteString.Short(ShortByteString)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString as BS
import qualified Data.List.NonEmpty as NE
import Data.SOP (All, K (..))
import Data.SOP.Strict (NS(..), hcmap, hcollapse)
import qualified Data.Set as Set
Expand Down Expand Up @@ -705,63 +704,66 @@ eventCodecVersion = \case
OneEraLedgerEvent ( S(S(S(S(S(Z{}))))) ) -> eraProtVerLow @(BabbageEra crypto)

Check warning on line 704 in cardano-node/src/Cardano/Node/LedgerEvent.hs

View workflow job for this annotation

GitHub Actions / build

Warning in eventCodecVersion in module Cardano.Node.LedgerEvent: Redundant bracket ▫︎ Found: "(Z {})" ▫︎ Perhaps: "Z {}"
OneEraLedgerEvent (S(S(S(S(S(S(Z{}))))))) -> eraProtVerLow @(ConwayEra crypto)

Check warning on line 705 in cardano-node/src/Cardano/Node/LedgerEvent.hs

View workflow job for this annotation

GitHub Actions / build

Warning in eventCodecVersion in module Cardano.Node.LedgerEvent: Redundant bracket ▫︎ Found: "(Z {})" ▫︎ Perhaps: "Z {}"

data AnchoredEvent =
AnchoredEvent
data AnchoredEvents =
AnchoredEvents
{ prevBlockHeaderHash :: !(WithOrigin ShortByteString)
, blockHeaderHash :: !ShortByteString
, slotNo :: !SlotNo
, blockNo :: !BlockNo
, ledgerEvent :: !(LedgerEvent StandardCrypto)
, ledgerEvents :: !(NonEmpty (LedgerEvent StandardCrypto))
}
deriving (Eq, Show)

instance EncCBOR AnchoredEvent where
encCBOR AnchoredEvent{prevBlockHeaderHash, blockHeaderHash , slotNo, blockNo, ledgerEvent} =
encode $ Rec AnchoredEvent
instance EncCBOR AnchoredEvents where
encCBOR AnchoredEvents{prevBlockHeaderHash, blockHeaderHash , slotNo, blockNo, ledgerEvents} =
encode $ Rec AnchoredEvents
!> To prevBlockHeaderHash
!> To blockHeaderHash
!> To slotNo
!> To blockNo
!> To ledgerEvent
!> To ledgerEvents

instance DecCBOR AnchoredEvent where
instance DecCBOR AnchoredEvents where
decCBOR =
decode $ RecD AnchoredEvent
decode $ RecD AnchoredEvents
<! From
<! From
<! From
<! From
<! From

serializeAnchoredEvent :: (Version, AnchoredEvent) -> ByteString
serializeAnchoredEvent (version, event) =
data Versioned a = Versioned Version a

serializeVersioned :: EncCBOR a => Versioned a -> ByteString
serializeVersioned (Versioned version x) =
CBOR.toStrictByteString encoding
where
encoding =
CBOR.encodeListLen 2
<>
toCBOR version
<>
CBOR.encodeBytes (serialize' version event)
CBOR.encodeBytes (serialize' version x)

deserializeAnchoredEvent
:: LBS.ByteString
-> Either CBOR.DeserialiseFailure (LBS.ByteString, (Version, AnchoredEvent))
deserializeAnchoredEvent =
deserializeVersioned
:: DecCBOR a
=> LBS.ByteString
-> Either CBOR.DeserialiseFailure (LBS.ByteString, Versioned a)
deserializeVersioned =
CBOR.deserialiseFromBytes decoder
where
decoder = do
-- TODO: ensure map len is 2
_ <- CBOR.decodeListLen
version <- fromCBOR
bytes <- CBOR.decodeBytes
either (fail . show) (pure . (version,)) $ decodeFull' version bytes
either (fail . show) (pure . Versioned version) $ decodeFull' version bytes

-- IO action to read ledger events in binary form
foldEvent
:: Handle
-> a
-> (a -> AnchoredEvent -> IO a)
-> (a -> AnchoredEvents -> IO a)
-> IO a
foldEvent h st0 fn =
LBS.hGetContents h >>= go st0
Expand All @@ -771,7 +773,7 @@ foldEvent h st0 fn =
if eof then
pure st
else do
(events, (_version, event)) <- either (panic . show) pure $ deserializeAnchoredEvent bytes
(events, Versioned _ event) <- either (panic . show) pure $ deserializeVersioned bytes
st' <- fn st event
go st' events

Expand All @@ -787,7 +789,7 @@ withLedgerEventsServerStream port action = do
where
go s = do
h <- socketToHandle s WriteMode
action $ mkLedgerEventHandler (writeAnchoredEvent h)
action $ mkLedgerEventHandler (writeAnchoredEvents h)

open = do
sock <- socket AF_INET Stream defaultProtocol
Expand All @@ -799,7 +801,7 @@ withLedgerEventsServerStream port action = do

closeSockets = close

writeAnchoredEvent h = handle errHandler . BS.hPut h . serializeAnchoredEvent
writeAnchoredEvents h = handle errHandler . BS.hPut h . serializeVersioned
where
errHandler = \case
-- If the client closes the socket, we continue running the node, but ignore the events.
Expand All @@ -810,41 +812,41 @@ withLedgerEventsServerStream port action = do
throwIO err

withLedgerEventsChan
:: (LedgerEventWriter -> LedgerEventReader -> IO a)
:: (LedgerEventsWriter -> LedgerEventsReader -> IO a)
-> IO a
withLedgerEventsChan action = do
chan <- newTChanIO
action (atomically . writeTChan chan) (atomically $ readTChan chan)

type LedgerEventWriter = (Version, AnchoredEvent) -> IO ()
type LedgerEventReader = IO (Version, AnchoredEvent)
type LedgerEventsWriter = Versioned AnchoredEvents -> IO ()
type LedgerEventsReader = IO (Versioned AnchoredEvents)

mkLedgerEventHandler
:: LedgerEventWriter
:: LedgerEventsWriter
-> StandardLedgerEventHandler
mkLedgerEventHandler writer =
LedgerEventHandler $ \p h s b -> traverse_ writer . mkAnchoredEvents p h s b
LedgerEventHandler $ \p h s b -> traverse_ writer . mkVersionedAnchoredEvents p h s b

mkAnchoredEvents
mkVersionedAnchoredEvents
:: ChainHash (HardForkBlock (CardanoEras StandardCrypto))
-> HeaderHash (HardForkBlock (CardanoEras StandardCrypto))
-> SlotNo
-> BlockNo
-> [AuxLedgerEvent (LedgerState StandardCrypto)]
-> [(Version, AnchoredEvent)]
mkAnchoredEvents prevHash headerHash slotNo blockNo auxEvents =
[ (version,
AnchoredEvent
-> [Versioned AnchoredEvents]
mkVersionedAnchoredEvents prevHash headerHash slotNo blockNo auxEvents =
[ Versioned version $
AnchoredEvents
(getOneEraHash <$> chainHashToOriginHash prevHash)
(getOneEraHash headerHash)
slotNo
blockNo
ledgerEvent
)
| (version, Just ledgerEvent)
<- (eventCodecVersion &&& fromAuxLedgerEvent) <$> auxEvents
ledgerEvents
| (version, ledgerEvents) <- versionedGroups
]
where
chainHashToOriginHash :: ChainHash b -> WithOrigin (HeaderHash b)
chainHashToOriginHash GenesisHash = Origin
chainHashToOriginHash (BlockHash bh) = At bh
versionedEvents = mapMaybe (sequence . (eventCodecVersion &&& fromAuxLedgerEvent)) auxEvents
versionedGroups = map (first NE.head . NE.unzip) . NE.groupBy ((==) `on` fst) $ versionedEvents

0 comments on commit 9fcc0b1

Please sign in to comment.