Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 147 additions & 32 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,11 +1363,7 @@ func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
// the pending funds in a channel that has been forcibly closed have been
// swept.
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
var (
openChannels []*OpenChannel
pruneLinkNode *btcec.PublicKey
)
err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
var b bytes.Buffer
if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
return err
Expand Down Expand Up @@ -1413,44 +1409,72 @@ func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// other open channels with this peer. If we don't we'll
// garbage collect it to ensure we don't establish persistent
// connections to peers without open channels.
pruneLinkNode = chanSummary.RemotePub
openChannels, err = c.fetchOpenChannels(
tx, pruneLinkNode,
)
remotePub := chanSummary.RemotePub
openChannels, err := c.fetchOpenChannels(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to fetch open channels for "+
"peer %x: %v",
pruneLinkNode.SerializeCompressed(), err)
remotePub.SerializeCompressed(), err)
}

return nil
}, func() {
openChannels = nil
pruneLinkNode = nil
})
if err != nil {
return err
}
if len(openChannels) > 0 {
return nil
}

// If there are no open channels with this peer, prune the
// link node. We do this within the same transaction to avoid
// a race condition where a new channel could be opened
// between this check and the deletion.
log.Infof("Pruning link node %x with zero open "+
"channels from database",
remotePub.SerializeCompressed())

err = deleteLinkNode(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to delete link "+
"node: %w", err)
}

// Decide whether we want to remove the link node, based upon the number
// of still open channels.
return c.pruneLinkNode(openChannels, pruneLinkNode)
return nil
}, func() {})
}

// pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op.
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
remotePub *btcec.PublicKey) error {
// the database due to no longer having any open channels with it.
//
// NOTE: This function should be called after an initial check shows no open
// channels exist. It will double-check within a write transaction to avoid a
// race condition where a channel could be opened between the initial check
// and the deletion.
func (c *ChannelStateDB) pruneLinkNode(remotePub *btcec.PublicKey) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
// Double-check for open channels to avoid deleting a link node
// if a channel was opened since the caller's initial check.
//
// NOTE: This avoids a race condition where a channel could be
// opened between the initial check and the deletion.
openChannels, err := c.fetchOpenChannels(tx, remotePub)
if err != nil {
return err
}

if len(openChannels) > 0 {
return nil
}
// If channels exist now, don't prune.
if len(openChannels) > 0 {
return nil
}

log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed())
// No open channels, safe to prune the link node.
log.Infof("Pruning link node %x with zero open channels "+
"from database",
remotePub.SerializeCompressed())

return c.linkNodeDB.DeleteLinkNode(remotePub)
err = deleteLinkNode(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to prune link node: %w", err)
}

return nil
}, func() {})
}

// PruneLinkNodes attempts to prune all link nodes found within the database
Expand Down Expand Up @@ -1479,12 +1503,103 @@ func (c *ChannelStateDB) PruneLinkNodes() error {
return err
}

err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
if len(openChannels) > 0 {
continue
}

err = c.pruneLinkNode(linkNode.IdentityPub)
if err != nil {
return err
}
}

return nil
}

// RepairLinkNodes scans all channels in the database and ensures that a
// link node exists for each remote peer. This should be called on startup to
// ensure that our database is consistent.
//
// NOTE: This function is designed to repair database inconsistencies that may
// have occurred due to the race condition in link node pruning (where link
// nodes could be incorrectly deleted while channels still existed). This can
// be removed once we move to native sql.
func (c *ChannelStateDB) RepairLinkNodes(network wire.BitcoinNet) error {
// In a single read transaction, build a list of all peers with open
// channels and check which ones are missing link nodes.
var missingPeers []*btcec.PublicKey

err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
openChanBucket := tx.ReadBucket(openChannelBucket)
if openChanBucket == nil {
return ErrNoActiveChannels
}

var peersWithChannels []*btcec.PublicKey

err := openChanBucket.ForEach(func(nodePubBytes,
_ []byte) error {

nodePub, err := btcec.ParsePubKey(nodePubBytes)
if err != nil {
return err
}

channels, err := c.fetchOpenChannels(tx, nodePub)
if err != nil {
return err
}

if len(channels) > 0 {
peersWithChannels = append(
peersWithChannels, nodePub,
)
}

return nil
})
if err != nil {
return err
}

// Now check which peers are missing link nodes within the
// same transaction.
missingPeers, err = c.linkNodeDB.FindMissingLinkNodes(
tx, peersWithChannels,
)

return err
}, func() {
missingPeers = nil
})
if err != nil && !errors.Is(err, ErrNoActiveChannels) {
return fmt.Errorf("unable to fetch channels: %w", err)
}

