Skip to content

Commit

Permalink
Fix hPutBuf: resource vanished (Broken pipe)) when starting. Works no…
Browse files Browse the repository at this point in the history
…w, but only accepts a single client connection
  • Loading branch information
koslambrou committed Sep 25, 2023
1 parent c7f9cb2 commit a566720
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 51 deletions.
41 changes: 26 additions & 15 deletions cardano-node/app/reward-history.hs
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
import Cardano.Node.LedgerEvent (foldEvent, filterRewards, parseStakeCredential)
import Cardano.Node.LedgerEvent (parseStakeCredential, foldEvent, filterRewards)
import System.Environment (getArgs)
import System.IO (stdin, IOMode(ReadMode))
import Text.Pretty.Simple (pPrint)
import System.IO (IOMode(ReadMode))
import Network.Socket
import Control.Exception (bracket, bracketOnError)
import Control.Monad (void)

-- Usage: rewards-history <<<stdin LEDGER-EVENTS] <STAKE-ADDRESS>
-- Usage: reward-history <<<stdin LEDGER-EVENTS] <STAKE-ADDRESS>
--
-- Example:
--
-- cat ledger_events.cbor | rewards-history "e04cf4f01890215bd181d1fcd3c9589a2a4a3adbcff1a70b748080fa82"
main :: IO ()
main = do
stakeCredential <- getArgs >>= expectStakeCredential . head
addrInfo <- resolve
putStrLn $ "connecting to " <> show addrInfo
sock <- openSocket addrInfo
connect sock $ addrAddress addrInfo
h <- socketToHandle sock ReadMode

history <- foldEvent (\st -> pure . filterRewards stakeCredential st) mempty h
pPrint history
print $ "Got stake credential: " ++ show stakeCredential

runTCPClient "localhost" "9999" $ \sock -> do
h <- socketToHandle sock ReadMode

putStrLn "Getting reward history..."
void $ foldEvent h mempty $ \st e -> let r = filterRewards stakeCredential st e in print r >> pure r
-- foldEvent h () $ \() e -> print e
where
resolve = do
let hints = defaultHints { addrSocketType = Stream, addrFamily = AF_INET }
head <$> getAddrInfo (Just hints) (Just "localhost") (Just "9999")
expectStakeCredential =
maybe (error "invalid / missing stake address as 1st argument") return
.
parseStakeCredential

runTCPClient :: HostName -> ServiceName -> (Socket -> IO a) -> IO a
runTCPClient host port client = withSocketsDo $ do
addrInfo <- resolve
putStrLn $ "Connecting to " <> show addrInfo
bracket (open addrInfo) close client
where
resolve = do
let hints = defaultHints { addrSocketType = Stream, addrFamily = AF_INET }
head <$> getAddrInfo (Just hints) (Just host) (Just port)
open addr = bracketOnError (openSocket addr) close $ \sock -> do
connect sock $ addrAddress addr
return sock
69 changes: 36 additions & 33 deletions cardano-node/src/Cardano/Node/LedgerEvent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
Expand All @@ -33,7 +31,7 @@ module Cardano.Node.LedgerEvent (
, foldEvent
, filterRewards
, parseStakeCredential
, streamingLedgerEvents
, withLedgerEventsServerStream
) where

import Cardano.Prelude hiding (All, Sum)
Expand Down Expand Up @@ -281,9 +279,10 @@ toLedgerEventShelley evt =
Just $ LedgerMirDist fromReserve fromTreasury deltaReserve deltaTreasury
NoMirTransfer{} -> -- FIXME: create an event for this
Nothing
ShelleyLedgerEventTICK (TickNewEpochEvent (Shelley.EpochEvent poolReap)) ->
let PoolReapEvent (RetiredPools refunded unclaimed epoch) = poolReap
in Just $ LedgerPoolReaping epoch refunded unclaimed
ShelleyLedgerEventTICK (TickNewEpochEvent (Shelley.EpochEvent _poolReap)) ->
-- let PoolReapEvent (RetiredPools refunded unclaimed epoch) = poolReap
-- in Just $ LedgerPoolReaping epoch refunded unclaimed
Just LedgerTick
ShelleyLedgerEventBBODY {} ->
Just LedgerBody
ShelleyLedgerEventTICK {} ->
Expand Down Expand Up @@ -354,17 +353,17 @@ deserializeEvent bytes = do

