Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forwarding history - no matching #175

Merged
merged 10 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 65 additions & 11 deletions cln/cln_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,28 +198,23 @@ func (c *ClnClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh
pubkey := hex.EncodeToString(peerID)
channels, err := client.GetPeerChannels(pubkey)
if err != nil {
log.Printf("CLN: client.GetPeer(%s) error: %v", pubkey, err)
log.Printf("CLN: client.GetPeerChannels(%s) error: %v", pubkey, err)
return nil, err
}

fundingTxID := channelPoint.Hash.String()
for _, c := range channels {
log.Printf("getChannel destination: %s, Short channel id: %v, local alias: %v , FundingTxID:%v, State:%v ", pubkey, c.ShortChannelId, c.Alias.Local, c.FundingTxId, c.State)
if slices.Contains(OPEN_STATUSES, c.State) && c.FundingTxId == fundingTxID {
confirmedChanID, err := lightning.NewShortChannelIDFromString(c.ShortChannelId)
if err != nil {
fmt.Printf("NewShortChannelIDFromString %v error: %v", c.ShortChannelId, err)
return nil, err
}
initialChanID, err := lightning.NewShortChannelIDFromString(c.Alias.Local)

aliasScid, confirmedScid, err := mapScidsFromChannel(c)
if err != nil {
fmt.Printf("NewShortChannelIDFromString %v error: %v", c.Alias.Local, err)
return nil, err
}
return &lightning.GetChannelResult{
InitialChannelID: *initialChanID,
ConfirmedChannelID: *confirmedChanID,
HtlcMinimumMsat: c.MinimumHtlcOutMsat.MSat(),
AliasScid: aliasScid,
ConfirmedScid: confirmedScid,
HtlcMinimumMsat: c.MinimumHtlcOutMsat.MSat(),
}, nil
}
}
Expand Down Expand Up @@ -324,3 +319,62 @@ func (c *ClnClient) WaitOnline(peerID []byte, deadline time.Time) error {
func (c *ClnClient) WaitChannelActive(peerID []byte, deadline time.Time) error {
return nil
}

func (c *ClnClient) ListChannels() ([]*lightning.Channel, error) {
channels, err := c.client.ListPeerChannels()
if err != nil {
return nil, err
}

result := make([]*lightning.Channel, len(channels))
for i, channel := range channels {
peerId, err := hex.DecodeString(channel.PeerId)
if err != nil {
log.Printf("cln.ListChannels returned channel without peer id: %+v", channel)
continue
}
aliasScid, confirmedScid, err := mapScidsFromChannel(channel)
if err != nil {
return nil, err
}

var outpoint *wire.OutPoint
fundingTxId, err := hex.DecodeString(channel.FundingTxId)
if err == nil && fundingTxId != nil && len(fundingTxId) > 0 {
outpoint, _ = lightning.NewOutPoint(fundingTxId, channel.FundingOutnum)
}
if outpoint == nil {
log.Printf("cln.ListChannels returned channel without outpoint: %+v", channel)
continue
}
result[i] = &lightning.Channel{
AliasScid: aliasScid,
ConfirmedScid: confirmedScid,
ChannelPoint: outpoint,
PeerId: peerId,
}
}

return result, nil
}

func mapScidsFromChannel(c *glightning.PeerChannel) (*lightning.ShortChannelID, *lightning.ShortChannelID, error) {
var confirmedScid *lightning.ShortChannelID
var aliasScid *lightning.ShortChannelID
var err error
if c.ShortChannelId != "" {
confirmedScid, err = lightning.NewShortChannelIDFromString(c.ShortChannelId)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse scid '%s': %w", c.ShortChannelId, err)
}
}

if c.Alias != nil && c.Alias.Local != "" {
aliasScid, err = lightning.NewShortChannelIDFromString(c.Alias.Local)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse scid '%s': %w", c.Alias.Local, err)
}
}

return aliasScid, confirmedScid, nil
}
201 changes: 201 additions & 0 deletions cln/forwards_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package cln

