Skip to content

Commit

Permalink
synchronize forwards
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Feb 5, 2024
1 parent ecf4272 commit 9c20d44
Show file tree
Hide file tree
Showing 6 changed files with 544 additions and 0 deletions.
201 changes: 201 additions & 0 deletions cln/forwards_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package cln

import (
"context"
"fmt"
"log"
"math"
"strconv"
"time"

"github.com/breez/lspd/history"
"github.com/breez/lspd/lightning"
"github.com/elementsproject/glightning/glightning"
)

type ForwardSync struct {
createdOffset uint64
updatedOffset uint64
nodeid []byte
client *ClnClient
store history.Store
}

func NewForwardSync(nodeid []byte, client *ClnClient, store history.Store) *ForwardSync {
return &ForwardSync{
nodeid: nodeid,
client: client,
store: store,
}
}

var forwardSyncInterval time.Duration = time.Minute

func (s *ForwardSync) ForwardsSynchronize(ctx context.Context) {
s.createdOffset, s.updatedOffset, _ = s.store.FetchClnForwardOffsets(ctx, s.nodeid)
s.forwardsSynchronizeOnce(ctx)

for {
select {
case <-ctx.Done():
return
case <-time.After(forwardSyncInterval):
}

s.forwardsSynchronizeOnce(ctx)
}
}

func (s *ForwardSync) forwardsSynchronizeOnce(ctx context.Context) {
s.forwardCreatedSynchronizeOnce(ctx)
s.forwardUpdatedSynchronizeOnce(ctx)
}

func (s *ForwardSync) forwardCreatedSynchronizeOnce(ctx context.Context) {
log.Printf("forwardCreatedSynchronizeOnce(%x) - Begin", s.nodeid)
var limit uint32 = 10000
endReached := false
round := 0
for !endReached {
log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.createdOffset)
var forwards []*history.Forward
var newCreatedOffset uint64
var err error
forwards, newCreatedOffset, endReached, err = s.listForwards("created", s.createdOffset, limit)
if err != nil {
log.Printf("forwardCreatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err)
return
}

log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.createdOffset, len(forwards))
if len(forwards) == 0 {
break
}

err = s.store.InsertForwards(ctx, forwards, s.nodeid)
if err != nil {
log.Printf("forwardCreatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err)
return
}

err = s.store.SetClnForwardOffsets(ctx, s.nodeid, newCreatedOffset, s.updatedOffset)
if err != nil {
log.Printf("forwardCreatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err)
return
}

s.createdOffset = newCreatedOffset
}

log.Printf("forwardCreatedSynchronizeOnce(%x) - Done", s.nodeid)
}

func (s *ForwardSync) forwardUpdatedSynchronizeOnce(ctx context.Context) {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - Begin", s.nodeid)
var limit uint32 = 10000
endReached := false
round := 0
for !endReached {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.updatedOffset)
var forwards []*history.Forward
var newUpdatedOffset uint64
var err error
forwards, newUpdatedOffset, endReached, err = s.listForwards("updated", s.updatedOffset, limit)
if err != nil {
log.Printf("forwardUpdatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err)
return
}

log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.updatedOffset, len(forwards))
if len(forwards) == 0 {
break
}

err = s.store.UpdateForwards(ctx, forwards, s.nodeid)
if err != nil {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err)
return
}

err = s.store.SetClnForwardOffsets(ctx, s.nodeid, s.createdOffset, newUpdatedOffset)
if err != nil {
log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err)
return
}

s.updatedOffset = newUpdatedOffset
}

log.Printf("forwardUpdatedSynchronizeOnce(%x) - Done", s.nodeid)
}

func (s *ForwardSync) listForwards(index string, offset uint64, limit uint32) ([]*history.Forward, uint64, bool, error) {
var response struct {
Forwards []Forward `json:"forwards"`
}

client, err := s.client.getClient()
if err != nil {
return nil, 0, false, err
}

err = client.Request(&glightning.ListForwardsRequest{
Status: "settled",
Index: index,
Start: offset,
Limit: limit * 2,
}, &response)
if err != nil {
return nil, 0, false, err
}

var result []*history.Forward
endReached := len(response.Forwards) < int(limit)
var lastIndex *uint64
for _, forward := range response.Forwards {
in, err := lightning.NewShortChannelIDFromString(forward.InChannel)
if err != nil {
return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.InChannel, err)
}
out, err := lightning.NewShortChannelIDFromString(forward.OutChannel)
if err != nil {
return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.OutChannel, err)
}

sec, dec := math.Modf(forward.ResolvedTime)
result = append(result, &history.Forward{
Identifier: strconv.FormatUint(forward.CreatedIndex, 10),
InChannel: *in,
OutChannel: *out,
InMsat: forward.InMsat.MSat(),
OutMsat: forward.OutMsat.MSat(),
ResolvedTime: time.Unix(int64(sec), int64(dec*(1e9))),
})
if index == "created" {
lastIndex = &forward.CreatedIndex
} else {
lastIndex = &forward.UpdatedIndex
}
}

var newOffset uint64
if lastIndex == nil {
newOffset = 0
} else {
newOffset = *lastIndex + 1
}
return result, newOffset, endReached, nil
}

type Forward struct {
CreatedIndex uint64 `json:"created_index"`
UpdatedIndex uint64 `json:"updated_index"`
InChannel string `json:"in_channel"`
OutChannel string `json:"out_channel"`
InMsat glightning.Amount `json:"in_msat"`
OutMsat glightning.Amount `json:"out_msat"`
Status string `json:"status"`
PaymentHash string `json:"payment_hash"`
ReceivedTime float64 `json:"received_time"`
ResolvedTime float64 `json:"resolved_time"`
}
14 changes: 14 additions & 0 deletions history/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ type ChannelUpdate struct {
LastUpdate time.Time
}