-- IO action to read ledger events in binary form
foldEvent
:: (a -> AnchoredEvent -> IO a)
:: Handle
-> a
-> Handle
-> (a -> AnchoredEvent -> IO a)
-> IO a
foldEvent fn st0 h =
foldEvent h st0 fn =
LBS.hGetContents h >>= go st0
where
go st bytes = do
eof <- hIsEOF h
if eof then
return st
pure st
else do
(rest, version :: Version) <- unsafeDeserialiseFromBytes
fromCBOR
Expand Down Expand Up @@ -400,31 +399,35 @@ filterRewards credential st = \case
where
mergeRewards = Set.foldr (<>) mempty . Set.map Ledger.rewardAmount

-- FIXME: inferred is horrible... but it does not work with a naive type declaration
streamingLedgerEvents
withLedgerEventsServerStream
:: PortNumber
-> (LedgerEventHandler IO (ExtLedgerState (HardForkBlock (CardanoEras StandardCrypto))) -> IO ())
-> IO ()
streamingLedgerEvents port handler =
withSocketsDo $
bracket open closeSockets go
withLedgerEventsServerStream port handler = do
withSocketsDo $ do
bracket open closeSockets go

where
go (s,_) = do
h <- socketToHandle s WriteMode
let ledgerEventHandler = LedgerEventHandler $ \headerHash slotNo event -> do
maybe
(pure ())
(\ e -> BS.hPut h $
serializeEvent (eventCodecVersion event) (AnchoredEvent (getOneEraHash headerHash) slotNo e))
(fromAuxLedgerEvent event)
handler ledgerEventHandler

open = do
sock <- socket AF_INET Stream defaultProtocol
bind sock (SockAddrInet port 0)
listen sock 1
(clientSock, _) <- accept sock
pure (sock, clientSock)

closeSockets (s,s') = close s >> close s'
go s = do
h <- socketToHandle s WriteMode
handler $ LedgerEventHandler $ writeLedgerEvents h

open = do
sock <- socket AF_INET Stream defaultProtocol
bind sock (SockAddrInet port 0)
listen sock 1
putStrLn ("Waiting for client to connect to socket..." :: String)
(clientSock, _) <- accept sock
pure clientSock

closeSockets = close

writeLedgerEvents h headerHash slotNo event = do
case fromAuxLedgerEvent event of
Nothing -> pure ()
Just e -> do
let serializedEvent =
serializeEvent
(eventCodecVersion event)
(AnchoredEvent (getOneEraHash headerHash) slotNo e)
BS.hPut h serializedEvent
6 changes: 3 additions & 3 deletions cardano-node/src/Cardano/Node/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ import Cardano.Node.Configuration.NodeAddress
import Cardano.Node.Configuration.POM (NodeConfiguration (..),
PartialNodeConfiguration (..), SomeNetworkP2PMode (..),
defaultPartialNodeConfiguration, makeNodeConfiguration, parseNodeConfigurationFP)
import Cardano.Node.LedgerEvent
import Cardano.Node.LedgerEvent (withLedgerEventsServerStream)
import Cardano.Node.Startup
import Cardano.Node.Tracing.API
import Cardano.Node.Tracing.StateRep (NodeState (NodeKernelOnline))
Expand Down Expand Up @@ -173,12 +173,12 @@ runNode cmdPc = do
let ProtocolInfo { pInfoConfig } = Api.protocolInfo runP
in getNetworkMagic $ Consensus.configBlock pInfoConfig

streamingLedgerEvents 9999 $ \ ledgerHandler ->
withLedgerEventsServerStream 9999 $ \ ledgerEventHandler ->
case p of
SomeConsensusProtocol blk runP ->
handleNodeWithTracers
(case blk of
Api.CardanoBlockType -> ledgerHandler
Api.CardanoBlockType -> ledgerEventHandler
Api.ByronBlockType{} -> discardEvent
Api.ShelleyBlockType{} -> discardEvent
)
Expand Down

0 comments on commit a566720

Please sign in to comment.