From 8c05a7a2df95888d58df511d5d1fe8b92f98222f Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Wed, 11 Sep 2024 16:58:00 +0200 Subject: [PATCH 1/4] Refactor NetworkCallback to be a newtype This gives the callback function the name 'deliver' to make it more consistent with distributed programming literature, e.g. Introduction to Reliable and Secure Distributed Programming, by Cachin et al. --- hydra-node/src/Hydra/Network.hs | 17 +- hydra-node/src/Hydra/Network/Authenticate.hs | 8 +- hydra-node/src/Hydra/Network/Heartbeat.hs | 19 +- hydra-node/src/Hydra/Network/Ouroboros.hs | 401 +++++++++--------- hydra-node/src/Hydra/Network/Reliability.hs | 82 ++-- hydra-node/src/Hydra/Node.hs | 7 +- hydra-node/src/Hydra/Node/Network.hs | 16 +- .../test/Hydra/Network/AuthenticateSpec.hs | 25 +- .../test/Hydra/Network/HeartbeatSpec.hs | 22 +- .../test/Hydra/Network/ReliabilitySpec.hs | 44 +- hydra-node/test/Hydra/NetworkSpec.hs | 22 +- 11 files changed, 352 insertions(+), 311 deletions(-) diff --git a/hydra-node/src/Hydra/Network.hs b/hydra-node/src/Hydra/Network.hs index fdf240ec002..66240440999 100644 --- a/hydra-node/src/Hydra/Network.hs +++ b/hydra-node/src/Hydra/Network.hs @@ -11,8 +11,8 @@ module Hydra.Network ( -- * Types Network (..), + NetworkCallback (..), NetworkComponent, - NetworkCallback, IP, Host (..), NodeId (..), @@ -41,17 +41,24 @@ deriving anyclass instance FromJSON IP -- * Hydra network interface --- | Handle to interface with the hydra network and send messages "off chain". +-- | Interface from the application to the network layer. newtype Network m msg = Network { broadcast :: msg -> m () - -- ^ Send a `msg` to the whole hydra network. + -- ^ Send a `msg` to the whole configured hydra network including ourselves. } instance Contravariant (Network m) where contramap f (Network bcast) = Network $ \msg -> bcast (f msg) --- | Handle to interface for inbound messages. -type NetworkCallback msg m = msg -> m () +-- | Interface from network layer to the application. +-- XXX: Reliably delivering a message in the crash-recovery fault model is +-- tricky. According to "Introduction to Reliable and Secure Distributed +-- Programming" section "2.2.4 Crashes with recoveries" explains that storing to +-- stable storage and just pointing to stored events is a better way. +newtype NetworkCallback msg m = NetworkCallback + { deliver :: msg -> m () + -- ^ The given `msg` was received from the network. + } -- | A type tying both inbound and outbound messages sending in a single /Component/. -- diff --git a/hydra-node/src/Hydra/Network/Authenticate.hs b/hydra-node/src/Hydra/Network/Authenticate.hs index 091ed064186..f65c84fbfe9 100644 --- a/hydra-node/src/Hydra/Network/Authenticate.hs +++ b/hydra-node/src/Hydra/Network/Authenticate.hs @@ -11,7 +11,7 @@ import Control.Tracer (Tracer) import Data.Aeson (Options (tagSingleConstructors), defaultOptions, genericToJSON) import Data.Aeson qualified as Aeson import Hydra.Logging (traceWith) -import Hydra.Network (Network (Network, broadcast), NetworkComponent) +import Hydra.Network (Network (Network, broadcast), NetworkCallback (..), NetworkComponent) import Hydra.Prelude import Hydra.Tx (Party (Party, vkey), deriveParty) import Hydra.Tx.Crypto (HydraKey, Key (SigningKey), Signature, sign, verify) @@ -64,12 +64,12 @@ withAuthentication :: NetworkComponent m (Signed inbound) (Signed outbound) a -> -- The node internal authenticated network. NetworkComponent m (Authenticated inbound) outbound a -withAuthentication tracer signingKey parties withRawNetwork callback action = do - withRawNetwork checkSignature authenticate +withAuthentication tracer signingKey parties withRawNetwork NetworkCallback{deliver} action = do + withRawNetwork NetworkCallback{deliver = checkSignature} authenticate where checkSignature (Signed msg sig party@Party{vkey = partyVkey}) = if verify partyVkey sig msg && elem party parties - then callback $ Authenticated msg party + then deliver $ Authenticated msg party else traceWith tracer (mkAuthLog msg sig party) me = deriveParty signingKey diff --git a/hydra-node/src/Hydra/Network/Heartbeat.hs b/hydra-node/src/Hydra/Network/Heartbeat.hs index 3cd2d18f549..7a3cdb1e315 100644 --- a/hydra-node/src/Hydra/Network/Heartbeat.hs +++ b/hydra-node/src/Hydra/Network/Heartbeat.hs @@ -24,7 +24,7 @@ import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation)) import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO, writeTVar) import Data.Map qualified as Map import Data.Set qualified as Set -import Hydra.Network (Network (..), NetworkCallback, NetworkComponent, NodeId) +import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent, NodeId) import Hydra.Network.Message (Connectivity (Connected, Disconnected)) data HeartbeatState = HeartbeatState @@ -100,21 +100,28 @@ withHeartbeat nodeId withNetwork callback action = do withAsync (checkHeartbeatState nodeId lastSent network) $ \_ -> action (updateStateFromOutgoingMessages nodeId lastSent network) where - onConnectivityChanged = callback . Left + NetworkCallback{deliver} = callback + + onConnectivityChanged = deliver . Left updateStateFromIncomingMessages :: (MonadSTM m, MonadMonotonicTime m) => TVar m HeartbeatState -> NetworkCallback (Either Connectivity inbound) m -> NetworkCallback (Heartbeat inbound) m -updateStateFromIncomingMessages heartbeatState callback = \case - Data nodeId msg -> notifyAlive nodeId >> callback (Right msg) - Ping nodeId -> notifyAlive nodeId +updateStateFromIncomingMessages heartbeatState callback = + NetworkCallback + { deliver = \case + Data nodeId msg -> notifyAlive nodeId >> deliver (Right msg) + Ping nodeId -> notifyAlive nodeId + } where + NetworkCallback{deliver} = callback + notifyAlive peer = do now <- getMonotonicTime aliveSet <- alive <$> readTVarIO heartbeatState - unless (peer `Map.member` aliveSet) $ callback (Left $ Connected peer) + unless (peer `Map.member` aliveSet) $ deliver (Left $ Connected peer) atomically $ modifyTVar' heartbeatState $ \s -> s diff --git a/hydra-node/src/Hydra/Network/Ouroboros.hs b/hydra-node/src/Hydra/Network/Ouroboros.hs index b69a0b1bbdb..52e3738eabc 100644 --- a/hydra-node/src/Hydra/Network/Ouroboros.hs +++ b/hydra-node/src/Hydra/Network/Ouroboros.hs @@ -36,7 +36,7 @@ import Hydra.Logging (Tracer (..), nullTracer) import Hydra.Network ( Host (..), Network (..), - NetworkCallback, + NetworkCallback (..), NetworkComponent, PortNumber, ) @@ -133,221 +133,226 @@ withOuroborosNetwork :: HydraNetworkConfig -> (HydraHandshakeRefused -> IO ()) -> NetworkComponent IO inbound outbound () -withOuroborosNetwork tracer HydraNetworkConfig{protocolVersion, localHost, remoteHosts} handshakeCallback networkCallback between = do - bchan <- newBroadcastTChanIO - let newBroadcastChannel = atomically $ dupTChan bchan - -- NOTE: There should only be one `IOManager` instance per process. Should we - -- want to use ouroboros network framework in other places, we must factor out - -- this instantiation - withIOManager $ \iomgr -> do - withServerListening iomgr hydraServer $ - race_ (connect iomgr newBroadcastChannel hydraClient) $ do - between $ - Network - { broadcast = atomically . writeTChan bchan - } - where - resolveSockAddr :: Host -> IO SockAddr - resolveSockAddr Host{hostname, port} = do - is <- getAddrInfo (Just defaultHints) (Just $ toString hostname) (Just $ show port) - case is of - (info : _) -> pure $ addrAddress info - _ -> error "getAdrrInfo failed.. do proper error handling" - - getHost :: SockAddr -> IO Host - getHost sockAddr = do - (mHost, mPort) <- getNameInfo [NI_NUMERICHOST, NI_NUMERICSERV] True True sockAddr - maybe (error "getNameInfo failed.. do proper error handling") pure $ do - host <- T.pack <$> mHost - port <- readMaybe =<< mPort - pure $ Host host port +withOuroborosNetwork + tracer + HydraNetworkConfig{protocolVersion, localHost, remoteHosts} + handshakeCallback + NetworkCallback{deliver} + between = do + bchan <- newBroadcastTChanIO + let newBroadcastChannel = atomically $ dupTChan bchan + -- NOTE: There should only be one `IOManager` instance per process. Should we + -- want to use ouroboros network framework in other places, we must factor out + -- this instantiation + withIOManager $ \iomgr -> do + withServerListening iomgr hydraServer $ + race_ (connect iomgr newBroadcastChannel hydraClient) $ do + between $ + Network + { broadcast = atomically . writeTChan bchan + } + where + resolveSockAddr :: Host -> IO SockAddr + resolveSockAddr Host{hostname, port} = do + is <- getAddrInfo (Just defaultHints) (Just $ toString hostname) (Just $ show port) + case is of + (info : _) -> pure $ addrAddress info + _ -> error "getAdrrInfo failed.. do proper error handling" - connect :: - IOManager -> - IO t -> - ( t -> - OuroborosApplicationWithMinimalCtx - InitiatorMode - SockAddr - LByteString - IO - () - Void - ) -> - IO Void - connect iomgr newBroadcastChannel app = do - -- REVIEW(SN): move outside to have this information available? - networkState <- newNetworkMutableState - -- Using port number 0 to let the operating system pick a random port - localAddr <- resolveSockAddr localHost{port = 0} - remoteAddrs <- forM remoteHosts resolveSockAddr - let sn = socketSnocket iomgr - Subscription.ipSubscriptionWorker - sn - (contramap (WithHost localHost . TraceSubscriptions) tracer) - (contramap (WithHost localHost . TraceErrorPolicy) tracer) - networkState - (subscriptionParams localAddr remoteAddrs) - ( \sock -> - actualConnect iomgr newBroadcastChannel app sock `catch` \e -> do - host <- getHost =<< getPeerName sock - onHandshakeError host e - ) + getHost :: SockAddr -> IO Host + getHost sockAddr = do + (mHost, mPort) <- getNameInfo [NI_NUMERICHOST, NI_NUMERICSERV] True True sockAddr + maybe (error "getNameInfo failed.. do proper error handling") pure $ do + host <- T.pack <$> mHost + port <- readMaybe =<< mPort + pure $ Host host port - onHandshakeError :: Host -> HandshakeProtocolError HydraVersionedProtocolNumber -> IO () - onHandshakeError remoteHost = \case - HandshakeError (VersionMismatch theirVersions _) -> do - handshakeCallback - HydraHandshakeRefused - { ourVersion = protocolVersion - , theirVersions = KnownHydraVersions theirVersions - , remoteHost - } - _ -> - handshakeCallback - HydraHandshakeRefused - { ourVersion = protocolVersion - , theirVersions = NoKnownHydraVersions - , remoteHost - } + connect :: + IOManager -> + IO t -> + ( t -> + OuroborosApplicationWithMinimalCtx + InitiatorMode + SockAddr + LByteString + IO + () + Void + ) -> + IO Void + connect iomgr newBroadcastChannel app = do + -- REVIEW(SN): move outside to have this information available? + networkState <- newNetworkMutableState + -- Using port number 0 to let the operating system pick a random port + localAddr <- resolveSockAddr localHost{port = 0} + remoteAddrs <- forM remoteHosts resolveSockAddr + let sn = socketSnocket iomgr + Subscription.ipSubscriptionWorker + sn + (contramap (WithHost localHost . TraceSubscriptions) tracer) + (contramap (WithHost localHost . TraceErrorPolicy) tracer) + networkState + (subscriptionParams localAddr remoteAddrs) + ( \sock -> + actualConnect iomgr newBroadcastChannel app sock `catch` \e -> do + host <- getHost =<< getPeerName sock + onHandshakeError host e + ) - subscriptionParams :: - SockAddr -> - [SockAddr] -> - SubscriptionParams a IPSubscriptionTarget - subscriptionParams localAddr remoteAddrs = - SubscriptionParams - { spLocalAddresses = LocalAddresses (Just localAddr) Nothing Nothing - , spConnectionAttemptDelay = const Nothing - , spErrorPolicies = nullErrorPolicies - , spSubscriptionTarget = IPSubscriptionTarget remoteAddrs (length remoteAddrs) - } + onHandshakeError :: Host -> HandshakeProtocolError HydraVersionedProtocolNumber -> IO () + onHandshakeError remoteHost = \case + HandshakeError (VersionMismatch theirVersions _) -> do + handshakeCallback + HydraHandshakeRefused + { ourVersion = protocolVersion + , theirVersions = KnownHydraVersions theirVersions + , remoteHost + } + _ -> + handshakeCallback + HydraHandshakeRefused + { ourVersion = protocolVersion + , theirVersions = NoKnownHydraVersions + , remoteHost + } - actualConnect :: - IOManager -> - IO t -> - (t -> OuroborosApplicationWithMinimalCtx 'InitiatorMode SockAddr LByteString IO () Void) -> - Socket -> - IO () - actualConnect iomgr newBroadcastChannel app sn = do - chan <- newBroadcastChannel - connectToNodeSocket - iomgr - (codecHandshake hydraVersionedProtocolCodec) - noTimeLimitsHandshake - hydraVersionedProtocolDataCodec - networkConnectTracers - (HandshakeCallbacks acceptableVersion queryVersion) - (simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (app chan)) - sn - where - networkConnectTracers :: NetworkConnectTracers SockAddr HydraVersionedProtocolNumber - networkConnectTracers = - NetworkConnectTracers - { nctMuxTracer = nullTracer - , nctHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer + subscriptionParams :: + SockAddr -> + [SockAddr] -> + SubscriptionParams a IPSubscriptionTarget + subscriptionParams localAddr remoteAddrs = + SubscriptionParams + { spLocalAddresses = LocalAddresses (Just localAddr) Nothing Nothing + , spConnectionAttemptDelay = const Nothing + , spErrorPolicies = nullErrorPolicies + , spSubscriptionTarget = IPSubscriptionTarget remoteAddrs (length remoteAddrs) } - withServerListening :: - IOManager -> - OuroborosApplicationWithMinimalCtx 'ResponderMode SockAddr LByteString IO a b -> - IO b -> - IO () - withServerListening iomgr app continuation = do - networkState <- newNetworkMutableState - localAddr <- resolveSockAddr localHost - -- TODO(SN): whats this? _ <- async $ cleanNetworkMutableState networkState - handle onIOException - $ withServerNode - (socketSnocket iomgr) - makeSocketBearer - notConfigureSocket - networkServerTracers - networkState - (AcceptedConnectionsLimit maxBound maxBound 0) - localAddr + actualConnect :: + IOManager -> + IO t -> + (t -> OuroborosApplicationWithMinimalCtx 'InitiatorMode SockAddr LByteString IO () Void) -> + Socket -> + IO () + actualConnect iomgr newBroadcastChannel app sn = do + chan <- newBroadcastChannel + connectToNodeSocket + iomgr (codecHandshake hydraVersionedProtocolCodec) noTimeLimitsHandshake hydraVersionedProtocolDataCodec + networkConnectTracers (HandshakeCallbacks acceptableVersion queryVersion) - (simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (SomeResponderApplication app)) - nullErrorPolicies - $ \_addr serverAsync -> do - race_ (wait serverAsync) continuation - where - notConfigureSocket :: a -> b -> IO () - notConfigureSocket _ _ = pure () + (simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (app chan)) + sn + where + networkConnectTracers :: NetworkConnectTracers SockAddr HydraVersionedProtocolNumber + networkConnectTracers = + NetworkConnectTracers + { nctMuxTracer = nullTracer + , nctHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer + } - networkServerTracers :: NetworkServerTracers SockAddr HydraVersionedProtocolNumber - networkServerTracers = - NetworkServerTracers - { nstMuxTracer = nullTracer - , nstHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer - , nstErrorPolicyTracer = contramap (WithHost localHost . TraceErrorPolicy) tracer - , nstAcceptPolicyTracer = contramap (WithHost localHost . TraceAcceptPolicy) tracer - } + withServerListening :: + IOManager -> + OuroborosApplicationWithMinimalCtx 'ResponderMode SockAddr LByteString IO a b -> + IO b -> + IO () + withServerListening iomgr app continuation = do + networkState <- newNetworkMutableState + localAddr <- resolveSockAddr localHost + -- TODO(SN): whats this? _ <- async $ cleanNetworkMutableState networkState + handle onIOException + $ withServerNode + (socketSnocket iomgr) + makeSocketBearer + notConfigureSocket + networkServerTracers + networkState + (AcceptedConnectionsLimit maxBound maxBound 0) + localAddr + (codecHandshake hydraVersionedProtocolCodec) + noTimeLimitsHandshake + hydraVersionedProtocolDataCodec + (HandshakeCallbacks acceptableVersion queryVersion) + (simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (SomeResponderApplication app)) + nullErrorPolicies + $ \_addr serverAsync -> do + race_ (wait serverAsync) continuation + where + notConfigureSocket :: a -> b -> IO () + notConfigureSocket _ _ = pure () - onIOException :: IOException -> IO () - onIOException ioException = - throwIO $ - NetworkServerListenException - { ioException - , localHost + networkServerTracers :: NetworkServerTracers SockAddr HydraVersionedProtocolNumber + networkServerTracers = + NetworkServerTracers + { nstMuxTracer = nullTracer + , nstHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer + , nstErrorPolicyTracer = contramap (WithHost localHost . TraceErrorPolicy) tracer + , nstAcceptPolicyTracer = contramap (WithHost localHost . TraceAcceptPolicy) tracer } - hydraClient :: - TChan outbound -> - OuroborosApplicationWithMinimalCtx 'InitiatorMode addr LByteString IO () Void - hydraClient chan = - OuroborosApplication - [ MiniProtocol - { miniProtocolNum = MiniProtocolNum 42 - , miniProtocolLimits = maximumMiniProtocolLimits - , miniProtocolRun = InitiatorProtocolOnly initiator - } - ] - where - initiator :: MiniProtocolCb ctx LByteString IO () - initiator = - mkMiniProtocolCbFromPeer - ( const - (nullTracer, codecFireForget, fireForgetClientPeer $ client chan) - ) + onIOException :: IOException -> IO () + onIOException ioException = + throwIO $ + NetworkServerListenException + { ioException + , localHost + } - hydraServer :: - OuroborosApplicationWithMinimalCtx 'ResponderMode addr LByteString IO Void () - hydraServer = - OuroborosApplication - [ MiniProtocol - { miniProtocolNum = MiniProtocolNum 42 - , miniProtocolLimits = maximumMiniProtocolLimits - , miniProtocolRun = ResponderProtocolOnly responder - } - ] - where - responder :: MiniProtocolCb ctx LByteString IO () - responder = mkMiniProtocolCbFromPeer (const (nullTracer, codecFireForget, fireForgetServerPeer server)) + hydraClient :: + TChan outbound -> + OuroborosApplicationWithMinimalCtx 'InitiatorMode addr LByteString IO () Void + hydraClient chan = + OuroborosApplication + [ MiniProtocol + { miniProtocolNum = MiniProtocolNum 42 + , miniProtocolLimits = maximumMiniProtocolLimits + , miniProtocolRun = InitiatorProtocolOnly initiator + } + ] + where + initiator :: MiniProtocolCb ctx LByteString IO () + initiator = + mkMiniProtocolCbFromPeer + ( const + (nullTracer, codecFireForget, fireForgetClientPeer $ client chan) + ) - -- TODO: provide sensible limits - -- https://github.com/input-output-hk/ouroboros-network/issues/575 - maximumMiniProtocolLimits :: MiniProtocolLimits - maximumMiniProtocolLimits = - MiniProtocolLimits{maximumIngressQueue = maxBound} + hydraServer :: + OuroborosApplicationWithMinimalCtx 'ResponderMode addr LByteString IO Void () + hydraServer = + OuroborosApplication + [ MiniProtocol + { miniProtocolNum = MiniProtocolNum 42 + , miniProtocolLimits = maximumMiniProtocolLimits + , miniProtocolRun = ResponderProtocolOnly responder + } + ] + where + responder :: MiniProtocolCb ctx LByteString IO () + responder = mkMiniProtocolCbFromPeer (const (nullTracer, codecFireForget, fireForgetServerPeer server)) + + -- TODO: provide sensible limits + -- https://github.com/input-output-hk/ouroboros-network/issues/575 + maximumMiniProtocolLimits :: MiniProtocolLimits + maximumMiniProtocolLimits = + MiniProtocolLimits{maximumIngressQueue = maxBound} - client :: - TChan outbound -> - FireForgetClient outbound IO () - client chan = - Idle $ do - atomically (readTChan chan) <&> \msg -> - SendMsg msg (pure $ client chan) + client :: + TChan outbound -> + FireForgetClient outbound IO () + client chan = + Idle $ do + atomically (readTChan chan) <&> \msg -> + SendMsg msg (pure $ client chan) - server :: FireForgetServer inbound IO () - server = - FireForgetServer - { recvMsg = \msg -> networkCallback msg $> server - , recvMsgDone = pure () - } + server :: FireForgetServer inbound IO () + server = + FireForgetServer + { recvMsg = \msg -> deliver msg $> server + , recvMsgDone = pure () + } data NetworkServerListenException = NetworkServerListenException { ioException :: IOException diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 2c363031a01..231906b7cf5 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -107,7 +107,7 @@ import Data.Vector ( (!?), ) import Hydra.Logging (traceWith) -import Hydra.Network (Network (..), NetworkComponent) +import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), isPing) import Hydra.Persistence (Persistence (..), PersistenceIncremental (..)) @@ -231,7 +231,10 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa withAsync (forever $ atomically (readTQueue resendQ) >>= broadcast) $ \_ -> reliableBroadcast sentMessages ourIndex acksCache network where + NetworkCallback{deliver} = callback + allParties = fromList $ sort $ me : otherParties + reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} = action $ Network @@ -259,46 +262,47 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa cacheMessage msg = modifyTVar' sentMessages (|> msg) - reliableCallback acksCache sentMessages resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do - if length acknowledged /= length allParties - then - traceWith - tracer - ReceivedMalformedAcks - { fromParty = party - , partyAcks = acknowledged - , numberOfParties = length allParties - } - else do - eShouldCallbackWithKnownAcks <- atomically $ runMaybeT $ do - loadedAcks <- lift $ readTVar acksCache - partyIndex <- hoistMaybe $ findPartyIndex party - messageAckForParty <- hoistMaybe (acknowledged !? partyIndex) - knownAckForParty <- hoistMaybe $ loadedAcks !? partyIndex - if - | isPing payload -> - -- we do not update indices on Pings but we do propagate them - return (True, partyIndex, loadedAcks) - | messageAckForParty == knownAckForParty + 1 -> do - -- we update indices for next in line messages and propagate them - let newAcks = constructAcks loadedAcks partyIndex - lift $ writeTVar acksCache newAcks - return (True, partyIndex, newAcks) - | otherwise -> - -- other messages are dropped - return (False, partyIndex, loadedAcks) + reliableCallback acksCache sentMessages resend ourIndex = + NetworkCallback $ \(Authenticated (ReliableMsg acknowledged payload) party) -> do + if length acknowledged /= length allParties + then + traceWith + tracer + ReceivedMalformedAcks + { fromParty = party + , partyAcks = acknowledged + , numberOfParties = length allParties + } + else do + eShouldCallbackWithKnownAcks <- atomically $ runMaybeT $ do + loadedAcks <- lift $ readTVar acksCache + partyIndex <- hoistMaybe $ findPartyIndex party + messageAckForParty <- hoistMaybe (acknowledged !? partyIndex) + knownAckForParty <- hoistMaybe $ loadedAcks !? partyIndex + if + | isPing payload -> + -- we do not update indices on Pings but we do propagate them + return (True, partyIndex, loadedAcks) + | messageAckForParty == knownAckForParty + 1 -> do + -- we update indices for next in line messages and propagate them + let newAcks = constructAcks loadedAcks partyIndex + lift $ writeTVar acksCache newAcks + return (True, partyIndex, newAcks) + | otherwise -> + -- other messages are dropped + return (False, partyIndex, loadedAcks) - case eShouldCallbackWithKnownAcks of - Just (shouldCallback, theirIndex, localCounter) -> do - if shouldCallback - then do - callback Authenticated{payload, party} - traceWith tracer Received{acknowledged, localCounter, theirIndex, ourIndex} - else traceWith tracer Ignored{acknowledged, localCounter, theirIndex, ourIndex} + case eShouldCallbackWithKnownAcks of + Just (shouldCallback, theirIndex, localCounter) -> do + if shouldCallback + then do + deliver Authenticated{payload, party} + traceWith tracer Received{acknowledged, localCounter, theirIndex, ourIndex} + else traceWith tracer Ignored{acknowledged, localCounter, theirIndex, ourIndex} - when (isPing payload) $ - resendMessagesIfLagging sentMessages resend theirIndex localCounter acknowledged ourIndex - Nothing -> pure () + when (isPing payload) $ + resendMessagesIfLagging sentMessages resend theirIndex localCounter acknowledged ourIndex + Nothing -> pure () constructAcks acks wantedIndex = zipWith (\ack i -> if i == wantedIndex then ack + 1 else ack) acks partyIndexes diff --git a/hydra-node/src/Hydra/Node.hs b/hydra-node/src/Hydra/Node.hs index 6c14290bddb..a65d09b0114 100644 --- a/hydra-node/src/Hydra/Node.hs +++ b/hydra-node/src/Hydra/Node.hs @@ -46,7 +46,7 @@ import Hydra.HeadLogic.Outcome (StateChanged (..)) import Hydra.HeadLogic.State (getHeadParameters) import Hydra.Ledger (Ledger) import Hydra.Logging (Tracer, traceWith) -import Hydra.Network (Network (..)) +import Hydra.Network (Network (..), NetworkCallback (..)) import Hydra.Network.Message (Message, NetworkEvent (..)) import Hydra.Node.InputQueue (InputQueue (..), Queued (..), createInputQueue) import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..)) @@ -200,8 +200,9 @@ wireClientInput node = enqueue . ClientInput where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node -wireNetworkInput :: DraftHydraNode tx m -> NetworkEvent (Message tx) -> m () -wireNetworkInput node = enqueue . NetworkInput defaultTTL +wireNetworkInput :: DraftHydraNode tx m -> NetworkCallback (NetworkEvent (Message tx)) m +wireNetworkInput node = + NetworkCallback{deliver = enqueue . NetworkInput defaultTTL} where DraftHydraNode{inputQueue = InputQueue{enqueue}} = node diff --git a/hydra-node/src/Hydra/Node/Network.hs b/hydra-node/src/Hydra/Node/Network.hs index a31d1f7ec31..a19a0ee4dbd 100644 --- a/hydra-node/src/Hydra/Node/Network.hs +++ b/hydra-node/src/Hydra/Node/Network.hs @@ -74,7 +74,7 @@ import Hydra.Prelude hiding (fromList, replicate) import Control.Tracer (Tracer) import Hydra.Logging (traceWith) import Hydra.Logging.Messages (HydraLog (..)) -import Hydra.Network (Host (..), IP, NetworkComponent, NodeId, PortNumber) +import Hydra.Network (Host (..), IP, NetworkCallback (..), NetworkComponent, NodeId, PortNumber) import Hydra.Network.Authenticate (Authenticated (..), Signed, withAuthentication) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) import Hydra.Network.Message ( @@ -146,12 +146,14 @@ withNetwork tracer configuration callback action = do , remoteHosts = peers } ( \HydraHandshakeRefused{remoteHost, ourVersion, theirVersions} -> - callback . ConnectivityEvent $ HandshakeFailure{remoteHost, ourVersion, theirVersions} + deliver . ConnectivityEvent $ HandshakeFailure{remoteHost, ourVersion, theirVersions} ) - withHeartbeat nodeId reliability (callback . mapHeartbeat) $ \network -> + withHeartbeat nodeId reliability (NetworkCallback{deliver = deliver . mapHeartbeat}) $ \network -> action network where + NetworkCallback{deliver} = callback + NetworkConfiguration{persistenceDir, signingKey, otherParties, host, port, peers, nodeId} = configuration mapHeartbeat :: Either Connectivity (Authenticated (Message tx)) -> NetworkEvent (Message tx) @@ -186,12 +188,12 @@ configureMessagePersistence tracer persistenceDir numberOfParties = do withFlipHeartbeats :: NetworkComponent m (Authenticated (Heartbeat inbound)) outbound a -> NetworkComponent m (Heartbeat (Authenticated inbound)) outbound a -withFlipHeartbeats withBaseNetwork callback = - withBaseNetwork unwrapHeartbeats +withFlipHeartbeats withBaseNetwork NetworkCallback{deliver} = + withBaseNetwork NetworkCallback{deliver = unwrapHeartbeats} where unwrapHeartbeats = \case - Authenticated (Data nid msg) party -> callback $ Data nid (Authenticated msg party) - Authenticated (Ping nid) _ -> callback $ Ping nid + Authenticated (Data nid msg) party -> deliver $ Data nid (Authenticated msg party) + Authenticated (Ping nid) _ -> deliver $ Ping nid -- | Where are the messages stored, relative to given directory. storedMessagesFile :: FilePath -> FilePath diff --git a/hydra-node/test/Hydra/Network/AuthenticateSpec.hs b/hydra-node/test/Hydra/Network/AuthenticateSpec.hs index 438216d30eb..6040ca59d16 100644 --- a/hydra-node/test/Hydra/Network/AuthenticateSpec.hs +++ b/hydra-node/test/Hydra/Network/AuthenticateSpec.hs @@ -6,7 +6,7 @@ import Control.Monad.IOSim (runSimOrThrow) import Data.ByteString (pack) import Hydra.Ledger.Simple (SimpleTx) import Hydra.Logging (Envelope (message), nullTracer, traceInTVar) -import Hydra.Network (Network (..)) +import Hydra.Network (Network (..), NetworkCallback (..)) import Hydra.Network.Authenticate (AuthLog, Authenticated (..), Signed (Signed), mkAuthLog, withAuthentication) import Hydra.Network.HeartbeatSpec (noop) import Hydra.Network.Message (Message (ReqTx)) @@ -24,8 +24,11 @@ spec = parallel $ do let captureOutgoing msgqueue _cb action = action $ Network{broadcast = \msg -> atomically $ modifyTVar' msgqueue (msg :)} - captureIncoming receivedMessages msg = - atomically $ modifyTVar' receivedMessages (msg :) + captureIncoming receivedMessages = + NetworkCallback + { deliver = \msg -> + atomically $ modifyTVar' receivedMessages (msg :) + } msg <- runIO $ generate @(Message SimpleTx) arbitrary it "pass the authenticated messages around" $ do @@ -38,8 +41,8 @@ spec = parallel $ do nullTracer aliceSk [bob] - ( \incoming _ -> do - incoming (Signed msg (sign bobSk msg) bob) + ( \NetworkCallback{deliver} _ -> do + deliver (Signed msg (sign bobSk msg) bob) ) (captureIncoming receivedMessages) $ \_ -> @@ -60,9 +63,9 @@ spec = parallel $ do nullTracer aliceSk [bob] - ( \incoming _ -> do - incoming (Signed msg (sign bobSk msg) bob) - incoming (Signed unexpectedMessage (sign aliceSk unexpectedMessage) alice) + ( \NetworkCallback{deliver} _ -> do + deliver (Signed msg (sign bobSk msg) bob) + deliver (Signed unexpectedMessage (sign aliceSk unexpectedMessage) alice) ) (captureIncoming receivedMessages) $ \_ -> @@ -82,8 +85,8 @@ spec = parallel $ do nullTracer aliceSk [bob, carol] - ( \incoming _ -> do - incoming (Signed msg (sign carolSk msg) bob) + ( \NetworkCallback{deliver} _ -> do + deliver (Signed msg (sign carolSk msg) bob) ) (captureIncoming receivedMessages) $ \_ -> @@ -128,7 +131,7 @@ spec = parallel $ do tracer aliceSk [bob, carol] - (\incoming _ -> incoming signedMsg) + (\NetworkCallback{deliver} _ -> deliver signedMsg) noop $ \_ -> threadDelay 1 diff --git a/hydra-node/test/Hydra/Network/HeartbeatSpec.hs b/hydra-node/test/Hydra/Network/HeartbeatSpec.hs index 3d1b22d488b..bc560bce607 100644 --- a/hydra-node/test/Hydra/Network/HeartbeatSpec.hs +++ b/hydra-node/test/Hydra/Network/HeartbeatSpec.hs @@ -5,7 +5,7 @@ import Test.Hydra.Prelude import Control.Concurrent.Class.MonadSTM (MonadSTM (readTVarIO), modifyTVar', newTVarIO) import Control.Monad.IOSim (runSimOrThrow) -import Hydra.Network (Network (..), NetworkComponent, NodeId (..)) +import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent, NodeId (..)) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) import Hydra.Network.Message (Connectivity (Connected, Disconnected)) @@ -30,7 +30,7 @@ spec = parallel $ do let receivedHeartbeats = runSimOrThrow $ do (callback, getConnectivityEvents) <- captureConnectivity - withHeartbeat nodeId (\incoming _ -> incoming (Ping otherNodeId)) callback $ \_ -> + withHeartbeat nodeId (\NetworkCallback{deliver} _ -> deliver (Ping otherNodeId)) callback $ \_ -> threadDelay 1 getConnectivityEvents @@ -41,7 +41,7 @@ spec = parallel $ do let receivedHeartbeats = runSimOrThrow $ do (callback, getConnectivityEvents) <- captureConnectivity - withHeartbeat nodeId (\incoming _ -> incoming (Data otherNodeId ())) callback $ \_ -> + withHeartbeat nodeId (\NetworkCallback{deliver} _ -> deliver (Data otherNodeId ())) callback $ \_ -> threadDelay 1 getConnectivityEvents @@ -52,7 +52,7 @@ spec = parallel $ do let receivedHeartbeats = runSimOrThrow $ do (callback, getConnectivityEvents) <- captureConnectivity - withHeartbeat nodeId (\incoming _ -> incoming (Data otherNodeId ()) >> incoming (Ping otherNodeId)) callback $ \_ -> + withHeartbeat nodeId (\NetworkCallback{deliver} _ -> deliver (Data otherNodeId ()) >> deliver (Ping otherNodeId)) callback $ \_ -> threadDelay 1 getConnectivityEvents @@ -63,10 +63,10 @@ spec = parallel $ do let receivedHeartbeats = runSimOrThrow $ do (callback, getConnectivityEvents) <- captureConnectivity - let component incoming action = + let component NetworkCallback{deliver} action = race_ - (action (Network noop)) - (incoming (Ping otherNodeId) >> threadDelay 4 >> incoming (Ping otherNodeId) >> threadDelay 7) + (action (Network{broadcast = const $ pure ()})) + (deliver (Ping otherNodeId) >> threadDelay 4 >> deliver (Ping otherNodeId) >> threadDelay 7) withHeartbeat nodeId component callback $ \_ -> threadDelay 20 @@ -101,8 +101,8 @@ spec = parallel $ do sentHeartbeats `shouldBe` [Ping nodeId, Data nodeId (), Ping nodeId] -noop :: Monad m => b -> m () -noop = const $ pure () +noop :: Monad m => NetworkCallback b m +noop = NetworkCallback{deliver = const $ pure ()} captureOutgoing :: MonadSTM m => m (NetworkComponent m (Heartbeat ()) (Heartbeat ()) (), m [Heartbeat ()]) captureOutgoing = do @@ -112,10 +112,10 @@ captureOutgoing = do broadcast tv msg = atomically $ modifyTVar' tv (msg :) -captureConnectivity :: MonadSTM m => m (Either Connectivity a -> m (), m [Connectivity]) +captureConnectivity :: MonadSTM m => m (NetworkCallback (Either Connectivity a) m, m [Connectivity]) captureConnectivity = do tv <- newTVarIO [] - pure (record tv, readTVarIO tv) + pure (NetworkCallback{deliver = record tv}, readTVarIO tv) where record tv = \case Left c -> atomically $ modifyTVar' tv (c :) diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index e3bb063050f..5d193be1c59 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -16,7 +16,7 @@ import Control.Tracer (Tracer (..), nullTracer) import Data.Sequence.Strict ((|>)) import Data.Vector (Vector, empty, fromList, head, replicate, snoc) import Data.Vector qualified as Vector -import Hydra.Network (Network (..)) +import Hydra.Network (Network (..), NetworkCallback (..)) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) import Hydra.Network.Message (Connectivity) @@ -156,10 +156,10 @@ spec = parallel $ do alicePersistence alice [bob] - ( \incoming action -> do + ( \NetworkCallback{deliver} action -> do concurrently_ (action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` m)}) - (incoming (Authenticated (ReliableMsg (fromList [0, 1]) (Data "node-2" msg)) bob)) + (deliver (Authenticated (ReliableMsg (fromList [0, 1]) (Data "node-2" msg)) bob)) ) noop $ \Network{broadcast} -> do @@ -195,10 +195,10 @@ spec = parallel $ do messagePersistence alice [bob] - ( \incoming action -> do + ( \NetworkCallback{deliver} action -> do concurrently_ (action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` m)}) - (incoming (Authenticated (ReliableMsg (fromList [0, 1]) (Data "node-2" msg)) bob)) + (deliver (Authenticated (ReliableMsg (fromList [0, 1]) (Data "node-2" msg)) bob)) ) noop $ \Network{broadcast} -> do @@ -229,11 +229,11 @@ spec = parallel $ do withFlipHeartbeats $ withReliability tracer persistence party peers underlyingNetwork - failingNetwork seed peer (readQueue, writeQueue) callback action = + failingNetwork seed peer (readQueue, writeQueue) NetworkCallback{deliver} action = withAsync ( forever $ do newMsg <- atomically $ readTQueue readQueue - callback newMsg + deliver newMsg ) $ \_ -> action $ @@ -254,15 +254,15 @@ spec = parallel $ do createPersistenceIncremental fileName >>= \PersistenceIncremental{loadAll} -> loadAll -noop :: Monad m => b -> m () -noop = const $ pure () +noop :: Monad m => NetworkCallback b m +noop = NetworkCallback{deliver = const $ pure ()} aliceReceivesMessages :: [Authenticated (ReliableMsg (Heartbeat msg))] -> [Authenticated (Heartbeat msg)] aliceReceivesMessages messages = runSimOrThrow $ do receivedMessages <- newTVarIO empty alicePersistence <- mockMessagePersistence 3 - let baseNetwork incoming _ = mapM incoming messages + let baseNetwork NetworkCallback{deliver} _ = mapM deliver messages aliceReliabilityStack = withReliability @@ -277,15 +277,21 @@ aliceReceivesMessages messages = runSimOrThrow $ do Vector.toList <$> readTVarIO receivedMessages -captureIncoming :: MonadSTM m => TVar m (Vector p) -> p -> m () -captureIncoming receivedMessages msg = - atomically $ modifyTVar' receivedMessages (`snoc` msg) - -capturePayload :: MonadSTM m => TVar m (Vector msg) -> Either Connectivity (Authenticated (Heartbeat msg)) -> m () -capturePayload receivedMessages = \case - Right Authenticated{payload = Data _ msg} -> - atomically $ modifyTVar' receivedMessages (`snoc` msg) - _ -> pure () +captureIncoming :: MonadSTM m => TVar m (Vector p) -> NetworkCallback p m +captureIncoming receivedMessages = + NetworkCallback + { deliver = \msg -> + atomically $ modifyTVar' receivedMessages (`snoc` msg) + } + +capturePayload :: MonadSTM m => TVar m (Vector msg) -> NetworkCallback (Either Connectivity (Authenticated (Heartbeat msg))) m +capturePayload receivedMessages = + NetworkCallback + { deliver = \case + Right Authenticated{payload = Data _ msg} -> + atomically $ modifyTVar' receivedMessages (`snoc` msg) + _ -> pure () + } waitForAllMessages :: MonadSTM m => [msg] -> TVar m (Vector msg) -> m () waitForAllMessages expectedMessages capturedMessages = atomically $ do diff --git a/hydra-node/test/Hydra/NetworkSpec.hs b/hydra-node/test/Hydra/NetworkSpec.hs index e101c9e8ae4..f100742aba1 100644 --- a/hydra-node/test/Hydra/NetworkSpec.hs +++ b/hydra-node/test/Hydra/NetworkSpec.hs @@ -11,7 +11,7 @@ import Codec.CBOR.Write (toLazyByteString) import Control.Concurrent.Class.MonadSTM (modifyTVar', newTQueue, newTVarIO, readTQueue, readTVarIO, writeTQueue) import Hydra.Ledger.Simple (SimpleTx (..)) import Hydra.Logging (nullTracer, showLogsOnFailure) -import Hydra.Network (Host (..), Network) +import Hydra.Network (Host (..), Network, NetworkCallback (..)) import Hydra.Network.Message ( HydraHandshakeRefused (..), HydraVersionedProtocolNumber (..), @@ -36,6 +36,7 @@ spec = do describe "Ouroboros Network" $ do it "broadcasts messages to single connected peer" $ do received <- atomically newTQueue + let recordReceived = NetworkCallback{deliver = atomically . writeTQueue received} showLogsOnFailure "NetworkSpec" $ \tracer -> failAfter 30 $ do [port1, port2] <- fmap fromIntegral <$> randomUnusedTCPPorts 2 let node1Config = @@ -50,13 +51,14 @@ spec = do , localHost = Host lo port2 , remoteHosts = [Host lo port1] } - withOuroborosNetwork tracer node1Config (const $ pure ()) (const @_ @Integer $ pure ()) $ \hn1 -> - withOuroborosNetwork @Integer tracer node2Config (const $ pure ()) (atomically . writeTQueue received) $ \_ -> do + withOuroborosNetwork @Integer tracer node1Config (const $ pure ()) mockCallback $ \hn1 -> + withOuroborosNetwork @Integer tracer node2Config (const $ pure ()) recordReceived $ \_ -> do withNodeBroadcastingForever hn1 1 $ atomically (readTQueue received) `shouldReturn` 1 it "handshake failures should call the handshakeCallback" $ do received <- atomically newTQueue + let recordReceived = NetworkCallback{deliver = atomically . writeTQueue received} showLogsOnFailure "NetworkSpec" $ \tracer -> failAfter 30 $ do [port1, port2] <- fmap fromIntegral <$> randomUnusedTCPPorts 2 let node1Config = @@ -81,8 +83,8 @@ spec = do (handshakeCallback1, getHandshakeFailures1) <- createHandshakeCallback (handshakeCallback2, getHandshakeFailures2) <- createHandshakeCallback - withOuroborosNetwork @Integer @() tracer node1Config handshakeCallback1 (const @_ @Integer $ pure ()) $ \_ -> - withOuroborosNetwork @Integer tracer node2Config handshakeCallback2 (atomically . writeTQueue received) $ \_ -> do + withOuroborosNetwork @Integer @() tracer node1Config handshakeCallback1 mockCallback $ \_ -> + withOuroborosNetwork @Integer tracer node2Config handshakeCallback2 recordReceived $ \_ -> do threadDelay 0.1 getHandshakeFailures1 `shouldReturn` [Host lo port2] getHandshakeFailures2 `shouldReturn` [Host lo port1] @@ -91,6 +93,7 @@ spec = do node1received <- atomically newTQueue node2received <- atomically newTQueue node3received <- atomically newTQueue + let recordReceivedIn tq = NetworkCallback{deliver = atomically . writeTQueue tq} showLogsOnFailure "NetworkSpec" $ \tracer -> failAfter 30 $ do [port1, port2, port3] <- fmap fromIntegral <$> randomUnusedTCPPorts 3 let node1Config = @@ -111,9 +114,9 @@ spec = do , localHost = Host lo port3 , remoteHosts = [Host lo port2, Host lo port1] } - withOuroborosNetwork @Integer tracer node1Config (const $ pure ()) (atomically . writeTQueue node1received) $ \hn1 -> - withOuroborosNetwork tracer node2Config (const $ pure ()) (atomically . writeTQueue node2received) $ \hn2 -> do - withOuroborosNetwork tracer node3Config (const $ pure ()) (atomically . writeTQueue node3received) $ \hn3 -> do + withOuroborosNetwork @Integer tracer node1Config (const $ pure ()) (recordReceivedIn node1received) $ \hn1 -> + withOuroborosNetwork tracer node2Config (const $ pure ()) (recordReceivedIn node2received) $ \hn2 -> do + withOuroborosNetwork tracer node3Config (const $ pure ()) (recordReceivedIn node3received) $ \hn3 -> do withNodesBroadcastingForever [(hn1, 1), (hn2, 2), (hn3, 3)] $ assertAllnodesReceivedMessagesFromAllOtherNodes [(node1received, 1), (node2received, 2), (node3received, 3)] @@ -157,3 +160,6 @@ prop_canRoundtripCBOREncoding :: prop_canRoundtripCBOREncoding a = let encoded = toLazyByteString $ toCBOR a in (snd <$> deserialiseFromBytes fromCBOR encoded) === Right a + +mockCallback :: Applicative m => NetworkCallback msg m +mockCallback = NetworkCallback{deliver = \_ -> pure ()} From 0f41dae21e44bf0db61ed03937dfc09676652949 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Wed, 11 Sep 2024 17:21:37 +0200 Subject: [PATCH 2/4] Minor simplification on a ouroboros network test --- hydra-node/test/Hydra/NetworkSpec.hs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hydra-node/test/Hydra/NetworkSpec.hs b/hydra-node/test/Hydra/NetworkSpec.hs index f100742aba1..9c0f61859ac 100644 --- a/hydra-node/test/Hydra/NetworkSpec.hs +++ b/hydra-node/test/Hydra/NetworkSpec.hs @@ -57,8 +57,6 @@ spec = do atomically (readTQueue received) `shouldReturn` 1 it "handshake failures should call the handshakeCallback" $ do - received <- atomically newTQueue - let recordReceived = NetworkCallback{deliver = atomically . writeTQueue received} showLogsOnFailure "NetworkSpec" $ \tracer -> failAfter 30 $ do [port1, port2] <- fmap fromIntegral <$> randomUnusedTCPPorts 2 let node1Config = @@ -83,8 +81,8 @@ spec = do (handshakeCallback1, getHandshakeFailures1) <- createHandshakeCallback (handshakeCallback2, getHandshakeFailures2) <- createHandshakeCallback - withOuroborosNetwork @Integer @() tracer node1Config handshakeCallback1 mockCallback $ \_ -> - withOuroborosNetwork @Integer tracer node2Config handshakeCallback2 recordReceived $ \_ -> do + withOuroborosNetwork @Int @Int tracer node1Config handshakeCallback1 mockCallback $ \_ -> + withOuroborosNetwork @Int tracer node2Config handshakeCallback2 mockCallback $ \_ -> do threadDelay 0.1 getHandshakeFailures1 `shouldReturn` [Host lo port2] getHandshakeFailures2 `shouldReturn` [Host lo port1] From 33f86d18ec5244515573b82cecdf09d607737c4c Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Wed, 11 Sep 2024 17:44:15 +0200 Subject: [PATCH 3/4] Disable node-level short-cut of network messages This is now responsibility of a 'NetworkComponent' to allow for reliably delivering implementations. For example: only 'deliver' once we know all other nodes have seen a 'broadcast' message. --- hydra-node/src/Hydra/Node.hs | 5 +---- hydra-node/test/Hydra/BehaviorSpec.hs | 3 +-- hydra-node/test/Hydra/Model/MockChain.hs | 3 +-- hydra-node/test/Hydra/NodeSpec.hs | 11 +++++------ 4 files changed, 8 insertions(+), 14 deletions(-) diff --git a/hydra-node/src/Hydra/Node.hs b/hydra-node/src/Hydra/Node.hs index a65d09b0114..4482fe76fba 100644 --- a/hydra-node/src/Hydra/Node.hs +++ b/hydra-node/src/Hydra/Node.hs @@ -329,10 +329,7 @@ processEffects node tracer inputId effects = do traceWith tracer $ BeginEffect party inputId effectId effect case effect of ClientEffect i -> sendOutput server i - NetworkEffect msg -> do - broadcast hn msg - -- FIXME: This must not be here, such that the network layer can ensure correct delivery (i.e. reliable broadcast) - enqueue (NetworkInput defaultTTL (ReceivedMessage{sender = party, msg})) + NetworkEffect msg -> broadcast hn msg OnChainEffect{postChainTx} -> postTx postChainTx `catch` \(postTxError :: PostTxError tx) -> diff --git a/hydra-node/test/Hydra/BehaviorSpec.hs b/hydra-node/test/Hydra/BehaviorSpec.hs index 4408f310751..508ede707c2 100644 --- a/hydra-node/test/Hydra/BehaviorSpec.hs +++ b/hydra-node/test/Hydra/BehaviorSpec.hs @@ -826,8 +826,7 @@ createMockNetwork node nodes = where broadcast msg = do allNodes <- readTVarIO nodes - let otherNodes = filter (\n -> getParty n /= getParty node) allNodes - mapM_ (`handleMessage` msg) otherNodes + mapM_ (`handleMessage` msg) allNodes handleMessage HydraNode{inputQueue} msg = enqueue inputQueue . NetworkInput defaultTTL $ ReceivedMessage{sender, msg} diff --git a/hydra-node/test/Hydra/Model/MockChain.hs b/hydra-node/test/Hydra/Model/MockChain.hs index b0820b2333c..4917b97ddc2 100644 --- a/hydra-node/test/Hydra/Model/MockChain.hs +++ b/hydra-node/test/Hydra/Model/MockChain.hs @@ -361,8 +361,7 @@ createMockNetwork draftNode nodes = where broadcast msg = do allNodes <- fmap node <$> readTVarIO nodes - let otherNodes = filter (\n -> getParty n /= getParty draftNode) allNodes - mapM_ (`handleMessage` msg) otherNodes + mapM_ (`handleMessage` msg) allNodes handleMessage HydraNode{inputQueue} msg = enqueue inputQueue . NetworkInput defaultTTL $ ReceivedMessage{sender, msg} diff --git a/hydra-node/test/Hydra/NodeSpec.hs b/hydra-node/test/Hydra/NodeSpec.hs index 440361554e1..ef1756e4c60 100644 --- a/hydra-node/test/Hydra/NodeSpec.hs +++ b/hydra-node/test/Hydra/NodeSpec.hs @@ -179,7 +179,7 @@ spec = parallel $ do events <- getRecordedEvents getEventId <$> events `shouldSatisfy` isStrictlyMonotonic - it "emits a single ReqSn and AckSn as leader, even after multiple ReqTxs" $ + it "emits a single ReqSn as leader, even after multiple ReqTxs" $ showLogsOnFailure "NodeSpec" $ \tracer -> do -- NOTE(SN): Sequence of parties in OnInitTx of -- 'inputsToOpenHead' is relevant, so 10 is the (initial) snapshot leader @@ -192,22 +192,21 @@ spec = parallel $ do , receiveMessage ReqTx{transaction = tx2} , receiveMessage ReqTx{transaction = tx3} ] - signedSnapshot = sign aliceSk $ testSnapshot 1 0 [1] (utxoRefs [1, 3, 4]) (node, getNetworkEvents) <- testHydraNode tracer aliceSk [bob, carol] cperiod inputs >>= recordNetwork runToCompletion node - getNetworkEvents `shouldReturn` [ReqSn 0 1 [1] Nothing, AckSn signedSnapshot 1] + getNetworkEvents `shouldReturn` [ReqSn 0 1 [1] Nothing] it "rotates snapshot leaders" $ showLogsOnFailure "NodeSpec" $ \tracer -> do let tx1 = SimpleTx{txSimpleId = 1, txInputs = utxoRefs [2], txOutputs = utxoRefs [4]} sn1 = testSnapshot 1 0 [] (utxoRefs [1, 2, 3]) - sn2 = testSnapshot 2 0 [1] (utxoRefs [1, 3, 4]) inputs = inputsToOpenHead <> [ receiveMessage ReqSn{snapshotVersion = 0, snapshotNumber = 1, transactionIds = mempty, decommitTx = Nothing} - , receiveMessage $ AckSn (sign aliceSk sn1) 1 + , receiveMessageFrom alice $ AckSn (sign aliceSk sn1) 1 + , receiveMessageFrom bob $ AckSn (sign bobSk sn1) 1 , receiveMessageFrom carol $ AckSn (sign carolSk sn1) 1 , receiveMessage ReqTx{transaction = tx1} ] @@ -217,7 +216,7 @@ spec = parallel $ do >>= recordNetwork runToCompletion node - getNetworkEvents `shouldReturn` [AckSn (sign bobSk sn1) 1, ReqSn 0 2 [1] Nothing, AckSn (sign bobSk sn2) 2] + getNetworkEvents `shouldReturn` [AckSn (sign bobSk sn1) 1, ReqSn 0 2 [1] Nothing] it "processes out-of-order AckSn" $ showLogsOnFailure "NodeSpec" $ \tracer -> do From 1e7123f90cc43f0496fd3340fcd21ae6954ea0bf Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Wed, 11 Sep 2024 18:08:26 +0200 Subject: [PATCH 4/4] Short-cut delivery in Hydra.Node.Network This is now replicating the previous behavior of Hydra.Node into the aggregated Network stack. --- hydra-node/src/Hydra/Node/Network.hs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/hydra-node/src/Hydra/Node/Network.hs b/hydra-node/src/Hydra/Node/Network.hs index a19a0ee4dbd..c4e234b868a 100644 --- a/hydra-node/src/Hydra/Node/Network.hs +++ b/hydra-node/src/Hydra/Node/Network.hs @@ -73,8 +73,9 @@ import Hydra.Prelude hiding (fromList, replicate) import Control.Tracer (Tracer) import Hydra.Logging (traceWith) -import Hydra.Logging.Messages (HydraLog (..)) -import Hydra.Network (Host (..), IP, NetworkCallback (..), NetworkComponent, NodeId, PortNumber) +import Hydra.Logging.Messages (HydraLog) +import Hydra.Logging.Messages qualified as Log +import Hydra.Network (Host (..), IP, Network (..), NetworkCallback (..), NetworkComponent, NodeId, PortNumber) import Hydra.Network.Authenticate (Authenticated (..), Signed, withAuthentication) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) import Hydra.Network.Message ( @@ -132,14 +133,14 @@ withNetwork tracer configuration callback action = do let localHost = Host{hostname = show host, port} me = deriveParty signingKey numberOfParties = length $ me : otherParties - messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties + messagePersistence <- configureMessagePersistence (contramap Log.Node tracer) persistenceDir numberOfParties let reliability = withFlipHeartbeats $ - withReliability (contramap Reliability tracer) messagePersistence me otherParties $ - withAuthentication (contramap Authentication tracer) signingKey otherParties $ + withReliability (contramap Log.Reliability tracer) messagePersistence me otherParties $ + withAuthentication (contramap Log.Authentication tracer) signingKey otherParties $ withOuroborosNetwork - (contramap Network tracer) + (contramap Log.Network tracer) HydraNetworkConfig { protocolVersion = currentHydraVersionedProtocol , localHost @@ -149,8 +150,13 @@ withNetwork tracer configuration callback action = do deliver . ConnectivityEvent $ HandshakeFailure{remoteHost, ourVersion, theirVersions} ) - withHeartbeat nodeId reliability (NetworkCallback{deliver = deliver . mapHeartbeat}) $ \network -> - action network + withHeartbeat nodeId reliability (NetworkCallback{deliver = deliver . mapHeartbeat}) $ \Network{broadcast} -> + action + Network + { broadcast = \msg -> do + broadcast msg + deliver (ReceivedMessage{sender = deriveParty signingKey, msg}) + } where NetworkCallback{deliver} = callback