diff --git a/lsps2/cleanup_service.go b/lsps2/cleanup_service.go new file mode 100644 index 00000000..09844ec1 --- /dev/null +++ b/lsps2/cleanup_service.go @@ -0,0 +1,43 @@ +package lsps2 + +import ( + "context" + "log" + "time" +) + +type CleanupService struct { + store Lsps2Store +} + +// The interval to clean unused promises and buy registrations. +var CleanupInterval time.Duration = time.Hour + +// The relax period is a period where expired promises may still be valid, if +// the current chainfees are cheaper than the fees in the promise itself. It is +// set to ~two weeks. +var RelaxPeriod time.Duration = time.Hour * 24 * 14 + +func NewCleanupService(store Lsps2Store) *CleanupService { + return &CleanupService{ + store: store, + } +} + +// Periodically cleans up unused buy registrations and promises that have +// expired before the relax interval. +func (c *CleanupService) Start(ctx context.Context) { + for { + before := time.Now().UTC().Add(-RelaxPeriod) + err := c.store.RemoveUnusedExpired(ctx, before) + if err != nil { + log.Printf("Failed to remove unused expired registrations before %v: %v", before, err) + } + select { + case <-time.After(CleanupInterval): + continue + case <-ctx.Done(): + return + } + } +} diff --git a/lsps2/mocks.go b/lsps2/mocks.go index acdac219..af93c78b 100644 --- a/lsps2/mocks.go +++ b/lsps2/mocks.go @@ -91,6 +91,10 @@ func (s *mockLsps2Store) SavePromises(ctx context.Context, req *SavePromises) er return nil } +func (s *mockLsps2Store) RemoveUnusedExpired(ctx context.Context, before time.Time) error { + return nil +} + type mockLightningClient struct { openResponses []*wire.OutPoint openRequests []*lightning.OpenChannelRequest diff --git a/lsps2/store.go b/lsps2/store.go index 21ed9f28..2f4d2a04 100644 --- a/lsps2/store.go +++ b/lsps2/store.go @@ -69,4 +69,5 @@ type Lsps2Store interface { GetBuyRegistration(ctx context.Context, scid lightning.ShortChannelID) (*BuyRegistration, error) SetChannelOpened(ctx context.Context, channelOpened *ChannelOpened) error SetCompleted(ctx context.Context, registrationId uint64) error + RemoveUnusedExpired(ctx context.Context, before time.Time) error } diff --git a/main.go b/main.go index d844077a..8f7d4da8 100644 --- a/main.go +++ b/main.go @@ -102,6 +102,8 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) openingService := shared.NewOpeningService(openingStore, nodesService) + cleanupService := lsps2.NewCleanupService(lsps2Store) + go cleanupService.Start(ctx) var interceptors []interceptor.HtlcInterceptor for _, node := range nodes { var htlcInterceptor interceptor.HtlcInterceptor diff --git a/postgresql/lsps2_store.go b/postgresql/lsps2_store.go index 8ec08d14..b1aeecb9 100644 --- a/postgresql/lsps2_store.go +++ b/postgresql/lsps2_store.go @@ -3,9 +3,12 @@ package postgresql import ( "context" "fmt" + "log" "strings" + "time" "github.com/breez/lspd/lightning" + "github.com/breez/lspd/lsps0" "github.com/breez/lspd/lsps2" "github.com/breez/lspd/shared" "github.com/btcsuite/btcd/wire" @@ -233,13 +236,64 @@ func (s *Lsps2Store) SavePromises( rows := [][]interface{}{} for _, p := range req.Menu { - rows = append(rows, []interface{}{p.Promise, req.Token}) + rows = append(rows, []interface{}{p.Promise, req.Token, p.ValidUntil}) } _, err := s.pool.CopyFrom( ctx, pgx.Identifier{"lsps2", "promises"}, - []string{"promise", "token"}, + []string{"promise", "token", "valid_until"}, pgx.CopyFromRows(rows), ) return err } + +func (s *Lsps2Store) RemoveUnusedExpired( + ctx context.Context, + before time.Time, +) error { + tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return err + } + // Rollback will not do anything if Commit() has already been called. + defer tx.Rollback(ctx) + + timestamp := before.Format(lsps0.TIME_FORMAT) + + // Promises can be deleted without issue. + tag, err := tx.Exec( + ctx, + `DELETE FROM lsps2.buy_registrations r + WHERE r.id IN ( + SELECT sr.id + FROM lsps2.buy_registrations sr + LEFT JOIN lsps2.bought_channels sb ON sr.id = sb.registration_id + WHERE sb.registration_id IS NULL + AND sr.params_valid_until < $1 + )`, + timestamp, + ) + if err != nil { + return err + } + rowsAffected := tag.RowsAffected() + if rowsAffected > 0 { + log.Printf("Deleted %d expired buy registrations before %s", rowsAffected, timestamp) + } + + tag, err = tx.Exec( + ctx, + `DELETE FROM lsps2.promises + WHERE valid_until < $1`, + timestamp, + ) + if err != nil { + return err + } + rowsAffected = tag.RowsAffected() + if rowsAffected > 0 { + log.Printf("Deleted %d expired promises before %s", rowsAffected, timestamp) + } + + return tx.Commit(ctx) +} diff --git a/postgresql/migrations/000014_lsps2_buy.up.sql b/postgresql/migrations/000014_lsps2_buy.up.sql index c805a157..0abf2e09 100644 --- a/postgresql/migrations/000014_lsps2_buy.up.sql +++ b/postgresql/migrations/000014_lsps2_buy.up.sql @@ -34,5 +34,7 @@ CREATE INDEX idx_lsps2_bought_channels_registration_id ON lsps2.bought_channels CREATE TABLE lsps2.promises ( promise varchar PRIMARY KEY, - token varchar NOT NULL -) + token varchar NOT NULL, + valid_until varchar NOT NULL +); +CREATE INDEX idx_lsps2_promises_valid_until ON lsps2.promises (valid_until);