Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mempool, netsync, electrum, main, peer: Use utreexotx #214

Merged
2 changes: 1 addition & 1 deletion electrum/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func handleTransactionBroadcast(s *ElectrumServer, cmd *btcjson.Request, conn ne
}
tx.MsgTx().UData = udata

acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, false, false, 0)
acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, udata, false, false, 0)
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
Expand Down
2 changes: 1 addition & 1 deletion mempool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type TxMempool interface {
// error is nil, the list will include the passed transaction itself
// along with any additional orphan transactions that were added as a
// result of the passed one being accepted.
ProcessTransaction(tx *btcutil.Tx, allowOrphan,
ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan,
rateLimit bool, tag Tag) ([]*TxDesc, error)

// RemoveTransaction removes the passed transaction from the mempool.
Expand Down
39 changes: 32 additions & 7 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,22 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcutil
return txD
}

// addUtreexoData add the passed leaves to the memory pool and caches the proof to the accumulator.
// It should not be called directly as it doesn't perform any validation.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) addUtreexoData(tx *btcutil.Tx, udata *wire.UData) error {
// Ingest the proof. Shouldn't error out with the proof being invalid
// here since we've already verified it above.
err := mp.cfg.VerifyUData(udata, tx.MsgTx().TxIn, true)
if err != nil {
return fmt.Errorf("error while ingesting proof. %v", err)
}
mp.poolLeaves[*tx.Hash()] = udata.LeafDatas

return nil
}

// checkPoolDoubleSpend checks whether or not the passed transaction is
// attempting to spend coins already spent by other transactions in the pool.
// If it does, we'll check whether each of those transactions are signaling for
Expand Down Expand Up @@ -1003,14 +1019,14 @@ func (mp *TxPool) validateReplacement(tx *btcutil.Tx,
// more details.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit,
func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData, isNew, rateLimit,
rejectDupOrphans bool) ([]*chainhash.Hash, *TxDesc, error) {

txHash := tx.Hash()

// Check for mempool acceptance.
r, err := mp.checkMempoolAcceptance(
tx, nil, isNew, rateLimit, rejectDupOrphans,
tx, utreexoData, isNew, rateLimit, rejectDupOrphans,
)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -1038,6 +1054,13 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit,
// it for the ingestion.
mp.removeTransaction(conflict, false)
}
if utreexoData != nil {
err = mp.addUtreexoData(tx, utreexoData)
if err != nil {
return nil, nil, err
}
}

txD := mp.addTransaction(r.utxoView, tx, r.bestHeight, int64(r.TxFee))

log.Debugf("Accepted transaction %v (pool size: %v)", txHash,
Expand All @@ -1057,10 +1080,12 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit,
// be added to the orphan pool.
//
// This function is safe for concurrent access.
func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) {
func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData,
isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) {

// Protect concurrent access.
mp.mtx.Lock()
hashes, txD, err := mp.maybeAcceptTransaction(tx, isNew, rateLimit, true)
hashes, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, isNew, rateLimit, true)
mp.mtx.Unlock()

