diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 4ca32f168e3..11a13595bc0 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -1114,8 +1114,9 @@ func (p *paymentLifecycle) patchLegacyPaymentHash( } // reloadInflightAttempts is called when the payment lifecycle is resumed after -// a restart. It reloads all inflight attempts from the control tower and -// collects the results of the attempts that have been sent before. +// a restart. It reloads all inflight attempts from the control tower, +// reconciles each attempt via the configured ReconcileAttempt callback, and +// then collects the results of the attempts that have been confirmed in-flight. func (p *paymentLifecycle) reloadInflightAttempts() (paymentsdb.DBMPPayment, error) { @@ -1134,6 +1135,18 @@ func (p *paymentLifecycle) reloadInflightAttempts() (paymentsdb.DBMPPayment, // it's a legacy payment. a = p.patchLegacyPaymentHash(a) + // Execute the configured reconciliation strategy. + if err := p.router.cfg.ReconcileAttempt(&a); err != nil { + log.Warnf("Reconciliation failed for attempt "+ + "%v in payment %v: %v. Skipping result "+ + "collection; will retry on next restart.", + a.AttemptID, p.identifier, err) + + continue + } + + // The HTLC attempt is confirmed to be in-flight, so it + // is safe to proceed to awaiting the attempt result. p.resultCollector(&a) } diff --git a/routing/payment_lifecycle_test.go b/routing/payment_lifecycle_test.go index 7e94315a7dc..a860c34f3f1 100644 --- a/routing/payment_lifecycle_test.go +++ b/routing/payment_lifecycle_test.go @@ -86,6 +86,7 @@ func newTestPaymentLifecycle(t *testing.T) (*paymentLifecycle, *mockers) { TrafficShaper: fn.Some[htlcswitch.AuxTrafficShaper]( &mockTrafficShaper{}, ), + ReconcileAttempt: noOpReconcile, }, quit: quitChan, } @@ -1862,3 +1863,78 @@ func TestReloadInflightAttemptsLegacy(t *testing.T) { // Assert the result is received as expected. require.Equal(t, result, r.result) } + +// TestReloadInflightAttemptsWithReconciliation checks that when a +// ReconcileFunc is configured, it is called for each in-flight attempt and +// result collection proceeds on success. +func TestReloadInflightAttemptsWithReconciliation(t *testing.T) { + t.Parallel() + + p, m := newTestPaymentLifecycle(t) + + // Create an attempt. + paymentAmt := 10_000 + preimage := lntypes.Preimage{1} + attempt := makeSettledAttempt(t, paymentAmt, preimage) + + // Track which attempts the callback receives. + var reconciledIDs []uint64 + p.router.cfg.ReconcileAttempt = func(a *paymentsdb.HTLCAttempt) error { + reconciledIDs = append(reconciledIDs, a.AttemptID) + return nil + } + + // Mock FetchPayment and InFlightHTLCs. + m.control.On("FetchPayment", p.identifier).Return(m.payment, nil).Once() + m.payment.On("InFlightHTLCs").Return( + []paymentsdb.HTLCAttempt{*attempt}, + ).Once() + + // Call the method under test. + payment, err := p.reloadInflightAttempts() + require.NoError(t, err) + require.Equal(t, m.payment, payment) + + // The callback should have been invoked with the attempt. + require.Len(t, reconciledIDs, 1) + require.Equal(t, attempt.AttemptID, reconciledIDs[0]) + + // Result collection should have proceeded. + require.Equal(t, 1, m.collectResultsCount) +} + +// TestReloadInflightAttemptsReconciliationError checks that when +// ReconcileFunc returns an error, result collection is skipped for that +// attempt. +func TestReloadInflightAttemptsReconciliationError(t *testing.T) { + t.Parallel() + + p, m := newTestPaymentLifecycle(t) + + // Create an attempt. + paymentAmt := 10_000 + preimage := lntypes.Preimage{1} + attempt := makeSettledAttempt(t, paymentAmt, preimage) + + // Configure a ReconcileFunc that always fails, simulating + // the case where the htlc's in-flight status cannot be + // confirmed. + p.router.cfg.ReconcileAttempt = func(a *paymentsdb.HTLCAttempt) error { + return errors.New("htlc status unknown: dispatch unavailable") + } + + // Mock FetchPayment and InFlightHTLCs. + m.control.On("FetchPayment", p.identifier).Return(m.payment, nil).Once() + m.payment.On("InFlightHTLCs").Return( + []paymentsdb.HTLCAttempt{*attempt}, + ).Once() + + // Call the method under test. + payment, err := p.reloadInflightAttempts() + require.NoError(t, err) + require.Equal(t, m.payment, payment) + + // Result collection should NOT have been called — the attempt was + // skipped because reconciliation failed. + require.Equal(t, 0, m.collectResultsCount) +} diff --git a/routing/router.go b/routing/router.go index bd347f5e71e..9a17c87f080 100644 --- a/routing/router.go +++ b/routing/router.go @@ -185,6 +185,24 @@ type MissionControlQuerier interface { amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64 } +// ReconcileFunc defines the callback invoked for each in-flight HTLC attempt +// during startup before result collection begins. This enables write-first +// recovery for remote dispatchers that need to confirm dispatch status after +// a crash. +// +// Implementations should re-dispatch the attempt idempotently, returning nil +// once the attempt is confirmed in-flight (newly dispatched or duplicate +// acknowledged). Transient failures should be retried internally; only return +// an error for unresolvable cases. +type ReconcileFunc func(a *paymentsdb.HTLCAttempt) error + +// noOpReconcile is the default ReconcileFunc that proceeds directly to result +// tracking, preserving historic behavior of the daemon. The router and switch +// share a crash domain, so no additional reconciliation is needed. +func noOpReconcile(_ *paymentsdb.HTLCAttempt) error { + return nil +} + // FeeSchema is the set fee configuration for a Lightning Node on the network. // Using the coefficients described within the schema, the required fee to // forward outgoing payments can be derived. @@ -295,6 +313,14 @@ type Config struct { // TrafficShaper is an optional traffic shaper that can be used to // control the outgoing channel of a payment. TrafficShaper fn.Option[htlcswitch.AuxTrafficShaper] + + // ReconcileAttempt is the strategy executed for each in-flight + // HTLC attempt during startup before result collection begins. + // See ReconcileFunc for details. + // + // Defaults to noOpReconcile when not set, preserving standard + // lnd behavior where the router and switch share a crash domain. + ReconcileAttempt ReconcileFunc } // EdgeLocator is a struct used to identify a specific edge. @@ -338,6 +364,13 @@ type ChannelRouter struct { // channel graph is a subset of the UTXO set) set, then the router will proceed // to fully sync to the latest state of the UTXO set. func New(cfg Config) (*ChannelRouter, error) { + // Default to a no-op reconciliation callback, preserving + // standard lnd behavior where the router and switch share a + // crash domain. + if cfg.ReconcileAttempt == nil { + cfg.ReconcileAttempt = noOpReconcile + } + return &ChannelRouter{ cfg: &cfg, quit: make(chan struct{}),