Skip to content

Commit

Permalink
Network interface: reliable broadcast semantics (#1624)
Browse files Browse the repository at this point in the history
Changes the `broadcast` semantics to send a message to everyone on the
Hydra network, *including* ourselves.

Makes `NetworkCallback` a `newtype` and names the function `deliver`.
This makes it consistent with literature and we can reason about the
properties of certain `NetworkComponent` implementations. For example:
We could require each `NetworkComponent` to bet "valid":

> Validity: If a correct process broadcasts a message m, then every
correct
process eventually delivers m

This PR however, just moves the delivery to ourselves from the `Node` to
the aggregate `Hydra.Node.Network` stack.

From the outside, the node behaves exactly as before.

---

* [x] CHANGELOG update not needed
* [x] Documentation update not needed
* [x] Haddocks updated
* [x] No new TODOs introduced
  • Loading branch information
ch1bo authored Sep 12, 2024
2 parents 2d5dce5 + 1e7123f commit beeb63e
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 332 deletions.
17 changes: 12 additions & 5 deletions hydra-node/src/Hydra/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
module Hydra.Network (
-- * Types
Network (..),
NetworkCallback (..),
NetworkComponent,
NetworkCallback,
IP,
Host (..),
NodeId (..),
Expand Down Expand Up @@ -41,17 +41,24 @@ deriving anyclass instance FromJSON IP

-- * Hydra network interface

-- | Handle to interface with the hydra network and send messages "off chain".
-- | Interface from the application to the network layer.
newtype Network m msg = Network
{ broadcast :: msg -> m ()
-- ^ Send a `msg` to the whole hydra network.
-- ^ Send a `msg` to the whole configured hydra network including ourselves.
}

instance Contravariant (Network m) where
contramap f (Network bcast) = Network $ \msg -> bcast (f msg)

-- | Handle to interface for inbound messages.
type NetworkCallback msg m = msg -> m ()
-- | Interface from network layer to the application.
-- XXX: Reliably delivering a message in the crash-recovery fault model is
-- tricky. According to "Introduction to Reliable and Secure Distributed
-- Programming" section "2.2.4 Crashes with recoveries" explains that storing to
-- stable storage and just pointing to stored events is a better way.
newtype NetworkCallback msg m = NetworkCallback
{ deliver :: msg -> m ()
-- ^ The given `msg` was received from the network.
}

-- | A type tying both inbound and outbound messages sending in a single /Component/.
--
Expand Down
8 changes: 4 additions & 4 deletions hydra-node/src/Hydra/Network/Authenticate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Control.Tracer (Tracer)
import Data.Aeson (Options (tagSingleConstructors), defaultOptions, genericToJSON)
import Data.Aeson qualified as Aeson
import Hydra.Logging (traceWith)
import Hydra.Network (Network (Network, broadcast), NetworkComponent)
import Hydra.Network (Network (Network, broadcast), NetworkCallback (..), NetworkComponent)
import Hydra.Prelude
import Hydra.Tx (Party (Party, vkey), deriveParty)
import Hydra.Tx.Crypto (HydraKey, Key (SigningKey), Signature, sign, verify)
Expand Down Expand Up @@ -64,12 +64,12 @@ withAuthentication ::
NetworkComponent m (Signed inbound) (Signed outbound) a ->
-- The node internal authenticated network.
NetworkComponent m (Authenticated inbound) outbound a
withAuthentication tracer signingKey parties withRawNetwork callback action = do
withRawNetwork checkSignature authenticate
withAuthentication tracer signingKey parties withRawNetwork NetworkCallback{deliver} action = do
withRawNetwork NetworkCallback{deliver = checkSignature} authenticate
where
checkSignature (Signed msg sig party@Party{vkey = partyVkey}) =
if verify partyVkey sig msg && elem party parties
then callback $ Authenticated msg party
then deliver $ Authenticated msg party
else traceWith tracer (mkAuthLog msg sig party)

me = deriveParty signingKey
Expand Down
19 changes: 13 additions & 6 deletions hydra-node/src/Hydra/Network/Heartbeat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO, writeTVar)
import Data.Map qualified as Map
import Data.Set qualified as Set
import Hydra.Network (Network (..), NetworkCallback, NetworkComponent, NodeId)
import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent, NodeId)
import Hydra.Network.Message (Connectivity (Connected, Disconnected))

data HeartbeatState = HeartbeatState
Expand Down Expand Up @@ -100,21 +100,28 @@ withHeartbeat nodeId withNetwork callback action = do
withAsync (checkHeartbeatState nodeId lastSent network) $ \_ ->
action (updateStateFromOutgoingMessages nodeId lastSent network)
where
onConnectivityChanged = callback . Left
NetworkCallback{deliver} = callback

onConnectivityChanged = deliver . Left

updateStateFromIncomingMessages ::
(MonadSTM m, MonadMonotonicTime m) =>
TVar m HeartbeatState ->
NetworkCallback (Either Connectivity inbound) m ->
NetworkCallback (Heartbeat inbound) m
updateStateFromIncomingMessages heartbeatState callback = \case
Data nodeId msg -> notifyAlive nodeId >> callback (Right msg)
Ping nodeId -> notifyAlive nodeId
updateStateFromIncomingMessages heartbeatState callback =
NetworkCallback
{ deliver = \case
Data nodeId msg -> notifyAlive nodeId >> deliver (Right msg)
Ping nodeId -> notifyAlive nodeId
}
where
NetworkCallback{deliver} = callback

notifyAlive peer = do
now <- getMonotonicTime
aliveSet <- alive <$> readTVarIO heartbeatState
unless (peer `Map.member` aliveSet) $ callback (Left $ Connected peer)
unless (peer `Map.member` aliveSet) $ deliver (Left $ Connected peer)
atomically $
modifyTVar' heartbeatState $ \s ->
s
Expand Down
Loading

0 comments on commit beeb63e

Please sign in to comment.