Skip to content

Commit

Permalink
PersistentWatch requests support (#12)
Browse files Browse the repository at this point in the history
* increase timeout for pong msg

* add persistent request logic

* re-init requestsQueue in scanner.Stop method

* fix after review
  • Loading branch information
louisinger authored Apr 26, 2022
1 parent df3ddfc commit 1c4a6b5
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type State struct {
}

func main() {
// logrus.SetLevel(logrus.DebugLevel)
logrus.SetLevel(logrus.DebugLevel)

state := &State{
filtersDB: inmemory.NewFilterInmemory(),
Expand Down
3 changes: 2 additions & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func startAction(state *State) cli.ActionFunc {
return func(c *cli.Context) error {
peers := c.StringSlice("peers")
peers := c.StringSlice("connect")
if len(peers) == 0 {
return cli.Exit("peers must be specified", 1)
}
Expand Down Expand Up @@ -82,6 +82,7 @@ func startAction(state *State) cli.ActionFunc {
scanSvc.Watch(
scanner.WithStartBlock(0),
scanner.WithWatchItem(watchItem),
scanner.WithPersistentWatch(),
)
if err != nil {
panic(err)
Expand Down
7 changes: 1 addition & 6 deletions pkg/node/cmd_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"io"

"github.com/sirupsen/logrus"
"github.com/vulpemventures/neutrino-elements/pkg/binary"
"github.com/vulpemventures/neutrino-elements/pkg/peer"
"github.com/vulpemventures/neutrino-elements/pkg/protocol"
Expand All @@ -14,15 +13,11 @@ import (
func (no node) handleBlock(header *protocol.MessageHeader, p peer.Peer) error {
var block protocol.MsgBlock

currentChainTip, err := no.blockHeadersDb.ChainTip(context.Background())
_, err := no.blockHeadersDb.ChainTip(context.Background())
if err != nil && err != repository.ErrNoBlocksHeaders {
return err
}

if currentChainTip != nil {
logrus.Println(currentChainTip.Height)
}

lr := io.LimitReader(p.Connection(), int64(header.Length))
if err := binary.NewDecoder(lr).Decode(&block); err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (

const (
pingIntervalSec = 120
pingTimeoutSec = 30
pingTimeoutSec = 60
)

type NodeService interface {
Start(initialOutboundPeerAddr string) error
Stop() error
AddOutboundPeer(peer.Peer) error
SendTransaction(txhex string) error
GetChainTip() (*block.Header, error)
}

// node implements an Elements full node.
Expand Down Expand Up @@ -79,6 +80,10 @@ func New(config NodeConfig) (NodeService, error) {
}, nil
}

func (no node) GetChainTip() (*block.Header, error) {
return no.blockHeadersDb.ChainTip(context.Background())
}

// AddOutboundPeer sends a new version message to a new peer
// returns an error if the peer is already connected.
// it also starts a goroutine to monitor the peer's messages.
Expand Down
16 changes: 10 additions & 6 deletions pkg/scanner/request.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package scanner

type ScanRequest struct {
StartHeight uint32 // nil means scan from genesis block
Item WatchItem // item to watch
StartHeight uint32 // nil means scan from genesis block
Item WatchItem // item to watch
IsPersistent bool // if true, the request will be re-added with StartHeight = StartHeiht + 1
}

type ScanRequestOption func(req *ScanRequest)
Expand All @@ -19,11 +20,14 @@ func WithStartBlock(blockHeight uint32) ScanRequestOption {
}
}

func newScanRequest(options ...ScanRequestOption) *ScanRequest {
req := &ScanRequest{
Item: nil,
StartHeight: 0,
func WithPersistentWatch() ScanRequestOption {
return func(req *ScanRequest) {
req.IsPersistent = true
}
}

func newScanRequest(options ...ScanRequestOption) *ScanRequest {
req := &ScanRequest{}
for _, option := range options {
option(req)
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/scanner/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,26 @@ func TestRequestOptions(t *testing.T) {
Item: &fakeWatchItem{
bytes: []byte("fake"),
},
StartHeight: 0,
StartHeight: 0,
IsPersistent: false,
},
},
{
name: "WithStartBlock",
option: scanner.WithStartBlock(666),
expected: scanner.ScanRequest{
Item: initialWatchItem,
StartHeight: 666,
Item: initialWatchItem,
StartHeight: 666,
IsPersistent: false,
},
},
{
name: "WithPersistentWatch",
option: scanner.WithPersistentWatch(),
expected: scanner.ScanRequest{
Item: initialWatchItem,
StartHeight: 0,
IsPersistent: true,
},
},
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ type Report struct {
// BlockHash is the block hash of the block that includes the transaction.
BlockHash *chainhash.Hash
BlockHeight uint32

// the request resolved by the report
Request *ScanRequest
}

type ScannerService interface {
// Start runs a go-routine in order to handle incoming requests via Watch
Start() (<-chan Report, error)
// Stop the scanner
Stop()
// Add a new request to the queue
Watch(...ScanRequestOption)
}

Expand Down Expand Up @@ -74,6 +78,7 @@ func (s *scannerService) Start() (<-chan Report, error) {
func (s *scannerService) Stop() {
s.quitCh <- struct{}{}
s.started = false
s.requestsQueue = newScanRequestQueue()
}

func (s *scannerService) Watch(opts ...ScanRequestOption) {
Expand Down Expand Up @@ -122,7 +127,7 @@ func (s *scannerService) requestsManager(ch chan<- Report) {
// will check if any blocks has the requested item
// if yes, will extract the transaction that match the item
// TODO handle properly errors (enqueue the unresolved requests ??)
func (s *scannerService) requestWorker(startHeight uint32, ch chan<- Report) error {
func (s *scannerService) requestWorker(startHeight uint32, reportsChan chan<- Report) error {
nextBatch := make([]*ScanRequest, 0)
nextHeight := startHeight

Expand Down Expand Up @@ -163,9 +168,14 @@ func (s *scannerService) requestWorker(startHeight uint32, ch chan<- Report) err
return err
}

// if some requests generate a report, send them to the channel
for _, report := range reports {
ch <- report
// send the report to the output channel
reportsChan <- report

// if the request is persistent, the scanner will keep watching the item at the next block height
if report.Request.IsPersistent {
s.Watch(WithStartBlock(report.BlockHeight+1), WithWatchItem(report.Request.Item), WithPersistentWatch())
}
}

// if some requests remain, put them back in the next batch
Expand Down Expand Up @@ -242,6 +252,7 @@ func (s *scannerService) extractBlockMatches(blockHash *chainhash.Hash, requests
Transaction: tx,
BlockHash: blockHash,
BlockHeight: block.Header.Height,
Request: req,
})
}
}
Expand Down
133 changes: 133 additions & 0 deletions pkg/scanner/scanner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package scanner_test

import (
"testing"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/tdex-network/tdex-daemon/pkg/explorer/esplora"
"github.com/vulpemventures/neutrino-elements/pkg/blockservice"
"github.com/vulpemventures/neutrino-elements/pkg/node"
"github.com/vulpemventures/neutrino-elements/pkg/protocol"
"github.com/vulpemventures/neutrino-elements/pkg/repository/inmemory"
"github.com/vulpemventures/neutrino-elements/pkg/scanner"
)

var repoFilter = inmemory.NewFilterInmemory()
var repoHeader = inmemory.NewHeaderInmemory()

func makeNigiriTestServices() (node.NodeService, scanner.ScannerService, <-chan scanner.Report) {
n, err := node.New(node.NodeConfig{
Network: "nigiri",
UserAgent: "neutrino-elements:test",
FiltersDB: repoFilter,
BlockHeadersDB: repoHeader,
})

if err != nil {
panic(err)
}

err = n.Start("localhost:18886")
if err != nil {
panic(err)
}

time.Sleep(time.Second * 3) // wait for the node sync the first headers if the repo is empty

blockSvc := blockservice.NewEsploraBlockService("http://localhost:3001")
genesisBlockHash := protocol.GetCheckpoints(protocol.MagicNigiri)[0]
h, err := chainhash.NewHashFromStr(genesisBlockHash)
if err != nil {
panic(err)
}
s := scanner.New(repoFilter, repoHeader, blockSvc, h)

reportCh, err := s.Start()
if err != nil {
panic(err)
}

return n, s, reportCh
}

func faucet(addr string) (string, error) {
svc, err := esplora.NewService("http://127.0.0.1:3001", 5000)
if err != nil {
return "", err
}

return svc.Faucet(addr, 1, "5ac9f65c0efcc4775e0baec4ec03abdde22473cd3cf33c0419ca290e0751b225")
}

func TestWatch(t *testing.T) {
const address = "el1qq0mjw2fwsc20vr4q2ypq9w7dslg6436zaahl083qehyghv7td3wnaawhrpxphtjlh4xjwm6mu29tp9uczkl8cxfyatqc3vgms"

n, s, reportCh := makeNigiriTestServices()
defer s.Stop()
defer n.Stop()

watchItem, err := scanner.NewScriptWatchItemFromAddress(address)
if err != nil {
t.Fatal(err)
}

tip, err := n.GetChainTip()
if err != nil {
t.Fatal(err)
}

s.Watch(scanner.WithStartBlock(tip.Height+1), scanner.WithWatchItem(watchItem))
txid, err := faucet(address)
if err != nil {
t.Fatal(err)
}

nextReport := <-reportCh

if nextReport.Transaction.TxHash().String() != txid {
t.Fatalf("expected txid %s, got %s", txid, nextReport.Transaction.TxHash().String())
}
}

func TestWatchPersistent(t *testing.T) {
const address = "el1qqfs4ecf5427tyshnsq0x3jy3ad2tqfn03x3fqmxtyn2ycuvmk98urxmh9cdmr5zcqfs42l6a3kpyrk6pkxjx7yuvqsnuuckhp"

n, s, reportCh := makeNigiriTestServices()
defer s.Stop()
defer n.Stop()

watchItem, err := scanner.NewScriptWatchItemFromAddress(address)
if err != nil {
t.Fatal(err)
}

tip, err := n.GetChainTip()
if err != nil {
t.Fatal(err)
}

s.Watch(scanner.WithStartBlock(tip.Height+1), scanner.WithWatchItem(watchItem), scanner.WithPersistentWatch())
txid, err := faucet(address)
if err != nil {
t.Fatal(err)
}

nextReport := <-reportCh

if nextReport.Transaction.TxHash().String() != txid {
t.Fatalf("expected txid %s, got %s", txid, nextReport.Transaction.TxHash().String())
}

// we test if the watch is persistent by sending a new transaction
txid, err = faucet(address)
if err != nil {
t.Fatal(err)
}

nextReport = <-reportCh

if nextReport.Transaction.TxHash().String() != txid {
t.Fatalf("expected txid %s, got %s", txid, nextReport.Transaction.TxHash().String())
}
}

0 comments on commit 1c4a6b5

Please sign in to comment.