Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lsps2 forwarding #125

Merged
merged 17 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cln/cln_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (c *ClnClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh
return &lightning.GetChannelResult{
InitialChannelID: *initialChanID,
ConfirmedChannelID: *confirmedChanID,
HtlcMinimumMsat: c.HtlcMinMilliSatoshi,
}, nil
}
}
Expand Down
96 changes: 76 additions & 20 deletions cln_plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/breez/lspd/cln_plugin/proto"
orderedmap "github.com/wk8/go-ordered-map/v2"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
Expand Down Expand Up @@ -52,6 +53,7 @@ type server struct {
htlcStream proto.ClnPlugin_HtlcStreamServer
htlcSendQueue chan *htlcAcceptedMsg
htlcRecvQueue chan *htlcResultMsg
inflightHtlcs *orderedmap.OrderedMap[string, *htlcAcceptedMsg]
custommsgNewSubscriber chan struct{}
custommsgStream proto.ClnPlugin_CustomMsgStreamServer
custommsgSendQueue chan *custommsgMsg
Expand All @@ -71,6 +73,7 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server {
// cln plugin. If there is no subscriber active within the subscriber
// timeout period these results can be put directly on the receive queue.
htlcRecvQueue: make(chan *htlcResultMsg, 10000),
inflightHtlcs: orderedmap.New[string, *htlcAcceptedMsg](),
custommsgRecvQueue: make(chan *custommsgResultMsg, 10000),
started: make(chan struct{}),
startError: make(chan error, 1),
Expand Down Expand Up @@ -162,6 +165,19 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
return fmt.Errorf("already subscribed")
}

newTimeout := time.Now().Add(s.subscriberTimeout)
// Replay in-flight htlcs in fifo order
for pair := s.inflightHtlcs.Oldest(); pair != nil; pair = pair.Next() {
err := sendHtlcAccepted(stream, pair.Value)
if err != nil {
s.mtx.Unlock()
return err
}

// Reset the subscriber timeout for this htlc.
pair.Value.timeout = newTimeout
}

s.htlcStream = stream

// Notify listeners that a new subscriber is active. Replace the chan with
Expand Down Expand Up @@ -199,6 +215,9 @@ func (s *server) ReceiveHtlcResolution() (string, interface{}) {
case <-s.done:
return "", nil
case msg := <-s.htlcRecvQueue:
s.mtx.Lock()
s.inflightHtlcs.Delete(msg.id)
s.mtx.Unlock()
return msg.id, msg.result
}
}
Expand Down Expand Up @@ -258,31 +277,22 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
}
}

// Add the htlc to in-flight htlcs
s.mtx.Lock()
s.inflightHtlcs.Set(msg.id, msg)

// There is a subscriber. Attempt to send the htlc_accepted message.
err := stream.Send(&proto.HtlcAccepted{
Correlationid: msg.id,
Onion: &proto.Onion{
Payload: msg.htlc.Onion.Payload,
ShortChannelId: msg.htlc.Onion.ShortChannelId,
ForwardMsat: msg.htlc.Onion.ForwardMsat,
OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue,
SharedSecret: msg.htlc.Onion.SharedSecret,
NextOnion: msg.htlc.Onion.NextOnion,
},
Htlc: &proto.Htlc{
ShortChannelId: msg.htlc.Htlc.ShortChannelId,
Id: msg.htlc.Htlc.Id,
AmountMsat: msg.htlc.Htlc.AmountMsat,
CltvExpiry: msg.htlc.Htlc.CltvExpiry,
CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative,
PaymentHash: msg.htlc.Htlc.PaymentHash,
},
ForwardTo: msg.htlc.ForwardTo,
})
err := sendHtlcAccepted(stream, msg)

// If there is no error, we're done.
if err == nil {
s.mtx.Unlock()
return
} else {
// Remove the htlc from inflight htlcs again on error, so it won't
// get replayed twice in a row.
s.inflightHtlcs.Delete(msg.id)
s.mtx.Unlock()
}

// If we end up here, there was an error sending the message to the
Expand Down Expand Up @@ -324,6 +334,11 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
s.mtx.Lock()
stream := s.htlcStream
ns := s.htlcnewSubscriber
oldestHtlc := s.inflightHtlcs.Oldest()
var htlcTimeout time.Duration = 1 << 62 // practically infinite
if oldestHtlc != nil {
htlcTimeout = time.Until(oldestHtlc.Value.timeout)
}
s.mtx.Unlock()

if stream == nil {
Expand All @@ -335,6 +350,24 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
case <-ns:
log.Printf("New subscription available for htlc receive, continue receive.")
continue
case <-time.After(htlcTimeout):
log.Printf(
"WARNING: htlc with id '%s' timed out after '%v' waiting "+
"for grpc subscriber: %+v",
oldestHtlc.Value.id,
s.subscriberTimeout,
oldestHtlc.Value.htlc,
)

// If the subscriber timeout expires while holding a htlc
// we short circuit the htlc by sending the default result
// (continue) to cln.
return &proto.HtlcResolution{
Correlationid: oldestHtlc.Value.id,
Outcome: &proto.HtlcResolution_Continue{
Continue: &proto.HtlcContinue{},
},
}
}
}

Expand Down Expand Up @@ -561,3 +594,26 @@ func (s *server) defaultResult() interface{} {
"result": "continue",
}
}

