diff --git a/cmd/main.go b/cmd/main.go index 86833ce..50407b3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -20,7 +20,7 @@ type State struct { } func main() { - // logrus.SetLevel(logrus.DebugLevel) + logrus.SetLevel(logrus.DebugLevel) state := &State{ filtersDB: inmemory.NewFilterInmemory(), diff --git a/cmd/start.go b/cmd/start.go index 0afd09e..83e9c85 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -17,7 +17,7 @@ import ( func startAction(state *State) cli.ActionFunc { return func(c *cli.Context) error { - peers := c.StringSlice("peers") + peers := c.StringSlice("connect") if len(peers) == 0 { return cli.Exit("peers must be specified", 1) } @@ -82,6 +82,7 @@ func startAction(state *State) cli.ActionFunc { scanSvc.Watch( scanner.WithStartBlock(0), scanner.WithWatchItem(watchItem), + scanner.WithPersistentWatch(), ) if err != nil { panic(err) diff --git a/pkg/node/cmd_block.go b/pkg/node/cmd_block.go index a5eebf2..e6f0000 100644 --- a/pkg/node/cmd_block.go +++ b/pkg/node/cmd_block.go @@ -4,7 +4,6 @@ import ( "context" "io" - "github.com/sirupsen/logrus" "github.com/vulpemventures/neutrino-elements/pkg/binary" "github.com/vulpemventures/neutrino-elements/pkg/peer" "github.com/vulpemventures/neutrino-elements/pkg/protocol" @@ -14,15 +13,11 @@ import ( func (no node) handleBlock(header *protocol.MessageHeader, p peer.Peer) error { var block protocol.MsgBlock - currentChainTip, err := no.blockHeadersDb.ChainTip(context.Background()) + _, err := no.blockHeadersDb.ChainTip(context.Background()) if err != nil && err != repository.ErrNoBlocksHeaders { return err } - if currentChainTip != nil { - logrus.Println(currentChainTip.Height) - } - lr := io.LimitReader(p.Connection(), int64(header.Length)) if err := binary.NewDecoder(lr).Decode(&block); err != nil { return err diff --git a/pkg/node/node.go b/pkg/node/node.go index ade51cf..cb96dc9 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -16,7 +16,7 @@ import ( const ( pingIntervalSec = 120 - pingTimeoutSec = 30 + pingTimeoutSec = 60 ) type NodeService interface { @@ -24,6 +24,7 @@ type NodeService interface { Stop() error AddOutboundPeer(peer.Peer) error SendTransaction(txhex string) error + GetChainTip() (*block.Header, error) } // node implements an Elements full node. @@ -79,6 +80,10 @@ func New(config NodeConfig) (NodeService, error) { }, nil } +func (no node) GetChainTip() (*block.Header, error) { + return no.blockHeadersDb.ChainTip(context.Background()) +} + // AddOutboundPeer sends a new version message to a new peer // returns an error if the peer is already connected. // it also starts a goroutine to monitor the peer's messages. diff --git a/pkg/scanner/request.go b/pkg/scanner/request.go index 64c2a60..6289ae5 100644 --- a/pkg/scanner/request.go +++ b/pkg/scanner/request.go @@ -1,8 +1,9 @@ package scanner type ScanRequest struct { - StartHeight uint32 // nil means scan from genesis block - Item WatchItem // item to watch + StartHeight uint32 // nil means scan from genesis block + Item WatchItem // item to watch + IsPersistent bool // if true, the request will be re-added with StartHeight = StartHeiht + 1 } type ScanRequestOption func(req *ScanRequest) @@ -19,11 +20,14 @@ func WithStartBlock(blockHeight uint32) ScanRequestOption { } } -func newScanRequest(options ...ScanRequestOption) *ScanRequest { - req := &ScanRequest{ - Item: nil, - StartHeight: 0, +func WithPersistentWatch() ScanRequestOption { + return func(req *ScanRequest) { + req.IsPersistent = true } +} + +func newScanRequest(options ...ScanRequestOption) *ScanRequest { + req := &ScanRequest{} for _, option := range options { option(req) } diff --git a/pkg/scanner/request_test.go b/pkg/scanner/request_test.go index 3a63cf8..56f1ada 100644 --- a/pkg/scanner/request_test.go +++ b/pkg/scanner/request_test.go @@ -45,15 +45,26 @@ func TestRequestOptions(t *testing.T) { Item: &fakeWatchItem{ bytes: []byte("fake"), }, - StartHeight: 0, + StartHeight: 0, + IsPersistent: false, }, }, { name: "WithStartBlock", option: scanner.WithStartBlock(666), expected: scanner.ScanRequest{ - Item: initialWatchItem, - StartHeight: 666, + Item: initialWatchItem, + StartHeight: 666, + IsPersistent: false, + }, + }, + { + name: "WithPersistentWatch", + option: scanner.WithPersistentWatch(), + expected: scanner.ScanRequest{ + Item: initialWatchItem, + StartHeight: 0, + IsPersistent: true, }, }, } diff --git a/pkg/scanner/scanner.go b/pkg/scanner/scanner.go index d9e1384..e5d8fa4 100644 --- a/pkg/scanner/scanner.go +++ b/pkg/scanner/scanner.go @@ -19,6 +19,9 @@ type Report struct { // BlockHash is the block hash of the block that includes the transaction. BlockHash *chainhash.Hash BlockHeight uint32 + + // the request resolved by the report + Request *ScanRequest } type ScannerService interface { @@ -26,6 +29,7 @@ type ScannerService interface { Start() (<-chan Report, error) // Stop the scanner Stop() + // Add a new request to the queue Watch(...ScanRequestOption) } @@ -74,6 +78,7 @@ func (s *scannerService) Start() (<-chan Report, error) { func (s *scannerService) Stop() { s.quitCh <- struct{}{} s.started = false + s.requestsQueue = newScanRequestQueue() } func (s *scannerService) Watch(opts ...ScanRequestOption) { @@ -122,7 +127,7 @@ func (s *scannerService) requestsManager(ch chan<- Report) { // will check if any blocks has the requested item // if yes, will extract the transaction that match the item // TODO handle properly errors (enqueue the unresolved requests ??) -func (s *scannerService) requestWorker(startHeight uint32, ch chan<- Report) error { +func (s *scannerService) requestWorker(startHeight uint32, reportsChan chan<- Report) error { nextBatch := make([]*ScanRequest, 0) nextHeight := startHeight @@ -163,9 +168,14 @@ func (s *scannerService) requestWorker(startHeight uint32, ch chan<- Report) err return err } - // if some requests generate a report, send them to the channel for _, report := range reports { - ch <- report + // send the report to the output channel + reportsChan <- report + + // if the request is persistent, the scanner will keep watching the item at the next block height + if report.Request.IsPersistent { + s.Watch(WithStartBlock(report.BlockHeight+1), WithWatchItem(report.Request.Item), WithPersistentWatch()) + } } // if some requests remain, put them back in the next batch @@ -242,6 +252,7 @@ func (s *scannerService) extractBlockMatches(blockHash *chainhash.Hash, requests Transaction: tx, BlockHash: blockHash, BlockHeight: block.Header.Height, + Request: req, }) } } diff --git a/pkg/scanner/scanner_test.go b/pkg/scanner/scanner_test.go new file mode 100644 index 0000000..afa0d79 --- /dev/null +++ b/pkg/scanner/scanner_test.go @@ -0,0 +1,133 @@ +package scanner_test + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/tdex-network/tdex-daemon/pkg/explorer/esplora" + "github.com/vulpemventures/neutrino-elements/pkg/blockservice" + "github.com/vulpemventures/neutrino-elements/pkg/node" + "github.com/vulpemventures/neutrino-elements/pkg/protocol" + "github.com/vulpemventures/neutrino-elements/pkg/repository/inmemory" + "github.com/vulpemventures/neutrino-elements/pkg/scanner" +) + +var repoFilter = inmemory.NewFilterInmemory() +var repoHeader = inmemory.NewHeaderInmemory() + +func makeNigiriTestServices() (node.NodeService, scanner.ScannerService, <-chan scanner.Report) { + n, err := node.New(node.NodeConfig{ + Network: "nigiri", + UserAgent: "neutrino-elements:test", + FiltersDB: repoFilter, + BlockHeadersDB: repoHeader, + }) + + if err != nil { + panic(err) + } + + err = n.Start("localhost:18886") + if err != nil { + panic(err) + } + + time.Sleep(time.Second * 3) // wait for the node sync the first headers if the repo is empty + + blockSvc := blockservice.NewEsploraBlockService("http://localhost:3001") + genesisBlockHash := protocol.GetCheckpoints(protocol.MagicNigiri)[0] + h, err := chainhash.NewHashFromStr(genesisBlockHash) + if err != nil { + panic(err) + } + s := scanner.New(repoFilter, repoHeader, blockSvc, h) + + reportCh, err := s.Start() + if err != nil { + panic(err) + } + + return n, s, reportCh +} + +func faucet(addr string) (string, error) { + svc, err := esplora.NewService("http://127.0.0.1:3001", 5000) + if err != nil { + return "", err + } + + return svc.Faucet(addr, 1, "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225") +} + +func TestWatch(t *testing.T) { + const address = "el1qq0mjw2fwsc20vr4q2ypq9w7dslg6436zaahl083qehyghv7td3wnaawhrpxphtjlh4xjwm6mu29tp9uczkl8cxfyatqc3vgms" + + n, s, reportCh := makeNigiriTestServices() + defer s.Stop() + defer n.Stop() + + watchItem, err := scanner.NewScriptWatchItemFromAddress(address) + if err != nil { + t.Fatal(err) + } + + tip, err := n.GetChainTip() + if err != nil { + t.Fatal(err) + } + + s.Watch(scanner.WithStartBlock(tip.Height+1), scanner.WithWatchItem(watchItem)) + txid, err := faucet(address) + if err != nil { + t.Fatal(err) + } + + nextReport := <-reportCh + + if nextReport.Transaction.TxHash().String() != txid { + t.Fatalf("expected txid %s, got %s", txid, nextReport.Transaction.TxHash().String()) + } +} + +func TestWatchPersistent(t *testing.T) { + const address = "el1qqfs4ecf5427tyshnsq0x3jy3ad2tqfn03x3fqmxtyn2ycuvmk98urxmh9cdmr5zcqfs42l6a3kpyrk6pkxjx7yuvqsnuuckhp" + + n, s, reportCh := makeNigiriTestServices() + defer s.Stop() + defer n.Stop() + + watchItem, err := scanner.NewScriptWatchItemFromAddress(address) + if err != nil { + t.Fatal(err) + } + + tip, err := n.GetChainTip() + if err != nil { + t.Fatal(err) + } + + s.Watch(scanner.WithStartBlock(tip.Height+1), scanner.WithWatchItem(watchItem), scanner.WithPersistentWatch()) + txid, err := faucet(address) + if err != nil { + t.Fatal(err) + } + + nextReport := <-reportCh + + if nextReport.Transaction.TxHash().String() != txid { + t.Fatalf("expected txid %s, got %s", txid, nextReport.Transaction.TxHash().String()) + } + + // we test if the watch is persistent by sending a new transaction + txid, err = faucet(address) + if err != nil { + t.Fatal(err) + } + + nextReport = <-reportCh + + if nextReport.Transaction.TxHash().String() != txid { + t.Fatalf("expected txid %s, got %s", txid, nextReport.Transaction.TxHash().String()) + } +}