return hashes, txD, err
Expand Down Expand Up @@ -1103,7 +1128,7 @@ func (mp *TxPool) processOrphans(acceptedTx *btcutil.Tx) []*TxDesc {
// Potentially accept an orphan into the tx pool.
for _, tx := range orphans {
missing, txD, err := mp.maybeAcceptTransaction(
tx, true, true, false)
tx, nil, true, true, false)
if err != nil {
// The orphan is now invalid, so there
// is no way any other orphans which
Expand Down Expand Up @@ -1178,15 +1203,15 @@ func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc {
// the passed one being accepted.
//
// This function is safe for concurrent access.
func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
log.Tracef("Processing transaction %v", tx.Hash())

// Protect concurrent access.
mp.mtx.Lock()
defer mp.mtx.Unlock()

// Potentially accept the transaction to the memory pool.
missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit,
missingParents, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, true, rateLimit,
true)
if err != nil {
return nil, err
Expand Down
24 changes: 12 additions & 12 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (ctx *testContext) addSignedTx(inputs []spendableOutput,
ctx.harness.chain.SetMedianTimePast(time.Now())
} else {
acceptedTxns, err := ctx.harness.txPool.ProcessTransaction(
tx, true, false, 0,
tx, nil, true, false, 0,
)
if err != nil {
ctx.t.Fatalf("unable to process transaction: %v", err)
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestSimpleOrphanChain(t *testing.T) {
// Ensure the orphans are accepted (only up to the maximum allowed so
// none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand All @@ -498,7 +498,7 @@ func TestSimpleOrphanChain(t *testing.T) {
// all get accepted. Notice the accept orphans flag is also false here
// to ensure it has no bearing on whether or not already existing
// orphans in the pool are linked.
acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0],
acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0], nil,
false, false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -537,7 +537,7 @@ func TestOrphanReject(t *testing.T) {

// Ensure orphans are rejected when the allow orphans flag is not set.
for _, tx := range chainedTxns[1:] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, false,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, false,
false, 0)
if err == nil {
t.Fatalf("ProcessTransaction: did not fail on orphan "+
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestOrphanEviction(t *testing.T) {
// Add enough orphans to exceed the max allowed while ensuring they are
// all accepted. This will cause an eviction.
for _, tx := range chainedTxns[1:] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -658,7 +658,7 @@ func TestBasicOrphanRemoval(t *testing.T) {
// Ensure the orphans are accepted (only up to the maximum allowed so
// none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -733,7 +733,7 @@ func TestOrphanChainRemoval(t *testing.T) {
// Ensure the orphans are accepted (only up to the maximum allowed so
// none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -796,7 +796,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
// Start by adding the orphan transactions from the generated chain
// except the final one.
for _, tx := range chainedTxns[1:maxOrphans] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand All @@ -822,7 +822,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
if err != nil {
t.Fatalf("unable to create signed tx: %v", err)
}
acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx,
acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx, nil,
true, false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid orphan %v",
Expand All @@ -841,7 +841,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
//
// This will cause the shared output to become a concrete spend which
// will in turn must cause the double spending orphan to be removed.
acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0],
acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0], nil,
false, false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid tx %v", err)
Expand Down Expand Up @@ -889,7 +889,7 @@ func TestCheckSpend(t *testing.T) {
t.Fatalf("unable to create transaction chain: %v", err)
}
for _, tx := range chainedTxns {
_, err := harness.txPool.ProcessTransaction(tx, true,
_, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept "+
Expand Down Expand Up @@ -1817,7 +1817,7 @@ func TestRBF(t *testing.T) {
// it's not a valid one, we should see the error
// expected by the test.
_, err = ctx.harness.txPool.ProcessTransaction(
replacementTx, false, false, 0,
replacementTx, nil, false, false, 0,
)
if testCase.err == "" && err != nil {
ctx.t.Fatalf("expected no error when "+
Expand Down
2 changes: 1 addition & 1 deletion mempool/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *MockTxMempool) HaveTransaction(hash *chainhash.Hash) bool {
// free-standing transactions into the memory pool. It includes functionality
// such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool.
func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, allowOrphan,
func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan,
rateLimit bool, tag Tag) ([]*TxDesc, error) {

args := m.Called(tx, allowOrphan, rateLimit, tag)
Expand Down
44 changes: 34 additions & 10 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ type txMsg struct {
reply chan struct{}
}

// utreexoTxMsg packages a bitcoin utreexo tx message and the peer it came from together
// so the block handler has access to that information.
type utreexoTxMsg struct {
utreexoTx *btcutil.UtreexoTx
peer *peerpkg.Peer
reply chan struct{}
}

// getSyncPeerMsg is a message type to be sent across the message channel for
// retrieving the current sync peer.
type getSyncPeerMsg struct {
Expand Down Expand Up @@ -591,8 +599,7 @@ func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) {
}

// handleTxMsg handles transaction messages from all peers.
func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
peer := tmsg.peer
func (sm *SyncManager) handleTxMsg(tx *btcutil.Tx, peer *peerpkg.Peer, utreexoData *wire.UData) {
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received tx message from unknown peer %s", peer)
Expand All @@ -607,7 +614,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
// spec to proliferate. While this is not ideal, there is no check here
// to disconnect peers for sending unsolicited transactions to provide
// interoperability.
txHash := tmsg.tx.Hash()
txHash := tx.Hash()

// Ignore transactions that we have already rejected. Do not
// send a reject message here because if the transaction was already
Expand All @@ -620,7 +627,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {

// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
acceptedTxs, err := sm.txMemPool.ProcessTransaction(tx, utreexoData,
true, true, mempool.Tag(peer.ID()))

// Remove transaction from request maps. Either the mempool/chain
Expand Down Expand Up @@ -1380,9 +1387,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// Add it to the request queue.
state.requestQueue = append(state.requestQueue, iv)

// If the inv is for a utreexo tx, then also pop off the utreexo
// proof hash invs and add it to the request queue.
if peer.IsUtreexoEnabled() {
if sm.chain.IsUtreexoViewActive() {
switch iv.Type {
case wire.InvTypeTx:
case wire.InvTypeWitnessTx:
Expand Down Expand Up @@ -1549,7 +1554,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}

// Add in the utreexo flag then add the tx inv.
iv.Type |= wire.InvUtreexoFlag
iv.Type = wire.InvTypeUtreexoTx
gdmsg.AddInvVect(iv)
numRequested++

Expand Down Expand Up @@ -1597,7 +1602,11 @@ out:
sm.handleNewPeerMsg(msg.peer)

case *txMsg:
sm.handleTxMsg(msg)
sm.handleTxMsg(msg.tx, msg.peer, nil)
msg.reply <- struct{}{}

case *utreexoTxMsg:
sm.handleTxMsg(&msg.utreexoTx.Tx, msg.peer, &msg.utreexoTx.MsgUtreexoTx().UData)
msg.reply <- struct{}{}

case *blockMsg:
Expand Down Expand Up @@ -1749,8 +1758,10 @@ func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Not

// Reinsert all of the transactions (except the coinbase) into
// the transaction pool.
//
// TODO handle txs here for utreexo nodes.
for _, tx := range block.Transactions()[1:] {
_, _, err := sm.txMemPool.MaybeAcceptTransaction(tx,
_, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, nil,
false, false)
if err != nil {
// Remove the transaction and all transactions
Expand Down Expand Up @@ -1789,6 +1800,19 @@ func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan str
sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
}

// QueueUtreexoTx adds the passed transaction message and peer to the block handling
// queue. Responds to the done channel argument after the utreexo tx message is
// processed.
func (sm *SyncManager) QueueUtreexoTx(tx *btcutil.UtreexoTx, peer *peerpkg.Peer, done chan struct{}) {
// Don't accept more transactions if we're shutting down.
if atomic.LoadInt32(&sm.shutdown) != 0 {
done <- struct{}{}
return
}

sm.msgChan <- &utreexoTxMsg{utreexoTx: tx, peer: peer, reply: done}
}

// QueueBlock adds the passed block message and peer to the block handling
// queue. Responds to the done channel argument after the block message is
// processed.
Expand Down
12 changes: 12 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ type MessageListeners struct {
// OnTx is invoked when a peer receives a tx bitcoin message.
OnTx func(p *Peer, msg *wire.MsgTx)

// OnUtreexoTx is invoked when a peer receives a utreexo tx bitcoin message.
OnUtreexoTx func(p *Peer, msg *wire.MsgUtreexoTx)

// OnBlock is invoked when a peer receives a block bitcoin message.
OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte)

Expand Down Expand Up @@ -1175,6 +1178,7 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st
pendingResponses[wire.CmdBlock] = deadline
pendingResponses[wire.CmdMerkleBlock] = deadline
pendingResponses[wire.CmdTx] = deadline
pendingResponses[wire.CmdUtreexoTx] = deadline
pendingResponses[wire.CmdNotFound] = deadline

case wire.CmdGetHeaders:
Expand Down Expand Up @@ -1234,10 +1238,13 @@ out:
fallthrough
case wire.CmdTx:
fallthrough
case wire.CmdUtreexoTx:
fallthrough
case wire.CmdNotFound:
delete(pendingResponses, wire.CmdBlock)
delete(pendingResponses, wire.CmdMerkleBlock)
delete(pendingResponses, wire.CmdTx)
delete(pendingResponses, wire.CmdUtreexoTx)
delete(pendingResponses, wire.CmdNotFound)

default:
Expand Down Expand Up @@ -1439,6 +1446,11 @@ out:
p.cfg.Listeners.OnTx(p, msg)
}

case *wire.MsgUtreexoTx:
if p.cfg.Listeners.OnUtreexoTx != nil {
p.cfg.Listeners.OnUtreexoTx(p, msg)
}

case *wire.MsgBlock:
if p.cfg.Listeners.OnBlock != nil {
p.cfg.Listeners.OnBlock(p, msg, buf)
Expand Down
2 changes: 1 addition & 1 deletion rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4273,7 +4273,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan
// rpcProcessTx checks that the tx is accepted into the mempool and relays it to peers
// and other processes.
func (s *rpcServer) rpcProcessTx(tx *btcutil.Tx, allowOrphan, rateLimit bool) error {
acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, allowOrphan, rateLimit, 0)
acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, nil, allowOrphan, rateLimit, 0)
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
Expand Down
Loading
Loading