import (
"context"
"fmt"
"log"
"math"
"strconv"
"time"

"github.com/breez/lspd/history"
"github.com/breez/lspd/lightning"
"github.com/elementsproject/glightning/glightning"
)

type ForwardSync struct {
createdOffset uint64
updatedOffset uint64
nodeid []byte
client *ClnClient
store history.Store
}

func NewForwardSync(nodeid []byte, client *ClnClient, store history.Store) *ForwardSync {
return &ForwardSync{
nodeid: nodeid,
client: client,
store: store,
}
}

var forwardSyncInterval time.Duration = time.Minute * 5

func (s *ForwardSync) ForwardsSynchronize(ctx context.Context) {
s.createdOffset, s.updatedOffset, _ = s.store.FetchClnForwardOffsets(ctx, s.nodeid)
s.forwardsSynchronizeOnce(ctx)

for {
select {
case <-ctx.Done():
return
case <-time.After(forwardSyncInterval):
}

s.forwardsSynchronizeOnce(ctx)
}
}

func (s *ForwardSync) forwardsSynchronizeOnce(ctx context.Context) {
s.forwardCreatedSynchronizeOnce(ctx)
s.forwardUpdatedSynchronizeOnce(ctx)
}

func (s *ForwardSync) forwardCreatedSynchronizeOnce(ctx context.Context) {
log.Printf("forwardCreatedSynchronizeOnce(%x) - Begin", s.nodeid)
var limit uint32 = 10000
endReached := false
round := 0
for !endReached {
log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.createdOffset)
var forwards []*history.Forward
var newCreatedOffset uint64
var err error
forwards, newCreatedOffset, endReached, err = s.listForwards("created", s.createdOffset, limit)
if err != nil {
log.Printf("forwardCreatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err)
return
}

log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.createdOffset, len(forwards))
if len(forwards) == 0 {
break
}

err = s.store.InsertForwards(ctx, forwards, s.nodeid)
if err != nil {
log.Printf("forwardCreatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err)
return
}

err = s.store.SetClnForwardOffsets(ctx, s.nodeid, newCreatedOffset, s.updatedOffset)
if err != nil {
log.Printf("forwardCreatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err)
return
}

s.createdOffset = newCreatedOffset
}

log.Printf("forwardCreatedSynchronizeOnce(%x) - Done", s.nodeid)
}

func (s *ForwardSync) forwardUpdatedSynchronizeOnce(ctx context.Context) {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - Begin", s.nodeid)
var limit uint32 = 10000
endReached := false
round := 0
for !endReached {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.updatedOffset)
var forwards []*history.Forward
var newUpdatedOffset uint64
var err error
forwards, newUpdatedOffset, endReached, err = s.listForwards("updated", s.updatedOffset, limit)
if err != nil {
log.Printf("forwardUpdatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err)
return
}

log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.updatedOffset, len(forwards))
if len(forwards) == 0 {
break
}

err = s.store.UpdateForwards(ctx, forwards, s.nodeid)
if err != nil {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err)
return
}

err = s.store.SetClnForwardOffsets(ctx, s.nodeid, s.createdOffset, newUpdatedOffset)
if err != nil {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err)
return
}

s.updatedOffset = newUpdatedOffset
}

log.Printf("forwardUpdatedSynchronizeOnce(%x) - Done", s.nodeid)
}

func (s *ForwardSync) listForwards(index string, offset uint64, limit uint32) ([]*history.Forward, uint64, bool, error) {
var response struct {
Forwards []Forward `json:"forwards"`
}

client, err := s.client.getClient()
if err != nil {
return nil, 0, false, err
}

err = client.Request(&glightning.ListForwardsRequest{
Status: "settled",
Index: index,
Start: offset,
Limit: limit * 2,
}, &response)
if err != nil {
return nil, 0, false, err
}

var result []*history.Forward
endReached := len(response.Forwards) < int(limit)
var lastIndex *uint64
for _, forward := range response.Forwards {
in, err := lightning.NewShortChannelIDFromString(forward.InChannel)
if err != nil {
return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.InChannel, err)
}
out, err := lightning.NewShortChannelIDFromString(forward.OutChannel)
if err != nil {
return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.OutChannel, err)
}

