From 9c20d44f7e0fc5ac30b9ffea80c601e7667febbe Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 29 Dec 2023 11:49:10 +0100 Subject: [PATCH] synchronize forwards --- cln/forwards_sync.go | 201 +++++++++++++++++++++++++++++++++ history/store.go | 14 +++ lnd/forwards_sync.go | 93 ++++++++++++++++ lsps2/mocks.go | 15 +++ main.go | 5 + postgresql/history_store.go | 216 ++++++++++++++++++++++++++++++++++++ 6 files changed, 544 insertions(+) create mode 100644 cln/forwards_sync.go create mode 100644 lnd/forwards_sync.go diff --git a/cln/forwards_sync.go b/cln/forwards_sync.go new file mode 100644 index 00000000..611db732 --- /dev/null +++ b/cln/forwards_sync.go @@ -0,0 +1,201 @@ +package cln + +import ( + "context" + "fmt" + "log" + "math" + "strconv" + "time" + + "github.com/breez/lspd/history" + "github.com/breez/lspd/lightning" + "github.com/elementsproject/glightning/glightning" +) + +type ForwardSync struct { + createdOffset uint64 + updatedOffset uint64 + nodeid []byte + client *ClnClient + store history.Store +} + +func NewForwardSync(nodeid []byte, client *ClnClient, store history.Store) *ForwardSync { + return &ForwardSync{ + nodeid: nodeid, + client: client, + store: store, + } +} + +var forwardSyncInterval time.Duration = time.Minute + +func (s *ForwardSync) ForwardsSynchronize(ctx context.Context) { + s.createdOffset, s.updatedOffset, _ = s.store.FetchClnForwardOffsets(ctx, s.nodeid) + s.forwardsSynchronizeOnce(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(forwardSyncInterval): + } + + s.forwardsSynchronizeOnce(ctx) + } +} + +func (s *ForwardSync) forwardsSynchronizeOnce(ctx context.Context) { + s.forwardCreatedSynchronizeOnce(ctx) + s.forwardUpdatedSynchronizeOnce(ctx) +} + +func (s *ForwardSync) forwardCreatedSynchronizeOnce(ctx context.Context) { + log.Printf("forwardCreatedSynchronizeOnce(%x) - Begin", s.nodeid) + var limit uint32 = 10000 + endReached := false + round := 0 + for !endReached { + log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.createdOffset) + var forwards []*history.Forward + var newCreatedOffset uint64 + var err error + forwards, newCreatedOffset, endReached, err = s.listForwards("created", s.createdOffset, limit) + if err != nil { + log.Printf("forwardCreatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err) + return + } + + log.Printf("forwardCreatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.createdOffset, len(forwards)) + if len(forwards) == 0 { + break + } + + err = s.store.InsertForwards(ctx, forwards, s.nodeid) + if err != nil { + log.Printf("forwardCreatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err) + return + } + + err = s.store.SetClnForwardOffsets(ctx, s.nodeid, newCreatedOffset, s.updatedOffset) + if err != nil { + log.Printf("forwardCreatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err) + return + } + + s.createdOffset = newCreatedOffset + } + + log.Printf("forwardCreatedSynchronizeOnce(%x) - Done", s.nodeid) +} + +func (s *ForwardSync) forwardUpdatedSynchronizeOnce(ctx context.Context) { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - Begin", s.nodeid) + var limit uint32 = 10000 + endReached := false + round := 0 + for !endReached { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v", s.nodeid, round, s.updatedOffset) + var forwards []*history.Forward + var newUpdatedOffset uint64 + var err error + forwards, newUpdatedOffset, endReached, err = s.listForwards("updated", s.updatedOffset, limit) + if err != nil { + log.Printf("forwardUpdatedSynchronizeOnce(%x)- ListForwards error: %v", s.nodeid, err) + return + } + + log.Printf("forwardUpdatedSynchronizeOnce(%x) - round %v, offset %v yielded %v forwards", s.nodeid, round, s.updatedOffset, len(forwards)) + if len(forwards) == 0 { + break + } + + err = s.store.UpdateForwards(ctx, forwards, s.nodeid) + if err != nil { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.InsertForwards error: %v", s.nodeid, err) + return + } + + err = s.store.SetClnForwardOffsets(ctx, s.nodeid, s.createdOffset, newUpdatedOffset) + if err != nil { + log.Printf("forwardUpdatedSynchronizeOnce(%x) - store.SetClnForwardOffsets error: %v", s.nodeid, err) + return + } + + s.updatedOffset = newUpdatedOffset + } + + log.Printf("forwardUpdatedSynchronizeOnce(%x) - Done", s.nodeid) +} + +func (s *ForwardSync) listForwards(index string, offset uint64, limit uint32) ([]*history.Forward, uint64, bool, error) { + var response struct { + Forwards []Forward `json:"forwards"` + } + + client, err := s.client.getClient() + if err != nil { + return nil, 0, false, err + } + + err = client.Request(&glightning.ListForwardsRequest{ + Status: "settled", + Index: index, + Start: offset, + Limit: limit * 2, + }, &response) + if err != nil { + return nil, 0, false, err + } + + var result []*history.Forward + endReached := len(response.Forwards) < int(limit) + var lastIndex *uint64 + for _, forward := range response.Forwards { + in, err := lightning.NewShortChannelIDFromString(forward.InChannel) + if err != nil { + return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.InChannel, err) + } + out, err := lightning.NewShortChannelIDFromString(forward.OutChannel) + if err != nil { + return nil, 0, false, fmt.Errorf("NewShortChannelIDFromString(%s) error: %w", forward.OutChannel, err) + } + + sec, dec := math.Modf(forward.ResolvedTime) + result = append(result, &history.Forward{ + Identifier: strconv.FormatUint(forward.CreatedIndex, 10), + InChannel: *in, + OutChannel: *out, + InMsat: forward.InMsat.MSat(), + OutMsat: forward.OutMsat.MSat(), + ResolvedTime: time.Unix(int64(sec), int64(dec*(1e9))), + }) + if index == "created" { + lastIndex = &forward.CreatedIndex + } else { + lastIndex = &forward.UpdatedIndex + } + } + + var newOffset uint64 + if lastIndex == nil { + newOffset = 0 + } else { + newOffset = *lastIndex + 1 + } + return result, newOffset, endReached, nil +} + +type Forward struct { + CreatedIndex uint64 `json:"created_index"` + UpdatedIndex uint64 `json:"updated_index"` + InChannel string `json:"in_channel"` + OutChannel string `json:"out_channel"` + InMsat glightning.Amount `json:"in_msat"` + OutMsat glightning.Amount `json:"out_msat"` + Status string `json:"status"` + PaymentHash string `json:"payment_hash"` + ReceivedTime float64 `json:"received_time"` + ResolvedTime float64 `json:"resolved_time"` +} diff --git a/history/store.go b/history/store.go index 29107e02..2238e629 100644 --- a/history/store.go +++ b/history/store.go @@ -17,6 +17,20 @@ type ChannelUpdate struct { LastUpdate time.Time } +type Forward struct { + Identifier string + InChannel lightning.ShortChannelID + OutChannel lightning.ShortChannelID + InMsat uint64 + OutMsat uint64 + ResolvedTime time.Time +} + type Store interface { UpdateChannels(ctx context.Context, updates []*ChannelUpdate) error + InsertForwards(ctx context.Context, forwards []*Forward, nodeId []byte) error + UpdateForwards(ctx context.Context, forwards []*Forward, nodeId []byte) error + FetchClnForwardOffsets(ctx context.Context, nodeId []byte) (uint64, uint64, error) + SetClnForwardOffsets(ctx context.Context, nodeId []byte, created uint64, updated uint64) error + FetchLndForwardOffset(ctx context.Context, nodeId []byte) (*time.Time, error) } diff --git a/lnd/forwards_sync.go b/lnd/forwards_sync.go new file mode 100644 index 00000000..8e9d09ef --- /dev/null +++ b/lnd/forwards_sync.go @@ -0,0 +1,93 @@ +package lnd + +import ( + "context" + "log" + "strconv" + "time" + + "github.com/breez/lspd/history" + "github.com/breez/lspd/lightning" + "github.com/lightningnetwork/lnd/lnrpc" +) + +type ForwardSync struct { + nodeid []byte + client *LndClient + store history.Store +} + +func NewForwardSync( + nodeid []byte, + client *LndClient, + store history.Store, +) *ForwardSync { + return &ForwardSync{ + nodeid: nodeid, + client: client, + store: store, + } +} + +var forwardChannelSyncInterval time.Duration = time.Minute * 5 + +func (s *ForwardSync) ForwardsSynchronize(ctx context.Context) { + s.forwardsSynchronizeOnce(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(forwardChannelSyncInterval): + } + + s.forwardsSynchronizeOnce(ctx) + } +} + +func (s *ForwardSync) forwardsSynchronizeOnce(ctx context.Context) { + last, err := s.store.FetchLndForwardOffset(ctx, s.nodeid) + if err != nil { + log.Printf("forwardsSynchronizeOnce(%x) - FetchLndForwardOffset err: %v", s.nodeid, err) + return + } + + var startTime uint64 + if last != nil { + startTime = uint64(last.UnixNano()) + } + + for { + forwardHistory, err := s.client.client.ForwardingHistory(context.Background(), &lnrpc.ForwardingHistoryRequest{ + StartTime: startTime, + NumMaxEvents: 10000, + }) + if err != nil { + log.Printf("forwardsSynchronizeOnce(%x) - ForwardingHistory error: %v", s.nodeid, err) + return + } + + log.Printf("forwardsSynchronizeOnce(%x) - startTime: %v, Events: %v", s.nodeid, startTime, len(forwardHistory.ForwardingEvents)) + if len(forwardHistory.ForwardingEvents) == 0 { + break + } + + forwards := make([]*history.Forward, len(forwardHistory.ForwardingEvents)) + for i, f := range forwardHistory.ForwardingEvents { + forwards[i] = &history.Forward{ + Identifier: strconv.FormatUint(f.TimestampNs, 10) + "|" + strconv.FormatUint(f.AmtInMsat, 10), + InChannel: lightning.ShortChannelID(f.ChanIdIn), + OutChannel: lightning.ShortChannelID(f.ChanIdOut), + InMsat: f.AmtInMsat, + OutMsat: f.AmtOutMsat, + ResolvedTime: time.Unix(0, int64(f.TimestampNs)), + } + startTime = f.TimestampNs + } + s.store.InsertForwards(ctx, forwards, s.nodeid) + if err != nil { + log.Printf("forwardsSynchronizeOnce(%x) - store.InsertForwards() error: %v", s.nodeid, err) + return + } + } +} diff --git a/lsps2/mocks.go b/lsps2/mocks.go index 62d57912..717fb33c 100644 --- a/lsps2/mocks.go +++ b/lsps2/mocks.go @@ -102,6 +102,21 @@ type mockHistoryStore struct{} func (s *mockHistoryStore) UpdateChannels(ctx context.Context, updates []*history.ChannelUpdate) error { return nil } +func (s *mockHistoryStore) InsertForwards(ctx context.Context, forwards []*history.Forward, nodeId []byte) error { + return nil +} +func (s *mockHistoryStore) UpdateForwards(ctx context.Context, forwards []*history.Forward, nodeId []byte) error { + return nil +} +func (s *mockHistoryStore) FetchClnForwardOffsets(ctx context.Context, nodeId []byte) (uint64, uint64, error) { + return 0, 0, ErrNotImplemented +} +func (s *mockHistoryStore) SetClnForwardOffsets(ctx context.Context, nodeId []byte, created uint64, updated uint64) error { + return nil +} +func (s *mockHistoryStore) FetchLndForwardOffset(ctx context.Context, nodeId []byte) (*time.Time, error) { + return nil, ErrNotImplemented +} type mockLightningClient struct { openResponses []*wire.OutPoint diff --git a/main.go b/main.go index 3538966a..d8416634 100644 --- a/main.go +++ b/main.go @@ -128,6 +128,9 @@ func main() { } client.StartListeners() + + forwardSync := lnd.NewForwardSync(node.NodeId, client, historyStore) + go forwardSync.ForwardsSynchronize(ctx) interceptor := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, historyStore, openingService, feeEstimator, feeStrategy, notificationService) htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, interceptor) if err != nil { @@ -141,6 +144,8 @@ func main() { log.Fatalf("failed to initialize CLN client: %v", err) } + forwardSync := cln.NewForwardSync(node.NodeId, client, historyStore) + go forwardSync.ForwardsSynchronize(ctx) legacyHandler := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, historyStore, openingService, feeEstimator, feeStrategy, notificationService) lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, historyStore, openingService, client, feeEstimator, &lsps2.InterceptorConfig{ NodeId: node.NodeId, diff --git a/postgresql/history_store.go b/postgresql/history_store.go index acca5ca2..675656d2 100644 --- a/postgresql/history_store.go +++ b/postgresql/history_store.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "time" "github.com/GoWebProd/uuid7" "github.com/breez/lspd/history" @@ -67,6 +68,36 @@ func (cfe *copyFromChanUpdates) Err() error { return cfe.err } +type copyFromForwards struct { + forwards []*history.Forward + nodeid []byte + idx int + err error +} + +func (cfe *copyFromForwards) Next() bool { + cfe.idx++ + return cfe.idx < len(cfe.forwards) +} + +func (cfe *copyFromForwards) Values() ([]interface{}, error) { + forward := cfe.forwards[cfe.idx] + values := []interface{}{ + forward.Identifier, + forward.ResolvedTime.UnixNano(), + cfe.nodeid, + int64(uint64(forward.InChannel)), + int64(uint64(forward.OutChannel)), + int64(forward.InMsat), + int64(forward.OutMsat), + } + return values, nil +} + +func (cfe *copyFromForwards) Err() error { + return cfe.err +} + type HistoryStore struct { pool *pgxpool.Pool generator *uuid7.Generator @@ -129,3 +160,188 @@ func (s *HistoryStore) UpdateChannels( return tx.Commit(ctx) } + +func (s *HistoryStore) InsertForwards( + ctx context.Context, + forwards []*history.Forward, + nodeid []byte, +) error { + if len(forwards) == 0 { + return nil + } + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("pgxPool.Begin() error: %w", err) + } + defer tx.Rollback(ctx) + + rowSrc := copyFromForwards{ + forwards: forwards, + nodeid: nodeid, + idx: -1, + } + + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_table ON COMMIT DROP AS + SELECT * + FROM forwarding_history + WITH NO DATA; + `) + if err != nil { + return fmt.Errorf("CREATE TEMP TABLE error: %w", err) + } + + count, err := tx.CopyFrom( + ctx, + pgx.Identifier{"tmp_table"}, + []string{"identifier", "resolved_time", "nodeid", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, + &rowSrc, + ) + if err != nil { + return fmt.Errorf("CopyFrom() error: %w", err) + } + log.Printf("InsertForwards node %x count1: %v", nodeid, count) + + cmdTag, err := tx.Exec(ctx, ` + INSERT INTO forwarding_history + SELECT * + FROM tmp_table + ON CONFLICT (nodeid, identifier) DO NOTHING + `) + if err != nil { + return fmt.Errorf("INSERT INTO forwarding_history error: %w", err) + } + log.Printf("InsertForwards node %x count2: %v", nodeid, cmdTag.RowsAffected()) + + return tx.Commit(ctx) +} + +func (s *HistoryStore) UpdateForwards( + ctx context.Context, + forwards []*history.Forward, + nodeid []byte, +) error { + if len(forwards) == 0 { + return nil + } + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("pgxPool.Begin() error: %w", err) + } + defer tx.Rollback(ctx) + + rowSrc := copyFromForwards{ + forwards: forwards, + nodeid: nodeid, + idx: -1, + } + + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_table ON COMMIT DROP AS + SELECT * + FROM forwarding_history + WITH NO DATA; + `) + if err != nil { + return fmt.Errorf("CREATE TEMP TABLE error: %w", err) + } + + count, err := tx.CopyFrom( + ctx, + pgx.Identifier{"tmp_table"}, + []string{"identifier", "resolved_time", "nodeid", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, + &rowSrc, + ) + if err != nil { + return fmt.Errorf("CopyFrom() error: %w", err) + } + log.Printf("UpdateForwards node %x count1: %v", nodeid, count) + + cmdTag, err := tx.Exec(ctx, ` + INSERT INTO forwarding_history + SELECT * + FROM tmp_table + ON CONFLICT (nodeid, identifier) DO UPDATE SET + resolved_time = EXCLUDED.resolved_time, + chanid_in = EXCLUDED.chanid_in, + chanid_out = EXCLUDED.chanid_out, + amt_msat_in = EXCLUDED.amt_msat_in, + amt_msat_out = EXCLUDED.amt_msat_out + `) + if err != nil { + return fmt.Errorf("INSERT INTO forwarding_history error: %w", err) + } + log.Printf("UpdateForwards node %x count2: %v", nodeid, cmdTag.RowsAffected()) + + return tx.Commit(ctx) +} + +func (s *HistoryStore) FetchClnForwardOffsets( + ctx context.Context, + nodeId []byte, +) (uint64, uint64, error) { + row := s.pool.QueryRow(ctx, ` + SELECT last_created_index, last_updated_index + FROM public.cln_forwarding_history_offsets + WHERE nodeid = $1 + `, + nodeId) + + var created int64 + var updated int64 + err := row.Scan(&created, &updated) + if err == pgx.ErrNoRows { + return 0, 0, nil + } + if err != nil { + return 0, 0, err + } + + return uint64(created), uint64(updated), nil +} + +func (s *HistoryStore) FetchLndForwardOffset( + ctx context.Context, + nodeId []byte, +) (*time.Time, error) { + row := s.pool.QueryRow(ctx, ` + SELECT MAX(resolved_time) + FROM forwarding_history + WHERE nodeid = $1 + `, + nodeId) + var t *int64 + err := row.Scan(&t) + if err == pgx.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + if t == nil { + return nil, nil + } + + tt := time.Unix(0, *t) + return &tt, nil +} + +func (s *HistoryStore) SetClnForwardOffsets( + ctx context.Context, + nodeId []byte, + created uint64, + updated uint64, +) error { + _, err := s.pool.Exec(ctx, ` + INSERT INTO public.cln_forwarding_history_offsets (nodeid, last_created_index, last_updated_index) + VALUES($1, $2, $3) + ON CONFLICT (nodeid) DO UPDATE SET last_created_index = EXCLUDED.last_created_index, last_updated_index = EXCLUDED.last_updated_index + `, + nodeId, + int64(created), + int64(updated), + ) + return err +}