From 786971149a0fd198d4d3b373b7a62ec69a01e1f8 Mon Sep 17 00:00:00 2001 From: Roman Khafizianov Date: Fri, 4 Oct 2024 16:11:39 +0200 Subject: [PATCH] GO-3861 cleanup --- core/indexer/fulltext.go | 69 +--------- core/indexer/indexer.go | 75 +++++++++-- core/indexer/reindex.go | 204 +----------------------------- core/indexer/reindex_test.go | 169 ------------------------- core/indexer/reindexqueue.go | 203 +++++++++++++++++++++++++++++ core/indexer/reindexqueue_test.go | 172 +++++++++++++++++++++++++ 6 files changed, 447 insertions(+), 445 deletions(-) create mode 100644 core/indexer/reindexqueue.go create mode 100644 core/indexer/reindexqueue_test.go diff --git a/core/indexer/fulltext.go b/core/indexer/fulltext.go index 8931cdc83..b0d8f5928 100644 --- a/core/indexer/fulltext.go +++ b/core/indexer/fulltext.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/gogo/protobuf/types" "golang.org/x/exp/slices" "github.com/anyproto/anytype-heart/core/block/cache" @@ -15,8 +14,6 @@ import ( "github.com/anyproto/anytype-heart/core/block/simple" "github.com/anyproto/anytype-heart/core/block/simple/text" "github.com/anyproto/anytype-heart/core/domain" - "github.com/anyproto/anytype-heart/core/subscription" - "github.com/anyproto/anytype-heart/core/syncstatus/syncsubscriptions" "github.com/anyproto/anytype-heart/metrics" "github.com/anyproto/anytype-heart/pkg/lib/bundle" "github.com/anyproto/anytype-heart/pkg/lib/localstore/ftsearch" @@ -39,72 +36,12 @@ func (i *indexer) ForceFTIndex() { } } -func (i *indexer) getSpaceIdsByPriority() []string { - var ids = make([]string, 0, i.lastSpacesSubscription.Len()) - i.lastSpacesSubscription.Iterate(func(_ string, v *types.Struct) bool { - id := pbtypes.GetString(v, bundle.RelationKeyTargetSpaceId.String()) - if id != "" { - ids = append(ids, id) - } - return true - }) - - log.Warnf("ft space ids priority: %v", ids) - return ids -} - -func (i *indexer) updateSpacesPriority(priority []string) { - techSpaceId := i.techSpaceId.TechSpaceId() - i.lock.Lock() - i.spacesPriority = append([]string{techSpaceId}, slices.DeleteFunc(priority, func(s string) bool { - return s == techSpaceId - })...) - i.lock.Unlock() - - i.spaceReindexQueue.RefreshPriority() -} - -func (i *indexer) subscribeToSpaces() error { - objectReq := subscription.SubscribeRequest{ - SubId: "lastOpenedSpaces", - Internal: true, - NoDepSubscription: true, - Keys: []string{bundle.RelationKeyTargetSpaceId.String(), bundle.RelationKeyLastOpenedDate.String(), bundle.RelationKeyLastModifiedDate.String()}, - Filters: []*model.BlockContentDataviewFilter{ - { - RelationKey: bundle.RelationKeyLayout.String(), - Condition: model.BlockContentDataviewFilter_Equal, - Value: pbtypes.Int64(int64(model.ObjectType_spaceView)), - }, - }, - Sorts: []*model.BlockContentDataviewSort{ - { - RelationKey: bundle.RelationKeyLastOpenedDate.String(), - Type: model.BlockContentDataviewSort_Desc, - IncludeTime: true, - Format: model.RelationFormat_date, - EmptyPlacement: model.BlockContentDataviewSort_End, - }, - }, - } - i.lastSpacesSubscription = syncsubscriptions.NewSubscription(i.subscriptionService, objectReq) - return i.lastSpacesSubscription.Run(i.lastSpacesSubscriptionUpdateChan) -} - -func (i *indexer) getIterator() func(id string, data struct{}) bool { - var ids []string - return func(id string, _ struct{}) bool { - ids = append(ids, id) - return true - } -} - // ftLoop runs full-text indexer // MUST NOT be called more than once func (i *indexer) ftLoopRoutine() { ticker := time.NewTicker(ftIndexInterval) - i.runFullTextIndexer(i.componentCtx, i.getSpaceIdsByPriority()) + i.runFullTextIndexer(i.componentCtx, i.getSpacesPriority()) defer close(i.ftQueueFinished) var lastForceIndex time.Time for { @@ -112,10 +49,10 @@ func (i *indexer) ftLoopRoutine() { case <-i.componentCtx.Done(): return case <-ticker.C: - i.runFullTextIndexer(i.componentCtx, i.getSpaceIdsByPriority()) + i.runFullTextIndexer(i.componentCtx, i.getSpacesPriority()) case <-i.forceFt: if time.Since(lastForceIndex) > ftIndexForceMinInterval { - i.runFullTextIndexer(i.componentCtx, i.getSpaceIdsByPriority()) + i.runFullTextIndexer(i.componentCtx, i.getSpacesPriority()) lastForceIndex = time.Now() } } diff --git a/core/indexer/indexer.go b/core/indexer/indexer.go index 334e25345..57c7e5c04 100644 --- a/core/indexer/indexer.go +++ b/core/indexer/indexer.go @@ -81,14 +81,13 @@ type indexer struct { btHash Hasher forceFt chan struct{} - mb *mb.MB[clientspace.Space] - lastSpacesSubscription *syncsubscriptions.ObjectSubscription[*types.Struct] - lastSpacesSubscriptionUpdateChan chan []*types.Struct - lock sync.Mutex - reindexLogFields []zap.Field - techSpaceId techSpaceIdGetter - spacesPriority []string - spaceReindexQueue *taskmanager.TasksManager + mb *mb.MB[clientspace.Space] + lastSpacesSubscription *syncsubscriptions.ObjectSubscription[*types.Struct] + lock sync.Mutex + reindexLogFields []zap.Field + techSpaceId techSpaceIdGetter + spacesPriority []string + spaceReindexQueue *taskmanager.TasksManager } func (i *indexer) Init(a *app.App) (err error) { @@ -104,9 +103,8 @@ func (i *indexer) Init(a *app.App) (err error) { i.config = app.MustComponent[*config.Config](a) i.subscriptionService = app.MustComponent[subscription.Service](a) i.mb = mb.New[clientspace.Space](0) - i.spaceReindexQueue = taskmanager.NewTasksManager(1, i.taskPrioritySorter) + i.spaceReindexQueue = taskmanager.NewTasksManager(1, i.reindexTasksSorter) i.techSpaceId = app.MustComponent[techSpaceIdGetter](a) - i.lastSpacesSubscriptionUpdateChan = make(chan []*types.Struct) i.componentCtx, i.componentCancel = context.WithCancel(context.Background()) return } @@ -116,12 +114,11 @@ func (i *indexer) Name() (name string) { } func (i *indexer) Run(context.Context) (err error) { - i.runReindexerQueue() - err = i.subscribeToSpaces() if err != nil { return } + go i.spaceReindexQueue.Run(i.componentCtx) return i.StartFullTextIndex() } @@ -276,3 +273,57 @@ func headsHash(heads []string) string { sum := sha256.Sum256([]byte(strings.Join(heads, ","))) return fmt.Sprintf("%x", sum) } + +// subscribeToSpaces subscribes to the lastOpenedSpaces subscription +// it used by fulltext and reindexing to prioritize most recent spaces +func (i *indexer) subscribeToSpaces() error { + objectReq := subscription.SubscribeRequest{ + SubId: "lastOpenedSpaces", + Internal: true, + NoDepSubscription: true, + Keys: []string{bundle.RelationKeyTargetSpaceId.String(), bundle.RelationKeyLastOpenedDate.String(), bundle.RelationKeyLastModifiedDate.String()}, + Filters: []*model.BlockContentDataviewFilter{ + { + RelationKey: bundle.RelationKeyLayout.String(), + Condition: model.BlockContentDataviewFilter_Equal, + Value: pbtypes.Int64(int64(model.ObjectType_spaceView)), + }, + }, + Sorts: []*model.BlockContentDataviewSort{ + { + RelationKey: bundle.RelationKeyLastOpenedDate.String(), + Type: model.BlockContentDataviewSort_Desc, + IncludeTime: true, + Format: model.RelationFormat_date, + EmptyPlacement: model.BlockContentDataviewSort_End, + }, + }, + } + updatesChan := make(chan []*types.Struct) + go i.spacesSubscriptionChanWatcher(updatesChan) + i.lastSpacesSubscription = syncsubscriptions.NewSubscription(i.subscriptionService, objectReq) + return i.lastSpacesSubscription.Run(updatesChan) +} + +func (i *indexer) spacesSubscriptionSetPriority(priority []string) { + techSpaceId := i.techSpaceId.TechSpaceId() + i.lock.Lock() + i.spacesPriority = append([]string{techSpaceId}, slices.DeleteFunc(priority, func(s string) bool { + return s == techSpaceId + })...) + i.lock.Unlock() + + // notify the queue about the priority change + i.spaceReindexQueue.RefreshPriority() +} + +func (i *indexer) spacesSubscriptionChanWatcher(ch chan []*types.Struct) { + for { + select { + case <-i.componentCtx.Done(): + return + case records := <-ch: + i.spacesSubscriptionSetPriority(pbtypes.ExtractString(records, bundle.RelationKeyTargetSpaceId.String(), true)) + } + } +} diff --git a/core/indexer/reindex.go b/core/indexer/reindex.go index 078ccb15e..b34583cb6 100644 --- a/core/indexer/reindex.go +++ b/core/indexer/reindex.go @@ -4,8 +4,6 @@ import ( "context" "errors" "fmt" - "sort" - "strconv" "strings" "time" @@ -14,7 +12,6 @@ import ( "github.com/globalsign/mgo/bson" "github.com/gogo/protobuf/types" "go.uber.org/zap" - "golang.org/x/exp/slices" "github.com/anyproto/anytype-heart/core/block/editor/smartblock" "github.com/anyproto/anytype-heart/core/domain" @@ -24,11 +21,9 @@ import ( coresb "github.com/anyproto/anytype-heart/pkg/lib/core/smartblock" "github.com/anyproto/anytype-heart/pkg/lib/database" "github.com/anyproto/anytype-heart/pkg/lib/localstore/addr" - "github.com/anyproto/anytype-heart/pkg/lib/localstore/objectstore" "github.com/anyproto/anytype-heart/pkg/lib/pb/model" "github.com/anyproto/anytype-heart/space/clientspace" "github.com/anyproto/anytype-heart/util/pbtypes" - "github.com/anyproto/anytype-heart/util/taskmanager" ) const ( @@ -55,22 +50,6 @@ const ( ForceMarketplaceReindex int32 = 1 ) -const ( - reindexTimeoutFirstAttempt = time.Second * 10 // next attempts will be increased by 2 times - taskRetrySeparator = "#" -) - -type reindexTaskId string - -func (t reindexTaskId) Parse() (spaceId string, try int) { - s := strings.Split(string(t), taskRetrySeparator) - if len(s) == 1 { - return s[0], 0 - } - retry, _ := strconv.ParseInt(s[1], 10, 64) - return s[0], int(retry) -} - func (i *indexer) buildFlags(spaceID string) (reindexFlags, error) { checksums, err := i.store.GetChecksums(spaceID) if err != nil && !errors.Is(err, anystore.ErrDocNotFound) { @@ -189,12 +168,15 @@ func (i *indexer) ReindexSpace(space clientspace.Space) (err error) { return err } err = i.store.DeleteLastIndexedHeadHash(ids...) + if err != nil { + return fmt.Errorf("delete last indexed head hash: %w", err) + } } - - // add the task to recheck all the stored objects indexed heads and reindex if outdated - i.addReindexOutdatedTask(i.componentCtx, space) } + // add the task to recheck all the stored objects indexed heads and reindex if outdated + i.reindexAddSpaceTask(space) + if flags.deletedObjects { err = i.reindexDeletedObjects(space) if err != nil { @@ -566,177 +548,3 @@ func (i *indexer) RemoveIndexes(spaceId string) error { flags.enableAll() return i.removeCommonIndexes(spaceId, nil, flags) } - -func (i *indexer) runReindexerQueue() { - go func() { - for { - select { - case <-i.componentCtx.Done(): - case priority := <-i.lastSpacesSubscriptionUpdateChan: - i.updateSpacesPriority(pbtypes.ExtractString(priority, bundle.RelationKeyTargetSpaceId.String(), true)) - } - } - }() - go i.spaceReindexQueue.Run(i.componentCtx) -} - -// addReindexOutdatedTask reindex all objects in the space that have outdated head hashes -func (i *indexer) addReindexOutdatedTask(ctx context.Context, space clientspace.Space) { - task := i.newReIndexTask(space, 0) - i.spaceReindexQueue.AddTask(task) - go func() { - for { - result, err := task.WaitResult(ctx) - l := log.With("hadTimeouts", task.hadTimeouts).With("spaceId", space.Id()).With("tryNumber", task.tryNumber).With("total", task.total).With("invalidated", task.invalidated).With("succeed", task.success) - if err != nil { - l.Error("reindex failed", zap.Error(err)) - break - } else { - l = l.With("spentWorkMs", int(result.WorkTime.Milliseconds())).With("spentTotalMs", int(result.FinishTime.Sub(result.StartTime).Milliseconds())) - if task.invalidated-task.success > 0 { - l.Warn("reindex finished not fully") - if task.hadTimeouts { - // reschedule timeouted space task - // it will be executed after all tasks with previous tryNumber are finished - task = i.newReIndexTask(space, task.tryNumber+1) - i.spaceReindexQueue.AddTask(task) - } else { - break - } - } else { - if task.total > 0 { - l.Warn("reindex finished") - } - break - } - } - } - }() -} - -func (i *indexer) newReIndexTask(space clientspace.Space, tryNumber int) *reindexTask { - taskId := fmt.Sprintf("%s%s%d", space.Id(), taskRetrySeparator, tryNumber) - return &reindexTask{ - TaskBase: taskmanager.NewTaskBase(taskId), - space: space, - store: i.store, - indexer: i, - tryNumber: tryNumber, - } -} - -type reindexTask struct { - taskmanager.TaskBase - space clientspace.Space - store objectstore.ObjectStore - indexer *indexer - total int - invalidated int - success int - tryNumber int - hadTimeouts bool -} - -func (t *reindexTask) Timeout() time.Duration { - return reindexTimeoutFirstAttempt * time.Duration(1< 0 { + l.Warn("reindex finished not fully") + if task.hadTimeouts { + // reschedule timeouted space task + // it will be executed after all tasks with previous tryNumber are finished + task = i.reindexNewTask(task.space, task.tryNumber+1) + i.spaceReindexQueue.AddTask(task) + } else { + break + } + } else { + if task.total > 0 { + l.Warn("reindex finished") + } + break + } + } + } +} + +func (i *indexer) reindexNewTask(space clientspace.Space, tryNumber int) *reindexTask { + taskId := fmt.Sprintf("%s%s%d", space.Id(), taskRetrySeparator, tryNumber) + return &reindexTask{ + TaskBase: taskmanager.NewTaskBase(taskId), + space: space, + store: i.store, + indexer: i, + tryNumber: tryNumber, + } +} + +// taskPrioritySorter sort taskIds +// - first by the number of the try (0, 1, 2, ...) +// - then by the space priority. if space priority is not set for the space, it put to the end +func (i *indexer) reindexTasksSorter(taskIds []string) []string { + priority := i.getSpacesPriority() + // Sort the filtered task IDs based on retry attempts and space priority + sort.Slice(taskIds, func(a, b int) bool { + spaceA, tryA := reindexTaskId(taskIds[a]).Parse() + spaceB, tryB := reindexTaskId(taskIds[b]).Parse() + + // First, sort by retry attempts (lower retries have higher priority) + if tryA != tryB { + return tryA < tryB + } + + // Then, sort by the index in spacesPriority (earlier spaces have higher priority) + indexA := slices.Index(priority, spaceA) + indexB := slices.Index(priority, spaceB) + + if indexA == -1 && indexB == -1 { + // to make it stable + return spaceA < spaceB + } + if indexA == -1 { + return false + } + if indexB == -1 { + return true + } + + return indexA < indexB + }) + return taskIds +} + +type reindexTask struct { + taskmanager.TaskBase + space clientspace.Space + store objectstore.ObjectStore + indexer *indexer + total int + invalidated int + success int + tryNumber int + hadTimeouts bool +} + +func (t *reindexTask) Timeout() time.Duration { + return reindexTimeoutFirstAttempt * time.Duration(1<