From 2e7f506cb0417f69288b24a4b7880e457631da05 Mon Sep 17 00:00:00 2001 From: Enrique Lacal Date: Thu, 6 Jun 2024 15:08:00 +0100 Subject: [PATCH 1/3] fix: deleting listeners from event stream pagination Signed-off-by: Enrique Lacal --- pkg/fftm/stream_management.go | 24 ++++++++--- pkg/fftm/stream_management_test.go | 69 ++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 7 deletions(-) diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index 7ff02713..9a302e3f 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -32,7 +32,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/txhandler" ) -const ( +var ( startupPaginationLimit = 25 ) @@ -74,20 +74,30 @@ func (m *manager) restoreStreams() error { func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error { var lastInPage *fftypes.UUID + var nextListenerDefs []*apitypes.Listener + listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) + if err != nil { + return err + } for { - listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) - if err != nil { - return err - } if len(listenerDefs) == 0 { break } - for _, def := range listenerDefs { + for i, def := range listenerDefs { lastInPage = def.ID + if i == len(listenerDefs)-1 { + // Before we delete let's get the next page of listeners from the offset we are about to delete + nextListenerDefs, err = m.persistence.ListStreamListenersByCreateTime(ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) + if err != nil { + return err + } + } if err := m.persistence.DeleteListener(ctx, def.ID); err != nil { return err } } + + listenerDefs = nextListenerDefs } return nil } diff --git a/pkg/fftm/stream_management_test.go b/pkg/fftm/stream_management_test.go index 5cbfe36e..19d455d4 100644 --- a/pkg/fftm/stream_management_test.go +++ b/pkg/fftm/stream_management_test.go @@ -186,6 +186,7 @@ func TestDeleteStartedListenerFail(t *testing.T) { mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{ {ID: lID, StreamID: esID}, }, nil) + mp.On("ListStreamListenersByCreateTime", m.ctx, lID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{}, nil) mp.On("DeleteListener", m.ctx, lID).Return(fmt.Errorf("pop")) err := m.deleteAllStreamListeners(m.ctx, esID) @@ -194,6 +195,74 @@ func TestDeleteStartedListenerFail(t *testing.T) { mp.AssertExpectations(t) } +func TestDeleteStartedListenerWithPaginationFail(t *testing.T) { + + _, m, close := newTestManagerMockPersistence(t) + defer close() + + esID := apitypes.NewULID() + lID := apitypes.NewULID() + secondID := apitypes.NewULID() + fmt.Println(lID) + fmt.Println(secondID) + mp := m.persistence.(*persistencemocks.Persistence) + startupPaginationLimit = 2 + mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{ + {ID: lID, StreamID: esID}, + {ID: secondID, StreamID: esID}, + }, nil).Once() + thirdID := apitypes.NewULID() + mp.On("ListStreamListenersByCreateTime", m.ctx, secondID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{ + {ID: thirdID, StreamID: esID}, + }, fmt.Errorf("pop")).Once() + mp.On("DeleteListener", m.ctx, lID).Return(nil) + + err := m.deleteAllStreamListeners(m.ctx, esID) + assert.Regexp(t, "pop", err) + + startupPaginationLimit = 25 + + mp.AssertExpectations(t) +} + +func TestDeleteStartedListenerWithPagination(t *testing.T) { + + _, m, close := newTestManagerMockPersistence(t) + defer close() + + esID := apitypes.NewULID() + lID := apitypes.NewULID() + secondID := apitypes.NewULID() + fmt.Println(lID) + fmt.Println(secondID) + mp := m.persistence.(*persistencemocks.Persistence) + startupPaginationLimit = 2 + mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{ + {ID: lID, StreamID: esID}, + {ID: secondID, StreamID: esID}, + }, nil).Once() + thirdID := apitypes.NewULID() + mp.On("ListStreamListenersByCreateTime", m.ctx, secondID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{ + {ID: thirdID, StreamID: esID}, + }, nil).Once() + mp.On("ListStreamListenersByCreateTime", m.ctx, thirdID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{}, nil).Once() + mp.On("DeleteListener", m.ctx, lID).Return(nil) + mp.On("DeleteListener", m.ctx, secondID).Return(nil) + mp.On("DeleteListener", m.ctx, thirdID).Return(nil) + + err := m.deleteAllStreamListeners(m.ctx, esID) + assert.NoError(t, err) + + startupPaginationLimit = 25 + + mp.AssertExpectations(t) +} + func TestDeleteStreamBadID(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) From 89d968d79140c01aea10ee13e6d191f46723b19d Mon Sep 17 00:00:00 2001 From: Enrique Lacal Date: Thu, 6 Jun 2024 17:20:33 +0100 Subject: [PATCH 2/3] make this radically simpler Signed-off-by: Enrique Lacal --- pkg/fftm/stream_management.go | 23 ++++------------ pkg/fftm/stream_management_test.go | 44 ++---------------------------- 2 files changed, 9 insertions(+), 58 deletions(-) diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index 9a302e3f..fb88cf30 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -73,31 +73,20 @@ func (m *manager) restoreStreams() error { } func (m *manager) deleteAllStreamListeners(ctx context.Context, streamID *fftypes.UUID) error { - var lastInPage *fftypes.UUID - var nextListenerDefs []*apitypes.Listener - listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) - if err != nil { - return err - } for { + // Do not specify after as we just delete everything + listenerDefs, err := m.persistence.ListStreamListenersByCreateTime(ctx, nil, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) + if err != nil { + return err + } if len(listenerDefs) == 0 { break } - for i, def := range listenerDefs { - lastInPage = def.ID - if i == len(listenerDefs)-1 { - // Before we delete let's get the next page of listeners from the offset we are about to delete - nextListenerDefs, err = m.persistence.ListStreamListenersByCreateTime(ctx, lastInPage, startupPaginationLimit, txhandler.SortDirectionAscending, streamID) - if err != nil { - return err - } - } + for _, def := range listenerDefs { if err := m.persistence.DeleteListener(ctx, def.ID); err != nil { return err } } - - listenerDefs = nextListenerDefs } return nil } diff --git a/pkg/fftm/stream_management_test.go b/pkg/fftm/stream_management_test.go index 19d455d4..054f458a 100644 --- a/pkg/fftm/stream_management_test.go +++ b/pkg/fftm/stream_management_test.go @@ -186,7 +186,6 @@ func TestDeleteStartedListenerFail(t *testing.T) { mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{ {ID: lID, StreamID: esID}, }, nil) - mp.On("ListStreamListenersByCreateTime", m.ctx, lID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return([]*apitypes.Listener{}, nil) mp.On("DeleteListener", m.ctx, lID).Return(fmt.Errorf("pop")) err := m.deleteAllStreamListeners(m.ctx, esID) @@ -195,38 +194,6 @@ func TestDeleteStartedListenerFail(t *testing.T) { mp.AssertExpectations(t) } -func TestDeleteStartedListenerWithPaginationFail(t *testing.T) { - - _, m, close := newTestManagerMockPersistence(t) - defer close() - - esID := apitypes.NewULID() - lID := apitypes.NewULID() - secondID := apitypes.NewULID() - fmt.Println(lID) - fmt.Println(secondID) - mp := m.persistence.(*persistencemocks.Persistence) - startupPaginationLimit = 2 - mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( - []*apitypes.Listener{ - {ID: lID, StreamID: esID}, - {ID: secondID, StreamID: esID}, - }, nil).Once() - thirdID := apitypes.NewULID() - mp.On("ListStreamListenersByCreateTime", m.ctx, secondID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( - []*apitypes.Listener{ - {ID: thirdID, StreamID: esID}, - }, fmt.Errorf("pop")).Once() - mp.On("DeleteListener", m.ctx, lID).Return(nil) - - err := m.deleteAllStreamListeners(m.ctx, esID) - assert.Regexp(t, "pop", err) - - startupPaginationLimit = 25 - - mp.AssertExpectations(t) -} - func TestDeleteStartedListenerWithPagination(t *testing.T) { _, m, close := newTestManagerMockPersistence(t) @@ -235,22 +202,19 @@ func TestDeleteStartedListenerWithPagination(t *testing.T) { esID := apitypes.NewULID() lID := apitypes.NewULID() secondID := apitypes.NewULID() - fmt.Println(lID) - fmt.Println(secondID) mp := m.persistence.(*persistencemocks.Persistence) - startupPaginationLimit = 2 mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( []*apitypes.Listener{ {ID: lID, StreamID: esID}, {ID: secondID, StreamID: esID}, }, nil).Once() thirdID := apitypes.NewULID() - mp.On("ListStreamListenersByCreateTime", m.ctx, secondID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( []*apitypes.Listener{ {ID: thirdID, StreamID: esID}, }, nil).Once() - mp.On("ListStreamListenersByCreateTime", m.ctx, thirdID, startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( - []*apitypes.Listener{}, nil).Once() + mp.On("ListStreamListenersByCreateTime", m.ctx, (*fftypes.UUID)(nil), startupPaginationLimit, txhandler.SortDirectionAscending, esID).Return( + []*apitypes.Listener{}, nil) mp.On("DeleteListener", m.ctx, lID).Return(nil) mp.On("DeleteListener", m.ctx, secondID).Return(nil) mp.On("DeleteListener", m.ctx, thirdID).Return(nil) @@ -258,8 +222,6 @@ func TestDeleteStartedListenerWithPagination(t *testing.T) { err := m.deleteAllStreamListeners(m.ctx, esID) assert.NoError(t, err) - startupPaginationLimit = 25 - mp.AssertExpectations(t) } From c38e7b5aa27826611a409d5b3e57046ce7479ff9 Mon Sep 17 00:00:00 2001 From: Enrique Lacal Date: Thu, 6 Jun 2024 17:28:10 +0100 Subject: [PATCH 3/3] keep as const Signed-off-by: Enrique Lacal --- pkg/fftm/stream_management.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/fftm/stream_management.go b/pkg/fftm/stream_management.go index fb88cf30..200a380e 100644 --- a/pkg/fftm/stream_management.go +++ b/pkg/fftm/stream_management.go @@ -32,7 +32,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/txhandler" ) -var ( +const ( startupPaginationLimit = 25 )