diff --git a/cln/cln_client.go b/cln/cln_client.go index dc8d7e89..ccd3e141 100644 --- a/cln/cln_client.go +++ b/cln/cln_client.go @@ -198,7 +198,7 @@ func (c *ClnClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh pubkey := hex.EncodeToString(peerID) channels, err := client.GetPeerChannels(pubkey) if err != nil { - log.Printf("CLN: client.GetPeer(%s) error: %v", pubkey, err) + log.Printf("CLN: client.GetPeerChannels(%s) error: %v", pubkey, err) return nil, err } @@ -206,20 +206,15 @@ func (c *ClnClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh for _, c := range channels { log.Printf("getChannel destination: %s, Short channel id: %v, local alias: %v , FundingTxID:%v, State:%v ", pubkey, c.ShortChannelId, c.Alias.Local, c.FundingTxId, c.State) if slices.Contains(OPEN_STATUSES, c.State) && c.FundingTxId == fundingTxID { - confirmedChanID, err := lightning.NewShortChannelIDFromString(c.ShortChannelId) - if err != nil { - fmt.Printf("NewShortChannelIDFromString %v error: %v", c.ShortChannelId, err) - return nil, err - } - initialChanID, err := lightning.NewShortChannelIDFromString(c.Alias.Local) + + aliasScid, confirmedScid, err := mapScidsFromChannel(c) if err != nil { - fmt.Printf("NewShortChannelIDFromString %v error: %v", c.Alias.Local, err) return nil, err } return &lightning.GetChannelResult{ - InitialChannelID: *initialChanID, - ConfirmedChannelID: *confirmedChanID, - HtlcMinimumMsat: c.MinimumHtlcOutMsat.MSat(), + AliasScid: aliasScid, + ConfirmedScid: confirmedScid, + HtlcMinimumMsat: c.MinimumHtlcOutMsat.MSat(), }, nil } } @@ -324,3 +319,62 @@ func (c *ClnClient) WaitOnline(peerID []byte, deadline time.Time) error { func (c *ClnClient) WaitChannelActive(peerID []byte, deadline time.Time) error { return nil } + +func (c *ClnClient) ListChannels() ([]*lightning.Channel, error) { + channels, err := c.client.ListPeerChannels() + if err != nil { + return nil, err + } + + result := make([]*lightning.Channel, len(channels)) + for i, channel := range channels { + peerId, err := hex.DecodeString(channel.PeerId) + if err != nil { + log.Printf("cln.ListChannels returned channel without peer id: %+v", channel) + continue + } + aliasScid, confirmedScid, err := mapScidsFromChannel(channel) + if err != nil { + return nil, err + } + + var outpoint *wire.OutPoint + fundingTxId, err := hex.DecodeString(channel.FundingTxId) + if err == nil && fundingTxId != nil && len(fundingTxId) > 0 { + outpoint, _ = lightning.NewOutPoint(fundingTxId, channel.FundingOutnum) + } + if outpoint == nil { + log.Printf("cln.ListChannels returned channel without outpoint: %+v", channel) + continue + } + result[i] = &lightning.Channel{ + AliasScid: aliasScid, + ConfirmedScid: confirmedScid, + ChannelPoint: outpoint, + PeerId: peerId, + } + } + + return result, nil +} + +func mapScidsFromChannel(c *glightning.PeerChannel) (*lightning.ShortChannelID, *lightning.ShortChannelID, error) { + var confirmedScid *lightning.ShortChannelID + var aliasScid *lightning.ShortChannelID + var err error + if c.ShortChannelId != "" { + confirmedScid, err = lightning.NewShortChannelIDFromString(c.ShortChannelId) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse scid '%s': %w", c.ShortChannelId, err) + } + } + + if c.Alias != nil && c.Alias.Local != "" { + aliasScid, err = lightning.NewShortChannelIDFromString(c.Alias.Local) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse scid '%s': %w", c.Alias.Local, err) + } + } + + return aliasScid, confirmedScid, nil +} diff --git a/cln/forwards_sync.go b/cln/forwards_sync.go new file mode 100644 index 00000000..58361577 --- /dev/null +++ b/cln/forwards_sync.go @@ -0,0 +1,201 @@ +package cln + +import ( + "context" + "fmt" + "log" + "math" + "strconv" + "time" + + "github.com/breez/lspd/history" + "github.com/breez/lspd/lightning" + "github.com/elementsproject/glightning/glightning" +) + +type ForwardSync struct { + createdOffset uint64 + updatedOffset uint64 + nodeid []byte + client *ClnClient + store history.Store +} + +func NewForwardSync(nodeid []byte, client *ClnClient, store history.Store) *ForwardSync { + return &ForwardSync{ + nodeid: nodeid, + client: client, + store: store, + } +} + +var forwardSyncInterval time.Duration = time.Minute * 5 + +func (s *ForwardSync) ForwardsSynchronize(ctx context.Context) { + s.createdOffset, s.updatedOffset, _ = s.store.FetchClnForwardOffsets(ctx, s.nodeid) + s.forwardsSynchronizeOnce(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(forwardSyncInterval): + } + + s.forwardsSynchronizeOnce(ctx) + } +} + +func (s *ForwardSync) forwardsSynchronizeOnce(ctx context.Context) { + s.forwardCreatedSynchronizeOnce(ctx) + s.forwardUpdatedSynchronizeOnce(ctx) +} + +func (s *ForwardSync) forwardCreatedSynchronizeOnce(ctx context.Context) { + log.Printf("forwardCreatedSynchronizeOnce(%x) - Begin", s.nodeid) + var limit uint32 = 10000 + endReached := false + round := 0 + for !endReached { + log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.createdOffset) + var forwards []*history.Forward + var newCreatedOffset uint64 + var err error + forwards, newCreatedOffset, endReached, err = s.listForwards("created", s.createdOffset, limit) + if err != nil { + log.Printf("forwardCreatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err) + return + } + + log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.createdOffset, len(forwards)) + if len(forwards) == 0 { + break + } + + err = s.store.InsertForwards(ctx, forwards, s.nodeid) + if err != nil { + log.Printf("forwardCreatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err) + return + } + + err = s.store.SetClnForwardOffsets(ctx, s.nodeid, newCreatedOffset, s.updatedOffset) + if err != nil { + log.Printf("forwardCreatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err) + return + } + + s.createdOffset = newCreatedOffset + } + + log.Printf("forwardCreatedSynchronizeOnce(%x) - Done", s.nodeid) +} + +func (s *ForwardSync) forwardUpdatedSynchronizeOnce(ctx context.Context) { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - Begin", s.nodeid) + var limit uint32 = 10000 + endReached := false + round := 0 + for !endReached { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.updatedOffset) + var forwards []*history.Forward + var newUpdatedOffset uint64 + var err error + forwards, newUpdatedOffset, endReached, err = s.listForwards("updated", s.updatedOffset, limit) + if err != nil { + log.Printf("forwardUpdatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err) + return + } + + log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.updatedOffset, len(forwards)) + if len(forwards) == 0 { + break + } + + err = s.store.UpdateForwards(ctx, forwards, s.nodeid) + if err != nil { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err) + return + } + + err = s.store.SetClnForwardOffsets(ctx, s.nodeid, s.createdOffset, newUpdatedOffset) + if err != nil { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err) + return + } + + s.updatedOffset = newUpdatedOffset + } + + log.Printf("forwardUpdatedSynchronizeOnce(%x) - Done", s.nodeid) +} + +func (s *ForwardSync) listForwards(index string, offset uint64, limit uint32) ([]*history.Forward, uint64, bool, error) { + var response struct { + Forwards []Forward `json:"forwards"` + } + + client, err := s.client.getClient() + if err != nil { + return nil, 0, false, err + } + + err = client.Request(&glightning.ListForwardsRequest{ + Status: "settled", + Index: index, + Start: offset, + Limit: limit * 2, + }, &response) + if err != nil { + return nil, 0, false, err + } + + var result []*history.Forward + endReached := len(response.Forwards) < int(limit) + var lastIndex *uint64 + for _, forward := range response.Forwards { + in, err := lightning.NewShortChannelIDFromString(forward.InChannel) + if err != nil { + return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.InChannel, err) + } + out, err := lightning.NewShortChannelIDFromString(forward.OutChannel) + if err != nil { + return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.OutChannel, err) + } + + sec, dec := math.Modf(forward.ResolvedTime) + result = append(result, &history.Forward{ + Identifier: strconv.FormatUint(forward.CreatedIndex, 10), + InChannel: *in, + OutChannel: *out, + InMsat: forward.InMsat.MSat(), + OutMsat: forward.OutMsat.MSat(), + ResolvedTime: time.Unix(int64(sec), int64(dec*(1e9))), + }) + if index == "created" { + lastIndex = &forward.CreatedIndex + } else { + lastIndex = &forward.UpdatedIndex + } + } + + var newOffset uint64 + if lastIndex == nil { + newOffset = 0 + } else { + newOffset = *lastIndex + 1 + } + return result, newOffset, endReached, nil +} + +type Forward struct { + CreatedIndex uint64 `json:"created_index"` + UpdatedIndex uint64 `json:"updated_index"` + InChannel string `json:"in_channel"` + OutChannel string `json:"out_channel"` + InMsat glightning.Amount `json:"in_msat"` + OutMsat glightning.Amount `json:"out_msat"` + Status string `json:"status"` + PaymentHash string `json:"payment_hash"` + ReceivedTime float64 `json:"received_time"` + ResolvedTime float64 `json:"resolved_time"` +} diff --git a/common/nodes_service.go b/common/nodes_service.go index e983c3c0..df78b15e 100644 --- a/common/nodes_service.go +++ b/common/nodes_service.go @@ -12,6 +12,7 @@ import ( ) type Node struct { + NodeId []byte Client lightning.Client NodeConfig *config.NodeConfig PrivateKey *btcec.PrivateKey diff --git a/history/channel_sync.go b/history/channel_sync.go new file mode 100644 index 00000000..c4b58ffd --- /dev/null +++ b/history/channel_sync.go @@ -0,0 +1,82 @@ +package history + +import ( + "context" + "log" + "sync" + "time" + + "github.com/breez/lspd/common" +) + +type ChannelSync struct { + nodes []*common.Node + store Store +} + +func NewChannelSync(nodes []*common.Node, store Store) *ChannelSync { + return &ChannelSync{ + nodes: nodes, + store: store, + } +} + +var channelSyncInterval time.Duration = time.Minute * 5 + +func (s *ChannelSync) ChannelsSynchronize(ctx context.Context) { + s.channelsSynchronizeOnce(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(channelSyncInterval): + } + + s.channelsSynchronizeOnce(ctx) + } +} + +func (s *ChannelSync) channelsSynchronizeOnce(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(len(s.nodes)) + for _, n := range s.nodes { + go func(node *common.Node) { + s.channelsSynchronizeNodeOnce(ctx, node) + wg.Done() + }(n) + } + wg.Wait() +} + +func (s *ChannelSync) channelsSynchronizeNodeOnce(ctx context.Context, node *common.Node) { + lastUpdate := time.Now() + log.Printf("ChannelsSynchronizeNodeOnce(%x) - Begin %v", node.NodeId, lastUpdate) + channels, err := node.Client.ListChannels() + if err != nil { + log.Printf("ChannelsSynchronizeNodeOnce(%x)- ListChannels error: %v", node.NodeId, err) + return + } + + updates := make([]*ChannelUpdate, len(channels)) + for i, c := range channels { + if c == nil { + continue + } + updates[i] = &ChannelUpdate{ + NodeID: node.NodeId, + PeerId: c.PeerId, + AliasScid: c.AliasScid, + ConfirmedScid: c.ConfirmedScid, + ChannelPoint: c.ChannelPoint, + LastUpdate: lastUpdate, + } + } + err = s.store.UpdateChannels(ctx, updates) + if err != nil { + log.Printf("ChannelsSynchronizeNodeOnce(%x) - store.UpdateChannels error: %v", node.NodeId, err) + return + } + + log.Printf("ChannelsSynchronizeNodeOnce(%x) - Done %v", node.NodeId, lastUpdate) +} diff --git a/history/store.go b/history/store.go new file mode 100644 index 00000000..2238e629 --- /dev/null +++ b/history/store.go @@ -0,0 +1,36 @@ +package history + +import ( + "context" + "time" + + "github.com/breez/lspd/lightning" + "github.com/btcsuite/btcd/wire" +) + +type ChannelUpdate struct { + NodeID []byte + PeerId []byte + AliasScid *lightning.ShortChannelID + ConfirmedScid *lightning.ShortChannelID + ChannelPoint *wire.OutPoint + LastUpdate time.Time +} + +type Forward struct { + Identifier string + InChannel lightning.ShortChannelID + OutChannel lightning.ShortChannelID + InMsat uint64 + OutMsat uint64 + ResolvedTime time.Time +} + +type Store interface { + UpdateChannels(ctx context.Context, updates []*ChannelUpdate) error + InsertForwards(ctx context.Context, forwards []*Forward, nodeId []byte) error + UpdateForwards(ctx context.Context, forwards []*Forward, nodeId []byte) error + FetchClnForwardOffsets(ctx context.Context, nodeId []byte) (uint64, uint64, error) + SetClnForwardOffsets(ctx context.Context, nodeId []byte, created uint64, updated uint64) error + FetchLndForwardOffset(ctx context.Context, nodeId []byte) (*time.Time, error) +} diff --git a/interceptor/intercept_handler.go b/interceptor/intercept_handler.go index dfea2e9c..c1fe639f 100644 --- a/interceptor/intercept_handler.go +++ b/interceptor/intercept_handler.go @@ -11,6 +11,7 @@ import ( "github.com/breez/lspd/chain" "github.com/breez/lspd/common" "github.com/breez/lspd/config" + "github.com/breez/lspd/history" "github.com/breez/lspd/lightning" "github.com/breez/lspd/lsps0" "github.com/breez/lspd/notifications" @@ -20,9 +21,11 @@ import ( ) type Interceptor struct { + nodeid []byte client lightning.Client config *config.NodeConfig store InterceptStore + historyStore history.Store openingService common.OpeningService feeEstimator chain.FeeEstimator feeStrategy chain.FeeStrategy @@ -34,15 +37,19 @@ func NewInterceptHandler( client lightning.Client, config *config.NodeConfig, store InterceptStore, + historyStore history.Store, openingService common.OpeningService, feeEstimator chain.FeeEstimator, feeStrategy chain.FeeStrategy, notificationService *notifications.NotificationService, ) *Interceptor { + nodeid, _ := hex.DecodeString(config.NodePubkey) return &Interceptor{ + nodeid: nodeid, client: client, config: config, store: store, + historyStore: historyStore, openingService: openingService, feeEstimator: feeEstimator, feeStrategy: feeStrategy, @@ -224,27 +231,31 @@ func (i *Interceptor) Intercept(req common.InterceptRequest) common.InterceptRes for { chanResult, _ := i.client.GetChannel(destination, *channelPoint) if chanResult != nil { - log.Printf("paymentHash: %s, channel opened successfully alias: %v, confirmed: %v", reqPaymentHashStr, chanResult.InitialChannelID.ToString(), chanResult.ConfirmedChannelID.ToString()) - - err := i.store.InsertChannel( - uint64(chanResult.InitialChannelID), - uint64(chanResult.ConfirmedChannelID), - channelPoint.String(), - destination, - time.Now(), - ) - - if err != nil { - log.Printf("paymentHash: %s, insertChannel error: %v", reqPaymentHashStr, err) - return common.InterceptResult{ - Action: common.INTERCEPT_FAIL_HTLC_WITH_CODE, - FailureCode: common.FAILURE_TEMPORARY_CHANNEL_FAILURE, - }, nil + log.Printf("paymentHash: %s, channel opened successfully alias: '%v', confirmed: '%v'", reqPaymentHashStr, chanResult.AliasScid.ToString(), chanResult.ConfirmedScid.ToString()) + + var scid *lightning.ShortChannelID + if chanResult.ConfirmedScid == nil { + if chanResult.AliasScid == nil { + log.Printf("Error: GetChannel: Both confirmed scid and alias scid are nil: %+v", chanResult) + <-time.After(1 * time.Second) + continue + } + scid = chanResult.AliasScid + } else { + scid = chanResult.ConfirmedScid } - channelID := chanResult.ConfirmedChannelID - if uint64(channelID) == 0 { - channelID = chanResult.InitialChannelID + err = i.historyStore.UpdateChannels(context.TODO(), []*history.ChannelUpdate{{ + NodeID: i.nodeid, + PeerId: destination, + AliasScid: chanResult.AliasScid, + ConfirmedScid: chanResult.ConfirmedScid, + ChannelPoint: channelPoint, + LastUpdate: time.Now(), + }}) + if err != nil { + // Don't break here, this is not critical. + log.Printf("paymentHash: %s, failed to insert newly opened channel in history store. %v", reqPaymentHashStr, channelPoint.String()) } useLegacyOnionBlob := slices.Contains(i.config.LegacyOnionTokens, token) @@ -252,7 +263,7 @@ func (i *Interceptor) Intercept(req common.InterceptRequest) common.InterceptRes Action: common.INTERCEPT_RESUME_WITH_ONION, Destination: destination, ChannelPoint: channelPoint, - Scid: channelID, + Scid: *scid, PaymentSecret: paymentSecret, AmountMsat: uint64(amt), TotalAmountMsat: uint64(outgoingAmountMsat), @@ -260,7 +271,7 @@ func (i *Interceptor) Intercept(req common.InterceptRequest) common.InterceptRes }, nil } - log.Printf("paymentHash: %s, waiting for channel to get opened.... %v", reqPaymentHashStr, destination) + log.Printf("paymentHash: %s, waiting for channel to get opened... %x", reqPaymentHashStr, destination) if time.Now().After(deadline) { log.Printf("paymentHash: %s, Stop retrying getChannel(%v, %v)", reqPaymentHashStr, destination, channelPoint.String()) break diff --git a/interceptor/store.go b/interceptor/store.go index 4dcd9827..eeb25271 100644 --- a/interceptor/store.go +++ b/interceptor/store.go @@ -1,8 +1,6 @@ package interceptor import ( - "time" - "github.com/breez/lspd/common" "github.com/btcsuite/btcd/wire" ) @@ -11,5 +9,4 @@ type InterceptStore interface { PaymentInfo(htlcPaymentHash []byte) (string, *common.OpeningFeeParams, []byte, []byte, []byte, int64, int64, *wire.OutPoint, *string, error) SetFundingTx(paymentHash []byte, channelPoint *wire.OutPoint) error RegisterPayment(token string, params *common.OpeningFeeParams, destination, paymentHash, paymentSecret []byte, incomingAmountMsat, outgoingAmountMsat int64, tag string) error - InsertChannel(initialChanID, confirmedChanId uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error } diff --git a/lightning/client.go b/lightning/client.go index 99acd003..757d7e7c 100644 --- a/lightning/client.go +++ b/lightning/client.go @@ -12,9 +12,9 @@ type GetInfoResult struct { } type GetChannelResult struct { - InitialChannelID ShortChannelID - ConfirmedChannelID ShortChannelID - HtlcMinimumMsat uint64 + AliasScid *ShortChannelID + ConfirmedScid *ShortChannelID + HtlcMinimumMsat uint64 } type OpenChannelRequest struct { @@ -25,6 +25,13 @@ type OpenChannelRequest struct { TargetConf *uint32 } +type Channel struct { + AliasScid *ShortChannelID + ConfirmedScid *ShortChannelID + ChannelPoint *wire.OutPoint + PeerId []byte +} + type Client interface { GetInfo() (*GetInfoResult, error) IsConnected(destination []byte) (bool, error) @@ -34,4 +41,5 @@ type Client interface { GetClosedChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) WaitOnline(peerID []byte, deadline time.Time) error WaitChannelActive(peerID []byte, deadline time.Time) error + ListChannels() ([]*Channel, error) } diff --git a/lightning/outpoint.go b/lightning/outpoint.go index 43f2715a..2e186ef7 100644 --- a/lightning/outpoint.go +++ b/lightning/outpoint.go @@ -1,7 +1,11 @@ package lightning import ( + "encoding/hex" + "fmt" "log" + "strconv" + "strings" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" @@ -17,3 +21,22 @@ func NewOutPoint(fundingTxID []byte, index uint32) (*wire.OutPoint, error) { return wire.NewOutPoint(&h, index), nil } + +func NewOutPointFromString(outpoint string) (*wire.OutPoint, error) { + split := strings.Split(outpoint, ":") + if len(split) != 2 { + return nil, fmt.Errorf("invalid outpoint") + } + + fundingTxId, err := hex.DecodeString(split[0]) + if err != nil { + return nil, fmt.Errorf("invalid outpoint") + } + + outnum, err := strconv.ParseUint(split[1], 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid outpoint") + } + + return NewOutPoint(fundingTxId, uint32(outnum)) +} diff --git a/lightning/short_channel_id.go b/lightning/short_channel_id.go index 45ca07ff..8f8d7a2c 100644 --- a/lightning/short_channel_id.go +++ b/lightning/short_channel_id.go @@ -43,6 +43,9 @@ func NewShortChannelIDFromString(channelID string) (*ShortChannelID, error) { } func (c *ShortChannelID) ToString() string { + if c == nil { + return "" + } u := uint64(*c) blockHeight := (u >> 40) & 0xFFFFFF txIndex := (u >> 16) & 0xFFFFFF diff --git a/lnd/client.go b/lnd/client.go index fd9799f6..8f8a938b 100644 --- a/lnd/client.go +++ b/lnd/client.go @@ -13,7 +13,6 @@ import ( "github.com/breez/lspd/lightning" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/chainrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc" @@ -298,17 +297,11 @@ func (c *LndClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh for _, c := range r.Channels { log.Printf("getChannel(%x): %v", peerID, c.ChanId) if c.ChannelPoint == channelPointStr && c.Active { - confirmedChanId := c.ChanId - if c.ZeroConf { - confirmedChanId = c.ZeroConfConfirmedScid - if confirmedChanId == hop.Source.ToUint64() { - confirmedChanId = 0 - } - } + aliasScid, confirmedScid := mapScidsFromChannel(c) return &lightning.GetChannelResult{ - InitialChannelID: lightning.ShortChannelID(c.ChanId), - ConfirmedChannelID: lightning.ShortChannelID(confirmedChanId), - HtlcMinimumMsat: c.LocalConstraints.MinHtlcMsat, + AliasScid: aliasScid, + ConfirmedScid: confirmedScid, + HtlcMinimumMsat: c.LocalConstraints.MinHtlcMsat, }, nil } } @@ -498,3 +491,80 @@ func (c *LndClient) WaitChannelActive(peerID []byte, deadline time.Time) error { return fmt.Errorf("deadline exceeded") } } + +func (c *LndClient) ListChannels() ([]*lightning.Channel, error) { + channels, err := c.client.ListChannels( + context.TODO(), + &lnrpc.ListChannelsRequest{}, + ) + if err != nil { + return nil, err + } + pendingChannels, err := c.client.PendingChannels( + context.TODO(), + &lnrpc.PendingChannelsRequest{}, + ) + if err != nil { + return nil, err + } + + result := make([]*lightning.Channel, len(channels.Channels)) + for i, c := range channels.Channels { + peerId, err := hex.DecodeString(c.RemotePubkey) + if err != nil { + log.Printf("hex.DecodeString in LndClient.ListChannels error: %v", err) + continue + } + alias, confirmedScid := mapScidsFromChannel(c) + outpoint, err := lightning.NewOutPointFromString(c.ChannelPoint) + if err != nil { + log.Printf("lightning.NewOutPointFromString(%s) in LndClient.ListChannels error: %v", c.ChannelPoint, err) + } + + result[i] = &lightning.Channel{ + AliasScid: alias, + ConfirmedScid: confirmedScid, + ChannelPoint: outpoint, + PeerId: peerId, + } + } + + for _, c := range pendingChannels.PendingOpenChannels { + peerId, err := hex.DecodeString(c.Channel.RemoteNodePub) + if err != nil { + log.Printf("hex.DecodeString in LndClient.ListChannels error: %v", err) + continue + } + + outpoint, err := lightning.NewOutPointFromString(c.Channel.ChannelPoint) + if err != nil { + log.Printf("lightning.NewOutPointFromString(%s) in LndClient.ListChannels error: %v", c.Channel.ChannelPoint, err) + } + result = append(result, &lightning.Channel{ + AliasScid: nil, + ConfirmedScid: nil, + ChannelPoint: outpoint, + PeerId: peerId, + }) + } + + return result, nil +} + +func mapScidsFromChannel(c *lnrpc.Channel) (*lightning.ShortChannelID, *lightning.ShortChannelID) { + var alias *lightning.ShortChannelID + var confirmedScid *lightning.ShortChannelID + if c.ZeroConf { + if c.ZeroConfConfirmedScid != 0 { + confirmedScid = (*lightning.ShortChannelID)(&c.ZeroConfConfirmedScid) + } + alias = (*lightning.ShortChannelID)(&c.ChanId) + } else { + confirmedScid = (*lightning.ShortChannelID)(&c.ChanId) + if len(c.AliasScids) > 0 { + alias = (*lightning.ShortChannelID)(&c.AliasScids[0]) + } + } + + return alias, confirmedScid +} diff --git a/lnd/forwarding_event_store.go b/lnd/forwarding_event_store.go deleted file mode 100644 index 22fc4a42..00000000 --- a/lnd/forwarding_event_store.go +++ /dev/null @@ -1,12 +0,0 @@ -package lnd - -type CopyFromSource interface { - Next() bool - Values() ([]interface{}, error) - Err() error -} - -type ForwardingEventStore interface { - LastForwardingEvent() (int64, error) - InsertForwardingEvents(rowSrc CopyFromSource) error -} diff --git a/lnd/forwarding_history.go b/lnd/forwarding_history.go deleted file mode 100644 index debd93fb..00000000 --- a/lnd/forwarding_history.go +++ /dev/null @@ -1,183 +0,0 @@ -package lnd - -import ( - "context" - "encoding/hex" - "fmt" - "log" - "time" - - "github.com/breez/lspd/interceptor" - "github.com/lightningnetwork/lnd/htlcswitch/hop" - "github.com/lightningnetwork/lnd/lnrpc" - "github.com/lightningnetwork/lnd/lnrpc/chainrpc" -) - -type copyFromEvents struct { - events []*lnrpc.ForwardingEvent - idx int - err error -} - -func (cfe *copyFromEvents) Next() bool { - cfe.idx++ - return cfe.idx < len(cfe.events) -} - -func (cfe *copyFromEvents) Values() ([]interface{}, error) { - event := cfe.events[cfe.idx] - values := []interface{}{ - event.TimestampNs, - int64(event.ChanIdIn), int64(event.ChanIdOut), - event.AmtInMsat, event.AmtOutMsat} - return values, nil -} - -func (cfe *copyFromEvents) Err() error { - return cfe.err -} - -type ForwardingHistorySync struct { - client *LndClient - interceptStore interceptor.InterceptStore - forwardingStore ForwardingEventStore -} - -func NewForwardingHistorySync( - client *LndClient, - interceptStore interceptor.InterceptStore, - forwardingStore ForwardingEventStore, -) *ForwardingHistorySync { - return &ForwardingHistorySync{ - client: client, - interceptStore: interceptStore, - forwardingStore: forwardingStore, - } -} - -func (s *ForwardingHistorySync) ChannelsSynchronize(ctx context.Context) { - lastSync := time.Now().Add(-6 * time.Minute) - for { - if ctx.Err() != nil { - return - } - - stream, err := s.client.chainNotifierClient.RegisterBlockEpochNtfn(ctx, &chainrpc.BlockEpoch{}) - if err != nil { - log.Printf("chainNotifierClient.RegisterBlockEpochNtfn(): %v", err) - <-time.After(time.Second) - continue - } - - for { - if ctx.Err() != nil { - return - } - - _, err := stream.Recv() - if err != nil { - log.Printf("stream.Recv: %v", err) - <-time.After(time.Second) - break - } - if lastSync.Add(5 * time.Minute).Before(time.Now()) { - select { - case <-ctx.Done(): - return - case <-time.After(1 * time.Minute): - } - err = s.ChannelsSynchronizeOnce() - lastSync = time.Now() - log.Printf("channelsSynchronizeOnce() err: %v", err) - } - } - } -} - -func (s *ForwardingHistorySync) ChannelsSynchronizeOnce() error { - log.Printf("channelsSynchronizeOnce - begin") - channels, err := s.client.client.ListChannels(context.Background(), &lnrpc.ListChannelsRequest{PrivateOnly: true}) - if err != nil { - log.Printf("ListChannels error: %v", err) - return fmt.Errorf("client.ListChannels() error: %w", err) - } - log.Printf("channelsSynchronizeOnce - received channels") - lastUpdate := time.Now() - for _, c := range channels.Channels { - nodeID, err := hex.DecodeString(c.RemotePubkey) - if err != nil { - log.Printf("hex.DecodeString in channelsSynchronizeOnce error: %v", err) - continue - } - confirmedChanId := c.ChanId - if c.ZeroConf { - confirmedChanId = c.ZeroConfConfirmedScid - if confirmedChanId == hop.Source.ToUint64() { - confirmedChanId = 0 - } - } - err = s.interceptStore.InsertChannel(c.ChanId, confirmedChanId, c.ChannelPoint, nodeID, lastUpdate) - if err != nil { - log.Printf("insertChannel(%v, %v, %x) in channelsSynchronizeOnce error: %v", c.ChanId, c.ChannelPoint, nodeID, err) - continue - } - } - log.Printf("channelsSynchronizeOnce - done") - - return nil -} - -func (s *ForwardingHistorySync) ForwardingHistorySynchronize(ctx context.Context) { - for { - if ctx.Err() != nil { - return - } - - err := s.ForwardingHistorySynchronizeOnce() - log.Printf("forwardingHistorySynchronizeOnce() err: %v", err) - select { - case <-time.After(1 * time.Minute): - case <-ctx.Done(): - } - } -} - -func (s *ForwardingHistorySync) ForwardingHistorySynchronizeOnce() error { - last, err := s.forwardingStore.LastForwardingEvent() - if err != nil { - return fmt.Errorf("lastForwardingEvent() error: %w", err) - } - log.Printf("last1: %v", last) - last = last/1_000_000_000 - 1*3600 - if last <= 0 { - last = 1 - } - log.Printf("last2: %v", last) - now := time.Now() - endTime := uint64(now.Add(time.Hour * 24).Unix()) - indexOffset := uint32(0) - for { - forwardHistory, err := s.client.client.ForwardingHistory(context.Background(), &lnrpc.ForwardingHistoryRequest{ - StartTime: uint64(last), - EndTime: endTime, - NumMaxEvents: 10000, - IndexOffset: indexOffset, - }) - if err != nil { - log.Printf("ForwardingHistory error: %v", err) - return fmt.Errorf("client.ForwardingHistory() error: %w", err) - } - log.Printf("Offset: %v, Events: %v", indexOffset, len(forwardHistory.ForwardingEvents)) - if len(forwardHistory.ForwardingEvents) == 0 { - break - } - indexOffset = forwardHistory.LastOffsetIndex - cfe := copyFromEvents{events: forwardHistory.ForwardingEvents, idx: -1} - err = s.forwardingStore.InsertForwardingEvents(&cfe) - if err != nil { - log.Printf("insertForwardingEvents() error: %v", err) - return fmt.Errorf("insertForwardingEvents() error: %w", err) - } - } - return nil -} diff --git a/lnd/forwards_sync.go b/lnd/forwards_sync.go new file mode 100644 index 00000000..8e9d09ef --- /dev/null +++ b/lnd/forwards_sync.go @@ -0,0 +1,93 @@ +package lnd + +import ( + "context" + "log" + "strconv" + "time" + + "github.com/breez/lspd/history" + "github.com/breez/lspd/lightning" + "github.com/lightningnetwork/lnd/lnrpc" +) + +type ForwardSync struct { + nodeid []byte + client *LndClient + store history.Store +} + +func NewForwardSync( + nodeid []byte, + client *LndClient, + store history.Store, +) *ForwardSync { + return &ForwardSync{ + nodeid: nodeid, + client: client, + store: store, + } +} + +var forwardChannelSyncInterval time.Duration = time.Minute * 5 + +func (s *ForwardSync) ForwardsSynchronize(ctx context.Context) { + s.forwardsSynchronizeOnce(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(forwardChannelSyncInterval): + } + + s.forwardsSynchronizeOnce(ctx) + } +} + +func (s *ForwardSync) forwardsSynchronizeOnce(ctx context.Context) { + last, err := s.store.FetchLndForwardOffset(ctx, s.nodeid) + if err != nil { + log.Printf("forwardsSynchronizeOnce(%x) - FetchLndForwardOffset err: %v", s.nodeid, err) + return + } + + var startTime uint64 + if last != nil { + startTime = uint64(last.UnixNano()) + } + + for { + forwardHistory, err := s.client.client.ForwardingHistory(context.Background(), &lnrpc.ForwardingHistoryRequest{ + StartTime: startTime, + NumMaxEvents: 10000, + }) + if err != nil { + log.Printf("forwardsSynchronizeOnce(%x) - ForwardingHistory error: %v", s.nodeid, err) + return + } + + log.Printf("forwardsSynchronizeOnce(%x) - startTime: %v, Events: %v", s.nodeid, startTime, len(forwardHistory.ForwardingEvents)) + if len(forwardHistory.ForwardingEvents) == 0 { + break + } + + forwards := make([]*history.Forward, len(forwardHistory.ForwardingEvents)) + for i, f := range forwardHistory.ForwardingEvents { + forwards[i] = &history.Forward{ + Identifier: strconv.FormatUint(f.TimestampNs, 10) + "|" + strconv.FormatUint(f.AmtInMsat, 10), + InChannel: lightning.ShortChannelID(f.ChanIdIn), + OutChannel: lightning.ShortChannelID(f.ChanIdOut), + InMsat: f.AmtInMsat, + OutMsat: f.AmtOutMsat, + ResolvedTime: time.Unix(0, int64(f.TimestampNs)), + } + startTime = f.TimestampNs + } + s.store.InsertForwards(ctx, forwards, s.nodeid) + if err != nil { + log.Printf("forwardsSynchronizeOnce(%x) - store.InsertForwards() error: %v", s.nodeid, err) + return + } + } +} diff --git a/lnd/interceptor.go b/lnd/interceptor.go index c3eef64b..0dee610a 100644 --- a/lnd/interceptor.go +++ b/lnd/interceptor.go @@ -23,7 +23,6 @@ import ( ) type LndHtlcInterceptor struct { - fwsync *ForwardingHistorySync interceptor *interceptor.Interceptor config *config.NodeConfig client *LndClient @@ -37,13 +36,11 @@ type LndHtlcInterceptor struct { func NewLndHtlcInterceptor( conf *config.NodeConfig, client *LndClient, - fwsync *ForwardingHistorySync, interceptor *interceptor.Interceptor, ) (*LndHtlcInterceptor, error) { i := &LndHtlcInterceptor{ config: conf, client: client, - fwsync: fwsync, interceptor: interceptor, } @@ -57,8 +54,6 @@ func (i *LndHtlcInterceptor) Start() error { i.ctx = ctx i.cancel = cancel i.stopRequested = false - go i.fwsync.ForwardingHistorySynchronize(ctx) - go i.fwsync.ChannelsSynchronize(ctx) return i.intercept() } diff --git a/lsps2/intercept_handler.go b/lsps2/intercept_handler.go index 09d54bc5..4ae89fb9 100644 --- a/lsps2/intercept_handler.go +++ b/lsps2/intercept_handler.go @@ -10,12 +10,14 @@ import ( "github.com/breez/lspd/chain" "github.com/breez/lspd/common" + "github.com/breez/lspd/history" "github.com/breez/lspd/lightning" "github.com/breez/lspd/lsps0" "github.com/btcsuite/btcd/wire" ) type InterceptorConfig struct { + NodeId []byte AdditionalChannelCapacitySat uint64 MinConfs *uint32 TargetConf uint32 @@ -29,6 +31,7 @@ type InterceptorConfig struct { type Interceptor struct { store Lsps2Store + historyStore history.Store openingService common.OpeningService client lightning.Client feeEstimator chain.FeeEstimator @@ -43,6 +46,7 @@ type Interceptor struct { func NewInterceptHandler( store Lsps2Store, + historyStore history.Store, openingService common.OpeningService, client lightning.Client, feeEstimator chain.FeeEstimator, @@ -54,6 +58,7 @@ func NewInterceptHandler( return &Interceptor{ store: store, + historyStore: historyStore, openingService: openingService, client: client, feeEstimator: feeEstimator, @@ -516,18 +521,38 @@ func (i *Interceptor) ensureChannelOpen(payment *paymentState) { log.Printf( "Got new channel for forward successfully. scid alias: %v, "+ "confirmed scid: %v", - chanResult.InitialChannelID.ToString(), - chanResult.ConfirmedChannelID.ToString(), + chanResult.AliasScid.ToString(), + chanResult.ConfirmedScid.ToString(), ) - scid := chanResult.ConfirmedChannelID - if uint64(scid) == 0 { - scid = chanResult.InitialChannelID + var scid *lightning.ShortChannelID + if chanResult.ConfirmedScid == nil { + if chanResult.AliasScid == nil { + log.Printf("Error: GetChannel: Both confirmed scid and alias scid are nil: %+v", chanResult) + <-time.After(1 * time.Second) + continue + } + scid = chanResult.AliasScid + } else { + scid = chanResult.ConfirmedScid + } + + err := i.historyStore.UpdateChannels(context.TODO(), []*history.ChannelUpdate{{ + NodeID: i.config.NodeId, + PeerId: destination, + AliasScid: chanResult.AliasScid, + ConfirmedScid: chanResult.ConfirmedScid, + ChannelPoint: payment.registration.ChannelPoint, + LastUpdate: time.Now(), + }}) + if err != nil { + // Don't break here, this is not critical. + log.Printf("failed to insert newly opened channel in history store. %v", payment.registration.ChannelPoint.String()) } i.paymentChanOpened <- &paymentChanOpenedEvent{ paymentId: payment.id, - scid: scid, + scid: *scid, channelPoint: payment.registration.ChannelPoint, htlcMinimumMsat: chanResult.HtlcMinimumMsat, } diff --git a/lsps2/intercept_test.go b/lsps2/intercept_test.go index 85ad694c..b745aae2 100644 --- a/lsps2/intercept_test.go +++ b/lsps2/intercept_test.go @@ -31,9 +31,9 @@ var defaultChainHash = chainhash.Hash([32]byte{}) var defaultOutPoint = wire.NewOutPoint(&defaultChainHash, 0) var defaultChannelScid uint64 = 456 var defaultChanResult = &lightning.GetChannelResult{ - HtlcMinimumMsat: defaultConfig().HtlcMinimumMsat, - InitialChannelID: lightning.ShortChannelID(defaultChannelScid), - ConfirmedChannelID: lightning.ShortChannelID(defaultChannelScid), + HtlcMinimumMsat: defaultConfig().HtlcMinimumMsat, + AliasScid: (*lightning.ShortChannelID)(&defaultChannelScid), + ConfirmedScid: (*lightning.ShortChannelID)(&defaultChannelScid), } func defaultOpeningFeeParams() common.OpeningFeeParams { @@ -150,7 +150,7 @@ func setupInterceptor( openingService = defaultopeningService() } - i := NewInterceptHandler(store, openingService, client, f, config) + i := NewInterceptHandler(store, &mockHistoryStore{}, openingService, client, f, config) go i.Start(ctx) return i } diff --git a/lsps2/mocks.go b/lsps2/mocks.go index 7efcca9d..53b79ce2 100644 --- a/lsps2/mocks.go +++ b/lsps2/mocks.go @@ -9,6 +9,7 @@ import ( "github.com/GoWebProd/uuid7" "github.com/breez/lspd/chain" "github.com/breez/lspd/common" + "github.com/breez/lspd/history" "github.com/breez/lspd/lightning" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" @@ -96,6 +97,27 @@ func (s *mockLsps2Store) RemoveUnusedExpired(ctx context.Context, before time.Ti return nil } +type mockHistoryStore struct{} + +func (s *mockHistoryStore) UpdateChannels(ctx context.Context, updates []*history.ChannelUpdate) error { + return nil +} +func (s *mockHistoryStore) InsertForwards(ctx context.Context, forwards []*history.Forward, nodeId []byte) error { + return nil +} +func (s *mockHistoryStore) UpdateForwards(ctx context.Context, forwards []*history.Forward, nodeId []byte) error { + return nil +} +func (s *mockHistoryStore) FetchClnForwardOffsets(ctx context.Context, nodeId []byte) (uint64, uint64, error) { + return 0, 0, ErrNotImplemented +} +func (s *mockHistoryStore) SetClnForwardOffsets(ctx context.Context, nodeId []byte, created uint64, updated uint64) error { + return nil +} +func (s *mockHistoryStore) FetchLndForwardOffset(ctx context.Context, nodeId []byte) (*time.Time, error) { + return nil, ErrNotImplemented +} + type mockLightningClient struct { openResponses []*wire.OutPoint openRequests []*lightning.OpenChannelRequest @@ -147,6 +169,9 @@ func (c *mockLightningClient) WaitOnline(peerID []byte, deadline time.Time) erro func (c *mockLightningClient) WaitChannelActive(peerID []byte, deadline time.Time) error { return ErrNotImplemented } +func (c *mockLightningClient) ListChannels() ([]*lightning.Channel, error) { + return nil, ErrNotImplemented +} type mockFeeEstimator struct { } diff --git a/main.go b/main.go index 47effaf0..ac44ed16 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/breez/lspd/cln" "github.com/breez/lspd/common" "github.com/breez/lspd/config" + "github.com/breez/lspd/history" "github.com/breez/lspd/interceptor" "github.com/breez/lspd/lnd" "github.com/breez/lspd/lsps0" @@ -97,9 +98,9 @@ func main() { interceptStore := postgresql.NewPostgresInterceptStore(pool) openingStore := postgresql.NewPostgresOpeningStore(pool) - forwardingStore := postgresql.NewForwardingEventStore(pool) notificationsStore := postgresql.NewNotificationsStore(pool) lsps2Store := postgresql.NewLsps2Store(pool) + historyStore := postgresql.NewHistoryStore(pool) ctx, cancel := context.WithCancel(context.Background()) notificationService := notifications.NewNotificationService(notificationsStore) @@ -109,6 +110,8 @@ func main() { go lsps2CleanupService.Start(ctx) notificationCleanupService := notifications.NewCleanupService(notificationsStore) go notificationCleanupService.Start(ctx) + channelSync := history.NewChannelSync(nodes, historyStore) + go channelSync.ChannelsSynchronize(ctx) var interceptors []interceptor.HtlcInterceptor for _, node := range nodes { @@ -127,9 +130,11 @@ func main() { } client.StartListeners() - fwsync := lnd.NewForwardingHistorySync(client, interceptStore, forwardingStore) - interceptor := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, openingService, feeEstimator, feeStrategy, notificationService) - htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, fwsync, interceptor) + + forwardSync := lnd.NewForwardSync(node.NodeId, client, historyStore) + go forwardSync.ForwardsSynchronize(ctx) + interceptor := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, historyStore, openingService, feeEstimator, feeStrategy, notificationService) + htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, interceptor) if err != nil { log.Fatalf("failed to initialize LND interceptor: %v", err) } @@ -141,8 +146,11 @@ func main() { log.Fatalf("failed to initialize CLN client: %v", err) } - legacyHandler := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, openingService, feeEstimator, feeStrategy, notificationService) - lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, openingService, client, feeEstimator, &lsps2.InterceptorConfig{ + forwardSync := cln.NewForwardSync(node.NodeId, client, historyStore) + go forwardSync.ForwardsSynchronize(ctx) + legacyHandler := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, historyStore, openingService, feeEstimator, feeStrategy, notificationService) + lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, historyStore, openingService, client, feeEstimator, &lsps2.InterceptorConfig{ + NodeId: node.NodeId, AdditionalChannelCapacitySat: uint64(node.NodeConfig.AdditionalChannelCapacity), MinConfs: node.NodeConfig.MinConfs, TargetConf: node.NodeConfig.TargetConf, @@ -306,6 +314,12 @@ func initializeNodes(configs []*config.NodeConfig) ([]*common.Node, error) { node.NodeConfig.NodePubkey = info.Pubkey } + nodeId, err := hex.DecodeString(info.Pubkey) + if err != nil { + return nil, fmt.Errorf("failed to parse node id %s", info.Pubkey) + } + + node.NodeId = nodeId node.Tokens = config.Tokens nodes = append(nodes, node) } diff --git a/postgresql/forwarding_event_store.go b/postgresql/forwarding_event_store.go deleted file mode 100644 index ce0fbff5..00000000 --- a/postgresql/forwarding_event_store.go +++ /dev/null @@ -1,68 +0,0 @@ -package postgresql - -import ( - "context" - "fmt" - "log" - - "github.com/breez/lspd/lnd" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" -) - -type ForwardingEventStore struct { - pool *pgxpool.Pool -} - -func NewForwardingEventStore(pool *pgxpool.Pool) *ForwardingEventStore { - return &ForwardingEventStore{pool: pool} -} - -func (s *ForwardingEventStore) LastForwardingEvent() (int64, error) { - var last int64 - err := s.pool.QueryRow(context.Background(), - `SELECT coalesce(MAX("timestamp"), 0) AS last FROM forwarding_history`).Scan(&last) - if err != nil { - return 0, err - } - return last, nil -} - -func (s *ForwardingEventStore) InsertForwardingEvents(rowSrc lnd.CopyFromSource) error { - - tx, err := s.pool.Begin(context.Background()) - if err != nil { - return fmt.Errorf("pgxPool.Begin() error: %w", err) - } - defer tx.Rollback(context.Background()) - - _, err = tx.Exec(context.Background(), ` - CREATE TEMP TABLE tmp_table ON COMMIT DROP AS - SELECT * - FROM forwarding_history - WITH NO DATA; - `) - if err != nil { - return fmt.Errorf("CREATE TEMP TABLE error: %w", err) - } - - count, err := tx.CopyFrom(context.Background(), - pgx.Identifier{"tmp_table"}, - []string{"timestamp", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, rowSrc) - if err != nil { - return fmt.Errorf("CopyFrom() error: %w", err) - } - log.Printf("count1: %v", count) - - cmdTag, err := tx.Exec(context.Background(), ` - INSERT INTO forwarding_history - SELECT * - FROM tmp_table - ON CONFLICT DO NOTHING - `) - if err != nil { - return fmt.Errorf("INSERT INTO forwarding_history error: %w", err) - } - log.Printf("count2: %v", cmdTag.RowsAffected()) - return tx.Commit(context.Background()) -} diff --git a/postgresql/history_store.go b/postgresql/history_store.go new file mode 100644 index 00000000..675656d2 --- /dev/null +++ b/postgresql/history_store.go @@ -0,0 +1,347 @@ +package postgresql + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/GoWebProd/uuid7" + "github.com/breez/lspd/history" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +type copyFromChanUpdates struct { + channels []*history.ChannelUpdate + idx int + err error +} + +func (cfe *copyFromChanUpdates) Next() bool { + if len(cfe.channels) == 0 { + return false + } + + for { + cfe.idx++ + if cfe.idx >= len(cfe.channels) { + return false + } + + if cfe.channels[cfe.idx] == nil { + continue + } + + return true + } +} + +func (cfe *copyFromChanUpdates) Values() ([]interface{}, error) { + channel := cfe.channels[cfe.idx] + var aliasScid *int64 + if channel.AliasScid != nil { + tmp := uint64(*channel.AliasScid) + tmp2 := int64(tmp) + aliasScid = &tmp2 + } + var confirmedScid *int64 + if channel.ConfirmedScid != nil { + tmp := uint64(*channel.ConfirmedScid) + tmp2 := int64(tmp) + confirmedScid = &tmp2 + } + values := []interface{}{ + channel.NodeID, + channel.PeerId, + aliasScid, + confirmedScid, + channel.ChannelPoint.Hash[:], + channel.ChannelPoint.Index, + channel.LastUpdate, + channel.LastUpdate, + } + return values, nil +} + +func (cfe *copyFromChanUpdates) Err() error { + return cfe.err +} + +type copyFromForwards struct { + forwards []*history.Forward + nodeid []byte + idx int + err error +} + +func (cfe *copyFromForwards) Next() bool { + cfe.idx++ + return cfe.idx < len(cfe.forwards) +} + +func (cfe *copyFromForwards) Values() ([]interface{}, error) { + forward := cfe.forwards[cfe.idx] + values := []interface{}{ + forward.Identifier, + forward.ResolvedTime.UnixNano(), + cfe.nodeid, + int64(uint64(forward.InChannel)), + int64(uint64(forward.OutChannel)), + int64(forward.InMsat), + int64(forward.OutMsat), + } + return values, nil +} + +func (cfe *copyFromForwards) Err() error { + return cfe.err +} + +type HistoryStore struct { + pool *pgxpool.Pool + generator *uuid7.Generator +} + +func NewHistoryStore(pool *pgxpool.Pool) *HistoryStore { + return &HistoryStore{ + pool: pool, + generator: uuid7.New(), + } +} + +func (s *HistoryStore) UpdateChannels( + ctx context.Context, + updates []*history.ChannelUpdate, +) error { + if len(updates) == 0 { + return nil + } + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("pgxPool.Begin() error: %w", err) + } + defer tx.Rollback(ctx) + + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_table ON COMMIT DROP AS + SELECT * + FROM channels + WITH NO DATA; + `) + if err != nil { + return fmt.Errorf("CREATE TEMP TABLE error: %w", err) + } + + rowSrc := ©FromChanUpdates{channels: updates, idx: -1} + count, err := tx.CopyFrom(ctx, + pgx.Identifier{"tmp_table"}, + []string{"nodeid", "peerid", "alias_scid", "confirmed_scid", "funding_tx_id", "funding_tx_outnum", "first_seen", "last_update"}, + rowSrc) + if err != nil { + return fmt.Errorf("CopyFrom() error: %w", err) + } + log.Printf("UpdateChannels - count1: %v", count) + + cmdTag, err := tx.Exec(ctx, ` + INSERT INTO channels + SELECT * + FROM tmp_table + ON CONFLICT (nodeid, funding_tx_id, funding_tx_outnum) DO UPDATE SET + alias_scid = EXCLUDED.alias_scid, + confirmed_scid = EXCLUDED.confirmed_scid, + last_update = EXCLUDED.last_update + `) + if err != nil { + return fmt.Errorf("INSERT INTO channels error: %w", err) + } + log.Printf("UpdateChannels - count2: %v", cmdTag.RowsAffected()) + + return tx.Commit(ctx) +} + +func (s *HistoryStore) InsertForwards( + ctx context.Context, + forwards []*history.Forward, + nodeid []byte, +) error { + if len(forwards) == 0 { + return nil + } + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("pgxPool.Begin() error: %w", err) + } + defer tx.Rollback(ctx) + + rowSrc := copyFromForwards{ + forwards: forwards, + nodeid: nodeid, + idx: -1, + } + + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_table ON COMMIT DROP AS + SELECT * + FROM forwarding_history + WITH NO DATA; + `) + if err != nil { + return fmt.Errorf("CREATE TEMP TABLE error: %w", err) + } + + count, err := tx.CopyFrom( + ctx, + pgx.Identifier{"tmp_table"}, + []string{"identifier", "resolved_time", "nodeid", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, + &rowSrc, + ) + if err != nil { + return fmt.Errorf("CopyFrom() error: %w", err) + } + log.Printf("InsertForwards node %x count1: %v", nodeid, count) + + cmdTag, err := tx.Exec(ctx, ` + INSERT INTO forwarding_history + SELECT * + FROM tmp_table + ON CONFLICT (nodeid, identifier) DO NOTHING + `) + if err != nil { + return fmt.Errorf("INSERT INTO forwarding_history error: %w", err) + } + log.Printf("InsertForwards node %x count2: %v", nodeid, cmdTag.RowsAffected()) + + return tx.Commit(ctx) +} + +func (s *HistoryStore) UpdateForwards( + ctx context.Context, + forwards []*history.Forward, + nodeid []byte, +) error { + if len(forwards) == 0 { + return nil + } + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("pgxPool.Begin() error: %w", err) + } + defer tx.Rollback(ctx) + + rowSrc := copyFromForwards{ + forwards: forwards, + nodeid: nodeid, + idx: -1, + } + + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_table ON COMMIT DROP AS + SELECT * + FROM forwarding_history + WITH NO DATA; + `) + if err != nil { + return fmt.Errorf("CREATE TEMP TABLE error: %w", err) + } + + count, err := tx.CopyFrom( + ctx, + pgx.Identifier{"tmp_table"}, + []string{"identifier", "resolved_time", "nodeid", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, + &rowSrc, + ) + if err != nil { + return fmt.Errorf("CopyFrom() error: %w", err) + } + log.Printf("UpdateForwards node %x count1: %v", nodeid, count) + + cmdTag, err := tx.Exec(ctx, ` + INSERT INTO forwarding_history + SELECT * + FROM tmp_table + ON CONFLICT (nodeid, identifier) DO UPDATE SET + resolved_time = EXCLUDED.resolved_time, + chanid_in = EXCLUDED.chanid_in, + chanid_out = EXCLUDED.chanid_out, + amt_msat_in = EXCLUDED.amt_msat_in, + amt_msat_out = EXCLUDED.amt_msat_out + `) + if err != nil { + return fmt.Errorf("INSERT INTO forwarding_history error: %w", err) + } + log.Printf("UpdateForwards node %x count2: %v", nodeid, cmdTag.RowsAffected()) + + return tx.Commit(ctx) +} + +func (s *HistoryStore) FetchClnForwardOffsets( + ctx context.Context, + nodeId []byte, +) (uint64, uint64, error) { + row := s.pool.QueryRow(ctx, ` + SELECT last_created_index, last_updated_index + FROM public.cln_forwarding_history_offsets + WHERE nodeid = $1 + `, + nodeId) + + var created int64 + var updated int64 + err := row.Scan(&created, &updated) + if err == pgx.ErrNoRows { + return 0, 0, nil + } + if err != nil { + return 0, 0, err + } + + return uint64(created), uint64(updated), nil +} + +func (s *HistoryStore) FetchLndForwardOffset( + ctx context.Context, + nodeId []byte, +) (*time.Time, error) { + row := s.pool.QueryRow(ctx, ` + SELECT MAX(resolved_time) + FROM forwarding_history + WHERE nodeid = $1 + `, + nodeId) + var t *int64 + err := row.Scan(&t) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + if t == nil { + return nil, nil + } + + tt := time.Unix(0, *t) + return &tt, nil +} + +func (s *HistoryStore) SetClnForwardOffsets( + ctx context.Context, + nodeId []byte, + created uint64, + updated uint64, +) error { + _, err := s.pool.Exec(ctx, ` + INSERT INTO public.cln_forwarding_history_offsets (nodeid, last_created_index, last_updated_index) + VALUES($1, $2, $3) + ON CONFLICT (nodeid) DO UPDATE SET last_created_index = EXCLUDED.last_created_index, last_updated_index = EXCLUDED.last_updated_index + `, + nodeId, + int64(created), + int64(updated), + ) + return err +} diff --git a/postgresql/intercept_store.go b/postgresql/intercept_store.go index 46b5e68c..cac0e53d 100644 --- a/postgresql/intercept_store.go +++ b/postgresql/intercept_store.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log" - "time" "github.com/breez/lspd/common" "github.com/breez/lspd/lightning" @@ -102,23 +101,3 @@ func (s *PostgresInterceptStore) RegisterPayment(token string, params *common.Op } return nil } - -func (s *PostgresInterceptStore) InsertChannel(initialChanID, confirmedChanId uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error { - - query := `INSERT INTO - channels (initial_chanid, confirmed_chanid, channel_point, nodeid, last_update) - VALUES ($1, NULLIF($2, 0::int8), $3, $4, $5) - ON CONFLICT (channel_point) DO UPDATE SET confirmed_chanid=NULLIF($2, 0::int8), last_update=$5` - - c, err := s.pool.Exec(context.Background(), - query, int64(initialChanID), int64(confirmedChanId), channelPoint, nodeID, lastUpdate) - if err != nil { - log.Printf("insertChannel(%v, %v, %s, %x) error: %v", - initialChanID, confirmedChanId, channelPoint, nodeID, err) - return fmt.Errorf("insertChannel(%v, %v, %s, %x) error: %w", - initialChanID, confirmedChanId, channelPoint, nodeID, err) - } - log.Printf("insertChannel(%v, %v, %x) result: %v", - initialChanID, confirmedChanId, nodeID, c.String()) - return nil -} diff --git a/postgresql/migrations/000015_history_sync.down.sql b/postgresql/migrations/000015_history_sync.down.sql new file mode 100644 index 00000000..4bdbd6a1 --- /dev/null +++ b/postgresql/migrations/000015_history_sync.down.sql @@ -0,0 +1,28 @@ +ALTER INDEX public.channels_nodeid_idx RENAME TO channels_backup_nodeid_idx; +ALTER TABLE public.channels RENAME TO channels_backup; +ALTER INDEX public.forwarding_history_chanid_in_idx RENAME TO forwarding_history_backup_chanid_in_idx; +ALTER INDEX public.forwarding_history_chanid_out_idx RENAME TO forwarding_history_backup_chanid_out_idx; +ALTER TABLE public.forwarding_history RENAME TO forwarding_history_backup; + +DROP INDEX lsps2_bought_channels_registration_id_funding_tx_idx; +DROP INDEX payments_funding_tx_idx; +DROP TABLE public.cln_forwarding_history_offsets; +DROP INDEX forwarding_history_nodeid_chanid_out_idx; +DROP INDEX forwarding_history_nodeid_chanid_in_idx; +DROP INDEX forwarding_history_resolved_time_idx; +DROP INDEX forwarding_history_nodeid_resolved_time_idx; +DROP INDEX forwarding_history_nodeid_idx; +DROP TABLE public.forwarding_history; +DROP INDEX channels_confirmed_scid_idx; +DROP INDEX channels_alias_scid_idx; +DROP INDEX channels_nodeid_funding_tx_idx; +DROP INDEX channels_funding_tx_idx; +DROP INDEX channels_peerid_idx; +DROP INDEX channels_nodeid_idx; +DROP TABLE public.channels; + +ALTER TABLE public.forwarding_history_backup RENAME TO forwarding_history; +ALTER INDEX public.forwarding_history_backup_chanid_out_idx RENAME TO forwarding_history_chanid_out_idx; +ALTER INDEX public.forwarding_history_backup_chanid_in_idx RENAME TO forwarding_history_chanid_in_idx; +ALTER TABLE public.channels_backup RENAME TO channels; +ALTER INDEX public.channels_backup_nodeid_idx RENAME TO channels_nodeid_idx; diff --git a/postgresql/migrations/000015_history_sync.up.sql b/postgresql/migrations/000015_history_sync.up.sql new file mode 100644 index 00000000..8a55264d --- /dev/null +++ b/postgresql/migrations/000015_history_sync.up.sql @@ -0,0 +1,49 @@ +ALTER INDEX public.channels_nodeid_idx RENAME TO channels_backup_nodeid_idx; +ALTER TABLE public.channels RENAME TO channels_backup; +ALTER INDEX public.forwarding_history_chanid_in_idx RENAME TO forwarding_history_backup_chanid_in_idx; +ALTER INDEX public.forwarding_history_chanid_out_idx RENAME TO forwarding_history_backup_chanid_out_idx; +ALTER TABLE public.forwarding_history RENAME TO forwarding_history_backup; + +CREATE TABLE public.channels ( + nodeid bytea NOT NULL, + peerid bytea NOT NULL, + alias_scid bigint NULL, + confirmed_scid bigint NULL, + funding_tx_id bytea NOT NULL, + funding_tx_outnum bigint NOT NULL, + first_seen timestamp NOT NULL, + last_update timestamp NOT NULL, + UNIQUE(nodeid, funding_tx_id, funding_tx_outnum) +); +CREATE INDEX channels_nodeid_idx ON public.channels USING btree (nodeid); +CREATE INDEX channels_peerid_idx ON public.channels USING btree (peerid); +CREATE INDEX channels_funding_tx_idx ON public.channels USING btree (funding_tx_id, funding_tx_outnum); +CREATE INDEX channels_nodeid_funding_tx_idx ON public.channels USING btree (nodeid, funding_tx_id, funding_tx_outnum); +CREATE INDEX channels_alias_scid_idx ON public.channels USING btree (alias_scid); +CREATE INDEX channels_confirmed_scid_idx ON public.channels USING btree (confirmed_scid); + +CREATE TABLE public.forwarding_history ( + identifier varchar NOT NULL, + resolved_time bigint NOT NULL, + nodeid bytea NOT NULL, + chanid_in bigint NOT NULL, + chanid_out bigint NOT NULL, + amt_msat_in bigint NOT NULL, + amt_msat_out bigint NOT NULL, + UNIQUE(nodeid, identifier) +); +CREATE INDEX forwarding_history_nodeid_idx ON public.forwarding_history (nodeid); +CREATE INDEX forwarding_history_nodeid_resolved_time_idx ON public.forwarding_history USING btree (nodeid, resolved_time); +CREATE INDEX forwarding_history_resolved_time_idx ON public.forwarding_history USING btree (resolved_time); +CREATE INDEX forwarding_history_nodeid_chanid_in_idx ON public.forwarding_history USING btree (nodeid, chanid_in); +CREATE INDEX forwarding_history_nodeid_chanid_out_idx ON public.forwarding_history USING btree (nodeid, chanid_out); + +CREATE TABLE public.cln_forwarding_history_offsets ( + nodeid bytea NOT NULL, + last_created_index bigint NOT NULL, + last_updated_index bigint NOT NULL, + UNIQUE(nodeid) +); + +CREATE INDEX payments_funding_tx_idx ON public.payments USING btree (funding_tx_id, funding_tx_outnum); +CREATE INDEX lsps2_bought_channels_registration_id_funding_tx_idx ON lsps2.bought_channels USING btree (registration_id, funding_tx_id, funding_tx_outnum);