// Early exit if no repairs needed.
if len(missingPeers) == 0 {
return nil
}

// Create all missing link nodes in a single write transaction
// using the LinkNodeDB abstraction.
linkNodesToCreate := make([]*LinkNode, 0, len(missingPeers))
for _, remotePub := range missingPeers {
linkNode := NewLinkNode(c.linkNodeDB, network, remotePub)
linkNodesToCreate = append(linkNodesToCreate, linkNode)

log.Infof("Repairing missing link node for peer %x",
remotePub.SerializeCompressed())
}

err = c.linkNodeDB.CreateLinkNodes(nil, linkNodesToCreate)
if err != nil {
return err
}

log.Infof("Repaired %d missing link nodes on startup",
len(missingPeers))

return nil
}

Expand Down
91 changes: 91 additions & 0 deletions channeldb/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package channeldb

import (
"bytes"
"errors"
"fmt"
"io"
"net"
"time"
Expand Down Expand Up @@ -134,6 +136,95 @@ type LinkNodeDB struct {
backend kvdb.Backend
}

// FindMissingLinkNodes checks which of the provided public keys do not have
// corresponding link nodes in the database. If tx is nil, a new read
// transaction will be created. Otherwise, the provided transaction is used,
// allowing this to be part of a larger batch operation.
func (l *LinkNodeDB) FindMissingLinkNodes(tx kvdb.RTx,
pubKeys []*btcec.PublicKey) ([]*btcec.PublicKey, error) {

var missing []*btcec.PublicKey

findMissing := func(readTx kvdb.RTx) error {
nodeMetaBucket := readTx.ReadBucket(nodeInfoBucket)
if nodeMetaBucket == nil {
// If the bucket doesn't exist, all peers are missing.
missing = pubKeys
return nil
}

for _, pubKey := range pubKeys {
_, err := fetchLinkNode(readTx, pubKey)
if err == nil {
// Link node exists.
continue
}

if !errors.Is(err, ErrNodeNotFound) {
return fmt.Errorf("unable to check link node "+
"for peer %x: %w",
pubKey.SerializeCompressed(), err)
}

// Link node doesn't exist.
missing = append(missing, pubKey)
}

return nil
}

// If no transaction provided, create our own.
if tx == nil {
err := kvdb.View(l.backend, findMissing, func() {
missing = nil
})

return missing, err
}

// Use the provided transaction.
err := findMissing(tx)

return missing, err
}

// CreateLinkNodes creates multiple link nodes. If tx is nil, a new write
// transaction will be created. Otherwise, the provided transaction is used,
// allowing this to be part of a larger batch operation.
func (l *LinkNodeDB) CreateLinkNodes(tx kvdb.RwTx,
linkNodes []*LinkNode) error {

createNodes := func(writeTx kvdb.RwTx) error {
nodeMetaBucket, err := writeTx.CreateTopLevelBucket(
nodeInfoBucket,
)
if err != nil {
return err
}

for _, linkNode := range linkNodes {
err := putLinkNode(nodeMetaBucket, linkNode)
if err != nil {
pubKey := linkNode.IdentityPub.
SerializeCompressed()

return fmt.Errorf("unable to create link "+
"node for peer %x: %w", pubKey, err)
}
}

return nil
}

// If no transaction provided, create our own.
if tx == nil {
return kvdb.Update(l.backend, createNodes, func() {})
}

// Use the provided transaction.
return createNodes(tx)
}

// DeleteLinkNode removes the link node with the given identity from the
// database.
func (l *LinkNodeDB) DeleteLinkNode(identity *btcec.PublicKey) error {
Expand Down
Loading
Loading