diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 4d49ee9496b..b66ef84517d 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -71,7 +71,11 @@ steps: - label: "Babbage integration tests (linux)" key: linux-tests-integration-babbage depends_on: linux-nix - command: nix shell 'nixpkgs#just' -c just babbage-integration-tests + command: | + mkdir integration-test-dir + export INTEGRATION_TEST_DIR=integration-test-dir + nix shell 'nixpkgs#just' -c just babbage-integration-tests + artifact_paths: [ "./integration-test-dir/**" ] agents: system: ${linux} env: @@ -80,7 +84,11 @@ steps: - label: "Conway integration tests (linux)" key: linux-tests-integration-conway depends_on: linux-nix - command: nix shell 'nixpkgs#just' -c just conway-integration-tests + command: | + mkdir integration-test-dir + export INTEGRATION_TEST_DIR=integration-test-dir + nix shell 'nixpkgs#just' -c just conway-integration-tests + artifact_paths: [ "./integration-test-dir/**" ] agents: system: ${linux} env: diff --git a/lib/integration/framework/Test/Integration/Framework/Setup.hs b/lib/integration/framework/Test/Integration/Framework/Setup.hs index 915362b93c6..95d97b28978 100644 --- a/lib/integration/framework/Test/Integration/Framework/Setup.hs +++ b/lib/integration/framework/Test/Integration/Framework/Setup.hs @@ -86,6 +86,7 @@ import Cardano.Wallet.Launch.Cluster.Env import Cardano.Wallet.Launch.Cluster.FileOf ( DirOf (..) , FileOf (..) + , absolutize , mkRelDirOf , toFilePath ) @@ -176,7 +177,8 @@ import System.Directory ( createDirectory ) import System.Environment - ( setEnv + ( lookupEnv + , setEnv ) import System.Environment.Extended ( envFromText @@ -193,8 +195,8 @@ import System.IO.Temp.Extra , withSystemTempDir ) import System.Path - ( absDir - , absFile + ( absFile + , absRel , relDir , relFile , () @@ -241,7 +243,9 @@ import qualified Data.Text as T -- | Do all the program setup required for integration tests, create a temporary -- directory, and pass this info to the main hspec action. -withTestsSetup :: (DirOf "cluster" -> (Tracer IO TestsLog, Tracers IO) -> IO a) -> IO a +withTestsSetup + :: (DirOf "cluster" -> (Tracer IO TestsLog, Tracers IO) -> IO a) + -> IO a withTestsSetup action = do -- Handle SIGTERM properly installSignalHandlersNoLogging @@ -255,13 +259,18 @@ withTestsSetup action = do -- Flush test output as soon as a line is printed. -- Set UTF-8, regardless of user locale. withUtf8 - $ - -- This temporary directory will contain logs, and all other data - -- produced by the integration tests. - withSystemTempDir stdoutTextTracer "test" skipCleanup - $ \testDir -> do - let clusterDir = DirOf $ absDir testDir - withTracers clusterDir $ action clusterDir + $ do + mEnv <- lookupEnv "INTEGRATION_TEST_DIR" + -- This temporary directory will contain logs, and all other data + -- produced by the integration tests. + let run fp = do + fpa <- absolutize $ absRel fp + let testDir = DirOf fpa + withTracers testDir $ action testDir + case mEnv of + Just env -> run env + Nothing -> withSystemTempDir + stdoutTextTracer "test" skipCleanup run mkFaucetFunds :: TestnetMagic -> FaucetM FaucetFunds mkFaucetFunds testnetMagic = do diff --git a/lib/iohk-monitoring-extra/src/Cardano/BM/ToTextTracer.hs b/lib/iohk-monitoring-extra/src/Cardano/BM/ToTextTracer.hs index e46c661dbab..87157aa7fb2 100644 --- a/lib/iohk-monitoring-extra/src/Cardano/BM/ToTextTracer.hs +++ b/lib/iohk-monitoring-extra/src/Cardano/BM/ToTextTracer.hs @@ -71,6 +71,7 @@ newToTextTracer newToTextTracer clusterLogsFile minSeverity = runContT $ do ch <- newTChanIO h <- ContT $ withFile clusterLogsFile WriteMode + hSetBuffering h NoBuffering liftIO $ hSetBuffering h NoBuffering liftIO $ async >=> link $ forever $ do (x, s, t) <- atomically $ readTChan ch diff --git a/lib/launcher/src/Cardano/Launcher.hs b/lib/launcher/src/Cardano/Launcher.hs index 863c341c20d..50eb40fb336 100644 --- a/lib/launcher/src/Cardano/Launcher.hs +++ b/lib/launcher/src/Cardano/Launcher.hs @@ -407,7 +407,7 @@ instance HasPrivacyAnnotation WaitForProcessLog instance HasSeverityAnnotation WaitForProcessLog where getSeverityAnnotation = \case MsgWaitBefore -> Debug - MsgWaitAfter _ -> Debug + MsgWaitAfter _ -> Warning MsgWaitCancelled -> Debug instance ToText ProcessHasExited where diff --git a/lib/local-cluster/exe/local-cluster.hs b/lib/local-cluster/exe/local-cluster.hs index f3fa768ecac..877a46f7152 100644 --- a/lib/local-cluster/exe/local-cluster.hs +++ b/lib/local-cluster/exe/local-cluster.hs @@ -17,6 +17,7 @@ import Cardano.Startup ) import Cardano.Wallet.Launch.Cluster ( Config (..) + , runningNodeSocketPath ) import Cardano.Wallet.Launch.Cluster.CommandLine ( CommandLineOptions (..) @@ -32,7 +33,8 @@ import Cardano.Wallet.Launch.Cluster.FileOf , toFilePath ) import Cardano.Wallet.Launch.Cluster.Http.Faucet.Server - ( newNodeConnVar + ( NodeConnVar (setNodeConn) + , newNodeConnVar ) import Cardano.Wallet.Launch.Cluster.Http.Service ( withServiceServer @@ -283,15 +285,16 @@ main = withUtf8 $ do } debug "Starting the monitoring server" - (_, phaseTracer) <- withSNetworkId (NTestnet 42) + (nodeConn, phaseTracer) <- withSNetworkId (NTestnet 42) $ \network -> do nodeConn <- liftIO newNodeConnVar - withServiceServer + (_ , phaseTracer) <- withServiceServer network nodeConn clusterCfg tracer httpService + pure (nodeConn, phaseTracer) debug "Starting the faucet" @@ -301,6 +304,7 @@ main = withUtf8 $ do debug "Starting the cluster" node <- ContT $ Cluster.withCluster clusterCfg faucetFunds + liftIO $ setNodeConn nodeConn $ runningNodeSocketPath node debug "Starting the relay node" nodeSocket <- case parse . nodeSocketFile diff --git a/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Faucet/Client.hs b/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Faucet/Client.hs index 54d503965a5..56ad4c96cea 100644 --- a/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Faucet/Client.hs +++ b/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Faucet/Client.hs @@ -25,6 +25,9 @@ import Cardano.Wallet.Launch.Cluster.Http.Faucet.SendFaucetAssets ( SendFaucetAssets , WithNetwork (..) ) +import Cardano.Wallet.Launch.Cluster.Http.Monitor.Client + ( recovering + ) import Cardano.Wallet.Primitive.NetworkId ( HasSNetworkId , SNetworkId @@ -54,6 +57,8 @@ import Servant.Client ) import UnliftIO ( MonadUnliftIO + , UnliftIO (..) + , askUnliftIO ) -- | Queries that can be run against the local cluster @@ -79,7 +84,7 @@ mkFaucet _ = } newtype MsgFaucetClient = MsgFaucetRequest AnyFaucetQ - deriving stock Show + deriving stock (Show) instance ToText MsgFaucetClient where toText (MsgFaucetRequest q) = "Faucet request: " <> toText (show q) @@ -95,12 +100,18 @@ newFaucetQ -> Tracer m MsgFaucetClient -> Faucet n -> m (RunFaucetQ m) -newFaucetQ query tr Faucet{..} = pure - $ RunFaucetQ - $ \request -> do - traceWith tr (MsgFaucetRequest $ AnyFaucetQ request) - case request of - SendFaucetAssetsQ assets -> - liftIO - $ query - $ sendFaucetAssets (WithNetwork assets) $> () +newFaucetQ query tr Faucet{..} = do + UnliftIO unlift <- askUnliftIO + pure + $ RunFaucetQ + $ \request -> do + let f = + unlift + . traceWith tr + . MsgFaucetRequest + $ AnyFaucetQ request + liftIO $ recovering f $ case request of + SendFaucetAssetsQ assets -> + liftIO + $ query + $ sendFaucetAssets (WithNetwork assets) $> () diff --git a/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Monitor/Client.hs b/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Monitor/Client.hs index 8aab0564e19..38a2b5e0b83 100644 --- a/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Monitor/Client.hs +++ b/lib/local-cluster/lib/Cardano/Wallet/Launch/Cluster/Http/Monitor/Client.hs @@ -16,6 +16,7 @@ module Cardano.Wallet.Launch.Cluster.Http.Monitor.Client , AnyMonitorQ (..) , newRunQuery , mkMonitorClient + , recovering ) where @@ -129,25 +130,24 @@ newRunQuery query tr MonitorClient{ready, observe, step, switch} = UnliftIO unlift <- askUnliftIO pure $ RunQuery $ \request -> do traceWith tr $ MsgMonitorClientReq $ AnyQuery request - let recovering :: forall a. IO a -> IO a - recovering doing = recoverAll retryPolicy - $ \rt -> do - unless (firstTry rt) - $ unlift - $ traceWith tr . MsgMonitorClientRetry - $ AnyQuery request - doing - liftIO $ recovering $ case request of + let f = unlift + . traceWith tr . MsgMonitorClientRetry + $ AnyQuery request + liftIO $ recovering f $ case request of ReadyQ -> query ready ObserveQ -> unApiT <$> query observe StepQ -> query step $> () SwitchQ -> unApiT <$> query switch -retryPolicy :: RetryPolicyM IO -retryPolicy = capDelay (60 * oneSecond) $ exponentialBackoff oneSecond - where - oneSecond = 1_000_000 :: Int - -firstTry :: RetryStatus -> Bool -firstTry (RetryStatus 0 _ _) = True -firstTry _ = False +recovering :: IO () -> IO a -> IO a +recovering f doing = recoverAll retryPolicy + $ \rt -> do + unless (firstTry rt) f + doing + where + retryPolicy :: RetryPolicyM IO + retryPolicy = capDelay (60 * oneSecond) $ exponentialBackoff oneSecond + oneSecond = 1_000_000 :: Int + firstTry :: RetryStatus -> Bool + firstTry (RetryStatus 0 _ _) = True + firstTry _ = False diff --git a/lib/local-cluster/local-cluster.cabal b/lib/local-cluster/local-cluster.cabal index 62d3dc36198..f6ab75a25b0 100644 --- a/lib/local-cluster/local-cluster.cabal +++ b/lib/local-cluster/local-cluster.cabal @@ -202,12 +202,21 @@ common test-common , aeson-qq , base , bytestring + , cardano-addresses + , cardano-binary + , cardano-ledger-core + , cardano-ledger-alonzo + , cardano-ledger-babbage + , cardano-ledger-byron + , cardano-ledger-mary , cardano-ledger-shelley , cardano-wallet-application-extras , cardano-wallet-launcher , cardano-wallet-network-layer , cardano-wallet-primitive + , cardano-wallet-read , cardano-wallet-test-utils + , containers , contra-tracer , extra , filepath @@ -218,10 +227,12 @@ common test-common , local-cluster , mtl , openapi3 + , ouroboros-consensus-cardano , ouroboros-network , pathtype , process , QuickCheck + , streaming , time , unliftio , with-utf8 @@ -229,7 +240,6 @@ common test-common build-tool-depends: , hspec-discover:hspec-discover - , local-cluster:local-cluster -- until cabal has no support for multi home, hls requires to have only one home -- for the other modules , so we cannot use the common test-common for those @@ -249,6 +259,9 @@ test-suite test-local-cluster Paths_local_cluster Spec SpecHook + build-tool-depends: + , local-cluster:local-cluster + , cardano-wallet-api:cardano-wallet executable test-local-cluster-exe import: test-common diff --git a/lib/local-cluster/test/unit/Cardano/Wallet/Launch/Cluster/Http/ServiceSpec.hs b/lib/local-cluster/test/unit/Cardano/Wallet/Launch/Cluster/Http/ServiceSpec.hs index b9a009728d0..46e7cec9fde 100644 --- a/lib/local-cluster/test/unit/Cardano/Wallet/Launch/Cluster/Http/ServiceSpec.hs +++ b/lib/local-cluster/test/unit/Cardano/Wallet/Launch/Cluster/Http/ServiceSpec.hs @@ -1,5 +1,12 @@ +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeOperators #-} {-# OPTIONS_GHC -Wno-unrecognised-pragmas #-} {-# HLINT ignore "Evaluate" #-} @@ -11,16 +18,32 @@ where import Prelude +import Cardano.Binary + ( serialize' + ) import Cardano.BM.ToTextTracer ( ToTextTracer (..) , newToTextTracer ) +import Cardano.Chain.Common + ( unsafeGetLovelace + ) import Cardano.Launcher ( Command (..) , IfToSendSigINT (..) , TimeoutInSecs (..) , withBackendProcess ) +import Cardano.Ledger.Coin + ( Coin (..) + ) +import Cardano.Ledger.Mary.Value + ( MaryValue (..) + ) +import Cardano.Wallet.Faucet.Gen.Address + ( NetworkTag (..) + , genAddress + ) import Cardano.Wallet.Launch.Cluster ( FaucetFunds (FaucetFunds) , FileOf (..) @@ -30,7 +53,11 @@ import Cardano.Wallet.Launch.Cluster.Faucet.Serialize ( saveFunds ) import Cardano.Wallet.Launch.Cluster.Http.Faucet.Client - ( RunFaucetQ + ( FaucetQ (SendFaucetAssetsQ) + , RunFaucetQ (RunFaucetQ) + ) +import Cardano.Wallet.Launch.Cluster.Http.Faucet.SendFaucetAssets + ( SendFaucetAssets (SendFaucetAssets) ) import Cardano.Wallet.Launch.Cluster.Http.Monitor.Client ( MonitorQ (..) @@ -58,10 +85,21 @@ import Cardano.Wallet.Network.Ports ( PortNumber , getRandomPort ) +import Cardano.Wallet.Network.Rollback.One + ( oneHistory + ) +import Cardano.Wallet.Network.Streaming + ( ChainStream + , eraBlockS + , eraTxS + , forChainStream + , forConsensusS + , newTMVarBuffer + , scanChainStream + , withStreamingFromBlockChain + ) import Cardano.Wallet.Primitive.Ledger.Shelley - ( CardanoBlock - , StandardCrypto - , fromGenesisData + ( fromGenesisData ) import Cardano.Wallet.Primitive.NetworkId ( NetworkId (..) @@ -71,8 +109,35 @@ import Cardano.Wallet.Primitive.NetworkId import Cardano.Wallet.Primitive.SyncProgress ( SyncTolerance (SyncTolerance) ) +import Cardano.Wallet.Primitive.Types.Address + ( Address (..) + ) +import Cardano.Wallet.Primitive.Types.TokenBundle + ( TokenBundle (..) + ) +import Cardano.Wallet.Read + ( Era (..) + , EraValue + , IsEra (..) + , K (..) + , Tx + , applyEraFunValue + , extractEraValue + , (:*:) (..) + ) +import Cardano.Wallet.Read.Tx.Outputs + ( Outputs (..) + , getEraOutputs + ) +import Control.Exception + ( SomeException (..) + , catch + , throwIO + ) import Control.Monad ( forM_ + , join + , replicateM , unless ) import Control.Monad.Cont @@ -82,9 +147,6 @@ import Control.Monad.Cont import Control.Monad.Fix ( fix ) -import Control.Monad.IO.Class - ( liftIO - ) import Control.Monitoring.Tracing ( MonitorState (..) ) @@ -93,19 +155,51 @@ import Control.Tracer , nullTracer , traceWith ) +import Data.ByteString + ( ByteString + ) +import Data.Foldable + ( toList + ) +import Data.Map.Strict + ( Map + ) +import Data.Set + ( Set + ) +import Ouroboros.Consensus.Cardano.Block + ( CardanoBlock + , StandardAllegra + , StandardAlonzo + , StandardBabbage + , StandardConway + , StandardCrypto + , StandardMary + , StandardShelley + ) +import Streaming + ( MonadIO (liftIO) + , Of + , Stream + ) import System.Environment ( lookupEnv ) import System.FilePath - ( (<.>) + ( takeDirectory + , (<.>) , () ) import System.IO - ( IOMode (..) - , withFile + ( Handle + , IOMode (..) + , hClose + , hSetBuffering + , openFile ) import System.IO.Extra - ( withTempFile + ( BufferMode (..) + , withTempFile ) import System.Path ( absFile @@ -118,9 +212,14 @@ import Test.Hspec , describe , it , shouldBe + , shouldNotBe + ) +import Test.QuickCheck + ( generate ) import UnliftIO.Async ( async + , race , wait ) import UnliftIO.Concurrent @@ -130,7 +229,18 @@ import UnliftIO.Directory ( createDirectoryIfMissing ) +import qualified Cardano.Address as Addr +import qualified Cardano.Chain.UTxO as Byron +import qualified Cardano.Ledger.Address as SL +import qualified Cardano.Ledger.Alonzo.TxOut as Alonzo +import qualified Cardano.Ledger.Babbage.TxOut as Babbage +import qualified Cardano.Ledger.Shelley as SL +import qualified Cardano.Ledger.Shelley.API as SL import qualified Cardano.Wallet.Network.Implementation as NL +import qualified Cardano.Wallet.Primitive.Types.Coin as W +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import qualified Streaming.Prelude as S testService :: MonitorState @@ -147,6 +257,17 @@ testService w f = $ ServiceConfiguration Nothing w liftIO $ f tracer query +withFile :: FilePath -> IOMode -> (Handle -> IO a) -> IO a +withFile path mode action = do + createDirectoryIfMissing True (takeDirectory path) + h <- openFile path mode + hSetBuffering h NoBuffering + catch + (action h) + $ \(SomeException e) -> do + hClose h + throwIO e + localClusterCommand :: FilePath -- ^ filename to append to the logs dir @@ -290,7 +411,7 @@ spec = do traceWith tracer Pools traceWith tracer Relay traceWith tracer (Cluster Nothing) - threadDelay 10000 + threadDelay 10_000 (History phases, state) <- query ObserveQ snd <$> phases `shouldBe` [ RetrievingFunds @@ -329,7 +450,7 @@ spec = do liftIO $ do fix $ \loop -> do result <- query ReadyQ - unless result $ threadDelay 10000 >> loop + unless result $ threadDelay 1_000_000 >> loop describe "withNetwork" $ do it "can start and stop" $ evalContT $ do ((query, _), ToTextTracer tr) <- @@ -337,15 +458,82 @@ spec = do "withNetwork-can-start-and-stop" noFunds node <- liftIO $ waitForNode query - nl <- withNetwork tr node - tip <- liftIO $ currentNodeTip nl + network <- withNetwork tr node + tip <- liftIO $ currentNodeTip network tip `seq` pure () + it "can get the first block" $ evalContT $ do + ((query, _), ToTextTracer tr) <- + testServiceWithCluster + "withNetwork-can-get-collect-the-incoming-blocks" + noFunds + node <- liftIO $ waitForNode query + network <- withNetwork tr node + blocks <- withStreamingFromBlockChain network tr newTMVarBuffer + firstBlock <- + liftIO + $ S.head_ -- get the first element + $ elements blocks + liftIO $ join firstBlock `shouldNotBe` Nothing + it "can get the first non-empty balance" $ evalContT $ do + ((query, _), ToTextTracer tr) <- + testServiceWithCluster + "withNetwork-can-get-collect-the-incoming-blocks" + noFunds + node <- liftIO $ waitForNode query + network <- withNetwork tr node + blocks <- withStreamingFromBlockChain network tr newTMVarBuffer + firstBalance <- + liftIO + $ S.head_ + $ balance + $ outputs + $ eraTxS + $ eraBlockS + $ forConsensusS blocks + liftIO $ firstBalance `shouldNotBe` Nothing + describe "send faucet assets" $ do + it "can send assets to a node" $ evalContT $ do + ((query, RunFaucetQ faucet), ToTextTracer tr) <- + testServiceWithCluster + "send-faucet-assets-can-send-assets-to-a-node" + noFunds + node <- liftIO $ waitForNode query + addrs <- liftIO $ replicateM 2 $ generate $ genAddress [TestnetTag] + liftIO + $ faucet + $ SendFaucetAssetsQ + $ SendFaucetAssets + 1 + [ ( Address . Addr.unAddress $ addr + , (TokenBundle (W.Coin 1_234_567) mempty, mempty) + ) + | addr <- addrs + ] + network <- withNetwork tr node + blocks <- + withStreamingFromBlockChain network tr newTMVarBuffer + eBalances <- + liftIO + $ race (threadDelay 100_000_000) + $ waitForAddressesBalance + (Set.fromList $ Addr.unAddress <$> addrs) + $ balance + $ outputs + $ eraTxS + $ eraBlockS + $ forConsensusS blocks + liftIO + $ eBalances + `shouldBe` Right + ( Map.fromList + $ map (,1_234_567) (Addr.unAddress <$> addrs) + ) waitForNode :: RunMonitorQ IO -> IO RunningNode waitForNode (RunQuery query) = fix $ \loop -> do (history', _) <- query ObserveQ case getNode history' of - Nothing -> threadDelay 10000 >> loop + Nothing -> threadDelay 10_000 >> loop Just node -> pure node getNode :: History -> Maybe RunningNode @@ -354,3 +542,89 @@ getNode (History phases) = case phases of (_time, phase) : _ -> case phase of Cluster (Just node) -> Just node _ -> Nothing + +waitForAddressesBalance + :: Monad m + => Set ByteString + -> Stream (Of (Map ByteString Integer)) m () + -> m (Map ByteString Integer) +waitForAddressesBalance addrs s = do + mm <- S.head_ . S.dropWhile f $ s + pure $ case mm of + Nothing -> mempty + Just m -> Map.filterWithKey (\k _ -> k `Set.member` addrs) m + where + f m = not $ addrs `Set.isSubsetOf` Map.keysSet m + +data TxOut = TxOut + { address :: ByteString + , value :: Integer + } + deriving stock (Show, Eq) + +outputs + :: Monad m + => ChainStream (EraValue (ctx :*: Tx)) m r + -> ChainStream TxOut m r +outputs = forChainStream $ S.each . extractEraValue . applyEraFunValue f + where + f :: IsEra era => (ctx :*: Tx) era -> K [TxOut] era + f (_bh :*: tx) = txOutFromOutput $ getEraOutputs tx + +-- a bit of a hack to drop the first element of a stream +-- because we know it's a rollback and we're only interested in the +-- forward +elements + :: Monad m + => ChainStream a m r + -> Stream (Of (Maybe a)) m r +elements = S.drop 1 . scanChainStream (const Just) (oneHistory Nothing) + +balance + :: Monad m + => ChainStream TxOut m r + -> Stream (Of (Map ByteString Integer)) m r +balance = + scanChainStream + (\m (TxOut addr val) -> Map.insertWith (+) addr val m) + $ oneHistory Map.empty + +txOutFromOutput :: forall era. IsEra era => Outputs era -> K [TxOut] era +txOutFromOutput = case theEra @era of + Byron -> \(Outputs os) -> K $ fromByronTxOut <$> toList os + Shelley -> \(Outputs os) -> K $ fromShelleyTxOut <$> toList os + Allegra -> \(Outputs os) -> K $ fromAllegraTxOut <$> toList os + Mary -> \(Outputs os) -> K $ fromMaryTxOut <$> toList os + Alonzo -> \(Outputs os) -> K $ fromAlonzoTxOut <$> toList os + Babbage -> \(Outputs os) -> K $ fromBabbageTxOut <$> toList os + Conway -> \(Outputs os) -> K $ fromConwayTxOut <$> toList os + where + fromByronTxOut :: Byron.TxOut -> TxOut + fromByronTxOut (Byron.TxOut addr amount) = + TxOut (serialize' addr) (fromIntegral $ unsafeGetLovelace amount) + + fromShelleyTxOut :: SL.ShelleyTxOut StandardShelley -> TxOut + fromShelleyTxOut (SL.ShelleyTxOut addr (Coin amount)) = + TxOut (SL.serialiseAddr addr) amount + + fromAllegraTxOut :: SL.ShelleyTxOut StandardAllegra -> TxOut + fromAllegraTxOut (SL.ShelleyTxOut addr (Coin amount)) = + TxOut (SL.serialiseAddr addr) amount + + fromMaryTxOut :: SL.ShelleyTxOut StandardMary -> TxOut + fromMaryTxOut (SL.ShelleyTxOut addr (MaryValue (Coin amount) _)) = + TxOut (SL.serialiseAddr addr) amount + + fromAlonzoTxOut :: Alonzo.AlonzoTxOut StandardAlonzo -> TxOut + fromAlonzoTxOut (Alonzo.AlonzoTxOut addr (MaryValue (Coin amount) _) _) = + TxOut (SL.serialiseAddr addr) amount + + fromBabbageTxOut :: Babbage.BabbageTxOut StandardBabbage -> TxOut + fromBabbageTxOut + (Babbage.BabbageTxOut addr (MaryValue (Coin amount) _) _ _) = + TxOut (SL.serialiseAddr addr) amount + + fromConwayTxOut :: Babbage.BabbageTxOut StandardConway -> TxOut + fromConwayTxOut + (Babbage.BabbageTxOut addr (MaryValue (Coin amount) _) _ _) = + TxOut (SL.serialiseAddr addr) amount diff --git a/lib/network-layer/cardano-wallet-network-layer.cabal b/lib/network-layer/cardano-wallet-network-layer.cabal index de2dafb2ca0..7e8147ba236 100644 --- a/lib/network-layer/cardano-wallet-network-layer.cabal +++ b/lib/network-layer/cardano-wallet-network-layer.cabal @@ -14,11 +14,11 @@ maintainer: hal@cardanofoundation.org category: Network build-type: Simple extra-doc-files: CHANGELOG.md + -- extra-source-files: common language - default-language: - Haskell2010 + default-language: Haskell2010 default-extensions: NoImplicitPrelude OverloadedStrings @@ -46,8 +46,8 @@ library Cardano.Wallet.Network.Config Cardano.Wallet.Network.Implementation Cardano.Wallet.Network.Implementation.Ouroboros - Cardano.Wallet.Network.Implementation.UnliftIO Cardano.Wallet.Network.Implementation.Types + Cardano.Wallet.Network.Implementation.UnliftIO Cardano.Wallet.Network.Light Cardano.Wallet.Network.LocalStateQuery Cardano.Wallet.Network.LocalStateQuery.Extra @@ -57,6 +57,10 @@ library Cardano.Wallet.Network.Logging Cardano.Wallet.Network.Logging.Aggregation Cardano.Wallet.Network.RestorationMode + Cardano.Wallet.Network.Rollback.ChainPoints + Cardano.Wallet.Network.Rollback.One + Cardano.Wallet.Network.Streaming + -- other-modules: -- other-extensions: @@ -65,9 +69,13 @@ library , bytestring , cardano-api , cardano-balance-tx:internal + , cardano-binary , cardano-crypto-class + , cardano-ledger-alonzo + , cardano-ledger-babbage , cardano-ledger-byron , cardano-ledger-core + , cardano-ledger-mary , cardano-ledger-shelley , cardano-slotting , cardano-wallet-launcher @@ -92,8 +100,10 @@ library , ouroboros-network-api , ouroboros-network-framework , ouroboros-network-protocols + , parallel , retry , safe + , streaming , strict-stm , text , text-class @@ -104,23 +114,21 @@ library , unliftio-core test-suite unit - import: language, opts-exe - type: exitcode-stdio-1.0 - hs-source-dirs: test + import: language, opts-exe + type: exitcode-stdio-1.0 + hs-source-dirs: test build-depends: - base + , base , bytestring , cardano-wallet-network-layer , cardano-wallet-primitive , contra-tracer + , hspec , io-classes + , QuickCheck , text , transformers - , hspec - , QuickCheck - build-tool-depends: - hspec-discover:hspec-discover - main-is: - Main.hs - other-modules: - Cardano.Wallet.Network.LightSpec + + build-tool-depends: hspec-discover:hspec-discover + main-is: Main.hs + other-modules: Cardano.Wallet.Network.LightSpec diff --git a/lib/network-layer/src/Cardano/Wallet/Network/Rollback/ChainPoints.hs b/lib/network-layer/src/Cardano/Wallet/Network/Rollback/ChainPoints.hs new file mode 100644 index 00000000000..9454fdf4717 --- /dev/null +++ b/lib/network-layer/src/Cardano/Wallet/Network/Rollback/ChainPoints.hs @@ -0,0 +1,44 @@ +module Cardano.Wallet.Network.Rollback.ChainPoints + ( ChainPoints (..) + , WithChainPoint (..) + , chainPointDifference + , + ) +where + +import Prelude + +import Cardano.Wallet.Read + ( ChainPoint (..) + , SlotNo (..) + ) +import Numeric.Natural + ( Natural + ) + +-- | Abstract data type representing a store for values at chain points. +data ChainPoints value = ChainPoints + { rollback :: ChainPoint -> ChainPoints value + -- ^ Rollback to the given chain point, returning the new chain point and + -- the new store. + , feed :: ChainPoint -> value -> ChainPoints value + -- ^ Feed a new value + , current :: value + -- ^ The current stored value + } + +-- | A value associated with a chain point. +data WithChainPoint a = Valued + { point :: ChainPoint + , value :: a + } + +-- | Calculate the difference in slots between two chain points. It will not +-- return a negative number which means that if the second chain point is +-- after the first one, the result will be 0. +chainPointDifference :: ChainPoint -> ChainPoint -> Natural +chainPointDifference x y = max 0 $ slotNo x - slotNo y + where + slotNo :: ChainPoint -> Natural + slotNo GenesisPoint = 0 + slotNo (BlockPoint s _) = unSlotNo s diff --git a/lib/network-layer/src/Cardano/Wallet/Network/Rollback/One.hs b/lib/network-layer/src/Cardano/Wallet/Network/Rollback/One.hs new file mode 100644 index 00000000000..849e205de67 --- /dev/null +++ b/lib/network-layer/src/Cardano/Wallet/Network/Rollback/One.hs @@ -0,0 +1,48 @@ +module Cardano.Wallet.Network.Rollback.One + ( oneHistory + ) +where + +import Prelude + +import Cardano.Wallet.Network.Rollback.ChainPoints + ( ChainPoints (..) + ) +import Cardano.Wallet.Read + ( ChainPoint (..) + ) +import Control.Monad.Fix + ( fix + ) +import Data.Map.Lazy + ( Map + ) + +import qualified Data.Map.Lazy as Map + +newtype OneHistory b = OneHistory (Map ChainPoint b) + +-- | Create a 'ChainPoints' store with a maximum number of slots to keep in +-- history. It will keep at least one state at the given distance from the +-- tip. +oneHistory + :: b + -- ^ Initial value + -> ChainPoints b +oneHistory b0 = + ($ start) + $ fix + $ \go (OneHistory actual) -> + ChainPoints + { rollback = \cp -> + let + (good, _bad) = Map.split cp actual + in + go $ OneHistory good + , feed = \cp b -> go $ OneHistory $ Map.insert cp b actual + , current = case Map.maxView actual of + Nothing -> b0 + Just (b, _) -> b + } + where + start = OneHistory $ Map.insert GenesisPoint b0 mempty diff --git a/lib/network-layer/src/Cardano/Wallet/Network/Streaming.hs b/lib/network-layer/src/Cardano/Wallet/Network/Streaming.hs new file mode 100644 index 00000000000..5eade5df877 --- /dev/null +++ b/lib/network-layer/src/Cardano/Wallet/Network/Streaming.hs @@ -0,0 +1,230 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeOperators #-} + +module Cardano.Wallet.Network.Streaming + ( -- * ChainStream creation + ChainStream + , withStreamingFromBlockChain + , NewBuffer + , newTMVarBuffer + , newTBQueueBuffer + , Buffer (..) + + -- * ChainStream manipulation + , forConsensusS + , eraBlockS + , eraTxS + , forChainStream + , scanChainStream + ) +where + +import Prelude hiding + ( take + ) + +import Cardano.Wallet.Network + ( ChainFollowLog + , ChainFollower (..) + , NetworkLayer (..) + ) +import Cardano.Wallet.Network.Rollback.ChainPoints + ( ChainPoints (..) + ) +import Cardano.Wallet.Read + ( BHeader + , Block + , ChainPoint (..) + , ChainTip + , ConsensusBlock + , EraValue + , IsEra (..) + , Tx + , applyEraFunValue + , chainPointFromChainTip + , fromConsensusBlock + , getEraTransactions + , sequenceEraValue + , (:*:) (..) + , (:.:) (..) + ) +import Cardano.Wallet.Read.Block.BHeader + ( getEraBHeader + ) +import Control.Monad.Fix + ( fix + ) +import Control.Monad.IO.Unlift + ( MonadUnliftIO + ) +import Control.Monad.Trans.Cont + ( ContT (..) + ) +import Control.Parallel + ( par + ) +import Control.Tracer + ( Tracer + ) +import Data.List.NonEmpty + ( NonEmpty (..) + ) +import Numeric.Natural + ( Natural + ) +import Ouroboros.Consensus.Cardano.Block + ( CardanoBlock + , StandardCrypto + ) +import Streaming + ( MonadIO (..) + , MonadTrans (..) + , Of (..) + , Stream + ) +import UnliftIO.Async + ( withAsync + ) +import UnliftIO.STM + ( atomically + , newEmptyTMVarIO + , newTBQueueIO + , putTMVar + , readTBQueue + , takeTMVar + , writeTBQueue + ) + +import qualified Streaming.Prelude as S + +data Message a = Forward ChainTip a | Rollback ChainPoint + +type ChainStream a = Stream (Of (Message a)) + +-- | A Buffer that can be taken from and put into. We instroduce this abstraction +-- to avoid binding the implementation to IO / MVar. +data Buffer m a = Buffer + { put :: a -> m () + , take :: m a + } + +-- | A function that creates a new Buffer. +type NewBuffer m = forall a. m (Buffer m a) + +-- | Create a new Buffer using an MVar. +newTMVarBuffer :: MonadIO m => NewBuffer m +newTMVarBuffer = do + var <- newEmptyTMVarIO + pure + $ Buffer + { put = atomically . putTMVar var + , take = atomically $ takeTMVar var + } + +-- | Create a new Buffer using a TBQueue. +newTBQueueBuffer + :: MonadIO m + => Natural + -- ^ size of the queue + -> NewBuffer m +newTBQueueBuffer size = do + queue <- newTBQueueIO size + pure + $ Buffer + { put = atomically . writeTBQueue queue + , take = atomically $ readTBQueue queue + } + +-- | Expose a 'ChainFollower' as a 'ChainStream'. A thread is forked to run the +-- 'ChainFollower' in the background. +withStreamingFromBlockChain + :: MonadUnliftIO m + => NetworkLayer m (CardanoBlock StandardCrypto) + -- ^ ChainFollower provider + -> Tracer IO ChainFollowLog + -- ^ ChainFollower logger + -> NewBuffer m + -- ^ Buffer provider + -> ContT r m (ChainStream (NonEmpty ConsensusBlock) m x) +withStreamingFromBlockChain network tr newBuffer = do + messages <- lift newBuffer + let cf = + ChainFollower + { checkpointPolicy = mempty + , readChainPoints = pure [] + , rollForward = \blocks nodeTip -> + put messages $ Forward nodeTip blocks + , rollBackward = \point -> do + put messages $ Rollback point + pure point + } + _ <- ContT $ withAsync $ chainSync network tr cf + pure $ fix $ \next -> do + msg <- lift $ take messages + S.yield msg + next + +explodeBlock :: IsEra era => Block era -> (BHeader :*: ([] :.: Tx)) era +explodeBlock block = + let txs = getEraTransactions block + bh = getEraBHeader block + in (bh :*: Comp txs) + +forConsensusS + :: Monad m + => ChainStream (NonEmpty ConsensusBlock) m r + -> ChainStream (ConsensusBlock) m r +forConsensusS = forChainStream $ \blocks -> do + S.each $ foldr par blocks blocks + +eraBlockS + :: Monad m + => ChainStream (ConsensusBlock) m r + -> ChainStream (EraValue (BHeader :*: ([] :.: Tx))) m r +eraBlockS = S.map $ \case + Forward tip block -> + (Forward tip) + $ (\y -> let (h :*: txs) = explodeBlock y in h :*: txs) + `applyEraFunValue` fromConsensusBlock block + Rollback cp -> Rollback cp + +eraTxS + :: Monad m + => ChainStream (EraValue (ctx :*: ([] :.: Tx))) m r + -> ChainStream (EraValue (ctx :*: Tx)) m r +eraTxS s = S.for s $ \case + Forward tip x -> S.each $ Forward tip <$> q x + Rollback cp -> S.yield $ Rollback cp + where + f :: (ctx :*: ([] :.: Tx)) era -> ([] :.: (ctx :*: Tx)) era + f (bh :*: Comp txs) = Comp $ fmap (bh :*:) txs + q v = sequenceEraValue (f `applyEraFunValue` v) + +-- | Pure scanning of a 'ChainStream'. +scanChainStream + :: Monad m + => (b -> a -> b) + -- ^ How to react to new elements + -> ChainPoints b + -- ^ How to react to rollbacks + -> ChainStream a m r + -- ^ The stream to scan + -> Stream (Of b) m r + -- ^ The scanned stream +scanChainStream ingest state = S.drop 1 . S.scan ingest' state current + where + ingest' s = \case + Forward tip x -> + feed s (chainPointFromChainTip tip) + $ ingest (current s) x + Rollback cp -> rollback s cp + +forChainStream + :: Monad m + => (a -> Stream (Of b) m ()) + -> ChainStream a m r + -> ChainStream b m r +forChainStream f s = S.for s $ \case + Forward tip x -> S.map (Forward tip) $ f x + Rollback cp -> S.yield $ Rollback cp