diff --git a/cmd/lspd_revenue_cli/main.go b/cmd/lspd_revenue_cli/main.go index d9e3fbd9..21e19a8f 100644 --- a/cmd/lspd_revenue_cli/main.go +++ b/cmd/lspd_revenue_cli/main.go @@ -22,6 +22,7 @@ func main() { } app.Commands = []cli.Command{ exportForwardsCommand, + revenueCommand, } if err := app.Run(os.Args); err != nil { log.Fatal(err) diff --git a/cmd/lspd_revenue_cli/revenue.go b/cmd/lspd_revenue_cli/revenue.go new file mode 100644 index 00000000..d369134f --- /dev/null +++ b/cmd/lspd_revenue_cli/revenue.go @@ -0,0 +1,650 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "os" + "path/filepath" + "sort" + + "github.com/breez/lspd/postgresql" + "github.com/urfave/cli" +) + +const ( + FirstBehind = -1 + SecondBehind = 1 +) + +var revenueCommand = cli.Command{ + Name: "revenue", + Usage: "Get a revenue report.", + Flags: []cli.Flag{ + cli.Uint64Flag{ + Name: "start", + Required: true, + Usage: "Start time of forwards taken into account as a UTC unix timestamp in seconds.", + }, + cli.Uint64Flag{ + Name: "end", + Required: true, + Usage: "End time of forwards taken into account as a UTC unix timestamp in seconds.", + }, + cli.StringFlag{ + Name: "import", + Required: false, + Usage: "Optional imports to consider when generating the revenue report. Imports are files generated from the export-forwards command on other nodes. Used to correlate local forwards to api keys. Can be a glob pattern.", + }, + }, + Action: revenue, +} + +func revenue(ctx *cli.Context) error { + start := ctx.Uint64("start") + if start == 0 { + return fmt.Errorf("start is required") + } + + end := ctx.Uint64("end") + if end == 0 { + return fmt.Errorf("end is required") + } + + startNs := start * 1_000_000_000 + endNs := end * 1_000_000_000 + if startNs > endNs { + return fmt.Errorf("start cannot be after end") + } + + importedForwards, err := getImportedForwards(ctx, startNs, endNs) + if err != nil { + return err + } + + store, err := getStore(ctx) + if err != nil { + return err + } + + forwardsSortedIn, err := store.GetForwards(context.Background(), startNs, endNs) + if err != nil { + return err + } + + forwardsSortedOut := append([]*postgresql.RevenueForward(nil), forwardsSortedIn...) + sort.SliceStable(forwardsSortedOut, func(i, j int) bool { + first := forwardsSortedOut[i] + second := forwardsSortedOut[j] + nodeCompare := bytes.Compare(first.Nodeid, second.Nodeid) + if nodeCompare < 0 { + return true + } + if nodeCompare > 0 { + return false + } + + peerCompare := bytes.Compare(first.PeeridOut, second.PeeridOut) + if peerCompare < 0 { + return true + } + if peerCompare > 0 { + return false + } + + if first.AmtMsatOut < second.AmtMsatOut { + return true + } + if first.AmtMsatOut > second.AmtMsatOut { + return false + } + + if first.ResolvedTime < second.ResolvedTime { + return true + } + if first.ResolvedTime > second.ResolvedTime { + return false + } + + return false + }) + + // Imported forwards help correlate forwards to tokens + matchImportedForwardsSend(forwardsSortedIn, importedForwards) + matchImportedForwardsReceive(forwardsSortedOut, importedForwards) + + // Match forwards from our own nodes multiple times, each iteration represents a 'hop'. + // Moving information about the token used in the route one hop further. + for i := 0; i < 3; i++ { + matchInternalForwards(forwardsSortedIn, forwardsSortedOut) + } + + openChannelHtlcs, err := store.GetOpenChannelHtlcs(context.Background(), startNs, endNs) + if err != nil { + return err + } + + // Some htlcs were used for channel opens. These are matched to actual settled forwards + // to know the part of the fees that were made by channel opens rather than regular forwarding. + matchOpenChannelHtlcs(forwardsSortedOut, openChannelHtlcs) + + revenue := calculateRevenue(forwardsSortedIn) + j, err := json.Marshal(revenue) + if err != nil { + return fmt.Errorf("failed to marshal json: %w", err) + } + + log.Println(j) + return nil +} + +func getImportedForwards(ctx *cli.Context, startNs, endNs uint64) ([]*importedForward, error) { + var importedForwards []*importedForward + importFiles := ctx.String("import") + if importFiles != "" { + matches, err := filepath.Glob(importFiles) + if err != nil { + return nil, fmt.Errorf("failed to read import files: %w", err) + } + + for _, match := range matches { + forwards, err := readForwards(match, startNs, endNs) + if err != nil { + return nil, err + } + + importedForwards = append(importedForwards, forwards...) + } + } + + sort.SliceStable(importedForwards, func(i, j int) bool { + first := importedForwards[i] + second := importedForwards[j] + nodeCompare := bytes.Compare(first.nodeid, second.nodeid) + if nodeCompare < 0 { + return true + } + if nodeCompare > 0 { + return false + } + + peerCompare := bytes.Compare(first.peerid, second.peerid) + if peerCompare < 0 { + return true + } + if peerCompare > 0 { + return false + } + + if first.amountMsat < second.amountMsat { + return true + } + if first.amountMsat > second.amountMsat { + return false + } + + if first.resolvedTime < second.resolvedTime { + return true + } + if first.resolvedTime > second.resolvedTime { + return false + } + return false + }) + + return importedForwards, nil +} + +func readForwards(fileName string, startNs, endNs uint64) ([]*importedForward, error) { + f, err := os.Open(fileName) + if err != nil { + return nil, fmt.Errorf("failed to open %s: %w", fileName, err) + } + + defer f.Close() + + b, err := io.ReadAll(f) + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", fileName, err) + } + + var exported []*postgresql.ExportedForward + err = json.Unmarshal(b, &exported) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal %s: %w", fileName, err) + } + + var result []*importedForward + for _, imp := range exported { + if imp.Token == "" { + continue + } + + // Filter out resolved on times outside our range too (since this is external data). + resolvedTime := uint64(imp.ResolvedTime.UnixNano()) + if resolvedTime < startNs || resolvedTime >= endNs { + continue + } + + result = append(result, &importedForward{ + nodeid: imp.ExternalNodeId, + peerid: imp.NodeId, + amountMsat: imp.AmountMsat, + isCorrelated: false, + token: imp.Token, + resolvedTime: resolvedTime, + direction: imp.Direction, + }) + } + + return result, nil +} + +// Matches imported forwards to local forwards, in order to isolate the token used. The token is then set on the +// corresponding local forward. This function sets the token used by the sender. +func matchImportedForwardsSend(forwardsSortedIn []*postgresql.RevenueForward, importedForwards []*importedForward) { + forwardIndex := 0 + importedIndex := 0 + for { + if forwardIndex >= len(forwardsSortedIn) { + break + } + + if importedIndex >= len(importedForwards) { + break + } + + importedForward := importedForwards[importedIndex] + if importedForward.direction != "send" { + importedIndex++ + continue + } + forward := forwardsSortedIn[forwardIndex] + behind := compare(forward, importedForward) + if behind == FirstBehind { + forwardIndex++ + continue + } + if behind == SecondBehind { + importedIndex++ + continue + } + + // same node, same peer, same amount + // if the forward is already correlated, go to the next + if forward.SendToken != nil { + forwardIndex++ + continue + } + if importedForward.isCorrelated { + importedIndex++ + continue + } + + // TODO: It would be better to find the best overlap in time range for these forwards + importedForward.isCorrelated = true + forward.SendToken = &importedForward.token + forwardIndex++ + importedIndex++ + } +} + +// Matches imported forwards to local forwards, in order to isolate the token used. The token is then set on the +// corresponding local forward. This function sets the token used by the recipient. +func matchImportedForwardsReceive(forwardsSortedOut []*postgresql.RevenueForward, importedForwards []*importedForward) { + forwardIndex := 0 + importedIndex := 0 + for { + if forwardIndex >= len(forwardsSortedOut) { + break + } + + if importedIndex >= len(importedForwards) { + break + } + + importedForward := importedForwards[importedIndex] + if importedForward.direction != "receive" { + importedIndex++ + continue + } + forward := forwardsSortedOut[forwardIndex] + behind := compare(forward, importedForward) + if behind == FirstBehind { + forwardIndex++ + continue + } + if behind == SecondBehind { + importedIndex++ + continue + } + + // same node, same peer, same amount + // if the forward is already correlated, go to the next + if forward.ReceiveToken != nil { + forwardIndex++ + continue + } + if importedForward.isCorrelated { + importedIndex++ + continue + } + + // TODO: It would be better to find the best overlap in time range for these forwards + importedForward.isCorrelated = true + forward.ReceiveToken = &importedForward.token + forwardIndex++ + importedIndex++ + } +} + +func compare(forward *postgresql.RevenueForward, importedForward *importedForward) int { + nodeCompare := bytes.Compare(importedForward.nodeid, forward.Nodeid) + if nodeCompare > 0 { + return FirstBehind + } + if nodeCompare < 0 { + return SecondBehind + } + + peerCompare := bytes.Compare(importedForward.peerid, forward.PeeridIn) + if peerCompare > 0 { + return FirstBehind + } + if peerCompare < 0 { + return SecondBehind + } + + if importedForward.amountMsat > forward.AmtMsatIn { + return FirstBehind + } + if importedForward.amountMsat < forward.AmtMsatIn { + return SecondBehind + } + + return 0 +} + +// Matches forwards from internal nodes in order to isolate the token used for sending/receiving. +// This function will match forwards for a single outgoing forward to a single incoming forward from +// the other node. +func matchInternalForwards(forwardsSortedIn, forwardsSortedOut []*postgresql.RevenueForward) { + outIndex := 0 + inIndex := 0 + + for { + if outIndex >= len(forwardsSortedOut) { + break + } + + if inIndex >= len(forwardsSortedIn) { + break + } + + inForward := forwardsSortedIn[inIndex] + outForward := forwardsSortedOut[outIndex] + behind := compare(outForward, &importedForward{ + nodeid: inForward.PeeridIn, + peerid: inForward.Nodeid, + amountMsat: inForward.AmtMsatIn, + resolvedTime: inForward.ResolvedTime, + }) + if behind == FirstBehind { + outIndex++ + continue + } + if behind == SecondBehind { + inIndex++ + continue + } + + // same node, same peer, same amount + // if the forward is already correlated, go to the next + if outForward.ReceiveToken != nil { + outIndex++ + continue + } + if inForward.SendToken != nil { + inIndex++ + continue + } + + // TODO: It would be better to find the best overlap in time range for these forwards + inForward.SendToken = outForward.SendToken + outForward.ReceiveToken = inForward.SendToken + inIndex++ + outIndex++ + } +} + +func matchOpenChannelHtlcs(forwardsSortedOut []*postgresql.RevenueForward, openChannelHtlcs []*postgresql.OpenChannelHtlc) { + forwards := append([]*postgresql.RevenueForward(nil), forwardsSortedOut...) + sort.SliceStable(forwards, func(i, j int) bool { + first := forwards[i] + second := forwards[j] + nodeCompare := bytes.Compare(first.Nodeid, second.Nodeid) + if nodeCompare < 0 { + return true + } + if nodeCompare > 0 { + return false + } + + peerCompare := bytes.Compare(first.PeeridOut, second.PeeridOut) + if peerCompare < 0 { + return true + } + if peerCompare > 0 { + return false + } + + if first.AmtMsatOut < second.AmtMsatOut { + return true + } + if first.AmtMsatOut > second.AmtMsatOut { + return false + } + + cphCompare := bytes.Compare(first.ChannelPointOut.Hash[:], second.ChannelPointOut.Hash[:]) + if cphCompare < 0 { + return true + } + if cphCompare > 0 { + return false + } + + if first.ChannelPointOut.Index < second.ChannelPointOut.Index { + return true + } + if first.ChannelPointOut.Index > second.ChannelPointOut.Index { + return false + } + + if first.AmtMsatIn < second.AmtMsatIn { + return true + } + if first.AmtMsatIn > second.AmtMsatIn { + return false + } + + if first.ResolvedTime < second.ResolvedTime { + return true + } + if first.ResolvedTime > second.ResolvedTime { + return false + } + + return false + }) + + htlcIndex := 0 + forwardIndex := 0 + for { + if htlcIndex >= len(openChannelHtlcs) { + break + } + + if forwardIndex >= len(forwardsSortedOut) { + break + } + + htlc := openChannelHtlcs[htlcIndex] + forward := forwardsSortedOut[forwardIndex] + behind := compare(forward, &importedForward{ + nodeid: htlc.Nodeid, + peerid: htlc.Peerid, + amountMsat: htlc.ForwardAmountMsat, + }) + if behind == FirstBehind { + forwardIndex++ + continue + } + if behind == SecondBehind { + htlcIndex++ + continue + } + + cphCompare := bytes.Compare(forward.ChannelPointOut.Hash[:], htlc.ChannelPoint.Hash[:]) + if cphCompare < 0 { + forwardIndex++ + continue + } + if cphCompare > 0 { + htlcIndex++ + continue + } + + if forward.ChannelPointOut.Index < htlc.ChannelPoint.Index { + forwardIndex++ + continue + } + if forward.ChannelPointOut.Index > htlc.ChannelPoint.Index { + htlcIndex++ + continue + } + + if forward.AmtMsatIn < htlc.IncomingAmountMsat { + forwardIndex++ + continue + } + if forward.AmtMsatIn > htlc.IncomingAmountMsat { + htlcIndex++ + continue + } + + if forward.OpenChannelHtlc != nil { + forwardIndex++ + continue + } + + forward.OpenChannelHtlc = htlc + htlcIndex++ + forwardIndex++ + } +} + +func calculateRevenue(forwards []*postgresql.RevenueForward) *RevenueResponse { + result := &RevenueResponse{ + Nodes: make([]*NodeRevenue, 0), + } + + var currentNode *NodeRevenue = nil + for _, forward := range forwards { + if currentNode == nil || !bytes.Equal(currentNode.Nodeid, forward.Nodeid) { + currentNode = &NodeRevenue{ + Nodeid: forward.Nodeid, + Tokens: make(map[string]*TokenRevenue, 0), + } + result.Nodes = append(result.Nodes, currentNode) + } + + currentNode.TotalFeesMsat += forward.AmtMsatIn - forward.AmtMsatOut + currentNode.TotalForwardCountMsat++ + if forward.SendToken != nil { + sendToken, ok := currentNode.Tokens[*forward.SendToken] + if !ok { + sendToken = &TokenRevenue{ + Token: *forward.SendToken, + } + currentNode.Tokens[*forward.SendToken] = sendToken + } + + feesMsat := (forward.AmtMsatIn - forward.AmtMsatOut) / 2 + sendToken.TotalFeesMsatSend += feesMsat + sendToken.TotalForwardsSend++ + currentNode.TotalTokenFeesMsat = feesMsat + } + if forward.ReceiveToken != nil { + receiveToken, ok := currentNode.Tokens[*forward.ReceiveToken] + if !ok { + receiveToken = &TokenRevenue{ + Token: *forward.ReceiveToken, + } + currentNode.Tokens[*forward.ReceiveToken] = receiveToken + } + + feesMsat := (forward.AmtMsatIn - forward.AmtMsatOut) / 2 + var openFeesMsat uint64 + if forward.OpenChannelHtlc != nil { + openFeesMsat = forward.OpenChannelHtlc.OriginalAmountMsat - forward.OpenChannelHtlc.ForwardAmountMsat + } + + receiveToken.TotalChannelOpenFees += openFeesMsat + receiveToken.TotalFeesMsatReceive += feesMsat + receiveToken.TotalForwardsReceive++ + currentNode.TotalTokenFeesMsat = feesMsat + currentNode.TotalChannelFeesMsat += openFeesMsat + } + } + + return result +} + +type RevenueResponse struct { + Nodes []*NodeRevenue +} +type NodeRevenue struct { + Nodeid []byte + Tokens map[string]*TokenRevenue + // amt_msat_in - amt_msat_out for every forward by this node + TotalFeesMsat uint64 + + // counting all forwards + TotalForwardCountMsat uint64 + + TotalTokenFeesMsat uint64 + + TotalChannelFeesMsat uint64 +} + +type TokenRevenue struct { + Token string + + // Total forwards on this node where the token was associated for the send side. + TotalForwardsSend uint64 + + // Total forwards on this node where the token was associated for the receive side. + TotalForwardsReceive uint64 + + // Total fees on this node associated with the token for the send side. + TotalFeesMsatSend uint64 + + // Total fees on this node associated with the token for the receive side. + TotalFeesMsatReceive uint64 + + // Total fees associated to channel opens. + TotalChannelOpenFees uint64 +} + +type importedForward struct { + nodeid []byte + peerid []byte + amountMsat uint64 + isCorrelated bool + token string + resolvedTime uint64 + direction string +} diff --git a/postgresql/revenue_cli_store.go b/postgresql/revenue_cli_store.go index 8dd8fadd..a564d287 100644 --- a/postgresql/revenue_cli_store.go +++ b/postgresql/revenue_cli_store.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/breez/lspd/lightning" + "github.com/btcsuite/btcd/wire" "github.com/jackc/pgx/v5/pgxpool" ) @@ -164,3 +166,204 @@ func (s *RevenueCliStore) sanityCheck(ctx context.Context, startNs, endNs uint64 return nil } +type RevenueForward struct { + Identifier string + Nodeid []byte + PeeridIn []byte + PeeridOut []byte + AmtMsatIn uint64 + AmtMsatOut uint64 + ResolvedTime uint64 + ChannelPointIn wire.OutPoint + ChannelPointOut wire.OutPoint + SendToken *string + ReceiveToken *string + OpenChannelHtlc *OpenChannelHtlc +} + +type OpenChannelHtlc struct { + Nodeid []byte + Peerid []byte + ForwardAmountMsat uint64 + OriginalAmountMsat uint64 + IncomingAmountMsat uint64 + ChannelPoint *wire.OutPoint +} + +func (s *RevenueCliStore) GetOpenChannelHtlcs( + ctx context.Context, + startNs uint64, + endNs uint64, +) ([]*OpenChannelHtlc, error) { + // filter htlcs used for channel opens. Only include the ones that may have been actually settled. + openChannelHtlcs, err := s.pool.Query(ctx, ` + SELECT htlc.nodeid + , htlc.peerid + , htlc.forward_amt_msat + , htlc.original_amt_msat + , htlc.incoming_amt_msat + , htlc.funding_tx_id + , htlc.funding_tx_outnum + FROM open_channel_htlcs htlc + INNER JOIN ( + SELECT DISTINCT h.nodeid, h.peerid, c.funding_tx_id, c.funding_tx_outnum + FROM forwarding_history h + INNER JOIN channels c + ON h.nodeid = c.nodeid AND (h.chanid_out = c.confirmed_scid OR h.chanid_out = c.alias_scid) + WHERE h.resolved_time >= $1 AND h.resolved_time < $2 + ) a + ON htlc.nodeid = a.nodeid + AND htlc.peerid = a.peerid + AND htlc.funding_tx_id = a.funding_tx_id + AND htlc.funding_tx_outnum = a.funding_tx_outnum + ORDER BY htlc.nodeid, htlc.peerid, htlc.funding_tx_id, htlc.funding_tx_outnum + `, startNs, endNs) + if err != nil { + return nil, fmt.Errorf("failed to query open channel htlcs: %w", err) + } + defer openChannelHtlcs.Close() + + var result []*OpenChannelHtlc + for openChannelHtlcs.Next() { + var nodeid []byte + var peerid []byte + var forward_amt_msat int64 + var original_amt_msat int64 + var incoming_amt_msat int64 + var funding_tx_id []byte + var funding_tx_outnum uint32 + err = openChannelHtlcs.Scan( + &nodeid, + &peerid, + &forward_amt_msat, + &original_amt_msat, + &incoming_amt_msat, + &funding_tx_id, + &funding_tx_outnum, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan open channel htlc: %w", err) + } + + cp, err := lightning.NewOutPoint(funding_tx_id, funding_tx_outnum) + if err != nil { + return nil, fmt.Errorf("invalid funding outpoint: %w", err) + } + + result = append(result, &OpenChannelHtlc{ + Nodeid: nodeid, + Peerid: peerid, + ForwardAmountMsat: uint64(forward_amt_msat), + OriginalAmountMsat: uint64(original_amt_msat), + IncomingAmountMsat: uint64(incoming_amt_msat), + ChannelPoint: cp, + }) + } + + return result, nil +} + +// Gets all settled forwards in the defined time range. Ordered by nodeid, peerid_in, amt_msat_in, resolved_time +func (s *RevenueCliStore) GetForwards( + ctx context.Context, + startNs uint64, + endNs uint64, +) ([]*RevenueForward, error) { + err := s.sanityCheck(ctx, startNs, endNs) + if err != nil { + return nil, err + } + + ctxc, cancel := context.WithCancel(ctx) + defer cancel() + + // Select all forwards, and include information about the channel and token used + ourForwards, err := s.pool.Query(ctxc, tokenChannelsCte+` + SELECT h.identifier + , h.nodeid + , h.peerid_in + , h.peerid_out + , h.amt_msat_in + , h.amt_msat_out + , h.resolved_time + , c_in.funding_tx_id AS funding_tx_id_in + , c_in.funding_tx_outnum AS funding_tx_outnum_in + , c_out.funding_tx_id AS funding_tx_id_out + , c_out.funding_tx_outnum AS funding_tx_outnum_out + , tc_in.token AS send_token + , tc_out.token AS receive_token + FROM forwarding_history h + INNER JOIN channels c_in + ON h.nodeid = c_in.nodeid AND (h.chanid_in = c_in.confirmed_scid OR h.chanid_in = c_in.alias_scid) + INNER JOIN channels c_out + ON h.nodeid = c_out.nodeid AND (h.chanid_out = c_out.confirmed_scid OR h.chanid_out = c_out.alias_scid) + LEFT JOIN token_channels tc_in + ON c_in.nodeid = tc_in.nodeid AND c_in.funding_tx_id = tc_in.funding_tx_id AND c_in.funding_tx_outnum = tc_in.funding_tx_outnum + LEFT JOIN token_channels tc_out + ON c_out.nodeid = tc_out.nodeid AND c_out.funding_tx_id = tc_out.funding_tx_id AND c_out.funding_tx_outnum = tc_out.funding_tx_outnum + WHERE h.resolved_time >= $1 AND h.resolved_time < $2 + ORDER BY h.nodeid, h.peerid_in, h.amt_msat_in, h.resolved_time + `, startNs, endNs) + if err != nil { + return nil, fmt.Errorf("failed to query our forwards: %w", err) + } + + var forwards []*RevenueForward + for ourForwards.Next() { + var identifier string + var nodeid []byte + var peerid_in []byte + var peerid_out []byte + var amt_msat_in int64 + var amt_msat_out int64 + var resolved_time int64 + var funding_tx_id_in []byte + var funding_tx_outnum_in uint32 + var funding_tx_id_out []byte + var funding_tx_outnum_out uint32 + var send_token *string + var receive_token *string + err = ourForwards.Scan( + &identifier, + &nodeid, + &peerid_in, + &peerid_out, + &amt_msat_in, + &amt_msat_out, + &resolved_time, + &funding_tx_id_in, + &funding_tx_outnum_in, + &funding_tx_id_out, + &funding_tx_outnum_out, + &send_token, + &receive_token, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan our forward: %w", err) + } + + cpIn, err := lightning.NewOutPoint(funding_tx_id_in, funding_tx_outnum_in) + if err != nil { + return nil, fmt.Errorf("invalid funding outpoint: %w", err) + } + cpOut, err := lightning.NewOutPoint(funding_tx_id_out, funding_tx_outnum_out) + if err != nil { + return nil, fmt.Errorf("invalid funding outpoint: %w", err) + } + forwards = append(forwards, &RevenueForward{ + Identifier: identifier, + Nodeid: nodeid, + PeeridIn: peerid_in, + PeeridOut: peerid_out, + AmtMsatIn: uint64(amt_msat_in), + AmtMsatOut: uint64(amt_msat_out), + ResolvedTime: uint64(resolved_time), + ChannelPointIn: *cpIn, + ChannelPointOut: *cpOut, + SendToken: send_token, + ReceiveToken: receive_token, + }) + } + + return forwards, nil +}