From ddcf5e01a40b012fd15dd891ce5f61e7b7de9e53 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Mon, 12 Feb 2024 12:32:35 +0100 Subject: [PATCH] cli: add command to export forwards --- .gitignore | 1 + Makefile | 7 +- cmd/lspd_revenue_cli/db.go | 32 +++++ cmd/lspd_revenue_cli/export_forwards.go | 89 +++++++++++++ cmd/lspd_revenue_cli/main.go | 29 +++++ go.mod | 3 + go.sum | 6 + postgresql/revenue_cli_store.go | 166 ++++++++++++++++++++++++ 8 files changed, 332 insertions(+), 1 deletion(-) create mode 100644 cmd/lspd_revenue_cli/db.go create mode 100644 cmd/lspd_revenue_cli/export_forwards.go create mode 100644 cmd/lspd_revenue_cli/main.go create mode 100644 postgresql/revenue_cli_store.go diff --git a/.gitignore b/.gitignore index bdef4c67..cb8041b8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ lspd lspd_plugin lspd_cln_plugin +/lspd_revenue_cli diff --git a/Makefile b/Makefile index 8e43981a..ef813133 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ PKG := github.com/breez/lspd TAG := $(shell git describe --tags --dirty) -release-all: release-lspd release-plugin +release-all: release-lspd release-plugin release-cli release-lspd: go get $(PKG) @@ -11,6 +11,11 @@ release-plugin: go get $(PKG)/cln_plugin/cmd go build -v -trimpath -o lspd_cln_plugin -ldflags="-s -w -X $(PKG)/build.tag=$(TAG)" $(PKG)/cln_plugin/cmd +release-cli: + go get $(PKG)/cmd/lspd_revenue_cli + go build -v -trimpath -o lspd_revenue_cli -ldflags="-s -w -X $(PKG)/build.tag=$(TAG)" $(PKG)/cmd/lspd_revenue_cli + clean: rm -f lspd rm -f lspd_cln_plugin + rm -f lspd_revenue_cli diff --git a/cmd/lspd_revenue_cli/db.go b/cmd/lspd_revenue_cli/db.go new file mode 100644 index 00000000..7c9986b2 --- /dev/null +++ b/cmd/lspd_revenue_cli/db.go @@ -0,0 +1,32 @@ +package main + +import ( + "fmt" + + "github.com/breez/lspd/postgresql" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/urfave/cli" +) + +func getPool(ctx *cli.Context) (*pgxpool.Pool, error) { + url := ctx.String("database-url") + if url == "" { + return nil, fmt.Errorf("database-url is required") + } + + pool, err := postgresql.PgConnect(url) + if err != nil { + return nil, fmt.Errorf("failed to connect to database: %w", err) + } + + return pool, nil +} + +func getStore(ctx *cli.Context) (*postgresql.RevenueCliStore, error) { + pool, err := getPool(ctx) + if err != nil { + return nil, err + } + + return postgresql.NewCliStore(pool), nil +} diff --git a/cmd/lspd_revenue_cli/export_forwards.go b/cmd/lspd_revenue_cli/export_forwards.go new file mode 100644 index 00000000..ca998eba --- /dev/null +++ b/cmd/lspd_revenue_cli/export_forwards.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "time" + + "github.com/urfave/cli" +) + +var exportForwardsCommand = cli.Command{ + Name: "export-forwards", + Usage: "Export forwards with a given peer correlated to an api key for a given time period.", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "node", + Required: true, + Usage: "The public key of your own lightning node to export forwards for.", + }, + cli.StringFlag{ + Name: "peer", + Required: true, + Usage: "The public key of the peer to export the forwards for.", + }, + cli.Uint64Flag{ + Name: "start", + Required: false, + Usage: "Start time of exported forwards as a UTC unix timestamp in seconds. If not set will export from the beginning.", + }, + cli.Uint64Flag{ + Name: "end", + Required: false, + Usage: "End time of exported forwards as a UTC unix timestamp in seconds. If not set will export until now.", + }, + }, + Action: exportForwards, +} + +func exportForwards(ctx *cli.Context) error { + node := ctx.String("node") + if node == "" { + return fmt.Errorf("node is required") + } + nodeId, err := hex.DecodeString(node) + if err != nil || len(nodeId) != 33 { + return fmt.Errorf("node is not a pubkey") + } + + peer := ctx.String("peer") + if peer == "" { + return fmt.Errorf("peer is required") + } + peerId, err := hex.DecodeString(peer) + if err != nil || len(peerId) != 33 { + return fmt.Errorf("peer is not a pubkey") + } + + start := ctx.Uint64("start") + startNs := start * 1_000_000_000 + end := ctx.Uint64("end") + endNs := end * 1_000_000_000 + if endNs == 0 { + endNs = uint64(time.Now().UnixNano()) + } + + if startNs > endNs { + return fmt.Errorf("start cannot be after end") + } + + store, err := getStore(ctx) + if err != nil { + return err + } + result, err := store.ExportTokenForwardsForExternalNode(context.Background(), startNs, endNs, nodeId, peerId) + if err != nil { + return err + } + + j, err := json.Marshal(result) + if err != nil { + return fmt.Errorf("failed to marshal json: %w", err) + } + + _, err = os.Stdout.Write(j) + return err +} diff --git a/cmd/lspd_revenue_cli/main.go b/cmd/lspd_revenue_cli/main.go new file mode 100644 index 00000000..d9e3fbd9 --- /dev/null +++ b/cmd/lspd_revenue_cli/main.go @@ -0,0 +1,29 @@ +package main + +import ( + "log" + "os" + + "github.com/breez/lspd/build" + "github.com/urfave/cli" +) + +func main() { + app := cli.NewApp() + app.Name = "lspd_revenue_cli" + app.Version = build.GetTag() + " commit=" + build.GetRevision() + app.Usage = "get revenue data from lspd" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "database-url", + Usage: "postgres database url for lspd", + Required: true, + }, + } + app.Commands = []cli.Command{ + exportForwardsCommand, + } + if err := app.Run(os.Args); err != nil { + log.Fatal(err) + } +} diff --git a/go.mod b/go.mod index ddb09b76..5303892e 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/lightningnetwork/lnd v0.17.2-beta github.com/lightningnetwork/lnd/tlv v1.1.1 github.com/stretchr/testify v1.8.4 + github.com/urfave/cli v1.22.14 go.starlark.net v0.0.0-20231101134539-556fd59b42f6 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.3.0 @@ -33,6 +34,7 @@ require ( github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect github.com/ethereum/go-ethereum v1.13.5 // indirect @@ -59,6 +61,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect gitlab.com/yawning/bsaes.git v0.0.0-20190805113838-0a714cd429ec // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1 // indirect diff --git a/go.sum b/go.sum index 0badaf86..93bb3cb9 100644 --- a/go.sum +++ b/go.sum @@ -287,9 +287,11 @@ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzA github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpu/goacmedns v0.0.2/go.mod h1:4MipLkI+qScwqtVxcNO6okBhbgRrr7/tKXUSgSL0teQ= +github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80/go.mod h1:gzbVz57IDJgQ9rLQwfSk696JGWof8ftznEL9GoAv3NI= github.com/crate-crypto/go-kzg-4844 v0.7.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= @@ -1012,8 +1014,10 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= +github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= @@ -1105,6 +1109,8 @@ github.com/ulikunitz/xz v0.5.7/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oW github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8= github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= +github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/urfave/cli/v2 v2.10.2/go.mod h1:f8iq5LtQ/bLxafbdBSLPPNsgaW0l/2fYYEHhAyPlwvo= github.com/urfave/cli/v2 v2.24.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= diff --git a/postgresql/revenue_cli_store.go b/postgresql/revenue_cli_store.go new file mode 100644 index 00000000..8dd8fadd --- /dev/null +++ b/postgresql/revenue_cli_store.go @@ -0,0 +1,166 @@ +package postgresql + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type RevenueCliStore struct { + pool *pgxpool.Pool +} + +func NewCliStore(pool *pgxpool.Pool) *RevenueCliStore { + return &RevenueCliStore{ + pool: pool, + } +} + +// This CTE selects all channels that were opened associated to a token. +const tokenChannelsCte = ` +WITH token_channels AS ( + SELECT p.tag::json->>'apiKeyHash' AS token + , c.nodeid + , c.peerid + , c.funding_tx_id + , c.funding_tx_outnum + , c.alias_scid + , c.confirmed_scid + , p.incoming_amount_msat - p.outgoing_amount_msat AS channel_fee_msat + FROM public.payments p + INNER JOIN public.channels c + ON p.funding_tx_id = c.funding_tx_id + AND p.funding_tx_outnum = c.funding_tx_outnum + WHERE p.tag IS NOT NULL + UNION ALL + SELECT r.token + , c.nodeid + , c.peerid + , c.funding_tx_id + , c.funding_tx_outnum + , c.alias_scid + , c.confirmed_scid + , b.fee_msat AS channel_fee_msat + FROM lsps2.bought_channels b + INNER JOIN lsps2.buy_registrations r + ON b.registration_id = r.id + INNER JOIN public.channels c + ON b.funding_tx_id = c.funding_tx_id + AND b.funding_tx_outnum = c.funding_tx_outnum +)` + +type ExportedForward struct { + Token string + NodeId []byte + ExternalNodeId []byte + ResolvedTime time.Time + // Direction is 'send' if the client associated to the token sent a payment. + // Direction is 'receive' if the client associated to the token sent a payment. + Direction string + // The amount forwarded to/from the external node + AmountMsat uint64 +} + +func (s *RevenueCliStore) ExportTokenForwardsForExternalNode( + ctx context.Context, + startNs uint64, + endNs uint64, + node []byte, + externalNode []byte, +) ([]*ExportedForward, error) { + err := s.sanityCheck(ctx, startNs, endNs) + if err != nil { + return nil, err + } + + rows, err := s.pool.Query( + ctx, tokenChannelsCte+` + SELECT * FROM ( + SELECT 'send' AS direction + , c_in.token + , h.resolved_time + , h.amt_msat_out AS amt_msat + FROM public.forwarding_history h + INNER JOIN public.channels c_out + ON h.nodeid = c_out.nodeid + AND (h.chanid_out = c_out.confirmed_scid OR h.chanid_out = c_out.alias_scid) + INNER JOIN token_channels c_in + ON h.nodeid = c_in.nodeid + AND (h.chanid_in = c_in.confirmed_scid OR h.chanid_in = c_in.alias_scid) + WHERE h.nodeid = $1 AND c_out.peerid = $2 AND h.resolved_time >= $3 AND h.resolved_time < $4 + UNION ALL + SELECT 'receive' AS direction + , c_out.token + , h.resolved_time + , h.amt_msat_in AS amt_msat + FROM public.forwarding_history h + INNER JOIN token_channels c_out + ON h.nodeid = c_out.nodeid + AND (h.chanid_out = c_out.confirmed_scid OR h.chanid_out = c_out.alias_scid) + INNER JOIN public.channels c_in + ON h.nodeid = c_in.nodeid + AND (h.chanid_in = c_in.confirmed_scid OR h.chanid_in = c_in.alias_scid) + WHERE h.nodeid = $1 AND c_in.peerid = $2 AND h.resolved_time >= $3 AND h.resolved_time < $4 + ) + ORDER BY resolved_time DESC + `, + node, + externalNode, + startNs, + endNs, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + result := make([]*ExportedForward, rows.CommandTag().RowsAffected()) + for rows.Next() { + var direction string + var token string + var resolved_time int64 + var amt_msat int64 + err = rows.Scan(&direction, &token, &resolved_time, &amt_msat) + if err != nil { + return nil, fmt.Errorf("rows.Scan err: %w", err) + } + + result = append(result, &ExportedForward{ + Token: token, + NodeId: node, + ExternalNodeId: externalNode, + ResolvedTime: time.Unix(0, resolved_time), + Direction: direction, + AmountMsat: uint64(amt_msat), + }) + } + + return result, nil +} + +func (s *RevenueCliStore) sanityCheck(ctx context.Context, startNs, endNs uint64) error { + // Sanity check, does forward/channel sync work? Can all forwards be associated to a channel? + row := s.pool.QueryRow(ctx, ` + SELECT COUNT(*) + FROM forwarding_history h + LEFT JOIN channels c_in + ON h.nodeid = c_in.nodeid AND (h.chanid_in = c_in.confirmed_scid OR h.chanid_in = c_in.alias_scid) + LEFT JOIN channels c_out + ON h.nodeid = c_out.nodeid AND (h.chanid_out = c_out.confirmed_scid OR h.chanid_out = c_out.alias_scid) + WHERE h.resolved_time >= $1 AND h.resolved_time < $2 + AND (c_in.nodeid IS NULL OR c_out.nodeid IS NULL) + `, startNs, endNs) + var count int64 + err := row.Scan(&count) + if err != nil { + return fmt.Errorf("failed to do sanity check: %w", err) + } + if count > 0 { + return fmt.Errorf("%d local forwards in the selected time range could not be associated to their channels. Is forward/channel sync working?", count) + } + + return nil +} +