Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lsps2 forwarding #125

Merged
merged 17 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions channel_opener_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ func (s *channelOpenerServer) RegisterPayment(
log.Printf("checkPayment(%v, %v) error: %v", pi.IncomingAmountMsat, pi.OutgoingAmountMsat, err)
return nil, fmt.Errorf("checkPayment(%v, %v) error: %v", pi.IncomingAmountMsat, pi.OutgoingAmountMsat, err)
}
params := &interceptor.OpeningFeeParams{
MinMsat: pi.OpeningFeeParams.MinMsat,
params := &shared.OpeningFeeParams{
MinFeeMsat: pi.OpeningFeeParams.MinMsat,
Proportional: pi.OpeningFeeParams.Proportional,
ValidUntil: pi.OpeningFeeParams.ValidUntil,
MaxIdleTime: pi.OpeningFeeParams.MaxIdleTime,
MinLifetime: pi.OpeningFeeParams.MaxIdleTime,
MaxClientToSelfDelay: pi.OpeningFeeParams.MaxClientToSelfDelay,
Promise: pi.OpeningFeeParams.Promise,
}
Expand Down
1 change: 1 addition & 0 deletions cln/cln_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (c *ClnClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh
return &lightning.GetChannelResult{
InitialChannelID: *initialChanID,
ConfirmedChannelID: *confirmedChanID,
HtlcMinimumMsat: c.HtlcMinMilliSatoshi,
}, nil
}
}
Expand Down
96 changes: 76 additions & 20 deletions cln_plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/breez/lspd/cln_plugin/proto"
orderedmap "github.com/wk8/go-ordered-map/v2"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
Expand Down Expand Up @@ -52,6 +53,7 @@ type server struct {
htlcStream proto.ClnPlugin_HtlcStreamServer
htlcSendQueue chan *htlcAcceptedMsg
htlcRecvQueue chan *htlcResultMsg
inflightHtlcs *orderedmap.OrderedMap[string, *htlcAcceptedMsg]
custommsgNewSubscriber chan struct{}
custommsgStream proto.ClnPlugin_CustomMsgStreamServer
custommsgSendQueue chan *custommsgMsg
Expand All @@ -71,6 +73,7 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server {
// cln plugin. If there is no subscriber active within the subscriber
// timeout period these results can be put directly on the receive queue.
htlcRecvQueue: make(chan *htlcResultMsg, 10000),
inflightHtlcs: orderedmap.New[string, *htlcAcceptedMsg](),
custommsgRecvQueue: make(chan *custommsgResultMsg, 10000),
started: make(chan struct{}),
startError: make(chan error, 1),
Expand Down Expand Up @@ -162,6 +165,19 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
return fmt.Errorf("already subscribed")
}

newTimeout := time.Now().Add(s.subscriberTimeout)
// Replay in-flight htlcs in fifo order
for pair := s.inflightHtlcs.Oldest(); pair != nil; pair = pair.Next() {
err := sendHtlcAccepted(stream, pair.Value)
if err != nil {
s.mtx.Unlock()
return err
}

// Reset the subscriber timeout for this htlc.
pair.Value.timeout = newTimeout
}

s.htlcStream = stream

// Notify listeners that a new subscriber is active. Replace the chan with
Expand Down Expand Up @@ -199,6 +215,9 @@ func (s *server) ReceiveHtlcResolution() (string, interface{}) {
case <-s.done:
return "", nil
case msg := <-s.htlcRecvQueue:
s.mtx.Lock()
s.inflightHtlcs.Delete(msg.id)
s.mtx.Unlock()
return msg.id, msg.result
}
}
Expand Down Expand Up @@ -258,31 +277,22 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
}
}

// Add the htlc to in-flight htlcs
s.mtx.Lock()
s.inflightHtlcs.Set(msg.id, msg)

// There is a subscriber. Attempt to send the htlc_accepted message.
err := stream.Send(&proto.HtlcAccepted{
Correlationid: msg.id,
Onion: &proto.Onion{
Payload: msg.htlc.Onion.Payload,
ShortChannelId: msg.htlc.Onion.ShortChannelId,
ForwardMsat: msg.htlc.Onion.ForwardMsat,
OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue,
SharedSecret: msg.htlc.Onion.SharedSecret,
NextOnion: msg.htlc.Onion.NextOnion,
},
Htlc: &proto.Htlc{
ShortChannelId: msg.htlc.Htlc.ShortChannelId,
Id: msg.htlc.Htlc.Id,
AmountMsat: msg.htlc.Htlc.AmountMsat,
CltvExpiry: msg.htlc.Htlc.CltvExpiry,
CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative,
PaymentHash: msg.htlc.Htlc.PaymentHash,
},
ForwardTo: msg.htlc.ForwardTo,
})
err := sendHtlcAccepted(stream, msg)