type Forward struct {
Identifier string
InChannel lightning.ShortChannelID
OutChannel lightning.ShortChannelID
InMsat uint64
OutMsat uint64
ResolvedTime time.Time
}

type Store interface {
UpdateChannels(ctx context.Context, updates []*ChannelUpdate) error
InsertForwards(ctx context.Context, forwards []*Forward, nodeId []byte) error
UpdateForwards(ctx context.Context, forwards []*Forward, nodeId []byte) error
FetchClnForwardOffsets(ctx context.Context, nodeId []byte) (uint64, uint64, error)
SetClnForwardOffsets(ctx context.Context, nodeId []byte, created uint64, updated uint64) error
FetchLndForwardOffset(ctx context.Context, nodeId []byte) (*time.Time, error)
}
93 changes: 93 additions & 0 deletions lnd/forwards_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package lnd

import (
"context"
"log"
"strconv"
"time"

"github.com/breez/lspd/history"
"github.com/breez/lspd/lightning"
"github.com/lightningnetwork/lnd/lnrpc"
)

type ForwardSync struct {
nodeid []byte
client *LndClient
store history.Store
}

func NewForwardSync(
nodeid []byte,
client *LndClient,
store history.Store,
) *ForwardSync {
return &ForwardSync{
nodeid: nodeid,
client: client,
store: store,
}
}

var forwardChannelSyncInterval time.Duration = time.Minute * 5

func (s *ForwardSync) ForwardsSynchronize(ctx context.Context) {
s.forwardsSynchronizeOnce(ctx)

for {
select {
case <-ctx.Done():
return
case <-time.After(forwardChannelSyncInterval):
}

s.forwardsSynchronizeOnce(ctx)
}
}

func (s *ForwardSync) forwardsSynchronizeOnce(ctx context.Context) {
last, err := s.store.FetchLndForwardOffset(ctx, s.nodeid)
if err != nil {
log.Printf("forwardsSynchronizeOnce(%x) - FetchLndForwardOffset err: %v", s.nodeid, err)
return
}

var startTime uint64
if last != nil {
startTime = uint64(last.UnixNano())
}

for {
forwardHistory, err := s.client.client.ForwardingHistory(context.Background(), &lnrpc.ForwardingHistoryRequest{
StartTime: startTime,
NumMaxEvents: 10000,
})
if err != nil {
log.Printf("forwardsSynchronizeOnce(%x) - ForwardingHistory error: %v", s.nodeid, err)
return
}

log.Printf("forwardsSynchronizeOnce(%x) - startTime: %v, Events: %v", s.nodeid, startTime, len(forwardHistory.ForwardingEvents))
if len(forwardHistory.ForwardingEvents) == 0 {
break
}

forwards := make([]*history.Forward, len(forwardHistory.ForwardingEvents))
for i, f := range forwardHistory.ForwardingEvents {
forwards[i] = &history.Forward{
Identifier: strconv.FormatUint(f.TimestampNs, 10) + "|" + strconv.FormatUint(f.AmtInMsat, 10),
InChannel: lightning.ShortChannelID(f.ChanIdIn),
OutChannel: lightning.ShortChannelID(f.ChanIdOut),
InMsat: f.AmtInMsat,
OutMsat: f.AmtOutMsat,
ResolvedTime: time.Unix(0, int64(f.TimestampNs)),
}
startTime = f.TimestampNs
}
s.store.InsertForwards(ctx, forwards, s.nodeid)
if err != nil {
log.Printf("forwardsSynchronizeOnce(%x) - store.InsertForwards() error: %v", s.nodeid, err)
return
}
}
}
15 changes: 15 additions & 0 deletions lsps2/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,21 @@ type mockHistoryStore struct{}
func (s *mockHistoryStore) UpdateChannels(ctx context.Context, updates []*history.ChannelUpdate) error {
return nil
}
func (s *mockHistoryStore) InsertForwards(ctx context.Context, forwards []*history.Forward, nodeId []byte) error {
return nil
}
func (s *mockHistoryStore) UpdateForwards(ctx context.Context, forwards []*history.Forward, nodeId []byte) error {
return nil
}
func (s *mockHistoryStore) FetchClnForwardOffsets(ctx context.Context, nodeId []byte) (uint64, uint64, error) {
return 0, 0, ErrNotImplemented
}
func (s *mockHistoryStore) SetClnForwardOffsets(ctx context.Context, nodeId []byte, created uint64, updated uint64) error {
return nil
}
func (s *mockHistoryStore) FetchLndForwardOffset(ctx context.Context, nodeId []byte) (*time.Time, error) {
return nil, ErrNotImplemented
}

type mockLightningClient struct {
openResponses []*wire.OutPoint
Expand Down
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func main() {
}

client.StartListeners()

forwardSync := lnd.NewForwardSync(node.NodeId, client, historyStore)
go forwardSync.ForwardsSynchronize(ctx)
interceptor := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, historyStore, openingService, feeEstimator, feeStrategy, notificationService)
htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, interceptor)
if err != nil {
Expand All @@ -141,6 +144,8 @@ func main() {
log.Fatalf("failed to initialize CLN client: %v", err)
}

forwardSync := cln.NewForwardSync(node.NodeId, client, historyStore)
go forwardSync.ForwardsSynchronize(ctx)
legacyHandler := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, historyStore, openingService, feeEstimator, feeStrategy, notificationService)
lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, historyStore, openingService, client, feeEstimator, &lsps2.InterceptorConfig{
NodeId: node.NodeId,
Expand Down
Loading

0 comments on commit 9c20d44

Please sign in to comment.