From 5ba90bfa146df09a30ec0051741f7b5e12082ed1 Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Thu, 1 Sep 2022 16:38:05 +0200 Subject: [PATCH 1/4] Fix: /healthz endpoint (#2379) --- plugins/webapi/healthz/plugin.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/webapi/healthz/plugin.go b/plugins/webapi/healthz/plugin.go index b4e2098b56..3cf82a6605 100644 --- a/plugins/webapi/healthz/plugin.go +++ b/plugins/webapi/healthz/plugin.go @@ -9,7 +9,7 @@ import ( "github.com/labstack/echo" "go.uber.org/dig" - "github.com/iotaledger/goshimmer/packages/core/tangleold" + "github.com/iotaledger/goshimmer/packages/core/bootstrapmanager" "github.com/iotaledger/goshimmer/packages/node/shutdown" ) @@ -20,8 +20,8 @@ const PluginName = "WebAPIHealthzEndpoint" type dependencies struct { dig.In - Server *echo.Echo - Tangle *tangleold.Tangle `optional:"true"` + Server *echo.Echo + BootstrapManager *bootstrapmanager.Manager `optional:"true"` } var ( @@ -50,7 +50,7 @@ func worker(ctx context.Context) { } func getHealthz(c echo.Context) error { - if deps.Tangle != nil { + if deps.BootstrapManager != nil && !deps.BootstrapManager.Bootstrapped() { return c.NoContent(http.StatusServiceUnavailable) } return c.NoContent(http.StatusOK) From 64020a363b111cb9b446f0a2f263ce02f0c1eb9e Mon Sep 17 00:00:00 2001 From: Jonas Theis <4181434+jonastheis@users.noreply.github.com> Date: Mon, 5 Sep 2022 14:49:52 +0100 Subject: [PATCH 2/4] Make wallet stateless to prevent faucet getting stuck (#2415) * Debug prints * More prints and hacky fix for spend problem * Make faucet wallet stateless Co-authored-by: Andrea V <1577639+karimodm@users.noreply.github.com> --- client/wallet/options.go | 7 +++++++ client/wallet/output_manager.go | 14 ++++++++++---- client/wallet/wallet.go | 3 ++- plugins/faucet/connector.go | 5 ----- plugins/faucet/faucet.go | 1 + 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/client/wallet/options.go b/client/wallet/options.go index 4349d6cf2c..d4e7a607ac 100644 --- a/client/wallet/options.go +++ b/client/wallet/options.go @@ -81,3 +81,10 @@ func GenericConnector(connector Connector) Option { wallet.connector = connector } } + +// Stateless allows to run the wallet in a stateless mode, meaning outputs will always be refreshed from the connector. +func Stateless(stateless bool) Option { + return func(wallet *Wallet) { + wallet.Stateless = stateless + } +} diff --git a/client/wallet/output_manager.go b/client/wallet/output_manager.go index 12e2072bc1..0accbef927 100644 --- a/client/wallet/output_manager.go +++ b/client/wallet/output_manager.go @@ -10,14 +10,17 @@ type OutputManager struct { addressManager *AddressManager connector Connector unspentOutputs OutputsByAddressAndOutputID + + optsStateless bool } // NewUnspentOutputManager creates a new UnspentOutputManager. -func NewUnspentOutputManager(addressManager *AddressManager, connector Connector) (outputManager *OutputManager) { +func NewUnspentOutputManager(addressManager *AddressManager, connector Connector, stateless bool) (outputManager *OutputManager) { outputManager = &OutputManager{ addressManager: addressManager, connector: connector, unspentOutputs: NewAddressToOutputs(), + optsStateless: stateless, } if err := outputManager.Refresh(true); err != nil { @@ -44,9 +47,12 @@ func (o *OutputManager) Refresh(includeSpentAddresses ...bool) error { if _, addressExists := o.unspentOutputs[addy]; !addressExists { o.unspentOutputs[addy] = make(map[utxo.OutputID]*Output) } - // mark the output as spent if we already marked it as spent locally - if existingOutput, outputExists := o.unspentOutputs[addy][outputID]; outputExists && existingOutput.Spent { - output.Spent = true + + // mark the output as spent if we already marked it as spent locally, only in stateful mode. + if !o.optsStateless { + if existingOutput, outputExists := o.unspentOutputs[addy][outputID]; outputExists && existingOutput.Spent { + output.Spent = true + } } o.unspentOutputs[addy][outputID] = output } diff --git a/client/wallet/wallet.go b/client/wallet/wallet.go index bf369b2728..0e8b890b6e 100644 --- a/client/wallet/wallet.go +++ b/client/wallet/wallet.go @@ -56,6 +56,7 @@ type Wallet struct { reusableAddress bool ConfirmationPollInterval time.Duration ConfirmationTimeout time.Duration + Stateless bool } // New is the factory method of the wallet. It either creates a new wallet or restores the wallet backup that is handed @@ -93,7 +94,7 @@ func New(options ...Option) (wallet *Wallet) { } // initialize output manager - wallet.outputManager = NewUnspentOutputManager(wallet.addressManager, wallet.connector) + wallet.outputManager = NewUnspentOutputManager(wallet.addressManager, wallet.connector, wallet.Stateless) err := wallet.outputManager.Refresh(true) if err != nil { panic(err) diff --git a/plugins/faucet/connector.go b/plugins/faucet/connector.go index db0c328440..23f0882352 100644 --- a/plugins/faucet/connector.go +++ b/plugins/faucet/connector.go @@ -1,8 +1,6 @@ package faucet import ( - "fmt" - "github.com/iotaledger/hive.go/core/types/confirmation" "github.com/pkg/errors" @@ -33,10 +31,8 @@ func (f *FaucetConnector) UnspentOutputs(addresses ...address.Address) (unspentO unspentOutputs = make(map[address.Address]map[utxo.OutputID]*wallet.Output) for _, addr := range addresses { - fmt.Println("> Getting unspent outputs for ", addr.Base58()) f.indexer.CachedAddressOutputMappings(addr.Address()).Consume(func(mapping *indexer.AddressOutputMapping) { f.tangle.Ledger.Storage.CachedOutput(mapping.OutputID()).Consume(func(output utxo.Output) { - fmt.Println("> > Found output ", output.String()) if typedOutput, ok := output.(devnetvm.Output); ok { f.tangle.Ledger.Storage.CachedOutputMetadata(typedOutput.ID()).Consume(func(outputMetadata *ledger.OutputMetadata) { if !outputMetadata.IsSpent() { @@ -61,7 +57,6 @@ func (f *FaucetConnector) UnspentOutputs(addresses ...address.Address) (unspentO }) }) } - fmt.Printf("%+v\n", unspentOutputs) return } diff --git a/plugins/faucet/faucet.go b/plugins/faucet/faucet.go index f10b4116cb..21ae480560 100644 --- a/plugins/faucet/faucet.go +++ b/plugins/faucet/faucet.go @@ -31,6 +31,7 @@ func NewFaucet(faucetSeed *seed.Seed) (f *Faucet) { wallet.FaucetPowDifficulty(Parameters.PowDifficulty), wallet.ConfirmationTimeout(Parameters.MaxAwait), wallet.ConfirmationPollingInterval(500*time.Millisecond), + wallet.Stateless(true), )} // We use index 1 as a proxy address from which we send the funds to the requester. f.Wallet.NewReceiveAddress() From e898998ce990a23b7c358879b7de8bc3b05e6b5e Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Mon, 5 Sep 2022 15:50:45 +0200 Subject: [PATCH 3/4] Fix notarization and networking bugs (#2417) --- packages/core/notarization/commitments.go | 21 +++++++++- packages/core/notarization/manager.go | 15 +++++++ packages/node/p2p/manager.go | 1 + packages/node/p2p/stream.go | 48 ++++++++++++++--------- 4 files changed, 64 insertions(+), 21 deletions(-) diff --git a/packages/core/notarization/commitments.go b/packages/core/notarization/commitments.go index ae3a7b8f02..71f8cb522d 100644 --- a/packages/core/notarization/commitments.go +++ b/packages/core/notarization/commitments.go @@ -170,6 +170,15 @@ func (f *EpochCommitmentFactory) removeStateMutationLeaf(ei epoch.Index, txID ut return removeLeaf(commitment.stateMutationTree, txID.Bytes()) } +// hasStateMutationLeaf returns if the leaf is part of the state mutation sparse merkle tree. +func (f *EpochCommitmentFactory) hasStateMutationLeaf(ei epoch.Index, txID utxo.TransactionID) (has bool, err error) { + commitment, err := f.getCommitmentTrees(ei) + if err != nil { + return false, errors.Wrap(err, "could not get commitment while deleting state mutation leaf") + } + return commitment.stateMutationTree.Has(txID.Bytes()) +} + // insertTangleLeaf inserts blk to the Tangle sparse merkle tree. func (f *EpochCommitmentFactory) insertTangleLeaf(ei epoch.Index, blkID tangleold.BlockID) error { commitment, err := f.getCommitmentTrees(ei) @@ -250,11 +259,19 @@ func (f *EpochCommitmentFactory) storeDiffUTXOs(ei epoch.Index, spent, created [ epochDiffStorage := f.storage.getEpochDiffStorage(ei) for _, spentOutputWithMetadata := range spent { - epochDiffStorage.spent.Store(spentOutputWithMetadata).Release() + cachedObj, stored := epochDiffStorage.spent.StoreIfAbsent(spentOutputWithMetadata) + if !stored { + continue + } + cachedObj.Release() } for _, createdOutputWithMetadata := range created { - epochDiffStorage.created.Store(createdOutputWithMetadata).Release() + cachedObj, stored := epochDiffStorage.created.StoreIfAbsent(createdOutputWithMetadata) + if !stored { + continue + } + cachedObj.Release() } } diff --git a/packages/core/notarization/manager.go b/packages/core/notarization/manager.go index e88a668da3..269fd99d43 100644 --- a/packages/core/notarization/manager.go +++ b/packages/core/notarization/manager.go @@ -430,6 +430,15 @@ func (m *Manager) OnTransactionInclusionUpdated(event *ledger.TransactionInclusi txID := event.TransactionID + has, err := m.isTransactionInEpoch(event.TransactionID, oldEpoch); + if err != nil { + m.log.Error(err) + return + } + if !has { + return + } + var spent, created []*ledger.OutputWithMetadata m.tangle.Ledger.Storage.CachedTransaction(txID).Consume(func(tx utxo.Transaction) { spent, created = m.resolveOutputs(tx) @@ -576,6 +585,8 @@ func (m *Manager) includeTransactionInEpoch(txID utxo.TransactionID, ei epoch.In if err := m.epochCommitmentFactory.insertStateMutationLeaf(ei, txID); err != nil { return err } + // TODO: in case of a reorg, a transaction spending the same output of another TX will cause a duplicate element + // in cache in the objectstorage if we don't hook to the reorged transaction "orphanage". m.epochCommitmentFactory.storeDiffUTXOs(ei, spent, created) m.Events.StateMutationTreeInserted.Trigger(&StateMutationTreeUpdatedEvent{TransactionID: txID}) @@ -596,6 +607,10 @@ func (m *Manager) removeTransactionFromEpoch(txID utxo.TransactionID, ei epoch.I return nil } +func (m *Manager) isTransactionInEpoch(txID utxo.TransactionID, ei epoch.Index) (has bool, err error) { + return m.epochCommitmentFactory.hasStateMutationLeaf(ei, txID) +} + // isCommittable returns if the epoch is committable, if all conflicts are resolved and the epoch is old enough. func (m *Manager) isCommittable(ei epoch.Index) bool { return m.isOldEnough(ei) && m.allPastConflictsAreResolved(ei) diff --git a/packages/node/p2p/manager.go b/packages/node/p2p/manager.go index 5666797f7b..a917aaa92d 100644 --- a/packages/node/p2p/manager.go +++ b/packages/node/p2p/manager.go @@ -179,6 +179,7 @@ func (m *Manager) Send(packet proto.Message, protocolID protocol.ID, to ...ident stream := nbr.GetStream(protocolID) if stream == nil { m.log.Warnw("send error, no stream for protocol", "peer-id", nbr.ID(), "protocol", protocolID) + nbr.Close() continue } if err := stream.WritePacket(packet); err != nil { diff --git a/packages/node/p2p/stream.go b/packages/node/p2p/stream.go index b3e0cbcb39..a49f7e80c5 100644 --- a/packages/node/p2p/stream.go +++ b/packages/node/p2p/stream.go @@ -81,7 +81,7 @@ func (m *Manager) dialPeer(ctx context.Context, p *peer.Peer, opts []ConnectPeer } if len(streams) == 0 { - return nil, fmt.Errorf("no streams initiated with peer %s / %s", address, p.ID()) + return nil, errors.Errorf("no streams initiated with peer %s / %s", address, p.ID()) } return streams, nil @@ -102,7 +102,7 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe ctx, cancel = context.WithTimeout(ctx, defaultConnectionTimeout) defer cancel() } - am, err := m.newAcceptMatcher(p, protocolID) + amCtx, am, err := m.newAcceptMatcher(ctx, p, protocolID) if err != nil { return nil, errors.WithStack(err) } @@ -118,11 +118,11 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe select { case ps := <-streamCh: if ps.Protocol() != protocolID { - return nil, fmt.Errorf("accepted stream has wrong protocol: %s != %s", ps.Protocol(), protocolID) + return nil, errors.Errorf("accepted stream has wrong protocol: %s != %s", ps.Protocol(), protocolID) } return ps, nil - case <-ctx.Done(): - err := ctx.Err() + case <-amCtx.Done(): + err := amCtx.Err() if errors.Is(err, context.DeadlineExceeded) { m.log.Debugw("accept timeout", "id", am.Peer.ID(), "proto", protocolID) return nil, errors.WithStack(ErrTimeout) @@ -166,7 +166,7 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe } if len(streams) == 0 { - return nil, fmt.Errorf("no streams accepted from peer %s", p.ID()) + return nil, errors.Errorf("no streams accepted from peer %s", p.ID()) } return streams, nil @@ -175,7 +175,7 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe func (m *Manager) initiateStream(ctx context.Context, libp2pID libp2ppeer.ID, protocolID protocol.ID) (*PacketsStream, error) { protocolHandler, registered := m.registeredProtocols[protocolID] if !registered { - return nil, fmt.Errorf("cannot initiate stream protocol %s is not registered", protocolID) + return nil, errors.Errorf("cannot initiate stream protocol %s is not registered", protocolID) } stream, err := m.GetP2PHost().NewStream(ctx, libp2pID, protocolID) if err != nil { @@ -210,11 +210,14 @@ func (m *Manager) handleStream(stream network.Stream) { am := m.matchNewStream(stream) if am != nil { am.StreamChMutex.RLock() + defer am.StreamChMutex.RUnlock() streamCh := am.StreamCh[protocolID] - am.StreamChMutex.RUnlock() - m.log.Debugw("incoming stream matched", "id", am.Peer.ID(), "proto", protocolID) - streamCh <- ps + select { + case <-am.Ctx.Done(): + case streamCh <- ps: + m.log.Debugw("incoming stream matched", "id", am.Peer.ID(), "proto", protocolID) + } } else { // close the connection if not matched m.log.Debugw("unexpected connection", "addr", stream.Conn().RemoteMultiaddr(), @@ -230,15 +233,17 @@ type AcceptMatcher struct { Libp2pID libp2ppeer.ID StreamChMutex sync.RWMutex StreamCh map[protocol.ID]chan *PacketsStream + Ctx context.Context + CtxCancel context.CancelFunc } -func (m *Manager) newAcceptMatcher(p *peer.Peer, protocolID protocol.ID) (*AcceptMatcher, error) { +func (m *Manager) newAcceptMatcher(ctx context.Context, p *peer.Peer, protocolID protocol.ID) (context.Context, *AcceptMatcher, error) { m.acceptMutex.Lock() defer m.acceptMutex.Unlock() libp2pID, err := libp2putil.ToLibp2pPeerID(p) if err != nil { - return nil, errors.WithStack(err) + return nil, nil, errors.WithStack(err) } acceptMatcher, acceptExists := m.acceptMap[libp2pID] @@ -246,23 +251,27 @@ func (m *Manager) newAcceptMatcher(p *peer.Peer, protocolID protocol.ID) (*Accep acceptMatcher.StreamChMutex.Lock() defer acceptMatcher.StreamChMutex.Unlock() if _, streamChanExists := acceptMatcher.StreamCh[protocolID]; streamChanExists { - return nil, nil + return nil, nil, nil } acceptMatcher.StreamCh[protocolID] = make(chan *PacketsStream) - return acceptMatcher, nil + return acceptMatcher.Ctx, acceptMatcher, nil } + cancelCtx, cancelCtxFunc := context.WithCancel(ctx) + am := &AcceptMatcher{ - Peer: p, - Libp2pID: libp2pID, - StreamCh: make(map[protocol.ID]chan *PacketsStream), + Peer: p, + Libp2pID: libp2pID, + StreamCh: make(map[protocol.ID]chan *PacketsStream), + Ctx: cancelCtx, + CtxCancel: cancelCtxFunc, } am.StreamCh[protocolID] = make(chan *PacketsStream) m.acceptMap[libp2pID] = am - return am, nil + return cancelCtx, am, nil } func (m *Manager) removeAcceptMatcher(am *AcceptMatcher, protocolID protocol.ID) { @@ -270,14 +279,15 @@ func (m *Manager) removeAcceptMatcher(am *AcceptMatcher, protocolID protocol.ID) defer m.acceptMutex.Unlock() existingAm := m.acceptMap[am.Libp2pID] + existingAm.StreamChMutex.Lock() defer existingAm.StreamChMutex.Unlock() - close(existingAm.StreamCh[protocolID]) delete(existingAm.StreamCh, protocolID) if len(existingAm.StreamCh) == 0 { delete(m.acceptMap, am.Libp2pID) + existingAm.CtxCancel() } } From 14b62d83765b1e3cd6d7a4bdd55a2ea5225056f5 Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Mon, 5 Sep 2022 15:57:50 +0200 Subject: [PATCH 4/4] v0.9.7 (#2418) --- CHANGELOG.md | 8 ++++++++ packages/core/epoch/types.go | 2 +- plugins/autopeering/discovery/parameters.go | 2 +- plugins/banner/plugin.go | 2 +- plugins/database/versioning.go | 2 +- 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7efefac6c7..92a29283ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# v0.9.7 - 2022-09-05 + +> This release introduces major bugfixes to the networking and the faucet. + +- Fix notarization and networking bugs (#2417) +- Make wallet stateless to prevent faucet getting stuck (#2415) +- Fix: /healthz endpoint (#2379) + # v0.9.6 - 2022-09-01 > This release introduces major bugfixes to epoch notarization and networking. diff --git a/packages/core/epoch/types.go b/packages/core/epoch/types.go index f7cd2d5d5a..78c4dd56c3 100644 --- a/packages/core/epoch/types.go +++ b/packages/core/epoch/types.go @@ -19,7 +19,7 @@ import ( var ( // GenesisTime is the time (Unix in seconds) of the genesis. - GenesisTime int64 = 1662035280 + GenesisTime int64 = 1662385954 // Duration is the default epoch duration in seconds. Duration int64 = 10 ) diff --git a/plugins/autopeering/discovery/parameters.go b/plugins/autopeering/discovery/parameters.go index c1799f3faf..cd072aa7b8 100644 --- a/plugins/autopeering/discovery/parameters.go +++ b/plugins/autopeering/discovery/parameters.go @@ -5,7 +5,7 @@ import "github.com/iotaledger/goshimmer/plugins/config" // ParametersDefinitionDiscovery contains the definition of configuration parameters used by the autopeering peer discovery. type ParametersDefinitionDiscovery struct { // NetworkVersion defines the config flag of the network version. - NetworkVersion uint32 `default:"64" usage:"autopeering network version"` + NetworkVersion uint32 `default:"65" usage:"autopeering network version"` // EntryNodes defines the config flag of the entry nodes. EntryNodes []string `default:"2PV5487xMw5rasGBXXWeqSi4hLz7r19YBt8Y1TGAsQbj@analysisentry-01.devnet.shimmer.iota.cafe:15626,5EDH4uY78EA6wrBkHHAVBWBMDt7EcksRq6pjzipoW15B@entry-0.devnet.tanglebay.com:14646,CAB87iQZR6BjBrCgEBupQJ4gpEBgvGKKv3uuGVRBKb4n@entry-1.devnet.tanglebay.com:14646" usage:"list of trusted entry nodes for auto peering"` diff --git a/plugins/banner/plugin.go b/plugins/banner/plugin.go index 98da733f11..2c21de02a1 100644 --- a/plugins/banner/plugin.go +++ b/plugins/banner/plugin.go @@ -15,7 +15,7 @@ var ( Plugin = node.NewPlugin(PluginName, nil, node.Enabled, configure, run) // AppVersion version number - AppVersion = "v0.9.6" + AppVersion = "v0.9.7" // SimplifiedAppVersion is the version number without commit hash SimplifiedAppVersion = simplifiedVersion(AppVersion) ) diff --git a/plugins/database/versioning.go b/plugins/database/versioning.go index f44a90e8b3..2cbc879f8e 100644 --- a/plugins/database/versioning.go +++ b/plugins/database/versioning.go @@ -11,7 +11,7 @@ import ( const ( // DBVersion defines the version of the database schema this version of GoShimmer supports. // Every time there's a breaking change regarding the stored data, this version flag should be adjusted. - DBVersion = 64 + DBVersion = 65 ) var (