Skip to content

Commit

Permalink
Updated cardano-client-demo & benchmarking code
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Aug 13, 2024
1 parent d0d77db commit 088a505
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import Cardano.Tracing.OrphanInstances.Consensus ()
import Cardano.Tracing.OrphanInstances.Network ()
import Cardano.Tracing.OrphanInstances.Shelley ()

import Ouroboros.Network.Protocol.TxSubmission2.Type (TokBlockingStyle (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (SingBlockingStyle (..))

import Cardano.Api hiding (Active)
import Cardano.TxGenerator.Types (TPSRate, TxGenError)
Expand Down Expand Up @@ -124,11 +124,11 @@ mkSubmissionSummary startTime reportsRefs
txStreamSource :: forall era. MVar (StreamState (TxStream IO era)) -> TpsThrottle -> TxSource era
txStreamSource streamRef tpsThrottle = Active worker
where
worker :: forall m blocking . MonadIO m => TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker :: forall m blocking . MonadIO m => SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker blocking req = do
(done, txCount) <- case blocking of
TokBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
TokNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
SingBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
SingNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
txList <- liftIO $ unFold txCount
case done of
Stop -> return (Exhausted, txList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ import qualified Ouroboros.Consensus.Shelley.Ledger.Mempool as Mempool (TxId (Sh
import qualified Ouroboros.Consensus.Cardano.Block as Block
(TxId (GenTxIdAllegra, GenTxIdAlonzo, GenTxIdBabbage, GenTxIdConway, GenTxIdMary, GenTxIdShelley))

import Ouroboros.Network.Protocol.TxSubmission2.Type

Check warning on line 53 in bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs

View workflow job for this annotation

GitHub Actions / build

Warning in module Cardano.Benchmarking.GeneratorTx.SubmissionClient: Use fewer imports ▫︎ Found: "import Ouroboros.Network.Protocol.TxSubmission2.Type\n ( NumTxIdsToAck(..), NumTxIdsToReq(..) )\nimport Ouroboros.Network.Protocol.TxSubmission2.Type\n ( BlockingReplyList(..), SingBlockingStyle(..), TxSizeInBytes )\n" ▫︎ Perhaps: "import Ouroboros.Network.Protocol.TxSubmission2.Type\n ( NumTxIdsToAck(..),\n NumTxIdsToReq(..),\n BlockingReplyList(..),\n SingBlockingStyle(..),\n TxSizeInBytes )\n"

Check warning on line 53 in bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs

View workflow job for this annotation

GitHub Actions / build

Warning in module Cardano.Benchmarking.GeneratorTx.SubmissionClient: Use fewer imports ▫︎ Found: "import Ouroboros.Network.Protocol.TxSubmission2.Type\n ( NumTxIdsToAck(..), NumTxIdsToReq(..) )\nimport Ouroboros.Network.Protocol.TxSubmission2.Type\n ( BlockingReplyList(..), SingBlockingStyle(..), TxSizeInBytes )\n" ▫︎ Perhaps: "import Ouroboros.Network.Protocol.TxSubmission2.Type\n ( NumTxIdsToAck(..),\n NumTxIdsToReq(..),\n BlockingReplyList(..),\n SingBlockingStyle(..),\n TxSizeInBytes )\n"
(NumTxIdsToAck (..), NumTxIdsToReq (..))
import Ouroboros.Network.Protocol.TxSubmission2.Client (ClientStIdle (..),
ClientStTxIds (..), ClientStTxs (..), TxSubmissionClient (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (BlockingReplyList (..),
TokBlockingStyle (..), TxSizeInBytes)
SingBlockingStyle (..), TxSizeInBytes)

import Cardano.Api hiding (Active)
import Cardano.Api.Shelley (fromShelleyTxId, toConsensusGenTx)
Expand All @@ -75,14 +77,14 @@ data TxSource era
= Exhausted
| Active (ProduceNextTxs era)

type ProduceNextTxs era = (forall m blocking . MonadIO m => TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))
type ProduceNextTxs era = (forall m blocking . MonadIO m => SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))

produceNextTxs :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs blocking req (txProducer, unack, stats) = do
(newTxProducer, txList) <- produceNextTxs' blocking req txProducer
return ((newTxProducer, unack, stats), txList)

produceNextTxs' :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' _ _ Exhausted = return (Exhausted, [])
produceNextTxs' blocking req (Active callback) = callback blocking req

Expand All @@ -104,10 +106,10 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
TxSubmissionClient $
pure $ client (initialTxSource, UnAcked [], SubmissionThreadStats 0 0 0)
where
discardAcknowledged :: TokBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged :: SingBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged blocking (Ack ack) (txSource, UnAcked unAcked, stats) = do
when (tokIsBlocking blocking && ack /= length unAcked) $ do
let err = "decideAnnouncement: TokBlocking, but length unAcked != ack"
let err = "decideAnnouncement: SingBlocking, but length unAcked != ack"
traceWith bmtr (TraceBenchTxSubError err)
fail (T.unpack err)
let (stillUnacked, acked) = L.splitAtEnd ack unAcked
Expand All @@ -128,9 +130,9 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =

requestTxIds :: forall blocking.
LocalState era
-> TokBlockingStyle blocking
-> Word16
-> Word16
-> SingBlockingStyle blocking
-> NumTxIdsToAck
-> NumTxIdsToReq
-> m (ClientStTxIds blocking (GenTxId CardanoBlock) (GenTx CardanoBlock) m ())
requestTxIds state blocking ackNum reqNum = do
let ack = Ack $ fromIntegral ackNum
Expand All @@ -145,15 +147,15 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> outs)

case blocking of
TokBlocking -> case NE.nonEmpty newTxs of
SingBlocking -> case NE.nonEmpty newTxs of
Nothing -> do
traceWith tr EndOfProtocol
endOfProtocolCallback stats
pure $ SendMsgDone ()
(Just txs) -> pure $ SendMsgReplyTxIds
(BlockingReply $ txToIdSize <$> txs)
(client stateC)
TokNonBlocking -> pure $ SendMsgReplyTxIds
SingNonBlocking -> pure $ SendMsgReplyTxIds
(NonBlockingReply $ txToIdSize <$> newTxs)
(client stateC)

Expand Down Expand Up @@ -198,17 +200,17 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
fromGenTxId (Block.GenTxIdConway (Mempool.ShelleyTxId i)) = fromShelleyTxId i
fromGenTxId _ = error "TODO: fix incomplete match"

tokIsBlocking :: TokBlockingStyle a -> Bool
tokIsBlocking :: SingBlockingStyle a -> Bool
tokIsBlocking = \case
TokBlocking -> True
TokNonBlocking -> False
SingBlocking -> True
SingNonBlocking -> False

reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace :: Ack -> Req -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace ack req = \case
TokBlocking -> ReqIdsBlocking ack req
TokNonBlocking -> ReqIdsNonBlocking ack req
SingBlocking -> ReqIdsBlocking ack req
SingNonBlocking -> ReqIdsNonBlocking ack req

idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace :: ToAnnce tx -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace (ToAnnce toAnn) = \case
TokBlocking -> IdsListBlocking $ length toAnn
TokNonBlocking -> IdsListNonBlocking $ length toAnn
SingBlocking -> IdsListBlocking $ length toAnn
SingNonBlocking -> IdsListNonBlocking $ length toAnn

0 comments on commit 088a505

Please sign in to comment.