diff --git a/docs/release-notes/release-notes-0.8.0.md b/docs/release-notes/release-notes-0.8.0.md index 5ad8bb717..052c8a409 100644 --- a/docs/release-notes/release-notes-0.8.0.md +++ b/docs/release-notes/release-notes-0.8.0.md @@ -45,6 +45,10 @@ instance has a fully functional RFQ pipeline and surfaces configuration or lnd-level conflicts immediately. +- [RFQ buy/sell accepts are now written to the database](https://github.com/lightninglabs/taproot-assets/pull/1863) + `rfq_policies` table whenever a policy is agreed, giving us an audit trail + and keeping quotes alive across restarts. + ## RPC Updates - [PR#1841](https://github.com/lightninglabs/taproot-assets/pull/1841): Remove diff --git a/itest/rfq_test.go b/itest/rfq_test.go index ff7ecc3eb..0471ab4ec 100644 --- a/itest/rfq_test.go +++ b/itest/rfq_test.go @@ -228,6 +228,19 @@ func testRfqAssetBuyHtlcIntercept(t *harnessTest) { expectedAssetID := mintedAssetId require.Equal(t.t, expectedAssetID, actualAssetID) + // Restart Bob's tapd to ensure the accepted quote policy survives a + // restart and is restored. + require.NoError(t.t, ts.BobTapd.stop(false)) + require.NoError(t.t, ts.BobTapd.start(false)) + + // Carol should still see the accepted quote after Bob's restart. + acceptedQuotes, err = ts.CarolTapd.QueryPeerAcceptedQuotes( + ctxt, &rfqrpc.QueryPeerAcceptedQuotesRequest{}, + ) + require.NoError(t.t, err) + require.Len(t.t, acceptedQuotes.BuyQuotes, 1) + acceptedQuote = acceptedQuotes.BuyQuotes[0] + // Carol will now use the accepted quote (received from Bob) to create // a lightning invoice which will be given to and settled by Alice. // diff --git a/rfq/interface.go b/rfq/interface.go new file mode 100644 index 000000000..3ab85d9a4 --- /dev/null +++ b/rfq/interface.go @@ -0,0 +1,20 @@ +package rfq + +import ( + "context" + + "github.com/lightninglabs/taproot-assets/rfqmsg" +) + +// PolicyStore abstracts persistence of RFQ policies. +type PolicyStore interface { + // StoreSalePolicy stores an asset sale policy. + StoreSalePolicy(ctx context.Context, accept rfqmsg.BuyAccept) error + + // StorePurchasePolicy stores an asset purchase policy. + StorePurchasePolicy(ctx context.Context, accept rfqmsg.SellAccept) error + + // FetchAcceptedQuotes fetches all accepted buy and sell quotes. + FetchAcceptedQuotes(ctx context.Context) ([]rfqmsg.BuyAccept, + []rfqmsg.SellAccept, error) +} diff --git a/rfq/manager.go b/rfq/manager.go index 98dbcd0ee..04723e98a 100644 --- a/rfq/manager.go +++ b/rfq/manager.go @@ -118,6 +118,9 @@ type ManagerCfg struct { // helps us communicate custom feature bits with our peer. AuxChanNegotiator *tapfeatures.AuxChannelNegotiator + // PolicyStore provides persistence for agreed RFQ policies. + PolicyStore PolicyStore + // AcceptPriceDeviationPpm is the price deviation in // parts per million that is accepted by the RFQ negotiator. // @@ -179,34 +182,6 @@ type Manager struct { // events. acceptHtlcEvents chan *AcceptHtlcEvent - // peerAcceptedBuyQuotes holds buy quotes for assets that our node has - // requested and that have been accepted by peer nodes. These quotes are - // exclusively used by our node for the acquisition of assets, as they - // represent agreed-upon terms for purchase transactions with our peers. - peerAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] - - // peerAcceptedSellQuotes holds sell quotes for assets that our node has - // requested and that have been accepted by peer nodes. These quotes are - // exclusively used by our node for the sale of assets, as they - // represent agreed-upon terms for sale transactions with our peers. - peerAcceptedSellQuotes lnutils.SyncMap[ - SerialisedScid, rfqmsg.SellAccept, - ] - - // localAcceptedBuyQuotes holds buy quotes for assets that our node has - // accepted and that have been requested by peer nodes. These quotes are - // exclusively used by our node for the acquisition of assets, as they - // represent agreed-upon terms for purchase transactions with our peers. - localAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] - - // localAcceptedSellQuotes holds sell quotes for assets that our node - // has accepted and that have been requested by peer nodes. These quotes - // are exclusively used by our node for the sale of assets, as they - // represent agreed-upon terms for sale transactions with our peers. - localAcceptedSellQuotes lnutils.SyncMap[ - SerialisedScid, rfqmsg.SellAccept, - ] - // groupKeyLookupCache is a map that helps us quickly perform an // in-memory look up of the group an asset belongs to. Since this // information is static and generated during minting, it is not @@ -234,10 +209,6 @@ func NewManager(cfg ManagerCfg) (*Manager, error) { outgoingMessages: make(chan rfqmsg.OutgoingMsg), acceptHtlcEvents: make(chan *AcceptHtlcEvent), - peerAcceptedBuyQuotes: lnutils.SyncMap[ - SerialisedScid, rfqmsg.BuyAccept]{}, - peerAcceptedSellQuotes: lnutils.SyncMap[ - SerialisedScid, rfqmsg.SellAccept]{}, subscribers: lnutils.SyncMap[ uint64, *fn.EventReceiver[fn.Event]]{}, @@ -265,13 +236,14 @@ func (m *Manager) startSubsystems(ctx context.Context) error { NoOpHTLCs: m.cfg.NoOpHTLCs, AuxChanNegotiator: m.cfg.AuxChanNegotiator, ErrChan: m.subsystemErrChan, + PolicyStore: m.cfg.PolicyStore, }) if err != nil { return fmt.Errorf("error initializing RFQ order handler: %w", err) } - if err := m.orderHandler.Start(); err != nil { + if err := m.orderHandler.Start(ctx); err != nil { return fmt.Errorf("unable to start RFQ order handler: %w", err) } @@ -436,7 +408,7 @@ func (m *Manager) handleIncomingMessage(incomingMsg rfqmsg.IncomingMsg) error { // quote so that it can be used to send a payment by our // lightning node. scid := msg.ShortChannelId() - m.peerAcceptedBuyQuotes.Store(scid, msg) + m.orderHandler.peerBuyQuotes.Store(scid, msg) // Since we're going to buy assets from our peer, we // need to make sure we can identify the incoming asset @@ -489,7 +461,7 @@ func (m *Manager) handleIncomingMessage(incomingMsg rfqmsg.IncomingMsg) error { // quote so that it can be used to send a payment by our // lightning node. scid := msg.ShortChannelId() - m.peerAcceptedSellQuotes.Store(scid, msg) + m.orderHandler.peerSellQuotes.Store(scid, msg) // Notify subscribers of the incoming peer accepted // asset sell quote. @@ -523,16 +495,16 @@ func (m *Manager) handleOutgoingMessage(outgoingMsg rfqmsg.OutgoingMsg) error { // we inform our peer of our decision, we inform the order // handler that we are willing to sell the asset subject to a // sale policy. - m.orderHandler.RegisterAssetSalePolicy(*msg) - - // We want to store that we accepted the buy quote, in case we - // need to look it up for a direct peer payment. - m.localAcceptedBuyQuotes.Store(msg.ShortChannelId(), *msg) + err := m.orderHandler.RegisterAssetSalePolicy(*msg) + if err != nil { + return fmt.Errorf("registering asset sale "+ + "policy: %w", err) + } // Since our peer is going to buy assets from us, we need to // make sure we can identify the forwarded asset payment by the // outgoing SCID alias within the onion packet. - err := m.addScidAlias( + err = m.addScidAlias( uint64(msg.ShortChannelId()), msg.Request.AssetSpecifier, msg.Peer, ) @@ -546,11 +518,11 @@ func (m *Manager) handleOutgoingMessage(outgoingMsg rfqmsg.OutgoingMsg) error { // we inform our peer of our decision, we inform the order // handler that we are willing to buy the asset subject to a // purchase policy. - m.orderHandler.RegisterAssetPurchasePolicy(*msg) - - // We want to store that we accepted the sell quote, in case we - // need to look it up for a direct peer payment. - m.localAcceptedSellQuotes.Store(msg.ShortChannelId(), *msg) + err := m.orderHandler.RegisterAssetPurchasePolicy(*msg) + if err != nil { + return fmt.Errorf("registering asset purchase "+ + "policy: %w", err) + } } // Send the outgoing message to the peer. @@ -843,10 +815,10 @@ func (m *Manager) PeerAcceptedBuyQuotes() BuyAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. buyQuotesCopy := make(map[SerialisedScid]rfqmsg.BuyAccept) - m.peerAcceptedBuyQuotes.ForEach( + m.orderHandler.peerBuyQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.BuyAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.peerAcceptedBuyQuotes.Delete(scid) + m.orderHandler.peerBuyQuotes.Delete(scid) return nil } @@ -865,10 +837,10 @@ func (m *Manager) PeerAcceptedSellQuotes() SellAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. sellQuotesCopy := make(map[SerialisedScid]rfqmsg.SellAccept) - m.peerAcceptedSellQuotes.ForEach( + m.orderHandler.peerSellQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.SellAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.peerAcceptedSellQuotes.Delete(scid) + m.orderHandler.peerSellQuotes.Delete(scid) return nil } @@ -887,10 +859,10 @@ func (m *Manager) LocalAcceptedBuyQuotes() BuyAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. buyQuotesCopy := make(map[SerialisedScid]rfqmsg.BuyAccept) - m.localAcceptedBuyQuotes.ForEach( + m.orderHandler.localBuyQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.BuyAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.localAcceptedBuyQuotes.Delete(scid) + m.orderHandler.localBuyQuotes.Delete(scid) return nil } @@ -909,10 +881,10 @@ func (m *Manager) LocalAcceptedSellQuotes() SellAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. sellQuotesCopy := make(map[SerialisedScid]rfqmsg.SellAccept) - m.localAcceptedSellQuotes.ForEach( + m.orderHandler.localSellQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.SellAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.localAcceptedSellQuotes.Delete(scid) + m.orderHandler.localSellQuotes.Delete(scid) return nil } diff --git a/rfq/manager_test.go b/rfq/manager_test.go index f6b95b066..0006cce1e 100644 --- a/rfq/manager_test.go +++ b/rfq/manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/lightninglabs/taproot-assets/address" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/proof" + "github.com/lightninglabs/taproot-assets/rfqmsg" tpchmsg "github.com/lightninglabs/taproot-assets/tapchannelmsg" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/routing/route" @@ -59,6 +60,26 @@ var ( peer2 = route.Vertex{77} ) +type mockPolicyStore struct{} + +func (mockPolicyStore) StoreSalePolicy(context.Context, + rfqmsg.BuyAccept) error { + + return nil +} + +func (mockPolicyStore) StorePurchasePolicy(context.Context, + rfqmsg.SellAccept) error { + + return nil +} + +func (mockPolicyStore) FetchAcceptedQuotes(context.Context) ( + []rfqmsg.BuyAccept, []rfqmsg.SellAccept, error) { + + return nil, nil, nil +} + // GroupLookupMock mocks the GroupLookup interface that is required by the // rfq manager to check asset IDs against asset specifiers. type GroupLookupMock struct{} @@ -141,6 +162,7 @@ func assertComputeChannelAssetBalance(t *testing.T, mockGroupLookup := &GroupLookupMock{} cfg := ManagerCfg{ GroupLookup: mockGroupLookup, + PolicyStore: mockPolicyStore{}, } manager, err := NewManager(cfg) require.NoError(t, err) diff --git a/rfq/order.go b/rfq/order.go index 9995f2104..182a6e4af 100644 --- a/rfq/order.go +++ b/rfq/order.go @@ -711,6 +711,9 @@ type OrderHandlerCfg struct { // ErrChan is the main error channel that is used to propagate critical // errors back to the parent manager/server. ErrChan chan<- error + + // PolicyStore persists agreed RFQ policies. + PolicyStore PolicyStore } // OrderHandler orchestrates management of accepted quote bundles. It monitors @@ -723,10 +726,41 @@ type OrderHandler struct { // cfg holds the configuration parameters for the RFQ order handler. cfg OrderHandlerCfg + // policyStore provides persistence for agreed policies. + policyStore PolicyStore + // policies is a map of serialised short channel IDs (SCIDs) to // associated asset transaction policies. policies lnutils.SyncMap[SerialisedScid, Policy] + // peerBuyQuotes holds buy quotes for assets that our node has + // requested and that have been accepted by peer nodes. These quotes are + // exclusively used by our node for the acquisition of assets, as they + // represent agreed-upon terms for purchase transactions with our peers. + peerBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] + + // peerSellQuotes holds sell quotes for assets that our node has + // requested and that have been accepted by peer nodes. These quotes are + // exclusively used by our node for the sale of assets, as they + // represent agreed-upon terms for sale transactions with our peers. + peerSellQuotes lnutils.SyncMap[ + SerialisedScid, rfqmsg.SellAccept, + ] + + // localBuyQuotes holds buy quotes for assets that our node has + // accepted and that have been requested by peer nodes. These quotes are + // exclusively used by our node for the acquisition of assets, as they + // represent agreed-upon terms for purchase transactions with our peers. + localBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] + + // localSellQuotes holds sell quotes for assets that our node + // has accepted and that have been requested by peer nodes. These quotes + // are exclusively used by our node for the sale of assets, as they + // represent agreed-upon terms for sale transactions with our peers. + localSellQuotes lnutils.SyncMap[ + SerialisedScid, rfqmsg.SellAccept, + ] + // htlcToPolicy maps an HTLC circuit key to the policy that applies to // it. We need this map because for failed HTLCs we don't have the RFQ // data available, so we need to cache this info. @@ -740,8 +774,9 @@ type OrderHandler struct { // NewOrderHandler creates a new struct instance. func NewOrderHandler(cfg OrderHandlerCfg) (*OrderHandler, error) { return &OrderHandler{ - cfg: cfg, - policies: lnutils.SyncMap[SerialisedScid, Policy]{}, + cfg: cfg, + policyStore: cfg.PolicyStore, + policies: lnutils.SyncMap[SerialisedScid, Policy]{}, ContextGuard: &fn.ContextGuard{ DefaultTimeout: DefaultTimeout, Quit: make(chan struct{}), @@ -853,7 +888,6 @@ func (h *OrderHandler) mainEventLoop() { log.Debug("Cleaning up any stale policy from the " + "order handler") h.cleanupStalePolicies() - case <-h.Quit: log.Debug("Received quit signal. Stopping negotiator " + "event loop") @@ -919,9 +953,17 @@ func (h *OrderHandler) subscribeHtlcs(ctx context.Context) error { } // Start starts the service. -func (h *OrderHandler) Start() error { +func (h *OrderHandler) Start(ctx context.Context) error { var startErr error + h.startOnce.Do(func() { + startErr = h.restorePersistedPolicies(ctx) + if startErr != nil { + log.Errorf("error restoring persisted RFQ "+ + "policies: %w", startErr) + return + } + // Start the main event loop in a separate goroutine. h.Wg.Add(1) go func() { @@ -984,7 +1026,9 @@ func (h *OrderHandler) ReportMainChanError(err error) { // RegisterAssetSalePolicy generates and registers an asset sale policy with the // order handler. This function takes an outgoing buy accept message as an // argument. -func (h *OrderHandler) RegisterAssetSalePolicy(buyAccept rfqmsg.BuyAccept) { +func (h *OrderHandler) RegisterAssetSalePolicy( + buyAccept rfqmsg.BuyAccept) error { + log.Debugf("Order handler is registering an asset sale policy given a "+ "buy accept message: %s", buyAccept.String()) @@ -992,20 +1036,84 @@ func (h *OrderHandler) RegisterAssetSalePolicy(buyAccept rfqmsg.BuyAccept) { buyAccept, h.cfg.NoOpHTLCs, h.cfg.AuxChanNegotiator, ) + ctx, cancel := h.WithCtxQuit() + defer cancel() + + err := h.policyStore.StoreSalePolicy(ctx, buyAccept) + if err != nil { + return fmt.Errorf("unable to persist asset sale policy "+ + "(id=%x): %w", + buyAccept.ID[:], err) + } + h.policies.Store(policy.AcceptedQuoteId.Scid(), policy) + + // We want to store that we accepted the buy quote, in case we + // need to look it up for a direct peer payment. + h.localBuyQuotes.Store(buyAccept.ShortChannelId(), buyAccept) + + return nil } // RegisterAssetPurchasePolicy generates and registers an asset buy policy with the // order handler. This function takes an incoming sell accept message as an // argument. func (h *OrderHandler) RegisterAssetPurchasePolicy( - sellAccept rfqmsg.SellAccept) { + sellAccept rfqmsg.SellAccept) error { log.Debugf("Order handler is registering an asset buy policy given a "+ "sell accept message: %s", sellAccept.String()) policy := NewAssetPurchasePolicy(sellAccept) + + ctx, cancel := h.WithCtxQuit() + defer cancel() + + err := h.policyStore.StorePurchasePolicy(ctx, sellAccept) + if err != nil { + return fmt.Errorf("unable to persist asset purchase policy "+ + "(id=%x): %w", sellAccept.ID[:], err) + } + h.policies.Store(policy.scid, policy) + + // We want to store that we accepted the sell quote, in case we + // need to look it up for a direct peer payment. + h.localSellQuotes.Store(sellAccept.ShortChannelId(), sellAccept) + + return nil +} + +// restorePersistedPolicies restores persisted policies from the policy store. +func (h *OrderHandler) restorePersistedPolicies(ctx context.Context) error { + buyAccepts, sellAccepts, err := h.cfg.PolicyStore.FetchAcceptedQuotes( + ctx, + ) + if err != nil { + return fmt.Errorf("error fetching persisted policies: %w", err) + } + + for _, accept := range buyAccepts { + policy := NewAssetSalePolicy( + accept, h.cfg.NoOpHTLCs, h.cfg.AuxChanNegotiator, + ) + h.policies.Store(policy.AcceptedQuoteId.Scid(), policy) + } + + for _, accept := range sellAccepts { + policy := NewAssetPurchasePolicy(accept) + h.policies.Store(policy.scid, policy) + } + + for _, accept := range buyAccepts { + h.localBuyQuotes.Store(accept.ShortChannelId(), accept) + } + + for _, accept := range sellAccepts { + h.localSellQuotes.Store(accept.ShortChannelId(), accept) + } + + return nil } // fetchPolicy fetches a policy which is relevant to a given HTLC. If a policy @@ -1076,12 +1184,10 @@ func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy, outgoingPolicy := outPolicy if incomingPolicy.HasExpired() { - scid := incomingPolicy.Scid() - h.policies.Delete(SerialisedScid(scid)) + h.policies.Delete(inScid) } if outgoingPolicy.HasExpired() { - scid := outgoingPolicy.Scid() - h.policies.Delete(SerialisedScid(scid)) + h.policies.Delete(SerialisedScid(outgoingPolicy.Scid())) } // If either the incoming or outgoing policy has expired, we diff --git a/rfqmsg/buy_accept.go b/rfqmsg/buy_accept.go index 54a3e87d1..5f0acb4a5 100644 --- a/rfqmsg/buy_accept.go +++ b/rfqmsg/buy_accept.go @@ -34,10 +34,17 @@ type BuyAccept struct { // sig is a signature over the serialized contents of the message. sig [64]byte + + // AgreedAt is the time at which the quote was accepted. Represents the + // time the wire message was parsed or accept message was generated. + AgreedAt time.Time } // NewBuyAcceptFromRequest creates a new instance of a quote accept message -// given a quote request message. +// given a quote request message. Note that this function sets the AgreedAt +// timestamp to the current time. If callers need to preserve an existing +// AgreedAt value (e.g., when reconstructing from storage), they should +// manually construct the BuyAccept. func NewBuyAcceptFromRequest(request BuyRequest, assetRate AssetRate) *BuyAccept { @@ -47,6 +54,7 @@ func NewBuyAcceptFromRequest(request BuyRequest, Version: latestBuyAcceptVersion, ID: request.ID, AssetRate: assetRate, + AgreedAt: time.Now().UTC(), } } @@ -74,6 +82,7 @@ func newBuyAcceptFromWireMsg(wireMsg WireMessage, ID: msgData.ID.Val, AssetRate: NewAssetRate(assetRate, expiry), sig: msgData.Sig.Val, + AgreedAt: time.Now().UTC(), }, nil } diff --git a/rfqmsg/sell_accept.go b/rfqmsg/sell_accept.go index 1e1701d97..0ebc9173a 100644 --- a/rfqmsg/sell_accept.go +++ b/rfqmsg/sell_accept.go @@ -34,10 +34,17 @@ type SellAccept struct { // sig is a signature over the serialized contents of the message. sig [64]byte + + // AgreedAt is the time at which the quote was accepted. Represents the + // time the wire message was parsed or the accept message was generated. + AgreedAt time.Time } // NewSellAcceptFromRequest creates a new instance of an asset sell quote accept -// message given an asset sell quote request message. +// message given an asset sell quote request message. Note that this function +// sets the AgreedAt timestamp to the current time. If callers need to preserve +// an existing AgreedAt value (e.g., when reconstructing from storage), +// they should manually construct the BuyAccept. func NewSellAcceptFromRequest(request SellRequest, assetRate AssetRate) *SellAccept { @@ -47,6 +54,7 @@ func NewSellAcceptFromRequest(request SellRequest, Version: latestSellAcceptVersion, ID: request.ID, AssetRate: assetRate, + AgreedAt: time.Now().UTC(), } } @@ -78,6 +86,7 @@ func newSellAcceptFromWireMsg(wireMsg WireMessage, ID: msgData.ID.Val, AssetRate: NewAssetRate(assetRate, expiry), sig: msgData.Sig.Val, + AgreedAt: time.Now().UTC(), }, nil } diff --git a/tapcfg/server.go b/tapcfg/server.go index 99f02aaa5..c50766f34 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -127,6 +127,13 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, lndservices.WithPsbtMaxFeeRatio(cfg.Wallet.PsbtMaxFeeRatio), ) + rfqPolicyDB := tapdb.NewTransactionExecutor( + db, func(tx *sql.Tx) tapdb.RfqPolicyStore { + return db.WithTx(tx) + }, + ) + policyStore := tapdb.NewPersistedPolicyStore(rfqPolicyDB) + // Create a block header cache with default configuration. headerCache, err := lndservices.NewBlockHeaderCache( lndservices.DefaultBlockHeaderCacheConfig(), @@ -526,6 +533,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, SendPriceHint: rfqCfg.SendPriceHint, SendPeerId: rfqCfg.PriceOracleSendPeerId, NoOpHTLCs: cfg.Channel.NoopHTLCs, + PolicyStore: policyStore, ErrChan: mainErrChan, }) if err != nil { diff --git a/tapdb/migrations.go b/tapdb/migrations.go index 4ff83484d..ab9d54edb 100644 --- a/tapdb/migrations.go +++ b/tapdb/migrations.go @@ -24,7 +24,7 @@ const ( // daemon. // // NOTE: This MUST be updated when a new migration is added. - LatestMigrationVersion = 48 + LatestMigrationVersion = 49 ) // DatabaseBackend is an interface that contains all methods our different diff --git a/tapdb/rfq_policies.go b/tapdb/rfq_policies.go new file mode 100644 index 000000000..ae5b8171b --- /dev/null +++ b/tapdb/rfq_policies.go @@ -0,0 +1,521 @@ +package tapdb + +import ( + "context" + "fmt" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/fn" + "github.com/lightninglabs/taproot-assets/rfqmath" + "github.com/lightninglabs/taproot-assets/rfqmsg" + "github.com/lightninglabs/taproot-assets/tapdb/sqlc" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) + +// RfqPolicyType denotes the type of a persisted RFQ policy. +type RfqPolicyType string + +const ( + // RfqPolicyTypeAssetSale identifies an asset sale policy. + RfqPolicyTypeAssetSale RfqPolicyType = "RFQ_POLICY_TYPE_SALE" + + // RfqPolicyTypeAssetPurchase identifies an asset purchase policy. + RfqPolicyTypeAssetPurchase RfqPolicyType = "RFQ_POLICY_TYPE_PURCHASE" +) + +// String converts the policy type to its string representation. +func (t RfqPolicyType) String() string { + return string(t) +} + +// rfqPolicy is the database model for an RFQ policy. It contains all the +// necessary fields to reconstruct a BuyAccept or SellAccept message. +type rfqPolicy struct { + // PolicyType denotes the type of the policy (buy or sell). + PolicyType RfqPolicyType + + // Scid is the short channel ID associated with the policy. + Scid uint64 + + // RfqID is the unique identifier for the RFQ session. + RfqID [32]byte + + // Peer is the public key of the peer node. + Peer [33]byte + + // AssetID is the optional specific asset ID. + AssetID *[32]byte + + // AssetGroupKey is the optional asset group key. + AssetGroupKey *[33]byte + + // RateCoefficient is the coefficient of the exchange rate. + RateCoefficient []byte + + // RateScale is the scale of the exchange rate. + RateScale uint8 + + // ExpiryUnix is the expiration timestamp of the policy. + ExpiryUnix uint64 + + // MaxOutAssetAmt is the maximum asset amount for sale policies. + MaxOutAssetAmt *uint64 + + // PaymentMaxMsat is the maximum payment amount for purchase policies. + PaymentMaxMsat *int64 + + // RequestAssetMaxAmt is the requested maximum asset amount. + RequestAssetMaxAmt *uint64 + + // RequestPaymentMaxMsat is the requested maximum payment amount. + RequestPaymentMaxMsat *int64 + + // PriceOracleMetadata contains metadata about the price oracle. + PriceOracleMetadata string + + // RequestVersion is the version of the RFQ request. + RequestVersion *uint32 + + // AgreedAt is the timestamp when the policy was agreed upon. + AgreedAt time.Time +} + +// RfqPolicyStore is the database interface for RFQ policies. +type RfqPolicyStore interface { + // InsertRfqPolicy inserts a new RFQ policy into the database. + InsertRfqPolicy(context.Context, + sqlc.InsertRfqPolicyParams) (int64, error) + + // FetchActiveRfqPolicies retrieves all active RFQ policies from the + // database. + FetchActiveRfqPolicies(context.Context, int64) ([]sqlc.RfqPolicy, error) +} + +// BatchedRfqPolicyStore supports batched database operations. +type BatchedRfqPolicyStore interface { + RfqPolicyStore + BatchedTx[RfqPolicyStore] +} + +// PersistedPolicyStore offers helpers to persist and load RFQ policies. +type PersistedPolicyStore struct { + db BatchedRfqPolicyStore +} + +// NewPersistedPolicyStore creates a new policy persistence helper. +func NewPersistedPolicyStore(db BatchedRfqPolicyStore) *PersistedPolicyStore { + return &PersistedPolicyStore{ + db: db, + } +} + +// StoreSalePolicy persists a buy-accept policy. +func (s *PersistedPolicyStore) StoreSalePolicy(ctx context.Context, + acpt rfqmsg.BuyAccept) error { + + assetID, groupKey := specifierPointers(acpt.Request.AssetSpecifier) + rateBytes := coefficientBytes(acpt.AssetRate.Rate) + expiry := acpt.AssetRate.Expiry.UTC() + + record := rfqPolicy{ + PolicyType: RfqPolicyTypeAssetSale, + Scid: uint64(acpt.ShortChannelId()), + RfqID: rfqIDArray(acpt.ID), + Peer: serializePeer(acpt.Peer), + AssetID: assetID, + AssetGroupKey: groupKey, + RateCoefficient: rateBytes, + RateScale: acpt.AssetRate.Rate.Scale, + ExpiryUnix: uint64(expiry.Unix()), + MaxOutAssetAmt: fn.Ptr(acpt.Request.AssetMaxAmt), + RequestAssetMaxAmt: fn.Ptr(acpt.Request.AssetMaxAmt), + PriceOracleMetadata: acpt.Request.PriceOracleMetadata, + RequestVersion: fn.Ptr(uint32(acpt.Request.Version)), + AgreedAt: acpt.AgreedAt.UTC(), + } + + return s.storePolicy(ctx, record) +} + +// StorePurchasePolicy persists a sell-accept policy. +func (s *PersistedPolicyStore) StorePurchasePolicy(ctx context.Context, + acpt rfqmsg.SellAccept) error { + + assetID, groupKey := specifierPointers(acpt.Request.AssetSpecifier) + rateBytes := coefficientBytes(acpt.AssetRate.Rate) + expiry := acpt.AssetRate.Expiry.UTC() + paymentMax := int64(acpt.Request.PaymentMaxAmt) + + record := rfqPolicy{ + PolicyType: RfqPolicyTypeAssetPurchase, + Scid: uint64(acpt.ShortChannelId()), + RfqID: rfqIDArray(acpt.ID), + Peer: serializePeer(acpt.Peer), + AssetID: assetID, + AssetGroupKey: groupKey, + RateCoefficient: rateBytes, + RateScale: acpt.AssetRate.Rate.Scale, + ExpiryUnix: uint64(expiry.Unix()), + PaymentMaxMsat: fn.Ptr(paymentMax), + RequestPaymentMaxMsat: fn.Ptr(paymentMax), + PriceOracleMetadata: acpt.Request.PriceOracleMetadata, + RequestVersion: fn.Ptr(uint32(acpt.Request.Version)), + AgreedAt: acpt.AgreedAt.UTC(), + } + + return s.storePolicy(ctx, record) +} + +func (s *PersistedPolicyStore) storePolicy(ctx context.Context, + policy rfqPolicy) error { + + writeOpts := WriteTxOption() + return s.db.ExecTx(ctx, writeOpts, func(q RfqPolicyStore) error { + _, err := q.InsertRfqPolicy(ctx, newInsertParams(policy)) + if err != nil { + return fmt.Errorf("error inserting RFQ policy: %w", err) + } + + return nil + }) +} + +// FetchAcceptedQuotes retrieves all non-expired policies from the database and +// returns them as buy and sell accepts. +func (s *PersistedPolicyStore) FetchAcceptedQuotes(ctx context.Context) ( + []rfqmsg.BuyAccept, []rfqmsg.SellAccept, error) { + + readOpts := ReadTxOption() + var ( + buyAccepts []rfqmsg.BuyAccept + sellAccepts []rfqmsg.SellAccept + ) + now := time.Now().UTC() + + err := s.db.ExecTx(ctx, readOpts, func(q RfqPolicyStore) error { + rows, err := q.FetchActiveRfqPolicies(ctx, now.Unix()) + if err != nil { + return fmt.Errorf("error fetching policies: %w", err) + } + + for _, row := range rows { + policy := policyFromRow(row) + + switch policy.PolicyType { + case RfqPolicyTypeAssetSale: + accept, err := buyAcceptFromStored(policy) + if err != nil { + return fmt.Errorf("error restoring "+ + "sale policy: %w", err) + } + buyAccepts = append(buyAccepts, accept) + + case RfqPolicyTypeAssetPurchase: + accept, err := sellAcceptFromStored(policy) + if err != nil { + return fmt.Errorf("error restoring "+ + "purchase policy: %w", err) + } + sellAccepts = append(sellAccepts, accept) + + default: + // This should never happen by assertion. + return fmt.Errorf("unknown policy type: %s", + policy.PolicyType) + } + } + + return nil + }) + if err != nil { + return nil, nil, err + } + + return buyAccepts, sellAccepts, nil +} + +// newInsertParams creates the parameters for inserting an RFQ policy into the +// database. +func newInsertParams(policy rfqPolicy) sqlc.InsertRfqPolicyParams { + params := sqlc.InsertRfqPolicyParams{ + PolicyType: policy.PolicyType.String(), + Scid: int64(policy.Scid), + RfqID: policy.RfqID[:], + Peer: policy.Peer[:], + RateCoefficient: append([]byte(nil), policy.RateCoefficient...), + RateScale: int32(policy.RateScale), + Expiry: int64(policy.ExpiryUnix), + AgreedAt: policy.AgreedAt.Unix(), + } + + if policy.AssetID != nil { + params.AssetID = policy.AssetID[:] + } + + if policy.AssetGroupKey != nil { + params.AssetGroupKey = policy.AssetGroupKey[:] + } + + if policy.MaxOutAssetAmt != nil { + params.MaxOutAssetAmt = sqlPtrInt64(policy.MaxOutAssetAmt) + params.RequestAssetMaxAmt = sqlPtrInt64(policy.MaxOutAssetAmt) + } + + if policy.PaymentMaxMsat != nil { + params.PaymentMaxMsat = sqlPtrInt64(policy.PaymentMaxMsat) + params.RequestPaymentMaxMsat = sqlPtrInt64( + policy.PaymentMaxMsat, + ) + } + + if policy.RequestAssetMaxAmt != nil { + params.RequestAssetMaxAmt = sqlPtrInt64( + policy.RequestAssetMaxAmt, + ) + } + + if policy.RequestPaymentMaxMsat != nil { + params.RequestPaymentMaxMsat = sqlPtrInt64( + policy.RequestPaymentMaxMsat, + ) + } + + params.PriceOracleMetadata = sqlStr(policy.PriceOracleMetadata) + params.RequestVersion = sqlPtrInt32(policy.RequestVersion) + + return params +} + +// policyFromRow converts a database row to an rfqPolicy struct. +func policyFromRow(row sqlc.RfqPolicy) rfqPolicy { + var ( + rfqID [32]byte + peer [33]byte + ) + copy(rfqID[:], row.RfqID) + copy(peer[:], row.Peer) + + var assetIDPtr *[32]byte + if len(row.AssetID) > 0 { + var id [32]byte + copy(id[:], row.AssetID) + assetIDPtr = &id + } + + var groupKeyPtr *[33]byte + if len(row.AssetGroupKey) > 0 { + var key [33]byte + copy(key[:], row.AssetGroupKey) + groupKeyPtr = &key + } + + policy := rfqPolicy{ + PolicyType: RfqPolicyType(row.PolicyType), + Scid: uint64(row.Scid), + RfqID: rfqID, + Peer: peer, + AssetID: assetIDPtr, + AssetGroupKey: groupKeyPtr, + RateCoefficient: append([]byte(nil), row.RateCoefficient...), + RateScale: uint8(row.RateScale), + ExpiryUnix: uint64(row.Expiry), + AgreedAt: time.Unix(row.AgreedAt, 0).UTC(), + } + + if row.PriceOracleMetadata.Valid { + policy.PriceOracleMetadata = row.PriceOracleMetadata.String + } + + policy.RequestVersion = extractSqlInt32Ptr[uint32](row.RequestVersion) + policy.MaxOutAssetAmt = extractSqlInt64Ptr[uint64](row.MaxOutAssetAmt) + policy.PaymentMaxMsat = extractSqlInt64Ptr[int64](row.PaymentMaxMsat) + policy.RequestAssetMaxAmt = extractSqlInt64Ptr[uint64]( + row.RequestAssetMaxAmt, + ) + policy.RequestPaymentMaxMsat = extractSqlInt64Ptr[int64]( + row.RequestPaymentMaxMsat, + ) + + return policy +} + +// specifierPointers extracts pointers to asset ID and group key from a +// specifier. +func specifierPointers(spec asset.Specifier) (*[32]byte, *[33]byte) { + var assetIDPtr *[32]byte + if id := spec.UnwrapIdToPtr(); id != nil { + assetID := new([32]byte) + copy(assetID[:], id[:]) + assetIDPtr = assetID + } + + var groupKeyPtr *[33]byte + if key := spec.UnwrapGroupKeyToPtr(); key != nil { + groupKey := new([33]byte) + copy(groupKey[:], key.SerializeCompressed()) + groupKeyPtr = groupKey + } + + return assetIDPtr, groupKeyPtr +} + +// coefficientBytes returns the bytes of the rate coefficient. +func coefficientBytes(rate rfqmath.BigIntFixedPoint) []byte { + coeff := rate.Coefficient.Bytes() + if len(coeff) == 0 { + return []byte{0} + } + + return append([]byte(nil), coeff...) +} + +// serializePeer serializes the peer public key. +func serializePeer(peer route.Vertex) [33]byte { + var peerBytes [33]byte + copy(peerBytes[:], peer[:]) + return peerBytes +} + +// rfqIDArray converts an RFQ ID to a byte array. +func rfqIDArray(id rfqmsg.ID) [32]byte { + var idBytes [32]byte + copy(idBytes[:], id[:]) + return idBytes +} + +// buyAcceptFromStored reconstructs a BuyAccept message from a stored policy. +func buyAcceptFromStored(row rfqPolicy) (rfqmsg.BuyAccept, error) { + spec, err := assetSpecifierFromStored(row) + if err != nil { + return rfqmsg.BuyAccept{}, err + } + + rate := rateFromStored(row) + + vertex := vertexFromBytes(row.Peer) + id := rfqIDFromBytes(row.RfqID) + + version := rfqmsg.V1 + if row.RequestVersion != nil { + version = rfqmsg.WireMsgDataVersion(*row.RequestVersion) + } + + assetMax := row.RequestAssetMaxAmt + if assetMax == nil { + assetMax = row.MaxOutAssetAmt + } + + expiry := time.Unix(int64(row.ExpiryUnix), 0).UTC() + + request := rfqmsg.BuyRequest{ + Peer: vertex, + Version: version, + ID: id, + AssetSpecifier: spec, + AssetMaxAmt: *assetMax, + AssetRateHint: fn.None[rfqmsg.AssetRate](), + PriceOracleMetadata: row.PriceOracleMetadata, + } + + return rfqmsg.BuyAccept{ + Peer: vertex, + Request: request, + Version: rfqmsg.V1, + ID: id, + AssetRate: rfqmsg.NewAssetRate(rate, expiry), + AgreedAt: row.AgreedAt, + }, nil +} + +// sellAcceptFromStored reconstructs a SellAccept message from a stored policy. +func sellAcceptFromStored(row rfqPolicy) (rfqmsg.SellAccept, error) { + spec, err := assetSpecifierFromStored(row) + if err != nil { + return rfqmsg.SellAccept{}, err + } + + rate := rateFromStored(row) + + vertex := vertexFromBytes(row.Peer) + id := rfqIDFromBytes(row.RfqID) + + version := rfqmsg.V1 + if row.RequestVersion != nil { + version = rfqmsg.WireMsgDataVersion(*row.RequestVersion) + } + + paymentPtr := row.RequestPaymentMaxMsat + if paymentPtr == nil { + paymentPtr = row.PaymentMaxMsat + } + + expiry := time.Unix(int64(row.ExpiryUnix), 0).UTC() + + request := rfqmsg.SellRequest{ + Peer: vertex, + Version: version, + ID: id, + AssetSpecifier: spec, + PaymentMaxAmt: lnwire.MilliSatoshi(*paymentPtr), + AssetRateHint: fn.None[rfqmsg.AssetRate](), + PriceOracleMetadata: row.PriceOracleMetadata, + } + + return rfqmsg.SellAccept{ + Peer: vertex, + Request: request, + Version: rfqmsg.V1, + ID: id, + AssetRate: rfqmsg.NewAssetRate(rate, expiry), + AgreedAt: row.AgreedAt, + }, nil +} + +// assetSpecifierFromStored reconstructs an asset specifier from a stored +// policy. +func assetSpecifierFromStored(row rfqPolicy) (asset.Specifier, error) { + var idPtr *asset.ID + if row.AssetID != nil { + var id asset.ID + copy(id[:], row.AssetID[:]) + idPtr = &id + } + + var groupKey *btcec.PublicKey + if row.AssetGroupKey != nil { + key, err := btcec.ParsePubKey(row.AssetGroupKey[:]) + if err != nil { + return asset.Specifier{}, fmt.Errorf("error parsing "+ + "group key: %w", err) + } + groupKey = key + } + + return asset.NewSpecifier(idPtr, groupKey, nil, true) +} + +// rateFromStored reconstructs the asset rate from a stored policy. +func rateFromStored(row rfqPolicy) rfqmath.BigIntFixedPoint { + coeff := rfqmath.BigInt{}.FromBytes(row.RateCoefficient) + return rfqmath.BigIntFixedPoint{ + Coefficient: coeff, + Scale: row.RateScale, + } +} + +// vertexFromBytes converts a byte array to a route.Vertex. +func vertexFromBytes(raw [33]byte) route.Vertex { + var vertex route.Vertex + copy(vertex[:], raw[:]) + return vertex +} + +// rfqIDFromBytes converts a byte array to an RFQ ID. +func rfqIDFromBytes(raw [32]byte) rfqmsg.ID { + var id rfqmsg.ID + copy(id[:], raw[:]) + return id +} diff --git a/tapdb/sqlc/migrations/000049_rfq_policies.down.sql b/tapdb/sqlc/migrations/000049_rfq_policies.down.sql new file mode 100644 index 000000000..852f0253e --- /dev/null +++ b/tapdb/sqlc/migrations/000049_rfq_policies.down.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS rfq_policies_rfq_id_idx; + +DROP TABLE IF EXISTS rfq_policies; diff --git a/tapdb/sqlc/migrations/000049_rfq_policies.up.sql b/tapdb/sqlc/migrations/000049_rfq_policies.up.sql new file mode 100644 index 000000000..98ea931ad --- /dev/null +++ b/tapdb/sqlc/migrations/000049_rfq_policies.up.sql @@ -0,0 +1,56 @@ +CREATE TABLE IF NOT EXISTS rfq_policies ( + id INTEGER PRIMARY KEY, + + -- policy_type denotes the type of the policy (buy or sell). + -- It can be either 'RFQ_POLICY_TYPE_SALE' or 'RFQ_POLICY_TYPE_PURCHASE'. + policy_type TEXT NOT NULL CHECK ( + policy_type IN ('RFQ_POLICY_TYPE_SALE', 'RFQ_POLICY_TYPE_PURCHASE') + ), + + -- scid is the short channel ID associated with the policy. + scid BIGINT NOT NULL, + + -- rfq_id is the unique identifier for the RFQ session. + rfq_id BLOB NOT NULL CHECK (length(rfq_id) = 32), + + -- peer is the public key of the peer node. + peer BLOB NOT NULL CHECK (length(peer) = 33), + + -- asset_id is the optional asset ID. + asset_id BLOB CHECK (length(asset_id) = 32), + + -- asset_group_key is the optional asset group key. + asset_group_key BLOB CHECK (length(asset_group_key) = 33), + + -- rate_coefficient is the coefficient of the exchange rate. + rate_coefficient BLOB NOT NULL, + + -- rate_scale is the scale of the exchange rate. + rate_scale INTEGER NOT NULL, + + -- expiry is the expiration timestamp of the policy. + expiry BIGINT NOT NULL, + + -- max_out_asset_amt is the maximum asset amount for sale policies. + max_out_asset_amt BIGINT, + + -- payment_max_msat is the maximum payment amount for purchase policies. + payment_max_msat BIGINT, + + -- request_asset_max_amt is the requested maximum asset amount. + request_asset_max_amt BIGINT, + + -- request_payment_max_msat is the requested maximum payment amount. + request_payment_max_msat BIGINT, + + -- price_oracle_metadata contains metadata about the price oracle. + price_oracle_metadata TEXT, + + -- request_version is the version of the RFQ request. + request_version INTEGER, + + -- agreed_at is the timestamp when the policy was agreed upon. + agreed_at BIGINT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS rfq_policies_rfq_id_idx ON rfq_policies (rfq_id); diff --git a/tapdb/sqlc/models.go b/tapdb/sqlc/models.go index 22a61d275..6dd37ef80 100644 --- a/tapdb/sqlc/models.go +++ b/tapdb/sqlc/models.go @@ -366,6 +366,26 @@ type ProofType struct { ProofType string } +type RfqPolicy struct { + ID int64 + PolicyType string + Scid int64 + RfqID []byte + Peer []byte + AssetID []byte + AssetGroupKey []byte + RateCoefficient []byte + RateScale int32 + Expiry int64 + MaxOutAssetAmt sql.NullInt64 + PaymentMaxMsat sql.NullInt64 + RequestAssetMaxAmt sql.NullInt64 + RequestPaymentMaxMsat sql.NullInt64 + PriceOracleMetadata sql.NullString + RequestVersion sql.NullInt32 + AgreedAt int64 +} + type ScriptKey struct { ScriptKeyID int64 InternalKeyID int64 diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index b6fb1fa32..8fee84c9b 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -47,6 +47,7 @@ type Querier interface { DeleteUniverseSupplyLeaf(ctx context.Context, arg DeleteUniverseSupplyLeafParams) error DeleteUniverseSupplyLeaves(ctx context.Context, namespaceRoot string) error DeleteUniverseSupplyRoot(ctx context.Context, namespaceRoot string) error + FetchActiveRfqPolicies(ctx context.Context, minExpiry int64) ([]RfqPolicy, error) FetchAddrEvent(ctx context.Context, id int64) (FetchAddrEventRow, error) FetchAddrEventByAddrKeyAndOutpoint(ctx context.Context, arg FetchAddrEventByAddrKeyAndOutpointParams) (FetchAddrEventByAddrKeyAndOutpointRow, error) FetchAddrEventOutputs(ctx context.Context, addrEventID int64) ([]FetchAddrEventOutputsRow, error) @@ -142,6 +143,7 @@ type Querier interface { InsertNewProofEvent(ctx context.Context, arg InsertNewProofEventParams) error InsertNewSyncEvent(ctx context.Context, arg InsertNewSyncEventParams) error InsertPassiveAsset(ctx context.Context, arg InsertPassiveAssetParams) error + InsertRfqPolicy(ctx context.Context, arg InsertRfqPolicyParams) (int64, error) InsertRootKey(ctx context.Context, arg InsertRootKeyParams) error InsertSupplyCommitTransition(ctx context.Context, arg InsertSupplyCommitTransitionParams) (int64, error) InsertSupplyCommitment(ctx context.Context, arg InsertSupplyCommitmentParams) (int64, error) diff --git a/tapdb/sqlc/queries/rfq.sql b/tapdb/sqlc/queries/rfq.sql new file mode 100644 index 000000000..39b5f9c9b --- /dev/null +++ b/tapdb/sqlc/queries/rfq.sql @@ -0,0 +1,46 @@ +-- name: InsertRfqPolicy :one +INSERT INTO rfq_policies ( + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +) +VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12, $13, $14, $15, $16 +) +RETURNING id; + +-- name: FetchActiveRfqPolicies :many +SELECT + id, + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +FROM rfq_policies +WHERE expiry >= sqlc.arg('min_expiry'); diff --git a/tapdb/sqlc/rfq.sql.go b/tapdb/sqlc/rfq.sql.go new file mode 100644 index 000000000..fabb099d0 --- /dev/null +++ b/tapdb/sqlc/rfq.sql.go @@ -0,0 +1,144 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: rfq.sql + +package sqlc + +import ( + "context" + "database/sql" +) + +const FetchActiveRfqPolicies = `-- name: FetchActiveRfqPolicies :many +SELECT + id, + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +FROM rfq_policies +WHERE expiry >= $1 +` + +func (q *Queries) FetchActiveRfqPolicies(ctx context.Context, minExpiry int64) ([]RfqPolicy, error) { + rows, err := q.db.QueryContext(ctx, FetchActiveRfqPolicies, minExpiry) + if err != nil { + return nil, err + } + defer rows.Close() + var items []RfqPolicy + for rows.Next() { + var i RfqPolicy + if err := rows.Scan( + &i.ID, + &i.PolicyType, + &i.Scid, + &i.RfqID, + &i.Peer, + &i.AssetID, + &i.AssetGroupKey, + &i.RateCoefficient, + &i.RateScale, + &i.Expiry, + &i.MaxOutAssetAmt, + &i.PaymentMaxMsat, + &i.RequestAssetMaxAmt, + &i.RequestPaymentMaxMsat, + &i.PriceOracleMetadata, + &i.RequestVersion, + &i.AgreedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const InsertRfqPolicy = `-- name: InsertRfqPolicy :one +INSERT INTO rfq_policies ( + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +) +VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12, $13, $14, $15, $16 +) +RETURNING id +` + +type InsertRfqPolicyParams struct { + PolicyType string + Scid int64 + RfqID []byte + Peer []byte + AssetID []byte + AssetGroupKey []byte + RateCoefficient []byte + RateScale int32 + Expiry int64 + MaxOutAssetAmt sql.NullInt64 + PaymentMaxMsat sql.NullInt64 + RequestAssetMaxAmt sql.NullInt64 + RequestPaymentMaxMsat sql.NullInt64 + PriceOracleMetadata sql.NullString + RequestVersion sql.NullInt32 + AgreedAt int64 +} + +func (q *Queries) InsertRfqPolicy(ctx context.Context, arg InsertRfqPolicyParams) (int64, error) { + row := q.db.QueryRowContext(ctx, InsertRfqPolicy, + arg.PolicyType, + arg.Scid, + arg.RfqID, + arg.Peer, + arg.AssetID, + arg.AssetGroupKey, + arg.RateCoefficient, + arg.RateScale, + arg.Expiry, + arg.MaxOutAssetAmt, + arg.PaymentMaxMsat, + arg.RequestAssetMaxAmt, + arg.RequestPaymentMaxMsat, + arg.PriceOracleMetadata, + arg.RequestVersion, + arg.AgreedAt, + ) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/tapdb/sqlc/schemas/generated_schema.sql b/tapdb/sqlc/schemas/generated_schema.sql index 0decc4bf0..6728b94af 100644 --- a/tapdb/sqlc/schemas/generated_schema.sql +++ b/tapdb/sqlc/schemas/generated_schema.sql @@ -782,6 +782,63 @@ CREATE TABLE proof_types ( proof_type TEXT PRIMARY KEY ); +CREATE TABLE rfq_policies ( + id INTEGER PRIMARY KEY, + + -- policy_type denotes the type of the policy (buy or sell). + -- It can be either 'RFQ_POLICY_TYPE_SALE' or 'RFQ_POLICY_TYPE_PURCHASE'. + policy_type TEXT NOT NULL CHECK ( + policy_type IN ('RFQ_POLICY_TYPE_SALE', 'RFQ_POLICY_TYPE_PURCHASE') + ), + + -- scid is the short channel ID associated with the policy. + scid BIGINT NOT NULL, + + -- rfq_id is the unique identifier for the RFQ session. + rfq_id BLOB NOT NULL CHECK (length(rfq_id) = 32), + + -- peer is the public key of the peer node. + peer BLOB NOT NULL CHECK (length(peer) = 33), + + -- asset_id is the optional asset ID. + asset_id BLOB CHECK (length(asset_id) = 32), + + -- asset_group_key is the optional asset group key. + asset_group_key BLOB CHECK (length(asset_group_key) = 33), + + -- rate_coefficient is the coefficient of the exchange rate. + rate_coefficient BLOB NOT NULL, + + -- rate_scale is the scale of the exchange rate. + rate_scale INTEGER NOT NULL, + + -- expiry is the expiration timestamp of the policy. + expiry BIGINT NOT NULL, + + -- max_out_asset_amt is the maximum asset amount for sale policies. + max_out_asset_amt BIGINT, + + -- payment_max_msat is the maximum payment amount for purchase policies. + payment_max_msat BIGINT, + + -- request_asset_max_amt is the requested maximum asset amount. + request_asset_max_amt BIGINT, + + -- request_payment_max_msat is the requested maximum payment amount. + request_payment_max_msat BIGINT, + + -- price_oracle_metadata contains metadata about the price oracle. + price_oracle_metadata TEXT, + + -- request_version is the version of the RFQ request. + request_version INTEGER, + + -- agreed_at is the timestamp when the policy was agreed upon. + agreed_at BIGINT NOT NULL +); + +CREATE UNIQUE INDEX rfq_policies_rfq_id_idx ON rfq_policies (rfq_id); + CREATE TABLE script_keys ( script_key_id INTEGER PRIMARY KEY, diff --git a/tapdb/sqlutils.go b/tapdb/sqlutils.go index e3f269100..94872d703 100644 --- a/tapdb/sqlutils.go +++ b/tapdb/sqlutils.go @@ -40,6 +40,18 @@ func sqlInt64[T constraints.Integer](num T) sql.NullInt64 { } } +// sqlPtrInt64 turns a pointer to a numerical integer type into the NullInt64 +// that sql/sqlc uses. +func sqlPtrInt64[T constraints.Integer](num *T) sql.NullInt64 { + if num == nil { + return sql.NullInt64{} + } + return sql.NullInt64{ + Int64: int64(*num), + Valid: true, + } +} + // sqlInt32 turns a numerical integer type into the NullInt32 that sql/sqlc // uses when an integer field can be permitted to be NULL. // @@ -52,6 +64,18 @@ func sqlInt32[T constraints.Integer](num T) sql.NullInt32 { } } +// sqlPtrInt32 turns a pointer to a numerical integer type into the NullInt32 +// that sql/sqlc uses. +func sqlPtrInt32[T constraints.Integer](num *T) sql.NullInt32 { + if num == nil { + return sql.NullInt32{} + } + return sql.NullInt32{ + Int32: int32(*num), + Valid: true, + } +} + // sqlOptInt32 turns an option of a numerical integer type into the NullInt32 // that sql/sqlc uses when an integer field can be permitted to be NULL. func sqlOptInt32[T constraints.Integer](num fn.Option[T]) sql.NullInt32 { @@ -132,6 +156,15 @@ func extractSqlInt64[T constraints.Integer](num sql.NullInt64) T { return T(num.Int64) } +// extractSqlInt64Ptr turns a NullInt64 into a pointer to a numerical type. +func extractSqlInt64Ptr[T constraints.Integer](num sql.NullInt64) *T { + if !num.Valid { + return nil + } + val := T(num.Int64) + return &val +} + // extractSqlInt32 turns a NullInt32 into a numerical type. This can be useful // when reading directly from the database, as this function handles extracting // the inner value from the "option"-like struct. @@ -139,6 +172,15 @@ func extractSqlInt32[T constraints.Integer](num sql.NullInt32) T { return T(num.Int32) } +// extractSqlInt32Ptr turns a NullInt32 into a pointer to a numerical type. +func extractSqlInt32Ptr[T constraints.Integer](num sql.NullInt32) *T { + if !num.Valid { + return nil + } + val := T(num.Int32) + return &val +} + // extractOptSqlInt32 turns a NullInt32 into an option of a numerical type. func extractOptSqlInt32[T constraints.Integer](num sql.NullInt32) fn.Option[T] { if !num.Valid {