// If there is no error, we're done.
if err == nil {
s.mtx.Unlock()
return
} else {
// Remove the htlc from inflight htlcs again on error, so it won't
// get replayed twice in a row.
s.inflightHtlcs.Delete(msg.id)
s.mtx.Unlock()
}

// If we end up here, there was an error sending the message to the
Expand Down Expand Up @@ -324,6 +334,11 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
s.mtx.Lock()
stream := s.htlcStream
ns := s.htlcnewSubscriber
oldestHtlc := s.inflightHtlcs.Oldest()
var htlcTimeout time.Duration = 1 << 62 // practically infinite
if oldestHtlc != nil {
htlcTimeout = time.Until(oldestHtlc.Value.timeout)
}
s.mtx.Unlock()

if stream == nil {
Expand All @@ -335,6 +350,24 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
case <-ns:
log.Printf("New subscription available for htlc receive, continue receive.")
continue
case <-time.After(htlcTimeout):
log.Printf(
"WARNING: htlc with id '%s' timed out after '%v' waiting "+
"for grpc subscriber: %+v",
oldestHtlc.Value.id,
s.subscriberTimeout,
oldestHtlc.Value.htlc,
)

// If the subscriber timeout expires while holding a htlc
// we short circuit the htlc by sending the default result
// (continue) to cln.
return &proto.HtlcResolution{
Correlationid: oldestHtlc.Value.id,
Outcome: &proto.HtlcResolution_Continue{
Continue: &proto.HtlcContinue{},
},
}
}
}

Expand Down Expand Up @@ -561,3 +594,26 @@ func (s *server) defaultResult() interface{} {
"result": "continue",
}
}