func sendHtlcAccepted(stream proto.ClnPlugin_HtlcStreamServer, msg *htlcAcceptedMsg) error {
return stream.Send(&proto.HtlcAccepted{
Correlationid: msg.id,
Onion: &proto.Onion{
Payload: msg.htlc.Onion.Payload,
ShortChannelId: msg.htlc.Onion.ShortChannelId,
ForwardMsat: msg.htlc.Onion.ForwardMsat,
OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue,
SharedSecret: msg.htlc.Onion.SharedSecret,
NextOnion: msg.htlc.Onion.NextOnion,
},
Htlc: &proto.Htlc{
ShortChannelId: msg.htlc.Htlc.ShortChannelId,
Id: msg.htlc.Htlc.Id,
AmountMsat: msg.htlc.Htlc.AmountMsat,
CltvExpiry: msg.htlc.Htlc.CltvExpiry,
CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative,
PaymentHash: msg.htlc.Htlc.PaymentHash,
},
ForwardTo: msg.htlc.ForwardTo,
})
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ require (
require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/ethereum/go-ethereum v1.10.17 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/lightninglabs/neutrino/cache v1.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/moby/term v0.0.0-20221120202655-abb19827d345 // indirect
github.com/morikuni/aec v1.0.0 // indirect
Expand Down Expand Up @@ -153,6 +156,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
Expand Down
1 change: 1 addition & 0 deletions lightning/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type GetInfoResult struct {
type GetChannelResult struct {
InitialChannelID basetypes.ShortChannelID
ConfirmedChannelID basetypes.ShortChannelID
HtlcMinimumMsat uint64
}

type OpenChannelRequest struct {
Expand Down
1 change: 1 addition & 0 deletions lnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (c *LndClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh
return &lightning.GetChannelResult{
InitialChannelID: basetypes.ShortChannelID(c.ChanId),
ConfirmedChannelID: basetypes.ShortChannelID(confirmedChanId),
HtlcMinimumMsat: c.LocalConstraints.MinHtlcMsat,
}, nil
}
}
Expand Down
143 changes: 143 additions & 0 deletions lsps2/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package lsps2

import (
"context"
"errors"
"fmt"
"time"

"github.com/breez/lspd/basetypes"
"github.com/breez/lspd/chain"
"github.com/breez/lspd/lightning"
"github.com/breez/lspd/shared"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
)

var ErrNotImplemented = errors.New("not implemented")

type mockNodesService struct {
node *shared.Node
err error
}

func (m *mockNodesService) GetNode(token string) (*shared.Node, error) {
return m.node, m.err
}

func (m *mockNodesService) GetNodes() []*shared.Node {
return []*shared.Node{m.node}
}

type mockOpeningService struct {
menu []*shared.OpeningFeeParams
err error
invalid bool
}

func (m *mockOpeningService) GetFeeParamsMenu(
token string,
privateKey *btcec.PrivateKey,
) ([]*shared.OpeningFeeParams, error) {
return m.menu, m.err
}

func (m *mockOpeningService) ValidateOpeningFeeParams(
params *shared.OpeningFeeParams,
publicKey *btcec.PublicKey,
) bool {
return !m.invalid
}

type mockLsps2Store struct {
err error
req *RegisterBuy
registrations map[uint64]*BuyRegistration
delay time.Duration
}

func (s *mockLsps2Store) RegisterBuy(ctx context.Context, req *RegisterBuy) error {
s.req = req
return s.err
}

func (s *mockLsps2Store) GetBuyRegistration(ctx context.Context, scid basetypes.ShortChannelID) (*BuyRegistration, error) {
if s.delay.Nanoseconds() != 0 {
<-time.After(s.delay)
}
reg, ok := s.registrations[uint64(scid)]
if !ok {
return nil, ErrNotFound
}
return reg, nil
}

func (s *mockLsps2Store) SetChannelOpened(ctx context.Context, channelOpened *ChannelOpened) error {
return s.err
}

func (s *mockLsps2Store) SetCompleted(ctx context.Context, registrationId uint64) error {
return nil
}

type mockLightningClient struct {
openResponses []*wire.OutPoint
openRequests []*lightning.OpenChannelRequest
getChanRequests []*wire.OutPoint
getChanResponses []*lightning.GetChannelResult
}

func (c *mockLightningClient) GetInfo() (*lightning.GetInfoResult, error) {
return nil, ErrNotImplemented
}
func (c *mockLightningClient) IsConnected(destination []byte) (bool, error) {
return false, ErrNotImplemented
}
func (c *mockLightningClient) OpenChannel(req *lightning.OpenChannelRequest) (*wire.OutPoint, error) {
c.openRequests = append(c.openRequests, req)
if len(c.openResponses) < len(c.openRequests) {
return nil, fmt.Errorf("no responses available")
}

res := c.openResponses[len(c.openRequests)-1]
if res == nil {
return nil, fmt.Errorf("no response available")
}
return res, nil
}

func (c *mockLightningClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*lightning.GetChannelResult, error) {
c.getChanRequests = append(c.getChanRequests, &channelPoint)
if len(c.getChanResponses) < len(c.getChanRequests) {
return nil, fmt.Errorf("no responses available")
}

res := c.getChanResponses[len(c.getChanRequests)-1]
if res == nil {
return nil, fmt.Errorf("no response available")
}
return res, nil
}

func (c *mockLightningClient) GetPeerId(scid *basetypes.ShortChannelID) ([]byte, error) {
return nil, ErrNotImplemented
}
func (c *mockLightningClient) GetNodeChannelCount(nodeID []byte) (int, error) {
return 0, ErrNotImplemented
}
func (c *mockLightningClient) GetClosedChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) {
return nil, ErrNotImplemented
}
func (c *mockLightningClient) WaitOnline(peerID []byte, deadline time.Time) error {
return ErrNotImplemented
}
func (c *mockLightningClient) WaitChannelActive(peerID []byte, deadline time.Time) error {
return ErrNotImplemented
}

type mockFeeEstimator struct {
}

func (f *mockFeeEstimator) EstimateFeeRate(context.Context, chain.FeeStrategy) (*chain.FeeEstimation, error) {
return nil, ErrNotImplemented
}
Loading