Skip to content

Commit

Permalink
lsps2: cleanup expired promises
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Oct 23, 2023
1 parent 71c8ea5 commit 084a4d8
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 4 deletions.
43 changes: 43 additions & 0 deletions lsps2/cleanup_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package lsps2

import (
"context"
"log"
"time"
)

type CleanupService struct {
store Lsps2Store
}

// The interval to clean unused promises and buy registrations.
var CleanupInterval time.Duration = time.Hour

// The relax period is a period where expired promises may still be valid, if
// the current chainfees are cheaper than the fees in the promise itself. It is
// set to ~two weeks.
var RelaxPeriod time.Duration = time.Hour * 24 * 14

func NewCleanupService(store Lsps2Store) *CleanupService {
return &CleanupService{
store: store,
}
}

// Periodically cleans up unused buy registrations and promises that have
// expired before the relax interval.
func (c *CleanupService) Start(ctx context.Context) {
for {
before := time.Now().UTC().Add(-RelaxPeriod)
err := c.store.RemoveUnusedExpired(ctx, before)
if err != nil {
log.Printf("Failed to remove unused expired registrations before %v: %v", before, err)
}
select {
case <-time.After(CleanupInterval):
continue
case <-ctx.Done():
return
}
}
}
4 changes: 4 additions & 0 deletions lsps2/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func (s *mockLsps2Store) SavePromises(ctx context.Context, req *SavePromises) er
return nil
}

func (s *mockLsps2Store) RemoveUnusedExpired(ctx context.Context, before time.Time) error {
return nil
}

type mockLightningClient struct {
openResponses []*wire.OutPoint
openRequests []*lightning.OpenChannelRequest
Expand Down
1 change: 1 addition & 0 deletions lsps2/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ type Lsps2Store interface {
GetBuyRegistration(ctx context.Context, scid lightning.ShortChannelID) (*BuyRegistration, error)
SetChannelOpened(ctx context.Context, channelOpened *ChannelOpened) error
SetCompleted(ctx context.Context, registrationId uint64) error
RemoveUnusedExpired(ctx context.Context, before time.Time) error
}
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func main() {

ctx, cancel := context.WithCancel(context.Background())
openingService := shared.NewOpeningService(openingStore, nodesService)
cleanupService := lsps2.NewCleanupService(lsps2Store)
go cleanupService.Start(ctx)
var interceptors []interceptor.HtlcInterceptor
for _, node := range nodes {
var htlcInterceptor interceptor.HtlcInterceptor
Expand Down
58 changes: 56 additions & 2 deletions postgresql/lsps2_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package postgresql
import (
"context"
"fmt"
"log"
"strings"
"time"

"github.com/breez/lspd/lightning"
"github.com/breez/lspd/lsps0"
"github.com/breez/lspd/lsps2"
"github.com/breez/lspd/shared"
"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -233,13 +236,64 @@ func (s *Lsps2Store) SavePromises(

rows := [][]interface{}{}
for _, p := range req.Menu {
rows = append(rows, []interface{}{p.Promise, req.Token})
rows = append(rows, []interface{}{p.Promise, req.Token, p.ValidUntil})
}
_, err := s.pool.CopyFrom(
ctx,
pgx.Identifier{"lsps2", "promises"},
[]string{"promise", "token"},
[]string{"promise", "token", "valid_until"},
pgx.CopyFromRows(rows),
)
return err
}

func (s *Lsps2Store) RemoveUnusedExpired(
ctx context.Context,
before time.Time,
) error {
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}
// Rollback will not do anything if Commit() has already been called.
defer tx.Rollback(ctx)

timestamp := before.Format(lsps0.TIME_FORMAT)

// Promises can be deleted without issue.
tag, err := tx.Exec(
ctx,
`DELETE FROM lsps2.buy_registrations r
WHERE r.id IN (
SELECT sr.id
FROM lsps2.buy_registrations sr
LEFT JOIN lsps2.bought_channels sb ON sr.id = sb.registration_id
WHERE sb.registration_id IS NULL
AND sr.params_valid_until < $1
)`,
timestamp,
)
if err != nil {
return err
}
rowsAffected := tag.RowsAffected()
if rowsAffected > 0 {
log.Printf("Deleted %d expired buy registrations before %s", rowsAffected, timestamp)
}

tag, err = tx.Exec(
ctx,
`DELETE FROM lsps2.promises
WHERE valid_until < $1`,
timestamp,
)
if err != nil {
return err
}
rowsAffected = tag.RowsAffected()
if rowsAffected > 0 {
log.Printf("Deleted %d expired promises before %s", rowsAffected, timestamp)
}

return tx.Commit(ctx)
}
6 changes: 4 additions & 2 deletions postgresql/migrations/000014_lsps2_buy.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ CREATE INDEX idx_lsps2_bought_channels_registration_id ON lsps2.bought_channels

CREATE TABLE lsps2.promises (
promise varchar PRIMARY KEY,
token varchar NOT NULL
)
token varchar NOT NULL,
valid_until varchar NOT NULL
);
CREATE INDEX idx_lsps2_promises_valid_until ON lsps2.promises (valid_until);

0 comments on commit 084a4d8

Please sign in to comment.