func sendHtlcAccepted(stream proto.ClnPlugin_HtlcStreamServer, msg *htlcAcceptedMsg) error {
return stream.Send(&proto.HtlcAccepted{
Correlationid: msg.id,
Onion: &proto.Onion{
Payload: msg.htlc.Onion.Payload,
ShortChannelId: msg.htlc.Onion.ShortChannelId,
ForwardMsat: msg.htlc.Onion.ForwardMsat,
OutgoingCltvValue: msg.htlc.Onion.OutgoingCltvValue,
SharedSecret: msg.htlc.Onion.SharedSecret,
NextOnion: msg.htlc.Onion.NextOnion,
},
Htlc: &proto.Htlc{
ShortChannelId: msg.htlc.Htlc.ShortChannelId,
Id: msg.htlc.Htlc.Id,
AmountMsat: msg.htlc.Htlc.AmountMsat,
CltvExpiry: msg.htlc.Htlc.CltvExpiry,
CltvExpiryRelative: msg.htlc.Htlc.CltvExpiryRelative,
PaymentHash: msg.htlc.Htlc.PaymentHash,
},
ForwardTo: msg.htlc.ForwardTo,
})
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ require (
require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/ethereum/go-ethereum v1.10.17 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/lightninglabs/neutrino/cache v1.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/moby/term v0.0.0-20221120202655-abb19827d345 // indirect
github.com/morikuni/aec v1.0.0 // indirect
Expand Down Expand Up @@ -153,6 +156,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
Expand Down
16 changes: 10 additions & 6 deletions interceptor/intercept.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/breez/lspd/config"
"github.com/breez/lspd/lightning"
"github.com/breez/lspd/notifications"
"github.com/breez/lspd/shared"
"github.com/btcsuite/btcd/wire"
"golang.org/x/sync/singleflight"
)
Expand Down Expand Up @@ -49,6 +50,7 @@ type Interceptor struct {
client lightning.Client
config *config.NodeConfig
store InterceptStore
openingStore shared.OpeningStore
feeEstimator chain.FeeEstimator
feeStrategy chain.FeeStrategy
payHashGroup singleflight.Group
Expand All @@ -59,6 +61,7 @@ func NewInterceptor(
client lightning.Client,
config *config.NodeConfig,
store InterceptStore,
openingStore shared.OpeningStore,
feeEstimator chain.FeeEstimator,
feeStrategy chain.FeeStrategy,
notificationService *notifications.NotificationService,
Expand All @@ -67,6 +70,7 @@ func NewInterceptor(
client: client,
config: config,
store: store,
openingStore: openingStore,
feeEstimator: feeEstimator,
feeStrategy: feeStrategy,
notificationService: notificationService,
Expand Down Expand Up @@ -176,11 +180,11 @@ func (i *Interceptor) Intercept(scid *basetypes.ShortChannelID, reqPaymentHash [
// TODO: When opening_fee_params is enforced, turn this check in a temporary channel failure.
if params == nil {
log.Printf("DEPRECATED: Intercepted htlc with deprecated fee mechanism. Using default fees. payment hash: %s", reqPaymentHashStr)
params = &OpeningFeeParams{
MinMsat: uint64(i.config.ChannelMinimumFeeMsat),
params = &shared.OpeningFeeParams{
MinFeeMsat: uint64(i.config.ChannelMinimumFeeMsat),
Proportional: uint32(i.config.ChannelFeePermyriad * 100),
ValidUntil: time.Now().UTC().Add(time.Duration(time.Hour * 24)).Format(basetypes.TIME_FORMAT),
MaxIdleTime: uint32(i.config.MaxInactiveDuration / 600),
MinLifetime: uint32(i.config.MaxInactiveDuration / 600),
MaxClientToSelfDelay: uint32(10000),
}
}
Expand Down Expand Up @@ -353,15 +357,15 @@ func (i *Interceptor) notify(reqPaymentHashStr string, nextHop []byte, isRegiste
return nil
}

func (i *Interceptor) isCurrentChainFeeCheaper(token string, params *OpeningFeeParams) bool {
settings, err := i.store.GetFeeParamsSettings(token)
func (i *Interceptor) isCurrentChainFeeCheaper(token string, params *shared.OpeningFeeParams) bool {
settings, err := i.openingStore.GetFeeParamsSettings(token)
if err != nil {
log.Printf("Failed to get fee params settings: %v", err)
return false
}

for _, setting := range settings {
if setting.Params.MinMsat <= params.MinMsat {
if setting.Params.MinFeeMsat <= params.MinFeeMsat {
return true
}
}
Expand Down
19 changes: 3 additions & 16 deletions interceptor/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,13 @@ package interceptor
import (
"time"

"github.com/breez/lspd/shared"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that it's a nit about naming and it's already in the branch, but I don't think that the name common or lncommon convey more the meaning than "shared"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll rename that in a separate PR

"github.com/btcsuite/btcd/wire"
)

type OpeningFeeParamsSetting struct {
Validity time.Duration
Params *OpeningFeeParams
}
type OpeningFeeParams struct {
MinMsat uint64 `json:"min_msat,string"`
Proportional uint32 `json:"proportional"`
ValidUntil string `json:"valid_until"`
MaxIdleTime uint32 `json:"max_idle_time"`
MaxClientToSelfDelay uint32 `json:"max_client_to_self_delay"`
Promise string `json:"promise"`
}

type InterceptStore interface {
PaymentInfo(htlcPaymentHash []byte) (string, *OpeningFeeParams, []byte, []byte, []byte, int64, int64, *wire.OutPoint, *string, error)
PaymentInfo(htlcPaymentHash []byte) (string, *shared.OpeningFeeParams, []byte, []byte, []byte, int64, int64, *wire.OutPoint, *string, error)
SetFundingTx(paymentHash []byte, channelPoint *wire.OutPoint) error
RegisterPayment(token string, params *OpeningFeeParams, destination, paymentHash, paymentSecret []byte, incomingAmountMsat, outgoingAmountMsat int64, tag string) error
RegisterPayment(token string, params *shared.OpeningFeeParams, destination, paymentHash, paymentSecret []byte, incomingAmountMsat, outgoingAmountMsat int64, tag string) error
InsertChannel(initialChanID, confirmedChanId uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error
GetFeeParamsSettings(token string) ([]*OpeningFeeParamsSetting, error)
}
1 change: 1 addition & 0 deletions lightning/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type GetInfoResult struct {
type GetChannelResult struct {
InitialChannelID basetypes.ShortChannelID
ConfirmedChannelID basetypes.ShortChannelID
HtlcMinimumMsat uint64
}

type OpenChannelRequest struct {
Expand Down
1 change: 1 addition & 0 deletions lnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (c *LndClient) GetChannel(peerID []byte, channelPoint wire.OutPoint) (*ligh
return &lightning.GetChannelResult{
InitialChannelID: basetypes.ShortChannelID(c.ChanId),
ConfirmedChannelID: basetypes.ShortChannelID(confirmedChanId),
HtlcMinimumMsat: c.LocalConstraints.MinHtlcMsat,
}, nil
}
}
Expand Down
Loading