diff --git a/core/block/detailservice/mock_detailservice/mock_Service.go b/core/block/detailservice/mock_detailservice/mock_Service.go index 3ff60d154d..2c0338b670 100644 --- a/core/block/detailservice/mock_detailservice/mock_Service.go +++ b/core/block/detailservice/mock_detailservice/mock_Service.go @@ -833,54 +833,6 @@ func (_c *MockService_SetListIsFavorite_Call) RunAndReturn(run func([]string, bo return _c } -// SetSource provides a mock function with given fields: ctx, objectId, source -func (_m *MockService) SetSource(ctx session.Context, objectId string, source []string) error { - ret := _m.Called(ctx, objectId, source) - - if len(ret) == 0 { - panic("no return value specified for SetSource") - } - - var r0 error - if rf, ok := ret.Get(0).(func(session.Context, string, []string) error); ok { - r0 = rf(ctx, objectId, source) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockService_SetSource_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSource' -type MockService_SetSource_Call struct { - *mock.Call -} - -// SetSource is a helper method to define mock.On call -// - ctx session.Context -// - objectId string -// - source []string -func (_e *MockService_Expecter) SetSource(ctx interface{}, objectId interface{}, source interface{}) *MockService_SetSource_Call { - return &MockService_SetSource_Call{Call: _e.mock.On("SetSource", ctx, objectId, source)} -} - -func (_c *MockService_SetSource_Call) Run(run func(ctx session.Context, objectId string, source []string)) *MockService_SetSource_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(session.Context), args[1].(string), args[2].([]string)) - }) - return _c -} - -func (_c *MockService_SetSource_Call) Return(_a0 error) *MockService_SetSource_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockService_SetSource_Call) RunAndReturn(run func(session.Context, string, []string) error) *MockService_SetSource_Call { - _c.Call.Return(run) - return _c -} - // SetSpaceInfo provides a mock function with given fields: spaceId, details func (_m *MockService) SetSpaceInfo(spaceId string, details *types.Struct) error { ret := _m.Called(spaceId, details) diff --git a/core/block/editor/smartblock/smarttest/smarttest.go b/core/block/editor/smartblock/smarttest/smarttest.go index d1caf97d6a..a752c63196 100644 --- a/core/block/editor/smartblock/smarttest/smarttest.go +++ b/core/block/editor/smartblock/smarttest/smarttest.go @@ -81,10 +81,11 @@ func (st *SmartTest) SetSpace(space smartblock.Space) { } type stubSpace struct { + id string } func (s *stubSpace) Id() string { - return "" + return s.id } func (s *stubSpace) TreeBuilder() objecttreebuilder.TreeBuilder { @@ -127,7 +128,8 @@ func (st *SmartTest) Space() smartblock.Space { if st.space != nil { return st.space } - return &stubSpace{} + + return &stubSpace{id: st.spaceId} } func (st *SmartTest) EnabledRelationAsDependentObjects() { diff --git a/core/indexer/fulltext.go b/core/indexer/fulltext.go index 451782108f..a312f849c9 100644 --- a/core/indexer/fulltext.go +++ b/core/indexer/fulltext.go @@ -25,7 +25,7 @@ import ( var ( ftIndexInterval = 1 * time.Second ftIndexForceMinInterval = time.Second * 10 - ftBatchLimit = 1000 + ftBatchLimit = 50 ftBlockMaxSize = 1024 * 1024 ) @@ -40,37 +40,28 @@ func (i *indexer) ForceFTIndex() { // MUST NOT be called more than once func (i *indexer) ftLoopRoutine() { ticker := time.NewTicker(ftIndexInterval) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - select { - case <-i.quit: - cancel() - case <-ctx.Done(): - } - }() - i.runFullTextIndexer(ctx) + i.runFullTextIndexer(i.componentCtx, i.spacesPriorityGet()) defer close(i.ftQueueFinished) var lastForceIndex time.Time for { select { - case <-ctx.Done(): + case <-i.componentCtx.Done(): return case <-ticker.C: - i.runFullTextIndexer(ctx) + i.runFullTextIndexer(i.componentCtx, i.spacesPriorityGet()) case <-i.forceFt: if time.Since(lastForceIndex) > ftIndexForceMinInterval { - i.runFullTextIndexer(ctx) + i.runFullTextIndexer(i.componentCtx, i.spacesPriorityGet()) lastForceIndex = time.Now() } } } } -func (i *indexer) runFullTextIndexer(ctx context.Context) { +func (i *indexer) runFullTextIndexer(ctx context.Context, spaceIdsPriority []string) { batcher := i.ftsearch.NewAutoBatcher(ftsearch.AutoBatcherRecommendedMaxDocs, ftsearch.AutoBatcherRecommendedMaxSize) - err := i.store.BatchProcessFullTextQueue(ctx, ftBatchLimit, func(objectIds []string) error { + err := i.store.BatchProcessFullTextQueue(ctx, spaceIdsPriority, ftBatchLimit, func(objectIds []string) error { for _, objectId := range objectIds { objDocs, err := i.prepareSearchDocument(ctx, objectId) if err != nil { @@ -224,14 +215,26 @@ func (i *indexer) ftInit() error { return err } if docCount == 0 { - ids, err := i.store.ListIds() + spaceIds, err := i.storageService.AllSpaceIds() if err != nil { return err } - for _, id := range ids { - if err := i.store.AddToIndexQueue(id); err != nil { + var fullIds []domain.FullID + for _, spaceId := range spaceIds { + ids, err := i.store.ListIdsBySpace(spaceId) + if err != nil { return err } + for _, id := range ids { + fullIds = append(fullIds, domain.FullID{ + ObjectID: id, + SpaceID: spaceId, + }) + } + } + err = i.store.AddToIndexQueue(fullIds...) + if err != nil { + return err } } } diff --git a/core/indexer/fulltext_test.go b/core/indexer/fulltext_test.go index 6e1d9035b4..b8ff26e6c4 100644 --- a/core/indexer/fulltext_test.go +++ b/core/indexer/fulltext_test.go @@ -17,6 +17,7 @@ import ( "github.com/anyproto/anytype-heart/core/block/editor/smartblock/smarttest" "github.com/anyproto/anytype-heart/core/block/editor/state" "github.com/anyproto/anytype-heart/core/block/source/mock_source" + "github.com/anyproto/anytype-heart/core/domain" "github.com/anyproto/anytype-heart/core/indexer/mock_indexer" "github.com/anyproto/anytype-heart/core/wallet" "github.com/anyproto/anytype-heart/core/wallet/mock_wallet" @@ -32,6 +33,7 @@ import ( "github.com/anyproto/anytype-heart/tests/blockbuilder" "github.com/anyproto/anytype-heart/tests/testutil" "github.com/anyproto/anytype-heart/util/pbtypes" + "github.com/anyproto/anytype-heart/util/taskmanager" ) type IndexerFixture struct { @@ -64,6 +66,7 @@ func NewIndexerFixture(t *testing.T) *IndexerFixture { testApp.Register(objectStore.FTSearch()) indxr := &indexer{} + indxr.spaceReindexQueue = taskmanager.NewTasksManager(1, indxr.taskPrioritySorter) indexerFx := &IndexerFixture{ indexer: indxr, @@ -85,7 +88,7 @@ func NewIndexerFixture(t *testing.T) *IndexerFixture { indexerFx.ftsearch = indxr.ftsearch indexerFx.pickerFx = mock_cache.NewMockObjectGetter(t) indxr.picker = indexerFx.pickerFx - indxr.quit = make(chan struct{}) + indxr.componentCtx, indxr.componentCancel = context.WithCancel(context.Background()) indxr.forceFt = make(chan struct{}) indxr.config = &config.Config{NetworkMode: pb.RpcAccount_LocalOnly} @@ -326,11 +329,14 @@ func TestRunFullTextIndexer(t *testing.T) { blockbuilder.ID("blockId1"), ), ))) - indexerFx.store.AddToIndexQueue("objectId" + strconv.Itoa(i)) + indexerFx.store.AddToIndexQueue(domain.FullID{ + ObjectID: "objectId" + strconv.Itoa(i), + SpaceID: "space1", + }) indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, "objectId"+strconv.Itoa(i)).Return(smartTest, nil).Once() } - indexerFx.runFullTextIndexer(context.Background()) + indexerFx.runFullTextIndexer(context.Background(), []string{"space1"}) count, _ := indexerFx.ftsearch.DocCount() assert.Equal(t, 10, int(count)) @@ -352,11 +358,14 @@ func TestRunFullTextIndexer(t *testing.T) { ), ))) indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, "objectId"+strconv.Itoa(i)).Return(smartTest, nil).Once() - indexerFx.store.AddToIndexQueue("objectId" + strconv.Itoa(i)) + indexerFx.store.AddToIndexQueue(domain.FullID{ + ObjectID: "objectId" + strconv.Itoa(i), + SpaceID: "space1", + }) } - indexerFx.runFullTextIndexer(context.Background()) + indexerFx.runFullTextIndexer(context.Background(), []string{"space1"}) count, _ = indexerFx.ftsearch.DocCount() assert.Equal(t, 10, int(count)) @@ -381,9 +390,12 @@ func TestPrepareSearchDocument_Reindex_Removed(t *testing.T) { blockbuilder.ID("blockId1"), ), ))) - indexerFx.store.AddToIndexQueue("objectId1") + indexerFx.store.AddToIndexQueue(domain.FullID{ + ObjectID: "objectId1", + SpaceID: "space1", + }) indexerFx.pickerFx.EXPECT().GetObject(mock.Anything, mock.Anything).Return(smartTest, nil) - indexerFx.runFullTextIndexer(context.Background()) + indexerFx.runFullTextIndexer(context.Background(), []string{"space1"}) count, _ = indexerFx.ftsearch.DocCount() assert.Equal(t, uint64(1), count) diff --git a/core/indexer/indexer.go b/core/indexer/indexer.go index cc94e819b5..4a7e555c37 100644 --- a/core/indexer/indexer.go +++ b/core/indexer/indexer.go @@ -10,6 +10,7 @@ import ( "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/spacestorage" + "github.com/gogo/protobuf/types" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -17,6 +18,9 @@ import ( "github.com/anyproto/anytype-heart/core/block/cache" "github.com/anyproto/anytype-heart/core/block/editor/smartblock" "github.com/anyproto/anytype-heart/core/block/source" + "github.com/anyproto/anytype-heart/core/domain" + "github.com/anyproto/anytype-heart/core/subscription" + "github.com/anyproto/anytype-heart/core/syncstatus/syncsubscriptions" "github.com/anyproto/anytype-heart/metrics" "github.com/anyproto/anytype-heart/pkg/lib/bundle" "github.com/anyproto/anytype-heart/pkg/lib/database" @@ -54,23 +58,31 @@ type Hasher interface { Hash() string } +type techSpaceIdGetter interface { + TechSpaceId() string +} + type indexer struct { - store objectstore.ObjectStore - fileStore filestore.FileStore - source source.Service - picker cache.ObjectGetter - ftsearch ftsearch.FTSearch - storageService storage.ClientStorage - - quit chan struct{} + store objectstore.ObjectStore + fileStore filestore.FileStore + source source.Service + picker cache.ObjectGetter + ftsearch ftsearch.FTSearch + storageService storage.ClientStorage + subscriptionService subscription.Service + + componentCtx context.Context + componentCancel context.CancelFunc ftQueueFinished chan struct{} config *config.Config btHash Hasher forceFt chan struct{} - lock sync.Mutex - reindexLogFields []zap.Field + spacesPrioritySubscription *syncsubscriptions.ObjectSubscription[*types.Struct] + lock sync.Mutex + reindexLogFields []zap.Field + spacesPriority []string } func (i *indexer) Init(a *app.App) (err error) { @@ -81,10 +93,11 @@ func (i *indexer) Init(a *app.App) (err error) { i.fileStore = app.MustComponent[filestore.FileStore](a) i.ftsearch = app.MustComponent[ftsearch.FTSearch](a) i.picker = app.MustComponent[cache.ObjectGetter](a) - i.quit = make(chan struct{}) i.ftQueueFinished = make(chan struct{}) i.forceFt = make(chan struct{}) i.config = app.MustComponent[*config.Config](a) + i.subscriptionService = app.MustComponent[subscription.Service](a) + i.componentCtx, i.componentCancel = context.WithCancel(context.Background()) return } @@ -93,6 +106,10 @@ func (i *indexer) Name() (name string) { } func (i *indexer) Run(context.Context) (err error) { + err = i.subscribeToSpaces() + if err != nil { + return + } return i.StartFullTextIndex() } @@ -105,7 +122,8 @@ func (i *indexer) StartFullTextIndex() (err error) { } func (i *indexer) Close(ctx context.Context) (err error) { - close(i.quit) + i.componentCancel() + i.spacesPrioritySubscription.Close() // we need to wait for the ftQueue processing to be finished gracefully. Because we may be in the middle of badger transaction <-i.ftQueueFinished return nil @@ -139,6 +157,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options .. for _, o := range options { o(opts) } + err := i.storageService.BindSpaceID(info.Space.Id(), info.Id) if err != nil { log.Error("failed to bind space id", zap.Error(err), zap.String("id", info.Id)) @@ -208,7 +227,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock.DocInfo, options .. } if !(opts.SkipFullTextIfHeadsNotChanged && lastIndexedHash == headHashToIndex) { - if err := i.store.AddToIndexQueue(info.Id); err != nil { + if err := i.store.AddToIndexQueue(domain.FullID{SpaceID: info.Space.Id(), ObjectID: info.Id}); err != nil { log.With("objectID", info.Id).Errorf("can't add id to index queue: %v", err) } } @@ -245,3 +264,59 @@ func headsHash(heads []string) string { sum := sha256.Sum256([]byte(strings.Join(heads, ","))) return fmt.Sprintf("%x", sum) } + +// subscribeToSpaces subscribes to the lastOpenedSpaces subscription +// it used by fulltext and reindexing to prioritize most recent spaces +func (i *indexer) subscribeToSpaces() error { + objectReq := subscription.SubscribeRequest{ + SubId: "lastOpenedSpaces", + Internal: true, + NoDepSubscription: true, + Keys: []string{bundle.RelationKeyTargetSpaceId.String(), bundle.RelationKeyLastOpenedDate.String(), bundle.RelationKeyLastModifiedDate.String()}, + Filters: []*model.BlockContentDataviewFilter{ + { + RelationKey: bundle.RelationKeyLayout.String(), + Condition: model.BlockContentDataviewFilter_Equal, + Value: pbtypes.Int64(int64(model.ObjectType_spaceView)), + }, + }, + Sorts: []*model.BlockContentDataviewSort{ + { + RelationKey: bundle.RelationKeyLastOpenedDate.String(), + Type: model.BlockContentDataviewSort_Desc, + IncludeTime: true, + Format: model.RelationFormat_date, + EmptyPlacement: model.BlockContentDataviewSort_End, + }, + }, + } + spacePriorityUpdateChan := make(chan []*types.Struct) + go i.spacesPrioritySubscriptionWatcher(spacePriorityUpdateChan) + i.spacesPrioritySubscription = syncsubscriptions.NewSubscription(i.subscriptionService, objectReq) + return i.spacesPrioritySubscription.Run(spacePriorityUpdateChan) +} + +func (i *indexer) spacesPriorityUpdate(priority []string) { + i.lock.Lock() + defer i.lock.Unlock() + i.spacesPriority = priority +} + +func (i *indexer) spacesPriorityGet() []string { + i.lock.Lock() + defer i.lock.Unlock() + return i.spacesPriority +} + +func (i *indexer) spacesPrioritySubscriptionWatcher(ch chan []*types.Struct) { + for { + select { + // subscription and chan will be closed on indexer close + case records := <-ch: + if records == nil { + return + } + i.spacesPriorityUpdate(pbtypes.ExtractString(records, bundle.RelationKeyTargetSpaceId.String(), true)) + } + } +} diff --git a/core/indexer/indexer_test.go b/core/indexer/indexer_test.go index a085842310..700d39402b 100644 --- a/core/indexer/indexer_test.go +++ b/core/indexer/indexer_test.go @@ -16,7 +16,8 @@ import ( func TestIndexer(t *testing.T) { for _, testCase := range []struct { - name string + name string + options smartblock.IndexOption }{ { @@ -51,7 +52,7 @@ func TestIndexer(t *testing.T) { // then assert.NoError(t, err) - count, _ := indexerFx.store.ListIDsFromFullTextQueue(0) + count, _ := indexerFx.store.ListIDsFromFullTextQueue("spaceId1", 0) assert.Equal(t, 0, len(count)) }) @@ -78,7 +79,7 @@ func TestIndexer(t *testing.T) { // then assert.NoError(t, err) - count, _ := indexerFx.store.ListIDsFromFullTextQueue(0) + count, _ := indexerFx.store.ListIDsFromFullTextQueue("spaceId1", 0) assert.Equal(t, 1, len(count)) }) } @@ -106,7 +107,7 @@ func TestIndexer(t *testing.T) { // then assert.NoError(t, err) - count, _ := indexerFx.store.ListIDsFromFullTextQueue(0) + count, _ := indexerFx.store.ListIDsFromFullTextQueue("spaceId1", 0) assert.Equal(t, 1, len(count)) }) } diff --git a/core/syncstatus/syncsubscriptions/objectsubscription.go b/core/syncstatus/syncsubscriptions/objectsubscription.go index 90545517d4..9630573733 100644 --- a/core/syncstatus/syncsubscriptions/objectsubscription.go +++ b/core/syncstatus/syncsubscriptions/objectsubscription.go @@ -2,6 +2,8 @@ package syncsubscriptions import ( "context" + "fmt" + "slices" "sync" "github.com/cheggaaa/mb/v3" @@ -38,8 +40,31 @@ type SubscriptionParams[T any] struct { Unset unset[T] } +func NewSubscription(service subscription.Service, request subscription.SubscribeRequest) *ObjectSubscription[*types.Struct] { + if request.Keys != nil && !slices.Contains(request.Keys, bundle.RelationKeyId.String()) { + request.Keys = append(request.Keys, bundle.RelationKeyId.String()) + } + return &ObjectSubscription[*types.Struct]{ + sorted: len(request.Sorts) > 0, + request: request, + service: service, + ch: make(chan struct{}), + extract: func(t *types.Struct) (string, *types.Struct) { + return pbtypes.GetString(t, bundle.RelationKeyId.String()), t + }, + update: func(s string, value *types.Value, s2 *types.Struct) *types.Struct { + s2.Fields[s] = value + return s2 + }, + unset: func(strings []string, s *types.Struct) *types.Struct { + return pbtypes.StructFilterKeys(s, strings) + }, + } +} + func NewIdSubscription(service subscription.Service, request subscription.SubscribeRequest) *ObjectSubscription[struct{}] { return &ObjectSubscription[struct{}]{ + sorted: len(request.Sorts) > 0, request: request, service: service, ch: make(chan struct{}), @@ -56,20 +81,25 @@ func NewIdSubscription(service subscription.Service, request subscription.Subscr } type ObjectSubscription[T any] struct { - request subscription.SubscribeRequest - service subscription.Service - ch chan struct{} - events *mb.MB[*pb.EventMessage] - ctx context.Context - cancel context.CancelFunc - sub map[string]*entry[T] - extract extract[T] - update update[T] - unset unset[T] - mx sync.Mutex -} - -func (o *ObjectSubscription[T]) Run() error { + sorted bool + request subscription.SubscribeRequest + service subscription.Service + ch chan struct{} + events *mb.MB[*pb.EventMessage] + ctx context.Context + cancel context.CancelFunc + sub map[string]*entry[T] + positions []string + extract extract[T] + update update[T] + unset unset[T] + updateChan chan []T + mx sync.Mutex +} + +// Run starts the subscription +// if updateChan is not nil, it will be used to notify about changes, including the first set of records +func (o *ObjectSubscription[T]) Run(updateChan chan []T) error { resp, err := o.service.Search(o.request) if err != nil { return err @@ -80,7 +110,11 @@ func (o *ObjectSubscription[T]) Run() error { for _, rec := range resp.Records { id, data := o.extract(rec) o.sub[id] = newEntry(data) + if o.sorted { + o.positions = append(o.positions, id) + } } + o.updateChan = updateChan go o.read() return nil } @@ -96,25 +130,96 @@ func (o *ObjectSubscription[T]) Len() int { return len(o.sub) } +func (o *ObjectSubscription[T]) iterateSorted(iter func(id string, data T) bool) { + for _, id := range o.positions { + val := o.sub[id] + if !iter(id, val.data) { + return + } + } +} + +func (o *ObjectSubscription[T]) sendToChan() { + if o.updateChan == nil { + return + } + + var vals = make([]T, 0, o.Len()) + o.Iterate(func(_ string, v T) bool { + vals = append(vals, v) + return true + }) + o.updateChan <- vals +} + func (o *ObjectSubscription[T]) Iterate(iter func(id string, data T) bool) { o.mx.Lock() defer o.mx.Unlock() - for id, ent := range o.sub { - if !iter(id, ent.data) { + if o.sorted { + o.iterateSorted(iter) + return + } + + for id, val := range o.sub { + if !iter(id, val.data) { return } } } +func (o *ObjectSubscription[T]) positionInsertAfter(after, newId string) { + if after == "" { + o.positions = append([]string{newId}, o.positions...) + return + } + + for i, id := range o.positions { + if id == after { + o.positions = append(o.positions[:i+1], append([]string{newId}, o.positions[i+1:]...)...) + return + } + + } +} +func (o *ObjectSubscription[T]) positionRemove(id string) { + for i, pos := range o.positions { + if pos == id { + o.positions = append(o.positions[:i], o.positions[i+1:]...) + return + } + } +} + +func (o *ObjectSubscription[T]) positionMoveIdAfter(after, id string) { + o.positionRemove(id) + o.positionInsertAfter(after, id) +} + func (o *ObjectSubscription[T]) read() { defer close(o.ch) readEvent := func(event *pb.EventMessage) { o.mx.Lock() defer o.mx.Unlock() + fmt.Printf("sub %s got event %t: %+v\n", o.request.SubId, event.Value, event.Value) switch v := event.Value.(type) { case *pb.EventMessageValueOfSubscriptionAdd: - o.sub[v.SubscriptionAdd.Id] = newEmptyEntry[T]() + if o.sorted { + o.positionInsertAfter(v.SubscriptionAdd.AfterId, v.SubscriptionAdd.Id) + } + if _, ok := o.sub[v.SubscriptionAdd.Id]; !ok { + o.sub[v.SubscriptionAdd.Id] = newEmptyEntry[T]() + } + case *pb.EventMessageValueOfSubscriptionPosition: + if o.sorted { + o.positionMoveIdAfter(v.SubscriptionPosition.AfterId, v.SubscriptionPosition.Id) + } + if _, ok := o.sub[v.SubscriptionPosition.Id]; !ok { + o.sub[v.SubscriptionPosition.Id] = newEmptyEntry[T]() + } case *pb.EventMessageValueOfSubscriptionRemove: + if o.sorted { + o.positionRemove(v.SubscriptionRemove.Id) + } delete(o.sub, v.SubscriptionRemove.Id) case *pb.EventMessageValueOfObjectDetailsAmend: curEntry := o.sub[v.ObjectDetailsAmend.Id] @@ -144,5 +249,6 @@ func (o *ObjectSubscription[T]) read() { return } readEvent(event) + o.sendToChan() } } diff --git a/core/syncstatus/syncsubscriptions/objectsubscription_test.go b/core/syncstatus/syncsubscriptions/objectsubscription_test.go index 8dabe872be..63042e7d12 100644 --- a/core/syncstatus/syncsubscriptions/objectsubscription_test.go +++ b/core/syncstatus/syncsubscriptions/objectsubscription_test.go @@ -116,7 +116,45 @@ func TestIdSubscription(t *testing.T) { } subService.EXPECT().Search(mock.Anything).Return(subscribeResponse, nil) sub := NewIdSubscription(subService, subscription.SubscribeRequest{}) - err := sub.Run() + err := sub.Run(nil) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + ids := make(map[string]struct{}) + sub.Iterate(func(id string, _ struct{}) bool { + ids[id] = struct{}{} + return true + }) + require.Len(t, ids, 2) + require.Contains(t, ids, "3") + require.Contains(t, ids, "4") +} + +func TestIdSubscriptionChan(t *testing.T) { + subService := mock_subscription.NewMockService(t) + events := mb.New[*pb.EventMessage](0) + records := makeStructs([]string{"1", "2", "3"}) + // for details amend, set and unset we just check that we handle them correctly (i.e. do nothing) + messages := []*pb.EventMessage{ + makeSubscriptionRemove("2"), + makeDetailsSet("1"), + makeDetailsUnset("2"), + makeDetailsAmend("3"), + makeSubscriptionAdd("4"), + makeSubscriptionRemove("1"), + makeSubscriptionAdd("3"), + makeSubscriptionRemove("5"), + } + for _, msg := range messages { + err := events.Add(context.Background(), msg) + require.NoError(t, err) + } + subscribeResponse := &subscription.SubscribeResponse{ + Output: events, + Records: records, + } + subService.EXPECT().Search(mock.Anything).Return(subscribeResponse, nil) + sub := NewIdSubscription(subService, subscription.SubscribeRequest{}) + err := sub.Run(nil) require.NoError(t, err) time.Sleep(100 * time.Millisecond) ids := make(map[string]struct{}) diff --git a/core/syncstatus/syncsubscriptions/syncingobjects.go b/core/syncstatus/syncsubscriptions/syncingobjects.go index ec88fe9768..deab862fe3 100644 --- a/core/syncstatus/syncsubscriptions/syncingobjects.go +++ b/core/syncstatus/syncsubscriptions/syncingobjects.go @@ -44,7 +44,7 @@ func (s *syncingObjects) Run() error { }, } s.objectSubscription = NewIdSubscription(s.service, objectReq) - errObjects := s.objectSubscription.Run() + errObjects := s.objectSubscription.Run(nil) if errObjects != nil { return fmt.Errorf("error running syncing objects: %w", errObjects) } diff --git a/pkg/lib/localstore/objectstore/indexer_store.go b/pkg/lib/localstore/objectstore/indexer_store.go index 110959a741..8515f0ca14 100644 --- a/pkg/lib/localstore/objectstore/indexer_store.go +++ b/pkg/lib/localstore/objectstore/indexer_store.go @@ -10,44 +10,78 @@ import ( "github.com/anyproto/any-store/query" "github.com/valyala/fastjson" + "github.com/anyproto/anytype-heart/core/domain" "github.com/anyproto/anytype-heart/pkg/lib/pb/model" ) -func (s *dsObjectStore) AddToIndexQueue(id string) error { +func (s *dsObjectStore) AddToIndexQueue(ids ...domain.FullID) error { arena := s.arenaPool.Get() defer func() { arena.Reset() s.arenaPool.Put(arena) }() - obj := arena.NewObject() - obj.Set("id", arena.NewString(id)) - _, err := s.fulltextQueue.UpsertOne(s.componentCtx, obj) - return err -} + txn, err := s.fulltextQueue.WriteTx(s.componentCtx) + if err != nil { + return fmt.Errorf("start write tx: %w", err) + } + rollback := func(err error) error { + return errors.Join(txn.Rollback(), err) + } -func (s *dsObjectStore) BatchProcessFullTextQueue(ctx context.Context, limit int, processIds func(ids []string) error) error { - for { - ids, err := s.ListIDsFromFullTextQueue(limit) - if err != nil { - return fmt.Errorf("list ids from fulltext queue: %w", err) - } - if len(ids) == 0 { - return nil - } - err = processIds(ids) + obj := arena.NewObject() + for _, id := range ids { + obj.Set("id", arena.NewString(id.ObjectID)) + obj.Set("spaceId", arena.NewString(id.SpaceID)) + _, err = s.fulltextQueue.UpsertOne(txn.Context(), obj) if err != nil { - return fmt.Errorf("process ids: %w", err) + return rollback(fmt.Errorf("upsert: %w", err)) } - err = s.RemoveIDsFromFullTextQueue(ids) - if err != nil { - return fmt.Errorf("remove ids from fulltext queue: %w", err) + } + + return txn.Commit() +} + +func (s *dsObjectStore) BatchProcessFullTextQueue(ctx context.Context, spaceIdsPriority []string, limit int, processIds func(ids []string) error) error { + proceed := 0 + for _, spaceId := range spaceIdsPriority { + for { + if limit <= 0 { + return nil + } + ids, err := s.ListIDsFromFullTextQueue(spaceId, limit) + if err != nil { + return fmt.Errorf("list ids from fulltext queue: %w", err) + } + if len(ids) == 0 { + break + } + + err = processIds(ids) + if err != nil { + return fmt.Errorf("process ids: %w", err) + } + proceed += len(ids) + err = s.RemoveIDsFromFullTextQueue(ids) + if err != nil { + return fmt.Errorf("remove ids from fulltext queue: %w", err) + } + if len(ids) < limit { + log.Infof("fulltext queue for space %s is fully proceed; less than limit(%d)", spaceId, len(ids)) + break + } + limit -= len(ids) } } + return nil } -func (s *dsObjectStore) ListIDsFromFullTextQueue(limit int) ([]string, error) { - iter, err := s.fulltextQueue.Find(nil).Limit(uint(limit)).Iter(s.componentCtx) +func (s *dsObjectStore) ListIDsFromFullTextQueue(spaceId string, limit int) ([]string, error) { + var filter any + if spaceId != "" { + filter = query.Key{Path: []string{"spaceId"}, Filter: query.NewComp(query.CompOpEq, spaceId)} + } + iter, err := s.fulltextQueue.Find(filter).Limit(uint(limit)).Iter(s.componentCtx) if err != nil { return nil, fmt.Errorf("create iterator: %w", err) } diff --git a/pkg/lib/localstore/objectstore/indexer_store_test.go b/pkg/lib/localstore/objectstore/indexer_store_test.go index de03907433..b10a7d1ef9 100644 --- a/pkg/lib/localstore/objectstore/indexer_store_test.go +++ b/pkg/lib/localstore/objectstore/indexer_store_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/anyproto/anytype-heart/core/domain" "github.com/anyproto/anytype-heart/pkg/lib/pb/model" ) @@ -14,11 +15,10 @@ func TestDsObjectStore_IndexQueue(t *testing.T) { s := NewStoreFixture(t) t.Run("add to queue", func(t *testing.T) { - require.NoError(t, s.AddToIndexQueue("one")) - require.NoError(t, s.AddToIndexQueue("one")) - require.NoError(t, s.AddToIndexQueue("two")) + require.NoError(t, s.AddToIndexQueue(domain.FullID{ObjectID: "one", SpaceID: "space1"}, domain.FullID{ObjectID: "two", SpaceID: "space1"})) + require.NoError(t, s.AddToIndexQueue(domain.FullID{ObjectID: "one", SpaceID: "space1"})) - ids, err := s.ListIDsFromFullTextQueue(0) + ids, err := s.ListIDsFromFullTextQueue("space1", 0) require.NoError(t, err) assert.ElementsMatch(t, []string{"one", "two"}, ids) @@ -26,7 +26,7 @@ func TestDsObjectStore_IndexQueue(t *testing.T) { t.Run("remove from queue", func(t *testing.T) { s.RemoveIDsFromFullTextQueue([]string{"one"}) - ids, err := s.ListIDsFromFullTextQueue(0) + ids, err := s.ListIDsFromFullTextQueue("space1", 0) require.NoError(t, err) assert.ElementsMatch(t, []string{"two"}, ids) @@ -37,20 +37,20 @@ func TestIndexerBatch(t *testing.T) { s := NewStoreFixture(t) t.Run("batch - no more than limit", func(t *testing.T) { - require.NoError(t, s.AddToIndexQueue("one")) - require.NoError(t, s.AddToIndexQueue("two")) - require.NoError(t, s.AddToIndexQueue("three")) + require.NoError(t, s.AddToIndexQueue(domain.FullID{ObjectID: "one", SpaceID: "space1"}, domain.FullID{ObjectID: "two", SpaceID: "space1"})) + require.NoError(t, s.AddToIndexQueue(domain.FullID{ObjectID: "three", SpaceID: "space2"})) var batches [][]string - err := s.BatchProcessFullTextQueue(context.Background(), 2, func(ids []string) error { + err := s.BatchProcessFullTextQueue(context.Background(), []string{"space2", "space1"}, 2, func(ids []string) error { batches = append(batches, ids) return nil }) require.NoError(t, err) require.Len(t, batches, 2) - assert.ElementsMatch(t, []string{"one", "two"}, batches[0]) - assert.ElementsMatch(t, []string{"three"}, batches[1]) + assert.ElementsMatch(t, []string{"three"}, batches[0]) // priority for space2 + assert.ElementsMatch(t, []string{"one", "two"}, batches[1]) + }) } diff --git a/pkg/lib/localstore/objectstore/mock_objectstore/mock_ObjectStore.go b/pkg/lib/localstore/objectstore/mock_objectstore/mock_ObjectStore.go index 308fbb3125..c690714441 100644 --- a/pkg/lib/localstore/objectstore/mock_objectstore/mock_ObjectStore.go +++ b/pkg/lib/localstore/objectstore/mock_objectstore/mock_ObjectStore.go @@ -40,16 +40,22 @@ func (_m *MockObjectStore) EXPECT() *MockObjectStore_Expecter { } // AddToIndexQueue provides a mock function with given fields: id -func (_m *MockObjectStore) AddToIndexQueue(id string) error { - ret := _m.Called(id) +func (_m *MockObjectStore) AddToIndexQueue(id ...domain.FullID) error { + _va := make([]interface{}, len(id)) + for _i := range id { + _va[_i] = id[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) if len(ret) == 0 { panic("no return value specified for AddToIndexQueue") } var r0 error - if rf, ok := ret.Get(0).(func(string) error); ok { - r0 = rf(id) + if rf, ok := ret.Get(0).(func(...domain.FullID) error); ok { + r0 = rf(id...) } else { r0 = ret.Error(0) } @@ -63,14 +69,21 @@ type MockObjectStore_AddToIndexQueue_Call struct { } // AddToIndexQueue is a helper method to define mock.On call -// - id string -func (_e *MockObjectStore_Expecter) AddToIndexQueue(id interface{}) *MockObjectStore_AddToIndexQueue_Call { - return &MockObjectStore_AddToIndexQueue_Call{Call: _e.mock.On("AddToIndexQueue", id)} +// - id ...domain.FullID +func (_e *MockObjectStore_Expecter) AddToIndexQueue(id ...interface{}) *MockObjectStore_AddToIndexQueue_Call { + return &MockObjectStore_AddToIndexQueue_Call{Call: _e.mock.On("AddToIndexQueue", + append([]interface{}{}, id...)...)} } -func (_c *MockObjectStore_AddToIndexQueue_Call) Run(run func(id string)) *MockObjectStore_AddToIndexQueue_Call { +func (_c *MockObjectStore_AddToIndexQueue_Call) Run(run func(id ...domain.FullID)) *MockObjectStore_AddToIndexQueue_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string)) + variadicArgs := make([]domain.FullID, len(args)-0) + for i, a := range args[0:] { + if a != nil { + variadicArgs[i] = a.(domain.FullID) + } + } + run(variadicArgs...) }) return _c } @@ -80,22 +93,22 @@ func (_c *MockObjectStore_AddToIndexQueue_Call) Return(_a0 error) *MockObjectSto return _c } -func (_c *MockObjectStore_AddToIndexQueue_Call) RunAndReturn(run func(string) error) *MockObjectStore_AddToIndexQueue_Call { +func (_c *MockObjectStore_AddToIndexQueue_Call) RunAndReturn(run func(...domain.FullID) error) *MockObjectStore_AddToIndexQueue_Call { _c.Call.Return(run) return _c } -// BatchProcessFullTextQueue provides a mock function with given fields: ctx, limit, processIds -func (_m *MockObjectStore) BatchProcessFullTextQueue(ctx context.Context, limit int, processIds func([]string) error) error { - ret := _m.Called(ctx, limit, processIds) +// BatchProcessFullTextQueue provides a mock function with given fields: ctx, spaceIdsPriority, limit, processIds +func (_m *MockObjectStore) BatchProcessFullTextQueue(ctx context.Context, spaceIdsPriority []string, limit int, processIds func([]string) error) error { + ret := _m.Called(ctx, spaceIdsPriority, limit, processIds) if len(ret) == 0 { panic("no return value specified for BatchProcessFullTextQueue") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int, func([]string) error) error); ok { - r0 = rf(ctx, limit, processIds) + if rf, ok := ret.Get(0).(func(context.Context, []string, int, func([]string) error) error); ok { + r0 = rf(ctx, spaceIdsPriority, limit, processIds) } else { r0 = ret.Error(0) } @@ -110,15 +123,16 @@ type MockObjectStore_BatchProcessFullTextQueue_Call struct { // BatchProcessFullTextQueue is a helper method to define mock.On call // - ctx context.Context +// - spaceIdsPriority []string // - limit int // - processIds func([]string) error -func (_e *MockObjectStore_Expecter) BatchProcessFullTextQueue(ctx interface{}, limit interface{}, processIds interface{}) *MockObjectStore_BatchProcessFullTextQueue_Call { - return &MockObjectStore_BatchProcessFullTextQueue_Call{Call: _e.mock.On("BatchProcessFullTextQueue", ctx, limit, processIds)} +func (_e *MockObjectStore_Expecter) BatchProcessFullTextQueue(ctx interface{}, spaceIdsPriority interface{}, limit interface{}, processIds interface{}) *MockObjectStore_BatchProcessFullTextQueue_Call { + return &MockObjectStore_BatchProcessFullTextQueue_Call{Call: _e.mock.On("BatchProcessFullTextQueue", ctx, spaceIdsPriority, limit, processIds)} } -func (_c *MockObjectStore_BatchProcessFullTextQueue_Call) Run(run func(ctx context.Context, limit int, processIds func([]string) error)) *MockObjectStore_BatchProcessFullTextQueue_Call { +func (_c *MockObjectStore_BatchProcessFullTextQueue_Call) Run(run func(ctx context.Context, spaceIdsPriority []string, limit int, processIds func([]string) error)) *MockObjectStore_BatchProcessFullTextQueue_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int), args[2].(func([]string) error)) + run(args[0].(context.Context), args[1].([]string), args[2].(int), args[3].(func([]string) error)) }) return _c } @@ -128,7 +142,7 @@ func (_c *MockObjectStore_BatchProcessFullTextQueue_Call) Return(_a0 error) *Moc return _c } -func (_c *MockObjectStore_BatchProcessFullTextQueue_Call) RunAndReturn(run func(context.Context, int, func([]string) error) error) *MockObjectStore_BatchProcessFullTextQueue_Call { +func (_c *MockObjectStore_BatchProcessFullTextQueue_Call) RunAndReturn(run func(context.Context, []string, int, func([]string) error) error) *MockObjectStore_BatchProcessFullTextQueue_Call { _c.Call.Return(run) return _c } @@ -1892,9 +1906,9 @@ func (_c *MockObjectStore_ListAllRelations_Call) RunAndReturn(run func(string) ( return _c } -// ListIDsFromFullTextQueue provides a mock function with given fields: limit -func (_m *MockObjectStore) ListIDsFromFullTextQueue(limit int) ([]string, error) { - ret := _m.Called(limit) +// ListIDsFromFullTextQueue provides a mock function with given fields: spaceId, limit +func (_m *MockObjectStore) ListIDsFromFullTextQueue(spaceId string, limit int) ([]string, error) { + ret := _m.Called(spaceId, limit) if len(ret) == 0 { panic("no return value specified for ListIDsFromFullTextQueue") @@ -1902,19 +1916,19 @@ func (_m *MockObjectStore) ListIDsFromFullTextQueue(limit int) ([]string, error) var r0 []string var r1 error - if rf, ok := ret.Get(0).(func(int) ([]string, error)); ok { - return rf(limit) + if rf, ok := ret.Get(0).(func(string, int) ([]string, error)); ok { + return rf(spaceId, limit) } - if rf, ok := ret.Get(0).(func(int) []string); ok { - r0 = rf(limit) + if rf, ok := ret.Get(0).(func(string, int) []string); ok { + r0 = rf(spaceId, limit) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]string) } } - if rf, ok := ret.Get(1).(func(int) error); ok { - r1 = rf(limit) + if rf, ok := ret.Get(1).(func(string, int) error); ok { + r1 = rf(spaceId, limit) } else { r1 = ret.Error(1) } @@ -1928,14 +1942,15 @@ type MockObjectStore_ListIDsFromFullTextQueue_Call struct { } // ListIDsFromFullTextQueue is a helper method to define mock.On call +// - spaceId string // - limit int -func (_e *MockObjectStore_Expecter) ListIDsFromFullTextQueue(limit interface{}) *MockObjectStore_ListIDsFromFullTextQueue_Call { - return &MockObjectStore_ListIDsFromFullTextQueue_Call{Call: _e.mock.On("ListIDsFromFullTextQueue", limit)} +func (_e *MockObjectStore_Expecter) ListIDsFromFullTextQueue(spaceId interface{}, limit interface{}) *MockObjectStore_ListIDsFromFullTextQueue_Call { + return &MockObjectStore_ListIDsFromFullTextQueue_Call{Call: _e.mock.On("ListIDsFromFullTextQueue", spaceId, limit)} } -func (_c *MockObjectStore_ListIDsFromFullTextQueue_Call) Run(run func(limit int)) *MockObjectStore_ListIDsFromFullTextQueue_Call { +func (_c *MockObjectStore_ListIDsFromFullTextQueue_Call) Run(run func(spaceId string, limit int)) *MockObjectStore_ListIDsFromFullTextQueue_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int)) + run(args[0].(string), args[1].(int)) }) return _c } @@ -1945,7 +1960,7 @@ func (_c *MockObjectStore_ListIDsFromFullTextQueue_Call) Return(_a0 []string, _a return _c } -func (_c *MockObjectStore_ListIDsFromFullTextQueue_Call) RunAndReturn(run func(int) ([]string, error)) *MockObjectStore_ListIDsFromFullTextQueue_Call { +func (_c *MockObjectStore_ListIDsFromFullTextQueue_Call) RunAndReturn(run func(string, int) ([]string, error)) *MockObjectStore_ListIDsFromFullTextQueue_Call { _c.Call.Return(run) return _c } diff --git a/pkg/lib/localstore/objectstore/objects.go b/pkg/lib/localstore/objectstore/objects.go index ab0c56fde0..72ed4e9278 100644 --- a/pkg/lib/localstore/objectstore/objects.go +++ b/pkg/lib/localstore/objectstore/objects.go @@ -97,12 +97,12 @@ type ObjectStore interface { GetRelationFormatByKey(key string) (model.RelationFormat, error) GetObjectType(url string) (*model.ObjectType, error) - BatchProcessFullTextQueue(ctx context.Context, limit int, processIds func(processIds []string) error) error + BatchProcessFullTextQueue(ctx context.Context, spaceIdsPriority []string, limit int, processIds func(processIds []string) error) error } type IndexerStore interface { - AddToIndexQueue(id string) error - ListIDsFromFullTextQueue(limit int) ([]string, error) + AddToIndexQueue(id ...domain.FullID) error + ListIDsFromFullTextQueue(spaceId string, limit int) ([]string, error) RemoveIDsFromFullTextQueue(ids []string) error FTSearch() ftsearch.FTSearch GetGlobalChecksums() (checksums *model.ObjectStoreChecksums, err error) diff --git a/pkg/lib/localstore/objectstore/objects_test.go b/pkg/lib/localstore/objectstore/objects_test.go index 201f03bce4..354ac0dcf1 100644 --- a/pkg/lib/localstore/objectstore/objects_test.go +++ b/pkg/lib/localstore/objectstore/objects_test.go @@ -256,7 +256,7 @@ func TestDeleteObject(t *testing.T) { err = s.SaveLastIndexedHeadsHash("id1", "hash1") require.NoError(t, err) - err = s.AddToIndexQueue("id1") + err = s.AddToIndexQueue(domain.FullID{ObjectID: "id1", SpaceID: "space1"}) require.NoError(t, err) // Act @@ -290,7 +290,7 @@ func TestDeleteObject(t *testing.T) { require.NoError(t, err) assert.Empty(t, hash) - ids, err := s.ListIDsFromFullTextQueue(0) + ids, err := s.ListIDsFromFullTextQueue("space1", 0) require.NoError(t, err) assert.Empty(t, ids) }) diff --git a/util/pbtypes/pbtypes.go b/util/pbtypes/pbtypes.go index b5d0a1a7b5..f8487d8ae0 100644 --- a/util/pbtypes/pbtypes.go +++ b/util/pbtypes/pbtypes.go @@ -108,6 +108,21 @@ func GetString(s *types.Struct, name string) string { return "" } +func ExtractString(s []*types.Struct, key string, omitIfFieldNotSet bool) []string { + var res = make([]string, 0, len(s)) + for _, v := range s { + if v == nil || v.Fields == nil { + continue + } + if val, ok := v.Fields[key]; ok { + res = append(res, val.GetStringValue()) + } else if !omitIfFieldNotSet { + res = append(res, "") + } + } + return res +} + func IsEmptyValueOrAbsent(s *types.Struct, name string) bool { if s == nil || s.Fields == nil { return true diff --git a/util/testMock/objectstore_mock.go b/util/testMock/objectstore_mock.go index 1032052c13..e4c6d0dad4 100644 --- a/util/testMock/objectstore_mock.go +++ b/util/testMock/objectstore_mock.go @@ -49,31 +49,35 @@ func (m *MockObjectStore) EXPECT() *MockObjectStoreMockRecorder { } // AddToIndexQueue mocks base method. -func (m *MockObjectStore) AddToIndexQueue(arg0 string) error { +func (m *MockObjectStore) AddToIndexQueue(arg0 ...domain.FullID) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddToIndexQueue", arg0) + varargs := []any{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "AddToIndexQueue", varargs...) ret0, _ := ret[0].(error) return ret0 } // AddToIndexQueue indicates an expected call of AddToIndexQueue. -func (mr *MockObjectStoreMockRecorder) AddToIndexQueue(arg0 any) *gomock.Call { +func (mr *MockObjectStoreMockRecorder) AddToIndexQueue(arg0 ...any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddToIndexQueue", reflect.TypeOf((*MockObjectStore)(nil).AddToIndexQueue), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddToIndexQueue", reflect.TypeOf((*MockObjectStore)(nil).AddToIndexQueue), arg0...) } // BatchProcessFullTextQueue mocks base method. -func (m *MockObjectStore) BatchProcessFullTextQueue(arg0 context.Context, arg1 int, arg2 func([]string) error) error { +func (m *MockObjectStore) BatchProcessFullTextQueue(arg0 context.Context, arg1 []string, arg2 int, arg3 func([]string) error) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BatchProcessFullTextQueue", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "BatchProcessFullTextQueue", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // BatchProcessFullTextQueue indicates an expected call of BatchProcessFullTextQueue. -func (mr *MockObjectStoreMockRecorder) BatchProcessFullTextQueue(arg0, arg1, arg2 any) *gomock.Call { +func (mr *MockObjectStoreMockRecorder) BatchProcessFullTextQueue(arg0, arg1, arg2, arg3 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchProcessFullTextQueue", reflect.TypeOf((*MockObjectStore)(nil).BatchProcessFullTextQueue), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchProcessFullTextQueue", reflect.TypeOf((*MockObjectStore)(nil).BatchProcessFullTextQueue), arg0, arg1, arg2, arg3) } // Close mocks base method. @@ -551,18 +555,18 @@ func (mr *MockObjectStoreMockRecorder) ListAllRelations(arg0 any) *gomock.Call { } // ListIDsFromFullTextQueue mocks base method. -func (m *MockObjectStore) ListIDsFromFullTextQueue(arg0 int) ([]string, error) { +func (m *MockObjectStore) ListIDsFromFullTextQueue(arg0 string, arg1 int) ([]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListIDsFromFullTextQueue", arg0) + ret := m.ctrl.Call(m, "ListIDsFromFullTextQueue", arg0, arg1) ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } // ListIDsFromFullTextQueue indicates an expected call of ListIDsFromFullTextQueue. -func (mr *MockObjectStoreMockRecorder) ListIDsFromFullTextQueue(arg0 any) *gomock.Call { +func (mr *MockObjectStoreMockRecorder) ListIDsFromFullTextQueue(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListIDsFromFullTextQueue", reflect.TypeOf((*MockObjectStore)(nil).ListIDsFromFullTextQueue), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListIDsFromFullTextQueue", reflect.TypeOf((*MockObjectStore)(nil).ListIDsFromFullTextQueue), arg0, arg1) } // ListIds mocks base method.