sec, dec := math.Modf(forward.ResolvedTime)
result = append(result, &history.Forward{
Identifier: strconv.FormatUint(forward.CreatedIndex, 10),
InChannel: *in,
OutChannel: *out,
InMsat: forward.InMsat.MSat(),
OutMsat: forward.OutMsat.MSat(),
ResolvedTime: time.Unix(int64(sec), int64(dec*(1e9))),
})
if index == "created" {
lastIndex = &forward.CreatedIndex
} else {
lastIndex = &forward.UpdatedIndex
}
}

var newOffset uint64
if lastIndex == nil {
newOffset = 0
} else {
newOffset = *lastIndex + 1
}
return result, newOffset, endReached, nil
}

type Forward struct {
CreatedIndex uint64 `json:"created_index"`
UpdatedIndex uint64 `json:"updated_index"`
InChannel string `json:"in_channel"`
OutChannel string `json:"out_channel"`
InMsat glightning.Amount `json:"in_msat"`
OutMsat glightning.Amount `json:"out_msat"`
Status string `json:"status"`
PaymentHash string `json:"payment_hash"`
ReceivedTime float64 `json:"received_time"`
ResolvedTime float64 `json:"resolved_time"`
}
1 change: 1 addition & 0 deletions common/nodes_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

type Node struct {
NodeId []byte
Client lightning.Client
NodeConfig *config.NodeConfig
PrivateKey *btcec.PrivateKey
Expand Down
82 changes: 82 additions & 0 deletions history/channel_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package history

import (
"context"
"log"
"sync"
"time"

"github.com/breez/lspd/common"
)

type ChannelSync struct {
nodes []*common.Node
store Store
}

func NewChannelSync(nodes []*common.Node, store Store) *ChannelSync {
return &ChannelSync{
nodes: nodes,
store: store,
}
}

var channelSyncInterval time.Duration = time.Minute * 5

func (s *ChannelSync) ChannelsSynchronize(ctx context.Context) {
s.channelsSynchronizeOnce(ctx)

for {
select {
case <-ctx.Done():
return
case <-time.After(channelSyncInterval):
}

s.channelsSynchronizeOnce(ctx)
}
}

func (s *ChannelSync) channelsSynchronizeOnce(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(len(s.nodes))
for _, n := range s.nodes {
go func(node *common.Node) {
s.channelsSynchronizeNodeOnce(ctx, node)
wg.Done()
}(n)
}
wg.Wait()
}

func (s *ChannelSync) channelsSynchronizeNodeOnce(ctx context.Context, node *common.Node) {
lastUpdate := time.Now()
log.Printf("ChannelsSynchronizeNodeOnce(%x) - Begin %v", node.NodeId, lastUpdate)
channels, err := node.Client.ListChannels()
if err != nil {
log.Printf("ChannelsSynchronizeNodeOnce(%x)- ListChannels error: %v", node.NodeId, err)
return
}

updates := make([]*ChannelUpdate, len(channels))
for i, c := range channels {
if c == nil {
continue
}
updates[i] = &ChannelUpdate{
NodeID: node.NodeId,
PeerId: c.PeerId,
AliasScid: c.AliasScid,
ConfirmedScid: c.ConfirmedScid,
ChannelPoint: c.ChannelPoint,
LastUpdate: lastUpdate,
}
}
err = s.store.UpdateChannels(ctx, updates)
if err != nil {
log.Printf("ChannelsSynchronizeNodeOnce(%x) - store.UpdateChannels error: %v", node.NodeId, err)
return
}

log.Printf("ChannelsSynchronizeNodeOnce(%x) - Done %v", node.NodeId, lastUpdate)
}
Loading
Loading